jina.flow.asyncio

class jina.flow.asyncio.AsyncFlow(args=None, env=None, **kwargs)

Bases: jina.flow.base.BaseFlow

AsyncFlow is the asynchronous version of the Flow. They share the same interface, except that in AsyncFlow the train(), index() and search() methods are coroutines (i.e. declared with the async/await syntax), so simply calling them does not schedule them for execution. To actually run a coroutine, users need to hand it to an event loop, e.g. via asyncio.run() or asyncio.create_task().
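The difference between calling a coroutine and running it can be seen without Jina at all. The sketch below is a minimal illustration using only the standard library; `fake_index` is a hypothetical stand-in for an AsyncFlow method such as index(), not part of Jina.

```python
import asyncio

calls = []


async def fake_index(name):
    # Hypothetical stand-in for an AsyncFlow coroutine method such as index()
    calls.append(name)
    return f'{name} done'


def demo():
    coro = fake_index('first')   # only creates a coroutine object; the body has not run
    ran_before = list(calls)     # still empty at this point
    result = asyncio.run(coro)   # the event loop executes the body here
    return ran_before, result


async def demo_task():
    # create_task() schedules the coroutine on the loop that is already running
    task = asyncio.create_task(fake_index('second'))
    return await task
```

Here `demo()` starts its own event loop with asyncio.run(), while `demo_task()` is meant to be awaited from code that already runs inside a loop.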

AsyncFlow can be very useful in integration settings, where Jina (or a Jina Flow) is NOT the main logic but rather serves as one part of another program. In this case, users often do not want Jina to control the asyncio event loop. In contrast, Flow controls and wraps the event loop internally, which makes the Flow look synchronous from the outside.

In particular, AsyncFlow makes using Jina in a Jupyter Notebook more natural and reliable. For example, the following code uses the event loop already running in Jupyter/IPython to run the Jina Flow, instead of creating a new one.

from jina import AsyncFlow
import numpy as np

with AsyncFlow().add() as f:
    await f.index_ndarray(np.random.random([5, 4]), on_done=print)

Notice that the above code will NOT work in the standard Python REPL, as only Jupyter/IPython implements "autoawait".

Another example is using Jina as one component of a larger program. Say you have another IO-bound job heavylifting(); you can use this feature to schedule Jina index() and heavylifting() concurrently. For example:

import asyncio

import numpy as np
from jina import AsyncFlow


async def run_async_flow_5s():
    # WaitDriver pauses for 5s, so the total roundtrip takes ~5s
    # validate is a user-defined callback, not shown here
    with AsyncFlow().add(uses='- !WaitDriver {}') as f:
        await f.index_ndarray(np.random.random([5, 4]), on_done=validate)


async def heavylifting():
    # total roundtrip takes ~5s
    print('heavylifting other io-bound jobs, e.g. download, upload, file io')
    await asyncio.sleep(5)
    print('heavylifting done after 5s')


async def concurrent_main():
    # both jobs run concurrently: about 5s in total plus some dispatch overhead, usually under 7s
    await asyncio.gather(run_async_flow_5s(), heavylifting())

One can think of Flow as running on a Jina-managed event loop, whereas AsyncFlow runs on a self-managed event loop.
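The timing claim in the concurrent example (two ~5s jobs finishing in roughly 5s rather than 10s) can be checked with a scaled-down sketch that needs no Jina installation. Both coroutines below are hypothetical stand-ins that simply sleep, and the 0.3s delay is an arbitrary choice for illustration.

```python
import asyncio
import time


async def fake_flow_job(delay):
    # Hypothetical stand-in for `await f.index_ndarray(...)` inside an AsyncFlow
    await asyncio.sleep(delay)
    return 'flow'


async def other_io_job(delay):
    # Stand-in for heavylifting(): any IO-bound work that awaits
    await asyncio.sleep(delay)
    return 'io'


async def timed_concurrent_main(delay=0.3):
    start = time.perf_counter()
    # gather() runs both coroutines concurrently on the same event loop
    results = await asyncio.gather(fake_flow_job(delay), other_io_job(delay))
    elapsed = time.perf_counter() - start
    return results, elapsed
```

Running `asyncio.run(timed_concurrent_main())` should report an elapsed time close to a single delay rather than the sum of both, because each job yields control to the loop while it waits.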

Initialize a flow object

Parameters

kwargs – other keyword arguments that will be shared by all pods in this flow

More explanation of optimize_level:

As an example, the following flow will generate 6 Peas:

f = Flow(optimize_level=FlowOptimizeLevel.NONE).add(uses='forward', parallel=3)

The optimized version, i.e. Flow(optimize_level=FlowOptimizeLevel.FULL), will generate 4 Peas, but it forces the GatewayPea to take the BIND role, as the head and tail routers are removed.
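The two Pea counts can be reproduced with simple arithmetic. The breakdown used below is an assumption inferred from the text (one gateway, one head router, one tail router and `parallel` worker Peas); `count_peas` is a hypothetical helper and not a Jina function.

```python
def count_peas(parallel, optimized):
    # Assumed breakdown, not confirmed by the source:
    # 1 gateway + `parallel` workers, plus head and tail routers when unoptimized
    gateway = 1
    routers = 0 if optimized else 2
    return gateway + routers + parallel


unoptimized = count_peas(parallel=3, optimized=False)
optimized = count_peas(parallel=3, optimized=True)
```

Under this assumption the difference between the two settings is exactly the two routers that the optimization removes.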

train(input_fn=None, on_done=None, on_error=None, on_always=None, **kwargs)

Do training on the current flow

It will start a CLIClient and call train().

Example,

with f:
    f.train(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.

You may also build a reader/generator of your own.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'   # each yield generates a document for training

with f.build(runtime='thread') as flow:
    flow.train(bytes_gen=my_reader())

Parameters
  • input_fn (Union[Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]], Callable[…, Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]]], None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI
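The contract of the three callbacks can be sketched as follows. This is not Jina's implementation: `run_callbacks` is a hypothetical dispatcher, and both the argument passed to each callback and the order in which on_always fires relative to the others are assumptions made for illustration.

```python
def run_callbacks(request, resolved, on_done=None, on_error=None, on_always=None):
    # Hypothetical dispatcher that mirrors the documented callback contract
    if resolved and on_done is not None:
        on_done(request)        # only when the request is resolved
    if not resolved and on_error is not None:
        on_error(request)       # only when the request is rejected
    if on_always is not None:
        on_always(request)      # in both cases


log = []
callbacks = dict(
    on_done=lambda r: log.append(('done', r)),
    on_error=lambda r: log.append(('error', r)),
    on_always=lambda r: log.append(('always', r)),
)
run_callbacks('req-1', True, **callbacks)
run_callbacks('req-2', False, **callbacks)
```

All three callbacks are optional, so passing only `on_done=print`, as in the Jupyter example, is enough for quick inspection.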

index_ndarray(array, axis=0, size=None, shuffle=False, on_done=None, on_error=None, on_always=None, **kwargs)

Using numpy ndarray as the index source for the current flow

Parameters
  • array (np.ndarray) – the numpy ndarray data source

  • axis (int) – iterate over that axis

  • size (int) – the maximum number of the sub arrays

  • shuffle (bool) – shuffle the numpy data source beforehand

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI
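How axis, size and shuffle might interact is sketched below with nested lists standing in for a 2-D ndarray, so the example runs without numpy. `iter_subarrays` is a hypothetical helper, not Jina's implementation, and the `seed` parameter exists only to make the shuffle reproducible here.

```python
import random


def iter_subarrays(rows, axis=0, size=None, shuffle=False, seed=None):
    # Hypothetical pure-Python stand-in for iterating over a 2-D array
    if axis == 0:
        items = [list(r) for r in rows]          # one sub-array per row
    else:
        items = [list(c) for c in zip(*rows)]    # one sub-array per column
    if shuffle:
        random.Random(seed).shuffle(items)       # shuffle before iterating
    for i, item in enumerate(items):
        if size is not None and i >= size:
            break                                # cap the number of sub-arrays
        yield item


data = [[1, 2, 3], [4, 5, 6]]
by_row = list(iter_subarrays(data))
by_col = list(iter_subarrays(data, axis=1))
first_two_cols = list(iter_subarrays(data, axis=1, size=2))
```

By the same logic, the 5 x 4 array in the Jupyter example with the default axis=0 would presumably be split into five sub-arrays of four values each.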

search_ndarray(array, axis=0, size=None, shuffle=False, on_done=None, on_error=None, on_always=None, **kwargs)

Use a numpy ndarray as the query source for searching on the current flow

Parameters
  • array (np.ndarray) – the numpy ndarray data source

  • axis (int) – iterate over that axis

  • size (int) – the maximum number of the sub arrays

  • shuffle (bool) – shuffle the numpy data source beforehand

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI

index_lines(lines=None, filepath=None, size=None, sampling_rate=None, read_mode='r', on_done=None, on_error=None, on_always=None, **kwargs)

Use a list of lines as the index source for indexing on the current flow

Parameters
  • lines (Optional[Iterator[str]]) – a list of strings, each of which is considered a document

  • filepath (Optional[str]) – a text file in which each line contains a document

  • size (Optional[int]) – the maximum number of the documents

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode – specifies the mode in which the file is opened: 'r' for reading in text mode, 'rb' for reading in binary mode

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI
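The line-based parameters can be pictured as a small generator. The sketch below is one plausible reading of lines, filepath, size, sampling_rate and read_mode; `read_lines` is a hypothetical helper and not Jina's implementation, and treating sampling_rate as a per-line keep probability is an assumption.

```python
import random


def read_lines(lines=None, filepath=None, size=None, sampling_rate=None, read_mode='r'):
    # Hypothetical reader sketch: yields one document per line
    if filepath is not None:
        with open(filepath, read_mode) as fp:
            source = fp.read().splitlines()   # str lines for 'r', bytes lines for 'rb'
    elif lines is not None:
        source = lines
    else:
        raise ValueError('either lines or filepath must be given')
    emitted = 0
    for line in source:
        if size is not None and emitted >= size:
            break                             # stop after `size` documents
        if sampling_rate is not None and random.random() >= sampling_rate:
            continue                          # drop with probability 1 - sampling_rate
        emitted += 1
        yield line


first_two = list(read_lines(lines=['a', 'b', 'c'], size=2))
```

With this reading, a sampling_rate of 1.0 keeps every line and 0.0 keeps none.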

index_files(patterns, recursive=True, size=None, sampling_rate=None, read_mode=None, on_done=None, on_error=None, on_always=None, **kwargs)

Use a set of files as the index source for indexing on the current flow

Parameters
  • patterns (Union[str, List[str]]) – The pattern may contain simple shell-style wildcards, e.g. '*.py', '[*.zip, *.gz]'

  • recursive (bool) – If recursive is true, the pattern '**' will match any files and zero or more directories and subdirectories.

  • size (Optional[int]) – the maximum number of the files

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode (Optional[str]) – specifies the mode in which the file is opened: 'r' for reading in text mode, 'rb' for reading in binary mode

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI
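The descriptions of patterns and recursive match the behaviour of Python's standard glob module, so the matching step can be sketched with it. Whether Jina uses glob internally is an assumption; `match_files` is a hypothetical helper written for illustration only.

```python
import glob
import os


def match_files(patterns, recursive=True, size=None):
    # Hypothetical helper built on the standard-library glob module
    if isinstance(patterns, str):
        patterns = [patterns]                 # accept a single pattern or a list
    found = 0
    for pattern in patterns:
        for path in sorted(glob.glob(pattern, recursive=recursive)):
            if not os.path.isfile(path):
                continue                      # skip directories matched by the pattern
            if size is not None and found >= size:
                return                        # cap the number of files
            found += 1
            yield path
```

For example, `match_files('data/**/*.py')` would walk `data` and all of its subdirectories, whereas with recursive=False the `**` behaves like a single `*` and matches exactly one directory level. The path `data` is a placeholder.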

search_files(patterns, recursive=True, size=None, sampling_rate=None, read_mode=None, on_done=None, on_error=None, on_always=None, **kwargs)

Use a set of files as the query source for searching on the current flow

Parameters
  • patterns (Union[str, List[str]]) – The pattern may contain simple shell-style wildcards, e.g. '*.py', '[*.zip, *.gz]'

  • recursive (bool) – If recursive is true, the pattern '**' will match any files and zero or more directories and subdirectories.

  • size (Optional[int]) – the maximum number of the files

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode (Optional[str]) – specifies the mode in which the file is opened: 'r' for reading in text mode, 'rb' for reading in binary mode

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI

search_lines(filepath=None, lines=None, size=None, sampling_rate=None, read_mode='r', on_done=None, on_error=None, on_always=None, **kwargs)

Use a list of lines as the query source for searching on the current flow

Parameters
  • filepath (Optional[str]) – a text file in which each line contains a document

  • lines (Optional[Iterator[str]]) – a list of strings, each of which is considered a document

  • size (Optional[int]) – the maximum number of the documents

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode – specifies the mode in which the file is opened: 'r' for reading in text mode, 'rb' for reading in binary mode

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI

index(input_fn=None, on_done=None, on_error=None, on_always=None, **kwargs)

Do indexing on the current flow

Example,

with f:
    f.index(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.

You may also build a reader/generator of your own.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a document to index

with f.build(runtime='thread') as flow:
    flow.index(bytes_gen=my_reader())

It will start a CLIClient and call index().

Parameters
  • input_fn (Union[Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]], Callable[…, Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]]], None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI
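According to its type annotation, input_fn may be either an iterator or a callable that returns one. The sketch below shows one way such an argument could be normalised before documents are read from it; `as_iterator` is a hypothetical helper and does not describe how Jina handles the argument internally.

```python
def as_iterator(input_fn):
    # Hypothetical normaliser: accept an iterator/iterable or a zero-argument callable
    if callable(input_fn):
        return iter(input_fn())   # call the function to obtain the documents
    return iter(input_fn)         # already an iterator or iterable


def my_reader():
    for i in range(3):
        yield b'doc-%d' % i       # each yield produces one document


from_callable = list(as_iterator(my_reader))      # pass the function itself
from_iterator = list(as_iterator(my_reader()))    # pass an already created generator
```

Passing the function rather than a generator object has the practical advantage that the input can be produced again, since a generator is exhausted after one pass.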

search(input_fn=None, on_done=None, on_error=None, on_always=None, **kwargs)

Do searching on the current flow

It will start a CLIClient and call search().

Example,

with f:
    f.search(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.

You may also build a reader/generator of your own.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'   # each yield generates a query for searching

with f.build(runtime='thread') as flow:
    flow.search(bytes_gen=my_reader())

Parameters
  • input_fn (Union[Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]], Callable[…, Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]]], None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of jina client CLI