Source code for jina.serve.executors

import contextlib
import inspect
import multiprocessing
import os
import threading
import warnings
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type, Union

from jina import __args_executor_init__, __default_endpoint__
from jina.enums import BetterEnum
from jina.helper import ArgNamespace, T, iscoroutinefunction, typename
from jina.importer import ImportExtensions
from jina.jaml import JAML, JAMLCompatible, env_var_regex, internal_var_regex
from jina.serve.executors.decorators import requests, store_init_kwargs, wrap_func

if TYPE_CHECKING:
    from docarray import DocumentArray
    from prometheus_client import Summary

__all__ = ['BaseExecutor', 'ReducerExecutor']


class ExecutorType(type(JAMLCompatible), type):
    """The class of Executor type, which is the metaclass of :class:`BaseExecutor`."""

    def __new__(cls, *args, **kwargs):
        """
        # noqa: DAR101
        # noqa: DAR102

        :return: Executor class
        """
        _cls = super().__new__(cls, *args, **kwargs)
        return cls.register_class(_cls)

    @staticmethod
    def register_class(cls):
        """
        Register a class and wrap update, train, aggregate functions.

        :param cls: The class.
        :return: The class, after being registered.
        """

        reg_cls_set = getattr(cls, '_registered_class', set())

        cls_id = f'{cls.__module__}.{cls.__name__}'
        if cls_id not in reg_cls_set:
            arg_spec = inspect.getfullargspec(cls.__init__)

            if not arg_spec.varkw and not __args_executor_init__.issubset(
                arg_spec.args
            ):
                raise TypeError(
                    f'{cls.__init__} does not follow the full signature of `Executor.__init__`, '
                    f'please add `**kwargs` to your __init__ function'
                )
            wrap_func(cls, ['__init__'], store_init_kwargs)

            reg_cls_set.add(cls_id)
            setattr(cls, '_registered_class', reg_cls_set)
        return cls
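
# A minimal sketch of what `ExecutorType.register_class` enforces: an Executor's
# `__init__` must either accept `**kwargs` or cover the full `Executor.__init__`
# signature. `MyExec` and `BadExec` below are hypothetical examples, not part of
# this module:
#
#     from jina import Executor
#
#     class MyExec(Executor):
#         def __init__(self, foo: int = 1, **kwargs):  # `**kwargs` is required
#             super().__init__(**kwargs)
#             self.foo = foo
#
#     class BadExec(Executor):
#         def __init__(self, foo: int = 1):  # no `**kwargs`: raises TypeError
#             self.foo = foo                 # at class-definition time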


class BaseExecutor(JAMLCompatible, metaclass=ExecutorType):
    """
    The base class of the executor, can be used to build encoder, indexer, etc.

    Any executor inherited from :class:`BaseExecutor` always has the **meta** defined in
    :mod:`jina.executors.metas.defaults`.

    All arguments in the :func:`__init__` can be specified with a ``with`` map in the YAML config.

    Example:

    .. highlight:: python
    .. code-block:: python

        class MyAwesomeExecutor:
            def __init__(self, awesomeness=5):
                pass

    is equal to

    .. highlight:: yaml
    .. code-block:: yaml

        jtype: MyAwesomeExecutor
        with:
            awesomeness: 5

    """

    def __init__(
        self,
        metas: Optional[Dict] = None,
        requests: Optional[Dict] = None,
        runtime_args: Optional[Dict] = None,
        **kwargs,
    ):
        """`metas` and `requests` are always auto-filled with values from YAML config.

        :param metas: a dict of metas fields
        :param requests: a dict of endpoint-function mapping
        :param runtime_args: a dict of arguments injected from :class:`Runtime` during runtime
        :param kwargs: additional extra keyword arguments so that initialization does not fail when unexpected extra params are passed
        """
        self._add_metas(metas)
        self._add_requests(requests)
        self._add_runtime_args(runtime_args)
        self._init_monitoring()

    def _add_runtime_args(self, _runtime_args: Optional[Dict]):
        if _runtime_args:
            self.runtime_args = SimpleNamespace(**_runtime_args)
        else:
            self.runtime_args = SimpleNamespace()

    def _init_monitoring(self):
        if (
            hasattr(self.runtime_args, 'metrics_registry')
            and self.runtime_args.metrics_registry
        ):
            with ImportExtensions(
                required=True,
                help_text='You need to install `prometheus_client` to use the monitoring functionality of jina',
            ):
                from prometheus_client import Summary

            self._summary_method = Summary(
                'process_request_seconds',
                'Time spent when calling the executor request method',
                registry=self.runtime_args.metrics_registry,
                namespace='jina',
                labelnames=('executor', 'executor_endpoint', 'runtime_name'),
            )
            self._metrics_buffer = {'process_request_seconds': self._summary_method}

        else:
            self._summary_method = None
            self._metrics_buffer = None

    def _add_requests(self, _requests: Optional[Dict]):
        if not hasattr(self, 'requests'):
            self.requests = {}

        if _requests:
            func_names = {f.__name__: e for e, f in self.requests.items()}
            for endpoint, func in _requests.items():
                # the following line must be `getattr(self.__class__, func)` NOT `getattr(self, func)`
                # this is to ensure we always have `_func` as an unbound method
                if func in func_names:
                    del self.requests[func_names[func]]
                _func = getattr(self.__class__, func)
                if callable(_func):
                    # the target function is not decorated with `@requests` yet
                    self.requests[endpoint] = _func
                elif typename(_func) == 'jina.executors.decorators.FunctionMapper':
                    # the target function is already decorated with `@requests`, need to unwrap it with `.fn`
                    self.requests[endpoint] = _func.fn
                else:
                    raise TypeError(
                        f'expect {typename(self)}.{func} to be a function, but receiving {typename(_func)}'
                    )

    def _add_metas(self, _metas: Optional[Dict]):
        from jina.serve.executors.metas import get_default_metas

        tmp = get_default_metas()

        if _metas:
            tmp.update(_metas)

        unresolved_attr = False
        target = SimpleNamespace()
        # set self values filtered by those non-exist, and non-expandable
        for k, v in tmp.items():
            if k == 'workspace' and not (v is None or v == ''):
                warnings.warn(
                    'Setting `workspace` via `metas.workspace` is deprecated. '
                    'Instead, use `f.add(..., workspace=...)` when defining a Flow in Python; '
                    'the `workspace` parameter when defining a Flow using YAML; '
                    'or `--workspace` when starting an Executor using the CLI.',
                    category=DeprecationWarning,
                )
            if not hasattr(target, k):
                if isinstance(v, str):
                    if not env_var_regex.findall(v):
                        setattr(target, k, v)
                    else:
                        unresolved_attr = True
                else:
                    setattr(target, k, v)
            elif type(getattr(target, k)) == type(v):
                setattr(target, k, v)

        if unresolved_attr:
            _tmp = vars(self)
            _tmp['metas'] = tmp
            new_metas = JAML.expand_dict(_tmp)['metas']

            for k, v in new_metas.items():
                if not hasattr(target, k):
                    if isinstance(v, str):
                        if not (
                            env_var_regex.findall(v) or internal_var_regex.findall(v)
                        ):
                            setattr(target, k, v)
                        else:
                            raise ValueError(
                                f'{k}={v} is not substitutable or badly referred'
                            )
                    else:
                        setattr(target, k, v)

        # `name` is important as it serves as an identifier of the executor
        # if not given, then set a name by the rule
        if not getattr(target, 'name', None):
            setattr(target, 'name', self.__class__.__name__)

        self.metas = target
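
    # A hedged sketch of the `requests` dict consumed by `_add_requests` above:
    # endpoint names map to method names on the class. `MyExec` and its `foo`
    # method are hypothetical, used only for illustration:
    #
    #     MyExec(requests={'/index': 'foo'})  # route `/index` to `MyExec.foo`
    #
    # which mirrors the YAML form:
    #
    #     jtype: MyExec
    #     requests:
    #       /index: foo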
    def close(self) -> None:
        """
        Always invoked when the executor is destroyed. You can put your destructor and saving logic here.
        """
        pass
    def __call__(self, req_endpoint: str, **kwargs):
        """
        # noqa: DAR101
        # noqa: DAR102
        # noqa: DAR201
        """
        if req_endpoint in self.requests:
            return self.requests[req_endpoint](
                self, **kwargs
            )  # unbound method, self is required
        elif __default_endpoint__ in self.requests:
            return self.requests[__default_endpoint__](
                self, **kwargs
            )  # unbound method, self is required

    async def __acall__(self, req_endpoint: str, **kwargs):
        """
        # noqa: DAR101
        # noqa: DAR102
        # noqa: DAR201
        """
        if req_endpoint in self.requests:
            return await self.__acall_endpoint__(req_endpoint, **kwargs)
        elif __default_endpoint__ in self.requests:
            return await self.__acall_endpoint__(__default_endpoint__, **kwargs)

    async def __acall_endpoint__(self, req_endpoint, **kwargs):
        func = self.requests[req_endpoint]

        runtime_name = (
            self.runtime_args.name if hasattr(self.runtime_args, 'name') else None
        )

        _summary = (
            self._summary_method.labels(
                self.__class__.__name__, req_endpoint, runtime_name
            ).time()
            if self._summary_method
            else contextlib.nullcontext()
        )

        with _summary:
            if iscoroutinefunction(func):
                return await func(self, **kwargs)
            else:
                return func(self, **kwargs)

    @property
    def workspace(self) -> Optional[str]:
        """
        Get the workspace directory of the Executor.

        :return: returns the workspace of the current shard of this Executor.
        """
        workspace = (
            getattr(self.runtime_args, 'workspace', None)
            or getattr(self.metas, 'workspace')
            or os.environ.get('JINA_DEFAULT_WORKSPACE_BASE')
        )
        if workspace:
            complete_workspace = os.path.join(workspace, self.metas.name)
            shard_id = getattr(
                self.runtime_args,
                'shard_id',
                None,
            )
            if shard_id is not None and shard_id != -1:
                complete_workspace = os.path.join(complete_workspace, str(shard_id))
            if not os.path.exists(complete_workspace):
                os.makedirs(complete_workspace)
            return os.path.abspath(complete_workspace)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
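
    # Illustrative sketch (assuming a hypothetical `MyExec` that registers an
    # `/index` endpoint via `@requests(on='/index')`): `__call__` dispatches by
    # endpoint name, falling back to the default endpoint when the requested
    # one is unknown, and the context-manager protocol guarantees `close()`
    # runs on exit:
    #
    #     from docarray import DocumentArray
    #
    #     with MyExec() as ex:
    #         ex('/index', docs=DocumentArray.empty(2))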
    @classmethod
    def from_hub(
        cls: Type[T],
        uri: str,
        context: Optional[Dict[str, Any]] = None,
        uses_with: Optional[Dict] = None,
        uses_metas: Optional[Dict] = None,
        uses_requests: Optional[Dict] = None,
        **kwargs,
    ) -> T:
        """Construct an Executor from Hub.

        :param uri: a hub Executor scheme starting with `jinahub://`
        :param context: context replacement variables in a dict, the value of the dict is the replacement.
        :param uses_with: dictionary of parameters to overwrite from the default config's with field
        :param uses_metas: dictionary of parameters to overwrite from the default config's metas field
        :param uses_requests: dictionary of parameters to overwrite from the default config's requests field
        :param kwargs: other kwargs accepted by the CLI ``jina hub pull``
        :return: the Hub Executor object.

        .. highlight:: python
        .. code-block:: python

            from jina import Executor
            from docarray import Document, DocumentArray

            executor = Executor.from_hub(
                uri='jinahub://CLIPImageEncoder', install_requirements=True
            )

        """
        from jina.hubble.helper import is_valid_huburi

        _source = None
        if is_valid_huburi(uri):
            from jina.hubble.hubio import HubIO
            from jina.parsers.hubble import set_hub_pull_parser

            _args = ArgNamespace.kwargs2namespace(
                {'no_usage': True, **kwargs},
                set_hub_pull_parser(),
                positional_args=(uri,),
            )
            _source = HubIO(args=_args).pull()

        if not _source or _source.startswith('docker://'):
            raise ValueError(
                f'Cannot construct a native Executor from {uri}. It looks like you want to use this Executor as a '
                f'Docker container; you may want to add it to a Flow via `.add(uses={uri})` instead.'
            )
        return cls.load_config(
            _source,
            context=context,
            uses_with=uses_with,
            uses_metas=uses_metas,
            uses_requests=uses_requests,
        )
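
    # A usage sketch for overriding the pulled Executor's default config; the
    # `uses_with` key below is hypothetical and depends on the parameters the
    # Hub Executor's own config actually exposes:
    #
    #     executor = Executor.from_hub(
    #         uri='jinahub://CLIPImageEncoder',
    #         uses_with={'name': 'ViT-B/32'},
    #     )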
    @classmethod
    def serve(
        cls,
        uses_with: Optional[Dict] = None,
        uses_metas: Optional[Dict] = None,
        uses_requests: Optional[Dict] = None,
        stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None,
        **kwargs,
    ):
        """Serve this Executor in a temporary Flow. Useful for testing an Executor in remote settings.

        :param uses_with: dictionary of parameters to overwrite from the default config's with field
        :param uses_metas: dictionary of parameters to overwrite from the default config's metas field
        :param uses_requests: dictionary of parameters to overwrite from the default config's requests field
        :param stop_event: a threading event or a multiprocessing event that, once set, returns control from the serving Flow back to the main thread.
        :param kwargs: other kwargs accepted by the Flow, the full list can be found `here <https://docs.jina.ai/api/jina.orchestrate.flow.base/>`_
        """
        from jina import Flow

        f = Flow(**kwargs).add(
            uses=cls,
            uses_with=uses_with,
            uses_metas=uses_metas,
            uses_requests=uses_requests,
        )
        with f:
            f.block(stop_event)
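
    # A minimal sketch of `serve` with a stop event, assuming a hypothetical
    # `MyExec` subclass. The call blocks the current thread until the event is
    # set, so in tests it is typically run in a separate process:
    #
    #     import multiprocessing
    #
    #     stop_event = multiprocessing.Event()
    #     MyExec.serve(port=12345, stop_event=stop_event)  # blocks until set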
    class StandaloneExecutorType(BetterEnum):
        """
        Type of standalone Executors
        """

        EXTERNAL = 0  # served by a gateway
        SHARED = 1  # not served by a gateway, served by head/worker
    @staticmethod
    def to_k8s_yaml(
        uses: str,
        output_base_path: str,
        k8s_namespace: Optional[str] = None,
        executor_type: Optional[
            StandaloneExecutorType
        ] = StandaloneExecutorType.EXTERNAL,
        uses_with: Optional[Dict] = None,
        uses_metas: Optional[Dict] = None,
        uses_requests: Optional[Dict] = None,
        **kwargs,
    ):
        """
        Converts the Executor into a set of yaml deployments to deploy in Kubernetes.

        If you don't want to rebuild the image on Jina Hub, you can set the `JINA_HUB_NO_IMAGE_REBUILD` environment variable.

        :param uses: the Executor to use. Has to be containerized and accessible from K8s
        :param output_base_path: The base path where to dump all the yaml files
        :param k8s_namespace: The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.
        :param executor_type: The type of Executor. Can be external or shared. External Executors include the Gateway. Shared Executors don't. Defaults to External
        :param uses_with: dictionary of parameters to overwrite from the default config's with field
        :param uses_metas: dictionary of parameters to overwrite from the default config's metas field
        :param uses_requests: dictionary of parameters to overwrite from the default config's requests field
        :param kwargs: other kwargs accepted by the Flow, the full list can be found `here <https://docs.jina.ai/api/jina.orchestrate.flow.base/>`_
        """
        from jina import Flow

        f = Flow(**kwargs).add(
            uses=uses,
            uses_with=uses_with,
            uses_metas=uses_metas,
            uses_requests=uses_requests,
        )
        f.to_k8s_yaml(
            output_base_path=output_base_path,
            k8s_namespace=k8s_namespace,
            include_gateway=executor_type
            == BaseExecutor.StandaloneExecutorType.EXTERNAL,
        )
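
    # An illustrative call (the image reference and paths are placeholders,
    # not real artifacts):
    #
    #     from jina import Executor
    #
    #     Executor.to_k8s_yaml(
    #         uses='docker://my-org/my-executor:latest',
    #         output_base_path='/tmp/k8s',
    #         k8s_namespace='my-namespace',
    #     )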
    @staticmethod
    def to_docker_compose_yaml(
        uses: str,
        output_path: Optional[str] = None,
        network_name: Optional[str] = None,
        executor_type: Optional[
            StandaloneExecutorType
        ] = StandaloneExecutorType.EXTERNAL,
        uses_with: Optional[Dict] = None,
        uses_metas: Optional[Dict] = None,
        uses_requests: Optional[Dict] = None,
        **kwargs,
    ):
        """
        Converts the Executor into a yaml file to run with `docker-compose up`

        :param uses: the Executor to use. Has to be containerized
        :param output_path: The output path for the yaml file
        :param network_name: The name of the network that will be used by the deployment
        :param executor_type: The type of Executor. Can be external or shared. External Executors include the Gateway. Shared Executors don't. Defaults to External
        :param uses_with: dictionary of parameters to overwrite from the default config's with field
        :param uses_metas: dictionary of parameters to overwrite from the default config's metas field
        :param uses_requests: dictionary of parameters to overwrite from the default config's requests field
        :param kwargs: other kwargs accepted by the Flow, the full list can be found `here <https://docs.jina.ai/api/jina.orchestrate.flow.base/>`_
        """
        from jina import Flow

        f = Flow(**kwargs).add(
            uses=uses,
            uses_with=uses_with,
            uses_metas=uses_metas,
            uses_requests=uses_requests,
        )
        f.to_docker_compose_yaml(
            output_path=output_path,
            network_name=network_name,
            include_gateway=executor_type
            == BaseExecutor.StandaloneExecutorType.EXTERNAL,
        )
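
    # An illustrative call mirroring the `to_k8s_yaml` sketch above
    # (placeholder image reference):
    #
    #     Executor.to_docker_compose_yaml(
    #         uses='docker://my-org/my-executor:latest',
    #         output_path='docker-compose.yml',
    #     )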
    def get_metrics(
        self, name: Optional[str] = None, documentation: Optional[str] = None
    ) -> Optional['Summary']:
        """
        Get a given prometheus metric; if it does not exist yet, create it and store it in a buffer.

        :param name: the name of the metric
        :param documentation: the description of the metric
        :return: the given prometheus metric or None if monitoring is not enabled.
        """
        if self._metrics_buffer:
            if name not in self._metrics_buffer:
                from prometheus_client import Summary

                self._metrics_buffer[name] = Summary(
                    name,
                    documentation,
                    registry=self.runtime_args.metrics_registry,
                    namespace='jina',
                    labelnames=('runtime_name',),
                ).labels(self.runtime_args.name)
            return self._metrics_buffer[name]
        else:
            return None
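
# A hedged sketch of `get_metrics` inside a request handler; `MyExec`, the
# endpoint, and the metric name are illustrative only. Monitoring must be
# enabled on the runtime, otherwise `get_metrics` returns None:
#
#     from jina import Executor, requests
#
#     class MyExec(Executor):
#         @requests
#         def encode(self, docs, **kwargs):
#             metric = self.get_metrics('encode_seconds', 'Time spent encoding')
#             if metric:
#                 with metric.time():
#                     ...  # do the actual work, observed by the Summary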
class ReducerExecutor(BaseExecutor):
    """
    ReducerExecutor is an Executor that performs a reduce operation on a matrix of DocumentArrays coming from shards.
    ReducerExecutor relies on DocumentArray.reduce_all to merge all DocumentArrays into one DocumentArray, which will be
    sent to the next deployment.

    This Executor only adds a reduce endpoint to the BaseExecutor.
    """
    @requests
    def reduce(self, docs_matrix: List['DocumentArray'] = [], **kwargs):
        """Reduce docs_matrix into one `DocumentArray` using `DocumentArray.reduce_all`

        :param docs_matrix: a List of DocumentArrays to be reduced
        :param kwargs: extra keyword arguments
        :return: the reduced DocumentArray
        """
        if docs_matrix:
            da = docs_matrix[0]
            da.reduce_all(docs_matrix[1:])
            return da
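
# A small sketch of the reduce semantics above, using only the
# `DocumentArray.reduce_all` behaviour this module already relies on: the
# remaining arrays are merged into the first one in place.
#
#     from docarray import Document, DocumentArray
#
#     left = DocumentArray([Document(id='a'), Document(id='b')])
#     right = DocumentArray([Document(id='b'), Document(id='c')])
#     left.reduce_all([right])  # `left` now holds the docs with ids a, b, c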