# Set-up for the forced-alignment performance check: imports, input/output
# locations and the switches that select which stages of the script run.
import os
# The project-local modules imported below are expected to be found from this
# working directory, so the chdir has to happen before those imports.
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')

import sys
import csv
import subprocess
from collections import Counter
import re

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
#from sklearn.metrics import confusion_matrix

import acoustic_model_functions as am_func
import convert_xsampa2ipa
import defaultfiles as default


## ======================= user define =======================
#curr_dir = r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model'
#config_ini = 'config.ini'
#repo_dir = r'C:\Users\Aki\source\repos'
#forced_alignment_module = repo_dir + '\\forced_alignment'
#forced_alignment_module_old = repo_dir + '\\aki_tools'
#ipa_xsampa_converter_dir = repo_dir + '\\ipa-xsama-converter'
#accent_classification_dir = repo_dir + '\\accent_classification\accent_classification'

# Spreadsheet holding the file names, words and X-SAMPA transcriptions.
excel_file = os.path.join(default.experiments_dir, 'stimmen', 'data', 'Frisian Variants Picture Task Stimmen.xlsx')

#experiments_dir = r'C:\OneDrive\Research\rug\experiments'
data_dir = os.path.join(default.experiments_dir, 'stimmen', 'data')
#csvfile = data_dir + '\\Frisian Variants Picture Task Stimmen.csv'
wav_dir = os.path.join(default.experiments_dir, 'stimmen', 'wav')
acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model')
htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
fa_dir = os.path.join(default.experiments_dir, 'stimmen', 'FA')

#cygwin_dir = r'C:\cygwin64\home\Aki\acoustic_model'
#lex_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
#lex_asr_htk = os.path.join(default.fame_dir, 'lexicon', 'lex.asr_htk')

# procedure: set a switch to 1 to run the corresponding stage, 0 to skip it.
make_dic_files = 0
do_forced_alignment_htk = 1
make_kaldi_data_files = 0
make_kaldi_lexicon_txt = 0
load_forced_alignment_kaldi = 0
eval_forced_alignment = 0


## ======================= add paths =======================
sys.path.append(os.path.join(default.repo_dir, 'forced_alignment'))
from forced_alignment import convert_phone_set
from forced_alignment import pyhtk

sys.path.append(os.path.join(default.repo_dir, 'toolbox'))
#import pyHTK
from evaluation import plot_confusion_matrix


## ======================= convert phones ======================
# Converter from X-SAMPA to IPA, used for the transcriptions in the spreadsheet.
mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)

xls = pd.ExcelFile(excel_file)

## check conversion
#df = pd.read_excel(xls, 'frequency')
#for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']):
#    #ipa_converted = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, xsampa_)
#    ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
#    if not ipa_converted == ipa:
#        print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa))

## check phones included in FAME!
# the phones used in the lexicon.
#phonelist = am_func.get_phonelist(lex_asr)

# the lines which include a specific phone.
#lines = am_func.find_phone(lex_asr, 'x')

# Filename, Word, Self Xsampa
df = pd.read_excel(xls, 'original')

ipas = []
famehtks = []
for xsampa in df['Self Xsampa']:
    if isinstance(xsampa, float):
        # An empty cell is read as a float NaN: keep the row aligned with an
        # empty transcription; it is dropped in the cleansing step below.
        ipas.append('')
        famehtks.append('')
        continue

    # typo?
    xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t')
    xsampa = xsampa.replace(';', ':')

    ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
    ipa = ipa.replace('ː', ':')
    ipa = ipa.replace(' ', '')
    ipas.append(ipa)
    famehtks.append(convert_phone_set.ipa2famehtk(ipa))

# extract interesting cols.
df = pd.DataFrame({'filename': df['Filename'],
                   'word': df['Word'],
                   'xsampa': df['Self Xsampa'],
                   'ipa': pd.Series(ipas),
                   'famehtk': pd.Series(famehtks)})
# cleansing.
# NOTE: the filtered frame keeps its original index labels, so labels and
# positions no longer coincide; the code below iterates over row values only.
df = df[~df['famehtk'].isin(['/', ''])]


## ======================= make dict files used for HTK. ======================
if make_dic_files:
    word_list = np.unique(df['word'])

    output_type = 3

    for word in word_list:
        htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')

        # pronunciation variants of the target word.
        # Exact comparison: a regex prefix match (str.match) would also pick
        # up the variants of every longer word that starts with this one.
        pronvar_ = df['famehtk'][df['word'] == word]

        # make dic file.
        am_func.make_dic(word, pronvar_, htk_dict_file, output_type)


## ======================= forced alignment using HTK =======================
if do_forced_alignment_htk:

    #hmm_num = 2
    for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
        hmm_num_str = str(hmm_num)
        acoustic_model = os.path.join(acoustic_model_dir, 'hmm' + hmm_num_str + r'-2\hmmdefs')

        predictions = []
        # Walk the rows by value.  Using the enumerate() position as an index
        # label into the filtered frame paired file names with the wrong word
        # and skipped rows whose position was not a surviving label.
        rows = zip(df['filename'], df['word'], df['famehtk'])
        for i, (filename, word, famehtk) in enumerate(rows):
            print('=== {0}/{1} ==='.format(i, len(df)))
            wav_file = os.path.join(wav_dir, filename)
            if not os.path.exists(wav_file):
                # keep one prediction per row so the saved array stays aligned.
                predictions.append('')
                print('!!!!! file not found.')
                continue

            WORD = word.upper()

            # make label file.
            label_file = os.path.join(wav_dir, filename.replace('.wav', '.lab'))
            with open(label_file, 'w') as f:
                f.write(WORD)

            htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
            fa_file = os.path.join(fa_dir, filename.replace('.wav', '.txt') + hmm_num_str)
            pyhtk.doHVite(wav_file, label_file, htk_dict_file, fa_file, default.config_hvite, default.phonelist, acoustic_model)

            prediction = am_func.read_fileFA(fa_file)
            predictions.append(prediction)

            # the label file is only needed while HVite runs.
            os.remove(label_file)
            print('{0}: {1} -> {2}'.format(WORD, famehtk, prediction))

        predictions = np.array(predictions)
        #match = np.c_[words[predictions != ''], pronunciations[predictions != ''], predictions[predictions != '']]
        np.save(os.path.join(data_dir, 'predictions_hmm' + hmm_num_str + '.npy'), predictions)


## ======================= make files which are used for forced alignment by Kaldi =======================
if make_kaldi_data_files:
    # lower-case drive letter on purpose: it is rewritten to /mnt/c/ below.
    kaldi_wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen'
    kaldi_work_dir = r'C:\OneDrive\WSL\kaldi-trunk\egs\fame\s5'
    kaldi_data_dir = os.path.join(kaldi_work_dir, 'data', 'alignme')
    wav_scp = os.path.join(kaldi_data_dir, 'wav.scp')
    text_file = os.path.join(kaldi_data_dir, 'text')
    utt2spk = os.path.join(kaldi_data_dir, 'utt2spk')

    # NOTE(review): `filenames` and `words` were not defined on this path in
    # the original script; they are taken from the cleaned spreadsheet rows
    # here - confirm this is the intended source.
    filenames = df['filename'].tolist()
    words = df['word'].tolist()
    file_num_max = len(filenames)

    # make wav.scp, text, and utt2spk files.
    # Mode 'w' truncates any previous file, and the context manager closes
    # all three files even if an error occurs half-way.
    with open(wav_scp, 'w', encoding="utf-8", newline='\n') as f_wav_scp, \
            open(text_file, 'w', encoding="utf-8", newline='\n') as f_text_file, \
            open(utt2spk, 'w', encoding="utf-8", newline='\n') as f_utt2spk:
        for i, (filename, word) in enumerate(zip(filenames, words)):
            print('=== {0}/{1} ==='.format(i+1, file_num_max))
            wav_file = kaldi_wav_dir + '\\' + filename
            if not os.path.exists(wav_file):
                continue

            # every recording is treated as its own speaker.
            speaker_id = 'speaker_' + str(i).zfill(4)
            utterance_id = filename.replace('.wav', '').replace(' ', '_')
            utterance_id = speaker_id + '-' + utterance_id

            # wav.scp file (path as seen from WSL)
            wav_file_unix = wav_file.replace('\\', '/')
            wav_file_unix = wav_file_unix.replace('c:/', '/mnt/c/')
            f_wav_scp.write('{0} {1}\n'.format(utterance_id, wav_file_unix))

            # text file
            f_text_file.write('{0}\t{1}\n'.format(utterance_id, word.lower()))

            # utt2spk
            f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))


## ======================= make lexicon txt which is used by Kaldi =======================
if make_kaldi_lexicon_txt:
    kaldi_work_dir = r'C:\OneDrive\WSL\kaldi-trunk\egs\fame\s5'
    kaldi_dict_dir = os.path.join(kaldi_work_dir, 'data', 'local', 'dict')
    lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
    # path taken from the commented-out `csvfile` setting in the user-define part.
    csvfile = os.path.join(data_dir, 'Frisian Variants Picture Task Stimmen.csv')
    option_num = 5

    # remove previous file.
    # NOTE(review): the code that writes lexicon.txt is commented out below,
    # so this stage currently deletes the file without regenerating it.
    if os.path.exists(lexicon_txt):
        os.remove(lexicon_txt)

    mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)
    with open(csvfile, encoding="utf-8") as fin:
        lines = csv.reader(fin, delimiter=';', lineterminator="\n", skipinitialspace=True)
        next(lines, None)  # skip the headers

        filenames = []
        words = []
        pronunciations = []
        for line in lines:
            # check the length first so short rows cannot raise IndexError,
            # and compare by value rather than by identity.
            if len(line) > 5 and line[1] != '':
                filenames.append(line[0])
                words.append(line[1])
                pron_xsampa = line[3]
                # NOTE(review): the rest of the script calls
                # convert_xsampa2ipa.xsampa2ipa(mapping, ...); confirm that
                # `conversion` is still available.
                pron_ipa = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, pron_xsampa)
                pron_ipa = pron_ipa.replace('ː', ':')

                # adjust to phones used in the acoustic model.
                pronunciations.append(pron_ipa)

    # check if all phones are in the phonelist of the acoustic model.
    #'y', 'b', 'ɾ', 'u', 'ɔ:', 'ø', 't', 'œ', 'n', 'ɒ', 'ɐ', 'f', 'o', 'k', 'x', 'ɡ', 'v', 's', 'ɛ:', 'ɪ:', 'ɑ', 'ɛ', 'a', 'd', 'z', 'ɪ', 'ɔ', 'l', 'i:', 'm', 'p', 'a:', 'i', 'e', 'j', 'o:', 'ʁ', 'h', ':', 'e:', 'ə', 'æ', 'χ', 'w', 'r', 'ə:', 'sp', 'ʊ', 'u:', 'ŋ'

    filenames = np.array(filenames)
    words = np.array(words)
    # was assigned to `wordlist` while the loop below reads `word_list`.
    word_list = np.unique(words)
    pronunciations = np.array(pronunciations)

    # phones replaced one-to-one before the variants are expanded.
    phone_substitutions = (('æ', 'ɛ'), ('ɐ', 'a'), ('ɑ', 'a'), ('ɾ', 'r'),
                           ('ʁ', 'r'), ('ʊ', 'u'), ('χ', 'x'))

    # output lexicon.txt
    #f_lexicon_txt = open(lexicon_txt, 'a', encoding="utf-8", newline='\n')
    pronvar_list_all = []
    for word in word_list:

        # pronunciation variants of the target word.
        pronvar_ = pronunciations[words == word]
        # remove ''
        pronvar_ = np.delete(pronvar_, np.where(pronvar_ == ''))

        c = Counter(pronvar_)
        total_num = sum(c.values())

        # only the `option_num` most frequent variants are kept.
        for key, value in c.most_common(option_num):
            #print('{0}\t{1}\t{2}\t{3}'.format(word, key, value, total_num))
            for old, new in phone_substitutions:
                key = key.replace(old, new)
            #print('-->{0}\t{1}\t{2}\t{3}\n'.format(word, key, value, total_num))

            # make possible pronunciation variant list.
            # Each of 'ø:', 'œ' and 'ɒ' is expanded into a short and a long
            # alternative; repeat until none of them is left in any variant.
            pronvar_list = [key]
            while 'ø:' in ' '.join(pronvar_list) or 'œ' in ' '.join(pronvar_list) or 'ɒ' in ' '.join(pronvar_list):
                pronvar_list_ = []
                for p in pronvar_list:
                    if 'ø:' in p:
                        pronvar_list_.append(p.replace('ø:', 'ö'))
                        pronvar_list_.append(p.replace('ø:', 'ö:'))
                    if 'œ' in p:
                        pronvar_list_.append(p.replace('œ', 'ɔ̈'))
                        pronvar_list_.append(p.replace('œ', 'ɔ̈:'))
                    if 'ɒ' in p:
                        pronvar_list_.append(p.replace('ɒ', 'ɔ̈'))
                        pronvar_list_.append(p.replace('ɒ', 'ɔ̈:'))
                pronvar_list = np.unique(pronvar_list_)

            for pronvar in pronvar_list:
                split_ipa = convert_phone_set.split_fame_ipa(pronvar)
                pronvar_out = ' '.join(split_ipa)
                pronvar_list_all.append([word, pronvar_out])

    # output
    pronvar_list_all = np.array(pronvar_list_all)
    pronvar_list_all = np.unique(pronvar_list_all, axis=0)
    #f_lexicon_txt.write('\tSPN\n')
    #for line in pronvar_list_all:
    #    f_lexicon_txt.write('{0}\t{1}\n'.format(line[0].lower(), line[1]))
    #f_lexicon_txt.close()


## ======================= load kaldi forced alignment result =======================
if load_forced_alignment_kaldi:
    kaldi_work_dir = r'C:\OneDrive\WSL\kaldi-trunk\egs\fame\s5'
    phones_txt = kaldi_work_dir + '\\data\\lang\\phones.txt'
    merged_alignment_txt = kaldi_work_dir + '\\exp\\tri1_alignme\\merged_alignment.txt'

    filenames = np.load(data_dir + '\\filenames.npy')
    words = np.load(data_dir + '\\words.npy')
    pronunciations = np.load(data_dir + '\\pronunciations_ipa.npy')
    pronvar_list_all = np.load(data_dir + '\\pronvar_list_all.npy')
    word_list = np.unique(words)

    # load the mapping between phones and ids ("<phone> <id>" per line).
    with open(phones_txt, 'r', encoding="utf-8") as f:
        mappings = f.read().split('\n')

    phones = []
    phone_ids = []
    phone_of_id = {}
    for m in mappings:
        m = m.split(' ')
        if len(m) > 1:
            phones.append(m[0])
            phone_ids.append(int(m[1]))
            # first occurrence wins, as with list.index().
            phone_of_id.setdefault(int(m[1]), m[0])

    with open(merged_alignment_txt, 'r') as f:
        lines = f.read().split('\n')

    # Collect the phones of every utterance under its own id.  The previous
    # loop attached each file name to the phones of the preceding utterance,
    # dropped the first phone of every utterance and never stored the last
    # utterance.  Only lines with exactly five space-separated fields are
    # used: field 0 is the utterance id and field 4 the phone id.
    phones_by_utterance = {}
    for line in lines:
        fields = line.split(' ')
        if len(fields) != 5:
            continue
        pron = phones_by_utterance.setdefault(fields[0], [])
        phone = phone_of_id[int(fields[4])]
        # strip the suffix that follows the phone name (e.g. '_B').
        pron_ = re.sub(r'_[A-Z]', '', phone)
        if pron_ != 'SIL':
            pron.append(pron_)

    # drop the 'speaker_NNNN-' prefix that was added to the utterance ids.
    fa_filenames = [re.sub(r'speaker_[0-9]{4}-', '', utterance)
                    for utterance in phones_by_utterance]
    fa_pronunciations = [' '.join(pron) for pron in phones_by_utterance.values()]

    # correct or not.
    #for filename, fa_pronunciation in zip(fa_filenames, fa_pronunciations):


## ======================= evaluate the result of forced alignment =======================
if eval_forced_alignment:
    # use dic_short?
    filter_by_dic_top3 = True

    if filter_by_dic_top3:
        # NOTE(review): the import of pyHTK is commented out at the top of the
        # file although loadHTKdic is still needed here; it is imported
        # locally on the assumption that it lives in the 'toolbox' directory
        # added to sys.path above - confirm.
        import pyHTK

        # NOTE(review): `word_list` was only defined by other optional stages;
        # it is rebuilt from the cleaned spreadsheet rows - confirm.
        word_list = np.unique(df['word'])

        # The dictionaries do not depend on the model size: load them once.
        pronunciation_variants = np.array(['WORD', 'pronunciation']).reshape(1, 2)
        for word in word_list:
            fileDic = os.path.join(default.experiments_dir, 'stimmen', 'dic_top3', word + '.dic')
            pronunciation_variants = np.r_[pronunciation_variants, pyHTK.loadHTKdic(fileDic)]

    match_num = []
    for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256]:
        #hmm_num = 256
        hmm_num_str = str(hmm_num)
        # columns, as used below: 0 = word, 1 and 2 = the two pronunciations
        # that are compared.
        match = np.load(data_dir + '\\match_hmm' + hmm_num_str + '.npy')

        if filter_by_dic_top3:
            # see only words which appear in top 3.
            match_short = []
            for line in match:
                WORD = line[0].upper()
                pronvar = pronunciation_variants[pronunciation_variants[:, 0] == WORD, 1]

                if line[1] in pronvar:
                    match_short.append(line)

            match = np.array(match_short)

        # number of match
        total_match = sum(match[:, 1] == match[:, 2])
        print("{}: {}/{}".format(hmm_num_str, total_match, match.shape[0]))
        match_num.append([hmm_num, total_match, match.shape[0]])

    # number of mixtures vs accuracy
    match_num = np.array(match_num)
    plt.xscale("log")
    # each model is normalised by its own number of items (previously every
    # model was divided by the total of the first one).
    plt.plot(match_num[:, 0], match_num[:, 1]/match_num[:, 2], 'o-')
    plt.xlabel('number of mixtures', fontsize=14, fontweight='bold')
    plt.ylabel('accuracy', fontsize=14, fontweight='bold')
    plt.show()

    # confusion matrix
    #dir_out = r'C:\OneDrive\Research\rug\experiments\stimmen\result'
    #word_list = np.unique(match[:, 0])

    #for word in word_list:
    #    match_ = match[match[:, 0] == word, :]
    #    cm = confusion_matrix(match_[:, 1], match_[:, 2])
    #    pronvar = pronunciation_variants[pronunciation_variants[:, 0] == word.upper(), 1]

    #    plt.figure()
    #    plot_confusion_matrix(cm, classes=pronvar, normalize=True)
    #    plt.savefig(dir_out + '\\cm_' + word + '.png')