jina.orchestrate.deployments package#
Subpackages#
Submodules#
Module contents#
- class jina.orchestrate.deployments.DeploymentType(name, bases, namespace, **kwargs)[source]#
Bases: ABCMeta, JAMLCompatibleType
Type of Deployment, metaclass of Deployment
- mro()#
Return a type’s method resolution order.
- register(subclass)#
Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
- class jina.orchestrate.deployments.Deployment(*, compression: Optional[str] = None, connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, env_from_secret: Optional[dict] = None, exit_on_exceptions: Optional[List[str]] = [], external: Optional[bool] = False, floating: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, grpc_channel_options: Optional[dict] = None, grpc_metadata: Optional[dict] = None, grpc_server_options: Optional[dict] = None, host: Optional[List[str]] = ['0.0.0.0'], install_requirements: Optional[bool] = False, log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'executor', native: Optional[bool] = False, no_reduce: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[int] = None, prefer_platform: Optional[str] = None, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, tls: Optional[bool] = False, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type[BaseExecutor], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type[BaseExecutor], dict]] = None, uses_before_address: Optional[str] = None, 
uses_dynamic_batching: Optional[dict] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, when: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)[source]#
Bases: JAMLCompatible, PostMixin, BaseOrchestrator
A Deployment is an immutable set of pods that run in replicas. They share the same input and output socket. Internally, the pods can run with the process/thread backend, or in their own containers.
- Parameters:
args (Union[Namespace, Dict, None]) – arguments parsed from the CLI
needs (Optional[Set[str]]) – names of the preceding Deployments; the output of these Deployments goes into the input of this Deployment
- needs#
used in jina.flow.Flow to build the graph
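A minimal sketch of how such a Deployment might be configured in YAML (illustrative values only; `MyExecutor` and `my_module.py` are hypothetical, and this assumes Deployment YAML follows the same `jtype` convention as Flow YAML):

```yaml
jtype: Deployment
with:
  uses: MyExecutor          # hypothetical Executor class
  py_modules:
    - my_module.py          # hypothetical module defining MyExecutor
  name: executor
  replicas: 2               # parallel copies sharing the same input/output socket
  shards: 1
  polling: ANY
  port: 12345
```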
- update_pod_args()[source]#
Update args of all its pods based on Deployment args. Including head/tail
- update_sandbox_args()[source]#
Update args of all its pods based on the host and port returned by Hubble
- update_worker_pod_args()[source]#
Update args of all its worker pods based on Deployment args. Does not touch head and tail
- property is_sandbox: bool#
Check if this deployment is a sandbox.
- Return type:
bool
- Returns:
True if this deployment is provided as a sandbox, False otherwise
- property role: DeploymentRoleType#
Return the role of this Deployment.
- Return type:
DeploymentRoleType
- property name: str#
The name of this Deployment.
- Return type:
str
- property head_host: str#
Get the host of the HeadPod of this deployment.
- Return type:
str
- property head_port#
Get the port of the HeadPod of this deployment.
- property head_port_monitoring#
Get the port_monitoring of the HeadPod of this deployment.
- property client: BaseClient#
Return a BaseClient object attached to this Flow.
- Return type:
BaseClient
- property deployments: List[Dict]#
Get the deployments of this Deployment. A Deployment yields just one deployment.
- Return type:
List[Dict]
- Returns:
list of deployments
- property tls_enabled#
Checks whether a secure connection via TLS is enabled for this Deployment.
- Returns:
True if TLS is enabled, False otherwise
- property external: bool#
Check if this deployment is external.
- Return type:
bool
- Returns:
True if this deployment is provided as an external deployment, False otherwise
- property grpc_metadata#
Get the gRPC metadata for this deployment.
- Returns:
The gRPC metadata for this deployment. If the deployment is a gateway, return None.
- property protocol#
- Returns:
the protocol of this deployment
- property first_pod_args: Namespace#
Return the first worker pod’s args
- Return type:
Namespace
- property host: str#
Get the host name of this deployment
- Return type:
str
- property port#
- Returns:
the port of this deployment
- property ports: List[int]#
Returns a list of ports exposed by this Deployment. Exposed means these are the ports a Client/Gateway is supposed to communicate with. For sharded deployments this will be the head_port. For non-sharded deployments it will be all replica ports.
- Return type:
List[int]
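The exposure rule described above can be sketched as follows (illustrative only, not Jina's implementation; `shards`, `head_port` and `replica_ports` are hypothetical inputs):

```python
from typing import List

def exposed_ports(shards: int, head_port: int, replica_ports: List[int]) -> List[int]:
    """Illustrative: sharded deployments expose only the head port,
    non-sharded deployments expose every replica port."""
    if shards > 1:
        return [head_port]
    return list(replica_ports)

print(exposed_ports(3, 8080, [9001, 9002, 9003]))  # [8080]
print(exposed_ports(1, 8080, [9001, 9002]))        # [9001, 9002]
```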
- property hosts: List[str]#
Returns a list of host addresses exposed by this Deployment. Exposed means these are the hosts a Client/Gateway is supposed to communicate with. For sharded deployments this will be the head host. For non-sharded deployments it will be all replica hosts.
- Return type:
List[str]
- property dockerized_uses: bool#
Checks if this Deployment uses a dockerized Executor
- Return type:
bool
- property head_args: Namespace#
Get the arguments for the head of this Deployment.
- Return type:
Namespace
- property uses_before_args: Namespace#
Get the arguments for the uses_before of this Deployment.
- Return type:
Namespace
- property uses_after_args: Namespace#
Get the arguments for the uses_after of this Deployment.
- Return type:
Namespace
- property all_args: List[Namespace]#
Get all arguments of all Pods in this Deployment.
- Return type:
List[Namespace]
- property num_pods: int#
Get the number of running Pods in this Deployment.
- Return type:
int
- static get_worker_host(pod_args, pod_is_container, head_is_container)[source]#
Check if the current pod and head are both containerized on the same host. If so, __docker_host__ needs to be advertised as the worker’s address to the head.
- Parameters:
pod_args – arguments of the worker pod
pod_is_container – boolean specifying if pod is to be run in container
head_is_container – boolean specifying if head pod is to be run in container
- Returns:
host to pass in connection list of the head
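The decision described above can be sketched roughly as follows (illustrative only, not Jina's actual code; the localhost check is an assumption):

```python
DOCKER_HOST = '__docker_host__'  # sentinel hostname resolvable from inside containers

def worker_host(pod_host: str, pod_is_container: bool, head_is_container: bool) -> str:
    """Illustrative sketch: when both the worker pod and the head run in
    containers on the local host, the worker must be advertised to the head
    via the special Docker host alias instead of a localhost address."""
    local = pod_host in ('0.0.0.0', '127.0.0.1', 'localhost')
    if pod_is_container and head_is_container and local:
        return DOCKER_HOST
    return pod_host

print(worker_host('0.0.0.0', True, True))   # __docker_host__
print(worker_host('1.2.3.4', True, True))   # 1.2.3.4
```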
- start()[source]#
Start to run all Pods in this Deployment.
- Return type:
Deployment
- Returns:
started deployment
Note
If one of the Pods fails to start, make sure that all of them are properly closed.
- wait_start_success()[source]#
Block until all pods start successfully.
If not successful, an error is raised, which the caller is expected to catch.
- Return type:
None
- async async_wait_start_success()[source]#
Block until all pods start successfully.
If not successful, an error is raised, which the caller is expected to catch.
- Return type:
None
- property is_ready: bool#
Checks if Deployment is ready
Note
A Deployment is ready when all the Pods it contains are ready
- Return type:
bool
- block(stop_event=None)[source]#
Block the Deployment until stop_event is set or user hits KeyboardInterrupt
- Parameters:
stop_event (Union[threading.Event, multiprocessing.Event, None]) – a threading or multiprocessing event that, once set, resumes control flow to the main thread.
- property address_private: str#
Return the private IP address of the gateway for connecting from other machines in the same network
- Return type:
str
- property address_public: str#
Return the public IP address of the gateway for connecting from other machines in the public network
- Return type:
str
- callback(**kwds)#
Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
- close()#
Immediately unwind the context stack.
- delete(inputs: ~typing.Optional[InputType] = None, on_done: ~typing.Optional[CallbackFnType] = None, on_error: ~typing.Optional[CallbackFnType] = None, on_always: ~typing.Optional[CallbackFnType] = None, parameters: ~typing.Optional[~typing.Dict] = None, target_executor: ~typing.Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 2, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, prefetch: ~typing.Optional[int] = None, return_type: ~typing.Type[~docarray.array.document.DocumentArray] = <class 'docarray.array.document.DocumentArray'>, **kwargs) Optional[Union[DocumentArray, List[Response]]] #
- enter_context(cm)#
Enters the supplied context manager.
If successful, also pushes its __exit__ method as a callback and returns the result of the __enter__ method.
- index(inputs: ~typing.Optional[InputType] = None, on_done: ~typing.Optional[CallbackFnType] = None, on_error: ~typing.Optional[CallbackFnType] = None, on_always: ~typing.Optional[CallbackFnType] = None, parameters: ~typing.Optional[~typing.Dict] = None, target_executor: ~typing.Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 2, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, prefetch: ~typing.Optional[int] = None, return_type: ~typing.Type[~docarray.array.document.DocumentArray] = <class 'docarray.array.document.DocumentArray'>, **kwargs) Optional[Union[DocumentArray, List[Response]]] #
- static is_valid_jaml(obj)#
Verifies the YAML syntax of a given object by first serializing it, then attempting to deserialize it and catch parser errors.
- Parameters:
obj (Dict) – yaml object
- Return type:
bool
- Returns:
whether the syntax is valid or not
- classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, uses_dynamic_batching=None, needs=None, include_gateway=True, noblock_on_start=False, **kwargs)#
A high-level interface for loading configuration, with support for loading extra py_modules and substituting environment & context variables. Any class that implements the JAMLCompatible mixin can use this feature, e.g. BaseFlow, BaseExecutor, BaseGateway and all their subclasses.
- Supported substitutions in YAML:
Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).
Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.
Internal reference via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}
Substitutions are carried out in order and over multiple passes, to resolve variables on a best-effort basis.
```yaml
!BaseEncoder
metas:
  name: ${{VAR_A}}  # env or context variables
  workspace: my-${{this.name}}  # internal reference
```

```python
# load Executor from yaml file
BaseExecutor.load_config('a.yml')

# load Executor from yaml file and substitute environment variables
os.environ['VAR_A'] = 'hello-world'
b = BaseExecutor.load_config('a.yml')
assert b.name == 'hello-world'

# load Executor from yaml file and substitute variables from a dict
b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
assert b.name == 'hello-world'

# disable substitute
b = BaseExecutor.load_config('a.yml', substitute=False)
```
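The two recommended substitution forms can be sketched with a small stand-alone resolver (illustrative of the documented syntax only, not Jina's implementation):

```python
import os
import re

def substitute(value: str, context: dict) -> str:
    """Resolve ${{ ENV.VAR }} from os.environ and ${{ CONTEXT.VAR }}
    from the given context dict; unresolved references are left intact."""
    def repl(m):
        scope, key = m.group(1), m.group(2)
        source = os.environ if scope == 'ENV' else context
        return str(source.get(key, m.group(0)))
    return re.sub(r'\$\{\{\s*(ENV|CONTEXT)\.(\w+)\s*\}\}', repl, value)

os.environ['VAR_A'] = 'hello-world'
print(substitute('name: ${{ ENV.VAR_A }}', {}))              # name: hello-world
print(substitute('tag: ${{ CONTEXT.TAG }}', {'TAG': 'v1'}))  # tag: v1
```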
- Parameters:
source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.
allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any level
substitute (bool) – substitute environment, internal-reference and context variables.
context (Optional[Dict[str, Any]]) – context replacement variables in a dict; the value of the dict is the replacement.
uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field
uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field
uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field
extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor yaml files
py_modules (Optional[str]) – optional py_module from which the object needs to be loaded
runtime_args (Optional[Dict[str, Any]]) – optional dictionary of runtime_args parameters to be passed directly, without being parsed into a yaml config
uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field
needs (Optional[Set[str]]) – the name of the Deployment(s) that this Deployment receives data from. One can also use “gateway” to indicate the connection with the gateway.
include_gateway (bool) – defines if the gateway deployment should be included, defaults to True
noblock_on_start (bool) – if set, starting a Pod/Deployment does not block the thread/process; it then relies on wait_start_success at the outer function for the postponed check.
kwargs – kwargs for parse_config_source
- Return type:
JAMLCompatible
- Returns:
JAMLCompatible object
- pop_all()#
Preserve the context stack by transferring it to a new instance.
- post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, max_attempts=1, initial_backoff=0.5, max_backoff=2, backoff_multiplier=1.5, results_in_order=False, stream=True, prefetch=None, return_type=<class 'docarray.array.document.DocumentArray'>, **kwargs)#
Post a general data request to the Flow.
- Parameters:
inputs (Optional[InputType]) – input data, which can be an Iterable, a function that returns an Iterable, or a single Document.
on (str) – the endpoint that is invoked. All the functions in the executors decorated by @requests(on=…) with the same endpoint are invoked.
on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.
on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.
on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.
parameters (Optional[Dict]) – the kwargs that will be sent to the executor
target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.
request_size (int) – the number of Documents per request. <=0 means all inputs in one request.
show_progress (bool) – if set, the client will show a progress bar on receiving every request.
continue_on_error (bool) – if set, a Request that causes an error will only be logged, without blocking further requests.
return_responses (bool) – if set to True, the result will come as a Response and not as a DocumentArray
max_attempts (int) – number of sending attempts, including the original request.
initial_backoff (float) – the first retry will happen with a delay of random(0, initial_backoff)
max_backoff (float) – the maximum accepted backoff after the exponential incremental delay
backoff_multiplier (float) – the n-th attempt will occur at random(0, min(initial_backoff*backoff_multiplier**(n-1), max_backoff))
results_in_order (bool) – return the results in the same order as the inputs
stream (bool) – applicable only to the grpc client. If True, requests are sent to the target using the gRPC streaming interface; otherwise the gRPC unary interface is used. The value is True by default.
prefetch (Optional[int]) – how many Requests are processed from the Client at the same time. If not provided, the Gateway prefetch value will be used.
return_type (Type[DocumentArray]) – the DocumentArray type to be returned. By default, it is DocumentArray.
kwargs – additional parameters
- Return type:
Union[DocumentArray, List[Response], None]
- Returns:
None or a DocumentArray containing all response Documents
Warning
target_executor uses re.match for checking if the pattern is matched. target_executor=='foo' will match both deployments with the name foo and foo_what_ever_suffix.
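The retry timing governed by initial_backoff, backoff_multiplier and max_backoff above can be sketched as follows (illustrative only; this just evaluates the documented formula, with the actual delay drawn uniformly from [0, bound)):

```python
import random

def backoff_bounds(max_attempts: int, initial_backoff: float = 0.5,
                   backoff_multiplier: float = 1.5, max_backoff: float = 2.0):
    """Upper bound of the random delay before each retry: the n-th retry
    waits random(0, min(initial_backoff * backoff_multiplier**(n-1), max_backoff))."""
    # attempt 1 is the original request, so there are max_attempts - 1 retries
    return [min(initial_backoff * backoff_multiplier ** (n - 1), max_backoff)
            for n in range(1, max_attempts)]

bounds = backoff_bounds(max_attempts=5)
print(bounds)  # [0.5, 0.75, 1.125, 1.6875]
delays = [random.uniform(0, b) for b in bounds]  # actual sleep times
```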
- push(exit)#
Registers a callback with the standard __exit__ method signature.
Can suppress exceptions the same way __exit__ method can. Also accepts any object with an __exit__ method (registering a call to the method instead of the object itself).
- save_config(filename=None)#
Save the object’s config into a YAML file.
- Parameters:
filename (Optional[str]) – file path of the yaml file; if not given, config_abspath is used
- search(inputs: ~typing.Optional[InputType] = None, on_done: ~typing.Optional[CallbackFnType] = None, on_error: ~typing.Optional[CallbackFnType] = None, on_always: ~typing.Optional[CallbackFnType] = None, parameters: ~typing.Optional[~typing.Dict] = None, target_executor: ~typing.Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 2, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, prefetch: ~typing.Optional[int] = None, return_type: ~typing.Type[~docarray.array.document.DocumentArray] = <class 'docarray.array.document.DocumentArray'>, **kwargs) Optional[Union[DocumentArray, List[Response]]] #
- to_docker_compose_yaml(output_path=None, network_name=None)[source]#
Converts a Jina Deployment into a Docker Compose YAML file.
If you don’t want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
output_path (Optional[str]) – the path where to dump the yaml file
network_name (Optional[str]) – the name of the network that will be used by the deployment
- update(inputs: ~typing.Optional[InputType] = None, on_done: ~typing.Optional[CallbackFnType] = None, on_error: ~typing.Optional[CallbackFnType] = None, on_always: ~typing.Optional[CallbackFnType] = None, parameters: ~typing.Optional[~typing.Dict] = None, target_executor: ~typing.Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 2, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, prefetch: ~typing.Optional[int] = None, return_type: ~typing.Type[~docarray.array.document.DocumentArray] = <class 'docarray.array.document.DocumentArray'>, **kwargs) Optional[Union[DocumentArray, List[Response]]] #
- to_kubernetes_yaml(output_base_path, k8s_namespace=None)[source]#
Converts a Jina Deployment into a set of yaml deployments to deploy in Kubernetes.
If you don’t want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
output_base_path (str) – the base path where to dump all the yaml files
k8s_namespace (Optional[str]) – the name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.
- to_k8s_yaml(output_base_path, k8s_namespace=None)#
Converts a Jina Deployment into a set of yaml deployments to deploy in Kubernetes.
If you don’t want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
output_base_path (str) – the base path where to dump all the yaml files
k8s_namespace (Optional[str]) – the name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.