Flow#
A Flow
orchestrates Executor
s into a processing pipeline to accomplish a task.
Documents “flow” through the pipeline and are processed by Executors.
You can think of Flow as an interface to configure and launch your microservice architecture, while the heavy lifting is done by the services themselves. In particular, each Flow also launches a Gateway service, which can expose all other services through an API that you define.
The most important methods of the Flow
object are the following:
Method |
Description |
---|---|
|
Adds an Executor to the Flow |
|
Starts the Flow. This will start all its Executors and check if they are ready to be used. |
|
Stops and closes the Flow. This will stop and shutdown all its Executors. |
|
Uses the Flow as a context manager. It will automatically start and stop your Flow. |
|
Visualizes the Flow. Helpful for building complex pipelines. |
Sends requests to the Flow API. |
|
|
Blocks execution until the program is terminated. This is useful to keep the Flow alive so it can be used from other places (clients, etc). |
|
Generates a Docker-Compose file listing all Executors as services. |
|
Generates Kubernetes configuration files in |
Check if the Flow is ready to process requests. Returns a boolean indicating the readiness. |
Why should you use a Flow?#
Once you’ve learned DocumentArray and Executor, you can split a big task into small independent modules and services. But you need to chain them together to bring real value and build and serve an application. Flows enable you to do exactly this.
Flows connect microservices (Executors) to build a service with proper client/server style interface over HTTP, gRPC, or WebSockets.
Flows let you scale these Executors independently to match your requirements.
Flows let you easily use other cloud-native orchestrators, such as Kubernetes, to manage your service.
Minimum working example#
from jina import Flow, Executor, requests, Document
class MyExecutor(Executor):
@requests(on='/bar')
def foo(self, docs, **kwargs):
print(docs)
f = Flow().add(name='myexec1', uses=MyExecutor)
with f:
f.post(on='/bar', inputs=Document(), on_done=print)
Server:
from jina import Flow, Executor, requests
class MyExecutor(Executor):
@requests(on='/bar')
def foo(self, docs, **kwargs):
print(docs)
f = Flow(port=12345).add(name='myexec1', uses=MyExecutor)
with f:
f.block()
Client:
from jina import Client, Document
c = Client(port=12345)
c.post(on='/bar', inputs=Document(), on_done=print)
my.yml
:
jtype: Flow
executors:
- name: myexec1
uses: FooExecutor
py_modules: exec.py
exec.py
:
from jina import Executor, requests, Document, DocumentArray
class FooExecutor(Executor):
@requests
def foo(self, docs: DocumentArray, **kwargs):
docs.append(Document(text='foo was here'))
from jina import Flow, Document
f = Flow.load_config('my.yml')
with f:
try:
f.post(on='/bar', inputs=Document(), on_done=print)
except Exception as ex:
# handle exception
pass
Caution
The statement with f:
starts the Flow, and exiting the indented with block stops the Flow, including all Executors defined in it.
Exceptions raised inside the with f:
block will close the Flow context manager. If you don’t want this, use a try...except
block to surround the statements that could potentially raise an exception.