jina.flow.base

class jina.flow.base.BaseFlow(args=None, env=None, **kwargs)[source]

Bases: jina.jaml.JAMLCompatible, contextlib.ExitStack

An abstract Flow object in Jina.

Note

BaseFlow does not provide train, index, search interfaces. Please use Flow or AsyncFlow.

Explanation on optimize_level:

As an example, the following Flow will generate 6 Peas,

f = Flow.add(uses='forward', parallel=3)
Parameters
  • kwargs – other keyword arguments that will be shared by all Pods in this Flow

  • args (Optional[Namespace]) – Namespace args

  • env (Optional[Dict]) – environment variables shared by all Pods

property yaml_spec

get the YAML representation of the instance

property last_pod

Last pod

needs(needs, name='joiner', *args, **kwargs)[source]

Add a blocker to the Flow, wait until all peas defined in needs completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

BaseFlow

Returns

the modified Flow

needs_all(name='joiner', *args, **kwargs)[source]

Collect all hanging Pods so far and add a blocker to the Flow; wait until all handing peas completed.

Parameters
  • name (str) – the name of this joiner (default is joiner)

  • args – additional positional arguments which are forwarded to the add and needs function

  • kwargs – additional key value arguments which are forwarded to the add and needs function

Return type

BaseFlow

Returns

the modified Flow

add(needs=None, copy_flow=True, pod_role=<PodRoleType.POD: 0>, **kwargs)[source]

Add a Pod to the current Flow object and return the new modified Flow object. The attribute of the Pod can be later changed with set() or deleted with remove()

Note there are shortcut versions of this method. Recommend to use add_encoder(), add_preprocessor(), add_router(), add_indexer() whenever possible.

Parameters
  • needs (Union[str, Tuple[str], List[str], None]) – the name of the Pod(s) that this Pod receives data from. One can also use ‘pod.Gateway’ to indicate the connection with the gateway.

  • pod_role (PodRoleType) – the role of the Pod, used for visualization and route planning

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

  • kwargs – other keyword-value arguments that the Pod CLI supports

Return type

BaseFlow

Returns

a (new) Flow object with modification

inspect(name='inspect', *args, **kwargs)[source]

Add an inspection on the last changed Pod in the Flow

Internally, it adds two Pods to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

Flow -- PUB-SUB -- BasePod(_pass) -- Flow
        |
        -- PUB-SUB -- InspectPod (Hanging)

In this way, InspectPod looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.

See also

gather_inspect()

Parameters
  • name (str) – name of the Pod

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

BaseFlow

Returns

the new instance of the Flow

gather_inspect(name='gather_inspect', uses='_merge_eval', include_last_pod=True, *args, **kwargs)[source]

Gather all inspect Pods output into one Pod. When the Flow has no inspect Pod then the Flow itself is returned.

Note

If --no-inspect is not given, then gather_inspect() is auto called before build(). So in general you don’t need to manually call gather_inspect().

Parameters
  • name (str) – the name of the gather Pod

  • uses – the config of the executor, by default is _pass

  • include_last_pod (bool) – if to include the last modified Pod in the Flow

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

BaseFlow

Returns

the modified Flow or the copy of it

See also

inspect()

build(copy_flow=False)[source]

Build the current Flow and make it ready to use

Note

No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.

Parameters

copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

BaseFlow

Returns

the current Flow (by default)

Note

copy_flow=True is recommended if you are building the same Flow multiple times in a row. e.g.

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()
start()[source]

Start to run all Pods in this Flow.

Remember to close the Flow with close().

Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.peapods.peas.BasePea

Returns

this instance

property num_pods

Get the number of Pods in this Flow

Return type

int

property num_peas

Get the number of peas (parallel count) in this Flow

Return type

int

plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=False)[source]

Visualize the Flow up to the current point If a file name is provided it will create a jpg image with that name, otherwise it will display the URL for mermaid. If called within IPython notebook, it will be rendered inline, otherwise an image will be created.

Example,

flow = Flow().add(name='pod_a').plot('flow.svg')
Parameters
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – top-down or left-right layout

  • inline_display (bool) – show image directly inside the Jupyter Notebook

  • build (bool) – build the Flow first before plotting, gateway connection can be better showed

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

BaseFlow

Returns

the Flow

to_swarm_yaml(path)[source]

Generate the docker swarm YAML compose file

Parameters

path (TextIO) – the output yaml path

property port_expose

Return the exposed port of the gateway

Return type

int

property host

Return the local address of the gateway

Return type

str

property address_private

Return the private IP address of the gateway for connecting from other machine in the same network

Return type

str

property address_public

Return the public IP address of the gateway for connecting from other machine in the public network

Return type

str

block()[source]

Block the process until user hits KeyboardInterrupt

use_grpc_gateway(port=None)[source]

Change to use gRPC gateway for IO :type port: Optional[int] :param port: the port to change

use_rest_gateway(port=None)[source]

Change to use REST gateway for IO :type port: Optional[int] :param port: the port to change

property workspace_id

Get all Pods’ workspace_id values in a dict

Return type

Dict[str, str]

property identity

Get all Pods’ identity values in a dict

Return type

Dict[str, str]

join(needs, name='joiner', *args, **kwargs)

Add a blocker to the Flow, wait until all peas defined in needs completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

BaseFlow

Returns

the modified Flow