[docs]classK8sDeploymentConfig:""" Class that implements the output of configuration files for Kubernetes for a given Deployment. """class_K8sDeployment:def__init__(self,name:str,version:str,pod_type:PodRoleType,jina_deployment_name:str,shard_id:Optional[int],common_args:Union['Namespace',Dict],deployment_args:Union['Namespace',Dict],k8s_namespace:str,k8s_deployments_addresses:Optional[Dict[str,List[str]]]=None,):self.name=nameself.dns_name=to_compatible_name(name)self.version=versionself.pod_type=pod_typeself.jina_deployment_name=jina_deployment_nameself.shard_id=shard_idself.common_args=common_argsself.deployment_args=deployment_argsself.num_replicas=getattr(self.deployment_args,'replicas',1)self.k8s_namespace=k8s_namespaceself.k8s_deployments_addresses=k8s_deployments_addressesdefget_gateway_yamls(self,)->List[Dict]:importosimage_name=os.getenv('JINA_GATEWAY_IMAGE',f'jinaai/jina:{self.version}-py38-standard')cargs=copy.copy(self.deployment_args)cargs.deployments_addresses=self.k8s_deployments_addressesfromjina.helperimportArgNamespacefromjina.parsersimportset_gateway_parsertaboo={'uses_with','uses_metas','volumes','uses_before','uses_after','workspace','workspace_id','upload_files','noblock_on_start','env',}non_defaults=ArgNamespace.get_non_defaults_args(cargs,set_gateway_parser(),taboo=taboo)_args=ArgNamespace.kwargs2list(non_defaults)container_args=['gateway']+_argsreturnkubernetes_deployment.get_deployment_yamls(self.dns_name,namespace=self.k8s_namespace,image_name=image_name,container_cmd='["jina"]',container_args=f'{container_args}',replicas=1,pull_policy='IfNotPresent',jina_deployment_name='gateway',pod_type=self.pod_type,port=self.common_args.port,env=cargs.env,monitoring=self.common_args.monitoring,port_monitoring=self.common_args.port_monitoring,)def_get_image_name(self,uses:Optional[str]):importosimage_name=os.getenv('JINA_GATEWAY_IMAGE',f'jinaai/jina:{self.version}-py38-standard')ifusesisnotNoneanduses!=__default_executor__:image_name=get_image_name(uses)returnimage_namedef_get_container_args(self,cargs,pod_type):uses_metas=cargs.uses_metasor{}uses_with=self.deployment_args.uses_withifcargs.uses!=__default_executor__:cargs.uses='config.yml'returnconstruct_runtime_container_args(cargs,uses_metas,uses_with,pod_type)defget_runtime_yamls(self,)->List[Dict]:cargs=copy.copy(self.deployment_args)image_name=self._get_image_name(cargs.uses)image_name_uses_before=(self._get_image_name(cargs.uses_before)ifhasattr(cargs,'uses_before')andcargs.uses_beforeelseNone)image_name_uses_after=(self._get_image_name(cargs.uses_after)ifhasattr(cargs,'uses_after')andcargs.uses_afterelseNone)container_args=self._get_container_args(cargs,pod_type=self.pod_type)container_args_uses_before=Noneifgetattr(cargs,'uses_before',False):uses_before_cargs=copy.copy(cargs)uses_before_cargs.uses=cargs.uses_beforeuses_before_cargs.name=f'{self.common_args.name}/uses-before'uses_before_cargs.port=GrpcConnectionPool.K8S_PORT_USES_BEFOREuses_before_cargs.uses_before_address=Noneuses_before_cargs.uses_after_address=Noneuses_before_cargs.uses_before=Noneuses_before_cargs.uses_after=Noneuses_before_cargs.uses_with=Noneuses_before_cargs.uses_metas=Noneuses_before_cargs.connection_list=Noneuses_before_cargs.runtime_cls='WorkerRuntime'uses_before_cargs.pod_role=PodRoleType.WORKERuses_before_cargs.polling=Noneuses_before_cargs.env=Nonecontainer_args_uses_before=self._get_container_args(uses_before_cargs,PodRoleType.WORKER)container_args_uses_after=Noneifgetattr(cargs,'uses_after',False):uses_after_cargs=copy.copy(cargs)uses_after_cargs.uses=cargs.uses_afteruses_after_cargs.name=f'{self.common_args.name}/uses-after'uses_after_cargs.port=GrpcConnectionPool.K8S_PORT_USES_AFTERuses_after_cargs.uses_before_address=Noneuses_after_cargs.uses_after_address=Noneuses_after_cargs.uses_before=Noneuses_after_cargs.uses_after=Noneuses_after_cargs.uses_with=Noneuses_after_cargs.uses_metas=Noneuses_after_cargs.connection_list=Noneuses_after_cargs.runtime_cls='WorkerRuntime'uses_after_cargs.pod_role=PodRoleType.WORKERuses_after_cargs.polling=Noneuses_after_cargs.env=Nonecontainer_args_uses_after=self._get_container_args(uses_after_cargs,PodRoleType.WORKER)returnkubernetes_deployment.get_deployment_yamls(self.dns_name,namespace=self.k8s_namespace,image_name=image_name,image_name_uses_after=image_name_uses_after,image_name_uses_before=image_name_uses_before,container_cmd='["jina"]',container_cmd_uses_before='["jina"]',container_cmd_uses_after='["jina"]',container_args=f'{container_args}',container_args_uses_before=container_args_uses_before,container_args_uses_after=container_args_uses_after,replicas=self.num_replicas,pull_policy='IfNotPresent',jina_deployment_name=self.jina_deployment_name,pod_type=self.pod_type,shard_id=self.shard_id,env=cargs.env,gpus=cargs.gpusifhasattr(cargs,'gpus')elseNone,monitoring=cargs.monitoring,port_monitoring=cargs.port_monitoring,)def__init__(self,args:Union['Namespace',Dict],k8s_namespace:Optional[str]=None,k8s_deployments_addresses:Optional[Dict[str,List[str]]]=None,):# External Deployments should be ignored in a K8s based Flowassertnot(hasattr(args,'external')andargs.external)ifnotvalidate_uses(args.uses):raiseNoContainerizedError(f'Executor "{args.uses}" is not valid to be used in K8s. ''You need to use a containerized Executor. You may check `jina hub --help` to see how Jina Hub can help you building containerized Executors.')self.k8s_namespace=k8s_namespaceself.k8s_deployments_addresses=k8s_deployments_addressesself.head_deployment=Noneself.args=copy.copy(args)ifk8s_namespaceisnotNone:# otherwise it will remain with the one from the original Deploymentself.args.k8s_namespace=k8s_namespaceself.name=self.args.nameself.deployment_args=self._get_deployment_args(self.args)ifself.deployment_args['head_deployment']isnotNone:self.head_deployment=self._K8sDeployment(name=self.deployment_args['head_deployment'].name,version=get_base_executor_version(),shard_id=None,jina_deployment_name=self.name,common_args=self.args,deployment_args=self.deployment_args['head_deployment'],pod_type=PodRoleType.HEAD,k8s_namespace=self.k8s_namespace,k8s_deployments_addresses=self.k8s_deployments_addresses,)self.worker_deployments=[]deployment_args=self.deployment_args['deployments']fori,argsinenumerate(deployment_args):name=f'{self.name}-{i}'iflen(deployment_args)>1elsef'{self.name}'self.worker_deployments.append(self._K8sDeployment(name=name,version=get_base_executor_version(),shard_id=i,common_args=self.args,deployment_args=args,pod_type=PodRoleType.WORKERifname!='gateway'elsePodRoleType.GATEWAY,jina_deployment_name=self.name,k8s_namespace=self.k8s_namespace,k8s_deployments_addresses=self.k8s_deployments_addressesifname=='gateway'elseNone,))def_get_deployment_args(self,args):parsed_args={'head_deployment':None,'deployments':[],}shards=getattr(args,'shards',1)uses_before=getattr(args,'uses_before',None)uses_after=getattr(args,'uses_after',None)ifargs.name!='gateway':# head deployment only exists for sharded deploymentsifshards>1:parsed_args['head_deployment']=BaseDeployment._copy_to_head_args(self.args)parsed_args['head_deployment'].gpus=Noneparsed_args['head_deployment'].port=GrpcConnectionPool.K8S_PORTparsed_args['head_deployment'].uses=Noneparsed_args['head_deployment'].uses_metas=Noneparsed_args['head_deployment'].uses_with=Noneimportjsonconnection_list={}foriinrange(shards):name=(f'{to_compatible_name(self.name)}-{i}'ifshards>1elsef'{to_compatible_name(self.name)}')connection_list[str(i)]=f'{name}.{self.k8s_namespace}.svc:{GrpcConnectionPool.K8S_PORT}'parsed_args['head_deployment'].connection_list=json.dumps(connection_list)ifuses_before:parsed_args['head_deployment'].uses_before_address=(f'127.0.0.1:{GrpcConnectionPool.K8S_PORT_USES_BEFORE}')ifuses_after:parsed_args['head_deployment'].uses_after_address=(f'127.0.0.1:{GrpcConnectionPool.K8S_PORT_USES_AFTER}')foriinrange(shards):cargs=copy.deepcopy(args)cargs.shard_id=icargs.uses_before=Nonecargs.uses_after=Noneifargs.name!='gateway':cargs.port=GrpcConnectionPool.K8S_PORTcargs.uses_before_address=Nonecargs.uses_after_address=Noneifshards>1:cargs.name=f'{cargs.name}-{i}'ifargs.name=='gateway':cargs.pod_role=PodRoleType.GATEWAYparsed_args['deployments'].append(cargs)returnparsed_args
[docs]defto_k8s_yaml(self,)->List[Tuple[str,List[Dict]]]:""" Return a list of dictionary configurations. One for each deployment in this Deployment .. # noqa: DAR201 .. # noqa: DAR101 """ifself.name=='gateway':return[('gateway',self.worker_deployments[0].get_gateway_yamls(),)]else:deployments=[]ifself.head_deployment:deployments.append(self.head_deployment)deployments.extend(self.worker_deployments)return[(deployment.dns_name,deployment.get_runtime_yamls(),)fordeploymentindeployments]