import warnings
from functools import partialmethod, wraps
from inspect import signature
from typing import TYPE_CHECKING, AsyncGenerator, Dict, List, Optional, Union

from jina.helper import get_or_reuse_loop, run_async
from jina.importer import ImportExtensions

if TYPE_CHECKING:
    from jina import DocumentArray
    from jina.clients.base import CallbackFnType, InputType
    from jina.types.request import Response


def _include_results_field_in_param(parameters: Optional['Dict']) -> 'Dict':
    """Make sure the reserved ``__results__`` entry of ``parameters`` is usable.

    :param parameters: the user supplied request parameters, may be ``None`` or empty.
    :return: ``{'__results__': {}}`` when ``parameters`` is ``None`` or empty, otherwise
        the very same ``parameters`` object. If it carried a non-dict value under
        ``__results__``, that value is replaced in place by an empty dict and a
        warning is emitted.
    """
    key_result = '__results__'

    if not parameters:
        # nothing was passed (None or empty dict): start from the reserved key only
        return {key_result: dict()}

    if key_result in parameters and not isinstance(parameters[key_result], dict):
        # the two literals are concatenated, hence the explicit leading space on
        # the second one so that the sentences do not run into each other
        warnings.warn(
            f'It looks like you passed a dictionary with the key `{key_result}` to `parameters`.'
            ' This key is reserved, so the associated value will be deleted.'
        )
        # NOTE: this mutates the dictionary handed in by the caller
        parameters.update({key_result: dict()})

    return parameters
class MutateMixin:
    """Mixin that gives Client and Flow a blocking GraphQL ``mutate`` call"""

    def mutate(
        self,
        mutation: str,
        variables: Optional[dict] = None,
        timeout: Optional[float] = None,
        headers: Optional[dict] = None,
    ):
        """Send a GraphQL mutation to the ``/graphql`` endpoint of ``self.args.host``

        The URL scheme is ``https`` when ``self.args.tls`` is truthy, ``http`` otherwise.

        :param mutation: the GraphQL mutation as a single string.
        :param variables: variables to be substituted in the mutation. Not needed if no variables are present in the mutation string.
        :param timeout: HTTP request timeout
        :param headers: HTTP headers
        :return: dict containing the optional keys ``data`` and ``errors``, for response data and errors.
        """
        with ImportExtensions(required=True):
            from sgqlc.endpoint.http import HTTPEndpoint as SgqlcHTTPEndpoint

            scheme = 'https' if self.args.tls else 'http'
            url = f'{scheme}://{self.args.host}:{self.args.port}/graphql'
            graphql_endpoint = SgqlcHTTPEndpoint(url)
            return graphql_endpoint(
                mutation,
                variables=variables,
                timeout=timeout,
                extra_headers=headers,
            )
class AsyncMutateMixin(MutateMixin):
    """Awaitable variant of the GraphQL mutation mixin for Client and Flow"""

    async def mutate(
        self,
        mutation: str,
        variables: Optional[dict] = None,
        timeout: Optional[float] = None,
        headers: Optional[dict] = None,
    ):
        """Perform a GraphQL mutation without blocking the caller

        The synchronous :meth:`MutateMixin.mutate` is handed to ``run_in_executor``
        of the loop returned by ``get_or_reuse_loop()`` and its result is awaited.

        :param mutation: the GraphQL mutation as a single string.
        :param variables: variables to be substituted in the mutation. Not needed if no variables are present in the mutation string.
        :param timeout: HTTP request timeout
        :param headers: HTTP headers
        :return: dict containing the optional keys ``data`` and ``errors``, for response data and errors.
        """
        loop = get_or_reuse_loop()
        blocking_mutate = super().mutate
        pending = loop.run_in_executor(
            None, blocking_mutate, mutation, variables, timeout, headers
        )
        return await pending
class PostMixin:
    """Mixin providing the blocking ``post`` API and its CRUD shortcuts for Client and Flow"""

    def post(
        self,
        on: str,
        inputs: Optional['InputType'] = None,
        on_done: Optional['CallbackFnType'] = None,
        on_error: Optional['CallbackFnType'] = None,
        on_always: Optional['CallbackFnType'] = None,
        parameters: Optional[Dict] = None,
        target_executor: Optional[str] = None,
        request_size: int = 100,
        show_progress: bool = False,
        continue_on_error: bool = False,
        return_responses: bool = False,
        **kwargs,
    ) -> Optional[Union['DocumentArray', List['Response']]]:
        """Post a general data request to the Flow.

        Responses are only collected and returned when neither ``on_done`` nor
        ``on_always`` is given.

        :param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document.
        :param on: the endpoint which is invoked. All the functions in the executors decorated by `@requests(on=...)` with the same endpoint are invoked.
        :param on_done: the function to be called when the :class:`Request` object is resolved.
        :param on_error: the function to be called when the :class:`Request` object is rejected.
        :param on_always: the function to be called when the :class:`Request` object is either resolved or rejected.
        :param parameters: the kwargs that will be sent to the executor
        :param target_executor: a regex string. Only matching Executors will process the request.
        :param request_size: the number of Documents per request. <=0 means all inputs in one request.
        :param show_progress: if set, client will show a progress bar on receiving every request.
        :param continue_on_error: if set, a Request that causes callback error will be logged only without blocking the further requests.
        :param return_responses: if set to True, the result will come as Response and not as a `DocumentArray`
        :param kwargs: additional parameters
        :return: None or DocumentArray containing all response Documents

        .. warning::
            ``target_executor`` uses ``re.match`` for checking if the pattern is matched.
            ``target_executor=='foo'`` will match both deployments with the name ``foo`` and ``foo_what_ever_suffix``.
        """
        client = self.client

        # a constructor-level `return_responses` wins over the per-call argument
        if client.args.return_responses and not return_responses:
            warnings.warn(
                'return_responses was set in the Client constructor. Therefore, we are overriding the `.post()` input '
                'parameter `return_responses`. This argument will be deprecated from the `constructor` '
                'soon. We recommend passing `return_responses` to the `post` method.'
            )
            return_responses = True

        # per-call options are stored on the client object itself
        client.show_progress = show_progress
        client.continue_on_error = continue_on_error

        parameters = _include_results_field_in_param(parameters)
        if on_error is not None:
            on_error = _wrap_on_error(on_error)

        from jina import DocumentArray

        # results are gathered only when the caller supplied no success callback
        keep_results = on_done is None and on_always is None

        async def _collect(*call_args, **call_kwargs):
            gathered = [] if return_responses else DocumentArray()
            async for response in client._get_results(*call_args, **call_kwargs):
                if not keep_results:
                    # still drain the stream so that the callbacks get invoked
                    continue
                if return_responses:
                    gathered.append(response)
                else:
                    gathered.extend(response.data.docs)
            return gathered if keep_results else None

        return run_async(
            _collect,
            inputs=inputs,
            on_done=on_done,
            on_error=on_error,
            on_always=on_always,
            exec_endpoint=on,
            target_executor=target_executor,
            parameters=parameters,
            request_size=request_size,
            **kwargs,
        )

    # ONLY CRUD, for other request please use `.post`
    index = partialmethod(post, '/index')
    search = partialmethod(post, '/search')
    update = partialmethod(post, '/update')
    delete = partialmethod(post, '/delete')
class AsyncPostMixin:
    """The Async Post Mixin class for AsyncClient and AsyncFlow"""

    async def post(
        self,
        on: str,
        inputs: Optional['InputType'] = None,
        on_done: Optional['CallbackFnType'] = None,
        on_error: Optional['CallbackFnType'] = None,
        on_always: Optional['CallbackFnType'] = None,
        parameters: Optional[Dict] = None,
        target_executor: Optional[str] = None,
        request_size: int = 100,
        show_progress: bool = False,
        continue_on_error: bool = False,
        return_responses: bool = False,
        **kwargs,
    ) -> AsyncGenerator[Union['DocumentArray', 'Response'], None]:
        """Async Post a general data request to the Flow.

        :param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document.
        :param on: the endpoint which is invoked. All the functions in the executors decorated by `@requests(on=...)` with the same endpoint are invoked.
        :param on_done: the function to be called when the :class:`Request` object is resolved.
        :param on_error: the function to be called when the :class:`Request` object is rejected.
        :param on_always: the function to be called when the :class:`Request` object is either resolved or rejected.
        :param parameters: the kwargs that will be sent to the executor
        :param target_executor: a regex string. Only matching Executors will process the request.
        :param request_size: the number of Documents per request. <=0 means all inputs in one request.
        :param show_progress: if set, client will show a progress bar on receiving every request.
        :param continue_on_error: if set, a Request that causes callback error will be logged only without blocking the further requests.
        :param return_responses: if set to True, the result will come as Response and not as a `DocumentArray`
        :param kwargs: additional parameters
        :yield: for every received response, the Response object itself when ``return_responses``
            is in effect, otherwise its ``data.docs``

        .. warning::
            ``target_executor`` uses ``re.match`` for checking if the pattern is matched.
            ``target_executor=='foo'`` will match both deployments with the name ``foo`` and ``foo_what_ever_suffix``.
        """
        c = self.client

        # a constructor-level `return_responses` wins over the per-call argument
        if c.args.return_responses and not return_responses:
            warnings.warn(
                'return_responses was set in the Client constructor. Therefore, we are overriding the `.post()` input '
                'parameter `return_responses`. This argument will be deprecated from the `constructor` '
                'soon. We recommend passing `return_responses` to the `post` method.'
            )
            return_responses = True

        # per-call options are stored on the client object itself
        c.show_progress = show_progress
        c.continue_on_error = continue_on_error

        parameters = _include_results_field_in_param(parameters)
        on_error = _wrap_on_error(on_error) if on_error is not None else on_error

        async for result in c._get_results(
            inputs=inputs,
            on_done=on_done,
            on_error=on_error,
            on_always=on_always,
            exec_endpoint=on,
            target_executor=target_executor,
            parameters=parameters,
            request_size=request_size,
            **kwargs,
        ):
            if not return_responses:
                yield result.data.docs
            else:
                yield result

    # ONLY CRUD, for other request please use `.post`
    index = partialmethod(post, '/index')
    search = partialmethod(post, '/search')
    update = partialmethod(post, '/update')
    delete = partialmethod(post, '/delete')
def _wrap_on_error(on_error):
    """Adapt a legacy one-parameter ``on_error`` callback to the two-argument form

    :param on_error: the user supplied callback. ``None`` is passed through untouched.
    :return: ``on_error`` itself unless its signature has exactly one parameter. In that
        case a deprecation warning is emitted and a wrapper taking ``(resp, exception)``
        is returned, which calls ``on_error(resp)`` and drops the exception.
    """
    if on_error is None:
        # both `post` implementations skip wrapping for a missing callback;
        # tolerate it here as well instead of failing inside `signature`
        return None

    if len(signature(on_error).parameters) != 1:
        return on_error

    warnings.warn(
        "on_error callback taking only the response parameters is deprecated. Please add one parameter "
        "to handle additional optional Exception as well",
        DeprecationWarning,
    )

    @wraps(on_error)
    def _fn(resp, exception):
        # just skip the exception
        return on_error(resp)

    # `wraps` sets `__wrapped__`, which makes `inspect.signature` report the
    # one-parameter signature of the legacy callback for `_fn`. Pin the real
    # two-parameter signature so that wrapping an already wrapped callback is a
    # no-op instead of producing a wrapper that fails when it is called.
    _fn.__signature__ = signature(_fn, follow_wrapped=False)
    return _fn