1
0
mirror of https://github.com/msberends/AMR.git synced 2026-05-31 18:21:44 +02:00

fix parallel

This commit is contained in:
2026-04-30 00:41:17 +02:00
parent 20c9447096
commit 49e440d037
14 changed files with 155 additions and 174 deletions

View File

@@ -1681,28 +1681,6 @@ readRDS_AMR <- function(file, refhook = NULL) {
readRDS(con, refhook = refhook)
}
get_n_cores <- function(max_cores = Inf) {
if (pkg_is_available("parallelly", min_version = "0.8.0", also_load = FALSE)) {
available_cores <- import_fn("availableCores", "parallelly")
n_cores <- min(available_cores(), na.rm = TRUE)
} else {
# `parallel` is part of base R since 2.14.0, but detectCores() is not very precise on exotic systems like Docker and quota-set Linux environments
n_cores <- parallel::detectCores()[1]
if (is.na(n_cores)) {
n_cores <- 1
}
}
max_cores <- floor(max_cores)
if (max_cores == 0) {
n_cores <- 1
} else if (max_cores < 0) {
n_cores <- max(1, n_cores - abs(max_cores))
} else if (max_cores > 0) {
n_cores <- min(n_cores, max_cores)
}
n_cores
}
# Support `where()` if tidyselect not installed ----
if (!is.null(import_fn("where", "tidyselect", error_on_fail = FALSE))) {
# tidyselect::where() exists, retrieve from their namespace to make `where()`s work across the package in default arguments

82
R/sir.R
View File

@@ -718,7 +718,6 @@ as.sir.disk <- function(x,
#' @rdname as.sir
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. Requires the [`future.apply`][future.apply::future_lapply()] package. **A non-sequential [future::plan()] must already be active before setting `parallel = TRUE`** — for example, `future::plan(future::multisession)`. An error is thrown if `parallel = TRUE` is used without a plan set by the user. Parallelism distributes columns (and optionally row batches) across workers; it is most beneficial when there are many antibiotic columns and a large number of rows.
#' @param max_cores Maximum number of workers to use when `parallel = TRUE`. Use a negative value to subtract that number from the available workers, e.g. a value of `-2` means at most `nbrOfWorkers() - 2` workers will be used. Defaults to `-1` (all but one worker). There will never be more workers used than there are antibiotic columns to analyse.
#' @export
as.sir.data.frame <- function(x,
...,
@@ -738,7 +737,6 @@ as.sir.data.frame <- function(x,
verbose = FALSE,
info = interactive(),
parallel = FALSE,
max_cores = -1,
conserve_capped_values = NULL) {
meet_criteria(x, allow_class = "data.frame") # will also check for dimensions > 0
meet_criteria(col_mo, allow_class = "character", is_in = colnames(x), allow_NULL = TRUE)
@@ -757,7 +755,6 @@ as.sir.data.frame <- function(x,
meet_criteria(verbose, allow_class = "logical", has_length = 1)
meet_criteria(info, allow_class = "logical", has_length = 1)
meet_criteria(parallel, allow_class = "logical", has_length = 1)
meet_criteria(max_cores, allow_class = c("numeric", "integer"), has_length = 1)
x.bak <- x
if (isTRUE(info) && message_not_thrown_before("as.sir", "sir_interpretation_history")) {
@@ -912,41 +909,36 @@ as.sir.data.frame <- function(x,
}
# set up parallel computing
if (requireNamespace("future.apply", quietly = TRUE) && !inherits(future::plan(), "sequential")) {
if (isFALSE(parallel)) {
message_("Assuming {.code parallel = TRUE} since parallel computing has been set up using the {.pkg future} package before. Set {.help [{.fun plan}](future::plan)} to sequential to prevent this.")
}
parallel <- TRUE
}
if (isTRUE(parallel)) {
if (!requireNamespace("future.apply", quietly = TRUE)) {
stop_(
"Setting {.arg parallel} to {.code TRUE} requires the {.pkg future.apply} package.\n",
"Install it with: ", highlight_code('install.packages("future.apply")'), "."
)
}
if (inherits(future::plan(), "sequential")) {
stop_(
"Setting {.arg parallel} to {.code TRUE} requires a non-sequential {.help [future::plan](future::plan)} to be active.\n",
"Set a parallel plan before calling {.help [{.fun as.sir}](AMR::as.sir)}, for example:\n",
highlight_code("future::plan(future::multisession)"), "\n",
"Or on Linux/macOS for fork-based workers:\n",
highlight_code("future::plan(future::multicore)"), "\n",
"See {.help [future::plan](future::plan)} for all available strategies.",
call = FALSE
)
}
stop_ifnot(
requireNamespace("future.apply", quietly = TRUE),
"Setting {.code parallel = TRUE} requires the {.pkg future.apply} package.\n",
"Install it with {.code install.packages(\"future.apply\")}."
)
stop_if(inherits(future::plan(), "sequential"),
"Setting {.code parallel = TRUE} requires a non-sequential {.help [{.fun future::plan}](future::plan)} to be active.\n",
"For your system, you could first run: {.code library(future); ",
ifelse(.Platform$OS.type == "windows" || in_rstudio(),
"plan(multisession)",
"plan(multicore)"
),
"}",
call = FALSE
)
n_workers <- future::nbrOfWorkers()
n_cores <- if (max_cores < 0L) {
max(1L, n_workers + max_cores)
} else {
min(max_cores, n_workers)
}
n_cores <- min(n_cores, length(ab_cols))
n_cores <- min(n_workers, length(ab_cols))
} else {
n_workers <- 1L
n_cores <- 1L
}
if (isTRUE(info)) {
message_(as_note = FALSE) # empty line
message_("Processing columns:", as_note = FALSE)
}
# In parallel mode suppress per-column messages: workers print simultaneously and
# their output would be interleaved on the console.
is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1
@@ -957,10 +949,10 @@ as.sir.data.frame <- function(x,
# gets work. pieces_per_col = ceil(n_workers / n_cols) gives ~n_workers jobs
# total; each job processes one column on one row slice, which also reduces
# per-worker memory pressure (smaller breakpoints search space).
pieces_per_col <- if (is_parallel_run && length(ab_cols) < n_workers) {
ceiling(n_workers / length(ab_cols))
if (is_parallel_run && length(ab_cols) < n_workers) {
pieces_per_col <- ceiling(n_workers / length(ab_cols))
} else {
1L
pieces_per_col <- 1L
}
run_as_sir_column <- function(i, rows = NULL) {
@@ -1049,7 +1041,7 @@ as.sir.data.frame <- function(x,
ab <- ab_col
ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE))
show_message <- FALSE
if (!all(x[row_idx, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) {
if (!all(x[row_idx, ab, drop = TRUE] %in% c(VALID_SIR_LEVELS, NA), na.rm = TRUE)) {
show_message <- TRUE
if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (",
@@ -1086,9 +1078,9 @@ as.sir.data.frame <- function(x,
if (isTRUE(info)) {
message_(as_note = FALSE)
if (pieces_per_col > 1L) {
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " workers, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE)
message_("Running in parallel mode using ", n_cores, " workers, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE)
} else {
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " workers, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), "...", as_note = FALSE, appendLF = FALSE)
message_("Running in parallel mode using ", n_cores, " workers, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), "...", as_note = FALSE, appendLF = FALSE)
}
}
if (pieces_per_col > 1L) {
@@ -1108,7 +1100,7 @@ as.sir.data.frame <- function(x,
pieces <- flat[vapply(jobs, function(j) j$col == ci, logical(1L))]
list(
result = as.sir(do.call(c, lapply(pieces, function(p) as.character(p$result)))),
log = {
log = {
logs <- Filter(Negate(is.null), lapply(pieces, function(p) p$log))
if (length(logs) > 0L) do.call(rbind_AMR, logs) else NULL
}
@@ -1125,12 +1117,16 @@ as.sir.data.frame <- function(x,
}
} else {
# sequential mode (non-parallel)
if (isTRUE(info) && get_n_cores(Inf) > 1 && NROW(x) * NCOL(x) > 10000) {
if (isTRUE(info) && n_cores > 1 && NROW(x) * NCOL(x) > 10000) {
suggest <- ifelse(.Platform$OS.type == "windows" || in_rstudio(),
"plan(multisession)",
"plan(multicore)"
)
message_(as_note = FALSE)
if (requireNamespace("future.apply", quietly = TRUE)) {
message_("Running in sequential mode. To speed up processing, set a parallel plan first (e.g., ", highlight_code("future::plan(future::multisession)"), ") and then set {.arg parallel} to {.code TRUE}.\n")
message_("Running in sequential mode. To speed up processing, set a parallel {.help [{.fun future::plan}](future::plan)} such as {.code ", suggest, "}.")
} else {
message_("Running in sequential mode. To speed up processing, install the {.pkg future.apply} package, set a parallel plan first (e.g., ", highlight_code("future::plan(future::multisession)"), ") and then set {.arg parallel} to {.code TRUE}.\n")
message_("Running in sequential mode. To speed up processing, install the {.pkg future.apply} package and then set {.code parallel = TRUE}.\n")
}
}
# this will contain a progress bar already
@@ -2102,7 +2098,7 @@ sir_interpretation_history <- function(clean = FALSE) {
#' @noRd
print.sir_log <- function(x, ...) {
if (NROW(x) == 0) {
message_("No results to print. First run {.help [{.fun as.sir}](AMR::as.sir)} on MIC values or disk diffusion zones (or on a {.cls data.frame} containing any of these) to print a {.val logbook} data set here.")
message_("No results to print. First run {.help [{.fun as.sir}](AMR::as.sir)} on MIC values or disk diffusion zones (or on a {.cls data.frame} containing any of these) to print a 'logbook' data set here.")
return(invisible(NULL))
}
class(x) <- class(x)[class(x) != "sir_log"]
@@ -2344,7 +2340,7 @@ coerce_reference_data_columns <- function(x) {
ref <- AMR::clinical_breakpoints
for (col in names(ref)) {
col_ref <- ref[[col]]
col_x <- x[[col]]
col_x <- x[[col]]
if (identical(class(col_ref), class(col_x))) next
if (col == "mo") {
x[[col]] <- suppressMessages(as.mo(col_x))