Source code for jina.peapods.runtimes.container

import argparse
import os
import sys
import time
import warnings
from pathlib import Path

from ..zmq.base import ZMQRuntime
from ...zmq import Zmqlet
from ....helper import ArgNamespace, is_valid_local_config_source, slugify
from ....jaml.helper import complete_path

[docs]class ContainerRuntime(ZMQRuntime): """Runtime procedure for container.""" def __init__(self, args: 'argparse.Namespace'): super().__init__(args) self._set_network_for_dind_linux()
[docs] def setup(self): """Run the container.""" self._docker_run() while self._is_container_alive and not self.is_ready: time.sleep(1) # two cases to reach here: 1. is_ready, 2. container is dead if not self._is_container_alive: # replay it to see the log self._docker_run(replay=True) raise Exception('the container fails to start, check the arguments or entrypoint')
[docs] def teardown(self): """Stop the container.""" self._container.stop() super().teardown()
def _stream_logs(self): for line in self._container.logs(stream=True):
[docs] def run_forever(self): """Stream the logs while running.""" self._stream_logs()
def _set_network_for_dind_linux(self): import docker # recompute the control_addr, do not assign client, since this would be an expensive object to # copy in the new process generated client = docker.from_env() # Related to potential docker-in-docker communication. If `ContainerPea` lives already inside a container. # it will need to communicate using the `bridge` network. self._net_mode = None if sys.platform in ('linux', 'linux2'): self._net_mode = 'host' try: bridge_network = client.networks.get('bridge') if bridge_network: self.ctrl_addr, _ = Zmqlet.get_ctrl_address( bridge_network.attrs['IPAM']['Config'][0]['Gateway'], self.args.port_ctrl, self.args.ctrl_with_ipc) except Exception as ex: self.logger.warning(f'Unable to set control address from "bridge" network: {ex!r}' f' Control address set to {self.ctrl_addr}') client.close() def _docker_run(self, replay: bool = False): # important to notice, that client is not assigned as instance member to avoid potential # heavy copy into new process memory space import docker client = docker.from_env() if self.args.uses.startswith('docker://'): uses_img = self.args.uses.replace('docker://', '')'using image: {uses_img}') else: warnings.warn(f'you are using legacy image format {self.args.uses}, this may create some ambiguity. ' f'please use the new format: "--uses docker://{self.args.uses}"') uses_img = self.args.uses # the image arg should be ignored otherwise it keeps using ContainerPea in the container # basically all args in BasePea-docker arg group should be ignored. # this prevent setting containerPea twice from ....parsers import set_pea_parser non_defaults = ArgNamespace.get_non_defaults_args(self.args, set_pea_parser(), taboo={'uses', 'entrypoint', 'volumes', 'pull_latest', 'runtime_cls', 'docker_kwargs'}) if self.args.pull_latest: self.logger.warning(f'pulling {uses_img}, this could take a while. if you encounter ' f'timeout error due to pulling takes to long, then please set ' f'"timeout-ready" to a larger value.') client.images.pull(uses_img) _volumes = {} if self.args.uses_internal: full_path = None try: full_path = complete_path(self.args.uses_internal) except FileNotFoundError: pass if full_path and os.path.exists(full_path): # external YAML config, need to be volumed into the container # uses takes value from uses_internal non_defaults['uses'] = '/' + os.path.basename(full_path) _volumes[full_path] = {'bind': non_defaults['uses'], 'mode': 'ro'} elif not is_valid_local_config_source(self.args.uses_internal): raise FileNotFoundError( f'"uses_internal" {self.args.uses_internal} is not like a path, please check it') if self.args.volumes: for p in self.args.volumes: paths = p.split(':') local_path = paths[0] Path(os.path.abspath(local_path)).mkdir(parents=True, exist_ok=True) if len(paths) == 2: container_path = paths[1] else: container_path = '/' + os.path.basename(p) _volumes[os.path.abspath(local_path)] = {'bind': container_path, 'mode': 'rw'} _expose_port = [self.args.port_ctrl] if self.args.socket_in.is_bind: _expose_port.append(self.args.port_in) if self.args.socket_out.is_bind: _expose_port.append(self.args.port_out) _args = ArgNamespace.kwargs2list(non_defaults) ports = {f'{v}/tcp': v for v in _expose_port} if not self._net_mode else None docker_kwargs = self.args.docker_kwargs or {} self._container =, _args, detach=True, auto_remove=True, ports=ports, name=slugify(, volumes=_volumes, network_mode=self._net_mode, entrypoint=self.args.entrypoint, **docker_kwargs) if replay: # when replay is on, it means last time it fails to start # therefore we know the loop below wont block the main process self._stream_logs() client.close() @property def _is_container_alive(self) -> bool: import docker.errors try: self._container.reload() except docker.errors.NotFound: return False return True