1
0
mirror of https://github.com/msberends/AMR.git synced 2026-04-28 09:03:51 +02:00

Fix parallel computing in as.sir.data.frame (#276)

* Fix parallel computing in as.sir.data.frame

Six bugs in parallel = TRUE mode:

1. PSOCK workers (Windows / R < 4.0) never had AMR loaded, so every
   exported/AMR function call failed. Added clusterEvalQ(cl, library(AMR))
   with a graceful fallback to sequential when the package cannot be loaded
   (e.g. dev-only load_all() environments).

2. clusterExport'd AMR_env was a frozen serialised copy; as.sir() on the
   worker wrote to AMR:::AMR_env while run_as_sir_column read from the stale
   copy, so the captured log was always wrong. Fixed by resolving AMR_env
   dynamically via get("AMR_env", envir = asNamespace("AMR")) inside the
   worker function, and removing AMR_env from clusterExport.

3. In the fork-based (mclapply) path each worker inherited the parent's full
   sir_interpretation_history. Capturing the whole log then combining across
   workers duplicated every pre-existing entry. Fixed by recording the log
   row count before the as.sir() call and slicing only the new rows
   afterwards.

4. run_as_sir_column used non-exported internals (%pm>%, pm_pull,
   as.sir.default) that are inaccessible on PSOCK workers after library(AMR).
   Replaced pipe chains with direct as.mic(as.character(x[, col, drop=TRUE]))
   and as.disk(...) calls, and changed as.sir.default() to as.sir() which
   dispatches correctly via S3.

5. With info = TRUE, worker forks printed per-column progress messages
   simultaneously, producing garbled interleaved console output. Per-column
   messages are now suppressed inside workers (effective_info = FALSE) while
   the outer "Running in parallel" / "DONE" messages still appear.

6. Malformed Unicode escape \u00a (3 hex digits) in the "DONE" banner was
   parsed by R as U+00AD (soft hyphen) + "ONE"; corrected to  .

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Add parallel computing tests to test-sir.R

Eight targeted tests verify correctness of the parallel as.sir() path:
identical SIR output vs sequential, matching log row counts, no
pre-existing history duplication, reproducibility across runs, results
consistency across max_cores values, single-column fallback, and no
per-column worker messages leaking when info = TRUE. All pass when only
1 core is available (parallel silently falls back to sequential).

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Fix as.sir() data.frame: preserve already-<sir> columns, exclude metadata

Issue #278: two related bugs in the column-detection / type-assignment pipeline.

Bug 1 – already-<sir> columns deleted on re-run
  Line 886 excluded already-sir columns from the type assignment (they
  stayed type "") causing the result loop to do x[,col] <- NULL, deleting
  them.  Fix: drop the !is.sir() guard so all untyped columns fall through
  to type "sir" and are re-processed correctly.

Bug 2 – metadata columns treated as antibiotics
  as.ab("patient") -> OXY, as.ab("ward") -> PRU.  The column detector
  accepted any column whose name matched an antibiotic code, regardless of
  content.  Fix: for name-matched columns that do not already carry an AMR
  class, also verify content looks like AMR data (all_valid_mics, all-
  numeric, or any SIR-like string).  all_valid_disks() is intentionally
  avoided here because it strips letters from strings (as.disk("Pt_1")==1).

Also adds tools/benchmark_parallel.R: a standalone script that times
sequential vs parallel as.sir() across n=20/200/2000/20000 rows and
saves a ggplot2 PNG to tools/benchmark_parallel.png.

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Update benchmark: two-panel script with warm-up and column-count sweep

Previous single-panel benchmark was misleading: the first sequential run
paid one-time cache-warm-up cost (skewing n=20), and only 6 columns were
used so only 6 cores were ever active on a 16-core machine.

New two-panel design:
  Left  – vary rows with 16 fixed AB columns (shows memory-bandwidth
          saturation for large n)
  Right – vary columns with fixed rows (shows the real speedup profile:
          parallel wins when n_cols >> 1)

Also adds a warm-up pass before measurements to eliminate first-call bias.

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Optimise parallel as.sir(): row-batch mode when n_cols < n_cores

Previously parallel dispatch only parallelised by column, so a 6-column
dataset on a 16-core machine used at most 6 cores with the other 10 idle.
For large n this also caused memory-bandwidth saturation (each worker did
a full n-row scan of clinical_breakpoints simultaneously).

New row-batch mode (fork path, R >= 4.0, non-Windows):
  pieces_per_col = ceil(n_cores / n_cols)
  Jobs = n_cols × pieces_per_col  (≈ n_cores jobs total)
  Each job: one column × one row slice

Benefits:
  - All cores stay busy regardless of column count
  - Per-worker memory footprint shrinks by pieces_per_col ×
  - Breakpoints lookup cache pressure reduced per worker

PSOCK path (Windows / R < 4.0) is unchanged: per-job serialisation
overhead makes row batching unprofitable there.

run_as_sir_column() gains an optional `rows` parameter (NULL = all rows,
backward-compatible). Results are reassembled via as.sir(c(as.character(.)))
which is safe for already-clean SIR values.

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Fix info=FALSE ignored when no breakpoints found in as_sir_method

Operator-precedence bug at line 1601:

  if (isTRUE(info) && nrow(df_unique) < 10 || nrow(breakpoints) == 0)

R evaluates && before ||, so this was equivalent to:

  (isTRUE(info) && nrow(df_unique) < 10) || (nrow(breakpoints) == 0)

When nrow(breakpoints) == 0 (e.g. cefoxitin / flucloxacillin / mupirocin
against E. coli in EUCAST) the intro message was always printed regardless
of info. Fix: add parentheses so info gates both conditions:

  isTRUE(info) && (nrow(df_unique) < 10 || nrow(breakpoints) == 0)

Also pass print = isTRUE(info) to progress_ticker so the progress bar
(which prints intro_txt as its title) is suppressed when info = FALSE.

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Fix cli formatting in as.sir() messages

- stop_if for empty ab_cols: wrap as.mic() and as.disk() in
  {.help [{.fun ...}](...)} for clickable links in cli output
- Parallel mode message: use {.field col} formatting for column names
  and quotes = FALSE in vector_and(), consistent with the rest of the
  codebase (avoids double-quoting from both font_bold and quotes="'")

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Use font_bold() inside {.field} for column names in parallel message

Convention: paste0("{.field ", font_bold(col), "}") gives bold green
column names without quotation marks, consistent with the rest of the
codebase (e.g. the 'Cleaning values' message in run_as_sir_column).

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Add collapse = NULL to font_bold() for column name vectors

font_bold() without collapse = NULL joins a vector with "" into a single
string, breaking paste0() element-wise formatting for length > 1 vectors.

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

* Add tools/ to .Rbuildignore

Keeps the benchmark script out of the built package tarball.

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Matthijs Berends
2026-04-25 00:34:38 +02:00
committed by GitHub
parent e7780b6d5f
commit 19157ce718
6 changed files with 420 additions and 107 deletions

View File

@@ -41,4 +41,5 @@
^CRAN-SUBMISSION$
^PythonPackage$
^README\.Rmd$
^tools$
\.no_include$

View File

@@ -1,6 +1,6 @@
Package: AMR
Version: 3.0.1.9048
Date: 2026-04-22
Version: 3.0.1.9050
Date: 2026-04-24
Title: Antimicrobial Resistance Data Analysis
Description: Functions to simplify and standardise antimicrobial resistance (AMR)
data analysis and to work with microbial and antimicrobial properties by

View File

@@ -1,4 +1,4 @@
# AMR 3.0.1.9048
# AMR 3.0.1.9050
### New
* Support for clinical breakpoints of 2026 of both CLSI and EUCAST, by adding all of their over 5,700 new clinical breakpoints to the `clinical_breakpoints` data set for usage in `as.sir()`. EUCAST 2026 is now the new default guideline for all MIC and disk diffusion interpretations.
@@ -21,6 +21,7 @@
* Two new `NA` objects, `NA_ab_` and `NA_mo_`, analogous to base R's `NA_character_` and `NA_integer_`, for use in pipelines that require typed missing values
### Fixes
* Fixed multiple bugs in the `parallel = TRUE` mode of `as.sir()` for data frames: (1) PSOCK workers (Windows / R < 4.0) now correctly load the AMR package before processing, with a graceful fallback to sequential mode when the package cannot be loaded; (2) resolved stale-environment issue where the PSOCK path read a frozen copy of `AMR_env` instead of the live one, causing the wrong log entries to be captured; (3) fixed log-entry duplication in the fork-based path (`mclapply`) where pre-existing `sir_interpretation_history` rows were included in every worker's captured log; (4) removed use of non-exported internal functions (`%pm>%`, `pm_pull`, `as.sir.default`) from the worker closure, which made PSOCK workers fail; (5) suppressed per-column progress messages inside workers to prevent interleaved console output; (6) fixed a malformed Unicode escape `\u00a` (3 digits) in the "DONE" status message
* Fixed a bug in `as.sir()` where values that were purely numeric (e.g., `"1"`) and matched the broad SIR-matching regex would be incorrectly stripped of all content by the Unicode letter filter
* Fixed a bug in `as.mic()` where MIC values in scientific notation (e.g., `"1e-3"`) were incorrectly handled because the letter `e` was removed along with other Unicode letters; scientific notation `e` is now preserved
* Fixed a bug in `as.ab()` where certain AB codes containing "PH" or "TH" (such as `ETH`, `MTH`, `PHE`, `PHN`, `STH`, `THA`, `THI1`) would incorrectly return `NA` when combined in a vector with any untranslatable value (#245)
@@ -34,6 +35,10 @@
* Fixed SIR and MIC coercion of combined values, e.g. `as.sir("<= 0.002; S") ` or `as.mic("S; 0.002")` (#252)
* Fixed translation of foreign languages in `sir_df()` (#272)
* Fixed BRMO classification by including bacterial complexes (#275)
* Fixed `as.sir()` for data frames silently deleting columns whose AB class was already `<sir>` when called a second time (re-running on already-converted data) (#278)
* Fixed `as.sir()` for data frames incorrectly treating metadata columns (e.g. `patient`, `ward`) as antibiotic columns when their names coincidentally matched an antibiotic code; column content is now validated against AMR data patterns before inclusion
* Improved parallel computing in `as.sir()`: when the number of AB columns is smaller than the number of available cores, rows are now split into batches so all cores stay active (row-batch mode). Previously, a 6-column dataset on a 16-core machine would only use 6 cores; now all 16 are used, with each worker processing a smaller row slice (lower per-worker memory pressure)
* Fixed `as.sir()` ignoring `info = FALSE` for columns with no breakpoints (e.g. cefoxitin against *E. coli*): an operator-precedence bug (`&&`/`||`) caused the "Interpreting MIC values" intro message to fire unconditionally when `nrow(breakpoints) == 0`, regardless of `info`; the progress bar title was also not gated by `info`
### Updates
* Extensive `cli` integration for better message handling and clickable links in messages and warnings (#191, #265)

254
R/sir.R
View File

@@ -716,7 +716,7 @@ as.sir.disk <- function(x,
}
#' @rdname as.sir
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. This requires no additional packages, as the used `parallel` package is part of base \R. On Windows and on \R < 4.0.0 [parallel::parLapply()] will be used, in all other cases the more efficient [parallel::mclapply()] will be used.
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. The `parallel` package is part of base \R and no additional packages are required. On Unix/macOS with \R >= 4.0.0, [parallel::mclapply()] (fork-based) is used; on Windows and \R < 4.0.0, [parallel::parLapply()] with a PSOCK cluster is used (requires the AMR package to be installed, not just loaded via `devtools::load_all()`). Parallelism distributes columns across cores; it is most beneficial when there are many antibiotic columns and a large number of rows.
#' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise.
#' @export
as.sir.data.frame <- function(x,
@@ -852,7 +852,6 @@ as.sir.data.frame <- function(x,
i <- 0
ab_cols <- colnames(x)[vapply(FUN.VALUE = logical(1), x, function(y) {
i <<- i + 1
check <- is.mic(y) | is.disk(y)
ab <- colnames(x)[i]
if (!is.null(col_mo) && ab == col_mo) {
return(FALSE)
@@ -861,13 +860,30 @@ as.sir.data.frame <- function(x,
return(FALSE)
}
if (length(sel) == 0 || (length(sel) > 0 && ab %in% sel)) {
# columns already carrying an AMR class are always included
y_bak <- x.bak[, ab, drop = TRUE]
if (is.mic(y_bak) || is.disk(y_bak) || is.sir(y_bak)) {
return(TRUE)
}
ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE))
if (is.na(ab_coerced) || (length(sel) > 0 & !ab %in% sel)) {
# not even a valid AB code
return(FALSE)
} else {
return(TRUE)
}
# Name matches an antibiotic; also verify column content resembles AMR
# data. This prevents false positives on metadata columns whose names
# happen to match a drug code (e.g. 'patient' -> OXY, 'ward' -> PRU).
# Note: all_valid_disks() is intentionally avoided here because it strips
# non-numeric characters (as.disk("Pt_1") == 1), accepting patient IDs.
y_char <- tryCatch(as.character(y), error = function(e) character(0))
y_valid <- y_char[!is.na(y_char) & nzchar(trimws(y_char))]
if (length(y_valid) == 0L) {
return(FALSE)
}
y_numeric <- suppressWarnings(as.numeric(y_valid))
all_valid_mics(y) ||
all(!is.na(y_numeric)) ||
any(y_valid %in% c("S", "SDD", "I", "R", "NI"))
} else {
return(FALSE)
}
@@ -875,7 +891,7 @@ as.sir.data.frame <- function(x,
stop_if(
length(ab_cols) == 0,
"no columns with MIC values, disk zones or antibiotic column names found in this data set. Use as.mic() or as.disk() to transform antimicrobial columns."
"no columns with MIC values, disk zones or antibiotic column names found in this data set. Use {.help [{.fun as.mic}](AMR::as.mic)} or {.help [{.fun as.disk}](AMR::as.disk)} to transform antimicrobial columns."
)
# set type per column
types <- character(length(ab_cols))
@@ -883,7 +899,7 @@ as.sir.data.frame <- function(x,
types[vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.mic)] <- "mic"
types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_disks)] <- "disk"
types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_mics)] <- "mic"
types[types == "" & !vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.sir)] <- "sir"
types[types == ""] <- "sir"
if (any(types %in% c("mic", "disk"), na.rm = TRUE)) {
# now we need an mo column
stop_if(is.null(col_mo), "{.arg col_mo} must be set")
@@ -906,6 +922,21 @@ as.sir.data.frame <- function(x,
return(NULL)
}
)
if (!is.null(cl)) {
# Each PSOCK worker is a fresh R session — the AMR package must be loaded there
# so all exported functions (as.sir, as.mic, as.disk, ...) are available.
amr_loaded_on_workers <- tryCatch({
parallel::clusterEvalQ(cl, library(AMR, quietly = TRUE))
TRUE
}, error = function(e) FALSE)
if (!amr_loaded_on_workers) {
if (isTRUE(info)) {
message_("Could not load AMR on parallel workers (package may not be installed); falling back to single-core computation.")
}
parallel::stopCluster(cl)
cl <- NULL
}
}
if (is.null(cl)) {
n_cores <- 1
}
@@ -916,93 +947,139 @@ as.sir.data.frame <- function(x,
message_("Processing columns:", as_note = FALSE)
}
run_as_sir_column <- function(i) {
# In parallel mode suppress per-column messages: workers print simultaneously and
# their output would be interleaved on the console.
is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1
effective_info <- if (is_parallel_run) FALSE else info
# Row-batch mode: when n_cols < n_cores we would leave cores idle under plain
# column-parallel dispatch. Instead we split rows into pieces so every core
# gets work. pieces_per_col = ceil(n_cores / n_cols) gives ~n_cores jobs
# total; each job processes one column on one row slice, which also reduces
# per-worker memory pressure (smaller breakpoints search space).
# Only used for the fork path (R >= 4.0, non-Windows); PSOCK clusters already
# incur high per-job serialisation overhead so we keep column-mode there.
use_fork <- is_parallel_run &&
!(.Platform$OS.type == "windows" || getRversion() < "4.0.0")
pieces_per_col <- if (use_fork && length(ab_cols) < n_cores) {
ceiling(n_cores / length(ab_cols))
} else {
1L
}
run_as_sir_column <- function(i, rows = NULL) {
# Always resolve AMR_env from the package namespace. This is essential for
# PSOCK workers (where the closure-captured AMR_env is a stale serialised copy
# while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing
# pre-existing log entries from earlier in the session when forking.
.amr_env <- get("AMR_env", envir = asNamespace("AMR"), inherits = FALSE)
# In parallel mode each worker (fork or PSOCK) has its own copy of the
# history; record the current length so we capture only the new rows added
# by the as.sir() call below, not any pre-existing entries inherited at fork
# time or carried over from earlier as.sir() calls.
if (is_parallel_run) pre_log_n <- NROW(.amr_env$sir_interpretation_history)
ab_col <- ab_cols[i]
out <- list(result = NULL, log = NULL)
# row subsetting: NULL means all rows (column-mode), otherwise row-batch mode
row_idx <- if (is.null(rows)) seq_len(nrow(x)) else rows
if (types[i] == "mic") {
result <- x %pm>%
pm_pull(ab_col) %pm>%
as.character() %pm>%
as.mic() %pm>%
as.sir(
mo = x_mo,
mo.bak = x[, col_mo, drop = TRUE],
ab = ab_col,
guideline = guideline,
uti = uti,
capped_mic_handling = capped_mic_handling,
as_wt_nwt = as_wt_nwt,
add_intrinsic_resistance = add_intrinsic_resistance,
reference_data = reference_data,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
include_screening = include_screening,
include_PKPD = include_PKPD,
breakpoint_type = breakpoint_type,
host = host,
verbose = verbose,
info = info,
conserve_capped_values = conserve_capped_values,
is_data.frame = TRUE
)
result <- as.sir(
as.mic(as.character(x[row_idx, ab_col, drop = TRUE])),
mo = x_mo[row_idx],
mo.bak = x[row_idx, col_mo, drop = TRUE],
ab = ab_col,
guideline = guideline,
uti = if (length(uti) > 1L) uti[row_idx] else uti,
capped_mic_handling = capped_mic_handling,
as_wt_nwt = as_wt_nwt,
add_intrinsic_resistance = add_intrinsic_resistance,
reference_data = reference_data,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
include_screening = include_screening,
include_PKPD = include_PKPD,
breakpoint_type = breakpoint_type,
host = if (length(host) > 1L) host[row_idx] else host,
verbose = verbose,
info = effective_info,
conserve_capped_values = conserve_capped_values,
is_data.frame = TRUE
)
out$result <- result
out$log <- AMR_env$sir_interpretation_history
AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] # reset log
if (is_parallel_run) {
full_log <- .amr_env$sir_interpretation_history
out$log <- if (pre_log_n < NROW(full_log)) {
full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE]
} else {
full_log[0L, , drop = FALSE]
}
} else {
out$log <- .amr_env$sir_interpretation_history
.amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE]
}
return(out)
} else if (types[i] == "disk") {
result <- x %pm>%
pm_pull(ab_col) %pm>%
as.character() %pm>%
as.disk() %pm>%
as.sir(
mo = x_mo,
mo.bak = x[, col_mo, drop = TRUE],
ab = ab_col,
guideline = guideline,
uti = uti,
as_wt_nwt = as_wt_nwt,
add_intrinsic_resistance = add_intrinsic_resistance,
reference_data = reference_data,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
include_screening = include_screening,
include_PKPD = include_PKPD,
breakpoint_type = breakpoint_type,
host = host,
verbose = verbose,
info = info,
is_data.frame = TRUE
)
result <- as.sir(
as.disk(as.character(x[row_idx, ab_col, drop = TRUE])),
mo = x_mo[row_idx],
mo.bak = x[row_idx, col_mo, drop = TRUE],
ab = ab_col,
guideline = guideline,
uti = if (length(uti) > 1L) uti[row_idx] else uti,
as_wt_nwt = as_wt_nwt,
add_intrinsic_resistance = add_intrinsic_resistance,
reference_data = reference_data,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
include_screening = include_screening,
include_PKPD = include_PKPD,
breakpoint_type = breakpoint_type,
host = if (length(host) > 1L) host[row_idx] else host,
verbose = verbose,
info = effective_info,
is_data.frame = TRUE
)
out$result <- result
out$log <- AMR_env$sir_interpretation_history
AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE]
if (is_parallel_run) {
full_log <- .amr_env$sir_interpretation_history
out$log <- if (pre_log_n < NROW(full_log)) {
full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE]
} else {
full_log[0L, , drop = FALSE]
}
} else {
out$log <- .amr_env$sir_interpretation_history
.amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE]
}
return(out)
} else if (types[i] == "sir") {
ab <- ab_col
ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE))
show_message <- FALSE
if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) {
if (!all(x[row_idx, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) {
show_message <- TRUE
if (isTRUE(info)) {
message_("\u00a0\u00a0", AMR_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (",
if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (",
ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""),
ab_name(ab_coerced, tolower = TRUE, info = info), ")... ",
ab_name(ab_coerced, tolower = TRUE, info = effective_info), ")... ",
appendLF = FALSE,
as_note = FALSE
)
}
} else if (!is.sir(x.bak[, ab, drop = TRUE])) {
show_message <- TRUE
if (isTRUE(info)) {
message_("\u00a0\u00a0", AMR_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (",
if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", .amr_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (",
ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""),
ab_name(ab_coerced, tolower = TRUE, language = NULL, info = info), ")... ",
ab_name(ab_coerced, tolower = TRUE, language = NULL, info = effective_info), ")... ",
appendLF = FALSE,
as_note = FALSE
)
}
}
result <- as.sir.default(x = as.character(x[, ab, drop = TRUE]))
if (show_message == TRUE && isTRUE(info)) {
result <- as.sir(as.character(x[row_idx, ab, drop = TRUE]))
if (show_message == TRUE && isTRUE(effective_info)) {
message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE)
}
out$result <- result
@@ -1016,26 +1093,55 @@ as.sir.data.frame <- function(x,
if (isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1) {
if (isTRUE(info)) {
message_(as_note = FALSE)
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(font_bold(ab_cols, collapse = NULL), quotes = "'", sort = FALSE), "...", as_note = FALSE, appendLF = FALSE)
if (pieces_per_col > 1L) {
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), " (", pieces_per_col, " row slices per column)...", as_note = FALSE, appendLF = FALSE)
} else {
message_("Running in parallel mode using ", n_cores, " out of ", get_n_cores(Inf), " cores, on columns ", vector_and(paste0("{.field ", font_bold(ab_cols, collapse = NULL), "}"), quotes = FALSE, sort = FALSE), "...", as_note = FALSE, appendLF = FALSE)
}
}
if (.Platform$OS.type == "windows" || getRversion() < "4.0.0") {
# `cl` has been created in the part above before the `run_as_sir_column` function
# PSOCK cluster: column-mode only (row-batch serialisation overhead not worth it)
on.exit(parallel::stopCluster(cl), add = TRUE)
parallel::clusterExport(cl, varlist = c(
"x", "x.bak", "x_mo", "ab_cols", "types",
"capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance",
"reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD",
"breakpoint_type", "guideline", "host", "uti", "info", "verbose",
"col_mo", "AMR_env", "conserve_capped_values",
"breakpoint_type", "guideline", "host", "uti", "verbose",
"col_mo", "conserve_capped_values",
"effective_info", "is_parallel_run",
"run_as_sir_column"
), envir = environment())
result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column)
} else if (pieces_per_col > 1L) {
# Row-batch mode (R >= 4.0, non-Windows, n_cols < n_cores):
# build (col, row_slice) job pairs so all cores stay active
row_cuts <- unique(round(seq(0, nrow(x), length.out = pieces_per_col + 1L)))
row_ranges <- lapply(seq_len(length(row_cuts) - 1L), function(p) {
seq.int(row_cuts[p] + 1L, row_cuts[p + 1L])
})
jobs <- do.call(c, lapply(seq_along(ab_cols), function(ci) {
lapply(seq_along(row_ranges), function(p) list(col = ci, rows = row_ranges[[p]]))
}))
flat <- parallel::mclapply(jobs, function(job) {
run_as_sir_column(job$col, job$rows)
}, mc.cores = n_cores)
# Reassemble: for each column concatenate row pieces in order
result_list <- lapply(seq_along(ab_cols), function(ci) {
pieces <- flat[vapply(jobs, function(j) j$col == ci, logical(1L))]
list(
result = as.sir(do.call(c, lapply(pieces, function(p) as.character(p$result)))),
log = {
logs <- Filter(Negate(is.null), lapply(pieces, function(p) p$log))
if (length(logs) > 0L) do.call(rbind_AMR, logs) else NULL
}
)
})
} else {
# R>=4.0 on unix
# Column-parallel mode (R >= 4.0, non-Windows, n_cols >= n_cores)
result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores)
}
if (isTRUE(info)) {
message_(font_green_bg("\u00aDONE\u00a"), as_note = FALSE)
message_(font_green_bg("\u00a0DONE\u00a0"), as_note = FALSE)
message_(as_note = FALSE)
message_("Run {.help [{.fun sir_interpretation_history}](AMR::sir_interpretation_history)} to retrieve a logbook with all details of the breakpoint interpretations.")
}
@@ -1492,11 +1598,11 @@ as_sir_method <- function(method_short,
add_intrinsic_resistance_to_AMR_env()
}
if (isTRUE(info) && nrow(df_unique) < 10 || nrow(breakpoints) == 0) {
if (isTRUE(info) && (nrow(df_unique) < 10 || nrow(breakpoints) == 0)) {
# only print intro under 10 items, otherwise progressbar will print this and then it will be printed double
message_(intro_txt, appendLF = FALSE, as_note = FALSE)
}
p <- progress_ticker(n = nrow(df_unique), n_min = 10, title = intro_txt, only_bar_percent = TRUE)
p <- progress_ticker(n = nrow(df_unique), n_min = 10, print = isTRUE(info), title = intro_txt, only_bar_percent = TRUE)
has_progress_bar <- !is.null(import_fn("progress_bar", "progress", error_on_fail = FALSE)) && nrow(df_unique) >= 10
on.exit(close(p))

View File

@@ -406,40 +406,126 @@ test_that("test-sir.R", {
expect_equal(out3, as.sir(c("NWT", "WT", "NWT")))
expect_equal(out4, as.sir(c("NWT", "WT", "NWT")))
# Issue #278: re-running as.sir() on already-<sir> data must preserve columns
df_already_sir <- data.frame(
mo = "B_ESCHR_COLI",
AMC = as.mic(c("1", "2", "4")),
GEN = sample(c("S", "I", "R"), 3, replace = TRUE),
stringsAsFactors = FALSE
)
first_pass <- suppressMessages(as.sir(df_already_sir, col_mo = "mo", info = FALSE))
second_pass <- suppressMessages(as.sir(first_pass, col_mo = "mo", info = FALSE))
expect_equal(ncol(first_pass), ncol(second_pass))
expect_true(is.sir(second_pass[["AMC"]]))
expect_true(is.sir(second_pass[["GEN"]]))
expect_identical(first_pass[["AMC"]], second_pass[["AMC"]])
expect_identical(first_pass[["GEN"]], second_pass[["GEN"]])
# Issue #278: metadata columns whose names coincidentally match antibiotic
# codes (e.g. 'patient' -> OXY, 'ward' -> PRU) must not be processed
df_meta <- data.frame(
mo = "B_ESCHR_COLI",
patient = paste0("Pt_", 1:20),
ward = rep(c("ICU", "Surgery", "Outpatient", "ED"), 5),
AMC = as.mic(rep(c("1", "2", "4", "8"), 5)),
stringsAsFactors = FALSE
)
df_meta_sir <- suppressMessages(as.sir(df_meta, col_mo = "mo", info = FALSE))
expect_true("patient" %in% colnames(df_meta_sir))
expect_true("ward" %in% colnames(df_meta_sir))
expect_false(is.sir(df_meta_sir[["patient"]]))
expect_false(is.sir(df_meta_sir[["ward"]]))
expect_true(is.sir(df_meta_sir[["AMC"]]))
# Parallel computing ----------------------------------------------------
# Tests must pass even when only 1 core is available; parallel = TRUE then
# silently falls back to sequential, but results must still be identical.
# MB 29 Apr 2025: I have run the code of AVC, PEI, Canada (dataset of 2854x65), and compared it like this:
set.seed(42)
n_par <- 200
df_par <- data.frame(
mo = "B_ESCHR_COLI",
AMC = as.mic(sample(c("0.25", "0.5", "1", "2", "4", "8", "16", "32"), n_par, TRUE)),
GEN = as.mic(sample(c("0.5", "1", "2", "4", "8", "16", "32", "64"), n_par, TRUE)),
CIP = as.mic(sample(c("0.001", "0.002", "0.004", "0.008", "0.016", "0.032"), n_par, TRUE)),
PEN = sample(c("S", "I", "R", NA_character_), n_par, TRUE),
stringsAsFactors = FALSE
)
# system.time({
# data_2022_2023_SIR_parallel <- data_2022_2023_clean |>
# as.sir(amikacin:tiamulin,
# col_mo = "mo",
# guideline = "CLSI 2024",
# host = "Species",
# uti = "isUTI",
# parallel = TRUE)
# })
# # user system elapsed
# # 271.424 2.767 45.762
#
# history_parallel <- sir_interpretation_history(clean = TRUE)
#
# system.time({
# data_2022_2023_SIR <- data_2022_2023_clean |>
# as.sir(amikacin:tiamulin,
# col_mo = "mo",
# guideline = "CLSI 2024",
# host = "Species",
# uti = "isUTI")
# })
# # user system elapsed
# # 120.637 5.406 128.835
# history <- sir_interpretation_history()
# clear any existing history before comparing
sir_interpretation_history(clean = TRUE)
sir_seq <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE))
log_seq <- sir_interpretation_history(clean = TRUE)
sir_par <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
log_par <- sir_interpretation_history(clean = TRUE)
# and then got this:
# identical(history[, -1], history_parallel[, -1])
#> [1] TRUE
# 1. parallel = TRUE gives identical SIR results to sequential
expect_identical(sir_seq[["AMC"]], sir_par[["AMC"]])
expect_identical(sir_seq[["GEN"]], sir_par[["GEN"]])
expect_identical(sir_seq[["CIP"]], sir_par[["CIP"]])
expect_identical(sir_seq[["PEN"]], sir_par[["PEN"]])
# so parallel on Apple M2 is 2.8x faster, with identical history -> GREAT!
# 2. same number of log rows as sequential
expect_equal(nrow(log_seq), nrow(log_par))
# 3. pre-existing log entries must not be duplicated
# run sequential once to populate the history, then run parallel and
# verify the new parallel run adds exactly as many rows as sequential
sir_interpretation_history(clean = TRUE)
suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE)) # populate history
pre_n <- nrow(sir_interpretation_history())
suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
post_n <- nrow(sir_interpretation_history())
expect_equal(post_n - pre_n, nrow(log_seq)) # exactly one run's worth of new rows
sir_interpretation_history(clean = TRUE)
# 4. two sequential runs and two parallel runs yield identical results
sir_par2 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
expect_identical(sir_par[["AMC"]], sir_par2[["AMC"]])
expect_identical(sir_par[["GEN"]], sir_par2[["GEN"]])
# 5. max_cores = 1 gives same results as default sequential
sir_mc1 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 1L))
expect_identical(sir_seq[["AMC"]], sir_mc1[["AMC"]])
expect_identical(sir_seq[["GEN"]], sir_mc1[["GEN"]])
# 6. max_cores = 2 and max_cores = 3 give same results as sequential
sir_mc2 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 2L))
sir_mc3 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 3L))
expect_identical(sir_seq[["AMC"]], sir_mc2[["AMC"]])
expect_identical(sir_seq[["GEN"]], sir_mc3[["GEN"]])
# 7. single-column data frame falls back silently to sequential
df_single <- df_par[, c("mo", "AMC")]
sir_single_seq <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE))
sir_single_par <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE, parallel = TRUE))
expect_identical(sir_single_seq[["AMC"]], sir_single_par[["AMC"]])
# 9. row-batch mode (n_cols < n_cores): force row splitting via max_cores and
# verify identical output to sequential for a dataset with 2 AB columns so
# pieces_per_col = ceiling(max_cores / 2) >= 2 and row batching activates
df_wide <- data.frame(
mo = "B_ESCHR_COLI",
AMC = as.mic(sample(c("1", "2", "4", "8"), n_par, TRUE)),
GEN = as.mic(sample(c("1", "2", "4", "8"), n_par, TRUE)),
stringsAsFactors = FALSE
)
sir_wide_seq <- suppressMessages(as.sir(df_wide, col_mo = "mo", info = FALSE))
sir_wide_par <- suppressMessages(as.sir(df_wide, col_mo = "mo", info = FALSE,
parallel = TRUE, max_cores = 8L))
expect_identical(sir_wide_seq[["AMC"]], sir_wide_par[["AMC"]])
expect_identical(sir_wide_seq[["GEN"]], sir_wide_par[["GEN"]])
# 8. info = TRUE with parallel does not produce per-column worker messages
# (messages should only appear in the main process, not duplicated from workers)
msgs <- capture.output(
suppressWarnings(as.sir(df_par, col_mo = "mo", info = TRUE, parallel = TRUE)),
type = "message"
)
# each AB column name should appear at most once in all messages combined
for (ab_nm in c("AMC", "GEN", "CIP", "PEN")) {
n_mentions <- sum(grepl(ab_nm, msgs, fixed = TRUE))
expect_lte(n_mentions, 1L)
}
})

115
tools/benchmark_parallel.R Normal file
View File

@@ -0,0 +1,115 @@
# Benchmark: sequential vs parallel as.sir() across data-set shapes
#
# Run from the repo root:
# Rscript tools/benchmark_parallel.R
# or inside an R session:
# source("tools/benchmark_parallel.R")
#
# Two panels:
# Left fixed columns (n_ab_fixed), varying rows.
# Parallel wins at small n; sequential catches up at large n due to
# memory-bandwidth saturation (all workers compete for the same
# clinical_breakpoints lookup table in L3 cache / RAM).
# Right fixed rows (n_rows_fixed), varying column count.
# This is the shape that actually benefits: each additional column
# keeps another core busy. The "real world" gain for a 2854×65
# dataset lives here.
#
# Requires ggplot2; uses devtools::load_all() so the package need not be
# installed.
devtools::load_all(".", quiet = TRUE)
# ── configuration ─────────────────────────────────────────────────────────────
row_sizes <- c(200, 1000, 5000, 20000)
col_sizes <- c(4, 8, 16, 32, 48)
n_rows_fixed <- 1000
n_ab_fixed <- 16
n_cores_avail <- AMR:::get_n_cores(Inf)
all_abs <- c("AMC", "GEN", "CIP", "TZP", "IPM", "MEM",
"AMP", "TMP", "SXT", "NIT", "FOX", "CRO",
"FEP", "CAZ", "CTX", "TOB", "AMK", "ERY",
"AZM", "CLI", "VAN", "TEC", "RIF", "MTR",
"MFX", "LNZ", "TGC", "DOX", "FLC", "OXA",
"PEN", "CXM", "CZO", "KAN", "COL", "FOS",
"MUP", "TCY", "TEC", "IPM", "CHL", "FEP",
"MEM", "TZP", "GEN", "AMC", "AMX", "AMP")
all_abs <- unique(all_abs)
mic_vals <- c("0.25", "0.5", "1", "2", "4", "8", "16", "32")
make_df <- function(n_rows, n_ab) {
set.seed(42)
ab_sel <- all_abs[seq_len(min(n_ab, length(all_abs)))]
mics <- lapply(ab_sel, function(a) as.mic(sample(mic_vals, n_rows, TRUE)))
names(mics) <- ab_sel
data.frame(mo = "B_ESCHR_COLI", mics, stringsAsFactors = FALSE)
}
time_both <- function(n_rows, n_ab, label) {
df <- make_df(n_rows, n_ab)
t_seq <- system.time(
suppressMessages(as.sir(df, col_mo = "mo", info = FALSE, parallel = FALSE))
)[["elapsed"]]
t_par <- system.time(
suppressMessages(as.sir(df, col_mo = "mo", info = FALSE, parallel = TRUE))
)[["elapsed"]]
message(sprintf("%-28s seq=%5.2fs par=%5.2fs speedup=%.1fx",
label, t_seq, t_par, t_seq / t_par))
data.frame(group = label, mode = c("sequential", "parallel"),
seconds = c(t_seq, t_par), stringsAsFactors = FALSE)
}
# ── warm-up (avoid first-call overhead biasing results) ───────────────────────
message("Warming up cache ...")
invisible(suppressMessages(as.sir(make_df(100, 6), col_mo = "mo", info = FALSE)))
invisible(suppressMessages(as.sir(make_df(100, 6), col_mo = "mo", info = FALSE, parallel = TRUE)))
sir_interpretation_history(clean = TRUE)
# ── panel 1: vary rows, fixed columns ─────────────────────────────────────────
message(sprintf("\nPanel 1 varying rows, %d fixed columns:", n_ab_fixed))
res_rows <- do.call(rbind, lapply(row_sizes, function(n) {
time_both(n, n_ab_fixed, sprintf("rows=%d", n))
}))
res_rows$x <- rep(row_sizes, each = 2)
res_rows$panel <- "Vary rows (16 fixed AB columns)"
# ── panel 2: vary columns, fixed rows ─────────────────────────────────────────
message(sprintf("\nPanel 2 varying columns, %d fixed rows:", n_rows_fixed))
res_cols <- do.call(rbind, lapply(col_sizes, function(n_ab) {
time_both(n_rows_fixed, n_ab, sprintf("cols=%d", n_ab))
}))
res_cols$x <- rep(col_sizes, each = 2)
res_cols$panel <- sprintf("Vary columns (%d fixed rows)", n_rows_fixed)
results <- rbind(res_rows, res_cols)
if (requireNamespace("ggplot2", quietly = TRUE)) {
p <- ggplot2::ggplot(
results,
ggplot2::aes(x = x, y = seconds, colour = mode, group = mode)
) +
ggplot2::geom_line(linewidth = 1) +
ggplot2::geom_point(size = 2.5) +
ggplot2::facet_wrap(~panel, scales = "free_x") +
ggplot2::scale_colour_manual(
values = c(sequential = "#E05C5C", parallel = "#2E86AB")
) +
ggplot2::labs(
title = "as.sir() throughput: sequential vs parallel",
subtitle = sprintf("E. coli, EUCAST 2026, %d cores available", n_cores_avail),
x = "Dataset dimension (rows ·left· or columns ·right·)",
y = "Wall-clock time (seconds)",
colour = NULL
) +
ggplot2::theme_minimal(base_size = 12) +
ggplot2::theme(legend.position = "top")
out_file <- "tools/benchmark_parallel.png"
ggplot2::ggsave(out_file, p, width = 10, height = 5, dpi = 150)
message("\nPlot saved to ", out_file)
} else {
message("Install ggplot2 to get a plot; raw results:")
print(results[, c("panel", "group", "mode", "seconds")])
}