class RequestHandler:
    """
    Class that handles the requests arriving to the gateway and the result extracted from the requests future.

    :param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor or from the data request handler
    :param runtime_name: optional runtime_name that will be registered during monitoring
    """

    def __init__(
        self,
        metrics_registry: Optional['CollectorRegistry'] = None,
        runtime_name: Optional[str] = None,
    ):
        # Maps request_id -> wall-clock time at which the request was handed to
        # `_handle_request`. Only allocated when metrics are enabled.
        self.request_init_time = {} if metrics_registry else None
        # Maps node name -> endpoints; filled by `gather_endpoints` the first time
        # results are processed while it is still None.
        self._executor_endpoint_mapping = None

        if metrics_registry:
            with ImportExtensions(
                required=True,
                help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
            ):
                from prometheus_client import Summary

            self._summary = Summary(
                'receiving_request_seconds',
                'Time spent processing request',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)
        else:
            self._summary = None

    def handle_request(
        self, graph: 'TopologyGraph', connection_pool: 'GrpcConnectionPool'
    ) -> Callable[['Request'], 'asyncio.Future']:
        """
        Function that handles the requests arriving to the gateway. This will be passed to the streamer.

        :param graph: The TopologyGraph of the Flow.
        :param connection_pool: The connection pool to be used to send messages to specific nodes of the graph
        :return: Return a Function that given a Request will return a Future from where to extract the response
        """

        async def gather_endpoints(request_graph):
            # Collect every distinct node (by name) reachable from the origin nodes,
            # query each for its endpoints and cache the result on the handler.
            def _get_all_nodes(node, accum, accum_names):
                if node.name not in accum_names:
                    accum.append(node)
                    accum_names.append(node.name)
                for n in node.outgoing_nodes:
                    _get_all_nodes(n, accum, accum_names)
                return accum, accum_names

            nodes = []
            node_names = []
            for origin_node in request_graph.origin_nodes:
                subtree_nodes, subtree_node_names = _get_all_nodes(origin_node, [], [])
                for st_node, st_node_name in zip(subtree_nodes, subtree_node_names):
                    # the same node may be reachable from several origins
                    if st_node_name not in node_names:
                        nodes.append(st_node)
                        node_names.append(st_node_name)

            tasks_to_get_endpoints = [
                node.get_endpoints(connection_pool) for node in nodes
            ]
            endpoints = await asyncio.gather(*tasks_to_get_endpoints)

            self._executor_endpoint_mapping = {}
            for node, (endp, _) in zip(nodes, endpoints):
                self._executor_endpoint_mapping[node.name] = endp.endpoints

        def _handle_request(request: 'Request') -> 'asyncio.Future':
            if self._summary:
                # start of the timing window; consumed (and removed) in `handle_result`
                self.request_init_time[request.request_id] = time.time()
            # important that the gateway needs to have an instance of the graph per request
            request_graph = copy.deepcopy(graph)

            if graph.has_filter_conditions:
                # used to maintain order of docs that are filtered by executors
                request_doc_ids = request.data.docs[:, 'id']
            tasks_to_respond = []
            # NOTE(review): collected for symmetry with `tasks_to_respond`, but nothing
            # in this function reads it afterwards.
            tasks_to_ignore = []
            endpoint = request.header.exec_endpoint
            r = request.routes.add()
            r.executor = 'gateway'
            r.start_time.GetCurrentTime()
            # If the request is targeting a specific deployment, we can send directly to the deployment instead of
            # querying the graph
            for origin_node in request_graph.origin_nodes:
                leaf_tasks = origin_node.get_leaf_tasks(
                    connection_pool,
                    request,
                    None,
                    endpoint=endpoint,
                    executor_endpoint_mapping=self._executor_endpoint_mapping,
                    target_executor_pattern=request.header.target_executor,
                )
                # Every origin node returns a set of tasks that are the ones corresponding to the leafs of each of their
                # subtrees that unwrap all the previous tasks. It starts like a chain of waiting for tasks from previous
                # nodes
                tasks_to_respond.extend([task for ret, task in leaf_tasks if ret])
                tasks_to_ignore.extend([task for ret, task in leaf_tasks if not ret])

            def _sort_response_docs(response):
                # sort response docs according to their order in the initial request
                # Build the id -> position lookup once per response instead of scanning
                # the id list for every document. `setdefault` keeps the first
                # occurrence of a repeated id, which is what `list.index` returns.
                position_by_id = {}
                for position, doc_id in enumerate(request_doc_ids):
                    position_by_id.setdefault(doc_id, position)
                unknown_position = len(request_doc_ids)  # put new/unknown docs at the end

                def sort_by_request_order(doc):
                    return position_by_id.get(doc.id, unknown_position)

                sorted_docs = sorted(response.data.docs, key=sort_by_request_order)
                response.data.docs = DocumentArray(sorted_docs)

            async def _process_results_at_end_gateway(
                tasks: List[asyncio.Task], request_graph: TopologyGraph
            ) -> asyncio.Future:
                if self._executor_endpoint_mapping is None:
                    await asyncio.gather(gather_endpoints(request_graph))

                partial_responses = await asyncio.gather(*tasks)
                # each task result is a (response, metadata) pair
                partial_responses, metadatas = zip(*partial_responses)
                filtered_partial_responses = list(
                    filter(lambda x: x is not None, partial_responses)
                )

                response = filtered_partial_responses[0]
                request_graph.add_routes(response)

                if graph.has_filter_conditions:
                    _sort_response_docs(response)

                return response

            # In case of empty topologies
            if not tasks_to_respond:
                r.end_time.GetCurrentTime()
                future = asyncio.Future()
                future.set_result((request, {}))
                tasks_to_respond.append(future)
            return asyncio.ensure_future(
                _process_results_at_end_gateway(tasks_to_respond, request_graph)
            )

        return _handle_request

    def handle_result(self) -> Callable[['Request'], 'asyncio.Future']:
        """
        Function that handles the result when extracted from the request future

        :return: Return a Function that returns a request to be returned to the client
        """

        def _handle_result(result: 'Request'):
            """
            Function that handles the result when extracted from the request future

            :param result: The result returned to the gateway. It extracts the request to be returned to the client
            :return: Returns a request to be returned to the client
            """
            for route in result.routes:
                if route.executor == 'gateway':
                    route.end_time.GetCurrentTime()

            if self._summary:
                # Pop rather than read: the entry is only needed once, and leaving it
                # behind makes `request_init_time` grow with every request served.
                init_time = self.request_init_time.pop(result.request_id, None)
                # A missing start time must not prevent the result from being
                # returned; the observation is simply skipped.
                if init_time is not None:
                    self._summary.observe(time.time() - init_time)

            return result

        return _handle_result