Add Endpoints#

Methods decorated with @requests are mapped to network endpoints while serving.

Decorator#

Executor methods decorated with requests are bound to specific network requests, and respond to network queries.

Both def or async def methods can be decorated with requests.

You can import the @requests decorator via:

from jina import requests

requests takes an optional on= parameter, which binds the decorated method to the specified route:

from jina import Executor, requests
import asyncio


class RequestExecutor(Executor):
    @requests(
        on=['/index', '/search']
    )  # foo is bound to `/index` and `/search` endpoints
    def foo(self, **kwargs):
        print(f'Calling foo')

    @requests(on='/other')  # bar is bound to `/other` endpoint
    async def bar(self, **kwargs):
        await asyncio.sleep(1.0)
        print(f'Calling bar')

Run the example:

from jina import Deployment

dep = Deployment(uses=RequestExecutor)
with dep:
    dep.post(on='/index', inputs=[])
    dep.post(on='/other', inputs=[])
    dep.post(on='/search', inputs=[])
─────────────────────── 🎉 Deployment is ready to serve! ───────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│       Protocol                    GRPC │
│  🏠       Local           0.0.0.0:59525  │
│  🔒     Private      192.168.1.13:59525  │
│  🌍      Public   197.244.143.223:59525  │
╰──────────────────────────────────────────╯
Calling foo
Calling bar
Calling foo

Default binding#

A class method decorated with plain @requests (without on=) is the default handler for all endpoints. This means it is the fallback handler for endpoints that are not found. f.post(on='/blah', ...) invokes MyExecutor.foo.

from jina import Executor, requests
import asyncio


class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        print(kwargs)

    @requests(on='/index')
    async def bar(self, **kwargs):
        await asyncio.sleep(1.0)
        print(f'Calling bar')

No binding#

If a class has no @requests decorator, the request simply passes through without any processing.

Arguments#

All Executor methods decorated by @requests need to follow the signature below to be usable as a microservice to be orchestrated either using Flow or Deployment.

The async definition is optional.

The endpoint signature looks like the following:

from typing import Dict, Union, List, Optional
from jina import Executor, requests, DocumentArray


class MyExecutor(Executor):
    @requests
    async def foo(
        self,
        docs: DocumentArray,
        parameters: Dict,
        tracing_context: Optional['Context'],
        **kwargs
    ) -> Union[DocumentArray, Dict, None]:
        pass

    @requests
    def bar(
        self,
        docs: DocumentArray,
        parameters: Dict,
        tracing_context: Optional['Context'],
        **kwargs
    ) -> Union[DocumentArray, Dict, None]:
        pass

Let’s take a look at these arguments:

  • docs: A DocumentArray that is part of the request. Since an Executor wraps functionality related to DocumentArray, it’s usually the main data format inside Executor methods. Note that these docs can be also change in place, just like any other list-like object in a Python function.

  • parameters: A Dict object that passes extra parameters to Executor functions.

  • tracing_context: Context needed if you want to add custom traces in your Executor.

Flow-specific arguments

If you use an Executor in a Flow and want to merge incoming DocumentArrays from multiple Executors, you may also be interested in some additional arguments.

Hint

If you don’t need certain arguments, you can suppress them into **kwargs. For example:

from jina import Executor, requests


class MyExecutor(Executor):

    @requests
    def foo_using_docs_arg(self, docs, **kwargs):
        print(docs)

    @requests
    def foo_using_docs_parameters_arg(self, docs, parameters, **kwargs):
        print(docs)
        print(parameters)

    @requests
    def foo_using_no_arg(self, **kwargs):
        # the args are suppressed into kwargs
        print(kwargs)

Returns#

Every Executor method can return in three ways:

  • You can directly return a DocumentArray object.

  • If you return None or don’t have a return in your method, then the original docs object (potentially mutated by your function) is returned.

  • If you return a dict object, it will be considered as a result and returned on parameters['__results__'] to the client.

from jina import requests, Executor, Deployment


class MyExec(Executor):
    @requests(on='/status')
    def status(self, **kwargs):
        return {'internal_parameter': 20}


with Deployment(uses=MyExec) as dep:
    print(dep.post(on='/status', return_responses=True)[0].to_dict()["parameters"])
{"__results__": {"my_executor/rep-0": {"internal_parameter": 20.0}}}

Exception handling#

Exceptions inside @requests-decorated functions can simply be raised.

from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        raise NotImplementedError('No time for it')
Example usage and output
from jina import Deployment

dep = Deployment(uses=MyExecutor)


def print_why(resp):
    print(resp.status.description)


with dep:
    dep.post('', on_error=print_why)
[...]
executor0/[email protected][E]:NotImplementedError('no time for it')
 add "--quiet-error" to suppress the exception details
[...]
  File "/home/joan/jina/jina/jina/serve/executors/decorators.py", line 115, in arg_wrapper
    return fn(*args, **kwargs)
  File "/home/joan/jina/jina/toy.py", line 8, in foo
    raise NotImplementedError('no time for it')
NotImplementedError: no time for it
NotImplementedError('no time for it')