Scale Out#
A Flow orchestrates multiple Executors.
By default, all Executors run with a single instance. If one Executor in the Flow is particularly slow, it reduces the overall throughput of the entire Flow.
To solve this, you can specify the number of replicas to scale out an Executor.
Replicate Executors#
Replication creates multiple copies of the same Executor. Each request in the Flow is then passed to only one replica (instance) of that Executor. All replicas compete for a request; the first idle replica gets it.
This is useful for improving performance and availability:
If you have slow Executors (e.g. an embedding model), you can scale up the number of instances to process multiple requests in parallel.
Executors might need to be taken offline occasionally (for updates, failures, etc.), but you may still want your Flow to process requests without any downtime. Adding replicas allows any single replica to be taken down, as long as at least one remains in the Flow. This ensures the high availability of your Flow.
from jina import Flow
f = Flow().add(name='slow_encoder', replicas=3).add(name='fast_indexer')
Flow with three replicas of slow_encoder and one replica of fast_indexer#
The above Flow creates a topology with three replicas of the Executor slow_encoder. The Flow sends every request to exactly one of the three instances. The chosen replica then sends its result to fast_indexer.
Replicate on multiple GPUs#
To replicate your Executors so that each replica uses a different GPU on your machine, you can tell the Flow to use multiple GPUs by passing CUDA_VISIBLE_DEVICES=RR as an environment variable.
The Flow then assigns each available GPU to replicas in a round-robin fashion.
Caution
You should only replicate on multiple GPUs with CUDA_VISIBLE_DEVICES=RR locally.
Tip
In Kubernetes or with Docker Compose you should allocate GPU resources to each replica directly in the configuration files.
For example, if you have three GPUs and one of your Executors has five replicas:
In a flow.py file:
from jina import Flow
with Flow().add(
    uses='jinaai://jina-ai/CLIPEncoder', replicas=5, install_requirements=True
) as f:
    f.block()
CUDA_VISIBLE_DEVICES=RR python flow.py
In a flow.yaml file:
jtype: Flow
executors:
  - uses: jinaai://jina-ai/CLIPEncoder
    install_requirements: True
    replicas: 5
CUDA_VISIBLE_DEVICES=RR jina flow --uses flow.yaml
The Flow assigns GPU devices in the following round-robin fashion:
GPU device | Replica ID
---|---
0 | 0
1 | 1
2 | 2
0 | 3
1 | 4
You can restrict the visible devices in round-robin assignment using CUDA_VISIBLE_DEVICES=RR0:2, where 0:2 corresponds to a Python slice. This creates the following assignment:
GPU device | Replica ID
---|---
0 | 0
1 | 1
0 | 2
1 | 3
0 | 4
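For instance, reusing the flow.py file from above, the launch command becomes:
CUDA_VISIBLE_DEVICES=RR0:2 python flow.py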
You can also restrict the visible devices by assigning a list of device IDs, for example CUDA_VISIBLE_DEVICES=RR1,3. This creates the following assignment:
GPU device | Replica ID
---|---
1 | 0
3 | 1
1 | 2
3 | 3
1 | 4
You can also refer to GPUs by their UUID. For instance, you could assign a list of device UUIDs:
CUDA_VISIBLE_DEVICES=RRGPU-0aaaaaaa-74d2-7297-d557-12771b6a79d5,GPU-0bbbbbbb-74d2-7297-d557-12771b6a79d5,GPU-0ccccccc-74d2-7297-d557-12771b6a79d5,GPU-0ddddddd-74d2-7297-d557-12771b6a79d5
Check the CUDA documentation for the accepted formats for assigning CUDA devices by UUID. This creates the following assignment:
GPU device | Replica ID
---|---
GPU-0aaaaaaa-74d2-7297-d557-12771b6a79d5 | 0
GPU-0bbbbbbb-74d2-7297-d557-12771b6a79d5 | 1
GPU-0ccccccc-74d2-7297-d557-12771b6a79d5 | 2
GPU-0ddddddd-74d2-7297-d557-12771b6a79d5 | 3
GPU-0aaaaaaa-74d2-7297-d557-12771b6a79d5 | 4
Replicate external Executors#
If you have external Executors with multiple replicas running elsewhere, you can add them to your Flow by specifying all the respective hosts and ports:
from jina import Flow
replica_hosts, replica_ports = ['localhost','91.198.174.192'], ['12345','12346']
Flow().add(host=replica_hosts, port=replica_ports, external=True)
# alternative syntax
Flow().add(host=['localhost:12345','91.198.174.192:12346'], external=True)
This connects to grpc://localhost:12345 and grpc://91.198.174.192:12346 as two replicas of the external Executor.
Reducing
If an external Executor needs multiple predecessors, reducing must be enabled, so setting no_reduce=True is not allowed in these cases.
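A minimal sketch of such a topology, assuming a hypothetical external Executor already serving on localhost:12345 and two illustrative local Executors named exec1 and exec2:
from jina import Flow

# Two local Executors both feed the external Executor, so it has
# multiple predecessors. The Flow reduces (merges) their results
# automatically before forwarding; passing no_reduce=True on the
# last .add() is not allowed in this topology.
f = (
    Flow()
    .add(name='exec1')
    .add(name='exec2', needs='gateway')
    .add(host='localhost:12345', external=True, needs=['exec1', 'exec2'])
)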
Customize polling behaviors#
Replicas compete for a request, so only one of them will get the request. What if we want all replicas to get the request?
For example, consider index and search requests:
Index (and update, delete) requests are handled by a single replica, as adding the data once is sufficient.
Search requests are handled by all shards, as you need to search over all of them to ensure the completeness of the result: the requested data could be on any shard.
For this purpose, you need shards and polling.
You can define whether all or only one shard receives a request by specifying polling: ANY means only one shard receives the request, while ALL means that all shards receive the same request.
from jina import Flow
flow = Flow().add(name='ExecutorWithShards', shards=3, polling={'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'})
The above example results in a Flow having the Executor ExecutorWithShards with the following polling options:
/index has polling ANY (the default value is not changed here).
/search has polling ANY as it is explicitly set (usually that should not be necessary).
/custom has polling ALL.
All other endpoints have polling ANY due to using * as a wildcard to catch all other cases.
Understand behaviors of replicas and shards with polling#
The following example demonstrates the different behaviors when setting replicas, shards and polling together.
from jina import Flow, Document, Executor, requests
class MyExec(Executor):
    @requests
    def foo(self, docs, **kwargs):
        print(f'inside: {docs.texts}')


f = (
    Flow()
    .add(uses=MyExec, replicas=2, polling='ANY')
    .needs_all()
)

with f:
    r = f.post('/', Document(text='hello'))
    print(f'return: {r.texts}')
We now change the combination of replicas/shards and polling in the .add() call above and observe the differences in the console output (note the two print statements in the snippet):
 | polling='ALL' | polling='ANY'
---|---|---
replicas=2 | inside: ['hello'] return: ['hello'] | inside: ['hello'] return: ['hello']
shards=2 | inside: ['hello'] inside: ['hello'] return: ['hello'] | inside: ['hello'] return: ['hello']
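For instance, per the bottom-left cell of the table, switching the .add() call to shards with ALL polling makes every shard print inside, while return is still printed once:
f = (
    Flow()
    .add(uses=MyExec, shards=2, polling='ALL')
    .needs_all()
)

with f:
    r = f.post('/', Document(text='hello'))
    print(f'return: {r.texts}')
# console output, per the table above:
# inside: ['hello']
# inside: ['hello']
# return: ['hello']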