prefect

Prefect execution backend.

Uses Prefect for workflow orchestration with observability, retries, and scheduling capabilities.

Classes

PrefectBackend

Execute pipeline items using Prefect tasks.

Module Contents

class physicsnemo_curator.run.prefect.PrefectBackend

Bases: physicsnemo_curator.run.base.RunBackend

Execute pipeline items using Prefect tasks.

Prefect provides workflow orchestration with features such as:

  • Automatic retries on failure
  • Observability and logging
  • Caching of task results
  • Scheduling and triggers
  • Distributed execution with Dask or Ray
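Prefect's `retries` setting counts re-attempts, so a task configured with `retries=2` can run up to three times in total, pausing `retry_delay_seconds` between attempts. The pure-Python sketch below mimics those semantics to make them concrete. It does not use Prefect itself, and `run_with_retries` and `flaky` are hypothetical helpers.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int = 0, retry_delay_seconds: float = 0.0) -> T:
    """Call ``fn`` once, then retry up to ``retries`` more times on failure.

    Hypothetical helper that mirrors the meaning of Prefect's ``retries`` and
    ``retry_delay_seconds`` options; it is not part of Prefect or this backend.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            time.sleep(retry_delay_seconds)  # wait before the next attempt


# A task that fails twice before succeeding.
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = run_with_retries(flaky, retries=2, retry_delay_seconds=0.0)
print(result, calls["n"])  # ok 3
```

With `retries=1` the same task would fail on its second attempt and the `RuntimeError` would propagate to the caller.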

Warning

Stateful filters accumulate state separately within each task, and that per-task state is not merged back. If you need combined state across all items, design a post-hoc merge strategy.
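One way to follow this advice is to have each task return its partial state and combine the partials after the flow finishes. The sketch below assumes a hypothetical stateful filter that counts items per category; `FilterState`, `process_item`, and `merge_states` are illustrative names, not part of physicsnemo_curator, and no Prefect code is involved.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class FilterState:
    """Hypothetical per-task state: counts of items seen per category."""

    counts: Counter = field(default_factory=Counter)
    processed: int = 0


def process_item(categories: list[str]) -> FilterState:
    """Stand-in for one task: builds state local to that task only."""
    state = FilterState()
    for category in categories:
        state.counts[category] += 1
        state.processed += 1
    return state


def merge_states(states: list[FilterState]) -> FilterState:
    """Post-hoc merge: combine per-task partial states into one."""
    merged = FilterState()
    for state in states:
        merged.counts.update(state.counts)  # Counter.update adds counts together
        merged.processed += state.processed
    return merged


# Each call plays the role of one pipeline index handled by a separate task.
partials = [process_item(["mesh", "mesh"]), process_item(["mesh", "volume"])]
total = merge_states(partials)
print(dict(total.counts), total.processed)  # {'mesh': 3, 'volume': 1} 4
```

Additive statistics such as counts and sums merge this simply; order-dependent state would need a more careful strategy.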

Backend Options

task_runner : prefect.task_runners.BaseTaskRunner | None

Custom task runner (e.g., DaskTaskRunner, RayTaskRunner).

retries : int

Number of retries for failed tasks. Default: 0.

retry_delay_seconds : float

Delay between retries, in seconds. Default: 0.

timeout_seconds : float | None

Timeout for each task, in seconds.

cache_key_fn : Callable | None

Function to generate cache keys.

persist_result : bool

Whether to persist task results. Default: False.

flow_name : str

Name for the Prefect flow. Default: “run_pipeline”.
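The `cache_key_fn` option presumably feeds Prefect's task-level `cache_key_fn`, which Prefect calls with a task run context and a dict of the task's parameters and which returns a string key, or None to skip caching. A minimal sketch under that assumption; `index_cache_key` is hypothetical, and the context argument is typed loosely so the example runs without Prefect installed. Prefect also ships a ready-made key function, `prefect.tasks.task_input_hash`.

```python
import hashlib
import json
from typing import Any, Optional


def index_cache_key(context: Any, parameters: dict[str, Any]) -> Optional[str]:
    """Hypothetical cache-key function with Prefect's (context, parameters) shape.

    Returns a stable key derived from the task parameters, or None to disable
    caching when a parameter cannot be serialized to JSON.
    """
    try:
        payload = json.dumps(parameters, sort_keys=True)
    except TypeError:
        return None  # unserializable input: do not cache this run
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = index_cache_key(None, {"index": 3, "flow": "run_pipeline"})
key_b = index_cache_key(None, {"flow": "run_pipeline", "index": 3})
print(key_a == key_b)  # True: dict key order does not affect the key
```

If the real task parameters include non-serializable objects, this function returns None and those runs are simply not cached; a production key function would likely hash only the fields that identify an item.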

run(
    pipeline: physicsnemo_curator.core.base.Pipeline[Any],
    config: physicsnemo_curator.run.base.RunConfig,
) -> list[list[str]]

Execute pipeline indices using Prefect.

Parameters:
  • pipeline (Pipeline) – The pipeline to execute.

  • config (RunConfig) – Execution configuration.

Returns:

Sink outputs, one list per index.

Return type:

list[list[str]]

Raises:

ImportError – If prefect is not installed.
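The return value holds one list of sink outputs per pipeline index. The sketch below imitates that contract, assuming results are kept in index order, with a standard-library thread pool in place of Prefect. `process_index`, `run_all`, and the file names are hypothetical stand-ins for running one index through its sinks; the sketch makes no claim about how `PrefectBackend.run` submits work internally.

```python
from concurrent.futures import ThreadPoolExecutor


def process_index(index: int) -> list[str]:
    """Hypothetical stand-in: returns the paths a sink wrote for one index."""
    return [f"out/sample_{index:04d}.vtu"]


def run_all(num_indices: int) -> list[list[str]]:
    # Executor.map yields results in input order, so outputs[i] belongs to
    # index i even if the work finishes out of order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(process_index, range(num_indices)))


outputs = run_all(3)
print(outputs)
# [['out/sample_0000.vtu'], ['out/sample_0001.vtu'], ['out/sample_0002.vtu']]
```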

description: ClassVar[str] = 'Prefect workflow orchestration with observability'
name: ClassVar[str] = 'prefect'
requires: ClassVar[tuple[str, ...]] = ('prefect',)
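The `requires` attribute names the packages this backend depends on, and `run` raises ImportError when prefect is missing. A minimal sketch of checking such a tuple up front with the standard library's `importlib.util.find_spec`; `missing_requirements` is a hypothetical helper, and whether physicsnemo_curator performs this exact check is an assumption.

```python
import importlib.util


def missing_requirements(requires: tuple[str, ...]) -> list[str]:
    """Return the top-level package names in ``requires`` that cannot be imported."""
    return [name for name in requires if importlib.util.find_spec(name) is None]


missing = missing_requirements(("prefect",))
if missing:
    print(f"Install {', '.join(missing)} to use the prefect backend")
else:
    print("prefect backend dependencies are available")
```

Checking with `find_spec` locates a package without importing it, so the test stays cheap even when the dependency is installed.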