1
0
mirror of https://github.com/msberends/AMR.git synced 2026-04-28 10:23:53 +02:00

Migrate parallel computing in as.sir() from parallel:: to future/future.apply

Replace parallel::mclapply() and parallel::parLapply() with
future.apply::future_lapply(), enabling transparent support for any
future backend (multisession, multicore, mirai_multisession, cluster)
on all platforms including Windows.

When parallel = TRUE the function now: (1) respects an active
future::plan() set by the user without overriding it on exit, or
(2) sets a temporary multisession plan with parallelly::availableCores()
and tears it down on exit. The max_cores argument controls worker count
only when no user plan is active.

future and future.apply are added to Suggests in DESCRIPTION.

https://claude.ai/code/session_01M1Jvf2Miu6JL4TQrEh1wS8
This commit is contained in:
Claude
2026-04-27 12:21:48 +00:00
parent 3f1b20c304
commit b1cf7a94ad
10 changed files with 80 additions and 101 deletions

120
R/sir.R
View File

@@ -95,7 +95,7 @@ VALID_SIR_LEVELS <- c("S", "SDD", "I", "R", "NI", "WT", "NWT", "NS")
#' # for veterinary breakpoints, also set `host`:
#' your_data %>% mutate_if(is.mic, as.sir, host = "column_with_animal_species", guideline = "CLSI")
#'
#' # fast processing with parallel computing:
#' # fast processing with parallel computing (requires future.apply):
#' as.sir(your_data, ..., parallel = TRUE)
#' ```
#' * Operators like "<=" will be considered according to the `capped_mic_handling` setting. At default, an MIC value of e.g. ">2" will return "NI" (non-interpretable) if the breakpoint is 4-8; the *true* MIC could be at either side of the breakpoint. This is to prevent that capped values from raw laboratory data would not be treated conservatively.
@@ -112,7 +112,7 @@ VALID_SIR_LEVELS <- c("S", "SDD", "I", "R", "NI", "WT", "NWT", "NS")
#' # for veterinary breakpoints, also set `host`:
#' your_data %>% mutate_if(is.disk, as.sir, host = "column_with_animal_species", guideline = "CLSI")
#'
#' # fast processing with parallel computing:
#' # fast processing with parallel computing (requires future.apply):
#' as.sir(your_data, ..., parallel = TRUE)
#' ```
#'
@@ -220,7 +220,8 @@ VALID_SIR_LEVELS <- c("S", "SDD", "I", "R", "NI", "WT", "NWT", "NS")
#' sir_interpretation_history()
#'
#' \donttest{
#' # using parallel computing, which is available in base R:
#' # using parallel computing (requires the future.apply package):
#' # future::plan(future::multisession) # optional: set your own plan first
#' as.sir(df_wide, parallel = TRUE, info = TRUE)
#'
#'
@@ -716,8 +717,8 @@ as.sir.disk <- function(x,
}
#' @rdname as.sir
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. The `parallel` package is part of base \R and no additional packages are required. On Unix/macOS with \R >= 4.0.0, [parallel::mclapply()] (fork-based) is used; on Windows and \R < 4.0.0, [parallel::parLapply()] with a PSOCK cluster is used (requires the AMR package to be installed, not just loaded via `devtools::load_all()`). Parallelism distributes columns across cores; it is most beneficial when there are many antibiotic columns and a large number of rows.
#' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise.
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. Requires the [`future.apply`][future.apply::future_lapply()] package. If a non-sequential [future::plan()] is already active (e.g., set by the user via `future::plan(future::multisession)`), that plan is respected and left unchanged after the call. Otherwise, a temporary `multisession` plan is set automatically and torn down on exit. Parallelism distributes columns across workers; it is most beneficial when there are many antibiotic columns and a large number of rows.
#' @param max_cores Maximum number of workers to use if `parallel = TRUE` and no [future::plan()] has been set by the user. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means at most 6 workers will be used. Defaults to `-1`. There will never be used more workers than variables to analyse. The number of available cores is detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise. This argument is ignored when the user has already set a [future::plan()].
#' @export
as.sir.data.frame <- function(x,
...,
@@ -913,34 +914,26 @@ as.sir.data.frame <- function(x,
# set up parallel computing
n_cores <- get_n_cores(max_cores = max_cores)
n_cores <- min(n_cores, length(ab_cols)) # never more cores than variables required
if (isTRUE(parallel) && (.Platform$OS.type == "windows" || getRversion() < "4.0.0")) {
cl <- tryCatch(parallel::makeCluster(n_cores, type = "PSOCK"),
error = function(e) {
if (isTRUE(info)) {
message_("Could not create parallel cluster, using single-core computation. Error message: ", conditionMessage(e))
}
return(NULL)
}
)
if (!is.null(cl)) {
# Each PSOCK worker is a fresh R session — the AMR package must be loaded there
# so all exported functions (as.sir, as.mic, as.disk, ...) are available.
amr_loaded_on_workers <- tryCatch({
parallel::clusterEvalQ(cl, library(AMR, quietly = TRUE))
TRUE
}, error = function(e) FALSE)
if (!amr_loaded_on_workers) {
if (isTRUE(info)) {
message_("Could not load AMR on parallel workers (package may not be installed); falling back to single-core computation.")
}
parallel::stopCluster(cl)
cl <- NULL
}
}
if (is.null(cl)) {
n_cores <- 1
plan_was_set_by_us <- FALSE
if (isTRUE(parallel)) {
if (!requireNamespace("future.apply", quietly = TRUE)) {
warning(
"parallel = TRUE requires the 'future.apply' package. ",
"Install it with: install.packages(\"future.apply\"). ",
"Falling back to sequential processing.",
call. = FALSE
)
parallel <- FALSE
} else if (!inherits(future::plan(), "sequential")) {
# honour the user's active plan; update worker count accordingly
n_cores <- min(future::nbrOfWorkers(), length(ab_cols))
} else {
# no active plan: set multisession with our computed worker count
future::plan(future::multisession, workers = n_cores)
plan_was_set_by_us <- TRUE
}
}
on.exit(if (plan_was_set_by_us) future::plan(future::sequential), add = TRUE)
if (isTRUE(info)) {
message_(as_note = FALSE) # empty line
@@ -952,31 +945,23 @@ as.sir.data.frame <- function(x,
is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1
effective_info <- if (is_parallel_run) FALSE else info
# Row-batch mode: when n_cols < n_cores we would leave cores idle under plain
# column-parallel dispatch. Instead we split rows into pieces so every core
# gets work. pieces_per_col = ceil(n_cores / n_cols) gives ~n_cores jobs
# Row-batch mode: when n_cols < n_workers we would leave workers idle under plain
# column-parallel dispatch. Instead we split rows into pieces so every worker
# gets work. pieces_per_col = ceil(n_workers / n_cols) gives ~n_workers jobs
# total; each job processes one column on one row slice, which also reduces
# per-worker memory pressure (smaller breakpoints search space).
# Only used for the fork path (R >= 4.0, non-Windows); PSOCK clusters already
# incur high per-job serialisation overhead so we keep column-mode there.
use_fork <- is_parallel_run &&
!(.Platform$OS.type == "windows" || getRversion() < "4.0.0")
pieces_per_col <- if (use_fork && length(ab_cols) < n_cores) {
pieces_per_col <- if (is_parallel_run && length(ab_cols) < n_cores) {
ceiling(n_cores / length(ab_cols))
} else {
1L
}
run_as_sir_column <- function(i, rows = NULL) {
# Always resolve AMR_env from the package namespace. This is essential for
# PSOCK workers (where the closure-captured AMR_env is a stale serialised copy
# while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing
# pre-existing log entries from earlier in the session when forking.
# Always resolve AMR_env from the package namespace so workers get the live
# environment rather than a stale serialised copy from the closure.
.amr_env <- get("AMR_env", envir = asNamespace("AMR"), inherits = FALSE)
# In parallel mode each worker (fork or PSOCK) has its own copy of the
# history; record the current length so we capture only the new rows added
# by the as.sir() call below, not any pre-existing entries inherited at fork
# time or carried over from earlier as.sir() calls.
# In parallel mode each worker has its own copy of the history; record the
# current length so we capture only the rows added by this as.sir() call.
if (is_parallel_run) pre_log_n <- NROW(.amr_env$sir_interpretation_history)
ab_col <- ab_cols[i]
@@ -1090,31 +1075,17 @@ as.sir.data.frame <- function(x,
return(out)
}
if (isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1) {
if (is_parallel_run) {
if (isTRUE(info)) {
message_(as_note = FALSE)
if (pieces_per_col > 1L) {
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE)
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " workers, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE)
} else {
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), "...", as_note = FALSE, appendLF = FALSE)
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " workers, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), "...", as_note = FALSE, appendLF = FALSE)
}
}
if (.Platform$OS.type == "windows" || getRversion() < "4.0.0") {
# PSOCK cluster: column-mode only (row-batch serialisation overhead not worth it)
on.exit(parallel::stopCluster(cl), add = TRUE)
parallel::clusterExport(cl, varlist = c(
"x", "x.bak", "x_mo", "ab_cols", "types",
"capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance",
"reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD",
"breakpoint_type", "guideline", "host", "uti", "verbose",
"col_mo", "conserve_capped_values",
"effective_info", "is_parallel_run",
"run_as_sir_column"
), envir = environment())
result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column)
} else if (pieces_per_col > 1L) {
# Row-batch mode (R >= 4.0, non-Windows, n_cols < n_cores):
# build (col, row_slice) job pairs so all cores stay active
if (pieces_per_col > 1L) {
# Row-batch mode: build (col, row_slice) job pairs so all workers stay active
row_cuts <- unique(round(seq(0, nrow(x), length.out = pieces_per_col + 1L)))
row_ranges <- lapply(seq_len(length(row_cuts) - 1L), function(p) {
seq.int(row_cuts[p] + 1L, row_cuts[p + 1L])
@@ -1122,9 +1093,9 @@ as.sir.data.frame <- function(x,
jobs <- do.call(c, lapply(seq_along(ab_cols), function(ci) {
lapply(seq_along(row_ranges), function(p) list(col = ci, rows = row_ranges[[p]]))
}))
flat <- parallel::mclapply(jobs, function(job) {
flat <- future.apply::future_lapply(jobs, function(job) {
run_as_sir_column(job$col, job$rows)
}, mc.cores = n_cores)
}, future.seed = TRUE)
# Reassemble: for each column concatenate row pieces in order
result_list <- lapply(seq_along(ab_cols), function(ci) {
pieces <- flat[vapply(jobs, function(j) j$col == ci, logical(1L))]
@@ -1137,8 +1108,8 @@ as.sir.data.frame <- function(x,
)
})
} else {
# Column-parallel mode (R >= 4.0, non-Windows, n_cols >= n_cores)
result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores)
# Column-parallel mode: one job per antibiotic column
result_list <- future.apply::future_lapply(seq_along(ab_cols), run_as_sir_column, future.seed = TRUE)
}
if (isTRUE(info)) {
message_(font_green_bg("\u00a0DONE\u00a0"), as_note = FALSE)
@@ -1147,10 +1118,13 @@ as.sir.data.frame <- function(x,
}
} else {
# sequential mode (non-parallel)
if (isTRUE(info) && n_cores > 1 && NROW(x) * NCOL(x) > 10000) {
# give a note that parallel mode might be better
if (isTRUE(info) && get_n_cores(Inf) > 1 && NROW(x) * NCOL(x) > 10000) {
message_(as_note = FALSE)
message_("Running in sequential mode. Consider setting {.arg parallel} to {.code TRUE} to speed up processing on multiple cores.\n")
if (requireNamespace("future.apply", quietly = TRUE)) {
message_("Running in sequential mode. Consider setting {.arg parallel} to {.code TRUE} to speed up processing on multiple workers.\n")
} else {
message_("Running in sequential mode. Install the {.pkg future.apply} package and set {.arg parallel} to {.code TRUE} to speed up processing on multiple workers.\n")
}
}
# this will contain a progress bar already
result_list <- lapply(seq_along(ab_cols), run_as_sir_column)