Source code for jina.clients.request

"""Module for Jina Requests."""

from typing import (
    Iterator,
    Union,
    Tuple,
    AsyncIterable,
    Iterable,
    Optional,
    Dict,
    TYPE_CHECKING,
)

from jina.clients.request.helper import _new_data_request_from_batch, _new_data_request
from jina.enums import DataInputType
from jina.helper import batch_iterator
from jina.logging.predefined import default_logger

if TYPE_CHECKING:
    # Everything in this branch is visible to static type checkers only;
    # it is skipped at runtime, so these imports are never executed.
    from docarray.document import DocumentSourceType
    from docarray.document.mixins.content import DocumentContentType

    from jina import Document
    from jina.types.request import Request

    # Two-element tuple shapes accepted as a single input item.
    _ContentPair = Tuple[DocumentContentType, DocumentContentType]
    _SourcePair = Tuple[DocumentSourceType, DocumentSourceType]

    # One input item: a document content, a document source, a ``Document``,
    # or a two-element tuple of contents / of sources.
    SingletonDataType = Union[
        DocumentContentType, DocumentSourceType, Document, _ContentPair, _SourcePair
    ]

    # What may be passed as ``data``: a lone ``Document`` or a synchronous /
    # asynchronous iterable of input items.
    GeneratorSourceType = Union[
        Document,
        Iterable[SingletonDataType],
        AsyncIterable[SingletonDataType],
    ]


def request_generator(
    exec_endpoint: str,
    data: 'GeneratorSourceType',
    request_size: int = 0,
    data_type: DataInputType = DataInputType.AUTO,
    target_executor: Optional[str] = None,
    parameters: Optional[Dict] = None,
    **kwargs,  # do not remove this, add on purpose to suppress unknown kwargs
) -> Iterator['Request']:
    """Generate a request iterator.

    If ``data`` is ``None``, exactly one request built from the endpoint, target and
    parameters is yielded. Otherwise ``data`` is split by ``batch_iterator`` and one
    request is yielded per batch.

    Any exception raised while producing requests is logged at critical level and is
    not re-raised; the generator simply stops yielding.

    :param exec_endpoint: the endpoint string, by convention starts with `/`
    :param data: data to send, a list of dict/string/bytes that can be converted into a list of
        `Document` objects. A value that is not an ``Iterable`` is wrapped in a single-element
        list; ``None`` produces a single request without documents.
    :param request_size: the number of the `Documents` in each request (passed to ``batch_iterator``)
    :param data_type: if ``data`` is an iterator over self-contained document, i.e.
        :class:`DocumentSourceType`; or an iterator over possible Document content
        (set to text, blob and buffer).
    :param target_executor: a regex string. Only matching Executors will process the request.
    :param parameters: a dictionary of parameters to be sent to the executor
    :param kwargs: additional arguments; forwarded as ``_kwargs`` to the batch request
        builder and not used when ``data`` is ``None``
    :yield: request
    """
    try:
        if data is None:
            # this allows empty inputs, i.e. a data request with only parameters
            yield _new_data_request(
                endpoint=exec_endpoint, target=target_executor, parameters=parameters
            )
        else:
            if not isinstance(data, Iterable):
                # a single non-iterable input is treated as a one-item collection
                data = [data]
            for batch in batch_iterator(data, request_size):
                yield _new_data_request_from_batch(
                    _kwargs=kwargs,
                    batch=batch,
                    data_type=data_type,
                    endpoint=exec_endpoint,
                    target=target_executor,
                    parameters=parameters,
                )
    except Exception as ex:
        # must be handled here, as grpc channel wont handle Python exception
        default_logger.critical(f'inputs is not valid! {ex!r}', exc_info=True)