jina package

Module contents

Top-level module of Jina.

The primary function of this module is to import all of the public Jina interfaces into a single place. The interfaces themselves are located in sub-modules, as described below.

class jina.AsyncFlow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', https: Optional[bool] = False, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, **kwargs)
class jina.AsyncFlow(*, compress: Optional[str] = 'NONE', compress_min_bytes: Optional[int] = 1024, compress_min_ratio: Optional[float] = 1.1, cors: Optional[bool] = False, ctrl_with_ipc: Optional[bool] = True, daemon: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_public: Optional[bool] = False, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', host_out: Optional[str] = '0.0.0.0', hosts_in_connect: Optional[List[str]] = None, log_config: Optional[str] = None, memory_hwm: Optional[int] = -1, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, on_error_strategy: Optional[str] = 'IGNORE', port_ctrl: Optional[int] = None, port_expose: Optional[int] = None, port_in: Optional[int] = None, port_out: Optional[int] = None, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reduce: Optional[bool] = False, replicas: Optional[int] = 1, runs_in_docker: Optional[bool] = False, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCRuntime', shards: Optional[int] = 1, socket_in: Optional[str] = 'PULL_CONNECT', socket_out: Optional[str] = 'PUSH_CONNECT', ssh_keyfile: Optional[str] = None, ssh_password: Optional[str] = None, ssh_server: Optional[str] = None, static_routing_table: Optional[bool] = False, timeout_ctrl: Optional[int] = 5000, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, zmq_identity: Optional[str] = None, **kwargs)
class jina.AsyncFlow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reduce: Optional[bool] = False, static_routing_table: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = './', **kwargs)

Bases: jina.clients.mixin.AsyncPostMixin, jina.flow.base.Flow

AsyncFlow is the asynchronous version of Flow. The two share the same interface, except that in AsyncFlow the train(), index() and search() methods are coroutines (i.e. declared with the async/await syntax): simply calling them does not schedule them for execution. To actually run a coroutine, you need to put it in an event loop, e.g. via asyncio.run() or asyncio.create_task().
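The difference between calling a coroutine and running it can be seen with plain asyncio, independent of Jina. In this minimal sketch, fake_index() is a hypothetical stand-in for an AsyncFlow method such as index(); it is not part of Jina's API.

```python
import asyncio


async def fake_index(n_docs: int) -> int:
    """Hypothetical stand-in for an AsyncFlow coroutine such as index()."""
    await asyncio.sleep(0)  # yield control to the event loop once
    return n_docs


# Calling the coroutine function only creates a coroutine object; nothing runs yet.
coro = fake_index(5)
print(asyncio.iscoroutine(coro))  # True

# The body executes only once an event loop runs the coroutine.
result = asyncio.run(coro)
print(result)  # 5
```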

AsyncFlow is very useful in integration settings, where Jina (or a Jina Flow) is NOT the main logic but serves as one part of another program. In this case, users often do not want Jina to control the asyncio event loop. In contrast, Flow controls and wraps the event loop internally, which makes Flow look synchronous from the outside.

In particular, AsyncFlow makes using Jina in a Jupyter Notebook more natural and reliable. For example, the following code uses the event loop already spawned by Jupyter/IPython to run the Jina Flow (instead of creating a new one).

from jina import AsyncFlow
from jina.types.document.generators import from_ndarray
import numpy as np

with AsyncFlow().add() as f:
    await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Notice that the above code will NOT work in a standard Python REPL, as only Jupyter/IPython implements “autoawait”.

Another example is using Jina as part of a larger integration. Say you have another I/O-bound job, heavylifting(); you can use this feature to schedule Jina's index() and heavylifting() concurrently.
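The concurrency pattern described above can be sketched with asyncio.gather(). To keep the sketch self-contained, fake_index() is a hypothetical stand-in for awaiting f.index(...) on an AsyncFlow, and both jobs simulate I/O with asyncio.sleep(); only the name heavylifting() comes from the text.

```python
import asyncio
import time


async def fake_index(n_docs: int) -> str:
    """Hypothetical stand-in for `await f.index(...)` on an AsyncFlow."""
    await asyncio.sleep(0.2)  # simulates waiting on network I/O
    return f'indexed {n_docs} docs'


async def heavylifting() -> str:
    """Another I/O-bound job that should run alongside indexing."""
    await asyncio.sleep(0.2)
    return 'heavylifting done'


async def main() -> list:
    # gather() schedules both coroutines on the same event loop, so their waits overlap.
    return await asyncio.gather(fake_index(5), heavylifting())


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)  # ['indexed 5 docs', 'heavylifting done']
print(f'{elapsed:.2f}s')  # close to 0.2 s rather than 0.4 s, because the waits overlap
```

Because the program owns the event loop here, Jina's work and the other job simply share it, which is the "self-managed event loop" situation AsyncFlow is meant for.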

One can think of Flow as running on a Jina-managed event loop, whereas AsyncFlow runs on a self-managed one.

jina.Client(args=None, **kwargs)

Jina Python client.

Parameters
  • args (Optional[ForwardRef]) – Namespace args.

  • kwargs – Additional arguments.

Return type

Union[ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef]

Returns

An instance of GRPCClient or WebSocketClient.

class jina.Document(adjacency: Optional[int] = None, blob: Optional[ArrayType] = None, buffer: Optional[bytes] = None, chunks: Optional[Iterable[Document]] = None, embedding: Optional[ArrayType] = None, granularity: Optional[int] = None, id: Optional[str] = None, location: Optional[Sequence[float]] = None, matches: Optional[Iterable[Document]] = None, mime_type: Optional[str] = None, modality: Optional[str] = None, offset: Optional[float] = None, parent_id: Optional[str] = None, tags: Optional[Union[Dict, docarray.simple.struct.StructView]] = None, text: Optional[str] = None, uri: Optional[str] = None, weight: Optional[float] = None, **kwargs)

Bases: docarray.document.mixins.AllMixins, docarray.base.BaseProtoView

Document is one of the primitive data types in Jina.

It offers a Pythonic interface that allows users to access and manipulate a jina.docarray_pb2.DocumentProto object without working with Protobuf directly.

To create a Document object, simply:

from jina import Document
d = Document()
d.text = 'abc'

Jina requires each Document to have a string id. You can set a custom one; if none has been set, a random one is assigned.

To access and modify the content of the document, you can use text, blob, and buffer. Each property is implemented with a proper setter to improve integrity and user experience. For example, a NumPy array can be assigned as the content (stored as the blob) or as the embedding via:

import numpy as np

# to set as content
d.content = np.random.random([10, 5])

# to set as embedding
d.embedding = np.random.random([10, 5])

The MIME type is automatically set (or guessed) when content or uri is set.

Document also provides multiple ways to build a Document from existing data: a docarray_pb2.DocumentProto, bytes, str, or Dict. When building from an existing docarray_pb2.DocumentProto, you can also use the Document as a view (i.e. a weak reference to that proto). For example,

a = DocumentProto()
b = Document(a, copy=False)
a.text = 'hello'
assert b.text == 'hello'
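The view-versus-copy behaviour can be illustrated without Protobuf. In this sketch, ToyProto and ToyDoc are hypothetical classes, not Jina types, and the flag is named copy_proto only to avoid shadowing the copy module; Jina's own constructor argument is called copy.

```python
import copy
from dataclasses import dataclass


@dataclass
class ToyProto:
    """Hypothetical stand-in for a protobuf message such as DocumentProto."""
    text: str = ''


class ToyDoc:
    """Hypothetical wrapper illustrating a view (copy_proto=False) vs. a deep copy (copy_proto=True)."""

    def __init__(self, proto: ToyProto, copy_proto: bool = False):
        # A view keeps a reference to the same object; a copy owns an independent clone.
        self._proto = copy.deepcopy(proto) if copy_proto else proto

    @property
    def text(self) -> str:
        return self._proto.text


a = ToyProto()
view = ToyDoc(a, copy_proto=False)
clone = ToyDoc(a, copy_proto=True)
a.text = 'hello'
print(view.text)   # hello (the view sees the change)
print(clone.text)  # prints an empty line: the copy is unaffected
```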

You can leverage the convert_a_to_b() interface to convert between content forms.

Parameters
  • obj (Union[ForwardRef, ForwardRef, None]) – the document to construct from. If bytes are given, a DocumentProto is deserialized from them; if a dict is given, a DocumentProto is parsed from it; if a str is given, it is treated as a JSON string and a DocumentProto is parsed from it; finally, a DocumentProto can also be given directly, in which case copy determines whether a view or a copy is built from it.

  • copy (bool) – when the document is given as a DocumentProto object, build a deep copy of it if True; otherwise, build a view (i.e. weak reference) of it.

  • field_resolver (Dict[str, str]) – a map from field names defined in the JSON or dict to the field names defined in Document.

  • kwargs – other parameters to be set _after_ the document is constructed

Note

When the document is given as a JSON string or a Python dictionary, the constructor maps only the values of known fields defined in Protobuf; all unknown fields are mapped to document.tags. For example,

from jina import Document

d = Document({'id': '123', 'hello': 'world', 'tags': {'good': 'bye'}})

assert d.id == '123'  # true
assert d.tags['hello'] == 'world'  # true
assert d.tags['good'] == 'bye'  # true

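The mapping rule in the note above can be sketched in plain Python. KNOWN_FIELDS is a hypothetical subset chosen for illustration (the real set comes from the DocumentProto schema), and split_known_fields() is not a Jina function.

```python
from typing import Any, Dict

# Hypothetical subset of field names; the real set is defined by the DocumentProto schema.
KNOWN_FIELDS = {'id', 'text', 'uri', 'mime_type', 'tags'}


def split_known_fields(data: Dict[str, Any]) -> Dict[str, Any]:
    """Keep known fields and move every unknown key into `tags`."""
    result: Dict[str, Any] = {'tags': dict(data.get('tags', {}))}
    for key, value in data.items():
        if key == 'tags':
            continue  # already copied above
        if key in KNOWN_FIELDS:
            result[key] = value
        else:
            result['tags'][key] = value  # unknown field ends up in tags
    return result


parsed = split_known_fields({'id': '123', 'hello': 'world', 'tags': {'good': 'bye'}})
print(parsed)  # {'tags': {'good': 'bye', 'hello': 'world'}, 'id': '123'}
```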
property weight: float
Return type

float

Returns

the weight of the document

property modality: str
Return type

str

Returns

the modality of the document.

property tags: docarray.simple.struct.StructView

Return the tags field of this Document as a Python dict

Return type

StructView

Returns

a Python dict view of the tags.

property id: str

The document id as a string.

Return type

str

Returns

the id of this Document

property parent_id: str

The document’s parent id as a string.

Return type

str

Returns

the parent id of this Document

property blob: ArrayType

Return blob, one of the content forms of a Document.

Note

Use content to return the content of a Document

This property returns the blob of the Document as a dense or sparse array, depending on the actual proto instance stored. If the stored blob is sparse, it is returned as a COO matrix.

Return type

ArrayType

Returns

the blob content of this Document
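A COO (coordinate) matrix stores only the non-zero entries of an array as (row, column, value) triples; scipy.sparse.coo_matrix exposes them as its row, col and data attributes. The sketch below shows the idea in pure Python for a 2-D list; dense_to_coo() is a hypothetical helper, not part of Jina.

```python
from typing import List, Tuple


def dense_to_coo(dense: List[List[float]]) -> Tuple[List[int], List[int], List[float]]:
    """Return (row indices, column indices, values) for the non-zero entries."""
    rows, cols, vals = [], [], []
    for i, row in enumerate(dense):
        for j, value in enumerate(row):
            if value != 0:
                rows.append(i)
                cols.append(j)
                vals.append(value)
    return rows, cols, vals


rows, cols, vals = dense_to_coo([[0, 2.0, 0], [1.5, 0, 0]])
print(rows, cols, vals)  # [0, 1] [1, 0] [2.0, 1.5]
```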

property embedding: ArrayType

Return embedding of the content of a Document.

Note

This property returns the embedding of the Document as a dense or sparse array, depending on the actual proto instance stored. If the stored embedding is sparse, it is returned as a COO matrix.

Return type

ArrayType

Returns

the embedding of this Document

property matches: MatchArray

Get all matches of the current document.

Return type

MatchArray

Returns

the array of matches attached to this document

property chunks: ChunkArray

Get all chunks of the current document.

Return type

ChunkArray

Returns

the array of chunks of this document

property buffer: bytes

Return buffer, one of the content forms of a Document.

Note

Use content to return the content of a Document

Return type

bytes

Returns

the buffer bytes from this document

property text: str

Return text, one of the content forms of a Document.

Note

Use content to return the content of a Document

Return type

str

Returns

the text from this document content

property uri: str

Return the URI of the document.

Return type

str

Returns

the uri of this Document

property mime_type: str

Get MIME type of the document

Return type

str

Returns

the mime_type of this Document

property granularity: int

Return the granularity of the document.

Return type

int

Returns

the granularity of this Document

property adjacency: int

Return the adjacency of the document.

Return type

int

Returns

the adjacency of this Document

property scores

Return the scores of the document.

Returns

the scores attached to this document as a NamedScoreMap

property evaluations: docarray.simple.map.NamedScoreMap

Return the evaluations of the document.

Return type

NamedScoreMap

Returns

the evaluations attached to this document as a NamedScoreMap

property location: Tuple[float]

Get the location information.

Return type

Tuple[float]

Returns

location info in a tuple.

property offset: float

Get the offset information of this Document.

Return type

float

Returns

the offset

class jina.DocumentArray(docs=None)

Bases: docarray.array.mixins.AllMixins, collections.abc.MutableSequence

DocumentArray is a mutable sequence of Document objects. It gives an efficient view of a list of Documents. One can iterate over it like a generator, but ALSO modify it, count it, get items from it, or union two DocumentArrays using the ‘+’ and ‘+=’ operators.

It is meant to act as a view containing a pointer to a RepeatedContainer of DocumentProto, while returning native Jina Document objects when getting items or iterating over it.

Parameters

docs (Optional[ForwardRef]) – the document array to construct from. One can also give a DocumentArrayProto directly; then, depending on copy, it builds a view or a copy from it. It can also accept a List.
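Because DocumentArray builds on collections.abc.MutableSequence, most list-like behaviour comes from the standard mixin methods once five abstract methods exist. The sketch below is a hypothetical list-backed ToyArray, not Jina's implementation: it shows that append(), extend(), count() and += come for free, while + has to be defined explicitly.

```python
from collections.abc import MutableSequence
from typing import Iterable, List, Optional


class ToyArray(MutableSequence):
    """Hypothetical list-backed container mimicking DocumentArray's sequence behaviour."""

    def __init__(self, items: Optional[Iterable] = None):
        self._items: List = list(items) if items is not None else []

    # The five abstract methods MutableSequence requires:
    def __getitem__(self, index):
        return self._items[index]

    def __setitem__(self, index, value):
        self._items[index] = value

    def __delitem__(self, index):
        del self._items[index]

    def __len__(self) -> int:
        return len(self._items)

    def insert(self, index: int, value) -> None:
        self._items.insert(index, value)

    # `+` is not provided by MutableSequence, so define it explicitly.
    def __add__(self, other: Iterable) -> 'ToyArray':
        return ToyArray([*self._items, *other])


a = ToyArray(['d1', 'd2'])
b = a + ToyArray(['d3'])  # new container; `a` is unchanged
a += ['d4']               # __iadd__ comes from MutableSequence (it calls extend)
a.append('d5')            # append/extend/reverse/pop are mixin methods too
print(list(a), list(b), a.count('d1'))  # ['d1', 'd2', 'd4', 'd5'] ['d1', 'd2', 'd3'] 1
```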

insert(index, doc)

Insert doc at index.

Parameters
  • index (int) – Position of the insertion.

  • doc (Document) – The doc to be inserted.

Return type

None

append(doc)

Append doc to DocumentArray.

Parameters

doc (Document) – The doc to be appended.

extend(docs)

Extend the DocumentArray by appending all the items from the iterable.

Parameters

docs (Iterable[Document]) – the iterable of Documents to extend this array with

Return type

None

clear()

Clear the data of DocumentArray.

reverse()

Reverse the sequence in place.

sort(key, top_k=None, reverse=False)

Sort the items of the DocumentArray in place.

Parameters
  • key (Callable) – key callable to sort based upon

  • top_k (Optional[int]) – if set, only guarantee that the first top_k elements are correctly sorted, rather than sorting the entire list

  • reverse (bool) – reverse=True will sort the list in descending order. Default is False
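The top_k option trades a full sort for a partial one. A minimal sketch of that idea, using heapq instead of Jina's actual code: select the indices of the top_k items in O(n log top_k), place them first in sorted order, and leave the rest in their original relative order. sort_top_k() is a hypothetical helper, and plain floats stand in for Documents.

```python
import heapq
from typing import Callable, List, Optional


def sort_top_k(items: List, key: Callable, top_k: Optional[int] = None, reverse: bool = False) -> None:
    """Sort `items` in place; with top_k, only the first top_k positions are guaranteed sorted."""
    if top_k is None or top_k >= len(items):
        items.sort(key=key, reverse=reverse)
        return
    select = heapq.nlargest if reverse else heapq.nsmallest
    # Pick the indices of the top_k items without sorting all n of them.
    head_idx = select(top_k, range(len(items)), key=lambda i: key(items[i]))
    chosen = set(head_idx)
    head = [items[i] for i in head_idx]  # already in sorted order
    rest = [x for i, x in enumerate(items) if i not in chosen]  # original relative order
    items[:] = head + rest


scores = [0.3, 0.9, 0.1, 0.7, 0.5]
sort_top_k(scores, key=lambda s: s, top_k=2, reverse=True)
print(scores)  # [0.9, 0.7, 0.3, 0.1, 0.5]
```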

class jina.DocumentArrayMemmap(path=None, key_length=36, buffer_pool_size=1000)

Bases: docarray.array.mixins.AllMixins, collections.abc.MutableSequence

Create a memory map to a DocumentArray stored in binary files on disk.

Memory-mapped files are used to access the Documents of a large DocumentArray on disk without reading the entire file into memory.

The DocumentArrayMemmap on-disk storage consists of two files:
  • header.bin: stores id, offset, length and boundary info of each Document in body.bin;

  • body.bin: stores the Documents contiguously

When loading DocumentArrayMemmap, it loads the content of header.bin into memory, while storing all body.bin data on disk. As header.bin is often much smaller than body.bin, memory is saved.
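The header/body split can be sketched with Python's mmap module. This is a simplified illustration, not Jina's on-disk format: it uses a hypothetical header.json mapping each id to (offset, length) instead of the binary header.bin, and write_store()/read_doc() are hypothetical helpers.

```python
import json
import mmap
import os
import tempfile
from typing import Dict, Tuple


def write_store(folder: str, docs: Dict[str, bytes]) -> None:
    """Write every serialized doc contiguously to body.bin and record (offset, length) per id."""
    header: Dict[str, Tuple[int, int]] = {}
    offset = 0
    with open(os.path.join(folder, 'body.bin'), 'wb') as body:
        for doc_id, payload in docs.items():
            body.write(payload)
            header[doc_id] = (offset, len(payload))
            offset += len(payload)
    with open(os.path.join(folder, 'header.json'), 'w') as fh:
        json.dump(header, fh)


def read_doc(folder: str, doc_id: str) -> bytes:
    """Load only the small header into memory, then slice one doc out of the memory-mapped body."""
    with open(os.path.join(folder, 'header.json')) as fh:
        header = json.load(fh)
    offset, length = header[doc_id]
    with open(os.path.join(folder, 'body.bin'), 'rb') as body:
        with mmap.mmap(body.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            return mm[offset:offset + length]  # only these pages are read from disk


with tempfile.TemporaryDirectory() as tmp:
    write_store(tmp, {'a': b'first doc', 'b': b'second doc'})
    second = read_doc(tmp, 'b')
print(second)  # b'second doc'
```

For clarity the sketch re-reads the header on every call; a real store would keep the header loaded, which is exactly the part that stays in memory.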

DocumentArrayMemmap also loads a portion of the documents in a memory buffer and keeps the memory documents synced with the disk. This helps ensure that modified documents are persisted to the disk. The memory buffer size is configured with parameter buffer_pool_size which represents the number of documents that the buffer can store.

Note

To make sure the documents you modify are persisted to disk, do not reference more documents than the buffer pool size. Otherwise, some of them are evicted from the buffer pool, and later modifications made through your references to them will not be persisted. The best practice is to always access documents through the DocumentArrayMemmap (DAM) itself.
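The pitfall in the note above can be reproduced with a toy LRU buffer pool. ToyBufferedStore is hypothetical and only mimics the behaviour described here (a plain dict stands in for body.bin); it does not reflect Jina's internal classes.

```python
import copy
from collections import OrderedDict
from typing import Dict


class ToyBufferedStore:
    """Hypothetical store: `disk` stands in for body.bin; at most `pool_size` docs live in memory."""

    def __init__(self, disk: Dict[str, dict], pool_size: int):
        self.disk = disk
        self.pool_size = pool_size
        self.buffer: 'OrderedDict[str, dict]' = OrderedDict()

    def __getitem__(self, key: str) -> dict:
        if key in self.buffer:
            self.buffer.move_to_end(key)  # mark as most recently used
            return self.buffer[key]
        if len(self.buffer) >= self.pool_size:
            old_key, old_doc = self.buffer.popitem(last=False)  # evict least recently used
            self.disk[old_key] = copy.deepcopy(old_doc)  # write it back before dropping it
        doc = copy.deepcopy(self.disk[key])  # a fresh in-memory object
        self.buffer[key] = doc
        return doc

    def flush(self) -> None:
        """Persist every buffered doc to `disk`."""
        for key, doc in self.buffer.items():
            self.disk[key] = copy.deepcopy(doc)


store = ToyBufferedStore({k: {'text': ''} for k in 'abc'}, pool_size=2)
doc_a = store['a']           # keep a reference to 'a'
store['b']
store['c']                   # pool is full: 'a' is written back and evicted
doc_a['text'] = 'lost'       # modifies an object the store no longer tracks
store['b']['text'] = 'kept'  # modified through the store while buffered
store.flush()
print(store.disk['a'], store.disk['b'])  # {'text': ''} {'text': 'kept'}
```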

This class is designed to work similarly to DocumentArray but differs in the following aspects:
  • you can set the attributes of elements in a DocumentArrayMemmap, but you need to make sure that you don’t reference more documents than the buffer pool size

  • each document

To convert between a DocumentArrayMemmap and a DocumentArray:

from jina import DocumentArray, DocumentArrayMemmap

# convert from DocumentArrayMemmap to DocumentArray
dam = DocumentArrayMemmap('./tmp')
...

da = DocumentArray(dam)

# convert from DocumentArray to DocumentArrayMemmap
# (use a different path, otherwise the docs are appended to the memmap at './tmp' again)
dam2 = DocumentArrayMemmap('./tmp2')
dam2.extend(da)

insert(index, doc)

Insert doc at index.

Parameters
  • index (int) – the offset index of the insertion.

  • doc (Document) – the doc to be inserted.

Return type

None

reload()

Reload the header of this object from disk.

This function is useful when another thread/process modifies the on-disk storage and the change has not yet been reflected in this DocumentArray object.

This function only reloads the header, not the body.

extend(docs)

Extend the DocumentArrayMemmap by appending all the items from the iterable.

Parameters

docs (Iterable[Document]) – the iterable of Documents to extend this array with

Return type

None

clear()

Clear the on-disk data of DocumentArrayMemmap

Return type

None

append(doc, flush=True, update_buffer=True)

Append doc to DocumentArrayMemmap.

Parameters
  • doc (Document) – The doc to be appended.

  • update_buffer (bool) – If set, update the buffer.

  • flush (bool) – If set, then flush to disk on done.

Return type

None

flush()

Persist the documents loaded in memory to disk.

Return type

None

prune()

Prune deleted Documents from this object; this yields a smaller on-disk storage.

Return type

None

property physical_size: int

Return the on-disk physical size of this DocumentArrayMemmap, in bytes

Return type

int

Returns

the number of bytes

property path: str

Get the path where the instance is stored.

Return type

str

Returns

The stored path of the memmap instance.

jina.Executor

alias of jina.executors.BaseExecutor

class jina.Flow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', https: Optional[bool] = False, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, **kwargs)
class jina.Flow(*, compress: Optional[str] = 'NONE', compress_min_bytes: Optional[int] = 1024, compress_min_ratio: Optional[float] = 1.1, cors: Optional[bool] = False, ctrl_with_ipc: Optional[bool] = True, daemon: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_public: Optional[bool] = False, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', host_out: Optional[str] = '0.0.0.0', hosts_in_connect: Optional[List[str]] = None, log_config: Optional[str] = None, memory_hwm: Optional[int] = -1, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, on_error_strategy: Optional[str] = 'IGNORE', port_ctrl: Optional[int] = None, port_expose: Optional[int] = None, port_in: Optional[int] = None, port_out: Optional[int] = None, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reduce: Optional[bool] = False, replicas: Optional[int] = 1, runs_in_docker: Optional[bool] = False, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCRuntime', shards: Optional[int] = 1, socket_in: Optional[str] = 'PULL_CONNECT', socket_out: Optional[str] = 'PUSH_CONNECT', ssh_keyfile: Optional[str] = None, ssh_password: Optional[str] = None, ssh_server: Optional[str] = None, static_routing_table: Optional[bool] = False, timeout_ctrl: Optional[int] = 5000, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, zmq_identity: Optional[str] = None, **kwargs)
class jina.Flow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reduce: Optional[bool] = False, static_routing_table: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = './', **kwargs)

Bases: jina.clients.mixin.PostMixin, jina.jaml.JAMLCompatible, contextlib.ExitStack

Flow is how Jina streamlines and distributes Executors.

property last_pod

Last pod

needs(needs, name='joiner', *args, **kwargs)

Add a blocker to the Flow that waits until all peas defined in needs have completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner (default: joiner)

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

Flow

Returns

the modified Flow

needs_all(name='joiner', *args, **kwargs)

Collect all hanging Pods so far and add a blocker to the Flow that waits until all hanging peas have completed.

Parameters
  • name (str) – the name of this joiner (default is joiner)

  • args – additional positional arguments which are forwarded to the add and needs function

  • kwargs – additional key value arguments which are forwarded to the add and needs function

Return type

Flow

Returns

the modified Flow
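Conceptually, needs() and needs_all() add a joiner node to the Flow's dependency graph. The sketch below models that graph as a dict from each Pod to the Pods it needs, treats "hanging" Pods as those no other Pod consumes, and uses graphlib (Python 3.9+) to show that the joiner is scheduled only after everything it waits for. The topology and hanging_pods() are hypothetical illustrations, not Jina internals.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, List


def hanging_pods(needs: Dict[str, List[str]]) -> List[str]:
    """Pods that no other Pod lists in its needs, i.e. outputs nobody consumes yet."""
    consumed = {upstream for ups in needs.values() for upstream in ups}
    return [pod for pod in needs if pod not in consumed and pod != 'gateway']


# Hypothetical topology: two branches, both fed by the gateway.
topology = {'gateway': [], 'encoder': ['gateway'], 'indexer': ['encoder'], 'cache': ['gateway']}
topology['joiner'] = hanging_pods(topology)  # roughly what needs_all() adds
order = list(TopologicalSorter(topology).static_order())
print(topology['joiner'])  # ['indexer', 'cache']
print(order[-1])           # joiner
```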

add(*, connect_to_predecessor: Optional[bool] = False, ctrl_with_ipc: Optional[bool] = False, daemon: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, expose_public: Optional[bool] = False, external: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', host_out: Optional[str] = '0.0.0.0', hosts_in_connect: Optional[List[str]] = None, install_requirements: Optional[bool] = False, log_config: Optional[str] = None, memory_hwm: Optional[int] = -1, name: Optional[str] = None, native: Optional[bool] = False, on_error_strategy: Optional[str] = 'IGNORE', peas_hosts: Optional[List[str]] = None, polling: Optional[str] = 'ANY', port_ctrl: Optional[int] = None, port_in: Optional[int] = None, port_jinad: Optional[int] = 8000, port_out: Optional[int] = None, pull_latest: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, reduce: Optional[bool] = False, replicas: Optional[int] = 1, runs_in_docker: Optional[bool] = False, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'ZEDRuntime', scheduling: Optional[str] = 'LOAD_BALANCE', shards: Optional[int] = 1, socket_in: Optional[str] = 'PULL_BIND', socket_out: Optional[str] = 'PUSH_BIND', ssh_keyfile: Optional[str] = None, ssh_password: Optional[str] = None, ssh_server: Optional[str] = None, static_routing_table: Optional[bool] = False, timeout_ctrl: Optional[int] = 5000, timeout_ready: Optional[int] = 600000, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, workspace: Optional[str] = None, zmq_identity: Optional[str] = None, **kwargs) → Union['Flow', 'AsyncFlow']

Add a Pod to the current Flow object and return the (possibly new) modified Flow object. The attributes of the Pod can later be changed with set() or deleted with remove().

Parameters
  • needs (Union[str, Tuple[str], List[str], None]) – the name of the Pod(s) that this Pod receives data from. One can also use ‘gateway’ to indicate the connection with the gateway.

  • pod_role (PodRoleType) – the role of the Pod, used for visualization and route planning

  • copy_flow (bool) – if True, copy the current Flow, apply the modification to the copy and return it; otherwise, modify the Flow in place

  • kwargs – other keyword-value arguments that the Pod CLI supports

Return type

Flow

Returns

a (new) Flow object with modification

inspect(name='inspect', *args, **kwargs)

Add an inspection on the last changed Pod in the Flow.

Internally, it adds two Pods to the Flow. The overhead is minimal, and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

Flow -- PUB-SUB -- BasePod(_pass) -- Flow
        |
        -- PUB-SUB -- InspectPod (Hanging)

In this way, InspectPod looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.

See also

gather_inspect()

Parameters
  • name (str) – name of the Pod

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

Flow

Returns

the new instance of the Flow

gather_inspect(name='gather_inspect', include_last_pod=True, *args, **kwargs)

Gather the output of all inspect Pods into one Pod. If the Flow has no inspect Pod, the Flow itself is returned.

Note

If --no-inspect is not given, gather_inspect() is called automatically before build(), so in general you don’t need to call gather_inspect() manually.

Parameters
  • name (str) – the name of the gather Pod

  • include_last_pod (bool) – whether to include the last modified Pod in the Flow

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type

Flow

Returns

the modified Flow or the copy of it

See also

inspect()

build(copy_flow=False)

Build the current Flow and make it ready to use.

Note

Since 0.0.8, there is no need to call it manually: build() is invoked automatically when the Flow is used as a context manager or when start() is called.

Parameters

copy_flow (bool) – if True, copy the current Flow, apply the modification to the copy and return it; otherwise, modify the Flow in place

Return type

Flow

Returns

the current Flow (by default)

Note

copy_flow=True is recommended if you are building the same Flow multiple times in a row, e.g.

from jina import Flow

f = Flow()
with f:
    f.index()

# build a fresh copy instead of re-building `f` in place
with f.build(copy_flow=True) as fl:
    fl.search()

start()

Start running all Pods in this Flow.

Remember to close the Flow with close().

Note that this method is subject to the timeout_ready timeout set in the CLI, which is inherited all the way from jina.peapods.peas.BasePea.

Returns

this instance

property num_pods: int

Get the number of Pods in this Flow

Return type

int

property num_peas: int

Get the number of peas (shards count) in this Flow

Return type

int

property client: BaseClient

Return a BaseClient object attached to this Flow.

Return type

BaseClient

plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)

Visualize the Flow up to the current point. If a file name is provided, an image with that name is created; otherwise, the URL of the mermaid diagram is displayed. If called within an IPython notebook, the Flow is rendered inline; otherwise, an image is created.

Example:

from jina import Flow
flow = Flow().add(name='pod_a').plot('flow.svg')

Parameters
  • output (Optional[str]) – a filename specifying the name of the image to be created; the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – top-down (True) or left-right (False) layout

  • inline_display (bool) – show the image directly inside the Jupyter Notebook

  • build (bool) – build the Flow before plotting, so that the gateway connections can be shown more accurately

  • copy_flow (bool) – if True, copy the current Flow, apply the modification to the copy and return it; otherwise, modify the Flow in place

Return type

Flow

Returns

the Flow

property port_expose: int

Return the exposed port of the gateway

Return type

int

property host: str

Return the local address of the gateway

Return type

str

property address_private: str

Return the private IP address of the gateway, for connecting from other machines in the same network

Return type

str

property address_public: str

Return the public IP address of the gateway, for connecting from other machines in the public network

Return type

str

block(stop_event=None)

Block the Flow until stop_event is set or the user hits Ctrl+C (KeyboardInterrupt).

Parameters

stop_event (Union[Event, Event, None]) – a threading event or a multiprocessing event that, once set, returns control to the main thread.
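The stop_event contract can be sketched with threading.Event (multiprocessing.Event offers the same set()/wait(timeout) interface). toy_block() is a hypothetical stand-in for Flow.block(), not its implementation; waiting in short slices is one common way to keep the loop responsive to Ctrl+C.

```python
import threading
from typing import Optional


def toy_block(stop_event: Optional[threading.Event] = None) -> str:
    """Hypothetical stand-in for Flow.block(): park the caller until the event is set or Ctrl+C."""
    if stop_event is None:
        stop_event = threading.Event()  # never set, so only Ctrl+C ends the loop
    try:
        # Waiting in short slices keeps the loop responsive to KeyboardInterrupt.
        while not stop_event.wait(timeout=0.05):
            pass
        return 'stopped by event'
    except KeyboardInterrupt:
        return 'stopped by Ctrl+C'


stop = threading.Event()
# Another thread decides when the blocked main thread may continue.
timer = threading.Timer(0.1, stop.set)
timer.start()
outcome = toy_block(stop)
timer.join()
print(outcome)  # stopped by event
```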

property protocol: jina.enums.GatewayProtocolType

Return the protocol of this Flow

Return type

GatewayProtocolType

Returns

the protocol of this Flow

property workspace: str

Return the workspace path of the flow.

Return type

str

property workspace_id: Dict[str, str]

Get all Pods’ workspace_id values in a dict

Return type

Dict[str, str]

property env: Optional[Dict]

Get all envs to be set in the Flow

Return type

Optional[Dict]

Returns

envs as dict

property identity: Dict[str, str]

Get all Pods’ identity values in a dict

Return type

Dict[str, str]

expose_endpoint(exec_endpoint: str, path: Optional[str] = None)
expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)

Expose an Executor’s endpoint (defined by @requests(on=…)) as an HTTP endpoint for easier access.

Once exposed, you can send data requests directly to http://hostname:port/endpoint.

Parameters

exec_endpoint (str) – the endpoint string, by convention starts with /


join(needs, name='joiner', *args, **kwargs)

Add a blocker to the Flow that waits until all peas defined in needs have completed.

Parameters
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner (default: joiner)

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type

Flow

Returns

the modified Flow

rolling_update(pod_name, dump_path=None, *, uses_with=None)

Reload all replicas of a pod sequentially.

Parameters
  • pod_name (str) – pod to update

  • dump_path (Optional[str]) – kept for backwards compatibility: this function used to accept dump_path as the only argument to override

  • uses_with (Optional[Dict]) – a dictionary of arguments to restart the executor with

scale(pod_name, replicas)

Scale the number of replicas of a given Executor.

Parameters
  • pod_name (str) – pod to update

  • replicas (int) – The number of replicas to scale to

property client_args: argparse.Namespace

Get Client settings.

Return type

Namespace

property gateway_args: argparse.Namespace

Get Gateway settings.

Return type

Namespace

update_network_interface(**kwargs)

Update the network interface of this Flow (affects Gateway & Client)

Parameters

kwargs – new network settings

jina.requests(func=None, *, on=None)

@requests defines when a function will be invoked. Its keyword argument on= defines the endpoint.

A method of an Executor class decorated with plain @requests (without on=) is the default handler for all endpoints, i.e. it is the fallback handler for endpoints that have no dedicated handler.

Parameters
  • func (Callable[[ForwardRef, Dict, ForwardRef, List[ForwardRef], List[ForwardRef]], Union[ForwardRef, Dict, None]]) – the method to decorate

  • on (Union[str, Sequence[str], None]) – the endpoint string, by convention starts with /

Returns

decorated function
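The routing rule above (explicit on= endpoints first, plain @requests as the fallback) can be illustrated with a toy decorator. toy_requests and ToyExecutor are hypothetical and only mimic the described semantics; Jina's real decorator and dispatch work differently internally.

```python
from typing import Callable, Dict, Optional, Sequence, Union


def toy_requests(func: Optional[Callable] = None, *, on: Union[str, Sequence[str], None] = None):
    """Hypothetical re-implementation: tag a method with the endpoints it serves."""
    def decorate(f: Callable) -> Callable:
        if on is None:
            f._endpoints = ('__default__',)  # plain decorator: fallback handler
        elif isinstance(on, str):
            f._endpoints = (on,)
        else:
            f._endpoints = tuple(on)
        return f
    # Support both `@toy_requests` and `@toy_requests(on=...)`.
    return decorate(func) if func is not None else decorate


class ToyExecutor:
    @toy_requests(on='/index')
    def index(self, docs):
        return f'index {docs}'

    @toy_requests
    def default(self, docs):
        return f'default {docs}'

    def handle(self, endpoint: str, docs):
        routes: Dict[str, Callable] = {}
        for name in dir(self):
            method = getattr(self, name)
            for ep in getattr(method, '_endpoints', ()):
                routes[ep] = method
        # Unknown endpoints fall back to the plain-decorated handler.
        return routes.get(endpoint, routes['__default__'])(docs)


ex = ToyExecutor()
print(ex.handle('/index', 'd1'))   # index d1
print(ex.handle('/search', 'd1'))  # default d1
```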