import multiprocessing
import threading
from copy import deepcopy
from functools import partial
from typing import TYPE_CHECKING, Callable, Dict, Optional, Union

from grpc import RpcError

from jina.enums import GatewayProtocolType, PodRoleType, RuntimeBackendType
from jina.hubble.helper import is_valid_huburi
from jina.hubble.hubio import HubIO
from jina.serve.networking import GrpcConnectionPool
from jina.types.request.control import ControlRequest

if TYPE_CHECKING:
    from argparse import Namespace


def _get_worker(
    args, target: Callable, kwargs: Dict, name: Optional[str] = None
) -> Union['threading.Thread', 'multiprocessing.Process']:
    """Create (without starting) a daemon thread or process that will run ``target``.

    :param args: namespace args; its ``runtime_backend`` attribute selects the worker type,
        falling back to :attr:`RuntimeBackendType.THREAD` when the attribute is missing
    :param target: callable executed by the worker
    :param kwargs: keyword arguments forwarded to ``target``
    :param name: optional name given to the worker
    :return: the newly created thread or process object
    """
    worker_classes = {
        RuntimeBackendType.THREAD: threading.Thread,
        RuntimeBackendType.PROCESS: multiprocessing.Process,
    }
    backend = getattr(args, 'runtime_backend', RuntimeBackendType.THREAD)
    worker_cls = worker_classes.get(backend)
    return worker_cls(target=target, name=name, kwargs=kwargs, daemon=True)


def _get_event(obj) -> Union[multiprocessing.Event, threading.Event]:
    """Create an event whose flavour matches the given worker object.

    :param obj: a thread or process object
    :return: a threading event for threads, a multiprocessing event (from the default or
        the ``spawn`` context) for processes
    :raises TypeError: if ``obj`` is none of the supported thread/process types
    """
    if isinstance(obj, threading.Thread):
        return threading.Event()

    # ``ForkProcess`` is absent on platforms without the fork start method
    # (e.g. Windows). Look it up defensively so that such platforms still reach
    # the spawn check and the TypeError below instead of an AttributeError.
    default_process_types = [multiprocessing.Process]
    fork_process = getattr(multiprocessing.context, 'ForkProcess', None)
    if fork_process is not None:
        default_process_types.append(fork_process)
    if isinstance(obj, tuple(default_process_types)):
        return multiprocessing.Event()

    if isinstance(obj, multiprocessing.context.SpawnProcess):
        return multiprocessing.get_context('spawn').Event()

    raise TypeError(
        f'{obj} is not an instance of "threading.Thread" nor "multiprocessing.Process"'
    )
class ConditionalEvent:
    """
    :class:`ConditionalEvent` exposes a single event (threading or multiprocessing) that is set
    whenever at least one of the wrapped events is set, and cleared otherwise (OR logic).

    :param backend_runtime: The runtime type to decide which type of Event to instantiate
    :param events_list: The list of events that compose this composable event
    """

    def __init__(self, backend_runtime: RuntimeBackendType, events_list):
        super().__init__()
        if backend_runtime == RuntimeBackendType.THREAD:
            self.event = threading.Event()
        else:
            # Go through the context's public factory: it imports
            # ``multiprocessing.synchronize`` itself, whereas accessing that
            # submodule as an attribute only works if something else has
            # already imported it.
            self.event = multiprocessing.get_context().Event()
        self.event_list = events_list
        for e in events_list:
            self._setup(e, self._state_changed)
        # align the combined event with the current state of the inputs
        self._state_changed()

    def _state_changed(self):
        """Set the combined event if any wrapped event is set, otherwise clear it."""
        if any(e.is_set() for e in self.event_list):
            self.event.set()
        else:
            self.event.clear()

    def _custom_set(self, e):
        """Replacement for ``e.set``: set the event, then refresh the combined event.

        :param e: the wrapped event
        """
        e._set()
        e._state_changed()

    def _custom_clear(self, e):
        """Replacement for ``e.clear``: clear the event, then refresh the combined event.

        :param e: the wrapped event
        """
        e._clear()
        e._state_changed()

    def _setup(self, e, changed_callback):
        """Patch ``e`` so that its ``set``/``clear`` calls also notify ``changed_callback``.

        :param e: the event to wrap
        :param changed_callback: callable invoked after every ``set``/``clear`` of ``e``
        """
        # NOTE(review): the original methods are stored on the event itself, so
        # wrapping an event that is already wrapped would overwrite `_set`/`_clear`
        # with the earlier wrappers - presumably each event belongs to a single
        # ConditionalEvent; confirm with callers.
        e._set = e.set
        e._clear = e.clear
        e._state_changed = changed_callback
        e.set = partial(self._custom_set, e)
        e.clear = partial(self._custom_clear, e)
def update_runtime_cls(args, copy=False) -> 'Namespace':
    """Resolve ``runtime_cls`` (and ``uses`` for Hub executors) on the given args.

    :param args: pod/deployment namespace args
    :param copy: True if args shouldn't be modified in-place
    :return: the namespace with ``runtime_cls`` updated; a deep copy of ``args`` when
        ``copy`` is True, otherwise ``args`` itself
    """
    if copy:
        _args = deepcopy(args)
    else:
        _args = args

    # a Hub URI in ``uses`` is replaced by the result of pulling it from the Hub
    if _args.runtime_cls == 'WorkerRuntime' and is_valid_huburi(_args.uses):
        hub_namespace = deepcopy(_args)
        hub_namespace.uri = _args.uses
        hub_namespace.no_usage = True
        _args.uses = HubIO(hub_namespace).pull()

    # args carrying a ``protocol`` get the gateway runtime matching it
    if hasattr(_args, 'protocol'):
        runtime_by_protocol = {
            GatewayProtocolType.GRPC: 'GRPCGatewayRuntime',
            GatewayProtocolType.WEBSOCKET: 'WebSocketGatewayRuntime',
            GatewayProtocolType.HTTP: 'HTTPGatewayRuntime',
        }
        _args.runtime_cls = runtime_by_protocol[_args.protocol]

    # the head role is checked last, so it overrides the choices above
    if _args.pod_role == PodRoleType.HEAD:
        _args.runtime_cls = 'HeadRuntime'

    return _args
def is_ready(address: str) -> bool:
    """
    TODO: make this async

    Probe readiness by synchronously sending a ``STATUS`` control request.

    :param address: the address where the control message needs to be sent
    :return: True if the request went through, False if it raised an ``RpcError``
    """
    ready = True
    try:
        GrpcConnectionPool.send_request_sync(ControlRequest('STATUS'), address)
    except RpcError:
        ready = False
    return ready