"""Module for helper functions for Hub API."""importhashlibimportioimportjsonimportosimportshelveimportsubprocessimportsysimporttarfileimporturllibimportwarningsimportzipfilefromfunctoolsimportlru_cache,wrapsfrompathlibimportPathfromtypingimportTuple,Optional,Dictfromurllib.parseimporturlparse,urljoinfromurllib.requestimportRequest,urlopenfromcontextlibimportnullcontextfromjinaimport__resources_path__fromjina.importerimportImportExtensionsfromjina.logging.predefinedimportdefault_logger@lru_cache()def_get_hub_root()->Path:hub_root=Path(os.environ.get('JINA_HUB_ROOT',Path.home().joinpath('.jina')))ifnothub_root.exists():hub_root.mkdir(parents=True,exist_ok=True)returnhub_root
@lru_cache()
def get_hub_packages_dir() -> Path:
    """Get the path of folder where the hub packages are stored

    :return: the path of folder where the hub packages are stored
    """
    packages_dir = _get_hub_root() / 'hub-package'
    if not packages_dir.exists():
        packages_dir.mkdir(parents=True, exist_ok=True)
    return packages_dir
@lru_cache()
def get_cache_db() -> Path:
    """Get the path of cache db of hub Executors

    :return: the path of cache db of hub Executors
    """
    # The db file itself is created lazily by its consumer; only the path
    # is resolved here.
    return _get_hub_root() / 'disk_cache.db'
@lru_cache()
def get_download_cache_dir() -> Path:
    """Get the path of cache folder where the downloading cache is stored

    :return: the path of cache folder where the downloading cache is stored
    """
    # Location can be overridden via JINA_HUB_CACHE_DIR.
    default_dir = _get_hub_root().joinpath('.cache')
    cache_dir = Path(os.environ.get('JINA_HUB_CACHE_DIR', default_dir))
    if not cache_dir.exists():
        cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir
@lru_cache()def_get_hubble_base_url()->str:"""Get base Hubble Url from api.jina.ai or os.environ :return: base Hubble Url """if'JINA_HUBBLE_REGISTRY'inos.environ:u=os.environ['JINA_HUBBLE_REGISTRY']else:try:req=Request('https://api.jina.ai/hub/hubble.json',headers={'User-Agent':'Mozilla/5.0'},)withurlopen(req)asresp:u=json.load(resp)['url']except:default_logger.critical('Can not fetch the Url of Hubble from `api.jina.ai`')raisereturnu
def parse_hub_uri(uri_path: str) -> Tuple[str, str, Optional[str], Optional[str]]:
    """Parse the uri of the Jina Hub executor.

    :param uri_path: the uri of Jina Hub executor
    :return: a tuple of schema, id, tag, and secret (tag and secret may be ``None``)
    :raises ValueError: if the scheme is not a hub scheme or the name is missing
    """
    parser = urlparse(uri_path)
    scheme = parser.scheme
    if scheme not in {'jinahub', 'jinahub+docker', 'jinahub+sandbox'}:
        raise ValueError(f'{uri_path} is not a valid Hub URI.')

    # netloc is `<name>` or `<name>:<secret>`
    items = list(parser.netloc.split(':'))
    name = items[0]
    if not name:
        raise ValueError(f'{uri_path} is not a valid Hub URI.')

    secret = items[1] if len(items) > 1 else None
    # the path component carries the tag, e.g. `jinahub://name/tag`
    tag = parser.path.strip('/') if parser.path else None
    return scheme, name, tag, secret
def is_valid_huburi(uri: str) -> bool:
    """Return True if it is a valid Hubble URI

    :param uri: the uri to test
    :return: True or False
    """
    try:
        parse_hub_uri(uri)
        return True
    # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate; any parsing failure simply means "not a hub URI".
    except Exception:
        return False
def md5file(file_path: 'Path') -> str:
    """Return the MD5 checksum of the file

    :param file_path: the file to check md5sum
    :return: the MD5 checksum
    """
    digest = hashlib.md5()
    # Read in multiples of the hash block size to keep memory bounded.
    chunk_size = 128 * digest.block_size
    with file_path.open(mode='rb') as fp:
        while True:
            chunk = fp.read(chunk_size)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def unpack_package(filepath: 'Path', target_dir: 'Path'):
    """Unpack the file to the target_dir.

    :param filepath: the path of given file
    :param target_dir: the path of target folder
    :raises ValueError: if the file suffix is neither zip nor tar/gz
    """
    suffix = filepath.suffix
    if suffix == '.zip':
        with zipfile.ZipFile(filepath, 'r') as archive:
            archive.extractall(target_dir)
    elif suffix in ('.tar', '.gz'):
        # NOTE(review): extractall does not sanitize member paths, so this
        # is only safe for trusted archives — confirm upstream packages are
        # always trusted before extraction.
        with tarfile.open(filepath) as archive:
            archive.extractall(target_dir)
    else:
        raise ValueError('File format is not supported for unpacking.')
def archive_package(package_folder: 'Path') -> 'io.BytesIO':
    """
    Archives the given folder in zip format and return a data stream.

    Files matched by the package's ``.gitignore`` (or, if absent, the bundled
    ``Python.gitignore``) plus ``.git``/``.jina`` are excluded.

    :param package_folder: the folder path of the package
    :return: the data stream of zip content
    """
    with ImportExtensions(required=True):
        import pathspec

    root_path = package_folder.resolve()

    # Prefer the package's own .gitignore; fall back to the bundled default.
    gitignore = root_path / '.gitignore'
    if not gitignore.exists():
        gitignore = Path(__resources_path__) / 'Python.gitignore'

    with gitignore.open() as fp:
        ignore_lines = [
            line.strip()
            for line in fp
            if line.strip() and (not line.startswith('#'))
        ]
    ignore_lines += ['.git', '.jina']
    ignored_spec = pathspec.PathSpec.from_lines('gitwildmatch', ignore_lines)

    zip_stream = io.BytesIO()
    # NOTE: the original wrapped this constructor in a no-op
    # `try/except EnvironmentError as e: raise e` — removed as dead code.
    zfile = zipfile.ZipFile(zip_stream, 'w', compression=zipfile.ZIP_DEFLATED)

    def _zip(base_path, path, archive):
        # Recursively add every non-ignored file under `path` to `archive`,
        # storing paths relative to the package root.
        for p in path.iterdir():
            rel_path = p.relative_to(base_path)
            if ignored_spec.match_file(str(rel_path)):
                continue
            if p.is_dir():
                _zip(base_path, p, archive)
            else:
                archive.write(p, rel_path)

    _zip(root_path, root_path, zfile)
    zfile.close()
    zip_stream.seek(0)

    return zip_stream
def download_with_resume(
    url: str,
    target_dir: 'Path',
    filename: Optional[str] = None,
    md5sum: Optional[str] = None,
) -> 'Path':
    """
    Download file from url to target_dir, and check md5sum.
    Performs a HTTP(S) download that can be restarted if prematurely terminated.
    The HTTP server must support byte ranges.

    :param url: the URL to download
    :param target_dir: the target path for the file
    :param filename: the filename of the downloaded file
    :param md5sum: the MD5 checksum to match
    :return: the filepath of the downloaded file
    :raises RuntimeError: if the downloaded file does not match ``md5sum``
    """
    with ImportExtensions(required=True):
        import requests

    def _download(url, target, resume_byte_pos: int = None):
        # Ask for a byte range when resuming a partial download.
        resume_header = (
            {'Range': f'bytes={resume_byte_pos}-'} if resume_byte_pos else None
        )
        # NOTE: the original wrapped this call in a no-op
        # `except RequestException as e: raise e` — removed as dead code.
        r = requests.get(url, stream=True, headers=resume_header)

        block_size = 1024
        # Append when resuming, otherwise start a fresh file.
        mode = 'ab' if resume_byte_pos else 'wb'
        with target.open(mode=mode) as f:
            for chunk in r.iter_content(32 * block_size):
                f.write(chunk)

    if filename is None:
        filename = url.split('/')[-1]
    filepath = target_dir / filename

    head_info = requests.head(url)
    file_size_online = int(head_info.headers.get('content-length', 0))

    _resume_byte_pos = None
    if filepath.exists():
        if md5sum and md5file(filepath) == md5sum:
            # Already fully downloaded and verified; nothing to do.
            return filepath

        file_size_offline = filepath.stat().st_size
        if file_size_online > file_size_offline:
            # A partial file exists: resume from where it stopped.
            _resume_byte_pos = file_size_offline

    _download(url, filepath, _resume_byte_pos)

    # `not a == b` replaced with `a != b`; separating spaces added to the
    # concatenated message, which previously rendered as "failed.Might".
    if md5sum and md5file(filepath) != md5sum:
        raise RuntimeError(
            'MD5 checksum failed. '
            'Might happen when the network is unstable, please retry. '
            'If still not work, feel free to raise an issue. '
            'https://github.com/jina-ai/jina/issues/new'
        )

    return filepath
def upload_file(
    url: str,
    file_name: str,
    buffer_data: bytes,
    dict_data: Dict,
    headers: Dict,
    stream: bool = False,
    method: str = 'post',
):
    """Upload file to target url

    :param url: target url
    :param file_name: the file name
    :param buffer_data: the data to upload
    :param dict_data: the dict-style data to upload
    :param headers: the request header
    :param stream: receive stream response
    :param method: the request method
    :return: the response of request
    """
    with ImportExtensions(required=True):
        import requests

    # NOTE: the caller's `dict_data` and `headers` are mutated in place,
    # matching the existing contract.
    dict_data.update({'file': (file_name, buffer_data)})
    data, ctype = requests.packages.urllib3.filepost.encode_multipart_formdata(
        dict_data
    )
    headers.update({'Content-Type': ctype})

    request_func = getattr(requests, method)
    return request_func(url, data=data, headers=headers, stream=stream)
def disk_cache_offline(
    cache_file: str = 'disk_cache.db',
    message: str = 'Calling {func_name} failed, using cached results',
):
    """
    Decorator which caches a function in disk and uses cache when a urllib.error.URLError exception is raised

    If the function was called with a kwarg force=True, then this decorator will always attempt
    to call it, otherwise, will always default to local cache.

    :param cache_file: the cache file
    :param message: the warning message shown when defaulting to cache. Use "{func_name}" if you want to print the function name
    :return: function decorator
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Cache key: function name plus stringified positional args.
            # NOTE(review): kwargs (other than `force`) are not part of the
            # key, so calls differing only in kwargs share a cache entry.
            call_hash = f'{func.__name__}({", ".join(map(str, args))})'

            pickle_protocol = 4
            # Guard concurrent access to the shelve file with a file lock
            # when `filelock` is installed; otherwise fall back to a no-op
            # context manager.
            file_lock = nullcontext()
            with ImportExtensions(
                required=False,
                help_text=f'FileLock is needed to guarantee non-concurrent access to the'
                f'cache_file {cache_file}',
            ):
                import filelock

                file_lock = filelock.FileLock(f'{cache_file}.lock', timeout=-1)

            cache_db = None
            with file_lock:
                try:
                    cache_db = shelve.open(
                        cache_file, protocol=pickle_protocol, writeback=True
                    )
                except Exception:
                    if os.path.exists(cache_file):
                        # cache is in an unsupported format, reset the cache
                        os.remove(cache_file)
                        cache_db = shelve.open(
                            cache_file, protocol=pickle_protocol, writeback=True
                        )

            if cache_db is None:
                # if we failed to load cache, do not raise, it is only an optimization thing
                return func(*args, **kwargs), False
            else:
                # The boolean in the returned tuple tells the caller whether
                # the result came from the cache (True) or a live call (False).
                with cache_db as dict_db:
                    try:
                        if call_hash in dict_db and not kwargs.get('force', False):
                            return dict_db[call_hash], True

                        result = func(*args, **kwargs)
                        dict_db[call_hash] = result
                    except urllib.error.URLError:
                        # Network failure: serve the cached result if we have
                        # one, otherwise propagate the error.
                        if call_hash in dict_db:
                            default_logger.warning(
                                message.format(func_name=func.__name__)
                            )
                            return dict_db[call_hash], True
                        else:
                            raise
                    return result, False

        return wrapper

    return decorator
def is_requirements_installed(
    requirements_file: 'Path', show_warning: bool = False
) -> bool:
    """Return True if requirements.txt is installed locally

    :param requirements_file: the requirements.txt file
    :param show_warning: if to show a warning when a dependency is not satisfied
    :return: True or False if not satisfied
    """
    import pkg_resources
    from pkg_resources import (
        DistributionNotFound,
        VersionConflict,
        RequirementParseError,
    )

    install_reqs, install_options = _get_install_options(requirements_file)
    if not install_reqs:
        # Nothing required, so everything is trivially satisfied.
        return True

    try:
        pkg_resources.require('\n'.join(install_reqs))
    except (DistributionNotFound, VersionConflict, RequirementParseError) as ex:
        if show_warning:
            warnings.warn(repr(ex))
        # A version conflict still means the package itself is present.
        return isinstance(ex, VersionConflict)
    return True
def install_requirements(requirements_file: 'Path', timeout: int = 1000):
    """Install modules included in requirements file

    :param requirements_file: the requirements.txt file
    :param timeout: the socket timeout (default = 1000s)
    """
    # Skip the pip call entirely when everything is already satisfied.
    if is_requirements_installed(requirements_file):
        return

    install_reqs, install_options = _get_install_options(requirements_file)

    pip_cmd = [
        sys.executable,
        '-m',
        'pip',
        'install',
        '--compile',
        f'--default-timeout={timeout}',
    ]
    subprocess.check_call(pip_cmd + install_reqs + install_options)