Send & Receive Data#

After a Client has connected to a Flow, it can send requests to the Flow using its post() method. This expects as inputs the Executor endpoint that you want to target, as well as a Document or Iterable of Documents:

from docarray import Document

d1 = Document(content='hello')
client = Client(...)'/endpoint', d1)
from docarray import Document

d1 = Document(content='hello')
d2 = Document(content='world')
client = Client(...)'/endpoint', [d1, d2])
from docarray import DocumentArray

da = DocumentArray.empty(10)
client = Client(...)'/endpoint', da)
from docarray import Document

def doc_gen():
    for j in range(10):
        yield Document(content=f'hello {j}')
client = Client(...)'/endpoint', doc_gen)
client = Client(...)'/endpoint')


Flow also provides a .post() method that follows the same interface as However, once your solution is deployed remotely, the Flow interface is not present anymore. Hence, is not recommended outside of testing or debugging use cases.

Send data in batches#

Especially during indexing, a Client can send up to thousands or millions of Documents to a Flow. Those Documents are internally batched into a Request, providing a smaller memory footprint and faster response times thanks to callback functions.

The size of these batches can be controlled with the request_size keyword. The default request_size is 100 Documents. The optimal size will depend on your use case.

from jina import Flow, Client, Document, DocumentArray

with Flow() as f:
    client = Client(port=f.port)'/', DocumentArray(Document() for _ in range(100)), request_size=10)

Send data asynchronously#

There is an async version of the Python Client which works with post() and mutate().

While the standard Client is also asynchronous under the hood, its async version exposes this fact to the outside world, by allowing coroutines as input, and returning an asynchronous iterator. This means you can iterate over Responses one by one, as they come in.

import asyncio

from jina import Client, Flow, Document

async def async_inputs():
    for _ in range(10):
        yield Document()
        await asyncio.sleep(0.1)

async def run_client(port):
    client = Client(port=port, asyncio=True)
    async for resp in'/', async_inputs, request_size=1):

with Flow() as f:  # Using it as a Context Manager will start the Flow

Async send is useful when calling a Flow from an Executor, as described in async-executors.

from jina import Client, Executor, requests, DocumentArray

class DummyExecutor(Executor):
    c = Client(host='grpc://', asyncio=True)

    async def process(self, docs: DocumentArray, **kwargs):'/', docs)

Send data to specific Executors#

Usually a Flow will send each request to all Executors with matching endpoints as configured. But the Client also allows you to only target specific Executors in a Flow using the target_executor keyword. The request will then only be processed by the Executors which match the provided target_executor regex. Its usage is shown in the listing below.

from jina import Client, Executor, Flow, requests, Document, DocumentArray

class FooExecutor(Executor):
    async def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'foo was here and got {len(docs)} document'))

class BarExecutor(Executor):
    async def bar(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'bar was here and got {len(docs)} document'))

f = (
    .add(uses=FooExecutor, name='fooExecutor')
    .add(uses=BarExecutor, name='barExecutor')

with f:  # Using it as a Context Manager will start the Flow
    client = Client(port=f.port)
    docs ='/', target_executor='bar*')

This will send the request to all Executors whose names start with ‘bar’, such as ‘barExecutor’. In the simplest case, you can specify a precise Executor name, and the request will be sent only to that single Executor.

Use Unary or Streaming gRPC#

The Flow with gRPC protocol implements the unary and the streaming RPC lifecycle for communicating with the clients. When sending more than one request using the batching or the iterator mechanism, the RPC lifecycle for the post() method can be controlled using the stream boolean method argument. By default the stream option is set to True which uses the streaming RPC to send the data to the Flow. If the stream option is set to False, the unary RPC is used to send the data to the Flow. Both RPC lifecycles are implemented to provide the flexibility for the clients.

There might be performance penalties when using the streaming RPC in the Python gRPC implementation.


This option is only valid for gRPC protocol.

Refer to the gRPC Performance Best Practices guide for more implementations details and considerations.

Configure gRPC Client options#

The Client supports the grpc_channel_options parameter which allows more customization of the gRPC channel construction. The grpc_channel_options parameter accepts a dictionary of gRPC configuration options which will be used to overwrite the default options. The default gRPC options are:

('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
('grpc.keepalive_time_ms', 9999),
# send keepalive ping every 9 second, default is 2 hours.
('grpc.keepalive_timeout_ms', 4999),
# keepalive ping time out after 4 seconds, default is 20 seconds
('grpc.keepalive_permit_without_calls', True),
# allow keepalive pings when there's no gRPC calls
('grpc.http1.max_pings_without_data', 0),
# allow unlimited amount of keepalive pings without data
('grpc.http1.min_time_between_pings_ms', 10000),
# allow grpc pings from client every 9 seconds
('grpc.http1.min_ping_interval_without_data_ms', 5000),
# allow grpc pings from client without data every 4 seconds

If the max_attempts is greater than 1 on the post() method, the grpc.service_config option will not be applied since the retry options will be configured internally.

Refer to the channel_arguments section for the full list of available gRPC options.


Refer to the Configure Executor gRPC options section for configuring the Executor gRPC options.


post() returns a DocumentArray containing all Documents flattened over all Requests. When setting return_responses=True, this behavior is changed to returning a list of Response objects.

If a callback function is provided, will return none.

from jina import Flow, Client, Document

with Flow() as f:
    client = Client(port=f.port)
    docs ='', inputs=Document(text='Hi there!'))
<DocumentArray (length=1) at 140619524357664>
['Hi there!']
from jina import Flow, Client, Document

with Flow() as f:
    client = Client(port=f.port)
    resp ='', inputs=Document(text='Hi there!'), return_responses=True)
[< ('header', 'parameters', 'routes', 'data') at 140619524354592>]
['Hi there!']
from jina import Flow, Client, Document

with Flow() as f:
    client = Client(port=f.port)
    resp =
        inputs=Document(text='Hi there!'),
        on_done=lambda resp: print(,
['Hi there!']

Callbacks vs returns#

Callback operates on every sub-request generated by request_size. The callback function consumes the response one by one. The old response is immediately free from the memory after the consumption.

When callback is not provided, the client accumulates all DocumentArray of all Requests before returning. This means you will not receive results until all Requests have been processed, which is slower and requires more memory.

Force the order of responses#

Note that the Flow processes Documents in an asynchronous and a distributed manner. The order of the Flow processing the requests may not be the same order as the Client sending them. Hence, the response order may also not be consistent as the sending order.

To force the order of the results to be deterministic and the same as when they are sent, passing results_in_order parameter to post().

import random
import time
from jina import Flow, Executor, requests, Client, DocumentArray, Document

class RandomSleepExecutor(Executor):
    def foo(self, *args, **kwargs):
        rand_sleep = random.uniform(0.1, 1.3)

f = Flow().add(uses=RandomSleepExecutor, replicas=3)
input_text = [f'ordinal-{i}' for i in range(180)]
input_da = DocumentArray([Document(text=t) for t in input_text])

with f:
    c = Client(port=f.port, protocol=f.protocol)
    output_da ='/', inputs=input_da, request_size=10, results_in_order=True)
    for input, output in zip(input_da, output_da):
        assert input.text == output.text