# Copyright 2022 Diagnostic Image Analysis Group, Radboudumc, Nijmegen, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
from scipy import ndimage

try:
    from sklearn.metrics import auc
except ImportError:  # pragma: no cover
    # scikit-learn is only used for trapezoidal integration, so provide an
    # equivalent NumPy implementation when it is not installed.
    def auc(x, y):
        """Area under the curve through the points (x, y), using the trapezoidal rule.

        `x` must be monotonically increasing or monotonically decreasing and
        contain at least two points, otherwise a ValueError is raised.
        """
        x = np.ravel(np.asarray(x, dtype=np.float64))
        y = np.ravel(np.asarray(y, dtype=np.float64))
        if x.shape[0] != y.shape[0]:
            raise ValueError(f"x and y must have the same length, got {x.shape[0]} and {y.shape[0]}.")
        if x.shape[0] < 2:
            raise ValueError(f"At least 2 points are needed to compute area under curve, but x.shape = {x.shape}")

        dx = np.diff(x)
        direction = 1
        if np.any(dx < 0):
            if not np.all(dx <= 0):
                raise ValueError(f"x is neither increasing nor decreasing : {x}.")
            # decreasing x yields a negative trapezoid sum; flip the sign
            direction = -1
        return direction * np.sum(dx * (y[1:] + y[:-1]) / 2.0)

try:
    import numpy.typing as npt
except ImportError:  # pragma: no cover
    pass


# Parse Detection Maps to Individual Lesions + Likelihoods
def parse_detection_map(detection_map):
    """
    Split a detection map into its connected components and their confidences.

    Input:
    - detection_map: N-D array where non-zero voxels belong to a lesion detection
      and the voxel value is the predicted likelihood.

    Returns:
    - confidences: list of (lesion index, maximum value of `detection_map` within that lesion),
      with lesion indices running from 1 to the number of connected components.
    - blobs_index: integer array of the same shape as `detection_map`, where each
      connected component is labelled with its lesion index (0 is background).
    """
    detection_map = np.asarray(detection_map)

    # Label all non-connected components, using a fully connected neighbourhood
    # (voxels touching by face, edge or corner are connected; 3x3x3 ones for a 3D map)
    structure = np.ones((3,) * detection_map.ndim)
    blobs_index, num_blobs = ndimage.label(detection_map, structure)

    # Store the maximum predicted likelihood per lesion detection
    confidences = [
        (lesion_index, detection_map[blobs_index == lesion_index].max())
        for lesion_index in range(1, num_blobs + 1)
    ]

    return confidences, blobs_index


# Calculate Dice Similarity Coefficient (DSC) for N-D Arrays
def calculate_dsc(predictions: "npt.NDArray[np.float32]", labels: "npt.NDArray[np.int32]") -> float:
    """
    Dice similarity coefficient: 2 * sum(predictions where labels == 1) / (sum(predictions) + sum(labels)).

    A small epsilon is added to numerator and denominator, so two all-zero arrays give 1.0.
    """
    predictions = np.asarray(predictions)
    labels = np.asarray(labels)

    epsilon = 1e-8
    dsc_num = np.sum(predictions[labels == 1]) * 2.0
    dsc_denom = np.sum(predictions) + np.sum(labels)
    return float((dsc_num + epsilon) / (dsc_denom + epsilon))


# Calculate Intersection over Union (IoU) for N-D Arrays
def calculate_iou(predictions: "npt.NDArray[np.float32]", labels: "npt.NDArray[np.int32]") -> float:
    """
    Intersection over union: sum(predictions where labels == 1) / (sum(predictions) + sum(labels) - intersection).

    A small epsilon is added to numerator and denominator, so two all-zero arrays give 1.0.
    """
    predictions = np.asarray(predictions)
    labels = np.asarray(labels)

    epsilon = 1e-8
    iou_num = np.sum(predictions[labels == 1])
    iou_denom = np.sum(predictions) + np.sum(labels) - iou_num
    return float((iou_num + epsilon) / (iou_denom + epsilon))


def _operating_point_at(prefix, idx, y, x):
    """Collect the index, y value and x value of an operating point under `prefix`-ed keys."""
    return {
        f"{prefix}_idx": idx,
        f"{prefix}_y": y[idx],
        f"{prefix}_x": x[idx],
    }


# Calculate Operating Points for Curve
def calculate_operating_points(y, x, op_match=None, verbose=1):
    """
    Input:
    - y: (monotonically increasing) performance metric, such as the True Positive Rate
    - x: (monotonically increasing) performance metric, such as the False Positive Rate
    - op_match: dictionary that specifies the target operating point: {
        'x': target x value, 'y': target y value
    }
      The x target may also be given as 'fpr' or 'FPR', and the y target as
      'tpr', 'TPR', 'sensitivity' or 'sens'. Either target may be omitted.
    - verbose: if truthy, print a warning when x or y is not monotonically increasing

    Returns:
    - dictionary with operating point(s): {
        'op_best_roc_idx': index where |y - (1 - x)| is smallest,
        'op_best_roc_y': y value at that index,
        'op_best_roc_x': x value at that index,
        'op_closest_xy_y': y_op, # y value at operating point that matches both x and y of target operating point
        'op_closest_xy_x': x_op, # x value at operating point that matches both x and y of target operating point
        ...
    }
      The 'op_closest_xy_*' entries are present when both targets are given,
      'op_closest_x_*' when the x target is given and 'op_closest_y_*' when the y target is given.
    """
    # TODO: currently, a lower sensitivity is preferred over a higher sensitivity if that means the operating point is matched better.
    #       Would be better to go for the best sensitivity/specificity, if that can be done without hurting the other performance metric.
    #       In practice, this should not be an issue, as we have many points then.
    y = np.array(y)
    x = np.array(x)
    operating_points = {}

    if not np.all(np.diff(y) >= 0) and verbose:
        print("Warning: y performance metric is not monotonically increasing, this could lead to unexpected behaviour!")
    if not np.all(np.diff(x) >= 0) and verbose:
        print("Warning: x performance metric is not monotonically increasing, this could lead to unexpected behaviour!")

    # Grab Index of Intersection -> Compute y/TPR and x/FPR @ Index -> Store
    op_best_roc_idx = np.argmin(np.abs(y - (1 - x)))
    operating_points.update(_operating_point_at("op_best_roc", op_best_roc_idx, y, x))

    if op_match is not None:
        # calculate operating point closest to target operating point
        abs_deficit_x, abs_deficit_y = None, None
        optional_x_keys = ['x', 'fpr', 'FPR']
        optional_y_keys = ['y', 'tpr', 'TPR', 'sensitivity', 'sens']

        # if the target x value is specified, calculate the difference between target and observed value
        for key in optional_x_keys:
            if key in op_match:
                abs_deficit_x = np.abs(x - op_match[key])
                break

        # if the target y value is specified, calculate the difference between target and observed value
        for key in optional_y_keys:
            if key in op_match:
                abs_deficit_y = np.abs(y - op_match[key])
                break

        # if both target x and y values are specified, match on the summed
        # absolute difference between the target pair and observed pair
        if abs_deficit_x is not None and abs_deficit_y is not None:
            op_closest_xy_idx = np.argmin(abs_deficit_x + abs_deficit_y)
            operating_points.update(_operating_point_at("op_closest_xy", op_closest_xy_idx, y, x))

        # same for matching x only
        if abs_deficit_x is not None:
            op_closest_x_idx = np.argmin(abs_deficit_x)
            operating_points.update(_operating_point_at("op_closest_x", op_closest_x_idx, y, x))

        # same for matching y only
        if abs_deficit_y is not None:
            op_closest_y_idx = np.argmin(abs_deficit_y)
            operating_points.update(_operating_point_at("op_closest_y", op_closest_y_idx, y, x))

    return operating_points


# Calculate Statistics for Multiple Curves
def calculate_statistics(metrics, op_match=None, x_start=0., x_end=1., verbose=1):
    """
    Calculate statistics, such as the area under the curve, for multiple (independent) curves.
    To calculate shared statistics, the curves must be translated to a shared x domain. To
    achieve this with virtually no loss of the step-like nature of curves like ROC and FROC,
    the shared x values are derived from the input, and offset with ± 1e-10.

    Input:
    - metrics should be a list of tuples with the y & x coordinates for each run:
        [([y1, y2, y3, ...], [x1, x2, x3]), # run 1
         ([y1, y2, y3, ...], [x1, x2, x3]), # run 2
        ]
      A coordinate list wrapped in an extra length-one sequence ([[y1, y2, ...]]) is unpacked.
    - op_match: {
        'y': value of y metric (e.g., TPR/sensitivity) to match,
        'x': value of x metric (e.g., FPR/false positive rate) to match,
    }
    - x_start, x_end: range of x over which the area under the curve is calculated
    - verbose: passed on to `calculate_operating_points`, both for the individual
      runs and for the mean curve

    Returns:
    - dictionary with the per-run and aggregated AUC ('auroc_all', 'auroc_mean', 'auroc_std'),
      the curves in the shared domain ('x_shared', 'y_shared_all', 'y_shared_mean', 'y_shared_std',
      'y_shared_CI_lower', 'y_shared_CI_higher'), the operating points of the mean curve,
      the operating points per run ('individually_matched_operating_points') and
      their standard deviation between runs ('<key>_std').

    Note: mean and 95% CI are calculated as function of the shared x.
    """
    # validate x_start and x_end
    assert x_start < x_end, f"x_start must be smaller than x_end! Got x_start={x_start} and x_end={x_end}."

    # construct the array of shared x values
    eps = 1e-10
    x_values = [np.ravel(np.asarray(x, dtype=np.float64)) for _, x in metrics]  # all observed x-values, flattened per run
    x_values.append(np.array([x_start, x_end], dtype=np.float64))  # ensure correct pAUC calculation
    x_shared = np.concatenate(x_values)
    x_shared = np.concatenate((x_shared + eps, x_shared - eps))
    x_shared = np.unique(x_shared)  # unique x values, sorted in ascending order

    # part of the shared domain that contributes to the (partial) AUC
    mask = (x_shared >= x_start) & (x_shared <= x_end)

    # convert the per-model y (e.g., TPR) vs x (e.g., FPR) to a shared domain
    y_shared_all = np.zeros(shape=(len(metrics), len(x_shared)), dtype=np.float32)
    auroc_all = []
    individually_matched_operating_points = []
    for i, (y, x) in enumerate(metrics):
        # if necessary, unpack x and y (only when the single element is itself
        # a sequence, so a curve with a single point is left intact)
        if len(y) == 1 and np.ndim(y[0]) > 0:
            y = y[0]
        if len(x) == 1 and np.ndim(x[0]) > 0:
            x = x[0]

        # interpolate the y values to the shared x values
        y_shared_domain = np.interp(x_shared, x, y)
        y_shared_all[i] = y_shared_domain

        # calculate AUROC for macro stats
        auroc_all.append(auc(x_shared[mask], y_shared_domain[mask]))

        # match operating point for each run individually
        individually_matched_operating_points.append(
            calculate_operating_points(y=y, x=x, op_match=op_match, verbose=verbose)
        )

    # calculate statistics in shared domain
    y_shared_mean = np.mean(y_shared_all, axis=0)
    y_shared_std = np.std(y_shared_all, axis=0)
    y_shared_CI_lower = np.percentile(y_shared_all, 2.5, axis=0)
    y_shared_CI_higher = np.percentile(y_shared_all, 97.5, axis=0)
    auroc_mean = np.mean(auroc_all)
    auroc_std = np.std(auroc_all)

    # calculate operating points in shared domain
    operating_points = calculate_operating_points(y=y_shared_mean, x=x_shared, op_match=op_match, verbose=verbose)

    # collect results
    results = {
        # overview statistics
        'auroc_mean': auroc_mean,
        'auroc_std': auroc_std,
        'auroc_all': auroc_all,

        # for plotting
        'x_shared': x_shared,
        'y_shared_all': y_shared_all,
        'y_shared_mean': y_shared_mean,
        'y_shared_std': y_shared_std,
        'y_shared_CI_lower': y_shared_CI_lower,
        'y_shared_CI_higher': y_shared_CI_higher,

        # individually matched operating point
        'individually_matched_operating_points': individually_matched_operating_points,
    }
    results.update(operating_points)

    # calculate standard deviation of each metric (op_closest_xy_y, etc.) between individual runs
    individually_matched_operating_points_std = {
        f"{key}_std": np.std([
            operating_point_info[key]
            for operating_point_info in individually_matched_operating_points
        ])
        for key in individually_matched_operating_points[0].keys()
    }
    results.update(individually_matched_operating_points_std)

    return results


# Calculate (partial) Area Under Curve (pAUC) using (x,y) coordinates from the given curve
def calculate_pAUC_from_graph(x, y, pAUC_start: float = 0.0, pAUC_end: float = 1.0, full: bool = False):
    """
    Input:
    For a single curve:
    - x: x values of a curve (e.g., the False Positive Rate points). [x1, x2, .., xn]
    - y: y values of a curve (e.g., the True Positive Rate points). [y1, y2, .., yn]
    For multiple curves:
    - list of x curves, for example the x values observed across multiple runs.
      [[x1m1, x2m1, .., xnm1], [x1m2, x2m2, ...., xnm2], ..]
    - list of y curves, for example the y values observed across multiple runs.
      [[y1m1, y2m1, .., ynm1], [y1m2, y2m2, ...., ynm2], ..]
    - pAUC_start: lower bound of x (e.g., FPR) to compute pAUC
    - pAUC_end: higher bound of x (e.g., FPR) to compute pAUC

    Returns:
    - if (full==False): List of pAUC values for each set of ([x1, ..], [y1, ..]) coordinates
    - if (full==True): Metrics as returned by `calculate_statistics` [see there]

    Raises an AssertionError when one of x and y holds a single sequence and the other multiple sequences.

    Note: function is not specific to the FROC curve
    """
    x_is_nested = isinstance(x[0], (list, np.ndarray))
    y_is_nested = isinstance(y[0], (list, np.ndarray))

    if not x_is_nested or not y_is_nested:
        # Have a single set of (x,y) coordinates
        assert not x_is_nested and not y_is_nested, \
            "Either provide multiple sequences of (x,y) coordinates, or a single sequence. Obtained a mix of both now. "

        # Pack coordinates in format expected by `calculate_statistics`
        coordinates_joined = [(y, x)]
    else:
        # Have multiple sets of (x,y) coordinates
        # Pack coordinates in format expected by `calculate_statistics`
        coordinates_joined = list(zip(y, x))

    # Calculate AUC in Given Ranges
    results = calculate_statistics(metrics=coordinates_joined, x_start=pAUC_start, x_end=pAUC_end)
    if full:
        return results
    return results['auroc_all']