class AioHttpClientlet(ABC):
    """Abstract base for clientlets that talk through an aiohttp session"""

    def __init__(self, url: str, logger: 'JinaLogger') -> None:
        """HTTP Client to be used with the streamer

        :param url: url to send http/websocket request to
        :param logger: jina logger
        """
        self.url, self.logger = url, logger
        # Both start at zero; presumably message counters updated by callers.
        self.msg_recv = self.msg_sent = 0
        # No session exists until `start` is awaited.
        self.session = None

    @abstractmethod
    async def send_message(self):
        """Send message to Gateway"""
        ...

    @abstractmethod
    async def recv_message(self):
        """Receive message from Gateway"""
        ...

    async def start(self):
        """Create the aiohttp ClientSession and enter its async context

        :return: self
        """
        # aiohttp is imported lazily, under the ImportExtensions guard.
        with ImportExtensions(required=True):
            import aiohttp

        session = aiohttp.ClientSession()
        self.session = session
        await session.__aenter__()
        return self
class HTTPClientlet(AioHttpClientlet):
    """HTTP Client to be used with the streamer"""

    async def send_message(self, request: 'Request'):
        """Send the request to ``self.url`` as a JSON POST

        ``exec_endpoint`` (always) and ``target_executor`` (when present) are
        copied from the ``header`` entry to the top level of the payload.

        :param request: request object; its ``to_dict()`` output is the payload
        :return: result of entering the context returned by ``session.post``
        """
        payload = request.to_dict()
        header = payload['header']
        payload['exec_endpoint'] = header['exec_endpoint']
        if 'target_executor' in header:
            payload['target_executor'] = header['target_executor']
        post_ctx = self.session.post(url=self.url, json=payload)
        return await post_ctx.__aenter__()

    async def recv_message(self):
        """Receive message for HTTP (sleep)

        No message is read here; the coroutine only sleeps for 1e10 seconds.

        :return: await sleep
        """
        # NOTE(review): presumably HTTP responses are consumed from the return
        # value of `send_message`, so this is a placeholder -- confirm.
        return await asyncio.sleep(1e10)
class WebsocketClientlet(AioHttpClientlet):
    """Websocket Client to be used with the streamer"""

    def __init__(self, *args, **kwargs) -> None:
        """Forward all arguments to the base class and reset the websocket

        :param args: positional arguments for the base constructor
        :param kwargs: keyword arguments for the base constructor
        """
        super().__init__(*args, **kwargs)
        # Nothing in this constructor opens a connection.
        self.websocket = None

    async def send_message(self, request: 'Request'):
        """Send request in bytes to the server.

        A ``ConnectionResetError`` is logged at critical level, not raised.

        :param request: request object
        :return: result of ``send_bytes``, or None if the connection was reset
        """
        try:
            outcome = await self.websocket.send_bytes(request.SerializeToString())
        except ConnectionResetError:
            self.logger.critical('server connection closed already!')
            return None
        else:
            return outcome

    async def send_eoi(self):
        """To confirm end of iteration, we send `bytes(True)` to the server.

        :return: result of ``send_bytes``, or None if the connection was reset
        """
        # bytes(True) is bytes(1): a single zero byte (b'\x00').
        end_marker = bytes(True)
        try:
            outcome = await self.websocket.send_bytes(end_marker)
        except ConnectionResetError:
            # server might be in a `CLOSING` state while sending EOI signal
            # which raises a `ConnectionResetError`, this can be ignored.
            return None
        return outcome

    async def recv_message(self) -> 'DataRequest':
        """Receive messages in bytes from server and convert to `DataRequest`

        .. note::
            aiohttp allows only one task which can `receive` concurrently.
            we need to make sure we don't create multiple tasks with `recv_message`

        :yield: response objects received from server
        """
        # Async generator: one DataRequest per item read from the websocket.
        async for frame in self.websocket:
            yield DataRequest(frame.data)