jina package#

Subpackages#

Submodules#

Module contents#

Top-level module of Jina.

The primary function of this module is to import all of the public Jina interfaces into a single place. The interfaces themselves are located in sub-modules, as described below.

class jina.AsyncFlow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, port: Optional[int] = None, protocol: Optional[Union[str, List[str]]] = 'GRPC', proxy: Optional[bool] = False, tls: Optional[bool] = False, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, **kwargs)[source]#
class jina.AsyncFlow(*, compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_metadata: Optional[str] = '{}', deployments_no_reduce: Optional[str] = '[]', description: Optional[str] = None, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, floating: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, port: Optional[int] = None, port_monitoring: Optional[int] = None, prefetch: Optional[int] = 1000, protocol: Optional[Union[str, List[str]]] = ['GRPC'], proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'GatewayRuntime', ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, uses: Optional[Union[str, Type[BaseExecutor], dict]] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
class jina.AsyncFlow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)

Bases: AsyncPostMixin, AsyncProfileMixin, AsyncHealthCheckMixin, Flow

Asynchronous version of jina.Flow. They share the same interface, except that in AsyncFlow the train(), index() and search() methods are coroutines (i.e. declared with the async/await syntax); simply calling them will not schedule them for execution. To actually run a coroutine, users need to put it on an event loop, e.g. via asyncio.run() or asyncio.create_task().

AsyncFlow can be very useful in integration settings, where Jina/Jina Flow is NOT the main logic but rather serves as part of another program. In this case, users often do not want to let Jina control the asyncio event loop. By contrast, Flow controls and wraps the event loop internally, making the Flow look synchronous from the outside.

In particular, AsyncFlow makes using Jina in a Jupyter Notebook more natural and reliable. For example, the following code will use the event loop already spawned by Jupyter/IPython to run the Jina Flow (instead of creating a new one).

from jina import AsyncFlow
from jina.types.document.generators import from_ndarray
import numpy as np

with AsyncFlow().add() as f:
    await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Notice that the above code will NOT work in the standard Python REPL, as only Jupyter/IPython implements “autoawait”.

Another example is using Jina as an integration. Say you have another IO-bound job heavylifting(); you can use this feature to schedule Jina's index() and heavylifting() concurrently, as sketched below.
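
A minimal sketch of that pattern, mirroring the awaitable index() call from the Jupyter example above; heavylifting() is a hypothetical stand-in for your own IO-bound coroutine:

import asyncio

import numpy as np
from jina import AsyncFlow
from jina.types.document.generators import from_ndarray


async def heavylifting():  # hypothetical IO-bound job
    await asyncio.sleep(1)


async def main():
    with AsyncFlow().add() as f:
        # schedule Jina indexing and the other job concurrently
        await asyncio.gather(
            f.index(from_ndarray(np.random.random([5, 4]))),
            heavylifting(),
        )


# in a notebook: await main(); in a script: asyncio.run(main())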

One can think of Flow as a Jina-managed event loop, whereas AsyncFlow is a self-managed event loop.

Create a Flow. Flow is how Jina streamlines and scales Executors.

EXAMPLE USAGE

Python API

from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
with f:
    f.block()  # serve Flow

To and from YAML configuration

from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
f.save_config('flow.yml')  # save YAML config file
f = Flow.load_config('flow.yml')  # load Flow from YAML config
with f:
    f.block()  # serve Flow
Parameters:
  • asyncio – If set, then the input and output of this Client work in an asynchronous manner.

  • host – The host of the Gateway, which the client should connect to, by default it is 0.0.0.0.

  • metrics – If set, the SDK implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • port – The port of the Gateway, which the client should connect to.

  • protocol – Communication protocol between server and client.

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy.

  • tls – If set, connect to gateway using tls encryption

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the SDK implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and custom span creation. Otherwise a no-op implementation will be provided.

  • compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • cors – If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.

  • deployments_addresses – JSON dictionary with the input addresses of each Deployment

  • deployments_metadata – JSON dictionary with the request metadata for each Deployment

  • deployments_no_reduce – JSON list disabling the built-in merging mechanism for each listed Deployment

  • description – The description of this HTTP server. It will be used in automatic docs such as Swagger UI.

  • docker_kwargs

    Dictionary of kwargs arguments that will be passed to the Docker SDK when starting the Docker container.

    More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/

  • entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image’s ENTRYPOINT takes effect.

  • env – The map of environment variables that are available inside runtime

  • expose_endpoints – A JSON string that represents a map from executor endpoints (@requests(on=…)) to HTTP endpoints.

  • expose_graphql_endpoint – If set, /graphql endpoint is added to HTTP interface.

  • floating – If set, the current Pod/Deployment cannot be further chained, and the next .add() will chain after the last Pod/Deployment, not the current one.

  • graph_conditions – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.

  • graph_description – Routing graph for the gateway

  • grpc_server_options – Dictionary of kwargs arguments that will be passed to the gRPC server as options when starting it, for example: {‘grpc.max_send_message_length’: -1}

  • host – The host address of the runtime, by default it is 0.0.0.0.

  • log_config – The YAML config of the logger used in this object.

  • metrics – If set, the SDK implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • monitoring – If set, spawn an HTTP server with a Prometheus endpoint to expose metrics

  • name

    The name of this object.

    This will be used in the following places:
    • how you refer to this object in Python/YAML/CLI
    • visualization
    • log message header
    • …

    When not given, then the default naming strategy will apply.

  • no_crud_endpoints

    If set, /index, /search, /update, /delete endpoints are removed from HTTP interface.

    Any executor that has @requests(on=…) bound with those values will receive data requests.

  • no_debug_endpoints – If set, /status /post endpoints are removed from HTTP interface.

  • port – The port for input data to bind the gateway server to; by default, random ports in the range [49152, 65535] will be assigned. The port argument can be either a single value, when only one protocol is used, or multiple values, when multiple protocols are used.

  • port_monitoring – The port on which the Prometheus server is exposed, default is a random port between [49152, 65535]

  • prefetch

    Number of requests fetched from the client before feeding into the first Executor.

    Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)

  • protocol – Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the protocol(s) that suit you from: [‘GRPC’, ‘HTTP’, ‘WEBSOCKET’].

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy.

  • py_modules

    The custom Python modules that need to be imported before loading the gateway

    Note that the recommended way is to import only a single module: a simple Python file, if your gateway can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a Python package.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • reload – If set, the Gateway will restart while serving if YAML configuration source is changed.

  • retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

  • runtime_cls – The runtime class to run inside the Pod

  • ssl_certfile – the path to the certificate file

  • ssl_keyfile – the path to the key file

  • timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever

  • timeout_ready – The timeout in milliseconds a Pod waits for the runtime to be ready, -1 for waiting forever

  • timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default

  • title – The title of this HTTP server. It will be used in automatic docs such as Swagger UI.

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the SDK implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and custom span creation. Otherwise a no-op implementation will be provided.

  • uses

    The config of the gateway. It can be one of the following:
    • the string literal of a Gateway class name
    • a Gateway YAML file (.yml, .yaml, .jaml)
    • a docker image (must start with docker://)
    • the string literal of a YAML config (must start with ! or `jtype: `)
    • the string literal of a JSON config

    When used in Python, the following values are additionally accepted:
    • a Python dict that represents the config
    • a text file stream that has a .read() interface

  • uses_with – Dictionary of keyword arguments that will override the with configuration in uses

  • uvicorn_kwargs

    Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server

    More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

  • env – The map of environment variables that are available inside runtime

  • inspect

    The strategy for the inspect deployments in the Flow.

    If REMOVE is given, all inspect deployments are removed when building the Flow.

  • log_config – The YAML config of the logger used in this object.

  • name

    The name of this object.

    This will be used in the following places:
    • how you refer to this object in Python/YAML/CLI
    • visualization
    • log message header
    • …

    When not given, then the default naming strategy will apply.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • reload – If set, auto-reloading on file changes is enabled: the Flow will restart while blocked if the YAML configuration source is changed. This also applies to the underlying Executors, if their source code or YAML configuration has changed.

  • uses – The YAML path represents a flow. It can be either a local file path or a URL.

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

add(**kwargs)#

Add a Deployment to the current Flow object and return the new, modified Flow object. The attributes of the Deployment can later be changed with set() or deleted with remove().

Parameters:
  • compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • connection_list – dictionary JSON with a list of connections to configure

  • disable_auto_volume – Do not automatically mount a volume for dockerized Executors.

  • docker_kwargs

    Dictionary of kwargs arguments that will be passed to the Docker SDK when starting the Docker container.

    More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/

  • entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image’s ENTRYPOINT takes effect.

  • env – The map of environment variables that are available inside runtime

  • exit_on_exceptions – List of exceptions that will cause the Executor to shut down.

  • external – The Deployment will be considered an external Deployment that has been started independently from the Flow. This Deployment will not be context-managed by the Flow.

  • floating – If set, the current Pod/Deployment cannot be further chained, and the next .add() will chain after the last Pod/Deployment, not the current one.

  • force_update – If set, always pull the latest Hub Executor bundle, even if it exists locally

  • gpus

    This argument allows dockerized Jina Executors to discover local GPU devices.

    Note:
    • To access all GPUs, use --gpus all.
    • To access multiple GPUs, e.g. make use of 2 GPUs, use --gpus 2.
    • To access specific GPUs based on device id, use --gpus device=[YOUR-GPU-DEVICE-ID]
    • To access specific GPUs based on multiple device ids, use --gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]
    • To specify more parameters, use --gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display

  • grpc_metadata – The metadata to be passed to the gRPC request.

  • grpc_server_options – Dictionary of kwargs arguments that will be passed to the gRPC server as options when starting it, for example: {‘grpc.max_send_message_length’: -1}

  • host – The host of the Gateway, which the client should connect to; by default it is 0.0.0.0. In the case of an external Executor (--external or external=True) this can be a list of hosts. Then, every resulting address will be considered one replica of the Executor.

  • install_requirements – If set, try to install requirements.txt from the local Executor if exists in the Executor folder. If using Hub, install requirements.txt in the Hub Executor bundle to local.

  • log_config – The YAML config of the logger used in this object.

  • metrics – If set, the SDK implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • monitoring – If set, spawn an HTTP server with a Prometheus endpoint to expose metrics

  • name

    The name of this object.

    This will be used in the following places:
    • how you refer to this object in Python/YAML/CLI
    • visualization
    • log message header
    • …

    When not given, then the default naming strategy will apply.

  • native – If set, only native Executors are allowed, and the Executor is always run inside the WorkerRuntime.

  • no_reduce – Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a docs_matrix or docs_map

  • output_array_type

    The type of array that tensor and embedding will be serialized to.

    Supports the same types as docarray.to_protobuf(.., ndarray_type=…), which can be found at https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf. Defaults to retaining whatever type is returned by the Executor.

  • polling – The polling strategy of the Deployment and its endpoints (when shards > 1). It can be defined for all endpoints of a Deployment or per endpoint.
    Define per Deployment:
    • ANY: only one (whoever is idle) Pod polls the message
    • ALL: all Pods poll the message (like a broadcast)
    Define per Endpoint: a JSON dict of {endpoint: PollingType}, e.g. {‘/custom’: ‘ALL’, ‘/search’: ‘ANY’, ‘*’: ‘ANY’}

  • port – The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (--external or external=True) this can be a list of ports. Then, every resulting address will be considered one replica of the Executor.

  • port_monitoring – The port on which the Prometheus server is exposed, default is a random port between [49152, 65535]

  • py_modules

    The custom Python modules that need to be imported before loading the executor

    Note that the recommended way is to import only a single module: a simple Python file, if your executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a Python package. For more details, please see the Executor cookbook.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • reload – If set, the Executor will restart while serving if the YAML configuration source or Executor modules are changed. If the YAML configuration is changed, the whole deployment is reloaded and new processes are started. If only the Python modules of the Executor have changed, they are reloaded into the interpreter without restarting the process.

  • replicas – The number of replicas in the deployment

  • retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

  • runtime_cls – The runtime class to run inside the Pod

  • shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/concepts/flow/create-flow/#complex-flow-topologies

  • timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever

  • timeout_ready – The timeout in milliseconds a Pod waits for the runtime to be ready, -1 for waiting forever

  • timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default

  • tls – If set, connect to deployment using tls encryption

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the SDK implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and custom span creation. Otherwise a no-op implementation will be provided.

  • uses

    The config of the executor. It can be one of the following:
    • the string literal of an Executor class name
    • an Executor YAML file (.yml, .yaml, .jaml)
    • a Jina Hub Executor (must start with jinahub:// or jinahub+docker://)
    • a docker image (must start with docker://)
    • the string literal of a YAML config (must start with ! or `jtype: `)
    • the string literal of a JSON config

    When used in Python, the following values are additionally accepted:
    • a Python dict that represents the config
    • a text file stream that has a .read() interface

  • uses_after – The executor attached after the Pods described by --uses, typically used for receiving from all shards; accepted types follow --uses. This argument only applies to sharded Deployments (shards > 1).

  • uses_after_address – The address of the uses-after runtime

  • uses_before – The executor attached before the Pods described by --uses, typically before sending to all shards; accepted types follow --uses. This argument only applies to sharded Deployments (shards > 1).

  • uses_before_address – The address of the uses-before runtime

  • uses_dynamic_batching – Dictionary of keyword arguments that will override the dynamic_batching configuration in uses

  • uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses

  • uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses

  • uses_with – Dictionary of keyword arguments that will override the with configuration in uses

  • volumes

    The path on the host to be mounted inside the container.

    Note:
    • If separated by :, the first part is the local host path and the second part is the path inside the container.
    • If no split is provided, the basename of that directory is mounted into the container’s root path, e.g. --volumes="/user/test/my-workspace" is mounted into /my-workspace inside the container.
    • All volumes are mounted in read-write mode.

  • when – The condition that the documents need to fulfill before reaching the Executor. The condition can be defined in the form of a DocArray query condition (https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions).

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

  • needs – the name of the Deployment(s) that this Deployment receives data from. One can also use “gateway” to indicate the connection with the gateway.

  • deployment_role – the role of the Deployment, used for visualization and route planning

  • copy_flow – when set to True, always copy the current Flow, apply the modification on the copy and return it; otherwise, modify in place

  • kwargs – other keyword-value arguments that the Deployment CLI supports

Returns:

a (new) Flow object with modification

Return type:

Union[Flow, AsyncFlow]

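A short sketch of chaining .add() calls; the Executor names and Hub references here are hypothetical:

from jina import Flow

f = (
    Flow()
    .add(name='encoder', uses='jinahub://MyEncoder')  # hypothetical Hub Executor
    .add(name='indexer', uses='jinahub://MyIndexer')  # chained after 'encoder'
)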

property address_private: str#

Return the private IP address of the gateway for connecting from other machines in the same network

Return type:

str

property address_public: str#

Return the public IP address of the gateway for connecting from other machines on the public network

Return type:

str

block(stop_event=None)#

Block the Flow until stop_event is set or the user hits KeyboardInterrupt.

Parameters:

stop_event (Union[Event, Event, None]) – a threading or multiprocessing event that, once set, returns control from the Flow to the main thread.
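
A minimal sketch of unblocking a served Flow programmatically, assuming a trivial empty Flow:

import threading

from jina import Flow

stop_event = threading.Event()

f = Flow()
with f:
    # any other thread calling stop_event.set() resumes the main thread;
    # here a timer stops serving after ~5 seconds
    threading.Timer(5.0, stop_event.set).start()
    f.block(stop_event=stop_event)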

build(copy_flow=False, disable_build_sandbox=False)#

Build the current Flow and make it ready to use

Note

No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.

Parameters:
  • copy_flow (bool) – when set to True, always copy the current Flow, apply the modification on the copy and return it; otherwise, modify in place

  • disable_build_sandbox (bool) – when set to True, the sandbox building step is skipped; used by plot()

Return type:

Flow

Returns:

the current Flow (by default)

Note

copy_flow=True is recommended if you are building the same Flow multiple times in a row. e.g.

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()

callback(**kwds)#

Registers an arbitrary callback and arguments.

Cannot suppress exceptions.

property client: BaseClient#

Return a BaseClient object attached to this Flow.

Return type:

BaseClient

property client_args: Namespace#

Get Client settings.


Return type:

Namespace

close()#

Immediately unwind the context stack.

config_gateway(args=None, **kwargs)#

Configure the Gateway inside a Flow. The Gateway exposes your Flow logic as a service to the internet according to the protocol and configuration you choose.

Parameters:
  • compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • cors – If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.

  • deployments_addresses – JSON dictionary with the input addresses of each Deployment

  • deployments_metadata – JSON dictionary with the request metadata for each Deployment

  • deployments_no_reduce – JSON list disabling the built-in merging mechanism for each listed Deployment

  • description – The description of this HTTP server. It will be used in automatic docs such as Swagger UI.

  • docker_kwargs

    Dictionary of kwargs arguments that will be passed to the Docker SDK when starting the Docker container.

    More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/

  • entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image’s ENTRYPOINT takes effect.

  • env – The map of environment variables that are available inside runtime

  • expose_endpoints – A JSON string that represents a map from executor endpoints (@requests(on=…)) to HTTP endpoints.

  • expose_graphql_endpoint – If set, /graphql endpoint is added to HTTP interface.

  • floating – If set, the current Pod/Deployment cannot be further chained, and the next .add() will chain after the last Pod/Deployment, not the current one.

  • graph_conditions – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.

  • graph_description – Routing graph for the gateway

  • grpc_server_options – Dictionary of kwargs arguments that will be passed to the gRPC server as options when starting it, for example: {‘grpc.max_send_message_length’: -1}

  • host – The host address of the runtime, by default it is 0.0.0.0.

  • log_config – The YAML config of the logger used in this object.

  • metrics – If set, the SDK implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • monitoring – If set, spawn an HTTP server with a Prometheus endpoint to expose metrics

  • name

    The name of this object.

    This will be used in the following places:
    • how you refer to this object in Python/YAML/CLI
    • visualization
    • log message header
    • …

    When not given, then the default naming strategy will apply.

  • no_crud_endpoints

    If set, /index, /search, /update, /delete endpoints are removed from HTTP interface.

    Any executor that has @requests(on=…) bound with those values will receive data requests.

  • no_debug_endpoints – If set, /status /post endpoints are removed from HTTP interface.

  • port – The port for input data to bind the gateway server to; by default, random ports in the range [49152, 65535] will be assigned. The port argument can be either a single value, when only one protocol is used, or multiple values, when multiple protocols are used.

  • port_monitoring – The port on which the Prometheus server is exposed, default is a random port between [49152, 65535]

  • prefetch

    Number of requests fetched from the client before feeding into the first Executor.

    Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)

  • protocol – Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the protocol(s) that suit you from: [‘GRPC’, ‘HTTP’, ‘WEBSOCKET’].

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy.

  • py_modules

    The custom Python modules that need to be imported before loading the gateway

    Note that the recommended way is to import only a single module: a simple Python file, if your gateway can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a Python package.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • reload – If set, the Gateway will restart while serving if YAML configuration source is changed.

  • retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

  • runtime_cls – The runtime class to run inside the Pod

  • ssl_certfile – the path to the certificate file

  • ssl_keyfile – the path to the key file

  • timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever

  • timeout_ready – The timeout in milliseconds a Pod waits for the runtime to be ready, -1 for waiting forever

  • timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default

  • title – The title of this HTTP server. It will be used in automatic docs such as Swagger UI.

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the SDK implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and custom span creation. Otherwise a no-op implementation will be provided.

  • uses

    The config of the gateway. It can be one of the following:
    • the string literal of a Gateway class name
    • a Gateway YAML file (.yml, .yaml, .jaml)
    • a docker image (must start with docker://)
    • the string literal of a YAML config (must start with ! or `jtype: `)
    • the string literal of a JSON config

    When used in Python, the following values are additionally accepted:
    • a Python dict that represents the config
    • a text file stream that has a .read() interface

  • uses_with – Dictionary of keyword arguments that will override the with configuration in uses

  • uvicorn_kwargs

    Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server

    More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

Return type:

Union[Flow, AsyncFlow]

Returns:

the new Flow object

delete(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]]#
dry_run(**kwargs)#
enter_context(cm)#

Enters the supplied context manager.

If successful, also pushes its __exit__ method as a callback and returns the result of the __enter__ method.

property env: Optional[Dict]#

Get all envs to be set in the Flow

Return type:

Optional[Dict]

Returns:

envs as dict

expose_endpoint(exec_endpoint, **kwargs)#

Expose an Executor’s endpoint (defined by @requests(on=…)) as an HTTP endpoint for easier access.

After exposing, you can send data requests directly to http://hostname:port/endpoint.

Parameters:

exec_endpoint (str) – the endpoint string, by convention starts with /

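A minimal sketch, assuming an HTTP Flow and a hypothetical /foo endpoint bound in some Executor via @requests(on='/foo'):

from jina import Flow

f = Flow(protocol='http').add()
f.expose_endpoint('/foo')  # /foo becomes reachable over HTTP
with f:
    f.block()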

property gateway_args: Namespace#

Get Gateway settings.


Return type:

Namespace

gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)#

Gather the output of all inspect Deployments into one Deployment. When the Flow has no inspect Deployment, the Flow itself is returned.

Note

If --no-inspect is not given, then gather_inspect() is automatically called before build(), so in general you don’t need to call gather_inspect() manually.

Parameters:
  • name (str) – the name of the gather Deployment

  • include_last_deployment (bool) – whether to include the last modified Deployment in the Flow

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type:

Flow

Returns:

the modified Flow or the copy of it

See also

inspect()

property host: str#

Return the local address of the gateway.

Return type:

str

index(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]]#
inspect(name='inspect', *args, **kwargs)#

Add an inspection on the last changed Deployment in the Flow

Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
        |
        -- PUB-SUB -- InspectDeployment (Hanging)

In this way, InspectDeployment looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.

See also

gather_inspect()

Parameters:
  • name (str) – name of the Deployment

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type:

Flow

Returns:

the new instance of the Flow

async is_flow_ready(**kwargs)#

Check if the Flow is ready to receive requests

Parameters:

kwargs – potential kwargs passed from the public interface

Return type:

bool

Returns:

boolean indicating the health/readiness of the Flow

static is_valid_jaml(obj)#

Verify the YAML syntax of a given object by first serializing it, then attempting to deserialize it and catching parser errors.

Parameters:

obj (Dict) – the YAML object

Return type:

bool

Returns:

whether the syntax is valid or not
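
A quick sketch of checking a config dict before loading it; the dict content is illustrative:

from jina import Flow

config = {'jtype': 'Flow', 'version': '1'}
assert Flow.is_valid_jaml(config)  # round-trips through YAML without parser errors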

classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, uses_dynamic_batching=None, **kwargs)#

A high-level interface for loading configuration with features of loading extra py_modules, substitute env & context variables. Any class that implements JAMLCompatible mixin can enjoy this feature, e.g. BaseFlow, BaseExecutor, BaseGateway and all their subclasses.

Support substitutions in YAML:
  • Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).

  • Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.

  • Internal reference via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}

Substitutions are carried out in order and in multiple passes to resolve variables with best effort.

!BaseEncoder
metas:
    name: ${{VAR_A}}  # env or context variables
    workspace: my-${{this.name}}  # internal reference

# load Executor from yaml file
BaseExecutor.load_config('a.yml')

# load Executor from yaml file and substitute environment variables
import os

os.environ['VAR_A'] = 'hello-world'
b = BaseExecutor.load_config('a.yml')
assert b.name == 'hello-world'

# load Executor from yaml file and substitute variables from a dict
b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
assert b.name == 'hello-world'

# disable substitute
b = BaseExecutor.load_config('a.yml', substitute=False)
Parameters:
  • source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.

  • allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any level

  • substitute (bool) – substitute environment, internal reference and context variables.

  • context (Optional[Dict[str, Any]]) – context replacement variables in a dict, the value of the dict is the replacement.

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor yaml files

  • py_modules (Optional[str]) – optional py_module from which the object needs to be loaded

  • runtime_args (Optional[Dict[str, Any]]) – Optional dictionary of parameters runtime_args to be directly passed without being parsed into a yaml config

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • kwargs – kwargs for parse_config_source

Return type:

JAMLCompatible

Returns:

JAMLCompatible object

property monitoring: bool#

Return whether monitoring is enabled.

Return type:

bool

needs(needs, name='joiner', *args, **kwargs)#

Add a blocker to the Flow that waits until all Pods defined in needs have completed.

Parameters:
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type:

Flow

Returns:

the modified Flow
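
A small sketch of joining two parallel branches with needs; the branch names are hypothetical:

from jina import Flow

f = (
    Flow()
    .add(name='branch_a', needs='gateway')
    .add(name='branch_b', needs='gateway')
    .needs(['branch_a', 'branch_b'], name='joiner')  # wait for both branches
)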

needs_all(name='joiner', *args, **kwargs)#

Collect all floating Deployments so far and add a blocker to the Flow; wait until all hanging Pods have completed.

Parameters:
  • name (str) – the name of this joiner (default is joiner)

  • args – additional positional arguments which are forwarded to the add and needs function

  • kwargs – additional key value arguments which are forwarded to the add and needs function

Return type:

Flow

Returns:

the modified Flow

property num_deployments: int#

Get the number of Deployments in this Flow

Return type:

int

property num_pods: int#

Get the number of pods (shards count) in this Flow

Return type:

int

plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)#

Visualize the Flow up to the current point. If a file name is provided, it will create a jpg image with that name; otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline; otherwise an image will be created.

Example,

flow = Flow().add(name='deployment_a').plot('flow.svg')
Parameters:
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – top-down or left-right layout

  • inline_display (bool) – show image directly inside the Jupyter Notebook

  • build (bool) – build the Flow before plotting, so the gateway connection can be better shown

  • copy_flow (bool) – when set to True, always copy the current Flow, apply the modification on the copy and return it; otherwise, modify in place

Return type:

Flow

Returns:

the Flow

pop_all()#

Preserve the context stack by transferring it to a new instance.

property port: Optional[Union[List[int], int]]#

Return the exposed port of the gateway.

Return type:

Union[List[int], int, None]

property port_monitoring: Optional[int]#

Return the port on which the monitoring is exposed.

Return type:

Optional[int]

async post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, max_attempts=1, initial_backoff=0.5, max_backoff=0.1, backoff_multiplier=1.5, results_in_order=False, stream=True, **kwargs)#

Asynchronously post a general data request to the Flow.

Parameters:
  • inputs (Optional[InputType]) – input data which can be an Iterable, a function which returns an Iterable, or a single Document.

  • on (str) – the endpoint which is invoked. All the functions in the executors decorated by @requests(on=…) with the same endpoint are invoked.

  • on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.

  • on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.

  • on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.

  • parameters (Optional[Dict]) – the kwargs that will be sent to the executor

  • target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.

  • request_size (int) – the number of Documents per request. <=0 means all inputs in one request.

  • show_progress (bool) – if set, client will show a progress bar on receiving every request.

  • continue_on_error (bool) – if set, a Request that causes an error will only be logged, without blocking further requests.

  • return_responses (bool) – if set to True, the result will come as Response and not as a DocumentArray

  • max_attempts (int) – Number of sending attempts, including the original request.

  • initial_backoff (float) – The first retry will happen with a delay of random(0, initial_backoff)

  • max_backoff (float) – The maximum accepted backoff after the exponential incremental delay

  • backoff_multiplier (float) – The n-th attempt will occur at random(0, min(initialBackoff*backoffMultiplier**(n-1), maxBackoff))

  • results_in_order (bool) – return the results in the same order as the inputs

  • stream (bool) – Applicable only to grpc client. If True, the requests are sent to the target using the gRPC streaming interface otherwise the gRPC unary interface will be used. The value is True by default.

  • kwargs – additional parameters, can be used to pass metadata or authentication information in the server call

Yield:

Response object

Warning

target_executor uses re.match for checking if the pattern is matched. target_executor=='foo' will match both deployments with the name foo and foo_what_ever_suffix.

Return type:

AsyncGenerator[None, Union[DocumentArray, Response]]
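
A minimal usage sketch that consumes the yielded responses; the endpoint and input are illustrative:

from docarray import Document
from jina import AsyncFlow


async def main():
    with AsyncFlow().add() as f:
        async for resp in f.post(on='/', inputs=Document(text='hello')):
            print(resp)


# in a notebook: await main(); in a script: asyncio.run(main())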

async profiling(show_table=True)#

Profile a single query’s roundtrip, including network and computation latency. Results are summarized in a dict.

Parameters:

show_table (bool) – whether to show the table or not.

Return type:

Dict[str, float]

Returns:

the latency report in a dict.

property protocol: Union[GatewayProtocolType, List[GatewayProtocolType]]#

Return the protocol of this Flow

Return type:

Union[GatewayProtocolType, List[GatewayProtocolType]]

Returns:

the protocol of this Flow if only one protocol is used, otherwise the list of protocols

push(exit)#

Registers a callback with the standard __exit__ method signature.

Can suppress exceptions the same way __exit__ method can. Also accepts any object with an __exit__ method (registering a call to the method instead of the object itself).

save_config(filename=None)#

Save the object’s config into a YAML file.

Parameters:

filename (Optional[str]) – file path of the yaml file, if not given then config_abspath is used

search(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]]#
start()#

Start to run all Deployments in this Flow.

Remember to close the Flow with close().

Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.orchestrate.pods.Pod

Returns:

this instance

to_docker_compose_yaml(output_path=None, network_name=None, include_gateway=True)#

Converts the Flow into a yaml file to run with docker-compose up.

Parameters:
  • output_path (Optional[str]) – The output path for the yaml file

  • network_name (Optional[str]) – The name of the network that will be used by the deployment name

  • include_gateway (bool) – Defines if the gateway deployment should be included, defaults to True
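
A quick sketch of exporting a trivial Flow; the output file name is an assumption:

from jina import Flow

f = Flow().add()
f.to_docker_compose_yaml('docker-compose.yml')  # then run: docker-compose up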

to_k8s_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#

Converts the Flow into a set of yaml deployments to deploy in Kubernetes.

If you don’t want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • include_gateway (bool) – Defines if the gateway deployment should be included, defaults to True

to_kubernetes_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#

Converts the Flow into a set of yaml deployments to deploy in Kubernetes.

If you don’t want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • include_gateway (bool) – Defines if the gateway deployment should be included, defaults to True
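
A quick sketch of exporting a trivial Flow for Kubernetes; the output directory and namespace are assumptions:

from jina import Flow

f = Flow().add()
f.to_kubernetes_yaml('./k8s_config', k8s_namespace='my-namespace')
# then: kubectl apply -R -f ./k8s_config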

update(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]]#
property workspace: str#

Return the workspace path of the flow.

Return type:

str

property workspace_id: Dict[str, str]#

Get all Deployments’ workspace_id values in a dict

Return type:

Dict[str, str]

jina.Client(args=None, **kwargs)[source]#

Convenience function that returns a client instance for the given protocol.

EXAMPLE USAGE

from jina import Client
from docarray import Document

# select protocol from 'grpc', 'http', or 'websocket'; default is 'grpc'
# select asyncio True or False; default is False
# select host address to connect to
c = Client(
    protocol='grpc', asyncio=False, host='grpc://my.awesome.flow:1234'
)  # returns GRPCClient instance
c.post(on='/index', inputs=Document(text='hello!'))
Parameters:
  • asyncio – If set, then the input and output of this Client work in an asynchronous manner.

  • host – The host of the Gateway, which the client should connect to, by default it is 0.0.0.0.

  • metrics – If set, the SDK implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • port – The port of the Gateway, which the client should connect to.

  • protocol – Communication protocol between server and client.

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy.

  • tls – If set, connect to gateway using tls encryption

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the SDK implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and custom span creation. Otherwise a no-op implementation will be provided.

Return type:

Union[AsyncWebSocketClient, WebSocketClient, AsyncGRPCClient, GRPCClient, HTTPClient, AsyncHTTPClient]

Returns:

the new Client object

class jina.Document[source]#
class jina.Document(_obj: Optional[Document] = None, copy: bool = False)
class jina.Document(_obj: Optional[Any] = None)
class jina.Document(_obj: Optional[Dict], copy: bool = False, field_resolver: Optional[Dict[str, str]] = None, unknown_fields_handler: str = 'catch')
class jina.Document(blob: Optional[bytes] = None, **kwargs)
class jina.Document(tensor: Optional[ArrayType] = None, **kwargs)
class jina.Document(text: Optional[str] = None, **kwargs)
class jina.Document(uri: Optional[str] = None, **kwargs)
class jina.Document(parent_id: Optional[str] = None, granularity: Optional[int] = None, adjacency: Optional[int] = None, blob: Optional[bytes] = None, tensor: Optional[ArrayType] = None, mime_type: Optional[str] = None, text: Optional[str] = None, content: Optional[DocumentContentType] = None, weight: Optional[float] = None, uri: Optional[str] = None, tags: Optional[Dict[str, StructValueType]] = None, offset: Optional[float] = None, location: Optional[List[float]] = None, embedding: Optional[ArrayType] = None, modality: Optional[str] = None, evaluations: Optional[Dict[str, Dict[str, StructValueType]]] = None, scores: Optional[Dict[str, Dict[str, StructValueType]]] = None, chunks: Optional[Sequence[Document]] = None, matches: Optional[Sequence[Document]] = None)

Bases: AllMixins, BaseDCType

Document is the basic data type in DocArray. A Document is a container for any kind of data, be it text, image, audio, video, or 3D meshes.

You can initialize a Document object with given attributes:

from docarray import Document
import numpy

d1 = Document(text='hello')
d3 = Document(tensor=numpy.array([1, 2, 3]))
d4 = Document(
    uri='https://jina.ai',
    mime_type='text/plain',
    granularity=1,
    adjacency=3,
    tags={'foo': 'bar'},
)

Documents support a nested structure, which can also be specified during construction:

d = Document(
    id='d0',
    chunks=[Document(id='d1', chunks=Document(id='d2'))],
    matches=[Document(id='d3')],
)

A Document can embed its contents using the embed() method and a provided embedding model:

import torchvision

q = (
    Document(uri='/Users/usr/path/to/image.jpg')
    .load_uri_to_image_tensor()
    .set_image_tensor_normalization()
    .set_image_tensor_channel_axis(-1, 0)
)
model = torchvision.models.resnet50(pretrained=True)
q.embed(model)

Multiple Documents can be organized into a DocumentArray.
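
For instance, a small illustrative sketch:

from docarray import Document, DocumentArray

da = DocumentArray([Document(text='hello'), Document(text='world')])
print(da.texts)  # ['hello', 'world']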

See also

For further details, see our user guide.

property adjacency: Optional[int]#
Return type:

Optional[int]

property blob: Optional[bytes]#
Return type:

Optional[bytes]

property chunks: Optional[ChunkArray]#
Return type:

Optional[ChunkArray]

clear()#

Clear all fields from this Document to their default values.

Return type:

None

property content: Optional[DocumentContentType]#
Return type:

Optional[DocumentContentType]

property content_hash: int#

Get the document hash according to its content.

Return type:

int

Returns:

the unique hash code to represent this Document

property content_type: Optional[str]#
Return type:

Optional[str]

convert_blob_to_datauri(charset='utf-8', base64=False)#

Convert blob to a data URI in place. Internally, it first reads the blob and then converts it to a data URI.

Parameters:
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.

Return type:

T

Returns:

itself after processed
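
A short sketch, assuming a small plain-text blob:

from docarray import Document

d = Document(blob=b'hello', mime_type='text/plain')
d.convert_blob_to_datauri()
print(d.uri)  # a data URI encoding the blob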

convert_blob_to_image_tensor(width=None, height=None, channel_axis=-1)#

Convert an image blob to a ndarray tensor.

Parameters:
  • width (Optional[int]) – the width of the image tensor.

  • height (Optional[int]) – the height of the tensor.

  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

Return type:

T

Returns:

itself after processed

convert_blob_to_tensor(dtype=None, count=-1, offset=0)#

Assuming the blob is a _valid_ buffer of Numpy ndarray, set tensor accordingly.

Parameters:
  • dtype (Optional[str]) – Data-type of the returned array; default: float.

  • count (int) – Number of items to read. -1 means all data in the buffer.

  • offset (int) – Start reading the buffer from this offset (in bytes); default: 0.

Return type:

T

Returns:

itself after processed

convert_content_to_datauri()#

Convert the content to a data URI and store it in uri, in place, with best effort.

Return type:

T

Returns:

itself after processed

convert_image_tensor_to_blob(channel_axis=-1, image_format='png')#

Assuming tensor is a _valid_ image, set blob accordingly

Parameters:
  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • image_format (str) – either png or jpeg

Return type:

T

Returns:

itself after processed

convert_image_tensor_to_sliding_windows(window_shape=(64, 64), strides=None, padding=False, channel_axis=-1, as_chunks=False)#

Convert tensor into a sliding-window view with the given window shape, inplace.

Parameters:
  • window_shape (Tuple[int, int]) – the desired output size of each window, as a sequence like (h, w).

  • strides (Optional[Tuple[int, int]]) – the strides between two neighboring sliding windows, as a sequence like (h, w) denoting the strides on the vertical and horizontal axes. When not given, window_shape is used.

  • padding (bool) – If False, only patches which are fully contained in the input image are included. If True, all patches whose starting point is inside the input are included, and areas outside the input default to zero. The padding argument has no effect on the size of each patch, it determines how many patches are extracted. Default is False.

  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis.

  • as_chunks (bool) – If set, each sliding window will be stored in the chunk of the current Document

Return type:

T

Returns:

Document itself after processed
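
For example, a minimal sketch of cutting a 128×128 image tensor into non-overlapping 64×64 windows (the random tensor stands in for a real image):

from docarray import Document
import numpy as np

d = Document(tensor=np.random.randint(0, 255, (128, 128, 3), dtype=np.uint8))
d.convert_image_tensor_to_sliding_windows(window_shape=(64, 64), as_chunks=True)
print(len(d.chunks))  # 4 non-overlapping 64x64 windows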

convert_image_tensor_to_uri(channel_axis=-1, image_format='png')#

Assuming tensor is a valid image, set uri accordingly

Parameters:
  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • image_format (str) – either png or jpeg

Return type:

T

Returns:

itself after processed

convert_tensor_to_blob()#

Convert tensor to blob inplace.

Return type:

T

Returns:

itself after processed

convert_tensor_to_text(vocab, delimiter=' ')#

Convert tensor to text inplace.

Parameters:
  • vocab (Union[Dict[str, int], Dict[int, str]]) – a dictionary that maps a word to an integer index, 0 is reserved for padding, 1 is reserved for unknown words in text

  • delimiter (str) – the delimiter that used to connect all words into text

Return type:

T

Returns:

Document itself after processed

convert_text_to_datauri(charset='utf-8', base64=False)#

Convert text to data uri.

Parameters:
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.

Return type:

T

Returns:

itself after processed
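
For example, a minimal sketch (the printed URI is indicative):

from docarray import Document

d = Document(text='hello')
d.convert_text_to_datauri()
print(d.uri)  # e.g. 'data:text/plain;charset=utf-8,hello'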

convert_text_to_tensor(vocab, max_length=None, dtype='int64')#

Convert text to tensor inplace.

The resulting tensor will be a 1D integer array of length max_length.

To get the vocab of a DocumentArray, you can use jina.types.document.converters.build_vocab.

Parameters:
  • vocab (Dict[str, int]) – a dictionary that maps a word to an integer index, 0 is reserved for padding, 1 is reserved for unknown words in text. So you should not include these two entries in vocab.

  • max_length (Optional[int]) – the maximum length of the sequence. Sequences longer than this are cut off from the beginning; sequences shorter than this are padded with 0 from the right-hand side.

  • dtype (str) – the dtype of the generated tensor

Return type:

T

Returns:

Document itself after processed
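
For example, a minimal sketch with an illustrative two-word vocab (indices 0 and 1 are reserved, so real words start at 2):

from docarray import Document

vocab = {'hello': 2, 'world': 3}
d = Document(text='hello world')
d.convert_text_to_tensor(vocab, max_length=5)
print(d.tensor.shape)  # (5,)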

convert_uri_to_datauri(charset='utf-8', base64=False)#

Convert uri to dataURI and store it in uri inplace.

Parameters:
  • charset (str) – charset may be any character set registered with IANA

  • base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8 bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.

Return type:

T

Returns:

itself after processed

copy_from(other)#

Overwrite self by copying from another Document.

Parameters:

other (T) – the other Document to copy from

Return type:

None

display(from_=None)#

Plot image data from uri, or from tensor if uri is empty.

Parameters:

from_ (Optional[str]) – an optional string to decide whether the Document should be displayed from the uri or the tensor field.

display_tensor()#

Plot image data from tensor

display_uri()#

Plot image data from uri

embed(*args, **kwargs)#
Return type:

T

embed_feature_hashing(n_dim=256, sparse=False, fields=('text', 'tags'), max_value=1000000)#

Convert an arbitrary set of attributes into a fixed-dimensional matrix using the hashing trick.

Parameters:
  • n_dim (int) – the dimensionality of each document in the output embedding. Small numbers of features are likely to cause hash collisions, but large numbers will cause larger overall parameter dimensions.

  • sparse (bool) – whether the resulting feature matrix should be a sparse csr_matrix or dense ndarray. Note that this feature requires scipy

  • fields (Tuple[str, ...]) – which attributes to consider for feature hashing.

Return type:

T

property embedding: Optional[ArrayType]#
Return type:

Optional[ArrayType]

property evaluations: Optional[Dict[str, NamedScore]]#
Return type:

Optional[Dict[str, NamedScore]]

classmethod from_base64(data, protocol='pickle', compress=None)#

Build a Document object from a base64-encoded string.

Parameters:
  • data (str) – a base64 encoded string

  • protocol (str) – protocol to use

  • compress (Optional[str]) – compress method to use

Return type:

T

Returns:

a Document object

classmethod from_bytes(data, protocol='pickle', compress=None)#

Build a Document object from binary bytes.

Parameters:
  • data (bytes) – binary bytes

  • protocol (str) – protocol to use

  • compress (Optional[str]) – compress method to use

Return type:

T

Returns:

a Document object
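
For example, a minimal round-trip sketch using the default pickle protocol:

from docarray import Document

d = Document(text='hello')
raw = d.to_bytes()
d2 = Document.from_bytes(raw)
assert d2.text == 'hello'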

classmethod from_dict(obj, protocol='jsonschema', **kwargs)#

Convert a dict object into a Document.

Parameters:
  • obj (Dict) – a Python dict object

  • protocol (str) – jsonschema or protobuf

  • kwargs – extra key-value args pass to pydantic and protobuf parser.

Return type:

T

Returns:

the parsed Document object

classmethod from_json(obj, protocol='jsonschema', **kwargs)#

Convert a JSON string into a Document.

Parameters:
  • obj (Union[str, bytes, bytearray]) – a valid JSON string

  • protocol (str) – jsonschema or protobuf

  • kwargs – extra key-value args pass to pydantic and protobuf parser.

Return type:

T

Returns:

the parsed Document object
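
For example, a minimal sketch of both constructors with the default jsonschema protocol:

from docarray import Document

d1 = Document.from_dict({'text': 'hello'})
d2 = Document.from_json('{"text": "hello"}')
assert d1.text == d2.text == 'hello'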

classmethod from_protobuf(pb_msg)#
Return type:

T

classmethod from_pydantic_model(model)#

Build a Document object from a Pydantic model

Parameters:

model (BaseModel) – the pydantic data model object that represents a Document

Return type:

T

Returns:

a Document object

classmethod from_strawberry_type(model)#

Build a Document object from a Strawberry model

Parameters:

model – the Strawberry data model object that represents a Document

Return type:

T

Returns:

a Document object

classmethod generator_from_webcam(height_width=None, show_window=True, window_title='webcam', fps=30, exit_key=27, exit_event=None, tags=None)#

Create a generator that yields a Document object from the webcam.

This feature requires the opencv-python package.

Parameters:
  • height_width (Optional[Tuple[int, int]]) – the shape of the video frame, if not provided, the shape will be determined from the first frame. Note that this is restricted by the hardware of the camera.

  • show_window (bool) – whether to show a preview window of the webcam video

  • window_title (str) – the window title of the preview window

  • fps (int) – expected frames per second, note that this is not guaranteed, as the actual fps depends on the hardware limit

  • exit_key (int) – the key to press to exit the preview window

  • exit_event – the multiprocessing/threading/asyncio event that, once set, exits the preview window

  • tags (Optional[Dict]) – the tags to attach to the document

Return type:

Generator[T, None, None]

Returns:

a generator that yields a Document object from a webcam
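
A minimal sketch, assuming opencv-python is installed and a camera is available; each yielded Document is expected to carry the frame in tensor:

from docarray import Document

for frame_doc in Document.generator_from_webcam(fps=10, show_window=False):
    print(frame_doc.tensor.shape)  # e.g. (480, 640, 3)
    break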

classmethod get_json_schema(indent=2)#

Return a JSON Schema of Document class.

Return type:

str

get_multi_modal_attribute(attribute)#
Return type:

DocumentArray

get_vocabulary(text_attrs=('text',))#

Get the text vocabulary in a counter dict that maps from each word to its frequency across all text_attrs.

Parameters:

text_attrs (Tuple[str, ...]) – the textual attributes where vocabulary will be derived from

Return type:

Dict[str, int]

Returns:

a vocabulary in dictionary where key is the word, value is the frequency of that word in all text fields.
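
For example (output shown informally):

from docarray import Document

d = Document(text='hello hello world')
print(d.get_vocabulary())  # e.g. {'hello': 2, 'world': 1}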

property granularity: Optional[int]#
Return type:

Optional[int]

property id: str#
Return type:

str

property is_multimodal: bool#

Return true if this Document can be represented by a class wrapped by docarray.dataclasses.types.dataclass().

Return type:

bool

load_pil_image_to_datauri(image)#

Convert a Pillow image into a data URI with header data:image/png.

Parameters:

image (PILImage) – a pillow image

Returns:

itself after processed

load_uri_to_audio_tensor()#

Convert an audio uri into tensor inplace

Return type:

T

Returns:

Document itself after processed

load_uri_to_blob()#

Convert uri to blob inplace. Internally it downloads from the URI and set blob.

Return type:

T

Returns:

itself after processed

load_uri_to_image_tensor(width=None, height=None, channel_axis=-1)#

Convert the image-like uri into tensor

Parameters:
  • width (Optional[int]) – the width of the image tensor.

  • height (Optional[int]) – the height of the tensor.

  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

Return type:

T

Returns:

itself after processed

load_uri_to_point_cloud_tensor(samples, as_chunks=False)#

Convert a 3d mesh-like uri into tensor

Parameters:
  • samples (int) – number of points to sample from the mesh

  • as_chunks (bool) – when multiple geometries are stored in one mesh file, store each geometry in a separate chunk

Return type:

T

Returns:

itself after processed

load_uri_to_text(charset='utf-8')#

Convert uri to text inplace.

Parameters:

charset (str) – charset may be any character set registered with IANA

Return type:

T

Returns:

itself after processed

load_uri_to_video_tensor(only_keyframes=False)#

Convert a uri to a video ndarray tensor.

Parameters:

only_keyframes (bool) – only keep the keyframes in the video

Return type:

T

Returns:

Document itself after processed

property location: Optional[List[float]]#
Return type:

Optional[List[float]]

match(*args, **kwargs)#
Return type:

T

property matches: Optional[MatchArray]#
Return type:

Optional[MatchArray]

property mime_type: Optional[str]#
Return type:

Optional[str]

property modality: Optional[str]#
Return type:

Optional[str]

property nbytes: int#

Return total bytes consumed by protobuf.

Return type:

int

Returns:

number of bytes

property non_empty_fields: Tuple[str]#

Get all non-empty fields of this Document.

Non-empty fields are the fields with not-None and not-default values.

Return type:

Tuple[str]

Returns:

field names in a tuple.

property offset: Optional[float]#
Return type:

Optional[float]

property parent_id: Optional[str]#
Return type:

Optional[str]

plot_matches_sprites(top_k=10, channel_axis=-1, inv_normalize=False, skip_empty=False, canvas_size=1920, min_size=100, output=None)#

Generate a sprite image for the query and its matching images in this Document object.

An image sprite is a collection of images put into a single image. Query image is on the left followed by matching images. The Document object should contain matches.

Parameters:
  • top_k (int) – the number of top matching documents to show in the sprite.

  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • inv_normalize (bool) – If set to True, inverse the normalization of a float32 image tensor into a uint8 image tensor inplace.

  • skip_empty (bool) – skip matches which have no .uri or .tensor.

  • canvas_size (int) – the width of the canvas

  • min_size (int) – the minimum size of the image

  • output (Optional[str]) – Optional path to store the visualization. If not given, show in UI

pop(*fields)#

Clear some fields from this Document to their default values.

Parameters:

fields – field names to clear.

Return type:

None

post(*args, **kwargs)#
Return type:

T

save_audio_tensor_to_file(file, sample_rate=44100, sample_width=2)#

Save tensor into a WAV file. Mono/stereo is preserved.

Parameters:
  • file (Union[str, BinaryIO]) – if file is a string, open the file by that name, otherwise treat it as a file-like object.

  • sample_rate (int) – sampling frequency

  • sample_width (int) – sample width in bytes

Return type:

T

Returns:

Document itself after processed

save_blob_to_file(file)#

Save blob into a file

Parameters:

file (Union[str, BinaryIO]) – File or filename to which the data is saved.

Return type:

T

Returns:

itself after processed

save_image_tensor_to_file(file, channel_axis=-1, image_format='png')#

Save tensor into a file

Parameters:
  • file (Union[str, BinaryIO]) – File or filename to which the data is saved.

  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • image_format (str) – either png or jpeg

Return type:

T

Returns:

itself after processed

save_uri_to_file(file)#

Save uri into a file

Parameters:

file (Union[str, BinaryIO]) – File or filename to which the data is saved.

Return type:

T

Returns:

itself after processed

save_video_tensor_to_file(file, frame_rate=30, codec='h264')#

Save tensor as a video mp4/h264 file.

Parameters:
  • file (Union[str, BinaryIO]) – The file to open, which can be either a string or a file-like object.

  • frame_rate (int) – frames per second

  • codec (str) – the name of a decoder/encoder

Return type:

T

Returns:

itself after processed

property scores: Optional[Dict[str, NamedScore]]#
Return type:

Optional[Dict[str, NamedScore]]

set_image_tensor_channel_axis(original_channel_axis, new_channel_axis)#

Move the channel axis of the image tensor inplace.

Parameters:
  • original_channel_axis (int) – the original axis of the channel

  • new_channel_axis (int) – the new axis of the channel

Return type:

T

Returns:

itself after processed

set_image_tensor_inv_normalization(channel_axis=-1, img_mean=(0.485, 0.456, 0.406), img_std=(0.229, 0.224, 0.225))#

Inverse the normalization of a float32 image tensor into a uint8 image tensor inplace.

Parameters:
  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • img_mean (Tuple[float]) – the mean of all images

  • img_std (Tuple[float]) – the standard deviation of all images

Return type:

T

Returns:

itself after processed

set_image_tensor_normalization(channel_axis=-1, img_mean=(0.485, 0.456, 0.406), img_std=(0.229, 0.224, 0.225))#

Normalize a uint8 image tensor into a float32 image tensor inplace.

Following the PyTorch standard, the image must be of shape (3 x H x W); it will be scaled into a range of [0, 1] and then normalized using mean = [0.485, 0.456, 0.406] and std = [0.229, 0.224, 0.225]. These two arrays are computed based on millions of images. If you want to train from scratch on your own dataset, you can calculate the new mean and std. Otherwise, using the ImageNet pretrained model with its own mean and std is recommended.

Parameters:
  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • img_mean (Tuple[float]) – the mean of all images

  • img_std (Tuple[float]) – the standard deviation of all images

Return type:

T

Returns:

itself after processed

Warning

Please do NOT generalize this function to grayscale or black/white images; it does not make sense for non-RGB images. If you look at PyTorch's MNIST examples, the mean and stddev are 1-dimensional (since the inputs are grayscale, with no RGB channels).

set_image_tensor_resample(ratio, channel_axis=-1)#

Resample the image tensor into different size inplace.

Parameters:
  • ratio (float) – scale ratio of the resampled image tensor.

  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

Return type:

T

Returns:

itself after processed

set_image_tensor_shape(shape, channel_axis=-1)#

Resample the image tensor into different size inplace.

If your current image tensor has shape [H,W,C], then the new tensor will be [*shape, C]

Parameters:
  • shape (Tuple[int, int]) – the new shape of the image tensor.

  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

Return type:

T

Returns:

itself after processed

set_multi_modal_attribute(attribute, value)#
summary()#

Print non-empty fields and nested structure of this Document object.

Return type:

None

property tags: Optional[Dict[str, StructValueType]]#
Return type:

Optional[Dict[str, StructValueType]]

property tensor: Optional[ArrayType]#
Return type:

Optional[ArrayType]

property text: Optional[str]#
Return type:

Optional[str]

to_base64(protocol='pickle', compress=None)#

Serialize a Document object into a base64 string

Parameters:
  • protocol (str) – protocol to use

  • compress (Optional[str]) – compress method to use

Return type:

str

Returns:

a base64 encoded string

to_bytes(protocol='pickle', compress=None)#
Return type:

bytes

to_dict(protocol='jsonschema', **kwargs)#

Convert itself into a Python dict object.

Parameters:
  • protocol (str) – jsonschema or protobuf

  • kwargs – extra key-value args pass to pydantic and protobuf dumper.

Return type:

Dict[str, Any]

Returns:

the dumped Document as a dict object

to_json(protocol='jsonschema', **kwargs)#

Convert itself into a JSON string.

Parameters:
  • protocol (str) – jsonschema or protobuf

  • kwargs – extra key-value args pass to pydantic and protobuf dumper.

Return type:

str

Returns:

the dumped JSON string

to_protobuf(ndarray_type=None)#

Convert Document into a Protobuf message.

Parameters:

ndarray_type (Optional[str]) – can be list or numpy, if set it will force all ndarray-like object to be List or numpy.ndarray.

Return type:

DocumentProto

Returns:

the protobuf message

to_pydantic_model()#

Convert a Document object into a Pydantic model.

Return type:

PydanticDocument

to_strawberry_type()#

Convert a Document object into a Strawberry type.

Return type:

StrawberryDocument

property uri: Optional[str]#
Return type:

Optional[str]

property weight: Optional[float]#
Return type:

Optional[float]

class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, copy: bool = False, subindex_configs: Optional[Dict[str, None]] = None)[source]#
class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'sqlite', config: Optional[Union[SqliteConfig, Dict]] = None, subindex_configs: Optional[Dict[str, Dict]] = None)
class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'weaviate', config: Optional[Union[WeaviateConfig, Dict]] = None, subindex_configs: Optional[Dict[str, Dict]] = None)
class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'annlite', config: Optional[Union[AnnliteConfig, Dict]] = None, subindex_configs: Optional[Dict[str, Dict]] = None)
class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'elasticsearch', config: Optional[Union[ElasticConfig, Dict]] = None, subindex_configs: Optional[Dict[str, Dict]] = None)
class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'redis', config: Optional[Union[RedisConfig, Dict]] = None)

Bases: AllMixins, BaseDocumentArray

DocumentArray is a list-like container of Document objects.

A DocumentArray can be used to store, embed, and retrieve Document objects.

from docarray import Document, DocumentArray

da = DocumentArray(
    [Document(text='The cake is a lie'), Document(text='Do a barrel roll!')]
)
da.apply(Document.embed_feature_hashing)

query = Document(text='Can i have some cake?').embed_feature_hashing()
query.match(da, metric='jaccard', use_scipy=True)

print(query.matches[:, ('text', 'scores__jaccard__value')])
[['The cake is a lie', 'Do a barrel roll!'], [0.9, 1.0]]

A DocumentArray can also embed its contents using a neural network, process them using an external Flow or Executor, and persist Documents in a Document Store for fast vector search:

from docarray import Document, DocumentArray
import numpy as np

n_dim = 3
metric = 'Euclidean'

# initialize a DocumentArray with the AnnLite Document Store
da = DocumentArray(
    storage='annlite',
    config={'n_dim': n_dim, 'columns': [('price', 'float')], 'metric': metric},
)
# add Documents to the DocumentArray
with da:
    da.extend(
        [
            Document(id=f'r{i}', embedding=i * np.ones(n_dim), tags={'price': i})
            for i in range(10)
        ]
    )
# perform vector search
np_query = np.ones(n_dim) * 8
results = da.find(np_query)

See also

For further details, see our user guide.

append(value)#

S.append(value) – append value to the end of the sequence

apply(*args, **kwargs)#

Return type:

T

Returns:

a new DocumentArray

apply_batch(*args, **kwargs)#

Return type:

T

Returns:

a new DocumentArray

batch(batch_size, shuffle=False)#

Creates a Generator that yields a DocumentArray of size batch_size until self is fully traversed. None docs are filtered out, and optionally the docs can be filtered by checking for the existence of a Document attribute. Note that the last batch might be smaller than batch_size.

Parameters:
  • batch_size (int) – Size of each generated batch (except the last one, which might be smaller, default: 32)

  • shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.

Yield:

a Generator of DocumentArray, each in the length of batch_size

Return type:

Generator[DocumentArray, None, None]
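
For example, a minimal sketch:

from docarray import DocumentArray

da = DocumentArray.empty(10)
for b in da.batch(batch_size=3):
    print(len(b))  # 3, 3, 3, 1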

batch_ids(batch_size, shuffle=False)#

Creates a Generator that yields lists of ids of size batch_size until self is fully traversed. Note that the last batch might be smaller than batch_size.

Parameters:
  • batch_size (int) – Size of each generated batch (except the last one, which might be smaller)

  • shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.

Yield:

a Generator of list of IDs, each in the length of batch_size

Return type:

Generator[List[str], None, None]

property blobs: Optional[List[bytes]]#

Get the blob attribute of all Documents.

Return type:

Optional[List[bytes]]

Returns:

a list of blobs

clear() None -- remove all items from S#
property contents: Optional[Union[Sequence[DocumentContentType], ArrayType]]#

Get the content of all Documents.

Return type:

Union[Sequence[DocumentContentType], ArrayType, None]

Returns:

a list of texts, blobs or ArrayType

count(value) integer -- return number of occurrences of value#
classmethod dataloader(path, func, batch_size, protocol='protobuf', compress=None, backend='thread', num_worker=None, pool=None, show_progress=False)#

Load array elements in batches, map them with a function in parallel, and finally yield each mapped batch as a DocumentArray.

Parameters:
  • path (Union[str, Path]) – Path or filename where the data is stored.

  • func (Callable[[DocumentArray], T]) – a function that takes DocumentArray as input and outputs anything. You can either modify elements in-place (only with thread backend) or work later on return elements.

  • batch_size (int) – Size of each generated batch (except the last one, which might be smaller)

  • protocol (str) – protocol to use

  • compress (Optional[str]) – compress algorithm to use

  • backend (str) –

    whether to use multi-process or multi-thread as the parallelization backend. In general, if your func is IO-bound then thread is probably good enough. If your func is CPU-bound, you may use process. In practice, you should experiment to figure out the best value. However, if you wish to modify elements in-place, you should always use the thread backend, regardless of IO/CPU-bound.

    Warning

    When using the process backend, you should not expect func to modify elements in-place. This is because the multiprocessing backend passes the variable via pickle and works in another process. The passed object and the original object do not share the same memory.

  • num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.

  • pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.

  • show_progress (bool) – if set, show a progressbar

Return type:

Generator[DocumentArray, None, None]

Returns:

a Generator that yields each mapped batch as a DocumentArray

embed(embed_model, device='cpu', batch_size=256, to_numpy=False, collate_fn=None)#

Fill embedding of Documents inplace by using embed_model

Parameters:
  • embed_model (AnyDNN) – the embedding model written in Keras/Pytorch/Paddle

  • device (str) – the computational device for embed_model, can be either cpu or cuda.

  • batch_size (int) – number of Documents in a batch for embedding

  • to_numpy (bool) – whether to store embeddings back to the Documents as numpy.ndarray or in the original framework format.

  • collate_fn (Optional[CollateFnType]) – create a mini-batch of Input(s) from the given DocumentArray. Default built-in collate_fn is to use the tensors of the documents.

Return type:

T

Returns:

itself after modified.
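
A minimal sketch, assuming PyTorch is installed; the linear layer is an illustrative stand-in for a real embedding model:

import numpy as np
import torch
from docarray import DocumentArray

da = DocumentArray.empty(8)
da.tensors = np.random.random([8, 5]).astype('float32')

model = torch.nn.Linear(5, 4)  # toy model: 5-dim input -> 4-dim embedding
da.embed(model, to_numpy=True)
print(da.embeddings.shape)  # (8, 4)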

property embeddings: Optional[ArrayType]#

Return an ArrayType stacking all the embedding attributes as rows.

Return type:

Optional[ArrayType]

Returns:

an ArrayType of embeddings

classmethod empty(size=0, *args, **kwargs)#

Create a DocumentArray object with size empty Document objects.

Parameters:

size (int) – the number of empty Documents in this container

Return type:

T

Returns:

a DocumentArray object

evaluate(other, metric, hash_fn=None, metric_name=None, strict=True, **kwargs)#

Compute ranking evaluation metrics for a given DocumentArray when compared with a groundtruth.

This implementation expects to be given a groundtruth DocumentArray that is structurally identical to self. It is based on comparing the matches of documents inside the DocumentArray.

This method will fill the evaluations field of Documents inside this DocumentArray and will return the average of the computations

Parameters:
  • other (DocumentArray) – The groundtruth DocumentArray that the DocumentArray compares to.

  • metric (Union[str, Callable[..., float]]) – The name of the metric, or multiple metrics to be computed

  • hash_fn (Optional[Callable[[Document], str]]) – The function used for identifying the uniqueness of Documents. If not given, then Document.id is used.

  • metric_name (Optional[str]) – If provided, the results of the metrics computation will be stored in the evaluations field of each Document. If not provided, the name will be computed based on the metric's name.

  • strict (bool) – If set, then the left and right sides are required to be fully aligned: on the length, and on the semantics of the length. This prevents you from accidentally evaluating on irrelevant matches.

  • kwargs – Additional keyword arguments to be passed to metric_fn

Return type:

Optional[float]

Returns:

The average evaluation computed or a list of them if multiple metrics are required

extend(values)#

S.extend(iterable) – extend sequence by appending elements from the iterable

find(query=None, metric='cosine', limit=20, metric_name=None, exclude_self=False, filter=None, only_id=False, index='text', on=None, **kwargs)#

Returns matching Documents given an input query.

  • If the query is a DocumentArray, Document or ArrayType, exhaustive or approximate nearest-neighbor search will be performed, depending on whether the storage backend supports ANN. Furthermore, if filter is not None, pre-filtering will be applied along with the vector search.

  • If the query is a dict object, or query is None and filter is not None, Documents will be filtered and all Documents matching the filter will be returned. In this case, query (if it is a dict) or filter will be used for filtering. The object must follow the backend-specific filter format if the backend supports filtering, or DocArray's query language format. In the latter case, filtering will be applied on the client side, not the backend side.

  • If the query is a string or a list of strings, a search by text will be performed if the backend supports indexing and searching text fields. If not, a NotImplementedError will be raised.

Parameters:
  • query (Union[DocumentArray, Document, ArrayType, Dict, str, List[str], None]) – the input query to search by

  • limit (Union[int, float, None]) – the maximum number of matches, when not given defaults to 20.

  • metric_name (Optional[str]) – if provided, then match result will be marked with this string.

  • metric (Union[str, Callable[[ArrayType, ArrayType], ndarray]]) – the distance metric.

  • exclude_self (bool) – if set, Documents in results with same id as the query values will not be considered as matches. This is only applied when the input query is Document or DocumentArray.

  • filter (Optional[Dict]) – filter query used for pre-filtering or filtering

  • only_id (bool) – if set, then returning matches will only contain id

  • index (str) – if the query is a string, text search will be performed on the index field, otherwise, this parameter is ignored. By default, the Document text attribute will be used for search, otherwise the tag field specified by index will be used. You can only use this parameter if the storage backend supports searching by text.

  • on (Optional[str]) – specifies a subindex to search on. If set, the returned DocumentArray will be retrieved from the given subindex.

  • kwargs – other kwargs.

Return type:

Union[DocumentArray, List[DocumentArray]]

Returns:

a list of DocumentArrays containing the closest Document objects for each of the queries in query.
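
For example, a minimal filtering sketch using DocArray's query language (the tag name and threshold are illustrative):

from docarray import Document, DocumentArray

da = DocumentArray([Document(tags={'price': i}) for i in range(10)])
cheap = da.find({'tags__price': {'$lt': 3}})
print(len(cheap))  # 3 Documents, with price 0, 1, 2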

flatten()#

Flatten all nested chunks and matches into one DocumentArray.

Note

Flattening an already flattened DocumentArray has no effect.

Return type:

DocumentArray

Returns:

a flattened DocumentArray object.

classmethod from_base64(data, protocol='pickle-array', compress=None, _show_progress=False, *args, **kwargs)#
Return type:

T

classmethod from_bytes(data, protocol='pickle-array', compress=None, _show_progress=False, *args, **kwargs)#
Return type:

T

classmethod from_csv(*args, **kwargs)#


Return type:

T

classmethod from_dataframe(df, *args, **kwargs)#

Import a DocumentArray from a pandas.DataFrame object.

Parameters:

df (DataFrame) – a pandas.DataFrame object.

Return type:

T

Returns:

a DocumentArray object

classmethod from_dict(values, protocol='jsonschema', **kwargs)#
Return type:

T

classmethod from_files(*args, **kwargs)#


Return type:

T

classmethod from_huggingface_datasets(*args, **kwargs)#


Return type:

T

classmethod from_json(file, protocol='jsonschema', **kwargs)#
Return type:

T

classmethod from_lines(*args, **kwargs)#


Return type:

T

classmethod from_list(values, protocol='jsonschema', **kwargs)#
Return type:

T

classmethod from_ndarray(*args, **kwargs)#


Return type:

T

classmethod from_ndjson(*args, **kwargs)#


Return type:

T

classmethod from_protobuf(pb_msg)#
Return type:

T

classmethod from_pydantic_model(model)#

Convert a list of PydanticDocument into DocumentArray

Parameters:

model (List[BaseModel]) – the list of pydantic data model objects that represents a DocumentArray

Return type:

T

Returns:

a DocumentArray

classmethod from_strawberry_type(model)#

Convert a list of Strawberry type objects into a DocumentArray

Parameters:

model (List[StrawberryDocument]) – the list of strawberry type objects that represents a DocumentArray

Return type:

T

Returns:

a DocumentArray

classmethod get_json_schema(indent=2)#

Return a JSON Schema of DocumentArray class.

Return type:

str

get_vocabulary(min_freq=1, text_attrs=('text',))#

Get the text vocabulary in a dict that maps from the word to the index from all Documents.

Parameters:
  • text_attrs (Tuple[str, ...]) – the textual attributes where vocabulary will be derived from

  • min_freq (int) – the minimum word frequency to be considered into the vocabulary.

Return type:

Dict[str, int]

Returns:

a vocabulary dictionary where the key is the word and the value is the index. Indices start at 2: 0 is reserved for padding and 1 for unknown tokens.
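
For example (the exact index assignment may vary):

from docarray import Document, DocumentArray

da = DocumentArray([Document(text='hello world'), Document(text='hello jina')])
print(da.get_vocabulary())  # e.g. {'hello': 2, 'world': 3, 'jina': 4}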

index(value[, start[, stop]]) integer -- return first index of value.#

Raises ValueError if the value is not present.

Supporting start and stop arguments is optional, but recommended.

abstract insert(index, value)#

S.insert(index, value) – insert value before index

classmethod load(file, file_format='binary', encoding='utf-8', **kwargs)#

Load array elements from a JSON or a binary file, or a CSV file.

Parameters:
  • file (Union[str, TextIO, BinaryIO]) – File or filename to which the data is saved.

  • file_format (str) – json or binary or csv. JSON and CSV files are human-readable, but the binary format gives much smaller size and faster save/load speed. A CSV file has very limited compatibility; a complex DocumentArray with nested structure cannot be restored from a CSV file.

  • encoding (str) – encoding used to load data from a file (it only applies to JSON and CSV format). By default, utf-8 is used.

Return type:

T

Returns:

the loaded DocumentArray object
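
For example, a minimal round-trip sketch paired with save() (documented further below); the filename is illustrative:

from docarray import DocumentArray

da = DocumentArray.empty(3)
da.save('docs.bin', file_format='binary')
da2 = DocumentArray.load('docs.bin', file_format='binary')
assert len(da2) == 3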

classmethod load_binary(file, protocol='pickle-array', compress=None, _show_progress=False, streaming=False, *args, **kwargs)#

Load array elements from a compressed binary file.

Parameters:
  • file (Union[str, BinaryIO, bytes, Path]) – File or filename or serialized bytes where the data is stored.

  • protocol (str) – protocol to use

  • compress (Optional[str]) – compress algorithm to use

  • _show_progress (bool) – show progress bar, only works when protocol is pickle or protobuf

  • streaming (bool) – if True returns a generator over Document objects.

In case protocol is pickle, the Documents are streamed from disk to save memory usage.

Return type:

Union[DocumentArray, Generator[Document, None, None]]

Returns:

a DocumentArray object

Note

If file is str it can specify protocol and compress as file extensions. This functionality assumes file=file_name.$protocol.$compress where $protocol and $compress refer to a string interpolation of the respective protocol and compress methods. For example if file=my_docarray.protobuf.lz4 then the binary data will be loaded assuming protocol=protobuf and compress=lz4.

classmethod load_csv(file, field_resolver=None, encoding='utf-8')#

Load array elements from a CSV file.

Parameters:
  • file (Union[str, TextIO]) – File or filename to which the data is saved.

  • field_resolver (Optional[Dict[str, str]]) – a map from field names defined in JSON, dict to the field names defined in Document.

  • encoding (str) – encoding used to read a CSV file. By default, utf-8 is used.

Return type:

T

Returns:

a DocumentArray object

classmethod load_json(file, protocol='jsonschema', encoding='utf-8', **kwargs)#

Load array elements from a JSON file.

Parameters:
  • file (Union[str, TextIO]) – File or filename or a JSON string to which the data is saved.

  • protocol (str) – jsonschema or protobuf

  • encoding (str) – encoding used to load data from a JSON file. By default, utf-8 is used.

Return type:

T

Returns:

a DocumentArray object

map(func, backend='thread', num_worker=None, show_progress=False, pool=None)#

Return an iterator that applies func to every Document in parallel, yielding the results.

Parameters:
  • func (Callable[[Document], T]) – a function that takes Document as input and outputs anything. You can either modify elements in-place (only with thread backend) or work later on return elements.

  • backend (str) –

    whether to use multi-process or multi-thread as the parallelization backend. In general, if your func is IO-bound then thread is probably good enough. If your func is CPU-bound, you may use process. In practice, you should experiment to figure out the best value. However, if you wish to modify elements in-place, you should always use the thread backend, regardless of IO/CPU-bound.

    Warning

    When using the process backend, you should not expect func to modify elements in-place. This is because the multiprocessing backend passes the variable via pickle and works in another process. The passed object and the original object do not share the same memory.

  • num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.

  • show_progress (bool) – show a progress bar

  • pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.

Yield:

anything return from func

Return type:

Generator[T, None, None]

map_batch(func, batch_size, backend='thread', num_worker=None, shuffle=False, show_progress=False, pool=None)#

Return an iterator that applies func to every minibatch in parallel, yielding the results. Each element in the returned iterator is a DocumentArray.

Parameters:
  • batch_size (int) – Size of each generated batch (except the last one, which might be smaller, default: 32)

  • shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.

  • func (Callable[[DocumentArray], T]) – a function that takes DocumentArray as input and outputs anything. You can either modify elements in-place (only with thread backend) or work later on return elements.

  • backend (str) –

    whether to use multi-process or multi-thread as the parallelization backend. In general, if your func is IO-bound then thread is probably good enough. If your func is CPU-bound, you may use process. In practice, you should experiment to figure out the best value. However, if you wish to modify elements in-place, you should always use the thread backend, regardless of IO/CPU-bound.

    Warning

    When using the process backend, you should not expect func to modify elements in-place. This is because the multiprocessing backend passes the variable via pickle and works in another process. The passed object and the original object do not share the same memory.

  • num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.

  • show_progress (bool) – show a progress bar

  • pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.

Yield:

anything return from func

Return type:

Generator[T, None, None]

match(darray, metric='cosine', limit=20, normalization=None, metric_name=None, batch_size=None, exclude_self=False, filter=None, only_id=False, use_scipy=False, device='cpu', num_worker=1, on=None, **kwargs)#

Compute embedding-based nearest neighbours in another DocumentArray for each Document in self, and store the results in matches.

Note

'cosine', 'euclidean', 'sqeuclidean' are supported natively without extra dependency. You can use other distance metrics provided by scipy, such as braycurtis, canberra, chebyshev, cityblock, correlation, cosine, dice, euclidean, hamming, jaccard, jensenshannon, kulsinski, mahalanobis, matching, minkowski, rogerstanimoto, russellrao, seuclidean, sokalmichener, sokalsneath, sqeuclidean, wminkowski, yule. To use a scipy metric, set use_scipy=True.

  • To make all match values fall in [0, 1], use dA.match(dB, normalization=(0, 1))

  • To invert the distance as a score and make all values fall in the range [0, 1], use dA.match(dB, normalization=(1, 0)). Note how normalization differs from the previous example.

  • If a custom metric distance is provided, make sure that it returns distances and not similarities, meaning the smaller the better.

Parameters:
  • darray (DocumentArray) – the other DocumentArray to match against

  • metric (Union[str, Callable[[ArrayType, ArrayType], ndarray]]) – the distance metric

  • limit (Union[int, float, None]) – the maximum number of matches, when not given defaults to 20.

  • normalization (Optional[Tuple[float, float]]) – a tuple [a, b] to be used with min-max normalization; the min distance will be rescaled to a, the max distance will be rescaled to b, and all values will be rescaled into the range [a, b].

  • metric_name (Optional[str]) – if provided, then match result will be marked with this string.

  • batch_size (Optional[int]) – if provided, then darray is loaded in batches, where each of them is at most batch_size elements. When darray is big, this can significantly speedup the computation.

  • exclude_self (bool) – if set, Documents in darray with same id as the left-hand values will not be considered as matches.

  • filter (Optional[Dict]) – filter query used for pre-filtering

  • only_id (bool) – if set, then returning matches will only contain id

  • use_scipy (bool) – if set, use scipy as the computation backend. Note, scipy does not support distance on sparse matrix.

  • device (str) – the computational device for .match(), can be either cpu or cuda.

  • num_worker (Optional[int]) –

    the number of parallel workers. If not given, then the number of CPUs in the system will be used.

    Note

    This argument is only effective when batch_size is set.

  • on (Optional[str]) – specifies a subindex to search on. If set, the returned DocumentArray will be retrieved from the given subindex.

  • kwargs – other kwargs.

Return type:

None
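
For example, a minimal sketch; with the default cosine metric, an identical vector yields a distance of ~0.0:

import numpy as np
from docarray import Document, DocumentArray

queries = DocumentArray([Document(embedding=np.array([0.0, 1.0]))])
index = DocumentArray(
    [
        Document(embedding=np.array([0.0, 1.0])),
        Document(embedding=np.array([1.0, 0.0])),
    ]
)
queries.match(index, limit=1)
print(queries[0].matches[0].scores['cosine'].value)  # ~0.0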

plot_embeddings(title='MyDocumentArray', path=None, image_sprites=False, min_image_size=16, channel_axis=-1, start_server=True, host='127.0.0.1', port=None, image_source='tensor')#

Interactively visualize embeddings using the Embedding Projector.

Parameters:
  • title (str) – the title of this visualization. If you want to compare multiple embeddings at the same time, make sure to give different names each time and set path to the same value.

  • host (str) – if set, bind the embedding-projector frontend to given host. Otherwise localhost is used.

  • port (Optional[int]) – if set, run the embedding-projector frontend at given port. Otherwise a random port is used.

  • image_sprites (bool) – if set, visualize the dots using uri and tensor.

  • path (Optional[str]) – if set, then append the visualization to an existing folder, where you can compare multiple embeddings at the same time. Make sure to use a different title each time.

  • min_image_size (int) – only used when image_sprites=True. the minimum size of the image

  • channel_axis (int) – only used when image_sprites=True. the axis id of the color channel, -1 indicates the color channel info at the last axis

  • start_server (bool) – if set, start a HTTP server and open the frontend directly. Otherwise, you need to rely on return path and serve by yourself.

  • image_source (str) – specify where the image comes from, can be uri or tensor. An empty tensor falls back to uri.

Return type:

str

Returns:

the path to the embeddings visualization info.

plot_image_sprites(output=None, canvas_size=512, min_size=16, channel_axis=-1, image_source='tensor', skip_empty=False, show_progress=False, show_index=False, fig_size=(10, 10), keep_aspect_ratio=False)#

Generate a sprite image for all image tensors in this DocumentArray-like object.

An image sprite is a collection of images put into a single image. It is always square-sized. Each sub-image is also square-sized and equally-sized.

Parameters:
  • output (Optional[str]) – Optional path to store the visualization. If not given, show in UI

  • canvas_size (int) – the size of the canvas

  • min_size (int) – the minimum size of the image

  • channel_axis (int) – the axis id of the color channel, -1 indicates the color channel info at the last axis

  • image_source (str) – specify where the image comes from, can be uri or tensor. An empty tensor falls back to uri.

  • skip_empty (bool) – skip Documents that have no .uri or .tensor.

  • show_index (bool) – show the index on the top-right corner of every image

  • fig_size (Optional[Tuple[int, int]]) – the size of the figure

  • show_progress (bool) – show a progressbar while plotting.

  • keep_aspect_ratio (bool) – preserve the aspect ratio of the image by using the aspect ratio of the first image in self.

Return type:

None

pop([index]) item -- remove and return item at index (default last).#

Raise IndexError if list is empty or index is out of range.

post(host, show_progress=False, batch_size=None, parameters=None, **kwargs)#

Post itself to a remote Flow/Sandbox and get the modified DocumentArray back.

Parameters:
  • host (str) – a host string. Can be one of the following:

    - grpc://192.168.0.123:8080/endpoint
    - ws://192.168.0.123:8080/endpoint
    - http://192.168.0.123:8080/endpoint
    - jinahub://Hello/endpoint
    - jinahub+docker://Hello/endpoint
    - jinahub+docker://Hello/v0.0.1/endpoint
    - jinahub+docker://Hello/latest/endpoint
    - jinahub+sandbox://Hello/endpoint

  • show_progress (bool) – whether to show a progress bar

  • batch_size (Optional[int]) – number of Document on each request

  • parameters (Optional[Dict]) – parameters to send in the request

Return type:

DocumentArray

Returns:

the new DocumentArray returned from remote

classmethod pull(name, show_progress=False, local_cache=True, *args, **kwargs)#

Pull a DocumentArray from the Jina Cloud Service to local.

Parameters:
  • name (str) – the upload name set during push()

  • show_progress (bool) – whether to show a progress bar while pulling

  • local_cache (bool) – store the downloaded DocumentArray to local folder

Return type:

T

Returns:

a DocumentArray object

push(name, show_progress=False, public=True)#

Push this DocumentArray object to Jina Cloud, from which it can later be retrieved via pull()

Note

  • Push with the same name will override the existing content.

  • It is kind of like a public clipboard where everyone can override anyone's content. So to make your content survive longer, you may want to use a longer & more complicated name.

  • The lifetime of the content is not promised at the moment; it could be a day, it could be a week. Do not use it for persistence. Only use this for temporary transmission/storage/clipboard.

Parameters:
  • name (str) – a name that later can be used for retrieve this DocumentArray.

  • show_progress (bool) – whether to show a progress bar while pushing

  • public (bool) – by default anyone can pull a DocumentArray if they know its name. Setting this to False will allow only the creator to pull it. This feature of course requires you to log in first.

Return type:

Dict
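
A minimal sketch, assuming network access to Jina Cloud; the name 'my-shared-da' is illustrative:

from docarray import DocumentArray

da = DocumentArray.empty(3)
da.push('my-shared-da')  # upload to Jina Cloud
da2 = DocumentArray.pull('my-shared-da')  # retrieve it elsewhere
assert len(da2) == 3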

reduce(other)#

Reduces other and the current DocumentArray into one DocumentArray in-place. Changes are applied to the current DocumentArray. Reducing two DocumentArrays consists of adding the Documents of the second DocumentArray to the first DocumentArray if they do not exist there. If a Document exists in both DocumentArrays, the data properties are merged with priority to the first Document (that is, to the current DocumentArray's Document). The matches and chunks are also reduced in the same way.

Parameters:

other (T) – the DocumentArray to reduce into this one

Return type:

T

Returns:

the resulting DocumentArray

reduce_all(others)#

Reduces a list of DocumentArrays and this DocumentArray into one DocumentArray. Changes are applied to this DocumentArray in-place.

Reduction consists of reducing this DocumentArray with every DocumentArray in others sequentially using DocumentArray.reduce(). The resulting DocumentArray contains Documents of all DocumentArrays. If a Document exists in many DocumentArrays, data properties are merged with priority to the left-most DocumentArrays (that is, if a data attribute is set in a Document belonging to many DocumentArrays, the attribute value of the left-most DocumentArray is kept). Matches and chunks of a Document belonging to many DocumentArrays are also reduced in the same way. Other non-data properties are ignored.

Note

  • Matches are not kept in a sorted order when they are reduced. You might want to re-sort them in a later step.

  • The final result depends on the order of DocumentArrays when applying reduction.

Parameters:

others (List[T]) – List of DocumentArrays to be reduced

Return type:

T

Returns:

the resulting DocumentArray
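
For example, a minimal sketch of reduce(); d2 exists on both sides, so it is merged rather than duplicated:

from docarray import Document, DocumentArray

a = DocumentArray([Document(id='d1'), Document(id='d2')])
b = DocumentArray([Document(id='d2'), Document(id='d3')])
a.reduce(b)
print(a[:, 'id'])  # ['d1', 'd2', 'd3']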

remove(value)#

S.remove(value) – remove first occurrence of value. Raise ValueError if the value is not present.

reverse()#

S.reverse() – reverse IN PLACE

sample(k, seed=None)#

Randomly sample k elements from this DocumentArray without replacement.

Parameters:
  • k (int) – Number of elements to sample from the document array.

  • seed (Optional[int]) – initializes the random number generator; None by default. If set, the random state is fixed so that the same output is produced every time.

Return type:

DocumentArray

Returns:

A sampled list of Document represented as DocumentArray.

save(file, file_format='binary', encoding='utf-8')#

Save array elements into a JSON, a binary file or a CSV file.

Parameters:
  • file (Union[str, TextIO, BinaryIO]) – File or filename to which the data is saved.

  • file_format (str) – json or binary or csv. JSON and CSV files are human-readable, but the binary format gives much smaller size and faster save/load speed. Note that a CSV file has very limited compatibility; a complex DocumentArray with nested structure cannot be restored from a CSV file.

  • encoding (str) – encoding used to save data into a file (it only applies to JSON and CSV format). By default, utf-8 is used.

Return type:

None

save_binary(file, protocol='pickle-array', compress=None)#

Save array elements into a binary file.

Parameters:
  • file (Union[str, BinaryIO]) – File or filename to which the data is saved.

  • protocol (str) – protocol to use

  • compress (Optional[str]) –

    compress algorithm to use

    Note

    If file is str it can specify protocol and compress as file extensions. This functionality assumes file=file_name.$protocol.$compress where $protocol and $compress refer to a string interpolation of the respective protocol and compress methods. For example if file=my_docarray.protobuf.lz4 then the binary data will be created using protocol=protobuf and compress=lz4.

Compared to save_json(), it is faster and the file is smaller, but not human-readable.

Note

To get a binary representation in memory, use bytes(...).

Return type:

None

save_csv(file, flatten_tags=True, exclude_fields=None, dialect='excel', with_header=True, encoding='utf-8')#

Save array elements into a CSV file.

Parameters:
  • file (Union[str, TextIO]) – File or filename to which the data is saved.

  • flatten_tags (bool) – if set, then all fields in Document.tags will be flattened into tag__fieldname and stored as separated columns. It is useful when tags contain a lot of information.

  • exclude_fields (Optional[Sequence[str]]) – if set, those fields won't show up in the output CSV

  • dialect (Union[str, Dialect]) – define a set of parameters specific to a particular CSV dialect. could be a string that represents predefined dialects in your system, or could be a csv.Dialect class that groups specific formatting parameters together.

  • encoding (str) – encoding used to save the data into a CSV file. By default, utf-8 is used.

Return type:

None

save_embeddings_csv(file, encoding='utf-8', **kwargs)#

Save embeddings to a CSV file

This function utilizes numpy.savetxt() internally.

Parameters:
  • file (Union[str, TextIO]) – File or filename to which the data is saved.

  • encoding (str) – encoding used to save the data into a file. By default, utf-8 is used.

  • kwargs – extra kwargs will be passed to numpy.savetxt().

Return type:

None

save_gif(output, channel_axis=-1, duration=200, size_ratio=1.0, inline_display=False, image_source='tensor', skip_empty=False, show_index=False, show_progress=False)#

Save a gif of the DocumentArray. Each frame corresponds to a Document.uri/.tensor in the DocumentArray.

Parameters:
  • output (str) – the file path to save the GIF to.

  • channel_axis (int) – the color channel axis of the tensor.

  • duration (int) – the duration of each frame in milliseconds.

  • size_ratio (float) – the size ratio of each frame.

  • inline_display (bool) – whether to show the GIF in a Jupyter notebook.

  • image_source (str) – the Document attribute the image comes from.

  • skip_empty (bool) – whether to skip empty documents.

  • show_index (bool) – whether to show the index of the document in the top-right corner.

  • show_progress (bool) – whether to show a progress bar.

Return type:

None

save_json(file, protocol='jsonschema', encoding='utf-8', **kwargs)#

Save array elements into a JSON file.

Compared to save_binary(), it is human-readable but slower to save/load, and the file size is larger.

Parameters:
  • file (Union[str, TextIO]) – File or filename to which the data is saved.

  • protocol (str) – jsonschema or protobuf

  • encoding (str) – encoding used to save data into a JSON file. By default, utf-8 is used.

Return type:

None

shuffle(seed=None)#

Randomly shuffle documents within the DocumentArray.

Parameters:

seed (Optional[int]) – initializes the random number generator; None by default. If set, the random state is fixed so that the same output is produced every time.

Return type:

DocumentArray

Returns:

The shuffled list of Document represented as DocumentArray.

split_by_tag(tag)#

Split the DocumentArray into multiple DocumentArray according to the tag value of each Document.

Parameters:

tag (str) – the tag name to split by, as stored in tags.

Return type:

Dict[Any, DocumentArray]

Returns:

a dict where Documents with the same value on tag are grouped together, their orders are preserved from the original DocumentArray.

Note

If the tags of the Documents do not contain the specified tag, an empty dict is returned.
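
For example:

from docarray import Document, DocumentArray

da = DocumentArray(
    [
        Document(tags={'category': 'a'}),
        Document(tags={'category': 'b'}),
        Document(tags={'category': 'a'}),
    ]
)
groups = da.split_by_tag('category')
print({k: len(v) for k, v in groups.items()})  # {'a': 2, 'b': 1}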

summary()#

Print the structure and attribute summary of this DocumentArray object.

Warning

Calling summary() on a large DocumentArray can be slow.

property tensors: Optional[ArrayType]#

Return an ArrayType stacking all tensors.

The tensor attributes are stacked together along a newly created first dimension (as if you would stack using np.stack(X, axis=0)).

Warning

This operation assumes all tensors have the same shape and dtype. All dtype and shape values are assumed to be equal to those of the first element in the DocumentArray.

Return type:

Optional[ArrayType]

Returns:

an ArrayType of tensors

property texts: Optional[List[str]]#

Get text of all Documents

Return type:

Optional[List[str]]

Returns:

a list of texts

to_base64(protocol='pickle-array', compress=None, _show_progress=False)#
Return type:

str

to_bytes(protocol='pickle-array', compress=None, _file_ctx=None, _show_progress=False)#

Serialize itself into bytes.

For more Pythonic code, please use bytes(...).

Parameters:
  • _file_ctx (Optional[BinaryIO]) – File or filename or serialized bytes where the data is stored.

  • protocol (str) – protocol to use

  • compress (Optional[str]) – compress algorithm to use

  • _show_progress (bool) – show progress bar, only works when protocol is pickle or protobuf

Return type:

bytes

Returns:

the binary serialization in bytes

to_dataframe(**kwargs)#

Export itself to a pandas.DataFrame object.

Parameters:

kwargs – the extra kwargs will be passed to pandas.DataFrame.from_dict().

Return type:

DataFrame

Returns:

a pandas.DataFrame object

to_dict(protocol='jsonschema', **kwargs)#

Convert the object into a Python list of dicts, one per Document.

Parameters:

protocol (str) – jsonschema or protobuf

Return type:

List

Returns:

a Python list of dicts

to_json(protocol='jsonschema', **kwargs)#

Convert the object into a JSON string. Can be loaded via load_json().

Parameters:

protocol (str) – jsonschema or protobuf

Return type:

str

Returns:

a JSON string

to_list(protocol='jsonschema', **kwargs)#

Convert the object into a Python list.

Parameters:

protocol (str) – jsonschema or protobuf

Return type:

List

Returns:

a Python list

to_protobuf(ndarray_type=None)#

Convert DocumentArray into a Protobuf message.

Parameters:

ndarray_type (Optional[str]) – can be list or numpy; if set, it forces all ndarray-like objects in all Documents to List or numpy.ndarray.

Return type:

DocumentArrayProto

Returns:

the protobuf message

to_pydantic_model()#

Convert a DocumentArray object into a list of Pydantic models.

Return type:

List[PydanticDocument]

to_strawberry_type()#

Convert a DocumentArray object into a list of Strawberry types.

Return type:

List[StrawberryDocument]

traverse(traversal_paths, filter_fn=None)#

Return an Iterator of TraversableSequence of the leaves when applying the traversal_paths. Each TraversableSequence is either the root Documents, a ChunkArray or a MatchArray.

Parameters:
  • traversal_paths (str) – a comma-separated string that represents the traversal path

  • filter_fn (Optional[Callable[[Document], bool]]) – function to filter docs during traversal

Yield:

TraversableSequence of the leaves when applying the traversal_paths.

Example on traversal_paths:

  • r: docs in this TraversableSequence

  • m: all match-documents at adjacency 1

  • c: all child-documents at granularity 1

  • r.[attribute]: access an attribute of a multi-modal document

  • cc: all child-documents at granularity 2

  • mm: all match-documents at adjacency 2

  • cm: all match-documents at adjacency 1 and granularity 1

  • r,c: docs in this TraversableSequence and all child-documents at granularity 1

  • r[start:end]: access sub document array using slice

Return type:

Iterable[T]
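
A small sketch of traversal paths; 'r' yields the root Documents and 'c' the chunks at granularity 1:

from docarray import Document, DocumentArray

root = Document(text='root', chunks=[Document(text='c1'), Document(text='c2')])
da = DocumentArray([root])

for seq in da.traverse('r,c'):  # one TraversableSequence per group of leaves
    print([d.text for d in seq])  # ['root'], then ['c1', 'c2']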

traverse_flat(traversal_paths, filter_fn=None)#

Returns a single flattened TraversableSequence with all Documents that are reached via the traversal_paths.

Warning

When defining the traversal_paths with multiple paths, the returned Documents are determined at once and not on the fly. This behavior differs from traverse() and traverse_flat_per_path()!

Parameters:
  • traversal_paths (str) – a comma-separated string that represents the traversal paths

  • filter_fn (Optional[Callable[[Document], bool]]) – function to filter docs during traversal

Return type:

DocumentArray

Returns:

a single TraversableSequence containing the Documents of all leaves when applying the traversal_paths.

traverse_flat_per_path(traversal_paths, filter_fn=None)#

Returns a flattened TraversableSequence per path in traversal_paths, each containing all Documents reached by that path.

Parameters:
  • traversal_paths (str) – a comma-separated string that represents the traversal path

  • filter_fn (Optional[Callable[[Document], bool]]) – function to filter docs during traversal

Yield:

TraversableSequence containing the Documents of all leaves per path.

jina.Executor#

alias of BaseExecutor

class jina.Flow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, port: Optional[int] = None, protocol: Optional[Union[str, List[str]]] = 'GRPC', proxy: Optional[bool] = False, tls: Optional[bool] = False, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, **kwargs)[source]#
class jina.Flow(*, compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_metadata: Optional[str] = '{}', deployments_no_reduce: Optional[str] = '[]', description: Optional[str] = None, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, floating: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, port: Optional[int] = None, port_monitoring: Optional[int] = None, prefetch: Optional[int] = 1000, protocol: Optional[Union[str, List[str]]] = ['GRPC'], proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'GatewayRuntime', ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, uses: Optional[Union[str, Type[BaseExecutor], dict]] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
class jina.Flow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)

Bases: PostMixin, ProfileMixin, HealthCheckMixin, JAMLCompatible, ExitStack

Flow is how Jina streamlines and distributes Executors.

Create a Flow. Flow is how Jina streamlines and scales Executors.

EXAMPLE USAGE

Python API

from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
with f:
    f.block()  # serve Flow

To and from YAML configuration

from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
f.save_config('flow.yml')  # save YAML config file
f = Flow.load_config('flow.yml')  # load Flow from YAML config
with f:
    f.block()  # serve Flow
Parameters:
  • asyncio – If set, then the input and output of this Client work in an asynchronous manner.

  • host – The host of the Gateway, which the client should connect to, by default it is 0.0.0.0.

  • metrics – If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • port – The port of the Gateway, which the client should connect to.

  • protocol – Communication protocol between server and client.

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting, since gRPC seems to prefer no proxy.

  • tls – If set, connect to gateway using tls encryption

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided.

  • compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • cors – If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.

  • deployments_addresses – JSON dictionary with the input addresses of each Deployment

  • deployments_metadata – JSON dictionary with the request metadata for each Deployment

  • deployments_no_reduce – list JSON disabling the built-in merging mechanism for each Deployment listed

  • description – The description of this HTTP server. It will be used in automatics docs such as Swagger UI.

  • docker_kwargs

    Dictionary of kwargs arguments that will be passed to the Docker SDK when starting the Docker container.

    More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/

  • entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image ENTRYPOINT takes effect.

  • env – The map of environment variables that are available inside runtime

  • expose_endpoints – A JSON string that represents a map from executor endpoints (@requests(on=…)) to HTTP endpoints.

  • expose_graphql_endpoint – If set, /graphql endpoint is added to HTTP interface.

  • floating – If set, the current Pod/Deployment can not be further chained, and the next .add() will chain after the last Pod/Deployment not this current one.

  • graph_conditions – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.

  • graph_description – Routing graph for the gateway

  • grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}

  • host – The host address of the runtime, by default it is 0.0.0.0.

  • log_config – The YAML config of the logger used in this object.

  • metrics – If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics

  • name

    The name of this object.

    This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - …

    When not given, then the default naming strategy will apply.

  • no_crud_endpoints

    If set, /index, /search, /update, /delete endpoints are removed from HTTP interface.

    Any executor that has @requests(on=…) bound with those values will receive data requests.

  • no_debug_endpoints – If set, /status /post endpoints are removed from HTTP interface.

  • port – The port for input data to bind the gateway server to, by default, random ports between range [49152, 65535] will be assigned. The port argument can be either 1 single value in case only 1 protocol is used or multiple values when many protocols are used.

  • port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]

  • prefetch

    Number of requests fetched from the client before feeding into the first Executor.

    Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)

  • protocol – Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: [‘GRPC’, ‘HTTP’, ‘WEBSOCKET’].

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting, since gRPC seems to prefer no proxy.

  • py_modules

    The custom Python modules that need to be imported before loading the gateway

    Note that the recommended way is to only import a single module - a simple python file, if your gateway can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a python package.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • reload – If set, the Gateway will restart while serving if YAML configuration source is changed.

  • retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

  • runtime_cls – The runtime class to run inside the Pod

  • ssl_certfile – the path to the certificate file

  • ssl_keyfile – the path to the key file

  • timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever

  • timeout_ready – The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever

  • timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default

  • title – The title of this HTTP server. It will be used in automatics docs such as Swagger UI.

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided.

  • uses

    The config of the gateway, it could be one of the following: * the string literal of a Gateway class name * a Gateway YAML file (.yml, .yaml, .jaml) * a docker image (must start with docker://) * the string literal of a YAML config (must start with ! or `jtype: `) * the string literal of a JSON config

    When used in Python, the following values are additionally supported: - a Python dict that represents the config - a text file stream that has a .read() interface

  • uses_with – Dictionary of keyword arguments that will override the with configuration in uses

  • uvicorn_kwargs

    Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server

    More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

  • env – The map of environment variables that are available inside runtime

  • inspect

    The strategy applied to the inspect deployments in the flow.

    If REMOVE is given then all inspect deployments are removed when building the flow.

  • log_config – The YAML config of the logger used in this object.

  • name

    The name of this object.

    This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - …

    When not given, then the default naming strategy will apply.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • reload – If set, auto-reloading on file changes is enabled: the Flow will restart while blocked if the YAML configuration source is changed. This also applies to underlying Executors, if their source code or YAML configuration has changed.

  • uses – The YAML path that represents a Flow. It can be either a local file path or a URL.

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

needs(needs, name='joiner', *args, **kwargs)[source]#

Add a blocker to the Flow that waits until all Pods defined in needs have completed.

Parameters:
  • needs (Union[Tuple[str], List[str]]) – list of service names to wait for

  • name (str) – the name of this joiner, by default is joiner

  • args – additional positional arguments forwarded to the add function

  • kwargs – additional key value arguments forwarded to the add function

Return type:

Flow

Returns:

the modified Flow

needs_all(name='joiner', *args, **kwargs)[source]#

Collect all floating Deployments so far and add a blocker to the Flow; wait until all hanging Pods have completed.

Parameters:
  • name (str) – the name of this joiner (default is joiner)

  • args – additional positional arguments which are forwarded to the add and needs function

  • kwargs – additional key value arguments which are forwarded to the add and needs function

Return type:

Flow

Returns:

the modified Flow
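
A topology sketch with two parallel branches joined by needs(); the Executor names 'a' and 'b' are illustrative:

from jina import Flow

f = (
    Flow()
    .add(name='a')                     # gateway -> a
    .add(name='b', needs='gateway')    # parallel branch: gateway -> b
    .needs(['a', 'b'], name='joiner')  # block until both a and b complete
)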

add(*, compression: Optional[str] = None, connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, exit_on_exceptions: Optional[List[str]] = [], external: Optional[bool] = False, floating: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, grpc_metadata: Optional[dict] = None, grpc_server_options: Optional[dict] = None, host: Optional[List[str]] = ['0.0.0.0'], install_requirements: Optional[bool] = False, log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = None, native: Optional[bool] = False, no_reduce: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[int] = None, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, tls: Optional[bool] = False, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before_address: Optional[str] = None, uses_dynamic_batching: Optional[dict] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, when: Optional[dict] = None, workspace: Optional[str] = None, **kwargs) Union['Flow', 'AsyncFlow'][source]#
add(*, needs: Optional[Union[str, Tuple[str], List[str]]] = None, copy_flow: bool = True, deployment_role: DeploymentRoleType = DeploymentRoleType.DEPLOYMENT, **kwargs) Union['Flow', 'AsyncFlow']

Add a Deployment to the current Flow object and return the new modified Flow object. The attribute of the Deployment can be later changed with set() or deleted with remove()

Parameters:
  • compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • connection_list – JSON dictionary with a list of connections to configure

  • disable_auto_volume – Do not automatically mount a volume for dockerized Executors.

  • docker_kwargs

    Dictionary of kwargs arguments that will be passed to the Docker SDK when starting the Docker container.

    More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/

  • entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image ENTRYPOINT takes effect.

  • env – The map of environment variables that are available inside runtime

  • exit_on_exceptions – List of exceptions that will cause the Executor to shut down.

  • external – The Deployment will be considered an external Deployment that has been started independently from the Flow.This Deployment will not be context managed by the Flow.

  • floating – If set, the current Pod/Deployment can not be further chained, and the next .add() will chain after the last Pod/Deployment not this current one.

  • force_update – If set, always pull the latest Hub Executor bundle even it exists on local

  • gpus

    This argument allows dockerized Jina Executors to discover local gpu devices.

    Note, - To access all gpus, use --gpus all. - To access multiple gpus, e.g. make use of 2 gpus, use --gpus 2. - To access specific gpus based on device id, use --gpus device=[YOUR-GPU-DEVICE-ID] - To access specific gpus based on multiple device ids, use --gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2] - To specify more parameters, use --gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display

  • grpc_metadata – The metadata to be passed to the gRPC request.

  • grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}

  • host – The host of the Gateway, which the client should connect to, by default it is 0.0.0.0. In the case of an external Executor (--external or external=True) this can be a list of hosts. Then, every resulting address will be considered as one replica of the Executor.

  • install_requirements – If set, try to install requirements.txt from the local Executor if exists in the Executor folder. If using Hub, install requirements.txt in the Hub Executor bundle to local.

  • log_config – The YAML config of the logger used in this object.

  • metrics – If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics

  • name

    The name of this object.

    This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - …

    When not given, then the default naming strategy will apply.

  • native – If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime.

  • no_reduce – Disable the built-in reduction mechanism. Set this if the reduction is to be handled by the Executor itself by operating on a docs_matrix or docs_map

  • output_array_type

    The type of array tensor and embedding will be serialized to.

    Supports the same types as docarray.to_protobuf(.., ndarray_type=…), which can be found here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>. Defaults to retaining whatever type is returned by the Executor.

  • polling – The polling strategy of the Deployment and its endpoints (when shards>1). Can be defined for all endpoints of a Deployment or per endpoint. Define per Deployment: - ANY: only one (whoever is idle) Pod polls the message - ALL: all Pods poll the message (like a broadcast) Define per endpoint: a JSON dict of {endpoint: PollingType}, e.g. {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'}

  • port – The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (--external or external=True) this can be a list of ports. Then, every resulting address will be considered as one replica of the Executor.

  • port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]

  • py_modules

    The custom Python modules that need to be imported before loading the executor

    Note that the recommended way is to only import a single module - a simple python file, if your executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a python package. For more details, please see the Executor cookbook

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • reload – If set, the Executor will restart while serving if YAML configuration source or Executor modules are changed. If YAML configuration is changed, the whole deployment is reloaded and new processes will be restarted. If only Python modules of the Executor have changed, they will be reloaded to the interpreter without restarting process.

  • replicas – The number of replicas in the deployment

  • retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

  • runtime_cls – The runtime class to run inside the Pod

  • shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/concepts/flow/create-flow/#complex-flow-topologies

  • timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever

  • timeout_ready – The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever

  • timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default

  • tls – If set, connect to deployment using tls encryption

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided.

  • uses

    The config of the executor, it could be one of the following: * the string literal of an Executor class name * an Executor YAML file (.yml, .yaml, .jaml) * a Jina Hub Executor (must start with jinahub:// or jinahub+docker://) * a docker image (must start with docker://) * the string literal of a YAML config (must start with ! or `jtype: `) * the string literal of a JSON config

    When used in Python, the following values are additionally supported: - a Python dict that represents the config - a text file stream that has a .read() interface

  • uses_after – The executor attached after the Pods described by --uses, typically used for receiving from all shards, accepted type follows --uses. This argument only applies for sharded Deployments (shards > 1).

  • uses_after_address – The address of the uses-after runtime

  • uses_before – The executor attached before the Pods described by --uses, typically before sending to all shards, accepted type follows --uses. This argument only applies for sharded Deployments (shards > 1).

  • uses_before_address – The address of the uses-before runtime

  • uses_dynamic_batching – Dictionary of keyword arguments that will override the dynamic_batching configuration in uses

  • uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses

  • uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses

  • uses_with – Dictionary of keyword arguments that will override the with configuration in uses

  • volumes

    The path on the host to be mounted inside the container.

    Note, - If separated by :, then the first part will be considered as the local host path and the second part is the path in the container system. - If no split is provided, then the basename of that directory will be mounted into the container's root path, e.g. --volumes="/user/test/my-workspace" will be mounted into /my-workspace inside the container. - All volumes are mounted with read-write mode.

  • when – The condition that the documents need to fulfill before reaching the Executor. The condition can be defined in the form of a DocArray query condition <https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions>

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

  • needs – the name of the Deployment(s) that this Deployment receives data from. One can also use “gateway” to indicate the connection with the gateway.

  • deployment_role – the role of the Deployment, used for visualization and route planning

  • copy_flow – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

  • kwargs – other keyword-value arguments that the Deployment CLI supports

Return type:

Union[Flow, AsyncFlow]

Returns:

a (new) Flow object with modification
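
A minimal sketch of chaining add() calls; MyExecutor is an illustrative Executor:

from jina import Executor, Flow, requests


class MyExecutor(Executor):
    @requests
    def foo(self, docs, **kwargs):
        ...


f = (
    Flow()
    .add(uses=MyExecutor, name='exec1', replicas=2)       # two replicas of exec1
    .add(uses=MyExecutor, name='exec2', needs='gateway')  # parallel branch
)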

config_gateway(*, compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_metadata: Optional[str] = '{}', deployments_no_reduce: Optional[str] = '[]', description: Optional[str] = None, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, floating: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, metrics: Optional[bool] = False, metrics_exporter_host: Optional[str] = None, metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, port: Optional[int] = None, port_monitoring: Optional[int] = None, prefetch: Optional[int] = 1000, protocol: Optional[Union[str, List[str]]] = ['GRPC'], proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, reload: Optional[bool] = False, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'GatewayRuntime', ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, traces_exporter_host: Optional[str] = None, traces_exporter_port: Optional[int] = None, tracing: Optional[bool] = False, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)[source]#

Configure the Gateway inside a Flow. The Gateway exposes your Flow logic as a service to the internet according to the protocol and configuration you choose.

Parameters:
  • compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.

  • cors – If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.

  • deployments_addresses – JSON dictionary with the input addresses of each Deployment

  • deployments_metadata – JSON dictionary with the request metadata for each Deployment

  • deployments_no_reduce – list JSON disabling the built-in merging mechanism for each Deployment listed

  • description – The description of this HTTP server. It will be used in automatics docs such as Swagger UI.

  • docker_kwargs

    Dictionary of kwargs arguments that will be passed to the Docker SDK when starting the Docker container.

    More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/

  • entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image ENTRYPOINT takes effect.

  • env – The map of environment variables that are available inside runtime

  • expose_endpoints – A JSON string that represents a map from executor endpoints (@requests(on=…)) to HTTP endpoints.

  • expose_graphql_endpoint – If set, /graphql endpoint is added to HTTP interface.

  • floating – If set, the current Pod/Deployment can not be further chained, and the next .add() will chain after the last Pod/Deployment not this current one.

  • graph_conditions – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.

  • graph_description – Routing graph for the gateway

  • grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}

  • host – The host address of the runtime, by default it is 0.0.0.0.

  • log_config – The YAML config of the logger used in this object.

  • metrics – If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided.

  • metrics_exporter_host – If tracing is enabled, this hostname will be used to configure the metrics exporter agent.

  • metrics_exporter_port – If tracing is enabled, this port will be used to configure the metrics exporter agent.

  • monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics

  • name

    The name of this object.

    This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - …

    When not given, then the default naming strategy will apply.

  • no_crud_endpoints

    If set, /index, /search, /update, /delete endpoints are removed from HTTP interface.

    Any executor that has @requests(on=…) bound with those values will receive data requests.

  • no_debug_endpoints – If set, /status /post endpoints are removed from HTTP interface.

  • port – The port for input data to bind the gateway server to, by default, random ports between range [49152, 65535] will be assigned. The port argument can be either 1 single value in case only 1 protocol is used or multiple values when many protocols are used.

  • port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]

  • prefetch

    Number of requests fetched from the client before feeding into the first Executor.

    Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)

  • protocol – Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: [‘GRPC’, ‘HTTP’, ‘WEBSOCKET’].

  • proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting, since gRPC seems to prefer no proxy.

  • py_modules

    The custom Python modules that need to be imported before loading the gateway

    Note that the recommended way is to only import a single module - a simple python file, if your gateway can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a python package.

  • quiet – If set, then no log will be emitted from this object.

  • quiet_error – If set, then exception stack information will not be added to the log

  • reload – If set, the Gateway will restart while serving if YAML configuration source is changed.

  • retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)

  • runtime_cls – The runtime class to run inside the Pod

  • ssl_certfile – the path to the certificate file

  • ssl_keyfile – the path to the key file

  • timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever

  • timeout_ready – The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever

  • timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default

  • title – The title of this HTTP server. It will be used in automatics docs such as Swagger UI.

  • traces_exporter_host – If tracing is enabled, this hostname will be used to configure the trace exporter agent.

  • traces_exporter_port – If tracing is enabled, this port will be used to configure the trace exporter agent.

  • tracing – If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided.

  • uses

    The config of the gateway, it could be one of the following: * the string literal of a Gateway class name * a Gateway YAML file (.yml, .yaml, .jaml) * a docker image (must start with docker://) * the string literal of a YAML config (must start with ! or `jtype: `) * the string literal of a JSON config

    When used in Python, the following values are additionally supported: - a Python dict that represents the config - a text file stream that has a .read() interface

  • uses_with – Dictionary of keyword arguments that will override the with configuration in uses

  • uvicorn_kwargs

    Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server

    More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/

  • workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.

Return type:

Union[Flow, AsyncFlow]

Returns:

the new Flow object
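
A minimal sketch; the protocol and port values are illustrative:

from jina import Flow

f = Flow().add().config_gateway(protocol='HTTP', port=12345, cors=True)
with f:
    ...  # the Flow is now served over HTTP on port 12345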

inspect(name='inspect', *args, **kwargs)[source]#

Add an inspection on the last changed Deployment in the Flow

Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.

Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
        |
        -- PUB-SUB -- InspectDeployment (Hanging)

In this way, InspectDeployment looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.

See also

gather_inspect()

Parameters:
  • name (str) – name of the Deployment

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type:

Flow

Returns:

the new instance of the Flow

gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)[source]#

Gather the output of all inspect Deployments into one Deployment. When the Flow has no inspect Deployment, the Flow itself is returned.

Note

If --no-inspect is not given, then gather_inspect() is auto called before build(). So in general you don’t need to manually call gather_inspect().

Parameters:
  • name (str) – the name of the gather Deployment

  • include_last_deployment (bool) – whether to include the last modified Deployment in the Flow

  • args – args for .add()

  • kwargs – kwargs for .add()

Return type:

Flow

Returns:

the modified Flow or the copy of it

See also

inspect()

build(copy_flow=False, disable_build_sandbox=False)[source]#

Build the current Flow and make it ready to use

Note

No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.

Parameters:
  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

  • disable_build_sandbox (bool) – when set to true, the sandbox building part will be skipped, will be used by plot

Return type:

Flow

Returns:

the current Flow (by default)

Note

copy_flow=True is recommended if you are building the same Flow multiple times in a row. e.g.

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()
start()[source]#

Start to run all Deployments in this Flow.

Remember to close the Flow with close().

Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.orchestrate.pods.Pod

Returns:

this instance

property num_deployments: int#

Get the number of Deployments in this Flow

Return type:

int

property num_pods: int#

Get the number of pods (shards count) in this Flow

Return type:

int

property client: BaseClient#

Return a BaseClient object attached to this Flow.

Return type:

BaseClient

plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)[source]#

Visualize the Flow up to the current point. If a file name is provided it will create a jpg image with that name, otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline, otherwise an image will be created.

Example,

flow = Flow().add(name='deployment_a').plot('flow.svg')
Parameters:
  • output (Optional[str]) – a filename specifying the name of the image to be created, the suffix svg/jpg determines the file type of the output image

  • vertical_layout (bool) – top-down or left-right layout

  • inline_display (bool) – show image directly inside the Jupyter Notebook

  • build (bool) – build the Flow first before plotting, so the gateway connection can be better shown

  • copy_flow (bool) – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification

Return type:

Flow

Returns:

the Flow

property port: Optional[Union[List[int], int]]#

Return the exposed port of the gateway.

Return type:

Union[List[int], int, None]

property host: str#

Return the local address of the gateway.

Return type:

str

property monitoring: bool#

Return whether monitoring is enabled.

Return type:

bool

property port_monitoring: Optional[int]#

Return the port on which monitoring is exposed, if enabled.

Return type:

Optional[int]

property address_private: str#

Return the private IP address of the gateway for connecting from other machines in the same network.

Return type:

str

property address_public: str#

Return the public IP address of the gateway for connecting from other machines on the public network.

Return type:

str

block(stop_event=None)[source]#

Block the Flow until stop_event is set or the user hits KeyboardInterrupt.

Parameters:

stop_event (Union[Event, Event, None]) – a threading event or a multiprocessing event that, once set, returns control of the Flow to the main thread.
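
A small sketch that releases the block via a stop_event; the 5-second timer is illustrative:

import threading

from jina import Flow

stop = threading.Event()
threading.Timer(5.0, stop.set).start()  # set the event after 5 seconds

f = Flow().add()
with f:
    f.block(stop_event=stop)  # returns once `stop` is set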

property protocol: Union[GatewayProtocolType, List[GatewayProtocolType]]#

Return the protocol of this Flow

Return type:

Union[GatewayProtocolType, List[GatewayProtocolType]]

Returns:

the protocol of this Flow, if only 1 protocol is supported otherwise returns the list of protocols

callback(**kwds)#

Registers an arbitrary callback and arguments.

Cannot suppress exceptions.

close()#

Immediately unwind the context stack.

delete(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, **kwargs) Optional[Union[DocumentArray, List[Response]]]#
dry_run(**kwargs)#
enter_context(cm)#

Enters the supplied context manager.

If successful, also pushes its __exit__ method as a callback and returns the result of the __enter__ method.

index(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, **kwargs) Optional[Union[DocumentArray, List[Response]]]#
is_flow_ready(**kwargs)#

Check if the Flow is ready to receive requests

Parameters:

kwargs – potential kwargs passed from the public interface

Return type:

bool

Returns:

boolean indicating the health/readiness of the Flow

static is_valid_jaml(obj)#

Verifies the YAML syntax of a given object by first serializing it, then attempting to deserialize it and catching parser errors.

Parameters:

obj (Dict) – yaml object

Return type:

bool

Returns:

whether the syntax is valid or not

classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, uses_dynamic_batching=None, **kwargs)#

A high-level interface for loading configuration, with support for loading extra py_modules and substituting env & context variables. Any class that implements the JAMLCompatible mixin can enjoy this feature, e.g. BaseFlow, BaseExecutor, BaseGateway and all their subclasses.

Supported substitutions in YAML:
  • Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).

  • Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.

  • Internal reference via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}

Substitutions are carried out in order, over multiple passes, to resolve variables on a best-effort basis.

!BaseEncoder
metas:
    name: ${{VAR_A}}  # env or context variables
    workspace: my-${{this.name}}  # internal reference
import os

from jina import Executor as BaseExecutor  # jina.Executor is an alias of BaseExecutor

# load Executor from yaml file
BaseExecutor.load_config('a.yml')

# load Executor from yaml file and substitute environment variables
os.environ['VAR_A'] = 'hello-world'
b = BaseExecutor.load_config('a.yml')
assert b.name == 'hello-world'

# load Executor from yaml file and substitute variables from a dict
b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
assert b.name == 'hello-world'

# disable substitute
b = BaseExecutor.load_config('a.yml', substitute=False)
Parameters:
  • source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.

  • allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any levels

  • substitute (bool) – substitute environment, internal reference and context variables.

  • context (Optional[Dict[str, Any]]) – context replacement variables in a dict, the value of the dict is the replacement.

  • uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s with field

  • uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s metas field

  • uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s requests field

  • extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor yaml files

  • py_modules (Optional[str]) – Optional py_module from which the object need to be loaded

  • runtime_args (Optional[Dict[str, Any]]) – Optional dictionary of parameters runtime_args to be directly passed without being parsed into a yaml config

  • uses_dynamic_batching (Optional[Dict]) – dictionary of parameters to overwrite from the default config’s dynamic_batching field

  • kwargs – kwargs for parse_config_source

Return type:

JAMLCompatible

Returns:

JAMLCompatible object

pop_all()#

Preserve the context stack by transferring it to a new instance.

post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, max_attempts=1, initial_backoff=0.5, max_backoff=0.1, backoff_multiplier=1.5, results_in_order=False, stream=True, **kwargs)#

Post a general data request to the Flow.

Parameters:
  • inputs (Optional[InputType]) – input data which can be an Iterable, a function which returns an Iterable, or a single Document.

  • on (str) – the endpoint which is invoked. All the functions in the executors decorated by @requests(on=…) with the same endpoint are invoked.

  • on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.

  • on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.

  • on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.

  • parameters (Optional[Dict]) – the kwargs that will be sent to the executor

  • target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.

  • request_size (int) – the number of Documents per request. <=0 means all inputs in one request.

  • show_progress (bool) – if set, client will show a progress bar on receiving every request.

  • continue_on_error (bool) – if set, a Request that causes an error will be logged only without blocking the further requests.

  • return_responses (bool) – if set to True, the result will come as Response and not as a DocumentArray

  • max_attempts (int) – Number of sending attempts, including the original request.

  • initial_backoff (float) – The first retry will happen with a delay of random(0, initial_backoff)

  • max_backoff (float) – The maximum accepted backoff after the exponential incremental delay

  • backoff_multiplier (float) – The n-th attempt will occur at random(0, min(initialBackoff*backoffMultiplier**(n-1), maxBackoff))

  • results_in_order (bool) – return the results in the same order as the inputs

  • stream (bool) – Applicable only to grpc client. If True, the requests are sent to the target using the gRPC streaming interface otherwise the gRPC unary interface will be used. The value is True by default.

  • kwargs – additional parameters

Return type:

Union[DocumentArray, List[Response], None]

Returns:

None or DocumentArray containing all response Documents

Warning

target_executor uses re.match for checking if the pattern is matched. target_executor=='foo' will match both deployments with the name foo and foo_what_ever_suffix.
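
A minimal sketch of posting Documents to a Flow; the '/' endpoint reaches the default @requests handler:

from docarray import Document, DocumentArray
from jina import Flow

f = Flow().add()
with f:
    result = f.post(on='/', inputs=DocumentArray([Document(text='hello')]))
    print(result.texts)  # the response Documents as a DocumentArray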

profiling(show_table=True)#

Profile a single query's roundtrip, including network and computation latency. Results are summarized in a Dict.

Parameters:

show_table (bool) – whether to show the table or not.

Return type:

Dict[str, float]

Returns:

the latency report in a dict.

push(exit)#

Registers a callback with the standard __exit__ method signature.

Can suppress exceptions the same way __exit__ method can. Also accepts any object with an __exit__ method (registering a call to the method instead of the object itself).

save_config(filename=None)#

Save the object’s config into a YAML file.

Parameters:

filename (Optional[str]) – file path of the yaml file, if not given then config_abspath is used

search(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, **kwargs) Optional[Union[DocumentArray, List[Response]]]#
update(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, stream: bool = True, **kwargs) Optional[Union[DocumentArray, List[Response]]]#
property workspace: str#

Return the workspace path of the flow.

Return type:

str

property workspace_id: Dict[str, str]#

Get all Deployments’ workspace_id values in a dict

Return type:

Dict[str, str]

property env: Optional[Dict]#

Get all envs to be set in the Flow

Return type:

Optional[Dict]

Returns:

envs as dict

expose_endpoint(exec_endpoint: str, path: Optional[str] = None)[source]#
expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)

Expose an Executor's endpoint (defined by @requests(on=…)) as an HTTP endpoint for easier access.

After exposing, you can send data requests directly to http://hostname:port/endpoint.

Parameters:

exec_endpoint (str) – the endpoint string, by convention starts with /

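A sketch of exposing a custom endpoint over HTTP; the endpoint '/foo' and MyExecutor are illustrative:

from jina import Executor, Flow, requests


class MyExecutor(Executor):
    @requests(on='/foo')
    def foo(self, docs, **kwargs):
        ...


f = Flow(protocol='http').add(uses=MyExecutor)
f.expose_endpoint('/foo')  # now reachable via HTTP POST at http://hostname:port/foo
with f:
    f.block()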

to_kubernetes_yaml(output_base_path, k8s_namespace=None, include_gateway=True)[source]#

Converts the Flow into a set of yaml deployments to deploy in Kubernetes.

If you don't want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • include_gateway (bool) – Defines if the gateway deployment should be included, defaults to True
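
A minimal sketch; the output directory and namespace are illustrative:

from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')
f.to_kubernetes_yaml('./k8s_config', k8s_namespace='my-namespace')
# then deploy with: kubectl apply -R -f ./k8s_config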

to_k8s_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#

Converts the Flow into a set of yaml deployments to deploy in Kubernetes.

If you don't want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.

Parameters:
  • output_base_path (str) – The base path where to dump all the yaml files

  • k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.

  • include_gateway (bool) – Defines if the gateway deployment should be included, defaults to True

to_docker_compose_yaml(output_path=None, network_name=None, include_gateway=True)[source]#

Converts the Flow into a yaml file to run with docker-compose up.

Parameters:
  • output_path (Optional[str]) – The output path for the yaml file

  • network_name (Optional[str]) – The name of the network that will be used by the deployment

  • include_gateway (bool) – Defines if the gateway deployment should be included, defaults to True

property client_args: Namespace#

Get Client settings.


Return type:

Namespace

property gateway_args: Namespace#

Get Gateway settings.


Return type:

Namespace

jina.Gateway#

alias of BaseGateway

jina.dynamic_batching(func=None, *, preferred_batch_size=None, timeout=10000)[source]#

@dynamic_batching defines the dynamic batching behavior of an Executor. Dynamic batching works by collecting Documents from multiple requests in a queue, and passing them to the Executor in batches of specified size. This can improve throughput and resource utilization at the cost of increased latency. See the usage sketch after the parameter list below.

Parameters:
  • func (Optional[Callable[[DocumentArray, Dict, DocumentArray, List[DocumentArray], List[DocumentArray]], Union[DocumentArray, Dict, None]]]) – the method to decorate

  • preferred_batch_size (Optional[int]) – target number of Documents in a batch. The batcher will collect requests until preferred_batch_size is reached, or until timeout is reached. Therefore, the actual batch size can be smaller or larger than preferred_batch_size.

  • timeout (Optional[float]) – maximum time in milliseconds to wait for a request to be assigned to a batch. If the oldest request in the queue reaches a waiting time of timeout, the batch will be passed to the Executor, even if it contains fewer than preferred_batch_size Documents. Default is 10_000ms (10 seconds).

Returns:

decorated function
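
A usage sketch; the batch size, timeout and endpoint are illustrative:

from jina import Executor, dynamic_batching, requests


class MyExecutor(Executor):
    @dynamic_batching(preferred_batch_size=16, timeout=5000)
    @requests(on='/encode')
    def encode(self, docs, **kwargs):
        # invoked with batches of roughly 16 Documents, or fewer once the
        # oldest queued request has waited 5000 ms
        ...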

jina.monitor(*, name=None, documentation=None)[source]#

Decorator and context manager that allows monitoring of an Executor.

You can access these metrics by enabling monitoring on your Executor. It will track the time spent calling the function and the number of times it has been called. Under the hood it will create a Prometheus Summary: https://prometheus.io/docs/practices/histograms/.

EXAMPLE USAGE

As decorator

from jina import Executor, monitor


class MyExecutor(Executor):
    @requests  # `@requests` are monitored automatically
    def foo(self, docs, *args, **kwargs):
        ...
        self.my_method()
        ...

    # custom metric for `my_method`
    @monitor(name='metric_name', documentation='useful information goes here')
    def my_method(self):
        ...

As context manager

from jina import Executor, requests


class MyExecutor(Executor):
    @requests  # `@requests` are monitored automatically
    def foo(self, docs, *args, **kwargs):
        ...
        # custom metric for code block
        with self.monitor('metric_name', 'useful information goes here'):
            docs = process(docs)

To enable the defined monitor() blocks, enable monitoring on the Flow level

from jina import Flow

f = Flow(monitoring=True, port_monitoring=9090).add(
    uses=MyExecutor, port_monitoring=9091
)
with f:
    ...
Warning:

Don't use this decorator in combination with the @requests decorator. @requests methods are already monitored.

Parameters:
  • name (Optional[str]) – the name of the metrics, by default it is based on the name of the method it decorates

  • documentation (Optional[str]) – the description of the metrics, by default it is based on the name of the method it decorates

Returns:

decorator which takes as an input a single callable

jina.requests(func=None, *, on=None)[source]#

@requests defines the endpoints of an Executor. It has a keyword on= to define the endpoint.

A class method decorated with plain @requests (without on=) is the default handler for all endpoints. That means, it is the fallback handler for endpoints that are not found.

EXAMPLE USAGE

from jina import Executor, requests, Flow
from docarray import Document


# define Executor with custom `@requests` endpoints
class MyExecutor(Executor):
    @requests(on='/index')
    def index(self, docs, **kwargs):
        print(docs)  # index docs here

    @requests(on=['/search', '/query'])
    def search(self, docs, **kwargs):
        print(docs)  # perform search here

    @requests  # default/fallback endpoint
    def foo(self, docs, **kwargs):
        print(docs)  # process docs here


f = Flow().add(uses=MyExecutor)  # add your Executor to a Flow
with f:
    f.post(
        on='/index', inputs=Document(text='I am here!')
    )  # send doc to `index` method
    f.post(
        on='/search', inputs=Document(text='Who is there?')
    )  # send doc to `search` method
    f.post(
        on='/query', inputs=Document(text='Who is there?')
    )  # send doc to `search` method
    f.post(on='/bar', inputs=Document(text='Who is there?'))  # send doc to `foo` method
Parameters:
  • func – the method to decorate

  • on – the endpoint string, by convention starts with /

Returns:

decorated function