Source code for jina.types.document.generators

import csv
import glob
import itertools
import json
import os
import random
from typing import Optional, Generator, Union, List, Iterable, Dict, TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
    from . import Document


def from_ndarray(
    array: 'np.ndarray',
    axis: int = 0,
    size: Optional[int] = None,
    shuffle: bool = False,
) -> Generator['Document', None, None]:
    """Create a generator for a given dimension of a numpy array.

    :param array: the numpy ndarray data source
    :param axis: iterate over that axis
    :param size: the maximum number of the sub-arrays
    :param shuffle: shuffle the numpy data source beforehand
    :yield: documents, one per sub-array
    """
    from ..document import Document

    if shuffle:
        # shuffle for random query
        array = np.take(array, np.random.permutation(array.shape[0]), axis=axis)
    d = 0
    for r in array:
        yield Document(content=r)
        d += 1
        if size is not None and d >= size:
            break
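For illustration only (not part of the module source), a minimal usage sketch: assuming a 2-D embedding matrix, each row along the default ``axis=0`` is wrapped into one ``Document``:

    import numpy as np
    from jina.types.document.generators import from_ndarray

    embeddings = np.random.random((100, 128))  # hypothetical data
    for doc in from_ndarray(embeddings, size=10, shuffle=True):
        # each Document should carry one 128-d row as its content
        print(doc.content)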
def from_files(
    patterns: Union[str, List[str]],
    recursive: bool = True,
    size: Optional[int] = None,
    sampling_rate: Optional[float] = None,
    read_mode: Optional[str] = None,
) -> Generator['Document', None, None]:
    """Creates an iterator over a list of file paths or the content of the files.

    :param patterns: The pattern may contain simple shell-style wildcards, e.g. '\*.py', '[\*.zip, \*.gz]'
    :param recursive: If recursive is true, the pattern '**' will match any files and
        zero or more directories and subdirectories
    :param size: the maximum number of the files
    :param sampling_rate: the sampling rate between [0, 1]
    :param read_mode: specifies the mode in which the file is opened.
        'r' for reading in text mode, 'rb' for reading in binary mode.
        If `read_mode` is None, will iterate over filenames.
    :yield: file paths or binary content

    .. note::
        This function should not be used directly; use :meth:`Flow.index_files`
        or :meth:`Flow.search_files` instead.
    """
    from ..document import Document

    if read_mode not in {'r', 'rb', None}:
        raise RuntimeError(f'read_mode should be "r", "rb" or None, got {read_mode}')

    def _iter_file_exts(ps):
        return itertools.chain.from_iterable(
            glob.iglob(p, recursive=recursive) for p in ps
        )

    d = 0
    if isinstance(patterns, str):
        patterns = [patterns]
    for g in _iter_file_exts(patterns):
        if sampling_rate is None or random.random() < sampling_rate:
            if read_mode is None:
                yield Document(uri=g)
            elif read_mode in {'r', 'rb'}:
                with open(g, read_mode) as fp:
                    yield Document(content=fp.read(), uri=g)
            d += 1
            if size is not None and d >= size:
                break
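A hedged usage sketch (the glob patterns and paths below are hypothetical): with ``read_mode=None`` the generator only records each match as a URI; passing ``'rb'`` reads the file contents into each ``Document``:

    from jina.types.document.generators import from_files

    # yield Documents that point at the matched files (uri only, nothing is read)
    for doc in from_files('data/**/*.txt', recursive=True, size=5):
        print(doc.uri)

    # or read the bytes of each match into the Document content
    for doc in from_files('data/**/*.png', read_mode='rb', size=5):
        print(doc.uri, len(doc.content))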
def from_csv(
    fp: Iterable[str],
    field_resolver: Optional[Dict[str, str]] = None,
    size: Optional[int] = None,
    sampling_rate: Optional[float] = None,
) -> Generator['Document', None, None]:
    """Generator function for CSV. Yields documents.

    :param fp: a file-like object or any iterable of CSV lines
    :param field_resolver: a map from field names defined in ``document``
        (JSON, dict) to the field names defined in Protobuf. This is only used
        when the given ``document`` is a JSON string or a Python dict.
    :param size: the maximum number of the documents
    :param sampling_rate: the sampling rate between [0, 1]
    :yield: documents
    """
    from ..document import Document

    lines = csv.DictReader(fp)
    for value in _subsample(lines, size, sampling_rate):
        if 'groundtruth' in value and 'document' in value:
            yield Document(value['document'], field_resolver), Document(
                value['groundtruth'], field_resolver
            )
        else:
            yield Document(value, field_resolver)
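Note that ``fp`` is consumed by ``csv.DictReader``, so the first line must be a header row; if that header contains both a ``document`` and a ``groundtruth`` column, (document, groundtruth) pairs are yielded instead of single documents. A small sketch with hypothetical in-memory data:

    import io
    from jina.types.document.generators import from_csv

    csv_data = io.StringIO('text,id\nhello,1\nworld,2\n')
    for doc in from_csv(csv_data):
        print(doc.text)  # should print 'hello', then 'world'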
def from_ndjson(
    fp: Iterable[str],
    field_resolver: Optional[Dict[str, str]] = None,
    size: Optional[int] = None,
    sampling_rate: Optional[float] = None,
) -> Generator['Document', None, None]:
    """Generator function for line-separated JSON. Yields documents.

    :param fp: a file-like object or any iterable of JSON lines
    :param field_resolver: a map from field names defined in ``document``
        (JSON, dict) to the field names defined in Protobuf. This is only used
        when the given ``document`` is a JSON string or a Python dict.
    :param size: the maximum number of the documents
    :param sampling_rate: the sampling rate between [0, 1]
    :yield: documents
    """
    from ..document import Document

    for line in _subsample(fp, size, sampling_rate):
        value = json.loads(line)
        if 'groundtruth' in value and 'document' in value:
            yield Document(value['document'], field_resolver), Document(
                value['groundtruth'], field_resolver
            )
        else:
            yield Document(value, field_resolver)
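The same pattern works for newline-delimited JSON; each line is parsed with ``json.loads`` before being wrapped. An in-memory sketch (hypothetical data):

    import io
    from jina.types.document.generators import from_ndjson

    ndjson_data = io.StringIO('{"text": "hello"}\n{"text": "world"}\n')
    for doc in from_ndjson(ndjson_data):
        print(doc.text)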
def from_lines(
    lines: Optional[Iterable[str]] = None,
    filepath: Optional[str] = None,
    read_mode: str = 'r',
    line_format: str = 'json',
    field_resolver: Optional[Dict[str, str]] = None,
    size: Optional[int] = None,
    sampling_rate: Optional[float] = None,
) -> Generator['Document', None, None]:
    """Generator function for lines, JSON and CSV. Yields documents or strings.

    :param lines: a list of strings, each of which is considered a document
    :param filepath: a text file in which each line contains a document
    :param read_mode: specifies the mode in which the file is opened.
        'r' for reading in text mode, 'rb' for reading in binary mode
    :param line_format: the format of each line: ``json`` or ``csv``
    :param field_resolver: a map from field names defined in ``document``
        (JSON, dict) to the field names defined in Protobuf. This is only used
        when the given ``document`` is a JSON string or a Python dict.
    :param size: the maximum number of the documents
    :param sampling_rate: the sampling rate between [0, 1]
    :yield: documents
    """
    if filepath:
        file_type = os.path.splitext(filepath)[1]
        with open(filepath, read_mode) as f:
            if file_type in _jsonl_ext:
                yield from from_ndjson(f, field_resolver, size, sampling_rate)
            elif file_type in _csv_ext:
                yield from from_csv(f, field_resolver, size, sampling_rate)
            else:
                yield from _subsample(f, size, sampling_rate)
    elif lines:
        if line_format == 'json':
            yield from from_ndjson(lines, field_resolver, size, sampling_rate)
        elif line_format == 'csv':
            yield from from_csv(lines, field_resolver, size, sampling_rate)
        else:
            yield from _subsample(lines, size, sampling_rate)
    else:
        raise ValueError('"filepath" and "lines" cannot both be empty')
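``from_lines`` dispatches on the file extension (via ``_jsonl_ext`` / ``_csv_ext`` below) when ``filepath`` is given, and on ``line_format`` when raw ``lines`` are passed. A sketch, with a hypothetical path:

    from jina.types.document.generators import from_lines

    # a .jsonl file is routed through from_ndjson
    for doc in from_lines(filepath='data/docs.jsonl', size=100):
        print(doc.id)

    # raw lines with an explicit format
    for doc in from_lines(lines=['{"text": "hi"}'], line_format='json'):
        print(doc.text)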
# https://github.com/ndjson/ndjson.github.io/issues/1#issuecomment-109935996
_jsonl_ext = {'.jsonlines', '.ndjson', '.jsonl', '.jl', '.ldjson'}
_csv_ext = {'.csv', '.tcsv'}


def _sample(iterable, sampling_rate: Optional[float] = None):
    for i in iterable:
        if sampling_rate is None or random.random() < sampling_rate:
            yield i


def _subsample(
    iterable, size: Optional[int] = None, sampling_rate: Optional[float] = None
):
    yield from itertools.islice(_sample(iterable, sampling_rate), size)
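For illustration, the combined helper first keeps each item independently with probability ``sampling_rate``, then truncates the surviving stream to at most ``size`` items, preserving input order:

    # roughly half of 0..99 pass the sampling, and at most 5 of the
    # survivors are yielded, in their original order
    list(_subsample(range(100), size=5, sampling_rate=0.5))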