Find Small Images Inside Large Images

Alaeddine @ Jina AI

October 29, 2021

Open In Colab

The purpose of this tutorial is to build an image search engine capable of finding small images inside bigger ones. This requires a different architecture than typical image search engines since we need to perform object detection.

Tip

The full source code of this tutorial is available in this google colab notebook

Understanding And Formulating the Problem

As we want to find small images inside big images, simply encoding both the indexed images and the query image and matching will not work. Imagine that you have the following big image :

../../../_images/cat-bird.jpg

It contains a scene with a cat in the background, a bird and a few other items in the scene.

Now let’s suppose that the query image is a simple bird:

../../../_images/bird.jpg

Encoding the query image will generate embeddings that effectively represent it. However, it’s not easy to build an encoder that effectively represents the big image, since it contains a complex scene with different objects. The embeddings will not be representative enough and therefore we need to think about a better approach.

Can you think of another solution ?

Hint

Encoding a complex image is not easy, but what if we can encode objects inside it ? Imagine that we can identify these objects inside the big image like so:

../../../_images/cat-bird-detections.jpg

Right, identifying objects inside the big image and then encoding each one of them will result in better, more representative embeddings. Right now, we should ask 2 questions:

  1. How can we identify objects ?

  2. How can we retrieve the big image if we match the query against identified objects ?

The first question is easy. And the response is simply object detection. There are many models that can perform object detection and in this tutorial, we will be using yolov5. Detected objects can be easily represented as chunks of the original indexed documents.

See Also

If you’re not familiar with chunks in jina, check this section

The second question can be a bit complex. Actually, we will match query documents against chunks of the original documents but we need to return the original documents (the big images). We can solve this problem by relying on a ranker executor, which roughly does the following:

  1. Retrieve the parent document IDs from the matched chunks along with their scores

  2. For each parent ID, aggregate the scores of that parent

  3. Replace the matches by the parent documents instead of children documents (aka chunks).

  4. Sort the new matches by their aggregated scores.

Cool, seems like a complex logic, but no worries, we will build our ranker executor later step by step. However, note that since the ranker is not a storage executor, it’s not capable of retrieving the parent documents from chunks. Instead, we can create empty documents that only contain the IDs. This implies that in a later step, we need to retrieve those documents by IDs.

Now let’s try to imagine and design our Flows given what we’ve discussed so far:

Index Flow:

../../../_images/index_flow_brainstorming.svg

Query Flow:

../../../_images/query_flow_brainstorming.svg

Oh, because we use the ranker, we will need something to help us retrieve original parent documents by IDs. Well that can be any storage executor. Actually Jina Hub includes many storage executors but in this tutorial, we will build our own storage executor. Since this executor should store parent documents, we will call it the root_indexer. Also, since we need it in the query Flow, we also have to add it to the index Flow. One more note, this root_indexer will index documents as they are, so it makes sense to put it in parallel to the other processing steps (segmenting, encoding,…).

Now, the technology behind this executor will be LMDB.

See Also

Jina natively supports complex toplogies of Flow where you can put executors in parallel. Checkout this section to learn more.

Cool, but what about the other indexer ?

Well, it should support matching and indexing chunks of images after they are segmented. Therefore, it needs to support vector search along with indexing. The Jina Hub already includes such indexers (for example, SimpleIndexer), however, we will create our own version of simple indexer. And by the way, it will be convenient to rename this indexer to chunks_indexer.

Alright, before seeing the final architecture, let’s agree on names for our executors:

  • chunks_indexer: SimpleIndexer

  • root_indexer: LMDBStorage (well because we use LMDB)

  • encoder: CLIPImageEncoder (yes we will be using the CLIP model to encode images)

  • segmenter: YoloV5Segmenter. Actually we could name object-detector but segmenter is a term that aligns better with Jina’s terminology

  • ranker: SimpleRanker (trust me it’s going to be simple)

Finally, here is what our Flows will look like. Index Flow:

../../../_images/index_flow.svg

Query Flow:

../../../_images/query_flow.svg

Pre-requisites

In this tutorial, we will need the following dependencies installed:

pip install Pillow jina==2.1.13 torch==1.9.0 torchvision==0.10.0 transformers==4.9.1 yolov5==5.0.7 lmdb==1.2.1 matplotlib [email protected]+https://github.com/jina-ai/jina-commons.git#egg=jina-commons

We also need to download the dataset and unzip it.

You can use the link or the following commands:

wget https://open-images.s3.eu-central-1.amazonaws.com/data.zip
unzip data.zip

You should find 2 folders after unzipping:

  • images: this folder contains the images that we will index

  • query: this folder contains small images that we will use as search queries

Building Executors

In this section, we will start developing the necessary executors, for both query and index flows.

CLIPImageEncoder

This encoder encodes an image into embeddings using the CLIP model. We want an executor that loads the CLIP model and encodes it during the query and index flows.

Our executor should:

  • support both GPU and CPU: That’s why we will provision the device parameter and use it when encoding.

  • be able to process documents in batches in order to use our resources effectively: To do so, we will use the parameter batch_size

  • be able to encode the full image during the query flow and encode only chunks during the index flow: This can be achieved with traversal_paths and method DocumentArray.batch.

from typing import Optional, Tuple

import torch
from jina import DocumentArray, Executor, requests
from jina.logging.logger import JinaLogger
from transformers import CLIPFeatureExtractor, CLIPModel


class CLIPImageEncoder(Executor):
    """Encode image into embeddings using the CLIP model."""

    def __init__(
        self,
        pretrained_model_name_or_path: str = "openai/clip-vit-base-patch32",
        device: str = "cpu",
        batch_size: int = 32,
        traversal_paths: Tuple = ("r",),
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.batch_size = batch_size
        self.traversal_paths = traversal_paths
        self.pretrained_model_name_or_path = pretrained_model_name_or_path

        self.device = device
        self.preprocessor = CLIPFeatureExtractor.from_pretrained(
            pretrained_model_name_or_path
        )
        self.model = CLIPModel.from_pretrained(self.pretrained_model_name_or_path)
        self.model.to(self.device).eval()

    @requests
    def encode(self, docs: Optional[DocumentArray], parameters: dict, **kwargs):
        if docs is None:
            return

        traversal_paths = parameters.get("traversal_paths", self.traversal_paths)
        batch_size = parameters.get("batch_size", self.batch_size)
        document_batches_generator = docs.batch(
            traversal_paths=traversal_paths,
            batch_size=batch_size,
            require_attr="blob",
        )

        with torch.inference_mode():
            for batch_docs in document_batches_generator:
                blob_batch = [d.blob for d in batch_docs]
                tensor = self._generate_input_features(blob_batch)


                embeddings = self.model.get_image_features(**tensor)
                embeddings = embeddings.cpu().numpy()

                for doc, embed in zip(batch_docs, embeddings):
                    doc.embedding = embed

    def _generate_input_features(self, images):
        input_tokens = self.preprocessor(
            images=images,
            return_tensors="pt",
        )
        input_tokens = {
            k: v.to(torch.device(self.device)) for k, v in input_tokens.items()
        }
        return input_tokens

YoloV5Segmenter

Since we want to retrieve small images in bigger images, the technique that we will heavily rely on is segmenting. Basically, we want to do object detection on the indexed images. This will generate bounding boxes around objects detected inside the images. The detected objects will be extracted and added as chunks to the original documents. By the way, guess what is the state-of-the-art object detection model ?

Right, we will use YoloV5.

Our YoloV5Segmenter should be able to load the ultralytics/yolov5 model from Torch hub, otherwise, load a custom model. To achieve this, the executor accepts parameter model_name_or_path which will be used when loading. We will implement the method load which checks if the model exists in the the Torch Hub, otherwise, loads it as a custom model.

For our use case, we will just rely on yolov5s (small version of yolov5). Of course, for better quality, you can choose a more complicated model or your custom model.

Furthermore, we want YoloV5Segmenter to support both GPU and CPU and it should be able to process in batches. Again, this is as simple as adding parameters device and batch_size and using them during segmenting.

To perform segmenting, we will implement method _segment_docs which performs the following steps:

  1. For each batch (a batch consists of several images), use the model to get predictions for each image

  2. Each prediction of an image can contain several detections (because yolov5 will extract as much bounding boxes as possible, along with their confidence scores). We will filter out detections whose scores are below the confidence_threshold to keep good quality.

Each detection is actually 2 points -top left (x1, y1) and bottom right (x2, y2)- a confidence score and a class. We will not use the class of the detection, but it can be useful in other search applications.

  1. With the detections that we have, we create crops (using the 2 points returned). Finally, we add these crops to image documents as chunks.

from typing import Dict, Iterable, Optional

import torch
from jina import Document, DocumentArray, Executor, requests
from jina_commons.batching import get_docs_batch_generator


class YoloV5Segmenter(Executor):

    def __init__(
        self,
        model_name_or_path: str = 'yolov5s',
        confidence_threshold: float = 0.3,
        batch_size: int = 32,
        device: str = 'cpu',
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.model_name_or_path = model_name_or_path
        self.confidence_threshold = confidence_threshold
        self.batch_size = batch_size

        if device != 'cpu' and not device.startswith('cuda'):
            self.logger.error('Torch device not supported. Must be cpu or cuda!')
            raise RuntimeError('Torch device not supported. Must be cpu or cuda!')
        if device == 'cuda' and not torch.cuda.is_available():
            self.logger.warning(
                'You tried to use GPU but torch did not detect your'
                'GPU correctly. Defaulting to CPU. Check your CUDA installation!'
            )
            device = 'cpu'
        self.device = torch.device(device)
        self.model = self._load(self.model_name_or_path)

    @requests
    def segment(
        self, docs: Optional[DocumentArray] = None, parameters: Dict = {}, **kwargs
    ):

        if docs:
            document_batches_generator = get_docs_batch_generator(
                docs,
                traversal_path=['r'],
                batch_size=parameters.get('batch_size', self.batch_size),
                needs_attr='blob',
            )
            self._segment_docs(document_batches_generator, parameters=parameters)

    def _segment_docs(self, document_batches_generator: Iterable, parameters: Dict):
        with torch.no_grad():
            for document_batch in document_batches_generator:
                images = [d.blob for d in document_batch]
                predictions = self.model(
                    images,
                    size=640,
                    augment=False,
                ).pred

                for doc, prediction in zip(document_batch, predictions):
                    for det in prediction:
                        x1, y1, x2, y2, conf, cls = det
                        if conf < parameters.get(
                            'confidence_threshold', self.confidence_threshold
                        ):
                            continue
                        crop = doc.blob[int(y1) : int(y2), int(x1) : int(x2), :]
                        doc.chunks.append(Document(blob=crop))

    def _load(self, model_name_or_path):
        if model_name_or_path in torch.hub.list('ultralytics/yolov5'):
            return torch.hub.load(
                'ultralytics/yolov5', model_name_or_path, device=self.device
            )
        else:
            return torch.hub.load(
                'ultralytics/yolov5', 'custom', model_name_or_path, device=self.device
            )

Indexers

After developing the encoder, we will need 2 kinds of indexers:

  1. SimpleIndexer: This indexer will take care of storing chunks of images. It also should support vector similarity search which is important to match small query images against segments of original images.

  2. LMDBStorage: LMDB is a simple memory-mapped transactional key-value store. It is convenient for this example because we can use it to store the original images (so that we can retrieve them later). We will use it to create LMDBStorage which offers 2 functionalities: indexing documents and retrieving documents by ID.

SimpleIndexer

To implement SimpleIndexer, we can leverage Jina’s DocumentArrayMemmap. You can read about this data type here.

Our indexer will create an instance of DocumentArrayMemmap when it’s initialized. We want to store indexed documents inside the workspace folder that’s why we pass the workspace attribute of the executor to DocumentArrayMemmap.

To index, we implement the method index which is bound to the index flow. It’s as simple as extending the received docs to DocumentArrayMemmap instance.

On the other hand, for search, we implement the method search. We bind it to the query flow using the decorator @requests(on='/search').

In jina, searching for query documents can be done by adding the results to the matches attribute of each query document. Since docs is a DocumentArray we can use method match to match query against the indexed documents. Read more about match here. There’s another detail here. We already indexed documents before search, but we need to match query documents against chunks of the indexed images. Luckily, DocumentArray.match allows us to specify the traversal paths of the right-hand-side parameter with parameter traversal_rdarray. Since we want to match the left side docs (query) against the chunks of the right side docs (indexed docs), we can specify that traversal_rdarray=['c'].

from typing import Dict, Optional

from jina import DocumentArray, Executor, requests
from jina.types.arrays.memmap import DocumentArrayMemmap


class SimpleIndexer(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self._storage = DocumentArrayMemmap(
            self.workspace, key_length=kwargs.get('key_length', 64)
        )

    @requests(on='/index')
    def index(
        self,
        docs: Optional['DocumentArray'] = None,
        **kwargs,
    ):
        if docs:
            self._storage.extend(docs)

    @requests(on='/search')
    def search(
        self,
        docs: Optional['DocumentArray'] = None,
        parameters: Optional[Dict] = None,
        **kwargs,
    ):
        if not docs:
            return

        docs.match(self._storage, traversal_rdarray=['c'])

LMDBStorage

In order to implement the LMDBStorage, we need the following parts:

I. Handler

This will be a context manager that we will use when we access our LMDB database. We will create it as a standalone class.

II. LMDBStorage constructor

The constructor should initialize a few attributes:

  • the map_size of the database

  • the default_traversal_paths. Actually we need traversal paths because we will not be traversing documents in the same way during index and query flows. During index, we want to store the root documents. However, during query,
    we need to get the matches of documents by ID.

  • the index file: again, to keep things clean, we will store the index file inside the workspace folder. Therefore we can use the workspace attribute.

III. LMDBStorage.index

In order to index documents, we first start a transaction (so that our Storage executor is ACID-compliant). Then, we traverse them according to the traversal_paths (will be root in the index Flow). Finally, each document is serialized to string and then added to the database (the key is the document ID)

IV. LMDBStorage.search

Unlike search in the SimpleIndexer, we only wish to get the matched Documents by ID and return them. Actually, the matched documents will be empty and will only contain the IDs. The goal is to return full matched documents using IDs. To accomplish this, again, we start a transaction, traverse the matched documents, get each matched document by ID and use the results to fill our documents.

import os
from typing import Dict, List

import lmdb
from jina import Document, DocumentArray, Executor, requests


class _LMDBHandler:
    def __init__(self, file, map_size):
        # see https://lmdb.readthedocs.io/en/release/#environment-class for usage
        self.file = file
        self.map_size = map_size

    @property
    def env(self):
        return self._env

    def __enter__(self):
        self._env = lmdb.Environment(
            self.file,
            map_size=self.map_size,
            subdir=False,
            readonly=False,
            metasync=True,
            sync=True,
            map_async=False,
            mode=493,
            create=True,
            readahead=True,
            writemap=False,
            meminit=True,
            max_readers=126,
            max_dbs=0,  # means only one db
            max_spare_txns=1,
            lock=True,
        )
        return self._env

    def __exit__(self, exc_type, exc_val, exc_tb):
        if hasattr(self, '_env'):
            self._env.close()


class LMDBStorage(Executor):
    def __init__(
        self,
        map_size: int = 1048576000,  # in bytes, 1000 MB
        default_traversal_paths: List[str] = ['r'],
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.map_size = map_size
        self.default_traversal_paths = default_traversal_paths
        self.file = os.path.join(self.workspace, 'db.lmdb')
        if not os.path.exists(self.workspace):
            os.makedirs(self.workspace)

    def _handler(self):
        return _LMDBHandler(self.file, self.map_size)

    @requests(on='/index')
    def index(self, docs: DocumentArray, parameters: Dict, **kwargs):
        traversal_paths = parameters.get(
            'traversal_paths', self.default_traversal_paths
        )
        if docs is None:
            return
        with self._handler() as env:
            with env.begin(write=True) as transaction:
                for d in docs.traverse_flat(traversal_paths):
                    transaction.put(d.id.encode(), d.SerializeToString())

    @requests(on='/search')
    def search(self, docs: DocumentArray, parameters: Dict, **kwargs):
        traversal_paths = parameters.get(
            'traversal_paths', self.default_traversal_paths
        )
        if docs is None:
            return
        docs_to_get = docs.traverse_flat(traversal_paths)
        with self._handler() as env:
            with env.begin(write=True) as transaction:
                for i, d in enumerate(docs_to_get):
                    id = d.id
                    serialized_doc = Document(transaction.get(d.id.encode()))
                    d.update(serialized_doc)
                    d.id = id

SimpleRanker

You might think why do we need a ranker at all ?

Actually, a ranker is needed because we will be matching small query images against chunks of parent documents. But how can we get back to parent documents (aka full images) given the chunks ? And what if 2 chunks belonging to the same parent are matched ? We can solve this by aggregating the similarity scores of chunks that belong to the same parent (using an aggregation method, in our case, will be the min value). So, for each query document, we perform the following:

  1. We create an empty collection of parent scores. This collection will store, for each parent, a list of scores of its chunk documents.

  2. For each match, since it’s a chunk document, we can retrieve its parent_id. And it’s also a match document so we get its match score and add that value to the parent scores collection.

  3. After processing all matches, we need to aggregate the scores of each parent using the min metric.

  4. Finally, using the aggregated score values of parents, we can create a new list of matches (this time consisting of parents, not chunks). We also need to sort the matches list by aggregated scores.

When query documents exit the SimpleRanker, they now have matches consisting of parent documents. However, parent documents just have IDs. That’s why, during the previous steps, we created LMDBStorage: to actually retrieve parent documents by IDs and fill them with data.

from collections import defaultdict
from typing import Dict, Iterable, Optional

from jina import Document, DocumentArray, Executor, requests


class SimpleRanker(Executor):
    def __init__(
        self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metric = 'cosine'

    @requests(on='/search')
    def rank(
        self, docs: Optional[DocumentArray] = None, parameters: Dict = {}, **kwargs
    ):
        if docs is None:
            return

        for doc in docs:
            parents_scores = defaultdict(list)
            for m in DocumentArray([doc]).traverse_flat(['m']):
                parents_scores[m.parent_id].append(m.scores[self.metric].value)

            # Aggregate match scores for parent document and
            # create doc's match based on parent document of matched chunks
            new_matches = []
            for match_parent_id, scores in parents_scores.items():
                score = min(scores)

                new_matches.append(
                    Document(id=match_parent_id, scores={self.metric: score})
                )

            # Sort the matches
            doc.matches = new_matches
            doc.matches.sort(key=lambda d: d.scores[self.metric].value)

Building Flows

Indexing

Now, after creating executors, it’s time to use them in order to build an index Flow and index our data.

Building the index Flow

We create a Flow object and add executors one after the other with the right parameters:

  1. YoloV5Segmenter: We should also specify the device

  2. CLIPImageEncoder: It also receives the device parameter. And since we only encode the chunks, we specify 'traversal_paths': ['c']

  3. SimpleIndexer: We need to specify the workspace parameter

  4. LMDBStorage: We also need to specify the workspace parameter. Furthermore, the executor can run in parallel to the other branch. We can achieve this using needs='gateway'. Finally, we set default_traversal_paths to ['r']

  5. A final executor which just waits for both branches.

After building the index Flow, we can plot it to verify that we’re using the correct architecture.

from jina import Flow
index_flow = Flow().add(uses=YoloV5Segmenter, name='segmenter', uses_with={'device': device}) \
  .add(uses=CLIPImageEncoder, name='encoder', uses_with={'device': device, 'traversal_paths': ['c']}) \
  .add(uses=SimpleIndexer, name='chunks_indexer', workspace='workspace') \
  .add(uses=LMDBStorage, name='root_indexer', workspace='workspace', needs='gateway', uses_with={'default_traversal_paths': ['r']}) \
  .add(name='wait_both', needs=['root_indexer', 'chunks_indexer'])
index_flow.plot()
../../../_images/index_flow.svg

Now it’s time to index the dataset that we have downloaded. Actually, we will index images inside the images folder. This helper function will convert image files into Jina Documents and yield them:

from glob import glob
from jina import Document

def input_generator():
    for filename in glob('images/*.jpg'):
        doc = Document(uri=filename, tags={'filename': filename})
        doc.load_uri_to_image_blob()
        yield doc

The final step in this section is to send the input documents to the index Flow. Note that indexing can take a while:

  with index_flow:
      input_docs = input_generator()
      index_flow.post(on='/index', inputs=input_docs, show_progress=True)
Using cache found in /root/.cache/torch/hub/ultralytics_yolov5_master
Using cache found in /root/.cache/torch/hub/ultralytics_yolov5_master
⠏ 4/6 waiting segmenter encoder to be ready...YOLOv5 🚀 2021-10-29 torch 1.9.0+cu111 CPU

⠋ 4/6 waiting segmenter encoder to be ready...Fusing layers... 
⠼ 4/6 waiting segmenter encoder to be ready...Model Summary: 213 layers, 7225885 parameters, 0 gradients
Adding AutoShape... 
           [email protected][I]:🎉 Flow is ready to use!
	🔗 Protocol: 		GRPC
	🏠 Local access:	0.0.0.0:44619
	🔒 Private network:	172.28.0.2:44619
	🌐 Public address:	34.73.118.227:44619
⠦       DONE ━━╸━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:01:11  0.0 step/s 2 steps done in 1 minute and 11 seconds

Searching:

Now, let’s build the search Flow and use it in order to find sample query images.

Our Flow contains the following executors:

  1. CLIPImageEncoder: It receives the device parameter. This time, since we want to encode root query documents, we specify that 'traversal_paths': ['r']

  2. SimpleIndexer: We need to specify the workspace parameter

  3. SimpleRanker

  4. LMDBStorage: First we specify the workspace parameter. Then we need to use different traversal paths. This time we will be traversing matches: 'default_traversal_paths': ['m']

from jina import Flow
device = 'cpu'
query_flow = Flow().add(uses=CLIPImageEncoder, name='encoder', uses_with={'device': device, 'traversal_paths': ['r']}) \
  .add(uses=SimpleIndexer, name='chunks_indexer', workspace='workspace') \
  .add(uses=SimpleRanker, name='ranker') \
  .add(uses=LMDBStorage, workspace='workspace', name='root_indexer', uses_with={'default_traversal_paths': ['m']})

Let’s plot our Flow

query_flow.plot()
../../../_images/query_flow.svg

Finally, we can start querying. We will use images inside the query folder. For each image, we will create a Jina Document. Then we send our documents to the query Flow and receive the response.

For each query document, we can print the image and its top 3 search results

import glob
with query_flow:
    docs = [Document(uri=filename) for filename in glob.glob('query/*.jpg')]
    for doc in docs:
        doc.load_uri_to_image_blob()
    resp = query_flow.post('/search', docs, return_results=True)
for doc in resp[0].docs:
    print('query:')
    plt.imshow(doc.blob)
    plt.show()
    print('results:')
    show_docs(doc.matches)

Sample results:

query:
../../../_images/query.png
results:
../../../_images/result_1.png
../../../_images/result_2.png
../../../_images/result_3.png

Congratulations !

The approach that we’ve adopted could effectively match the small bird image against bigger images containing birds.

Again, the full source code of this tutorial is available in this google colab notebook.

Feel free to try it !