
李宏毅ML2020Spring-HW4

HW4 (speaker identification) of 李宏毅's Spring 2020 machine learning course.
Notes from working through the code...

Task description: identify who is speaking from an audio clip.

The dataset covers 600 speakers, and all audio has already been preprocessed into mel-spectrograms.

Data layout:

mapping.json contains speaker2id and id2speaker.

metadata.json describes the training data. n_mels is the feature dimension of the mel-spectrograms, which is 40. It also contains speakers.

speakers holds many ids, each mapping to that speaker's utterances: feature_path is the path of the feature file, and mel_len is how many frames the feature contains (every frame has dimension 40).

testdata.json is similar, except it has no speaker labels.
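
To get a feel for this layout before writing any training code, here is a minimal sketch that inspects the two JSON files (the ./Dataset path is illustrative; point it at wherever the data is unpacked):

import json
from pathlib import Path

data_dir = Path("./Dataset")  # hypothetical location of the unpacked dataset

mapping = json.load((data_dir / "mapping.json").open())
print(list(mapping.keys()))     # ['speaker2id', 'id2speaker']

metadata = json.load((data_dir / "metadata.json").open())
print(metadata["n_mels"])       # 40
# Each entry of "speakers" is a list of utterances with feature_path and mel_len.
some_speaker = next(iter(metadata["speakers"]))
print(metadata["speakers"][some_speaker][0])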

The code is as follows:

Building the dataset class:

import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence


class myDataset(Dataset):
    def __init__(self, data_dir, segment_len=128):
        # segment_len crops every utterance to 128 frames, so a batch is easy to process.
        self.data_dir = data_dir
        self.segment_len = segment_len

        # Load the mapping from speaker name to the corresponding id.
        mapping_path = Path(data_dir) / "mapping.json"
        mapping = json.load(mapping_path.open())
        self.speaker2id = mapping["speaker2id"]

        # Load metadata of training data.
        metadata_path = Path(data_dir) / "metadata.json"
        metadata = json.load(open(metadata_path))["speakers"]

        # Get the total number of speakers.
        self.speaker_num = len(metadata.keys())
        self.data = []
        for speaker in metadata.keys():
            for utterances in metadata[speaker]:
                self.data.append([utterances["feature_path"], self.speaker2id[speaker]])
        # self.data holds the path and the label of every training utterance.

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        feat_path, speaker = self.data[index]
        # Load the preprocessed mel-spectrogram.
        mel = torch.load(os.path.join(self.data_dir, feat_path))

        # Segment the mel-spectrogram into "segment_len" frames.
        if len(mel) > self.segment_len:
            # Randomly pick the starting point of the segment.
            start = random.randint(0, len(mel) - self.segment_len)
            # Take a segment with "segment_len" frames.
            mel = torch.FloatTensor(mel[start:start+self.segment_len])
        else:
            mel = torch.FloatTensor(mel)
        # Turn the speaker id into a long tensor for computing the loss later.
        speaker = torch.FloatTensor([speaker]).long()
        return mel, speaker

    def get_speaker_number(self):
        return self.speaker_num
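
A quick sanity check of the class above (a sketch; the dataset path is illustrative):

dataset = myDataset("./Dataset")
mel, speaker = dataset[0]
print(dataset.get_speaker_number())  # should be 600 for this dataset
print(mel.shape)     # torch.Size([128, 40]) when the utterance is longer than segment_len
print(speaker)       # the speaker id as a long tensor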

Building the DataLoader:

from torch.utils.data import DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence


def collate_batch(batch):
    # Collate one batch of data: the input is batch_size tuples, each of the form
    # (feature, label); the output is two tensors, features of shape (batch_size, ...)
    # and the labels of the batch. In short, a pile of tuples becomes the two tensors
    # x and y.
    mel, speaker = zip(*batch)
    # Because we train the model batch by batch, we need to pad the features in the same
    # batch to make their lengths the same; otherwise they cannot be combined into one
    # tensor for matrix operations.
    mel = pad_sequence(mel, batch_first=True, padding_value=-20)  # pad with log 10^(-20), a very small value
    # With batch_first=False the batch dimension would go second, RNN-style, which is not
    # what we want here.
    # mel: (batch size, length, 40)
    return mel, torch.FloatTensor(speaker).long()


def get_dataloader(data_dir, batch_size, n_workers):
    """Generate dataloaders."""
    dataset = myDataset(data_dir)  # build the dataset
    speaker_num = dataset.get_speaker_number()
    # Split the dataset into a training set and a validation set.
    trainlen = int(0.9 * len(dataset))
    lengths = [trainlen, len(dataset) - trainlen]
    trainset, validset = random_split(dataset, lengths)  # random split

    train_loader = DataLoader(
        trainset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        num_workers=n_workers,
        pin_memory=True,
        collate_fn=collate_batch,
    )
    valid_loader = DataLoader(
        validset,
        batch_size=batch_size,
        num_workers=n_workers,
        drop_last=True,
        pin_memory=True,
        collate_fn=collate_batch,
    )

    return train_loader, valid_loader, speaker_num
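
Since pad_sequence does the heavy lifting in collate_batch, here is a standalone toy example of its behavior (nothing here depends on the dataset):

import torch
from torch.nn.utils.rnn import pad_sequence

a = torch.zeros(3, 2)  # 3 frames, feature dim 2
b = torch.zeros(5, 2)  # 5 frames, feature dim 2
batch = pad_sequence([a, b], batch_first=True, padding_value=-20)
print(batch.shape)     # torch.Size([2, 5, 2]): both sequences padded to the longest one
print(batch[0, 3:])    # the two padded frames of `a` are filled with -20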

Building the network architecture:

import torch
import torch.nn as nn
import torch.nn.functional as F


class Classifier(nn.Module):
    def __init__(self, d_model=80, n_spks=600, dropout=0.1):
        super().__init__()
        # Project the dimension of features from that of the input into d_model.
        # A mel frame has 40 features, so we first project it to d_model dimensions.
        self.prenet = nn.Linear(40, d_model)
        # Transformer encoder layer: d_model is the dimension of Q/K/V, dim_feedforward is
        # the hidden size of the feed-forward network (its output is still d_model), and
        # nhead is the number of attention heads.
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=1
        )
        # If several encoder layers are needed:
        # self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)

        # Project the dimension of features from d_model into the number of speakers.
        self.pred_layer = nn.Sequential(
            nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        """
        args:
            mels: (batch size, length, 40)
        return:
            out: (batch size, n_spks)
        """
        # out: (batch size, length, d_model)
        out = self.prenet(mels)
        # out: (length, batch size, d_model)
        out = out.permute(1, 0, 2)
        # NOTE: the encoder layer expects features in the shape of (length, batch size, d_model)!
        out = self.encoder_layer(out)
        # out: (batch size, length, d_model)
        out = out.transpose(0, 1)
        # Mean pooling over the time dimension.
        stats = out.mean(dim=1)

        # out: (batch, n_spks)
        out = self.pred_layer(stats)
        return out
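
A quick shape check of the classifier (a sketch; the batch size and length are arbitrary):

model = Classifier(n_spks=600)
dummy = torch.randn(4, 128, 40)  # (batch size, length, 40)
print(model(dummy).shape)        # torch.Size([4, 600])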

Learning-rate schedule: warm up first, then decay gradually:

import math

import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR

# In LambdaLR, optimizer is the optimizer whose learning rate is being scheduled, and
# lr_lambda is a function that takes the number of parameter updates and returns a
# coefficient w, so that lr = base_lr * w.
# new_lr = lr_lambda(last_epoch) * base_lr; every call to scheduler.step() increments
# last_epoch by one, and with last_epoch=-1 the base_lr is the lr set in the optimizer.
def get_cosine_schedule_with_warmup(
    optimizer: Optimizer,
    num_warmup_steps: int,
    num_training_steps: int,
    num_cycles: float = 0.5,
    last_epoch: int = -1,
):
    def lr_lambda(current_step):
        # Warmup
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        # Decay
        progress = float(current_step - num_warmup_steps) / float(
            max(1, num_training_steps - num_warmup_steps)
        )
        return max(
            0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))
        )

    return LambdaLR(optimizer, lr_lambda, last_epoch)
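
To see the warmup-then-cosine shape, a small sketch that prints the learning rate at a few steps (the one-parameter optimizer and the step counts are placeholders):

import torch

opt = torch.optim.AdamW([torch.nn.Parameter(torch.zeros(1))], lr=1e-3)
scheduler = get_cosine_schedule_with_warmup(opt, num_warmup_steps=1000, num_training_steps=70000)
for step in range(70000):
    opt.step()        # no-op here; just keeps the scheduler's bookkeeping happy
    scheduler.step()
    if step + 1 in (1, 500, 1000, 35500, 70000):
        print(step + 1, opt.param_groups[0]["lr"])
# The lr climbs linearly to 1e-3 during the first 1000 steps,
# then follows a cosine from 1e-3 down towards 0.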

Define one model step: feed in a batch of data and get back the loss and accuracy.

import torch


def model_fn(batch, model, criterion, device):
    """Forward a batch through the model."""

    mels, labels = batch
    mels = mels.to(device)  # move the data to the GPU (or whichever device is in use)
    labels = labels.to(device)

    outs = model(mels)

    loss = criterion(outs, labels)

    # Get the speaker id with the highest probability.
    preds = outs.argmax(1)
    # Compute accuracy.
    accuracy = torch.mean((preds == labels).float())

    return loss, accuracy
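
A dummy run of model_fn with random data (purely illustrative, to confirm the shapes fit together):

device = "cpu"
model = Classifier(n_spks=600)
criterion = torch.nn.CrossEntropyLoss()
batch = (torch.randn(4, 128, 40), torch.randint(0, 600, (4,)))
loss, accuracy = model_fn(batch, model, criterion, device)
print(loss.item(), accuracy.item())  # accuracy should be near 0 for an untrained model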

Define how the model runs on the validation set:

from tqdm import tqdm
import torch


def valid(dataloader, model, criterion, device):
    model.eval()
    running_loss = 0.0
    running_accuracy = 0.0
    pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit=" uttr")
    # total is the overall length, ncols is the width of the bar, desc is the label
    # on the left, and unit is the unit of progress.

    for i, batch in enumerate(dataloader):
        with torch.no_grad():
            loss, accuracy = model_fn(batch, model, criterion, device)
            running_loss += loss.item()
            running_accuracy += accuracy.item()

        pbar.update(dataloader.batch_size)  # advance the bar by batch_size
        pbar.set_postfix(
            loss=f"{running_loss / (i+1):.2f}",
            accuracy=f"{running_accuracy / (i+1):.2f}",
        )  # shown after the bar and refreshed on every batch

    pbar.close()
    model.train()

    return running_accuracy / len(dataloader)  # return the average accuracy over batches

Start training:

from tqdm import tqdm

import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split


def parse_args():
    config = {
        "data_dir": "../input/ml2021springhw43/Dataset",
        "save_path": "./model.ckpt",
        "batch_size": 32,
        "n_workers": 2,
        "valid_steps": 2000,
        "warmup_steps": 1000,
        "save_steps": 10000,
        "total_steps": 70000,
    }
    # data_dir is the dataset path, save_path is where the model is saved, valid_steps is
    # the validation interval in update steps, warmup_steps is the number of warmup
    # updates, save_steps is the checkpoint interval, and total_steps is the total number
    # of parameter updates.
    return config


def main(data_dir, save_path, batch_size, n_workers, valid_steps, warmup_steps, total_steps, save_steps):
    """Main function."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[Info]: Use {device} now!")

    train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
    train_iterator = iter(train_loader)
    print("[Info]: Finish loading data!", flush=True)

    model = Classifier(n_spks=speaker_num).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=1e-3)
    scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)
    print("[Info]: Finish creating model!", flush=True)

    best_accuracy = -1.0
    best_state_dict = None

    pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

    for step in range(total_steps):
        # Get data
        try:
            batch = next(train_iterator)
        except StopIteration:  # the iterator is exhausted, so start over from the beginning
            train_iterator = iter(train_loader)
            batch = next(train_iterator)

        loss, accuracy = model_fn(batch, model, criterion, device)  # forward once with this batch
        batch_loss = loss.item()
        batch_accuracy = accuracy.item()

        # Update model
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        # Log
        pbar.update()
        pbar.set_postfix(
            loss=f"{batch_loss:.2f}",
            accuracy=f"{batch_accuracy:.2f}",
            step=step + 1,
        )

        # Do validation
        if (step + 1) % valid_steps == 0:  # time to validate
            pbar.close()

            valid_accuracy = valid(valid_loader, model, criterion, device)

            # Keep the best model.
            if valid_accuracy > best_accuracy:
                best_accuracy = valid_accuracy
                best_state_dict = model.state_dict()  # keep the best parameters so far

            pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")  # fresh bar

        # Save the best model so far.
        if (step + 1) % save_steps == 0 and best_state_dict is not None:  # time to checkpoint
            torch.save(best_state_dict, save_path)
            pbar.write(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")

    pbar.close()


if __name__ == "__main__":
    main(**parse_args())

Next, run the model on the test data and save the results.

import os
import json
import torch
from pathlib import Path
from torch.utils.data import Dataset


class InferenceDataset(Dataset):
    def __init__(self, data_dir):
        testdata_path = Path(data_dir) / "testdata.json"
        metadata = json.load(testdata_path.open())
        self.data_dir = data_dir
        self.data = metadata["utterances"]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        utterance = self.data[index]
        feat_path = utterance["feature_path"]
        mel = torch.load(os.path.join(self.data_dir, feat_path))

        return feat_path, mel


def inference_collate_batch(batch):
    """Collate a batch of data."""
    feat_paths, mels = zip(*batch)

    return feat_paths, torch.stack(mels)
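
Note that inference_collate_batch uses torch.stack, which requires every mel in a batch to have the same length. Test utterances are not cropped to segment_len, so the DataLoader below uses batch_size=1. A toy illustration of the constraint:

import torch

same = [torch.zeros(100, 40), torch.zeros(100, 40)]
print(torch.stack(same).shape)  # torch.Size([2, 100, 40])
# Stacking mels of different lengths, e.g. torch.zeros(100, 40) and torch.zeros(120, 40),
# raises a RuntimeError -- hence batch_size=1 at inference time.

The inference script itself:
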
import json
import csv
from pathlib import Path
from tqdm.notebook import tqdm

import torch
from torch.utils.data import DataLoader

def parse_args():
    config = {
        "data_dir": "../input/ml2021springhw43/Dataset",
        "model_path": "./model.ckpt",
        "output_path": "./output.csv",
    }

    return config


def main(data_dir, model_path, output_path):
    """Main function."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[Info]: Use {device} now!")

    mapping_path = Path(data_dir) / "mapping.json"
    mapping = json.load(mapping_path.open())

    dataset = InferenceDataset(data_dir)
    dataloader = DataLoader(
        dataset,
        batch_size=1,
        shuffle=False,
        drop_last=False,
        num_workers=2,
        collate_fn=inference_collate_batch,
    )
    print("[Info]: Finish loading data!", flush=True)

    speaker_num = len(mapping["id2speaker"])
    model = Classifier(n_spks=speaker_num).to(device)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    print("[Info]: Finish creating model!", flush=True)

    results = [["Id", "Category"]]
    for feat_paths, mels in tqdm(dataloader):
        with torch.no_grad():
            mels = mels.to(device)
            outs = model(mels)
            preds = outs.argmax(1).cpu().numpy()
            for feat_path, pred in zip(feat_paths, preds):
                results.append([feat_path, mapping["id2speaker"][str(pred)]])

    with open(output_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerows(results)


if __name__ == "__main__":
    main(**parse_args())