jina.serve.executors package

Submodules

Module contents

class jina.serve.executors.BaseExecutor(metas=None, requests=None, runtime_args=None, workspace=None, dynamic_batching=None, **kwargs)

Bases: JAMLCompatible

The base class of all Executors. It can be used to build encoders, indexers, and so on.

jina.Executor is an alias for this class.

EXAMPLE USAGE

from jina import Executor, requests, Flow


class MyExecutor(Executor):
    @requests
    def foo(self, docs, **kwargs):
        print(docs)  # process docs here


f = Flow().add(uses=MyExecutor)  # you can add your Executor to a Flow

Any Executor that inherits from BaseExecutor always has the metas defined in jina.executors.metas.defaults.

All arguments of __init__() can be set through the with mapping in the YAML config. Example:

class MyAwesomeExecutor(Executor):
    def __init__(self, awesomeness=5, **kwargs):
        super().__init__(**kwargs)
        self.awesomeness = awesomeness

is equivalent to

jtype: MyAwesomeExecutor
with:
    awesomeness: 5

metas and requests are always auto-filled with values from the YAML config.
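As a rough illustration of how a with mapping becomes constructor keyword arguments, the sketch below builds an object from a config dictionary. It uses a plain Python class and a hypothetical build_from_config helper and KNOWN_CLASSES registry rather than Jina's own YAML loader, so it mirrors the idea only, not the actual implementation.

```python
class MyAwesomeExecutor:
    """Stand-in for an Executor subclass; it does not inherit from jina.Executor."""

    def __init__(self, awesomeness=5):
        self.awesomeness = awesomeness


# Hypothetical registry mapping a `jtype` name to the class it refers to.
KNOWN_CLASSES = {'MyAwesomeExecutor': MyAwesomeExecutor}


def build_from_config(config):
    """Instantiate the class named by `jtype`, passing `with` as keyword arguments."""
    cls = KNOWN_CLASSES[config['jtype']]
    return cls(**config.get('with', {}))


# Dictionary form of a YAML config like the one above, with a non-default value.
config = {'jtype': 'MyAwesomeExecutor', 'with': {'awesomeness': 7}}
executor = build_from_config(config)

# Without a `with` mapping, the default from __init__ applies.
default_executor = build_from_config({'jtype': 'MyAwesomeExecutor'})
```

Here executor.awesomeness takes the value from the with mapping, while default_executor falls back to the default declared in __init__.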

Parameters:
  • metas (Optional[Dict]) – a dict of metas fields

  • requests (Optional[Dict]) – a dict of endpoint-function mapping

  • runtime_args (Optional[Dict]) – a dict of arguments injected from Runtime during runtime

  • kwargs – additional keyword arguments, accepted so that unexpected extra parameters do not cause a failure

  • workspace (Optional[str]) – the workspace of the executor. Only used if a workspace is not already provided in metas or runtime_args

  • dynamic_batching (Optional[Dict]) – a dict of endpoint-dynamic_batching config mapping

property requests

Get the request dictionary corresponding to this specific class

Returns:

the requests corresponding to the class of this Executor instance
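To make the shape of an endpoint-function mapping concrete, here is a minimal pure-Python sketch of a per-class registry. The endpoint decorator, the registry dictionary, and the MyIndexer class are all hypothetical names for illustration; this is not Jina's requests decorator, only a simplified picture of a mapping keyed by class name and endpoint.

```python
from collections import defaultdict

# Hypothetical registry: {class name: {endpoint name: function}}
registry = defaultdict(dict)


class endpoint:
    """Hypothetical decorator that files a method under an endpoint name."""

    def __init__(self, on):
        self.on = on

    def __call__(self, fn):
        self.fn = fn
        return self

    def __set_name__(self, owner, name):
        # Called once when the owning class is created: record the mapping,
        # then put the plain function back on the class.
        registry[owner.__name__][self.on] = self.fn
        setattr(owner, name, self.fn)


class MyIndexer:
    @endpoint(on='/index')
    def index(self, docs):
        return f'indexed {len(docs)}'

    @endpoint(on='/search')
    def search(self, docs):
        return f'searched {len(docs)}'


# Look up the handler for an endpoint and call it on an instance.
handlers = registry['MyIndexer']
result = handlers['/index'](MyIndexer(), ['a', 'b'])
```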

close()

Always invoked when the Executor is destroyed.

You can put teardown and saving logic here.

Return type:

None
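The following sketch shows one way saving logic in close() might look: a counter is written to the workspace on close and restored by the next instance. CounterExecutor is a plain stand-in class (a real one would inherit from jina.Executor), and the state.json file name and process method are assumptions made for the example.

```python
import json
import os
import tempfile


class CounterExecutor:
    """Stand-in for an Executor subclass; a real one would inherit from jina.Executor."""

    def __init__(self, workspace):
        self.workspace = workspace
        self.state_path = os.path.join(workspace, 'state.json')  # assumed file name
        self.seen = 0
        # Restore state written by a previous close(), if any.
        if os.path.exists(self.state_path):
            with open(self.state_path) as f:
                self.seen = json.load(f)['seen']

    def process(self, docs):
        self.seen += len(docs)

    def close(self):
        # Saving logic: persist the counter so the next instance can pick it up.
        os.makedirs(self.workspace, exist_ok=True)
        with open(self.state_path, 'w') as f:
            json.dump({'seen': self.seen}, f)


workspace = tempfile.mkdtemp()
first = CounterExecutor(workspace)
first.process(['a', 'b', 'c'])
first.close()

second = CounterExecutor(workspace)  # reloads what close() saved
```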

property workspace: Optional[str]

Get the workspace directory of the Executor.

Return type:

Optional[str]

Returns:

the workspace of the current shard of this Executor.

classmethod from_hub(uri, context=None, uses_with=None, uses_metas=None, uses_requests=None, uses_dynamic_batching=None, **kwargs)

Construct an Executor from Hub.

Parameters:
  • uri (str) – a Hub Executor URI that starts with jinahub://

  • context (Optional[Dict[str, Any]]) – context replacement variables as a dict; each value is the replacement for its key.

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • kwargs – other kwargs accepted by the CLI jina hub pull

Return type:

TypeVar(T)

Returns:

the Hub Executor object.

from jina import Executor

# pull the Executor from Hub and install its requirements
executor = Executor.from_hub(
    uri='jinahub://CLIPImageEncoder', install_requirements=True
)
classmethod serve(*, compression: Optional[str] = None, connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, env_from_secret: Optional[dict] = None, exit_on_exceptions: Optional[List[str]] = [], external: Optional[bool] = False, floating: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, grpc_metadata: Optional[dict] = None, grpc_server_options: Optional[dict] = None, host: Optional[List[str]] = ['0.0.0.0'], install_requirements: Optional[bool] = False, log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = None, native: Optional[bool] = False, no_reduce: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[int] = None, prefer_platform: Optional[str] = None, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, tls: Optional[bool] = False, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type[BaseExecutor], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type[BaseExecutor], dict]] = None, uses_before_address: Optional[str] = None, uses_dynamic_batching: Optional[dict] = None, uses_metas: Optional[dict] = None, 
uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, when: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)

Serve this Executor in a temporary Flow. Useful in testing an Executor in remote settings.

Parameters:
  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • reload (bool) – if set, the Executor watches the Python files of its implementation and reloads the code live while serving

  • stop_event (Union[threading.Event, multiprocessing.Event, None]) – a threading or multiprocessing event that, once set, returns control to the main thread.

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • kwargs – other kwargs accepted by the Flow, full list can be found here <https://docs.jina.ai/api/jina.orchestrate.flow.base/>
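The stop_event parameter follows a common pattern: a blocking call returns once another thread sets an event. The sketch below shows that pattern with the standard library only. serve_blocking is a hypothetical stand-in for a blocking serve call and does not start a real Executor or Flow.

```python
import threading
import time


def serve_blocking(stop_event, log):
    """Hypothetical stand-in for a blocking serve call; returns once stop_event is set."""
    log.append('serving')
    stop_event.wait()  # block here until another thread sets the event
    log.append('stopped')


log = []
stop_event = threading.Event()
server = threading.Thread(target=serve_blocking, args=(stop_event, log))
server.start()

time.sleep(0.1)   # the stand-in "server" keeps running in the background
stop_event.set()  # signal it to stop
server.join(timeout=5)
```

A multiprocessing.Event exposes the same set() and wait() methods, so the same approach applies when the serving code runs in a separate process.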

class StandaloneExecutorType(value)

Bases: BetterEnum

Type of standalone Executors

EXTERNAL = 0
SHARED = 1

static to_kubernetes_yaml(uses, output_base_path, k8s_namespace=None, executor_type=StandaloneExecutorType.EXTERNAL, uses_with=None, uses_metas=None, uses_requests=None, uses_dynamic_batching=None, **kwargs)

Converts the Executor into a set of yaml deployments to deploy in Kubernetes.

If you don’t want to rebuild the image on Jina Hub, set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • uses (str) – the Executor to use. Has to be containerized and accessible from K8s

  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • executor_type (Optional[StandaloneExecutorType]) – The type of Executor. Can be external or shared. External Executors include the Gateway. Shared Executors don’t. Defaults to External

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • kwargs – other kwargs accepted by the Flow, full list can be found here <https://docs.jina.ai/api/jina.orchestrate.flow.base/>

static to_k8s_yaml(uses, output_base_path, k8s_namespace=None, executor_type=StandaloneExecutorType.EXTERNAL, uses_with=None, uses_metas=None, uses_requests=None, uses_dynamic_batching=None, **kwargs)

Converts the Executor into a set of yaml deployments to deploy in Kubernetes.

If you don’t want to rebuild the image on Jina Hub, set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • uses (str) – the Executor to use. Has to be containerized and accessible from K8s

  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • executor_type (Optional[StandaloneExecutorType]) – The type of Executor. Can be external or shared. External Executors include the Gateway. Shared Executors don’t. Defaults to External

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • kwargs – other kwargs accepted by the Flow, full list can be found here <https://docs.jina.ai/api/jina.orchestrate.flow.base/>

static to_docker_compose_yaml(uses, output_path=None, network_name=None, executor_type=StandaloneExecutorType.EXTERNAL, uses_with=None, uses_metas=None, uses_requests=None, uses_dynamic_batching=None, **kwargs)

Converts the Executor into a yaml file to run with docker-compose up.

Parameters:
  • uses (str) – the Executor to use. Has to be containerized

  • output_path (Optional[str]) – The output path for the yaml file

  • network_name (Optional[str]) – The name of the network that will be used by the deployment name

  • executor_type (Optional[StandaloneExecutorType]) – The type of Executor. Can be external or shared. External Executors include the Gateway. Shared Executors don’t. Defaults to External

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • kwargs – other kwargs accepted by the Flow, full list can be found here <https://docs.jina.ai/api/jina.orchestrate.flow.base/>

static is_valid_jaml(obj)

Verifies the yaml syntax of a given object by first serializing it, then attempting to deserialize it and catching parser errors.

Parameters:

obj (Dict) – yaml object

Return type:

bool

Returns:

whether the syntax is valid or not

classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, uses_dynamic_batching=None, **kwargs)

A high-level interface for loading configuration with features of loading extra py_modules, substitute env & context variables. Any class that implements JAMLCompatible mixin can enjoy this feature, e.g. BaseFlow, BaseExecutor, BaseGateway and all their subclasses.

Support substitutions in YAML:
  • Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).

  • Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.

  • Internal reference via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}

Substitutions are carried out in the order listed above, over multiple passes, to resolve variables on a best-effort basis.

!BaseEncoder
metas:
    name: ${{VAR_A}}  # env or context variables
    workspace: my-${{this.name}}  # internal reference

import os

from jina.serve.executors import BaseExecutor

# load Executor from yaml file
BaseExecutor.load_config('a.yml')

# load Executor from yaml file and substitute environment variables
os.environ['VAR_A'] = 'hello-world'
b = BaseExecutor.load_config('a.yml')
assert b.name == 'hello-world'

# load Executor from yaml file and substitute variables from a dict
b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
assert b.name == 'hello-world'

# disable substitution
b = BaseExecutor.load_config('a.yml', substitute=False)

Parameters:
  • source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.

  • allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any level

  • substitute (bool) – substitute environment, internal reference and context variables.

  • context (Optional[Dict[str, Any]]) – context replacement variables as a dict; each value is the replacement for its key.

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor yaml files

  • py_modules (Optional[str]) – Optional py_module from which the object needs to be loaded

  • runtime_args (Optional[Dict[str, Any]]) – Optional dictionary of parameters runtime_args to be directly passed without being parsed into a yaml config

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • kwargs – kwargs for parse_config_source

Return type:

JAMLCompatible

Returns:

JAMLCompatible object
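The multi-pass, best-effort substitution described for load_config can be pictured with a small regex-based sketch. This is not the JAML implementation: the substitute function, its max_passes parameter, and the pattern are assumptions, the bare ${{ VAR }} form is resolved from the context dict only, and the this/root references are not handled.

```python
import os
import re

# Matches ${{ ENV.NAME }}, ${{ CONTEXT.NAME }} and ${{ NAME }}.
PATTERN = re.compile(r'\$\{\{\s*(?:(ENV|CONTEXT)\.)?(\w+)\s*\}\}')


def substitute(text, context=None, max_passes=5):
    """Resolve placeholders on a best-effort basis; unknown ones are left as-is."""
    context = context or {}

    def resolve(match):
        scope, name = match.group(1), match.group(2)
        if scope == 'ENV':
            return os.environ.get(name, match.group(0))
        # In this sketch, ${{ CONTEXT.NAME }} and bare ${{ NAME }} both read the context dict.
        return str(context.get(name, match.group(0)))

    for _ in range(max_passes):
        resolved = PATTERN.sub(resolve, text)
        if resolved == text:  # nothing changed, so further passes would not help
            break
        text = resolved
    return text


os.environ['VAR_A'] = 'hello-world'
from_env = substitute('name: ${{ ENV.VAR_A }}')
from_context = substitute('name: ${{ CONTEXT.VAR_B }}', context={'VAR_B': 'hi'})
# A value that itself contains a placeholder needs a second pass.
nested = substitute('name: ${{ OUTER }}', context={'OUTER': '${{ INNER }}', 'INNER': 'x'})
unknown = substitute('name: ${{ MISSING }}')
```

The nested case is the reason for looping: the first pass replaces OUTER with another placeholder, and the second pass resolves it.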

monitor(name=None, documentation=None)

Get a given prometheus metric. If it does not exist yet, it is created and stored in a buffer.

Parameters:
  • name (Optional[str]) – the name of the metrics

  • documentation (Optional[str]) – the description of the metrics

Return type:

Optional[MetricsTimer]

Returns:

the given prometheus metrics or None if monitoring is not enabled.
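Because the documented return type is Optional, calling code that times a block may want to cope with a missing timer. The sketch below shows that defensive pattern with stand-ins only: the Timer class, the module-level monitor function, and its store and enabled parameters are hypothetical and do not reproduce Jina's MetricsTimer or Prometheus integration.

```python
import contextlib
import time


class Timer:
    """Hypothetical stand-in for a metrics timer: records elapsed seconds per name."""

    def __init__(self, name, store):
        self.name = name
        self.store = store

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *exc):
        self.store[self.name] = time.perf_counter() - self.start
        return False  # never swallow exceptions from the timed block


def monitor(name, store, enabled=True):
    """Return a timer, or None when monitoring is off (mirroring the Optional return type)."""
    return Timer(name, store) if enabled else None


timings = {}

# Fall back to a no-op context manager when no timer is returned.
with monitor('preprocess_seconds', timings) or contextlib.nullcontext():
    total = sum(range(1000))

with monitor('skipped_seconds', timings, enabled=False) or contextlib.nullcontext():
    total_again = sum(range(1000))
```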

requests_by_class = {'BaseExecutor': {}}

save_config(filename=None)

Save the object’s config into a YAML file.

Parameters:

filename (Optional[str]) – file path of the yaml file, if not given then config_abspath is used