jina package
Subpackages
- jina.clients package
- jina.helloworld package
- jina.hubble package
- jina.jaml package
- jina.logging package
- jina.orchestrate package
- jina.parsers package
- jina.proto package
- Submodules
- jina.proto.jina_pb2 module
- jina.proto.jina_pb2_grpc module
  - JinaControlRequestRPCStub
  - JinaControlRequestRPCServicer
  - add_JinaControlRequestRPCServicer_to_server()
  - JinaControlRequestRPC
  - JinaDataRequestRPCStub
  - JinaDataRequestRPCServicer
  - add_JinaDataRequestRPCServicer_to_server()
  - JinaDataRequestRPC
  - JinaSingleDataRequestRPCStub
  - JinaSingleDataRequestRPCServicer
  - add_JinaSingleDataRequestRPCServicer_to_server()
  - JinaSingleDataRequestRPC
  - JinaRPCStub
  - JinaRPCServicer
  - add_JinaRPCServicer_to_server()
  - JinaRPC
- jina.proto.serializer module
- Module contents
- Submodules
- jina.schemas package
- jina.serve package
- jina.types package
Submodules
- jina.checker module
- jina.enums module
- jina.excepts module
  - BaseJinaException
  - ExecutorFailToLoad
  - RuntimeFailToStart
  - ScalingFails
  - RuntimeTerminated
  - FlowTopologyError
  - FlowMissingDeploymentError
  - FlowBuildLevelError
  - BadConfigSource
  - BadClient
  - BadClientCallback
  - BadClientInput
  - BadRequestType
  - BadClientResponse
  - BadImageNameError
  - BadYAMLVersion
  - DaemonConnectivityError
  - DaemonWorkspaceCreationFailed
  - DaemonPodCreationFailed
  - NotSupportedError
  - RuntimeRunForeverEarlyError
  - DockerVersionError
  - DaemonInvalidDockerfile
  - NoContainerizedError
- jina.helper module
  - batch_iterator()
  - parse_arg()
  - random_port()
  - random_identity()
  - random_uuid()
  - expand_env_var()
  - colored()
  - ArgNamespace
  - is_valid_local_config_source()
  - cached_property
  - typename()
  - get_public_ip()
  - get_internal_ip()
  - convert_tuple_to_list()
  - run_async()
  - deprecated_alias()
  - countdown()
  - CatchAllCleanupContextManager
  - download_mermaid_url()
  - get_readable_size()
  - get_or_reuse_loop()
  - get_rich_console()
  - docarray_graphql_compatible()
- jina.importer module
Module contents
Top-level module of Jina.
The primary function of this module is to import all of the public Jina interfaces into a single place. The interfaces themselves are located in sub-modules, as described below.
- class jina.AsyncFlow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', https: Optional[bool] = False, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, return_responses: Optional[bool] = False, **kwargs)
- class jina.AsyncFlow(*, connection_list: Optional[str] = None, cors: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', description: Optional[str] = None, disable_reduce: Optional[bool] = False, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_after_address: Optional[str] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
- class jina.AsyncFlow(*, env: Optional[dict] = None, expose_graphql_endpoint: Optional[bool] = False, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, polling: Optional[str] = 'ANY', quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, timeout_ctrl: Optional[int] = 60, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)
Bases: AsyncPostMixin, Flow
AsyncFlow is the asynchronous version of Flow. They share the same interface, except that in AsyncFlow the train(), index() and search() methods are coroutines (i.e. declared with the async/await syntax). Simply calling them will not schedule them for execution; to actually run a coroutine, you need to put it in an event loop, e.g. via asyncio.run() or asyncio.create_task().

AsyncFlow can be very useful in integration settings, where Jina/Jina Flow is NOT the main logic but rather serves as part of another program. In this case, users often do not want Jina to control the asyncio event loop. By contrast, Flow controls and wraps the event loop internally, which makes the Flow look synchronous from the outside.

In particular, AsyncFlow makes Jina usage in Jupyter Notebook more natural and reliable. For example, the following code uses the event loop already spawned by Jupyter/IPython to run the Jina Flow (instead of creating a new one):

    from jina import AsyncFlow
    from jina.types.document.generators import from_ndarray
    import numpy as np

    with AsyncFlow().add() as f:
        await f.index(from_ndarray(np.random.random([5, 4])), on_done=print)

Notice that the above code will NOT work in a standard Python REPL, as only Jupyter/IPython implements “autoawait”.
See also
Asynchronous in REPL: Autoawait
https://ipython.readthedocs.io/en/stable/interactive/autoawait.html
Another example is using Jina as an integration. Say you have another IO-bound job, heavylifting(); you can use this feature to schedule Jina's index() and heavylifting() concurrently.

One can think of Flow as a Jina-managed event loop, whereas AsyncFlow is a self-managed event loop.
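The concurrency pattern described above can be sketched with the standard library alone. This is a minimal illustration, not Jina code: fake_index() is a hypothetical stand-in for awaiting AsyncFlow.index(), and heavylifting() is the IO-bound job from the text; both are simulated with asyncio.sleep(). Because the caller owns the event loop (here via asyncio.run()), the two waits overlap.

```python
import asyncio
import time


async def fake_index(docs):
    """Hypothetical stand-in for awaiting AsyncFlow.index(); not a Jina API."""
    await asyncio.sleep(0.2)  # simulate waiting on the Flow over the network
    return f"indexed {len(docs)} docs"


async def heavylifting():
    """Another IO-bound job that runs alongside indexing."""
    await asyncio.sleep(0.2)  # simulate, e.g., a slow HTTP call or disk read
    return "heavylifting done"


async def main():
    # Both coroutines run on the same caller-owned event loop,
    # so their waits overlap instead of running back to back.
    return await asyncio.gather(fake_index(list(range(5))), heavylifting())


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)  # ['indexed 5 docs', 'heavylifting done']
print(f"took about {elapsed:.1f}s rather than 0.4s")
```

Inside Jupyter, where a loop is already running, you would write `await main()` directly instead, since asyncio.run() refuses to start while another event loop is running in the same thread.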
- jina.Client(args=None, **kwargs)
Jina Python client.
- Parameters
args (Optional[ForwardRef]) – Namespace args.
kwargs – Additional arguments.
- Return type
Union[ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef, ForwardRef]
- Returns
An instance of GRPCClient or WebSocketClient.
- class jina.Document
- class jina.Document(_obj: Optional[Document] = None, copy: bool = False)
- class jina.Document(_obj: Optional[Dict], copy: bool = False, field_resolver: Optional[Dict[str, str]] = None, unknown_fields_handler: str = 'catch')
- class jina.Document(blob: Optional[bytes] = None, **kwargs)
- class jina.Document(tensor: Optional[ArrayType] = None, **kwargs)
- class jina.Document(text: Optional[str] = None, **kwargs)
- class jina.Document(uri: Optional[str] = None, **kwargs)
- class jina.Document(parent_id: Optional[str] = None, granularity: Optional[int] = None, adjacency: Optional[int] = None, blob: Optional[bytes] = None, tensor: Optional[ArrayType] = None, mime_type: Optional[str] = None, text: Optional[str] = None, content: Optional[DocumentContentType] = None, weight: Optional[float] = None, uri: Optional[str] = None, tags: Optional[Dict[str, StructValueType]] = None, offset: Optional[float] = None, location: Optional[List[float]] = None, embedding: Optional[ArrayType] = None, modality: Optional[str] = None, evaluations: Optional[Dict[str, Dict[str, StructValueType]]] = None, scores: Optional[Dict[str, Dict[str, StructValueType]]] = None, chunks: Optional[Sequence[Document]] = None, matches: Optional[Sequence[Document]] = None)
Bases: AllMixins, BaseDCType
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, copy: bool = False)
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'sqlite', config: Optional[Union[SqliteConfig, Dict]] = None)
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'weaviate', config: Optional[Union[WeaviateConfig, Dict]] = None)
- class jina.DocumentArray(_docs: Optional[DocumentArraySourceType] = None, storage: str = 'annlite', config: Optional[Union[AnnliteConfig, Dict]] = None)
Bases: AllMixins, BaseDocumentArray
- jina.Executor
alias of BaseExecutor
- class jina.Flow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', https: Optional[bool] = False, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, return_responses: Optional[bool] = False, **kwargs)
- class jina.Flow(*, connection_list: Optional[str] = None, cors: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', description: Optional[str] = None, disable_reduce: Optional[bool] = False, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_graphql_endpoint: Optional[bool] = False, graph_conditions: Optional[str] = '{}', graph_description: Optional[str] = '{}', host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_after_address: Optional[str] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
- class jina.Flow(*, env: Optional[dict] = None, expose_graphql_endpoint: Optional[bool] = False, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, polling: Optional[str] = 'ANY', quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, timeout_ctrl: Optional[int] = 60, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)
Bases: PostMixin, JAMLCompatible, ExitStack
Flow is how Jina streamlines and distributes Executors.
- property last_deployment
Last deployment
- needs(needs, name='joiner', *args, **kwargs)
Add a blocker to the Flow that waits until all pods defined in needs have completed.
- Parameters
needs (Union[Tuple[str], List[str]]) – list of service names to wait for
name (str) – the name of this joiner; by default joiner
args – additional positional arguments forwarded to the add function
kwargs – additional key-value arguments forwarded to the add function
- Return type
- Returns
the modified Flow
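The waiting behavior of a joiner can be modeled with plain asyncio. In this hypothetical sketch, deployments a and b are coroutines with different latencies, and joiner() plays the role of the blocker that something like Flow().add(name='a').add(name='b', needs='gateway').needs(['a', 'b']) would create: it proceeds only after every upstream task has completed. None of these helpers are Jina APIs.

```python
import asyncio


async def deployment(name, delay, log):
    """Hypothetical upstream deployment that finishes after `delay` seconds."""
    await asyncio.sleep(delay)
    log.append(name)
    return name


async def joiner(upstream, log):
    """Blocks until every upstream task is done, like needs=['a', 'b']."""
    results = await asyncio.gather(*upstream)
    log.append("joiner")
    return results


async def main():
    log = []
    a = asyncio.create_task(deployment("a", 0.05, log))
    b = asyncio.create_task(deployment("b", 0.01, log))
    joined = await joiner([a, b], log)
    return log, joined


log, joined = asyncio.run(main())
print(log)     # ['b', 'a', 'joiner']: the joiner always comes last
print(joined)  # ['a', 'b']: gather keeps the order of `upstream`
```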
- needs_all(name='joiner', *args, **kwargs)
Collect all hanging Deployments so far and add a blocker to the Flow that waits until all hanging pods have completed.
- Parameters
name (str) – the name of this joiner (default is joiner)
args – additional positional arguments which are forwarded to the add and needs functions
kwargs – additional key-value arguments which are forwarded to the add and needs functions
- Return type
- Returns
the modified Flow
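A "hanging" Deployment is one that no other Deployment receives data from. The following sketch models a Flow as a hypothetical dict mapping each Deployment name to its needs, finds the hanging ones with a set difference, and attaches a joiner to them, which is the idea behind needs_all(). The data structure and helper names are illustrative assumptions, not Jina internals.

```python
def hanging_deployments(needs_map):
    """Return deployments that no other deployment receives from.

    needs_map maps each deployment name to the names it receives from
    (its `needs`). Hypothetical helper for illustration only.
    """
    all_names = set(needs_map)
    all_needs = {upstream for ups in needs_map.values() for upstream in ups}
    # 'gateway' appears only as a need, so it never counts as hanging here
    return sorted(all_names - all_needs)


def needs_all(needs_map, name="joiner"):
    """Toy model: return a new map with a joiner that needs every hanging deployment."""
    updated = dict(needs_map)
    updated[name] = hanging_deployments(needs_map)
    return updated


flow = {
    "encoder": ["gateway"],
    "indexer_a": ["encoder"],
    "indexer_b": ["encoder"],
}
print(needs_all(flow)["joiner"])  # ['indexer_a', 'indexer_b']
```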
- add(*, connection_list: Optional[str] = None, disable_reduce: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, external: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', input_condition: Optional[dict] = None, install_requirements: Optional[bool] = False, log_config: Optional[str] = None, name: Optional[str] = None, native: Optional[bool] = False, output_array_type: Optional[str] = None, polling: Optional[str] = 'ANY', port: Optional[int] = None, port_jinad: Optional[int] = 8000, pull_latest: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, workspace: Optional[str] = None, **kwargs) → Union['Flow', 'AsyncFlow']
Add a Deployment to the current Flow object and return the new, modified Flow object. The attributes of the Deployment can later be changed with set() or deleted with remove().
- Parameters
needs (Union[str, Tuple[str], List[str], None]) – the name of the Deployment(s) that this Deployment receives data from. One can also use ‘gateway’ to indicate the connection with the gateway.
deployment_role (DeploymentRoleType) – the role of the Deployment, used for visualization and route planning
copy_flow (bool) – when set to true, always copy the current Flow, apply the modification to the copy and return it; otherwise, modify the Flow in place
kwargs – other keyword-value arguments that the Deployment CLI supports
- Return type
- Returns
a (new) Flow object with modification
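The copy_flow flag decides whether a modification returns a new object or mutates the existing one. ToyFlow below is a hypothetical stand-in that only tracks Deployment names; it is not Jina's Flow, and its add() requires copy_flow explicitly so as not to suggest any particular default.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for Flow that only records Deployment names."""

    def __init__(self):
        self.deployments = []

    def add(self, name, *, copy_flow):
        # copy_flow=True: modify a deep copy and return it; self is untouched.
        # copy_flow=False: modify self in place and return it for chaining.
        target = copy.deepcopy(self) if copy_flow else self
        target.deployments.append(name)
        return target


base = ToyFlow().add("encoder", copy_flow=False)
branch = base.add("indexer", copy_flow=True)
print(base.deployments)    # ['encoder']
print(branch.deployments)  # ['encoder', 'indexer']
```

Copying makes it safe to derive several variants from one base Flow, at the cost of a deep copy on every call.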
- inspect(name='inspect', *args, **kwargs)
Add an inspection on the last changed Deployment in the Flow
Internally, it adds two Deployments to the Flow. But don’t worry, the overhead is minimized and you can remove them by simply using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.
    Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
             |
             -- PUB-SUB -- InspectDeployment (Hanging)
In this way, InspectDeployment looks like a simple _pass from the outside and does not introduce side effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.

This function is very handy for introducing an Evaluator into the Flow.
See also
- Parameters
name (str) – name of the Deployment
args – args for .add()
kwargs – kwargs for .add()
- Return type
- Returns
the new instance of the Flow
- gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)
Gather all inspect Deployments output into one Deployment. When the Flow has no inspect Deployment then the Flow itself is returned.
Note
If --no-inspect is not given, then gather_inspect() is automatically called before build(). So in general you don't need to call gather_inspect() manually.
- Parameters
name (str) – the name of the gather Deployment
include_last_deployment (bool) – whether to include the last modified Deployment in the Flow
args – args for .add()
kwargs – kwargs for .add()
- Return type
- Returns
the modified Flow or the copy of it
See also
- build(copy_flow=False)
Build the current Flow and make it ready to use.
Note
There is no need to call it manually since 0.0.8. When the Flow is used as a context manager, or when start() is called, build() will be invoked automatically.
- Parameters
copy_flow (bool) – when set to true, always copy the current Flow, apply the modification to the copy and return it; otherwise, modify the Flow in place
- Return type
- Returns
the current Flow (by default)
Note
copy_flow=True is recommended if you are building the same Flow multiple times in a row, e.g.:

    f = Flow()

    with f:
        f.index()

    with f.build(copy_flow=True) as fl:
        fl.search()
- start()
Start running all Deployments in this Flow.
Remember to close the Flow with close().
Note that this method has a timeout of timeout_ready set in the CLI, which is inherited all the way from jina.orchestrate.pods.Pod.
- Returns
this instance
- property num_deployments: int
Get the number of Deployments in this Flow
- Return type
int
- property num_pods: int
Get the number of pods (shards count) in this Flow
- Return type
int
- property client: BaseClient
Return a BaseClient object attached to this Flow.
- Return type
- plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)
Visualize the Flow up to the current point. If a file name is provided, it will create an image with that name; otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline; otherwise an image will be created.
Example:

    flow = Flow().add(name='deployment_a').plot('flow.svg')
- Parameters
output (Optional[str]) – a filename specifying the name of the image to be created; the suffix svg/jpg determines the file type of the output image
vertical_layout (bool) – top-down or left-right layout
inline_display (bool) – show the image directly inside the Jupyter Notebook
build (bool) – build the Flow before plotting, so that the gateway connection can be shown better
copy_flow (bool) – when set to true, always copy the current Flow, apply the modification to the copy and return it; otherwise, modify the Flow in place
- Return type
- Returns
the Flow
- property port: int
Return the exposed port of the gateway.
- Return type
int
- property host: str
Return the local address of the gateway.
- Return type
str
- property address_private: str
Return the private IP address of the gateway, for connecting from other machines in the same network.
- Return type
str
- property address_public: str
Return the public IP address of the gateway, for connecting from other machines in the public network.
- Return type
str
- block(stop_event=None)
Block the Flow until stop_event is set or the user triggers a KeyboardInterrupt.
- Parameters
stop_event (Union[threading.Event, multiprocessing.Event, None]) – a threading event or a multiprocessing event that, once set, returns control to the main thread.
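The blocking pattern described above can be sketched with the standard library. block() here is a hypothetical, simplified stand-in rather than Jina's method: it waits on an event (a threading.Event in this sketch; a multiprocessing.Event offers the same wait()/set() interface) and returns when another thread sets it or when the user presses Ctrl-C.

```python
import threading
import time


def block(stop_event):
    """Sketch: wait until stop_event is set, treating Ctrl-C as a stop request."""
    try:
        stop_event.wait()  # returns as soon as another thread calls stop_event.set()
    except KeyboardInterrupt:
        pass  # the user pressed Ctrl-C: stop blocking quietly
    return "resumed"


stop = threading.Event()
# Simulate another thread deciding, 0.1 s later, that the Flow should stop.
threading.Timer(0.1, stop.set).start()

start = time.perf_counter()
result = block(stop)
elapsed = time.perf_counter() - start
print(result, stop.is_set())  # resumed True
```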
- property protocol: GatewayProtocolType
Return the protocol of this Flow
- Return type
- Returns
the protocol of this Flow
- property workspace: str
Return the workspace path of the flow.
- Return type
str
- property workspace_id: Dict[str, str]
Get all Deployments' workspace_id values in a dict.
- Return type
Dict[str, str]
- property env: Optional[Dict]
Get all envs to be set in the Flow.
- Return type
Optional[Dict]
- Returns
envs as dict
- expose_endpoint(exec_endpoint: str, path: Optional[str] = None)
- expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)
Expose an Executor's endpoint (defined by @requests(on=…)) as an HTTP endpoint for easier access.
After exposing it, you can send data requests directly to http://hostname:port/endpoint.
- Parameters
exec_endpoint (str) – the endpoint string, by convention starts with /
- join(needs, name='joiner', *args, **kwargs)
Add a blocker to the Flow that waits until all pods defined in needs have completed.
- Parameters
needs (Union[Tuple[str], List[str]]) – list of service names to wait for
name (str) – the name of this joiner; by default joiner
args – additional positional arguments forwarded to the add function
kwargs – additional key-value arguments forwarded to the add function
- Returns
the modified Flow
- rolling_update(deployment_name, uses_with=None)
Reload all replicas of a deployment sequentially.
- Parameters
deployment_name (str) – deployment to update
uses_with (Optional[Dict]) – a dictionary of arguments to restart the Executor with
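Reloading replicas sequentially means at most one replica is down at any moment, so the Deployment keeps serving during the update. The sketch below models this with a hypothetical Replica dataclass and records how many replicas are serving while each one restarts; it illustrates the strategy and is not Jina's implementation. The threshold argument is an invented example value.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """Hypothetical replica holding the arguments it was started with."""
    idx: int
    uses_with: dict = field(default_factory=dict)
    up: bool = True


def rolling_update(replicas, uses_with=None):
    """Restart replicas one at a time; return how many serve during each restart."""
    serving = []
    for replica in replicas:
        replica.up = False                           # take exactly one replica down
        serving.append(sum(r.up for r in replicas))  # the others keep serving
        if uses_with is not None:                    # assumption: None keeps current args
            replica.uses_with = dict(uses_with)
        replica.up = True                            # bring it back before moving on
    return serving


replicas = [Replica(i, {"threshold": 0.5}) for i in range(3)]
serving = rolling_update(replicas, {"threshold": 0.8})
print(serving)  # [2, 2, 2]
```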
- to_k8s_yaml(output_base_path, k8s_namespace=None, k8s_connection_pool=True)
Convert the Flow into a set of YAML deployments to deploy in Kubernetes.
- Parameters
output_base_path (str) – the base path where all the YAML files are dumped
k8s_namespace (Optional[str]) – the name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.
k8s_connection_pool (bool) – Boolean indicating whether the Kubernetes connection pool should be used inside the Executor Runtimes.
- to_docker_compose_yaml(output_path=None, network_name=None)
Convert the Flow into a YAML file to run with docker-compose up.
- Parameters
output_path (Optional[str]) – the output path for the YAML file
network_name (Optional[str]) – the name of the network that will be used by the deployment name
- scale(deployment_name, replicas)
Scale the number of replicas of a given Executor.
- Parameters
deployment_name (str) – deployment to update
replicas (int) – the number of replicas to scale to
- property client_args: Namespace
Get Client settings.
- Return type
Namespace
- property gateway_args: Namespace
Get Gateway settings.
- Return type
Namespace
- jina.requests(func=None, *, on=None)
@requests defines when a function will be invoked. It has a keyword on= to define the endpoint.
A class method decorated with plain @requests (without on=) is the default handler for all endpoints. That means it is the fallback handler for endpoints that are not found.
- Parameters
func (Callable[[ForwardRef, Dict, ForwardRef, List[ForwardRef], List[ForwardRef]], Union[ForwardRef, Dict, None]]) – the method to decorate
on (Union[str, Sequence[str], None]) – the endpoint string, by convention starts with /
- Returns
decorated function
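The fallback rule can be illustrated with a toy dispatcher. toy_requests and ToyExecutor below are hypothetical, stdlib-only stand-ins, not Jina's implementation: the decorator only tags a method with the endpoint(s) given in on=, a method tagged without on= becomes the default handler, and dispatch() falls back to it for any endpoint without a dedicated handler.

```python
def toy_requests(func=None, *, on=None):
    """Hypothetical decorator: tag a method with the endpoint(s) it serves.

    on=None marks the method as the default (fallback) handler.
    """
    def decorate(fn):
        if on is None:
            fn._endpoints = [None]  # None is the key of the default handler
        elif isinstance(on, str):
            fn._endpoints = [on]
        else:
            fn._endpoints = list(on)  # a sequence of endpoint strings
        return fn

    # Support both bare @toy_requests and @toy_requests(on=...)
    return decorate(func) if func is not None else decorate


class ToyExecutor:
    """Hypothetical base class that routes endpoints to tagged methods."""

    def __init__(self):
        self._routes = {}
        for attr in dir(type(self)):
            method = getattr(type(self), attr, None)
            for endpoint in getattr(method, "_endpoints", []):
                self._routes[endpoint] = method

    def dispatch(self, endpoint, docs):
        # Use the dedicated handler if there is one, else the default handler.
        handler = self._routes.get(endpoint, self._routes.get(None))
        if handler is None:
            raise KeyError(f"no handler for {endpoint!r}")
        return handler(self, docs)


class MyExecutor(ToyExecutor):
    @toy_requests(on="/index")
    def index(self, docs):
        return f"index:{len(docs)}"

    @toy_requests
    def fallback(self, docs):
        return f"default:{len(docs)}"


ex = MyExecutor()
print(ex.dispatch("/index", [1, 2]))  # index:2
print(ex.dispatch("/search", [1]))    # default:1
```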