Source code for jina.clients.mixin

import time
import warnings
from functools import partialmethod
from typing import TYPE_CHECKING, AsyncGenerator, Dict, List, Optional, Union

from jina.helper import get_or_reuse_loop, run_async
from jina.importer import ImportExtensions

if TYPE_CHECKING:
    from docarray import DocumentArray

    from jina.clients.base import CallbackFnType, InputType
    from jina.types.request.data import Response


def _include_results_field_in_param(parameters: Optional['Dict']) -> 'Dict':
    """Make sure the reserved ``__results__`` entry of ``parameters`` is usable.

    Behaviour, by case:

    * ``parameters`` is ``None`` or empty: a new dict ``{'__results__': {}}``
      is returned.
    * ``parameters`` holds ``__results__`` with a non-dict value: a warning is
      emitted and the value is replaced by an empty dict, in place.
    * anything else: ``parameters`` is returned untouched (the key is NOT
      added when it is absent from a non-empty dict).

    :param parameters: the user supplied request parameters, may be ``None``.
    :return: the parameters dict to send with the request.
    """
    key_result = '__results__'

    if not parameters:
        return {key_result: {}}

    if key_result in parameters and not isinstance(parameters[key_result], dict):
        # The two literals are concatenated by the parser, so the separating
        # space after the full stop has to be written explicitly.
        warnings.warn(
            f'It looks like you passed a dictionary with the key `{key_result}` to `parameters`. '
            'This key is reserved, so the associated value will be deleted.'
        )
        parameters[key_result] = {}

    return parameters


class MutateMixin:
    """Mixin that gives Client and Flow a GraphQL ``mutate`` API."""

    def mutate(
        self,
        mutation: str,
        variables: Optional[dict] = None,
        timeout: Optional[float] = None,
        headers: Optional[dict] = None,
    ):
        """Run a GraphQL mutation against ``<proto>://<host>:<port>/graphql``.

        The protocol is ``https`` when ``self.args.tls`` is truthy and ``http``
        otherwise; host and port are read from ``self.args``.

        :param mutation: the GraphQL mutation as a single string.
        :param variables: values substituted into the mutation; can be omitted
            when the mutation string has no variables.
        :param timeout: HTTP request timeout
        :param headers: HTTP headers
        :return: dict with the optional keys ``data`` and ``errors``.
        :raises ConnectionError: when the response carries a non-empty
            ``errors`` entry; the message lists every error message.
        """
        with ImportExtensions(required=True):
            from sgqlc.endpoint.http import HTTPEndpoint as SgqlcHTTPEndpoint

        scheme = 'https' if self.args.tls else 'http'
        endpoint = SgqlcHTTPEndpoint(
            f'{scheme}://{self.args.host}:{self.args.port}/graphql'
        )
        response = endpoint(
            mutation, variables=variables, timeout=timeout, extra_headers=headers
        )

        errors = response['errors'] if 'errors' in response else None
        if not errors:
            return response

        # Every message is followed by '. ' (including the last one).
        details = ''.join(err['message'] + '. ' for err in errors)
        raise ConnectionError(
            'GraphQL mutation returned the following errors: ' + details
        )
class AsyncMutateMixin(MutateMixin):
    """Async flavour of the GraphQL mutation mixin for Client and Flow."""

    async def mutate(
        self,
        mutation: str,
        variables: Optional[dict] = None,
        timeout: Optional[float] = None,
        headers: Optional[dict] = None,
    ):
        """Run a GraphQL mutation without blocking the event loop.

        The synchronous ``MutateMixin.mutate`` is handed to the loop's default
        executor (``run_in_executor(None, ...)``) and its result is awaited.

        :param mutation: the GraphQL mutation as a single string.
        :param variables: values substituted into the mutation; can be omitted
            when the mutation string has no variables.
        :param timeout: HTTP request timeout
        :param headers: HTTP headers
        :return: dict with the optional keys ``data`` and ``errors``.
        """
        loop = get_or_reuse_loop()
        blocking_mutate = super().mutate
        # run_in_executor only forwards positional arguments.
        return await loop.run_in_executor(
            None, blocking_mutate, mutation, variables, timeout, headers
        )
class HealthCheckMixin:
    """Mixin exposing the `dry_run` health check API on Client and Flow."""

    def dry_run(self, **kwargs) -> bool:
        """Send a dry run to the Flow to check that it can receive requests.

        The client's ``_dry_run`` callable is executed through ``run_async``.

        :param kwargs: keyword arguments forwarded from the public interface
        :return: boolean indicating the health/readiness of the Flow
        """
        probe = self.client._dry_run
        return run_async(probe, **kwargs)
class AsyncHealthCheckMixin:
    """Async mixin exposing the `dry_run` health check API on Client and Flow."""

    async def dry_run(self, **kwargs) -> bool:
        """Send a dry run to the Flow to check that it can receive requests.

        Awaits the client's ``_dry_run`` and hands back whatever it returns.

        :param kwargs: keyword arguments forwarded from the public interface
        :return: boolean indicating the health/readiness of the Flow
        """
        is_ready = await self.client._dry_run(**kwargs)
        return is_ready
def _render_response_table(r, st, ed, show_table: bool = True):
    """Build (and optionally print) a latency breakdown for one response.

    All figures are in milliseconds. ``r.routes[0]`` is treated as the gateway
    hop and every following route as an executor hop keyed by its ``executor``
    attribute; each hop's duration is ``end_time - start_time``.

    :param r: response object exposing ``routes``; each route needs
        ``start_time`` / ``end_time`` with a ``ToMilliseconds()`` method and,
        for executor hops, an ``executor`` name.
    :param st: client side start timestamp, in seconds.
    :param ed: client side end timestamp, in seconds.
    :param show_table: when true, print the breakdown as a ``rich`` tree.
        ``rich`` is only imported in that case.
    :return: dict with the keys ``Roundtrip``, ``Client-server network``,
        ``Server``, ``Gateway-executors network`` plus one entry per executor.
    """

    def span_ms(hop):
        return hop.end_time.ToMilliseconds() - hop.start_time.ToMilliseconds()

    elapsed = (ed - st) * 1000  # seconds -> milliseconds
    routes = r.routes
    gateway_time = span_ms(routes[0])
    exec_time = {hop.executor: span_ms(hop) for hop in routes[1:]}

    network_time = elapsed - gateway_time
    server_network = gateway_time - sum(exec_time.values())

    report = {
        'Roundtrip': elapsed,
        'Client-server network': network_time,
        'Server': gateway_time,
        'Gateway-executors network': server_network,
        **exec_time,
    }
    if not show_table:
        return report

    # Imported lazily and under an alias so the builtin ``print`` is not
    # shadowed and callers that only want the numbers do not need ``rich``.
    from rich import print as rich_print
    from rich.table import Table
    from rich.tree import Tree

    def share(part, whole):
        # Timestamps have millisecond resolution, so a span can be exactly 0;
        # report 0% instead of raising ZeroDivisionError.
        return part / whole if whole else 0.0

    def make_table(_title, _time, _percent):
        table = Table(show_header=False, box=None)
        table.add_row(
            _title, f'[b]{_time:.0f}[/b]ms', f'[dim]{_percent * 100:.0f}%[/dim]'
        )
        return table

    tree = Tree(make_table('Roundtrip', elapsed, 1))
    tree.add(
        make_table('Client-server network', network_time, share(network_time, elapsed))
    )
    server_node = tree.add(
        make_table('Server', gateway_time, share(gateway_time, elapsed))
    )
    server_node.add(
        make_table(
            'Gateway-executors network',
            server_network,
            share(server_network, gateway_time),
        )
    )
    for _name, _time in exec_time.items():
        server_node.add(make_table(_name, _time, share(_time, gateway_time)))

    rich_print(tree)
    return report
class ProfileMixin:
    """Mixin exposing the `profiling` API on Client and Flow."""

    def profiling(self, show_table: bool = True) -> Dict[str, float]:
        """Measure the roundtrip latency of a single request to ``/``.

        The request is timed with ``time.perf_counter`` and the first response
        is summarised by ``_render_response_table``.

        :param show_table: whether to show the table or not.
        :return: the latency report in a dict.
        """
        from jina import Document

        started = time.perf_counter()
        # NOTE(review): the Document class itself (not an instance) is passed
        # as the input — confirm this is intended.
        responses = self.client.post('/', Document, return_responses=True)
        finished = time.perf_counter()

        first_response = responses[0]
        return _render_response_table(
            first_response, started, finished, show_table=show_table
        )
class AsyncProfileMixin:
    """Async mixin exposing the `profiling` API on Client and Flow."""

    async def profiling(self, show_table: bool = True) -> Dict[str, float]:
        """Measure the roundtrip latency of a single request to ``/``.

        Timing stops as soon as the first response arrives; that response is
        summarised by ``_render_response_table`` and the method returns. If
        the stream yields nothing, the method falls through and returns
        ``None``.

        :param show_table: whether to show the table or not.
        :return: the latency report in a dict.
        """
        from jina import Document

        started = time.perf_counter()
        # NOTE(review): the Document class itself (not an instance) is passed
        # as the input — confirm this is intended.
        stream = self.client.post('/', Document, return_responses=True)
        async for response in stream:
            finished = time.perf_counter()
            return _render_response_table(
                response, started, finished, show_table=show_table
            )
class PostMixin:
    """Mixin providing the blocking `post` API (and CRUD shortcuts) for Client and Flow."""

    def post(
        self,
        on: str,
        inputs: Optional['InputType'] = None,
        on_done: Optional['CallbackFnType'] = None,
        on_error: Optional['CallbackFnType'] = None,
        on_always: Optional['CallbackFnType'] = None,
        parameters: Optional[Dict] = None,
        target_executor: Optional[str] = None,
        request_size: int = 100,
        show_progress: bool = False,
        continue_on_error: bool = False,
        return_responses: bool = False,
        **kwargs,
    ) -> Optional[Union['DocumentArray', List['Response']]]:
        """Post a general data request to the Flow and wait for completion.

        Results are only collected and returned when neither ``on_done`` nor
        ``on_always`` is given; otherwise the responses are left to the
        callbacks and ``None`` is returned.

        :param on: the endpoint which is invoked. All the functions in the executors decorated by `@requests(on=...)` with the same endpoint are invoked.
        :param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document.
        :param on_done: the function to be called when the :class:`Request` object is resolved.
        :param on_error: the function to be called when the :class:`Request` object is rejected.
        :param on_always: the function to be called when the :class:`Request` object is either resolved or rejected.
        :param parameters: the kwargs that will be sent to the executor
        :param target_executor: a regex string. Only matching Executors will process the request.
        :param request_size: the number of Documents per request. <=0 means all inputs in one request.
        :param show_progress: if set, client will show a progress bar on receiving every request.
        :param continue_on_error: if set, a Request that causes callback error will be logged only without blocking the further requests.
        :param return_responses: if set to True, the result will come as a list of Response and not as a `DocumentArray`
        :param kwargs: additional parameters
        :return: None, a DocumentArray containing all response Documents, or a list of Responses

        .. warning::
            ``target_executor`` uses ``re.match`` for checking if the pattern is matched.
            ``target_executor=='foo'`` will match both deployments with the name ``foo`` and ``foo_what_ever_suffix``.
        """
        client = self.client
        client.show_progress = show_progress
        client.continue_on_error = continue_on_error

        parameters = _include_results_field_in_param(parameters)

        from jina import DocumentArray

        # With a result-consuming callback the caller handles responses itself.
        collect = on_done is None and on_always is None

        async def _collect(*call_args, **call_kwargs):
            gathered = [] if return_responses else DocumentArray()
            async for resp in client._get_results(*call_args, **call_kwargs):
                if not collect:
                    continue
                if return_responses:
                    gathered.append(resp)
                else:
                    gathered.extend(resp.data.docs)
            return gathered if collect else None

        return run_async(
            _collect,
            inputs=inputs,
            on_done=on_done,
            on_error=on_error,
            on_always=on_always,
            exec_endpoint=on,
            target_executor=target_executor,
            parameters=parameters,
            request_size=request_size,
            **kwargs,
        )

    # CRUD shortcuts only; any other endpoint goes through `.post`
    index = partialmethod(post, '/index')
    search = partialmethod(post, '/search')
    update = partialmethod(post, '/update')
    delete = partialmethod(post, '/delete')
class AsyncPostMixin:
    """Mixin providing the async `post` API (and CRUD shortcuts) for AsyncClient and AsyncFlow."""

    async def post(
        self,
        on: str,
        inputs: Optional['InputType'] = None,
        on_done: Optional['CallbackFnType'] = None,
        on_error: Optional['CallbackFnType'] = None,
        on_always: Optional['CallbackFnType'] = None,
        parameters: Optional[Dict] = None,
        target_executor: Optional[str] = None,
        request_size: int = 100,
        show_progress: bool = False,
        continue_on_error: bool = False,
        return_responses: bool = False,
        **kwargs,
    ) -> AsyncGenerator[Union['DocumentArray', 'Response'], None]:
        """Async Post a general data request to the Flow.

        This is an async generator: one item is yielded per response received
        from the client.

        :param on: the endpoint which is invoked. All the functions in the executors decorated by `@requests(on=...)` with the same endpoint are invoked.
        :param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document.
        :param on_done: the function to be called when the :class:`Request` object is resolved.
        :param on_error: the function to be called when the :class:`Request` object is rejected.
        :param on_always: the function to be called when the :class:`Request` object is either resolved or rejected.
        :param parameters: the kwargs that will be sent to the executor
        :param target_executor: a regex string. Only matching Executors will process the request.
        :param request_size: the number of Documents per request. <=0 means all inputs in one request.
        :param show_progress: if set, client will show a progress bar on receiving every request.
        :param continue_on_error: if set, a Request that causes callback error will be logged only without blocking the further requests.
        :param return_responses: if set to True, the result will come as Response and not as a `DocumentArray`
        :param kwargs: additional parameters, can be used to pass metadata or authentication information in the server call
        :yield: the Response object when ``return_responses`` is set, otherwise the response's ``data.docs``

        .. warning::
            ``target_executor`` uses ``re.match`` for checking if the pattern is matched.
            ``target_executor=='foo'`` will match both deployments with the name ``foo`` and ``foo_what_ever_suffix``.
        """
        client = self.client
        client.show_progress = show_progress
        client.continue_on_error = continue_on_error

        parameters = _include_results_field_in_param(parameters)

        async for response in client._get_results(
            inputs=inputs,
            on_done=on_done,
            on_error=on_error,
            on_always=on_always,
            exec_endpoint=on,
            target_executor=target_executor,
            parameters=parameters,
            request_size=request_size,
            **kwargs,
        ):
            yield response if return_responses else response.data.docs

    # CRUD shortcuts only; any other endpoint goes through `.post`
    index = partialmethod(post, '/index')
    search = partialmethod(post, '/search')
    update = partialmethod(post, '/update')
    delete = partialmethod(post, '/delete')