class LegacyParser(VersionedYAMLParser):
    """Legacy parser for executor."""

    version = 'legacy'  # the version number this parser designed for

    @staticmethod
    def _get_all_arguments(class_):
        """Collect the ``__init__`` argument names of a class and all its ancestors.

        :param class_: target class from which we want to retrieve arguments
        :return: all the arguments of all the classes from which `class_` inherits
        """

        def get_class_arguments(klass):
            """
            :param klass: the class to check
            :return: a list containing the arguments from `klass`
            """
            # ``Signature.parameters`` is an ordered mapping keyed by parameter name.
            return list(inspect.signature(klass.__init__).parameters)

        def accumulate_classes(cls) -> Set[Type]:
            """
            :param cls: the class to check
            :return: all classes from which cls inherits from (including cls itself)
            """
            visited = set()
            pending = [cls]
            while pending:
                current = pending.pop()
                # Skip classes reached through more than one path (diamond
                # inheritance); ``object`` has no bases, so the walk ends there.
                if current in visited:
                    continue
                visited.add(current)
                pending.extend(current.__bases__)
            return visited

        return {
            name
            for klass in accumulate_classes(class_)
            for name in get_class_arguments(klass)
        }

    def parse(
        self,
        cls: Type['BaseExecutor'],
        data: Dict,
        runtime_args: Optional[Dict[str, Any]] = None,
    ) -> 'BaseExecutor':
        """Build an executor instance from its YAML configuration.

        :param cls: target class type to parse into, must be a :class:`JAMLCompatible` type
        :param data: flow yaml file loaded as python dict
        :param runtime_args: Optional runtime_args to be directly passed without being parsed into a yaml config
        :return: the Flow YAML parser given the syntax version number
        """
        from jina.logging.predefined import default_logger

        # Start from the default metas and overlay whatever the yaml provides.
        # ``or {}`` tolerates an empty yaml mapping, which is loaded as ``None``.
        _meta_config = get_default_metas()
        _meta_config.update(data.get('metas') or {})
        if _meta_config:
            data['metas'] = _meta_config

        with_args = data.get('with') or {}

        cls._init_from_yaml = True
        try:
            obj = cls(
                **with_args,
                metas=data.get('metas') or {},
                requests=data.get('requests', {}),
                runtime_args=runtime_args,
            )
        finally:
            # Always reset the class-level flag, even when the constructor
            # raises, so later instantiations are not mistaken for yaml loads.
            cls._init_from_yaml = False

        # check if the yaml file used to instantiate 'cls' has arguments that are not in 'cls'
        arguments_from_cls = LegacyParser._get_all_arguments(cls)
        arguments_from_yaml = set(with_args)
        difference_set = arguments_from_yaml - arguments_from_cls
        # only log warnings about unknown args for main Pod
        if difference_set and not LegacyParser.is_tail_or_head(data):
            default_logger.warning(
                f'The given arguments {difference_set} are not defined in `{cls.__name__}.__init__`'
            )

        if not _meta_config:
            default_logger.warning(
                '"metas" config is not found in this yaml file, '
                'this map is important as it provides an unique identifier when '
                'persisting the executor on disk.'
            )

        # for compound executor
        if 'components' in data:
            obj.components = lambda: data['components']

        obj.is_updated = False
        return obj

    @staticmethod
    def is_tail_or_head(data: Dict) -> bool:
        """Based on name, compute if this is a tail/head Pod or a main Pod

        :param data: the data for the parser
        :return: True if it is tail/head, False otherwise
        """
        try:
            name = data.get('runtime_args', {}).get('name', '')
            return 'head' in name or 'tail' in name
        except Exception:
            # Best effort: the name (or runtime_args) can be None in tests since
            # it's not passed. Treat any failure as "not a tail/head Pod" so the
            # declared bool return type holds.
            return False

    def dump(self, data: 'BaseExecutor') -> Dict:
        """Serialize an executor object into a plain dictionary.

        :param data: versioned executor object
        :return: the dictionary given a versioned flow object
        """
        # note: we only save non-default property for the sake of clarity
        _defaults = get_default_metas()

        changed_metas = {}
        if hasattr(data, 'metas'):
            for key, default_value in _defaults.items():
                current_value = getattr(data.metas, key)
                if current_value != default_value:
                    changed_metas[key] = current_value

        # Constructor kwargs, minus anything that is really a meta field.
        init_kwargs = {
            key: value
            for key, value in data._init_kwargs_dict.items()
            if key not in _defaults
        }

        result = {}
        if init_kwargs:
            result['with'] = init_kwargs
        if changed_metas:
            result['metas'] = changed_metas
        if hasattr(data, 'requests'):
            # Bound handlers are stored by their function name.
            result['requests'] = {
                endpoint: handler.__name__
                for endpoint, handler in data.requests.items()
            }
        if hasattr(data, 'components'):
            result['components'] = data.components
        return result