import json
import math
import librosa
import numpy as np
import requests
import torch
import torchaudio
# Sample rate that input audio is resampled to before scoring (passed as
# ``target_sr`` to ``librosa.resample``).
SAMPLING_RATE = 16000
# Segment length in seconds: DNSMOS_local multiplies this by SAMPLING_RATE to
# get the number of samples scored per segment.
INPUT_LENGTH = 9.01
# URLs for the web service used by DNSMOS_web: the first is used when
# ``method == "p808"``, the second for any other ``method`` value.
SCORING_URI_DNSMOS = "https://dnsmos.azurewebsites.net/score"
SCORING_URI_DNSMOS_P835 = "https://dnsmos.azurewebsites.net/v1/dnsmosp835/score"
def poly1d(coefficients, use_numpy=False):
    """Build a callable that evaluates a one-dimensional polynomial.

    Args:
        coefficients: Polynomial coefficients ordered from the highest power
            down to the constant term.
        use_numpy: When true, return ``np.poly1d(coefficients)``. Otherwise
            return a plain Python closure that only relies on ``*``, ``**``
            and ``+`` of its argument.

    Returns:
        A callable ``f(p)`` giving the value of the polynomial at ``p``.
    """
    if use_numpy:
        return np.poly1d(coefficients)

    # Lowest power first, so the position in the tuple is the exponent.
    ascending = tuple(reversed(coefficients))

    def evaluate(p):
        terms = (coef * p**power for power, coef in enumerate(ascending))
        return sum(terms)

    return evaluate
class DNSMOS_web:
    """Client that obtains DNSMOS scores from the hosted web service.

    Ported from
    https://github.com/microsoft/DNS-Challenge/blob/master/DNSMOS/dnsmos.py
    """

    def __init__(self, auth_key):
        """Remember the key that is sent in the ``Authorization`` header."""
        self.auth_key = auth_key

    def __call__(self, aud, input_fs, fname="", method="p808"):
        """Post the audio to the scoring service and return the decoded reply.

        Args:
            aud: Audio samples; must provide ``tolist()`` and, when resampling
                is needed, be accepted by ``librosa.resample``.
            input_fs: Sample rate of ``aud``. The audio is resampled when it
                differs from ``SAMPLING_RATE``.
            fname: Base file name reported to the service; ``".wav"`` is
                appended. An empty value is sent as ``"audio.wav"``.
            method: ``"p808"`` selects ``SCORING_URI_DNSMOS``; any other value
                selects ``SCORING_URI_DNSMOS_P835``.

        Returns:
            The JSON body of the HTTP response, decoded by ``resp.json()``.
        """
        audio = aud
        if input_fs != SAMPLING_RATE:
            audio = librosa.resample(aud, orig_sr=input_fs, target_sr=SAMPLING_RATE)

        # The authorization header is always sent, built from the stored key.
        request_headers = {
            "Content-Type": "application/json",
            "Authorization": f"Basic {self.auth_key}",
        }

        reported_name = fname + ".wav" if fname else "audio.wav"
        payload = json.dumps({"data": audio.tolist(), "filename": reported_name})

        endpoint = SCORING_URI_DNSMOS if method == "p808" else SCORING_URI_DNSMOS_P835
        response = requests.post(endpoint, data=payload, headers=request_headers)
        return response.json()
class DNSMOS_local:
    """Compute DNSMOS scores with locally loaded models.

    The models are either run through ``onnxruntime`` or converted to PyTorch
    modules with ``onnx2torch`` (``convert_to_torch=True``).

    Ported from
    https://github.com/microsoft/DNS-Challenge/blob/master/DNSMOS/dnsmos_local.py
    """

    def __init__(
        self, primary_model_path, p808_model_path, use_gpu=False, convert_to_torch=False
    ):
        """Load the scoring models.

        Args:
            primary_model_path: Path of the ONNX model whose output is
                unpacked as raw SIG/BAK/OVRL scores. When ``None`` the model
                is not loaded; note that ``__call__`` still uses it.
            p808_model_path: Path of the ONNX model that yields ``P808_MOS``.
            use_gpu: Run on CUDA (``CUDAExecutionProvider`` for ONNX Runtime,
                ``.cuda()`` for the converted PyTorch modules).
            convert_to_torch: Convert the ONNX models with ``onnx2torch``
                instead of running them with ``onnxruntime``.

        Raises:
            RuntimeError: If the backend required by the chosen mode
                (``onnx2torch`` or ``onnxruntime``) is not installed.
        """
        self.convert_to_torch = convert_to_torch
        self.use_gpu = use_gpu

        if convert_to_torch:
            try:
                from onnx2torch import convert
            except ModuleNotFoundError as err:
                raise RuntimeError(
                    "Please install onnx2torch manually and retry!"
                ) from err

            if primary_model_path is not None:
                self.primary_model = convert(primary_model_path).eval()
            self.p808_model = convert(p808_model_path).eval()
            self.spectrogram = torchaudio.transforms.Spectrogram(
                n_fft=321, hop_length=160, pad_mode="constant"
            )
            self.to_db = torchaudio.transforms.AmplitudeToDB("power", top_db=80.0)
            if use_gpu:
                if primary_model_path is not None:
                    self.primary_model = self.primary_model.cuda()
                self.p808_model = self.p808_model.cuda()
                self.spectrogram = self.spectrogram.cuda()
        else:
            try:
                import onnxruntime as ort
            except ModuleNotFoundError as err:
                raise RuntimeError(
                    "Please install onnxruntime manually and retry!"
                ) from err

            prvd = "CUDAExecutionProvider" if use_gpu else "CPUExecutionProvider"
            if primary_model_path is not None:
                self.onnx_sess = ort.InferenceSession(
                    primary_model_path, providers=[prvd]
                )
            self.p808_onnx_sess = ort.InferenceSession(
                p808_model_path, providers=[prvd]
            )

    def audio_melspec(
        self, audio, n_mels=120, frame_size=320, hop_length=160, sr=16000, to_db=True
    ):
        """Compute the (optionally dB-scaled) mel spectrogram of ``audio``.

        Args:
            audio: Waveform; a tensor in torch mode, otherwise an array that
                ``librosa.feature.melspectrogram`` accepts.
            n_mels: Number of mel bands.
            frame_size: The mel filter bank / FFT uses ``frame_size + 1`` as
                ``n_fft``.
            hop_length: Hop size for the librosa branch. The torch branch uses
                the ``Spectrogram`` transform built in ``__init__``
                (``n_fft=321``, ``hop_length=160``) and ignores this argument.
            sr: Sample rate used to build the mel filter bank.
            to_db: When true, convert power to dB relative to the maximum and
                rescale with ``(dB + 40) / 40``.

        Returns:
            The transposed mel spectrogram (``mel_spec.T``).
        """
        if self.convert_to_torch:
            specgram = self.spectrogram(audio)
            fb = torch.as_tensor(
                librosa.filters.mel(sr=sr, n_fft=frame_size + 1, n_mels=n_mels).T,
                dtype=audio.dtype,
                device=audio.device,
            )
            mel_spec = torch.matmul(specgram.transpose(-1, -2), fb).transpose(-1, -2)
            if to_db:
                # Use the spectrogram maximum (floored at ``amin``) as the dB
                # reference, as the librosa branch does with ``ref=np.max``.
                # This overwrites state on the shared ``to_db`` transform.
                self.to_db.db_multiplier = math.log10(
                    max(self.to_db.amin, float(torch.max(mel_spec)))
                )
                mel_spec = (self.to_db(mel_spec) + 40) / 40
        else:
            mel_spec = librosa.feature.melspectrogram(
                y=audio,
                sr=sr,
                n_fft=frame_size + 1,
                hop_length=hop_length,
                n_mels=n_mels,
            )
            if to_db:
                mel_spec = (librosa.power_to_db(mel_spec, ref=np.max) + 40) / 40
        return mel_spec.T

    def get_polyfit_val(self, sig, bak, ovr, is_personalized_MOS):
        """Map raw SIG/BAK/OVRL model outputs through fitted polynomials.

        Args:
            sig: Raw signal score.
            bak: Raw background score.
            ovr: Raw overall score.
            is_personalized_MOS: Selects the cubic coefficient set when true,
                otherwise the quadratic one.

        Returns:
            Tuple ``(sig_poly, bak_poly, ovr_poly)``.
        """
        # NumPy polynomials in ONNX mode; the plain-Python form in torch mode.
        flag = not self.convert_to_torch
        if is_personalized_MOS:
            p_ovr = poly1d([-0.00533021, 0.005101, 1.18058466, -0.11236046], flag)
            p_sig = poly1d([-0.01019296, 0.02751166, 1.19576786, -0.24348726], flag)
            p_bak = poly1d([-0.04976499, 0.44276479, -0.1644611, 0.96883132], flag)
        else:
            p_ovr = poly1d([-0.06766283, 1.11546468, 0.04602535], flag)
            p_sig = poly1d([-0.08397278, 1.22083953, 0.0052439], flag)
            p_bak = poly1d([-0.13166888, 1.60915514, -0.39604546], flag)
        return p_sig(sig), p_bak(bak), p_ovr(ovr)

    def __call__(self, aud, input_fs, is_personalized_MOS=False):
        """Score an audio signal.

        The signal is resampled to ``SAMPLING_RATE`` if needed, repeated until
        it is at least ``INPUT_LENGTH`` seconds long, and scored on segments
        taken one second apart. Each score is averaged over the segments.

        Args:
            aud: Audio samples as a tensor or array; ``len(aud)`` is treated
                as the number of samples (presumably a 1-D mono signal; not
                validated here).
            input_fs: Sample rate of ``aud``.
            is_personalized_MOS: Forwarded to ``get_polyfit_val``.

        Returns:
            Dict with keys ``OVRL_raw``, ``SIG_raw``, ``BAK_raw``, ``OVRL``,
            ``SIG``, ``BAK`` and ``P808_MOS``. Values are tensors in torch
            mode and NumPy values otherwise.

        Raises:
            ValueError: If ``aud`` is empty.
        """
        if self.convert_to_torch:
            device = "cuda" if self.use_gpu else "cpu"
            if isinstance(aud, torch.Tensor):
                aud = aud.to(device=device)
            else:
                aud = torch.as_tensor(aud, dtype=torch.float32, device=device)
        elif isinstance(aud, torch.Tensor):
            aud = aud.cpu().detach().numpy()

        # An empty signal can never reach the minimum length by repetition, so
        # the tiling loop below would spin forever; fail fast instead.
        if len(aud) == 0:
            raise ValueError("DNSMOS_local cannot score an empty audio signal.")

        if input_fs != SAMPLING_RATE:
            if self.convert_to_torch:
                # Resampling goes through librosa (NumPy on CPU) and the
                # result is moved back to the original dtype and device.
                audio = torch.as_tensor(
                    librosa.resample(
                        aud.detach().cpu().numpy(),
                        orig_sr=input_fs,
                        target_sr=SAMPLING_RATE,
                    ),
                    dtype=aud.dtype,
                    device=aud.device,
                )
            else:
                audio = librosa.resample(aud, orig_sr=input_fs, target_sr=SAMPLING_RATE)
        else:
            audio = aud

        # Repeat short signals until one full segment is available.
        len_samples = int(INPUT_LENGTH * SAMPLING_RATE)
        while len(audio) < len_samples:
            if self.convert_to_torch:
                audio = torch.cat((audio, audio))
            else:
                audio = np.append(audio, audio)

        num_hops = int(np.floor(len(audio) / SAMPLING_RATE) - INPUT_LENGTH) + 1
        hop_len_samples = SAMPLING_RATE

        score_keys = (
            "OVRL_raw",
            "SIG_raw",
            "BAK_raw",
            "OVRL",
            "SIG",
            "BAK",
            "P808_MOS",
        )
        scores = {key: [] for key in score_keys}

        for idx in range(num_hops):
            start = int(idx * hop_len_samples)
            end = int((idx + INPUT_LENGTH) * hop_len_samples)
            audio_seg = audio[start:end]
            # Skip segments that come up short of a full window (e.g. if the
            # float-derived slice end falls one sample low).
            if len(audio_seg) < len_samples:
                continue

            if self.convert_to_torch:
                input_features = audio_seg.float()[None, :]
                # The P.808 model sees the segment without its last 160 samples.
                p808_input_features = self.audio_melspec(
                    audio=audio_seg[:-160]
                ).float()[None, :, :]
                p808_mos = self.p808_model(p808_input_features)
                mos_sig_raw, mos_bak_raw, mos_ovr_raw = self.primary_model(
                    input_features
                )[0]
            else:
                input_features = np.array(audio_seg).astype("float32")[np.newaxis, :]
                p808_input_features = np.array(
                    self.audio_melspec(audio=audio_seg[:-160])
                ).astype("float32")[np.newaxis, :, :]
                p808_mos = self.p808_onnx_sess.run(
                    None, {"input_1": p808_input_features}
                )[0][0][0]
                mos_sig_raw, mos_bak_raw, mos_ovr_raw = self.onnx_sess.run(
                    None, {"input_1": input_features}
                )[0][0]

            mos_sig, mos_bak, mos_ovr = self.get_polyfit_val(
                mos_sig_raw, mos_bak_raw, mos_ovr_raw, is_personalized_MOS
            )
            scores["OVRL_raw"].append(mos_ovr_raw)
            scores["SIG_raw"].append(mos_sig_raw)
            scores["BAK_raw"].append(mos_bak_raw)
            scores["OVRL"].append(mos_ovr)
            scores["SIG"].append(mos_sig)
            scores["BAK"].append(mos_bak)
            scores["P808_MOS"].append(p808_mos)

        to_array = torch.stack if self.convert_to_torch else np.array
        return {key: to_array(values).mean() for key, values in scores.items()}