def get_fastapi_app(
    args: 'argparse.Namespace',
    topology_graph: 'TopologyGraph',
    connection_pool: 'GrpcConnectionPool',
    logger: 'JinaLogger',
    metrics_registry: Optional['CollectorRegistry'] = None,
):
    """
    Get the app from FastAPI as the REST interface.

    Depending on ``args`` the returned app exposes the debug routes (``/``, ``/status``,
    ``/post``), the CRUD routes (``/index``, ``/search``, ``/delete``, ``/update``), any
    extra routes listed in ``args.expose_endpoints`` (a JSON object mapping an executor
    endpoint to FastAPI route kwargs), a custom swagger page and a ``/graphql`` router.

    :param args: passed arguments.
    :param topology_graph: topology graph that manages the logic of sending to the proper executors.
    :param connection_pool: Connection Pool to handle multiple replicas and sending to different of them
    :param logger: Jina logger.
    :param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor or from the data request handler
    :return: fastapi app
    """
    with ImportExtensions(required=True):
        from fastapi import FastAPI
        from fastapi.middleware.cors import CORSMiddleware
        from fastapi.responses import HTMLResponse
        from starlette.requests import Request

        from jina.serve.runtimes.gateway.http.models import (
            JinaEndpointRequestModel,
            JinaRequestModel,
            JinaResponseModel,
            JinaStatusModel,
        )

    # Imported once at function scope so that every closure below (REST and GraphQL)
    # resolves the name, instead of relying on per-route local imports.
    from jina.enums import DataInputType

    docs_url = '/docs'
    app = FastAPI(
        title=args.title or 'My Jina Service',
        description=args.description
        or 'This is my awesome service. You can set `title` and `description` in your `Flow` or `Gateway` '
        'to customize this text.',
        version=__version__,
        # When the default swagger UI is disabled, a custom page is mounted on `docs_url` below.
        docs_url=docs_url if args.default_swagger_ui else None,
    )

    if args.cors:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=['*'],
            allow_credentials=True,
            allow_methods=['*'],
            allow_headers=['*'],
        )
        logger.warning('CORS is enabled. This service is now accessible from any website!')

    from jina.serve.runtimes.gateway.request_handling import RequestHandler
    from jina.serve.stream import RequestStreamer

    request_handler = RequestHandler(metrics_registry, args.name)

    streamer = RequestStreamer(
        args=args,
        request_handler=request_handler.handle_request(
            graph=topology_graph, connection_pool=connection_pool
        ),
        result_handler=request_handler.handle_result(),
    )
    streamer.Call = streamer.stream

    async def _get_singleton_result(request_iterator) -> Dict:
        """
        Streams results from AsyncPrefetchCall as a dict

        :param request_iterator: request iterator, with length of 1
        :return: the first result from the request iterator
        """
        async for k in streamer.stream(request_iterator=request_iterator):
            request_dict = k.to_dict()
            return request_dict

    async def _dispatch_request(req_generator_input: Dict) -> Dict:
        """
        Turn a request payload into a data request, send it and return the first result.

        Shared by the `/post` route, the exposed executor routes and the GraphQL resolvers.

        :param req_generator_input: keyword arguments for `request_generator`; must contain a
            `data` key. The dict is updated in place.
        :return: the first streamed result as a dict
        """
        req_generator_input['data_type'] = DataInputType.DICT
        payload = req_generator_input['data']
        # A payload shaped like `{'docs': [...]}` is unwrapped to the bare list of documents.
        if payload is not None and 'docs' in payload:
            req_generator_input['data'] = payload['docs']
        return await _get_singleton_result(request_generator(**req_generator_input))

    @app.on_event('shutdown')
    async def _shutdown():
        # Close the connection pool when the app shuts down.
        await connection_pool.close()

    openapi_tags = []
    if not args.no_debug_endpoints:
        openapi_tags.append(
            {
                'name': 'Debug',
                'description': 'Debugging interface. In production, you should hide them by setting '
                '`--no-debug-endpoints` in `Flow`/`Gateway`.',
            }
        )

        from jina.serve.runtimes.gateway.http.models import JinaHealthModel

        @app.get(
            path='/',
            summary='Get the health of Jina service',
            response_model=JinaHealthModel,
        )
        async def _health():
            """
            Get the health of this Jina service.
            .. # noqa: DAR201

            """
            return {}

        @app.get(
            path='/status',
            summary='Get the status of Jina service',
            response_model=JinaStatusModel,
            tags=['Debug'],
        )
        async def _status():
            """
            Get the status of this Jina service.

            This is equivalent to running `jina -vf` from command line.

            .. # noqa: DAR201
            """
            _info = get_full_version()
            return {
                'jina': _info[0],
                'envs': _info[1],
                'used_memory': used_memory_readable(),
            }

        @app.post(
            path='/post',
            summary='Post a data request to some endpoint',
            # NOTE: this debug endpoint validates its output against the standard response model.
            response_model=JinaResponseModel,
            tags=['Debug'],
        )
        async def post(body: JinaEndpointRequestModel):
            """
            Post a data request to some endpoint.

            This is equivalent to the following:

                from jina import Flow

                f = Flow().add(...)

                with f:
                    f.post(endpoint, ...)

            .. # noqa: DAR201
            .. # noqa: DAR101
            """
            # The above comment is written in Markdown for better rendering in FastAPI
            return await _dispatch_request(body.dict())

    def expose_executor_endpoint(exec_endpoint, http_path=None, **kwargs):
        """Exposing an executor endpoint to http endpoint

        :param exec_endpoint: the executor endpoint
        :param http_path: the http endpoint
        :param kwargs: kwargs accepted by FastAPI
        """
        # set some default kwargs for richer semantics
        # group flow exposed endpoints into `customized` group
        kwargs.setdefault('tags', ['Customized'])
        # use standard response model by default
        kwargs.setdefault('response_model', JinaResponseModel)
        kwargs.setdefault('methods', ['POST'])

        # No docstring on `foo` on purpose: FastAPI would publish it as the route description.
        @app.api_route(
            path=http_path or exec_endpoint, name=http_path or exec_endpoint, **kwargs
        )
        async def foo(body: JinaRequestModel):
            bd = body.dict() if body else {'data': None}
            bd['exec_endpoint'] = exec_endpoint
            return await _dispatch_request(bd)

    if not args.no_crud_endpoints:
        openapi_tags.append(
            {
                'name': 'CRUD',
                'description': 'CRUD interface. If your service does not implement those interfaces, you can should '
                'hide them by setting `--no-crud-endpoints` in `Flow`/`Gateway`.',
            }
        )
        crud = {
            '/index': {'methods': ['POST']},
            '/search': {'methods': ['POST']},
            '/delete': {'methods': ['DELETE']},
            '/update': {'methods': ['PUT']},
        }
        for k, v in crud.items():
            v['tags'] = ['CRUD']
            v['description'] = f'Post data requests to the Flow. Executors with `@requests(on="{k}")` will respond.'
            expose_executor_endpoint(exec_endpoint=k, **v)

    if openapi_tags:
        app.openapi_tags = openapi_tags

    if args.expose_endpoints:
        endpoints: Dict[str, Dict] = json.loads(args.expose_endpoints)
        for k, v in endpoints.items():
            expose_executor_endpoint(exec_endpoint=k, **v)

    if not args.default_swagger_ui:

        def _fetch_custom_swagger_html() -> str:
            # Blocking download of the hosted swagger page; only ever run in a worker thread.
            import urllib.request

            swagger_url = 'https://api.jina.ai/swagger'
            swagger_request = urllib.request.Request(
                swagger_url, headers={'User-Agent': 'Mozilla/5.0'}
            )
            with urllib.request.urlopen(swagger_request) as f:
                return f.read().decode()

        async def _render_custom_swagger_html(req: Request) -> HTMLResponse:
            import asyncio

            # `urlopen` is synchronous: run it in the default executor so the event loop
            # keeps serving other requests while the page is being downloaded.
            loop = asyncio.get_running_loop()
            html = await loop.run_in_executor(None, _fetch_custom_swagger_html)
            return HTMLResponse(html)

        app.add_route(docs_url, _render_custom_swagger_html, include_in_schema=False)

    if args.expose_graphql_endpoint:
        with ImportExtensions(required=True):
            from dataclasses import asdict

            import strawberry
            from docarray import DocumentArray
            from docarray.document.strawberry_type import (
                JSONScalar,
                StrawberryDocument,
                StrawberryDocumentInput,
            )
            from strawberry.fastapi import GraphQLRouter

            async def get_docs_from_endpoint(
                data, target_executor, parameters, exec_endpoint
            ):
                # `data` is optional in both resolvers below; forward `None` as-is instead
                # of iterating over it (the REST routes forward a missing payload the same way).
                # NOTE(review): assumes the response carries a `data` entry even for an
                # empty request -- confirm against the streamer output.
                response = await _dispatch_request(
                    {
                        'data': None if data is None else [asdict(d) for d in data],
                        'target_executor': target_executor,
                        'parameters': parameters,
                        'exec_endpoint': exec_endpoint,
                    }
                )
                return DocumentArray.from_dict(response['data']).to_strawberry_type()

            @strawberry.type
            class Mutation:
                @strawberry.mutation
                async def docs(
                    self,
                    data: Optional[List[StrawberryDocumentInput]] = None,
                    target_executor: Optional[str] = None,
                    parameters: Optional[JSONScalar] = None,
                    exec_endpoint: str = '/search',
                ) -> List[StrawberryDocument]:
                    return await get_docs_from_endpoint(
                        data, target_executor, parameters, exec_endpoint
                    )

            @strawberry.type
            class Query:
                @strawberry.field
                async def docs(
                    self,
                    data: Optional[List[StrawberryDocumentInput]] = None,
                    target_executor: Optional[str] = None,
                    parameters: Optional[JSONScalar] = None,
                    exec_endpoint: str = '/search',
                ) -> List[StrawberryDocument]:
                    return await get_docs_from_endpoint(
                        data, target_executor, parameters, exec_endpoint
                    )

            schema = strawberry.Schema(query=Query, mutation=Mutation)
            app.include_router(GraphQLRouter(schema), prefix='/graphql')

    return app