# Copyright (c) 2018 NVIDIA Corporation
from __future__ import absolute_import, division, print_function
from __future__ import unicode_literals
import math
import os
import h5py
import numpy as np
import resampy as rs
import scipy.io.wavfile as wave
BACKENDS = []
try:
import python_speech_features as psf
BACKENDS.append('psf')
except ImportError:
pass
try:
import librosa
BACKENDS.append('librosa')
except ImportError:
pass
WINDOWS_FNS = {"hanning": np.hanning, "hamming": np.hamming, "none": None}
class PreprocessOnTheFlyException(Exception):
  """Exception raised to skip loading preprocessed features from disk and
  recompute them on-the-fly instead.
  This saves disk space (useful if you're experimenting with data input
  formats/preprocessing) but can be slower. The slowdown is especially
  apparent for small, fast NNs."""
  pass
class RegenerateCacheException(Exception):
  """Exception raised to force recomputation of (preprocessed) features."""
  pass
def load_features(path, data_format):
  """Function to load (preprocessed) features from disk.
  Args:
    path (string): the path where the features are stored.
    data_format (string): the format in which the features are stored
        ('hdf5', 'npy' or 'npz').
  Returns:
    tuple of (features, duration).
  """
if data_format == 'hdf5':
with h5py.File(path + '.hdf5', "r") as hf5_file:
features = hf5_file["features"][:]
duration = hf5_file["features"].attrs["duration"]
elif data_format == 'npy':
    # the .npy file stores an object array of (features, duration), which
    # requires allow_pickle=True on newer numpy versions
    features, duration = np.load(path + '.npy', allow_pickle=True)
elif data_format == 'npz':
data = np.load(path + '.npz')
features = data['features']
duration = data['duration']
else:
raise ValueError("Invalid data format for caching: ", data_format, "!\n",
"options: hdf5, npy, npz")
return features, duration
def save_features(features, duration, path, data_format, verbose=False):
  """Function to save (preprocessed) features to disk.
  Args:
    features: features to save.
    duration (float): duration in seconds of the audio file (metadata).
    path (string): path to store the data.
    data_format (string): format to store the data in ('npy', 'npz' or
        'hdf5').
    verbose (bool): whether to print the path the features are saved to.
  """
  if verbose:
    print("Saving to: ", path)
if data_format == 'hdf5':
with h5py.File(path + '.hdf5', "w") as hf5_file:
dset = hf5_file.create_dataset("features", data=features)
dset.attrs["duration"] = duration
elif data_format == 'npy':
np.save(path + '.npy', [features, duration])
elif data_format == 'npz':
np.savez(path + '.npz', features=features, duration=duration)
else:
raise ValueError("Invalid data format for caching: ", data_format, "!\n",
"options: hdf5, npy, npz")
def get_preprocessed_data_path(filename, params):
  """Function to convert the audio path into the path to the preprocessed
  version of this audio.
  Args:
    filename (string): WAVE filename.
    params (dict): dictionary containing preprocessing parameters.
  Returns:
    string: path to the preprocessed file (without extension). The path is
        generated from the relevant preprocessing parameters.
  """
if isinstance(filename, bytes): # convert binary string to normal string
filename = filename.decode('ascii')
  filename = os.path.realpath(filename)  # resolve symbolic links
  # filter out parameters that should not affect the cache path
ignored_params = ["cache_features", "cache_format", "cache_regenerate",
"vocab_file", "dataset_files", "shuffle", "batch_size",
"max_duration",
"mode", "interactive", "autoregressive", "char2idx",
"tgt_vocab_size", "idx2char", "dtype"]
  def fix_kv(text):
    """Helper function to shorten length of filenames to get around
    filesystem path length limitations."""
    text = str(text)
    text = text.replace("speed_perturbation_ratio", "sp") \
        .replace("noise_level_min", "nlmin") \
        .replace("noise_level_max", "nlmax") \
        .replace("add_derivatives", "d") \
        .replace("add_second_derivatives", "dd")
    return text
# generate the identifier by simply concatenating preprocessing key-value
# pairs as strings.
preprocess_id = "-".join(
[fix_kv(k) + "_" + fix_kv(v) for k, v in params.items() if
k not in ignored_params])
preprocessed_dir = os.path.dirname(filename).replace("wav",
"preprocessed-" +
preprocess_id)
preprocessed_path = os.path.join(preprocessed_dir,
os.path.basename(filename).replace(".wav",
""))
# create dir if it doesn't exist yet
if not os.path.exists(preprocessed_dir):
os.makedirs(preprocessed_dir)
return preprocessed_path
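
# Illustrative sketch, not part of the original module: the cache path encodes
# all non-ignored preprocessing parameters, so changing e.g. 'window_size'
# produces a different cache directory. Note the call creates that directory
# as a side effect; the filename and params here are hypothetical.
def _example_preprocessed_path():
  params = {'input_type': 'mfcc', 'num_audio_features': 13,
            'window_size': 20e-3, 'window_stride': 10e-3}
  return get_preprocessed_data_path('/data/wav/utt_0001.wav', params)
  # -> '/data/preprocessed-<params id>/utt_0001'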
def get_speech_features_from_file(filename, params):
  """Function to get a numpy array of features from an audio file.
  If params['cache_features'] == True, try to load preprocessed data from
  disk, or store the features after preprocessing.
  Else, perform preprocessing on-the-fly.
Args:
filename (string): WAVE filename.
params (dict): the following parameters
      num_audio_features (int): number of speech features in frequency domain.
      input_type (string): 'mfcc', 'spectrogram' or 'logfbank'.
window_size (float): size of analysis window in milli-seconds.
window_stride (float): stride of analysis window in milli-seconds.
augmentation (dict, optional): dictionary of augmentation parameters. See
:func:`augment_audio_signal` for specification and example.
window (str): window function to apply
dither (float): weight of Gaussian noise to apply to input signal for
dithering/preventing quantization noise
num_fft (int): size of fft window to use if features require fft,
defaults to smallest power of 2 larger than window size
norm_per_feature (bool): if True, the output features will be normalized
(whitened) individually. if False, a global mean/std over all features
will be used for normalization
Returns:
np.array: np.array of audio features with shape=[num_time_steps,
num_features].
"""
cache_features = params.get('cache_features', False)
cache_format = params.get('cache_format', 'hdf5')
cache_regenerate = params.get('cache_regenerate', False)
try:
if not cache_features:
raise PreprocessOnTheFlyException(
"on-the-fly preprocessing enforced with 'cache_features'==True")
if cache_regenerate:
raise RegenerateCacheException("regenerating cache...")
preprocessed_data_path = get_preprocessed_data_path(filename, params)
features, duration = load_features(preprocessed_data_path,
data_format=cache_format)
except PreprocessOnTheFlyException:
sample_freq, signal = wave.read(filename)
features, duration = get_speech_features(signal, sample_freq, params)
except (OSError, FileNotFoundError, RegenerateCacheException):
sample_freq, signal = wave.read(filename)
features, duration = get_speech_features(signal, sample_freq, params)
preprocessed_data_path = get_preprocessed_data_path(filename, params)
save_features(features, duration, preprocessed_data_path,
data_format=cache_format)
return features, duration
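
# Illustrative sketch, not part of the original module: typical use of the
# file-level entry point with on-the-fly preprocessing. The WAVE filename is
# a hypothetical placeholder.
def _example_features_from_file():
  params = {
      'cache_features': False,  # forces on-the-fly preprocessing
      'input_type': 'spectrogram',
      'num_audio_features': 96,
      'window_size': 20e-3,
      'window_stride': 10e-3,
      'backend': 'psf',
  }
  features, duration = get_speech_features_from_file('utt_0001.wav', params)
  # features.shape == (num_time_steps, 96)
  return features, duration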
def normalize_signal(signal):
  """
  Normalize float32 signal to [-1, 1] range.
  """
  return signal / (np.max(np.abs(signal)) + 1e-5)
def augment_audio_signal(signal, sample_freq, augmentation):
  """Function that performs audio signal augmentation.
  Args:
    signal (np.array): np.array containing raw audio signal.
    sample_freq (float): frames per second.
    augmentation (dict): dictionary of augmentation parameters. Supported
        fields are 'speed_perturbation_ratio', 'noise_level_min' and
        'noise_level_max', e.g.::
          augmentation={
            'speed_perturbation_ratio': 0.2,
            'noise_level_min': -90,
            'noise_level_max': -46,
          }
        'speed_perturbation_ratio' can either be a list of possible speed
        perturbation factors or a float. If float, a random value is sampled
        from U[1 - speed_perturbation_ratio, 1 + speed_perturbation_ratio].
  Returns:
    np.array: np.array with augmented audio signal.
  """
signal_float = normalize_signal(signal.astype(np.float32))
if 'speed_perturbation_ratio' in augmentation:
stretch_amount = -1
if isinstance(augmentation['speed_perturbation_ratio'], list):
stretch_amount = np.random.choice(augmentation['speed_perturbation_ratio'])
elif augmentation['speed_perturbation_ratio'] > 0:
# time stretch (might be slow)
stretch_amount = 1.0 + (2.0 * np.random.rand() - 1.0) * \
augmentation['speed_perturbation_ratio']
if stretch_amount > 0:
signal_float = rs.resample(
signal_float,
sample_freq,
int(sample_freq * stretch_amount),
filter='kaiser_best',
)
# noise
if 'noise_level_min' in augmentation and 'noise_level_max' in augmentation:
noise_level_db = np.random.randint(low=augmentation['noise_level_min'],
high=augmentation['noise_level_max'])
signal_float += np.random.randn(signal_float.shape[0]) * \
10.0 ** (noise_level_db / 20.0)
return normalize_signal(signal_float)
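
# Illustrative check, not part of the original module: noise levels are given
# in dB relative to the normalized (unit-amplitude) signal, so -46 dB maps to
# a linear amplitude of 10 ** (-46 / 20), approximately 0.005.
def _example_noise_amplitude(noise_level_db=-46):
  return 10.0 ** (noise_level_db / 20.0)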
def preemphasis(signal, coeff=0.97):
  """Applies the pre-emphasis filter y[t] = x[t] - coeff * x[t-1] to boost
  high frequencies before spectral analysis."""
  return np.append(signal[0], signal[1:] - coeff * signal[:-1])
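
# Illustrative check, not part of the original module: pre-emphasis is a
# first-difference filter, so a constant (DC) signal is almost entirely
# suppressed while fast changes pass through.
def _example_preemphasis():
  x = np.ones(5, dtype=np.float32)
  y = preemphasis(x, coeff=0.97)
  # y is approximately [1.0, 0.03, 0.03, 0.03, 0.03]
  return y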
def get_speech_features(signal, sample_freq, params):
"""
Get speech features using either librosa (recommended) or
python_speech_features
Args:
signal (np.array): np.array containing raw audio signal
sample_freq (float): sample rate of the signal
params (dict): parameters of pre-processing
Returns:
np.array: np.array of audio features with shape=[num_time_steps,
num_features].
audio_duration (float): duration of the signal in seconds
"""
backend = params.get('backend', 'psf')
features_type = params.get('input_type', 'spectrogram')
num_features = params['num_audio_features']
window_size = params.get('window_size', 20e-3)
window_stride = params.get('window_stride', 10e-3)
augmentation = params.get('augmentation', None)
if backend == 'librosa':
window_fn = WINDOWS_FNS[params.get('window', "hanning")]
dither = params.get('dither', 0.0)
num_fft = params.get('num_fft', None)
norm_per_feature = params.get('norm_per_feature', False)
mel_basis = params.get('mel_basis', None)
    if mel_basis is not None and sample_freq != params["sample_freq"]:
      raise ValueError(
          ("The sampling frequency set in params ({}) does not match the "
           "sampling frequency ({}) of the input signal").format(
               params["sample_freq"], sample_freq)
      )
features, duration = get_speech_features_librosa(
signal, sample_freq, num_features, features_type,
window_size, window_stride, augmentation, window_fn=window_fn,
dither=dither, norm_per_feature=norm_per_feature, num_fft=num_fft,
mel_basis=mel_basis
)
else:
pad_to = params.get('pad_to', 8)
features, duration = get_speech_features_psf(
signal, sample_freq, num_features, pad_to, features_type,
window_size, window_stride, augmentation
)
return features, duration
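
# Illustrative sketch, not part of the original module: dispatching to the
# librosa backend for log-mel filterbank features on one second of synthetic
# noise. All parameter values are hypothetical.
def _example_get_speech_features():
  sample_freq = 16000
  signal = np.random.uniform(-1.0, 1.0, size=sample_freq).astype(np.float32)
  params = {
      'backend': 'librosa',
      'input_type': 'logfbank',
      'num_audio_features': 64,
      'window_size': 20e-3,
      'window_stride': 10e-3,
      'window': 'hanning',
      'dither': 1e-5,
      'norm_per_feature': True,
  }
  features, duration = get_speech_features(signal, sample_freq, params)
  # features.shape == (num_time_steps, 64); duration == 1.0
  return features, duration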
def get_speech_features_librosa(signal, sample_freq, num_features,
                                features_type='spectrogram',
                                window_size=20e-3,
                                window_stride=10e-3,
                                augmentation=None,
                                window_fn=np.hanning,
                                num_fft=None,
                                dither=0.0,
                                norm_per_feature=False,
                                mel_basis=None):
  """Function to convert raw audio signal to numpy array of features.
  Backend: librosa
  Args:
    signal (np.array): np.array containing raw audio signal.
    sample_freq (float): frames per second.
    num_features (int): number of speech features in frequency domain.
    features_type (string): 'mfcc', 'spectrogram' or 'logfbank'.
    window_size (float): size of analysis window in milli-seconds.
    window_stride (float): stride of analysis window in milli-seconds.
    augmentation (dict, optional): dictionary of augmentation parameters. See
        :func:`augment_audio_signal` for specification and example.
    window_fn: windowing function to apply to each analysis frame.
    num_fft (int, optional): size of the FFT window; defaults to the smallest
        power of two that is at least the window size in samples.
    dither (float): weight of Gaussian noise added to the signal to prevent
        quantization noise.
    norm_per_feature (bool): if True, normalize each feature channel
        separately; if False, use a global mean/std over all features.
    mel_basis (np.array, optional): precomputed mel filterbank matrix for
        'logfbank' features.
  Returns:
    np.array: np.array of audio features with shape=[num_time_steps,
    num_features].
    audio_duration (float): duration of the signal in seconds
  """
if augmentation:
signal = augment_audio_signal(signal.astype(np.float32), sample_freq, augmentation)
else:
signal = normalize_signal(signal.astype(np.float32))
audio_duration = len(signal) * 1.0 / sample_freq
n_window_size = int(sample_freq * window_size)
n_window_stride = int(sample_freq * window_stride)
num_fft = num_fft or 2**math.ceil(math.log2(window_size*sample_freq))
if dither > 0:
signal += dither*np.random.randn(*signal.shape)
if features_type == 'spectrogram':
# ignore 1/n_fft multiplier, since there is a post-normalization
powspec = np.square(np.abs(librosa.core.stft(
signal, n_fft=n_window_size,
hop_length=n_window_stride, win_length=n_window_size, center=True,
window=window_fn)))
# remove small bins
powspec[powspec <= 1e-30] = 1e-30
features = 10 * np.log10(powspec.T)
assert num_features <= n_window_size // 2 + 1, \
"num_features for spectrogram should be <= (sample_freq * window_size // 2 + 1)"
# cut high frequency part
features = features[:, :num_features]
elif features_type == 'mfcc':
signal = preemphasis(signal, coeff=0.97)
S = np.square(
np.abs(
librosa.core.stft(signal, n_fft=num_fft,
hop_length=int(window_stride * sample_freq),
win_length=int(window_size * sample_freq),
center=True, window=window_fn
)
)
)
features = librosa.feature.mfcc(sr=sample_freq, S=S,
n_mfcc=num_features, n_mels=2*num_features).T
elif features_type == 'logfbank':
    signal = preemphasis(signal, coeff=0.97)
S = np.abs(librosa.core.stft(signal, n_fft=num_fft,
hop_length=int(window_stride * sample_freq),
win_length=int(window_size * sample_freq),
center=True, window=window_fn))**2.0
if mel_basis is None:
# Build a Mel filter
mel_basis = librosa.filters.mel(sample_freq, num_fft, n_mels=num_features,
fmin=0, fmax=int(sample_freq/2))
features = np.log(np.dot(mel_basis, S) + 1e-20).T
else:
raise ValueError('Unknown features type: {}'.format(features_type))
norm_axis = 0 if norm_per_feature else None
mean = np.mean(features, axis=norm_axis)
std_dev = np.std(features, axis=norm_axis)
features = (features - mean) / std_dev
if augmentation:
n_freq_mask = augmentation.get('n_freq_mask', 0)
n_time_mask = augmentation.get('n_time_mask', 0)
width_freq_mask = augmentation.get('width_freq_mask', 10)
width_time_mask = augmentation.get('width_time_mask', 50)
for idx in range(n_freq_mask):
freq_band = np.random.randint(width_freq_mask + 1)
freq_base = np.random.randint(0, features.shape[1] - freq_band)
features[:, freq_base:freq_base+freq_band] = 0
for idx in range(n_time_mask):
time_band = np.random.randint(width_time_mask + 1)
if features.shape[0] - time_band > 0:
time_base = np.random.randint(features.shape[0] - time_band)
features[time_base:time_base+time_band, :] = 0
return features, audio_duration
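
# Illustrative check, not part of the original module: when num_fft is not
# given, it defaults to the smallest power of two covering the analysis
# window, e.g. a 20 ms window at 16 kHz spans 320 samples, so num_fft == 512.
def _example_default_num_fft(sample_freq=16000, window_size=20e-3):
  return 2 ** math.ceil(math.log2(window_size * sample_freq))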
def get_speech_features_psf(signal, sample_freq, num_features,
pad_to=8,
features_type='spectrogram',
window_size=20e-3,
window_stride=10e-3,
augmentation=None):
"""Function to convert raw audio signal to numpy array of features.
Backend: python_speech_features
Args:
signal (np.array): np.array containing raw audio signal.
sample_freq (float): frames per second.
num_features (int): number of speech features in frequency domain.
pad_to (int): if specified, the length will be padded to become divisible
by ``pad_to`` parameter.
    features_type (string): 'mfcc', 'spectrogram' or 'logfbank'.
window_size (float): size of analysis window in milli-seconds.
window_stride (float): stride of analysis window in milli-seconds.
augmentation (dict, optional): dictionary of augmentation parameters. See
:func:`augment_audio_signal` for specification and example.
Returns:
np.array: np.array of audio features with shape=[num_time_steps,
num_features].
audio_duration (float): duration of the signal in seconds
"""
if augmentation is not None:
signal = augment_audio_signal(signal, sample_freq, augmentation)
else:
signal = (normalize_signal(signal.astype(np.float32)) * 32767.0).astype(
np.int16)
audio_duration = len(signal) * 1.0 / sample_freq
n_window_size = int(sample_freq * window_size)
n_window_stride = int(sample_freq * window_stride)
  # pad the signal so that the number of frames is divisible by pad_to
  # (e.g. 8, for fp16 optimization)
length = 1 + int(math.ceil(
(1.0 * signal.shape[0] - n_window_size) / n_window_stride
))
if pad_to > 0:
if length % pad_to != 0:
pad_size = (pad_to - length % pad_to) * n_window_stride
signal = np.pad(signal, (0, pad_size), mode='constant')
if features_type == 'spectrogram':
frames = psf.sigproc.framesig(sig=signal,
frame_len=n_window_size,
frame_step=n_window_stride,
winfunc=np.hanning)
features = psf.sigproc.logpowspec(frames, NFFT=n_window_size)
assert num_features <= n_window_size // 2 + 1, \
"num_features for spectrogram should be <= (sample_freq * window_size // 2 + 1)"
# cut high frequency part
features = features[:, :num_features]
elif features_type == 'mfcc':
features = psf.mfcc(signal=signal,
samplerate=sample_freq,
winlen=window_size,
winstep=window_stride,
numcep=num_features,
nfilt=2 * num_features,
nfft=512,
lowfreq=0, highfreq=None,
preemph=0.97,
ceplifter=2 * num_features,
appendEnergy=False)
elif features_type == 'logfbank':
features = psf.logfbank(signal=signal,
samplerate=sample_freq,
winlen=window_size,
winstep=window_stride,
nfilt=num_features,
nfft=512,
lowfreq=0, highfreq=sample_freq / 2,
preemph=0.97)
else:
raise ValueError('Unknown features type: {}'.format(features_type))
if pad_to > 0:
assert features.shape[0] % pad_to == 0
mean = np.mean(features)
std_dev = np.std(features)
features = (features - mean) / std_dev
return features, audio_duration
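
# Illustrative check, not part of the original module: with pad_to == 8 the
# signal is zero-padded so that the number of output frames is divisible by
# 8 (friendly to fp16/tensor-core shapes), matching the assert above.
def _example_psf_padding():
  sample_freq = 16000
  signal = (np.random.randn(12345) * 1000).astype(np.int16)
  features, _ = get_speech_features_psf(signal, sample_freq,
                                        num_features=64, pad_to=8)
  assert features.shape[0] % 8 == 0
  return features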