jina.serve.networking module
- class jina.serve.networking.ReplicaList(summary)
Bases: object
Maintains a list of connections to replicas and uses round robin to select a replica.
- add_connection(address)
Adds a connection with the given address to the connection list.
- Parameters
address (str) – Target address of this connection
- async remove_connection(address)
Removes the connection with the given address from the connection list.
- Parameters
address (str) – Address of the connection to remove
- Returns
The removed connection, or None if there was none for the given address
- get_next_connection()
Returns a connection from the list. The selection strategy is round robin.
- Returns
A connection from the pool
- get_all_connections()
Returns all available connections.
- Returns
A complete list of all connections from the pool
- has_connection(address)
Checks whether a connection for the given address exists in the list.
- Parameters
address (str) – The address to check
- Return type
bool
- Returns
True if a connection for the address exists in the list
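The round-robin behaviour described for ReplicaList can be illustrated with a small, self-contained sketch. This is not Jina's implementation: the class name RoundRobinReplicas is hypothetical, connections are stand-in strings rather than gRPC channels, and remove_connection is synchronous here although the documented method is async.

```python
from typing import Dict, List, Optional


class RoundRobinReplicas:
    """Toy replica list (hypothetical); connections are plain strings."""

    def __init__(self) -> None:
        self._addresses: List[str] = []        # keeps insertion order
        self._connections: Dict[str, str] = {}  # address -> stand-in connection
        self._next = 0                          # index of the next replica to hand out

    def add_connection(self, address: str) -> None:
        # Ignore duplicates so each address appears once in the rotation.
        if address not in self._connections:
            self._addresses.append(address)
            self._connections[address] = f"connection-to-{address}"

    def remove_connection(self, address: str) -> Optional[str]:
        if address not in self._connections:
            return None
        self._addresses.remove(address)
        # Keep the rotation pointer inside the shrunken list.
        self._next = self._next % len(self._addresses) if self._addresses else 0
        return self._connections.pop(address)

    def has_connection(self, address: str) -> bool:
        return address in self._connections

    def get_next_connection(self) -> Optional[str]:
        if not self._addresses:
            return None
        address = self._addresses[self._next]
        self._next = (self._next + 1) % len(self._addresses)
        return self._connections[address]

    def get_all_connections(self) -> List[str]:
        return [self._connections[a] for a in self._addresses]
```

With two addresses added, three consecutive calls to get_next_connection return the first, the second, and then the first connection again.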
- class jina.serve.networking.GrpcConnectionPool(logger=None, compression='NoCompression', metrics_registry=None)
Bases: object
Manages a list of gRPC connections.
- Parameters
logger (Optional[JinaLogger]) – the logger to use
compression (str) – the compression algorithm used by this GrpcConnectionPool when sending data over gRPC
- K8S_PORT_USES_AFTER = 8082
- K8S_PORT_USES_BEFORE = 8081
- K8S_PORT = 8080
- class ConnectionStubs(address, channel, summary)
Bases: object
Maintains a list of gRPC stubs available for a particular connection.
- STUB_MAPPING = {'jina.JinaControlRequestRPC': <class 'jina.proto.jina_pb2_grpc.JinaControlRequestRPCStub'>, 'jina.JinaDataRequestRPC': <class 'jina.proto.jina_pb2_grpc.JinaDataRequestRPCStub'>, 'jina.JinaDiscoverEndpointsRPC': <class 'jina.proto.jina_pb2_grpc.JinaDiscoverEndpointsRPCStub'>, 'jina.JinaRPC': <class 'jina.proto.jina_pb2_grpc.JinaRPCStub'>, 'jina.JinaSingleDataRequestRPC': <class 'jina.proto.jina_pb2_grpc.JinaSingleDataRequestRPCStub'>}
- async send_discover_endpoint(timeout=None)
Uses the endpoint discovery stub to request the endpoints exposed by an Executor.
- Parameters
timeout (Optional[float]) – timeout for sending the request
- Return type
Tuple
- Returns
Tuple of response and metadata about the response
- async send_requests(requests, metadata, compression, timeout=None)
Sends requests using the appropriate gRPC stub. The stub is chosen based on availability and on the type of the requests.
- Parameters
requests (List[Request]) – the requests to send
metadata – the metadata to send alongside the requests
compression – defines if compression should be used
timeout (Optional[float]) – timeout for sending the request
- Return type
Tuple
- Returns
Tuple of response and metadata about the response
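How a stub might be chosen "based on availability and type of requests" can be sketched as follows. The selection rule below is an assumption made for illustration and is not confirmed by this page; only the service names are taken from STUB_MAPPING. The classes DataRequest and ControlRequest are empty placeholders, and pick_service is a hypothetical helper.

```python
from typing import List, Set


class DataRequest:
    """Placeholder for a data request (hypothetical stand-in)."""


class ControlRequest:
    """Placeholder for a control request (hypothetical stand-in)."""


def pick_service(requests: List[object], available: Set[str]) -> str:
    """Return the name of a service that could handle `requests`.

    Assumed rule: control requests use the control service, a single data
    request prefers the single-request service, and several data requests
    use the list-based data service.
    """
    if not requests:
        raise ValueError("at least one request is required")
    if isinstance(requests[0], ControlRequest):
        candidates = ["jina.JinaControlRequestRPC"]
    elif len(requests) == 1:
        candidates = ["jina.JinaSingleDataRequestRPC", "jina.JinaDataRequestRPC"]
    else:
        candidates = ["jina.JinaDataRequestRPC"]
    # Availability check: take the first candidate the connection exposes.
    for name in candidates:
        if name in available:
            return name
    raise ValueError(f"no available service among {candidates}")
```

In this sketch a single data request falls back to the list-based service when the single-request service is not exposed by the connection.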
- send_request(request, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, timeout=None)
Sends a single message to the target via one or all of the pooled connections, depending on polling_type. Convenience wrapper around send_requests.
- Parameters
request (Request) – a single request to send
deployment (str) – name of the Jina deployment to send the message to
head (bool) – If True it is sent to the head, otherwise to the worker pods
shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL
polling_type (PollingType) – defines if the message should be sent to any or all pooled connections for the target
endpoint (Optional[str]) – endpoint to target with the request
timeout (Optional[float]) – timeout for sending the requests
- Return type
List[Task]
- Returns
list of asyncio.Task items for each send call
- send_requests(requests, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None, timeout=None)
Sends requests to the target via one or all of the pooled connections, depending on polling_type.
- Parameters
requests (List[Request]) – requests (DataRequest/ControlRequest) to send
deployment (str) – name of the Jina deployment to send the request to
head (bool) – If True it is sent to the head, otherwise to the worker pods
shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL
polling_type (PollingType) – defines if the request should be sent to any or all pooled connections for the target
endpoint (Optional[str]) – endpoint to target with the requests
timeout (Optional[float]) – timeout for sending the requests
- Return type
List[Task]
- Returns
list of asyncio.Task items for each send call
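The difference between polling ANY and polling ALL, and the "one asyncio.Task per send call" return value, can be sketched with plain asyncio. Everything here is a stand-in: Polling mimics PollingType, fake_send replaces the real gRPC call, and picking the first shard when no shard_id is given is an assumption of this sketch rather than documented behaviour.

```python
import asyncio
from enum import Enum
from typing import List, Optional


class Polling(Enum):
    """Hypothetical stand-in for PollingType."""
    ANY = "any"
    ALL = "all"


async def fake_send(shard_id: int, payload: str) -> str:
    # Stand-in for a network call to one shard.
    await asyncio.sleep(0)
    return f"shard-{shard_id} handled {payload}"


def send(payload: str, shards: List[int], polling: Polling,
         shard_id: Optional[int] = None) -> List["asyncio.Task[str]"]:
    """Create one task per send call; must run inside an event loop."""
    if polling is Polling.ALL:
        targets = shards  # shard_id is ignored for polling ALL
    else:
        # Assumption: fall back to the first shard if none is requested.
        targets = [shard_id if shard_id is not None else shards[0]]
    return [asyncio.create_task(fake_send(s, payload)) for s in targets]


async def main():
    any_tasks = send("req", [0, 1, 2], Polling.ANY, shard_id=1)
    all_tasks = send("req", [0, 1, 2], Polling.ALL, shard_id=1)
    any_results = await asyncio.gather(*any_tasks)
    all_results = await asyncio.gather(*all_tasks)
    return any_results, all_results
```

Running asyncio.run(main()) yields one result for the ANY call and three for the ALL call, because the ALL branch ignores shard_id and fans out to every shard.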
- send_discover_endpoint(deployment, head=True, shard_id=None, timeout=None)
Sends a discover-endpoint call to the target.
- Parameters
deployment (str) – name of the Jina deployment to send the request to
head (bool) – If True it is sent to the head, otherwise to the worker pods
shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL
timeout (Optional[float]) – timeout for sending the requests
- Return type
Task
- Returns
asyncio.Task representing the send call
- send_request_once(request, deployment, head=False, shard_id=None, timeout=None)
Sends a message to the target via only one of the pooled connections.
- Parameters
request (Request) – request to send
deployment (str) – name of the Jina deployment to send the message to
head (bool) – If True it is sent to the head, otherwise to the worker pods
shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL
timeout (Optional[float]) – timeout for sending the requests
- Return type
Task
- Returns
asyncio.Task representing the send call
- send_requests_once(requests, deployment, head=False, shard_id=None, endpoint=None, timeout=None)
Sends requests to the target via only one of the pooled connections.
- Parameters
requests (List[Request]) – the requests to send
deployment (str) – name of the Jina deployment to send the request to
head (bool) – If True it is sent to the head, otherwise to the worker pods
shard_id (Optional[int]) – Send to a specific shard of the deployment, ignored for polling ALL
endpoint (Optional[str]) – endpoint to target with the requests
timeout (Optional[float]) – timeout for sending the requests
- Return type
Task
- Returns
asyncio.Task representing the send call
- add_connection(deployment, address, head=False, shard_id=None)
Adds a connection for a deployment to this connection pool.
- Parameters
deployment (str) – The deployment the connection belongs to, like ‘encoder’
head (bool) – True if the connection is for a head
address (str) – Address used for the gRPC connection, format is <host>:<port>
shard_id (Optional[int]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
- async remove_connection(deployment, address, head=False, shard_id=None)
Removes a connection to a deployment.
- Parameters
deployment (str) – The deployment the connection belongs to, like ‘encoder’
address (str) – Address used for the gRPC connection, format is <host>:<port>
head (bool) – True if the connection is for a head
shard_id (Optional[int]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
- Returns
The removed connection, None if it did not exist
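The way add_connection and remove_connection address a connection (by deployment, head flag, and shard) can be sketched with a dictionary keyed on those three values. This is a simplified illustration, not the pool's actual data structure: ToyConnectionPool is hypothetical, it stores addresses instead of channels, and remove_connection is synchronous here although the documented method is async.

```python
from collections import defaultdict
from typing import Dict, Optional, Set, Tuple

Key = Tuple[str, str, Optional[int]]  # (deployment, "head" or "worker", shard_id)


class ToyConnectionPool:
    """Hypothetical pool that only tracks addresses per deployment entity."""

    def __init__(self) -> None:
        self._addresses: Dict[Key, Set[str]] = defaultdict(set)

    @staticmethod
    def _key(deployment: str, head: bool, shard_id: Optional[int]) -> Key:
        # shard_id is ignored for heads, as the parameter description states.
        if head:
            return (deployment, "head", None)
        return (deployment, "worker", shard_id)

    def add_connection(self, deployment: str, address: str,
                       head: bool = False, shard_id: Optional[int] = None) -> None:
        self._addresses[self._key(deployment, head, shard_id)].add(address)

    def remove_connection(self, deployment: str, address: str,
                          head: bool = False,
                          shard_id: Optional[int] = None) -> Optional[str]:
        key = self._key(deployment, head, shard_id)
        if address in self._addresses.get(key, set()):
            self._addresses[key].remove(address)
            return address  # the removed "connection"
        return None  # nothing was registered under this key and address
```

Because heads share a single key per deployment, a head connection added with a shard_id can be removed without passing that shard_id.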
- static get_grpc_channel(address, options=None, asyncio=False, tls=False, root_certificates=None)
Creates a gRPC channel to the given address.
- Parameters
address (str) – The address to connect to, format is <host>:<port>
options (Optional[list]) – A list of options to pass to the gRPC channel
asyncio (bool) – If True, use the asyncio implementation of the gRPC channel
tls (bool) – If True, use TLS encryption for the gRPC channel
root_certificates (Optional[str]) – The path to the root certificates for TLS, only used if tls is True
- Return type
Channel
- Returns
A gRPC channel or an asyncio channel
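Several methods on this page expect addresses in the <host>:<port> format. A small validation helper, sketched below, shows what a well-formed address looks like. The function split_address is hypothetical and not part of jina.serve.networking; it uses only the standard library and does not handle bracketed IPv6 literals.

```python
from typing import Tuple


def split_address(address: str) -> Tuple[str, int]:
    """Split '<host>:<port>' into (host, port), raising ValueError if malformed."""
    # rpartition splits on the last colon, so the port is always the tail.
    host, separator, port = address.rpartition(":")
    if not separator or not host or not port.isdigit():
        raise ValueError(f"expected <host>:<port>, got {address!r}")
    port_number = int(port)
    if not 0 < port_number <= 65535:
        raise ValueError(f"port out of range in {address!r}")
    return host, port_number
```

Checking the address before opening a channel gives an early, readable error instead of a connection failure later on.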
- static activate_worker_sync(worker_host, worker_port, target_head, shard_id=None)
Registers a given worker to a head by sending an activate request.
- Parameters
worker_host (str) – the host address of the worker
worker_port (int) – the port of the worker
target_head (str) – address of the head to send the activate request to
shard_id (Optional[int]) – id of the shard the worker belongs to
- Returns
the response request
- async static activate_worker(worker_host, worker_port, target_head, shard_id=None)
Registers a given worker to a head by sending an activate request.
- Parameters
worker_host (str) – the host address of the worker
worker_port (int) – the port of the worker
target_head (str) – address of the head to send the activate request to
shard_id (Optional[int]) – id of the shard the worker belongs to
- Returns
the response request
- async static deactivate_worker(worker_host, worker_port, target_head, shard_id=None)
Removes a given worker from a head by sending a deactivate request.
- Parameters
worker_host (str) – the host address of the worker
worker_port (int) – the port of the worker
target_head (str) – address of the head to send the deactivate request to
shard_id (Optional[int]) – id of the shard the worker belongs to
- Returns
the response request
- static send_request_sync(request, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)
Sends a request synchronously to the target via gRPC.
- Parameters
request (Request) – the request to send
target (str) – where to send the request to, like 127.0.0.1:8080
timeout – timeout for the send
tls – if True, use TLS encryption for the gRPC channel
root_certificates (Optional[str]) – the path to the root certificates for TLS, only used if tls is True
endpoint (Optional[str]) – endpoint to target with the request
- Returns
the response request
- static send_requests_sync(requests, target, timeout=100.0, tls=False, root_certificates=None, endpoint=None)
Sends a list of requests synchronously to the target via gRPC.
- Parameters
requests (List[Request]) – the requests to send
target (str) – where to send the request to, like 127.0.0.1:8080
timeout – timeout for the send
tls – if True, use TLS for the gRPC channel
root_certificates (Optional[str]) – the path to the root certificates for TLS, only used if tls is True
endpoint (Optional[str]) – endpoint to target with the request
- Returns
the response request
- static get_default_grpc_options()
Returns a list of default options used for creating gRPC channels. The options are documented at https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h
- Returns
list of tuples defining gRPC parameters
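Channel options in gRPC are (name, value) tuples, which is the shape the return value above describes. The sketch below shows one way caller-supplied options could be layered on top of a list of defaults. The helper merge_channel_options is hypothetical, and example_defaults only uses two well-known gRPC channel argument names for illustration; it is not claimed to match what get_default_grpc_options actually returns.

```python
from typing import Any, Dict, List, Optional, Tuple

Option = Tuple[str, Any]

# Illustrative values only; -1 means "unlimited" for these gRPC arguments.
example_defaults: List[Option] = [
    ("grpc.max_send_message_length", -1),
    ("grpc.max_receive_message_length", -1),
]


def merge_channel_options(defaults: List[Option],
                          overrides: Optional[List[Option]] = None) -> List[Option]:
    """Return defaults with any same-named override applied, as a list of tuples."""
    merged: Dict[str, Any] = dict(defaults)
    # Later entries win, so caller-supplied options replace defaults by name.
    merged.update(dict(overrides or []))
    return list(merged.items())
```

The merged list keeps the order of the defaults and appends any option names that appear only in the overrides.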
- async static send_request_async(request, target, timeout=1.0, tls=False, root_certificates=None)
Sends a request asynchronously to the target via gRPC.
- Parameters
request (Request) – the request to send
target (str) – where to send the request to, like 127.0.0.1:8080
timeout (float) – timeout for the send
tls (bool) – if True, use TLS for the gRPC channel
root_certificates (Optional[str]) – the path to the root certificates for TLS, only used if tls is True
- Returns
the response request
- static create_async_channel_stub(address, tls=False, root_certificates=None, summary=None)
Creates an async gRPC channel. This channel has to be closed eventually!
- Parameters
address – the address to create the connection to, like 127.0.0.1:8080
tls – if True, use TLS for the gRPC channel
root_certificates (Optional[str]) – the path to the root certificates for TLS, only used if tls is True
summary – Optional Prometheus summary object
- Return type
Tuple[ConnectionStubs, Channel]
- Returns
DataRequest/ControlRequest stubs and an async gRPC channel