jina.serve.executors.decorators module
Decorators and wrappers designed for wrapping BaseExecutor functions.
- jina.serve.executors.decorators.avoid_concurrent_lock_cls(cls)
Wraps a function so that it acquires a file lock named after the class it is applied to, which avoids deadlocks under concurrent access.
- Parameters:
cls – the class the lock applies to; the file lock is used only when this class corresponds to the instance type
- Returns:
the wrapped function
- jina.serve.executors.decorators.requests(func=None, *, on=None)
@requests defines the endpoints of an Executor. It has a keyword on= to define the endpoint.
A class method decorated with plain @requests (without on=) is the default handler for all endpoints: it acts as the fallback for any endpoint that has no dedicated handler.
EXAMPLE USAGE
from jina import Executor, requests, Flow
from docarray import Document


# define Executor with custom `@requests` endpoints
class MyExecutor(Executor):
    @requests(on='/index')
    def index(self, docs, **kwargs):
        print(docs)  # index docs here

    @requests(on=['/search', '/query'])
    def search(self, docs, **kwargs):
        print(docs)  # perform search here

    @requests  # default/fallback endpoint
    def foo(self, docs, **kwargs):
        print(docs)  # process docs here


f = Flow().add(uses=MyExecutor)  # add your Executor to a Flow
with f:
    f.post(
        on='/index', inputs=Document(text='I am here!')
    )  # send doc to `index` method
    f.post(
        on='/search', inputs=Document(text='Who is there?')
    )  # send doc to `search` method
    f.post(
        on='/query', inputs=Document(text='Who is there?')
    )  # send doc to `search` method
    f.post(on='/bar', inputs=Document(text='Who is there?'))  # send doc to `foo` method
- Parameters:
func (Optional[Callable[[DocumentArray, Dict, DocumentArray, List[DocumentArray], List[DocumentArray]], Union[DocumentArray, Dict, None]]]) – the method to decorate
on (Union[str, Sequence[str], None]) – the endpoint string, by convention starts with /
- Returns:
decorated function
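The routing rule described for @requests (explicit endpoints first, then the plain-decorated method as fallback) can be illustrated without Jina. The sketch below is a toy model under stated assumptions, not the library's dispatch code: `endpoint`, `ToyExecutor`, `post`, and the `"__default__"` key are all hypothetical names.

```python
def endpoint(func=None, *, on=None):
    """Toy counterpart of `@requests`: tag a function with the endpoints it serves."""

    def decorator(f):
        if on is None:
            f._endpoints = ["__default__"]  # plain decorator: fallback handler
        elif isinstance(on, str):
            f._endpoints = [on]
        else:
            f._endpoints = list(on)
        return f

    # Support both `@endpoint` and `@endpoint(on=...)`.
    return decorator(func) if func is not None else decorator


class ToyExecutor:
    @endpoint(on="/index")
    def index(self, docs):
        return f"index:{docs}"

    @endpoint(on=["/search", "/query"])
    def search(self, docs):
        return f"search:{docs}"

    @endpoint
    def foo(self, docs):
        return f"foo:{docs}"

    def post(self, on, docs):
        # Build the endpoint -> method-name table from the tagged methods.
        routes = {}
        for name, attr in vars(type(self)).items():
            for ep in getattr(attr, "_endpoints", []):
                routes[ep] = name
        # Unknown endpoints fall back to the default handler, if one exists.
        handler = routes.get(on, routes.get("__default__"))
        if handler is None:
            raise KeyError(on)
        return getattr(self, handler)(docs)


ex = ToyExecutor()
results = [ex.post(ep, "doc") for ep in ("/index", "/search", "/query", "/bar")]
```

Here `/bar` has no dedicated handler, so it is served by `foo`, mirroring the last call in the usage example for @requests.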
- jina.serve.executors.decorators.dynamic_batching(func=None, *, preferred_batch_size=None, timeout=10000)
@dynamic_batching defines the dynamic batching behavior of an Executor. Dynamic batching works by collecting Documents from multiple requests in a queue and passing them to the Executor in batches of the specified size. This can improve throughput and resource utilization at the cost of increased latency.
- Parameters:
func (Optional[Callable[[DocumentArray, Dict, DocumentArray, List[DocumentArray], List[DocumentArray]], Union[DocumentArray, Dict, None]]]) – the method to decorate
preferred_batch_size (Optional[int]) – target number of Documents in a batch. The batcher will collect requests until preferred_batch_size is reached, or until timeout is reached. Therefore, the actual batch size can be smaller or larger than preferred_batch_size.
timeout (Optional[float]) – maximum time in milliseconds to wait for a request to be assigned to a batch. If the oldest request in the queue reaches a waiting time of timeout, the batch will be passed to the Executor, even if it contains fewer than preferred_batch_size Documents. Default is 10_000ms (10 seconds).
- Returns:
decorated function
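To make the interplay of preferred_batch_size and timeout concrete, here is a small deterministic simulation in plain Python. It is a sketch of the described policy, not Jina's batching code: `simulate_batches` and its `arrivals` input format are hypothetical, and it assumes whole requests are queued without being split.

```python
def simulate_batches(arrivals, preferred_batch_size, timeout_ms):
    """Simulate flushes for `arrivals`, a time-sorted list of (time_ms, n_docs).

    Returns a list of (flush_time_ms, batch_size) tuples.
    """
    batches = []
    queued_docs = 0
    oldest_arrival = None
    for t, n_docs in arrivals:
        # The oldest queued request timed out before this one arrived: flush.
        if oldest_arrival is not None and t - oldest_arrival >= timeout_ms:
            batches.append((oldest_arrival + timeout_ms, queued_docs))
            queued_docs, oldest_arrival = 0, None
        if oldest_arrival is None:
            oldest_arrival = t
        queued_docs += n_docs
        # Whole requests are queued, so a batch can overshoot the preferred size.
        if queued_docs >= preferred_batch_size:
            batches.append((t, queued_docs))
            queued_docs, oldest_arrival = 0, None
    if oldest_arrival is not None:
        # Leftover requests are flushed once the oldest one times out.
        batches.append((oldest_arrival + timeout_ms, queued_docs))
    return batches


batches = simulate_batches(
    arrivals=[(0, 3), (100, 3), (200, 2), (20_000, 1)],
    preferred_batch_size=4,
    timeout_ms=10_000,
)
# batches == [(100, 6), (10_200, 2), (30_000, 1)]
```

The first batch holds 6 Documents (larger than the preferred size of 4) because two requests of 3 Documents each are combined; the other two batches are smaller because the timeout expires before enough Documents arrive.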
- jina.serve.executors.decorators.monitor(*, name=None, documentation=None)
Decorator and context manager that allows monitoring of an Executor.
You can access these metrics by enabling monitoring on your Executor. The decorator tracks the time spent calling the function and the number of times it has been called. Under the hood, it creates a Prometheus Summary: https://prometheus.io/docs/practices/histograms/.
EXAMPLE USAGE
As decorator
from jina import Executor, monitor, requests


class MyExecutor(Executor):
    @requests  # `@requests` are monitored automatically
    def foo(self, docs, *args, **kwargs):
        ...
        self.my_method()
        ...

    # custom metric for `my_method`
    @monitor(name='metric_name', documentation='useful information goes here')
    def my_method(self):
        ...
As context manager
from jina import Executor, requests


class MyExecutor(Executor):
    @requests  # `@requests` are monitored automatically
    def foo(self, docs, *args, **kwargs):
        ...
        # custom metric for code block
        with self.monitor('metric_name', 'useful information goes here'):
            docs = process(docs)
To enable the defined monitor() blocks, enable monitoring at the Flow level:
from jina import Flow

f = Flow(monitoring=True, port_monitoring=9090).add(
    uses=MyExecutor, port_monitoring=9091
)
with f:
    ...
- Warning:
Don’t use this decorator in combination with the @requests decorator. Methods decorated with @requests are already monitored.
- Parameters:
name (Optional[str]) – the name of the metrics, by default it is based on the name of the method it decorates
documentation (Optional[str]) – the description of the metrics, by default it is based on the name of the method it decorates
- Returns:
decorator which takes as an input a single callable
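The two quantities described above, call count and total time spent, are what a Summary-style metric accumulates. The following standard-library sketch shows one object serving as both decorator and context manager. It is an illustration only and uses neither Jina nor a Prometheus client: `ToySummary`, `metric`, and `my_method` are hypothetical names.

```python
import time
from contextlib import ContextDecorator


class ToySummary(ContextDecorator):
    """Tracks how often a block ran and the total time spent in it."""

    def __init__(self, name, documentation=""):
        self.name = name
        self.documentation = documentation
        self.count = 0
        self.total_seconds = 0.0

    def __enter__(self):
        # Not reentrant: nested use would overwrite the start time.
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.total_seconds += time.perf_counter() - self._start
        self.count += 1
        return False  # never swallow exceptions


metric = ToySummary("my_method_seconds", "time spent in my_method")


@metric  # used as a decorator
def my_method():
    return sum(range(1000))


my_method()
my_method()

with metric:  # the same object used as a context manager
    sum(range(1000))
```

After this runs, `metric.count` is 3 (two decorated calls plus one `with` block) and `metric.total_seconds` holds the accumulated duration.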