Source code for jina.proto.serializer

from typing import List, Union, Iterable

from jina.proto import jina_pb2
from jina.types.request.control import ControlRequest
from jina.types.request.data import DataRequest


[docs]class ControlRequestProto: """This class is a drop-in replacement for gRPC default serializer. It replace default serializer to make sure we always work with `Request` """
[docs] @staticmethod def SerializeToString(x: 'ControlRequest'): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ return x.proto.SerializePartialToString()
[docs] @staticmethod def FromString(x: bytes): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ proto = jina_pb2.ControlRequestProto() proto.ParseFromString(x) return ControlRequest(request=proto)
[docs]class DataRequestProto: """This class is a drop-in replacement for gRPC default serializer. It replace default serializer to make sure we always work with `Request` """
[docs] @staticmethod def SerializeToString(x: 'DataRequest'): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ if not x.is_decompressed: return x.buffer return x.proto.SerializePartialToString()
[docs] @staticmethod def FromString(x: bytes): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ return DataRequest(x)
[docs]class DataRequestListProto: """This class is a drop-in replacement for gRPC default serializer. It replace default serializer to make sure the message sending interface is convenient. It can handle sending single messages or a list of messages. It also returns a list of messages. Effectively this is hiding MessageListProto from the consumer """
[docs] @staticmethod def SerializeToString(x: 'Union[List[DataRequest], DataRequest]'): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ protos = [] if not isinstance(x, Iterable): protos.append(x.proto) else: for r in x: protos.append(r.proto) return jina_pb2.DataRequestListProto(requests=protos).SerializeToString()
[docs] @staticmethod def FromString(x: bytes): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ rlp = jina_pb2.DataRequestListProto() rlp.ParseFromString(x) requests = [] for request in rlp.requests: requests.append(DataRequest.from_proto(request)) return requests