logging#

Logging utilities for PhysicsNeMo Curator.

Provides a consistent logging interface for sources, filters, and sinks that:

  1. Uses standard Python logging (captured by the TUI when running in the main process)

  2. Automatically includes process/worker identification

  3. Provides structured log messages with timing information

  4. Supports database-backed logging for cross-process log aggregation

Usage in Sources/Filters#

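from collections.abc import Generator

from physicsnemo_curator.core.base import Source
from physicsnemo_curator.core.logging import get_logger

class MySource(Source):
    def __init__(self, *args, **kwargs):
        # One logger per component; the class name appears in the log prefix.
        self._log = get_logger(self)

    def __getitem__(self, index: int) -> Generator[Mesh]:
        self._log.info("Reading index %d", index)
        ...  # load the data for this index (Mesh and n_points come from your own code)
        self._log.debug("Loaded %d points", n_points)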

Log Format#

Messages are formatted as:

[ProcessName] ClassName: message

For example:

[Worker-1] AhmedMLSource: Reading index 5
[MainProcess] MeshStatsFilter: Computing statistics
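
The prefix format above can be reproduced with nothing but the standard library, which may help when reading or filtering these logs. The following is a minimal sketch, not the library's implementation: the helper name make_component_logger and the stand-in AhmedMLSource class are hypothetical, and the only real API used is Python's logging module (%(processName)s is a standard LogRecord attribute).

```python
import io
import logging


def make_component_logger(component, stream):
    """Hypothetical helper: emit lines shaped like '[ProcessName] ClassName: message'."""
    # Accept either a component instance or a plain string name.
    name = component if isinstance(component, str) else type(component).__name__
    logger = logging.getLogger(f"format_sketch.{name}")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep the sketch isolated from the root logger
    logger.handlers.clear()

    handler = logging.StreamHandler(stream)
    # The process name is filled in by logging for every record.
    handler.setFormatter(logging.Formatter(f"[%(processName)s] {name}: %(message)s"))
    logger.addHandler(handler)
    return logger


class AhmedMLSource:
    """Stand-in class, used here only for its name."""


buffer = io.StringIO()
log = make_component_logger(AhmedMLSource(), buffer)
log.info("Reading index %d", 5)
formatted = buffer.getvalue().strip()
print(formatted)
```

When run in the main process, the printed line starts with [MainProcess]; inside a multiprocessing worker the same formatter picks up that worker's process name instead.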

Database Logging#

For cross-process logging (e.g., in worker processes), use DatabaseLogHandler, which buffers log records and writes them to the pipeline store in batches to minimize database lock contention.

Classes#

DatabaseLogHandler

Logging handler that writes to the pipeline store database.

Functions#

configure_logging(→ None)

Configure logging for physicsnemo_curator.

get_logger(→ _ComponentLogger)

Get a logger for a pipeline component.

setup_worker_logging(→ DatabaseLogHandler)

Configure logging for a worker process to write to the database.

Module Contents#

class physicsnemo_curator.core.logging.DatabaseLogHandler(
store: physicsnemo_curator.core.pipeline_store.PipelineStore,
worker_id: str | None = None,
buffer_size: int = 50,
flush_interval: float = 2.0,
)[source]#

Bases: logging.Handler

Logging handler that writes to the pipeline store database.

Buffers log records and flushes them to the database periodically or when the buffer reaches a threshold. This minimizes database lock contention in multi-process scenarios.

Call flush() explicitly at key points (e.g., after source reads) to ensure logs appear promptly during long operations.

Parameters:
  • store (PipelineStore) – The pipeline store to write logs to.

  • worker_id (str | None) – Identifier for the current worker (e.g., “Worker-1”).

  • buffer_size (int) – Number of records to buffer before flushing (default: 50).

  • flush_interval (float) – Maximum seconds between flushes (default: 2.0).

Initializes the instance, setting the formatter to None and the filter list to empty.

close() → None[source]#

Flush and close the handler.

emit(record: logging.LogRecord) → None[source]#

Buffer a log record for later database write.

flush() → None[source]#

Flush buffered logs to the database.

set_current_index(index: int | None) → None[source]#

Set the current index being processed (for log context).
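
The buffering behaviour described for DatabaseLogHandler (flush when the buffer reaches buffer_size or when flush_interval seconds have passed) can be pictured with a small stand-in built on the standard library. This is a minimal sketch under stated assumptions, not the library's implementation: the class name BufferedSQLiteHandler, the logs table layout, and the in-memory sqlite3 connection used in place of a PipelineStore are all hypothetical.

```python
import logging
import sqlite3
import time


class BufferedSQLiteHandler(logging.Handler):
    """Illustrative stand-in; NOT the real DatabaseLogHandler."""

    def __init__(self, conn, worker_id=None, buffer_size=50, flush_interval=2.0):
        super().__init__()
        self._conn = conn
        self._worker_id = worker_id
        self._buffer_size = buffer_size
        self._flush_interval = flush_interval
        self._buffer = []
        self._current_index = None
        self._last_flush = time.monotonic()
        # Hypothetical table layout, chosen only for this sketch.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS logs "
            "(worker_id TEXT, idx INTEGER, level TEXT, message TEXT)"
        )

    def set_current_index(self, index):
        # Records emitted from now on are tagged with this index.
        self._current_index = index

    def emit(self, record):
        row = (self._worker_id, self._current_index, record.levelname, record.getMessage())
        self._buffer.append(row)
        overdue = time.monotonic() - self._last_flush >= self._flush_interval
        if len(self._buffer) >= self._buffer_size or overdue:
            self.flush()

    def flush(self):
        self.acquire()  # the handler lock is re-entrant, so this is safe from emit()
        try:
            if self._buffer:
                rows, self._buffer = self._buffer, []
                # One batched write per flush keeps database lock time short.
                self._conn.executemany("INSERT INTO logs VALUES (?, ?, ?, ?)", rows)
                self._conn.commit()
            self._last_flush = time.monotonic()
        finally:
            self.release()

    def close(self):
        self.flush()
        super().close()


conn = sqlite3.connect(":memory:")
handler = BufferedSQLiteHandler(conn, worker_id="Worker-1", buffer_size=3, flush_interval=60.0)
log = logging.getLogger("buffer_sketch")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

handler.set_current_index(5)
log.info("Reading index %d", 5)
log.info("Loaded %d points", 1000)
rows_before = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]  # still buffered
log.info("Done")  # third record reaches buffer_size and triggers a flush
rows_after = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
first_row = conn.execute("SELECT worker_id, idx, level, message FROM logs").fetchone()

log.removeHandler(handler)
handler.close()
```

Writing rows in batches is the point of the design: each worker touches the database once per flush instead of once per record, which is why an explicit flush() at key points is still useful when records must appear promptly.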

physicsnemo_curator.core.logging.configure_logging(level: int = logging.INFO) → None[source]#

Configure logging for physicsnemo_curator.

Sets up a console handler with process-aware formatting. Call this at the start of your script if you want to see log output. The TUI automatically configures logging when it starts.

Parameters:

level (int, optional) – Logging level (default: logging.INFO).

Examples

>>> import logging
>>> from physicsnemo_curator.core.logging import configure_logging
>>> configure_logging(logging.DEBUG)

physicsnemo_curator.core.logging.get_logger(
component: physicsnemo_curator.core.base.Source | physicsnemo_curator.core.base.Filter | physicsnemo_curator.core.base.Sink | str,
) → _ComponentLogger[source]#

Get a logger for a pipeline component.

Parameters:

component (Source, Filter, Sink, or str) – The pipeline component instance or a string name.

Returns:

A logger wrapper with process-aware formatting.

Return type:

_ComponentLogger

Examples

>>> from physicsnemo_curator.core.logging import get_logger
>>> log = get_logger("MySource")
>>> log.info("Processing index %d", 42)
[MainProcess:12345] MySource: Processing index 42

physicsnemo_curator.core.logging.setup_worker_logging(
store: physicsnemo_curator.core.pipeline_store.PipelineStore,
level: int = logging.INFO,
) → DatabaseLogHandler[source]#

Configure logging for a worker process to write to the database.

Call this at the start of each worker process to enable database-backed logging. Logs are buffered and written in batches to minimize lock contention.

Parameters:
  • store (PipelineStore) – The pipeline store to write logs to.

  • level (int) – Logging level (default: logging.INFO).

Returns:

The handler instance (useful for calling set_current_index() and flush() from the worker).

Return type:

DatabaseLogHandler

Examples

>>> import logging
>>> from physicsnemo_curator.core.logging import setup_worker_logging
>>> # store is an existing PipelineStore instance
>>> handler = setup_worker_logging(store, level=logging.DEBUG)
>>> handler.set_current_index(42)  # Set context for current task
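
One way a worker loop might use the returned handler is sketched below. It relies only on the handler methods documented on this page (set_current_index, flush, close); the function name run_worker, the process_one callback, and the _RecordingHandler stand-in are hypothetical and exist only so the sketch runs without a PipelineStore.

```python
def run_worker(handler, indices, process_one):
    """Process each index, tagging logs with it and flushing after every item."""
    try:
        for index in indices:
            handler.set_current_index(index)  # log context for this task
            try:
                process_one(index)
            finally:
                handler.flush()  # make this item's logs visible promptly
    finally:
        handler.set_current_index(None)  # clear the context
        handler.close()  # final flush and cleanup


class _RecordingHandler:
    """Stand-in that records calls instead of writing to a database."""

    def __init__(self):
        self.calls = []

    def set_current_index(self, index):
        self.calls.append(("index", index))

    def flush(self):
        self.calls.append(("flush",))

    def close(self):
        self.calls.append(("close",))


fake = _RecordingHandler()
processed = []
run_worker(fake, [1, 2], processed.append)
print(fake.calls)
```

In a real worker, handler would be the object returned by setup_worker_logging(store) and process_one would read and transform the item for that index.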