Python Client
The most convenient way to work with the Flow API is the Python Client. It enables you to send Documents to a running Flow in a number of different ways, as shown below.
HTTP, gRPC, and WebSocket
Jina Flows and Clients support three different networking protocols: HTTP, gRPC, and WebSocket. For each of them, you first connect your Client to the Flow before you can send requests to it.
Connect Client to a Flow
If there is not already a Flow running in the background or on the network, you can start one:
from jina import Flow
PORT = 1234
PROTOCOL = 'grpc'  # one of 'grpc', 'http', 'websocket'
with Flow(port=PORT, protocol=PROTOCOL) as f:
    f.block()  # keep the Flow running until interrupted
To connect to the Flow, the Client has to specify the following parameters. All of these have to match the Flow and how it was set up:
- the protocol it needs to use to communicate with the Flow
- the host and the port as exposed by the Flow
- whether it needs to use TLS encryption
You can define these parameters by passing a valid URI scheme as part of the host argument:
from jina import Client
# without TLS
Client(host='http://my.awesome.flow:1234')
Client(host='ws://my.awesome.flow:1234')
Client(host='grpc://my.awesome.flow:1234')
# with TLS
Client(host='https://my.awesome.flow:1234')
Client(host='wss://my.awesome.flow:1234')
Client(host='grpcs://my.awesome.flow:1234')
Equivalently, you can pass each relevant parameter as a keyword argument:
Client(host='my.awesome.flow', port=1234, protocol='http')
Client(host='my.awesome.flow', port=1234, protocol='websocket')
Client(host='my.awesome.flow', port=1234, protocol='grpc')
Client(host='my.awesome.flow', port=1234, protocol='http', tls=True)
Client(host='my.awesome.flow', port=1234, protocol='websocket', tls=True)
Client(host='my.awesome.flow', port=1234, protocol='grpc', tls=True)
You can also use a mix of both:
Client(host='https://my.awesome.flow', port=1234)
Client(host='my.awesome.flow:1234', protocol='http', tls=True)
Caution
You can’t define the same parameter both by keyword argument and by host scheme; there can’t be two sources of truth. For example, the following code raises an exception, because the port is given both in the URI and as a keyword argument:
Client(host='https://my.awesome.flow:1234', port=4321)
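The rules above can be modelled in a few lines of plain Python. The sketch below is a hypothetical illustration (`resolve_connection` and `SCHEMES` are made-up names, not part of Jina's API): it maps a URI scheme to a protocol and TLS flag, takes the port from the URI when present, falls back to gRPC without TLS, and refuses any parameter that is supplied both ways. Jina's actual validation may differ in detail.

```python
# Hypothetical model of the connection rules; not Jina's implementation.
SCHEMES = {
    'http': ('http', False),
    'https': ('http', True),
    'ws': ('websocket', False),
    'wss': ('websocket', True),
    'grpc': ('grpc', False),
    'grpcs': ('grpc', True),
}


def resolve_connection(host='0.0.0.0', port=None, protocol=None, tls=None):
    """Merge a host URI and keyword arguments into one connection spec."""
    from_uri = {}
    if '://' in host:
        scheme, host = host.split('://', 1)
        if scheme not in SCHEMES:
            raise ValueError(f'unsupported scheme: {scheme!r}')
        from_uri['protocol'], from_uri['tls'] = SCHEMES[scheme]
    if ':' in host:
        host, port_text = host.rsplit(':', 1)
        from_uri['port'] = int(port_text)

    explicit = {
        key: value
        for key, value in {'port': port, 'protocol': protocol, 'tls': tls}.items()
        if value is not None
    }
    # A parameter may come from the URI or from a keyword, never from both.
    clashes = from_uri.keys() & explicit.keys()
    if clashes:
        raise ValueError(f'defined twice: {sorted(clashes)}')

    # Defaults: gRPC without TLS; later entries override earlier ones.
    return {'host': host, 'protocol': 'grpc', 'tls': False, 'port': None,
            **from_uri, **explicit}


print(resolve_connection('https://my.awesome.flow', port=1234))
# -> {'host': 'my.awesome.flow', 'protocol': 'http', 'tls': True, 'port': 1234}
print(resolve_connection('my.awesome.flow:1234', protocol='http', tls=True))
# -> {'host': 'my.awesome.flow', 'protocol': 'http', 'tls': True, 'port': 1234}
```

Treating any doubly specified parameter as an error, even when both values agree, keeps the model simple and mirrors the "one source of truth" rule.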
Hint
The arguments above have useful defaults: protocol='grpc' and host='0.0.0.0'. These defaults are particularly convenient when debugging or accessing a Flow on your local machine.
To connect to a local Flow f, it is therefore often enough to do the following:
c = Client(port=f.port)
Send requests to the Flow
After a Client has connected to a Flow, it can send requests to the Flow using its .post() method. This expects as inputs the Executor endpoint that you want to target, as well as a Document or Iterable of Documents:
from docarray import Document, DocumentArray
from jina import Client
d1 = Document(content='hello')
d2 = Document(content='world')
def doc_gen():
    for j in range(10):
        yield Document(content=f'hello {j}')
client = Client(port=PORT)  # PORT of the running Flow, as defined above
client.post('/endpoint', d1)  # Single Document
client.post('/endpoint', [d1, d2])  # List of Documents
client.post('/endpoint', doc_gen)  # Document generator
client.post('/endpoint', DocumentArray([d1, d2]))  # DocumentArray
client.post('/endpoint')  # Empty
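To picture why so many input shapes are accepted, it can help to imagine them being normalised into a single stream of Documents. This is a stdlib-only sketch under stated assumptions: `Doc` is a hypothetical stand-in for docarray's `Document`, and `iter_inputs` is an illustrative helper, not how Jina's Client is actually implemented.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    """Hypothetical stand-in for docarray's Document."""
    content: str


def iter_inputs(inputs):
    """Turn any of the accepted input shapes into one iterator of documents."""
    if inputs is None:  # client.post('/endpoint') with no inputs
        return iter(())
    if callable(inputs):  # a generator function such as doc_gen
        inputs = inputs()
    if isinstance(inputs, Doc):  # a single document
        return iter((inputs,))
    return iter(inputs)  # lists, DocumentArray-like containers, generators


def doc_gen():
    for j in range(3):
        yield Doc(f'hello {j}')


print([doc.content for doc in iter_inputs(doc_gen)])  # ['hello 0', 'hello 1', 'hello 2']
```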
Caution
Flow also provides a .post() method that follows the same interface as client.post(). However, once your solution is deployed remotely, the Flow interface is not present anymore. Hence, flow.post() is not recommended outside of testing or debugging use cases.
Batching Requests
Especially during indexing, a Client can send thousands or even millions of Documents to a Flow. Those Documents are internally batched into Requests, which gives a smaller memory footprint and, thanks to callback functions, faster response times. The size of these batches can be controlled with the request_size keyword. The default request_size is 100 Documents; the optimal size depends on your use case.
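Conceptually, request_size splits the input stream into consecutive chunks, each of which is sent as one Request. A minimal sketch of that chunking with `itertools.islice` (`batch` is a hypothetical helper, not a Jina function):

```python
from itertools import islice


def batch(items, request_size=100):
    """Yield consecutive lists of at most request_size items."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, request_size))
        if not chunk:
            return
        yield chunk


# 100 items with request_size=10 -> ten chunks of 10
print([len(chunk) for chunk in batch(range(100), request_size=10)])
```

Because the input is consumed lazily, only one chunk needs to be held in memory at a time, which is where the smaller footprint comes from.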
from docarray import Document, DocumentArray
from jina import Flow, Client
with Flow() as f:
    client = Client(port=f.port)
    # 100 Documents are sent as 10 Requests of 10 Documents each
    client.post('/', DocumentArray(Document() for _ in range(100)), request_size=10)
Targeting a specific Executor
Usually a Flow sends each request to all Executors with matching endpoints, as configured. But the Client also allows you to target only specific Executors in a Flow using the target_executor keyword. The request is then processed only by the Executors whose names match the provided target_executor regex. Its usage is shown in the listing below.
from docarray import Document, DocumentArray
from jina import Client, Executor, Flow, requests
class FooExecutor(Executor):
    @requests
    def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'foo was here and got {len(docs)} document'))
class BarExecutor(Executor):
    @requests
    def bar(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'bar was here and got {len(docs)} document'))
f = (
    Flow()
    .add(uses=FooExecutor, name='fooExecutor')
    .add(uses=BarExecutor, name='barExecutor')
)
with f:  # Using it as a Context Manager will start the Flow
    client = Client(port=f.port)
    docs = client.post(on='/', target_executor='bar*')
    print(docs.texts)
This sends the request to all Executors whose names match the regex ‘bar*’, such as ‘barExecutor’. In the simplest case, you can specify a precise Executor name, and the request will be sent only to that single Executor.
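Because target_executor is a regular expression rather than a shell-style wildcard, ‘bar*’ means ‘ba’ followed by zero or more ‘r’ characters. The sketch below assumes the pattern is matched from the start of the Executor name, as Python's `re.match` does; whether Jina anchors the pattern exactly this way is an assumption, and `select_executors` and `bazExecutor` are hypothetical.

```python
import re


def select_executors(names, target_executor):
    """Keep the names that the pattern matches from their first character."""
    return [name for name in names if re.match(target_executor, name)]


names = ['fooExecutor', 'barExecutor', 'bazExecutor']
print(select_executors(names, 'bar*'))         # ['barExecutor', 'bazExecutor']
print(select_executors(names, 'bar.*'))        # ['barExecutor']
print(select_executors(names, 'barExecutor'))  # ['barExecutor']
```

Under this assumption, ‘bar.*’ is the stricter way to say "starts with bar".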
Request parameters
The Client can also send parameters to the Executors as shown below:
from docarray import Document
from jina import Client, Executor, Flow, requests
class MyExecutor(Executor):
    @requests
    def foo(self, parameters, **kwargs):
        print(parameters['hello'])
f = Flow().add(uses=MyExecutor)
with f:
    client = Client(port=f.port)
    client.post('/', Document(), parameters={'hello': 'world'})
Hint
You can send a parameters-only data request via:
with f:
    client = Client(port=f.port)
    client.post('/', parameters={'hello': 'world'})
This might be useful to control Executor objects during their lifetime.
Processing results using callback functions
After performing client.post(), you may want to further process the obtained results. For this purpose, Jina implements a promise-like interface, letting you specify three kinds of callback functions:
- on_done is executed after successful completion of client.post()
- on_error is executed whenever an error occurs in client.post()
- on_always is always performed, no matter the success or failure of client.post()
Hint
Neither the on_done nor the on_always callback is triggered if the failure is due to an error happening outside of networking or internal Jina issues. For example, if the OS sends a SIGKILL while the request is being handled, none of the callbacks are executed.
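The promise-like semantics of the three callbacks correspond roughly to Python's try/except/else/finally. This sketch is only an analogy (`run_with_callbacks` is a hypothetical helper, not Jina's implementation). It also hints at why nothing runs when a process is killed outright: a SIGKILL ends the process before any cleanup code, including a finally clause, can execute.

```python
def run_with_callbacks(task, on_done=None, on_error=None, on_always=None):
    """Run task() and dispatch its outcome to promise-style callbacks."""
    response = None
    try:
        response = task()
    except Exception as exc:
        if on_error is not None:
            on_error(response, exc)  # error callback also receives the exception
    else:
        if on_done is not None:
            on_done(response)
    finally:
        # runs after success or a handled error, but not if the process is killed
        if on_always is not None:
            on_always(response)


def fail():
    raise RuntimeError('boom')


events = []
run_with_callbacks(lambda: 'ok',
                   on_done=lambda r: events.append(('done', r)),
                   on_always=lambda r: events.append(('always', r)))
run_with_callbacks(fail,
                   on_error=lambda r, e: events.append(('error', str(e))),
                   on_always=lambda r: events.append(('always', r)))
print(events)
# [('done', 'ok'), ('always', 'ok'), ('error', 'boom'), ('always', None)]
```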
Callback functions in Jina expect a Response of the type jina.types.request.data.DataRequest, which contains resulting Documents, parameters, and other information.
Hint
Understanding DataRequest
DataRequests are objects that are sent by Jina internally. Callback functions process DataRequests, and client.post() can return DataRequests.
DataRequest objects can be seen as a container for data relevant for a given request, most importantly:
- dr.docs: The DocumentArray being passed between and returned by the Executors. These are the Documents usually processed in a callback function, and are often the main payload.
- dr.parameters: The input parameters of the associated request.
- dr.parameters['__results__']: Reserved field that gets populated by Executors returning a Python dict. Information in those returned dicts gets collected here, behind each Executor’s pod_id.
- dr.data: Contains information associated with the data in the request. Most importantly, dr.data.docs refers to the same object as dr.docs.
Accordingly, a callback function can be defined in the following way:
from jina.types.request.data import DataRequest
def my_callback(resp: DataRequest):
    ...  # process request here
from jina.types.request.data import DataRequest
def my_callback(resp: DataRequest):
    docs = resp.docs
    ...  # process docs here
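A callback like this can be unit-tested without starting a Flow by passing it a lightweight stand-in that exposes the same attributes. The sketch assumes the callback only touches .docs, .data.docs and .parameters; `make_fake_response` is hypothetical and uses `types.SimpleNamespace` instead of a real DataRequest.

```python
from types import SimpleNamespace


def make_fake_response(texts, parameters=None):
    """Build a hypothetical stand-in exposing .docs, .data.docs and .parameters."""
    docs = [SimpleNamespace(text=text) for text in texts]
    return SimpleNamespace(
        docs=docs,
        data=SimpleNamespace(docs=docs),  # same object, as with dr.data.docs
        parameters=parameters or {},
    )


collected = []


def my_callback(resp):
    docs = resp.docs
    collected.extend(doc.text for doc in docs)  # process docs here


fake = make_fake_response(['hello', 'world'])
my_callback(fake)
print(collected)  # ['hello', 'world']
```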
In the example below, our Flow passes the message, then prints the result when successful. If something goes wrong, it beeps. Finally, the result is written to output.txt.
from jina import Flow, Client
from docarray import Document
def beep(*args):
    # make a beep sound
    import sys
    sys.stdout.write('\a')
# DocumentArray.save() writes the binary format by default, so open the file in binary mode
with Flow().add() as f, open('output.txt', 'wb') as fp:
    client = Client(port=f.port)
    client.post(
        '/',
        Document(),
        on_done=print,
        on_error=beep,
        on_always=lambda x: x.docs.save(fp),
    )
On failure callback
Additionally, the on_error callback is triggered when an exception is raised. The callback must take an optional exception parameter as an argument:
def on_error(resp, exception: Exception):
    ...
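For instance, an error callback that accepts the exception as an optional argument could record each failure for later inspection. This is a hypothetical sketch: `failures` is a made-up module-level list, and plain strings stand in for the DataRequest objects Jina would pass.

```python
from typing import Optional

failures = []


def on_error(resp, exception: Optional[Exception] = None):
    """Record what failed so it can be inspected or retried later."""
    failures.append((resp, repr(exception) if exception is not None else None))


on_error('request-1', ConnectionError('Flow unreachable'))
on_error('request-2')  # the exception argument is optional
print(failures)
# [('request-1', "ConnectionError('Flow unreachable')"), ('request-2', None)]
```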
Returning results from .post()
If no callback is provided, client.post() returns a flattened DocumentArray containing all Documents of all Requests.
By passing return_responses=True to client.post(), you can change this behavior so that it returns a list of Responses (DataRequests) instead.
If a callback is provided, no results are returned.
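Conceptually, the default return value is the per-Request Documents concatenated in arrival order, which is why it is only available once every Request has finished. A sketch of that flattening, with hypothetical `SimpleNamespace` objects standing in for DataRequests and strings standing in for Documents:

```python
from itertools import chain
from types import SimpleNamespace


def flatten_responses(responses):
    """Concatenate the documents of all responses, in the order they arrived."""
    return list(chain.from_iterable(resp.docs for resp in responses))


# Two fake responses standing in for the DataRequests of two batches
responses = [
    SimpleNamespace(docs=['doc-0', 'doc-1']),
    SimpleNamespace(docs=['doc-2']),
]
print(flatten_responses(responses))  # ['doc-0', 'doc-1', 'doc-2']
```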
Caution
Not using a callback function and instead returning results can come with a serious performance penalty.
Callbacks operate on each individual Request, which represents a batch of the data. In contrast, returning results requires the accumulation of all results of all Requests. This means that you will not receive results until all Requests have been processed. This may not only be slower, but also require more memory.
from jina import Flow, Client
from docarray import Document
with Flow() as f:
    client = Client(port=f.port)
    docs = client.post(on='', inputs=Document(text='Hi there!'))
    print(docs)
    print(docs.texts)
>>> <DocumentArray (length=1) at 140619524357664>
>>> ['Hi there!']
from jina import Flow, Client
from docarray import Document
with Flow() as f:
    client = Client(port=f.port)
    resp = client.post(on='', inputs=Document(text='Hi there!'), return_responses=True)
    print(resp)
    print(resp[0].docs.texts)
>>> [<jina.types.request.data.DataRequest ('header', 'parameters', 'routes', 'data') at 140619524354592>]
>>> ['Hi there!']
from jina import Flow, Client
from docarray import Document
with Flow() as f:
    client = Client(port=f.port)
    resp = client.post(
        on='',
        inputs=Document(text='Hi there!'),
        on_done=lambda resp: print(resp.docs.texts),
    )
    print(resp)
>>> ['Hi there!']
>>> None
Custom gRPC compression for the gRPC Client
If the communication to the Flow needs to be done via gRPC, you can pass the compression parameter to client.post to benefit from gRPC compression (https://grpc.github.io/grpc/python/grpc.html#compression) methods. The supported methods are NoCompression, Gzip and Deflate.
from jina import Client
client = Client()
client.post(..., compression='Gzip')
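To get a feel for what the Gzip and Deflate options trade off, the stdlib sketch below compresses a made-up, repetitive payload with the same underlying algorithms via `gzip` and `zlib`. It only illustrates the algorithms; it is not how gRPC applies compression on the wire, and real savings depend on your data.

```python
import gzip
import zlib

# Repetitive bytes, loosely resembling many similar serialized Documents
payload = b'{"text": "hello world"}' * 1000

sizes = {
    'NoCompression': len(payload),
    'Gzip': len(gzip.compress(payload)),
    'Deflate': len(zlib.compress(payload)),  # zlib-wrapped deflate stream
}
print(sizes)
```

Compression spends CPU on both ends to save bandwidth, so it tends to pay off for large, redundant payloads over slow links rather than for small requests on a local network.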
TLS support
To connect to a Flow that has been configured to use TLS in combination with gRPC, HTTP, or WebSocket, set the Client’s tls parameter to True:
c_http = Client(protocol='http', tls=True, host=..., port=...)
c_ws = Client(protocol='websocket', tls=True, host=..., port=...)
c_grpc = Client(protocol='grpc', tls=True, host=..., port=...)
The same can be achieved by passing a valid URI to the host parameter and appending ‘s’ to the protocol definition:
Client(host='https://my.awesome.flow:1234')
Client(host='wss://my.awesome.flow:1234')
Client(host='grpcs://my.awesome.flow:1234')
GraphQL
The Jina Client additionally supports fetching data via GraphQL mutations using client.mutate():
from jina import Client
PORT = ...
c = Client(port=PORT)
mut = '''
mutation {
docs(data: {text: "abcd"}) {
id
matches {
embedding
}
}
}
'''
response = c.mutate(mutation=mut)
For details on the allowed mutation arguments and response fields, see here.
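If the mutation text is built from user input, the string literal has to be escaped. Below is a hypothetical helper (`build_docs_mutation` is not part of Jina) that reuses `json.dumps` for the quoting, assuming ordinary text for which JSON and GraphQL string escapes coincide.

```python
import json


def build_docs_mutation(text, fields=('id',)):
    """Build a docs mutation like the one above for arbitrary input text."""
    # json.dumps adds the surrounding quotes and escapes quotes and backslashes
    selection = '\n    '.join(fields)
    return (
        'mutation {\n'
        f'  docs(data: {{text: {json.dumps(text)}}}) {{\n'
        f'    {selection}\n'
        '  }\n'
        '}'
    )


print(build_docs_mutation('abcd', fields=('id', 'matches { embedding }')))
```

The result can be passed as `c.mutate(mutation=...)` in place of the hand-written string.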
Async Python Client
There also exists an async version of the Python Client, which works with .post() and .mutate(). While the standard Client is also asynchronous under the hood, its async version exposes this fact to the outside world by accepting asynchronous inputs (such as async generators) and returning an asynchronous iterator. This means you can iterate over Responses one by one, as they come in.
import asyncio
from docarray import Document
from jina import Client, Flow
async def async_inputs():
    for _ in range(10):
        yield Document()
        await asyncio.sleep(0.1)
async def run_client(port):
    client = Client(port=port, asyncio=True)
    async for resp in client.post('/', async_inputs, request_size=1):
        print(resp)
with Flow() as f:  # Using it as a Context Manager will start the Flow
    asyncio.run(run_client(f.port))
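The benefit of the async iterator is that each response can be handled while later inputs are still being produced. The following stdlib-only sketch mimics that flow with a hypothetical `fake_post` async generator standing in for the async `client.post()`; it groups inputs by request_size and yields each group as soon as it is complete, with strings standing in for Documents.

```python
import asyncio


async def fake_post(inputs, request_size=1):
    """Hypothetical stand-in for an async client.post(): yields one
    'response' per batch as soon as that batch is complete."""
    batch = []
    async for doc in inputs:
        batch.append(doc)
        if len(batch) == request_size:
            await asyncio.sleep(0)  # pretend to wait for the Flow
            yield list(batch)
            batch.clear()
    if batch:  # leftover partial batch
        yield list(batch)


async def async_inputs():
    for i in range(5):
        yield f'doc-{i}'
        await asyncio.sleep(0.01)


async def main():
    received = []
    async for resp in fake_post(async_inputs(), request_size=2):
        received.append(resp)  # handled as soon as it arrives
    return received


print(asyncio.run(main()))  # [['doc-0', 'doc-1'], ['doc-2', 'doc-3'], ['doc-4']]
```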