Source code for jina.types.document

import base64
import mimetypes
import os
import urllib.parse
import urllib.request
import warnings
from hashlib import blake2b
from typing import Union, Dict, Optional, TypeVar, Any, Callable, Sequence, Tuple

import numpy as np
from google.protobuf import json_format
from google.protobuf.field_mask_pb2 import FieldMask

from .converters import *
from .uid import *
from ..ndarray.generic import NdArray
from ..score import NamedScore
from ..sets.chunk import ChunkSet
from ..sets.match import MatchSet
from ...excepts import BadDocType
from ...helper import is_url, typename
from ...importer import ImportExtensions
from ...proto import jina_pb2

# Names exported by a star import of this module.
__all__ = ['Document', 'DocumentContentType', 'DocumentSourceType']

# Types accepted as the *content* of a Document: ``bytes`` is stored as ``buffer``,
# ``str`` as ``uri``/``text`` and ``np.ndarray`` as ``blob`` (see ``Document.content``).
DocumentContentType = TypeVar('DocumentContentType', bytes, str, np.ndarray)
# Types a Document can be *constructed from* (see ``Document.__init__``).
DocumentSourceType = TypeVar('DocumentSourceType',
                             jina_pb2.DocumentProto, bytes, str, Dict)


class Document:
    """
    :class:`Document` is one of the **primitive data types** in Jina.

    It offers a Pythonic interface to allow users access and manipulate
    :class:`jina.jina_pb2.DocumentProto` object without working with Protobuf itself.

    To create a :class:`Document` object, simply:

        .. highlight:: python
        .. code-block:: python

            from jina import Document
            d = Document()
            d.text = 'abc'

    Jina requires each Document to have a string id. You can set a custom one,
    or if none has been set a random one will be assigned on construction.

    You can also use :class:`Document` as a context manager; on exit the
    content hash is refreshed via :meth:`update_content_hash`:

        .. highlight:: python
        .. code-block:: python

            with Document() as d:
                d.text = 'hello'

            assert d.id  # `id` is assigned at construction time
            assert d.content_hash  # refreshed when leaving the ``with`` block

    To access and modify the content of the document, you can use :attr:`text`,
    :attr:`blob`, and :attr:`buffer`. Each property is implemented with proper setter,
    to improve the integrity and user experience. For example, assigning ``doc.blob``
    or ``doc.embedding`` can be simply done via:

        .. highlight:: python
        .. code-block:: python

            import numpy as np

            # to set as content
            d.content = np.random.random([10, 5])

            # to set as embedding
            d.embedding = np.random.random([10, 5])

    MIME type is auto set/guessed when setting :attr:`content` and :attr:`uri`

    :class:`Document` also provides multiple ways to build from an existing Document.
    You can build :class:`Document` from ``jina_pb2.DocumentProto``, ``bytes``, ``str``,
    and ``Dict``. You can also use it as a view (i.e. the wrapper shares the very same
    ``jina_pb2.DocumentProto`` object). For example,

        .. highlight:: python
        .. code-block:: python

            a = DocumentProto()
            b = Document(a, copy=False)
            a.text = 'hello'
            assert b.text == 'hello'

    You can leverage the :meth:`convert_a_to_b` interface to convert between content forms.
    """

    def __init__(self, document: Optional['DocumentSourceType'] = None,
                 copy: bool = False, **kwargs):
        """
        :param document: the document to construct from. If ``bytes`` is given
                then deserialize a :class:`DocumentProto`; ``dict`` is given then
                parse a :class:`DocumentProto` from it; ``str`` is given, then consider
                it as a JSON string and parse a :class:`DocumentProto` from it; finally,
                one can also give `DocumentProto` (or another :class:`Document`) directly,
                then depending on the ``copy``, it builds a view or a copy from it.
        :param copy: when ``document`` is given as a :class:`DocumentProto` or
                :class:`Document` object, build a view (``False``, the same protobuf
                object is shared) or a deep copy (``True``) from it.
        :param kwargs: other parameters to be set, forwarded to :meth:`set_attrs`
        :raises BadDocType: when ``document`` can not be turned into a Document
        """
        self._document = jina_pb2.DocumentProto()
        try:
            if isinstance(document, jina_pb2.DocumentProto):
                if copy:
                    self._document.CopyFrom(document)
                else:
                    self._document = document
            elif isinstance(document, dict):
                json_format.ParseDict(document, self._document)
            elif isinstance(document, str):
                json_format.Parse(document, self._document)
            elif isinstance(document, bytes):
                # directly parsing from binary string gives large false-positive
                # fortunately protobuf throws a warning when the parsing seems go wrong
                # the context manager below converts this warning into exception and throw it
                # properly
                with warnings.catch_warnings():
                    warnings.filterwarnings('error',
                                            'Unexpected end-group tag',
                                            category=RuntimeWarning)
                    try:
                        self._document.ParseFromString(document)
                    except RuntimeWarning as ex:
                        raise BadDocType(f'fail to construct a document from {document}') from ex
            elif isinstance(document, Document):
                if copy:
                    self._document.CopyFrom(document.as_pb_object)
                else:
                    self._document = document.as_pb_object
            elif document is not None:
                # note ``None`` is not considered as a bad type
                raise ValueError(f'{typename(document)} is not recognizable')
        except Exception as ex:
            raise BadDocType(f'fail to construct a document from {document}, '
                             f'if you are trying to set the content '
                             f'you may use "Document(content=your_content)"') from ex

        # every Document needs an id: draw a random one when none was provided
        if not self._document.id:
            import random
            self.id = random.randint(0, np.iinfo(np.int64).max)

        self.set_attrs(**kwargs)

    def __getattr__(self, name: str):
        """Delegate unknown attribute lookups to the wrapped protobuf object.

        :param name: the attribute that was not found on the wrapper itself
        :raises AttributeError: when neither the wrapper nor the protobuf has it
        """
        # ``__getattr__`` is only called when normal lookup failed, so a request for
        # ``_document`` means it has not been assigned yet (e.g. an instance created
        # with ``__new__`` during copy/unpickle). Without this guard the access to
        # ``self._document`` below would re-enter this method forever.
        if name == '_document':
            raise AttributeError(name)
        return getattr(self._document, name)

    def __str__(self):
        """Return the string form of the wrapped protobuf object."""
        return f'{self.as_pb_object}'

    def __enter__(self):
        """Enter the context manager and return this Document."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Refresh the content hash when leaving the ``with`` block."""
        self.update_content_hash()

    @property
    def as_pb_object(self) -> 'jina_pb2.DocumentProto':
        """Return the wrapped protobuf object (not a copy)."""
        return self._document

    @property
    def length(self) -> int:
        """Return the ``length`` field of the document."""
        # TODO(Han): rename this to siblings as this shadows the built-in `length`
        return self._document.length

    @length.setter
    def length(self, value: int):
        """Set the ``length`` field of the document."""
        self._document.length = value

    @property
    def weight(self) -> float:
        """Return the weight of the document."""
        return self._document.weight

    @weight.setter
    def weight(self, value: float):
        """Set the weight of the document

        :param value: the float weight of the document.
        """
        self._document.weight = value

    @property
    def modality(self) -> str:
        """Get the modality of the document."""
        return self._document.modality

    @modality.setter
    def modality(self, value: str):
        """Set the modality of the document."""
        self._document.modality = value

    @property
    def granularity(self):
        """Return the ``granularity`` field of the document."""
        return self._document.granularity

    @granularity.setter
    def granularity(self, granularity_value: int):
        """Set the ``granularity`` field of the document."""
        self._document.granularity = granularity_value

    @property
    def content_hash(self):
        """Return the stored content hash; refresh it with :meth:`update_content_hash`."""
        return self._document.content_hash

    def update_content_hash(self,
                            exclude_fields: Optional[Tuple[str, ...]] = (
                                    'id', 'chunks', 'matches', 'content_hash', 'parent_id'),
                            include_fields: Optional[Tuple[str, ...]] = None) -> None:
        """Update the document hash according to its content.

        :param exclude_fields: a tuple of field names that are excluded when computing
                the content hash
        :param include_fields: a tuple of field names that are included when computing
                the content hash
        :raises ValueError: when both arguments are non-empty

        .. note::
            "exclude_fields" and "include_fields" are mutually exclusive, use one only.
            As ``exclude_fields`` has a non-empty default, pass ``exclude_fields=None``
            explicitly when you want to use ``include_fields``.
        """
        # validate first so that an invalid call does no work at all
        if include_fields and exclude_fields:
            raise ValueError('"include_fields" and "exclude_fields" are mutually exclusive, use one only')

        masked_d = jina_pb2.DocumentProto()
        masked_d.CopyFrom(self._document)
        empty_doc = jina_pb2.DocumentProto()
        if include_fields is not None:
            # keep only the requested fields: copy them onto an empty message
            FieldMask(paths=include_fields).MergeMessage(masked_d, empty_doc)
            masked_d = empty_doc
        elif exclude_fields is not None:
            # drop the listed fields: overwrite them with the values of an empty message
            FieldMask(paths=exclude_fields).MergeMessage(empty_doc, masked_d, replace_repeated_field=True)

        self._document.content_hash = blake2b(masked_d.SerializeToString(),
                                              digest_size=uid._digest_size).hexdigest()

    @property
    def id(self) -> 'UniqueId':
        """The document id in hex string, for non-binary environment such as HTTP, CLI, HTML and also human-readable.
        it will be used as the major view.
        """
        return UniqueId(self._document.id)

    @id.setter
    def id(self, value: Union[bytes, str, int]):
        """Set document id to a string value

        .. note:

            Customized ``id`` is acceptable as long as
            - it only contains the symbols "0"–"9" to represent values 0 to 9,
            and "A"–"F" (or alternatively "a"–"f").
            - it has 16 chars described above.

        :param value: restricted string value
        :return:
        """
        self._document.id = UniqueId(value)

    @property
    def parent_id(self) -> 'UniqueId':
        """The document's parent id in hex string, for non-binary environment such as HTTP, CLI, HTML and also human-readable.
        it will be used as the major view.
        """
        return UniqueId(self._document.parent_id)

    @parent_id.setter
    def parent_id(self, value: Union[bytes, str, int]):
        """Set document's parent id to a string value

        .. note:

            Customized ``id`` is acceptable as long as
            - it only contains the symbols "0"–"9" to represent values 0 to 9,
            and "A"–"F" (or alternatively "a"–"f").
            - it has 16 chars described above.

        :param value: restricted string value
        :return:
        """
        self._document.parent_id = UniqueId(value)

    @property
    def blob(self) -> 'np.ndarray':
        """Return ``blob``, one of the content form of a Document.

        .. note::
            Use :attr:`content` to return the content of a Document
        """
        return NdArray(self._document.blob).value

    @blob.setter
    def blob(self, value: Union['np.ndarray', 'jina_pb2.NdArrayProto', 'NdArray']):
        """Set ``blob`` from a numpy array, an ``NdArrayProto`` or an ``NdArray``."""
        self._update_ndarray('blob', value)

    @property
    def embedding(self) -> 'np.ndarray':
        """Return ``embedding`` of the content of a Document."""
        return NdArray(self._document.embedding).value

    @embedding.setter
    def embedding(self, value: Union['np.ndarray', 'jina_pb2.NdArrayProto', 'NdArray']):
        """Set ``embedding`` from a numpy array, an ``NdArrayProto`` or an ``NdArray``."""
        self._update_ndarray('embedding', value)

    def _update_ndarray(self, k, v):
        """Write ``v`` into the ndarray field named ``k`` of the wrapped protobuf.

        :param k: name of the protobuf field, e.g. ``blob`` or ``embedding``
        :param v: a ``jina_pb2.NdArrayProto``, a ``np.ndarray`` or an ``NdArray``
        :raises TypeError: when ``v`` is of any other type
        """
        if isinstance(v, jina_pb2.NdArrayProto):
            getattr(self._document, k).CopyFrom(v)
        elif isinstance(v, np.ndarray):
            NdArray(getattr(self._document, k)).value = v
        elif isinstance(v, NdArray):
            # use ONE wrapper for both assignments so that the sparsity flag set
            # here is guaranteed to be visible when the value is written
            target = NdArray(getattr(self._document, k))
            target.is_sparse = v.is_sparse
            target.value = v.value
        else:
            raise TypeError(f'{k} is in unsupported type {typename(v)}')

    @property
    def matches(self) -> 'MatchSet':
        """Get all matches of the current document."""
        return MatchSet(self._document.matches, reference_doc=self)

    @property
    def chunks(self) -> 'ChunkSet':
        """Get all chunks of the current document."""
        return ChunkSet(self._document.chunks, reference_doc=self)

    def set_attrs(self, **kwargs):
        """Bulk update Document fields with key-value specified in kwargs

        ``list``/``tuple`` and ``dict`` values replace the corresponding protobuf field
        directly; any other value goes through the property setter of this class when
        one exists, and falls back to the protobuf attribute otherwise.

        :raises AttributeError: when a key is neither a settable property nor a
                protobuf attribute

        .. seealso::
            :meth:`get_attrs` for bulk get attributes
        """
        for k, v in kwargs.items():
            if isinstance(v, (list, tuple)):
                self._document.ClearField(k)
                getattr(self._document, k).extend(v)
            elif isinstance(v, dict):
                self._document.ClearField(k)
                getattr(self._document, k).update(v)
            else:
                class_attr = getattr(Document, k, None)
                if isinstance(class_attr, property) and class_attr.fset:
                    # if class property has a setter
                    setattr(self, k, v)
                elif hasattr(self._document, k):
                    # no property setter, but proto has this attribute so fallback to proto
                    setattr(self._document, k, v)
                else:
                    raise AttributeError(f'{k} is not recognized')

    def get_attrs(self, *args) -> Dict[str, Any]:
        """Bulk fetch Document fields and return a dict of the key-value pairs

        Names that can not be resolved on this Document are silently skipped.

        .. seealso::
            :meth:`set_attrs` for bulk set/update attributes
        """
        return {k: getattr(self, k) for k in args if hasattr(self, k)}

    @property
    def buffer(self) -> bytes:
        """Return ``buffer``, one of the content form of a Document.

        .. note::
            Use :attr:`content` to return the content of a Document
        """
        return self._document.buffer

    @buffer.setter
    def buffer(self, value: bytes):
        """Set ``buffer`` and, for a non-empty value, sniff the MIME type with ``python-magic``."""
        self._document.buffer = value
        if value:
            with ImportExtensions(required=False,
                                  pkg_name='python-magic',
                                  help_text=f'can not sniff the MIME type '
                                            f'MIME sniffing requires brew install '
                                            f'libmagic (Mac)/ apt-get install libmagic1 (Linux)'):
                import magic
                self._document.mime_type = magic.from_buffer(value, mime=True)

    @property
    def text(self) -> str:
        """Return ``text``, one of the content form of a Document.

        .. note::
            Use :attr:`content` to return the content of a Document
        """
        return self._document.text

    @text.setter
    def text(self, value: str):
        """Set ``text`` and mark the MIME type as ``text/plain``."""
        self._document.text = value
        self.mime_type = 'text/plain'

    @property
    def uri(self) -> str:
        """Return the URI of the document."""
        return self._document.uri

    @uri.setter
    def uri(self, value: str):
        """Set the URI of the document

        .. note::
            :attr:`mime_type` will be updated accordingly

        :param value: acceptable URI/URL, raise ``ValueError`` when it is not a valid URI
        :return:
        """
        scheme = urllib.parse.urlparse(value).scheme
        # accepted: a valid http(s) URL, a data URI, an existing local path,
        # or a path whose parent directory is writable
        if ((scheme in {'http', 'https'} and is_url(value))
                or (scheme in {'data'})
                or os.path.exists(value)
                or os.access(os.path.dirname(value), os.W_OK)):
            self._document.uri = value
            self.mime_type = guess_mime(value)
        else:
            raise ValueError(f'{value} is not a valid URI')

    @property
    def mime_type(self) -> str:
        """Get MIME type of the document"""
        return self._document.mime_type

    @mime_type.setter
    def mime_type(self, value: str):
        """Set MIME type of the document

        An empty/``None`` value is ignored and leaves the current MIME type untouched.

        :param value: the acceptable MIME type, raise ``ValueError`` when MIME type is not
                recognizable.
        """
        if value in mimetypes.types_map.values():
            self._document.mime_type = value
        elif value:
            # given but not recognizable, do best guess
            # (treat the value as a file extension, e.g. ``png`` -> ``image/png``)
            r = mimetypes.guess_type(f'*.{value}')[0]
            if r:
                self._document.mime_type = r
            else:
                raise ValueError(f'{value} is not a valid MIME type')

    @property
    def content_type(self) -> str:
        """Return the content type of the document, possible values: text, blob, buffer"""
        return self._document.WhichOneof('content')

    @property
    def content(self) -> Optional['DocumentContentType']:
        """Return the content of the document. It checks whichever field among
        :attr:`blob`, :attr:`text`, :attr:`buffer` has value and return it.
        ``None`` is returned when no content is set.

        .. seealso::
            :attr:`blob`, :attr:`buffer`, :attr:`text`
        """
        attr = self.content_type
        if attr:
            return getattr(self, attr)
        return None

    @content.setter
    def content(self, value: 'DocumentContentType'):
        """Set the content of the document. It assigns the value to field with the right type.

        :raises TypeError: when ``value`` is not ``bytes``, ``str`` or ``np.ndarray``

        .. seealso::
            :attr:`blob`, :attr:`buffer`, :attr:`text`
        """
        if isinstance(value, bytes):
            self.buffer = value
        elif isinstance(value, str):
            # TODO(Han): this implicit fallback is too much but that's
            #  how the original _generate function implement. And a lot of
            #  tests depend on this logic. Stay in this
            #  way to keep all tests passing until I got time to refactor this part
            try:
                self.uri = value
            except ValueError:
                self.text = value
        elif isinstance(value, np.ndarray):
            self.blob = value
        else:
            # ``None`` is also considered as bad type
            raise TypeError(f'{typename(value)} is not recognizable')

    @property
    def score(self):
        """Return the ``score`` field of the document."""
        return self._document.score

    @score.setter
    def score(self, value: Union['jina_pb2.NamedScoreProto', 'NamedScore']):
        """Set the score from a ``NamedScoreProto`` or a ``NamedScore``.

        :raises TypeError: when ``value`` is of any other type
        """
        if isinstance(value, jina_pb2.NamedScoreProto):
            self._document.score.CopyFrom(value)
        elif isinstance(value, NamedScore):
            self._document.score.CopyFrom(value._score)
        else:
            raise TypeError(f'score is in unsupported type {typename(value)}')

    def convert_buffer_to_blob(self, **kwargs):
        """Assuming the :attr:`buffer` is a _valid_ buffer of Numpy ndarray,
        set :attr:`blob` accordingly.

        :param kwargs: reserved for maximum compatibility when using with ConvertDriver

        .. note::
            One can only recover values not shape information from pure buffer.
        """
        self.blob = np.frombuffer(self.buffer)

    def convert_blob_to_uri(self, width: int, height: int, resize_method: str = 'BILINEAR', **kwargs):
        """Assuming :attr:`blob` is a _valid_ image, set :attr:`uri` accordingly

        :param width: forwarded to ``png_to_buffer``
        :param height: forwarded to ``png_to_buffer``
        :param resize_method: forwarded to ``png_to_buffer``
        :param kwargs: reserved for maximum compatibility when using with ConvertDriver
        """
        png_bytes = png_to_buffer(self.blob, width, height, resize_method)
        self.uri = 'data:image/png;base64,' + base64.b64encode(png_bytes).decode()

    def convert_uri_to_buffer(self, **kwargs):
        """Convert uri to buffer
        Internally it downloads from the URI and set :attr:`buffer`.

        :param kwargs: reserved for maximum compatibility when using with ConvertDriver
        :raises FileNotFoundError: when the URI is neither http(s)/data nor an existing path
        """
        if urllib.parse.urlparse(self.uri).scheme in {'http', 'https', 'data'}:
            req = urllib.request.Request(self.uri, headers={'User-Agent': 'Mozilla/5.0'})
            with urllib.request.urlopen(req) as fp:
                self.buffer = fp.read()
        elif os.path.exists(self.uri):
            with open(self.uri, 'rb') as fp:
                self.buffer = fp.read()
        else:
            raise FileNotFoundError(f'{self.uri} is not a URL or a valid local path')

    def convert_uri_to_data_uri(self, charset: str = 'utf-8', base64: bool = False, **kwargs):
        """ Convert uri to data uri.
        Internally it reads uri into buffer and convert it to data uri

        :param charset: charset may be any character set registered with IANA
        :param base64: used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.
        :param kwargs: reserved for maximum compatibility when using with ConvertDriver
        """
        self.convert_uri_to_buffer()
        self.uri = to_datauri(self.mime_type, self.buffer, charset, base64, binary=True)

    def convert_buffer_to_uri(self, charset: str = 'utf-8', base64: bool = False, **kwargs):
        """ Convert buffer to data uri.
        The current :attr:`buffer` and :attr:`mime_type` are encoded into a data URI.

        :param charset: charset may be any character set registered with IANA
        :param base64: used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.
        :param kwargs: reserved for maximum compatibility when using with ConvertDriver
        :raises ValueError: when :attr:`mime_type` is not set
        """
        if not self.mime_type:
            raise ValueError('mime_type is unset, can not convert it to data uri')

        self.uri = to_datauri(self.mime_type, self.buffer, charset, base64, binary=True)

    def convert_text_to_uri(self, charset: str = 'utf-8', base64: bool = False, **kwargs):
        """ Convert text to data uri.

        :param charset: charset may be any character set registered with IANA
        :param base64: used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.
        :param kwargs: reserved for maximum compatibility when using with ConvertDriver
        """
        self.uri = to_datauri(self.mime_type, self.text, charset, base64, binary=False)

    def convert_uri_to_text(self, **kwargs):
        """Assuming URI is text, convert it to text

        :param kwargs: reserved for maximum compatibility when using with ConvertDriver
        """
        self.convert_uri_to_buffer()
        self.text = self.buffer.decode()

    def convert_content_to_uri(self, **kwargs):
        """Convert content in URI with best effort

        ``text`` and ``buffer`` are supported; nothing happens when no content is set.

        :param kwargs: reserved for maximum compatibility when using with ConvertDriver
        :raises NotImplementedError: when the content is of any other form
        """
        if self.text:
            self.convert_text_to_uri()
        elif self.buffer:
            self.convert_buffer_to_uri()
        elif self.content_type:
            raise NotImplementedError

    def MergeFrom(self, doc: 'Document'):
        """Merge the protobuf content of ``doc`` into this Document."""
        self._document.MergeFrom(doc.as_pb_object)

    def CopyFrom(self, doc: 'Document'):
        """Replace the protobuf content of this Document with a copy of ``doc``."""
        self._document.CopyFrom(doc.as_pb_object)

    def traverse(self, traversal_path: str, callback_fn: Callable, *args, **kwargs) -> None:
        """Traverse leaves of the document.

        :param traversal_path: a string of edges to follow from this document,
                ``c`` for chunks and ``m`` for matches, e.g. ``cm``
        :param callback_fn: called as
                ``callback_fn(doc, parent_doc, parent_edge_type, *args, **kwargs)``
                for every document reached at the end of the path
        """
        from ..sets import DocumentSet
        self._traverse_rec(DocumentSet([self]), None, None, traversal_path, callback_fn, *args, **kwargs)

    def _traverse_rec(self, docs: Sequence['Document'], parent_doc: Optional['Document'],
                      parent_edge_type: Optional[str], traversal_path: str, callback_fn: Callable, *args, **kwargs):
        """Recursive worker of :meth:`traverse`.

        :param docs: the documents at the current level
        :param parent_doc: the document ``docs`` were reached from, ``None`` at the root
        :param parent_edge_type: ``'matches'`` or ``'chunks'``, ``None`` at the root
        :param traversal_path: the remaining edges to follow
        :param callback_fn: applied to every doc once the path is exhausted
        :raises ValueError: when the path contains an edge other than ``c`` or ``m``
        """
        if not traversal_path:
            # end of the path reached: these are the leaves
            for d in docs:
                callback_fn(d, parent_doc, parent_edge_type, *args, **kwargs)
            return

        next_edge, remaining_path = traversal_path[0], traversal_path[1:]
        for doc in docs:
            if next_edge == 'm':
                self._traverse_rec(
                    doc.matches, doc, 'matches', remaining_path, callback_fn, *args, **kwargs
                )
            elif next_edge == 'c':
                self._traverse_rec(
                    doc.chunks, doc, 'chunks', remaining_path, callback_fn, *args, **kwargs
                )
            else:
                raise ValueError(f'"{next_edge}" in "{traversal_path}" is not a valid traversal path')