import os
import sys
import csv
import subprocess
import configparser
from collections import Counter
import re

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix


## ======================= functions =======================
def read_fileFA(fileFA):
    """ Read the result file of HTK forced alignment.
    This function only works when the input is a single word.
    """
    with open(fileFA, 'r') as f:
        lines = f.read()
        lines = lines.split('\n')

    phones = []
    for line in lines:
        line_split = line.split()
        if len(line_split) > 1:
            phones.append(line_split[2])

    return ' '.join(phones)


def make_dic(word, pronvar_, fileDic, output_type):
    """ Make dic files which can be used for HTK.
    param word: target word.
    param pronvar_: ndarray of pronunciation variants of the target word.
    param fileDic: output dic file.
    param output_type: 0: full, 1: statistics, 2: variants below 2% frequency are removed, 3: top 3.
    """
    assert 0 <= output_type <= 3, 'output_type should be an integer between 0 and 3.'
    WORD = word.upper()

    if output_type == 0:  # full
        pronvar = np.unique(pronvar_)
        with open(fileDic, 'w') as f:
            for pvar in pronvar:
                f.write('{0}\t{1}\n'.format(WORD, pvar))
    else:
        c = Counter(pronvar_)
        total_num = sum(c.values())
        with open(fileDic, 'w') as f:
            if output_type == 3:
                for key, value in c.most_common(3):
                    f.write('{0}\t{1}\n'.format(WORD, key))
            else:
                for key, value in c.items():
                    percentage = value / total_num * 100
                    if output_type == 1:  # statistics
                        f.write('{0}\t{1:.2f}\t{2}\t{3}\n'.format(value, percentage, WORD, key))
                    elif output_type == 2:  # remove variants below 2 percent
                        if percentage >= 2:
                            f.write('{0}\t{1}\n'.format(WORD, key))


## ======================= user define =======================
curr_dir = r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model'
config_ini = curr_dir + '\\config.ini'
forced_alignment_module = r'C:\Users\Aki\source\repos\forced_alignment'
forced_alignment_module_old = r'C:\OneDrive\Research\rug\code\forced_alignment\forced_alignment'
ipa_xsampa_converter_dir = r'C:\Users\Aki\source\repos\ipa-xsama-converter'
accent_classification_dir = r'C:\Users\Aki\source\repos\accent_classification\accent_classification'

experiments_dir = r'C:\OneDrive\Research\rug\experiments'
data_dir = experiments_dir + '\\stimmen\\data'
csvfile = data_dir + '\\Frisian Variants Picture Task Stimmen.csv'

cygwin_dir = r'C:\cygwin64\home\Aki\acoustic_model'

# procedure
convert_phones = 0
make_dic_files = 0
make_dic_files_short = 0
do_forced_alignment_htk = 0
make_kaldi_data_files = 0
make_kaldi_lexicon_txt = 0
load_forced_alignment_kaldi = 1
eval_forced_alignment = 0


## ======================= add paths =======================
sys.path.append(forced_alignment_module)
from forced_alignment import convert_phone_set

# for interactive window
sys.path.append(curr_dir)
import convert_xsampa2ipa
import acoustic_model_functions as am_func

# for forced alignment
sys.path.append(forced_alignment_module_old)
import pyHTK

# to output confusion matrix
sys.path.append(accent_classification_dir)
from output_confusion_matrix import plot_confusion_matrix


## ======================= load variables =======================
config = configparser.ConfigParser()
config.sections()
config.read(config_ini)

FAME_dir = config['Settings']['FAME_dir']
lex_asr = FAME_dir + '\\lexicon\\lex.asr'
lex_asr_htk = FAME_dir + '\\lexicon\\lex.asr_htk'


## ======================= convert phones ======================
if convert_phones:
    mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', ipa_xsampa_converter_dir)
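    # 'mapping' is the X-SAMPA -> IPA conversion table that
    # convert_xsampa2ipa.conversion() uses below.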
    ## check phones included in FAME!
    # the phones used in the lexicon.
    #phonelist = am_func.get_phonelist(lex_htk)

    # the lines which include a specific phone.
    #lines = am_func.find_phone(lex_asr, 'x')

    with open(csvfile, encoding="utf-8") as fin:
        lines = csv.reader(fin, delimiter=';', lineterminator="\n", skipinitialspace=True)
        next(lines, None)  # skip the headers

        filenames = []
        words = []
        pronunciations = []
        for line in lines:
            if len(line) > 5 and line[1] != '':
                filenames.append(line[0])
                words.append(line[1])
                pron_xsampa = line[3]
                pron_ipa = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, pron_xsampa)
                pron_ipa = pron_ipa.replace('ː', ':')
                pron_famehtk = convert_phone_set.ipa2famehtk(pron_ipa)

                # adjust to phones used in the acoustic model.
                pron_famehtk = pron_famehtk.replace('sp', 'sil')
                pron_famehtk = pron_famehtk.replace('ce :', 'ce')  # because ceh is ignored.
                pron_famehtk = pron_famehtk.replace('w :', 'wh')
                pron_famehtk = pron_famehtk.replace('e :', 'eh')
                pron_famehtk = pron_famehtk.replace('eh :', 'eh')
                pron_famehtk = pron_famehtk.replace('ih :', 'ih')

                #translation_key = {'sp': 'sil', 'ce :': 'ceh', 'w :': 'wh'}
                #pron = []
                #for phoneme in pron_famehtk.split(' '):
                #    pron.append(translation_key.get(phoneme, phoneme))
                #pronunciations.append(' '.join(pron))
                pronunciations.append(pron_famehtk)

    # check if all phones are in the phonelist of the acoustic model.
    #phonelist = ' '.join(pronunciations)
    #np.unique(phonelist.split(' '))
    #phonelist.find(':')

    filenames = np.array(filenames)
    words = np.array(words)
    pronunciations = np.array(pronunciations)

    del line, lines
    del pron_xsampa, pron_ipa, pron_famehtk

    np.save(data_dir + '\\filenames.npy', filenames)
    np.save(data_dir + '\\words.npy', words)
    np.save(data_dir + '\\pronunciations.npy', pronunciations)
else:
    filenames = np.load(data_dir + '\\filenames.npy')
    words = np.load(data_dir + '\\words.npy')
    pronunciations = np.load(data_dir + '\\pronunciations.npy')

word_list = np.unique(words)


## ======================= make dic files used for HTK ======================
if make_dic_files:
    output_type = 2
    output_dir = experiments_dir + r'\stimmen\dic_short'

    for word in word_list:
        WORD = word.upper()
        fileDic = output_dir + '\\' + word + '.dic'

        # pronunciation variants of the target word.
        pronvar_ = pronunciations[words == word]
        # remove ''
        pronvar_ = np.delete(pronvar_, np.where(pronvar_ == ''))

        # make dic file.
        make_dic(word, pronvar_, fileDic, output_type)


## ======================= forced alignment using HTK =======================
if do_forced_alignment_htk:
    configHVite = cygwin_dir + r'\config\config.HVite'
    filePhoneList = experiments_dir + r'\friesian\acoustic_model\config\phonelist_friesian.txt'
    wav_dir = experiments_dir + r'\stimmen\wav'

    #hmm_num = 128
    for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256]:
        hmm_num_str = str(hmm_num)
        AcousticModel = experiments_dir + r'\friesian\acoustic_model\model\hmm' + hmm_num_str + r'-2\hmmdefs'

        predictions = []
        file_num_max = len(filenames)
        for i in range(0, file_num_max):
        #for i in range(500, 502):
            print('=== {0}/{1} ==='.format(i, file_num_max))
            filename = filenames[i]
            fileWav = wav_dir + '\\' + filename

            if os.path.exists(fileWav):
                word = words[i]
                WORD = word.upper()
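                # The .lab file gives HVite the orthographic transcription to
                # align; it is written next to the wav file and removed again
                # once the alignment has run.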
                # make label file.
                fileLab = wav_dir + '\\' + filename.replace('.wav', '.lab')
                with open(fileLab, 'w') as f:
                    f.write(WORD)

                fileDic = experiments_dir + r'\stimmen\dic_top3' + '\\' + word + '.dic'
                fileFA = experiments_dir + r'\stimmen\FA' + '\\' + filename.replace('.wav', '.txt') + hmm_num_str

                pyHTK.doHVite(fileWav, fileLab, fileDic, fileFA, configHVite, filePhoneList, AcousticModel)
                prediction = read_fileFA(fileFA)
                predictions.append(prediction)

                os.remove(fileLab)
                print('{0}: {1} -> {2}'.format(WORD, pronunciations[i], prediction))
            else:
                predictions.append('')
                print('!!!!! file not found.')

        predictions = np.array(predictions)
        match = np.c_[words[predictions != ''], pronunciations[predictions != ''], predictions[predictions != '']]
        np.save(data_dir + '\\match_hmm' + hmm_num_str + '.npy', match)


## ======================= make files which are used for forced alignment by Kaldi =======================
if make_kaldi_data_files:
    wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen'
    kaldi_work_dir = r'C:\OneDrive\WSL\kaldi-trunk\egs\fame\s5'
    kaldi_data_dir = os.path.join(kaldi_work_dir, 'data', 'alignme')
    kaldi_dict_dir = os.path.join(kaldi_work_dir, 'data', 'local', 'dict')
    htk_dict_dir = os.path.join(experiments_dir, 'stimmen', 'dic_top3')

    wav_scp = os.path.join(kaldi_data_dir, 'wav.scp')
    text_file = os.path.join(kaldi_data_dir, 'text')
    utt2spk = os.path.join(kaldi_data_dir, 'utt2spk')
    lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')

    predictions = []
    file_num_max = len(filenames)

    # remove previous files.
    if os.path.exists(wav_scp):
        os.remove(wav_scp)
    if os.path.exists(text_file):
        os.remove(text_file)
    if os.path.exists(utt2spk):
        os.remove(utt2spk)

    f_wav_scp = open(wav_scp, 'a', encoding="utf-8", newline='\n')
    f_text_file = open(text_file, 'a', encoding="utf-8", newline='\n')
    f_utt2spk = open(utt2spk, 'a', encoding="utf-8", newline='\n')

    # make wav.scp, text, and utt2spk files.
    for i in range(0, file_num_max):
    #for i in range(400, 410):
        print('=== {0}/{1} ==='.format(i + 1, file_num_max))
        filename = filenames[i]
        wav_file = wav_dir + '\\' + filename
        if os.path.exists(wav_file):
            speaker_id = 'speaker_' + str(i).zfill(4)
            utterance_id = filename.replace('.wav', '')
            utterance_id = utterance_id.replace(' ', '_')
            utterance_id = speaker_id + '-' + utterance_id

            # wav.scp file
            wav_file_unix = wav_file.replace('\\', '/')
            wav_file_unix = wav_file_unix.replace('c:/', '/mnt/c/')
            f_wav_scp.write('{0} {1}\n'.format(utterance_id, wav_file_unix))

            # text file
            word = words[i].lower()
            f_text_file.write('{0}\t{1}\n'.format(utterance_id, word))

            # utt2spk
            f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))

    f_wav_scp.close()
    f_text_file.close()
    f_utt2spk.close()


## ======================= make lexicon txt which is used by Kaldi =======================
if make_kaldi_lexicon_txt:
    kaldi_work_dir = r'C:\OneDrive\WSL\kaldi-trunk\egs\fame\s5'
    kaldi_dict_dir = os.path.join(kaldi_work_dir, 'data', 'local', 'dict')
    lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
    option_num = 5
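    # option_num limits the pronunciation variants considered per word to the
    # most frequent ones (see the c.most_common(option_num) call below).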
    # remove previous file.
    if os.path.exists(lexicon_txt):
        os.remove(lexicon_txt)

    mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', ipa_xsampa_converter_dir)

    with open(csvfile, encoding="utf-8") as fin:
        lines = csv.reader(fin, delimiter=';', lineterminator="\n", skipinitialspace=True)
        next(lines, None)  # skip the headers

        filenames = []
        words = []
        pronunciations = []
        p = []
        for line in lines:
            if len(line) > 5 and line[1] != '':
                filenames.append(line[0])
                words.append(line[1])
                pron_xsampa = line[3]
                pron_ipa = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, pron_xsampa)
                pron_ipa = pron_ipa.replace('ː', ':')

                # adjust to phones used in the acoustic model.
                pronunciations.append(pron_ipa)

    # check if all phones are in the phonelist of the acoustic model.
    #'y', 'b', 'ɾ', 'u', 'ɔ:', 'ø', 't', 'œ', 'n', 'ɒ', 'ɐ', 'f', 'o', 'k', 'x', 'ɡ', 'v', 's', 'ɛ:', 'ɪ:', 'ɑ', 'ɛ', 'a', 'd', 'z', 'ɪ', 'ɔ', 'l', 'i:', 'm', 'p', 'a:', 'i', 'e', 'j', 'o:', 'ʁ', 'h', ':', 'e:', 'ə', 'æ', 'χ', 'w', 'r', 'ə:', 'sp', 'ʊ', 'u:', 'ŋ'

    filenames = np.array(filenames)
    words = np.array(words)
    word_list = np.unique(words)
    pronunciations = np.array(pronunciations)

    # output lexicon.txt
    #f_lexicon_txt = open(lexicon_txt, 'a', encoding="utf-8", newline='\n')
    pronvar_list_all = []
    for word in word_list:
        # pronunciation variants of the target word.
        pronvar_ = pronunciations[words == word]
        # remove ''
        pronvar_ = np.delete(pronvar_, np.where(pronvar_ == ''))

        c = Counter(pronvar_)
        total_num = sum(c.values())
        for key, value in c.most_common(option_num):
            #print('{0}\t{1}\t{2}\t{3}'.format(word, key, value, total_num))
            key = key.replace('æ', 'ɛ')
            key = key.replace('ɐ', 'a')
            key = key.replace('ɑ', 'a')
            key = key.replace('ɾ', 'r')
            key = key.replace('ʁ', 'r')
            key = key.replace('ʊ', 'u')
            key = key.replace('χ', 'x')
            #print('-->{0}\t{1}\t{2}\t{3}\n'.format(word, key, value, total_num))

            # make the list of possible pronunciation variants.
            pronvar_list = [key]
            while 'ø:' in ' '.join(pronvar_list) or 'œ' in ' '.join(pronvar_list) or 'ɒ' in ' '.join(pronvar_list):
                pronvar_list_ = []
                for p in pronvar_list:
                    if 'ø:' in p:
                        pronvar_list_.append(p.replace('ø:', 'ö'))
                        pronvar_list_.append(p.replace('ø:', 'ö:'))
                    if 'œ' in p:
                        pronvar_list_.append(p.replace('œ', 'ɔ̈'))
                        pronvar_list_.append(p.replace('œ', 'ɔ̈:'))
                    if 'ɒ' in p:
                        pronvar_list_.append(p.replace('ɒ', 'ɔ̈'))
                        pronvar_list_.append(p.replace('ɒ', 'ɔ̈:'))
                pronvar_list = np.unique(pronvar_list_)

            for pronvar_ in pronvar_list:
                split_ipa = convert_phone_set.split_ipa_fame(pronvar_)
                pronvar_out = ' '.join(split_ipa)
                pronvar_list_all.append([word, pronvar_out])

    # output
    pronvar_list_all = np.array(pronvar_list_all)
    pronvar_list_all = np.unique(pronvar_list_all, axis=0)

    #f_lexicon_txt.write('\tSPN\n')
    #for line in pronvar_list_all:
    #    f_lexicon_txt.write('{0}\t{1}\n'.format(line[0].lower(), line[1]))
    #f_lexicon_txt.close()


## ======================= load kaldi forced alignment result =======================
if load_forced_alignment_kaldi:
    kaldi_work_dir = r'C:\OneDrive\WSL\kaldi-trunk\egs\fame\s5'
    phones_txt = kaldi_work_dir + '\\data\\lang\\phones.txt'
    merged_alignment_txt = kaldi_work_dir + '\\exp\\tri1_alignme\\merged_alignment.txt'

    filenames = np.load(data_dir + '\\filenames.npy')
    words = np.load(data_dir + '\\words.npy')
    pronunciations = np.load(data_dir + '\\pronunciations_ipa.npy')
    pronvar_list_all = np.load(data_dir + '\\pronvar_list_all.npy')
    word_list = np.unique(words)
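    # phones.txt comes from the Kaldi lang directory; each line maps a phone
    # symbol to an integer id (e.g. '<eps> 0'). The exact symbols depend on
    # the recipe; the word-position suffixes (_B, _E, _I, _S) are stripped
    # again below.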
    # load the mapping between phones and ids.
    with open(phones_txt, 'r', encoding="utf-8") as f:
        mappings = f.read().split('\n')

    phones = []
    phone_ids = []
    for m in mappings:
        m = m.split(' ')
        if len(m) > 1:
            phones.append(m[0])
            phone_ids.append(int(m[1]))

    with open(merged_alignment_txt, 'r') as f:
        lines = f.read()
        lines = lines.split('\n')

    # collect the aligned phones per utterance.
    fa_filenames = []
    fa_pronunciations = []
    filename_ = ''
    pron = []
    for line in lines:
        line = line.split(' ')
        if len(line) == 5:
            filename = line[0]
            if filename != filename_:
                # a new utterance starts: store the previous one.
                if filename_ != '':
                    fa_filenames.append(re.sub(r'speaker_[0-9]{4}-', '', filename_))
                    fa_pronunciations.append(' '.join(pron))
                pron = []

            phone_id = int(line[4])
            #if not phone_id == 1:
            phone = phones[phone_ids.index(phone_id)]
            pron_ = re.sub(r'_[A-Z]', '', phone)
            if not pron_ == 'SIL':
                pron.append(pron_)

            filename_ = filename

    # store the last utterance.
    if filename_ != '':
        fa_filenames.append(re.sub(r'speaker_[0-9]{4}-', '', filename_))
        fa_pronunciations.append(' '.join(pron))

    # correct or not.
    for filename, fa_pronunciation in zip(fa_filenames, fa_pronunciations):
        # the comparison with the reference pronunciations was left unfinished;
        # see the sketch at the end of this file.
        pass


## ======================= evaluate the result of forced alignment =======================
if eval_forced_alignment:
    match_num = []
    for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256]:
    #hmm_num = 256
        hmm_num_str = str(hmm_num)
        match = np.load(data_dir + '\\match_hmm' + hmm_num_str + '.npy')

        # only keep samples whose reference pronunciation appears in the dic_top3 dictionaries.
        if 1:
            pronunciation_variants = np.array(['WORD', 'pronunciation']).reshape(1, 2)
            for word in word_list:
                fileDic = experiments_dir + r'\stimmen\dic_top3' + '\\' + word + '.dic'
                pronunciation_variants = np.r_[pronunciation_variants, pyHTK.loadHTKdic(fileDic)]

            # see only words which appear in the top 3.
            match_short = []
            for line in match:
                word = line[0]
                WORD = word.upper()
                pronvar = pronunciation_variants[pronunciation_variants[:, 0] == word.upper(), 1]
                if line[1] in pronvar:
                    match_short.append(line)

            match_short = np.array(match_short)
            match = np.copy(match_short)

        # number of matches
        total_match = sum(match[:, 1] == match[:, 2])
        print("{}: {}/{}".format(hmm_num_str, total_match, match.shape[0]))
        match_num.append([hmm_num, total_match, match.shape[0]])

    # number of mixtures vs accuracy
    match_num = np.array(match_num)
    plt.xscale("log")
    plt.plot(match_num[:, 0], match_num[:, 1] / match_num[0, 2], 'o-')
    plt.xlabel('number of mixtures', fontsize=14, fontweight='bold')
    plt.ylabel('accuracy', fontsize=14, fontweight='bold')
    plt.show()

    # confusion matrix
    #dir_out = r'C:\OneDrive\Research\rug\experiments\stimmen\result'
    #word_list = np.unique(match[:, 0])
    #for word in word_list:
    #    match_ = match[match[:, 0] == word, :]
    #    cm = confusion_matrix(match_[:, 1], match_[:, 2])
    #    pronvar = pronunciation_variants[pronunciation_variants[:, 0] == word.upper(), 1]
    #
    #    plt.figure()
    #    plot_confusion_matrix(cm, classes=pronvar, normalize=True)
    #    plt.savefig(dir_out + '\\cm_' + word + '.png')
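
## ======================= check kaldi alignments against the references (sketch) =======================
# A minimal sketch of the unfinished "correct or not" step in the
# load_forced_alignment_kaldi section above: it prints the Kaldi-aligned phones
# next to the reference transcription for manual inspection.
# Assumptions (not guaranteed by the rest of the script): the original wav
# filename can be recovered by undoing the ' ' -> '_' substitution and
# re-appending '.wav', and pronunciations_ipa.npy is ordered like filenames.npy.
# A real comparison would also have to map both sides onto the same phone set.
if load_forced_alignment_kaldi:
    for fa_filename, fa_pronunciation in zip(fa_filenames, fa_pronunciations):
        original_filename = fa_filename.replace('_', ' ') + '.wav'
        idx = np.where(filenames == original_filename)[0]
        if len(idx) > 0:
            print('{0}\n  reference: {1}\n  aligned:   {2}'.format(
                original_filename, pronunciations[idx[0]], fa_pronunciation))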