Client API
The most convenient way to work with the Flow API is the Python Client. It lets you send Documents to a running Flow in a number of different ways, as shown below.
Caution
Flow provides a .post() method that follows the same interface as client.post(). However, once your solution is deployed in the cloud, the Flow interface is no longer available. Hence, flow.post() is not recommended outside of testing or debugging.
HTTP, gRPC, and WebSocket
Jina Flows and Clients support three different networking protocols: HTTP, gRPC, and WebSocket. All of them are used the same way, through client.post().
Starting the Flow:
from jina import Flow
port = 12345
with Flow(port=port) as f:
    f.block()
Using the Client:
from docarray import Document, DocumentArray
from jina import Client
PORT = 12345
d1 = Document(content='hello')
d2 = Document(content='world')
def doc_gen():
    for j in range(10):
        yield Document(content=f'hello {j}')
client = Client(port=PORT)
client.post('/endpoint', d1)  # Single Document
client.post('/endpoint', [d1, d2])  # List of Documents
client.post('/endpoint', doc_gen)  # Document generator
client.post('/endpoint', DocumentArray([d1, d2]))  # DocumentArray
client.post('/endpoint')  # Empty
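The listing above passes several input shapes to client.post(). The following sketch is a rough mental model of how these shapes can be reduced to a single stream of Documents. It is an assumption for illustration only, not Jina's actual input handling. The helper iter_inputs and the stand-in Doc class, which replaces docarray's Document, are both hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Union


@dataclass
class Doc:
    """Hypothetical stand-in for docarray's Document."""
    content: str


# The input shapes used in the listing above (hypothetical type alias)
InputType = Union[None, Doc, Iterable[Doc], Callable[[], Iterable[Doc]]]


def iter_inputs(inputs: InputType) -> Iterator[Doc]:
    """Normalize every supported input shape into an iterator of Docs."""
    if inputs is None:
        return iter(())  # empty request: no Documents at all
    if isinstance(inputs, Doc):
        return iter([inputs])  # a single Document
    if callable(inputs):
        inputs = inputs()  # a generator function is called to obtain the generator
    return iter(inputs)  # lists, DocumentArray-like containers, generators


def doc_gen():
    for j in range(3):
        yield Doc(content=f'hello {j}')


d1, d2 = Doc('hello'), Doc('world')
for inputs in (d1, [d1, d2], doc_gen, None):
    print([d.content for d in iter_inputs(inputs)])
```

Treating a callable as "call it first" is what lets a generator function such as doc_gen be passed without parentheses.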
Batching Requests
Especially during indexing, a Client may send thousands or even millions of Documents to a Flow. These Documents are internally batched into Requests. Combined with callback functions, this gives a smaller memory footprint and faster response times. The batch size is controlled with the request_size keyword. The default request_size is 100 Documents, and the optimal size depends on your use case.
from docarray import Document, DocumentArray
from jina import Flow, Client
with Flow() as f:
    client = Client(port=f.port)
    client.post('/', DocumentArray(Document() for _ in range(100)), request_size=10)
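Batching by request_size can be pictured with the following sketch. It illustrates the general idea under simplifying assumptions and is not Jina's internal code. The hypothetical batch() helper lazily cuts any iterable into chunks of at most request_size items, so a generator of millions of Documents never has to be held in memory all at once.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')


def batch(docs: Iterable[T], request_size: int = 100) -> Iterator[List[T]]:
    """Yield consecutive batches of at most request_size items (hypothetical helper)."""
    it = iter(docs)
    while True:
        chunk = list(islice(it, request_size))  # pull only the next batch
        if not chunk:
            return
        yield chunk


# 100 items with request_size=10 -> 10 batches of 10 items each
print([len(b) for b in batch(range(100), request_size=10)])
# Any remainder ends up in a smaller final batch
print([len(b) for b in batch(range(25), request_size=10)])
```

In this model, the example above would produce ten Requests of ten Documents each.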
Targeting a specific Executor
By default, a Flow sends each request to all Executors with matching endpoints, as configured. The Client can instead target a specific Executor in a Flow using the target_executor keyword. The request is then processed only by the Executor with the provided name. Its usage is shown in the listing below.
from docarray import Document, DocumentArray
from jina import Client, Executor, Flow, requests
class FooExecutor(Executor):
    @requests
    def foo(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'foo was here and got {len(docs)} document'))
class BarExecutor(Executor):
    @requests
    def bar(self, docs: DocumentArray, **kwargs):
        docs.append(Document(text=f'bar was here and got {len(docs)} document'))
f = (
    Flow()
    .add(uses=FooExecutor, name='fooExecutor')
    .add(uses=BarExecutor, name='barExecutor')
)
with f:  # Using it as a Context Manager will start the Flow
    client = Client(port=f.port)
    docs = client.post(on='/', target_executor='barExecutor')
    print(docs.texts)
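To make the effect of target_executor concrete, here is a toy, in-process model of a linear Flow. It uses plain functions and strings instead of Executors and Documents, and it is not how Jina routes requests internally. The exact-name matching in run_flow is an assumption of this sketch, and every name in it is hypothetical.

```python
from typing import Callable, Dict, List, Optional

# A toy "Executor" is a function that mutates a list of texts in place.
ToyExecutor = Callable[[List[str]], None]


def foo(docs: List[str]) -> None:
    docs.append(f'foo was here and got {len(docs)} document')


def bar(docs: List[str]) -> None:
    docs.append(f'bar was here and got {len(docs)} document')


def run_flow(
    executors: Dict[str, ToyExecutor],
    docs: List[str],
    target_executor: Optional[str] = None,
) -> List[str]:
    """Pass docs through the Executors in order, or only through the targeted one."""
    for name, executor in executors.items():
        if target_executor is not None and name != target_executor:
            continue  # skip Executors that were not targeted
        executor(docs)
    return docs


flow = {'fooExecutor': foo, 'barExecutor': bar}
print(run_flow(flow, []))                               # both Executors run in sequence
print(run_flow(flow, [], target_executor='barExecutor'))  # only barExecutor runs
```

Without a target, bar sees the Document that foo appended. With a target, foo is skipped entirely.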
Request parameters
The Client can also send parameters to the Executors as shown below:
from docarray import Document
from jina import Client, Executor, Flow, requests
class MyExecutor(Executor):
    @requests
    def foo(self, parameters, **kwargs):
        print(parameters['hello'])
f = Flow().add(uses=MyExecutor)
with f:
    client = Client(port=f.port)
    client.post('/', Document(), parameters={'hello': 'world'})
Note
You can send a parameters-only data request via:
with f:
    client = Client(port=f.port)
    client.post('/', parameters={'hello': 'world'})
This can be useful for controlling Executor objects during their lifetime.
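As a hedged illustration of using parameters to control an Executor over its lifetime, the plain-Python class below shows how a parameters-only request can change state that later requests rely on. ToyFilterExecutor, its handle() method, and the threshold parameter are all hypothetical, and the class does not subclass jina.Executor.

```python
from typing import Any, Dict, List, Optional


class ToyFilterExecutor:
    """Hypothetical stateful Executor whose behavior is tuned via parameters."""

    def __init__(self, threshold: float = 0.5) -> None:
        self.threshold = threshold

    def handle(
        self,
        docs: Optional[List[float]] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> List[float]:
        parameters = parameters or {}
        if 'threshold' in parameters:
            # A parameters-only request updates state that persists across requests
            self.threshold = float(parameters['threshold'])
        # Keep only the scores at or above the current threshold
        return [score for score in (docs or []) if score >= self.threshold]


ex = ToyFilterExecutor()
print(ex.handle([0.2, 0.6, 0.9]))                # filters with the default threshold
print(ex.handle(parameters={'threshold': 0.8}))  # parameters only, no Documents
print(ex.handle([0.2, 0.6, 0.9]))                # later requests use the new threshold
```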
Processing results using callback functions
After performing client.post(), you may want to further process the obtained results.
For this purpose, Jina implements a promise-like interface, letting you specify three kinds of callback functions:
on_done is executed after successful completion of client.post()
on_error is executed whenever an error occurs in client.post()
on_always is always executed, regardless of the success or failure of client.post()
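The following toy dispatcher sketches these promise-like semantics under simplified assumptions. It is not Jina's client code. The hypothetical dispatch() processes each batch, calls on_done on success or on_error(request, exception) on failure, and always calls on_always afterwards. On failure, on_always receives the original request in this model, which is an assumption of the sketch.

```python
from typing import Any, Callable, Iterable, List, Optional


def dispatch(
    requests: Iterable[List[str]],
    process: Callable[[List[str]], List[str]],
    on_done: Optional[Callable[[Any], None]] = None,
    on_error: Optional[Callable[[Any, Exception], None]] = None,
    on_always: Optional[Callable[[Any], None]] = None,
) -> None:
    for request in requests:
        response: Any = request  # on failure, callbacks see the original request
        try:
            response = process(request)
        except Exception as exc:
            if on_error is not None:
                on_error(request, exc)
        else:
            if on_done is not None:
                on_done(response)
        finally:
            if on_always is not None:
                on_always(response)


def process(batch: List[str]) -> List[str]:
    """Hypothetical processing step that fails on any batch containing 'bad'."""
    if 'bad' in batch:
        raise ValueError('boom')
    return [text.upper() for text in batch]


log = []
dispatch(
    [['a', 'b'], ['bad'], ['c']],
    process,
    on_done=lambda resp: log.append(('done', resp)),
    on_error=lambda resp, exc: log.append(('error', str(exc))),
    on_always=lambda resp: log.append(('always', resp)),
)
print(log)
```

Because callbacks fire once per batch, a failure in one batch does not prevent later batches from being handled.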
Tip
Neither on_error nor on_always is triggered if the failure is caused by an error outside of networking or Jina's internals. For example, if the OS sends a SIGKILL while the request is being handled, none of the callbacks are executed.
Callback functions in Jina expect a Response of the type jina.types.request.data.DataRequest, which contains the resulting Documents, parameters, and other information.
Understanding DataRequest
DataRequests are objects that are sent by Jina internally. Callback functions process DataRequests, and client.post() can return DataRequests.
DataRequest objects can be seen as a container for data relevant to a given request. The most important fields are:
dr.docs: The DocumentArray being passed between and returned by the Executors. These are the Documents usually processed in a callback function, and are often the main payload.
dr.parameters: The input parameters of the associated request.
dr.parameters['__results__']: A reserved field that is populated by Executors returning a Python dict. Information in those returned dicts is collected here, keyed by each Executor's pod_id.
dr.data: Contains information associated with the data in the request. Most importantly, dr.data.docs refers to the same object as dr.docs.
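The relationships between these fields can be mimicked with a small mock. The sketch below only mirrors the field layout described above; the real DataRequest differs internally. ToyDataRequest, ToyData, collect_result, and the pod id string 'executor-pod-0' are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToyData:
    docs: List[str]


@dataclass
class ToyDataRequest:
    """Hypothetical mock of the DataRequest fields listed above."""
    data: ToyData
    parameters: Dict[str, Any] = field(default_factory=dict)

    @property
    def docs(self) -> List[str]:
        # dr.docs and dr.data.docs refer to the same object
        return self.data.docs


def collect_result(dr: ToyDataRequest, pod_id: str, result: Dict[str, Any]) -> None:
    """Store a dict returned by an Executor under its (hypothetical) pod_id."""
    dr.parameters.setdefault('__results__', {})[pod_id] = result


dr = ToyDataRequest(data=ToyData(docs=['hello']), parameters={'top_k': 3})
collect_result(dr, 'executor-pod-0', {'count': 1})
print(dr.docs is dr.data.docs)
print(dr.parameters)
```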
Accordingly, a callback function can be defined in the following way:
from jina.types.request.data import DataRequest
def my_callback(resp: DataRequest):
    ...  # process request here
Or, to work directly with the Documents:
from jina.types.request.data import DataRequest
def my_callback(resp: DataRequest):
    docs = resp.docs
    ...  # process docs here
In the example below, the Flow processes the Document and, on success, prints the response. If something goes wrong, it beeps. In every case, the resulting Documents are written to output.txt.
from jina import Flow, Client
from docarray import Document
def beep(*args):
    # make a beep sound
    import sys
    sys.stdout.write('\a')
with Flow().add() as f, open('output.txt', 'w') as fp:
    client = Client(port=f.port)
    client.post(
        '/',
        Document(),
        on_done=print,
        on_error=beep,
        on_always=lambda x: x.docs.save(fp),
    )
On failure callback
Additionally, the on_error callback is triggered when an exception is raised. The callback must accept an optional exception parameter as an argument:
def on_error(resp, exception: Exception):
    ...
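Since the exception argument is optional, a defensive handler can default it to None. The sketch below is hypothetical. It collects failed batches and a readable reason for later inspection or retry, assuming only that resp exposes .docs as a DataRequest does. A SimpleNamespace stands in for a real response object.

```python
from types import SimpleNamespace
from typing import Any, List, Optional, Tuple

failed: List[Tuple[Any, str]] = []


def on_error(resp: Any, exception: Optional[Exception] = None) -> None:
    """Record the failed batch and the reason so it can be inspected or retried."""
    reason = repr(exception) if exception is not None else 'unknown error'
    failed.append((resp.docs, reason))


# Exercise both the with-exception and without-exception cases
fake_resp = SimpleNamespace(docs=['doc-1', 'doc-2'])  # stand-in for a DataRequest
on_error(fake_resp, ConnectionError('gateway unreachable'))
on_error(fake_resp)
print(failed)
```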
Returning results from .post()
If no callback is provided, client.post() returns a flattened DocumentArray containing all Documents of all Requests.
By setting return_responses=True when creating a Client, this behavior can be changed to return a list of Responses (DataRequests) instead.
If a callback is provided, no results are returned.
Danger
Not using a callback function and instead returning results can come with a serious performance penalty.
Callbacks operate on each individual Request, which represents a batch of the data. In contrast, returning results requires the accumulation of all results of all Requests. This means that you will not receive results until all Requests have been processed. This may not only be slower, but also require more memory.
from jina import Flow, Client
from docarray import Document
with Flow() as f:
    client = Client(port=f.port)
    docs = client.post(on='', inputs=Document(text='Hi there!'))
    print(docs)
    print(docs.texts)
>>> <DocumentArray (length=1) at 140619524357664>
>>> ['Hi there!']
from jina import Flow, Client
from docarray import Document
with Flow() as f:
    client = Client(port=f.port, return_responses=True)
    resp = client.post(on='', inputs=Document(text='Hi there!'))
    print(resp)
    print(resp[0].docs.texts)
>>> [<jina.types.request.data.DataRequest ('header', 'parameters', 'routes', 'data') at 140619524354592>]
>>> ['Hi there!']
from jina import Flow, Client
from docarray import Document
with Flow() as f:
    client = Client(port=f.port)
    resp = client.post(
        on='',
        inputs=Document(text='Hi there!'),
        on_done=lambda resp: print(resp.docs.texts),
    )
    print(resp)
>>> ['Hi there!']
>>> None
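The performance difference described in the Danger note can be sketched in plain Python. This is a simplified model with hypothetical helper names, not Jina's code. Accumulating keeps every Document until the last response arrives. A callback handles each batch as soon as it arrives and can let it go afterwards.

```python
from typing import Callable, Iterator, List


def responses(n_requests: int, request_size: int) -> Iterator[List[int]]:
    """Hypothetical stream of responses, each carrying one batch of Documents."""
    for i in range(n_requests):
        yield list(range(i * request_size, (i + 1) * request_size))


def accumulate(resps: Iterator[List[int]]) -> List[int]:
    """Return-style: nothing is available until every response has arrived."""
    all_docs: List[int] = []
    for batch in resps:
        all_docs.extend(batch)  # memory grows with the total number of Documents
    return all_docs


def stream(resps: Iterator[List[int]], on_done: Callable[[List[int]], None]) -> None:
    """Callback-style: each batch is handled as soon as it arrives."""
    for batch in resps:
        on_done(batch)  # only the current batch needs to be held


all_docs = accumulate(responses(n_requests=10, request_size=100))
print(len(all_docs))

seen_batches = []
stream(responses(n_requests=10, request_size=100), on_done=lambda b: seen_batches.append(len(b)))
print(seen_batches)
```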
GraphQL
The Jina Client additionally supports fetching data via GraphQL mutations using client.mutate():
from jina import Client
PORT = ...
c = Client(port=PORT)
mut = '''
    mutation {
        docs(data: {text: "abcd"}) {
            id
            matches {
                embedding
            }
        }
    }
'''
response = c.mutate(mutation=mut)
For details on the allowed mutation arguments and response fields, see here.
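When the mutation text is built from user-supplied input, naive string interpolation can break the query if the input contains quotes. The following is a hedged sketch. The hypothetical build_docs_mutation() helper reuses json.dumps to produce the string literal. This relies on GraphQL accepting JSON-style escape sequences, which holds for the common escapes (quotes, backslashes, newlines, \uXXXX).

```python
import json
from typing import Iterable


def build_docs_mutation(text: str, fields: Iterable[str] = ('id',)) -> str:
    """Build a docs mutation for arbitrary text (hypothetical helper).

    json.dumps yields a double-quoted, escaped string literal, so quotes
    inside `text` cannot terminate the GraphQL string early.
    """
    selection = '\n    '.join(fields)
    return (
        'mutation {\n'
        f'  docs(data: {{text: {json.dumps(text)}}}) {{\n'
        f'    {selection}\n'
        '  }\n'
        '}'
    )


print(build_docs_mutation('abcd', fields=('id', 'matches { embedding }')))
```

The returned string could then be passed as the mutation argument of c.mutate(), as in the example above.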
Async Python Client
There is also an async version of the Python Client, enabled by passing asyncio=True when creating the Client, which works with .post() and .mutate().
While the standard Client is also asynchronous under the hood, the async version exposes this to the outside world by accepting asynchronous inputs, such as async generators, and returning an asynchronous iterator.
This means you can iterate over Responses one by one, as they come in.
import asyncio
from docarray import Document
from jina import Client, Flow
async def async_inputs():
    for _ in range(10):
        yield Document()
        await asyncio.sleep(0.1)
async def run_client(port):
    client = Client(port=port, asyncio=True)
    async for resp in client.post('/', async_inputs, request_size=1):
        print(resp)
with Flow() as f:  # Using it as a Context Manager will start the Flow
    asyncio.run(run_client(f.port))
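The interleaving of asynchronous inputs and responses can be reproduced without a Flow. This is a toy model with a hypothetical toy_post() async generator, not Jina's async client. Each input produces one response (as with request_size=1), and each response is available to the consumer as soon as it is yielded.

```python
import asyncio
from typing import AsyncIterator, List


async def async_inputs(n: int) -> AsyncIterator[int]:
    """Produce inputs over time, like the async_inputs generator above."""
    for i in range(n):
        yield i
        await asyncio.sleep(0.01)


async def toy_post(inputs: AsyncIterator[int]) -> AsyncIterator[str]:
    """Hypothetical stand-in for client.post(): one response per input."""
    async for doc in inputs:
        await asyncio.sleep(0)  # pretend to wait for the network round trip
        yield f'response for doc {doc}'


async def main() -> List[str]:
    received = []
    async for resp in toy_post(async_inputs(3)):
        print(resp)  # each response is handled as soon as it arrives
        received.append(resp)
    return received


asyncio.run(main())
```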