[docs]classBaseDeployment(ExitStack):"""A BaseDeployment is an immutable set of pods. Internally, the pods can run with the process/thread backend. They can be also run in their own containers on remote machines. """
[docs]@abstractmethoddefstart(self)->'BaseDeployment':"""Start to run all :class:`Pod` in this BaseDeployment. .. note:: If one of the :class:`Pod` fails to start, make sure that all of them are properly closed. """...
@staticmethoddef_set_upload_files(args):# sets args.upload_files at the deployment level so that pods inherit from it.# all pods work under one remote workspace, hence important to have upload_files set for alldefvalid_path(path):try:complete_path(path)returnTrueexceptFileNotFoundError:returnFalse_upload_files=set()forparamin['uses','uses_before','uses_after']:param_value=getattr(args,param,None)ifparam_valueandvalid_path(param_value):_upload_files.add(param_value)ifgetattr(args,'py_modules',None):_upload_files.update({py_moduleforpy_moduleinargs.py_modulesifvalid_path(py_module)})ifgetattr(args,'upload_files',None):_upload_files.update({upload_fileforupload_fileinargs.upload_filesifvalid_path(upload_file)})returnlist(_upload_files)@propertydefrole(self)->'DeploymentRoleType':"""Return the role of this :class:`BaseDeployment`. .. # noqa: DAR201 """returnself.args.deployment_role@propertydefname(self)->str:"""The name of this :class:`BaseDeployment`. .. # noqa: DAR201 """returnself.args.name@propertydefhead_host(self)->str:"""Get the host of the HeadPod of this deployment .. # noqa: DAR201 """returnself.head_args.hostifself.head_argselseNone@propertydefhead_port(self):"""Get the port of the HeadPod of this deployment .. 
# noqa: DAR201 """returnself.head_args.portifself.head_argselseNonedef__enter__(self)->'BaseDeployment':withCatchAllCleanupContextManager(self):returnself.start()@staticmethoddef_copy_to_head_args(args:Namespace)->Namespace:""" Set the outgoing args of the head router :param args: basic arguments :return: enriched head arguments """_head_args=copy.deepcopy(args)_head_args.polling=args.pollingifnothasattr(args,'port')ornotargs.port:_head_args.port=helper.random_port()else:_head_args.port=args.port_head_args.uses=args.uses_head_args.pod_role=PodRoleType.HEAD_head_args.runtime_cls='HeadRuntime'_head_args.replicas=1ifargs.name:_head_args.name=f'{args.name}/head'else:_head_args.name=f'head'return_head_args@property@abstractmethoddefhead_args(self)->Namespace:"""Get the arguments for the `head` of this BaseDeployment. .. # noqa: DAR201 """...
[docs]@abstractmethoddefjoin(self):"""Wait until all deployment and pods exit."""...
@property@abstractmethoddef_mermaid_str(self)->List[str]:"""String that will be used to represent the Deployment graphically when `Flow.plot()` is invoked .. # noqa: DAR201 """...@propertydefdeployments(self)->List[Dict]:"""Get deployments of the deployment. The BaseDeployment just gives one deployment. :return: list of deployments """return[{'name':self.name,'head_host':self.head_host,'head_port':self.head_port,}]
class Deployment(BaseDeployment):
    """A Deployment is an immutable set of pods, which run in replicas. They share the same input and output socket.
    Internally, the pods can run with the process/thread backend. They can be also run in their own containers

    :param args: arguments parsed from the CLI
    :param needs: deployments names of preceding deployments, the output of these deployments are going into the input of this deployment
    """

    class _ReplicaSet:
        """Manages the group of replica pods of one shard."""

        def __init__(
            self,
            deployment_args: Namespace,
            args: List[Namespace],
            head_pod,
        ):
            self.deployment_args = copy.copy(deployment_args)
            self.args = args
            # every replica in the set shares the same shard_id
            self.shard_id = args[0].shard_id
            self._pods = []
            self.head_pod = head_pod

        @property
        def is_ready(self):
            # ready only when every replica pod reports ready
            return all(p.is_ready.is_set() for p in self._pods)

        def clear_pods(self):
            self._pods.clear()

        @property
        def num_pods(self):
            return len(self._pods)

        def join(self):
            for pod in self._pods:
                pod.join()

        def wait_start_success(self):
            for pod in self._pods:
                pod.wait_start_success()

        def __enter__(self):
            for _args in self.args:
                if getattr(self.deployment_args, 'noblock_on_start', False):
                    _args.noblock_on_start = True
                self._pods.append(PodFactory.build_pod(_args).start())
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # close every pod even if one of them fails; re-raise the first
            # close-failure only if the body itself exited cleanly
            closing_exception = None
            for pod in self._pods:
                try:
                    pod.close()
                except Exception as exc:
                    if closing_exception is None:
                        closing_exception = exc
            if exc_val is None and closing_exception is not None:
                raise closing_exception

    def __init__(
        self, args: Union['Namespace', Dict], needs: Optional[Set[str]] = None
    ):
        super().__init__()
        # NOTE(review): this setattr assumes `args` is a Namespace here; the
        # Dict form appears to be handled only inside update_pod_args — confirm
        args.upload_files = BaseDeployment._set_upload_files(args)
        self.args = args
        self.args.polling = (
            args.polling if hasattr(args, 'polling') else PollingType.ANY
        )
        # polling only works for shards, if there are none, polling will be ignored
        if getattr(args, 'shards', 1) == 1:
            self.args.polling = PollingType.ANY
        self.needs = (
            needs or set()
        )  #: used in the :class:`jina.flow.Flow` to build the graph
        self.uses_before_pod = None
        self.uses_after_pod = None
        self.head_pod = None
        self.shards = {}
        self.update_pod_args()

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        super().__exit__(exc_type, exc_val, exc_tb)
        self.join()

    def update_pod_args(self):
        """Update args of all its pods based on Deployment args. Including head/tail"""
        if isinstance(self.args, dict):
            # This is used when a Deployment is created in a remote context,
            # where pods & their connections are already given.
            self.pod_args = self.args
        else:
            self.pod_args = self._parse_args(self.args)

        if self.is_sandbox:
            # a sandbox executor runs remotely; point first pod (and head) at it
            host, port = HubIO.deploy_public_sandbox(self.args)
            self.first_pod_args.host = host
            self.first_pod_args.port = port
            if self.head_args:
                self.pod_args['head'].host = host
                self.pod_args['head'].port = port

    def update_worker_pod_args(self):
        """Update args of all its worker pods based on Deployment args. Does not touch head and tail"""
        self.pod_args['pods'] = self._set_pod_args(self.args)

    @property
    def is_sandbox(self) -> bool:
        """
        Check if this deployment is a sandbox.

        :return: True if this deployment is provided as a sandbox, False otherwise
        """
        uses = getattr(self.args, 'uses', '')
        return uses.startswith('jinahub+sandbox://')

    @property
    def tls_enabled(self):
        """
        Checks whether secure connection via tls is enabled for this Deployment.

        :return: True if tls is enabled, False otherwise
        """
        has_cert = getattr(self.args, 'ssl_certfile', None) is not None
        has_key = getattr(self.args, 'ssl_keyfile', None) is not None
        return self.is_sandbox or (has_cert and has_key)

    @property
    def external(self) -> bool:
        """
        Check if this deployment is external.

        :return: True if this deployment is provided as an external deployment, False otherwise
        """
        return getattr(self.args, 'external', False) or self.is_sandbox

    @property
    def protocol(self):
        """
        :return: the protocol of this deployment
        """
        protocol = getattr(self.args, 'protocol', 'grpc')
        # append 's' (e.g. grpcs) when tls is on
        return str(protocol) + ('s' if self.tls_enabled else '')

    @property
    def first_pod_args(self) -> Namespace:
        """Return the first worker pod's args

        .. # noqa: DAR201
        """
        # note this will be never out of boundary
        return self.pod_args['pods'][0][0]

    @property
    def host(self) -> str:
        """Get the host name of this deployment

        .. # noqa: DAR201
        """
        return self.first_pod_args.host

    @property
    def port(self):
        """
        :return: the port of this deployment
        """
        return self.first_pod_args.port

    @property
    def ports(self) -> List[int]:
        """Returns a list of ports exposed by this Deployment.
        Exposed means these are the ports a Client/Gateway is supposed to communicate with.
        For sharded deployments this will be the head_port.
        For non sharded deployments it will be all replica ports
        .. # noqa: DAR201
        """
        if self.head_port:
            return [self.head_port]
        # no head: expose every replica port of shard 0
        return [replica.port for replica in self.pod_args['pods'][0]]

    @property
    def dockerized_uses(self) -> bool:
        """Checks if this Deployment uses a dockerized Executor

        .. # noqa: DAR201
        """
        return self.args.uses.startswith(('docker://', 'jinahub+docker://'))

    def _parse_args(
        self, args: Namespace
    ) -> Dict[str, Optional[Union[List[Namespace], Namespace]]]:
        return self._parse_base_deployment_args(args)

    @property
    def head_args(self) -> Namespace:
        """Get the arguments for the `head` of this Deployment.

        .. # noqa: DAR201
        """
        return self.pod_args['head']

    @head_args.setter
    def head_args(self, args):
        """Set the arguments for the `head` of this Deployment.

        .. # noqa: DAR101
        """
        self.pod_args['head'] = args

    @property
    def uses_before_args(self) -> Namespace:
        """Get the arguments for the `uses_before` of this Deployment.

        .. # noqa: DAR201
        """
        return self.pod_args['uses_before']

    @uses_before_args.setter
    def uses_before_args(self, args):
        """Set the arguments for the `uses_before` of this Deployment.

        .. # noqa: DAR101
        """
        self.pod_args['uses_before'] = args

    @property
    def uses_after_args(self) -> Namespace:
        """Get the arguments for the `uses_after` of this Deployment.

        .. # noqa: DAR201
        """
        return self.pod_args['uses_after']

    @uses_after_args.setter
    def uses_after_args(self, args):
        """Set the arguments for the `uses_after` of this Deployment.

        .. # noqa: DAR101
        """
        self.pod_args['uses_after'] = args

    @property
    def all_args(self) -> List[Namespace]:
        """Get all arguments of all Pods in this BaseDeployment.

        .. # noqa: DAR201
        """
        all_args = (
            ([self.pod_args['uses_before']] if self.pod_args['uses_before'] else [])
            + ([self.pod_args['uses_after']] if self.pod_args['uses_after'] else [])
            + ([self.pod_args['head']] if self.pod_args['head'] else [])
        )
        for shard_id in self.pod_args['pods']:
            all_args += self.pod_args['pods'][shard_id]
        return all_args

    @property
    def num_pods(self) -> int:
        """Get the number of running :class:`Pod`

        .. # noqa: DAR201
        """
        num_pods = 0
        if self.head_pod is not None:
            num_pods += 1
        if self.uses_before_pod is not None:
            num_pods += 1
        if self.uses_after_pod is not None:
            num_pods += 1
        if self.shards:  # external deployments
            for shard_id in self.shards:
                num_pods += self.shards[shard_id].num_pods
        return num_pods

    def __eq__(self, other: 'BaseDeployment'):
        # NOTE: defining __eq__ without __hash__ makes instances unhashable;
        # kept as-is since callers may rely on it
        return self.num_pods == other.num_pods and self.name == other.name

    def activate(self):
        """
        Activate all worker pods in this deployment by registering them with the head
        """
        if self.head_pod is not None:
            for shard_id in self.pod_args['pods']:
                for pod_idx, pod_args in enumerate(self.pod_args['pods'][shard_id]):
                    worker_host = self.get_worker_host(
                        pod_args, self.shards[shard_id]._pods[pod_idx], self.head_pod
                    )
                    GrpcConnectionPool.activate_worker_sync(
                        worker_host,
                        int(pod_args.port),
                        self.head_pod.runtime_ctrl_address,
                        shard_id,
                    )

    @staticmethod
    def get_worker_host(pod_args, pod, head_pod):
        """
        Check if the current pod and head are both containerized on the same host
        If so __docker_host__ needs to be advertised as the worker's address to the head

        :param pod_args: arguments of the worker pod
        :param pod: the worker pod
        :param head_pod: head pod communicating with the worker pod
        :return: host to use in activate messages
        """
        # when both worker and head run in containers on the local host, the
        # head must reach the worker through the special docker host alias
        worker_host = (
            __docker_host__
            if Deployment._is_container_to_container(pod, head_pod)
            and host_is_local(pod_args.host)
            else pod_args.host
        )
        return worker_host

    @staticmethod
    def _is_container_to_container(pod, head_pod):
        # Check if both the worker pod and the head are containerized;
        # if the head is not containerized, it still could mean that the
        # deployment itself is containerized (running inside docker)
        return type(pod) == ContainerPod and (
            type(head_pod) == ContainerPod or in_docker()
        )

    def start(self) -> 'Deployment':
        """
        Start to run all :class:`Pod` in this BaseDeployment.

        :return: started deployment

        .. note::
            If one of the :class:`Pod` fails to start, make sure that all of them
            are properly closed.
        """
        if self.pod_args['uses_before'] is not None:
            _args = self.pod_args['uses_before']
            if getattr(self.args, 'noblock_on_start', False):
                _args.noblock_on_start = True
            self.uses_before_pod = PodFactory.build_pod(_args)
            # registered with the ExitStack so failures unwind cleanly
            self.enter_context(self.uses_before_pod)
        if self.pod_args['uses_after'] is not None:
            _args = self.pod_args['uses_after']
            if getattr(self.args, 'noblock_on_start', False):
                _args.noblock_on_start = True
            self.uses_after_pod = PodFactory.build_pod(_args)
            self.enter_context(self.uses_after_pod)
        if self.pod_args['head'] is not None:
            _args = self.pod_args['head']
            if getattr(self.args, 'noblock_on_start', False):
                _args.noblock_on_start = True
            self.head_pod = PodFactory.build_pod(_args)
            self.enter_context(self.head_pod)
        for shard_id in self.pod_args['pods']:
            self.shards[shard_id] = self._ReplicaSet(
                self.args,
                self.pod_args['pods'][shard_id],
                self.head_pod,
            )
            self.enter_context(self.shards[shard_id])

        if not getattr(self.args, 'noblock_on_start', False):
            # blocking start: register workers with the head right away;
            # otherwise wait_start_success() will do it later
            self.activate()
        return self

    def wait_start_success(self) -> None:
        """Block until all pods starts successfully.

        If not successful, it will raise an error hoping the outer function to catch it
        """
        if not self.args.noblock_on_start:
            raise ValueError(
                f'{self.wait_start_success!r} should only be called when `noblock_on_start` is set to True'
            )
        try:
            if self.uses_before_pod is not None:
                self.uses_before_pod.wait_start_success()
            if self.uses_after_pod is not None:
                self.uses_after_pod.wait_start_success()
            if self.head_pod is not None:
                self.head_pod.wait_start_success()
            for shard_id in self.shards:
                self.shards[shard_id].wait_start_success()
            self.activate()
        except BaseException:
            # also covers KeyboardInterrupt/SystemExit: close everything, then re-raise
            self.close()
            raise

    def join(self):
        """Wait until all pods exit"""
        try:
            if self.uses_before_pod is not None:
                self.uses_before_pod.join()
            if self.uses_after_pod is not None:
                self.uses_after_pod.join()
            if self.head_pod is not None:
                self.head_pod.join()
            if self.shards:
                for shard_id in self.shards:
                    self.shards[shard_id].join()
        except KeyboardInterrupt:
            pass
        finally:
            self.head_pod = None
            if self.shards:
                for shard_id in self.shards:
                    self.shards[shard_id].clear_pods()

    @property
    def is_ready(self) -> bool:
        """Checks if Deployment is ready

        .. note::
            A Deployment is ready when all the Pods it contains are ready


        .. # noqa: DAR201
        """
        is_ready = True
        if self.head_pod is not None:
            is_ready = self.head_pod.is_ready.is_set()
        if is_ready:
            for shard_id in self.shards:
                is_ready = self.shards[shard_id].is_ready
        if is_ready and self.uses_before_pod is not None:
            is_ready = self.uses_before_pod.is_ready.is_set()
        if is_ready and self.uses_after_pod is not None:
            is_ready = self.uses_after_pod.is_ready.is_set()
        return is_ready

    @staticmethod
    def _set_pod_args(args: Namespace) -> Dict[int, List[Namespace]]:
        """Derive per-replica argument namespaces for every shard.

        :param args: the deployment-level arguments
        :return: mapping of shard_id to the list of replica argument namespaces
        """
        result = {}
        sharding_enabled = args.shards and args.shards > 1
        for shard_id in range(args.shards):
            replica_args = []
            for replica_id in range(args.replicas):
                _args = copy.deepcopy(args)
                _args.shard_id = shard_id
                _args.pod_role = PodRoleType.WORKER
                _args.host = args.host

                if _args.name:
                    _args.name += (
                        f'/shard-{shard_id}/rep-{replica_id}'
                        if sharding_enabled
                        else f'/rep-{replica_id}'
                    )
                else:
                    _args.name = f'{replica_id}'

                # the gateway needs to respect the assigned port
                if args.deployment_role == DeploymentRoleType.GATEWAY or args.external:
                    _args.port = args.port
                elif args.shards == 1 and args.replicas == 1:
                    # if there are no shards/replicas, we dont need to distribute
                    # ports randomly — keep the pre-assigned one (already copied
                    # by deepcopy; original wrote the no-op `args.port = args.port`)
                    _args.port = args.port
                else:
                    _args.port = helper.random_port()

                _args.port_monitoring = helper.random_port()

                # pod workspace if not set then derive from workspace
                if not _args.workspace:
                    _args.workspace = args.workspace
                replica_args.append(_args)
            result[shard_id] = replica_args
        return result

    @staticmethod
    def _set_uses_before_after_args(args: Namespace, entity_type: str) -> Namespace:
        """Build the argument namespace for a `uses_before`/`uses_after` pod.

        :param args: the deployment-level arguments
        :param entity_type: either ``'uses_before'`` or ``'uses_after'``
        :return: the derived argument namespace
        :raises ValueError: if ``entity_type`` is not one of the two supported values
        """
        _args = copy.deepcopy(args)
        _args.pod_role = PodRoleType.WORKER
        _args.host = __default_host__
        _args.port = helper.random_port()

        if _args.name:
            _args.name += f'/{entity_type}-0'
        else:
            _args.name = f'{entity_type}-0'

        if 'uses_before' == entity_type:
            _args.uses_requests = None
            _args.uses = args.uses_before or __default_executor__
        elif 'uses_after' == entity_type:
            _args.uses_requests = None
            _args.uses = args.uses_after or __default_executor__
        else:
            raise ValueError(
                f'uses_before/uses_after pod does not support type {entity_type}'
            )

        # pod workspace if not set then derive from workspace
        if not _args.workspace:
            _args.workspace = args.workspace
        return _args

    def _parse_base_deployment_args(self, args):
        """Split deployment args into head/uses_before/uses_after/worker-pod args.

        :param args: the deployment-level arguments
        :return: dict with keys ``head``, ``uses_before``, ``uses_after``, ``pods``
        """
        parsed_args = {
            'head': None,
            'uses_before': None,
            'uses_after': None,
            'pods': {},
        }

        # a gateway has no heads and uses;
        # also no heads are created if there are no shards
        if self.role != DeploymentRoleType.GATEWAY and getattr(args, 'shards', 1) > 1:
            if (
                getattr(args, 'uses_before', None)
                and args.uses_before != __default_executor__
            ):
                uses_before_args = self._set_uses_before_after_args(
                    args, entity_type='uses_before'
                )
                parsed_args['uses_before'] = uses_before_args
                args.uses_before_address = (
                    f'{uses_before_args.host}:{uses_before_args.port}'
                )
            if (
                getattr(args, 'uses_after', None)
                and args.uses_after != __default_executor__
            ):
                uses_after_args = self._set_uses_before_after_args(
                    args, entity_type='uses_after'
                )
                parsed_args['uses_after'] = uses_after_args
                args.uses_after_address = (
                    f'{uses_after_args.host}:{uses_after_args.port}'
                )
            parsed_args['head'] = BaseDeployment._copy_to_head_args(args)
        parsed_args['pods'] = self._set_pod_args(args)
        return parsed_args

    @property
    def _mermaid_str(self) -> List[str]:
        """String that will be used to represent the Deployment graphically when `Flow.plot()` is invoked.
        It does not include used_before/uses_after


        .. # noqa: DAR201
        """
        mermaid_graph = []
        if self.role != DeploymentRoleType.GATEWAY and not self.external:
            mermaid_graph = [f'subgraph {self.name};', '\ndirection LR;\n']

            uses_before_name = (
                self.uses_before_args.name
                if self.uses_before_args is not None
                else None
            )
            uses_before_uses = (
                self.uses_before_args.uses
                if self.uses_before_args is not None
                else None
            )
            uses_after_name = (
                self.uses_after_args.name if self.uses_after_args is not None else None
            )
            uses_after_uses = (
                self.uses_after_args.uses if self.uses_after_args is not None else None
            )
            shard_names = []
            if len(self.pod_args['pods']) > 1:
                # multiple shards
                for shard_id, pod_args in self.pod_args['pods'].items():
                    shard_name = f'{self.name}/shard-{shard_id}'
                    shard_names.append(shard_name)
                    shard_mermaid_graph = [
                        f'subgraph {shard_name};',
                        '\ndirection TB;\n',
                    ]
                    # all the names of each of the replicas
                    names = [args.name for args in pod_args]
                    # all the uses should be the same but let's keep it this way
                    uses = [args.uses for args in pod_args]
                    for name, use in zip(names, uses):
                        escaped_uses = f'"{use}"'
                        shard_mermaid_graph.append(f'{name}[{escaped_uses}]:::pod;')
                    shard_mermaid_graph.append('end;')
                    shard_mermaid_graph = [
                        node.replace(';', '\n') for node in shard_mermaid_graph
                    ]
                    mermaid_graph.extend(shard_mermaid_graph)
                    mermaid_graph.append('\n')
                if uses_before_name is not None:
                    for shard_name in shard_names:
                        escaped_uses_before_uses = f'"{uses_before_uses}"'
                        mermaid_graph.append(
                            f'{self.args.name}-head[{escaped_uses_before_uses}]:::HEADTAIL --> {shard_name};'
                        )
                if uses_after_name is not None:
                    for shard_name in shard_names:
                        escaped_uses_after_uses = f'"{uses_after_uses}"'
                        mermaid_graph.append(
                            f'{shard_name} --> {self.args.name}-tail[{escaped_uses_after_uses}]:::HEADTAIL;'
                        )
            else:
                # single shard case, no uses_before or uses_after considered
                name = list(self.pod_args['pods'].values())[0][0].name
                uses = f'"{list(self.pod_args["pods"].values())[0][0].uses}"'
                num_replicas = list(self.pod_args['pods'].values())[0][0].replicas
                # just put the replicas in parallel
                if num_replicas > 1:
                    for rep_i in range(num_replicas):
                        mermaid_graph.append(f'{name}/rep-{rep_i}[{uses}]:::pod;')
                else:
                    mermaid_graph.append(f'{name}[{uses}]:::pod;')
            mermaid_graph.append('end;')
        return mermaid_graph