diff --git a/DESCRIPTION b/DESCRIPTION index b817b7a57..7d2524da3 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: AMR -Version: 3.0.1.9053 -Date: 2026-04-27 +Version: 3.0.1.9050 +Date: 2026-04-30 Title: Antimicrobial Resistance Data Analysis Description: Functions to simplify and standardise antimicrobial resistance (AMR) data analysis and to work with microbial and antimicrobial properties by diff --git a/NEWS.md b/NEWS.md index 9ae3064af..16100de07 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# AMR 3.0.1.9053 +# AMR 3.0.1.9050 This will become release v3.1.0, intended for launch end of May. @@ -7,6 +7,7 @@ This will become release v3.1.0, intended for launch end of May. * Support for the [`future`](https://future.futureverse.org) package and its framework, as the previous implementation of parallel computing was slow - **Breaking change**: `as.sir()` with `parallel = TRUE` now requires a non-sequential `future::plan()` to be active before the call — e.g., `future::plan(future::multisession)` — and throws an informative error if none is set. - New all-core usage setup: when the number of AB columns is smaller than the number of available cores, rows are now split into batches so all cores stay active (row-batch mode). Previously, a 6-column dataset on a 16-core machine would only use 6 cores; now all 16 are used, with each worker processing a smaller row slice (lower per-worker memory pressure and processing time) + - `antibiogram()` and `wisca()` gained a `parallel` argument using the same `future`/`future.apply` pattern: for WISCA, Monte Carlo simulations are split into `(group, chunk)` job pairs distributed across workers; for grouped antibiograms, each group is processed by a separate worker (#281) * Integration with the *tidymodels* framework to allow seamless use of SIR, MIC and disk data in modelling pipelines via `recipes` - `step_mic_log2()` to transform `` columns with log2, and `step_sir_numeric()` to convert `` columns to numeric - New `tidyselect` helpers: diff --git a/R/antibiogram.R b/R/antibiogram.R index 6a8d086e5..623ecd3bc 100755 --- a/R/antibiogram.R +++ b/R/antibiogram.R @@ -65,6 +65,7 @@ #' @param simulations (for WISCA) a numerical value to set the number of Monte Carlo simulations. #' @param conf_interval A numerical value to set confidence interval (default is `0.95`). #' @param interval_side The side of the confidence interval, either `"two-tailed"` (default), `"left"` or `"right"`. +#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. Requires the [`future.apply`][future.apply::future_lapply()] package. For WISCA, Monte Carlo simulations are distributed across workers; for grouped antibiograms, each group is processed by a separate worker. **A non-sequential [future::plan()] must already be active before setting `parallel = TRUE`** — for example, `future::plan(future::multisession)`. An error is thrown if `parallel = TRUE` is used without a plan set by the user. #' @param info A [logical] to indicate info should be printed - the default is `TRUE` only in interactive mode. #' @param object An [antibiogram()] object. #' @param ... When used in [R Markdown or Quarto][knitr::kable()]: arguments passed on to [knitr::kable()] (otherwise, has no use). @@ -413,6 +414,7 @@ antibiogram <- function(x, conf_interval = 0.95, interval_side = "two-tailed", info = interactive(), + parallel = FALSE, ...) { UseMethod("antibiogram") } @@ -439,6 +441,7 @@ antibiogram.default <- function(x, conf_interval = 0.95, interval_side = "two-tailed", info = interactive(), + parallel = FALSE, ...) { meet_criteria(x, allow_class = "data.frame") x <- ascertain_sir_classes(x, "x") @@ -478,6 +481,7 @@ antibiogram.default <- function(x, meet_criteria(conf_interval, allow_class = c("numeric", "integer"), has_length = 1, is_finite = TRUE, is_positive = TRUE) meet_criteria(interval_side, allow_class = "character", has_length = 1, is_in = c("two-tailed", "left", "right")) meet_criteria(info, allow_class = "logical", has_length = 1) + meet_criteria(parallel, allow_class = "logical", has_length = 1) # try to find columns based on type if (is.null(col_mo)) { @@ -705,52 +709,113 @@ antibiogram.default <- function(x, wisca_parameters <- out - progress <- progress_ticker( - n = length(unique(wisca_parameters$group)) * simulations, - n_min = 25, - print = info, - title = paste("Calculating WISCA for", length(unique(wisca_parameters$group)), "regimens") - ) - on.exit(close(progress)) - - # run WISCA per group - for (group in unique(wisca_parameters$group)) { - params_current <- wisca_parameters[wisca_parameters$group == group, , drop = FALSE] - if (sum(params_current$n_tested, na.rm = TRUE) == 0) { - next - } - - # prepare priors - priors_current <- create_wisca_priors(params_current) - - # Monte Carlo simulations - coverage_simulations <- vapply( - FUN.VALUE = double(1), - seq_len(simulations), function(i) { - progress$tick() - simulate_coverage(priors_current) - } - ) - - # summarise results - coverage_mean <- mean(coverage_simulations) - - if (interval_side == "two-tailed") { - probs <- c((1 - conf_interval) / 2, 1 - (1 - conf_interval) / 2) - } else if (interval_side == "left") { - probs <- c(0, conf_interval) - } else if (interval_side == "right") { - probs <- c(1 - conf_interval, 1) - } - - coverage_ci <- unname(stats::quantile(coverage_simulations, probs = probs)) - - out_wisca$coverage[out_wisca$group == group] <- coverage_mean - out_wisca$lower_ci[out_wisca$group == group] <- coverage_ci[1] - out_wisca$upper_ci[out_wisca$group == group] <- coverage_ci[2] + # quantile probabilities are constant across all groups + probs <- if (interval_side == "two-tailed") { + c((1 - conf_interval) / 2, 1 - (1 - conf_interval) / 2) + } else if (interval_side == "left") { + c(0, conf_interval) + } else { + c(1 - conf_interval, 1) } - close(progress) + unique_groups <- unique(wisca_parameters$group) + + # parallel gate for WISCA — identical pattern to as.sir() + if (requireNamespace("future.apply", quietly = TRUE) && !inherits(future::plan(), "sequential")) { + if (isFALSE(parallel)) { + message_("Assuming {.code parallel = TRUE} since parallel computing has been set up using the {.pkg future} package before. Set {.help [{.fun plan}](future::plan)} to sequential to prevent this.") + } + parallel <- TRUE + } + if (isTRUE(parallel)) { + stop_ifnot( + requireNamespace("future.apply", quietly = TRUE), + "Setting {.code parallel = TRUE} requires the {.pkg future.apply} package.\n", + "Install it with {.code install.packages(\"future.apply\")}." + ) + stop_if(inherits(future::plan(), "sequential"), + "Setting {.code parallel = TRUE} requires a non-sequential {.help [{.fun future::plan}](future::plan)} to be active.\n", + "For your system, you could first run: {.code library(future); ", + ifelse(.Platform$OS.type == "windows" || in_rstudio(), + "plan(multisession)", + "plan(multicore)" + ), + "}", + call = FALSE + ) + n_workers <- future::nbrOfWorkers() + } else { + n_workers <- 1L + } + + use_parallel_wisca <- isTRUE(parallel) && n_workers > 1L && length(unique_groups) > 0L + + if (use_parallel_wisca) { + if (isTRUE(info)) { + message_("Running WISCA in parallel mode using ", n_workers, " workers...", as_note = FALSE, appendLF = FALSE) + } + # chunks_per_group gives ~n_workers total jobs so all workers stay busy + # even when the number of regimens is smaller than n_workers + chunks_per_group <- max(1L, ceiling(n_workers / length(unique_groups))) + chunk_sizes <- diff(c(0L, round(seq_len(chunks_per_group) * simulations / chunks_per_group))) + + # precompute priors per group and build (group, chunk) job list + jobs <- unlist(lapply(unique_groups, function(g) { + params_g <- wisca_parameters[wisca_parameters$group == g, , drop = FALSE] + if (sum(params_g$n_tested, na.rm = TRUE) == 0L) return(NULL) + priors_g <- create_wisca_priors(params_g) + lapply(seq_along(chunk_sizes), function(ch) { + list(group = g, priors = priors_g, n_sims = chunk_sizes[ch]) + }) + }), recursive = FALSE) + jobs <- Filter(Negate(is.null), jobs) + + flat <- future.apply::future_lapply(jobs, function(job) { + vapply(FUN.VALUE = double(1), seq_len(job$n_sims), function(i) { + simulate_coverage(job$priors) + }) + }, future.seed = TRUE) + + # reassemble per group: concatenate chunks, then summarise + for (g in unique_groups) { + g_idx <- vapply(jobs, function(j) identical(j$group, g), logical(1)) + if (!any(g_idx)) next + sims <- unlist(flat[g_idx], use.names = FALSE) + out_wisca$coverage[out_wisca$group == g] <- mean(sims) + ci_vals <- unname(stats::quantile(sims, probs = probs)) + out_wisca$lower_ci[out_wisca$group == g] <- ci_vals[1] + out_wisca$upper_ci[out_wisca$group == g] <- ci_vals[2] + } + + if (isTRUE(info)) message_(font_green_bg(" DONE "), as_note = FALSE) + + } else { + progress <- progress_ticker( + n = length(unique_groups) * simulations, + n_min = 25, + print = info, + title = paste("Calculating WISCA for", length(unique_groups), "regimens") + ) + on.exit(close(progress), add = TRUE) + + for (group in unique_groups) { + params_current <- wisca_parameters[wisca_parameters$group == group, , drop = FALSE] + if (sum(params_current$n_tested, na.rm = TRUE) == 0) next + priors_current <- create_wisca_priors(params_current) + coverage_simulations <- vapply( + FUN.VALUE = double(1), + seq_len(simulations), function(i) { + progress$tick() + simulate_coverage(priors_current) + } + ) + out_wisca$coverage[out_wisca$group == group] <- mean(coverage_simulations) + ci_vals <- unname(stats::quantile(coverage_simulations, probs = probs)) + out_wisca$lower_ci[out_wisca$group == group] <- ci_vals[1] + out_wisca$upper_ci[out_wisca$group == group] <- ci_vals[2] + } + close(progress) + } # final output preparation out <- out_wisca @@ -997,30 +1062,50 @@ antibiogram.grouped_df <- function(x, conf_interval = 0.95, interval_side = "two-tailed", info = interactive(), + parallel = FALSE, ...) { stop_ifnot(is.null(mo_transform), "{.arg mo_transform} must not be set if creating an antibiogram using a grouped tibble. The groups will become the variables over which the antimicrobials are calculated, which could include the pathogen information (though not necessary). Nonetheless, this makes {.arg mo_transform} redundant.", call = FALSE) stop_ifnot(is.null(syndromic_group), "{.arg syndromic_group} must not be set if creating an antibiogram using a grouped tibble. The groups will become the variables over which the antimicrobials are calculated, making {.arg syndromic_group} redundant.", call = FALSE) + meet_criteria(parallel, allow_class = "logical", has_length = 1) + groups <- attributes(x)$groups n_groups <- NROW(groups) - progress <- progress_ticker( - n = n_groups, - n_min = 5, - print = info, - title = paste("Calculating AMR for", n_groups, "groups") - ) - on.exit(close(progress)) - out <- NULL - wisca_parameters <- NULL - long_numeric <- NULL - - for (i in seq_len(n_groups)) { - progress$tick() - rows <- unlist(groups[i, ]$.rows) - if (length(rows) == 0) { - next + # parallel gate — identical pattern to as.sir() + if (requireNamespace("future.apply", quietly = TRUE) && !inherits(future::plan(), "sequential")) { + if (isFALSE(parallel)) { + message_("Assuming {.code parallel = TRUE} since parallel computing has been set up using the {.pkg future} package before. Set {.help [{.fun plan}](future::plan)} to sequential to prevent this.") } - new_out <- antibiogram(as.data.frame(x)[rows, , drop = FALSE], + parallel <- TRUE + } + if (isTRUE(parallel)) { + stop_ifnot( + requireNamespace("future.apply", quietly = TRUE), + "Setting {.code parallel = TRUE} requires the {.pkg future.apply} package.\n", + "Install it with {.code install.packages(\"future.apply\")}." + ) + stop_if(inherits(future::plan(), "sequential"), + "Setting {.code parallel = TRUE} requires a non-sequential {.help [{.fun future::plan}](future::plan)} to be active.\n", + "For your system, you could first run: {.code library(future); ", + ifelse(.Platform$OS.type == "windows" || in_rstudio(), + "plan(multisession)", + "plan(multicore)" + ), + "}", + call = FALSE + ) + n_workers <- future::nbrOfWorkers() + } else { + n_workers <- 1L + } + + use_parallel <- isTRUE(parallel) && n_workers > 1L && n_groups > 1L + + x_df <- as.data.frame(x) + run_group <- function(i) { + rows <- unlist(groups[i, ]$.rows) + if (length(rows) == 0L) return(NULL) + antibiogram(x_df[rows, , drop = FALSE], antimicrobials = antimicrobials, mo_transform = NULL, ab_transform = ab_transform, @@ -1040,12 +1125,42 @@ antibiogram.grouped_df <- function(x, conf_interval = conf_interval, interval_side = interval_side, info = FALSE, - ... + parallel = FALSE # never nest parallelism in workers ) + } + + if (use_parallel) { + if (isTRUE(info)) { + message_("Running antibiogram for ", n_groups, " groups in parallel using ", n_workers, " workers...", as_note = FALSE, appendLF = FALSE) + } + results_raw <- future.apply::future_lapply(seq_len(n_groups), run_group, future.seed = TRUE) + if (isTRUE(info)) message_(font_green_bg(" DONE "), as_note = FALSE) + } else { + progress <- progress_ticker( + n = n_groups, + n_min = 5, + print = info, + title = paste("Calculating AMR for", n_groups, "groups") + ) + on.exit(close(progress), add = TRUE) + results_raw <- vector("list", n_groups) + for (i in seq_len(n_groups)) { + progress$tick() + results_raw[[i]] <- run_group(i) + } + close(progress) + } + + out <- NULL + wisca_parameters <- NULL + long_numeric <- NULL + + for (i in seq_len(n_groups)) { + new_out <- results_raw[[i]] new_wisca_parameters <- attributes(new_out)$wisca_parameters new_long_numeric <- attributes(new_out)$long_numeric - if (NROW(new_out) == 0) { + if (is.null(new_out) || NROW(new_out) == 0) { next } @@ -1071,8 +1186,7 @@ antibiogram.grouped_df <- function(x, new_long_numeric <- new_long_numeric[, c(col_name, setdiff(names(new_long_numeric), col_name))] # set place to 1st col } - if (i == 1) { - # the first go + if (is.null(out)) { out <- new_out wisca_parameters <- new_wisca_parameters long_numeric <- new_long_numeric @@ -1083,8 +1197,6 @@ antibiogram.grouped_df <- function(x, } } - close(progress) - out <- structure(as_original_data_class(out, class(x), extra_class = "antibiogram"), has_syndromic_group = FALSE, combine_SI = isTRUE(combine_SI), @@ -1116,6 +1228,7 @@ wisca <- function(x, conf_interval = 0.95, interval_side = "two-tailed", info = interactive(), + parallel = FALSE, ...) { antibiogram( x = x, @@ -1137,6 +1250,7 @@ wisca <- function(x, conf_interval = conf_interval, interval_side = interval_side, info = info, + parallel = parallel, ... ) } diff --git a/tests/testthat/test-antibiogram.R b/tests/testthat/test-antibiogram.R index 6f1a765a7..f7a74f221 100644 --- a/tests/testthat/test-antibiogram.R +++ b/tests/testthat/test-antibiogram.R @@ -130,6 +130,74 @@ test_that("test-antibiogram.R", { expect_equal(colnames(ab9), c("ward", "gender", "Piperacillin/tazobactam", "Piperacillin/tazobactam + Gentamicin", "Piperacillin/tazobactam + Tobramycin")) } + # Parallel computing ---------------------------------------------------- + # Tests must pass even when only 1 core is available; parallel = TRUE then + # silently falls back to sequential, but results must still be correct. + + if (AMR:::pkg_is_available("future.apply")) { + set.seed(42) + + # sequential reference for WISCA + wisca_seq <- suppressWarnings(suppressMessages( + wisca(example_isolates, antimicrobials = c("TZP", "TZP+TOB", "TZP+GEN"), simulations = 100, info = FALSE) + )) + + future::plan(future::multicore) + + # 1. parallel = TRUE produces the same antibiogram structure as sequential + wisca_par <- suppressWarnings(suppressMessages( + wisca(example_isolates, antimicrobials = c("TZP", "TZP+TOB", "TZP+GEN"), simulations = 100, parallel = TRUE, info = FALSE) + )) + expect_inherits(wisca_par, "antibiogram") + expect_equal(colnames(wisca_par), colnames(wisca_seq)) + expect_true(isTRUE(attributes(wisca_par)$wisca)) + + # 2. coverage values fall within [0, 100] (basic sanity) + ln <- attributes(wisca_par)$long_numeric + expect_true(all(ln$coverage >= 0 & ln$coverage <= 1, na.rm = TRUE)) + expect_true(all(ln$lower_ci <= ln$coverage, na.rm = TRUE)) + expect_true(all(ln$upper_ci >= ln$coverage, na.rm = TRUE)) + + # 3. a second parallel run gives the same column names + wisca_par2 <- suppressWarnings(suppressMessages( + wisca(example_isolates, antimicrobials = c("TZP", "TZP+TOB", "TZP+GEN"), simulations = 100, parallel = TRUE, info = FALSE) + )) + expect_equal(colnames(wisca_par), colnames(wisca_par2)) + + # 4. parallel with workers = 1 gives same structure as sequential + future::plan(future::multicore, workers = 1) + wisca_par1 <- suppressWarnings(suppressMessages( + wisca(example_isolates, antimicrobials = c("TZP", "TZP+TOB", "TZP+GEN"), simulations = 100, parallel = TRUE, info = FALSE) + )) + expect_equal(colnames(wisca_seq), colnames(wisca_par1)) + + # 5. grouped antibiogram in parallel yields identical structure to sequential + if (AMR:::pkg_is_available("dplyr", min_version = "1.0.0", also_load = TRUE)) { + future::plan(future::sequential) + ab_grp_seq <- suppressWarnings(suppressMessages( + example_isolates %>% + group_by(ward) %>% + wisca(antimicrobials = c("TZP", "TZP+TOB"), simulations = 50, info = FALSE) + )) + future::plan(future::multicore) + ab_grp_par <- suppressWarnings(suppressMessages( + example_isolates %>% + group_by(ward) %>% + wisca(antimicrobials = c("TZP", "TZP+TOB"), simulations = 50, parallel = TRUE, info = FALSE) + )) + expect_equal(colnames(ab_grp_seq), colnames(ab_grp_par)) + expect_equal(nrow(ab_grp_seq), nrow(ab_grp_par)) + } + + # 6. parallel = TRUE without a plan raises an informative error + future::plan(future::sequential) + expect_error( + suppressWarnings(wisca(example_isolates, antimicrobials = "TZP", parallel = TRUE, info = FALSE)), + "non-sequential" + ) + + future::plan(future::sequential) + } # Generate plots with ggplot2 or base R --------------------------------