Source code for espnet2.samplers.folded_batch_sampler

from typing import Iterator, List, Optional, Sequence, Tuple, Union

from typeguard import typechecked

from espnet2.fileio.read_text import load_num_sequence_text, read_2columns_text
from espnet2.samplers.abs_sampler import AbsSampler


[docs]class FoldedBatchSampler(AbsSampler): @typechecked def __init__( self, batch_size: int, shape_files: Union[Tuple[str, ...], List[str]], fold_lengths: Sequence[int], min_batch_size: int = 1, sort_in_batch: str = "descending", sort_batch: str = "ascending", drop_last: bool = False, utt2category_file: Optional[str] = None, ): assert batch_size > 0 if sort_batch != "ascending" and sort_batch != "descending": raise ValueError( f"sort_batch must be ascending or descending: {sort_batch}" ) if sort_in_batch != "descending" and sort_in_batch != "ascending": raise ValueError( f"sort_in_batch must be ascending or descending: {sort_in_batch}" ) self.batch_size = batch_size self.shape_files = shape_files self.sort_in_batch = sort_in_batch self.sort_batch = sort_batch self.drop_last = drop_last # utt2shape: (Length, ...) # uttA 100,... # uttB 201,... utt2shapes = [ load_num_sequence_text(s, loader_type="csv_int") for s in shape_files ] first_utt2shape = utt2shapes[0] for s, d in zip(shape_files, utt2shapes): if set(d) != set(first_utt2shape): raise RuntimeError( f"keys are mismatched between {s} != {shape_files[0]}" ) # Sort samples in ascending order # (shape order should be like (Length, Dim)) keys = sorted(first_utt2shape, key=lambda k: first_utt2shape[k][0]) if len(keys) == 0: raise RuntimeError(f"0 lines found: {shape_files[0]}") category2utt = {} if utt2category_file is not None: utt2category = read_2columns_text(utt2category_file) if set(utt2category) != set(first_utt2shape): raise RuntimeError( "keys are mismatched between " f"{utt2category_file} != {shape_files[0]}" ) for k in keys: category2utt.setdefault(utt2category[k], []).append(k) else: category2utt["default_category"] = keys self.batch_list = [] for d, v in category2utt.items(): category_keys = v # Decide batch-sizes start = 0 batch_sizes = [] while True: k = category_keys[start] factor = max(int(d[k][0] / m) for d, m in zip(utt2shapes, fold_lengths)) bs = max(min_batch_size, int(batch_size / (1 + factor))) if self.drop_last and start + bs > len(category_keys): # This if-block avoids 0-batches if len(self.batch_list) > 0: break bs = min(len(category_keys) - start, bs) batch_sizes.append(bs) start += bs if start >= len(category_keys): break if len(batch_sizes) == 0: # Maybe we can't reach here raise RuntimeError("0 batches") # If the last batch-size is smaller than minimum batch_size, # the samples are redistributed to the other mini-batches if len(batch_sizes) > 1 and batch_sizes[-1] < min_batch_size: for i in range(batch_sizes.pop(-1)): batch_sizes[-(i % len(batch_sizes)) - 2] += 1 if not self.drop_last: # Bug check assert sum(batch_sizes) == len( category_keys ), f"{sum(batch_sizes)} != {len(category_keys)}" # Set mini-batch cur_batch_list = [] start = 0 for bs in batch_sizes: assert len(category_keys) >= start + bs, "Bug" minibatch_keys = category_keys[start : start + bs] start += bs if sort_in_batch == "descending": minibatch_keys.reverse() elif sort_in_batch == "ascending": # Key are already sorted in ascending pass else: raise ValueError( "sort_in_batch must be ascending or " f"descending: {sort_in_batch}" ) cur_batch_list.append(tuple(minibatch_keys)) if sort_batch == "ascending": pass elif sort_batch == "descending": cur_batch_list.reverse() else: raise ValueError( f"sort_batch must be ascending or descending: {sort_batch}" ) self.batch_list.extend(cur_batch_list) def __repr__(self): return ( f"{self.__class__.__name__}(" f"N-batch={len(self)}, " f"batch_size={self.batch_size}, " f"shape_files={self.shape_files}, " f"sort_in_batch={self.sort_in_batch}, " f"sort_batch={self.sort_batch})" ) def __len__(self): return len(self.batch_list) def __iter__(self) -> Iterator[Tuple[str, ...]]: return iter(self.batch_list)