accent_classification/accent_classification/speaker_based_functions.py

import numpy as np
from collections import Counter
import matplotlib.pyplot as plt
import itertools
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.model_selection import cross_val_score
from sklearn.metrics import confusion_matrix
import data_manipulation as mani
import evaluation as eval

# extract the rows of data whose pid appears in pidlist
def extractPid(pidlist, data):
    for pidnum in range(0, len(pidlist)):
        pid = pidlist[pidnum, 0]
        x = data[data[:, 1] == pid, :]
        if pidnum == 0:
            data_ = x
        else:
            data_ = np.r_[data_, x]
    return data_


def OneHotEncoding(data, LB_X, LE_y):
    # one-hot encoding of data using a LabelBinarizer per word (LB_X) and a LabelEncoder for region (LE_y)
    # INPUT
    # data
    #   0: filename
    #   1: pid
    #   2: region
    #   3: ID (unique word_id)
    #   4: sentence_id
    #   5: word_id
    #   6: word
    #   7: pronunciation
    # LB_X: list of LabelBinarizer objects (one per unique word)
    # LE_y: LabelEncoder object
    # OUTPUT
    # X: encoded variable data
    # y: encoded target data
    # pid: pid of each speaker (same row order as X and y)
    pidlist = data[:, 1]
    regionlist = data[:, 2]
    uniqueWordIDlist = data[:, 3].astype(int)
    pronvarlist = data[:, 7]
    uniqueWordIDlist_unique = np.unique(uniqueWordIDlist)
    uniqueWordIDlist_unique.sort()
    for uniqueWordIDnum in uniqueWordIDlist_unique:
        x_ = pronvarlist[uniqueWordIDlist == uniqueWordIDnum]
        lb = LB_X[uniqueWordIDnum-1]
        x = lb.transform(x_)
        if uniqueWordIDnum == uniqueWordIDlist_unique[0]:
            X = x
        else:
            X = np.c_[X, x]
    # pid and region of the speakers
    y_ = regionlist[uniqueWordIDlist == uniqueWordIDlist_unique[0]]
    y = LE_y.transform(y_)
    pid = pidlist[uniqueWordIDlist == uniqueWordIDlist_unique[0]]
    return X, y, pid
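

# A minimal sketch (not part of the original module) of how the encoders passed to
# OneHotEncoding could be built from training data with the column layout documented
# above. The function name and the direct use of sklearn's LabelBinarizer/LabelEncoder
# here are illustrative assumptions; it also assumes the unique word IDs are consecutive
# integers starting at 1, since OneHotEncoding looks each binarizer up at index wid-1.
def _sketch_build_encoders(data_train):
    from sklearn.preprocessing import LabelBinarizer, LabelEncoder
    uniqueWordIDs = np.unique(data_train[:, 3].astype(int))
    LBlist = []
    for wid in uniqueWordIDs:
        # one binarizer per unique word, fitted on that word's pronunciation variants
        lb = LabelBinarizer()
        lb.fit(data_train[data_train[:, 3].astype(int) == wid, 7])
        LBlist.append(lb)
    # one encoder for the region names
    LE_y = LabelEncoder().fit(data_train[:, 2])
    return LBlist, LE_y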


def outputConfusionMatrix33(foutName, matrixName, regionLabels):
    # write a 3x3 confusion matrix to a CSV file, one row per region label.
    # foutName and matrixName are the *names* of the file object and matrix; the
    # generated statement is run with exec, so both names must be resolvable in
    # this function's namespace (e.g. as module-level globals).
    for r in range(0, len(regionLabels)):
        execString1 = foutName + '.write("{0},{1},{2},{3}\\n".format('
        execString2 = 'regionLabels[' + str(r) + ']'
        execString3 = ''
        for c in range(0, len(regionLabels)):
            execString3 = execString3 + ',' + matrixName + '[' + str(r) + '][' + str(c) + ']'
        execString4 = '))'
        execString = execString1 + execString2 + execString3 + execString4
        exec(execString)


def compare_sentence_level_classifiers(data_train, LBlist, LE_y, fileCSV):
    """ Compare classification algorithms for the sentence-level classifiers.

    Args:
        data_train: training data.
        LBlist: list of label binarizers, used to encode pronunciation variants.
        LE_y: label encoder, used to encode region names.
        fileCSV: output csv file path.
    """
    fout = open(fileCSV, "w")
    sentenceIDlist_train = data_train[:, 4].astype(int)
    sentenceIDmax_train = max(sentenceIDlist_train)
    for sentenceID in range(1, sentenceIDmax_train+1):
        sentenceIDstr = format(sentenceID, '02')

        ## categorical values into binary values.
        data_sentence = data_train[sentenceIDlist_train == sentenceID, :]
        X_train, y_train, pid_train = OneHotEncoding(data_sentence, LBlist, LE_y)
        regionCounter = Counter(LE_y.inverse_transform(y_train))

        ## classifier comparison
        names = [
            "Nearest Neighbors",
            "Linear SVM",
            "Poly SVM",
            "RBF SVM",
            "Decision Tree",
            "Random Forest 2",
            "Random Forest 3",
            "Random Forest 4",
            "AdaBoost",
            "AdaBoost(SVM)",
            "AdaBoost(Random Forest 3)",
            "Naive Bayes",
            "Linear Discriminant Analysis",
            "Quadratic Discriminant Analysis"
        ]
        classifiers = [
            KNeighborsClassifier(3),
            SVC(kernel="linear", C=0.025),
            SVC(kernel="poly", C=0.025),
            SVC(gamma=2, C=1),
            DecisionTreeClassifier(max_depth=4),
            RandomForestClassifier(max_depth=2, n_estimators=10, max_features=1),
            RandomForestClassifier(max_depth=3, n_estimators=10, max_features=1),
            RandomForestClassifier(max_depth=4, n_estimators=10, max_features=1),
            AdaBoostClassifier(),
            AdaBoostClassifier(SVC(probability=True, kernel='linear')),
            AdaBoostClassifier(RandomForestClassifier(max_depth=3, n_estimators=10, max_features=1)),
            GaussianNB(),
            LinearDiscriminantAnalysis(),
            QuadraticDiscriminantAnalysis()
        ]
        for name, model in zip(names, classifiers):
            scores = cross_val_score(model, X_train, y_train, cv=10, scoring='f1_micro')
            fout.write("{0},{1},{2},{3}\n".format(sentenceID, name, scores.mean(), scores.var()))
            print('{0}, {1}: {2}'.format(sentenceID, name, scores.mean()))
    fout.close()


def train_sentence_level_classifiers(data_train, LBlist, LE_y, fileCSV):
    """ Train the sentence-level classifiers.

    Args:
        data_train: training data.
        LBlist: list of label binarizers, used to encode pronunciation variants.
        LE_y: label encoder, used to encode region names.
        fileCSV: output csv file path.
    Returns:
        modelList (list): fitted models, one per sentence.
        scoreList (list): cross-validation scores, one array per sentence.
        confusionMatrixList (list): cross-validation confusion matrices, one per sentence.
    """
    fout = open(fileCSV, "w")
    fout.write('< cross-validation in training set >\n')
    sentenceIDlist_train = data_train[:, 4].astype(int)
    sentenceIDmax_train = max(sentenceIDlist_train)
    modelList = []
    scoreList = []
    confusionMatrixList = []
    for sentenceID in range(1, sentenceIDmax_train+1):
        sentenceIDstr = format(sentenceID, '02')

        ## categorical values into binary values.
        data_sentence = data_train[sentenceIDlist_train == sentenceID, :]
        X_train, y_train, pid_train = OneHotEncoding(data_sentence, LBlist, LE_y)
        regionCounter = Counter(LE_y.inverse_transform(y_train))

        ## cross-validation with the best classifier
        model = AdaBoostClassifier()
        #model = SVC(kernel="linear", C=0.025)
        #model = LinearDiscriminantAnalysis()
        #scores = cross_val_score(model, X_train, y_train, cv=10, scoring='f1_micro')
        scores, confusionMatrix = eval.cross_val_confusion_matrix(model, X_train, y_train, 10)
        ci_mean, ci_low, ci_high = eval.mean_confidence_interval(scores, 0.95)
        scoreList.append(scores)
        confusionMatrixList.append(confusionMatrix)

        ## model fitting
        modelfit = model.fit(X_train, y_train)
        modelList.append(modelfit)

        ## output
        fout.write("{},".format(sentenceID))
        #fout.write("{0},{1},{2},".format(
        #    regionCounter['Groningen_and_Drenthe'], regionCounter['Limburg'], regionCounter['Oost_Overijsel-Gelderland']))
        #fout.write("{0},{1},".format(
        #    regionCounter['Low_Saxon'], regionCounter['Limburg']))
        fout.write("{0},{1},".format(
            regionCounter['Groningen_and_Drenthe'], regionCounter['Limburg']))
        fout.write("{0},{1},{2}\n".format(ci_mean, ci_low, ci_high))
    fout.write('\n')
    fout.close()
    return modelList, scoreList, confusionMatrixList
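

# A small usage sketch (illustrative, not part of the original module): summarising the
# per-sentence cross-validation scores returned above. It assumes each entry of scoreList
# is the array of fold scores produced by eval.cross_val_confusion_matrix for one sentence.
def _sketch_summarise_scores(scoreList):
    for sentenceID, scores in enumerate(scoreList, start=1):
        print('sentence {0}: mean CV score = {1:.3f}'.format(sentenceID, np.mean(scores)))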


def prediction_per_sentence(data_eval, modelList, LBlist, LE_y):
    """ Predict regions with the sentence-level classifiers.

    Args:
        data_eval: evaluation data.
        modelList: list of the fitted models.
        LBlist: list of label binarizers, used to encode pronunciation variants.
        LE_y: label encoder, used to encode region names.
    Returns:
        prediction (ndarray): rows of [sentenceID, pid, answer, prediction].
    """
    sentenceIDlist_eval = data_eval[:, 4].astype(int)
    sentenceIDmax_eval = max(sentenceIDlist_eval)
    for sentenceID in range(1, sentenceIDmax_eval+1):
        sentenceIDstr = format(sentenceID, '02')

        ## categorical values into binary values.
        data_sentence = data_eval[sentenceIDlist_eval == sentenceID, :]
        X_eval, y_eval, pid_eval = OneHotEncoding(data_sentence, LBlist, LE_y)
        regionCounter = Counter(LE_y.inverse_transform(y_eval))

        ## evaluate model
        modelfit = modelList[sentenceID-1]
        y_pred = modelfit.predict(X_eval)
        y_pred_label = LE_y.inverse_transform(y_pred)
        y_eval_label = LE_y.inverse_transform(y_eval)

        # sentenceID, pid, y, y_pred
        sentenceIDvec = np.ones((y_eval_label.shape[0], 1)).astype(int) * sentenceID
        prediction_ = np.c_[sentenceIDvec, pid_eval, y_eval_label, y_pred_label]
        if sentenceID == 1:
            prediction = prediction_
        else:
            prediction = np.r_[prediction, prediction_]
    return prediction


def prediction_per_pid_majority(pidlist_eval, prediction):
    """ Make a prediction per pid using a majority vote over the sentence-level predictions.

    Returns:
        prediction_per_pid (ndarray): rows of [pid, ans, prediction].
    """
    prediction_per_pid = []
    for pid_ in range(0, len(pidlist_eval[:, 0])):
        pid = pidlist_eval[pid_, 0]
        ans = pidlist_eval[pid_, 1]
        prediction_ = prediction[prediction[:, 1] == pid, :]

        # majority vote
        predCounter = Counter(prediction_[:, -1])
        predMostCommon = predCounter.most_common(1)
        predLabel = predMostCommon[0][0]
        predRatio = predMostCommon[0][1] / prediction_.shape[0] * 100
        prediction_per_pid.append([pid, ans, predLabel])
    return np.array(prediction_per_pid)
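

# Worked example (illustrative): if a speaker's five sentence-level predictions are
# ['Limburg', 'Limburg', 'Groningen_and_Drenthe', 'Limburg', 'Groningen_and_Drenthe'],
# then Counter(...).most_common(1) returns [('Limburg', 3)], so predLabel is 'Limburg'
# and predRatio is 3 / 5 * 100 = 60.0.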


def calc_weight(confusionMatrixList):
    """ Calculate the weight (how trustworthy each prediction is) for the weighted vote.

    Note:
        The per-class precision is used as the weight: of all speakers predicted to be in a
        region (GO/OG/LB), the fraction that actually belong to that region.
    Args:
        confusionMatrixList: list of confusion matrices of the sentence-level classifiers.
    Returns:
        weight (ndarray): shape (number of sentences, number of regions).
    """
    sentenceID_max = len(confusionMatrixList)
    weight = np.zeros((sentenceID_max, confusionMatrixList[0].shape[0]))
    for sentenceID in range(1, sentenceID_max+1):
        cm = confusionMatrixList[sentenceID-1]
        # normalized confusion matrix (recall-based weighting, kept for reference)
        #rTotal = np.sum(cm, axis=1)
        #cm_normalized = cm / rTotal
        #weight[sentenceID-1, :] = np.diag(cm_normalized)
        # precision: true positives divided by the column totals (all predictions for that class)
        true_positives = np.diag(cm)
        predicted = np.sum(cm, axis=0)
        weight[sentenceID-1, :] = true_positives / predicted
    return weight
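

# Worked example (illustrative): for a 2x2 confusion matrix
#   cm = np.array([[5, 1],
#                  [2, 4]])
# the column sums are [7, 5], so the precision-based weights are
#   np.diag(cm) / np.sum(cm, axis=0) = [5/7, 4/5] ~= [0.714, 0.8].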


def prediction_per_pid_weighted(pidlist_eval, prediction, weight, LB_y, LE_y):
    """ Make a prediction per pid using a weighted (majority) vote.

    Args:
        weight (ndarray): how trustworthy the prediction of each sentence-level classifier is.
        LB_y: label binarizer, used to encode region names.
        LE_y: label encoder, used to encode region names.
    Returns:
        prediction_per_pid (ndarray): rows of [pid, ans, prediction].
    """
    prediction_per_pid = []
    for pid_ in range(0, len(pidlist_eval[:, 0])):
        pid = pidlist_eval[pid_, 0]
        ans = pidlist_eval[pid_, 1]
        prediction_ = prediction[prediction[:, 1] == pid, :]

        # calculate the weighted (majority) vote over the 3 regions
        vote_weighted = np.zeros((1, 3))
        for sentenceID_ in range(0, prediction_.shape[0]):
            sentenceID = prediction_[sentenceID_, 0].astype(int)
            w = weight[sentenceID-1, :]
            pred = prediction_[sentenceID_, 3]
            pred_int = LB_y.transform([pred])
            vote_weighted = vote_weighted + w * pred_int

        # choose the label with the highest vote
        vote_weighted = vote_weighted[0]
        maxindex = list(vote_weighted).index(max(vote_weighted))
        #predLabel = regionLabels[maxindex]
        # inverse_transform expects an array-like, so wrap the index and unwrap the result
        predLabel = LE_y.inverse_transform([maxindex])[0]
        prediction_per_pid.append([pid, ans, predLabel])
    return np.array(prediction_per_pid)


def groningen_vs_limburg(pidlist3):
    """ Convert a pidlist for 3 regions into one for 2 regions.

    Notes:
        The 3 regions are ['Groningen_and_Drenthe', 'Limburg', 'Oost_Overijsel-Gelderland'].
        The 2 regions are ['Groningen_and_Drenthe', 'Limburg'].
    """
    regionLabels = ['Groningen_and_Drenthe', 'Oost_Overijsel-Gelderland', 'Limburg']
    pidlist_groningen = pidlist3[pidlist3[:, 1] == regionLabels[0], :]
    # select Limburg (regionLabels[2]); regionLabels[1] is Oost_Overijsel-Gelderland
    pidlist_limburg = pidlist3[pidlist3[:, 1] == regionLabels[2], :]
    pidlist2 = np.r_[pidlist_groningen, pidlist_limburg]
    return pidlist2
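

# End-to-end usage sketch (illustrative, not part of the original module). The function
# name, the argument preparation, and the 'train_cv.csv' path are assumptions; only the
# calls into this module mirror the functions defined above.
def _sketch_run_pipeline(data_train, data_eval, pidlist_eval, LBlist, LB_y, LE_y):
    # train one classifier per sentence and keep their CV confusion matrices
    modelList, scoreList, confusionMatrixList = train_sentence_level_classifiers(
        data_train, LBlist, LE_y, 'train_cv.csv')
    # sentence-level predictions on the evaluation set
    prediction = prediction_per_sentence(data_eval, modelList, LBlist, LE_y)
    # simple majority vote per speaker
    result_majority = prediction_per_pid_majority(pidlist_eval, prediction)
    # precision-weighted vote per speaker
    weight = calc_weight(confusionMatrixList)
    result_weighted = prediction_per_pid_weighted(pidlist_eval, prediction, weight, LB_y, LE_y)
    return result_majority, result_weighted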