[docs]classGRPCBaseClient(BaseClient):"""A simple Python client for connecting to the gRPC gateway. It manages the asyncio event loop internally, so all interfaces are synchronous from the outside. """asyncdef_get_results(self,inputs:'InputType',on_done:'CallbackFnType',on_error:Optional['CallbackFnType']=None,on_always:Optional['CallbackFnType']=None,compression:str='NoCompression',**kwargs,):try:ifcompression.lower()notinGRPC_COMPRESSION_MAP:importwarningswarnings.warn(message=f'Your compression "{compression}" is not supported. Supported 'f'algorithms are `Gzip`, `Deflate` and `NoCompression`. NoCompression will be used as 'f'default')compression='NoCompression'self.inputs=inputsreq_iter=self._get_requests(**kwargs)asyncwithGrpcConnectionPool.get_grpc_channel(f'{self.args.host}:{self.args.port}',asyncio=True,tls=self.args.tls,)aschannel:stub=jina_pb2_grpc.JinaRPCStub(channel)self.logger.debug(f'connected to {self.args.host}:{self.args.port}')withProgressBar(total_length=self._inputs_length,disable=not(self.show_progress))asp_bar:asyncforrespinstub.Call(req_iter,compression=GRPC_COMPRESSION_MAP.get(compression.lower(),grpc.Compression.NoCompression),):callback_exec(response=resp,on_error=on_error,on_done=on_done,on_always=on_always,continue_on_error=self.continue_on_error,logger=self.logger,)ifself.show_progress:p_bar.update()yieldrespexceptKeyboardInterrupt:self.logger.warning('user cancel the process')exceptasyncio.CancelledErrorasex:self.logger.warning(f'process error: {ex!r}')exceptgrpc.aio._call.AioRpcErrorasrpc_ex:# Since this object is guaranteed to be a grpc.Call, might as well include that in its name.my_code=rpc_ex.code()my_details=rpc_ex.details()msg=f'gRPC error: {my_code}{my_details}'try:ifmy_code==grpc.StatusCode.UNAVAILABLE:self.logger.error(f'{msg}\nthe ongoing request is terminated as the server is not available or closed already')raiserpc_exelifmy_code==grpc.StatusCode.INTERNAL:self.logger.error(f'{msg}\ninternal error on the server side')raiserpc_exelif(my_code==grpc.StatusCode.UNKNOWNand'asyncio.exceptions.TimeoutError'inmy_details):raiseBadClientInput(f'{msg}\n''often the case is that you define/send a bad input iterator to jina, ''please double check your input iterator')fromrpc_exelse:raiseBadClient(msg)fromrpc_exexcept(grpc.aio._call.AioRpcError,BaseJinaException,)ase:# depending on if there are callbacks we catch or not the exceptionifon_errororon_always:ifon_error:callback_exec_on_error(on_error,e,self.logger)ifon_always:callback_exec(response=None,on_error=None,on_done=None,on_always=on_always,continue_on_error=self.continue_on_error,logger=self.logger,)else:raisee