"""Indexer for caching."""
import pickle
import tempfile
from typing import Optional, Iterable, List, Tuple, Union
from jina.executors.indexers import BaseKVIndexer
from jina.helper import deprecated_alias
# NOTE(review): not referenced in this module — presumably the key under which
# request payloads carry their data; confirm against callers before removing.
DATA_FIELD = 'data'
# Default Document field to cache on (see DocCache.default_fields).
ID_KEY = 'id'
# NOTE(review): not referenced in this module — likely a supported cache field
# name for content-based deduplication; confirm against callers.
CONTENT_HASH_KEY = 'content_hash'
class BaseCache(BaseKVIndexer):
    """Base class for caches, built on :class:`BaseKVIndexer`.

    Unlike a plain :class:`BaseKVIndexer`, a cache releases the
    ``handler_mutex`` so that querying and indexing can happen
    concurrently (query-while-indexing).

    :param args: positional arguments, forwarded to the parent initializer
    :param kwargs: keyword arguments, forwarded to the parent initializer
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def post_init(self):
        """Release the handler mutex so reads and writes can run at the same time."""
        self.handler_mutex = False
class DocCache(BaseCache):
    """A key-value indexer that specializes in caching.

    Serializes the cache to two files, one for ids, one for the actually cached field.
    If fields=["id"], then the second file is redundant. The class optimizes the process
    so that there are no duplicates.

    Order of fields does NOT affect the caching.

    :param index_filename: file name for storing the cache data
    :param fields: fields to cache on (of Document)
    :param args: additional positional arguments which are just used for the parent initialization
    :param kwargs: additional key value arguments which are just used for the parent initialization
    """

    class CacheHandler:
        """A handler for loading and serializing the in-memory cache of the DocCache.

        :param path: Path to the file from which to build the actual paths.
        :param logger: Instance of logger.
        """

        def __init__(self, path, logger):
            self.path = path
            try:
                # Use context managers so the file handles are always closed;
                # the previous `pickle.load(open(...))` form leaked them.
                with open(path + '.ids', 'rb') as ids_fp:
                    self.id_to_cache_val = pickle.load(ids_fp)
                with open(path + '.cache', 'rb') as cache_fp:
                    self.cache_val_to_id = pickle.load(cache_fp)
            except FileNotFoundError as e:
                logger.warning(
                    f'File path did not exist : {path}.ids or {path}.cache: {e!r}. Creating new CacheHandler...'
                )
                # Start with an empty cache when no serialized state exists yet.
                self.id_to_cache_val = dict()
                self.cache_val_to_id = dict()

        def close(self):
            """Flushes the in-memory cache to pickle files."""
            # Context managers guarantee the dumps are flushed and the
            # handles closed (the previous form never closed them).
            with open(self.path + '.ids', 'wb') as ids_fp:
                pickle.dump(self.id_to_cache_val, ids_fp)
            with open(self.path + '.cache', 'wb') as cache_fp:
                pickle.dump(self.cache_val_to_id, cache_fp)

    # Cache on the Document id unless the user asks for other fields.
    default_fields = (ID_KEY,)

    @deprecated_alias(field=('fields', 0))
    def __init__(
        self,
        index_filename: Optional[str] = None,
        fields: Optional[
            Union[str, Tuple[str, ...]]
        ] = None,  # str for backwards compatibility
        *args,
        **kwargs,
    ):
        if not index_filename:
            # create a new temp file if none was given
            index_filename = tempfile.NamedTemporaryFile(delete=False).name
        super().__init__(index_filename, *args, **kwargs)
        if isinstance(fields, str):
            # accept a bare string for backwards compatibility
            fields = (fields,)
        # order of the fields shouldn't matter, so normalize by sorting
        self.fields = sorted(fields or self.default_fields)

    def add(
        self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs
    ) -> None:
        """Add a document to the cache depending.

        :param keys: document ids to be added
        :param values: document cache values to be added
        :param args: not used
        :param kwargs: not used
        """
        for key, value in zip(keys, values):
            self.query_handler.id_to_cache_val[key] = value
            self.query_handler.cache_val_to_id[value] = key
            self._size += 1

    def query(self, key: str, *args, **kwargs) -> bool:
        """Check whether the data exists in the cache.

        :param key: the value that we cached by (combination of the Document fields)
        :param args: not used
        :param kwargs: not used
        :return: status
        """
        return key in self.query_handler.cache_val_to_id

    def update(
        self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs
    ) -> None:
        """Update cached documents.

        :param keys: list of Document.id
        :param values: list of values (combination of the Document fields)
        :param args: not used
        :param kwargs: not used
        """
        if len(self.fields) == 1 and self.fields[0] == ID_KEY:
            # id is immutable; if we cache nothing else, there is nothing to update
            return

        for key, value in zip(keys, values):
            if key not in self.query_handler.id_to_cache_val:
                # silently skip unknown ids, consistent with `delete`
                continue
            old_value = self.query_handler.id_to_cache_val[key]
            self.query_handler.id_to_cache_val[key] = value
            # keep the reverse map in sync with the forward map
            del self.query_handler.cache_val_to_id[old_value]
            self.query_handler.cache_val_to_id[value] = key

    def delete(self, keys: Iterable[str], *args, **kwargs) -> None:
        """Delete documents from the cache.

        :param keys: list of Document.id
        :param args: not used
        :param kwargs: not used
        """
        for key in keys:
            if key not in self.query_handler.id_to_cache_val:
                continue
            value = self.query_handler.id_to_cache_val[key]
            # remove both directions of the mapping and shrink the size counter
            del self.query_handler.id_to_cache_val[key]
            del self.query_handler.cache_val_to_id[value]
            self._size -= 1

    def get_add_handler(self):
        """Get the CacheHandler.

        .. # noqa: DAR201"""
        return self.get_query_handler()

    def get_query_handler(self) -> CacheHandler:
        """Get the CacheHandler.

        .. # noqa: DAR201"""
        return self.CacheHandler(self.save_abspath, self.logger)

    def get_create_handler(self):
        """Get the CacheHandler.

        .. # noqa: DAR201"""
        return self.get_query_handler()