"""Module for async requests generator."""fromtypingimportAsyncIterator,Optional,Dict,TYPE_CHECKINGfromjina.clients.request.helperimport_new_data_request_from_batch,_new_data_requestfromjina.enumsimportDataInputTypefromjina.importerimportImportExtensionsfromjina.logging.predefinedimportdefault_loggerfromjina.types.requestimportRequestifTYPE_CHECKING:fromjina.clients.requestimportGeneratorSourceType
[docs]asyncdefrequest_generator(exec_endpoint:str,data:'GeneratorSourceType',request_size:int=0,data_type:DataInputType=DataInputType.AUTO,target_executor:Optional[str]=None,parameters:Optional[Dict]=None,**kwargs,# do not remove this, add on purpose to suppress unknown kwargs)->AsyncIterator['Request']:"""An async :function:`request_generator`. :param exec_endpoint: the endpoint string, by convention starts with `/` :param data: the data to use in the request :param request_size: the number of Documents per request :param data_type: if ``data`` is an iterator over self-contained document, i.e. :class:`DocumentSourceType`; or an iterator over possible Document content (set to text, blob and buffer). :param parameters: the kwargs that will be sent to the executor :param target_executor: a regex string. Only matching Executors will process the request. :param kwargs: additional arguments :yield: request """_kwargs=dict(extra_kwargs=kwargs)try:ifdataisNone:# this allows empty inputs, i.e. a data request with only parametersyield_new_data_request(endpoint=exec_endpoint,target=target_executor,parameters=parameters)else:withImportExtensions(required=True):importaiostreamasyncforbatchinaiostream.stream.chunks(data,request_size):yield_new_data_request_from_batch(_kwargs=kwargs,batch=batch,data_type=data_type,endpoint=exec_endpoint,target=target_executor,parameters=parameters,)exceptExceptionasex:# must be handled here, as grpc channel wont handle Python exceptiondefault_logger.critical(f'inputs is not valid! {ex!r}',exc_info=True)