jina.serve.streamer module
- class jina.serve.streamer.GatewayStreamer(graph_representation, executor_addresses, graph_conditions={}, deployments_metadata={}, deployments_no_reduce=[], timeout_send=None, retries=0, compression=None, runtime_name='custom gateway', prefetch=0, logger=None, metrics_registry=None, meter=None, aio_tracing_client_interceptors=None, tracing_client_interceptor=None)
Bases: object
Wrapper object to be used in a Custom Gateway. Naming to be defined
- Parameters:
graph_representation (Dict) – A dictionary describing the topology of the Deployments. Two special nodes are expected, named start-gateway and end-gateway, which determine the nodes that receive the very first request and the ones whose response needs to be sent back to the client. All nodes with no outgoing nodes are considered floating, and they are “flagged” so that the user can ignore their tasks and not await them.
executor_addresses (Dict[str, Union[str, List[str]]]) – A JSON dictionary with the input addresses of each Deployment. Each Executor can have a single address or a list of addresses.
graph_conditions (Dict) – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.
deployments_metadata (Dict[str, Dict[str, str]]) – Dictionary with the metadata of each Deployment. Each Executor deployment can have a list of key-value pairs to provide information associated with the request to the deployment.
deployments_no_reduce (List[str]) – List of Executors that disable the built-in merging mechanism.
timeout_send (Optional[float]) – Timeout to apply when sending requests to Executors.
retries (int) – Number of retries to attempt in order to send requests to Executors successfully.
compression (Optional[str]) – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
runtime_name (str) – Name to be used for monitoring.
prefetch (int) – How many Requests are processed from the Client at the same time.
logger (Optional[JinaLogger]) – Optional logger that can be used for logging.
metrics_registry (Optional[CollectorRegistry]) – Optional Prometheus metrics registry, used if metrics need to be exposed.
meter (Optional[Meter]) – Optional OpenTelemetry meter that can provide instruments for collecting metrics.
aio_tracing_client_interceptors (Optional[Sequence[ClientInterceptor]]) – Optional list of aio gRPC tracing server interceptors.
tracing_client_interceptor (Optional[OpenTelemetryClientInterceptor]) – Optional gRPC tracing server interceptor.
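To make the first two arguments more concrete, the sketch below builds a small topology and lists the nodes that would count as floating under the rule described for graph_representation. The Executor names (encoder, indexer, audit), the addresses, and the helpers find_floating_nodes and build_streamer are hypothetical. The adjacency-list shape of the dictionary (node name mapped to a list of outgoing node names) is an assumption, so check it against your Jina version before relying on it.

```python
from typing import Dict, List, Union

# Hypothetical topology: each key is a node, each value lists its outgoing nodes.
# The adjacency-list shape is an assumption, not something the text above confirms.
graph_representation: Dict[str, List[str]] = {
    'start-gateway': ['encoder'],
    'encoder': ['indexer', 'audit'],
    'indexer': ['end-gateway'],
    'audit': [],  # no outgoing nodes, so it counts as floating
}

# One address, or a list of addresses, per Executor (hypothetical hosts and ports).
executor_addresses: Dict[str, Union[str, List[str]]] = {
    'encoder': 'localhost:12345',
    'indexer': ['localhost:12346', 'localhost:12347'],
    'audit': 'localhost:12348',
}


def find_floating_nodes(graph: Dict[str, List[str]]) -> List[str]:
    """Return nodes with no outgoing nodes, excluding the end-gateway marker."""
    nodes = set(graph)
    for targets in graph.values():
        nodes.update(targets)
    return sorted(
        node for node in nodes
        if node != 'end-gateway' and not graph.get(node)
    )


def build_streamer():
    """Construct the streamer. Needs jina installed, so it is not called here."""
    from jina.serve.streamer import GatewayStreamer

    return GatewayStreamer(
        graph_representation=graph_representation,
        executor_addresses=executor_addresses,
    )


print(find_floating_nodes(graph_representation))
```

Only the two positional parameters are passed in build_streamer; every other argument keeps the default shown in the signature.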
- stream(*args, **kwargs)
Stream requests from a client iterator and stream the responses back.
- Parameters:
args – positional arguments to be passed to inner RequestStreamer
kwargs – keyword arguments to be passed to inner RequestStreamer
- Returns:
An iterator over the responses from the Executors
- async stream_docs(docs, request_size=100, return_results=False, exec_endpoint=None, target_executor=None, parameters=None, results_in_order=False)
Stream Documents and stream the responses back.
- Parameters:
docs (DocumentArray) – The Documents to be sent to all the Executors.
request_size (int) – The number of Documents to be put inside a single request.
return_results (bool) – If set to True, the generator will yield Responses and not DocumentArrays.
exec_endpoint (Optional[str]) – The Executor endpoint to which to send the Documents.
target_executor (Optional[str]) – A regular expression indicating the Executors that should receive the Request.
parameters (Optional[Dict]) – Parameters to be attached to the Requests.
results_in_order (bool) – Return the results in the same order as the request iterator.
- Yield:
Yields DocumentArrays or Responses from the Executors
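Because stream_docs is an asynchronous generator, it is consumed with async for. The sketch below shows that pattern against a hypothetical StubStreamer that only mimics the documented signature, so it runs without jina or a deployed Executor. Plain lists stand in for DocumentArray, the '/index' endpoint name is made up, and the stub's batching is an illustration of what request_size means rather than the library's implementation.

```python
import asyncio
from typing import Any, AsyncIterator, List, Optional


class StubStreamer:
    """Hypothetical stand-in that mimics the documented stream_docs signature."""

    async def stream_docs(
        self,
        docs: List[Any],
        request_size: int = 100,
        return_results: bool = False,
        exec_endpoint: Optional[str] = None,
        target_executor: Optional[str] = None,
        parameters: Optional[dict] = None,
        results_in_order: bool = False,
    ) -> AsyncIterator[List[Any]]:
        # Yield one batch per simulated request of at most request_size items.
        for start in range(0, len(docs), request_size):
            yield docs[start:start + request_size]


async def collect(streamer: Any, docs: List[Any], request_size: int) -> List[List[Any]]:
    """Consume the async generator and gather every yielded batch."""
    batches = []
    async for batch in streamer.stream_docs(
        docs=docs,
        request_size=request_size,
        exec_endpoint='/index',  # hypothetical endpoint name
        results_in_order=True,
    ):
        batches.append(batch)
    return batches


batches = asyncio.run(collect(StubStreamer(), list(range(5)), request_size=2))
print(batches)
```

With a real GatewayStreamer, collect could be called the same way, since it relies only on the keyword arguments listed above.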
- async close()
Gracefully closes the object, making sure all floating requests are taken care of and the connections are closed.
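One way to make sure close() is always awaited, even when streaming fails, is to wrap the streamer in a small async context manager. This is a sketch and not part of the Jina API: the closing helper and the ClosableStub class are hypothetical, and the stub only records whether close() was called.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


class ClosableStub:
    """Hypothetical stand-in exposing only an async close() method."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def closing(streamer: Any) -> AsyncIterator[Any]:
    """Yield the streamer and always await its close() on exit."""
    try:
        yield streamer
    finally:
        await streamer.close()


async def main() -> bool:
    stub = ClosableStub()
    try:
        async with closing(stub):
            raise RuntimeError('simulated failure while streaming')
    except RuntimeError:
        pass
    # close() ran in the finally branch despite the exception.
    return stub.closed


closed_after_error = asyncio.run(main())
print(closed_after_error)
```

A hand-written helper is used here because contextlib.aclosing awaits aclose(), while the method documented above is named close().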
- Call(*args, **kwargs)
Stream requests from a client iterator and stream the responses back.
- Parameters:
args – positional arguments to be passed to inner RequestStreamer
kwargs – keyword arguments to be passed to inner RequestStreamer
- Returns:
An iterator over the responses from the Executors
- async process_single_data(request, context=None)
Implements request and response handling of a single DataRequest.
- Parameters:
request (DataRequest) – DataRequest from Client
context – grpc context
- Return type:
DataRequest
- Returns:
response DataRequest
- static get_streamer()
Return a streamer object based on the current environment context. The streamer object is constructed using runtime arguments stored in the JINA_STREAMER_ARGS environment variable. If this method is used outside a Jina context (a process not controlled or orchestrated by Jina), it will raise an error. The streamer object does not have tracing or instrumentation capabilities.
- Returns:
Returns an instance of GatewayStreamer
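Since get_streamer raises an error outside a Jina context, calling code may want to check for the environment variable first. The sketch below does that with a hypothetical helper, get_streamer_or_none. Treating the presence of JINA_STREAMER_ARGS as the signal for a Jina context is an assumption based on the description above, and a set variable does not guarantee that the call succeeds.

```python
import os
from typing import Any, Optional


def get_streamer_or_none() -> Optional[Any]:
    """Return a GatewayStreamer inside a Jina context, otherwise None."""
    # The environment variable name comes from the description above.
    if 'JINA_STREAMER_ARGS' not in os.environ:
        return None
    # Imported lazily so this module loads even where jina is not installed.
    from jina.serve.streamer import GatewayStreamer

    return GatewayStreamer.get_streamer()


streamer = get_streamer_or_none()
if streamer is None:
    print('Not running under Jina; no streamer available.')
```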