jina.serve.executors.decorators module#

Decorators and wrappers designed for wrapping BaseExecutor functions.

jina.serve.executors.decorators.avoid_concurrent_lock_cls(cls)[source]#

Wraps a function to lock a filelock for concurrent access with the name of the class to which it applies, to avoid deadlocks :param cls: the class to which is applied, only when the class corresponds to the instance type, this filelock will apply :return: the wrapped function

jina.serve.executors.decorators.requests(func=None, *, on=None)[source]#

@requests defines the endpoints of an Executor. It has a keyword on= to define the endpoint.

A class method decorated with plain @requests (without on=) is the default handler for all endpoints. That means, it is the fallback handler for endpoints that are not found.

EXAMPLE USAGE

from jina import Executor, requests, Flow
from docarray import Document


# define Executor with custom `@requests` endpoints
class MyExecutor(Executor):
    @requests(on='/index')
    def index(self, docs, **kwargs):
        print(docs)  # index docs here

    @requests(on=['/search', '/query'])
    def search(self, docs, **kwargs):
        print(docs)  # perform search here

    @requests  # default/fallback endpoint
    def foo(self, docs, **kwargs):
        print(docs)  # process docs here


f = Flow().add(uses=MyExecutor)  # add your Executor to a Flow
with f:
    f.post(
        on='/index', inputs=Document(text='I am here!')
    )  # send doc to `index` method
    f.post(
        on='/search', inputs=Document(text='Who is there?')
    )  # send doc to `search` method
    f.post(
        on='/query', inputs=Document(text='Who is there?')
    )  # send doc to `search` method
    f.post(on='/bar', inputs=Document(text='Who is there?'))  # send doc to `foo` method
Parameters:
Returns:

decorated function

jina.serve.executors.decorators.monitor(*, name=None, documentation=None)[source]#

Decorator and context manager that allows monitoring of an Executor.

You can access these metrics by enabling monitoring on your Executor. It will track the time spent calling the function and the number of times it has been called. Under the hood it will create a prometheus Summary : https://prometheus.io/docs/practices/histograms/.

EXAMPLE USAGE

As decorator

from jina import Executor, monitor


class MyExecutor(Executor):
    @requests  # `@requests` are monitored automatically
    def foo(self, docs, *args, **kwargs):
        ...
        self.my_method()
        ...

    # custom metric for `my_method`
    @monitor(name='metric_name', documentation='useful information goes here')
    def my_method(self):
        ...

As context manager

from jina import Executor, requests


class MyExecutor(Executor):
    @requests  # `@requests` are monitored automatically
    def foo(self, docs, *args, **kwargs):
        ...
        # custom metric for code block
        with self.monitor('metric_name', 'useful information goes here'):
            docs = process(docs)

To enable the defined monitor() blocks, enable monitoring on the Flow level

from jina import Flow

f = Flow(monitoring=True, port_monitoring=9090).add(
    uses=MyExecutor, port_monitoring=9091
)
with f:
    ...
Warning:

Don’t use this decorator in combination with the @request decorator. @request’s are already monitored.

Parameters:
  • name (Optional[str]) – the name of the metrics, by default it is based on the name of the method it decorates

  • documentation (Optional[str]) – the description of the metrics, by default it is based on the name of the method it decorates

Returns:

decorator which takes as an input a single callable