# acoustic_model/acoustic_model/htk_vs_kaldi.py


import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import sys
#import csv
#import subprocess
from collections import Counter  # used below to count pronunciation variants.
import re  # used below to parse the Kaldi alignment output.
import shutil
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import acoustic_model_functions as am_func
import defaultfiles as default
#from forced_alignment import pyhtk
sys.path.append(default.forced_alignment_module_dir)
from forced_alignment import convert_phone_set
import convert_xsampa2ipa
import stimmen_functions
import fame_functions
import convert_phoneset
from phoneset import fame_ipa, fame_asr
sys.path.append(default.toolbox_dir)
import file_handling as fh
from htk import pyhtk
## ======================= user define =======================
excel_file = os.path.join(default.experiments_dir, 'stimmen', 'data', 'Frisian Variants Picture Task Stimmen.xlsx')
data_dir = os.path.join(default.experiments_dir, 'stimmen', 'data')
wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen'  # 16k
acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model')
htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
fa_dir = os.path.join(default.experiments_dir, 'stimmen', 'FA_44k')
result_dir = os.path.join(default.experiments_dir, 'stimmen', 'result')
kaldi_data_dir = os.path.join(default.kaldi_dir, 'data', 'alignme')
kaldi_dict_dir = os.path.join(default.kaldi_dir, 'data', 'local', 'dict')
lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
lex_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
lex_asr_htk = os.path.join(default.fame_dir, 'lexicon', 'lex.asr_htk')

## procedure
make_htk_dict_files = 0
do_forced_alignment_htk = 0
eval_forced_alignment_htk = 0
make_kaldi_data_files = 0
make_kaldi_lexicon_txt = 0
load_forced_alignment_kaldi = 1
eval_forced_alignment_kaldi = 1
#sys.path.append(os.path.join(default.repo_dir, 'forced_alignment'))
#from forced_alignment import convert_phone_set
#from forced_alignment import pyhtk
#sys.path.append(os.path.join(default.repo_dir, 'toolbox'))
#from evaluation import plot_confusion_matrix
config_dir = os.path.join(default.htk_dir, 'config')
model_dir = os.path.join(default.htk_dir, 'model')
lattice_file = os.path.join(config_dir, 'stimmen.ltc')
#pyhtk.create_word_lattice_file(
# os.path.join(config_dir, 'stimmen.net'),
# lattice_file)
hvite_scp = os.path.join(default.htk_dir, 'tmp', 'stimmen_test.scp')
## ======================= make test data ======================
# Copy the wav files which are in the stimmen data.
stimmen_test_dir = r'c:\OneDrive\Research\rug\_data\stimmen_test'
fh.make_filelist(stimmen_test_dir, hvite_scp, file_type='wav')
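# NOTE: fh.make_filelist is assumed to write an HTK-style script (.scp) file,
# i.e. one wav-file path per line, intended for HVite (see the commented-out
# recognition call further below).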
df = stimmen_functions.load_transcriptions()
word_list = [i for i in list(set(df['word'])) if not pd.isnull(i)]
word_list = sorted(word_list)
# After manually removing files which do not contain clear sound,
# update df as df_test.
wav_file_list = glob.glob(os.path.join(stimmen_test_dir, '*.wav'))
df_test = pd.DataFrame(index=[], columns=list(df.keys()))
for wav_file in wav_file_list:
    filename = os.path.basename(wav_file)
    df_ = df[df['filename'].str.match(filename)]
    df_test = pd.concat([df_test, df_])
#output = pyhtk.recognition(
# os.path.join(default.htk_dir, 'config', 'config.rec',
# lattice_file,
# os.path.join(model_dir, 'hmm1', 'iter13'),
# dictionary_file,
# os.path.join(config_dir, 'phonelist.txt'),
# hvite_scp)
htk = [fame_functions.ipa2htk(ipa) for ipa in df['ipa']]
ipa = 'e:χ'
fame_functions.ipa2htk(ipa)
# Filename, Word, Self Xsampa
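# NOTE: 'xls' (the Excel workbook handle, presumably xls = pd.ExcelFile(excel_file))
# and 'mapping' (the X-SAMPA -> IPA table used by convert_xsampa2ipa.xsampa2ipa
# below) are not defined in this script; they are assumed to come from an earlier
# version of this code.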
df = pd.read_excel(xls, 'original')
ipas = []
famehtks = []
for xsampa in df['Self Xsampa']:
    if not isinstance(xsampa, float):  # 'NaN'
        # typo?
        xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t')
        xsampa = xsampa.replace(';', ':')
        ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
        ipa = ipa.replace('ː', ':')
        ipa = ipa.replace(' ', '')
        ipas.append(ipa)
        famehtk = convert_phone_set.ipa2famehtk(ipa)
        famehtks.append(famehtk)
    else:
        ipas.append('')
        famehtks.append('')
# extract interesting cols.
df = pd.DataFrame({'filename': df['Filename'],
                   'word': df['Word'],
                   'xsampa': df['Self Xsampa'],
                   'ipa': pd.Series(ipas),
                   'famehtk': pd.Series(famehtks)})
# cleansing.
df = df[~df['famehtk'].isin(['/', ''])]
word_list = np.unique(df['word'])
## ======================= make dict files used for HTK. ======================
if make_htk_dict_files:
    output_type = 3
    for word in word_list:
        htk_dict_file = htk_dict_dir + '\\' + word + '.dic'

        # pronunciation variants of the target word.
        pronvar_ = df['famehtk'][df['word'].str.match(word)]

        # make dic file.
        am_func.make_htk_dict(word, pronvar_, htk_dict_file, output_type)
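    # For reference: an HTK pronunciation dictionary (.dic) lists one pronunciation
    # variant per line in the standard HTK layout "WORD  phone1 phone2 ...".
    # Which variants output_type=3 writes is determined inside am_func.make_htk_dict.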
## ======================= forced alignment using HTK =======================
if do_forced_alignment_htk:
    #for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
    for hmm_num in [256, 512, 1024]:
        hmm_num_str = str(hmm_num)
        acoustic_model = os.path.join(acoustic_model_dir, 'hmm' + hmm_num_str + r'-2\hmmdefs')

        predictions = pd.DataFrame({'filename': [''],
                                    'word': [''],
                                    'xsampa': [''],
                                    'ipa': [''],
                                    'famehtk': [''],
                                    'prediction': ['']})
        for i, filename in enumerate(df['filename']):
            print('=== {0}/{1} ==='.format(i, len(df)))
            if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
                wav_file = os.path.join(wav_dir, filename)
                if os.path.exists(wav_file):
                    word = df['word'][i]
                    WORD = word.upper()
                    fa_file = os.path.join(fa_dir, filename.replace('.wav', '.txt') + hmm_num_str)
                    #if not os.path.exists(fa_file):

                    # make label file.
                    label_file = os.path.join(wav_dir, filename.replace('.wav', '.lab'))
                    with open(label_file, 'w') as f:
                        lines = f.write(WORD)

                    htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
                    pyhtk.doHVite(wav_file, label_file, htk_dict_file, fa_file, default.config_hvite,
                                  default.phonelist, acoustic_model)
                    os.remove(label_file)

                    prediction = am_func.read_fileFA(fa_file)
                    print('{0}: {1} -> {2}'.format(WORD, df['famehtk'][i], prediction))
                else:
                    prediction = ''
                    print('!!!!! file not found.')

                line = pd.Series([df['filename'][i], df['word'][i], df['xsampa'][i], df['ipa'][i], df['famehtk'][i], prediction],
                                 index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'], name=i)
                predictions = predictions.append(line)
            else:
                prediction = ''
                print('!!!!! invalid entry.')
        predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
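# Note on the block above: forced alignment is done per utterance. A temporary
# .lab file containing only the upper-cased target word is written next to the
# wav file, HVite (via pyhtk.doHVite) picks the best-matching pronunciation
# variant from that word's .dic file, and the chosen variant is read back from
# the alignment output with am_func.read_fileFA.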
## ======================= make files which are used for forced alignment by Kaldi =======================
if make_kaldi_data_files:
    wav_scp = os.path.join(kaldi_data_dir, 'wav.scp')
    text_file = os.path.join(kaldi_data_dir, 'text')
    utt2spk = os.path.join(kaldi_data_dir, 'utt2spk')

    # remove previous files.
    if os.path.exists(wav_scp):
        os.remove(wav_scp)
    if os.path.exists(text_file):
        os.remove(text_file)
    if os.path.exists(utt2spk):
        os.remove(utt2spk)

    f_wav_scp = open(wav_scp, 'a', encoding="utf-8", newline='\n')
    f_text_file = open(text_file, 'a', encoding="utf-8", newline='\n')
    f_utt2spk = open(utt2spk, 'a', encoding="utf-8", newline='\n')

    # make wav.scp, text, and utt2spk files.
    for i in df.index:
        filename = df['filename'][i]
        print('=== {0}: {1} ==='.format(i, filename))
        #if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
        wav_file = os.path.join(wav_dir, filename)
        if os.path.exists(wav_file):
            speaker_id = 'speaker_' + str(i).zfill(4)
            utterance_id = filename.replace('.wav', '')
            utterance_id = utterance_id.replace(' ', '_')
            utterance_id = speaker_id + '-' + utterance_id

            # wav.scp file
            wav_file_unix = wav_file.replace('\\', '/')
            wav_file_unix = wav_file_unix.replace('c:/', '/mnt/c/')
            f_wav_scp.write('{0} {1}\n'.format(utterance_id, wav_file_unix))

            # text file
            word = df['word'][i].lower()
            f_text_file.write('{0}\t{1}\n'.format(utterance_id, word))

            # utt2spk
            f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))

    f_wav_scp.close()
    f_text_file.close()
    f_utt2spk.close()
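
    # For reference, the three files written above follow the usual Kaldi data
    # directory conventions (one utterance per line):
    #   wav.scp : <utterance_id> /mnt/c/<path to wav>
    #   text    : <utterance_id>\t<word>     (tab-separated here)
    #   utt2spk : <utterance_id> <speaker_id>
    # where <utterance_id> is "<speaker_id>-<wav basename>" as built above.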
## ======================= make lexicon txt which is used by Kaldi =======================
if make_kaldi_lexicon_txt:
    option_num = 6

    # remove previous files.
    if os.path.exists(lexicon_txt):
        os.remove(lexicon_txt)
    lexiconp_txt = lexicon_txt.replace('lexicon.txt', 'lexiconp.txt')
    if os.path.exists(lexiconp_txt):
        os.remove(lexiconp_txt)

    # output lexicon.txt
    f_lexicon_txt = open(lexicon_txt, 'a', encoding="utf-8", newline='\n')
    pronvar_list_all = []
    for word in word_list:
        # pronunciation variants of the target word.
        pronunciation_variants = df['ipa'][df['word'].str.match(word)]
        c = Counter(pronunciation_variants)
        total_num = sum(c.values())
        #with open(result_dir + '\\' + word + '.csv', 'a', encoding="utf-8", newline='\n') as f:
        #    for key in c.keys():
        #        f.write("{0},{1}\n".format(key, c[key]))

        for key, value in c.most_common(option_num):
            # make possible pronunciation variant list.
            pronvar_list = am_func.fame_pronunciation_variant(key)
            for pronvar_ in pronvar_list:
                split_ipa = convert_phone_set.split_fame_ipa(pronvar_)
                pronvar_out = ' '.join(split_ipa)
                pronvar_list_all.append([word, pronvar_out])

    pronvar_list_all = np.array(pronvar_list_all)
    pronvar_list_all = np.unique(pronvar_list_all, axis=0)

    # output
    f_lexicon_txt.write('<UNK>\tSPN\n')
    for line in pronvar_list_all:
        f_lexicon_txt.write('{0}\t{1}\n'.format(line[0].lower(), line[1]))
    f_lexicon_txt.close()
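
    # For reference, each line of lexicon.txt maps a (lower-cased) word to one
    # space-separated phone sequence, one pronunciation variant per line:
    #   <word>\t<phone1> <phone2> ...
    # plus the "<UNK>\tSPN" entry written above, as Kaldi expects.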
## ======================= load kaldi forced alignment result =======================
if load_forced_alignment_kaldi:
    phones_txt = os.path.join(default.kaldi_dir, 'data', 'lang', 'phones.txt')
    merged_alignment_txt = os.path.join(default.kaldi_dir, 'exp', 'tri1_alignme', 'merged_alignment.txt')

    #filenames = np.load(data_dir + '\\filenames.npy')
    #words = np.load(data_dir + '\\words.npy')
    #pronunciations = np.load(data_dir + '\\pronunciations_ipa.npy')
    #pronvar_list_all = np.load(data_dir + '\\pronvar_list_all.npy')
    #word_list = np.unique(words)

    # load the mapping between phones and ids.
    with open(phones_txt, 'r', encoding="utf-8") as f:
        mapping_phone2id = f.read().split('\n')
    phones = []
    phone_ids = []  # IDs of phones
    for m in mapping_phone2id:
        m = m.split(' ')
        if len(m) > 1:
            phones.append(m[0])
            phone_ids.append(int(m[1]))
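    # For reference, each line of Kaldi's phones.txt is "<phone> <integer-id>";
    # the loop above builds the id -> phone lookup used below. merged_alignment.txt
    # is expected to contain 5 space-separated fields per line, with the utterance
    # id first and the phone id last (see the parsing below).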
    # load the result of FA.
    with open(merged_alignment_txt, 'r') as f:
        lines = f.read()
        lines = lines.split('\n')

    predictions = pd.DataFrame({'filename': [''],
                                'word': [''],
                                'xsampa': [''],
                                'ipa': [''],
                                'famehtk': [''],
                                'prediction': ['']})
    #fa_filenames = []
    #fa_pronunciations = []
    utterance_id_ = ''
    pronunciation = []
    for line in lines:
        line = line.split(' ')
        if len(line) == 5:
            utterance_id = line[0]
            if utterance_id == utterance_id_:
                phone_id = int(line[4])
                #if not phone_id == 1:
                phone_ = phones[phone_ids.index(phone_id)]
                phone = re.sub(r'_[A-Z]', '', phone_)
                if not phone == 'SIL':
                    pronunciation.append(phone)
            else:
                filename = re.sub(r'speaker_[0-9]{4}-', '', utterance_id_)
                prediction = ''.join(pronunciation)
                df_ = df[df['filename'].str.match(filename)]
                df_idx = df_.index[0]
                prediction_ = pd.Series([#filename,
                                         #df_['word'][df_idx],
                                         #df_['xsampa'][df_idx],
                                         #df_['ipa'][df_idx],
                                         #df_['famehtk'][df_idx],
                                         df_.iloc[0, 1],
                                         df_.iloc[0, 3],
                                         df_.iloc[0, 4],
                                         df_.iloc[0, 2],
                                         df_.iloc[0, 0],
                                         prediction],
                                        index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'],
                                        name=df_idx)
                predictions = predictions.append(prediction_)
                #fa_filenames.append()
                #fa_pronunciations.append(' '.join(pronunciation))

                pronunciation = []
            utterance_id_ = utterance_id
    predictions.to_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl'))
## ======================= evaluate the result of forced alignment =======================
if eval_forced_alignment_htk:
    htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')

    compare_hmm_num = 1
    if compare_hmm_num:
        f_result = open(os.path.join(result_dir, 'result.csv'), 'w')
        f_result.write("nmix,Oog,Oog,Oor,Oor,Pauw,Pauw,Reus,Reus,Reuzenrad,Reuzenrad,Roeiboot,Roeiboot,Rozen,Rozen\n")

    for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
    #for hmm_num in [256]:
        hmm_num_str = str(hmm_num)
        if compare_hmm_num:
            f_result.write("{},".format(hmm_num_str))

        #match = np.load(data_dir + '\\match_hmm' + hmm_num_str + '.npy')
        #prediction = np.load(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.npy'))
        #prediction = pd.Series(prediction, index=df.index, name='prediction')
        #result = pd.concat([df, prediction], axis=1)
        result = pd.read_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))

        # load pronunciation variants
        for word in word_list:
            htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
            with open(htk_dict_file, 'r') as f:
                lines = f.read().split('\n')[:-1]
                pronunciation_variants = [line.split('\t')[1] for line in lines]

            # only keep entries whose transcription appears among the dictionary variants.
            result_ = result[result['word'].str.match(word)]
            result_ = result_[result_['famehtk'].isin(pronunciation_variants)]

            match_num = sum(result_['famehtk'] == result_['prediction'])
            total_num = len(result_)

            print("word '{0}': {1}/{2} ({3:.2f} %)".format(word, match_num, total_num, match_num / total_num * 100))
            if compare_hmm_num:
                f_result.write("{0},{1},".format(match_num, total_num))
            else:
                # output confusion matrix
                cm = confusion_matrix(result_['famehtk'], result_['prediction'])
                plt.figure()
                plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
                plt.savefig(result_dir + '\\cm_' + word + '.png')

        if compare_hmm_num:
            f_result.write('\n')

    if compare_hmm_num:
        f_result.close()
## ======================= evaluate the result of forced alignment of kaldi =======================
if eval_forced_alignment_kaldi:
    result = pd.read_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl'))

    f_result = open(os.path.join(result_dir, 'result.csv'), 'w')
    f_result.write("word,total,valid,match,[%]\n")

    # load pronunciation variants
    with open(lexicon_txt, 'r', encoding="utf-8", newline='\n') as f:
        lines = f.read().split('\n')[:-1]
        pronunciation_variants_all = [line.split('\t') for line in lines]

    word_list = np.delete(word_list, [0], 0)  # remove 'Oog'
    for word in word_list:
        # load the pronunciation variants of the word.
        pronunciation_variants = []
        for line in pronunciation_variants_all:
            if line[0] == word.lower():
                pronunciation_variants.append(line[1].replace(' ', ''))

        # only keep predictions for the current word.
        result_ = result[result['word'].str.match(word)]

        result_tolerant = pd.DataFrame({
            'filename': [''],
            'word': [''],
            'xsampa': [''],
            'ipa': [''],
            'prediction': [''],
            'match': ['']})
        for i in range(0, len(result_)):
            line = result_.iloc[i]

            # make a list of all possible pronunciation variants of the ipa description,
            # i.e. possible answers from forced alignment.
            ipa = line['ipa']
            pronvar_list = [ipa]
            pronvar_list_ = am_func.fame_pronunciation_variant(ipa)
            if pronvar_list_ is not None:
                pronvar_list += list(pronvar_list_)

            # only focus on pronunciations which can be estimated from the ipa.
            if len(set(pronvar_list) & set(pronunciation_variants)) > 0:
                if line['prediction'] in pronvar_list:
                    ismatch = True
                else:
                    ismatch = False

                line_df = pd.DataFrame(result_.iloc[i]).T
                df_idx = line_df.index[0]
                result_tolerant_ = pd.Series([line_df.loc[df_idx, 'filename'],
                                              line_df.loc[df_idx, 'word'],
                                              line_df.loc[df_idx, 'xsampa'],
                                              line_df.loc[df_idx, 'ipa'],
                                              line_df.loc[df_idx, 'prediction'],
                                              ismatch],
                                             index=['filename', 'word', 'xsampa', 'ipa', 'prediction', 'match'],
                                             name=df_idx)
                result_tolerant = result_tolerant.append(result_tolerant_)

        # remove the first entry (dummy)
        result_tolerant = result_tolerant.drop(0, axis=0)

        total_num = len(result_)
        valid_num = len(result_tolerant)
        match_num = np.sum(result_tolerant['match'])

        print("word '{0}': {1}/{2} ({3:.2f} %) originally {4}".format(word, match_num, valid_num, match_num / valid_num * 100, total_num))
        f_result.write("{0},{1},{2},{3},{4}\n".format(word, total_num, valid_num, match_num, match_num / valid_num * 100))

    f_result.close()
## output confusion matrix
#cm = confusion_matrix(result_['ipa'], result_['prediction'])
#plt.figure()
#plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
#plt.savefig(result_dir + '\\cm_' + word + '.png')