# Source code for jina.orchestrate.deployments.config.docker_compose

import copy
import os
from argparse import Namespace
from typing import Dict, List, Optional, Tuple, Union

from jina import __default_executor__
from jina.enums import PodRoleType
from jina.excepts import NoContainerizedError
from jina.orchestrate.deployments import BaseDeployment
from jina.orchestrate.deployments.config.helper import (
    construct_runtime_container_args,
    get_base_executor_version,
    get_image_name,
    to_compatible_name,
    validate_uses,
)
from jina.orchestrate.helper import generate_default_volume_and_workspace

# Port assigned to every non-gateway service (head, uses-before/after and workers);
# services reach each other as ``<service-name>:<port>`` on the compose network.
port: int = 8081


class DockerComposeConfig:
    """
    Class that implements the output of configuration files for docker-compose for a given Deployment.
    """

    class _DockerComposeService:
        """One docker-compose service (head, uses-before/after, worker shard or gateway) of a Deployment."""

        def __init__(
            self,
            name: str,
            version: str,
            pod_type: PodRoleType,
            jina_deployment_name: str,
            shard_id: Optional[int],
            common_args: Union['Namespace', Dict],
            service_args: Union['Namespace', Dict],
            deployments_addresses: Optional[Dict[str, List[str]]] = None,
        ):
            """
            :param name: name of the service; ``to_compatible_name`` derives the compose service name from it
            :param version: Jina version used to build the default ``jinaai/jina`` image tag
            :param pod_type: role of the Pod run by this service
            :param jina_deployment_name: name of the Deployment this service belongs to
            :param shard_id: shard index for worker services, ``None`` otherwise
            :param common_args: arguments of the whole Deployment
            :param service_args: arguments specific to this service
            :param deployments_addresses: addresses of the Flow's Deployments; only set for the gateway
            """
            self.name = name
            self.compatible_name = to_compatible_name(self.name)
            self.version = version
            self.pod_type = pod_type
            self.jina_deployment_name = jina_deployment_name
            self.shard_id = shard_id
            self.common_args = common_args
            self.service_args = service_args
            self.num_replicas = getattr(self.service_args, 'replicas', 1)
            self.deployments_addresses = deployments_addresses

        def get_gateway_config(
            self,
        ) -> Dict:
            """Build the docker-compose service definition of the gateway.

            Only gateway arguments that differ from the parser defaults (and are not
            in ``taboo``) are forwarded on the ``jina gateway`` command line. The
            gateway port, plus the monitoring port when monitoring is enabled, is
            both exposed and published on the host.

            :return: the docker-compose service dictionary
            """
            image_name = os.getenv(
                'JINA_GATEWAY_IMAGE', f'jinaai/jina:{self.version}-py38-standard'
            )
            cargs = copy.copy(self.service_args)
            cargs.deployments_addresses = self.deployments_addresses
            # imported lazily, as in the original module layout
            from jina.helper import ArgNamespace
            from jina.parsers import set_gateway_parser

            # arguments that must not be forwarded to the gateway command line
            taboo = {
                'uses_with',
                'uses_metas',
                'volumes',
                'uses_before',
                'uses_after',
                'workspace',
                'workspace_id',
                'upload_files',
                'noblock_on_start',
                'env',
            }

            non_defaults = ArgNamespace.get_non_defaults_args(
                cargs, set_gateway_parser(), taboo=taboo
            )
            _args = ArgNamespace.kwargs2list(non_defaults)
            container_args = ['gateway'] + _args

            protocol = str(non_defaults.get('protocol', 'grpc')).lower()

            ports = [f'{cargs.port}'] + (
                [f'{cargs.port_monitoring}'] if cargs.monitoring else []
            )

            envs = [f'JINA_LOG_LEVEL={os.getenv("JINA_LOG_LEVEL", "INFO")}']
            if cargs.env:
                for k, v in cargs.env.items():
                    envs.append(f'{k}={v}')
            return {
                'image': image_name,
                'entrypoint': ['jina'],
                'command': container_args,
                'expose': ports,
                'ports': [f'{_port}:{_port}' for _port in ports],
                'healthcheck': {
                    'test': f'python -m jina.resources.health_check.gateway localhost:{cargs.port} {protocol}',
                    'interval': '2s',
                },
                'environment': envs,
            }

        def _get_image_name(self, uses: Optional[str]):
            """Return the docker image to run for ``uses``.

            :param uses: the Executor reference, or ``None`` (e.g. for a head)
            :return: ``JINA_GATEWAY_IMAGE`` (default: the Jina base image of
                ``self.version``) when ``uses`` is ``None`` or the default
                Executor, otherwise the image derived from ``uses``
            """
            image_name = os.getenv(
                'JINA_GATEWAY_IMAGE', f'jinaai/jina:{self.version}-py38-standard'
            )

            if uses is not None and uses != __default_executor__:
                image_name = get_image_name(uses)

            return image_name

        def _get_container_args(self, cargs):
            """Build the ``jina`` command-line arguments for a runtime container.

            NOTE: mutates ``cargs.uses`` to ``'config.yml'`` for non-default
            Executors (presumably the config shipped inside the Executor image).

            :param cargs: per-replica copy of the service arguments
            :return: the container command arguments
            """
            uses_metas = cargs.uses_metas or {}
            uses_with = self.service_args.uses_with
            if cargs.uses != __default_executor__:
                cargs.uses = 'config.yml'
            return construct_runtime_container_args(
                cargs, uses_metas, uses_with, self.pod_type
            )

        def _update_config_with_volumes(self, config, auto_volume=True):
            """Attach volumes to a service config.

            Custom ``volumes`` always win. Otherwise, when ``auto_volume`` is set, a
            default volume is generated and ``--workspace`` is pointed at it unless
            the command already sets a workspace.

            :param config: the service config to update (modified in place)
            :param auto_volume: whether to generate a default volume
            :return: the updated config
            """
            if self.service_args.volumes:  # respect custom volume definition
                config['volumes'] = self.service_args.volumes
                return config

            if not auto_volume:
                return config

            # if no volume is given, create default volume
            (
                generated_volumes,
                workspace_in_container,
            ) = generate_default_volume_and_workspace(
                workspace_id=self.service_args.workspace_id
            )
            config['volumes'] = generated_volumes
            # set workspace only if not already given
            if '--workspace' not in config['command']:
                config['command'].append('--workspace')
                config['command'].append(workspace_in_container)
            return config

        def get_runtime_config(self) -> List[Dict]:
            """Build one docker-compose service definition per replica.

            :return: list of service dictionaries, one per replica
            """
            replica_configs = []
            for i_rep in range(self.service_args.replicas):
                cargs = copy.copy(self.service_args)
                cargs.name = (
                    f'{cargs.name}/rep-{i_rep}'
                    if self.service_args.replicas > 1
                    else cargs.name
                )

                env = cargs.env
                image_name = self._get_image_name(cargs.uses)
                container_args = self._get_container_args(cargs)

                config = {
                    'image': image_name,
                    'entrypoint': ['jina'],
                    'command': container_args,
                    'healthcheck': {
                        'test': f'python -m jina.resources.health_check.pod localhost:{cargs.port}',
                        'interval': '2s',
                    },
                    'environment': [
                        f'JINA_LOG_LEVEL={os.getenv("JINA_LOG_LEVEL", "INFO")}'
                    ],
                }

                if cargs.gpus:
                    # a numeric value is a device count; anything else (e.g. 'all') is passed through
                    try:
                        count = int(cargs.gpus)
                    except ValueError:
                        count = cargs.gpus

                    config['deploy'] = {
                        'resources': {
                            'reservations': {
                                'devices': [
                                    {
                                        'driver': 'nvidia',
                                        'count': count,
                                        'capabilities': ['gpu'],
                                    }
                                ]
                            }
                        }
                    }

                if cargs.monitoring:
                    config['expose'] = [cargs.port_monitoring]
                    config['ports'] = [
                        f'{cargs.port_monitoring}:{cargs.port_monitoring}'
                    ]

                if env is not None:
                    # append to (rather than replace) the defaults so JINA_LOG_LEVEL
                    # is propagated exactly as in ``get_gateway_config``
                    config['environment'].extend(f'{k}={v}' for k, v in env.items())

                if self.service_args.pod_role == PodRoleType.WORKER:
                    config = self._update_config_with_volumes(
                        config, auto_volume=not self.common_args.disable_auto_volume
                    )

                replica_configs.append(config)
            return replica_configs

    def __init__(
        self,
        args: Union['Namespace', Dict],
        deployments_addresses: Optional[Dict[str, List[str]]] = None,
    ):
        """
        :param args: arguments of the Deployment to translate
        :param deployments_addresses: addresses of the Flow's Deployments, forwarded to the gateway service
        :raises NoContainerizedError: if ``args.uses`` is not usable in docker-compose
        """
        if not validate_uses(args.uses):
            raise NoContainerizedError(
                f'Executor "{args.uses}" is not valid to be used in docker-compose. '
                'You need to use a containerized Executor. You may check `jina hub --help` to see how Jina Hub can help you building containerized Executors.'
            )
        self.deployments_addresses = deployments_addresses
        self.head_service = None
        self.uses_before_service = None
        self.uses_after_service = None
        self.args = copy.copy(args)
        self.name = self.args.name

        self.services_args = self._get_services_args(self.args)

        # resolved once so every service of this Deployment uses the same version
        version = get_base_executor_version()

        if self.services_args['head_service'] is not None:
            self.head_service = self._DockerComposeService(
                name=self.services_args['head_service'].name,
                version=version,
                shard_id=None,
                jina_deployment_name=self.name,
                common_args=self.args,
                service_args=self.services_args['head_service'],
                pod_type=PodRoleType.HEAD,
                deployments_addresses=None,
            )

        if self.services_args['uses_before_service'] is not None:
            self.uses_before_service = self._DockerComposeService(
                name=self.services_args['uses_before_service'].name,
                version=version,
                shard_id=None,
                jina_deployment_name=self.name,
                common_args=self.args,
                service_args=self.services_args['uses_before_service'],
                pod_type=PodRoleType.WORKER,
                deployments_addresses=None,
            )

        if self.services_args['uses_after_service'] is not None:
            self.uses_after_service = self._DockerComposeService(
                name=self.services_args['uses_after_service'].name,
                version=version,
                shard_id=None,
                jina_deployment_name=self.name,
                common_args=self.args,
                service_args=self.services_args['uses_after_service'],
                pod_type=PodRoleType.WORKER,
                deployments_addresses=None,
            )

        self.worker_services = []
        worker_args_list = self.services_args['services']
        for i, worker_args in enumerate(worker_args_list):
            name = f'{self.name}-{i}' if len(worker_args_list) > 1 else f'{self.name}'
            is_gateway = name == 'gateway'
            self.worker_services.append(
                self._DockerComposeService(
                    name=name,
                    version=version,
                    shard_id=i,
                    common_args=self.args,
                    service_args=worker_args,
                    pod_type=PodRoleType.GATEWAY if is_gateway else PodRoleType.WORKER,
                    jina_deployment_name=self.name,
                    deployments_addresses=self.deployments_addresses
                    if is_gateway
                    else None,
                )
            )

    @staticmethod
    def _get_uses_before_after_args(args, suffix: str, uses):
        """Build the arguments of a standalone uses-before/uses-after service.

        :param args: the Deployment arguments the service is derived from
        :param suffix: name suffix, ``'uses-before'`` or ``'uses-after'``
        :param uses: the Executor the service runs
        :return: a deep copy of ``args`` configured as a single-replica worker
        """
        cargs = copy.deepcopy(args)
        cargs.shard_id = 0
        cargs.replicas = 1
        cargs.name = f'{args.name}/{suffix}'
        cargs.uses = uses
        cargs.uses_before = None
        cargs.uses_after = None
        cargs.uses_with = None
        cargs.uses_metas = None
        cargs.env = None
        cargs.port = port
        cargs.uses_before_address = None
        cargs.uses_after_address = None
        cargs.connection_list = None
        cargs.pod_role = PodRoleType.WORKER
        cargs.polling = None
        return cargs

    def _get_services_args(self, args):
        """Derive per-service argument namespaces from the Deployment arguments.

        :param args: the Deployment arguments
        :return: dict with ``head_service``, ``uses_before_service`` and
            ``uses_after_service`` (each arguments or ``None``) and ``services``
            (one arguments object per shard)
        """
        parsed_args = {
            'head_service': None,
            'uses_before_service': None,
            'uses_after_service': None,
            'services': [],
        }
        shards = getattr(args, 'shards', 1)
        replicas = getattr(args, 'replicas', 1)
        uses_before = getattr(args, 'uses_before', None)
        uses_after = getattr(args, 'uses_after', None)

        if args.name != 'gateway' and shards > 1:
            head_args = BaseDeployment._copy_to_head_args(args)
            head_args.port = port
            head_args.uses = None
            head_args.uses_metas = None
            head_args.uses_with = None
            head_args.uses_before = None
            head_args.uses_after = None

            # The head gets an explicit JSON map from shard id to the
            # ``<service-name>:<port>`` address of every replica of that shard.
            import json

            connection_list = {}
            for shard_id in range(shards):
                shard_name = f'{args.name}-{shard_id}'  # shards > 1 in this branch
                connection_list[str(shard_id)] = []
                for i_rep in range(replicas):
                    replica_name = (
                        f'{shard_name}/rep-{i_rep}' if replicas > 1 else shard_name
                    )
                    connection_list[str(shard_id)].append(
                        f'{to_compatible_name(replica_name)}:{port}'
                    )
            head_args.connection_list = json.dumps(connection_list)
            parsed_args['head_service'] = head_args

        head_args = parsed_args['head_service']
        # uses-before/after services are only reachable through a head, which only
        # exists for sharded, non-gateway Deployments
        if uses_before and head_args is not None:
            uses_before_cargs = self._get_uses_before_after_args(
                args, 'uses-before', uses_before
            )
            parsed_args['uses_before_service'] = uses_before_cargs
            head_args.uses_before_address = (
                f'{to_compatible_name(uses_before_cargs.name)}:{uses_before_cargs.port}'
            )
        if uses_after and head_args is not None:
            uses_after_cargs = self._get_uses_before_after_args(
                args, 'uses-after', uses_after
            )
            parsed_args['uses_after_service'] = uses_after_cargs
            head_args.uses_after_address = (
                f'{to_compatible_name(uses_after_cargs.name)}:{uses_after_cargs.port}'
            )

        for i in range(shards):
            cargs = copy.deepcopy(args)
            cargs.shard_id = i
            cargs.uses_before = None
            cargs.uses_after = None
            cargs.uses_before_address = None
            cargs.uses_after_address = None
            if shards > 1:
                cargs.name = f'{cargs.name}-{i}'
            if args.name == 'gateway':
                cargs.pod_role = PodRoleType.GATEWAY
            else:
                cargs.port = port
            parsed_args['services'].append(cargs)
        return parsed_args

    def to_docker_compose_config(
        self,
    ) -> List[Tuple[str, Dict]]:
        """
        Return a list of dictionary configurations. One for each service in this Deployment

        :return: ``(service_name, service_config)`` pairs: a single ``gateway``
            entry for the gateway Deployment; otherwise the head, uses-before and
            uses-after services (when present) followed by one entry per worker
            replica
        """
        if self.name == 'gateway':
            return [('gateway', self.worker_services[0].get_gateway_config())]

        services = []
        for extra_service in (
            self.head_service,
            self.uses_before_service,
            self.uses_after_service,
        ):
            if extra_service is not None:
                services.append(
                    (
                        extra_service.compatible_name,
                        extra_service.get_runtime_config()[0],
                    )
                )

        for worker_service in self.worker_services:
            configs = worker_service.get_runtime_config()
            for rep_id, config in enumerate(configs):
                # must match the replica naming used in get_runtime_config
                name = (
                    f'{worker_service.name}/rep-{rep_id}'
                    if len(configs) > 1
                    else worker_service.name
                )
                services.append((to_compatible_name(name), config))
        return services