mirror of
https://github.com/msberends/AMR.git
synced 2026-04-28 15:43:47 +02:00
Require user plan() for parallel=TRUE; fix as_wt_nwt false-positive warnings
- parallel = TRUE now errors with a cli-styled message if no non-sequential future::plan() is active; users must call e.g. future::plan(future::multisession) before using parallel = TRUE (breaking change)
- Removed auto-setup/teardown of multisession plan inside as.sir(), which was slow and caused version-mismatch issues with load_all() workflows
- Added as_wt_nwt to the exclusion list in as_sir_method() to suppress false-positive "no longer used" warnings during parallel runs
- Fixed pieces_per_col row-batch calculation to use n_workers (total available workers from the active plan) instead of n_cores (workers clipped to n_cols), so row-batch mode activates correctly when n_cols < n_workers
- Updated @param parallel and @param max_cores roxygen docs; regenerated man/as.sir.Rd
- Updated sequential-mode hint to instruct users to set plan() first

https://claude.ai/code/session_01M1Jvf2Miu6JL4TQrEh1wS8
This commit is contained in:
55
R/sir.R
55
R/sir.R
@@ -717,8 +717,8 @@ as.sir.disk <- function(x,
|
||||
}
|
||||
|
||||
#' @rdname as.sir
|
||||
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. Requires the [`future.apply`][future.apply::future_lapply()] package. If a non-sequential [future::plan()] is already active (e.g., set by the user via `future::plan(future::multisession)`), that plan is respected and left unchanged after the call. Otherwise, a temporary `multisession` plan is set automatically and torn down on exit. Parallelism distributes columns across workers; it is most beneficial when there are many antibiotic columns and a large number of rows.
|
||||
#' @param max_cores Maximum number of workers to use if `parallel = TRUE` and no [future::plan()] has been set by the user. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means at most 6 workers will be used. Defaults to `-1`. There will never be used more workers than variables to analyse. The number of available cores is detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise. This argument is ignored when the user has already set a [future::plan()].
|
||||
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. Requires the [`future.apply`][future.apply::future_lapply()] package. **A non-sequential [future::plan()] must already be active before setting `parallel = TRUE`** — for example, `future::plan(future::multisession)`. An error is thrown if `parallel = TRUE` is used without a plan set by the user. Parallelism distributes columns (and optionally row batches) across workers; it is most beneficial when there are many antibiotic columns and a large number of rows.
|
||||
#' @param max_cores Maximum number of workers to use when `parallel = TRUE`. Use a negative value to subtract that number from the available workers, e.g. a value of `-2` means at most `nbrOfWorkers() - 2` workers will be used. Defaults to `-1` (all but one worker). There will never be more workers used than there are antibiotic columns to analyse.
|
||||
#' @export
|
||||
as.sir.data.frame <- function(x,
|
||||
...,
|
||||
@@ -912,28 +912,35 @@ as.sir.data.frame <- function(x,
|
||||
}
|
||||
|
||||
# set up parallel computing
|
||||
n_cores <- get_n_cores(max_cores = max_cores)
|
||||
n_cores <- min(n_cores, length(ab_cols)) # never more cores than variables required
|
||||
plan_was_set_by_us <- FALSE
|
||||
if (isTRUE(parallel)) {
|
||||
if (!requireNamespace("future.apply", quietly = TRUE)) {
|
||||
warning(
|
||||
"parallel = TRUE requires the 'future.apply' package. ",
|
||||
"Install it with: install.packages(\"future.apply\"). ",
|
||||
"Falling back to sequential processing.",
|
||||
call. = FALSE
|
||||
stop_(
|
||||
"Setting {.arg parallel} to {.code TRUE} requires the {.pkg future.apply} package.\n",
|
||||
"Install it with: ", highlight_code('install.packages("future.apply")'), "."
|
||||
)
|
||||
parallel <- FALSE
|
||||
} else if (!inherits(future::plan(), "sequential")) {
|
||||
# honour the user's active plan; update worker count accordingly
|
||||
n_cores <- min(future::nbrOfWorkers(), length(ab_cols))
|
||||
} else {
|
||||
# no active plan: set multisession with our computed worker count
|
||||
future::plan(future::multisession, workers = n_cores)
|
||||
plan_was_set_by_us <- TRUE
|
||||
}
|
||||
if (inherits(future::plan(), "sequential")) {
|
||||
stop_(
|
||||
"Setting {.arg parallel} to {.code TRUE} requires a non-sequential {.help [future::plan](future::plan)} to be active.\n",
|
||||
"Set a parallel plan before calling {.help [{.fun as.sir}](AMR::as.sir)}, for example:\n",
|
||||
highlight_code("future::plan(future::multisession)"), "\n",
|
||||
"Or on Linux/macOS for fork-based workers:\n",
|
||||
highlight_code("future::plan(future::multicore)"), "\n",
|
||||
"See {.help [future::plan](future::plan)} for all available strategies.",
|
||||
call = FALSE
|
||||
)
|
||||
}
|
||||
n_workers <- future::nbrOfWorkers()
|
||||
n_cores <- if (max_cores < 0L) {
|
||||
max(1L, n_workers + max_cores)
|
||||
} else {
|
||||
min(max_cores, n_workers)
|
||||
}
|
||||
n_cores <- min(n_cores, length(ab_cols))
|
||||
} else {
|
||||
n_workers <- 1L
|
||||
n_cores <- 1L
|
||||
}
|
||||
on.exit(if (plan_was_set_by_us) future::plan(future::sequential), add = TRUE)
|
||||
|
||||
if (isTRUE(info)) {
|
||||
message_(as_note = FALSE) # empty line
|
||||
@@ -950,8 +957,8 @@ as.sir.data.frame <- function(x,
|
||||
# gets work. pieces_per_col = ceil(n_workers / n_cols) gives ~n_workers jobs
|
||||
# total; each job processes one column on one row slice, which also reduces
|
||||
# per-worker memory pressure (smaller breakpoints search space).
|
||||
pieces_per_col <- if (is_parallel_run && length(ab_cols) < n_cores) {
|
||||
ceiling(n_cores / length(ab_cols))
|
||||
pieces_per_col <- if (is_parallel_run && length(ab_cols) < n_workers) {
|
||||
ceiling(n_workers / length(ab_cols))
|
||||
} else {
|
||||
1L
|
||||
}
|
||||
@@ -1121,9 +1128,9 @@ as.sir.data.frame <- function(x,
|
||||
if (isTRUE(info) && get_n_cores(Inf) > 1 && NROW(x) * NCOL(x) > 10000) {
|
||||
message_(as_note = FALSE)
|
||||
if (requireNamespace("future.apply", quietly = TRUE)) {
|
||||
message_("Running in sequential mode. Consider setting {.arg parallel} to {.code TRUE} to speed up processing on multiple workers.\n")
|
||||
message_("Running in sequential mode. To speed up processing, set a parallel plan first (e.g., ", highlight_code("future::plan(future::multisession)"), ") and then set {.arg parallel} to {.code TRUE}.\n")
|
||||
} else {
|
||||
message_("Running in sequential mode. Install the {.pkg future.apply} package and set {.arg parallel} to {.code TRUE} to speed up processing on multiple workers.\n")
|
||||
message_("Running in sequential mode. To speed up processing, install the {.pkg future.apply} package, set a parallel plan first (e.g., ", highlight_code("future::plan(future::multisession)"), ") and then set {.arg parallel} to {.code TRUE}.\n")
|
||||
}
|
||||
}
|
||||
# this will contain a progress bar already
|
||||
@@ -1254,7 +1261,7 @@ as_sir_method <- function(method_short,
|
||||
|
||||
# backward compatibility
|
||||
dots <- list(...)
|
||||
dots <- dots[which(!names(dots) %in% c("warn", "mo.bak", "is_data.frame"))]
|
||||
dots <- dots[which(!names(dots) %in% c("warn", "mo.bak", "is_data.frame", "as_wt_nwt"))]
|
||||
if (length(dots) != 0) {
|
||||
warning_("These arguments in {.help [{.fun as.sir}](AMR::as.sir)} are no longer used: ", vector_and(names(dots), quotes = "`"), ".", call = FALSE)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user