jina.orchestrate.deployments package#

Module contents#

class jina.orchestrate.deployments.DeploymentType(name, bases, namespace, **kwargs)[source]#

Bases: ABCMeta, JAMLCompatibleType

Type of Deployment, metaclass of Deployment

mro()#

Return a type’s method resolution order.

register(subclass)#

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

class jina.orchestrate.deployments.Deployment(*, compression: Optional[str] = None, connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, env_from_secret: Optional[dict] = None, exit_on_exceptions: Optional[List[str]] = [], external: Optional[bool] = False, floating: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, grpc_channel_options: Optional[dict] = None, grpc_metadata: Optional[dict] = None, grpc_server_options: Optional[dict] = None, host: Optional[List[str]] = ['0.0.0.0'], install_requirements: Optional[bool] = False, log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'executor', native: Optional[bool] = False, no_reduce: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[int] = None, prefer_platform: Optional[str] = None, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, tls: Optional[bool] = False, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type[BaseExecutor], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type[BaseExecutor], dict]] = None, uses_before_address: Optional[str] = None, uses_dynamic_batching: Optional[dict] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, when: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)[source]#

Bases: JAMLCompatible, PostMixin, BaseOrchestrator

A Deployment is an immutable set of pods which run in replicas. They share the same input and output socket. Internally, the pods can run with the process/thread backend, or each in its own container.

Parameters:
  • args (Union[Namespace, Dict, None]) – arguments parsed from the CLI

  • needs (Optional[Set[str]]) – names of the preceding deployments; the output of these deployments is fed into the input of this deployment

needs#

Used by jina.flow.Flow to build the graph.
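
A minimal usage sketch; the Executor below, the port and the replica count are illustrative assumptions, not part of this API reference:

from docarray import Document, DocumentArray
from jina import Deployment, Executor, requests

class MyExecutor(Executor):
    @requests
    def foo(self, docs: DocumentArray, **kwargs):
        for doc in docs:
            doc.text = 'processed'

# run the Deployment as a context manager: start the Pods, serve, then close them
with Deployment(uses=MyExecutor, replicas=2, port=12345) as dep:
    responses = dep.post(on='/', inputs=DocumentArray([Document(text='hi')]))
    print(responses.texts)  # -> ['processed']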

update_pod_args()[source]#

Update the args of all its pods based on the Deployment args, including head and tail.

update_sandbox_args()[source]#

Update args of all its pods based on the host and port returned by Hubble

update_worker_pod_args()[source]#

Update the args of all its worker pods based on the Deployment args; does not touch the head or tail.

property is_sandbox: bool#

Check if this deployment is a sandbox.

Return type:

bool

Returns:

True if this deployment is provided as a sandbox, False otherwise

property role: DeploymentRoleType#

Return the role of this Deployment.

Return type:

DeploymentRoleType

property name: str#

The name of this Deployment.

Return type:

str

property head_host: str#

Get the host of the HeadPod of this deployment.

Return type:

str

property head_port#

Get the port of the HeadPod of this deployment.

property head_port_monitoring#

Get the port_monitoring of the HeadPod of this deployment.

property client: BaseClient#

Return a BaseClient object attached to this Deployment.

Return type:

BaseClient

property deployments: List[Dict]#

Get the deployments of this Deployment. A single Deployment yields exactly one deployment entry.

Return type:

List[Dict]

Returns:

list of deployments

property tls_enabled#

Checks whether a secure connection via TLS is enabled for this Deployment.

Returns:

True if tls is enabled, False otherwise

property external: bool#

Check if this deployment is external.

Return type:

bool

Returns:

True if this deployment is provided as an external deployment, False otherwise

property grpc_metadata#

Get the gRPC metadata for this deployment.

Returns:

The gRPC metadata for this deployment. If the deployment is a gateway, return None.

property protocol#

Returns:

the protocol of this deployment

property first_pod_args: Namespace#

Return the first worker pod’s args

Return type:

Namespace

property host: str#

Get the host name of this deployment

Return type:

str

property port#

Returns:

the port of this deployment

property ports: List[int]#

Returns a list of ports exposed by this Deployment. Exposed means these are the ports a Client/Gateway is supposed to communicate with. For sharded deployments this will be the head_port. For non-sharded deployments it will be all replica ports.

Return type:

List[int]

property hosts: List[str]#

Returns a list of host addresses exposed by this Deployment. Exposed means these are the hosts a Client/Gateway is supposed to communicate with. For sharded deployments this will be the head host. For non-sharded deployments it will be all replica hosts.

Return type:

List[str]

property dockerized_uses: bool#

Checks if this Deployment uses a dockerized Executor

Return type:

bool

property head_args: Namespace#

Get the arguments for the head of this Deployment.

Return type:

Namespace

property uses_before_args: Namespace#

Get the arguments for the uses_before of this Deployment.

Return type:

Namespace

property uses_after_args: Namespace#

Get the arguments for the uses_after of this Deployment.

Return type:

Namespace

property all_args: List[Namespace]#

Get all arguments of all Pods in this Deployment.

Return type:

List[Namespace]

property num_pods: int#

Get the number of running Pods.

Return type:

int

static get_worker_host(pod_args, pod_is_container, head_is_container)[source]#

Check if the current pod and head are both containerized on the same host. If so, __docker_host__ needs to be advertised as the worker’s address to the head.

Parameters:
  • pod_args – arguments of the worker pod

  • pod_is_container – boolean specifying if pod is to be run in container

  • head_is_container – boolean specifying if head pod is to be run in container

Returns:

host to pass in connection list of the head

start()[source]#

Start all Pods in this Deployment.

Return type:

Deployment

Returns:

started deployment

Note

If one of the Pods fails to start, make sure that all of them are properly closed.
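
A sketch of the manual lifecycle behind the context-manager form; the Executor name is a placeholder, and in practice the with-statement shown earlier performs these steps for you:

from jina import Deployment

dep = Deployment(uses='MyExecutor')   # 'MyExecutor' is a placeholder Executor
dep.start()                   # start all Pods in this Deployment
try:
    dep.wait_start_success()  # block until every Pod reports ready
    # ... send traffic, e.g. dep.post(on='/', inputs=...)
finally:
    dep.close()               # always unwind, even if a Pod failed to start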

wait_start_success()[source]#

Block until all pods start successfully.

If not successful, it raises an error that the outer function is expected to catch.

Return type:

None

async async_wait_start_success()[source]#

Block until all pods start successfully.

If not successful, it raises an error that the outer function is expected to catch.

Return type:

None

join()[source]#

Wait until all pods exit

property is_ready: bool#

Checks if Deployment is ready

Note

A Deployment is ready when all the Pods it contains are ready

Return type:

bool

block(stop_event=None)[source]#

Block the Deployment until stop_event is set or the user hits KeyboardInterrupt.

Parameters:

stop_event (Union[Event, Event, None]) – a threading event or a multiprocessing event that, once set, will return control flow to the main thread.
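
A hedged sketch of serving a Deployment until an external event is set; the Executor name and the event wiring are assumptions:

import multiprocessing

from jina import Deployment

stop_event = multiprocessing.Event()

with Deployment(uses='MyExecutor') as dep:   # placeholder Executor
    # serve until another process calls stop_event.set(),
    # or until the user hits Ctrl-C (KeyboardInterrupt)
    dep.block(stop_event=stop_event)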

property address_private: str#

Return the private IP address of the gateway for connecting from other machines in the same network.

Return type:

str

property address_public: str#

Return the public IP address of the gateway for connecting from other machines on the public network.

Return type:

str

callback(**kwds)#

Registers an arbitrary callback and arguments.

Cannot suppress exceptions.

close()#

Immediately unwind the context stack.

delete(inputs: ~typing.Optional[InputType] = None, on_done: ~typing.Optional[CallbackFnType] = None, on_error: ~typing.Optional[CallbackFnType] = None, on_always: ~typing.Optional[CallbackFnType] = None, parameters: ~typing.Optional[~typing.Dict] = None, target_executor: ~typing.Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 2, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, prefetch: ~typing.Optional[int] = None, return_type: ~typing.Type[~docarray.array.document.DocumentArray] = <class 'docarray.array.document.DocumentArray'>, **kwargs) Optional[Union[DocumentArray, List[Response]]]#

enter_context(cm)#

Enters the supplied context manager.

If successful, also pushes its __exit__ method as a callback and returns the result of the __enter__ method.

index(inputs: ~typing.Optional[InputType] = None, on_done: ~typing.Optional[CallbackFnType] = None, on_error: ~typing.Optional[CallbackFnType] = None, on_always: ~typing.Optional[CallbackFnType] = None, parameters: ~typing.Optional[~typing.Dict] = None, target_executor: ~typing.Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 2, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, prefetch: ~typing.Optional[int] = None, return_type: ~typing.Type[~docarray.array.document.DocumentArray] = <class 'docarray.array.document.DocumentArray'>, **kwargs) Optional[Union[DocumentArray, List[Response]]]#

static is_valid_jaml(obj)#

Verifies the YAML syntax of a given object by first serializing it, then attempting to deserialize it to catch parser errors.

Parameters:

obj (Dict) – yaml object

Return type:

bool

Returns:

whether the syntax is valid or not

classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, uses_dynamic_batching=None, needs=None, include_gateway=True, noblock_on_start=False, **kwargs)#

A high-level interface for loading configuration, with support for loading extra py_modules and substituting environment and context variables. Any class that implements the JAMLCompatible mixin can use this feature, e.g. BaseFlow, BaseExecutor, BaseGateway and all their subclasses.

Support substitutions in YAML:
  • Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).

  • Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.

  • Internal reference via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}

Substitutions are carried out in order and in multiple passes to resolve variables on a best-effort basis.

!BaseEncoder
metas:
    name: ${{VAR_A}}  # env or context variables
    workspace: my-${{this.name}}  # internal reference

import os

# load Executor from yaml file
BaseExecutor.load_config('a.yml')

# load Executor from yaml file and substitute environment variables
os.environ['VAR_A'] = 'hello-world'
b = BaseExecutor.load_config('a.yml')
assert b.name == 'hello-world'

# load Executor from yaml file and substitute variables from a dict
b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
assert b.name == 'hello-world'

# disable substitute
b = BaseExecutor.load_config('a.yml', substitute=False)

Parameters:
  • source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.

  • allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any levels

  • substitute (bool) – substitute environment, internal reference and context variables.

  • context (Optional[Dict[str, Any]]) – context replacement variables in a dict, the value of the dict is the replacement.

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor yaml files

  • py_modules (Optional[str]) – Optional py_module from which the object needs to be loaded

  • runtime_args (Optional[Dict[str, Any]]) – Optional dictionary of parameters runtime_args to be directly passed without being parsed into a yaml config

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • needs (Optional[Set[str]]) – the name of the Deployment(s) that this Deployment receives data from. One can also use “gateway” to indicate the connection with the gateway.

  • include_gateway (bool) – Defines if the gateway deployment should be included, defaults to True

  • noblock_on_start (bool) – If set, starting a Pod/Deployment does not block the thread/process. It then relies on wait_start_success at the outer function for the postponed check.

  • kwargs – kwargs for parse_config_source

Return type:

JAMLCompatible

Returns:

JAMLCompatible object
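
The same interface works for a Deployment YAML. A small sketch, assuming a file deployment.yml that defines jtype: Deployment with a placeholder Executor and an optional with-field override:

from jina import Deployment

# deployment.yml (assumed content):
#   jtype: Deployment
#   with:
#     uses: MyExecutor
#     replicas: 2

dep = Deployment.load_config(
    'deployment.yml',
    uses_with={'param': 'value'},        # overrides the Executor's with-field
    context={'VAR_A': 'hello-world'},    # fills ${{ VAR_A }} style placeholders
)
with dep:
    dep.block()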

pop_all()#

Preserve the context stack by transferring it to a new instance.

post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, max_attempts=1, initial_backoff=0.5, max_backoff=2, backoff_multiplier=1.5, results_in_order=False, stream=True, prefetch=None, return_type=<class 'docarray.array.document.DocumentArray'>, **kwargs)#

Post a general data request to the Flow.

Parameters:
  • inputs (Optional[InputType]) – input data which can be an Iterable, a function which returns an Iterable, or a single Document.

  • on (str) – the endpoint which is invoked. All the functions in the executors decorated by @requests(on=…) with the same endpoint are invoked.

  • on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.

  • on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.

  • on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.

  • parameters (Optional[Dict]) – the kwargs that will be sent to the executor

  • target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.

  • request_size (int) – the number of Documents per request. <=0 means all inputs in one request.

  • show_progress (bool) – if set, client will show a progress bar on receiving every request.

  • continue_on_error (bool) – if set, a Request that causes an error will only be logged, without blocking further requests.

  • return_responses (bool) – if set to True, the result will come as Response and not as a DocumentArray

  • max_attempts (int) – Number of sending attempts, including the original request.

  • initial_backoff (float) – The first retry will happen with a delay of random(0, initial_backoff)

  • max_backoff (float) – The maximum accepted backoff after the exponential incremental delay

  • backoff_multiplier (float) – The n-th attempt will occur at random(0, min(initialBackoff*backoffMultiplier**(n-1), maxBackoff))

  • results_in_order (bool) – return the results in the same order as the inputs

  • stream (bool) – Applicable only to the gRPC client. If True, the requests are sent to the target using the gRPC streaming interface; otherwise the gRPC unary interface will be used. The value is True by default.

  • prefetch (Optional[int]) – How many Requests are processed from the Client at the same time. If not provided, the Gateway prefetch value will be used.

  • return_type (Type[DocumentArray]) – the DocumentArray type to be returned. By default, it is DocumentArray.

  • kwargs – additional parameters

Return type:

Union[DocumentArray, List[Response], None]

Returns:

None or DocumentArray containing all response Documents

Warning

target_executor uses re.match for checking if the pattern is matched. target_executor=='foo' will match both deployments with the name foo and foo_what_ever_suffix.
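
A short sketch of posting with callbacks and a target_executor filter; the Executor, the deployment name and the '/index' endpoint are placeholders:

from docarray import Document, DocumentArray
from jina import Deployment

def print_result(resp):
    # resp is a Response object; its .docs holds the returned Documents
    print(f'received {len(resp.docs)} documents')

def print_failure(resp):
    print('request failed')

with Deployment(uses='MyExecutor', name='foo') as dep:   # placeholder Executor
    dep.post(
        on='/index',
        inputs=DocumentArray([Document(text='hello')]),
        target_executor='foo',    # matched with re.match against deployment names
        request_size=50,
        on_done=print_result,
        on_error=print_failure,
    )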

push(exit)#

Registers a callback with the standard __exit__ method signature.

Can suppress exceptions the same way __exit__ method can. Also accepts any object with an __exit__ method (registering a call to the method instead of the object itself).

save_config(filename=None)#

Save the object’s config into a YAML file.

Parameters:

filename (Optional[str]) – file path of the yaml file, if not given then config_abspath is used

search(inputs: ~typing.Optional[InputType] = None, on_done: ~typing.Optional[CallbackFnType] = None, on_error: ~typing.Optional[CallbackFnType] = None, on_always: ~typing.Optional[CallbackFnType] = None, parameters: ~typing.Optional[~typing.Dict] = None, target_executor: ~typing.Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 2, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, prefetch: ~typing.Optional[int] = None, return_type: ~typing.Type[~docarray.array.document.DocumentArray] = <class 'docarray.array.document.DocumentArray'>, **kwargs) Optional[Union[DocumentArray, List[Response]]]#

to_docker_compose_yaml(output_path=None, network_name=None)[source]#

Converts a Jina Deployment into a Docker Compose YAML file.

If you don’t want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_path (Optional[str]) – The path where to dump the yaml file

  • network_name (Optional[str]) – The name of the network that will be used by the deployment
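
A minimal sketch; the image URI, output file name and network name are placeholders:

from jina import Deployment

dep = Deployment(uses='docker://my-image:latest', replicas=2)  # placeholder containerized Executor
dep.to_docker_compose_yaml(
    output_path='docker-compose.yml',   # where the Compose file is written
    network_name='my-network',          # optional custom Docker network
)
# then run: docker compose -f docker-compose.yml up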

update(inputs: ~typing.Optional[InputType] = None, on_done: ~typing.Optional[CallbackFnType] = None, on_error: ~typing.Optional[CallbackFnType] = None, on_always: ~typing.Optional[CallbackFnType] = None, parameters: ~typing.Optional[~typing.Dict] = None, target_executor: ~typing.Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 2, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, prefetch: ~typing.Optional[int] = None, return_type: ~typing.Type[~docarray.array.document.DocumentArray] = <class 'docarray.array.document.DocumentArray'>, **kwargs) Optional[Union[DocumentArray, List[Response]]]#

to_kubernetes_yaml(output_base_path, k8s_namespace=None)[source]#

Converts a Jina Deployment into a set of YAML configurations to deploy in Kubernetes.

If you don’t want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

to_k8s_yaml(output_base_path, k8s_namespace=None)#

Converts a Jina Deployment into a set of YAML configurations to deploy in Kubernetes.

If you don’t want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.
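
A minimal sketch for to_kubernetes_yaml (or its alias to_k8s_yaml); the image URI, output path and namespace are placeholders:

from jina import Deployment

dep = Deployment(uses='docker://my-image:latest', replicas=3)  # placeholder containerized Executor
dep.to_kubernetes_yaml(
    output_base_path='./k8s',        # YAML configurations are dumped under this folder
    k8s_namespace='my-namespace',    # optional; if None, the Flow name is used
)
# then apply: kubectl apply -R -f ./k8s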