[docs]classTopologyGraph:""" :class TopologyGraph is a class that describes a computational graph of nodes, where each node represents a Deployment that needs to be sent requests in the order respecting the path traversal. :param graph_description: A dictionary describing the topology of the Deployments. 2 special nodes are expected, the name `start-gateway` and `end-gateway` to determine the nodes that receive the very first request and the ones whose response needs to be sent back to the client. All the nodes with no outgoing nodes will be considered to be hanging, and they will be "flagged" so that the user can ignore their tasks and not await them. :param conditions: A dictionary describing which Executors have special conditions to be fullfilled by the `Documents` to be sent to them. """class_ReqReplyNode:def__init__(self,name:str,number_of_parts:int=1,hanging:bool=False,filter_condition:dict=None,):self.name=nameself.outgoing_nodes=[]self.number_of_parts=number_of_partsself.hanging=hangingself.parts_to_send=[]self.start_time=Noneself.end_time=Noneself.status=Noneself._filter_condition=filter_condition@propertydefleaf(self):returnlen(self.outgoing_nodes)==0def_update_requests(self):foriinrange(len(self.parts_to_send)):copy_req=copy.deepcopy(self.parts_to_send[i])filtered_docs=copy_req.docs.find(self._filter_condition)copy_req.data.docs=filtered_docsself.parts_to_send[i]=copy_reqasyncdef_wait_previous_and_send(self,request:DataRequest,previous_task:Optional[asyncio.Task],connection_pool:GrpcConnectionPool,endpoint:Optional[str],):# Check my condition and send request with the conditionmetadata={}ifprevious_taskisnotNone:result=awaitprevious_taskrequest,metadata=result[0],result[1]if'is-error'inmetadata:returnrequest,metadataelifrequestisnotNone:self.parts_to_send.append(request)# this is a specific needsiflen(self.parts_to_send)==self.number_of_parts:self.start_time=datetime.utcnow()ifself._filter_conditionisnotNone:self._update_requests()resp,metadata=awaitconnection_pool.send_requests_once(requests=self.parts_to_send,deployment=self.name,head=True,endpoint=endpoint,)self.end_time=datetime.utcnow()if'is-error'inmetadata:self.status=resp.header.statusreturnresp,metadatareturnNone,{}defget_leaf_tasks(self,connection_pool:GrpcConnectionPool,request_to_send:Optional[DataRequest],previous_task:Optional[asyncio.Task],endpoint:Optional[str]=None,)->List[Tuple[bool,asyncio.Task]]:""" Gets all the tasks corresponding from all the subgraphs born from this node :param connection_pool: The connection_pool need to actually send the requests :param request_to_send: Optional request to be sent when the node is an origin of a graph :param previous_task: Optional task coming from the predecessor of the Node :param endpoint: Optional string defining the endpoint of this request .. note: deployment1 -> outgoing_nodes: deployment2 deployment2 -> outgoing_nodes: deployment4 deployment3 -> outgoing_nodes: deployment4 deployment4 -> outgoing_nodes: deployment6 deployment5 -> outgoing_nodes: deployment6 deployment6 -> outgoing_nodes: [] |-> deployment1 -> deployment2 --> | | -> deployment4 ---> |-> deployment3 ----------> | -> deployment6 |-> deployment5 ------------------------> Let's imagine a graph from this. Node corresponding to Deployment6 will receive 2 calls from deployment4 and deployment5. The task returned by `deployment6` will backpropagated to the caller of deployment1.get_leaf_tasks, deployment3.get_leaf_tasks and deployment5.get_leaf_tasks. When the caller of these methods await them, they will fire the logic of sending requests and responses from and to every deployment :return: Return a list of tuples, where tasks corresponding to the leafs of all the subgraphs born from this node are in each tuple. These tasks will be based on awaiting for the task from previous_node and sending a request to the corresponding node. The other member of the pair is a flag indicating if the task is to be awaited by the gateway or not. """wait_previous_and_send_task=asyncio.create_task(self._wait_previous_and_send(request_to_send,previous_task,connection_pool,endpoint=endpoint))ifself.leaf:# I am like a leafreturn[(notself.hanging,wait_previous_and_send_task)]# I am the last in the chainhanging_tasks_tuples=[]foroutgoing_nodeinself.outgoing_nodes:t=outgoing_node.get_leaf_tasks(connection_pool,None,wait_previous_and_send_task,endpoint=endpoint,)# We are interested in the last one, that will be the task that awaits all the previoushanging_tasks_tuples.extend(t)returnhanging_tasks_tuplesdefadd_route(self,request:'DataRequest'):""" Add routes to the DataRequest based on the state of request processing :param request: the request to add the routes to :return: modified request with added routes """def_find_route(request):forrinrequest.routes:ifr.executor==self.name:returnrreturnNoner=_find_route(request)ifrisNoneandself.start_time:r=request.routes.add()r.executor=self.namer.start_time.FromDatetime(self.start_time)ifself.end_time:r.end_time.FromDatetime(self.end_time)ifself.status:r.status.CopyFrom(self.status)foroutgoing_nodeinself.outgoing_nodes:request=outgoing_node.add_route(request=request)returnrequestdef__init__(self,graph_representation:Dict,graph_conditions:Dict={},*args,**kwargs):num_parts_per_node=defaultdict(int)if'start-gateway'ingraph_representation:origin_node_names=graph_representation['start-gateway']else:origin_node_names=set()hanging_deployment_names=set()node_set=set()fornode_name,outgoing_node_namesingraph_representation.items():ifnode_namenotin{'start-gateway','end-gateway'}:node_set.add(node_name)iflen(outgoing_node_names)==0:hanging_deployment_names.add(node_name)forout_node_nameinoutgoing_node_names:ifout_node_namenotin{'start-gateway','end-gateway'}:node_set.add(out_node_name)num_parts_per_node[out_node_name]+=1nodes={}fornode_nameinnode_set:condition=graph_conditions.get(node_name,None)nodes[node_name]=self._ReqReplyNode(name=node_name,number_of_parts=num_parts_per_node[node_name]ifnum_parts_per_node[node_name]>0else1,hanging=node_nameinhanging_deployment_names,filter_condition=condition,)fornode_name,outgoing_node_namesingraph_representation.items():ifnode_namenotin['start-gateway','end-gateway']:forout_node_nameinoutgoing_node_names:ifout_node_namenotin['start-gateway','end-gateway']:nodes[node_name].outgoing_nodes.append(nodes[out_node_name])self._origin_nodes=[nodes[node_name]fornode_nameinorigin_node_names]self.has_filter_conditions=bool(graph_conditions)
[docs]defadd_routes(self,request:'DataRequest'):""" Add routes to the DataRequest based on the state of request processing :param request: the request to add the routes to :return: modified request with added routes """fornodeinself._origin_nodes:request=node.add_route(request=request)returnrequest
@propertydeforigin_nodes(self):""" The list of origin nodes, the one that depend only on the gateway, so all the subgraphs will be born from them and they will send to their deployments the request as received by the client. :return: A list of nodes """returnself._origin_nodes