jina.flow

class jina.flow.Flow(args=None, env=None, **kwargs)

Bases: jina.flow.base.BaseFlow

Initialize a flow object.

Parameters

  • kwargs – other keyword arguments that will be shared by all Pods in this flow

More on optimize_level:

For example, the following flow generates 6 Peas:

from jina.enums import FlowOptimizeLevel
from jina.flow import Flow

f = Flow(optimize_level=FlowOptimizeLevel.NONE).add(uses='forward', parallel=3)

The optimized version, i.e. Flow(optimize_level=FlowOptimizeLevel.FULL), generates 4 Peas, but it forces the GatewayPea to take the BIND role because the head and tail routers are removed.
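The Pea counts above follow from simple arithmetic. The sketch below reproduces it for this single-Pod example only; count_peas is a hypothetical helper, not part of jina, and it assumes the only Peas besides the replicas are the gateway and, without optimization, one head router and one tail router (added only when parallel > 1).

```python
def count_peas(parallel: int, optimized: bool) -> int:
    """Hypothetical: Peas in a flow made of a gateway plus one Pod with `parallel` replicas."""
    gateway = 1
    # Assumption: unoptimized, a Pod with parallel > 1 adds a head router and a tail router.
    routers = 2 if parallel > 1 and not optimized else 0
    return gateway + routers + parallel


print(count_peas(3, optimized=False))  # 6, as with FlowOptimizeLevel.NONE
print(count_peas(3, optimized=True))   # 4, as with FlowOptimizeLevel.FULL
```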

train(input_fn=None, on_done=None, on_error=None, on_always=None, **kwargs)

Do training on the current flow. It starts a CLIClient and calls train(). For example:

with f:
    f.train(input_fn)
    ...

This calls the pre-built reader, which reads files into an iterator of bytes and feeds it to the flow. You can also build your own reader/generator, for example:

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a document for training

with f.build(runtime='thread') as flow:
    flow.train(input_fn=my_reader())

Parameters
  • input_fn (Union[Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]], Callable[…, Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]]], None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI
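The on_done, on_error and on_always parameters follow a resolved/rejected/always contract. The sketch below illustrates only that contract in plain Python; dispatch is a hypothetical helper, not jina's client, and the argument each callback receives here (the result, the exception, nothing) is an assumption made for illustration.

```python
from typing import Any, Callable, Optional


def dispatch(
    send: Callable[[], Any],
    on_done: Optional[Callable[[Any], None]] = None,
    on_error: Optional[Callable[[BaseException], None]] = None,
    on_always: Optional[Callable[[], None]] = None,
) -> None:
    """Hypothetical: run a request and fire callbacks by outcome."""
    try:
        result = send()  # the request is "resolved" if this returns
    except Exception as exc:
        # The request is "rejected": report the error instead of raising it.
        if on_error is not None:
            on_error(exc)
    else:
        if on_done is not None:
            on_done(result)
    finally:
        # Runs whether the request was resolved or rejected.
        if on_always is not None:
            on_always()


log = []
dispatch(lambda: 'response',
         on_done=lambda r: log.append(('done', r)),
         on_always=lambda: log.append('always'))
print(log)  # [('done', 'response'), 'always']
```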

index_ndarray(array, axis=0, size=None, shuffle=False, on_done=None, on_error=None, on_always=None, **kwargs)

Use a numpy ndarray as the index source for the current flow.

Parameters
  • array (np.ndarray) – the numpy ndarray data source

  • axis (int) – the axis to iterate over

  • size (int) – the maximum number of sub-arrays

  • shuffle (bool) – shuffle the numpy data source beforehand

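  • on_done (Optional[Callable[…, None]]) – the callback function to invoke after indexing

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI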
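Iterating over an axis, capping the count with size, and shuffling beforehand can be sketched with plain NumPy. iter_subarrays and its seed parameter are hypothetical and only mirror the parameter descriptions above; jina's own reader may differ in details such as how shuffling interacts with axis.

```python
from typing import Iterator, Optional

import numpy as np


def iter_subarrays(
    array: np.ndarray,
    axis: int = 0,
    size: Optional[int] = None,
    shuffle: bool = False,
    seed: Optional[int] = None,
) -> Iterator[np.ndarray]:
    """Hypothetical: yield one sub-array per document along `axis`."""
    indices = np.arange(array.shape[axis])
    if shuffle:
        # Shuffle the order of the sub-arrays before iterating.
        np.random.default_rng(seed).shuffle(indices)
    if size is not None:
        # Yield at most `size` sub-arrays.
        indices = indices[:size]
    for i in indices:
        # A scalar index drops `axis`, e.g. each item is a row when axis=0.
        yield np.take(array, i, axis=axis)


data = np.arange(12).reshape(4, 3)
print([row.tolist() for row in iter_subarrays(data, axis=0, size=2)])  # [[0, 1, 2], [3, 4, 5]]
print([col.shape for col in iter_subarrays(data, axis=1)])  # [(4,), (4,), (4,)]
```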

search_ndarray(array, axis=0, size=None, shuffle=False, on_done=None, on_error=None, on_always=None, **kwargs)

Use a numpy ndarray as the query source for searching on the current flow.

Parameters
  • array (np.ndarray) – the numpy ndarray data source

  • axis (int) – the axis to iterate over

  • size (int) – the maximum number of sub-arrays

  • shuffle (bool) – shuffle the numpy data source beforehand

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI

index_lines(lines=None, filepath=None, size=None, sampling_rate=None, read_mode='r', on_done=None, on_error=None, on_always=None, **kwargs)

Use a list of lines as the index source for indexing on the current flow.

Parameters
  • lines (Optional[Iterator[str]]) – a list of strings, each of which is considered a document

  • filepath (Optional[str]) – a text file in which each line contains a document

  • size (Optional[int]) – the maximum number of documents

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode – specifies the mode in which the file is opened. ‘r’ for reading in text mode, ‘rb’ for reading in binary mode

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI
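A minimal sketch of how size and sampling_rate could combine, assuming each line is kept independently with probability sampling_rate and the kept lines are then capped at size documents. sample_documents and its seed parameter are hypothetical, not part of jina.

```python
import itertools
import random
from typing import Iterable, Iterator, Optional


def sample_documents(
    lines: Iterable[str],
    size: Optional[int] = None,
    sampling_rate: Optional[float] = None,
    seed: Optional[int] = None,
) -> Iterator[str]:
    """Hypothetical: subsample `lines`, then cap the result at `size` documents."""
    rng = random.Random(seed)
    # Keep each line with probability `sampling_rate`; keep every line when it is None.
    kept = (line for line in lines if sampling_rate is None or rng.random() < sampling_rate)
    # islice with stop=None applies no limit, so size=None yields everything kept.
    return itertools.islice(kept, size)


print(list(sample_documents(['apple', 'banana', 'cherry', 'date'], size=2)))  # ['apple', 'banana']
```

The same function accepts an open file object, since iterating over a file opened with mode 'r' yields its lines (including trailing newlines).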

index_files(patterns, recursive=True, size=None, sampling_rate=None, read_mode=None, on_done=None, on_error=None, on_always=None, **kwargs)

Use a set of files as the index source for indexing on the current flow.

Parameters
  • patterns (Union[str, List[str]]) – one pattern or a list of patterns; each may contain simple shell-style wildcards, e.g. ‘*.py’, or a list such as [‘*.zip’, ‘*.gz’]

  • recursive (bool) – if true, the pattern ‘**’ matches any files and zero or more directories and subdirectories

  • size (Optional[int]) – the maximum number of the files

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode (Optional[str]) – specifies the mode in which the file is opened. ‘r’ for reading in text mode, ‘rb’ for reading in binary mode

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI
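The description of recursive mirrors Python's glob.glob, so the sketch below assumes glob-style matching and treats a single pattern like a one-element list. iter_matching_files is a hypothetical helper, not jina's implementation.

```python
import glob
from typing import Iterator, List, Union


def iter_matching_files(patterns: Union[str, List[str]], recursive: bool = True) -> Iterator[str]:
    """Hypothetical: yield paths matching one pattern or a list of patterns."""
    if isinstance(patterns, str):
        patterns = [patterns]  # treat a single pattern as a one-element list
    for pattern in patterns:
        # With recursive=True, '**' matches any files and zero or more directories;
        # with recursive=False it behaves like a single '*'.
        yield from sorted(glob.glob(pattern, recursive=recursive))
```

Called as iter_matching_files(['data/*.zip', 'data/*.gz']), it yields the matches of each pattern in turn (the data/ directory is only an illustration). Note that glob also returns matching directories, so a bare '**' pattern yields directories as well as files.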

search_files(patterns, recursive=True, size=None, sampling_rate=None, read_mode=None, on_done=None, on_error=None, on_always=None, **kwargs)

Use a set of files as the query source for searching on the current flow.

Parameters
  • patterns (Union[str, List[str]]) – one pattern or a list of patterns; each may contain simple shell-style wildcards, e.g. ‘*.py’, or a list such as [‘*.zip’, ‘*.gz’]

  • recursive (bool) – if true, the pattern ‘**’ matches any files and zero or more directories and subdirectories

  • size (Optional[int]) – the maximum number of the files

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode (Optional[str]) – specifies the mode in which the file is opened. ‘r’ for reading in text mode, ‘rb’ for reading in binary mode

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI
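To show what read_mode changes, here is a minimal sketch that reads each matched file whole, as one document. read_contents is hypothetical and only covers the two documented modes; it does not model the default read_mode=None, whose behavior is not described here.

```python
from typing import Iterable, Iterator, Union


def read_contents(paths: Iterable[str], read_mode: str = 'r') -> Iterator[Union[str, bytes]]:
    """Hypothetical: yield the full content of each file, one document per file."""
    if read_mode not in ('r', 'rb'):
        raise ValueError(f"read_mode must be 'r' or 'rb', got {read_mode!r}")
    for path in paths:
        # 'r' opens in text mode and yields str; 'rb' opens in binary mode and yields bytes.
        with open(path, read_mode) as fp:
            yield fp.read()
```

Because read_contents is a generator, an invalid read_mode raises ValueError only when iteration starts.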

search_lines(filepath=None, lines=None, size=None, sampling_rate=None, read_mode='r', on_done=None, on_error=None, on_always=None, **kwargs)

Use a list of lines as the query source for searching on the current flow.

Parameters
  • filepath (Optional[str]) – a text file in which each line contains a document

  • lines (Optional[Iterator[str]]) – a list of strings, each of which is considered a document

  • size (Optional[int]) – the maximum number of documents

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode – specifies the mode in which the file is opened. ‘r’ for reading in text mode, ‘rb’ for reading in binary mode

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI

index(input_fn=None, on_done=None, on_error=None, on_always=None, **kwargs)

Do indexing on the current flow. For example:

with f:
    f.index(input_fn)
    ...

This calls the pre-built reader, which reads files into an iterator of bytes and feeds it to the flow. You can also build your own reader/generator, for example:

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a document to index

with f.build(runtime='thread') as flow:
    flow.index(input_fn=my_reader())

It starts a CLIClient and calls index().

Parameters
  • input_fn (Union[Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]], Callable[…, Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]]], None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI
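The input_fn type accepts either an iterator or a callable that returns one. A sketch of normalizing both forms, assuming the callable takes no arguments; resolve_input and InputFn are hypothetical names, and plain bytes stand in for the DocumentContentType/DocumentSourceType values.

```python
from typing import Callable, Iterable, Iterator, Union

# Hypothetical alias: an iterable of documents, or a zero-argument callable returning one.
InputFn = Union[Iterable[bytes], Callable[[], Iterable[bytes]]]


def resolve_input(input_fn: InputFn) -> Iterator[bytes]:
    """Hypothetical: turn either accepted form of input_fn into an iterator."""
    if callable(input_fn):
        input_fn = input_fn()  # a generator function is called to get its generator
    return iter(input_fn)


def my_reader():
    for _ in range(3):
        yield b'abcdfeg'


print(len(list(resolve_input(my_reader))))    # 3
print(len(list(resolve_input(my_reader()))))  # 3
```

A generator object is not callable, so it passes straight to iter(); the generator function itself is called first.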

update(input_fn=None, on_done=None, on_error=None, on_always=None, **kwargs)

Update documents on the current flow. For example:

with f:
    f.update(input_fn)
    ...

This calls the pre-built reader, which reads files into an iterator of bytes and feeds it to the flow. You can also build your own reader/generator, for example:

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a document to update

with f.build(runtime='thread') as flow:
    flow.update(input_fn=my_reader())

It starts a CLIClient and calls update().

Parameters
  • input_fn (Union[Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]], Callable[…, Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]]], None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI

delete(input_fn=None, on_done=None, on_error=None, on_always=None, **kwargs)

Delete documents from the current flow. For example:

with f:
    f.delete(input_fn)
    ...

This calls the pre-built reader, which reads files into an iterator of bytes and feeds it to the flow. You can also build your own reader/generator, for example:

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a document to delete

with f.build(runtime='thread') as flow:
    flow.delete(input_fn=my_reader())

It starts a CLIClient and calls delete().

Parameters
  • input_fn (Union[Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]], Callable[…, Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]]], None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI

search(input_fn=None, on_done=None, on_error=None, on_always=None, **kwargs)

Do searching on the current flow. It starts a CLIClient and calls search(). For example:

with f:
    f.search(input_fn)
    ...

This calls the pre-built reader, which reads files into an iterator of bytes and feeds it to the flow. You can also build your own reader/generator, for example:

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a query for searching

with f.build(runtime='thread') as flow:
    flow.search(input_fn=my_reader())

Parameters
  • input_fn (Union[Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]], Callable[…, Iterator[Union[~DocumentContentType, ~DocumentSourceType, Tuple[~DocumentContentType, ~DocumentContentType], Tuple[~DocumentSourceType, ~DocumentSourceType]]]], None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • on_done (Optional[Callable[…, None]]) – the function to be called when the Request object is resolved.

  • on_error (Optional[Callable[…, None]]) – the function to be called when the Request object is rejected.

  • on_always (Optional[Callable[…, None]]) – the function to be called when the Request object is either resolved or rejected.

  • kwargs – accepts all keyword arguments of the jina client CLI