Input and Output Functions in Jina

This chapter explains the input and output functions of Jina’s Flow API.

Input Function

TL;DR

  • By default, everything is sent in a buffer

  • Use a crafter to handle the input

  • Shortcuts such as index_lines, index_ndarray and index_files are available to input predefined formats.

In the Flow API, we highlight that you can use .index(), .search() and .train() to feed index data and search queries to a Flow:

with f:
    f.index(input_fn)
with f:
    f.search(input_fn, top_k=50, output_fn=print)

input_fn is Iterator[bytes], each of which corresponds to a bytes representation of a Document.

A simple input_fn can be defined as follows:

def input_fn():
    for _ in range(10):
        yield b'look! i am a Document!'  # `s` is a "Document"! 

# or ...
input_fn = (b'look! i am a Document!' for _ in range(10))

Shortcuts

Function Description
index_files, search_files Use a list of files as the index/query source for the current Flow
index_lines, search_lines Use a list of lines as the index/query source for the current Flow
index_ndarray, search_ndarray Use a Numpy ndarray as the index/query source for the current Flow

Usage of index_ndarray()

import numpy as np
from jina.flow import Flow

input_data = np.random.random((3,8))

f = Flow().add(uses='_logforward')

with f:
    f.index_ndarray(input_data)
  • Add a dummy Pod with config _logforward to the Flow. _logforward is a built-in YAML, which just forwards input data to the results and prints it to the log. It is located in jina/resources/executors._forward.yml. You can also use your own YAML to organize pods.

  • Use the Flow to index an ndarray by calling the index_ndarray() API.

Calling the index_ndarray() API generates requests with the following message:

request {
  request_id: 1
  index {
    docs {
      id: 1
      weight: 1.0
      length: 100
      blob {
        buffer: "\004@\316\362/D\333?\244>\235\305\027\311\336?\267\210\251\311^\260\345?\366\n(\014\022m\356?\374\262\017\030\036\357\351?-c\300\337\217V\345?\241G\241\352\233\024\356?\340\346lUf\353\350?"
        shape: 8
        dtype: "float64"
      }
    }
    docs {
      id: 2
      weight: 1.0
      length: 100
      blob {
        buffer: "\312Wm\337\250\217\354?t\212\326\020\261\r\320?\254\262\300u<O\323?\340\210\222$\321\216\314?\310.q,+\347\311?&\316\361\310\252R\331?\214\016\201a\231\262\330?\342\231\262\221\343%\324?"
        shape: 8
        dtype: "float64"
      }
    }
    docs {
      id: 3
      weight: 1.0
      length: 100
      blob {
        buffer: "kT\250\372K%\345?\237\017+u\300\227\353?\3668\256\340\251\227\350?\327\006$\032$\002\341?\274\300\3573\371\262\343?\346\371\265dV\330\342?\370\210\360\002P3\340?\022i-\016\374\320\331?"
        shape: 8
        dtype: "float64"
      }
    }
  }
}

The structure of this message is defined in the format of protobuf. Check more details of the data structure at jina.proto. Messages are passed between the Pods in the Flow.

request contains input data and related metadata. The input is a 3*8 matrix that is sent to the Flow, which matches 3 request.index.docs, and the request.index.docs.blog.shape is 8. The vector of the matrix is stored in request.index.docs.blob, and the request.index.docs.blob.dtype indicates the type of the vector.

search_ndarray() is the API for searching np.ndarray. The data structure will be replaced from request.index to request.search, and the other nodes stay the same.

import numpy as np
from jina.flow import Flow

input_data = np.random.random((3,8))

f = Flow().add(uses='_logforward')

with f:
    f.search_ndarray(input_data)

Usage of index_files()

from jina.flow import Flow

f = Flow().add(uses='_logforward')

with f:
    f.index_files(f'../pokedex-with-bit/pods/*.yml')

API index_files() reads input data from ../pokedex-with-bit/pods/*.yml. In this directory, there are 5 YAML files, so you in the protobuf you can see:

  • 5 docs under request.index

  • Each file’s path in a request.index.doc.uri

request {
  request_id: 1
  index {
    docs {
      id: 1
      weight: 1.0
      length: 100
      uri: "../pokedex-with-bit/pods/encode-baseline.yml"
    }
    docs {
      id: 2
      weight: 1.0
      length: 100
      uri: "../pokedex-with-bit/pods/chunk.yml"
    }
    docs {
      id: 3
      weight: 1.0
      length: 100
      uri: "../pokedex-with-bit/pods/doc.yml"
    }
    docs {
      id: 4
      weight: 1.0
      length: 100
      uri: "../pokedex-with-bit/pods/encode.yml"
    }
    docs {
      id: 5
      weight: 1.0
      length: 100
      uri: "../pokedex-with-bit/pods/craft.yml"
    }
  }
}

search_files() is the API for searching files.

from jina.flow import Flow

f = Flow().add(uses='_logforward')

with f:
    f.search_files(f'../pokedex-with-bit/pods/chunk.yml')

Usage of index_lines()

from jina.flow import Flow
input_str = ['aaa','bbb']

f = Flow().add(uses='_logforward')
with f:
    f.index_lines(lines=input_str)

index_lines() reads input data from input_str. As you can see above, there are 2 elements in input_str, so in the protobuf you can see:

  • 2 docs under request.index.docs

  • Each individual string in request.index.docs.text.

request {
  request_id: 1
  index {
    docs {
      id: 1
      weight: 1.0
      length: 100
      mime_type: "text/plain"
      text: "aaa"
    }
    docs {
      id: 2
      weight: 1.0
      length: 100
      mime_type: "text/plain"
      text: "bbb"
    }
  }
}

search_lines() is the API for searching text.

from jina.flow import Flow

text = input('please type a sentence: ')

f = Flow().add(uses='_logforward')

with f:   
    f.search_lines(lines=[text, ])

Why Bytes/Buffer?

You may wonder why we use bytes instead of some Python native objects as the input. There are two reasons:

  • As a universal search framework, Jina accepts documents in different formats, from text to image to video. Raw bytes is the only consistent data representation over those modalities.

  • Clients can be written in languages other than Python. Raw bytes is the only data type that can be recognized across languages.

But Then How Can Jina Recognize Those Bytes?

The answer relies on the Flow’s crafter, and the “type recognition” is implemented as a “deserialization” step. The crafter is often the Flow’s first component, and translates the raw bytes into a Python native object.

For example, let’s say our input function reads gif videos in binary:

def input_fn():
    for g in all_gif_files:
        with open(g, 'rb') as fp:
            yield fp.read()

The corresponding crafter takes whatever is stored in the buffer and tries to make sense out of it:

import io
from PIL import Image
from jina.executors.crafters import BaseDocCrafter

class GifCrafter(BaseDocCrafter):
    def craft(self, buffer):
        im = Image.open(io.BytesIO(buffer))
        # manipulate the image here
        # ...

In this example, PIL.Image.open takes either the filename or file object as argument. We convert buffer to a file object here using io.BytesIO.

Alternatively, if your input function is only sending the file name, like:

def input_fn():
    for g in all_gif_files:
        yield g.encode()  # convert str to binary string b'str'

Then the corresponding crafter should change accordingly.


from PIL import Image
from jina.executors.crafters import BaseDocCrafter

class GifCrafter(BaseDocCrafter):
    def craft(self, buffer):
        im = Image.open(buffer.decode())
        # manipulate the image here
        # ...

buffer now stores the file path, so we convert it back to a normal string with .decode() and read from the file path.

You can also combine two types of data, like:

def input_fn():
    for g in all_gif_files:
        with open(g, 'rb') as fp:
            yield g.encode() + b'JINA_DELIM' + fp.read()

The crafter then can be implemented as:

from jina.executors.crafters import BaseDocCrafter
import io
from PIL import Image

class GifCrafter(BaseDocCrafter):

    def craft(self, buffer, *args, **kwargs):
        file_name, img_raw = buffer.split(b'JINA_DELIM')
        im = Image.open(io.BytesIO(img_raw))
        # manipulate the image and file_name here
        # ...

As you can see from the examples above, we can use buffer to transfer strings and gif videos.

.index(), .search() and .train() also accept batch_size which controls the number of Documents per request. However, this does not change the crafter’s implementation, as the crafter always works at the Document level.

Further reading:

Output Function

TL;DR

  • Everything works asynchronously

  • Use callback= to specify the output function

Jina’s output function is basically asynchronous callback. For the sake of efficiency, Jina is designed to be highly asynchronous on data transmission. You just keep sending requests to Jina without any blocking. When a request is finished, the callback function is invoked.

For example, the following will print the request after a IndexRequest is finished:

with f:
    f.index(input_fn, output_fn=print)

This is quite useful when debugging.

In the “Hello, World!” example, we use a callback function to append the top-k results to an HTML page:

def print_html(resp):
    for d in resp.search.docs:
        vi = 'data:image/png;base64,' + d.meta_info.decode()
        result_html.append(f'<tr><td><img src="{vi}"/></td><td>')
        for kk in d.matches:
            kmi = 'data:image/png;base64,' + kk.match_doc.meta_info.decode()
            result_html.append(f'<img src="{kmi}" style="opacity:{kk.score.value}"/>')
            # k['score']['explained'] = json.loads(kk.score.explained)
        result_html.append('</td></tr>\n')
f.search(input_fn,
                 output_fn=print_html, top_k=args.top_k, batch_size=args.query_batch_size)