jina.orchestrate.flow.base module

class jina.orchestrate.flow.base.Flow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, tls: Optional[bool] = False, **kwargs)
class jina.orchestrate.flow.base.Flow(*, compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_disable_reduce: Optional[str] = '[]', description: Optional[str] = None, env: Optional[dict] = None, exit_on_exceptions: Optional[List[str]] = [], expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, floating: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[str] = None, prefetch: Optional[int] = 1000, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
class jina.orchestrate.flow.base.Flow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)

Bases: PostMixin, ProfileMixin, HealthCheckMixin, JAMLCompatible, ExitStack

Flow is how Jina streamlines, distributes, and scales Executors.

Create a Flow.

EXAMPLE USAGE

Python API

from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
with f:
    f.block()  # serve Flow

To and from YAML configuration

from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
f.save_config('flow.yml')  # save YAML config file
f = Flow.load_config('flow.yml')  # load Flow from YAML config
with f:
    f.block()  # serve Flow
Parameters:
  • asyncio – If set, then the input and output of this Client work in an asynchronous manner.

  • host – The host address of the runtime, by default it is 0.0.0.0.

  • port – The port of the Gateway, which the client should connect to.

  • protocol – Communication protocol between server and client.

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy.

  • tls – If set, connect to the gateway using TLS encryption

  • compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • cors – If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.

  • deployments_addresses – JSON dictionary with the input addresses of each Deployment

  • deployments_disable_reduce – JSON list of the Deployments for which the built-in merging mechanism is disabled

  • description – The description of this HTTP server. It will be used in automatic docs such as Swagger UI.

  • env – The map of environment variables that are available inside the runtime

  • exit_on_exceptions – List of exceptions that will cause the Executor to shut down.

  • expose_endpoints – A JSON string that represents a map from executor endpoints (@requests(on=…)) to HTTP endpoints.

  • expose_graphql_endpoint – If set, a /graphql endpoint is added to the HTTP interface.

  • floating – If set, the current Pod/Deployment cannot be chained further, and the next .add() will chain after the last Pod/Deployment rather than this one.

  • graph_conditions – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.

  • graph_description – Routing graph for the gateway

  • grpc_server_options – Dictionary of keyword arguments passed to the gRPC server as options when starting the server, for example: {'grpc.max_send_message_length': -1}

  • host – The host address of the runtime, by default it is 0.0.0.0.

  • host_in – The host address for binding to, by default it is 0.0.0.0

  • log_config – The YAML config of the logger used in this object.

  • monitoring – If set, spawn an HTTP server with a Prometheus endpoint to expose metrics

  • name

    The name of this object.

    It is used in the following places:

      - how you refer to this object in Python/YAML/CLI
      - visualization
      - log message header
      - …

    When not given, the default naming strategy applies.

  • native – If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime.

  • no_crud_endpoints

    If set, the /index, /search, /update, and /delete endpoints are removed from the HTTP interface.

    Any Executor that binds @requests(on=…) to those values will receive data requests.

  • no_debug_endpoints – If set, the /status and /post endpoints are removed from the HTTP interface.

  • output_array_type

    The array type that tensor and embedding will be serialized to.

    Supports the same types as docarray.to_protobuf(.., ndarray_type=…), which are listed here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>. Defaults to retaining whatever type is returned by the Executor.

  • polling – The polling strategy of the Deployment and its endpoints (when shards>1). It can be defined for all endpoints of a Deployment or per endpoint.

    Define per Deployment:

      - ANY: only one (whichever is idle) Pod polls the message
      - ALL: all Pods poll the message (like a broadcast)

    Define per endpoint: a JSON dict {endpoint: PollingType}, e.g. {"/custom": "ALL", "/search": "ANY", "*": "ANY"}

  • port – The port for input data to bind to, default is a random port between [49152, 65535]

  • port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]

  • prefetch

    Number of requests fetched from the client before feeding into the first Executor.

    Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)

  • protocol – Communication protocol between server and client.

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy.

  • py_modules

    The custom Python modules that need to be imported before loading the Executor

    Note that the recommended way is to import only a single module: a simple Python file, if your Executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a Python package. For more details, please see the Executor cookbook.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • replicas – The number of replicas in the deployment

  • retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

  • runtime_cls – The runtime class to run inside the Pod

  • shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies

  • ssl_certfile – the path to the certificate file

  • ssl_keyfile – the path to the key file

  • timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever

  • timeout_ready – The timeout in milliseconds that a Pod waits for the runtime to be ready, -1 for waiting forever

  • timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default

  • title – The title of this HTTP server. It will be used in automatic docs such as Swagger UI.

  • uses

    The config of the Executor. It can be one of the following:

      - the string literal of an Executor class name
      - an Executor YAML file (.yml, .yaml, .jaml)
      - a Jina Hub Executor (must start with jinahub:// or jinahub+docker://)
      - a Docker image (must start with docker://)
      - the string literal of a YAML config (must start with ! or jtype: )
      - the string literal of a JSON config

    When used from Python, the following values are also accepted:

      - a Python dict that represents the config
      - a text file stream with a .read() interface

  • uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses

  • uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses

  • uses_with – Dictionary of keyword arguments that will override the with configuration in uses

  • uvicorn_kwargs

    Dictionary of keyword arguments passed to the Uvicorn server when starting the server

    More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

  • env – The map of environment variables that are available inside runtime

  • inspect

    The strategy for the inspect Deployments in the Flow.

    If REMOVE is given, all inspect Deployments are removed when building the Flow.

  • log_config – The YAML config of the logger used in this object.

  • name

    The name of this object.

    It is used in the following places:

      - how you refer to this object in Python/YAML/CLI
      - visualization
      - log message header
      - …

    When not given, the default naming strategy applies.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • uses – The YAML path that represents a Flow. It can be either a local file path or a URL.

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
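
The per-endpoint form of polling is a JSON mapping with an optional '*' entry. As an illustration only, the sketch below shows how such a value could be resolved for a single endpoint; resolve_polling is a hypothetical helper, not part of Jina's API, and it assumes that an endpoint missing from the mapping falls back to '*' and then to the default ANY.

```python
import json
from typing import Dict, Union


def resolve_polling(polling: Union[str, Dict[str, str]], endpoint: str) -> str:
    """Return 'ANY' or 'ALL' for one endpoint (hypothetical helper, not Jina's API)."""
    if isinstance(polling, str):
        stripped = polling.strip()
        if stripped.startswith('{'):
            # Per-endpoint form: a JSON dict {endpoint: PollingType}
            polling = json.loads(stripped)
        else:
            # Per-Deployment form: a single strategy for every endpoint
            return stripped.upper()
    # Assumption: unknown endpoints fall back to '*', then to the default 'ANY'
    return polling.get(endpoint, polling.get('*', 'ANY')).upper()


per_endpoint = '{"/custom": "ALL", "/search": "ANY", "*": "ANY"}'
print(resolve_polling(per_endpoint, '/custom'))  # ALL
print(resolve_polling(per_endpoint, '/index'))   # ANY (falls back to '*')
print(resolve_polling('ALL', '/search'))         # ALL
```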

needs(needs, name='joiner', *args, **kwargs)

Add a blocker to the Flow that waits until all Pods defined in needs have completed.

Parameters:
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner, joiner by default

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type:

Flow

Returns:

the modified Flow

needs_all(name='joiner', *args, **kwargs)

Collect all floating Deployments so far and add a blocker to the Flow that waits until all hanging Pods have completed.

Parameters:
  • name (str) – the name of this joiner (default is joiner)

  • args – additional positional arguments which are forwarded to the add() and needs() functions

  • kwargs – additional key-value arguments which are forwarded to the add() and needs() functions

Return type:

Flow

Returns:

the modified Flow
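
To picture which Deployments needs_all() joins, the Flow can be modelled as a graph in which each Deployment lists the Deployments it needs; a Deployment that no other Deployment needs is still hanging. The sketch below is a simplified model under that assumption, using a plain dict in place of a real Flow; hanging_deployments is a hypothetical helper, not Jina's implementation.

```python
from typing import Dict, List


def hanging_deployments(needs: Dict[str, List[str]]) -> List[str]:
    """Return Deployments that no other Deployment consumes (simplified model)."""
    consumed = {upstream for ups in needs.values() for upstream in ups}
    # Keep insertion order so the result mirrors the order of the .add() calls
    return [name for name in needs if name not in consumed]


# gateway -> a, gateway -> b, a -> c: both b and c are left hanging
topology = {
    'a': ['gateway'],
    'b': ['gateway'],
    'c': ['a'],
}
print(hanging_deployments(topology))  # ['b', 'c']
```

In this model, a joiner added with needs=['b', 'c'] would be the only remaining hanging node.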

add(*, compression: Optional[str] = None, connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, disable_reduce: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, exit_on_exceptions: Optional[List[str]] = [], external: Optional[bool] = False, floating: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', install_requirements: Optional[bool] = False, log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = None, native: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[str] = None, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, tls: Optional[bool] = False, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, when: Optional[dict] = None, workspace: Optional[str] = None, **kwargs) → Union['Flow', 'AsyncFlow']
add(*, needs: Optional[Union[str, Tuple[str], List[str]]] = None, copy_flow: bool = True, deployment_role: DeploymentRoleType = DeploymentRoleType.DEPLOYMENT, **kwargs) → Union['Flow', 'AsyncFlow']

Add a Deployment to the current Flow object and return the new, modified Flow object. The attributes of the Deployment can later be changed with set() or deleted with remove().

Parameters:
  • compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • connection_list – JSON dictionary with a list of connections to configure

  • disable_auto_volume – Do not automatically mount a volume for dockerized Executors.

  • disable_reduce – Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head

  • docker_kwargs

    Dictionary of keyword arguments passed to the Docker SDK when starting the Docker container.

    More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/

  • entrypoint – The entrypoint command that overrides the ENTRYPOINT in the Docker image. When not set, the Docker image's ENTRYPOINT takes effect.

  • env – The map of environment variables that are available inside runtime

  • exit_on_exceptions – List of exceptions that will cause the Executor to shut down.

  • external – The Deployment will be considered an external Deployment that has been started independently from the Flow. This Deployment will not be context managed by the Flow.

  • floating – If set, the current Pod/Deployment cannot be chained further, and the next .add() will chain after the last Pod/Deployment rather than this one.

  • force_update – If set, always pull the latest Hub Executor bundle even if it exists locally

  • gpus

    This argument allows dockerized Jina Executors to discover local GPU devices.

    Note:

      - To access all GPUs, use --gpus all.
      - To access multiple GPUs, e.g. 2 GPUs, use --gpus 2.
      - To access specific GPUs by device ID, use --gpus device=[YOUR-GPU-DEVICE-ID]
      - To access specific GPUs by multiple device IDs, use --gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]
      - To specify more parameters, use --gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display

  • grpc_server_options – Dictionary of keyword arguments passed to the gRPC server as options when starting the server, for example: {'grpc.max_send_message_length': -1}

  • host – The host address of the runtime, by default it is 0.0.0.0.

  • host_in – The host address for binding to, by default it is 0.0.0.0

  • install_requirements – If set, install requirements.txt from the Hub Executor bundle locally

  • log_config – The YAML config of the logger used in this object.

  • monitoring – If set, spawn an HTTP server with a Prometheus endpoint to expose metrics

  • name

    The name of this object.

    It is used in the following places:

      - how you refer to this object in Python/YAML/CLI
      - visualization
      - log message header
      - …

    When not given, the default naming strategy applies.

  • native – If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime.

  • output_array_type

    The array type that tensor and embedding will be serialized to.

    Supports the same types as docarray.to_protobuf(.., ndarray_type=…), which are listed here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>. Defaults to retaining whatever type is returned by the Executor.

  • polling – The polling strategy of the Deployment and its endpoints (when shards>1). It can be defined for all endpoints of a Deployment or per endpoint.

    Define per Deployment:

      - ANY: only one (whichever is idle) Pod polls the message
      - ALL: all Pods poll the message (like a broadcast)

    Define per endpoint: a JSON dict {endpoint: PollingType}, e.g. {"/custom": "ALL", "/search": "ANY", "*": "ANY"}

  • port – The port for input data to bind to, default is a random port between [49152, 65535]

  • port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]

  • py_modules

    The custom Python modules that need to be imported before loading the Executor

    Note that the recommended way is to import only a single module: a simple Python file, if your Executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a Python package. For more details, please see the Executor cookbook.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • quiet_remote_logs – Do not display the streaming of remote logs on local console

  • replicas – The number of replicas in the deployment

  • retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

  • runtime_cls – The runtime class to run inside the Pod

  • shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies

  • timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever

  • timeout_ready – The timeout in milliseconds that a Pod waits for the runtime to be ready, -1 for waiting forever

  • timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default

  • tls – If set, connect to the Deployment using TLS encryption

  • upload_files

    The files on the host to be uploaded to the remote workspace. This can be useful when your Deployment has file dependencies beyond a single YAML file, e.g. Python files or data files.

    Note:

      - Currently only a flat structure is supported: if you upload [./foo/a.py, ./foo/b.py, ./bar/c.yml], they will all be put under the same workspace on the remote, losing all hierarchy.
      - By default, the --uses YAML file is always uploaded.
      - Uploaded files are isolated across runs by default. To ensure files are submitted to the same workspace across different runs, use --workspace-id to specify the workspace.

  • uses

    The config of the Executor. It can be one of the following:

      - the string literal of an Executor class name
      - an Executor YAML file (.yml, .yaml, .jaml)
      - a Jina Hub Executor (must start with jinahub:// or jinahub+docker://)
      - a Docker image (must start with docker://)
      - the string literal of a YAML config (must start with ! or jtype: )
      - the string literal of a JSON config

    When used from Python, the following values are also accepted:

      - a Python dict that represents the config
      - a text file stream with a .read() interface

  • uses_after – The Executor attached after the Pods described by --uses, typically used for receiving from all shards; accepted types follow --uses. This argument only applies to sharded Deployments (shards > 1).

  • uses_after_address – The address of the uses-after runtime

  • uses_before – The Executor attached before the Pods described by --uses, typically used before sending to all shards; accepted types follow --uses. This argument only applies to sharded Deployments (shards > 1).

  • uses_before_address – The address of the uses-before runtime

  • uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses

  • uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses

  • uses_with – Dictionary of keyword arguments that will override the with configuration in uses

  • volumes

    The path on the host to be mounted inside the container.

    Note:

      - If separated by :, the first part is treated as the local host path and the second part as the path inside the container.
      - If no separator is provided, the basename of that directory is mounted into the container's root path, e.g. --volumes="/user/test/my-workspace" is mounted at /my-workspace inside the container.
      - All volumes are mounted in read-write mode.

  • when – The condition that the documents need to fulfill before reaching the Executor. The condition can be defined in the form of a DocArray query condition <https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions>

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

  • needs – the name of the Deployment(s) that this Deployment receives data from. One can also use “gateway” to indicate the connection with the gateway.

  • deployment_role – the role of the Deployment, used for visualization and route planning

  • copy_flow – when True, always copy the current Flow, apply the modification to the copy, and return it; otherwise, modify the Flow in place

  • kwargs – other keyword-value arguments that the Deployment CLI supports

Returns:

a (new) Flow object with modification

Return type:

Union[Flow, AsyncFlow]
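
The two documented volumes forms can be sketched as a small parser. parse_volume is a hypothetical helper that mirrors only the rules listed under volumes (host:container, or a bare directory mounted at the container root under its basename); it splits on the first ':' only and does not handle Windows drive letters, so it is not Jina's actual parser.

```python
import os
from typing import Tuple


def parse_volume(spec: str) -> Tuple[str, str]:
    """Map a volumes entry to (host_path, container_path). Hypothetical helper."""
    if ':' in spec:
        # 'host:container' form; assumption: split on the first ':' only
        host, container = spec.split(':', 1)
    else:
        # Bare path: mount the directory at the container root under its basename
        host = spec
        container = '/' + os.path.basename(os.path.normpath(spec))
    return host, container


print(parse_volume('/user/test/my-workspace'))  # ('/user/test/my-workspace', '/my-workspace')
print(parse_volume('./data:/workspace/data'))   # ('./data', '/workspace/data')
```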

inspect(name='inspect', *args, **kwargs)

Add an inspection on the last changed Deployment in the Flow.

Internally, it adds two Deployments to the Flow. The overhead is minimal, and you can remove them by using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
        |
        -- PUB-SUB -- InspectDeployment (Hanging)

In this way, InspectDeployment looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.

See also

gather_inspect()

Parameters:
  • name (str) – name of the Deployment

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type:

Flow

Returns:

the new instance of the Flow

gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)

Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned.

Note

If --no-inspect is not given, gather_inspect() is called automatically before build(), so in general you don't need to call gather_inspect() manually.

Parameters:
  • name (str) – the name of the gather Deployment

  • include_last_deployment (bool) – whether to include the last modified Deployment in the Flow

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type:

Flow

Returns:

the modified Flow or the copy of it

See also

inspect()

build(copy_flow=False, disable_build_sandbox=False)

Build the current Flow and make it ready to use

Note

There is no need to call it manually since 0.0.8: build() is invoked automatically when the Flow is used as a context manager or when start() is called.

Parameters:
  • copy_flow (bool) – when True, always copy the current Flow, apply the modification to the copy, and return it; otherwise, modify the Flow in place

  • disable_build_sandbox (bool) – when True, skip building the sandbox part; used by plot()

Return type:

Flow

Returns:

the current Flow (by default)

Note

copy_flow=True is recommended if you are building the same Flow multiple times in a row, e.g.:

from jina import Flow

f = Flow()
with f:
    f.index()

# build a copy so the original Flow object is not modified again
with f.build(copy_flow=True) as fl:
    fl.search()
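
The effect of copy_flow (accepted by build() and add()) can be illustrated with a toy builder. TinyFlow is a hypothetical stand-in that only records Deployment names; it is not Jina's Flow, but it shows why copying leaves the original object reusable while in-place modification does not.

```python
import copy
from typing import List


class TinyFlow:
    """Hypothetical toy stand-in for a Flow that only records Deployment names."""

    def __init__(self) -> None:
        self.deployments: List[str] = []

    def add(self, name: str, copy_flow: bool = True) -> 'TinyFlow':
        # copy_flow=True: modify a deep copy and return it; the original is untouched
        target = copy.deepcopy(self) if copy_flow else self
        target.deployments.append(name)
        return target


base = TinyFlow()
chained = base.add('encoder').add('indexer')  # each call returns a new object
print(base.deployments)     # []
print(chained.deployments)  # ['encoder', 'indexer']

base.add('inline', copy_flow=False)  # in place: base itself changes
print(base.deployments)     # ['inline']
```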

start()

Start to run all Deployments in this Flow.

Remember to close the Flow with close().

Note that this method is subject to the timeout_ready timeout set in the CLI, which is inherited all the way from jina.orchestrate.pods.Pod.

Returns:

this instance

property num_deployments: int

Get the number of Deployments in this Flow

Return type:

int

property num_pods: int

Get the number of pods (shards count) in this Flow

Return type:

int

property client: BaseClient

Return a BaseClient object attached to this Flow.

Return type:

BaseClient

plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)

Visualize the Flow up to the current point. If a filename is provided, an image with that name is created (its type is determined by the suffix); otherwise, the URL of the Mermaid diagram is displayed. If called within an IPython notebook, the plot is rendered inline; otherwise, an image is created.

Example:

from jina import Flow

flow = Flow().add(name='deployment_a').plot('flow.svg')
Parameters:
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – if True, use a top-down layout; otherwise, use a left-right layout

  • inline_display (bool) – show the image directly inside the Jupyter Notebook

  • build (bool) – build the Flow before plotting, so that gateway connections are shown more accurately

  • copy_flow (bool) – when True, always copy the current Flow, apply the modification to the copy, and return it; otherwise, modify the Flow in place

Return type:

Flow

Returns:

the Flow
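
Per the output parameter, the filename suffix selects the image type. The helper below is a hypothetical sketch of that rule, assuming only the two suffixes named above (svg and jpg) are accepted; Jina's own handling may differ.

```python
from pathlib import Path


def plot_format(output: str) -> str:
    """Infer the image type from the output filename (hypothetical helper)."""
    suffix = Path(output).suffix.lower().lstrip('.')
    if suffix not in ('svg', 'jpg'):
        # Assumption: only the two suffixes named in the docs are supported
        raise ValueError(f'unsupported image suffix: {suffix!r}')
    return suffix


print(plot_format('flow.svg'))  # svg
print(plot_format('FLOW.JPG'))  # jpg
```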

property port: int

Return the exposed port of the gateway

Return type:

int

property host: str

Return the local address of the gateway

Return type:

str

property monitoring: bool

Return whether monitoring is enabled

Return type:

bool

property port_monitoring: Optional[int]

Return the port on which the gateway's Prometheus monitoring server is exposed, if any

Return type:

Optional[int]

property address_private: str

Return the private IP address of the gateway, for connecting from other machines in the same network

Return type:

str

property address_public: str

Return the public IP address of the gateway, for connecting from other machines in the public network

Return type:

str

block(stop_event=None)

Block the Flow until stop_event is set or the user triggers a KeyboardInterrupt (e.g. by pressing Ctrl+C).

Parameters:

stop_event (Union[threading.Event, multiprocessing.Event, None]) – a threading event or a multiprocessing event that, once set, returns control to the main thread.
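
The waiting behaviour of block() follows a common standard-library pattern: wait on an event, and return early on KeyboardInterrupt. The sketch below reproduces that pattern only; wait_until_stopped is a hypothetical name, not Jina's code, and a timer thread sets the event so the example terminates on its own.

```python
import threading
from typing import Optional


def wait_until_stopped(stop_event: Optional[threading.Event] = None) -> str:
    """Block until stop_event is set or Ctrl+C is pressed (illustrative only)."""
    if stop_event is None:
        # Nobody else holds this event, so only Ctrl+C can end the wait
        stop_event = threading.Event()
    try:
        # Wait in short slices so a KeyboardInterrupt is handled promptly
        while not stop_event.wait(timeout=0.1):
            pass
        return 'stopped by event'
    except KeyboardInterrupt:
        return 'stopped by KeyboardInterrupt'


stop = threading.Event()
threading.Timer(0.3, stop.set).start()  # simulate another thread requesting shutdown
print(wait_until_stopped(stop))  # stopped by event
```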

property protocol: GatewayProtocolType

Return the protocol of this Flow

Return type:

GatewayProtocolType

Returns:

the protocol of this Flow

property workspace: str

Return the workspace path of the flow.

Return type:

str

callback(**kwds)

Registers an arbitrary callback and arguments.

Cannot suppress exceptions.

close()

Immediately unwind the context stack.

delete(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, **kwargs) → Optional[Union[DocumentArray, List[Response]]]

dry_run(**kwargs)

enter_context(cm)

Enters the supplied context manager.

If successful, also pushes its __exit__ method as a callback and returns the result of the __enter__ method.
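
Because Flow lists ExitStack among its bases, callback(), enter_context(), pop_all() and close() follow the standard contextlib.ExitStack semantics: registered callbacks and context exits run in last-in, first-out order when the stack unwinds. The example below uses a plain ExitStack rather than a Flow to show that ordering.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def resource(name):
    # Record setup and teardown so the unwind order is visible
    events.append(f'enter {name}')
    yield name
    events.append(f'exit {name}')


stack = ExitStack()
stack.enter_context(resource('db'))        # its exit is pushed first
stack.callback(events.append, 'callback')  # registered second
stack.close()                              # unwinds last-in, first-out
print(events)  # ['enter db', 'callback', 'exit db']
```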

index(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, **kwargs) → Optional[Union[DocumentArray, List[Response]]]

is_flow_ready(**kwargs)

Check if the Flow is ready to receive requests

Parameters:

kwargs – potential kwargs passed from the public interface

Return type:

bool

Returns:

boolean indicating the health/readiness of the Flow

static is_valid_jaml(obj)

Verify the YAML syntax of a given object by first serializing it, then attempting to deserialize it and catching parser errors.

Parameters:

obj (Dict) – YAML object

Return type:

bool

Returns:

whether the syntax is valid or not

classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, **kwargs)

A high-level interface for loading configuration, with support for loading extra py_modules and substituting environment and context variables. Any class that implements the JAMLCompatible mixin can use this feature, e.g. BaseFlow, BaseExecutor, BaseDriver and all their subclasses.

Support substitutions in YAML:
  • Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).

  • Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.

  • Internal reference via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}

Substitutions are carried out in order and over multiple passes, resolving variables on a best-effort basis.

# a.yml
!BaseEncoder
metas:
    name: ${{VAR_A}}  # env or context variables
    workspace: my-${{this.name}}  # internal reference

import os

from jina.serve.executors import BaseExecutor

# load Executor from yaml file
b = BaseExecutor.load_config('a.yml')

# load Executor from yaml file and substitute environment variables
os.environ['VAR_A'] = 'hello-world'
b = BaseExecutor.load_config('a.yml')
assert b.name == 'hello-world'

# load Executor from yaml file and substitute variables from a dict
b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
assert b.name == 'hello-world'

# disable substitution
b = BaseExecutor.load_config('a.yml', substitute=False)

Parameters:
  • source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.

  • allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any levels

  • substitute (bool) – substitute environment, internal reference and context variables.

  • context (Optional[Dict[str, Any]]) – context replacement variables in a dict, the value of the dict is the replacement.

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor yaml files

  • py_modules (Optional[str]) – Optional py_module from which the object needs to be loaded

  • runtime_args (Optional[Dict[str, Any]]) – Optional dictionary of runtime_args parameters to be passed directly, without being parsed into a YAML config

  • kwargs – kwargs for parse_config_source

Return type:

JAMLCompatible

Returns:

JAMLCompatible object

pop_all()#

Preserve the context stack by transferring it to a new instance.
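The pop_all and push docstrings are identical to those of Python's contextlib.ExitStack, so this sketch assumes Flow inherits them from it and demonstrates pop_all on a plain ExitStack. open_resources is a hypothetical helper: it registers cleanup callbacks while "opening" resources and hands them to the caller only once setup has finished, so an exception during setup would still clean up what was already registered.

```python
from contextlib import ExitStack
from typing import Iterable, List

closed: List[str] = []  # records which (pretend) resources were cleaned up


def open_resources(names: Iterable[str]) -> ExitStack:
    """Hypothetical helper: register one cleanup per name, keep them only if setup succeeds."""
    with ExitStack() as stack:
        for name in names:
            # A real resource would be opened here; we only register its cleanup.
            stack.callback(closed.append, name)
        # Setup finished: move the callbacks to a new ExitStack so they do not
        # run when this `with` block exits.
        return stack.pop_all()


resources = open_resources(["a", "b"])
print(closed)  # [] - nothing has been cleaned up yet
resources.close()
print(closed)  # ['b', 'a'] - callbacks run in LIFO order
```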

post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, max_attempts=1, initial_backoff=0.5, max_backoff=0.1, backoff_multiplier=1.5, **kwargs)#

Post a general data request to the Flow.

Parameters:
  • inputs (Optional[InputType]) – input data which can be an Iterable, a function which returns an Iterable, or a single Document.

  • on (str) – the endpoint which is invoked. All the functions in the executors decorated by @requests(on=…) with the same endpoint are invoked.

  • on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.

  • on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.

  • on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.

  • parameters (Optional[Dict]) – the kwargs that will be sent to the executor

  • target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.

  • request_size (int) – the number of Documents per request. <=0 means all inputs in one request.

  • show_progress (bool) – if set, the client shows a progress bar that updates on every received request.

  • continue_on_error (bool) – if set, a Request that causes an error is only logged and does not block further requests.

  • return_responses (bool) – if set to True, the result is returned as a list of Response objects instead of a DocumentArray

  • max_attempts (int) – Number of sending attempts, including the original request.

  • initial_backoff (float) – The first retry will happen with a delay of random(0, initial_backoff)

  • max_backoff (float) – The maximum accepted backoff after the exponential incremental delay

  • backoff_multiplier (float) – The n-th attempt will occur at random(0, min(initial_backoff*backoff_multiplier**(n-1), max_backoff))

  • kwargs – additional parameters
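The retry and batching parameters can be illustrated with a few hypothetical helpers (backoff_cap, backoff_delays and batch are illustrative names, not Jina functions). The sketch reads the backoff formula so that n = 1 is the first retry, which matches the initial_backoff description, and treats max_attempts as including the original request; both are interpretations of the parameter descriptions, not a copy of the client's code.

```python
import random
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def backoff_cap(retry: int, initial_backoff: float = 0.5, max_backoff: float = 0.1,
                backoff_multiplier: float = 1.5) -> float:
    """Upper bound of the random delay before the given retry (retry=1 is the first retry)."""
    return min(initial_backoff * backoff_multiplier ** (retry - 1), max_backoff)


def backoff_delays(max_attempts: int, **kwargs: float) -> List[float]:
    """Sample one delay per retry; max_attempts counts the original request as well."""
    return [random.uniform(0, backoff_cap(n, **kwargs)) for n in range(1, max_attempts)]


def batch(inputs: Iterable[T], request_size: int = 100) -> List[List[T]]:
    """Split inputs into requests; request_size <= 0 sends everything in one request."""
    items = list(inputs)
    if request_size <= 0:
        return [items]
    return [items[i:i + request_size] for i in range(0, len(items), request_size)]


print(backoff_cap(3, initial_backoff=0.1, max_backoff=1.0))  # 0.1 * 1.5**2, i.e. about 0.225
print(batch(range(5), request_size=2))  # [[0, 1], [2, 3], [4]]
```

With the defaults in the signature (initial_backoff=0.5, max_backoff=0.1), the cap min(0.5 * 1.5**(n-1), 0.1) evaluates to 0.1 for every retry, so the exponential growth only takes effect when max_backoff is raised above initial_backoff.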

Return type:

Union[DocumentArray, List[Response], None]

Returns:

None, a DocumentArray containing all response Documents, or a list of Response objects if return_responses is True

Warning

target_executor uses re.match to check whether the pattern matches, so target_executor='foo' matches both Deployments named foo and foo_what_ever_suffix.
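Assuming each Deployment name is checked with re.match as the warning states, the prefix behaviour, and one way to avoid it by anchoring the pattern with $, look like this. The names list is made up for illustration.

```python
import re

names = ["foo", "foo_what_ever_suffix", "bar_foo"]

# re.match anchors only at the start of the string, so 'foo' acts as a prefix match.
prefix = [n for n in names if re.match("foo", n)]
# Anchoring the end with '$' restricts the match to the exact name.
exact = [n for n in names if re.match("foo$", n)]

print(prefix)  # ['foo', 'foo_what_ever_suffix']
print(exact)   # ['foo']
```

Note that bar_foo is not matched in either case, because re.match never looks past the start of the string.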

profiling(show_table=True)#

Profiles a single query’s round trip, including network and computation latency. The results are summarized in a dict.

Parameters:

show_table (bool) – whether to show the table or not.

Return type:

Dict[str, float]

Returns:

the latency report in a dict.

push(exit)#

Registers a callback with the standard __exit__ method signature.

Can suppress exceptions the same way __exit__ method can. Also accepts any object with an __exit__ method (registering a call to the method instead of the object itself).
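The push docstring matches contextlib.ExitStack.push, so this sketch assumes Flow inherits it from ExitStack and shows both behaviours on a plain ExitStack: a bare callback with the __exit__ signature that suppresses ValueError, and an object whose __exit__ method is registered in its place. suppress_value_error and Resource are hypothetical names.

```python
from contextlib import ExitStack


def suppress_value_error(exc_type, exc, tb) -> bool:
    """An __exit__-style callback: returning True suppresses the exception."""
    return exc_type is not None and issubclass(exc_type, ValueError)


class Resource:
    """Hypothetical resource that only defines __exit__ (no __enter__ needed for push)."""

    def __init__(self) -> None:
        self.closed = False

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.closed = True
        return False  # do not suppress; let outer callbacks decide


res = Resource()
with ExitStack() as stack:
    stack.push(suppress_value_error)  # registered first, so it runs last
    stack.push(res)  # registers res.__exit__, not the object itself
    raise ValueError("suppressed by suppress_value_error")

print(res.closed)  # True, and execution continued past the with block
```

Callbacks run in LIFO order, so res.__exit__ sees the exception first and declines to suppress it; suppress_value_error then returns True and the ValueError does not propagate.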

save_config(filename=None)#

Save the object’s config into a YAML file.

Parameters:

filename (Optional[str]) – file path of the YAML file; if not given, config_abspath is used

search(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, **kwargs) → Optional[Union[DocumentArray, List[Response]]]#
update(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, **kwargs) → Optional[Union[DocumentArray, List[Response]]]#
property workspace_id: Dict[str, str]#

Get all Deployments’ workspace_id values in a dict

Return type:

Dict[str, str]

property env: Optional[Dict]#

Get all envs to be set in the Flow

Return type:

Optional[Dict]

Returns:

envs as dict

expose_endpoint(exec_endpoint: str, path: Optional[str] = None)[source]#
expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)

Expose an Executor’s endpoint (defined by @requests(on=…)) as an HTTP endpoint for easier access.

After exposing it, you can send data requests directly to http://hostname:port/endpoint.

Parameters:

exec_endpoint (str) – the endpoint string, by convention starts with /

to_kubernetes_yaml(output_base_path, k8s_namespace=None, include_gateway=True)[source]#

Converts the Flow into a set of yaml deployments to deploy in Kubernetes.

If you don’t want images to be rebuilt on Jina Hub, set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • include_gateway (bool) – Defines whether the gateway deployment should be included, defaults to True

to_k8s_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#

Converts the Flow into a set of yaml deployments to deploy in Kubernetes.

If you don’t want images to be rebuilt on Jina Hub, set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • include_gateway (bool) – Defines whether the gateway deployment should be included, defaults to True

to_docker_compose_yaml(output_path=None, network_name=None, include_gateway=True)[source]#

Converts the Flow into a YAML file to run with docker-compose up.

Parameters:
  • output_path (Optional[str]) – The output path for the yaml file

  • network_name (Optional[str]) – The name of the network that will be used by the deployment

  • include_gateway (bool) – Defines whether the gateway deployment should be included, defaults to True

property client_args: Namespace#

Get Client settings.

Return type:

Namespace

property gateway_args: Namespace#

Get Gateway settings.

Return type:

Namespace