class HTTPBaseClient(BaseClient):
    """A MixIn for HTTP Client."""

    async def _get_results(
        self,
        inputs: 'InputType',
        on_done: 'CallbackFnType',
        on_error: Optional['CallbackFnType'] = None,
        on_always: Optional['CallbackFnType'] = None,
        **kwargs,
    ):
        """Send the requests over HTTP POST and yield each response as it arrives.

        Every response is decoded from JSON into a ``DataRequest``, passed to
        the callbacks via ``callback_exec`` and then yielded.

        :param inputs: the inputs; assigned to ``self.inputs``
        :param on_done: the callback for on_done
        :param on_error: the callback for on_error
        :param on_always: the callback for on_always
        :param kwargs: kwargs for _get_task_name and _get_requests
        :yields: generator over results
        :raises BadClient: if the server answers with HTTP 404
        :raises ValueError: if the server answers with any other non-2xx
            status; the decoded JSON body is the exception argument
        :raises aiohttp.ClientError: re-raised only when neither ``on_error``
            nor ``on_always`` is given; otherwise it is logged and handed to
            those callbacks instead
        """
        # Imported lazily so that aiohttp is only needed once a request is made.
        with ImportExtensions(required=True):
            import aiohttp

        self.inputs = inputs
        request_iterator = self._get_requests(**kwargs)

        async with AsyncExitStack() as stack:
            try:
                cm1 = ProgressBar(
                    total_length=self._inputs_length,
                    disable=not self.show_progress,
                )
                p_bar = stack.enter_context(cm1)

                proto = 'https' if self.args.tls else 'http'
                url = f'{proto}://{self.args.host}:{self.args.port}/post'
                # Entered through the stack so it is exited together with the
                # progress bar when the generator finishes or fails.
                iolet = await stack.enter_async_context(
                    HTTPClientlet(url=url, logger=self.logger)
                )

                def _request_handler(request: 'Request') -> 'asyncio.Future':
                    """
                    For HTTP Client, for each request in the iterator, we `send_message` using
                    http POST request and add it to the list of tasks which is awaited and yielded.

                    :param request: current request in the iterator
                    :return: asyncio Task for sending message
                    """
                    return asyncio.ensure_future(iolet.send_message(request=request))

                def _result_handler(result):
                    """Return the result unchanged.

                    :param result: the result handed over by the streamer
                    :return: the same ``result`` object
                    """
                    return result

                streamer = RequestStreamer(
                    self.args,
                    request_handler=_request_handler,
                    result_handler=_result_handler,
                )
                async for response in streamer.stream(request_iterator):
                    r_status = response.status

                    # Check for 404 *before* decoding the body: a "not found"
                    # page is not guaranteed to be JSON, and a decoding failure
                    # is an aiohttp.ClientError that would otherwise hide the
                    # more helpful BadClient error.
                    if r_status == 404:
                        raise BadClient(f'no such endpoint {url}')

                    r_str = await response.json()
                    # Only 2xx counts as success; 300 is already a redirection.
                    if not 200 <= r_status < 300:
                        raise ValueError(r_str)

                    # The documents are taken out of the payload and attached
                    # to the request afterwards as a DocumentArray.
                    da = None
                    if 'data' in r_str and r_str['data'] is not None:
                        from docarray import DocumentArray

                        da = DocumentArray.from_dict(r_str['data'])
                        del r_str['data']

                    resp = DataRequest(r_str)
                    if da is not None:
                        resp.data.docs = da

                    callback_exec(
                        response=resp,
                        on_error=on_error,
                        on_done=on_done,
                        on_always=on_always,
                        continue_on_error=self.continue_on_error,
                        logger=self.logger,
                    )
                    if self.show_progress:
                        p_bar.update()
                    yield resp
            except aiohttp.ClientError as e:
                self.logger.error(
                    f'Error while fetching response from HTTP server {e!r}'
                )

                if on_error or on_always:
                    # With a callback present the error is reported through it
                    # and the generator ends without raising.
                    if on_error:
                        callback_exec_on_error(on_error, e, self.logger)
                    if on_always:
                        callback_exec(
                            response=None,
                            on_error=None,
                            on_done=None,
                            on_always=on_always,
                            continue_on_error=self.continue_on_error,
                            logger=self.logger,
                        )
                else:
                    # Nobody is listening for the error: propagate it as is.
                    raise