jina.flow.base¶
-
class
jina.flow.base.
BaseFlow
(args=None, env=None, **kwargs)[source]¶ Bases:
jina.jaml.JAMLCompatible
,contextlib.ExitStack
An abstract flow object in Jina.
Note
BaseFlow
does not provide train, index, search interfaces. Please useFlow
orAsyncFlow
.Initialize a flow object
- Parameters
kwargs – other keyword arguments that will be shared by all pods in this flow
More explain on
optimize_level
:As an example, the following flow will generate 6 Peas,
f = Flow(optimize_level=FlowOptimizeLevel.NONE).add(uses='forward', parallel=3)
The optimized version, i.e.
Flow(optimize_level=FlowOptimizeLevel.FULL)
will generate 4 Peas, but it will force theGatewayPea
to take BIND role, as the head and tail routers are removed.-
property
yaml_spec
¶
-
property
last_pod
¶
-
needs
(needs, name='joiner', *args, **kwargs)[source]¶ Add a blocker to the flow, wait until all peas defined in needs completed.
- Parameters
needs (
Union
[Tuple
[str
],List
[str
]]) – list of service names to waitname (
str
) – the name of this joiner, by default isjoiner
- Return type
~FlowLike
- Returns
the modified flow
-
needs_all
(name='joiner', *args, **kwargs)[source]¶ Collect all hanging Pod so far and add a blocker to the flow, wait until all handing peas completed. :param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification :type name:
str
:param name: the name of this joiner, by default isjoiner
:rtype: ~FlowLike :return: the modified flow
-
add
(needs=None, copy_flow=True, pod_role=<PodRoleType.POD: 0>, **kwargs)[source]¶ Add a pod to the current flow object and return the new modified flow object. The attribute of the pod can be later changed with
set()
or deleted withremove()
Note there are shortcut versions of this method. Recommend to use
add_encoder()
,add_preprocessor()
,add_router()
,add_indexer()
whenever possible.- Parameters
needs (
Union
[str
,Tuple
[str
],List
[str
],None
]) – the name of the pod(s) that this pod receives data from. One can also use ‘pod.Gateway’ to indicate the connection with the gateway.pod_role (
PodRoleType
) – the role of the Pod, used for visualization and route planningcopy_flow (
bool
) – when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modificationkwargs – other keyword-value arguments that the pod CLI supports
- Return type
~FlowLike
- Returns
a (new) flow object with modification
-
inspect
(name='inspect', *args, **kwargs)[source]¶ Add an inspection on the last changed Pod in the Flow
Internally, it adds two pods to the flow. But no worry, the overhead is minimized and you can remove them by simply give Flow(inspect=FlowInspectType.REMOVE) before using the flow.
Flow -- PUB-SUB -- BasePod(_pass) -- Flow | -- PUB-SUB -- InspectPod (Hanging)
In this way,
InspectPod
looks like a simple_pass
from outside and does not introduce side-effect (e.g. changing the socket type) to the original flow. The original incoming and outgoing socket types are preserved.This function is very handy for introducing evaluator into the flow.
See also
- Return type
~FlowLike
-
gather_inspect
(name='gather_inspect', uses='_merge_eval', include_last_pod=True, *args, **kwargs)[source]¶ Gather all inspect pods output into one pod. When the flow has no inspect pod then the flow itself is returned.
Note
If
--no-inspect
is not given, thengather_inspect()
is auto called beforebuild()
. So in general you don’t need to manually callgather_inspect()
.- Parameters
name (
str
) – the name of the gather poduses – the config of the executor, by default is
_pass
include_last_pod (
bool
) – if to include the last modified pod in the flowargs –
kwargs –
- Return type
- Returns
the modified flow or the copy of it
See also
-
build
(copy_flow=False)[source]¶ Build the current flow and make it ready to use
Note
No need to manually call it since 0.0.8. When using flow with the context manager, or using
start()
,build()
will be invoked.- Parameters
copy_flow (
bool
) – when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification- Return type
~FlowLike
- Returns
the current flow (by default)
Note
copy_flow=True
is recommended if you are building the same flow multiple times in a row. e.g.f = Flow() with f: f.index() with f.build(copy_flow=True) as fl: fl.search()
-
start
()[source]¶ Start to run all Pods in this Flow.
Remember to close the Flow with
close()
.Note that this method has a timeout of
timeout_ready
set in CLI, which is inherited all the way fromjina.peapods.peas.BasePea
-
property
num_pods
¶ Get the number of pods in this flow
- Return type
int
-
property
num_peas
¶ Get the number of peas (parallel count) in this flow
- Return type
int
-
plot
(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=False)[source]¶ Visualize the flow up to the current point If a file name is provided it will create a jpg image with that name, otherwise it will display the URL for mermaid. If called within IPython notebook, it will be rendered inline, otherwise an image will be created.
Example,
flow = Flow().add(name='pod_a').plot('flow.svg')
- Parameters
output (
Optional
[str
]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output imagevertical_layout (
bool
) – top-down or left-right layoutinline_display (
bool
) – show image directly inside the Jupyter Notebookbuild (
bool
) – build the flow first before plotting, gateway connection can be better showedcopy_flow (
bool
) – when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification
- Return type
~FlowLike
- Returns
the flow
-
to_swarm_yaml
(path)[source]¶ Generate the docker swarm YAML compose file
- Parameters
path (
TextIO
) – the output yaml path
-
property
port_expose
¶ Return the exposed port of the gateway
- Return type
int
-
property
host
¶ Return the local address of the gateway
- Return type
str
-
property
address_private
¶ Return the private IP address of the gateway for connecting from other machine in the same network
- Return type
str
-
property
address_public
¶ Return the public IP address of the gateway for connecting from other machine in the public network
- Return type
str
-
join
(needs, name='joiner', *args, **kwargs)¶ Add a blocker to the flow, wait until all peas defined in needs completed.
- Parameters
needs (
Union
[Tuple
[str
],List
[str
]]) – list of service names to waitname (
str
) – the name of this joiner, by default isjoiner
- Return type
~FlowLike
- Returns
the modified flow