Create a Flow#
Creating a Flow, on its face, means instantiating a Python object. More importantly, however, creating and configuring a Flow means defining your search microservice architecture.
The most trivial Flow is the empty Flow, as shown below:
from jina import Flow

f = Flow()  # Create the empty Flow

with f:  # Using it as a Context Manager will start the Flow
    f.post(on='/search')  # This sends a request to the /search endpoint of the Flow
flow.yml:

jtype: Flow

from jina import Flow

f = Flow.load_config('flow.yml')  # Load the Flow definition from a YAML file

with f:  # Using it as a Context Manager will start the Flow
    f.post(on='/search')  # This sends a request to the /search endpoint of the Flow
Start and stop a Flow#
Creating a Flow means defining your microservice architecture, and starting a Flow means launching it. When a Flow starts, all its added Executors will start as well, making it possible to reach the service through its API.
Jina Flows are context managers and can be started and stopped using Python's with notation:
from jina import Flow

f = Flow()

with f:
    pass
The statement with f starts the Flow, and exiting the indented with block closes the Flow.
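If a context manager does not fit your setup, a Flow can also be started and stopped explicitly. A minimal sketch, assuming the start()/close() pair that mirrors the context manager behaviour:

from jina import Flow

f = Flow()

f.start()  # start the Flow manually
f.post(on='/search')  # the Flow is now reachable
f.close()  # stop the Flow and release its resources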
In most scenarios, a Flow should remain reachable for prolonged periods of time. This can be achieved by blocking the execution:
from jina import Flow

f = Flow()

with f:
    f.block()
The .block() method blocks the execution of the current thread or process, which enables external clients to access the Flow. In this case, the Flow can be stopped by interrupting the thread or process.
Alternatively, a stop event can be passed to .block(). This is a multiprocessing or threading event that stops the Flow once the event is set.
from jina import Flow
import threading, multiprocessing
from typing import Optional, Union


def start_flow(stop_event: Optional[Union[threading.Event, multiprocessing.Event]]):
    """start a blocking Flow."""
    with Flow() as f:
        f.block(stop_event=stop_event)


e = threading.Event()  # create new Event

t = threading.Thread(name='Blocked-Flow', target=start_flow, args=(e,))
t.start()  # start Flow in new Thread

e.set()  # set event and stop (unblock) the Flow
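The same pattern works across processes. Below is a minimal sketch, assuming a local setup where the Flow runs in a child process and is unblocked via a multiprocessing.Event:

import multiprocessing

from jina import Flow


def start_flow(stop_event):
    """Start a blocking Flow in a child process."""
    with Flow() as f:
        f.block(stop_event=stop_event)


if __name__ == '__main__':
    e = multiprocessing.Event()  # create a new multiprocessing Event

    p = multiprocessing.Process(name='Blocked-Flow', target=start_flow, args=(e,))
    p.start()  # start the Flow in a new process

    e.set()  # set the event to stop (unblock) the Flow
    p.join()  # wait for the Flow process to terminate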
Add Executors#
A Flow orchestrates its Executors as a graph and sends requests to all Executors in the configured order. Executors can be added with the .add() method of the Flow or be listed in the YAML configuration of a Flow. When you start a Flow, it checks the configured Executors and starts instances of them accordingly. When adding Executors, you must define their type with the uses keyword. Executors can be used from various sources, such as code, Docker images, and the Hub:
from docarray import Document, DocumentArray
from jina import Executor, Flow, requests


class FooExecutor(Executor):
    @requests
    def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text='foo was here'))


class BarExecutor(Executor):
    @requests
    def bar(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text='bar was here'))


f = (
    Flow()
    .add(uses=FooExecutor, name='fooExecutor')
    .add(uses=BarExecutor, name='barExecutor')
)  # Create the Flow and add the Executors

with f:  # Using it as a Context Manager will start the Flow
    response = f.post(
        on='/search'
    )  # This sends a request to the /search endpoint of the Flow
    print(response.texts)
flow.yml:

jtype: Flow
executors:
  - name: myexec1
    uses: FooExecutor
    py_modules: exec.py
  - name: myexec2
    uses: BarExecutor
    py_modules: exec.py

exec.py:

from docarray import Document, DocumentArray
from jina import Executor, requests


class FooExecutor(Executor):
    @requests
    def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text='foo was here'))


class BarExecutor(Executor):
    @requests
    def bar(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text='bar was here'))

from jina import Flow

f = Flow.load_config('flow.yml')

with f:
    response = f.post(
        on='/search'
    )  # This sends a request to the /search endpoint of the Flow
    print(response.texts)
The response of the Flow defined above is ['foo was here', 'bar was here'], because the request was first sent to FooExecutor and then to BarExecutor.
Executor discovery#
As explained above, the type of an Executor is defined by providing the uses keyword. The source of an Executor can be code, Docker images, or Hub images.
from docarray import Document, DocumentArray
from jina import Executor, Flow, requests


class ExecutorClass(Executor):
    @requests
    def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text='foo was here'))


f = (
    Flow()
    .add(uses=ExecutorClass, name='executor1')
    .add(uses='jinahub://TransformerTorchEncoder/', name='executor2')
    .add(uses='jinahub+docker://TransformerTorchEncoder', name='executor3')
    .add(uses='jinahub+sandbox://TransformerTorchEncoder', name='executor4')
    .add(uses='docker://sentence-encoder', name='executor5')
    .add(uses='executor-config.yml', name='executor6')
)
executor1 will use ExecutorClass from code, and will be created as a separate process.
executor2 will download the Executor class from the Hub, and will be created as a separate process.
executor3 will use an Executor Docker image coming from the Hub, and will be created as a Docker container of this image.
executor4 will use a Sandbox Executor run by Hubble, in the cloud.
executor5 will use a Docker image tagged as sentence-encoder, and will be created as a Docker container of this image.
executor6 will use an Executor configuration file defining the Executor YAML interface, and will be created as a separate process.
More complex Executors are typically used from Docker images or structured into separate Python modules.
Executor from configuration#
You can use Executors from code that is defined outside your current PATH environment variable. To do this, you need to define your Executor configuration in a separate configuration YAML, which will be used in the Flow:
.
├── app
│   └── main.py
└── executor
    ├── config.yml
    └── my_executor.py
executor/my_executor.py:

from docarray import DocumentArray
from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    def foo(self, docs: DocumentArray, **kwargs):
        pass
executor/config.yml:

jtype: MyExecutor
metas:
  py_modules:
    - my_executor.py
Now, in app/main.py, to correctly load the Executor, you can specify the directory of the Executor in Python or in a Flow YAML:
from docarray import Document
from jina import Flow

f = Flow(extra_search_paths=['../executor']).add(uses='config.yml')

with f:
    r = f.post('/', inputs=Document())
flow.yml:

jtype: Flow
executors:
  - name: executor
    uses: ../executor/config.yml

main.py:

from docarray import Document
from jina import Flow

f = Flow.load_config('../flow/flow.yml')

with f:
    r = f.post('/', inputs=Document())
Override Executor configuration#
You can override the various configuration options available to Executors when adding them into a Flow.
Override metas configuration#
To override the metas configuration of an Executor, use uses_metas:
from jina import Executor, requests, Flow


class MyExecutor(Executor):
    @requests
    def foo(self, docs, **kwargs):
        print(self.metas.workspace)


flow = Flow().add(
    uses=MyExecutor,
    uses_metas={'workspace': 'different_workspace'},
)

with flow as f:
    f.post('/')
executor0@219291[L]:ready and listening
gateway@219291[L]:ready and listening
Flow@219291[I]:Flow is ready to use!
    Protocol:         GRPC
    Local access:     0.0.0.0:58827
    Private network:  192.168.1.101:58827
different_workspace
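The same override can also be expressed in a Flow YAML configuration. A minimal sketch, assuming MyExecutor is importable via a py_modules file exec.py (a hypothetical file name):

jtype: Flow
executors:
  - name: myexec
    uses: MyExecutor
    py_modules: exec.py
    uses_metas:
      workspace: different_workspace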
Override with configuration#
To override the with configuration of an Executor, use uses_with. The with configuration refers to user-defined constructor kwargs.
from jina import Executor, requests, Flow


class MyExecutor(Executor):
    def __init__(self, param1=1, param2=2, param3=3, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.param1 = param1
        self.param2 = param2
        self.param3 = param3

    @requests
    def foo(self, docs, **kwargs):
        print('param1:', self.param1)
        print('param2:', self.param2)
        print('param3:', self.param3)


flow = Flow().add(uses=MyExecutor, uses_with={'param1': 10, 'param3': 30})

with flow as f:
    f.post('/')
executor0@219662[L]:ready and listening
gateway@219662[L]:ready and listening
Flow@219662[I]:Flow is ready to use!
    Protocol:         GRPC
    Local access:     0.0.0.0:32825
    Private network:  192.168.1.101:32825
    Public address:   197.28.82.165:32825
param1: 10
param2: 2
param3: 30
Override requests configuration#
You can override the requests configuration of an Executor and bind its methods to endpoints that you provide. In the following code, we replace the endpoint /foo, bound to the foo() function, with /non_foo, and add a new endpoint /bar for binding bar(). Note that the all_req() function is bound to all endpoints except those explicitly bound to other functions, i.e. /non_foo and /bar.
from jina import Executor, requests, Flow


class MyExecutor(Executor):
    @requests
    def all_req(self, parameters, **kwargs):
        print(f'all req {parameters.get("recipient")}')

    @requests(on='/foo')
    def foo(self, parameters, **kwargs):
        print(f'foo {parameters.get("recipient")}')

    def bar(self, parameters, **kwargs):
        print(f'bar {parameters.get("recipient")}')


flow = Flow().add(
    uses=MyExecutor,
    uses_requests={
        '/bar': 'bar',
        '/non_foo': 'foo',
    },
)

with flow as f:
    f.post('/bar', parameters={'recipient': 'bar()'})
    f.post('/non_foo', parameters={'recipient': 'foo()'})
    f.post('/foo', parameters={'recipient': 'all_req()'})
executor0@221058[L]:ready and listening
gateway@221058[L]:ready and listening
Flow@221058[I]:Flow is ready to use!
    Protocol:         GRPC
    Local access:     0.0.0.0:36507
    Private network:  192.168.1.101:36507
    Public address:   197.28.82.165:36507
bar bar()
foo foo()
all req all_req()
Convert array types between Executors#
Different Executors in a Flow may depend on slightly different types for array-like data such as doc.tensor and doc.embedding, for example because they were written using different machine learning frameworks. As the builder of a Flow you don't always have control over this, for example when using Executors from the Jina Hub.
To facilitate the integration between different Executors, the Flow allows you to convert tensor and embedding by using f.add(..., output_array_type=..):
from jina import Flow
f = Flow().add(uses=MyExecutor, output_array_type='numpy').add(uses=NeedsNumpyExecutor)
This converts the .tensor and .embedding fields of all output Documents of MyExecutor to numpy.ndarray, making the data usable by NeedsNumpyExecutor. This works regardless of whether MyExecutor populates these fields with arrays/tensors from PyTorch, TensorFlow, or any other popular ML framework.
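To make this concrete, here is a minimal, self-contained sketch. The Executor names are illustrative, and the PyTorch dependency is an assumption for the sake of the example:

import numpy as np
import torch  # assumption: PyTorch is installed
from docarray import Document, DocumentArray
from jina import Executor, Flow, requests


class TorchEncoder(Executor):
    # hypothetical Executor that populates embeddings as torch.Tensor
    @requests
    def encode(self, docs: DocumentArray, **kwargs):
        for doc in docs:
            doc.embedding = torch.rand(8)


class NeedsNumpyExecutor(Executor):
    # hypothetical Executor that expects numpy arrays
    @requests
    def check(self, docs: DocumentArray, **kwargs):
        for doc in docs:
            assert isinstance(doc.embedding, np.ndarray)


f = (
    Flow()
    .add(uses=TorchEncoder, output_array_type='numpy')  # convert torch -> numpy
    .add(uses=NeedsNumpyExecutor)
)

with f:
    f.post('/', inputs=Document())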
Output types

output_array_type= supports more types than 'numpy'. For a full specification and further details, take a look at the documentation about protobuf serialization.
External executors#
Usually a Flow will manage all of its Executors. In some cases, however, it is desirable to use externally managed Executors. These are called external Executors. This is especially useful for sharing expensive Executors between Flows. Often these Executors are stateless, GPU-based encoders.
Such Executors are marked with the external keyword when added to a Flow:
from jina import Flow

Flow().add(host='123.45.67.89', port=12345, external=True)
This adds an external Executor to the Flow. The Flow will not start or stop this Executor, and assumes that it is externally managed and available at 123.45.67.89:12345.
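As a sketch of the sharing use case mentioned above (the address and Executor names are hypothetical), two Flows can point at the same external Executor:

from jina import Flow

# Both Flows reference the same externally managed encoder;
# neither of them starts or stops it.
f1 = Flow().add(
    host='123.45.67.89', port=12345, external=True, name='shared_encoder'
).add(name='indexer1')

f2 = Flow().add(
    host='123.45.67.89', port=12345, external=True, name='shared_encoder'
).add(name='indexer2')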
Complex Flow topologies#
Flows are not restricted to sequential execution. Internally they are modelled as graphs and as such can represent any complex, non-cyclic topology.
A typical use case for such a Flow is a topology with a common pre-processing part, but different indexers separating embeddings and data.
To define a custom Flow topology, you can use the needs keyword when adding an Executor. By default, a Flow assumes that every Executor needs the previously added Executor.
from docarray import Document, DocumentArray
from jina import Executor, Flow, requests


class FooExecutor(Executor):
    @requests
    def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'foo was here and got {len(docs)} document'))


class BarExecutor(Executor):
    @requests
    def bar(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'bar was here and got {len(docs)} document'))


class BazExecutor(Executor):
    @requests
    def baz(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'baz was here and got {len(docs)} document'))


f = (
    Flow()
    .add(uses=FooExecutor, name='fooExecutor')
    .add(uses=BarExecutor, name='barExecutor', needs='fooExecutor')
    .add(uses=BazExecutor, name='bazExecutor', needs='fooExecutor')
    .add(needs=['barExecutor', 'bazExecutor'])
)

with f:  # Using it as a Context Manager will start the Flow
    response = f.post(
        on='/search'
    )  # This sends a request to the /search endpoint of the Flow
    print(response.texts)
This will get you the following output:
['foo was here and got 0 document', 'bar was here and got 1 document', 'baz was here and got 1 document']
So both BarExecutor and BazExecutor only received a single Document from FooExecutor, as they are run in parallel. The last Executor executor3 will receive both DocumentArrays and merge them automatically.
The automated merging can be disabled by setting disable_reduce=True. This can be useful when you need to provide your custom merge logic in a separate Executor. In this case, the last .add() call would look like .add(needs=['barExecutor', 'bazExecutor'], uses=CustomMergeExecutor, disable_reduce=True). This feature requires Jina >= 3.0.2.
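A minimal sketch of such a CustomMergeExecutor, assuming that with reducing disabled the Executor receives the incoming DocumentArrays via the docs_matrix keyword argument:

from docarray import DocumentArray
from jina import Executor, requests


class CustomMergeExecutor(Executor):
    @requests
    def merge(self, docs_matrix, **kwargs):
        # docs_matrix is assumed to hold one DocumentArray per incoming branch
        merged = DocumentArray()
        for docs in docs_matrix:
            merged.extend(docs)
        return merged  # the returned DocumentArray replaces the request's docs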
Add filter conditions to Executors#
Starting from Jina 3.2, you can filter the input to each Executor.
To define a filter condition, you can use DocArray's rich query language. You can set a filter for each individual Executor, and every Document that does not satisfy the filter condition will be removed before reaching that Executor.
To add a filter condition to an Executor, you pass it to the input_condition parameter of flow.add():
from docarray import DocumentArray, Document
from jina import Flow

f = Flow().add().add(input_condition={'tags__key': {'$eq': 5}})  # Create the Flow, add condition

with f:  # Using it as a Context Manager will start the Flow
    ret = f.post(
        on='/search',
        inputs=DocumentArray([Document(tags={'key': 5}), Document(tags={'key': 4})]),
    )

print(
    ret[:, 'tags']
)  # only the Document fulfilling the condition is processed and therefore returned

[{'key': 5.0}]
flow.yml:

jtype: Flow
executors:
  - name: executor
    input_condition:
      tags__key:
        $eq: 5

from docarray import DocumentArray, Document
from jina import Flow

f = Flow.load_config('flow.yml')  # Load the Flow definition from a YAML file

with f:  # Using it as a Context Manager will start the Flow
    ret = f.post(
        on='/search',
        inputs=DocumentArray([Document(tags={'key': 5}), Document(tags={'key': 4})]),
    )

print(
    ret[:, 'tags']
)  # only the Document fulfilling the condition is processed and therefore returned

[{'key': 5.0}]
Note that whenever a Document does not satisfy the input_condition of a filter, the filter removes it for the entire branch of the Flow.
This means that every Executor located behind a filter is affected by this, not just the specific Executor that defines the condition.
As with a real-life filter, once something does not pass through it, it will not re-appear behind the filter.
Naturally, parallel branches in a Flow do not affect each other. So if a Document gets filtered out in only one branch, it can still be used in the other branch, and also after the branches are re-joined together:
from docarray import DocumentArray, Document
from jina import Flow

f = (
    Flow()
    .add(name='first')
    .add(input_condition={'tags__key': {'$eq': 5}}, needs='first', name='exec1')
    .add(input_condition={'tags__key': {'$eq': 4}}, needs='first', name='exec2')
    .needs_all(name='join')
)  # Create Flow with parallel Executors

#                                  exec1
#                                /       \
# Flow topology: Gateway --> first       join --> Gateway
#                                \       /
#                                  exec2

with f:
    ret = f.post(
        on='/search',
        inputs=DocumentArray([Document(tags={'key': 5}), Document(tags={'key': 4})]),
    )

print(ret[:, 'tags'])  # Each Document satisfies one parallel branch/filter

[{'key': 5.0}, {'key': 4.0}]
from docarray import DocumentArray, Document
from jina import Flow

f = (
    Flow()
    .add(name='first')
    .add(input_condition={'tags__key': {'$eq': 5}}, name='exec1', needs='first')
    .add(input_condition={'tags__key': {'$eq': 4}}, name='exec2', needs='exec1')
)  # Create Flow with sequential Executors

# Flow topology: Gateway --> first --> exec1 --> exec2 --> Gateway

with f:
    ret = f.post(
        on='/search',
        inputs=DocumentArray([Document(tags={'key': 5}), Document(tags={'key': 4})]),
    )

print(ret[:, 'tags'])  # No Document satisfies both sequential filters

[]
This feature is useful to prevent some specialized Executors from processing certain Documents. It can also be used to build switch-like nodes, where some Documents pass through one parallel branch of the Flow, while other Documents pass through a different branch.
Also note that whenever a Document does not satisfy the condition of an Executor, it will not even be sent to that Executor. Instead, only a lightweight Request without any payload will be transferred. This means that you can not only use this feature to build complex logic, but also to minimize your networking overhead.
See Also
For a hands-on example on how to leverage these filter conditions, see this how-to.
Replicate Executors#
Replication can be used to create multiple copies of the same Executor. Each request in the Flow is then passed to only one replica (instance) of your Executor. This is useful for addressing two challenges, performance and availability:

If you have slow Executors (like some encoders), you may want to scale up the number of instances of this particular Executor so that you can process multiple requests in parallel.
Executors might need to be taken offline from time to time (updates, failures, etc.), but you may want your Flow to be able to process requests without downtime. In this case replicas can be used as well, so that any replica of an Executor can be taken offline as long as at least one other replica is still running. Using this technique, it is possible to create a high-availability setup for your Flow.
from jina import Flow
f = Flow().add(name='slow_encoder', replicas=3).add(name='fast_indexer')
The above Flow will create a topology with three replicas of the Executor slow_encoder. The Flow will send every request to exactly one of the three instances. Then the replica will send its result to fast_indexer.
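The same topology can also be declared in a Flow YAML configuration. A minimal sketch using the replicas field:

jtype: Flow
executors:
  - name: slow_encoder
    replicas: 3
  - name: fast_indexer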
Partition data by using Shards#
Sharding can be used to partition data (like an index) into several parts. This enables the distribution of data across multiple machines. This is helpful in two situations:

When the full data does not fit on one machine
When the latency of a single request becomes too large

In these cases, splitting the load across two or more machines yields better results.
For shards, you can define which shard (instance) will receive the request from its predecessor. This behaviour is called polling. ANY means only one shard will receive a request, and ALL means that all shards will receive a request.
Polling can be configured per endpoint (like /index) and per Executor.
By default, the following polling is applied:
ANY for endpoints at /index
ALL for endpoints at /search
ANY for all other endpoints
When you shard your index, the request handling usually differs between index and search requests:

Index (and update, delete) requests are handled by just a single shard => polling='any'
Search requests are handled by all shards => polling='all'
For indexing, you only want a single shard to receive a request, because this is sufficient to add it to the index. For searching, you probably need to send the search request to all Shards, because the requested data could be on any shard.
from jina import Flow

flow = Flow().add(
    name='ExecutorWithShards',
    shards=3,
    polling={'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'},
)
The example above will result in a Flow with the Executor ExecutorWithShards and the following polling options configured:

/index has polling ANY (the default value is not changed here)
/search has polling ANY, as it is explicitly set (usually that should not be necessary)
/custom has polling ALL
all other endpoints have polling ANY due to the usage of * as a wildcard to catch all other cases
Visualize a Flow#
Flow has a built-in .plot() function which can be used to visualize a Flow:
from jina import Flow

f = Flow().add().add()
f.plot('flow.svg')

from jina import Flow

f = Flow().add(name='e1').add(needs='e1').add(needs='e1')
f.plot('flow-2.svg')