Source code for encoders.convs2s_encoder

# Copyright (c) 2018 NVIDIA Corporation
"""
Conv-based encoder
"""
from __future__ import absolute_import, division, print_function
from __future__ import unicode_literals

import math

import tensorflow as tf

from open_seq2seq.parts.transformer import embedding_layer
from open_seq2seq.parts.transformer.utils import get_padding_bias, get_padding
from open_seq2seq.parts.convs2s import ffn_wn_layer, conv_wn_layer
from open_seq2seq.parts.convs2s.utils import gated_linear_units

from .encoder import Encoder
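
# Note: gated_linear_units is the GLU activation used by ConvS2S: a conv
# layer produces 2*d output channels, which are split into halves (a, b)
# and combined as a * sigmoid(b). A minimal sketch of the idea
# (illustrative only; the actual implementation lives in
# open_seq2seq/parts/convs2s/utils.py):
#
#   def glu(x):                        # x: [..., 2*d]
#     a, b = tf.split(x, 2, axis=-1)   # split the channel dimension
#     return a * tf.sigmoid(b)         # [..., d]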

# Default value used if max_input_length is not given
MAX_INPUT_LENGTH = 128


class ConvS2SEncoder(Encoder):
  """Fully convolutional encoder of ConvS2S."""

  @staticmethod
  def get_required_params():
    return dict(
        Encoder.get_required_params(), **{
            "src_emb_size": int,
            "src_vocab_size": int,
            "pad_embeddings_2_eight": bool,
            "conv_nchannels_kwidth": list,
            "embedding_dropout_keep_prob": float,
            "hidden_dropout_keep_prob": float,
        })

  @staticmethod
  def get_optional_params():
    return dict(
        Encoder.get_optional_params(), **{
            "att_layer_num": int,
            "max_input_length": int,
            "PAD_SYMBOL": int,
            "conv_activation": None,
            "normalization_type": str,
            "scaling_factor": float,
            "init_var": None,
        })
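
  # For example (an illustrative sketch; the numbers are assumptions, not
  # defaults), "conv_nchannels_kwidth" is a list of
  # [num_channels, kernel_width] pairs, one per convolutional block:
  #
  #   "conv_nchannels_kwidth": [[512, 3]] * 4 + [[1024, 3]] * 2
  #
  # builds four 512-channel blocks followed by two 1024-channel blocks,
  # all with kernel width 3.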

  def __init__(self, params, model, name="convs2s_encoder_with_emb",
               mode='train'):
    super(ConvS2SEncoder, self).__init__(params, model, name=name, mode=mode)

    self._src_vocab_size = self.params['src_vocab_size']
    self._src_emb_size = self.params['src_emb_size']
    self.layers = []
    self._mode = mode
    self._pad_sym = self.params.get('PAD_SYMBOL', 0)
    self._pad2eight = params.get('pad_embeddings_2_eight', False)

    self.scaling_factor = self.params.get("scaling_factor", math.sqrt(0.5))
    self.normalization_type = self.params.get("normalization_type",
                                              "weight_norm")
    self.conv_activation = self.params.get("conv_activation",
                                           gated_linear_units)
    self.regularizer = self.params.get('regularizer', None)
    self.init_var = self.params.get('init_var', None)

  def _encode(self, input_dict):
    inputs = input_dict['source_tensors'][0]
    source_length = input_dict['source_tensors'][1]

    with tf.variable_scope("encode"):
      # build the encoder graph only on the first call
      if len(self.layers) == 0:
        knum_list = list(zip(*self.params.get("conv_nchannels_kwidth")))[0]
        kwidth_list = list(zip(*self.params.get("conv_nchannels_kwidth")))[1]

        with tf.variable_scope("embedding"):
          self.embedding_softmax_layer = embedding_layer.EmbeddingSharedWeights(
              vocab_size=self._src_vocab_size,
              hidden_size=self._src_emb_size,
              pad_vocab_to_eight=self._pad2eight,
              init_var=0.1,
              embed_scale=False,
              pad_sym=self._pad_sym,
              mask_paddings=True)

        with tf.variable_scope("pos_embedding"):
          self.position_embedding_layer = embedding_layer.EmbeddingSharedWeights(
              vocab_size=self.params.get("max_input_length",
                                         MAX_INPUT_LENGTH),
              hidden_size=self._src_emb_size,
              pad_vocab_to_eight=self._pad2eight,
              init_var=0.1,
              embed_scale=False,
              pad_sym=self._pad_sym,
              mask_paddings=True)

        # linear projection before the cnn layers
        self.layers.append(
            ffn_wn_layer.FeedFowardNetworkNormalized(
                self._src_emb_size,
                knum_list[0],
                dropout=self.params["embedding_dropout_keep_prob"],
                var_scope_name="linear_mapping_before_cnn_layers",
                mode=self.mode,
                normalization_type=self.normalization_type,
                regularizer=self.regularizer,
                init_var=self.init_var))

        for i in range(len(knum_list)):
          in_dim = knum_list[i] if i == 0 else knum_list[i - 1]
          out_dim = knum_list[i]

          # a linear projection is needed for the residual connection if
          # the input and output dimensions of a cnn layer do not match
          if in_dim != out_dim:
            linear_proj = ffn_wn_layer.FeedFowardNetworkNormalized(
                in_dim,
                out_dim,
                var_scope_name="linear_mapping_cnn_" + str(i + 1),
                dropout=1.0,
                mode=self.mode,
                normalization_type=self.normalization_type,
                regularizer=self.regularizer,
                init_var=self.init_var)
          else:
            linear_proj = None

          conv_layer = conv_wn_layer.Conv1DNetworkNormalized(
              in_dim,
              out_dim,
              kernel_width=kwidth_list[i],
              mode=self.mode,
              layer_id=i + 1,
              hidden_dropout=self.params["hidden_dropout_keep_prob"],
              conv_padding="SAME",
              decode_padding=False,
              activation=self.conv_activation,
              normalization_type=self.normalization_type,
              regularizer=self.regularizer,
              init_var=self.init_var)

          self.layers.append([linear_proj, conv_layer])

        # linear projection after the cnn layers
        self.layers.append(
            ffn_wn_layer.FeedFowardNetworkNormalized(
                knum_list[-1],
                self._src_emb_size,
                dropout=1.0,
                var_scope_name="linear_mapping_after_cnn_layers",
                mode=self.mode,
                normalization_type=self.normalization_type,
                regularizer=self.regularizer,
                init_var=self.init_var))

      encoder_inputs = self.embedding_softmax_layer(inputs)
      inputs_attention_bias = get_padding_bias(
          inputs, res_rank=3, pad_sym=self._pad_sym)

      with tf.name_scope("add_pos_encoding"):
        pos_input = tf.range(
            0,
            tf.shape(encoder_inputs)[1],
            delta=1,
            dtype=tf.int32,
            name='range')
        pos_encoding = self.position_embedding_layer(pos_input)
        encoder_inputs = encoder_inputs + tf.cast(
            x=pos_encoding, dtype=encoder_inputs.dtype)

      if self.mode == "train":
        encoder_inputs = tf.nn.dropout(
            encoder_inputs, self.params["embedding_dropout_keep_prob"])

      # mask the paddings in the input given to the cnn layers
      inputs_padding = get_padding(
          inputs, self._pad_sym, dtype=encoder_inputs.dtype)
      padding_mask = tf.expand_dims(1 - inputs_padding, 2)
      encoder_inputs *= padding_mask

      outputs, outputs_b, final_state = self._call(encoder_inputs,
                                                   padding_mask)

      return {
          'outputs': outputs,
          'outputs_b': outputs_b,
          'inputs_attention_bias_cs2s': inputs_attention_bias,
          'state': final_state,
          'src_lengths': source_length,  # should it include paddings or not?
          'embedding_softmax_layer': self.embedding_softmax_layer,
          'encoder_input': inputs
      }

  def _call(self, encoder_inputs, padding_mask):
    # Run the inputs through all sublayers.
    with tf.variable_scope("linear_layer_before_cnn_layers"):
      outputs = self.layers[0](encoder_inputs)

    for i in range(1, len(self.layers) - 1):
      linear_proj, conv_layer = self.layers[i]

      with tf.variable_scope("layer_%d" % i):
        if linear_proj is not None:
          res_inputs = linear_proj(outputs)
        else:
          res_inputs = outputs

        if padding_mask is not None:
          outputs *= padding_mask
        outputs = conv_layer(outputs)
        outputs = (outputs + res_inputs) * self.scaling_factor

    with tf.variable_scope("linear_layer_after_cnn_layers"):
      outputs = self.layers[-1](outputs)

    if padding_mask is not None:
      outputs *= padding_mask

    # Scale down the gradients flowing into the encoder, since gradients
    # from all decoder attention layers enter it. For example, with
    # att_layer_num=8 decoder attention layers, scale = 1/16.
    scale = 1.0 / (2.0 * self.params.get("att_layer_num", 1))
    outputs = (1.0 - scale) * tf.stop_gradient(outputs) + scale * outputs

    outputs_b = (outputs + encoder_inputs) * self.scaling_factor
    if padding_mask is not None:
      outputs_b *= padding_mask

    # The average of the encoder outputs is used as the final state of the
    # encoder; it can be consumed by decoders that only accept a final state.
    final_state = tf.reduce_mean(outputs_b, 1)

    return outputs, outputs_b, final_state

  @property
  def src_vocab_size(self):
    return self._src_vocab_size

  @property
  def src_emb_size(self):
    return self._src_emb_size
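
# Usage sketch (illustrative only; the parameter values and the `model`
# object are assumptions, not project defaults). In practice the encoder is
# constructed by a model from the "encoder_params" section of a config file
# and invoked through the base class entry point, roughly as follows:
#
#   params = {
#       "src_vocab_size": 32000,
#       "src_emb_size": 512,
#       "pad_embeddings_2_eight": True,
#       "conv_nchannels_kwidth": [[512, 3]] * 4,
#       "embedding_dropout_keep_prob": 0.8,
#       "hidden_dropout_keep_prob": 0.8,
#   }
#   encoder = ConvS2SEncoder(params, model=model, mode="train")
#
#   # "source_tensors" holds [token_ids, lengths]:
#   #   token_ids -- int tensor of shape [batch, time]
#   #   lengths   -- int tensor of shape [batch]
#   output = encoder.encode({"source_tensors": [token_ids, lengths]})
#
#   # output["outputs"] is the top cnn layer and output["outputs_b"] adds
#   # the input embeddings to it; in the ConvS2S attention scheme they act
#   # as the attention keys and values, respectively.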