[docs]classGRPCGatewayRuntime(GatewayRuntime):"""Gateway Runtime for gRPC."""
[docs]asyncdefasync_setup(self):""" The async method to setup. Create the gRPC server and expose the port for communication. """ifnotself.args.proxyandos.name!='nt':os.unsetenv('http_proxy')os.unsetenv('https_proxy')ifnot(is_port_free(__default_host__,self.args.port)):raisePortAlreadyUsed(f'port:{self.args.port}')self.server=grpc.aio.server(options=[('grpc.max_send_message_length',-1),('grpc.max_receive_message_length',-1),])self._set_topology_graph()self._set_connection_pool()awaitself._async_setup_server()
asyncdef_async_setup_server(self):request_handler=RequestHandler(self.metrics_registry,self.name)self.streamer=RequestStreamer(args=self.args,request_handler=request_handler.handle_request(graph=self._topology_graph,connection_pool=self._connection_pool),result_handler=request_handler.handle_result(),)self.streamer.Call=self.streamer.streamjina_pb2_grpc.add_JinaRPCServicer_to_server(self.streamer,self.server)jina_pb2_grpc.add_JinaControlRequestRPCServicer_to_server(self,self.server)service_names=(jina_pb2.DESCRIPTOR.services_by_name['JinaRPC'].full_name,jina_pb2.DESCRIPTOR.services_by_name['JinaControlRequestRPC'].full_name,reflection.SERVICE_NAME,)reflection.enable_server_reflection(service_names,self.server)bind_addr=f'{__default_host__}:{self.args.port}'ifself.args.ssl_keyfileandself.args.ssl_certfile:withopen(self.args.ssl_keyfile,'rb')asf:private_key=f.read()withopen(self.args.ssl_certfile,'rb')asf:certificate_chain=f.read()server_credentials=grpc.ssl_server_credentials(((private_key,certificate_chain,),))self.server.add_secure_port(bind_addr,server_credentials)elif(self.args.ssl_keyfile!=self.args.ssl_certfile):# if we have only ssl_keyfile and not ssl_certfile or vice versaraiseValueError(f"you can't pass a ssl_keyfile without a ssl_certfile and vice versa")else:self.server.add_insecure_port(bind_addr)self.logger.debug(f'start server bound to {bind_addr}')awaitself.server.start()
[docs]asyncdefasync_teardown(self):"""Close the connection pool"""# usually async_cancel should already have been called, but then its a noop# if the runtime is stopped without a sigterm (e.g. as a context manager, this can happen)awaitself.async_cancel()awaitself._connection_pool.close()
[docs]asyncdefasync_cancel(self):"""The async method to stop server."""awaitself.server.stop(0)
[docs]asyncdefasync_run_forever(self):"""The async running of server."""self._connection_pool.start()awaitself.server.wait_for_termination()
[docs]asyncdefprocess_control(self,request:ControlRequest,*args)->ControlRequest:""" Should be used to check readiness by sending STATUS ControlRequests. Throws for any other command than STATUS. :param request: the ControlRequest, should have command 'STATUS' :param args: additional arguments in the grpc call, ignored :returns: will be the original request """ifself.logger.debug_enabled:self._log_control_request(request)ifrequest.command!='STATUS':raiseValueError('gateway only support STATUS ControlRequests')returnrequest