Source code for jina.types.request

from typing import Union, Optional, TypeVar, Dict

from google.protobuf import json_format
from google.protobuf.json_format import MessageToJson

from ..sets import QueryLangSet, DocumentSet
from ...enums import CompressAlgo, RequestType
from ...excepts import BadRequestType
from ...helper import random_identity, typename
from ...proto import jina_pb2

_body_type = set(str(v).lower() for v in RequestType)
_trigger_body_fields = set(kk
                           for v in [jina_pb2.RequestProto.IndexRequestProto,
                                     jina_pb2.RequestProto.SearchRequestProto,
                                     jina_pb2.RequestProto.TrainRequestProto,
                                     jina_pb2.RequestProto.ControlRequestProto]
                           for kk in v.DESCRIPTOR.fields_by_name.keys())
_trigger_req_fields = set(jina_pb2.RequestProto.DESCRIPTOR.fields_by_name.keys()).difference(_body_type)
_trigger_fields = _trigger_req_fields.union(_trigger_body_fields)

__all__ = ['Request', 'Response']

RequestSourceType = TypeVar('RequestSourceType',
                            jina_pb2.RequestProto, bytes, str, Dict)


[docs]class Request: """ :class:`Request` is one of the **primitive data type** in Jina. It offers a Pythonic interface to allow users access and manipulate :class:`jina.jina_pb2.RequestProto` object without working with Protobuf itself. A container for serialized :class:`jina_pb2.RequestProto` that only triggers deserialization and decompression when receives the first read access to its member. It overrides :meth:`__getattr__` to provide the same get/set interface as an :class:`jina_pb2.RequestProto` object. """ def __init__(self, request: Union[bytes, dict, str, 'jina_pb2.RequestProto', None] = None, envelope: Optional['jina_pb2.EnvelopeProto'] = None, copy: bool = False): self._buffer = None self._request = jina_pb2.RequestProto() # type: 'jina_pb2.RequestProto' try: if isinstance(request, jina_pb2.RequestProto): if copy: self._request.CopyFrom(request) else: self._request = request elif isinstance(request, dict): json_format.ParseDict(request, self._request) elif isinstance(request, str): json_format.Parse(request, self._request) elif isinstance(request, bytes): self._buffer = request self._request = None elif request is None: # make sure every new request has a request id self._request.request_id = random_identity() elif request is not None: # note ``None`` is not considered as a bad type raise ValueError(f'{typename(request)} is not recognizable') except Exception as ex: raise BadRequestType(f'fail to construct a request from {request}') from ex self._envelope = envelope self.is_used = False #: Return True when request has been r/w at least once def __getattr__(self, name: str): # https://docs.python.org/3/reference/datamodel.html#object.__getattr__ if name in _trigger_body_fields: return getattr(self.body, name) else: return getattr(self.as_pb_object, name) @property def body(self): if self._request_type: return getattr(self.as_pb_object, self._request_type) else: raise ValueError(f'"request_type" is not set yet') @property def _request_type(self) -> str: return self.as_pb_object.WhichOneof('body') @property def request_type(self) -> Optional[str]: """Return the request body type, when not set yet, return ``None``""" if self._request_type: return self.body.__class__.__name__ @request_type.setter def request_type(self, value: str): """Set the type of this request, but keep the body empty""" value = value.lower() if value in _body_type: getattr(self.as_pb_object, value).SetInParent() else: raise ValueError(f'{value} is not valid, must be one of {_body_type}') @property def docs(self) -> 'DocumentSet': self.is_used = True return DocumentSet(self.body.docs) @property def groundtruths(self) -> 'DocumentSet': self.is_used = True return DocumentSet(self.body.groundtruths) @staticmethod def _decompress(data: bytes, algorithm: str) -> bytes: if not algorithm: return data ctag = CompressAlgo.from_string(algorithm) if ctag == CompressAlgo.LZ4: import lz4.frame data = lz4.frame.decompress(data) elif ctag == CompressAlgo.BZ2: import bz2 data = bz2.decompress(data) elif ctag == CompressAlgo.LZMA: import lzma data = lzma.decompress(data) elif ctag == CompressAlgo.ZLIB: import zlib data = zlib.decompress(data) elif ctag == CompressAlgo.GZIP: import gzip data = gzip.decompress(data) return data @property def as_pb_object(self) -> 'jina_pb2.RequestProto': """ Cast ``self`` to a :class:`jina_pb2.RequestProto`. This will trigger :attr:`is_used`. Laziness will be broken and serialization will be recomputed when calling :meth:`SerializeToString`. """ if self._request: # if request is already given while init self.is_used = True return self._request else: # if not then build one from buffer r = jina_pb2.RequestProto() _buffer = self._decompress(self._buffer, self._envelope.compression.algorithm if self._envelope else None) r.ParseFromString(_buffer) self.is_used = True self._request = r # # Though I can modify back the envelope, not sure if it is a good design: # # My intuition is: if the content is changed dramatically, e.g. from index to control request, # # then whatever writes on the envelope should be dropped # # depreciated. The only reason to reuse the envelope is saving cost on Envelope(), which is # # really a minor minor (and evil) optimization. # if self._envelope: # self._envelope.request_type = getattr(r, r.WhichOneof('body')).__class__.__name__ return r
[docs] def SerializeToString(self): if self.is_used: return self.as_pb_object.SerializeToString() else: # no touch, skip serialization, return original return self._buffer
@property def queryset(self) -> 'QueryLangSet': self.is_used = True return QueryLangSet(self.as_pb_object.queryset) @property def command(self) -> str: self.is_used = True return jina_pb2.RequestProto.ControlRequestProto.Command.Name(self.as_pb_object.control.command)
[docs] def to_json(self) -> str: """Return the object in JSON string """ return MessageToJson(self._request)
[docs] def to_response(self) -> 'Response': """Return a weak reference of this object but as :class:`Response` object. It gives a more consistent semantics on the client. """ return Response(self._buffer)
[docs]class Response(Request): """Response is the :class:`Request` object returns from the flow. Right now it shares the same representation as :class:`Request`. At 0.8.12, :class:`Response` is a simple alias. But it does give a more consistent semantic on the client API: send a :class:`Request` and receive a :class:`Response`. """