嗅ぐ音を認識する音声モデルを作る2
この記事を閲覧するにはアカウントが必要です。
前回:[[『嗅ぐ音』を認識する音声モデルを作る]]
あれから数年。もっと上に行きたい。
前回の課題点
前回はESC-50を使ったclip-levelの検出を行った。
モデルの学習自体はそこそこうまくいき、抽出の精度も高い。しかし5秒おきにオーディオをカットして探知することしか出来ないから、正確なポイントを探知できなかった。
たとえば4分の音声があるとして、2s-4s, 66s-67.8sのみに嗅ぐ音があるとしても、5秒刻みでしか取れないので1-5s, 65-70sが「嗅ぎ音」ラベルをつけられて切り取られるということ。
今回はフレーム単位で、完璧なタイミングを検知できるようにしたい。
で、エジソンも横転する無数の試行錯誤の結果、成功しました。ほぼ完璧です。
どんな魔法を使ったかざっくらばんに解説します。
TLDR
- PANNs (Pretrained Audio Neural Networks) CNN14 + MIL (Multiple Instance Learning) ベース
- 約160ms精度で発話区間の開始・終了を特定できる
- モデルのテストデータへのaccuracyは脅威の0.998
kwsk
新しいアプローチ:PANNs + Multiple Instance Learning (MIL)
今回のブレークスルーの鍵は、PANNs (Pre-trained Audio Neural Networks) という強力な音声認識モデルと、MIL (Multiple Instance Learning) という学習手法の組み合わせです。
1. PANNs:音声界の画像認識モデル
PANNsは、大量の音声データで事前学習された非常に高性能なニューラルネットワークです。音声データを画像(メルスペクトログラム)のように捉え、そこから特徴を抽出する能力に長けています。特に「CNN14」というモデルは、音声の特徴を時間的な解像度を保ったまま抽出できるため、今回の目的にピッタリでした。
2. MIL:正確なタイムスタンプがなくても大丈夫
フレーム単位(数ミリ秒単位)で音を検出したいなら、学習データもフレーム単位で「このフレームは嗅ぐ音、このフレームは違う」というラベル付け(アノテーション)が必要になるのが普通です。しかし、そんな作業は正直言ってやってられません。
そこで登場するのがMIL(多重インスタンス学習)です。これは、以下のような「弱いラベル」だけで学習できる魔法のような手法です。
- 従来の学習: 「この5秒のクリップは、1.2秒〜3.5秒の間だけ『嗅ぐ音』だ」という強いラベル(フレーム単位)が必要。
- MILでの学習: 「この5秒のクリップには、どこかに『嗅ぐ音』が含まれている」という弱いラベル(クリップ単位)だけでOK。
学習時には、モデルはクリップ内の全フレームのスコアを計算し、その中の最大値(または平均値)をそのクリップ全体のスコアとします。そして、「嗅ぐ音」のラベルが付いたクリップでは、どこか1フレームでもスコアが高くなるように学習が進みます。これにより、面倒なフレーム単位のアノテーション作業を完全に省略しつつ、フレーム単位で検出できるモデルを構築できるのです。
試行錯誤
そんなわけでPANNsとMILを使えばなんとかなるよとAIに言われた漏れは試行錯誤をした。
まず、データセットは前回の使いまわしでいく。

それぞれ165件くらい。種類の選別も前回テキトーに決めたものだが、今回の目的はクラス0(=嗅ぐ音)を検知することそれのみで、他は言ってしまえばデコイ。どれだけ他の偽陽性があろうと0のラベルさえ正確な位置に貼られていればそれで良いわけです。
幸か不幸か、softmaxではなくsigmoidなのでラベルを奪い合うこともない。ほんとはsoftmaxにして嗅ぐ音とそれ以外とかにした方がいいのかもしれんけど、知らん知らん。うまくいったからええんや。
ていうかフレーム単位で見直す都合上たぶんsigmoidじゃないとダメでしょうね。知らんけど。おれは雰囲気でAIをやっている。
データセットづくり
"""
データセット生成スクリプト(PANNs用)
音声をそのままwaveformとして保存する。
特徴量抽出はモデル内部で行うため、ここではリサンプリングのみ。
"""
import os, glob, numpy as np, librosa, param
from sklearn.model_selection import train_test_split
from tqdm import tqdm
DATASET_ROOT = "./dataset_root/"
def load_and_resample(path):
y, _ = librosa.load(path, sr=param.SR, mono=True)
if len(y) > 0:
# ピークを-1.0〜1.0に合わせる(またはRMSで合わせる)
max_vol = np.max(np.abs(y))
if max_vol > 0:
y = y / max_vol
# 長さをCLIP_SAMPLESに統一
if len(y) < param.CLIP_SAMPLES:
y = np.pad(y, (0, param.CLIP_SAMPLES - len(y)))
else:
y = y[:param.CLIP_SAMPLES]
return y.astype(np.float32)
def add_white_noise(x, rate=0.002):
return x + rate * np.random.randn(len(x)).astype(np.float32)
def shift_sound(x, rate=2):
return np.roll(x, int(len(x) // rate))
def stretch_sound(x, rate=1.1):
x = librosa.effects.time_stretch(y=x, rate=rate)
if len(x) > param.CLIP_SAMPLES:
return x[:param.CLIP_SAMPLES]
return np.pad(x, (0, max(0, param.CLIP_SAMPLES - len(x)))).astype(np.float32)
def save_dataset(filename, paths, labels, aug_type=None):
print(f"Generating {filename}...")
waves = []
for path in tqdm(paths):
y = load_and_resample(path)
if aug_type == "wn":
y = add_white_noise(y, rate=np.random.uniform(0.001, 0.005))
elif aug_type == "ss":
y = shift_sound(y, rate=np.random.uniform(2, 5))
elif aug_type == "st":
y = stretch_sound(y, rate=np.random.uniform(0.8, 1.2))
waves.append(y)
np.savez(filename, x=np.array(waves), y=np.array(labels))
print(f" Saved {len(waves)} clips to {filename}")
if __name__ == "__main__":
all_x, all_y = [], []
for subdir in sorted(glob.glob(os.path.join(DATASET_ROOT, "*"))):
if not os.path.isdir(subdir):
continue
label = int(os.path.basename(subdir).split('_')[0])
files = []
for ext in ['*.wav', '*.mp3']:
files.extend(glob.glob(os.path.join(subdir, ext)))
for f in files:
all_x.append(f)
all_y.append(label)
print(f"Total clips: {len(all_x)}, Label distribution: {np.bincount(all_y)}")
x_train, x_test, y_train, y_test = train_test_split(
all_x, all_y, test_size=0.2, stratify=all_y, random_state=42)
save_dataset("panns_test.npz", x_test, y_test)
save_dataset("panns_train_raw.npz", x_train, y_train)
save_dataset("panns_train_wn.npz", x_train, y_train, aug_type="wn")
save_dataset("panns_train_ss.npz", x_train, y_train, aug_type="ss")
save_dataset("panns_train_st.npz", x_train, y_train, aug_type="st")
print("Done.")
何の変哲もない。npzが割と重くなるのがネックではある。
このコード内で行っているデータセットの水増し(ノイズとか足すやつ)は割と大事です。一回これなしでやったら過学習になりまくって終わったので。
モデル構成
"""
PANNs CNN14 + MIL (Multiple Instance Learning) SED モデル
アーキテクチャ:
- CNN14のframewise_output (約64ms/フレーム、2048次元) をそのまま使う
- その上に小さなヘッド (2048 -> NUM_CLASSES) を乗せる
- 学習: MIL = clipのmax-poolingでclip-levelスコアを作りBCELoss
- 推論: framewise_outputをそのまま使ってフレーム単位スコアを返す
時間解像度 = HOP_SIZE / SR = 320/32000 = 0.01秒 (10ms)
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchlibrosa as tl
import param
def init_layer(layer):
nn.init.xavier_uniform_(layer.weight)
if hasattr(layer, 'bias') and layer.bias is not None:
nn.init.constant_(layer.bias, 0.)
def init_bn(bn):
nn.init.constant_(bn.bias, 0.)
nn.init.constant_(bn.weight, 1.)
class ConvBlock(nn.Module):
def __init__(self, in_channels, out_channels):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, 3, padding=1, bias=False)
self.conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.bn2 = nn.BatchNorm2d(out_channels)
init_layer(self.conv1); init_layer(self.conv2)
init_bn(self.bn1); init_bn(self.bn2)
def forward(self, x, pool_size=(2, 2)):
x = F.relu_(self.bn1(self.conv1(x)))
x = F.relu_(self.bn2(self.conv2(x)))
if pool_size is not None:
x = F.avg_pool2d(x, pool_size)
return x
class PANNsSED(nn.Module):
"""
CNN14バックボーン + MILヘッド
forward(x) -> (B, NUM_CLASSES) 学習用: clip-level (max pooling)
forward_frames(x) -> (B, T, NUM_CLASSES) 推論用: frame-level (~10ms/frame)
"""
def __init__(self):
super().__init__()
self.spectrogram_extractor = tl.Spectrogram(
n_fft=param.WINDOW_SIZE, hop_length=param.HOP_SIZE,
win_length=param.WINDOW_SIZE, window='hann',
center=True, pad_mode='reflect', freeze_parameters=True)
self.logmel_extractor = tl.LogmelFilterBank(
sr=param.SR, n_fft=param.WINDOW_SIZE, n_mels=param.MEL_BINS,
fmin=param.FMIN, fmax=param.FMAX, ref=1.0, amin=1e-10,
top_db=None, freeze_parameters=True)
self.bn0 = nn.BatchNorm2d(64)
# CNN14と同じ構成
# block1-4: 時間・周波数ともに (2,2) pooling
# block5-6: 周波数のみ (2,1) pooling → 時間解像度を保持
self.conv_block1 = ConvBlock(1, 64)
self.conv_block2 = ConvBlock(64, 128)
self.conv_block3 = ConvBlock(128, 256)
self.conv_block4 = ConvBlock(256, 512)
self.conv_block5 = ConvBlock(512, 1024)
self.conv_block6 = ConvBlock(1024, 2048)
# フレーム単位の分類ヘッド
self.fc_frame = nn.Linear(2048, param.NUM_CLASSES, bias=True)
init_bn(self.bn0)
init_layer(self.fc_frame)
def _cnn_features(self, x):
"""waveform -> (B, T, 2048) 高時間解像度のフレーム特徴量"""
x = self.spectrogram_extractor(x) # (B, 1, T, F)
x = self.logmel_extractor(x) # (B, 1, T, mel)
x = x.transpose(1, 3) # (B, mel, T, 1)
x = self.bn0(x)
x = x.transpose(1, 3) # (B, 1, T, mel)
x = self.conv_block1(x, pool_size=(2, 2))
x = F.dropout(x, p=0.2, training=self.training)
x = self.conv_block2(x, pool_size=(2, 2))
x = F.dropout(x, p=0.2, training=self.training)
x = self.conv_block3(x, pool_size=(2, 2))
x = F.dropout(x, p=0.2, training=self.training)
x = self.conv_block4(x, pool_size=(2, 2))
x = F.dropout(x, p=0.2, training=self.training)
# block5,6: 周波数軸(width方向)のみ縮小、時間軸(height方向)は保持
x = self.conv_block5(x, pool_size=(1, 2))
x = F.dropout(x, p=0.2, training=self.training)
x = self.conv_block6(x, pool_size=(1, 2))
x = F.dropout(x, p=0.2, training=self.training)
# 周波数軸を平均で潰す -> (B, 2048, T)
x = torch.mean(x, dim=3)
x = x.transpose(1, 2) # (B, T, 2048)
return x
def forward_frames(self, x):
"""推論用: フレーム単位スコア (B, T, NUM_CLASSES)"""
feat = self._cnn_features(x) # (B, T, 2048)
return torch.sigmoid(self.fc_frame(feat)) # (B, T, NUM_CLASSES)
def forward(self, x):
"""学習用: MIL (max + avg pooling) で clip-level スコア (B, NUM_CLASSES)"""
frame_scores = self.forward_frames(x) # (B, T, NUM_CLASSES)
# max pooling: 少なくとも1フレーム高ければ検出できる
max_scores, _ = torch.max(frame_scores, dim=1) # (B, NUM_CLASSES)
# avg pooling: clip全体にわたってスコアを上げることを促す
avg_scores = torch.mean(frame_scores, dim=1) # (B, NUM_CLASSES)
# 両者の平均: スパース化を防ぎつつ感度も保つ
return (max_scores + avg_scores) / 2
def freeze_backbone(self):
for name, p in self.named_parameters():
if 'fc_frame' not in name:
p.requires_grad = False
def unfreeze_backbone(self):
for p in self.parameters():
p.requires_grad = True
def load_pretrained(model, pretrained_path):
"""CNN14の事前学習済み重みをロード(ヘッド以外)"""
print(f"Loading pretrained weights from {pretrained_path}")
state_dict = torch.load(pretrained_path, map_location='cpu', weights_only=False)['model']
my_dict = model.state_dict()
matched = {k: v for k, v in state_dict.items()
if k in my_dict and my_dict[k].shape == v.shape}
my_dict.update(matched)
model.load_state_dict(my_dict)
print(f" Matched {len(matched)}/{len(my_dict)} layers.")
return model
基本的にはPANNsのCNN14というモデルをベースにしている。なんか知らんけど五条悟だと聞いたので。(最強の意)
参考:
普通の音声分類モデルなら最後にGlobal Poolingですべての特徴量を潰して「この音声は犬!」とかやるんですが、今回は「いつ」鳴ったかが知りたい。つまり時間軸を潰してはいけないわけです。
なので、途中まではCNN14と同じですが、最後の出力層付近で時間情報を保持したまま確率を出力するようにしています。
この機構はメルスペクトログラムとかいうシャニマスのアイドルグループみたいな機構を使ってて、ようは音声を画像として処理してる。この変換処理はtorchlibrosaが勝手にやってくれてる。
ポイントは forward と forward_frames の二段構えになっているところ。
学習時は forward を呼んで、「クリップ単位の正解ラベル」に対して、フレーム単位の推論結果を「MaxとAvg」で要約したもので戦う。
一方で、実際に使う推論時は forward_frames を呼べば、時間軸が残ったままの確率 (Batch, Time, Class) が返ってくる。
これにより、「学習データは5秒単位でいいのに、推論結果は精密(0.16秒単位)」という裏技を実現している。
学習コード
"""
MIL (Multiple Instance Learning) + PANNs 学習スクリプト
弱ラベル(clip単位)でフレーム精度のSEDモデルを学習する。
MIL: clip内の最大フレームスコアをclipスコアとしてBCELossを計算。
フレームアノテーション不要。
Phase 1 (epoch 1-20): ヘッドのみ学習(バックボーン凍結)
Phase 2 (epoch 21-100): 全体finetuning
"""
import os, numpy as np, torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import param
from model_panns import PANNsSED, load_pretrained
def get_device():
if torch.cuda.is_available():
device = "cuda"
print(f"using GPU: {device}")
else:
device = torch.device('cpu')
print("CPU mode")
return device
class AudioDataset(Dataset):
def __init__(self, waves, labels):
self.waves = waves.astype(np.float32)
self.labels = labels.astype(np.int64)
def __len__(self):
return len(self.waves)
def __getitem__(self, idx):
wave = torch.from_numpy(self.waves[idx])
label = torch.zeros(param.NUM_CLASSES)
label[self.labels[idx]] = 1.0
return wave, label
def load_all_npz(files):
xs, ys = [], []
for f in files:
if not os.path.exists(f):
print(f"WARNING: {f} not found, skipping.")
continue
d = np.load(f)
xs.append(d['x']); ys.append(d['y'])
return np.concatenate(xs), np.concatenate(ys)
def run_epoch(model, loader, criterion, optimizer, device, training):
model.train(training)
total_loss, correct, total = 0, 0, 0
with torch.set_grad_enabled(training):
for waves, labels in loader:
waves, labels = waves.to(device), labels.to(device)
if training:
optimizer.zero_grad()
frame_scores = model.forward_frames(waves) # (B, T, NUM_CLASSES)
max_scores, _= torch.max(frame_scores, dim=1) # (B, NUM_CLASSES)
avg_scores = torch.mean(frame_scores, dim=1) # (B, NUM_CLASSES)
# max loss: 少なくとも1フレームが正解クラスに反応することを学ぶ
loss_max = criterion(max_scores, labels)
# avg loss: clip全体にわたって正解クラスが高スコアになることを学ぶ
loss_avg = criterion(avg_scores, labels)
# avgの重みを大きくしてスパース化を抑制
loss = loss_max * 0.3 + loss_avg * 0.7
if training:
loss.backward()
optimizer.step()
total_loss += loss.item()
correct += (max_scores.argmax(1) == labels.argmax(1)).sum().item()
total += len(waves)
return total_loss / len(loader), correct / total
def train():
DEVICE = get_device()
train_files = [
"panns_train_raw.npz", "panns_train_wn.npz",
"panns_train_ss.npz", "panns_train_st.npz",
]
X, Y = load_all_npz(train_files)
print(f"Total clips: {len(X)}, dist: {np.bincount(Y.astype(int))}")
x_tr, x_val, y_tr, y_val = train_test_split(
X, Y, test_size=0.2, stratify=Y, random_state=42)
train_dl = DataLoader(AudioDataset(x_tr, y_tr), batch_size=16,
shuffle=True, num_workers=0)
val_dl = DataLoader(AudioDataset(x_val, y_val), batch_size=16,
shuffle=False, num_workers=0)
model = PANNsSED().to(DEVICE)
if os.path.exists(param.PANNS_PRETRAINED_PATH):
model = load_pretrained(model, param.PANNS_PRETRAINED_PATH)
else:
print(f"WARNING: No pretrained weights at {param.PANNS_PRETRAINED_PATH}")
print(" Download: https://zenodo.org/record/3987831/files/Cnn14_mAP%3D0.431.pth")
criterion = nn.BCELoss()
best_val_loss = float('inf')
# Phase 1: ヘッドのみ
print("\n=== Phase 1: Head only ===")
model.freeze_backbone()
optimizer = torch.optim.Adam(
filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=3, factor=0.5)
for epoch in range(1, 21):
tr_loss, tr_acc = run_epoch(model, train_dl, criterion, optimizer, DEVICE, True)
va_loss, va_acc = run_epoch(model, val_dl, criterion, None, DEVICE, False)
scheduler.step(va_loss)
print(f"Epoch {epoch:3d}/20 "
f"train loss={tr_loss:.4f} acc={tr_acc:.3f} "
f"val loss={va_loss:.4f} acc={va_acc:.3f}")
if va_loss < best_val_loss:
best_val_loss = va_loss
torch.save(model.state_dict(), param.MODEL_PATH)
print(f" --> Saved")
# Phase 2: 全体finetuning
print("\n=== Phase 2: Full finetuning ===")
model.unfreeze_backbone()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5)
for epoch in range(21, 101):
tr_loss, tr_acc = run_epoch(model, train_dl, criterion, optimizer, DEVICE, True)
va_loss, va_acc = run_epoch(model, val_dl, criterion, None, DEVICE, False)
scheduler.step(va_loss)
print(f"Epoch {epoch:3d}/100 "
f"train loss={tr_loss:.4f} acc={tr_acc:.3f} "
f"val loss={va_loss:.4f} acc={va_acc:.3f}")
if va_loss < best_val_loss:
best_val_loss = va_loss
torch.save(model.state_dict(), param.MODEL_PATH)
print(f" --> Saved")
print(f"\nDone. Best val_loss={best_val_loss:.4f}")
print(f"Model: {param.MODEL_PATH}")
if __name__ == "__main__":
train()
これが最終版(ver114514くらい)。
8:2の割合で学習とテストを切って、宇宙の答えをシードにする、親の顔より見たトレーニングコードです。語るべき部分があるとすればforward部くらいか。
# max loss: 少なくとも1フレームが正解クラスに反応することを学ぶ
loss_max = criterion(max_scores, labels)
# avg loss: clip全体にわたって正解クラスが高スコアになることを学ぶ
loss_avg = criterion(avg_scores, labels)
# avgの重みを大きくしてスパース化を抑制
loss = loss_max * 0.3 + loss_avg * 0.7
ここがかなり色々試してこうなったって感じ。loss_maxだけだとフレーム位置がメチャクチャ短くなりがち(1フレ合ってるだけで高得点になるので)で、loss_avgだけだと前回の二の舞いになるから、3行目の小学三年生みたいな四則演算を入れる必要があったんですね。
avgの方が割合多めにしてるけど、こうでもしないとAIは「最短経路で目的を達成しようとする」(逆ジャイロ)。このバランスでだいたいうまくいく。
100epochだと規模によってはちょっと足りないかもくらい。accは0.998とかまでは行くんだけど、ロスは「俺まだいけます、まだ下がれます」って言ってたので。まあでも十分っすね。
推論
いちいちcmdで呼び出すのはだるいし、一括で処理したいこともあんまりないのでgradioでやれるようにした。CLIに変換するのは簡単だしね。
import os
import json
import torch
import numpy as np
import librosa
import gradio as gr
from pydub import AudioSegment
import tempfile
import shutil
# プロジェクト内のモジュール
import param
from model_panns import PANNsSED
# グローバル変数でモデルを保持
_MODEL = None
_DEVICE = None
def get_device():
if torch.cuda.is_available():
device = "cuda"
print(f"using GPU: {device}")
else:
device = torch.device('cpu')
print("CPU mode")
return device
def get_model():
"""モデルのシングルトンロード"""
global _MODEL, _DEVICE
if _MODEL is None:
if not os.path.exists(param.MODEL_PATH):
raise FileNotFoundError(f"Model not found: {param.MODEL_PATH}. Run train_sed.py first.")
_DEVICE = get_device()
print(f"Using device: {_DEVICE}")
_MODEL = PANNsSED()
state_dict = torch.load(param.MODEL_PATH, map_location='cpu', weights_only=False)
_MODEL.load_state_dict(state_dict)
_MODEL.to(_DEVICE)
_MODEL.eval()
return _MODEL, _DEVICE
def detect_audio_events(audio_path, threshold, min_duration, smooth_ms, max_vol_ratio, progress=gr.Progress()):
model, device = get_model()
# 原音(y)と推論用ノーマライズ(y_norm)を準備
y, sr = librosa.load(audio_path, sr=param.SR, mono=True)
max_vol = np.max(np.abs(y))
y_norm = y / max_vol if max_vol > 0 else y
total_duration = len(y) / param.SR
total_samples = len(y)
# 極端に短いファイルのハンドリング
min_required_samples = int(param.SR * 0.5)
if total_samples < min_required_samples:
y_norm = np.pad(y_norm, (0, min_required_samples - total_samples))
total_samples = len(y_norm)
chunk_sec = 30.0
chunk_samples = int(chunk_sec * param.SR)
all_scores = []
pos = 0
# 推論ループ
while pos < total_samples:
chunk = y_norm[pos:pos + chunk_samples]
if len(chunk) < min_required_samples:
chunk = np.pad(chunk, (0, min_required_samples - len(chunk)))
batch = torch.from_numpy(chunk[np.newaxis].astype(np.float32)).to(device)
with torch.no_grad():
frame_scores = model.forward_frames(batch)
frame_scores = frame_scores[0].cpu().numpy()
actual_chunk_len_in_frames = int(len(y_norm[pos:pos + chunk_samples]) / param.HOP_SIZE)
frame_scores = frame_scores[:max(1, actual_chunk_len_in_frames)]
all_scores.append(frame_scores)
pos += chunk_samples
current_time = min(pos, total_samples) / param.SR
progress(current_time / total_duration, desc=f"Analyzing... {current_time:.1f}s / {total_duration:.1f}s")
all_scores = np.concatenate(all_scores, axis=0)
total_frames = len(all_scores)
actual_frame_duration = total_duration / total_frames
frame_times = np.arange(total_frames) * actual_frame_duration
# スムージング
smooth_frames = max(1, int(smooth_ms / 1000 / actual_frame_duration))
if smooth_frames > 1 and total_frames > smooth_frames:
kernel = np.ones(smooth_frames) / smooth_frames
smoothed = np.stack(
[np.convolve(all_scores[:, i], kernel, mode='same')
for i in range(param.NUM_CLASSES)], axis=1)
else:
smoothed = all_scores
# --- ステップ1: 全ての候補イベントを抽出 (RMS付) ---
candidate_events = []
target_class = param.CLASSES[0]
for i, class_name in enumerate(param.CLASSES):
prob = smoothed[:, i]
binary = (prob > threshold).astype(int)
diff = np.diff(np.pad(binary, (1, 1), 'constant'))
starts = np.where(diff == 1)[0]
ends = np.where(diff == -1)[0]
for s, e in zip(starts, ends):
s_idx = min(s, total_frames - 1)
e_idx = min(e, total_frames - 1)
t_start = float(frame_times[s_idx])
t_end = float(frame_times[e_idx])
duration = t_end - t_start
if duration >= min_duration:
# この区間の音量(RMS)を計算(ノーマライズ前ではなく原音 y で計算)
samp_start = max(0, int(t_start * param.SR))
samp_end = min(len(y), int(t_end * param.SR))
seg_rms = 0.0
if samp_end > samp_start:
seg = y[samp_start:samp_end]
seg_rms = np.sqrt(np.mean(seg**2))
score = float(np.mean(prob[s_idx:e_idx]))
candidate_events.append({
"start": round(t_start, 3),
"end": round(t_end, 3),
"label": class_name,
"score": round(score, 3),
"duration": round(duration, 3),
"rms": seg_rms
})
# --- ステップ2: Class 0 の平均音量に基づいたフィルタリング ---
# Class 0 のイベントのみを取り出す
class0_events = [e for e in candidate_events if e["label"] == target_class]
if not class0_events:
return []
# 有効な音量を持つイベントから平均RMSを算出
rms_values = [e["rms"] for e in class0_events if e["rms"] > 0]
if rms_values:
avg_class0_rms = np.mean(rms_values)
# 閾値:Class 0の平均音量 × ユーザー指定倍率
dynamic_limit = avg_class0_rms * max_vol_ratio
else:
dynamic_limit = float('inf')
# 最終的なイベントリストを作成
final_events = []
for ev in candidate_events:
# Class 0 かつ、平均よりデカすぎるものは除外
if ev["label"] == target_class:
if ev["rms"] > dynamic_limit:
continue
# 不要な一時データ(rms)を削除して追加
filtered_ev = {k: v for k, v in ev.items() if k != "rms"}
final_events.append(filtered_ev)
final_events.sort(key=lambda x: x["start"])
return final_events
def process_audio(audio_file, threshold, min_dur, smooth_ms, max_vol_ratio):
"""
Gradioのメイン処理関数
"""
if audio_file is None:
return None, None, "No audio file uploaded."
try:
# 1. イベント検出
events = detect_audio_events(audio_file, threshold, min_dur, smooth_ms, max_vol_ratio)
# 2. クラス0抽出
target_class = param.CLASSES[0]
class0_events = [e for e in events if e["label"] == target_class]
# 結果サマリー
summary_text = {
"target_class": target_class,
"total_events_found": len(events),
"target_events_count": len(class0_events),
"total_duration_sec": round(sum(e["duration"] for e in class0_events), 3),
"events": class0_events
}
if not class0_events:
return None, None, json.dumps(summary_text, indent=2, ensure_ascii=False)
# 3. 音声の結合処理
original_audio = AudioSegment.from_file(audio_file)
merged_audio = AudioSegment.empty()
for ev in class0_events:
start_ms = int(ev["start"] * 1000)
end_ms = int(ev["end"] * 1000)
merged_audio += original_audio[start_ms:end_ms]
# ---------------------------------------------------------
# A. プレビュー用WAV出力 (Gradio Player用)
# ---------------------------------------------------------
fd_wav, wav_path = tempfile.mkstemp(suffix=".wav")
os.close(fd_wav)
merged_audio.export(wav_path, format="wav")
# ---------------------------------------------------------
# B. ダウンロード用MP3出力 (ファイル名・ビットレート保持)
# ---------------------------------------------------------
# 元のファイル名を取得して "_merged.mp3" を付与
original_basename = os.path.basename(audio_file)
# Gradioの一時ファイル名はランダムなことがあるので、元の名前が復元できない場合は汎用名にする
# ただしGradioのfilepath型は通常元の拡張子などを保持しようとする
stem = os.path.splitext(original_basename)[0]
out_mp3_name = f"{stem}__{target_class}__merged.mp3"
out_mp3_path = os.path.join(tempfile.gettempdir(), out_mp3_name)
# ビットレート取得ロジック (class0_only.py と同様)
bitrate_str = "192k" # デフォルト
try:
from mutagen.mp3 import MP3
audio_info = MP3(audio_file)
# mutagenで取得できるbitrateはbps単位なので /1000 する
br = int(audio_info.info.bitrate / 1000)
bitrate_str = f"{br}k"
print(f"Detected bitrate: {bitrate_str}")
except Exception:
# MP3でない、または読み取れない場合はデフォルトを使用
pass
print(f"Exporting MP3 to: {out_mp3_path} ({bitrate_str})")
merged_audio.export(out_mp3_path, format="mp3", bitrate=bitrate_str)
return wav_path, out_mp3_path, json.dumps(summary_text, indent=2, ensure_ascii=False)
except Exception as e:
import traceback
traceback.print_exc()
return None, None, f"Error: {str(e)}"
# ==========================================
# UI構築
# ==========================================
target_class_name = param.CLASSES[0] if param.CLASSES else "Unknown"
custom_css = """
/* InputとOutputの両方で、波形表示の高さを強制的に広げる */
#input-audio-player #waveform, #output-audio-player #waveform {
height: 120px !important;
}
#input-audio-player canvas, #output-audio-player canvas {
height: 120px !important;
}
::-webkit-scrollbar {
height: 8px !important;
}
"""
with gr.Blocks(title=f"SpecificAudioDetector ({target_class_name})", css=custom_css) as demo:
gr.Markdown(f"# Specific Audio Detector: {target_class_name} Extractor")
with gr.Row():
with gr.Column():
input_audio = gr.Audio(
label="Input Audio",
type="filepath",
elem_id="input-audio-player"
)
with gr.Accordion("Parameters (Advanced)", open=True):
threshold_slider = gr.Slider(
minimum=0.001, maximum=0.99, value=0.006, step=0.001,
label="Threshold"
)
min_dur_slider = gr.Slider(
minimum=0.0, maximum=2.0, value=0.2, step=0.05,
label="Min Duration (s)"
)
smooth_slider = gr.Slider(
minimum=0, maximum=1000, value=200, step=50,
label="Smoothing (ms)"
)
max_vol_slider = gr.Slider(
minimum=1.0, maximum=10.0, value=1.5, step=0.1,
label="Max Volume Ratio"
)
run_btn = gr.Button("Extract Class 0 Audio", variant="primary")
with gr.Column():
# プレビュー用 (WAVで再生)
output_player = gr.Audio(
label="Preview (WAV)",
type="filepath",
elem_id="output-audio-player",
interactive=False
)
# ダウンロード用 (MP3ファイル)
gr.Markdown("### ↓ Download MP3 ↓")
# ここがダウンロードボタンになります(処理完了後にファイルが表示されます)
output_file = gr.File(
label="Merged MP3 File",
file_count="single",
type="filepath",
interactive=False
)
output_json = gr.JSON(label="Detection Details")
run_btn.click(
fn=process_audio,
inputs=[input_audio, threshold_slider, min_dur_slider, smooth_slider, max_vol_slider],
outputs=[output_player, output_file, output_json]
)
if __name__ == "__main__":
demo.queue().launch()
パラメータの調整に一番苦労した。ふつうこの手の推論のスレッショルドって0.5とかが相場なんですが、今回0.01以下が正着です。まさかの。なにか重大な問題がある気もするが、これで動くのでこれでいいのだ。it works。
白眉な実装はMax Volume Ratio。これも色々実験した結果導入したんだけど、音の性質上、外れ値的にうるさいのが入ると””使えなく””なってしまう。

↑こういうのを省きたい
なので、クラス0のラベルが貼られたオーディオの音量平均を取って、乖離してるやつを除外するようにしました。嗅ぐ音がそもそも音量小さくなりがちなので、元オーディオ全体の平均を取るとうまくいかない(1敗)。
これを導入してから一気に性能が上がった感じがある。
ともあれこれで、「フレーム単位で嗅ぐ音がある場所を特定し、出力できる」AIが出来上がったわけです。
こんなインモラルな使い方をしているが、普通に話者認識とかに回しても使えるし、実際その用法でも試したが多分世界一精度が良いので最強です。高度な技術はいつも変態によって作られるのだ。
用法
- 2時間尺とかある同人音声やASMR配信から嗅ぐ音だけを抜き取る
- ???
- profit

記事の感想を伝えられます。
感想レターを書く
定型文を選択
スタンプを選択