Error Handling

Energon provides configurable error handling to manage exceptions during data loading and processing. By default, errors in task encoder code are logged and the offending sample is skipped, so training can continue. Certain system exceptions that indicate programming errors always cause immediate failure.

By default, if more than 100 samples in a row raise an error, processing also fails. For TaskEncoder user functions, this limit is configurable through the failure_tolerance parameter of the @stateless decorator, for example @stateless(failure_tolerance=100).

Error Handler Configuration

Error handlers are configured via WorkerConfig:

from megatron.energon import WorkerConfig, log_exception, reraise_exception

worker_config = WorkerConfig(
    rank=0,
    world_size=1,
    num_workers=4,
    global_error_handler=log_exception,  # Default: logs and skips failed samples
    restore_error_handler=reraise_exception,  # Default: fails fast on restore
)

global_error_handler: Handles exceptions during normal iteration (training/validation). The default is log_exception, which logs details and skips the sample.

restore_error_handler: Handles exceptions during checkpoint restoration. The default is reraise_exception, which fails fast to ensure checkpoint integrity.

System Exceptions

Programming errors and critical system issues always cause immediate failure and are never passed to error handlers: SystemError, SyntaxError, ImportError, StopIteration, StopAsyncIteration, MemoryError, RecursionError, ReferenceError, NameError, UnboundLocalError, and FatalSampleError. FatalSampleError is raised automatically when the consecutive failure tolerance is exceeded or when a system exception occurs during sample processing.
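The split between system exceptions and handled exceptions can be illustrated with a plain-Python sketch. This is not Energon's implementation: iterate_with_handler and SYSTEM_EXCEPTIONS are hypothetical names, the tuple is only a subset of the list above (StopIteration and StopAsyncIteration are left out because they interact with Python generators), and the wrapping into FatalSampleError is omitted.

```python
from typing import Any, Callable, Iterable, Iterator, Optional

# Hypothetical stand-in for the exception types that are never handled.
# UnboundLocalError is covered implicitly as a subclass of NameError.
SYSTEM_EXCEPTIONS = (
    SystemError, SyntaxError, ImportError, MemoryError,
    RecursionError, ReferenceError, NameError,
)

Handler = Callable[[Exception, Any, Optional[list]], None]


def iterate_with_handler(
    samples: Iterable[Any],
    process: Callable[[Any], Any],
    handler: Handler,
) -> Iterator[Any]:
    """Yield processed samples; route ordinary errors to the handler."""
    for sample in samples:
        try:
            yield process(sample)
        except SYSTEM_EXCEPTIONS:
            # Programming errors bypass the handler and stop iteration.
            raise
        except Exception as exc:
            # Everything else goes to the handler; if it returns,
            # the sample is skipped and iteration continues.
            handler(exc, sample, None)
```

With a handler that only records the error, a failing sample is dropped from the output, while a NameError raised inside process propagates to the caller.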

Built-in Error Handlers

log_exception

Logs detailed error information and continues:

  • Exception traceback

  • Source information (dataset path, shard, index)

  • Sample details in readable format

from megatron.energon import WorkerConfig, log_exception

worker_config = WorkerConfig(
    rank=0,
    world_size=1,
    num_workers=4,
    global_error_handler=log_exception,
)

reraise_exception

Immediately reraises the exception to halt iteration:

from megatron.energon import WorkerConfig, reraise_exception

worker_config = WorkerConfig(
    rank=0,
    world_size=1,
    num_workers=4,
    global_error_handler=reraise_exception,  # Fail on any error
)

Custom Error Handlers

Implement custom error handlers with this signature:

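from typing import Any

# Assumes SourceInfo is importable from the package root, like WorkerConfig.
from megatron.energon import SourceInfo, WorkerConfig

def my_error_handler(
    exception: Exception,
    sample: Any,
    sources: list[SourceInfo] | None,
) -> None:
    # Log to your monitoring system (log_to_monitoring is a placeholder)
    log_to_monitoring(exception, sample)

    # Optionally reraise for critical errors (CriticalError is a placeholder)
    if isinstance(exception, CriticalError):
        raise exception

worker_config = WorkerConfig(
    rank=0,
    world_size=1,
    num_workers=4,
    global_error_handler=my_error_handler,
)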
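As a more concrete illustration of the handler signature, the following sketch tallies errors by type and reraises a configurable set of them. CountingErrorHandler and fatal_types are hypothetical names, the sources argument is typed loosely so the block runs without Energon installed, and whether WorkerConfig accepts a callable object rather than a plain function is an assumption.

```python
import logging
from collections import Counter
from typing import Any, Optional

logger = logging.getLogger(__name__)


class CountingErrorHandler:
    """Hypothetical handler: counts errors per type, reraises selected ones."""

    def __init__(self, fatal_types: tuple = ()) -> None:
        self.counts: Counter = Counter()
        self.fatal_types = fatal_types

    def __call__(
        self,
        exception: Exception,
        sample: Any,
        sources: Optional[list] = None,
    ) -> None:
        name = type(exception).__name__
        self.counts[name] += 1
        logger.warning("Sample failed with %s: %s", name, exception)
        # Reraising from the handler halts iteration for these types;
        # returning normally lets the sample be skipped.
        if isinstance(exception, self.fatal_types):
            raise exception
```

An instance such as CountingErrorHandler(fatal_types=(KeyError,)) could then be passed wherever a handler function is expected, and its counts attribute inspected after an epoch.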

Failure Tolerance for Task Encoder Functions

By default, if more than 100 samples in a row raise an error, processing will fail with a FatalSampleError.

For TaskEncoder methods, configure this via the @stateless decorator:

from megatron.energon import DefaultTaskEncoder, stateless

class MyTaskEncoder(DefaultTaskEncoder):
    @stateless(failure_tolerance=50)
    def encode_sample(self, sample):
        # Process sample - tolerates up to 50 consecutive failures
        # (is_corrupted is a placeholder for your own validity check)
        if sample.is_corrupted():
            raise ValueError("Corrupted sample")
        return sample

    @stateless(restore_seeds=True, failure_tolerance=200)
    def pack_selected_samples(self, samples):
        # Packing with higher tolerance and deterministic randomness
        # (pack_samples is a placeholder for your own packing logic)
        return pack_samples(samples)

Set failure_tolerance=0 to disable tolerance checking for a specific function.

Note

Tolerance limits count consecutive failures. A single successful sample resets the counter.
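The counting rule can be sketched in a few lines of plain Python. This is an illustration of the behavior described above, not Energon's code: ConsecutiveFailureCounter is a hypothetical name, and RuntimeError stands in for FatalSampleError.

```python
class ConsecutiveFailureCounter:
    """Hypothetical sketch of a consecutive-failure tolerance check."""

    def __init__(self, tolerance: int = 100) -> None:
        self.tolerance = tolerance
        self.failures = 0

    def record_success(self) -> None:
        # A single successful sample resets the counter.
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        # tolerance == 0 disables the check; otherwise fail once
        # more than `tolerance` samples in a row have failed.
        if self.tolerance > 0 and self.failures > self.tolerance:
            raise RuntimeError(
                f"{self.failures} consecutive failures exceed "
                f"tolerance of {self.tolerance}"
            )
```

With tolerance=2, two failures followed by a success and two more failures do not trip the limit; a third failure in a row does.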

Skip or Fail Explicitly

Raise SkipSample to explicitly skip a sample without logging an error:

from megatron.energon import SkipSample

def process_sample(sample):
    try:
        ...
    except MySpecificError:
        raise SkipSample()
    return sample

Raise FatalSampleError to cause immediate failure, bypassing the error handler:

from megatron.energon import FatalSampleError

def process_sample(sample):
    try:
        ...
    except MyFatalError as e:
        raise FatalSampleError.from_sample(sample, "Critical corruption detected") from e
    return sample
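The three outcomes (silent skip, handled error, immediate failure) can be contrasted in a simplified loop. The sketch below uses local stand-in classes, SkipSampleStandIn and FatalStandIn, rather than Energon's SkipSample and FatalSampleError, so it runs without the library; run_loop is likewise a hypothetical name and does not reflect Energon's internals.

```python
from typing import Any, Callable, Iterable, Optional


class SkipSampleStandIn(Exception):
    """Stand-in for SkipSample: drop the sample without reporting it."""


class FatalStandIn(Exception):
    """Stand-in for FatalSampleError: always stop processing."""


def run_loop(
    samples: Iterable[Any],
    process: Callable[[Any], Any],
    handler: Callable[[Exception, Any, Optional[list]], None],
) -> list:
    results = []
    for sample in samples:
        try:
            results.append(process(sample))
        except SkipSampleStandIn:
            # Explicit skip: the handler is not called, nothing is logged.
            continue
        except FatalStandIn:
            # Explicit failure: the handler is bypassed.
            raise
        except Exception as exc:
            # Any other error is reported to the configured handler.
            handler(exc, sample, None)
    return results
```

Only the last branch reaches the error handler, which is why an explicit skip leaves no trace in the logs.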