import argparse
import concurrent.futures
import itertools
import multiprocessing
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import List, Tuple, Dict, Any, Union, Optional, Callable, Iterable, Hashable, Sized

import numpy as np
import SimpleITK as sitk
from focal_loss import BinaryFocalLoss
from scipy import ndimage
from sklearn.metrics import roc_curve, auc
from tensorflow.keras.models import load_model
from tqdm import tqdm

sys.path.append('./../code')
from utils_quintin import *

# from fastMRI_PCa.visualization.visualize import write_array2nifti_t2
# from fastMRI_PCa.visualization import save_slice_3d
# from fastMRI_PCa.utils import read_yaml_to_dict, list_from_file, print_stats_np
# from fastMRI_PCa.utils import print_p, get_rand_exp_decay_mask
# from fastMRI_PCa.data import resample, center_crop, normalize, undersample, binarize_s
# from fastMRI_PCa.models import ssim_loss, ssim_metric, psnr_metric

# numpy.typing is only available for NumPy >= 1.20; the string annotations
# below degrade gracefully when it is missing
try:
    import numpy.typing as npt
except ImportError:
    pass


############################### PREPROCESSING BLOBS #########################

"""
Extract lesion candidates from a softmax prediction
Authors: anindox8, matinhz, joeranbosma
"""


# Preprocess Softmax Volume (Clipping, Max Confidence)
def preprocess_softmax_static(softmax: np.ndarray,
                              threshold: float = 0.10,
                              min_voxels_detection: int = 10,
                              max_prob_round_decimals: Optional[int] = 4) -> Tuple[np.ndarray, List[Tuple[int, float]], np.ndarray]:
    # Load and Preprocess Softmax Image
    all_hard_blobs = np.zeros_like(softmax)
    confidences = []
    clipped_softmax = softmax.copy()
    clipped_softmax[softmax < threshold] = 0
    blobs_index, num_blobs = ndimage.label(clipped_softmax, np.ones((3, 3, 3)))

    if num_blobs > 0:  # For Each Prediction
        for tumor in range(1, num_blobs+1):
            # determine mask for current lesion
            hard_mask = np.zeros_like(blobs_index)
            hard_mask[blobs_index == tumor] = 1

            if np.count_nonzero(hard_mask) <= min_voxels_detection:
                # remove tiny detection of <= 0.009 cm^3
                blobs_index[hard_mask.astype(bool)] = 0
                continue

            # add sufficiently sized detection
            hard_blob = hard_mask * clipped_softmax
            max_prob = np.max(hard_blob)
            if max_prob_round_decimals is not None:
                max_prob = np.round(max_prob, max_prob_round_decimals)
            hard_blob[hard_blob > 0] = max_prob
            all_hard_blobs += hard_blob
            confidences.append((tumor, max_prob))
    return all_hard_blobs, confidences, blobs_index


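# Illustrative sketch (not part of the original pipeline; shapes, values and the
# _demo_* name are made up): static extraction clips the softmax at a fixed
# threshold, labels connected components, and drops components that are too small.
def _demo_preprocess_softmax_static():
    softmax = np.zeros((16, 16, 16), dtype=np.float32)
    softmax[2:6, 2:6, 2:6] = 0.8        # 64-voxel candidate, kept
    softmax[10:12, 10:12, 10:12] = 0.3  # 8-voxel blob, <= min_voxels_detection, discarded
    hard_blobs, confidences, indexed = preprocess_softmax_static(softmax, threshold=0.10)
    print(confidences)  # e.g. [(1, 0.8)]: only the sufficiently sized blob survives

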
def preprocess_softmax_dynamic(softmax: np.ndarray,
                               min_voxels_detection: int = 10,
                               num_lesions_to_extract: int = 5,
                               dynamic_threshold_factor: float = 2.5,
                               max_prob_round_decimals: Optional[int] = None,
                               remove_adjacent_lesion_candidates: bool = True,
                               max_prob_failsafe_stopping_threshold: float = 0.01) -> Tuple[np.ndarray, List[Tuple[int, float]], np.ndarray]:
    """
    Generate detection proposals using a dynamic threshold to determine the location and size of lesions.
    Author: Joeran Bosma
    """
    working_softmax = softmax.copy()
    dynamic_hard_blobs = np.zeros_like(softmax)
    confidences: List[Tuple[int, float]] = []
    dynamic_indexed_blobs = np.zeros_like(softmax, dtype=int)

    while len(confidences) < num_lesions_to_extract:
        tumor_index = 1 + len(confidences)

        # determine max. softmax
        max_prob = np.max(working_softmax)

        if max_prob < max_prob_failsafe_stopping_threshold:
            break

        # set dynamic threshold to a fraction of the max (max / dynamic_threshold_factor)
        threshold = max_prob / dynamic_threshold_factor

        # extract blobs for dynamic threshold
        all_hard_blobs, _, _ = preprocess_softmax_static(working_softmax, threshold=threshold,
                                                         min_voxels_detection=min_voxels_detection,
                                                         max_prob_round_decimals=max_prob_round_decimals)

        # select blob with max. confidence
        # note: the max_prob is re-computed in the (unlikely) case that the max. prob
        # was inside a 'lesion candidate' of less than min_voxels_detection, which is
        # thus removed in preprocess_softmax_static.
        max_prob = np.max(all_hard_blobs)
        mask_current_lesion = (all_hard_blobs == max_prob)

        # ensure that the mask is only a single lesion candidate (this assumption fails when multiple lesions have the same max. prob)
        mask_current_lesion_indexed, _ = ndimage.label(mask_current_lesion, np.ones((3, 3, 3)))
        mask_current_lesion = (mask_current_lesion_indexed == 1)

        # create mask with its confidence
        hard_blob = (all_hard_blobs * mask_current_lesion)

        # Detect whether the extracted mask is a ring/hollow sphere
        # around an existing lesion candidate. For confident lesions,
        # the surroundings of the prediction are still quite confident,
        # and can become a second 'detection'. For an example, please
        # see extracted lesion candidates nr. 4 and 5 at:
        # https://repos.diagnijmegen.nl/trac/ticket/9299#comment:49
        # Detection method: grow currently extracted lesions by one voxel,
        # and check if they overlap with the current extracted lesion.
        extracted_lesions_grown = ndimage.binary_dilation(dynamic_hard_blobs > 0)
        current_lesion_has_overlap = (mask_current_lesion & extracted_lesions_grown).any()

        # Check if lesion candidate should be retained
        if (not remove_adjacent_lesion_candidates) or (not current_lesion_has_overlap):
            # store extracted lesion
            dynamic_hard_blobs += hard_blob
            confidences += [(tumor_index, max_prob)]
            dynamic_indexed_blobs += (mask_current_lesion * tumor_index)

        # remove extracted lesion from the working softmax
        working_softmax = (working_softmax * (~mask_current_lesion))

    return dynamic_hard_blobs, confidences, dynamic_indexed_blobs


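# Illustrative sketch (synthetic values, hypothetical _demo_* name): the dynamic
# mode peels lesions off one by one, each with its own threshold derived from the
# remaining maximum, so a faint lesion is still found after a confident one.
def _demo_preprocess_softmax_dynamic():
    softmax = np.zeros((16, 16, 16), dtype=np.float32)
    softmax[2:6, 2:6, 2:6] = 0.9        # high-confidence lesion
    softmax[10:14, 10:14, 10:14] = 0.2  # faint lesion, far below 0.9 / 2.5 = 0.36
    _, confidences, _ = preprocess_softmax_dynamic(softmax)
    print(confidences)  # e.g. [(1, 0.9), (2, 0.2)]: both extracted despite the confidence gap

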
def preprocess_softmax(softmax: np.ndarray,
                       threshold: Union[str, float] = 0.10,
                       min_voxels_detection: int = 10,
                       num_lesions_to_extract: int = 5,
                       dynamic_threshold_factor: float = 2.5,
                       max_prob_round_decimals: Optional[int] = None,
                       remove_adjacent_lesion_candidates: bool = True) -> Tuple[np.ndarray, List[Tuple[int, float]], np.ndarray]:
    """
    Generate detection proposals using a dynamic or static threshold to determine the size of lesions.
    """
    if threshold == 'dynamic':
        all_hard_blobs, confidences, indexed_pred = preprocess_softmax_dynamic(softmax, min_voxels_detection=min_voxels_detection,
                                                                               dynamic_threshold_factor=dynamic_threshold_factor,
                                                                               num_lesions_to_extract=num_lesions_to_extract,
                                                                               remove_adjacent_lesion_candidates=remove_adjacent_lesion_candidates,
                                                                               max_prob_round_decimals=max_prob_round_decimals)
    elif threshold == 'dynamic-fast':
        # determine max. softmax and set a per-case 'static' threshold based on that
        max_prob = np.max(softmax)
        threshold = float(max_prob / dynamic_threshold_factor)
        all_hard_blobs, confidences, indexed_pred = preprocess_softmax_static(softmax, threshold=threshold,
                                                                              min_voxels_detection=min_voxels_detection,
                                                                              max_prob_round_decimals=max_prob_round_decimals)
    else:
        threshold = float(threshold)  # convert threshold to float, if it wasn't already
        all_hard_blobs, confidences, indexed_pred = preprocess_softmax_static(softmax, threshold=threshold,
                                                                              min_voxels_detection=min_voxels_detection,
                                                                              max_prob_round_decimals=max_prob_round_decimals)

    return all_hard_blobs, confidences, indexed_pred


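# Illustrative sketch of the three threshold modes (same synthetic volume as the
# demos above; not part of the original pipeline): a float gives one static pass,
# 'dynamic' re-thresholds per lesion (most thorough, slowest), and 'dynamic-fast'
# derives a single per-case threshold from the global maximum.
def _demo_preprocess_softmax_modes():
    softmax = np.zeros((16, 16, 16), dtype=np.float32)
    softmax[2:6, 2:6, 2:6] = 0.9
    softmax[10:14, 10:14, 10:14] = 0.2
    for mode in (0.10, 'dynamic', 'dynamic-fast'):
        _, confidences, _ = preprocess_softmax(softmax, threshold=mode)
        print(mode, confidences)
    # 'dynamic-fast' uses threshold 0.9 / 2.5 = 0.36, so the 0.2 lesion is dropped there.

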
################################################################################


# Evaluate all cases
def evaluate(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    min_overlap=0.10,
    overlap_func: str = 'DSC',
    case_confidence: str = 'max',
    multiple_lesion_candidates_selection_criteria='overlap',
    allow_unmatched_candidates_with_minimal_overlap=True,
    flat: Optional[bool] = None
) -> Dict[str, Any]:

    # Treat the inputs as sequences of cases, so the cases can be mapped in
    # parallel over multiple workers.
    y_true_list = y_true
    y_pred_list = y_pred

    roc_true, roc_pred = {}, {}
    y_list = []
    num_lesions = 0

    subject_idxs = list(range(len(y_true_list)))
    N_CPUS = 2
    with ThreadPoolExecutor(max_workers=N_CPUS) as pool:
        # schedule evaluate_case for each individual detection_map prediction,
        # with its ground truth label and the evaluation parameters
        future_to_args = {
            pool.submit(evaluate_case,
                        case_y_true,
                        case_y_pred,
                        min_overlap=min_overlap,
                        overlap_func=overlap_func,
                        multiple_lesion_candidates_selection_criteria=multiple_lesion_candidates_selection_criteria,
                        allow_unmatched_candidates_with_minimal_overlap=allow_unmatched_candidates_with_minimal_overlap): idx
            for (case_y_true, case_y_pred, idx) in zip(y_true_list, y_pred_list, subject_idxs)
        }

        # process the cases in parallel
        iterator = concurrent.futures.as_completed(future_to_args)
        iterator = tqdm(iterator, desc='Computing FROC', total=len(y_true_list))
        for future in iterator:
            try:
                res = future.result()
            except Exception as e:
                print(f"Exception: {e}")
            else:
                # unpack results
                y_list_pat, num_lesions_gt = res
                # note: y_list_pat contains: is_lesion, confidence[, Dice, gt num voxels]

                # aggregate results
                idx = future_to_args[future]
                roc_true[idx] = np.max([a[0] for a in y_list_pat])
                if case_confidence == 'max':
                    # take highest lesion confidence as case-level confidence
                    roc_pred[idx] = np.max([a[1] for a in y_list_pat])
                elif case_confidence == 'bayesian':
                    # if a_i is the probability the i-th lesion is csPCa, then the case-level
                    # probability to have any csPCa lesion is 1 - Π_i{ 1 - a_i }
                    roc_pred[idx] = 1 - np.prod([(1 - a[1]) for a in y_list_pat])
                else:
                    raise ValueError(f"Patient confidence calculation method not recognised. Got: {case_confidence}.")

                # accumulate outputs
                y_list += y_list_pat
                num_lesions += num_lesions_gt

    # calculate statistics
    num_patients = len(roc_true)

    # get lesion-level results
    sensitivity, FP_per_case, thresholds, num_lesions = froc_from_lesion_evaluations(
        y_list=y_list, num_patients=num_patients
    )

    # calculate recall, precision and average precision
    AP, precision, recall, _ = ap_from_lesion_evaluations(y_list, thresholds=thresholds)

    # calculate case-level AUROC
    fpr, tpr, _ = roc_curve(y_true=[roc_true[s] for s in subject_idxs],
                            y_score=[roc_pred[s] for s in subject_idxs],
                            pos_label=1)
    auc_score = auc(fpr, tpr)

    if flat is None:
        flat = True  # default to flattened outputs
    if flat:
        # flatten roc_true and roc_pred
        roc_true_flat = [roc_true[s] for s in subject_idxs]
        roc_pred_flat = [roc_pred[s] for s in subject_idxs]

    metrics = {
        "FP_per_case": convert_np_to_list(FP_per_case),
        "sensitivity": convert_np_to_list(sensitivity),
        "thresholds": convert_np_to_list(thresholds),
        "num_lesions": int(num_lesions),
        "num_patients": int(num_patients),
        "roc_true": convert_np_to_list(roc_true_flat if flat else roc_true),
        "roc_pred": convert_np_to_list(roc_pred_flat if flat else roc_pred),
        "AP": float(AP),
        "precision": convert_np_to_list(precision),
        "recall": convert_np_to_list(recall),

        # patient-level predictions
        'auroc': float(auc_score),
        'tpr': convert_np_to_list(tpr),
        'fpr': convert_np_to_list(fpr),
    }

    return metrics


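# Illustrative end-to-end sketch (synthetic volumes, hypothetical _demo_* name):
# two cases, one with a detected ground-truth lesion and one benign case with a
# false positive. y_true holds binary label volumes; y_pred holds detection maps
# as produced by preprocess_softmax.
def _demo_evaluate():
    y_true = np.zeros((2, 8, 8, 8), dtype=np.int32)
    y_pred = np.zeros((2, 8, 8, 8), dtype=np.float32)
    y_true[0, 2:5, 2:5, 2:5] = 1    # case 0: one GT lesion
    y_pred[0, 2:5, 2:5, 2:5] = 0.9  # ...matched by a confident detection
    y_pred[1, 3:6, 3:6, 3:6] = 0.4  # case 1: benign, one false positive
    metrics = evaluate(y_true=y_true, y_pred=y_pred)
    print(metrics['num_lesions'], metrics['auroc'])  # e.g. 1 1.0

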
def convert_np_to_list(flat_numpy_arr):
    ans = []
    for elem in flat_numpy_arr:
        ans.append(float(elem))
    return ans


# Calculate FROC metrics (FP rate, sensitivity)
def froc_from_lesion_evaluations(y_list, num_patients, thresholds=None):
    # calculate counts
    TP, FP, thresholds, num_lesions = counts_from_lesion_evaluations(
        y_list=y_list, thresholds=thresholds
    )

    # calculate FROC metrics from counts
    sensitivity = TP / num_lesions if num_lesions > 0 else np.nan
    FP_per_case = FP / num_patients

    return sensitivity, FP_per_case, thresholds, num_lesions


def ap_from_lesion_evaluations(y_list, thresholds=None):
    # calculate counts
    TP, FP, thresholds, num_lesions = counts_from_lesion_evaluations(
        y_list=y_list, thresholds=thresholds
    )

    # calculate precision (lesion-level)
    precision = TP / (TP + FP)
    precision = np.append(precision, [0])

    # calculate recall (lesion-level)
    FN = num_lesions - TP
    recall = TP / (TP + FN)
    recall = np.append(recall, recall[-1:])

    # calculate average precision (lesion-level)
    AP = np.trapz(y=precision, x=recall)

    return AP, precision, recall, thresholds


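# Hand-computable sketch (made-up y_list, hypothetical _demo_* name): two GT
# lesions detected at different confidences plus one false positive. A copy is
# passed because counts_from_lesion_evaluations sorts its input in place.
def _demo_ap_from_lesion_evaluations():
    y_list = [
        (1, 0.9, 0.7),  # TP: GT lesion hit with confidence 0.9
        (1, 0.6, 0.5),  # TP: GT lesion hit with confidence 0.6
        (0, 0.3, 0.0),  # FP with confidence 0.3
    ]
    AP, precision, recall, thresholds = ap_from_lesion_evaluations(list(y_list))
    print(AP)  # 0.5: trapezoidal area under the precision-recall points

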
# Calculate macro metrics (true positives (TP), false positives (FP))
def counts_from_lesion_evaluations(
    y_list: List[Tuple[int, float, float]],
    thresholds: "Optional[npt.NDArray[np.float64]]" = None
) -> "Tuple[npt.NDArray[np.float32], npt.NDArray[np.float32], npt.NDArray[np.float64], int]":
    """
    Calculate true positives (TP) and false positives (FP) as a function of threshold,
    based on the case evaluations from `evaluate_case`.
    """
    # sort predictions
    y_list.sort()

    # collect targets and predictions
    y_true: "npt.NDArray[np.float64]" = np.array([target for target, *_ in y_list])
    y_pred: "npt.NDArray[np.float64]" = np.array([pred for _, pred, *_ in y_list])

    # calculate number of lesions
    num_lesions = int(y_true.sum())

    if thresholds is None:
        # collect thresholds for FROC analysis
        thresholds = np.unique(y_pred)
        thresholds[::-1].sort()  # sort thresholds in descending order (in-place)

        # for >10,000 thresholds: resample to roughly 10,000 unique thresholds, while also
        # keeping all thresholds higher than 0.8 and the 20 lowest thresholds
        if len(thresholds) > 10_000:
            rng = np.arange(1, len(thresholds), len(thresholds)/10_000, dtype=np.int32)
            st = [thresholds[i] for i in rng]
            low_thresholds = thresholds[-20:]
            thresholds = np.array([t for t in thresholds if t > 0.8 or t in st or t in low_thresholds])

    # define placeholders
    FP: "npt.NDArray[np.float32]" = np.zeros_like(thresholds, dtype=np.float32)
    TP: "npt.NDArray[np.float32]" = np.zeros_like(thresholds, dtype=np.float32)

    # for each threshold: count FPs and calculate the sensitivity
    for i, th in enumerate(thresholds):
        if th > 0:
            y_pred_thresholded = (y_pred >= th).astype(int)
            tp = np.sum(y_true * y_pred_thresholded)
            fp = np.sum(y_pred_thresholded - y_true * y_pred_thresholded)

            # update FROC with new point
            FP[i] = fp
            TP[i] = tp
        else:
            # extend FROC curve to infinity
            # TP[i] = TP[-2]  # note: adjusted by Stefan, 11-04-2022
            TP[i] = 1
            FP[i] = np.inf

    return TP, FP, thresholds, num_lesions


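# Sketch of the raw counts for the same made-up y_list as in the AP demo above:
# TP/FP are counted at every unique confidence threshold, in descending order.
def _demo_counts_from_lesion_evaluations():
    y_list = [(1, 0.9, 0.7), (1, 0.6, 0.5), (0, 0.3, 0.0)]
    TP, FP, thresholds, num_lesions = counts_from_lesion_evaluations(list(y_list))
    print(thresholds)  # [0.9 0.6 0.3]
    print(TP)          # [1. 2. 2.]
    print(FP)          # [0. 0. 1.]

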
# Compute base prediction metrics TP/FP/FN with associated model confidences
def evaluate_case(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    min_overlap: float = 0.10,
    overlap_func: str = 'DSC',
    multiple_lesion_candidates_selection_criteria: str = 'overlap',
    allow_unmatched_candidates_with_minimal_overlap: bool = True
) -> Tuple[List[Tuple[int, float, float]], int]:
    """
    Gather the list of lesion candidates, and classify in TP/FP/FN.
    - multiple_lesion_candidates_selection_criteria: when multiple lesion candidates have overlap with the same
                                                     ground truth mask, use 'overlap' or 'confidence' to choose
                                                     which lesion is matched against the ground truth mask.

    Returns:
    - a list of tuples with:
      (is_lesion, prediction confidence, Dice similarity coefficient)
    - number of ground truth lesions
    """
    y_list: List[Tuple[int, float, float]] = []

    if overlap_func == 'IoU':
        overlap_func = calculate_iou
    elif overlap_func == 'DSC':
        overlap_func = calculate_dsc
    else:
        raise ValueError(f"Overlap function with name {overlap_func} not recognized. Supported are 'IoU' and 'DSC'")

    # convert dtypes: labels to int32, predictions to float32
    y_true = y_true.astype('int32')
    y_pred = y_pred.astype('float32')

    if y_pred.shape[0] < y_true.shape[0]:
        print("Warning: padding prediction to match label!")
        y_pred = resize_image_with_crop_or_pad(y_pred, y_true.shape)

    confidences, indexed_pred = parse_detection_map(y_pred)

    lesion_candidates_best_overlap: Dict[int, float] = {}

    if y_true.any():
        # for each malignant scan
        labeled_gt, num_gt_lesions = ndimage.label(y_true, np.ones((3, 3, 3)))

        for lesion_id in range(1, num_gt_lesions+1):
            # for each lesion in ground-truth (GT) label
            gt_lesion_mask = (labeled_gt == lesion_id)

            # collect indices of lesion candidates that have any overlap with the current GT lesion
            overlapping_lesion_candidate_indices = set(np.unique(indexed_pred[gt_lesion_mask]))
            overlapping_lesion_candidate_indices -= {0}  # remove index 0, if present

            # collect lesion candidates for current GT lesion
            lesion_candidates_for_target_gt: List[Dict[str, Union[int, float]]] = []
            for lesion_candidate_id, lesion_confidence in confidences:
                if lesion_candidate_id in overlapping_lesion_candidate_indices:
                    # calculate overlap between lesion candidate and GT mask
                    lesion_pred_mask = (indexed_pred == lesion_candidate_id)
                    overlap_score = overlap_func(lesion_pred_mask, gt_lesion_mask)

                    # keep track of the highest overlap a lesion candidate has with any GT lesion
                    lesion_candidates_best_overlap[lesion_candidate_id] = max(
                        overlap_score, lesion_candidates_best_overlap.get(lesion_candidate_id, 0)
                    )

                    # store lesion candidate info for current GT mask
                    lesion_candidates_for_target_gt.append({
                        'id': lesion_candidate_id,
                        'confidence': lesion_confidence,
                        'overlap': overlap_score,
                    })

            if len(lesion_candidates_for_target_gt) == 0:
                # no lesion candidate matched with GT mask. Add FN.
                y_list.append((1, 0., 0.))
            elif len(lesion_candidates_for_target_gt) == 1:
                # single lesion candidate overlapped with GT mask. Add TP if overlap is sufficient, or FN otherwise.
                candidate_info = lesion_candidates_for_target_gt[0]
                lesion_pred_mask = (indexed_pred == candidate_info['id'])

                if candidate_info['overlap'] > min_overlap:
                    # overlap between lesion candidate and GT mask is sufficient, add TP
                    indexed_pred[lesion_pred_mask] = 0  # remove lesion candidate after assignment
                    y_list.append((1, candidate_info['confidence'], candidate_info['overlap']))
                else:
                    # overlap between lesion candidate and GT mask is insufficient, add FN
                    y_list.append((1, 0., 0.))
            else:
                # multiple predictions for current GT lesion
                # sort lesion candidates based on overlap or confidence
                key = multiple_lesion_candidates_selection_criteria
                lesion_candidates_for_target_gt = sorted(lesion_candidates_for_target_gt, key=lambda x: x[key], reverse=True)

                gt_lesion_matched = False
                for candidate_info in lesion_candidates_for_target_gt:
                    lesion_pred_mask = (indexed_pred == candidate_info['id'])

                    if candidate_info['overlap'] > min_overlap:
                        indexed_pred[lesion_pred_mask] = 0
                        y_list.append((1, candidate_info['confidence'], candidate_info['overlap']))
                        gt_lesion_matched = True
                        break

                if not gt_lesion_matched:
                    # ground truth lesion not matched to a lesion candidate. Add FN.
                    y_list.append((1, 0., 0.))

        # Remaining lesions are FPs
        remaining_lesions = set(np.unique(indexed_pred))
        remaining_lesions -= {0}  # remove index 0, if present
        for lesion_candidate_id, lesion_confidence in confidences:
            if lesion_candidate_id in remaining_lesions:
                overlap_score = lesion_candidates_best_overlap.get(lesion_candidate_id, 0)
                if allow_unmatched_candidates_with_minimal_overlap and overlap_score > min_overlap:
                    # The lesion candidate was not matched to a GT lesion, but did have overlap > min_overlap
                    # with a GT lesion. The GT lesion is, however, matched to another lesion candidate.
                    # In this operation mode, this lesion candidate is not considered as a false positive.
                    pass
                else:
                    y_list.append((0, lesion_confidence, 0.))  # add FP

    else:
        # for a benign case, all predictions are FPs
        num_gt_lesions = 0
        if len(confidences) > 0:
            for _, lesion_confidence in confidences:
                y_list.append((0, lesion_confidence, 0.))
        else:
            y_list.append((0, 0., 0.))  # avoid empty list

    return y_list, num_gt_lesions


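# Matching sketch for a single case (synthetic volumes, hypothetical _demo_* name):
# one candidate exactly covers the GT lesion (TP), a second candidate far away
# has no GT overlap (FP).
def _demo_evaluate_case():
    y_true = np.zeros((12, 12, 12), dtype=np.int32)
    y_true[2:6, 2:6, 2:6] = 1
    y_pred = np.zeros((12, 12, 12), dtype=np.float32)
    y_pred[2:6, 2:6, 2:6] = 0.8     # detection matching the GT lesion -> TP
    y_pred[8:11, 8:11, 8:11] = 0.3  # detection far from any GT lesion -> FP
    y_list, num_gt = evaluate_case(y_true, y_pred)
    print(y_list, num_gt)  # e.g. [(1, 0.8, 1.0), (0, 0.3, 0.0)] and 1

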
# Calculate Intersection over Union (IoU) for N-D Arrays
def calculate_iou(predictions: "npt.NDArray[np.float32]", labels: "npt.NDArray[np.int32]") -> float:
    epsilon = 1e-8
    iou_num = np.sum(predictions[labels == 1])
    iou_denom = np.sum(predictions) + np.sum(labels) - iou_num
    return float((iou_num + epsilon) / (iou_denom + epsilon))


# Calculate Dice Similarity Coefficient (DSC) for N-D Arrays
def calculate_dsc(predictions: "npt.NDArray[np.float32]", labels: "npt.NDArray[np.int32]") -> float:
    epsilon = 1e-8
    dsc_num = np.sum(predictions[labels == 1]) * 2.0
    dsc_denom = np.sum(predictions) + np.sum(labels)
    return float((dsc_num + epsilon) / (dsc_denom + epsilon))


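# Quick numeric check of the overlap measures (toy masks, hypothetical _demo_*
# name): two half-overlapping 2x2x2 cubes share 4 voxels, giving
# IoU = 4 / (8 + 8 - 4) = 1/3 and DSC = 2*4 / (8 + 8) = 1/2.
def _demo_overlap_functions():
    a = np.zeros((4, 4, 4), dtype=np.int32)
    b = np.zeros((4, 4, 4), dtype=np.int32)
    a[0:2, 0:2, 0:2] = 1
    b[1:3, 0:2, 0:2] = 1
    print(calculate_iou(b.astype(np.float32), a))  # ~0.333
    print(calculate_dsc(b.astype(np.float32), a))  # ~0.5

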
# Resize images (scans/predictions/labels) by cropping and/or padding [Ref: https://github.com/DLTK/DLTK]
def resize_image_with_crop_or_pad(image, img_size=(64, 64, 64), **kwargs):
    assert isinstance(image, np.ndarray)
    assert (image.ndim - 1 == len(img_size) or image.ndim == len(img_size)), \
        "Target size doesn't fit image size"

    rank = len(img_size)  # image dimensions

    # placeholders for new shape
    from_indices = [[0, image.shape[dim]] for dim in range(rank)]
    to_padding = [[0, 0] for _ in range(rank)]
    slicer = [slice(None)] * rank

    # for each dimension, determine process (cropping or padding)
    for i in range(rank):
        if image.shape[i] < img_size[i]:
            to_padding[i][0] = (img_size[i] - image.shape[i]) // 2
            to_padding[i][1] = img_size[i] - image.shape[i] - to_padding[i][0]
        else:
            from_indices[i][0] = int(np.floor((image.shape[i] - img_size[i]) / 2.))
            from_indices[i][1] = from_indices[i][0] + img_size[i]

        # create slicer object to crop/leave each dimension
        slicer[i] = slice(from_indices[i][0], from_indices[i][1])

    # pad the cropped image to extend the missing dimension
    return np.pad(image[tuple(slicer)], to_padding, **kwargs)


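# Crop/pad round-trip sketch (made-up shapes, hypothetical _demo_* name): pad a
# small volume up to (8, 8, 8), then crop back to (4, 4, 4). Because both crop
# and pad are centered, the round trip recovers the original.
def _demo_resize_image_with_crop_or_pad():
    image = np.random.rand(4, 4, 4)
    padded = resize_image_with_crop_or_pad(image, img_size=(8, 8, 8), mode='constant')
    cropped = resize_image_with_crop_or_pad(padded, img_size=(4, 4, 4), mode='constant')
    print(padded.shape, cropped.shape)  # (8, 8, 8) (4, 4, 4)
    print(np.allclose(image, cropped))  # True

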
# Parse Detection Maps to Individual Lesions + Likelihoods
def parse_detection_map(detection_map):
    # Label All Non-Connected Components in Detection Map
    blobs_index, num_blobs = ndimage.label(detection_map, np.ones((3, 3, 3)))

    confidences = []
    if num_blobs > 0:
        # For Each Lesion Detection
        for tumor_index in range(1, num_blobs+1):
            # Store Max Predicted Likelihood per Lesion Detection
            max_prob = detection_map[blobs_index == tumor_index].max()
            confidences.append((tumor_index, max_prob))
    return confidences, blobs_index


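# Sketch (synthetic map, hypothetical _demo_* name): a detection map with two
# components yields one (index, likelihood) pair per candidate plus the labeled volume.
def _demo_parse_detection_map():
    detection_map = np.zeros((8, 8, 8), dtype=np.float32)
    detection_map[1:3, 1:3, 1:3] = 0.7
    detection_map[5:7, 5:7, 5:7] = 0.2
    confidences, indexed = parse_detection_map(detection_map)
    print(confidences)         # e.g. [(1, 0.7), (2, 0.2)]
    print(int(indexed.max()))  # 2

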
############################### CONSTANTS ####################################
# Quintin code removed
###############################################################################


def move_dims(arr):
    # UMCG numpy dimensions convention: dims = (batch, width, height, depth)
    # Joeran numpy dimensions convention: dims = (batch, depth, height, width)
    arr = np.moveaxis(arr, 3, 1)  # Joeran has his numpy arrays ordered differently.
    arr = np.moveaxis(arr, 3, 2)
    return arr


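# Shape sketch for the axis shuffle (made-up sizes, hypothetical _demo_* name):
# (batch, width, height, depth) becomes (batch, depth, height, width).
def _demo_move_dims():
    arr = np.zeros((2, 160, 192, 24))  # (batch, width, height, depth)
    print(move_dims(arr).shape)        # (2, 24, 192, 160)

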
############################### CONSTANTS ####################################


# # Reconstruction model directory
# RECON_DIR = "11_new_recon_multiprocess"
# T2_MODEL_PATH = f"models/prelims/{RECON_DIR}/best-direct-fold0.h5"

# # Path to file with train, validation and test indexes.
# IDX_YAML_PATH = r"data/path_lists/pirads_4plus/train_val_test_idxs.yml"

# # Diagnostic models
# diag_models = [r"models/14_long_diag/best-direct-fold0.h5",
#                r"models/15_long_diag_early_stopping/best-direct-fold0.h5"]


# # x_true_val = np.load("temp/x_true_val.npy")
# print("loading numpy arrays")
# y_true_val = np.squeeze(np.load("temp/y_true_val_label.npy"))
# y_pred_val = np.squeeze(np.load("temp/y_pred_val_detection_map.npy"))
# y_true_val = move_dims(y_true_val)
# y_pred_val = move_dims(y_pred_val)

# # Preprocess the blobs individually
# for mri_idx in range(y_pred_val.shape[0]):
#     print(f"preprocessing mri idx {mri_idx}")
#     y_pred_val[mri_idx] = preprocess_softmax(y_pred_val[mri_idx], threshold="dynamic")[0]

# print(f"Start FROC evaluation.")
# metrics = evaluate(y_true=y_true_val, y_pred=y_pred_val)

# dump_dict_to_yaml(metrics, 'temp', "froc_metrics", verbose=True)