class classproperty:
    """Read-only descriptor that exposes a class-level computed attribute.

    The wrapped function receives the owning class (not an instance), so the
    attribute can be read both from the class itself and from its instances,
    including from inside a classmethod.

    :param fget: callable taking the owner class and returning the value
    """

    def __init__(self, fget):
        self.fget = fget

    def __get__(self, owner_self, owner_cls=None):
        """Compute the value by calling the getter with the owner class.

        :param owner_self: instance the attribute was accessed through, or
            ``None`` when accessed on the class
        :param owner_cls: class the attribute was accessed through; optional
            per the descriptor protocol
        :return: whatever the wrapped getter returns for the owner class
        """
        # The descriptor protocol allows ``__get__(instance)`` without an
        # owner; derive the class from the instance in that case.
        if owner_cls is None:
            owner_cls = type(owner_self)
        return self.fget(owner_cls)
def id_cleaner(docker_id: str, prefix: str = 'sha256:') -> str:
    """Shorten an id created by docker to its first 10 characters.

    A leading ``prefix`` (if present) is dropped before truncating, so the
    result never contains the digest-algorithm marker.

    :param docker_id: id of docker object
    :param prefix: leading marker to strip, defaults to 'sha256:'
    :return: shorter id (at most 10 characters)
    """
    # Strip the prefix explicitly rather than using a boolean as a slice index.
    if docker_id.startswith(prefix):
        docker_id = docker_id[len(prefix):]
    return docker_id[:10]
def get_workspace_path(workspace_id: 'DaemonID', *args):
    """Build the path of a workspace, or of something inside it.

    :param workspace_id: the id of the workspace
    :param args: extra path components appended below the workspace directory;
        each one is converted with ``str`` before joining
    :return: the full path
    """
    from . import __root_workspace__

    sub_paths = map(str, args)
    return os.path.join(__root_workspace__, workspace_id, *sub_paths)
def is_error_message(s) -> bool:
    """Tell whether a string matches the exception regex.

    The search is case-insensitive and may match anywhere in the string.

    :param s: the string to check
    :return: ``True`` if ``EXCEPTS_REGEX`` is found in ``s``, else ``False``
    """
    flags = re.IGNORECASE | re.UNICODE
    found = re.search(EXCEPTS_REGEX, s, flags)
    return found is not None
def get_log_file_path(log_id: 'DaemonID') -> Tuple[str, 'DaemonID']:
    """Resolve the logfile path for an id, together with its workspace id.

    :param log_id: DaemonID in the store
    :return: logfile path, workspace_id for log_id
    """
    from .models.enums import IDLiterals
    from .stores import get_store_from_id

    # A workspace id is its own workspace; its log sits at the workspace root.
    if log_id.jtype == IDLiterals.JWORKSPACE:
        return get_workspace_path(log_id, 'logging.log'), log_id

    # Any other id: look up its store entry to find the owning workspace; the
    # log lives under ``logs/<log_id>/`` inside that workspace.
    owner_workspace = get_store_from_id(log_id)[log_id].workspace_id
    log_path = get_workspace_path(owner_workspace, 'logs', log_id, 'logging.log')
    return log_path, owner_workspace
def if_alive(func: Callable, raise_type: 'Optional[Type[Exception]]' = None):
    """Decorator to be used in store for connection validation.

    The wrapped coroutine method is awaited as usual. If it fails with
    ``aiohttp.ClientConnectionError`` the error is logged through
    ``self._logger``; then, when ``raise_type`` is an exception class, an
    instance of it is raised (chained to the original error). Without a
    ``raise_type`` the wrapper swallows the connection error and returns
    ``None``.

    :param func: coroutine method to be wrapped; its owner must provide
        ``_logger`` and ``_kind`` attributes
    :param raise_type: Exception class to be raised on connection failure
    :return: wrapped function
    """
    from functools import wraps

    # ``raise_type`` is a class, not an instance, so it must be checked with
    # ``issubclass``; ``isinstance(cls, Exception)`` is always False and made
    # the re-raise below unreachable.
    should_raise = isinstance(raise_type, type) and issubclass(raise_type, BaseException)

    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        try:
            return await func(self, *args, **kwargs)
        except aiohttp.ClientConnectionError as e:
            self._logger.error(f'connection to server failed: {e!r}')
            if should_raise:
                raise raise_type(
                    f'connection to server failed during {func.__name__} for {self._kind.title()}'
                ) from e

    return wrapper
@contextmanager
def change_cwd(path):
    """
    Temporarily switch the current working directory to ``path``.

    The directory that was current on entry is restored when the context
    exits, whether it exits normally or through an exception.

    :param path: Target path.
    :yields: nothing
    """
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always go back, even if the body raised.
        os.chdir(previous_dir)
@contextmanager
def change_env(key, val):
    """
    Temporarily set the environment variable ``key`` to ``val``.

    On leaving the context the variable is put back to the value it had on
    entry; if it did not exist on entry it is removed again.

    :param key: Name of the environment variable to change.
    :param val: Value to assign inside the context (``os.environ`` requires a string).
    :yields: nothing
    """
    old_var = os.environ.get(key, None)
    os.environ[key] = val
    try:
        yield
    finally:
        # Compare against None: an empty string is a legitimate previous
        # value and must be restored, not treated as "unset".
        if old_var is not None:
            os.environ[key] = old_var
        else:
            # The body may already have removed the key; do not fail on exit.
            os.environ.pop(key, None)