class DockerComposeConfig:
    """
    Class that implements the output of configuration files for docker-compose for a given Deployment.

    A Deployment is expanded into one docker-compose service per container: an optional
    head, optional ``uses_before`` / ``uses_after`` containers and one service per
    shard replica. The gateway is handled as a special Deployment named ``'gateway'``.
    """

    class _DockerComposeService:
        """Describes one logical service (gateway, head, uses-before/after or shard) of a Deployment."""

        def __init__(
            self,
            name: str,
            version: str,
            pod_type: 'PodRoleType',
            jina_deployment_name: str,
            shard_id: Optional[int],
            common_args: Union['Namespace', Dict],
            service_args: Union['Namespace', Dict],
            deployments_addresses: Optional[Dict[str, List[str]]] = None,
        ):
            """
            Store the parameters needed to render this service.

            :param name: name of the service; a docker-compose compatible variant is derived from it
            :param version: version used to build the tag of the default ``jinaai/jina`` image
            :param pod_type: role of the Pod run by this service
            :param jina_deployment_name: name of the Deployment this service belongs to
            :param shard_id: index of the shard, ``None`` for services that are not shards
            :param common_args: arguments of the whole Deployment
            :param service_args: arguments specific to this service
            :param deployments_addresses: addresses of the Deployments, handed to the gateway arguments
            """
            self.name = name
            self.compatible_name = to_compatible_name(self.name)
            self.version = version
            self.pod_type = pod_type
            self.jina_deployment_name = jina_deployment_name
            self.shard_id = shard_id
            self.common_args = common_args
            self.service_args = service_args
            # Falls back to a single replica when the arguments carry no `replicas` attribute.
            self.num_replicas = getattr(self.service_args, 'replicas', 1)
            self.deployments_addresses = deployments_addresses

        def _base_image_name(self, flavour: str) -> str:
            """
            Return the default ``jinaai/jina`` image for this service.

            :param flavour: tag suffix appended to the version, e.g. ``'py38-perf'``
            :return: the ``test-pip`` image when the environment variable
                ``JINA_K8S_USE_TEST_PIP`` is set (to any value), otherwise the versioned image
            """
            import os

            if os.getenv('JINA_K8S_USE_TEST_PIP') is not None:
                return 'jinaai/jina:test-pip'
            return f'jinaai/jina:{self.version}-{flavour}'

        @staticmethod
        def _gateway_healthcheck_cmd(port, protocol: str) -> str:
            """
            Build the command used as docker-compose healthcheck of the gateway.

            :param port: port the gateway is exposed on
            :param protocol: lower-cased protocol name passed to the health check
            :return: the command line; the address and the protocol are two separate,
                space-delimited arguments
            """
            # The space between address and protocol matters: without it both values
            # are fused into a single argument such as `localhost:8080grpc`.
            return (
                'python -m jina.resources.health_check.gateway '
                f'localhost:{port} {protocol}'
            )

        def get_gateway_config(
            self,
        ) -> Dict:
            """
            Build the docker-compose service definition of the gateway.

            :return: dictionary with the image, entrypoint, command, exposed/published
                port and healthcheck of the gateway container
            """
            image_name = self._base_image_name('py38-standard')

            # Work on a shallow copy so that the stored service arguments stay untouched.
            cargs = copy.copy(self.service_args)
            cargs.deployments_addresses = self.deployments_addresses
            cargs.env = None

            from jina.helper import ArgNamespace
            from jina.parsers import set_gateway_parser

            # Arguments excluded when computing the non-default arguments of the command line.
            taboo = {
                'uses_with',
                'uses_metas',
                'volumes',
                'uses_before',
                'uses_after',
                'workspace',
                'workspace_id',
                'upload_files',
                'noblock_on_start',
            }

            non_defaults = ArgNamespace.get_non_defaults_args(
                cargs, set_gateway_parser(), taboo=taboo
            )
            _args = ArgNamespace.kwargs2list(non_defaults)
            container_args = ['gateway'] + _args

            protocol = str(non_defaults.get('protocol', 'grpc')).lower()

            return {
                'image': image_name,
                'entrypoint': ['jina'],
                'command': container_args,
                'expose': [
                    f'{cargs.port}',
                ],
                'ports': [
                    f'{cargs.port}:{cargs.port}',
                ],
                'healthcheck': {
                    'test': self._gateway_healthcheck_cmd(cargs.port, protocol),
                    'interval': '2s',
                },
            }

        def _get_image_name(self, uses: Optional[str]):
            """
            Resolve the image to run for the given `uses`.

            :param uses: the `uses` argument of the service, may be ``None``
            :return: the default ``jinaai/jina`` perf image when `uses` is ``None`` or the
                default Executor, otherwise the result of ``get_image_name(uses)``
            """
            if uses is not None and uses != __default_executor__:
                return get_image_name(uses)
            return self._base_image_name('py38-perf')

        def _get_container_args(self, cargs):
            """
            Build the command line arguments of the container.

            Note that `cargs.uses` is overwritten with ``'config.yml'`` when it is not the
            default Executor, so callers must read the original value beforehand.

            :param cargs: arguments of the container; modified in place
            :return: the result of ``construct_runtime_container_args``
            """
            uses_metas = cargs.uses_metas or {}
            uses_with = self.service_args.uses_with
            if cargs.uses != __default_executor__:
                cargs.uses = 'config.yml'
            return construct_runtime_container_args(
                cargs, uses_metas, uses_with, self.pod_type
            )

        def get_runtime_config(
            self,
        ) -> List[Dict]:
            """
            Build the docker-compose service definitions of this service.

            :return: a list with one dictionary per replica
            """
            replica_configs = []
            for i_rep in range(self.num_replicas):
                cargs = copy.copy(self.service_args)
                # Replicas only get a suffix when there is more than one of them.
                cargs.name = (
                    f'{cargs.name}/rep-{i_rep}'
                    if self.num_replicas > 1
                    else cargs.name
                )

                env = cargs.env
                # The image must be resolved before `_get_container_args`,
                # which overwrites `cargs.uses`.
                image_name = self._get_image_name(cargs.uses)
                container_args = self._get_container_args(cargs)
                config = {
                    'image': image_name,
                    'entrypoint': ['jina'],
                    'command': container_args,
                    'healthcheck': {
                        'test': f'python -m jina.resources.health_check.pod localhost:{cargs.port}',
                        'interval': '2s',
                    },
                }
                if env is not None:
                    config['environment'] = [f'{k}={v}' for k, v in env.items()]
                replica_configs.append(config)
            return replica_configs

    def __init__(
        self,
        args: Union['Namespace', Dict],
        deployments_addresses: Optional[Dict[str, List[str]]] = None,
    ):
        """
        Create the services of a Deployment from its arguments.

        :param args: arguments of the Deployment
        :param deployments_addresses: addresses of the Deployments, only forwarded to the gateway service
        :raises NoContainerizedError: when ``validate_uses`` rejects ``args.uses``
        """
        if not validate_uses(args.uses):
            raise NoContainerizedError(
                f'Executor "{args.uses}" is not valid to be used in docker-compose. '
                'You need to use a containerized Executor. You may check `jina hub --help` to see how Jina Hub can help you building containerized Executors.'
            )
        self.deployments_addresses = deployments_addresses
        self.head_service = None
        self.uses_before_service = None
        self.uses_after_service = None
        self.args = copy.copy(args)
        self.name = self.args.name
        self.services_args = self._get_services_args(self.args)

        # Resolved once and shared by every service of this Deployment.
        # NOTE(review): assumes the helper returns the same value on each call - confirm.
        version = get_base_executor_version()

        if self.services_args['head_service'] is not None:
            self.head_service = self._build_aux_service(
                self.services_args['head_service'], PodRoleType.HEAD, version
            )

        if self.services_args['uses_before_service'] is not None:
            self.uses_before_service = self._build_aux_service(
                self.services_args['uses_before_service'], PodRoleType.WORKER, version
            )

        if self.services_args['uses_after_service'] is not None:
            self.uses_after_service = self._build_aux_service(
                self.services_args['uses_after_service'], PodRoleType.WORKER, version
            )

        self.worker_services = []
        services_args = self.services_args['services']
        for i, args in enumerate(services_args):
            # Shards are only numbered when there is more than one.
            name = f'{self.name}-{i}' if len(services_args) > 1 else f'{self.name}'
            is_gateway = name == 'gateway'
            self.worker_services.append(
                self._DockerComposeService(
                    name=name,
                    version=version,
                    shard_id=i,
                    common_args=self.args,
                    service_args=args,
                    pod_type=PodRoleType.GATEWAY if is_gateway else PodRoleType.WORKER,
                    jina_deployment_name=self.name,
                    deployments_addresses=self.deployments_addresses
                    if is_gateway
                    else None,
                )
            )

    def _build_aux_service(self, service_args, pod_type, version):
        """
        Create the service object of a head, uses-before or uses-after container.

        :param service_args: arguments of the service, its `name` becomes the service name
        :param pod_type: role of the Pod run by the service
        :param version: version used for the default image tag
        :return: the service object, not bound to any shard and without deployments addresses
        """
        return self._DockerComposeService(
            name=service_args.name,
            version=version,
            shard_id=None,
            jina_deployment_name=self.name,
            common_args=self.args,
            service_args=service_args,
            pod_type=pod_type,
            deployments_addresses=None,
        )

    @staticmethod
    def _get_side_service_args(args, name_suffix: str, uses):
        """
        Derive the arguments of a uses-before / uses-after container.

        :param args: arguments of the Deployment, left untouched (a deep copy is modified)
        :param name_suffix: suffix appended to the Deployment name, e.g. ``'uses-before'``
        :param uses: value assigned to `uses` of the new container
        :return: arguments of a single-replica worker listening on the module-level `port`
        """
        cargs = copy.deepcopy(args)
        cargs.shard_id = 0
        cargs.replicas = 1
        cargs.name = f'{args.name}/{name_suffix}'
        cargs.uses = uses
        cargs.uses_before = None
        cargs.uses_after = None
        cargs.uses_with = None
        cargs.uses_metas = None
        cargs.env = None
        cargs.port = port
        cargs.uses_before_address = None
        cargs.uses_after_address = None
        cargs.connection_list = None
        cargs.pod_role = PodRoleType.WORKER
        cargs.polling = None
        return cargs

    def _get_services_args(self, args):
        """
        Split the arguments of the Deployment into the arguments of each service.

        :param args: arguments of the Deployment
        :return: dictionary with the keys ``'head_service'``, ``'uses_before_service'``,
            ``'uses_after_service'`` (arguments or ``None``) and ``'services'``
            (list with the arguments of every shard)
        """
        parsed_args = {
            'head_service': None,
            'uses_before_service': None,
            'uses_after_service': None,
            'services': [],
        }
        shards = getattr(args, 'shards', 1)
        replicas = getattr(args, 'replicas', 1)
        uses_before = getattr(args, 'uses_before', None)
        uses_after = getattr(args, 'uses_after', None)

        # Every Deployment except the gateway gets a head.
        if args.name != 'gateway':
            head_args = BaseDeployment._copy_to_head_args(self.args)
            head_args.port = port
            head_args.uses = None
            head_args.uses_metas = None
            head_args.uses_with = None
            head_args.uses_before = None
            head_args.uses_after = None
            head_args.env = None
            parsed_args['head_service'] = head_args

            # The head is told explicitly where every replica of every shard lives:
            # a JSON mapping from shard id to the list of `<service name>:<port>`.
            import json

            connection_list = {}
            for shard_id in range(shards):
                shard_name = (
                    f'{self.name}-{shard_id}' if shards > 1 else f'{self.name}'
                )
                connection_list[str(shard_id)] = []
                for i_rep in range(replicas):
                    replica_name = (
                        f'{shard_name}/rep-{i_rep}' if replicas > 1 else shard_name
                    )
                    connection_list[str(shard_id)].append(
                        f'{to_compatible_name(replica_name)}:{port}'
                    )

            head_args.connection_list = json.dumps(connection_list)

        if uses_before:
            uses_before_cargs = self._get_side_service_args(
                args, 'uses-before', args.uses_before
            )
            parsed_args['uses_before_service'] = uses_before_cargs
            parsed_args['head_service'].uses_before_address = (
                f'{to_compatible_name(uses_before_cargs.name)}:{uses_before_cargs.port}'
            )

        if uses_after:
            uses_after_cargs = self._get_side_service_args(
                args, 'uses-after', args.uses_after
            )
            parsed_args['uses_after_service'] = uses_after_cargs
            parsed_args['head_service'].uses_after_address = (
                f'{to_compatible_name(uses_after_cargs.name)}:{uses_after_cargs.port}'
            )

        for i in range(shards):
            cargs = copy.deepcopy(args)
            cargs.shard_id = i
            cargs.uses_before = None
            cargs.uses_after = None
            cargs.k8s_connection_pool = False
            cargs.uses_before_address = None
            cargs.uses_after_address = None
            if shards > 1:
                cargs.name = f'{cargs.name}-{i}'
            if args.name == 'gateway':
                cargs.pod_role = PodRoleType.GATEWAY
            else:
                # Non-gateway containers all listen on the same module-level port.
                cargs.port = port
            parsed_args['services'].append(cargs)

        return parsed_args

    def to_docker_compose_config(
        self,
    ) -> List[Tuple[str, Dict]]:
        """
        Return a list of dictionary configurations. One for each service in this Deployment.

        :return: list of ``(service name, service configuration)`` tuples; for the gateway a
            single entry named ``'gateway'``, otherwise the head, uses-before and uses-after
            services (when present) followed by one entry per replica of every shard
        """
        if self.name == 'gateway':
            return [
                (
                    'gateway',
                    self.worker_services[0].get_gateway_config(),
                )
            ]

        services = []
        # Head, uses-before and uses-after contribute only their first configuration.
        for aux_service in (
            self.head_service,
            self.uses_before_service,
            self.uses_after_service,
        ):
            if aux_service is not None:
                services.append(
                    (
                        aux_service.compatible_name,
                        aux_service.get_runtime_config()[0],
                    )
                )

        for worker_service in self.worker_services:
            configs = worker_service.get_runtime_config()
            for rep_id, config in enumerate(configs):
                name = (
                    f'{worker_service.name}/rep-{rep_id}'
                    if len(configs) > 1
                    else worker_service.name
                )
                services.append((to_compatible_name(name), config))
        return services