class DataRequestHandler:
    """Encapsulate the code that passes data requests to an executor and
    writes the executor's return value back into the request."""

    def __init__(
        self,
        args: 'argparse.Namespace',
        logger: 'JinaLogger',
        **kwargs,
    ):
        """Store the runtime arguments and logger, then load the executor.

        :param args: args from CLI
        :param logger: the logger provided by the user
        :param kwargs: extra keyword arguments (accepted but not used here)
        """
        super().__init__()
        self.args = args
        # NOTE(review): ``parallel`` mirrors ``shards``; presumably kept as a
        # legacy alias for code that still reads ``args.parallel`` -- confirm.
        self.args.parallel = self.args.shards
        self.logger = logger
        self._is_closed = False
        self._load_executor()

    def _load_executor(self):
        """Load the executor to this runtime, specified by ``uses`` CLI argument.

        :raises ExecutorFailToLoad: whenever loading the executor fails; the
            original exception is attached as ``__cause__``.
        """
        try:
            self._executor: 'BaseExecutor' = BaseExecutor.load_config(
                self.args.uses,
                uses_with=self.args.uses_with,
                uses_metas=self.args.uses_metas,
                uses_requests=self.args.uses_requests,
                runtime_args={
                    'workspace': self.args.workspace,
                    'shard_id': self.args.shard_id,
                    'shards': self.args.shards,
                    'replicas': self.args.replicas,
                    'name': self.args.name,
                    'py_modules': self.args.py_modules,
                },
                extra_search_paths=self.args.extra_search_paths,
            )
        except BadConfigSource as ex:
            self.logger.error(
                f'fail to load config from {self.args.uses}, if you are using docker image for --uses, '
                f'please use "docker://YOUR_IMAGE_NAME"'
            )
            raise ExecutorFailToLoad from ex
        except FileNotFoundError as ex:
            self.logger.error('fail to load file dependency')
            raise ExecutorFailToLoad from ex
        except Exception as ex:
            # Boundary handler: any other failure is reported and re-raised
            # as the single exception type callers are expected to catch.
            self.logger.critical(f'can not load the executor from {self.args.uses}')
            raise ExecutorFailToLoad from ex

    @staticmethod
    def _parse_params(parameters: Dict, executor_name: str) -> Dict:
        """Merge executor-specific parameters into the general parameters.

        If ``parameters`` holds a dict under the key ``executor_name``, its
        entries override the top-level entries of the same name. The input
        dict is left untouched; a new dict is returned.

        :param parameters: the parameters received with the request
        :param executor_name: name of the executor whose overrides apply
        :return: a new dict with the executor-specific overrides applied
        """
        # Work on a shallow copy so the caller's dict is never mutated.
        parsed_params = dict(parameters)
        specific_parameters = parameters.get(executor_name, None)
        # Only a non-empty mapping can carry overrides; any other value under
        # that key is treated as an ordinary parameter and left alone.
        if specific_parameters and isinstance(specific_parameters, dict):
            parsed_params.update(specific_parameters)
        return parsed_params

    async def handle(self, requests: List['DataRequest']) -> 'DataRequest':
        """Run the executor on the given requests and write the result back.

        The first request is used as the carrier of the response: its docs
        (and, for ``dict`` results, its parameters) are replaced in-place.

        :param requests: The messages to handle containing a DataRequest
        :returns: the processed message (always ``requests[0]``)
        :raises TypeError: if the executor returns something other than a
            DocumentArray, a dict or ``None``
        """
        first_request = requests[0]
        exec_endpoint = first_request.header.exec_endpoint

        # skip executor if endpoints mismatch
        if (
            exec_endpoint not in self._executor.requests
            and __default_endpoint__ not in self._executor.requests
        ):
            self.logger.debug(
                f'skip executor: mismatch request, exec_endpoint: {exec_endpoint}, '
                f'requests: {self._executor.requests}'
            )
            return first_request

        params = self._parse_params(
            first_request.parameters, self._executor.metas.name
        )
        docs = DataRequestHandler.get_docs_from_request(requests, field='docs')

        # executor logic
        return_data = await self._executor.__acall__(
            req_endpoint=exec_endpoint,
            docs=docs,
            parameters=params,
            docs_matrix=DataRequestHandler.get_docs_matrix_from_request(
                requests, field='docs'
            ),
        )

        # assigning result back to request
        if return_data is not None:
            if isinstance(return_data, DocumentArray):
                docs = return_data
            elif isinstance(return_data, dict):
                # Dict results are stored under ``__results__`` keyed by this
                # runtime's name, leaving the other parameters as they were.
                params = first_request.parameters
                results_key = '__results__'
                params.setdefault(results_key, {})[self.args.name] = return_data
                first_request.parameters = params
            else:
                raise TypeError(
                    f'The return type must be DocumentArray / Dict / `None`, '
                    f'but getting {return_data!r}'
                )

        DataRequestHandler.replace_docs(
            first_request, docs, self.args.output_array_type
        )
        return first_request

    @staticmethod
    def replace_docs(
        request: 'DataRequest',
        docs: 'DocumentArray',
        ndarrray_type: Optional[str] = None,
    ) -> None:
        """Replaces the docs in a message with new Documents.

        :param request: The request object
        :param docs: the new docs to be used
        :param ndarrray_type: type tensor and embedding will be converted to
            (the parameter name keeps its historical spelling so keyword
            callers continue to work)
        """
        request.data.set_docs_convert_arrays(docs, ndarray_type=ndarrray_type)

    @staticmethod
    def replace_parameters(request: 'DataRequest', parameters: Dict) -> None:
        """Replaces the parameters in a message with new parameters.

        :param request: The request object
        :param parameters: the new parameters to be used
        """
        request.parameters = parameters

    @staticmethod
    def merge_routes(requests: List['DataRequest']) -> None:
        """Merges all routes found in requests into the first message

        A route is appended to the first request only if no route with the
        same ``executor`` is already present there.

        :param requests: The messages containing the requests with the routes to merge
        """
        if len(requests) <= 1:
            return

        target_routes = requests[0].routes
        # Set for O(1) membership checks while scanning the other requests.
        seen_executors = {route.executor for route in target_routes}
        for request in requests[1:]:
            for route in request.routes:
                if route.executor not in seen_executors:
                    target_routes.append(route)
                    seen_executors.add(route.executor)

    def close(self):
        """Close the data request handler, by closing the executor.

        Calling this again after a successful close is a no-op.
        """
        if not self._is_closed:
            self._executor.close()
            self._is_closed = True

    @staticmethod
    def get_docs_matrix_from_request(
        requests: List['DataRequest'],
        field: str,
    ) -> Optional[List['DocumentArray']]:
        """Returns a docs matrix from a list of DataRequest objects.

        :param requests: List of DataRequest objects
        :param field: field to be retrieved
        :return: docs matrix: list of DocumentArray objects, or ``None`` when
            every retrieved entry is empty (or ``requests`` itself is empty)
        """
        result = [getattr(request, field) for request in requests]

        # to unify all length=0 DocumentArray (or any other results) will simply considered as None
        # otherwise, the executor has to handle [None, None, None] or [DocArray(0), DocArray(0), DocArray(0)]
        if any(len(r) for r in result):
            return result
        return None

    @staticmethod
    def get_parameters_dict_from_request(
        requests: List['DataRequest'],
    ) -> 'Dict':
        """Returns a parameters dict from a list of DataRequest objects.

        The dict is taken from the first request; the ``__results__`` entries
        of all requests are merged into it.

        :param requests: List of DataRequest objects
        :return: the parameters of the first request with merged ``__results__``
        """
        key_result = '__results__'
        parameters = requests[0].parameters
        merged_results = parameters.setdefault(key_result, {})
        # we only merge the results and make the assumption that the others params does not change during execution
        for req in requests:
            merged_results.update(req.parameters.get(key_result, {}))
        return parameters

    @staticmethod
    def get_docs_from_request(
        requests: List['DataRequest'],
        field: str,
    ) -> 'DocumentArray':
        """Gets a field from the message

        With several requests, the documents of all of them are collected
        into one new DocumentArray, walking the requests in reverse order.
        With a single request, its field value is returned as-is.

        :param requests: requests to get the field from
        :param field: field name to access
        :returns: DocumentArray extracted from the field from all messages
        """
        if len(requests) > 1:
            return DocumentArray(
                [d for r in reversed(requests) for d in getattr(r, field)]
            )
        return getattr(requests[0], field)

    @staticmethod
    def reduce(docs_matrix: List['DocumentArray']) -> Optional['DocumentArray']:
        """Reduces a list of DocumentArrays into one DocumentArray. Changes are applied to the first
        DocumentArray in-place.

        Reduction consists in reducing every DocumentArray in `docs_matrix` sequentially using
        :class:`DocumentArray`.:method:`reduce`.
        The resulting DocumentArray contains Documents of all DocumentArrays.
        If a Document exists in many DocumentArrays, data properties are merged with priority to the left-most
        DocumentArrays (that is, if a data attribute is set in a Document belonging to many DocumentArrays, the
        attribute value of the left-most DocumentArray is kept).
        Matches and chunks of a Document belonging to many DocumentArrays are also reduced in the same way.
        Other non-data properties are ignored.

        .. note::
            - Matches are not kept in a sorted order when they are reduced. You might want to re-sort them in a later
                step.
            - The final result depends on the order of DocumentArrays when applying reduction.

        :param docs_matrix: List of DocumentArrays to be reduced
        :return: the resulting DocumentArray, or ``None`` if ``docs_matrix`` is empty or ``None``
        """
        if not docs_matrix:
            return None
        da = docs_matrix[0]
        da.reduce_all(docs_matrix[1:])
        return da

    @staticmethod
    def reduce_requests(requests: List['DataRequest']) -> 'DataRequest':
        """Reduces a list of requests containing DocumentArrays into one request object. Changes are applied to
        the first request object in-place.

        Reduction consists in reducing every DocumentArray in `requests` sequentially using
        :class:`DocumentArray`.:method:`reduce`.
        The resulting DataRequest object contains Documents of all DocumentArrays inside requests.

        :param requests: List of DataRequest objects
        :return: the resulting DataRequest
        """
        docs_matrix = DataRequestHandler.get_docs_matrix_from_request(
            requests, field='docs'
        )

        # Reduction is applied in-place to the first DocumentArray in the matrix
        da = DataRequestHandler.reduce(docs_matrix)
        # When every request carries empty docs there is nothing to reduce
        # (``da`` is None); the first request's docs are left as they are
        # instead of handing ``None`` to the setter.
        if da is not None:
            DataRequestHandler.replace_docs(requests[0], da)

        params = DataRequestHandler.get_parameters_dict_from_request(requests)
        DataRequestHandler.replace_parameters(requests[0], params)

        return requests[0]