Source code for nvalchemi.hooks._registry
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Hook registry mixin for workflow engines."""
from __future__ import annotations
from enum import Enum
from nvalchemi.data import Batch
from nvalchemi.hooks._context import HookContext
from nvalchemi.hooks._protocol import Hook
[docs]
class HookRegistryMixin:
"""Mixin providing flat-list hook storage and dispatch.
The host class must provide a ``step_count`` attribute. Override
``_build_context`` to populate workflow-specific fields.
Set ``_stage_type`` on the host class to restrict which stage enum
types may be registered. When set, ``register_hook`` raises
:class:`TypeError` if ``hook.stage`` is not an instance of the
declared type(s).
Attributes
----------
hooks : list[Hook]
Flat list of registered hooks.
step_count : int
Current step number (must be provided by the engine).
_stage_type : type[Enum] | tuple[type[Enum], ...] | None
Accepted stage enum type(s). ``None`` disables validation.
"""
hooks: list[Hook]
step_count: int
_stage_type: type[Enum] | tuple[type[Enum], ...] | None = None
def _init_hooks(self, hooks: list[Hook] | None = None) -> None:
"""Initialize hook storage and register provided hooks.
Parameters
----------
hooks : list[Hook] | None
Optional list of hooks to register.
"""
self.hooks = []
if hooks:
for hook in hooks:
self.register_hook(hook)
def register_hook(self, hook: Hook, stage: Enum | None = None) -> None:
"""Register a hook.
Parameters
----------
hook : Hook
Hook to register.
stage : Enum | None
If provided, assigns ``hook.stage = stage`` before
validation. This allows stage-agnostic hooks (constructed
with ``stage=None``) to receive their stage at registration
time.
Raises
------
ValueError
If ``hook.frequency`` is not a positive integer.
TypeError
If ``hook.stage`` is ``None`` after assignment and the hook
does not define ``_runs_on_stage``, or if ``hook.stage`` is
not an instance of the accepted ``_stage_type`` declared on
the engine **and** the hook does not define
``_runs_on_stage`` (cross-category hooks that manage their
own stage dispatch bypass this check).
"""
if not isinstance(hook.frequency, int) or hook.frequency < 1:
raise ValueError(
f"Hook frequency must be a positive integer, got {hook.frequency}"
)
if stage is not None:
hook.stage = stage
stage_type = self._stage_type
# Hooks that define ``_runs_on_stage`` handle stage dispatch
# themselves (e.g. cross-category hooks); skip the type check.
has_custom_dispatch = getattr(hook, "_runs_on_stage", None) is not None
if hook.stage is None and not has_custom_dispatch:
raise TypeError(
f"Hook {type(hook).__name__} has no stage assigned. "
f"Pass stage= to the hook constructor or to register_hook()."
)
if (
stage_type is not None
and not has_custom_dispatch
and hook.stage is not None
and not isinstance(hook.stage, stage_type)
):
expected = (
stage_type.__name__
if isinstance(stage_type, type)
else " | ".join(t.__name__ for t in stage_type)
)
raise TypeError(
f"Hook {hook!r} has stage={hook.stage!r} "
f"(type {type(hook.stage).__name__}), but this engine "
f"only accepts {expected} stages."
)
self.hooks.append(hook)
def _build_context(self, batch: Batch) -> HookContext:
"""Build a HookContext for the current state.
Override in subclasses to populate workflow-specific fields.
Parameters
----------
batch : Batch
Current batch being processed.
Returns
-------
HookContext
Context object for hooks.
"""
return HookContext(
batch=batch,
step_count=self.step_count,
model=getattr(self, "model", None),
)
def _call_hooks(self, stage: Enum, batch: Batch) -> None:
"""Call hooks registered for the given stage, gated by frequency.
Hooks fire when ``self.step_count % hook.frequency == 0``.
Hooks that define ``_runs_on_stage`` are called when that method
returns ``True``; otherwise, the default check is
``stage == hook.stage``.
Parameters
----------
stage : Enum
Current workflow stage.
batch : Batch
Current batch being processed.
"""
ctx = self._build_context(batch)
for hook in self.hooks:
runs_on_stage = getattr(hook, "_runs_on_stage", None)
if runs_on_stage is not None:
if not runs_on_stage(stage):
continue
elif stage != hook.stage:
continue
if self.step_count % hook.frequency == 0:
hook(ctx, stage)