Source code for DylMath

#!/usr/bin/python3.6
from typing import Dict, Tuple
import ROC1
import numpy as np
np.set_printoptions(threshold=np.inf)
np.seterr(all="ignore")
from multiprocessing import Pool
from scipy.interpolate import interp1d
from scipy.stats import norm
try:
	import matplotlib
	matplotlib.use('QT4Agg')
	import matplotlib.pyplot as plt
	font: dict = {'size' : 56}
	#matplotlib.rc('font', **font)
	from matplotlib.collections import PatchCollection
	from matplotlib.patches import Rectangle
except BaseException:
	pass
from DylData import continuousScale
unbiasedMeanMatrixVar = ROC1.unbiasedMeanMatrixVar
[docs]def paramToParams(predicted: list, D0: list=None, D1: list=None) -> Tuple[list, list, list]: """Takes one parameter and splits it into three if predicted is a 2d list.""" if isinstance(predicted[0], (list, tuple)): return predicted[0], predicted[1], predicted[2] else: return predicted, D0, D1
[docs]def auc(results: tuple, D0: list=None, D1: list=None) -> float: """ Takes an ROC curve from genROC and returns the AUC. If results is a prediction not an ROC curve, generates the ROC curve. If results is already an ROC curve, D0 and D1 are not required.""" if not isinstance(results[0], (list, tuple)): results: list = genROC(results, D0, D1) total: float = 0.0 for i,(x,y) in enumerate(results[:-1], start=1): # start=1 means i is actually i + 1 total += 0.5*(y + results[i][1]) * (x - results[i][0]) return abs(total)
[docs]def hanleyMcNeil(auc: float, n0: int, n1: int) -> float: """The very good power-law variance estimate from Hanley/McNeil.""" auc2=auc*auc q1=auc/(2.-auc) q2=2.*auc2/(1.+auc) return( (auc-auc2+(n1-1.)*(q1-auc2)+(n0-1.)*(q2-auc2))/n0/n1 )
[docs]def calcNLayers(arr: list) -> int: """Returns the number of layers that would be needed to sort. If arr is the a tuple or list, uses the length. If arr is already the length, uses that.""" if isinstance(arr, int): length: int = arr else: length: int = len(arr) return int(np.ceil(np.log2(length)))
[docs]def genSep(dist: str, auc: float) -> float: """Returns the sep parameter needed for the target AUC for the given distribution.""" if dist == 'exponential': return abs(auc/(1-auc)) elif dist == 'normal': return norm.ppf(auc)*(2**0.5) raise NotImplementedError("Cannot gen sep for that distribution")
[docs]def MSE(sep: float, dist: str, ROC: list, rocEmpiric: list=None) -> Tuple[float, float, float]: """Returns the MSE of the given ROC. If sep and dist are not None: the true ROC from sep and dist If rocEmpiric is not None: the MSE between the Empiric and ROC If sep and dist are None, the first value returned is always 0 The last value returned is always the AUC of the ROC""" step: float = 10**-4 fpf = np.arange(0, 1, step) if len(ROC) == 2: approx = interp1d(*((ROC['x'], ROC['y']) if isinstance(ROC, dict) else ROC))(fpf) else: approx = interp1d(*zip(*ROC))(fpf) if dist == 'exponential': mseTrue: float = np.mean((approx - (fpf**(1/sep)))**2) elif dist == 'normal': mseTrue: float = np.mean((approx - (1-norm.cdf(norm.ppf(1-fpf) - sep)))**2) else: mseTrue: float = 0.0 if rocEmpiric != None: if len(rocEmpiric) == 2: trueApprox = interp1d(rocEmpiric['x'], rocEmpiric['y']) else: trueApprox = interp1d(*zip(*rocEmpiric)) mseEmpiric: float = np.mean((approx - (trueApprox(fpf)))**2) else: mseEmpiric = None calcAUC: float = np.trapz(approx) / (1/step) return (mseTrue, calcAUC) if rocEmpiric == None else (mseTrue, mseEmpiric, calcAUC)
[docs]def genX0X1(predicted: tuple, D1: tuple=None, D0: tuple=None) -> Tuple[list, list]: """Generates x0 and x1 vectors out of the given parameters. D1 and D0 should never be smaller than the predicted array, but are often bigger.""" predicted, D0, D1 = paramToParams(predicted, D0, D1) x0, x1 = genD0D1((D0, D1), predicted) return np.array(x0), np.array(x1)
[docs]def genD0D1(d0d1: list, arr: list) -> tuple: """Generates filtered D0 and D1 vectors. d0d1 is (D0, D1) together as a tuple/list.""" D0, D1 = list(), list() for item in arr: if item in d0d1[0]: D0.append(item) elif item in d0d1[1]: D1.append(item) return D0, D1
[docs]def genROC(predicted: tuple, D1: list=None, D0: list=None) -> list: """Returns a list of collections of x,y coordinates in order of the threshold.""" predicted, D0, D1 = paramToParams(predicted, D0, D1) x0 = list() x1 = list() for i, val in enumerate(predicted): if val in D1: x1.append(i) elif val in D0: x0.append(i) roc = ROC1.rocxy(x1, x0) return list(zip(roc['x'], roc['y']))
[docs]def graphROC(predicted: tuple, D0: list=None, D1: list=None): """Generates and graphs a single ROC curve and displays the results.""" predicted, D0, D1 = paramToParams(predicted, D0, D1) fig = plt.figure(figsize=(4,4)) ax = fig.add_subplot(111) ax.plot(*zip(*genROC(predicted, D0, D1))) ax.plot((0,1),(0,1),c="r", linestyle="--") ax.set_ylim(top=1.1,bottom=-0.1) ax.set_xlim(left=-0.1,right=1.1) ax.set_title(f"AUC: {auc(predicted, D0, D1):.5f}") ax.set_xlabel("FPF") ax.set_ylabel("TPF") plt.show()
[docs]def graphROCs(arrays: list, withPatches: bool=False, withLine: bool=True, D0: list=None, D1: list=None): """Graphs a collection of array predictions. Takes the arrays as they would come out of DylSort sorts. If withPatches, puts a color coded success matrix behind the line. If withLine, graphs the line. Returns the plt handle, does not display the results.""" rows: int = int(np.ceil(np.sqrt(len(arrays)))) cols: int = int(np.ceil(len(arrays) / rows)) fig, axes = plt.subplots(rows, cols, sharex=True, sharey=True, num="plots") fig.suptitle("ROC curves") if withLine: params: list = [(array, D0, D1) for array in arrays] if len(arrays[0]) < 1024: results: list = list(map(genROC, params)) else: with Pool() as p: results: list = list(p.imap(genROC,params)) for i, ax in enumerate(axes.flat if (rows * cols > 1) else [axes]): if i >= len(arrays): continue ax.set(xlabel="False Positive Fraction", ylabel="True Positive Fraction") ax.label_outer() ax.plot((0,1),(0,1),c='red', linestyle=":") if withLine: ax.plot(*zip(*results[i]), c='blue') ax.set_ylim(top=1.02, bottom=0) ax.set_xlim(left=-0.01, right=1) if not withPatches: ax.set_title(f"Iteration #{i} AUC: {auc(results[i]):.5f}") if withPatches: sm: np.ndarray = successMatrix(arrays[i], D0, D1) yes: list = [] no: list = [] yLen: int = len(D1) xLen: int = len(D0) for (y,x), value in np.ndenumerate(sm): if value: yes.append(Rectangle((x/xLen,y/yLen),1/xLen,1/yLen)) else: no.append(Rectangle((x/xLen,y/yLen),1/xLen,1/yLen)) patches = PatchCollection(no, facecolor = 'r', alpha=0.75, edgecolor='None') ax.add_collection(patches) patches = PatchCollection(yes, facecolor = 'g', alpha=0.75, edgecolor='None') ax.add_collection(patches) area = len(yes) / (len(yes) + len(no)) ax.set_ylim(top=1, bottom=0) ax.set_xlim(left=0, right=1) ax.set_title(f"Iteration #{i} AUC: {area:.5f}") ax.set_aspect('equal', 'box') figManager = plt.get_current_fig_manager() figManager.window.showMaximized() #plt.show() return plt
[docs]def avROC(rocs: list) -> tuple: """Averages ROC curves. Rocs parameter are ROC curves from genROC.""" #hard coded SeSp #e = 9*sys.float_info.epsilon # convert [(x1, y1), (x2, y2) ...] into np array for better arithmatic rocs: list = [np.array(roc) for roc in rocs] rotrocs: list = [{'u': tuple((roc[:,0] + roc[:,1])/2), 'v': tuple((roc[:,1]-roc[:,0])/2)} for roc in rocs] stdA: list = list() for roc in rotrocs: stdA.extend(roc['u']) stdA: np.ndarray = np.array(sorted(set(stdA))) aprotrocs: np.ndarray = np.zeros((len(rotrocs), len(stdA))) for iRoc, roc in enumerate(rotrocs): inter = interp1d(roc['u'], roc['v']) for iU, u in enumerate(stdA): aprotrocs[iRoc][iU] = inter(u) ymean: np.ndarray = np.zeros((1, len(stdA))) for apro in aprotrocs: ymean += apro ymean /= len(aprotrocs) fpout: np.ndarray = stdA - ymean tpout: np.ndarray = stdA + ymean ret = tpout.tolist(), fpout.tolist() return ret[0][0], ret[1][0]
[docs]def successMatrix(predicted: list, D0: list, D1: list): """Creates the success matrix for the predicted ordering. Checks to make sure it got every entry filled.""" arr: np.ndarray = np.full((len(D1), len(D0)), -1) indecies: Dict[int] = dict() for val in D0 + D1: indecies[val] = predicted.index(val) for col, x in enumerate(reversed(D0)): for row, y in enumerate(reversed(D1)): arr[row, col] = indecies[x] < indecies[y] if -1 in arr: raise EnvironmentError("failed to create success matrix") return arr
[docs]def runStats(groups: list, params: list, comp) -> list: """Runs stats on the groups provided. Params parameter must be: ((d0d1), dist, targetAUC, n, currLayer, len(mergers))""" aucs, varOfSM, hanleyMcNeils, estimates = list(), list(), list(), list() d0d1, dist, targetAUC, n, *_ = params rocs: list = list() for group in groups: D0, D1 = genD0D1(d0d1, group) if D0 and D1: rocs.append(genROC(group, D0, D1)) sm: np.ndarray = successMatrix(group, D0, D1) auc: float = np.mean(sm) if auc == auc: aucs.append(auc) hanleyMcNeils.append((len(D0), len(D1))) smVAR: float = unbiasedMeanMatrixVar(sm) if smVAR == smVAR and len(D0) > 3 and len(D1) > 3: # if not NaN varOfSM.append(smVAR) rocs: list = list(filter(lambda roc: np.min(np.isfinite(roc)), rocs)) varOfAverageAUC = np.var(aucs, ddof=1) / len(aucs) aucs: np.ndarray = np.array(aucs) avgAUC: float = np.mean(aucs) estimateNs: list = [list()] for ns in hanleyMcNeils: estimateNs[0].append(ns) # while there are groups to 'merge' while len(estimateNs[-1]) != 1: # get the previous layer and sort by N0 + N1 oldNs: list = sorted(estimateNs[-1], key=sum) # roughly the same code as mergers creation estimateNs.append(list()) while oldNs: i: int = 0 toMerge: list = list() segments: int = min(n, len(oldNs) - i) for _ in range(segments): toMerge.append(oldNs.pop(0)) estimateNs[-1].append([sum((x[0] for x in toMerge)), sum((x[1] for x in toMerge))]) estimateNs[-1].sort(key=sum) estimates.append(hanleyMcNeil(avgAUC, estimateNs[-1][-1][0], estimateNs[-1][-1][1]) / len(estimateNs[-1])) for i, (N0, N1) in enumerate(hanleyMcNeils): hanleyMcNeils[i] = hanleyMcNeil(avgAUC, N0, N1) if len(varOfSM) == 0: varEstimate: float = float(varOfAverageAUC) else: varEstimate: float = (sum(varOfSM) / (len(varOfSM)**2)) avgROC: tuple = avROC(rocs) if dist != 0: # not a net comparator empiricROC: tuple = comp.empiricROC() sep: float = genSep(dist, float(targetAUC)) # float in case it's a string stats: list = [avgAUC, varEstimate, sum(hanleyMcNeils) / len(hanleyMcNeils)**2, estimates, *MSE(sep, dist, avgROC, empiricROC)[:2]] else: stats: list = [avgAUC, varEstimate, sum(hanleyMcNeils) / len(hanleyMcNeils)**2, estimates] return stats
if __name__ == "__main__": from DylSort import mergeSort test: int = 9 if test == 1: #print(D0, D1) newData, D0, D1 = continuousScale("sampledata.csv") print(auc(genROC(newData))) arrays: list = [newData[:]] for _ in mergeSort(newData): arrays.append(newData[:]) print(arrays) graphROCs(arrays) elif test == 3: predicted: list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19] print(np.mean(successMatrix(predicted, [*range(10)], [*range(10,20)]))) elif test == 4: arrays: list = [[0, 1, 4, 2, 5, 3, 6], [0, 1, 2, 4, 3, 5, 6], [0, 1, 2, 4, 3, 5, 6], [0, 1, 2, 3, 4, 5, 6]] graphROCs(arrays, D0=[0, 1, 2, 3], D1=[4, 5, 6]) elif test == 5: graphROC([4, 1, 2, 3], [1, 2], [3, 4]) elif test == 6: from DylSort import treeMergeSort from DylComp import Comparator from DylData import continuousScale import matplotlib font: dict = {'size' : 10} matplotlib.rc('font', **font) data, D0, D1 = continuousScale(128, 128) comp: Comparator = Comparator(data, rand=True, level=0, seed=15) for arr in treeMergeSort(data, comp=comp): pass D0.sort(key = comp.getLatentScore) D1.sort(key = comp.getLatentScore) roc: dict = ROC1.rocxy(comp.getLatentScore(D1), comp.getLatentScore(D0)) graphROCs([arr], True, True, D0, D1) elif test == 7: roc1: list = [[0, 0], [0, 1], [1, 1]] roc3 = roc2 = roc1 roc4: list = [[0, 0], [0.5, 0], [0.5, 0.5], [1, 1]] avgROC: tuple = avROC([roc1, roc2, roc3, roc4]) fig = plt.figure(figsize=(4,4)) ax = fig.add_subplot(111) ax.plot(*zip(*roc1), 'm', label='chunk1', ls='-') ax.plot(*zip(*roc2), 'b', label='chunk2', ls='--') ax.plot(*zip(*roc3), 'g', label='chunk3', ls=':') ax.plot(*zip(*roc4), 'c', label='chunk4') ax.plot(*avgROC, 'orange', label='avg') ax.plot((0,1),(0,1),c="r", linestyle="--") ax.set_ylim(top=1.1,bottom=-0.1) ax.set_xlim(left=-0.1,right=1.1) ax.set_xlabel("FPF") ax.set_ylabel("TPF") ax.legend() plt.show() elif test == 8: roc1: list = [[0,0],[0,0.05],[0,0.1],[0,0.15],[0,0.2],[0,0.25],[0,0.3],[0,0.35],[0,0.4],[0,0.45],[0.1,0.45],[0.1,0.5],[0.1,0.55],[0.1,0.6],[0.1,0.65],[0.2,0.65],[0.3,0.65],[0.3,0.7],[0.4,0.7],[0.5,0.7],[0.5,0.75],[0.5,0.8],[0.5,0.85],[0.5,0.9],[0.5,0.95],[0.5,1],[0.6,1],[0.7,1],[0.8,1],[0.9,1],[1,1]] roc2: list = [[0,0],[0,0.1],[0,0.2],[0,0.3],[0,0.4],[0,0.5],[0.06666667,0.5],[0.13333333,0.5],[0.2,0.5],[0.26666667,0.5],[0.26666667,0.6],[0.26666667,0.7],[0.33333333,0.7],[0.4,0.7],[0.4,0.8],[0.4,0.9],[0.4,1],[0.46666667,1],[0.53333333,1],[0.6,1],[0.66666667,1],[0.73333333,1],[0.8,1],[0.86666667,1],[0.93333333,1],[1,1]] avgROC: tuple = avROC([roc1, roc2]) fig = plt.figure(figsize=(4,4)) ax = fig.add_subplot(111) ax.plot(*zip(*roc1), 'm', label='chunk1', ls=':', marker='o') ax.plot(*zip(*roc2), 'b', label='chunk2', ls='--', marker='o') ax.plot(*avgROC, 'orange', label='avg', marker='o') ax.legend() plt.show() elif test == 9: from DylSort import treeMergeSort from DylComp import Comparator import matplotlib.pyplot as plt from time import time t1: float = time() data, D0, D1 = continuousScale(2048, 2048) comp: Comparator = Comparator(data, rand=True) comp.genRand(len(D0), len(D1), 7.72, 'exponential') fig = plt.figure() for level, groups in enumerate(treeMergeSort(data, comp, combGroups=False)): rocs: list = list() for group in groups: roc: list = genROC(group, D0, D1) rocs.append(roc) avgROC:tuple = avROC(rocs) rocs: list = list(zip(*avgROC)) rocs.reverse() mse: float = MSE(7.72, 'exponential', rocs) #print(*mse, auc(rocs)) print(f"{mse[0]:03.3e}, {auc(rocs):0.3f}, {len(comp)}") ax = fig.add_subplot(3, 4, level + 1) ax.set_aspect('equal', 'box') approx = interp1d(*zip(*rocs), 'linear') ax.plot(list(np.arange(0, 1 - 10**-4, 10**-4)), [approx(fp) for fp in np.arange(0, 1 - 10**-4, 10**-4)]) ax.plot(list(np.arange(0, 1 - 10**-4, 10**-4)), [fp**(1/7.72) for fp in np.arange(0, 1 - 10**-4, 10**-4)]) ax.set(title=f"{mse[0]:03.6e}:{len(comp)}") #plt.subplots_adjust(hspace=0.25) plt.show() print(time() - t1)