Source code for jina.orchestrate.flow.builder

from functools import wraps
from typing import TYPE_CHECKING, List

from jina.excepts import FlowBuildLevelError

# noinspection PyUnreachableCode
if TYPE_CHECKING:
    from jina.enums import FlowBuildLevel
    from jina.orchestrate.flow.base import Flow


[docs]def allowed_levels(levels: List['FlowBuildLevel']): """Annotate a function so that it requires certain build level to run. Example: .. highlight:: python .. code-block:: python @build_required(FlowBuildLevel.RUNTIME) def foo(): print(1) :param levels: required build level to run this function. :return: annotated function """ def __build_level(func): @wraps(func) def arg_wrapper(self, *args, **kwargs): if hasattr(self, '_build_level'): if self._build_level in levels: return func(self, *args, **kwargs) else: raise FlowBuildLevelError( f'build_level check failed for {func!r}, required level: {levels}, actual level: {self._build_level}' ) else: raise AttributeError(f'{self!r} has no attribute "_build_level"') return arg_wrapper return __build_level
def _hanging_deployments(op_flow: 'Flow') -> List[str]: """ :param op_flow: the Flow we're operating on :return: names of floating Deployments (nobody recv from them) in the Flow. """ all_needs = {k for p, v in op_flow for k in v.needs} all_names = {p for p, v in op_flow if not v.args.floating} # all_names is always a superset of all_needs return list(all_names.difference(all_needs))