How to Prevent Duplicate Indexing

Summary

When indexing documents, it is common for the search system to receive duplicates. This document shows you how to prevent indexing duplicate Documents in Jina.

Feature Description

You can either remove the duplicates before sending the documents to Jina, or let Jina handle the duplicates for you.
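If you choose the first option, a minimal client-side approach is to keep only the first occurrence of each document ID before sending. The sketch below uses plain Python dictionaries rather than Jina types; the doc_id field is an illustrative stand-in for a Document's ID, not part of Jina's API:

```python
def deduplicate(docs):
    """Keep only the first occurrence of each document ID."""
    seen = set()
    unique = []
    for doc in docs:
        if doc['doc_id'] not in seen:
            seen.add(doc['doc_id'])
            unique.append(doc)
    return unique

docs = [{'doc_id': 'a', 'text': 'doc0'},
        {'doc_id': 'b', 'text': 'doc1'},
        {'doc_id': 'a', 'text': 'doc0'}]  # duplicate of the first entry
print(len(deduplicate(docs)))  # → 2
```

Deduplicating client-side keeps the Flow simple, but it only works when a single client sees all the traffic; the rest of this document covers the server-side alternatives.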

Before you start

Make sure you are familiar with Executor YAML syntax.

Implementation

When shards == 1

If you are not using multiple shards, the built-in executors UniqueVectorIndexer and UniquePbIndexer can be used directly to keep only unique indexed Documents. In the following code, four Documents (of which two are unique) are fed to the Flow, and only the two unique ones are indexed.

demo_prevent_duplicate.py
from jina import Flow, Document
import numpy as np

docs = [Document(text=f'doc{idx}', embedding=np.random.rand(10)) for idx in range(2)]

def assert_only_two_docs_indexed(rsp):
    assert len(rsp.index.docs) == 2

f = Flow().add(uses='unique_vec_idx.yml')

with f:
    f.index(docs + docs, on_done=assert_only_two_docs_indexed)
unique_vec_idx.yml
!UniqueVectorIndexer
components:
  - !DocCache
    metas:
      name: doc_cache
  - !NumpyIndexer
    metas:
      name: numpy_indexer

When shards > 1

When shards > 1, you need to set the uses_before option to _unique. The reason is that different Peas might be running on different machines, so we need to check whether a Document already exists before sending it to one of the shards. In the following example, 40 Documents (including 20 unique ones) are indexed by the Flow with two shards. We use the on_done argument to check how many Documents are indexed.

demo_prevent_duplicate_multishards.py
from jina import Flow, Document
import numpy as np

docs = [Document(text=f'doc{idx}', embedding=np.random.rand(10)) for idx in range(20)]

def assert_only_twenty_docs_indexed(rsp):
    assert len(rsp.index.docs) == 20

f = Flow().add(
    uses='NumpyIndexer', uses_before='_unique', shards=2)

with f:
    f.index(docs + docs, on_done=assert_only_twenty_docs_indexed)

Under the hood, the configuration YAML file jina/resources/executors._unique.yml is used. It is defined as follows:

jina/resources/executors._unique.yml
!DocCache
with:
  index_path: unique.tmp
metas:
  name: unique
requests:
  on:
    [SearchRequest, TrainRequest, IndexRequest, DeleteRequest, UpdateRequest, ControlRequest]:
      - !RouteDriver {}
    IndexRequest:
      - !TaggingCacheDriver
        with:
          tags:
            is_indexed: true
      - !FilterQL
        with:
          lookups: {tags__is_indexed__neq: true}

jina.executors.indexers.cache.DocCache uses the document ID to detect duplicates: Documents with the same ID are considered the same Document. jina.drivers.cache.TaggingCacheDriver keeps a set of the indexed keys and checks each incoming Document against the cache for a hit. If the document ID already exists, jina.drivers.cache.TaggingCacheDriver sets the customized key in the tags field to the predefined value. In the configuration above, is_indexed in the tags field is set to true when the document ID hits the cached keys. Afterwards, jina.drivers.querylang.filter.FilterQL filters the duplicate Documents out of the request.
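This tag-and-filter mechanism can be sketched in plain Python. The class and field names below are illustrative, not Jina's actual driver API; only the logic mirrors what DocCache, TaggingCacheDriver, and FilterQL do together:

```python
class TaggingCache:
    """Sketch of DocCache + TaggingCacheDriver: tag docs whose ID was seen before."""

    def __init__(self):
        self._seen = set()

    def tag(self, doc):
        if doc['id'] in self._seen:
            doc['tags']['is_indexed'] = True  # cache hit: mark as duplicate
        else:
            self._seen.add(doc['id'])  # first time: remember the ID


def filter_duplicates(docs):
    """Sketch of FilterQL with lookups {tags__is_indexed__neq: true}."""
    return [d for d in docs if d['tags'].get('is_indexed') is not True]


cache = TaggingCache()
request = [{'id': i, 'tags': {}} for i in (1, 2, 1, 2)]  # two duplicates
for doc in request:
    cache.tag(doc)
unique = filter_duplicates(request)
print([d['id'] for d in unique])  # → [1, 2]
```

Note that the real DocCache persists its key set to disk (index_path in the YAML above), so duplicates are detected across requests, not just within one.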

Limitations

Be careful when using the _unique keyword as a cache executor: it does not set any workspace where the data is stored. By default, it uses the folder it runs in as the workspace, which may not be where the actual indexers store their data. If you want to store the cache in a specific workspace while keeping the same functionality, define unique_customized.yml as below and set the desired workspace under metas.

unique_customized.yml
!DocCache
with:
  index_path: cache.tmp
metas:
  name: cache
  workspace: $WORKSPACE
  ...