Source code for jina.serve.runtimes.gateway.grpc
import os
import grpc
from jina import __default_host__
from jina.proto import jina_pb2_grpc
from jina.serve.runtimes.gateway import GatewayRuntime
from jina.serve.stream import RequestStreamer
from jina.serve.runtimes.gateway.request_handling import handle_request, handle_result
__all__ = ['GRPCGatewayRuntime']
from jina.types.request.control import ControlRequest
[docs]class GRPCGatewayRuntime(GatewayRuntime):
"""Gateway Runtime for gRPC."""
[docs] async def async_setup(self):
"""
The async method to setup.
Create the gRPC server and expose the port for communication.
"""
if not self.args.proxy and os.name != 'nt':
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')
self.server = grpc.aio.server(
options=[
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
]
)
self._set_topology_graph()
self._set_connection_pool()
self.streamer = RequestStreamer(
args=self.args,
request_handler=handle_request(
graph=self._topology_graph, connection_pool=self._connection_pool
),
result_handler=handle_result,
)
self.streamer.Call = self.streamer.stream
jina_pb2_grpc.add_JinaRPCServicer_to_server(self.streamer, self.server)
jina_pb2_grpc.add_JinaControlRequestRPCServicer_to_server(self, self.server)
bind_addr = f'{__default_host__}:{self.args.port}'
self.server.add_insecure_port(bind_addr)
self.logger.debug(f' Start server bound to {bind_addr}')
await self.server.start()
[docs] async def async_teardown(self):
"""Close the connection pool"""
# usually async_cancel should already have been called, but then its a noop
# if the runtime is stopped without a sigterm (e.g. as a context manager, this can happen)
await self.async_cancel()
await self._connection_pool.close()
[docs] async def async_cancel(self):
"""The async method to stop server."""
await self.server.stop(0)
[docs] async def async_run_forever(self):
"""The async running of server."""
self._connection_pool.start()
await self.server.wait_for_termination()
[docs] async def process_control(self, request: ControlRequest, *args) -> ControlRequest:
"""
Should be used to check readiness by sending STATUS ControlRequests.
Throws for any other command than STATUS.
:param request: the ControlRequest, should have command 'STATUS'
:param args: additional arguments in the grpc call, ignored
:returns: will be the original request
"""
if self.logger.debug_enabled:
self._log_control_request(request)
if request.command != 'STATUS':
raise ValueError('Gateway only support STATUS ControlRequests')
return request