Source code for sdp.processors.datasets.earnings.create_initial_manifest

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import json
import os
import re
from pathlib import Path
from typing import List, Dict, Any, Optional

import pandas as pd
import librosa

from sdp.logging import logger
from sdp.processors.base_processor import BaseParallelProcessor, BaseProcessor, DataEntry
from sdp.utils.common import extract_archive


# Step 1: Create Initial Audio and Manifest (Full Audio)
class CreateInitialAudioAndManifest(BaseParallelProcessor):
    """Create initial audio manifest from Earnings21/22 dataset files.

    This processor creates the initial manifest for Earnings21/22 datasets by
    discovering audio files and creating manifest entries with duration
    information. Audio format conversion should be handled by a separate
    FfmpegConvert processor in the pipeline.

    Args:
        dataset_root (str): Path to the root directory of the dataset.
        raw_audio_source_dir (str): Path to the directory containing raw audio files.
        output_manifest_file (str): Path where the output manifest will be saved.
        dataset_type (str): Type of dataset ("earnings21" or "earnings22").
            Defaults to "earnings21".
        subset (str): Dataset subset ("full" or "eval10" for earnings21 only).
            Defaults to "full".
        test_mode (bool): If True, process only 2 files for testing. Defaults to False.

    Returns:
        Manifest entries with audio_filepath, duration, text (placeholder), and
        file_id fields. Use the FfmpegConvert processor afterwards for audio
        format standardization.

    Example:
        .. code-block:: yaml

            - _target_: sdp.processors.datasets.earnings.CreateInitialAudioAndManifest
              dataset_root: /path/to/earnings21
              raw_audio_source_dir: ${dataset_root}/media
              output_manifest_file: ${output_dir}/01_initial_manifest.json
              dataset_type: earnings21
              subset: full
              test_mode: false
    """

    def __init__(
        self,
        dataset_root: str,
        raw_audio_source_dir: str,
        output_manifest_file: str,
        dataset_type: str = "earnings21",  # "earnings21" or "earnings22"
        subset: str = "full",
        test_mode: bool = False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.dataset_root = Path(dataset_root)
        self.raw_audio_source_dir = Path(raw_audio_source_dir)
        self.output_manifest_file = output_manifest_file
        self.dataset_type = dataset_type
        self.subset = subset
        self.test_mode = test_mode

    def prepare(self):
        """Prepare the processor by loading file metadata."""
        # Extract archives if needed (earnings21 only)
        if self.dataset_type == "earnings21":
            for archive_name in ["transcripts.tar.gz", "audio.tar.gz"]:
                archive_path = self.dataset_root / archive_name
                if archive_path.exists():
                    extract_archive(str(archive_path), str(self.dataset_root))

        # Load file list based on dataset type and subset
        if self.dataset_type == "earnings21":
            if self.subset == "eval10":
                metadata_file = self.dataset_root / "eval10-file-metadata.csv"
            else:
                metadata_file = self.dataset_root / "earnings21-file-metadata.csv"
        else:  # earnings22
            metadata_file = self.dataset_root / "metadata.csv"

        # If the metadata file doesn't exist, discover files from the audio directory
        if not metadata_file.exists():
            logger.warning(f"Metadata file not found: {metadata_file}. Discovering files from audio directory.")
            file_ids = []
            for ext in ['*.mp3', '*.wav']:
                file_ids.extend([f.stem for f in self.raw_audio_source_dir.glob(ext)])
            self.file_ids = file_ids
        else:
            file_metadata_df = pd.read_csv(metadata_file)
            # Handle different column names between earnings21 and earnings22
            if 'file_id' in file_metadata_df.columns:
                self.file_ids = file_metadata_df['file_id'].astype(str).tolist()
            elif 'File ID' in file_metadata_df.columns:
                self.file_ids = file_metadata_df['File ID'].astype(str).tolist()
            else:
                raise ValueError(f"Neither 'file_id' nor 'File ID' column found in {metadata_file}")

        if self.test_mode:
            self.file_ids = self.file_ids[:2]

        logger.info(f"Loaded {len(self.file_ids)} file IDs for {self.dataset_type} subset {self.subset}.")

    def read_manifest(self):
        """Read and process all files to create manifest entries."""
        return self.file_ids

    def process_dataset_entry(self, file_id: str) -> List[DataEntry]:
        """Process a single file to create a full audio manifest entry."""
        file_id = str(file_id)

        # Find the audio file
        audio_file = None
        for ext in ['.mp3', '.wav']:
            potential_path = self.raw_audio_source_dir / f"{file_id}{ext}"
            if potential_path.exists():
                audio_file = potential_path
                break

        if not audio_file:
            logger.warning(f"Audio file not found for {file_id}")
            return []

        try:
            # Get audio duration from the original audio file
            duration = librosa.get_duration(path=str(audio_file))

            # Create the manifest entry
            entry_data = {
                "audio_filepath": str(audio_file),
                "duration": duration,
                "text": "",  # Placeholder text
                "file_id": file_id,
            }
            return [DataEntry(data=entry_data)]
        except Exception as e:
            logger.error(f"Error processing audio file {file_id}: {e}")
            return []

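# A minimal sketch of one manifest line this processor emits (the file ID and
# numbers are illustrative, not taken from the dataset):
# {"audio_filepath": "/data/earnings21/media/4320211.mp3", "duration": 3125.4,
#  "text": "", "file_id": "4320211"}
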
# Step 2: Populate Full Text for Manifest
class CreateFullAudioManifestEarnings21(BaseParallelProcessor):
    """Add ground truth text from NLP token files to audio manifest.

    This processor reconstructs the complete transcribed text for each audio
    file by reading the corresponding NLP token files and combining tokens
    with proper spacing and punctuation. It preserves the original punctuation
    and capitalization from the dataset.

    Args:
        input_manifest_file (str): Path to the input manifest file.
        dataset_root (str): Path to the root directory of the dataset.
        output_manifest_file (str): Path where the output manifest will be saved.
        dataset_type (str): Type of dataset ("earnings21" or "earnings22").
            Defaults to "earnings21".
        preserve_punctuation (bool): Whether to preserve punctuation marks. Defaults to True.
        preserve_capitalization (bool): Whether to preserve original capitalization. Defaults to True.

    Returns:
        Manifest entries with the original fields plus a populated text field
        containing the complete reconstructed transcript for each audio file.

    Example:
        .. code-block:: yaml

            - _target_: sdp.processors.datasets.earnings.CreateFullAudioManifestEarnings21
              input_manifest_file: ${output_dir}/01_initial_manifest.json
              dataset_root: /path/to/earnings21
              output_manifest_file: ${output_dir}/02_manifest_with_text.json
              dataset_type: earnings21
              preserve_punctuation: true
              preserve_capitalization: true
    """

    def __init__(
        self,
        input_manifest_file: str,
        dataset_root: str,
        output_manifest_file: str,
        dataset_type: str = "earnings21",  # "earnings21" or "earnings22"
        preserve_punctuation: bool = True,
        preserve_capitalization: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.input_manifest_file = input_manifest_file
        self.dataset_root = Path(dataset_root)
        self.output_manifest_file = output_manifest_file
        self.dataset_type = dataset_type
        self.preserve_punctuation = preserve_punctuation
        self.preserve_capitalization = preserve_capitalization

    def _get_nlp_file_path(self, file_id: str) -> Path:
        """Get NLP file path based on dataset type."""
        if self.dataset_type == "earnings21":
            return self.dataset_root / "transcripts" / "nlp_references" / f"{file_id}.nlp"
        else:  # earnings22
            # Check both possible locations for earnings22
            nlp_path1 = self.dataset_root / "transcripts" / "nlp_references" / f"{file_id}.nlp"
            nlp_path2 = self.dataset_root / "subset10" / "nonverbatim_transcripts" / "nlp_references" / f"{file_id}.nlp"
            if nlp_path1.exists():
                return nlp_path1
            elif nlp_path2.exists():
                return nlp_path2
            else:
                return nlp_path1  # Return first path for error reporting

    def _load_nlp_file(self, file_id: str) -> List[Dict[str, Any]]:
        """Load NLP file containing tokens and metadata."""
        nlp_file = self._get_nlp_file_path(file_id)
        if not nlp_file.exists():
            logger.warning(f"NLP file not found: {nlp_file}")
            return []

        tokens_list = []
        try:
            with open(nlp_file, 'r', encoding='utf-8') as f:
                reader = csv.reader(f, delimiter='|')
                try:
                    header = next(reader)
                except StopIteration:
                    logger.warning(f"NLP file {nlp_file} is empty or has no header.")
                    return []

                for i, row_values in enumerate(reader):
                    if len(row_values) == len(header):
                        token_data = dict(zip(header, row_values))
                        # Parse 'tags' and 'wer_tags' fields if they are string representations of lists
                        for key_to_parse in ['tags', 'wer_tags']:
                            if key_to_parse in token_data:
                                field_value = token_data[key_to_parse]
                                if isinstance(field_value, str):
                                    try:
                                        token_data[key_to_parse] = json.loads(field_value)
                                    except json.JSONDecodeError:
                                        if field_value and field_value != "[]":
                                            logger.debug(f"Field '{key_to_parse}' in {nlp_file} non-JSON: {field_value}")
                        tokens_list.append(token_data)
                    else:
                        logger.warning(f"Skipping malformed row in {nlp_file} (row {i+2})")
            return tokens_list
        except Exception as e:
            logger.error(f"Error processing NLP file {nlp_file}: {e}")
            return []

    def _reconstruct_text(self, tokens: List[Dict[str, Any]]) -> str:
        """Reconstruct text from tokens with proper spacing and punctuation."""
        if not tokens:
            return ""

        text_parts = []
        for token in tokens:
            token_text = token.get('token', '').strip()
            if not token_text:
                continue
            text_parts.append(token_text)
            # Add punctuation if preserving and it exists
            if self.preserve_punctuation and token.get('punctuation'):
                text_parts.append(token.get('punctuation'))

        # Join with spaces and clean up punctuation spacing
        text = " ".join(text_parts)
        if self.preserve_punctuation:
            # Remove spaces before common punctuation marks
            text = re.sub(r'\s+([,.!?;:])', r'\1', text)
        if not self.preserve_capitalization:
            text = text.lower()
        # Final cleanup of multiple spaces
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def process_dataset_entry(self, data_entry: Dict[str, Any]) -> List[DataEntry]:
        """Process a single manifest entry to add full text."""
        file_id = data_entry['file_id']
        tokens = self._load_nlp_file(file_id)
        if not tokens:
            logger.warning(f"No NLP tokens for {file_id}, text will be empty.")
            data_entry['text'] = data_entry.get('text', '')
        else:
            data_entry['text'] = self._reconstruct_text(tokens)
        return [DataEntry(data=data_entry)]

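# Illustrative excerpt of a pipe-delimited .nlp reference file as consumed by
# _load_nlp_file above. The column set shown here is an assumption based on
# the earnings21 release and may differ between dataset versions:
# token|speaker|ts|endTs|punctuation|case|tags|wer_tags
# good|1|||||[]|[]
# morning|1|||.|LC|[]|[]
# With preserve_punctuation=True, _reconstruct_text joins these tokens into
# "good morning ." and the regex cleanup removes the space before the period,
# yielding "good morning."
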
# Step 3: Create Speaker-level Segmented Manifest (renamed from CreateFinalSegmentedManifest)
class SpeakerSegmentedManifest(BaseParallelProcessor):
    """Create speaker-level segments based on speaker changes in NLP files.

    This processor creates segments where each segment corresponds to
    continuous speech from a single speaker. It reads NLP token files to
    detect speaker changes and creates separate manifest entries for each
    speaker segment without timing calculations.

    Args:
        input_manifest_file (str): Path to the input manifest file.
        dataset_root (str): Path to the root directory of the dataset.
        output_manifest_file (str): Path where the output manifest will be saved.
        dataset_type (str): Type of dataset ("earnings21" or "earnings22").
            Defaults to "earnings21".
        preserve_punctuation (bool): Whether to preserve punctuation marks. Defaults to True.
        preserve_capitalization (bool): Whether to preserve original capitalization. Defaults to True.
        include_speaker_info (bool): Whether to include speaker information. Defaults to True.
        include_tags (bool): Whether to include entity tags (earnings21 only). Defaults to False.
        use_speaker_metadata_csv (bool): Whether to use speaker metadata CSV for name mapping.
            Defaults to False.

    Returns:
        Manifest entries segmented by speaker with audio_filepath, duration
        (set to 0), text, file_id, segment_id, and optionally speaker and tags fields.

    Example:
        .. code-block:: yaml

            - _target_: sdp.processors.datasets.earnings.SpeakerSegmentedManifest
              input_manifest_file: ${output_dir}/02_manifest_with_text.json
              dataset_root: /path/to/earnings21
              output_manifest_file: ${output_dir}/06_speaker_segments.json
              dataset_type: earnings21
              include_speaker_info: true
              include_tags: false
    """

    def __init__(
        self,
        input_manifest_file: str,
        dataset_root: str,
        output_manifest_file: str,
        dataset_type: str = "earnings21",  # "earnings21" or "earnings22"
        preserve_punctuation: bool = True,
        preserve_capitalization: bool = True,
        include_speaker_info: bool = True,
        include_tags: bool = False,
        use_speaker_metadata_csv: bool = False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.input_manifest_file = input_manifest_file
        self.dataset_root = Path(dataset_root)
        self.output_manifest_file = output_manifest_file
        self.dataset_type = dataset_type
        self.preserve_punctuation = preserve_punctuation
        self.preserve_capitalization = preserve_capitalization
        self.include_speaker_info = include_speaker_info
        self.include_tags = include_tags
        self.use_speaker_metadata_csv = use_speaker_metadata_csv
        self.speaker_name_map = {}

    def prepare(self):
        """Prepare the processor by loading speaker metadata if needed."""
        # Load speaker metadata if requested (earnings21 only)
        if self.use_speaker_metadata_csv and self.dataset_type == "earnings21":
            self._load_speaker_metadata()

    def _load_speaker_metadata(self):
        """Load speaker metadata for earnings21."""
        metadata_file = self.dataset_root / "speaker-metadata.csv"
        if not metadata_file.exists():
            logger.warning(f"Speaker metadata file not found: {metadata_file}")
            return
        try:
            df = pd.read_csv(metadata_file)
            for _, row in df.iterrows():
                file_id_key = str(row['file_id'])
                speaker_id_key = str(row['speaker_id'])
                if file_id_key not in self.speaker_name_map:
                    self.speaker_name_map[file_id_key] = {}
                self.speaker_name_map[file_id_key][speaker_id_key] = row['speaker_name']
            logger.info(f"Loaded speaker metadata from {metadata_file}")
        except Exception as e:
            logger.error(f"Error loading speaker metadata {metadata_file}: {e}")

    def _get_nlp_file_path(self, file_id: str) -> Path:
        """Get NLP file path based on dataset type."""
        if self.dataset_type == "earnings21":
            return self.dataset_root / "transcripts" / "nlp_references" / f"{file_id}.nlp"
        else:  # earnings22
            # Check both possible locations for earnings22
            nlp_path1 = self.dataset_root / "transcripts" / "nlp_references" / f"{file_id}.nlp"
            nlp_path2 = self.dataset_root / "subset10" / "nonverbatim_transcripts" / "nlp_references" / f"{file_id}.nlp"
            if nlp_path1.exists():
                return nlp_path1
            elif nlp_path2.exists():
                return nlp_path2
            else:
                return nlp_path1  # Return first path for error reporting

    def _load_nlp_file(self, file_id: str) -> List[Dict[str, Any]]:
        """Load NLP file containing tokens and metadata."""
        nlp_file = self._get_nlp_file_path(file_id)
        if not nlp_file.exists():
            logger.warning(f"NLP file not found: {nlp_file}")
            return []

        tokens_list = []
        try:
            with open(nlp_file, 'r', encoding='utf-8') as f:
                reader = csv.reader(f, delimiter='|')
                try:
                    header = next(reader)
                except StopIteration:
                    logger.warning(f"NLP file {nlp_file} is empty or has no header.")
                    return []

                for i, row_values in enumerate(reader):
                    if len(row_values) == len(header):
                        token_data = dict(zip(header, row_values))
                        # Parse 'tags' and 'wer_tags' fields if they are string representations of lists
                        for key_to_parse in ['tags', 'wer_tags']:
                            if key_to_parse in token_data:
                                field_value = token_data[key_to_parse]
                                if isinstance(field_value, str):
                                    try:
                                        token_data[key_to_parse] = json.loads(field_value)
                                    except json.JSONDecodeError:
                                        if field_value and field_value != "[]":
                                            logger.debug(f"Field '{key_to_parse}' in {nlp_file} non-JSON: {field_value}")
                        tokens_list.append(token_data)
                    else:
                        logger.warning(f"Skipping malformed row in {nlp_file} (row {i+2})")
            return tokens_list
        except Exception as e:
            logger.error(f"Error processing NLP file {nlp_file}: {e}")
            return []

    def _load_entity_tags(self, file_id: str) -> Dict[str, Dict[str, str]]:
        """Load entity tags file (earnings21 only)."""
        if self.dataset_type != "earnings21":
            return {}
        tags_file = self.dataset_root / "transcripts" / "tags" / f"{file_id}.tags.json"
        if not tags_file.exists():
            return {}
        try:
            with open(tags_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.warning(f"Error loading tags file {tags_file}: {e}")
            return {}

    def _reconstruct_text(self, tokens: List[Dict[str, Any]]) -> str:
        """Reconstruct text from tokens with proper spacing and punctuation."""
        if not tokens:
            return ""

        text_parts = []
        for token in tokens:
            token_text = token.get('token', '').strip()
            if not token_text:
                continue
            text_parts.append(token_text)
            # Add punctuation if preserving and it exists
            if self.preserve_punctuation and token.get('punctuation'):
                text_parts.append(token.get('punctuation'))

        # Join with spaces and clean up punctuation spacing
        text = " ".join(text_parts)
        if self.preserve_punctuation:
            # Remove spaces before common punctuation marks
            text = re.sub(r'\s+([,.!?;:])', r'\1', text)
        if not self.preserve_capitalization:
            text = text.lower()
        # Final cleanup of multiple spaces
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def _create_segments(self, tokens: List[Dict[str, Any]], file_id: str) -> List[Dict[str, Any]]:
        """Create segments based on speaker changes."""
        if not tokens:
            return []

        segments = []
        current_segment_tokens = []
        current_speaker_id = tokens[0].get('speaker', 'unknown_speaker_0') if tokens else 'unknown_speaker_0'

        for token in tokens:
            token_speaker_id = token.get('speaker', current_speaker_id)
            # Check for speaker change
            if token_speaker_id != current_speaker_id and current_segment_tokens:
                # Finalize the current segment
                segment_text = self._reconstruct_text(current_segment_tokens)
                if segment_text.strip():
                    segments.append({
                        'tokens': current_segment_tokens,
                        'text': segment_text,
                        'speaker_id': current_speaker_id,
                        'file_id': file_id,
                    })
                # Start a new segment
                current_segment_tokens = [token]
                current_speaker_id = token_speaker_id
            else:
                current_segment_tokens.append(token)

        # Handle the last segment
        if current_segment_tokens:
            segment_text = self._reconstruct_text(current_segment_tokens)
            if segment_text.strip():
                segments.append({
                    'tokens': current_segment_tokens,
                    'text': segment_text,
                    'speaker_id': current_speaker_id,
                    'file_id': file_id,
                })

        return segments

    def process_dataset_entry(self, full_audio_manifest_entry: Dict[str, Any]) -> List[DataEntry]:
        """Process a single full audio manifest entry to create segmented entries."""
        file_id = full_audio_manifest_entry['file_id']
        audio_filepath = full_audio_manifest_entry['audio_filepath']

        logger.info(f"Processing file {file_id} for segmentation")

        # Load NLP tokens
        tokens = self._load_nlp_file(file_id)
        if not tokens:
            logger.warning(f"No NLP tokens for {file_id}, cannot create segments.")
            return []

        # Load entity tags if requested
        entity_tags = self._load_entity_tags(file_id) if self.include_tags else {}

        # Create segments
        segments = self._create_segments(tokens, file_id)
        logger.info(f"Created {len(segments)} segments for file {file_id}")

        # Create manifest entries
        output_entries = []
        for idx, segment_dict in enumerate(segments):
            segment_text = segment_dict['text']
            speaker_id = segment_dict['speaker_id']

            # Create the manifest entry
            manifest_entry_data = {
                "audio_filepath": audio_filepath,  # Point to the original audio file
                "duration": 0,  # Set to 0 instead of None to avoid TypeError in base processor
                "text": segment_text,
                "file_id": file_id,
                "segment_id": idx,
                "start_time": None,  # No timing information
                "end_time": None,  # No timing information
            }

            # Add speaker information
            if self.include_speaker_info:
                speaker_name = speaker_id  # Default to ID
                if (self.use_speaker_metadata_csv
                        and file_id in self.speaker_name_map
                        and speaker_id in self.speaker_name_map[file_id]):
                    speaker_name = self.speaker_name_map[file_id][speaker_id]
                manifest_entry_data["speaker"] = speaker_name

            # Add tags if requested
            if self.include_tags:
                segment_tags = []
                segment_entities = []
                # Extract basic tags from tokens
                for token in segment_dict.get('tokens', []):
                    if token.get('tags') and str(token['tags']).strip():
                        tag_val = str(token['tags']).strip()
                        tag_type = tag_val.split(':', 1)[1].strip() if ':' in tag_val else tag_val
                        if tag_type and tag_type not in segment_tags:
                            segment_tags.append(tag_type)
                manifest_entry_data["tags"] = segment_tags
                manifest_entry_data["entities"] = segment_entities

            output_entries.append(DataEntry(data=manifest_entry_data))

        logger.info(f"Successfully processed {len(output_entries)} segments for file {file_id}")
        return output_entries

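# A sketch of one speaker-level entry produced above (illustrative values;
# timing is deliberately absent at this stage, so duration is 0 and the
# start_time/end_time fields are None):
# {"audio_filepath": "/data/earnings21/media/4320211.mp3", "duration": 0,
#  "text": "Good morning, everyone.", "file_id": "4320211", "segment_id": 0,
#  "start_time": None, "end_time": None, "speaker": "1"}
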
# Step 5: Create Sentence-level Segmented Manifest based on CTM files
class CreateSentenceSegmentedManifest(BaseParallelProcessor):
    """Create sentence-level segments from word-level CTM alignment files.

    This processor reads CTM (Conversation Time Mark) files generated by
    forced alignment and creates sentence-level segments based on punctuation
    patterns. It intelligently segments on sentence-ending punctuation while
    excluding abbreviations and numbers.

    Args:
        input_manifest_file (str): Path to the input manifest file.
        ctm_dir (str): Path to the directory containing CTM files with word-level alignments.
        output_manifest_file (str): Path where the output manifest will be saved.

    Returns:
        Manifest entries with sentence-level segments containing
        audio_filepath, duration (calculated from CTM), text, file_id,
        segment_id, offset, end_time, and alignment fields with word-level
        timing information.

    Example:
        .. code-block:: yaml

            - _target_: sdp.processors.datasets.earnings.CreateSentenceSegmentedManifest
              input_manifest_file: ${output_dir}/04_aligned_manifest.json
              ctm_dir: ${output_dir}/forced_alignment_output/ctm/words
              output_manifest_file: ${output_dir}/05_sentence_segments.json
    """

    def __init__(
        self,
        input_manifest_file: str,
        ctm_dir: str,
        output_manifest_file: str,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.input_manifest_file = input_manifest_file
        self.ctm_dir = Path(ctm_dir)
        self.output_manifest_file = output_manifest_file

    def _parse_ctm_file(self, ctm_path: str) -> List[Dict[str, Any]]:
        """Parse CTM file to extract word alignments."""
        alignments = []
        try:
            with open(ctm_path, 'r') as f:
                for line in f:
                    parts = line.strip().split()
                    if len(parts) >= 5:
                        # CTM format: utt_id channel start_time duration word
                        utt_id = parts[0]
                        channel = parts[1]
                        start_time = float(parts[2])
                        duration = float(parts[3])
                        word = parts[4]
                        end_time = start_time + duration
                        alignments.append({
                            'word': word,
                            'start': round(start_time, 3),
                            'end': round(end_time, 3),
                            'utt_id': utt_id,
                            'channel': channel
                        })
        except Exception as e:
            logger.error(f"Error parsing CTM file {ctm_path}: {e}")
        return alignments

    def _is_sentence_end(self, word: str, next_word: str = None) -> bool:
        """
        Check if a word marks the end of a sentence.

        Rules:
        - Word ends with !, ?, or .
        - Exclude numbers like 42.12 (. within numbers)
        - Exclude common abbreviations like Ms., Mr., Dr., etc.
        - Next word should start with a capital letter (if available)
        """
        if not word:
            return False

        # Check if the word ends with sentence-ending punctuation
        if not word.endswith(('.', '!', '?')):
            return False

        # Handle exclamation and question marks - these are always sentence endings
        if word.endswith(('!', '?')):
            return True

        # For words ending with '.', do additional checks
        if word.endswith('.'):
            # Remove the final '.' and check if what remains is a number
            word_without_dot = word[:-1]
            try:
                # If it's a pure number, it's likely part of a decimal
                float(word_without_dot)
                return False
            except ValueError:
                # Not a number, continue with other checks
                pass

            # Check for common abbreviations (case-insensitive)
            common_abbreviations = {
                'mr', 'mrs', 'ms', 'dr', 'prof', 'sr', 'jr', 'vs', 'etc', 'inc', 'corp',
                'ltd', 'co', 'st', 'ave', 'blvd', 'rd', 'ln', 'ct', 'pl', 'sq', 'ft',
                'in', 'cm', 'mm', 'kg', 'lb', 'oz', 'pt', 'qt', 'gal', 'mph', 'rpm',
                'vol', 'no', 'pg', 'pp', 'ch', 'sec', 'min', 'hr', 'hrs', 'am', 'pm',
                'est', 'pst', 'cst', 'mst', 'utc', 'gmt', 'jan', 'feb', 'mar', 'apr',
                'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec', 'mon', 'tue',
                'wed', 'thu', 'fri', 'sat', 'sun', 'dept', 'div', 'mgr', 'dir', 'pres',
                'vp', 'ceo', 'cfo', 'cto', 'coo', 'evp', 'svp', 'avp'
            }
            if word_without_dot.lower() in common_abbreviations:
                return False

            # If we have a next word, check if it starts with a capital letter
            if next_word:
                return next_word[0].isupper()

            # If there is no next word, assume it's a sentence end
            return True

    def _create_sentence_segments(self, alignments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Create sentence-level segments from word alignments."""
        if not alignments:
            return []

        segments = []
        current_segment_words = []

        for i, alignment in enumerate(alignments):
            current_segment_words.append(alignment)

            # Check if this word ends a sentence
            next_word = alignments[i + 1]['word'] if i + 1 < len(alignments) else None
            if self._is_sentence_end(alignment['word'], next_word):
                # Create a segment from the current words
                if current_segment_words:
                    segment_text = ' '.join([w['word'] for w in current_segment_words])
                    segment_start = current_segment_words[0]['start']
                    segment_end = current_segment_words[-1]['end']
                    segment_duration = round(segment_end - segment_start, 3)
                    segments.append({
                        'text': segment_text,
                        'start_time': segment_start,
                        'end_time': segment_end,
                        'duration': segment_duration,
                        'alignment': current_segment_words.copy()
                    })
                    current_segment_words = []

        # Handle any remaining words
        if current_segment_words:
            segment_text = ' '.join([w['word'] for w in current_segment_words])
            segment_start = current_segment_words[0]['start']
            segment_end = current_segment_words[-1]['end']
            segment_duration = round(segment_end - segment_start, 3)
            segments.append({
                'text': segment_text,
                'start_time': segment_start,
                'end_time': segment_end,
                'duration': segment_duration,
                'alignment': current_segment_words.copy()
            })

        return segments

    def process_dataset_entry(self, aligned_manifest_entry: Dict[str, Any]) -> List[DataEntry]:
        """Process a single aligned manifest entry to create sentence-level segments."""
        file_id = aligned_manifest_entry['file_id']
        audio_filepath = aligned_manifest_entry['audio_filepath']

        # Find the corresponding CTM file
        ctm_file = self.ctm_dir / f"{file_id}.ctm"
        if not ctm_file.exists():
            logger.warning(f"CTM file not found: {ctm_file}")
            return []

        # Parse the CTM file
        alignments = self._parse_ctm_file(str(ctm_file))
        if not alignments:
            logger.warning(f"No alignments found in CTM file: {ctm_file}")
            return []

        # Create sentence segments
        segments = self._create_sentence_segments(alignments)
        logger.info(f"Created {len(segments)} sentence segments for file {file_id}")

        # Create manifest entries
        output_entries = []
        for idx, segment in enumerate(segments):
            manifest_entry_data = {
                "audio_filepath": audio_filepath,
                "duration": segment['duration'],
                "text": segment['text'],
                "file_id": file_id,
                "segment_id": idx,
                "offset": segment['start_time'],  # Use offset instead of start_time
                "end_time": segment['end_time'],
                "alignment": segment['alignment']
            }
            output_entries.append(DataEntry(data=manifest_entry_data))

        logger.info(f"Successfully processed {len(output_entries)} sentence segments for file {file_id}")
        return output_entries

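# Illustrative CTM input (format: utt_id channel start duration word) and the
# sentence segment it collapses into; the utterance ID and timings below are
# hypothetical:
# 4320211 1 0.08 0.32 Good
# 4320211 1 0.40 0.51 morning.
# -> {"text": "Good morning.", "offset": 0.08, "end_time": 0.91,
#     "duration": 0.83, "segment_id": 0, "alignment": [...], ...}
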
# Step 4: Forced alignment with NeMo (produces the word-level CTM files consumed in Step 5)
class NeMoForcedAligner(BaseProcessor):
    """Apply NeMo Forced Aligner to generate word-level timing alignments.

    This processor uses NeMo's forced alignment capabilities to generate
    precise word-level timing information by aligning ground truth text with
    audio files. It produces CTM files containing word-level timestamps and
    updates the manifest with alignment information.

    Args:
        input_manifest_file (str): Path to the input manifest file.
        output_manifest_file (str): Path where the output manifest will be saved.
        output_dir (str): Directory where CTM files and other outputs will be saved.
        pretrained_name (str): Name or path of the NeMo ASR model to use for alignment.
        device (str): Device for computation ("cuda" or "cpu"). Defaults to "cuda".
        nemo_path (str): Optional path to NeMo installation directory.

    Returns:
        Manifest entries with an added alignment field containing word-level
        timing information and an updated duration based on alignment results.

    Example:
        .. code-block:: yaml

            - _target_: sdp.processors.datasets.earnings.NeMoForcedAligner
              input_manifest_file: ${output_dir}/03_cleaned_manifest.json
              output_manifest_file: ${output_dir}/04_aligned_manifest.json
              output_dir: ${output_dir}/forced_alignment_output
              pretrained_name: nvidia/parakeet-tdt_ctc-1.1b
              device: cuda
              batch_size: 1
    """

    def __init__(
        self,
        input_manifest_file: str,
        output_manifest_file: str,
        output_dir: str,
        pretrained_name: str = "/disk7/projects/models/small-parakeet/oci-N-1_G-8_config-parakeet-wav2vec-600m-am-fl-mc-mm-yt-yo_En-d0.5-rnnt_ctc-quality_LR-1e-4_wup-0_ts-2500.nemo",
        device: str = "cuda",
        nemo_path: str = None,
        **kwargs,
    ):
        super().__init__(output_manifest_file=output_manifest_file, input_manifest_file=input_manifest_file, **kwargs)
        self.output_dir = Path(output_dir)
        self.pretrained_name = pretrained_name
        self.device = device
        self.nemo_path = nemo_path

        # Create the output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def process(self):
        """Process the manifest using the NeMo Forced Aligner script."""
        import subprocess
        import json

        try:
            # Find the NeMo forced aligner script
            if self.nemo_path:
                align_script = Path(self.nemo_path) / "tools" / "nemo_forced_aligner" / "align.py"
            else:
                # Try to find a NeMo installation
                try:
                    import nemo
                    nemo_dir = Path(nemo.__file__).parent.parent
                    align_script = nemo_dir / "tools" / "nemo_forced_aligner" / "align.py"
                except ImportError:
                    raise ImportError("NeMo not found. Please install NeMo or specify nemo_path.")

            if not align_script.exists():
                raise FileNotFoundError(f"NeMo Forced Aligner script not found at {align_script}")

            logger.info(f"Using NeMo Forced Aligner script at: {align_script}")

            # Prepare the manifest for forced alignment
            input_manifest = []
            with open(self.input_manifest_file, 'r') as f:
                for line in f:
                    if line.strip():
                        input_manifest.append(json.loads(line))

            # Create a temporary manifest with absolute paths
            temp_manifest_path = self.output_dir / "temp_manifest_for_alignment.json"
            with open(temp_manifest_path, 'w') as f:
                for entry in input_manifest:
                    if entry.get('text', '').strip():  # Only process entries with text
                        # Ensure an absolute path
                        audio_path = Path(entry['audio_filepath'])
                        if not audio_path.is_absolute():
                            audio_path = audio_path.resolve()
                        alignment_entry = {
                            "audio_filepath": str(audio_path),
                            "text": entry['text'].strip()
                        }
                        f.write(json.dumps(alignment_entry) + '\n')

            # Run the NeMo Forced Aligner.
            # Determine whether to use pretrained_name or model_path
            if self.pretrained_name.endswith('.nemo'):
                # Local model file path - use model_path
                model_param = f"model_path={self.pretrained_name}"
            else:
                # Pretrained model name - use pretrained_name
                model_param = f"pretrained_name={self.pretrained_name}"

            cmd = [
                "python", str(align_script),
                model_param,
                f"manifest_filepath={temp_manifest_path}",
                f"output_dir={self.output_dir}",
                f"transcribe_device={self.device}",
                f"viterbi_device={self.device}",
                "batch_size=1",
                'save_output_file_formats=["ctm"]'
            ]

            logger.info(f"Running NeMo Forced Aligner: {' '.join(cmd)}")
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            logger.info("NeMo Forced Aligner completed successfully")

            # Process the output and merge with the original manifest
            output_manifest_path = self.output_dir / f"{temp_manifest_path.stem}_with_output_file_paths.json"

            if output_manifest_path.exists():
                # Load alignment results
                alignment_results = []
                with open(output_manifest_path, 'r') as f:
                    for line in f:
                        if line.strip():
                            alignment_results.append(json.loads(line))

                # Create a mapping from audio filepath to alignment results
                alignment_map = {}
                for result in alignment_results:
                    audio_path = result['audio_filepath']
                    alignment_map[audio_path] = result

                # Merge alignments with the original manifest
                output_entries = []
                for entry in input_manifest:
                    output_entry = entry.copy()
                    if entry.get('text', '').strip():
                        # Find the corresponding alignment
                        audio_path = str(Path(entry['audio_filepath']).resolve())
                        if audio_path in alignment_map:
                            alignment_result = alignment_map[audio_path]
                            # Load the word-level CTM file if available
                            if 'word_level_ctm_filepath' in alignment_result:
                                ctm_path = alignment_result['word_level_ctm_filepath']
                                word_alignments = self._parse_ctm_file(ctm_path)
                                output_entry['alignment'] = word_alignments
                                # Calculate duration from alignments
                                if word_alignments:
                                    output_entry['duration'] = round(
                                        word_alignments[-1]['end'] - word_alignments[0]['start'], 3
                                    )
                                else:
                                    output_entry['duration'] = 0.0
                            else:
                                output_entry['alignment'] = []
                                output_entry['duration'] = 0.0
                        else:
                            output_entry['alignment'] = []
                            output_entry['duration'] = 0.0
                    else:
                        output_entry['alignment'] = []
                        output_entry['duration'] = 0.0
                    output_entries.append(output_entry)

                # Save the final manifest
                with open(self.output_manifest_file, 'w') as f:
                    for entry in output_entries:
                        f.write(json.dumps(entry) + '\n')

                logger.info(f"Saved aligned manifest to {self.output_manifest_file}")

                # Clean up temporary files
                temp_manifest_path.unlink(missing_ok=True)
            else:
                logger.error(f"Expected output file not found: {output_manifest_path}")
                raise FileNotFoundError(f"NeMo Forced Aligner did not produce expected output")

        except subprocess.CalledProcessError as e:
            logger.error(f"NeMo Forced Aligner failed: {e}")
            logger.error(f"stdout: {e.stdout}")
            logger.error(f"stderr: {e.stderr}")
            raise
        except Exception as e:
            logger.error(f"Error in forced alignment: {e}")
            raise

    def _parse_ctm_file(self, ctm_path: str) -> List[Dict[str, Any]]:
        """Parse CTM file to extract word alignments."""
        alignments = []
        try:
            with open(ctm_path, 'r') as f:
                for line in f:
                    parts = line.strip().split()
                    if len(parts) >= 5:
                        # CTM format: utt_id channel start_time duration word
                        start_time = float(parts[2])
                        duration = float(parts[3])
                        word = parts[4]
                        end_time = start_time + duration
                        alignments.append({
                            'word': word,
                            'start': round(start_time, 3),
                            'end': round(end_time, 3)
                        })
        except Exception as e:
            logger.error(f"Error parsing CTM file {ctm_path}: {e}")
        return alignments
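
# For reference, a sketch of the align.py invocation assembled in process()
# above (placeholder paths; actual values come from the processor config):
# python <NEMO_ROOT>/tools/nemo_forced_aligner/align.py \
#     pretrained_name=nvidia/parakeet-tdt_ctc-1.1b \
#     manifest_filepath=<output_dir>/temp_manifest_for_alignment.json \
#     output_dir=<output_dir> transcribe_device=cuda viterbi_device=cuda \
#     batch_size=1 'save_output_file_formats=["ctm"]'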