"""Module for helper functions for Hub API."""

import hashlib
import io
import json
import os
import shelve
import subprocess
import sys
import tarfile
import urllib
import warnings
import zipfile
from contextlib import nullcontext
from functools import lru_cache, wraps
from pathlib import Path
from typing import Dict, Optional, Tuple
from urllib.parse import urljoin, urlparse
from urllib.request import Request, urlopen

from jina import __resources_path__
from jina.enums import BetterEnum
from jina.importer import ImportExtensions
from jina.logging.predefined import default_logger


@lru_cache()
def _get_hub_root() -> Path:
    """Return the root folder for all local Hub data, creating it when missing.

    ``JINA_HUB_ROOT`` overrides the location; otherwise ``~/.jina`` is used.
    The result is memoised by ``lru_cache``, so the environment is consulted once.

    :return: the hub root folder
    """
    fallback_root = Path.home().joinpath('.jina')
    root = Path(os.environ.get('JINA_HUB_ROOT', fallback_root))
    if not root.exists():
        root.mkdir(parents=True, exist_ok=True)
    return root
@lru_cache()
def get_hub_packages_dir() -> Path:
    """Return the folder in which Hub packages are stored, creating it on first use.

    :return: the ``hub-package`` folder located under the hub root
    """
    packages_dir = _get_hub_root() / 'hub-package'
    if not packages_dir.exists():
        packages_dir.mkdir(parents=True, exist_ok=True)
    return packages_dir
@lru_cache()
def get_cache_db() -> Path:
    """Return the location of the on-disk cache database for Hub Executors.

    Only the path is computed here; the database file itself is not created.

    :return: ``disk_cache.db`` located under the hub root
    """
    return _get_hub_root() / 'disk_cache.db'
@lru_cache()
def get_download_cache_dir() -> Path:
    """Return the folder used to cache downloads, creating it on first use.

    ``JINA_HUB_CACHE_DIR`` overrides the location; otherwise ``.cache`` under the
    hub root is used.

    :return: the download cache folder
    """
    fallback_dir = _get_hub_root() / '.cache'
    cache_dir = Path(os.environ.get('JINA_HUB_CACHE_DIR', fallback_dir))
    if not cache_dir.exists():
        cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir
@lru_cache()
def _get_hubble_base_url() -> str:
    """Get base Hubble Url from api.jina.ai or os.environ

    ``JINA_HUBBLE_REGISTRY`` takes precedence. Otherwise the ``url`` field of
    ``https://api.jina.ai/hub/hubble.json`` is used. The result is memoised by
    ``lru_cache``.

    :return: base Hubble Url
    :raises Exception: whatever fetching or decoding the remote JSON raised; the
        failure is logged as critical first
    """
    if 'JINA_HUBBLE_REGISTRY' in os.environ:
        return os.environ['JINA_HUBBLE_REGISTRY']

    try:
        req = Request(
            'https://api.jina.ai/hub/hubble.json',
            headers={'User-Agent': 'Mozilla/5.0'},
        )
        with urlopen(req) as resp:
            return json.load(resp)['url']
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt/SystemExit
        # are not reported as a fetch failure; the error is still re-raised.
        default_logger.critical('Can not fetch the Url of Hubble from `api.jina.ai`')
        raise
def parse_hub_uri(uri_path: str) -> Tuple[str, str, Optional[str], Optional[str]]:
    """Parse the uri of the Jina Hub executor.

    The accepted form is ``<scheme>://<name>[:<secret>][/<tag>]``, where ``scheme``
    is one of ``jinahub``, ``jinahub+docker`` or ``jinahub+sandbox``.

    :param uri_path: the uri of Jina Hub executor
    :return: a tuple of scheme, name, tag, and secret; ``tag`` is ``None`` when the
        uri has no path and ``secret`` is ``None`` when the netloc has no ``:``
    :raises ValueError: if the scheme is not a Hub scheme or the name is empty
    """
    parser = urlparse(uri_path)
    scheme = parser.scheme
    if scheme not in {'jinahub', 'jinahub+docker', 'jinahub+sandbox'}:
        raise ValueError(f'{uri_path} is not a valid Hub URI.')

    items = parser.netloc.split(':')
    name = items[0]
    if not name:
        raise ValueError(f'{uri_path} is not a valid Hub URI.')

    secret = items[1] if len(items) > 1 else None
    tag = parser.path.strip('/') if parser.path else None
    return scheme, name, tag, secret
def is_valid_huburi(uri: str) -> bool:
    """Return True if it is a valid Hubble URI

    Validity is decided by ``parse_hub_uri``: any error it raises makes the uri invalid.

    :param uri: the uri to test
    :return: True or False
    """
    try:
        parse_hub_uri(uri)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so KeyboardInterrupt/SystemExit
        # propagate instead of being reported as "invalid uri"
        return False
    return True
def md5file(file_path: 'Path') -> str:
    """Return the MD5 checksum of the file

    :param file_path: the file to check md5sum; a ``str`` path is accepted as well
    :return: the MD5 checksum as a hex string
    """
    hash_md5 = hashlib.md5()
    # read in multi-block chunks so large packages are never loaded into memory at once
    chunk_size = 128 * hash_md5.block_size
    with Path(file_path).open(mode='rb') as fp:
        for chunk in iter(lambda: fp.read(chunk_size), b''):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
def _ensure_tar_members_within(tar: 'tarfile.TarFile', target_dir: 'Path'):
    """Raise ValueError if any member of ``tar`` would be written, or link, outside ``target_dir``."""
    root = Path(target_dir).resolve()

    def _inside(path: Path) -> bool:
        return path == root or root in path.parents

    for member in tar.getmembers():
        dest = (root / member.name).resolve()
        if not _inside(dest):
            raise ValueError(
                f'Refusing to unpack {member.name!r}: it would be written outside of {target_dir}'
            )
        if member.issym():
            # symlink targets are relative to the directory that contains the link
            link_dest = (dest.parent / member.linkname).resolve()
        elif member.islnk():
            # hard-link targets are relative to the archive root
            link_dest = (root / member.linkname).resolve()
        else:
            continue
        if not _inside(link_dest):
            raise ValueError(
                f'Refusing to unpack {member.name!r}: it links outside of {target_dir}'
            )


def unpack_package(filepath: 'Path', target_dir: 'Path'):
    """Unpack the file to the target_dir.

    ``.zip`` files are read with ``zipfile``; ``.tar``, ``.gz`` and ``.tgz`` files are
    opened with ``tarfile``, which detects the compression itself. Tar members are
    checked before extraction, because ``tarfile.extractall`` would otherwise honour
    absolute paths, ``..`` components and links that escape ``target_dir``.

    :param filepath: the path of given file
    :param target_dir: the path of target folder
    :raises ValueError: if the file format is not supported, or a tar member would
        be placed outside ``target_dir``
    """
    if filepath.suffix == '.zip':
        # zipfile already strips absolute paths and ``..`` components when extracting
        with zipfile.ZipFile(filepath, 'r') as zip_file:
            zip_file.extractall(target_dir)
    elif filepath.suffix in ('.tar', '.gz', '.tgz'):
        with tarfile.open(filepath) as tar:
            _ensure_tar_members_within(tar, target_dir)
            tar.extractall(target_dir)
    else:
        raise ValueError('File format is not supported for unpacking.')
def archive_package(package_folder: 'Path') -> 'io.BytesIO':
    """
    Archives the given folder in zip format and return a data stream.

    Paths matched by the package's ``.gitignore`` are skipped. When the package has
    no ``.gitignore``, the bundled ``Python.gitignore`` is used instead. ``.git`` and
    ``.jina`` are always skipped.

    :param package_folder: the folder path of the package
    :return: the data stream of zip content, rewound to the start
    """
    with ImportExtensions(required=True):
        import pathspec

    root_path = package_folder.resolve()

    gitignore = root_path / '.gitignore'
    if not gitignore.exists():
        gitignore = Path(__resources_path__) / 'Python.gitignore'

    with gitignore.open() as fp:
        ignore_lines = [
            line.strip() for line in fp if line.strip() and not line.startswith('#')
        ]
    ignore_lines += ['.git', '.jina']
    ignored_spec = pathspec.PathSpec.from_lines('gitwildmatch', ignore_lines)

    def _zip(base_path, path, archive):
        # sorted so the archive layout does not depend on filesystem listing order
        for p in sorted(path.iterdir()):
            rel_path = p.relative_to(base_path)
            if ignored_spec.match_file(str(rel_path)):
                continue
            if p.is_dir():
                _zip(base_path, p, archive)
            else:
                archive.write(p, rel_path)

    zip_stream = io.BytesIO()
    # The context manager finalises the archive even if walking the tree fails. Closing
    # a ZipFile that was handed a stream leaves the stream itself open.
    with zipfile.ZipFile(zip_stream, 'w', compression=zipfile.ZIP_DEFLATED) as zfile:
        _zip(root_path, root_path, zfile)
    zip_stream.seek(0)
    return zip_stream
def download_with_resume(
    url: str,
    target_dir: 'Path',
    filename: Optional[str] = None,
    md5sum: Optional[str] = None,
) -> 'Path':
    """
    Download file from url to target_dir, and check md5sum.

    Performs a HTTP(S) download that can be restarted if prematurely terminated.
    Resuming needs a server that supports byte ranges. If the server answers a ranged
    request with the whole body instead (anything but ``206 Partial Content``), the
    partial file is overwritten rather than appended to.

    :param url: the URL to download
    :param target_dir: the target path for the file
    :param filename: the filename of the downloaded file; defaults to the last
        ``/``-separated segment of ``url``
    :param md5sum: the MD5 checksum to match
    :return: the filepath of the downloaded file
    :raises requests.HTTPError: if the download request returns an HTTP error status
    :raises RuntimeError: if ``md5sum`` is given and the downloaded file does not match
    """
    with ImportExtensions(required=True):
        import requests

    block_size = 1024

    def _download(url, target, resume_byte_pos: int = None):
        resume_header = (
            {'Range': f'bytes={resume_byte_pos}-'} if resume_byte_pos else None
        )
        # ``with`` releases the streamed connection once the body has been consumed
        with requests.get(url, stream=True, headers=resume_header) as r:
            # never persist an error page as if it were the requested file
            r.raise_for_status()
            # only a 206 response carries just the missing tail; any other success status
            # carries the full file, which must replace the partial one
            resumed = bool(resume_byte_pos) and r.status_code == 206
            with target.open(mode='ab' if resumed else 'wb') as f:
                for chunk in r.iter_content(32 * block_size):
                    f.write(chunk)

    if filename is None:
        filename = url.split('/')[-1]
    filepath = target_dir / filename

    # HEAD does not follow redirects by default in requests; without this, the
    # content-length of a redirect response would be taken as the file size
    head_info = requests.head(url, allow_redirects=True)
    file_size_online = int(head_info.headers.get('content-length', 0))

    _resume_byte_pos = None
    if filepath.exists():
        if md5sum and md5file(filepath) == md5sum:
            return filepath

        file_size_offline = filepath.stat().st_size
        if file_size_online > file_size_offline:
            _resume_byte_pos = file_size_offline

    _download(url, filepath, _resume_byte_pos)

    if md5sum and not md5file(filepath) == md5sum:
        raise RuntimeError(
            'MD5 checksum failed. '
            'Might happen when the network is unstable, please retry. '
            'If still not work, feel free to raise an issue. '
            'https://github.com/jina-ai/jina/issues/new'
        )

    return filepath
def upload_file(
    url: str,
    file_name: str,
    buffer_data: bytes,
    dict_data: Dict,
    headers: Dict,
    stream: bool = False,
    method: str = 'post',
):
    """Upload file to target url

    The request body is multipart form data made of ``dict_data`` plus a ``file``
    field. The caller's ``dict_data`` and ``headers`` are copied and never modified.

    :param url: target url
    :param file_name: the file name
    :param buffer_data: the data to upload
    :param dict_data: the dict-style data to upload
    :param headers: the request header
    :param stream: receive stream response
    :param method: the request method, i.e. the name of a ``requests`` function such as ``'post'``
    :return: the response of request
    """
    with ImportExtensions(required=True):
        import requests

    # work on copies so repeated calls do not leak the file part or content type
    # into dicts the caller may reuse; ``.copy()`` keeps the mapping's own type
    form_fields = dict_data.copy()
    form_fields['file'] = (file_name, buffer_data)

    (data, ctype) = requests.packages.urllib3.filepost.encode_multipart_formdata(
        form_fields
    )

    request_headers = headers.copy()
    request_headers.update({'Content-Type': ctype})

    response = getattr(requests, method)(
        url, data=data, headers=request_headers, stream=stream
    )

    return response
def disk_cache_offline(
    cache_file: str = 'disk_cache.db',
    message: str = 'Calling {func_name} failed, using cached results',
):
    """
    Decorator which caches a function in disk and uses cache when a urllib.error.URLError exception is raised

    The decorated function returns a tuple ``(result, from_cache)``.

    * Called with the kwarg ``force=True``, it always attempts the real call. It falls
      back to the cache only when that call raises ``URLError``.
    * Otherwise, a cached result is returned whenever one exists.

    The cache key is built with ``str`` from three parts: the function name, the
    positional arguments, and every keyword argument except ``force``.

    :param cache_file: the cache file
    :param message: the warning message shown when defaulting to cache. Use "{func_name}" if you want to print the function name
    :return: function decorator
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key_parts = [str(arg) for arg in args]
            # Keyword arguments can change the result, so they belong in the key. ``force``
            # only selects cache behaviour, so it is left out: forced and normal calls
            # share one entry. Without other kwargs the key matches the args-only format.
            key_parts.extend(
                f'{key}={value}'
                for key, value in sorted(kwargs.items())
                if key != 'force'
            )
            call_hash = f'{func.__name__}({", ".join(key_parts)})'
            pickle_protocol = 4

            file_lock = nullcontext()
            # presumably ImportExtensions(required=False) suppresses a missing ``filelock``,
            # leaving the no-op nullcontext in place — TODO confirm in jina.importer
            with ImportExtensions(
                required=False,
                help_text=f'FileLock is needed to guarantee non-concurrent access to the '
                f'cache_file {cache_file}',
            ):
                import filelock

                file_lock = filelock.FileLock(f'{cache_file}.lock', timeout=-1)

            cache_db = None
            # NOTE(review): the lock only guards opening/resetting the shelf; the reads and
            # writes below happen after it has been released
            with file_lock:
                try:
                    cache_db = shelve.open(
                        cache_file, protocol=pickle_protocol, writeback=True
                    )
                except Exception:
                    if os.path.exists(cache_file):
                        # cache is in an unsupported format, reset the cache
                        os.remove(cache_file)
                        try:
                            cache_db = shelve.open(
                                cache_file, protocol=pickle_protocol, writeback=True
                            )
                        except Exception:
                            # still unusable: fall through and run uncached
                            cache_db = None

            if cache_db is None:
                # if we failed to load cache, do not raise, it is only an optimization thing
                return func(*args, **kwargs), False

            with cache_db as dict_db:
                try:
                    if call_hash in dict_db and not kwargs.get('force', False):
                        return dict_db[call_hash], True

                    result = func(*args, **kwargs)
                    dict_db[call_hash] = result
                except urllib.error.URLError:
                    if call_hash in dict_db:
                        default_logger.warning(message.format(func_name=func.__name__))
                        return dict_db[call_hash], True
                    else:
                        raise

            return result, False

        return wrapper

    return decorator
def is_requirements_installed(
    requirements_file: 'Path', show_warning: bool = False
) -> bool:
    """Check whether the requirements listed in a requirements file are satisfied locally.

    An empty requirement list counts as satisfied. If the requirements are missing or
    cannot be parsed, the result is False. A version conflict is treated as installed
    and yields True.

    :param requirements_file: the requirements.txt file
    :param show_warning: if to show a warning when a dependency is not satisfied
    :return: True or False if not satisfied
    """
    import pkg_resources
    from pkg_resources import (
        DistributionNotFound,
        RequirementParseError,
        VersionConflict,
    )

    install_reqs, _unused_options = _get_install_options(requirements_file)
    if not len(install_reqs):
        return True

    try:
        pkg_resources.require('\n'.join(install_reqs))
    except (DistributionNotFound, VersionConflict, RequirementParseError) as err:
        if show_warning:
            warnings.warn(repr(err))
        # only a version conflict is still reported as "installed"
        return isinstance(err, VersionConflict)
    return True
def install_requirements(requirements_file: 'Path', timeout: int = 1000):
    """Install the modules listed in a requirements file via pip, unless already satisfied.

    pip runs in the current interpreter (``sys.executable -m pip``). The requirement
    lines and extra install options come from ``_get_install_options``.

    :param requirements_file: the requirements.txt file
    :param timeout: the socket timeout (default = 1000s)
    """
    if is_requirements_installed(requirements_file):
        return

    install_reqs, install_options = _get_install_options(requirements_file)
    pip_cmd = [
        sys.executable,
        '-m',
        'pip',
        'install',
        '--compile',
        f'--default-timeout={timeout}',
    ]
    subprocess.check_call(pip_cmd + install_reqs + install_options)
class HubbleReturnStatus(BetterEnum):
    """
    Type of hubble return status enum

    Status codes found in the ``status`` field of Hubble structured responses.

    NOTE(review): the leading three digits of most codes look like an HTTP status
    (e.g. 401xx authentication, 404xx not found, 500xx server errors). TODO confirm
    against the Hubble API documentation.
    """

    # NOTE(review): presumably a fallback for statuses not listed here — confirm
    UNKNOWN_ERROR = -1
    OK = 20000

    # 400xx
    PARAM_VALIDATION_ERROR = 40001
    SQL_CREATION_ERROR = 40002
    DATA_STREAM_BROKEN_ERROR = 40003
    UNEXPECTED_MIME_TYPE_ERROR = 40004

    # 401xx
    SSO_LOGIN_REQUIRED = 40101
    AUTHENTICATION_FAILED = 40102
    AUTHENTICATION_REQUIRED = 40103

    # 403xx
    OPERATION_NOT_ALLOWED = 40301

    # 404xx
    INTERNAL_RESOURCE_NOT_FOUND = 40401
    RPC_METHOD_NOT_FOUND = 40402
    REQUESTED_ENTITY_NOT_FOUND = 40403

    # 405xx
    INTERNAL_RESOURCE_METHOD_NOT_ALLOWED = 40501
    INCOMPATIBLE_METHOD_ERROR = 40502

    # 409xx
    INTERNAL_RESOURCE_ID_CONFLICT = 40901
    RESOURCE_POLICY_DENY = 40902

    # 413xx
    TOO_LARGE_FILE = 41301

    # 422xx
    INTERNAL_DATA_CORRUPTION = 42201
    IDENTIFIER_NAMESPACE_OCCUPIED = 42202
    SUBMITTED_DATA_MALFORMED = 42203
    EXTERNAL_SERVICE_FAILURE = 42204
    DOWNSTREAM_SERVICE_FAILURE = 42205

    # 500xx
    SERVER_INTERNAL_ERROR = 50001
    DOWNSTREAM_SERVICE_ERROR = 50002
    SERVER_SUBPROCESS_ERROR = 50003
    SANDBOX_BUILD_NOT_FOUND = 50004
    NOT_IMPLEMENTED_ERROR = 50005
    RESPONSE_STREAM_CLOSED = 50006
class NormalizerErrorCode(BetterEnum):
    """
    Type of executor-normalizer error code enum

    Codes found in the ``code`` field of the executor-normalizer error. Hubble passes
    that error on under ``err`` when a ``DOWNSTREAM_SERVICE_FAILURE`` comes from the
    ``normalizer`` service.
    """

    ExecutorNotFound = 4000
    ExecutorExists = 4001
    IllegalExecutor = 4002
    BrokenDependency = 4003
    # NOTE(review): presumably a catch-all for other normalizer failures — confirm
    Others = 5000
def get_hubble_error_message(hubble_structured_error: dict) -> Tuple[str, str]:
    """Override some of the hubble error messages to provide better user experience

    The base message is ``readableMessage`` when it is non-empty, otherwise ``message``.
    Two situations get a friendlier replacement:

    * a Docker build failure (``SERVER_SUBPROCESS_ERROR`` with ``cmd == 'docker'``);
    * an executor-normalizer failure. Here a hint about running the normalizer locally
      is always appended.

    :param hubble_structured_error: the hubble structured error response
    :returns: Tuple of overridden_msg and original_msg
    """
    msg = hubble_structured_error.get(
        'readableMessage', ''
    ) or hubble_structured_error.get('message', '')
    status = hubble_structured_error.get('status', None)
    original_msg = msg

    if not status:
        return (msg, msg)

    if (
        status == HubbleReturnStatus.SERVER_SUBPROCESS_ERROR
        and hubble_structured_error.get('cmd', '') == 'docker'
    ):
        msg = '''Failed on building Docker image. Potential solutions:
  - If you haven't provided a Dockerfile in the executor bundle, you may want to provide one,
    as the auto-generated one on the cloud did not work.
  - If you have provided a Dockerfile, you may want to check the validity of this Dockerfile.'''

    elif (
        status == HubbleReturnStatus.DOWNSTREAM_SERVICE_FAILURE
        and hubble_structured_error.get('service', '') == 'normalizer'
    ):
        normalizer_error = hubble_structured_error.get('err', '')
        if (
            isinstance(normalizer_error, dict)
            and normalizer_error.get('code', None)
            == NormalizerErrorCode.ExecutorNotFound
        ):
            msg = '''We cannot discover any Executor in your uploaded bundle. This is often due to one of the following errors:
  - The bundle did not contain any valid executor.
  - The config.yml's `jtype` is mismatched with the actual Executor class name.

  For more information about the expected bundle structure, please refer to the documentation.
    https://docs.jina.ai/fundamentals/executor/repository-structure/'''

        # appended for every normalizer failure, including the raw hubble message, so it
        # starts on its own paragraph to stay readable
        msg += '''

  For more detailed information, you can try the `executor-normalizer` locally to see the root cause.
    https://github.com/jina-ai/executor-normalizer'''

    return (msg, original_msg)