Implementing Voice Conversion on Original Data with Python + CycleGAN-VC2
Clubhouse got me interested in voice and audio. In this article, I perform voice conversion on original data with Python + CycleGAN-VC2.
This time, I link Google Colab with Google Drive and run everything in notebook form.
Google Colaboratory (hereafter Google Colab) is a development environment for machine learning education and research that Google provides for free. Its interface is similar to Jupyter Notebook, and the major Python libraries come preinstalled.
Source: How to use Google Colab
For the original data, I used the speeches given by Nishino and Matayoshi at Kindai University.
# File structure on Google Colab
The project directory is CycleGAN-VC2; some files and directories are omitted here and there. The model checkpoints are saved under ./cache/JpSpeech/model_checkpoint.
CycleGAN-VC2
├── /Pytorch-CycleGAN-VC2
│   ├── /data
│   │   └── JpSpeech
│   │       ├── /train <- training data
│   │       │   ├── /nishino
│   │       │   └── /matayoshi
│   │       ├── /eval <- evaluation data
│   │       │   ├── /nishino
│   │       │   └── /matayoshi
│   │       └── /converted_sound <- converted data
│   │           ├── /nishino
│   │           └── /matayoshi
│   ├── /cache
│   │   └── JpSpeech
│   │       ├── coded_sps_A_norm.pickle <- preprocessed data
│   │       ├── coded_sps_B_norm.pickle <- preprocessed data
│   │       ├── logf0s_normalization.npz <- preprocessed data
│   │       ├── mcep_normalization.npz <- preprocessed data
│   │       └── /model_checkpoint <- model save directory
│   │           └── _CycleGAN_CheckPoint <- latest output model
│   ├── train.py
│   ├── test.py
│   ├── download.py
│   └── (omitted)
└── Pytorch_CycleGAN_VC2_ORG.ipynb <- notebook used for execution
# Linking with Google Drive
Link Google Colab with Google Drive, then download TaiChunYen/Pytorch-CycleGAN-VC2 from GitHub.
# Link Google Colab with Google Drive
from google.colab import drive
drive.mount('/content/drive')
%%bash
# Move into the project directory
cd /content/drive/My\ Drive/CycleGAN-VC2
# Download the repository (git clone; commented out after the first run)
# git clone https://github.com/TaiChunYen/Pytorch-CycleGAN-VC2.git
ls
# Installing modules
Install the modules required by the code below. librosa is pinned to 0.7.2 because the scripts rely on librosa.output.write_wav, which was removed in later librosa releases, and numba is pinned to a version that works with that librosa.
!pip install pyworld
!pip install librosa==0.7.2
!pip install numba==0.48.0
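As a quick check that the pinned versions were picked up (my own addition, not in the original notebook):

```python
# Sanity check: the imports below should succeed without errors
import pyworld
import librosa
import numba
print(librosa.__version__, numba.__version__)  # expecting 0.7.2 and 0.48.0
```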
# Preparing the training data (original data)
This time, as original data, I downloaded speeches by Nishino and Matayoshi from YouTube.
After converting the speeches to wav, I split them into 10-second segments to prepare the training and evaluation data (a rough splitting sketch follows).
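For reference, here is a minimal sketch of such 10-second splitting with librosa; the paths and file names are hypothetical, and the actual data was prepared beforehand.

```python
# Minimal splitting sketch (assumed helper, not part of the repository):
# cut a long wav into 10-second segments for train/eval data.
import os
import librosa

def split_wav(src_path, dst_dir, segment_sec=10, sr=16000):
    """Split src_path into segment_sec-long wav files under dst_dir."""
    os.makedirs(dst_dir, exist_ok=True)
    wav, _ = librosa.load(src_path, sr=sr, mono=True)
    samples_per_seg = segment_sec * sr
    for i in range(len(wav) // samples_per_seg):
        segment = wav[i * samples_per_seg:(i + 1) * samples_per_seg]
        out_path = os.path.join(dst_dir, f"{i:04d}.wav")
        # librosa.output.write_wav exists in the pinned librosa 0.7.x
        librosa.output.write_wav(out_path, segment, sr)

# Example call (hypothetical source file):
# split_wav("nishino_speech.wav", "./data/JpSpeech/train/nishino")
```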
When exchanging data through Google Drive, I recommend uploading a zip archive and extracting it on Colab; this tends to cause fewer errors.
The zip can be extracted with the following command.
!unzip data/JpSpeech.zip > /dev/null
# preprocess (preprocessing)
For training, run the preprocessing with the following code.
It converts the wav audio data into numpy format so it can be used for training.
!python preprocess_training.py --train_A_dir ./data/JpSpeech/train/matayoshi \
--train_B_dir ./data/JpSpeech/train/nishino \
--cache_folder ./cache/JpSpeech
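When preprocessing finishes, the cache files shown in the directory tree above should exist. The following is a small check of my own; it assumes the pickle holds a list of per-clip MCEP arrays and that the npz keys match the ones train.py reads.

```python
# Quick inspection of the cached preprocessing outputs (my own addition)
import pickle
import numpy as np

cache = './cache/JpSpeech'

# Normalized MCEP sequences for speaker A (matayoshi) and speaker B (nishino)
with open(f'{cache}/coded_sps_A_norm.pickle', 'rb') as f:
    coded_sps_A = pickle.load(f)
print(len(coded_sps_A), coded_sps_A[0].shape)  # number of clips, e.g. (24, n_frames)

# log F0 and MCEP normalization statistics used later by train.py / test.py
logf0s = np.load(f'{cache}/logf0s_normalization.npz')
print(logf0s['mean_A'], logf0s['std_A'], logf0s['mean_B'], logf0s['std_B'])
```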
# Overwriting train.py
I modify train.py so that it is easier for me to use.
The following code overwrites train.py.
# Write out train.py
%%writefile train.py
import os
import numpy as np
import argparse
import torch
import time
import librosa
import pickle
import preprocess
from trainingDataset import trainingDataset
from model_VC2 import Generator, Discriminator
class CycleGANTraining:
def __init__(self,
logf0s_normalization,
mcep_normalization,
coded_sps_A_norm,
coded_sps_B_norm,
model_checkpoint,
validation_A_dir,
output_A_dir,
validation_B_dir,
output_B_dir,
restart_training_at=None):
self.start_epoch = 0
self.num_epochs = 10000
self.mini_batch_size = 16
self.dataset_A = self.loadPickleFile(coded_sps_A_norm)
self.dataset_B = self.loadPickleFile(coded_sps_B_norm)
self.device = torch.device(
'cuda' if torch.cuda.is_available() else 'cpu')
# Speech Parameters
logf0s_normalization = np.load(logf0s_normalization)
self.log_f0s_mean_A = logf0s_normalization['mean_A']
self.log_f0s_std_A = logf0s_normalization['std_A']
self.log_f0s_mean_B = logf0s_normalization['mean_B']
self.log_f0s_std_B = logf0s_normalization['std_B']
mcep_normalization = np.load(mcep_normalization)
self.coded_sps_A_mean = mcep_normalization['mean_A']
self.coded_sps_A_std = mcep_normalization['std_A']
self.coded_sps_B_mean = mcep_normalization['mean_B']
self.coded_sps_B_std = mcep_normalization['std_B']
# Generator and Discriminator
self.generator_A2B = Generator().to(self.device)
self.generator_B2A = Generator().to(self.device)
self.discriminator_A = Discriminator().to(self.device)
self.discriminator_B = Discriminator().to(self.device)
# Loss Functions
criterion_mse = torch.nn.MSELoss()
# Optimizer
g_params = list(self.generator_A2B.parameters()) + \
list(self.generator_B2A.parameters())
d_params = list(self.discriminator_A.parameters()) + \
list(self.discriminator_B.parameters())
# Initial learning rates
self.generator_lr = 0.0002
self.discriminator_lr = 0.0001
# Learning rate decay
self.generator_lr_decay = self.generator_lr / 200000
self.discriminator_lr_decay = self.discriminator_lr / 200000
# Starts learning rate decay from after this many iterations have passed
self.start_decay = 12500
self.generator_optimizer = torch.optim.Adam(
g_params, lr=self.generator_lr, betas=(0.5, 0.999))
self.discriminator_optimizer = torch.optim.Adam(
d_params, lr=self.discriminator_lr, betas=(0.5, 0.999))
        # To load previously saved models
self.modelCheckpoint = model_checkpoint
os.makedirs(model_checkpoint, exist_ok=True)
# Validation set Parameters
self.validation_A_dir = validation_A_dir
self.output_A_dir = output_A_dir
self.validation_B_dir = validation_B_dir
self.output_B_dir = output_B_dir
        # Storing Discriminator and Generator Loss
self.generator_loss_store = []
self.discriminator_loss_store = []
self.file_name = 'log_store_non_sigmoid.txt'
if restart_training_at is not None:
# Training will resume from previous checkpoint
self.start_epoch = self.loadModel(restart_training_at)
print("Training resumed")
def adjust_lr_rate(self, optimizer, name='generator'):
if name == 'generator':
self.generator_lr = max(
0., self.generator_lr - self.generator_lr_decay)
for param_groups in optimizer.param_groups:
param_groups['lr'] = self.generator_lr
else:
self.discriminator_lr = max(
0., self.discriminator_lr - self.discriminator_lr_decay)
for param_groups in optimizer.param_groups:
param_groups['lr'] = self.discriminator_lr
def reset_grad(self):
self.generator_optimizer.zero_grad()
self.discriminator_optimizer.zero_grad()
def train(self):
# Training Begins
for epoch in range(self.start_epoch, self.num_epochs+1):
start_time_epoch = time.time()
# Constants
cycle_loss_lambda = 10
identity_loss_lambda = 5
#if epoch>20:#
#cycle_loss_lambda = 15#
#identity_loss_lambda = 0#
# Preparing Dataset
n_samples = len(self.dataset_A)
dataset = trainingDataset(datasetA=self.dataset_A,
datasetB=self.dataset_B,
n_frames=128)
train_loader = torch.utils.data.DataLoader(dataset=dataset,
batch_size=self.mini_batch_size,
shuffle=True,
drop_last=False)
for i, (real_A, real_B) in enumerate(train_loader):
num_iterations = (
n_samples // self.mini_batch_size) * epoch + i
# print("iteration no: ", num_iterations, epoch)
if num_iterations > 10000:
identity_loss_lambda = 0
if num_iterations > self.start_decay:
self.adjust_lr_rate(
self.generator_optimizer, name='generator')
self.adjust_lr_rate(
self.generator_optimizer, name='discriminator')
real_A = real_A.to(self.device).float()
real_B = real_B.to(self.device).float()
# Generator Loss function
fake_B = self.generator_A2B(real_A)
cycle_A = self.generator_B2A(fake_B)
fake_A = self.generator_B2A(real_B)
cycle_B = self.generator_A2B(fake_A)
identity_A = self.generator_B2A(real_A)
identity_B = self.generator_A2B(real_B)
d_fake_A = self.discriminator_A(fake_A)
d_fake_B = self.discriminator_B(fake_B)
# Generator Cycle loss
cycleLoss = torch.mean(
torch.abs(real_A - cycle_A)) + torch.mean(torch.abs(real_B - cycle_B))
# Generator Identity Loss
identiyLoss = torch.mean(
torch.abs(real_A - identity_A)) + torch.mean(torch.abs(real_B - identity_B))
# Generator Loss
generator_loss_A2B = torch.mean((1 - d_fake_B) ** 2)
generator_loss_B2A = torch.mean((1 - d_fake_A) ** 2)
# Total Generator Loss
generator_loss = generator_loss_A2B + generator_loss_B2A + \
cycle_loss_lambda * cycleLoss + identity_loss_lambda * identiyLoss
self.generator_loss_store.append(generator_loss.item())
# Backprop for Generator
self.reset_grad()
generator_loss.backward()
# if num_iterations > self.start_decay: # Linearly decay learning rate
# self.adjust_lr_rate(
# self.generator_optimizer, name='generator')
self.generator_optimizer.step()
# Discriminator Loss Function
# Discriminator Feed Forward
d_real_A = self.discriminator_A(real_A)
d_real_B = self.discriminator_B(real_B)
generated_A = self.generator_B2A(real_B)
d_fake_A = self.discriminator_A(generated_A)
generated_B = self.generator_A2B(real_A)
d_fake_B = self.discriminator_B(generated_B)
# Loss Functions
d_loss_A_real = torch.mean((1 - d_real_A) ** 2)
d_loss_A_fake = torch.mean((0 - d_fake_A) ** 2)
d_loss_A = (d_loss_A_real + d_loss_A_fake) / 2.0
d_loss_B_real = torch.mean((1 - d_real_B) ** 2)
d_loss_B_fake = torch.mean((0 - d_fake_B) ** 2)
d_loss_B = (d_loss_B_real + d_loss_B_fake) / 2.0
# Final Loss for discriminator
d_loss = (d_loss_A + d_loss_B) / 2.0
self.discriminator_loss_store.append(d_loss.item())
# Backprop for Discriminator
self.reset_grad()
d_loss.backward()
# if num_iterations > self.start_decay: # Linearly decay learning rate
# self.adjust_lr_rate(
# self.discriminator_optimizer, name='discriminator')
self.discriminator_optimizer.step()
if num_iterations % 50 == 0:
store_to_file = "Iter:{}\t Generator Loss:{:.4f} Discrimator Loss:{:.4f} \tGA2B:{:.4f} GB2A:{:.4f} G_id:{:.4f} G_cyc:{:.4f} D_A:{:.4f} D_B:{:.4f}".format(
num_iterations, generator_loss.item(), d_loss.item(), generator_loss_A2B, generator_loss_B2A, identiyLoss, cycleLoss, d_loss_A, d_loss_B)
print("Iter:{}\t Generator Loss:{:.4f} Discrimator Loss:{:.4f} \tGA2B:{:.4f} GB2A:{:.4f} G_id:{:.4f} G_cyc:{:.4f} D_A:{:.4f} D_B:{:.4f}".format(
num_iterations, generator_loss.item(), d_loss.item(), generator_loss_A2B, generator_loss_B2A, identiyLoss, cycleLoss, d_loss_A, d_loss_B))
self.store_to_file(store_to_file)
end_time = time.time()
store_to_file = "Epoch: {} Generator Loss: {:.4f} Discriminator Loss: {}, Time: {:.2f}\n\n".format(
epoch, generator_loss.item(), d_loss.item(), end_time - start_time_epoch)
self.store_to_file(store_to_file)
print("Epoch: {} Generator Loss: {:.4f} Discriminator Loss: {}, Time: {:.2f}\n\n".format(
epoch, generator_loss.item(), d_loss.item(), end_time - start_time_epoch))
if epoch % 100 == 0 and epoch != 0:
# Save the Entire model
print("Saving model Checkpoint ......")
store_to_file = "Saving model Checkpoint ......"
self.store_to_file(store_to_file)
self.saveModelCheckPoint(epoch, '{}'.format(
self.modelCheckpoint + '_CycleGAN_CheckPoint'))
if epoch % 500 == 0 and epoch != 0:
self.saveModelCheckPoint(epoch, '{}'.format(
self.modelCheckpoint + '_CycleGAN_CheckPoint' + str(epoch)))
print("Model Saved!")
if epoch % 100 == 0 and epoch != 0:
# Validation Set
validation_start_time = time.time()
self.validation_for_A_dir()
self.validation_for_B_dir()
validation_end_time = time.time()
store_to_file = "Time taken for validation Set: {}".format(
validation_end_time - validation_start_time)
self.store_to_file(store_to_file)
print("Time taken for validation Set: {}".format(
validation_end_time - validation_start_time))
def validation_for_A_dir(self):
num_mcep = 24
sampling_rate = 16000
frame_period = 5.0
n_frames = 128
validation_A_dir = self.validation_A_dir
output_A_dir = self.output_A_dir
print("Generating Validation Data B from A...")
for file in os.listdir(validation_A_dir):
filePath = os.path.join(validation_A_dir, file)
wav, _ = librosa.load(filePath, sr=sampling_rate, mono=True)
wav = preprocess.wav_padding(wav=wav,
sr=sampling_rate,
frame_period=frame_period,
multiple=4)
f0, timeaxis, sp, ap = preprocess.world_decompose(
wav=wav, fs=sampling_rate, frame_period=frame_period)
f0_converted = preprocess.pitch_conversion(f0=f0,
mean_log_src=self.log_f0s_mean_A,
std_log_src=self.log_f0s_std_A,
mean_log_target=self.log_f0s_mean_B,
std_log_target=self.log_f0s_std_B)
coded_sp = preprocess.world_encode_spectral_envelop(
sp=sp, fs=sampling_rate, dim=num_mcep)
coded_sp_transposed = coded_sp.T
coded_sp_norm = (coded_sp_transposed -
self.coded_sps_A_mean) / self.coded_sps_A_std
coded_sp_norm = np.array([coded_sp_norm])
if torch.cuda.is_available():
coded_sp_norm = torch.from_numpy(coded_sp_norm).cuda().float()
else:
coded_sp_norm = torch.from_numpy(coded_sp_norm).float()
coded_sp_converted_norm = self.generator_A2B(coded_sp_norm)
coded_sp_converted_norm = coded_sp_converted_norm.cpu().detach().numpy()
coded_sp_converted_norm = np.squeeze(coded_sp_converted_norm)
coded_sp_converted = coded_sp_converted_norm * \
self.coded_sps_B_std + self.coded_sps_B_mean
coded_sp_converted = coded_sp_converted.T
coded_sp_converted = np.ascontiguousarray(coded_sp_converted)
decoded_sp_converted = preprocess.world_decode_spectral_envelop(
coded_sp=coded_sp_converted, fs=sampling_rate)
wav_transformed = preprocess.world_speech_synthesis(f0=f0_converted,
decoded_sp=decoded_sp_converted,
ap=ap,
fs=sampling_rate,
frame_period=frame_period)
librosa.output.write_wav(path=os.path.join(output_A_dir, os.path.basename(file)),
y=wav_transformed,
sr=sampling_rate)
def validation_for_B_dir(self):
num_mcep = 24
sampling_rate = 16000
frame_period = 5.0
n_frames = 128
validation_B_dir = self.validation_B_dir
output_B_dir = self.output_B_dir
print("Generating Validation Data A from B...")
for file in os.listdir(validation_B_dir):
filePath = os.path.join(validation_B_dir, file)
wav, _ = librosa.load(filePath, sr=sampling_rate, mono=True)
wav = preprocess.wav_padding(wav=wav,
sr=sampling_rate,
frame_period=frame_period,
multiple=4)
f0, timeaxis, sp, ap = preprocess.world_decompose(
wav=wav, fs=sampling_rate, frame_period=frame_period)
f0_converted = preprocess.pitch_conversion(f0=f0,
mean_log_src=self.log_f0s_mean_B,
std_log_src=self.log_f0s_std_B,
mean_log_target=self.log_f0s_mean_A,
std_log_target=self.log_f0s_std_A)
coded_sp = preprocess.world_encode_spectral_envelop(
sp=sp, fs=sampling_rate, dim=num_mcep)
coded_sp_transposed = coded_sp.T
coded_sp_norm = (coded_sp_transposed -
self.coded_sps_B_mean) / self.coded_sps_B_std
coded_sp_norm = np.array([coded_sp_norm])
if torch.cuda.is_available():
coded_sp_norm = torch.from_numpy(coded_sp_norm).cuda().float()
else:
coded_sp_norm = torch.from_numpy(coded_sp_norm).float()
coded_sp_converted_norm = self.generator_B2A(coded_sp_norm)
coded_sp_converted_norm = coded_sp_converted_norm.cpu().detach().numpy()
coded_sp_converted_norm = np.squeeze(coded_sp_converted_norm)
coded_sp_converted = coded_sp_converted_norm * \
self.coded_sps_A_std + self.coded_sps_A_mean
coded_sp_converted = coded_sp_converted.T
coded_sp_converted = np.ascontiguousarray(coded_sp_converted)
decoded_sp_converted = preprocess.world_decode_spectral_envelop(
coded_sp=coded_sp_converted, fs=sampling_rate)
wav_transformed = preprocess.world_speech_synthesis(f0=f0_converted,
decoded_sp=decoded_sp_converted,
ap=ap,
fs=sampling_rate,
frame_period=frame_period)
librosa.output.write_wav(path=os.path.join(output_B_dir, os.path.basename(file)),
y=wav_transformed,
sr=sampling_rate)
def savePickle(self, variable, fileName):
with open(fileName, 'wb') as f:
pickle.dump(variable, f)
def loadPickleFile(self, fileName):
with open(fileName, 'rb') as f:
return pickle.load(f)
def store_to_file(self, doc):
doc = doc + "\n"
with open(self.file_name, "a") as myfile:
myfile.write(doc)
def saveModelCheckPoint(self, epoch, PATH):
torch.save({
'epoch': epoch,
'generator_loss_store': self.generator_loss_store,
'discriminator_loss_store': self.discriminator_loss_store,
'model_genA2B_state_dict': self.generator_A2B.state_dict(),
'model_genB2A_state_dict': self.generator_B2A.state_dict(),
'model_discriminatorA': self.discriminator_A.state_dict(),
'model_discriminatorB': self.discriminator_B.state_dict(),
'generator_optimizer': self.generator_optimizer.state_dict(),
'discriminator_optimizer': self.discriminator_optimizer.state_dict()
}, PATH)
def loadModel(self, PATH):
checkPoint = torch.load(PATH)
self.generator_A2B.load_state_dict(
state_dict=checkPoint['model_genA2B_state_dict'])
self.generator_B2A.load_state_dict(
state_dict=checkPoint['model_genB2A_state_dict'])
self.discriminator_A.load_state_dict(
state_dict=checkPoint['model_discriminatorA'])
self.discriminator_B.load_state_dict(
state_dict=checkPoint['model_discriminatorB'])
self.generator_optimizer.load_state_dict(
state_dict=checkPoint['generator_optimizer'])
self.discriminator_optimizer.load_state_dict(
state_dict=checkPoint['discriminator_optimizer'])
epoch = int(checkPoint['epoch']) + 1
self.generator_loss_store = checkPoint['generator_loss_store']
self.discriminator_loss_store = checkPoint['discriminator_loss_store']
return epoch
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Train CycleGAN using source dataset and target dataset")
logf0s_normalization_default = './cache/logf0s_normalization.npz'
mcep_normalization_default = './cache/mcep_normalization.npz'
coded_sps_A_norm = './cache/coded_sps_A_norm.pickle'
coded_sps_B_norm = './cache/coded_sps_B_norm.pickle'
model_checkpoint = './cache/model_checkpoint/'
# resume_training_at = './cache/model_checkpoint/_CycleGAN_CheckPoint'
resume_training_at = None
validation_A_dir_default = './data/vcc2016_training/evaluation_all/SF1/'
output_A_dir_default = './data/vcc2016_training/converted_sound/SF1'
validation_B_dir_default = './data/vcc2016_training/evaluation_all/TF2/'
output_B_dir_default = './data/vcc2016_training/converted_sound/TF2/'
parser.add_argument('--logf0s_normalization', type=str,
help="Cached location for log f0s normalized", default=logf0s_normalization_default)
parser.add_argument('--mcep_normalization', type=str,
help="Cached location for mcep normalization", default=mcep_normalization_default)
parser.add_argument('--coded_sps_A_norm', type=str,
help="mcep norm for data A", default=coded_sps_A_norm)
parser.add_argument('--coded_sps_B_norm', type=str,
help="mcep norm for data B", default=coded_sps_B_norm)
parser.add_argument('--model_checkpoint', type=str,
help="location where you want to save the model", default=model_checkpoint)
parser.add_argument('--resume_training_at', type=str,
help="Location of the pre-trained model to resume training",
default=resume_training_at)
parser.add_argument('--validation_A_dir', type=str,
help="validation set for sound source A", default=validation_A_dir_default)
parser.add_argument('--output_A_dir', type=str,
help="output for converted Sound Source A", default=output_A_dir_default)
parser.add_argument('--validation_B_dir', type=str,
help="Validation set for sound source B", default=validation_B_dir_default)
parser.add_argument('--output_B_dir', type=str,
help="Output for converted sound Source B", default=output_B_dir_default)
argv = parser.parse_args()
logf0s_normalization = argv.logf0s_normalization
mcep_normalization = argv.mcep_normalization
coded_sps_A_norm = argv.coded_sps_A_norm
coded_sps_B_norm = argv.coded_sps_B_norm
model_checkpoint = argv.model_checkpoint
resume_training_at = argv.resume_training_at
validation_A_dir = argv.validation_A_dir
output_A_dir = argv.output_A_dir
validation_B_dir = argv.validation_B_dir
output_B_dir = argv.output_B_dir
os.makedirs(output_A_dir, exist_ok=True)
os.makedirs(output_B_dir, exist_ok=True)
# Check whether following cached files exists
if not os.path.exists(logf0s_normalization) or not os.path.exists(mcep_normalization):
print(
"Cached files do not exist, please run the program preprocess_training.py first")
cycleGAN = CycleGANTraining(logf0s_normalization=logf0s_normalization,
mcep_normalization=mcep_normalization,
coded_sps_A_norm=coded_sps_A_norm,
coded_sps_B_norm=coded_sps_B_norm,
model_checkpoint=model_checkpoint,
validation_A_dir=validation_A_dir,
output_A_dir=output_A_dir,
validation_B_dir=validation_B_dir,
output_B_dir=output_B_dir,
restart_training_at=resume_training_at)
cycleGAN.train()
# Training settings
Because a saved model weighs in at about 3 GB, for intermediate checkpoints I recommend "update the latest weights frequently, dump intermediate snapshots only sparingly".
Example: update the latest weights every 100 epochs, and save an intermediate snapshot every 1000 epochs (see the sketch below).
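These intervals correspond to the two `epoch % ...` checks inside `train()` in the train.py written above. A sketch of the example setting, adapting that existing save logic rather than adding a new feature:

```python
# Inside CycleGANTraining.train(), following the structure of train.py above
if epoch % 100 == 0 and epoch != 0:
    # Overwrite the latest checkpoint frequently
    self.saveModelCheckPoint(
        epoch, self.modelCheckpoint + '_CycleGAN_CheckPoint')
if epoch % 1000 == 0 and epoch != 0:
    # Keep a numbered snapshot only occasionally (each checkpoint is roughly 3 GB)
    self.saveModelCheckPoint(
        epoch, self.modelCheckpoint + '_CycleGAN_CheckPoint' + str(epoch))
```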
# Training on the original data
Run training on the original data with the following code.
Here the model learns both conversions: from Nishino's voice to Matayoshi's voice and from Matayoshi's voice to Nishino's voice.
To load a previously trained model, specify the weights with --resume_training_at.
# !python train.py
# --resume_training_at ../cache/model_checkpoint/_CycleGAN_CheckPoint \
# Resume training from a saved model
!python train.py --logf0s_normalization ./cache/JpSpeech/logf0s_normalization.npz --mcep_normalization ./cache/JpSpeech/mcep_normalization.npz \
--coded_sps_A_norm ./cache/JpSpeech/coded_sps_A_norm.pickle --coded_sps_B_norm ./cache/JpSpeech/coded_sps_B_norm.pickle \
--model_checkpoint ./cache/JpSpeech/model_checkpoint/ --resume_training_at ./cache/JpSpeech/model_checkpoint/_CycleGAN_CheckPoint \
--validation_A_dir ./data/JpSpeech/eval/matayoshi/ --output_A_dir ./data/JpSpeech/converted_sound/matayoshi/ \
--validation_B_dir ./data/JpSpeech/eval/nishino/ --output_B_dir ./data/JpSpeech/converted_sound/nishino/
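train.py appends its progress to log_store_non_sigmoid.txt via store_to_file. As a rough sketch, assuming that log format stays exactly as written above, the loss curves can be plotted while training runs:

```python
# Parse the "Iter:... Generator Loss:... Discrimator Loss:..." lines from the log
import re
import matplotlib.pyplot as plt

iters, g_losses, d_losses = [], [], []
with open('log_store_non_sigmoid.txt') as f:
    for line in f:
        m = re.search(r'Iter:(\d+)\s+Generator Loss:([\d.]+) Discrimator Loss:([\d.]+)', line)
        if m:
            iters.append(int(m.group(1)))
            g_losses.append(float(m.group(2)))
            d_losses.append(float(m.group(3)))

plt.plot(iters, g_losses, label='generator loss')
plt.plot(iters, d_losses, label='discriminator loss')
plt.xlabel('iteration')
plt.legend()
plt.show()
```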
# Overwriting test.py
I modify test.py so that it is easier for me to use.
The following code overwrites test.py.
# Write out test.py
%%writefile test.py
import os
import numpy as np
import argparse
import torch
import time
import librosa
import pickle
import preprocess
from trainingDataset import trainingDataset
from model_VC2 import Generator, Discriminator
class CycleGANTest:
def __init__(self,
logf0s_normalization,
mcep_normalization,
model_checkpoint,
validation_A_dir,
output_A_dir,
validation_B_dir,
output_B_dir):
logf0s_normalization = np.load(logf0s_normalization)
self.log_f0s_mean_A = logf0s_normalization['mean_A']
self.log_f0s_std_A = logf0s_normalization['std_A']
self.log_f0s_mean_B = logf0s_normalization['mean_B']
self.log_f0s_std_B = logf0s_normalization['std_B']
mcep_normalization = np.load(mcep_normalization)
self.coded_sps_A_mean = mcep_normalization['mean_A']
self.coded_sps_A_std = mcep_normalization['std_A']
self.coded_sps_B_mean = mcep_normalization['mean_B']
self.coded_sps_B_std = mcep_normalization['std_B']
self.validation_A_dir = validation_A_dir
self.output_A_dir = output_A_dir
self.validation_B_dir = validation_B_dir
self.output_B_dir = output_B_dir
self.device = torch.device(
'cuda' if torch.cuda.is_available() else 'cpu')
self.generator_A2B = Generator().to(self.device)
self.generator_A2B.eval()
self.generator_B2A = Generator().to(self.device)
self.generator_B2A.eval()
checkPoint = torch.load(model_checkpoint)
self.generator_A2B.load_state_dict(
state_dict=checkPoint['model_genA2B_state_dict'])
self.generator_B2A.load_state_dict(
state_dict=checkPoint['model_genB2A_state_dict'])
print("load model checkpoint finish!")
# A->B
def validation_for_A_dir(self):
num_mcep = 24
sampling_rate = 16000
frame_period = 5.0
n_frames = 128
validation_A_dir = self.validation_A_dir
output_A_dir = self.output_A_dir
print("Generating Test Data B from A...")
for file in os.listdir(validation_A_dir):
filePath = os.path.join(validation_A_dir, file)
wav, _ = librosa.load(filePath, sr=sampling_rate, mono=True)
wav = preprocess.wav_padding(wav=wav,
sr=sampling_rate,
frame_period=frame_period,
multiple=4)
f0, timeaxis, sp, ap = preprocess.world_decompose(
wav=wav, fs=sampling_rate, frame_period=frame_period)
            f0_converted = preprocess.pitch_conversion(f0=f0,
                                                       mean_log_src=self.log_f0s_mean_A,
                                                       std_log_src=self.log_f0s_std_A,
                                                       mean_log_target=self.log_f0s_mean_B,
                                                       std_log_target=self.log_f0s_std_B)
coded_sp = preprocess.world_encode_spectral_envelop(
sp=sp, fs=sampling_rate, dim=num_mcep)
coded_sp_transposed = coded_sp.T
            coded_sp_norm = (coded_sp_transposed -
                             self.coded_sps_A_mean) / self.coded_sps_A_std
coded_sp_norm = np.array([coded_sp_norm])
if torch.cuda.is_available():
coded_sp_norm = torch.from_numpy(coded_sp_norm).cuda().float()
else:
coded_sp_norm = torch.from_numpy(coded_sp_norm).float()
coded_sp_converted_norm = self.generator_A2B(coded_sp_norm)
coded_sp_converted_norm = coded_sp_converted_norm.cpu().detach().numpy()
coded_sp_converted_norm = np.squeeze(coded_sp_converted_norm)
coded_sp_converted = coded_sp_converted_norm * \
self.coded_sps_B_std + self.coded_sps_B_mean
coded_sp_converted = coded_sp_converted.T
coded_sp_converted = np.ascontiguousarray(coded_sp_converted)
decoded_sp_converted = preprocess.world_decode_spectral_envelop(
coded_sp=coded_sp_converted, fs=sampling_rate)
wav_transformed = preprocess.world_speech_synthesis(f0=f0_converted,
decoded_sp=decoded_sp_converted,
ap=ap,
fs=sampling_rate,
frame_period=frame_period)
librosa.output.write_wav(path=os.path.join(output_A_dir, os.path.basename(file)),
y=wav_transformed,
sr=sampling_rate)
print("finish!")
# B->A
def validation_for_B_dir(self):
num_mcep = 24
sampling_rate = 16000
frame_period = 5.0
n_frames = 128
validation_B_dir = self.validation_B_dir
output_B_dir = self.output_B_dir
print("Generating Test Data A from B...")
for file in os.listdir(validation_B_dir):
filePath = os.path.join(validation_B_dir, file)
wav, _ = librosa.load(filePath, sr=sampling_rate, mono=True)
wav = preprocess.wav_padding(wav=wav,
sr=sampling_rate,
frame_period=frame_period,
multiple=4)
f0, timeaxis, sp, ap = preprocess.world_decompose(
wav=wav, fs=sampling_rate, frame_period=frame_period)
f0_converted = preprocess.pitch_conversion(f0=f0,
mean_log_src=self.log_f0s_mean_B,
std_log_src=self.log_f0s_std_B,
mean_log_target=self.log_f0s_mean_A,
std_log_target=self.log_f0s_std_A)
coded_sp = preprocess.world_encode_spectral_envelop(
sp=sp, fs=sampling_rate, dim=num_mcep)
coded_sp_transposed = coded_sp.T
coded_sp_norm = (coded_sp_transposed -
self.coded_sps_B_mean) / self.coded_sps_B_std
coded_sp_norm = np.array([coded_sp_norm])
if torch.cuda.is_available():
coded_sp_norm = torch.from_numpy(coded_sp_norm).cuda().float()
else:
coded_sp_norm = torch.from_numpy(coded_sp_norm).float()
coded_sp_converted_norm = self.generator_B2A(coded_sp_norm)
coded_sp_converted_norm = coded_sp_converted_norm.cpu().detach().numpy()
coded_sp_converted_norm = np.squeeze(coded_sp_converted_norm)
coded_sp_converted = coded_sp_converted_norm * \
self.coded_sps_A_std + self.coded_sps_A_mean
coded_sp_converted = coded_sp_converted.T
coded_sp_converted = np.ascontiguousarray(coded_sp_converted)
decoded_sp_converted = preprocess.world_decode_spectral_envelop(
coded_sp=coded_sp_converted, fs=sampling_rate)
wav_transformed = preprocess.world_speech_synthesis(f0=f0_converted,
decoded_sp=decoded_sp_converted,
ap=ap,
fs=sampling_rate,
frame_period=frame_period)
librosa.output.write_wav(path=os.path.join(output_B_dir, os.path.basename(file)),
y=wav_transformed,
sr=sampling_rate)
print("finish!")
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Test CycleGAN")
logf0s_normalization_default = '../cache/logf0s_normalization.npz'
mcep_normalization_default = '../cache/mcep_normalization.npz'
# coded_sps_A_norm = '../cache/coded_sps_A_norm.pickle'
# coded_sps_B_norm = '../cache/coded_sps_B_norm.pickle'
model_checkpoint = '../cache/model_checkpoint/'
resume_training_at = '../cache/model_checkpoint/_CycleGAN_CheckPoint'
resume_training_at = None
validation_A_dir_default = None
output_A_dir_default = None
validation_B_dir_default = None
output_B_dir_default = None
parser.add_argument('--logf0s_normalization', type=str,
help="Cached location for log f0s normalized", default=logf0s_normalization_default)
parser.add_argument('--mcep_normalization', type=str,
help="Cached location for mcep normalization", default=mcep_normalization_default)
parser.add_argument('--model_checkpoint', type=str,
help="location where your model", default=model_checkpoint)
parser.add_argument('--test_A_dir', type=str,
help="test set for sound source A", default=validation_A_dir_default)
parser.add_argument('--output_A_dir', type=str,
help="output for converted Sound Source A", default=output_A_dir_default)
parser.add_argument('--test_B_dir', type=str,
help="test set for sound source B", default=validation_B_dir_default)
parser.add_argument('--output_B_dir', type=str,
help="output for converted Sound Source B", default=output_B_dir_default)
argv = parser.parse_args()
logf0s_normalization = argv.logf0s_normalization
mcep_normalization = argv.mcep_normalization
model_checkpoint = argv.model_checkpoint
    # Data for A
validation_A_dir = argv.test_A_dir
output_A_dir = argv.output_A_dir
    # Data for B
validation_B_dir = argv.test_B_dir
output_B_dir = argv.output_B_dir
cycleGAN = CycleGANTest(logf0s_normalization=logf0s_normalization,
mcep_normalization=mcep_normalization,
model_checkpoint=model_checkpoint,
validation_A_dir=validation_A_dir,
output_A_dir=output_A_dir,
validation_B_dir=validation_B_dir,
output_B_dir=output_B_dir)
    # A->B conversion
if validation_A_dir:
os.makedirs(output_A_dir, exist_ok=True)
cycleGAN.validation_for_A_dir()
    # B->A conversion
if validation_B_dir:
os.makedirs(output_B_dir, exist_ok=True)
cycleGAN.validation_for_B_dir()
# Running the voice conversion
Training (about 4 days) has finished, so I run test.py with the final output weights ./cache/JpSpeech/model_checkpoint/_CycleGAN_CheckPoint and convert Nishino's voice into Matayoshi's voice.
# --test_A_dir ./data/JpSpeech/eval/matayoshi/ --output_A_dir ./data/test_converted_sound/matayoshi/ \
!python test.py --logf0s_normalization ./cache/JpSpeech/logf0s_normalization.npz --mcep_normalization ./cache/JpSpeech/mcep_normalization.npz \
--test_B_dir ./data/JpSpeech/eval/nishino/ --output_B_dir ./data/test_converted_sound/nishino/ \
--model_checkpoint ./cache/JpSpeech/model_checkpoint/_CycleGAN_CheckPoint
Conversion results
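To check the results directly in Colab, the converted wav files can be played inline; a minimal sketch (the output directory matches the command above, and the number of clips to play is arbitrary):

```python
# Play the first few converted clips inline in the notebook
import os
from IPython.display import Audio, display

out_dir = './data/test_converted_sound/nishino'
for name in sorted(os.listdir(out_dir))[:3]:
    print(name)
    display(Audio(os.path.join(out_dir, name)))
```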
# Summary
I ran voice conversion on the Kindai University speeches using the sample code.
The conversion quality was honestly quite poor.
The following improvements might make it better:
- Train for even more epochs
- Review the data and make its quality consistent
  - Matayoshi's speech has no background music or microphone reverberation
  - Parts of Nishino's speech contain background music and microphone reverberation
- Change the model
- Change the model settings (batch size, etc.)
Japanese itself may simply be hard to convert, so once I dig up a good Japanese speech dataset, I will try converting that first.
# References
jackaduma/CycleGAN-VC2