jina.serve.networking module#

class jina.serve.networking.ReplicaList(metrics, logger, runtine_name)[source]#

Bases: object

Maintains a list of connections to replicas and uses round robin for selecting a replica

async reset_connection(address)[source]#

Removes and then re-adds a connection. The result is the same as calling remove_connection() and then add_connection(), but this method also handles the race condition that arises if multiple callers reset a connection at the same time.

Parameters:

address (str) – Target address of this connection

Return type:

Optional[Channel]

Returns:

The reset connection or None if there was no connection for the given address
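The race-condition handling described for reset_connection() can be illustrated with a small, self-contained sketch. It is not Jina's implementation: TinyReplicaList, its _connections dict and the string used in place of a channel are hypothetical, and an asyncio.Lock is only one possible way to make the remove and re-add steps atomic.

```python
import asyncio
from typing import Dict, Optional


class TinyReplicaList:
    """Hypothetical stand-in for a replica list; not Jina's ReplicaList."""

    def __init__(self) -> None:
        self._connections: Dict[str, str] = {}
        self._lock = asyncio.Lock()
        self._created = 0  # counts how many connections were ever created

    def add_connection(self, address: str) -> None:
        if address not in self._connections:
            self._created += 1
            # A real pool would open a gRPC channel here; a string stands in for it.
            self._connections[address] = f"channel-{self._created}@{address}"

    async def remove_connection(self, address: str) -> Optional[str]:
        await asyncio.sleep(0)  # placeholder for closing the channel
        return self._connections.pop(address, None)

    async def reset_connection(self, address: str) -> Optional[str]:
        # The lock turns remove + add into one atomic step, so concurrent
        # callers never observe the address as missing halfway through.
        async with self._lock:
            if address not in self._connections:
                return None
            await self.remove_connection(address)
            self.add_connection(address)
            return self._connections[address]
```

In this sketch, several coroutines resetting the same address at once each receive a connection, and exactly one connection for that address remains afterwards.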

add_connection(address)[source]#

Add connection with address to the connection list

Parameters:

address (str) – Target address of this connection

async remove_connection(address)[source]#

Remove connection with address from the connection list

Warning

This completely removes the connection, including all dictionary keys that point to it. Therefore, be careful not to call this method while iterating over all connections. If you want to reset (remove and re-add) a connection, use jina.serve.networking.ReplicaList.reset_connection(), which is safe to use in this scenario.

Parameters:

address (str) – Remove connection for this address

Return type:

Optional[Channel]

Returns:

The removed connection or None if there was not any for the given address
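The warning about removing connections while iterating can be shown with plain dictionaries. This is a minimal sketch, assuming the connections are held in a dict keyed by address; the names connections, remove_all_unsafe and remove_all_safe are hypothetical and not part of the module.

```python
from typing import Dict

# Hypothetical pool: address -> placeholder for a channel object.
connections: Dict[str, str] = {
    "10.0.0.1:8080": "channel-a",
    "10.0.0.2:8080": "channel-b",
}


def remove_all_unsafe(conns: Dict[str, str]) -> None:
    # Deleting keys while iterating over the same dict raises RuntimeError.
    for address in conns:
        del conns[address]


def remove_all_safe(conns: Dict[str, str]) -> None:
    # Iterate over a snapshot of the keys so removals cannot disturb the loop.
    for address in list(conns):
        del conns[address]
```

The safe variant copies the keys first, which is the usual way to remove entries from a mapping that is being walked.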

async get_next_connection(num_retries=3)[source]#

Returns a connection from the list. Strategy is round robin

Parameters:

num_retries – how many retries should be performed when all connections are currently unavailable

Returns:

A connection from the pool
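Round-robin selection with retries can be sketched as follows. This is an illustration only, not the library's code: the RoundRobin class, the use of None to mark an unavailable connection, and yielding to the event loop between retries are all assumptions made for the example.

```python
import asyncio
from typing import List, Optional


class RoundRobin:
    """Hypothetical round-robin selector; connections are plain strings here."""

    def __init__(self, connections: List[Optional[str]]) -> None:
        # None marks a connection that is currently unavailable.
        self._connections = connections
        self._rr_counter = 0

    async def get_next_connection(self, num_retries: int = 3) -> Optional[str]:
        for _ in range(num_retries + 1):
            # Visit each slot at most once per attempt, starting at the counter.
            for _ in range(len(self._connections)):
                index = self._rr_counter % len(self._connections)
                self._rr_counter = index + 1
                connection = self._connections[index]
                if connection is not None:
                    return connection
            # Everything is unavailable: yield to the event loop, then retry.
            await asyncio.sleep(0)
        return None
```

The counter advances on every pick, so consecutive calls cycle through the available connections and skip unavailable slots.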

get_all_connections()[source]#

Returns all available connections

Returns:

A complete list of all connections from the pool

has_connection(address)[source]#

Checks if a connection for the given address exists in the list

Parameters:

address (str) – The address to check

Return type:

bool

Returns:

True if a connection for the address exists in the list

has_connections()[source]#

Checks if this contains any connection

Return type:

bool

Returns:

True if any connection is managed, False otherwise

async close()[source]#

Close all connections and clean up internal state

class jina.serve.networking.GrpcConnectionPool(runtime_name, logger=None, compression=None, metrics_registry=None)[source]#

Bases: object

Manages a list of grpc connections.

Parameters:
  • logger (Optional[JinaLogger]) – the logger to use

  • compression (Optional[str]) – The compression algorithm to be used by this GRPCConnectionPool when sending data to GRPC

K8S_PORT_USES_AFTER = 8082#
K8S_PORT_USES_BEFORE = 8081#
K8S_PORT = 8080#
K8S_PORT_MONITORING = 9090#
class ConnectionStubs(address, channel, metrics)[source]#

Bases: object

Maintains a list of grpc stubs available for a particular connection

STUB_MAPPING = {'jina.JinaDataRequestRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaDataRequestRPCStub'>, 'jina.JinaDiscoverEndpointsRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaDiscoverEndpointsRPCStub'>, 'jina.JinaInfoRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaInfoRPCStub'>, 'jina.JinaRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaRPCStub'>, 'jina.JinaSingleDataRequestRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaSingleDataRequestRPCStub'>}#
async send_discover_endpoint(timeout=None)[source]#

Uses the endpoint discovery stub to request the endpoints exposed by an Executor

Parameters:

timeout (Optional[float]) – defines timeout for sending request

Return type:

Tuple

Returns:

Tuple of response and metadata about the response

async send_requests(requests, metadata, compression, timeout=None)[source]#

Sends requests using the appropriate grpc stub. The stub is chosen based on availability and the type of requests.

Parameters:
  • requests (List[Request]) – the requests to send

  • metadata – the metadata to send alongside the requests

  • compression – defines if compression should be used

  • timeout (Optional[float]) – defines timeout for sending request

Return type:

Tuple

Returns:

Tuple of response and metadata about the response

send_request(request, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, timeout=None, retries=-1)[source]#

Send a single message to target via one or all of the pooled connections, depending on polling_type. Convenience function wrapper around send_requests.

Parameters:
  • request (Request) – a single request to send

  • deployment (str) – name of the Jina deployment to send the message to

  • head (bool) – If True it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • polling_type (PollingType) – defines if the message should be sent to any or all pooled connections for the target

  • endpoint (Optional[str]) – endpoint to target with the request

  • timeout (Optional[float]) – timeout for sending the requests

  • retries (Optional[int]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

Return type:

List[Task]

Returns:

list of asyncio.Task items for each send call
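How polling_type affects the number of returned tasks can be sketched with plain asyncio. This is a hedged illustration, not Jina's code: the Polling enum stands in for PollingType, _send is a placeholder for the gRPC call, and picking the first shard for ANY is an arbitrary choice made for the example.

```python
import asyncio
from enum import Enum
from typing import List


class Polling(Enum):
    """Hypothetical stand-in for PollingType."""

    ANY = 1
    ALL = 2


async def _send(shard: int, request: str) -> str:
    await asyncio.sleep(0)  # placeholder for the actual gRPC call
    return f"shard {shard} handled {request}"


def fan_out(request: str, shards: List[int], polling: Polling) -> List[asyncio.Task]:
    # ALL: one task per shard. ANY: a single task (first shard, by assumption).
    targets = shards if polling is Polling.ALL else shards[:1]
    return [asyncio.create_task(_send(shard, request)) for shard in targets]
```

Because asyncio.create_task needs a running event loop, fan_out has to be called from within a coroutine; the caller then awaits the returned tasks, for example with asyncio.gather.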

send_requests(requests, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, timeout=None, retries=-1)[source]#

Send a request to target via one or all of the pooled connections, depending on polling_type

Parameters:
  • requests (List[Request]) – request (DataRequest) to send

  • deployment (str) – name of the Jina deployment to send the request to

  • head (bool) – If True it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • polling_type (PollingType) – defines if the request should be sent to any or all pooled connections for the target

  • endpoint (Optional[str]) – endpoint to target with the requests

  • timeout (Optional[float]) – timeout for sending the requests

  • retries (Optional[int]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

Return type:

List[Task]

Returns:

list of asyncio.Task items for each send call
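The default for retries stated above, where a negative value resolves to max(3, num_replicas), can be written out as a one-line rule. The helper name effective_retries is hypothetical and only mirrors the documented behaviour.

```python
def effective_retries(retries: int, num_replicas: int) -> int:
    """Resolve the documented default: a negative value means max(3, num_replicas)."""
    if retries < 0:
        return max(3, num_replicas)
    return retries
```

With two replicas the default resolves to 3 retries, with five replicas to 5, and any non-negative value is used as given.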

send_discover_endpoint(deployment, head=True, shard_id=None, timeout=None, retries=-1)[source]#

Sends a discover Endpoint call to target.

Parameters:
  • deployment (str) – name of the Jina deployment to send the request to

  • head (bool) – If True it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • timeout (Optional[float]) – timeout for sending the requests

  • retries (Optional[int]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

Return type:

Task

Returns:

asyncio.Task representing the send call

send_request_once(request, deployment, head=False, shard_id=None, timeout=None, retries=-1)[source]#

Send a message to target via only one of the pooled connections

Parameters:
  • request (Request) – request to send

  • deployment (str) – name of the Jina deployment to send the message to

  • head (bool) – If True it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • timeout (Optional[float]) – timeout for sending the requests

  • retries (Optional[int]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

Return type:

Task

Returns:

asyncio.Task representing the send call

send_requests_once(requests, deployment, head=False, shard_id=None, endpoint=None, timeout=None, retries=-1)[source]#

Send a request to target via only one of the pooled connections

Parameters:
  • requests (List[Request]) – request to send

  • deployment (str) – name of the Jina deployment to send the request to

  • head (bool) – If True it is sent to the head, otherwise to the worker pods

  • shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL

  • endpoint (Optional[str]) – endpoint to target with the requests

  • timeout (Optional[float]) – timeout for sending the requests

  • retries (Optional[int]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

Return type:

Task

Returns:

asyncio.Task representing the send call

add_connection(deployment, address, head=False, shard_id=None)[source]#

Adds a connection for a deployment to this connection pool

Parameters:
  • deployment (str) – The deployment the connection belongs to, like ‘encoder’

  • head (bool) – True if the connection is for a head

  • address (str) – Address used for the grpc connection, format is <host>:<port>

  • shard_id (Optional[int]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
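The <host>:<port> address format can be checked before a connection is added. The following is a minimal sketch; split_address is a hypothetical helper, not part of jina.serve.networking, and it does not handle bracketed IPv6 literals.

```python
from typing import Tuple


def split_address(address: str) -> Tuple[str, int]:
    """Split '<host>:<port>' into (host, port), raising ValueError if malformed."""
    # rpartition splits on the last colon, so the port is always the final part.
    host, sep, port = address.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected <host>:<port>, got {address!r}")
    return host, int(port)
```

For example, split_address("127.0.0.1:8080") yields the host "127.0.0.1" and the integer port 8080, while a string without a port raises ValueError.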

async remove_connection(deployment, address, head=False, shard_id=None)[source]#

Removes a connection to a deployment

Parameters:
  • deployment (str) – The deployment the connection belongs to, like ‘encoder’

  • address (str) – Address used for the grpc connection, format is <host>:<port>

  • head (bool) – True if the connection is for a head

  • shard_id (Optional[int]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads

Returns:

The removed connection, None if it did not exist

async close()[source]#

Closes the connection pool

static get_grpc_channel(address, options=None, asyncio=False, tls=False, root_certificates=None)[source]#

Creates a grpc channel to the given address

Parameters:
  • address (str) – The address to connect to, format is <host>:<port>

  • options (Optional[list]) – A list of options to pass to the grpc channel

  • asyncio (bool) – If True, use the asyncio implementation of the grpc channel

  • tls (bool) – If True, use tls encryption for the grpc channel

  • root_certificates (Optional[str]) – The path to the root certificates for tls, only used if tls is True

Return type:

Channel

Returns:

A grpc channel or an asyncio channel

static send_request_sync(request, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)[source]#

Sends a request synchronously to the target via grpc

Parameters:
  • request (Request) – the request to send

  • target (str) – where to send the request to, like 127.0.0.1:8080

  • timeout – timeout for the send

  • tls – if True, use tls encryption for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

  • endpoint (Optional[str]) – endpoint to target with the request

Return type:

Request

Returns:

the response request

static send_health_check_sync(target, timeout=100.0, tls=False, root_certificates=None)[source]#

Sends a health check request synchronously to the target via grpc

Parameters:
  • target (str) – where to send the request to, like 127.0.0.1:8080

  • timeout – timeout for the send

  • tls – if True, use tls encryption for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

Return type:

HealthCheckResponse

Returns:

the response health check

static send_requests_sync(requests, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)[source]#

Sends a list of requests synchronously to the target via grpc

Parameters:
  • requests (List[Request]) – the requests to send

  • target (str) – where to send the request to, like 127.0.0.1:8080

  • timeout – timeout for the send

  • tls – if True, use tls for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

  • endpoint (Optional[str]) – endpoint to target with the request

Return type:

Request

Returns:

the response request

static get_default_grpc_options()[source]#

Returns a list of default options used for creating grpc channels. Documentation is here https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h

Returns:

list of tuples defining grpc parameters
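Channel options are (key, value) tuples, so caller-supplied options can be layered over a default list. The sketch below assumes nothing about the actual defaults returned by get_default_grpc_options(): EXAMPLE_DEFAULTS holds illustrative values only, and merge_options is a hypothetical helper. The option keys themselves are standard gRPC channel arguments.

```python
from typing import Any, List, Tuple

Option = Tuple[str, Any]

# Illustrative values only; the real defaults come from get_default_grpc_options().
EXAMPLE_DEFAULTS: List[Option] = [
    ("grpc.max_send_message_length", -1),  # -1 means unlimited
    ("grpc.max_receive_message_length", -1),
]


def merge_options(defaults: List[Option], overrides: List[Option]) -> List[Option]:
    """Return defaults with overrides applied; later values win for equal keys."""
    merged = dict(defaults)
    merged.update(overrides)
    return list(merged.items())
```

A list built this way has the same shape as the options argument of get_grpc_channel().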

async static send_request_async(request, target, timeout=1.0, tls=False, root_certificates=None)[source]#

Sends a request asynchronously to the target via grpc

Parameters:
  • request (Request) – the request to send

  • target (str) – where to send the request to, like 127.0.0.1:8080

  • timeout (float) – timeout for the send

  • tls (bool) – if True, use tls for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

Return type:

Request

Returns:

the response request

static create_async_channel_stub(address, metrics, tls=False, root_certificates=None)[source]#

Creates an async GRPC Channel. This channel has to be closed eventually!

Parameters:
  • address – the address to create the connection to, like 127.0.0.1:8080

  • tls – if True, use tls for the grpc channel

  • root_certificates (Optional[str]) – the path to the root certificates for tls, only used if tls is True

  • metrics (_NetworkingMetrics) – NetworkingMetrics object that contain optional metrics

Return type:

Tuple[ConnectionStubs, Channel]

Returns:

DataRequest stubs and an async grpc channel

async static get_available_services(channel)[source]#

Lists available services by name, exposed at target address

Parameters:

channel – the channel to use

Return type:

List[str]

Returns:

List of services offered

jina.serve.networking.in_docker()[source]#

Checks if the current process is running inside Docker

Returns:

True if the current process is running inside Docker
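A common heuristic for detecting Docker is to look for the /.dockerenv marker file or a "docker" entry in the process's cgroup file. The sketch below shows that heuristic under the assumption of a Linux host; looks_like_docker is a hypothetical name, and the source does not state that in_docker() works this way.

```python
import os


def looks_like_docker(cgroup_path: str = "/proc/self/cgroup") -> bool:
    """Heuristic check for a Docker container; assumes Linux conventions."""
    # Docker creates this marker file at the root of the container filesystem.
    if os.path.exists("/.dockerenv"):
        return True
    # Otherwise look for a docker entry among the process's control groups.
    if os.path.isfile(cgroup_path):
        with open(cgroup_path) as cgroup_file:
            return any("docker" in line for line in cgroup_file)
    return False
```

On systems without either file, the function simply returns False.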

jina.serve.networking.host_is_local(hostname)[source]#

Checks if hostname points to localhost

Parameters:

hostname – host to check

Returns:

True if hostname means localhost, False otherwise
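One way to decide whether a hostname means localhost, without any DNS lookup, is to compare it against well-known names and test IP literals for the loopback range. This is a sketch under stated assumptions: looks_local is a hypothetical helper, and treating 0.0.0.0 as local is a choice made for the example rather than documented behaviour of host_is_local().

```python
import ipaddress


def looks_local(hostname: str) -> bool:
    """Return True for localhost names and loopback IP literals (no DNS lookup)."""
    # Assumption: the wildcard address 0.0.0.0 is treated as local.
    if hostname in ("localhost", "0.0.0.0"):
        return True
    try:
        # Covers 127.0.0.0/8 and the IPv6 loopback ::1.
        return ipaddress.ip_address(hostname).is_loopback
    except ValueError:
        # Not an IP literal and not a known local name.
        return False
```

Names that only resolve to a local address through DNS or /etc/hosts are not recognised by this sketch.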