import argparse
import asyncio
import multiprocessing
import os
import signal
import threading
import time
from typing import TYPE_CHECKING, Dict, Optional, Union

from jina import __docker_host__, __windows__
from jina.helper import random_name, slugify
from jina.importer import ImportExtensions
from jina.logging.logger import JinaLogger
from jina.orchestrate.pods import BasePod, _get_worker
from jina.orchestrate.pods.container_helper import (
    get_docker_network,
    get_gpu_device_requests,
)
from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime

if TYPE_CHECKING:
    from docker.client import DockerClient


def _docker_run(
    client: 'DockerClient',
    args: 'argparse.Namespace',
    container_name: str,
    envs: Dict,
    net_mode: Optional[str],
    logger: 'JinaLogger',
):
    """Resolve the image given in ``args.uses`` and start it as a detached container.

    .. note::
        ``args`` is mutated: ``args.native`` is set to ``True`` and ``args.gpus`` is
        deleted when it is set.

    :param client: docker client used to talk to the daemon
    :param args: namespace args from the Pod
    :param container_name: name to set the Container to
    :param envs: dictionary of environment variables to be set in the container
    :param net_mode: the network mode where to run the container; when falsy the Pod
        port is published explicitly
    :param logger: logger used to report progress and problems
    :return: the container object returned by ``client.containers.run``
    :raises DockerVersionError: if the daemon version is unknown or below 20
    :raises BadImageNameError: if the image can be found neither locally nor remotely
    """
    # important to notice, that client is not assigned as instance member to avoid potential
    # heavy copy into new process memory space
    import warnings

    import docker

    from jina.excepts import BadImageNameError, DockerVersionError

    docker_version = client.version().get('Version')
    if not docker_version:
        raise DockerVersionError('docker version can not be resolved')

    docker_version = tuple(docker_version.split('.'))
    # docker daemon versions below 20.0x do not support "host.docker.internal:host-gateway"
    # NOTE(review): the components are strings, so this comparison is lexicographic
    # (e.g. a major version of '9' would not be considered below '20') - confirm
    # whether a numeric comparison is wanted before changing it.
    if docker_version < ('20',):
        raise DockerVersionError(
            f'docker version {".".join(docker_version)} is below 20.0.0 and does not '
            f'support "host.docker.internal:host-gateway" : https://github.com/docker/cli/issues/2664'
        )

    if args.uses.startswith('docker://'):
        uses_img = args.uses.replace('docker://', '')
        logger.debug(f'will use Docker image: {uses_img}')
    else:
        warnings.warn(
            f'you are using legacy image format {args.uses}, this may create some ambiguity. '
            f'please use the new format: "--uses docker://{args.uses}"'
        )
        uses_img = args.uses

    # the image arg should be ignored otherwise it keeps using ContainerPod in the container
    # basically all args in Pod-docker arg group should be ignored.
    # this prevent setting containerPod twice
    from pathlib import Path

    from jina.helper import ArgNamespace
    from jina.parsers import set_pod_parser

    # must be set before the non-default args are collected below
    args.native = True

    non_defaults = ArgNamespace.get_non_defaults_args(
        args,
        set_pod_parser(),
        taboo={
            'uses',
            'entrypoint',
            'volumes',
            'pull_latest',
            'docker_kwargs',
            'gpus',
        },
    )

    img_not_found = False

    try:
        client.images.get(uses_img)
    except docker.errors.ImageNotFound:
        logger.error(f'can not find local image: {uses_img}')
        img_not_found = True

    if args.pull_latest or img_not_found:
        logger.warning(
            f'pulling {uses_img}, this could take a while. if you encounter '
            f'timeout error due to pulling takes to long, then please set '
            f'"timeout-ready" to a larger value.'
        )
        try:
            client.images.pull(uses_img)
            img_not_found = False
        except docker.errors.NotFound:
            img_not_found = True
            logger.error(f'can not find remote image: {uses_img}')

    if img_not_found:
        raise BadImageNameError(f'image: {uses_img} can not be found local & remote.')

    _volumes = {}
    if args.volumes:
        for p in args.volumes:
            paths = p.split(':')
            local_path = paths[0]
            # make sure the host side of the bind mount exists
            Path(os.path.abspath(local_path)).mkdir(parents=True, exist_ok=True)
            if len(paths) == 2:
                container_path = paths[1]
            else:
                container_path = '/' + os.path.basename(p)
            _volumes[os.path.abspath(local_path)] = {
                'bind': container_path,
                'mode': 'rw',
            }

    device_requests = []
    if args.gpus:
        device_requests = get_gpu_device_requests(args.gpus)
        del args.gpus

    _args = ArgNamespace.kwargs2list(non_defaults)
    ports = {f'{args.port}/tcp': args.port} if not net_mode else None

    # Work on a copy so the caller's namespace is left untouched.
    docker_kwargs = dict(args.docker_kwargs or {})

    # ``extra_hosts`` is always passed explicitly below; leaving a user-provided
    # ``extra_hosts`` inside ``docker_kwargs`` would raise
    # "TypeError: got multiple values for keyword argument 'extra_hosts'".
    # Merge the user's entries with the mandatory host-gateway mapping instead.
    user_hosts = docker_kwargs.pop('extra_hosts', None)
    if not user_hosts:
        extra_hosts = {__docker_host__: 'host-gateway'}
    elif isinstance(user_hosts, dict):
        extra_hosts = {**user_hosts, __docker_host__: 'host-gateway'}
    else:
        # presumably a list of 'hostname:ip' strings - TODO confirm against callers
        extra_hosts = [*user_hosts, f'{__docker_host__}:host-gateway']

    container = client.containers.run(
        uses_img,
        _args,
        detach=True,
        auto_remove=True,
        ports=ports,
        name=container_name,
        volumes=_volumes,
        network_mode=net_mode,
        entrypoint=args.entrypoint,
        extra_hosts=extra_hosts,
        device_requests=device_requests,
        environment=envs,
        **docker_kwargs,
    )
    return container
def run(
    args: 'argparse.Namespace',
    name: str,
    container_name: str,
    net_mode: Optional[str],
    runtime_ctrl_address: str,
    envs: Dict,
    is_started: Union['multiprocessing.Event', 'threading.Event'],
    is_shutdown: Union['multiprocessing.Event', 'threading.Event'],
    is_ready: Union['multiprocessing.Event', 'threading.Event'],
):
    """Start the container and stream its start-up logs; target of the Pod's `thread` or `process`.

    .. note::
        :meth:`run` is running in subprocess/thread, the exception can not be propagated to the main process.
        Hence, please do not raise any exception here.

    .. note::
        Please note that env variables are process-specific. Subprocess inherits envs from
        the main process. But Subprocess's envs do NOT affect the main process. It does NOT
        mess up user local system envs.

    :param args: namespace args from the Pod
    :param name: name of the Pod to have proper logging
    :param container_name: name to set the Container to
    :param net_mode: The network mode where to run the container
    :param runtime_ctrl_address: The control address of the runtime in the container
    :param envs: Dictionary of environment variables to be set in the docker image
    :param is_started: concurrency event to communicate runtime is properly started. Used for better logging
    :param is_shutdown: concurrency event to communicate runtime is terminated
    :param is_ready: concurrency event to communicate runtime is ready to receive messages
    """
    import docker

    logger = JinaLogger(name, **vars(args))

    cancel = threading.Event()
    fail_to_start = threading.Event()

    def _request_cancel(*_args, **_kwargs):
        # shared by the POSIX signal handlers and the Windows console handler
        return cancel.set()

    if __windows__:
        with ImportExtensions(
            required=True,
            logger=logger,
            help_text='''If you see a 'DLL load failed' error, please reinstall `pywin32`.
                If you're using conda, please use the command `conda install -c anaconda pywin32`''',
        ):
            import win32api

        win32api.SetConsoleCtrlHandler(_request_cancel, True)
    else:
        try:
            for signum in (signal.SIGINT, signal.SIGTERM):
                signal.signal(signum, _request_cancel)
        except (ValueError, RuntimeError) as exc:
            # handlers could not be installed: warn and carry on without them
            logger.warning(
                f' The process starting the container for {name} will not be able to handle termination signals. '
                f' {repr(exc)}'
            )

    client = docker.from_env()

    try:
        container = _docker_run(
            client=client,
            args=args,
            container_name=container_name,
            envs=envs,
            net_mode=net_mode,
            logger=logger,
        )
        client.close()

        def _runtime_ready():
            return AsyncNewLoopRuntime.is_ready(runtime_ctrl_address)

        def _container_alive(cont) -> bool:
            import docker.errors

            try:
                cont.reload()
            except docker.errors.NotFound:
                return False
            else:
                return True

        async def _check_readiness(cont):
            # poll until the container is gone, the runtime answers, or cancel is requested
            while True:
                if not _container_alive(cont):
                    break
                if _runtime_ready():
                    break
                if cancel.is_set():
                    break
                await asyncio.sleep(0.1)

            if _container_alive(cont):
                is_started.set()
                is_ready.set()
            else:
                fail_to_start.set()

        async def _stream_starting_logs(cont):
            for raw_line in cont.logs(stream=True):
                still_starting = not (
                    is_started.is_set() or fail_to_start.is_set() or cancel.is_set()
                )
                if still_starting:
                    # yield to the readiness check before echoing the line
                    await asyncio.sleep(0.01)
                    logger.info(raw_line.strip().decode())

        async def _run_async(cont):
            await asyncio.gather(_check_readiness(cont), _stream_starting_logs(cont))

        asyncio.run(_run_async(container))
    finally:
        client.close()
        if not is_started.is_set():
            logger.error(
                ' Process terminated, the container fails to start, check the arguments or entrypoint'
            )
        is_shutdown.set()
        logger.debug(' Process terminated')
class ContainerPod(BasePod):
    """
    :class:`ContainerPod` starts a runtime of :class:`BaseRuntime` inside a container. It leverages :class:`threading.Thread`
    or :class:`multiprocessing.Process` to manage the logs and the lifecycle of docker container object in a robust way.
    """

    def __init__(self, args: 'argparse.Namespace'):
        super().__init__(args)
        # Drop a user-provided ``extra_hosts`` that already maps the docker host.
        if (
            self.args.docker_kwargs
            and 'extra_hosts' in self.args.docker_kwargs
            and __docker_host__ in self.args.docker_kwargs['extra_hosts']
        ):
            self.args.docker_kwargs.pop('extra_hosts')
        self._net_mode = None
        self.worker = None
        self.container_name = slugify(f'{self.name}/{random_name()}')
        self.net_mode, self.runtime_ctrl_address = self._get_control_address()

    def _get_control_address(self):
        """Work out the network mode and the address used to reach the runtime in the container.

        :return: tuple of ``(net_mode, runtime_ctrl_address)``
        """
        import docker

        client = docker.from_env()
        try:
            network = get_docker_network(client)

            if (
                self.args.docker_kwargs
                and 'extra_hosts' in self.args.docker_kwargs
                and __docker_host__ in self.args.docker_kwargs['extra_hosts']
            ):
                ctrl_host = __docker_host__
            elif network:
                # If the caller is already in a docker network, replace ctrl-host with network gateway
                try:
                    ctrl_host = client.networks.get(network).attrs['IPAM']['Config'][
                        0
                    ]['Gateway']
                except Exception:
                    # best effort: fall back to the docker host name, but do not
                    # swallow KeyboardInterrupt / SystemExit like a bare except would
                    ctrl_host = __docker_host__
            else:
                ctrl_host = self.args.host

            ctrl_address = f'{ctrl_host}:{self.args.port}'

            net_node, runtime_ctrl_address = self._get_network_for_dind_linux(
                client, ctrl_address
            )
        finally:
            client.close()

        return net_node, runtime_ctrl_address

    def _get_network_for_dind_linux(self, client: 'DockerClient', ctrl_address: str):
        """Adjust network mode and control address for Linux hosts (WSL excluded).

        :param client: docker client used to inspect the ``bridge`` network
        :param ctrl_address: control address to use when no adjustment applies
        :return: tuple of ``(net_mode, runtime_ctrl_address)``; ``net_mode`` is ``'host'``
            on non-WSL Linux and ``None`` otherwise
        """
        import sys
        from platform import uname

        # Related to potential docker-in-docker communication. If `Runtime` lives already inside a container.
        # it will need to communicate using the `bridge` network.
        # In WSL, we need to set ports explicitly
        net_mode, runtime_ctrl_address = None, ctrl_address
        if sys.platform in ('linux', 'linux2') and 'microsoft' not in uname().release:
            net_mode = 'host'
            try:
                bridge_network = client.networks.get('bridge')
                if bridge_network:
                    runtime_ctrl_address = f'{bridge_network.attrs["IPAM"]["Config"][0]["Gateway"]}:{self.args.port}'
            except Exception as ex:
                self.logger.warning(
                    f'Unable to set control address from "bridge" network: {ex!r}'
                    f' Control address set to {runtime_ctrl_address}'
                )

        return net_mode, runtime_ctrl_address

    @property
    def _container(self):
        """Look up the container by name with a short-lived docker client.

        :return: the container object named ``self.container_name``
        """
        import docker

        client = docker.from_env()
        container = None
        try:
            container = client.containers.get(self.container_name)
        finally:
            client.close()
        return container

    def start(self):
        """Start the ContainerPod.
        This method calls :meth:`start` in :class:`threading.Thread` or :class:`multiprocesssing.Process`.
        .. #noqa: DAR201
        """
        self.worker = _get_worker(
            args=self.args,
            target=run,
            kwargs={
                'args': self.args,
                'name': self.name,
                'container_name': self.container_name,
                'net_mode': self.net_mode,
                'runtime_ctrl_address': self.runtime_ctrl_address,
                'envs': self._envs,
                'is_started': self.is_started,
                'is_shutdown': self.is_shutdown,
                'is_ready': self.is_ready,
            },
        )
        self.worker.start()
        if not self.args.noblock_on_start:
            self.wait_start_success()
        return self

    def _terminate(self):
        """Terminate the Pod.
        This method calls :meth:`terminate` in :class:`threading.Thread` or :class:`multiprocesssing.Process`.
        """
        # terminate the docker
        try:
            self._container.kill(signal='SIGTERM')
        finally:
            # runs even if the kill failed: wait for shutdown, then stop the worker
            self.is_shutdown.wait(self.args.timeout_ctrl)
            if hasattr(self.worker, 'terminate'):
                self.logger.debug(f'terminating the runtime process')
                self.worker.terminate()
                self.logger.debug(f' runtime process properly terminated')
            else:
                self.logger.debug(f'canceling the runtime thread')
                # NOTE(review): ``cancel_event`` is not defined in this class;
                # presumably provided by ``BasePod`` - confirm.
                self.cancel_event.set()
                self.logger.debug(f'runtime thread properly canceled')

    def join(self, *args, **kwargs):
        """Joins the Pod.
        This method calls :meth:`join` in :class:`threading.Thread` or :class:`multiprocesssing.Process`.

        Before joining the worker it waits until the container is no longer listed by
        the docker daemon. A ``timeout`` (keyword, or first positional argument) also
        bounds that wait.

        :param args: extra positional arguments to pass to join
        :param kwargs: extra keyword arguments to pass to join
        """
        import docker

        # Honour the timeout of Thread.join / Process.join for the container wait
        # as well, so a caller-supplied timeout cannot turn into an unbounded block.
        timeout = kwargs.get('timeout', args[0] if args else None)
        deadline = None if timeout is None else time.monotonic() + timeout

        client = docker.from_env()
        try:
            container_id = self._container.id
            # ``containers.list()`` yields container objects, so compare ids
            # explicitly; testing the id string for membership never matches.
            while any(c.id == container_id for c in client.containers.list()):
                if deadline is not None and time.monotonic() >= deadline:
                    break
                time.sleep(0.1)
        except docker.errors.NotFound:
            # the container is already gone: nothing to wait for
            pass
        finally:
            client.close()
        self.logger.debug(f' Joining the process')
        self.worker.join(*args, **kwargs)
        self.logger.debug(f' Successfully joined the process')