Compare commits

..

18 Commits

Author SHA1 Message Date
97486e5599 dataset for experiments in check_novoapi is updated. 2019-04-22 02:03:50 +02:00
2004399179 novoapi_functions.py is adjusted to use convert_phoneset.py. 2019-04-22 00:59:53 +02:00
b444b70af9 fame_phonetics.py and functions to make quests.hed to tie triphone are added. 2019-03-25 00:06:53 +01:00
bf586fcde5 triphone training is added. 2019-03-23 21:52:48 +01:00
fdd165ce6a fix bug: the re-aligned mlf file includes fewer files than the original mlf file, so the scp file must be updated accordingly when re-estimation is performed. 2019-03-08 23:13:08 +01:00
fa81b70b27 monophone training is completed. 2019-03-07 22:16:50 +01:00
41d4fa5ff9 sp is added to the model. 2019-03-05 00:11:38 +01:00
b1b1942fa0 test on stimmen data is added. 2019-03-03 02:05:37 +01:00
c185072d5b label alignment using HVite is added. 2019-02-14 00:21:28 +01:00
8f89f60538 dataset is made. 2019-02-08 14:10:32 +01:00
f6e563ecd3 moved testing parts in htk_vs_kaldi into stimmen_test.py 2019-02-06 09:35:23 +01:00
da0242b0e1 make sure all the phones in stimmen transcription can be treated correctly. 2019-02-06 00:00:14 +01:00
ab3887c6ca sp is added to the model. 2019-02-04 20:32:12 +01:00
f6e7c8eefa bug related encoding on label file is fixed. 2019-02-04 13:46:27 +01:00
322a8a0079 label files are extracted. hcompv_scp is made. 2019-02-03 13:54:37 +01:00
22cccfb61d fix the bug where the lexicon contains characters which cannot be represented in ascii. 2019-02-03 00:34:35 +01:00
dc6b7b84b6 lexicon is made. 2019-01-29 21:52:11 +01:00
8cda93de75 fame_asr phoneset is added including reduced version and htk compatible version. 2019-01-28 12:34:20 +01:00
21 changed files with 2158 additions and 1033 deletions

Binary file not shown.

View File

@ -4,8 +4,7 @@
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>4d8c8573-32f0-4a62-9e62-3ce5cc680390</ProjectGuid>
<ProjectHome>.</ProjectHome>
<StartupFile>
</StartupFile>
<StartupFile>check_novoapi.py</StartupFile>
<SearchPath>
</SearchPath>
<WorkingDirectory>.</WorkingDirectory>
@ -23,7 +22,7 @@
</PropertyGroup>
<ItemGroup>
<Compile Include="check_novoapi.py" />
<Compile Include="convert_phone_set.py">
<Compile Include="convert_phoneset.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="convert_xsampa2ipa.py">
@ -32,7 +31,7 @@
<Compile Include="defaultfiles.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="fame_phoneset.py">
<Compile Include="fame_test.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="fa_test.py">
@ -50,9 +49,25 @@
<SubType>Code</SubType>
</Compile>
<Compile Include="fame_hmm.py" />
<Compile Include="phoneset\fame_asr.py" />
<Compile Include="phoneset\fame_ipa.py" />
<Compile Include="phoneset\fame_phonetics.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="stimmen_functions.py" />
<Compile Include="stimmen_test.py" />
</ItemGroup>
<ItemGroup>
<Content Include="config.ini" />
<Content Include="phoneset\fame_ipa2asr.npy" />
<Content Include="phoneset\output_get_translation_key_phone_unknown.npy" />
<Content Include="phoneset\output_get_translation_key_translation_key.npy" />
<Content Include="phoneset\__pycache__\fame_asr.cpython-36.pyc" />
<Content Include="phoneset\__pycache__\fame_ipa.cpython-36.pyc" />
</ItemGroup>
<ItemGroup>
<Folder Include="phoneset\" />
<Folder Include="phoneset\__pycache__\" />
</ItemGroup>
<Import Project="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Python Tools\Microsoft.PythonTools.targets" />
<!-- Uncomment the CoreCompile target to enable the Build command in

View File

@ -20,57 +20,56 @@ from forced_alignment import convert_phone_set
#import acoustic_model_functions as am_func
import convert_xsampa2ipa
import novoapi_functions
import stimmen_functions
sys.path.append(default.accent_classification_dir)
import output_confusion_matrix
## procedure
forced_alignment_novo70 = True
balance_sample_numbers = False
## ===== load novo phoneset =====
phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa = novoapi_functions.load_phonset()
phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa = novoapi_functions.load_novo70_phoneset()
## ===== extract pronunciations written in novo70 only (not_in_novo70) =====
# As per Nederlandse phoneset_aki.xlsx received from David
# [ɔː] oh / ohr
# [ɪː] ih / ihr
# [iː] iy
# [œː] uh
# [ɛː] eh
# [w] wv in IPA written as ʋ.
david_suggestion = ['ɔː', 'ɪː', 'iː', 'œː', 'ɛː', 'w']
## read pronunciation variants.
stimmen_transcription_ = pd.ExcelFile(default.stimmen_transcription_xlsx)
df = pd.read_excel(stimmen_transcription_, 'frequency')
transcription_ipa = list(df['IPA'])
#stimmen_transcription_ = pd.ExcelFile(default.stimmen_transcription_xlsx)
#df = pd.read_excel(stimmen_transcription_, 'frequency')
#transcription_ipa = list(df['IPA'])
# transcription mistake?
transcription_ipa = [ipa.replace(';', 'ː') for ipa in transcription_ipa if not ipa=='pypɪl' and not pd.isnull(ipa)]
transcription_ipa = [ipa.replace('ˑ', '') for ipa in transcription_ipa] # only one case.
not_in_novo70 = []
all_in_novo70 = []
for ipa in transcription_ipa:
ipa = ipa.replace(':', 'ː')
ipa = convert_phone_set.split_ipa(ipa)
stimmen_test_dir = r'c:\OneDrive\Research\rug\_data\stimmen_test'
df = stimmen_functions.load_transcriptions_novo70(stimmen_test_dir)
# list of phones not in novo70 phoneset.
not_in_novo70_ = [phone for phone in ipa
if not phone in phoneset_ipa and not phone in david_suggestion]
not_in_novo70_ = [phone.replace('sp', '') for phone in not_in_novo70_]
not_in_novo70_ = [phone.replace(':', '') for phone in not_in_novo70_]
not_in_novo70_ = [phone.replace('ː', '') for phone in not_in_novo70_]
if len(not_in_novo70_) == 0:
all_in_novo70.append(''.join(ipa))
## transcription mistake?
#transcription_ipa = [ipa.replace(';', 'ː') for ipa in transcription_ipa if not ipa=='pypɪl' and not pd.isnull(ipa)]
#transcription_ipa = [ipa.replace('ˑ', '') for ipa in transcription_ipa] # only one case.
#translation_key.get(phone, phone)
not_in_novo70.extend(not_in_novo70_)
not_in_novo70_list = list(set(not_in_novo70))
#not_in_novo70 = []
#all_in_novo70 = []
#for ipa in transcription_ipa:
# ipa = ipa.replace(':', 'ː')
# ipa = convert_phone_set.split_ipa(ipa)
# # list of phones not in novo70 phoneset.
# not_in_novo70_ = [phone for phone in ipa
# if not phone in phoneset_ipa and not phone in david_suggestion]
# not_in_novo70_ = [phone.replace('sp', '') for phone in not_in_novo70_]
# not_in_novo70_ = [phone.replace(':', '') for phone in not_in_novo70_]
# not_in_novo70_ = [phone.replace('ː', '') for phone in not_in_novo70_]
# if len(not_in_novo70_) == 0:
# all_in_novo70.append(''.join(ipa))
# #translation_key.get(phone, phone)
# not_in_novo70.extend(not_in_novo70_)
#not_in_novo70_list = list(set(not_in_novo70))
## check which phones used in stimmen but not in novo70
@ -85,70 +84,43 @@ not_in_novo70_list = list(set(not_in_novo70))
# [ʊ] 'ʊ'(1) --> can be ʏ (uh)??
# [χ] --> can be x??
def search_phone_ipa(x, phone_list):
x_in_item = []
for ipa in phone_list:
ipa_original = ipa
ipa = ipa.replace(':', 'ː')
ipa = convert_phone_set.split_ipa(ipa)
if x in ipa and not x+':' in ipa:
x_in_item.append(ipa_original)
return x_in_item
#def search_phone_ipa(x, phone_list):
# x_in_item = []
# for ipa in phone_list:
# ipa_original = ipa
# ipa = ipa.replace(':', 'ː')
# ipa = convert_phone_set.split_ipa(ipa)
# if x in ipa and not x+':' in ipa:
# x_in_item.append(ipa_original)
# return x_in_item
#search_phone_ipa('ø', transcription_ipa)
## ===== load all transcriptions (df) =====
df = pd.read_excel(stimmen_transcription_, 'original')
# mapping from ipa to xsampa
mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)
#for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']):
# ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
# if not ipa_converted == ipa:
# print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa))
ipas = []
famehtks = []
for xsampa in df['Self Xsampa']:
if not isinstance(xsampa, float): # 'NaN'
# typo?
xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t')
xsampa = xsampa.replace(';', ':')
ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
ipa = ipa.replace('ː', ':')
ipa = ipa.replace(' ', '')
ipas.append(ipa)
else:
ipas.append('')
# extract interesting cols.
df = pd.DataFrame({'filename': df['Filename'],
'word': df['Word'],
'xsampa': df['Self Xsampa'],
'ipa': pd.Series(ipas)})
#df = stimmen_functions.load_transcriptions()
word_list = [i for i in list(set(df['word'])) if not pd.isnull(i)]
word_list = sorted(word_list)
## check frequency of each pronunciation variants
cols = ['word', 'ipa', 'frequency']
df_samples = pd.DataFrame(index=[], columns=cols)
for ipa in all_in_novo70:
ipa = ipa.replace('ː', ':')
samples = df[df['ipa'] == ipa]
word = list(set(samples['word']))[0]
samples_Series = pd.Series([word, ipa, len(samples)], index=df_samples.columns)
df_samples = df_samples.append(samples_Series, ignore_index=True)
#cols = ['word', 'ipa', 'frequency']
#df_samples = pd.DataFrame(index=[], columns=cols)
#for ipa in all_in_novo70:
# ipa = ipa.replace('ː', ':')
# samples = df[df['ipa'] == ipa]
# word = list(set(samples['word']))[0]
# samples_Series = pd.Series([word, ipa, len(samples)], index=df_samples.columns)
# df_samples = df_samples.append(samples_Series, ignore_index=True)
# each word
df_per_word = pd.DataFrame(index=[], columns=df_samples.keys())
#df_per_word = pd.DataFrame(index=[], columns=df_samples.keys())
for word in word_list:
df_samples_ = df_samples[df_samples['word']==word]
df_samples_ = df_samples_[df_samples_['frequency']>2]
df_per_word = df_per_word.append(df_samples_, ignore_index=True)
#for word in word_list:
word = word_list[2]
df_ = df[df['word']==word]
np.unique(list(df_['ipa']))
#df_samples_ = df_samples_[df_samples_['frequency']>2]
#df_per_word = df_per_word.append(df_samples_, ignore_index=True)
#df_per_word.to_excel(os.path.join(default.stimmen_dir, 'pronunciation_variants_novo70.xlsx'), encoding="utf-8")
@ -183,21 +155,6 @@ if forced_alignment_novo70:
# samples in which all pronunciations are written in novo70.
samples = df_.query("ipa in @pronunciation_ipa")
## ===== balance sample numbers =====
if balance_sample_numbers:
c = Counter(samples['ipa'])
sample_num_list = [c[key] for key in c.keys()]
sample_num = np.min(sample_num_list)
samples_balanced = pd.DataFrame(index=[], columns=list(samples.keys()))
for key in c.keys():
samples_ = samples[samples['ipa'] == key]
samples_balanced = samples_balanced.append(samples_.sample(sample_num), ignore_index = True)
samples = samples_balanced
results = pd.DataFrame(index=[],
columns=['filename', 'word', 'xsampa', 'ipa', 'result_ipa', 'result_novo70', 'llh'])

View File

@ -1,29 +0,0 @@
"""Module to convert phonemes."""
def multi_character_tokenize(line, multi_character_tokens):
"""Tries to match one of the tokens in multi_character_tokens at each position of line, starting at position 0,
if so tokenizes and eats that token. Otherwise tokenizes a single character"""
while line != '':
for token in multi_character_tokens:
if line.startswith(token) and len(token) > 0:
yield token
line = line[len(token):]
break
else:
yield line[:1]
line = line[1:]
def split_word(word, multi_character_phones):
"""
split a line by given phoneset.
Args:
word (str): a word written in given phoneset.
multi_character_phones (list): the list of multicharacter phones which is considered as one phone. this can be obtained with phoneset definition such as fame_phoneset.py.
Returns:
(word_seperated) (list): the word splitted in given phoneset.
"""
return [phone for phone in multi_character_tokenize(word.strip(), multi_character_phones)]

View File

@ -0,0 +1,58 @@
"""Module to convert phonemes."""
def multi_character_tokenize(line, multi_character_tokens):
    """Split ``line`` into tokens, preferring the given multi-character tokens.

    At each position the tokens in ``multi_character_tokens`` are tried in the
    order given; the first one ``line`` starts with is yielded and consumed.
    If none matches, a single character is yielded instead.

    Args:
        line (str): the string to be tokenized.
        multi_character_tokens (iterable of str): candidate tokens. They are
            tried in the given order, so pass longer tokens first when one
            token is a prefix of another. Empty strings are ignored.

    Yields:
        (str): the next token of ``line``.
    """
    # The candidates are scanned again at every position. Materialize them
    # once so that a one-shot iterable (e.g. a generator) is not exhausted
    # after the first scan, and drop empty tokens here instead of checking
    # the length on every comparison.
    tokens = [token for token in multi_character_tokens if token]
    while line != '':
        for token in tokens:
            if line.startswith(token):
                yield token
                line = line[len(token):]
                break
        else:
            # no candidate matched at this position: emit one character.
            yield line[:1]
            line = line[1:]
def split_word(word, phoneset):
    """Split a word into the phones of the given phoneset.

    Args:
        word (str): a word written in the given phoneset. Leading and trailing
            whitespace is stripped before splitting.
        phoneset (list): the list of phones. Only the phones longer than one
            character are used as multi-character candidates; everything else
            is split character by character.

    Returns:
        (list): the word split into phones, in order.
    """
    long_phones = extract_multi_character_phones(phoneset)
    return list(multi_character_tokenize(word.strip(), long_phones))
def convert_phoneset(word_list, translation_key):
    """Translate phones one by one using a translation key.

    Args:
        word_list (list): phones written in the source phoneset.
        translation_key (dict): mapping from a source phone to its
            replacement. Phones without an entry are kept as they are.

    Returns:
        (list): the translated phones, in the same order as ``word_list``.
    """
    converted = []
    for phone in word_list:
        converted.append(translation_key.get(phone, phone))
    return converted
def phone_reduction(phones, reduction_key, phones_to_be_removed=()):
    """Reduce a list of phones: drop unwanted phones and merge the rest.

    Args:
        phones (list): phones to be reduced.
        reduction_key (dict): mapping from a phone to the phone it should be
            merged into. Phones without an entry are kept as they are.
        phones_to_be_removed (container): phones which are dropped from the
            result. The check is made on the phone before reduction.
            Default is empty, i.e. nothing is removed.

    Returns:
        (list): the reduced phones, in the same order as ``phones``.
    """
    # The stray ``multi_character_tokenize(wo.strip(), ...)`` call was removed:
    # it referenced undefined names and its result was never used.
    return [reduction_key.get(phone, phone) for phone in phones
            if phone not in phones_to_be_removed]
def extract_multi_character_phones(phoneset):
    """Pick the phones which consist of more than one character.

    Args:
        phoneset (list): the list of phones.

    Returns:
        (list): the phones longer than one character, longest first. Phones of
            the same length keep their order in ``phoneset``.
    """
    return sorted(
        (phone for phone in phoneset if len(phone) > 1),
        key=len,
        reverse=True)

View File

@ -1,65 +1,42 @@
import os
# add path of the parent directory
#os.path.dirname(os.path.realpath(__file__))
#default_hvite_config = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data', 'htk', 'config.HVite')
#cygwin_dir = r'C:\cygwin64\home\Aki\acoustic_model'
#htk_dir = r'C:\Aki\htk_fame'
htk_dir = r'c:\OneDrive\Research\rug\experiments\acoustic_model\fame\htk'
config_hcopy = os.path.join(htk_dir, 'config', 'config.HCopy')
#config_train = os.path.join(cygwin_dir, 'config', 'config.train')
#config_hvite = os.path.join(cygwin_dir, 'config', 'config.HVite')
#mkhmmdefs_pl = os.path.join(cygwin_dir, 'src', 'acoustic_model', 'mkhmmdefs.pl')
#dbLexicon = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\lexicon.accdb
#scriptBarbara = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\pronvars_barbara.perl
#exeG2P = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\string2phon.exe
#[pyHTK]
#configHVite = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\config.HVite
#filePhoneList = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\phonelist_barbara.txt
#AcousticModel = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\hmmdefs_16-2_barbara.compo
#dbLexicon = config['cLexicon']['dbLexicon']
#scriptBarbara = config['cLexicon']['scriptBarbara']
#exeG2P = config['cLexicon']['exeG2P']
#configHVite = config['pyHTK']['configHVite']
#filePhoneList = config['pyHTK']['filePhoneList']
#AcousticModel = config['pyHTK']['AcousticModel']
# repos
repo_dir = r'C:\Users\Aki\source\repos'
ipa_xsampa_converter_dir = os.path.join(repo_dir, 'ipa-xsama-converter')
forced_alignment_module_dir = os.path.join(repo_dir, 'forced_alignment')
accent_classification_dir = os.path.join(repo_dir, 'accent_classification', 'accent_classification')
toolbox_dir = os.path.join(repo_dir, 'toolbox')
#htk_config_dir = r'c:\Users\A.Kunikoshi\source\repos\forced_alignment\forced_alignment\data\htk\preset_models\aki_dutch_2017'
#config_hvite = os.path.join(htk_config_dir, 'config.HVite')
#acoustic_model = os.path.join(htk_config_dir, 'hmmdefs.compo')
#acoustic_model = r'c:\cygwin64\home\A.Kunikoshi\acoustic_model\model\barbara\hmm128-2\hmmdefs.compo'
#phonelist_txt = os.path.join(htk_config_dir, 'phonelist.txt')
WSL_dir = r'C:\OneDrive\WSL'
#fame_dir = os.path.join(WSL_dir, 'kaldi-trunk', 'egs', 'fame')
fame_dir = r'd:\_corpus\fame'
novo_api_dir = os.path.join(WSL_dir, 'python-novo-api', 'novoapi')
#novo_api_dir = r'c:\Python36-32\Lib\site-packages\novoapi'
fame_s5_dir = os.path.join(fame_dir, 's5')
fame_corpus_dir = os.path.join(fame_dir, 'corpus')
experiments_dir = r'c:\OneDrive\Research\rug\experiments'
# working directories
rug_dir = r'c:\OneDrive\Research\rug'
experiments_dir = os.path.join(rug_dir, 'experiments')
htk_dir = os.path.join(experiments_dir, 'acoustic_model', 'fame', 'htk')
kaldi_dir = os.path.join(WSL_dir, 'kaldi-trunk', 'egs', '_stimmen')
stimmen_dir = os.path.join(experiments_dir, 'stimmen')
stimmen_data_dir = os.path.join(stimmen_dir, 'data')
# data
fame_dir = os.path.join(rug_dir, '_data', 'FAME')
#fame_dir = os.path.join(WSL_dir, 'kaldi-trunk', 'egs', 'fame')
# 44.1 kHz
#stimmen_wav_dir = os.path.join(stimmen_dir, 'wav')
# 16 kHz
stimmen_wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen'
stimmen_result_novoapi_dir = os.path.join(stimmen_dir, 'result', 'novoapi')
stimmen_transcription_xlsx = os.path.join(stimmen_data_dir, 'Frisian Variants Picture Task Stimmen.xlsx')
stimmen_transcription_xlsx = os.path.join(stimmen_dir, 'data', 'Frisian Variants Picture Task Stimmen.xlsx')
phonelist_friesian_txt = os.path.join(experiments_dir, 'friesian', 'acoustic_model', 'config', 'phonelist_friesian.txt')
novo70_phoneset = os.path.join(novo_api_dir, 'asr', 'phoneset', 'nl', 'novo70.phoneset')
#phonelist_txt = os.path.join(htk_dir, 'config', 'phonelist.txt')
#fame_s5_dir = os.path.join(fame_dir, 's5')
#fame_corpus_dir = os.path.join(fame_dir, 'corpus')
#stimmen_result_novoapi_dir = os.path.join(stimmen_dir, 'result', 'novoapi')
# novoapi_functions
novo_api_dir = os.path.join(WSL_dir, 'python-novo-api', 'novoapi')
#novo_api_dir = r'c:\Python36-32\Lib\site-packages\novoapi'
novo70_phoneset = os.path.join(novo_api_dir, 'asr', 'phoneset', 'nl', 'novo70.phoneset')

View File

@ -9,37 +9,11 @@ import numpy as np
import pandas as pd
import defaultfiles as default
import fame_phoneset
import convert_phone_set
import convert_phoneset
from phoneset import fame_ipa, fame_asr
#def ipa2famehtk_lexicon(lexicon_file_in, lexicon_file_out):
# """ Convert a lexicon file from IPA to HTK format for FAME! corpus. """
# lexicon_in = pd.read_table(lexicon_file_in, names=['word', 'pronunciation'])
# with open(lexicon_file_out, "w", encoding="utf-8") as fout:
# for word, pronunciation in zip(lexicon_in['word'], lexicon_in['pronunciation']):
# pronunciation_no_space = pronunciation.replace(' ', '')
# pronunciation_famehtk = convert_phone_set.ipa2famehtk(pronunciation_no_space)
# if 'ceh' not in pronunciation_famehtk and 'sh' not in pronunciation_famehtk:
# fout.write("{0}\t{1}\n".format(word.upper(), pronunciation_famehtk))
#def combine_lexicon(lexicon_file1, lexicon_file2, lexicon_out):
# """ Combine two lexicon files and sort by words. """
# with open(lexicon_file1, "rt", encoding="utf-8") as fin:
# lines1 = fin.read()
# lines1 = lines1.split('\n')
# with open(lexicon_file2, "rt", encoding="utf-8") as fin:
# lines2 = fin.read()
# lines2 = lines2.split('\n')
# lex1 = pd.read_table(lexicon_file1, names=['word', 'pronunciation'])
# lex2 = pd.read_table(lexicon_file2, names=['word', 'pronunciation'])
# lex = pd.concat([lex1, lex2])
# lex = lex.sort_values(by='word', ascending=True)
# lex.to_csv(lexicon_out, index=False, header=False, encoding="utf-8", sep='\t')
sys.path.append(default.toolbox_dir)
from htk import pyhtk
#def read_fileFA(fileFA):
@ -110,14 +84,6 @@ import convert_phone_set
# return ipa
#def make_filelist(input_dir, output_txt):
# """ Make a list of files in the input_dir. """
# filenames = os.listdir(input_dir)
# with open(output_txt, 'w') as fout:
# for filename in filenames:
# fout.write(input_dir + '\\' + filename + '\n')
#def make_htk_dict(word, pronvar_, fileDic, output_type):
# """
@ -179,10 +145,11 @@ def make_hcopy_scp_from_filelist_in_fame(fame_dir, dataset, feature_dir, hcopy_s
fout.write(wav_file + '\t' + mfc_file + '\n')
return
def load_lexicon(lexicon_file):
""" load lexicon file as Data Frame.
""" load lexicon file as data frame.
Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
@ -196,25 +163,27 @@ def load_lexicon(lexicon_file):
return lex
def get_phoneset_from_lexicon(lexicon_file, phoneset='asr'):
def get_phoneset_from_lexicon(lexicon_file, phoneset_name='asr'):
""" Make a list of phones which appears in the lexicon.
Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
phoneset (str): the phoneset with which lexicon_file is written. 'asr'(default) or 'ipa'.
phoneset_name (str): the name of phoneset with which lexicon_file is written. 'asr'(default) or 'ipa'.
Returns:
(list_of_phones) (set): the set of phones included in the lexicon_file.
"""
assert phoneset in ['asr', 'ipa'], 'phoneset should be \'asr\' or \'ipa\''
assert phoneset_name in ['asr', 'ipa'], 'phoneset_name should be \'asr\' or \'ipa\''
lex = load_lexicon(lexicon_file)
if phoneset == 'asr':
if phoneset_name == 'asr':
return set(' '.join(lex['pronunciation']).split(' '))
elif phoneset == 'ipa':
elif phoneset_name == 'ipa':
join_pronunciations = ''.join(lex['pronunciation'])
return set(convert_phone_set.split_word(join_pronunciations, fame_phoneset.multi_character_phones_ipa))
return set(convert_phone_set.split_word(join_pronunciations, fame_ipa.multi_character_phones))
return
def extract_unknown_phones(ipa, known_phones):
@ -228,7 +197,7 @@ def extract_unknown_phones(ipa, known_phones):
(list_of_phones) (list): unknown phones not included in 'known_phones'.
"""
ipa_split = convert_phone_set.split_word(ipa, fame_phoneset.multi_character_phones_ipa)
ipa_split = convert_phone_set.split_word(ipa, fame_ipa.multi_character_phones)
return [i for i in ipa_split if not i in known_phones]
@ -247,14 +216,14 @@ def get_translation_key(lexicon_file_ipa, lexicon_file_asr):
"""
lex_ipa = load_lexicon(lexicon_file_ipa)
lex_asr = load_lexicon(lexicon_file_asr)
phone_unknown = fame_phoneset.phoneset_ipa[:]
phone_unknown = fame_ipa.phoneset[:]
translation_key = dict()
for word in lex_ipa['word']:
if np.sum(lex_ipa['word'] == word) == 1 and np.sum(lex_asr['word'] == word) == 1:
ipa = lex_ipa[lex_ipa['word'] == word].iat[0, 1]
asr = lex_asr[lex_asr['word'] == word].iat[0, 1]
ipa_list = convert_phone_set.split_word(ipa, fame_phoneset.multi_character_phones_ipa)
ipa_list = convert_phone_set.split_word(ipa, fame_ipa.multi_character_phones)
asr_list = asr.split(' ')
# if there are phones which is not in phone_unknown
@ -268,13 +237,13 @@ def get_translation_key(lexicon_file_ipa, lexicon_file_asr):
return translation_key, list(phone_unknown)
def find_phone(lexicon_file, phone, phoneset='ipa'):
def find_phone(lexicon_file, phone, phoneset_name='ipa'):
""" extract rows where the phone is used in the lexicon_file.
Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
phone (str): the phone to be searched.
phoneset (str): the phoneset with which lexicon_file is written. 'asr' or 'ipa'(default).
phoneset_name (str): the name of phoneset_name with which lexicon_file is written. 'asr' or 'ipa'(default).
Returns:
extracted (df): rows where the phone is used.
@ -283,7 +252,7 @@ def find_phone(lexicon_file, phone, phoneset='ipa'):
* develop when the phonset == 'asr'.
"""
assert phoneset in ['asr', 'ipa'], 'phoneset should be \'asr\' or \'ipa\''
assert phoneset_name in ['asr', 'ipa'], 'phoneset_name should be \'asr\' or \'ipa\''
lex = load_lexicon(lexicon_file)
@ -292,9 +261,146 @@ def find_phone(lexicon_file, phone, phoneset='ipa'):
extracted = pd.DataFrame(index=[], columns=['word', 'pronunciation'])
for index, row in lex_.iterrows():
if phoneset == 'ipa':
pronunciation = convert_phone_set.split_word(row['pronunciation'], fame_phoneset.multi_character_phones_ipa)
if phoneset_name == 'ipa':
pronunciation = convert_phone_set.split_word(row['pronunciation'], fame_ipa.multi_character_phones)
if phone in pronunciation:
extracted_ = pd.Series([row['word'], pronunciation], index=extracted.columns)
extracted = extracted.append(extracted_, ignore_index=True)
return extracted
return extracted
def asr2htk_space_delimited(pronunciation):
    """Convert a space delimited pronunciation from asr to htk phoneset.

    The phones listed in fame_asr.phones_to_be_removed are dropped, the rest
    are mapped with fame_asr.reduction_key and then translated with
    fame_asr.translation_key_asr2htk.

    Args:
        pronunciation (str): space delimited asr phones.

    Returns:
        (pronunciation) (str): space delimited phones in htk format (ascii).
    """
    reduced = []
    for phone in pronunciation.split(' '):
        if phone in fame_asr.phones_to_be_removed:
            continue
        reduced.append(fame_asr.reduction_key.get(phone, phone))

    htk_phones = convert_phoneset.convert_phoneset(
        reduced, fame_asr.translation_key_asr2htk)
    return ' '.join(htk_phones)
def lexicon_asr2htk(lexicon_file_asr, lexicon_file_htk):
    """Convert a lexicon file from asr to htk format (ascii).

    Words are converted with word2htk() and upper-cased; pronunciations are
    converted with asr2htk_space_delimited(). The result is written as a tab
    separated file without header and index.

    Args:
        lexicon_file_asr (path): a lexicon file written in asr format e.g. fame/lex.asr.
        lexicon_file_htk (path): a lexicon file written in htk format (ascii),
            which will be made by this function.
    """
    lex_asr = load_lexicon(lexicon_file_asr)

    lex_htk = pd.DataFrame({
        'word': lex_asr['word'].apply(word2htk).str.upper(),
        'pronunciation': lex_asr['pronunciation'].apply(asr2htk_space_delimited)
        })

    # fix the column order explicitly.
    # .loc is used because DataFrame.ix is deprecated and removed in pandas 1.0.
    lex_htk = lex_htk.loc[:, ['word', 'pronunciation']]
    lex_htk.to_csv(lexicon_file_htk, header=None, index=None, sep='\t', encoding='utf-8')
    return
def combine_lexicon(lexicon_file1, lexicon_file2, lexicon_out):
    """Combine two lexicon files and sort the entries by word.

    Args:
        lexicon_file1, lexicon_file2 (path): input lexicon files.
        lexicon_out (path): output lexicon file in which lexicon_file1 and 2
            are combined and sorted. It is written as a tab separated file
            without header and index.
    """
    lexicons = [load_lexicon(path) for path in (lexicon_file1, lexicon_file2)]
    combined = pd.concat(lexicons).sort_values(by='word', ascending=True)
    combined.to_csv(lexicon_out, index=False, header=False, sep='\t', encoding='utf-8')
def fix_lexicon(lexicon_file):
    """Fix a lexicon file in place.

    - remove rows which include N/A.
    - add silence ('SILENCE' pronounced as 'sil').
    - sort the entries by word.
    - add a backslash before the single quotes of the words which start with
      a single quote.

    Args:
        lexicon_file (path): lexicon file, which will be overwritten.
    """
    lex = load_lexicon(lexicon_file)
    lex = lex.dropna()  # remove N/A.

    # add 'sil'
    # pd.concat is used because DataFrame.append is removed in pandas 2.0.
    silence = pd.DataFrame([['SILENCE', 'sil']], columns=lex.columns)
    lex = pd.concat([lex, silence], ignore_index=True)
    lex = lex.sort_values(by='word', ascending=True)

    # After sorting, the index labels no longer match the row positions, so
    # the rows are selected with a boolean mask instead of positional .iat
    # (which escaped the wrong rows).
    starts_with_quote = lex['word'].str.startswith('\'')
    lex.loc[starts_with_quote, 'word'] = (
        lex.loc[starts_with_quote, 'word'].str.replace('\'', '\\\'', regex=False))

    # to_csv does not work with a space separator, therefore the file is
    # written tab separated; tabs should be replaced manually if needed.
    lex.to_csv(lexicon_file, index=False, header=False, sep='\t', encoding='utf-8')
    return
def word2htk(word):
    """Convert a word character by character using fame_asr.translation_key_word2htk.

    Args:
        word (str): a word to be converted.

    Returns:
        (str): the word in which every character found in the translation key
            is replaced. The other characters are kept as they are.
    """
    key = fame_asr.translation_key_word2htk
    return ''.join(key.get(char, char) for char in word)
def _ipa2asr_phones(ipa):
    """Convert a pronunciation written in ipa into a list of asr phones.

    Args:
        ipa (str): a pronunciation written in ipa.

    Returns:
        (list): asr phones after phone reduction.
    """
    curr_dir = os.path.dirname(os.path.abspath(__file__))
    # fame_ipa2asr.npy holds a pickled dict. Since numpy 1.16.3 np.load
    # refuses pickled objects unless allow_pickle=True is given.
    # NOTE(review): unpickling can execute arbitrary code; this is only
    # acceptable because the file is shipped with this repository.
    translation_key_ipa2asr = np.load(
        os.path.join(curr_dir, 'phoneset', 'fame_ipa2asr.npy'),
        allow_pickle=True).item(0)

    ipa_splitted = convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones)
    ipa_splitted = fame_ipa.phone_reduction(ipa_splitted)
    asr_splitted = convert_phoneset.convert_phoneset(ipa_splitted, translation_key_ipa2asr)
    return fame_asr.phone_reduction(asr_splitted)


def ipa2asr(ipa):
    """Convert a pronunciation from ipa to asr phoneset.

    Args:
        ipa (str): a pronunciation written in ipa.

    Returns:
        (str): the pronunciation in asr phones, joined without delimiter.
    """
    return ''.join(_ipa2asr_phones(ipa))


def ipa2htk(ipa):
    """Convert a pronunciation from ipa to htk phoneset.

    The pronunciation is first converted to asr phones and then translated
    with fame_asr.translation_key_asr2htk.

    Args:
        ipa (str): a pronunciation written in ipa.

    Returns:
        (str): the pronunciation in htk phones, joined without delimiter.
    """
    htk_splitted = convert_phoneset.convert_phoneset(
        _ipa2asr_phones(ipa), fame_asr.translation_key_asr2htk)
    return ''.join(htk_splitted)
def performance_on_stimmen(config_dir, stimmen_dir, hmmdefs):
    """Run recognition on the stimmen data and return the sentence accuracy.

    Args:
        config_dir (path): passed to pyhtk.HTK as its first argument.
        stimmen_dir (path): directory which holds 'word_lattice.ltc',
            'hvite.scp', 'hresult.scp' and 'lexicon_recognition.dic'.
        hmmdefs (path): the model file. The feature size is read from its
            second line.

    Returns:
        per_sentence['accuracy'] as returned by
        pyhtk.HTK.calc_recognition_performance.
    """
    def in_stimmen_dir(filename):
        return os.path.join(stimmen_dir, filename)

    # NOTE(review): hvite.scp and hresult.scp are not made here (the calls
    # which made them are disabled); presumably they are prepared beforehand.
    lattice_file = in_stimmen_dir('word_lattice.ltc')
    hvite_scp = in_stimmen_dir('hvite.scp')
    hresult_scp = in_stimmen_dir('hresult.scp')
    lexicon_file = in_stimmen_dir('lexicon_recognition.dic')

    # get feature_size from hmmdefs:
    # the third space-separated field of the second line.
    with open(hmmdefs) as f:
        f.readline()  # the first line is skipped.
        header = f.readline().strip()
    feature_size = int(header.split(' ')[2])

    chtk = pyhtk.HTK(config_dir, fame_asr.phoneset_htk, lexicon_file, feature_size)
    chtk.recognition(lattice_file, hmmdefs, hvite_scp)

    per_sentence, _ = chtk.calc_recognition_performance(hresult_scp)
    return per_sentence['accuracy']

View File

@ -3,376 +3,564 @@ import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import tempfile
#import configparser
#import subprocess
#from collections import Counter
import shutil
import glob
import time
#import numpy as np
#import pandas as pd
import numpy as np
import pandas as pd
import fame_functions
from phoneset import fame_ipa, fame_asr, fame_phonetics
import defaultfiles as default
sys.path.append(default.toolbox_dir)
import file_handling as fh
from htk import pyhtk
#from scripts import run_command
## ======================= user define =======================
#repo_dir = 'C:\\Users\\Aki\\source\\repos\\acoustic_model'
#curr_dir = repo_dir + '\\acoustic_model'
#config_ini = curr_dir + '\\config.ini'
#output_dir = 'C:\\OneDrive\\Research\\rug\\experiments\\friesian\\acoustic_model'
#forced_alignment_module = 'C:\\Users\\Aki\\source\\repos\\forced_alignment'
dataset_list = ['devel', 'test', 'train']
# procedure
combine_all = 1
make_lexicon = 0
make_label = 0 # it takes roughly 4800 sec on Surface pro 2.
make_mlf = 0
extract_features = 0
conv_lexicon = 1
#check_lexicon = 0
#make_mlf = 0
#combine_files = 0
#flat_start = 0
#train_model = 1
flat_start = 1
train_monophone_without_sp = 1
add_sp = 1
train_monophone_with_re_aligned_mlf = 1
increase_mixture = 1
train_triphone = 0
train_triphone_tied = 0
#sys.path.append(os.path.join(os.path.dirname(sys.path[0]), curr_dir))
#sys.path.append(forced_alignment_module)
#from forced_alignment import convert_phone_set
# pre-defined values.
dataset_list = ['devel', 'test', 'train']
feature_size = 30
improvement_threshold = 0.3
lexicon_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
lexicon_oov = os.path.join(default.fame_dir, 'lexicon', 'lex.oov')
config_dir = os.path.join(default.htk_dir, 'config')
phonelist_full_txt = os.path.join(config_dir, 'phonelist_full.txt')
tree_hed = os.path.join(config_dir, 'tree.hed')
quests_hed = os.path.join(config_dir, 'quests.hed')
## ======================= load variables =======================
model_dir = os.path.join(default.htk_dir, 'model')
model_mono0_dir = os.path.join(model_dir, 'mono0')
model_mono1_dir = os.path.join(model_dir, 'mono1')
model_mono1sp_dir = os.path.join(model_dir, 'mono1sp')
model_mono1sp2_dir = os.path.join(model_dir, 'mono1sp2')
model_tri1_dir = os.path.join(model_dir, 'tri1')
model_tri1tied_dir = os.path.join(model_dir, 'tri1tied')
#config = configparser.ConfigParser()
#config.sections()
#config.read(config_ini)
#config_hcopy = config['Settings']['config_hcopy']
#config_train = config['Settings']['config_train']
#mkhmmdefs_pl = config['Settings']['mkhmmdefs_pl']
#FAME_dir = config['Settings']['FAME_dir']
#lex_asr = FAME_dir + '\\lexicon\\lex.asr'
#lex_asr_htk = FAME_dir + '\\lexicon\\lex.asr_htk'
#lex_oov = FAME_dir + '\\lexicon\\lex.oov'
#lex_oov_htk = FAME_dir + '\\lexicon\\lex.oov_htk'
##lex_ipa = FAME_dir + '\\lexicon\\lex.ipa'
##lex_ipa_ = FAME_dir + '\\lexicon\\lex.ipa_'
##lex_ipa_htk = FAME_dir + '\\lexicon\\lex.ipa_htk'
#lex_htk = FAME_dir + '\\lexicon\\lex_original.htk'
#lex_htk_ = FAME_dir + '\\lexicon\\lex.htk'
#hcompv_scp = output_dir + '\\scp\\combined.scp'
#combined_mlf = output_dir + '\\label\\combined.mlf'
#model_dir = output_dir + '\\model'
#model0_dir = model_dir + '\\hmm0'
#proto_init = model_dir + '\\proto38'
#proto_name = 'proto'
#phonelist = output_dir + '\\config\\phonelist_friesian.txt'
#hmmdefs_name = 'hmmdefs'
# directories / files to be made.
lexicon_dir = os.path.join(default.htk_dir, 'lexicon')
lexicon_htk_asr = os.path.join(lexicon_dir, 'lex.htk_asr')
lexicon_htk_oov = os.path.join(lexicon_dir, 'lex.htk_oov')
lexicon_htk = os.path.join(lexicon_dir, 'lex.htk')
lexicon_htk_with_sp = os.path.join(lexicon_dir, 'lex_with_sp.htk')
lexicon_htk_triphone = os.path.join(lexicon_dir, 'lex_triphone.htk')
feature_dir = os.path.join(default.htk_dir, 'mfc')
if not os.path.exists(feature_dir):
os.makedirs(feature_dir)
fh.make_new_directory(feature_dir, existing_dir='leave')
tmp_dir = os.path.join(default.htk_dir, 'tmp')
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
fh.make_new_directory(tmp_dir, existing_dir='leave')
label_dir = os.path.join(default.htk_dir, 'label')
fh.make_new_directory(label_dir, existing_dir='leave')
## training
if combine_all:
hcompv_scp_train = os.path.join(tmp_dir, 'all.scp')
mlf_file_train = os.path.join(label_dir, 'all_phone.mlf')
mlf_file_train_word = os.path.join(label_dir, 'all_word.mlf')
mlf_file_train_with_sp = os.path.join(label_dir, 'all_phone_with_sp.mlf')
mlf_file_train_aligned = os.path.join(label_dir, 'all_phone_aligned.mlf')
triphone_mlf = os.path.join(label_dir, 'all_triphone.mlf')
else:
hcompv_scp_train = os.path.join(tmp_dir, 'train.scp')
mlf_file_train = os.path.join(label_dir, 'train_phone.mlf')
mlf_file_train_word = os.path.join(label_dir, 'train_word.mlf')
mlf_file_train_with_sp = os.path.join(label_dir, 'train_phone_with_sp.mlf')
mlf_file_train_aligned = os.path.join(label_dir, 'train_phone_aligned.mlf')
triphone_mlf = os.path.join(label_dir, 'train_triphone.mlf')
hcompv_scp_train_updated = hcompv_scp_train.replace('.scp', '_updated.scp')
## testing
htk_stimmen_dir = os.path.join(default.htk_dir, 'stimmen')
## ======================= make lexicon for HTK =======================
# Stage: build the HTK pronunciation lexicon from the FAME! lexicon files.
# Reads lexicon_asr / lexicon_oov and writes lexicon_htk_asr, lexicon_htk_oov,
# lexicon_htk and lexicon_htk_with_sp.
if make_lexicon:
timer_start = time.time()
print('==== making lexicon for HTK ====')
# convert each lexicon from fame_asr phoneset to fame_htk phoneset.
print('>>> converting each lexicon from fame_asr phoneset to fame_htk phoneset...')
fame_functions.lexicon_asr2htk(lexicon_asr, lexicon_htk_asr)
fame_functions.lexicon_asr2htk(lexicon_oov, lexicon_htk_oov)
# combine lexicon
print('>>> combining lexicon files into one lexicon...')
# pronunciations which are not found in lex.asr are generated using G2P and listed in lex.oov.
# therefore there is no overlap between lex_asr and lex_oov.
fame_functions.combine_lexicon(lexicon_htk_asr, lexicon_htk_oov, lexicon_htk)
## fixing the lexicon for HTK.
# (1) Replace all tabs with single space;
# (2) Put a '\' before any dictionary entry beginning with single quote
# http://electroblaze.blogspot.nl/2013/03/understanding-htk-error-messages.html
print('>>> fixing the lexicon...')
fame_functions.fix_lexicon(lexicon_htk)
## adding sp to the lexicon for HTK.
print('>>> adding sp to the lexicon...')
# the lexicon is read with the platform default encoding.
with open(lexicon_htk) as f:
lines = f.read().split('\n')
# ' sp\n'.join(lines) appends ' sp' to every element except the last one
# (i.e. the text after the final newline gets no ' sp').
# encoding to ascii raises UnicodeEncodeError if any non-ascii character remains.
with open(lexicon_htk_with_sp, 'wb') as f:
f.write(bytes(' sp\n'.join(lines), 'ascii'))
print("elapsed time: {}".format(time.time() - timer_start))
## initialize the instance for HTK.
chtk = pyhtk.HTK(config_dir, fame_asr.phoneset_htk, lexicon_htk_with_sp, feature_size)
## ======================= make label files =======================
# Stage: for every dataset, make one label (.lab) file and one dictionary (.dic)
# file per utterance listed in the FAME! 'text' script file.
# NOTE(review): indentation is not preserved in this listing; the nesting described
# in the comments below is inferred from the statement order -- confirm against the file.
if make_label:
for dataset in dataset_list:
timer_start = time.time()
print("==== making label files on dataset {}".format(dataset))
script_list = os.path.join(default.fame_dir, 'data', dataset, 'text')
wav_dir_ = os.path.join(default.fame_dir, 'fame', 'wav', dataset)
label_dir_ = os.path.join(label_dir, dataset)
# temporary dictionary name shared by all utterances of this dataset (see below).
dictionary_file = os.path.join(label_dir_, 'temp.dic')
fh.make_new_directory(label_dir_, existing_dir='leave')
# list of scripts
with open(script_list, "rt", encoding="utf-8") as fin:
scripts = fin.read().split('\n')
for line in scripts:
# sample line:
# sp0457m_test_1968_plakkenfryslanterhorne_2168 en dan begjinne je natuerlik
# first space-separated field is the utterance id; the rest is the sentence.
filename_ = line.split(' ')[0]
# drop the first '_'-separated field of the id ('sp0457m' in the sample line).
filename = '_'.join(filename_.split('_')[1:])
sentence = ' '.join(line.split(' ')[1:])
sentence_htk = fame_functions.word2htk(sentence)
wav_file = os.path.join(wav_dir_, filename + '.wav')
# presumably can_be_ascii() returns 0 when the sentence is ascii-encodable -- TODO confirm.
if os.path.exists(wav_file) and chtk.can_be_ascii(sentence_htk) == 0:
if chtk.get_number_of_missing_words(
sentence_htk, dictionary_file) == 0:
# when the file name is too long, HDMan command does not work.
# therefore first temporary dictionary_file is made, then renamed.
shutil.move(dictionary_file, os.path.join(label_dir_, filename + '.dic'))
label_file = os.path.join(label_dir_, filename + '.lab')
chtk.make_label_file(sentence_htk, label_file)
# presumably this else belongs to the missing-words check: the temporary
# dictionary is discarded when some words are not in the lexicon -- TODO confirm.
else:
os.remove(dictionary_file)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= make master label files =======================
if make_mlf:
timer_start = time.time()
print("==== making master label files ====")
# train_2002_gongfansaken_10347.lab is empty. should be removed.
empty_lab_file = os.path.join(label_dir, 'train', 'train_2002_gongfansaken_10347.lab')
empty_dic_file = empty_lab_file.replace('.lab', '.dic')
if os.path.exists(empty_lab_file):
os.remove(empty_lab_file)
if os.path.exists(empty_dic_file):
os.remove(empty_dic_file)
for dataset in dataset_list:
feature_dir_ = os.path.join(feature_dir, dataset)
label_dir_ = os.path.join(label_dir, dataset)
mlf_word = os.path.join(label_dir, dataset + '_word.mlf')
mlf_phone = os.path.join(label_dir, dataset + '_phone.mlf')
mlf_phone_with_sp = os.path.join(label_dir, dataset + '_phone_with_sp.mlf')
print(">>> generating a word level mlf file for {}...".format(dataset))
chtk.label2mlf(label_dir_, mlf_word)
print(">>> generating a phone level mlf file for {}...".format(dataset))
chtk.mlf_word2phone(mlf_phone, mlf_word, with_sp=False)
chtk.mlf_word2phone(mlf_phone_with_sp, mlf_word, with_sp=True)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= extract features =======================
if extract_features:
for dataset in dataset_list:
print('==== {} ===='.format(dataset))
timer_start = time.time()
print('==== extract features on dataset {} ===='.format(dataset))
wav_dir_ = os.path.join(default.fame_dir, 'fame', 'wav', dataset)
label_dir_ = os.path.join(label_dir, dataset)
feature_dir_ = os.path.join(feature_dir, dataset)
fh.make_new_directory(feature_dir_, existing_dir='delete')
# a script file for HCopy
print(">>> making a script file for HCopy... \n")
print(">>> making a script file for HCopy...")
hcopy_scp = tempfile.NamedTemporaryFile(mode='w', delete=False)
hcopy_scp.close()
# get a list of features (hcopy.scp) from the filelist in FAME! corpus
feature_dir_ = os.path.join(feature_dir, dataset)
if not os.path.exists(feature_dir_):
os.makedirs(feature_dir_)
# get a list of features (hcopy.scp)
# from the filelist in FAME! corpus.
#fame_functions.make_hcopy_scp_from_filelist_in_fame(default.fame_dir, dataset, feature_dir_, hcopy_scp.name)
# from the list of label files.
lab_list = glob.glob(os.path.join(label_dir_, '*.lab'))
feature_list = [
os.path.join(wav_dir_, os.path.basename(lab_file).replace('.lab', '.wav')) + '\t'
+ os.path.join(feature_dir_, os.path.basename(lab_file).replace('.lab', '.mfc'))
for lab_file in lab_list]
# extract features
print(">>> extracting features... \n")
fame_functions.make_hcopy_scp_from_filelist_in_fame(default.fame_dir, dataset, feature_dir_, hcopy_scp.name)
pyhtk.wav2mfc(default.config_hcopy, hcopy_scp.name)
# a script file for HCompV
print(">>> making a script file for HCompV... \n")
hcompv_scp = os.path.join(tmp_dir, dataset + '.scp')
fh.make_filelist(feature_dir_, hcompv_scp, '.mfc')
## ======================= convert lexicon from ipa to fame_htk =======================
if conv_lexicon:
print('==== convert lexicon from ipa 2 fame ====\n')
#dir_out = r'c:\Users\Aki\source\repos\acoustic_model\_tmp'
lexicon_dir = os.path.join(default.fame_dir, 'lexicon')
lexicon_ipa = os.path.join(lexicon_dir, 'lex.ipa')
lexicon_asr = os.path.join(lexicon_dir, 'lex.asr')
# get the correspondence between lex_ipa and lex_asr.
lex_asr = fame_functions.load_lexicon(lexicon_asr)
lex_ipa = fame_functions.load_lexicon(lexicon_ipa)
if 1:
timer_start = time.time()
translation_key, phone_unknown = fame_functions.get_translation_key(lexicon_ipa, lexicon_asr)
print("elapsed time: {}".format(time.time() - timer_start))
np.save('translation_key_ipa2asr.npy', translation_key)
np.save('phone_unknown.npy', phone_unknown)
else:
translation_key = np.load('translation_key_ipa2asr.npy').item()
phone_unknown = np.load('phone_unknown.npy')
phone_unknown = list(phone_unknown)
## manually check the correspondence for the phone in phone_unknown.
#p = phone_unknown[0]
#lex_ipa_ = find_phone(lexicon_ipa, p, phoneset='ipa')
#for word in lex_ipa_['word']:
# ipa = lex_ipa[lex_ipa['word'] == word].iat[0, 1]
# if np.sum(lex_asr['word'] == word) > 0:
# asr = lex_asr[lex_asr['word'] == word].iat[0, 1]
# ipa_list = convert_phone_set.split_word(ipa, fame_phoneset.multi_character_phones_ipa)
# asr_list = asr.split(' ')
# if p in ipa_list and (len(ipa_list) == len(asr_list)):
# print("{0}: {1} --> {2}".format(word, ipa_list, asr_list))
# for ipa_, asr_ in zip(ipa_list, asr_list):
# if ipa_ in phone_unknown:
# translation_key[ipa_] = asr_
# phone_unknown.remove(ipa_)
## check if all the phones in lexicon_ipa are in fame_phoneset.py.
#timer_start = time.time()
#phoneset_lex = get_phoneset_from_lexicon(lexicon_ipa, phoneset='ipa')
#print("elapsed time: {}".format(time.time() - timer_start))
#phoneset_py = fame_phoneset.phoneset_ipa
#set(phoneset_lex) - set(phoneset_py)
##timer_start = time.time()
##extracted = find_phone(lexicon_ipa, 'ⁿ')
##print("elapsed time: {}".format(time.time() - timer_start))
# lex.asr is Kaldi compatible version of lex.ipa.
# to check...
#lexicon_ipa = pd.read_table(lex_ipa, names=['word', 'pronunciation'])
#with open(lex_ipa_, "w", encoding="utf-8") as fout:
# for word, pronunciation in zip(lexicon_ipa['word'], lexicon_ipa['pronunciation']):
# # ignore nasalization and '.'
# pronunciation_ = pronunciation.replace(u'ⁿ', '')
# pronunciation_ = pronunciation_.replace('.', '')
# pronunciation_split = convert_phone_set.split_ipa_fame(pronunciation_)
# fout.write("{0}\t{1}\n".format(word, ' '.join(pronunciation_split)))
# convert each lexicon from ipa description to fame_htk phoneset.
#am_func.ipa2famehtk_lexicon(lex_oov, lex_oov_htk)
#am_func.ipa2famehtk_lexicon(lex_asr, lex_asr_htk)
# combine lexicon
# pronunciations which are not found in lex.asr are generated using G2P and listed in lex.oov.
# therefore there is no overlap between lex_asr and lex_oov.
#am_func.combine_lexicon(lex_asr_htk, lex_oov_htk, lex_htk)
## ======================= check if all the phones are successfully converted =======================
if check_lexicon:
print("==== check if all the phones are successfully converted. ====\n")
# the phones used in the lexicon.
phonelist_asr = am_func.get_phonelist(lex_asr)
phonelist_oov = am_func.get_phonelist(lex_oov)
phonelist_htk = am_func.get_phonelist(lex_htk)
phonelist = phonelist_asr.union(phonelist_oov)
# the lines which include a specific phone.
lines = am_func.find_phone(lex_asr, 'g')
# statistics over the lexicon
lexicon_htk = pd.read_table(lex_htk, names=['word', 'pronunciation'])
pronunciation = lexicon_htk['pronunciation']
phones_all = []
for word in pronunciation:
phones_all = phones_all + word.split()
c = Counter(phones_all)
## =======================
## manually make changes to the pronunciation dictionary and save it as lex.htk
## =======================
# (1) Replace all tabs with single space;
# (2) Put a '\' before any dictionary entry beginning with single quote
#http://electroblaze.blogspot.nl/2013/03/understanding-htk-error-messages.html
## ======================= make label file =======================
if make_mlf:
print("==== make mlf ====\n")
print("generating word level transcription...\n")
for dataset in dataset_list:
hcompv_scp = output_dir + '\\scp\\' + dataset + '.scp'
hcompv_scp2 = output_dir + '\\scp\\' + dataset + '_all_words_in_lexicon.scp'
script_list = FAME_dir + '\\data\\' + dataset + '\\text'
mlf_word = output_dir + '\\label\\' + dataset + '_word.mlf'
mlf_phone = output_dir + '\\label\\' + dataset + '_phone.mlf'
# lexicon
lexicon_htk = pd.read_table(lex_htk, names=['word', 'pronunciation'])
# list of features
with open(hcompv_scp) as fin:
features = fin.read()
features = features.split('\n')
# list of scripts
with open(script_list, "rt", encoding="utf-8") as fin:
scripts = fin.read()
scripts = pd.Series(scripts.split('\n'))
i = 0
missing_words = []
fscp = open(hcompv_scp2, 'wt')
fmlf = open(mlf_word, "wt", encoding="utf-8")
fmlf.write("#!MLF!#\n")
feature_nr = 1
for feature in features:
sys.stdout.write("\r%d/%d" % (feature_nr, len(features)))
sys.stdout.flush()
feature_nr += 1
file_basename = os.path.basename(feature).replace('.mfc', '')
# get words from scripts.
try:
script = scripts[scripts.str.contains(file_basename)]
except IndexError:
script = []
if len(script) != 0:
script_id = script.index[0]
script_txt = script.get(script_id)
script_words = script_txt.split(' ')
del script_words[0]
# check if all words can be found in the lexicon.
SCRIPT_WORDS = []
script_prons = []
is_in_lexicon = 1
for word in script_words:
WORD = word.upper()
SCRIPT_WORDS.append(WORD)
extracted = lexicon_htk[lexicon_htk['word']==WORD]
if len(extracted) == 0:
missing_words.append(word)
script_prons.append(extracted)
is_in_lexicon *= len(extracted)
# if all pronunciations are found in the lexicon, update scp and mlf files.
if is_in_lexicon:
# add the feature filename into the .scp file.
fscp.write("{}\n".format(feature))
i += 1
# add the words to the mlf file.
fmlf.write('\"*/{}.lab\"\n'.format(file_basename))
#fmlf.write('{}'.format('\n'.join(SCRIPT_WORDS)))
for word_ in SCRIPT_WORDS:
if word_[0] == '\'':
word_ = '\\' + word_
fmlf.write('{}\n'.format(word_))
fmlf.write('.\n')
print("\n{0} has {1} samples.\n".format(dataset, i))
np.save(output_dir + '\\missing_words' + '_' + dataset + '.npy', missing_words)
fscp.close()
fmlf.close()
## generate phone level transcription
print("generating phone level transcription...\n")
mkphones = output_dir + '\\label\\mkphones0.txt'
subprocessStr = r"HLEd -l * -d " + lex_htk_ + ' -i ' + mlf_phone + ' ' + mkphones + ' ' + mlf_word
subprocess.call(subprocessStr, shell=True)
## ======================= combined scps and mlfs =======================
if combine_files:
print("==== combine scps and mlfs ====\n")
fscp = open(hcompv_scp, 'wt')
fmlf = open(combined_mlf, 'wt')
for dataset in dataset_list:
fmlf.write("#!MLF!#\n")
for dataset in dataset_list:
each_mlf = output_dir + '\\label\\' + dataset + '_phone.mlf'
each_scp = output_dir + '\\scp\\' + dataset + '_all_words_in_lexicon.scp'
#if os.path.exists(empty_mfc_file):
# os.remove(empty_mfc_file)
with open(hcopy_scp.name, 'wb') as f:
f.write(bytes('\n'.join(feature_list), 'ascii'))
with open(each_mlf, 'r') as fin:
lines = fin.read()
lines = lines.split('\n')
fmlf.write('\n'.join(lines[1:]))
# extract features.
print(">>> extracting features on {}...".format(dataset))
chtk.wav2mfc(hcopy_scp.name)
os.remove(hcopy_scp.name)
with open(each_scp, 'r') as fin:
lines = fin.read()
fscp.write(lines)
# make hcompv.scp.
print(">>> making a script file for {}...".format(dataset))
listdir = glob.glob(os.path.join(label_dir_, '*.dic'))
mfc_list = [filename.replace(label_dir_, feature_dir_).replace('.dic', '.mfc') for filename in listdir]
hcompv_scp = os.path.join(tmp_dir, dataset + '.scp')
with open(hcompv_scp, 'wb') as f:
f.write(bytes('\n'.join(mfc_list) + '\n', 'ascii'))
fscp.close()
fmlf.close()
print(">>> extracting features on stimmen...")
chtk.wav2mfc(os.path.join(htk_stimmen_dir, 'hcopy.scp'))
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= flat start monophones =======================
if flat_start:
subprocessStr = 'HCompV -T 1 -C ' + config_train + ' -m -v 0.01 -S ' + hcompv_scp + ' -M ' + model0_dir + ' ' + proto_init
subprocess.call(subprocessStr, shell=True)
if combine_all:
# script files.
fh.concatenate(
os.path.join(tmp_dir, 'devel.scp'),
os.path.join(tmp_dir, 'test.scp'),
hcompv_scp_train
)
fh.concatenate(
hcompv_scp_train,
os.path.join(tmp_dir, 'train.scp'),
hcompv_scp_train
)
# phone level mlfs.
fh.concatenate(
os.path.join(label_dir, 'devel_phone.mlf'),
os.path.join(label_dir, 'test_phone.mlf'),
mlf_file_train
)
fh.concatenate(
mlf_file_train,
os.path.join(label_dir, 'train_phone.mlf'),
mlf_file_train
)
# phone level mlfs with sp.
fh.concatenate(
os.path.join(label_dir, 'devel_phone_with_sp.mlf'),
os.path.join(label_dir, 'test_phone_with_sp.mlf'),
mlf_file_train_with_sp
)
fh.concatenate(
mlf_file_train_with_sp,
os.path.join(label_dir, 'train_phone_with_sp.mlf'),
mlf_file_train_with_sp
)
# word level mlfs.
fh.concatenate(
os.path.join(label_dir, 'devel_word.mlf'),
os.path.join(label_dir, 'test_word.mlf'),
mlf_file_train_word
)
fh.concatenate(
mlf_file_train_word,
os.path.join(label_dir, 'train_word.mlf'),
mlf_file_train_word
)
## ======================= flat start monophones =======================
if flat_start:
timer_start = time.time()
print('==== flat start ====')
fh.make_new_directory(model_mono0_dir, existing_dir='leave')
chtk.flat_start(hcompv_scp_train, model_mono0_dir)
# make macros.
vFloors = os.path.join(model_mono0_dir, 'vFloors')
if os.path.exists(vFloors):
chtk.make_macros(vFloors)
# allocate mean & variance to all phones in the phone list
subprocessStr = 'perl ' + mkhmmdefs_pl + ' ' + model0_dir + '\\proto38' + ' ' + phonelist + ' > ' + model0_dir + '\\' + hmmdefs_name
subprocess.call(subprocessStr, shell=True)
## ======================= estimate monophones =======================
if train_model:
iter_num_max = 3
for mix_num in [128, 256, 512, 1024]:
for iter_num in range(1, iter_num_max+1):
print("===== mix{}, iter{} =====".format(mix_num, iter_num))
iter_num_pre = iter_num - 1
modelN_dir = model_dir + '\\hmm' + str(mix_num) + '-' + str(iter_num)
if not os.path.exists(modelN_dir):
os.makedirs(modelN_dir)
if iter_num == 1 and mix_num == 1:
modelN_dir_pre = model0_dir
else:
modelN_dir_pre = model_dir + '\\hmm' + str(mix_num) + '-' + str(iter_num_pre)
## re-estimation
subprocessStr = 'HERest -T 1 -C ' + config_train + ' -v 0.01 -I ' + combined_mlf + ' -H ' + modelN_dir_pre + '\\' + hmmdefs_name + ' -M ' + modelN_dir + ' ' + phonelist + ' -S ' + hcompv_scp
subprocess.call(subprocessStr, shell=True)
mix_num_next = mix_num * 2
modelN_dir_next = model_dir + '\\hmm' + str(mix_num_next) + '-0'
if not os.path.exists(modelN_dir_next):
os.makedirs(modelN_dir_next)
print('>>> allocating mean & variance to all phones in the phone list...')
chtk.make_hmmdefs(model_mono0_dir)
header_file = modelN_dir + '\\mix' + str(mix_num_next) + '.hed'
with open(header_file, 'w') as fout:
fout.write("MU %d {*.state[2-4].mix}" % (mix_num_next))
print("elapsed time: {}".format(time.time() - timer_start))
subprocessStr = 'HHEd -T 1 -H ' + modelN_dir + '\\' + hmmdefs_name + ' -M ' + modelN_dir_next + ' ' + header_file + ' ' + phonelist
## ======================= train model without short pause =======================
# Stage: re-estimate the monophone models (without short pause), starting from
# model_mono0_dir and writing into model_mono1_dir, using the phone-level mlf
# without sp (mlf_file_train) and the training script file hcompv_scp_train.
# The stimmen files (mfc directory, word lattice, recognition lexicon) are passed
# to the helper as well; presumably they are used to measure the improvement
# that is compared with improvement_threshold -- TODO confirm in pyhtk.
if train_monophone_without_sp:
print('==== train monophone without sp ====')
timer_start = time.time()
niter = chtk.re_estimation_until_saturated(
model_mono1_dir,
model_mono0_dir, improvement_threshold, hcompv_scp_train,
os.path.join(htk_stimmen_dir, 'mfc'),
'mfc',
os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
mlf_file=mlf_file_train,
lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic')
)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= adding sp to the model =======================
# Stage: add the short pause (sp) model to the last monophone model in
# model_mono1_dir, store the result in <model_mono1sp_dir>/iter0 and re-estimate
# from there using the phone-level mlf with sp (mlf_file_train_with_sp).
if add_sp:
print('==== adding sp to the model ====')
# reference:
# http://www.f.waseda.jp/yusukekondo/htk.html#flat_start_estimation
timer_start = time.time()
# make model with sp.
print('>>> adding sp state to the last model in the previous step...')
fh.make_new_directory(model_mono1sp_dir, existing_dir='leave')
# the model to start from: the highest 'iter<N>' directory of the previous stage.
niter = chtk.get_niter_max(model_mono1_dir)
modeln_dir_pre = os.path.join(model_mono1_dir, 'iter'+str(niter))
modeln_dir = os.path.join(model_mono1sp_dir, 'iter0')
chtk.add_sp(modeln_dir_pre, modeln_dir)
print('>>> re-estimation...')
# note that the initial model passed here is <output_dir>/iter0 itself.
niter = chtk.re_estimation_until_saturated(
model_mono1sp_dir, modeln_dir, improvement_threshold, hcompv_scp_train,
os.path.join(htk_stimmen_dir, 'mfc'),
'mfc',
os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
mlf_file=mlf_file_train_with_sp,
lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
model_type='monophone_with_sp'
)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= train model with re-aligned mlf =======================
# Stage: re-align the training data with the last model in model_mono1sp_dir,
# update the script file accordingly and re-estimate into model_mono1sp2_dir
# using the re-aligned mlf (mlf_file_train_aligned).
if train_monophone_with_re_aligned_mlf:
print('==== traina monophone with re-aligned mlf ====')
timer_start = time.time()
print('>>> re-aligning the training data... ')
niter = chtk.get_niter_max(model_mono1sp_dir)
modeln_dir = os.path.join(model_mono1sp_dir, 'iter'+str(niter))
# writes mlf_file_train_aligned from the word-level mlf and the script file.
chtk.make_aligned_label(
os.path.join(modeln_dir, 'macros'),
os.path.join(modeln_dir, 'hmmdefs'),
mlf_file_train_aligned,
mlf_file_train_word,
hcompv_scp_train)
chtk.fix_mlf(mlf_file_train_aligned)
# the re-aligned mlf includes fewer files than the original mlf (see commit fdd165ce6a),
# therefore the script file is updated to hcompv_scp_train_updated.
print('>>> updating the script file... ')
chtk.update_script_file(
mlf_file_train_aligned,
mlf_file_train_with_sp,
hcompv_scp_train,
hcompv_scp_train_updated)
print('>>> re-estimation... ')
# NOTE(review): the timer is reset here, so the elapsed time printed at the end
# covers only the re-estimation, not the re-alignment above.
timer_start = time.time()
fh.make_new_directory(model_mono1sp2_dir, existing_dir='leave')
# same call as above; niter is looked up again for the same directory.
niter = chtk.get_niter_max(model_mono1sp_dir)
niter = chtk.re_estimation_until_saturated(
model_mono1sp2_dir,
os.path.join(model_mono1sp_dir, 'iter'+str(niter)),
improvement_threshold,
hcompv_scp_train_updated,
os.path.join(htk_stimmen_dir, 'mfc'),
'mfc',
os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
mlf_file=mlf_file_train_aligned,
lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
model_type='monophone_with_sp'
)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= increase mixture =======================
# Stage: increase the number of mixture components step by step (2, 4, 8, 16).
# For every step the last model of the previous step is split into `nmix`
# mixtures, stored in <model_dir>/mono<nmix>/iter0 together with a copy of the
# previous 'macros' file, and then re-estimated inside <model_dir>/mono<nmix>.
if increase_mixture:
    print('==== increase mixture ====')
    timer_start = time.time()
    for nmix in [2, 4, 8, 16]:
        # modeln_dir_: model directory of the previous step (source).
        # modeln_dir : model directory of this step (destination).
        if nmix == 2:
            modeln_dir_ = model_mono1sp2_dir
        else:
            # nmix_ holds the mixture number of the previous loop iteration.
            modeln_dir_ = os.path.join(model_dir, 'mono'+str(nmix_))
        modeln_dir = os.path.join(model_dir, 'mono'+str(nmix))
        print('mixture: {}'.format(nmix))
        fh.make_new_directory(modeln_dir, existing_dir='delete')
        niter = chtk.get_niter_max(modeln_dir_)
        chtk.increase_mixture(
            os.path.join(modeln_dir_, 'iter'+str(niter), 'hmmdefs'),
            nmix,
            os.path.join(modeln_dir, 'iter0'),
            model_type='monophone_with_sp')
        shutil.copy2(os.path.join(modeln_dir_, 'iter'+str(niter), 'macros'),
                     os.path.join(modeln_dir, 'iter0', 'macros'))
        #improvement_threshold = -10
        # the initial model is the one just made in <modeln_dir>/iter0.
        # (it used to be <modeln_dir_>/iter0, i.e. the initial model of the
        # previous step, which does not contain the increased mixtures. passing
        # <output_dir>/iter0 is the same convention as in the 'add sp' stage.)
        niter = chtk.re_estimation_until_saturated(
            modeln_dir,
            os.path.join(modeln_dir, 'iter0'),
            improvement_threshold,
            hcompv_scp_train_updated,
            os.path.join(htk_stimmen_dir, 'mfc'),
            'mfc',
            os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
            mlf_file=mlf_file_train_aligned,
            lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
            model_type='monophone_with_sp'
        )
        nmix_ = nmix
    print("elapsed time: {}".format(time.time() - timer_start))
## ======================= train triphone =======================
print('>>> making triphone list... ')
chtk.make_triphonelist(
mlf_file_train_aligned,
triphone_mlf)
if train_triphone:
print('==== train triphone model ====')
timer_start = time.time()
print('>>> init triphone model... ')
niter = chtk.get_niter_max(model_mono1sp2_dir)
fh.make_new_directory(os.path.join(model_tri1_dir, 'iter0'), existing_dir='leave')
chtk.init_triphone(
os.path.join(model_mono1sp2_dir, 'iter'+str(niter)),
os.path.join(model_tri1_dir, 'iter0')
)
print('>>> re-estimation... ')
## I wanted to train until saturated:
#niter = chtk.re_estimation_until_saturated(
# model_tri1_dir,
# os.path.join(model_tri1_dir, 'iter0'),
# improvement_threshold,
# hcompv_scp_train_updated,
# os.path.join(htk_stimmen_dir, 'mfc'),
# 'mfc',
# os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
# mlf_file=triphone_mlf,
# lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
# model_type='triphone'
# )
#
# but because the data size is limited, some triphones cannot be trained and HTK reports the error:
# ERROR [+8231] GetHCIModel: Cannot find hmm [i:-]r[+???]
# therefore re-estimation is performed only three times (iter1 to iter3).
output_dir = model_tri1_dir
for niter in range(1, 4):
hmm_n = 'iter' + str(niter)
hmm_n_pre = 'iter' + str(niter-1)
_modeln_dir = os.path.join(output_dir, hmm_n)
_modeln_dir_pre = os.path.join(output_dir, hmm_n_pre)
subprocess.call(subprocessStr, shell=True)
fh.make_new_directory(_modeln_dir, 'leave')
chtk.re_estimation(
os.path.join(_modeln_dir_pre, 'hmmdefs'),
_modeln_dir,
hcompv_scp_train_updated,
mlf_file=triphone_mlf,
macros=os.path.join(_modeln_dir_pre, 'macros'),
model_type='triphone')
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= train tied-state triphones =======================
# Stage: train tied-state triphones.
# Makes the triphone lexicon, the question file (quests_hed) and the tree header
# (tree_hed), initializes the tied model in <model_tri1tied_dir>/iter0 from the
# last model in model_tri1_dir and re-estimates it three times (iter1 to iter3).
if train_triphone_tied:
print('==== train tied-state triphones ====')
timer_start = time.time()
print('>>> making lexicon for triphone... ')
chtk.make_lexicon_triphone(phonelist_full_txt, lexicon_htk_triphone)
chtk.combine_phonelists(phonelist_full_txt)
print('>>> making a tree header... ')
fame_phonetics.make_quests_hed(quests_hed)
# NOTE(review): the stats file is taken from a hard-coded absolute path with a
# fixed 'iter3'. presumably this is <model_tri1_dir>/iter3 on the author's
# machine -- confirm and derive it from model_tri1_dir if so.
stats = os.path.join(r'c:\OneDrive\Research\rug\experiments\acoustic_model\fame\htk\model\tri1\iter3', 'stats')
chtk.make_tree_header(tree_hed, quests_hed, stats, config_dir)
print('>>> init triphone model... ')
niter = chtk.get_niter_max(model_tri1_dir)
fh.make_new_directory(os.path.join(model_tri1tied_dir, 'iter0'), existing_dir='leave')
chtk.init_triphone(
os.path.join(model_tri1_dir, 'iter'+str(niter)),
os.path.join(model_tri1tied_dir, 'iter0'),
tied=True)
# I wanted to train until saturated:
#niter = chtk.re_estimation_until_saturated(
# model_tri1tied_dir,
# os.path.join(model_tri1tied_dir, 'iter0'),
# improvement_threshold,
# hcompv_scp_train_updated,
# os.path.join(htk_stimmen_dir, 'mfc'),
# 'mfc',
# os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
# mlf_file=triphone_mlf,
# lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
# model_type='triphone'
# )
#
# but because the data size is limited, some triphones cannot be trained and HTK reports the error:
# ERROR [+8231] GetHCIModel: Cannot find hmm [i:-]r[+???]
# therefore only 3 times re-estimation is performed.
output_dir = model_tri1tied_dir
# each pass reads hmmdefs / macros from iter<n-1> and writes into iter<n>.
for niter in range(1, 4):
hmm_n = 'iter' + str(niter)
hmm_n_pre = 'iter' + str(niter-1)
_modeln_dir = os.path.join(output_dir, hmm_n)
_modeln_dir_pre = os.path.join(output_dir, hmm_n_pre)
fh.make_new_directory(_modeln_dir, 'leave')
chtk.re_estimation(
os.path.join(_modeln_dir_pre, 'hmmdefs'),
_modeln_dir,
hcompv_scp_train_updated,
mlf_file=triphone_mlf,
macros=os.path.join(_modeln_dir_pre, 'macros'),
model_type='triphone')
print("elapsed time: {}".format(time.time() - timer_start))

138
acoustic_model/fame_test.py Normal file
View File

@ -0,0 +1,138 @@
import sys
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
from collections import Counter
import time
import re
import numpy as np
import pandas as pd
import fame_functions
import defaultfiles as default
sys.path.append(default.toolbox_dir)
from phoneset import fame_ipa, fame_asr
import convert_phoneset
lexicon_dir = os.path.join(default.fame_dir, 'lexicon')
lexicon_ipa = os.path.join(lexicon_dir, 'lex.ipa')
lexicon_asr = os.path.join(lexicon_dir, 'lex.asr')
lexicon_htk = os.path.join(default.htk_dir, 'lexicon', 'lex.htk')
## check if all the phones in lexicon.ipa are in fame_ipa.py.
#timer_start = time.time()
#phoneset_lex = fame_functions.get_phoneset_from_lexicon(lexicon_ipa, phoneset='ipa')
#phoneset_py = fame_ipa.phoneset
#print("phones which is in lexicon.ipa but not in fame_ipa.py:\n{}".format(
# set(phoneset_lex) - set(phoneset_py)))
#print("elapsed time: {}".format(time.time() - timer_start))
# check which word has the phone.
#timer_start = time.time()
#extracted = find_phone(lexicon_ipa, 'ⁿ')
#print("elapsed time: {}".format(time.time() - timer_start))
## get the correspondence between lex_ipa and lex_asr.
lex_asr = fame_functions.load_lexicon(lexicon_asr)
lex_ipa = fame_functions.load_lexicon(lexicon_ipa)
# Get the ipa -> asr translation key and the list of phones for which no
# correspondence was found.
# Switch the condition to 1 to recompute them from the lexicons and save the
# results under 'phoneset'; otherwise the previously saved results are loaded.
if 0:
    timer_start = time.time()
    translation_key_ipa2asr, phone_unknown = fame_functions.get_translation_key(lexicon_ipa, lexicon_asr)
    print("elapsed time: {}".format(time.time() - timer_start))
    np.save(os.path.join('phoneset', 'output_get_translation_key_translation_key.npy'), translation_key_ipa2asr)
    np.save(os.path.join('phoneset', 'output_get_translation_key_phone_unknown.npy'), phone_unknown)
else:
    # the translation key is a dict, which np.save stores as a pickled 0-d
    # object array. since numpy 1.16.3 np.load refuses to unpickle by default,
    # therefore allow_pickle=True is required. the file is made by this
    # script itself; never load an untrusted .npy file this way.
    translation_key_ipa2asr = np.load(
        os.path.join('phoneset', 'output_get_translation_key_translation_key.npy'),
        allow_pickle=True).item()
    phone_unknown = np.load(os.path.join('phoneset', 'output_get_translation_key_phone_unknown.npy'))
# make sure phone_unknown is a list in both cases.
phone_unknown = list(phone_unknown)
# manually check the correspondence for the phone in phone_unknown.
#p = phone_unknown[0]
#lex_ipa_ = find_phone(lexicon_ipa, p, phoneset='ipa')
#for word in lex_ipa_['word']:
# ipa = lex_ipa[lex_ipa['word'] == word].iat[0, 1]
# if np.sum(lex_asr['word'] == word) > 0:
# asr = lex_asr[lex_asr['word'] == word].iat[0, 1]
# ipa_list = convert_phone_set.split_word(ipa, fame_ipa.multi_character_phones)
# asr_list = asr.split(' ')
# if p in ipa_list and (len(ipa_list) == len(asr_list)):
# print("{0}: {1} --> {2}".format(word, ipa_list, asr_list))
# for ipa_, asr_ in zip(ipa_list, asr_list):
# if ipa_ in phone_unknown:
# translation_key_ipa2asr[ipa_] = asr_
# phone_unknown.remove(ipa_)
translation_key_ipa2asr['ə:'] = 'ə'
translation_key_ipa2asr['r.'] = 'r'
translation_key_ipa2asr['r:'] = 'r'
# added for stimmen.
translation_key_ipa2asr['ɪ:'] = 'ɪ:'
translation_key_ipa2asr['y:'] = 'y'
np.save(os.path.join('phoneset', 'fame_ipa2asr.npy'), translation_key_ipa2asr)
## check if all the phones in lexicon.asr are in translation_key_ipa2asr.
#timer_start = time.time()
#phoneset_lex = fame_functions.get_phoneset_from_lexicon(lexicon_asr, phoneset='asr')
#phoneset_lex.remove("")
#phoneset_asr = list(set(translation_key_ipa2asr.values()))
#print("phones which is in lexicon.asr but not in the translation_key_ipa2asr:\n{}".format(
# set(phoneset_lex) - set(phoneset_asr)))
#print("elapsed time: {}".format(time.time() - timer_start))
## check if all the phones in lexicon.htk are in fame_asr.py.
#timer_start = time.time()
#phoneset_htk = fame_asr.phoneset_htk
#phoneset_lex = fame_functions.get_phoneset_from_lexicon(lexicon_htk)
#phoneset_lex.remove('')
#print("phones which is in lexicon.htk but not in the fame_asr.py are:\n{}".format(
# set(phoneset_htk) - set(phoneset_lex)))
#print("elapsed time: {}".format(time.time() - timer_start))
## statistics over the lexicon
#lex_htk = fame_functions.load_lexicon(lexicon_htk)
#phones_all = (' '.join(lex_htk['pronunciation'])).split(' ')
#c = Counter(phones_all)
#lexicon_out = r'c:\OneDrive\Research\rug\experiments\acoustic_model\fame\htk\lexicon\lex.htk2'
#for i in lex_htk[lex_htk['word'].str.startswith('\'')].index.values:
# lex_htk.iat[i, 0] = lex_htk.iat[i, 0].replace('\'', '\\\'')
## to_csv does not work with a space separator. therefore all tabs should be replaced manually.
##lex_htk.to_csv(lexicon_out, index=False, header=False, encoding="utf-8", sep=' ', quoting=csv.QUOTE_NONE, escapechar='\\')
#lex_htk.to_csv(lexicon_out, index=False, header=False, encoding="utf-8", sep='\t')
## check which letters are not coded in ascii.
#print('asr phones which cannot be coded in ascii:\n')
#for i in fame_asr.phoneset_short:
# try:
# i_encoded = i.encode("ascii")
# #print("{0} --> {1}".format(i, i.encode("ascii")))
# except UnicodeEncodeError:
# print(">>> {}".format(i))
#print("letters in the scripts which is not coded in ascii:\n")
#for dataset in ['train', 'devel', 'test']:
# timer_start = time.time()
# script_list = os.path.join(default.fame_dir, 'data', dataset, 'text')
# with open(script_list, "rt", encoding="utf-8") as fin:
# scripts = fin.read().split('\n')
# for line in scripts:
# sentence = ' '.join(line.split(' ')[1:])
# sentence_htk = fame_functions.word2htk(sentence)
# #if len(re.findall(r'[âêôûč\'àéèúćäëïöü]', sentence))==0:
# try:
# sentence_htk = bytes(sentence_htk, 'ascii')
# except UnicodeEncodeError:
# print(sentence)
# print(sentence_htk)

View File

@ -1,487 +1,587 @@
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import sys
import csv
import subprocess
from collections import Counter
import re
#import csv
#import subprocess
#from collections import Counter
#import re
import shutil
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from collections import Counter
#import matplotlib.pyplot as plt
#from sklearn.metrics import confusion_matrix
import acoustic_model_functions as am_func
import convert_xsampa2ipa
#import acoustic_model_functions as am_func
#import convert_xsampa2ipa
import defaultfiles as default
from forced_alignment import pyhtk
#from forced_alignment import pyhtk
#sys.path.append(default.forced_alignment_module_dir)
#from forced_alignment import convert_phone_set
#import acoustic_model_functions as am_func
import convert_xsampa2ipa
import stimmen_functions
import fame_functions
import convert_phoneset
from phoneset import fame_ipa, fame_asr
sys.path.append(default.toolbox_dir)
import file_handling as fh
from htk import pyhtk
## ======================= user define =======================
excel_file = os.path.join(default.experiments_dir, 'stimmen', 'data', 'Frisian Variants Picture Task Stimmen.xlsx')
data_dir = os.path.join(default.experiments_dir, 'stimmen', 'data')
#excel_file = os.path.join(default.experiments_dir, 'stimmen', 'data', 'Frisian Variants Picture Task Stimmen.xlsx')
#data_dir = os.path.join(default.experiments_dir, 'stimmen', 'data')
wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' # 16k
#wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' # 16k
acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model')
htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
fa_dir = os.path.join(default.experiments_dir, 'stimmen', 'FA_44k')
result_dir = os.path.join(default.experiments_dir, 'stimmen', 'result')
#acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model')
#htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
#fa_dir = os.path.join(default.experiments_dir, 'stimmen', 'FA_44k')
#result_dir = os.path.join(default.experiments_dir, 'stimmen', 'result')
kaldi_data_dir = os.path.join(default.kaldi_dir, 'data', 'alignme')
kaldi_dict_dir = os.path.join(default.kaldi_dir, 'data', 'local', 'dict')
lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
#kaldi_data_dir = os.path.join(default.kaldi_dir, 'data', 'alignme')
#kaldi_dict_dir = os.path.join(default.kaldi_dir, 'data', 'local', 'dict')
#lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
#lex_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
#lex_asr_htk = os.path.join(default.fame_dir, 'lexicon', 'lex.asr_htk')
# procedure
make_htk_dict_files = 0
do_forced_alignment_htk = 0
eval_forced_alignment_htk = 0
make_kaldi_data_files = 0
make_kaldi_lexicon_txt = 0
load_forced_alignment_kaldi = 1
eval_forced_alignment_kaldi = 1
make_dic_file = 0
make_HTK_files = 0
extract_features = 0
#make_htk_dict_files = 0
#do_forced_alignment_htk = 0
#eval_forced_alignment_htk = 0
make_kaldi_files = 0
#make_kaldi_lexicon_txt = 0
#load_forced_alignment_kaldi = 1
#eval_forced_alignment_kaldi = 1
#sys.path.append(os.path.join(default.repo_dir, 'forced_alignment'))
#from forced_alignment import convert_phone_set
#from forced_alignment import pyhtk
#sys.path.append(os.path.join(default.repo_dir, 'toolbox'))
#from evaluation import plot_confusion_matrix
## HTK related files.
config_dir = os.path.join(default.htk_dir, 'config')
model_dir = os.path.join(default.htk_dir, 'model')
feature_dir = os.path.join(default.htk_dir, 'mfc', 'stimmen')
config_hcopy = os.path.join(config_dir, 'config.HCopy')
# files to be made.
lattice_file = os.path.join(config_dir, 'stimmen.ltc')
phonelist_txt = os.path.join(config_dir, 'phonelist.txt')
stimmen_dic = os.path.join(default.htk_dir, 'lexicon', 'stimmen_recognition.dic')
hcopy_scp = os.path.join(default.htk_dir, 'tmp', 'stimmen_test_hcopy.scp')
hvite_scp = os.path.join(default.htk_dir, 'tmp', 'stimmen_test_hvite.scp')
hresult_scp = os.path.join(default.htk_dir, 'tmp', 'stimmen_test_result.scp')
## Kaldi related files.
kaldi_data_dir = os.path.join(default.kaldi_dir, 'data')
# files to be made.
wav_scp = os.path.join(kaldi_data_dir, 'test', 'wav.scp')
text_file = os.path.join(kaldi_data_dir, 'test', 'text')
utt2spk = os.path.join(kaldi_data_dir, 'test', 'utt2spk')
corpus_txt = os.path.join(kaldi_data_dir, 'local', 'corpus.txt')
lexicon_txt = os.path.join(kaldi_data_dir, 'local', 'dict', 'lexicon.txt')
nonsilence_phones_txt = os.path.join(kaldi_data_dir, 'local', 'dict', 'nonsilence_phones.txt')
silence_phones_txt = os.path.join(kaldi_data_dir, 'local', 'dict', 'silence_phones.txt')
optional_silence_txt = os.path.join(kaldi_data_dir, 'local', 'dict', 'optional_silence.txt')
## ======================= load test data ======================
stimmen_test_dir = r'c:\OneDrive\Research\rug\_data\stimmen_test'
df = stimmen_functions.load_transcriptions_clean(stimmen_test_dir)
df = stimmen_functions.add_row_asr(df)
df = stimmen_functions.add_row_htk(df)
word_list = [i for i in list(set(df['word'])) if not pd.isnull(i)]
word_list = sorted(word_list)
## ======================= make dic file to check pronunciation variants ======================
# the dic file should be modified manually depending on the task - recognition / forced alignment.
if make_dic_file:
# for HTK.
with open(stimmen_dic, mode='wb') as f:
for word in word_list:
df_ = df[df['word']==word]
pronunciations = list(np.unique(df_['htk']))
pronunciations_ = [word.upper() + ' sil ' + ' '.join(convert_phoneset.split_word(
htk, fame_asr.multi_character_phones_htk)) + ' sil'
for htk in pronunciations]
f.write(bytes('\n'.join(pronunciations_) + '\n', 'ascii'))
f.write(bytes('SILENCE sil\n', 'ascii'))
# for Kaldi.
fh.make_new_directory(os.path.join(kaldi_data_dir, 'local', 'dict'))
with open(lexicon_txt, mode='wb') as f:
f.write(bytes('!SIL sil\n', 'utf-8'))
f.write(bytes('<UNK> spn\n', 'utf-8'))
for word in word_list:
df_ = df[df['word']==word]
pronunciations = list(np.unique(df_['asr']))
pronunciations_ = [word.lower() + ' ' + ' '.join(convert_phoneset.split_word(
asr, fame_asr.multi_character_phones))
for asr in pronunciations]
f.write(bytes('\n'.join(pronunciations_) + '\n', 'utf-8'))
## ======================= test data for recognition ======================
# only target pronunciation variants.
df_rec = pd.DataFrame(index=[], columns=list(df.keys()))
for word in word_list:
variants = [htk.replace(' ', '')
for htk in stimmen_functions.load_pronunciations(word.upper(), stimmen_dic)]
df_ = df[df['word'] == word]
for index, row in df_.iterrows():
if row['htk'] in variants:
df_rec = df_rec.append(row, ignore_index=True)
## ======================= make files required for HTK ======================
if make_HTK_files:
# make a word lattice file.
pyhtk.create_word_lattice_file(
os.path.join(config_dir, 'stimmen.net'),
lattice_file)
# extract features.
with open(hcopy_scp, 'wb') as f:
filelist = [os.path.join(stimmen_test_dir, filename) + '\t'
+ os.path.join(feature_dir, os.path.basename(filename).replace('.wav', '.mfc'))
for filename in df['filename']]
f.write(bytes('\n'.join(filelist), 'ascii'))
pyhtk.wav2mfc(config_hcopy, hcopy_scp)
# make label files.
for index, row in df.iterrows():
filename = row['filename'].replace('.wav', '.lab')
label_file = os.path.join(feature_dir, filename)
with open(label_file, 'wb') as f:
label_string = 'SILENCE\n' + row['word'].upper() + '\nSILENCE\n'
f.write(bytes(label_string, 'ascii'))
## ======================= make files required for Kaldi =======================
if make_kaldi_files:
fh.make_new_directory(os.path.join(kaldi_data_dir, 'test'))
fh.make_new_directory(os.path.join(kaldi_data_dir, 'test', 'local'))
fh.make_new_directory(os.path.join(kaldi_data_dir, 'conf'))
# remove previous files.
if os.path.exists(wav_scp):
os.remove(wav_scp)
if os.path.exists(text_file):
os.remove(text_file)
if os.path.exists(utt2spk):
os.remove(utt2spk)
f_wav_scp = open(wav_scp, 'a', encoding="utf-8", newline='\n')
f_text_file = open(text_file, 'a', encoding="utf-8", newline='\n')
f_utt2spk = open(utt2spk, 'a', encoding="utf-8", newline='\n')
# make wav.scp, text, and utt2spk files.
for i, row in df_rec.iterrows():
filename = row['filename']
print('=== {0}: {1} ==='.format(i, filename))
wav_file = os.path.join(stimmen_test_dir, filename)
#if os.path.exists(wav_file):
speaker_id = 'speaker_' + str(i).zfill(4)
utterance_id = filename.replace('.wav', '')
utterance_id = utterance_id.replace(' ', '_')
utterance_id = speaker_id + '-' + utterance_id
# output
f_wav_scp.write('{0} {1}\n'.format(
utterance_id,
wav_file.replace('c:/', '/mnt/c/').replace('\\', '/'))) # convert path to unix format.
f_text_file.write('{0}\t{1}\n'.format(utterance_id, df_rec['word'][i].lower()))
f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))
f_wav_scp.close()
f_text_file.close()
f_utt2spk.close()
with open(corpus_txt, 'wb') as f:
f.write(bytes('\n'.join([word.lower() for word in word_list]) + '\n', 'utf-8'))
with open(nonsilence_phones_txt, 'wb') as f:
f.write(bytes('\n'.join(fame_asr.phoneset_short) + '\n', 'utf-8'))
with open(silence_phones_txt, 'wb') as f:
f.write(bytes('sil\nspn\n', 'utf-8'))
with open(optional_silence_txt, 'wb') as f:
f.write(bytes('sil\n', 'utf-8'))
with open(os.path.join(kaldi_data_dir, 'conf', 'decode.config'), 'wb') as f:
f.write(bytes('first_beam=10.0\n', 'utf-8'))
f.write(bytes('beam=13.0\n', 'utf-8'))
f.write(bytes('lattice_beam=6.0\n', 'utf-8'))
with open(os.path.join(kaldi_data_dir, 'conf', 'mfcc.conf'), 'wb') as f:
f.write(bytes('--use-energy=false', 'utf-8'))
## ======================= recognition ======================
listdir = glob.glob(os.path.join(feature_dir, '*.mfc'))
with open(hvite_scp, 'wb') as f:
f.write(bytes('\n'.join(listdir), 'ascii'))
with open(hresult_scp, 'wb') as f:
f.write(bytes('\n'.join(listdir).replace('.mfc', '.rec'), 'ascii'))
# calculate result
performance = np.zeros((1, 2))
for niter in range(50, 60):
output = pyhtk.recognition(
os.path.join(config_dir, 'config.rec'),
lattice_file,
os.path.join(default.htk_dir, 'model', 'hmm1', 'iter' + str(niter), 'hmmdefs'),
stimmen_dic, phonelist_txt, hvite_scp)
output = pyhtk.calc_recognition_performance(
stimmen_dic, hresult_scp)
per_sentence, per_word = pyhtk.load_recognition_output_all(output)
performance_ = np.array([niter, per_sentence['accuracy']]).reshape(1, 2)
performance = np.r_[performance, performance_]
print('{0}: {1}[%]'.format(niter, per_sentence['accuracy']))
#output = run_command_with_output([
# 'HVite', '-T', '1',
# '-C', config_rec,
# '-w', lattice_file,
# '-H', hmm,
# dictionary_file, phonelist_txt,
# '-S', HVite_scp
#])
## ======================= add paths =======================
sys.path.append(os.path.join(default.repo_dir, 'forced_alignment'))
from forced_alignment import convert_phone_set
from forced_alignment import pyhtk
sys.path.append(os.path.join(default.repo_dir, 'toolbox'))
from evaluation import plot_confusion_matrix
## ======================= convert phones ======================
mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)
xls = pd.ExcelFile(excel_file)
## check conversion
#df = pd.read_excel(xls, 'frequency')
#for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']):
# #ipa_converted = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, xsampa_)
# ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
# if not ipa_converted == ipa:
# print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa))
## check phones included in FAME!
# the phones used in the lexicon.
#phonelist = am_func.get_phonelist(lex_asr)
# the lines which include a specific phone.
#lines = am_func.find_phone(lex_asr, 'x')
# Filename, Word, Self Xsampa
df = pd.read_excel(xls, 'original')
ipas = []
famehtks = []
for xsampa in df['Self Xsampa']:
if not isinstance(xsampa, float): # 'NaN'
# typo?
xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t')
xsampa = xsampa.replace(';', ':')
ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
ipa = ipa.replace('ː', ':')
ipa = ipa.replace(' ', '')
ipas.append(ipa)
famehtk = convert_phone_set.ipa2famehtk(ipa)
famehtks.append(famehtk)
else:
ipas.append('')
famehtks.append('')
# extract interesting cols.
df = pd.DataFrame({'filename': df['Filename'],
'word': df['Word'],
'xsampa': df['Self Xsampa'],
'ipa': pd.Series(ipas),
'famehtk': pd.Series(famehtks)})
# cleansing.
df = df[~df['famehtk'].isin(['/', ''])]
word_list = np.unique(df['word'])
## ======================= make dict files used for HTK. ======================
if make_htk_dict_files:
output_type = 3
for word in word_list:
htk_dict_file = htk_dict_dir + '\\' + word + '.dic'
# pronunciation variant of the target word.
pronvar_ = df['famehtk'][df['word'].str.match(word)]
# make dic file.
am_func.make_htk_dict(word, pronvar_, htk_dict_file, output_type)
## ======================= forced alignment using HTK =======================
if do_forced_alignment_htk:
#for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
for hmm_num in [256, 512, 1024]:
hmm_num_str = str(hmm_num)
acoustic_model = os.path.join(acoustic_model_dir, 'hmm' + hmm_num_str + r'-2\hmmdefs')
#for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
for hmm_num in [256, 512, 1024]:
hmm_num_str = str(hmm_num)
acoustic_model = os.path.join(acoustic_model_dir, 'hmm' + hmm_num_str + r'-2\hmmdefs')
predictions = pd.DataFrame({'filename': [''],
'word': [''],
'xsampa': [''],
'ipa': [''],
'famehtk': [''],
'prediction': ['']})
for i, filename in enumerate(df['filename']):
print('=== {0}/{1} ==='.format(i, len(df)))
if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
wav_file = os.path.join(wav_dir, filename)
if os.path.exists(wav_file):
word = df['word'][i]
WORD = word.upper()
fa_file = os.path.join(fa_dir, filename.replace('.wav', '.txt') + hmm_num_str)
#if not os.path.exists(fa_file):
# make label file.
label_file = os.path.join(wav_dir, filename.replace('.wav', '.lab'))
with open(label_file, 'w') as f:
lines = f.write(WORD)
predictions = pd.DataFrame({'filename': [''],
'word': [''],
'xsampa': [''],
'ipa': [''],
'famehtk': [''],
'prediction': ['']})
for i, filename in enumerate(df['filename']):
print('=== {0}/{1} ==='.format(i, len(df)))
if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
wav_file = os.path.join(wav_dir, filename)
if os.path.exists(wav_file):
word = df['word'][i]
WORD = word.upper()
fa_file = os.path.join(fa_dir, filename.replace('.wav', '.txt') + hmm_num_str)
#if not os.path.exists(fa_file):
# make label file.
label_file = os.path.join(wav_dir, filename.replace('.wav', '.lab'))
with open(label_file, 'w') as f:
lines = f.write(WORD)
htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
pyhtk.doHVite(wav_file, label_file, htk_dict_file, fa_file, default.config_hvite,
default.phonelist, acoustic_model)
os.remove(label_file)
pyhtk.doHVite(wav_file, label_file, htk_dict_file, fa_file, default.config_hvite,
default.phonelist, acoustic_model)
os.remove(label_file)
prediction = am_func.read_fileFA(fa_file)
prediction = am_func.read_fileFA(fa_file)
print('{0}: {1} -> {2}'.format(WORD, df['famehtk'][i], prediction))
else:
prediction = ''
print('!!!!! file not found.')
print('{0}: {1} -> {2}'.format(WORD, df['famehtk'][i], prediction))
else:
prediction = ''
print('!!!!! file not found.')
line = pd.Series([df['filename'][i], df['word'][i], df['xsampa'][i], df['ipa'][i], df['famehtk'][i], prediction], index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'], name=i)
predictions = predictions.append(line)
else:
prediction = ''
print('!!!!! invalid entry.')
line = pd.Series([df['filename'][i], df['word'][i], df['xsampa'][i], df['ipa'][i], df['famehtk'][i], prediction], index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'], name=i)
predictions = predictions.append(line)
else:
prediction = ''
print('!!!!! invalid entry.')
predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
## ======================= make files which are used for forced alignment by Kaldi =======================
if make_kaldi_data_files:
wav_scp = os.path.join(kaldi_data_dir, 'wav.scp')
text_file = os.path.join(kaldi_data_dir, 'text')
utt2spk = os.path.join(kaldi_data_dir, 'utt2spk')
# remove previous files.
if os.path.exists(wav_scp):
os.remove(wav_scp)
if os.path.exists(text_file):
os.remove(text_file)
if os.path.exists(utt2spk):
os.remove(utt2spk)
f_wav_scp = open(wav_scp, 'a', encoding="utf-8", newline='\n')
f_text_file = open(text_file, 'a', encoding="utf-8", newline='\n')
f_utt2spk = open(utt2spk, 'a', encoding="utf-8", newline='\n')
# make wav.scp, text, and utt2spk files.
for i in df.index:
filename = df['filename'][i]
print('=== {0}: {1} ==='.format(i, filename))
#if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
wav_file = os.path.join(wav_dir, filename)
if os.path.exists(wav_file):
speaker_id = 'speaker_' + str(i).zfill(4)
utterance_id = filename.replace('.wav', '')
utterance_id = utterance_id.replace(' ', '_')
utterance_id = speaker_id + '-' + utterance_id
# wav.scp file
wav_file_unix = wav_file.replace('\\', '/')
wav_file_unix = wav_file_unix.replace('c:/', '/mnt/c/')
f_wav_scp.write('{0} {1}\n'.format(utterance_id, wav_file_unix))
# text file
word = df['word'][i].lower()
f_text_file.write('{0}\t{1}\n'.format(utterance_id, word))
# utt2spk
f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))
f_wav_scp.close()
f_text_file.close()
f_utt2spk.close()
## ======================= make lexicon txt which is used by Kaldi =======================
if make_kaldi_lexicon_txt:
option_num = 6
option_num = 6
# remove previous file.
if os.path.exists(lexicon_txt):
os.remove(lexicon_txt)
lexiconp_txt = lexicon_txt.replace('lexicon.txt', 'lexiconp.txt')
if os.path.exists(lexiconp_txt):
os.remove(lexiconp_txt)
# output lexicon.txt
f_lexicon_txt = open(lexicon_txt, 'a', encoding="utf-8", newline='\n')
pronvar_list_all = []
for word in word_list:
# remove previous file.
if os.path.exists(lexicon_txt):
os.remove(lexicon_txt)
lexiconp_txt = lexicon_txt.replace('lexicon.txt', 'lexiconp.txt')
if os.path.exists(lexiconp_txt):
os.remove(lexiconp_txt)
# output lexicon.txt
f_lexicon_txt = open(lexicon_txt, 'a', encoding="utf-8", newline='\n')
pronvar_list_all = []
for word in word_list:
# pronunciation variant of the target word.
pronunciation_variants = df['ipa'][df['word'].str.match(word)]
# pronunciation variant of the target word.
pronunciation_variants = df['ipa'][df['word'].str.match(word)]
c = Counter(pronunciation_variants)
total_num = sum(c.values())
c = Counter(pronunciation_variants)
total_num = sum(c.values())
#with open(result_dir + '\\' + word + '.csv', 'a', encoding="utf-8", newline='\n') as f:
# for key in c.keys():
# f.write("{0},{1}\n".format(key,c[key]))
#with open(result_dir + '\\' + word + '.csv', 'a', encoding="utf-8", newline='\n') as f:
# for key in c.keys():
# f.write("{0},{1}\n".format(key,c[key]))
for key, value in c.most_common(option_num):
# make possible pronunciation variant list.
pronvar_list = am_func.fame_pronunciation_variant(key)
for key, value in c.most_common(option_num):
# make possible pronunciation variant list.
pronvar_list = am_func.fame_pronunciation_variant(key)
for pronvar_ in pronvar_list:
split_ipa = convert_phone_set.split_fame_ipa(pronvar_)
pronvar_out = ' '.join(split_ipa)
pronvar_list_all.append([word, pronvar_out])
for pronvar_ in pronvar_list:
split_ipa = convert_phone_set.split_fame_ipa(pronvar_)
pronvar_out = ' '.join(split_ipa)
pronvar_list_all.append([word, pronvar_out])
pronvar_list_all = np.array(pronvar_list_all)
pronvar_list_all = np.unique(pronvar_list_all, axis=0)
pronvar_list_all = np.array(pronvar_list_all)
pronvar_list_all = np.unique(pronvar_list_all, axis=0)
# output
f_lexicon_txt.write('<UNK>\tSPN\n')
for line in pronvar_list_all:
f_lexicon_txt.write('{0}\t{1}\n'.format(line[0].lower(), line[1]))
# output
f_lexicon_txt.write('<UNK>\tSPN\n')
for line in pronvar_list_all:
f_lexicon_txt.write('{0}\t{1}\n'.format(line[0].lower(), line[1]))
f_lexicon_txt.close()
f_lexicon_txt.close()
## ======================= load kaldi forced alignment result =======================
if load_forced_alignment_kaldi:
phones_txt = os.path.join(default.kaldi_dir, 'data', 'lang', 'phones.txt')
merged_alignment_txt = os.path.join(default.kaldi_dir, 'exp', 'tri1_alignme', 'merged_alignment.txt')
#filenames = np.load(data_dir + '\\filenames.npy')
#words = np.load(data_dir + '\\words.npy')
#pronunciations = np.load(data_dir + '\\pronunciations_ipa.npy')
#pronvar_list_all = np.load(data_dir + '\\pronvar_list_all.npy')
#word_list = np.unique(words)
phones_txt = os.path.join(default.kaldi_dir, 'data', 'lang', 'phones.txt')
merged_alignment_txt = os.path.join(default.kaldi_dir, 'exp', 'tri1_alignme', 'merged_alignment.txt')
#filenames = np.load(data_dir + '\\filenames.npy')
#words = np.load(data_dir + '\\words.npy')
#pronunciations = np.load(data_dir + '\\pronunciations_ipa.npy')
#pronvar_list_all = np.load(data_dir + '\\pronvar_list_all.npy')
#word_list = np.unique(words)
# load the mapping between phones and ids.
with open(phones_txt, 'r', encoding="utf-8") as f:
mapping_phone2id = f.read().split('\n')
# load the mapping between phones and ids.
with open(phones_txt, 'r', encoding="utf-8") as f:
mapping_phone2id = f.read().split('\n')
phones = []
phone_ids = [] # ID of phones
for m in mapping_phone2id:
m = m.split(' ')
if len(m) > 1:
phones.append(m[0])
phone_ids.append(int(m[1]))
phones = []
phone_ids = [] # ID of phones
for m in mapping_phone2id:
m = m.split(' ')
if len(m) > 1:
phones.append(m[0])
phone_ids.append(int(m[1]))
# load the result of FA.
with open(merged_alignment_txt, 'r') as f:
lines = f.read()
lines = lines.split('\n')
# load the result of FA.
with open(merged_alignment_txt, 'r') as f:
lines = f.read()
lines = lines.split('\n')
predictions = pd.DataFrame({'filename': [''],
'word': [''],
'xsampa': [''],
'ipa': [''],
'famehtk': [''],
'prediction': ['']})
#fa_filenames = []
#fa_pronunciations = []
utterance_id_ = ''
pronunciation = []
for line in lines:
line = line.split(' ')
if len(line) == 5:
utterance_id = line[0]
if utterance_id == utterance_id_:
phone_id = int(line[4])
#if not phone_id == 1:
phone_ = phones[phone_ids.index(phone_id)]
phone = re.sub(r'_[A-Z]', '', phone_)
if not phone == 'SIL':
pronunciation.append(phone)
else:
filename = re.sub(r'speaker_[0-9]{4}-', '', utterance_id_)
prediction = ''.join(pronunciation)
df_ = df[df['filename'].str.match(filename)]
df_idx = df_.index[0]
prediction_ = pd.Series([#filename,
#df_['word'][df_idx],
#df_['xsampa'][df_idx],
#df_['ipa'][df_idx],
#df_['famehtk'][df_idx],
df_.iloc[0,1],
df_.iloc[0,3],
df_.iloc[0,4],
df_.iloc[0,2],
df_.iloc[0,0],
prediction],
index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'],
name=df_idx)
predictions = predictions.append(prediction_)
#fa_filenames.append()
#fa_pronunciations.append(' '.join(pronunciation))
pronunciation = []
predictions = pd.DataFrame({'filename': [''],
'word': [''],
'xsampa': [''],
'ipa': [''],
'famehtk': [''],
'prediction': ['']})
#fa_filenames = []
#fa_pronunciations = []
utterance_id_ = ''
pronunciation = []
for line in lines:
line = line.split(' ')
if len(line) == 5:
utterance_id = line[0]
if utterance_id == utterance_id_:
phone_id = int(line[4])
#if not phone_id == 1:
phone_ = phones[phone_ids.index(phone_id)]
phone = re.sub(r'_[A-Z]', '', phone_)
if not phone == 'SIL':
pronunciation.append(phone)
else:
filename = re.sub(r'speaker_[0-9]{4}-', '', utterance_id_)
prediction = ''.join(pronunciation)
df_ = df[df['filename'].str.match(filename)]
df_idx = df_.index[0]
prediction_ = pd.Series([#filename,
#df_['word'][df_idx],
#df_['xsampa'][df_idx],
#df_['ipa'][df_idx],
#df_['famehtk'][df_idx],
df_.iloc[0,1],
df_.iloc[0,3],
df_.iloc[0,4],
df_.iloc[0,2],
df_.iloc[0,0],
prediction],
index=['filename', 'word', 'xsampa', 'ipa', 'famehtk', 'prediction'],
name=df_idx)
predictions = predictions.append(prediction_)
#fa_filenames.append()
#fa_pronunciations.append(' '.join(pronunciation))
pronunciation = []
utterance_id_ = utterance_id
predictions.to_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl'))
utterance_id_ = utterance_id
predictions.to_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl'))
## ======================= evaluate the result of forced alignment =======================
if eval_forced_alignment_htk:
htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
compare_hmm_num = 1
compare_hmm_num = 1
if compare_hmm_num:
f_result = open(os.path.join(result_dir, 'result.csv'), 'w')
f_result.write("nmix,Oog,Oog,Oor,Oor,Pauw,Pauw,Reus,Reus,Reuzenrad,Reuzenrad,Roeiboot,Roeiboot,Rozen,Rozen\n")
if compare_hmm_num:
f_result = open(os.path.join(result_dir, 'result.csv'), 'w')
f_result.write("nmix,Oog,Oog,Oor,Oor,Pauw,Pauw,Reus,Reus,Reuzenrad,Reuzenrad,Roeiboot,Roeiboot,Rozen,Rozen\n")
for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
#for hmm_num in [256]:
hmm_num_str = str(hmm_num)
if compare_hmm_num:
f_result.write("{},".format(hmm_num_str))
for hmm_num in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
#for hmm_num in [256]:
hmm_num_str = str(hmm_num)
if compare_hmm_num:
f_result.write("{},".format(hmm_num_str))
#match = np.load(data_dir + '\\match_hmm' + hmm_num_str + '.npy')
#prediction = np.load(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.npy'))
#prediction = pd.Series(prediction, index=df.index, name='prediction')
#result = pd.concat([df, prediction], axis=1)
result = pd.read_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
#match = np.load(data_dir + '\\match_hmm' + hmm_num_str + '.npy')
#prediction = np.load(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.npy'))
#prediction = pd.Series(prediction, index=df.index, name='prediction')
#result = pd.concat([df, prediction], axis=1)
result = pd.read_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
# load pronunciation variants
for word in word_list:
htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
with open(htk_dict_file, 'r') as f:
lines = f.read().split('\n')[:-1]
pronunciation_variants = [line.split('\t')[1] for line in lines]
# load pronunciation variants
for word in word_list:
htk_dict_file = os.path.join(htk_dict_dir, word + '.dic')
with open(htk_dict_file, 'r') as f:
lines = f.read().split('\n')[:-1]
pronunciation_variants = [line.split('\t')[1] for line in lines]
# see only words which appear in top 3.
result_ = result[result['word'].str.match(word)]
result_ = result_[result_['famehtk'].isin(pronunciation_variants)]
# see only words which appear in top 3.
result_ = result[result['word'].str.match(word)]
result_ = result_[result_['famehtk'].isin(pronunciation_variants)]
match_num = sum(result_['famehtk'] == result_['prediction'])
total_num = len(result_)
match_num = sum(result_['famehtk'] == result_['prediction'])
total_num = len(result_)
print("word '{0}': {1}/{2} ({3:.2f} %)".format(word, match_num, total_num, match_num/total_num*100))
if compare_hmm_num:
f_result.write("{0},{1},".format(match_num, total_num))
else:
# output confusion matrix
cm = confusion_matrix(result_['famehtk'], result_['prediction'])
print("word '{0}': {1}/{2} ({3:.2f} %)".format(word, match_num, total_num, match_num/total_num*100))
if compare_hmm_num:
f_result.write("{0},{1},".format(match_num, total_num))
else:
# output confusion matrix
cm = confusion_matrix(result_['famehtk'], result_['prediction'])
plt.figure()
plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
plt.savefig(result_dir + '\\cm_' + word + '.png')
plt.figure()
plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
plt.savefig(result_dir + '\\cm_' + word + '.png')
if compare_hmm_num:
f_result.write('\n')
if compare_hmm_num:
f_result.write('\n')
if compare_hmm_num:
f_result.close()
if compare_hmm_num:
f_result.close()
## ======================= evaluate the result of forced alignment of kaldi =======================
if eval_forced_alignment_kaldi:
result = pd.read_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl'))
result = pd.read_pickle(os.path.join(result_dir, 'kaldi', 'predictions.pkl'))
f_result = open(os.path.join(result_dir, 'result.csv'), 'w')
f_result.write("word,total,valid,match,[%]\n")
f_result = open(os.path.join(result_dir, 'result.csv'), 'w')
f_result.write("word,total,valid,match,[%]\n")
# load pronunciation variants
with open(lexicon_txt, 'r', encoding="utf-8", newline='\n') as f:
lines = f.read().split('\n')[:-1]
pronunciation_variants_all = [line.split('\t') for line in lines]
# load pronunciation variants
with open(lexicon_txt, 'r', encoding="utf-8", newline='\n') as f:
lines = f.read().split('\n')[:-1]
pronunciation_variants_all = [line.split('\t') for line in lines]
word_list = np.delete(word_list, [0], 0) # remove 'Oog'
for word in word_list:
word_list = np.delete(word_list, [0], 0) # remove 'Oog'
for word in word_list:
# load pronunciation variant of the word.
pronunciation_variants = []
for line in pronunciation_variants_all:
if line[0] == word.lower():
pronunciation_variants.append(line[1].replace(' ', ''))
# load pronunciation variant of the word.
pronunciation_variants = []
for line in pronunciation_variants_all:
if line[0] == word.lower():
pronunciation_variants.append(line[1].replace(' ', ''))
# see only words which appear in top 3.
result_ = result[result['word'].str.match(word)]
result_tolerant = pd.DataFrame({
'filename': [''],
'word': [''],
'xsampa': [''],
'ipa': [''],
'prediction': [''],
'match': ['']})
# see only words which appear in top 3.
result_ = result[result['word'].str.match(word)]
result_tolerant = pd.DataFrame({
'filename': [''],
'word': [''],
'xsampa': [''],
'ipa': [''],
'prediction': [''],
'match': ['']})
for i in range(0, len(result_)):
line = result_.iloc[i]
for i in range(0, len(result_)):
line = result_.iloc[i]
# make a list of all possible pronunciation variants of ipa description.
# i.e. possible answers from forced alignment.
ipa = line['ipa']
pronvar_list = [ipa]
pronvar_list_ = am_func.fame_pronunciation_variant(ipa)
if not pronvar_list_ is None:
pronvar_list += list(pronvar_list_)
# make a list of all possible pronunciation variants of ipa description.
# i.e. possible answers from forced alignment.
ipa = line['ipa']
pronvar_list = [ipa]
pronvar_list_ = am_func.fame_pronunciation_variant(ipa)
if not pronvar_list_ is None:
pronvar_list += list(pronvar_list_)
# only focus on pronunciations which can be estimated from ipa.
if len(set(pronvar_list) & set(pronunciation_variants)) > 0:
if line['prediction'] in pronvar_list:
ismatch = True
else:
ismatch = False
# only focus on pronunciations which can be estimated from ipa.
if len(set(pronvar_list) & set(pronunciation_variants)) > 0:
if line['prediction'] in pronvar_list:
ismatch = True
else:
ismatch = False
line_df = pd.DataFrame(result_.iloc[i]).T
df_idx = line_df.index[0]
result_tolerant_ = pd.Series([line_df.loc[df_idx, 'filename'],
line_df.loc[df_idx, 'word'],
line_df.loc[df_idx, 'xsampa'],
line_df.loc[df_idx, 'ipa'],
line_df.loc[df_idx, 'prediction'],
ismatch],
index=['filename', 'word', 'xsampa', 'ipa', 'prediction', 'match'],
name=df_idx)
result_tolerant = result_tolerant.append(result_tolerant_)
# remove the first entry (dummy)
result_tolerant = result_tolerant.drop(0, axis=0)
line_df = pd.DataFrame(result_.iloc[i]).T
df_idx = line_df.index[0]
result_tolerant_ = pd.Series([line_df.loc[df_idx, 'filename'],
line_df.loc[df_idx, 'word'],
line_df.loc[df_idx, 'xsampa'],
line_df.loc[df_idx, 'ipa'],
line_df.loc[df_idx, 'prediction'],
ismatch],
index=['filename', 'word', 'xsampa', 'ipa', 'prediction', 'match'],
name=df_idx)
result_tolerant = result_tolerant.append(result_tolerant_)
# remove the first entry (dummy)
result_tolerant = result_tolerant.drop(0, axis=0)
total_num = len(result_)
valid_num = len(result_tolerant)
match_num = np.sum(result_tolerant['match'])
total_num = len(result_)
valid_num = len(result_tolerant)
match_num = np.sum(result_tolerant['match'])
print("word '{0}': {1}/{2} ({3:.2f} %) originally {4}".format(word, match_num, valid_num, match_num/valid_num*100, total_num))
f_result.write("{0},{1},{2},{3},{4}\n".format(word, total_num, valid_num, match_num, match_num/valid_num*100))
print("word '{0}': {1}/{2} ({3:.2f} %) originally {4}".format(word, match_num, valid_num, match_num/valid_num*100, total_num))
f_result.write("{0},{1},{2},{3},{4}\n".format(word, total_num, valid_num, match_num, match_num/valid_num*100))
f_result.close()
## output confusion matrix
#cm = confusion_matrix(result_['ipa'], result_['prediction'])
f_result.close()
## output confusion matrix
#cm = confusion_matrix(result_['ipa'], result_['prediction'])
#plt.figure()
#plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
#plt.savefig(result_dir + '\\cm_' + word + '.png')
#plt.figure()
#plot_confusion_matrix(cm, classes=pronunciation_variants, normalize=False)
#plt.savefig(result_dir + '\\cm_' + word + '.png')

View File

@ -52,7 +52,7 @@ p = argparse.ArgumentParser()
#p.add_argument("--user", default=None)
#p.add_argument("--password", default=None)
p.add_argument("--user", default='martijn.wieling')
p.add_argument("--password", default='fa0Thaic')
p.add_argument("--password", default='xxxxxx')
args = p.parse_args()
#wav_file = 'c:\\OneDrive\\WSL\\test\\onetwothree.wav'

View File

@ -1,20 +1,19 @@
## this script should be used only by Aki Kunikoshi.
import os
import numpy as np
import pandas as pd
import argparse
import json
from novoapi.backend import session
import os
#os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import defaultfiles as default
import convert_phoneset
def load_phonset():
translation_key_ipa2novo70 = dict()
translation_key_novo702ipa = dict()
def load_novo70_phoneset():
#phonelist_novo70_ = pd.ExcelFile(default.phonelist_novo70_xlsx)
#df = pd.read_excel(phonelist_novo70_, 'list')
## *_simple includes columns which has only one phone in.
@ -23,21 +22,23 @@ def load_phonset():
# print('{0}:{1}'.format(ipa, novo70))
# translation_key[ipa] = novo70
#phonelist_novo70 = np.unique(list(df['novo70_simple']))
novo70_phoneset = pd.read_csv(default.novo70_phoneset, delimiter='\t', header=None, encoding="utf-8")
novo70_phoneset.rename(columns={0: 'novo70', 1: 'ipa', 2: 'description'}, inplace=True)
phoneset_ipa = []
phoneset_novo70 = []
with open(default.novo70_phoneset, "rt", encoding="utf-8") as fin:
lines = fin.read()
lines = lines.split('\n')
for line in lines:
words = line.split('\t')
if len(words) > 1:
novo70 = words[0]
ipa = words[1]
phoneset_ipa.append(ipa)
phoneset_novo70.append(novo70)
translation_key_ipa2novo70[ipa] = novo70
translation_key_novo702ipa[novo70] = ipa
#phoneset_ipa = []
#phoneset_novo70 = []
#with open(default.novo70_phoneset, "rt", encoding="utf-8") as fin:
# lines = fin.read()
# lines = lines.split('\n')
# for line in lines:
# words = line.split('\t')
# if len(words) > 1:
# novo70 = words[0]
# ipa = words[1]
# phoneset_ipa.append(ipa)
# phoneset_novo70.append(novo70)
# translation_key_ipa2novo70[ipa] = novo70
# translation_key_novo702ipa[novo70] = ipa
# As per Nederlandse phoneset_aki.xlsx recieved from David
# [ɔː] oh / ohr # from ipa->novo70, only oh is used.
@ -47,15 +48,26 @@ def load_phonset():
# [ɛː] eh
# [w] wv in IPA written as ʋ.
extra_ipa = ['ɔː', 'ɪː', 'iː', 'œː', 'ɛː', 'ʋ']
extra_novo70 = ['oh', 'ih', 'iy', 'uh', 'eh', 'wv']
for ipa, novo70 in zip(extra_ipa, extra_novo70):
phoneset_ipa.append(ipa)
phoneset_novo70.append(novo70)
extra_novo70 = ['oh', 'ih', 'iy', 'uh', 'eh', 'wv']
phoneset_ipa = list(novo70_phoneset['ipa'])
phoneset_ipa.extend(extra_ipa)
phoneset_ipa = [i.replace('ː', ':') for i in phoneset_ipa]
phoneset_novo70 = list(novo70_phoneset['novo70'])
phoneset_novo70.extend(extra_novo70)
phoneset_novo70 = [i.replace('ː', ':') for i in phoneset_novo70]
translation_key_ipa2novo70 = dict()
translation_key_novo702ipa = dict()
for ipa, novo70 in zip(phoneset_ipa, phoneset_novo70):
#phoneset_ipa.append(ipa)
#phoneset_novo70.append(novo70)
translation_key_ipa2novo70[ipa] = novo70
translation_key_novo702ipa[novo70] = ipa
translation_key_novo702ipa['ohr'] = 'ɔː'
translation_key_novo702ipa['ihr'] = 'ɪː'
translation_key_novo702ipa['ohr'] = 'ɔ:'
translation_key_novo702ipa['ihr'] = 'ɪ:'
phoneset_ipa = np.unique(phoneset_ipa)
phoneset_novo70 = np.unique(phoneset_novo70)
@ -63,25 +75,6 @@ def load_phonset():
return phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa
def multi_character_tokenize(line, multi_character_tokens):
"""
Tries to match one of the tokens in multi_character_tokens at each position of line,
starting at position 0,
if so tokenizes and eats that token. Otherwise tokenizes a single character.
Copied from forced_alignment.convert_phone_set.py
"""
while line != '':
for token in multi_character_tokens:
if line.startswith(token) and len(token) > 0:
yield token
line = line[len(token):]
break
else:
yield line[:1]
line = line[1:]
def split_ipa(line):
"""
Split a line by IPA phones.
@ -89,13 +82,16 @@ def split_ipa(line):
:param string line: one line written in IPA.
:return string lineSeperated: the line splitted in IPA phone.
"""
phoneset_ipa, _, _, _ = load_novo70_phoneset()
#multi_character_phones = [i for i in phoneset_ipa if len(i) > 1]
#multi_character_phones.sort(key=len, reverse=True)
#multi_character_phones = [
# # IPAs in CGN.
# u'ʌu', u'ɛi', u'œy', u'aː', u'eː', u'iː', u'oː', u'øː', u'ɛː', u'œː', u'ɔː', u'ɛ̃ː', u'ɑ̃ː', u'ɔ̃ː', u'œ̃', u'ɪː'
# ]
#return [phone for phone in multi_character_tokenize(line.strip(), multi_character_phones)]
multi_character_phones = [
# IPAs in CGN.
u'ʌu', u'ɛi', u'œy', u'aː', u'eː', u'iː', u'oː', u'øː', u'ɛː', u'œː', u'ɔː', u'ɛ̃ː', u'ɑ̃ː', u'ɔ̃ː', u'œ̃', u'ɪː'
]
return [phone for phone in multi_character_tokenize(line.strip(), multi_character_phones)]
return convert_phoneset.split_word(line, phoneset_ipa)
def split_novo70(line):
@ -104,30 +100,33 @@ def split_novo70(line):
:param string line: one line written in novo70.
:return string lineSeperated: the line splitted by novo70 phones.
"""
_, phoneset_novo70, _, _ = load_phonset()
multi_character_phones = [p for p in phoneset_novo70 if len(p) > 1]
multi_character_phones = sorted(multi_character_phones, key=len, reverse=True)
_, phoneset_novo70, _, _ = load_novo70_phoneset()
#multi_character_phones = [p for p in phoneset_novo70 if len(p) > 1]
#multi_character_phones = sorted(multi_character_phones, key=len, reverse=True)
multi_character_phones = convert_phoneset.extract_multi_character_phones(phoneset_novo70)
return ['sp' if phone == ' ' else phone
for phone in multi_character_tokenize(line.strip(), multi_character_phones)]
def novo702ipa(tokens):
pronunciation = []
_, _, _, translation_key = load_phonset()
for phone in split_novo70(tokens):
pronunciation.append(translation_key.get(phone, phone))
return ' '.join(pronunciation)
def novo702ipa(line):
#pronunciation = []
_, _, _, translation_key = load_novo70_phoneset()
#for phone in split_novo70(tokens):
# pronunciation.append(translation_key.get(phone, phone))
#return ' '.join(pronunciation)
return ' '.join(convert_phoneset.convert_phoneset(split_novo70(line), translation_key))
# numbering of novo70 should be checked.
def ipa2novo70(tokens):
pronunciation = []
_, _, translation_key, _ = load_phonset()
for phone in split_ipa(tokens):
pronunciation.append(translation_key.get(phone, phone))
return ' '.join(pronunciation)
def ipa2novo70(line):
#pronunciation = []
_, _, translation_key, _ = load_novo70_phoneset()
#for phone in split_ipa(tokens):
# pronunciation.append(translation_key.get(phone, phone))
#return ' '.join(pronunciation)
return ' '.join(convert_phoneset.convert_phoneset(split_ipa(line), translation_key))
def make_grammar(word, pronunciation_ipa):
"""
@ -173,7 +172,10 @@ def forced_alignment(wav_file, word, pronunciation_ipa):
# username / password cannot be passed as artuments...
p = argparse.ArgumentParser()
p.add_argument("--user", default='martijn.wieling')
p.add_argument("--password", default='fa0Thaic')
p.add_argument("--password", default='xxxxxx')
args = p.parse_args()
rec = session.Recognizer(grammar_version="1.0", lang="nl", snodeid=101, user=args.user, password=args.password, keepopen=True) # , modeldir=modeldir)
@ -194,6 +196,25 @@ def result2pronunciation(result, word):
return pronunciation_ipa, pronunciation_novo70, llh
def phones_not_in_novo70(ipa):
    """Return the phones of `ipa` that the novo70 phoneset cannot express.

    Args:
        ipa (str): a pronunciation written in IPA.

    Returns:
        list: phones (in order of appearance, duplicates kept) that are
            neither in the IPA phoneset returned by load_novo70_phoneset()
            nor in the list of additionally accepted phones below.
    """
    known_phones = load_novo70_phoneset()[0]

    # As per Nederlandse phoneset_aki.xlsx received from David
    # [ɔː] oh / ohr
    # [ɪː] ih / ihr
    # [iː] iy
    # [œː] uh
    # [ɛː] eh
    # [w] wv in IPA written as ʋ.
    david_suggestion = ['ɔː', 'ɪː', 'iː', 'œː', 'ɛː', 'w']

    missing = []
    for phone in split_ipa(ipa):
        if phone in known_phones or phone in david_suggestion:
            continue
        missing.append(phone)
    return missing
# NOTE: the guard used to compare against 'main', which never equals
# __name__, so this block could not run even when the file was executed
# as a script.
if __name__ == '__main__':
    pronunciation_ipa = ['rø:s', 'mɑn', 'mɑntsjə']
    #grammar = make_grammar('reus', pronunciation_ipa)
    phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa = load_novo70_phoneset()

View File

@ -0,0 +1,154 @@
""" definition of the phones to be used. """

# phones in {FAME}/lexicon/lex.asr
phoneset = [
    # vowels
    'a',
    'a:',
    'e',
    'e:',
    'i',
    'i:',
    'i̯',
    'o',
    'o:',
    'ö',
    'ö:',
    'u',
    'u:',
    'ü',
    'ü:',
    #'ú', # only appears in word 'feeste'(út) and 'gaste'(út) which are 'f e: s t ə' and 'yn' in lex_asr. The pronunciation in Fries may be mistakes so I removed this phone.
    'ṷ',
    'y',
    'ɔ',
    'ɔ:',
    'ɔ̈',
    'ɔ̈:',
    'ə',
    'ɛ',
    'ɛ:',
    'ɪ',
    'ɪ:',

    # plosives
    'p',
    'b',
    't',
    'd',
    'k',
    'g',
    'ɡ', # = 'g'

    # nasals
    'm',
    'n',
    'ŋ',

    # fricatives
    'f',
    'v',
    's',
    's:',
    'z',
    'x',
    'h',

    # tap and flip
    'r',
    'r:',

    # approximant
    'j',
    'l'
    ]


## reduce the number of phones.
# the phones which seldom occur are replaced with another more popular phones.
# replacements are based on the advice from Martijn Wieling.
reduction_key = {
    'y':'i:', 'e':'e:', 'ə:':'ɛ:', 'r:':'r', 'ɡ':'g',
    # aki added because this is used in stimmen_project.
    'ɔ̈:':'ɔ:'
    }

# phones dropped by phone_reduction().
# NOTE(review): 'ú' is commented out of phoneset above, but 's:' is still
# listed there, so the warning below is printed when this module is imported.
phones_to_be_removed = ['ú', 's:']


def phone_reduction(phones):
    """Map seldom-used phones to their more common counterparts.

    Phones listed in phones_to_be_removed are dropped (a warning is printed
    when at least one of them is present); every other phone is replaced
    according to reduction_key, or kept as it is when it has no entry.

    Args:
        phones (list): list of phones.

    Returns:
        list: the reduced phones, in the original order.
    """
    if any(phone in phones for phone in phones_to_be_removed):
        print('input includes phone(s) which is not defined in fame_asr.')
        print('those phone(s) are removed.')
    return [reduction_key.get(i, i) for i in phones
            if i not in phones_to_be_removed]


phoneset_short = list(set(phone_reduction(phoneset)))
phoneset_short.sort()


## translation_key to htk format (ascii).
# phones which gives UnicodeEncodeError when phone.encode("ascii")
# are replaced with other characters.
translation_key_asr2htk = {
    'i̯': 'i_',
    'ṷ': 'u_',

    # on the analogy of German umlaut, 'e' is used.
    'ö': 'oe', 'ö:': 'oe:',
    'ü': 'ue', 'ü:': 'ue:',

    # on the analogy of Chinese...
    'ŋ': 'ng',

    # refer to Xsampa.
    'ɔ': 'O', 'ɔ:': 'O:', 'ɔ̈': 'Oe',
    #'ɔ̈:': 'O:', # does not appear in FAME, but used in stimmen.
    'ɛ': 'E', 'ɛ:': 'E:',
    'ɪ': 'I', 'ɪ:': 'I:',

    # it is @ in Xsampa, but that is not handy on HTK.
    'ə': 'A'
    }
phoneset_htk = [translation_key_asr2htk.get(i, i) for i in phoneset_short]

#not_in_ascii = [
#    '\'',
#    'â', 'ê', 'ô', 'û', 'č',
#    'à', 'í', 'é', 'è', 'ú', 'ć',
#    'ä', 'ë', 'ï', 'ö', 'ü'
#]
translation_key_word2htk = {
    #'\'': '\\\'',
    'í':'i1', 'é':'e1', 'ú':'u1', 'ć':'c1',
    'à':'a2', 'è':'e2',
    'â':'a3', 'ê':'e3', 'ô':'o3', 'û':'u3',
    'č':'c4',
    'ä': 'ao', 'ë': 'ee', 'ï': 'ie', 'ö': 'oe', 'ü': 'ue',
    }
#[translation_key_word2htk.get(i, i) for i in not_in_ascii]

#Stop: p, b, t, d, k, g
#Nasal: m, n, ng(ŋ)
#Fricative: s, z, f, v, h, x
#Liquid: l, r
#Vowel: a, a:, e:, i, i:, i_(i̯), o, o:, u, u:, u_(ṷ), oe(ö), oe:(ö:), ue(ü), ue:(ü:), O(ɔ), O:(ɔ:), Oe(ɔ̈), A(ə), E(ɛ), E:(ɛ:), I(ɪ), I:(ɪ:)


## the list of multi character phones.
# for example, the length of 'a:' is 2, but in the codes it is treated as one letter.

# original.
multi_character_phones = [i for i in phoneset if len(i) > 1]
multi_character_phones.sort(key=len, reverse=True)

# phoneset reduced.
multi_character_phones_short = [i for i in phoneset_short if len(i) > 1]
multi_character_phones_short.sort(key=len, reverse=True)

# htk compatible.
multi_character_phones_htk = [i for i in phoneset_htk if len(i) > 1]
multi_character_phones_htk.sort(key=len, reverse=True)

View File

@ -1,11 +1,11 @@
""" definition of the phones to be used. """
## phones in IPA.
phoneset_ipa = [
phoneset = [
# vowels
'',
'i̯ⁿ',
'y',
'y:', # not included in lex.ipa, but in stimmen.
'i',
'i.',
'iⁿ',
@ -14,7 +14,7 @@ phoneset_ipa = [
'ɪ',
'ɪⁿ',
'ɪ.',
#'ɪ:', # not included in lex.ipa
'ɪ:', # not included in lex.ipa, but in stimmen.
'ɪ:ⁿ',
'e',
'e:',
@ -35,7 +35,7 @@ phoneset_ipa = [
'',
'ṷ.',
'ṷⁿ',
#'ú', # only appears in word 'feeste'(út) and 'gaste'(út) which are 'f e: s t ə' and 'yn' in lex_asr.
#'ú', # only appears in word 'feeste'(út) and 'gaste'(út) which are 'f e: s t ə' and 'yn' in lex_asr. The pronunciation in Fries may be mistakes so I removed this phone.
'u',
'uⁿ',
'u.',
@ -61,7 +61,7 @@ phoneset_ipa = [
'ɔⁿ',
'ɔ:',
'ɔ:ⁿ',
#'ɔ̈', # not included in lex.ipa
'ɔ̈', # not included in lex.ipa
'ɔ̈.',
'ɔ̈:',
@ -101,7 +101,38 @@ phoneset_ipa = [
'l'
]
## reduce the number of phones.
# the phones which are used in stimmen transcription but not in FAME corpus.
# replacements are based on the advice from Jelske Dijkstra on 2018/06/21.
stimmen_replacement = {
'æ': 'ɛ',
'ø': 'ö', # or 'ö:'
'ø:': 'ö:', # Aki added.
'œ': 'ɔ̈', # or 'ɔ̈:'
'œ:': 'ɔ̈:', # Aki added.
'ɐ': 'a', # or 'a:'
'ɐ:': 'a:', # Aki added.
'ɑ': 'a', # or 'a:'
'ɑ:': 'a:', # Aki added
'ɒ': 'ɔ', # or 'ɔ:'
'ɒ:': 'ɔ:', # Aki added.
'ɾ': 'r',
'ʁ': 'r',
'ʊ': 'u',
'χ': 'x',
# aki guessed.
'ʀ': 'r',
'ɹ': 'r',
'w': 'ö'
}
phoneset.extend(list(stimmen_replacement.keys()))
def phone_reduction(phones):
    """Replace stimmen-only phones by their FAME counterparts.

    Args:
        phones (list): list of phones.

    Returns:
        list: the phones with every key of stimmen_replacement substituted
            by its value; other phones are kept unchanged.
    """
    reduced = []
    for phone in phones:
        reduced.append(stimmen_replacement.get(phone, phone))
    return reduced
## the list of multi character phones.
# for example, the length of 'i̯ⁿ' is 3, but in the codes it is treated as one letter.
multi_character_phones_ipa = [i for i in phoneset_ipa if len(i) > 1]
multi_character_phones_ipa.sort(key=len, reverse=True)
multi_character_phones = [i for i in phoneset if len(i) > 1]
multi_character_phones.sort(key=len, reverse=True)

Binary file not shown.

View File

@ -0,0 +1,197 @@
import sys
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import fame_functions
from phoneset import fame_ipa, fame_asr
import convert_phoneset
## phonetic groups, written as comma separated IPA phones.
# each string is split on ',' (after removing spaces) by _ipa2quest(),
# so every phone MUST be separated by a comma.

## general
stop = 'p, b, t, d, k, g'
nasal = 'm, n, ŋ'
fricative = 's, z, f, v, h, x, j'
liquid = 'l, r'
vowel = 'a, a:, e:, i, i:, i̯, o, o:, u, u:, ṷ, ö, ö:, ü, ü:, ɔ, ɔ:, ɔ̈, ə, ɛ, ɛ:, ɪ, ɪ:'

## consonant
c_front = 'p, b, m, f, v'
c_central = 't, d, n, s, z, l, r'
c_back = 'k, g, ŋ, h, x, j'

fortis = 'p, t, k, f, s'
lenis = 'b, d, g, v, z, j'
neither_fortis_nor_lenis = 'm, n, ŋ, h, l, r, x'

coronal = 't, d, n, s, z, l, r, j'
non_coronal = 'p, b, m, k, g, ŋ, f, v, h, x'

anterior = 'p, b, m, t, d, n, f, v, s, z, l'
non_anterior = 'k, g, ŋ, h, x, j, r'

continuent = 'm, n, ŋ, f, v, s, z, h, l, r'
non_continuent = 'p, b, t, d, k, g, x, j'

strident = 's, z, j'
non_strident = 'f, v, h'
unstrident = 'p, b, t, d, m, n, ŋ, k, g, r, x'

glide = 'h, l, r'
syllabic = 'm, l, ŋ'

unvoiced = 'p, t, k, s, f, x, h'
voiced = 'b, d, g, z, v, m, n, ŋ, l, r, j'

#affricate: ???
non_affricate = 's, z, f, v'

voiced_stop = 'b, d, g'
unvoiced_stop = 'p, t, k'
front_stop = 'p, b'
central_stop = 't, d'
back_stop = 'k, g'

voiced_fricative = 'z, v'
unvoiced_fricative = 's, f'
front_fricative = 'f, v'
central_fricative = 's, z'
back_fricative = 'j'

## vowel
v_front = 'i, i:, i̯, ɪ, ɪ:, e:, ə, ɛ, ɛ:, a, a:'
v_central = 'ə, ɛ, ɛ:, a, a:'
v_back = 'u, u:, ü, ü:, ṷ, ɔ, ɔ:, ɔ̈, ö, ö:, o, o:'

long = 'a:, e:, i:, o:, u:, ö:, ü:, ɔ:, ɛ:, ɪ:'
short = 'a, i, i̯, o, u, ṷ, ö, ü, ɔ, ɔ̈, ə, ɛ, ɪ'

#Dipthong: ???
#Front-Start: ???
#Fronting: ???

# a comma was missing between 'ɪ:' and 'u', which produced the bogus phone 'ɪ:u'.
high = 'i, i:, i̯, ɪ, ɪ:, u, u:, ṷ, ə, e:, o, o:, ö, ö:, ü, ü:'
medium = 'e:, ə, ɛ, ɛ:, ɔ, ɔ:, ɔ̈, o, o:, ö, ö:'
low = 'a, a:, ɛ, ɛ:, ɔ, ɔ:, ɔ̈'

rounded = 'a, a:, o, o:, u, u:, ṷ, ö, ö:, ü, ü:, ɔ, ɔ:, ɔ̈'
unrounded = 'i, i:, i̯, e:, ə, ɛ, ɛ:, ɪ, ɪ:'

i_vowel = 'i, i:, i̯, ɪ, ɪ:'
e_vowel = 'e:,ə, ɛ, ɛ:'
a_vowel = 'a, a:'
o_vowel = 'o, o:, ö, ö:, ɔ, ɔ:, ɔ̈'
u_vowel = 'u, u:, ṷ, ü, ü:'
## htk phoneset
phoneset = fame_asr.phoneset_htk
## convert ipa group to htk format for quests.hed.
def _ipa2quest(R_or_L, ipa_text):
assert R_or_L in ['R', 'L'], print('the first argument should be either R or L.')
ipa_list = ipa_text.replace(' ', '').split(',')
if R_or_L == 'R':
quests_list = ['*+' + fame_functions.ipa2htk(ipa) for ipa in ipa_list]
else:
quests_list = [fame_functions.ipa2htk(ipa) + '-*' for ipa in ipa_list]
return ','.join(quests_list)
def make_quests_hed(quest_hed):
    """Write the question file (QS lines) used to tie triphones.

    For each context side ('R' then 'L') one QS line is written per
    phonetic group, followed by one QS line per phone in `phoneset`.
    An existing file at `quest_hed` is replaced.

    Args:
        quest_hed (path): output file; written as ascii bytes.

    Raises:
        AssertionError: if an invalid context side is passed internally.
        UnicodeEncodeError: if a converted item cannot be encoded as ascii.
    """
    def _union(*groups):
        # groups are comma separated strings; joining them without ', '
        # would fuse the last phone of one group with the first phone of
        # the next (e.g. 'v' + 'i' -> 'vi').
        return ', '.join(groups)

    def _quests_line(R_or_L, item_name_, ipa_text):
        assert R_or_L in ['R', 'L'], 'the first argument should be either R or L.'
        item_name = R_or_L + '_' + item_name_
        return bytes('QS "' + item_name + '"\t{ ' + _ipa2quest(R_or_L, ipa_text) + ' }\n', 'ascii')

    question_groups = [
        ('NonBoundary', '*'),
        ('Silence', 'sil'),

        ('Stop', stop),
        ('Nasal', nasal),
        ('Fricative', fricative),
        ('Liquid', liquid),
        ('Vowel', vowel),

        ('C-Front', c_front),
        ('C-Central', c_central),
        ('C-Back', c_back),

        ('V-Front', v_front),
        ('V-Central', v_central),
        ('V-Back', v_back),

        ('Front', _union(c_front, v_front)),
        ('Central', _union(c_central, v_central)),
        # used to be c_front + v_back (copy-paste of the 'Front' line).
        ('Back', _union(c_back, v_back)),

        ('Fortis', fortis),
        ('Lenis', lenis),
        ('UnFortLenis', neither_fortis_nor_lenis),

        ('Coronal', coronal),
        ('NonCoronal', non_coronal),

        ('Anterior', anterior),
        ('NonAnterior', non_anterior),

        ('Continuent', continuent),
        ('NonContinuent', non_continuent),

        ('Strident', strident),
        ('NonStrident', non_strident),
        ('UnStrident', unstrident),

        ('Glide', glide),
        ('Syllabic', syllabic),

        ('Unvoiced-Cons', unvoiced),
        ('Voiced-Cons', voiced),
        ('Unvoiced-All', _union(unvoiced, 'sil')),

        ('Long', long),
        ('Short', short),

        #('Dipthong', xxx),
        #('Front-Start', xxx),
        #('Fronting', xxx),

        ('High', high),
        ('Medium', medium),
        ('Low', low),

        ('Rounded', rounded),
        ('UnRounded', unrounded),

        #('Affricative', rounded),
        ('NonAffricative', non_affricate),

        ('IVowel', i_vowel),
        ('EVowel', e_vowel),
        ('AVowel', a_vowel),
        ('OVowel', o_vowel),
        ('UVowel', u_vowel),

        ('Voiced-Stop', voiced_stop),
        ('UnVoiced-Stop', unvoiced_stop),

        ('Front-Stop', front_stop),
        ('Central-Stop', central_stop),
        ('Back-Stop', back_stop),

        ('Voiced-Fric', voiced_fricative),
        ('UnVoiced-Fric', unvoiced_fricative),

        ('Front-Fric', front_fricative),
        ('Central-Fric', central_fricative),
        ('Back-Fric', back_fricative),
        ]

    # 'wb' truncates an existing file, which replaces the former
    # remove-then-append-per-item sequence with a single open.
    with open(quest_hed, 'wb') as f:
        for R_or_L in ['R', 'L']:
            for item_name, ipa_text in question_groups:
                f.write(_quests_line(R_or_L, item_name, ipa_text))

            for p in phoneset:
                f.write(_quests_line(R_or_L, p, p))

    return

View File

@ -0,0 +1,119 @@
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import glob
import pandas as pd
import convert_xsampa2ipa
import defaultfiles as default
import fame_functions
import novoapi_functions
def _load_transcriptions():
    """Load the stimmen transcriptions and add an IPA column.

    Reads sheet 'original' of default.stimmen_transcription_xlsx and
    converts the 'Self Xsampa' column with convert_xsampa2ipa.

    Returns:
        pandas.DataFrame: columns 'filename', 'word', 'xsampa' and 'ipa';
            rows whose ipa contains '/' and rows with missing values
            are dropped.
    """
    workbook = pd.ExcelFile(default.stimmen_transcription_xlsx)
    df = pd.read_excel(workbook, 'original')

    # converter loaded with ('xsampa', 'ipa'); used below to turn xsampa into ipa.
    mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)

    def _to_ipa(xsampa):
        # empty cells are read as float NaN.
        if isinstance(xsampa, float):
            return ''
        # typo?
        cleaned = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t').replace(';', ':')
        converted = convert_xsampa2ipa.xsampa2ipa(mapping, cleaned)
        return converted.replace('ː', ':').replace(' ', '')

    ipas = [_to_ipa(xsampa) for xsampa in df['Self Xsampa']]

    transcriptions = pd.DataFrame({'filename': df['Filename'],
                                   'word': df['Word'],
                                   'xsampa': df['Self Xsampa'],
                                   'ipa': pd.Series(ipas)})

    # not valid inputs, but separator.
    is_separator = transcriptions['ipa'].str.contains('/')
    return transcriptions[~is_separator].dropna()
def load_transcriptions():
    """Load the transcriptions whose wav file exists.

    Keeps the rows of _load_transcriptions() whose 'filename' is a string
    naming an existing file in default.stimmen_wav_dir.

    Returns:
        pandas.DataFrame: the selected rows, re-indexed from 0.
    """
    df = _load_transcriptions()

    # select all rows at once instead of DataFrame.append in a loop, which
    # copies the frame on every call and no longer exists in pandas >= 2.0.
    has_wav = pd.Series(
        [isinstance(filename, str)
         and os.path.exists(os.path.join(default.stimmen_wav_dir, filename))
         for filename in df['filename']],
        index=df.index, dtype=bool)
    return df[has_wav].reset_index(drop=True)
def load_transcriptions_clean(clean_wav_dir):
    """Load the transcriptions of the wav files found in clean_wav_dir.

    Args:
        clean_wav_dir (path): directory holding the retained '*.wav' files.

    Returns:
        pandas.DataFrame: rows of _load_transcriptions() whose 'filename'
            equals the name of a wav file in clean_wav_dir, grouped in the
            order the files are listed; the original index is kept.
    """
    df = _load_transcriptions()
    wav_file_list = glob.glob(os.path.join(clean_wav_dir, '*.wav'))

    # compare by equality: Series.str.match() would interpret the file name
    # as a regular expression anchored only at the start, so '.' matched any
    # character and longer names sharing the prefix were selected as well.
    selected = [df[df['filename'] == os.path.basename(wav_file)]
                for wav_file in wav_file_list]
    if not selected:
        return pd.DataFrame(index=[], columns=list(df.keys()))
    return pd.concat(selected)
def load_transcriptions_novo70(clean_wav_dir):
    """Extract the rows whose ipa can be written in the novo70 phoneset.

    Args:
        clean_wav_dir (path): passed to load_transcriptions_clean().

    Returns:
        pandas.DataFrame: rows for which
            novoapi_functions.phones_not_in_novo70(ipa) is empty,
            re-indexed from 0.
    """
    df = load_transcriptions_clean(clean_wav_dir)

    # one boolean selection instead of DataFrame.append in a loop, which
    # copies the frame on every call and no longer exists in pandas >= 2.0.
    in_novo70 = pd.Series(
        [len(novoapi_functions.phones_not_in_novo70(ipa)) == 0
         for ipa in df['ipa']],
        index=df.index, dtype=bool)
    return df[in_novo70].reset_index(drop=True)
def add_row_htk(df):
    """Return df with a column 'htk' converted from df['ipa'].

    Args:
        df (pandas.DataFrame): must contain the column 'ipa'.

    Returns:
        pandas.DataFrame: new frame from df.assign with
            fame_functions.ipa2htk applied to every ipa entry.
    """
    return df.assign(htk=[fame_functions.ipa2htk(ipa) for ipa in df['ipa']])
def add_row_asr(df):
    """Return df with a column 'asr' converted from df['ipa'].

    Args:
        df (pandas.DataFrame): must contain the column 'ipa'.

    Returns:
        pandas.DataFrame: new frame from df.assign with
            fame_functions.ipa2asr applied to every ipa entry.
    """
    return df.assign(asr=[fame_functions.ipa2asr(ipa) for ipa in df['ipa']])
def load_pronunciations(WORD, htk_dic):
    """ load pronunciation variants from HTK dic file.

    Args:
        WORD (str): word in capital letters.
        htk_dic (path): HTK dict file.

    Returns:
        (pronunciations) (list): pronunciation variants of WORD,
            with every ' sil' removed.

    Notes:
        The file is read line by line, so the whole lexicon is never
        held in memory at once.

    """
    pronunciations = []
    with open(htk_dic) as f:
        for line in f:
            # ' sil' contains no newline, so removing it per line gives the
            # same result as removing it from the whole file content.
            entry = line.rstrip('\n').replace(' sil', '')
            # text before the first space is the word, the rest its phones.
            word, _, phones = entry.partition(' ')
            if word == WORD:
                pronunciations.append(phones)
    return pronunciations

View File

@ -0,0 +1,93 @@
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import sys
import shutil
from collections import Counter
import numpy as np
import pandas as pd
import defaultfiles as default
import convert_xsampa2ipa
import stimmen_functions
import fame_functions
import convert_phoneset
from phoneset import fame_ipa, fame_asr
sys.path.append(default.toolbox_dir)
import file_handling as fh
from htk import pyhtk
## ======================= user define =======================
## ======================= make test data ======================
stimmen_test_dir = r'c:\OneDrive\Research\rug\_data\stimmen_test'

## copy wav files which is in the stimmen data.
df = stimmen_functions.load_transcriptions()
#for index, row in df.iterrows():
#    filename = row['filename']
#    wav_file = os.path.join(default.stimmen_wav_dir, filename)
#    shutil.copy(wav_file, os.path.join(stimmen_test_dir, filename))

# after manually removed files which has too much noise and multiple words...
# update the info.
df_clean = stimmen_functions.load_transcriptions_clean(stimmen_test_dir)

# count how many files are removed due to the quality.
word_list = [i for i in list(set(df['word'])) if not pd.isnull(i)]
word_list = sorted(word_list)
for word in word_list:
    df_ = df[df['word']==word]
    df_clean_ = df_clean[df_clean['word']==word]
    print('word {0} has {1} clean files among {2} files ({3:.2f} [%]).'.format(
        word, len(df_clean_), len(df_), len(df_clean_)/len(df_)*100))


## check phones included in stimmen but not in FAME!
splitted_ipas = [' '.join(
    convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones))
    for ipa in df['ipa']]
# collect phones, not characters: set() applied to the joined string would
# break every multi character phone (e.g. 'a:') into single characters.
stimmen_phones = {phone
                  for splitted_ipa in splitted_ipas
                  for phone in splitted_ipa.split(' ')
                  if phone}
stimmen_phones = list(stimmen_phones)
fame_phones = fame_ipa.phoneset
stimmen_phones.sort()
fame_phones.sort()
print('phones which are used in stimmen transcription but not in FAME corpus are:\n{}'.format(
    set(stimmen_phones) - set(fame_phones)
    ))
# show the transcriptions in which ':' was split off as a phone of its own.
for ipa in df['ipa']:
    ipa_splitted = convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones)
    if ':' in ipa_splitted:
        print(ipa_splitted)


## check pronunciation variants
df_clean = stimmen_functions.load_transcriptions_clean(stimmen_test_dir)
df_clean = stimmen_functions.add_row_asr(df_clean)
df_clean = stimmen_functions.add_row_htk(df_clean)

for word in word_list:
    #word = word_list[1]
    df_ = df_clean[df_clean['word']==word]
    c = Counter(df_['htk'])
    # keep only the variants which occur more than 3 times.
    pronunciations = dict()
    for key, value in zip(c.keys(), c.values()):
        if value > 3:
            pronunciations[key] = value
    print(pronunciations)


monophone_mlf = os.path.join(default.htk_dir, 'label', 'train_phone_aligned.mlf')
triphone_mlf = os.path.join(default.htk_dir, 'label', 'train_triphone.mlf')
def filenames_in_mlf(file_mlf):
    """List the file entries found in an mlf file.

    Lines without any space other than '.' are taken as candidates; the
    first and the last candidate are discarded, and '"' and '*/' are
    stripped from the remaining ones.

    Args:
        file_mlf (path): mlf file to be read.

    Returns:
        list: the extracted names, in file order.
    """
    with open(file_mlf) as f:
        content = f.read()

    candidates = []
    for line in content.split('\n'):
        if line == '.' or len(line.split(' ')) != 1:
            continue
        candidates.append(line)

    filenames = []
    for entry in candidates[1:-1]:
        filenames.append(entry.replace('"', '').replace('*/', ''))
    return filenames
filenames_mono = filenames_in_mlf(monophone_mlf)
filenames_tri = filenames_in_mlf(triphone_mlf)