jina.flow.asyncio

class jina.flow.asyncio.AsyncFlow(args=None, env=None, **kwargs)

Bases: jina.flow.mixin.async_crud.AsyncCRUDFlowMixin, jina.flow.mixin.async_control.AsyncControlFlowMixin, jina.flow.base.BaseFlow

AsyncFlow is the asynchronous version of Flow. The two share the same interface, except that in AsyncFlow the train(), index(), and search() methods are coroutines (i.e. declared with the async/await syntax). Simply calling one of them returns a coroutine object and does not schedule it for execution. To actually run a coroutine, the user needs to hand it to an event loop, e.g. via asyncio.run() or asyncio.create_task().
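The difference between calling a coroutine and running it can be seen with the standard library alone. The following is a minimal sketch, not Jina code: the index() function here is a hypothetical stand-in for a coroutine method such as AsyncFlow.index(), and the executed list exists only to record when the body actually runs.

```python
import asyncio

executed = []  # records which calls actually ran


async def index(docs):
    # Hypothetical stand-in for a coroutine method such as AsyncFlow.index()
    executed.append(docs)
    return len(docs)


coro = index(["a", "b"])     # builds a coroutine object; the body has not run yet
ran_before = list(executed)  # snapshot taken before scheduling: still empty
count = asyncio.run(coro)    # the event loop now runs the body to completion
```

After asyncio.run() returns, executed holds the one recorded call and count holds the coroutine's return value, while ran_before shows that nothing had run at the moment of the plain call.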

AsyncFlow is particularly useful in integration settings, where Jina (or a Jina Flow) is NOT the main logic but serves as one part of a larger program. In this case, users often do not want Jina to control the asyncio event loop. By contrast, Flow controls and wraps the event loop internally, which makes Flow look synchronous from the outside.

In particular, AsyncFlow makes using Jina in Jupyter Notebook more natural and reliable. For example, the following code uses the event loop that is already running in Jupyter/IPython to run the Jina Flow, instead of creating a new one.

from jina import AsyncFlow
import numpy as np

with AsyncFlow().add() as f:
    await f.index_ndarray(np.random.random([5, 4]), on_done=print)

Note that the above code will NOT work in the standard Python REPL, because only Jupyter/IPython implements “autoawait”.

Another example is using Jina as one component of an integration. Say you have another IO-bound job, heavylifting(); you can use this feature to run Jina's index() and heavylifting() concurrently. For example:

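import asyncio

import numpy as np
from jina import AsyncFlow


async def run_async_flow_5s():
    # WaitDriver pauses for 5s, so the total roundtrip takes ~5s
    with AsyncFlow().add(uses='- !WaitDriver {}') as f:
        # `validate` is assumed to be a callback defined elsewhere by the user
        await f.index_ndarray(np.random.random([5, 4]), on_done=validate)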


async def heavylifting():
    # total roundtrip takes ~5s
    print('heavylifting other io-bound jobs, e.g. download, upload, file io')
    await asyncio.sleep(5)
    print('heavylifting done after 5s')


async def concurrent_main():
    # takes about 5s in total, not 10s; dispatch overhead adds a little, so usually under 7s
    await asyncio.gather(run_async_flow_5s(), heavylifting())
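
The timing claim above (two ~5s jobs finishing in roughly 5s rather than 10s) can be checked without Jina. The sketch below is an illustration under stated assumptions: fake_flow_index() and fake_heavylifting() are hypothetical stand-ins that only sleep, and the delay is shortened to 0.3s so the example runs quickly.

```python
import asyncio
import time


async def fake_flow_index(delay):
    # Hypothetical stand-in for an awaited AsyncFlow call that takes `delay` seconds
    await asyncio.sleep(delay)
    return "flow done"


async def fake_heavylifting(delay):
    # Hypothetical stand-in for another IO-bound job of the same duration
    await asyncio.sleep(delay)
    return "heavylifting done"


async def timed_main(delay=0.3):
    start = time.perf_counter()
    # gather() runs both coroutines concurrently and returns results in argument order
    results = await asyncio.gather(fake_flow_index(delay), fake_heavylifting(delay))
    return results, time.perf_counter() - start


# asyncio.run() is the entry point when no event loop is running yet (e.g. in a script)
results, elapsed = asyncio.run(timed_main())
```

Because both coroutines spend their time waiting, elapsed should be close to a single delay rather than the sum of the two.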

One can think of Flow as a Jina-managed event loop, whereas AsyncFlow leaves the event loop for the user to manage.
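
The distinction can be sketched with two toy classes. This is only an illustration of the idea and makes no claim about how Jina implements Flow internally: ToyAsyncFlow and ToyFlow are hypothetical names, and the synchronous wrapper simply uses asyncio.run() to hide the loop.

```python
import asyncio


class ToyAsyncFlow:
    # Hypothetical: exposes a coroutine, so the caller owns the event loop
    async def index(self, docs):
        await asyncio.sleep(0)  # yield to the loop once, as real IO would
        return len(docs)


class ToyFlow:
    # Hypothetical: hides the event loop behind a blocking call
    def __init__(self):
        self._inner = ToyAsyncFlow()

    def index(self, docs):
        coro = self._inner.index(docs)
        try:
            return asyncio.run(coro)  # creates and manages its own loop
        except RuntimeError:
            coro.close()  # avoid a "never awaited" warning before re-raising
            raise


# From plain synchronous code, the wrapper looks like an ordinary function call.
sync_count = ToyFlow().index([1, 2, 3])


async def caller_with_own_loop():
    # Inside a running loop, the coroutine API composes naturally...
    async_count = await ToyAsyncFlow().index([1, 2, 3])
    # ...while the loop-owning wrapper cannot start a second loop here.
    try:
        ToyFlow().index([1, 2, 3])
        nested_ok = True
    except RuntimeError:
        nested_ok = False
    return async_count, nested_ok


async_count, nested_ok = asyncio.run(caller_with_own_loop())
```

In this sketch the blocking wrapper fails when a loop is already running, because asyncio.run() refuses to start from inside one. That is the kind of situation (for example a notebook with a live loop) where a coroutine-based interface is the more convenient fit.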