From 19157ce71855a08c3b8c1e3874feadeb030016cd Mon Sep 17 00:00:00 2001 From: Matthijs Berends <31037261+msberends@users.noreply.github.com> Date: Sat, 25 Apr 2026 00:34:38 +0200 Subject: [PATCH] Fix parallel computing in as.sir.data.frame (#276) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix parallel computing in as.sir.data.frame Six bugs in parallel = TRUE mode: 1. PSOCK workers (Windows / R < 4.0) never had AMR loaded, so every exported/AMR function call failed. Added clusterEvalQ(cl, library(AMR)) with a graceful fallback to sequential when the package cannot be loaded (e.g. dev-only load_all() environments). 2. clusterExport'd AMR_env was a frozen serialised copy; as.sir() on the worker wrote to AMR:::AMR_env while run_as_sir_column read from the stale copy, so the captured log was always wrong. Fixed by resolving AMR_env dynamically via get("AMR_env", envir = asNamespace("AMR")) inside the worker function, and removing AMR_env from clusterExport. 3. In the fork-based (mclapply) path each worker inherited the parent's full sir_interpretation_history. Capturing the whole log then combining across workers duplicated every pre-existing entry. Fixed by recording the log row count before the as.sir() call and slicing only the new rows afterwards. 4. run_as_sir_column used non-exported internals (%pm>%, pm_pull, as.sir.default) that are inaccessible on PSOCK workers after library(AMR). Replaced pipe chains with direct as.mic(as.character(x[, col, drop=TRUE])) and as.disk(...) calls, and changed as.sir.default() to as.sir() which dispatches correctly via S3. 5. With info = TRUE, worker forks printed per-column progress messages simultaneously, producing garbled interleaved console output. Per-column messages are now suppressed inside workers (effective_info = FALSE) while the outer "Running in parallel" / "DONE" messages still appear. 6. Malformed Unicode escape \u00a (3 hex digits) in the "DONE" banner was parsed by R as U+00AD (soft hyphen) + "ONE"; corrected to  . https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Add parallel computing tests to test-sir.R Eight targeted tests verify correctness of the parallel as.sir() path: identical SIR output vs sequential, matching log row counts, no pre-existing history duplication, reproducibility across runs, results consistency across max_cores values, single-column fallback, and no per-column worker messages leaking when info = TRUE. All pass when only 1 core is available (parallel silently falls back to sequential). https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Fix as.sir() data.frame: preserve already- columns, exclude metadata Issue #278: two related bugs in the column-detection / type-assignment pipeline. Bug 1 – already- columns deleted on re-run Line 886 excluded already-sir columns from the type assignment (they stayed type "") causing the result loop to do x[,col] <- NULL, deleting them. Fix: drop the !is.sir() guard so all untyped columns fall through to type "sir" and are re-processed correctly. Bug 2 – metadata columns treated as antibiotics as.ab("patient") -> OXY, as.ab("ward") -> PRU. The column detector accepted any column whose name matched an antibiotic code, regardless of content. Fix: for name-matched columns that do not already carry an AMR class, also verify content looks like AMR data (all_valid_mics, all- numeric, or any SIR-like string). all_valid_disks() is intentionally avoided here because it strips letters from strings (as.disk("Pt_1")==1). Also adds tools/benchmark_parallel.R: a standalone script that times sequential vs parallel as.sir() across n=20/200/2000/20000 rows and saves a ggplot2 PNG to tools/benchmark_parallel.png. https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Update benchmark: two-panel script with warm-up and column-count sweep Previous single-panel benchmark was misleading: the first sequential run paid one-time cache-warm-up cost (skewing n=20), and only 6 columns were used so only 6 cores were ever active on a 16-core machine. New two-panel design: Left – vary rows with 16 fixed AB columns (shows memory-bandwidth saturation for large n) Right – vary columns with fixed rows (shows the real speedup profile: parallel wins when n_cols >> 1) Also adds a warm-up pass before measurements to eliminate first-call bias. https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Optimise parallel as.sir(): row-batch mode when n_cols < n_cores Previously parallel dispatch only parallelised by column, so a 6-column dataset on a 16-core machine used at most 6 cores with the other 10 idle. For large n this also caused memory-bandwidth saturation (each worker did a full n-row scan of clinical_breakpoints simultaneously). New row-batch mode (fork path, R >= 4.0, non-Windows): pieces_per_col = ceil(n_cores / n_cols) Jobs = n_cols × pieces_per_col (≈ n_cores jobs total) Each job: one column × one row slice Benefits: - All cores stay busy regardless of column count - Per-worker memory footprint shrinks by pieces_per_col × - Breakpoints lookup cache pressure reduced per worker PSOCK path (Windows / R < 4.0) is unchanged: per-job serialisation overhead makes row batching unprofitable there. run_as_sir_column() gains an optional `rows` parameter (NULL = all rows, backward-compatible). Results are reassembled via as.sir(c(as.character(.))) which is safe for already-clean SIR values. https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Fix info=FALSE ignored when no breakpoints found in as_sir_method Operator-precedence bug at line 1601: if (isTRUE(info) && nrow(df_unique) < 10 || nrow(breakpoints) == 0) R evaluates && before ||, so this was equivalent to: (isTRUE(info) && nrow(df_unique) < 10) || (nrow(breakpoints) == 0) When nrow(breakpoints) == 0 (e.g. cefoxitin / flucloxacillin / mupirocin against E. coli in EUCAST) the intro message was always printed regardless of info. Fix: add parentheses so info gates both conditions: isTRUE(info) && (nrow(df_unique) < 10 || nrow(breakpoints) == 0) Also pass print = isTRUE(info) to progress_ticker so the progress bar (which prints intro_txt as its title) is suppressed when info = FALSE. https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Fix cli formatting in as.sir() messages - stop_if for empty ab_cols: wrap as.mic() and as.disk() in {.help [{.fun ...}](...)} for clickable links in cli output - Parallel mode message: use {.field col} formatting for column names and quotes = FALSE in vector_and(), consistent with the rest of the codebase (avoids double-quoting from both font_bold and quotes="'") https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Use font_bold() inside {.field} for column names in parallel message Convention: paste0("{.field ", font_bold(col), "}") gives bold green column names without quotation marks, consistent with the rest of the codebase (e.g. the 'Cleaning values' message in run_as_sir_column). https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Add collapse = NULL to font_bold() for column name vectors font_bold() without collapse = NULL joins a vector with "" into a single string, breaking paste0() element-wise formatting for length > 1 vectors. https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR * Add tools/ to .Rbuildignore Keeps the benchmark script out of the built package tarball. https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR --------- Co-authored-by: Claude --- .Rbuildignore | 1 + DESCRIPTION | 4 +- NEWS.md | 7 +- R/sir.R | 254 ++++++++++++++++++++++++++----------- tests/testthat/test-sir.R | 146 ++++++++++++++++----- tools/benchmark_parallel.R | 115 +++++++++++++++++ 6 files changed, 420 insertions(+), 107 deletions(-) create mode 100644 tools/benchmark_parallel.R diff --git a/.Rbuildignore b/.Rbuildignore index 7b1fc9398..89b14e09c 100755 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -41,4 +41,5 @@ ^CRAN-SUBMISSION$ ^PythonPackage$ ^README\.Rmd$ +^tools$ \.no_include$ diff --git a/DESCRIPTION b/DESCRIPTION index db313c43a..81618ee49 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: AMR -Version: 3.0.1.9048 -Date: 2026-04-22 +Version: 3.0.1.9050 +Date: 2026-04-24 Title: Antimicrobial Resistance Data Analysis Description: Functions to simplify and standardise antimicrobial resistance (AMR) data analysis and to work with microbial and antimicrobial properties by diff --git a/NEWS.md b/NEWS.md index c5c7ad602..b8c7a5ecb 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# AMR 3.0.1.9048 +# AMR 3.0.1.9050 ### New * Support for clinical breakpoints of 2026 of both CLSI and EUCAST, by adding all of their over 5,700 new clinical breakpoints to the `clinical_breakpoints` data set for usage in `as.sir()`. EUCAST 2026 is now the new default guideline for all MIC and disk diffusion interpretations. @@ -21,6 +21,7 @@ * Two new `NA` objects, `NA_ab_` and `NA_mo_`, analogous to base R's `NA_character_` and `NA_integer_`, for use in pipelines that require typed missing values ### Fixes +* Fixed multiple bugs in the `parallel = TRUE` mode of `as.sir()` for data frames: (1) PSOCK workers (Windows / R < 4.0) now correctly load the AMR package before processing, with a graceful fallback to sequential mode when the package cannot be loaded; (2) resolved stale-environment issue where the PSOCK path read a frozen copy of `AMR_env` instead of the live one, causing the wrong log entries to be captured; (3) fixed log-entry duplication in the fork-based path (`mclapply`) where pre-existing `sir_interpretation_history` rows were included in every worker's captured log; (4) removed use of non-exported internal functions (`%pm>%`, `pm_pull`, `as.sir.default`) from the worker closure, which made PSOCK workers fail; (5) suppressed per-column progress messages inside workers to prevent interleaved console output; (6) fixed a malformed Unicode escape `\u00a` (3 digits) in the "DONE" status message * Fixed a bug in `as.sir()` where values that were purely numeric (e.g., `"1"`) and matched the broad SIR-matching regex would be incorrectly stripped of all content by the Unicode letter filter * Fixed a bug in `as.mic()` where MIC values in scientific notation (e.g., `"1e-3"`) were incorrectly handled because the letter `e` was removed along with other Unicode letters; scientific notation `e` is now preserved * Fixed a bug in `as.ab()` where certain AB codes containing "PH" or "TH" (such as `ETH`, `MTH`, `PHE`, `PHN`, `STH`, `THA`, `THI1`) would incorrectly return `NA` when combined in a vector with any untranslatable value (#245) @@ -34,6 +35,10 @@ * Fixed SIR and MIC coercion of combined values, e.g. `as.sir("<= 0.002; S") ` or `as.mic("S; 0.002")` (#252) * Fixed translation of foreign languages in `sir_df()` (#272) * Fixed BRMO classification by including bacterial complexes (#275) +* Fixed `as.sir()` for data frames silently deleting columns whose AB class was already `` when called a second time (re-running on already-converted data) (#278) +* Fixed `as.sir()` for data frames incorrectly treating metadata columns (e.g. `patient`, `ward`) as antibiotic columns when their names coincidentally matched an antibiotic code; column content is now validated against AMR data patterns before inclusion +* Improved parallel computing in `as.sir()`: when the number of AB columns is smaller than the number of available cores, rows are now split into batches so all cores stay active (row-batch mode). Previously, a 6-column dataset on a 16-core machine would only use 6 cores; now all 16 are used, with each worker processing a smaller row slice (lower per-worker memory pressure) +* Fixed `as.sir()` ignoring `info = FALSE` for columns with no breakpoints (e.g. cefoxitin against *E. coli*): an operator-precedence bug (`&&`/`||`) caused the "Interpreting MIC values" intro message to fire unconditionally when `nrow(breakpoints) == 0`, regardless of `info`; the progress bar title was also not gated by `info` ### Updates * Extensive `cli` integration for better message handling and clickable links in messages and warnings (#191, #265) diff --git a/R/sir.R b/R/sir.R index 4020acf2e..cf4728940 100755 --- a/R/sir.R +++ b/R/sir.R @@ -716,7 +716,7 @@ as.sir.disk <- function(x, } #' @rdname as.sir -#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. This requires no additional packages, as the used `parallel` package is part of base \R. On Windows and on \R < 4.0.0 [parallel::parLapply()] will be used, in all other cases the more efficient [parallel::mclapply()] will be used. +#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. The `parallel` package is part of base \R and no additional packages are required. On Unix/macOS with \R >= 4.0.0, [parallel::mclapply()] (fork-based) is used; on Windows and \R < 4.0.0, [parallel::parLapply()] with a PSOCK cluster is used (requires the AMR package to be installed, not just loaded via `devtools::load_all()`). Parallelism distributes columns across cores; it is most beneficial when there are many antibiotic columns and a large number of rows. #' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise. #' @export as.sir.data.frame <- function(x, @@ -852,7 +852,6 @@ as.sir.data.frame <- function(x, i <- 0 ab_cols <- colnames(x)[vapply(FUN.VALUE = logical(1), x, function(y) { i <<- i + 1 - check <- is.mic(y) | is.disk(y) ab <- colnames(x)[i] if (!is.null(col_mo) && ab == col_mo) { return(FALSE) @@ -861,13 +860,30 @@ as.sir.data.frame <- function(x, return(FALSE) } if (length(sel) == 0 || (length(sel) > 0 && ab %in% sel)) { + # columns already carrying an AMR class are always included + y_bak <- x.bak[, ab, drop = TRUE] + if (is.mic(y_bak) || is.disk(y_bak) || is.sir(y_bak)) { + return(TRUE) + } ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE)) if (is.na(ab_coerced) || (length(sel) > 0 & !ab %in% sel)) { # not even a valid AB code return(FALSE) - } else { - return(TRUE) } + # Name matches an antibiotic; also verify column content resembles AMR + # data. This prevents false positives on metadata columns whose names + # happen to match a drug code (e.g. 'patient' -> OXY, 'ward' -> PRU). + # Note: all_valid_disks() is intentionally avoided here because it strips + # non-numeric characters (as.disk("Pt_1") == 1), accepting patient IDs. + y_char <- tryCatch(as.character(y), error = function(e) character(0)) + y_valid <- y_char[!is.na(y_char) & nzchar(trimws(y_char))] + if (length(y_valid) == 0L) { + return(FALSE) + } + y_numeric <- suppressWarnings(as.numeric(y_valid)) + all_valid_mics(y) || + all(!is.na(y_numeric)) || + any(y_valid %in% c("S", "SDD", "I", "R", "NI")) } else { return(FALSE) } @@ -875,7 +891,7 @@ as.sir.data.frame <- function(x, stop_if( length(ab_cols) == 0, - "no columns with MIC values, disk zones or antibiotic column names found in this data set. Use as.mic() or as.disk() to transform antimicrobial columns." + "no columns with MIC values, disk zones or antibiotic column names found in this data set. Use {.help [{.fun as.mic}](AMR::as.mic)} or {.help [{.fun as.disk}](AMR::as.disk)} to transform antimicrobial columns." ) # set type per column types <- character(length(ab_cols)) @@ -883,7 +899,7 @@ as.sir.data.frame <- function(x, types[vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.mic)] <- "mic" types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_disks)] <- "disk" types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_mics)] <- "mic" - types[types == "" & !vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.sir)] <- "sir" + types[types == ""] <- "sir" if (any(types %in% c("mic", "disk"), na.rm = TRUE)) { # now we need an mo column stop_if(is.null(col_mo), "{.arg col_mo} must be set") @@ -906,6 +922,21 @@ as.sir.data.frame <- function(x, return(NULL) } ) + if (!is.null(cl)) { + # Each PSOCK worker is a fresh R session — the AMR package must be loaded there + # so all exported functions (as.sir, as.mic, as.disk, ...) are available. + amr_loaded_on_workers <- tryCatch({ + parallel::clusterEvalQ(cl, library(AMR, quietly = TRUE)) + TRUE + }, error = function(e) FALSE) + if (!amr_loaded_on_workers) { + if (isTRUE(info)) { + message_("Could not load AMR on parallel workers (package may not be installed); falling back to single-core computation.") + } + parallel::stopCluster(cl) + cl <- NULL + } + } if (is.null(cl)) { n_cores <- 1 } @@ -916,93 +947,139 @@ as.sir.data.frame <- function(x, message_("Processing columns:", as_note = FALSE) } - run_as_sir_column <- function(i) { + # In parallel mode suppress per-column messages: workers print simultaneously and + # their output would be interleaved on the console. + is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1 + effective_info <- if (is_parallel_run) FALSE else info + + # Row-batch mode: when n_cols < n_cores we would leave cores idle under plain + # column-parallel dispatch. Instead we split rows into pieces so every core + # gets work. pieces_per_col = ceil(n_cores / n_cols) gives ~n_cores jobs + # total; each job processes one column on one row slice, which also reduces + # per-worker memory pressure (smaller breakpoints search space). + # Only used for the fork path (R >= 4.0, non-Windows); PSOCK clusters already + # incur high per-job serialisation overhead so we keep column-mode there. + use_fork <- is_parallel_run && + !(.Platform$OS.type == "windows" || getRversion() < "4.0.0") + pieces_per_col <- if (use_fork && length(ab_cols) < n_cores) { + ceiling(n_cores / length(ab_cols)) + } else { + 1L + } + + run_as_sir_column <- function(i, rows = NULL) { + # Always resolve AMR_env from the package namespace. This is essential for + # PSOCK workers (where the closure-captured AMR_env is a stale serialised copy + # while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing + # pre-existing log entries from earlier in the session when forking. + .amr_env <- get("AMR_env", envir = asNamespace("AMR"), inherits = FALSE) + # In parallel mode each worker (fork or PSOCK) has its own copy of the + # history; record the current length so we capture only the new rows added + # by the as.sir() call below, not any pre-existing entries inherited at fork + # time or carried over from earlier as.sir() calls. + if (is_parallel_run) pre_log_n <- NROW(.amr_env$sir_interpretation_history) + ab_col <- ab_cols[i] out <- list(result = NULL, log = NULL) + # row subsetting: NULL means all rows (column-mode), otherwise row-batch mode + row_idx <- if (is.null(rows)) seq_len(nrow(x)) else rows + if (types[i] == "mic") { - result <- x %pm>% - pm_pull(ab_col) %pm>% - as.character() %pm>% - as.mic() %pm>% - as.sir( - mo = x_mo, - mo.bak = x[, col_mo, drop = TRUE], - ab = ab_col, - guideline = guideline, - uti = uti, - capped_mic_handling = capped_mic_handling, - as_wt_nwt = as_wt_nwt, - add_intrinsic_resistance = add_intrinsic_resistance, - reference_data = reference_data, - substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, - include_screening = include_screening, - include_PKPD = include_PKPD, - breakpoint_type = breakpoint_type, - host = host, - verbose = verbose, - info = info, - conserve_capped_values = conserve_capped_values, - is_data.frame = TRUE - ) + result <- as.sir( + as.mic(as.character(x[row_idx, ab_col, drop = TRUE])), + mo = x_mo[row_idx], + mo.bak = x[row_idx, col_mo, drop = TRUE], + ab = ab_col, + guideline = guideline, + uti = if (length(uti) > 1L) uti[row_idx] else uti, + capped_mic_handling = capped_mic_handling, + as_wt_nwt = as_wt_nwt, + add_intrinsic_resistance = add_intrinsic_resistance, + reference_data = reference_data, + substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, + include_screening = include_screening, + include_PKPD = include_PKPD, + breakpoint_type = breakpoint_type, + host = if (length(host) > 1L) host[row_idx] else host, + verbose = verbose, + info = effective_info, + conserve_capped_values = conserve_capped_values, + is_data.frame = TRUE + ) out$result <- result - out$log <- AMR_env$sir_interpretation_history - AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] # reset log + if (is_parallel_run) { + full_log <- .amr_env$sir_interpretation_history + out$log <- if (pre_log_n < NROW(full_log)) { + full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE] + } else { + full_log[0L, , drop = FALSE] + } + } else { + out$log <- .amr_env$sir_interpretation_history + .amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE] + } return(out) } else if (types[i] == "disk") { - result <- x %pm>% - pm_pull(ab_col) %pm>% - as.character() %pm>% - as.disk() %pm>% - as.sir( - mo = x_mo, - mo.bak = x[, col_mo, drop = TRUE], - ab = ab_col, - guideline = guideline, - uti = uti, - as_wt_nwt = as_wt_nwt, - add_intrinsic_resistance = add_intrinsic_resistance, - reference_data = reference_data, - substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, - include_screening = include_screening, - include_PKPD = include_PKPD, - breakpoint_type = breakpoint_type, - host = host, - verbose = verbose, - info = info, - is_data.frame = TRUE - ) + result <- as.sir( + as.disk(as.character(x[row_idx, ab_col, drop = TRUE])), + mo = x_mo[row_idx], + mo.bak = x[row_idx, col_mo, drop = TRUE], + ab = ab_col, + guideline = guideline, + uti = if (length(uti) > 1L) uti[row_idx] else uti, + as_wt_nwt = as_wt_nwt, + add_intrinsic_resistance = add_intrinsic_resistance, + reference_data = reference_data, + substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, + include_screening = include_screening, + include_PKPD = include_PKPD, + breakpoint_type = breakpoint_type, + host = if (length(host) > 1L) host[row_idx] else host, + verbose = verbose, + info = effective_info, + is_data.frame = TRUE + ) out$result <- result - out$log <- AMR_env$sir_interpretation_history - AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] + if (is_parallel_run) { + full_log <- .amr_env$sir_interpretation_history + out$log <- if (pre_log_n < NROW(full_log)) { + full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE] + } else { + full_log[0L, , drop = FALSE] + } + } else { + out$log <- .amr_env$sir_interpretation_history + .amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE] + } return(out) } else if (types[i] == "sir") { ab <- ab_col ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE)) show_message <- FALSE - if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) { + if (!all(x[row_idx, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) { show_message <- TRUE - if (isTRUE(info)) { - message_("\u00a0\u00a0", AMR_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (", + if (isTRUE(effective_info)) { + message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (", ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""), - ab_name(ab_coerced, tolower = TRUE, info = info), ")... ", + ab_name(ab_coerced, tolower = TRUE, info = effective_info), ")... ", appendLF = FALSE, as_note = FALSE ) } } else if (!is.sir(x.bak[, ab, drop = TRUE])) { show_message <- TRUE - if (isTRUE(info)) { - message_("\u00a0\u00a0", AMR_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (", + if (isTRUE(effective_info)) { + message_("\u00a0\u00a0", .amr_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (", ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""), - ab_name(ab_coerced, tolower = TRUE, language = NULL, info = info), ")... ", + ab_name(ab_coerced, tolower = TRUE, language = NULL, info = effective_info), ")... ", appendLF = FALSE, as_note = FALSE ) } } - result <- as.sir.default(x = as.character(x[, ab, drop = TRUE])) - if (show_message == TRUE && isTRUE(info)) { + result <- as.sir(as.character(x[row_idx, ab, drop = TRUE])) + if (show_message == TRUE && isTRUE(effective_info)) { message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE) } out$result <- result @@ -1016,26 +1093,55 @@ as.sir.data.frame <- function(x, if (isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1) { if (isTRUE(info)) { message_(as_note = FALSE) - message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(font_bold(ab_cols, collapse = NULL), quotes = "'", sort = FALSE), "...", as_note = FALSE, appendLF = FALSE) + if (pieces_per_col > 1L) { + message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE) + } else { + message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), "...", as_note = FALSE, appendLF = FALSE) + } } if (.Platform$OS.type == "windows" || getRversion() < "4.0.0") { - # `cl` has been created in the part above before the `run_as_sir_column` function + # PSOCK cluster: column-mode only (row-batch serialisation overhead not worth it) on.exit(parallel::stopCluster(cl), add = TRUE) parallel::clusterExport(cl, varlist = c( "x", "x.bak", "x_mo", "ab_cols", "types", "capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance", "reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD", - "breakpoint_type", "guideline", "host", "uti", "info", "verbose", - "col_mo", "AMR_env", "conserve_capped_values", + "breakpoint_type", "guideline", "host", "uti", "verbose", + "col_mo", "conserve_capped_values", + "effective_info", "is_parallel_run", "run_as_sir_column" ), envir = environment()) result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column) + } else if (pieces_per_col > 1L) { + # Row-batch mode (R >= 4.0, non-Windows, n_cols < n_cores): + # build (col, row_slice) job pairs so all cores stay active + row_cuts <- unique(round(seq(0, nrow(x), length.out = pieces_per_col + 1L))) + row_ranges <- lapply(seq_len(length(row_cuts) - 1L), function(p) { + seq.int(row_cuts[p] + 1L, row_cuts[p + 1L]) + }) + jobs <- do.call(c, lapply(seq_along(ab_cols), function(ci) { + lapply(seq_along(row_ranges), function(p) list(col = ci, rows = row_ranges[[p]])) + })) + flat <- parallel::mclapply(jobs, function(job) { + run_as_sir_column(job$col, job$rows) + }, mc.cores = n_cores) + # Reassemble: for each column concatenate row pieces in order + result_list <- lapply(seq_along(ab_cols), function(ci) { + pieces <- flat[vapply(jobs, function(j) j$col == ci, logical(1L))] + list( + result = as.sir(do.call(c, lapply(pieces, function(p) as.character(p$result)))), + log = { + logs <- Filter(Negate(is.null), lapply(pieces, function(p) p$log)) + if (length(logs) > 0L) do.call(rbind_AMR, logs) else NULL + } + ) + }) } else { - # R>=4.0 on unix + # Column-parallel mode (R >= 4.0, non-Windows, n_cols >= n_cores) result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores) } if (isTRUE(info)) { - message_(font_green_bg("\u00aDONE\u00a"), as_note = FALSE) + message_(font_green_bg("\u00a0DONE\u00a0"), as_note = FALSE) message_(as_note = FALSE) message_("Run {.help [{.fun sir_interpretation_history}](AMR::sir_interpretation_history)} to retrieve a logbook with all details of the breakpoint interpretations.") } @@ -1492,11 +1598,11 @@ as_sir_method <- function(method_short, add_intrinsic_resistance_to_AMR_env() } - if (isTRUE(info) && nrow(df_unique) < 10 || nrow(breakpoints) == 0) { + if (isTRUE(info) && (nrow(df_unique) < 10 || nrow(breakpoints) == 0)) { # only print intro under 10 items, otherwise progressbar will print this and then it will be printed double message_(intro_txt, appendLF = FALSE, as_note = FALSE) } - p <- progress_ticker(n = nrow(df_unique), n_min = 10, title = intro_txt, only_bar_percent = TRUE) + p <- progress_ticker(n = nrow(df_unique), n_min = 10, print = isTRUE(info), title = intro_txt, only_bar_percent = TRUE) has_progress_bar <- !is.null(import_fn("progress_bar", "progress", error_on_fail = FALSE)) && nrow(df_unique) >= 10 on.exit(close(p)) diff --git a/tests/testthat/test-sir.R b/tests/testthat/test-sir.R index 849729276..160f9a357 100644 --- a/tests/testthat/test-sir.R +++ b/tests/testthat/test-sir.R @@ -406,40 +406,126 @@ test_that("test-sir.R", { expect_equal(out3, as.sir(c("NWT", "WT", "NWT"))) expect_equal(out4, as.sir(c("NWT", "WT", "NWT"))) + # Issue #278: re-running as.sir() on already- data must preserve columns + df_already_sir <- data.frame( + mo = "B_ESCHR_COLI", + AMC = as.mic(c("1", "2", "4")), + GEN = sample(c("S", "I", "R"), 3, replace = TRUE), + stringsAsFactors = FALSE + ) + first_pass <- suppressMessages(as.sir(df_already_sir, col_mo = "mo", info = FALSE)) + second_pass <- suppressMessages(as.sir(first_pass, col_mo = "mo", info = FALSE)) + expect_equal(ncol(first_pass), ncol(second_pass)) + expect_true(is.sir(second_pass[["AMC"]])) + expect_true(is.sir(second_pass[["GEN"]])) + expect_identical(first_pass[["AMC"]], second_pass[["AMC"]]) + expect_identical(first_pass[["GEN"]], second_pass[["GEN"]]) + + # Issue #278: metadata columns whose names coincidentally match antibiotic + # codes (e.g. 'patient' -> OXY, 'ward' -> PRU) must not be processed + df_meta <- data.frame( + mo = "B_ESCHR_COLI", + patient = paste0("Pt_", 1:20), + ward = rep(c("ICU", "Surgery", "Outpatient", "ED"), 5), + AMC = as.mic(rep(c("1", "2", "4", "8"), 5)), + stringsAsFactors = FALSE + ) + df_meta_sir <- suppressMessages(as.sir(df_meta, col_mo = "mo", info = FALSE)) + expect_true("patient" %in% colnames(df_meta_sir)) + expect_true("ward" %in% colnames(df_meta_sir)) + expect_false(is.sir(df_meta_sir[["patient"]])) + expect_false(is.sir(df_meta_sir[["ward"]])) + expect_true(is.sir(df_meta_sir[["AMC"]])) + # Parallel computing ---------------------------------------------------- + # Tests must pass even when only 1 core is available; parallel = TRUE then + # silently falls back to sequential, but results must still be identical. - # MB 29 Apr 2025: I have run the code of AVC, PEI, Canada (dataset of 2854x65), and compared it like this: + set.seed(42) + n_par <- 200 + df_par <- data.frame( + mo = "B_ESCHR_COLI", + AMC = as.mic(sample(c("0.25", "0.5", "1", "2", "4", "8", "16", "32"), n_par, TRUE)), + GEN = as.mic(sample(c("0.5", "1", "2", "4", "8", "16", "32", "64"), n_par, TRUE)), + CIP = as.mic(sample(c("0.001", "0.002", "0.004", "0.008", "0.016", "0.032"), n_par, TRUE)), + PEN = sample(c("S", "I", "R", NA_character_), n_par, TRUE), + stringsAsFactors = FALSE + ) - # system.time({ - # data_2022_2023_SIR_parallel <- data_2022_2023_clean |> - # as.sir(amikacin:tiamulin, - # col_mo = "mo", - # guideline = "CLSI 2024", - # host = "Species", - # uti = "isUTI", - # parallel = TRUE) - # }) - # # user system elapsed - # # 271.424 2.767 45.762 - # - # history_parallel <- sir_interpretation_history(clean = TRUE) - # - # system.time({ - # data_2022_2023_SIR <- data_2022_2023_clean |> - # as.sir(amikacin:tiamulin, - # col_mo = "mo", - # guideline = "CLSI 2024", - # host = "Species", - # uti = "isUTI") - # }) - # # user system elapsed - # # 120.637 5.406 128.835 - # history <- sir_interpretation_history() + # clear any existing history before comparing + sir_interpretation_history(clean = TRUE) + sir_seq <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE)) + log_seq <- sir_interpretation_history(clean = TRUE) + sir_par <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE)) + log_par <- sir_interpretation_history(clean = TRUE) - # and then got this: - # identical(history[, -1], history_parallel[, -1]) - #> [1] TRUE + # 1. parallel = TRUE gives identical SIR results to sequential + expect_identical(sir_seq[["AMC"]], sir_par[["AMC"]]) + expect_identical(sir_seq[["GEN"]], sir_par[["GEN"]]) + expect_identical(sir_seq[["CIP"]], sir_par[["CIP"]]) + expect_identical(sir_seq[["PEN"]], sir_par[["PEN"]]) - # so parallel on Apple M2 is 2.8x faster, with identical history -> GREAT! + # 2. same number of log rows as sequential + expect_equal(nrow(log_seq), nrow(log_par)) + + # 3. pre-existing log entries must not be duplicated + # run sequential once to populate the history, then run parallel and + # verify the new parallel run adds exactly as many rows as sequential + sir_interpretation_history(clean = TRUE) + suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE)) # populate history + pre_n <- nrow(sir_interpretation_history()) + suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE)) + post_n <- nrow(sir_interpretation_history()) + expect_equal(post_n - pre_n, nrow(log_seq)) # exactly one run's worth of new rows + sir_interpretation_history(clean = TRUE) + + # 4. two sequential runs and two parallel runs yield identical results + sir_par2 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE)) + expect_identical(sir_par[["AMC"]], sir_par2[["AMC"]]) + expect_identical(sir_par[["GEN"]], sir_par2[["GEN"]]) + + # 5. max_cores = 1 gives same results as default sequential + sir_mc1 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 1L)) + expect_identical(sir_seq[["AMC"]], sir_mc1[["AMC"]]) + expect_identical(sir_seq[["GEN"]], sir_mc1[["GEN"]]) + + # 6. max_cores = 2 and max_cores = 3 give same results as sequential + sir_mc2 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 2L)) + sir_mc3 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 3L)) + expect_identical(sir_seq[["AMC"]], sir_mc2[["AMC"]]) + expect_identical(sir_seq[["GEN"]], sir_mc3[["GEN"]]) + + # 7. single-column data frame falls back silently to sequential + df_single <- df_par[, c("mo", "AMC")] + sir_single_seq <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE)) + sir_single_par <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE, parallel = TRUE)) + expect_identical(sir_single_seq[["AMC"]], sir_single_par[["AMC"]]) + + # 9. row-batch mode (n_cols < n_cores): force row splitting via max_cores and + # verify identical output to sequential for a dataset with 2 AB columns so + # pieces_per_col = ceiling(max_cores / 2) >= 2 and row batching activates + df_wide <- data.frame( + mo = "B_ESCHR_COLI", + AMC = as.mic(sample(c("1", "2", "4", "8"), n_par, TRUE)), + GEN = as.mic(sample(c("1", "2", "4", "8"), n_par, TRUE)), + stringsAsFactors = FALSE + ) + sir_wide_seq <- suppressMessages(as.sir(df_wide, col_mo = "mo", info = FALSE)) + sir_wide_par <- suppressMessages(as.sir(df_wide, col_mo = "mo", info = FALSE, + parallel = TRUE, max_cores = 8L)) + expect_identical(sir_wide_seq[["AMC"]], sir_wide_par[["AMC"]]) + expect_identical(sir_wide_seq[["GEN"]], sir_wide_par[["GEN"]]) + + # 8. info = TRUE with parallel does not produce per-column worker messages + # (messages should only appear in the main process, not duplicated from workers) + msgs <- capture.output( + suppressWarnings(as.sir(df_par, col_mo = "mo", info = TRUE, parallel = TRUE)), + type = "message" + ) + # each AB column name should appear at most once in all messages combined + for (ab_nm in c("AMC", "GEN", "CIP", "PEN")) { + n_mentions <- sum(grepl(ab_nm, msgs, fixed = TRUE)) + expect_lte(n_mentions, 1L) + } }) diff --git a/tools/benchmark_parallel.R b/tools/benchmark_parallel.R new file mode 100644 index 000000000..db0141be0 --- /dev/null +++ b/tools/benchmark_parallel.R @@ -0,0 +1,115 @@ +# Benchmark: sequential vs parallel as.sir() across data-set shapes +# +# Run from the repo root: +# Rscript tools/benchmark_parallel.R +# or inside an R session: +# source("tools/benchmark_parallel.R") +# +# Two panels: +# Left – fixed columns (n_ab_fixed), varying rows. +# Parallel wins at small n; sequential catches up at large n due to +# memory-bandwidth saturation (all workers compete for the same +# clinical_breakpoints lookup table in L3 cache / RAM). +# Right – fixed rows (n_rows_fixed), varying column count. +# This is the shape that actually benefits: each additional column +# keeps another core busy. The "real world" gain for a 2854×65 +# dataset lives here. +# +# Requires ggplot2; uses devtools::load_all() so the package need not be +# installed. + +devtools::load_all(".", quiet = TRUE) + +# ── configuration ───────────────────────────────────────────────────────────── +row_sizes <- c(200, 1000, 5000, 20000) +col_sizes <- c(4, 8, 16, 32, 48) +n_rows_fixed <- 1000 +n_ab_fixed <- 16 +n_cores_avail <- AMR:::get_n_cores(Inf) + +all_abs <- c("AMC", "GEN", "CIP", "TZP", "IPM", "MEM", + "AMP", "TMP", "SXT", "NIT", "FOX", "CRO", + "FEP", "CAZ", "CTX", "TOB", "AMK", "ERY", + "AZM", "CLI", "VAN", "TEC", "RIF", "MTR", + "MFX", "LNZ", "TGC", "DOX", "FLC", "OXA", + "PEN", "CXM", "CZO", "KAN", "COL", "FOS", + "MUP", "TCY", "TEC", "IPM", "CHL", "FEP", + "MEM", "TZP", "GEN", "AMC", "AMX", "AMP") +all_abs <- unique(all_abs) + +mic_vals <- c("0.25", "0.5", "1", "2", "4", "8", "16", "32") + +make_df <- function(n_rows, n_ab) { + set.seed(42) + ab_sel <- all_abs[seq_len(min(n_ab, length(all_abs)))] + mics <- lapply(ab_sel, function(a) as.mic(sample(mic_vals, n_rows, TRUE))) + names(mics) <- ab_sel + data.frame(mo = "B_ESCHR_COLI", mics, stringsAsFactors = FALSE) +} + +time_both <- function(n_rows, n_ab, label) { + df <- make_df(n_rows, n_ab) + t_seq <- system.time( + suppressMessages(as.sir(df, col_mo = "mo", info = FALSE, parallel = FALSE)) + )[["elapsed"]] + t_par <- system.time( + suppressMessages(as.sir(df, col_mo = "mo", info = FALSE, parallel = TRUE)) + )[["elapsed"]] + message(sprintf("%-28s seq=%5.2fs par=%5.2fs speedup=%.1fx", + label, t_seq, t_par, t_seq / t_par)) + data.frame(group = label, mode = c("sequential", "parallel"), + seconds = c(t_seq, t_par), stringsAsFactors = FALSE) +} + +# ── warm-up (avoid first-call overhead biasing results) ─────────────────────── +message("Warming up cache ...") +invisible(suppressMessages(as.sir(make_df(100, 6), col_mo = "mo", info = FALSE))) +invisible(suppressMessages(as.sir(make_df(100, 6), col_mo = "mo", info = FALSE, parallel = TRUE))) +sir_interpretation_history(clean = TRUE) + +# ── panel 1: vary rows, fixed columns ───────────────────────────────────────── +message(sprintf("\nPanel 1 – varying rows, %d fixed columns:", n_ab_fixed)) +res_rows <- do.call(rbind, lapply(row_sizes, function(n) { + time_both(n, n_ab_fixed, sprintf("rows=%d", n)) +})) +res_rows$x <- rep(row_sizes, each = 2) +res_rows$panel <- "Vary rows (16 fixed AB columns)" + +# ── panel 2: vary columns, fixed rows ───────────────────────────────────────── +message(sprintf("\nPanel 2 – varying columns, %d fixed rows:", n_rows_fixed)) +res_cols <- do.call(rbind, lapply(col_sizes, function(n_ab) { + time_both(n_rows_fixed, n_ab, sprintf("cols=%d", n_ab)) +})) +res_cols$x <- rep(col_sizes, each = 2) +res_cols$panel <- sprintf("Vary columns (%d fixed rows)", n_rows_fixed) + +results <- rbind(res_rows, res_cols) + +if (requireNamespace("ggplot2", quietly = TRUE)) { + p <- ggplot2::ggplot( + results, + ggplot2::aes(x = x, y = seconds, colour = mode, group = mode) + ) + + ggplot2::geom_line(linewidth = 1) + + ggplot2::geom_point(size = 2.5) + + ggplot2::facet_wrap(~panel, scales = "free_x") + + ggplot2::scale_colour_manual( + values = c(sequential = "#E05C5C", parallel = "#2E86AB") + ) + + ggplot2::labs( + title = "as.sir() throughput: sequential vs parallel", + subtitle = sprintf("E. coli, EUCAST 2026, %d cores available", n_cores_avail), + x = "Dataset dimension (rows ·left· or columns ·right·)", + y = "Wall-clock time (seconds)", + colour = NULL + ) + + ggplot2::theme_minimal(base_size = 12) + + ggplot2::theme(legend.position = "top") + + out_file <- "tools/benchmark_parallel.png" + ggplot2::ggsave(out_file, p, width = 10, height = 5, dpi = 150) + message("\nPlot saved to ", out_file) +} else { + message("Install ggplot2 to get a plot; raw results:") + print(results[, c("panel", "group", "mode", "seconds")]) +}