Tracker: Cross-Process Locking
sayt2.tracker provides a SQLite-backed lock manager that prevents
concurrent index builds from corrupting each other. A single .db file can
manage locks for multiple datasets (one row per dataset name).
Why SQLite?
Index building is a write-heavy operation. If two processes try to rebuild the same index simultaneously, the result is undefined. A lock ensures only one writer proceeds at a time.
SQLite is the ideal back-end for this lock because:
- Atomic: the lock check and acquisition happen in a single SQL statement (INSERT ... ON CONFLICT DO UPDATE ... WHERE). There is no gap between “is it free?” and “I’ll take it” where another process could slip in.
- Cross-process: SQLite’s built-in file locking works reliably across processes on every major OS.
- Zero-dependency: sqlite3 ships with the Python standard library.
- Multi-dataset: one .db file holds a row per dataset, avoiding the file-per-lock sprawl of file-based locking.
Table schema
from .exc import TrackerIsLockedError

_CREATE_TABLE_SQL = """\
CREATE TABLE IF NOT EXISTS locks (
    name TEXT PRIMARY KEY,
    lock TEXT,
    lock_at TEXT,
    expire_at TEXT
)
"""

Four columns:
- name: the dataset identifier (primary key).
- lock: NULL when unlocked; a UUID hex string when locked.
- lock_at: ISO-8601 timestamp of when the lock was acquired.
- expire_at: ISO-8601 timestamp of when the lock expires. Used to recover stale locks automatically if a process crashes while holding the lock.
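Because lock_at and expire_at are TEXT columns, any expiry check on them is a string comparison. That is only safe when every timestamp uses the same fixed-width format and the same UTC offset. The sketch below illustrates the point; `to_iso` is a hypothetical helper, and the document does not show how the module's own timestamp helper formats its values.

```python
from datetime import datetime, timedelta, timezone


def to_iso(dt):
    """Hypothetical helper: fixed-width UTC ISO-8601 string with microseconds."""
    return dt.astimezone(timezone.utc).isoformat(timespec="microseconds")


base = datetime(2024, 3, 1, 12, 0, 0, tzinfo=timezone.utc)
moments = [base + timedelta(seconds=s) for s in (0, 0.5, 59, 60, 3600, 86400)]
stamps = [to_iso(m) for m in moments]

# With one format and one offset, string order equals chronological order.
strings_sort_chronologically = sorted(stamps) == stamps

# Mixing offsets breaks this: 13:30+02:00 is 11:30 UTC, i.e. earlier than
# `base`, yet its raw ISO string compares as later.
plus_two = timezone(timedelta(hours=2))
earlier_instant = datetime(2024, 3, 1, 13, 30, tzinfo=plus_two)
raw_string_says_later = earlier_instant.isoformat() > to_iso(base)

# Normalising to UTC first restores the correct ordering.
normalised_says_earlier = to_iso(earlier_instant) < to_iso(base)
```

Normalising every timestamp to UTC before writing it keeps text comparison and chronological order in agreement.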
The UPSERT: atomic lock acquisition
INSERT INTO locks (name, lock, lock_at, expire_at) VALUES (?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
lock = excluded.lock,
lock_at = excluded.lock_at,
expire_at = excluded.expire_at
WHERE locks.lock IS NULL OR locks.expire_at < ?
This single SQL statement handles three cases:
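| Scenario | What happens | rowcount |
|---|---|---|
| Row does not exist | The INSERT creates the row; the lock is acquired | 1 |
| Row exists, unlocked or expired | ON CONFLICT fires, the WHERE clause passes, and the row is overwritten; the lock is acquired | 1 |
| Row exists, actively locked | ON CONFLICT fires, the WHERE clause fails, and nothing changes; the lock is not acquired | 0 |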
Because the check and the mutation are a single statement executed inside an implicit SQLite transaction, there is zero race window.
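The three cases can be reproduced against an in-memory database. This is a minimal sketch under stated assumptions: `try_acquire` is a hypothetical helper (not the Tracker API), timestamps are fixed for reproducibility, and the SQLite build supports UPSERT (3.24 or newer).

```python
import sqlite3
from datetime import datetime, timedelta, timezone

CREATE_SQL = """
CREATE TABLE IF NOT EXISTS locks (
    name TEXT PRIMARY KEY,
    lock TEXT,
    lock_at TEXT,
    expire_at TEXT
)
"""

ACQUIRE_SQL = """
INSERT INTO locks (name, lock, lock_at, expire_at) VALUES (?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
    lock = excluded.lock,
    lock_at = excluded.lock_at,
    expire_at = excluded.expire_at
WHERE locks.lock IS NULL OR locks.expire_at < ?
"""


def try_acquire(conn, name, token, now, ttl_seconds):
    """Return True if the UPSERT inserted or updated a row (lock taken)."""
    now_iso = now.isoformat(timespec="microseconds")
    expire_iso = (now + timedelta(seconds=ttl_seconds)).isoformat(
        timespec="microseconds"
    )
    with conn:
        cursor = conn.execute(
            ACQUIRE_SQL, (name, token, now_iso, expire_iso, now_iso)
        )
    return cursor.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(CREATE_SQL)
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)

# Case 1: no row yet, so the INSERT succeeds.
first = try_acquire(conn, "books", "token-a", t0, 60)

# Case 3: ten seconds later the lock is still live, so nothing changes.
second = try_acquire(conn, "books", "token-b", t0 + timedelta(seconds=10), 60)

# Case 2: two minutes later the 60-second lock has expired and is taken over.
third = try_acquire(conn, "books", "token-c", t0 + timedelta(seconds=120), 60)

holder = conn.execute(
    "SELECT lock FROM locks WHERE name = ?", ("books",)
).fetchone()[0]
```

The caller never needs a separate SELECT: the cursor's rowcount alone tells it whether the lock was acquired.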
Tracker class
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import timedelta
from pathlib import Path

# _utcnow() and _RELEASE_SQL are module-level names that this excerpt
# does not show.


class Tracker:
    """
    SQLite-backed cross-process lock manager.

    A single ``.db`` file can manage locks for multiple datasets (one row per
    dataset name). Parent directories are created when the tracker is
    instantiated; the table is created lazily on first use.

    :param db_path: Path to the SQLite database file.
    """

    def __init__(self, db_path: Path):
        self._db_path = Path(db_path)
        self._db_path.parent.mkdir(parents=True, exist_ok=True)

    def _get_conn(self) -> sqlite3.Connection:
        # A fresh connection per call; wait up to 10 s on a busy database.
        conn = sqlite3.connect(
            str(self._db_path),
            timeout=10.0,
        )
        conn.execute("PRAGMA journal_mode=WAL")
        return conn

    @staticmethod
    def _ensure_table(conn: sqlite3.Connection) -> None:
        conn.execute(_CREATE_TABLE_SQL)

    def _execute(
        self, conn: sqlite3.Connection, sql: str, params: tuple = ()
    ) -> sqlite3.Cursor:
        """Execute *sql*; if the table does not exist yet, create it and retry."""
        try:
            return conn.execute(sql, params)
        except sqlite3.OperationalError as e:
            if "no such table" in str(e):
                self._ensure_table(conn)
                return conn.execute(sql, params)
            raise

    def lock_it(self, name: str, expire: int = 60) -> str:
        """
        Atomically acquire a lock for *name* using a single UPSERT.

        - ``rowcount == 1`` → lock acquired (new row or unlocked/expired row).
        - ``rowcount == 0`` → lock is actively held → :exc:`TrackerIsLockedError`.

        :param expire: Seconds until the lock expires and can be taken over.
        :returns: The lock token (UUID hex).
        :raises TrackerIsLockedError: if the lock is held by another process.
        """
        lock_token = uuid.uuid4().hex
        now = _utcnow()
        now_iso = now.isoformat()
        expire_at_iso = (now + timedelta(seconds=expire)).isoformat()
        conn = self._get_conn()
        try:
            with conn:
                cursor = self._execute(
                    conn,
                    _ACQUIRE_SQL,
                    (name, lock_token, now_iso, expire_at_iso, now_iso),
                )
            if cursor.rowcount == 1:
                return lock_token
            raise TrackerIsLockedError(
                f"Lock {name!r} is held by another process"
            )
        finally:
            conn.close()

    def unlock_it(self, name: str, lock_token: str) -> None:
        """
        Release the lock for *name*, but only if *lock_token* matches.

        If the token does not match (e.g. the lock expired and was re-acquired
        by another process), this is a silent no-op: the caller's lock is
        already gone.
        """
        conn = self._get_conn()
        try:
            with conn:
                self._execute(conn, _RELEASE_SQL, (name, lock_token))
        finally:
            conn.close()

    @contextmanager
    def lock(self, name: str, expire: int = 60):
        """
        Context manager that acquires the lock on entry and guarantees release
        on exit (even if an exception is raised).

        Usage::

            tracker = Tracker(db_path)
            with tracker.lock("books", expire=60):
                # build index ...
        """
        lock_token = self.lock_it(name, expire)
        try:
            yield
        finally:
            self.unlock_it(name, lock_token)

Key methods:
- lock_it(): Acquire the lock atomically. Returns a UUID token on success; raises TrackerIsLockedError if the lock is held.
- unlock_it(): Release the lock, but only if the caller’s token matches. If the lock has already expired and been re-acquired by another process, this becomes a safe no-op.
- lock(): Context manager for the common acquire-then-release pattern:

    tracker = Tracker(db_path)
    with tracker.lock("books", expire=60):
        # build index safely ...

  The lock is guaranteed to be released on exit, even if an exception is raised.
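The release statement itself is not shown in this document. The sketch below is one plausible shape for a token-guarded release, written only to illustrate why a stale token becomes a no-op. `RELEASE_SQL` and `release` are assumptions, not the module's actual `_RELEASE_SQL` or `unlock_it()`, and unlike `unlock_it()` the helper returns a boolean so the effect is visible.

```python
import sqlite3

# Assumed release statement: clear the lock only when the token matches.
RELEASE_SQL = """
UPDATE locks
SET lock = NULL, lock_at = NULL, expire_at = NULL
WHERE name = ? AND lock = ?
"""


def release(conn, name, token):
    """Hypothetical helper: return True if this token actually held the lock."""
    with conn:
        cursor = conn.execute(RELEASE_SQL, (name, token))
    return cursor.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE locks ("
    "name TEXT PRIMARY KEY, lock TEXT, lock_at TEXT, expire_at TEXT)"
)
# Simulate the state after token-a expired and another process took over
# with token-b.
conn.execute(
    "INSERT INTO locks VALUES (?, ?, ?, ?)",
    (
        "books",
        "token-b",
        "2024-01-01T00:02:00.000000+00:00",
        "2024-01-01T00:03:00.000000+00:00",
    ),
)
conn.commit()

# The original holder wakes up late and tries to release with its old token.
stale_release = release(conn, "books", "token-a")
holder_after_stale = conn.execute(
    "SELECT lock FROM locks WHERE name = ?", ("books",)
).fetchone()[0]

# The current holder releases normally.
current_release = release(conn, "books", "token-b")
holder_after_release = conn.execute(
    "SELECT lock FROM locks WHERE name = ?", ("books",)
).fetchone()[0]
```

Matching on the token in the WHERE clause means a late release can never clear a lock that now belongs to someone else.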
Lazy table creation
The locks table is not created in __init__. Instead, the private
_execute() method catches
sqlite3.OperationalError (“no such table”), creates the table, and retries
the original statement. This keeps the happy path fast (no extra CREATE
TABLE IF NOT EXISTS round-trip on every call) while remaining self-healing on
first use.
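The catch-create-retry pattern can be tried in isolation. This is a minimal sketch modelled on the `_execute()` method described above; `execute_lazily` and `CREATE_SQL` are hypothetical names, and an in-memory database stands in for the tracker file.

```python
import sqlite3

CREATE_SQL = """
CREATE TABLE IF NOT EXISTS locks (
    name TEXT PRIMARY KEY,
    lock TEXT,
    lock_at TEXT,
    expire_at TEXT
)
"""


def execute_lazily(conn, sql, params=()):
    """Run *sql*; on 'no such table', create the table once and retry."""
    try:
        return conn.execute(sql, params)
    except sqlite3.OperationalError as exc:
        if "no such table" not in str(exc):
            raise  # unrelated errors must not be swallowed
        conn.execute(CREATE_SQL)
        return conn.execute(sql, params)


def locks_table_count(conn):
    """Count tables named 'locks' in the schema catalogue (0 or 1)."""
    return conn.execute(
        "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = 'locks'"
    ).fetchone()[0]


conn = sqlite3.connect(":memory:")
tables_before = locks_table_count(conn)

# First statement against a fresh database: the table is created on demand.
rows = execute_lazily(conn, "SELECT name FROM locks").fetchall()
tables_after = locks_table_count(conn)

# A different OperationalError (unknown column) is re-raised untouched.
try:
    execute_lazily(conn, "SELECT nope FROM locks")
    unrelated_error_propagates = False
except sqlite3.OperationalError:
    unrelated_error_propagates = True
```

Only the very first statement against a new database pays for the failed attempt; every later call takes the plain `conn.execute` path.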