Source code for espnet2.fileio.score_scp

import collections.abc
import json
from pathlib import Path
from typing import Union

import numpy as np
from typeguard import typechecked

from espnet2.fileio.read_text import read_2columns_text

try:
    import music21 as m21  # for CI import
except (ImportError, ModuleNotFoundError):
    m21 = None
try:
    import miditoolkit  # for CI import
except (ImportError, ModuleNotFoundError):
    miditoolkit = None


[docs]class NOTE(object): def __init__(self, lyric, midi, st, et): self.lyric = lyric self.midi = midi self.st = st self.et = et
[docs]class XMLReader(collections.abc.Mapping): """Reader class for 'xml.scp'. Examples: key1 /some/path/a.xml key2 /some/path/b.xml key3 /some/path/c.xml key4 /some/path/d.xml ... >>> reader = XMLScpReader('xml.scp') >>> tempo, note_list = reader['key1'] """ @typechecked def __init__( self, fname, dtype=np.int16, ): assert m21 is not None, ( "Cannot load music21 package. ", "Please install Muskit modules via ", "(cd tools && make muskit.done)", ) self.fname = fname self.dtype = dtype self.data = read_2columns_text(fname) # get key-value dict def __getitem__(self, key): score = m21.converter.parse(self.data[key]) m = score.metronomeMarkBoundaries() tempo = int(m[0][2].number) part = score.parts[0].flat notes_list = [] prepitch = -1 st = 0 for note in part.notesAndRests: dur = note.seconds if not note.isRest: # Note or Chord lr = note.lyric if note.isChord: for n in note: if n.pitch.midi != prepitch: # Ignore repeat note note = n break if lr is None or lr == "" or lr == "ー": # multi note in one syllable if note.pitch.midi == prepitch: # same pitch notes_list[-1].et += dur else: # different pitch notes_list.append(NOTE("—", note.pitch.midi, st, st + dur)) elif lr == "br": # <br> is tagged as a note if prepitch == 0: notes_list[-1].et += dur else: notes_list.append(NOTE("P", 0, st, st + dur)) prepitch = 0 st += dur continue else: # normal note for one syllable notes_list.append(NOTE(lr, note.pitch.midi, st, st + dur)) prepitch = note.pitch.midi for arti in note.articulations: # NOTE(Yuning): By default, 'breath mark' appears at the end of # the sentence. In some situations, 'breath mark' doesn't take # effect in its belonging note. Please handle them under local/. if arti.name in ["breath mark"]: # <br> is tagged as a notation notes_list.append(NOTE("B", 0, st + dur, st + dur)) # NOTE(Yuning): In some datasets, there is a break when 'staccato' # occurs. We let users to decide whether to perform segmentation # under local/. else: # rest note if prepitch == 0: notes_list[-1].et += dur else: notes_list.append(NOTE("P", 0, st, st + dur)) prepitch = 0 st += dur # NOTE(Yuning): implicit rest at the end of xml file should be removed. if notes_list[-1].midi == 0 and notes_list[-1].lyric == "P": notes_list.pop() return tempo, notes_list
[docs] def get_path(self, key): return self.data[key]
def __contains__(self, item): return item def __len__(self): return len(self.data) def __iter__(self): return iter(self.data)
[docs] def keys(self): return self.data.keys()
[docs]class XMLWriter: """Writer class for 'midi.scp' Examples: key1 /some/path/a.musicxml key2 /some/path/b.musicxml key3 /some/path/c.musicxml key4 /some/path/d.musicxml ... >>> writer = XMLScpWriter('./data/', './data/xml.scp') >>> writer['aa'] = xml_obj >>> writer['bb'] = xml_obj """ @typechecked def __init__( self, outdir: Union[Path, str], scpfile: Union[Path, str], ): self.dir = Path(outdir) self.dir.mkdir(parents=True, exist_ok=True) scpfile = Path(scpfile) scpfile.parent.mkdir(parents=True, exist_ok=True) self.fscp = scpfile.open("w", encoding="utf-8") self.data = {} def __setitem__(self, key: str, value: tuple): assert ( len(value) == 4 ), "The xml values should include lyrics, note, segmentations and tempo" lyrics_seq, notes_seq, segs_seq, tempo = value xml_path = self.dir / f"{key}.musicxml" xml_path.parent.mkdir(parents=True, exist_ok=True) m = m21.stream.Stream() m.insert(m21.tempo.MetronomeMark(number=tempo)) bps = 1.0 * tempo / 60 offset = 0 for i in range(len(lyrics_seq)): duration = int(8 * (segs_seq[i][1] - segs_seq[i][0]) * bps + 0.5) duration = 1.0 * duration / 8 if duration == 0: duration = 1 / 16 if notes_seq[i] != 0: # isNote n = m21.note.Note(notes_seq[i]) if lyrics_seq[i] != "—": n.lyric = lyrics_seq[i] else: # isRest n = m21.note.Rest() n.offset = offset n.duration = m21.duration.Duration(duration) m.insert(n) offset += duration m.write("xml", fp=xml_path) self.fscp.write(f"{key} {xml_path}\n") self.data[key] = str(xml_path)
[docs] def get_path(self, key): return self.data[key]
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self): self.fscp.close()
[docs]class MIDReader(collections.abc.Mapping): """Reader class for 'mid.scp'. Examples: key1 /some/path/a.mid key2 /some/path/b.mid key3 /some/path/c.mid key4 /some/path/d.mid ... >>> reader = XMLScpReader('mid.scp') >>> tempo, note_list = reader['key1'] """ @typechecked def __init__( self, fname, add_rest=True, dtype=np.int16, ): assert miditoolkit is not None, ( "Cannot load miditoolkit package. ", "Please install Muskit modules via ", "(cd tools && make muskit.done)", ) self.fname = fname self.dtype = dtype self.add_rest = add_rest # add rest into note sequencee self.data = read_2columns_text(fname) # get key-value dict def __getitem__(self, key): midi_obj = miditoolkit.midi.parser.MidiFile(self.data[key]) # load tempo tempos = midi_obj.tempo_changes tempos.sort(key=lambda x: (x.time, x.tempo)) assert len(tempos) == 1 tempo = int(tempos[0].tempo + 0.5) # load pitch time sequence tick_to_time = midi_obj.get_tick_to_time_mapping() notes = midi_obj.instruments[0].notes notes.sort(key=lambda x: (x.start, x.pitch)) notes_list = [] pre_et = 0 for note in notes: st = tick_to_time[note.start] et = tick_to_time[note.end] # NOTE(Yuning): MIDIs don't have explicit rest notes. # Explicit rest notes might be needed for stage 1 in svs. if st != pre_et and self.add_rest: notes_list.append(NOTE("P", 0, pre_et, st)) notes_list.append(NOTE("*", note.pitch, st, et)) pre_et = et return tempo, notes_list
[docs] def get_path(self, key): return self.data[key]
def __contains__(self, item): return item def __len__(self): return len(self.data) def __iter__(self): return iter(self.data)
[docs] def keys(self): return self.data.keys()
[docs]class SingingScoreReader(collections.abc.Mapping): """Reader class for 'score.scp'. Examples: key1 /some/path/score.json key2 /some/path/score.json key3 /some/path/score.json key4 /some/path/score.json ... >>> reader = SoundScpReader('score.scp') >>> score = reader['key1'] """ @typechecked def __init__( self, fname, dtype=np.int16, ): self.fname = fname self.dtype = dtype self.data = read_2columns_text(fname) def __getitem__(self, key): with open(self.data[key], "r") as f: score = json.load(f) return score
[docs] def get_path(self, key): return self.data[key]
def __contains__(self, item): return item def __len__(self): return len(self.data) def __iter__(self): return iter(self.data)
[docs] def keys(self): return self.data.keys()
[docs]class SingingScoreWriter: """Writer class for 'score.scp' Examples: key1 /some/path/score.json key2 /some/path/score.json key3 /some/path/score.json key4 /some/path/score.json ... >>> writer = SingingScoreWriter('./data/', './data/score.scp') >>> writer['aa'] = score_obj >>> writer['bb'] = score_obj """ @typechecked def __init__( self, outdir: Union[Path, str], scpfile: Union[Path, str], ): self.dir = Path(outdir) self.dir.mkdir(parents=True, exist_ok=True) scpfile = Path(scpfile) scpfile.parent.mkdir(parents=True, exist_ok=True) self.fscp = scpfile.open("w", encoding="utf-8") self.data = {} def __setitem__(self, key: str, value: dict): """Score should be a dict Example: { "tempo": bpm, "item_list": a subset of ["st", "et", "lyric", "midi", "phn"], "note": [ [start_time1, end_time1, lyric1, midi1, phn1], [start_time2, end_time2, lyric2, midi2, phn2], ... ] } The itmes in each note correspond to the "item_list". """ score_path = self.dir / f"{key}.json" score_path.parent.mkdir(parents=True, exist_ok=True) with open(score_path, "w") as f: json.dump(value, f, ensure_ascii=False, indent=2) self.fscp.write(f"{key} {score_path}\n") self.data[key] = str(score_path)
[docs] def get_path(self, key): return self.data[key]
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self): self.fscp.close()