# Copyright 2022 Carnegie Mellon University (Jiatong Shi)
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
"""Translatotron2 related modules for ESPnet2."""
from typing import Optional
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from espnet2.s2st.synthesizer.abs_synthesizer import AbsSynthesizer
from espnet.nets.pytorch_backend.fastspeech.duration_predictor import (
DurationPredictor as FastDurationPredictor,
)
class Translatotron2(AbsSynthesizer):
    """Translatotron2 module.

    This is a module of the synthesizer in Translatotron2 described in
    `Translatotron 2: High-quality direct speech-to-speech translation
    with voice preservation`_.

    .. _`Translatotron 2:
        High-quality direct speech-to-speech translation with voice preservation`:
        https://arxiv.org/pdf/2107.08661v5.pdf

    NOTE(review): this synthesizer is an unfinished stub — ``__init__``
    accepts the full configuration surface but builds no submodules yet.
    """

    def __init__(
        self,
        # network structure related
        idim: int,
        odim: int,
        synthesizer_type: str = "rnn",
        layers: int = 2,
        units: int = 1024,
        # for prenet
        prenet_layers: int = 2,
        prenet_units: int = 128,
        prenet_dropout_rate: float = 0.5,
        # for postnet
        postnet_layers: int = 5,
        postnet_chans: int = 512,
        postnet_dropout_rate: float = 0.5,
        # for transformer
        adim: int = 384,
        aheads: int = 4,
        # only for conformer
        conformer_rel_pos_type: str = "legacy",
        conformer_pos_enc_layer_type: str = "rel_pos",
        conformer_self_attn_layer_type: str = "rel_selfattn",
        conformer_activation_type: str = "swish",
        use_macaron_style_in_conformer: bool = True,
        use_cnn_in_conformer: bool = True,
        zero_triu: bool = False,
        conformer_enc_kernel_size: int = 7,
        conformer_dec_kernel_size: int = 31,
        # duration predictor
        duration_predictor_layers: int = 2,
        duration_predictor_type: str = "rnn",
        duration_predictor_units: int = 128,
        # extra embedding related
        spks: Optional[int] = None,
        langs: Optional[int] = None,
        spk_embed_dim: Optional[int] = None,
        spk_embed_integration_type: str = "add",
        # training related
        init_type: str = "xavier_uniform",
        init_enc_alpha: float = 1.0,
        init_dec_alpha: float = 1.0,
        use_masking: bool = False,
        use_weighted_masking: bool = False,
    ):
        """Initialize Translatotron2 module (stub).

        Args:
            idim (int): Input feature dimension.
            odim (int): Output feature dimension.
            (remaining arguments are accepted but currently unused)
        """
        # Initialize the nn.Module machinery of the AbsSynthesizer base so
        # the instance is at least a well-formed (empty) torch module.
        super().__init__()
        # TODO: build the actual synthesizer submodules (encoder prenet,
        # duration predictor, upsampler, decoder, postnet) — not implemented.
        return
class Prenet(nn.Module):
    """Non-Attentive Tacotron (NAT) Prenet.

    A stack of ``Linear -> ReLU -> Dropout`` blocks applied pointwise to the
    last dimension of the input. All linear layers are bias-free and project
    into ``units`` dimensions.
    """

    def __init__(self, idim, units=128, num_layers=2, dropout=0.5):
        """Initialize the prenet.

        Args:
            idim (int): Input feature dimension.
            units (int): Hidden size of every prenet layer.
            num_layers (int): Number of linear layers in the stack.
            dropout (float): Dropout probability applied after each ReLU.
        """
        super(Prenet, self).__init__()
        # Layer i maps dims[i] -> dims[i + 1]; only the first layer sees idim.
        dims = [idim] + [units] * num_layers
        self.layers = nn.ModuleList(
            nn.Linear(d_in, d_out, bias=False)
            for d_in, d_out in zip(dims[:-1], dims[1:])
        )
        self.dropout = nn.Dropout(p=dropout)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Run the input through every linear/ReLU/dropout block in order."""
        for fc in self.layers:
            x = self.dropout(self.activation(fc(x)))
        return x
class DurationPredictor(nn.Module):
    """Non-Attentive Tacotron (NAT) Duration Predictor module.

    A 2-layer bidirectional LSTM followed by a linear projection to a single
    non-negative duration value per encoder step.
    """

    def __init__(self, cfg):
        """Initialize the duration predictor.

        Args:
            cfg: Config object providing ``units`` (input feature size) and
                ``duration_lstm_dim`` (total bi-LSTM hidden size; split evenly
                across the two directions).
        """
        # FIX: original called super(FastDurationPredictor, self).__init__(),
        # which raises TypeError because self is not a FastDurationPredictor.
        super(DurationPredictor, self).__init__()
        self.lstm = nn.LSTM(
            cfg.units,
            int(cfg.duration_lstm_dim / 2),
            2,
            batch_first=True,
            bidirectional=True,
        )
        # FIX: torch.nn has no LinearNorm; plain nn.Linear is the intended op.
        self.proj = nn.Linear(cfg.duration_lstm_dim, 1)
        # ReLU keeps predicted durations non-negative.
        self.relu = nn.ReLU()

    def forward(self, encoder_outputs, input_lengths=None):
        """Forward Duration Predictor.

        :param encoder_outputs: [batch_size, hidden_length, encoder_lstm_dim]
        :param input_lengths: [batch_size, hidden_length]
        :return: [batch_size, hidden_length]
        """
        batch_size = encoder_outputs.size(0)
        hidden_length = encoder_outputs.size(1)
        # remove pad activations
        if input_lengths is not None:
            encoder_outputs = pack_padded_sequence(
                encoder_outputs, input_lengths, batch_first=True, enforce_sorted=False
            )
        self.lstm.flatten_parameters()
        outputs, _ = self.lstm(encoder_outputs)
        if input_lengths is not None:
            # FIX: pad back to the original hidden_length; without
            # total_length the padded output only spans max(input_lengths)
            # and the final .view() below would fail.
            outputs, _ = pad_packed_sequence(
                outputs, batch_first=True, total_length=hidden_length
            )
        outputs = self.relu(self.proj(outputs))
        return outputs.view(batch_size, hidden_length)
class GaussianUpsampling(nn.Module):
    """Gaussian Upsample.

    Non-attention Tacotron:
        - https://arxiv.org/abs/2010.04301
    this source code is implemenation of the ExpressiveTacotron from BridgetteSong
        - https://github.com/BridgetteSong/ExpressiveTacotron/
    """

    def __init__(self):
        super(GaussianUpsampling, self).__init__()
        # Large negative logit used to zero out padded positions in softmax.
        self.mask_score = -1e15

    def forward(self, encoder_outputs, durations, vars, input_lengths=None):
        """Gaussian upsampling.

        Args:
            encoder_outputs: encoder outputs [batch_size, hidden_length, dim]
            durations: phoneme durations [batch_size, hidden_length]
            vars : phoneme attended ranges [batch_size, hidden_length]
            input_lengths : [batch_size]
        Return:
            encoder_upsampling_outputs: upsampled encoder_output
                [batch_size, frame_length, dim]
        """
        batch_size = encoder_outputs.size(0)
        hidden_length = encoder_outputs.size(1)
        frame_length = int(torch.sum(durations, dim=1).max().item())
        # Gaussian centers: cumulative duration minus half of each duration.
        c = torch.cumsum(durations, dim=1).float() - 0.5 * durations
        c = c.unsqueeze(2)  # [batch_size, hidden_length, 1]
        t = (
            torch.arange(frame_length, device=encoder_outputs.device)
            .expand(batch_size, hidden_length, frame_length)
            .float()
        )  # [batch_size, hidden_length, frame_length]
        vars = vars.view(batch_size, -1, 1)  # [batch_size, hidden_length, 1]
        # Log-density of a Gaussian at each frame time t, per phoneme.
        w_t = -0.5 * (
            np.log(2.0 * np.pi) + torch.log(vars) + torch.pow(t - c, 2) / vars
        )  # [batch_size, hidden_length, frame_length]
        if input_lengths is not None:
            input_masks = ~self.get_mask_from_lengths(
                input_lengths, hidden_length
            )  # [batch_size, hidden_length], True at padded positions
            masks = input_masks.to(w_t.device).unsqueeze(2)
            # FIX: original mutated w_t.data with masked_fill_, which
            # bypasses autograd; use the out-of-place masked_fill instead.
            w_t = w_t.masked_fill(masks, self.mask_score)
        # Normalize over phonemes so each frame's weights sum to one.
        w_t = F.softmax(w_t, dim=1)
        encoder_upsampling_outputs = torch.bmm(
            w_t.transpose(1, 2), encoder_outputs
        )  # [batch_size, frame_length, encoder_hidden_size]
        return encoder_upsampling_outputs

    def get_mask_from_lengths(self, lengths, max_len=None):
        """Return a bool mask [batch, max_len] that is True at valid steps.

        FIX: computed entirely in torch — the original mixed a numpy
        ``arange`` with torch tensors, relying on implicit cross-library
        broadcasting.
        """
        lengths = torch.as_tensor(lengths)
        if max_len is None:
            max_len = int(lengths.max().item())
        ids = torch.arange(max_len, device=lengths.device)
        return ids.unsqueeze(0) < lengths.unsqueeze(1)