"""Module for wrapping Jina Hub API calls."""
import argparse
import glob
import json
import time
import urllib.parse
import urllib.request
import webbrowser
from typing import Dict, Any, List

from .checker import *
from .helper import credentials_file
from .hubapi.local import _list_local, _load_local_hub_manifest
from .hubapi.remote import _list, _register_to_mongodb, _fetch_docker_auth
from .. import __version__ as jina_version
from ..enums import BuildTestLevel
from ..excepts import (
from ..executors import BaseExecutor
from ..flow import Flow
from ..helper import (
from ..importer import ImportExtensions
from ..logging import JinaLogger
from ..logging.profile import TimeContext, ProgressBar
from ..parsers import set_pod_parser
from ..peapods import Pod

_allowed = {

_label_prefix = 'ai.jina.hub.'

[docs]class HubIO: """:class:`HubIO` provides the way to interact with Jina Hub registry. You can use it with CLI to package a directory into a Jina Hub image and publish it to the world. Examples: - :command:`jina hub build my_pod/` build the image - :command:`jina hub build my_pod/ --push` build the image and push to the public registry - :command:`jina hub pull jinahub/pod.dummy_mwu_encoder:0.0.6` to download the image """ def __init__(self, args: 'argparse.Namespace'): """Create a new HubIO. :param args: arguments """ self.logger = JinaLogger(self.__class__.__name__, **vars(args)) self.args = args self._load_docker_client() def _load_docker_client(self): with ImportExtensions( required=False, help_text='missing "docker" dependency, available CLIs limited to "jina hub [list, new]"' 'to enable full CLI, please do pip install "jina[docker]"', ): import docker from docker import APIClient, DockerClient self._client: DockerClient = docker.from_env() # low-level client self._raw_client = APIClient(base_url='unix://var/run/docker.sock')
[docs] def new(self, no_input: bool = False) -> None: """ Create a new executor using cookiecutter template. :param no_input: Argument to avoid prompting dialogue (just to be used for testing) """ with ImportExtensions(required=True): from cookiecutter.main import cookiecutter import click # part of cookiecutter cookiecutter_template = self.args.template if self.args.type == 'app': cookiecutter_template = '' elif self.args.type == 'pod': cookiecutter_template = ( '' ) try: cookiecutter( template=cookiecutter_template, overwrite_if_exists=self.args.overwrite, output_dir=self.args.output_dir, no_input=no_input, ) except click.exceptions.Abort:'nothing is created, bye!')
[docs] def login(self) -> None: """Login using Github Device flow to allow push access to Jina Hub Registry.""" import requests with resource_stream('jina', '/'.join(('resources', 'hubapi.yml'))) as fp: hubapi_yml = JAML.load(fp) client_id = hubapi_yml['github']['client_id'] scope = hubapi_yml['github']['scope'] device_code_url = hubapi_yml['github']['device_code_url'] access_token_url = hubapi_yml['github']['access_token_url'] grant_type = hubapi_yml['github']['grant_type'] login_max_retry = hubapi_yml['github']['login_max_retry'] headers = {'Accept': 'application/json'} code_request_body = {'client_id': client_id, 'scope': scope} try: 'Jina Hub login will use Github Device to generate one time token' ) response = url=device_code_url, headers=headers, data=code_request_body ) if response.status_code != self.logger.error( 'cannot reach github server. please make sure you\'re connected to internet' ) code_response = response.json() device_code = code_response['device_code'] user_code = code_response['user_code'] verification_uri = code_response['verification_uri'] try: f'You should see a "Device Activation" page open in your browser. ' f'If not, please go to {colored(verification_uri, "cyan", attrs=["underline"])}' ) 'Please follow the steps:\n\n' f'1. Enter the following code to that page: {colored(user_code, "cyan", attrs=["bold"])}\n' '2. Click "Continue"\n' '3. Come back to this terminal\n' ) # allowing sometime for the user to view the message time.sleep(1), new=2) except: pass # intentional pass, browser support isn't cross-platform access_request_body = { 'client_id': client_id, 'device_code': device_code, 'grant_type': grant_type, } for _ in range(login_max_retry): access_token_response = url=access_token_url, headers=headers, data=access_request_body ).json() if access_token_response.get('error', None) == 'authorization_pending': self.logger.warning('still waiting for authorization') countdown( 10, reason=colored( 're-fetch access token', 'cyan', attrs=['bold', 'reverse'] ), ) elif 'access_token' in access_token_response: token = {'access_token': access_token_response['access_token']} with open(credentials_file(), 'w') as cf: JAML.dump(token, cf) self.logger.success(f'✅ Successfully logged in!') break else: self.logger.error(f'❌ Max retries {login_max_retry} reached') except KeyError as exp: self.logger.error(f'❌ Can not read the key in response: {exp}')
[docs] def list(self) -> Optional[List[Dict[str, Any]]]: """List all hub images given a filter specified by CLI. :return: list of dictionaries of images """ if self.args.local_only: return _list_local(self.logger) else: return _list( logger=self.logger,, image_kind=self.args.kind, image_type=self.args.type, image_keywords=self.args.keywords, )
[docs] def push( self, name: Optional[str] = None, build_result: Optional[Dict] = None, ) -> None: """Push image to Jina Hub. :param name: name of image :param build_result: dictionary containing the build summary :return: None """ name = name or try: # check if image exists # fail if it does if ( self.args.no_overwrite and build_result and self._image_version_exists( build_result['manifest_info']['name'], build_result['manifest_info']['version'], jina_version, ) ): raise ImageAlreadyExists( f'Image with name {name} already exists. Will NOT overwrite.' ) else: self.logger.debug( f'Image with name {name} does not exist. Pushing now...' ) self._push_docker_hub(name) if not build_result: file_path = get_summary_path(name) if os.path.isfile(file_path): with open(file_path) as f: build_result = json.load(f) else: self.logger.error( f'can not find the build summary file.' f'please use "jina hub build" to build the image first ' f'before pushing.' ) return if build_result: if build_result.get('is_build_success', False): _register_to_mongodb(logger=self.logger, summary=build_result) if build_result.get('details', None) and build_result.get( 'build_history', None ): self._write_slack_message( build_result, build_result['details'], build_result['build_history'], ) except Exception as e: self.logger.error(f'Error when trying to push image {name}: {e!r}') if isinstance(e, (ImageAlreadyExists, HubLoginRequired)): raise e
def _push_docker_hub(self, name: Optional[str] = None) -> None: """Push to Docker Hub. :param name: name of image """ check_registry(self.args.registry, name, self.args.repository) self._check_docker_image(name) self._docker_login() with ProgressBar(task_name=f'pushing {name}', batch_unit='') as t: for line in self._client.images.push(name, stream=True, decode=True): t.update(1) self.logger.debug(line) self.logger.success(f'🎉 {name} is now published!') share_link = f'{urllib.parse.quote_plus(name)}' try:, new=2) except: # pass intentionally, dont want to bother users on opening browser failure pass finally: f'Check out the usage {colored(share_link, "cyan", attrs=["underline"])} and share it with others!' )
[docs] def pull(self) -> None: """Pull docker image.""" check_registry(self.args.registry,, self.args.repository) try: self._docker_login() with TimeContext(f'pulling {}', self.logger): image = self._client.images.pull( if isinstance(image, list): image = image[0] image_tag = image.tags[0] if image.tags else '' self.logger.success( f'🎉 pulled {image_tag} ({image.short_id}) uncompressed size: {get_readable_size(image.attrs["Size"])}' ) except Exception as ex: self.logger.error( f'can not pull image {} from {self.args.registry} due to {ex!r}' )
def _check_docker_image(self, name: str) -> None: # check local image image = self._client.images.get(name) for r in _allowed: if f'{_label_prefix}{r}' not in image.labels.keys(): self.logger.warning( f'{r} is missing in your docker image labels, you may want to check it' ) try: image.labels['ai.jina.hub.jina_version'] = jina_version label_info = ( f'{self.args.repository}/' + '{type}.{kind}.{name}:{version}-{jina_version}'.format( **{k.replace(_label_prefix, ''): v for k, v in image.labels.items()} ) ) safe_name = safe_url_name(label_info) if name != safe_name: raise ValueError( f'image {name} does not match with label info in the image. name should be {safe_name}' ) except KeyError as e: self.logger.error(f'missing key in the label of the image {repr(e)}') raise'✅ {name} is a valid Jina Hub image, ready to publish') def _docker_login(self) -> None: """Log-in to Docker.""" from docker.errors import APIError if not (self.args.username and self.args.password): self.args.username, self.args.password = _fetch_docker_auth( logger=self.logger ) try: self._client.login( username=self.args.username, password=self.args.password, registry=self.args.registry, ) self.logger.success(f'✅ Successfully logged in to docker hub') except APIError: raise HubLoginRequired( f'❌ Invalid docker credentials passed. docker login failed' )
[docs] def build(self) -> Dict: """ Perform a build of the Docker image. :return: dictionary with information on image (manifest) """ if self.args.dry_run: result = self.dry_run() else: is_build_success, is_push_success = True, False _logs = [] _except_strs = [] _excepts = [] with TimeContext( f'building {colored(self.args.path, "green")}', self.logger ) as tc: try: _check_result = self._check_completeness() self._freeze_jina_version() _dockerfile = os.path.basename(_check_result['Dockerfile']) _labels = { _label_prefix + k: str(v) for k, v in self.manifest.items() } streamer = decode=True, path=self.args.path, tag=self.tag, pull=self.args.pull, dockerfile=_dockerfile, labels=_labels, rm=True, ) for chunk in streamer: if 'stream' in chunk: for line in chunk['stream'].splitlines(): if is_error_message(line): self.logger.critical(line) _except_strs.append(line) elif 'warning' in line.lower(): self.logger.warning(line) else: _logs.append(line) except Exception as ex: # if pytest fails it should end up here as well self.logger.error(ex) is_build_success = False ex = HubBuilderBuildError(ex) _except_strs.append(repr(ex)) _excepts.append(ex) if is_build_success: # compile it again, but this time don't show the log image, log = path=self.args.path, tag=self.tag, pull=self.args.pull, dockerfile=_dockerfile, labels=_labels, rm=True, ) # success _details = { 'inspect': self._raw_client.inspect_image(image.tags[0]), 'tag': image.tags[0], 'hash': image.short_id, 'size': get_readable_size(image.attrs['Size']), } self.logger.success( '🎉 built {tag} ({hash}) uncompressed size: {size}'.format_map( _details ) ) else: self.logger.error(f'can not build the image due to {_except_strs}') _details = {} if is_build_success: if self.args.test_uses: p_names = [] try: is_build_success = False p_names, failed_test_levels = HubIO._test_build( image, self.args.test_level, self.config_yaml_path, self.args.timeout_ready, self.args.daemon, self.logger, ) if any( test_level in failed_test_levels for test_level in [ BuildTestLevel.POD_DOCKER, BuildTestLevel.FLOW, ] ): is_build_success = False self.logger.error( f'build unsuccessful, failed at {str(failed_test_levels)} level' ) else: is_build_success = True self.logger.warning( f'Build successful. Tests failed at : {str(failed_test_levels)} levels. ' f'This could be due to the fact that the executor has non-installed external dependencies' ) except Exception as ex: self.logger.error( f'something wrong while testing the build: {ex!r}' ) ex = HubBuilderTestError(ex) _except_strs.append(repr(ex)) _excepts.append(ex) finally: if self.args.daemon: try: for p in p_names: self._raw_client.stop(p) except: pass # suppress on purpose self._raw_client.prune_containers() info, env_info = get_full_version() _host_info = { 'jina': info, 'jina_envs': env_info, 'docker':, 'build_args': vars(self.args), } _build_history = { 'time': get_now_timestamp(), 'host_info': _host_info if is_build_success and self.args.host_info else '', 'duration': tc.readable_duration, 'logs': _logs, 'exception': _except_strs, } if self.args.prune_images:'deleting unused images') self._raw_client.prune_images() # since db tracks `version` & `jina_version` on the top level, let's get rid of them in `manifest` if is_build_success: _version = self.manifest['version'] self.manifest.pop('version', None) self.manifest.pop('jina_version', None) else: _version = '0.0.1' result = { 'name': self.executor_name if is_build_success else '', 'version': _version, 'jina_version': jina_version, 'path': self.args.path, 'manifest_info': self.manifest if is_build_success else '', 'details': _details, 'is_build_success': is_build_success, 'build_history': _build_history, } # only successful build (NOT dry run) writes the summary to disk if result['is_build_success']: self._write_summary_to_file(summary=result) if self.args.push: self.push(image.tags[0], result) if not result['is_build_success'] and self.args.raise_error: # remove the very verbose build log when throw error if 'build_history' in result: result['build_history'].pop('logs', None) raise HubBuilderError(_excepts) return result
@staticmethod def _test_build( image, # type docker image object test_level: 'BuildTestLevel', config_yaml_path: str, timeout_ready: int, daemon_arg: bool, logger: 'JinaLogger', ): p_names = [] failed_levels = []'run tests using test level {test_level}') # test uses at executor level if test_level >= BuildTestLevel.EXECUTOR: f'test to initialize an executor from yaml configuration: {config_yaml_path}' ) try: with BaseExecutor.load_config(config_yaml_path): pass'successfully tested to initialize an executor') except: logger.error(f'failed to initialize an executor') failed_levels.append(BuildTestLevel.EXECUTOR) # test uses at Pod level (no docker) if test_level >= BuildTestLevel.POD_NONDOCKER: f'test to initialize a pod from yaml configuration: {config_yaml_path}' ) try: with Pod( set_pod_parser().parse_args( [ '--uses', config_yaml_path, '--timeout-ready', str(timeout_ready), ] ) ): pass f'successfully tested to initialize a pod from yaml configuration' ) except: logger.error(f'failed to initialize a pod') failed_levels.append(BuildTestLevel.POD_NONDOCKER) # test uses at Pod level (with docker) if test_level >= BuildTestLevel.POD_DOCKER: p_name = random_name() f'test to initialize a pod via docker image {image.tags[0]} named {p_name}' ) try: with Pod( set_pod_parser().parse_args( [ '--uses', f'docker://{image.tags[0]}', '--name', p_name, '--timeout-ready', str(timeout_ready), ] + ['--daemon'] if daemon_arg else [] ) ): pass p_names.append(p_name)'successfully tested to initialize a pod via docker') except: logger.error(f'failed to initialize a pod via docker image') failed_levels.append(BuildTestLevel.POD_DOCKER) # test uses at Flow level if test_level >= BuildTestLevel.FLOW: p_name = random_name() f'test to build a flow from docker image {image.tags[0]} named {p_name} ' f'with daemon={daemon_arg} and timeout_ready={timeout_ready}' ) try: with Flow().add( name=p_name, uses=f'docker://{image.tags[0]}', daemon=daemon_arg, timeout_ready=timeout_ready, ): pass p_names.append(p_name)'successfully tested to build a flow from docker image') except: logger.error(f'failed to build a flow from docker image') failed_levels.append(BuildTestLevel.FLOW) return p_names, failed_levels
[docs] def dry_run(self) -> Dict: """ Perform a dry-run. :return: a dict with the manifest info. """ try: s = self._check_completeness() s['is_build_success'] = True except Exception as ex: s = {'is_build_success': False, 'exception': str(ex)} return s
def _write_summary_to_file(self, summary: Dict) -> None: file_path = get_summary_path(f'{summary["name"]}:{summary["version"]}') with open(file_path, 'w+') as f: json.dump(summary, f) self.logger.debug(f'stored the summary from build to {file_path}') def _freeze_jina_version(self) -> None: import pkg_resources requirements_path = get_exist_path(self.args.path, 'requirements.txt') if requirements_path and os.path.exists(requirements_path): new_requirements = [] update = False with open(requirements_path, 'r') as fp: requirements = pkg_resources.parse_requirements(fp) for req in requirements: if 'jina' in str(req): update = True'Freezing jina version to {jina_version}') new_requirements.append(f'jina=={jina_version}') else: new_requirements.append(str(req)) if update: with open(requirements_path, 'w') as fp: fp.write('\n'.join(new_requirements)) @staticmethod def _alias_to_local_path(alias: str): """ Convert user given alias to the actual local path of the image, if fails return the original :param alias: the name of the hub image, given by user :return: the local path of the hub image, if not matched then return the original input """ all_local_images = _load_local_hub_manifest() if alias in all_local_images: return all_local_images[alias]['source_path'] else: return alias @staticmethod def _alias_to_docker_image_name(alias: str): """ Convert user given alias to the actual image tag, if fails return the original :param alias: the name of the hub image, given by user :return: the actual image tag, if not matched then return the original input """ all_local_images = _load_local_hub_manifest() if alias in all_local_images: return all_local_images[alias]['image_tag'] else: return alias def _check_completeness(self) -> Dict: self.args.path = self._alias_to_local_path(self.args.path) dockerfile_path = get_exist_path(self.args.path, self.args.file) manifest_path = get_exist_path(self.args.path, 'manifest.yml') self.config_yaml_path = get_exist_path(self.args.path, 'config.yml') readme_path = get_exist_path(self.args.path, '') requirements_path = get_exist_path(self.args.path, 'requirements.txt') yaml_glob = set(glob.glob(os.path.join(self.args.path, '*.yml'))) yaml_glob.difference_update({manifest_path, self.config_yaml_path}) if not self.config_yaml_path: self.config_yaml_path = yaml_glob.pop() py_glob = glob.glob(os.path.join(self.args.path, '*.py')) test_glob = glob.glob(os.path.join(self.args.path, 'tests/test_*.py')) completeness = { 'Dockerfile': dockerfile_path, 'manifest.yml': manifest_path, 'config.yml': self.config_yaml_path, '': readme_path, 'requirements.txt': requirements_path, '*.yml': yaml_glob, '*.py': py_glob, 'tests': test_glob, } f'completeness check\n' + '\n'.join( f'{colored("✓", "green") if v else colored("✗", "red"):>4} {k:<20} {v}' for k, v in completeness.items() ) + '\n' ) if not (completeness['Dockerfile'] and completeness['manifest.yml']): self.logger.critical( 'Dockerfile or manifest.yml is not given, can not build' ) raise FileNotFoundError( 'Dockerfile or manifest.yml is not given, can not build' ) self.manifest = self._read_manifest(manifest_path) self.manifest['jina_version'] = jina_version self.executor_name = safe_url_name( f'{self.args.repository}/' + f'{self.manifest["type"]}.{self.manifest["kind"]}.{self.manifest["name"]}' ) self.tag = self.executor_name + f':{self.manifest["version"]}-{jina_version}' return completeness def _read_manifest(self, path: str, validate: bool = True) -> Dict: with resource_stream( 'jina', '/'.join(('resources', 'hub-builder', 'manifest.yml')) ) as fp: tmp = JAML.load( fp ) # do not expand variables at here, i.e. DO NOT USE expand_dict(yaml.load(fp)) with open(path) as fp: tmp.update(JAML.load(fp)) if validate: self._validate_manifest(tmp) return tmp def _validate_manifest(self, manifest: Dict) -> None: required = {'name', 'type', 'version'} # check the required field in manifest for r in required: if r not in manifest: raise ValueError(f'{r} is missing in the manifest.yaml, it is required') # check if all fields are there for r in _allowed: if r not in manifest: self.logger.warning( f'{r} is missing in your manifest.yml, you may want to check it' ) # check name check_name(manifest['name']) # check_image_type check_image_type(manifest['type']) # check version number check_version(manifest['version']) # check version number check_license(manifest['license']) # check platform if not isinstance(manifest['platform'], list): manifest['platform'] = list(manifest['platform']) check_platform(manifest['platform']) # replace all chars in value to safe chars for k, v in manifest.items(): if v and isinstance(v, str): manifest[k] = remove_control_characters(v) # show manifest key-values for k, v in manifest.items(): self.logger.debug(f'{k}: {v}') def _write_slack_message(self, *args): def _expand_fn(v): if isinstance(v, str): for d in args: try: v = v.format(**d) except KeyError: pass return v if 'JINAHUB_SLACK_WEBHOOK' in os.environ: with resource_stream( 'jina', '/'.join(('resources', 'hub-builder-success', 'slack-jinahub.json')), ) as fp: tmp = expand_dict(json.load(fp), _expand_fn, resolve_cycle_ref=False) req = urllib.request.Request(os.environ['JINAHUB_SLACK_WEBHOOK']) req.add_header('Content-Type', 'application/json; charset=utf-8') jdb = json.dumps(tmp).encode('utf-8') # needs to be bytes req.add_header('Content-Length', str(len(jdb))) with urllib.request.urlopen(req, jdb) as f: res ='push to Slack: {res}') # alias of "new" in cli create = new init = new def _image_version_exists(self, name, module_version, req_jina_version): manifests = _list(self.logger, name) # check if matching module version and jina version exists if manifests: matching = [ m for m in manifests if m['version'] == module_version and 'jina_version' in m.keys() and m['jina_version'] == req_jina_version ] return len(matching) > 0 return False