jina.orchestrate.flow.base module#

class jina.orchestrate.flow.base.Flow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, return_responses: Optional[bool] = False, tls: Optional[bool] = False, **kwargs)[source]#
class jina.orchestrate.flow.base.Flow(*, compression: Optional[str] = 'NoCompression', cors: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_disable_reduce: Optional[str] = '[]', description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_kwargs: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[int] = 9090, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
class jina.orchestrate.flow.base.Flow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)

Bases: PostMixin, JAMLCompatible, ExitStack

Flow is how Jina streamlines and distributes Executors.

property last_deployment#

Last deployment

needs(needs, name='joiner', *args, **kwargs)[source]#

Add a blocker to the Flow, wait until all pods defined in needs completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

Flow

Returns

the modified Flow

needs_all(name='joiner', *args, **kwargs)[source]#

Collect all hanging Deployments so far and add a blocker to the Flow; wait until all handing pods completed.

Parameters
  • name (str) – the name of this joiner (default is joiner)

  • args – additional positional arguments which are forwarded to the add and needs function

  • kwargs – additional key value arguments which are forwarded to the add and needs function

Return type

Flow

Returns

the modified Flow

add(*, compression: Optional[str] = 'NoCompression', connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, disable_reduce: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, external: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', install_requirements: Optional[bool] = False, log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = None, native: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[int] = 9090, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, replicas: Optional[int] = 1, runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, when: Optional[dict] = None, workspace: Optional[str] = None, **kwargs) Union['Flow', 'AsyncFlow'][source]#

Add a Deployment to the current Flow object and return the new modified Flow object. The attribute of the Deployment can be later changed with set() or deleted with remove()

Parameters
  • needs (Union[str, Tuple[str], List[str], None]) – the name of the Deployment(s) that this Deployment receives data from. One can also use ‘gateway’ to indicate the connection with the gateway.

  • deployment_role (DeploymentRoleType) – the role of the Deployment, used for visualization and route planning

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

  • kwargs – other keyword-value arguments that the Deployment CLI supports

Return type

Flow

Returns

a (new) Flow object with modification

inspect(name='inspect', *args, **kwargs)[source]#

Add an inspection on the last changed Deployment in the Flow

Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
        |
        -- PUB-SUB -- InspectDeployment (Hanging)

In this way, InspectDeployment looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.

See also

gather_inspect()

Parameters
  • name (str) – name of the Deployment

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

Flow

Returns

the new instance of the Flow

gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)[source]#

Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned.

Note

If --no-inspect is not given, then gather_inspect() is auto called before build(). So in general you don’t need to manually call gather_inspect().

Parameters
  • name (str) – the name of the gather Deployment

  • include_last_deployment (bool) – if to include the last modified Deployment in the Flow

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

Flow

Returns

the modified Flow or the copy of it

See also

inspect()

build(copy_flow=False)[source]#

Build the current Flow and make it ready to use

Note

No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.

Parameters

copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

Flow

Returns

the current Flow (by default)

Note

copy_flow=True is recommended if you are building the same Flow multiple times in a row. e.g.

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()
start()[source]#

Start to run all Deployments in this Flow.

Remember to close the Flow with close().

Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.orchestrate.pods.Pod

Returns

this instance

property num_deployments: int#

Get the number of Deployments in this Flow

Return type

int

property num_pods: int#

Get the number of pods (shards count) in this Flow

Return type

int

property client: BaseClient#

Return a BaseClient object attach to this Flow.

Return type

BaseClient

plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)[source]#

Visualize the Flow up to the current point If a file name is provided it will create a jpg image with that name, otherwise it will display the URL for mermaid. If called within IPython notebook, it will be rendered inline, otherwise an image will be created.

Example,

flow = Flow().add(name='deployment_a').plot('flow.svg')
Parameters
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – top-down or left-right layout

  • inline_display (bool) – show image directly inside the Jupyter Notebook

  • build (bool) – build the Flow first before plotting, gateway connection can be better showed

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

Flow

Returns

the Flow

property port: int#

Return the exposed port of the gateway .. # noqa: DAR201

Return type

int

property host: str#

Return the local address of the gateway .. # noqa: DAR201

Return type

str

property monitoring: bool#

Return if the monitoring is enabled .. # noqa: DAR201

Return type

bool

property port_monitoring: int#

Return if the monitoring is enabled .. # noqa: DAR201

Return type

int

property address_private: str#

Return the private IP address of the gateway for connecting from other machine in the same network

Return type

str

property address_public: str#

Return the public IP address of the gateway for connecting from other machine in the public network

Return type

str

block(stop_event=None)[source]#

Block the Flow until stop_event is set or user hits KeyboardInterrupt

Parameters

stop_event (Union[Event, Event, None]) – a threading event or a multiprocessing event that onces set will resume the control Flow to main thread.

property protocol: GatewayProtocolType#

Return the protocol of this Flow

Return type

GatewayProtocolType

Returns

the protocol of this Flow

property workspace: str#

Return the workspace path of the flow.

Return type

str

property workspace_id: Dict[str, str]#

Get all Deployments’ workspace_id values in a dict

Return type

Dict[str, str]

property env: Optional[Dict]#

Get all envs to be set in the Flow

Return type

Optional[Dict]

Returns

envs as dict

expose_endpoint(exec_endpoint: str, path: Optional[str] = None)[source]#
expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)

Expose an Executor’s endpoint (defined by @requests(on=…)) to HTTP endpoint for easier access.

After expose, you can send data request directly to http://hostname:port/endpoint.

Parameters

exec_endpoint (str) – the endpoint string, by convention starts with /

# noqa: DAR101 # noqa: DAR102

join(needs, name='joiner', *args, **kwargs)#

Add a blocker to the Flow, wait until all pods defined in needs completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

Flow

Returns

the modified Flow

to_k8s_yaml(output_base_path, k8s_namespace=None, include_gateway=True)[source]#

Converts the Flow into a set of yaml deployments to deploy in Kubernetes.

If you don’t want to rebuild image on Jina Hub, you can set JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • include_gateway (bool) – Defines if the gateway deployment should be included, defaults to True

to_docker_compose_yaml(output_path=None, network_name=None, include_gateway=True)[source]#

Converts the Flow into a yaml file to run with docker-compose up :type output_path: Optional[str] :param output_path: The output path for the yaml file :type network_name: Optional[str] :param network_name: The name of the network that will be used by the deployment name :type include_gateway: bool :param include_gateway: Defines if the gateway deployment should be included, defaults to True

property client_args: Namespace#

Get Client settings.

# noqa: DAR201

Return type

Namespace

property gateway_args: Namespace#

Get Gateway settings.

# noqa: DAR201

Return type

Namespace

update_network_interface(**kwargs)[source]#

Update the network interface of this Flow (affects Gateway & Client)

Parameters

kwargs – new network settings