jina.flow.asyncio module

class jina.flow.asyncio.AsyncFlow(*, asyncio: Optional[bool] = 'False', host: Optional[str] = "'0.0.0.0'", https: Optional[bool] = 'False', port: Optional[int] = 'None', protocol: Optional[str] = "'GRPC'", proxy: Optional[bool] = 'False', **kwargs)[source]
class jina.flow.asyncio.AsyncFlow(*, compress: Optional[str] = "'NONE'", compress_min_bytes: Optional[int] = '1024', compress_min_ratio: Optional[float] = '1.1', cors: Optional[bool] = 'False', ctrl_with_ipc: Optional[bool] = 'True', daemon: Optional[bool] = 'False', default_swagger_ui: Optional[bool] = 'False', description: Optional[str] = 'None', env: Optional[dict] = 'None', expose_endpoints: Optional[str] = 'None', expose_public: Optional[bool] = 'False', host: Optional[str] = "'0.0.0.0'", host_in: Optional[str] = "'0.0.0.0'", host_out: Optional[str] = "'0.0.0.0'", hosts_in_connect: Optional[List[str]] = 'None', log_config: Optional[str] = 'None', memory_hwm: Optional[int] = '- 1', name: Optional[str] = "'gateway'", native: Optional[bool] = 'False', no_crud_endpoints: Optional[bool] = 'False', no_debug_endpoints: Optional[bool] = 'False', on_error_strategy: Optional[str] = "'IGNORE'", port_ctrl: Optional[int] = 'None', port_expose: Optional[int] = 'None', port_in: Optional[int] = 'None', port_out: Optional[int] = 'None', prefetch: Optional[int] = '0', protocol: Optional[str] = "'GRPC'", proxy: Optional[bool] = 'False', py_modules: Optional[List[str]] = 'None', quiet: Optional[bool] = 'False', quiet_error: Optional[bool] = 'False', replicas: Optional[int] = '1', runs_in_docker: Optional[bool] = 'False', runtime_backend: Optional[str] = "'PROCESS'", runtime_cls: Optional[str] = "'GRPCRuntime'", shards: Optional[int] = '1', socket_in: Optional[str] = "'PULL_CONNECT'", socket_out: Optional[str] = "'PUSH_CONNECT'", ssh_keyfile: Optional[str] = 'None', ssh_password: Optional[str] = 'None', ssh_server: Optional[str] = 'None', static_routing_table: Optional[bool] = 'False', timeout_ctrl: Optional[int] = '5000', timeout_ready: Optional[int] = '600000', title: Optional[str] = 'None', uses: Optional[Union[str, Type[BaseExecutor], dict]] = "'BaseExecutor'", uses_metas: Optional[dict] = 'None', uses_requests: Optional[dict] = 'None', uses_with: Optional[dict] = 'None', uvicorn_kwargs: Optional[dict] = 'None', workspace: Optional[str] = 'None', zmq_identity: Optional[str] = 'None', **kwargs)
class jina.flow.asyncio.AsyncFlow(*, env: Optional[dict] = 'None', inspect: Optional[str] = "'COLLECT'", log_config: Optional[str] = 'None', name: Optional[str] = 'None', quiet: Optional[bool] = 'False', quiet_error: Optional[bool] = 'False', static_routing_table: Optional[bool] = 'False', uses: Optional[str] = 'None', workspace: Optional[str] = "'./'", **kwargs)

Bases: jina.clients.mixin.AsyncPostMixin, jina.flow.base.Flow

AsyncFlow is the asynchronous version of the Flow. They share the same interface, except in AsyncFlow train(), index(), search() methods are coroutines (i.e. declared with the async/await syntax), simply calling them will not schedule them to be executed. To actually run a coroutine, user need to put them in an eventloop, e.g. via asyncio.run(), asyncio.create_task().

AsyncFlow can be very useful in the integration settings, where Jina/Jina Flow is NOT the main logic, but rather served as a part of other program. In this case, users often do not want to let Jina control the asyncio.eventloop. On contrary, Flow is controlling and wrapping the eventloop internally, making the Flow looks synchronous from outside.

In particular, AsyncFlow makes Jina usage in Jupyter Notebook more natural and reliable. For example, the following code will use the eventloop that already spawned in Jupyter/ipython to run Jina Flow (instead of creating a new one).

from jina import AsyncFlow
from jina.types.document.generators import from_ndarray
import numpy as np

with AsyncFlow().add() as f:
    await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Notice that the above code will NOT work in standard Python REPL, as only Jupyter/ipython implements “autoawait”.

Another example is when using Jina as an integration. Say you have another IO-bounded job heavylifting(), you can use this feature to schedule Jina index() and heavylifting() concurrently.

One can think of Flow as Jina-managed eventloop, whereas AsyncFlow is self-managed eventloop.