import argparse
import multiprocessing
import os
import threading
import time
from abc import ABC, abstractmethod
from typing import Dict, Optional, Type, Union

from jina import __ready_msg__, __stop_msg__, __windows__
from jina.enums import PodRoleType, RuntimeBackendType
from jina.excepts import RuntimeFailToStart, RuntimeRunForeverEarlyError
from jina.helper import typename
from jina.jaml import JAML
from jina.logging.logger import JinaLogger
from jina.orchestrate.pods.helper import ConditionalEvent, _get_event, _get_worker
from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime

__all__ = ['BasePod', 'Pod']


def run(
    args: 'argparse.Namespace',
    name: str,
    runtime_cls: Type[AsyncNewLoopRuntime],
    envs: Dict[str, str],
    is_started: Union['multiprocessing.Event', 'threading.Event'],
    is_shutdown: Union['multiprocessing.Event', 'threading.Event'],
    is_ready: Union['multiprocessing.Event', 'threading.Event'],
    cancel_event: Union['multiprocessing.Event', 'threading.Event'],
    jaml_classes: Optional[Dict] = None,
):
    """Method representing the :class:`BaseRuntime` activity.

    This method is the target for the Pod's `thread` or `process`

    .. note::
        :meth:`run` is running in subprocess/thread, the exception can not be propagated to the main process.
        Hence, please do not raise any exception here.

    .. note::
        Please note that env variables are process-specific. Subprocess inherits envs from
        the main process. But Subprocess's envs do NOT affect the main process. It does NOT
        mess up user local system envs.

    .. warning::
        If you are using ``thread`` as backend, envs setting will likely be overridden by others

    .. note::
        `jaml_classes` contains all the :class:`JAMLCompatible` classes registered in the main process.
        When using `spawn` as the multiprocessing start method, passing this argument to `run` method re-imports
        & re-registers all `JAMLCompatible` classes.

    :param args: namespace args from the Pod
    :param name: name of the Pod to have proper logging
    :param runtime_cls: the runtime class to instantiate
    :param envs: a dictionary of environment variables to be set in the new Process
    :param is_started: concurrency event to communicate runtime is properly started. Used for better logging
    :param is_shutdown: concurrency event to communicate runtime is terminated
    :param is_ready: concurrency event to communicate runtime is ready to receive messages
    :param cancel_event: concurrency event to receive cancelling signal from the Pod. Needed by some runtimes
    :param jaml_classes: all the `JAMLCompatible` classes imported in main process
    """
    # NOTE: `jaml_classes` is deliberately not read in the body; see the docstring note above.
    logger = JinaLogger(name, **vars(args))

    def _unset_envs():
        # Remove every key of `envs` from this process' environment (skipped for the thread backend).
        if envs and args.runtime_backend != RuntimeBackendType.THREAD:
            for k in envs.keys():
                os.environ.pop(k, None)

    def _set_envs():
        # Export `envs` only when the user passed `args.env`; with the thread backend
        # nothing is exported and a warning is logged instead.
        if args.env:
            if args.runtime_backend == RuntimeBackendType.THREAD:
                logger.warning(
                    'environment variables should not be set when runtime="thread".'
                )
            else:
                os.environ.update({k: str(v) for k, v in envs.items()})

    try:
        _set_envs()
        runtime = runtime_cls(
            args=args,
            cancel_event=cancel_event,
        )
    except Exception as ex:
        # Build the message step by step: a conditional expression binds looser than `+`,
        # so `a + b if cond else ''` would log an empty message whenever `quiet_error` is set.
        msg = f'{ex!r} during {runtime_cls!r} initialization'
        if not args.quiet_error:
            msg += '\n add "--quiet-error" to suppress the exception details'
        logger.error(msg, exc_info=not args.quiet_error)
    else:
        # Do not start serving if shutdown was already signalled while the runtime was being built.
        if not is_shutdown.is_set():
            is_started.set()
            with runtime:
                is_ready.set()
                runtime.run_forever()
    finally:
        _unset_envs()
        is_shutdown.set()
        logger.debug(' Process terminated')
class BasePod(ABC):
    """
    :class:`BasePod` is an interface from which all the classes managing the lifetime of a Runtime inside a local process,
    container or in a remote JinaD instance (to come) must inherit.

    It exposes the required APIs so that the `BasePod` can be handled by the `cli` api as a context manager or by a `Deployment`.

    What makes a BasePod a BasePod is that it manages the lifecycle of a Runtime (gateway or not gateway)
    """

    def __init__(self, args: 'argparse.Namespace'):
        """Store the arguments and create the events shared with the runtime.

        :param args: namespace args of the Pod
        """
        self.args = args

        # expose `shards` under the `parallel` name as well
        self.args.parallel = self.args.shards
        self.name = self.args.name or self.__class__.__name__
        self.is_forked = False
        self.logger = JinaLogger(self.name, **vars(self.args))

        if self.args.runtime_backend == RuntimeBackendType.THREAD:
            self.logger.warning(
                ' Using Thread as runtime backend is not recommended for production purposes. It is '
                'just supposed to be used for easier debugging. Besides the performance considerations, it is '
                'specially dangerous to mix `Executors` running in different types of `RuntimeBackends`.'
            )

        # environment variables handed over to `run`
        self._envs = {'JINA_DEPLOYMENT_NAME': self.name}
        if self.args.quiet:
            self._envs['JINA_LOG_CONFIG'] = 'QUIET'
        if self.args.env:
            self._envs.update(self.args.env)

        # arguments needed to create `runtime` and communicate with it in the `run` in the stack of the new process
        # or thread.
        # A throw-away worker of the configured backend type is only used to pick the event implementation.
        test_worker = {
            RuntimeBackendType.THREAD: threading.Thread,
            RuntimeBackendType.PROCESS: multiprocessing.Process,
        }.get(getattr(args, 'runtime_backend', RuntimeBackendType.THREAD))()
        self.is_ready = _get_event(test_worker)
        self.is_shutdown = _get_event(test_worker)
        self.cancel_event = _get_event(test_worker)
        self.is_started = _get_event(test_worker)
        # combined event built from `is_ready` and `is_shutdown`
        self.ready_or_shutdown = ConditionalEvent(
            getattr(args, 'runtime_backend', RuntimeBackendType.THREAD),
            events_list=[self.is_ready, self.is_shutdown],
        )
        self.runtime_ctrl_address = self._get_control_address()
        self._timeout_ctrl = self.args.timeout_ctrl

    def _get_control_address(self):
        """Build the ``host:port`` address string from the Pod arguments.

        .. # noqa: DAR201
        """
        return f'{self.args.host}:{self.args.port}'

    def close(self) -> None:
        """Close the Pod

        This method makes sure that the `Process/thread` is properly finished and its resources properly released
        """
        self.logger.debug('waiting for ready or shutdown signal from runtime')
        if not self.is_shutdown.is_set() and self.is_started.is_set():
            try:
                self.logger.debug('terminate')
                self._terminate()
                # on Windows only wait one second; elsewhere wait up to `timeout_ctrl`
                if not self.is_shutdown.wait(
                    timeout=self._timeout_ctrl if not __windows__ else 1.0
                ):
                    if not __windows__:
                        raise Exception(
                            f'Shutdown signal was not received for {self._timeout_ctrl} seconds'
                        )
                    else:
                        self.logger.warning(
                            'Pod was forced to close after 1 second. Graceful closing is not available on Windows.'
                        )
            except Exception as ex:
                # Build the message step by step: a conditional expression binds looser than `+`,
                # so `a + b if cond else ''` would log an empty message whenever `quiet_error` is set.
                msg = f'{ex!r} during {self.close!r}'
                if not self.args.quiet_error:
                    msg += '\n add "--quiet-error" to suppress the exception details'
                self.logger.error(msg, exc_info=not self.args.quiet_error)
        else:
            # here shutdown has been set already, therefore `run` will gracefully finish
            reason = (
                'shutdown is is already set'
                if self.is_shutdown.is_set()
                else 'Runtime was never started'
            )
            self.logger.debug(f'{reason}. Runtime will end gracefully on its own')
        self.is_shutdown.set()
        self.logger.debug(__stop_msg__)
        self.logger.close()

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _wait_for_ready_or_shutdown(self, timeout: Optional[float]):
        """
        Waits for the process to be ready or to know it has failed.

        :param timeout: The time to wait before readiness or failure is determined
            .. # noqa: DAR201
        """
        return AsyncNewLoopRuntime.wait_for_ready_or_shutdown(
            timeout=timeout,
            ready_or_shutdown_event=self.ready_or_shutdown.event,
            ctrl_address=self.runtime_ctrl_address,
            timeout_ctrl=self._timeout_ctrl,
        )

    def _fail_start_timeout(self, timeout):
        """
        Closes the Pod and raises a TimeoutError with the corresponding warning messages

        :param timeout: The time to wait before readiness or failure is determined
            .. # noqa: DAR201
        """
        _timeout = timeout or -1
        self.logger.warning(
            f'{self} timeout after waiting for {self.args.timeout_ready}ms, '
            f'if your executor takes time to load, you may increase --timeout-ready'
        )
        self.close()
        raise TimeoutError(
            f'{typename(self)}:{self.name} can not be initialized after {_timeout * 1e3}ms'
        )

    def _check_failed_to_start(self):
        """
        Raises a corresponding exception if failed to start
        """
        if self.is_shutdown.is_set():
            # return too early and the shutdown is set, means something fails!!
            if not self.is_started.is_set():
                raise RuntimeFailToStart
            else:
                raise RuntimeRunForeverEarlyError

    def wait_start_success(self):
        """Block until all pods starts successfully.

        If not success, it will raise an error hoping the outer function to catch it
        """
        # `timeout_ready` is in milliseconds; a non-positive value means "wait without limit"
        _timeout = self.args.timeout_ready
        if _timeout <= 0:
            _timeout = None
        else:
            _timeout /= 1e3
        if self._wait_for_ready_or_shutdown(_timeout):
            self._check_failed_to_start()
            self.logger.debug(__ready_msg__)
        else:
            self._fail_start_timeout(_timeout)

    async def async_wait_start_success(self):
        """
        Wait for the `Pod` to start successfully in a non-blocking manner
        """
        import asyncio

        # `timeout_ready` is in milliseconds; a non-positive value means "wait without limit"
        _timeout = self.args.timeout_ready
        if _timeout <= 0:
            _timeout = None
        else:
            _timeout /= 1e3

        timeout_ns = 1e9 * _timeout if _timeout else None
        # monotonic clock: the elapsed time must not be affected by system clock adjustments
        started_ns = time.monotonic_ns()
        while timeout_ns is None or time.monotonic_ns() - started_ns < timeout_ns:
            if self.ready_or_shutdown.event.is_set():
                self._check_failed_to_start()
                self.logger.debug(__ready_msg__)
                return
            else:
                await asyncio.sleep(0.1)

        self._fail_start_timeout(_timeout)

    @property
    def role(self) -> 'PodRoleType':
        """Get the role of this pod in a deployment
        .. #noqa: DAR201"""
        return self.args.pod_role

    @abstractmethod
    def start(self):
        """Start the BasePod.
        This method calls :meth:`start` in :class:`threading.Thread` or :class:`multiprocessing.Process`.
        .. #noqa: DAR201
        """
        ...

    @abstractmethod
    def _terminate(self):
        ...

    @abstractmethod
    def join(self, *args, **kwargs):
        """Joins the BasePod. Wait for the BasePod to properly terminate

        :param args: extra positional arguments
        :param kwargs: extra keyword arguments
        """
        ...
class Pod(BasePod):
    """
    :class:`Pod` is a thread/process- container of :class:`BaseRuntime`. It leverages :class:`threading.Thread`
    or :class:`multiprocessing.Process` to manage the lifecycle of a :class:`BaseRuntime` object in a robust way.

    A :class:`Pod` must be equipped with a proper :class:`Runtime` class to work.
    """

    def __init__(self, args: 'argparse.Namespace'):
        """Resolve the runtime class and create the worker that will execute `run`.

        :param args: namespace args of the Pod
        """
        super().__init__(args)
        self.runtime_cls = self._get_runtime_cls()
        self.worker = _get_worker(
            args=args,
            target=run,
            kwargs={
                'args': args,
                'name': self.name,
                'envs': self._envs,
                'is_started': self.is_started,
                'is_shutdown': self.is_shutdown,
                'is_ready': self.is_ready,
                # the cancel event is only necessary for threads, otherwise runtimes should create and use the asyncio event
                'cancel_event': self.cancel_event
                if getattr(args, 'runtime_backend', RuntimeBackendType.THREAD)
                == RuntimeBackendType.THREAD
                else None,
                'runtime_cls': self.runtime_cls,
                'jaml_classes': JAML.registered_classes(),
            },
            name=self.name,
        )

    def start(self):
        """Start the Pod.
        This method calls :meth:`start` in :class:`threading.Thread` or :class:`multiprocessing.Process`.
        .. #noqa: DAR201
        """
        self.worker.start()
        # record whether the multiprocessing start method in effect is `fork`
        self.is_forked = multiprocessing.get_start_method().lower() == 'fork'
        if not self.args.noblock_on_start:
            self.wait_start_success()
        return self

    def join(self, *args, **kwargs):
        """Joins the Pod.
        This method calls :meth:`join` in :class:`threading.Thread` or :class:`multiprocessing.Process`.

        :param args: extra positional arguments to pass to join
        :param kwargs: extra keyword arguments to pass to join
        """
        self.logger.debug(' Joining the process')
        self.worker.join(*args, **kwargs)
        self.logger.debug(' Successfully joined the process')

    def _terminate(self):
        """Terminate the Pod.
        This method calls :meth:`terminate` in :class:`threading.Thread` or :class:`multiprocessing.Process`.
        """
        if hasattr(self.worker, 'terminate'):
            # the worker exposes `terminate`, so ask it to stop directly
            self.logger.debug('terminating the runtime process')
            self.worker.terminate()
            self.logger.debug(' runtime process properly terminated')
        else:
            # no `terminate` on the worker: signal the runtime through the cancel event instead
            self.logger.debug('canceling the runtime thread')
            self.cancel_event.set()
            self.logger.debug('runtime thread properly canceled')

    def _get_runtime_cls(self) -> 'Type[AsyncNewLoopRuntime]':
        """Resolve the runtime class named by ``args.runtime_cls``.

        :return: the runtime class (not an instance) returned by `get_runtime`
        """
        from jina.orchestrate.pods.helper import update_runtime_cls
        from jina.serve.runtimes import get_runtime

        update_runtime_cls(self.args)
        return get_runtime(self.args.runtime_cls)