[docs]classAsyncBaseClient:""" JinaD baseclient (Async) :param uri: the uri of ``jinad`` instance :param logger: jinad logger :param timeout: stop waiting for a response after a given number of seconds with the timeout parameter. """_kind=''_endpoint='/'def__init__(self,uri:str,logger:JinaLogger,timeout:int=None,):self._logger=loggerself.timeout=aiohttp.ClientTimeout(timeoutifisinstance(timeout,Number)else10*60)self.http_uri=f'http://{uri}'self.store_api=f'{self.http_uri}{self._endpoint}'@if_aliveasyncdefalive(self)->bool:"""Check if JinaD is alive :return: True if JinaD is alive at remote else false """asyncwithaiohttp.request(method='GET',url=self.http_uri,timeout=self.timeout)asresponse:returnresponse.status==HTTPStatus.OK@if_aliveasyncdefstatus(self)->Optional[Dict]:"""Get status of remote JinaD :return: dict status of remote JinaD """asyncwithaiohttp.request(method='GET',url=f'{self.http_uri}/status',timeout=self.timeout)asresponse:ifresponse.status==HTTPStatus.OK:returnawaitresponse.json()else:self._logger.error(f'got response {response.status} while getting status {self._kind}s')@if_aliveasyncdefget(self,id:Union[str,DaemonID])->Optional[Union[str,Dict]]:"""Get status of the remote object :param id: identity of the Pod/Deployment :return: response if the remote Flow/Pod/Deployment/Workspace exists """asyncwithaiohttp.request(method='GET',url=f'{self.store_api}/{daemonize(id,self._kind)}',timeout=self.timeout,)asresponse:response_json=awaitresponse.json()ifresponse.status==HTTPStatus.UNPROCESSABLE_ENTITY:self._logger.error(f'validation error in the request: {error_msg_from(response_json)}')returnresponse_json['body']elifresponse.status==HTTPStatus.NOT_FOUND:self._logger.error(f'couldn\'t find {id} in remote {self._kind.title()} store')returnresponse_json['detail']else:returnresponse_json@if_aliveasyncdeflist(self)->Dict:"""List all objects in the store :return: json response of the remote Flow/Pod/Deployment/Workspace status """asyncwithaiohttp.request(method='GET',url=self.store_api,timeout=self.timeout)asresponse:ifresponse.status==HTTPStatus.OK:response_json=awaitresponse.json()self._logger.success(f'found {len(response_json.get("items",[]))}{self._kind.title()}(s) in store')return(response_json['items']if'items'inresponse_jsonelseresponse_json)else:self._logger.error(f'got response {response.status} while listing all {self._kind}s')@if_aliveasyncdefclear(self):"""Send delete request to api :return : json response of the remote Flow/Pod/Deployment/Workspace status """asyncwithaiohttp.request(method='DELETE',url=f'{self.store_api}')asresponse:ifresponse.status==HTTPStatus.OK:returnawaitresponse.json()else:self._logger.error(f'got response {response.status} while sending delete request {self._kind}s')
[docs]asyncdefcreate(self,*args,**kwargs)->Dict:"""Create a Workspace/Flow/Pod/Deployment on remote. Must be implemented by the inherited class. # noqa: DAR101 # noqa: DAR102 """raiseNotImplementedError
[docs]asyncdefupdate(self,*args,**kwargs)->Dict:"""Update a Workspace/Flow/Pod/Deployment on remote. Must be implemented by the inherited class. # noqa: DAR101 # noqa: DAR102 """raiseNotImplementedError
[docs]asyncdefdelete(self,id:DaemonID,*args,**kwargs)->str:"""Delete a Workspace/Flow/Pod/Deployment on remote. Must be implemented by the inherited class. # noqa: DAR101 # noqa: DAR102 """raiseNotImplementedError