class AioHttpClientlet(ABC):
    """aiohttp session manager

    Holds the target url, a logger and a single ``aiohttp.ClientSession``.
    The session is created by :meth:`start` (or by entering the object with
    ``async with``) and released by :meth:`close`.
    """

    def __init__(self, url: str, logger: 'JinaLogger') -> None:
        """HTTP Client to be used with the streamer

        :param url: url to send http/websocket request to
        :param logger: jina logger
        """
        self.url = url
        self.logger = logger
        # Message counters, both initialised to zero.
        self.msg_recv = 0
        self.msg_sent = 0
        # No session exists until `start` has run.
        self.session = None

    @abstractmethod
    async def send_message(self):
        """Send message to Gateway"""
        ...

    @abstractmethod
    async def recv_message(self):
        """Receive message from Gateway"""
        ...

    async def __aenter__(self):
        """Enter the async context by starting the session.

        Subclasses call ``super().__aenter__()``, so the base class has to
        provide it.

        :return: self, as returned by :meth:`start`
        """
        return await self.start()

    async def start(self):
        """Create ClientSession and enter context

        :return: self
        """
        # aiohttp is imported lazily; `ImportExtensions` is a project helper
        # defined elsewhere (presumably it reports a missing optional
        # dependency - confirm against its definition).
        with ImportExtensions(required=True):
            import aiohttp

        self.session = aiohttp.ClientSession()
        await self.session.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context and release the session.

        :param exc_type: type of the exception raised in the context, if any
        :param exc_val: exception raised in the context, if any
        :param exc_tb: traceback of the exception raised in the context, if any
        """
        await self.close(exc_type, exc_val, exc_tb)

    async def close(self, exc_type=None, exc_val=None, exc_tb=None):
        """Exit the ClientSession context entered by :meth:`start`.

        Does nothing when no session has been created, so it is safe to call
        on a client that was never started.

        :param exc_type: exception type forwarded to the session's ``__aexit__``
        :param exc_val: exception forwarded to the session's ``__aexit__``
        :param exc_tb: traceback forwarded to the session's ``__aexit__``
        """
        if self.session is not None:
            await self.session.__aexit__(exc_type, exc_val, exc_tb)
class HTTPClientlet(AioHttpClientlet):
    """HTTP Client to be used with the streamer"""

    async def send_message(self, request: 'Request'):
        """Sends a POST request to the server

        The request is converted with ``to_dict()``; ``exec_endpoint`` and,
        when present, ``target_executor`` are copied from its ``header``
        entry to the top level of the JSON payload.

        :param request: request object exposing ``to_dict()``
        :return: result of entering the context of the POST call
        """
        payload = request.to_dict()
        header = payload['header']
        payload['exec_endpoint'] = header['exec_endpoint']
        if 'target_executor' in header:
            payload['target_executor'] = header['target_executor']

        post_ctx = self.session.post(url=self.url, json=payload)
        return await post_ctx.__aenter__()

    async def recv_message(self):
        """Receive message for HTTP (sleep)

        Nothing is read here; the coroutine just sleeps for ``1e10`` seconds.

        :return: await sleep
        """
        pending_sleep = asyncio.sleep(1e10)
        return await pending_sleep
class WsResponseIter:
    """
    Iterates over all the responses that come in over the websocket connection.
    In contrast to the iterator built into AioHTTP, this also records the message that was sent at closing-time.
    """

    def __init__(self, websocket):
        # Object whose `receive()` coroutine yields the incoming messages.
        self.websocket = websocket
        # Filled in with the CLOSE/CLOSING/CLOSED message once one arrives.
        self.close_message = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        incoming = await self.websocket.receive()
        terminal_types = (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED)
        if incoming.type not in terminal_types:
            return incoming
        # Remember the closing message before ending the iteration.
        self.close_message = incoming
        raise StopAsyncIteration
class WebsocketClientlet(AioHttpClientlet):
    """Websocket Client to be used with the streamer"""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Both are populated in `__aenter__`, once the connection is open.
        self.websocket = None
        self.response_iter = None

    async def send_message(self, request: 'Request'):
        """Send request in bytes to the server.

        A ``ConnectionResetError`` is logged at critical level and not
        re-raised; in that case ``None`` is returned.

        :param request: request object
        :return: send request as bytes awaitable
        """
        try:
            return await self.websocket.send_bytes(request.SerializeToString())
        except ConnectionResetError:
            self.logger.critical('server connection closed already!')

    async def send_eoi(self):
        """To confirm end of iteration, we send `bytes(True)` to the server.

        :return: send `bytes(True)` awaitable
        """
        try:
            return await self.websocket.send_bytes(bytes(True))
        except ConnectionResetError:
            # server might be in a `CLOSING` state while sending EOI signal
            # which raises a `ConnectionResetError`, this can be ignored.
            pass

    async def recv_message(self) -> 'DataRequest':
        """Receive messages in bytes from server and convert to `DataRequest`

        This is an async generator: iterate it with ``async for``.

        .. note::
            aiohttp allows only one task which can `receive` concurrently.
            we need to make sure we don't create multiple tasks with `recv_message`

        :yield: response objects received from server
        """
        async for response in self.response_iter:
            yield DataRequest(response.data)

    async def __aenter__(self):
        await super().__aenter__()
        self.websocket = await self.session.ws_connect(
            url=self.url,
            protocols=(WebsocketSubProtocols.BYTES.value,),
        ).__aenter__()
        self.response_iter = WsResponseIter(self.websocket)
        return self

    @property
    def close_message(self):
        """:return: the close message (reason) of the ws closing response"""
        if not self.response_iter:
            return None
        closing = self.response_iter.close_message
        # No closing message is recorded until the server closes the
        # connection; report `None` instead of failing on `None.extra`.
        if closing is None:
            return None
        return closing.extra

    @property
    def close_code(self):
        """:return: the close code of the ws closing response"""
        return self.websocket.close_code if self.websocket else None