Source code for jina.orchestrate.flow.base

import argparse
import base64
import copy
import itertools
import json
import multiprocessing
import os
import re
import sys
import threading
import time
import uuid
import warnings
from collections import OrderedDict
from contextlib import ExitStack
from typing import (
    TYPE_CHECKING,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
    overload,
)

from rich import print
from rich.table import Table

from jina import __default_host__, helper
from jina.clients import Client
from jina.clients.mixin import AsyncPostMixin, PostMixin
from jina.enums import (
    DeploymentRoleType,
    FlowBuildLevel,
    FlowInspectType,
    GatewayProtocolType,
)
from jina.excepts import (
    FlowMissingDeploymentError,
    FlowTopologyError,
    RuntimeFailToStart,
)
from jina.helper import (
    GRAPHQL_MIN_DOCARRAY_VERSION,
    ArgNamespace,
    CatchAllCleanupContextManager,
    colored_rich,
    docarray_graphql_compatible,
    download_mermaid_url,
    get_internal_ip,
    get_public_ip,
    get_rich_console,
    typename,
)
from jina.jaml import JAMLCompatible
from jina.logging.logger import JinaLogger
from jina.orchestrate.deployments import Deployment
from jina.orchestrate.flow.builder import _hanging_deployments, allowed_levels
from jina.parsers import (
    set_client_cli_parser,
    set_deployment_parser,
    set_gateway_parser,
)
from jina.parsers.flow import set_flow_parser

__all__ = ['Flow']


class FlowType(type(ExitStack), type(JAMLCompatible)):
    """Type of Flow, metaclass of :class:`BaseFlow`"""

    pass


_regex_port = r'(.*?):([0-9]{1,4}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$'

if TYPE_CHECKING:
    from jina.clients.base import BaseClient
    from jina.orchestrate.flow.asyncio import AsyncFlow
    from jina.serve.executors import BaseExecutor

GATEWAY_NAME = 'gateway'
FALLBACK_PARSERS = [
    set_gateway_parser(),
    set_deployment_parser(),
    set_client_cli_parser(),
    set_flow_parser(),
]


[docs]class Flow(PostMixin, JAMLCompatible, ExitStack, metaclass=FlowType): """Flow is how Jina streamlines and distributes Executors.""" # overload_inject_start_client_flow @overload def __init__( self, *, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', https: Optional[bool] = False, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, return_responses: Optional[bool] = False, **kwargs, ): """Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina client` CLI. :param asyncio: If set, then the input and output of this Client work in an asynchronous manner. :param host: The host address of the runtime, by default it is 0.0.0.0. :param https: If set, connect to gateway using https :param port: The port of the Gateway, which the client should connect to. :param protocol: Communication protocol between server and client. :param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy :param return_responses: If set, return results as List of Requests instead of a reduced DocArray. .. # noqa: DAR202 .. # noqa: DAR101 .. # noqa: DAR003 """ # overload_inject_end_client_flow # overload_inject_start_gateway_flow @overload def __init__( self, *, connection_list: Optional[str] = None, cors: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', description: Optional[str] = None, disable_reduce: Optional[bool] = False, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after_address: Optional[str] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs, ): """Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina gateway` CLI. :param connection_list: dictionary JSON with a list of connections to configure :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. :param default_swagger_ui: If set, the default swagger ui is used for `/docs` endpoint. :param deployments_addresses: dictionary JSON with the input addresses of each Deployment :param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI. :param disable_reduce: Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head :param env: The map of environment variables that are available inside runtime :param expose_endpoints: A JSON string that represents a map from executor endpoints (`@requests(on=...)`) to HTTP endpoints. :param expose_graphql_endpoint: If set, /graphql endpoint is added to HTTP interface. :param graph_conditions: Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents. :param graph_description: Routing graph for the gateway :param host: The host address of the runtime, by default it is 0.0.0.0. :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. :param name: The name of this object. This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - ... When not given, then the default naming strategy will apply. :param native: If set, only native Executors is allowed, and the Executor is always run inside WorkerRuntime. :param no_crud_endpoints: If set, /index, /search, /update, /delete endpoints are removed from HTTP interface. Any executor that has `@requests(on=...)` bind with those values will receive data requests. :param no_debug_endpoints: If set, /status /post endpoints are removed from HTTP interface. :param output_array_type: The type of array `tensor` and `embedding` will be serialized to. Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found `here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>`. Defaults to retaining whatever type is returned by the Executor. :param polling: The polling strategy of the Deployment and its endpoints (when `shards>1`). Can be defined for all endpoints of a Deployment or by endpoint. Define per Deployment: - ANY: only one (whoever is idle) Pod polls the message - ALL: all Pods poll the message (like a broadcast) Define per Endpoint: JSON dict, {endpoint: PollingType} {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'} :param port: The port for input data to bind to, default is a random port between [49152, 65535] :param prefetch: Number of requests fetched from the client before feeding into the first Executor. Used to control the speed of data input into a Flow. 0 disables prefetch (disabled by default) :param protocol: Communication protocol between server and client. :param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy :param py_modules: The customized python modules need to be imported before loading the executor Note that the recommended way is to only import a single module - a simple python file, if your executor can be defined in a single file, or an ``__init__.py`` file if you have multiple files, which should be structured as a python package. For more details, please see the `Executor cookbook <https://docs.jina.ai/fundamentals/executor/repository-structure/>`__ :param quiet: If set, then no log will be emitted from this object. :param quiet_error: If set, then exception stack information will not be added to the log :param replicas: The number of replicas in the deployment :param runtime_backend: The parallel backend of the runtime inside the Pod :param runtime_cls: The runtime class to run inside the Pod :param shards: The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies :param timeout_ctrl: The timeout in milliseconds of the control request, -1 for waiting forever :param timeout_ready: The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever :param title: The title of this HTTP server. It will be used in automatics docs such as Swagger UI. :param uses: The config of the executor, it could be one of the followings: * an Executor YAML file (.yml, .yaml, .jaml) * a Jina Hub Executor (must start with `jinahub://` or `jinahub+docker://`) * a docker image (must start with `docker://`) * the string literal of a YAML config (must start with `!` or `jtype: `) * the string literal of a JSON config When use it under Python, one can use the following values additionally: - a Python dict that represents the config - a text file stream has `.read()` interface :param uses_after_address: The address of the uses-before runtime :param uses_before_address: The address of the uses-before runtime :param uses_metas: Dictionary of keyword arguments that will override the `metas` configuration in `uses` :param uses_requests: Dictionary of keyword arguments that will override the `requests` configuration in `uses` :param uses_with: Dictionary of keyword arguments that will override the `with` configuration in `uses` :param uvicorn_kwargs: Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/ :param workspace: The working directory for any IO operations in this object. If not set, then derive from its parent `workspace`. .. # noqa: DAR202 .. # noqa: DAR101 .. # noqa: DAR003 """ # overload_inject_end_gateway_flow # overload_inject_start_flow @overload def __init__( self, *, env: Optional[dict] = None, expose_graphql_endpoint: Optional[bool] = False, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, polling: Optional[str] = 'ANY', quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, timeout_ctrl: Optional[int] = 60, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs, ): """Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina flow` CLI. :param env: The map of environment variables that are available inside runtime :param expose_graphql_endpoint: If set, /graphql endpoint is added to HTTP interface. :param inspect: The strategy on those inspect deployments in the flow. If `REMOVE` is given then all inspect deployments are removed when building the flow. :param log_config: The YAML config of the logger used in this object. :param name: The name of this object. This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - ... When not given, then the default naming strategy will apply. :param polling: The polling strategy of the Deployment and its endpoints (when `shards>1`). Can be defined for all endpoints of a Deployment or by endpoint. Define per Deployment: - ANY: only one (whoever is idle) Pod polls the message - ALL: all Pods poll the message (like a broadcast) Define per Endpoint: JSON dict, {endpoint: PollingType} {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'} :param quiet: If set, then no log will be emitted from this object. :param quiet_error: If set, then exception stack information will not be added to the log :param timeout_ctrl: The timeout in milliseconds of the control request, -1 for waiting forever :param uses: The YAML file represents a flow :param workspace: The working directory for any IO operations in this object. If not set, then derive from its parent `workspace`. .. # noqa: DAR202 .. # noqa: DAR101 .. # noqa: DAR003 """ # overload_inject_end_flow def __init__( self, args: Optional['argparse.Namespace'] = None, **kwargs, ): super().__init__() self._version = '1' #: YAML version number, this will be later overridden if YAML config says the other way self._deployment_nodes = OrderedDict() # type: Dict[str, Deployment] self._inspect_deployments = {} # type: Dict[str, str] self._endpoints_mapping = {} # type: Dict[str, Dict] self._build_level = FlowBuildLevel.EMPTY self._last_changed_deployment = [ GATEWAY_NAME ] #: default first deployment is gateway, will add when build() self._update_args(args, **kwargs) if ( self.protocol == GatewayProtocolType.HTTP and self.args.expose_graphql_endpoint and not docarray_graphql_compatible() ): self.args.expose_graphql_endpoint = False warnings.warn( 'DocArray version is incompatible with GraphQL features. ' 'Automatically setting expose_graphql_endpoint=False. ' f'To use GraphQL features, install docarray>={GRAPHQL_MIN_DOCARRAY_VERSION}' ) if isinstance(self.args, argparse.Namespace): self.logger = JinaLogger( self.__class__.__name__, **vars(self.args), **self._common_kwargs ) else: self.logger = JinaLogger(self.__class__.__name__, **self._common_kwargs) def _update_args(self, args, **kwargs): from jina.helper import ArgNamespace from jina.parsers.flow import set_flow_parser _flow_parser = set_flow_parser() if args is None: args = ArgNamespace.kwargs2namespace( kwargs, _flow_parser, True, fallback_parsers=FALLBACK_PARSERS ) self.args = args # common args should be the ones that can not be parsed by _flow_parser known_keys = vars(args) self._common_kwargs = {k: v for k, v in kwargs.items() if k not in known_keys} self._kwargs = ArgNamespace.get_non_defaults_args( args, _flow_parser ) #: for yaml dump if self._common_kwargs.get('asyncio', False) and not isinstance( self, AsyncPostMixin ): from jina.orchestrate.flow.asyncio import AsyncFlow self.__class__ = AsyncFlow @staticmethod def _parse_endpoints( op_flow, deployment_name, endpoint, connect_to_last_deployment=False ) -> Set: # parsing needs if isinstance(endpoint, str): endpoint = [endpoint] elif not endpoint: if op_flow._last_changed_deployment and connect_to_last_deployment: endpoint = [op_flow.last_deployment] else: endpoint = [] if isinstance(endpoint, (list, tuple)): for idx, s in enumerate(endpoint): if s == deployment_name: raise FlowTopologyError( 'the income/output of a deployment can not be itself' ) else: raise ValueError(f'endpoint={endpoint} is not parsable') # if an endpoint is being inspected, then replace it with inspected Deployment endpoint = set(op_flow._inspect_deployments.get(ep, ep) for ep in endpoint) return endpoint @property def last_deployment(self): """Last deployment .. # noqa: DAR401 .. # noqa: DAR201 """ return self._last_changed_deployment[-1] @last_deployment.setter def last_deployment(self, name: str): """ Set a Deployment as the last Deployment in the Flow, useful when modifying the Flow. .. # noqa: DAR401 :param name: the name of the existing Deployment """ if name not in self._deployment_nodes: raise FlowMissingDeploymentError(f'{name} can not be found in this Flow') if self._last_changed_deployment and name == self.last_deployment: pass else: self._last_changed_deployment.append(name) # graph is now changed so we need to # reset the build level to the lowest self._build_level = FlowBuildLevel.EMPTY @allowed_levels([FlowBuildLevel.EMPTY]) def _add_gateway( self, needs: str, graph_description: Dict[str, List[str]], deployments_addresses: Dict[str, List[str]], graph_conditions: Dict[str, Dict], **kwargs, ): kwargs.update( dict( name=GATEWAY_NAME, ctrl_with_ipc=True, # otherwise ctrl port would be conflicted host=self.host, protocol=self.protocol, port=self.port, deployment_role=DeploymentRoleType.GATEWAY, expose_endpoints=json.dumps(self._endpoints_mapping), ) ) kwargs.update(self._common_kwargs) args = ArgNamespace.kwargs2namespace(kwargs, set_gateway_parser()) args.noblock_on_start = True args.expose_graphql_endpoint = ( self.args.expose_graphql_endpoint ) # also used in Flow, thus not in kwargs args.graph_description = json.dumps(graph_description) args.graph_conditions = json.dumps(graph_conditions) args.deployments_addresses = json.dumps(deployments_addresses) self._deployment_nodes[GATEWAY_NAME] = Deployment(args, needs) def _get_deployments_addresses(self) -> Dict[str, List[str]]: graph_dict = {} for node, v in self._deployment_nodes.items(): if node == 'gateway': continue graph_dict[node] = [f'{v.protocol}://{v.host}:{v.head_port}'] return graph_dict def _get_k8s_deployments_addresses( self, k8s_namespace: str, k8s_connection_pool: bool ) -> Dict[str, List[str]]: graph_dict = {} from jina.orchestrate.deployments.config.helper import to_compatible_name from jina.serve.networking import K8sGrpcConnectionPool for node, v in self._deployment_nodes.items(): if node == 'gateway': continue if v.external: deployment_k8s_address = f'{v.host}' else: deployment_k8s_address = ( f'{to_compatible_name(v.head_args.name)}.{k8s_namespace}.svc' ) # we only need hard coded addresses if the k8s connection pool is disabled or if this deployment is external if not k8s_connection_pool or v.external: graph_dict[node] = [ f'{deployment_k8s_address}:{v.head_port if v.external else K8sGrpcConnectionPool.K8S_PORT}' ] return graph_dict if graph_dict else None def _get_docker_compose_deployments_addresses(self) -> Dict[str, List[str]]: graph_dict = {} from jina.orchestrate.deployments.config.docker_compose import port from jina.orchestrate.deployments.config.helper import to_compatible_name for node, v in self._deployment_nodes.items(): if node == 'gateway': continue deployment_docker_compose_address = ( f'{to_compatible_name(v.head_args.name)}:{port}' ) graph_dict[node] = [deployment_docker_compose_address] return graph_dict def _get_graph_conditions(self) -> Dict[str, Dict]: graph_condition = {} for node, v in self._deployment_nodes.items(): if v.args.input_condition is not None: graph_condition[node] = v.args.input_condition return graph_condition def _get_graph_representation(self) -> Dict[str, List[str]]: def _add_node(graph, n): # in the graph we need to distinguish between start and end gateway, although they are the same deployment if n == 'gateway': n = 'start-gateway' if n not in graph: graph[n] = [] return n graph_dict = {} for node, v in self._deployment_nodes.items(): node = _add_node(graph_dict, node) if node == 'start-gateway': continue for need in sorted(v.needs): need = _add_node(graph_dict, need) graph_dict[need].append(node) # find all non hanging leafs last_deployment = self.last_deployment if last_deployment != 'gateway': graph_dict[last_deployment].append('end-gateway') return graph_dict
[docs] @allowed_levels([FlowBuildLevel.EMPTY]) def needs( self, needs: Union[Tuple[str], List[str]], name: str = 'joiner', *args, **kwargs ) -> 'Flow': """ Add a blocker to the Flow, wait until all pods defined in **needs** completed. .. # noqa: DAR401 :param needs: list of service names to wait :param name: the name of this joiner, by default is ``joiner`` :param args: additional positional arguments forwarded to the add function :param kwargs: additional key value arguments forwarded to the add function :return: the modified Flow """ if len(needs) <= 1: raise FlowTopologyError( 'no need to wait for a single service, need len(needs) > 1' ) return self.add( name=name, needs=needs, deployment_role=DeploymentRoleType.JOIN, *args, **kwargs, )
[docs] def needs_all(self, name: str = 'joiner', *args, **kwargs) -> 'Flow': """ Collect all hanging Deployments so far and add a blocker to the Flow; wait until all handing pods completed. :param name: the name of this joiner (default is ``joiner``) :param args: additional positional arguments which are forwarded to the add and needs function :param kwargs: additional key value arguments which are forwarded to the add and needs function :return: the modified Flow """ needs = _hanging_deployments(self) if len(needs) == 1: return self.add(name=name, needs=needs, *args, **kwargs) return self.needs(name=name, needs=needs, *args, **kwargs)
# overload_inject_start_deployment @overload def add( self, *, connection_list: Optional[str] = None, disable_reduce: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, external: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', input_condition: Optional[dict] = None, install_requirements: Optional[bool] = False, log_config: Optional[str] = None, name: Optional[str] = None, native: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_jinad: Optional[int] = 8000, pull_latest: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, workspace: Optional[str] = None, **kwargs, ) -> Union['Flow', 'AsyncFlow']: """Add an Executor to the current Flow object. :param connection_list: dictionary JSON with a list of connections to configure :param disable_reduce: Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head :param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker ' container. More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/ :param entrypoint: The entrypoint command overrides the ENTRYPOINT in Docker image. when not set then the Docker image ENTRYPOINT takes effective. :param env: The map of environment variables that are available inside runtime :param external: The Deployment will be considered an external Deployment that has been started independently from the Flow.This Deployment will not be context managed by the Flow. :param force_update: If set, always pull the latest Hub Executor bundle even it exists on local :param gpus: This argument allows dockerized Jina executor discover local gpu devices. Note, - To access all gpus, use `--gpus all`. - To access multiple gpus, e.g. make use of 2 gpus, use `--gpus 2`. - To access specified gpus based on device id, use `--gpus device=[YOUR-GPU-DEVICE-ID]` - To access specified gpus based on multiple device id, use `--gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]` - To specify more parameters, use `--gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display :param host: The host address of the runtime, by default it is 0.0.0.0. :param host_in: The host address for binding to, by default it is 0.0.0.0 :param input_condition: The condition that the documents need to fulfill before reaching the Executor.The condition can be defined in the form of a `DocArray query condition <https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions>` :param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local :param log_config: The YAML config of the logger used in this object. :param name: The name of this object. This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - ... When not given, then the default naming strategy will apply. :param native: If set, only native Executors is allowed, and the Executor is always run inside WorkerRuntime. :param output_array_type: The type of array `tensor` and `embedding` will be serialized to. Supports the same types as `docarray.to_protobuf(.., ndarray_type=...)`, which can be found `here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>`. Defaults to retaining whatever type is returned by the Executor. :param polling: The polling strategy of the Deployment and its endpoints (when `shards>1`). Can be defined for all endpoints of a Deployment or by endpoint. Define per Deployment: - ANY: only one (whoever is idle) Pod polls the message - ALL: all Pods poll the message (like a broadcast) Define per Endpoint: JSON dict, {endpoint: PollingType} {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'} :param port: The port for input data to bind to, default is a random port between [49152, 65535] :param port_jinad: The port of the remote machine for usage with JinaD. :param pull_latest: Pull the latest image before running :param py_modules: The customized python modules need to be imported before loading the executor Note that the recommended way is to only import a single module - a simple python file, if your executor can be defined in a single file, or an ``__init__.py`` file if you have multiple files, which should be structured as a python package. For more details, please see the `Executor cookbook <https://docs.jina.ai/fundamentals/executor/repository-structure/>`__ :param quiet: If set, then no log will be emitted from this object. :param quiet_error: If set, then exception stack information will not be added to the log :param quiet_remote_logs: Do not display the streaming of remote logs on local console :param replicas: The number of replicas in the deployment :param runtime_backend: The parallel backend of the runtime inside the Pod :param runtime_cls: The runtime class to run inside the Pod :param shards: The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies :param timeout_ctrl: The timeout in milliseconds of the control request, -1 for waiting forever :param timeout_ready: The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever :param upload_files: The files on the host to be uploaded to the remote workspace. This can be useful when your Deployment has more file dependencies beyond a single YAML file, e.g. Python files, data files. Note, - currently only flatten structure is supported, which means if you upload `[./foo/a.py, ./foo/b.pp, ./bar/c.yml]`, then they will be put under the _same_ workspace on the remote, losing all hierarchies. - by default, `--uses` YAML file is always uploaded. - uploaded files are by default isolated across the runs. To ensure files are submitted to the same workspace across different runs, use `--workspace-id` to specify the workspace. :param uses: The config of the executor, it could be one of the followings: * an Executor YAML file (.yml, .yaml, .jaml) * a Jina Hub Executor (must start with `jinahub://` or `jinahub+docker://`) * a docker image (must start with `docker://`) * the string literal of a YAML config (must start with `!` or `jtype: `) * the string literal of a JSON config When use it under Python, one can use the following values additionally: - a Python dict that represents the config - a text file stream has `.read()` interface :param uses_after: The executor attached after the Pods described by --uses, typically used for receiving from all shards, accepted type follows `--uses` :param uses_after_address: The address of the uses-before runtime :param uses_before: The executor attached after the Pods described by --uses, typically before sending to all shards, accepted type follows `--uses` :param uses_before_address: The address of the uses-before runtime :param uses_metas: Dictionary of keyword arguments that will override the `metas` configuration in `uses` :param uses_requests: Dictionary of keyword arguments that will override the `requests` configuration in `uses` :param uses_with: Dictionary of keyword arguments that will override the `with` configuration in `uses` :param volumes: The path on the host to be mounted inside the container. Note, - If separated by `:`, then the first part will be considered as the local host path and the second part is the path in the container system. - If no split provided, then the basename of that directory will be mounted into container's root path, e.g. `--volumes="/user/test/my-workspace"` will be mounted into `/my-workspace` inside the container. - All volumes are mounted with read-write mode. :param workspace: The working directory for any IO operations in this object. If not set, then derive from its parent `workspace`. :return: a (new) Flow object with modification .. # noqa: DAR202 .. # noqa: DAR101 .. # noqa: DAR003 """ # overload_inject_end_deployment
[docs] @allowed_levels([FlowBuildLevel.EMPTY]) def add( self, *, needs: Optional[Union[str, Tuple[str], List[str]]] = None, copy_flow: bool = True, deployment_role: 'DeploymentRoleType' = DeploymentRoleType.DEPLOYMENT, **kwargs, ) -> 'Flow': """ Add a Deployment to the current Flow object and return the new modified Flow object. The attribute of the Deployment can be later changed with :py:meth:`set` or deleted with :py:meth:`remove` .. # noqa: DAR401 :param needs: the name of the Deployment(s) that this Deployment receives data from. One can also use 'gateway' to indicate the connection with the gateway. :param deployment_role: the role of the Deployment, used for visualization and route planning :param copy_flow: when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification :param kwargs: other keyword-value arguments that the Deployment CLI supports :return: a (new) Flow object with modification """ op_flow = copy.deepcopy(self) if copy_flow else self # deployment naming logic deployment_name = kwargs.get('name', None) if deployment_name in op_flow._deployment_nodes: new_name = f'{deployment_name}{len(op_flow._deployment_nodes)}' self.logger.debug( f'"{deployment_name}" is used in this Flow already! renamed it to "{new_name}"' ) deployment_name = new_name if not deployment_name: deployment_name = f'executor{len(op_flow._deployment_nodes)}' if not deployment_name.isidentifier(): # hyphen - can not be used in the name raise ValueError( f'name: {deployment_name} is invalid, please follow the python variable name conventions' ) # needs logic needs = op_flow._parse_endpoints( op_flow, deployment_name, needs, connect_to_last_deployment=True ) # set the kwargs inherit from `Flow(kwargs1=..., kwargs2=)` for key, value in op_flow._common_kwargs.items(): # do not inherit the port argument from the flow if key not in kwargs and key != 'port': kwargs[key] = value # check if host is set to remote:port if 'host' in kwargs: m = re.match(_regex_port, kwargs['host']) if ( kwargs.get('host', __default_host__) != __default_host__ and m and 'port_jinad' not in kwargs ): kwargs['port_jinad'] = m.group(2) kwargs['host'] = m.group(1) # update kwargs of this Deployment kwargs.update( dict( name=deployment_name, deployment_role=deployment_role, ) ) parser = set_deployment_parser() if deployment_role == DeploymentRoleType.GATEWAY: parser = set_gateway_parser() args = ArgNamespace.kwargs2namespace( kwargs, parser, True, fallback_parsers=FALLBACK_PARSERS ) # deployment workspace if not set then derive from flow workspace if args.workspace: args.workspace = os.path.abspath(args.workspace) else: args.workspace = self.workspace args.noblock_on_start = True args.extra_search_paths = self.args.extra_search_paths port = kwargs.get('port', None) if not port: port = helper.random_port() args.port = port op_flow._deployment_nodes[deployment_name] = Deployment(args, needs) op_flow.last_deployment = deployment_name return op_flow
[docs] @allowed_levels([FlowBuildLevel.EMPTY]) def inspect(self, name: str = 'inspect', *args, **kwargs) -> 'Flow': """Add an inspection on the last changed Deployment in the Flow Internally, it adds two Deployments to the Flow. But don't worry, the overhead is minimized and you can remove them by simply using `Flow(inspect=FlowInspectType.REMOVE)` before using the Flow. .. highlight:: bash .. code-block:: bash Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow | -- PUB-SUB -- InspectDeployment (Hanging) In this way, :class:`InspectDeployment` looks like a simple ``_pass`` from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved. This function is very handy for introducing an Evaluator into the Flow. .. seealso:: :meth:`gather_inspect` :param name: name of the Deployment :param args: args for .add() :param kwargs: kwargs for .add() :return: the new instance of the Flow """ _last_deployment = self.last_deployment op_flow = self.add( name=name, needs=_last_deployment, deployment_role=DeploymentRoleType.INSPECT, *args, **kwargs, ) # now remove uses and add an auxiliary Deployment if 'uses' in kwargs: kwargs.pop('uses') op_flow = op_flow.add( name=f'_aux_{name}', needs=_last_deployment, deployment_role=DeploymentRoleType.INSPECT_AUX_PASS, *args, **kwargs, ) # register any future connection to _last_deployment by the auxiliary Deployment op_flow._inspect_deployments[_last_deployment] = op_flow.last_deployment return op_flow
[docs] @allowed_levels([FlowBuildLevel.EMPTY]) def gather_inspect( self, name: str = 'gather_inspect', include_last_deployment: bool = True, *args, **kwargs, ) -> 'Flow': """Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned. .. note:: If ``--no-inspect`` is **not** given, then :meth:`gather_inspect` is auto called before :meth:`build`. So in general you don't need to manually call :meth:`gather_inspect`. :param name: the name of the gather Deployment :param include_last_deployment: if to include the last modified Deployment in the Flow :param args: args for .add() :param kwargs: kwargs for .add() :return: the modified Flow or the copy of it .. seealso:: :meth:`inspect` """ needs = [ k for k, v in self._deployment_nodes.items() if v.role == DeploymentRoleType.INSPECT ] if needs: if include_last_deployment: needs.append(self.last_deployment) return self.add( name=name, needs=needs, deployment_role=DeploymentRoleType.JOIN_INSPECT, *args, **kwargs, ) else: # no inspect node is in the graph, return the current graph return self
def _get_gateway_target(self, prefix): gateway_deployment = self._deployment_nodes[GATEWAY_NAME] return ( f'{prefix}-{GATEWAY_NAME}', { 'host': gateway_deployment.head_host, 'port': gateway_deployment.head_port, 'expected_parts': 0, }, )
[docs] @allowed_levels([FlowBuildLevel.EMPTY]) def build(self, copy_flow: bool = False) -> 'Flow': """ Build the current Flow and make it ready to use .. note:: No need to manually call it since 0.0.8. When using Flow with the context manager, or using :meth:`start`, :meth:`build` will be invoked. :param copy_flow: when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification :return: the current Flow (by default) .. note:: ``copy_flow=True`` is recommended if you are building the same Flow multiple times in a row. e.g. .. highlight:: python .. code-block:: python f = Flow() with f: f.index() with f.build(copy_flow=True) as fl: fl.search() .. # noqa: DAR401 """ op_flow = copy.deepcopy(self) if copy_flow else self if op_flow.args.inspect == FlowInspectType.COLLECT: op_flow.gather_inspect(copy_flow=False) if GATEWAY_NAME not in op_flow._deployment_nodes: op_flow._add_gateway( needs={op_flow.last_deployment}, graph_description=op_flow._get_graph_representation(), deployments_addresses=op_flow._get_deployments_addresses(), graph_conditions=op_flow._get_graph_conditions(), ) removed_deployments = [] # if set no_inspect then all inspect related nodes are removed if op_flow.args.inspect == FlowInspectType.REMOVE: filtered_deployment_nodes = OrderedDict() for k, v in op_flow._deployment_nodes.items(): if not v.role.is_inspect: filtered_deployment_nodes[k] = v else: removed_deployments.append(v.name) op_flow._deployment_nodes = filtered_deployment_nodes reverse_inspect_map = { v: k for k, v in op_flow._inspect_deployments.items() } while ( len(op_flow._last_changed_deployment) > 0 and len(removed_deployments) > 0 and op_flow.last_deployment in removed_deployments ): op_flow._last_changed_deployment.pop() for end, deployment in op_flow._deployment_nodes.items(): # if an endpoint is being inspected, then replace it with inspected Deployment # but not those inspect related node if op_flow.args.inspect.is_keep: deployment.needs = set( ep if deployment.role.is_inspect else op_flow._inspect_deployments.get(ep, ep) for ep in deployment.needs ) else: deployment.needs = set( reverse_inspect_map.get(ep, ep) for ep in deployment.needs ) hanging_deployments = _hanging_deployments(op_flow) if hanging_deployments: op_flow.logger.warning( f'{hanging_deployments} are hanging in this flow with no deployment receiving from them, ' f'you may want to double check if it is intentional or some mistake' ) op_flow._build_level = FlowBuildLevel.GRAPH if len(removed_deployments) > 0: # very dirty op_flow._deployment_nodes[GATEWAY_NAME].args.graph_description = json.dumps( op_flow._get_graph_representation() ) op_flow._deployment_nodes[ GATEWAY_NAME ].args.deployments_addresses = json.dumps( op_flow._get_deployments_addresses() ) op_flow._deployment_nodes[GATEWAY_NAME].update_pod_args() return op_flow
def __call__(self, *args, **kwargs): """Builds the Flow :param args: args for build :param kwargs: kwargs for build :return: the built Flow """ return self.build(*args, **kwargs) def __enter__(self): with CatchAllCleanupContextManager(self): return self.start() def __exit__(self, exc_type, exc_val, exc_tb): if hasattr(self, '_stop_event'): self._stop_event.set() super().__exit__(exc_type, exc_val, exc_tb) # unset all envs to avoid any side-effect if self.args.env: for k in self.args.env.keys(): os.environ.pop(k, None) # do not know why but removing these 2 lines make 2 tests fail if GATEWAY_NAME in self._deployment_nodes: self._deployment_nodes.pop(GATEWAY_NAME) self._build_level = FlowBuildLevel.EMPTY self.logger.debug('Flow is closed!') self.logger.close()
[docs] def start(self): """Start to run all Deployments in this Flow. Remember to close the Flow with :meth:`close`. Note that this method has a timeout of ``timeout_ready`` set in CLI, which is inherited all the way from :class:`jina.orchestrate.pods.Pod` .. # noqa: DAR401 :return: this instance """ if self._build_level.value < FlowBuildLevel.GRAPH.value: self.build(copy_flow=False) # set env only before the Deployment get started if self.args.env: for k, v in self.args.env.items(): os.environ[k] = str(v) for k, v in self: if not v.external: self.enter_context(v) self._wait_until_all_ready() self._build_level = FlowBuildLevel.RUNNING return self
def _wait_until_all_ready(self): results = {} threads = [] def _wait_ready(_deployment_name, _deployment): try: if not _deployment.external: results[_deployment_name] = 'pending' _deployment.wait_start_success() results[_deployment_name] = 'done' except Exception as ex: results[_deployment_name] = repr(ex) def _polling_status(status): while True: num_all = len(results) num_done = 0 pendings = [] for _k, _v in results.items(): sys.stdout.flush() if _v == 'pending': pendings.append(_k) else: num_done += 1 pending_str = colored_rich(' '.join(pendings)[:50], 'yellow') status.update( f'{num_done}/{num_all} waiting {pending_str} to be ready...' ) if not pendings: break time.sleep(0.1) # kick off all deployments wait-ready threads for k, v in self: t = threading.Thread( target=_wait_ready, args=( k, v, ), daemon=True, ) threads.append(t) t.start() console = get_rich_console() with console.status('Working...') as status: # kick off spinner thread t_m = threading.Thread(target=_polling_status, args=[status], daemon=True) t_m.start() # kick off ip getter thread addr_table = self._init_table() t_ip = threading.Thread( target=self._get_address_table, args=(addr_table,), daemon=True ) t_ip.start() for t in threads: t.join() if t_ip is not None: t_ip.join() t_m.join() error_deployments = [k for k, v in results.items() if v != 'done'] if error_deployments: self.logger.error( f'Flow is aborted due to {error_deployments} can not be started.' ) self.close() raise RuntimeFailToStart success_msg = '[green]🎉 Flow is ready to use![/green]' console.print(success_msg) if addr_table: print( addr_table ) # can't use logger here see : https://github.com/Textualize/rich/discussions/2024 self.logger.debug( f'{self.num_deployments} Deployments (i.e. {self.num_pods} Pods) are running in this Flow' ) @property def num_deployments(self) -> int: """Get the number of Deployments in this Flow .. # noqa: DAR201""" return len(self._deployment_nodes) @property def num_pods(self) -> int: """Get the number of pods (shards count) in this Flow .. # noqa: DAR201""" return sum(v.num_pods for v in self._deployment_nodes.values()) def __eq__(self, other: 'Flow') -> bool: """ Compare the topology of a Flow with another Flow. Identification is defined by whether two flows share the same set of edges. :param other: the second Flow object :return: result of equality check """ if self._build_level.value < FlowBuildLevel.GRAPH.value: op_flow = copy.deepcopy(self) a = op_flow.build() else: a = self if other._build_level.value < FlowBuildLevel.GRAPH.value: op_flow_b = copy.deepcopy(other) b = op_flow_b.build() else: b = other return a._deployment_nodes == b._deployment_nodes @property def client(self) -> 'BaseClient': """Return a :class:`BaseClient` object attach to this Flow. .. # noqa: DAR201""" kwargs = dict( host=self.host, port=self.port, protocol=self.protocol, ) kwargs.update(self._common_kwargs) return Client(**kwargs) @property def _mermaid_str(self): mermaid_graph = [ ''' %%{init:{ "theme": "base", "themeVariables": { "primaryColor": "#fff", "primaryBorderColor": "#fff", "mainBkg": "#32C8CD", "clusterBkg": "#EEEDE78C", "secondaryBorderColor": "none", "tertiaryBorderColor": "none", "lineColor": "#a6d8da" } }}%% '''.replace( '\n', '' ), 'flowchart LR;', ] deployment_nodes = [] # plot subgraphs for node, v in self._deployment_nodes.items(): deployment_nodes.append(v.name) mermaid_graph.extend(v._mermaid_str) for node, v in self._deployment_nodes.items(): for need in sorted(v.needs): need_print = need if need == 'gateway': need_print = 'gatewaystart[gateway]' node_print = node if node == 'gateway': node_print = 'gatewayend[gateway]' _s_role = self._deployment_nodes[need].role _e_role = self._deployment_nodes[node].role if self._deployment_nodes[need].external: _s_role = 'EXTERNAL' if self._deployment_nodes[node].external: _e_role = 'EXTERNAL' line_st = '-->' if ( _s_role == DeploymentRoleType.INSPECT or _e_role == DeploymentRoleType.INSPECT ): line_st = '-.->' mermaid_graph.append( f'{need_print}:::{str(_s_role)} {line_st} {node_print}:::{str(_e_role)};' ) mermaid_graph.append( f'classDef {str(DeploymentRoleType.INSPECT)} stroke:#F29C9F' ) mermaid_graph.append( f'classDef {str(DeploymentRoleType.JOIN_INSPECT)} stroke:#F29C9F' ) mermaid_graph.append( f'classDef {str(DeploymentRoleType.GATEWAY)} fill:none,color:#000,stroke:none' ) mermaid_graph.append( f'classDef {str(DeploymentRoleType.INSPECT_AUX_PASS)} stroke-dasharray: 2 2' ) mermaid_graph.append(f'classDef HEADTAIL fill:#32C8CD1D') mermaid_graph.append(f'\nclassDef EXTERNAL fill:#fff,stroke:#32C8CD') return '\n'.join(mermaid_graph)
[docs] def plot( self, output: Optional[str] = None, vertical_layout: bool = False, inline_display: bool = False, build: bool = True, copy_flow: bool = True, ) -> 'Flow': """ Visualize the Flow up to the current point If a file name is provided it will create a jpg image with that name, otherwise it will display the URL for mermaid. If called within IPython notebook, it will be rendered inline, otherwise an image will be created. Example, .. highlight:: python .. code-block:: python flow = Flow().add(name='deployment_a').plot('flow.svg') :param output: a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image :param vertical_layout: top-down or left-right layout :param inline_display: show image directly inside the Jupyter Notebook :param build: build the Flow first before plotting, gateway connection can be better showed :param copy_flow: when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification :return: the Flow """ # deepcopy causes the below error while reusing a Flow in Jupyter # 'Pickling an AuthenticationString object is disallowed for security reasons' op_flow = copy.deepcopy(self) if copy_flow else self if build: op_flow.build(False) mermaid_str = op_flow._mermaid_str if vertical_layout: mermaid_str = mermaid_str.replace('flowchart LR', 'flowchart TD') image_type = 'svg' if output and not output.endswith('svg'): image_type = 'img' url = op_flow._mermaid_to_url(mermaid_str, image_type) showed = False if inline_display: try: from IPython.display import Image, display display(Image(url=url)) showed = True except: # no need to panic users pass if output: download_mermaid_url(url, output) elif not showed: op_flow.logger.info(f'flow visualization: {url}') return self
def _ipython_display_(self): """Displays the object in IPython as a side effect""" self.plot( inline_display=True, build=(self._build_level != FlowBuildLevel.GRAPH) ) def _mermaid_to_url(self, mermaid_str: str, img_type: str) -> str: """ Render the current Flow as URL points to a SVG. It needs internet connection :param mermaid_str: the mermaid representation :param img_type: image type (svg/jpg) :return: the url points to a SVG """ encoded_str = base64.b64encode(bytes(mermaid_str, 'utf-8')).decode('utf-8') return f'https://mermaid.ink/{img_type}/{encoded_str}' @property def port(self) -> int: """Return the exposed port of the gateway .. # noqa: DAR201 """ if GATEWAY_NAME in self._deployment_nodes: return self._deployment_nodes[GATEWAY_NAME].port else: return self._common_kwargs.get('port', None) @port.setter def port(self, value: int): """Set the new exposed port of the Flow (affects Gateway and Client) :param value: the new port to expose """ self._common_kwargs['port'] = value # Flow is build to graph already if self._build_level >= FlowBuildLevel.GRAPH: self[GATEWAY_NAME].args.port = self._common_kwargs['port'] # Flow is running already, then close the existing gateway if self._build_level >= FlowBuildLevel.RUNNING: self[GATEWAY_NAME].close() self.enter_context(self[GATEWAY_NAME]) self[GATEWAY_NAME].wait_start_success() @property def host(self) -> str: """Return the local address of the gateway .. # noqa: DAR201 """ if GATEWAY_NAME in self._deployment_nodes: return self._deployment_nodes[GATEWAY_NAME].host else: return self._common_kwargs.get('host', __default_host__) @host.setter def host(self, value: str): """Set the new host of the Flow (affects Gateway and Client) :param value: the new port to expose """ self._common_kwargs['host'] = value # Flow is build to graph already if self._build_level >= FlowBuildLevel.GRAPH: self[GATEWAY_NAME].args.host = self._common_kwargs['host'] # Flow is running already, then close the existing gateway if self._build_level >= FlowBuildLevel.RUNNING: self[GATEWAY_NAME].close() self.enter_context(self[GATEWAY_NAME]) self[GATEWAY_NAME].wait_start_success() @property def address_private(self) -> str: """Return the private IP address of the gateway for connecting from other machine in the same network .. # noqa: DAR201""" return get_internal_ip() @property def address_public(self) -> str: """Return the public IP address of the gateway for connecting from other machine in the public network .. # noqa: DAR201""" return get_public_ip() def __iter__(self): return self._deployment_nodes.items().__iter__() def _init_table(self): table = Table(title=None, box=None, highlight=True) table.add_column('', justify='right') table.add_column('', justify='right') table.add_column('', justify='right') table.add_column('', justify='right') return table def _get_address_table(self, address_table): address_table.add_row('🔗', 'Protocol: ', f'{self.protocol}') address_table.add_row( '🏠', 'Local access: ', f'[underline]{self.host}:{self.port}[/underline]', ) address_table.add_row( '🔒', 'Private network: ', f'[underline]{self.address_private}:{self.port}[/underline]', ) if self.address_public: address_table.add_row( '🌐', 'Public address: ', f'[underline]{self.address_public}:{self.port}[/underline]', ) if self.protocol == GatewayProtocolType.HTTP: address_table.add_row( '💬', 'Swagger UI: ', f'[underline]http://localhost:{self.port}/docs[/underline]', ) address_table.add_row( '📚', 'Redoc: ', f'[underline]http://localhost:{self.port}/redoc[/underline]', ) if self.args.expose_graphql_endpoint: address_table.add_row( '💬', 'GraphQL UI: ', f'[underline][cyan]http://localhost:{self.port}/graphql[/underline][/cyan]', ) return address_table
[docs] def block( self, stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None ): """Block the Flow until `stop_event` is set or user hits KeyboardInterrupt :param stop_event: a threading event or a multiprocessing event that onces set will resume the control Flow to main thread. """ try: if stop_event is None: self._stop_event = ( threading.Event() ) #: this allows `.close` to close the Flow from another thread/proc self._stop_event.wait() else: stop_event.wait() except KeyboardInterrupt: pass
@property def protocol(self) -> GatewayProtocolType: """Return the protocol of this Flow :return: the protocol of this Flow """ v = self._common_kwargs.get('protocol', GatewayProtocolType.GRPC) if isinstance(v, str): v = GatewayProtocolType.from_string(v) return v @protocol.setter def protocol(self, value: Union[str, GatewayProtocolType]): """Set the protocol of this Flow, can only be set before the Flow has been started :param value: the protocol to set """ # Flow is running already, protocol cant be changed anymore if self._build_level >= FlowBuildLevel.RUNNING: raise RuntimeError('Protocol can not be changed after the Flow has started') if isinstance(value, str): self._common_kwargs['protocol'] = GatewayProtocolType.from_string(value) elif isinstance(value, GatewayProtocolType): self._common_kwargs['protocol'] = value else: raise TypeError(f'{value} must be either `str` or `GatewayProtocolType`') # Flow is build to graph already if self._build_level >= FlowBuildLevel.GRAPH: self[GATEWAY_NAME].args.protocol = self._common_kwargs['protocol'] def __getitem__(self, item): if isinstance(item, str): return self._deployment_nodes[item] elif isinstance(item, int): return list(self._deployment_nodes.values())[item] else: raise TypeError(f'{typename(item)} is not supported') @property def workspace(self) -> str: """Return the workspace path of the flow. .. # noqa: DAR201""" if self.args.workspace is not None: return os.path.abspath(self.args.workspace) else: return None @workspace.setter def workspace(self, value: str): """set workspace dir for flow & all deployments :param value: workspace to be set """ self.args.workspace = value for k, p in self: p.args.workspace = value p.update_pod_args() @property def workspace_id(self) -> Dict[str, str]: """Get all Deployments' ``workspace_id`` values in a dict .. # noqa: DAR201""" return { k: p.args.workspace_id for k, p in self if hasattr(p.args, 'workspace_id') } @workspace_id.setter def workspace_id(self, value: str): """Set all Deployments' ``workspace_id`` to ``value`` :param value: a hexadecimal UUID string """ uuid.UUID(value) for k, p in self: if hasattr(p.args, 'workspace_id'): p.args.workspace_id = value args = getattr(p, 'pod_args', getattr(p, 'shards_args', None)) if args is None: raise ValueError( f'could not find "pod_args" or "shards_args" on {p}' ) values = None if isinstance(args, dict): values = args.values() elif isinstance(args, list): values = args for v in values: if v and isinstance(v, argparse.Namespace): v.workspace_id = value if v and isinstance(v, List): for i in v: i.workspace_id = value @property def env(self) -> Optional[Dict]: """Get all envs to be set in the Flow :return: envs as dict """ return self.args.env @env.setter def env(self, value: Dict[str, str]): """set env vars for flow & all deployments. This can be used by jinad to set envs for Flow and all child objects :param value: value to be set """ self.args.env = value for k, v in self: v.args.env = value @overload def expose_endpoint(self, exec_endpoint: str, path: Optional[str] = None): """Expose an Executor's endpoint (defined by `@requests(on=...)`) to HTTP endpoint for easier access. After expose, you can send data request directly to `http://hostname:port/endpoint`. :param exec_endpoint: the endpoint string, by convention starts with `/` :param path: the HTTP endpoint string, when not given, it is `exec_endpoint` """ ... @overload def expose_endpoint( self, exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None, ): """Expose an Executor's endpoint (defined by `@requests(on=...)`) to HTTP endpoint for easier access. After expose, you can send data request directly to `http://hostname:port/endpoint`. Use this method to specify your HTTP endpoint with richer semantic and schema. :param exec_endpoint: the endpoint string, by convention starts with `/` # noqa: DAR101 """ ...
[docs] def expose_endpoint(self, exec_endpoint: str, **kwargs): """Expose an Executor's endpoint (defined by `@requests(on=...)`) to HTTP endpoint for easier access. After expose, you can send data request directly to `http://hostname:port/endpoint`. :param exec_endpoint: the endpoint string, by convention starts with `/` # noqa: DAR101 # noqa: DAR102 """ self._endpoints_mapping[exec_endpoint] = kwargs
# for backward support join = needs
[docs] def rolling_update( self, deployment_name: str, uses_with: Optional[Dict] = None, ): """ Reload all replicas of a deployment sequentially :param deployment_name: deployment to update :param uses_with: a Dictionary of arguments to restart the executor with """ from jina.helper import run_async run_async( self._deployment_nodes[deployment_name].rolling_update, uses_with=uses_with, any_event_loop=True, )
[docs] def to_k8s_yaml( self, output_base_path: str, k8s_namespace: Optional[str] = None, k8s_connection_pool: bool = True, ): """ Converts the Flow into a set of yaml deployments to deploy in Kubernetes :param output_base_path: The base path where to dump all the yaml files :param k8s_namespace: The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used. :param k8s_connection_pool: Boolean indicating wether the kubernetes connection pool should be used inside the Executor Runtimes. """ import yaml if self._build_level.value < FlowBuildLevel.GRAPH.value: self.build(copy_flow=False) from jina.orchestrate.deployments.config.k8s import K8sDeploymentConfig k8s_namespace = k8s_namespace or self.args.name or 'default' for node, v in self._deployment_nodes.items(): if v.external: continue deployment_base = os.path.join(output_base_path, node) k8s_deployment = K8sDeploymentConfig( args=v.args, k8s_namespace=k8s_namespace, k8s_connection_pool=k8s_connection_pool, k8s_deployments_addresses=self._get_k8s_deployments_addresses( k8s_namespace, k8s_connection_pool ) if node == 'gateway' else None, ) configs = k8s_deployment.to_k8s_yaml() for name, k8s_objects in configs: filename = os.path.join(deployment_base, f'{name}.yml') os.makedirs(deployment_base, exist_ok=True) with open(filename, 'w+') as fp: for i, k8s_object in enumerate(k8s_objects): yaml.dump(k8s_object, fp) if i < len(k8s_objects) - 1: fp.write('---\n') self.logger.info( f'K8s yaml files have been created under {output_base_path}. You can use it by running `kubectl apply -R -f {output_base_path}`' )
[docs] def to_docker_compose_yaml( self, output_path: Optional[str] = None, network_name: Optional[str] = None ): """ Converts the Flow into a yaml file to run with `docker-compose up` :param output_path: The output path for the yaml file :param network_name: The name of the network that will be used by the deployment name """ import yaml if self._build_level.value < FlowBuildLevel.GRAPH.value: self.build(copy_flow=False) output_path = output_path or 'docker-compose.yml' network_name = network_name or 'jina-network' from jina.orchestrate.deployments.config.docker_compose import ( DockerComposeConfig, ) docker_compose_dict = { 'version': '3.3', 'networks': {network_name: {'driver': 'bridge'}}, } services = {} for node, v in self._deployment_nodes.items(): docker_compose_deployment = DockerComposeConfig( args=v.args, deployments_addresses=self._get_docker_compose_deployments_addresses(), ) service_configs = docker_compose_deployment.to_docker_compose_config() for service_name, service in service_configs: service['networks'] = [network_name] services[service_name] = service docker_compose_dict['services'] = services with open(output_path, 'w+') as fp: yaml.dump(docker_compose_dict, fp, sort_keys=False) command = ( 'docker-compose up' if output_path is None else f'docker-compose -f {output_path} up' ) self.logger.info( f'Docker compose file has been created under {output_path}. You can use it by running `{command}`' )
[docs] def scale( self, deployment_name: str, replicas: int, ): """ Scale the amount of replicas of a given Executor. :param deployment_name: deployment to update :param replicas: The number of replicas to scale to """ # TODO when replicas-host is ready, needs to be passed here from jina.helper import run_async run_async( self._deployment_nodes[deployment_name].scale, replicas=replicas, any_event_loop=True, )
@property def client_args(self) -> argparse.Namespace: """Get Client settings. # noqa: DAR201 """ if 'port' in self._common_kwargs: kwargs = copy.deepcopy(self._common_kwargs) kwargs['port'] = self._common_kwargs['port'] return ArgNamespace.kwargs2namespace(kwargs, set_client_cli_parser()) @property def gateway_args(self) -> argparse.Namespace: """Get Gateway settings. # noqa: DAR201 """ return ArgNamespace.kwargs2namespace(self._common_kwargs, set_gateway_parser())
[docs] def update_network_interface(self, **kwargs): """Update the network interface of this Flow (affects Gateway & Client) :param kwargs: new network settings """ self._common_kwargs.update(kwargs)