Dynamic Batching#

Dynamic batching allows requests to be accumulated and batched together before being sent to an Executor. The batch is created dynamically depending on the configuration for each endpoint.

This feature is especially relevant for inference tasks where model inference is more optimized when batched to efficiently use GPU resources.

Overview#

Enabling dynamic batching on Executor endpoints that perform inference typically results in better hardware usage and thus, in increased throughput.

When you enable dynamic batching, incoming requests to Executor endpoints with the same request parameters are queued together. The Executor endpoint is executed on the queue requests when either:

Although this feature can work on parametrized requests, it’s best used for endpoints that don’t often receive different parameters. Creating a batch of requests typically results in better usage of hardware resources and potentially increased throughput.

You can enable and configure dynamic batching on an Executor endpoint using several methods:

  • dynamic_batching decorator

  • uses_dynamic_batching Executor parameter

  • dynamic_batching section in Executor YAML

Example#

The following examples show how to enable dynamic batching on an Executor Endpoint:

This decorator is applied per Executor endpoint. Only Executor endpoints (methods decorated with @requests) decorated with @dynamic_batching have dynamic batching enabled.

from jina import Executor, requests, dynamic_batching, Deployment
from docarray import DocList, BaseDoc
from docarray.typing import AnyTensor, AnyEmbedding
from typing import Optional

import numpy as np
import torch

class MyDoc(BaseDoc):
    tensor: Optional[AnyTensor[128]] = None
    embedding: Optional[AnyEmbedding[128]] = None


class MyExecutor(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # initialize model
        self.model = torch.nn.Linear(in_features=128, out_features=128)
    
    @requests(on='/bar')
    @dynamic_batching(preferred_batch_size=10, timeout=200)
    def embed(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        docs.embedding = self.model(torch.Tensor(docs.tensor))

dep = Deployment(uses=MyExecutor)

This argument is a dictionary mapping each endpoint to its corresponding configuration:

from jina import Executor, requests, dynamic_batching, Deployment
from docarray import DocList, BaseDoc
from docarray.typing import AnyTensor, AnyEmbedding
from typing import Optional

import numpy as np
import torch

class MyDoc(BaseDoc):
    tensor: Optional[AnyTensor[128]] = None
    embedding: Optional[AnyEmbedding[128]] = None


class MyExecutor(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # initialize model
        self.model = torch.nn.Linear(in_features=128, out_features=128)
    
    @requests(on='/bar')
    def embed(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        docs.embedding = self.model(torch.Tensor(docs.tensor))


dep = Deployment(
    uses=MyExecutor,
    uses_dynamic_batching={'/bar': {'preferred_batch_size': 10, 'timeout': 200}},
)

If you use YAML to enable dynamic batching on an Executor, you can use the dynamic_batching section in the Executor section. Suppose the Executor is implemented like this: my_executor.py:

from jina import Executor, requests, dynamic_batching, Deployment
from docarray import DocList, BaseDoc
from docarray.typing import AnyTensor, AnyEmbedding
from typing import Optional

import numpy as np
import torch

class MyDoc(BaseDoc):
    tensor: Optional[AnyTensor[128]] = None
    embedding: Optional[AnyEmbedding[128]] = None


class MyExecutor(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        # initialize model
        self.model = torch.nn.Linear(in_features=128, out_features=128)
    
    @requests(on='/bar')
    def embed(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        docs.embedding = self.model(torch.Tensor(docs.tensor))

Then, in your config.yaml file, you can enable dynamic batching on the /bar endpoint like so:

jtype: MyExecutor
py_modules:
    - my_executor.py
uses_dynamic_batching:
  /bar:
    preferred_batch_size: 10
    timeout: 200

We then deploy with:

from jina import Deployment

with Deployment(uses='config.yml') as dep:
    dep.block()

Parameters#

The following parameters allow you to configure the dynamic batching behavior on each Executor endpoint:

  • preferred_batch_size: Target number of Documents in a batch. The batcher collects requests until preferred_batch_size is reached, or until timeout is reached. The batcher then makes sure that the Executor only receives documents in groups of maximum the preferred_batch_size Therefore, the actual batch size could be smaller than preferred_batch_size.

  • timeout: Maximum time in milliseconds to wait for a request to be assigned to a batch. If the oldest request in the queue reaches a waiting time of timeout, the batch is passed to the Executor, even if it contains fewer than preferred_batch_size Documents. Default is 10,000ms (10 seconds).