logging#
Logging utilities for PhysicsNeMo Curator.
Provides a consistent logging interface for sources, filters, and sinks that:
- Uses standard Python logging (captured by TUI when in main process)
- Automatically includes process/worker identification
- Provides structured log messages with timing information
- Supports database-backed logging for cross-process log aggregation
Usage in Sources/Filters#
from physicsnemo_curator.core.logging import get_logger

class MySource(Source):
    def __init__(self, ...):
        self._log = get_logger(self)

    def __getitem__(self, index: int) -> Generator[Mesh]:
        self._log.info("Reading index %d", index)
        ...
        self._log.debug("Loaded %d points", n_points)
Log Format#
Messages are formatted as:
[ProcessName] ClassName: message
For example:
[Worker-1] AhmedMLSource: Reading index 5
[MainProcess] MeshStatsFilter: Computing statistics
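The format above can be approximated with the standard library alone. The following is a minimal sketch, not the module's actual implementation: it assumes the bracketed prefix maps to the standard `processName` record attribute and that the class name is carried by the logger name. `PROCESS_AWARE_FORMAT` and `build_handler` are hypothetical names used only for illustration.

```python
import io
import logging
import multiprocessing

# Assumed format string built only from standard LogRecord attributes.
PROCESS_AWARE_FORMAT = "[%(processName)s] %(name)s: %(message)s"


def build_handler(stream):
    """Return a stream handler that renders records in the process-aware format."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(PROCESS_AWARE_FORMAT))
    return handler


stream = io.StringIO()
logger = logging.getLogger("AhmedMLSource")  # logger named after the component class
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output away from the root logger
logger.addHandler(build_handler(stream))

logger.info("Reading index %d", 5)
rendered = stream.getvalue().strip()
print(rendered)
```

In a worker process the same format string would pick up that worker's process name without any extra code in the component.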
Database Logging#
For cross-process logging (e.g., in worker processes), use DatabaseLogHandler, which buffers log records and writes them to the pipeline store in batches to minimize database lock contention.
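The buffering described above can be illustrated with a small self-contained handler. This is a sketch under stated assumptions, not the DatabaseLogHandler implementation: `BufferedSQLiteHandler`, its `logs` table, and its use of `sqlite3` directly (instead of a PipelineStore) are all hypothetical. Only the idea of flushing on a size threshold or a time interval is taken from the text.

```python
import logging
import sqlite3
import time


class BufferedSQLiteHandler(logging.Handler):
    """Hypothetical handler: buffer records, then write each batch in one transaction."""

    def __init__(self, db_path, buffer_size=50, flush_interval=2.0):
        super().__init__()
        self._conn = sqlite3.connect(db_path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS logs (created REAL, level TEXT, message TEXT)"
        )
        self._buffer = []
        self._buffer_size = buffer_size
        self._flush_interval = flush_interval
        self._last_flush = time.monotonic()

    def emit(self, record):
        # logging.Handler.handle() calls emit() with the handler lock held.
        self._buffer.append((record.created, record.levelname, record.getMessage()))
        overdue = time.monotonic() - self._last_flush >= self._flush_interval
        if len(self._buffer) >= self._buffer_size or overdue:
            self.flush()

    def flush(self):
        # The handler lock is re-entrant, so flush() is safe to call from emit().
        self.acquire()
        try:
            if self._buffer:
                with self._conn:  # one transaction per batch
                    self._conn.executemany(
                        "INSERT INTO logs VALUES (?, ?, ?)", self._buffer
                    )
                self._buffer.clear()
            self._last_flush = time.monotonic()
        finally:
            self.release()

    def stored_count(self):
        return self._conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]


handler = BufferedSQLiteHandler(":memory:", buffer_size=3, flush_interval=60.0)
log = logging.getLogger("buffer_demo")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

for i in range(4):
    log.info("record %d", i)

stored = handler.stored_count()  # the fourth record is still in the buffer
handler.flush()  # explicit flush writes the remainder
stored_after_flush = handler.stored_count()
```

Writing a batch in one transaction means the database is locked once per batch instead of once per record, which is the contention the text refers to.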
Classes#
DatabaseLogHandler | Logging handler that writes to the pipeline store database.
Functions#
configure_logging | Configure logging for physicsnemo_curator.
get_logger | Get a logger for a pipeline component.
setup_worker_logging | Configure logging for a worker process to write to the database.
Module Contents#
- class physicsnemo_curator.core.logging.DatabaseLogHandler(store: physicsnemo_curator.core.pipeline_store.PipelineStore, worker_id: str | None = None, buffer_size: int = 50, flush_interval: float = 2.0)
Bases: logging.Handler
Logging handler that writes to the pipeline store database.
Buffers log records and flushes them to the database periodically or when the buffer reaches a threshold. This minimizes database lock contention in multi-process scenarios.
Call flush() explicitly at key points (e.g., after source reads) to ensure logs appear promptly during long operations.
- Parameters:
store (PipelineStore) – The pipeline store to write logs to.
worker_id (str | None) – Identifier for the current worker (e.g., “Worker-1”).
buffer_size (int) – Number of records to buffer before flushing (default: 50).
flush_interval (float) – Maximum seconds between flushes (default: 2.0).
Initializes the instance, setting the formatter to None and the filter list to empty.
- emit(record: logging.LogRecord) -> None
Buffer a log record for later database write.
- physicsnemo_curator.core.logging.configure_logging(level: int = logging.INFO) -> None
Configure logging for physicsnemo_curator.
Sets up a console handler with process-aware formatting. Call this at the start of your script if you want to see log output. The TUI automatically configures logging when it starts.
- Parameters:
level (int, optional) – Logging level (default: logging.INFO).
Examples
>>> import logging
>>> from physicsnemo_curator.core.logging import configure_logging
>>> configure_logging(logging.DEBUG)
- physicsnemo_curator.core.logging.get_logger(component: physicsnemo_curator.core.base.Source | physicsnemo_curator.core.base.Filter | physicsnemo_curator.core.base.Sink | str)
Get a logger for a pipeline component.
- Parameters:
component (Source, Filter, Sink, or str) – The pipeline component instance or a string name.
- Returns:
A logger wrapper with process-aware formatting.
- Return type:
_ComponentLogger
Examples
>>> from physicsnemo_curator.core.logging import get_logger
>>> log = get_logger("MySource")
>>> log.info("Processing index %d", 42)
[MainProcess:12345] MySource: Processing index 42
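Because get_logger accepts either a component instance or a string, both forms presumably resolve to a name for the component. The sketch below shows one way this could work with plain standard-library loggers. `component_name`, `get_component_logger`, and the `curator_demo.` prefix are hypothetical, and the real function returns a `_ComponentLogger` wrapper whose internals are not documented here.

```python
import logging


def component_name(component):
    """Use a string as-is; otherwise fall back to the instance's class name."""
    return component if isinstance(component, str) else type(component).__name__


def get_component_logger(component):
    """Hypothetical stand-in for get_logger: one stdlib logger per component name."""
    return logging.getLogger("curator_demo." + component_name(component))


class MySource:
    def __init__(self):
        # Passing the instance yields a logger named after the class.
        self._log = get_component_logger(self)


by_name = get_component_logger("MySource")
by_instance = MySource()._log
print(by_name.name, by_name is by_instance)
```

Since `logging.getLogger` caches loggers by name, the string form and the instance form end up sharing one logger in this sketch.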
- physicsnemo_curator.core.logging.setup_worker_logging(store: physicsnemo_curator.core.pipeline_store.PipelineStore, level: int = logging.INFO)
Configure logging for a worker process to write to the database.
Call this at the start of each worker process to enable database-backed logging. Logs are buffered and written in batches to minimize lock contention.
- Parameters:
store (PipelineStore) – The pipeline store to write logs to.
level (int) – Logging level (default: logging.INFO).
- Returns:
The handler instance (useful for setting current_index).
- Return type:
Examples
>>> import logging
>>> from physicsnemo_curator.core.logging import setup_worker_logging
>>> handler = setup_worker_logging(store, level=logging.DEBUG)
>>> handler.set_current_index(42)  # Set context for current task
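The example above sets a current index on the returned handler so that later records can be associated with the task being processed. How the library stores that context is not shown in this page; the following is only a sketch of the pattern, with a hypothetical `IndexTaggingHandler` that keeps records in a list instead of writing to a database.

```python
import logging


class IndexTaggingHandler(logging.Handler):
    """Hypothetical handler that tags each record with the current task index."""

    def __init__(self):
        super().__init__()
        self.current_index = None
        self.records = []  # stands in for the database-backed buffer

    def set_current_index(self, index):
        # Called by the worker loop before it starts processing a new item.
        self.current_index = index

    def emit(self, record):
        self.records.append((self.current_index, record.getMessage()))


handler = IndexTaggingHandler()
log = logging.getLogger("index_demo")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

for index in (41, 42):
    handler.set_current_index(index)
    log.info("Reading index %d", index)

print(handler.records)
```

Keeping the index on the handler means components can log as usual while the worker loop alone decides which task each record belongs to.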