260 lines
9.4 KiB
Python
260 lines
9.4 KiB
Python
import sys
|
|
import os
|
|
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
|
|
|
|
import tempfile
|
|
import shutil
|
|
import glob
|
|
import time
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
import fame_functions
|
|
from phoneset import fame_ipa, fame_asr
|
|
import defaultfiles as default
|
|
sys.path.append(default.toolbox_dir)
|
|
import file_handling as fh
|
|
from htk import pyhtk
|
|
|
|
|
|
## ======================= user define =======================
|
|
# Procedure switches: a truthy value runs the corresponding section of this
# script, 0 skips it.
make_lexicon = 0
make_label = 0  # it takes roughly 4800 sec on Surface pro 2.
make_htk_files = 0
extract_features = 0
flat_start = 0
train_model_without_sp = 1

# Pre-defined values.
dataset_list = ['devel', 'test', 'train']
hmmdefs_name = 'hmmdefs'

# Input lexicons, located under the FAME directory.
lexicon_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
lexicon_oov = os.path.join(default.fame_dir, 'lexicon', 'lex.oov')

# HTK configuration files, located under the HTK working directory.
config_dir = os.path.join(default.htk_dir, 'config')
config_hcopy = os.path.join(config_dir, 'config.HCopy')
config_train = os.path.join(config_dir, 'config.train')
global_ded = os.path.join(config_dir, 'global.ded')
mkphones_led = os.path.join(config_dir, 'mkphones.led')
prototype = os.path.join(config_dir, 'proto39')

# Root directory for the trained models.
model_dir = os.path.join(default.htk_dir, 'model')
|
|
|
|
|
|
# Directories / files to be made.

# Lexicons converted for HTK (written by the "make lexicon" step).
lexicon_dir = os.path.join(default.htk_dir, 'lexicon')
lexicon_htk_asr = os.path.join(lexicon_dir, 'lex.htk_asr')
lexicon_htk_oov = os.path.join(lexicon_dir, 'lex.htk_oov')
lexicon_htk = os.path.join(lexicon_dir, 'lex.htk')

phonelist_txt = os.path.join(config_dir, 'phonelist.txt')
model0_dir = os.path.join(model_dir, 'hmm0')

# Working directories for features, temporary script files and labels.
feature_dir = os.path.join(default.htk_dir, 'mfc')
tmp_dir = os.path.join(default.htk_dir, 'tmp')
label_dir = os.path.join(default.htk_dir, 'label')

# exist_ok=True creates the directory only when it is missing, without the
# race between a separate existence check and the creation.
for _work_dir in (feature_dir, tmp_dir, label_dir):
    os.makedirs(_work_dir, exist_ok=True)
|
|
|
|
|
|
## ======================= make lexicon for HTK =======================
|
|
if make_lexicon:
    timer_start = time.time()
    print('==== making lexicon for HTK ====')

    # Convert each lexicon from the fame_asr phoneset to the fame_htk phoneset.
    print('>>> converting each lexicon from fame_asr phoneset to fame_htk phoneset...')
    for lexicon_in, lexicon_out in (
            (lexicon_asr, lexicon_htk_asr),
            (lexicon_oov, lexicon_htk_oov)):
        fame_functions.lexicon_asr2htk(lexicon_in, lexicon_out)

    # Combine the lexicons.
    # Pronunciations which are not found in lex.asr are generated using G2P
    # and listed in lex.oov, therefore there is no overlap between lex_asr
    # and lex_oov.
    print('>>> combining lexicon files into one lexicon...')
    fame_functions.combine_lexicon(lexicon_htk_asr, lexicon_htk_oov, lexicon_htk)

    # The pronunciation dictionary saved as lex.htk needs the following
    # changes:
    #   (1) replace all tabs with a single space;
    #   (2) put a '\' before any dictionary entry beginning with a single quote.
    # http://electroblaze.blogspot.nl/2013/03/understanding-htk-error-messages.html
    # NOTE(review): fix_single_quote presumably applies (2); whether it also
    # covers (1) cannot be told from this script -- confirm in fame_functions.
    fame_functions.fix_single_quote(lexicon_htk)

    elapsed = time.time() - timer_start
    print("elapsed time: {}".format(elapsed))
|
|
|
|
|
|
## ======================= make label files =======================
|
|
if make_label:
    # For every dataset, create one HTK dictionary (.dic) and one label file
    # (.lab) per utterance listed in the dataset's 'text' script file.
    for dataset in dataset_list:
        timer_start = time.time()
        print("==== making label files on dataset {}".format(dataset))

        script_list = os.path.join(default.fame_dir, 'data', dataset, 'text')
        wav_dir_ = os.path.join(default.fame_dir, 'fame', 'wav', dataset)
        label_dir_ = os.path.join(label_dir, dataset)
        dictionary_file = os.path.join(label_dir_, 'temp.dic')
        fh.make_new_directory(label_dir_)

        # list of scripts
        with open(script_list, "rt", encoding="utf-8") as fin:
            scripts = fin.read().split('\n')

        for line in scripts:
            # A trailing newline in the script file yields an empty entry;
            # skip blank lines instead of processing an empty utterance.
            if not line.strip():
                continue

            # sample line:
            # sp0457m_test_1968_plakkenfryslanterhorne_2168 en dan begjinne je natuerlik
            # The first space separates the utterance id from the sentence.
            filename_, _, sentence = line.partition(' ')
            # Drop the leading field of the id (everything up to the first
            # '_'); the remainder is the base name of the wav file.
            filename = filename_.partition('_')[2]
            sentence_htk = fame_functions.word2htk(sentence)

            wav_file = os.path.join(wav_dir_, filename + '.wav')
            if not os.path.exists(wav_file) or pyhtk.can_be_ascii(sentence_htk) != 0:
                continue

            if pyhtk.create_dictionary_without_log(
                    sentence_htk, global_ded, dictionary_file, lexicon_htk) == 0:
                # when the file name is too long, HDMan command does not work.
                # therefore first temporary dictionary_file is made, then renamed.
                shutil.move(dictionary_file, os.path.join(label_dir_, filename + '.dic'))

                label_file = os.path.join(label_dir_, filename + '.lab')
                pyhtk.create_label_file(sentence_htk, label_file)
            elif os.path.exists(dictionary_file):
                # Dictionary creation failed: discard the partial temporary
                # file. The existence check keeps a failure that produced no
                # file from aborting the whole (long-running) step.
                os.remove(dictionary_file)
        print("elapsed time: {}".format(time.time() - timer_start))
|
|
|
|
|
|
## ======================= make other required files =======================
|
|
if make_htk_files:
    # Create the phone list, plus a word-level and a phone-level master label
    # file (MLF) for every dataset.
    timer_start = time.time()
    print("==== making files required for HTK ====")

    print(">>> making a phonelist...")
    pyhtk.create_phonelist_file(fame_asr.phoneset_htk, phonelist_txt)

    for dataset in dataset_list:
        label_dir_ = os.path.join(label_dir, dataset)
        mlf_word = os.path.join(label_dir, dataset + '_word.mlf')
        mlf_phone = os.path.join(label_dir, dataset + '_phone.mlf')

        print(">>> making a mlf file for {}...".format(dataset))
        # glob returns files in arbitrary order; sort so that the generated
        # MLF is identical from run to run.
        lab_list = sorted(glob.glob(os.path.join(label_dir_, '*.lab')))
        with open(mlf_word, 'wb') as fmlf:
            fmlf.write(b'#!MLF!#\n')
            for label_file in lab_list:
                filename = os.path.basename(label_file)
                fmlf.write(bytes('"*/{}"\n'.format(filename), 'ascii'))
                # The MLF is written as ASCII, so read the label file as
                # ASCII too instead of relying on the platform default.
                with open(label_file, encoding='ascii') as flab:
                    lines = flab.read()
                # The '.' terminating an entry must be on a line of its own.
                if lines and not lines.endswith('\n'):
                    lines += '\n'
                fmlf.write(bytes(lines + '.\n', 'ascii'))

        print(">>> generating phone level transcription for {}...".format(dataset))
        pyhtk.mlf_word2phone(lexicon_htk, mlf_phone, mlf_word, mkphones_led)
    print("elapsed time: {}".format(time.time() - timer_start))
|
|
|
|
|
|
## ======================= extract features =======================
|
|
if extract_features:
    # For every dataset: extract features with HCopy for each utterance that
    # has a label file, then write the list of feature files used later as
    # the training script file (<tmp_dir>/<dataset>.scp).
    for dataset in dataset_list:
        timer_start = time.time()
        print('==== extract features on dataset {} ===='.format(dataset))

        wav_dir_ = os.path.join(default.fame_dir, 'fame', 'wav', dataset)
        label_dir_ = os.path.join(label_dir, dataset)
        feature_dir_ = os.path.join(feature_dir, dataset)
        fh.make_new_directory(feature_dir_)

        # a script file for HCopy
        print(">>> making a script file for HCopy...")
        hcopy_scp = tempfile.NamedTemporaryFile(mode='w', delete=False)
        hcopy_scp.close()
        try:
            # get a list of features (hcopy.scp) from the list of label files.
            # Alternative (from the filelist in FAME! corpus):
            #   fame_functions.make_hcopy_scp_from_filelist_in_fame(
            #       default.fame_dir, dataset, feature_dir_, hcopy_scp.name)
            lab_list = glob.glob(os.path.join(label_dir_, '*.lab'))
            feature_list = []
            for lab_file in lab_list:
                # splitext swaps only the extension, even when '.lab' also
                # occurs elsewhere in the file name.
                stem = os.path.splitext(os.path.basename(lab_file))[0]
                feature_list.append(
                    os.path.join(wav_dir_, stem + '.wav') + '\t'
                    + os.path.join(feature_dir_, stem + '.mfc'))
            with open(hcopy_scp.name, 'wb') as f:
                f.write(bytes('\n'.join(feature_list), 'ascii'))

            # extract features.
            print(">>> extracting features on {}...".format(dataset))
            pyhtk.wav2mfc(config_hcopy, hcopy_scp.name)
        finally:
            # Always delete the temporary script file, also when the
            # extraction raises.
            os.remove(hcopy_scp.name)

        # make hcompv.scp.
        print(">>> making a script file for {}...".format(dataset))
        listdir = glob.glob(os.path.join(label_dir_, '*.dic'))
        # One feature file per dictionary file, same base name, located in
        # the feature directory of this dataset.
        mfc_list = [
            os.path.join(
                feature_dir_,
                os.path.splitext(os.path.basename(dic_file))[0] + '.mfc')
            for dic_file in listdir]
        hcompv_scp = os.path.join(tmp_dir, dataset + '.scp')
        with open(hcompv_scp, 'wb') as f:
            f.write(bytes('\n'.join(mfc_list) + '\n', 'ascii'))

        print("elapsed time: {}".format(time.time() - timer_start))
|
|
|
|
|
|
## ======================= flat start monophones =======================
|
|
if flat_start:
    # The script file of the 'test' dataset is used for the flat start.
    hcompv_scp = os.path.join(tmp_dir, 'test.scp')

    timer_start = time.time()
    print('==== flat start ====')
    pyhtk.flat_start(config_train, hcompv_scp, model0_dir, prototype)

    # allocate mean & variance to all phones in the phone list
    proto_in_model0 = os.path.join(model0_dir, 'proto39')
    hmmdefs_in_model0 = os.path.join(model0_dir, 'hmmdefs')
    pyhtk.create_hmmdefs(proto_in_model0, hmmdefs_in_model0, phonelist_txt)

    elapsed = time.time() - timer_start
    print("elapsed time: {}".format(elapsed))
|
|
|
|
|
|
## ======================= estimate monophones =======================
|
|
if train_model_without_sp:
    # Script file and phone-level MLF of the 'test' dataset are used here.
    hcompv_scp = os.path.join(tmp_dir, 'test.scp')
    mlf_file = os.path.join(label_dir, 'test_phone.mlf')
    output_dir = os.path.join(model_dir, 'hmm1')
    fh.make_new_directory(output_dir)

    print('==== train model without sp ====')
    # Seed the iterations with a copy of the flat-start model directory.
    iter0_dir = os.path.join(output_dir, 'iter0')
    if not os.path.exists(iter0_dir):
        shutil.copytree(model0_dir, iter0_dir)

    # Four re-estimation passes; pass n reads iter<n-1> and writes iter<n>.
    for niter in range(1, 5):
        timer_start = time.time()
        hmm_n = 'iter{}'.format(niter)
        hmm_n_pre = 'iter{}'.format(niter - 1)
        modeln_dir = os.path.join(output_dir, hmm_n)
        modeln_dir_pre = os.path.join(output_dir, hmm_n_pre)

        # re-estimation
        fh.make_new_directory(modeln_dir)
        previous_proto = os.path.join(modeln_dir_pre, 'proto39')
        previous_hmmdefs = os.path.join(modeln_dir_pre, hmmdefs_name)
        pyhtk.re_estimation(
            config_train,
            previous_proto,
            previous_hmmdefs,
            modeln_dir,
            hcompv_scp, phonelist_txt,
            mlf_file=mlf_file)
        print("elapsed time: {}".format(time.time() - timer_start))