# Master Executor: from Zero to Hub¶

Cristian @ Jina AI

Sept. 10, 2021

This is a step-by-step walkthrough on how to create your Executors or use existing ones.

We will create a simple logging Executor. It will log the Documents’ information as they reach it, and save these to a file. We will also see how to push our Executor to Jina Hub to use it later.

## Set-up & overview¶

We recommend creating a new python virtual environment to have a clean installation of Jina and prevent dependency clashing.

We can start by installing Jina:

pip install jina


To create your Executor, you just need to run this command in your terminal:

jina hub new


A wizard will ask you some questions about the Executor. For the basic configuration, you will be asked two things:

• the Executor’s name

• where it should be saved

For this tutorial, we will call ours RequestLogger. And you can save it wherever you want to have your project. The wizard will ask if you want to have a more advanced configuration, but it is unnecessary for this tutorial.

### Logger Executor¶

Once we followed the wizard, we have our folder structure ready. We can start working with the executor.py. Open that file, and let’s import the following

import os
import time
from typing import Dict

from jina import Executor, DocumentArray, requests
from jina.logging.logger import JinaLogger


Then we create our class that inherits from the Executor base class. We will call ours RequestLogger

Important

You always need to inherit from the Executor class, in order for the class to be properly registered into Jina.

class RequestLogger(Executor):


Our Executor will have two methods: one for the constructor and one for the actual logging:

class RequestLogger(Executor):
def __init__(self, **args, **kwargs):
# Whatever you need for our constructor

def log():
# Whatever we need for our logging


It could be helpful to specify the number of Documents we want to work with, so we pass this directly in the arguments of our constructor

def __init__(self,
default_log_docs: int = 1,
# here you can pass whatever other arguments you need
*args, **kwargs):


Important

You need to do this before writing any custom logic. It’s required in order to register the parent class, which instantiates special fields and methods.

super().__init__(*args, **kwargs)


Now we start creating our constructor method. We set the default_log_docs we got from the arguments:

self.default_log_docs = default_log_docs


For logging, we need to create an instance of JinaLogger. We also need to specify the path where we save our log file.

self.logger = JinaLogger('req_logger')
self.log_path = os.path.join(self.workspace, 'log.txt')


Note

self.workspace will be provided by the Executor parent class.

And finally, we need to create the file, in case it doesn’t exist.

if not os.path.exists(self.log_path):
with open(self.log_path, 'w'): pass


Ok, that’s it for our constructor, by now we should have something like this:

class RequestLogger(Executor):                                                                      # needs to inherit from Executor
def __init__(self,
default_log_docs: int = 1,                                                          # number of documents to log
*args, **kwargs):                                                                   # *args and **kwargs are required for Executor
super().__init__(*args, **kwargs)                                                           # before any custom logic
self.default_log_docs = default_log_docs
self.logger = JinaLogger('req_logger')                                                      # create instance of JinaLogger
self.log_path = os.path.join(self.workspace, 'log.txt')                                     # set path to save the log.txt
if not os.path.exists(self.log_path):                                                       # check the file doesn't exist already
with open(self.log_path, 'w'): pass


We can start creating our log method now. First of all, we need the @requests decorator. This is to communicate to the Flow when the function will be called and on which endpoint. We use @requests without any endpoint, so we will call our function on every request:

@requests
def log(self,
docs: Optional[DocumentArray],
parameters: Dict,
**kwargs):


It’s important to note the arguments here.

Important

It’s not possible to redefine the interface of the public methods decorated by @requests. You can’t change the name of these arguments. To see exactly which parameters you can use, check here.

If you would like to call your log function only on /index time, you specify the endpoint with on=, like this:

@requests(on='/index')
def log(self,
docs: Optional[DocumentArray],
parameters: Dict,
**kwargs):


If you want more information on how to use this decorator, refer to the documentation. In this example, we want to call our log function on every request, so we don’t specify any endpoint.

Now we can add the logic for our function. First, we will print a line that displays some information. And then, we will save the details from our Documents:

self.logger.info('Request being processed...')
nr_docs = int(parameters.get('log_docs', self.default_log_docs))         # accesing parameters (nr are passed as float due to Protobuf)
with open(self.log_path, 'a') as f:
f.write(f'request at time {time.time()} with {len(docs)} documents:\n')
for i, doc in enumerate(docs):
f.write(f'\tsearching with doc.id {doc.id}. content = {doc.content}\n')
if i + 1 == nr_docs:
break


Here you can set whatever logic you need for your Executor. By now, your code should look like this:

import os
import time
from typing import Dict, Optional

from jina import Executor, DocumentArray, requests
from jina.logging.logger import JinaLogger

class RequestLogger(Executor):                                                                      # needs to inherit from Executor
def __init__(self,
default_log_docs: int = 1,                                                          # your arguments
*args, **kwargs):                                                                   # *args and **kwargs are required for Executor
super().__init__(*args, **kwargs)                                                           # before any custom logic
self.default_log_docs = default_log_docs
self.logger = JinaLogger('req_logger')
self.log_path = os.path.join(self.workspace, 'log.txt')
if not os.path.exists(self.log_path):
with open(self.log_path, 'w'): pass

@requests                                                                                       # decorate, by default it will be called on every request
def log(self,                                                                                   # arguments are automatically received
docs: Optional[DocumentArray],
parameters: Dict,
**kwargs):
self.logger.info('Request being processed...')

nr_docs = int(parameters.get('log_docs', self.default_log_docs))                            # accesing parameters (nr are passed as float due to Protobuf)
with open(self.log_path, 'a') as f:
f.write(f'request at time {time.time()} with {len(docs)} documents:\n')
for i, doc in enumerate(docs):
f.write(f'\tsearching with doc.id {doc.id}. content = {doc.content}\n')
if i + 1 == nr_docs:
break


And that’s it. We have an Executor that takes whatever Documents we pass to it and logs them.

Ok, and what now? How can you use this in your app?

### Push your Executor to Hub¶

We could use our Executor directly in our app, but here we will see how to push it to Jina Hub so we can share it with more people, or use it later.

First step is to actually make sure the manifest.yml and config.yml files are still relevant. Check that the data in there still represent you Executor’s purpose.

For this, you need to open a terminal in the folder of your executor.py, so in this case, open a terminal inside the RequestLogger folder. And there you just need to type:

jina hub push --public .


This means you will push your Executor publicly to Jina Hub. The last dot means you will use your current path. Once you run that command, you should see something like this:

Note

Since we pushed our Executor using the --public flag, the only thing we will use is the ID. In this case, it’s zsor7fe6. Refer to Jina Hub usage.

Let’s create a Jina Flow that can use the Executor we just wrote. Create an app.py in the same folder as RequestLogger. Now open it and import Flow, DocumentArray, Document before we create our main function:

from jina import Flow, DocumentArray, Document

def main():
# We'll have our Flow here

if __name__ == '__main__':
main()


The Executor we just created logs whatever Documents we pass to it. So we need to create some Documents first. We’ll do that in main()

def main():
docs = DocumentArray()
docs.append(Document(content='I love cats'))                # creating documents
docs.append(Document(content='I love every type of cat'))
docs.append(Document(content='I guess dogs are ok'))


We have three Documents in one DocumentArray. Now let’s create a Flow and add the Executor we created. We will reference it by the ID we got when we pushed it (in my case, it was zsor7fe6):

flow = Flow().add(
uses='jinahub+docker://zsor7fe6',                   # here we choose to use the Executor inside a docker container
uses_with={                                         # RequestLogger arguments
'default_log_docs': 3
},
volumes='workspace:/internal_workspace',            # mapping local folders to docker instance folders
uses_metas={                                        # Executor (parent class) arguments
'workspace': '/internal_workspace',             # this should match the above
},
)


This seems like plenty of details, so let’s explain them:

uses='jinahub+docker://zsor7fe6',


Here you use uses= to specify the image of your Executor. This will start a Docker container with the image of the Executor we built and deployed in the previous step. So don’t forget to change the ID to the correct one.

uses_with={                                         # RequestLogger arguments
'default_log_docs': 3
},


We need uses_with= to pass the arguments we need. In our case, we have only one argument: default_log_docs. In the constructor of our RequestLogger Executor, we defined the default_log_docs as 1, but we override it here with 3, so 3 will be the new value.

The next line refers to our workspace:

volumes='workspace:/internal_workspace',


Here we are mapping the workspace folder that will be created when we run our app to a folder called internal_workspace in Docker. We do this because our Executor logs the Documents into a file, and we want to save that file on our local disk. If we don’t do that, the information would be saved in the Docker container, and you would need to access that container to see files. To do this, we use volumes= and set it to our internal workspace.

The last part overrides arguments too, but this time for the Executor parent class:

uses_metas={                                                # Executor (parent class) arguments
'workspace': '/internal_workspace',             # this should match the above
},


In our case, the only argument we want to override is the name of the workspace. If you don’t do this, a folder with the same name of your Executor class (RequestLogger) would be created, and your information would have been saved there. But since we just mounted our workspace with the name internal_workspace in Docker, we need to make a folder with that same name.

Ok, we have our Flow ready with the Executor we deployed previously. We can use it now. Let’s start by indexing the Documents:

with flow as f:                                                 # Flow is a context manager
f.post(
on='/index',                                        # the endpoint
inputs=docs,                                        # the documents we send as input
)


The Executor we created doesn’t care about what endpoint is used, so it will perform the same operation no matter what endpoint you specify here. In this example, we set it to on='/index' anyway. Here you could use one for index and another one for query if you need it and your Executor has the proper endpoints.

So far, your code should look like this:

from jina import Flow, DocumentArray, Document

def main():
docs = DocumentArray()
docs.append(Document(content='I love cats'))                # creating documents
docs.append(Document(content='I love every type of cat'))
docs.append(Document(content='I guess dogs are ok'))

flow = Flow().add(                                          # provide as class name or jinahub+docker URI
uses='jinahub+docker://7dne55rj',
uses_with={                                             # RequestLogger arguments
'default_log_docs': 3
},
volumes='workspace:/internal_workspace',                # mapping local folders to docker instance folders
uses_metas={                                            # Executor (parent class) arguments
'workspace': '/internal_workspace',                 # this should match the above
},
)

with flow as f:                                             # Flow is a context manager
f.post(
on='/index',                                        # the endpoint
inputs=docs,                                        # the documents we send as input
)

if __name__ == '__main__':
main()


When you run this, you will see a new workspace folder created with two other folders inside. One called RequestLogger or whatever name you used in your class. And another folder for the sharding, but we won’t talk about that in this tutorial because it’s out of scope. Inside the sharding folder called 0, you will see a log.txt` file. And there you will have the 3 Documents with their information.

And that’s it! You created an Executor, pushed it to Jina Hub, and used it in your app.

There are still a lot of concepts to learn. So stay tuned for our following tutorials.

If you have any issues following this tutorial, you can always get support from our Slack community