import os
import sys
import tempfile
import configparser
import subprocess
from collections import Counter
import numpy as np
import pandas as pd
## ======================= user defined =======================
repo_dir = 'C:\\Users\\Aki\\source\\repos\\acoustic_model'
curr_dir = repo_dir + '\\acoustic_model'
config_ini = curr_dir + '\\config.ini'
output_dir = 'C:\\OneDrive\\Research\\rug\\experiments\\friesian\\acoustic_model'
forced_alignment_module = 'C:\\Users\\Aki\\source\\repos\\forced_alignment'
dataset_list = ['devel', 'test', 'train']
# procedure
extract_features = 0
make_feature_list = 0
conv_lexicon = 0
check_lexicon = 0
make_mlf = 0
combine_files = 0
flat_start = 0
train_model = 1
sys.path.append(os.path.join(os.path.dirname(sys.path[0]), curr_dir))
sys.path.append(forced_alignment_module)
from forced_alignment import convert_phone_set
import acoustic_model_functions as am_func
## ======================= load variables =======================
config = configparser.ConfigParser()
config.sections()
config.read(config_ini)
config_hcopy = config['Settings']['config_hcopy']
config_train = config['Settings']['config_train']
mkhmmdefs_pl = config['Settings']['mkhmmdefs_pl']
FAME_dir = config['Settings']['FAME_dir']
lex_asr = FAME_dir + '\\lexicon\\lex.asr'
lex_asr_htk = FAME_dir + '\\lexicon\\lex.asr_htk'
lex_oov = FAME_dir + '\\lexicon\\lex.oov'
lex_oov_htk = FAME_dir + '\\lexicon\\lex.oov_htk'
#lex_ipa = FAME_dir + '\\lexicon\\lex.ipa'
#lex_ipa_ = FAME_dir + '\\lexicon\\lex.ipa_'
#lex_ipa_htk = FAME_dir + '\\lexicon\\lex.ipa_htk'
lex_htk = FAME_dir + '\\lexicon\\lex_original.htk'
lex_htk_ = FAME_dir + '\\lexicon\\lex.htk'
hcompv_scp = output_dir + '\\scp\\combined.scp'
combined_mlf = output_dir + '\\label\\combined.mlf'
model_dir = output_dir + '\\model'
model0_dir = model_dir + '\\hmm0'
proto_init = model_dir + '\\proto38'
proto_name = 'proto'
phonelist = output_dir + '\\config\\phonelist_friesian.txt'
hmmdefs_name = 'hmmdefs'
## ======================= extract features =======================
if extract_features:
    print("==== extract features ====\n")

    for dataset in dataset_list:
        print(dataset)

        # a script file for HCopy.
        hcopy_scp = tempfile.NamedTemporaryFile(mode='w', delete=False)
        hcopy_scp.close()

        # get a list of features (hcopy.scp) from the filelist in the FAME! corpus.
        feature_dir = output_dir + '\\mfc\\' + dataset
        am_func.make_hcopy_scp_from_filelist_in_fame(FAME_dir, dataset, feature_dir, hcopy_scp.name)

        # extract features.
        subprocessStr = 'HCopy -C ' + config_hcopy + ' -S ' + hcopy_scp.name
        subprocess.call(subprocessStr, shell=True)
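        # note: each line of the hcopy scp built above is assumed to map a source audio file
        # to a target feature file, as HCopy expects, e.g. (hypothetical filenames):
        #   <FAME_dir>\wav\devel\utt0001.wav  <output_dir>\mfc\devel\utt0001.mfc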
## ======================= make a list of features =======================
if make_feature_list:
    print("==== make a list of features ====\n")

    for dataset in dataset_list:
        print(dataset)

        feature_dir = output_dir + '\\mfc\\' + dataset
        hcompv_scp = output_dir + '\\scp\\' + dataset + '.scp'
        am_func.make_filelist(feature_dir, hcompv_scp)
## ======================= convert lexicon from ipa to fame_htk =======================
if conv_lexicon:
    print('==== convert lexicon from ipa 2 fame ====\n')

    # lex.asr is the Kaldi-compatible version of lex.ipa.
    # to check...
    #lexicon_ipa = pd.read_table(lex_ipa, names=['word', 'pronunciation'])
    #with open(lex_ipa_, "w", encoding="utf-8") as fout:
    #    for word, pronunciation in zip(lexicon_ipa['word'], lexicon_ipa['pronunciation']):
    #        # ignore nasalization and '.'
    #        pronunciation_ = pronunciation.replace(u'ⁿ', '')
    #        pronunciation_ = pronunciation_.replace('.', '')
    #        pronunciation_split = convert_phone_set.split_ipa_fame(pronunciation_)
    #        fout.write("{0}\t{1}\n".format(word, ' '.join(pronunciation_split)))

    # convert each lexicon from the ipa description to the fame_htk phoneset.
    am_func.ipa2famehtk_lexicon(lex_oov, lex_oov_htk)
    am_func.ipa2famehtk_lexicon(lex_asr, lex_asr_htk)

    # combine the lexicons.
    # pronunciations that are not found in lex.asr are generated with G2P and listed in lex.oov,
    # so there is no overlap between lex_asr and lex_oov.
    am_func.combine_lexicon(lex_asr_htk, lex_oov_htk, lex_htk)
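    # a minimal sketch of what the combine step could look like, assuming both htk lexicons
    # are tab-separated "word<TAB>pronunciation" files and that, since the two have no
    # overlap, merging amounts to concatenating and sorting (kept commented out; the
    # am_func.combine_lexicon call above is what is actually used):
    #lex_a = pd.read_table(lex_asr_htk, names=['word', 'pronunciation'])
    #lex_o = pd.read_table(lex_oov_htk, names=['word', 'pronunciation'])
    #lex_combined = pd.concat([lex_a, lex_o]).sort_values(by='word')
    #lex_combined.to_csv(lex_htk, sep='\t', header=False, index=False, encoding='utf-8')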
## ======================= check if all the phones are successfully converted =======================
if check_lexicon:
    print("==== check if all the phones are successfully converted ====\n")

    # the phones used in the lexicons.
    phonelist_asr = am_func.get_phonelist(lex_asr)
    phonelist_oov = am_func.get_phonelist(lex_oov)
    phonelist_htk = am_func.get_phonelist(lex_htk)
    phonelist_all = phonelist_asr.union(phonelist_oov)

    # the lines which include a specific phone.
    lines = am_func.find_phone(lex_asr, 'g')

    # statistics over the lexicon.
    lexicon_htk = pd.read_table(lex_htk, names=['word', 'pronunciation'])
    pronunciation = lexicon_htk['pronunciation']
    phones_all = []
    for word in pronunciation:
        phones_all = phones_all + word.split()
    c = Counter(phones_all)

## =======================
## manually make changes to the pronunciation dictionary and save it as lex.htk
## =======================
# (1) replace all tabs with a single space;
# (2) put a '\' before any dictionary entry beginning with a single quote.
# http://electroblaze.blogspot.nl/2013/03/understanding-htk-error-messages.html
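# a minimal sketch of those two edits, assuming lex_original.htk (lex_htk) is the input and
# lex.htk (lex_htk_) the output; kept commented out because the changes were made by hand:
#with open(lex_htk, 'rt', encoding='utf-8') as fin, open(lex_htk_, 'wt', encoding='utf-8') as fout:
#    for line in fin:
#        line = line.replace('\t', ' ')   # (1) replace tabs with a single space
#        if line.startswith('\''):
#            line = '\\' + line           # (2) escape an entry starting with a single quote
#        fout.write(line)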
## ======================= make label file =======================
if make_mlf:
    print("==== make mlf ====\n")
    print("generating word level transcription...\n")

    for dataset in dataset_list:
        hcompv_scp = output_dir + '\\scp\\' + dataset + '.scp'
        hcompv_scp2 = output_dir + '\\scp\\' + dataset + '_all_words_in_lexicon.scp'
        script_list = FAME_dir + '\\data\\' + dataset + '\\text'
        mlf_word = output_dir + '\\label\\' + dataset + '_word.mlf'
        mlf_phone = output_dir + '\\label\\' + dataset + '_phone.mlf'

        # lexicon
        lexicon_htk = pd.read_table(lex_htk, names=['word', 'pronunciation'])

        # list of features
        with open(hcompv_scp) as fin:
            features = fin.read()
            features = features.split('\n')

        # list of scripts
        with open(script_list, "rt", encoding="utf-8") as fin:
            scripts = fin.read()
            scripts = pd.Series(scripts.split('\n'))

        i = 0
        missing_words = []
        fscp = open(hcompv_scp2, 'wt')
        fmlf = open(mlf_word, "wt", encoding="utf-8")
        fmlf.write("#!MLF!#\n")
        feature_nr = 1
        for feature in features:
            sys.stdout.write("\r%d/%d" % (feature_nr, len(features)))
            sys.stdout.flush()
            feature_nr += 1
            file_basename = os.path.basename(feature).replace('.mfc', '')

            # get words from scripts.
            try:
                script = scripts[scripts.str.contains(file_basename)]
            except IndexError:
                script = []

            if len(script) != 0:
                script_id = script.index[0]
                script_txt = script.get(script_id)
                script_words = script_txt.split(' ')
                del script_words[0]

                # check if all words can be found in the lexicon.
                SCRIPT_WORDS = []
                script_prons = []
                is_in_lexicon = 1
                for word in script_words:
                    WORD = word.upper()
                    SCRIPT_WORDS.append(WORD)
                    extracted = lexicon_htk[lexicon_htk['word'] == WORD]
                    if len(extracted) == 0:
                        missing_words.append(word)
                    script_prons.append(extracted)
                    is_in_lexicon *= len(extracted)

                # if all pronunciations are found in the lexicon, update the scp and mlf files.
                if is_in_lexicon:
                    # add the feature filename to the .scp file.
                    fscp.write("{}\n".format(feature))
                    i += 1

                    # add the words to the mlf file.
                    fmlf.write('\"*/{}.lab\"\n'.format(file_basename))
                    #fmlf.write('{}'.format('\n'.join(SCRIPT_WORDS)))
                    for word_ in SCRIPT_WORDS:
                        if word_[0] == '\'':
                            word_ = '\\' + word_
                        fmlf.write('{}\n'.format(word_))
                    fmlf.write('.\n')

        print("\n{0} has {1} samples.\n".format(dataset, i))
        np.save(output_dir + '\\missing_words' + '_' + dataset + '.npy', missing_words)
        fscp.close()
        fmlf.close()

        ## generate phone level transcription.
        print("generating phone level transcription...\n")
        mkphones = output_dir + '\\label\\mkphones0.txt'
        subprocessStr = r"HLEd -l * -d " + lex_htk_ + ' -i ' + mlf_phone + ' ' + mkphones + ' ' + mlf_word
        subprocess.call(subprocessStr, shell=True)
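        # for reference, the word-level mlf written above follows the standard HTK layout,
        # e.g. (hypothetical utterance name and words):
        #   #!MLF!#
        #   "*/utt0001.lab"
        #   WORD1
        #   WORD2
        #   .
        # HLEd then expands each word into its phone sequence using lex.htk (lex_htk_) and
        # the edit script mkphones0.txt, producing the phone-level mlf.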
## ======================= combined scps and mlfs =======================
if combine_files:
    print("==== combine scps and mlfs ====\n")

    fscp = open(hcompv_scp, 'wt')
    fmlf = open(combined_mlf, 'wt')

    # the combined mlf needs a single header.
    fmlf.write("#!MLF!#\n")

    for dataset in dataset_list:
        each_mlf = output_dir + '\\label\\' + dataset + '_phone.mlf'
        each_scp = output_dir + '\\scp\\' + dataset + '_all_words_in_lexicon.scp'

        with open(each_mlf, 'r') as fin:
            lines = fin.read()
            lines = lines.split('\n')
        fmlf.write('\n'.join(lines[1:]))

        with open(each_scp, 'r') as fin:
            lines = fin.read()
        fscp.write(lines)

    fscp.close()
    fmlf.close()
## ======================= flat start monophones =======================
if flat_start:
    subprocessStr = 'HCompV -T 1 -C ' + config_train + ' -m -v 0.01 -S ' + hcompv_scp + ' -M ' + model0_dir + ' ' + proto_init
    subprocess.call(subprocessStr, shell=True)

    # allocate the mean & variance to all phones in the phone list.
    subprocessStr = 'perl ' + mkhmmdefs_pl + ' ' + model0_dir + '\\proto38' + ' ' + phonelist + ' > ' + model0_dir + '\\' + hmmdefs_name
    subprocess.call(subprocessStr, shell=True)
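    # for reference: HCompV with -m and -v 0.01 is expected to write a prototype (proto38)
    # containing the global mean/variance plus a vFloors file into model0_dir; the perl
    # script then duplicates that prototype for every phone in phonelist to build hmmdefs.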
## ======================= estimate monophones =======================
if train_model:
    iter_num_max = 3
    for mix_num in [128, 256, 512, 1024]:
        for iter_num in range(1, iter_num_max+1):
            print("===== mix{}, iter{} =====".format(mix_num, iter_num))
            iter_num_pre = iter_num - 1
            modelN_dir = model_dir + '\\hmm' + str(mix_num) + '-' + str(iter_num)
            if not os.path.exists(modelN_dir):
                os.makedirs(modelN_dir)

            if iter_num == 1 and mix_num == 1:
                modelN_dir_pre = model0_dir
            else:
                modelN_dir_pre = model_dir + '\\hmm' + str(mix_num) + '-' + str(iter_num_pre)

            ## re-estimation
            subprocessStr = 'HERest -T 1 -C ' + config_train + ' -v 0.01 -I ' + combined_mlf + ' -H ' + modelN_dir_pre + '\\' + hmmdefs_name + ' -M ' + modelN_dir + ' ' + phonelist + ' -S ' + hcompv_scp
            subprocess.call(subprocessStr, shell=True)

        mix_num_next = mix_num * 2
        modelN_dir_next = model_dir + '\\hmm' + str(mix_num_next) + '-0'
        if not os.path.exists(modelN_dir_next):
            os.makedirs(modelN_dir_next)

        header_file = modelN_dir + '\\mix' + str(mix_num_next) + '.hed'
        with open(header_file, 'w') as fout:
            fout.write("MU %d {*.state[2-4].mix}" % (mix_num_next))

        subprocessStr = 'HHEd -T 1 -H ' + modelN_dir + '\\' + hmmdefs_name + ' -M ' + modelN_dir_next + ' ' + header_file + ' ' + phonelist
        subprocess.call(subprocessStr, shell=True)
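    # for reference, each mixture level gets iter_num_max HERest re-estimations and is then
    # split with HHEd, so the model directories are expected to grow as, e.g.:
    #   hmm128-1 .. hmm128-3  ->  hmm256-0  ->  hmm256-1 .. hmm256-3  ->  hmm512-0  ...
    # the .hed script holds a single MU command, e.g. "MU 256 {*.state[2-4].mix}", which
    # raises the number of Gaussian mixtures in the emitting states (2-4) of every model.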