Source code for jina.serve.runtimes.monitoring

import asyncio
import copy
import time
from typing import TYPE_CHECKING, Optional

from jina.importer import ImportExtensions
from jina.proto import jina_pb2

if TYPE_CHECKING:  # pragma: no cover
    from opentelemetry.metrics import Meter
    from prometheus_client import CollectorRegistry

    from jina.types.request import Request


class MonitoringMixin:
    """Mixin that gives a pod an optional Prometheus metrics endpoint."""

    def _setup_monitoring(self):
        """
        Create the metrics registry and start the monitoring HTTP server.

        When ``self.args.monitoring`` is falsy, ``self.metrics_registry`` is set
        to ``None`` and no server is started. Otherwise a fresh
        ``CollectorRegistry`` is stored on ``self.metrics_registry`` and exposed
        on the port given by ``self.args.port_monitoring``.
        """
        monitoring_enabled = self.args.monitoring

        # Guard clause: nothing to expose when monitoring is switched off.
        if not monitoring_enabled:
            self.metrics_registry = None
            return

        # Imported lazily so `prometheus_client` is only needed when monitoring is on.
        from prometheus_client import CollectorRegistry

        self.metrics_registry = CollectorRegistry()

        from prometheus_client import start_http_server

        monitoring_port = int(self.args.port_monitoring)
        start_http_server(monitoring_port, registry=self.metrics_registry)


class MonitoringRequestMixin:
    """
    Mixin for the request handling monitoring

    :param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor or from the data request handler
    :param meter: optional OpenTelemetry meter; when given, the same request metrics are also recorded through OpenTelemetry instruments
    :param runtime_name: optional runtime_name that will be registered during monitoring
    """

    def __init__(
        self,
        metrics_registry: Optional['CollectorRegistry'] = None,
        meter: Optional['Meter'] = None,
        runtime_name: Optional[str] = None,
    ):
        # Start timestamps of in-flight requests, keyed by request id. One dict per
        # backend; `None` when that backend is disabled.
        self._request_init_time = {} if metrics_registry else None
        self._meter_request_init_time = {} if meter else None

        if metrics_registry:
            with ImportExtensions(
                required=True,
                help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
            ):
                from prometheus_client import Counter, Gauge, Summary

                from jina.serve.monitoring import _SummaryDeprecated

            self._receiving_request_metrics = Summary(
                'receiving_request_seconds',
                'Time spent processing successful request',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)

            self._pending_requests_metrics = Gauge(
                'number_of_pending_requests',
                'Number of pending requests',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)

            self._failed_requests_metrics = Counter(
                'failed_requests',
                'Number of failed requests',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)

            self._successful_requests_metrics = Counter(
                'successful_requests',
                'Number of successful requests',
                registry=metrics_registry,
                namespace='jina',
                labelnames=('runtime_name',),
            ).labels(runtime_name)

            # NOTE(review): this description looks copy-pasted from
            # `sent_response_bytes`; the metric is observed with the size of the
            # *received* request. Left unchanged because it is emitted at runtime.
            self._request_size_metrics = _SummaryDeprecated(
                old_name='request_size_bytes',
                name='received_request_bytes',
                documentation='The size in bytes of the request returned to the client',
                namespace='jina',
                labelnames=('runtime_name',),
                registry=metrics_registry,
            ).labels(runtime_name)

            self._sent_response_bytes = Summary(
                'sent_response_bytes',
                'The size in bytes of the request returned to the client',
                namespace='jina',
                labelnames=('runtime_name',),
                registry=metrics_registry,
            ).labels(runtime_name)
        else:
            self._receiving_request_metrics = None
            self._pending_requests_metrics = None
            self._failed_requests_metrics = None
            self._successful_requests_metrics = None
            self._request_size_metrics = None
            self._sent_response_bytes = None

        if meter:
            self._receiving_request_histogram = meter.create_histogram(
                name='jina_receiving_request_seconds',
                description='Time spent processing successful request',
            )

            self._pending_requests_up_down_counter = meter.create_up_down_counter(
                name='jina_number_of_pending_requests',
                description='Number of pending requests',
            )

            self._failed_requests_counter = meter.create_counter(
                name='jina_failed_requests',
                description='Number of failed requests',
            )

            self._successful_requests_counter = meter.create_counter(
                name='jina_successful_requests',
                description='Number of successful requests',
            )

            self._request_size_histogram = meter.create_histogram(
                name='jina_received_request_bytes',
                description='The size in bytes of the request returned to the client',
            )

            self._sent_response_bytes_histogram = meter.create_histogram(
                name='jina_sent_response_bytes',
                description='The size in bytes of the request returned to the client',
            )
        else:
            self._receiving_request_histogram = None
            self._pending_requests_up_down_counter = None
            self._failed_requests_counter = None
            self._successful_requests_counter = None
            self._request_size_histogram = None
            self._sent_response_bytes_histogram = None

        # Attributes attached to every OpenTelemetry measurement.
        self._metric_labels = {'runtime_name': runtime_name}

    def _update_start_request_metrics(self, request: 'Request'):
        """
        Record the arrival of a request: its size, its start time and one more pending request.

        :param request: the incoming request; its ``nbytes`` and ``request_id`` are read
        """
        if self._request_size_metrics:
            self._request_size_metrics.observe(request.nbytes)
        if self._request_size_histogram:
            self._request_size_histogram.record(
                request.nbytes, attributes=self._metric_labels
            )

        # A monotonic clock is used because these timestamps are only ever
        # subtracted from each other; wall-clock adjustments must not skew latency.
        if self._receiving_request_metrics:
            self._request_init_time[request.request_id] = time.perf_counter()
        if self._receiving_request_histogram:
            self._meter_request_init_time[request.request_id] = time.perf_counter()

        if self._pending_requests_metrics:
            self._pending_requests_metrics.inc()
        if self._pending_requests_up_down_counter:
            self._pending_requests_up_down_counter.add(
                1, attributes=self._metric_labels
            )

    def _update_end_successful_requests_metrics(self, result: 'Request'):
        """
        Record the successful completion of a request.

        The processing time is only observed when a start timestamp was recorded
        for ``result.request_id``; a missing timestamp is skipped instead of raising.

        :param result: the response; its ``request_id`` and ``nbytes`` are read
        """
        if (
            self._receiving_request_metrics
        ):  # this one should only be observed when the request is successful
            init_time = self._request_init_time.pop(
                result.request_id, None
            )  # need to pop otherwise it stays in memory forever
            if init_time is not None:
                self._receiving_request_metrics.observe(
                    time.perf_counter() - init_time
                )

        if (
            self._receiving_request_histogram
        ):  # this one should only be observed when the request is successful
            init_time = self._meter_request_init_time.pop(
                result.request_id, None
            )  # need to pop otherwise it stays in memory forever
            if init_time is not None:
                self._receiving_request_histogram.record(
                    time.perf_counter() - init_time, attributes=self._metric_labels
                )

        if self._pending_requests_metrics:
            self._pending_requests_metrics.dec()
        if self._pending_requests_up_down_counter:
            self._pending_requests_up_down_counter.add(
                -1, attributes=self._metric_labels
            )

        if self._successful_requests_metrics:
            self._successful_requests_metrics.inc()
        if self._successful_requests_counter:
            self._successful_requests_counter.add(1, attributes=self._metric_labels)

        if self._sent_response_bytes:
            self._sent_response_bytes.observe(result.nbytes)
        if self._sent_response_bytes_histogram:
            self._sent_response_bytes_histogram.record(
                result.nbytes, attributes=self._metric_labels
            )

    def _update_end_failed_requests_metrics(self, result: Optional['Request'] = None):
        """
        Record the failed completion of a request.

        :param result: optional failed request/response. When given, the start
            timestamp stored for its ``request_id`` is discarded so that failed
            requests do not accumulate in memory. No processing time is observed
            for failures.
        """
        if result is not None:
            # Drop the start timestamps without observing a duration.
            if self._request_init_time is not None:
                self._request_init_time.pop(result.request_id, None)
            if self._meter_request_init_time is not None:
                self._meter_request_init_time.pop(result.request_id, None)

        if self._pending_requests_metrics:
            self._pending_requests_metrics.dec()
        if self._pending_requests_up_down_counter:
            self._pending_requests_up_down_counter.add(
                -1, attributes=self._metric_labels
            )

        if self._failed_requests_metrics:
            self._failed_requests_metrics.inc()
        if self._failed_requests_counter:
            self._failed_requests_counter.add(1, attributes=self._metric_labels)

    def _update_end_request_metrics(self, result: 'Request'):
        """
        Dispatch to the success or failure bookkeeping depending on the status code of ``result``.

        :param result: the response whose ``status.code`` decides which metrics are updated
        """
        if result.status.code != jina_pb2.StatusProto.ERROR:
            self._update_end_successful_requests_metrics(result)
        else:
            # Pass the result along so its start timestamp is released.
            self._update_end_failed_requests_metrics(result)