jina package

Subpackages

Module contents

Top-level module of Jina.

The primary function of this module is to import all of the public Jina interfaces into a single place. The interfaces themselves are located in sub-modules, as described below.

class jina.AsyncFlow(asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', port_expose: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, **kwargs)[source]
class jina.AsyncFlow(compress: Optional[str] = 'NONE', compress_min_bytes: Optional[int] = 1024, compress_min_ratio: Optional[float] = 1.1, cors: Optional[bool] = False, ctrl_with_ipc: Optional[bool] = True, daemon: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_public: Optional[bool] = False, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', host_out: Optional[str] = '0.0.0.0', hosts_in_connect: Optional[List[str]] = None, log_config: Optional[str] = None, memory_hwm: Optional[int] = -1, name: Optional[str] = 'gateway', no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, on_error_strategy: Optional[str] = 'IGNORE', port_ctrl: Optional[int] = None, port_expose: Optional[int] = None, port_in: Optional[int] = None, port_out: Optional[int] = None, prefetch: Optional[int] = 50, prefetch_on_recv: Optional[int] = 1, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCRuntime', socket_in: Optional[str] = 'PULL_CONNECT', socket_out: Optional[str] = 'PUSH_CONNECT', ssh_keyfile: Optional[str] = None, ssh_password: Optional[str] = None, ssh_server: Optional[str] = None, timeout_ctrl: Optional[int] = 5000, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_with: Optional[dict] = None, workspace: Optional[str] = None, zmq_identity: Optional[str] = None, **kwargs)
class jina.AsyncFlow(env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = './', **kwargs)

Bases: jina.clients.mixin.AsyncPostMixin, jina.flow.base.Flow

AsyncFlow is the asynchronous version of Flow. The two share the same interface, except that in AsyncFlow the train(), index(), and search() methods are coroutines (i.e. declared with the async/await syntax), so simply calling them does not schedule them for execution. To actually run a coroutine, the user needs to hand it to an event loop, e.g. via asyncio.run() or asyncio.create_task().
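The difference between calling a coroutine and running it can be seen with the standard library alone. The sketch below does not use Jina; `fake_index` is a hypothetical stand-in for a coroutine method such as index(), introduced only for illustration.

```python
import asyncio
import inspect

calls = []  # records when the coroutine body actually runs


async def fake_index(docs):
    # Hypothetical stand-in for a coroutine method such as AsyncFlow.index();
    # this is not a Jina API.
    calls.append(len(docs))
    return len(docs)


# Calling a coroutine function only creates a coroutine object;
# the body has not run yet.
coro = fake_index(['a', 'b', 'c'])
is_coroutine = inspect.iscoroutine(coro)
ran_before = list(calls)

# Handing the coroutine to an event loop is what executes it.
result = asyncio.run(coro)
```

Inside an already running event loop (as in Jupyter), `await coro` would take the place of `asyncio.run(coro)`.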

AsyncFlow is most useful in integration settings, where Jina (or a Jina Flow) is NOT the main logic but serves as one part of another program. In such cases, users often do not want Jina to control the asyncio event loop. In contrast, Flow controls and wraps the event loop internally, which makes the Flow look synchronous from the outside.

In particular, AsyncFlow makes using Jina in a Jupyter Notebook more natural and reliable. For example, the following code uses the event loop that is already running in Jupyter/IPython to run a Jina Flow (instead of creating a new one).

from jina import AsyncFlow
from jina.types.document.generators import from_ndarray
import numpy as np

with AsyncFlow().add() as f:
    await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Note that the above code will NOT work in the standard Python REPL, as only Jupyter/IPython implements “autoawait”.

Another example is when using Jina as an integration. Say you have another IO-bound job, heavylifting(); you can use this feature to schedule Jina's index() and heavylifting() concurrently.
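A minimal sketch of that scheduling pattern, using only asyncio. Both coroutines are hypothetical stand-ins: `heavylifting` simulates the IO-bound job with a sleep, and `fake_index` takes the place of `await f.index(...)` on an AsyncFlow.

```python
import asyncio


async def heavylifting():
    # Hypothetical IO-bound job; the sleep stands in for network or disk waits.
    await asyncio.sleep(0.05)
    return 'heavylifting done'


async def fake_index():
    # Hypothetical stand-in for `await f.index(...)` on an AsyncFlow.
    await asyncio.sleep(0.05)
    return 'index done'


async def main():
    # gather() schedules both coroutines on the same event loop, so their
    # waits overlap instead of running back to back.
    return await asyncio.gather(heavylifting(), fake_index())


results = asyncio.run(main())
```

The results come back in the order the coroutines were passed to gather(), regardless of which one finishes first.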

One can think of Flow as a Jina-managed event loop, whereas AsyncFlow is a self-managed event loop.

jina.Client(args=None, **kwargs)[source]

Jina Python client.

Parameters
  • args (Optional[ForwardRef]) – Namespace args.

  • kwargs – Additional arguments.

Return type

Union[ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef]

Returns

An instance of GRPCClient or WebSocketClient.

class jina.Document(document=None, field_resolver=None, copy=False, hash_content=True, **kwargs)[source]

Bases: jina.types.mixin.ProtoTypeMixin

Document is one of the primitive data types in Jina.

It offers a Pythonic interface that lets users access and manipulate a jina.jina_pb2.DocumentProto object without working with Protobuf directly.

To create a Document object, simply:

from jina import Document
d = Document()
d.text = 'abc'

Jina requires each Document to have a string id. You can set a custom one; if none has been set, a random one will be assigned.

Or you can use Document as a context manager:

with Document() as d:
    d.text = 'hello'

assert d.id  # now `id` has value

To access and modify the content of the document, you can use text, blob, and buffer. Each property is implemented with a proper setter to improve integrity and user experience. For example, assigning doc.blob or doc.embedding can be done simply via:

import numpy as np

# to set as content
d.content = np.random.random([10, 5])

# to set as embedding
d.embedding = np.random.random([10, 5])

The MIME type is automatically set/guessed when setting content and uri.

Document also provides multiple ways to build from an existing Document. You can build a Document from jina_pb2.DocumentProto, bytes, str, and Dict. You can also use it as a view (i.e. a weak reference when building from an existing jina_pb2.DocumentProto). For example,

a = DocumentProto()
b = Document(a, copy=False)
a.text = 'hello'
assert b.text == 'hello'
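The view-versus-copy behaviour can be mimicked without Protobuf. In the sketch below, `Wrapper` and its `copy_proto` flag are hypothetical names, and a plain dict plays the role of the DocumentProto; it illustrates the semantics only and is not how Document is implemented.

```python
import copy


class Wrapper:
    # Hypothetical stand-in for a Document wrapping a DocumentProto.
    def __init__(self, proto, copy_proto=False):
        # A view keeps a reference to the same object; a copy detaches from it.
        self._proto = copy.deepcopy(proto) if copy_proto else proto

    @property
    def text(self):
        return self._proto['text']


proto = {'text': ''}          # plays the role of the DocumentProto
view = Wrapper(proto)         # like Document(a, copy=False)
snapshot = Wrapper(proto, copy_proto=True)

proto['text'] = 'hello'       # mutate the underlying object afterwards
```

After the mutation, `view.text` reflects the change while `snapshot.text` still holds the value from construction time.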

You can leverage the convert_a_to_b() interface to convert between content forms.

Parameters
  • document (Optional[~DocumentSourceType]) – the document to construct from. If bytes is given, a DocumentProto is deserialized from it; if a dict is given, a DocumentProto is parsed from it; if a str is given, it is treated as a JSON string and a DocumentProto is parsed from it; finally, a DocumentProto can be given directly, in which case a view or a copy is built from it depending on copy.

  • copy (bool) – when document is given as a DocumentProto object, whether to build a deep copy of it (True) or a view of it, i.e. a weak reference (False).

  • field_resolver (Optional[Dict[str, str]]) – a map from field names defined in document (JSON, dict) to the field names defined in Protobuf. This is only used when the given document is a JSON string or a Python dict.

  • kwargs – other parameters to be set after the document is constructed

  • hash_content (bool) – whether to hash the content of the Document

Note

When document is a JSON string or a Python dictionary, the constructor only maps values from known fields defined in Protobuf; all unknown fields are mapped to document.tags. For example,

d = Document({'id': '123', 'hello': 'world', 'tags': {'good': 'bye'}})

assert d.id == '123'  # true
assert d.tags['hello'] == 'world'  # true
assert d.tags['good'] == 'bye'  # true
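
The mapping rule in this note, together with field_resolver, can be sketched in plain Python. This is an illustration of the rule only, not Jina's implementation: `KNOWN_FIELDS` is an assumed three-name subset standing in for the Protobuf schema, and `split_fields` is a hypothetical helper.

```python
# Assumption: a tiny subset of field names standing in for the Protobuf schema.
KNOWN_FIELDS = {'id', 'text', 'tags'}


def split_fields(doc, field_resolver=None):
    """Return (known, tags): known fields are kept, unknown ones go to tags."""
    field_resolver = field_resolver or {}
    known = {}
    tags = dict(doc.get('tags', {}))  # start from explicitly given tags
    for key, value in doc.items():
        key = field_resolver.get(key, key)  # rename external field names first
        if key == 'tags':
            continue
        if key in KNOWN_FIELDS:
            known[key] = value
        else:
            tags[key] = value  # unknown fields end up in tags
    return known, tags


known, tags = split_fields({'id': '123', 'hello': 'world', 'tags': {'good': 'bye'}})
renamed, _ = split_fields({'doc_id': '7'}, field_resolver={'doc_id': 'id'})
```

Here `hello` is not a known field, so it lands in `tags` next to `good`, while `doc_id` is first renamed to `id` by the resolver and therefore kept as a known field.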

pop(*fields)[source]

Remove the values from the given fields of this Document.

Parameters

fields – field names

Return type

None

clear()[source]

Remove all values from all fields of this Document.

Return type

None

property weight
Return type

float

Returns

the weight of the document

property modality
Return type

str

Returns

the modality of the document.

property content_hash

Get the content hash of the document.

Returns

the content_hash from the proto

property tags

Return the tags field of this Document as a Python dict

Return type

StructView

Returns

a Python dict view of the tags.

update(source, fields=None)[source]

Updates fields specified in fields from the source to current Document.

Parameters
  • source (Document) – source Document object.

  • fields (Optional[List[str]]) – a list of field names to include in the update; if not specified, all fields are merged.

Note

The destination (the current Document) is modified in place; source is left unchanged.

Return type

None

update_content_hash(fields=('text', 'blob', 'buffer', 'embedding', 'uri', 'tags', 'mime_type', 'granularity', 'adjacency'))[source]

Update the document hash according to its content.

Parameters

fields (Tuple[str]) – a tuple of field names to include when computing the content hash.

Return type

None

property id

The document id as a hex string, intended for non-binary environments such as HTTP, CLI, and HTML, and also human-readable. It is used as the major view.

Return type

str

Returns

the id from the proto

property parent_id

The document’s parent id as a hex string, intended for non-binary environments such as HTTP, CLI, and HTML, and also human-readable. It is used as the major view.

Return type

str

Returns

the parent id from the proto

get_sparse_blob(sparse_ndarray_cls_type, **kwargs)[source]

Return the blob of a Document's content as a sparse array.

Parameters
  • sparse_ndarray_cls_type (Type[BaseSparseNdArray]) – Sparse class type, such as SparseNdArray.

  • kwargs – Additional keyword arguments. For the scipy backend, set the keyword sp_format to one of the sparse formats scipy supports, such as coo or csr.

Return type

SparseArrayType

Returns

the blob from the proto as a sparse array

property blob

Return blob, one of the content forms of a Document.

Note

Use content to return the content of a Document

This property returns the blob of the Document as a dense or sparse array, depending on the actual proto instance stored. If the stored blob is sparse, it is returned as a coo matrix. If any other sparse type is desired, use get_sparse_blob().

Return type

ArrayType

Returns

the blob content from the proto

get_sparse_embedding(sparse_ndarray_cls_type, **kwargs)[source]

Return the embedding of a Document's content as a sparse array.

Parameters
  • sparse_ndarray_cls_type (Type[BaseSparseNdArray]) – Sparse class type, such as SparseNdArray.

  • kwargs – Additional keyword arguments. For the scipy backend, set the keyword sp_format to one of the sparse formats scipy supports, such as coo or csr.

Return type

SparseArrayType

Returns

the embedding from the proto as a sparse array

property embedding

Return embedding of the content of a Document.

Note

This property returns the embedding of the Document as a dense or sparse array, depending on the actual proto instance stored. If the stored embedding is sparse, it is returned as a coo matrix. If any other sparse type is desired, use get_sparse_embedding().

Return type

SparseArrayType

Returns

the embedding from the proto

property matches

Get all matches of the current document.

Return type

MatchArray

Returns

the array of matches attached to this document

property chunks

Get all chunks of the current document.

Return type

ChunkArray

Returns

the array of chunks of this document

set_attributes(**kwargs)[source]

Bulk update Document fields with key-value specified in kwargs

See also

get_attributes() for bulk getting attributes

Parameters

kwargs – the keyword arguments to set the values, where the keys are the fields to set

get_attributes(*fields)[source]

Bulk fetch Document fields and return a list of the values of these fields

Note

Arguments will be extracted using dunder_get.

d = Document({'id': '123', 'hello': 'world', 'tags': {'id': 'external_id', 'good': 'bye'}})

assert d.id == '123'  # true
assert d.tags['hello'] == 'world'  # true
assert d.tags['good'] == 'bye'  # true
assert d.tags['id'] == 'external_id'  # true

res = d.get_attributes(*['id', 'tags__hello', 'tags__good', 'tags__id'])

assert res == ['123', 'world', 'bye', 'external_id']

Parameters

fields (str) – the variable length values to extract from the document

Return type

Union[Any, List[Any]]

Returns

a list with the attributes of this document ordered as the args
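The double-underscore convention used for nested fields (e.g. tags__hello) can be sketched with plain dicts. `dunder_lookup` below is a hypothetical helper written for this illustration; it is not the dunder_get function that Jina uses.

```python
def dunder_lookup(data, path):
    # Hypothetical helper: walks nested dicts using '__' as the separator,
    # e.g. 'tags__hello' -> data['tags']['hello'].
    value = data
    for part in path.split('__'):
        value = value[part]
    return value


doc = {'id': '123', 'tags': {'hello': 'world', 'good': 'bye', 'id': 'external_id'}}

# Values come back in the same order as the requested field names.
res = [dunder_lookup(doc, f) for f in ('id', 'tags__hello', 'tags__good', 'tags__id')]
```

Note how `id` and `tags__id` resolve to different values: the first is the top-level field, the second is a key nested inside tags.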

property buffer

Return buffer, one of the content forms of a Document.

Note

Use content to return the content of a Document

Return type

bytes

Returns

the buffer bytes from this document

property text

Return text, one of the content forms of a Document.

Note

Use content to return the content of a Document

Returns

the text from this document content

property uri

Return the URI of the document.

Return type

str

Returns

the uri from this document proto

property mime_type

Get MIME type of the document

Return type

str

Returns

the mime_type from this document proto

property content_type

Return the content type of the document, possible values: text, blob, buffer

Return type

str

Returns

the type of content present in this document proto

property content

Return the content of the document. It checks which field among blob, text, and buffer has a value and returns it.

See also

blob, buffer, text

Return type

~DocumentContentType

Returns

the value of the content, depending on content_type

property granularity

Return the granularity of the document.

Returns

the granularity from this document proto

property adjacency

Return the adjacency of the document.

Returns

the adjacency from this document proto

property scores

Return the scores of the document.

Returns

the scores attached to this document as a NamedScoreMapping

property evaluations

Return the evaluations of the document.

Returns

the evaluations attached to this document as a NamedScoreMapping

convert_image_buffer_to_blob(color_axis=-1)[source]

Convert an image buffer to blob

Parameters

color_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

convert_image_blob_to_uri(width, height, resize_method='BILINEAR')[source]

Assuming blob is a valid image, set uri accordingly.

Parameters
  • width (int) – the width of the blob

  • height (int) – the height of the blob

  • resize_method (str) – the resize method name

convert_image_uri_to_blob(color_axis=-1, uri_prefix=None)[source]

Convert uri to blob

Parameters
  • color_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • uri_prefix (Optional[str]) – the prefix of the uri

convert_image_datauri_to_blob(color_axis=-1)[source]

Convert data URI to image blob

Parameters

color_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

convert_buffer_to_blob(dtype=None, count=-1, offset=0)[source]

Assuming the buffer is a valid buffer of a NumPy ndarray, set blob accordingly.

Parameters
  • dtype – Data-type of the returned array; default: float.

  • count – Number of items to read. -1 means all data in the buffer.

  • offset – Start reading the buffer from this offset (in bytes); default: 0.

Note

Only the values can be recovered from a pure buffer, not the shape information.
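The dtype, count, and offset parameters have the same names and meanings as those of numpy.frombuffer. The sketch below uses that NumPy function directly to show why the shape is lost; whether convert_buffer_to_blob() delegates to it internally is an assumption, not something this page states.

```python
import numpy as np

original = np.arange(6, dtype=np.float64).reshape(2, 3)
buffer = original.tobytes()  # raw bytes only; shape and dtype are not stored

# dtype, count and offset mirror the parameters documented above.
flat = np.frombuffer(buffer, dtype=np.float64, count=-1, offset=0)

# Skip the first item (8 bytes for float64) and read two items.
tail = np.frombuffer(buffer, dtype=np.float64, count=2, offset=8)

# The shape has to come from elsewhere, e.g. stored next to the buffer.
restored = flat.reshape(2, 3)
```

The recovered array is one-dimensional; getting the original layout back requires knowing the shape separately, and reading with the wrong dtype would silently yield different values.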

convert_blob_to_buffer()[source]

Convert blob to buffer

convert_uri_to_buffer()[source]

Convert uri to buffer. Internally, it downloads from the URI and sets buffer.

convert_uri_to_datauri(charset='utf-8', base64=False)[source]

Convert uri to data URI. Internally, it reads uri into buffer and converts it to a data URI.

Parameters
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.
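To show what charset and base64 change in the result, here is a standard-library sketch of assembling a data URI. `to_datauri` and its parameter names are hypothetical; the exact string Jina produces may differ.

```python
import base64
from urllib.parse import quote


def to_datauri(data, mime_type='text/plain', charset='utf-8', use_base64=False):
    # Hypothetical helper, not Jina's implementation.
    # Layout: data:<mime>;charset=<charset>[;base64],<payload>
    parts = [f'data:{mime_type}', f'charset={charset}']
    if use_base64:
        parts.append('base64')
        payload = base64.b64encode(data).decode('ascii')
    else:
        payload = quote(data)  # percent-encode bytes that are unsafe in a URI
    return ';'.join(parts) + ',' + payload


plain = to_datauri(b'hello world')
encoded = to_datauri(b'hello world', use_base64=True)
decoded = base64.b64decode(encoded.split(',', 1)[1])
```

Percent-encoding keeps mostly-ASCII text readable, while base64 is the safer choice for binary payloads.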

convert_buffer_to_uri(charset='utf-8', base64=False)[source]

Convert buffer to data uri. Internally it first reads into buffer and then converts it to data URI.

Parameters
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.

convert_text_to_uri(charset='utf-8', base64=False)[source]

Convert text to data uri.

Parameters
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.

convert_uri_to_text()[source]

Assuming URI is text, convert it to text

convert_content_to_uri()[source]

Convert content to URI on a best-effort basis.

MergeFrom(doc)[source]

Merge the content of doc into this Document.

Parameters

doc (Document) – the document to merge from

CopyFrom(doc)[source]

Copy the content of doc into this Document.

Parameters

doc (Document) – the document to copy from

plot(output=None, inline_display=False)[source]

Visualize the Document recursively.

Parameters
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • inline_display (bool) – show image directly inside the Jupyter Notebook

Return type

None

dict(prettify_ndarrays=False, *args, **kwargs)[source]

Return the object as a Python dictionary.

Parameters
  • prettify_ndarrays – boolean indicating if the ndarrays need to be prettified to be shown as lists of values

  • args – Extra positional arguments

  • kwargs – Extra keyword arguments

Returns

dict representation of the object

json(prettify_ndarrays=False, *args, **kwargs)[source]

Return the object as a JSON string.

Parameters
  • prettify_ndarrays – boolean indicating if the ndarrays need to be prettified to be shown as lists of values

  • args – Extra positional arguments

  • kwargs – Extra keyword arguments

Returns

JSON string of the object

property non_empty_fields

Return the set fields of the current document that are not empty

Return type

Tuple[str]

Returns

the tuple of non-empty fields

static attributes(include_proto_fields=True, include_proto_fields_camelcase=False, include_properties=False)[source]

Return all attributes supported by the Document, which can be accessed by doc.attribute

Parameters
  • include_proto_fields (bool) – if set, then include all protobuf fields

  • include_proto_fields_camelcase (bool) – if set, then include all protobuf fields in CamelCase

  • include_properties (bool) – if set, then include all properties defined for Document class

Return type

List[str]

Returns

a list of attributes in string.

class jina.DocumentArray(docs=None)[source]

Bases: jina.types.arrays.traversable.TraversableSequence, collections.abc.MutableSequence, jina.types.arrays.document.DocumentArrayGetAttrMixin, jina.types.arrays.neural_ops.DocumentArrayNeuralOpsMixin, jina.types.arrays.search_ops.DocumentArraySearchOpsMixin, collections.abc.Iterable

DocumentArray is a mutable sequence of Document. It gives an efficient view of a list of Documents. One can iterate over it like a generator but ALSO modify it, count it, get an item, or union two DocumentArrays using the ‘+’ and ‘+=’ operators.

It is supposed to act as a view containing a pointer to a RepeatedContainer of DocumentProto, while offering Jina-native Document types when getting items or iterating over it.

Parameters

docs (Optional[~DocumentArraySourceType]) – the document array to construct from. One can also give a DocumentArrayProto directly; then, depending on the copy, it builds a view or a copy from it. It can also accept a List.
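The "mutable sequence acting as a view" idea can be sketched with collections.abc.MutableSequence, one of the listed base classes. `ViewArray` is a hypothetical class wrapping a plain list; it only illustrates how append(), extend(), and reverse() follow from five core methods and how changes reach the wrapped container.

```python
from collections.abc import MutableSequence


class ViewArray(MutableSequence):
    # Hypothetical stand-in: wraps an existing list without copying it.
    def __init__(self, backing):
        self._backing = backing

    def __getitem__(self, index):
        return self._backing[index]

    def __setitem__(self, index, value):
        self._backing[index] = value

    def __delitem__(self, index):
        del self._backing[index]

    def __len__(self):
        return len(self._backing)

    def insert(self, index, value):
        self._backing.insert(index, value)


store = ['a', 'b']
arr = ViewArray(store)
arr.append('c')    # provided by MutableSequence on top of insert()
arr.extend(['d'])  # provided by MutableSequence on top of append()
arr.reverse()      # in-place reversal, also inherited
```

Because the wrapper holds a reference rather than a copy, every modification made through `arr` is visible in `store`.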

insert(**kwargs)
append(**kwargs)
extend(iterable)[source]

Extend the DocumentArray by appending all the items from the iterable.

Parameters

iterable (Iterable[Document]) – the iterable of Documents to extend this array with

Return type

None

clear(**kwargs)
reverse()[source]

In-place reverse the sequence.

sort(key=None, *args, **kwargs)[source]

Sort the items of the DocumentArray in place.

Parameters
  • key – key callable to sort based upon

  • args – variable set of arguments to pass to the sorting underlying function

  • kwargs – keyword arguments to pass to the sorting underlying function

save(file, file_format='json')[source]

Save array elements into a JSON or a binary file.

Parameters
  • file (Union[str, TextIO, BinaryIO]) – File or filename to which the data is saved.

  • file_format (str) – json or binary. JSON file is human-readable, but binary format gives much smaller size and faster save/load speed.

Return type

None

classmethod load(file, file_format='json')[source]

Load array elements from a JSON or a binary file.

Parameters
  • file (Union[str, TextIO, BinaryIO]) – File or filename from which the data is loaded.

  • file_format (str) – json or binary. JSON file is human-readable, but binary format gives much smaller size and faster save/load speed.

Return type

DocumentArray

Returns

the loaded DocumentArray object

save_binary(file)[source]

Save array elements into a binary file.

Compared to save_json(), it is faster and the file is smaller, but not human-readable.

Parameters

file (Union[str, BinaryIO]) – File or filename to which the data is saved.

Return type

None

save_json(file)[source]

Save array elements into a JSON file.

Compared to save_binary(), it is human-readable, but slower to save/load and the file is larger.

Parameters

file (Union[str, TextIO]) – File or filename to which the data is saved.

Return type

None

classmethod load_json(file)[source]

Load array elements from a JSON file.

Parameters

file (Union[str, TextIO]) – File or filename from which the data is loaded.

Return type

DocumentArray

Returns

a DocumentArray object

classmethod load_binary(file)[source]

Load array elements from a binary file.

Parameters

file (Union[str, BinaryIO]) – File or filename from which the data is loaded.

Return type

DocumentArray

Returns

a DocumentArray object

jina.Executor

alias of jina.executors.BaseExecutor

class jina.Flow(asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', port_expose: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, **kwargs)[source]
class jina.Flow(compress: Optional[str] = 'NONE', compress_min_bytes: Optional[int] = 1024, compress_min_ratio: Optional[float] = 1.1, cors: Optional[bool] = False, ctrl_with_ipc: Optional[bool] = True, daemon: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_public: Optional[bool] = False, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', host_out: Optional[str] = '0.0.0.0', hosts_in_connect: Optional[List[str]] = None, log_config: Optional[str] = None, memory_hwm: Optional[int] = -1, name: Optional[str] = 'gateway', no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, on_error_strategy: Optional[str] = 'IGNORE', port_ctrl: Optional[int] = None, port_expose: Optional[int] = None, port_in: Optional[int] = None, port_out: Optional[int] = None, prefetch: Optional[int] = 50, prefetch_on_recv: Optional[int] = 1, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCRuntime', socket_in: Optional[str] = 'PULL_CONNECT', socket_out: Optional[str] = 'PUSH_CONNECT', ssh_keyfile: Optional[str] = None, ssh_password: Optional[str] = None, ssh_server: Optional[str] = None, timeout_ctrl: Optional[int] = 5000, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_with: Optional[dict] = None, workspace: Optional[str] = None, zmq_identity: Optional[str] = None, **kwargs)
class jina.Flow(env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = './', **kwargs)

Bases: jina.clients.mixin.PostMixin, jina.jaml.JAMLCompatible, contextlib.ExitStack

Flow is how Jina streamlines and distributes Executors.

property last_pod

Last pod

needs(needs, name='joiner', *args, **kwargs)[source]

Add a blocker to the Flow that waits until all peas defined in needs have completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

Flow

Returns

the modified Flow

needs_all(name='joiner', *args, **kwargs)[source]

Collect all hanging Pods so far and add a blocker to the Flow that waits until all hanging peas have completed.

Parameters
  • name (str) – the name of this joiner (default is joiner)

  • args – additional positional arguments which are forwarded to the add and needs function

  • kwargs – additional key value arguments which are forwarded to the add and needs function

Return type

Flow

Returns

the modified Flow
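One way to picture "hanging" Pods is as nodes whose output no other Pod consumes. The sketch below works on a plain dict and is not Jina's code: `hanging_pods`, the graph layout, and the Pod names are all assumptions made for illustration.

```python
def hanging_pods(needs_graph):
    # needs_graph maps each pod name to the names it receives data from.
    # A pod is "hanging" when no other pod lists it as an input.
    consumed = {src for sources in needs_graph.values() for src in sources}
    return sorted(name for name in needs_graph if name not in consumed)


# Hypothetical topology: p1 fans out into p2 and p3, which are never joined.
graph = {'p1': ['gateway'], 'p2': ['p1'], 'p3': ['p1']}
to_join = hanging_pods(graph)

# Adding a joiner that needs every hanging pod leaves a single tail.
graph['joiner'] = to_join
after_join = hanging_pods(graph)
```

With this picture, needs() corresponds to naming the inputs of the joiner explicitly, and needs_all() to computing them from the current topology.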

add(ctrl_with_ipc: Optional[bool] = False, daemon: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, expose_public: Optional[bool] = False, external: Optional[bool] = False, gpus: Optional[str] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', host_out: Optional[str] = '0.0.0.0', hosts_in_connect: Optional[List[str]] = None, log_config: Optional[str] = None, memory_hwm: Optional[int] = -1, name: Optional[str] = None, on_error_strategy: Optional[str] = 'IGNORE', parallel: Optional[int] = 1, peas_hosts: Optional[List[str]] = None, polling: Optional[str] = 'ANY', port_ctrl: Optional[int] = None, port_expose: Optional[int] = None, port_in: Optional[int] = None, port_out: Optional[int] = None, proxy: Optional[bool] = False, pull_latest: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'ZEDRuntime', scheduling: Optional[str] = 'LOAD_BALANCE', socket_in: Optional[str] = 'PULL_BIND', socket_out: Optional[str] = 'PUSH_BIND', ssh_keyfile: Optional[str] = None, ssh_password: Optional[str] = None, ssh_server: Optional[str] = None, timeout_ctrl: Optional[int] = 5000, timeout_ready: Optional[int] = 600000, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_metas: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, workspace: Optional[str] = None, zmq_identity: Optional[str] = None, **kwargs) → Union['Flow', 'AsyncFlow'][source]

Add a Pod to the current Flow object and return the new modified Flow object. The attribute of the Pod can be later changed with set() or deleted with remove()

Parameters
  • needs (Union[str, Tuple[str], List[str], None]) – the name of the Pod(s) that this Pod receives data from. One can also use ‘gateway’ to indicate the connection with the gateway.

  • pod_role (PodRoleType) – the role of the Pod, used for visualization and route planning

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

  • kwargs – other keyword-value arguments that the Pod CLI supports

Return type

Flow

Returns

a (new) Flow object with modification

inspect(name='inspect', *args, **kwargs)[source]

Add an inspection on the last changed Pod in the Flow

Internally, it adds two Pods to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

Flow -- PUB-SUB -- BasePod(_pass) -- Flow
        |
        -- PUB-SUB -- InspectPod (Hanging)

In this way, InspectPod looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.

See also

gather_inspect()

Parameters
  • name (str) – name of the Pod

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

Flow

Returns

the new instance of the Flow

gather_inspect(name='gather_inspect', include_last_pod=True, *args, **kwargs)[source]

Gather all inspect Pods output into one Pod. When the Flow has no inspect Pod then the Flow itself is returned.

Note

If --no-inspect is not given, then gather_inspect() is auto called before build(). So in general you don’t need to manually call gather_inspect().

Parameters
  • name (str) – the name of the gather Pod

  • include_last_pod (bool) – whether to include the last modified Pod in the Flow

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

Flow

Returns

the modified Flow or the copy of it

See also

inspect()

build(copy_flow=False)[source]

Build the current Flow and make it ready to use

Note

No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.

Parameters

copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

Flow

Returns

the current Flow (by default)

Note

copy_flow=True is recommended if you are building the same Flow multiple times in a row. e.g.

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()

start()[source]

Start to run all Pods in this Flow.

Remember to close the Flow with close().

Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.peapods.peas.BasePea

Returns

this instance

property num_pods

Get the number of Pods in this Flow

Return type

int

property num_peas

Get the number of peas (parallel count) in this Flow

Return type

int

property client

Return a BaseClient object attached to this Flow.

Return type

BaseClient

plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)[source]

Visualize the Flow up to the current point. If a file name is provided, it will create a jpg image with that name; otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline; otherwise an image will be created.

Example,

flow = Flow().add(name='pod_a').plot('flow.svg')

Parameters
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – top-down or left-right layout

  • inline_display (bool) – show image directly inside the Jupyter Notebook

  • build (bool) – build the Flow first before plotting, so that the gateway connection is shown better

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type

Flow

Returns

the Flow

property port_expose

Return the exposed port of the gateway

Return type

int

property host

Return the local address of the gateway

Return type

str

property address_private

Return the private IP address of the gateway for connecting from other machines in the same network

Return type

str

property address_public

Return the public IP address of the gateway for connecting from other machines in the public network

Return type

str

block()[source]

Block the process until user hits KeyboardInterrupt

property protocol

Return the protocol of this Flow

Return type

GatewayProtocolType

Returns

the protocol of this Flow

property workspace

Return the workspace path of the flow.

Return type

str

property workspace_id

Get all Pods’ workspace_id values in a dict

Return type

Dict[str, str]

property identity

Get all Pods’ identity values in a dict

Return type

Dict[str, str]

expose_endpoint(exec_endpoint: str, path: Optional[str] = None)[source]
expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)

Expose an Executor’s endpoint (defined by @requests(on=…)) to HTTP endpoint for easier access.

Once exposed, you can send data requests directly to http://hostname:port/endpoint.

Parameters

exec_endpoint (str) – the endpoint string, by convention starts with /

join(needs, name='joiner', *args, **kwargs)

Add a blocker to the Flow that waits until all peas defined in needs have completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

Flow

Returns

the modified Flow

rolling_update(pod_name, dump_path=None)[source]

Reload Pods sequentially - only used for compound pods.

Parameters
  • dump_path (Optional[str]) – the path from which to read the dump data

  • pod_name (str) – pod to update

property client_args

Get Client settings.

Return type

Namespace

property gateway_args

Get Gateway settings.

Return type

Namespace

update_network_interface(**kwargs)[source]

Update the network interface of this Flow (affects Gateway & Client)

Parameters

kwargs – new network settings

jina.requests(func=None, *, on=None)[source]

@requests defines when a function will be invoked. It has a keyword on= to define the endpoint.

A class method decorated with plain @requests (without on=) is the default handler for all endpoints. That is, it is the fallback handler for endpoints that are not found.

Parameters
  • func – the function to decorate

  • on – the endpoint to bind the decorated function to

Returns

decorated function
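The endpoint-plus-fallback behaviour described above can be sketched in a few lines of plain Python. This is a toy model, not Jina's implementation: `toy_requests`, `ToyExecutor`, the `endpoint` attribute, and `dispatch` are all hypothetical names.

```python
def toy_requests(func=None, *, on=None):
    # Toy decorator for illustration only; mirrors the (func=None, *, on=None)
    # signature so that it works both with and without arguments.
    def wrap(f):
        f.endpoint = on  # None marks the default (fallback) handler
        return f
    return wrap(func) if func is not None else wrap


class ToyExecutor:
    @toy_requests(on='/index')
    def index(self, docs):
        return f'indexed {len(docs)}'

    @toy_requests
    def fallback(self, docs):
        return f'default handled {len(docs)}'

    def dispatch(self, endpoint, docs):
        # Collect every decorated method, keyed by its endpoint.
        handlers = {}
        for name in dir(type(self)):
            attr = getattr(type(self), name)
            if callable(attr) and hasattr(attr, 'endpoint'):
                handlers[attr.endpoint] = attr
        # Unknown endpoints fall back to the handler registered without on=.
        handler = handlers.get(endpoint, handlers[None])
        return handler(self, docs)


ex = ToyExecutor()
hit = ex.dispatch('/index', [1, 2])
miss = ex.dispatch('/search', [1])
```

A request to `/index` reaches the dedicated handler, while `/search`, which has no handler of its own, is served by the plain-decorated fallback.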