Source code for jina.serve.networking

import os
import asyncio
import ipaddress
from threading import Thread
from typing import Optional, List, Dict, TYPE_CHECKING, Tuple
from urllib.parse import urlparse

import grpc
from grpc.aio import AioRpcError

from jina.logging.logger import JinaLogger
from jina.proto import jina_pb2_grpc
from jina.enums import PollingType
from jina.helper import get_or_reuse_loop
from jina.types.request import Request
from jina.types.request.control import ControlRequest
from import DataRequest

    import kubernetes

[docs]class ReplicaList: """ Maintains a list of connections to replicas and uses round robin for selecting a replica """ def __init__(self): self._connections = [] self._address_to_connection_idx = {} self._address_to_channel = {} self._rr_counter = 0
[docs] def add_connection(self, address: str): """ Add connection with address to the connection list :param address: Target address of this connection """ if address not in self._address_to_connection_idx: try: parsed_address = urlparse(address) address = parsed_address.netloc if parsed_address.netloc else address use_https = parsed_address.scheme == 'https' except: use_https = False self._address_to_connection_idx[address] = len(self._connections) ( single_data_stub, data_stub, control_stub, channel, ) = GrpcConnectionPool.create_async_channel_stub(address, https=use_https) self._address_to_channel[address] = channel self._connections.append((single_data_stub, data_stub, control_stub))
[docs] async def remove_connection(self, address: str): """ Remove connection with address from the connection list :param address: Remove connection for this address :returns: The removed connection or None if there was not any for the given address """ if address in self._address_to_connection_idx: self._rr_counter = ( self._rr_counter % (len(self._connections) - 1) if (len(self._connections) - 1) else 0 ) idx_to_delete = self._address_to_connection_idx.pop(address) popped_connection = self._connections.pop(idx_to_delete) # we should handle graceful termination better, 0.5 is a rather random number here await self._address_to_channel[address].close(0.5) del self._address_to_channel[address] # update the address/idx mapping for address in self._address_to_connection_idx: if self._address_to_connection_idx[address] > idx_to_delete: self._address_to_connection_idx[address] -= 1 return popped_connection return None
[docs] def get_next_connection(self): """ Returns a connection from the list. Strategy is round robin :returns: A connection from the pool """ try: connection = self._connections[self._rr_counter] except IndexError: # This can happen as a race condition while removing connections self._rr_counter = 0 connection = self._connections[self._rr_counter] self._rr_counter = (self._rr_counter + 1) % len(self._connections) return connection
[docs] def get_all_connections(self): """ Returns all available connections :returns: A complete list of all connections from the pool """ return self._connections
[docs] def has_connection(self, address: str) -> bool: """ Checks if a connection for ip exists in the list :param address: The address to check :returns: True if a connection for the ip exists in the list """ return address in self._address_to_connection_idx
[docs] def has_connections(self) -> bool: """ Checks if this contains any connection :returns: True if any connection is managed, False otherwise """ return len(self._address_to_connection_idx) > 0
[docs] async def close(self): """ Close all connections and clean up internal state """ for address in self._address_to_channel: await self._address_to_channel[address].close(0.5) self._address_to_channel.clear() self._address_to_connection_idx.clear() self._connections.clear() self._rr_counter = 0
[docs]class GrpcConnectionPool: """ Manages a list of grpc connections. :param logger: the logger to use """ class _ConnectionPoolMap: def __init__(self, logger: Optional[JinaLogger]): self._logger = logger # this maps deployments to shards or heads self._deployments: Dict[str, Dict[str, Dict[int, ReplicaList]]] = {} # dict stores last entity id used for a particular deployment, used for round robin self._access_count: Dict[str, int] = {} if != 'nt': os.unsetenv('http_proxy') os.unsetenv('https_proxy') def add_replica(self, deployment: str, shard_id: int, address: str): self._add_connection(deployment, shard_id, address, 'shards') def add_head( self, deployment: str, address: str, head_id: Optional[int] = 0 ): # the head_id is always 0 for now, this will change when scaling the head self._add_connection(deployment, head_id, address, 'heads') def get_replicas( self, deployment: str, head: bool, entity_id: Optional[int] = None ) -> ReplicaList: if deployment in self._deployments: type = 'heads' if head else 'shards' if entity_id is None and head: entity_id = 0 return self._get_connection_list(deployment, type, entity_id) else: self._logger.debug( f'Unknown deployment {deployment}, no replicas available' ) return None def get_replicas_all_shards(self, deployment: str) -> List[ReplicaList]: replicas = [] if deployment in self._deployments: for shard_id in self._deployments[deployment]['shards']: replicas.append( self._get_connection_list(deployment, 'shards', shard_id) ) return replicas async def close(self): # Close all connections to all replicas for deployment in self._deployments: for entity_type in self._deployments[deployment]: for shard_in in self._deployments[deployment][entity_type]: await self._deployments[deployment][entity_type][ shard_in ].close() self._deployments.clear() def _get_connection_list( self, deployment: str, type: str, entity_id: Optional[int] = None ) -> ReplicaList: try: if entity_id is None and len(self._deployments[deployment][type]) > 0: # select a random entity self._access_count[deployment] += 1 return self._deployments[deployment][type][ self._access_count[deployment] % len(self._deployments[deployment][type]) ] else: return self._deployments[deployment][type][entity_id] except KeyError: if ( entity_id is None and deployment in self._deployments and len(self._deployments[deployment][type]) ): # This can happen as a race condition when removing connections while accessing it # In this case we don't care for the concrete entity, so retry with the first one return self._get_connection_list(deployment, type, 0) self._logger.debug( f'Did not find a connection for deployment {deployment}, type {type} and entity_id {entity_id}. There are {len(self._deployments[deployment][type]) if deployment in self._deployments else 0} available connections for this deployment and type. ' ) return None def _add_deployment(self, deployment: str): if deployment not in self._deployments: self._deployments[deployment] = {'shards': {}, 'heads': {}} self._access_count[deployment] = 0 def _add_connection( self, deployment: str, entity_id: int, address: str, type: str, ): self._add_deployment(deployment) if entity_id not in self._deployments[deployment][type]: connection_list = ReplicaList() self._deployments[deployment][type][entity_id] = connection_list if not self._deployments[deployment][type][entity_id].has_connection( address ): self._logger.debug( f'Adding connection for deployment {deployment}/{type}/{entity_id} to {address}' ) self._deployments[deployment][type][entity_id].add_connection(address) else: self._logger.debug( f'Ignoring activation of pod, {address} already known' ) async def remove_head(self, deployment, address, head_id: Optional[int] = 0): return await self._remove_connection(deployment, head_id, address, 'heads') async def remove_replica( self, deployment, address, shard_id: Optional[int] = 0 ): return await self._remove_connection( deployment, shard_id, address, 'shards' ) async def _remove_connection(self, deployment, entity_id, address, type): if ( deployment in self._deployments and entity_id in self._deployments[deployment][type] ): self._logger.debug( f'Removing connection for deployment {deployment}/{type}/{entity_id} to {address}' ) connection = await self._deployments[deployment][type][ entity_id ].remove_connection(address) if not self._deployments[deployment][type][entity_id].has_connections(): del self._deployments[deployment][type][entity_id] return connection return None def __init__(self, logger: Optional[JinaLogger] = None): self._logger = logger or JinaLogger(self.__class__.__name__) self._connections = self._ConnectionPoolMap(self._logger)
[docs] def send_request( self, request: Request, deployment: str, head: bool = False, shard_id: Optional[int] = None, polling_type: PollingType = PollingType.ANY, endpoint: Optional[str] = None, ) -> List[asyncio.Task]: """Send a single message to target via one or all of the pooled connections, depending on polling_type. Convenience function wrapper around send_request. :param request: a single request to send :param deployment: name of the Jina deployment to send the message to :param head: If True it is send to the head, otherwise to the worker pods :param shard_id: Send to a specific shard of the deployment, ignored for polling ALL :param polling_type: defines if the message should be send to any or all pooled connections for the target :param endpoint: endpoint to target with the request :return: list of asyncio.Task items for each send call """ return self.send_requests( requests=[request], deployment=deployment, head=head, shard_id=shard_id, polling_type=polling_type, endpoint=endpoint, )
[docs] def send_requests( self, requests: List[Request], deployment: str, head: bool = False, shard_id: Optional[int] = None, polling_type: PollingType = PollingType.ANY, endpoint: Optional[str] = None, ) -> List[asyncio.Task]: """Send a request to target via one or all of the pooled connections, depending on polling_type :param requests: request (DataRequest/ControlRequest) to send :param deployment: name of the Jina deployment to send the request to :param head: If True it is send to the head, otherwise to the worker pods :param shard_id: Send to a specific shard of the deployment, ignored for polling ALL :param polling_type: defines if the request should be send to any or all pooled connections for the target :param endpoint: endpoint to target with the requests :return: list of asyncio.Task items for each send call """ results = [] connections = [] if polling_type == PollingType.ANY: connection_list = self._connections.get_replicas(deployment, head, shard_id) if connection_list: connections.append(connection_list.get_next_connection()) elif polling_type == PollingType.ALL: connection_lists = self._connections.get_replicas_all_shards(deployment) for connection_list in connection_lists: connections.append(connection_list.get_next_connection()) else: raise ValueError(f'Unsupported polling type {polling_type}') for connection in connections: task = self._send_requests(requests, connection, endpoint) results.append(task) return results
[docs] def send_request_once( self, request: Request, deployment: str, head: bool = False, shard_id: Optional[int] = None, ) -> asyncio.Task: """Send msg to target via only one of the pooled connections :param request: request to send :param deployment: name of the Jina deployment to send the message to :param head: If True it is send to the head, otherwise to the worker pods :param shard_id: Send to a specific shard of the deployment, ignored for polling ALL :return: asyncio.Task representing the send call """ return self.send_requests_once( [request], deployment=deployment, head=head, shard_id=shard_id )
[docs] def send_requests_once( self, requests: List[Request], deployment: str, head: bool = False, shard_id: Optional[int] = None, endpoint: Optional[str] = None, ) -> asyncio.Task: """Send a request to target via only one of the pooled connections :param requests: request to send :param deployment: name of the Jina deployment to send the request to :param head: If True it is send to the head, otherwise to the worker pods :param shard_id: Send to a specific shard of the deployment, ignored for polling ALL :param endpoint: endpoint to target with the requests :return: asyncio.Task representing the send call """ replicas = self._connections.get_replicas(deployment, head, shard_id) if replicas: connection = replicas.get_next_connection() return self._send_requests(requests, connection, endpoint) else: self._logger.debug( f'No available connections for deployment {deployment} and shard {shard_id}' ) return None
[docs] def add_connection( self, deployment: str, address: str, head: Optional[bool] = False, shard_id: Optional[int] = None, ): """ Adds a connection for a deployment to this connection pool :param deployment: The deployment the connection belongs to, like 'encoder' :param head: True if the connection is for a head :param address: Address used for the grpc connection, format is <host>:<port> :param shard_id: Optional parameter to indicate this connection belongs to a shard, ignored for heads """ if head: self._connections.add_head(deployment, address, 0) else: if shard_id is None: shard_id = 0 self._connections.add_replica(deployment, shard_id, address)
[docs] async def remove_connection( self, deployment: str, address: str, head: Optional[bool] = False, shard_id: Optional[int] = None, ): """ Removes a connection to a deployment :param deployment: The deployment the connection belongs to, like 'encoder' :param address: Address used for the grpc connection, format is <host>:<port> :param head: True if the connection is for a head :param shard_id: Optional parameter to indicate this connection belongs to a shard, ignored for heads :return: The removed connection, None if it did not exist """ if head: return await self._connections.remove_head(deployment, address) else: if shard_id is None: shard_id = 0 return await self._connections.remove_replica(deployment, address, shard_id)
[docs] def start(self): """ Starts the connection pool """ pass
[docs] async def close(self): """ Closes the connection pool """ await self._connections.close()
def _send_requests( self, requests: List[Request], connection, endpoint: Optional[str] = None ) -> asyncio.Task: # this wraps the awaitable object from grpc as a coroutine so it can be used as a task # the grpc call function is not a coroutine but some _AioCall async def task_wrapper(requests, stubs, endpoint): metadata = (('endpoint', endpoint),) if endpoint else None for i in range(3): try: request_type = type(requests[0]) if request_type == DataRequest and len(requests) == 1: call_result = stubs[0].process_single_data( requests[0], metadata=metadata ) metadata, response = ( await call_result.trailing_metadata(), await call_result, ) return response, metadata if request_type == DataRequest and len(requests) > 1: call_result = stubs[1].process_data(requests, metadata=metadata) metadata, response = ( await call_result.trailing_metadata(), await call_result, ) return response, metadata elif request_type == ControlRequest: call_result = stubs[2].process_control(requests[0]) metadata, response = ( await call_result.trailing_metadata(), await call_result, ) return response, metadata else: raise ValueError( f'Unsupported request type {type(requests[0])}' ) except AioRpcError as e: if e.code() != grpc.StatusCode.UNAVAILABLE: raise elif e.code() == grpc.StatusCode.UNAVAILABLE and i == 2: self._logger.debug(f'GRPC call failed, retries exhausted') raise else: self._logger.debug( f'GRPC call failed with StatusCode.UNAVAILABLE, retry attempt {i+1}/3' ) return asyncio.create_task(task_wrapper(requests, connection, endpoint))
[docs] @staticmethod def get_grpc_channel( address: str, options: Optional[list] = None, asyncio: Optional[bool] = False, https: Optional[bool] = False, root_certificates: Optional[str] = None, ) -> grpc.Channel: """ Creates a grpc channel to the given address :param address: The address to connect to, format is <host>:<port> :param options: A list of options to pass to the grpc channel :param asyncio: If True, use the asyncio implementation of the grpc channel :param https: If True, use https for the grpc channel :param root_certificates: The path to the root certificates for https, only used if https is True :return: A grpc channel or an asyncio channel """ secure_channel = grpc.secure_channel insecure_channel = grpc.insecure_channel if asyncio: secure_channel = grpc.aio.secure_channel insecure_channel = grpc.aio.insecure_channel if options is None: options = GrpcConnectionPool.get_default_grpc_options() if https: credentials = grpc.ssl_channel_credentials( root_certificates=root_certificates ) return secure_channel(address, credentials, options) return insecure_channel(address, options)
[docs] @staticmethod def activate_worker_sync( worker_host: str, worker_port: int, target_head: str, shard_id: Optional[int] = None, ) -> ControlRequest: """ Register a given worker to a head by sending an activate request :param worker_host: the host address of the worker :param worker_port: the port of the worker :param target_head: address of the head to send the activate request to :param shard_id: id of the shard the worker belongs to :returns: the response request """ activate_request = ControlRequest(command='ACTIVATE') activate_request.add_related_entity( 'worker', worker_host, worker_port, shard_id ) if != 'nt': os.unsetenv('http_proxy') os.unsetenv('https_proxy') return GrpcConnectionPool.send_request_sync(activate_request, target_head)
[docs] @staticmethod async def activate_worker( worker_host: str, worker_port: int, target_head: str, shard_id: Optional[int] = None, ) -> ControlRequest: """ Register a given worker to a head by sending an activate request :param worker_host: the host address of the worker :param worker_port: the port of the worker :param target_head: address of the head to send the activate request to :param shard_id: id of the shard the worker belongs to :returns: the response request """ activate_request = ControlRequest(command='ACTIVATE') activate_request.add_related_entity( 'worker', worker_host, worker_port, shard_id ) return await GrpcConnectionPool.send_request_async( activate_request, target_head )
[docs] @staticmethod async def deactivate_worker( worker_host: str, worker_port: int, target_head: str, shard_id: Optional[int] = None, ) -> ControlRequest: """ Remove a given worker to a head by sending a deactivate request :param worker_host: the host address of the worker :param worker_port: the port of the worker :param target_head: address of the head to send the deactivate request to :param shard_id: id of the shard the worker belongs to :returns: the response request """ activate_request = ControlRequest(command='DEACTIVATE') activate_request.add_related_entity( 'worker', worker_host, worker_port, shard_id ) return await GrpcConnectionPool.send_request_async( activate_request, target_head )
[docs] @staticmethod def send_request_sync( request: Request, target: str, timeout=100.0, https=False, root_certificates: Optional[str] = None, endpoint: Optional[str] = None, ) -> Request: """ Sends a request synchronically to the target via grpc :param request: the request to send :param target: where to send the request to, like :param timeout: timeout for the send :param https: if True, use https for the grpc channel :param root_certificates: the path to the root certificates for https, only used if https is True :param endpoint: endpoint to target with the request :returns: the response request """ for i in range(3): try: with GrpcConnectionPool.get_grpc_channel( target, https=https, root_certificates=root_certificates, ) as channel: if type(request) == DataRequest: metadata = (('endpoint', endpoint),) if endpoint else None stub = jina_pb2_grpc.JinaSingleDataRequestRPCStub(channel) response, call = stub.process_single_data.with_call( request, timeout=timeout, metadata=metadata ) elif type(request) == ControlRequest: stub = jina_pb2_grpc.JinaControlRequestRPCStub(channel) response = stub.process_control(request, timeout=timeout) return response except grpc.RpcError as e: if e.code() != grpc.StatusCode.UNAVAILABLE or i == 2: raise
[docs] @staticmethod def send_requests_sync( requests: List[Request], target: str, timeout=100.0, https=False, root_certificates: Optional[str] = None, endpoint: Optional[str] = None, ) -> Request: """ Sends a list of requests synchronically to the target via grpc :param requests: the requests to send :param target: where to send the request to, like :param timeout: timeout for the send :param https: if True, use https for the grpc channel :param root_certificates: the path to the root certificates for https, only used if https is True :param endpoint: endpoint to target with the request :returns: the response request """ for i in range(3): try: with GrpcConnectionPool.get_grpc_channel( target, https=https, root_certificates=root_certificates, ) as channel: metadata = (('endpoint', endpoint),) if endpoint else None stub = jina_pb2_grpc.JinaDataRequestRPCStub(channel) response, call = stub.process_data.with_call( requests, timeout=timeout, metadata=metadata ) return response except grpc.RpcError as e: if e.code() != grpc.StatusCode.UNAVAILABLE or i == 2: raise
[docs] @staticmethod def get_default_grpc_options(): """ Returns a list of default options used for creating grpc channels. Documentation is here :returns: list of tuples defining grpc parameters """ return [ ('grpc.max_send_message_length', -1), ('grpc.max_receive_message_length', -1), ]
[docs] @staticmethod async def send_request_async( request: Request, target: str, timeout: float = 1.0, https: bool = False, root_certificates: Optional[str] = None, ) -> Request: """ Sends a request asynchronously to the target via grpc :param request: the request to send :param target: where to send the request to, like :param timeout: timeout for the send :param https: if True, use https for the grpc channel :param root_certificates: the path to the root certificates for https, only u :returns: the response request """ async with GrpcConnectionPool.get_grpc_channel( target, asyncio=True, https=https, root_certificates=root_certificates, ) as channel: if type(request) == DataRequest: stub = jina_pb2_grpc.JinaSingleDataRequestRPCStub(channel) return await stub.process_single_data(request, timeout=timeout) elif type(request) == ControlRequest: stub = jina_pb2_grpc.JinaControlRequestRPCStub(channel) return await stub.process_control(request, timeout=timeout)
[docs] @staticmethod def create_async_channel_stub( address, https=False, root_certificates: Optional[str] = None, ) -> Tuple[ jina_pb2_grpc.JinaSingleDataRequestRPCStub, jina_pb2_grpc.JinaDataRequestRPCStub, jina_pb2_grpc.JinaControlRequestRPCStub, grpc.aio.Channel, ]: """ Creates an async GRPC Channel. This channel has to be closed eventually! :param address: the address to create the connection to, like :param https: if True, use https for the grpc channel :param root_certificates: the path to the root certificates for https, only u :returns: DataRequest/ControlRequest stubs and an async grpc channel """ channel = GrpcConnectionPool.get_grpc_channel( address, asyncio=True, https=https, root_certificates=root_certificates, ) return ( jina_pb2_grpc.JinaSingleDataRequestRPCStub(channel), jina_pb2_grpc.JinaDataRequestRPCStub(channel), jina_pb2_grpc.JinaControlRequestRPCStub(channel), channel, )
[docs]class K8sGrpcConnectionPool(GrpcConnectionPool): """ Manages grpc connections to replicas in a K8s deployment. :param namespace: K8s namespace to operate in :param client: K8s client :param logger: the logger to use """ K8S_PORT = 8080 K8S_PORT_USES_BEFORE = 8081 K8S_PORT_USES_AFTER = 8082 def __init__( self, namespace: str, client: 'kubernetes.client.CoreV1Api', logger: JinaLogger = None, ): super().__init__(logger=logger) self._namespace = namespace self._process_events_task = None self._k8s_client = client self._k8s_event_queue = asyncio.Queue() self.enabled = False from kubernetes import watch self._api_watch = watch.Watch() self.update_thread = Thread(, daemon=True) async def _fetch_initial_state(self): namespaced_pods = self._k8s_client.list_namespaced_pod(self._namespace) for item in namespaced_pods.items: await self._process_item(item)
[docs] def start(self): """ Subscribe to the K8s API and watch for changes in Pods """ self._loop = get_or_reuse_loop() self._process_events_task = asyncio.create_task(self._process_events()) self.update_thread.start()
async def _process_events(self): await self._fetch_initial_state() while self.enabled: event = await self._k8s_event_queue.get() await self._process_item(event)
[docs] def run(self): """ Subscribes on MODIFIED events from list_namespaced_pod AK8s PI """ self.enabled = True while self.enabled: for event in self._k8s_client.list_namespaced_pod, self._namespace ): if event['type'] == 'MODIFIED': asyncio.run_coroutine_threadsafe( self._k8s_event_queue.put(event['object']), self._loop ) if not self.enabled: break
[docs] async def close(self): """ Closes the connection pool """ self.enabled = False if self._process_events_task: self._process_events_task.cancel() self._api_watch.stop() await super().close()
@staticmethod def _pod_is_up(item): return item.status.pod_ip is not None and item.status.phase == 'Running' @staticmethod def _pod_is_ready(item): return item.status.container_statuses is not None and all( cs.ready for cs in item.status.container_statuses ) async def _process_item(self, item): try: jina_deployment_name = item.metadata.labels['jina_deployment_name'] is_head = item.metadata.labels['pod_type'].lower() == 'head' shard_id = ( int(item.metadata.labels['shard_id']) if item.metadata.labels['shard_id'] and not is_head else None ) is_deleted = item.metadata.deletion_timestamp is not None ip = item.status.pod_ip port = self.K8S_PORT if ( ip and port and not is_deleted and self._pod_is_up(item) and self._pod_is_ready(item) ): self.add_connection( deployment=jina_deployment_name, head=is_head, address=f'{ip}:{port}', shard_id=shard_id, ) elif ip and port and is_deleted and self._pod_is_up(item): await self.remove_connection( deployment=jina_deployment_name, head=is_head, address=f'{ip}:{port}', shard_id=shard_id, ) except KeyError: self._logger.debug( f'Ignoring changes to non Jina resource {}' ) pass @staticmethod def _extract_port(item): for container in item.spec.containers: if == 'executor': return container.ports[0].container_port return None
[docs]def is_remote_local_connection(first: str, second: str): """ Decides, whether ``first`` is remote host and ``second`` is localhost :param first: the ip or host name of the first runtime :param second: the ip or host name of the second runtime :return: True, if first is remote and second is local """ try: first_ip = ipaddress.ip_address(first) first_global = first_ip.is_global except ValueError: if first == 'localhost': first_global = False else: first_global = True try: second_ip = ipaddress.ip_address(second) second_local = second_ip.is_private or second_ip.is_loopback except ValueError: if second == 'localhost': second_local = True else: second_local = False return first_global and second_local
[docs]def create_connection_pool( k8s_connection_pool: bool = False, k8s_namespace: Optional[str] = None, logger: Optional[JinaLogger] = None, ) -> GrpcConnectionPool: """ Creates the appropriate connection pool based on parameters :param k8s_namespace: k8s namespace the pool will live in, None if outside K8s :param k8s_connection_pool: flag to indicate if K8sGrpcConnectionPool should be used, defaults to true in K8s :param logger: the logger to use :return: A connection pool object """ if k8s_connection_pool and k8s_namespace: import kubernetes from kubernetes import client kubernetes.config.load_incluster_config() k8s_client = client.ApiClient() core_client = client.CoreV1Api(api_client=k8s_client) return K8sGrpcConnectionPool( namespace=k8s_namespace, client=core_client, logger=logger ) else: return GrpcConnectionPool(logger=logger)
[docs]def host_is_local(hostname): """ Check if hostname is point to localhost :param hostname: host to check :return: True if hostname means localhost, False otherwise """ import socket fqn = socket.getfqdn(hostname) if fqn in ("localhost", "") or hostname == '': return True return ipaddress.ip_address(hostname).is_loopback