jina.orchestrate.flow.asyncio module
- class jina.orchestrate.flow.asyncio.AsyncFlow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', https: Optional[bool] = False, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, return_responses: Optional[bool] = False, **kwargs)
- class jina.orchestrate.flow.asyncio.AsyncFlow(*, connection_list: Optional[str] = None, cors: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', description: Optional[str] = None, disable_reduce: Optional[bool] = False, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_after_address: Optional[str] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
- class jina.orchestrate.flow.asyncio.AsyncFlow(*, env: Optional[dict] = None, expose_graphql_endpoint: Optional[bool] = False, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, polling: Optional[str] = 'ANY', quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, timeout_ctrl: Optional[int] = 60, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)
Bases: AsyncPostMixin, Flow
AsyncFlow is the asynchronous version of the Flow. They share the same interface, except that in AsyncFlow the train(), index(), and search() methods are coroutines (i.e. declared with the async/await syntax); simply calling them will not schedule them for execution. To actually run a coroutine, users need to put it on an event loop, e.g. via asyncio.run() or asyncio.create_task().

AsyncFlow can be very useful in integration settings, where Jina/Jina Flow is NOT the main logic but rather serves as a part of another program. In this case, users often do not want Jina to control the asyncio event loop. By contrast, Flow controls and wraps the event loop internally, making the Flow look synchronous from the outside.

In particular, AsyncFlow makes Jina usage in Jupyter Notebook more natural and reliable. For example, the following code uses the event loop that Jupyter/IPython has already started to run a Jina Flow (instead of creating a new one).

    from jina import AsyncFlow
    from jina.types.document.generators import from_ndarray
    import numpy as np

    with AsyncFlow().add() as f:
        await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Note that the above code will NOT work in the standard Python REPL, as only Jupyter/IPython implements “autoawait”.
See also
Asynchronous in REPL: Autoawait
https://ipython.readthedocs.io/en/stable/interactive/autoawait.html
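Outside Jupyter/IPython, where autoawait is unavailable, a coroutine has to be handed to an event loop explicitly. The following is a minimal sketch of that rule using only the standard library. `fake_index` is a hypothetical stand-in for an AsyncFlow request method and is not part of Jina; it only illustrates that calling a coroutine function creates a coroutine object without running it, and that `asyncio.run()` or `asyncio.create_task()` is what actually executes it.

```python
import asyncio
import inspect


# Hypothetical stand-in for an AsyncFlow request method; NOT part of Jina.
async def fake_index(docs):
    await asyncio.sleep(0)  # yield to the event loop once, as real I/O would
    return len(docs)


# Calling the coroutine function only creates a coroutine object; nothing runs yet.
pending = fake_index([1, 2, 3])
is_coroutine = inspect.iscoroutine(pending)

# asyncio.run() starts an event loop, runs the coroutine to completion, then closes the loop.
result = asyncio.run(pending)


async def main():
    # Inside a running loop, create_task() schedules the coroutine on that loop.
    task = asyncio.create_task(fake_index([1, 2]))
    return await task


task_result = asyncio.run(main())
```

In a real script, the body of `main()` would presumably hold the `with AsyncFlow().add() as f:` block from the Jupyter example, with `asyncio.run(main())` replacing the top-level `await`.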
Another example is when using Jina as an integration. Say you have another I/O-bound job, heavylifting(); you can use this feature to schedule Jina index() and heavylifting() concurrently.

One can think of Flow as a Jina-managed event loop, whereas AsyncFlow is a self-managed event loop.
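The concurrent scheduling described above can be sketched with `asyncio.gather()`. This is an illustration under stated assumptions, not Jina code: both `fake_index` and `heavylifting` are hypothetical stand-ins that simulate I/O-bound work with `asyncio.sleep()`. In an actual integration, `fake_index(...)` would be replaced by an AsyncFlow call made inside its `with` block.

```python
import asyncio
import time


# Hypothetical stand-in for an AsyncFlow index() call; NOT part of Jina.
async def fake_index(docs):
    await asyncio.sleep(0.2)  # simulated network / I/O wait
    return f"indexed {len(docs)} docs"


# Hypothetical I/O-bound job owned by the host application.
async def heavylifting():
    await asyncio.sleep(0.2)  # simulated I/O wait
    return "heavylifting done"


async def main():
    start = time.perf_counter()
    # gather() runs both coroutines concurrently on the same, self-managed event loop
    # and returns their results in the order the coroutines were passed in.
    results = await asyncio.gather(fake_index([1, 2, 3]), heavylifting())
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
```

Because the two waits overlap, the total elapsed time stays close to the longer of the two jobs rather than their sum, which is the benefit of letting the host program own the event loop.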