Compare commits

..

18 Commits

Author SHA1 Message Date
97486e5599 dataset for experiments in check_novoapi is updated. 2019-04-22 02:03:50 +02:00
2004399179 novoapi_functions.py is adjusted to use convert_phoneset.py. 2019-04-22 00:59:53 +02:00
b444b70af9 fame_phonetics.py and functions to make quests.hed to tie triphone are added. 2019-03-25 00:06:53 +01:00
bf586fcde5 triphone training is added. 2019-03-23 21:52:48 +01:00
fdd165ce6a The re-aligned mlf file includes fewer files than the original mlf file, so the scp file must also be updated accordingly when re-estimation is performed. This bug is fixed. 2019-03-08 23:13:08 +01:00
fa81b70b27 monophone training is completed. 2019-03-07 22:16:50 +01:00
41d4fa5ff9 sp is added to the model. 2019-03-05 00:11:38 +01:00
b1b1942fa0 test on stimmen data is added. 2019-03-03 02:05:37 +01:00
c185072d5b label alignment using HVite is added. 2019-02-14 00:21:28 +01:00
8f89f60538 dataset is made. 2019-02-08 14:10:32 +01:00
f6e563ecd3 moved testing parts in htk_vs_kaldi into stimmen_test.py 2019-02-06 09:35:23 +01:00
da0242b0e1 make sure all the phones in stimmen transcription can be treated correctly. 2019-02-06 00:00:14 +01:00
ab3887c6ca sp is added to the model. 2019-02-04 20:32:12 +01:00
f6e7c8eefa bug related encoding on label file is fixed. 2019-02-04 13:46:27 +01:00
322a8a0079 label files are extracted. hcompv_scp is made. 2019-02-03 13:54:37 +01:00
22cccfb61d fix the bug that the lexicon contains characters which cannot be represented in ascii. 2019-02-03 00:34:35 +01:00
dc6b7b84b6 lexicon is made. 2019-01-29 21:52:11 +01:00
8cda93de75 fame_asr phoneset is added including reduced version and htk compatible version. 2019-01-28 12:34:20 +01:00
21 changed files with 2158 additions and 1033 deletions

Binary file not shown.

View File

@ -4,8 +4,7 @@
<SchemaVersion>2.0</SchemaVersion> <SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>4d8c8573-32f0-4a62-9e62-3ce5cc680390</ProjectGuid> <ProjectGuid>4d8c8573-32f0-4a62-9e62-3ce5cc680390</ProjectGuid>
<ProjectHome>.</ProjectHome> <ProjectHome>.</ProjectHome>
<StartupFile> <StartupFile>check_novoapi.py</StartupFile>
</StartupFile>
<SearchPath> <SearchPath>
</SearchPath> </SearchPath>
<WorkingDirectory>.</WorkingDirectory> <WorkingDirectory>.</WorkingDirectory>
@ -23,7 +22,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Compile Include="check_novoapi.py" /> <Compile Include="check_novoapi.py" />
<Compile Include="convert_phone_set.py"> <Compile Include="convert_phoneset.py">
<SubType>Code</SubType> <SubType>Code</SubType>
</Compile> </Compile>
<Compile Include="convert_xsampa2ipa.py"> <Compile Include="convert_xsampa2ipa.py">
@ -32,7 +31,7 @@
<Compile Include="defaultfiles.py"> <Compile Include="defaultfiles.py">
<SubType>Code</SubType> <SubType>Code</SubType>
</Compile> </Compile>
<Compile Include="fame_phoneset.py"> <Compile Include="fame_test.py">
<SubType>Code</SubType> <SubType>Code</SubType>
</Compile> </Compile>
<Compile Include="fa_test.py"> <Compile Include="fa_test.py">
@ -50,9 +49,25 @@
<SubType>Code</SubType> <SubType>Code</SubType>
</Compile> </Compile>
<Compile Include="fame_hmm.py" /> <Compile Include="fame_hmm.py" />
<Compile Include="phoneset\fame_asr.py" />
<Compile Include="phoneset\fame_ipa.py" />
<Compile Include="phoneset\fame_phonetics.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="stimmen_functions.py" />
<Compile Include="stimmen_test.py" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Content Include="config.ini" /> <Content Include="config.ini" />
<Content Include="phoneset\fame_ipa2asr.npy" />
<Content Include="phoneset\output_get_translation_key_phone_unknown.npy" />
<Content Include="phoneset\output_get_translation_key_translation_key.npy" />
<Content Include="phoneset\__pycache__\fame_asr.cpython-36.pyc" />
<Content Include="phoneset\__pycache__\fame_ipa.cpython-36.pyc" />
</ItemGroup>
<ItemGroup>
<Folder Include="phoneset\" />
<Folder Include="phoneset\__pycache__\" />
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Python Tools\Microsoft.PythonTools.targets" /> <Import Project="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Python Tools\Microsoft.PythonTools.targets" />
<!-- Uncomment the CoreCompile target to enable the Build command in <!-- Uncomment the CoreCompile target to enable the Build command in

View File

@ -20,57 +20,56 @@ from forced_alignment import convert_phone_set
#import acoustic_model_functions as am_func #import acoustic_model_functions as am_func
import convert_xsampa2ipa import convert_xsampa2ipa
import novoapi_functions import novoapi_functions
import stimmen_functions
sys.path.append(default.accent_classification_dir) sys.path.append(default.accent_classification_dir)
import output_confusion_matrix import output_confusion_matrix
## procedure ## procedure
forced_alignment_novo70 = True forced_alignment_novo70 = True
balance_sample_numbers = False
## ===== load novo phoneset ===== ## ===== load novo phoneset =====
phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa = novoapi_functions.load_phonset() phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa = novoapi_functions.load_novo70_phoneset()
## ===== extract pronunciations written in novo70 only (not_in_novo70) ===== ## ===== extract pronunciations written in novo70 only (not_in_novo70) =====
# As per Nederlandse phoneset_aki.xlsx recieved from David
# [ɔː] oh / ohr
# [ɪː] ih / ihr
# [iː] iy
# [œː] uh
# [ɛː] eh
# [w] wv in IPA written as ʋ.
david_suggestion = ['ɔː', 'ɪː', 'iː', 'œː', 'ɛː', 'w']
## read pronunciation variants. ## read pronunciation variants.
stimmen_transcription_ = pd.ExcelFile(default.stimmen_transcription_xlsx) #stimmen_transcription_ = pd.ExcelFile(default.stimmen_transcription_xlsx)
df = pd.read_excel(stimmen_transcription_, 'frequency') #df = pd.read_excel(stimmen_transcription_, 'frequency')
transcription_ipa = list(df['IPA']) #transcription_ipa = list(df['IPA'])
# transcription mistake?
transcription_ipa = [ipa.replace(';', 'ː') for ipa in transcription_ipa if not ipa=='pypɪl' and not pd.isnull(ipa)]
transcription_ipa = [ipa.replace('ˑ', '') for ipa in transcription_ipa] # only one case.
not_in_novo70 = [] stimmen_test_dir = r'c:\OneDrive\Research\rug\_data\stimmen_test'
all_in_novo70 = [] df = stimmen_functions.load_transcriptions_novo70(stimmen_test_dir)
for ipa in transcription_ipa:
ipa = ipa.replace(':', 'ː')
ipa = convert_phone_set.split_ipa(ipa)
# list of phones not in novo70 phoneset.
not_in_novo70_ = [phone for phone in ipa
if not phone in phoneset_ipa and not phone in david_suggestion]
not_in_novo70_ = [phone.replace('sp', '') for phone in not_in_novo70_]
not_in_novo70_ = [phone.replace(':', '') for phone in not_in_novo70_]
not_in_novo70_ = [phone.replace('ː', '') for phone in not_in_novo70_]
if len(not_in_novo70_) == 0:
all_in_novo70.append(''.join(ipa))
#translation_key.get(phone, phone) ## transcription mistake?
not_in_novo70.extend(not_in_novo70_) #transcription_ipa = [ipa.replace(';', 'ː') for ipa in transcription_ipa if not ipa=='pypɪl' and not pd.isnull(ipa)]
not_in_novo70_list = list(set(not_in_novo70)) #transcription_ipa = [ipa.replace('ˑ', '') for ipa in transcription_ipa] # only one case.
#not_in_novo70 = []
#all_in_novo70 = []
#for ipa in transcription_ipa:
# ipa = ipa.replace(':', 'ː')
# ipa = convert_phone_set.split_ipa(ipa)
# # list of phones not in novo70 phoneset.
# not_in_novo70_ = [phone for phone in ipa
# if not phone in phoneset_ipa and not phone in david_suggestion]
# not_in_novo70_ = [phone.replace('sp', '') for phone in not_in_novo70_]
# not_in_novo70_ = [phone.replace(':', '') for phone in not_in_novo70_]
# not_in_novo70_ = [phone.replace('ː', '') for phone in not_in_novo70_]
# if len(not_in_novo70_) == 0:
# all_in_novo70.append(''.join(ipa))
# #translation_key.get(phone, phone)
# not_in_novo70.extend(not_in_novo70_)
#not_in_novo70_list = list(set(not_in_novo70))
## check which phones used in stimmen but not in novo70 ## check which phones used in stimmen but not in novo70
@ -85,70 +84,43 @@ not_in_novo70_list = list(set(not_in_novo70))
# [ʊ] 'ʊ'(1) --> can be ʏ (uh)?? # [ʊ] 'ʊ'(1) --> can be ʏ (uh)??
# [χ] --> can be x?? # [χ] --> can be x??
def search_phone_ipa(x, phone_list): #def search_phone_ipa(x, phone_list):
x_in_item = [] # x_in_item = []
for ipa in phone_list: # for ipa in phone_list:
ipa_original = ipa # ipa_original = ipa
ipa = ipa.replace(':', 'ː') # ipa = ipa.replace(':', 'ː')
ipa = convert_phone_set.split_ipa(ipa) # ipa = convert_phone_set.split_ipa(ipa)
if x in ipa and not x+':' in ipa: # if x in ipa and not x+':' in ipa:
x_in_item.append(ipa_original) # x_in_item.append(ipa_original)
return x_in_item # return x_in_item
#search_phone_ipa('ø', transcription_ipa) #search_phone_ipa('ø', transcription_ipa)
## ===== load all transcriptions (df) ===== ## ===== load all transcriptions (df) =====
df = pd.read_excel(stimmen_transcription_, 'original') #df = stimmen_functions.load_transcriptions()
# mapping from ipa to xsampa
mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)
#for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']):
# ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
# if not ipa_converted == ipa:
# print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa))
ipas = []
famehtks = []
for xsampa in df['Self Xsampa']:
if not isinstance(xsampa, float): # 'NaN'
# typo?
xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t')
xsampa = xsampa.replace(';', ':')
ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
ipa = ipa.replace('ː', ':')
ipa = ipa.replace(' ', '')
ipas.append(ipa)
else:
ipas.append('')
# extract interesting cols.
df = pd.DataFrame({'filename': df['Filename'],
'word': df['Word'],
'xsampa': df['Self Xsampa'],
'ipa': pd.Series(ipas)})
word_list = [i for i in list(set(df['word'])) if not pd.isnull(i)] word_list = [i for i in list(set(df['word'])) if not pd.isnull(i)]
word_list = sorted(word_list) word_list = sorted(word_list)
## check frequency of each pronunciation variants ## check frequency of each pronunciation variants
cols = ['word', 'ipa', 'frequency'] #cols = ['word', 'ipa', 'frequency']
df_samples = pd.DataFrame(index=[], columns=cols) #df_samples = pd.DataFrame(index=[], columns=cols)
for ipa in all_in_novo70: #for ipa in all_in_novo70:
ipa = ipa.replace('ː', ':') # ipa = ipa.replace('ː', ':')
samples = df[df['ipa'] == ipa] # samples = df[df['ipa'] == ipa]
word = list(set(samples['word']))[0] # word = list(set(samples['word']))[0]
samples_Series = pd.Series([word, ipa, len(samples)], index=df_samples.columns) # samples_Series = pd.Series([word, ipa, len(samples)], index=df_samples.columns)
df_samples = df_samples.append(samples_Series, ignore_index=True) # df_samples = df_samples.append(samples_Series, ignore_index=True)
# each word # each word
df_per_word = pd.DataFrame(index=[], columns=df_samples.keys()) #df_per_word = pd.DataFrame(index=[], columns=df_samples.keys())
for word in word_list: #for word in word_list:
df_samples_ = df_samples[df_samples['word']==word] word = word_list[2]
df_samples_ = df_samples_[df_samples_['frequency']>2] df_ = df[df['word']==word]
df_per_word = df_per_word.append(df_samples_, ignore_index=True) np.unique(list(df_['ipa']))
#df_samples_ = df_samples_[df_samples_['frequency']>2]
#df_per_word = df_per_word.append(df_samples_, ignore_index=True)
#df_per_word.to_excel(os.path.join(default.stimmen_dir, 'pronunciation_variants_novo70.xlsx'), encoding="utf-8") #df_per_word.to_excel(os.path.join(default.stimmen_dir, 'pronunciation_variants_novo70.xlsx'), encoding="utf-8")
@ -184,21 +156,6 @@ if forced_alignment_novo70:
# samples in which all pronunciations are written in novo70. # samples in which all pronunciations are written in novo70.
samples = df_.query("ipa in @pronunciation_ipa") samples = df_.query("ipa in @pronunciation_ipa")
## ===== balance sample numbers =====
if balance_sample_numbers:
c = Counter(samples['ipa'])
sample_num_list = [c[key] for key in c.keys()]
sample_num = np.min(sample_num_list)
samples_balanced = pd.DataFrame(index=[], columns=list(samples.keys()))
for key in c.keys():
samples_ = samples[samples['ipa'] == key]
samples_balanced = samples_balanced.append(samples_.sample(sample_num), ignore_index = True)
samples = samples_balanced
results = pd.DataFrame(index=[], results = pd.DataFrame(index=[],
columns=['filename', 'word', 'xsampa', 'ipa', 'result_ipa', 'result_novo70', 'llh']) columns=['filename', 'word', 'xsampa', 'ipa', 'result_ipa', 'result_novo70', 'llh'])

View File

@ -1,29 +0,0 @@
"""Module to convert phonemes."""
def multi_character_tokenize(line, multi_character_tokens):
    """Split ``line`` into tokens, preferring the given multi-character tokens.

    At every position the candidate tokens are tried in the order given; the
    first one that matches is emitted and consumed. When none matches, a
    single character is emitted instead.

    Args:
        line (str): the text to tokenize.
        multi_character_tokens (iterable of str): candidate tokens, tried in
            the given order. Empty strings are ignored.

    Yields:
        str: the next token of ``line``.
    """
    # Materialise once: the candidates are rescanned at every position, so a
    # one-shot iterator would otherwise be exhausted after the first scan.
    tokens = tuple(token for token in multi_character_tokens if token)

    # Walk an index instead of re-slicing the remaining text at every step.
    position = 0
    end = len(line)
    while position < end:
        for token in tokens:
            if line.startswith(token, position):
                yield token
                position += len(token)
                break
        else:
            # no multi-character token starts here.
            yield line[position]
            position += 1
def split_word(word, multi_character_phones):
    """Split a word into phones.

    Args:
        word (str): a word written in a given phoneset. Leading and trailing
            whitespace is stripped before splitting.
        multi_character_phones (list): the multi-character phones which
            should be treated as one phone.

    Returns:
        (word_separated) (list): the word split into phones.
    """
    stripped = word.strip()
    return list(multi_character_tokenize(stripped, multi_character_phones))

View File

@ -0,0 +1,58 @@
"""Module to convert phonemes."""
def multi_character_tokenize(line, multi_character_tokens):
    """Split ``line`` into tokens, preferring the given multi-character tokens.

    At every position the candidate tokens are tried in the order given; the
    first one that matches is emitted and consumed. When none matches, a
    single character is emitted instead.

    Args:
        line (str): the text to tokenize.
        multi_character_tokens (iterable of str): candidate tokens, tried in
            the given order. Empty strings are ignored.

    Yields:
        str: the next token of ``line``.
    """
    # Materialise once: the candidates are rescanned at every position, so a
    # one-shot iterator would otherwise be exhausted after the first scan.
    tokens = tuple(token for token in multi_character_tokens if token)

    # Walk an index instead of re-slicing the remaining text at every step.
    position = 0
    end = len(line)
    while position < end:
        for token in tokens:
            if line.startswith(token, position):
                yield token
                position += len(token)
                break
        else:
            # no multi-character token starts here.
            yield line[position]
            position += 1
def split_word(word, phoneset):
    """Split a word into the phones of the given phoneset.

    Args:
        word (str): a word written in the given phoneset. Leading and
            trailing whitespace is stripped before splitting.
        phoneset (list): the list of phones. Only its multi-character
            members are used for matching.

    Returns:
        (word_separated) (list): the word split into phones.
    """
    multi_character_phones = extract_multi_character_phones(phoneset)
    stripped = word.strip()
    return list(multi_character_tokenize(stripped, multi_character_phones))
def convert_phoneset(word_list, translation_key):
    """Translate each phone with the translation key.

    Args:
        word_list (list): phones written in the source phoneset.
        translation_key (dict): mapping from source phone to target phone.
            Phones without an entry are kept unchanged.

    Returns:
        (list): the translated phones, in the same order.
    """
    converted = []
    for phone in word_list:
        converted.append(translation_key.get(phone, phone))
    return converted
def phone_reduction(phones, reduction_key, phones_to_be_removed=()):
    """Reduce a list of phones.

    Phones listed in ``phones_to_be_removed`` are dropped; the remaining ones
    are replaced according to ``reduction_key``.

    Args:
        phones (list): the phones to be reduced.
        reduction_key (dict): mapping from a phone to its reduced form.
            Phones without an entry are kept unchanged.
        phones_to_be_removed (collection): phones to be dropped. Nothing is
            dropped by default.

    Returns:
        (list): the reduced phones, in the same order.
    """
    # The previous body started with a stray tokenizer call on undefined
    # names, which raised NameError on every call; it has been removed.
    return [reduction_key.get(phone, phone)
            for phone in phones
            if phone not in phones_to_be_removed]
def extract_multi_character_phones(phoneset):
    """Pick the phones longer than one character, longest first.

    Args:
        phoneset (list): the list of phones.

    Returns:
        (list): the phones with more than one character, sorted by length in
            descending order.
    """
    longer_than_one = (phone for phone in phoneset if len(phone) > 1)
    return sorted(longer_than_one, key=len, reverse=True)

View File

@ -1,65 +1,42 @@
import os import os
# add path of the parent directory
#os.path.dirname(os.path.realpath(__file__))
#default_hvite_config = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data', 'htk', 'config.HVite') # repos
#cygwin_dir = r'C:\cygwin64\home\Aki\acoustic_model'
#htk_dir = r'C:\Aki\htk_fame'
htk_dir = r'c:\OneDrive\Research\rug\experiments\acoustic_model\fame\htk'
config_hcopy = os.path.join(htk_dir, 'config', 'config.HCopy')
#config_train = os.path.join(cygwin_dir, 'config', 'config.train')
#config_hvite = os.path.join(cygwin_dir, 'config', 'config.HVite')
#mkhmmdefs_pl = os.path.join(cygwin_dir, 'src', 'acoustic_model', 'mkhmmdefs.pl')
#dbLexicon = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\lexicon.accdb
#scriptBarbara = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\pronvars_barbara.perl
#exeG2P = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\string2phon.exe
#[pyHTK]
#configHVite = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\config.HVite
#filePhoneList = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\phonelist_barbara.txt
#AcousticModel = C:\\Users\\Aki\\source\\repos\\rug_VS\\forced_alignment\\config\\hmmdefs_16-2_barbara.compo
#dbLexicon = config['cLexicon']['dbLexicon']
#scriptBarbara = config['cLexicon']['scriptBarbara']
#exeG2P = config['cLexicon']['exeG2P']
#configHVite = config['pyHTK']['configHVite']
#filePhoneList = config['pyHTK']['filePhoneList']
#AcousticModel = config['pyHTK']['AcousticModel']
repo_dir = r'C:\Users\Aki\source\repos' repo_dir = r'C:\Users\Aki\source\repos'
ipa_xsampa_converter_dir = os.path.join(repo_dir, 'ipa-xsama-converter') ipa_xsampa_converter_dir = os.path.join(repo_dir, 'ipa-xsama-converter')
forced_alignment_module_dir = os.path.join(repo_dir, 'forced_alignment') forced_alignment_module_dir = os.path.join(repo_dir, 'forced_alignment')
accent_classification_dir = os.path.join(repo_dir, 'accent_classification', 'accent_classification') accent_classification_dir = os.path.join(repo_dir, 'accent_classification', 'accent_classification')
toolbox_dir = os.path.join(repo_dir, 'toolbox') toolbox_dir = os.path.join(repo_dir, 'toolbox')
#htk_config_dir = r'c:\Users\A.Kunikoshi\source\repos\forced_alignment\forced_alignment\data\htk\preset_models\aki_dutch_2017'
#config_hvite = os.path.join(htk_config_dir, 'config.HVite')
#acoustic_model = os.path.join(htk_config_dir, 'hmmdefs.compo')
#acoustic_model = r'c:\cygwin64\home\A.Kunikoshi\acoustic_model\model\barbara\hmm128-2\hmmdefs.compo'
#phonelist_txt = os.path.join(htk_config_dir, 'phonelist.txt')
WSL_dir = r'C:\OneDrive\WSL' WSL_dir = r'C:\OneDrive\WSL'
#fame_dir = os.path.join(WSL_dir, 'kaldi-trunk', 'egs', 'fame') novo_api_dir = os.path.join(WSL_dir, 'python-novo-api', 'novoapi')
fame_dir = r'd:\_corpus\fame' #novo_api_dir = r'c:\Python36-32\Lib\site-packages\novoapi'
fame_s5_dir = os.path.join(fame_dir, 's5') # working directories
fame_corpus_dir = os.path.join(fame_dir, 'corpus') rug_dir = r'c:\OneDrive\Research\rug'
experiments_dir = os.path.join(rug_dir, 'experiments')
experiments_dir = r'c:\OneDrive\Research\rug\experiments' htk_dir = os.path.join(experiments_dir, 'acoustic_model', 'fame', 'htk')
kaldi_dir = os.path.join(WSL_dir, 'kaldi-trunk', 'egs', '_stimmen')
stimmen_dir = os.path.join(experiments_dir, 'stimmen') stimmen_dir = os.path.join(experiments_dir, 'stimmen')
stimmen_data_dir = os.path.join(stimmen_dir, 'data')
# data
fame_dir = os.path.join(rug_dir, '_data', 'FAME')
#fame_dir = os.path.join(WSL_dir, 'kaldi-trunk', 'egs', 'fame')
# 44.1 kHz # 44.1 kHz
#stimmen_wav_dir = os.path.join(stimmen_dir, 'wav') #stimmen_wav_dir = os.path.join(stimmen_dir, 'wav')
# 16 kHz # 16 kHz
stimmen_wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' stimmen_wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen'
stimmen_result_novoapi_dir = os.path.join(stimmen_dir, 'result', 'novoapi') stimmen_transcription_xlsx = os.path.join(stimmen_dir, 'data', 'Frisian Variants Picture Task Stimmen.xlsx')
stimmen_transcription_xlsx = os.path.join(stimmen_data_dir, 'Frisian Variants Picture Task Stimmen.xlsx')
phonelist_friesian_txt = os.path.join(experiments_dir, 'friesian', 'acoustic_model', 'config', 'phonelist_friesian.txt') phonelist_friesian_txt = os.path.join(experiments_dir, 'friesian', 'acoustic_model', 'config', 'phonelist_friesian.txt')
novo_api_dir = os.path.join(WSL_dir, 'python-novo-api', 'novoapi')
#novo_api_dir = r'c:\Python36-32\Lib\site-packages\novoapi'
novo70_phoneset = os.path.join(novo_api_dir, 'asr', 'phoneset', 'nl', 'novo70.phoneset') novo70_phoneset = os.path.join(novo_api_dir, 'asr', 'phoneset', 'nl', 'novo70.phoneset')
#phonelist_txt = os.path.join(htk_dir, 'config', 'phonelist.txt')
#fame_s5_dir = os.path.join(fame_dir, 's5')
#fame_corpus_dir = os.path.join(fame_dir, 'corpus')
#stimmen_result_novoapi_dir = os.path.join(stimmen_dir, 'result', 'novoapi')
# novoapi_functions

View File

@ -9,37 +9,11 @@ import numpy as np
import pandas as pd import pandas as pd
import defaultfiles as default import defaultfiles as default
import fame_phoneset import convert_phoneset
import convert_phone_set from phoneset import fame_ipa, fame_asr
sys.path.append(default.toolbox_dir)
#def ipa2famehtk_lexicon(lexicon_file_in, lexicon_file_out): from htk import pyhtk
# """ Convert a lexicon file from IPA to HTK format for FAME! corpus. """
# lexicon_in = pd.read_table(lexicon_file_in, names=['word', 'pronunciation'])
# with open(lexicon_file_out, "w", encoding="utf-8") as fout:
# for word, pronunciation in zip(lexicon_in['word'], lexicon_in['pronunciation']):
# pronunciation_no_space = pronunciation.replace(' ', '')
# pronunciation_famehtk = convert_phone_set.ipa2famehtk(pronunciation_no_space)
# if 'ceh' not in pronunciation_famehtk and 'sh' not in pronunciation_famehtk:
# fout.write("{0}\t{1}\n".format(word.upper(), pronunciation_famehtk))
#def combine_lexicon(lexicon_file1, lexicon_file2, lexicon_out):
# """ Combine two lexicon files and sort by words. """
# with open(lexicon_file1, "rt", encoding="utf-8") as fin:
# lines1 = fin.read()
# lines1 = lines1.split('\n')
# with open(lexicon_file2, "rt", encoding="utf-8") as fin:
# lines2 = fin.read()
# lines2 = lines2.split('\n')
# lex1 = pd.read_table(lexicon_file1, names=['word', 'pronunciation'])
# lex2 = pd.read_table(lexicon_file2, names=['word', 'pronunciation'])
# lex = pd.concat([lex1, lex2])
# lex = lex.sort_values(by='word', ascending=True)
# lex.to_csv(lexicon_out, index=False, header=False, encoding="utf-8", sep='\t')
#def read_fileFA(fileFA): #def read_fileFA(fileFA):
@ -110,14 +84,6 @@ import convert_phone_set
# return ipa # return ipa
#def make_filelist(input_dir, output_txt):
# """ Make a list of files in the input_dir. """
# filenames = os.listdir(input_dir)
# with open(output_txt, 'w') as fout:
# for filename in filenames:
# fout.write(input_dir + '\\' + filename + '\n')
#def make_htk_dict(word, pronvar_, fileDic, output_type): #def make_htk_dict(word, pronvar_, fileDic, output_type):
# """ # """
@ -179,10 +145,11 @@ def make_hcopy_scp_from_filelist_in_fame(fame_dir, dataset, feature_dir, hcopy_s
fout.write(wav_file + '\t' + mfc_file + '\n') fout.write(wav_file + '\t' + mfc_file + '\n')
return
def load_lexicon(lexicon_file): def load_lexicon(lexicon_file):
""" load lexicon file as Data Frame. """ load lexicon file as data frame.
Args: Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'. lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
@ -196,25 +163,27 @@ def load_lexicon(lexicon_file):
return lex return lex
def get_phoneset_from_lexicon(lexicon_file, phoneset='asr'): def get_phoneset_from_lexicon(lexicon_file, phoneset_name='asr'):
""" Make a list of phones which appears in the lexicon. """ Make a list of phones which appears in the lexicon.
Args: Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'. lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
phoneset (str): the phoneset with which lexicon_file is written. 'asr'(default) or 'ipa'. phoneset_name (str): the name of phoneset with which lexicon_file is written. 'asr'(default) or 'ipa'.
Returns: Returns:
(list_of_phones) (set): the set of phones included in the lexicon_file. (list_of_phones) (set): the set of phones included in the lexicon_file.
""" """
assert phoneset in ['asr', 'ipa'], 'phoneset should be \'asr\' or \'ipa\'' assert phoneset_name in ['asr', 'ipa'], 'phoneset_name should be \'asr\' or \'ipa\''
lex = load_lexicon(lexicon_file) lex = load_lexicon(lexicon_file)
if phoneset == 'asr': if phoneset_name == 'asr':
return set(' '.join(lex['pronunciation']).split(' ')) return set(' '.join(lex['pronunciation']).split(' '))
elif phoneset == 'ipa': elif phoneset_name == 'ipa':
join_pronunciations = ''.join(lex['pronunciation']) join_pronunciations = ''.join(lex['pronunciation'])
return set(convert_phone_set.split_word(join_pronunciations, fame_phoneset.multi_character_phones_ipa)) return set(convert_phone_set.split_word(join_pronunciations, fame_ipa.multi_character_phones))
return
def extract_unknown_phones(ipa, known_phones): def extract_unknown_phones(ipa, known_phones):
@ -228,7 +197,7 @@ def extract_unknown_phones(ipa, known_phones):
(list_of_phones) (list): unknown phones not included in 'known_phones'. (list_of_phones) (list): unknown phones not included in 'known_phones'.
""" """
ipa_split = convert_phone_set.split_word(ipa, fame_phoneset.multi_character_phones_ipa) ipa_split = convert_phone_set.split_word(ipa, fame_ipa.multi_character_phones)
return [i for i in ipa_split if not i in known_phones] return [i for i in ipa_split if not i in known_phones]
@ -247,14 +216,14 @@ def get_translation_key(lexicon_file_ipa, lexicon_file_asr):
""" """
lex_ipa = load_lexicon(lexicon_file_ipa) lex_ipa = load_lexicon(lexicon_file_ipa)
lex_asr = load_lexicon(lexicon_file_asr) lex_asr = load_lexicon(lexicon_file_asr)
phone_unknown = fame_phoneset.phoneset_ipa[:] phone_unknown = fame_ipa.phoneset[:]
translation_key = dict() translation_key = dict()
for word in lex_ipa['word']: for word in lex_ipa['word']:
if np.sum(lex_ipa['word'] == word) == 1 and np.sum(lex_asr['word'] == word) == 1: if np.sum(lex_ipa['word'] == word) == 1 and np.sum(lex_asr['word'] == word) == 1:
ipa = lex_ipa[lex_ipa['word'] == word].iat[0, 1] ipa = lex_ipa[lex_ipa['word'] == word].iat[0, 1]
asr = lex_asr[lex_asr['word'] == word].iat[0, 1] asr = lex_asr[lex_asr['word'] == word].iat[0, 1]
ipa_list = convert_phone_set.split_word(ipa, fame_phoneset.multi_character_phones_ipa) ipa_list = convert_phone_set.split_word(ipa, fame_ipa.multi_character_phones)
asr_list = asr.split(' ') asr_list = asr.split(' ')
# if there are phones which is not in phone_unknown # if there are phones which is not in phone_unknown
@ -268,13 +237,13 @@ def get_translation_key(lexicon_file_ipa, lexicon_file_asr):
return translation_key, list(phone_unknown) return translation_key, list(phone_unknown)
def find_phone(lexicon_file, phone, phoneset='ipa'): def find_phone(lexicon_file, phone, phoneset_name='ipa'):
""" extract rows where the phone is used in the lexicon_file. """ extract rows where the phone is used in the lexicon_file.
Args: Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'. lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
phone (str): the phone to be searched. phone (str): the phone to be searched.
phoneset (str): the phoneset with which lexicon_file is written. 'asr' or 'ipa'(default). phoneset_name (str): the name of phoneset_name with which lexicon_file is written. 'asr' or 'ipa'(default).
Returns: Returns:
extracted (df): rows where the phone is used. extracted (df): rows where the phone is used.
@ -283,7 +252,7 @@ def find_phone(lexicon_file, phone, phoneset='ipa'):
* develop when the phonset == 'asr'. * develop when the phonset == 'asr'.
""" """
assert phoneset in ['asr', 'ipa'], 'phoneset should be \'asr\' or \'ipa\'' assert phoneset_name in ['asr', 'ipa'], 'phoneset_name should be \'asr\' or \'ipa\''
lex = load_lexicon(lexicon_file) lex = load_lexicon(lexicon_file)
@ -292,9 +261,146 @@ def find_phone(lexicon_file, phone, phoneset='ipa'):
extracted = pd.DataFrame(index=[], columns=['word', 'pronunciation']) extracted = pd.DataFrame(index=[], columns=['word', 'pronunciation'])
for index, row in lex_.iterrows(): for index, row in lex_.iterrows():
if phoneset == 'ipa': if phoneset_name == 'ipa':
pronunciation = convert_phone_set.split_word(row['pronunciation'], fame_phoneset.multi_character_phones_ipa) pronunciation = convert_phone_set.split_word(row['pronunciation'], fame_ipa.multi_character_phones)
if phone in pronunciation: if phone in pronunciation:
extracted_ = pd.Series([row['word'], pronunciation], index=extracted.columns) extracted_ = pd.Series([row['word'], pronunciation], index=extracted.columns)
extracted = extracted.append(extracted_, ignore_index=True) extracted = extracted.append(extracted_, ignore_index=True)
return extracted return extracted
def asr2htk_space_delimited(pronunciation):
    """Convert a space delimited pronunciation from asr to htk phones.

    Phones in fame_asr.phones_to_be_removed are dropped, the rest are mapped
    through fame_asr.reduction_key and then through
    fame_asr.translation_key_asr2htk.

    Args:
        pronunciation (str): space delimited asr phones.

    Returns:
        (pronunciation) (str): space delimited phones in htk format.
    """
    reduced = []
    for phone in pronunciation.split(' '):
        if phone in fame_asr.phones_to_be_removed:
            continue
        reduced.append(fame_asr.reduction_key.get(phone, phone))

    htk_phones = convert_phoneset.convert_phoneset(
        reduced, fame_asr.translation_key_asr2htk)
    return ' '.join(htk_phones)
def lexicon_asr2htk(lexicon_file_asr, lexicon_file_htk):
    """Convert a lexicon file from asr to htk format.

    Every word is converted with word2htk and upper-cased; every
    pronunciation is converted with asr2htk_space_delimited. The result is
    written as a tab separated file without header and index.

    Args:
        lexicon_file_asr (path): a lexicon file written in asr format e.g. fame/lex.asr.
        lexicon_file_htk (path): the lexicon file to be written in htk format.
    """
    lex_asr = load_lexicon(lexicon_file_asr)

    # ``DataFrame.ix`` was deprecated and later removed from pandas, so the
    # column order is fixed through ``columns=`` instead of ``.ix[:, [...]]``.
    lex_htk = pd.DataFrame(
        {
            'word': lex_asr['word'].map(word2htk).str.upper(),
            'pronunciation': lex_asr['pronunciation'].map(asr2htk_space_delimited),
        },
        columns=['word', 'pronunciation'],
    )
    lex_htk.to_csv(lexicon_file_htk, header=False, index=False, sep='\t', encoding='utf-8')
def combine_lexicon(lexicon_file1, lexicon_file2, lexicon_out):
    """Combine two lexicon files into one, sorted by word.

    Args:
        lexicon_file1, lexicon_file2 (path): input lexicon files.
        lexicon_out (path): the file to which the combined and sorted lexicon
            is written (tab separated, without header and index).
    """
    lexicons = [load_lexicon(lexicon_file1), load_lexicon(lexicon_file2)]
    combined = pd.concat(lexicons).sort_values(by='word', ascending=True)
    combined.to_csv(lexicon_out, index=False, header=False, sep='\t', encoding='utf-8')
def fix_lexicon(lexicon_file):
    """Clean up a lexicon file in place.

    - remove rows which include N/A.
    - add the silence entry ('SILENCE' with the pronunciation 'sil').
    - sort the entries by word.
    - put a backslash in front of the single quotes of every word which
      starts with a single quote.

    Args:
        lexicon_file (path): lexicon file, which will be overwritten.
    """
    lex = load_lexicon(lexicon_file)
    lex = lex.dropna()  # remove N/A.

    # add 'sil'. pd.concat is used because DataFrame.append was removed from pandas.
    silence = pd.DataFrame([['SILENCE', 'sil']], columns=lex.columns)
    lex = pd.concat([lex, silence], ignore_index=True)
    lex = lex.sort_values(by='word', ascending=True)

    # After sorting, index labels no longer equal row positions, so the rows
    # are selected with a boolean mask rather than by position.
    # NOTE(review): the old code wrote to column 0; this assumes that column is 'word'.
    starts_with_quote = lex['word'].str.startswith("'")
    lex.loc[starts_with_quote, 'word'] = (
        lex.loc[starts_with_quote, 'word'].str.replace("'", "\\'", regex=False))

    # to_csv does not work with space separator. therefore all tabs should manually be replaced.
    lex.to_csv(lexicon_file, index=False, header=False, sep='\t', encoding='utf-8')
def word2htk(word):
    """Convert a word character by character with fame_asr.translation_key_word2htk.

    Characters without an entry in the translation key are kept unchanged.

    Args:
        word (str): the word to be converted.

    Returns:
        (str): the converted word.
    """
    translation_key = fame_asr.translation_key_word2htk
    converted = []
    for character in word:
        converted.append(translation_key.get(character, character))
    return ''.join(converted)
def ipa2asr(ipa):
    """Convert a pronunciation written in ipa into asr phones.

    The pronunciation is split into ipa phones, reduced with
    fame_ipa.phone_reduction, translated with the key stored in
    phoneset/fame_ipa2asr.npy and reduced again with fame_asr.phone_reduction.

    Args:
        ipa (str): a pronunciation written in ipa.

    Returns:
        (str): the asr phones joined without delimiter.
    """
    curr_dir = os.path.dirname(os.path.abspath(__file__))
    # The translation key is a dict stored in an .npy file, i.e. a pickled
    # object array. Recent numpy versions refuse to load it unless
    # allow_pickle=True is given.
    # NOTE(review): unpickling can execute arbitrary code; this is acceptable
    # only as long as the file is the one shipped next to this module.
    translation_key_ipa2asr = np.load(
        os.path.join(curr_dir, 'phoneset', 'fame_ipa2asr.npy'),
        allow_pickle=True).item(0)

    ipa_splitted = convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones)
    ipa_splitted = fame_ipa.phone_reduction(ipa_splitted)
    asr_splitted = convert_phoneset.convert_phoneset(ipa_splitted, translation_key_ipa2asr)
    asr_splitted = fame_asr.phone_reduction(asr_splitted)
    return ''.join(asr_splitted)
def ipa2htk(ipa):
    """Convert a pronunciation written in ipa into htk phones.

    The pronunciation is split into ipa phones, reduced with
    fame_ipa.phone_reduction, translated to asr with the key stored in
    phoneset/fame_ipa2asr.npy, reduced with fame_asr.phone_reduction and
    finally translated with fame_asr.translation_key_asr2htk.

    Args:
        ipa (str): a pronunciation written in ipa.

    Returns:
        (str): the htk phones joined without delimiter.
    """
    curr_dir = os.path.dirname(os.path.abspath(__file__))
    # The translation key is a dict stored in an .npy file, i.e. a pickled
    # object array. Recent numpy versions refuse to load it unless
    # allow_pickle=True is given.
    # NOTE(review): unpickling can execute arbitrary code; this is acceptable
    # only as long as the file is the one shipped next to this module.
    translation_key_ipa2asr = np.load(
        os.path.join(curr_dir, 'phoneset', 'fame_ipa2asr.npy'),
        allow_pickle=True).item(0)

    ipa_splitted = convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones)
    ipa_splitted = fame_ipa.phone_reduction(ipa_splitted)
    asr_splitted = convert_phoneset.convert_phoneset(ipa_splitted, translation_key_ipa2asr)
    asr_splitted = fame_asr.phone_reduction(asr_splitted)
    htk_splitted = convert_phoneset.convert_phoneset(asr_splitted, fame_asr.translation_key_asr2htk)
    return ''.join(htk_splitted)
def performance_on_stimmen(config_dir, stimmen_dir, hmmdefs):
    """ run recognition on the stimmen data and return the sentence level accuracy.

    The following files are read from stimmen_dir:
    word_lattice.ltc, hvite.scp, hresult.scp and lexicon_recognition.dic.
    (the two script files are not made here; presumably they were made
    beforehand, e.g. with fh.make_filelist on the 'mfc' directory.)

    Args:
        config_dir (path): directory which is given to pyhtk.HTK.
        stimmen_dir (path): directory in which the files listed above are stored.
        hmmdefs (path): model definition file. the feature size is read from
            its second line.

    Returns:
        per_sentence['accuracy'] obtained from calc_recognition_performance.
    """
    def in_stimmen_dir(basename):
        return os.path.join(stimmen_dir, basename)

    word_lattice = in_stimmen_dir('word_lattice.ltc')
    scp_for_hvite = in_stimmen_dir('hvite.scp')
    scp_for_hresult = in_stimmen_dir('hresult.scp')
    recognition_lexicon = in_stimmen_dir('lexicon_recognition.dic')

    # get feature_size from hmmdefs:
    # it is the third space-separated item on the second line.
    with open(hmmdefs) as f:
        f.readline()
        second_line = f.readline().strip()
    feature_size = int(second_line.split(' ')[2])

    chtk = pyhtk.HTK(
        config_dir, fame_asr.phoneset_htk, recognition_lexicon, feature_size)

    # the return value of recognition is not used.
    chtk.recognition(word_lattice, hmmdefs, scp_for_hvite)

    per_sentence, _ = chtk.calc_recognition_performance(scp_for_hresult)
    return per_sentence['accuracy']

View File

@ -3,376 +3,564 @@ import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model') os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import tempfile import tempfile
#import configparser import shutil
#import subprocess import glob
#from collections import Counter
import time import time
#import numpy as np import numpy as np
#import pandas as pd import pandas as pd
import fame_functions import fame_functions
from phoneset import fame_ipa, fame_asr, fame_phonetics
import defaultfiles as default import defaultfiles as default
sys.path.append(default.toolbox_dir) sys.path.append(default.toolbox_dir)
import file_handling as fh import file_handling as fh
from htk import pyhtk from htk import pyhtk
#from scripts import run_command
## ======================= user define ======================= ## ======================= user define =======================
#repo_dir = 'C:\\Users\\Aki\\source\\repos\\acoustic_model'
#curr_dir = repo_dir + '\\acoustic_model'
#config_ini = curr_dir + '\\config.ini'
#output_dir = 'C:\\OneDrive\\Research\\rug\\experiments\\friesian\\acoustic_model'
#forced_alignment_module = 'C:\\Users\\Aki\\source\\repos\\forced_alignment'
dataset_list = ['devel', 'test', 'train']
# procedure # procedure
combine_all = 1
make_lexicon = 0
make_label = 0 # it takes roughly 4800 sec on Surface pro 2.
make_mlf = 0
extract_features = 0 extract_features = 0
conv_lexicon = 1 flat_start = 1
#check_lexicon = 0 train_monophone_without_sp = 1
#make_mlf = 0 add_sp = 1
#combine_files = 0 train_monophone_with_re_aligned_mlf = 1
#flat_start = 0 increase_mixture = 1
#train_model = 1 train_triphone = 0
train_triphone_tied = 0
#sys.path.append(os.path.join(os.path.dirname(sys.path[0]), curr_dir)) # pre-defined values.
#sys.path.append(forced_alignment_module) dataset_list = ['devel', 'test', 'train']
#from forced_alignment import convert_phone_set feature_size = 30
improvement_threshold = 0.3
lexicon_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
lexicon_oov = os.path.join(default.fame_dir, 'lexicon', 'lex.oov')
config_dir = os.path.join(default.htk_dir, 'config')
phonelist_full_txt = os.path.join(config_dir, 'phonelist_full.txt')
tree_hed = os.path.join(config_dir, 'tree.hed')
quests_hed = os.path.join(config_dir, 'quests.hed')
## ======================= load variables ======================= model_dir = os.path.join(default.htk_dir, 'model')
model_mono0_dir = os.path.join(model_dir, 'mono0')
model_mono1_dir = os.path.join(model_dir, 'mono1')
model_mono1sp_dir = os.path.join(model_dir, 'mono1sp')
model_mono1sp2_dir = os.path.join(model_dir, 'mono1sp2')
model_tri1_dir = os.path.join(model_dir, 'tri1')
model_tri1tied_dir = os.path.join(model_dir, 'tri1tied')
#config = configparser.ConfigParser() # directories / files to be made.
#config.sections() lexicon_dir = os.path.join(default.htk_dir, 'lexicon')
#config.read(config_ini) lexicon_htk_asr = os.path.join(lexicon_dir, 'lex.htk_asr')
lexicon_htk_oov = os.path.join(lexicon_dir, 'lex.htk_oov')
#config_hcopy = config['Settings']['config_hcopy'] lexicon_htk = os.path.join(lexicon_dir, 'lex.htk')
#config_train = config['Settings']['config_train'] lexicon_htk_with_sp = os.path.join(lexicon_dir, 'lex_with_sp.htk')
#mkhmmdefs_pl = config['Settings']['mkhmmdefs_pl'] lexicon_htk_triphone = os.path.join(lexicon_dir, 'lex_triphone.htk')
#FAME_dir = config['Settings']['FAME_dir']
#lex_asr = FAME_dir + '\\lexicon\\lex.asr'
#lex_asr_htk = FAME_dir + '\\lexicon\\lex.asr_htk'
#lex_oov = FAME_dir + '\\lexicon\\lex.oov'
#lex_oov_htk = FAME_dir + '\\lexicon\\lex.oov_htk'
##lex_ipa = FAME_dir + '\\lexicon\\lex.ipa'
##lex_ipa_ = FAME_dir + '\\lexicon\\lex.ipa_'
##lex_ipa_htk = FAME_dir + '\\lexicon\\lex.ipa_htk'
#lex_htk = FAME_dir + '\\lexicon\\lex_original.htk'
#lex_htk_ = FAME_dir + '\\lexicon\\lex.htk'
#hcompv_scp = output_dir + '\\scp\\combined.scp'
#combined_mlf = output_dir + '\\label\\combined.mlf'
#model_dir = output_dir + '\\model'
#model0_dir = model_dir + '\\hmm0'
#proto_init = model_dir + '\\proto38'
#proto_name = 'proto'
#phonelist = output_dir + '\\config\\phonelist_friesian.txt'
#hmmdefs_name = 'hmmdefs'
feature_dir = os.path.join(default.htk_dir, 'mfc') feature_dir = os.path.join(default.htk_dir, 'mfc')
if not os.path.exists(feature_dir): fh.make_new_directory(feature_dir, existing_dir='leave')
os.makedirs(feature_dir)
tmp_dir = os.path.join(default.htk_dir, 'tmp') tmp_dir = os.path.join(default.htk_dir, 'tmp')
if not os.path.exists(tmp_dir): fh.make_new_directory(tmp_dir, existing_dir='leave')
os.makedirs(tmp_dir) label_dir = os.path.join(default.htk_dir, 'label')
fh.make_new_directory(label_dir, existing_dir='leave')
## training
if combine_all:
hcompv_scp_train = os.path.join(tmp_dir, 'all.scp')
mlf_file_train = os.path.join(label_dir, 'all_phone.mlf')
mlf_file_train_word = os.path.join(label_dir, 'all_word.mlf')
mlf_file_train_with_sp = os.path.join(label_dir, 'all_phone_with_sp.mlf')
mlf_file_train_aligned = os.path.join(label_dir, 'all_phone_aligned.mlf')
triphone_mlf = os.path.join(label_dir, 'all_triphone.mlf')
else:
hcompv_scp_train = os.path.join(tmp_dir, 'train.scp')
mlf_file_train = os.path.join(label_dir, 'train_phone.mlf')
mlf_file_train_word = os.path.join(label_dir, 'train_word.mlf')
mlf_file_train_with_sp = os.path.join(label_dir, 'train_phone_with_sp.mlf')
mlf_file_train_aligned = os.path.join(label_dir, 'train_phone_aligned.mlf')
triphone_mlf = os.path.join(label_dir, 'train_triphone.mlf')
hcompv_scp_train_updated = hcompv_scp_train.replace('.scp', '_updated.scp')
## testing
htk_stimmen_dir = os.path.join(default.htk_dir, 'stimmen')
## ======================= make lexicon for HTK =======================
if make_lexicon:
timer_start = time.time()
print('==== making lexicon for HTK ====')
# convert each lexicon from fame_asr phoneset to fame_htk phoneset.
print('>>> converting each lexicon from fame_asr phoneset to fame_htk phoneset...')
fame_functions.lexicon_asr2htk(lexicon_asr, lexicon_htk_asr)
fame_functions.lexicon_asr2htk(lexicon_oov, lexicon_htk_oov)
# combine lexicon
print('>>> combining lexicon files into one lexicon...')
# pronunciations which are not found in lex.asr are generated using G2P and listed in lex.oov.
# therefore there is no overlap between lex_asr and lex_oov.
fame_functions.combine_lexicon(lexicon_htk_asr, lexicon_htk_oov, lexicon_htk)
## fixing the lexicon for HTK.
# (1) Replace all tabs with single space;
# (2) Put a '\' before any dictionary entry beginning with single quote
# http://electroblaze.blogspot.nl/2013/03/understanding-htk-error-messages.html
print('>>> fixing the lexicon...')
fame_functions.fix_lexicon(lexicon_htk)
## adding sp to the lexicon for HTK.
print('>>> adding sp to the lexicon...')
with open(lexicon_htk) as f:
lines = f.read().split('\n')
with open(lexicon_htk_with_sp, 'wb') as f:
f.write(bytes(' sp\n'.join(lines), 'ascii'))
print("elapsed time: {}".format(time.time() - timer_start))
## initialize the instance for HTK.
chtk = pyhtk.HTK(config_dir, fame_asr.phoneset_htk, lexicon_htk_with_sp, feature_size)
## ======================= make label files =======================
if make_label:
for dataset in dataset_list:
timer_start = time.time()
print("==== making label files on dataset {}".format(dataset))
script_list = os.path.join(default.fame_dir, 'data', dataset, 'text')
wav_dir_ = os.path.join(default.fame_dir, 'fame', 'wav', dataset)
label_dir_ = os.path.join(label_dir, dataset)
dictionary_file = os.path.join(label_dir_, 'temp.dic')
fh.make_new_directory(label_dir_, existing_dir='leave')
# list of scripts
with open(script_list, "rt", encoding="utf-8") as fin:
scripts = fin.read().split('\n')
for line in scripts:
# sample line:
# sp0457m_test_1968_plakkenfryslanterhorne_2168 en dan begjinne je natuerlik
filename_ = line.split(' ')[0]
filename = '_'.join(filename_.split('_')[1:])
sentence = ' '.join(line.split(' ')[1:])
sentence_htk = fame_functions.word2htk(sentence)
wav_file = os.path.join(wav_dir_, filename + '.wav')
if os.path.exists(wav_file) and chtk.can_be_ascii(sentence_htk) == 0:
if chtk.get_number_of_missing_words(
sentence_htk, dictionary_file) == 0:
# when the file name is too long, HDMan command does not work.
# therefore first temporary dictionary_file is made, then renamed.
shutil.move(dictionary_file, os.path.join(label_dir_, filename + '.dic'))
label_file = os.path.join(label_dir_, filename + '.lab')
chtk.make_label_file(sentence_htk, label_file)
else:
os.remove(dictionary_file)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= make master label files =======================
if make_mlf:
timer_start = time.time()
print("==== making master label files ====")
# train_2002_gongfansaken_10347.lab is empty. should be removed.
empty_lab_file = os.path.join(label_dir, 'train', 'train_2002_gongfansaken_10347.lab')
empty_dic_file = empty_lab_file.replace('.lab', '.dic')
if os.path.exists(empty_lab_file):
os.remove(empty_lab_file)
if os.path.exists(empty_dic_file):
os.remove(empty_dic_file)
for dataset in dataset_list:
feature_dir_ = os.path.join(feature_dir, dataset)
label_dir_ = os.path.join(label_dir, dataset)
mlf_word = os.path.join(label_dir, dataset + '_word.mlf')
mlf_phone = os.path.join(label_dir, dataset + '_phone.mlf')
mlf_phone_with_sp = os.path.join(label_dir, dataset + '_phone_with_sp.mlf')
print(">>> generating a word level mlf file for {}...".format(dataset))
chtk.label2mlf(label_dir_, mlf_word)
print(">>> generating a phone level mlf file for {}...".format(dataset))
chtk.mlf_word2phone(mlf_phone, mlf_word, with_sp=False)
chtk.mlf_word2phone(mlf_phone_with_sp, mlf_word, with_sp=True)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= extract features ======================= ## ======================= extract features =======================
if extract_features: if extract_features:
for dataset in dataset_list: for dataset in dataset_list:
print('==== {} ===='.format(dataset)) timer_start = time.time()
print('==== extract features on dataset {} ===='.format(dataset))
wav_dir_ = os.path.join(default.fame_dir, 'fame', 'wav', dataset)
label_dir_ = os.path.join(label_dir, dataset)
feature_dir_ = os.path.join(feature_dir, dataset)
fh.make_new_directory(feature_dir_, existing_dir='delete')
# a script file for HCopy # a script file for HCopy
print(">>> making a script file for HCopy... \n") print(">>> making a script file for HCopy...")
hcopy_scp = tempfile.NamedTemporaryFile(mode='w', delete=False) hcopy_scp = tempfile.NamedTemporaryFile(mode='w', delete=False)
hcopy_scp.close() hcopy_scp.close()
# get a list of features (hcopy.scp) from the filelist in FAME! corpus # get a list of features (hcopy.scp)
feature_dir_ = os.path.join(feature_dir, dataset) # from the filelist in FAME! corpus.
if not os.path.exists(feature_dir_): #fame_functions.make_hcopy_scp_from_filelist_in_fame(default.fame_dir, dataset, feature_dir_, hcopy_scp.name)
os.makedirs(feature_dir_) # from the list of label files.
lab_list = glob.glob(os.path.join(label_dir_, '*.lab'))
feature_list = [
os.path.join(wav_dir_, os.path.basename(lab_file).replace('.lab', '.wav')) + '\t'
+ os.path.join(feature_dir_, os.path.basename(lab_file).replace('.lab', '.mfc'))
for lab_file in lab_list]
# extract features #if os.path.exists(empty_mfc_file):
print(">>> extracting features... \n") # os.remove(empty_mfc_file)
fame_functions.make_hcopy_scp_from_filelist_in_fame(default.fame_dir, dataset, feature_dir_, hcopy_scp.name) with open(hcopy_scp.name, 'wb') as f:
pyhtk.wav2mfc(default.config_hcopy, hcopy_scp.name) f.write(bytes('\n'.join(feature_list), 'ascii'))
# a script file for HCompV # extract features.
print(">>> making a script file for HCompV... \n") print(">>> extracting features on {}...".format(dataset))
chtk.wav2mfc(hcopy_scp.name)
os.remove(hcopy_scp.name)
# make hcompv.scp.
print(">>> making a script file for {}...".format(dataset))
listdir = glob.glob(os.path.join(label_dir_, '*.dic'))
mfc_list = [filename.replace(label_dir_, feature_dir_).replace('.dic', '.mfc') for filename in listdir]
hcompv_scp = os.path.join(tmp_dir, dataset + '.scp') hcompv_scp = os.path.join(tmp_dir, dataset + '.scp')
fh.make_filelist(feature_dir_, hcompv_scp, '.mfc') with open(hcompv_scp, 'wb') as f:
f.write(bytes('\n'.join(mfc_list) + '\n', 'ascii'))
print(">>> extracting features on stimmen...")
chtk.wav2mfc(os.path.join(htk_stimmen_dir, 'hcopy.scp'))
## ======================= convert lexicon from ipa to fame_htk =======================
if conv_lexicon:
print('==== convert lexicon from ipa 2 fame ====\n')
#dir_out = r'c:\Users\Aki\source\repos\acoustic_model\_tmp'
lexicon_dir = os.path.join(default.fame_dir, 'lexicon')
lexicon_ipa = os.path.join(lexicon_dir, 'lex.ipa')
lexicon_asr = os.path.join(lexicon_dir, 'lex.asr')
# get the correspondence between lex_ipa and lex_asr.
lex_asr = fame_functions.load_lexicon(lexicon_asr)
lex_ipa = fame_functions.load_lexicon(lexicon_ipa)
if 1:
timer_start = time.time()
translation_key, phone_unknown = fame_functions.get_translation_key(lexicon_ipa, lexicon_asr)
print("elapsed time: {}".format(time.time() - timer_start)) print("elapsed time: {}".format(time.time() - timer_start))
np.save('translation_key_ipa2asr.npy', translation_key)
np.save('phone_unknown.npy', phone_unknown) ## ======================= flat start monophones =======================
else: if combine_all:
translation_key = np.load('translation_key_ipa2asr.npy').item() # script files.
phone_unknown = np.load('phone_unknown.npy') fh.concatenate(
phone_unknown = list(phone_unknown) os.path.join(tmp_dir, 'devel.scp'),
os.path.join(tmp_dir, 'test.scp'),
hcompv_scp_train
)
fh.concatenate(
hcompv_scp_train,
os.path.join(tmp_dir, 'train.scp'),
hcompv_scp_train
)
# phone level mlfs.
fh.concatenate(
os.path.join(label_dir, 'devel_phone.mlf'),
os.path.join(label_dir, 'test_phone.mlf'),
mlf_file_train
)
fh.concatenate(
mlf_file_train,
os.path.join(label_dir, 'train_phone.mlf'),
mlf_file_train
)
# phone level mlfs with sp.
fh.concatenate(
os.path.join(label_dir, 'devel_phone_with_sp.mlf'),
os.path.join(label_dir, 'test_phone_with_sp.mlf'),
mlf_file_train_with_sp
)
fh.concatenate(
mlf_file_train_with_sp,
os.path.join(label_dir, 'train_phone_with_sp.mlf'),
mlf_file_train_with_sp
)
## manually check the correspondence for the phone in phone_unknown. # word level mlfs.
#p = phone_unknown[0] fh.concatenate(
#lex_ipa_ = find_phone(lexicon_ipa, p, phoneset='ipa') os.path.join(label_dir, 'devel_word.mlf'),
os.path.join(label_dir, 'test_word.mlf'),
#for word in lex_ipa_['word']: mlf_file_train_word
# ipa = lex_ipa[lex_ipa['word'] == word].iat[0, 1] )
# if np.sum(lex_asr['word'] == word) > 0: fh.concatenate(
# asr = lex_asr[lex_asr['word'] == word].iat[0, 1] mlf_file_train_word,
os.path.join(label_dir, 'train_word.mlf'),
# ipa_list = convert_phone_set.split_word(ipa, fame_phoneset.multi_character_phones_ipa) mlf_file_train_word
# asr_list = asr.split(' ') )
# if p in ipa_list and (len(ipa_list) == len(asr_list)):
# print("{0}: {1} --> {2}".format(word, ipa_list, asr_list))
# for ipa_, asr_ in zip(ipa_list, asr_list):
# if ipa_ in phone_unknown:
# translation_key[ipa_] = asr_
# phone_unknown.remove(ipa_)
## check if all the phones in lexicon_ipa are in fame_phoneset.py.
#timer_start = time.time()
#phoneset_lex = get_phoneset_from_lexicon(lexicon_ipa, phoneset='ipa')
#print("elapsed time: {}".format(time.time() - timer_start))
#phoneset_py = fame_phoneset.phoneset_ipa
#set(phoneset_lex) - set(phoneset_py)
##timer_start = time.time()
##extracted = find_phone(lexicon_ipa, 'ⁿ')
##print("elapsed time: {}".format(time.time() - timer_start))
# lex.asr is Kaldi compatible version of lex.ipa.
# to check...
#lexicon_ipa = pd.read_table(lex_ipa, names=['word', 'pronunciation'])
#with open(lex_ipa_, "w", encoding="utf-8") as fout:
# for word, pronunciation in zip(lexicon_ipa['word'], lexicon_ipa['pronunciation']):
# # ignore nasalization and '.'
# pronunciation_ = pronunciation.replace(u'ⁿ', '')
# pronunciation_ = pronunciation_.replace('.', '')
# pronunciation_split = convert_phone_set.split_ipa_fame(pronunciation_)
# fout.write("{0}\t{1}\n".format(word, ' '.join(pronunciation_split)))
# convert each lexicon from ipa description to fame_htk phoneset.
#am_func.ipa2famehtk_lexicon(lex_oov, lex_oov_htk)
#am_func.ipa2famehtk_lexicon(lex_asr, lex_asr_htk)
# combine lexicon
# pronunciations which is not found in lex.asr are generated using G2P and listed in lex.oov.
# therefore there is no overlap between lex_asr and lex_oov.
#am_func.combine_lexicon(lex_asr_htk, lex_oov_htk, lex_htk)
## ======================= check if all the phones are successfully converted =======================
if check_lexicon:
print("==== check if all the phones are successfully converted. ====\n")
# the phones used in the lexicon.
phonelist_asr = am_func.get_phonelist(lex_asr)
phonelist_oov = am_func.get_phonelist(lex_oov)
phonelist_htk = am_func.get_phonelist(lex_htk)
phonelist = phonelist_asr.union(phonelist_oov)
# the lines which include a specific phone.
lines = am_func.find_phone(lex_asr, 'g')
# statistics over the lexicon
lexicon_htk = pd.read_table(lex_htk, names=['word', 'pronunciation'])
pronunciation = lexicon_htk['pronunciation']
phones_all = []
for word in pronunciation:
phones_all = phones_all + word.split()
c = Counter(phones_all)
## =======================
## manually make changes to the pronunciation dictionary and save it as lex.htk
## =======================
# (1) Replace all tabs with single space;
# (2) Put a '\' before any dictionary entry beginning with single quote
#http://electroblaze.blogspot.nl/2013/03/understanding-htk-error-messages.html
## ======================= make label file =======================
if make_mlf:
print("==== make mlf ====\n")
print("generating word level transcription...\n")
for dataset in dataset_list:
hcompv_scp = output_dir + '\\scp\\' + dataset + '.scp'
hcompv_scp2 = output_dir + '\\scp\\' + dataset + '_all_words_in_lexicon.scp'
script_list = FAME_dir + '\\data\\' + dataset + '\\text'
mlf_word = output_dir + '\\label\\' + dataset + '_word.mlf'
mlf_phone = output_dir + '\\label\\' + dataset + '_phone.mlf'
# lexicon
lexicon_htk = pd.read_table(lex_htk, names=['word', 'pronunciation'])
# list of features
with open(hcompv_scp) as fin:
features = fin.read()
features = features.split('\n')
# list of scripts
with open(script_list, "rt", encoding="utf-8") as fin:
scripts = fin.read()
scripts = pd.Series(scripts.split('\n'))
i = 0
missing_words = []
fscp = open(hcompv_scp2, 'wt')
fmlf = open(mlf_word, "wt", encoding="utf-8")
fmlf.write("#!MLF!#\n")
feature_nr = 1
for feature in features:
sys.stdout.write("\r%d/%d" % (feature_nr, len(features)))
sys.stdout.flush()
feature_nr += 1
file_basename = os.path.basename(feature).replace('.mfc', '')
# get words from scripts.
try:
script = scripts[scripts.str.contains(file_basename)]
except IndexError:
script = []
if len(script) != 0:
script_id = script.index[0]
script_txt = script.get(script_id)
script_words = script_txt.split(' ')
del script_words[0]
# check if all words can be found in the lexicon.
SCRIPT_WORDS = []
script_prons = []
is_in_lexicon = 1
for word in script_words:
WORD = word.upper()
SCRIPT_WORDS.append(WORD)
extracted = lexicon_htk[lexicon_htk['word']==WORD]
if len(extracted) == 0:
missing_words.append(word)
script_prons.append(extracted)
is_in_lexicon *= len(extracted)
# if all pronunciations are found in the lexicon, update scp and mlf files.
if is_in_lexicon:
# add the feature filename into the .scp file.
fscp.write("{}\n".format(feature))
i += 1
# add the words to the mlf file.
fmlf.write('\"*/{}.lab\"\n'.format(file_basename))
#fmlf.write('{}'.format('\n'.join(SCRIPT_WORDS)))
for word_ in SCRIPT_WORDS:
if word_[0] == '\'':
word_ = '\\' + word_
fmlf.write('{}\n'.format(word_))
fmlf.write('.\n')
print("\n{0} has {1} samples.\n".format(dataset, i))
np.save(output_dir + '\\missing_words' + '_' + dataset + '.npy', missing_words)
fscp.close()
fmlf.close()
## generate phone level transcription
print("generating phone level transcription...\n")
mkphones = output_dir + '\\label\\mkphones0.txt'
subprocessStr = r"HLEd -l * -d " + lex_htk_ + ' -i ' + mlf_phone + ' ' + mkphones + ' ' + mlf_word
subprocess.call(subprocessStr, shell=True)
## ======================= combined scps and mlfs =======================
if combine_files:
print("==== combine scps and mlfs ====\n")
fscp = open(hcompv_scp, 'wt')
fmlf = open(combined_mlf, 'wt')
for dataset in dataset_list:
fmlf.write("#!MLF!#\n")
for dataset in dataset_list:
each_mlf = output_dir + '\\label\\' + dataset + '_phone.mlf'
each_scp = output_dir + '\\scp\\' + dataset + '_all_words_in_lexicon.scp'
with open(each_mlf, 'r') as fin:
lines = fin.read()
lines = lines.split('\n')
fmlf.write('\n'.join(lines[1:]))
with open(each_scp, 'r') as fin:
lines = fin.read()
fscp.write(lines)
fscp.close()
fmlf.close()
## ======================= flat start monophones ======================= ## ======================= flat start monophones =======================
if flat_start: if flat_start:
subprocessStr = 'HCompV -T 1 -C ' + config_train + ' -m -v 0.01 -S ' + hcompv_scp + ' -M ' + model0_dir + ' ' + proto_init timer_start = time.time()
subprocess.call(subprocessStr, shell=True) print('==== flat start ====')
fh.make_new_directory(model_mono0_dir, existing_dir='leave')
chtk.flat_start(hcompv_scp_train, model_mono0_dir)
# make macros.
vFloors = os.path.join(model_mono0_dir, 'vFloors')
if os.path.exists(vFloors):
chtk.make_macros(vFloors)
# allocate mean & variance to all phones in the phone list # allocate mean & variance to all phones in the phone list
subprocessStr = 'perl ' + mkhmmdefs_pl + ' ' + model0_dir + '\\proto38' + ' ' + phonelist + ' > ' + model0_dir + '\\' + hmmdefs_name print('>>> allocating mean & variance to all phones in the phone list...')
subprocess.call(subprocessStr, shell=True) chtk.make_hmmdefs(model_mono0_dir)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= estimate monophones ======================= ## ======================= train model without short pause =======================
if train_model: if train_monophone_without_sp:
iter_num_max = 3 print('==== train monophone without sp ====')
for mix_num in [128, 256, 512, 1024]:
for iter_num in range(1, iter_num_max+1):
print("===== mix{}, iter{} =====".format(mix_num, iter_num))
iter_num_pre = iter_num - 1
modelN_dir = model_dir + '\\hmm' + str(mix_num) + '-' + str(iter_num)
if not os.path.exists(modelN_dir):
os.makedirs(modelN_dir)
if iter_num == 1 and mix_num == 1: timer_start = time.time()
modelN_dir_pre = model0_dir niter = chtk.re_estimation_until_saturated(
model_mono1_dir,
model_mono0_dir, improvement_threshold, hcompv_scp_train,
os.path.join(htk_stimmen_dir, 'mfc'),
'mfc',
os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
mlf_file=mlf_file_train,
lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic')
)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= adding sp to the model =======================
if add_sp:
print('==== adding sp to the model ====')
# reference:
# http://www.f.waseda.jp/yusukekondo/htk.html#flat_start_estimation
timer_start = time.time()
# make model with sp.
print('>>> adding sp state to the last model in the previous step...')
fh.make_new_directory(model_mono1sp_dir, existing_dir='leave')
niter = chtk.get_niter_max(model_mono1_dir)
modeln_dir_pre = os.path.join(model_mono1_dir, 'iter'+str(niter))
modeln_dir = os.path.join(model_mono1sp_dir, 'iter0')
chtk.add_sp(modeln_dir_pre, modeln_dir)
print('>>> re-estimation...')
niter = chtk.re_estimation_until_saturated(
model_mono1sp_dir, modeln_dir, improvement_threshold, hcompv_scp_train,
os.path.join(htk_stimmen_dir, 'mfc'),
'mfc',
os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
mlf_file=mlf_file_train_with_sp,
lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
model_type='monophone_with_sp'
)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= train model with re-aligned mlf =======================
if train_monophone_with_re_aligned_mlf:
print('==== traina monophone with re-aligned mlf ====')
timer_start = time.time()
print('>>> re-aligning the training data... ')
niter = chtk.get_niter_max(model_mono1sp_dir)
modeln_dir = os.path.join(model_mono1sp_dir, 'iter'+str(niter))
chtk.make_aligned_label(
os.path.join(modeln_dir, 'macros'),
os.path.join(modeln_dir, 'hmmdefs'),
mlf_file_train_aligned,
mlf_file_train_word,
hcompv_scp_train)
chtk.fix_mlf(mlf_file_train_aligned)
print('>>> updating the script file... ')
chtk.update_script_file(
mlf_file_train_aligned,
mlf_file_train_with_sp,
hcompv_scp_train,
hcompv_scp_train_updated)
print('>>> re-estimation... ')
timer_start = time.time()
fh.make_new_directory(model_mono1sp2_dir, existing_dir='leave')
niter = chtk.get_niter_max(model_mono1sp_dir)
niter = chtk.re_estimation_until_saturated(
model_mono1sp2_dir,
os.path.join(model_mono1sp_dir, 'iter'+str(niter)),
improvement_threshold,
hcompv_scp_train_updated,
os.path.join(htk_stimmen_dir, 'mfc'),
'mfc',
os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
mlf_file=mlf_file_train_aligned,
lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
model_type='monophone_with_sp'
)
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= increase mixture =======================
if increase_mixture:
print('==== increase mixture ====')
timer_start = time.time()
for nmix in [2, 4, 8, 16]:
if nmix == 2:
modeln_dir_ = model_mono1sp2_dir
else: else:
modelN_dir_pre = model_dir + '\\hmm' + str(mix_num) + '-' + str(iter_num_pre) modeln_dir_ = os.path.join(model_dir, 'mono'+str(nmix_))
modeln_dir = os.path.join(model_dir, 'mono'+str(nmix))
## re-estimation print('mixture: {}'.format(nmix))
subprocessStr = 'HERest -T 1 -C ' + config_train + ' -v 0.01 -I ' + combined_mlf + ' -H ' + modelN_dir_pre + '\\' + hmmdefs_name + ' -M ' + modelN_dir + ' ' + phonelist + ' -S ' + hcompv_scp fh.make_new_directory(modeln_dir, existing_dir='delete')
subprocess.call(subprocessStr, shell=True) niter = chtk.get_niter_max(modeln_dir_)
chtk.increase_mixture(
os.path.join(modeln_dir_, 'iter'+str(niter), 'hmmdefs'),
nmix,
os.path.join(modeln_dir, 'iter0'),
model_type='monophone_with_sp')
shutil.copy2(os.path.join(modeln_dir_, 'iter'+str(niter), 'macros'),
os.path.join(modeln_dir, 'iter0', 'macros'))
mix_num_next = mix_num * 2 #improvement_threshold = -10
modelN_dir_next = model_dir + '\\hmm' + str(mix_num_next) + '-0' niter = chtk.re_estimation_until_saturated(
if not os.path.exists(modelN_dir_next): modeln_dir,
os.makedirs(modelN_dir_next) os.path.join(modeln_dir_, 'iter0'),
improvement_threshold,
hcompv_scp_train_updated,
os.path.join(htk_stimmen_dir, 'mfc'),
'mfc',
os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
mlf_file=mlf_file_train_aligned,
lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
model_type='monophone_with_sp'
)
nmix_ = nmix
header_file = modelN_dir + '\\mix' + str(mix_num_next) + '.hed' print("elapsed time: {}".format(time.time() - timer_start))
with open(header_file, 'w') as fout:
fout.write("MU %d {*.state[2-4].mix}" % (mix_num_next))
subprocessStr = 'HHEd -T 1 -H ' + modelN_dir + '\\' + hmmdefs_name + ' -M ' + modelN_dir_next + ' ' + header_file + ' ' + phonelist
subprocess.call(subprocessStr, shell=True) ## ======================= train triphone =======================
print('>>> making triphone list... ')
chtk.make_triphonelist(
mlf_file_train_aligned,
triphone_mlf)
if train_triphone:
print('==== train triphone model ====')
timer_start = time.time()
print('>>> init triphone model... ')
niter = chtk.get_niter_max(model_mono1sp2_dir)
fh.make_new_directory(os.path.join(model_tri1_dir, 'iter0'), existing_dir='leave')
chtk.init_triphone(
os.path.join(model_mono1sp2_dir, 'iter'+str(niter)),
os.path.join(model_tri1_dir, 'iter0')
)
print('>>> re-estimation... ')
## I wanted to train until saturated:
#niter = chtk.re_estimation_until_saturated(
# model_tri1_dir,
# os.path.join(model_tri1_dir, 'iter0'),
# improvement_threshold,
# hcompv_scp_train_updated,
# os.path.join(htk_stimmen_dir, 'mfc'),
# 'mfc',
# os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
# mlf_file=triphone_mlf,
# lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
# model_type='triphone'
# )
#
# but because the data size is limited, some triphones cannot be trained, which causes the error:
# ERROR [+8231] GetHCIModel: Cannot find hmm [i:-]r[+???]
# therefore re-estimation is performed only three times.
output_dir = model_tri1_dir
for niter in range(1, 4):
hmm_n = 'iter' + str(niter)
hmm_n_pre = 'iter' + str(niter-1)
_modeln_dir = os.path.join(output_dir, hmm_n)
_modeln_dir_pre = os.path.join(output_dir, hmm_n_pre)
fh.make_new_directory(_modeln_dir, 'leave')
chtk.re_estimation(
os.path.join(_modeln_dir_pre, 'hmmdefs'),
_modeln_dir,
hcompv_scp_train_updated,
mlf_file=triphone_mlf,
macros=os.path.join(_modeln_dir_pre, 'macros'),
model_type='triphone')
print("elapsed time: {}".format(time.time() - timer_start))
## ======================= train tied-state triphones =======================
if train_triphone_tied:
print('==== train tied-state triphones ====')
timer_start = time.time()
print('>>> making lexicon for triphone... ')
chtk.make_lexicon_triphone(phonelist_full_txt, lexicon_htk_triphone)
chtk.combine_phonelists(phonelist_full_txt)
print('>>> making a tree header... ')
fame_phonetics.make_quests_hed(quests_hed)
stats = os.path.join(r'c:\OneDrive\Research\rug\experiments\acoustic_model\fame\htk\model\tri1\iter3', 'stats')
chtk.make_tree_header(tree_hed, quests_hed, stats, config_dir)
print('>>> init triphone model... ')
niter = chtk.get_niter_max(model_tri1_dir)
fh.make_new_directory(os.path.join(model_tri1tied_dir, 'iter0'), existing_dir='leave')
chtk.init_triphone(
os.path.join(model_tri1_dir, 'iter'+str(niter)),
os.path.join(model_tri1tied_dir, 'iter0'),
tied=True)
# I wanted to train until saturated:
#niter = chtk.re_estimation_until_saturated(
# model_tri1tied_dir,
# os.path.join(model_tri1tied_dir, 'iter0'),
# improvement_threshold,
# hcompv_scp_train_updated,
# os.path.join(htk_stimmen_dir, 'mfc'),
# 'mfc',
# os.path.join(htk_stimmen_dir, 'word_lattice.ltc'),
# mlf_file=triphone_mlf,
# lexicon=os.path.join(htk_stimmen_dir, 'lexicon_recognition.dic'),
# model_type='triphone'
# )
#
# but because the data size is limited, some triphone cannot be trained and received the error:
# ERROR [+8231] GetHCIModel: Cannot find hmm [i:-]r[+???]
# therefore only 3 times re-estimation is performed.
output_dir = model_tri1tied_dir
for niter in range(1, 4):
hmm_n = 'iter' + str(niter)
hmm_n_pre = 'iter' + str(niter-1)
_modeln_dir = os.path.join(output_dir, hmm_n)
_modeln_dir_pre = os.path.join(output_dir, hmm_n_pre)
fh.make_new_directory(_modeln_dir, 'leave')
chtk.re_estimation(
os.path.join(_modeln_dir_pre, 'hmmdefs'),
_modeln_dir,
hcompv_scp_train_updated,
mlf_file=triphone_mlf,
macros=os.path.join(_modeln_dir_pre, 'macros'),
model_type='triphone')
print("elapsed time: {}".format(time.time() - timer_start))

138
acoustic_model/fame_test.py Normal file
View File

@ -0,0 +1,138 @@
import sys
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
from collections import Counter
import time
import re
import numpy as np
import pandas as pd
import fame_functions
import defaultfiles as default
sys.path.append(default.toolbox_dir)
from phoneset import fame_ipa, fame_asr
import convert_phoneset
# Locations of the FAME lexicons (IPA / ASR notation) and the HTK lexicon.
lexicon_dir = os.path.join(default.fame_dir, 'lexicon')
lexicon_ipa = os.path.join(lexicon_dir, 'lex.ipa')
lexicon_asr = os.path.join(lexicon_dir, 'lex.asr')
lexicon_htk = os.path.join(default.htk_dir, 'lexicon', 'lex.htk')


## check if all the phones in lexicon.ipa are in fame_ipa.py.
#timer_start = time.time()
#phoneset_lex = fame_functions.get_phoneset_from_lexicon(lexicon_ipa, phoneset='ipa')
#phoneset_py = fame_ipa.phoneset
#print("phones which is in lexicon.ipa but not in fame_ipa.py:\n{}".format(
#    set(phoneset_lex) - set(phoneset_py)))
#print("elapsed time: {}".format(time.time() - timer_start))

# check which word has the phone.
#timer_start = time.time()
#extracted = find_phone(lexicon_ipa, 'ⁿ')
#print("elapsed time: {}".format(time.time() - timer_start))


## get the correspondence between lex_ipa and lex_asr.
lex_asr = fame_functions.load_lexicon(lexicon_asr)
lex_ipa = fame_functions.load_lexicon(lexicon_ipa)
if 0:
    # Recompute the IPA -> ASR translation key from both lexicons and cache it.
    timer_start = time.time()
    translation_key_ipa2asr, phone_unknown = fame_functions.get_translation_key(lexicon_ipa, lexicon_asr)
    print("elapsed time: {}".format(time.time() - timer_start))

    np.save(os.path.join('phoneset', 'output_get_translation_key_translation_key.npy'), translation_key_ipa2asr)
    np.save(os.path.join('phoneset', 'output_get_translation_key_phone_unknown.npy'), phone_unknown)
else:
    # Reuse the cached result written by the branch above.
    # The translation key is read back with .item() and used as a mapping
    # below, i.e. np.save() stored it as a pickled 0-d object array.
    # NumPy >= 1.16.3 refuses to unpickle unless allow_pickle=True is given.
    # Only load files produced by this script: unpickling untrusted data is
    # unsafe.
    translation_key_ipa2asr = np.load(
        os.path.join('phoneset', 'output_get_translation_key_translation_key.npy'),
        allow_pickle=True).item()
    phone_unknown = np.load(os.path.join('phoneset', 'output_get_translation_key_phone_unknown.npy'))
    phone_unknown = list(phone_unknown)

# manually check the correspondence for the phone in phone_unknown.
#p = phone_unknown[0]
#lex_ipa_ = find_phone(lexicon_ipa, p, phoneset='ipa')

#for word in lex_ipa_['word']:
#    ipa = lex_ipa[lex_ipa['word'] == word].iat[0, 1]
#    if np.sum(lex_asr['word'] == word) > 0:
#        asr = lex_asr[lex_asr['word'] == word].iat[0, 1]

#        ipa_list = convert_phone_set.split_word(ipa, fame_ipa.multi_character_phones)
#        asr_list = asr.split(' ')
#        if p in ipa_list and (len(ipa_list) == len(asr_list)):
#            print("{0}: {1} --> {2}".format(word, ipa_list, asr_list))
#            for ipa_, asr_ in zip(ipa_list, asr_list):
#                if ipa_ in phone_unknown:
#                    translation_key_ipa2asr[ipa_] = asr_
#                    phone_unknown.remove(ipa_)

# Correspondences added by hand.
translation_key_ipa2asr['ə:'] = 'ə'
translation_key_ipa2asr['r.'] = 'r'
translation_key_ipa2asr['r:'] = 'r'
# added for stimmen.
translation_key_ipa2asr['ɪ:'] = 'ɪ:'
translation_key_ipa2asr['y:'] = 'y'
# Store the completed key (np.save pickles it, see the note on np.load above).
np.save(os.path.join('phoneset', 'fame_ipa2asr.npy'), translation_key_ipa2asr)
## check if all the phones in lexicon.asr are in translation_key_ipa2asr.
#timer_start = time.time()
#phoneset_lex = fame_functions.get_phoneset_from_lexicon(lexicon_asr, phoneset='asr')
#phoneset_lex.remove("")
#phoneset_asr = list(set(translation_key_ipa2asr.values()))
#print("phones which is in lexicon.asr but not in the translation_key_ipa2asr:\n{}".format(
# set(phoneset_lex) - set(phoneset_asr)))
#print("elapsed time: {}".format(time.time() - timer_start))
## check if all the phones in lexicon.htk are in fame_asr.py.
#timer_start = time.time()
#phoneset_htk = fame_asr.phoneset_htk
#phoneset_lex = fame_functions.get_phoneset_from_lexicon(lexicon_htk)
#phoneset_lex.remove('')
#print("phones which is in lexicon.htk but not in the fame_asr.py are:\n{}".format(
# set(phoneset_htk) - set(phoneset_lex)))
#print("elapsed time: {}".format(time.time() - timer_start))
## statistics over the lexicon
#lex_htk = fame_functions.load_lexicon(lexicon_htk)
#phones_all = (' '.join(lex_htk['pronunciation'])).split(' ')
#c = Counter(phones_all)
#lexicon_out = r'c:\OneDrive\Research\rug\experiments\acoustic_model\fame\htk\lexicon\lex.htk2'
#for i in lex_htk[lex_htk['word'].str.startswith('\'')].index.values:
# lex_htk.iat[i, 0] = lex_htk.iat[i, 0].replace('\'', '\\\'')
## to_csv does not work with a space separator; therefore all tabs should be replaced manually afterwards.
##lex_htk.to_csv(lexicon_out, index=False, header=False, encoding="utf-8", sep=' ', quoting=csv.QUOTE_NONE, escapechar='\\')
#lex_htk.to_csv(lexicon_out, index=False, header=False, encoding="utf-8", sep='\t')
## check which letters are not coded in ascii.
#print('asr phones which cannot be coded in ascii:\n')
#for i in fame_asr.phoneset_short:
# try:
# i_encoded = i.encode("ascii")
# #print("{0} --> {1}".format(i, i.encode("ascii")))
# except UnicodeEncodeError:
# print(">>> {}".format(i))
#print("letters in the scripts which is not coded in ascii:\n")
#for dataset in ['train', 'devel', 'test']:
# timer_start = time.time()
# script_list = os.path.join(default.fame_dir, 'data', dataset, 'text')
# with open(script_list, "rt", encoding="utf-8") as fin:
# scripts = fin.read().split('\n')
# for line in scripts:
# sentence = ' '.join(line.split(' ')[1:])
# sentence_htk = fame_functions.word2htk(sentence)
# #if len(re.findall(r'[âêôûč\'àéèúćäëïöü]', sentence))==0:
# try:
# sentence_htk = bytes(sentence_htk, 'ascii')
# except UnicodeEncodeError:
# print(sentence)
# print(sentence_htk)

View File

@ -1,131 +1,278 @@
import os import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model') os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import sys import sys
import csv
import subprocess #import csv
from collections import Counter #import subprocess
import re #from collections import Counter
#import re
import shutil
import glob
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import matplotlib.pyplot as plt from collections import Counter
from sklearn.metrics import confusion_matrix #import matplotlib.pyplot as plt
#from sklearn.metrics import confusion_matrix
import acoustic_model_functions as am_func #import acoustic_model_functions as am_func
import convert_xsampa2ipa #import convert_xsampa2ipa
import defaultfiles as default import defaultfiles as default
from forced_alignment import pyhtk #from forced_alignment import pyhtk
#sys.path.append(default.forced_alignment_module_dir)
#from forced_alignment import convert_phone_set
#import acoustic_model_functions as am_func
import convert_xsampa2ipa
import stimmen_functions
import fame_functions
import convert_phoneset
from phoneset import fame_ipa, fame_asr
sys.path.append(default.toolbox_dir)
import file_handling as fh
from htk import pyhtk
## ======================= user define ======================= ## ======================= user define =======================
excel_file = os.path.join(default.experiments_dir, 'stimmen', 'data', 'Frisian Variants Picture Task Stimmen.xlsx') #excel_file = os.path.join(default.experiments_dir, 'stimmen', 'data', 'Frisian Variants Picture Task Stimmen.xlsx')
data_dir = os.path.join(default.experiments_dir, 'stimmen', 'data') #data_dir = os.path.join(default.experiments_dir, 'stimmen', 'data')
wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' # 16k #wav_dir = r'c:\OneDrive\WSL\kaldi-trunk\egs\fame\s5\corpus\stimmen' # 16k
acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model') #acoustic_model_dir = os.path.join(default.experiments_dir, 'friesian', 'acoustic_model', 'model')
htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short') #htk_dict_dir = os.path.join(default.experiments_dir, 'stimmen', 'dic_short')
fa_dir = os.path.join(default.experiments_dir, 'stimmen', 'FA_44k') #fa_dir = os.path.join(default.experiments_dir, 'stimmen', 'FA_44k')
result_dir = os.path.join(default.experiments_dir, 'stimmen', 'result') #result_dir = os.path.join(default.experiments_dir, 'stimmen', 'result')
kaldi_data_dir = os.path.join(default.kaldi_dir, 'data', 'alignme') #kaldi_data_dir = os.path.join(default.kaldi_dir, 'data', 'alignme')
kaldi_dict_dir = os.path.join(default.kaldi_dir, 'data', 'local', 'dict') #kaldi_dict_dir = os.path.join(default.kaldi_dir, 'data', 'local', 'dict')
lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt') #lexicon_txt = os.path.join(kaldi_dict_dir, 'lexicon.txt')
#lex_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr') #lex_asr = os.path.join(default.fame_dir, 'lexicon', 'lex.asr')
#lex_asr_htk = os.path.join(default.fame_dir, 'lexicon', 'lex.asr_htk') #lex_asr_htk = os.path.join(default.fame_dir, 'lexicon', 'lex.asr_htk')
# procedure # procedure
make_htk_dict_files = 0 make_dic_file = 0
do_forced_alignment_htk = 0 make_HTK_files = 0
eval_forced_alignment_htk = 0 extract_features = 0
make_kaldi_data_files = 0 #make_htk_dict_files = 0
make_kaldi_lexicon_txt = 0 #do_forced_alignment_htk = 0
load_forced_alignment_kaldi = 1 #eval_forced_alignment_htk = 0
eval_forced_alignment_kaldi = 1 make_kaldi_files = 0
#make_kaldi_lexicon_txt = 0
#load_forced_alignment_kaldi = 1
#eval_forced_alignment_kaldi = 1
#sys.path.append(os.path.join(default.repo_dir, 'forced_alignment'))
#from forced_alignment import convert_phone_set
#from forced_alignment import pyhtk
#sys.path.append(os.path.join(default.repo_dir, 'toolbox'))
#from evaluation import plot_confusion_matrix
## HTK related files.
config_dir = os.path.join(default.htk_dir, 'config')
model_dir = os.path.join(default.htk_dir, 'model')
feature_dir = os.path.join(default.htk_dir, 'mfc', 'stimmen')
config_hcopy = os.path.join(config_dir, 'config.HCopy')
# files to be made.
lattice_file = os.path.join(config_dir, 'stimmen.ltc')
phonelist_txt = os.path.join(config_dir, 'phonelist.txt')
stimmen_dic = os.path.join(default.htk_dir, 'lexicon', 'stimmen_recognition.dic')
hcopy_scp = os.path.join(default.htk_dir, 'tmp', 'stimmen_test_hcopy.scp')
hvite_scp = os.path.join(default.htk_dir, 'tmp', 'stimmen_test_hvite.scp')
hresult_scp = os.path.join(default.htk_dir, 'tmp', 'stimmen_test_result.scp')
## Kaldi related files.
kaldi_data_dir = os.path.join(default.kaldi_dir, 'data')
# files to be made.
wav_scp = os.path.join(kaldi_data_dir, 'test', 'wav.scp')
text_file = os.path.join(kaldi_data_dir, 'test', 'text')
utt2spk = os.path.join(kaldi_data_dir, 'test', 'utt2spk')
corpus_txt = os.path.join(kaldi_data_dir, 'local', 'corpus.txt')
lexicon_txt = os.path.join(kaldi_data_dir, 'local', 'dict', 'lexicon.txt')
nonsilence_phones_txt = os.path.join(kaldi_data_dir, 'local', 'dict', 'nonsilence_phones.txt')
silence_phones_txt = os.path.join(kaldi_data_dir, 'local', 'dict', 'silence_phones.txt')
optional_silence_txt = os.path.join(kaldi_data_dir, 'local', 'dict', 'optional_silence.txt')
## ======================= add paths ======================= ## ======================= load test data ======================
sys.path.append(os.path.join(default.repo_dir, 'forced_alignment')) stimmen_test_dir = r'c:\OneDrive\Research\rug\_data\stimmen_test'
from forced_alignment import convert_phone_set
from forced_alignment import pyhtk
sys.path.append(os.path.join(default.repo_dir, 'toolbox')) df = stimmen_functions.load_transcriptions_clean(stimmen_test_dir)
from evaluation import plot_confusion_matrix df = stimmen_functions.add_row_asr(df)
df = stimmen_functions.add_row_htk(df)
word_list = [i for i in list(set(df['word'])) if not pd.isnull(i)]
word_list = sorted(word_list)
## ======================= convert phones ====================== ## ======================= make dic file to check pronunciation variants ======================
mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir) # dic file should be manually modified depends on the task - recognition / forced-alignemnt.
if make_dic_file:
xls = pd.ExcelFile(excel_file) # for HTK.
with open(stimmen_dic, mode='wb') as f:
## check conversion
#df = pd.read_excel(xls, 'frequency')
#for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']):
# #ipa_converted = convert_xsampa2ipa.conversion('xsampa', 'ipa', mapping, xsampa_)
# ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
# if not ipa_converted == ipa:
# print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa))
## check phones included in FAME!
# the phones used in the lexicon.
#phonelist = am_func.get_phonelist(lex_asr)
# the lines which include a specific phone.
#lines = am_func.find_phone(lex_asr, 'x')
# Filename, Word, Self Xsampa
df = pd.read_excel(xls, 'original')
ipas = []
famehtks = []
for xsampa in df['Self Xsampa']:
if not isinstance(xsampa, float): # 'NaN'
# typo?
xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t')
xsampa = xsampa.replace(';', ':')
ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
ipa = ipa.replace('ː', ':')
ipa = ipa.replace(' ', '')
ipas.append(ipa)
famehtk = convert_phone_set.ipa2famehtk(ipa)
famehtks.append(famehtk)
else:
ipas.append('')
famehtks.append('')
# extract interesting cols.
df = pd.DataFrame({'filename': df['Filename'],
'word': df['Word'],
'xsampa': df['Self Xsampa'],
'ipa': pd.Series(ipas),
'famehtk': pd.Series(famehtks)})
# cleansing.
df = df[~df['famehtk'].isin(['/', ''])]
word_list = np.unique(df['word'])
## ======================= make dict files used for HTK. ======================
if make_htk_dict_files:
output_type = 3
for word in word_list: for word in word_list:
htk_dict_file = htk_dict_dir + '\\' + word + '.dic' df_ = df[df['word']==word]
pronunciations = list(np.unique(df_['htk']))
pronunciations_ = [word.upper() + ' sil ' + ' '.join(convert_phoneset.split_word(
htk, fame_asr.multi_character_phones_htk)) + ' sil'
for htk in pronunciations]
f.write(bytes('\n'.join(pronunciations_) + '\n', 'ascii'))
f.write(bytes('SILENCE sil\n', 'ascii'))
# pronunciation variant of the target word. # for Kaldi.
pronvar_ = df['famehtk'][df['word'].str.match(word)] fh.make_new_directory(os.path.join(kaldi_data_dir, 'local', 'dict'))
with open(lexicon_txt, mode='wb') as f:
f.write(bytes('!SIL sil\n', 'utf-8'))
f.write(bytes('<UNK> spn\n', 'utf-8'))
for word in word_list:
df_ = df[df['word']==word]
pronunciations = list(np.unique(df_['asr']))
pronunciations_ = [word.lower() + ' ' + ' '.join(convert_phoneset.split_word(
asr, fame_asr.multi_character_phones))
for asr in pronunciations]
f.write(bytes('\n'.join(pronunciations_) + '\n', 'utf-8'))
# make dic file.
am_func.make_htk_dict(word, pronvar_, htk_dict_file, output_type) ## ======================= test data for recognition ======================
# only target pronunciation variants.
df_rec = pd.DataFrame(index=[], columns=list(df.keys()))
for word in word_list:
variants = [htk.replace(' ', '')
for htk in stimmen_functions.load_pronunciations(word.upper(), stimmen_dic)]
df_ = df[df['word'] == word]
for index, row in df_.iterrows():
if row['htk'] in variants:
df_rec = df_rec.append(row, ignore_index=True)
## ======================= make files required for HTK ======================
if make_HTK_files:
# make a word lattice file.
pyhtk.create_word_lattice_file(
os.path.join(config_dir, 'stimmen.net'),
lattice_file)
# extract features.
with open(hcopy_scp, 'wb') as f:
filelist = [os.path.join(stimmen_test_dir, filename) + '\t'
+ os.path.join(feature_dir, os.path.basename(filename).replace('.wav', '.mfc'))
for filename in df['filename']]
f.write(bytes('\n'.join(filelist), 'ascii'))
pyhtk.wav2mfc(config_hcopy, hcopy_scp)
# make label files.
for index, row in df.iterrows():
filename = row['filename'].replace('.wav', '.lab')
label_file = os.path.join(feature_dir, filename)
with open(label_file, 'wb') as f:
label_string = 'SILENCE\n' + row['word'].upper() + '\nSILENCE\n'
f.write(bytes(label_string, 'ascii'))
## ======================= make files required for Kaldi =======================
if make_kaldi_files:
fh.make_new_directory(os.path.join(kaldi_data_dir, 'test'))
fh.make_new_directory(os.path.join(kaldi_data_dir, 'test', 'local'))
fh.make_new_directory(os.path.join(kaldi_data_dir, 'conf'))
# remove previous files.
if os.path.exists(wav_scp):
os.remove(wav_scp)
if os.path.exists(text_file):
os.remove(text_file)
if os.path.exists(utt2spk):
os.remove(utt2spk)
f_wav_scp = open(wav_scp, 'a', encoding="utf-8", newline='\n')
f_text_file = open(text_file, 'a', encoding="utf-8", newline='\n')
f_utt2spk = open(utt2spk, 'a', encoding="utf-8", newline='\n')
# make wav.scp, text, and utt2spk files.
for i, row in df_rec.iterrows():
filename = row['filename']
print('=== {0}: {1} ==='.format(i, filename))
wav_file = os.path.join(stimmen_test_dir, filename)
#if os.path.exists(wav_file):
speaker_id = 'speaker_' + str(i).zfill(4)
utterance_id = filename.replace('.wav', '')
utterance_id = utterance_id.replace(' ', '_')
utterance_id = speaker_id + '-' + utterance_id
# output
f_wav_scp.write('{0} {1}\n'.format(
utterance_id,
wav_file.replace('c:/', '/mnt/c/').replace('\\', '/'))) # convert path to unix format.
f_text_file.write('{0}\t{1}\n'.format(utterance_id, df_rec['word'][i].lower()))
f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))
f_wav_scp.close()
f_text_file.close()
f_utt2spk.close()
with open(corpus_txt, 'wb') as f:
f.write(bytes('\n'.join([word.lower() for word in word_list]) + '\n', 'utf-8'))
with open(nonsilence_phones_txt, 'wb') as f:
f.write(bytes('\n'.join(fame_asr.phoneset_short) + '\n', 'utf-8'))
with open(silence_phones_txt, 'wb') as f:
f.write(bytes('sil\nspn\n', 'utf-8'))
with open(optional_silence_txt, 'wb') as f:
f.write(bytes('sil\n', 'utf-8'))
with open(os.path.join(kaldi_data_dir, 'conf', 'decode.config'), 'wb') as f:
f.write(bytes('first_beam=10.0\n', 'utf-8'))
f.write(bytes('beam=13.0\n', 'utf-8'))
f.write(bytes('lattice_beam=6.0\n', 'utf-8'))
with open(os.path.join(kaldi_data_dir, 'conf', 'mfcc.conf'), 'wb') as f:
f.write(bytes('--use-energy=false', 'utf-8'))
## ======================= recognition ======================
listdir = glob.glob(os.path.join(feature_dir, '*.mfc'))
with open(hvite_scp, 'wb') as f:
f.write(bytes('\n'.join(listdir), 'ascii'))
with open(hresult_scp, 'wb') as f:
f.write(bytes('\n'.join(listdir).replace('.mfc', '.rec'), 'ascii'))
# calculate result
performance = np.zeros((1, 2))
for niter in range(50, 60):
output = pyhtk.recognition(
os.path.join(config_dir, 'config.rec'),
lattice_file,
os.path.join(default.htk_dir, 'model', 'hmm1', 'iter' + str(niter), 'hmmdefs'),
stimmen_dic, phonelist_txt, hvite_scp)
output = pyhtk.calc_recognition_performance(
stimmen_dic, hresult_scp)
per_sentence, per_word = pyhtk.load_recognition_output_all(output)
performance_ = np.array([niter, per_sentence['accuracy']]).reshape(1, 2)
performance = np.r_[performance, performance_]
print('{0}: {1}[%]'.format(niter, per_sentence['accuracy']))
#output = run_command_with_output([
# 'HVite', '-T', '1',
# '-C', config_rec,
# '-w', lattice_file,
# '-H', hmm,
# dictionary_file, phonelist_txt,
# '-S', HVite_scp
#])
## ======================= forced alignment using HTK ======================= ## ======================= forced alignment using HTK =======================
@ -179,54 +326,7 @@ if do_forced_alignment_htk:
predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl')) predictions.to_pickle(os.path.join(result_dir, 'htk', 'predictions_hmm' + hmm_num_str + '.pkl'))
## ======================= make files which is used for forced alignment by Kaldi =======================
if make_kaldi_data_files:
wav_scp = os.path.join(kaldi_data_dir, 'wav.scp')
text_file = os.path.join(kaldi_data_dir, 'text')
utt2spk = os.path.join(kaldi_data_dir, 'utt2spk')
# remove previous files.
if os.path.exists(wav_scp):
os.remove(wav_scp)
if os.path.exists(text_file):
os.remove(text_file)
if os.path.exists(utt2spk):
os.remove(utt2spk)
f_wav_scp = open(wav_scp, 'a', encoding="utf-8", newline='\n')
f_text_file = open(text_file, 'a', encoding="utf-8", newline='\n')
f_utt2spk = open(utt2spk, 'a', encoding="utf-8", newline='\n')
# make wav.scp, text, and utt2spk files.
for i in df.index:
filename = df['filename'][i]
print('=== {0}: {1} ==='.format(i, filename))
#if (i in df['filename'].keys()) and (isinstance(df['filename'][i], str)):
wav_file = os.path.join(wav_dir, filename)
if os.path.exists(wav_file):
speaker_id = 'speaker_' + str(i).zfill(4)
utterance_id = filename.replace('.wav', '')
utterance_id = utterance_id.replace(' ', '_')
utterance_id = speaker_id + '-' + utterance_id
# wav.scp file
wav_file_unix = wav_file.replace('\\', '/')
wav_file_unix = wav_file_unix.replace('c:/', '/mnt/c/')
f_wav_scp.write('{0} {1}\n'.format(utterance_id, wav_file_unix))
# text file
word = df['word'][i].lower()
f_text_file.write('{0}\t{1}\n'.format(utterance_id, word))
# utt2spk
f_utt2spk.write('{0} {1}\n'.format(utterance_id, speaker_id))
f_wav_scp.close()
f_text_file.close()
f_utt2spk.close()
## ======================= make lexicon txt which is used by Kaldi ======================= ## ======================= make lexicon txt which is used by Kaldi =======================

View File

@ -52,7 +52,7 @@ p = argparse.ArgumentParser()
#p.add_argument("--user", default=None) #p.add_argument("--user", default=None)
#p.add_argument("--password", default=None) #p.add_argument("--password", default=None)
p.add_argument("--user", default='martijn.wieling') p.add_argument("--user", default='martijn.wieling')
p.add_argument("--password", default='fa0Thaic') p.add_argument("--password", default='xxxxxx')
args = p.parse_args() args = p.parse_args()
#wav_file = 'c:\\OneDrive\\WSL\\test\\onetwothree.wav' #wav_file = 'c:\\OneDrive\\WSL\\test\\onetwothree.wav'

View File

@ -1,20 +1,19 @@
## this script should be used only by Aki Kunikoshi. ## this script should be used only by Aki Kunikoshi.
import os
import numpy as np import numpy as np
import pandas as pd
import argparse import argparse
import json import json
from novoapi.backend import session from novoapi.backend import session
import os
#os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import defaultfiles as default import defaultfiles as default
import convert_phoneset
def load_phonset(): def load_novo70_phoneset():
translation_key_ipa2novo70 = dict()
translation_key_novo702ipa = dict()
#phonelist_novo70_ = pd.ExcelFile(default.phonelist_novo70_xlsx) #phonelist_novo70_ = pd.ExcelFile(default.phonelist_novo70_xlsx)
#df = pd.read_excel(phonelist_novo70_, 'list') #df = pd.read_excel(phonelist_novo70_, 'list')
## *_simple includes columns which has only one phone in. ## *_simple includes columns which has only one phone in.
@ -23,21 +22,23 @@ def load_phonset():
# print('{0}:{1}'.format(ipa, novo70)) # print('{0}:{1}'.format(ipa, novo70))
# translation_key[ipa] = novo70 # translation_key[ipa] = novo70
#phonelist_novo70 = np.unique(list(df['novo70_simple'])) #phonelist_novo70 = np.unique(list(df['novo70_simple']))
novo70_phoneset = pd.read_csv(default.novo70_phoneset, delimiter='\t', header=None, encoding="utf-8")
novo70_phoneset.rename(columns={0: 'novo70', 1: 'ipa', 2: 'description'}, inplace=True)
phoneset_ipa = [] #phoneset_ipa = []
phoneset_novo70 = [] #phoneset_novo70 = []
with open(default.novo70_phoneset, "rt", encoding="utf-8") as fin: #with open(default.novo70_phoneset, "rt", encoding="utf-8") as fin:
lines = fin.read() # lines = fin.read()
lines = lines.split('\n') # lines = lines.split('\n')
for line in lines: # for line in lines:
words = line.split('\t') # words = line.split('\t')
if len(words) > 1: # if len(words) > 1:
novo70 = words[0] # novo70 = words[0]
ipa = words[1] # ipa = words[1]
phoneset_ipa.append(ipa) # phoneset_ipa.append(ipa)
phoneset_novo70.append(novo70) # phoneset_novo70.append(novo70)
translation_key_ipa2novo70[ipa] = novo70 # translation_key_ipa2novo70[ipa] = novo70
translation_key_novo702ipa[novo70] = ipa # translation_key_novo702ipa[novo70] = ipa
# As per Nederlandse phoneset_aki.xlsx recieved from David # As per Nederlandse phoneset_aki.xlsx recieved from David
# [ɔː] oh / ohr # from ipa->novo70, only oh is used. # [ɔː] oh / ohr # from ipa->novo70, only oh is used.
@ -48,14 +49,25 @@ def load_phonset():
# [w] wv in IPA written as ʋ. # [w] wv in IPA written as ʋ.
extra_ipa = ['ɔː', 'ɪː', 'iː', 'œː', 'ɛː', 'ʋ'] extra_ipa = ['ɔː', 'ɪː', 'iː', 'œː', 'ɛː', 'ʋ']
extra_novo70 = ['oh', 'ih', 'iy', 'uh', 'eh', 'wv'] extra_novo70 = ['oh', 'ih', 'iy', 'uh', 'eh', 'wv']
for ipa, novo70 in zip(extra_ipa, extra_novo70):
phoneset_ipa.append(ipa) phoneset_ipa = list(novo70_phoneset['ipa'])
phoneset_novo70.append(novo70) phoneset_ipa.extend(extra_ipa)
phoneset_ipa = [i.replace('ː', ':') for i in phoneset_ipa]
phoneset_novo70 = list(novo70_phoneset['novo70'])
phoneset_novo70.extend(extra_novo70)
phoneset_novo70 = [i.replace('ː', ':') for i in phoneset_novo70]
translation_key_ipa2novo70 = dict()
translation_key_novo702ipa = dict()
for ipa, novo70 in zip(phoneset_ipa, phoneset_novo70):
#phoneset_ipa.append(ipa)
#phoneset_novo70.append(novo70)
translation_key_ipa2novo70[ipa] = novo70 translation_key_ipa2novo70[ipa] = novo70
translation_key_novo702ipa[novo70] = ipa translation_key_novo702ipa[novo70] = ipa
translation_key_novo702ipa['ohr'] = 'ɔː' translation_key_novo702ipa['ohr'] = 'ɔ:'
translation_key_novo702ipa['ihr'] = 'ɪː' translation_key_novo702ipa['ihr'] = 'ɪ:'
phoneset_ipa = np.unique(phoneset_ipa) phoneset_ipa = np.unique(phoneset_ipa)
phoneset_novo70 = np.unique(phoneset_novo70) phoneset_novo70 = np.unique(phoneset_novo70)
@ -63,25 +75,6 @@ def load_phonset():
return phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa return phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa
def multi_character_tokenize(line, multi_character_tokens):
"""
Tries to match one of the tokens in multi_character_tokens at each position of line,
starting at position 0,
if so tokenizes and eats that token. Otherwise tokenizes a single character.
Copied from forced_alignment.convert_phone_set.py
"""
while line != '':
for token in multi_character_tokens:
if line.startswith(token) and len(token) > 0:
yield token
line = line[len(token):]
break
else:
yield line[:1]
line = line[1:]
def split_ipa(line): def split_ipa(line):
""" """
Split a line by IPA phones. Split a line by IPA phones.
@ -89,13 +82,16 @@ def split_ipa(line):
:param string line: one line written in IPA. :param string line: one line written in IPA.
:return string lineSeperated: the line splitted in IPA phone. :return string lineSeperated: the line splitted in IPA phone.
""" """
phoneset_ipa, _, _, _ = load_novo70_phoneset()
#multi_character_phones = [i for i in phoneset_ipa if len(i) > 1]
#multi_character_phones.sort(key=len, reverse=True)
#multi_character_phones = [
# # IPAs in CGN.
# u'ʌu', u'ɛi', u'œy', u'aː', u'eː', u'iː', u'oː', u'øː', u'ɛː', u'œː', u'ɔː', u'ɛ̃ː', u'ɑ̃ː', u'ɔ̃ː', u'œ̃', u'ɪː'
# ]
#return [phone for phone in multi_character_tokenize(line.strip(), multi_character_phones)]
multi_character_phones = [ return convert_phoneset.split_word(line, phoneset_ipa)
# IPAs in CGN.
u'ʌu', u'ɛi', u'œy', u'aː', u'eː', u'iː', u'oː', u'øː', u'ɛː', u'œː', u'ɔː', u'ɛ̃ː', u'ɑ̃ː', u'ɔ̃ː', u'œ̃', u'ɪː'
]
return [phone for phone in multi_character_tokenize(line.strip(), multi_character_phones)]
def split_novo70(line): def split_novo70(line):
@ -104,29 +100,32 @@ def split_novo70(line):
:param string line: one line written in novo70. :param string line: one line written in novo70.
:return string lineSeperated: the line splitted by novo70 phones. :return string lineSeperated: the line splitted by novo70 phones.
""" """
_, phoneset_novo70, _, _ = load_phonset() _, phoneset_novo70, _, _ = load_novo70_phoneset()
multi_character_phones = [p for p in phoneset_novo70 if len(p) > 1] #multi_character_phones = [p for p in phoneset_novo70 if len(p) > 1]
multi_character_phones = sorted(multi_character_phones, key=len, reverse=True) #multi_character_phones = sorted(multi_character_phones, key=len, reverse=True)
multi_character_phones = convert_phoneset.extract_multi_character_phones(phoneset_novo70)
return ['sp' if phone == ' ' else phone return ['sp' if phone == ' ' else phone
for phone in multi_character_tokenize(line.strip(), multi_character_phones)] for phone in multi_character_tokenize(line.strip(), multi_character_phones)]
def novo702ipa(tokens): def novo702ipa(line):
pronunciation = [] #pronunciation = []
_, _, _, translation_key = load_phonset() _, _, _, translation_key = load_novo70_phoneset()
for phone in split_novo70(tokens): #for phone in split_novo70(tokens):
pronunciation.append(translation_key.get(phone, phone)) # pronunciation.append(translation_key.get(phone, phone))
return ' '.join(pronunciation) #return ' '.join(pronunciation)
return ' '.join(convert_phoneset.convert_phoneset(split_novo70(line), translation_key))
# numbering of novo70 should be checked. # numbering of novo70 should be checked.
def ipa2novo70(tokens): def ipa2novo70(line):
pronunciation = [] #pronunciation = []
_, _, translation_key, _ = load_phonset() _, _, translation_key, _ = load_novo70_phoneset()
for phone in split_ipa(tokens): #for phone in split_ipa(tokens):
pronunciation.append(translation_key.get(phone, phone)) # pronunciation.append(translation_key.get(phone, phone))
return ' '.join(pronunciation) #return ' '.join(pronunciation)
return ' '.join(convert_phoneset.convert_phoneset(split_ipa(line), translation_key))
def make_grammar(word, pronunciation_ipa): def make_grammar(word, pronunciation_ipa):
@ -173,7 +172,10 @@ def forced_alignment(wav_file, word, pronunciation_ipa):
# username / password cannot be passed as artuments... # username / password cannot be passed as artuments...
p = argparse.ArgumentParser() p = argparse.ArgumentParser()
p.add_argument("--user", default='martijn.wieling') p.add_argument("--user", default='martijn.wieling')
p.add_argument("--password", default='fa0Thaic') p.add_argument("--password", default='xxxxxx')
args = p.parse_args() args = p.parse_args()
rec = session.Recognizer(grammar_version="1.0", lang="nl", snodeid=101, user=args.user, password=args.password, keepopen=True) # , modeldir=modeldir) rec = session.Recognizer(grammar_version="1.0", lang="nl", snodeid=101, user=args.user, password=args.password, keepopen=True) # , modeldir=modeldir)
@ -194,6 +196,25 @@ def result2pronunciation(result, word):
return pronunciation_ipa, pronunciation_novo70, llh return pronunciation_ipa, pronunciation_novo70, llh
def phones_not_in_novo70(ipa):
    """Return the phones of an IPA string that the novo70 phoneset lacks.

    :param string ipa: pronunciation written in IPA.
    :return list: phones of `ipa` (as split by split_ipa) which are neither
        in the IPA phoneset returned by load_novo70_phoneset() nor among the
        extra phones suggested by David (listed below).
    """
    phoneset_ipa, _, _, _ = load_novo70_phoneset()

    # As per Nederlandse phoneset_aki.xlsx received from David
    # [ɔː] oh / ohr
    # [ɪː] ih / ihr
    # [iː] iy
    # [œː] uh
    # [ɛː] eh
    # [w] wv in IPA written as ʋ.
    david_suggestion = ['ɔː', 'ɪː', 'iː', 'œː', 'ɛː', 'w']

    missing = []
    for phone in split_ipa(ipa):
        # A phone counts as known when either source covers it.
        if phone in phoneset_ipa or phone in david_suggestion:
            continue
        missing.append(phone)
    return missing
# Manual check when the module is executed as a script.
# The guard must compare against '__main__'; the previous value 'main' never
# matches, so this block could not run.
if __name__ == '__main__':
    pronunciation_ipa = ['rø:s', 'mɑn', 'mɑntsjə']
    #grammar = make_grammar('reus', pronunciation_ipa)
    phoneset_ipa, phoneset_novo70, translation_key_ipa2novo70, translation_key_novo702ipa = load_novo70_phoneset()

View File

@ -0,0 +1,154 @@
""" definition of the phones to be used. """
# phones in {FAME}/lexicon/lex.asr
phoneset = [
    # vowels
    'a',
    'a:',
    'e',
    'e:',
    'i',
    'i:',
    'i̯',   # non-syllabic i ('i_' in the htk format).
    'o',
    'o:',
    'ö',
    'ö:',
    'u',
    'u:',
    'ü',
    'ü:',
    #'ú', # only appears in word 'feeste'(út) and 'gaste'(út) which are 'f e: s t ə' and 'yn' in lex_asr. The pronunciation in Fries may be mistakes so I removed this phone.
    'ṷ',    # non-syllabic u ('u_' in the htk format).
    'y',
    'ɔ',
    'ɔ:',
    'ɔ̈',
    'ɔ̈:',
    'ə',
    'ɛ',
    'ɛ:',
    'ɪ',
    'ɪ:',

    # plosives
    'p',
    'b',
    't',
    'd',
    'k',
    'g',
    'ɡ',    # = 'g'

    # nasals
    'm',
    'n',
    'ŋ',

    # fricatives
    'f',
    'v',
    's',
    's:',
    'z',
    'x',
    'h',

    # tap and flap
    'r',
    'r:',

    # approximant
    'j',
    'l'
]
## reduce the number of phones.
# the phones which seldom occur are replaced with other, more frequent phones.
# replacements are based on the advice from Martijn Wieling.
reduction_key = {
    'y': 'i:', 'e': 'e:', 'ə:': 'ɛ:', 'r:': 'r', 'ɡ': 'g',
    # aki added because this is used in stimmen_project.
    'ɔ̈:': 'ɔ:'
}
# phones which are dropped by phone_reduction().
# 'ú' is already commented out in phoneset, while 's:' is still listed there.
phones_to_be_removed = ['ú', 's:']


def phone_reduction(phones):
    """ replace seldom phones and drop the phones which are not used.

    Args:
        phones (list): list of phones.

    Returns:
        list: `phones` without the items listed in phones_to_be_removed,
            where each remaining phone is replaced according to
            reduction_key (phones without an entry are kept as they are).
    """
    # only a notice is printed; the offending phones are silently dropped below.
    if any(phone in phones for phone in phones_to_be_removed):
        print('input includes phone(s) which is not defined in fame_asr.')
        print('those phone(s) are removed.')

    return [reduction_key.get(i, i) for i in phones
            if i not in phones_to_be_removed]
# reduced phoneset: unique phones after phone_reduction(), in sorted order.
phoneset_short = sorted(set(phone_reduction(phoneset)))
## translation_key to htk format (ascii).
# phones which give UnicodeEncodeError when phone.encode("ascii") is called
# are replaced with other characters.
translation_key_asr2htk = {
    'i̯': 'i_',
    'ṷ': 'u_',

    # on the analogy of German umlaut, 'e' is used.
    'ö': 'oe', 'ö:': 'oe:',
    'ü': 'ue', 'ü:': 'ue:',

    # on the analogy of Chinese...
    'ŋ': 'ng',

    # refer to Xsampa.
    'ɔ': 'O', 'ɔ:': 'O:', 'ɔ̈': 'Oe',
    #'ɔ̈:': 'O:', # does not appear in FAME, but used in stimmen.
    'ɛ': 'E', 'ɛ:': 'E:',
    'ɪ': 'I', 'ɪ:': 'I:',

    # it is @ in Xsampa, but that is not handy on HTK.
    'ə': 'A'
}
phoneset_htk = [translation_key_asr2htk.get(i, i) for i in phoneset_short]
## translation_key for non-ascii letters which appear in words.
# each accented letter is mapped to an ascii string:
#   - acute accent (í, é, ú, ć)         -> letter + '1'
#   - grave accent (à, è)               -> letter + '2'
#   - circumflex (â, ê, ô, û)           -> letter + '3'
#   - caron (č)                         -> letter + '4'
#   - diaeresis (ä, ë, ï, ö, ü)         -> two ascii letters.
# the apostrophe is listed in the commented-out lines only, i.e. it is not translated.
#not_in_ascii = [
# '\'',
# 'â', 'ê', 'ô', 'û', 'č',
# 'à', 'í', 'é', 'è', 'ú', 'ć',
# 'ä', 'ë', 'ï', 'ö', 'ü'
#]
translation_key_word2htk = {
#'\'': '\\\'',
'í':'i1', 'é':'e1', 'ú':'u1', 'ć':'c1',
'à':'a2', 'è':'e2',
'â':'a3', 'ê':'e3', 'ô':'o3', 'û':'u3',
'č':'c4',
'ä': 'ao', 'ë': 'ee', 'ï': 'ie', 'ö': 'oe', 'ü': 'ue',
}
# quick check of the translation (kept for reference):
#[translation_key_word2htk.get(i, i) for i in not_in_ascii]
#Stop: p, b, t, d, k, g
#Nasal: m, n, ng(ŋ)
#Fricative: s, z, f, v, h, x
#Liquid: l, r
#Vowel: a, a:, e:, i, i:, i_(i̯), o, o:, u, u:, u_(ṷ), oe(ö), oe:(ö:), ue(ü), ue:(ü:), O(ɔ), O:(ɔ:), Oe(ɔ̈), A(ə), E(ɛ), E:(ɛ:), I(ɪ), I:(ɪ:)
## the lists of multi character phones.
# for example, 'a:' consists of 2 characters, but in the codes it is treated as one phone.
# every list is ordered from the longest phone to the shortest one.

# original.
multi_character_phones = sorted(
    (phone for phone in phoneset if len(phone) > 1),
    key=len, reverse=True)

# phoneset reduced.
multi_character_phones_short = sorted(
    (phone for phone in phoneset_short if len(phone) > 1),
    key=len, reverse=True)

# htk compatible.
multi_character_phones_htk = sorted(
    (phone for phone in phoneset_htk if len(phone) > 1),
    key=len, reverse=True)

View File

@ -1,11 +1,11 @@
""" definition of the phones to be used. """ """ definition of the phones to be used. """
## phones in IPA. phoneset = [
phoneset_ipa = [
# vowels # vowels
'', '',
'i̯ⁿ', 'i̯ⁿ',
'y', 'y',
'y:', # not included in lex.ipa, but in stimmen.
'i', 'i',
'i.', 'i.',
'iⁿ', 'iⁿ',
@ -14,7 +14,7 @@ phoneset_ipa = [
'ɪ', 'ɪ',
'ɪⁿ', 'ɪⁿ',
'ɪ.', 'ɪ.',
#'ɪ:', # not included in lex.ipa 'ɪ:', # not included in lex.ipa, but in stimmen.
'ɪ:ⁿ', 'ɪ:ⁿ',
'e', 'e',
'e:', 'e:',
@ -35,7 +35,7 @@ phoneset_ipa = [
'', '',
'ṷ.', 'ṷ.',
'ṷⁿ', 'ṷⁿ',
#'ú', # only appears in word 'feeste'(út) and 'gaste'(út) which are 'f e: s t ə' and 'yn' in lex_asr. #'ú', # only appears in word 'feeste'(út) and 'gaste'(út) which are 'f e: s t ə' and 'yn' in lex_asr. The pronunciation in Fries may be mistakes so I removed this phone.
'u', 'u',
'uⁿ', 'uⁿ',
'u.', 'u.',
@ -61,7 +61,7 @@ phoneset_ipa = [
'ɔⁿ', 'ɔⁿ',
'ɔ:', 'ɔ:',
'ɔ:ⁿ', 'ɔ:ⁿ',
#'ɔ̈', # not included in lex.ipa 'ɔ̈', # not included in lex.ipa
'ɔ̈.', 'ɔ̈.',
'ɔ̈:', 'ɔ̈:',
@ -101,7 +101,38 @@ phoneset_ipa = [
'l' 'l'
] ]
## reduce the number of phones.
# the phones which are used in stimmen transcription but not in FAME corpus.
# replacements are based on the advice from Jelske Dijkstra on 2018/06/21.
# key: phone used in the stimmen transcription, value: the phone it is replaced with.
# the entries marked with 'Aki added' / 'aki guessed' are not part of that advice.
stimmen_replacement = {
'æ': 'ɛ',
'ø': 'ö', # or 'ö:'
'ø:': 'ö:', # Aki added.
'œ': 'ɔ̈', # or 'ɔ̈:'
'œ:': 'ɔ̈:', # Aki added.
'ɐ': 'a', # or 'a:'
'ɐ:': 'a:', # Aki added.
'ɑ': 'a', # or 'a:'
'ɑ:': 'a:', # Aki added
'ɒ': 'ɔ', # or 'ɔ:'
'ɒ:': 'ɔ:', # Aki added.
'ɾ': 'r',
'ʁ': 'r',
'ʊ': 'u',
'χ': 'x',
# aki guessed.
'ʀ': 'r',
'ɹ': 'r',
'w': 'ö'
}
phoneset.extend(list(stimmen_replacement.keys()))
def phone_reduction(phones):
    """ replace the phones which are only used in the stimmen transcription.

    Args:
        phones (list): list of phones.

    Returns:
        list: `phones` in which every phone that is a key of
            stimmen_replacement is replaced with the corresponding value.
            the other phones are kept as they are.
    """
    return [stimmen_replacement.get(i, i) for i in phones]
## the list of multi character phones. ## the list of multi character phones.
# for example, the length of 'i̯ⁿ' is 3, but in the codes it is treated as one letter. # for example, the length of 'i̯ⁿ' is 3, but in the codes it is treated as one letter.
multi_character_phones_ipa = [i for i in phoneset_ipa if len(i) > 1] multi_character_phones = [i for i in phoneset if len(i) > 1]
multi_character_phones_ipa.sort(key=len, reverse=True) multi_character_phones.sort(key=len, reverse=True)

Binary file not shown.

View File

@ -0,0 +1,197 @@
import sys
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import fame_functions
from phoneset import fame_ipa, fame_asr
import convert_phoneset
## phone groups used as the questions in quests.hed.
# every group is a comma separated string of phones;
# _ipa2quest() removes the spaces and splits the string by ','.

## general
stop = 'p, b, t, d, k, g'
nasal = 'm, n, ŋ'
fricative = 's, z, f, v, h, x, j'
liquid = 'l, r'
vowel = 'a, a:, e:, i, i:, i̯, o, o:, u, u:, ṷ, ö, ö:, ü, ü:, ɔ, ɔ:, ɔ̈, ə, ɛ, ɛ:, ɪ, ɪ:'
## consonant
# place of articulation.
c_front = 'p, b, m, f, v'
c_central = 't, d, n, s, z, l, r'
c_back = 'k, g, ŋ, h, x, j'
fortis = 'p, t, k, f, s'
lenis = 'b, d, g, v, z, j'
neither_fortis_nor_lenis = 'm, n, ŋ, h, l, r, x'
coronal = 't, d, n, s, z, l, r, j'
non_coronal = 'p, b, m, k, g, ŋ, f, v, h, x'
anterior = 'p, b, m, t, d, n, f, v, s, z, l'
non_anterior = 'k, g, ŋ, h, x, j, r'
continuent = 'm, n, ŋ, f, v, s, z, h, l, r'
non_continuent = 'p, b, t, d, k, g, x, j'
strident = 's, z, j'
non_strident = 'f, v, h'
unstrident = 'p, b, t, d, m, n, ŋ, k, g, r, x'
glide = 'h, l, r'
syllabic = 'm, l, ŋ'
unvoiced = 'p, t, k, s, f, x, h'
voiced = 'b, d, g, z, v, m, n, ŋ, l, r, j'
# no affricate group is defined (yet); only its complement is.
#affricate: ???
non_affricate = 's, z, f, v'
# stops and fricatives subdivided by voicing and by place.
voiced_stop = 'b, d, g'
unvoiced_stop = 'p, t, k'
front_stop = 'p, b'
central_stop = 't, d'
back_stop = 'k, g'
voiced_fricative = 'z, v'
unvoiced_fricative = 's, f'
front_fricative = 'f, v'
central_fricative = 's, z'
back_fricative = 'j'
## vowel
# every group is a comma separated string of phones
# (the spaces are removed and the string is split by ',' when it is used).
v_front = 'i, i:, i̯, ɪ, ɪ:, e:, ə, ɛ, ɛ:, a, a:'
v_central = 'ə, ɛ, ɛ:, a, a:'
v_back = 'u, u:, ü, ü:, ṷ, ɔ, ɔ:, ɔ̈, ö, ö:, o, o:'
long = 'a:, e:, i:, o:, u:, ö:, ü:, ɔ:, ɛ:, ɪ:'
short = 'a, i, i̯, o, u, ṷ, ö, ü, ɔ, ɔ̈, ə, ɛ, ɪ'
#Dipthong: ???
#Front-Start: ???
#Fronting: ???
# NOTE: a comma was missing between 'ɪ:' and 'u', which merged them into one item 'ɪ:u'.
high = 'i, i:, i̯, ɪ, ɪ:, u, u:, ṷ, ə, e:, o, o:, ö, ö:, ü, ü:'
medium = 'e:, ə, ɛ, ɛ:, ɔ, ɔ:, ɔ̈, o, o:, ö, ö:'
low = 'a, a:, ɛ, ɛ:, ɔ, ɔ:, ɔ̈'
rounded = 'a, a:, o, o:, u, u:, ṷ, ö, ö:, ü, ü:, ɔ, ɔ:, ɔ̈'
unrounded = 'i, i:, i̯, e:, ə, ɛ, ɛ:, ɪ, ɪ:'
i_vowel = 'i, i:, i̯, ɪ, ɪ:'
e_vowel = 'e:, ə, ɛ, ɛ:'
a_vowel = 'a, a:'
o_vowel = 'o, o:, ö, ö:, ɔ, ɔ:, ɔ̈'
u_vowel = 'u, u:, ṷ, ü, ü:'
## htk phoneset
phoneset = fame_asr.phoneset_htk
## convert ipa group to htk format for quests.hed.
def _ipa2quest(R_or_L, ipa_text):
    """ convert a group of phones into the item list of a question.

    Args:
        R_or_L (str): 'R' for the right context, 'L' for the left context.
        ipa_text (str): comma separated phones. spaces are ignored.

    Returns:
        str: comma separated patterns; each phone is converted with
            fame_functions.ipa2htk() and written as '*+phone' when R_or_L
            is 'R', otherwise as 'phone-*'.

    Raises:
        AssertionError: if R_or_L is neither 'R' nor 'L'.
    """
    # the message used to be print(...), which evaluates to None.
    assert R_or_L in ['R', 'L'], 'the first argument should be either R or L.'

    ipa_list = ipa_text.replace(' ', '').split(',')
    if R_or_L == 'R':
        quests_list = ['*+' + fame_functions.ipa2htk(ipa) for ipa in ipa_list]
    else:
        quests_list = [fame_functions.ipa2htk(ipa) + '-*' for ipa in ipa_list]
    return ','.join(quests_list)
def make_quests_hed(quest_hed):
    """ make the question file (quests.hed) which is used to tie triphones.

    for each of the contexts 'R' and 'L', one 'QS' line is written per phone
    group defined in this module, followed by one line per phone in
    `phoneset`.

    Args:
        quest_hed (path): the file to be made. an existing file is overwritten.
    """
    # (item name, comma separated phones) in the order they are written.
    # NOTE: groups must be joined with ', '. plain string concatenation
    # merges the last phone of the first group with the first phone of
    # the second group (e.g. 'v' + 'i' -> 'vi').
    quests = [
        ('NonBoundary', '*'),
        ('Silence', 'sil'),

        ('Stop', stop),
        ('Nasal', nasal),
        ('Fricative', fricative),
        ('Liquid', liquid),
        ('Vowel', vowel),

        ('C-Front', c_front),
        ('C-Central', c_central),
        ('C-Back', c_back),

        ('V-Front', v_front),
        ('V-Central', v_central),
        ('V-Back', v_back),

        ('Front', c_front + ', ' + v_front),
        ('Central', c_central + ', ' + v_central),
        # c_front was used here, which looks like a copy-paste mistake.
        ('Back', c_back + ', ' + v_back),

        ('Fortis', fortis),
        ('Lenis', lenis),
        ('UnFortLenis', neither_fortis_nor_lenis),

        ('Coronal', coronal),
        ('NonCoronal', non_coronal),

        ('Anterior', anterior),
        ('NonAnterior', non_anterior),

        ('Continuent', continuent),
        ('NonContinuent', non_continuent),

        ('Strident', strident),
        ('NonStrident', non_strident),
        ('UnStrident', unstrident),

        ('Glide', glide),
        ('Syllabic', syllabic),

        ('Unvoiced-Cons', unvoiced),
        ('Voiced-Cons', voiced),
        ('Unvoiced-All', unvoiced + ', sil'),

        ('Long', long),
        ('Short', short),

        #('Dipthong', xxx),
        #('Front-Start', xxx),
        #('Fronting', xxx),

        ('High', high),
        ('Medium', medium),
        ('Low', low),

        ('Rounded', rounded),
        ('UnRounded', unrounded),

        #('Affricative', rounded),
        ('NonAffricative', non_affricate),

        ('IVowel', i_vowel),
        ('EVowel', e_vowel),
        ('AVowel', a_vowel),
        ('OVowel', o_vowel),
        ('UVowel', u_vowel),

        ('Voiced-Stop', voiced_stop),
        ('UnVoiced-Stop', unvoiced_stop),

        ('Front-Stop', front_stop),
        ('Central-Stop', central_stop),
        ('Back-Stop', back_stop),

        ('Voiced-Fric', voiced_fricative),
        ('UnVoiced-Fric', unvoiced_fricative),

        ('Front-Fric', front_fricative),
        ('Central-Fric', central_fricative),
        ('Back-Fric', back_fricative),
    ]
    # one question per phone.
    # NOTE(review): `phoneset` is already in the htk format, but the items
    # are passed through fame_functions.ipa2htk() again in _ipa2quest().
    # presumably that conversion leaves htk phones unchanged -- confirm.
    quests.extend((p, p) for p in phoneset)

    # the file is opened once in write mode, which replaces the previous
    # remove-then-append for every single item.
    with open(quest_hed, 'wb') as f:
        for R_or_L in ['R', 'L']:
            for item_name_, ipa_text in quests:
                item_name = R_or_L + '_' + item_name_
                f.write(bytes('QS "' + item_name + '"\t{ ' + _ipa2quest(R_or_L, ipa_text) + ' }\n', 'ascii'))

    return

View File

@ -0,0 +1,119 @@
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import glob
import pandas as pd
import convert_xsampa2ipa
import defaultfiles as default
import fame_functions
import novoapi_functions
def _load_transcriptions():
    """ load the stimmen transcriptions and convert them into IPA.

    the sheet 'original' of default.stimmen_transcription_xlsx is read and
    the column 'Self Xsampa' is converted into IPA.

    Returns:
        (pandas.DataFrame): columns are 'filename', 'word', 'xsampa' and 'ipa'.
            rows of which 'ipa' includes '/' and rows with missing values
            are removed.
    """
    # read_excel opens and closes the file by itself,
    # so that no ExcelFile handle is left open.
    df = pd.read_excel(default.stimmen_transcription_xlsx, 'original')

    # mapping from xsampa to ipa
    mapping = convert_xsampa2ipa.load_converter('xsampa', 'ipa', default.ipa_xsampa_converter_dir)
    #for xsampa, ipa in zip(df['X-SAMPA'], df['IPA']):
    #    ipa_converted = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
    #    if not ipa_converted == ipa:
    #        print('{0}: {1} - {2}'.format(xsampa, ipa_converted, ipa))

    ipas = []
    for xsampa in df['Self Xsampa']:
        if not isinstance(xsampa, float):  # 'NaN'
            # typo?
            xsampa = xsampa.replace('r2:z@rA:\\t', 'r2:z@rA:t').replace(';', ':')

            ipa = convert_xsampa2ipa.xsampa2ipa(mapping, xsampa)
            ipa = ipa.replace('ː', ':').replace(' ', '')
            ipas.append(ipa)
        else:
            # empty cell; keep the place so that ipas stays aligned with df.
            ipas.append('')

    df_ = pd.DataFrame({'filename': df['Filename'],
                        'word': df['Word'],
                        'xsampa': df['Self Xsampa'],
                        'ipa': pd.Series(ipas)})

    # not valid inputs, but separator.
    df_ = df_[~df_['ipa'].str.contains('/')]
    return df_.dropna()
def load_transcriptions():
    """ load the transcriptions of which wav files can be easily found.

    Returns:
        (pandas.DataFrame): rows of _load_transcriptions() of which
            'filename' is a string and the file exists in
            default.stimmen_wav_dir. the index is renumbered from 0.
    """
    df = _load_transcriptions()

    def _wav_exists(filename):
        if not isinstance(filename, str):
            return False
        return os.path.exists(os.path.join(default.stimmen_wav_dir, filename))

    # rows are selected at once with a boolean mask. DataFrame.append
    # copied the whole frame per row and is removed in pandas 2.0.
    # astype(bool) keeps the mask boolean even when df is empty.
    has_wav = df['filename'].map(_wav_exists).astype(bool)
    return df[has_wav].reset_index(drop=True)
def load_transcriptions_clean(clean_wav_dir):
    """ load the transcriptions of the wav files in clean_wav_dir.

    Args:
        clean_wav_dir (path): directory in which the clean wav files are stored.

    Returns:
        (pandas.DataFrame): rows of _load_transcriptions() of which
            'filename' is the name of a wav file in clean_wav_dir.
            rows are ordered by the wav file list and keep their index.
    """
    df = _load_transcriptions()
    wav_file_list = glob.glob(os.path.join(clean_wav_dir, '*.wav'))

    # start with an empty frame so that the columns are defined
    # even when there is no wav file.
    frames = [pd.DataFrame(index=[], columns=list(df.keys()))]
    for wav_file in wav_file_list:
        filename = os.path.basename(wav_file)
        # exact comparison: str.match() would take the filename as
        # a regular expression ('.' matches any character) and would
        # also accept every name which only starts with it.
        frames.append(df[df['filename'] == filename])

    # concatenate once instead of once per wav file.
    return pd.concat(frames)
def load_transcriptions_novo70(clean_wav_dir):
    """ extract rows of which ipa is written in novo70 phoneset.

    Args:
        clean_wav_dir (path): directory in which the clean wav files are stored.

    Returns:
        (pandas.DataFrame): rows of load_transcriptions_clean(clean_wav_dir)
            for which novoapi_functions.phones_not_in_novo70() finds no
            phone in 'ipa'. the index is renumbered from 0.
    """
    df = load_transcriptions_clean(clean_wav_dir)

    def _is_novo70(ipa):
        return len(novoapi_functions.phones_not_in_novo70(ipa)) == 0

    # boolean mask instead of DataFrame.append per row
    # (quadratic, and removed in pandas 2.0).
    # astype(bool) keeps the mask boolean even when df is empty.
    in_novo70 = df['ipa'].map(_is_novo70).astype(bool)
    return df[in_novo70].reset_index(drop=True)
def add_row_htk(df):
    """ df['htk'] is made from df['ipa'] and added.

    Args:
        df (pandas.DataFrame): should have the column 'ipa'.

    Returns:
        (pandas.DataFrame): copy of df with the new column 'htk', which is
            fame_functions.ipa2htk() of each 'ipa'. df itself is not changed.
    """
    return df.assign(htk=[fame_functions.ipa2htk(ipa) for ipa in df['ipa']])
def add_row_asr(df):
    """ df['asr'] is made from df['ipa'] and added.

    Args:
        df (pandas.DataFrame): should have the column 'ipa'.

    Returns:
        (pandas.DataFrame): copy of df with the new column 'asr', which is
            fame_functions.ipa2asr() of each 'ipa'. df itself is not changed.
    """
    return df.assign(asr=[fame_functions.ipa2asr(ipa) for ipa in df['ipa']])
def load_pronunciations(WORD, htk_dic):
    """ load pronunciation variants from HTK dic file.

    Args:
        WORD (str): word in capital letters.
        htk_dic (path): HTK dict file.

    Returns:
        (pronunciations) (list): pronunciation variants of WORD.
            every ' sil' is removed from the pronunciations.

    Notes:
        the file is read line by line, therefore the whole lexicon
        is never kept in memory.
    """
    pronunciations = []
    with open(htk_dic) as f:
        for line in f:
            line = line.rstrip('\n').replace(' sil', '')
            # the first space separates the word from its pronunciation.
            word, _, pronunciation = line.partition(' ')
            if word == WORD:
                pronunciations.append(pronunciation)
    return pronunciations

View File

@ -0,0 +1,93 @@
import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import sys
import shutil
from collections import Counter
import numpy as np
import pandas as pd
import defaultfiles as default
import convert_xsampa2ipa
import stimmen_functions
import fame_functions
import convert_phoneset
from phoneset import fame_ipa, fame_asr
sys.path.append(default.toolbox_dir)
import file_handling as fh
from htk import pyhtk
## ======================= user define =======================

## ======================= make test data ======================
stimmen_test_dir = r'c:\OneDrive\Research\rug\_data\stimmen_test'

## copy wav files which is in the stimmen data.
df = stimmen_functions.load_transcriptions()
#for index, row in df.iterrows():
#    filename = row['filename']
#    wav_file = os.path.join(default.stimmen_wav_dir, filename)
#    shutil.copy(wav_file, os.path.join(stimmen_test_dir, filename))

# after manually removed files which has too much noise and multiple words...
# update the info.
df_clean = stimmen_functions.load_transcriptions_clean(stimmen_test_dir)

# count how many files are removed due to the quality.
word_list = sorted(i for i in set(df['word']) if not pd.isnull(i))
for word in word_list:
    df_ = df[df['word']==word]
    df_clean_ = df_clean[df_clean['word']==word]
    # every word comes from df, therefore len(df_) is never 0.
    print('word {0} has {1} clean files among {2} files ({3:.2f} [%]).'.format(
        word, len(df_clean_), len(df_), len(df_clean_)/len(df_)*100))
## check phones included in stimmen but not in FAME!
splitted_ipas = [' '.join(
    convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones))
    for ipa in df['ipa']]
# split the joined text back into phones. set() on the text itself
# gives single characters (including ' ' and ':'), not phones.
stimmen_phones = sorted(set(' '.join(splitted_ipas).split()))
# sorted() makes a copy, so that fame_ipa.phoneset itself is not reordered.
fame_phones = sorted(fame_ipa.phoneset)
print('phones which are used in stimmen transcription but not in FAME corpus are:\n{}'.format(
    set(stimmen_phones) - set(fame_phones)
    ))

# show the transcriptions in which ':' remains as a phone on its own.
for ipa in df['ipa']:
    ipa_splitted = convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones)
    if ':' in ipa_splitted:
        print(ipa_splitted)
## check pronunciation variants
df_clean = stimmen_functions.load_transcriptions_clean(stimmen_test_dir)
df_clean = stimmen_functions.add_row_asr(df_clean)
df_clean = stimmen_functions.add_row_htk(df_clean)

for word in word_list:
    #word = word_list[1]
    df_ = df_clean[df_clean['word']==word]
    c = Counter(df_['htk'])
    # only the variants which occur more than 3 times are shown.
    pronunciations = {key: value for key, value in c.items() if value > 3}
    print(pronunciations)
monophone_mlf = os.path.join(default.htk_dir, 'label', 'train_phone_aligned.mlf')
triphone_mlf = os.path.join(default.htk_dir, 'label', 'train_triphone.mlf')
def filenames_in_mlf(file_mlf):
    """ extract the names of the label files listed in a master label file.

    Args:
        file_mlf (path): HTK master label file (mlf).

    Returns:
        (list): the patterns of the mlf file in the order of appearance,
            without the double quotations and without '*/'.
    """
    with open(file_mlf) as f:
        lines = f.read().split('\n')

    # in an mlf file each label file is introduced by a quoted pattern,
    # e.g. "*/filename.lab". taking those lines also works when the labels
    # have no time stamps (one word per line) and when the last line
    # is not terminated with a line break.
    return [line.replace('"', '').replace('*/', '')
            for line in lines if line.startswith('"')]
filenames_mono = filenames_in_mlf(monophone_mlf)
filenames_tri = filenames_in_mlf(triphone_mlf)