# sdp.processors.manage_files.convert_to_tarred_audio_dataset

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import json
from dataclasses import dataclass
from typing import Optional
from copy import deepcopy
from tqdm import tqdm
import shutil

from sdp.processors.base_processor import BaseProcessor
from sdp.processors.manage_files.utils.convert_to_tarred_audio_dataset import create_tar_datasets

from sdp.logging import logger

@dataclass
class ConvertToTarredAudioDatasetConfig:
    """
    Configuration for :class:`ConvertToTarredAudioDataset`.

    An instance of this dataclass is built from the processor's extra keyword
    arguments and then expanded (via ``vars``) into keyword arguments for the
    ``create_tar_datasets`` utility.

    Attributes:
        max_duration (float): Maximum allowed duration for audio samples. Required
            (no default); also used as the upper bound when splitting into buckets.
        min_duration (Optional[float]): Minimum allowed duration for audio samples.
            ``None`` means no explicit lower bound.
        concat_manifest_paths (Optional[str]): Path to a manifest file containing multiple manifest paths to concatenate.
        target_dir (Optional[str]): Output directory to save tarred dataset.
        metadata_path (Optional[str]): Path to write metadata about the tarred dataset.
        num_shards (int): Number of shards to create. If -1, it will be determined automatically.
        shuffle (bool): Whether to shuffle the input manifest before processing.
        keep_files_together (bool): If True, all segments from the same source file are kept in the same shard.
        sort_in_shards (bool): If True, samples inside each shard will be sorted by duration.
        buckets_num (int): Number of duration-based buckets to split data into.
            Values > 1 enable the bucketing code path in the processor.
        dynamic_buckets_num (int): Number of dynamic buckets for load balancing.
        shuffle_seed (Optional[int]): Random seed used for shuffling.
        write_metadata (bool): Whether to write metadata JSON files during processing.
        no_shard_manifests (bool): If True, disables writing per-shard manifest files.
        force_codec (Optional[str]): Audio codec to use when re-encoding audio files.
        workers (int): Number of worker processes for parallel audio re-encoding.
        slice_with_offset (bool): If True, audio slices will use offset and duration fields.
        only_manifests (bool): If True, only manifests will be generated without audio re-encoding.
    """
    max_duration: float
    min_duration: Optional[float] = None
    concat_manifest_paths: Optional[str] = None
    target_dir: Optional[str] = None
    metadata_path: Optional[str] = None
    num_shards: int = -1
    shuffle: bool = False
    keep_files_together: bool = False
    sort_in_shards: bool = False
    buckets_num: int = 1
    dynamic_buckets_num: int = 30
    shuffle_seed: Optional[int] = None
    write_metadata: bool = False
    no_shard_manifests: bool = False
    force_codec: Optional[str] = None
    workers: int = 1
    slice_with_offset: bool = False
    only_manifests: bool = False


class ConvertToTarredAudioDataset(BaseProcessor):
    """
    A processor for converting audio manifests into tarred audio datasets.

    This processor optionally splits data into duration-based buckets, and calls
    the ``create_tar_datasets`` utility to convert and shard audio data into tar
    files, with accompanying manifest files.

    Args:
        output_manifest_file (str): Path to the final output manifest.
        input_manifest_file (Optional[str]): Path to the input manifest to be tarred.
        **cfg_kwargs: Additional keyword arguments passed to
            :class:`ConvertToTarredAudioDatasetConfig`.

    Returns:
        Writes a tarred and sharded audio dataset to disk.

        - The dataset consists of multiple ``.tar`` archives with audio files.
        - A final manifest (JSON lines format) is written to
          ``output_manifest_file``, referencing each sample, its path inside the
          tar, and other metadata.
        - If ``buckets_num > 1``, each sample will include an additional
          ``bucket_id`` field.

    .. note::
        If ``buckets_num > 1``, the input manifest is split into multiple
        duration buckets, and each bucket is processed independently. A
        ``bucket_id`` is added to each sample.

        You may need to install the extra dependencies of Lhotse and NeMo for
        this processor to work correctly:
        ``pip install lhotse "nemo-toolkit[common]"``
    """

    def __init__(
        self,
        output_manifest_file: str,
        input_manifest_file: Optional[str] = None,
        **cfg_kwargs,
    ):
        super().__init__(
            input_manifest_file=input_manifest_file,
            output_manifest_file=output_manifest_file,
        )
        # All remaining keyword arguments configure the tarring utility.
        self.cfg = ConvertToTarredAudioDatasetConfig(**cfg_kwargs)

    def process(self):
        """Create the tarred dataset, with or without duration bucketing.

        With ``buckets_num > 1`` the duration range [min_duration, max_duration]
        is split into equal-width buckets, each tarred independently into
        ``<target_dir>/bucket<i>``, and the per-bucket manifests are merged
        (with a ``bucket_id`` field) into ``output_manifest_file``. Otherwise a
        single tarred dataset is created and its manifest copied to
        ``output_manifest_file``.
        """
        if self.cfg.buckets_num > 1:
            # min_duration may legitimately be None (its default, meaning "no
            # lower bound"); treat it as 0.0 so bucket boundaries can be
            # computed instead of raising TypeError on the subtraction below.
            min_duration = 0.0 if self.cfg.min_duration is None else self.cfg.min_duration
            bucket_length = (self.cfg.max_duration - min_duration) / float(self.cfg.buckets_num)

            with open(self.output_manifest_file, 'w', encoding='utf8') as fout:
                for i_bucket in range(self.cfg.buckets_num):
                    # Derive a per-bucket config covering one duration slice.
                    bucket_config = deepcopy(self.cfg)
                    bucket_config.min_duration = min_duration + i_bucket * bucket_length
                    bucket_config.max_duration = bucket_config.min_duration + bucket_length
                    if i_bucket == self.cfg.buckets_num - 1:
                        # Nudge the upper bound so samples lying exactly at
                        # max_duration land in the final bucket.
                        bucket_config.max_duration += 1e-5
                    bucket_config.target_dir = os.path.join(self.cfg.target_dir, f"bucket{i_bucket+1}")

                    logger.info(f"Creating bucket {i_bucket+1} with min_duration={bucket_config.min_duration} and max_duration={bucket_config.max_duration} ...")
                    logger.info(f"Results are being saved at: {bucket_config.target_dir}.")

                    # Create the tarred dataset for the current bucket.
                    create_tar_datasets(
                        manifest_path=self.input_manifest_file,
                        **vars(bucket_config),
                    )

                    # Merge this bucket's manifest into the final output,
                    # tagging each entry with its bucket index.
                    bucket_manifest_path = os.path.join(bucket_config.target_dir, 'tarred_audio_manifest.json')
                    with open(bucket_manifest_path, 'r', encoding='utf8') as bin_f:
                        for line in tqdm(bin_f, desc="Writing output manifest.."):
                            entry = json.loads(line)
                            entry['bucket_id'] = i_bucket
                            json.dump(entry, fout, ensure_ascii=False)
                            fout.write('\n')

                    logger.info(f"Bucket {i_bucket+1} is created.")
        else:
            # No bucketing — create a single tarred dataset.
            create_tar_datasets(
                manifest_path=self.input_manifest_file,
                **vars(self.cfg),
            )

            # Copy the generated manifest to the requested output location.
            tarred_audio_manifest = os.path.join(self.cfg.target_dir, 'tarred_audio_manifest.json')
            shutil.copy(tarred_audio_manifest, self.output_manifest_file)