Source code for sdp.processors.datasets.masc.create_initial_manifest

# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging
from pathlib import Path
import pandas as pd
from sox import Transformer

from sdp.processors.base_processor import BaseParallelProcessor, DataEntry
from sdp.utils.common import extract_archive

[docs] class CreateInitialManifestMASC(BaseParallelProcessor): """ Processor for creating initial manifest for Massive Arabic Speech Corpus (MASC). \n Dataset link: https://ieee-dataport.org/open-access/masc-massive-arabic-speech-corpus. Prior to calling processor download the tarred dataset and store it under `raw_dataset_dir/masc.tar.gz`. Creates manifest from samples in . `dataset_dir/subsets/data_split.csv`. All meta information is kept. Args: raw_data_dir (str): The root directory of the dataset. extract_archive_dir (str): Directory where the extracted data will be saved. resampled_audios_dir (str): Directory where the resampled audio will be saved. data_split (str): Dataset split type. already_extracted (bool): If True, we will not try to extract the raw data. Defaults to False. target_samplerate (int): Sample rate (Hz) to use for resampling. Defaults to 16000. target_nchannels (int): Number of channels to create during resampling process. Defaults to 1. output_manifest_sample_id_key (str): The field name to store sample ID. Defaults to "sample_id". output_manifest_vtt_filapath_key (str): The field name to store vtt file path. Defaults to "vtt_filepath". output_manifest_audio_filapath_key (str): The field name to store audio file path. Defaults to "audio_filepath". verbose (bool): Set to True for more detailed logging. **kwargs: Additional keyword arguments to be passed to the base class `BaseParallelProcessor`. Returns: This processor generates an initial manifest file with the following fields:: { "sample_id": <sample ID> "audio_filepath": <path to the audio file>, "vtt_filepath": <path to the vtt file>, "category": <video category>, "video_duration": <video duration>, "channel_id": <video channel ID>, "country": <video country>, "dialect": <speaker dialect>, "gender": <speaker gender>, "transcript_duration": <transcript duration>, } """ def __init__( self, raw_data_dir: str, data_split: str, extract_archive_dir: str, resampled_audios_dir: str, already_extracted: bool = False, target_samplerate: int = 16000, target_nchannels: int = 1, output_manifest_sample_id_key: str = "sample_id", output_manifest_vtt_filapath_key: str = "vtt_filepath", output_manifest_audio_filapath_key: str = "audio_filepath", verbose: bool = False, **kwargs, ): super().__init__(**kwargs) self.raw_dataset_dir = Path(raw_data_dir) self.data_split = data_split # in original dataset there are no train, dev and test splits. These are added to support end-to-end tests. if self.data_split == "train": self.data_split = "clean_train" if self.data_split == "dev": self.data_split = "clean_dev" if self.data_split == "test": self.data_split = "clean_test" self.extract_archive_dir = extract_archive_dir self.resampled_audios_dir = Path(resampled_audios_dir) self.already_extracted = already_extracted self.target_samplerate = target_samplerate self.target_nchannels = target_nchannels self.output_manifest_sample_id_key = output_manifest_sample_id_key self.output_manifest_vtt_filapath_key = output_manifest_vtt_filapath_key self.output_manifest_audio_filapath_key = output_manifest_audio_filapath_key self.verbose = verbose data_split_values = ["train", "dev", "test", "clean_train", "clean_dev", "clean_test", "noisy_train", "noisy_dev", "noisy_test"] if self.data_split not in data_split_values: raise ValueError(f'Data split value must be from {data_split_values}. "{self.data_split}" was given.') def prepare(self): # Extracting data (unless already done). if not self.already_extracted: tar_gz_filepath = Path(str(self.raw_dataset_dir)) / "masc.tar.gz" if not tar_gz_filepath.exists: raise RuntimeError( f"Did not find any file matching {tar_gz_filepath}. " "For MASC dataset we cannot automatically download the data, so " "make sure to get the data from https://ieee-dataport.org/open-access/masc-massive-arabic-speech-corpus" "and put it in the 'raw_dataset_dir' folder." ) self.dataset_dir = Path(extract_archive(tar_gz_filepath, self.extract_archive_dir)) else: logging.info("Skipping dataset untarring...") self.dataset_dir = Path(self.extract_archive_dir) / "masc" self.vtts_dir = self.dataset_dir / "subtitles" self.audios_dir = self.dataset_dir / "audios" if self.data_split == "clean_train" or self.data_split == "noisy_train": self.csv_filepath = self.dataset_dir / "subsets" / f"{self.data_split}.csv" else: self.csv_filepath = self.dataset_dir / "subsets" / f"{self.data_split}_meta.csv" if not self.csv_filepath.exists(): raise FileNotFoundError(f"{self.csv_filepath} not found.") if not self.vtts_dir.exists(): raise FileNotFoundError(f"{self.vtts_dir} not found.") if not self.audios_dir.exists(): raise FileNotFoundError(f"{self.audios_dir} not found.") os.makedirs(self.resampled_audios_dir, exist_ok=True) def read_manifest(self): csv = pd.read_csv(self.csv_filepath) return [row.to_dict() for _, row in csv.iterrows()] def process_dataset_entry(self, sample_data): sample_id = sample_data["video_id"] source_audio_filepath = self.audios_dir / f"{sample_id}.wav" target_audio_filepath = self.resampled_audios_dir / f"{sample_id}.wav" vtt_filepath = self.vtts_dir / f"{sample_id}.ar.vtt" # if source audio or vtt file do not exist skip if not (os.path.exists(source_audio_filepath) and os.path.exists(vtt_filepath)): return [] # if target audio exists skip resampling if not os.path.exists(target_audio_filepath): tfm = Transformer() tfm.rate(samplerate=self.target_samplerate) tfm.channels(n_channels=self.target_nchannels) tfm.build(input_filepath=source_audio_filepath, output_filepath=target_audio_filepath) elif self.verbose: logging.info(f"{target_audio_filepath} already exists. Skipping resampling") sample_data.pop("video_id") sample_data.update( { self.output_manifest_sample_id_key: sample_id, self.output_manifest_vtt_filapath_key: str(vtt_filepath), self.output_manifest_audio_filapath_key: str(target_audio_filepath), } ) return [DataEntry(data=sample_data)]