acoustic_model/acoustic_model/performance_check.py

527 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import sys
import csv
import subprocess
from collections import Counter
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import acoustic_model_functions as am_func
import convert_xsampa2ipa
import defaultfiles as default
## ======================= user define =======================
#curr_dir = r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model'
#config_ini = 'config.ini'
#repo_dir = r'C:\Users\Aki\source\repos'
#forced_alignment_module = repo_dir + '\\forced_alignment'
#forced_alignment_module_old = repo_dir + '\\aki_tools'
#ipa_xsampa_converter_dir = repo_dir + '\\ipa-xsama-converter'
#accent_classification_dir = repo_dir + '\\accent_classification\accent_classification'
excel_file = os.path.join(default.experiments_dir, 'stimmen', 'data', 'Frisian Variants Picture Task Stimmen.xlsx')
#experiments_dir = r'C:\OneDrive\Research\rug\experiments'
data_dir = os.path.join(default.experiments_dir, 'stimmen', 'data')
#csvfile = data_dir + '\\Frisian Variants Picture Task Stimmen.csv'
#wav_dir = os.path.join(default.experiments_dir, 'stimmen', 'wav_44k') # 44.1k
wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' # 16k
#wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' # 16k
acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model')
htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
fa_dir = os.path.join(default.experiments_dir, 'stimmen', 'FA_44k')
result_dir = os.path.join(default.experiments_dir, 'stimmen', 'result')
kaldi_data_dir = os.path.join(default.kaldi_dir, 'data', 'alignme')
kaldi_dict_dir = os.path.join(default.kaldi_dir, 'data', 'local', 'dict')
lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
#cygwin_dir = r'C:\cygwin64\home\Aki\acoustic_model'
#lex_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
#lex_asr_htk = os.path.join(default.fame_dir, 'lexicon', 'lex.asr_htk')
from forced_alignment import pyhtk
# procedure
make_dic_files = 0
do_forced_alignment_htk = 0
make_kaldi_data_files = 0
make_kaldi_lexicon_txt = 0
load_forced_alignment_kaldi = 1
eval_forced_alignment = 0
## ======================= add paths =======================
sys.path.append(os.path.join(default.repo_dir, 'forced_alignment'))
from forced_alignment import convert_phone_set
from forced_alignment import pyhtk
sys.path.append(os.path.join(default.repo_dir, 'toolbox'))
#import pyHTK
from evaluation import plot_confusion_matrix
## ======================= convert phones ======================
mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)
xls = pd.ExcelFile(excel_file)
## check conversion
#df = pd.read_excel(xls, 'frequency')
#for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']):
# #ipa_converted = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, xsampa_)
# ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
# if not ipa_converted == ipa:
# print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa))
## check phones included in FAME!
# the phones used in the lexicon.
#phonelist = am_func.get_phonelist(lex_asr)
# the lines which include a specific phone.
#lines = am_func.find_phone(lex_asr, 'x')
# Filename, Word, Self Xsampa
df = pd.read_excel(xls, 'original')
ipas = []
famehtks = []
for xsampa in df['Self Xsampa']:
if not isinstance(xsampa, float): # 'NaN'
# typo?
xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t')
xsampa = xsampa.replace(';', ':')
ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
ipa = ipa.replace('ː', ':')
ipa = ipa.replace(' ', '')
ipas.append(ipa)
famehtk = convert_phone_set.ipa2famehtk(ipa)
famehtks.append(famehtk)
else:
ipas.append('')
famehtks.append('')
# extract interesting cols.
df = pd.DataFrame({'filename': df['Filename'],
'word': df['Word'],
'xsampa': df['Self Xsampa'],
'ipa': pd.Series(ipas),
'famehtk': pd.Series(famehtks)})
# cleansing.
df = df[~df['famehtk'].isin(['/', ''])]
word_list = np.unique(df['word'])
## ======================= make dict files used for HTK. ======================
if make_dic_files:
output_type = 3
for word in word_list:
htk_dict_file = htk_dict_dir + '\\' + word + '.dic'
# pronunciation variant of the target word.
pronvar_ = df['famehtk'][df['word'].str.match(word)]
# make dic file.
am_func.make_dic(word, pronvar_, htk_dict_file, output_type)
## ======================= forced alignment using HTK =======================
if do_forced_alignment_htk:
#hmm_num = 2
#for hmm_num in [1]:
#for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
for hmm_num in [256, 512, 1024]:
hmm_num_str = str(hmm_num)
acoustic_model = os.path.join(acoustic_model_dir, 'hmm' + hmm_num_str + r'-2\hmmdefs')
predictions = pd.DataFrame({'filename': [''],
'word': [''],
'xsampa': [''],
'ipa': [''],
'famehtk': [''],
'prediction': ['']})
for i, filename in enumerate(df['filename']):
print('=== {0}/{1} ==='.format(i, len(df)))
if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
wav_file = os.path.join(wav_dir, filename)
if os.path.exists(wav_file):
word = df['word'][i]
WORD = word.upper()
fa_file = os.path.join(fa_dir, filename.replace('.wav', '.txt') + hmm_num_str)
#if not os.path.exists(fa_file):
# make label file.
label_file = os.path.join(wav_dir, filename.replace('.wav', '.lab'))
with open(label_file, 'w') as f:
lines = f.write(WORD)
htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
pyhtk.doHVite(wav_file, label_file, htk_dict_file, fa_file, default.config_hvite,
default.phonelist, acoustic_model)
os.remove(label_file)
prediction = am_func.read_fileFA(fa_file)
#predictions.append(prediction)
print('{0}: {1} -> {2}'.format(WORD, df['famehtk'][i], prediction))
else:
prediction = ''
#predictions.append('')
print('!!!!! file not found.')
line = pd.Series([df['filename'][i], df['word'][i], df['xsampa'][i], df['ipa'][i], df['famehtk'][i], prediction], index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'], name=i)
predictions = predictions.append(line)
else:
prediction = ''
#predictions.append('')
print('!!!!! invalid entry.')
#predictions = np.array(predictions)
#np.save(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.npy'), predictions)
predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
## ======================= make files which is used for forced alignment by Kaldi =======================
if make_kaldi_data_files:
wav_scp = os.path.join(kaldi_data_dir, 'wav.scp')
text_file = os.path.join(kaldi_data_dir, 'text')
utt2spk = os.path.join(kaldi_data_dir, 'utt2spk')
#predictions = []
#file_num_max = len(filenames)
# remove previous files.
if os.path.exists(wav_scp):
os.remove(wav_scp)
if os.path.exists(text_file):
os.remove(text_file)
if os.path.exists(utt2spk):
os.remove(utt2spk)
f_wav_scp = open(wav_scp, 'a', encoding="utf-8", newline='\n')
f_text_file = open(text_file, 'a', encoding="utf-8", newline='\n')
f_utt2spk = open(utt2spk, 'a', encoding="utf-8", newline='\n')
# make wav.scp, text, and utt2spk files.
predictions = pd.DataFrame({'filename': [''],
'word': [''],
'xsampa': [''],
'ipa': [''],
'famehtk': [''],
'prediction': ['']})
#for i in range(0, file_num_max):
#for i in range(400, 410):
for i, filename in enumerate(df['filename']):
#print('=== {0}/{1} ==='.format(i+1, file_num_max))
#filename = filenames[i]
print('=== {0}/{1} ==='.format(i, len(df)))
wav_file = wav_dir + '\\' + filename
if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
wav_file = os.path.join(wav_dir, filename)
if os.path.exists(wav_file):
speaker_id = 'speaker_' + str(i).zfill(4)
utterance_id = filename.replace('.wav', '')
utterance_id = utterance_id.replace(' ', '_')
utterance_id = speaker_id + '-' + utterance_id
# wav.scp file
wav_file_unix = wav_file.replace('\\', '/')
wav_file_unix = wav_file_unix.replace('c:/', '/mnt/c/')
f_wav_scp.write('{0} {1}\n'.format(utterance_id, wav_file_unix))
# text file
#word = words[i].lower()
word = df['word'][i].lower()
f_text_file.write('{0}\t{1}\n'.format(utterance_id, word))
# utt2spk
f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))
f_wav_scp.close()
f_text_file.close()
f_utt2spk.close()
## ======================= make lexicon txt which is used by Kaldi =======================
if make_kaldi_lexicon_txt:
#lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
option_num = 5
# remove previous file.
if os.path.exists(lexicon_txt):
os.remove(lexicon_txt)
lexiconp_txt = lexicon_txt.replace('lexicon.txt', 'lexiconp.txt')
if os.path.exists(lexiconp_txt):
os.remove(lexiconp_txt)
#mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', ipa_xsampa_converter_dir)
#with open(csvfile, encoding="utf-8") as fin:
# lines = csv.reader(fin, delimiter=';', lineterminator="\n", skipinitialspace=True)
# next(lines, None) # skip the headers
# filenames = []
# words = []
# pronunciations = []
# p = []
# for line in lines:
# if line[1] is not '' and len(line) > 5:
# filenames.append(line[0])
# words.append(line[1])
# pron_xsampa = line[3]
# pron_ipa = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, pron_xsampa)
# pron_ipa = pron_ipa.replace('ː', ':')
# # adjust to phones used in the acoustic model.
# pronunciations.append(pron_ipa)
## check if all phones are in the phonelist of the acoustic model.
##'y', 'b', 'ɾ', 'u', 'ɔ:', 'ø', 't', 'œ', 'n', 'ɒ', 'ɐ', 'f', 'o', 'k', 'x', 'ɡ', 'v', 's', 'ɛ:', 'ɪ:', 'ɑ', 'ɛ', 'a', 'd', 'z', 'ɪ', 'ɔ', 'l', 'i:', 'm', 'p', 'a:', 'i', 'e', 'j', 'o:', 'ʁ', 'h', ':', 'e:', 'ə', 'æ', 'χ', 'w', 'r', 'ə:', 'sp', 'ʊ', 'u:', 'ŋ'
#filenames = np.array(filenames)
#words = np.array(words)
#wordlist = np.unique(words)
#pronunciations = np.array(pronunciations)
# output lexicon.txt
f_lexicon_txt = open(lexicon_txt, 'a', encoding="utf-8", newline='\n')
pronvar_list_all = []
for word in word_list:
# pronunciation variant of the target word.
#pronvar_ = pronunciations[words == word]
pronunciation_variants = df['ipa'][df['word'].str.match(word)]
#pronunciation_variants = np.unique(pronunciation_variants)
# remove ''
#pronvar_ = np.delete(pronvar_, np.where(pronvar_==''))
c = Counter(pronunciation_variants)
total_num = sum(c.values())
for key, value in c.most_common(option_num):
#print('{0}\t{1}\t{2}\t{3}'.format(word, key, value, total_num))
key = key.replace('æ', 'ɛ')
key = key.replace('ɐ', 'a')
key = key.replace('ɑ', 'a')
key = key.replace('ɾ', 'r')
key = key.replace('ɹ', 'r') # ???
key = key.replace('ʁ', 'r')
key = key.replace('ʀ', 'r') # ???
key = key.replace('ʊ', 'u')
key = key.replace('χ', 'x')
#print('-->{0}\t{1}\t{2}\t{3}\n'.format(word, key, value, total_num))
# make possible pronounciation variant list.
pronvar_list = [key]
while 'ø:' in ' '.join(pronvar_list) or 'œ' in ' '.join(pronvar_list) or 'ɒ' in ' '.join(pronvar_list):
pronvar_list_ = []
for p in pronvar_list:
if 'ø:' in p:
pronvar_list_.append(p.replace('ø:', 'ö'))
pronvar_list_.append(p.replace('ø:', 'ö:'))
if 'œ' in p:
pronvar_list_.append(p.replace('œ', 'ɔ̈'))
pronvar_list_.append(p.replace('œ', 'ɔ̈:'))
if 'ɒ' in p:
pronvar_list_.append(p.replace('ɒ', 'ɔ̈'))
pronvar_list_.append(p.replace('ɒ', 'ɔ̈:'))
pronvar_list = np.unique(pronvar_list_)
for pronvar_ in pronvar_list:
split_ipa = convert_phone_set.split_fame_ipa(pronvar_)
pronvar_out = ' '.join(split_ipa)
pronvar_list_all.append([word, pronvar_out])
# output
pronvar_list_all = np.array(pronvar_list_all)
pronvar_list_all = np.unique(pronvar_list_all, axis=0)
f_lexicon_txt.write('<UNK>\tSPN\n')
for line in pronvar_list_all:
f_lexicon_txt.write('{0}\t{1}\n'.format(line[0].lower(), line[1]))
f_lexicon_txt.close()
## ======================= load kaldi forced alignment result =======================
if load_forced_alignment_kaldi:
kaldi_work_dir = r'C:\OneDrive\WSL\kaldi-trunk\egs\fame\s5'
phones_txt = os.path.join(kaldi_work_dir, 'data', 'lang', 'phones.txt')
merged_alignment_txt = os.path.join(kaldi_work_dir, 'exp', 'tri1_alignme', 'merged_alignment.txt')
#filenames = np.load(data_dir + '\\filenames.npy')
#words = np.load(data_dir + '\\words.npy')
#pronunciations = np.load(data_dir + '\\pronunciations_ipa.npy')
#pronvar_list_all = np.load(data_dir + '\\pronvar_list_all.npy')
#word_list = np.unique(words)
# load the mapping between phones and ids.
with open(phones_txt, 'r', encoding="utf-8") as f:
mappings = f.read().split('\n')
phones = []
phone_ids = []
for m in mappings:
m = m.split(' ')
if len(m) > 1:
phones.append(m[0])
phone_ids.append(int(m[1]))
with open(merged_alignment_txt, 'r') as f:
lines = f.read()
lines = lines.split('\n')
fa_filenames = []
fa_pronunciations = []
filename_ = ''
pron = []
for line in lines:
line = line.split(' ')
if len(line) == 5:
filename = line[0]
if filename == filename_:
phone_id = int(line[4])
#if not phone_id == 1:
phone = phones[phone_ids.index(phone_id)]
pron_ = re.sub(r'_[A-Z]', '', phone)
if not pron_ == 'SIL':
pron.append(pron_)
else:
fa_filenames.append(re.sub(r'speaker_[0-9]{4}-', '', filename))
fa_pronunciations.append(' '.join(pron))
pron = []
filename_ = filename
# correct or not.
#for filename, fa_pronunciation in zip(fa_filenames, fa_pronunciations):
# predictions = pd.DataFrame({'filename': [''],
# 'word': [''],
# 'xsampa': [''],
# 'ipa': [''],
# 'famehtk': [''],
# 'prediction': ['']})
# for i, filename in enumerate(df['filename']):
# print('=== {0}/{1} ==='.format(i, len(df)))
# if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
# wav_file = os.path.join(wav_dir, filename)
# if os.path.exists(wav_file):
# word = df['word'][i]
# WORD = word.upper()
# fa_file = os.path.join(fa_dir, filename.replace('.wav', '.txt') + hmm_num_str)
# #if not os.path.exists(fa_file):
# # make label file.
# label_file = os.path.join(wav_dir, filename.replace('.wav', '.lab'))
# with open(label_file, 'w') as f:
# lines = f.write(WORD)
# htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
# pyhtk.doHVite(wav_file, label_file, htk_dict_file, fa_file, default.config_hvite,
# default.phonelist, acoustic_model)
# os.remove(label_file)
# prediction = am_func.read_fileFA(fa_file)
# #predictions.append(prediction)
# print('{0}: {1} -> {2}'.format(WORD, df['famehtk'][i], prediction))
# else:
# prediction = ''
# #predictions.append('')
# print('!!!!! file not found.')
# line = pd.Series([df['filename'][i], df['word'][i], df['xsampa'][i], df['ipa'][i], df['famehtk'][i], prediction], index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'], name=i)
# predictions = predictions.append(line)
# else:
# prediction = ''
# #predictions.append('')
# print('!!!!! invalid entry.')
# #predictions = np.array(predictions)
# #np.save(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.npy'), predictions)
# predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
## ======================= evaluate the result of forced alignment =======================
if eval_forced_alignment:
htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
compare_hmm_num = 1
if compare_hmm_num:
f_result = open(os.path.join(result_dir, 'result.csv'), 'w')
f_result.write("nmix,Oog,Oog,Oor,Oor,Pauw,Pauw,Reus,Reus,Reuzenrad,Reuzenrad,Roeiboot,Roeiboot,Rozen,Rozen\n")
for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
#for hmm_num in [256]:
hmm_num_str = str(hmm_num)
if compare_hmm_num:
f_result.write("{},".format(hmm_num_str))
#match = np.load(data_dir + '\\match_hmm' + hmm_num_str + '.npy')
#prediction = np.load(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.npy'))
#prediction = pd.Series(prediction, index=df.index, name='prediction')
#result = pd.concat([df, prediction], axis=1)
result = pd.read_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
# load pronunciation variants
for word in word_list:
htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
with open(htk_dict_file, 'r') as f:
lines = f.read().split('\n')[:-1]
pronunciation_variants = [line.split('\t')[1] for line in lines]
# see only words which appears in top 3.
result_ = result[result['word'].str.match(word)]
result_ = result_[result_['famehtk'].isin(pronunciation_variants)]
match_num = sum(result_['famehtk'] == result_['prediction'])
total_num = len(result_)
print("word '{0}': {1}/{2} ({3:.2f} %)".format(word, match_num, total_num, match_num/total_num*100))
if compare_hmm_num:
f_result.write("{0},{1},".format(match_num, total_num))
else:
# output confusion matrix
cm = confusion_matrix(result_['famehtk'], result_['prediction'])
plt.figure()
plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
plt.savefig(result_dir + '\\cm_' + word + '.png')
if compare_hmm_num:
f_result.write('\n')
if compare_hmm_num:
f_result.close()