Executors inside a Flow#
Executors are a way to group your DocumentArray
functions and processing logic into a class that can share configuration and state in a modular way.
The main advantage of Executor
s is using them in a Flow
, which can serve, scale, and deploy them with ease.
YAML and Python API#
An Executor can be loaded from a YAML file or via Python API.
executor.py
This is a basic Executor, defined in its own file. Notice that there are no request bindings. If an Executor is used in a Flow, these bindings can be configured via the Python or YAML Flow syntax, so each Flow can use the Executor in its own way. However, the standard way of adding @requests
in the Executor itself would still work just fine.
from jina import Executor
from docarray import DocumentArray
class MyExecutor(Executor):
def __init__(self, parameter_1, parameter_2, **kwargs):
super().__init__(**kwargs)
print(f'parameter_1 = {parameter_1}')
print(f'parameter_2 = {parameter_2}')
def my_index(self, docs: DocumentArray, **kwargs):
print('in my_index, bound to /index')
def my_search(self, docs: DocumentArray, **kwargs):
print('in my_search, bound to /search')
def foo(self, docs: DocumentArray, **kwargs):
print('in foo, bound to /random')
This YAML configuration can also be referenced or directly used inside a Flow YAML. The YAML file has the following format:
This example shows how to start a Flow with an Executor via the Python API:
with Flow().add(
uses='MyExecutor',
uses_with={"parameter_1": "foo", "parameter_2": "bar"},
uses_metas={
"name": "MyExecutor",
"description": "MyExecutor does a thing to the stuff in your Documents",
"py_modules": ["executor.py"],
},
uses_requests={"/index": "my_index", "/search": "my_search", "/random": "foo"},
workspace="some_custom_path",
) as f:
...
Python API-specific options:
uses
can be a class or string that defines the Executor to load. You can also run Executors available on Jina Hub with a special syntax.
jtype: MyExecutor
uses_with:
parameter_1: foo
parameter_2: bar
uses_metas:
name: MyExecutor
description: "MyExecutor does a thing to the stuff in your Documents"
py_modules:
- executor.py
uses_requests:
/index: MyExecutor_index_method
/search: MyExecutor_search_method
/random: MyExecutor_other_method
workspace: some_custom_path
YAML-specific options:
jtype
is a string that defines the class name, interchangeable with bang mark!
;
Common arguments for both YAML and Python API:
uses_with
is a key-value map that defines the arguments of the Executor’__init__
method.uses_metas
is a key-value map that defines some internal attributes of the Executor. It contains the following fields:name
is a string that defines the name of the executor;description
is a string that defines the description of this executor. It will be used in automatic docs UI;py_modules
is a list of strings that defines the Python dependencies of the executor;
uses_requests
is a key-value map that defines the mapping from endpoint to class method. Useful if one needs to overwrite the default endpoint-to-method mapping defined in the Executor python implementation.workspace
is a string value that defines the workspace.
Passing and overriding arguments#
When using an Executor in a Flow, there are two ways of passing arguments.
from jina import Executor, Flow
class MyExecutor(Executor):
def __init__(self, foo, bar, **kwargs):
super().__init__(**kwargs)
print(f'foo = {foo}')
print(f'bar = {bar}')
f = Flow().add(uses=MyExecutor, uses_with={'foo': 'hello', 'bar': 1})
with f:
...
my-exec.yml
jtype: MyExecutor
uses_with:
foo: hello
bar: 1
my-flow.py
from jina import Executor, Flow
class MyExecutor(Executor):
def __init__(self, foo, bar, **kwargs):
super().__init__(**kwargs)
print(f'foo = {foo}')
print(f'bar = {bar}')
f = Flow().add(uses='my-exec.yml')
with f:
...
Hint
uses_with
in Python API has higher priority than predefined uses_with
config in YAML.
The same applies to uses_metas
and uses_requests
. You can define them statically inside the Executor definition YAML, or update their default values through uses_metas
and uses_requests
in Python API.
Example
from jina import Executor, requests, Flow
class MyExecutor(Executor):
def __init__(self, parameter_1, parameter_2, **kwargs):
super().__init__(**kwargs)
self.parameter_1 = parameter_1
self.parameter_2 = parameter_2
print(
f' \nparameter_1: {parameter_1}\nparameter_2: {parameter_2}\nmetas: {self.metas}\nrequests: {self.requests}'
)
@requests(on='/default')
def default_fn(self, **kwargs):
pass
@requests
def foo(self, **kwargs):
pass
@requests
def bar(self, **kwargs):
pass
exec_yaml = """
jtype: MyExecutor
uses_with:
parameter_1: static_parameter_1
parameter_2: static_parameter_2
uses_metas:
name: MyExecutor
description: "MyExecutor does a thing to the stuff in your Documents"
uses_requests:
/index: default_fn
/search: default_fn
"""
flow1 = Flow().add(uses=exec_yaml)
print(f'\nStarting Flow with default Executor parameters')
with flow1:
pass
flow2 = Flow().add(
uses=exec_yaml,
uses_with={
'parameter_1': 'overridden_parameter_1',
'parameter_2': 'overridden_parameter_2',
},
uses_metas={'name': 'Dynamic Name'},
workspace='workspace',
uses_requests={'/index': 'foo', '/search': 'bar'},
)
print(f'\nStarting Flow with overriden Executor parameters')
with flow2:
pass
[...]
parameter_1: static_parameter_1
parameter_2: static_parameter_2
metas: namespace(description='MyExecutor does a thing to the stuff in your Documents', name='MyExecutor', py_modules='', workspace='workspace')
requests: {'/default': <function MyExecutor.bar at 0x7fc3163f0710>, '/index': <function MyExecutor.default_fn at 0x7fc3163cc050>, '/search': <function MyExecutor.default_fn at 0x7fc3163cc050>}
[...]
parameter_1: overriden_parameter_1
parameter_2: overriden_parameter_2
metas: namespace(description='MyExecutor does a thing to the stuff in your Documents', name='Dynamic Name', py_modules='', workspace='overriden_worskpace')
requests: {'/index': <function MyExecutor.foo at 0x7fc3163ddb90>, '/search': <function MyExecutor.bar at 0x7fc3163f0710>}
[...]
Internal Executor attributes#
When implementing an Executor
, if your Executor overrides __init__
, it needs to carry **kwargs
in the signature and call super().__init__(**kwargs)
from jina import Executor
class MyExecutor(Executor):
def __init__(self, foo: str, bar: int, **kwargs):
super().__init__(**kwargs)
self.bar = bar
self.foo = foo
This is important because when an Executor
is instantiated in the context of a Flow, Jina is adding extra arguments.
Some of these arguments
can be used when developing the internal logic of the Executor.
These special
arguments are workspace
, requests
, metas
, runtime_args
.
workspace
#
Each Executor
has a special workspace that is reserved for that specific Executor instance.
The .workspace
property contains the path to this workspace.
This workspace
is based on the workspace passed when adding the Executor: flow.add(..., workspace='path/to/workspace/')
.
The final workspace
is generated by appending '/<executor_name>/<shard_id>/'
.
This can be provided to the Executor via the Python or YAML API.
Default workspace
If the user hasn’t provided a workspace, the Executor uses a default workspace, which is defined in the JINA_DEFAULT_WORKSPACE_BASE
environment variable.
Caution
After you install jina, the JINA_DEFAULT_WORKSPACE_BASE
environment variable will be set in your .bashrc
, .zshrc
, or
.fish
file.
To change the default Executor workspace on your system, you can change the value of this environment variable.
However, if you directly edit the corresponding command in your .bashrc
(or .zshrc
/.fish
) file, your changes will be reverted the next time
you install jina on your system.
Instead, you can add export JINA_DEFAULT_WORKSPACE_BASE=$YOUR_WOKSPACE
after the # JINA_CLI_END
comment.
requests
#
By default, an Executor
object contains .requests
as an attribute when loaded from the Flow
. This attribute is a Dict
describing the mapping between Executor methods and network endpoints: It holds endpoint strings as keys, and pointers to functions as values.
These can be provided to the Executor via the Python or YAML API.
metas
#
An Executor
object contains .metas
as an attribute when loaded from the Flow
. It is of SimpleNamespace
type and contains some key-value information.
The list of the metas
are:
name
: Name given to theExecutor
description
: Optional description of the Executorpy_modules
: List of Python modules needed to import the Executor
These can be provided to the Executor via the Python or YAML API.
runtime_args
#
By default, an Executor
object contains .runtime_args
as an attribute when loaded from the Flow
. It is of SimpleNamespace
type and contains information in key-value format.
As the name suggests, runtime_args
are dynamically determined during runtime, meaning that you don’t know the value before running the Executor
. These values are often related to the system/network environment around the Executor
, and less about the Executor
itself, like shard_id
and replicas
. They are usually set with the add()
method.
The list of the runtime_args
is:
name
: Name given to theExecutor
. This is dynamically adapted from thename
inmetas
and depends on some additional arguments likeshard_id
.replicas
: Number of replicas of the sameExecutor
deployed with theFlow
.shards
: Number of shards of the sameExecutor
deployed with theFlow
.shard_id
: Identifier of theshard
corresponding to the givenExecutor
instance.workspace
: Path to be used by theExecutor
. Note that the actual workspace directory used by the Executor is obtained by appending'/<executor_name>/<shard_id>/'
to this value.py_modules
: Path to the modules needed to import theExecutor
. This is another way to passpy-modules
to theExecutor
from theFlow
These can not be provided by the user through any API. They are generated by the Flow orchestration.
Passing and changing request parameters#
An important feature of Executor
, beyond the capacity of processing Documents, is the capacity to receive parameters
and to return additional results.
Results are returned inside the parameters
dictionary, behind the reserved __results__
key.
Note, however, that not all Executors or request methods populate this field with results. Some just modify the Document
s passed to them, without adding further information.
In this example, MyExec
receives the parameter {'top_k': 10}
from the client, which can then be used internally.
It also exposes a status
endpoint returning internal information in the form of a dict
.
from jina import requests, Executor
class MyExec(Executor):
def __init__(self, parameter=20, **kwargs):
super().__init__(**kwargs)
self.parameter = parameter
@requests(on='/status')
def status(self, **kwargs):
return {'internal_parameter': self.parameter}
@requests(on='/index')
def search(self, parameters, **kwargs):
print(f'Searching with top_k {parameters["top_k"]}')
pass
Example usage and output
f = Flow().add(uses=MyExec, name='my_executor')
def print_response_parameters(resp):
print(f' {resp.to_dict()["parameters"]}')
with f:
f.post(on='/index', parameters={'top_k': 10}, inputs=[])
f.post(on='/status', inputs=[], on_done=print_response_parameters)
[...]
Searching with top_k 10.0
{'__results__': {'my_executor/rep-0': {'internal_parameter': 20.0}}}
Exception handling#
Exceptions raised inside @requests
-decorated functions can simply be raised. The Flow will handle it.
from jina import Executor, requests, Flow
class MyExecutor(Executor):
@requests
def foo(self, **kwargs):
raise NotImplementedError('no time for it')
Example usage and output
from jina import Flow
from executor import MyExecutor
f = Flow().add(uses=MyExecutor)
def print_why(resp, exception):
print(resp.status.description)
with f:
f.post('', on_error=print_why)
[...]
executor0/rep-0@28271[E]:NotImplementedError('no time for it')
add "--quiet-error" to suppress the exception details
[...]
File "/home/joan/jina/jina/jina/serve/executors/decorators.py", line 115, in arg_wrapper
return fn(*args, **kwargs)
File "/home/joan/jina/jina/toy.py", line 8, in foo
raise NotImplementedError('no time for it')
NotImplementedError: no time for it
NotImplementedError('no time for it')
Graceful shutdown of an Executor#
You might need to execute some logic when your Executor’s destructor is called.
For example, you want to persist data to the disk (e.g. in-memory indexed data, fine-tuned model,…).
To do so, you can overwrite the close
method and add your logic.
Jina will make sure that the close
method is executed when the Executor
is terminated inside a Flow or when deployed in any cloud-native environment.
You can think of this as Jina
using the Executor
as a context manager, making sure that the close
method is always executed.
from jina import Executor, requests
class MyExec(Executor):
@requests
def foo(self, docs, **kwargs):
for doc in docs:
print(doc.text)
def close(self):
print("closing...")
Usage
from jina import DocumentArray, Document, Flow
with MyExec() as executor:
executor.foo(DocumentArray([Document(text='hello world')]))
f = Flow().add(uses=MyExec)
with f:
f.post(inputs=DocumentArray([Document(text='hello world')]))
Using MyExec as a context manager
hello world
closing...
Using MyExec in the Flow
Flow@30104[I]:🎉 Flow is ready to use!
🔗 Protocol: GRPC
🏠 Local access: 0.0.0.0:54787
🔒 Private network: 192.168.1.203:54787
🌐 Public address: 212.231.186.65:54787
hello world
closing...
Process finished with exit code 0
Multiple DocumentArrays as input#
You have seen that Executor
methods can receive three types of parameters: docs
, parameters
and docs_matrix
.
docs_matrix
is a parameter that is only used in some special cases.
One case is when an Executor receives messages from more than one upstream Executor in the Flow.
Let’s see an example:
from docarray import Document, DocumentArray
from jina import Flow, Executor, requests
class Exec1(Executor):
@requests
def foo(self, docs, **kwargs):
for doc in docs:
doc.text = 'Exec1'
class Exec2(Executor):
@requests
def foo(self, docs, **kwargs):
for doc in docs:
doc.text = 'Exec2'
class MergeExec(Executor):
@requests
def foo(self, docs_matrix, **kwargs):
documents_to_return = DocumentArray()
for doc1, doc2 in zip(*docs_matrix):
print(
f'MergeExec processing pairs of Documents "{doc1.text}" and "{doc2.text}"'
)
documents_to_return.append(
Document(text=f'Document merging from "{doc1.text}" and "{doc2.text}"')
)
return documents_to_return
f = (
Flow()
.add(uses=Exec1, name='exec1')
.add(uses=Exec2, name='exec2')
.add(uses=MergeExec, needs=['exec1', 'exec2'], disable_reduce=True)
)
with f:
returned_docs = f.post(on='/', inputs=DocumentArray.empty(1))
print(f'Resulting documents {returned_docs[0].text}')
Output
Flow@1244[I]:🎉 Flow is ready to use!
🔗 Protocol: GRPC
🏠 Local access: 0.0.0.0:54550
🔒 Private network: 192.168.1.187:54550
🌐 Public address: 212.231.186.65:54550
MergeExec processing pairs of Documents "Exec1" and "Exec2"
Resulting documents Document merging from "Exec1" and "Exec2"