jina.serve.networking module
- class jina.serve.networking.ReplicaList(summary, logger)
Bases:
object
Maintains a list of connections to replicas and uses round robin for selecting a replica
- async reset_connection(address)
Removes and then re-adds a connection. The result is the same as calling remove_connection() and then add_connection(), but this method handles the race condition that arises if multiple callers reset a connection at the same time.
- Parameters:
address (
str
) – Target address of this connection
- Return type:
Optional
[Channel
]
- Returns:
The reset connection, or None if there was no connection for the given address
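One way to picture the race handling described for reset_connection() is a lock around the remove-then-add pair, so that concurrent callers cannot interleave. The following is only a sketch under that assumption: `TinyReplicaList`, its string "channels" and the `demo` helper are hypothetical stand-ins, not Jina's implementation.

```python
import asyncio
from typing import Dict, Optional


class TinyReplicaList:
    """Hypothetical stand-in for a replica list; not Jina's implementation."""

    def __init__(self) -> None:
        self._connections: Dict[str, str] = {}
        self._lock = asyncio.Lock()
        self._generation = 0  # counts how many "channels" were created

    def add_connection(self, address: str) -> None:
        self._generation += 1
        # A real pool would open a gRPC channel here; a string stands in for it.
        self._connections[address] = f"channel-{self._generation}"

    async def remove_connection(self, address: str) -> Optional[str]:
        await asyncio.sleep(0)  # yield control, as closing a real channel would
        return self._connections.pop(address, None)

    async def reset_connection(self, address: str) -> Optional[str]:
        # Holding the lock keeps remove + add together, so two concurrent
        # resets cannot leave the address without a connection.
        async with self._lock:
            removed = await self.remove_connection(address)
            if removed is None:
                return None  # nothing to reset for this address
            self.add_connection(address)
            return self._connections[address]


async def demo() -> Dict[str, str]:
    replicas = TinyReplicaList()
    replicas.add_connection("127.0.0.1:8080")
    # Three callers reset the same connection at the same time.
    await asyncio.gather(
        *(replicas.reset_connection("127.0.0.1:8080") for _ in range(3))
    )
    return dict(replicas._connections)
```

Running `asyncio.run(demo())` leaves exactly one entry for the address, because the three resets are serialized by the lock.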
- add_connection(address)
Add connection with address to the connection list
- Parameters:
address (
str
) – Target address of this connection
- async remove_connection(address)
Remove connection with address from the connection list
Warning
This completely removes the connection, including all dictionary keys that point to it. Therefore, be careful not to call this method while iterating over all connections. If you want to reset (remove and re-add) a connection, use jina.serve.networking.ReplicaList.reset_connection(), which is safe to use in this scenario.
- Parameters:
address (
str
) – Address of the connection to remove
- Return type:
Optional
[Channel
]
- Returns:
The removed connection, or None if there was none for the given address
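The warning about removing connections while iterating reflects a general Python pitfall: a dict cannot change size while it is being looped over. A minimal sketch of the safe pattern follows, assuming a plain dict as a stand-in for the connection list; `remove_all` is a hypothetical helper, not part of Jina.

```python
import asyncio
from typing import Dict, List


async def remove_all(connections: Dict[str, str]) -> List[str]:
    """Remove every entry from a dict-backed pool while looping over it."""
    removed = []
    # list(...) takes a snapshot of the keys. Looping over the dict itself and
    # popping inside the loop would raise
    # "RuntimeError: dictionary changed size during iteration".
    for address in list(connections):
        await asyncio.sleep(0)  # stands in for awaiting remove_connection()
        removed.append(connections.pop(address))
    return removed
```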
- async get_next_connection(num_retries=3)
Returns a connection from the list. The selection strategy is round robin.
- Parameters:
num_retries – how many retries should be performed when all connections are currently unavailable
- Returns:
A connection from the pool
- get_all_connections()
Returns all available connections
- Returns:
A complete list of all connections from the pool
- has_connection(address)
Checks if a connection for the given address exists in the list
- Parameters:
address (
str
) – The address to check
- Return type:
bool
- Returns:
True if a connection for the address exists in the list
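The round-robin strategy mentioned for ReplicaList can be illustrated with a small selector that cycles through its entries. This is a sketch of the general technique only; the `RoundRobin` class and its method names are hypothetical and do not mirror Jina's internals.

```python
from typing import List, Optional


class RoundRobin:
    """Hypothetical round-robin selector over a list of replica addresses."""

    def __init__(self, addresses: List[str]) -> None:
        self._addresses = list(addresses)
        self._index = 0

    def next(self) -> Optional[str]:
        if not self._addresses:
            return None  # nothing to select from
        address = self._addresses[self._index]
        # Advance the cursor and wrap around so every replica gets its turn.
        self._index = (self._index + 1) % len(self._addresses)
        return address
```

Each call to `next()` returns the following address and wraps back to the first one after the last.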
- class jina.serve.networking.GrpcConnectionPool(logger=None, compression=None, metrics_registry=None)
Bases:
object
Manages a list of grpc connections.
- Parameters:
logger (
Optional
[JinaLogger
]) – the logger to use
compression (
Optional
[str
]) – The compression algorithm to be used by this GrpcConnectionPool when sending data to GRPC
- K8S_PORT_USES_AFTER = 8082
- K8S_PORT_USES_BEFORE = 8081
- K8S_PORT = 8080
- K8S_PORT_MONITORING = 9090
- class ConnectionStubs(address, channel, summary)
Bases:
object
Maintains a list of grpc stubs available for a particular connection
- STUB_MAPPING = {'jina.JinaDataRequestRPC': <class 'jina.proto.pb2.jina_pb2_grpc.JinaDataRequestRPCStub'>, 'jina.JinaDiscoverEndpointsRPC': <class 'jina.proto.pb2.jina_pb2_grpc.JinaDiscoverEndpointsRPCStub'>, 'jina.JinaInfoRPC': <class 'jina.proto.pb2.jina_pb2_grpc.JinaInfoRPCStub'>, 'jina.JinaRPC': <class 'jina.proto.pb2.jina_pb2_grpc.JinaRPCStub'>, 'jina.JinaSingleDataRequestRPC': <class 'jina.proto.pb2.jina_pb2_grpc.JinaSingleDataRequestRPCStub'>}
- async send_discover_endpoint(timeout=None)
Uses the endpoint discovery stub to request the endpoints exposed by an Executor
- Parameters:
timeout (
Optional
[float
]) – defines timeout for sending request
- Return type:
Tuple
- Returns:
Tuple of response and metadata about the response
- async send_requests(requests, metadata, compression, timeout=None)
Sends requests using the appropriate grpc stub. The stub is chosen based on availability and the type of the requests.
- Parameters:
requests (
List
[Request
]) – the requests to send
metadata – the metadata to send alongside the requests
compression – defines if compression should be used
timeout (
Optional
[float
]) – defines timeout for sending request
- Return type:
Tuple
- Returns:
Tuple of response and metadata about the response
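To make "chosen based on availability and type of requests" more concrete, here is a sketch of one possible selection rule. The service names are the keys from STUB_MAPPING; the rule itself (prefer the single-request service for exactly one request, otherwise fall back to the list service) and the `choose_service` helper are assumptions for illustration, not a description of Jina's actual logic.

```python
from typing import List, Set

SINGLE = "jina.JinaSingleDataRequestRPC"
MULTI = "jina.JinaDataRequestRPC"


def choose_service(available: Set[str], requests: List[object]) -> str:
    """Pick a service name for a batch of requests (illustrative rule only)."""
    # Assumed rule: a lone request goes to the single-request service
    # when the target exposes it.
    if len(requests) == 1 and SINGLE in available:
        return SINGLE
    # Otherwise use the service that accepts a list of requests, if present.
    if MULTI in available:
        return MULTI
    raise ValueError("no suitable service available on this connection")
```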
- send_request(request, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, timeout=None, retries=-1)
Send a single message to target via one or all of the pooled connections, depending on polling_type. Convenience function wrapper around send_requests.
- Parameters:
request (
Request
) – a single request to send
deployment (
str
) – name of the Jina deployment to send the message to
head (
bool
) – If True it is sent to the head, otherwise to the worker pods
shard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALL
polling_type (
PollingType
) – defines if the message should be sent to any or all pooled connections for the target
endpoint (
Optional
[str
]) – endpoint to target with the request
timeout (
Optional
[float
]) – timeout for sending the requests
retries (
Optional
[int
]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
- Return type:
List
[Task
]
- Returns:
list of asyncio.Task items for each send call
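Because the method returns a list of asyncio.Task items rather than results, the caller decides when to await them. The sketch below imitates that shape with hypothetical helpers (`fake_send`, `fan_out`, `demo`); picking the first target for the "any" case is a simplification, and none of this is Jina code.

```python
import asyncio
from typing import List


async def fake_send(target: str, payload: str) -> str:
    """Hypothetical stand-in for one gRPC send call."""
    await asyncio.sleep(0)
    return f"{target} got {payload}"


def fan_out(targets: List[str], payload: str, poll_all: bool) -> List[asyncio.Task]:
    # Polling ALL schedules one task per target; polling ANY schedules a
    # single task (here simply the first target, as a simplification).
    chosen = targets if poll_all else targets[:1]
    # Must be called while an event loop is running.
    return [asyncio.ensure_future(fake_send(t, payload)) for t in chosen]


async def demo(poll_all: bool) -> List[str]:
    tasks = fan_out(["shard-0", "shard-1"], "ping", poll_all)
    # The tasks are already scheduled; gather waits for all of them.
    return await asyncio.gather(*tasks)
```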
- send_requests(requests, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, timeout=None, retries=-1)
Send requests to target via one or all of the pooled connections, depending on polling_type
- Parameters:
requests (
List
[Request
]) – requests (DataRequest) to send
deployment (
str
) – name of the Jina deployment to send the request to
head (
bool
) – If True it is sent to the head, otherwise to the worker pods
shard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALL
polling_type (
PollingType
) – defines if the request should be sent to any or all pooled connections for the target
endpoint (
Optional
[str
]) – endpoint to target with the requests
timeout (
Optional
[float
]) – timeout for sending the requests
retries (
Optional
[int
]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
- Return type:
List
[Task
]
- Returns:
list of asyncio.Task items for each send call
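The default described for retries can be written as a one-line rule. The helper below is a hypothetical illustration of that documented rule; the name `effective_retries` does not come from Jina.

```python
def effective_retries(retries: int, num_replicas: int) -> int:
    """Resolve the retry count following the documented rule."""
    # A negative value means "use the default", which is max(3, num_replicas).
    if retries < 0:
        return max(3, num_replicas)
    return retries
```

With the default of -1, a deployment with two replicas is retried three times and one with five replicas five times, while an explicit non-negative value is used as given.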
- send_discover_endpoint(deployment, head=True, shard_id=None, timeout=None, retries=-1)
Sends an endpoint discovery call to the target.
- Parameters:
deployment (
str
) – name of the Jina deployment to send the request to
head (
bool
) – If True it is sent to the head, otherwise to the worker pods
shard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALL
timeout (
Optional
[float
]) – timeout for sending the requests
retries (
Optional
[int
]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
- Return type:
Task
- Returns:
asyncio.Task representing the send call
- send_request_once(request, deployment, head=False, shard_id=None, timeout=None, retries=-1)
Send a request to target via only one of the pooled connections
- Parameters:
request (
Request
) – request to send
deployment (
str
) – name of the Jina deployment to send the message to
head (
bool
) – If True it is sent to the head, otherwise to the worker pods
shard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALL
timeout (
Optional
[float
]) – timeout for sending the requests
retries (
Optional
[int
]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
- Return type:
Task
- Returns:
asyncio.Task representing the send call
- send_requests_once(requests, deployment, head=False, shard_id=None, endpoint=None, timeout=None, retries=-1)
Send requests to target via only one of the pooled connections
- Parameters:
requests (
List
[Request
]) – requests to send
deployment (
str
) – name of the Jina deployment to send the request to
head (
bool
) – If True it is sent to the head, otherwise to the worker pods
shard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALL
endpoint (
Optional
[str
]) – endpoint to target with the requests
timeout (
Optional
[float
]) – timeout for sending the requests
retries (
Optional
[int
]) – number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
- Return type:
Task
- Returns:
asyncio.Task representing the send call
- add_connection(deployment, address, head=False, shard_id=None)
Adds a connection for a deployment to this connection pool
- Parameters:
deployment (
str
) – The deployment the connection belongs to, like ‘encoder’
head (
bool
) – True if the connection is for a head
address (
str
) – Address used for the grpc connection, format is <host>:<port>
shard_id (
Optional
[int
]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
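Addresses are given in the form <host>:<port>. As a small illustration of that format, the hypothetical helper below splits and validates such a string; it is not part of Jina, which may accept or check addresses differently.

```python
from typing import Tuple


def split_address(address: str) -> Tuple[str, int]:
    """Split a '<host>:<port>' string into a host and an integer port."""
    # rpartition splits on the last colon, so the port is always the tail.
    host, sep, port = address.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected <host>:<port>, got {address!r}")
    return host, int(port)
```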
- async remove_connection(deployment, address, head=False, shard_id=None)
Removes a connection to a deployment
- Parameters:
deployment (
str
) – The deployment the connection belongs to, like ‘encoder’
address (
str
) – Address used for the grpc connection, format is <host>:<port>
head (
bool
) – True if the connection is for a head
shard_id (
Optional
[int
]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
- Returns:
The removed connection, None if it did not exist
- static get_grpc_channel(address, options=None, asyncio=False, tls=False, root_certificates=None)
Creates a grpc channel to the given address
- Parameters:
address (
str
) – The address to connect to, format is <host>:<port>
options (
Optional
[list
]) – A list of options to pass to the grpc channel
asyncio (
bool
) – If True, use the asyncio implementation of the grpc channel
tls (
bool
) – If True, use tls encryption for the grpc channel
root_certificates (
Optional
[str
]) – The path to the root certificates for tls, only used if tls is True
- Return type:
Channel
- Returns:
A grpc channel or an asyncio channel
- static send_request_sync(request, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)
Sends a request synchronously to the target via grpc
- Parameters:
request (
Request
) – the request to send
target (
str
) – where to send the request to, like 127.0.0.1:8080
timeout – timeout for the send
tls – if True, use tls encryption for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is True
endpoint (
Optional
[str
]) – endpoint to target with the request
- Return type:
- Returns:
the response request
- static send_health_check_sync(target, timeout=100.0, tls=False, root_certificates=None)
Sends a health check request synchronously to the target via grpc
- Parameters:
target (
str
) – where to send the request to, like 127.0.0.1:8080
timeout – timeout for the send
tls – if True, use tls encryption for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is True
- Return type:
HealthCheckResponse
- Returns:
the health check response
- static send_requests_sync(requests, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)
Sends a list of requests synchronously to the target via grpc
- Parameters:
requests (
List
[Request
]) – the requests to send
target (
str
) – where to send the request to, like 127.0.0.1:8080
timeout – timeout for the send
tls – if True, use tls for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is True
endpoint (
Optional
[str
]) – endpoint to target with the request
- Return type:
- Returns:
the response request
- static get_default_grpc_options()
Returns a list of default options used for creating grpc channels. Documentation is here https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h
- Returns:
list of tuples defining grpc parameters
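A "list of tuples defining grpc parameters" is a list of (key, value) pairs, where the keys are channel argument names from the gRPC header linked above. The sketch below shows that shape and one way to layer overrides on top. The option keys are real gRPC channel arguments, but the chosen values and both helper functions are assumptions for illustration and are not Jina's actual defaults.

```python
from typing import List, Tuple, Union

OptionValue = Union[int, str]
Options = List[Tuple[str, OptionValue]]


def example_grpc_options() -> Options:
    """Illustrative channel options; values are assumptions, not Jina's defaults."""
    return [
        ("grpc.max_send_message_length", -1),  # -1 means unlimited
        ("grpc.max_receive_message_length", -1),
        ("grpc.keepalive_time_ms", 10000),  # hypothetical value
    ]


def merge_options(defaults: Options, overrides: Options) -> Options:
    """Return defaults with any override replacing the entry of the same key."""
    merged = dict(defaults)
    merged.update(dict(overrides))
    return list(merged.items())
```

A list in this shape is what the options parameter of get_grpc_channel() expects, so overrides could be merged this way before creating a channel.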
- async static send_request_async(request, target, timeout=1.0, tls=False, root_certificates=None)
Sends a request asynchronously to the target via grpc
- Parameters:
request (
Request
) – the request to send
target (
str
) – where to send the request to, like 127.0.0.1:8080
timeout (
float
) – timeout for the send
tls (
bool
) – if True, use tls for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is True
- Return type:
- Returns:
the response request
- static create_async_channel_stub(address, tls=False, root_certificates=None, summary=None)
Creates an async GRPC Channel. This channel has to be closed eventually!
- Parameters:
address – the address to create the connection to, like 127.0.0.1:8080
tls – if True, use tls for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for tls, only used if tls is True
summary – Optional Prometheus summary object
- Return type:
Tuple
[ConnectionStubs
,Channel
]
- Returns:
DataRequest stubs and an async grpc channel