Source code for jina.types.request.status
from typing import Dict, Optional, TypeVar
from google.protobuf import json_format
from jina.excepts import BadRequestType
from jina.helper import typename
from jina.proto import jina_pb2
from jina.types.mixin import ProtoTypeMixin
StatusSourceType = TypeVar('StatusSourceType', jina_pb2.StatusProto, str, Dict, bytes)
[docs]class StatusMessage(ProtoTypeMixin):
"""Represents a Status message used for health check of the Flow"""
def __init__(
self,
status_object: Optional[StatusSourceType] = None,
):
self._pb_body = jina_pb2.StatusProto()
try:
if isinstance(status_object, jina_pb2.StatusProto):
self._pb_body = status_object
elif isinstance(status_object, dict):
json_format.ParseDict(status_object, self._pb_body)
elif isinstance(status_object, str):
json_format.Parse(status_object, self._pb_body)
elif isinstance(status_object, bytes):
self._pb_body.ParseFromString(status_object)
elif status_object is not None:
# note ``None`` is not considered as a bad type
raise ValueError(f'{typename(status_object)} is not recognizable')
else:
self._pb_body = jina_pb2.StatusProto()
except Exception as ex:
raise BadRequestType(
f'fail to construct a {self.__class__} object from {status_object}'
) from ex
[docs] def set_exception(self, ex: Exception):
"""Set exception information into the Status Message
:param ex: The Exception to be filled
"""
import traceback
self.proto.code = jina_pb2.StatusProto.ERROR
self.proto.description = repr(ex)
self.proto.exception.name = ex.__class__.__name__
self.proto.exception.args.extend([str(v) for v in ex.args])
self.proto.exception.stacks.extend(
traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)
)
[docs] def set_code(self, code):
"""Set the code of the Status Message
:param code: The code to be added
"""
self.proto.code = code