Send & Receive Data

After a Client has connected to a Flow, it can send requests to the Flow using its post() method. This expects as inputs the Executor endpoint that you want to target, as well as a Document or Iterable of Documents:

from docarray import Document
from jina import Client

# Send a single Document
d1 = Document(content='hello')
client = Client(...)

client.post('/endpoint', d1)


from docarray import Document
from jina import Client

# Send a list of Documents
d1 = Document(content='hello')
d2 = Document(content='world')
client = Client(...)

client.post('/endpoint', [d1, d2])


from docarray import DocumentArray
from jina import Client

# Send a DocumentArray
da = DocumentArray.empty(10)
client = Client(...)

client.post('/endpoint', da)


from docarray import Document
from jina import Client

# Send Documents produced by a generator
def doc_gen():
    for j in range(10):
        yield Document(content=f'hello {j}')

client = Client(...)

client.post('/endpoint', doc_gen)


from jina import Client

# Send a request without any input Documents
client = Client(...)

client.post('/endpoint')

Caution

Flow also provides a .post() method that follows the same interface as client.post(). However, once your solution is deployed remotely, the Flow interface is not present anymore. Hence, flow.post() is not recommended outside of testing or debugging use cases.

Send data in batches

Especially during indexing, a Client can send thousands or even millions of Documents to a Flow. Those Documents are internally split into batches, each sent as one Request, which gives a smaller memory footprint and, when combined with callback functions, faster response times.

The size of these batches can be controlled with the request_size keyword. The default request_size is 100 Documents. The optimal size will depend on your use case.

from jina import Flow, Client, Document, DocumentArray

with Flow() as f:
    client = Client(port=f.port)
    client.post('/', DocumentArray(Document() for _ in range(100)), request_size=10)
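
To make the effect of request_size concrete, the following standalone sketch mimics the batching in plain Python. It does not use Jina; `batch` is a hypothetical helper written for illustration, and the integers stand in for Documents.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batch(items: Iterable, request_size: int) -> Iterator[List]:
    """Yield consecutive lists of at most `request_size` items (hypothetical helper)."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, request_size))
        if not chunk:
            return
        yield chunk


# 100 placeholder "documents" with request_size=10 give 10 batches of 10.
batches = list(batch(range(100), request_size=10))
print(len(batches), len(batches[0]))  # prints: 10 10

# A size that does not divide evenly leaves a smaller final batch.
sizes = [len(b) for b in batch(range(25), request_size=10)]
print(sizes)  # prints: [10, 10, 5]
```

Because the helper consumes its input lazily, it would also work with a generator, so only one batch needs to be held in memory at a time.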

Send data asynchronously

There is an async version of the Python Client which works with post() and mutate().

While the standard Client is also asynchronous under the hood, its async version exposes this fact to the outside world, by allowing coroutines as input, and returning an asynchronous iterator. This means you can iterate over Responses one by one, as they come in.

import asyncio

from jina import Client, Flow, Document


async def async_inputs():
    for _ in range(10):
        yield Document()
        await asyncio.sleep(0.1)


async def run_client(port):
    client = Client(port=port, asyncio=True)
    async for resp in client.post('/', async_inputs, request_size=1):
        print(resp)


with Flow() as f:  # Using it as a Context Manager will start the Flow
    asyncio.run(run_client(f.port))

Async send is useful when calling a Flow from an Executor, as described in Async coroutines.

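from jina import Client, Executor, requests, DocumentArray


class DummyExecutor(Executor):

    c = Client(host='grpc://0.0.0.0:51234', asyncio=True)

    @requests
    async def process(self, docs: DocumentArray, **kwargs):
        # With asyncio=True, post() returns an asynchronous iterator,
        # so it has to be iterated for the request to be sent.
        async for _ in self.c.post('/', docs):
            pass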

Send data to specific Executors

Usually a Flow will send each request to all Executors with matching endpoints as configured. But the Client also allows you to only target specific Executors in a Flow using the target_executor keyword. The request will then only be processed by the Executors which match the provided target_executor regex. Its usage is shown in the listing below.

from jina import Client, Executor, Flow, requests, Document, DocumentArray


class FooExecutor(Executor):
    @requests
    async def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'foo was here and got {len(docs)} document'))


class BarExecutor(Executor):
    @requests
    async def bar(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'bar was here and got {len(docs)} document'))


f = (
    Flow()
    .add(uses=FooExecutor, name='fooExecutor')
    .add(uses=BarExecutor, name='barExecutor')
)

with f:  # Using it as a Context Manager will start the Flow
    client = Client(port=f.port)
    docs = client.post(on='/', target_executor='bar*')
    print(docs.texts)

This will send the request to all Executors whose names start with ‘bar’, such as ‘barExecutor’. In the simplest case, you can specify a precise Executor name, and the request will be sent only to that single Executor.
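
Since target_executor is described as a regex, it may help to check a pattern against your Executor names before using it. The sketch below assumes the pattern is applied from the start of the name, as with Python's `re.match`; that matching function is an assumption rather than something stated here, and `matching` and the name `bazExecutor` are hypothetical.

```python
import re

executor_names = ['fooExecutor', 'barExecutor', 'bazExecutor']


def matching(pattern: str, names):
    """Return the names matched from their start by `pattern` (assumed re.match semantics)."""
    return [name for name in names if re.match(pattern, name)]


# As a regex, 'bar*' means "ba" followed by zero or more "r",
# so it also matches a name such as 'bazExecutor'.
print(matching('bar*', executor_names))  # prints: ['barExecutor', 'bazExecutor']

# 'bar.*' (or simply 'bar') matches only names that start with 'bar'.
print(matching('bar.*', executor_names))  # prints: ['barExecutor']

# An exact name selects a single Executor.
print(matching('barExecutor', executor_names))  # prints: ['barExecutor']
```

Under that assumption, a pattern such as 'bar.*' is the safer way to express "starts with bar" when other Executor names begin with 'ba'.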

Use Unary or Streaming gRPC

The Flow with the gRPC protocol implements both the unary and the streaming RPC lifecycle for communicating with Clients. When sending more than one request using the batching or the iterator mechanism, you can choose the RPC lifecycle used by post() with its boolean stream argument. By default, stream is True and the streaming RPC is used to send data to the Flow. If stream is set to False, the unary RPC is used instead. Both lifecycles are implemented to give Clients flexibility.

There might be performance penalties when using the streaming RPC in the Python gRPC implementation.

Hint

This option is only valid for gRPC protocol.

Refer to the gRPC Performance Best Practices guide for more implementation details and considerations.
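
The difference between the two lifecycles can be pictured with a small simulation. This is a conceptual sketch only: it involves no gRPC or Jina code, and `unary_rpc`, `streaming_rpc`, and the `calls` log are hypothetical names used to count how many calls each style makes.

```python
from typing import Iterable, Iterator, List

calls = []  # records each simulated RPC invocation


def unary_rpc(request: List[str]) -> List[str]:
    """Simulated unary RPC: one request in, one response out."""
    calls.append('unary')
    return [doc.upper() for doc in request]


def streaming_rpc(requests: Iterable[List[str]]) -> Iterator[List[str]]:
    """Simulated streaming RPC: one call consumes a stream of requests
    and yields a stream of responses."""
    calls.append('stream')
    for request in requests:
        yield [doc.upper() for doc in request]


requests = [['a', 'b'], ['c', 'd'], ['e']]

# stream=False analogue: one RPC per request.
unary_responses = [unary_rpc(r) for r in requests]

# stream=True analogue: a single RPC carries all requests.
stream_responses = list(streaming_rpc(iter(requests)))

print(calls.count('unary'), calls.count('stream'))  # prints: 3 1
print(unary_responses == stream_responses)  # prints: True
```

Both styles deliver the same responses; they differ in how many calls are opened and how long each one lives.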

Returns

post() returns a DocumentArray containing all Documents flattened over all Requests. When setting return_responses=True, this behavior is changed to returning a list of Response objects.

If a callback function is provided, client.post() will return None.

from jina import Flow, Client, Document

# Default: post() returns a DocumentArray
with Flow() as f:
    client = Client(port=f.port)
    docs = client.post(on='', inputs=Document(text='Hi there!'))
    print(docs)
    print(docs.texts)

<DocumentArray (length=1) at 140619524357664>
['Hi there!']

from jina import Flow, Client, Document

# With return_responses=True: post() returns a list of Response objects
with Flow() as f:
    client = Client(port=f.port)
    resp = client.post(on='', inputs=Document(text='Hi there!'), return_responses=True)
    print(resp)
    print(resp[0].docs.texts)

[<jina.types.request.data.DataRequest ('header', 'parameters', 'routes', 'data') at 140619524354592>]
['Hi there!']

from jina import Flow, Client, Document

# With a callback: the response is handled in on_done and post() returns None
with Flow() as f:
    client = Client(port=f.port)
    resp = client.post(
        on='',
        inputs=Document(text='Hi there!'),
        on_done=lambda resp: print(resp.docs.texts),
    )
    print(resp)

['Hi there!']
None

Callbacks vs returns

A callback operates on every sub-request generated by request_size. The callback function consumes the responses one by one, and each response is freed from memory as soon as it has been consumed.

When no callback is provided, the Client accumulates the DocumentArrays of all Requests before returning. This means you will not receive any results until all Requests have been processed, which is slower and requires more memory.
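
The trade-off can be illustrated without Jina. In this sketch, `fake_post` is a hypothetical stand-in for a client call and each inner list plays the role of one response; it only mirrors the behavior described above and is not the Client's implementation.

```python
from typing import Callable, Iterable, List, Optional


def fake_post(
    batches: Iterable[List[str]],
    on_done: Optional[Callable[[List[str]], None]] = None,
) -> Optional[List[str]]:
    """Hypothetical stand-in for a client call; each batch plays the role of one response."""
    if on_done is not None:
        for response in batches:
            on_done(response)  # handled right away; nothing is kept afterwards
        return None
    collected: List[str] = []
    for response in batches:
        collected.extend(response)  # everything is held until the last batch arrives
    return collected


batches = [['a', 'b'], ['c', 'd'], ['e']]

seen = []
result = fake_post(batches, on_done=lambda resp: seen.append(len(resp)))
print(result, seen)  # prints: None [2, 2, 1]

print(fake_post(batches))  # prints: ['a', 'b', 'c', 'd', 'e']
```

With the callback, at most one response is alive at a time; without it, the returned list grows with the total number of Documents.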

Force the order of responses

Note that the Flow processes Documents in an asynchronous and distributed manner. The order in which the Flow processes the requests may differ from the order in which the Client sent them. Hence, the order of the responses may also differ from the sending order.

To force the order of the results to be deterministic and the same as the order in which they were sent, pass results_in_order=True to post().

import random
import time
from jina import Flow, Executor, requests, Client, DocumentArray, Document


class RandomSleepExecutor(Executor):
    @requests
    def foo(self, *args, **kwargs):
        rand_sleep = random.uniform(0.1, 1.3)
        time.sleep(rand_sleep)


f = Flow().add(uses=RandomSleepExecutor, replicas=3)
input_text = [f'ordinal-{i}' for i in range(180)]
input_da = DocumentArray([Document(text=t) for t in input_text])

with f:
    c = Client(port=f.port, protocol=f.protocol)
    output_da = c.post('/', inputs=input_da, request_size=10, results_in_order=True)
    # Compare each sent Document with the result at the same position
    for input_doc, output_doc in zip(input_da, output_da):
        assert input_doc.text == output_doc.text
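
Why responses can arrive out of order, and what restoring the order means, can be seen in a small asyncio simulation. It uses only the standard library; `handle` is a hypothetical stand-in for request processing, and the sketch says nothing about how results_in_order is implemented internally.

```python
import asyncio


async def handle(index: int, delay: float) -> int:
    """Simulated request whose processing time is `delay` seconds."""
    await asyncio.sleep(delay)
    return index


async def main():
    delays = [0.3, 0.2, 0.1]  # the first request sent is the slowest

    # Collect results as they complete: arrival order follows processing time.
    tasks = [asyncio.ensure_future(handle(i, d)) for i, d in enumerate(delays)]
    unordered = [await t for t in asyncio.as_completed(tasks)]

    # gather() returns results in the order the awaitables were passed in.
    ordered = await asyncio.gather(*(handle(i, d) for i, d in enumerate(delays)))
    return unordered, list(ordered)


unordered, ordered = asyncio.run(main())
print(ordered)  # prints: [0, 1, 2]
print(sorted(unordered) == ordered)  # prints: True
```

Here `unordered` will typically come back reversed because the slowest request was sent first, while `ordered` always matches the sending order.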