importinspectimportmultiprocessingimportosimportthreadingfromconcurrent.futuresimportThreadPoolExecutorfromtypesimportSimpleNamespacefromtypingimportTYPE_CHECKING,Any,Dict,List,Optional,Type,Unionfromjinaimport__args_executor_init__,__default_endpoint__fromjina.helperimport(ArgNamespace,T,iscoroutinefunction,run_in_threadpool,typename,)fromjina.jamlimportJAML,JAMLCompatible,env_var_regex,internal_var_regexfromjina.serve.executors.decoratorsimportrequests,store_init_kwargs,wrap_funcifTYPE_CHECKING:fromjinaimportDocumentArray__all__=['BaseExecutor','ReducerExecutor']classExecutorType(type(JAMLCompatible),type):"""The class of Executor type, which is the metaclass of :class:`BaseExecutor`."""def__new__(cls,*args,**kwargs):""" # noqa: DAR101 # noqa: DAR102 :return: Executor class """_cls=super().__new__(cls,*args,**kwargs)returncls.register_class(_cls)@staticmethoddefregister_class(cls):""" Register a class and wrap update, train, aggregate functions. :param cls: The class. :return: The class, after being registered. """reg_cls_set=getattr(cls,'_registered_class',set())cls_id=f'{cls.__module__}.{cls.__name__}'ifcls_idnotinreg_cls_set:arg_spec=inspect.getfullargspec(cls.__init__)ifnotarg_spec.varkwandnot__args_executor_init__.issubset(arg_spec.args):raiseTypeError(f'{cls.__init__} does not follow the full signature of `Executor.__init__`, 'f'please add `**kwargs` to your __init__ function')wrap_func(cls,['__init__'],store_init_kwargs)reg_cls_set.add(cls_id)setattr(cls,'_registered_class',reg_cls_set)returncls
[docs]classBaseExecutor(JAMLCompatible,metaclass=ExecutorType):""" The base class of the executor, can be used to build encoder, indexer, etc. Any executor inherited from :class:`BaseExecutor` always has the **meta** defined in :mod:`jina.executors.metas.defaults`. All arguments in the :func:`__init__` can be specified with a ``with`` map in the YAML config. Example: .. highlight:: python .. code-block:: python class MyAwesomeExecutor: def __init__(awesomeness=5): pass is equal to .. highlight:: yaml .. code-block:: yaml jtype: MyAwesomeExecutor with: awesomeness: 5 """def__init__(self,metas:Optional[Dict]=None,requests:Optional[Dict]=None,runtime_args:Optional[Dict]=None,**kwargs,):"""`metas` and `requests` are always auto-filled with values from YAML config. :param metas: a dict of metas fields :param requests: a dict of endpoint-function mapping :param runtime_args: a dict of arguments injected from :class:`Runtime` during runtime :param kwargs: additional extra keyword arguments to avoid failing when extra params ara passed that are not expected """self._thread_pool=ThreadPoolExecutor(max_workers=1)self._add_metas(metas)self._add_requests(requests)self._add_runtime_args(runtime_args)def_add_runtime_args(self,_runtime_args:Optional[Dict]):if_runtime_args:self.runtime_args=SimpleNamespace(**_runtime_args)else:self.runtime_args=SimpleNamespace()def_add_requests(self,_requests:Optional[Dict]):ifnothasattr(self,'requests'):self.requests={}if_requests:func_names={f.__name__:efore,finself.requests.items()}forendpoint,funcin_requests.items():# the following line must be `getattr(self.__class__, func)` NOT `getattr(self, func)`# this to ensure we always have `_func` as unbound methodiffuncinfunc_names:delself.requests[func_names[func]]_func=getattr(self.__class__,func)ifcallable(_func):# the target function is not decorated with `@requests` yetself.requests[endpoint]=_funceliftypename(_func)=='jina.executors.decorators.FunctionMapper':# the target function is already decorated with `@requests`, need unwrap with `.fn`self.requests[endpoint]=_func.fnelse:raiseTypeError(f'expect {typename(self)}.{func} to be a function, but receiving {typename(_func)}')def_add_metas(self,_metas:Optional[Dict]):fromjina.serve.executors.metasimportget_default_metastmp=get_default_metas()if_metas:tmp.update(_metas)unresolved_attr=Falsetarget=SimpleNamespace()# set self values filtered by those non-exist, and non-expandablefork,vintmp.items():ifnothasattr(target,k):ifisinstance(v,str):ifnotenv_var_regex.findall(v):setattr(target,k,v)else:unresolved_attr=Trueelse:setattr(target,k,v)eliftype(getattr(target,k))==type(v):setattr(target,k,v)ifunresolved_attr:_tmp=vars(self)_tmp['metas']=tmpnew_metas=JAML.expand_dict(_tmp)['metas']fork,vinnew_metas.items():ifnothasattr(target,k):ifisinstance(v,str):ifnot(env_var_regex.findall(v)orinternal_var_regex.findall(v)):setattr(target,k,v)else:raiseValueError(f'{k}={v} is not substitutable or badly referred')else:setattr(target,k,v)# `name` is important as it serves as an identifier of the executor# if not given, then set a name by the ruleifnotgetattr(target,'name',None):setattr(target,'name',self.__class__.__name__)self.metas=target
[docs]defclose(self)->None:""" Always invoked as executor is destroyed. You can write destructor & saving logic here. """pass
def__call__(self,req_endpoint:str,**kwargs):""" # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ifreq_endpointinself.requests:returnself.requests[req_endpoint](self,**kwargs)# unbound method, self is requiredelif__default_endpoint__inself.requests:returnself.requests[__default_endpoint__](self,**kwargs)# unbound method, self is requiredasyncdef__acall__(self,req_endpoint:str,**kwargs):""" # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ifreq_endpointinself.requests:returnawaitself.__acall_endpoint__(req_endpoint,**kwargs)elif__default_endpoint__inself.requests:returnawaitself.__acall_endpoint__(__default_endpoint__,**kwargs)asyncdef__acall_endpoint__(self,req_endpoint,**kwargs):func=self.requests[req_endpoint]ifiscoroutinefunction(func):returnawaitfunc(self,**kwargs)else:returnawaitrun_in_threadpool(func,self._thread_pool,self,**kwargs)@propertydefworkspace(self)->Optional[str]:""" Get the workspace directory of the Executor. :return: returns the workspace of the current shard of this Executor. """workspace=getattr(self.metas,'workspace')orgetattr(self.runtime_args,'workspace',None)ifworkspace:complete_workspace=os.path.join(workspace,self.metas.name)shard_id=getattr(self.runtime_args,'shard_id',None,)ifshard_idisnotNoneandshard_id!=-1:complete_workspace=os.path.join(complete_workspace,str(shard_id))ifnotos.path.exists(complete_workspace):os.makedirs(complete_workspace)returnos.path.abspath(complete_workspace)def__enter__(self):returnselfdef__exit__(self,exc_type,exc_val,exc_tb):self.close()
[docs]@classmethoddeffrom_hub(cls:Type[T],uri:str,context:Optional[Dict[str,Any]]=None,uses_with:Optional[Dict]=None,uses_metas:Optional[Dict]=None,uses_requests:Optional[Dict]=None,**kwargs,)->T:"""Construct an Executor from Hub. :param uri: a hub Executor scheme starts with `jinahub://` :param context: context replacement variables in a dict, the value of the dict is the replacement. :param uses_with: dictionary of parameters to overwrite from the default config's with field :param uses_metas: dictionary of parameters to overwrite from the default config's metas field :param uses_requests: dictionary of parameters to overwrite from the default config's requests field :param kwargs: other kwargs accepted by the CLI ``jina hub pull`` :return: the Hub Executor object. .. highlight:: python .. code-block:: python from jina import Executor from docarray import Document, DocumentArray executor = Executor.from_hub( uri='jinahub://CLIPImageEncoder', install_requirements=True ) """fromjina.hubble.helperimportis_valid_huburi_source=Noneifis_valid_huburi(uri):fromjina.hubble.hubioimportHubIOfromjina.parsers.hubbleimportset_hub_pull_parser_args=ArgNamespace.kwargs2namespace({'no_usage':True,**kwargs},set_hub_pull_parser(),positional_args=(uri,),)_source=HubIO(args=_args).pull()ifnot_sourceor_source.startswith('docker://'):raiseValueError(f'Can not construct a native Executor from {uri}. Looks like you want to use it as a 'f'Docker container, you may want to use it in the Flow via `.add(uses={uri})` instead.')returncls.load_config(_source,context=context,uses_with=uses_with,uses_metas=uses_metas,uses_requests=uses_requests,)
[docs]@classmethoddefserve(cls,uses_with:Optional[Dict]=None,uses_metas:Optional[Dict]=None,uses_requests:Optional[Dict]=None,stop_event:Optional[Union[threading.Event,multiprocessing.Event]]=None,**kwargs,):"""Serve this Executor in a temporary Flow. Useful in testing an Executor in remote settings. :param uses_with: dictionary of parameters to overwrite from the default config's with field :param uses_metas: dictionary of parameters to overwrite from the default config's metas field :param uses_requests: dictionary of parameters to overwrite from the default config's requests field :param stop_event: a threading event or a multiprocessing event that once set will resume the control Flow to main thread. :param kwargs: other kwargs accepted by the Flow, full list can be found `here <https://docs.jina.ai/api/jina.orchestrate.flow.base/>` """fromjinaimportFlowf=Flow(**kwargs).add(uses=cls,uses_with=uses_with,uses_metas=uses_metas,uses_requests=uses_requests,)withf:f.block(stop_event)
[docs]classReducerExecutor(BaseExecutor):""" ReducerExecutor is an Executor that performs a reduce operation on a matrix of DocumentArrays coming from shards. ReducerExecutor relies on DocumentArray.reduce_all to merge all DocumentArray into one DocumentArray which will be sent to the next deployment. This Executor only adds a reduce endpoint to the BaseExecutor. """
[docs]@requestsdefreduce(self,docs_matrix:List['DocumentArray']=[],**kwargs):"""Reduce docs_matrix into one `DocumentArray` using `DocumentArray.reduce_all` :param docs_matrix: a List of DocumentArrays to be reduced :param kwargs: extra keyword arguments :return: the reduced DocumentArray """ifdocs_matrix:da=docs_matrix[0]da.reduce_all(docs_matrix[1:])returnda