jina.orchestrate.flow.asyncio module#
- class jina.orchestrate.flow.asyncio.AsyncFlow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, tls: Optional[bool] = False, **kwargs)[source]#
- class jina.orchestrate.flow.asyncio.AsyncFlow(*, compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_disable_reduce: Optional[str] = '[]', description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, floating: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[str] = None, prefetch: Optional[int] = 1000, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
- class jina.orchestrate.flow.asyncio.AsyncFlow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)
Bases: AsyncPostMixin, AsyncProfileMixin, AsyncHealthCheckMixin, Flow

Asynchronous version of jina.Flow. They share the same interface, except that in AsyncFlow the train(), index() and search() methods are coroutines (i.e. declared with the async/await syntax); simply calling them will not schedule them to be executed. To actually run a coroutine, the user needs to put it into an event loop, e.g. via asyncio.run() or asyncio.create_task().

AsyncFlow can be very useful in integration settings, where Jina/Jina Flow is NOT the main logic but rather serves as part of another program. In this case, users often do not want to let Jina control the asyncio event loop. By contrast, Flow controls and wraps the event loop internally, making the Flow look synchronous from the outside.

In particular, AsyncFlow makes Jina usage in Jupyter Notebook more natural and reliable. For example, the following code will use the event loop already spawned by Jupyter/IPython to run a Jina Flow (instead of creating a new one):

from jina import AsyncFlow
from jina.types.document.generators import from_ndarray
import numpy as np

with AsyncFlow().add() as f:
    await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Notice that the above code will NOT work in a standard Python REPL, as only Jupyter/IPython implements "autoawait".

See also
Asynchronous in REPL: Autoawait
https://ipython.readthedocs.io/en/stable/interactive/autoawait.html
Another example is when using Jina as an integration. Say you have another IO-bound job heavylifting(); you can use this feature to schedule Jina index() and heavylifting() concurrently.

One can think of Flow as a Jina-managed event loop, whereas AsyncFlow is a self-managed event loop.

Create a Flow. Flow is how Jina streamlines and scales Executors.
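The concurrent-scheduling pattern described above can be sketched with plain asyncio. Here heavylifting() and index() are stand-in coroutines, not Jina APIs; with Jina installed, `await f.index(...)` on an AsyncFlow would take the place of index():

```python
import asyncio


async def heavylifting():
    # stand-in for some other IO-bound job in your program
    await asyncio.sleep(0.01)
    return "lifted"


async def index():
    # stand-in for `await f.index(...)` on an AsyncFlow
    await asyncio.sleep(0.01)
    return "indexed"


async def main():
    # schedule both coroutines concurrently on the same event loop;
    # gather preserves the order of its arguments in the result list
    return await asyncio.gather(heavylifting(), index())


results = asyncio.run(main())
print(results)  # ['lifted', 'indexed']
```

Because AsyncFlow hands the event loop back to you, this kind of interleaving is your choice; a plain Flow would block until its call returns.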
EXAMPLE USAGE
Python API
from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor

with f:
    f.block()  # serve Flow
To and from YAML configuration
from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
f.save_config('flow.yml')  # save YAML config file
f = Flow.load_config('flow.yml')  # load Flow from YAML config

with f:
    f.block()  # serve Flow
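For reference, the flow.yml written by save_config() for the Flow above would look roughly like this (a sketch; the exact fields Jina emits may differ by version, and the executor name is auto-generated):

```yaml
jtype: Flow
version: '1'
executors:
  - name: executor0
    uses: jinahub+docker://SimpleIndexer
```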
- Parameters:
asyncio – If set, then the input and output of this Client work in an asynchronous manner.
host – The host address of the runtime, by default it is 0.0.0.0.
port – The port of the Gateway, which the client should connect to.
protocol – Communication protocol between server and client.
proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy.
tls – If set, connect to gateway using tls encryption
compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
cors – If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
deployments_addresses – JSON dictionary with the input addresses of each Deployment
deployments_disable_reduce – JSON list disabling the built-in merging mechanism for each Deployment listed
description – The description of this HTTP server. It will be used in automatic docs such as Swagger UI.
env – The map of environment variables that are available inside runtime
expose_endpoints – A JSON string that represents a map from executor endpoints (@requests(on=…)) to HTTP endpoints.
expose_graphql_endpoint – If set, /graphql endpoint is added to HTTP interface.
floating – If set, the current Pod/Deployment cannot be further chained, and the next .add() will chain after the last Pod/Deployment, not this current one.
graph_conditions – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.
graph_description – Routing graph for the gateway
grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}
host – The host address of the runtime, by default it is 0.0.0.0.
host_in – The host address for binding to, by default it is 0.0.0.0
log_config – The YAML config of the logger used in this object.
monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics
name –
The name of this object.
This will be used in the following places:
- how you refer to this object in Python/YAML/CLI
- visualization
- log message header
- …
When not given, the default naming strategy will apply.
native – If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime.
no_crud_endpoints –
If set, the /index, /search, /update, /delete endpoints are removed from the HTTP interface.
Any executor that has @requests(on=…) bound to those values will receive data requests.
no_debug_endpoints – If set, /status /post endpoints are removed from HTTP interface.
output_array_type –
The type to which array tensor and embedding will be serialized.
Supports the same types as docarray.to_protobuf(.., ndarray_type=…), which can be found at https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf. Defaults to retaining whatever type is returned by the Executor.
polling – The polling strategy of the Deployment and its endpoints (when shards>1). Can be defined for all endpoints of a Deployment or per endpoint.
Define per Deployment:
- ANY: only one (whoever is idle) Pod polls the message
- ALL: all Pods poll the message (like a broadcast)
Define per Endpoint: JSON dict, {endpoint: PollingType}, e.g. {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'}
port – The port for input data to bind to, default is a random port between [49152, 65535]
port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
prefetch –
Number of requests fetched from the client before feeding into the first Executor.
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
protocol – Communication protocol between server and client.
proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy.
py_modules –
The customized python modules that need to be imported before loading the Executor.
Note that the recommended way is to only import a single module: a simple python file, if your Executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a python package. For more details, please see the Executor cookbook.
quiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
replicas – The number of replicas in the deployment
retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
runtime_cls – The runtime class to run inside the Pod
shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies
ssl_certfile – the path to the certificate file
ssl_keyfile – the path to the key file
timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever
timeout_ready – The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever
timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default
title – The title of this HTTP server. It will be used in automatic docs such as Swagger UI.
uses –
The config of the executor; it could be one of the following:
- the string literal of an Executor class name
- an Executor YAML file (.yml, .yaml, .jaml)
- a Jina Hub Executor (must start with jinahub:// or jinahub+docker://)
- a docker image (must start with docker://)
- the string literal of a YAML config (must start with ! or `jtype: `)
- the string literal of a JSON config
When used under Python, the following values are additionally supported:
- a Python dict that represents the config
- a text file stream that has a .read() interface
uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses
uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses
uses_with – Dictionary of keyword arguments that will override the with configuration in uses
uvicorn_kwargs –
Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server
More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
env – The map of environment variables that are available inside runtime
inspect –
The strategy for the inspect deployments in the Flow.
If REMOVE is given, then all inspect deployments are removed when building the Flow.
log_config – The YAML config of the logger used in this object.
name –
The name of this object.
This will be used in the following places:
- how you refer to this object in Python/YAML/CLI
- visualization
- log message header
- …
When not given, the default naming strategy will apply.
quiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
uses – The YAML path representing a Flow. It can be either a local file path or a URL.
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
- add(**kwargs)#
Add a Deployment to the current Flow object and return the new, modified Flow object. The attributes of the Deployment can later be changed with set() or deleted with remove().
- Parameters:
compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
connection_list – JSON dictionary with a list of connections to configure
disable_auto_volume – Do not automatically mount a volume for dockerized Executors.
disable_reduce – Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head
docker_kwargs –
Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker ‘ container.
More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/
entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image ENTRYPOINT takes effect.
env – The map of environment variables that are available inside runtime
external – The Deployment will be considered an external Deployment that has been started independently from the Flow. This Deployment will not be context-managed by the Flow.
floating – If set, the current Pod/Deployment can not be further chained, and the next .add() will chain after the last Pod/Deployment not this current one.
force_update – If set, always pull the latest Hub Executor bundle even if it exists locally
gpus –
This argument allows a dockerized Jina Executor to discover local GPU devices.
Note:
- To access all gpus, use --gpus all.
- To access multiple gpus, e.g. make use of 2 gpus, use --gpus 2.
- To access specific gpus based on device id, use --gpus device=[YOUR-GPU-DEVICE-ID]
- To access specific gpus based on multiple device ids, use --gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]
- To specify more parameters, use --gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display
grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}
host – The host address of the runtime, by default it is 0.0.0.0.
host_in – The host address for binding to, by default it is 0.0.0.0
install_requirements – If set, install requirements.txt in the Hub Executor bundle to local
log_config – The YAML config of the logger used in this object.
monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics
name –
The name of this object.
This will be used in the following places:
- how you refer to this object in Python/YAML/CLI
- visualization
- log message header
- …
When not given, the default naming strategy will apply.
native – If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime.
output_array_type –
The type to which array tensor and embedding will be serialized.
Supports the same types as docarray.to_protobuf(.., ndarray_type=…), which can be found at https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf. Defaults to retaining whatever type is returned by the Executor.
polling – The polling strategy of the Deployment and its endpoints (when shards>1). Can be defined for all endpoints of a Deployment or per endpoint.
Define per Deployment:
- ANY: only one (whoever is idle) Pod polls the message
- ALL: all Pods poll the message (like a broadcast)
Define per Endpoint: JSON dict, {endpoint: PollingType}, e.g. {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'}
port – The port for input data to bind to, default is a random port between [49152, 65535]
port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
py_modules –
The customized python modules that need to be imported before loading the Executor.
Note that the recommended way is to only import a single module: a simple python file, if your Executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a python package. For more details, please see the Executor cookbook.
quiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
quiet_remote_logs – Do not display the streaming of remote logs on local console
replicas – The number of replicas in the deployment
retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
runtime_cls – The runtime class to run inside the Pod
shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies
timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever
timeout_ready – The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever
timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default
tls – If set, connect to deployment using tls encryption
upload_files –
The files on the host to be uploaded to the remote workspace. This can be useful when your Deployment has more file dependencies beyond a single YAML file, e.g. Python files, data files.
Note:
- Currently only a flattened structure is supported, which means if you upload [./foo/a.py, ./foo/b.py, ./bar/c.yml], they will be put under the _same_ workspace on the remote, losing all hierarchies.
- By default, the --uses YAML file is always uploaded.
- Uploaded files are by default isolated across runs. To ensure files are submitted to the same workspace across different runs, use --workspace-id to specify the workspace.
uses –
The config of the executor; it could be one of the following:
- the string literal of an Executor class name
- an Executor YAML file (.yml, .yaml, .jaml)
- a Jina Hub Executor (must start with jinahub:// or jinahub+docker://)
- a docker image (must start with docker://)
- the string literal of a YAML config (must start with ! or `jtype: `)
- the string literal of a JSON config
When used under Python, the following values are additionally supported:
- a Python dict that represents the config
- a text file stream that has a .read() interface
uses_after – The executor attached after the Pods described by --uses, typically used for receiving from all shards; accepted type follows --uses. This argument only applies to sharded Deployments (shards > 1).
uses_after_address – The address of the uses-after runtime
uses_before – The executor attached before the Pods described by --uses, typically before sending to all shards; accepted type follows --uses. This argument only applies to sharded Deployments (shards > 1).
uses_before_address – The address of the uses-before runtime
uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses
uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses
uses_with – Dictionary of keyword arguments that will override the with configuration in uses
volumes –
The path on the host to be mounted inside the container.
Note:
- If separated by :, the first part will be considered the local host path and the second part the path in the container.
- If no split is provided, the basename of that directory will be mounted into the container's root path, e.g. --volumes="/user/test/my-workspace" will be mounted into /my-workspace inside the container.
- All volumes are mounted in read-write mode.
when – The condition that the documents need to fulfill before reaching the Executor. The condition can be defined in the form of a DocArray query condition: https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
needs – the name of the Deployment(s) that this Deployment receives data from. One can also use “gateway” to indicate the connection with the gateway.
deployment_role – the role of the Deployment, used for visualization and route planning
copy_flow – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification
kwargs – other keyword-value arguments that the Deployment CLI supports
- Returns:
a (new) Flow object with modification
- Return type:
Flow
- property address_private: str#
Return the private IP address of the gateway for connecting from other machines in the same network
- Return type:
str
- property address_public: str#
Return the public IP address of the gateway for connecting from other machines in the public network
- Return type:
str
- block(stop_event=None)#
Block the Flow until stop_event is set or user hits KeyboardInterrupt
- Parameters:
stop_event (Union[threading.Event, multiprocessing.Event, None]) – a threading event or a multiprocessing event that, once set, will return control of the Flow to the main thread.
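The stop_event contract can be sketched with a stdlib-only stand-in; serve_until() below is hypothetical, but `f.block(stop_event=ev)` unblocks the same way when the event is set:

```python
import threading

def serve_until(stop_event):
    # stand-in for `f.block(stop_event=...)`: blocks the calling thread
    # until the event is set, then returns control to the caller
    stop_event.wait()
    return "unblocked"

ev = threading.Event()
# another thread sets the event shortly after, simulating a shutdown signal
threading.Timer(0.05, ev.set).start()
result = serve_until(ev)
```

Without a stop_event, block() instead waits for a KeyboardInterrupt.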
- build(copy_flow=False, disable_build_sandbox=False)#
Build the current Flow and make it ready to use
Note
No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.
- Parameters:
copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return; otherwise, do in-line modification
disable_build_sandbox (bool) – when set to true, the sandbox building part will be skipped (used by plot)
- Return type:
- Returns:
the current Flow (by default)
Note
copy_flow=True is recommended if you are building the same Flow multiple times in a row, e.g.

f = Flow()

with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()
- callback(**kwds)#
Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
- property client: BaseClient#
Return a BaseClient object attached to this Flow.
- Return type:
BaseClient
- property client_args: Namespace#
Get Client settings.
- Return type:
Namespace
- close()#
Immediately unwind the context stack.
- delete(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]] #
- async dry_run(**kwargs)#
Sends a dry run to the Flow to validate if the Flow is ready to receive requests
- Parameters:
kwargs – potential kwargs received passed from the public interface
- Return type:
bool
- Returns:
boolean indicating the health/readiness of the Flow
- enter_context(cm)#
Enters the supplied context manager.
If successful, also pushes its __exit__ method as a callback and returns the result of the __enter__ method.
- property env: Optional[Dict]#
Get all envs to be set in the Flow
- Return type:
Optional[Dict]
- Returns:
envs as dict
- expose_endpoint(exec_endpoint, **kwargs)#
Expose an Executor’s endpoint (defined by @requests(on=…)) to HTTP endpoint for easier access.
After exposing, you can send data requests directly to http://hostname:port/endpoint.
- Parameters:
exec_endpoint (str) – the endpoint string, by convention starting with /
- property gateway_args: Namespace#
Get Gateway settings.
- Return type:
Namespace
- gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)#
Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned.
Note
If --no-inspect is not given, then gather_inspect() is automatically called before build(). So in general you don't need to manually call gather_inspect().
- Parameters:
name (str) – the name of the gather Deployment
include_last_deployment (bool) – whether to include the last modified Deployment in the Flow
args – args for .add()
kwargs – kwargs for .add()
- Return type:
- Returns:
the modified Flow or the copy of it
- property host: str#
Return the local address of the gateway
- Return type:
str
- index(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]] #
- inspect(name='inspect', *args, **kwargs)#
Add an inspection on the last changed Deployment in the Flow
Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.
Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
        |
        -- PUB-SUB -- InspectDeployment (Hanging)
In this way, InspectDeployment looks like a simple _pass from the outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.
This function is very handy for introducing an Evaluator into the Flow.
- Parameters:
name (str) – name of the Deployment
args – args for .add()
kwargs – kwargs for .add()
- Return type:
- Returns:
the new instance of the Flow
- static is_valid_jaml(obj)#
Verifies the yaml syntax of a given object by first serializing it and attempting to deserialize it, catching parser errors.
- Parameters:
obj (Dict) – yaml object
- Return type:
bool
- Returns:
whether the syntax is valid or not
- classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, **kwargs)#
A high-level interface for loading configuration, with support for loading extra py_modules and substituting env & context variables. Any class that implements the JAMLCompatible mixin can enjoy this feature, e.g. BaseFlow, BaseExecutor, BaseDriver and all their subclasses.
- Supported substitutions in YAML:
Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).
Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.
Internal references via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}
Substitutions are carried out in order and over multiple passes, resolving variables with best effort.
!BaseEncoder
metas:
  name: ${{VAR_A}}  # env or context variables
  workspace: my-${{this.name}}  # internal reference
# load Executor from yaml file
BaseExecutor.load_config('a.yml')

# load Executor from yaml file and substitute environment variables
os.environ['VAR_A'] = 'hello-world'
b = BaseExecutor.load_config('a.yml')
assert b.name == 'hello-world'

# load Executor from yaml file and substitute variables from a dict
b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
assert b.name == 'hello-world'

# disable substitute
b = BaseExecutor.load_config('a.yml', substitute=False)
- Parameters:
source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.
allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any level
substitute (bool) – substitute environment, internal-reference and context variables.
context (Optional[Dict[str, Any]]) – context replacement variables in a dict; the value of the dict is the replacement.
uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config's with field
uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config's metas field
uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config's requests field
extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor yaml files
py_modules (Optional[str]) – optional py_module from which the object needs to be loaded
runtime_args (Optional[Dict[str, Any]]) – optional dictionary of runtime_args to be passed directly without being parsed into a yaml config
kwargs – kwargs for parse_config_source
- Return type:
- Returns:
a JAMLCompatible object
- property monitoring: bool#
Return whether monitoring is enabled
- Return type:
bool
- needs(needs, name='joiner', *args, **kwargs)#
Add a blocker to the Flow; wait until all Pods defined in needs have completed.
- Parameters:
needs (Union[Tuple[str], List[str]]) – list of service names to wait for
name (str) – the name of this joiner, by default joiner
args – additional positional arguments forwarded to the add function
kwargs – additional key value arguments forwarded to the add function
- Return type:
- Returns:
the modified Flow
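The same join can be expressed in a Flow YAML config via the needs field of each executor; this is a sketch with hypothetical executor names, where joiner waits for both branches just like needs(['enc_text', 'enc_image']) would:

```yaml
jtype: Flow
executors:
  - name: enc_text
    needs: gateway
  - name: enc_image
    needs: gateway
  - name: joiner           # blocks until both branches have completed
    needs: [enc_text, enc_image]
```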
- needs_all(name='joiner', *args, **kwargs)#
Collect all floating Deployments so far and add a blocker to the Flow; wait until all hanging Pods have completed.
- Parameters:
name (str) – the name of this joiner (default is joiner)
args – additional positional arguments which are forwarded to the add and needs function
kwargs – additional key value arguments which are forwarded to the add and needs function
- Return type:
- Returns:
the modified Flow
- property num_deployments: int#
Get the number of Deployments in this Flow
- Return type:
int
- property num_pods: int#
Get the number of pods (shards count) in this Flow
- Return type:
int
- plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)#
Visualize the Flow up to the current point. If a file name is provided, it will create a jpg image with that name; otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline, otherwise an image will be created.
Example,
flow = Flow().add(name='deployment_a').plot('flow.svg')
- Parameters:
output (Optional[str]) – a filename specifying the name of the image to be created; the suffix svg/jpg determines the file type of the output image
vertical_layout (bool) – top-down or left-right layout
inline_display (bool) – show image directly inside the Jupyter Notebook
build (bool) – build the Flow first before plotting, so the gateway connection can be better shown
copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return; otherwise, do in-line modification
- Return type:
- Returns:
the Flow
- pop_all()#
Preserve the context stack by transferring it to a new instance.
- property port: int#
Return the exposed port of the gateway
- Return type:
int
- property port_monitoring: Optional[int]#
Return the port on which the monitoring server is exposed, if enabled
- Return type:
Optional[int]
- async post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, **kwargs)#
Async Post a general data request to the Flow.
- Parameters:
inputs (Optional[InputType]) – input data which can be an Iterable, a function which returns an Iterable, or a single Document.
on (str) – the endpoint which is invoked. All the functions in the executors decorated by @requests(on=…) with the same endpoint are invoked.
on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.
on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.
on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.
parameters (Optional[Dict]) – the kwargs that will be sent to the executor
target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.
request_size (int) – the number of Documents per request. <=0 means all inputs in one request.
show_progress (bool) – if set, the client will show a progress bar on receiving every request.
continue_on_error (bool) – if set, a Request that causes a callback error will only be logged, without blocking further requests.
return_responses (bool) – if set to True, the result will come as Response and not as a DocumentArray
kwargs – additional parameters, can be used to pass metadata or authentication information in the server call
- Yield:
Response object
Warning
target_executor uses re.match for checking if the pattern is matched. target_executor=='foo' will match both deployments with the name foo and foo_what_ever_suffix.
- Return type:
  AsyncGenerator[None, Union[DocumentArray, Response]]
- async profiling(show_table=True)#
Profile a single query’s roundtrip, including network and computation latency. The result is summarized in a Dict.
- Parameters:
  show_table (bool) – whether to show the table or not
- Return type:
  Dict[str, float]
- Returns:
  the latency report in a dict
- property protocol: GatewayProtocolType#
Return the protocol of this Flow
- Return type:
  GatewayProtocolType
- Returns:
  the protocol of this Flow
- push(exit)#
Registers a callback with the standard __exit__ method signature.
Can suppress exceptions the same way __exit__ method can. Also accepts any object with an __exit__ method (registering a call to the method instead of the object itself).
- save_config(filename=None)#
Save the object’s config into a YAML file.
- Parameters:
  filename (Optional[str]) – file path of the YAML file; if not given, config_abspath is used
- search(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]] #
- start()#
Start to run all Deployments in this Flow.
Remember to close the Flow with close().
Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.orchestrate.pods.Pod.
- Returns:
this instance
- to_docker_compose_yaml(output_path=None, network_name=None, include_gateway=True)#
Converts the Flow into a YAML file to run with docker-compose up.
- Parameters:
  - output_path (Optional[str]) – the output path for the YAML file
  - network_name (Optional[str]) – the name of the network that will be used by the deployment
  - include_gateway (bool) – defines whether the gateway deployment should be included; defaults to True
- to_k8s_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#
Converts the Flow into a set of yaml deployments to deploy in Kubernetes.
If you don’t want to rebuild image on Jina Hub, you can set JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
  - output_base_path (str) – the base path where to dump all the YAML files
  - k8s_namespace (Optional[str]) – the name of the k8s namespace to set for the configurations; if None, the name of the Flow will be used
  - include_gateway (bool) – defines whether the gateway deployment should be included; defaults to True
- to_kubernetes_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#
Converts the Flow into a set of yaml deployments to deploy in Kubernetes.
If you don’t want to rebuild image on Jina Hub, you can set JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
  - output_base_path (str) – the base path where to dump all the YAML files
  - k8s_namespace (Optional[str]) – the name of the k8s namespace to set for the configurations; if None, the name of the Flow will be used
  - include_gateway (bool) – defines whether the gateway deployment should be included; defaults to True
- update(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]] #
- property workspace: str#
Return the workspace path of the flow.
- Return type:
str
- property workspace_id: Dict[str, str]#
Get all Deployments’ workspace_id values in a dict.
- Return type:
  Dict[str, str]