jina.flow

class jina.flow.Flow(args=None, **kwargs)

Bases: contextlib.ExitStack

Initialize a flow object

Parameters

kwargs – other keyword arguments that will be shared by all pods in this flow

More on optimize_level:

As an example, the following flow will generate 6 Peas:

f = Flow(optimize_level=FlowOptimizeLevel.NONE).add(uses='forward', parallel=3)

The optimized version, i.e. Flow(optimize_level=FlowOptimizeLevel.FULL), will generate 4 Peas, but it forces the GatewayPea to take the BIND role, because the head and tail routers are removed.
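The pea counts above can be reproduced with a little arithmetic. The sketch below is only an illustration: count_peas is a hypothetical helper, not part of jina, and it assumes that the gateway counts as one pea and that an unoptimized pod with parallel > 1 carries one head router and one tail router.

```python
def count_peas(parallel: int, optimized: bool) -> int:
    """Toy pea count for a flow with one gateway and one pod (parallel > 1).

    Assumption: the unoptimized pod has a head router, a tail router and
    `parallel` worker peas; full optimization removes both routers.
    """
    gateway = 1
    routers = 0 if optimized else 2
    return gateway + routers + parallel


print(count_peas(3, optimized=False))  # 6
print(count_peas(3, optimized=True))   # 4
```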

classmethod to_yaml(representer, data)

Required by ruamel.yaml.constructor

classmethod from_yaml(constructor, node)

Required by ruamel.yaml.constructor

save_config(filename=None)

Serialize the object to a yaml file

Parameters

filename (Optional[str]) – file path of the YAML file; if not given, config_abspath is used

Return type

bool

Returns

whether the object was dumped successfully

property yaml_spec

classmethod load_config(filename)

Build a Flow from a YAML file.

Parameters

filename (Union[str, TextIO]) – the file path of the YAML file or a TextIO stream to be loaded from

Return type

Flow

Returns

a Flow object

property last_pod

needs(needs, uses='_merge', name='joiner', *args, **kwargs)

Add a blocker to the flow that waits until all peas defined in needs have completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • uses (str) – the config of the executor, by default is _merge

  • name (str) – the name of this joiner, by default is joiner

Return type

Flow

Returns

the modified flow
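The blocking behaviour can be pictured with a small stand-alone model. ToyJoiner below is a hypothetical class, not jina's joiner; it only assumes that a joiner holds back its output until every upstream name listed in needs has reported.

```python
from typing import Dict, List, Optional


class ToyJoiner:
    """Hypothetical stand-in for a joiner pod; not part of jina."""

    def __init__(self, needs: List[str]) -> None:
        self.needs = list(needs)
        self.received: Dict[str, str] = {}

    def receive(self, name: str, payload: str) -> Optional[List[str]]:
        """Record one upstream result; return all results once complete."""
        if name not in self.needs:
            raise ValueError(f'unexpected upstream: {name}')
        self.received[name] = payload
        if len(self.received) < len(self.needs):
            return None  # still blocked: some upstream has not reported yet
        # Every upstream has reported: release results in the order of `needs`.
        return [self.received[n] for n in self.needs]


joiner = ToyJoiner(needs=['encoder_a', 'encoder_b'])
first = joiner.receive('encoder_b', 'vec-b')   # blocked, returns None
second = joiner.receive('encoder_a', 'vec-a')  # complete, returns both parts
```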

add(needs=None, copy_flow=True, pod_role=<PodRoleType.POD: 0>, **kwargs)

Add a pod to the current flow object and return the new modified flow object. The attributes of the pod can later be changed with set() or deleted with remove().

Note that there are shortcut versions of this method. Prefer add_encoder(), add_preprocessor(), add_router(), or add_indexer() whenever possible.

Parameters
  • needs (Union[str, Tuple[str], List[str], None]) – the name of the pod(s) that this pod receives data from. One can also use 'pod.Gateway' to indicate the connection with the gateway.

  • pod_role (PodRoleType) – the role of the Pod, used for visualization and route planning

  • copy_flow (bool) – when true, copy the current flow, apply the modification to the copy, and return the copy; otherwise, modify the flow in place

  • kwargs – other keyword-value arguments that the pod CLI supports

Return type

Flow

Returns

a (new) flow object with the modification applied
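The effect of copy_flow can be illustrated without jina. ToyFlow below is a hypothetical miniature that only mimics the copy-versus-in-place convention described for add(); how the real Flow copies itself is not shown in this reference.

```python
import copy


class ToyFlow:
    """Hypothetical miniature of the copy_flow convention; not jina's Flow."""

    def __init__(self):
        self.pods = []

    def add(self, name, copy_flow=True):
        # With copy_flow=True the original is left untouched.
        target = copy.deepcopy(self) if copy_flow else self
        target.pods.append(name)
        return target


base = ToyFlow()
branched = base.add('encoder')               # works on a copy
same = base.add('indexer', copy_flow=False)  # modifies `base` itself

print(base.pods)      # ['indexer']
print(branched.pods)  # ['encoder']
print(same is base)   # True
```

Because each call returns a flow, calls can be chained either way; the default simply keeps earlier flow objects reusable.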

inspect(name='inspect', *args, **kwargs)

Add an inspection on the last changed Pod in the Flow

Internally, it adds two pods to the flow. The overhead is minimized, and you can remove them by passing Flow(inspect=FlowInspectType.REMOVE) before using the flow.

Flow -- PUB-SUB -- BasePod(_pass) -- Flow
        |
        -- PUB-SUB -- InspectPod (Hanging)

In this way, InspectPod looks like a simple _pass from outside and does not introduce side effects (e.g. changing the socket type) to the original flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an evaluator into the flow.

See also

gather_inspect()

Return type

Flow

gather_inspect(name='gather_inspect', uses='_merge_eval', include_last_pod=True, *args, **kwargs)

Gather the output of all inspect pods into one pod. When the flow has no inspect pod, the flow itself is returned.

Note

If --no-inspect is not given, then gather_inspect() is called automatically before build(), so in general you don't need to call gather_inspect() manually.

Parameters
  • name (str) – the name of the gather pod

  • uses – the config of the executor, by default is _merge_eval

  • include_last_pod (bool) – whether to include the last modified pod in the flow

  • args –

  • kwargs –

Return type

Flow

Returns

the modified flow or the copy of it

See also

inspect()

build(copy_flow=False)

Build the current flow and make it ready to use

Note

No need to manually call it since 0.0.8. When using flow with the context manager, or using start(), build() will be invoked.

Parameters

copy_flow (bool) – when true, copy the current flow, apply the modification to the copy, and return the copy; otherwise, modify the flow in place

Return type

Flow

Returns

the current flow (by default)

Note

copy_flow=True is recommended if you are building the same flow multiple times in a row, e.g.:

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()

start()

Start to run all Pods in this Flow.

Remember to close the Flow with close().

Note that this method has a timeout, timeout_ready, which is set in the CLI and inherited all the way from jina.peapods.peas.BasePea.
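Since Flow derives from contextlib.ExitStack, the need to call close() follows from how an ExitStack releases what it has entered. The sketch below uses only the standard library; ToyPod is a hypothetical resource, and how Flow registers its pods internally is not shown in this reference.

```python
from contextlib import ExitStack


class ToyPod:
    """Hypothetical resource that records when it starts and closes."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def __enter__(self):
        self.log.append(f'start {self.name}')
        return self

    def __exit__(self, exc_type, exc, tb):
        self.log.append(f'close {self.name}')
        return False  # do not swallow exceptions


# Inside a with-block, everything entered is closed automatically (LIFO).
log = []
with ExitStack() as stack:
    for name in ('pod_a', 'pod_b'):
        stack.enter_context(ToyPod(name, log))
    log.append('running')

# Without a with-block, close() must be called explicitly.
manual_log = []
stack = ExitStack()
stack.enter_context(ToyPod('pod_c', manual_log))
stack.close()
```

The same pattern suggests why the context-manager form is the safer default: resources are released even when the body raises.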

property num_pods

Get the number of pods in this flow

Return type

int

property num_peas

Get the number of peas (parallel count) in this flow

Return type

int

train(input_fn=None, output_fn=None, **kwargs)

Do training on the current flow

It will start a CLIClient and call train().

Example,

with f:
    f.train(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.

You may also build your own reader/generator.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'   # each yield generates a document for training

with f.build() as flow:
    flow.train(input_fn=my_reader())

Parameters
  • input_fn (Union[Iterator[Document], Iterator[bytes], Callable, None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after training

  • kwargs – accepts all keyword arguments of jina client CLI

index_ndarray(array, axis=0, size=None, shuffle=False, output_fn=None, **kwargs)

Use a numpy ndarray as the index source for indexing on the current flow

Parameters
  • array (np.ndarray) – the numpy ndarray data source

  • axis (int) – iterate over that axis

  • size (int) – the maximum number of sub-arrays

  • shuffle (bool) – shuffle the numpy data source beforehand

  • output_fn (Callable[[ForwardRef], None]) – the callback function to invoke after indexing

  • kwargs – accepts all keyword arguments of jina client CLI

search_ndarray(array, axis=0, size=None, shuffle=False, output_fn=None, **kwargs)

Use a numpy ndarray as the query source for searching on the current flow

Parameters
  • array (np.ndarray) – the numpy ndarray data source

  • axis (int) – iterate over that axis

  • size (int) – the maximum number of sub-arrays

  • shuffle (bool) – shuffle the numpy data source beforehand

  • output_fn (Callable[[ForwardRef], None]) – the callback function to invoke after searching

  • kwargs – accepts all keyword arguments of jina client CLI

index_lines(lines=None, filepath=None, size=None, sampling_rate=None, read_mode='r', output_fn=None, **kwargs)

Use a list of lines as the index source for indexing on the current flow

Parameters
  • lines (Optional[Iterator[str]]) – a list of strings, each of which is considered a document

  • filepath (Optional[str]) – a text file in which each line contains a document

  • size (Optional[int]) – the maximum number of the documents

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode – specifies the mode in which the file is opened. 'r' for reading in text mode, 'rb' for reading in binary mode

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after indexing

  • kwargs – accepts all keyword arguments of jina client CLI
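How size and sampling_rate could interact is sketched below. sample_lines is a hypothetical helper, not jina's reader; in particular, applying the sampling before the size cap is an assumption made for this illustration.

```python
import random
from typing import Iterable, Iterator, Optional


def sample_lines(lines: Iterable[str],
                 size: Optional[int] = None,
                 sampling_rate: Optional[float] = None) -> Iterator[str]:
    """Yield at most `size` lines, keeping each with probability `sampling_rate`."""
    kept = 0
    for line in lines:
        if size is not None and kept >= size:
            return  # the cap on the number of documents is reached
        # random.random() is in [0, 1): rate 1.0 keeps all, rate 0.0 keeps none.
        if sampling_rate is not None and random.random() >= sampling_rate:
            continue
        yield line
        kept += 1


capped = list(sample_lines(['a', 'b', 'c'], size=2, sampling_rate=1.0))
nothing = list(sample_lines(['a', 'b', 'c'], sampling_rate=0.0))
everything = list(sample_lines(['a', 'b', 'c']))
```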

index_files(patterns, recursive=True, size=None, sampling_rate=None, read_mode=None, output_fn=None, **kwargs)

Use a set of files as the index source for indexing on the current flow

Parameters
  • patterns (Union[str, List[str]]) – The pattern may contain simple shell-style wildcards, e.g. '*.py', '[*.zip, *.gz]'

  • recursive (bool) – If recursive is true, the pattern '**' will match any files and zero or more directories and subdirectories.

  • size (Optional[int]) – the maximum number of the files

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode (Optional[str]) – specifies the mode in which the file is opened. 'r' for reading in text mode, 'rb' for reading in binary mode

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after indexing

  • kwargs – accepts all keyword arguments of jina client CLI
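The description of patterns and recursive matches the behaviour of Python's glob module, so the matching can be tried out with the standard library alone. match_files below is a hypothetical helper; that jina resolves patterns through glob is an assumption, not something this reference states.

```python
import glob
import os
import tempfile


def match_files(root, patterns, recursive=True):
    """Return paths under `root` (relative, sorted) matching the pattern(s)."""
    if isinstance(patterns, str):
        patterns = [patterns]
    found = []
    for pattern in patterns:
        found.extend(glob.glob(os.path.join(root, pattern), recursive=recursive))
    return sorted(os.path.relpath(path, root) for path in found)


with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, 'sub'))
    for rel in ('a.py', os.path.join('sub', 'b.py'), 'c.txt'):
        open(os.path.join(root, rel), 'w').close()

    deep = match_files(root, '**/*.py')  # '**' spans zero or more directories
    shallow = match_files(root, '*.py')  # top level only
```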

search_files(patterns, recursive=True, size=None, sampling_rate=None, read_mode=None, output_fn=None, **kwargs)

Use a set of files as the query source for searching on the current flow

Parameters
  • patterns (Union[str, List[str]]) – The pattern may contain simple shell-style wildcards, e.g. '*.py', '[*.zip, *.gz]'

  • recursive (bool) – If recursive is true, the pattern '**' will match any files and zero or more directories and subdirectories.

  • size (Optional[int]) – the maximum number of the files

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode (Optional[str]) – specifies the mode in which the file is opened. 'r' for reading in text mode, 'rb' for reading in binary mode

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after searching

  • kwargs – accepts all keyword arguments of jina client CLI

search_lines(filepath=None, lines=None, size=None, sampling_rate=None, read_mode='r', output_fn=None, **kwargs)

Use a list of lines as the query source for searching on the current flow

Parameters
  • filepath (Optional[str]) – a text file in which each line contains a document

  • lines (Optional[Iterator[str]]) – a list of strings, each of which is considered a document

  • size (Optional[int]) – the maximum number of the documents

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode – specifies the mode in which the file is opened. 'r' for reading in text mode, 'rb' for reading in binary mode

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after searching

  • kwargs – accepts all keyword arguments of jina client CLI

index(input_fn=None, output_fn=None, **kwargs)

Do indexing on the current flow

Example,

with f:
    f.index(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.

You may also build your own reader/generator.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a document to index

with f.build() as flow:
    flow.index(input_fn=my_reader())

It will start a CLIClient and call index().

Parameters
  • input_fn (Union[Iterator[Union[Document, bytes]], Callable, None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after indexing

  • kwargs – accepts all keyword arguments of jina client CLI

search(input_fn=None, output_fn=None, **kwargs)

Do searching on the current flow

It will start a CLIClient and call search().

Example,

with f:
    f.search(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.

You may also build your own reader/generator.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'   # each yield generates a query for searching

with f.build() as flow:
    flow.search(input_fn=my_reader())

Parameters
  • input_fn (Union[Iterator[Union[Document, bytes]], Callable, None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after searching

  • kwargs – accepts all keyword arguments of jina client CLI

plot(output=None, vertical_layout=False, inline_display=True, build=True, copy_flow=True)

Visualize the flow up to the current point. If a file name is provided, it will create a jpg image with that name; otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline; otherwise an image will be created.

Example,

flow = Flow().add(name='pod_a').plot('flow.svg')

Parameters
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – top-down or left-right layout

  • inline_display (bool) – show image directly inside the Jupyter Notebook

  • build (bool) – build the flow before plotting, so that the gateway connection is shown more accurately

  • copy_flow (bool) – when true, copy the current flow, apply the modification to the copy, and return the copy; otherwise, modify the flow in place

Return type

Flow

Returns

the flow

dry_run(**kwargs)

Send a DRYRUN request to this flow that passes through all of its pods; useful for testing connectivity and debugging.

to_swarm_yaml(path)

Generate the docker swarm YAML compose file

Parameters

path (TextIO) – the output yaml path

property port_expose

property host

block()

Block the process until the user triggers a KeyboardInterrupt

use_grpc_gateway()

Change to use gRPC gateway for IO

use_rest_gateway()

Change to use REST gateway for IO

join(needs, uses='_merge', name='joiner', *args, **kwargs)

Add a blocker to the flow that waits until all peas defined in needs have completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • uses (str) – the config of the executor, by default is _merge

  • name (str) – the name of this joiner, by default is joiner

Return type

Flow

Returns

the modified flow