jina.flow

jina.flow.build_required(required_level)

Annotate a function so that it requires a certain build level to run.

Parameters

required_level (FlowBuildLevel) – required build level to run this function.

Example:

@build_required(FlowBuildLevel.RUNTIME)
def foo():
    print(1)
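
A decorator of this kind can be sketched in a few lines. The version below is only an illustration, not Jina's implementation: `BuildLevel`, `BuildLevelError`, `requires_level`, `ToyFlow`, and the `_build_level` attribute are all hypothetical stand-ins, and it assumes the decorated callable is a method on an object that tracks its own build level.

```python
from enum import IntEnum
from functools import wraps


class BuildLevel(IntEnum):
    """Hypothetical stand-in for FlowBuildLevel (member names are assumed)."""
    EMPTY = 0
    GRAPH = 1
    RUNTIME = 2


class BuildLevelError(RuntimeError):
    """Hypothetical error raised when the object is not built far enough."""


def requires_level(required_level):
    """Return a decorator that refuses to run a method below `required_level`."""
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Assumption: the object stores its progress in `_build_level`.
            if self._build_level < required_level:
                raise BuildLevelError(
                    f'{func.__name__} needs {required_level.name}, '
                    f'but the current level is {self._build_level.name}'
                )
            return func(self, *args, **kwargs)
        return wrapper
    return decorator


class ToyFlow:
    """Toy object used only to exercise the decorator."""

    def __init__(self):
        self._build_level = BuildLevel.EMPTY

    def build(self):
        self._build_level = BuildLevel.RUNTIME
        return self

    @requires_level(BuildLevel.RUNTIME)
    def run(self):
        return 'running'
```

Calling `ToyFlow().run()` raises `BuildLevelError`, while `ToyFlow().build().run()` goes through.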

class jina.flow.Flow(args=None, **kwargs)

Bases: object

Initialize a flow object

Parameters

kwargs – other keyword arguments that will be shared by all pods in this flow

More explanation of optimize_level:

As an example, the following flow will generate 6 Peas:

f = Flow(optimize_level=FlowOptimizeLevel.NONE).add(yaml_path='forward', replicas=3)

The optimized version, i.e. Flow(optimize_level=FlowOptimizeLevel.FULL) will generate 4 Peas, but it will force the GatewayPea to take BIND role, as the head and tail routers are removed.
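
The arithmetic behind those two numbers can be written out as a small sketch. It assumes, based only on the description above, that the unoptimized layout of one replicated pod consists of one gateway pea, the replica peas, and a head and a tail router, and that the optimized layout drops both routers. `count_peas` is a hypothetical helper, not part of Jina.

```python
def count_peas(replicas, optimized):
    """Count peas for a flow with a single replicated pod (illustrative only)."""
    gateway = 1
    # Assumption: head + tail routers exist only in the unoptimized layout.
    routers = 0 if optimized else 2
    return gateway + replicas + routers


unoptimized = count_peas(replicas=3, optimized=False)
full = count_peas(replicas=3, optimized=True)
```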

classmethod to_yaml(representer, data)

Required by ruamel.yaml.constructor

classmethod from_yaml(constructor, node)

Required by ruamel.yaml.constructor

save_config(filename=None)

Serialize the object to a yaml file

Parameters

filename (Optional[str]) – file path of the yaml file, if not given then config_abspath is used

Return type

bool

Returns

successfully dumped or not

property yaml_spec

classmethod load_config(filename)

Build a Flow from a YAML file.

Parameters

filename (Union[str, TextIO]) – the file path of the YAML file or a TextIO stream to be loaded from

Return type

Flow

Returns

a Flow object

set_last_pod(name, copy_flow=True)

Set a pod as the last pod in the flow, useful when modifying the flow.

Parameters
  • name (str) – the name of the existing pod

  • copy_flow (bool) – when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

Flow

Returns

a (new) flow object with modification

join(needs, *args, **kwargs)

Add a blocker to the flow that waits until all peas defined in needs have completed.

Parameters

needs (Union[Tuple[str], List[str]]) – list of service names to wait for

Return type

Flow

Returns

the modified flow
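
The blocking behaviour can be pictured with a toy barrier that releases only once every name in needs has reported completion. This is a conceptual sketch, not how Jina routes messages; `ToyJoin` and `mark_done` are hypothetical names.

```python
class ToyJoin:
    """Toy barrier: becomes ready once every needed service has reported."""

    def __init__(self, needs):
        # Names of the services this join is still waiting for.
        self.pending = set(needs)

    @property
    def ready(self):
        return not self.pending

    def mark_done(self, name):
        """Record that `name` has completed and return whether the join is ready."""
        self.pending.discard(name)
        return self.ready


joiner = ToyJoin(['encoder', 'indexer'])
after_first = joiner.mark_done('encoder')   # still waiting for 'indexer'
after_second = joiner.mark_done('indexer')  # nothing left to wait for
```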

add(needs=None, copy_flow=True, **kwargs)

Add a pod to the current flow object and return the new modified flow object. The attributes of the pod can later be changed with set() or deleted with remove().

Note that there are shortcut versions of this method. Using add_encoder(), add_preprocessor(), add_router(), or add_indexer() is recommended whenever possible.

Parameters
  • needs (Union[str, Tuple[str], List[str], None]) – the name of the pod(s) that this pod receives data from. One can also use ‘pod.Gateway’ to indicate the connection with the gateway.

  • copy_flow (bool) – when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification

  • kwargs – other keyword-value arguments that the pod CLI supports

Return type

Flow

Returns

a (new) flow object with modification
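
The difference between the two copy_flow modes can be illustrated with a toy class. This is a sketch under assumptions, not Jina's code: `ToyFlow` is hypothetical, it takes the pod name as an explicit argument, and it assumes that a pod added without needs connects to the most recently added pod.

```python
import copy


class ToyFlow:
    """Toy flow that only records which pod receives data from which."""

    def __init__(self):
        self.pods = {}             # pod name -> names of the pods it receives from
        self.last_pod = 'gateway'  # assumption: new pods attach here by default

    def add(self, name, needs=None, copy_flow=True):
        # copy_flow=True: modify a deep copy and leave this object untouched.
        # copy_flow=False: modify this object in place.
        target = copy.deepcopy(self) if copy_flow else self
        if needs is None:
            needs = [target.last_pod]
        elif isinstance(needs, str):
            needs = [needs]
        target.pods[name] = list(needs)
        target.last_pod = name
        return target


base = ToyFlow()
copied = base.add('encoder')                      # base stays empty
inplace = copied.add('indexer', copy_flow=False)  # same object as `copied`
```

Copying by default keeps a partially built flow reusable as a template, at the cost of one deep copy per call.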

build(copy_flow=False)

Build the current flow and make it ready to use

Note

No need to manually call it since 0.0.8. When using flow with the context manager, or using start(), build() will be invoked.

Parameters

copy_flow (bool) – when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

Flow

Returns

the current flow (by default)

Note

copy_flow=True is recommended if you are building the same flow multiple times in a row, e.g.:

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()

start()

Start to run all Pods in this Flow.

Remember to close the Flow with close().

Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.peapods.peas.BasePea
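
The relationship between build(), start(), close(), and the context manager can be sketched with a toy class that only records the order of calls. `ToyFlow` is hypothetical; the real methods start and stop Pods, which this sketch does not attempt.

```python
class ToyFlow:
    """Toy object that logs lifecycle calls instead of starting real Pods."""

    def __init__(self):
        self.events = []

    def build(self):
        self.events.append('build')
        return self

    def start(self):
        # As described for build(): starting implies building first.
        self.build()
        self.events.append('start')
        return self

    def close(self):
        self.events.append('close')

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs even if the body raised, so resources are always released.
        self.close()
        return False  # do not swallow exceptions


with ToyFlow() as flow:
    pass  # work with the running flow here
```

Using the with statement is the safer pattern because close() runs even when the body raises; a bare start() call needs an explicit close(), ideally in a try/finally block.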

property num_pods

Get the number of pods in this flow

Return type

int

property num_peas

Get the number of peas (replicas count) in this flow

Return type

int

close()

Close the flow and release all resources associated with it.

train(input_fn=None, output_fn=None, **kwargs)

Do training on the current flow

It will start a CLIClient and call train().

Example,

with f:
    f.train(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed to the flow.

One may also build a reader/generator on your own.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'   # each yield generates a document for training

with f.build(runtime='thread') as flow:
    flow.train(bytes_gen=my_reader())

Parameters
  • input_fn (Union[Iterator[Document], Iterator[bytes], Callable, None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after training

  • kwargs – accepts all keyword arguments of jina client CLI

index_ndarray(array, axis=0, size=None, shuffle=False, output_fn=None, **kwargs)

Use a numpy ndarray as the index source for the current flow

Parameters
  • array (np.ndarray) – the numpy ndarray data source

  • axis (int) – iterate over that axis

  • size (int) – the maximum number of the sub arrays

  • shuffle (bool) – shuffle the numpy data source beforehand

  • output_fn (Callable[[ForwardRef], None]) – the callback function to invoke after indexing

  • kwargs – accepts all keyword arguments of jina client CLI
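
How axis, size, and shuffle interact can be sketched without numpy by treating a rectangular list of lists as a stand-in for a 2-D array. `iter_subarrays` and its `seed` parameter are hypothetical; the order of operations (shuffle first, then cap at size) is an assumption drawn from the word "beforehand" in the description of shuffle.

```python
import random


def iter_subarrays(rows, axis=0, size=None, shuffle=False, seed=None):
    """Yield sub arrays of a 2-D list of lists along `axis` (illustrative only)."""
    if axis == 0:
        items = [list(row) for row in rows]        # one item per row
    elif axis == 1:
        items = [list(col) for col in zip(*rows)]  # one item per column
    else:
        raise ValueError('this sketch only supports axis 0 or 1')
    if shuffle:
        # Assumption: shuffling happens before the size cap is applied.
        random.Random(seed).shuffle(items)
    if size is not None:
        items = items[:size]
    yield from items


data = [[1, 2, 3], [4, 5, 6]]
by_row = list(iter_subarrays(data))
by_col = list(iter_subarrays(data, axis=1, size=2))
```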

search_ndarray(array, axis=0, size=None, shuffle=False, output_fn=None, **kwargs)

Use a numpy ndarray as the query source for searching on the current flow

Parameters
  • array (np.ndarray) – the numpy ndarray data source

  • axis (int) – iterate over that axis

  • size (int) – the maximum number of the sub arrays

  • shuffle (bool) – shuffle the numpy data source beforehand

  • output_fn (Callable[[ForwardRef], None]) – the callback function to invoke after searching

  • kwargs – accepts all keyword arguments of jina client CLI

index_lines(lines=None, filepath=None, size=None, sampling_rate=None, read_mode='r', output_fn=None, **kwargs)

Use a list of lines as the index source for indexing on the current flow

Parameters
  • lines (Optional[Iterator[str]]) – a list of strings, each is considered as a document

  • filepath (Optional[str]) – a text file that each line contains a document

  • size (Optional[int]) – the maximum number of the documents

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode – specifies the mode in which the file is opened. ‘r’ for reading in text mode, ‘rb’ for reading in binary

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after indexing

  • kwargs – accepts all keyword arguments of jina client CLI
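
One plausible reading of size and sampling_rate is sketched below. `sample_lines` and its `rng` parameter are hypothetical, and two details are assumptions rather than documented behaviour: each line is kept with probability sampling_rate, and size caps the number of lines actually emitted.

```python
import random


def sample_lines(lines, size=None, sampling_rate=None, rng=None):
    """Yield lines, optionally subsampled and capped (illustrative only)."""
    rng = rng or random.Random()
    emitted = 0
    for line in lines:
        if size is not None and emitted >= size:
            break
        # Assumption: keep each line independently with probability sampling_rate.
        if sampling_rate is None or rng.random() < sampling_rate:
            emitted += 1
            yield line


docs = ['first doc', 'second doc', 'third doc']
capped = list(sample_lines(docs, size=2))
everything = list(sample_lines(docs, sampling_rate=1.0))
nothing = list(sample_lines(docs, sampling_rate=0.0))
```

Passing a seeded `random.Random` as `rng` makes the subsample reproducible.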

index_files(patterns, recursive=True, size=None, sampling_rate=None, read_mode=None, output_fn=None, **kwargs)

Use a set of files as the index source for indexing on the current flow

Parameters
  • patterns (Union[str, List[str]]) – The pattern may contain simple shell-style wildcards, e.g. ‘*.py’, ‘[*.zip, *.gz]’

  • recursive (bool) – If recursive is true, the pattern ‘**’ will match any files and zero or more directories and subdirectories.

  • size (Optional[int]) – the maximum number of the files

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode (Optional[str]) – specifies the mode in which the file is opened. ‘r’ for reading in text mode, ‘rb’ for reading in binary

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after indexing

  • kwargs – accepts all keyword arguments of jina client CLI
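
The wildcard and recursive semantics described for patterns match those of Python's standard glob module, so the file discovery step can be sketched with it. Whether Jina uses glob internally is an assumption; `iter_files` is a hypothetical helper, and the temporary directory exists only to make the example self-contained.

```python
import glob
import os
import tempfile


def iter_files(patterns, recursive=True, size=None):
    """Yield file paths matching one or more glob patterns (illustrative only)."""
    if isinstance(patterns, str):
        patterns = [patterns]
    emitted = 0
    for pattern in patterns:
        for path in glob.iglob(pattern, recursive=recursive):
            if not os.path.isfile(path):
                continue  # skip directories matched by the pattern
            if size is not None and emitted >= size:
                return
            emitted += 1
            yield path


# Build a small throwaway tree: a.py, sub/b.py, c.txt
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, 'sub'))
    for rel in ('a.py', os.path.join('sub', 'b.py'), 'c.txt'):
        with open(os.path.join(root, rel), 'w') as handle:
            handle.write('x')
    pattern = os.path.join(root, '**', '*.py')
    found = sorted(os.path.basename(p) for p in iter_files(pattern))
    limited = list(iter_files(pattern, size=1))
```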

search_files(patterns, recursive=True, size=None, sampling_rate=None, read_mode=None, output_fn=None, **kwargs)

Use a set of files as the query source for searching on the current flow

Parameters
  • patterns (Union[str, List[str]]) – The pattern may contain simple shell-style wildcards, e.g. ‘*.py’, ‘[*.zip, *.gz]’

  • recursive (bool) – If recursive is true, the pattern ‘**’ will match any files and zero or more directories and subdirectories.

  • size (Optional[int]) – the maximum number of the files

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode (Optional[str]) – specifies the mode in which the file is opened. ‘r’ for reading in text mode, ‘rb’ for reading in binary

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after searching

  • kwargs – accepts all keyword arguments of jina client CLI

search_lines(filepath=None, lines=None, size=None, sampling_rate=None, read_mode='r', output_fn=None, **kwargs)

Use a list of lines as the query source for searching on the current flow

Parameters
  • filepath (Optional[str]) – a text file that each line contains a document

  • lines (Optional[Iterator[str]]) – a list of strings, each is considered as a document

  • size (Optional[int]) – the maximum number of the documents

  • sampling_rate (Optional[float]) – the sampling rate between [0, 1]

  • read_mode – specifies the mode in which the file is opened. ‘r’ for reading in text mode, ‘rb’ for reading in binary

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after searching

  • kwargs – accepts all keyword arguments of jina client CLI

index(input_fn=None, output_fn=None, **kwargs)

Do indexing on the current flow

Example,

with f:
    f.index(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed to the flow.

One may also build a reader/generator on your own.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a document to index

with f.build(runtime='thread') as flow:
    flow.index(bytes_gen=my_reader())

It will start a CLIClient and call index().

Parameters
  • input_fn (Union[Iterator[Document], Iterator[bytes], Callable, None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after indexing

  • kwargs – accepts all keyword arguments of jina client CLI
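
The interplay between input_fn and output_fn can be sketched with a toy function that accepts either an iterator or a callable returning one, and invokes the callback once per processed document. `toy_index` is hypothetical; the real callback receives a Message, for which the dict below is only a stand-in, and the one-call-per-document granularity is an assumption.

```python
def my_reader():
    """Generate three small byte documents."""
    for i in range(3):
        yield b'doc-%d' % i


def toy_index(input_fn, output_fn=None):
    """Consume documents and report each one to `output_fn` (illustrative only)."""
    # input_fn may be an iterator, or a callable that returns one.
    source = input_fn() if callable(input_fn) else input_fn
    count = 0
    for doc in source:
        response = {'status': 'indexed', 'size': len(doc)}  # stand-in for a Message
        if output_fn is not None:
            output_fn(response)
        count += 1
    return count


seen = []
total = toy_index(my_reader, output_fn=seen.append)
```

Accepting a callable as well as an iterator lets the same reader be reused across calls, since a generator object is exhausted after one pass.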

search(input_fn=None, output_fn=None, **kwargs)

Do searching on the current flow

It will start a CLIClient and call search().

Example,

with f:
    f.search(input_fn)
    ...

This will call the pre-built reader to read files into an iterator of bytes and feed to the flow.

One may also build a reader/generator on your own.

Example,

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'   # each yield generates a query for searching

with f.build(runtime='thread') as flow:
    flow.search(bytes_gen=my_reader())

Parameters
  • input_fn (Union[Iterator[Document], Iterator[bytes], Callable, None]) – An iterator of bytes. If not given, then you have to specify it in kwargs.

  • output_fn (Optional[Callable[[Message], None]]) – the callback function to invoke after searching

  • kwargs – accepts all keyword arguments of jina client CLI

dry_run(**kwargs)

Send a DRYRUN request to this flow, passing through all pods in this flow. Useful for testing connectivity and debugging.

to_swarm_yaml(path)

Generate the docker swarm YAML compose file

Parameters

path (TextIO) – the output yaml path

property port_expose

property host

block()

Block the process until user hits KeyboardInterrupt

use_grpc_gateway()

Change to use gRPC gateway for IO

use_rest_gateway()

Change to use REST gateway for IO