# -*- coding: utf-8 -*-
"""Network related utility tools."""
import logging
from typing import Dict
import numpy as np
import torch
def to_device(m, x):
    """Send a tensor to the device of a module or of another tensor.

    Args:
        m (torch.nn.Module or torch.Tensor): Object whose device is used.
            For a module, the device of its first parameter is taken.
        x (Tensor): Torch tensor to move.

    Returns:
        Tensor: ``x`` located on the same device as ``m``.

    Raises:
        TypeError: If ``m`` is neither a ``torch.nn.Module`` nor a
            ``torch.Tensor``.

    Note:
        A module without any parameter makes ``next()`` raise
        ``StopIteration``.

    """
    if isinstance(m, torch.nn.Module):
        device = next(m.parameters()).device
    elif isinstance(m, torch.Tensor):
        device = m.device
    else:
        raise TypeError(
            f"Expected torch.nn.Module or torch.Tensor, but got: {type(m)}"
        )
    return x.to(device)
def pad_list(xs, pad_value):
    """Stack tensors of different lengths into one padded batch tensor.

    Args:
        xs (List): List of Tensors [(T_1, `*`), (T_2, `*`), ..., (T_B, `*`)].
        pad_value (float): Value written to every padded position.

    Returns:
        Tensor: Padded tensor (B, Tmax, `*`). The trailing dimensions are
            taken from the first element of ``xs``.

    Examples:
        >>> x = [torch.ones(4), torch.ones(2), torch.ones(1)]
        >>> pad_list(x, 0)
        tensor([[1., 1., 1., 1.],
                [1., 1., 0., 0.],
                [1., 0., 0., 0.]])

    """
    batch_size = len(xs)
    longest = max(seq.size(0) for seq in xs)
    first = xs[0]
    # Allocate the output with the type of the first item, pre-filled with
    # the padding value; real data overwrites the leading part of each row.
    padded = first.new(batch_size, longest, *first.size()[1:]).fill_(pad_value)
    for idx, seq in enumerate(xs):
        padded[idx, : seq.size(0)] = seq
    return padded
def make_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
    """Make mask tensor marking the padded positions of a batch.

    This is a dispatcher: it forwards to the traceable implementation when
    ``lengths`` is a 1-D tensor, ``xs`` is absent or 2-D/3-D and
    ``length_dim <= 2``; every other input goes to the generic
    implementation.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension of ``xs`` that holds the
            sequence axis. Must not be 0.
        maxlen (int, optional): Explicit mask length, forwarded to the
            selected implementation (which rejects it when ``xs`` is given).

    Returns:
        Tensor: Mask tensor that is true on the padded part.

    Raises:
        ValueError: If ``length_dim`` is 0.

    Examples:
        Outputs are shown as 0/1 for brevity.

        With only lengths.

        >>> lengths = [5, 3, 2]
        >>> make_pad_mask(lengths)
        masks = [[0, 0, 0, 0 ,0],
                 [0, 0, 0, 1, 1],
                 [0, 0, 1, 1, 1]]

        With the reference tensor.

        >>> xs = torch.zeros((3, 2, 4))
        >>> make_pad_mask(lengths, xs)
        tensor([[[0, 0, 0, 0],
                 [0, 0, 0, 0]],
                [[0, 0, 0, 1],
                 [0, 0, 0, 1]],
                [[0, 0, 1, 1],
                 [0, 0, 1, 1]]])

        With the reference tensor and dimension indicator.

        >>> xs = torch.zeros((3, 6, 6))
        >>> make_pad_mask(lengths, xs, 1)[2, :, 0]
        tensor([0, 0, 1, 1, 1, 1])
        >>> make_pad_mask(lengths, xs, 2)[2, 0]
        tensor([0, 0, 1, 1, 1, 1])

    """
    if length_dim == 0:
        raise ValueError("length_dim cannot be 0: {}".format(length_dim))

    lengths_is_list = isinstance(lengths, list)
    if lengths_is_list:
        logging.warning(
            "Using make_pad_mask with a list of lengths is not tracable. "
            + "If you try to trace this function with type(lengths) == list, "
            + "please change the type of lengths to torch.LongTensor."
        )

    # 2-D / 3-D inputs with tensor lengths use the ESPnet-ONNX based
    # implementation so that the model stays traceable; anything else uses
    # the traditional implementation intended for research use.
    xs_supported = xs is None or xs.dim() in (2, 3)
    if (
        xs_supported
        and length_dim <= 2
        and not lengths_is_list
        and lengths.dim() == 1
    ):
        return _make_pad_mask_traceable(lengths, xs, length_dim, maxlen)
    return _make_pad_mask(lengths, xs, length_dim, maxlen)
def _make_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
if not isinstance(lengths, list):
lengths = lengths.long().tolist()
bs = int(len(lengths))
if maxlen is None:
if xs is None:
maxlen = int(max(lengths))
else:
maxlen = xs.size(length_dim)
else:
assert xs is None, "When maxlen is specified, xs must not be specified."
assert maxlen >= int(
max(lengths)
), f"maxlen {maxlen} must be >= max(lengths) {max(lengths)}"
seq_range = torch.arange(0, maxlen, dtype=torch.int64)
seq_range_expand = seq_range.unsqueeze(0).expand(bs, maxlen)
seq_length_expand = seq_range_expand.new(lengths).unsqueeze(-1)
mask = seq_range_expand >= seq_length_expand
if xs is not None:
assert (
xs.size(0) == bs
), f"The size of x.size(0) {xs.size(0)} must match the batch size {bs}"
if length_dim < 0:
length_dim = xs.dim() + length_dim
# ind = (:, None, ..., None, :, , None, ..., None)
ind = tuple(
slice(None) if i in (0, length_dim) else None for i in range(xs.dim())
)
mask = mask[ind].expand_as(xs).to(xs.device)
return mask
def _make_pad_mask_traceable(lengths, xs, length_dim, maxlen=None):
    """Make mask tensor containing indices of padded part (traceable).

    This is a simplified implementation of make_pad_mask that supports
    JIT tracing for applications like exporting models to ONNX.
    Dimension length of xs should be 2 or 3.

    The function builds an upper-triangular boolean lookup table of size
    (maxlen + 1, maxlen + 1) and selects one row per length: row ``r`` is
    true for every column ``>= r``, i.e. for every padded position of a
    sequence of length ``r``.

    Args:
        lengths (LongTensor): Batch of lengths (B,).
        xs (Tensor or None): Optional reference tensor (2-D or 3-D).
        length_dim (int): Sequence axis of ``xs``. For a 3-D ``xs`` the
            values 1 / -2 select the middle axis and 2 / -1 the last axis;
            any other value is reported and replaced by -1.
        maxlen (int, optional): Explicit mask length; requires ``xs`` to be
            None and must be >= max(lengths).

    Returns:
        Tensor: Boolean mask, true on the padded part. Its device is the
            device of ``xs`` if given, otherwise the device of ``lengths``.

    """
    device = lengths.device if xs is None else xs.device

    is_3d = xs is not None and len(xs.shape) == 3
    if is_3d:
        # On a 3-D tensor -2 addresses the same axis as 1, matching the
        # negative-index handling of the generic implementation.
        if length_dim in (1, -2):
            length_dim = 1
            lengths = lengths.unsqueeze(1).expand(*xs.transpose(1, 2).shape[:2])
        else:
            # Then length_dim is expected to be 2 or -1.
            if length_dim not in (-1, 2):
                # logging.warn was removed in Python 3.13.
                logging.warning(
                    f"Invalid length_dim {length_dim}. "
                    + "We set it to -1, which is the default value."
                )
            length_dim = -1
            lengths = lengths.unsqueeze(1).expand(*xs.shape[:2])

    if maxlen is not None:
        assert xs is None
        assert maxlen >= lengths.max()
    elif xs is not None:
        maxlen = xs.shape[length_dim]
    else:
        maxlen = lengths.max()

    # clip max(length) to maxlen
    lengths = torch.clamp(lengths, max=maxlen).type(torch.long)

    mask = torch.ones(maxlen + 1, maxlen + 1, dtype=torch.bool, device=device)
    # onnx cannot handle the diagonal argument of triu, hence triu_onnx.
    # Indexing the full table with the length itself (instead of
    # `lengths - 1` on a table without its first row) also yields a fully
    # padded row for length 0 rather than wrapping around to row -1.
    mask = triu_onnx(mask)[lengths][..., :maxlen]

    if is_3d and length_dim == 1:
        return mask.transpose(1, 2)
    return mask
def triu_onnx(x):
    """Zero out the strictly lower triangle of a square matrix.

    The upper-triangular selection is built from index comparisons and
    applied by multiplication, so no ``triu`` operator is used.

    Args:
        x (Tensor): Matrix whose second axis has the same size as its
            first axis (the selection is built as size(0) x size(0)).

    Returns:
        Tensor: ``x`` multiplied element-wise by a mask that is true where
            the row index is <= the column index.

    """
    size = x.size(0)
    index = torch.arange(size, device=x.device)
    row_index = index.unsqueeze(-1).expand(-1, size)
    upper = row_index <= index
    return x * upper
def make_non_pad_mask(lengths, xs=None, length_dim=-1):
    """Make mask tensor marking the non-padded positions of a batch.

    The result is the element-wise inversion of ``make_pad_mask`` called
    with the same arguments.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
            See the example.

    Returns:
        Tensor: Mask tensor that is true on the non-padded part.

    Examples:
        Outputs are shown as 0/1 for brevity.

        With only lengths.

        >>> lengths = [5, 3, 2]
        >>> make_non_pad_mask(lengths)
        masks = [[1, 1, 1, 1 ,1],
                 [1, 1, 1, 0, 0],
                 [1, 1, 0, 0, 0]]

        With the reference tensor.

        >>> xs = torch.zeros((3, 2, 4))
        >>> make_non_pad_mask(lengths, xs)
        tensor([[[1, 1, 1, 1],
                 [1, 1, 1, 1]],
                [[1, 1, 1, 0],
                 [1, 1, 1, 0]],
                [[1, 1, 0, 0],
                 [1, 1, 0, 0]]])

        With the reference tensor and dimension indicator.

        >>> xs = torch.zeros((3, 6, 6))
        >>> make_non_pad_mask(lengths, xs, 1)[2, :, 0]
        tensor([1, 1, 0, 0, 0, 0])
        >>> make_non_pad_mask(lengths, xs, 2)[2, 0]
        tensor([1, 1, 0, 0, 0, 0])

    """
    padded = make_pad_mask(lengths, xs, length_dim)
    return ~padded
def mask_by_length(xs, lengths, fill=0):
    """Replace everything beyond each sequence length by a fill value.

    Args:
        xs (Tensor): Batch of input tensor (B, `*`).
        lengths (LongTensor or List): Batch of lengths (B,).
        fill (int or float): Value to fill masked part.

    Returns:
        Tensor: New tensor of the same size as ``xs`` in which row ``i``
            keeps its first ``lengths[i]`` entries and holds ``fill``
            elsewhere.

    Examples:
        >>> x = torch.arange(5).repeat(3, 1) + 1
        >>> lengths = [5, 3, 2]
        >>> mask_by_length(x, lengths)
        tensor([[1, 2, 3, 4, 5],
                [1, 2, 3, 0, 0],
                [1, 2, 0, 0, 0]])

    """
    assert xs.size(0) == len(lengths)
    # Start from a tensor that is entirely `fill`, then copy the valid
    # prefix of every row over it.
    masked = xs.data.new(*xs.size()).fill_(fill)
    for row, valid_len in enumerate(lengths):
        masked[row, :valid_len] = xs[row, :valid_len]
    return masked
def th_accuracy(pad_outputs, pad_targets, ignore_label):
    """Calculate the accuracy over all targets that are not ignored.

    Args:
        pad_outputs (Tensor): Prediction tensors (B * Lmax, D). They are
            reshaped to (B, Lmax, D) using the first two sizes of
            ``pad_targets`` and reduced by argmax over the last axis.
        pad_targets (LongTensor): Target label tensors; presumably
            (B, Lmax), since they are compared element-wise with the
            (B, Lmax) predictions.
        ignore_label (int): Ignore label id.

    Returns:
        float: Accuracy value (0.0 - 1.0).

    Raises:
        ZeroDivisionError: If every target equals ``ignore_label``.

    """
    batch, max_len = pad_targets.size(0), pad_targets.size(1)
    predictions = pad_outputs.view(batch, max_len, pad_outputs.size(1)).argmax(2)

    valid = pad_targets != ignore_label
    hits = predictions.masked_select(valid) == pad_targets.masked_select(valid)
    n_correct = torch.sum(hits)
    n_valid = torch.sum(valid)
    return float(n_correct) / float(n_valid)
def to_torch_tensor(x):
    """Convert the input to torch.Tensor or ComplexTensor.

    Args:
        x: Inputs. It should be one of numpy.ndarray, Tensor, ComplexTensor,
            and dict with the keys 'real' and 'imag'.

    Returns:
        Tensor or ComplexTensor: Type converted inputs. Tensors and
            ComplexTensors are returned unchanged.

    Raises:
        ValueError: If a dict lacks the 'real' or 'imag' key, or if ``x``
            has an unsupported type.

    Examples:
        >>> xs = np.ones(3, dtype=np.float32)
        >>> xs = to_torch_tensor(xs)
        tensor([1., 1., 1.])
        >>> xs = torch.ones(3, 4, 5)
        >>> assert to_torch_tensor(xs) is xs
        >>> xs = {'real': xs, 'imag': xs}
        >>> to_torch_tensor(xs)
        ComplexTensor(
        Real:
        tensor([1., 1., 1.])
        Imag;
        tensor([1., 1., 1.])
        )

    """
    # numpy arrays: complex dtypes go to ComplexTensor, the rest to Tensor.
    if isinstance(x, np.ndarray):
        if x.dtype.kind != "c":
            return torch.from_numpy(x)
        # torch_complex is imported lazily, only when it is needed.
        from torch_complex.tensor import ComplexTensor

        return ComplexTensor(x)

    # {'real': ..., 'imag': ...} is converted to ComplexTensor.
    if isinstance(x, dict):
        from torch_complex.tensor import ComplexTensor

        if "real" not in x or "imag" not in x:
            raise ValueError("has 'real' and 'imag' keys: {}".format(list(x)))
        return ComplexTensor(x["real"], x["imag"])

    # torch.Tensor is passed through as it is.
    if isinstance(x, torch.Tensor):
        return x

    error = (
        "x must be numpy.ndarray, torch.Tensor or a dict like "
        "{{'real': torch.Tensor, 'imag': torch.Tensor}}, "
        "but got {}".format(type(x))
    )
    try:
        from torch_complex.tensor import ComplexTensor
    except Exception:
        # Without torch_complex the input cannot be a ComplexTensor.
        raise ValueError(error)
    if isinstance(x, ComplexTensor):
        return x
    raise ValueError(error)
def get_subsample(train_args, mode, arch):
    """Parse the subsampling factors from the args for the specified `mode` and `arch`.

    Args:
        train_args: argument Namespace containing options. Depending on the
            branch, `elayers`, `elayers_sd`, `etype`, `subsample` and
            `num_encs` are read from it.
        mode: one of ('asr', 'mt', 'st')
        arch: one of ('rnn', 'rnn-t', 'rnn_mix', 'rnn_mulenc', 'transformer')

    Returns:
        np.ndarray / List[np.ndarray]: subsampling factors. A list with one
            array per encoder is returned for 'rnn_mulenc'.

    Raises:
        ValueError: If the combination of `mode` and `arch` is not supported.

    """
    if arch == "transformer":
        return np.array([1])

    if mode == "mt" and arch == "rnn":
        # +1 means input (+1) and layers outputs (train_args.elayer)
        subsample = np.ones(train_args.elayers + 1, dtype=np.int64)
        logging.warning("Subsampling is not performed for machine translation.")
        logging.info("subsample: " + " ".join([str(x) for x in subsample]))
        return subsample

    # ('mt', 'rnn') has already returned above, so it is not listed here.
    if (mode == "asr" and arch in ("rnn", "rnn-t")) or (
        mode == "st" and arch == "rnn"
    ):
        return _encoder_subsample(
            train_args.elayers, train_args.etype, lambda: train_args.subsample
        )

    if mode == "asr" and arch == "rnn_mix":
        return _encoder_subsample(
            train_args.elayers_sd + train_args.elayers,
            train_args.etype,
            lambda: train_args.subsample,
        )

    if mode == "asr" and arch == "rnn_mulenc":
        subsample_list = []
        for idx in range(train_args.num_encs):
            subsample_list.append(
                _encoder_subsample(
                    train_args.elayers[idx],
                    train_args.etype[idx],
                    lambda: train_args.subsample[idx],
                    encoder_no=idx + 1,
                )
            )
        return subsample_list

    raise ValueError("Invalid options: mode={}, arch={}".format(mode, arch))


def _encoder_subsample(n_layers, etype, read_spec, encoder_no=None):
    """Build the subsampling factors of a single encoder.

    Args:
        n_layers (int): Number of encoder layers; the result has
            ``n_layers + 1`` entries (input plus every layer output).
        etype (str): Encoder type. Factors are only parsed when it ends
            with "p" and does not start with "vgg".
        read_spec (callable): Zero-argument callable returning the
            "_"-separated factor string. It is only called when the factors
            are actually parsed.
        encoder_no (int, optional): 1-based encoder number used as a prefix
            of the warning for multi-encoder models.

    Returns:
        np.ndarray: int64 factors; entries without a given factor stay 1.

    """
    factors = np.ones(n_layers + 1, dtype=np.int64)
    if etype.endswith("p") and not etype.startswith("vgg"):
        # Surplus factors beyond n_layers + 1 are ignored.
        for j, token in enumerate(read_spec().split("_")[: n_layers + 1]):
            factors[j] = int(token)
    elif encoder_no is None:
        # NOTE: emitted for every etype that is not a non-vgg "*p" type,
        # not only for vgg* encoders.
        logging.warning(
            "Subsampling is not performed for vgg*. "
            "It is performed in max pooling layers at CNN."
        )
    else:
        logging.warning(
            "Encoder %d: Subsampling is not performed for vgg*. "
            "It is performed in max pooling layers at CNN.",
            encoder_no,
        )
    logging.info("subsample: " + " ".join([str(x) for x in factors]))
    return factors
def rename_state_dict(
    old_prefix: str, new_prefix: str, state_dict: Dict[str, torch.Tensor]
):
    """Replace keys of old prefix with new prefix in state dict.

    The dict is modified in place; nothing is returned. Only the leading
    ``old_prefix`` of a key is replaced, later occurrences of the same
    text inside the key are left untouched.

    Args:
        old_prefix (str): Prefix to look for at the start of each key.
        new_prefix (str): Prefix written in its place.
        state_dict (Dict[str, torch.Tensor]): State dict to update.

    """
    # Collect the keys first so the dict is not mutated while iterating it.
    old_keys = [k for k in state_dict if k.startswith(old_prefix)]
    if old_keys:
        logging.warning(f"Rename: {old_prefix} -> {new_prefix}")
    for old_key in old_keys:
        # Slice instead of str.replace, which would rewrite every
        # occurrence of old_prefix in the key.
        new_key = new_prefix + old_key[len(old_prefix):]
        state_dict[new_key] = state_dict.pop(old_key)
def get_activation(act):
    """Return a new activation module for the given name.

    Args:
        act (str): One of "hardtanh", "tanh", "relu", "selu" and "swish".

    Returns:
        A freshly constructed instance of the matching activation class.

    Raises:
        KeyError: If ``act`` is not one of the supported names.

    """
    # Imported lazily so the dependency is only loaded when it is used.
    from espnet.nets.pytorch_backend.conformer.swish import Swish

    registry = {
        "hardtanh": torch.nn.Hardtanh,
        "tanh": torch.nn.Tanh,
        "relu": torch.nn.ReLU,
        "selu": torch.nn.SELU,
        "swish": Swish,
    }
    activation_cls = registry[act]
    return activation_cls()
def trim_by_ctc_posterior(
    h: torch.Tensor,
    ctc_probs: torch.Tensor,
    masks: torch.Tensor,
    pos_emb: torch.Tensor = None,
):
    """Trim the encoder hidden output using CTC posterior.

    The continuous frames in the tail that confidently represent
    blank symbols are trimmed.

    Args:
        h (torch.Tensor): Encoder hidden output, 3-D (B, T, `*`).
        ctc_probs (torch.Tensor): CTC posterior whose first two sizes match
            those of ``h``; the class axis is dim 2.
        masks (torch.Tensor): Mask with ``size(1) == 1``; it is inverted
            with ``~`` and summed over the last axis to obtain the lengths,
            so presumably a boolean (B, 1, T) tensor that is true on valid
            frames.
        pos_emb (torch.Tensor, optional): Positional embedding to trim
            along dim 1 together with ``h``.

    Returns:
        tuple: ``(h, masks, pos_emb)`` after trimming; ``pos_emb`` stays
            None when it was not given.

    """
    # Empirical settings
    frame_tolerance = 5
    conf_tolerance = 0.95
    blank_id = 0

    assert masks.size(1) == 1
    masks = masks.squeeze(1)
    hlens = masks.sum(dim=1)
    assert h.size()[:2] == ctc_probs.size()[:2]
    assert h.size(0) == hlens.size(0)

    # blank frames: the most likely symbol is blank with high confidence
    max_values, max_indices = ctc_probs.max(dim=2)
    blank_masks = torch.logical_and(
        max_values > conf_tolerance, max_indices == blank_id
    )

    # plus ignored frames
    joint_masks = torch.logical_or(blank_masks, ~masks)

    # lengths after the trimming: keep everything up to the last frame that
    # is neither blank nor ignored, plus `frame_tolerance` extra frames,
    # but never more than the original length.
    B, T, _ = h.size()
    positions = torch.arange(T, device=h.device).unsqueeze(0).expand(B, T)
    frame_idx = torch.where(joint_masks, -1, positions)
    keep_lens = frame_idx.max(dim=-1)[0] + frame_tolerance + 1
    after_lens = torch.where(keep_lens < hlens, keep_lens, hlens)

    h = h[:, : after_lens.max()]
    masks = ~make_pad_mask(after_lens).to(h.device).unsqueeze(1)

    if pos_emb is not None:
        if (hlens.max() * 2 - 1).item() == pos_emb.size(1):
            # RelPositionalEncoding: 2 * T - 1 entries, trimmed
            # symmetrically around the centre.
            center = pos_emb.size(1) // 2
            pos_emb = pos_emb[:, center - h.size(1) + 1 : center + h.size(1)]
        else:
            pos_emb = pos_emb[:, : h.size(1)]

    return h, masks, pos_emb