Source code for jina.proto.serializer

import os
from typing import Iterable, List, Union

from jina.proto import jina_pb2
from jina.types.request.data import DataRequest


[docs]class DataRequestProto: """This class is a drop-in replacement for gRPC default serializer. It replace default serializer to make sure we always work with `Request` """
[docs] @staticmethod def SerializeToString(x: 'DataRequest'): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ if not x.is_decompressed: r = x.buffer else: r = x.proto.SerializePartialToString() os.environ['JINA_GRPC_SEND_BYTES'] = str( len(r) + int(os.environ.get('JINA_GRPC_SEND_BYTES', 0)) ) return r
[docs] @staticmethod def FromString(x: bytes): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ os.environ['JINA_GRPC_RECV_BYTES'] = str( len(x) + int(os.environ.get('JINA_GRPC_RECV_BYTES', 0)) ) return DataRequest(x)
[docs]class DataRequestListProto: """This class is a drop-in replacement for gRPC default serializer. It replaces default serializer to make sure the message sending interface is convenient. It can handle sending single messages or a list of messages. It also returns a list of messages. Effectively this is hiding MessageListProto from the consumer """
[docs] @staticmethod def SerializeToString(x: 'Union[List[DataRequest], DataRequest]'): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ protos = [] if not isinstance(x, Iterable): protos.append(x.proto_with_data) else: protos = [r.proto_with_data for r in x] return jina_pb2.DataRequestListProto(requests=protos).SerializeToString()
[docs] @staticmethod def FromString(x: bytes): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ rlp = jina_pb2.DataRequestListProto() rlp.ParseFromString(x) return [DataRequest.from_proto(request) for request in rlp.requests]
[docs]class EndpointsProto: """Since the serializer is replacing the `jina_pb2 to know how to exactly serialize messages, this is just a placeholder that delegates the serializing and deserializing to the internal protobuf structure with no extra optimization. """
[docs] @staticmethod def SerializeToString(x): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ return x.SerializeToString()
[docs] @staticmethod def FromString(x: bytes): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ ep = jina_pb2.EndpointsProto() ep.ParseFromString(x) return ep
[docs]class StatusProto: """Since the serializer is replacing the `jina_pb2 to know how to exactly serialize messages, this is just a placeholder that delegates the serializing and deserializing to the internal protobuf structure with no extra optimization. """
[docs] @staticmethod def SerializeToString(x): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ return x.SerializeToString()
[docs] @staticmethod def FromString(x: bytes): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ sp = jina_pb2.StatusProto() sp.ParseFromString(x) return sp
[docs]class JinaInfoProto: """Since the serializer is replacing the `jina_pb2` to know how to exactly serialize messages, this is just a placeholder that delegates the serializing and deserializing to the internal protobuf structure with no extra optimization. """
[docs] @staticmethod def SerializeToString(x): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ return x.SerializeToString()
[docs] @staticmethod def FromString(x: bytes): """ # noqa: DAR101 # noqa: DAR102 # noqa: DAR201 """ ip = jina_pb2.JinaInfoProto() ip.ParseFromString(x) return ip