jina.serve.streamer module

class jina.serve.streamer.GatewayStreamer(graph_representation, executor_addresses, graph_conditions={}, deployments_metadata={}, deployments_no_reduce=[], timeout_send=None, retries=0, compression=None, runtime_name='custom gateway', prefetch=0, logger=None, metrics_registry=None, meter=None, aio_tracing_client_interceptors=None, tracing_client_interceptor=None)

Bases: object

Wrapper object to be used in a Custom Gateway. Naming to be defined

Parameters:
  • graph_representation (Dict) – A dictionary describing the topology of the Deployments. Two special nodes are expected, named start-gateway and end-gateway, which identify the nodes that receive the very first request and the nodes whose responses must be sent back to the client. All nodes with no outgoing edges are considered floating and are “flagged” so that the user can ignore their tasks and not await them.

  • executor_addresses (Dict[str, Union[str, List[str]]]) – JSON dictionary with the input addresses of each Deployment. Each Executor can have either a single address or a list of addresses.

  • graph_conditions (Dict) – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.

  • deployments_metadata (Dict[str, Dict[str, str]]) – Dictionary with the metadata of each Deployment. Each executor deployment can have a list of key-value pairs to provide information associated with the request to the deployment.

  • deployments_no_reduce (List[str]) – List of Executors for which the built-in merging mechanism is disabled.

  • timeout_send (Optional[float]) – Timeout to be considered when sending requests to Executors

  • retries (int) – Number of retries to attempt when sending a request to an Executor fails

  • compression (Optional[str]) – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • runtime_name (str) – Name to be used for monitoring.

  • prefetch (int) – How many Requests are processed from the Client at the same time.

  • logger (Optional[JinaLogger]) – Optional logger that can be used for logging

  • metrics_registry (Optional[CollectorRegistry]) – optional metrics registry for prometheus used if we need to expose metrics

  • meter (Optional[Meter]) – optional OpenTelemetry meter that can provide instruments for collecting metrics

  • aio_tracing_client_interceptors (Optional[Sequence[ClientInterceptor]]) – Optional list of aio grpc tracing server interceptors.

  • tracing_client_interceptor (Optional[OpenTelemetryClientInterceptor]) – Optional grpc tracing server interceptor.
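The two topology arguments can be easier to picture with a concrete value. The sketch below assumes that graph_representation is an adjacency mapping from each node name to the list of nodes it sends to; the Executor names (encoder, indexer, audit_logger) and the addresses are hypothetical. The helper functions are illustrative only and are not part of Jina.

```python
from typing import Dict, List, Union

# Names of the two special nodes described for graph_representation.
SPECIAL_NODES = ('start-gateway', 'end-gateway')

# Assumed shape: node name -> list of nodes that receive its output.
# 'audit_logger' has no outgoing nodes, so it would be treated as floating.
graph_representation: Dict[str, List[str]] = {
    'start-gateway': ['encoder'],
    'encoder': ['indexer', 'audit_logger'],
    'indexer': ['end-gateway'],
    'audit_logger': [],
}

# Hypothetical addresses: a single string or a list of strings per Executor.
executor_addresses: Dict[str, Union[str, List[str]]] = {
    'encoder': '0.0.0.0:12345',
    'indexer': ['0.0.0.0:12346', '0.0.0.0:12347'],
    'audit_logger': '0.0.0.0:12348',
}


def floating_nodes(graph: Dict[str, List[str]]) -> List[str]:
    """Return the non-special nodes that have no outgoing nodes."""
    return sorted(
        node
        for node, successors in graph.items()
        if node not in SPECIAL_NODES and not successors
    )


def executors_without_address(
    graph: Dict[str, List[str]],
    addresses: Dict[str, Union[str, List[str]]],
) -> List[str]:
    """Return Executors that appear in the graph but have no address entry."""
    executors = {node for node in graph if node not in SPECIAL_NODES}
    return sorted(executors - set(addresses))
```

A check like executors_without_address can catch a mismatch between the two dictionaries before they are passed to GatewayStreamer(graph_representation=..., executor_addresses=...).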

stream(*args, **kwargs)

stream requests from client iterator and stream responses back.

Parameters:
  • args – positional arguments to be passed to inner RequestStreamer

  • kwargs – keyword arguments to be passed to inner RequestStreamer

Returns:

An iterator over the responses from the Executors

async stream_docs(docs, request_size=100, return_results=False, exec_endpoint=None, target_executor=None, parameters=None, results_in_order=False)

stream documents and stream responses back.

Parameters:
  • docs (DocumentArray) – The Documents to be sent to all the Executors

  • request_size (int) – The amount of Documents to be put inside a single request.

  • return_results (bool) – If set to True, the generator will yield Responses and not DocumentArrays

  • exec_endpoint (Optional[str]) – The executor endpoint to which to send the Documents

  • target_executor (Optional[str]) – A regular expression indicating the Executors that should receive the Request

  • parameters (Optional[Dict]) – Parameters to be attached to the Requests

  • results_in_order (bool) – If set to True, the results are returned in the same order as the requests were sent

Yield:

Yields DocumentArrays or Responses from the Executors
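Because stream_docs is an async generator, its results are consumed with async for. The sketch below shows that consumption pattern and the effect of request_size. FakeStreamer is a hypothetical stand-in, not the real GatewayStreamer: it only mimics the batching, uses plain strings instead of Documents, and assumes one yielded result per request.

```python
import asyncio
from typing import AsyncIterator, List, Sequence


class FakeStreamer:
    """Hypothetical stand-in for GatewayStreamer, used only for illustration."""

    async def stream_docs(
        self, docs: Sequence[str], request_size: int = 100
    ) -> AsyncIterator[List[str]]:
        # Put at most `request_size` items into each simulated request.
        for start in range(0, len(docs), request_size):
            await asyncio.sleep(0)  # give control back to the event loop
            yield list(docs[start:start + request_size])


async def collect_batch_sizes(
    streamer: FakeStreamer, docs: Sequence[str], request_size: int
) -> List[int]:
    """Consume the async generator and record the size of each result."""
    sizes = []
    async for batch in streamer.stream_docs(docs, request_size=request_size):
        sizes.append(len(batch))
    return sizes


docs = [f'doc-{i}' for i in range(250)]
batch_sizes = asyncio.run(collect_batch_sizes(FakeStreamer(), docs, 100))
```

With 250 items and request_size=100, the stand-in yields three results of 100, 100 and 50 items. With a real GatewayStreamer the same async for loop would receive DocumentArrays, or Responses when return_results=True.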

async close()

Gracefully closes the object, making sure all floating requests are taken care of and the connections are closed.

Call(*args, **kwargs)

stream requests from client iterator and stream responses back.

Parameters:
  • args – positional arguments to be passed to inner RequestStreamer

  • kwargs – keyword arguments to be passed to inner RequestStreamer

Returns:

An iterator over the responses from the Executors

async process_single_data(request, context=None)

Implements request and response handling of a single DataRequest.

Parameters:
  • request (DataRequest) – DataRequest from Client

  • context – grpc context

Return type:

DataRequest

Returns:

response DataRequest

static get_streamer()

Return a streamer object based on the current environment context. The streamer object is constructed using runtime arguments stored in the JINA_STREAMER_ARGS environment variable. If this method is used outside a Jina context (a process not controlled/orchestrated by jina), it will raise an error. The streamer object does not have tracing/instrumentation capabilities.

Returns:

Returns an instance of GatewayStreamer
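Since get_streamer depends on the JINA_STREAMER_ARGS environment variable, a caller may want to check for it before calling the method. The following is a minimal sketch of such a guard; the helper name can_use_get_streamer is hypothetical, and the check only tests that the variable is present, since the format of its value is not described here.

```python
import os
from typing import Mapping

# Environment variable named in the get_streamer description.
STREAMER_ARGS_ENV = 'JINA_STREAMER_ARGS'


def can_use_get_streamer(environ: Mapping[str, str] = os.environ) -> bool:
    """Return True if the variable that get_streamer reads is present."""
    return STREAMER_ARGS_ENV in environ


# Inside a process orchestrated by Jina, one would then call:
#     from jina.serve.streamer import GatewayStreamer
#     if can_use_get_streamer():
#         streamer = GatewayStreamer.get_streamer()
```

Passing the environment as a parameter keeps the guard easy to test without modifying the real process environment.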