mirror of https://github.com/msberends/AMR.git, synced 2026-05-14 03:50:49 +02:00
* Add parallel computing support to antibiogram() and wisca() (#281)

  For WISCA: simulations are distributed across (group, chunk) job pairs via future.apply::future_lapply(), keeping all workers active even when the regimen count is smaller than nbrOfWorkers(). Sequential fallback with progress ticker is preserved when parallel = FALSE or workers = 1.

  For grouped antibiograms: each group is processed by a separate worker, mirroring the row-batch approach in as.sir().

  Same gate pattern as as.sir() (PR #280): requires a non-sequential future::plan() to be active; auto-upgrades to parallel = TRUE when a parallel plan is detected; throws an informative error otherwise.

  https://claude.ai/code/session_01FC43syPbzhGmKgrrVNHjnF

* Fix version to 3.0.1.9055 and update CLAUDE.md version formula

  Uses origin/${defaultbranch} (with a fetch) instead of the local branch ref so the commit count is never stale after a merge.

  https://claude.ai/code/session_01FC43syPbzhGmKgrrVNHjnF

* Fix non-ASCII characters in antibiogram.R

  Replace en/em dashes and non-breaking spaces with ASCII equivalents to satisfy R CMD check portability requirement.

  https://claude.ai/code/session_01FC43syPbzhGmKgrrVNHjnF

* Update auto-generated Rd files after documentation rebuild

  https://claude.ai/code/session_01FC43syPbzhGmKgrrVNHjnF

* Move parallel gate to top of antibiogram.default() like sir.R

  The gate was inside the wisca==TRUE block, so parallel=TRUE with a sequential plan was silently ignored for non-WISCA antibiograms. Now the gate runs unconditionally at the top of the function, identical to the as.sir() pattern: error on explicit parallel=TRUE with sequential plan, auto-upgrade when a non-sequential plan is already active.

  https://claude.ai/code/session_01FC43syPbzhGmKgrrVNHjnF

* Fix parallel WISCA returning all NA; strengthen tests; add sequential hint

  Bug: lapply() over a factor yields length-1 factor elements (integer codes), while for() over a factor yields character strings. The job list stored j$group as a factor integer, but the reassembly loop compared it with identical(j$group, g) where g was character -- always FALSE, so no simulation chunks were ever assembled and coverage stayed NA throughout.

  Fix: convert unique_groups to character before building jobs so both the job list and the reassembly loop use the same type.

  Tests: replaced na.rm = TRUE guards with explicit anyNA() checks so the test suite would have caught the all-NA result immediately.

  Also adds a sequential-mode performance hint (analogous to sir.R lines 1116-1127) when simulations >= 500 and >= 3 regimens.

  https://claude.ai/code/session_01FC43syPbzhGmKgrrVNHjnF

---------

Co-authored-by: Claude <noreply@anthropic.com>
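The type mismatch described in the last bullet can be reproduced in base R. The following is a minimal sketch, not code from the commit; the group labels and the variable names (`groups`, `from_lapply`, `from_for`) are made up for illustration.

```r
# A grouping column stored as a factor (illustrative labels only)
groups <- factor(c("regimen A", "regimen B"))

# lapply() hands each element to the function as a length-1 factor ...
from_lapply <- lapply(unique(groups), function(g) g)

# ... whereas for() coerces a factor to character before looping
from_for <- list()
for (g in unique(groups)) {
  from_for[[length(from_for) + 1L]] <- g
}

class(from_lapply[[1]]) # "factor"
class(from_for[[1]])    # "character"

# identical() is type-strict, so the "same" group never matches
mismatch <- identical(from_lapply[[1]], from_for[[1]]) # FALSE

# Converting to character first gives both code paths the same type
groups_chr <- as.character(unique(groups))
from_lapply_chr <- lapply(groups_chr, function(g) g)
match_after_fix <- identical(from_lapply_chr[[1]], from_for[[1]]) # TRUE
```

This is why the fix converts `unique_groups` with `as.character()` once, before the job list is built, rather than loosening the comparison in the reassembly loop.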
261
R/antibiogram.R
@@ -54,7 +54,7 @@
 #' @param add_total_n *(deprecated in favour of `formatting_type`)* A [logical] to indicate whether `n_tested` available numbers per pathogen should be added to the table (default is `TRUE`). This will add the lowest and highest number of available isolates per antimicrobial (e.g, if for *E. coli* 200 isolates are available for ciprofloxacin and 150 for amoxicillin, the returned number will be "150-200"). This option is unavailable when `wisca = TRUE`; in that case, use [retrieve_wisca_parameters()] to get the parameters used for WISCA.
 #' @param only_all_tested (for combination antibiograms): a [logical] to indicate that isolates must be tested for all antimicrobials, see *Details*.
 #' @param digits Number of digits to use for rounding the antimicrobial coverage, defaults to 1 for WISCA and 0 otherwise.
-#' @param formatting_type Numeric value (1–22 for WISCA, 1-12 for non-WISCA) indicating how the 'cells' of the antibiogram table should be formatted. See *Details* > *Formatting Type* for a list of options.
+#' @param formatting_type Numeric value (1-22 for WISCA, 1-12 for non-WISCA) indicating how the 'cells' of the antibiogram table should be formatted. See *Details* > *Formatting Type* for a list of options.
 #' @param col_mo Column name of the names or codes of the microorganisms (see [as.mo()]) - the default is the first column of class [`mo`]. Values will be coerced using [as.mo()].
 #' @param language Language to translate text, which defaults to the system language (see [get_AMR_locale()]).
 #' @param minimum The minimum allowed number of available (tested) isolates. Any isolate count lower than `minimum` will return `NA` with a warning. The default number of `30` isolates is advised by the Clinical and Laboratory Standards Institute (CLSI) as best practice, see *Source*.
@@ -65,6 +65,7 @@
 #' @param simulations (for WISCA) a numerical value to set the number of Monte Carlo simulations.
 #' @param conf_interval A numerical value to set confidence interval (default is `0.95`).
 #' @param interval_side The side of the confidence interval, either `"two-tailed"` (default), `"left"` or `"right"`.
+#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. Requires the [`future.apply`][future.apply::future_lapply()] package. For WISCA, Monte Carlo simulations are distributed across workers; for grouped antibiograms, each group is processed by a separate worker. **A non-sequential [future::plan()] must already be active before setting `parallel = TRUE`** -- for example, `future::plan(future::multisession)`. An error is thrown if `parallel = TRUE` is used without a plan set by the user.
 #' @param info A [logical] to indicate info should be printed - the default is `TRUE` only in interactive mode.
 #' @param object An [antibiogram()] object.
 #' @param ... When used in [R Markdown or Quarto][knitr::kable()]: arguments passed on to [knitr::kable()] (otherwise, has no use).
@@ -413,6 +414,7 @@ antibiogram <- function(x,
                         conf_interval = 0.95,
                         interval_side = "two-tailed",
                         info = interactive(),
+                        parallel = FALSE,
                         ...) {
   UseMethod("antibiogram")
 }
@@ -439,6 +441,7 @@ antibiogram.default <- function(x,
                                 conf_interval = 0.95,
                                 interval_side = "two-tailed",
                                 info = interactive(),
+                                parallel = FALSE,
                                 ...) {
   meet_criteria(x, allow_class = "data.frame")
   x <- ascertain_sir_classes(x, "x")
@@ -478,6 +481,35 @@ antibiogram.default <- function(x,
   meet_criteria(conf_interval, allow_class = c("numeric", "integer"), has_length = 1, is_finite = TRUE, is_positive = TRUE)
   meet_criteria(interval_side, allow_class = "character", has_length = 1, is_in = c("two-tailed", "left", "right"))
   meet_criteria(info, allow_class = "logical", has_length = 1)
+  meet_criteria(parallel, allow_class = "logical", has_length = 1)
+
+  # parallel gate - identical pattern to as.sir()
+  if (requireNamespace("future.apply", quietly = TRUE) && !inherits(future::plan(), "sequential")) {
+    if (isFALSE(parallel)) {
+      message_("Assuming {.code parallel = TRUE} since parallel computing has been set up using the {.pkg future} package before. Set {.help [{.fun plan}](future::plan)} to sequential to prevent this.")
+    }
+    parallel <- TRUE
+  }
+  if (isTRUE(parallel)) {
+    stop_ifnot(
+      requireNamespace("future.apply", quietly = TRUE),
+      "Setting {.code parallel = TRUE} requires the {.pkg future.apply} package.\n",
+      "Install it with {.code install.packages(\"future.apply\")}."
+    )
+    stop_if(inherits(future::plan(), "sequential"),
+      "Setting {.code parallel = TRUE} requires a non-sequential {.help [{.fun future::plan}](future::plan)} to be active.\n",
+      "For your system, you could first run: {.code library(future); ",
+      ifelse(.Platform$OS.type == "windows" || in_rstudio(),
+        "plan(multisession)",
+        "plan(multicore)"
+      ),
+      "}",
+      call = FALSE
+    )
+    n_workers <- future::nbrOfWorkers()
+  } else {
+    n_workers <- 1L
+  }
 
   # try to find columns based on type
   if (is.null(col_mo)) {
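The decision logic of the parallel gate in the hunk above can be summarised in a small stand-alone function. This is a sketch only: `resolve_parallel()` and its arguments are hypothetical and not part of the AMR package, and the plan state is passed in as plain flags so the example runs without the future package installed.

```r
# Hypothetical helper mirroring the gate's decisions (not AMR code).
# has_future_apply:   would requireNamespace("future.apply") succeed?
# plan_is_sequential: is the active future::plan() the sequential one?
resolve_parallel <- function(parallel, has_future_apply, plan_is_sequential) {
  # auto-upgrade: a non-sequential plan is already active
  if (has_future_apply && !plan_is_sequential) {
    parallel <- TRUE
  }
  if (isTRUE(parallel)) {
    if (!has_future_apply) {
      stop("parallel = TRUE requires the future.apply package", call. = FALSE)
    }
    if (plan_is_sequential) {
      stop("parallel = TRUE requires a non-sequential future::plan()", call. = FALSE)
    }
  }
  parallel
}

# parallel plan active, parallel left at FALSE: upgraded to TRUE
upgraded <- resolve_parallel(FALSE, has_future_apply = TRUE, plan_is_sequential = FALSE)

# sequential plan, parallel left at FALSE: stays sequential
unchanged <- resolve_parallel(FALSE, has_future_apply = TRUE, plan_is_sequential = TRUE)

# sequential plan, explicit parallel = TRUE: error
outcome <- tryCatch(
  resolve_parallel(TRUE, has_future_apply = TRUE, plan_is_sequential = TRUE),
  error = function(e) "error"
)
```

In the diff itself the same three outcomes correspond to the `message_()` auto-upgrade, the `n_workers <- 1L` branch, and the `stop_if()` call.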
@@ -705,52 +737,96 @@ antibiogram.default <- function(x,
 
     wisca_parameters <- out
 
-    progress <- progress_ticker(
-      n = length(unique(wisca_parameters$group)) * simulations,
-      n_min = 25,
-      print = info,
-      title = paste("Calculating WISCA for", length(unique(wisca_parameters$group)), "regimens")
-    )
-    on.exit(close(progress))
-
-    # run WISCA per group
-    for (group in unique(wisca_parameters$group)) {
-      params_current <- wisca_parameters[wisca_parameters$group == group, , drop = FALSE]
-      if (sum(params_current$n_tested, na.rm = TRUE) == 0) {
-        next
-      }
-
-      # prepare priors
-      priors_current <- create_wisca_priors(params_current)
-
-      # Monte Carlo simulations
-      coverage_simulations <- vapply(
-        FUN.VALUE = double(1),
-        seq_len(simulations), function(i) {
-          progress$tick()
-          simulate_coverage(priors_current)
-        }
-      )
-
-      # summarise results
-      coverage_mean <- mean(coverage_simulations)
-
-      if (interval_side == "two-tailed") {
-        probs <- c((1 - conf_interval) / 2, 1 - (1 - conf_interval) / 2)
-      } else if (interval_side == "left") {
-        probs <- c(0, conf_interval)
-      } else if (interval_side == "right") {
-        probs <- c(1 - conf_interval, 1)
-      }
-
-      coverage_ci <- unname(stats::quantile(coverage_simulations, probs = probs))
-
-      out_wisca$coverage[out_wisca$group == group] <- coverage_mean
-      out_wisca$lower_ci[out_wisca$group == group] <- coverage_ci[1]
-      out_wisca$upper_ci[out_wisca$group == group] <- coverage_ci[2]
+    # quantile probabilities are constant across all groups
+    probs <- if (interval_side == "two-tailed") {
+      c((1 - conf_interval) / 2, 1 - (1 - conf_interval) / 2)
+    } else if (interval_side == "left") {
+      c(0, conf_interval)
+    } else {
+      c(1 - conf_interval, 1)
     }
 
-    close(progress)
+    unique_groups <- as.character(unique(wisca_parameters$group))
+
+    use_parallel_wisca <- isTRUE(parallel) && n_workers > 1L && length(unique_groups) > 0L
+
+    if (use_parallel_wisca) {
+      if (isTRUE(info)) {
+        message_("Running WISCA in parallel mode using ", n_workers, " workers...", as_note = FALSE, appendLF = FALSE)
+      }
+      # chunks_per_group gives ~n_workers total jobs so all workers stay busy
+      # even when the number of regimens is smaller than n_workers
+      chunks_per_group <- max(1L, ceiling(n_workers / length(unique_groups)))
+      chunk_sizes <- diff(c(0L, round(seq_len(chunks_per_group) * simulations / chunks_per_group)))
+
+      # precompute priors per group and build (group, chunk) job list
+      jobs <- unlist(lapply(unique_groups, function(g) {
+        params_g <- wisca_parameters[wisca_parameters$group == g, , drop = FALSE]
+        if (sum(params_g$n_tested, na.rm = TRUE) == 0L) return(NULL)
+        priors_g <- create_wisca_priors(params_g)
+        lapply(seq_along(chunk_sizes), function(ch) {
+          list(group = g, priors = priors_g, n_sims = chunk_sizes[ch])
+        })
+      }), recursive = FALSE)
+      jobs <- Filter(Negate(is.null), jobs)
+
+      flat <- future.apply::future_lapply(jobs, function(job) {
+        vapply(FUN.VALUE = double(1), seq_len(job$n_sims), function(i) {
+          simulate_coverage(job$priors)
+        })
+      }, future.seed = TRUE)
+
+      # reassemble per group: concatenate chunks, then summarise
+      for (g in unique_groups) {
+        g_idx <- vapply(jobs, function(j) identical(j$group, g), logical(1))
+        if (!any(g_idx)) next
+        sims <- unlist(flat[g_idx], use.names = FALSE)
+        out_wisca$coverage[out_wisca$group == g] <- mean(sims)
+        ci_vals <- unname(stats::quantile(sims, probs = probs))
+        out_wisca$lower_ci[out_wisca$group == g] <- ci_vals[1]
+        out_wisca$upper_ci[out_wisca$group == g] <- ci_vals[2]
+      }
+
+      if (isTRUE(info)) message_(font_green_bg(" DONE "), as_note = FALSE)
+
+    } else {
+      progress <- progress_ticker(
+        n = length(unique_groups) * simulations,
+        n_min = 25,
+        print = info,
+        title = paste("Calculating WISCA for", length(unique_groups), "regimens")
+      )
+      on.exit(close(progress), add = TRUE)
+
+      for (group in unique_groups) {
+        params_current <- wisca_parameters[wisca_parameters$group == group, , drop = FALSE]
+        if (sum(params_current$n_tested, na.rm = TRUE) == 0) next
+        priors_current <- create_wisca_priors(params_current)
+        coverage_simulations <- vapply(
+          FUN.VALUE = double(1),
+          seq_len(simulations), function(i) {
+            progress$tick()
+            simulate_coverage(priors_current)
+          }
+        )
+        out_wisca$coverage[out_wisca$group == group] <- mean(coverage_simulations)
+        ci_vals <- unname(stats::quantile(coverage_simulations, probs = probs))
+        out_wisca$lower_ci[out_wisca$group == group] <- ci_vals[1]
+        out_wisca$upper_ci[out_wisca$group == group] <- ci_vals[2]
+      }
+      close(progress)
+      if (isTRUE(info) && simulations >= 500 && length(unique_groups) >= 3) {
+        suggest <- ifelse(.Platform$OS.type == "windows" || in_rstudio(),
+          "plan(multisession)",
+          "plan(multicore)"
+        )
+        if (requireNamespace("future.apply", quietly = TRUE)) {
+          message_("Running in sequential mode. To speed up WISCA, set a parallel {.help [{.fun future::plan}](future::plan)} such as {.code ", suggest, "} and use {.code parallel = TRUE}.")
+        } else {
+          message_("Running in sequential mode. To speed up WISCA, install the {.pkg future.apply} package and then set {.code parallel = TRUE}.")
+        }
+      }
+    }
 
     # final output preparation
     out <- out_wisca
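The chunking arithmetic in the hunk above is easiest to follow with concrete numbers. The sketch below reuses the two expressions from the diff; the values for `simulations`, `n_workers` and `n_groups` are assumptions chosen for illustration, and `n_groups` stands in for `length(unique_groups)`.

```r
# Assumed inputs, for illustration only
simulations <- 1000L
n_workers <- 8L
n_groups <- 3L # stands in for length(unique_groups)

# Same expressions as in the diff
chunks_per_group <- max(1L, ceiling(n_workers / n_groups))
chunk_sizes <- diff(c(0L, round(seq_len(chunks_per_group) * simulations / chunks_per_group)))

chunks_per_group # 3
chunk_sizes      # 333 334 333

# Every group still gets exactly `simulations` draws in total,
# and the job count is at least the number of workers
total_per_group <- sum(chunk_sizes)
total_jobs <- n_groups * chunks_per_group # 9
```

Taking `diff()` of the rounded cumulative boundaries, rather than rounding each chunk separately, is what keeps the chunk sizes summing to `simulations` even when it is not divisible by `chunks_per_group`.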
@@ -997,30 +1073,50 @@ antibiogram.grouped_df <- function(x,
                                    conf_interval = 0.95,
                                    interval_side = "two-tailed",
                                    info = interactive(),
+                                   parallel = FALSE,
                                    ...) {
   stop_ifnot(is.null(mo_transform), "{.arg mo_transform} must not be set if creating an antibiogram using a grouped tibble. The groups will become the variables over which the antimicrobials are calculated, which could include the pathogen information (though not necessary). Nonetheless, this makes {.arg mo_transform} redundant.", call = FALSE)
   stop_ifnot(is.null(syndromic_group), "{.arg syndromic_group} must not be set if creating an antibiogram using a grouped tibble. The groups will become the variables over which the antimicrobials are calculated, making {.arg syndromic_group} redundant.", call = FALSE)
+  meet_criteria(parallel, allow_class = "logical", has_length = 1)
+
   groups <- attributes(x)$groups
   n_groups <- NROW(groups)
-  progress <- progress_ticker(
-    n = n_groups,
-    n_min = 5,
-    print = info,
-    title = paste("Calculating AMR for", n_groups, "groups")
-  )
-  on.exit(close(progress))
 
-  out <- NULL
-  wisca_parameters <- NULL
-  long_numeric <- NULL
-
-  for (i in seq_len(n_groups)) {
-    progress$tick()
-    rows <- unlist(groups[i, ]$.rows)
-    if (length(rows) == 0) {
-      next
+  # parallel gate - identical pattern to as.sir()
+  if (requireNamespace("future.apply", quietly = TRUE) && !inherits(future::plan(), "sequential")) {
+    if (isFALSE(parallel)) {
+      message_("Assuming {.code parallel = TRUE} since parallel computing has been set up using the {.pkg future} package before. Set {.help [{.fun plan}](future::plan)} to sequential to prevent this.")
     }
-    new_out <- antibiogram(as.data.frame(x)[rows, , drop = FALSE],
+    parallel <- TRUE
+  }
+  if (isTRUE(parallel)) {
+    stop_ifnot(
+      requireNamespace("future.apply", quietly = TRUE),
+      "Setting {.code parallel = TRUE} requires the {.pkg future.apply} package.\n",
+      "Install it with {.code install.packages(\"future.apply\")}."
+    )
+    stop_if(inherits(future::plan(), "sequential"),
+      "Setting {.code parallel = TRUE} requires a non-sequential {.help [{.fun future::plan}](future::plan)} to be active.\n",
+      "For your system, you could first run: {.code library(future); ",
+      ifelse(.Platform$OS.type == "windows" || in_rstudio(),
+        "plan(multisession)",
+        "plan(multicore)"
+      ),
+      "}",
+      call = FALSE
+    )
+    n_workers <- future::nbrOfWorkers()
+  } else {
+    n_workers <- 1L
+  }
+
+  use_parallel <- isTRUE(parallel) && n_workers > 1L && n_groups > 1L
+
+  x_df <- as.data.frame(x)
+  run_group <- function(i) {
+    rows <- unlist(groups[i, ]$.rows)
+    if (length(rows) == 0L) return(NULL)
+    antibiogram(x_df[rows, , drop = FALSE],
       antimicrobials = antimicrobials,
       mo_transform = NULL,
       ab_transform = ab_transform,
@@ -1040,12 +1136,42 @@ antibiogram.grouped_df <- function(x,
       conf_interval = conf_interval,
       interval_side = interval_side,
       info = FALSE,
-      ...
+      parallel = FALSE # never nest parallelism in workers
     )
+  }
+
+  if (use_parallel) {
+    if (isTRUE(info)) {
+      message_("Running antibiogram for ", n_groups, " groups in parallel using ", n_workers, " workers...", as_note = FALSE, appendLF = FALSE)
+    }
+    results_raw <- future.apply::future_lapply(seq_len(n_groups), run_group, future.seed = TRUE)
+    if (isTRUE(info)) message_(font_green_bg(" DONE "), as_note = FALSE)
+  } else {
+    progress <- progress_ticker(
+      n = n_groups,
+      n_min = 5,
+      print = info,
+      title = paste("Calculating AMR for", n_groups, "groups")
+    )
+    on.exit(close(progress), add = TRUE)
+    results_raw <- vector("list", n_groups)
+    for (i in seq_len(n_groups)) {
+      progress$tick()
+      results_raw[[i]] <- run_group(i)
+    }
+    close(progress)
+  }
+
+  out <- NULL
+  wisca_parameters <- NULL
+  long_numeric <- NULL
+
+  for (i in seq_len(n_groups)) {
+    new_out <- results_raw[[i]]
     new_wisca_parameters <- attributes(new_out)$wisca_parameters
     new_long_numeric <- attributes(new_out)$long_numeric
 
-    if (NROW(new_out) == 0) {
+    if (is.null(new_out) || NROW(new_out) == 0) {
       next
     }
 
@@ -1071,8 +1197,7 @@ antibiogram.grouped_df <- function(x,
       new_long_numeric <- new_long_numeric[, c(col_name, setdiff(names(new_long_numeric), col_name))] # set place to 1st col
     }
 
-    if (i == 1) {
-      # the first go
+    if (is.null(out)) {
       out <- new_out
       wisca_parameters <- new_wisca_parameters
       long_numeric <- new_long_numeric
@@ -1083,8 +1208,6 @@ antibiogram.grouped_df <- function(x,
     }
   }
 
-  close(progress)
-
   out <- structure(as_original_data_class(out, class(x), extra_class = "antibiogram"),
     has_syndromic_group = FALSE,
     combine_SI = isTRUE(combine_SI),
@@ -1116,6 +1239,7 @@ wisca <- function(x,
                   conf_interval = 0.95,
                   interval_side = "two-tailed",
                   info = interactive(),
+                  parallel = FALSE,
                   ...) {
   antibiogram(
     x = x,
@@ -1137,6 +1261,7 @@ wisca <- function(x,
     conf_interval = conf_interval,
     interval_side = interval_side,
     info = info,
+    parallel = parallel,
     ...
   )
 }