jina package

Module contents

Top-level module of Jina.

The primary function of this module is to import all of the public Jina interfaces into a single place. The interfaces themselves are located in sub-modules, as described below.

class jina.AsyncFlow(*, asyncio: Optional[bool] = 'False', host: Optional[str] = "'0.0.0.0'", https: Optional[bool] = 'False', port: Optional[int] = 'None', protocol: Optional[str] = "'GRPC'", proxy: Optional[bool] = 'False', **kwargs)[source]
class jina.AsyncFlow(*, compress: Optional[str] = "'NONE'", compress_min_bytes: Optional[int] = '1024', compress_min_ratio: Optional[float] = '1.1', cors: Optional[bool] = 'False', ctrl_with_ipc: Optional[bool] = 'True', daemon: Optional[bool] = 'False', default_swagger_ui: Optional[bool] = 'False', description: Optional[str] = 'None', env: Optional[dict] = 'None', expose_endpoints: Optional[str] = 'None', expose_public: Optional[bool] = 'False', host: Optional[str] = "'0.0.0.0'", host_in: Optional[str] = "'0.0.0.0'", host_out: Optional[str] = "'0.0.0.0'", hosts_in_connect: Optional[List[str]] = 'None', log_config: Optional[str] = 'None', memory_hwm: Optional[int] = '- 1', name: Optional[str] = "'gateway'", native: Optional[bool] = 'False', no_crud_endpoints: Optional[bool] = 'False', no_debug_endpoints: Optional[bool] = 'False', on_error_strategy: Optional[str] = "'IGNORE'", port_ctrl: Optional[int] = 'None', port_expose: Optional[int] = 'None', port_in: Optional[int] = 'None', port_out: Optional[int] = 'None', prefetch: Optional[int] = '0', protocol: Optional[str] = "'GRPC'", proxy: Optional[bool] = 'False', py_modules: Optional[List[str]] = 'None', quiet: Optional[bool] = 'False', quiet_error: Optional[bool] = 'False', replicas: Optional[int] = '1', runs_in_docker: Optional[bool] = 'False', runtime_backend: Optional[str] = "'PROCESS'", runtime_cls: Optional[str] = "'GRPCRuntime'", shards: Optional[int] = '1', socket_in: Optional[str] = "'PULL_CONNECT'", socket_out: Optional[str] = "'PUSH_CONNECT'", ssh_keyfile: Optional[str] = 'None', ssh_password: Optional[str] = 'None', ssh_server: Optional[str] = 'None', static_routing_table: Optional[bool] = 'False', timeout_ctrl: Optional[int] = '5000', timeout_ready: Optional[int] = '600000', title: Optional[str] = 'None', uses: Optional[Union[str, Type[BaseExecutor], dict]] = "'BaseExecutor'", uses_metas: Optional[dict] = 'None', uses_requests: Optional[dict] = 'None', uses_with: Optional[dict] = 'None', uvicorn_kwargs: 
Optional[dict] = 'None', workspace: Optional[str] = 'None', zmq_identity: Optional[str] = 'None', **kwargs)
class jina.AsyncFlow(*, env: Optional[dict] = 'None', inspect: Optional[str] = "'COLLECT'", log_config: Optional[str] = 'None', name: Optional[str] = 'None', quiet: Optional[bool] = 'False', quiet_error: Optional[bool] = 'False', static_routing_table: Optional[bool] = 'False', uses: Optional[str] = 'None', workspace: Optional[str] = "'./'", **kwargs)

Bases: jina.clients.mixin.AsyncPostMixin, jina.flow.base.Flow

AsyncFlow is the asynchronous version of Flow. They share the same interface, except that in AsyncFlow the train(), index() and search() methods are coroutines (i.e. declared with the async/await syntax), so simply calling them does not schedule them for execution. To actually run a coroutine, the user needs to put it in an event loop, e.g. via asyncio.run() or asyncio.create_task().
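The difference between calling a coroutine and running it can be seen without Jina at all. The following is a minimal sketch using only the standard library; index() here is a hypothetical stand-in coroutine, not the AsyncFlow method.

```python
import asyncio

calls = []


async def index(docs):
    """Hypothetical stand-in for a coroutine method such as AsyncFlow.index()."""
    calls.append(docs)
    return len(docs)


# Calling the coroutine function only creates a coroutine object; its body has not run yet.
coro = index(['a', 'b'])
ran_before = list(calls)

# Handing the coroutine to an event loop is what actually executes it.
result = asyncio.run(coro)
```

After asyncio.run() returns, calls holds the one recorded invocation, whereas ran_before (captured right after the plain call) is still empty.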

AsyncFlow can be very useful in integration settings, where Jina/Jina Flow is NOT the main logic but rather serves as a part of another program. In this case, users often do not want to let Jina control the asyncio event loop. In contrast, Flow controls and wraps the event loop internally, which makes the Flow look synchronous from the outside.

In particular, AsyncFlow makes Jina usage in Jupyter Notebook more natural and reliable. For example, the following code uses the event loop already running in Jupyter/IPython to run the Jina Flow (instead of creating a new one).

from jina import AsyncFlow
from jina.types.document.generators import from_ndarray
import numpy as np

with AsyncFlow().add() as f:
    await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Note that the code above will NOT work in the standard Python REPL, as only Jupyter/IPython implements “autoawait”.

Another example is using Jina as an integration. Say you have another IO-bound job heavylifting(); you can use this feature to schedule Jina index() and heavylifting() concurrently.
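A minimal sketch of that scheduling pattern, using only the standard library. Both coroutines are hypothetical stand-ins: heavylifting() simulates an IO-bound job with a sleep, and index_with_jina() takes the place of an awaited AsyncFlow index() call.

```python
import asyncio


async def heavylifting():
    """Hypothetical IO-bound job; the sleep stands in for network or disk waits."""
    await asyncio.sleep(0.05)
    return 'heavylifting done'


async def index_with_jina():
    """Hypothetical stand-in for `await f.index(...)` inside an AsyncFlow context."""
    await asyncio.sleep(0.05)
    return 'index done'


async def main():
    # gather() schedules both coroutines on the same event loop, so their waits overlap.
    return await asyncio.gather(heavylifting(), index_with_jina())


results = asyncio.run(main())
```

gather() returns the results in the order the coroutines were passed, regardless of which one finishes first.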

One can think of Flow as a Jina-managed event loop, whereas AsyncFlow is a self-managed event loop.

jina.Client(args=None, **kwargs)[source]

Jina Python client.

Parameters
  • args (Optional[ForwardRef]) – Namespace args.

  • kwargs – Additional arguments.

Return type

Union[ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef]

Returns

An instance of GRPCClient or WebSocketClient.

class jina.Document(*, adjacency: Optional[int] = 'None', blob: Optional[Union[ArrayType, jina_pb2.NdArrayProto, NdArray]] = 'None', buffer: Optional[bytes] = 'None', chunks: Optional[Iterable[Document]] = 'None', content: Optional[jina.types.document.DocumentContentType] = 'None', embedding: Optional[Union[ArrayType, jina_pb2.NdArrayProto, NdArray]] = 'None', granularity: Optional[int] = 'None', id: Optional[str] = 'None', matches: Optional[Iterable[Document]] = 'None', mime_type: Optional[str] = 'None', modality: Optional[str] = 'None', parent_id: Optional[str] = 'None', tags: Optional[Union[Dict, jina.types.struct.StructView]] = 'None', text: Optional[str] = 'None', uri: Optional[str] = 'None', weight: Optional[float] = 'None', **kwargs)[source]

Bases: jina.types.mixin.ProtoTypeMixin, jina.types.document.helper.VersionedMixin

Document is one of the primitive data types in Jina.

It offers a Pythonic interface that lets users access and manipulate a jina.jina_pb2.DocumentProto object without working with Protobuf itself.

To create a Document object, simply:

from jina import Document
d = Document()
d.text = 'abc'

Jina requires each Document to have a string id. You can set a custom one; if none has been set, a random one will be assigned.

To access and modify the content of the document, you can use text, blob, and buffer. Each property is implemented with a proper setter to improve integrity and user experience. For example, assigning doc.blob or doc.embedding can be done simply via:

import numpy as np

# to set as content
d.content = np.random.random([10, 5])

# to set as embedding
d.embedding = np.random.random([10, 5])

The MIME type is automatically set/guessed when setting content and uri.

Document also provides multiple ways to build from an existing Document. You can build a Document from jina_pb2.DocumentProto, bytes, str, and Dict. You can also use it as a view (i.e. a weak reference when building from an existing jina_pb2.DocumentProto). For example,

from jina import Document
from jina.proto.jina_pb2 import DocumentProto  # import path assumed

a = DocumentProto()
b = Document(a, copy=False)
a.text = 'hello'
assert b.text == 'hello'

You can leverage the convert_a_to_b() interface to convert between content forms.

Parameters
  • document (Optional[~DocumentSourceType]) – the document to construct from. If bytes is given, a DocumentProto is deserialized from it; if a dict is given, a DocumentProto is parsed from it; if a str is given, it is treated as a JSON string and a DocumentProto is parsed from it; finally, a DocumentProto can be given directly, in which case a view or a copy is built from it depending on copy.

  • copy (bool) – when document is given as a DocumentProto object, whether to build a deep copy of it (True) or a view of it, i.e. a weak reference (False).

  • field_resolver (Optional[Dict[str, str]]) – a map from field names defined in document (JSON, dict) to the field names defined in Protobuf. This is only used when the given document is a JSON string or a Python dict.

  • kwargs – other parameters to be set after the document is constructed

Note

When document is a JSON string or a Python dictionary, the constructor maps only the values of known fields defined in Protobuf; all unknown fields are mapped to document.tags. For example,

d = Document({'id': '123', 'hello': 'world', 'tags': {'good': 'bye'}})

assert d.id == '123'  # true
assert d.tags['hello'] == 'world'  # true
assert d.tags['good'] == 'bye'  # true
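The routing of unknown keys into tags can be sketched with plain dicts. This is an illustration only, not the constructor's implementation: KNOWN_FIELDS is a made-up subset (the real set comes from the Protobuf schema) and split_known_and_tags() is a hypothetical helper.

```python
# Assumption: a small, made-up subset of field names; the real set comes from the Protobuf schema.
KNOWN_FIELDS = {'id', 'text', 'tags'}


def split_known_and_tags(raw):
    """Route known keys to top-level fields and everything else into `tags`."""
    fields = {}
    tags = dict(raw.get('tags', {}))
    for key, value in raw.items():
        if key == 'tags':
            continue
        if key in KNOWN_FIELDS:
            fields[key] = value
        else:
            # Unknown keys are not dropped; they end up next to the explicit tags.
            tags[key] = value
    fields['tags'] = tags
    return fields


doc = split_known_and_tags({'id': '123', 'hello': 'world', 'tags': {'good': 'bye'}})
```

The unknown key hello ends up inside tags alongside the explicitly given good, matching the assertions in the example above.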
ON_GETATTR: List = ['matches', 'chunks']

pop(*fields)[source]

Remove the values from the given fields of this Document.

Parameters

fields – field names

Return type

None

clear()[source]

Remove all values from all fields of this Document.

Return type

None

property weight: float
Return type

float

Returns

the weight of the document

property modality: str
Return type

str

Returns

the modality of the document.

property tags: jina.types.struct.StructView

Return the tags field of this Document as a Python dict

Return type

StructView

Returns

a Python dict view of the tags.

update(source, fields=None)[source]

Updates fields specified in fields from the source to current Document.

Parameters
  • source (Document) – The Document to update from. The current Document is referred to as the destination.

  • fields (Optional[List[str]]) – a list of field names that we want to update, if not specified, use all present fields in source.

Note

  • If fields is empty, all fields present in source are merged into the current Document.

  • tags are updated like a Python dict.

  • The current Document is modified in place; source is left unchanged.

  • If the current Document has more fields than source, these extra fields are preserved.

Return type

None
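The merge rules listed in the note for update() can be sketched with plain dicts standing in for Documents. update_fields() is a hypothetical helper written for this illustration; it mirrors the documented rules but is not the library's code.

```python
def update_fields(destination, source, fields=None):
    """Sketch of the merge rules with plain dicts standing in for Documents."""
    # With no explicit field list, use every field present in the source.
    names = fields if fields else list(source)
    for name in names:
        if name not in source:
            continue
        if name == 'tags':
            # tags are merged like a Python dict rather than replaced wholesale.
            destination.setdefault('tags', {}).update(source['tags'])
        else:
            destination[name] = source[name]


dest = {'id': '1', 'text': 'old', 'tags': {'a': 1}, 'weight': 0.5}
src = {'text': 'new', 'tags': {'b': 2}}
update_fields(dest, src)
```

After the call, dest has the new text and both tags, keeps its extra weight field, and src is unchanged.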

property content_hash: str

Get the document hash according to its content.

Return type

str

Returns

the unique hash code to represent this Document

property id: str

The document id in string.

Return type

str

Returns

the id of this Document

property parent_id: str

The document’s parent id in string.

Return type

str

Returns

the parent id of this Document

get_sparse_blob(sparse_ndarray_cls_type, **kwargs)[source]

Return blob of the content of a Document as a sparse array.

Parameters
  • sparse_ndarray_cls_type (Type[BaseSparseNdArray]) – Sparse class type, such as SparseNdArray.

  • kwargs – Additional keyword arguments. For the scipy backend, the keyword sp_format must be set to one of the sparse formats supported by scipy, such as coo or csr.

Return type

SparseArrayType

Returns

the blob of this Document, but as a sparse array

property blob: ArrayType

Return blob, one of the content forms of a Document.

Note

Use content to return the content of a Document

This property returns the blob of the Document as a dense or sparse array, depending on the actual proto instance stored. If the stored blob is sparse, it is returned as a coo matrix. If any other sparse type is desired, use get_sparse_blob().

Return type

ArrayType

Returns

the blob content of this Document

get_sparse_embedding(sparse_ndarray_cls_type, **kwargs)[source]

Return embedding of the content of a Document as a sparse array.

Parameters
  • sparse_ndarray_cls_type (Type[BaseSparseNdArray]) – Sparse class type, such as SparseNdArray.

  • kwargs – Additional keyword arguments. For the scipy backend, the keyword sp_format must be set to one of the sparse formats supported by scipy, such as coo or csr.

Return type

SparseArrayType

Returns

the embedding of this Document, but as a sparse array

property embedding: SparseArrayType

Return embedding of the content of a Document.

Note

This property returns the embedding of the Document as a dense or sparse array, depending on the actual proto instance stored. If the stored embedding is sparse, it is returned as a coo matrix. If any other sparse type is desired, use get_sparse_embedding().

Return type

SparseArrayType

Returns

the embedding of this Document

property matches: MatchArray

Get all matches of the current document.

Return type

MatchArray

Returns

the array of matches attached to this document

property chunks: ChunkArray

Get all chunks of the current document.

Return type

ChunkArray

Returns

the array of chunks of this document

set_attributes(**kwargs)[source]

Bulk update Document fields with key-value specified in kwargs

See also

get_attributes() for bulk get attributes

Parameters

kwargs – the keyword arguments to set the values, where the keys are the fields to set

get_attributes(*fields)[source]

Bulk fetch Document fields and return a list of the values of these fields

Note

Arguments will be extracted using dunder_get.

d = Document({'id': '123', 'hello': 'world', 'tags': {'id': 'external_id', 'good': 'bye'}})

assert d.id == '123'  # true
assert d.tags['hello'] == 'world'  # true
assert d.tags['good'] == 'bye'  # true
assert d.tags['id'] == 'external_id'  # true

res = d.get_attributes(*['id', 'tags__hello', 'tags__good', 'tags__id'])

assert res == ['123', 'world', 'bye', 'external_id']

Parameters

fields (str) – the variable length values to extract from the document

Return type

Union[Any, List[Any]]

Returns

a list with the attributes of this document ordered as the args
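The double-underscore keys such as tags__hello address nested values. A minimal sketch of that lookup over a plain nested dict follows; dunder_get_sketch() is a hypothetical helper written for this illustration and is not Jina's dunder_get.

```python
def dunder_get_sketch(data, key):
    """Resolve a key such as 'tags__hello' by walking nested dicts on each '__'."""
    value = data
    for part in key.split('__'):
        value = value[part]
    return value


doc = {'id': '123', 'tags': {'hello': 'world', 'good': 'bye', 'id': 'external_id'}}
res = [dunder_get_sketch(doc, k) for k in ['id', 'tags__hello', 'tags__good', 'tags__id']]
```

Note that id and tags__id resolve to different values, because the second key descends into tags first.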

property buffer: bytes

Return buffer, one of the content forms of a Document.

Note

Use content to return the content of a Document

Return type

bytes

Returns

the buffer bytes from this document

property text

Return text, one of the content forms of a Document.

Note

Use content to return the content of a Document

Returns

the text from this document content

property uri: str

Return the URI of the document.

Return type

str

Returns

the uri of this Document

property mime_type: str

Get MIME type of the document

Return type

str

Returns

the mime_type of this Document

property content_type: str

Return the content type of the document, possible values: text, blob, buffer

Return type

str

Returns

the type of content of this Document

property content: jina.types.document.DocumentContentType

Return the content of the document. It checks which field among blob, text and buffer has a value and returns it.

See also

blob, buffer, text

Return type

~DocumentContentType

Returns

the value of the content depending on content_type

property granularity

Return the granularity of the document.

Returns

the granularity of this Document

property adjacency

Return the adjacency of the document.

Returns

the adjacency of this Document

property scores

Return the scores of the document.

Returns

the scores attached to this document as a NamedScoreMapping

property evaluations

Return the evaluations of the document.

Returns

the evaluations attached to this document as a NamedScoreMapping

convert_image_buffer_to_blob(color_axis=-1)[source]

Convert an image buffer to blob

Parameters

color_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

convert_image_blob_to_uri(width=None, height=None, resize_method='BILINEAR', color_axis=-1)[source]

Assuming blob is a valid image, set uri accordingly.

Parameters
  • width (Optional[int]) – the width of the blob; if None, it is inferred from the blob shape.

  • height (Optional[int]) – the height of the blob; if None, it is inferred from the blob shape.

  • resize_method (str) – the resize method name

  • color_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

Note

If both width and height are provided, the image is not resized. Otherwise, the image size is taken from the shape of self.blob and the resize method resize_method is applied.

convert_image_uri_to_blob(color_axis=-1, uri_prefix=None)[source]

Convert uri to blob

Parameters
  • color_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • uri_prefix (Optional[str]) – the prefix of the uri

convert_image_datauri_to_blob(color_axis=-1)[source]

Convert data URI to image blob

Parameters

color_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

convert_buffer_to_blob(dtype=None, count=-1, offset=0)[source]

Assuming the buffer is a valid buffer of a NumPy ndarray, set blob accordingly.

Parameters
  • dtype – Data-type of the returned array; default: float.

  • count – Number of items to read. -1 means all data in the buffer.

  • offset – Start reading the buffer from this offset (in bytes); default: 0.

Note

Only the values can be recovered from a pure buffer, not the shape information.
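Why the shape is lost can be shown with a small sketch. It uses the standard-library array module instead of NumPy so that it runs without extra dependencies; the 2x3 matrix and the n_cols value are made up for the illustration.

```python
from array import array

# A 2x3 matrix flattened row by row; the buffer keeps the six values only.
rows = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
flat = array('d', [v for row in rows for v in row])
buffer = flat.tobytes()

# Reading the bytes back yields a flat sequence; nothing in the buffer says it was 2x3.
recovered = array('d')
recovered.frombytes(buffer)

# The shape has to be supplied separately to rebuild the rows.
n_cols = 3
rebuilt = [list(recovered[i:i + n_cols]) for i in range(0, len(recovered), n_cols)]
```

The same six values would come back from a 3x2 or 1x6 matrix, which is why the shape must be stored or known separately.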

convert_blob_to_buffer()[source]

Convert blob to buffer

convert_uri_to_buffer()[source]

Convert uri to buffer. Internally, it downloads from the URI and sets buffer.

convert_uri_to_datauri(charset='utf-8', base64=False)[source]

Convert uri to data URI. Internally, it reads uri into buffer and converts it to a data URI.

Parameters
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.
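How the charset and base64 parameters shape a data URI can be sketched with the standard library. to_datauri() is a hypothetical helper written for this illustration (its name, its use_base64 argument and the fixed text/plain MIME type are assumptions); it follows the general data URI layout and is not the method's implementation.

```python
import base64
from urllib.parse import quote


def to_datauri(data, mime_type='text/plain', charset='utf-8', use_base64=False):
    """Hypothetical helper: build a data URI from raw bytes."""
    header = f'data:{mime_type};charset={charset}'
    if use_base64:
        payload = base64.b64encode(data).decode('ascii')
        return f'{header};base64,{payload}'
    # Without base64, bytes outside the safe set are percent-encoded.
    return f'{header},{quote(data)}'


plain = to_datauri(b'hello world')
encoded = to_datauri(b'hello world', use_base64=True)
```

Percent-encoding stays readable for mostly-ASCII text, while base64 is the more compact choice for binary payloads.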

convert_buffer_to_uri(charset='utf-8', base64=False)[source]

Convert buffer to data uri. Internally it first reads into buffer and then converts it to data URI.

Parameters
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.

convert_text_to_uri(charset='utf-8', base64=False)[source]

Convert text to data uri.

Parameters
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.

convert_uri_to_text()[source]

Assuming URI is text, convert it to text

convert_content_to_uri()[source]

Convert content to a URI on a best-effort basis.

MergeFrom(doc)[source]

Merge the content of doc into this Document.

Parameters

doc (Document) – the document to merge from

CopyFrom(doc)[source]

Copy the content of doc into this Document.

Parameters

doc (Document) – the document to copy from

plot(output=None, inline_display=False)[source]

Visualize the Document recursively.

Parameters
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • inline_display (bool) – show image directly inside the Jupyter Notebook

Return type

None

dict(prettify_ndarrays=False, *args, **kwargs)[source]

Return the object as a Python dictionary.

Parameters
  • prettify_ndarrays – boolean indicating if the ndarrays need to be prettified to be shown as lists of values

  • args – Extra positional arguments

  • kwargs – Extra keyword arguments

Returns

dict representation of the object

json(prettify_ndarrays=False, *args, **kwargs)[source]

Return the object as a JSON string.

Parameters
  • prettify_ndarrays – boolean indicating if the ndarrays need to be prettified to be shown as lists of values

  • args – Extra positional arguments

  • kwargs – Extra keyword arguments

Returns

JSON string of the object

property non_empty_fields: Tuple[str]

Return the set fields of the current document that are not empty

Return type

Tuple[str]

Returns

the tuple of non-empty fields

static attributes(include_proto_fields=True, include_proto_fields_camelcase=False, include_properties=False)[source]

Return all attributes supported by the Document, which can be accessed by doc.attribute

Parameters
  • include_proto_fields (bool) – if set, then include all protobuf fields

  • include_proto_fields_camelcase (bool) – if set, then include all protobuf fields in CamelCase

  • include_properties (bool) – if set, then include all properties defined for Document class

Return type

List[str]

Returns

a list of attributes in string.

class jina.DocumentArray(docs=None)[source]

Bases: jina.types.arrays.traversable.TraversableSequence, collections.abc.MutableSequence, jina.types.arrays.document.DocumentArrayGetAttrMixin, jina.types.arrays.neural_ops.DocumentArrayNeuralOpsMixin, jina.types.arrays.search_ops.DocumentArraySearchOpsMixin, collections.abc.Iterable, jina.types.arrays.abstract.AbstractDocumentArray

DocumentArray is a mutable sequence of Document. It gives an efficient view of a list of Document. One can iterate over it like a generator but ALSO modify it, count it, get an item, or union two DocumentArrays using the '+' and '+=' operators.

It is supposed to act as a view containing a pointer to a RepeatedContainer of DocumentProto, while offering Jina-native Document types when getting items or iterating over it.

Parameters

docs (Optional[~DocumentArraySourceType]) – the document array to construct from. One can also give a DocumentArrayProto directly; then, depending on the copy, it builds a view or a copy from it. It can also accept a List.

insert(index, doc)[source]

Insert doc.proto at index into the DocumentArray.

Parameters
  • index (int) – Position of the insertion.

  • doc (Document) – The Document to be inserted.

Return type

None

append(doc)[source]

Append doc to the DocumentArray.

Parameters

doc (Document) – The Document to be appended.

extend(iterable)[source]

Extend the DocumentArray by appending all the items from the iterable.

Parameters

iterable (Iterable[Document]) – the iterable of Documents to extend this array with

Return type

None

clear()[source]

Clear the data of DocumentArray

reverse()[source]

In-place reverse the sequence.

sort(key, top_k=None, reverse=False)[source]

Sort the items of the DocumentArray in place.

Parameters
  • key (Callable) – key callable to sort based upon

  • top_k (Optional[int]) – make sure that the first top_k elements are correctly sorted rather than sorting the entire list

  • reverse (bool) – reverse=True will sort the list in descending order. Default is False
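The top_k behaviour described above (only the leading elements are guaranteed to be in order) can be sketched on a plain list. sort_top_k() is a hypothetical helper built on heapq for this illustration; it is not the DocumentArray implementation.

```python
import heapq


def sort_top_k(items, key, top_k=None, reverse=False):
    """Sketch: put the best `top_k` items first, in order; leave the rest unsorted."""
    if top_k is None or top_k >= len(items):
        items.sort(key=key, reverse=reverse)
        return
    pick = heapq.nlargest if reverse else heapq.nsmallest
    # Select by index so that duplicate values are never lost.
    order = pick(top_k, range(len(items)), key=lambda i: key(items[i]))
    chosen = set(order)
    rest = [items[i] for i in range(len(items)) if i not in chosen]
    items[:] = [items[i] for i in order] + rest


scores = [0.9, 0.1, 0.5, 0.3, 0.7]
sort_top_k(scores, key=lambda s: s, top_k=2)
```

Only the first two positions are guaranteed to be sorted afterwards; the remaining elements keep their original relative order in this sketch.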

save(file, file_format='json')[source]

Save array elements into a JSON, a binary file or a CSV file.

Parameters
  • file (Union[str, TextIO, BinaryIO]) – File or filename to which the data is saved.

  • file_format (str) – json, binary or csv. JSON and CSV files are human-readable, but the binary format gives a much smaller size and faster save/load speed. Note that the CSV format has very limited compatibility: a complex DocumentArray with nested structure cannot be restored from a CSV file.

Return type

None

classmethod load(file, file_format='json')[source]

Load array elements from a JSON file, a binary file or a CSV file.

Parameters
  • file (Union[str, TextIO, BinaryIO]) – File or filename from which the data is loaded.

  • file_format (str) – json, binary or csv. JSON and CSV files are human-readable, but the binary format gives a much smaller size and faster save/load speed. The CSV format has very limited compatibility: a complex DocumentArray with nested structure cannot be restored from a CSV file.

Return type

DocumentArray

Returns

the loaded DocumentArray object

save_binary(file)[source]

Save array elements into a binary file.

Compared to save_json(), it is faster and the file is smaller, but not human-readable.

Parameters

file (Union[str, BinaryIO]) – File or filename to which the data is saved.

Return type

None

save_json(file)[source]

Save array elements into a JSON file.

Compared to save_binary(), it is human-readable, but slower to save/load and the file is larger.

Parameters

file (Union[str, TextIO]) – File or filename to which the data is saved.

Return type

None

save_csv(file, flatten_tags=True)[source]

Save array elements into a CSV file.

Parameters
  • file (Union[str, TextIO]) – File or filename to which the data is saved.

  • flatten_tags (bool) – if set, then all fields in Document.tags will be flattened into tag__fieldname and stored as separate columns. It is useful when tags contain a lot of information.

Return type

None
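The effect of flatten_tags can be illustrated with the standard csv module. This is a sketch under assumptions: the documents are plain dicts with made-up fields, and flatten() is a hypothetical helper; only the tag__fieldname column naming is taken from the description above.

```python
import csv
import io

docs = [
    {'id': '1', 'text': 'hello', 'tags': {'lang': 'en', 'year': 2021}},
    {'id': '2', 'text': 'hallo', 'tags': {'lang': 'de'}},
]


def flatten(doc):
    """Copy a doc dict, moving each tag into its own 'tag__<name>' column."""
    row = {k: v for k, v in doc.items() if k != 'tags'}
    for name, value in doc.get('tags', {}).items():
        row[f'tag__{name}'] = value
    return row


rows = [flatten(d) for d in docs]
# Union of all column names, in first-seen order, so every row fits the header.
columns = list(dict.fromkeys(col for row in rows for col in row))

out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=columns, restval='')
writer.writeheader()
writer.writerows(rows)
csv_text = out.getvalue()
```

A tag that is missing from a document simply leaves its column empty in that row.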

classmethod load_json(file)[source]

Load array elements from a JSON file.

Parameters

file (Union[str, TextIO]) – File or filename from which the data is loaded.

Return type

DocumentArray

Returns

a DocumentArray object

classmethod load_binary(file)[source]

Load array elements from a binary file.

Parameters

file (Union[str, BinaryIO]) – File or filename from which the data is loaded.

Return type

DocumentArray

Returns

a DocumentArray object

classmethod load_csv(file)[source]

Load array elements from a CSV file.

Parameters

file (Union[str, BinaryIO]) – File or filename from which the data is loaded.

Return type

DocumentArray

Returns

a DocumentArray object

property embeddings: numpy.ndarray

Return a np.ndarray stacking all the embedding attributes as rows.

Warning

This operation assumes all embeddings have the same shape and dtype. All dtype and shape values are assumed to be equal to the values of the first element in the DocumentArray / DocumentArrayMemmap

Warning

This operation currently does not support sparse arrays.

Return type

ndarray

Returns

embeddings stacked per row as np.ndarray.

property tags: List[jina.types.struct.StructView]

Get the tags attribute of all Documents

Return type

List[StructView]

Returns

List of tags attributes for all Documents

property texts: List[str]

Get the text attribute of all Documents

Return type

List[str]

Returns

List of text attributes for all Documents

property buffers: List[bytes]

Get the buffer attribute of all Documents

Return type

List[bytes]

Returns

List of buffer attributes for all Documents

property blobs: numpy.ndarray

Return a np.ndarray stacking all the blob attributes.

The blob attributes are stacked together along a newly created first dimension (as if you would stack using np.stack(X, axis=0)).

Warning

This operation assumes all blobs have the same shape and dtype. All dtype and shape values are assumed to be equal to the values of the first element in the DocumentArray / DocumentArrayMemmap

Warning

This operation currently does not support sparse arrays.

Return type

ndarray

Returns

blobs stacked per row as np.ndarray.

class jina.DocumentArrayMemmap(path, key_length=36, buffer_pool_size=1000)[source]

Bases: jina.types.arrays.traversable.TraversableSequence, jina.types.arrays.document.DocumentArrayGetAttrMixin, jina.types.arrays.neural_ops.DocumentArrayNeuralOpsMixin, jina.types.arrays.search_ops.DocumentArraySearchOpsMixin, collections.abc.Iterable, jina.types.arrays.abstract.AbstractDocumentArray

Create a memory map to a DocumentArray stored in binary files on disk.

Memory-mapped files are used for accessing Documents of a large DocumentArray on disk without reading the entire file into memory.

The DocumentArrayMemmap on-disk storage consists of two files:
  • header.bin: stores id, offset, length and boundary info of each Document in body.bin;

  • body.bin: stores Documents contiguously

When loading DocumentArrayMemmap, it loads the content of header.bin into memory, while storing all body.bin data on disk. As header.bin is often much smaller than body.bin, memory is saved.
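The header/body split can be sketched with the standard library. This is an illustration of the idea only: the header is kept as an in-memory dict rather than in the actual header.bin format, plain seek/read is used in place of a real memory map, and the document ids and payloads are made up.

```python
import os
import tempfile

docs = {'doc-a': b'first document', 'doc-b': b'second, longer document'}

with tempfile.TemporaryDirectory() as tmp:
    body_path = os.path.join(tmp, 'body.bin')

    # Write the payloads back to back and remember where each one starts.
    header = {}
    with open(body_path, 'wb') as body:
        for doc_id, payload in docs.items():
            header[doc_id] = (body.tell(), len(payload))
            body.write(payload)

    # Only the small header lives in memory; one record is read by seeking to its offset.
    offset, length = header['doc-b']
    with open(body_path, 'rb') as body:
        body.seek(offset)
        loaded = body.read(length)
```

Because each header entry is just an offset and a length, looking up one record never requires reading the records stored before it.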

DocumentArrayMemmap also loads a portion of the documents in a memory buffer and keeps the memory documents synced with the disk. This helps ensure that modified documents are persisted to the disk. The memory buffer size is configured with parameter buffer_pool_size which represents the number of documents that the buffer can store.

Note

To make sure the documents you modify are persisted to disk, make sure that the number of referenced documents does not exceed the buffer pool size. Otherwise, they won’t be referenced by the buffer pool and they will not be persisted. The best practice is to always reference documents using DAM.

This class is designed to work similarly to DocumentArray but differs in the following aspects:
  • you can set the attributes of elements in a DocumentArrayMemmap, but you need to make sure that you don’t reference more documents than the buffer pool size

  • each document

To convert between a DocumentArrayMemmap and a DocumentArray:

from jina import DocumentArray, DocumentArrayMemmap

# convert from DocumentArrayMemmap to DocumentArray
dam = DocumentArrayMemmap('./tmp')
...

da = DocumentArray(dam)

# convert from DocumentArray to DocumentArrayMemmap
dam2 = DocumentArrayMemmap('./tmp')
dam2.extend(da)

reload()[source]

Reload header of this object from the disk.

This function is useful when another thread/process modifies the on-disk storage and the change has not been reflected in this DocumentArray object.

This function only reloads the header, not the body.

extend(values)[source]

Extend the DocumentArrayMemmap by appending all the items from the iterable.

Parameters

values (Iterable[Document]) – the iterable of Documents to extend this array with

Return type

None

clear()[source]

Clear the on-disk data of DocumentArrayMemmap

Return type

None

append(doc, flush=True, update_buffer=True)[source]

Append doc to the DocumentArrayMemmap.

Parameters
  • doc (Document) – The Document to be appended.

  • update_buffer (bool) – If set, update the buffer.

  • flush (bool) – If set, then flush to disk on done.

Return type

None

get_doc_by_key(key)[source]

Return a Document by key (ID) from disk.

Parameters

key (str) – id of the document

Returns

the Document with the given key

save()[source]

Persists memory loaded documents to disk

Return type

None

prune()[source]

Prune deleted Documents from this object. This yields a smaller on-disk storage.

Return type

None

property physical_size: int

Return the on-disk physical size of this DocumentArrayMemmap, in bytes

Return type

int

Returns

the number of bytes

get_attributes(*fields)[source]

Return all nonempty values of the fields from all docs this array contains

Parameters

fields (str) – Variable length argument with the name of the fields to extract

Return type

Union[List, List[List]]

Returns

Returns a list of the values for these fields. When fields has multiple values, it returns a list of lists.

get_attributes_with_docs(*fields)[source]

Return all nonempty values of the fields together with their nonempty docs

Parameters

fields (str) – Variable length argument with the name of the fields to extract

Return type

Tuple[Union[List, List[List]], DocumentArray]

Returns

Returns a tuple. The first element is a list of the values for these fields. When fields has multiple values, it is a list of lists. The second element is the non-empty docs.

property embeddings: numpy.ndarray

Return a np.ndarray stacking all the embedding attributes as rows.

Return type

ndarray

Returns

embeddings stacked per row as np.ndarray.

Warning

This operation assumes all embeddings have the same shape and dtype. All dtype and shape values are assumed to be equal to the values of the first element in the DocumentArray / DocumentArrayMemmap.

Warning

This operation currently does not support sparse arrays.

property tags: List[jina.types.struct.StructView]

Get the tags attribute of all Documents

Return type

List[StructView]

Returns

List of tags attributes for all Documents

property texts: List[str]

Get the text attribute of all Documents

Return type

List[str]

Returns

List of text attributes for all Documents

property buffers: List[bytes]

Get the buffer attribute of all Documents

Return type

List[bytes]

Returns

List of buffer attributes for all Documents

property blobs: numpy.ndarray

Return a np.ndarray stacking all the blob attributes.

The blob attributes are stacked together along a newly created first dimension (as if you would stack using np.stack(X, axis=0)).

Warning

This operation assumes all blobs have the same shape and dtype. All dtype and shape values are assumed to be equal to the values of the first element in the DocumentArray / DocumentArrayMemmap

Warning

This operation currently does not support sparse arrays.

Return type

ndarray

Returns

blobs stacked per row as np.ndarray.
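The stacking along a new first dimension described for blobs can be sketched with NumPy directly. The blobs below are invented placeholders, and the snippet only demonstrates `np.stack(X, axis=0)`, not the property itself.

```python
import numpy as np

# Three hypothetical 2x2 blobs, e.g. tiny grayscale images.
blobs = [np.full((2, 2), fill_value=i, dtype=np.uint8) for i in range(3)]

# np.stack adds a new leading axis whose length is the number of blobs.
batch = np.stack(blobs, axis=0)
```

Each original blob is then available as `batch[i]`, and `batch.shape` is `(3, 2, 2)`.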

property path: str

Get the path where the instance is stored.

Return type

str

Returns

The stored path of the memmap instance.

jina.Executor

alias of jina.executors.BaseExecutor

class jina.Flow(*, asyncio: Optional[bool] = 'False', host: Optional[str] = "'0.0.0.0'", https: Optional[bool] = 'False', port: Optional[int] = 'None', protocol: Optional[str] = "'GRPC'", proxy: Optional[bool] = 'False', **kwargs)[source]
class jina.Flow(*, compress: Optional[str] = "'NONE'", compress_min_bytes: Optional[int] = '1024', compress_min_ratio: Optional[float] = '1.1', cors: Optional[bool] = 'False', ctrl_with_ipc: Optional[bool] = 'True', daemon: Optional[bool] = 'False', default_swagger_ui: Optional[bool] = 'False', description: Optional[str] = 'None', env: Optional[dict] = 'None', expose_endpoints: Optional[str] = 'None', expose_public: Optional[bool] = 'False', host: Optional[str] = "'0.0.0.0'", host_in: Optional[str] = "'0.0.0.0'", host_out: Optional[str] = "'0.0.0.0'", hosts_in_connect: Optional[List[str]] = 'None', log_config: Optional[str] = 'None', memory_hwm: Optional[int] = '- 1', name: Optional[str] = "'gateway'", native: Optional[bool] = 'False', no_crud_endpoints: Optional[bool] = 'False', no_debug_endpoints: Optional[bool] = 'False', on_error_strategy: Optional[str] = "'IGNORE'", port_ctrl: Optional[int] = 'None', port_expose: Optional[int] = 'None', port_in: Optional[int] = 'None', port_out: Optional[int] = 'None', prefetch: Optional[int] = '0', protocol: Optional[str] = "'GRPC'", proxy: Optional[bool] = 'False', py_modules: Optional[List[str]] = 'None', quiet: Optional[bool] = 'False', quiet_error: Optional[bool] = 'False', replicas: Optional[int] = '1', runs_in_docker: Optional[bool] = 'False', runtime_backend: Optional[str] = "'PROCESS'", runtime_cls: Optional[str] = "'GRPCRuntime'", shards: Optional[int] = '1', socket_in: Optional[str] = "'PULL_CONNECT'", socket_out: Optional[str] = "'PUSH_CONNECT'", ssh_keyfile: Optional[str] = 'None', ssh_password: Optional[str] = 'None', ssh_server: Optional[str] = 'None', static_routing_table: Optional[bool] = 'False', timeout_ctrl: Optional[int] = '5000', timeout_ready: Optional[int] = '600000', title: Optional[str] = 'None', uses: Optional[Union[str, Type[BaseExecutor], dict]] = "'BaseExecutor'", uses_metas: Optional[dict] = 'None', uses_requests: Optional[dict] = 'None', uses_with: Optional[dict] = 'None', uvicorn_kwargs: 
Optional[dict] = 'None', workspace: Optional[str] = 'None', zmq_identity: Optional[str] = 'None', **kwargs)
class jina.Flow(*, env: Optional[dict] = 'None', inspect: Optional[str] = "'COLLECT'", log_config: Optional[str] = 'None', name: Optional[str] = 'None', quiet: Optional[bool] = 'False', quiet_error: Optional[bool] = 'False', static_routing_table: Optional[bool] = 'False', uses: Optional[str] = 'None', workspace: Optional[str] = "'./'", **kwargs)

Bases: jina.clients.mixin.PostMixin, jina.jaml.JAMLCompatible, contextlib.ExitStack

Flow is how Jina streamlines and distributes Executors.

property last_pod

Last pod

needs(needs, name='joiner', *args, **kwargs)[source]

Add a blocker to the Flow that waits until all peas defined in needs have completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

Flow

Returns

the modified Flow
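The blocking behaviour of a joiner can be pictured as a dependency graph. The sketch below does not use Jina; it models a hypothetical topology with the standard-library `graphlib` module (Python 3.9+), and the pod names are made up.

```python
from graphlib import TopologicalSorter

# Hypothetical topology: each key maps to the names it waits for.
topology = {
    "encoder_a": {"gateway"},
    "encoder_b": {"gateway"},
    "joiner": {"encoder_a", "encoder_b"},  # the blocker added by needs()
}

# A valid execution order places the joiner after everything it needs.
order = list(TopologicalSorter(topology).static_order())
```

In any order produced this way, `joiner` appears only after both `encoder_a` and `encoder_b`.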

needs_all(name='joiner', *args, **kwargs)[source]

Collect all hanging Pods so far and add a blocker to the Flow that waits until all hanging peas have completed.

Parameters
  • name (str) – the name of this joiner (default is joiner)

  • args – additional positional arguments which are forwarded to the add and needs function

  • kwargs – additional key value arguments which are forwarded to the add and needs function

Return type

Flow

Returns

the modified Flow
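One way to think about "hanging" Pods is as the Pods that nothing else waits for yet. The following toy model works under that assumption; `hanging_pods` and the topology are hypothetical and not part of Jina.

```python
from typing import Dict, List, Set


def hanging_pods(topology: Dict[str, Set[str]]) -> List[str]:
    """Return pods that no other pod lists in its needs (toy definition)."""
    needed = set().union(*topology.values())
    return sorted(name for name in topology if name not in needed)


topology = {
    "gateway": set(),
    "encoder_a": {"gateway"},
    "encoder_b": {"gateway"},
    "indexer": {"encoder_a"},
}
hanging = hanging_pods(topology)

# A needs_all-style joiner would then wait for exactly these pods.
topology["joiner"] = set(hanging)
```

After the joiner is added, it is the only hanging Pod left in this model.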

add(*, connect_to_predecessor: Optional[bool] = 'False', ctrl_with_ipc: Optional[bool] = 'False', daemon: Optional[bool] = 'False', docker_kwargs: Optional[dict] = 'None', entrypoint: Optional[str] = 'None', env: Optional[dict] = 'None', expose_public: Optional[bool] = 'False', external: Optional[bool] = 'False', force: Optional[bool] = 'False', gpus: Optional[str] = 'None', host: Optional[str] = "'0.0.0.0'", host_in: Optional[str] = "'0.0.0.0'", host_out: Optional[str] = "'0.0.0.0'", hosts_in_connect: Optional[List[str]] = 'None', install_requirements: Optional[bool] = 'False', log_config: Optional[str] = 'None', memory_hwm: Optional[int] = '- 1', name: Optional[str] = 'None', native: Optional[bool] = 'False', on_error_strategy: Optional[str] = "'IGNORE'", peas_hosts: Optional[List[str]] = 'None', polling: Optional[str] = "'ANY'", port_ctrl: Optional[int] = 'None', port_in: Optional[int] = 'None', port_jinad: Optional[int] = '8000', port_out: Optional[int] = 'None', pull_latest: Optional[bool] = 'False', py_modules: Optional[List[str]] = 'None', quiet: Optional[bool] = 'False', quiet_error: Optional[bool] = 'False', quiet_remote_logs: Optional[bool] = 'False', replicas: Optional[int] = '1', runs_in_docker: Optional[bool] = 'False', runtime_backend: Optional[str] = "'PROCESS'", runtime_cls: Optional[str] = "'ZEDRuntime'", scheduling: Optional[str] = "'LOAD_BALANCE'", shards: Optional[int] = '1', socket_in: Optional[str] = "'PULL_BIND'", socket_out: Optional[str] = "'PUSH_BIND'", ssh_keyfile: Optional[str] = 'None', ssh_password: Optional[str] = 'None', ssh_server: Optional[str] = 'None', static_routing_table: Optional[bool] = 'False', timeout_ctrl: Optional[int] = '5000', timeout_ready: Optional[int] = '600000', upload_files: Optional[List[str]] = 'None', uses: Optional[Union[str, Type['BaseExecutor'], dict]] = "'BaseExecutor'", uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = 'None', uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = 
'None', uses_metas: Optional[dict] = 'None', uses_requests: Optional[dict] = 'None', uses_with: Optional[dict] = 'None', volumes: Optional[List[str]] = 'None', workspace: Optional[str] = 'None', zmq_identity: Optional[str] = 'None', **kwargs) Union['Flow', 'AsyncFlow'][source]

Add a Pod to the current Flow object and return the new modified Flow object. The attributes of the Pod can later be changed with set() or deleted with remove().

Parameters
  • needs (Union[str, Tuple[str], List[str], None]) – the name of the Pod(s) that this Pod receives data from. One can also use ‘gateway’ to indicate the connection with the gateway.

  • pod_role (PodRoleType) – the role of the Pod, used for visualization and route planning

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

  • kwargs – other keyword-value arguments that the Pod CLI supports

Return type

Flow

Returns

a (new) Flow object with modification
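The difference between copying and in-line modification (the copy_flow parameter) can be sketched with a miniature builder. `ToyFlow` below is hypothetical and unrelated to Jina's classes, and its default for `copy_flow` is chosen arbitrarily.

```python
import copy
from typing import List


class ToyFlow:
    """Hypothetical miniature of a builder with a copy_flow switch."""

    def __init__(self) -> None:
        self.pods: List[str] = []

    def add(self, name: str, copy_flow: bool = True) -> "ToyFlow":
        # Work on a deep copy when requested, otherwise modify in place.
        target = copy.deepcopy(self) if copy_flow else self
        target.pods.append(name)
        return target


base = ToyFlow()
copied = base.add("encoder", copy_flow=True)   # base stays empty
same = base.add("indexer", copy_flow=False)    # base itself is modified
```

With copying, the original object is left untouched and a new one is returned; without it, the returned object is the original.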

inspect(name='inspect', *args, **kwargs)[source]

Add an inspection on the last changed Pod in the Flow

Internally, it adds two Pods to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

Flow -- PUB-SUB -- BasePod(_pass) -- Flow
        |
        -- PUB-SUB -- InspectPod (Hanging)

In this way, InspectPod looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.

See also

gather_inspect()

Parameters
  • name (str) – name of the Pod

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

Flow

Returns

the new instance of the Flow

gather_inspect(name='gather_inspect', include_last_pod=True, *args, **kwargs)[source]

Gather all inspect Pods output into one Pod. When the Flow has no inspect Pod then the Flow itself is returned.

Note

If --no-inspect is not given, then gather_inspect() is auto called before build(). So in general you don’t need to manually call gather_inspect().

Parameters
  • name (str) – the name of the gather Pod

  • include_last_pod (bool) – whether to include the last modified Pod in the Flow

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

Flow

Returns

the modified Flow or the copy of it

See also

inspect()

build(copy_flow=False)[source]

Build the current Flow and make it ready to use

Note

No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.

Parameters

copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

Flow

Returns

the current Flow (by default)

Note

copy_flow=True is recommended if you are building the same Flow multiple times in a row, e.g.:

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()

start()[source]

Start to run all Pods in this Flow.

Remember to close the Flow with close().

Note that this method is subject to the timeout_ready timeout set in the CLI, which is inherited all the way from jina.peapods.peas.BasePea.

Returns

this instance
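Since Flow derives from contextlib.ExitStack, the start/close pairing can be sketched with a small ExitStack subclass. Everything below (`ToyFlow`, `toy_pod`, the pod names) is hypothetical and only illustrates why close() must follow a manual start().

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def toy_pod(name: str):
    """Hypothetical resource that records when it starts and stops."""
    events.append(f"start {name}")
    try:
        yield name
    finally:
        events.append(f"stop {name}")


class ToyFlow(ExitStack):
    def start(self) -> "ToyFlow":
        # Register every pod on the stack so close() can unwind them later.
        for name in ("encoder", "indexer"):
            self.enter_context(toy_pod(name))
        return self


flow = ToyFlow().start()
try:
    events.append("serving")
finally:
    flow.close()  # without this call the pods would never be stopped
```

Using the object in a `with` statement achieves the same cleanup without an explicit `try`/`finally`.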

property num_pods: int

Get the number of Pods in this Flow

Return type

int

property num_peas: int

Get the number of peas (shards count) in this Flow

Return type

int

property client: BaseClient

Return a BaseClient object attached to this Flow.

Return type

BaseClient

plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)[source]

Visualize the Flow up to the current point. If a file name is provided, an image with that name is created (the suffix determines the format); otherwise the URL for mermaid is displayed. If called within an IPython notebook, the image is rendered inline; otherwise an image file is created.

Example:

flow = Flow().add(name='pod_a').plot('flow.svg')

Parameters
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – top-down or left-right layout

  • inline_display (bool) – show image directly inside the Jupyter Notebook

  • build (bool) – build the Flow before plotting, so that the gateway connection is shown more clearly

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

Flow

Returns

the Flow
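The description mentions Mermaid as the rendering backend. As a rough illustration of what a Mermaid flowchart definition looks like, the sketch below turns a list of edges into Mermaid text. The `to_mermaid` helper is hypothetical and is not how plot() is implemented; only the `graph TD` / `graph LR` and `-->` syntax is standard Mermaid.

```python
from typing import Iterable, Tuple


def to_mermaid(edges: Iterable[Tuple[str, str]], vertical_layout: bool = False) -> str:
    """Render edges as Mermaid flowchart text (hypothetical helper)."""
    # Mermaid uses TD for top-down and LR for left-right flowcharts.
    direction = "TD" if vertical_layout else "LR"
    lines = [f"graph {direction}"]
    lines += [f"    {src} --> {dst}" for src, dst in edges]
    return "\n".join(lines)


edges = [("gateway", "pod_a"), ("pod_a", "pod_b")]
diagram = to_mermaid(edges)
vertical = to_mermaid(edges, vertical_layout=True)
```

The two results differ only in their first line, which selects the layout direction.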

property port_expose: int

Return the exposed port of the gateway.

Return type

int

property host: str

Return the local address of the gateway.

Return type

str

property address_private: str

Return the private IP address of the gateway, for connecting from other machines in the same network.

Return type

str

property address_public: str

Return the public IP address of the gateway, for connecting from other machines over the public network.

Return type

str

block()[source]

Block the process until the user triggers a KeyboardInterrupt.

property protocol: jina.enums.GatewayProtocolType

Return the protocol of this Flow

Return type

GatewayProtocolType

Returns

the protocol of this Flow

property workspace: str

Return the workspace path of the flow.

Return type

str

property workspace_id: Dict[str, str]

Get all Pods’ workspace_id values in a dict

Return type

Dict[str, str]

property env: Optional[Dict]

Get all envs to be set in the Flow

Return type

Optional[Dict]

Returns

envs as dict

property identity: Dict[str, str]

Get all Pods’ identity values in a dict

Return type

Dict[str, str]

expose_endpoint(exec_endpoint: str, path: Optional[str] = None)[source]
expose_endpoint(exec_endpoint: str, *, path: Optional[str] = 'None', status_code: int = '200', tags: Optional[List[str]] = 'None', summary: Optional[str] = 'None', description: Optional[str] = 'None', response_description: str = "'Successful Response'", deprecated: Optional[bool] = 'None', methods: Optional[List[str]] = 'None', operation_id: Optional[str] = 'None', response_model_by_alias: bool = 'True', response_model_exclude_unset: bool = 'False', response_model_exclude_defaults: bool = 'False', response_model_exclude_none: bool = 'False', include_in_schema: bool = 'True', name: Optional[str] = 'None')

Expose an Executor’s endpoint (defined by @requests(on=…)) as an HTTP endpoint for easier access.

After exposing it, you can send data requests directly to http://hostname:port/endpoint.

Parameters

exec_endpoint (str) – the endpoint string, by convention starts with /
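To make the URL pattern concrete, the sketch below builds (but does not send) an HTTP request with the standard library. The host, port, endpoint name, HTTP method and JSON body layout are all placeholders or assumptions for illustration, not values confirmed by this reference.

```python
import json
from urllib.request import Request

# All values below are placeholders chosen for illustration.
hostname, port, exec_endpoint = "localhost", 12345, "/foo"

url = f"http://{hostname}:{port}{exec_endpoint}"
payload = json.dumps({"data": [{"text": "hello"}]}).encode("utf-8")

# Build (but do not send) a POST request; the body layout is an assumption.
request = Request(
    url,
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
```

Passing `request` to `urllib.request.urlopen` would send it once a gateway is actually listening at that address.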

join(needs, name='joiner', *args, **kwargs)

Add a blocker to the Flow that waits until all peas defined in needs have completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

Flow

Returns

the modified Flow

rolling_update(pod_name, dump_path=None, *, uses_with=None)[source]

Reload all replicas of a pod sequentially

Parameters
  • pod_name (str) – pod to update

  • dump_path (Optional[str]) – kept for backwards compatibility; previously, dump_path was the only argument this function could override

  • uses_with (Optional[Dict]) – a Dictionary of arguments to restart the executor with
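The idea of reloading replicas sequentially can be sketched as a loop that takes down one replica at a time, so the others keep serving. This is a toy model with hypothetical names (`toy_rolling_update`, the replica dicts, the example path), not Jina's implementation.

```python
from typing import Dict, List


def toy_rolling_update(replicas: List[Dict], uses_with: Dict) -> List[int]:
    """Toy model: restart replicas one by one, recording how many stay up."""
    available_during_update = []
    for replica in replicas:
        replica["up"] = False                  # take one replica down
        available_during_update.append(sum(r["up"] for r in replicas))
        replica["config"] = dict(uses_with)    # restart with new arguments
        replica["up"] = True                   # bring it back before moving on
    return available_during_update


replicas = [{"up": True, "config": {}} for _ in range(3)]
availability = toy_rolling_update(replicas, {"dump_path": "/tmp/dump"})
```

In this model, two of the three replicas remain available at every step of the update.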

property client_args: argparse.Namespace

Get Client settings.

Return type

Namespace

property gateway_args: argparse.Namespace

Get Gateway settings.

Return type

Namespace

update_network_interface(**kwargs)[source]

Update the network interface of this Flow (affects Gateway & Client)

Parameters

kwargs – new network settings

jina.requests(func=None, *, on=None)[source]

@requests defines when a function will be invoked. It has a keyword on= to define the endpoint.

A class method decorated with plain @requests (without on=) is the default handler for all endpoints. That is, it is the fallback handler for endpoints that are not found.

Parameters
  • func – the method to decorate

  • on – the endpoint string, by convention starts with /

Returns

decorated function
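The dual calling convention (with or without on=) and the fallback rule can be sketched with a small stand-alone decorator. `toy_requests`, `HANDLERS` and `dispatch` below are hypothetical, and the sketch uses module-level functions rather than class methods; it is not Jina's implementation.

```python
from typing import Callable, Dict, Optional

# Hypothetical registry: endpoint -> handler; None marks the default handler.
HANDLERS: Dict[Optional[str], Callable] = {}


def toy_requests(func: Optional[Callable] = None, *, on: Optional[str] = None):
    """Toy decorator usable both as @toy_requests and @toy_requests(on=...)."""

    def register(f: Callable) -> Callable:
        HANDLERS[on] = f
        return f

    # Called without parentheses: func is the decorated function itself.
    return register(func) if func is not None else register


@toy_requests(on="/index")
def index(docs):
    return f"indexed {len(docs)}"


@toy_requests
def fallback(docs):
    return f"default handled {len(docs)}"


def dispatch(endpoint: str, docs):
    # Unknown endpoints fall back to the default handler.
    return HANDLERS.get(endpoint, HANDLERS[None])(docs)
```

A call such as `dispatch("/index", docs)` reaches the dedicated handler, while any unregistered endpoint is routed to `fallback`.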