# Copyright (c) 2019 NVIDIA Corporation
from __future__ import absolute_import, division, print_function
from __future__ import unicode_literals

import librosa
import matplotlib as mpl
import numpy as np
from import write
from six import BytesIO
from six.moves import range

import matplotlib.pyplot as plt
import tensorflow as tf

from .encoder_decoder import EncoderDecoderModel

[docs]def plot_spectrograms( specs, titles, stop_token_pred, audio_length, logdir, train_step, stop_token_target=None, number=0, append=False, save_to_tensorboard=False ): """ Helper function to create a image to be logged to disk or a tf.Summary to be logged to tensorboard. Args: specs (array): array of images to show titles (array): array of titles. Must match lengths of specs array stop_token_pred (np.array): np.array of size [time, 1] containing the stop token predictions from the model. audio_length (int): lenth of the predicted spectrogram logdir (str): dir to save image file is save_to_tensorboard is disabled. train_step (int): current training step stop_token_target (np.array): np.array of size [time, 1] containing the stop token target. number (int): Current sample number (used if evaluating more than 1 sample from a batch) append (str): Optional string to append to file name eg. train, eval, infer save_to_tensorboard (bool): If False, the created image is saved to the logdir as a png file. If True, the function returns a tf.Summary object containing the image and will be logged to the current tensorboard file. Returns: tf.Summary or None """ num_figs = len(specs) + 1 fig, ax = plt.subplots(nrows=num_figs, figsize=(8, num_figs * 3)) for i, (spec, title) in enumerate(zip(specs, titles)): spec = np.pad(spec, ((1, 1), (1, 1)), "constant", constant_values=0.) spec = spec.astype(float) colour = ax[i].imshow( spec.T, cmap='viridis', interpolation=None, aspect='auto' ) ax[i].invert_yaxis() ax[i].set_title(title) fig.colorbar(colour, ax=ax[i]) if stop_token_target is not None: stop_token_target = stop_token_target.astype(float) ax[-1].plot(stop_token_target, 'r.') stop_token_pred = stop_token_pred.astype(float) ax[-1].plot(stop_token_pred, 'g.') ax[-1].axvline(x=audio_length) ax[-1].set_xlim(0, len(specs[0])) ax[-1].set_title("stop token") plt.xlabel('time') plt.tight_layout() cb = fig.colorbar(colour, ax=ax[-1]) cb.remove() if save_to_tensorboard: tag = "{}_image".format(append) iostream = BytesIO() fig.savefig(iostream, dpi=300) summary = tf.Summary.Image( encoded_image_string=iostream.getvalue(), height=int(fig.get_figheight() * 300), width=int(fig.get_figwidth() * 300) ) summary = tf.Summary.Value(tag=tag, image=summary) plt.close(fig) return summary else: if append: name = '{}/Output_step{}_{}_{}.png'.format( logdir, train_step, number, append ) else: name = '{}/Output_step{}_{}.png'.format(logdir, train_step, number) if logdir[0] != '/': name = "./" + name # save fig.savefig(name, dpi=300) plt.close(fig) return None
[docs]def save_audio( magnitudes, logdir, step, sampling_rate, n_fft=1024, mode="train", number=0, save_format="tensorboard", power=1.5, gl_iters=50, verbose=True, max_normalization=False ): """ Helper function to create a wav file to be logged to disk or a tf.Summary to be logged to tensorboard. Args: magnitudes (np.array): np.array of size [time, n_fft/2 + 1] containing the energy spectrogram. logdir (str): dir to save image file is save_to_tensorboard is disabled. step (int): current training step n_fft (int): number of filters for fft and ifft. sampling_rate (int): samplng rate in Hz of the audio to be saved. number (int): Current sample number (used if evaluating more than 1 sample mode (str): Optional string to append to file name eg. train, eval, infer from a batch) save_format: save_audio can either return the np.array containing the generated sound, log the wav file to the disk, or return a tensorboard summary object. Each method can be enabled by passing save_format as "np.array", "tensorboard", or "disk" respectively. Returns: tf.Summary or None """ # Clip signal max and min if np.min(magnitudes) < 0 or np.max(magnitudes) > 255: if verbose: print("WARNING: {} audio was clipped at step {}".format(mode.capitalize(), step)) magnitudes = np.clip(magnitudes, a_min=0, a_max=255) signal = griffin_lim(magnitudes.T ** power, n_iters=gl_iters, n_fft=n_fft) if max_normalization: signal /= np.max(np.abs(signal)) if save_format == "np.array": return signal elif save_format == "tensorboard": tag = "{}_audio".format(mode) iostream = BytesIO() write(iostream, sampling_rate, signal) summary = tf.Summary.Audio(encoded_audio_string=iostream.getvalue()) summary = tf.Summary.Value(tag=tag, audio=summary) return summary elif save_format == "disk": file_name = '{}/sample_step{}_{}_{}.wav'.format(logdir, step, number, mode) if logdir[0] != '/': file_name = "./" + file_name write(file_name, sampling_rate, signal) return None else: print(( "WARN: The save format passed to save_audio was not understood. No " "sound files will be saved for the current step. " "Received '{}'." "Expected one of 'np.array', 'tensorboard', or 'disk'" ).format(save_format)) return None
[docs]def griffin_lim(magnitudes, n_iters=50, n_fft=1024): """ Griffin-Lim algorithm to convert magnitude spectrograms to audio signals """ phase = np.exp(2j * np.pi * np.random.rand(*magnitudes.shape)) complex_spec = magnitudes * phase signal = librosa.istft(complex_spec) if not np.isfinite(signal).all(): print("WARNING: audio was not finite, skipping audio saving") return np.array([0]) for _ in range(n_iters): _, phase = librosa.magphase(librosa.stft(signal, n_fft=n_fft)) complex_spec = magnitudes * phase signal = librosa.istft(complex_spec) return signal
[docs]class Text2Speech(EncoderDecoderModel): """ Text-to-speech data layer. """
[docs] @staticmethod def get_required_params(): return dict( EncoderDecoderModel.get_required_params(), **{ "save_to_tensorboard": bool, } )
def __init__(self, params, mode="train", hvd=None): super(Text2Speech, self).__init__(params, mode=mode, hvd=hvd) self._save_to_tensorboard = self.params["save_to_tensorboard"]
[docs] def print_logs(self, mode, specs, titles, stop_token_pred, stop_target, audio_length, step, predicted_final_spec, predicted_mag_spec=None): """ Save audio files and plots. Args: mode: "train" or "eval". specs: spectograms to plot. titles: spectogram titles. stop_token_pred: stop token prediction. stop_target: stop target. audio_length: length of the audio. step: current step. predicted_final_spec: predicted mel spectogram. predicted_mag_spec: predicted magnitude spectogram. Returns: Dictionary to log. """ dict_to_log = {} im_summary = plot_spectrograms( specs, titles, stop_token_pred, audio_length, self.params["logdir"], step, append=mode, save_to_tensorboard=self._save_to_tensorboard, stop_token_target=stop_target ) dict_to_log['image'] = im_summary if audio_length < 3: return {} if self._save_to_tensorboard: save_format = "tensorboard" else: save_format = "disk" if predicted_mag_spec is not None: predicted_mag_spec = predicted_mag_spec[:audio_length - 1, :] if self.get_data_layer()._exp_mag is False: predicted_mag_spec = np.exp(predicted_mag_spec) predicted_mag_spec = self.get_data_layer().get_magnitude_spec(predicted_mag_spec) wav_summary = save_audio( predicted_mag_spec, self.params["logdir"], step, n_fft=self.get_data_layer().n_fft, sampling_rate=self.get_data_layer().sampling_rate, mode=mode + "_mag", save_format=save_format ) dict_to_log['audio_mag'] = wav_summary predicted_final_spec = predicted_final_spec[:audio_length - 1, :] predicted_final_spec = self.get_data_layer().get_magnitude_spec( predicted_final_spec, is_mel=True ) wav_summary = save_audio( predicted_final_spec, self.params["logdir"], step, n_fft=self.get_data_layer().n_fft, sampling_rate=self.get_data_layer().sampling_rate, mode=mode, save_format=save_format, max_normalization=self.get_data_layer().max_normalization ) dict_to_log['audio'] = wav_summary if self._save_to_tensorboard: return dict_to_log return {}
[docs] def infer(self, input_values, output_values): if self.on_horovod: raise ValueError("Inference is not supported on horovod") return [input_values, output_values]
[docs] def evaluate(self, input_values, output_values): # Need to reduce amount of data sent for horovod # Use last element idx = -1 output_values = [(item[idx]) for item in output_values] input_values = { key: [value[0][idx], value[1][idx]] for key, value in input_values.items() } return [input_values, output_values]
[docs] def get_alignments(self, attention_mask): """ Get attention alignment plots. Args: attention_mask: attention alignment. Returns: Specs and titles to plot. """ raise NotImplementedError()
[docs] def finalize_inference(self, results_per_batch, output_file): print("output_file is ignored for tts") print("results are logged to the logdir") batch_size = len(results_per_batch[0][0]["source_tensors"][0]) for i, sample in enumerate(results_per_batch): output_values = sample[1] predicted_final_specs = output_values[1] attention_mask = output_values[2] stop_tokens = output_values[3] sequence_lengths = output_values[4] for j in range(len(predicted_final_specs)): predicted_final_spec = predicted_final_specs[j] attention_mask_sample = attention_mask[j] stop_tokens_sample = stop_tokens[j] specs = [predicted_final_spec] titles = ["final spectrogram"] audio_length = sequence_lengths[j] alignment_specs, alignment_titles = self.get_alignments(attention_mask_sample) specs += alignment_specs titles += alignment_titles if "mel" in self.get_data_layer().params["output_type"]: mag_spec = self.get_data_layer().get_magnitude_spec(predicted_final_spec) log_mag_spec = np.log(np.clip(mag_spec, a_min=1e-5, a_max=None)) specs.append(log_mag_spec) titles.append("magnitude spectrogram") elif "both" in self.get_data_layer().params["output_type"]: mag_spec = self.get_data_layer().get_magnitude_spec(predicted_final_spec, is_mel=True) specs.append(mag_spec) titles.append("mag spectrogram from mel basis") specs.append(output_values[5][j]) titles.append("mag spectrogram from proj layer") im_summary = plot_spectrograms( specs, titles, stop_tokens_sample, audio_length, self.params["logdir"], 0, number=i * batch_size + j, append="infer" ) if audio_length > 2: if "both" in self.get_data_layer().params["output_type"]: predicted_mag_spec = output_values[5][j][:audio_length - 1, :] wav_summary = save_audio( predicted_mag_spec, self.params["logdir"], 0, n_fft=self.get_data_layer().n_fft, sampling_rate=self.get_data_layer().sampling_rate, mode="infer_mag", number=i * batch_size + j, save_format="disk", max_normalization=self.get_data_layer().max_normalization ) predicted_final_spec = predicted_final_spec[:audio_length - 1, :] predicted_final_spec = self.get_data_layer().get_magnitude_spec(predicted_final_spec, is_mel=True) wav_summary = save_audio( predicted_final_spec, self.params["logdir"], 0, n_fft=self.get_data_layer().n_fft, sampling_rate=self.get_data_layer().sampling_rate, mode="infer", number=i * batch_size + j, save_format="disk", max_normalization=self.get_data_layer().max_normalization )
[docs] def finalize_evaluation(self, results_per_batch, training_step=None, samples_count=1): sample = results_per_batch[0] input_values = sample[0] output_values = sample[1] y_sample, stop_target = input_values["target_tensors"] predicted_spec = output_values[0] predicted_final_spec = output_values[1] attention_mask = output_values[2] stop_token_pred = output_values[3] audio_length = output_values[4] max_length = np.max([ y_sample.shape[0], predicted_final_spec.shape[0], ]) predictions_pad = np.zeros( [max_length - np.shape(predicted_final_spec)[0], np.shape(predicted_final_spec)[-1]] ) stop_token_pred_pad = np.zeros( [max_length - np.shape(predicted_final_spec)[0], 1] ) spec_pad = np.zeros([max_length - np.shape(y_sample)[0], np.shape(y_sample)[-1]]) stop_token_pad = np.zeros([max_length - np.shape(y_sample)[0]]) predicted_spec = np.concatenate( [predicted_spec, predictions_pad], axis=0 ) predicted_final_spec = np.concatenate( [predicted_final_spec, predictions_pad], axis=0 ) stop_token_pred = np.concatenate( [stop_token_pred, stop_token_pred_pad], axis=0 ) y_sample = np.concatenate([y_sample, spec_pad], axis=0) stop_target = np.concatenate([stop_target, stop_token_pad], axis=0) specs = [ y_sample, predicted_spec, predicted_final_spec ] titles = [ "training data", "decoder results", "post net results" ] alignment_specs, alignment_titles = self.get_alignments(attention_mask) specs += alignment_specs titles += alignment_titles predicted_mag_spec = None if "both" in self.get_data_layer().params["output_type"]: n_feats = self.get_data_layer().params["num_audio_features"] predicted_mag_spec = output_values[5] mag_pred_pad = np.zeros( [max_length - np.shape(predicted_mag_spec)[0], n_feats["magnitude"]] ) predicted_mag_spec = np.concatenate([predicted_mag_spec, mag_pred_pad], axis=0) specs.append(predicted_mag_spec) titles.append("magnitude spectrogram") mel, mag = np.split( y_sample, [n_feats["mel"]], axis=1 ) specs.insert(0, mel) specs[1] = mag titles.insert(0, "target mel") titles[1] = "target mag" return self.print_logs( mode="eval", specs=specs, titles=titles, stop_token_pred=stop_token_pred, stop_target=stop_target[0], audio_length=audio_length, step=training_step, predicted_final_spec=predicted_final_spec, predicted_mag_spec=predicted_mag_spec )
[docs] def maybe_print_logs(self, input_values, output_values, training_step): spec, stop_target, _ = input_values['target_tensors'] predicted_decoder_spec = output_values[0] predicted_final_spec = output_values[1] attention_mask = output_values[2] stop_token_pred = output_values[3] y_sample = spec[0] stop_target = stop_target[0] predicted_spec = predicted_decoder_spec[0] predicted_final_spec = predicted_final_spec[0] alignment = attention_mask[0] stop_token_pred = stop_token_pred[0] audio_length = output_values[4][0] specs = [ y_sample, predicted_spec, predicted_final_spec ] titles = [ "training data", "decoder results", "post net results" ] alignment_specs, alignment_titles = self.get_alignments(alignment) specs += alignment_specs titles += alignment_titles predicted_mag_spec = None if "both" in self.get_data_layer().params["output_type"]: predicted_mag_spec = output_values[5][0] specs.append(predicted_mag_spec) titles.append("magnitude spectrogram") n_feats = self.get_data_layer().params["num_audio_features"] mel, mag = np.split( y_sample, [n_feats["mel"]], axis=1 ) specs.insert(0, mel) specs[1] = mag titles.insert(0, "target mel") titles[1] = "target mag" return self.print_logs( mode="train", specs=specs, titles=titles, stop_token_pred=stop_token_pred, stop_target=stop_target, audio_length=audio_length, step=training_step, predicted_final_spec=predicted_final_spec, predicted_mag_spec=predicted_mag_spec )