"""Module containing the Base Client for Jina."""importabcimportargparseimportinspectimportosfromabcimportABCfromtypingimportTYPE_CHECKING,AsyncIterator,Callable,Iterator,Optional,Unionfromjina.exceptsimportBadClientInputfromjina.helperimportT,parse_client,typenamefromjina.logging.loggerimportJinaLoggerfromjina.logging.predefinedimportdefault_loggerifTYPE_CHECKING:fromjina.clients.requestimportGeneratorSourceTypefromjina.types.requestimportRequest,ResponseInputType=Union[GeneratorSourceType,Callable[...,GeneratorSourceType]]CallbackFnType=Optional[Callable[[Response],None]]
class BaseClient(ABC):
    """A base client for connecting to the Flow Gateway.

    Concrete clients subclass this and implement :meth:`_get_results`.

    :param args: the Namespace from argparse
    :param kwargs: additional parameters that can be accepted by client parser;
        only used when ``args`` is not an ``argparse.Namespace``
    """

    def __init__(
        self,
        args: Optional['argparse.Namespace'] = None,
        **kwargs,
    ):
        if args and isinstance(args, argparse.Namespace):
            self.args = args
        else:
            # no usable Namespace given: build one from the keyword arguments
            self.args = parse_client(kwargs)
        self.logger = JinaLogger(self.__class__.__name__, **vars(self.args))

        if not self.args.proxy and os.name != 'nt':
            # (Han 2020 12.12): gRPC channel is over HTTP2 and it does not work
            # when we have a proxy. As many enterprise users are behind a proxy,
            # a quick way to bypass it is to temporarily unset the proxy
            # variables. ``os.unsetenv`` only touches the environment of this
            # process, so the user's OS-level settings are NOT affected.
            os.unsetenv('http_proxy')
            os.unsetenv('https_proxy')

        self._inputs = None
        # defined up-front so the attribute exists before `_get_requests` runs
        self._inputs_length = None

    @staticmethod
    def check_input(inputs: Optional['InputType'] = None, **kwargs) -> None:
        """Validate the inputs by building the first request from them.

        ``None`` is accepted as valid. A callable is invoked first and its
        return value is validated instead.

        :param inputs: the inputs
        :param kwargs: keyword arguments forwarded to ``request_generator``;
            ``data`` and ``exec_endpoint`` are overwritten
        :raises BadClientInput: if ``inputs`` is an async generator (checking
            those is not implemented) or if the first request cannot be
            generated from it
        """
        if inputs is None:
            # empty inputs is considered as valid
            return

        if callable(inputs):
            # it is a function
            inputs = inputs()

        kwargs['data'] = inputs
        kwargs['exec_endpoint'] = '/'

        if inspect.isasyncgenfunction(inputs) or inspect.isasyncgen(inputs):
            raise BadClientInput(
                'checking the validity of an async generator is not implemented yet'
            )

        try:
            from jina.clients.request import request_generator

            r = next(request_generator(**kwargs))
            from jina.types.request import Request

            if not isinstance(r, Request):
                raise TypeError(f'{typename(r)} is not a valid Request')
        except Exception as ex:
            # any failure (including an exhausted generator) means bad input
            default_logger.error('inputs is not valid!')
            raise BadClientInput from ex

    def _get_requests(
        self, **kwargs
    ) -> Union[Iterator['Request'], AsyncIterator['Request']]:
        """
        Get request in generator.

        Also sets ``self._inputs_length`` to ``max(1, total_docs / request_size)``
        when the number of documents is known and non-zero, otherwise to ``None``.

        :param kwargs: Keyword arguments; they override the values taken from
            ``self.args``.
        :return: Iterator of request.
        """
        # `vars()` returns the Namespace's live `__dict__`; work on a copy so
        # that `data` and the per-call overrides do not leak into `self.args`
        _kwargs = dict(vars(self.args))
        _kwargs['data'] = self.inputs
        # override by the caller-specific kwargs
        _kwargs.update(kwargs)

        if hasattr(self._inputs, '__len__'):
            total_docs = len(self._inputs)
        elif 'total_docs' in _kwargs:
            total_docs = _kwargs['total_docs']
        else:
            total_docs = None

        self._inputs_length = None

        if total_docs:
            self._inputs_length = max(1, total_docs / _kwargs['request_size'])

        # async generators need the asyncio flavour of the request generator
        if inspect.isasyncgen(self.inputs):
            from jina.clients.request.asyncio import request_generator

            return request_generator(**_kwargs)
        else:
            from jina.clients.request import request_generator

            return request_generator(**_kwargs)

    @property
    def inputs(self) -> 'InputType':
        """
        An iterator of bytes, each element represents a Document's raw content.

        ``inputs`` defined in the protobuf

        :return: inputs
        """
        return self._inputs

    @inputs.setter
    def inputs(self, bytes_gen: 'InputType') -> None:
        """
        Set the input data.

        A callable is invoked immediately and its return value is stored;
        anything else is stored as-is.

        :param bytes_gen: input type
        """
        if callable(bytes_gen):
            self._inputs = bytes_gen()
        else:
            self._inputs = bytes_gen

    @abc.abstractmethod
    async def _get_results(
        self,
        inputs: 'InputType',
        on_done: 'CallbackFnType',
        on_error: Optional['CallbackFnType'] = None,
        on_always: Optional['CallbackFnType'] = None,
        **kwargs,
    ):
        """Abstract coroutine to be implemented by concrete clients.

        :param inputs: the inputs
        :param on_done: callback; presumably invoked on a successful response
            (defined by the implementing subclass)
        :param on_error: optional callback; presumably invoked on a failed
            response (defined by the implementing subclass)
        :param on_always: optional callback; presumably invoked on every
            response (defined by the implementing subclass)
        :param kwargs: additional keyword arguments
        """
        ...

    @property
    def client(self: T) -> T:
        """Return the client object itself

        :return: the Client object
        """
        return self