import numpy as np
from collections import Counter
import matplotlib.pyplot as plt
import itertools
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.model_selection import cross_val_score
from sklearn.metrics import confusion_matrix
import data_manipulation as mani
import evaluation as eval  # note: shadows the built-in eval()


def extractPid(pidlist, data):
    """extract the rows of data whose pid appears in pidlist."""
    for pidnum in range(0, len(pidlist)):
        pid = pidlist[pidnum, 0]
        x = data[data[:, 1] == pid, :]
        if pidnum == 0:
            data_ = x
        else:
            data_ = np.r_[data_, x]
    return data_


def OneHotEncoding(data, LB_X, LE_y):
    """one-hot encode data using one LabelBinarizer per word (LB_X) and a LabelEncoder for regions (LE_y).

    Args:
        data: array whose columns are
            0: filename
            1: pid
            2: region
            3: ID (unique word_id)
            4: sentence_id
            5: word_id
            6: word
            7: pronunciation
        LB_X: list of LabelBinarizer objects, one per unique word ID.
        LE_y: LabelEncoder object for region names.

    Returns:
        X: encoded variable data.
        y: encoded target data.
        pid: pid of each speaker (row) in X.

    """
    pidlist = data[:, 1]
    regionlist = data[:, 2]
    uniqueWordIDlist = data[:, 3].astype(int)
    pronvarlist = data[:, 7]
    uniqueWordIDlist_unique = np.unique(uniqueWordIDlist)  # np.unique returns sorted IDs

    # binarize the pronunciation variants of each word and stack them column-wise
    for uniqueWordIDnum in uniqueWordIDlist_unique:
        x_ = pronvarlist[uniqueWordIDlist == uniqueWordIDnum]
        lb = LB_X[uniqueWordIDnum-1]
        x = lb.transform(x_)
        if uniqueWordIDnum == uniqueWordIDlist_unique[0]:
            X = x
        else:
            X = np.c_[X, x]

    # pid and region of the speakers
    y_ = regionlist[uniqueWordIDlist == uniqueWordIDlist_unique[0]]
    y = LE_y.transform(y_)
    pid = pidlist[uniqueWordIDlist == uniqueWordIDlist_unique[0]]
    return X, y, pid


def outputConfusionMatrix33(fout, matrix, regionLabels):
    """write a confusion matrix to an open file as CSV, one row per region."""
    for r in range(0, len(regionLabels)):
        cells = ','.join(str(matrix[r][c]) for c in range(0, len(regionLabels)))
        fout.write("{0},{1}\n".format(regionLabels[r], cells))
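

def _demo_build_encoders():
    # A minimal sketch (not part of the pipeline) of how the LB_X and LE_y
    # objects consumed by OneHotEncoding() are assumed to be prepared: one
    # LabelBinarizer fitted on the pronunciation variants of each unique word,
    # plus one LabelEncoder fitted on the region names. The variant lists
    # below are invented purely for illustration.
    from sklearn.preprocessing import LabelBinarizer, LabelEncoder
    pronunciations_per_word = [['a', 'b'], ['x', 'y', 'z']]  # hypothetical variants
    LB_X = []
    for variants in pronunciations_per_word:
        lb = LabelBinarizer()
        lb.fit(variants)  # one binarizer per unique word ID
        LB_X.append(lb)
    LE_y = LabelEncoder()
    LE_y.fit(['Groningen_and_Drenthe', 'Limburg', 'Oost_Overijsel-Gelderland'])
    return LB_X, LE_y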


def compare_sentence_level_classifiers(data_train, LBlist, LE_y, fileCSV):
    """compare classification algorithms as sentence-level classifiers.

    Args:
        data_train: training data.
        LBlist: list of label binarizers, used to encode pronunciation variants.
        LE_y: label encoder, used to encode region names.
        fileCSV: output csv file path.

    """
    fout = open(fileCSV, "w")
    sentenceIDlist_train = data_train[:, 4].astype(int)
    sentenceIDmax_train = max(sentenceIDlist_train)
    for sentenceID in range(1, sentenceIDmax_train+1):
        sentenceIDstr = format(sentenceID, '02')

        ## categorical values into binary values.
        data_sentence = data_train[sentenceIDlist_train == sentenceID, :]
        X_train, y_train, pid_train = OneHotEncoding(data_sentence, LBlist, LE_y)
        regionCounter = Counter(LE_y.inverse_transform(y_train))

        ## classifier comparison
        names = [
            "Nearest Neighbors", "Linear SVM", "Poly SVM", "RBF SVM",
            "Decision Tree",
            "Random Forest 2", "Random Forest 3", "Random Forest 4",
            "AdaBoost", "AdaBoost(SVM)", "AdaBoost(Random Forest 3)",
            "Naive Bayes",
            "Linear Discriminant Analysis", "Quadratic Discriminant Analysis"
        ]
        classifiers = [
            KNeighborsClassifier(3),
            SVC(kernel="linear", C=0.025),
            SVC(kernel="poly", C=0.025),
            SVC(gamma=2, C=1),
            DecisionTreeClassifier(max_depth=4),
            RandomForestClassifier(max_depth=2, n_estimators=10, max_features=1),
            RandomForestClassifier(max_depth=3, n_estimators=10, max_features=1),
            RandomForestClassifier(max_depth=4, n_estimators=10, max_features=1),
            AdaBoostClassifier(),
            AdaBoostClassifier(SVC(probability=True, kernel='linear')),
            AdaBoostClassifier(RandomForestClassifier(max_depth=3, n_estimators=10, max_features=1)),
            GaussianNB(),
            LinearDiscriminantAnalysis(),
            QuadraticDiscriminantAnalysis()
        ]
        for name, model in zip(names, classifiers):
            scores = cross_val_score(model, X_train, y_train, cv=10, scoring='f1_micro')
            fout.write("{0},{1},{2},{3}\n".format(sentenceID, name, scores.mean(), scores.var()))
            print('{0}, {1}: {2}'.format(sentenceID, name, scores.mean()))
    fout.close()
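

def _demo_cross_val_f1_micro():
    # A self-contained toy run of the comparison loop above (a sketch, not the
    # real data): cross-validate one candidate classifier with the same
    # 'f1_micro' scoring that compare_sentence_level_classifiers() writes to
    # CSV. The random features stand in for the one-hot pronunciation encoding.
    rng = np.random.RandomState(0)
    X_toy = rng.randint(0, 2, size=(40, 6))  # stand-in for one-hot features
    y_toy = np.array([0, 1] * 20)            # stand-in for encoded region labels
    model = AdaBoostClassifier()
    scores = cross_val_score(model, X_toy, y_toy, cv=10, scoring='f1_micro')
    return scores.mean(), scores.var()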


def train_sentence_level_classifiers(data_train, LBlist, LE_y, fileCSV):
    """train sentence-level classifiers.

    Args:
        data_train: training data.
        LBlist: list of label binarizers, used to encode pronunciation variants.
        LE_y: label encoder, used to encode region names.
        fileCSV: output csv file path.

    Returns:
        modelList (list): list of models (length: sentenceNumMax).
        scoreList (list): list of scores (length: sentenceNumMax).
        confusionMatrixList (list): list of confusion matrices (length: sentenceNumMax).

    """
    fout = open(fileCSV, "w")
    fout.write('< cross-validation in training set >\n')
    sentenceIDlist_train = data_train[:, 4].astype(int)
    sentenceIDmax_train = max(sentenceIDlist_train)
    modelList = []
    scoreList = []
    confusionMatrixList = []
    for sentenceID in range(1, sentenceIDmax_train+1):
        sentenceIDstr = format(sentenceID, '02')

        ## categorical values into binary values.
        data_sentence = data_train[sentenceIDlist_train == sentenceID, :]
        X_train, y_train, pid_train = OneHotEncoding(data_sentence, LBlist, LE_y)
        regionCounter = Counter(LE_y.inverse_transform(y_train))

        ## cross-validation with the best classifier
        model = AdaBoostClassifier()
        #model = SVC(kernel="linear", C=0.025)
        #model = LinearDiscriminantAnalysis()
        #scores = cross_val_score(model, X_train, y_train, cv=10, scoring='f1_micro')
        scores, confusionMatrix = eval.cross_val_confusion_matrix(model, X_train, y_train, 10)
        ci_mean, ci_low, ci_high = eval.mean_confidence_interval(scores, 0.95)
        scoreList.append(scores)
        confusionMatrixList.append(confusionMatrix)

        ## model fitting
        modelfit = model.fit(X_train, y_train)
        modelList.append(modelfit)

        ## output
        fout.write("{},".format(sentenceID))
        #fout.write("{0},{1},{2},".format(
        #    regionCounter['Groningen_and_Drenthe'], regionCounter['Limburg'], regionCounter['Oost_Overijsel-Gelderland']))
        #fout.write("{0},{1},".format(
        #    regionCounter['Low_Saxon'], regionCounter['Limburg']))
        fout.write("{0},{1},".format(
            regionCounter['Groningen_and_Drenthe'], regionCounter['Limburg']))
        fout.write("{0},{1},{2}\n".format(ci_mean, ci_low, ci_high))
    fout.write('\n')
    fout.close()
    return modelList, scoreList, confusionMatrixList


def prediction_per_sentence(data_eval, modelList, LBlist, LE_y):
    """predict using the sentence-level classifiers.

    Args:
        data_eval: evaluation data.
        modelList: list of the models.
        LBlist: list of label binarizers, used to encode pronunciation variants.
        LE_y: label encoder, used to encode region names.

    Returns:
        prediction (ndarray): [sentenceID, pid, answer, prediction]

    """
    sentenceIDlist_eval = data_eval[:, 4].astype(int)
    sentenceIDmax_eval = max(sentenceIDlist_eval)
    for sentenceID in range(1, sentenceIDmax_eval+1):
        sentenceIDstr = format(sentenceID, '02')

        ## categorical values into binary values.
        data_sentence = data_eval[sentenceIDlist_eval == sentenceID, :]
        X_eval, y_eval, pid_eval = OneHotEncoding(data_sentence, LBlist, LE_y)
        regionCounter = Counter(LE_y.inverse_transform(y_eval))

        ## evaluate model
        modelfit = modelList[sentenceID-1]
        y_pred = modelfit.predict(X_eval)
        y_pred_label = LE_y.inverse_transform(y_pred)
        y_eval_label = LE_y.inverse_transform(y_eval)

        # sentenceID, pid, y, y_pred
        sentenceIDvec = np.ones((y_eval_label.shape[0], 1)).astype(int) * sentenceID
        prediction_ = np.c_[sentenceIDvec, pid_eval, y_eval_label, y_pred_label]
        if sentenceID == 1:
            prediction = prediction_
        else:
            prediction = np.r_[prediction, prediction_]
    return prediction


def prediction_per_pid_majority(pidlist_eval, prediction):
    """make a prediction per pid using a majority vote.

    Returns:
        prediction_per_pid (ndarray): [pid, ans, prediction]

    """
    prediction_per_pid = []
    for pid_ in range(0, len(pidlist_eval[:, 0])):
        pid = pidlist_eval[pid_, 0]
        ans = pidlist_eval[pid_, 1]
        prediction_ = prediction[prediction[:, 1] == pid, :]

        # majority vote over the per-sentence predictions
        predCounter = Counter(prediction_[:, -1])
        predMostCommon = predCounter.most_common(1)
        predLabel = predMostCommon[0][0]
        predRatio = predMostCommon[0][1] / prediction_.shape[0] * 100
        prediction_per_pid.append([pid, ans, predLabel])
    return np.array(prediction_per_pid)
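

def _demo_majority_vote():
    # A minimal sketch (not part of the pipeline) of the majority vote in
    # prediction_per_pid_majority(): count the per-sentence predicted labels
    # of one speaker and keep the most common one. The label sequence below
    # is invented for illustration only.
    per_sentence_predictions = ['Limburg', 'Limburg', 'Groningen_and_Drenthe']
    predCounter = Counter(per_sentence_predictions)
    predLabel, count = predCounter.most_common(1)[0]
    predRatio = count / len(per_sentence_predictions) * 100  # vote share in percent
    return predLabel, predRatio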
""" sentenceID_max = len(confusionMatrixList) weight = np.zeros((sentenceID_max, confusionMatrixList[0].shape[0])) for sentenceID in range(1, sentenceID_max+1): cm = confusionMatrixList[sentenceID-1] # normalized confusion matrix #rTotal = np.sum(cm, axis=1) #cm_normalized = cm / rTotal #weight[sentenceID-1, :] = np.diag(cm_normalized) true_positives = np.diag(cm) predicted = np.sum(cm, axis=0) weight[sentenceID-1, :] = true_positives / predicted return weight def prediction_per_pid_weighted(pidlist_eval, prediction, weight, LB_y, LE_y): """ make a prediction per pid using weighted (majority) vote. Args: weight (ndarray): how trustworthy the prediction of each sentence-based classifier is. LB_y: label binalizer, which is used to encode region names. LE_y: label encorder, which is used to encode region names. Returns: prediction_per_pid (ndarray): [pid, ans, prediction] """ prediction_per_pid = [] for pid_ in range(0, len(pidlist_eval[:, 0])): pid = pidlist_eval[pid_, 0] ans = pidlist_eval[pid_, 1] prediction_ = prediction[prediction[:, 1] == pid, :] # calculate weighted (majority) vote vote_weighted = np.zeros((1, 3)) for sentenceID_ in range(0, prediction_.shape[0]): sentenceID = prediction_[sentenceID_, 0].astype(int) w = weight[sentenceID-1, :] pred = prediction_[sentenceID_, 3] pred_int = LB_y.transform([pred]) vote_weighted = vote_weighted + w * pred_int # choose the most vote vote_weighted = vote_weighted[0] maxindex = list(vote_weighted).index(max(vote_weighted)) #predLabel = regionLabels[maxindex] predLabel = LE_y.inverse_transform(maxindex) prediction_per_pid.append([pid, ans, predLabel]) return np.array(prediction_per_pid) def groningen_vs_limburg(pidlist3): """convert a pidlist for 3 regions into that for 2 regions. Notes: 3 regions include ['Groningen_and_Drenthe', 'Limburg', 'Oost_Overijsel-Gelderland'] 2 regions include ['Groningen_and_Drenthe', 'Limburg'] """ regionLabels = ['Groningen_and_Drenthe', 'Oost_Overijsel-Gelderland', 'Limburg'] pidlist_groningen = pidlist3[pidlist3[:, 1] == regionLabels[0], :] pidlist_limburg = pidlist3[pidlist3[:, 1] == regionLabels[1], :] pidlist2 = np.r_[pidlist_groningen, pidlist_limburg] return pidlist2