import os
import argparse
from typing import Dict, Any

from jina.jaml.parsers.base import VersionedYAMLParser
from jina import Flow
from jina.enums import DeploymentRoleType
from jina.helper import expand_env_var, ArgNamespace
from jina.parsers import set_deployment_parser, set_gateway_parser


def _get_taboo(parser: argparse.ArgumentParser):
    """Collect the argument names that must be left out of a dumped config.

    An argument is considered taboo when its help text is
    ``argparse.SUPPRESS``, i.e. it is hidden from the parser's help output.

    :param parser: deployment or gateway parser
    :return: set of keys (argument ``dest`` names) that should not be dumped
    """
    # NOTE: ``_actions`` is a private attribute of ``ArgumentParser``; argparse
    # exposes no public accessor for the registered actions.
    return {k.dest for k in parser._actions if k.help == argparse.SUPPRESS}


class V1Parser(VersionedYAMLParser):
    """V1Parser introduces new syntax and features:

        - It has a top-level field ``version``
        - ``deployments`` is now a List of Dict (rather than a Dict as prev.);
          the list may also be given under the key ``executors``, which takes
          precedence when both are present
        - ``name`` is now optional
        - new field ``method`` can be used to specify how to add this Deployment
          into the Flow, available values are:

            - ``add``: (default) equal to `Flow.add(...)`
            - ``needs``: equal to `Flow.needs(...)`
            - ``inspect``: equal to `Flow.inspect(...)`

    An example V1 YAML config can be found below:

        .. highlight:: yaml
        .. code-block:: yaml

            !Flow
            version: '1.0'
            deployments:
              - name: executor0  # notice the change here, name is now an attribute
                method: add  # by default method is always add, available: add, needs, inspect
                needs: gateway
              - name: executor1  # notice the change here, name is now an attribute
                method: add  # by default method is always add, available: add, needs, inspect
                needs: gateway
              - method: inspect  # add an inspect node on executor1
              - method: needs  # let's try something new in Flow YAML v1: needs
                needs: [executor1, executor0]
    """

    version = '1'  # the version number this parser designed for

    def parse(self, cls: type, data: Dict) -> 'Flow':
        """Build a Flow from a V1 YAML config that was loaded as a dict.

        The ``with`` section supplies the constructor arguments of ``cls``;
        every entry of ``executors`` (or ``deployments``) is then applied to
        the new object by calling the method named in its ``method`` field.
        All values pass through ``expand_env_var`` first.

        :param cls: the class registered for dumping/loading
        :param data: flow yaml file loaded as python dict
        :return: the ``cls`` instance built from ``data``
        """
        # Work on a shallow copy so that popping ``args``/``kwargs`` does not
        # mutate the caller's dict. ``or`` tolerates sections that are present
        # but empty in the YAML (loaded as ``None``).
        flow_params = dict(data.get('with') or {})  # type: Dict[str, Any]
        positional = flow_params.pop('args', None) or ()
        nested_kwargs = flow_params.pop('kwargs', None) or {}

        # maybe there are some hanging kwargs in "parameters"; entries given
        # directly under ``with`` override the ones nested under ``kwargs``
        expanded_args = (expand_env_var(v) for v in positional)
        expanded_kwargs = {
            kk: expand_env_var(vv)
            for kk, vv in {**nested_kwargs, **flow_params}.items()
        }
        obj = cls(*expanded_args, **expanded_kwargs)

        deployments = data.get('executors', data.get('deployments', [])) or []
        for deployment in deployments:
            deployment_attr = {
                kk: expand_env_var(vv) for kk, vv in deployment.items()
            }
            # ``name`` is optional in v1 YAML, hence ``.get``
            if deployment_attr.get('name') == 'gateway':
                # ignore gateway when reading, it will be added during build()
                continue

            # support methods: add, needs, inspect
            method = deployment_attr.get('method', 'add')
            # NOTE: ``method`` is left inside ``deployment_attr`` and is thus
            # also forwarded as a keyword argument to the called method.
            getattr(obj, method)(**deployment_attr, copy_flow=False)
        return obj

    def dump(self, data: 'Flow') -> Dict:
        """Serialize a Flow object into a V1 config dictionary.

        :param data: versioned flow object
        :return: the dictionary given a versioned flow object
        """
        r = {}
        if data._version:
            r['version'] = data._version

        # to maintain order - version -> with -> executors
        r['with'] = {}
        if data._kwargs:
            r['with'].update(data._kwargs)
        if data._common_kwargs:
            r['with'].update(data._common_kwargs)

        if not data._deployment_nodes:
            return r

        r['executors'] = []
        last_name = 'gateway'
        for k, v in data._deployment_nodes.items():
            kwargs = {}
            # only add "needs" when the value is not the last deployment name
            needs = list(v.needs)
            if needs != [last_name]:
                kwargs['needs'] = needs

            # pick the parser matching the node's role; it defines which
            # argument values count as defaults and which are taboo
            if v.role == DeploymentRoleType.GATEWAY:
                parser = set_gateway_parser()
            else:
                parser = set_deployment_parser()

            # get nondefault kwargs
            kwargs.update(ArgNamespace.get_non_defaults_args(v.args, parser))

            # drop the arguments that must never be dumped
            for t in _get_taboo(parser):
                kwargs.pop(t, None)

            if k == 'gateway':
                # the gateway is never listed under ``executors``; its
                # arguments are merged into ``with`` only when the
                # JINA_FULL_CLI environment variable is set
                if 'JINA_FULL_CLI' in os.environ:
                    r['with'].update(kwargs)
                continue

            # NOTE(review): relies on ``name`` being among the non-default
            # arguments of every non-gateway node; raises KeyError otherwise.
            last_name = kwargs['name']
            r['executors'].append(kwargs)
        return r