acoustic_model/acoustic_model/performance_check.py

488 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
# NOTE(review): hard-coded, machine-specific working directory. It is changed
# before the project-local imports below run -- presumably so that those
# modules and relative paths resolve; confirm before running elsewhere.
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import sys
import csv
import subprocess
from collections import Counter
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
# project-local modules
import acoustic_model_functions as am_func
import convert_xsampa2ipa
import defaultfiles as default
# NOTE(review): pyhtk is imported again further down, after sys.path has been
# extended with the forced_alignment directory.
from forced_alignment import pyhtk
## ======================= user define =======================
# Everything that belongs to this experiment lives below
# <default.experiments_dir>/stimmen.
_stimmen_dir = os.path.join(default.experiments_dir, 'stimmen')

# --- input data ---
data_dir = os.path.join(_stimmen_dir, 'data')
excel_file = os.path.join(data_dir, 'Frisian Variants Picture Task Stimmen.xlsx')
wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' # 16k

# --- HTK: acoustic model, per-word dictionaries, alignment output ---
acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model')
htk_dict_dir = os.path.join(_stimmen_dir, 'dic_short')
fa_dir = os.path.join(_stimmen_dir, 'FA_44k')

# --- evaluation output ---
result_dir = os.path.join(_stimmen_dir, 'result')

# --- Kaldi: data directory and lexicon ---
_kaldi_data_root = os.path.join(default.kaldi_dir, 'data')
kaldi_data_dir = os.path.join(_kaldi_data_root, 'alignme')
kaldi_dict_dir = os.path.join(_kaldi_data_root, 'local', 'dict')
lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
#lex_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
#lex_asr_htk = os.path.join(default.fame_dir, 'lexicon', 'lex.asr_htk')

# procedure
# Each flag switches one section of this script on (1) or off (0).
# -- HTK --
make_htk_dict_files = 0
do_forced_alignment_htk = 0
eval_forced_alignment_htk = 0
# -- Kaldi --
make_kaldi_data_files = 0
make_kaldi_lexicon_txt = 0
load_forced_alignment_kaldi = 1
eval_forced_alignment_kaldi = 1
## ======================= add paths =======================
# Make the 'forced_alignment' directory under default.repo_dir importable,
# then import the two modules this script uses from it.
sys.path.append(os.path.join(default.repo_dir, 'forced_alignment'))
from forced_alignment import convert_phone_set
from forced_alignment import pyhtk
# Same for the 'toolbox' directory; the import below is only used to plot
# confusion matrices in the HTK evaluation.
sys.path.append(os.path.join(default.repo_dir, 'toolbox'))
from evaluation import plot_confusion_matrix
## ======================= convert phones ======================
mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)
xls = pd.ExcelFile(excel_file)

## check conversion
#df = pd.read_excel(xls, 'frequency')
#for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']):
#    #ipa_converted = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, xsampa_)
#    ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
#    if not ipa_converted == ipa:
#        print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa))

## check phones included in FAME!
# the phones used in the lexicon.
#phonelist = am_func.get_phonelist(lex_asr)
# the lines which include a specific phone.
#lines = am_func.find_phone(lex_asr, 'x')

# Sheet 'original' provides the columns: Filename, Word, Self Xsampa
df = pd.read_excel(xls, 'original')


def _transcribe(cell):
    """Return the (ipa, famehtk) pair for one 'Self Xsampa' cell.

    A float cell (the original code labels this case 'NaN') yields a pair
    of empty strings, so the result lists stay aligned with the sheet rows.
    """
    if isinstance(cell, float):
        return '', ''
    # typo? -- repair a stray backslash and ';' used in place of ':'.
    cleaned = cell.replace('r2:z@rA:\\t', 'r2:z@rA:t').replace(';', ':')
    converted = convert_xsampa2ipa.xsampa2ipa(mapping, cleaned)
    # plain ':' as length mark, no blanks
    converted = converted.replace('ː', ':').replace(' ', '')
    return converted, convert_phone_set.ipa2famehtk(converted)


_transcriptions = [_transcribe(cell) for cell in df['Self Xsampa']]
ipas = [pair[0] for pair in _transcriptions]
famehtks = [pair[1] for pair in _transcriptions]

# extract interesting cols.
_selected = {
    'filename': df['Filename'],
    'word': df['Word'],
    'xsampa': df['Self Xsampa'],
    'ipa': pd.Series(ipas),
    'famehtk': pd.Series(famehtks),
}
df = pd.DataFrame(_selected)

# cleansing: discard rows whose famehtk transcription is '/' or empty.
_unusable = df['famehtk'].isin(['/', ''])
df = df[~_unusable]
word_list = np.unique(df['word'])
## ======================= make dict files used for HTK. ======================
if make_htk_dict_files:
    # One dictionary file <word>.dic per target word, containing the famehtk
    # pronunciation variants found for that word in df.
    # output_type is passed through to am_func.make_htk_dict unchanged
    # (its meaning is defined there).
    output_type = 3
    for word in word_list:
        # Built with os.path.join, like the other HTK sections of this
        # script, instead of a hard-coded '\\' separator.
        htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
        # pronunciation variant of the target word.
        # NOTE(review): str.match() treats `word` as a regular expression
        # anchored at the start only, so a word that is a prefix of another
        # word would also select that word's rows -- confirm this cannot
        # happen with the word list in use.
        pronvar_ = df['famehtk'][df['word'].str.match(word)]
        # make dic file.
        am_func.make_htk_dict(word, pronvar_, htk_dict_file, output_type)
## ======================= forced alignment using HTK =======================
if do_forced_alignment_htk:
    # Run HVite on every recording for each listed acoustic model and pickle
    # one predictions table per model to
    # <result_dir>/htk/predictions_hmm<N>.pkl.
    prediction_columns = ['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction']

    #for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
    for hmm_num in [256, 512, 1024]:
        hmm_num_str = str(hmm_num)
        acoustic_model = os.path.join(acoustic_model_dir, 'hmm' + hmm_num_str + '-2', 'hmmdefs')

        records = []   # one dict per processed row
        labels = []    # df index label of each record
        # Iterate over the index *labels* of df. df has been filtered, so
        # labels and positions differ; mixing them (position from
        # enumerate(), label for the lookups) pairs a wav file with another
        # row's word and skips rows.
        for count, i in enumerate(df.index):
            filename = df['filename'][i]
            print('=== {0}/{1} ==='.format(count, len(df)))

            if not isinstance(filename, str):
                print('!!!!! invalid entry.')
                continue

            wav_file = os.path.join(wav_dir, filename)
            if os.path.exists(wav_file):
                word = df['word'][i]
                WORD = word.upper()
                fa_file = os.path.join(fa_dir, filename.replace('.wav', '.txt') + hmm_num_str)
                #if not os.path.exists(fa_file):
                # make label file.
                label_file = os.path.join(wav_dir, filename.replace('.wav', '.lab'))
                with open(label_file, 'w') as f:
                    f.write(WORD)
                htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
                try:
                    pyhtk.doHVite(wav_file, label_file, htk_dict_file, fa_file, default.config_hvite,
                                  default.phonelist, acoustic_model)
                finally:
                    # the label file is temporary; remove it even if HVite fails.
                    os.remove(label_file)
                prediction = am_func.read_fileFA(fa_file)
                print('{0}: {1} -> {2}'.format(WORD, df['famehtk'][i], prediction))
            else:
                prediction = ''
                print('!!!!! file not found.')

            records.append({'filename': filename,
                            'word': df['word'][i],
                            'xsampa': df['xsampa'][i],
                            'ipa': df['ipa'][i],
                            'famehtk': df['famehtk'][i],
                            'prediction': prediction})
            labels.append(i)

        # The table keeps its leading all-empty row (index 0) so that the
        # pickle layout stays what it has always been.
        predictions = pd.DataFrame({'filename': [''],
                                    'word': [''],
                                    'xsampa': [''],
                                    'ipa': [''],
                                    'famehtk': [''],
                                    'prediction': ['']})
        if records:
            # Built in one go: DataFrame.append() no longer exists in
            # pandas >= 2.0 and was quadratic when called per row.
            predictions = pd.concat(
                [predictions,
                 pd.DataFrame(records, index=labels, columns=prediction_columns)])
        predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
## ======================= make files which is used for forced alignment by Kaldi =======================
if make_kaldi_data_files:
    # Write wav.scp, text and utt2spk into kaldi_data_dir, one line per
    # recording whose wav file exists.
    wav_scp = os.path.join(kaldi_data_dir, 'wav.scp')
    text_file = os.path.join(kaldi_data_dir, 'text')
    utt2spk = os.path.join(kaldi_data_dir, 'utt2spk')

    # Mode 'w' discards whatever a previous run left behind, and the
    # with-statement closes all three files even if the loop raises.
    with open(wav_scp, 'w', encoding="utf-8", newline='\n') as f_wav_scp, \
            open(text_file, 'w', encoding="utf-8", newline='\n') as f_text_file, \
            open(utt2spk, 'w', encoding="utf-8", newline='\n') as f_utt2spk:
        # make wav.scp, text, and utt2spk files.
        for i in df.index:
            filename = df['filename'][i]
            print('=== {0}: {1} ==='.format(i, filename))

            #if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
            wav_file = os.path.join(wav_dir, filename)
            if os.path.exists(wav_file):
                # every row gets its own speaker id, derived from the row's
                # index label; the utterance id is prefixed with it.
                speaker_id = 'speaker_' + str(i).zfill(4)
                utterance_id = filename.replace('.wav', '')
                utterance_id = utterance_id.replace(' ', '_')
                utterance_id = speaker_id + '-' + utterance_id

                # wav.scp file
                # forward slashes, and the 'c:/' drive prefix rewritten to
                # '/mnt/c/'.
                wav_file_unix = wav_file.replace('\\', '/')
                wav_file_unix = wav_file_unix.replace('c:/', '/mnt/c/')
                f_wav_scp.write('{0} {1}\n'.format(utterance_id, wav_file_unix))

                # text file
                word = df['word'][i].lower()
                f_text_file.write('{0}\t{1}\n'.format(utterance_id, word))

                # utt2spk
                f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))
## ======================= make lexicon txt which is used by Kaldi =======================
if make_kaldi_lexicon_txt:
    # Write lexicon.txt: for each word, the `option_num` most frequent IPA
    # transcriptions in df, expanded by am_func.fame_pronunciation_variant
    # and split into space-separated phones.
    option_num = 6

    # remove the lexiconp.txt of a previous run (lexicon.txt itself is
    # overwritten below).
    lexiconp_txt = lexicon_txt.replace('lexicon.txt', 'lexiconp.txt')
    if os.path.exists(lexiconp_txt):
        os.remove(lexiconp_txt)

    pronvar_list_all = []
    for word in word_list:
        # pronunciation variant of the target word.
        # NOTE(review): str.match() is a regex match anchored at the start
        # only; a word that is a prefix of another word would also select
        # that word's rows.
        pronunciation_variants = df['ipa'][df['word'].str.match(word)]
        c = Counter(pronunciation_variants)

        #with open(result_dir + '\\' + word + '.csv', 'a', encoding="utf-8", newline='\n') as f:
        #    for key in c.keys():
        #        f.write("{0},{1}\n".format(key,c[key]))

        for key, _ in c.most_common(option_num):
            # make possible pronounciation variant list.
            pronvar_list = am_func.fame_pronunciation_variant(key)
            if pronvar_list is None:
                # The Kaldi evaluation section of this script treats None as
                # "no variants"; do the same here instead of failing on the
                # iteration below.
                continue
            for pronvar_ in pronvar_list:
                split_ipa = convert_phone_set.split_fame_ipa(pronvar_)
                pronvar_out = ' '.join(split_ipa)
                pronvar_list_all.append([word, pronvar_out])

    # unique (word, pronunciation) pairs, sorted.
    pronvar_list_all = np.array(pronvar_list_all)
    pronvar_list_all = np.unique(pronvar_list_all, axis=0)

    # output lexicon.txt
    # The file is opened only once its content is known and is closed by the
    # with-statement even when a write fails.
    with open(lexicon_txt, 'w', encoding="utf-8", newline='\n') as f_lexicon_txt:
        f_lexicon_txt.write('<UNK>\tSPN\n')
        for line in pronvar_list_all:
            f_lexicon_txt.write('{0}\t{1}\n'.format(line[0].lower(), line[1]))
## ======================= load kaldi forced alignment result =======================
if load_forced_alignment_kaldi:
    # Turn Kaldi's merged alignment file into one predicted phone string per
    # utterance, join it with the matching row of df and pickle the table to
    # <result_dir>/kaldi/predictions.pkl.
    phones_txt = os.path.join(default.kaldi_dir, 'data', 'lang', 'phones.txt')
    merged_alignment_txt = os.path.join(default.kaldi_dir, 'exp', 'tri1_alignme', 'merged_alignment.txt')

    #filenames = np.load(data_dir + '\\filenames.npy')
    #words = np.load(data_dir + '\\words.npy')
    #pronunciations = np.load(data_dir + '\\pronunciations_ipa.npy')
    #pronvar_list_all = np.load(data_dir + '\\pronvar_list_all.npy')
    #word_list = np.unique(words)

    # load the mapping between phones and ids.
    with open(phones_txt, 'r', encoding="utf-8") as f:
        mapping_phone2id = f.read().split('\n')

    phones = []
    phone_ids = [] # ID of phones
    id2phone = {}  # same information as a dict, for constant-time lookup
    for m in mapping_phone2id:
        m = m.split(' ')
        if len(m) > 1:
            phones.append(m[0])
            phone_ids.append(int(m[1]))
            # setdefault: the first phone listed for an id wins, which is
            # what a list.index() lookup returns.
            id2phone.setdefault(int(m[1]), m[0])

    # load the result of FA.
    with open(merged_alignment_txt, 'r') as f:
        lines = f.read()
        lines = lines.split('\n')

    # Collect the phones of each utterance. Only lines with exactly five
    # space-separated fields are used: field 0 is the utterance id, field 4
    # the phone id. Lines of one utterance are expected to be consecutive.
    #
    # Every such line contributes its phone, including the line on which a
    # new utterance starts, and every utterance -- including the last one in
    # the file -- ends up in `alignments`.
    alignments = []  # [utterance_id, [phone, ...]], in file order
    for line in lines:
        fields = line.split(' ')
        if len(fields) != 5:
            continue
        utterance_id = fields[0]
        if not alignments or alignments[-1][0] != utterance_id:
            alignments.append([utterance_id, []])
        phone_ = id2phone[int(fields[4])]
        # strip a '_<capital letter>' suffix from the phone symbol.
        phone = re.sub(r'_[A-Z]', '', phone_)
        if not phone == 'SIL':
            alignments[-1][1].append(phone)

    prediction_columns = ['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction']
    records = []   # one dict per utterance
    labels = []    # df index label of each record
    for utterance_id, pronunciation in alignments:
        filename = re.sub(r'speaker_[0-9]{4}-', '', utterance_id)
        prediction = ''.join(pronunciation)

        # The utterance id carries the file name without '.wav', so look for
        # the df row whose filename starts with it. A literal prefix test is
        # used so that characters such as '.' or '(' in a file name are not
        # interpreted as a regular expression.
        df_ = df[df['filename'].str.startswith(filename, na=False)]
        if df_.empty:
            print('!!!!! no entry found for {0}.'.format(utterance_id))
            continue

        # Columns are addressed by name: their position in df depends on the
        # pandas version (older versions sorted them alphabetically).
        row = df_.iloc[0]
        records.append({'filename': row['filename'],
                        'word': row['word'],
                        'xsampa': row['xsampa'],
                        'ipa': row['ipa'],
                        'famehtk': row['famehtk'],
                        'prediction': prediction})
        labels.append(df_.index[0])

    # The table keeps its leading all-empty row (index 0) so that the pickle
    # layout stays what it has always been.
    predictions = pd.DataFrame({'filename': [''],
                                'word': [''],
                                'xsampa': [''],
                                'ipa': [''],
                                'famehtk': [''],
                                'prediction': ['']})
    if records:
        # Built in one go: DataFrame.append() no longer exists in
        # pandas >= 2.0.
        predictions = pd.concat(
            [predictions,
             pd.DataFrame(records, index=labels, columns=prediction_columns)])
    predictions.to_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl'))
## ======================= evaluate the result of forced alignment =======================
if eval_forced_alignment_htk:
    # For every acoustic model and every word, count how often the HTK
    # prediction equals the annotated famehtk transcription.
    # compare_hmm_num = 1: write the counts of all models to result.csv.
    # compare_hmm_num = 0: save a confusion matrix figure per word instead.
    #
    # htk_dict_dir is the one set in the 'user define' section.
    # NOTE(review): the Kaldi evaluation writes to the same result.csv.
    compare_hmm_num = 1

    f_result = None
    if compare_hmm_num:
        f_result = open(os.path.join(result_dir, 'result.csv'), 'w')
    try:
        if compare_hmm_num:
            # header: 'nmix' followed by two columns (match, total) per word,
            # generated from word_list so that it always agrees with the rows.
            f_result.write("nmix," + ",".join("{0},{0}".format(word) for word in word_list) + "\n")

        for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
        #for hmm_num in [256]:
            hmm_num_str = str(hmm_num)
            if compare_hmm_num:
                f_result.write("{},".format(hmm_num_str))

            #match = np.load(data_dir + '\\match_hmm' + hmm_num_str + '.npy')
            #prediction = np.load(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.npy'))
            #prediction = pd.Series(prediction, index=df.index, name='prediction')
            #result = pd.concat([df, prediction], axis=1)
            result = pd.read_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))

            # load pronunciation variants
            for word in word_list:
                htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
                with open(htk_dict_file, 'r') as f:
                    # [:-1] drops the empty string after the final newline.
                    lines = f.read().split('\n')[:-1]
                    pronunciation_variants = [line.split('\t')[1] for line in lines]

                # see only words which appears in top 3.
                # i.e. only rows whose annotation is one of the variants in
                # the word's dictionary file.
                result_ = result[result['word'].str.match(word)]
                result_ = result_[result_['famehtk'].isin(pronunciation_variants)]

                match_num = sum(result_['famehtk'] == result_['prediction'])
                total_num = len(result_)
                # no rows for this word: the rate is undefined, report nan
                # instead of dividing by zero.
                match_rate = match_num / total_num * 100 if total_num else float('nan')

                print("word '{0}': {1}/{2} ({3:.2f} %)".format(word, match_num, total_num, match_rate))
                if compare_hmm_num:
                    f_result.write("{0},{1},".format(match_num, total_num))
                else:
                    # output confusion matrix
                    cm = confusion_matrix(result_['famehtk'], result_['prediction'])

                    plt.figure()
                    plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
                    plt.savefig(os.path.join(result_dir, 'cm_' + word + '.png'))
                    # release the figure; one is created per word and model.
                    plt.close()

            if compare_hmm_num:
                f_result.write('\n')
    finally:
        # close the csv even when the evaluation stops with an exception.
        if f_result is not None:
            f_result.close()
## ======================= evaluate the result of forced alignment of kaldi =======================
if eval_forced_alignment_kaldi:
    # Per word: how many of the Kaldi predictions equal the annotated IPA
    # transcription or one of the variants that
    # am_func.fame_pronunciation_variant derives from it. Only rows whose
    # annotation (or one of its variants) occurs in lexicon.txt are counted
    # as 'valid'. The counts are printed and written to result.csv.
    # NOTE(review): the HTK evaluation writes to the same result.csv.
    result = pd.read_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl'))

    # load pronunciation variants
    with open(lexicon_txt, 'r', encoding="utf-8", newline='\n') as f:
        # [:-1] drops the empty string after the final newline.
        lines = f.read().split('\n')[:-1]
        pronunciation_variants_all = [line.split('\t') for line in lines]

    # remove 'Oog'
    # Selected by name rather than by position, so that running this section
    # again in the same session does not drop a further word.
    word_list = np.array([word for word in word_list if word != 'Oog'])

    tolerant_columns = ['filename', 'word', 'xsampa', 'ipa', 'prediction', 'match']

    # the with-statement closes the csv even if the evaluation raises.
    with open(os.path.join(result_dir, 'result.csv'), 'w') as f_result:
        f_result.write("word,total,valid,match,[%]\n")

        for word in word_list:

            # load pronunciation variant of the word.
            pronunciation_variants = []
            for line in pronunciation_variants_all:
                if line[0] == word.lower():
                    pronunciation_variants.append(line[1].replace(' ', ''))

            # see only words which appears in top 3.
            result_ = result[result['word'].str.match(word)]

            records = []   # one dict per valid row
            labels = []    # index label of that row in `result`
            for i in range(0, len(result_)):
                line = result_.iloc[i]

                # make a list of all possible pronunciation variants of ipa description.
                # i.e. possible answers from forced alignment.
                ipa = line['ipa']
                pronvar_list = [ipa]
                pronvar_list_ = am_func.fame_pronunciation_variant(ipa)
                if pronvar_list_ is not None:
                    pronvar_list += list(pronvar_list_)

                # only focus on pronunciations which can be estimated from ipa.
                if len(set(pronvar_list) & set(pronunciation_variants)) > 0:
                    ismatch = line['prediction'] in pronvar_list
                    records.append({'filename': line['filename'],
                                    'word': line['word'],
                                    'xsampa': line['xsampa'],
                                    'ipa': line['ipa'],
                                    'prediction': line['prediction'],
                                    'match': ismatch})
                    labels.append(result_.index[i])

            # Built in one go and without a placeholder row:
            # DataFrame.append() no longer exists in pandas >= 2.0, and
            # dropping a placeholder with drop(0) would also discard a real
            # row whose index label is 0.
            result_tolerant = pd.DataFrame(records, index=labels, columns=tolerant_columns)

            total_num = len(result_)
            valid_num = len(result_tolerant)
            match_num = int(result_tolerant['match'].sum())
            # no valid row for this word: the rate is undefined, report nan
            # instead of dividing by zero.
            match_rate = match_num / valid_num * 100 if valid_num else float('nan')

            print("word '{0}': {1}/{2} ({3:.2f} %) originally {4}".format(word, match_num, valid_num, match_rate, total_num))
            f_result.write("{0},{1},{2},{3},{4}\n".format(word, total_num, valid_num, match_num, match_rate))

    ## output confusion matrix
    #cm = confusion_matrix(result_['ipa'], result_['prediction'])

    #plt.figure()
    #plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
    #plt.savefig(result_dir + '\\cm_' + word + '.png')