class AsyncNewLoopRuntime(BaseRuntime, MonitoringMixin, ABC):
    """
    The async runtime to start a new event loop.

    The constructor creates a fresh asyncio event loop, installs it as the
    current loop, wires termination signals to :attr:`is_cancel` and runs
    :meth:`async_setup` to completion. Subclasses implement
    :meth:`async_run_forever` and :meth:`async_cancel`.
    """

    def __init__(
        self,
        args: 'argparse.Namespace',
        cancel_event: Optional[
            Union['asyncio.Event', 'multiprocessing.Event', 'threading.Event']
        ] = None,
        **kwargs,
    ):
        """
        Create the event loop, register termination handlers and run the async setup.

        :param args: the parsed arguments, forwarded to the parent constructor
        :param cancel_event: the event used to signal cancellation; a new
            ``asyncio.Event`` is created when none is given
        :param kwargs: extra keyword arguments forwarded to the parent constructor
        """
        super().__init__(args, **kwargs)
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        self.is_cancel = cancel_event or asyncio.Event()

        if not __windows__:
            # TODO: windows event loops don't support signal handlers
            try:
                for signame in {'SIGINT', 'SIGTERM'}:
                    self._loop.add_signal_handler(
                        getattr(signal, signame),
                        lambda *args, **kwargs: self.is_cancel.set(),
                    )
            except (ValueError, RuntimeError) as exc:
                # Registration failed; the runtime keeps going but can only
                # be stopped through `is_cancel`, not through signals.
                self.logger.warning(
                    f' The runtime {self.__class__.__name__} will not be able to handle termination signals. '
                    f' {repr(exc)}'
                )
        else:
            with ImportExtensions(
                required=True,
                logger=self.logger,
                help_text='''If you see a 'DLL load failed' error, please reinstall `pywin32`. If you're using conda, please use the command `conda install -c anaconda pywin32`''',
            ):
                import win32api

            win32api.SetConsoleCtrlHandler(
                lambda *args, **kwargs: self.is_cancel.set(), True
            )

        self._setup_monitoring()
        self._loop.run_until_complete(self.async_setup())

    def run_forever(self):
        """
        Running method to block the main thread.

        Run the event loop until a Future is done.
        """
        self._loop.run_until_complete(self._loop_body())

    def teardown(self):
        """Call async_teardown() and stop and close the event loop.

        The loop is stopped and closed and the parent teardown is invoked even
        when ``async_teardown()`` raises; the exception still propagates to
        the caller afterwards.
        """
        try:
            self._loop.run_until_complete(self.async_teardown())
        finally:
            # Always release the loop and run the parent cleanup so a failing
            # async teardown does not leak them.
            self._loop.stop()
            self._loop.close()
            super().teardown()

    async def _wait_for_cancel(self):
        """Do NOT override this method when inheriting from :class:`GatewayPod`"""
        # threads are not using asyncio.Event, but threading.Event
        if isinstance(self.is_cancel, asyncio.Event):
            await self.is_cancel.wait()
        else:
            # Non-asyncio events cannot be awaited, so poll without
            # blocking the event loop.
            while not self.is_cancel.is_set():
                await asyncio.sleep(0.1)

        await self.async_cancel()

    async def _loop_body(self):
        """Do NOT override this method when inheriting from :class:`GatewayPod`"""
        try:
            await asyncio.gather(self.async_run_forever(), self._wait_for_cancel())
        except asyncio.CancelledError:
            self.logger.warning('received terminate ctrl message from main process')

    def _cancel(self):
        """
        Signal the runtime to terminate
        """
        self.is_cancel.set()

    async def async_setup(self):
        """The async method to setup."""
        pass

    async def async_teardown(self):
        """The async method to clean up resources during teardown. This method should free all resources allocated during async_setup"""
        pass

    @abstractmethod
    async def async_cancel(self):
        """An async method to cancel async_run_forever."""
        ...

    @abstractmethod
    async def async_run_forever(self):
        """The async method to run until it is stopped."""
        ...

    # Static methods used by the Pod to communicate with the `Runtime` in the separate process

    @staticmethod
    def activate(**kwargs):
        """
        Activate the runtime, does not apply to these runtimes

        :param kwargs: extra keyword arguments
        """
        # does not apply to this type of runtime
        pass

    @staticmethod
    def is_ready(ctrl_address: str, **kwargs) -> bool:
        """
        Check if status is ready.

        A ``STATUS`` control request is sent with a one second timeout; an
        ``RpcError`` is interpreted as "not ready".

        :param ctrl_address: the address where the control request needs to be sent
        :param kwargs: extra keyword arguments
        :return: True if status is ready else False.
        """
        try:
            GrpcConnectionPool.send_request_sync(
                ControlRequest('STATUS'), ctrl_address, timeout=1.0
            )
        except RpcError:
            return False
        return True

    @staticmethod
    def wait_for_ready_or_shutdown(
        timeout: Optional[float],
        ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'],
        ctrl_address: str,
        **kwargs,
    ) -> bool:
        """
        Check if the runtime has successfully started

        :param timeout: The time in seconds to wait before readiness or failure
            is determined; a falsy value (``None`` or ``0``) waits without limit
        :param ctrl_address: the address where the control message needs to be sent
        :param ready_or_shutdown_event: the multiprocessing event to detect if the process failed or is ready
        :param kwargs: extra keyword arguments
        :return: True if is ready or it needs to be shutdown, False if the
            timeout elapsed first
        """
        timeout_ns = 1_000_000_000 * timeout if timeout else None
        # A monotonic clock keeps the deadline immune to wall-clock adjustments.
        start = time.monotonic_ns()
        while timeout_ns is None or time.monotonic_ns() - start < timeout_ns:
            if ready_or_shutdown_event.is_set() or AsyncNewLoopRuntime.is_ready(
                ctrl_address
            ):
                return True
            time.sleep(0.1)
        return False

    def _log_info_msg(self, request: Union[ControlRequest, DataRequest]):
        """Log an incoming request at debug level, dispatching on its type.

        Requests that are neither data nor control requests are ignored.

        :param request: the received request
        """
        if isinstance(request, DataRequest):
            self._log_data_request(request)
        elif isinstance(request, ControlRequest):
            self._log_control_request(request)

    def _log_control_request(self, request: ControlRequest):
        """Log the command and request id of a control request at debug level.

        :param request: the received control request
        """
        self.logger.debug(
            f'recv ControlRequest {request.command} with id: {request.header.request_id}'
        )

    def _log_data_request(self, request: DataRequest):
        """Log the exec endpoint and request id of a data request at debug level.

        :param request: the received data request
        """
        self.logger.debug(
            f'recv DataRequest at {request.header.exec_endpoint} with id: {request.header.request_id}'
        )