# Source code for physicsnemo_curator.core.cache

# SPDX-FileCopyrightText: Copyright (c) 2025 - 2026 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Cache directory management and introspection for pipeline SQLite databases.

Provides utilities to locate, list, inspect, and clean up ``.db`` files
produced by pipeline runs.  The default cache location follows the
`XDG Base Directory Specification`_ and can be overridden with the
``PSNC_CACHE_DIR`` environment variable.

.. _XDG Base Directory Specification:
   https://specifications.freedesktop.org/basedir-spec/latest/

Usage
-----
>>> from physicsnemo_curator.core.cache import default_cache_dir, list_databases
>>> cache = default_cache_dir()
>>> for info in list_databases(cache):
...     print(info.hash_prefix, info.source_name, info.completed)
"""

from __future__ import annotations

import json
import logging
import os
import pathlib
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Default cache directory
# ---------------------------------------------------------------------------


def default_cache_dir() -> pathlib.Path:
    """Return the default cache directory for pipeline databases.

    Resolution order (highest priority first):

    1. ``PSNC_CACHE_DIR`` environment variable
    2. ``$XDG_CACHE_HOME/psnc/``
    3. ``~/.cache/psnc/``

    Returns
    -------
    pathlib.Path
        Absolute path to the cache directory (may not exist yet).

    Examples
    --------
    >>> import os
    >>> os.environ["PSNC_CACHE_DIR"] = "/tmp/my_cache"
    >>> default_cache_dir()
    PosixPath('/tmp/my_cache')
    """
    # Probe the override variables in priority order; an empty string is
    # treated the same as unset.
    for env_var, subdir in (("PSNC_CACHE_DIR", None), ("XDG_CACHE_HOME", "psnc")):
        value = os.environ.get(env_var)
        if value:
            base = pathlib.Path(value)
            return base / subdir if subdir else base
    # Final fallback per the XDG spec default of ~/.cache.
    return pathlib.Path.home() / ".cache" / "psnc"
# ---------------------------------------------------------------------------
# DBInfo dataclass
# ---------------------------------------------------------------------------
[docs] @dataclass class DBInfo: """Metadata about a single pipeline database file. Parameters ---------- hash_prefix : str Filename stem (the config hash prefix used as the DB name). path : pathlib.Path Absolute path to the ``.db`` file. size_bytes : int File size in bytes. created : datetime Pipeline run start timestamp (from ``pipeline_runs.started_at``). source_name : str Registered source name extracted from the stored config JSON. sink_name : str Registered sink name extracted from the stored config JSON. filter_names : list[str] Registered filter names extracted from the stored config JSON. total : int Total number of ``index_results`` rows (completed + failed). completed : int Number of completed index results. failed : int Number of failed index results. """ hash_prefix: str path: pathlib.Path size_bytes: int created: datetime source_name: str sink_name: str filter_names: list[str] = field(default_factory=list) total: int = 0 completed: int = 0 failed: int = 0
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------


def _read_db_info(db_path: pathlib.Path) -> DBInfo | None:
    """Read metadata from a single pipeline database file.

    The database is opened read-only (SQLite ``mode=ro`` URI) so that
    introspection never creates journal/WAL files or mutates the DB.

    Parameters
    ----------
    db_path : pathlib.Path
        Path to a ``.db`` file.

    Returns
    -------
    DBInfo | None
        Metadata if the DB is valid, or ``None`` if it is corrupt /
        unreadable.
    """
    try:
        # ``as_uri()`` percent-encodes the absolute path, so appending the
        # query string is safe even for unusual filenames.
        uri = db_path.resolve().as_uri() + "?mode=ro"
        conn = sqlite3.connect(uri, uri=True, timeout=5)
        try:
            row = conn.execute(
                "SELECT config_hash, config_json, started_at "
                "FROM pipeline_runs ORDER BY run_id DESC LIMIT 1"
            ).fetchone()
            if row is None:
                return None
            _config_hash, config_json, started_at = row

            # Extract registered stage names from the stored config JSON.
            config = json.loads(config_json)
            source_name = config.get("source", {}).get("name", "")
            sink_name = config.get("sink", {}).get("name", "")
            filter_names = [f.get("name", "") for f in config.get("filters", [])]

            # Parse the run timestamp; naive timestamps are assumed UTC.
            created = datetime.fromisoformat(started_at)
            if created.tzinfo is None:
                created = created.replace(tzinfo=UTC)

            # Tally completed / failed index results.
            completed_row = conn.execute(
                "SELECT COUNT(*) FROM index_results WHERE status = 'completed'"
            ).fetchone()
            completed = completed_row[0] if completed_row else 0
            failed_row = conn.execute(
                "SELECT COUNT(*) FROM index_results WHERE status = 'error'"
            ).fetchone()
            failed = failed_row[0] if failed_row else 0
            total = completed + failed

            return DBInfo(
                hash_prefix=db_path.stem,
                path=db_path.resolve(),
                size_bytes=db_path.stat().st_size,
                created=created,
                source_name=source_name,
                sink_name=sink_name,
                filter_names=filter_names,
                total=total,
                completed=completed,
                failed=failed,
            )
        finally:
            conn.close()
    # ValueError/TypeError cover a malformed or NULL ``started_at`` from
    # ``datetime.fromisoformat`` — previously this crashed the caller
    # instead of skipping the corrupt DB.
    except (sqlite3.Error, json.JSONDecodeError, OSError, ValueError, TypeError) as exc:
        logger.debug("Skipping corrupt or unreadable DB %s: %s", db_path, exc)
        return None


def _resolve_cache_dir(cache_dir: pathlib.Path | None) -> pathlib.Path:
    """Resolve the cache directory, falling back to the default.

    Parameters
    ----------
    cache_dir : pathlib.Path | None
        Explicit cache directory, or ``None`` to use
        :func:`default_cache_dir`.

    Returns
    -------
    pathlib.Path
        Resolved cache directory path.
    """
    if cache_dir is not None:
        return pathlib.Path(cache_dir)
    return default_cache_dir()


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def list_databases(cache_dir: pathlib.Path | None = None) -> list[DBInfo]:
    """List all pipeline databases in the cache directory.

    Opens each ``.db`` file, reads the ``pipeline_runs`` and
    ``index_results`` tables, and returns metadata sorted newest first
    (by ``started_at`` timestamp). Corrupt or unreadable databases are
    silently skipped.

    Parameters
    ----------
    cache_dir : pathlib.Path | None, optional
        Directory to scan. Defaults to :func:`default_cache_dir`.

    Returns
    -------
    list[DBInfo]
        Metadata for each valid database, sorted newest first.
    """
    directory = _resolve_cache_dir(cache_dir)
    if not directory.is_dir():
        return []

    # ``_read_db_info`` returns None for corrupt DBs; drop those entries.
    found = [
        info
        for candidate in directory.glob("*.db")
        if (info := _read_db_info(candidate)) is not None
    ]
    return sorted(found, key=lambda i: i.created, reverse=True)
def remove_databases(
    hash_prefixes: list[str],
    *,
    cache_dir: pathlib.Path | None = None,
) -> int:
    """Remove pipeline databases matching the given hash prefixes.

    Each prefix is matched against ``.db`` filenames (stems). A prefix
    that matches more than one file raises :class:`ValueError` to prevent
    accidental deletion.

    Parameters
    ----------
    hash_prefixes : list[str]
        Hash prefix strings to match against DB file stems.
    cache_dir : pathlib.Path | None, optional
        Directory to scan. Defaults to :func:`default_cache_dir`.

    Returns
    -------
    int
        Number of database files removed.

    Raises
    ------
    ValueError
        If a prefix is ambiguous (matches more than one ``.db`` file).
    """
    d = _resolve_cache_dir(cache_dir)
    if not d.is_dir():
        return 0

    db_files = list(d.glob("*.db"))
    # Track files already unlinked so overlapping prefixes (e.g. "abc" then
    # "abcd") do not re-match a deleted entry and raise FileNotFoundError.
    deleted: set[pathlib.Path] = set()
    removed = 0
    for prefix in hash_prefixes:
        matches = [
            f for f in db_files if f.stem.startswith(prefix) and f not in deleted
        ]
        if len(matches) > 1:
            stems = [f.stem for f in matches]
            msg = f"Prefix {prefix!r} is ambiguous, matches {len(matches)} databases: {stems}"
            raise ValueError(msg)
        if len(matches) == 1:
            matches[0].unlink()
            deleted.add(matches[0])
            removed += 1
    return removed
def remove_older_than(
    max_age: timedelta,
    *,
    cache_dir: pathlib.Path | None = None,
) -> int:
    """Remove pipeline databases older than *max_age* (by file mtime).

    Cleanup is best-effort: files that vanish between the directory scan
    and deletion (e.g. removed by a concurrent process) are skipped
    instead of aborting the sweep.

    Parameters
    ----------
    max_age : timedelta
        Maximum age. Files with an mtime older than ``now - max_age``
        are removed.
    cache_dir : pathlib.Path | None, optional
        Directory to scan. Defaults to :func:`default_cache_dir`.

    Returns
    -------
    int
        Number of database files removed.
    """
    d = _resolve_cache_dir(cache_dir)
    if not d.is_dir():
        return 0

    # ``time`` is imported at module level (hoisted from the old
    # function-body import).
    cutoff = time.time() - max_age.total_seconds()
    removed = 0
    for p in d.glob("*.db"):
        try:
            if p.stat().st_mtime < cutoff:
                p.unlink()
                removed += 1
        except OSError:
            # File disappeared mid-scan; skip it and keep sweeping.
            continue
    return removed
def clear_cache(*, cache_dir: pathlib.Path | None = None) -> int:
    """Delete every ``.db`` file in the cache directory.

    Parameters
    ----------
    cache_dir : pathlib.Path | None, optional
        Directory to clear. Defaults to :func:`default_cache_dir`.

    Returns
    -------
    int
        Number of database files removed.
    """
    directory = _resolve_cache_dir(cache_dir)
    if not directory.is_dir():
        return 0

    # Snapshot the matches first; the count is simply the list length.
    targets = list(directory.glob("*.db"))
    for db_file in targets:
        db_file.unlink()
    return len(targets)
def cache_size(*, cache_dir: pathlib.Path | None = None) -> int:
    """Return the total size in bytes of all ``.db`` files in the cache.

    Parameters
    ----------
    cache_dir : pathlib.Path | None, optional
        Directory to measure. Defaults to :func:`default_cache_dir`.

    Returns
    -------
    int
        Total bytes occupied by ``.db`` files, or ``0`` if the directory
        is empty or does not exist.
    """
    directory = _resolve_cache_dir(cache_dir)
    if not directory.is_dir():
        return 0

    total = 0
    for db_file in directory.glob("*.db"):
        total += db_file.stat().st_size
    return total