Source code for jina.jaml.parsers.executor.legacy

import inspect
from functools import reduce
from typing import Dict, Type, Set

from ..base import VersionedYAMLParser
from ....executors import BaseExecutor
from ....executors.metas import get_default_metas

class LegacyParser(VersionedYAMLParser):
    """Legacy parser for executor."""

    version = 'legacy'  # the version number this parser designed for

    @staticmethod
    def _get_all_arguments(class_):
        """Collect the ``__init__`` parameter names of a class and of all its ancestors.

        :param class_: target class from which we want to retrieve arguments
        :return: all the arguments of all the classes from which `class_` inherits
        """

        def get_class_arguments(class_):
            """
            :param class_: the class to check
            :return: a list containing the arguments from `class_`
            """
            try:
                signature = inspect.signature(class_.__init__)
            except (ValueError, TypeError):
                # Some builtin/extension types expose no introspectable
                # signature; this helper only feeds a warning, so such a
                # class simply contributes no argument names.
                return []
            return [p.name for p in signature.parameters.values()]

        def accumulate_classes(cls) -> Set[Type]:
            """
            :param cls: the class to check
            :return: all classes from which cls inherits from
            """
            visited = set()
            pending = [cls]
            while pending:
                current = pending.pop()
                if current in visited:
                    # shared ancestor (diamond inheritance) already handled
                    continue
                visited.add(current)
                # `object.__bases__` is empty, so the walk stops there
                pending.extend(current.__bases__)
            return visited

        all_classes = accumulate_classes(class_)
        return {
            name for klass in all_classes for name in get_class_arguments(klass)
        }

    def parse(self, cls: Type['BaseExecutor'], data: Dict) -> 'BaseExecutor':
        """Build an executor object of type `cls` from a loaded YAML dict.

        The default metas are merged with ``data['metas']`` and written back
        into `data` before the object is constructed.

        :param cls: target class type to parse into, must be a :class:`JAMLCompatible` type
        :param data: executor yaml file loaded as python dict; the keys ``with``,
            ``metas``, ``requests``, ``runtime_args`` and ``components`` are read
        :return: the executor object constructed from `data`
        """
        from ....logging.predefined import default_logger

        _meta_config = get_default_metas()
        _meta_config.update(data.get('metas', {}))
        if _meta_config:
            data['metas'] = _meta_config

        cls._init_from_yaml = True
        try:
            obj = cls(
                **data.get('with', {}),
                metas=data.get('metas', {}),
                requests=data.get('requests', {}),
                runtime_args=data.get('runtime_args', {}),
            )
        finally:
            # always reset the class-level flag, even when the constructor raises
            cls._init_from_yaml = False

        # check if the yaml file used to instanciate 'cls' has arguments that are not in 'cls'
        arguments_from_cls = LegacyParser._get_all_arguments(cls)
        arguments_from_yaml = set(data.get('with', {}))
        difference_set = arguments_from_yaml - arguments_from_cls
        # only log warnings about unknown args for main Pea
        if difference_set and not LegacyParser.is_tail_or_head(data):
            default_logger.warning(
                f'The given arguments {difference_set} are not defined in `{cls.__name__}.__init__`'
            )

        if not _meta_config:
            default_logger.warning(
                '"metas" config is not found in this yaml file, '
                'this map is important as it provides an unique identifier when '
                'persisting the executor on disk.'
            )

        # for compound executor
        if 'components' in data:
            obj.components = lambda: data['components']

        obj.is_updated = False
        return obj

    @staticmethod
    def is_tail_or_head(data: Dict) -> bool:
        """Based on name, compute if this is a tail/head Pea or a main Pea

        :param data: the data for the parser
        :return: True if it is tail/head, False otherwise
        """
        try:
            name = data.get('runtime_args', {}).get('name', '')
            return 'head' in name or 'tail' in name
        except (AttributeError, TypeError):
            # name (or runtime_args) can be None in tests since it's not passed
            return False

    def dump(self, data: 'BaseExecutor') -> Dict:
        """Serialize an executor object into a plain dictionary.

        :param data: versioned executor object
        :return: the dictionary given a versioned executor object; it may contain
            the keys ``with``, ``metas``, ``requests`` and ``components``
        """
        # note: we only save non-default property for the sake of clarity
        _defaults = get_default_metas()
        if hasattr(data, 'metas'):
            p = {
                k: getattr(data.metas, k)
                for k, v in _defaults.items()
                if getattr(data.metas, k) != v
            }
        else:
            p = {}
        # constructor kwargs that are not meta fields go under `with`
        a = {k: v for k, v in data._init_kwargs_dict.items() if k not in _defaults}

        r = {}
        if a:
            r['with'] = a
        if p:
            r['metas'] = p

        if hasattr(data, 'requests'):
            # requests are stored by the `__name__` of each mapped value
            r['requests'] = {k: v.__name__ for k, v in data.requests.items()}

        if hasattr(data, 'components'):
            r['components'] = data.components
        return r