From 060449e23401018abaa530431ad4bf1516c3d66b Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 22:01:09 +0000 Subject: [PATCH] Optimise parallel as.sir(): row-batch mode when n_cols < n_cores MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously parallel dispatch only parallelised by column, so a 6-column dataset on a 16-core machine used at most 6 cores with the other 10 idle. For large n this also caused memory-bandwidth saturation (each worker did a full n-row scan of clinical_breakpoints simultaneously). New row-batch mode (fork path, R >= 4.0, non-Windows): pieces_per_col = ceil(n_cores / n_cols) Jobs = n_cols × pieces_per_col (≈ n_cores jobs total) Each job: one column × one row slice Benefits: - All cores stay busy regardless of column count - Per-worker memory footprint shrinks by pieces_per_col × - Breakpoints lookup cache pressure reduced per worker PSOCK path (Windows / R < 4.0) is unchanged: per-job serialisation overhead makes row batching unprofitable there. run_as_sir_column() gains an optional `rows` parameter (NULL = all rows, backward-compatible). Results are reassembled via as.sir(c(as.character(.))) which is safe for already-clean SIR values. https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR --- NEWS.md | 1 + R/sir.R | 78 +++++++++++++++++++++++++++++++-------- tests/testthat/test-sir.R | 15 ++++++++ 3 files changed, 78 insertions(+), 16 deletions(-) diff --git a/NEWS.md b/NEWS.md index ec7274c6b..3262d65ab 100644 --- a/NEWS.md +++ b/NEWS.md @@ -37,6 +37,7 @@ * Fixed BRMO classification by including bacterial complexes (#275) * Fixed `as.sir()` for data frames silently deleting columns whose AB class was already `` when called a second time (re-running on already-converted data) (#278) * Fixed `as.sir()` for data frames incorrectly treating metadata columns (e.g. `patient`, `ward`) as antibiotic columns when their names coincidentally matched an antibiotic code; column content is now validated against AMR data patterns before inclusion +* Improved parallel computing in `as.sir()`: when the number of AB columns is smaller than the number of available cores, rows are now split into batches so all cores stay active (row-batch mode). Previously, a 6-column dataset on a 16-core machine would only use 6 cores; now all 16 are used, with each worker processing a smaller row slice (lower per-worker memory pressure) ### Updates * Extensive `cli` integration for better message handling and clickable links in messages and warnings (#191, #265) diff --git a/R/sir.R b/R/sir.R index 456f15fe0..024ab34e4 100755 --- a/R/sir.R +++ b/R/sir.R @@ -952,7 +952,22 @@ as.sir.data.frame <- function(x, is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1 effective_info <- if (is_parallel_run) FALSE else info - run_as_sir_column <- function(i) { + # Row-batch mode: when n_cols < n_cores we would leave cores idle under plain + # column-parallel dispatch. Instead we split rows into pieces so every core + # gets work. pieces_per_col = ceil(n_cores / n_cols) gives ~n_cores jobs + # total; each job processes one column on one row slice, which also reduces + # per-worker memory pressure (smaller breakpoints search space). + # Only used for the fork path (R >= 4.0, non-Windows); PSOCK clusters already + # incur high per-job serialisation overhead so we keep column-mode there. + use_fork <- is_parallel_run && + !(.Platform$OS.type == "windows" || getRversion() < "4.0.0") + pieces_per_col <- if (use_fork && length(ab_cols) < n_cores) { + ceiling(n_cores / length(ab_cols)) + } else { + 1L + } + + run_as_sir_column <- function(i, rows = NULL) { # Always resolve AMR_env from the package namespace. This is essential for # PSOCK workers (where the closure-captured AMR_env is a stale serialised copy # while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing @@ -967,14 +982,17 @@ as.sir.data.frame <- function(x, ab_col <- ab_cols[i] out <- list(result = NULL, log = NULL) + # row subsetting: NULL means all rows (column-mode), otherwise row-batch mode + row_idx <- if (is.null(rows)) seq_len(nrow(x)) else rows + if (types[i] == "mic") { result <- as.sir( - as.mic(as.character(x[, ab_col, drop = TRUE])), - mo = x_mo, - mo.bak = x[, col_mo, drop = TRUE], + as.mic(as.character(x[row_idx, ab_col, drop = TRUE])), + mo = x_mo[row_idx], + mo.bak = x[row_idx, col_mo, drop = TRUE], ab = ab_col, guideline = guideline, - uti = uti, + uti = if (length(uti) > 1L) uti[row_idx] else uti, capped_mic_handling = capped_mic_handling, as_wt_nwt = as_wt_nwt, add_intrinsic_resistance = add_intrinsic_resistance, @@ -983,7 +1001,7 @@ as.sir.data.frame <- function(x, include_screening = include_screening, include_PKPD = include_PKPD, breakpoint_type = breakpoint_type, - host = host, + host = if (length(host) > 1L) host[row_idx] else host, verbose = verbose, info = effective_info, conserve_capped_values = conserve_capped_values, @@ -1004,12 +1022,12 @@ as.sir.data.frame <- function(x, return(out) } else if (types[i] == "disk") { result <- as.sir( - as.disk(as.character(x[, ab_col, drop = TRUE])), - mo = x_mo, - mo.bak = x[, col_mo, drop = TRUE], + as.disk(as.character(x[row_idx, ab_col, drop = TRUE])), + mo = x_mo[row_idx], + mo.bak = x[row_idx, col_mo, drop = TRUE], ab = ab_col, guideline = guideline, - uti = uti, + uti = if (length(uti) > 1L) uti[row_idx] else uti, as_wt_nwt = as_wt_nwt, add_intrinsic_resistance = add_intrinsic_resistance, reference_data = reference_data, @@ -1017,7 +1035,7 @@ as.sir.data.frame <- function(x, include_screening = include_screening, include_PKPD = include_PKPD, breakpoint_type = breakpoint_type, - host = host, + host = if (length(host) > 1L) host[row_idx] else host, verbose = verbose, info = effective_info, is_data.frame = TRUE @@ -1039,7 +1057,7 @@ as.sir.data.frame <- function(x, ab <- ab_col ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE)) show_message <- FALSE - if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) { + if (!all(x[row_idx, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) { show_message <- TRUE if (isTRUE(effective_info)) { message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (", @@ -1060,7 +1078,7 @@ as.sir.data.frame <- function(x, ) } } - result <- as.sir(as.character(x[, ab, drop = TRUE])) + result <- as.sir(as.character(x[row_idx, ab, drop = TRUE])) if (show_message == TRUE && isTRUE(effective_info)) { message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE) } @@ -1075,10 +1093,14 @@ as.sir.data.frame <- function(x, if (isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1) { if (isTRUE(info)) { message_(as_note = FALSE) - message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(font_bold(ab_cols, collapse = NULL), quotes = "'", sort = FALSE), "...", as_note = FALSE, appendLF = FALSE) + if (pieces_per_col > 1L) { + message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(font_bold(ab_cols, collapse = NULL), quotes = "'", sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE) + } else { + message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(font_bold(ab_cols, collapse = NULL), quotes = "'", sort = FALSE), "...", as_note = FALSE, appendLF = FALSE) + } } if (.Platform$OS.type == "windows" || getRversion() < "4.0.0") { - # `cl` has been created in the part above before the `run_as_sir_column` function + # PSOCK cluster: column-mode only (row-batch serialisation overhead not worth it) on.exit(parallel::stopCluster(cl), add = TRUE) parallel::clusterExport(cl, varlist = c( "x", "x.bak", "x_mo", "ab_cols", "types", @@ -1090,8 +1112,32 @@ as.sir.data.frame <- function(x, "run_as_sir_column" ), envir = environment()) result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column) + } else if (pieces_per_col > 1L) { + # Row-batch mode (R >= 4.0, non-Windows, n_cols < n_cores): + # build (col, row_slice) job pairs so all cores stay active + row_cuts <- unique(round(seq(0, nrow(x), length.out = pieces_per_col + 1L))) + row_ranges <- lapply(seq_len(length(row_cuts) - 1L), function(p) { + seq.int(row_cuts[p] + 1L, row_cuts[p + 1L]) + }) + jobs <- do.call(c, lapply(seq_along(ab_cols), function(ci) { + lapply(seq_along(row_ranges), function(p) list(col = ci, rows = row_ranges[[p]])) + })) + flat <- parallel::mclapply(jobs, function(job) { + run_as_sir_column(job$col, job$rows) + }, mc.cores = n_cores) + # Reassemble: for each column concatenate row pieces in order + result_list <- lapply(seq_along(ab_cols), function(ci) { + pieces <- flat[vapply(jobs, function(j) j$col == ci, logical(1L))] + list( + result = as.sir(do.call(c, lapply(pieces, function(p) as.character(p$result)))), + log = { + logs <- Filter(Negate(is.null), lapply(pieces, function(p) p$log)) + if (length(logs) > 0L) do.call(rbind_AMR, logs) else NULL + } + ) + }) } else { - # R>=4.0 on unix + # Column-parallel mode (R >= 4.0, non-Windows, n_cols >= n_cores) result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores) } if (isTRUE(info)) { diff --git a/tests/testthat/test-sir.R b/tests/testthat/test-sir.R index 68fbe467e..160f9a357 100644 --- a/tests/testthat/test-sir.R +++ b/tests/testthat/test-sir.R @@ -502,6 +502,21 @@ test_that("test-sir.R", { sir_single_par <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE, parallel = TRUE)) expect_identical(sir_single_seq[["AMC"]], sir_single_par[["AMC"]]) + # 9. row-batch mode (n_cols < n_cores): force row splitting via max_cores and + # verify identical output to sequential for a dataset with 2 AB columns so + # pieces_per_col = ceiling(max_cores / 2) >= 2 and row batching activates + df_wide <- data.frame( + mo = "B_ESCHR_COLI", + AMC = as.mic(sample(c("1", "2", "4", "8"), n_par, TRUE)), + GEN = as.mic(sample(c("1", "2", "4", "8"), n_par, TRUE)), + stringsAsFactors = FALSE + ) + sir_wide_seq <- suppressMessages(as.sir(df_wide, col_mo = "mo", info = FALSE)) + sir_wide_par <- suppressMessages(as.sir(df_wide, col_mo = "mo", info = FALSE, + parallel = TRUE, max_cores = 8L)) + expect_identical(sir_wide_seq[["AMC"]], sir_wide_par[["AMC"]]) + expect_identical(sir_wide_seq[["GEN"]], sir_wide_par[["GEN"]]) + # 8. info = TRUE with parallel does not produce per-column worker messages # (messages should only appear in the main process, not duplicated from workers) msgs <- capture.output(