jina.serve.networking module

class jina.serve.networking.ReplicaList(summary)[source]

Bases: object

Maintains a list of connections to replicas and uses round robin for selecting a replica

add_connection(address)[source]

Add connection with address to the connection list

Parameters

address (str) – Target address of this connection

async remove_connection(address)[source]

Remove connection with address from the connection list

Parameters

address (str) – Remove connection for this address

Returns

The removed connection or None if there was not any for the given address

get_next_connection()[source]

Returns a connection from the list. Strategy is round robin

Returns

A connection from the pool

get_all_connections()[source]

Returns all available connections

Returns

A complete list of all connections from the pool

has_connection(address)[source]

Checks if a connection for the given address exists in the list

Parameters

address (str) – The address to check

Return type

bool

Returns

True if a connection for the address exists in the list

has_connections()[source]

Checks if this list contains any connection

Return type

bool

Returns

True if any connection is managed, False otherwise

async close()[source]

Close all connections and clean up internal state
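The round-robin selection described for ReplicaList can be illustrated with a minimal sketch. This is not Jina's implementation: the class name RoundRobinReplicas is hypothetical, connections are represented by plain strings instead of grpc channels, and the cursor handling on removal is an assumption.

```python
from typing import Dict, List, Optional


class RoundRobinReplicas:
    """Hypothetical stand-in for ReplicaList; connections are plain strings here."""

    def __init__(self) -> None:
        self._addresses: List[str] = []
        self._connections: Dict[str, str] = {}
        self._next = 0  # index of the replica to hand out next

    def add_connection(self, address: str) -> None:
        if address not in self._connections:
            self._addresses.append(address)
            self._connections[address] = f'connection-to-{address}'

    def remove_connection(self, address: str) -> Optional[str]:
        if address not in self._connections:
            return None
        index = self._addresses.index(address)
        self._addresses.pop(index)
        # Keep the cursor pointing at the same upcoming replica.
        if index < self._next:
            self._next -= 1
        if self._addresses:
            self._next %= len(self._addresses)
        else:
            self._next = 0
        return self._connections.pop(address)

    def get_next_connection(self) -> Optional[str]:
        if not self._addresses:
            return None
        address = self._addresses[self._next]
        # Advance and wrap around so every replica gets its turn.
        self._next = (self._next + 1) % len(self._addresses)
        return self._connections[address]

    def has_connection(self, address: str) -> bool:
        return address in self._connections

    def has_connections(self) -> bool:
        return bool(self._addresses)


replicas = RoundRobinReplicas()
for addr in ('10.0.0.1:8080', '10.0.0.2:8080', '10.0.0.3:8080'):
    replicas.add_connection(addr)

# Four picks over three replicas wrap around to the first one again.
picked = [replicas.get_next_connection() for _ in range(4)]
```

With three replicas registered, the fourth pick returns the first replica again, which is the behaviour the round-robin strategy implies.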

class jina.serve.networking.GrpcConnectionPool(logger=None, compression='NoCompression', metrics_registry=None)[source]

Bases: object

Manages a list of grpc connections.

Parameters
  • logger (Optional[JinaLogger]) – the logger to use

  • compression (str) – The compression algorithm to be used by this GrpcConnectionPool when sending data over grpc

K8S_PORT_USES_AFTER = 8082
K8S_PORT_USES_BEFORE = 8081
K8S_PORT = 8080
class ConnectionStubs(address, channel, summary)[source]

Bases: object

Maintains a list of grpc stubs available for a particular connection

STUB_MAPPING = {'jina.JinaControlRequestRPC': <class 'jina.proto.jina_pb2_grpc.JinaControlRequestRPCStub'>, 'jina.JinaDataRequestRPC': <class 'jina.proto.jina_pb2_grpc.JinaDataRequestRPCStub'>, 'jina.JinaDiscoverEndpointsRPC': <class 'jina.proto.jina_pb2_grpc.JinaDiscoverEndpointsRPCStub'>, 'jina.JinaRPC': <class 'jina.proto.jina_pb2_grpc.JinaRPCStub'>, 'jina.JinaSingleDataRequestRPC': <class 'jina.proto.jina_pb2_grpc.JinaSingleDataRequestRPCStub'>}
async send_discover_endpoint(timeout=None)[source]

Uses the endpoint discovery stub to request the endpoints exposed by an Executor

Parameters

timeout (Optional[float]) – defines timeout for sending request

Return type

Tuple

Returns

Tuple of response and metadata about the response

async send_requests(requests, metadata, compression, timeout=None)[source]

Sends requests using the appropriate grpc stub. The stub is chosen based on availability and the type of the requests

Parameters
  • requests (List[Request]) – the requests to send

  • metadata – the metadata to send alongside the requests

  • compression – defines if compression should be used

  • timeout (Optional[float]) – defines timeout for sending request

Return type

Tuple

Returns

Tuple of response and metadata about the response
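How a stub might be chosen "based on availability and type of requests" can be sketched as follows. The service names are taken from STUB_MAPPING above; the function choose_service, its kind argument, and the preference order are assumptions made for illustration and are not taken from Jina's source.

```python
from typing import Optional, Set

# Service names as listed in STUB_MAPPING.
SINGLE = 'jina.JinaSingleDataRequestRPC'
BATCH = 'jina.JinaDataRequestRPC'
STREAM = 'jina.JinaRPC'
CONTROL = 'jina.JinaControlRequestRPC'


def choose_service(available: Set[str], kind: str, count: int) -> Optional[str]:
    """Return the name of the service to call, or None if nothing fits.

    `kind` is 'data' or 'control'; the preference order is an assumption.
    """
    if kind == 'control':
        candidates = [CONTROL]
    elif count == 1:
        # A single data request can go through several services.
        candidates = [SINGLE, STREAM, BATCH]
    else:
        # Several data requests need the list-based service.
        candidates = [BATCH]
    for name in candidates:
        if name in available:
            return name
    return None


chosen = choose_service({SINGLE, BATCH}, 'data', 1)
```

In this sketch the set of available services would come from a discovery step such as get_available_services, and a None result would be the point at which a caller reports that the target does not support the request.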

send_request(request, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, timeout=None)[source]

Send a single message to target via one or all of the pooled connections, depending on polling_type. Convenience wrapper around send_requests.

Parameters
  • request (Request) – a single request to send

  • deployment (str) – name of the Jina deployment to send the message to

  • head (bool) – If True, it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • polling_type (PollingType) – defines if the message should be sent to any or all pooled connections for the target

  • endpoint (Optional[str]) – endpoint to target with the request

  • timeout (Optional[float]) – timeout for sending the requests

Return type

List[Task]

Returns

list of asyncio.Task items for each send call

send_requests(requests, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, timeout=None)[source]

Send a request to target via one or all of the pooled connections, depending on polling_type

Parameters
  • requests (List[Request]) – the requests (DataRequest/ControlRequest) to send

  • deployment (str) – name of the Jina deployment to send the request to

  • head (bool) – If True, it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • polling_type (PollingType) – defines if the request should be sent to any or all pooled connections for the target

  • endpoint (Optional[str]) – endpoint to target with the requests

  • timeout (Optional[float]) – timeout for sending the requests

Return type

List[Task]

Returns

list of asyncio.Task items for each send call
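The relationship between polling_type and the returned list of asyncio.Task items can be sketched with the standard library alone. Everything here is hypothetical: Polling stands in for PollingType, fake_send stands in for a real grpc call, and always picking the first replica of a shard is a simplification.

```python
import asyncio
from enum import Enum
from typing import Dict, List


class Polling(Enum):  # hypothetical stand-in for jina's PollingType
    ANY = 1
    ALL = 2


async def fake_send(address: str, payload: str) -> str:
    """Pretend to send `payload` to `address`; a real pool would issue a grpc call."""
    await asyncio.sleep(0)
    return f'{address} handled {payload}'


def send(shards: Dict[int, List[str]], payload: str, polling: Polling) -> List[asyncio.Task]:
    """Create one task per targeted shard.

    ANY targets a single shard, ALL targets every shard. Picking the first
    replica of each shard is a simplification; a real pool would rotate.
    """
    shard_ids = sorted(shards)
    if polling is Polling.ANY:
        shard_ids = shard_ids[:1]
    return [asyncio.create_task(fake_send(shards[s][0], payload)) for s in shard_ids]


async def main() -> Dict[str, List[str]]:
    shards = {0: ['10.0.0.1:8080'], 1: ['10.0.0.2:8080']}
    # The caller awaits the tasks, for example with asyncio.gather.
    any_results = await asyncio.gather(*send(shards, 'req', Polling.ANY))
    all_results = await asyncio.gather(*send(shards, 'req', Polling.ALL))
    return {'any': list(any_results), 'all': list(all_results)}


results = asyncio.run(main())
```

Returning tasks rather than awaited results leaves it to the caller to decide whether to wait for all responses, the first response, or to cancel outstanding sends.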

send_discover_endpoint(deployment, head=True, shard_id=None, timeout=None)[source]

Sends a discover Endpoint call to target.

Parameters
  • deployment (str) – name of the Jina deployment to send the request to

  • head (bool) – If True, it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • timeout (Optional[float]) – timeout for sending the requests

Return type

Task

Returns

asyncio.Task representing the send call

send_request_once(request, deployment, head=False, shard_id=None, timeout=None)[source]

Send a message to target via only one of the pooled connections

Parameters
  • request (Request) – request to send

  • deployment (str) – name of the Jina deployment to send the message to

  • head (bool) – If True, it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • timeout (Optional[float]) – timeout for sending the requests

Return type

Task

Returns

asyncio.Task representing the send call

send_requests_once(requests, deployment, head=False, shard_id=None, endpoint=None, timeout=None)[source]

Send a request to target via only one of the pooled connections

Parameters
  • requests (List[Request]) – the requests to send

  • deployment (str) – name of the Jina deployment to send the request to

  • head (bool) – If True, it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • endpoint (Optional[str]) – endpoint to target with the requests

  • timeout (Optional[float]) – timeout for sending the requests

Return type

Task

Returns

asyncio.Task representing the send call

add_connection(deployment, address, head=False, shard_id=None)[source]

Adds a connection for a deployment to this connection pool

Parameters
  • deployment (str) – The deployment the connection belongs to, like ‘encoder’

  • head (bool) – True if the connection is for a head

  • address (str) – Address used for the grpc connection, format is <host>:<port>

  • shard_id (Optional[int]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads

async remove_connection(deployment, address, head=False, shard_id=None)[source]

Removes a connection to a deployment

Parameters
  • deployment (str) – The deployment the connection belongs to, like ‘encoder’

  • address (str) – Address used for the grpc connection, format is <host>:<port>

  • head (bool) – True if the connection is for a head

  • shard_id (Optional[int]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads

Returns

The removed connection, None if it did not exist
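The way add_connection and remove_connection address a connection by deployment, head flag and shard can be sketched as a small registry. The class ConnectionRegistry, its key layout, and the default shard id of 0 are assumptions for illustration; addresses stand in for real grpc connections.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Key: (deployment, 'heads' or 'shards', shard id). Heads always use id 0,
# mirroring the note that shard_id is ignored for heads.
Key = Tuple[str, str, int]


class ConnectionRegistry:
    """Hypothetical registry; addresses stand in for real grpc connections."""

    def __init__(self) -> None:
        self._entries: Dict[Key, List[str]] = defaultdict(list)

    @staticmethod
    def _key(deployment: str, head: bool, shard_id: Optional[int]) -> Key:
        if head:
            return (deployment, 'heads', 0)
        return (deployment, 'shards', shard_id if shard_id is not None else 0)

    def add_connection(self, deployment: str, address: str,
                       head: bool = False, shard_id: Optional[int] = None) -> None:
        bucket = self._entries[self._key(deployment, head, shard_id)]
        if address not in bucket:
            bucket.append(address)

    def remove_connection(self, deployment: str, address: str,
                          head: bool = False, shard_id: Optional[int] = None) -> Optional[str]:
        bucket = self._entries.get(self._key(deployment, head, shard_id), [])
        if address not in bucket:
            return None  # nothing registered under this key
        bucket.remove(address)
        return address

    def connections(self, deployment: str, head: bool = False,
                    shard_id: Optional[int] = None) -> List[str]:
        return list(self._entries.get(self._key(deployment, head, shard_id), []))


registry = ConnectionRegistry()
# shard_id is ignored here because the connection is for a head.
registry.add_connection('encoder', '10.0.0.1:8081', head=True, shard_id=5)
registry.add_connection('encoder', '10.0.0.2:8080', shard_id=1)
```

Removing an address that was never registered returns None, matching the documented return value of remove_connection.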

start()[source]

Starts the connection pool

async close()[source]

Closes the connection pool

static get_grpc_channel(address, options=None, asyncio=False, tls=False, root_certificates=None)[source]

Creates a grpc channel to the given address

Parameters
  • address (str) – The address to connect to, format is <host>:<port>

  • options (Optional[list]) – A list of options to pass to the grpc channel

  • asyncio (bool) – If True, use the asyncio implementation of the grpc channel

  • tls (bool) – If True, use tls encryption for the grpc channel

  • root_certificates (Optional[str]) – The path to the root certificates for tls, only used if tls is True

Return type

Channel

Returns

A grpc channel or an asyncio channel

static activate_worker_sync(worker_host, worker_port, target_head, shard_id=None)[source]

Register a given worker to a head by sending an activate request

Parameters
  • worker_host (str) – the host address of the worker

  • worker_port (int) – the port of the worker

  • target_head (str) – address of the head to send the activate request to

  • shard_id (Optional[int]) – id of the shard the worker belongs to

Return type

ControlRequest

Returns

the response request

async static activate_worker(worker_host, worker_port, target_head, shard_id=None)[source]

Register a given worker to a head by sending an activate request

Parameters
  • worker_host (str) – the host address of the worker

  • worker_port (int) – the port of the worker

  • target_head (str) – address of the head to send the activate request to

  • shard_id (Optional[int]) – id of the shard the worker belongs to

Return type

ControlRequest

Returns

the response request

async static deactivate_worker(worker_host, worker_port, target_head, shard_id=None)[source]

Remove a given worker from a head by sending a deactivate request

Parameters
  • worker_host (str) – the host address of the worker

  • worker_port (int) – the port of the worker

  • target_head (str) – address of the head to send the deactivate request to

  • shard_id (Optional[int]) – id of the shard the worker belongs to

Return type

ControlRequest

Returns

the response request

static send_request_sync(request, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)[source]

Sends a request synchronously to the target via grpc

Parameters
  • request (Request) – the request to send

  • target (str) – where to send the request to, like 127.0.0.1:8080

  • timeout – timeout for the send

  • tls – if True, use tls encryption for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

  • endpoint (Optional[str]) – endpoint to target with the request

Return type

Request

Returns

the response request

static send_requests_sync(requests, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)[source]

Sends a list of requests synchronously to the target via grpc

Parameters
  • requests (List[Request]) – the requests to send

  • target (str) – where to send the request to, like 127.0.0.1:8080

  • timeout – timeout for the send

  • tls – if True, use tls for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

  • endpoint (Optional[str]) – endpoint to target with the request

Return type

Request

Returns

the response request

static get_default_grpc_options()[source]

Returns a list of default options used for creating grpc channels. Documentation is here https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h

Returns

list of tuples defining grpc parameters
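The "list of tuples" shape of grpc channel options can be illustrated as below. The two option names are real gRPC channel arguments, but whether Jina's defaults contain exactly these entries is an assumption, and default_options and merge_options are hypothetical helpers.

```python
from typing import List, Optional, Tuple, Union

Option = Tuple[str, Union[int, str]]


def default_options() -> List[Option]:
    """Illustrative defaults; -1 lifts gRPC's message size limit."""
    return [
        ('grpc.max_send_message_length', -1),
        ('grpc.max_receive_message_length', -1),
    ]


def merge_options(overrides: Optional[List[Option]] = None) -> List[Option]:
    """Overlay caller-supplied options on the defaults, keyed by option name."""
    merged = dict(default_options())
    merged.update(dict(overrides or []))
    return list(merged.items())


# Cap received messages at 4 MiB while keeping the other default.
options = merge_options([('grpc.max_receive_message_length', 4 * 1024 * 1024)])
```

A list in this shape is what the options parameter of get_grpc_channel expects, according to its description above.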

async static send_request_async(request, target, timeout=1.0, tls=False, root_certificates=None)[source]

Sends a request asynchronously to the target via grpc

Parameters
  • request (Request) – the request to send

  • target (str) – where to send the request to, like 127.0.0.1:8080

  • timeout (float) – timeout for the send

  • tls (bool) – if True, use tls for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

Return type

Request

Returns

the response request

static create_async_channel_stub(address, tls=False, root_certificates=None, summary=None)[source]

Creates an async GRPC Channel. This channel has to be closed eventually!

Parameters
  • address – the address to create the connection to, like 127.0.0.1:8080

  • tls – if True, use tls for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

  • summary – Optional Prometheus summary object

Return type

Tuple[ConnectionStubs, Channel]

Returns

DataRequest/ControlRequest stubs and an async grpc channel

async static get_available_services(channel)[source]

Lists available services by name, exposed at target address

Parameters

channel – the channel to use

Return type

List[str]

Returns

List of services offered

jina.serve.networking.in_docker()[source]

Checks if the current process is running inside Docker

Returns

True if the current process is running inside Docker
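One common way to detect a Docker environment is sketched below. It is a heuristic and not necessarily what in_docker does: the function name looks_like_docker and both default paths are assumptions based on widespread convention.

```python
import os


def looks_like_docker(dockerenv: str = '/.dockerenv',
                      cgroup: str = '/proc/self/cgroup') -> bool:
    """Heuristic container check; both default paths are common conventions."""
    # Docker usually creates this marker file at the container root.
    if os.path.exists(dockerenv):
        return True
    try:
        # On some hosts the cgroup listing mentions docker for containerized processes.
        with open(cgroup, encoding='utf-8') as handle:
            return any('docker' in line for line in handle)
    except OSError:
        # Missing or unreadable file, for example on non-Linux systems.
        return False


running_in_docker = looks_like_docker()
```

Neither signal is guaranteed across container runtimes, so a False result from this sketch only means that no Docker marker was found.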

jina.serve.networking.host_is_local(hostname)[source]

Checks if hostname points to localhost

Parameters

hostname – host to check

Returns

True if hostname means localhost, False otherwise
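A local-host check of this kind can be sketched with the standard library. The function looks_local is hypothetical, it performs no DNS lookup, and treating 0.0.0.0 as local is an assumption; the real host_is_local may apply different rules.

```python
import ipaddress

# Names treated as local without parsing; 0.0.0.0 is included by assumption.
LOCAL_NAMES = {'localhost', '0.0.0.0'}


def looks_local(hostname: str) -> bool:
    """Return True for well-known local names and loopback IP addresses."""
    if hostname.lower() in LOCAL_NAMES:
        return True
    try:
        # Covers 127.0.0.0/8 and the IPv6 loopback address ::1.
        return ipaddress.ip_address(hostname).is_loopback
    except ValueError:
        # Not an IP literal and not a known local name.
        return False


local_flags = [looks_local(h) for h in ('localhost', '127.0.0.1', '::1', '10.0.0.5')]
```

Because no name resolution happens here, a hostname that merely resolves to a loopback address is reported as not local by this sketch.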