jina.serve.networking module#
- class jina.serve.networking.ReplicaList(metrics, histograms, logger, runtime_name, aio_tracing_client_interceptors=None, tracing_client_interceptor=None, deployment_name='')[source]#
Bases:
object
Maintains a list of connections to replicas and uses round robin for selecting a replica
- async reset_connection(address, deployment_name)[source]#
Removes and then re-adds a connection. Result is the same as calling
remove_connection()
and thenadd_connection()
, but this allows for handling of race condition if multiple callers reset a connection at the same time.- Parameters:
address (
str
) – Target address of this connectiondeployment_name (
str
) – Target deployment of this connection
- Return type:
Optional
[Channel
]- Returns:
The reset connection or None if there was no connection for the given address
- add_connection(address, deployment_name)[source]#
Add connection with address to the connection list :type address:
str
:param address: Target address of this connection :type deployment_name:str
:param deployment_name: Target deployment of this connection
- async remove_connection(address)[source]#
Remove connection with address from the connection list
Warning
This completely removes the connection, including all dictionary keys that point to it. Therefore, be careful not to call this method while iterating over all connections. If you want to reset (remove and re-add) a connection, use
jina.serve.networking.ReplicaList.reset_connection()
, which is safe to use in this scenario.- Parameters:
address (
str
) – Remove connection for this address- Return type:
Optional
[Channel
]- Returns:
The removed connection or None if there was not any for the given address
- async get_next_connection(num_retries=3)[source]#
Returns a connection from the list. Strategy is round robin :param num_retries: how many retries should be performed when all connections are currently unavailable :returns: A connection from the pool
- get_all_connections()[source]#
Returns all available connections :returns: A complete list of all connections from the pool
- has_connection(address)[source]#
Checks if a connection for ip exists in the list :type address:
str
:param address: The address to check :rtype:bool
:returns: True if a connection for the ip exists in the list
- has_connections()[source]#
Checks if this contains any connection :rtype:
bool
:returns: True if any connection is managed, False otherwise
- property warmup_stubs#
Return set of warmup stubs :returns: Set of stubs. The set doesn’t remove any items once added.
- class jina.serve.networking.GrpcConnectionPool(runtime_name, logger=None, compression=None, metrics_registry=None, meter=None, aio_tracing_client_interceptors=None, tracing_client_interceptor=None)[source]#
Bases:
object
Manages a list of grpc connections.
- Parameters:
logger (
Optional
[JinaLogger
]) – the logger to usecompression (
Optional
[str
]) – The compression algorithm to be used by this GRPCConnectionPool when sending data to GRPC
- K8S_PORT_USES_AFTER = 8082#
- K8S_PORT_USES_BEFORE = 8081#
- K8S_PORT = 8080#
- K8S_PORT_MONITORING = 9090#
- class ConnectionStubs(address, channel, deployment_name, metrics, histograms)[source]#
Bases:
object
Maintains a list of grpc stubs available for a particular connection
- STUB_MAPPING = {'jina.JinaDataRequestRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaDataRequestRPCStub'>, 'jina.JinaDiscoverEndpointsRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaDiscoverEndpointsRPCStub'>, 'jina.JinaInfoRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaInfoRPCStub'>, 'jina.JinaRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaRPCStub'>, 'jina.JinaSingleDataRequestRPC': <class 'jina.proto.pb.jina_pb2_grpc.JinaSingleDataRequestRPCStub'>}#
- async send_discover_endpoint(timeout=None)[source]#
Use the endpoint discovery stub to request for the Endpoints Exposed by an Executor
- Parameters:
timeout (
Optional
[float
]) – defines timeout for sending request- Return type:
Tuple
- Returns:
Tuple of response and metadata about the response
- async send_requests(requests, metadata, compression, timeout=None)[source]#
Send requests and uses the appropriate grpc stub for this Stub is chosen based on availability and type of requests
- Parameters:
requests (
List
[Request
]) – the requests to sendmetadata – the metadata to send alongside the requests
compression – defines if compression should be used
timeout (
Optional
[float
]) – defines timeout for sending request
- Return type:
Tuple
- Returns:
Tuple of response and metadata about the response
- send_requests(requests, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, metadata=None, timeout=None, retries=-1)[source]#
Send a request to target via one or all of the pooled connections, depending on polling_type
- Parameters:
requests (
List
[Request
]) – request (DataRequest) to senddeployment (
str
) – name of the Jina deployment to send the request tohead (
bool
) – If True it is send to the head, otherwise to the worker podsshard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALLpolling_type (
PollingType
) – defines if the request should be send to any or all pooled connections for the targetendpoint (
Optional
[str
]) – endpoint to target with the requestsmetadata (
Optional
[Dict
[str
,str
]]) – metadata to send with the requeststimeout (
Optional
[float
]) – timeout for sending the requestsretries (
Optional
[int
]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
- Return type:
List
[Task
]- Returns:
list of asyncio.Task items for each send call
- send_discover_endpoint(deployment, head=True, shard_id=None, timeout=None, retries=-1)[source]#
Sends a discover Endpoint call to target.
- Parameters:
deployment (
str
) – name of the Jina deployment to send the request tohead (
bool
) – If True it is send to the head, otherwise to the worker podsshard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALLtimeout (
Optional
[float
]) – timeout for sending the requestsretries (
Optional
[int
]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
- Return type:
Optional
[Task
]- Returns:
asyncio.Task items to send call
- send_requests_once(requests, deployment, metadata=None, head=False, shard_id=None, endpoint=None, timeout=None, retries=-1)[source]#
Send a request to target via only one of the pooled connections
- Parameters:
requests (
List
[Request
]) – request to senddeployment (
str
) – name of the Jina deployment to send the request tometadata (
Optional
[Dict
[str
,str
]]) – metadata to send with the requesthead (
bool
) – If True it is send to the head, otherwise to the worker podsshard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALLendpoint (
Optional
[str
]) – endpoint to target with the requeststimeout (
Optional
[float
]) – timeout for sending the requestsretries (
Optional
[int
]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
- Return type:
Optional
[Task
]- Returns:
asyncio.Task representing the send call
- add_connection(deployment, address, head=False, shard_id=None)[source]#
Adds a connection for a deployment to this connection pool
- Parameters:
deployment (
str
) – The deployment the connection belongs to, like ‘encoder’head (
bool
) – True if the connection is for a headaddress (
str
) – Address used for the grpc connection, format is <host>:<port>shard_id (
Optional
[int
]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
- async remove_connection(deployment, address, head=False, shard_id=None)[source]#
Removes a connection to a deployment
- Parameters:
deployment (
str
) – The deployment the connection belongs to, like ‘encoder’address (
str
) – Address used for the grpc connection, format is <host>:<port>head (
bool
) – True if the connection is for a headshard_id (
Optional
[int
]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
- Returns:
The removed connection, None if it did not exist
- async warmup(deployment, stop_event)[source]#
Executes JinaInfoRPC against the provided deployment. A single task is created for each replica connection. :type deployment:
str
:param deployment: deployment name and the replicas that needs to be warmed up. :type stop_event:Event
:param stop_event: signal to indicate if an early termination of the task is required for graceful teardown.
- static get_grpc_channel(address, options=None, asyncio=False, tls=False, root_certificates=None, aio_tracing_client_interceptors=None, tracing_client_interceptor=None)[source]#
Creates a grpc channel to the given address
- Parameters:
address (
str
) – The address to connect to, format is <host>:<port>options (
Optional
[list
]) – A list of options to pass to the grpc channelasyncio (
bool
) – If True, use the asyncio implementation of the grpc channeltls (
bool
) – If True, use tls encryption for the grpc channelroot_certificates (
Optional
[str
]) – The path to the root certificates for tls, only used if tls is Trueaio_tracing_client_interceptors (
Optional
[Sequence
[ClientInterceptor
]]) – List of async io gprc client tracing interceptors for tracing requests if asycnio is Truetracing_client_interceptor (
Optional
[OpenTelemetryClientInterceptor
]) – A gprc client tracing interceptor for tracing requests if asyncio is False
- Return type:
Channel
- Returns:
A grpc channel or an asyncio channel
- static send_request_sync(request, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)[source]#
Sends a request synchronously to the target via grpc
- Parameters:
request (
Request
) – the request to sendtarget (
str
) – where to send the request to, like 127.0.0.1:8080timeout – timeout for the send
tls – if True, use tls encryption for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is Trueendpoint (
Optional
[str
]) – endpoint to target with the request
- Return type:
- Returns:
the response request
- static send_health_check_sync(target, timeout=100.0, tls=False, root_certificates=None)[source]#
Sends a request synchronously to the target via grpc
- Parameters:
target (
str
) – where to send the request to, like 127.0.0.1:8080timeout – timeout for the send
tls – if True, use tls encryption for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is True
- Return type:
HealthCheckResponse
- Returns:
the response health check
- static send_requests_sync(requests, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)[source]#
Sends a list of requests synchronically to the target via grpc
- Parameters:
requests (
List
[Request
]) – the requests to sendtarget (
str
) – where to send the request to, like 127.0.0.1:8080timeout – timeout for the send
tls – if True, use tls for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is Trueendpoint (
Optional
[str
]) – endpoint to target with the request
- Return type:
- Returns:
the response request
- static get_default_grpc_options()[source]#
Returns a list of default options used for creating grpc channels. Documentation is here https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h :returns: list of tuples defining grpc parameters
- async static send_request_async(request, target, timeout=1.0, tls=False, root_certificates=None)[source]#
Sends a request asynchronously to the target via grpc
- Parameters:
request (
Request
) – the request to sendtarget (
str
) – where to send the request to, like 127.0.0.1:8080timeout (
float
) – timeout for the sendtls (
bool
) – if True, use tls for the grpc channelroot_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is True
- Return type:
- Returns:
the response request
- static create_async_channel_stub(address, deployment_name, metrics, histograms, tls=False, root_certificates=None, aio_tracing_client_interceptors=None)[source]#
Creates an async GRPC Channel. This channel has to be closed eventually!
- Parameters:
address – the address to create the connection to, like 127.0.0.0.1:8080
deployment_name (
str
) – the name of the deployment (e.g. executor0)tls – if True, use tls for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only umetrics (
_NetworkingMetrics
) – NetworkingMetrics object that contain optional metricshistograms (
_NetworkingHistograms
) – NetworkingHistograms object that optionally record metricsaio_tracing_client_interceptors (
Optional
[Sequence
[ClientInterceptor
]]) – List of async io gprc client tracing interceptors for tracing requests for asycnio channel
- Return type:
Tuple
[ConnectionStubs
,Channel
]- Returns:
DataRequest stubs and an async grpc channel