jina.orchestrate.flow.base module#
- class jina.orchestrate.flow.base.Flow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, return_responses: Optional[bool] = False, tls: Optional[bool] = False, **kwargs)[source]#
- class jina.orchestrate.flow.base.Flow(*, compression: Optional[str] = 'NoCompression', cors: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_disable_reduce: Optional[str] = '[]', description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_kwargs: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[int] = 9090, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
- class jina.orchestrate.flow.base.Flow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)
Bases:
PostMixin
,JAMLCompatible
,ExitStack
Flow is how Jina streamlines and distributes Executors.
- property last_deployment#
Last deployment
- needs(needs, name='joiner', *args, **kwargs)[source]#
Add a blocker to the Flow, wait until all pods defined in needs completed.
- Parameters
needs (
Union
[Tuple
[str
],List
[str
]]) – list of service names to waitname (
str
) – the name of this joiner, by default isjoiner
args – additional positional arguments forwarded to the add function
kwargs – additional key value arguments forwarded to the add function
- Return type
- Returns
the modified Flow
- needs_all(name='joiner', *args, **kwargs)[source]#
Collect all hanging Deployments so far and add a blocker to the Flow; wait until all handing pods completed.
- Parameters
name (
str
) – the name of this joiner (default isjoiner
)args – additional positional arguments which are forwarded to the add and needs function
kwargs – additional key value arguments which are forwarded to the add and needs function
- Return type
- Returns
the modified Flow
- add(*, compression: Optional[str] = 'NoCompression', connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, disable_reduce: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, external: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', install_requirements: Optional[bool] = False, log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = None, native: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[int] = 9090, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, replicas: Optional[int] = 1, runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, when: Optional[dict] = None, workspace: Optional[str] = None, **kwargs) Union['Flow', 'AsyncFlow'] [source]#
Add a Deployment to the current Flow object and return the new modified Flow object. The attribute of the Deployment can be later changed with
set()
or deleted withremove()
- Parameters
needs (
Union
[str
,Tuple
[str
],List
[str
],None
]) – the name of the Deployment(s) that this Deployment receives data from. One can also use ‘gateway’ to indicate the connection with the gateway.deployment_role (
DeploymentRoleType
) – the role of the Deployment, used for visualization and route planningcopy_flow (
bool
) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modificationkwargs – other keyword-value arguments that the Deployment CLI supports
- Return type
- Returns
a (new) Flow object with modification
- inspect(name='inspect', *args, **kwargs)[source]#
Add an inspection on the last changed Deployment in the Flow
Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.
Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow | -- PUB-SUB -- InspectDeployment (Hanging)
In this way,
InspectDeployment
looks like a simple_pass
from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.This function is very handy for introducing an Evaluator into the Flow.
See also
- Parameters
name (
str
) – name of the Deploymentargs – args for .add()
kwargs – kwargs for .add()
- Return type
- Returns
the new instance of the Flow
- gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)[source]#
Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned.
Note
If
--no-inspect
is not given, thengather_inspect()
is auto called beforebuild()
. So in general you don’t need to manually callgather_inspect()
.- Parameters
name (
str
) – the name of the gather Deploymentinclude_last_deployment (
bool
) – if to include the last modified Deployment in the Flowargs – args for .add()
kwargs – kwargs for .add()
- Return type
- Returns
the modified Flow or the copy of it
See also
- build(copy_flow=False)[source]#
Build the current Flow and make it ready to use
Note
No need to manually call it since 0.0.8. When using Flow with the context manager, or using
start()
,build()
will be invoked.- Parameters
copy_flow (
bool
) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification- Return type
- Returns
the current Flow (by default)
Note
copy_flow=True
is recommended if you are building the same Flow multiple times in a row. e.g.f = Flow() with f: f.index() with f.build(copy_flow=True) as fl: fl.search()
- start()[source]#
Start to run all Deployments in this Flow.
Remember to close the Flow with
close()
.Note that this method has a timeout of
timeout_ready
set in CLI, which is inherited all the way fromjina.orchestrate.pods.Pod
- Returns
this instance
- property num_deployments: int#
Get the number of Deployments in this Flow
- Return type
int
- property num_pods: int#
Get the number of pods (shards count) in this Flow
- Return type
int
- property client: BaseClient#
Return a
BaseClient
object attach to this Flow.- Return type
- plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)[source]#
Visualize the Flow up to the current point If a file name is provided it will create a jpg image with that name, otherwise it will display the URL for mermaid. If called within IPython notebook, it will be rendered inline, otherwise an image will be created.
Example,
flow = Flow().add(name='deployment_a').plot('flow.svg')
- Parameters
output (
Optional
[str
]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output imagevertical_layout (
bool
) – top-down or left-right layoutinline_display (
bool
) – show image directly inside the Jupyter Notebookbuild (
bool
) – build the Flow first before plotting, gateway connection can be better showedcopy_flow (
bool
) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification
- Return type
- Returns
the Flow
- property port: int#
Return the exposed port of the gateway .. # noqa: DAR201
- Return type
int
- property host: str#
Return the local address of the gateway .. # noqa: DAR201
- Return type
str
- property monitoring: bool#
Return if the monitoring is enabled .. # noqa: DAR201
- Return type
bool
- property port_monitoring: int#
Return if the monitoring is enabled .. # noqa: DAR201
- Return type
int
- property address_private: str#
Return the private IP address of the gateway for connecting from other machine in the same network
- Return type
str
- property address_public: str#
Return the public IP address of the gateway for connecting from other machine in the public network
- Return type
str
- block(stop_event=None)[source]#
Block the Flow until stop_event is set or user hits KeyboardInterrupt
- Parameters
stop_event (
Union
[Event
,Event
,None
]) – a threading event or a multiprocessing event that onces set will resume the control Flow to main thread.
- property protocol: GatewayProtocolType#
Return the protocol of this Flow
- Return type
- Returns
the protocol of this Flow
- property workspace: str#
Return the workspace path of the flow.
- Return type
str
- property workspace_id: Dict[str, str]#
Get all Deployments’
workspace_id
values in a dict- Return type
Dict
[str
,str
]
- property env: Optional[Dict]#
Get all envs to be set in the Flow
- Return type
Optional
[Dict
]- Returns
envs as dict
- expose_endpoint(exec_endpoint: str, path: Optional[str] = None)[source]#
- expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)
Expose an Executor’s endpoint (defined by @requests(on=…)) to HTTP endpoint for easier access.
After expose, you can send data request directly to http://hostname:port/endpoint.
- Parameters
exec_endpoint (
str
) – the endpoint string, by convention starts with /
# noqa: DAR101 # noqa: DAR102
- join(needs, name='joiner', *args, **kwargs)#
Add a blocker to the Flow, wait until all pods defined in needs completed.
- Parameters
needs (
Union
[Tuple
[str
],List
[str
]]) – list of service names to waitname (
str
) – the name of this joiner, by default isjoiner
args – additional positional arguments forwarded to the add function
kwargs – additional key value arguments forwarded to the add function
- Return type
- Returns
the modified Flow
- to_k8s_yaml(output_base_path, k8s_namespace=None, include_gateway=True)[source]#
Converts the Flow into a set of yaml deployments to deploy in Kubernetes.
If you don’t want to rebuild image on Jina Hub, you can set JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters
output_base_path (
str
) – The base path where to dump all the yaml filesk8s_namespace (
Optional
[str
]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.include_gateway (
bool
) – Defines if the gateway deployment should be included, defaults to True
- to_docker_compose_yaml(output_path=None, network_name=None, include_gateway=True)[source]#
Converts the Flow into a yaml file to run with docker-compose up :type output_path:
Optional
[str
] :param output_path: The output path for the yaml file :type network_name:Optional
[str
] :param network_name: The name of the network that will be used by the deployment name :type include_gateway:bool
:param include_gateway: Defines if the gateway deployment should be included, defaults to True
- property client_args: Namespace#
Get Client settings.
# noqa: DAR201
- Return type
Namespace
- property gateway_args: Namespace#
Get Gateway settings.
# noqa: DAR201
- Return type
Namespace