# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Union
import pandas as pd
from tqdm import tqdm
from sdp.logging import logger
from sdp.processors.base_processor import (
BaseParallelProcessor,
BaseProcessor,
DataEntry,
LegacyParallelProcessor,
)
from sdp.utils.common import load_manifest
class Subprocess(BaseProcessor):
"""
Processor for handling subprocess execution with additional features for managing input and output manifests.
Args:
cmd (str): The command to be executed as a subprocess.
input_manifest_arg (str, optional): The argument specifying the input manifest. Defaults to an empty string.
output_manifest_arg (str, optional): The argument specifying the output manifest. Defaults to an empty string.
arg_separator (str, optional): The separator used between argument and value. Defaults to "=".
**kwargs: Additional keyword arguments to be passed to the base class.
Example:
_target_: sdp.processors.datasets.commoncrawl.Subprocess
output_manifest_file: /workspace/manifest.json
input_manifest_arg: "--manifest"
output_manifest_arg: "--output_filename"
arg_separator: "="
cmd: "python /workspace/NeMo-text-processing/nemo_text_processing/text_normalization/normalize_with_audio.py \
--language=en --n_jobs=-1 --batch_size=600 --manifest_text_field=text --cache_dir=${workspace_dir}/cache --overwrite_cache \
--whitelist=/workspace/NeMo-text-processing/nemo_text_processing/text_normalization/en/data/whitelist/asr_with_pc.tsv"
"""
def __init__(
self,
cmd: str,
input_manifest_arg: str = "",
output_manifest_arg: str = "",
arg_separator: str = "=",
**kwargs,
):
super().__init__(**kwargs)
self.input_manifest_arg = input_manifest_arg
self.output_manifest_arg = output_manifest_arg
self.arg_separator = arg_separator
self.cmd = cmd
def process(self):
os.makedirs(os.path.dirname(self.output_manifest_file), exist_ok=True)
        if self.input_manifest_file in self.cmd or self.output_manifest_file in self.cmd:
            logger.error(
                f"input_manifest_file {self.input_manifest_file} and output_manifest_file "
                f"{self.output_manifest_file} should be excluded from the cmd line!"
            )
            raise ValueError("cmd must not reference the input or output manifest files directly")
process_args = [x for x in self.cmd.split(" ") if x]
if self.arg_separator == " ":
if self.input_manifest_arg:
process_args.extend([self.input_manifest_arg, self.input_manifest_file])
if self.output_manifest_arg:
process_args.extend([self.output_manifest_arg, self.output_manifest_file])
else:
if self.input_manifest_arg:
process_args.extend([self.input_manifest_arg + self.arg_separator + self.input_manifest_file])
if self.output_manifest_arg:
process_args.extend([self.output_manifest_arg + self.arg_separator + self.output_manifest_file])
subprocess.run(" ".join(process_args), shell=True)
class CombineSources(BaseParallelProcessor):
"""Can be used to create a single field from two alternative sources.
E.g.::
_target_: sdp.processors.CombineSources
sources:
- field: text_pc
origin_label: original
- field: text_pc_pred
origin_label: synthetic
- field: text
origin_label: no_pc
target: text
will populate the ``text`` field with data from ``text_pc`` field if it's
present and not equal to ``n/a`` (can be customized). If ``text_pc`` is
    not available, it will populate ``text`` from the ``text_pc_pred`` field,
    following the same rules. If neither is available, it will fall back to
    the ``text`` field itself. In all cases it will record which source was
    used in the ``text_origin`` field, using the label from the corresponding
    ``origin_label``. If none of the sources are available, it will populate
    both the target and the origin fields with ``n/a``.
Args:
sources (list[dict]): list of the sources to use in order of preference.
Each element in the list should be in the following format::
{
field: <which field to take the data from>
                origin_label: <what to write in the "<target>_origin" field>
}
target (str): target field that we are populating.
na_indicator (str): if any source field has text equal to the
``na_indicator`` it will be considered as not available. If none
of the sources are present, this will also be used as the value
for the target and origin fields. Defaults to ``n/a``.
Returns:
The same data as in the input manifest enhanced with the following fields::
<target>: <populated with data from either <source1> or <source2> \
or with <na_indicator> if none are available>
<target>_origin: <label that marks where the data came from>
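    For example, with the config above an input entry (values made up for
    illustration)::

        {"text_pc": "n/a", "text_pc_pred": "Hello, world.", "text": "hello world"}

    would produce an output entry with ``text`` set to ``"Hello, world."`` and
    ``text_origin`` set to ``"synthetic"``, since ``text_pc`` is marked as not
    available and ``text_pc_pred`` is the next source in order of preference.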
"""
def __init__(
self,
sources: List[Dict[str, str]],
target: str,
na_indicator: str = "n/a",
**kwargs,
):
super().__init__(**kwargs)
self.sources = sources
self.target = target
self.na_indicator = na_indicator
def process_dataset_entry(self, data_entry: Dict):
for source_dict in self.sources:
if data_entry.get(source_dict["field"], self.na_indicator) != self.na_indicator:
data_entry[self.target] = data_entry[source_dict["field"]]
data_entry[f"{self.target}_origin"] = source_dict["origin_label"]
break # breaking out on the first present label
else: # going here if no break was triggered
data_entry[self.target] = self.na_indicator
data_entry[f"{self.target}_origin"] = self.na_indicator
return [DataEntry(data=data_entry)]
class AddConstantFields(BaseParallelProcessor):
"""
This processor adds constant fields to all manifest entries using Dask BaseParallelProcessor.
It is useful when you want to attach fixed information (e.g., a language label or metadata)
to each entry for downstream tasks such as language identification model training.
Args:
fields (dict): A dictionary containing key-value pairs of fields to add to each manifest entry.
For example::
{
"label": "en",
"metadata": "mcv-11.0-2022-09-21"
}
Returns:
dict: The same data as in the input manifest with the added constant fields as specified in
the ``fields`` dictionary.
Example:
.. code-block:: yaml
- _target_: sdp.processors.modify_manifest.common.AddConstantFields
input_manifest_file: ${workspace_dir}/input_manifest.json
output_manifest_file: ${workspace_dir}/output_manifest.json
fields:
label: "en"
metadata: "mcv-11.0-2022-09-21"
"""
def __init__(self, fields: Dict, **kwargs):
super().__init__(**kwargs)
self.fields = fields
def process_dataset_entry(self, data_entry: Dict):
data_entry.update(self.fields)
return [DataEntry(data=data_entry)]
class DuplicateFields(BaseParallelProcessor):
"""This processor duplicates fields in all manifest entries.
    It is useful when you want to do downstream processing of a variant
    of the entry, e.g. make a copy of "text" called "text_no_pc" and
    remove punctuation from "text_no_pc" in downstream processors.
Args:
duplicate_fields (dict): dictionary where keys are the original
fields to be copied and their values are the new names of
the duplicate fields.
Returns:
The same data as in the input manifest with duplicated fields
as specified in the ``duplicate_fields`` input dictionary.
Example:
.. code-block:: yaml
- _target_: sdp.processors.modify_manifest.common.DuplicateFields
input_manifest_file: ${workspace_dir}/test1.json
output_manifest_file: ${workspace_dir}/test2.json
duplicate_fields: {"text":"answer"}
"""
def __init__(
self,
duplicate_fields: Dict,
**kwargs,
):
super().__init__(**kwargs)
self.duplicate_fields = duplicate_fields
def process_dataset_entry(self, data_entry: Dict):
for field_src, field_tgt in self.duplicate_fields.items():
            if field_src not in data_entry:
raise ValueError(f"Expected field {field_src} in data_entry {data_entry} but there isn't one.")
data_entry[field_tgt] = data_entry[field_src]
return [DataEntry(data=data_entry)]
class RenameFields(BaseParallelProcessor):
"""This processor renames fields in all manifest entries.
Args:
        rename_fields (dict): dictionary where keys are the fields to be
            renamed and their values are the new names of the fields.
Returns:
The same data as in the input manifest with renamed fields
as specified in the ``rename_fields`` input dictionary.
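    Example:
        A minimal illustrative config (the file paths and the new field name
        are placeholders):

        .. code-block:: yaml

            - _target_: sdp.processors.modify_manifest.common.RenameFields
              input_manifest_file: ${workspace_dir}/input_manifest.json
              output_manifest_file: ${workspace_dir}/output_manifest.json
              rename_fields: {"text": "transcription"}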
"""
def __init__(
self,
rename_fields: Dict,
**kwargs,
):
super().__init__(**kwargs)
self.rename_fields = rename_fields
def process_dataset_entry(self, data_entry: Dict):
for field_src, field_tgt in self.rename_fields.items():
            if field_src not in data_entry:
raise ValueError(f"Expected field {field_src} in data_entry {data_entry} but there isn't one.")
data_entry[field_tgt] = data_entry[field_src]
del data_entry[field_src]
return [DataEntry(data=data_entry)]
class SplitOnFixedDuration(BaseParallelProcessor):
"""This processor splits audio into a fixed length segments.
It does not actually create different audio files, but simply adds
corresponding ``offset`` and ``duration`` fields. These fields can
be automatically processed by NeMo to split audio on the fly during
training.
Args:
segment_duration (float): fixed desired duration of each segment.
        drop_last (bool): whether to drop the last segment if the total duration
            is not divisible by the desired segment duration. If False, the last
            segment is kept and will be shorter than ``segment_duration``.
            Defaults to True.
drop_text (bool): whether to drop text from entries as it is most likely
inaccurate after the split on duration. Defaults to True.
Returns:
The same data as in the input manifest but all audio that's longer
than the ``segment_duration`` will be duplicated multiple times with
        additional ``offset`` and ``duration`` fields. If ``drop_text=True``,
        the ``text`` field is also dropped from all entries.
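    Example:
        A minimal illustrative config (the paths and segment duration are
        placeholders):

        .. code-block:: yaml

            - _target_: sdp.processors.modify_manifest.common.SplitOnFixedDuration
              input_manifest_file: ${workspace_dir}/input_manifest.json
              output_manifest_file: ${workspace_dir}/output_manifest.json
              segment_duration: 20.0
              drop_last: false
              drop_text: true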
"""
def __init__(
self,
segment_duration: float,
drop_last: bool = True,
drop_text: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.segment_duration = segment_duration
self.drop_last = drop_last
self.drop_text = drop_text
def process_dataset_entry(self, data_entry: Dict):
total_duration = data_entry["duration"]
total_segments = int(total_duration // self.segment_duration)
output = [None] * total_segments
for segment_idx in range(total_segments):
modified_entry = data_entry.copy() # shallow copy should be good enough
modified_entry["duration"] = self.segment_duration
modified_entry["offset"] = segment_idx * self.segment_duration
if self.drop_text:
modified_entry.pop("text", None)
output[segment_idx] = DataEntry(data=modified_entry)
remainder = total_duration - self.segment_duration * total_segments
if not self.drop_last and remainder > 0:
modified_entry = data_entry.copy()
modified_entry["duration"] = remainder
modified_entry["offset"] = self.segment_duration * total_segments
if self.drop_text:
modified_entry.pop("text", None)
output.append(DataEntry(data=modified_entry))
return output
class ChangeToRelativePath(BaseParallelProcessor):
"""This processor changes the audio filepaths to be relative.
Args:
        base_dir (str): typically the folder where the manifest file is going
            to be stored. All paths will be made relative to that folder.
Returns:
        The same data as in the input manifest with the ``audio_filepath`` key
        changed to contain a path relative to ``base_dir``.
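    Example:
        A minimal illustrative config (the workspace directory is a placeholder):

        .. code-block:: yaml

            - _target_: sdp.processors.modify_manifest.common.ChangeToRelativePath
              input_manifest_file: ${workspace_dir}/input_manifest.json
              output_manifest_file: ${workspace_dir}/output_manifest.json
              base_dir: ${workspace_dir}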
"""
def __init__(
self,
base_dir: str,
**kwargs,
):
super().__init__(**kwargs)
self.base_dir = base_dir
def process_dataset_entry(self, data_entry: Dict):
data_entry["audio_filepath"] = os.path.relpath(data_entry["audio_filepath"], self.base_dir)
return [DataEntry(data=data_entry)]
class SortManifest(BaseProcessor):
"""Processor which will sort the manifest by some specified attribute.
Args:
attribute_sort_by (str): the attribute by which the manifest will be sorted.
        descending (bool): if False, the manifest is sorted in ascending order;
            if True, in descending order. Defaults to True.
Returns:
The same entries as in the input manifest, but sorted based
on the provided parameters.
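    Example:
        An illustrative config that sorts entries by duration, longest first
        (the paths are placeholders):

        .. code-block:: yaml

            - _target_: sdp.processors.modify_manifest.common.SortManifest
              input_manifest_file: ${workspace_dir}/input_manifest.json
              output_manifest_file: ${workspace_dir}/output_manifest.json
              attribute_sort_by: duration
              descending: true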
"""
def __init__(
self,
attribute_sort_by: str,
descending: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.attribute_sort_by = attribute_sort_by
self.descending = descending
def process(self):
        with open(self.input_manifest_file, "rt", encoding="utf8") as fin:
            dataset_entries = [json.loads(line) for line in fin]
        dataset_entries = sorted(dataset_entries, key=lambda x: x[self.attribute_sort_by], reverse=self.descending)
        with open(self.output_manifest_file, "wt", encoding="utf8") as fout:
            for entry in dataset_entries:
                fout.write(json.dumps(entry, ensure_ascii=False) + "\n")
class KeepOnlySpecifiedFields(BaseProcessor):
"""Saves a copy of a manifest but only with a subset of the fields.
    Typically used as the final processor to save only the relevant fields
    in the desired location.
Args:
fields_to_keep (list[str]): list of the fields in the input manifest
that we want to retain. The output file will only contain these
fields.
Returns:
The same data as in input manifest, but re-saved in the new location
with only ``fields_to_keep`` fields retained.
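    Example:
        An illustrative config keeping only the standard ASR manifest fields
        (the paths and field list are placeholders):

        .. code-block:: yaml

            - _target_: sdp.processors.modify_manifest.common.KeepOnlySpecifiedFields
              input_manifest_file: ${workspace_dir}/input_manifest.json
              output_manifest_file: ${workspace_dir}/final_manifest.json
              fields_to_keep:
                - audio_filepath
                - text
                - duration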
"""
def __init__(self, fields_to_keep: List[str], **kwargs):
super().__init__(**kwargs)
self.fields_to_keep = fields_to_keep
def process(self):
with open(self.input_manifest_file, "rt", encoding="utf8") as fin, open(
self.output_manifest_file, "wt", encoding="utf8"
) as fout:
for line in tqdm(fin):
line = json.loads(line)
new_line = {field: line[field] for field in self.fields_to_keep}
fout.write(json.dumps(new_line, ensure_ascii=False) + "\n")
class ApplyInnerJoin(BaseProcessor):
"""Applies inner join to two manifests, i.e. creates a manifest from records that have matching values in both manifests.
For more information, please refer to the Pandas merge function documentation:
https://pandas.pydata.org/docs/reference/api/pandas.merge.html#pandas.merge
Args:
        column_id (Union[str, List[str], None]): field name(s) to join on.
            These must be found in both manifests. If ``column_id`` is None,
            it defaults to the intersection of the columns in both manifests.
            Defaults to None.
        left_manifest_file (Optional[str]): path to the left manifest. Defaults to ``input_manifest_file``.
right_manifest_file (str): path to the right manifest.
Returns:
Inner join of two manifests.
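    Example:
        An illustrative config joining two manifests on the audio path
        (the paths and join key are placeholders):

        .. code-block:: yaml

            - _target_: sdp.processors.modify_manifest.common.ApplyInnerJoin
              input_manifest_file: ${workspace_dir}/left_manifest.json
              right_manifest_file: ${workspace_dir}/right_manifest.json
              output_manifest_file: ${workspace_dir}/joined_manifest.json
              column_id: audio_filepath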
"""
def __init__(
self,
right_manifest_file: str,
left_manifest_file: Optional[str] = None,
column_id: Union[str, List[str], None] = None,
**kwargs,
):
super().__init__(**kwargs)
        self.left_manifest_file = left_manifest_file if left_manifest_file is not None else self.input_manifest_file
self.right_manifest_file = right_manifest_file
self.column_id = column_id
def process(self):
m1 = pd.DataFrame.from_records(load_manifest(Path(self.left_manifest_file)))
m2 = pd.DataFrame.from_records(load_manifest(Path(self.right_manifest_file)))
m3 = pd.merge(m1, m2, on=self.column_id, how="inner")
with open(self.output_manifest_file, "wt", encoding="utf8") as fout:
for _, line in m3.iterrows():
fout.write(json.dumps(dict(line), ensure_ascii=False) + "\n")
class DropSpecifiedFields(BaseProcessor):
"""
A processor that removes specified fields from each data entry in the manifest.
    This processor reads an input manifest line by line, drops the fields listed in ``fields_to_drop``
    from each JSON entry, and writes the cleaned entries to the output manifest.
Args:
fields_to_drop (List[str]): A list of keys to remove from each manifest entry.
**kwargs: Additional arguments passed to the BaseProcessor (e.g., input/output manifest paths).
Returns:
A line-delimited JSON manifest, where each entry is the same as the input,
but with the specified fields removed.
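    Example:
        An illustrative config (the paths and dropped field names are
        placeholders):

        .. code-block:: yaml

            - _target_: sdp.processors.modify_manifest.common.DropSpecifiedFields
              input_manifest_file: ${workspace_dir}/input_manifest.json
              output_manifest_file: ${workspace_dir}/output_manifest.json
              fields_to_drop:
                - text_pc_pred
                - text_pc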
"""
def __init__(self, fields_to_drop: List[str], **kwargs):
super().__init__(**kwargs)
self.fields_to_drop = fields_to_drop
def process(self):
# Open the input and output manifest files
with open(self.input_manifest_file, "rt", encoding="utf8") as fin, open(
self.output_manifest_file, "wt", encoding="utf8"
) as fout:
# Iterate over each line (entry) in the input manifest
for line in tqdm(fin):
# Parse JSON entry from the current line
entry = json.loads(line)
# Create a new entry by excluding the specified fields
new_line = {field: entry[field] for field in entry if field not in self.fields_to_drop}
# Write the cleaned entry to the output manifest
fout.write(json.dumps(new_line, ensure_ascii=False) + "\n")