[docs]classDockerComposeConfig:""" Class that implements the output of configuration files for docker-compose for a given Deployment. """class_DockerComposeService:def__init__(self,name:str,version:str,pod_type:PodRoleType,jina_deployment_name:str,shard_id:Optional[int],common_args:Union['Namespace',Dict],service_args:Union['Namespace',Dict],deployments_addresses:Optional[Dict[str,List[str]]]=None,):self.name=nameself.compatible_name=to_compatible_name(self.name)self.version=versionself.pod_type=pod_typeself.jina_deployment_name=jina_deployment_nameself.shard_id=shard_idself.common_args=common_argsself.service_args=service_argsself.num_replicas=getattr(self.service_args,'replicas',1)self.deployments_addresses=deployments_addressesdefget_gateway_config(self,)->Dict:importosimage_name=os.getenv('JINA_GATEWAY_IMAGE',f'jinaai/jina:{self.version}-py38-standard')cargs=copy.copy(self.service_args)cargs.deployments_addresses=self.deployments_addressesfromjina.helperimportArgNamespacefromjina.parsersimportset_gateway_parsertaboo={'uses_with','uses_metas','volumes','uses_before','uses_after','workspace','workspace_id','upload_files','noblock_on_start','env',}non_defaults=ArgNamespace.get_non_defaults_args(cargs,set_gateway_parser(),taboo=taboo)_args=ArgNamespace.kwargs2list(non_defaults)container_args=['gateway']+_argsprotocol=str(non_defaults.get('protocol','grpc')).lower()ports=[f'{cargs.port}']+([f'{cargs.port_monitoring}']ifcargs.monitoringelse[])envs=[f'JINA_LOG_LEVEL={os.getenv("JINA_LOG_LEVEL","INFO")}']ifcargs.env:fork,vincargs.env.items():envs.append(f'{k}={v}')return{'image':image_name,'entrypoint':['jina'],'command':container_args,'expose':ports,'ports':[f'{_port}:{_port}'for_portinports],'healthcheck':{'test':f'python -m jina.resources.health_check.gateway localhost:{cargs.port}{protocol}','interval':'2s',},'environment':envs,}def_get_image_name(self,uses:Optional[str]):importosimage_name=os.getenv('JINA_GATEWAY_IMAGE',f'jinaai/jina:{self.version}-py38-standard')ifusesisnotNoneanduses!=__default_executor__:image_name=get_image_name(uses)returnimage_namedef_get_container_args(self,cargs):uses_metas=cargs.uses_metasor{}uses_with=self.service_args.uses_withifcargs.uses!=__default_executor__:cargs.uses='config.yml'returnconstruct_runtime_container_args(cargs,uses_metas,uses_with,self.pod_type)def_update_config_with_volumes(self,config,auto_volume=True):ifself.service_args.volumes:# respect custom volume definitionconfig['volumes']=self.service_args.volumesreturnconfigifnotauto_volume:returnconfig# if no volume is given, create default volume(generated_volumes,workspace_in_container,)=generate_default_volume_and_workspace(workspace_id=self.service_args.workspace_id)config['volumes']=generated_volumesif('--workspace'notinconfig['command']):# set workspace only of not already givenconfig['command'].append('--workspace')config['command'].append(workspace_in_container)returnconfigdefget_runtime_config(self)->List[Dict]:# One Dict for replicareplica_configs=[]fori_repinrange(self.service_args.replicas):cargs=copy.copy(self.service_args)cargs.name=(f'{cargs.name}/rep-{i_rep}'ifself.service_args.replicas>1elsecargs.name)env=cargs.envimage_name=self._get_image_name(cargs.uses)container_args=self._get_container_args(cargs)config={'image':image_name,'entrypoint':['jina'],'command':container_args,'healthcheck':{'test':f'python -m jina.resources.health_check.pod localhost:{cargs.port}','interval':'2s',},'environment':[f'JINA_LOG_LEVEL={os.getenv("JINA_LOG_LEVEL","INFO")}'],}ifcargs.gpus:try:count=int(cargs.gpus)exceptValueError:count=cargs.gpusconfig['deploy']={'resources':{'reservations':{'devices':[{'driver':'nvidia','count':count,'capabilities':['gpu'],}]}}}ifcargs.monitoring:config['expose']=[cargs.port_monitoring]config['ports']=[f'{cargs.port_monitoring}:{cargs.port_monitoring}']ifenvisnotNone:config['environment']=[f'{k}={v}'fork,vinenv.items()]ifself.service_args.pod_role==PodRoleType.WORKER:config=self._update_config_with_volumes(config,auto_volume=notself.common_args.disable_auto_volume)replica_configs.append(config)returnreplica_configsdef__init__(self,args:Union['Namespace',Dict],deployments_addresses:Optional[Dict[str,List[str]]]=None,):ifnotvalidate_uses(args.uses):raiseNoContainerizedError(f'Executor "{args.uses}" is not valid to be used in docker-compose. ''You need to use a containerized Executor. You may check `jina hub --help` to see how Jina Hub can help you building containerized Executors.')self.deployments_addresses=deployments_addressesself.head_service=Noneself.uses_before_service=Noneself.uses_after_service=Noneself.args=copy.copy(args)self.name=self.args.nameself.services_args=self._get_services_args(self.args)ifself.services_args['head_service']isnotNone:self.head_service=self._DockerComposeService(name=self.services_args['head_service'].name,version=get_base_executor_version(),shard_id=None,jina_deployment_name=self.name,common_args=self.args,service_args=self.services_args['head_service'],pod_type=PodRoleType.HEAD,deployments_addresses=None,)ifself.services_args['uses_before_service']isnotNone:self.uses_before_service=self._DockerComposeService(name=self.services_args['uses_before_service'].name,version=get_base_executor_version(),shard_id=None,jina_deployment_name=self.name,common_args=self.args,service_args=self.services_args['uses_before_service'],pod_type=PodRoleType.WORKER,deployments_addresses=None,)ifself.services_args['uses_after_service']isnotNone:self.uses_after_service=self._DockerComposeService(name=self.services_args['uses_after_service'].name,version=get_base_executor_version(),shard_id=None,jina_deployment_name=self.name,common_args=self.args,service_args=self.services_args['uses_after_service'],pod_type=PodRoleType.WORKER,deployments_addresses=None,)self.worker_services=[]services_args=self.services_args['services']fori,argsinenumerate(services_args):name=f'{self.name}-{i}'iflen(services_args)>1elsef'{self.name}'self.worker_services.append(self._DockerComposeService(name=name,version=get_base_executor_version(),shard_id=i,common_args=self.args,service_args=args,pod_type=PodRoleType.WORKERifname!='gateway'elsePodRoleType.GATEWAY,jina_deployment_name=self.name,deployments_addresses=self.deployments_addressesifname=='gateway'elseNone,))def_get_services_args(self,args):parsed_args={'head_service':None,'uses_before_service':None,'uses_after_service':None,'services':[],}shards=getattr(args,'shards',1)replicas=getattr(args,'replicas',1)uses_before=getattr(args,'uses_before',None)uses_after=getattr(args,'uses_after',None)ifargs.name!='gateway'andshards>1:parsed_args['head_service']=BaseDeployment._copy_to_head_args(self.args)parsed_args['head_service'].port=portparsed_args['head_service'].uses=Noneparsed_args['head_service'].uses_metas=Noneparsed_args['head_service'].uses_with=Noneparsed_args['head_service'].uses_before=Noneparsed_args['head_service'].uses_after=None# if the k8s connection pool is disabled, the connection pool is managed manuallyimportjsonconnection_list={}forshard_idinrange(shards):shard_name=f'{self.name}-{shard_id}'ifshards>1elsef'{self.name}'connection_list[str(shard_id)]=[]fori_repinrange(replicas):replica_name=(f'{shard_name}/rep-{i_rep}'ifreplicas>1elseshard_name)connection_list[str(shard_id)].append(f'{to_compatible_name(replica_name)}:{port}')parsed_args['head_service'].connection_list=json.dumps(connection_list)ifuses_beforeandshards>1:uses_before_cargs=copy.deepcopy(args)uses_before_cargs.shard_id=0uses_before_cargs.replicas=1uses_before_cargs.name=f'{args.name}/uses-before'uses_before_cargs.uses=args.uses_beforeuses_before_cargs.uses_before=Noneuses_before_cargs.uses_after=Noneuses_before_cargs.uses_with=Noneuses_before_cargs.uses_metas=Noneuses_before_cargs.env=Noneuses_before_cargs.port=portuses_before_cargs.uses_before_address=Noneuses_before_cargs.uses_after_address=Noneuses_before_cargs.connection_list=Noneuses_before_cargs.pod_role=PodRoleType.WORKERuses_before_cargs.polling=Noneparsed_args['uses_before_service']=uses_before_cargsparsed_args['head_service'].uses_before_address=(f'{to_compatible_name(uses_before_cargs.name)}:{uses_before_cargs.port}')ifuses_afterandshards>1:uses_after_cargs=copy.deepcopy(args)uses_after_cargs.shard_id=0uses_after_cargs.replicas=1uses_after_cargs.name=f'{args.name}/uses-after'uses_after_cargs.uses=args.uses_afteruses_after_cargs.uses_before=Noneuses_after_cargs.uses_after=Noneuses_after_cargs.uses_with=Noneuses_after_cargs.uses_metas=Noneuses_after_cargs.env=Noneuses_after_cargs.port=portuses_after_cargs.uses_before_address=Noneuses_after_cargs.uses_after_address=Noneuses_after_cargs.connection_list=Noneuses_after_cargs.pod_role=PodRoleType.WORKERuses_after_cargs.polling=Noneparsed_args['uses_after_service']=uses_after_cargsparsed_args['head_service'].uses_after_address=(f'{to_compatible_name(uses_after_cargs.name)}:{uses_after_cargs.port}')foriinrange(shards):cargs=copy.deepcopy(args)cargs.shard_id=icargs.uses_before=Nonecargs.uses_after=Nonecargs.uses_before_address=Nonecargs.uses_after_address=Noneifshards>1:cargs.name=f'{cargs.name}-{i}'ifargs.name=='gateway':cargs.pod_role=PodRoleType.GATEWAYelse:cargs.port=portparsed_args['services'].append(cargs)returnparsed_args
[docs]defto_docker_compose_config(self,)->List[Tuple[str,Dict]]:""" Return a list of dictionary configurations. One for each service in this Deployment .. # noqa: DAR201 .. # noqa: DAR101 """ifself.name=='gateway':return[('gateway',self.worker_services[0].get_gateway_config(),)]else:services=[]ifself.head_serviceisnotNone:services.append((self.head_service.compatible_name,self.head_service.get_runtime_config()[0],))ifself.uses_before_serviceisnotNone:services.append((self.uses_before_service.compatible_name,self.uses_before_service.get_runtime_config()[0],))ifself.uses_after_serviceisnotNone:services.append((self.uses_after_service.compatible_name,self.uses_after_service.get_runtime_config()[0],))forworker_serviceinself.worker_services:configs=worker_service.get_runtime_config()forrep_id,configinenumerate(configs):name=(f'{worker_service.name}/rep-{rep_id}'iflen(configs)>1elseworker_service.name)services.append((to_compatible_name(name),config))returnservices