jina package#
Subpackages#
- jina.clients package
- jina.hubble package
- Submodules
- jina.hubble.helper module
get_hub_packages_dir()
get_cache_db()
get_download_cache_dir()
get_request_header()
parse_hub_uri()
replace_secret_of_hub_uri()
is_valid_huburi()
md5file()
unpack_package()
archive_package()
download_with_resume()
upload_file()
disk_cache_offline()
is_requirements_installed()
get_requirements_env_variables()
check_requirements_env_variable()
replace_requirements_env_variables()
install_requirements()
HubbleReturnStatus
NormalizerErrorCode
get_hubble_error_message()
- jina.hubble.hubapi module
- jina.hubble.hubio module
- jina.hubble.requirements module
- Module contents
- jina.jaml package
- jina.logging package
- jina.orchestrate package
- jina.parsers package
- jina.proto package
- jina.schemas package
- jina.serve package
- jina.types package
Submodules#
- jina.checker module
- jina.enums module
- jina.excepts module
BaseJinaException
RuntimeFailToStart
RuntimeTerminated
FlowTopologyError
FlowMissingDeploymentError
FlowBuildLevelError
BadConfigSource
BadClient
BadServer
BadClientCallback
BadClientInput
BadRequestType
BadImageNameError
BadYAMLVersion
NotSupportedError
RuntimeRunForeverEarlyError
DockerVersionError
NoContainerizedError
PortAlreadyUsed
EstablishGrpcConnectionError
InternalNetworkError
- jina.exporter module
- jina.helper module
batch_iterator()
parse_arg()
random_port()
random_identity()
random_uuid()
expand_env_var()
colored()
ArgNamespace
is_valid_local_config_source()
cached_property
typename()
get_public_ip()
get_internal_ip()
convert_tuple_to_list()
run_async()
deprecated_alias()
retry()
countdown()
CatchAllCleanupContextManager
download_mermaid_url()
get_readable_size()
get_or_reuse_loop()
get_rich_console()
- jina.importer module
Module contents#
Top-level module of Jina.
The primary function of this module is to import all of the public Jina interfaces into a single place. The interfaces themselves are located in sub-modules, as described below.
- class jina.AsyncFlow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, tls: Optional[bool] = False, **kwargs)[source]#
- class jina.AsyncFlow(*, compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_disable_reduce: Optional[str] = '[]', description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, floating: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[str] = None, prefetch: Optional[int] = 1000, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
- class jina.AsyncFlow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)
Bases: AsyncPostMixin, AsyncProfileMixin, AsyncHealthCheckMixin, Flow
Asynchronous version of jina.Flow. They share the same interface, except that in AsyncFlow the train(), index() and search() methods are coroutines (i.e. declared with the async/await syntax); simply calling them will not schedule them for execution. To actually run a coroutine, the user needs to put it into an event loop, e.g. via asyncio.run() or asyncio.create_task().
AsyncFlow can be very useful in integration settings, where Jina/Jina Flow is NOT the main logic but rather serves as part of another program. In this case, users often do not want to let Jina control the asyncio event loop. By contrast, Flow controls and wraps the event loop internally, making the Flow look synchronous from the outside.
In particular, AsyncFlow makes Jina usage in Jupyter Notebook more natural and reliable. For example, the following code uses the event loop already spawned by Jupyter/IPython to run a Jina Flow (instead of creating a new one):

from jina import AsyncFlow
from jina.types.document.generators import from_ndarray
import numpy as np

with AsyncFlow().add() as f:
    await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Notice that the above code will NOT work in the standard Python REPL, as only Jupyter/IPython implements “autoawait”.
See also
Asynchronous in REPL: Autoawait
https://ipython.readthedocs.io/en/stable/interactive/autoawait.html
Another example is when using Jina as an integration. Say you have another IO-bound job heavylifting(); you can use this feature to schedule Jina's index() and heavylifting() concurrently.
One can think of Flow as a Jina-managed event loop, whereas AsyncFlow is a self-managed event loop.
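Below is a minimal sketch of that concurrent-scheduling pattern, following the awaited-call style of the Jupyter example above; heavylifting() is a hypothetical stand-in for any other IO-bound coroutine, not part of the Jina API:

import asyncio
import numpy as np
from jina import AsyncFlow
from jina.types.document.generators import from_ndarray

async def heavylifting():
    # placeholder for any other IO-bound work
    await asyncio.sleep(1)

async def main():
    with AsyncFlow().add() as f:
        # schedule Jina indexing and the other job on the same event loop
        await asyncio.gather(
            f.index(from_ndarray(np.random.random([5, 4]))),
            heavylifting(),
        )

asyncio.run(main())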
Create a Flow. Flow is how Jina streamlines and scales Executors.
EXAMPLE USAGE
Python API
from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
with f:
    f.block()  # serve Flow
To and from YAML configuration
from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
f.save_config('flow.yml')  # save YAML config file
f = Flow.load_config('flow.yml')  # load Flow from YAML config
with f:
    f.block()  # serve Flow
- Parameters:
asyncio – If set, then the input and output of this Client work in an asynchronous manner.
host – The host address of the runtime, by default it is 0.0.0.0.
port – The port of the Gateway, which the client should connect to.
protocol – Communication protocol between server and client.
proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy
tls – If set, connect to gateway using tls encryption
compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
cors – If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
deployments_addresses – JSON dictionary with the input addresses of each Deployment
deployments_disable_reduce – JSON list of the Deployments for which the built-in merging mechanism is disabled
description – The description of this HTTP server. It will be used in automatic docs such as Swagger UI.
env – The map of environment variables that are available inside runtime
expose_endpoints – A JSON string that represents a map from executor endpoints (@requests(on=…)) to HTTP endpoints.
expose_graphql_endpoint – If set, /graphql endpoint is added to HTTP interface.
floating – If set, the current Pod/Deployment cannot be further chained, and the next .add() will chain after the last Pod/Deployment, not this current one.
graph_conditions – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.
graph_description – Routing graph for the gateway
grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}
host – The host address of the runtime, by default it is 0.0.0.0.
host_in – The host address for binding to, by default it is 0.0.0.0
log_config – The YAML config of the logger used in this object.
monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics
name –
The name of this object.
This will be used in the following places: how you refer to this object in Python/YAML/CLI, visualization, log message header, …
When not given, then the default naming strategy will apply.
native – If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime.
no_crud_endpoints –
If set, /index, /search, /update, /delete endpoints are removed from HTTP interface.
Any executor that has @requests(on=…) bound to those values will receive data requests.
no_debug_endpoints – If set, /status /post endpoints are removed from HTTP interface.
output_array_type –
The type of array that tensor and embedding will be serialized to.
Supports the same types as docarray.to_protobuf(.., ndarray_type=…), documented at https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf. Defaults to retaining whatever type is returned by the Executor.
polling – The polling strategy of the Deployment and its endpoints (when shards > 1). Can be defined for all endpoints of a Deployment or per endpoint. Define per Deployment: ANY (only one Pod, whoever is idle, polls the message) or ALL (all Pods poll the message, like a broadcast). Define per endpoint: a JSON dict of {endpoint: PollingType}, e.g. {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'}
port – The port for input data to bind to, default is a random port between [49152, 65535]
port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
prefetch –
Number of requests fetched from the client before feeding into the first Executor.
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
protocol – Communication protocol between server and client.
proxy – If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
py_modules –
The customized Python modules that need to be imported before loading the Executor.
Note that the recommended way is to import only a single module: a simple Python file, if your Executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a Python package. For more details, please see the Executor cookbook.
quiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
replicas – The number of replicas in the deployment
retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
runtime_cls – The runtime class to run inside the Pod
shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies
ssl_certfile – the path to the certificate file
ssl_keyfile – the path to the key file
timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever
timeout_ready – The timeout in milliseconds for a Pod to wait for the runtime to be ready, -1 for waiting forever
timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default
title – The title of this HTTP server. It will be used in automatic docs such as Swagger UI.
uses –
The config of the Executor; it could be one of the following: the string literal of an Executor class name; an Executor YAML file (.yml, .yaml, .jaml); a Jina Hub Executor (must start with jinahub:// or jinahub+docker://); a docker image (must start with docker://); the string literal of a YAML config (must start with ! or `jtype: `); the string literal of a JSON config.
When used in Python, one can additionally use: a Python dict that represents the config; a text file stream that has a .read() interface.
uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses
uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses
uses_with – Dictionary of keyword arguments that will override the with configuration in uses
uvicorn_kwargs –
Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server
More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
env – The map of environment variables that are available inside runtime
inspect –
The strategy for the inspect deployments in the Flow.
If REMOVE is given, then all inspect deployments are removed when building the Flow.
log_config – The YAML config of the logger used in this object.
name –
The name of this object.
This will be used in the following places: how you refer to this object in Python/YAML/CLI, visualization, log message header, …
When not given, then the default naming strategy will apply.
quiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
uses – The YAML path represents a flow. It can be either a local file path or a URL.
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
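A short, hedged sketch tying several of the constructor parameters above together (all values are illustrative only):

from jina import Flow

f = Flow(
    protocol='http',  # serve the gateway over HTTP instead of gRPC
    port=12345,       # fixed gateway port instead of a random one
    cors=True,        # allow cross-origin access
).add(uses='BaseExecutor', replicas=1)

with f:
    f.block()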
- add(**kwargs)#
Add a Deployment to the current Flow object and return the new, modified Flow object. The attributes of the Deployment can later be changed with set() or deleted with remove().
- Parameters:
compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
connection_list – JSON dictionary with a list of connections to configure
disable_auto_volume – Do not automatically mount a volume for dockerized Executors.
disable_reduce – Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head
docker_kwargs –
Dictionary of kwargs arguments that will be passed to the Docker SDK when starting the docker container.
More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/
entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image's ENTRYPOINT takes effect.
env – The map of environment variables that are available inside runtime
external – The Deployment will be considered an external Deployment that has been started independently from the Flow. This Deployment will not be context-managed by the Flow.
floating – If set, the current Pod/Deployment can not be further chained, and the next .add() will chain after the last Pod/Deployment not this current one.
force_update – If set, always pull the latest Hub Executor bundle even if it exists locally
gpus –
This argument allows a dockerized Jina Executor to discover local GPU devices.
Note: to access all GPUs, use --gpus all; to access multiple GPUs, e.g. 2 GPUs, use --gpus 2; to access specific GPUs by device id, use --gpus device=[YOUR-GPU-DEVICE-ID]; for multiple device ids, use --gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]; to specify more parameters, use --gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display
grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}
host – The host address of the runtime, by default it is 0.0.0.0.
host_in – The host address for binding to, by default it is 0.0.0.0
install_requirements – If set, install the requirements.txt from the Hub Executor bundle locally
log_config – The YAML config of the logger used in this object.
monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics
name –
The name of this object.
This will be used in the following places: how you refer to this object in Python/YAML/CLI, visualization, log message header, …
When not given, then the default naming strategy will apply.
native – If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime.
output_array_type –
The type of array that tensor and embedding will be serialized to.
Supports the same types as docarray.to_protobuf(.., ndarray_type=…), documented at https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf. Defaults to retaining whatever type is returned by the Executor.
polling – The polling strategy of the Deployment and its endpoints (when shards > 1). Can be defined for all endpoints of a Deployment or per endpoint. Define per Deployment: ANY (only one Pod, whoever is idle, polls the message) or ALL (all Pods poll the message, like a broadcast). Define per endpoint: a JSON dict of {endpoint: PollingType}, e.g. {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'}
port – The port for input data to bind to, default is a random port between [49152, 65535]
port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
py_modules –
The customized Python modules that need to be imported before loading the Executor.
Note that the recommended way is to import only a single module: a simple Python file, if your Executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a Python package. For more details, please see the Executor cookbook.
quiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
quiet_remote_logs – Do not display the streaming of remote logs on local console
replicas – The number of replicas in the deployment
retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
runtime_cls – The runtime class to run inside the Pod
shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies
timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever
timeout_ready – The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever
timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default
tls – If set, connect to deployment using tls encryption
upload_files –
The files on the host to be uploaded to the remote workspace. This can be useful when your Deployment has more file dependencies beyond a single YAML file, e.g. Python files, data files.
Note: currently only a flat structure is supported, which means that if you upload [./foo/a.py, ./foo/b.py, ./bar/c.yml], they will be put under the _same_ workspace on the remote, losing all hierarchies. By default, the --uses YAML file is always uploaded. Uploaded files are by default isolated across runs; to ensure files are submitted to the same workspace across different runs, use --workspace-id to specify the workspace.
uses –
The config of the Executor; it could be one of the following: the string literal of an Executor class name; an Executor YAML file (.yml, .yaml, .jaml); a Jina Hub Executor (must start with jinahub:// or jinahub+docker://); a docker image (must start with docker://); the string literal of a YAML config (must start with ! or `jtype: `); the string literal of a JSON config.
When used in Python, one can additionally use: a Python dict that represents the config; a text file stream that has a .read() interface.
uses_after – The executor attached after the Pods described by –uses, typically used for receiving from all shards, accepted type follows –uses. This argument only applies for sharded Deployments (shards > 1).
uses_after_address – The address of the uses-after runtime
uses_before – The executor attached before the Pods described by –uses, typically before sending to all shards, accepted type follows –uses. This argument only applies for sharded Deployments (shards > 1).
uses_before_address – The address of the uses-before runtime
uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses
uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses
uses_with – Dictionary of keyword arguments that will override the with configuration in uses
volumes –
The path on the host to be mounted inside the container.
Note: if separated by :, the first part is considered the local host path and the second part is the path in the container system. If no split is provided, the basename of that directory will be mounted into the container's root path, e.g. --volumes="/user/test/my-workspace" will be mounted into /my-workspace inside the container. All volumes are mounted in read-write mode.
when – The condition that the documents need to fulfill before reaching the Executor. The condition can be defined in the form of a DocArray query condition (https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions)
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
needs – the name of the Deployment(s) that this Deployment receives data from. One can also use “gateway” to indicate the connection with the gateway.
deployment_role – the role of the Deployment, used for visualization and route planning
copy_flow – when set to true, always copy the current Flow and do the modification on top of it, then return; otherwise, do in-place modification
kwargs – other keyword-value arguments that the Deployment CLI supports
- Return type:
Flow
- Returns:
a (new) Flow object with modification
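A brief, hedged sketch of chaining add() calls (the Executor names are illustrative; each call returns a new Flow object):

from jina import Flow

f = (
    Flow()
    .add(name='encoder')                   # defaults to BaseExecutor
    .add(name='indexer', needs='encoder')  # receives data from 'encoder'
)
with f:
    f.block()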
- property address_private: str#
Return the private IP address of the gateway for connecting from other machine in the same network
- Return type:
str
- property address_public: str#
Return the public IP address of the gateway for connecting from other machine in the public network
- Return type:
str
- block(stop_event=None)#
Block the Flow until stop_event is set or the user hits KeyboardInterrupt.
- Parameters:
stop_event (Union[threading.Event, multiprocessing.Event, None]) – a threading or multiprocessing event that, once set, will resume control of the Flow to the main thread.
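A small usage sketch: unblocking a served Flow from another thread via stop_event (the 5-second timer is illustrative):

import threading
from jina import Flow

ev = threading.Event()
f = Flow().add()

# set the event from a helper thread after 5 seconds to stop serving
threading.Timer(5, ev.set).start()

with f:
    f.block(stop_event=ev)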
- build(copy_flow=False, disable_build_sandbox=False)#
Build the current Flow and make it ready to use
Note
No need to call it manually since 0.0.8. When using the Flow with a context manager, or when using start(), build() will be invoked automatically.
- Parameters:
copy_flow (bool) – when set to true, always copy the current Flow and do the modification on top of it, then return; otherwise, do in-place modification
disable_build_sandbox (bool) – when set to true, the sandbox-building part will be skipped; used by plot
- Return type:
Flow
- Returns:
the current Flow (by default)
Note
copy_flow=True is recommended if you are building the same Flow multiple times in a row, e.g.

f = Flow()
with f:
    f.index()

with f.build(copy_flow=True) as fl:
    fl.search()
- callback(**kwds)#
Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
- property client: BaseClient#
Return a BaseClient object attached to this Flow.
- Return type:
BaseClient
- property client_args: Namespace#
Get Client settings.
- Return type:
Namespace
- close()#
Immediately unwind the context stack.
- delete(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]] #
- async dry_run(**kwargs)#
Sends a dry run to the Flow to validate if the Flow is ready to receive requests
- Parameters:
kwargs – potential kwargs received passed from the public interface
- Return type:
bool
- Returns:
boolean indicating the health/readiness of the Flow
- enter_context(cm)#
Enters the supplied context manager.
If successful, also pushes its __exit__ method as a callback and returns the result of the __enter__ method.
- property env: Optional[Dict]#
Get all envs to be set in the Flow
- Return type:
Optional[Dict]
- Returns:
envs as dict
- expose_endpoint(exec_endpoint, **kwargs)#
Expose an Executor’s endpoint (defined by @requests(on=…)) to HTTP endpoint for easier access.
After exposing, you can send data requests directly to http://hostname:port/endpoint.
- Parameters:
exec_endpoint (str) – the endpoint string, by convention starting with /
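A short sketch, assuming an HTTP Flow and an Executor bound to @requests(on='/foo') (the endpoint name is illustrative):

from jina import Flow

f = Flow(protocol='http').add()  # the Executor is assumed to handle '/foo'
f.expose_endpoint('/foo')        # now POST http://hostname:port/foo reaches it
with f:
    f.block()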
- property gateway_args: Namespace#
Get Gateway settings.
- Return type:
Namespace
- gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)#
Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned.
Note
If --no-inspect is not given, then gather_inspect() is automatically called before build(). So in general you don't need to call gather_inspect() manually.
- Parameters:
name (str) – the name of the gather Deployment
include_last_deployment (bool) – whether to include the last modified Deployment in the Flow
args – args for .add()
kwargs – kwargs for .add()
- Return type:
Flow
- Returns:
the modified Flow or a copy of it
- property host: str#
Return the local address of the gateway.
- Return type:
str
- index(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]] #
- inspect(name='inspect', *args, **kwargs)#
Add an inspection on the last changed Deployment in the Flow
Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.
Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow | -- PUB-SUB -- InspectDeployment (Hanging)
In this way, InspectDeployment looks like a simple _pass from the outside and does not introduce side effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.
This function is very handy for introducing an Evaluator into the Flow.
- Parameters:
name (str) – name of the Deployment
args – args for .add()
kwargs – kwargs for .add()
- Return type:
Flow
- Returns:
the new instance of the Flow
- static is_valid_jaml(obj)#
Verify the YAML syntax of a given object by first serializing it, then attempting to deserialize it and catching parser errors.
- Parameters:
obj (Dict) – the YAML object
- Return type:
bool
- Returns:
whether the syntax is valid or not
- classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, **kwargs)#
A high-level interface for loading configuration, with support for loading extra py_modules and substituting environment and context variables. Any class that implements the JAMLCompatible mixin can enjoy this feature, e.g. BaseFlow, BaseExecutor, BaseDriver and all their subclasses.
- Support substitutions in YAML:
Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).
Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.
Internal references via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}.
Substitutions are carried out in order and in multiple passes, resolving variables with best effort.
!BaseEncoder
metas:
  name: ${{VAR_A}}  # env or context variables
  workspace: my-${{this.name}}  # internal reference
# load Executor from yaml file
BaseExecutor.load_config('a.yml')

# load Executor from yaml file and substitute environment variables
os.environ['VAR_A'] = 'hello-world'
b = BaseExecutor.load_config('a.yml')
assert b.name == 'hello-world'

# load Executor from yaml file and substitute variables from a dict
b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
assert b.name == 'hello-world'

# disable substitute
b = BaseExecutor.load_config('a.yml', substitute=False)
- Parameters:
source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.
allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any level
substitute (bool) – substitute environment, internal-reference and context variables.
context (Optional[Dict[str, Any]]) – context replacement variables in a dict; the value of the dict is the replacement.
uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config's with field
uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config's metas field
uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config's requests field
extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor YAML files
py_modules (Optional[str]) – optional py_module from which the object needs to be loaded
runtime_args (Optional[Dict[str, Any]]) – optional dictionary of runtime_args to be passed directly, without being parsed into a YAML config
kwargs – kwargs for parse_config_source
- Return type:
JAMLCompatible
- Returns:
a JAMLCompatible object
- property monitoring: bool#
Return whether monitoring is enabled.
- Return type:
bool
- needs(needs, name='joiner', *args, **kwargs)#
Add a blocker to the Flow; wait until all Pods defined in needs have completed.
- Parameters:
needs (Union[Tuple[str], List[str]]) – list of service names to wait for
name (str) – the name of this joiner; by default it is joiner
args – additional positional arguments forwarded to the add function
kwargs – additional key-value arguments forwarded to the add function
- Return type:
Flow
- Returns:
the modified Flow
- needs_all(name='joiner', *args, **kwargs)#
Collect all floating Deployments so far and add a blocker to the Flow; wait until all hanging Pods have completed.
- Parameters:
name (str) – the name of this joiner (default is joiner)
args – additional positional arguments, forwarded to the add and needs functions
kwargs – additional key-value arguments, forwarded to the add and needs functions
- Return type:
Flow
- Returns:
the modified Flow
- property num_deployments: int#
Get the number of Deployments in this Flow
- Return type:
int
- property num_pods: int#
Get the number of pods (shards count) in this Flow
- Return type:
int
- plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)#
Visualize the Flow up to the current point. If a file name is provided, it will create a jpg image with that name; otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline; otherwise an image will be created.
Example,
flow = Flow().add(name='deployment_a').plot('flow.svg')
- Parameters:
output (Optional[str]) – a filename specifying the name of the image to be created; the suffix svg/jpg determines the file type of the output image
vertical_layout (bool) – top-down or left-right layout
inline_display (bool) – show the image directly inside the Jupyter Notebook
build (bool) – build the Flow before plotting, so the gateway connection can be shown better
copy_flow (bool) – when set to true, always copy the current Flow and do the modification on top of it, then return; otherwise, do in-place modification
- Return type:
Flow
- Returns:
the Flow
- pop_all()#
Preserve the context stack by transferring it to a new instance.
- property port: int#
Return the exposed port of the gateway.
- Return type:
int
- property port_monitoring: Optional[int]#
Return the port where monitoring is exposed, if enabled.
- Return type:
Optional[int]
- async post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, **kwargs)#
Async Post a general data request to the Flow.
- Parameters:
inputs (Optional[InputType]) – input data, which can be an Iterable, a function that returns an Iterable, or a single Document.
on (str) – the endpoint which is invoked. All the functions in the Executors decorated by @requests(on=…) with the same endpoint are invoked.
on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.
on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.
on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.
parameters (Optional[Dict]) – the kwargs that will be sent to the Executor
target_executor (Optional[str]) – a regex string; only matching Executors will process the request.
request_size (int) – the number of Documents per request; <=0 means all inputs in one request.
show_progress (bool) – if set, the client will show a progress bar while receiving every request.
continue_on_error (bool) – if set, a Request that causes a callback error will only be logged, without blocking further requests.
return_responses (bool) – if set to True, the result will come as Response and not as a DocumentArray
kwargs – additional parameters; can be used to pass metadata or authentication information in the server call
- Yield:
Response object
Warning
target_executor uses re.match for checking whether the pattern is matched; target_executor=='foo' will match both Deployments with the name foo and foo_what_ever_suffix.
- Return type:
AsyncGenerator[None, Union[DocumentArray, Response]]
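A hedged usage sketch of post(), mirroring the awaited-call style of the AsyncFlow examples above (the endpoint and input text are illustrative):

import asyncio
from docarray import Document
from jina import AsyncFlow

async def main():
    with AsyncFlow().add() as f:
        # send one Document to the '/' endpoint; print each resolved request
        await f.post(on='/', inputs=Document(text='hello'), on_done=print)

asyncio.run(main())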
- async profiling(show_table=True)#
Profile a single query's roundtrip, including network and computation latency. Results are summarized in a Dict.
- Parameters:
show_table (bool) – whether to show the table or not.
- Return type:
Dict[str, float]
- Returns:
the latency report in a dict.
- property protocol: GatewayProtocolType#
Return the protocol of this Flow
- Return type:
GatewayProtocolType
- Returns:
the protocol of this Flow
- push(exit)#
Registers a callback with the standard __exit__ method signature.
Can suppress exceptions the same way __exit__ method can. Also accepts any object with an __exit__ method (registering a call to the method instead of the object itself).
- save_config(filename=None)#
Save the object’s config into a YAML file.
- Parameters:
filename (Optional[str]) – file path of the YAML file; if not given, then config_abspath is used
- search(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]] #
- start()#
Start to run all Deployments in this Flow.
Remember to close the Flow with close().
Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.orchestrate.pods.Pod.
- Returns:
this instance
- to_docker_compose_yaml(output_path=None, network_name=None, include_gateway=True)#
Convert the Flow into a YAML file to run with docker-compose up.
- Parameters:
output_path (Optional[str]) – the output path for the YAML file
network_name (Optional[str]) – the name of the network that will be used by the deployment
include_gateway (bool) – defines whether the gateway deployment should be included; defaults to True
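A minimal usage sketch (the output path is illustrative):

from jina import Flow

f = Flow().add()
f.to_docker_compose_yaml(output_path='docker-compose.yml')
# then run: docker-compose up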
- to_k8s_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#
Converts the Flow into a set of yaml deployments to deploy in Kubernetes.
If you don't want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
output_base_path (str) – the base path where all the YAML files are dumped
k8s_namespace (Optional[str]) – the name of the K8s namespace to set for the configurations. If None, the name of the Flow will be used.
include_gateway (bool) – defines whether the gateway deployment should be included; defaults to True
- to_kubernetes_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#
Converts the Flow into a set of yaml deployments to deploy in Kubernetes.
If you don't want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
output_base_path (str) – the base path where all the YAML files are dumped
k8s_namespace (Optional[str]) – the name of the K8s namespace to set for the configurations. If None, the name of the Flow will be used.
include_gateway (bool) – defines whether the gateway deployment should be included; defaults to True
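A minimal usage sketch (the path and namespace are illustrative):

from jina import Flow

f = Flow().add()
f.to_kubernetes_yaml('./k8s_config', k8s_namespace='demo')
# then run: kubectl apply -R -f ./k8s_config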
- update(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]] #
- property workspace: str#
Return the workspace path of the flow.
- Return type:
str
- property workspace_id: Dict[str, str]#
Get all Deployments' workspace_id values in a dict.
- Return type:
Dict[str, str]
- jina.Client(args=None, **kwargs)[source]#
Convenience function that returns client instance for given protocol.
EXAMPLE USAGE
from jina import Client
from docarray import Document

# select protocol from 'grpc', 'http', or 'websocket'; default is 'grpc'
# select asyncio True or False; default is False
# select the host address to connect to
c = Client(
    protocol='grpc', asyncio=False, host='grpc://my.awesome.flow:1234'
)  # returns a GRPCClient instance
c.post(on='/index', inputs=Document(text='hello!'))
- Parameters:
asyncio – If set, then the input and output of this Client work in an asynchronous manner.
host – The host address of the runtime, by default it is 0.0.0.0.
port – The port of the Gateway, which the client should connect to.
protocol – Communication protocol between server and client.
proxy – If set, respect the http_proxy and https_proxy environment variables; otherwise, unset these proxy variables before starting. gRPC seems to prefer no proxy
tls – If set, connect to gateway using tls encryption
- Return type:
Union[AsyncWebSocketClient, WebSocketClient, AsyncGRPCClient, GRPCClient, HTTPClient, AsyncHTTPClient]
- Returns:
the new Client object
- class jina.Document[source]#
- class jina.Document(_obj: Optional[Document] = None, copy: bool = False)
- class jina.Document(_obj: Optional[Any] = None)
- class jina.Document(_obj: Optional[Dict], copy: bool = False, field_resolver: Optional[Dict[str, str]] = None, unknown_fields_handler: str = 'catch')
- class jina.Document(blob: Optional[bytes] = None, **kwargs)
- class jina.Document(tensor: Optional[ArrayType] = None, **kwargs)
- class jina.Document(text: Optional[str] = None, **kwargs)
- class jina.Document(uri: Optional[str] = None, **kwargs)
- class jina.Document(parent_id: Optional[str] = None, granularity: Optional[int] = None, adjacency: Optional[int] = None, blob: Optional[bytes] = None, tensor: Optional[ArrayType] = None, mime_type: Optional[str] = None, text: Optional[str] = None, content: Optional[DocumentContentType] = None, weight: Optional[float] = None, uri: Optional[str] = None, tags: Optional[Dict[str, StructValueType]] = None, offset: Optional[float] = None, location: Optional[List[float]] = None, embedding: Optional[ArrayType] = None, modality: Optional[str] = None, evaluations: Optional[Dict[str, Dict[str, StructValueType]]] = None, scores: Optional[Dict[str, Dict[str, StructValueType]]] = None, chunks: Optional[Sequence[Document]] = None, matches: Optional[Sequence[Document]] = None)
Bases: AllMixins, BaseDCType
Document is the basic data type in DocArray. A Document is a container for any kind of data, be it text, image, audio, video, or 3D meshes.
You can initialize a Document object with given attributes:
from docarray import Document
import numpy

d1 = Document(text='hello')
d3 = Document(tensor=numpy.array([1, 2, 3]))
d4 = Document(
    uri='https://jina.ai',
    mime_type='text/plain',
    granularity=1,
    adjacency=3,
    tags={'foo': 'bar'},
)
Documents support a nested structure, which can also be specified during construction:
d = Document(
    id='d0',
    chunks=[Document(id='d1', chunks=Document(id='d2'))],
    matches=[Document(id='d3')],
)
A Document can embed its contents using the embed() method and a provided embedding model:

import torchvision

q = (
    Document(uri='/Users/usr/path/to/image.jpg')
    .load_uri_to_image_tensor()
    .set_image_tensor_normalization()
    .set_image_tensor_channel_axis(-1, 0)
)
model = torchvision.models.resnet50(pretrained=True)
q.embed(model)
Multiple Documents can be organized into a DocumentArray.
See also
For further details, see our user guide.
- property adjacency: Optional[int]#
- Return type:
Optional[int]
- property blob: Optional[bytes]#
- Return type:
Optional[bytes]
- property chunks: Optional[ChunkArray]#
- Return type:
Optional[ChunkArray]
- property content: Optional[DocumentContentType]#
- Return type:
Optional[DocumentContentType]
- property content_hash: int#
Get the document hash according to its content.
- Return type:
int
- Returns:
the unique hash code to represent this Document
- property content_type: Optional[str]#
- Return type:
Optional[str]
- convert_blob_to_datauri(charset='utf-8', base64=False)#
Convert blob to a data URI in place. Internally it first reads into blob and then converts it to a data URI.
- Parameters:
charset (str) – charset may be any character set registered with IANA
base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8-bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.
- Return type:
T
- Returns:
itself after processed
- convert_blob_to_image_tensor(width=None, height=None, channel_axis=-1)#
Convert an image blob to an ndarray tensor.
- Parameters:
width (Optional[int]) – the width of the image tensor.
height (Optional[int]) – the height of the image tensor.
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel info is at the last axis
- Return type:
T
- Returns:
itself after processed
- convert_blob_to_tensor(dtype=None, count=-1, offset=0)#
Assuming the blob is a _valid_ buffer of a NumPy ndarray, set tensor accordingly.
- Parameters:
dtype (Optional[str]) – data type of the returned array; default: float.
count (int) – number of items to read. -1 means all data in the buffer.
offset (int) – start reading the buffer from this offset (in bytes); default: 0.
- Return type:
T
- Returns:
itself after processed
- convert_content_to_datauri()#
Convert content into uri in place, with best effort.
- Return type:
T
- Returns:
itself after processed
- convert_image_tensor_to_blob(channel_axis=-1, image_format='png')#
Assuming tensor is a _valid_ image, set blob accordingly.
- Parameters:
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel info is at the last axis
image_format (str) – either png or jpeg
- Return type:
T
- Returns:
itself after processed
- convert_image_tensor_to_sliding_windows(window_shape=(64, 64), strides=None, padding=False, channel_axis=-1, as_chunks=False)#
Convert tensor into a sliding-window view with the given window shape, in place.
- Parameters:
window_shape (Tuple[int, int]) – desired output size. If size is a sequence like (h, w), the output size will be matched to this. If size is an int, the output will have the same height and width as the target_size.
strides (Optional[Tuple[int, int]]) – the strides between two neighboring sliding windows. strides is a sequence like (h, w), denoting the strides on the vertical and horizontal axes. When not given, window_shape is used.
padding (bool) – if False, only patches which are fully contained in the input image are included. If True, all patches whose starting point is inside the input are included, and areas outside the input default to zero. The padding argument has no effect on the size of each patch; it determines how many patches are extracted. Default is False.
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel info is at the last axis.
as_chunks (bool) – if set, each sliding window will be stored as a chunk of the current Document
- Return type:
T
- Returns:
Document itself after processed
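A small sketch of the sliding-window conversion, assuming a square RGB image tensor; with the default stride equal to the window shape, a 96x96 image should yield a 3x3 grid of patches:

import numpy as np
from docarray import Document

d = Document(tensor=np.random.randint(0, 255, (96, 96, 3), dtype=np.uint8))
d.convert_image_tensor_to_sliding_windows(window_shape=(32, 32), as_chunks=True)
print(len(d.chunks))  # expect 9 non-overlapping 32x32 windows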
- convert_image_tensor_to_uri(channel_axis=-1, image_format='png')#
Assuming tensor is a _valid_ image, set uri accordingly.
- Parameters:
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel info is at the last axis
image_format (str) – either png or jpeg
- Return type:
T
- Returns:
itself after processed
- convert_tensor_to_blob()#
Convert tensor to blob in place.
- Return type:
T
- Returns:
itself after processed
- convert_tensor_to_text(vocab, delimiter=' ')#
Convert tensor to text in place.
- Return type:
T
- Returns:
Document itself after processed
- convert_text_to_datauri(charset='utf-8', base64=False)#
Convert text to a data URI in place.
- Parameters:
charset (str) – charset may be any character set registered with IANA
base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8-bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.
- Return type:
T
- Returns:
itself after processed
- convert_text_to_tensor(vocab, max_length=None, dtype='int64')#
Convert text to tensor in place.
In the end, tensor will be a 1D array of length max_length.
To get the vocab of a DocumentArray, you can use jina.types.document.converters.build_vocab.
- Parameters:
vocab (Dict[str, int]) – a dictionary that maps a word to an integer index; 0 is reserved for padding and 1 for unknown words in text, so you should not include these two entries in vocab.
max_length (Optional[int]) – the maximum length of the sequence. Sequences longer than this are cut off from the beginning; sequences shorter than this are padded with 0 on the right-hand side.
dtype (str) – the dtype of the generated tensor
- Return type:
T
- Returns:
Document itself after processed
- convert_uri_to_datauri(charset='utf-8', base64=False)#
Convert uri to a data URI and store it in uri in place.
- Parameters:
charset (str) – charset may be any character set registered with IANA
base64 (bool) – used to encode arbitrary octet sequences into a form that satisfies the rules of 7bit. Designed to be efficient for non-text 8-bit and binary data. Sometimes used for text data that frequently uses non-US-ASCII characters.
- Return type:
T
- Returns:
itself after processed
- copy_from(other)#
Overwrite self by copying from another Document.
- Parameters:
other (T) – the other Document to copy from
- Return type:
None
- display(from_=None)#
Plot image data from uri, or from tensor if uri is empty.
- Parameters:
from_ – an optional string to decide whether the document should be displayed from the uri or the tensor field.
- embed(*args, **kwargs)#
- Return type:
T
- embed_feature_hashing(n_dim=256, sparse=False, fields=('text', 'tags'), max_value=1000000)#
Convert an arbitrary set of attributes into a fixed-dimensional matrix using the hashing trick.
- Parameters:
n_dim (int) – the dimensionality of each document in the output embedding. Small numbers of features are likely to cause hash collisions, but large numbers will cause larger overall parameter dimensions.
sparse (bool) – whether the resulting feature matrix should be a sparse csr_matrix or a dense ndarray. Note that this feature requires scipy.
fields (Tuple[str, ...]) – which attributes to consider for feature hashing.
- Return type:
T
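A short sketch of feature hashing on a single Document (values are illustrative):

from docarray import Document

d = Document(text='hello jina', tags={'foo': 'bar'})
d.embed_feature_hashing(n_dim=256)
print(d.embedding.shape)  # a fixed 256-dimensional embedding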
- property embedding: Optional[ArrayType]#
- Return type:
Optional[ArrayType]
- property evaluations: Optional[Dict[str, NamedScore]]#
- Return type:
Optional[Dict[str, NamedScore]]
- classmethod from_base64(data, protocol='pickle', compress=None)#
Build a Document object from a base64-encoded string.
- Parameters:
data (str) – a base64-encoded string
protocol (str) – protocol to use
compress (Optional[str]) – compress method to use
- Return type:
T
- Returns:
a Document object
- classmethod from_bytes(data, protocol='pickle', compress=None)#
Build Document object from binary bytes
- Parameters:
data (bytes) – binary bytes
protocol (str) – protocol to use
compress (Optional[str]) – compress method to use
- Return type:
T
- Returns:
a Document object
- classmethod from_dict(obj, protocol='jsonschema', **kwargs)#
Convert a dict object into a Document.
- Parameters:
obj (Dict) – a Python dict object
protocol (str) – jsonschema or protobuf
kwargs – extra key-value args passed to the pydantic and protobuf parsers.
- Return type:
T
- Returns:
the parsed Document object
- classmethod from_json(obj, protocol='jsonschema', **kwargs)#
Convert a JSON string into a Document.
- Parameters:
obj (Union[str, bytes, bytearray]) – a valid JSON string
protocol (str) – jsonschema or protobuf
kwargs – extra key-value args passed to the pydantic and protobuf parsers.
- Return type:
T
- Returns:
the parsed Document object
- classmethod from_protobuf(pb_msg)#
- Return type:
T
- classmethod from_pydantic_model(model)#
Build a Document object from a Pydantic model
- Parameters:
model (BaseModel) – the pydantic data model object that represents a Document
- Return type:
T
- Returns:
a Document object
- classmethod from_strawberry_type(model)#
Build a Document object from a Strawberry model
- Parameters:
model – the Strawberry data model object that represents a Document
- Return type:
T
- Returns:
a Document object
- classmethod generator_from_webcam(height_width=None, show_window=True, window_title='webcam', fps=30, exit_key=27, exit_event=None, tags=None)#
Create a generator that yields a Document object from the webcam.
This feature requires the opencv-python package.
- Parameters:
height_width (Optional[Tuple[int, int]]) – the shape of the video frame; if not provided, the shape will be determined from the first frame. Note that this is restricted by the hardware of the camera.
show_window (bool) – whether to show a preview window of the webcam video
window_title (str) – the window title of the preview window
fps (int) – expected frames per second; note that this is not guaranteed, as the actual fps depends on the hardware limit
exit_key (int) – the key to press to exit the preview window
exit_event – the multiprocessing/threading/asyncio event that, once set, exits the preview window
tags (Optional[Dict]) – the tags to attach to the document
- Return type:
Generator[T, None, None]
- Returns:
a generator that yields a Document object from the webcam
- classmethod get_json_schema(indent=2)#
Return a JSON Schema of Document class.
- Return type:
str
- get_multi_modal_attribute(attribute)#
- Return type:
- get_vocabulary(text_attrs=('text',))#
Get the text vocabulary as a counter dict that maps each word to its frequency, across all text_fields.
- Parameters:
text_attrs (Tuple[str, ...]) – the textual attributes from which the vocabulary will be derived
- Return type:
Dict[str, int]
- Returns:
a vocabulary dictionary where the key is the word and the value is the frequency of that word across all text fields.
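A quick sketch of get_vocabulary() on a single Document (the printed output is the expected word-frequency mapping):

from docarray import Document

d = Document(text='hello hello jina')
print(d.get_vocabulary())  # expected: {'hello': 2, 'jina': 1}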
- property granularity: Optional[int]#
- Return type:
Optional[int]
- property id: str#
- Return type:
str
- property is_multimodal: bool#
Return true if this Document can be represented by a class wrapped by docarray.dataclasses.types.dataclass().
- Return type:
bool
- load_pil_image_to_datauri(image)#
Convert a pillow image into a datauri with header data:image/png.
- Parameters:
image (PILImage) – a pillow image
- Returns:
itself after processed
- load_uri_to_audio_tensor()#
Convert an audio uri into tensor in place.
- Return type:
T
- Returns:
Document itself after processed
- load_uri_to_blob()#
Convert uri to blob in place. Internally it downloads from the URI and sets blob.
- Return type:
T
- Returns:
itself after processed
- load_uri_to_image_tensor(width=None, height=None, channel_axis=-1)#
Convert the image-like uri into tensor.
- Parameters:
width (Optional[int]) – the width of the image tensor.
height (Optional[int]) – the height of the image tensor.
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel info is at the last axis
- Return type:
T
- Returns:
itself after processed
- load_uri_to_point_cloud_tensor(samples, as_chunks=False)#
Convert a 3D mesh-like uri into tensor.
- Parameters:
samples (int) – number of points to sample from the mesh
as_chunks (bool) – when multiple geometries are stored in one mesh file, store each geometry in a different chunk
- Return type:
T
- Returns:
itself after processed
- load_uri_to_text(charset='utf-8')#
Convert uri to text in place.
- Parameters:
charset (str) – charset may be any character set registered with IANA
- Return type:
T
- Returns:
itself after processed
- load_uri_to_video_tensor(only_keyframes=False)#
Convert a uri to a video ndarray tensor.
- Parameters:
only_keyframes (bool) – only keep the keyframes of the video
- Return type:
T
- Returns:
Document itself after processed
- property location: Optional[List[float]]#
- Return type:
Optional[List[float]]
- match(*args, **kwargs)#
- Return type:
T
- property matches: Optional[MatchArray]#
- Return type:
Optional[MatchArray]
- property mime_type: Optional[str]#
- Return type:
Optional[str]
- property modality: Optional[str]#
- Return type:
Optional[str]
- property nbytes: int#
Return total bytes consumed by protobuf.
- Return type:
int
- Returns:
number of bytes
- property non_empty_fields: Tuple[str]#
Get all non-empty fields of this Document.
Non-empty fields are the fields with not-None and not-default values.
- Return type:
Tuple[str]
- Returns:
field names in a tuple.
- property offset: Optional[float]#
- Return type:
Optional[float]
- property parent_id: Optional[str]#
- Return type:
Optional[str]
- plot_matches_sprites(top_k=10, channel_axis=-1, inv_normalize=False, skip_empty=False, canvas_size=1920, min_size=100, output=None)#
Generate a sprite image for the query and its matching images in this Document object.
An image sprite is a collection of images put into a single image. Query image is on the left followed by matching images. The Document object should contain matches.
- Parameters:
top_k (int) – the number of top matching documents to show in the sprite.
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel info is at the last axis
inv_normalize (bool) – if set to True, inverse the normalization of a float32 image tensor into a uint8 image tensor in place.
skip_empty (bool) – skip matches that have no .uri or .tensor.
canvas_size (int) – the width of the canvas
min_size (int) – the minimum size of the image
output (Optional[str]) – optional path to store the visualization. If not given, show in the UI.
- pop(*fields)#
Clear some fields of this Document, resetting them to their default values.
- Parameters:
fields – field names to clear.
- Return type:
None
- post(*args, **kwargs)#
- Return type:
T
- save_audio_tensor_to_file(file, sample_rate=44100, sample_width=2)#
Save tensor into a wav file. Mono/stereo is preserved.
- Parameters:
file (Union[str, BinaryIO]) – if file is a string, open the file by that name, otherwise treat it as a file-like object.
sample_rate (int) – sampling frequency
sample_width (int) – sample width in bytes
- Return type:
T
- Returns:
Document itself after processing
- save_blob_to_file(file)#
Save blob into a file.
- Parameters:
file (Union[str, BinaryIO]) – File or filename to which the data is saved.
- Return type:
T
- Returns:
itself after processing
- save_image_tensor_to_file(file, channel_axis=-1, image_format='png')#
Save tensor into a file.
- Parameters:
file (Union[str, BinaryIO]) – File or filename to which the data is saved.
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel is at the last axis
image_format (str) – either png or jpeg
- Return type:
T
- Returns:
itself after processing
- save_uri_to_file(file)#
Save uri into a file.
- Parameters:
file (Union[str, BinaryIO]) – File or filename to which the data is saved.
- Return type:
T
- Returns:
itself after processing
- save_video_tensor_to_file(file, frame_rate=30, codec='h264')#
Save tensor as a video mp4/h264 file.
- Parameters:
file (Union[str, BinaryIO]) – the file to open, which can be either a string or a file-like object.
frame_rate (int) – frames per second
codec (str) – the name of a decoder/encoder
- Return type:
T
- Returns:
itself after processing
- property scores: Optional[Dict[str, NamedScore]]#
- Return type:
Optional[Dict[str, NamedScore]]
- set_image_tensor_channel_axis(original_channel_axis, new_channel_axis)#
Move the channel axis of the image tensor inplace.
- Parameters:
original_channel_axis (int) – the original axis of the channel
new_channel_axis (int) – the new axis of the channel
- Return type:
T
- Returns:
itself after processing
- set_image_tensor_inv_normalization(channel_axis=-1, img_mean=(0.485, 0.456, 0.406), img_std=(0.229, 0.224, 0.225))#
Inverse the normalization of a float32 image tensor into a uint8 image tensor inplace.
- Parameters:
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel is at the last axis
img_mean (Tuple[float]) – the mean of all images
img_std (Tuple[float]) – the standard deviation of all images
- Return type:
T
- Returns:
itself after processing
- set_image_tensor_normalization(channel_axis=-1, img_mean=(0.485, 0.456, 0.406), img_std=(0.229, 0.224, 0.225))#
Normalize a uint8 image tensor into a float32 image tensor inplace.
Following the PyTorch convention, the image must be in the shape (3 x H x W); it will first be scaled into the range [0, 1] and then normalized using mean = [0.485, 0.456, 0.406] and std = [0.229, 0.224, 0.225]. These two arrays were computed from millions of images. If you want to train from scratch on your own dataset, you can calculate a new mean and std; otherwise, using an ImageNet-pretrained model with its own mean and std is recommended.
- Parameters:
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel is at the last axis
img_mean (Tuple[float]) – the mean of all images
img_std (Tuple[float]) – the standard deviation of all images
- Return type:
T
- Returns:
itself after processing
Warning
Please do NOT generalize this function to grayscale or black/white images; it makes no sense for non-RGB images. If you look at PyTorch's MNIST examples, the mean and stddev are 1-dimensional (since the inputs are grayscale, with no RGB channels).
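A minimal sketch of the normalize/inverse round trip on a random channel-last image tensor:
import numpy as np
from docarray import Document

d = Document(tensor=np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8))
d.set_image_tensor_normalization(channel_axis=-1)      # tensor becomes float32
d.set_image_tensor_inv_normalization(channel_axis=-1)  # back to uint8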
- set_image_tensor_resample(ratio, channel_axis=-1)#
Resample the image tensor into a different size inplace.
- Parameters:
ratio (float) – scale ratio of the resampled image tensor.
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel is at the last axis
- Return type:
T
- Returns:
itself after processing
- set_image_tensor_shape(shape, channel_axis=-1)#
Resample the image tensor into a different size inplace.
If your current image tensor has shape [H, W, C], then the new tensor will be [*shape, C].
- Parameters:
shape (Tuple[int, int]) – the new shape of the image tensor.
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel is at the last axis
- Return type:
T
- Returns:
itself after processing
- set_multi_modal_attribute(attribute, value)#
- summary()#
Print non-empty fields and nested structure of this Document object.
- Return type:
None
- property tags: Optional[Dict[str, StructValueType]]#
- Return type:
Optional[Dict[str, StructValueType]]
- property tensor: Optional[ArrayType]#
- Return type:
Optional[ArrayType]
- property text: Optional[str]#
- Return type:
Optional[str]
- to_base64(protocol='pickle', compress=None)#
Serialize a Document object into a base64 string.
- Parameters:
protocol (str) – protocol to use
compress (Optional[str]) – compression method to use
- Return type:
str
- Returns:
a base64 encoded string
- to_bytes(protocol='pickle', compress=None)#
- Return type:
bytes
- to_dict(protocol='jsonschema', **kwargs)#
Convert itself into a Python dict object.
- Parameters:
protocol (str) – jsonschema or protobuf
kwargs – extra key-value args passed to the pydantic and protobuf dumpers.
- Return type:
Dict[str, Any]
- Returns:
the dumped Document as a dict object
- to_json(protocol='jsonschema', **kwargs)#
Convert itself into a JSON string.
- Parameters:
protocol (str) – jsonschema or protobuf
kwargs – extra key-value args passed to the pydantic and protobuf dumpers.
- Return type:
str
- Returns:
the dumped JSON string
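A quick round-trip sketch using the default jsonschema protocol:
from docarray import Document

d = Document(text='hello')
json_str = d.to_json()
d2 = Document.from_json(json_str)
assert d2.text == d.text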
- to_protobuf(ndarray_type=None)#
Convert Document into a Protobuf message.
- Parameters:
ndarray_type (Optional[str]) – can be list or numpy; if set, it will force all ndarray-like objects to be List or numpy.ndarray.
- Return type:
DocumentProto
- Returns:
the protobuf message
- to_pydantic_model()#
Convert a Document object into a Pydantic model.
- Return type:
PydanticDocument
- to_strawberry_type()#
Convert a Document object into a Strawberry type.
- Return type:
StrawberryDocument
- property uri: Optional[str]#
- Return type:
Optional[str]
- property weight: Optional[float]#
- Return type:
Optional[float]
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, copy: bool = False, subindex_configs: Optional[Dict[str, None]] = None)[source]#
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'sqlite', config: Optional[Union[SqliteConfig, Dict]] = None, subindex_configs: Optional[Dict[str, Dict]] = None)
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'weaviate', config: Optional[Union[WeaviateConfig, Dict]] = None, subindex_configs: Optional[Dict[str, Dict]] = None)
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'annlite', config: Optional[Union[AnnliteConfig, Dict]] = None, subindex_configs: Optional[Dict[str, Dict]] = None)
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'elasticsearch', config: Optional[Union[ElasticConfig, Dict]] = None, subindex_configs: Optional[Dict[str, Dict]] = None)
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'redis', config: Optional[Union[RedisConfig, Dict]] = None)
Bases: AllMixins, BaseDocumentArray
DocumentArray is a list-like container of Document objects. A DocumentArray can be used to store, embed, and retrieve Document objects.
from docarray import Document, DocumentArray

da = DocumentArray(
    [Document(text='The cake is a lie'), Document(text='Do a barrel roll!')]
)
da.apply(Document.embed_feature_hashing)

query = Document(text='Can i have some cake?').embed_feature_hashing()
query.match(da, metric='jaccard', use_scipy=True)

print(query.matches[:, ('text', 'scores__jaccard__value')])
[['The cake is a lie', 'Do a barrel roll!'], [0.9, 1.0]]
A DocumentArray can also embed its contents using a neural network, process them using an external Flow or Executor, and persist Documents in a Document Store for fast vector search:
from docarray import Document, DocumentArray
import numpy as np

n_dim = 3
metric = 'Euclidean'

# initialize a DocumentArray with the AnnLite Document Store
da = DocumentArray(
    storage='annlite',
    config={'n_dim': n_dim, 'columns': [('price', 'float')], 'metric': metric},
)

# add Documents to the DocumentArray
with da:
    da.extend(
        [
            Document(id=f'r{i}', embedding=i * np.ones(n_dim), tags={'price': i})
            for i in range(10)
        ]
    )

# perform vector search
np_query = np.ones(n_dim) * 8
results = da.find(np_query)
See also
For further details, see our user guide.
- append(value)#
S.append(value) – append value to the end of the sequence
- apply(*args, **kwargs)#
- Return type:
T
- Returns:
a new DocumentArray
- apply_batch(*args, **kwargs)#
- Return type:
T
- Returns:
a new DocumentArray
- batch(batch_size, shuffle=False)#
Creates a Generator that yields DocumentArrays of size batch_size until the docs are fully traversed along the traversal_path. None docs are filtered out, and optionally the docs can be filtered by checking for the existence of a Document attribute. Note that the last batch might be smaller than batch_size.
- Parameters:
batch_size (int) – Size of each generated batch (except the last one, which might be smaller; default: 32)
shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.
- Yield:
a Generator of DocumentArray, each of length batch_size
- Return type:
Generator[DocumentArray, None, None]
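A short usage sketch:
from docarray import Document, DocumentArray

da = DocumentArray(Document(text=str(i)) for i in range(10))
for b in da.batch(batch_size=4):
    print(len(b))  # 4, 4, 2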
- batch_ids(batch_size, shuffle=False)#
Creates a Generator that yields lists of ids of size batch_size until self is fully traversed. Note that the last batch might be smaller than batch_size.
- Parameters:
batch_size (int) – Size of each generated batch (except the last one, which might be smaller)
shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.
- Yield:
a Generator of lists of IDs, each of length batch_size
- Return type:
Generator[List[str], None, None]
- property blobs: Optional[List[bytes]]#
Get the blob attribute of all Documents.
- Return type:
Optional[List[bytes]]
- Returns:
a list of blobs
- clear() None -- remove all items from S #
- property contents: Optional[Union[Sequence[DocumentContentType], ArrayType]]#
Get the content of all Documents.
- Return type:
Union[Sequence[DocumentContentType], ArrayType, None]
- Returns:
a list of texts, blobs or ArrayType
- count(value) integer -- return number of occurrences of value #
- classmethod dataloader(path, func, batch_size, protocol='protobuf', compress=None, backend='thread', num_worker=None, pool=None, show_progress=False)#
Load array elements, batch them and map them with a function in parallel; finally yield each batch as a DocumentArray.
- Parameters:
path (Union[str, Path]) – Path or filename where the data is stored.
func (Callable[[DocumentArray], T]) – a function that takes a DocumentArray as input and outputs anything. You can either modify elements in-place (only with the thread backend) or work on the returned elements later.
batch_size (int) – Size of each generated batch (except the last one, which might be smaller)
protocol (str) – protocol to use
compress (Optional[str]) – compress algorithm to use
backend (str) – whether to use multi-process or multi-thread as the parallelization backend. In general, if your func is IO-bound then thread is probably good enough. If your func is CPU-bound, then you may want process. In practice, try both to figure out the best choice. However, if you wish to modify elements in-place, regardless of IO/CPU-bound, you should always use the thread backend.
Warning
When using the process backend, you should not expect func to modify elements in-place. This is because the multiprocessing backend passes the variables via pickle and works in another process; the passed object and the original object do not share the same memory.
num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.
pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.
show_progress (bool) – if set, show a progress bar
- Return type:
Generator[DocumentArray, None, None]
- embed(embed_model, device='cpu', batch_size=256, to_numpy=False, collate_fn=None)#
Fill the embedding of Documents inplace by using embed_model.
- Parameters:
embed_model (AnyDNN) – the embedding model written in Keras/Pytorch/Paddle
device (str) – the computational device for embed_model; can be either cpu or cuda.
batch_size (int) – number of Documents in a batch for embedding
to_numpy (bool) – whether to store embeddings back to the Document as numpy.ndarray or in the original framework format.
collate_fn (Optional[CollateFnType]) – creates a mini-batch of input(s) from the given DocumentArray. The default built-in collate_fn uses the tensors of the documents.
- Return type:
T
- Returns:
itself after modification.
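A sketch with a toy PyTorch model (assumes torch is installed; the linear layer is just for illustration, any Keras/Pytorch/Paddle model works the same way):
import numpy as np
import torch
from docarray import Document, DocumentArray

da = DocumentArray(
    Document(tensor=np.random.rand(128).astype('float32')) for _ in range(8)
)
model = torch.nn.Linear(128, 32)  # toy embed_model
da.embed(model, device='cpu', batch_size=4, to_numpy=True)
print(da.embeddings.shape)  # (8, 32)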
- property embeddings: Optional[ArrayType]#
Return an ArrayType stacking all the embedding attributes as rows.
- Return type:
Optional[ArrayType]
- Returns:
an ArrayType of embeddings
- classmethod empty(size=0, *args, **kwargs)#
Create a DocumentArray object with size empty Document objects.
- Parameters:
size (int) – the number of empty Documents in this container
- Return type:
T
- Returns:
a DocumentArray object
- evaluate(other, metric, hash_fn=None, metric_name=None, strict=True, **kwargs)#
Compute ranking evaluation metrics for a given DocumentArray when compared with a groundtruth.
This implementation expects a groundtruth DocumentArray that is structurally identical to self. It is based on comparing the matches of the Documents inside the DocumentArray.
This method fills the evaluations field of the Documents inside this DocumentArray and returns the average of the computations.
- Parameters:
other (DocumentArray) – the groundtruth DocumentArray that this DocumentArray is compared to.
metric (Union[str, Callable[..., float]]) – the name of the metric, or multiple metrics to be computed
hash_fn (Optional[Callable[[Document], str]]) – the function used for identifying the uniqueness of Documents. If not given, then Document.id is used.
metric_name (Optional[str]) – if provided, the results of the metrics computation will be stored in the evaluations field of each Document under this name. If not provided, the name will be derived from the metric's name.
strict (bool) – if set, then the left and right sides are required to be fully aligned: in length, and in the semantics of length. This prevents you from accidentally evaluating on irrelevant matches.
kwargs – additional keyword arguments to be passed to metric_fn
- Return type:
Optional[float]
- Returns:
the average evaluation computed, or a list of them if multiple metrics are required
- extend(values)#
S.extend(iterable) – extend sequence by appending elements from the iterable
- find(query=None, metric='cosine', limit=20, metric_name=None, exclude_self=False, filter=None, only_id=False, index='text', on=None, **kwargs)#
Returns matching Documents given an input query. If the query is a DocumentArray, Document or ArrayType, exhaustive or approximate nearest-neighbour search will be performed, depending on whether the storage backend supports ANN. Furthermore, if filter is not None, pre-filtering will be applied along with the vector search. If the query is a dict object, or the query is None and filter is not None, Documents will be filtered and all Documents that match the filter will be returned. In this case, query (if it is a dict) or filter will be used for filtering. The object must follow the backend-specific filter format if the backend supports filtering, or DocArray's query language format. In the latter case, filtering is applied on the client side, not the backend side. If the query is a string or a list of strings, a search by text will be performed if the backend supports indexing and searching text fields; if not, a NotImplementedError will be raised.
- Parameters:
query (Union[DocumentArray, Document, ArrayType, Dict, str, List[str], None]) – the input query to search by
limit (Union[int, float, None]) – the maximum number of matches; when not given, defaults to 20.
metric_name (Optional[str]) – if provided, then the match results will be marked with this string.
metric (Union[str, Callable[[ArrayType, ArrayType], ndarray]]) – the distance metric.
exclude_self (bool) – if set, Documents in the results with the same id as the query values will not be considered as matches. This is only applied when the input query is a Document or DocumentArray.
filter (Optional[Dict]) – filter query used for pre-filtering or filtering
only_id (bool) – if set, then the returned matches will only contain id
index (str) – if the query is a string, text search will be performed on the index field; otherwise, this parameter is ignored. By default, the Document text attribute will be used for search; otherwise the tag field specified by index will be used. You can only use this parameter if the storage backend supports searching by text.
on (Optional[str]) – specifies a subindex to search on. If set, the returned DocumentArray will be retrieved from the given subindex.
kwargs – other kwargs.
- Return type:
Union[DocumentArray, List[DocumentArray]]
- Returns:
a list of DocumentArrays containing the closest Document objects for each of the queries in query.
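A minimal in-memory sketch of filtering with DocArray's query language (no storage backend required; the tag name is arbitrary):
from docarray import Document, DocumentArray

da = DocumentArray(Document(tags={'price': i}) for i in range(10))
cheap = da.find({'tags__price': {'$lte': 3}})  # filter instead of vector search
print(len(cheap))  # 4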
- flatten()#
Flatten all nested chunks and matches into one DocumentArray.
Note
Flattening an already flattened DocumentArray has no effect.
- Return type:
DocumentArray
- Returns:
a flattened DocumentArray object.
- classmethod from_base64(data, protocol='pickle-array', compress=None, _show_progress=False, *args, **kwargs)#
- Return type:
T
- classmethod from_bytes(data, protocol='pickle-array', compress=None, _show_progress=False, *args, **kwargs)#
- Return type:
T
- classmethod from_csv(*args, **kwargs)#
- Return type:
T
- classmethod from_dataframe(df, *args, **kwargs)#
Import a DocumentArray from a pandas.DataFrame object.
- Parameters:
df (DataFrame) – a pandas.DataFrame object.
- Return type:
T
- Returns:
a DocumentArray object
- classmethod from_dict(values, protocol='jsonschema', **kwargs)#
- Return type:
T
- classmethod from_files(*args, **kwargs)#
- Return type:
T
- classmethod from_huggingface_datasets(*args, **kwargs)#
- Return type:
T
- classmethod from_json(file, protocol='jsonschema', **kwargs)#
- Return type:
T
- classmethod from_lines(*args, **kwargs)#
- Return type:
T
- classmethod from_list(values, protocol='jsonschema', **kwargs)#
- Return type:
T
- classmethod from_ndarray(*args, **kwargs)#
- Return type:
T
- classmethod from_ndjson(*args, **kwargs)#
- Return type:
T
- classmethod from_protobuf(pb_msg)#
- Return type:
T
- classmethod from_pydantic_model(model)#
Convert a list of PydanticDocument into a DocumentArray.
- Parameters:
model (List[BaseModel]) – the list of pydantic data model objects that represents a DocumentArray
- Return type:
T
- Returns:
a DocumentArray
- classmethod from_strawberry_type(model)#
Convert a list of Strawberry types into a DocumentArray.
- Parameters:
model (List[StrawberryDocument]) – the list of strawberry type objects that represents a DocumentArray
- Return type:
T
- Returns:
a DocumentArray
- classmethod get_json_schema(indent=2)#
Return a JSON Schema of DocumentArray class.
- Return type:
str
- get_vocabulary(min_freq=1, text_attrs=('text',))#
Get the text vocabulary as a dict that maps each word to its index, derived from all Documents.
- Parameters:
text_attrs (Tuple[str, ...]) – the textual attributes from which the vocabulary will be derived
min_freq (int) – the minimum word frequency for a word to be considered part of the vocabulary.
- Return type:
Dict[str, int]
- Returns:
a vocabulary dictionary where the key is the word and the value is its index. Indexing starts at 2: 0 is reserved for padding and 1 for unknown tokens.
- index(value[, start[, stop]]) integer -- return first index of value. #
Raises ValueError if the value is not present.
Supporting start and stop arguments is optional, but recommended.
- abstract insert(index, value)#
S.insert(index, value) – insert value before index
- classmethod load(file, file_format='binary', encoding='utf-8', **kwargs)#
Load array elements from a JSON, binary or CSV file.
- Parameters:
file (Union[str, TextIO, BinaryIO]) – File or filename from which the data is loaded.
file_format (str) – json, binary or csv. JSON and CSV files are human-readable, but the binary format gives much smaller size and faster save/load speed. The CSV format has very limited compatibility; a complex DocumentArray with nested structure cannot be restored from a CSV file.
encoding (str) – encoding used to load data from the file (it only applies to the JSON and CSV formats). By default, utf-8 is used.
- Return type:
T
- Returns:
the loaded DocumentArray object
- classmethod load_binary(file, protocol='pickle-array', compress=None, _show_progress=False, streaming=False, *args, **kwargs)#
Load array elements from a compressed binary file.
- Parameters:
file (Union[str, BinaryIO, bytes, Path]) – File or filename or serialized bytes where the data is stored.
protocol (str) – protocol to use
compress (Optional[str]) – compress algorithm to use
_show_progress (bool) – show a progress bar; only works when protocol is pickle or protobuf
streaming (bool) – if True, returns a generator over Document objects. When protocol is pickle, the Documents are streamed from disk to save memory usage.
- Return type:
Union[DocumentArray, Generator[Document, None, None]]
- Returns:
a DocumentArray object
Note
If file is a str, it can specify protocol and compress as file extensions. This functionality assumes file=file_name.$protocol.$compress, where $protocol and $compress refer to a string interpolation of the respective protocol and compress methods. For example, if file=my_docarray.protobuf.lz4 then the binary data will be loaded assuming protocol=protobuf and compress=lz4.
- classmethod load_csv(file, field_resolver=None, encoding='utf-8')#
Load array elements from a CSV file.
- Parameters:
file (Union[str, TextIO]) – File or filename from which the data is loaded.
field_resolver (Optional[Dict[str, str]]) – a map from the field names defined in the CSV to the field names defined in Document.
encoding (str) – encoding used to read the CSV file. By default, utf-8 is used.
- Return type:
T
- Returns:
a DocumentArray object
- classmethod load_json(file, protocol='jsonschema', encoding='utf-8', **kwargs)#
Load array elements from a JSON file.
- Parameters:
file (Union[str, TextIO]) – File or filename or a JSON string from which the data is loaded.
protocol (str) – jsonschema or protobuf
encoding (str) – encoding used to load data from a JSON file. By default, utf-8 is used.
- Return type:
T
- Returns:
a DocumentArrayLike object
- map(func, backend='thread', num_worker=None, show_progress=False, pool=None)#
Return an iterator that applies func to every element of the iterable in parallel, yielding the results.
See also
To process a batch of elements, please use map_batch();
To return a DocumentArray, please use apply().
- Parameters:
func (Callable[[Document], T]) – a function that takes a Document as input and outputs anything. You can either modify elements in-place (only with the thread backend) or work on the returned elements later.
backend (str) – whether to use multi-process or multi-thread as the parallelization backend. In general, if your func is IO-bound then thread is probably good enough. If your func is CPU-bound, then you may want process. In practice, try both to figure out the best choice. However, if you wish to modify elements in-place, regardless of IO/CPU-bound, you should always use the thread backend.
Warning
When using the process backend, you should not expect func to modify elements in-place. This is because the multiprocessing backend passes the variables via pickle and works in another process; the passed object and the original object do not share the same memory.
num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.
show_progress (bool) – show a progress bar
pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.
- Yield:
anything returned from func
- Return type:
Generator[T, None, None]
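A short sketch of map with the thread backend:
from docarray import Document, DocumentArray

da = DocumentArray(Document(text=f'doc {i}') for i in range(4))

def char_count(d):
    return len(d.text)

for n in da.map(char_count, backend='thread'):
    print(n)  # 5, 5, 5, 5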
- map_batch(func, batch_size, backend='thread', num_worker=None, shuffle=False, show_progress=False, pool=None)#
Return an iterator that applies func to every minibatch of the iterable in parallel, yielding the results. Each element in the returned iterator is a DocumentArray.
See also
To process single elements, please use map();
To return a DocumentArray, please use apply_batch().
- Parameters:
batch_size (int) – Size of each generated batch (except the last one, which might be smaller; default: 32)
shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.
func (Callable[[DocumentArray], T]) – a function that takes a DocumentArray as input and outputs anything. You can either modify elements in-place (only with the thread backend) or work on the returned elements later.
backend (str) – whether to use multi-process or multi-thread as the parallelization backend. In general, if your func is IO-bound then thread is probably good enough. If your func is CPU-bound, then you may want process. In practice, try both to figure out the best choice. However, if you wish to modify elements in-place, regardless of IO/CPU-bound, you should always use the thread backend.
Warning
When using the process backend, you should not expect func to modify elements in-place. This is because the multiprocessing backend passes the variables via pickle and works in another process; the passed object and the original object do not share the same memory.
num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.
show_progress (bool) – show a progress bar
pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.
- Yield:
anything returned from func
- Return type:
Generator[T, None, None]
- match(darray, metric='cosine', limit=20, normalization=None, metric_name=None, batch_size=None, exclude_self=False, filter=None, only_id=False, use_scipy=False, device='cpu', num_worker=1, on=None, **kwargs)#
Compute embedding-based nearest neighbours in another DocumentArray for each Document in self, and store the results in matches.
Note
cosine, euclidean and sqeuclidean are supported natively without extra dependencies. You can also use any other distance metric provided by scipy, such as braycurtis, canberra, chebyshev, cityblock, correlation, dice, hamming, jaccard, jensenshannon, kulsinski, mahalanobis, matching, minkowski, rogerstanimoto, russellrao, seuclidean, sokalmichener, sokalsneath, wminkowski, yule. To use a scipy metric, please set use_scipy=True.
To make all match values fall in [0, 1], use dA.match(dB, normalization=(0, 1)).
To invert the distance as a score and make all values fall in [0, 1], use dA.match(dB, normalization=(1, 0)). Note how normalization differs from the previous call.
If a custom metric distance is provided, make sure that it returns scores as distances and not similarities, i.e. the smaller the better.
- Parameters:
darray (DocumentArray) – the other DocumentArray to match against
metric (Union[str, Callable[[ArrayType, ArrayType], ndarray]]) – the distance metric
limit (Union[int, float, None]) – the maximum number of matches; when not given, defaults to 20.
normalization (Optional[Tuple[float, float]]) – a tuple [a, b] to be used with min-max normalization; the min distance will be rescaled to a, the max distance will be rescaled to b, and all values will be rescaled into the range [a, b].
metric_name (Optional[str]) – if provided, then the match results will be marked with this string.
batch_size (Optional[int]) – if provided, then darray is loaded in batches, where each of them is at most batch_size elements. When darray is big, this can significantly speed up the computation.
exclude_self (bool) – if set, Documents in darray with the same id as the left-hand values will not be considered as matches.
filter (Optional[Dict]) – filter query used for pre-filtering
only_id (bool) – if set, then the returned matches will only contain id
use_scipy (bool) – if set, use scipy as the computation backend. Note that scipy does not support distances on sparse matrices.
device (str) – the computational device for .match(); can be either cpu or cuda.
num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.
Note
This argument is only effective when batch_size is set.
on (Optional[str]) – specifies a subindex to search on. If set, the returned DocumentArray will be retrieved from the given subindex.
kwargs – other kwargs.
- Return type:
None
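A self-contained sketch of matching two in-memory DocumentArrays with inverted-distance normalization:
import numpy as np
from docarray import DocumentArray

dA = DocumentArray.empty(3)
dA.embeddings = np.random.rand(3, 8)
dB = DocumentArray.empty(10)
dB.embeddings = np.random.rand(10, 8)

dA.match(dB, metric='cosine', limit=5, normalization=(1, 0))
print(dA[0].matches[:, 'scores__cosine__value'])  # 5 scores in [0, 1]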
- plot_embeddings(title='MyDocumentArray', path=None, image_sprites=False, min_image_size=16, channel_axis=-1, start_server=True, host='127.0.0.1', port=None, image_source='tensor')#
Interactively visualize embeddings using the Embedding Projector.
- Parameters:
title (str) – the title of this visualization. If you want to compare multiple embeddings at the same time, make sure to give a different name each time and set path to the same value.
host (str) – if set, bind the embedding-projector frontend to the given host. Otherwise localhost is used.
port (Optional[int]) – if set, run the embedding-projector frontend at the given port. Otherwise a random port is used.
image_sprites (bool) – if set, visualize the dots using uri and tensor.
path (Optional[str]) – if set, then append the visualization to an existing folder, where you can compare multiple embeddings at the same time. Make sure to use a different title each time.
min_image_size (int) – only used when image_sprites=True; the minimum size of the image
channel_axis (int) – only used when image_sprites=True; the axis id of the color channel, -1 indicates the color channel is at the last axis
start_server (bool) – if set, start an HTTP server and open the frontend directly. Otherwise, you need to rely on the returned path and serve it yourself.
image_source (str) – specify where the image comes from; can be uri or tensor. An empty tensor will fall back to uri.
- Return type:
str
- Returns:
the path to the embeddings visualization info.
- plot_image_sprites(output=None, canvas_size=512, min_size=16, channel_axis=-1, image_source='tensor', skip_empty=False, show_progress=False, show_index=False, fig_size=(10, 10), keep_aspect_ratio=False)#
Generate a sprite image for all image tensors in this DocumentArray-like object.
An image sprite is a collection of images put into a single image. It is always square-sized. Each sub-image is also square-sized and equally-sized.
- Parameters:
output (Optional[str]) – optional path to store the visualization. If not given, show in UI
canvas_size (int) – the size of the canvas
min_size (int) – the minimum size of the image
channel_axis (int) – the axis id of the color channel; -1 indicates the color channel is at the last axis
image_source (str) – specify where the image comes from; can be uri or tensor. An empty tensor will fall back to uri
skip_empty (bool) – skip Documents that have no .uri or .tensor.
show_index (bool) – show the index in the top-right corner of every image
fig_size (Optional[Tuple[int, int]]) – the size of the figure
show_progress (bool) – show a progress bar while plotting.
keep_aspect_ratio (bool) – preserve the aspect ratio of each image by using the aspect ratio of the first image in self.
- Return type:
None
- pop([index]) item -- remove and return item at index (default last). #
Raise IndexError if list is empty or index is out of range.
- post(host, show_progress=False, batch_size=None, parameters=None, **kwargs)#
Post itself to a remote Flow/Sandbox and get the modified DocumentArray back.
- Parameters:
host (str) – a host string. Can be one of the following: grpc://192.168.0.123:8080/endpoint, ws://192.168.0.123:8080/endpoint, http://192.168.0.123:8080/endpoint, jinahub://Hello/endpoint, jinahub+docker://Hello/endpoint, jinahub+docker://Hello/v0.0.1/endpoint, jinahub+docker://Hello/latest/endpoint, jinahub+sandbox://Hello/endpoint
show_progress (bool) – whether to show a progress bar
batch_size (Optional[int]) – number of Documents in each request
parameters (Optional[Dict]) – parameters to send in the request
- Return type:
DocumentArray
- Returns:
the new DocumentArray returned from the remote
- classmethod pull(name, show_progress=False, local_cache=True, *args, **kwargs)#
Pull a DocumentArray from the Jina Cloud Service to local.
- Parameters:
name (str) – the upload name set during push()
show_progress (bool) – whether to show a progress bar while pulling
local_cache (bool) – store the downloaded DocumentArray in a local folder
- Return type:
T
- Returns:
a DocumentArray object
- push(name, show_progress=False, public=True)#
Push this DocumentArray object to Jina Cloud, from where it can later be retrieved via pull().
Note
Pushing with the same name will override the existing content. This works somewhat like a public clipboard where everyone can override anyone's content, so to make your content survive longer, you may want to use a longer and more complicated name.
The lifetime of the content is not guaranteed at the moment; it could be a day or a week. Do not use it for persistence; only use it for temporary transmission/storage/clipboard purposes.
- Parameters:
name (str) – a name that can later be used to retrieve this DocumentArray.
show_progress (bool) – whether to show a progress bar while pushing
public (bool) – by default anyone can pull a DocumentArray if they know its name. Setting this to False allows only the creator to pull it. This feature requires you to log in first.
- Return type:
Dict
- reduce(other)#
Reduces other and the current DocumentArray into one DocumentArray in-place. Changes are applied to the current DocumentArray. Reducing two DocumentArrays consists in adding the Documents of the second DocumentArray to the first DocumentArray if they do not exist there. If a Document exists in both DocumentArrays, the data properties are merged with priority given to the first Document (that is, to the current DocumentArray's Document). The matches and chunks are also reduced in the same way.
- Parameters:
other (T) – DocumentArray
- Return type:
T
- Returns:
DocumentArray
- reduce_all(others)#
Reduces a list of DocumentArrays and this DocumentArray into one DocumentArray. Changes are applied to this DocumentArray in-place.
Reduction consists in reducing this DocumentArray with every DocumentArray in others sequentially using reduce(). The resulting DocumentArray contains the Documents of all DocumentArrays. If a Document exists in many DocumentArrays, data properties are merged with priority to the left-most DocumentArray (that is, if a data attribute is set in a Document belonging to many DocumentArrays, the attribute value of the left-most DocumentArray is kept). Matches and chunks of a Document belonging to many DocumentArrays are also reduced in the same way. Other non-data properties are ignored.
Note
Matches are not kept in a sorted order when they are reduced. You might want to re-sort them in a later step.
The final result depends on the order of the DocumentArrays when applying reduction.
- Parameters:
others (List[T]) – List of DocumentArrays to be reduced
- Return type:
T
- Returns:
the resulting DocumentArray
- remove(value)#
S.remove(value) – remove first occurrence of value. Raise ValueError if the value is not present.
- reverse()#
S.reverse() – reverse IN PLACE
- sample(k, seed=None)#
Randomly sample k elements from the DocumentArray without replacement.
- Parameters:
k (int) – number of elements to sample from the DocumentArray.
seed (Optional[int]) – initializes the random number generator; None by default. If set, it saves the state of the random function to produce deterministic outputs.
- Return type:
DocumentArray
- Returns:
a sampled list of Documents represented as a DocumentArray.
- save(file, file_format='binary', encoding='utf-8')#
Save array elements into a JSON, binary or CSV file.
- Parameters:
file (Union[str, TextIO, BinaryIO]) – File or filename to which the data is saved.
file_format (str) – json, binary or csv. JSON and CSV files are human-readable, but the binary format gives much smaller size and faster save/load speed. Note that the CSV format has very limited compatibility; a complex DocumentArray with nested structure cannot be restored from a CSV file.
encoding (str) – encoding used to save data into the file (it only applies to the JSON and CSV formats). By default, utf-8 is used.
- Return type:
None
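A round-trip sketch using the binary format:
from docarray import Document, DocumentArray

da = DocumentArray([Document(text='hello'), Document(text='world')])
da.save('docs.bin', file_format='binary')
da2 = DocumentArray.load('docs.bin', file_format='binary')
assert da2.texts == ['hello', 'world']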
- save_binary(file, protocol='pickle-array', compress=None)#
Save array elements into a binary file.
- Parameters:
file (Union[str, BinaryIO]) – File or filename to which the data is saved.
protocol (str) – protocol to use
compress (Optional[str]) – compress algorithm to use
Note
If file is a str, it can specify protocol and compress as file extensions. This functionality assumes file=file_name.$protocol.$compress, where $protocol and $compress refer to a string interpolation of the respective protocol and compress methods. For example, if file=my_docarray.protobuf.lz4 then the binary data will be created using protocol=protobuf and compress=lz4.
Compared to save_json(), it is faster and the file is smaller, but it is not human-readable.
Note
To get a binary presentation in memory, use bytes(...).
- Return type:
None
- save_csv(file, flatten_tags=True, exclude_fields=None, dialect='excel', with_header=True, encoding='utf-8')#
Save array elements into a CSV file.
- Parameters:
file (Union[str, TextIO]) – File or filename to which the data is saved.
flatten_tags (bool) – if set, then all fields in Document.tags will be flattened into tag__fieldname and stored as separate columns. It is useful when tags contain a lot of information.
exclude_fields (Optional[Sequence[str]]) – if set, those fields won't show up in the output CSV
dialect (Union[str, Dialect]) – define a set of parameters specific to a particular CSV dialect. Could be a string that represents a predefined dialect in your system, or a csv.Dialect class that groups specific formatting parameters together.
encoding (str) – encoding used to save the data into a CSV file. By default, utf-8 is used.
- Return type:
None
- save_embeddings_csv(file, encoding='utf-8', **kwargs)#
Save embeddings to a CSV file.
This function uses numpy.savetxt() internally.
- Parameters:
file (Union[str, TextIO]) – File or filename to which the data is saved.
encoding (str) – encoding used to save the data into a file. By default, utf-8 is used.
kwargs – extra kwargs will be passed to numpy.savetxt().
- Return type:
None
- save_gif(output, channel_axis=-1, duration=200, size_ratio=1.0, inline_display=False, image_source='tensor', skip_empty=False, show_index=False, show_progress=False)#
Save a gif of the DocumentArray. Each frame corresponds to a Document .uri or .tensor in the DocumentArray.
- Parameters:
output (str) – the file path to save the gif to.
channel_axis (int) – the color channel axis of the tensor.
duration (int) – the duration of each frame in milliseconds.
size_ratio (float) – the size ratio of each frame.
inline_display (bool) – whether to show the gif in a Jupyter notebook.
image_source (str) – the source of the image in the Document attribute.
skip_empty (bool) – whether to skip empty documents.
show_index (bool) – whether to show the index of the document in the top-right corner.
show_progress (bool) – whether to show a progress bar.
- Return type:
None
- save_json(file, protocol='jsonschema', encoding='utf-8', **kwargs)#
Save array elements into a JSON file.
Compared to save_binary(), it is human-readable but slower to save/load, and the file size is larger.
- Parameters:
file (Union[str, TextIO]) – File or filename to which the data is saved.
protocol (str) – jsonschema or protobuf
encoding (str) – encoding used to save data into a JSON file. By default, utf-8 is used.
- Return type:
None
- shuffle(seed=None)#
Randomly shuffle documents within the DocumentArray.
- Parameters:
seed (Optional[int]) – initializes the random number generator; None by default. If set, it saves the state of the random function to produce deterministic outputs.
- Return type:
DocumentArray
- Returns:
the shuffled list of Documents represented as a DocumentArray.
- split_by_tag(tag)#
Split the DocumentArray into multiple DocumentArrays according to the tag value of each Document.
- Parameters:
tag (str) – the tag name to split on, stored in tags.
- Return type:
Dict[Any, DocumentArray]
- Returns:
a dict where Documents with the same tag value are grouped together; their order is preserved from the original DocumentArray.
Note
If the tags of a Document do not contain the specified tag, an empty dict is returned.
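A short usage sketch (the tag name is arbitrary):
from docarray import Document, DocumentArray

da = DocumentArray(
    [
        Document(tags={'category': 'a'}),
        Document(tags={'category': 'b'}),
        Document(tags={'category': 'a'}),
    ]
)
groups = da.split_by_tag('category')
print({k: len(v) for k, v in groups.items()})  # {'a': 2, 'b': 1}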
- summary()#
Print the structure and attribute summary of this DocumentArray object.
Warning
Calling summary() on a large DocumentArray can be slow.
- property tensors: Optional[ArrayType]#
Return an ArrayType stacking all tensor attributes.
The tensor attributes are stacked together along a newly created first dimension (as if you would stack using np.stack(X, axis=0)).
Warning
This operation assumes all tensors have the same shape and dtype. All dtype and shape values are assumed to be equal to the values of the first element in the DocumentArray.
- Return type:
Optional[ArrayType]
- Returns:
an ArrayType of tensors
- property texts: Optional[List[str]]#
Get the text of all Documents.
- Return type:
Optional[List[str]]
- Returns:
a list of texts
- to_base64(protocol='pickle-array', compress=None, _show_progress=False)#
- Return type:
str
- to_bytes(protocol='pickle-array', compress=None, _file_ctx=None, _show_progress=False)#
Serialize itself into bytes.
For more Pythonic code, please use bytes(...).
- Parameters:
_file_ctx (Optional[BinaryIO]) – File or filename or serialized bytes where the data is stored.
protocol (str) – protocol to use
compress (Optional[str]) – compress algorithm to use
_show_progress (bool) – show a progress bar; only works when protocol is pickle or protobuf
- Return type:
bytes
- Returns:
the binary serialization in bytes
- to_dataframe(**kwargs)#
Export itself to a pandas.DataFrame object.
- Parameters:
kwargs – the extra kwargs will be passed to pandas.DataFrame.from_dict().
- Return type:
DataFrame
- Returns:
a pandas.DataFrame object
- to_dict(protocol='jsonschema', **kwargs)#
Convert the object into a Python list.
- Parameters:
protocol (str) – jsonschema or protobuf
- Return type:
List
- Returns:
a Python list
- to_json(protocol='jsonschema', **kwargs)#
Convert the object into a JSON string. Can be loaded via load_json().
- Parameters:
protocol (str) – jsonschema or protobuf
- Return type:
str
- Returns:
a JSON string
- to_list(protocol='jsonschema', **kwargs)#
Convert the object into a Python list.
- Parameters:
protocol (str) – jsonschema or protobuf
- Return type:
List
- Returns:
a Python list
- to_protobuf(ndarray_type=None)#
Convert DocumentArray into a Protobuf message.
- Parameters:
ndarray_type (Optional[str]) – can be list or numpy; if set, it will force all ndarray-like objects from all Documents to be List or numpy.ndarray.
- Return type:
DocumentArrayProto
- Returns:
the protobuf message
- to_pydantic_model()#
Convert a DocumentArray object into a Pydantic model.
- Return type:
List[PydanticDocument]
- to_strawberry_type()#
Convert a DocumentArray object into a Strawberry type.
- Return type:
List[StrawberryDocument]
- traverse(traversal_paths, filter_fn=None)#
Return an Iterator of TraversableSequence of the leaves when applying the traversal_paths. Each TraversableSequence is either the root Documents, a ChunkArray or a MatchArray.
- Parameters:
traversal_paths (str) – a comma-separated string that represents the traversal path
filter_fn (Optional[Callable[[Document], bool]]) – function to filter docs during traversal
- Yield:
TraversableSequence of the leaves when applying the traversal_paths.
Example on traversal_paths:
r: docs in this TraversableSequence
m: all match-documents at adjacency 1
c: all child-documents at granularity 1
r.[attribute]: access attribute of a multi-modal document
cc: all child-documents at granularity 2
mm: all match-documents at adjacency 2
cm: all match-documents at adjacency 1 and granularity 1
r,c: docs in this TraversableSequence and all child-documents at granularity 1
r[start:end]: access sub document array using slice
- Return type:
Iterable[T]
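A small sketch traversing into chunks with the c path:
from docarray import Document, DocumentArray

root = Document(chunks=[Document(text='c1'), Document(text='c2')])
da = DocumentArray([root])
for leaves in da.traverse('c'):
    print(leaves.texts)  # ['c1', 'c2']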
- traverse_flat(traversal_paths, filter_fn=None)#
Returns a single flattened TraversableSequence with all Documents that are reached via the traversal_paths.
Warning
When defining the traversal_paths with multiple paths, the returned Documents are determined at once and not on the fly. This is a different behaviour than in traverse() and traverse_flattened_per_path()!
- Parameters:
traversal_paths (str) – a comma-separated string that represents the traversal path
filter_fn (Optional[Callable[[Document], bool]]) – function to filter docs during traversal
- Return type:
DocumentArray
- Returns:
a single TraversableSequence containing the Documents of all leaves when applying the traversal_paths.
- traverse_flat_per_path(traversal_paths, filter_fn=None)#
Returns a flattened TraversableSequence per path in traversal_paths, with all Documents that are reached by that path.
- Parameters:
traversal_paths (str) – a comma-separated string that represents the traversal path
filter_fn (Optional[Callable[[Document], bool]]) – function to filter docs during traversal
- Yield:
TraversableSequence containing the Documents of all leaves per path.
- jina.Executor#
alias of BaseExecutor
- class jina.Flow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, tls: Optional[bool] = False, **kwargs)[source]#
- class jina.Flow(*, compression: Optional[str] = None, cors: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', deployments_disable_reduce: Optional[str] = '[]', description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, floating: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[str] = None, prefetch: Optional[int] = 1000, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
- class jina.Flow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)
Bases: PostMixin, ProfileMixin, HealthCheckMixin, JAMLCompatible, ExitStack
Flow is how Jina streamlines and distributes Executors.
Create a Flow. Flow is how Jina streamlines and scales Executors.
EXAMPLE USAGE
Python API
from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
with f:
    f.block()  # serve Flow
To and from YAML configuration
from jina import Flow

f = Flow().add(uses='jinahub+docker://SimpleIndexer')  # create Flow and add Executor
f.save_config('flow.yml')  # save YAML config file
f = Flow.load_config('flow.yml')  # load Flow from YAML config
with f:
    f.block()  # serve Flow
- Parameters:
asyncio – If set, then the input and output of this Client work in an asynchronous manner.
host – The host address of the runtime, by default it is 0.0.0.0.
port – The port of the Gateway, which the client should connect to.
protocol – Communication protocol between server and client.
proxy – If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
tls – If set, connect to gateway using tls encryption
compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
cors – If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
deployments_addresses – dictionary JSON with the input addresses of each Deployment
deployments_disable_reduce – list JSON disabling the built-in merging mechanism for each Deployment listed
description – The description of this HTTP server. It will be used in automatics docs such as Swagger UI.
env – The map of environment variables that are available inside runtime
expose_endpoints – A JSON string that represents a map from executor endpoints (@requests(on=…)) to HTTP endpoints.
expose_graphql_endpoint – If set, /graphql endpoint is added to HTTP interface.
floating – If set, the current Pod/Deployment cannot be further chained, and the next .add() will chain after the last Pod/Deployment, not this current one.
graph_conditions – Dictionary stating which filtering conditions each Executor in the graph requires to receive Documents.
graph_description – Routing graph for the gateway
grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}
host – The host address of the runtime, by default it is 0.0.0.0.
host_in – The host address for binding to, by default it is 0.0.0.0
log_config – The YAML config of the logger used in this object.
monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics
name –
The name of this object.
This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - …
When not given, then the default naming strategy will apply.
native – If set, only native Executors is allowed, and the Executor is always run inside WorkerRuntime.
no_crud_endpoints –
If set, /index, /search, /update, /delete endpoints are removed from HTTP interface.
Any executor that has @requests(on=…) bind with those values will receive data requests.
no_debug_endpoints – If set, /status /post endpoints are removed from HTTP interface.
output_array_type –
The type of array tensor and embedding will be serialized to.
Supports the same types as docarray.to_protobuf(.., ndarray_type=…), which can be found here <https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf>. Defaults to retaining whatever type is returned by the Executor.
polling – The polling strategy of the Deployment and its endpoints (when shards>1). Can be defined for all endpoints of a Deployment or by endpoint. Define per Deployment: - ANY: only one (whoever is idle) Pod polls the message - ALL: all Pods poll the message (like a broadcast) Define per Endpoint: JSON dict, {endpoint: PollingType} {‘/custom’: ‘ALL’, ‘/search’: ‘ANY’, ‘*’: ‘ANY’}
port – The port for input data to bind to, default is a random port between [49152, 65535]
port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
prefetch –
Number of requests fetched from the client before feeding into the first Executor.
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
protocol – Communication protocol between server and client.
proxy – If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
py_modules –
The customized python modules need to be imported before loading the executor
Note that the recommended way is to only import a single module - a simple python file, if your executor can be defined in a single file, or an
__init__.py
file if you have multiple files, which should be structured as a python package. For more details, please see the Executor cookbookquiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
replicas – The number of replicas in the deployment
retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
runtime_cls – The runtime class to run inside the Pod
shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies
ssl_certfile – the path to the certificate file
ssl_keyfile – the path to the key file
timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever
timeout_ready – The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever
timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default
title – The title of this HTTP server. It will be used in automatics docs such as Swagger UI.
uses –
The config of the executor; it could be one of the following: * the string literal of an Executor class name * an Executor YAML file (.yml, .yaml, .jaml) * a Jina Hub Executor (must start with jinahub:// or jinahub+docker://) * a docker image (must start with docker://) * the string literal of a YAML config (must start with ! or `jtype: `) * the string literal of a JSON config
When used in Python, the following values are additionally accepted: - a Python dict that represents the config - a text file stream that has a .read() interface
uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses
uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses
uses_with – Dictionary of keyword arguments that will override the with configuration in uses
uvicorn_kwargs –
Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server
More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
env – The map of environment variables that are available inside runtime
inspect –
The strategy for inspect deployments in the flow.
If REMOVE is given then all inspect deployments are removed when building the flow.
log_config – The YAML config of the logger used in this object.
name –
The name of this object.
This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - …
When not given, then the default naming strategy will apply.
quiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
uses – The YAML path represents a flow. It can be either a local file path or a URL.
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
- needs(needs, name='joiner', *args, **kwargs)[source]#
Add a blocker to the Flow, wait until all pods defined in needs completed.
- Parameters:
needs (Union[Tuple[str], List[str]]) – list of service names to wait for
name (str) – the name of this joiner, by default joiner
args – additional positional arguments forwarded to the add function
kwargs – additional key value arguments forwarded to the add function
- Return type:
Flow
- Returns:
the modified Flow
- needs_all(name='joiner', *args, **kwargs)[source]#
Collect all floating Deployments so far and add a blocker to the Flow; wait until all hanging pods have completed.
- Parameters:
name (str) – the name of this joiner (default is joiner)
args – additional positional arguments which are forwarded to the add and needs function
kwargs – additional key value arguments which are forwarded to the add and needs function
- Return type:
Flow
- Returns:
the modified Flow
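A minimal sketch of the join pattern described above (the branch names are illustrative placeholders):

    from jina import Flow

    # two Deployments receive from the gateway in parallel;
    # the joiner blocks until both branches have completed
    f = (
        Flow()
        .add(name='branch_a', needs='gateway')
        .add(name='branch_b', needs='gateway')
        .needs(['branch_a', 'branch_b'], name='joiner')
    )

needs_all(name='joiner') would achieve the same here by collecting all floating Deployments automatically.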
- add(*, compression: Optional[str] = None, connection_list: Optional[str] = None, disable_auto_volume: Optional[bool] = False, disable_reduce: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, external: Optional[bool] = False, floating: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, grpc_server_options: Optional[dict] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', install_requirements: Optional[bool] = False, log_config: Optional[str] = None, monitoring: Optional[bool] = False, name: Optional[str] = None, native: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_monitoring: Optional[str] = None, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, replicas: Optional[int] = 1, retries: Optional[int] = -1, runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, tls: Optional[bool] = False, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, when: Optional[dict] = None, workspace: Optional[str] = None, **kwargs) Union['Flow', 'AsyncFlow'] [source]#
- add(*, needs: Optional[Union[str, Tuple[str], List[str]]] = None, copy_flow: bool = True, deployment_role: DeploymentRoleType = DeploymentRoleType.DEPLOYMENT, **kwargs) Union['Flow', 'AsyncFlow']
Add a Deployment to the current Flow object and return the new modified Flow object. The attributes of the Deployment can later be changed with set() or deleted with remove().
- Parameters:
compression – The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
connection_list – JSON dictionary with a list of connections to configure
disable_auto_volume – Do not automatically mount a volume for dockerized Executors.
disable_reduce – Disable the built-in reduce mechanism, set this if the reduction is to be handled by the Executor connected to this Head
docker_kwargs –
Dictionary of kwargs arguments that will be passed to the Docker SDK when starting the docker container.
More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/
entrypoint – The entrypoint command overrides the ENTRYPOINT in the Docker image. When not set, the Docker image's ENTRYPOINT takes effect.
env – The map of environment variables that are available inside runtime
external – The Deployment will be considered an external Deployment that has been started independently from the Flow. This Deployment will not be context-managed by the Flow.
floating – If set, the current Pod/Deployment can not be further chained, and the next .add() will chain after the last Pod/Deployment not this current one.
force_update – If set, always pull the latest Hub Executor bundle even if it exists locally
gpus –
This argument allows a dockerized Jina Executor to discover local gpu devices.
Note,
- To access all gpus, use --gpus all.
- To access multiple gpus, e.g. make use of 2 gpus, use --gpus 2.
- To access specified gpus based on device id, use --gpus device=[YOUR-GPU-DEVICE-ID]
- To access specified gpus based on multiple device ids, use --gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]
- To specify more parameters, use --gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display
grpc_server_options – Dictionary of kwargs arguments that will be passed to the grpc server as options when starting the server, example : {‘grpc.max_send_message_length’: -1}
host – The host address of the runtime, by default it is 0.0.0.0.
host_in – The host address for binding to, by default it is 0.0.0.0
install_requirements – If set, install the requirements.txt in the Hub Executor bundle locally
log_config – The YAML config of the logger used in this object.
monitoring – If set, spawn an http server with a prometheus endpoint to expose metrics
name –
The name of this object.
This will be used in the following places: - how you refer to this object in Python/YAML/CLI - visualization - log message header - …
When not given, then the default naming strategy will apply.
native – If set, only native Executors are allowed, and the Executor is always run inside WorkerRuntime.
output_array_type –
The type of array tensor and embedding will be serialized to.
Supports the same types as docarray.to_protobuf(.., ndarray_type=...), listed at https://docarray.jina.ai/fundamentals/document/serialization/#from-to-protobuf. Defaults to retaining whatever type is returned by the Executor.
polling –
The polling strategy of the Deployment and its endpoints (when shards>1). Can be defined for all endpoints of a Deployment or per endpoint.
Define per Deployment:
- ANY: only one (whoever is idle) Pod polls the message
- ALL: all Pods poll the message (like a broadcast)
Define per Endpoint: JSON dict, {endpoint: PollingType}, e.g. {'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'}
port – The port for input data to bind to, default is a random port between [49152, 65535]
port_monitoring – The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
py_modules –
The customized python modules that need to be imported before loading the Executor.
Note that the recommended way is to import only a single module: a simple python file if your Executor can be defined in a single file, or an __init__.py file if you have multiple files, which should be structured as a python package. For more details, please see the Executor cookbook.
quiet – If set, then no log will be emitted from this object.
quiet_error – If set, then exception stack information will not be added to the log
quiet_remote_logs – Do not display the streaming of remote logs on local console
replicas – The number of replicas in the deployment
retries – Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
runtime_cls – The runtime class to run inside the Pod
shards – The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies
timeout_ctrl – The timeout in milliseconds of the control request, -1 for waiting forever
timeout_ready – The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever
timeout_send – The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default
tls – If set, connect to deployment using tls encryption
upload_files –
The files on the host to be uploaded to the remote workspace. This can be useful when your Deployment has more file dependencies beyond a single YAML file, e.g. Python files, data files.
Note,
- Currently only a flat structure is supported: if you upload [./foo/a.py, ./foo/b.py, ./bar/c.yml], they will be put under the _same_ workspace on the remote, losing all hierarchies.
- By default, the --uses YAML file is always uploaded.
- Uploaded files are by default isolated across runs. To ensure files are submitted to the same workspace across different runs, use --workspace-id to specify the workspace.
uses –
The config of the executor; it can be one of the following:
- the string literal of an Executor class name
- an Executor YAML file (.yml, .yaml, .jaml)
- a Jina Hub Executor (must start with jinahub:// or jinahub+docker://)
- a docker image (must start with docker://)
- the string literal of a YAML config (must start with ! or `jtype: `)
- the string literal of a JSON config
When used under Python, the following values are additionally accepted:
- a Python dict that represents the config
- a text file stream that has a .read() interface
uses_after – The executor attached after the Pods described by –uses, typically used for receiving from all shards, accepted type follows –uses. This argument only applies for sharded Deployments (shards > 1).
uses_after_address – The address of the uses-after runtime
uses_before – The executor attached before the Pods described by –uses, typically before sending to all shards, accepted type follows –uses. This argument only applies for sharded Deployments (shards > 1).
uses_before_address – The address of the uses-before runtime
uses_metas – Dictionary of keyword arguments that will override the metas configuration in uses
uses_requests – Dictionary of keyword arguments that will override the requests configuration in uses
uses_with – Dictionary of keyword arguments that will override the with configuration in uses
volumes –
The path on the host to be mounted inside the container.
Note,
- If separated by a colon, the first part is the local host path and the second part is the path inside the container.
- If no split is provided, the basename of that directory will be mounted into the container's root path, e.g. --volumes="/user/test/my-workspace" will be mounted into /my-workspace inside the container.
- All volumes are mounted with read-write mode.
when – The condition that the documents need to fulfill before reaching the Executor. The condition can be defined in the form of a DocArray query condition: https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions
workspace – The working directory for any IO operations in this object. If not set, then derive from its parent workspace.
needs – the name of the Deployment(s) that this Deployment receives data from. One can also use “gateway” to indicate the connection with the gateway.
deployment_role – the role of the Deployment, used for visualization and route planning
copy_flow – when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification
kwargs – other keyword-value arguments that the Deployment CLI supports
- Return type:
Union[Flow, AsyncFlow]
- Returns:
a (new) Flow object with modification
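A minimal sketch combining a few of the parameters above (the Executor class and the Deployment names are illustrative placeholders):

    from jina import Executor, Flow, requests


    class MyExec(Executor):
        @requests
        def foo(self, docs, **kwargs):
            ...


    f = (
        Flow()
        .add(name='encoder', uses=MyExec, replicas=2)  # horizontal scaling
        .add(
            name='indexer',
            uses=MyExec,
            shards=2,         # data is partitioned across shards
            polling='ALL',    # every shard polls each message
            needs='encoder',  # receive from the `encoder` Deployment
        )
    )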
- inspect(name='inspect', *args, **kwargs)[source]#
Add an inspection on the last changed Deployment in the Flow
Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.
    Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
            |
            -- PUB-SUB -- InspectDeployment (Hanging)
In this way, InspectDeployment looks like a simple _pass from outside and does not introduce side-effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved. This function is very handy for introducing an Evaluator into the Flow.
- Parameters:
name (str) – name of the Deployment
args – args for .add()
kwargs – kwargs for .add()
- Return type:
Flow
- Returns:
the new instance of the Flow
- gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)[source]#
Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned.
Note
If --no-inspect is not given, then gather_inspect() is auto-called before build(). So in general you don't need to manually call gather_inspect().
- Parameters:
name (str) – the name of the gather Deployment
include_last_deployment (bool) – whether to include the last modified Deployment in the Flow
args – args for .add()
kwargs – kwargs for .add()
- Return type:
Flow
- Returns:
the modified Flow or the copy of it
- build(copy_flow=False, disable_build_sandbox=False)[source]#
Build the current Flow and make it ready to use
Note
No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.
- Parameters:
copy_flow (bool) – when set to true, always copy the current Flow, do the modification on top of it and return the copy; otherwise, do in-line modification
disable_build_sandbox (bool) – when set to true, the sandbox building part will be skipped; used by plot
- Return type:
Flow
- Returns:
the current Flow (by default)
Note
copy_flow=True is recommended if you are building the same Flow multiple times in a row, e.g.

    f = Flow()
    with f:
        f.index()
    with f.build(copy_flow=True) as fl:
        fl.search()
- start()[source]#
Start to run all Deployments in this Flow.
Remember to close the Flow with close().
Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.orchestrate.pods.Pod.
- Returns:
this instance
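A minimal sketch; using the Flow as a context manager calls start() on entering the block and close() on leaving it:

    from jina import Flow

    f = Flow().add()
    with f:   # start() is invoked here
        ...   # the Flow is ready to serve requests
    # close() is invoked automatically when the block exits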
- property num_deployments: int#
Get the number of Deployments in this Flow
- Return type:
int
- property num_pods: int#
Get the number of pods (shards count) in this Flow
- Return type:
int
- property client: BaseClient#
Return a BaseClient object attached to this Flow.
- Return type:
BaseClient
- plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)[source]#
Visualize the Flow up to the current point. If a file name is provided, it will create a jpg image with that name; otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline; otherwise an image will be created.
Example,
flow = Flow().add(name='deployment_a').plot('flow.svg')
- Parameters:
output (Optional[str]) – a filename specifying the name of the image to be created; the suffix svg/jpg determines the file type of the output image
vertical_layout (bool) – top-down or left-right layout
inline_display (bool) – show image directly inside the Jupyter Notebook
build (bool) – build the Flow first before plotting, so the gateway connection can be better shown
copy_flow (bool) – when set to true, always copy the current Flow, do the modification on top of it and return the copy; otherwise, do in-line modification
- Return type:
Flow
- Returns:
the Flow
- property port: int#
Return the exposed port of the gateway.
- Return type:
int
- property host: str#
Return the local address of the gateway.
- Return type:
str
- property monitoring: bool#
Return whether the monitoring is enabled.
- Return type:
bool
- property port_monitoring: Optional[int]#
Return the port on which the monitoring is exposed, or None if monitoring is disabled.
- Return type:
Optional[int]
- property address_private: str#
Return the private IP address of the gateway for connecting from other machines in the same network
- Return type:
str
- property address_public: str#
Return the public IP address of the gateway for connecting from other machines on the public network
- Return type:
str
- block(stop_event=None)[source]#
Block the Flow until stop_event is set or user hits KeyboardInterrupt
- Parameters:
stop_event (Union[threading.Event, multiprocessing.Event, None]) – a threading or multiprocessing event that, once set, will resume control to the main thread.
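A minimal sketch of blocking until an event is set (the event here is a plain multiprocessing.Event, set from another process or thread):

    import multiprocessing

    from jina import Flow

    stop = multiprocessing.Event()
    f = Flow().add()
    with f:
        f.block(stop_event=stop)  # returns once stop.set() is called elsewhere,
                                  # or on KeyboardInterrupt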
- property protocol: GatewayProtocolType#
Return the protocol of this Flow
- Return type:
GatewayProtocolType
- Returns:
the protocol of this Flow
- callback(**kwds)#
Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
- close()#
Immediately unwind the context stack.
- delete(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) Optional[Union[DocumentArray, List[Response]]] #
- dry_run(**kwargs)#
Sends a dry run to the Flow to validate if the Flow is ready to receive requests
- Parameters:
kwargs – potential kwargs passed from the public interface
- Return type:
bool
- Returns:
boolean indicating the health/readiness of the Flow
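A minimal sketch of a readiness check:

    from jina import Flow

    f = Flow().add()
    with f:
        assert f.dry_run()  # True when every Executor in the Flow is reachable and ready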
- enter_context(cm)#
Enters the supplied context manager.
If successful, also pushes its __exit__ method as a callback and returns the result of the __enter__ method.
- index(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) Optional[Union[DocumentArray, List[Response]]] #
- static is_valid_jaml(obj)#
Verifies the yaml syntax of a given object by first serializing it, then attempting to deserialize it and catch parser errors.
- Parameters:
obj (Dict) – yaml object
- Return type:
bool
- Returns:
whether the syntax is valid or not
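A minimal sketch (the dict below is an arbitrary, well-formed yaml object used only for illustration):

    from jina import Flow

    # a plain dict round-trips through yaml serialization, so this holds
    assert Flow.is_valid_jaml({'jtype': 'Flow', 'with': {}})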
- classmethod load_config(source, *, allow_py_modules=True, substitute=True, context=None, uses_with=None, uses_metas=None, uses_requests=None, extra_search_paths=None, py_modules=None, runtime_args=None, **kwargs)#
A high-level interface for loading configuration, with support for loading extra py_modules and substituting env & context variables. Any class that implements the JAMLCompatible mixin can enjoy this feature, e.g. BaseFlow, BaseExecutor, BaseDriver and all their subclasses.
- Support substitutions in YAML:
Environment variables: ${{ ENV.VAR }} (recommended), $VAR (deprecated).
Context dict (context): ${{ CONTEXT.VAR }} (recommended), ${{ VAR }}.
Internal reference via this and root: ${{this.same_level_key}}, ${{root.root_level_key}}.
Substitutions are carried out in order and in multiple passes, resolving variables with best effort.
    !BaseEncoder
    metas:
      name: ${{VAR_A}}  # env or context variables
      workspace: my-${{this.name}}  # internal reference
    # load Executor from yaml file
    BaseExecutor.load_config('a.yml')

    # load Executor from yaml file and substitute environment variables
    os.environ['VAR_A'] = 'hello-world'
    b = BaseExecutor.load_config('a.yml')
    assert b.name == 'hello-world'

    # load Executor from yaml file and substitute variables from a dict
    b = BaseExecutor.load_config('a.yml', context={'VAR_A': 'hello-world'})
    assert b.name == 'hello-world'

    # disable substitute
    b = BaseExecutor.load_config('a.yml', substitute=False)
- Parameters:
source (Union[str, TextIO, Dict]) – the multi-kind source of the configs.
allow_py_modules (bool) – allow importing plugins specified by py_modules in YAML at any level
substitute (bool) – substitute environment, internal reference and context variables.
context (Optional[Dict[str, Any]]) – context replacement variables in a dict; the value of the dict is the replacement.
uses_with (Optional[Dict]) – dictionary of parameters to overwrite from the default config's with field
uses_metas (Optional[Dict]) – dictionary of parameters to overwrite from the default config's metas field
uses_requests (Optional[Dict]) – dictionary of parameters to overwrite from the default config's requests field
extra_search_paths (Optional[List[str]]) – extra paths used when looking for executor yaml files
py_modules (Optional[str]) – optional py_module from which the object needs to be loaded
runtime_args (Optional[Dict[str, Any]]) – optional dictionary of runtime_args to be passed directly to the yaml config without being parsed
kwargs – kwargs for parse_config_source
- Return type:
JAMLCompatible
- Returns:
JAMLCompatible object
- pop_all()#
Preserve the context stack by transferring it to a new instance.
- post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, **kwargs)#
Post a general data request to the Flow.
- Parameters:
inputs (Optional[InputType]) – input data which can be an Iterable, a function which returns an Iterable, or a single Document.
on (str) – the endpoint which is invoked. All the functions in the executors decorated by @requests(on=...) with the same endpoint are invoked.
on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.
on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.
on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.
parameters (Optional[Dict]) – the kwargs that will be sent to the executor
target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.
request_size (int) – the number of Documents per request. <=0 means all inputs in one request.
show_progress (bool) – if set, the client will show a progress bar on receiving every request.
continue_on_error (bool) – if set, a Request that causes a callback error will only be logged, without blocking further requests.
return_responses (bool) – if set to True, the result will come as Response and not as a DocumentArray
kwargs – additional parameters
- Return type:
Union[DocumentArray, List[Response], None]
- Returns:
None or DocumentArray containing all response Documents
Warning
target_executor uses re.match for checking if the pattern is matched. target_executor=='foo' will match both deployments with the name foo and foo_what_ever_suffix.
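A minimal sketch of a data request with a callback (the default endpoint '/' and the trivial Executor are illustrative):

    from docarray import Document
    from jina import Flow


    def print_texts(resp):  # on_done: called for every resolved request
        print(resp.docs.texts)


    f = Flow().add()
    with f:
        f.post(on='/', inputs=Document(text='hello'), on_done=print_texts)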
- profiling(show_table=True)#
Profiling a single query's roundtrip including network and computation latency. Results are summarized in a Dict.
- Parameters:
show_table (bool) – whether to show the table or not.
- Return type:
Dict[str, float]
- Returns:
the latency report in a dict.
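A minimal sketch:

    from jina import Flow

    f = Flow().add()
    with f:
        report = f.profiling(show_table=False)  # Dict[str, float] of latencies
        print(report)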
- push(exit)#
Registers a callback with the standard __exit__ method signature.
Can suppress exceptions the same way __exit__ method can. Also accepts any object with an __exit__ method (registering a call to the method instead of the object itself).
- save_config(filename=None)#
Save the object’s config into a YAML file.
- Parameters:
filename (Optional[str]) – file path of the yaml file; if not given then config_abspath is used
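A minimal sketch of the save/load round trip (the filename is illustrative):

    from jina import Flow

    f = Flow().add(name='encoder')
    f.save_config('my_flow.yml')         # dump the Flow config to YAML
    g = Flow.load_config('my_flow.yml')  # restore the same topology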
- search(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) Optional[Union[DocumentArray, List[Response]]] #
- update(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, **kwargs) Optional[Union[DocumentArray, List[Response]]] #
- property workspace: str#
Return the workspace path of the flow.
- Return type:
str
- property workspace_id: Dict[str, str]#
Get all Deployments' workspace_id values in a dict
- Return type:
Dict[str, str]
- property env: Optional[Dict]#
Get all envs to be set in the Flow
- Return type:
Optional[Dict]
- Returns:
envs as dict
- expose_endpoint(exec_endpoint: str, path: Optional[str] = None)[source]#
- expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)
Expose an Executor's endpoint (defined by @requests(on=...)) as an HTTP endpoint for easier access.
After exposing, you can send data requests directly to http://hostname:port/endpoint.
- Parameters:
exec_endpoint (str) – the endpoint string, by convention starts with /
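A minimal sketch, assuming an Executor in the Flow defines @requests(on='/foo') and the Flow uses the HTTP protocol:

    from jina import Flow

    f = Flow(protocol='http').add()
    f.expose_endpoint('/foo', summary='a hypothetical custom endpoint')
    with f:
        f.block()
    # now POST http://localhost:<port>/foo reaches the `/foo` exec_endpoint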
- to_kubernetes_yaml(output_base_path, k8s_namespace=None, include_gateway=True)[source]#
Converts the Flow into a set of yaml deployments to deploy in Kubernetes.
If you don't want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
output_base_path (str) – the base path where to dump all the yaml files
k8s_namespace (Optional[str]) – the name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.
include_gateway (bool) – defines if the gateway deployment should be included, defaults to True
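A minimal sketch (the output path, namespace and image name are illustrative assumptions):

    from jina import Flow

    f = Flow().add(uses='docker://my-executor-image')  # hypothetical image
    f.to_kubernetes_yaml('./k8s_config', k8s_namespace='demo')
    # afterwards the generated files can be applied, e.g. kubectl apply -R -f ./k8s_config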
- to_k8s_yaml(output_base_path, k8s_namespace=None, include_gateway=True)#
Converts the Flow into a set of yaml deployments to deploy in Kubernetes.
If you don't want to rebuild the image on Jina Hub, you can set the JINA_HUB_NO_IMAGE_REBUILD environment variable.
- Parameters:
output_base_path (str) – the base path where to dump all the yaml files
k8s_namespace (Optional[str]) – the name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.
include_gateway (bool) – defines if the gateway deployment should be included, defaults to True
- to_docker_compose_yaml(output_path=None, network_name=None, include_gateway=True)[source]#
Converts the Flow into a yaml file to run with docker-compose up
- Parameters:
output_path (Optional[str]) – the output path for the yaml file
network_name (Optional[str]) – the name of the network that will be used by the deployment
include_gateway (bool) – defines if the gateway deployment should be included, defaults to True
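A minimal sketch (the output filename and image name are illustrative assumptions):

    from jina import Flow

    f = Flow().add(uses='docker://my-executor-image')  # hypothetical image
    f.to_docker_compose_yaml(output_path='docker-compose.yml')
    # afterwards: docker-compose up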
- property client_args: Namespace#
Get Client settings.
- Return type:
Namespace
- property gateway_args: Namespace#
Get Gateway settings.
- Return type:
Namespace
- jina.monitor(*, name=None, documentation=None)[source]#
Decorator and context manager that allows monitoring of an Executor.
You can access these metrics by enabling monitoring on your Executor. It will track the time spent calling the function and the number of times it has been called. Under the hood, it creates a prometheus Summary: https://prometheus.io/docs/practices/histograms/.
EXAMPLE USAGE
As decorator
    from jina import Executor, monitor, requests


    class MyExecutor(Executor):
        @requests  # `@requests` are monitored automatically
        def foo(self, docs, *args, **kwargs):
            ...
            self.my_method()
            ...

        # custom metric for `my_method`
        @monitor(name='metric_name', documentation='useful information goes here')
        def my_method(self):
            ...
As context manager
    from jina import Executor, requests


    class MyExecutor(Executor):
        @requests  # `@requests` are monitored automatically
        def foo(self, docs, *args, **kwargs):
            ...
            # custom metric for code block
            with self.monitor('metric_name', 'useful information goes here'):
                docs = process(docs)
To enable the defined monitor() blocks, enable monitoring on the Flow level:

    from jina import Flow

    f = Flow(monitoring=True, port_monitoring=9090).add(
        uses=MyExecutor, port_monitoring=9091
    )
    with f:
        ...
- Warning:
Don't use this decorator in combination with the @requests decorator; @requests methods are already monitored.
- Parameters:
name (Optional[str]) – the name of the metrics, by default it is based on the name of the method it decorates
documentation (Optional[str]) – the description of the metrics, by default it is based on the name of the method it decorates
- Returns:
decorator which takes as an input a single callable
- jina.requests(func=None, *, on=None)[source]#
@requests defines the endpoints of an Executor. It has a keyword on= to define the endpoint.
A class method decorated with plain @requests (without on=) is the default handler for all endpoints; that is, it is the fallback handler for endpoints that are not found.
EXAMPLE USAGE
    from docarray import Document
    from jina import Executor, Flow, requests


    # define Executor with custom `@requests` endpoints
    class MyExecutor(Executor):
        @requests(on='/index')
        def index(self, docs, **kwargs):
            print(docs)  # index docs here

        @requests(on=['/search', '/query'])
        def search(self, docs, **kwargs):
            print(docs)  # perform search here

        @requests  # default/fallback endpoint
        def foo(self, docs, **kwargs):
            print(docs)  # process docs here


    f = Flow().add(uses=MyExecutor)  # add your Executor to a Flow
    with f:
        f.post(on='/index', inputs=Document(text='I am here!'))  # send doc to `index` method
        f.post(on='/search', inputs=Document(text='Who is there?'))  # send doc to `search` method
        f.post(on='/query', inputs=Document(text='Who is there?'))  # send doc to `search` method
        f.post(on='/bar', inputs=Document(text='Who is there?'))  # send doc to `foo` method
- Parameters:
func (Optional[Callable[[DocumentArray, Dict, DocumentArray, List[DocumentArray], List[DocumentArray]], Union[DocumentArray, Dict, None]]]) – the method to decorate
on (Union[str, Sequence[str], None]) – the endpoint string, by convention starts with /
- Returns:
decorated function