Extending PhysicsNeMo Curator#
PhysicsNeMo Curator is designed around the Pipes and Filters pattern. Every pipeline is assembled from three building blocks:
| Component | Role | Base class |
|---|---|---|
| Source | Reads raw data and yields domain objects | |
| Filter | Transforms a stream of objects | |
| Sink | Persists objects and returns the paths it wrote | |
You can create custom versions of any of these to support your own datasets, transformations, or output formats, with no changes to the core library required.
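To make the three roles concrete, here is a minimal sketch of the Pipes and Filters pattern in plain Python. It does not use the PhysicsNeMo Curator API; every function name below is hypothetical and exists only to illustrate how a source, a chain of filters, and a sink fit together.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator


def number_source(n: int) -> Iterator[int]:
    """Source: yield raw items one at a time."""
    yield from range(n)


def keep_even(items: Iterable[int]) -> Iterator[int]:
    """Filter: pass through only the items that satisfy a condition."""
    return (x for x in items if x % 2 == 0)


def square(items: Iterable[int]) -> Iterator[int]:
    """Filter: transform each item in the stream."""
    return (x * x for x in items)


def list_sink(items: Iterable[int]) -> list[str]:
    """Sink: consume the stream and return the (pretend) paths it wrote."""
    return [f"item_{x}.txt" for x in items]


def run(source: Iterable[int], filters: list[Callable], sink: Callable) -> list[str]:
    """Chain the filters lazily over the source, then hand the stream to the sink."""
    stream = source
    for f in filters:
        stream = f(stream)
    return sink(stream)


written = run(number_source(6), [keep_even, square], list_sink)
print(written)
```

Because each stage only sees an iterable and produces an iterable, stages can be swapped or reordered without touching the others, which is the property the component types above rely on.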
Anatomy of a Component#
Every component shares the same skeleton:
- name and description class variables, used by the CLI and registry for display.
- A params() classmethod returning a list of Param, which drives CLI prompts and documents the constructor interface.
- An __init__ accepting those parameters as keyword arguments.
- A core method implementing the component’s logic (__getitem__ for a source, __call__ for a filter or a sink).
from __future__ import annotations

from typing import ClassVar, TYPE_CHECKING

from physicsnemo_curator.core.base import Param

if TYPE_CHECKING:
    pass  # domain-specific imports here


class MyComponent:
    name: ClassVar[str] = "My Component"
    description: ClassVar[str] = "One-line description of what it does"

    @classmethod
    def params(cls) -> list[Param]:
        return [
            Param(name="option", description="Processing option", type=str, default="default"),
        ]

    def __init__(self, option: str = "default") -> None:
        self._option = option
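The following sketch illustrates why params() and __init__ are kept in sync: a front end can read the parameter list, convert user input to the declared types, and pass the result as keyword arguments. The Param dataclass here is a hypothetical stand-in that mirrors only the four fields used above, and build() is an invented helper, not part of the library.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, ClassVar


@dataclass
class Param:
    """Hypothetical stand-in for physicsnemo_curator.core.base.Param."""

    name: str
    description: str
    type: type
    default: Any = None


class MyComponent:
    name: ClassVar[str] = "My Component"
    description: ClassVar[str] = "One-line description of what it does"

    @classmethod
    def params(cls) -> list[Param]:
        return [
            Param(name="option", description="Processing option", type=str, default="default"),
        ]

    def __init__(self, option: str = "default") -> None:
        self._option = option


def build(component_cls, overrides: dict[str, str]):
    """Instantiate a component from string inputs, as a CLI front end might."""
    kwargs = {}
    for p in component_cls.params():
        raw = overrides.get(p.name)
        # Fall back to the declared default; otherwise coerce to the declared type.
        kwargs[p.name] = p.default if raw is None else p.type(raw)
    return component_cls(**kwargs)


default_component = build(MyComponent, {})
custom_component = build(MyComponent, {"option": "fast"})
```

If a name in params() does not match a keyword argument of __init__, the final call in build() raises a TypeError, so the mismatch surfaces at construction time.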
Registering Components#
To make components discoverable by the CLI, register them in your submodule’s __init__.py:
# src/physicsnemo_curator/mymodule/__init__.py
from physicsnemo_curator.core.registry import registry
from .sources.my_source import MySource
from .filters.my_filter import MyFilter
from .sinks.my_sink import MySink
registry.register_submodule(
    "mymodule",
    "My custom data processing",
    "some_dependency",  # import check for availability
)
registry.register_source("mymodule", MySource)
registry.register_filter("mymodule", MyFilter)
registry.register_sink("mymodule", MySink)
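The third argument to register_submodule is described above as an import check for availability. How the registry performs that check is not shown here; one plausible approach, sketched below with a hypothetical helper name, is to ask the import system whether the module can be found without actually importing it.

```python
from importlib.util import find_spec


def is_available(module_name: str) -> bool:
    """Return True if a top-level module can be located, without importing it."""
    return find_spec(module_name) is not None


print(is_available("json"))
print(is_available("some_dependency_that_is_not_installed"))
```

A check like this keeps registration cheap: a submodule whose dependency is missing can be listed as unavailable instead of failing at import time.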
Testing#
Use the requires marker to skip tests when optional dependencies are
missing:
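import pytest

# Skip every test in this module when the optional mesh dependencies are missing.
pytestmark = pytest.mark.requires("mesh")

# Import path follows the registration example above; adjust it to your package layout.
from physicsnemo_curator.mymodule.sources.my_source import MySource
# Also import Mesh from the package that defines your mesh type.


class TestMySource:
    def test_len(self, tmp_path):
        # Create test fixtures in tmp_path...
        source = MySource(input_path=str(tmp_path))
        assert len(source) > 0

    def test_yields_correct_type(self, tmp_path):
        # Create test fixtures in tmp_path...
        source = MySource(input_path=str(tmp_path))
        item = next(source[0])
        assert isinstance(item, Mesh)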
Run tests with:
make test # All tests
make test-core # Core tests only (no optional deps)
make test-mesh # Mesh tests only
make test-unit # Unit tests
make test-integration # Integration tests
make test-e2e # End-to-end tests
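For readers curious how a marker such as requires can skip tests, the sketch below shows one way to implement a similar marker in a conftest.py using standard pytest hooks. It is not the project's implementation: it assumes each marker argument is an importable top-level module name, whereas the project's "mesh" argument may name a dependency group instead. The missing_modules helper is hypothetical.

```python
# conftest.py (illustrative sketch, not the project's implementation)
from importlib.util import find_spec


def missing_modules(names):
    """Return the top-level module names that cannot be located."""
    return [n for n in names if find_spec(n) is None]


def pytest_configure(config):
    # Register the marker so pytest does not warn about an unknown mark.
    config.addinivalue_line(
        "markers", "requires(*modules): skip unless all modules are importable"
    )


def pytest_collection_modifyitems(config, items):
    import pytest  # imported lazily so the helper above is usable on its own

    for item in items:
        for marker in item.iter_markers(name="requires"):
            missing = missing_modules(marker.args)
            if missing:
                reason = f"missing optional dependencies: {', '.join(missing)}"
                item.add_marker(pytest.mark.skip(reason=reason))
```

Handling the skip at collection time means a module-level pytestmark applies to every test in the file without per-test decorators.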
Executing Pipelines#
Once components are registered, use run_pipeline() to execute a pipeline:
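from physicsnemo_curator import run_pipeline

# Components exported by the submodule's __init__.py in the registration example.
from physicsnemo_curator.mymodule import MyFilter, MySink, MySource

pipeline = (
    MySource(input_path="./data/")
    .filter(MyFilter())
    .write(MySink(output_dir="./out/"))
)

# Sequential
results = run_pipeline(pipeline)

# Parallel: uses all CPUs with the best available backend
results = run_pipeline(pipeline, n_jobs=-1)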
See Parallel Execution for backend options and process-isolation considerations.
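As a rough mental model for the n_jobs argument, the sketch below maps n_jobs=-1 to the CPU count (the convention joblib uses) and dispatches work per item index. It is an assumption-laden illustration, not how run_pipeline is implemented: resolve_n_jobs, run_indexed, and fake_process are hypothetical names, and threads stand in for whatever backend the library selects.

```python
from __future__ import annotations

import os
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor


def resolve_n_jobs(n_jobs: int) -> int:
    """Map n_jobs=-1 to the number of CPUs; otherwise require a positive count."""
    if n_jobs == -1:
        return os.cpu_count() or 1
    if n_jobs < 1:
        raise ValueError("n_jobs must be -1 or a positive integer")
    return n_jobs


def run_indexed(
    process: Callable[[int], list[str]], n_items: int, n_jobs: int = 1
) -> list[list[str]]:
    """Process items 0..n_items-1, sequentially or on a worker pool."""
    workers = resolve_n_jobs(n_jobs)
    if workers == 1:
        return [process(i) for i in range(n_items)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in input order, matching the sequential path.
        return list(pool.map(process, range(n_items)))


def fake_process(i: int) -> list[str]:
    """Pretend to run source -> filters -> sink for item i and return written paths."""
    return [f"out/item_{i}.txt"]


sequential = run_indexed(fake_process, 4)
parallel = run_indexed(fake_process, 4, n_jobs=-1)
```

Keeping each item's work independent is what makes the two paths interchangeable; a process-based backend additionally requires that components and their results can be pickled.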