Client#

The Client enables you to send Documents to a running Flow. Like the Gateway, the Client supports four networking protocols: gRPC, HTTP, WebSocket and GraphQL, each with or without TLS.

You may have observed two styles of using a Client in the docs.

The implicit style, calling the Flow object directly:

from jina import Flow

f = Flow()

with f:
    f.post('/')

The explicit style, using a standalone Client:

from jina import Client

c = Client(...)  # must match the Flow setup
c.post('/')

The implicit style is easier for debugging and local development, since you don’t need to specify the Flow’s host, port and protocol. However, it makes two strong assumptions: (1) the Flow corresponds to only one client, and (2) the Flow runs on the same machine as the Client. For these reasons, the explicit style is recommended for production use.

Hint

If you want to connect to your Flow from a programming language other than Python, please follow the third party client documentation.

Connect#

To connect to a Flow started by:

from jina import Flow

with Flow(port=1234, protocol='grpc') as f:
    f.block()
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│  ⛓      Protocol                   GRPC  │
│  🏠        Local           0.0.0.0:1234  │
│  🔒      Private     192.168.1.126:1234  │
│  🌍       Public    87.191.159.105:1234  │
╰──────────────────────────────────────────╯

The Client has to specify the following parameters to match how the Flow was set up:

  • the protocol it needs to use to communicate with the Flow

  • the host and the port as exposed by the Flow

  • whether it needs to use TLS encryption

Hint

Default port

The default port for the Client is 80, unless you are using TLS encryption, in which case it is 443.

You can define these parameters by passing a valid URI scheme as part of the host argument:

from jina import Client

Client(host='http://my.awesome.flow:1234')
Client(host='ws://my.awesome.flow:1234')
Client(host='grpc://my.awesome.flow:1234')
from jina import Client

Client(host='https://my.awesome.flow:1234')
Client(host='wss://my.awesome.flow:1234')
Client(host='grpcs://my.awesome.flow:1234')

Equivalently, you can pass each relevant parameter as a keyword argument:

from jina import Client

Client(host='my.awesome.flow', port=1234, protocol='http')
Client(host='my.awesome.flow', port=1234, protocol='websocket')
Client(host='my.awesome.flow', port=1234, protocol='grpc')
from jina import Client

Client(host='my.awesome.flow', port=1234, protocol='http', tls=True)
Client(host='my.awesome.flow', port=1234, protocol='websocket', tls=True)
Client(host='my.awesome.flow', port=1234, protocol='grpc', tls=True)

You can also use a mix of both:

from jina import Client

Client(host='https://my.awesome.flow', port=1234)
Client(host='my.awesome.flow:1234', protocol='http', tls=True)

Caution

You can’t define these parameters both by keyword argument and by host scheme - you can’t have two sources of truth. For example, the following code raises an exception:

from jina import Client

Client(host='https://my.awesome.flow:1234', port=4321)

Caution

If you instantiate a Client object using the grpc protocol, keep in mind that gRPC clients cannot be used in a multi-threaded environment (see this gRPC issue for reference). Instead, rely on asynchronous programming or multi-processing rather than multi-threading. For instance, if you’re building a web server, you can introduce multi-processing based parallelism to your app using gunicorn: gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker ...

Test readiness of the Flow#

The Client offers a convenient API to query these readiness endpoints. You can call is_flow_ready(); it returns True if the Flow is ready and False when it is not.

from jina import Flow

with Flow().add() as f:
    print(f.is_flow_ready())

print(f.is_flow_ready())
True
False
from jina import Flow

with Flow(port=12345).add() as f:
    f.block()
from jina import Client

client = Client(port=12345)
print(client.is_flow_ready())
True
from jina import Flow

with Flow(port=12345).add() as f:
    f.block()
jina ping flow grpc://localhost:12345
INFO   [email protected] ping grpc://localhost:12345 at 0 round...                                                                                              [09/08/22 12:58:13]
INFO   [email protected] ping grpc://localhost:12345 at 0 round takes 0 seconds (0.04s)
INFO   [email protected] ping grpc://localhost:12345 at 1 round...                                                                                              [09/08/22 12:58:14]
INFO   [email protected] ping grpc://localhost:12345 at 1 round takes 0 seconds (0.01s)
INFO   [email protected] ping grpc://localhost:12345 at 2 round...                                                                                              [09/08/22 12:58:15]
INFO   [email protected] ping grpc://localhost:12345 at 2 round takes 0 seconds (0.01s)
INFO   [email protected] avg. latency: 24 ms                                                                                                                    [09/08/22 12:58:16]
INFO   [email protected] ping grpc://localhost:12345 at 0 round...                                                                                              [09/08/22 12:59:00]
ERROR  [email protected] Error while getting response from grpc server <AioRpcError of RPC that terminated with:                                          [09/08/22 12:59:00]
               status = StatusCode.UNAVAILABLE
               details = "failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused"
               debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2022-09-08T12:59:00.518707+02:00", children:[UNKNOWN:failed to
       connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused {grpc_status:14,
       created_time:"2022-09-08T12:59:00.518706+02:00"}]}"
       >
WARNI… [email protected] not responding, retry (1/3) in 1s
INFO   [email protected] ping grpc://localhost:12345 at 0 round takes 0 seconds (0.01s)
INFO   [email protected] ping grpc://localhost:12345 at 1 round...                                                                                              [09/08/22 12:59:01]
ERROR  [email protected] Error while getting response from grpc server <AioRpcError of RPC that terminated with:                                          [09/08/22 12:59:01]
               status = StatusCode.UNAVAILABLE
               details = "failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused"
               debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2022-09-08T12:59:01.537293+02:00", children:[UNKNOWN:failed to
       connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused {grpc_status:14,
       created_time:"2022-09-08T12:59:01.537291+02:00"}]}"
       >
WARNI… [email protected] not responding, retry (2/3) in 1s
INFO   [email protected] ping grpc://localhost:12345 at 1 round takes 0 seconds (0.01s)
INFO   [email protected] ping grpc://localhost:12345 at 2 round...                                                                                              [09/08/22 12:59:02]
ERROR  [email protected] Error while getting response from grpc server <AioRpcError of RPC that terminated with:                                          [09/08/22 12:59:02]
               status = StatusCode.UNAVAILABLE
               details = "failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused"
               debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2022-09-08T12:59:02.557195+02:00", children:[UNKNOWN:failed to
       connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused {grpc_status:14,
       created_time:"2022-09-08T12:59:02.557193+02:00"}]}"
       >
WARNI… [email protected] not responding, retry (3/3) in 1s
INFO   [email protected] ping grpc://localhost:12345 at 2 round takes 0 seconds (0.02s)
WARNI… [email protected] message lost 100% (3/3)

Profiling the network#

Before sending any real data, you can test the connectivity and network latency by calling the profiling() method:

from jina import Client

c = Client(host='grpc://my.awesome.flow:1234')
c.profiling()
 Roundtrip  24ms  100% 
├──  Client-server network  17ms  71% 
└──  Server  7ms  29% 
    ├──  Gateway-executors network  0ms  0% 
    ├──  executor0  5ms  71% 
    └──  executor1  2ms  29% 

Send data#

After a Client has connected to a Flow, it can send requests to the Flow using its post() method. This expects as inputs the Executor endpoint that you want to target, as well as a Document or Iterable of Documents:

from jina import Client
from docarray import Document

d1 = Document(content='hello')
client = Client(...)

client.post('/endpoint', d1)
from jina import Client
from docarray import Document

d1 = Document(content='hello')
d2 = Document(content='world')
client = Client(...)

client.post('/endpoint', [d1, d2])
from jina import Client
from docarray import DocumentArray

da = DocumentArray.empty(10)
client = Client(...)

client.post('/endpoint', da)
from jina import Client
from docarray import Document

def doc_gen():
    for j in range(10):
        yield Document(content=f'hello {j}')

client = Client(...)

client.post('/endpoint', doc_gen)
from jina import Client

client = Client(...)

client.post('/endpoint')

Caution

Flow also provides a .post() method that follows the same interface as client.post(). However, once your solution is deployed remotely, the Flow interface is not present anymore. Hence, flow.post() is not recommended outside of testing or debugging use cases.

Send data asynchronously#

There is also an async version of the Python Client, which works with post() and mutate().

While the standard Client is also asynchronous under the hood, its async version exposes this fact to the outside world, by allowing coroutines as input, and returning an asynchronous iterator. This means you can iterate over Responses one by one, as they come in.

import asyncio

from jina import Client, Flow, Document


async def async_inputs():
    for _ in range(10):
        yield Document()
        await asyncio.sleep(0.1)


async def run_client(port):
    client = Client(port=port, asyncio=True)
    async for resp in client.post('/', async_inputs, request_size=1):
        print(resp)


with Flow() as f:  # Using it as a Context Manager will start the Flow
    asyncio.run(run_client(f.port))

Async send is useful when calling a Flow from an Executor, as described in Async coroutines.

from jina import Client, Executor, requests, DocumentArray


class DummyExecutor(Executor):

    c = Client(host='grpc://0.0.0.0:51234', asyncio=True)

    @requests
    async def process(self, docs: DocumentArray, **kwargs):
        # the async Client returns an async iterator, so it must be consumed
        async for _ in self.c.post('/', docs):
            pass

Send data in batches#

Especially during indexing, a Client can send thousands or even millions of Documents to a Flow. Those Documents are internally batched into Requests, providing a smaller memory footprint and, in combination with callback functions, faster response times.

The size of these batches can be controlled with the request_size keyword. The default request_size is 100 Documents. The optimal size will depend on your use case.

from jina import Flow, Client, Document, DocumentArray

with Flow() as f:
    client = Client(port=f.port)
    client.post('/', DocumentArray(Document() for _ in range(100)), request_size=10)

Send data to specific Executor#

Usually a Flow sends each request to all Executors with matching endpoints, as configured. But the Client also allows you to target only specific Executors in a Flow using the target_executor keyword. The request is then processed only by the Executors whose names match the provided target_executor regex. Its usage is shown in the listing below.

from jina import Client, Executor, Flow, requests, Document, DocumentArray


class FooExecutor(Executor):
    @requests
    async def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'foo was here and got {len(docs)} document'))


class BarExecutor(Executor):
    @requests
    async def bar(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'bar was here and got {len(docs)} document'))


f = (
    Flow()
    .add(uses=FooExecutor, name='fooExecutor')
    .add(uses=BarExecutor, name='barExecutor')
)

with f:  # Using it as a Context Manager will start the Flow
    client = Client(port=f.port)
    docs = client.post(on='/', target_executor='bar*')
    print(docs.texts)

This will send the request to all Executors whose names start with ‘bar’, such as ‘barExecutor’. In the simplest case, you can specify a precise Executor name, and the request will be sent only to that single Executor.
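As an illustrative sketch of how such a pattern selects Executors (an assumption about the matching logic, not Jina’s actual code), you can think of target_executor as a Python regular expression applied to the start of each Executor name:

```python
import re

# Hypothetical sketch: treat target_executor as a regex matched against the
# beginning of each Executor name, as described above.
executors = ['fooExecutor', 'barExecutor']
target_executor = 'bar*'

selected = [name for name in executors if re.match(target_executor, name)]
print(selected)  # only 'barExecutor' matches
```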

Send parameters#

The Client can also send parameters to the Executors as shown below:

from jina import Client, Executor, Flow, requests, Document

class MyExecutor(Executor):

    @requests
    def foo(self, parameters, **kwargs):
        print(parameters['hello'])

f = Flow().add(uses=MyExecutor)

with f:
    client = Client(port=f.port)
    client.post('/', Document(), parameters={'hello': 'world'})

Hint

You can send a parameters-only data request via:

with f:
    client = Client(port=f.port)
    client.post('/', parameters={'hello': 'world'})

This might be useful to control Executor objects during their lifetime.

Send parameters to specific Executors#

You can send parameters to a specific Executor by using the executorname__paramname syntax: the Executor named executorname receives the parameter paramname (with the executorname__ prefix stripped from the key name), and none of the other Executors receive it.

For instance in the following Flow:

from jina import Flow, DocumentArray, Client

with Flow().add(name='exec1').add(name='exec2') as f:

    client = Client(port=f.port)

    client.post(
        '/index',
        DocumentArray.empty(size=5),
        parameters={'exec1__traversal_path': '@r', 'exec2__traversal_path': '@c'},
    )

The Executor exec1 will receive {'traversal_path':'@r'} as parameters, whereas exec2 will receive {'traversal_path':'@c'} as parameters.
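The prefix convention can be sketched in plain Python (an illustrative assumption about the behavior, not Jina’s implementation; the params_for helper is hypothetical):

```python
# Hypothetical helper: given the full parameters dict, extract what a single
# Executor would see after its 'executorname__' prefix is stripped.
params = {'exec1__traversal_path': '@r', 'exec2__traversal_path': '@c'}

def params_for(executor_name: str, parameters: dict) -> dict:
    prefix = executor_name + '__'
    return {k[len(prefix):]: v for k, v in parameters.items() if k.startswith(prefix)}

print(params_for('exec1', params))  # {'traversal_path': '@r'}
```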

This feature is intended for the case where there are multiple Executors that take the same parameter names, but you want to use different values for each Executor. This is often the case for Executors from the Hub, since they tend to share a common interface for parameters.

Difference to target_executor

Why do we need this feature if we already have target_executor?

On the surface, both are about sending information to a part of the Flow, i.e. a subset of Executors. However, they work differently under the hood. target_executor sends the request directly to the specified Executors, ignoring the topology of the Flow, whereas an executor__parameter request follows the topology of the Flow and only delivers each parameter to the Executor it names.

Think of roll call versus passing notes in a classroom: target_executor is like calling on a student directly, whereas executor__parameter is like passing a stack of notes from student to student, each one picking out the note with their own name on it.

Callbacks#

After performing post(), you may want to further process the obtained results.

For this purpose, Jina implements a promise-like interface, letting you specify three kinds of callback functions:

  • on_done is executed while streaming, after successful completion of each request

  • on_error is executed while streaming, whenever an error occurs in each request

  • on_always is always performed while streaming, no matter the success or failure of each request

Note that these callbacks only work for requests (and failures) inside the stream, for example inside an Executor. If the failure is due to an error happening outside of streaming, then these callbacks will not be triggered. For example, a SIGKILL from the client OS during the handling of the request, or a networking issue, will not trigger the callback.

Callback functions in Jina expect a Response of the type DataRequest, which contains resulting Documents, parameters, and other information.

Handle DataRequest in callbacks#

DataRequests are objects that are sent by Jina internally. Callback functions process DataRequests, and client.post() can return DataRequests.

A DataRequest object can be seen as a container for the data relevant to a given request; it contains the following fields:

The request header.

from pprint import pprint

from jina import Client

Client().post(on='/', on_done=lambda x: pprint(x.header))
request_id: "ea504823e9de415d890a85d1d00ccbe9"
exec_endpoint: "/"
target_executor: ""

The input parameters of the associated request. In particular, DataRequest.parameters['__results__'] is a reserved field that is populated by Executors returning a Python dict. Information in those returned dicts is collected here, keyed by Executor ID.

from pprint import pprint

from jina import Client

Client().post(on='/', on_done=lambda x: pprint(x.parameters))
{'__results__': {}}

The routing information of the data request. It records which Executors have been called and the order in which they were called, along with the timing and latency of each Executor.

from pprint import pprint

from jina import Client

Client().post(on='/', on_done=lambda x: pprint(x.routes))
[executor: "gateway"
start_time {
  seconds: 1662637747
  nanos: 790248000
}
end_time {
  seconds: 1662637747
  nanos: 794104000
}
, executor: "executor0"
start_time {
  seconds: 1662637747
  nanos: 790466000
}
end_time {
  seconds: 1662637747
  nanos: 793982000
}
]

The DocumentArray being passed between and returned by the Executors. These are the Documents usually processed in a callback function, and are often the main payload.

from pprint import pprint

from jina import Client

Client().post(on='/', on_done=lambda x: pprint(x.docs))
<DocumentArray (length=0) at 5044245248>

Accordingly, a callback that processes Documents can be defined as:

from jina.types.request.data import DataRequest

def my_callback(resp: DataRequest):
    foo(resp.docs)

Handle exceptions in callbacks#

Server errors can be caught by the Client’s on_error callback function. You can get the error message and traceback from header.status:

from pprint import pprint

from jina import Flow, Client, Executor, requests


class MyExec1(Executor):
    @requests
    def foo(self, **kwargs):
        raise NotImplementedError


with Flow(port=12345).add(uses=MyExec1) as f:
    c = Client(port=f.port)
    c.post(on='/', on_error=lambda x: pprint(x.header.status))
code: ERROR
description: "NotImplementedError()"
exception {
  name: "NotImplementedError"
  stacks: "Traceback (most recent call last):\n"
  stacks: "  File \"/Users/hanxiao/Documents/jina/jina/serve/runtimes/worker/__init__.py\", line 181, in process_data\n    result = await self._data_request_handler.handle(requests=requests)\n"
  stacks: "  File \"/Users/hanxiao/Documents/jina/jina/serve/runtimes/request_handlers/data_request_handler.py\", line 152, in handle\n    return_data = await self._executor.__acall__(\n"
  stacks: "  File \"/Users/hanxiao/Documents/jina/jina/serve/executors/__init__.py\", line 301, in __acall__\n    return await self.__acall_endpoint__(__default_endpoint__, **kwargs)\n"
  stacks: "  File \"/Users/hanxiao/Documents/jina/jina/serve/executors/__init__.py\", line 322, in __acall_endpoint__\n    return func(self, **kwargs)\n"
  stacks: "  File \"/Users/hanxiao/Documents/jina/jina/serve/executors/decorators.py\", line 213, in arg_wrapper\n    return fn(executor_instance, *args, **kwargs)\n"
  stacks: "  File \"/Users/hanxiao/Documents/jina/toy44.py\", line 10, in foo\n    raise NotImplementedError\n"
  stacks: "NotImplementedError\n"
  executor: "MyExec1"
}

In the example below, our Flow passes the message then prints the result when successful. If something goes wrong, it beeps. Finally, the result is written to output.txt.

from jina import Flow, Client, Document


def beep(*args):
    # make a beep sound
    import sys

    sys.stdout.write('\a')


with Flow().add() as f, open('output.txt', 'w') as fp:
    client = Client(port=f.port)
    client.post(
        '/',
        Document(),
        on_done=print,
        on_error=beep,
        on_always=lambda x: x.docs.save(fp),
    )

What errors can be handled by the callback?

Callbacks can handle errors that are caused by Executors raising an Exception.

A callback will not receive exceptions:

  • from the Gateway having connectivity errors with the Executors.

  • between the Client and the Gateway.

Continue streaming when an error occurs#

client.post() accepts a continue_on_error parameter. When set to True, the Client keeps trying to send the remaining requests. The continue_on_error parameter applies only to Exceptions raised by an Executor; in the case of network connectivity issues, an Exception is still raised.

Transient fault handling with retries#

client.post() accepts max_attempts, initial_backoff, max_backoff and backoff_multiplier parameters to control its capacity to retry requests using an exponential backoff strategy. This can help to overcome transient network connectivity issues.

The max_attempts parameter determines the number of sending attempts, including the original request. The initial_backoff, max_backoff, and backoff_multiplier parameters determine the randomized delay in seconds before retry attempts.

The initial retry attempt will occur at random(0, initial_backoff). In general, the n-th attempt will occur at random(0, min(initial_backoff*backoff_multiplier**(n-1), max_backoff)).
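To make that schedule concrete, here is a small stdlib sketch of the formula (illustrative only; retry_delay is a hypothetical helper mirroring the parameter names above, and Jina computes this internally):

```python
import random

def retry_delay(n: int, initial_backoff: float = 0.5,
                backoff_multiplier: float = 1.5, max_backoff: float = 5.0) -> float:
    # Randomized delay before the n-th retry attempt:
    # random(0, min(initial_backoff * backoff_multiplier**(n - 1), max_backoff))
    upper = min(initial_backoff * backoff_multiplier ** (n - 1), max_backoff)
    return random.uniform(0, upper)
```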

Returns#

client.post() returns a flattened DocumentArray containing all Documents of all Requests.

By passing return_responses=True to client.post(), this behavior can be modified to return a list of Responses (DataRequests) instead.

If a callback is provided, no results will be returned.

Caution

Not using a callback function and instead returning results can come with a serious performance penalty.

Callbacks operate on each individual Request, which represents a batch of the data. In contrast, returning results requires the accumulation of all results of all Requests. This means that you will not receive results until all Requests have been processed. This may not only be slower, but also require more memory.

from jina import Flow, Client, Document

with Flow() as f:
    client = Client(port=f.port)
    docs = client.post(on='/', inputs=Document(text='Hi there!'))
    print(docs)
    print(docs.texts)
<DocumentArray (length=1) at 140619524357664>
['Hi there!']
from jina import Flow, Client, Document

with Flow() as f:
    client = Client(port=f.port)
    resp = client.post(on='/', inputs=Document(text='Hi there!'), return_responses=True)
    print(resp)
    print(resp[0].docs.texts)
[<jina.types.request.data.DataRequest ('header', 'parameters', 'routes', 'data') at 140619524354592>]
['Hi there!']
from jina import Flow, Client, Document

with Flow() as f:
    client = Client(port=f.port)
    resp = client.post(
        on='/',
        inputs=Document(text='Hi there!'),
        on_done=lambda resp: print(resp.docs.texts),
    )
    print(resp)
['Hi there!']
None

Enable compression#

If the communication with the Gateway is via gRPC, you can pass the compression parameter to post() to benefit from gRPC compression methods.

The supported choices are: None, gzip and deflate.

from jina import Client

client = Client()
client.post(..., compression='Gzip')

Note that this setting only affects the communication between the Client and the Flow’s Gateway.

One can also specify the compression of the internal communication as described here.

Enable TLS#

To connect to a Flow that has been configured to use TLS in combination with gRPC, HTTP, or WebSocket, set the Client’s tls parameter to True:

from jina import Client

c_http = Client(protocol='http', tls=True, host=..., port=...)
c_ws = Client(protocol='websocket', tls=True, host=..., port=...)
c_grpc = Client(protocol='grpc', tls=True, host=..., port=...)

The same can be achieved by passing a valid URI to the host parameter, and appending ‘s’ to the protocol definition:

from jina import Client

Client(host='https://my.awesome.flow:1234')
Client(host='wss://my.awesome.flow:1234')
Client(host='grpcs://my.awesome.flow:1234')

Use GraphQL#

The Jina Client additionally supports fetching data via GraphQL mutations using mutate():

from jina import Client

PORT = ...
c = Client(port=PORT)
mut = '''
        mutation {
            docs(data: {text: "abcd"}) { 
                id
                matches {
                    embedding
                }
            } 
        }
    '''
response = c.mutate(mutation=mut)

For details on the allowed mutation arguments and response fields, see here.