"""Decorators and wrappers designed for wrapping :class:`BaseExecutor` functions. """importfunctoolsimportinspectfromfunctoolsimportwrapsfromtypingimportTYPE_CHECKING,Callable,Dict,List,Optional,Sequence,Unionfromjina.helperimportconvert_tuple_to_list,iscoroutinefunctionfromjina.serve.executors.metasimportget_default_metasifTYPE_CHECKING:fromjinaimportDocumentArray
def wrap_func(cls, func_lst, wrapper):
    """Wrap the listed methods of ``cls``, skipping the ones it merely inherits.

    A name is wrapped only when ``cls`` has the attribute and its value differs
    from what every other class in ``cls.mro()`` exposes under the same name
    (a missing attribute on an ancestor is treated as ``None``). An inherited
    but not overridden method is therefore left alone and not wrapped a second
    time.

    :param cls: the class whose attributes are replaced in place
    :param func_lst: iterable of attribute names to consider
    :param wrapper: callable that receives the current attribute and returns
        its replacement
    """
    for method_name in func_lst:
        if not hasattr(cls, method_name):
            continue

        current = getattr(cls, method_name)
        # ``mro()[0]`` is ``cls`` itself, so only the ancestors are inspected.
        ancestors = cls.mro()[1:]
        defined_here = all(
            current != getattr(ancestor, method_name, None) for ancestor in ancestors
        )
        if defined_here:
            setattr(cls, method_name, wrapper(current))
def store_init_kwargs(func: Callable) -> Callable:
    """Record the arguments passed to :func:`__init__` on the instance.

    Each call of the wrapped function collects the non-reserved parameters of
    ``func`` (defaults first, then positional values, then matching keyword
    values) into ``self._init_kwargs_dict`` before delegating to ``func``.

    :param func: the function to decorate
    :return: the wrapped function
    """

    @wraps(func)
    def arg_wrapper(self, *args, **kwargs):
        # The name is checked on every call, before anything else happens.
        if func.__name__ != '__init__':
            raise TypeError(
                'this decorator should only be used on __init__ method of an executor'
            )

        # Parameter names that are never recorded: the fixed set below plus
        # every key returned by ``get_default_metas()``.
        reserved = {'self', 'args', 'kwargs', 'metas', 'requests', 'runtime_args'}
        reserved.update(get_default_metas().keys())

        signature_params = inspect.signature(func).parameters
        recorded = {
            par_name: par.default
            for par_name, par in signature_params.items()
            if par_name not in reserved
        }

        # Positional values are matched, in order, against the recordable
        # parameter names; surplus names or values are dropped by ``zip``.
        recordable_names = list(recorded)
        recorded.update(zip(recordable_names, args))

        # Keyword values only override names that are already recordable.
        recorded.update(
            {key: value for key, value in kwargs.items() if key in recorded}
        )

        if not hasattr(self, '_init_kwargs_dict'):
            self._init_kwargs_dict = recorded
        else:
            self._init_kwargs_dict.update(recorded)
        convert_tuple_to_list(self._init_kwargs_dict)

        return func(self, *args, **kwargs)

    return arg_wrapper
def requests(
    func: Callable[
        [
            'DocumentArray',
            Dict,
            'DocumentArray',
            List['DocumentArray'],
            List['DocumentArray'],
        ],
        Optional[Union['DocumentArray', Dict]],
    ] = None,
    *,
    on: Optional[Union[str, Sequence[str]]] = None,
):
    """
    `@requests` defines when a function will be invoked. The keyword `on=`
    names the endpoint (or a list/tuple of endpoints) the function is bound to.

    A method decorated with plain `@requests` (without `on=`) is registered
    under ``__default_endpoint__``; per the original documentation it is the
    fallback handler for endpoints that are not found.

    :param func: the method to decorate
    :param on: the endpoint string, by convention starts with `/`
    :return: decorated function
    """
    from jina import __args_executor_func__, __default_endpoint__

    class FunctionMapper:
        """Descriptor-like helper that registers ``fn`` on its owning class."""

        def __init__(self, fn):
            spec = inspect.getfullargspec(fn)
            # Either ``**kwargs`` is accepted, or every expected argument name
            # must be listed explicitly in the signature.
            signature_ok = spec.varkw or __args_executor_func__.issubset(spec.args)
            if not signature_ok:
                raise TypeError(
                    f'{fn} accepts only {spec.args} which is fewer than expected, '
                    f'please add `**kwargs` to the function signature.'
                )

            # The wrapper takes the executor instance as an explicit first
            # positional argument and forwards everything to ``fn``.
            if iscoroutinefunction(fn):

                @functools.wraps(fn)
                async def arg_wrapper(executor_instance, *args, **kwargs):
                    return await fn(executor_instance, *args, **kwargs)

            else:

                @functools.wraps(fn)
                def arg_wrapper(executor_instance, *args, **kwargs):
                    return fn(executor_instance, *args, **kwargs)

            self.fn = arg_wrapper

        def __set_name__(self, owner, name):
            self.fn.class_name = owner.__name__

            # NOTE(review): ``hasattr`` also sees an inherited ``requests``
            # attribute, in which case the entries below are written into the
            # same dict object the parent class exposes -- confirm intended.
            if not hasattr(owner, 'requests'):
                owner.requests = {}

            if isinstance(on, (list, tuple)):
                endpoints = on
            else:
                endpoints = (on or __default_endpoint__,)
            for endpoint in endpoints:
                owner.requests[endpoint] = self.fn

            # Replace the mapper on the class with the plain wrapped function.
            setattr(owner, name, self.fn)

    if not func:
        # Used as ``@requests(on=...)``: hand back the class as the decorator.
        return FunctionMapper
    return FunctionMapper(func)
def monitor(
    *,
    name: Optional[str] = None,
    documentation: Optional[str] = None,
):
    """
    `@monitor()` allows monitoring an internal method of your executor. The
    decorated method asks ``self.get_metrics(name, documentation)`` for a metric
    on every call and, when one is returned, runs inside ``metric.time()``.
    When no metric is returned the method is simply called.

    According to the original documentation the metric is a prometheus Summary
    (https://prometheus.io/docs/practices/histograms/) that is available once
    monitoring is enabled on the Executor.

    :warning: Don't use this decorator with the @request decorator as it already handles monitoring under the hood
    :param name: the name of the metrics, by default ``<method name>_seconds``
    :param documentation: the description of the metrics, by default
        ``Time spent calling method <method name>``
    :return: decorator which takes as an input a single callable
    """

    def _decorator(func: Callable):
        metric_name = name or f'{func.__name__}_seconds'
        metric_doc = documentation or f'Time spent calling method {func.__name__}'

        @functools.wraps(func)
        def _f(self, *args, **kwargs):
            metric = self.get_metrics(metric_name, metric_doc)
            if not metric:
                return func(self, *args, **kwargs)
            with metric.time():
                return func(self, *args, **kwargs)

        return _f

    return _decorator