"""Module containing the Base Client for Jina."""
import abc
import argparse
import inspect
import os
from abc import ABC
from typing import TYPE_CHECKING, AsyncIterator, Callable, Iterator, Optional, Union
from jina.excepts import BadClientInput
from jina.helper import T, parse_client, send_telemetry_event, typename
from jina.logging.logger import JinaLogger
from jina.logging.predefined import default_logger
if TYPE_CHECKING:
from jina.clients.request import GeneratorSourceType
from jina.types.request import Request, Response
InputType = Union[GeneratorSourceType, Callable[..., GeneratorSourceType]]
CallbackFnType = Optional[Callable[[Response], None]]
[docs]class BaseClient(ABC):
"""A base client for connecting to the Flow Gateway.
:param args: the Namespace from argparse
:param kwargs: additional parameters that can be accepted by client parser
"""
def __init__(
self,
args: Optional['argparse.Namespace'] = None,
**kwargs,
):
if args and isinstance(args, argparse.Namespace):
self.args = args
else:
self.args = parse_client(kwargs)
self.logger = JinaLogger(self.__class__.__name__, **vars(self.args))
if not self.args.proxy and os.name != 'nt':
# (Han 2020 12.12): gRPC channel is over HTTP2 and it does not work when we have proxy
# as many enterprise users are behind proxy, a quick way to
# surpass it is by temporally unset proxy. Please do NOT panic as it will NOT
# affect users os-level envs.
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')
self._inputs = None
send_telemetry_event(event='start', obj=self)
def _get_requests(
self, **kwargs
) -> Union[Iterator['Request'], AsyncIterator['Request']]:
"""
Get request in generator.
:param kwargs: Keyword arguments.
:return: Iterator of request.
"""
_kwargs = vars(self.args)
_kwargs['data'] = self.inputs
# override by the caller-specific kwargs
_kwargs.update(kwargs)
if hasattr(self._inputs, '__len__'):
total_docs = len(self._inputs)
elif 'total_docs' in _kwargs:
total_docs = _kwargs['total_docs']
else:
total_docs = None
self._inputs_length = None
if total_docs:
self._inputs_length = max(1, total_docs / _kwargs['request_size'])
if inspect.isasyncgen(self.inputs):
from jina.clients.request.asyncio import request_generator
return request_generator(**_kwargs)
else:
from jina.clients.request import request_generator
return request_generator(**_kwargs)
@property
def inputs(self) -> 'InputType':
"""
An iterator of bytes, each element represents a Document's raw content.
``inputs`` defined in the protobuf
:return: inputs
"""
return self._inputs
@inputs.setter
def inputs(self, bytes_gen: 'InputType') -> None:
"""
Set the input data.
:param bytes_gen: input type
"""
if hasattr(bytes_gen, '__call__'):
self._inputs = bytes_gen()
else:
self._inputs = bytes_gen
@abc.abstractmethod
async def _get_results(
self,
inputs: 'InputType',
on_done: 'CallbackFnType',
on_error: Optional['CallbackFnType'] = None,
on_always: Optional['CallbackFnType'] = None,
**kwargs,
):
...
@abc.abstractmethod
def _dry_run(self, **kwargs) -> bool:
"""Sends a dry run to the Flow to validate if the Flow is ready to receive requests
:param kwargs: potential kwargs received passed from the public interface
"""
...
@property
def client(self: T) -> T:
"""Return the client object itself
:return: the Client object
"""
return self