# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
NVRx Logger for Large-Scale LLM Training
This module provides a simple and efficient log manager that supports both
regular logging and distributed logging for large-scale training with thousands
of GPUs. The design automatically adapts based on environment configuration.
Key Design Principles:
- Environment-driven behavior: NVRX_NODE_LOCAL_TMPDIR controls distributed vs regular logging
- Per-node aggregation: When distributed logging is enabled, a separate aggregator service does log aggregation
- Dynamic rank detection: Automatically reads rank info from environment variables
- Scalable: Works with 3K+ GPUs without overwhelming logging infrastructure
- Fork-safe: All ranks use file-based messaging to ensure child processes can log
- Subprocess-safe: Supports force_reset=True for fresh logger setup in subprocesses
- Service-based aggregation: Aggregator can run as a separate service for reliable log collection
Features:
- Dual mode operation: Regular logging (stderr/stdout) or distributed logging (file aggregation)
- Per-node log files: When distributed logging is enabled (e.g., nvrx_log_<hostname>.log)
- Automatic rank and node identification in log messages
- Thread-safe logging with proper synchronization
- Environment variable configuration for easy deployment
- Fork-safe design with file-based message passing for all ranks
- Separate aggregator service: Can run independently of training processes
- Configurable temp directory: Customizable location for pending message files
Environment Variables:
NVRX_LOG_DEBUG: Set to "1", "true", "yes", or "on" to enable DEBUG level logging (default: INFO)
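NVRX_LOG_TO_STDOUT: Set to "1", "true", "yes", or "on" to log to stdout instead of stderr
NVRX_NODE_LOCAL_TMPDIR: Directory for temporary per-rank log files; when set (non-empty), node-local (distributed) logging is used instead of stderr/stdout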
NVRX_LOG_MAX_FILE_SIZE_KB: Maximum size of temporary message files in KB before rotation (default: 10240 KB = 10 MB)
NVRX_LOG_MAX_LOG_FILES: Maximum number of rotated backup files to keep per rank (default: 4)
Note: File rotation is designed to be safe for the aggregator service. When files are rotated,
the aggregator will automatically read from both current and backup files to ensure no messages are lost.
Usage:
# In main script (launcher.py)
from nvidia_resiliency_ext.shared_utils.log_manager import setup_logger
logger = setup_logger() # Call once at startup
# In other modules
import logging
from nvidia_resiliency_ext.shared_utils.log_manager import LogConfig
logger = logging.getLogger(LogConfig.name)  # LogConfig.name == "nvrx"
logger.info("Training started")
logger.debug("Debug information")
logger.error("Error occurred")
logger.warning("Warning message")
logger.critical("Critical error")
Forking Support:
The logger is designed to work safely with process forking. When using fork():
# In parent process
from nvidia_resiliency_ext.shared_utils.log_manager import setup_logger
logger = setup_logger() # Setup before forking
logger.info("Parent process logging")
# Fork child process
pid = os.fork()
if pid == 0:
# In child process - logger will work normally
import logging
from nvidia_resiliency_ext.shared_utils.log_manager import LogConfig
logger = logging.getLogger(LogConfig.name)
logger.info("Child process logging")
else:
# Parent continues normally
logger.info("Parent continues")
All ranks use file-based message passing, ensuring child processes can log
even when they don't inherit the aggregator thread from the parent.
Separate Aggregator Service, see log_aggregator.py for details.
"""
import logging
import os
import socket
import sys
from typing import Optional
from nvidia_resiliency_ext.shared_utils.log_node_local_tmp import (
DynamicLogFormatter,
NodeLocalTmpLogHandler,
)
[docs]
class LogConfig:
    """Utility class that resolves NVRx logging configuration.

    All accessors are classmethods that read an explicit argument (where one
    is accepted) or an environment variable at call time. Later changes to
    ``os.environ`` are therefore seen by subsequent calls.
    """

    # Name of the shared logger, i.e. ``logging.getLogger(LogConfig.name)``.
    name = "nvrx"
    # Prefix of the per-node log file name built by ``get_log_file``.
    _aggr_file_prefix = "nvrx_log_"
    # Lower-cased values that switch a boolean NVRX_* environment flag on.
    _TRUTHY_VALUES = ("1", "true", "yes", "on")

    @staticmethod
    def _get_optional_int_env(var_name: str) -> Optional[int]:
        """Parse environment variable ``var_name`` as an int.

        Returns None when the variable is unset or set to an empty string.

        Raises:
            ValueError: If the variable holds a non-integer value.
        """
        raw_value = os.environ.get(var_name)
        return int(raw_value) if raw_value else None

    @classmethod
    def _get_bool_env(cls, var_name: str) -> bool:
        """Return True if ``var_name`` is one of ``_TRUTHY_VALUES`` (case-insensitive)."""
        return os.environ.get(var_name, "").lower() in cls._TRUTHY_VALUES

    @classmethod
    def get_node_id(cls) -> str:
        """Return the identifier of this node, which is its hostname."""
        return socket.gethostname()

    @classmethod
    def get_log_file(cls) -> str:
        """Return the per-node log file name: ``nvrx_log_<hostname>.log``."""
        return f"{cls._aggr_file_prefix}{cls.get_node_id()}.log"

    @classmethod
    def get_node_local_tmp_dir(cls, node_local_tmp_dir: Optional[str] = None) -> Optional[str]:
        """Return the directory for pending per-rank message files, or None.

        Args:
            node_local_tmp_dir: Explicit directory. It takes precedence over the
                ``NVRX_NODE_LOCAL_TMPDIR`` environment variable.

        Returns:
            The directory path. Returns None when neither source provides a
            non-empty value, which means regular stderr/stdout logging.
        """
        # The trailing ``or None`` maps an empty NVRX_NODE_LOCAL_TMPDIR="" to
        # None. Otherwise "" would count as "enabled" (callers test
        # ``is not None``) and later fail in os.makedirs("").
        return node_local_tmp_dir or os.environ.get("NVRX_NODE_LOCAL_TMPDIR") or None

    @classmethod
    def get_max_file_size(cls, file_size_kb: Optional[int] = None) -> int:
        """Return the maximum size of a per-rank message file, in bytes.

        Args:
            file_size_kb: Size limit in KB. If None, the value is read from
                ``NVRX_LOG_MAX_FILE_SIZE_KB``. An unset or empty variable
                means 10240 KB (10 MB).
        """
        if file_size_kb is None:
            file_size_kb = int(os.environ.get("NVRX_LOG_MAX_FILE_SIZE_KB") or "10240")
        return file_size_kb * 1024  # KB -> bytes

    @classmethod
    def get_max_log_files(cls, max_log_files: Optional[int] = None) -> int:
        """Return how many rotated backup files to keep per rank.

        Args:
            max_log_files: Explicit value. If None, the value is read from
                ``NVRX_LOG_MAX_LOG_FILES``. An unset or empty variable means 4.
        """
        if max_log_files is None:
            max_log_files = int(os.environ.get("NVRX_LOG_MAX_LOG_FILES") or "4")
        return max_log_files

    @classmethod
    def get_workload_rank(cls) -> Optional[int]:
        """Return the workload rank from ``RANK``, or None if it is unset or empty."""
        return cls._get_optional_int_env("RANK")

    @classmethod
    def get_workload_local_rank(cls) -> Optional[int]:
        """Return the workload local rank from ``LOCAL_RANK``, or None if it is unset or empty."""
        return cls._get_optional_int_env("LOCAL_RANK")

    @classmethod
    def get_infra_rank(cls) -> Optional[int]:
        """Return the infrastructure rank from ``SLURM_PROCID``, or None if it is unset or empty."""
        return cls._get_optional_int_env("SLURM_PROCID")

    @classmethod
    def get_infra_local_rank(cls) -> Optional[int]:
        """Return the infrastructure local rank from ``SLURM_LOCALID``, or None if it is unset or empty."""
        return cls._get_optional_int_env("SLURM_LOCALID")

    @classmethod
    def get_log_level(cls) -> int:
        """Return ``logging.DEBUG`` if ``NVRX_LOG_DEBUG`` is truthy, else ``logging.INFO``."""
        return logging.DEBUG if cls._get_bool_env("NVRX_LOG_DEBUG") else logging.INFO

    @classmethod
    def get_process_name(cls, proc_name: Optional[str] = None) -> str:
        """Return ``proc_name`` if given, otherwise the current process id as a string."""
        return proc_name if proc_name is not None else str(os.getpid())

    @classmethod
    def get_log_to_stdout_cfg(cls) -> bool:
        """Return True if ``NVRX_LOG_TO_STDOUT`` requests logging to stdout instead of stderr."""
        return cls._get_bool_env("NVRX_LOG_TO_STDOUT")
[docs]
class LogManager:
    """
    Log manager for large-scale LLM training.

    Configures the shared ``LogConfig.name`` ("nvrx") logger in one of two modes:

    - Node-local temporary logging, enabled when a tmp dir is passed in or
      ``NVRX_NODE_LOCAL_TMPDIR`` is set. Records go to a
      ``NodeLocalTmpLogHandler`` configured with that directory and the
      file-size / backup-count limits from ``LogConfig``. Collecting those
      files into a per-node log is the job of a separate aggregator service
      (see log_aggregator.py), not of this class.
    - Regular logging: records go to stderr, or to stdout when
      ``NVRX_LOG_TO_STDOUT`` is enabled.

    Rank information (RANK, LOCAL_RANK, SLURM_PROCID, SLURM_LOCALID) is read
    once at construction and handed to the formatter, which prints it on
    every record together with the hostname.
    """

    def __init__(
        self,
        node_local_tmp_dir: Optional[str] = None,
        node_local_tmp_prefix: Optional[str] = None,
    ):
        """
        Args:
            node_local_tmp_dir: Directory for node-local temporary log files.
                If None, ``NVRX_NODE_LOCAL_TMPDIR`` is used. If that is also
                unset, regular stderr/stdout logging is configured.
            node_local_tmp_prefix: Prefix passed to the node-local handler.
                Defaults to the current process id.
        """
        self._node_local_tmp_prefix = LogConfig.get_process_name(node_local_tmp_prefix)
        # Rank info is read once here; later changes to the environment are not seen.
        self._workload_rank = LogConfig.get_workload_rank()
        self._workload_local_rank = LogConfig.get_workload_local_rank()
        self._infra_rank = LogConfig.get_infra_rank()
        self._infra_local_rank = LogConfig.get_infra_local_rank()
        # DEBUG if NVRX_LOG_DEBUG is truthy, INFO otherwise.
        self.log_level = LogConfig.get_log_level()
        self._node_local_tmp_dir = LogConfig.get_node_local_tmp_dir(node_local_tmp_dir)
        # Rotation limits for the node-local message files (size in bytes).
        self._max_msg_file_size = LogConfig.get_max_file_size()
        self._max_backup_files = LogConfig.get_max_log_files()
        self._log_to_stdout = LogConfig.get_log_to_stdout_cfg()
        self._logger = self._setup_logger()

    @property
    def node_local_tmp_logging_enabled(self) -> bool:
        """Check if node local temporary logging is enabled."""
        return self._node_local_tmp_dir is not None

    @property
    def workload_rank(self) -> Optional[int]:
        """Get the workload rank (from RANK env var)."""
        return self._workload_rank

    @property
    def workload_local_rank(self) -> Optional[int]:
        """Get the workload local rank (from LOCAL_RANK env var)."""
        return self._workload_local_rank

    @property
    def infra_rank(self) -> Optional[int]:
        """Get the infrastructure rank (from SLURM_PROCID env var)."""
        return self._infra_rank

    @property
    def infra_local_rank(self) -> Optional[int]:
        """Get the infrastructure local rank (from SLURM_LOCALID env var)."""
        return self._infra_local_rank

    def _create_handler(self) -> logging.Handler:
        """Create the output handler for the active mode (node-local file or stream)."""
        if self.node_local_tmp_logging_enabled:
            os.makedirs(self._node_local_tmp_dir, exist_ok=True)
            return NodeLocalTmpLogHandler(
                self.workload_local_rank,
                self._node_local_tmp_dir,
                self._max_msg_file_size,
                self._max_backup_files,
                self._node_local_tmp_prefix,
            )
        return logging.StreamHandler(sys.stdout if self._log_to_stdout else sys.stderr)

    def _create_formatter(self) -> "DynamicLogFormatter":
        """Create a formatter that puts the hostname and rank info on every record."""
        # The hostname is fixed, so it is baked into the format string once.
        # The rank fields stay as %-placeholders that the formatter fills in.
        fmt = (
            "%(asctime)s [%(levelname)s] "
            "[{node}] "
            "[workload:%(workload_rank)s(%(workload_local_rank)s) "
            "infra:%(infra_rank)s(%(infra_local_rank)s)] "
            "%(filename)s:%(lineno)d %(message)s"
        ).format(node=LogConfig.get_node_id())
        return DynamicLogFormatter(
            self.workload_rank,
            self.workload_local_rank,
            self.infra_rank,
            self.infra_local_rank,
            fmt=fmt,
        )

    def _setup_logger(self) -> logging.Logger:
        """Configure the shared "nvrx" logger, replacing any handlers it already has."""
        logger = logging.getLogger(LogConfig.name)
        logger.setLevel(self.log_level)

        # Remove *and close* the previous handlers. Dropping them without
        # close() leaks their resources (e.g. open files) on every reset.
        for old_handler in logger.handlers[:]:
            logger.removeHandler(old_handler)
            old_handler.close()

        handler = self._create_handler()
        handler.setFormatter(self._create_formatter())
        handler.setLevel(self.log_level)
        logger.addHandler(handler)
        # Stop records from also reaching ancestor (e.g. root) handlers.
        logger.propagate = False
        return logger

    @property
    def logger(self) -> logging.Logger:
        """Get the configured logger instance.

        This gives direct access to the underlying ``logging.Logger``, so all
        standard methods work: debug, info, warning, error, critical.
        """
        return self._logger
[docs]
def setup_logger(
    node_local_tmp_dir=None,
    force_reset=False,
    node_local_tmp_prefix: Optional[str] = None,
) -> logging.Logger:
    """
    Set up the shared "nvrx" logger.

    This configures the standard Python logger ``LogConfig.name`` ("nvrx").
    It is safe to call more than once: if the logger already has handlers it
    is returned unchanged, unless ``force_reset=True``. The intended use is
    one call at program start, after which the logger acts as a singleton
    (it's fetched elsewhere with ``logging.getLogger(LogConfig.name)``).

    The mode depends on the configured temp directory. If
    ``node_local_tmp_dir`` or ``NVRX_NODE_LOCAL_TMPDIR`` is set, records go
    to node-local temporary files for a separate aggregator to collect.
    Otherwise they go directly to stderr/stdout.

    Args:
        node_local_tmp_dir: Optional directory for temporary files. If None,
            the ``NVRX_NODE_LOCAL_TMPDIR`` env var is used.
        force_reset: If True, reconfigure even if the logger is already
            configured. Useful for subprocesses that need a fresh setup.
        node_local_tmp_prefix: Optional prefix for the node-local handler's
            files. Defaults to the current process id.

    Returns:
        logging.Logger: The configured logger instance.

    Example:
        # In main script (launcher.py) or training subprocess
        from nvidia_resiliency_ext.shared_utils.log_manager import setup_logger
        logger = setup_logger()
        # In subprocesses that need fresh logger setup
        logger = setup_logger(force_reset=True)
        # In other modules
        import logging
        from nvidia_resiliency_ext.shared_utils.log_manager import LogConfig
        logger = logging.getLogger(LogConfig.name)
        logger.info("Some message")
    """
    logger = logging.getLogger(LogConfig.name)

    # Already configured and no reset requested: reuse it as-is.
    if logger.handlers and not force_reset:
        return logger

    if force_reset:
        # Close removed handlers so their resources (e.g. open files) are freed.
        for old_handler in logger.handlers[:]:
            logger.removeHandler(old_handler)
            old_handler.close()
        # Drop the previously stored LogManager; a fresh one is created below.
        if hasattr(setup_logger, "_log_manager"):
            delattr(setup_logger, "_log_manager")

    log_manager = LogManager(
        node_local_tmp_dir=node_local_tmp_dir,
        node_local_tmp_prefix=node_local_tmp_prefix,
    )
    # Keep a reference to the LogManager on the function object so it is not
    # garbage-collected while the program runs.
    setup_logger._log_manager = log_manager
    return log_manager.logger