Basic#

Flow defines how your Executors are connected together and how your data flows through them.

Every Flow can be defined either purely in Python, or be loaded from a YAML file.

Best practice

For production use we recommend YAML files to configure your Flows. This is because YAML files are:

  • independent of Python source code

  • easy to edit, maintain and extend

  • human-readable

Create#

The most trivial Flow is the empty Flow and, like any other Flow, it can be instantiated purely in Python, or from a YAML file:

from jina import Flow

f = Flow()  # Create the empty Flow
with f:  # Using it as a Context Manager will start the Flow
    f.post(on='/search')  # This sends a request to the /search endpoint of the Flow

flow.yml:

jtype: Flow
from jina import Flow

f = Flow.load_config('flow.yml')  # Load the Flow definition from Yaml file

with f:  # Using it as a Context Manager will start the Flow
    f.post(on='/search')  # This sends a request to the /search endpoint of the Flow

Hint: Dump Flow configuration

In addition to loading a Flow from a YAML file, you can also save an existing Flow configuration to YAML. To do so, execute f.save_config('path/to/flow.yml').

Start and stop#

When a Flow starts, all its added Executors will start as well, making it possible to reach the service through its API.

Jina Flows are context managers and can be started and stopped using Pythons with notation:

from jina import Flow

f = Flow()

with f:
    pass

The statement with f: starts the Flow, and exiting the indented with block stops the Flow, including all Executors defined in it.

Start inside __main__#

If applicable, always start the Flow inside if __name__ == '__main__'. For example:

from jina import Flow, Executor, requests

class CustomExecutor(Executor):
    @requests
    async def foo(self, **kwargs):
        ...

f = Flow().add(uses=CustomExecutor)

if __name__ == '__main__':
    with f:
        ...
from jina import Flow, Executor, requests

class CustomExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        ...

f = Flow().add(uses=CustomExecutor)
with f:
    ...

"""
# error
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:

    if _name_ == '_main_':
        freeze_support()
        ...

The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.

"""

Set multiprocessing spawn#

Some cornet cases require to force spawn start method for multiprocessing, e.g. if you encounter “Cannot re-initialize CUDA in forked subprocess”.

You may try JINA_MP_START_METHOD=spawn before starting the Python script to enable this.

JINA_MP_START_METHOD=spawn python app.py

Hint

There’s no need to set this for Windows, as it only supports spawn method for multiprocessing.

Serve forever#

In most scenarios, a Flow should remain reachable for prolonged periods of time. This can be achieved by blocking the execution:

from jina import Flow

f = Flow()
with f:
    f.block()

The .block() method blocks the execution of the current thread or process, which enables external clients to access the Flow.

In this case, the Flow can be stopped by interrupting the thread or process. Alternatively, a multiprocessing or threading Event object can be passed to .block(), which stops the Flow once set.

from jina import Flow
import threading


def start_flow(stop_event):
    """start a blocking Flow."""
    with Flow() as f:
        f.block(stop_event=stop_event)


e = threading.Event()  # create new Event

t = threading.Thread(name='Blocked-Flow', target=start_flow, args=(e,))
t.start()  # start Flow in new Thread

# do some stuff

e.set()  # set event and stop (unblock) the Flow

Visualize#

A Flow has a built-in .plot() function which can be used to visualize a Flow:

from jina import Flow

f = Flow().add().add()
f.plot('flow.svg')
../../../_images/flow.svg
from jina import Flow

f = Flow().add(name='e1').add(needs='e1').add(needs='e1')
f.plot('flow-2.svg')
../../../_images/flow-2.svg

One can also do it in the terminal via:

jina export flowchart flow.yml flow.svg 

Export#

A Flow YAML can be exported as a Docker Compose YAML or a Kubernetes YAML bundle.

Docker Compose#

from jina import Flow

f = Flow().add()
f.to_docker_compose_yaml()

One can also do it in the terminal via:

jina export docker-compose flow.yml docker-compose.yml 

This will generate a single docker-compose.yml file containing all the Executors of the Flow.

For an advance utilisation of Docker Compose with jina please refer to this How to

Kubernetes#

from jina import Flow

f = Flow().add()
f.to_kubernetes_yaml('flow_k8s_configuration')

One can also do it in the terminal via:

jina export kubernetes flow.yml ./my-k8s 

This will generate the necessary Kubernetes configuration files for all the Executors of the Flow. The generated folder can be used directly with kubectl to deploy the Flow to an existing Kubernetes cluster.

For an advance utilisation of Kubernetes with jina please refer to this How to

Tip

Based on your local Jina version, Jina Hub may rebuild the Docker image during the YAML generation process. If you do not wish to rebuild the image, set the environment variable JINA_HUB_NO_IMAGE_REBUILD.

See also

For more in-depth guides on Flow deployment, take a look at our how-tos for Docker compose and Kubernetes.