import os os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model') import sys import csv import subprocess from collections import Counter import re import numpy as np import pandas as pd import matplotlib.pyplot as plt from sklearn.metrics import confusion_matrix import acoustic_model_functions as am_func import convert_xsampa2ipa import defaultfiles as default from forced_alignment import pyhtk ## ======================= user define ======================= excel_file = os.path.join(default.experiments_dir, 'stimmen', 'data', 'Frisian Variants Picture Task Stimmen.xlsx') data_dir = os.path.join(default.experiments_dir, 'stimmen', 'data') wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' # 16k acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model') htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short') fa_dir = os.path.join(default.experiments_dir, 'stimmen', 'FA_44k') result_dir = os.path.join(default.experiments_dir, 'stimmen', 'result') kaldi_data_dir = os.path.join(default.kaldi_dir, 'data', 'alignme') kaldi_dict_dir = os.path.join(default.kaldi_dir, 'data', 'local', 'dict') lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt') #lex_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr') #lex_asr_htk = os.path.join(default.fame_dir, 'lexicon', 'lex.asr_htk') # procedure make_htk_dict_files = 0 do_forced_alignment_htk = 0 eval_forced_alignment_htk = 0 make_kaldi_data_files = 0 make_kaldi_lexicon_txt = 0 load_forced_alignment_kaldi = 1 eval_forced_alignment_kaldi = 1 ## ======================= add paths ======================= sys.path.append(os.path.join(default.repo_dir, 'forced_alignment')) from forced_alignment import convert_phone_set from forced_alignment import pyhtk sys.path.append(os.path.join(default.repo_dir, 'toolbox')) from evaluation import plot_confusion_matrix ## ======================= convert phones ====================== mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir) xls = pd.ExcelFile(excel_file) ## check conversion #df = pd.read_excel(xls, 'frequency') #for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']): # #ipa_converted = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, xsampa_) # ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa) # if not ipa_converted == ipa: # print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa)) ## check phones included in FAME! # the phones used in the lexicon. #phonelist = am_func.get_phonelist(lex_asr) # the lines which include a specific phone. #lines = am_func.find_phone(lex_asr, 'x') # Filename, Word, Self Xsampa df = pd.read_excel(xls, 'original') ipas = [] famehtks = [] for xsampa in df['Self Xsampa']: if not isinstance(xsampa, float): # 'NaN' # typo? xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t') xsampa = xsampa.replace(';', ':') ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa) ipa = ipa.replace('ː', ':') ipa = ipa.replace(' ', '') ipas.append(ipa) famehtk = convert_phone_set.ipa2famehtk(ipa) famehtks.append(famehtk) else: ipas.append('') famehtks.append('') # extract interesting cols. df = pd.DataFrame({'filename': df['Filename'], 'word': df['Word'], 'xsampa': df['Self Xsampa'], 'ipa': pd.Series(ipas), 'famehtk': pd.Series(famehtks)}) # cleansing. df = df[~df['famehtk'].isin(['/', ''])] word_list = np.unique(df['word']) ## ======================= make dict files used for HTK. ====================== if make_htk_dict_files: output_type = 3 for word in word_list: htk_dict_file = htk_dict_dir + '\\' + word + '.dic' # pronunciation variant of the target word. pronvar_ = df['famehtk'][df['word'].str.match(word)] # make dic file. am_func.make_htk_dict(word, pronvar_, htk_dict_file, output_type) ## ======================= forced alignment using HTK ======================= if do_forced_alignment_htk: #for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]: for hmm_num in [256, 512, 1024]: hmm_num_str = str(hmm_num) acoustic_model = os.path.join(acoustic_model_dir, 'hmm' + hmm_num_str + r'-2\hmmdefs') predictions = pd.DataFrame({'filename': [''], 'word': [''], 'xsampa': [''], 'ipa': [''], 'famehtk': [''], 'prediction': ['']}) for i, filename in enumerate(df['filename']): print('=== {0}/{1} ==='.format(i, len(df))) if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)): wav_file = os.path.join(wav_dir, filename) if os.path.exists(wav_file): word = df['word'][i] WORD = word.upper() fa_file = os.path.join(fa_dir, filename.replace('.wav', '.txt') + hmm_num_str) #if not os.path.exists(fa_file): # make label file. label_file = os.path.join(wav_dir, filename.replace('.wav', '.lab')) with open(label_file, 'w') as f: lines = f.write(WORD) htk_dict_file = os.path.join(htk_dict_dir, word + '.dic') pyhtk.doHVite(wav_file, label_file, htk_dict_file, fa_file, default.config_hvite, default.phonelist, acoustic_model) os.remove(label_file) prediction = am_func.read_fileFA(fa_file) print('{0}: {1} -> {2}'.format(WORD, df['famehtk'][i], prediction)) else: prediction = '' print('!!!!! file not found.') line = pd.Series([df['filename'][i], df['word'][i], df['xsampa'][i], df['ipa'][i], df['famehtk'][i], prediction], index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'], name=i) predictions = predictions.append(line) else: prediction = '' print('!!!!! invalid entry.') predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl')) ## ======================= make files which is used for forced alignment by Kaldi ======================= if make_kaldi_data_files: wav_scp = os.path.join(kaldi_data_dir, 'wav.scp') text_file = os.path.join(kaldi_data_dir, 'text') utt2spk = os.path.join(kaldi_data_dir, 'utt2spk') # remove previous files. if os.path.exists(wav_scp): os.remove(wav_scp) if os.path.exists(text_file): os.remove(text_file) if os.path.exists(utt2spk): os.remove(utt2spk) f_wav_scp = open(wav_scp, 'a', encoding="utf-8", newline='\n') f_text_file = open(text_file, 'a', encoding="utf-8", newline='\n') f_utt2spk = open(utt2spk, 'a', encoding="utf-8", newline='\n') # make wav.scp, text, and utt2spk files. for i in df.index: filename = df['filename'][i] print('=== {0}: {1} ==='.format(i, filename)) #if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)): wav_file = os.path.join(wav_dir, filename) if os.path.exists(wav_file): speaker_id = 'speaker_' + str(i).zfill(4) utterance_id = filename.replace('.wav', '') utterance_id = utterance_id.replace(' ', '_') utterance_id = speaker_id + '-' + utterance_id # wav.scp file wav_file_unix = wav_file.replace('\\', '/') wav_file_unix = wav_file_unix.replace('c:/', '/mnt/c/') f_wav_scp.write('{0} {1}\n'.format(utterance_id, wav_file_unix)) # text file word = df['word'][i].lower() f_text_file.write('{0}\t{1}\n'.format(utterance_id, word)) # utt2spk f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id)) f_wav_scp.close() f_text_file.close() f_utt2spk.close() ## ======================= make lexicon txt which is used by Kaldi ======================= if make_kaldi_lexicon_txt: option_num = 6 # remove previous file. if os.path.exists(lexicon_txt): os.remove(lexicon_txt) lexiconp_txt = lexicon_txt.replace('lexicon.txt', 'lexiconp.txt') if os.path.exists(lexiconp_txt): os.remove(lexiconp_txt) # output lexicon.txt f_lexicon_txt = open(lexicon_txt, 'a', encoding="utf-8", newline='\n') pronvar_list_all = [] for word in word_list: # pronunciation variant of the target word. pronunciation_variants = df['ipa'][df['word'].str.match(word)] c = Counter(pronunciation_variants) total_num = sum(c.values()) #with open(result_dir + '\\' + word + '.csv', 'a', encoding="utf-8", newline='\n') as f: # for key in c.keys(): # f.write("{0},{1}\n".format(key,c[key])) for key, value in c.most_common(option_num): # make possible pronunciation variant list. pronvar_list = am_func.fame_pronunciation_variant(key) for pronvar_ in pronvar_list: split_ipa = convert_phone_set.split_fame_ipa(pronvar_) pronvar_out = ' '.join(split_ipa) pronvar_list_all.append([word, pronvar_out]) pronvar_list_all = np.array(pronvar_list_all) pronvar_list_all = np.unique(pronvar_list_all, axis=0) # output f_lexicon_txt.write('\tSPN\n') for line in pronvar_list_all: f_lexicon_txt.write('{0}\t{1}\n'.format(line[0].lower(), line[1])) f_lexicon_txt.close() ## ======================= load kaldi forced alignment result ======================= if load_forced_alignment_kaldi: phones_txt = os.path.join(default.kaldi_dir, 'data', 'lang', 'phones.txt') merged_alignment_txt = os.path.join(default.kaldi_dir, 'exp', 'tri1_alignme', 'merged_alignment.txt') #filenames = np.load(data_dir + '\\filenames.npy') #words = np.load(data_dir + '\\words.npy') #pronunciations = np.load(data_dir + '\\pronunciations_ipa.npy') #pronvar_list_all = np.load(data_dir + '\\pronvar_list_all.npy') #word_list = np.unique(words) # load the mapping between phones and ids. with open(phones_txt, 'r', encoding="utf-8") as f: mapping_phone2id = f.read().split('\n') phones = [] phone_ids = [] # ID of phones for m in mapping_phone2id: m = m.split(' ') if len(m) > 1: phones.append(m[0]) phone_ids.append(int(m[1])) # load the result of FA. with open(merged_alignment_txt, 'r') as f: lines = f.read() lines = lines.split('\n') predictions = pd.DataFrame({'filename': [''], 'word': [''], 'xsampa': [''], 'ipa': [''], 'famehtk': [''], 'prediction': ['']}) #fa_filenames = [] #fa_pronunciations = [] utterance_id_ = '' pronunciation = [] for line in lines: line = line.split(' ') if len(line) == 5: utterance_id = line[0] if utterance_id == utterance_id_: phone_id = int(line[4]) #if not phone_id == 1: phone_ = phones[phone_ids.index(phone_id)] phone = re.sub(r'_[A-Z]', '', phone_) if not phone == 'SIL': pronunciation.append(phone) else: filename = re.sub(r'speaker_[0-9]{4}-', '', utterance_id_) prediction = ''.join(pronunciation) df_ = df[df['filename'].str.match(filename)] df_idx = df_.index[0] prediction_ = pd.Series([#filename, #df_['word'][df_idx], #df_['xsampa'][df_idx], #df_['ipa'][df_idx], #df_['famehtk'][df_idx], df_.iloc[0,1], df_.iloc[0,3], df_.iloc[0,4], df_.iloc[0,2], df_.iloc[0,0], prediction], index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'], name=df_idx) predictions = predictions.append(prediction_) #fa_filenames.append() #fa_pronunciations.append(' '.join(pronunciation)) pronunciation = [] utterance_id_ = utterance_id predictions.to_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl')) ## ======================= evaluate the result of forced alignment ======================= if eval_forced_alignment_htk: htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short') compare_hmm_num = 1 if compare_hmm_num: f_result = open(os.path.join(result_dir, 'result.csv'), 'w') f_result.write("nmix,Oog,Oog,Oor,Oor,Pauw,Pauw,Reus,Reus,Reuzenrad,Reuzenrad,Roeiboot,Roeiboot,Rozen,Rozen\n") for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]: #for hmm_num in [256]: hmm_num_str = str(hmm_num) if compare_hmm_num: f_result.write("{},".format(hmm_num_str)) #match = np.load(data_dir + '\\match_hmm' + hmm_num_str + '.npy') #prediction = np.load(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.npy')) #prediction = pd.Series(prediction, index=df.index, name='prediction') #result = pd.concat([df, prediction], axis=1) result = pd.read_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl')) # load pronunciation variants for word in word_list: htk_dict_file = os.path.join(htk_dict_dir, word + '.dic') with open(htk_dict_file, 'r') as f: lines = f.read().split('\n')[:-1] pronunciation_variants = [line.split('\t')[1] for line in lines] # see only words which appears in top 3. result_ = result[result['word'].str.match(word)] result_ = result_[result_['famehtk'].isin(pronunciation_variants)] match_num = sum(result_['famehtk'] == result_['prediction']) total_num = len(result_) print("word '{0}': {1}/{2} ({3:.2f} %)".format(word, match_num, total_num, match_num/total_num*100)) if compare_hmm_num: f_result.write("{0},{1},".format(match_num, total_num)) else: # output confusion matrix cm = confusion_matrix(result_['famehtk'], result_['prediction']) plt.figure() plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False) plt.savefig(result_dir + '\\cm_' + word + '.png') if compare_hmm_num: f_result.write('\n') if compare_hmm_num: f_result.close() ## ======================= evaluate the result of forced alignment of kaldi ======================= if eval_forced_alignment_kaldi: result = pd.read_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl')) f_result = open(os.path.join(result_dir, 'result.csv'), 'w') f_result.write("word,total,valid,match,[%]\n") # load pronunciation variants with open(lexicon_txt, 'r', encoding="utf-8", newline='\n') as f: lines = f.read().split('\n')[:-1] pronunciation_variants_all = [line.split('\t') for line in lines] word_list = np.delete(word_list, [0], 0) # remove 'Oog' for word in word_list: # load pronunciation variant of the word. pronunciation_variants = [] for line in pronunciation_variants_all: if line[0] == word.lower(): pronunciation_variants.append(line[1].replace(' ', '')) # see only words which appears in top 3. result_ = result[result['word'].str.match(word)] result_tolerant = pd.DataFrame({ 'filename': [''], 'word': [''], 'xsampa': [''], 'ipa': [''], 'prediction': [''], 'match': ['']}) for i in range(0, len(result_)): line = result_.iloc[i] # make a list of all possible pronunciation variants of ipa description. # i.e. possible answers from forced alignment. ipa = line['ipa'] pronvar_list = [ipa] pronvar_list_ = am_func.fame_pronunciation_variant(ipa) if not pronvar_list_ is None: pronvar_list += list(pronvar_list_) # only focus on pronunciations which can be estimated from ipa. if len(set(pronvar_list) & set(pronunciation_variants)) > 0: if line['prediction'] in pronvar_list: ismatch = True else: ismatch = False line_df = pd.DataFrame(result_.iloc[i]).T df_idx = line_df.index[0] result_tolerant_ = pd.Series([line_df.loc[df_idx, 'filename'], line_df.loc[df_idx, 'word'], line_df.loc[df_idx, 'xsampa'], line_df.loc[df_idx, 'ipa'], line_df.loc[df_idx, 'prediction'], ismatch], index=['filename', 'word', 'xsampa', 'ipa', 'prediction', 'match'], name=df_idx) result_tolerant = result_tolerant.append(result_tolerant_) # remove the first entry (dummy) result_tolerant = result_tolerant.drop(0, axis=0) total_num = len(result_) valid_num = len(result_tolerant) match_num = np.sum(result_tolerant['match']) print("word '{0}': {1}/{2} ({3:.2f} %) originally {4}".format(word, match_num, valid_num, match_num/valid_num*100, total_num)) f_result.write("{0},{1},{2},{3},{4}\n".format(word, total_num, valid_num, match_num, match_num/valid_num*100)) f_result.close() ## output confusion matrix #cm = confusion_matrix(result_['ipa'], result_['prediction']) #plt.figure() #plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False) #plt.savefig(result_dir + '\\cm_' + word + '.png')