class JinaDProcessTarget:
    """Target to be executed on JinaD Process"""

    def __call__(
        self,
        args: 'argparse.Namespace',
        is_started: Union['multiprocessing.Event', 'threading.Event'],
        is_shutdown: Union['multiprocessing.Event', 'threading.Event'],
        is_ready: Union['multiprocessing.Event', 'threading.Event'],
        is_cancelled: Union['multiprocessing.Event', 'threading.Event'],
        envs: Optional[Dict] = None,
    ):
        """Method responsible to manage a remote Pod

        This method is the target for the Pod's `thread` or `process`

        .. note::
            Please note that env variables are process-specific. Subprocess inherits envs
            from the main process. But Subprocess's envs do NOT affect the main process.
            It does NOT mess up user local system envs.

        :param args: namespace args from the Pod
        :param is_started: concurrency event to communicate runtime is properly started.
            Used for better logging
        :param is_shutdown: concurrency event to communicate runtime is terminated
        :param is_ready: concurrency event to communicate runtime is ready to receive messages
        :param is_cancelled: concurrency event to receive cancelling signal from the Pod.
            Needed by some runtimes
        :param envs: a dictionary of environment variables to be passed to remote Pod
        """
        self.args = args
        self.envs = envs
        self.is_started = is_started
        self.is_shutdown = is_shutdown
        self.is_ready = is_ready
        self.is_cancelled = is_cancelled
        # Stays None until the remote Pod is created; `_terminate_remote_pod`
        # uses it to decide whether there is anything to delete.
        self.pod_id = None
        self._logger = JinaLogger('RemotePod', **vars(args))
        run_async(self._run)

    async def _run(self):
        """Manage a remote Pod

        Creates the remote Pod, signals `is_started` / `is_ready` on success and
        then waits for the cancel event. Whatever happens, the remote Pod (if any)
        is removed and `is_shutdown` is set at the end.
        """
        try:
            await self._create_remote_pod()
        except Exception as ex:
            quiet = self.args.quiet_error
            # Build the hint separately: a conditional expression binds looser
            # than `+`, so inlining it would blank the whole message when quiet.
            hint = (
                ''
                if quiet
                else '\n add "--quiet-error" to suppress the exception details'
            )
            self._logger.error(
                f'{ex!r} while starting a remote Pod' + hint,
                exc_info=not quiet,
            )
        else:
            self.is_started.set()
            self.is_ready.set()
            await self._wait_until_cancelled()
        finally:
            try:
                await self._terminate_remote_pod()
            finally:
                # Always signal shutdown, even if removing the remote Pod raised,
                # so that anyone blocked on `is_shutdown.wait()` is released.
                self.is_shutdown.set()
                self._logger.debug('JinaDProcessTarget terminated')

    async def _create_remote_pod(self):
        """Create Workspace, Pod on remote JinaD server

        :raises DaemonConnectivityError: if the client reports the daemon is not alive
        :raises DaemonWorkspaceCreationFailed: if no workspace id is returned
        :raises DaemonPodCreationFailed: if the Pod creation request is not successful
        """
        with ImportExtensions(required=True):
            # rich & aiohttp are used in `AsyncJinaDClient`
            import aiohttp
            import rich
            from daemon.clients import AsyncJinaDClient

            assert rich
            assert aiohttp

        # NOTE: args.timeout_ready is always set to -1 for JinadRuntime so that
        # wait_for_success doesn't fail in Pod, so it can't be used for Client timeout.
        self.client = AsyncJinaDClient(
            host=self.args.host, port=self.args.port_jinad, logger=self._logger
        )

        if not await self.client.alive:
            raise DaemonConnectivityError

        # Create a remote workspace with upload_files
        workspace_id = await self.client.workspaces.create(
            paths=self.filepaths,
            id=self.args.workspace_id,
            complete=True,
        )
        if not workspace_id:
            self._logger.critical('remote workspace creation failed')
            raise DaemonWorkspaceCreationFailed

        payload = replace_enum_to_str(vars(self._mask_args()))
        # Create a remote Pod in the above workspace
        success, response = await self.client.pods.create(
            workspace_id=workspace_id, payload=payload, envs=self.envs
        )
        if not success:
            self._logger.critical('remote pod creation failed')
            raise DaemonPodCreationFailed(response)
        else:
            self.pod_id = response

    async def _sleep_forever(self):
        """Sleep forever, no prince will come."""
        await asyncio.sleep(1e10)

    async def _wait_until_cancelled(self):
        """Poll the cancel event every 0.1 seconds until it is set"""
        while not self.is_cancelled.is_set():
            await asyncio.sleep(0.1)

    async def _terminate_remote_pod(self):
        """Removes the remote Pod

        Does nothing when no Pod was created (`pod_id` is still None).
        """
        if self.pod_id is not None:
            if await self.client.pods.delete(id=self.pod_id):
                self._logger.success(
                    f'Successfully terminated remote Pod {self.pod_id}'
                )
            # Don't delete workspace here, as other Executors might use them.
            # TODO(Deepankar): probably enable an arg here?

    @property
    def filepaths(self) -> List[Path]:
        """Get file/directories to be uploaded to remote workspace

        Paths that raise `FileNotFoundError` while being completed are logged
        and skipped; duplicates are collapsed.

        :return: filepaths to be uploaded to remote
        """
        paths = set()
        if not self.args.upload_files:
            self._logger.warning('no files passed to upload to remote')
        else:
            for path in self.args.upload_files:
                try:
                    fullpath = Path(complete_path(path))
                    paths.add(fullpath)
                except FileNotFoundError:
                    self._logger.error(f'invalid path {path} passed')

        return list(paths)

    def _mask_args(self):
        """Return a deep copy of the args with the fields overridden for remote use

        The altered fields are logged at debug level together with their
        original values.

        :return: the masked copy of `self.args`
        """
        cargs = copy.deepcopy(self.args)

        # TODO:/NOTE this prevents jumping from remote to another remote (Han: 2021.1.17)
        from jina import __default_host__

        cargs.host = __default_host__
        cargs.log_config = ''  # do not use local log_config
        cargs.upload_files = []  # reset upload files
        cargs.noblock_on_start = False  # wait until start success

        changes = []
        for k, v in vars(cargs).items():
            if v != getattr(self.args, k):
                changes.append(
                    f'{k:>30s}: {str(getattr(self.args, k)):30s} -> {str(v):30s}'
                )
        if changes:
            changes = [
                'note the following arguments have been masked or altered for remote purpose:'
            ] + changes
            self._logger.debug('\n'.join(changes))

        return cargs
class JinaDPod(BasePod):
    """Manages a remote Pod by handling a separate Process / Thread"""

    def __init__(self, args: 'argparse.Namespace'):
        super().__init__(args)
        # The worker runs a `JinaDProcessTarget`, sharing this Pod's events so
        # both sides can signal start, readiness, cancellation and shutdown.
        self.worker = _get_worker(
            args=args,
            target=JinaDProcessTarget(),
            kwargs={
                'args': args,
                'envs': self._envs,
                'is_started': self.is_started,
                'is_shutdown': self.is_shutdown,
                'is_ready': self.is_ready,
                'is_cancelled': self.cancel_event,
            },
        )

    def _wait_for_ready_or_shutdown(self, timeout: Optional[float]):
        """
        Waits for the process to be ready or to know it has failed.

        :param timeout: The time to wait before readiness or failure is determined

        .. # noqa: DAR201
        """
        is_ready_or_shutdown = self.wait_for_ready_or_shutdown(
            timeout=timeout,
            ready_or_shutdown_event=self.ready_or_shutdown.event,
            ctrl_address=self.runtime_ctrl_address,
            timeout_ctrl=self._timeout_ctrl,
        )
        if is_ready_or_shutdown:
            # The event fires for readiness and for shutdown alike, so confirm
            # readiness through the control address before reporting it.
            is_ready_or_shutdown = is_ready(self.runtime_ctrl_address)
        return is_ready_or_shutdown

    @staticmethod
    def wait_for_ready_or_shutdown(
        timeout: Optional[float],
        ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'],
        ctrl_address: str,
        **kwargs,
    ):
        """
        Check if the runtime has successfully started

        :param timeout: The time (in seconds) to wait before readiness or failure is
            determined. A falsy value (``None`` or ``0``) waits indefinitely.
        :param ctrl_address: the address where the control message needs to be sent
        :param ready_or_shutdown_event: the multiprocessing event to detect if the
            process failed or is ready
        :param kwargs: extra keyword arguments
        :return: True if is ready or it needs to be shutdown
        """
        import time

        timeout_ns = 1000000000 * timeout if timeout else None
        # Use the monotonic clock: wall-clock time can jump (NTP, manual changes)
        # and would make the wait shorter or longer than requested.
        start = time.monotonic_ns()
        while timeout_ns is None or time.monotonic_ns() - start < timeout_ns:
            # is_ready returns True is the Pod is actually created by JinaD
            # ready_or_shutdown_event is set after JinaDProcessTarget
            if ready_or_shutdown_event.is_set():
                return True
            time.sleep(0.1)
        return False

    def start(self):
        """Start the JinaD Process (to manage remote Pod).

        Blocks until the start is confirmed unless `noblock_on_start` is set.

        .. #noqa: DAR201
        """
        self.worker.start()
        if not self.args.noblock_on_start:
            self.wait_start_success()
        return self

    def join(self, *args, **kwargs):
        """Joins the Pod. This method calls :meth:`join` in :class:`threading.Thread`
        or :class:`multiprocessing.Process`.

        :param args: extra positional arguments to pass to join
        :param kwargs: extra keyword arguments to pass to join
        """
        self.logger.debug(' Joining the JinaD process')
        self.worker.join(*args, **kwargs)
        self.logger.debug(' Successfully joined the JinaD process')

    def _terminate(self):
        """Terminate the Pod. This method calls :meth:`terminate` in
        :class:`threading.Thread` or :class:`multiprocessing.Process`.
        """
        self.cancel_event.set()  # Inform JinaD Process to stop streaming
        self.is_shutdown.wait()  # Wait until JinaD terminates remote Pod and sets shutdown event
        # Only workers exposing `terminate` (e.g. processes) are terminated explicitly.
        if hasattr(self.worker, 'terminate'):
            self.logger.debug('terminating the JinaD Process')
            self.worker.terminate()
            self.logger.debug('JinaD Process properly terminated')