Executors in Flows

You can chain Executors together into a Flow, which orchestrates Executors into a processing pipeline to accomplish a task. Documents “flow” through the pipeline and are processed by each Executor in turn.

When developing an Executor to put into a Flow, there are several practices you should follow:

Merging upstream DocumentArrays

Often when you’re building a Flow, you want an Executor to receive DocumentArrays from multiple upstream Executors.


For this you can use the docs_matrix or docs_map parameters, which are part of the Executor endpoint signature. These Flow-specific arguments can be used alongside an Executor’s default arguments:

from typing import Dict, Union, List, Optional
from jina import Executor, requests, DocumentArray


class MergeExec(Executor):
    @requests  # bind foo as an endpoint of this Executor
    async def foo(
        self,
        docs: DocumentArray,
        parameters: Dict,
        docs_matrix: Optional[List[DocumentArray]],
        docs_map: Optional[Dict[str, DocumentArray]],
    ) -> Union[DocumentArray, Dict, None]:
        pass

  • Use docs_matrix to receive a List of all incoming DocumentArrays from upstream Executors:

    [
        DocumentArray(...),  # from Executor1
        DocumentArray(...),  # from Executor2
        DocumentArray(...),  # from Executor3
    ]

  • Use docs_map to receive a Dict, where each item’s key is the name of an upstream Executor and the value is the DocumentArray coming from that Executor:

    {
        'Executor1': DocumentArray(...),
        'Executor2': DocumentArray(...),
        'Executor3': DocumentArray(...),
    }
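
To make the difference between the two shapes concrete, here is a minimal sketch in plain Python. It does not use Jina at all: plain lists of strings stand in for DocumentArrays, and the names `FakeDocArray`, `docs_matrix_example`, and `docs_map_example` are hypothetical, chosen only for illustration.

```python
from typing import Dict, List

# Hypothetical stand-in: a list of strings plays the role of a DocumentArray.
FakeDocArray = List[str]

# Shape of docs_matrix: a list with one array per upstream Executor.
docs_matrix_example: List[FakeDocArray] = [
    ['doc-a via Executor1', 'doc-b via Executor1'],
    ['doc-a via Executor2', 'doc-b via Executor2'],
]

# Shape of docs_map: the same kind of arrays, keyed by upstream Executor name.
docs_map_example: Dict[str, FakeDocArray] = {
    'Executor1': ['doc-a via Executor1', 'doc-b via Executor1'],
    'Executor2': ['doc-a via Executor2', 'doc-b via Executor2'],
}

# Positional access: the caller has to know which upstream sits at which index.
by_position = docs_matrix_example[1]

# Named access: the caller asks for an upstream Executor by its name.
by_name = docs_map_example['Executor2']
```

In this toy setup both lookups return the same array. Named access through docs_map tends to be the more readable choice when the merging logic treats upstream Executors differently.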

Reducing multiple DocumentArrays to one DocumentArray

The no_reduce argument determines whether DocumentArrays are reduced into one when being received:

  • To reduce all incoming DocumentArrays into a single DocumentArray, leave no_reduce unset or set it to False. In this case docs_map and docs_matrix are None.

  • To receive a list of all incoming DocumentArrays, set no_reduce to True. The Executor then receives the DocumentArrays independently under docs_matrix and docs_map.
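
The two modes can be modeled roughly in plain Python. This is a sketch under stated assumptions, not Jina's implementation: dicts stand in for Documents, `reduce_all` and `deliver` are hypothetical helpers, and the rule "Documents with the same id collapse into one, first copy wins" is a simplification picked for this example.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-ins: a dict plays the role of a Document,
# a list of dicts plays the role of a DocumentArray.
Doc = Dict[str, str]
DocArray = List[Doc]


def reduce_all(arrays: List[DocArray]) -> DocArray:
    """Collapse several arrays into one, keeping the first Document seen per id.

    Assumption: first-copy-wins merging by id is a simplification for this sketch.
    """
    merged: Dict[str, Doc] = {}
    for array in arrays:
        for doc in array:
            merged.setdefault(doc['id'], doc)
    return list(merged.values())


def deliver(
    upstream: Dict[str, DocArray], no_reduce: bool = False
) -> Tuple[Optional[DocArray], Optional[List[DocArray]], Optional[Dict[str, DocArray]]]:
    """Return (docs, docs_matrix, docs_map) as the downstream stand-in sees them."""
    if no_reduce:
        # Arrays stay separate. `docs` is left as None because this sketch
        # only models the two Flow-specific arguments.
        return None, list(upstream.values()), dict(upstream)
    # Default: one reduced array, and both Flow-specific arguments are None.
    return reduce_all(list(upstream.values())), None, None


upstream = {
    'exec1': [{'id': '1', 'text': 'Exec1'}],
    'exec2': [{'id': '1', 'text': 'Exec2'}, {'id': '2', 'text': 'extra'}],
}

reduced = deliver(upstream)                  # no_reduce unset
separate = deliver(upstream, no_reduce=True)
```

Here `reduced` carries a single two-Document array with no matrix or map, while `separate` keeps one array per upstream Executor.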

from jina import Flow, Executor, requests, Document, DocumentArray


class Exec1(Executor):
    @requests
    def foo(self, docs, **kwargs):
        for doc in docs:
            doc.text = 'Exec1'


class Exec2(Executor):
    @requests
    def foo(self, docs, **kwargs):
        for doc in docs:
            doc.text = 'Exec2'


class MergeExec(Executor):
    @requests
    def foo(self, docs_matrix, **kwargs):
        documents_to_return = DocumentArray()
        # Pair up the Documents coming from exec1 and exec2
        for doc1, doc2 in zip(*docs_matrix):
            print(
                f'MergeExec processing pairs of Documents "{doc1.text}" and "{doc2.text}"'
            )
            documents_to_return.append(
                Document(text=f'Document merging from "{doc1.text}" and "{doc2.text}"')
            )
        return documents_to_return


f = (
    Flow()
    .add(uses=Exec1, name='exec1')
    .add(uses=Exec2, name='exec2')
    .add(uses=MergeExec, needs=['exec1', 'exec2'], no_reduce=True)
)

with f:
    returned_docs = f.post(on='/', inputs=Document())

print(f'Resulting documents {returned_docs[0].text}')

MergeExec processing pairs of Documents "Exec1" and "Exec2"
Resulting documents Document merging from "Exec1" and "Exec2"
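
The `zip(*docs_matrix)` idiom in MergeExec is plain Python rather than a Jina feature. The sketch below shows what it does, using lists of strings as hypothetical stand-ins for the two incoming DocumentArrays.

```python
# Stand-in for docs_matrix: one list per upstream Executor.
docs_matrix = [
    ['Exec1 / doc 0', 'Exec1 / doc 1'],  # plays the role of the array from exec1
    ['Exec2 / doc 0', 'Exec2 / doc 1'],  # plays the role of the array from exec2
]

# Unpacking with * passes each inner list to zip as its own argument,
# so zip walks the arrays in lockstep and yields one tuple per position.
pairs = list(zip(*docs_matrix))

# Build one merged item per pair, mirroring what MergeExec does per Document pair.
merged = [f'{left} + {right}' for left, right in pairs]
```

Note that `zip` stops at the shortest input, so if the upstream arrays differ in length, the trailing items of the longer ones are silently skipped.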


Both served and shared Executors can be used as part of a Flow by adding them as external Executors. If an Executor is only used inside other Flows, define it as a shared Executor to save the cost of running a Gateway in Kubernetes.