class AsyncFlowClient(AsyncBaseClient):
    """Async Client to create/update/delete Flows on remote JinaD"""

    _kind = 'flow'
    _endpoint = '/flows'

    @if_alive
    async def arguments(self) -> Optional[Dict]:
        """Fetch the JSON served by the ``/arguments`` route of the remote store

        :return: decoded JSON body when the response status is 200 OK, otherwise None
        """
        async with aiohttp.request(
            method='GET', url=f'{self.store_api}/arguments', timeout=self.timeout
        ) as response:
            if response.status == HTTPStatus.OK:
                return await response.json()
        return None

    @if_alive
    async def create(
        self,
        workspace_id: 'DaemonID',
        filename: str,
        envs: Optional[Dict[str, str]] = None,
        *args,
        **kwargs,
    ) -> str:
        """Start a Flow on remote JinaD

        :param workspace_id: workspace id where flow will be created
        :param filename: name of the flow yaml file in the workspace
        :param envs: dict of env vars to be passed; ``None``, an empty dict or a
            non-dict value all result in no env vars being sent
        :param args: positional args
        :param kwargs: keyword args
        :return: decoded response body (the flow id) when the status is 201 CREATED,
            otherwise the error message extracted from the response
        """
        # Each env var is sent as its own repeated `envs=KEY=VALUE` query parameter.
        env_params = (
            [('envs', f'{key}={value}') for key, value in envs.items()]
            if envs and isinstance(envs, dict)
            else []
        )
        query = [('workspace_id', workspace_id), ('filename', filename)] + env_params

        async with aiohttp.request(
            method='POST',
            url=self.store_api,
            params=query,
            timeout=self.timeout,
        ) as response:
            response_json = await response.json()
            if response.status != HTTPStatus.CREATED:
                error_msg = error_msg_from(response_json)
                self._logger.error(
                    f'{self._kind.title()} creation failed as: {error_msg}'
                )
                # Failures are reported through the return value, not by raising.
                return error_msg
            self._logger.success(
                f'Remote Flow created successfully with id {response_json}'
            )
            return response_json

    @if_alive
    async def rolling_update(
        self,
        id: Union[str, 'DaemonID'],
        deployment_name: str,
        uses_with: Optional[Dict] = None,
    ) -> str:
        """Perform `rolling_update` on a remote Flow

        :param id: flow id
        :param deployment_name: deployment name for rolling update
        :param uses_with: the uses with to override the Executors params
        :return: decoded response body (the flow id) when the status is 200 OK,
            otherwise the error message extracted from the response
        """
        async with aiohttp.request(
            method='PUT',
            url=f'{self.store_api}/rolling_update/{daemonize(id, self._kind)}',
            params={'deployment_name': deployment_name},
            json=uses_with,
            timeout=self.timeout,
        ) as response:
            response_json = await response.json()
            if response.status != HTTPStatus.OK:
                error_msg = error_msg_from(response_json)
                self._logger.error(
                    f'{self._kind.title()} update failed as: {error_msg}'
                )
                return error_msg
            return response_json

    @if_alive
    async def scale(
        self, id: Union[str, 'DaemonID'], deployment_name: str, replicas: int
    ) -> str:
        """Scale a Deployment on a remote Flow

        :param id: flow id
        :param deployment_name: name of the deployment to scale
        :param replicas: The number of replicas to scale to
        :return: decoded response body (the flow id) when the status is 200 OK,
            otherwise the error message extracted from the response
        """
        async with aiohttp.request(
            method='PUT',
            url=f'{self.store_api}/scale/{daemonize(id, self._kind)}',
            params={'deployment_name': deployment_name, 'replicas': replicas},
            timeout=self.timeout,
        ) as response:
            response_json = await response.json()
            if response.status != HTTPStatus.OK:
                error_msg = error_msg_from(response_json)
                self._logger.error(
                    f'{self._kind.title()} update failed as: {error_msg}'
                )
                return error_msg
            return response_json

    @if_alive
    async def delete(self, id: Union[str, 'DaemonID'], *args, **kwargs) -> bool:
        """Terminate a Flow on remote JinaD

        :param id: flow id
        :param args: positional args
        :param kwargs: keyword args
        :return: True if deletion is successful
        """
        async with aiohttp.request(
            method='DELETE',
            url=f'{self.store_api}/{daemonize(id, self._kind)}',
            timeout=self.timeout,
        ) as response:
            response_json = await response.json()
            if response.status != HTTPStatus.OK:
                self._logger.error(
                    f'deletion of {self._kind.title()} {id} failed: {error_msg_from(response_json)}'
                )
            return response.status == HTTPStatus.OK
class FlowClient(AsyncToSyncMixin, AsyncFlowClient):
    """Client to create/update/delete Flows on remote JinaD.

    Combines :class:`AsyncToSyncMixin` with :class:`AsyncFlowClient` and
    defines no members of its own.
    """