Asynchronous Flow

AsyncFlow is an “async version” of the Flow class.

The quotation marks are deliberate: the difference lies in how explicit the async behavior is. With AsyncFlow, you write the async code yourself.

Although Flow looks synchronous from the outside, it also runs asynchronously under the hood: it manages the event loop(s) that schedule the jobs. Use AsyncFlow when you want more control over the event loop.
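The contrast between the two styles can be illustrated with plain asyncio, without Jina. This is only a sketch of the general pattern, not Jina's actual implementation; `produce`, `sync_facade`, and `async_facade` are hypothetical names invented for the illustration.

```python
import asyncio


async def produce(n):
    """Hypothetical async job source: yields n results, pausing between them."""
    for i in range(n):
        await asyncio.sleep(0.01)
        yield i * i


def sync_facade(n):
    """Flow-like style: the caller sees a blocking call and never touches the loop."""

    async def collect():
        return [item async for item in produce(n)]

    # asyncio.run creates an event loop, runs the coroutine, then closes the loop.
    return asyncio.run(collect())


async def async_facade(n):
    """AsyncFlow-like style: the caller owns the loop and iterates with `async for`."""
    async for item in produce(n):
        yield item


async def main():
    return [item async for item in async_facade(3)]


sync_results = sync_facade(3)
async_results = asyncio.run(main())
print(sync_results, async_results)  # [0, 1, 4] [0, 1, 4]
```

Both calls produce the same results. The difference is who drives the event loop: the facade itself in the first case, the caller in the second.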

Create AsyncFlow

To create an AsyncFlow, instantiate it directly:

from jina import AsyncFlow

f = AsyncFlow()

As syntactic sugar, Flow(asyncio=True) also creates an AsyncFlow object:

from jina import Flow

f = Flow(asyncio=True)

Input & output

Unlike Flow, AsyncFlow accepts input and output functions as async generators. This is useful when your data sources involve other asynchronous libraries (e.g. motor for MongoDB):

import asyncio

from jina import AsyncFlow, Document


async def async_inputs():
    for _ in range(10):
        yield Document()
        await asyncio.sleep(0.1)


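async def main():
    # `async for` is only valid inside a coroutine, so wrap the Flow in one.
    with AsyncFlow().add() as f:
        async for resp in f.post('/', async_inputs):
            print(resp)


if __name__ == '__main__':
    # In a plain script, start the event loop explicitly.
    asyncio.run(main())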
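To make the "other asynchronous libraries" case more concrete, here is a minimal sketch of an async generator fed by an asynchronous data source. It does not use motor or Jina: `FakeCursor` is a hypothetical stand-in for an async database cursor, and the dict payloads stand in for the Document objects that the Jina example above would yield.

```python
import asyncio


class FakeCursor:
    """Hypothetical stand-in for an async database cursor (not motor's API)."""

    def __init__(self, rows):
        self._rows = list(rows)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._rows:
            raise StopAsyncIteration
        await asyncio.sleep(0.01)  # simulate network latency per row
        return self._rows.pop(0)


async def async_inputs(cursor):
    """Async generator that turns each row into a payload as soon as it arrives."""
    async for row in cursor:
        yield {'text': row}


async def consume():
    cursor = FakeCursor(['a', 'b', 'c'])
    return [payload async for payload in async_inputs(cursor)]


payloads = asyncio.run(consume())
print(payloads)
```

Because each row is awaited individually, the event loop stays free to run other tasks while the data source is waiting on I/O.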

Using AsyncFlow to overlap with another heavy-lifting job

AsyncFlow is particularly useful when Jina and another heavy-lifting job are running concurrently:

import time
import asyncio

from jina import AsyncFlow, Executor, requests


class HeavyWork(Executor):

    @requests
    def foo(self, **kwargs):
        time.sleep(5)


async def run_async_flow_5s():
    with AsyncFlow().add(uses=HeavyWork) as f:
        async for resp in f.post('/'):
            print(resp)


async def heavylifting():  # total roundtrip takes ~5s
    print('heavylifting other io-bound jobs, e.g. download, upload, file io')
    await asyncio.sleep(5)
    print('heavylifting done after 5s')


async def concurrent_main():  # about 5s in total rather than 10s; dispatch overhead adds a little, usually under 7s
    await asyncio.gather(run_async_flow_5s(), heavylifting())


if __name__ == '__main__':
    asyncio.run(concurrent_main())
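The timing claim (about as long as the slower job, not the sum of both) can be checked with a scaled-down, Jina-free sketch. It assumes the blocking work runs off the event loop thread; here that is done with a thread pool via `run_in_executor`, which is a choice made for this illustration and not necessarily how Jina schedules Executors. `blocking_job` and `io_job` are hypothetical names.

```python
import asyncio
import time


def blocking_job(seconds):
    """Stand-in for blocking work that runs outside the event loop thread."""
    time.sleep(seconds)
    return 'blocking done'


async def io_job(seconds):
    """Stand-in for an I/O-bound coroutine such as a download or upload."""
    await asyncio.sleep(seconds)
    return 'io done'


async def concurrent_main(seconds):
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Run the blocking job in the default thread pool while io_job awaits.
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_job, seconds),
        io_job(seconds),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(concurrent_main(0.3))
print(results, f'{elapsed:.2f}s')
```

With two 0.3s jobs, the elapsed time should be close to 0.3s plus a small scheduling overhead, well short of the 0.6s a sequential run would take.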

AsyncFlow is also handy when using Jina inside a Jupyter Notebook, where it runs out of the box.