# Source code for jina.serve.runtimes.gateway.request_handling
import asyncio
import copy
from typing import TYPE_CHECKING, Callable, List

from docarray import DocumentArray

from jina.serve.networking import GrpcConnectionPool
from jina.serve.runtimes.gateway.graph.topology_graph import TopologyGraph

if TYPE_CHECKING:
    # imported only for type annotations, to avoid a runtime import cycle
    from jina.types.request import Request
def handle_request(
    graph: 'TopologyGraph', connection_pool: 'GrpcConnectionPool'
) -> Callable[['Request'], 'asyncio.Future']:
    """
    Function that handles the requests arriving to the gateway. This will be passed to the streamer.

    :param graph: The TopologyGraph of the Flow.
    :param connection_pool: The connection pool to be used to send messages to specific nodes of the graph
    :return: Return a Function that given a Request will return a Future from where to extract the response
    """

    def _handle_request(request: 'Request') -> 'asyncio.Future':
        # important that the gateway needs to have an instance of the graph per request
        request_graph = copy.deepcopy(graph)

        if graph.has_filter_conditions:
            request_doc_ids = request.data.docs[
                :, 'id'
            ]  # used to maintain order of docs that are filtered by executors
        tasks_to_respond = []
        tasks_to_ignore = []
        endpoint = request.header.exec_endpoint
        r = request.routes.add()
        r.executor = 'gateway'
        r.start_time.GetCurrentTime()
        # If the request is targeting a specific deployment, we can send directly to the deployment instead of querying the graph
        if request.header.target_executor:
            tasks_to_respond.extend(
                connection_pool.send_request(
                    request=request,
                    deployment=request.header.target_executor,
                    head=True,
                    endpoint=endpoint,
                )
            )
        else:
            for origin_node in request_graph.origin_nodes:
                leaf_tasks = origin_node.get_leaf_tasks(
                    connection_pool, request, None, endpoint=endpoint
                )
                # Every origin node returns a set of tasks that are the ones corresponding to the leafs of each of their
                # subtrees that unwrap all the previous tasks. It starts like a chain of waiting for tasks from previous
                # nodes
                tasks_to_respond.extend([task for ret, task in leaf_tasks if ret])
                tasks_to_ignore.extend([task for ret, task in leaf_tasks if not ret])

        def _sort_response_docs(response):
            # sort response docs according to their order in the initial request
            def sort_by_request_order(doc):
                if doc.id in request_doc_ids:
                    return request_doc_ids.index(doc.id)
                else:
                    return len(request_doc_ids)  # put new/unknown docs at the end

            sorted_docs = sorted(response.data.docs, key=sort_by_request_order)
            response.data.docs = DocumentArray(sorted_docs)

        async def _process_results_at_end_gateway(
            tasks: List[asyncio.Task], request_graph: 'TopologyGraph'
        ) -> asyncio.Future:
            # wait for every leaf task; each yields a (response, metadata) pair
            partial_responses = await asyncio.gather(*tasks)
            partial_responses, metadatas = zip(*partial_responses)
            filtered_partial_responses = list(
                filter(lambda x: x is not None, partial_responses)
            )
            # the first non-None partial response is returned to the client
            response = filtered_partial_responses[0]
            request_graph.add_routes(response)
            if graph.has_filter_conditions:
                _sort_response_docs(response)
            return response

        # In case of empty topologies
        if not tasks_to_respond:
            r.end_time.GetCurrentTime()
            future = asyncio.Future()
            future.set_result((request, {}))
            tasks_to_respond.append(future)
        return asyncio.ensure_future(
            _process_results_at_end_gateway(tasks_to_respond, request_graph)
        )

    return _handle_request
def handle_result(result: 'Request'):
    """
    Function that handles the result when extracted from the request future

    :param result: The result returned to the gateway. It extracts the request to be returned to the client
    :return: Returns a request to be returned to the client
    """
    for route in result.routes:
        # close out only the gateway's own route entry; executor routes are stamped elsewhere
        if route.executor == 'gateway':
            route.end_time.GetCurrentTime()
    return result