Dataset — Core Search Engine

sayt2.dataset is the only module that imports tantivy. It integrates field definitions, caching, and cross-process locking into a single high-level DataSet object that handles the full index-build-search lifecycle.

Module-level functions

The module exposes a set of stateless functions that operate on a tantivy Index directly. DataSet composes them internally, but they are also usable standalone for advanced use cases.

Schema construction

def build_schema(
    fields: list[BaseField],
) -> tuple[tantivy.Schema, dict[str, tantivy.TextAnalyzer]]:
    """
    Convert a list of field definitions into a tantivy ``Schema`` and a dict
    of custom tokenizers that must be registered on the ``Index``.

    Returns ``(schema, analyzers)`` where *analyzers* maps tokenizer name →
    ``TextAnalyzer``.
    """
    sb = SchemaBuilder()
    analyzers: dict[str, tantivy.TextAnalyzer] = {}

    for f in fields:
        if isinstance(f, StoredField):
            # Stored-only field: tantivy's add_text_field() has no "indexed"
            # switch, so use the "raw" tokenizer (one token per value) and store it.
            sb.add_text_field(f.name, stored=True, tokenizer_name="raw")

        elif isinstance(f, KeywordField):
            sb.add_text_field(f.name, stored=f.stored, tokenizer_name="raw")

        elif isinstance(f, TextField):
            sb.add_text_field(f.name, stored=f.stored, tokenizer_name=f.tokenizer)

        elif isinstance(f, NgramField):
            tok_name = _ngram_tokenizer_name(f)
            if tok_name not in analyzers:
                analyzers[tok_name] = _build_ngram_analyzer(f)
            sb.add_text_field(f.name, stored=f.stored, tokenizer_name=tok_name)

        elif isinstance(f, NumericField):
            add_fn = {
                NumericKindEnum.I64.value: sb.add_integer_field,
                NumericKindEnum.U64.value: sb.add_unsigned_field,
                NumericKindEnum.F64.value: sb.add_float_field,
            }[f.kind]
            add_fn(f.name, stored=f.stored, indexed=f.indexed, fast=f.fast)

        elif isinstance(f, DatetimeField):
            sb.add_date_field(f.name, stored=f.stored, indexed=f.indexed, fast=f.fast)

        elif isinstance(f, BooleanField):
            sb.add_boolean_field(f.name, stored=f.stored, indexed=f.indexed)

    return sb.build(), analyzers

build_schema() walks the field list and maps each BaseField subclass to the corresponding tantivy SchemaBuilder call. For an NgramField it also builds a custom TextAnalyzer with the two helpers below and collects it under its tokenizer name, so that the caller can register it on the Index:

def _ngram_tokenizer_name(f: NgramField) -> str:
    """Deterministic name for a custom ngram tokenizer so it can be re-registered."""
    return f"__sayt2_ngram_{f.min_gram}_{f.max_gram}_{int(f.prefix_only)}_{int(f.lowercase)}"

def _build_ngram_analyzer(f: NgramField) -> tantivy.TextAnalyzer:
    builder = TextAnalyzerBuilder(
        Tokenizer.ngram(
            min_gram=f.min_gram, max_gram=f.max_gram, prefix_only=f.prefix_only
        )
    )
    if f.lowercase:
        builder = builder.filter(Filter.lowercase())
    return builder.build()

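Each NgramField gets a tokenizer name derived deterministically from its parameters (min_gram, max_gram, prefix_only, lowercase). Fields whose configurations differ in any of these get separate tokenizers, while identical configurations share one. Because the name depends only on the field definitions, the same tokenizers can be rebuilt and re-registered under the same names every time the index is opened.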
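The naming scheme can be checked in isolation. The sketch below copies _ngram_tokenizer_name() as shown above and pairs it with a stand-in NgramField dataclass (hypothetical: it carries only the four attributes the name depends on and is not sayt2's real field class). It groups fields by tokenizer name the same way build_schema() fills analyzers, so two fields with identical settings end up sharing one entry.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NgramField:
    """Hypothetical stand-in holding only the attributes the name depends on."""

    name: str
    min_gram: int
    max_gram: int
    prefix_only: bool
    lowercase: bool


def _ngram_tokenizer_name(f: NgramField) -> str:
    """Deterministic name for a custom ngram tokenizer so it can be re-registered."""
    return f"__sayt2_ngram_{f.min_gram}_{f.max_gram}_{int(f.prefix_only)}_{int(f.lowercase)}"


fields = [
    NgramField("title", min_gram=2, max_gram=4, prefix_only=False, lowercase=True),
    NgramField("author", min_gram=2, max_gram=4, prefix_only=False, lowercase=True),
    NgramField("isbn", min_gram=1, max_gram=6, prefix_only=True, lowercase=False),
]

# Mirror the dedup in build_schema(): one analyzer per distinct tokenizer name.
fields_by_tokenizer: dict[str, list[str]] = {}
for f in fields:
    fields_by_tokenizer.setdefault(_ngram_tokenizer_name(f), []).append(f.name)

for tok_name, field_names in fields_by_tokenizer.items():
    print(tok_name, field_names)
# __sayt2_ngram_2_4_0_1 ['title', 'author']
# __sayt2_ngram_1_6_1_0 ['isbn']
```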

Index opening

def open_index(
    dir_index: Path,
    fields: list[BaseField],
) -> Index:
    """
    Open (or create) a tantivy ``Index`` at *dir_index* and register all
    required custom tokenizers.

    Tantivy does **not** persist tokenizer configuration — only the inverted
    index data.  So every ``Index.open()`` / ``Index(schema, path=...)`` must
    be followed by ``register_tokenizer`` calls.
    """
    dir_index.mkdir(parents=True, exist_ok=True)
    schema, analyzers = build_schema(fields)
    index = Index(schema, path=str(dir_index))
    for name, analyzer in analyzers.items():
        index.register_tokenizer(name, analyzer)
    return index

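Because tantivy persists only the index data and not the tokenizer configuration, custom tokenizers must be registered again every time an index is opened. open_index() handles this automatically: it creates dir_index if needed, rebuilds the schema and analyzers from the field list, opens (or creates) the Index, and calls register_tokenizer() for each custom analyzer.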

Document writing

def write_documents(
    index: Index,
    data: T.Iterable[dict[str, T.Any]],
    memory_budget_bytes: int = 128_000_000,
    num_threads: int | None = None,
) -> int:
    """
    Write *data* into *index*.

    :param data: Iterable of dicts, each dict is one document whose keys
        match the field names in the schema.
    :param memory_budget_bytes: Heap budget for the index writer.
    :param num_threads: Number of indexing threads (``None`` = tantivy default).
    :returns: Number of documents written.
    """
    writer_kwargs: dict[str, T.Any] = {"heap_size": memory_budget_bytes}
    if num_threads is not None:
        writer_kwargs["num_threads"] = num_threads

    writer = index.writer(**writer_kwargs)
    count = 0
    for doc in data:
        writer.add_document(tantivy.Document(**doc))
        count += 1
    writer.commit()
    writer.wait_merging_threads()
    index.reload()
    return count

Documents are written through tantivy’s IndexWriter. After committing, the writer waits for background merge threads to finish, then reloads the index so that subsequent searches see the new data.
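To make the call order concrete, here is a minimal sketch that replays the control flow of write_documents() against hypothetical FakeIndex and FakeWriter classes. They only record the method names used above; they are not tantivy classes, and the tantivy.Document(**doc) wrapping is dropped. The trace shows one commit for the whole batch, followed by wait_merging_threads() and reload(), and that the return value counts the documents consumed from the iterable.

```python
import typing as T


class FakeWriter:
    """Hypothetical stand-in for tantivy's IndexWriter; it only records calls."""

    def __init__(self, log: T.List[str]) -> None:
        self.log = log

    def add_document(self, doc: T.Dict[str, T.Any]) -> None:
        self.log.append(f"add:{doc['id']}")

    def commit(self) -> None:
        self.log.append("commit")

    def wait_merging_threads(self) -> None:
        self.log.append("wait_merging_threads")


class FakeIndex:
    """Hypothetical stand-in for tantivy's Index; it only records calls."""

    def __init__(self) -> None:
        self.log: T.List[str] = []

    def writer(self, **kwargs: T.Any) -> FakeWriter:
        self.log.append(f"writer({kwargs})")
        return FakeWriter(self.log)

    def reload(self) -> None:
        self.log.append("reload")


def write_documents_sketch(
    index: FakeIndex,
    data: T.Iterable[T.Dict[str, T.Any]],
    memory_budget_bytes: int = 128_000_000,
    num_threads: T.Optional[int] = None,
) -> int:
    # Same control flow as write_documents(), minus tantivy.Document(**doc).
    writer_kwargs: T.Dict[str, T.Any] = {"heap_size": memory_budget_bytes}
    if num_threads is not None:
        writer_kwargs["num_threads"] = num_threads
    writer = index.writer(**writer_kwargs)
    count = 0
    for doc in data:
        writer.add_document(doc)
        count += 1
    writer.commit()                # one commit for the whole batch
    writer.wait_merging_threads()  # block until background merges finish
    index.reload()                 # let new searchers see the committed data
    return count


index = FakeIndex()
docs = ({"id": i} for i in range(3))  # a generator is fine; it is consumed once
n = write_documents_sketch(index, docs)
print(n)  # 3
print(index.log)
```

Because data is iterated exactly once, a generator returned by a downloader can stream documents into the writer without first materializing them in a list.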

Query execution

Data model

SortKey

class SortKey(BaseModel):
    """One element of a multi-field sort specification."""

    name: str
    descending: bool = True

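SortKey is a simple pydantic model that names a field and a sort direction (descending by default). Pass a list of them as DataSet's sort argument to sort by several fields; when sort is set, search() calls search_index_sorted() instead of search_index().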
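How search_index_sorted() applies the keys is not shown in this section, so the following only illustrates what a multi-field sort specification means, done in plain Python over already-fetched source dicts: the first key gives the primary order, later keys break ties, and each key has its own direction. SortKey is redefined as a stdlib dataclass so the example runs without pydantic, and the book records are made up.

```python
import typing as T
from dataclasses import dataclass


@dataclass(frozen=True)
class SortKey:
    """Stdlib stand-in for the pydantic SortKey model above."""

    name: str
    descending: bool = True


def multi_sort(rows: T.List[T.Dict[str, T.Any]], keys: T.List[SortKey]) -> T.List[T.Dict[str, T.Any]]:
    """Sort rows by several keys, each with its own direction.

    Python's sort is stable, so sorting by the last key first and by the
    primary key last yields the combined ordering.
    """
    result = list(rows)
    for key in reversed(keys):
        result.sort(key=lambda row: row[key.name], reverse=key.descending)
    return result


books = [
    {"title": "A", "year": 2020, "rating": 4.1},
    {"title": "B", "year": 2021, "rating": 3.9},
    {"title": "C", "year": 2020, "rating": 4.7},
    {"title": "D", "year": 2021, "rating": 4.5},
]

# Newest first; within the same year, highest rating first.
ordered = multi_sort(books, [SortKey("year"), SortKey("rating")])
print([b["title"] for b in ordered])  # ['D', 'B', 'C', 'A']
```

Sorting from the last key to the first works because Python's list.sort is stable, and it stays stable with reverse=True.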

Hit

@dataclass(frozen=True)
class Hit:
    """A single search hit with source document and relevance score."""

    source: dict[str, T.Any]
    score: float

Hit is a frozen dataclass representing a single search result. Key fields:

  • source — dict of stored document fields (modelled after Elasticsearch’s _source).

  • score — BM25 relevance score.

SearchResult

@dataclass(frozen=True)
class SearchResult:
    """Immutable search result returned by :meth:`DataSet.search`."""

    hits: list[Hit]
    size: int
    took_ms: int
    fresh: bool
    cache: bool

    def to_json(self) -> str:  # pragma: no cover
        return json.dumps(asdict(self), indent=2, ensure_ascii=False)

    def jprint(self):  # pragma: no cover
        print(self.to_json())

SearchResult is a frozen dataclass — immutable after creation. Key fields:

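  • hits: list of Hit objects.

  • size: number of hits returned.

  • took_ms: wall-clock time of the full search flow, in milliseconds.

  • fresh: True if this search triggered a data refresh.

  • cache: True if the result was served from the L2 query cache.

A cached result carries the took_ms and fresh values recorded when it was first computed; only cache is switched to True.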
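Because to_json() relies on dataclasses.asdict(), nested Hit objects are converted to plain dicts recursively. The sketch below copies the two dataclasses from above (without jprint), serializes a made-up result, and then shows that the frozen dataclass rejects attribute assignment.

```python
import json
import typing as T
from dataclasses import FrozenInstanceError, asdict, dataclass


@dataclass(frozen=True)
class Hit:
    source: T.Dict[str, T.Any]
    score: float


@dataclass(frozen=True)
class SearchResult:
    hits: T.List[Hit]
    size: int
    took_ms: int
    fresh: bool
    cache: bool

    def to_json(self) -> str:
        return json.dumps(asdict(self), indent=2, ensure_ascii=False)


hit = Hit(source={"id": "1", "title": "Learning Python"}, score=1.23)
result = SearchResult(hits=[hit], size=1, took_ms=5, fresh=False, cache=False)

payload = json.loads(result.to_json())
print(payload["hits"][0]["source"]["title"])  # nested Hit became a plain dict

try:
    result.size = 2  # frozen dataclass: attribute assignment is rejected
except FrozenInstanceError as exc:
    print("immutable:", type(exc).__name__)
```

Note that frozen only blocks reassigning attributes; the hits list and each source dict can still be mutated in place.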

DataSet class

class DataSet(BaseModel):
    """
    High-level search dataset that integrates index building, caching,
    cross-process locking, and query execution into a single object.

    :param dir_root: Root directory; index, cache, and tracker DB are stored
        inside sub-directories of this path.
    :param name: Logical name (e.g. ``"books"``).  Used as the tracker lock
        key and cache namespace.
    :param fields: Field definitions that determine the tantivy schema.
    :param downloader: Optional callable that returns an iterable of document
        dicts.  Called when the data is stale or on first search.
    :param cache_expire: Seconds before L1 cache expires (``None`` = never).
    :param sort: Optional multi-field sort specification.
    :param memory_budget_bytes: Heap budget for the tantivy index writer.
    :param num_threads: Number of indexing threads (``None`` = tantivy default).
    :param lock_expire: Seconds before the tracker lock expires.
    """

    dir_root: Path
    name: str
    fields: list[T_Field]  # type: ignore[type-arg]

    downloader: Callable[[], T.Iterable[dict[str, T.Any]]] | None = None
    cache_expire: int | None = None
    sort: list[SortKey] | None = None

    memory_budget_bytes: int = 128_000_000
    num_threads: int | None = None
    lock_expire: int = 60

    _cache_instance: DataSetCache | None = PrivateAttr(default=None)

    # -- derived paths --------------------------------------------------------

    @property
    def _dir_index(self) -> Path:
        return self.dir_root / self.name / f"index-{self._schema_hash}"

    @property
    def _dir_cache(self) -> Path:
        return self.dir_root / self.name / "cache"

    @property
    def _db_tracker(self) -> Path:
        return self.dir_root / "tracker.db"

    @property
    def _schema_hash(self) -> str:
        return fields_schema_hash(self.fields)

    # -- internal helpers -----------------------------------------------------

    def _open_index(self) -> Index:
        return open_index(self._dir_index, self.fields)

    @property
    def _cache(self) -> DataSetCache:
        if self._cache_instance is None:
            self._cache_instance = DataSetCache(
                self._dir_cache,
                self.name,
                self._schema_hash,
                expire=self.cache_expire,
            )
        return self._cache_instance

    def _get_tracker(self) -> Tracker:
        return Tracker(self._db_tracker)

    # -- lifecycle ------------------------------------------------------------

    def close(self) -> None:
        """Close the underlying cache (sqlite3 connection).

        Safe to call multiple times.  After closing, the next
        :meth:`search` or :meth:`build_index` call will lazily
        re-open the cache.
        """
        if self._cache_instance is not None:
            self._cache_instance.close()
            self._cache_instance = None

    def __enter__(self) -> DataSet:
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()

    # -- public API -----------------------------------------------------------

    def build_index(
        self,
        data: T.Iterable[dict[str, T.Any]] | None = None,
    ) -> int:
        """
        Build (or rebuild) the index with tracker lock protection.

        If *data* is ``None``, the :attr:`downloader` is called.  Raises
        ``ValueError`` if both are ``None``.

        :returns: Number of documents indexed.
        """
        if data is None:
            if self.downloader is None:
                raise ValueError("Either data or downloader must be provided")
            data = self.downloader()

        tracker = self._get_tracker()
        cache = self._cache
        with tracker.lock(self.name, expire=self.lock_expire):
            cache.evict_all()
            shutil.rmtree(self._dir_index, ignore_errors=True)
            index = self._open_index()
            count = write_documents(
                index,
                data,
                memory_budget_bytes=self.memory_budget_bytes,
                num_threads=self.num_threads,
            )
            cache.mark_fresh()
        return count

    def search(
        self,
        query: str,
        limit: int = 20,
        refresh: bool = False,
    ) -> SearchResult:
        """
        Full search flow:

        1. Check L1 freshness (or ``refresh=True`` forces rebuild).
        2. If stale, call :meth:`build_index` with :attr:`downloader`.
        3. Check L2 query cache.
        4. On cache miss, execute the query, apply sorting, cache the result.

        :param query: Query string.
        :param limit: Maximum number of hits.
        :param refresh: Force a data refresh even if the cache is fresh.
        """
        t0 = time.monotonic()
        cache = self._cache
        fresh = False

        # Step 1-2: ensure data is fresh
        if refresh or not cache.is_fresh():
            if self.downloader is None:
                raise ValueError("Data is stale but no downloader is configured")
            self.build_index()
            fresh = True

        # Step 3: check query cache
        cached = cache.get_query_result(query, limit)
        if cached is not None:
            return cached

        # Step 4: execute query
        index = self._open_index()
        if self.sort:
            hits = search_index_sorted(
                index,
                self.fields,
                query,
                sort_keys=self.sort,
                limit=limit,
            )
        else:
            hits = search_index(index, self.fields, query, limit=limit)

        took_ms = int((time.monotonic() - t0) * 1000)
        response = SearchResult(
            hits=hits,
            size=len(hits),
            took_ms=took_ms,
            fresh=fresh,
            cache=False,
        )

        # cache with cache=True so subsequent reads get the cached flag
        cached_response = SearchResult(
            hits=hits,
            size=len(hits),
            took_ms=took_ms,
            fresh=fresh,
            cache=True,
        )
        cache.set_query_result(query, limit, cached_response)
        return response

DataSet is the primary user-facing class. It orchestrates three subsystems:

  • Tracker — ensures only one process rebuilds the index at a time.

  • DataSetCache — avoids redundant rebuilds and repeated queries.

  • tantivy Index — the actual search engine.
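The four-step search() flow can be modelled without tantivy or diskcache. The sketch below is an in-memory toy, not sayt2 code: a boolean stands in for the L1 freshness marker (with no expiry, unlike cache_expire), a dict keyed by (query, limit) stands in for the L2 query cache, and a case-insensitive substring match stands in for the tantivy query. ToySearch and its attributes are hypothetical names.

```python
import typing as T
from dataclasses import dataclass, field


@dataclass
class ToySearch:
    """In-memory toy of the search() flow; not sayt2 code."""

    downloader: T.Callable[[], T.List[str]]
    docs: T.List[str] = field(default_factory=list)
    is_fresh: bool = False  # stands in for the L1 freshness marker
    query_cache: T.Dict[T.Tuple[str, int], T.List[str]] = field(default_factory=dict)  # L2
    builds: int = 0
    executed: int = 0

    def build_index(self) -> None:
        self.query_cache.clear()  # like cache.evict_all()
        self.docs = list(self.downloader())
        self.is_fresh = True  # like cache.mark_fresh()
        self.builds += 1

    def search(self, query: str, limit: int = 20, refresh: bool = False) -> T.Tuple[T.List[str], bool]:
        """Return (hits, served_from_cache)."""
        # Steps 1-2: rebuild when stale or when a refresh is forced.
        if refresh or not self.is_fresh:
            self.build_index()
        # Step 3: the L2 key includes limit, as in cache.get_query_result(query, limit).
        key = (query, limit)
        if key in self.query_cache:
            return self.query_cache[key], True
        # Step 4: "execute" the query (a case-insensitive substring match here).
        self.executed += 1
        hits = [d for d in self.docs if query.lower() in d.lower()][:limit]
        self.query_cache[key] = hits
        return hits, False


ds = ToySearch(downloader=lambda: ["Learning Python", "Python Cookbook", "Rust in Action"])
print(ds.search("python"))           # builds, then executes; not from cache
print(ds.search("python"))           # same (query, limit): served from L2
print(ds.search("python", limit=1))  # different limit: separate cache entry
print(ds.builds, ds.executed)        # 1 2
```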

All state (index files, cache, tracker DB) lives under dir_root:

dir_root/
├── tracker.db                    ← shared across datasets
└── {name}/
    ├── index-{schema_hash}/      ← tantivy index files
    └── cache/                    ← diskcache files
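The layout follows directly from the _dir_index, _dir_cache, and _db_tracker properties. fields_schema_hash() is not shown in this section, so the sketch below substitutes a hypothetical hash (SHA-256 of a JSON dump of simplified field specs, truncated to 8 hex characters). What it illustrates is that a schema change yields a new index-{schema_hash} directory, while the cache directory and tracker.db stay where they are.

```python
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List


def schema_hash_sketch(field_specs: List[Dict[str, Any]]) -> str:
    """Hypothetical stand-in for fields_schema_hash(); not sayt2's algorithm."""
    blob = json.dumps(field_specs, sort_keys=True).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()[:8]


def dataset_paths(dir_root: Path, name: str, field_specs: List[Dict[str, Any]]) -> Dict[str, Path]:
    """Mirror the derived-path properties of DataSet."""
    return {
        "index": dir_root / name / f"index-{schema_hash_sketch(field_specs)}",
        "cache": dir_root / name / "cache",
        "tracker": dir_root / "tracker.db",  # shared by all datasets under dir_root
    }


root = Path("/tmp/sayt2-demo")
v1 = [{"type": "text", "name": "title"}]
v2 = v1 + [{"type": "keyword", "name": "isbn"}]

books_v1 = dataset_paths(root, "books", v1)
books_v2 = dataset_paths(root, "books", v2)
movies = dataset_paths(root, "movies", v1)

print(books_v1["index"] != books_v2["index"])   # True: schema change -> new index dir
print(books_v1["cache"] == books_v2["cache"])   # True: cache dir is per dataset name
print(books_v1["tracker"] == movies["tracker"])  # True: tracker.db is shared
```

Keying the index directory on the schema hash appears designed so that an index written with an older field list is never reopened under an incompatible schema; note that build_index() only removes the directory for the current hash.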

Resource lifecycle

DataSet lazily opens a DataSetCache (backed by diskcache.Cache, which holds a sqlite3 connection). The connection is reused across calls to search() and build_index() — it is not closed automatically after each call.

Three ways to manage the lifecycle:

# 1. Context manager (recommended)
with DataSet(dir_root=..., name="books", fields=..., downloader=dl) as ds:
    r1 = ds.search("python")
    r2 = ds.search("rust")
# cache closed automatically on __exit__

# 2. Explicit close
ds = DataSet(...)
ds.search("python")
ds.close()          # safe to call multiple times

# 3. One-off script (GC will reclaim eventually)
ds = DataSet(...)
ds.search("python")

After close(), the DataSet can still be used — the next call lazily re-opens the cache.
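The lazy-open, idempotent-close pattern behind _cache and close() can be shown with sqlite3 standing in for diskcache.Cache. LazyConn is a hypothetical class written for this illustration, not part of sayt2; like DataSet, it opens its connection on first use, reuses it afterwards, closes it on __exit__, and re-opens it if used again after close().

```python
import sqlite3
import typing as T


class LazyConn:
    """Open a resource on first use; close() is idempotent; reuse re-opens."""

    def __init__(self, path: str = ":memory:") -> None:
        self.path = path
        self._conn: T.Optional[sqlite3.Connection] = None
        self.opened = 0  # how many times a connection was created

    @property
    def conn(self) -> sqlite3.Connection:
        if self._conn is None:
            self._conn = sqlite3.connect(self.path)
            self.opened += 1
        return self._conn

    @property
    def is_open(self) -> bool:
        return self._conn is not None

    def close(self) -> None:
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "LazyConn":
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()


with LazyConn() as lc:
    lc.conn.execute("select 1")
    lc.conn.execute("select 2")  # same connection reused
print(lc.opened, lc.is_open)     # 1 False: closed on exit

lc.close()                       # second close is a no-op
lc.conn.execute("select 3")      # lazily re-opened after close()
print(lc.opened)                 # 2
lc.close()
```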

Tracker connections are not held open; each lock_it / unlock_it call opens and closes its own sqlite3 connection, so the tracker needs no explicit lifecycle management.
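Tracker's schema and internals are not shown in this section. As a hedged illustration of the connection-per-call design, here is a hypothetical sqlite3-backed named lock with expiry: lock_it() and unlock_it() each open and close their own connection, and a holder whose expiry has passed can be replaced. The table layout and return values are assumptions, not sayt2's implementation.

```python
import os
import sqlite3
import tempfile
import time
from contextlib import closing


class SqliteLock:
    """Hypothetical expiring named lock; every call uses its own short-lived connection."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        # closing() closes the connection; the inner "conn" commits the transaction.
        with closing(sqlite3.connect(db_path)) as conn, conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS locks ("
                "name TEXT PRIMARY KEY, expire_at REAL NOT NULL)"
            )

    def lock_it(self, name: str, expire: float) -> bool:
        """Acquire *name* if it is free or its previous holder has expired."""
        now = time.time()
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            # Both statements run in one transaction, committed on exit.
            conn.execute(
                "DELETE FROM locks WHERE name = ? AND expire_at <= ?", (name, now)
            )
            cur = conn.execute(
                "INSERT OR IGNORE INTO locks (name, expire_at) VALUES (?, ?)",
                (name, now + expire),
            )
            return cur.rowcount == 1  # 0 means a live holder already exists

    def unlock_it(self, name: str) -> None:
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            conn.execute("DELETE FROM locks WHERE name = ?", (name,))


db_path = os.path.join(tempfile.mkdtemp(), "tracker.db")
lock = SqliteLock(db_path)
first = lock.lock_it("books", expire=60)      # True: acquired
second = lock.lock_it("books", expire=60)     # False: still held
lock.unlock_it("books")
third = lock.lock_it("books", expire=60)      # True: released, so free again
lock.lock_it("movies", expire=0)              # expires immediately
takeover = lock.lock_it("movies", expire=60)  # True: expired holder replaced
print(first, second, third, takeover)
```

A production lock would usually also record an owner token, so that unlock_it() cannot release a lock that another process took over after expiry.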

build_index

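build_index() first resolves its input: if data is None, it calls the downloader, and it raises ValueError when no downloader is configured either. It then acquires the tracker lock, evicts all cached entries, deletes the existing index directory, opens a fresh index, writes the documents, and marks the data as fresh before releasing the lock:

downloader() (only if data is None) → lock(name) → evict_all() → rmtree(index dir) → open_index() → write_documents() → mark_fresh() → unlock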
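The rebuild sequence can be traced on a real directory. In this hypothetical sketch, a threading.Lock stands in for the cross-process tracker lock, a dict for the query cache, another dict for the freshness marker, and a JSON-lines file for the tantivy index; none of these are sayt2 APIs. It shows that a stale file in the old index directory disappears, the cache is emptied, and the data is marked fresh.

```python
import json
import shutil
import tempfile
import threading
import typing as T
from pathlib import Path

Doc = T.Dict[str, T.Any]


def build_index_sketch(
    dir_index: Path,
    data: T.Optional[T.Iterable[Doc]],
    downloader: T.Optional[T.Callable[[], T.Iterable[Doc]]],
    lock: threading.Lock,
    query_cache: T.Dict[str, T.Any],
    state: T.Dict[str, bool],
) -> int:
    # Resolve the input before taking the lock, as build_index() does.
    if data is None:
        if downloader is None:
            raise ValueError("Either data or downloader must be provided")
        data = downloader()
    with lock:  # stands in for tracker.lock(name, expire=...)
        query_cache.clear()  # cache.evict_all()
        shutil.rmtree(dir_index, ignore_errors=True)  # drop the previous index
        dir_index.mkdir(parents=True, exist_ok=True)  # open_index() recreates it
        count = 0
        with open(dir_index / "docs.jsonl", "w", encoding="utf-8") as fh:
            for doc in data:  # stands in for write_documents()
                fh.write(json.dumps(doc) + "\n")
                count += 1
        state["fresh"] = True  # cache.mark_fresh()
    return count


dir_index = Path(tempfile.mkdtemp()) / "books" / "index-demo"
dir_index.mkdir(parents=True)
(dir_index / "stale-segment").write_text("old data")
query_cache = {"python": ["cached hit"]}
state = {"fresh": False}

n = build_index_sketch(
    dir_index, None, lambda: [{"id": 1}, {"id": 2}], threading.Lock(), query_cache, state
)
print(n, sorted(p.name for p in dir_index.iterdir()), query_cache, state)
# 2 ['docs.jsonl'] {} {'fresh': True}
```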