mirror of
https://github.com/msberends/AMR.git
synced 2026-05-14 04:30:53 +02:00
Migrate parallel computing in as.sir() from parallel:: to future/future.apply (#280)
* Migrate parallel computing in as.sir() from parallel:: to future/future.apply Replace parallel::mclapply() and parallel::parLapply() with future.apply::future_lapply(), enabling transparent support for any future backend (multisession, multicore, mirai_multisession, cluster) on all platforms including Windows. When parallel = TRUE the function now: (1) respects an active future::plan() set by the user without overriding it on exit, or (2) sets a temporary multisession plan with parallelly::availableCores() and tears it down on exit. The max_cores argument controls worker count only when no user plan is active. future and future.apply are added to Suggests in DESCRIPTION. https://claude.ai/code/session_01M1Jvf2Miu6JL4TQrEh1wS8 * Require user plan() for parallel=TRUE; fix as_wt_nwt false-positive warnings - parallel = TRUE now errors with a cli-styled message if no non-sequential future::plan() is active; users must call e.g. future::plan(future::multisession) before using parallel = TRUE (breaking change) - Removed auto-setup/teardown of multisession plan inside as.sir(), which was slow and caused version-mismatch issues with load_all() workflows - Added as_wt_nwt to the exclusion list in as_sir_method() to suppress false-positive "no longer used" warnings during parallel runs - Fixed pieces_per_col row-batch calculation to use n_workers (total available workers from the active plan) instead of n_cores (workers clipped to n_cols), so row-batch mode activates correctly when n_cols < n_workers - Updated @param parallel and @param max_cores roxygen docs; regenerated man/as.sir.Rd - Updated sequential-mode hint to instruct users to set plan() first https://claude.ai/code/session_01M1Jvf2Miu6JL4TQrEh1wS8 * fix parallel * fix parallel * unit tests * unit tedts --------- Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
149
R/sir.R
149
R/sir.R
@@ -95,7 +95,7 @@ VALID_SIR_LEVELS <- c("S", "SDD", "I", "R", "NI", "WT", "NWT", "NS")
|
||||
#' # for veterinary breakpoints, also set `host`:
|
||||
#' your_data %>% mutate_if(is.mic, as.sir, host = "column_with_animal_species", guideline = "CLSI")
|
||||
#'
|
||||
#' # fast processing with parallel computing:
|
||||
#' # fast processing with parallel computing (requires future.apply):
|
||||
#' as.sir(your_data, ..., parallel = TRUE)
|
||||
#' ```
|
||||
#' * Operators like "<=" will be considered according to the `capped_mic_handling` setting. At default, an MIC value of e.g. ">2" will return "NI" (non-interpretable) if the breakpoint is 4-8; the *true* MIC could be at either side of the breakpoint. This is to prevent that capped values from raw laboratory data would not be treated conservatively.
|
||||
@@ -112,7 +112,7 @@ VALID_SIR_LEVELS <- c("S", "SDD", "I", "R", "NI", "WT", "NWT", "NS")
|
||||
#' # for veterinary breakpoints, also set `host`:
|
||||
#' your_data %>% mutate_if(is.disk, as.sir, host = "column_with_animal_species", guideline = "CLSI")
|
||||
#'
|
||||
#' # fast processing with parallel computing:
|
||||
#' # fast processing with parallel computing (requires future.apply):
|
||||
#' as.sir(your_data, ..., parallel = TRUE)
|
||||
#' ```
|
||||
#'
|
||||
@@ -220,9 +220,6 @@ VALID_SIR_LEVELS <- c("S", "SDD", "I", "R", "NI", "WT", "NWT", "NS")
|
||||
#' sir_interpretation_history()
|
||||
#'
|
||||
#' \donttest{
|
||||
#' # using parallel computing, which is available in base R:
|
||||
#' as.sir(df_wide, parallel = TRUE, info = TRUE)
|
||||
#'
|
||||
#'
|
||||
#' ## Using dplyr -------------------------------------------------
|
||||
#' if (require("dplyr")) {
|
||||
@@ -716,8 +713,7 @@ as.sir.disk <- function(x,
|
||||
}
|
||||
|
||||
#' @rdname as.sir
|
||||
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. The `parallel` package is part of base \R and no additional packages are required. On Unix/macOS with \R >= 4.0.0, [parallel::mclapply()] (fork-based) is used; on Windows and \R < 4.0.0, [parallel::parLapply()] with a PSOCK cluster is used (requires the AMR package to be installed, not just loaded via `devtools::load_all()`). Parallelism distributes columns across cores; it is most beneficial when there are many antibiotic columns and a large number of rows.
|
||||
#' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise.
|
||||
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. Requires the [`future.apply`][future.apply::future_lapply()] package. **A non-sequential [future::plan()] must already be active before setting `parallel = TRUE`** — for example, `future::plan(future::multisession)`. An error is thrown if `parallel = TRUE` is used without a plan set by the user. Parallelism distributes columns (and optionally row batches) across workers; it is most beneficial when there are many antibiotic columns and a large number of rows.
|
||||
#' @export
|
||||
as.sir.data.frame <- function(x,
|
||||
...,
|
||||
@@ -737,7 +733,6 @@ as.sir.data.frame <- function(x,
|
||||
verbose = FALSE,
|
||||
info = interactive(),
|
||||
parallel = FALSE,
|
||||
max_cores = -1,
|
||||
conserve_capped_values = NULL) {
|
||||
meet_criteria(x, allow_class = "data.frame") # will also check for dimensions > 0
|
||||
meet_criteria(col_mo, allow_class = "character", is_in = colnames(x), allow_NULL = TRUE)
|
||||
@@ -756,7 +751,6 @@ as.sir.data.frame <- function(x,
|
||||
meet_criteria(verbose, allow_class = "logical", has_length = 1)
|
||||
meet_criteria(info, allow_class = "logical", has_length = 1)
|
||||
meet_criteria(parallel, allow_class = "logical", has_length = 1)
|
||||
meet_criteria(max_cores, allow_class = c("numeric", "integer"), has_length = 1)
|
||||
x.bak <- x
|
||||
|
||||
if (isTRUE(info) && message_not_thrown_before("as.sir", "sir_interpretation_history")) {
|
||||
@@ -911,40 +905,34 @@ as.sir.data.frame <- function(x,
|
||||
}
|
||||
|
||||
# set up parallel computing
|
||||
n_cores <- get_n_cores(max_cores = max_cores)
|
||||
n_cores <- min(n_cores, length(ab_cols)) # never more cores than variables required
|
||||
if (isTRUE(parallel) && (.Platform$OS.type == "windows" || getRversion() < "4.0.0")) {
|
||||
cl <- tryCatch(parallel::makeCluster(n_cores, type = "PSOCK"),
|
||||
error = function(e) {
|
||||
if (isTRUE(info)) {
|
||||
message_("Could not create parallel cluster, using single-core computation. Error message: ", conditionMessage(e))
|
||||
}
|
||||
return(NULL)
|
||||
}
|
||||
)
|
||||
if (!is.null(cl)) {
|
||||
# Each PSOCK worker is a fresh R session — the AMR package must be loaded there
|
||||
# so all exported functions (as.sir, as.mic, as.disk, ...) are available.
|
||||
amr_loaded_on_workers <- tryCatch({
|
||||
parallel::clusterEvalQ(cl, library(AMR, quietly = TRUE))
|
||||
TRUE
|
||||
}, error = function(e) FALSE)
|
||||
if (!amr_loaded_on_workers) {
|
||||
if (isTRUE(info)) {
|
||||
message_("Could not load AMR on parallel workers (package may not be installed); falling back to single-core computation.")
|
||||
}
|
||||
parallel::stopCluster(cl)
|
||||
cl <- NULL
|
||||
}
|
||||
}
|
||||
if (is.null(cl)) {
|
||||
n_cores <- 1
|
||||
if (requireNamespace("future.apply", quietly = TRUE) && !inherits(future::plan(), "sequential")) {
|
||||
if (isFALSE(parallel)) {
|
||||
message_("Assuming {.code parallel = TRUE} since parallel computing has been set up using the {.pkg future} package before. Set {.help [{.fun plan}](future::plan)} to sequential to prevent this.")
|
||||
}
|
||||
parallel <- TRUE
|
||||
}
|
||||
if (isTRUE(parallel)) {
|
||||
stop_ifnot(
|
||||
requireNamespace("future.apply", quietly = TRUE),
|
||||
"Setting {.code parallel = TRUE} requires the {.pkg future.apply} package.\n",
|
||||
"Install it with {.code install.packages(\"future.apply\")}."
|
||||
)
|
||||
stop_if(inherits(future::plan(), "sequential"),
|
||||
"Setting {.code parallel = TRUE} requires a non-sequential {.help [{.fun future::plan}](future::plan)} to be active.\n",
|
||||
"For your system, you could first run: {.code library(future); ",
|
||||
ifelse(.Platform$OS.type == "windows" || in_rstudio(),
|
||||
"plan(multisession)",
|
||||
"plan(multicore)"
|
||||
),
|
||||
"}",
|
||||
call = FALSE
|
||||
)
|
||||
|
||||
if (isTRUE(info)) {
|
||||
message_(as_note = FALSE) # empty line
|
||||
message_("Processing columns:", as_note = FALSE)
|
||||
n_workers <- future::nbrOfWorkers()
|
||||
n_cores <- min(n_workers, length(ab_cols))
|
||||
} else {
|
||||
n_workers <- 1L
|
||||
n_cores <- 1L
|
||||
}
|
||||
|
||||
# In parallel mode suppress per-column messages: workers print simultaneously and
|
||||
@@ -952,31 +940,23 @@ as.sir.data.frame <- function(x,
|
||||
is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1
|
||||
effective_info <- if (is_parallel_run) FALSE else info
|
||||
|
||||
# Row-batch mode: when n_cols < n_cores we would leave cores idle under plain
|
||||
# column-parallel dispatch. Instead we split rows into pieces so every core
|
||||
# gets work. pieces_per_col = ceil(n_cores / n_cols) gives ~n_cores jobs
|
||||
# Row-batch mode: when n_cols < n_workers we would leave workers idle under plain
|
||||
# column-parallel dispatch. Instead we split rows into pieces so every worker
|
||||
# gets work. pieces_per_col = ceil(n_workers / n_cols) gives ~n_workers jobs
|
||||
# total; each job processes one column on one row slice, which also reduces
|
||||
# per-worker memory pressure (smaller breakpoints search space).
|
||||
# Only used for the fork path (R >= 4.0, non-Windows); PSOCK clusters already
|
||||
# incur high per-job serialisation overhead so we keep column-mode there.
|
||||
use_fork <- is_parallel_run &&
|
||||
!(.Platform$OS.type == "windows" || getRversion() < "4.0.0")
|
||||
pieces_per_col <- if (use_fork && length(ab_cols) < n_cores) {
|
||||
ceiling(n_cores / length(ab_cols))
|
||||
if (is_parallel_run && length(ab_cols) < n_workers) {
|
||||
pieces_per_col <- ceiling(n_workers / length(ab_cols))
|
||||
} else {
|
||||
1L
|
||||
pieces_per_col <- 1L
|
||||
}
|
||||
|
||||
run_as_sir_column <- function(i, rows = NULL) {
|
||||
# Always resolve AMR_env from the package namespace. This is essential for
|
||||
# PSOCK workers (where the closure-captured AMR_env is a stale serialised copy
|
||||
# while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing
|
||||
# pre-existing log entries from earlier in the session when forking.
|
||||
# Always resolve AMR_env from the package namespace so workers get the live
|
||||
# environment rather than a stale serialised copy from the closure.
|
||||
.amr_env <- get("AMR_env", envir = asNamespace("AMR"), inherits = FALSE)
|
||||
# In parallel mode each worker (fork or PSOCK) has its own copy of the
|
||||
# history; record the current length so we capture only the new rows added
|
||||
# by the as.sir() call below, not any pre-existing entries inherited at fork
|
||||
# time or carried over from earlier as.sir() calls.
|
||||
# In parallel mode each worker has its own copy of the history; record the
|
||||
# current length so we capture only the rows added by this as.sir() call.
|
||||
if (is_parallel_run) pre_log_n <- NROW(.amr_env$sir_interpretation_history)
|
||||
|
||||
ab_col <- ab_cols[i]
|
||||
@@ -1057,7 +1037,7 @@ as.sir.data.frame <- function(x,
|
||||
ab <- ab_col
|
||||
ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE))
|
||||
show_message <- FALSE
|
||||
if (!all(x[row_idx, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) {
|
||||
if (!all(x[row_idx, ab, drop = TRUE] %in% c(VALID_SIR_LEVELS, NA), na.rm = TRUE)) {
|
||||
show_message <- TRUE
|
||||
if (isTRUE(effective_info)) {
|
||||
message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (",
|
||||
@@ -1090,31 +1070,17 @@ as.sir.data.frame <- function(x,
|
||||
return(out)
|
||||
}
|
||||
|
||||
if (isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1) {
|
||||
if (is_parallel_run) {
|
||||
if (isTRUE(info)) {
|
||||
message_(as_note = FALSE)
|
||||
if (pieces_per_col > 1L) {
|
||||
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE)
|
||||
message_("Running in parallel mode using ", n_cores, " workers, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE)
|
||||
} else {
|
||||
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), "...", as_note = FALSE, appendLF = FALSE)
|
||||
message_("Running in parallel mode using ", n_cores, " workers, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), "...", as_note = FALSE, appendLF = FALSE)
|
||||
}
|
||||
}
|
||||
if (.Platform$OS.type == "windows" || getRversion() < "4.0.0") {
|
||||
# PSOCK cluster: column-mode only (row-batch serialisation overhead not worth it)
|
||||
on.exit(parallel::stopCluster(cl), add = TRUE)
|
||||
parallel::clusterExport(cl, varlist = c(
|
||||
"x", "x.bak", "x_mo", "ab_cols", "types",
|
||||
"capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance",
|
||||
"reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD",
|
||||
"breakpoint_type", "guideline", "host", "uti", "verbose",
|
||||
"col_mo", "conserve_capped_values",
|
||||
"effective_info", "is_parallel_run",
|
||||
"run_as_sir_column"
|
||||
), envir = environment())
|
||||
result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column)
|
||||
} else if (pieces_per_col > 1L) {
|
||||
# Row-batch mode (R >= 4.0, non-Windows, n_cols < n_cores):
|
||||
# build (col, row_slice) job pairs so all cores stay active
|
||||
if (pieces_per_col > 1L) {
|
||||
# Row-batch mode: build (col, row_slice) job pairs so all workers stay active
|
||||
row_cuts <- unique(round(seq(0, nrow(x), length.out = pieces_per_col + 1L)))
|
||||
row_ranges <- lapply(seq_len(length(row_cuts) - 1L), function(p) {
|
||||
seq.int(row_cuts[p] + 1L, row_cuts[p + 1L])
|
||||
@@ -1122,23 +1088,23 @@ as.sir.data.frame <- function(x,
|
||||
jobs <- do.call(c, lapply(seq_along(ab_cols), function(ci) {
|
||||
lapply(seq_along(row_ranges), function(p) list(col = ci, rows = row_ranges[[p]]))
|
||||
}))
|
||||
flat <- parallel::mclapply(jobs, function(job) {
|
||||
flat <- future.apply::future_lapply(jobs, function(job) {
|
||||
run_as_sir_column(job$col, job$rows)
|
||||
}, mc.cores = n_cores)
|
||||
}, future.seed = TRUE)
|
||||
# Reassemble: for each column concatenate row pieces in order
|
||||
result_list <- lapply(seq_along(ab_cols), function(ci) {
|
||||
pieces <- flat[vapply(jobs, function(j) j$col == ci, logical(1L))]
|
||||
list(
|
||||
result = as.sir(do.call(c, lapply(pieces, function(p) as.character(p$result)))),
|
||||
log = {
|
||||
log = {
|
||||
logs <- Filter(Negate(is.null), lapply(pieces, function(p) p$log))
|
||||
if (length(logs) > 0L) do.call(rbind_AMR, logs) else NULL
|
||||
}
|
||||
)
|
||||
})
|
||||
} else {
|
||||
# Column-parallel mode (R >= 4.0, non-Windows, n_cols >= n_cores)
|
||||
result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores)
|
||||
# Column-parallel mode: one job per antibiotic column
|
||||
result_list <- future.apply::future_lapply(seq_along(ab_cols), run_as_sir_column, future.seed = TRUE)
|
||||
}
|
||||
if (isTRUE(info)) {
|
||||
message_(font_green_bg("\u00a0DONE\u00a0"), as_note = FALSE)
|
||||
@@ -1148,9 +1114,16 @@ as.sir.data.frame <- function(x,
|
||||
} else {
|
||||
# sequential mode (non-parallel)
|
||||
if (isTRUE(info) && n_cores > 1 && NROW(x) * NCOL(x) > 10000) {
|
||||
# give a note that parallel mode might be better
|
||||
suggest <- ifelse(.Platform$OS.type == "windows" || in_rstudio(),
|
||||
"plan(multisession)",
|
||||
"plan(multicore)"
|
||||
)
|
||||
message_(as_note = FALSE)
|
||||
message_("Running in sequential mode. Consider setting {.arg parallel} to {.code TRUE} to speed up processing on multiple cores.\n")
|
||||
if (requireNamespace("future.apply", quietly = TRUE)) {
|
||||
message_("Running in sequential mode. To speed up processing, set a parallel {.help [{.fun future::plan}](future::plan)} such as {.code ", suggest, "}.")
|
||||
} else {
|
||||
message_("Running in sequential mode. To speed up processing, install the {.pkg future.apply} package and then set {.code parallel = TRUE}.\n")
|
||||
}
|
||||
}
|
||||
# this will contain a progress bar already
|
||||
result_list <- lapply(seq_along(ab_cols), run_as_sir_column)
|
||||
@@ -1280,7 +1253,7 @@ as_sir_method <- function(method_short,
|
||||
|
||||
# backward compatibilty
|
||||
dots <- list(...)
|
||||
dots <- dots[which(!names(dots) %in% c("warn", "mo.bak", "is_data.frame"))]
|
||||
dots <- dots[which(!names(dots) %in% c("warn", "mo.bak", "is_data.frame", "as_wt_nwt"))]
|
||||
if (length(dots) != 0) {
|
||||
warning_("These arguments in {.help [{.fun as.sir}](AMR::as.sir)} are no longer used: ", vector_and(names(dots), quotes = "`"), ".", call = FALSE)
|
||||
}
|
||||
@@ -2121,7 +2094,7 @@ sir_interpretation_history <- function(clean = FALSE) {
|
||||
#' @noRd
|
||||
print.sir_log <- function(x, ...) {
|
||||
if (NROW(x) == 0) {
|
||||
message_("No results to print. First run {.help [{.fun as.sir}](AMR::as.sir)} on MIC values or disk diffusion zones (or on a {.cls data.frame} containing any of these) to print a {.val logbook} data set here.")
|
||||
message_("No results to print. First run {.help [{.fun as.sir}](AMR::as.sir)} on MIC values or disk diffusion zones (or on a {.cls data.frame} containing any of these) to print a 'logbook' data set here.")
|
||||
return(invisible(NULL))
|
||||
}
|
||||
class(x) <- class(x)[class(x) != "sir_log"]
|
||||
@@ -2363,7 +2336,7 @@ coerce_reference_data_columns <- function(x) {
|
||||
ref <- AMR::clinical_breakpoints
|
||||
for (col in names(ref)) {
|
||||
col_ref <- ref[[col]]
|
||||
col_x <- x[[col]]
|
||||
col_x <- x[[col]]
|
||||
if (identical(class(col_ref), class(col_x))) next
|
||||
if (col == "mo") {
|
||||
x[[col]] <- suppressMessages(as.mo(col_x))
|
||||
|
||||
Reference in New Issue
Block a user