1
0
mirror of https://github.com/msberends/AMR.git synced 2026-05-31 12:21:40 +02:00

3 Commits

Author SHA1 Message Date
Claude
6ece73cb22 Fix as.sir() data.frame: preserve already-<sir> columns, exclude metadata
Issue #278: two related bugs in the column-detection / type-assignment pipeline.

Bug 1 – already-<sir> columns deleted on re-run
  Line 886 excluded already-sir columns from the type assignment (they
  stayed type "") causing the result loop to do x[,col] <- NULL, deleting
  them.  Fix: drop the !is.sir() guard so all untyped columns fall through
  to type "sir" and are re-processed correctly.

Bug 2 – metadata columns treated as antibiotics
  as.ab("patient") -> OXY, as.ab("ward") -> PRU.  The column detector
  accepted any column whose name matched an antibiotic code, regardless of
  content.  Fix: for name-matched columns that do not already carry an AMR
  class, also verify content looks like AMR data (all_valid_mics, all-
  numeric, or any SIR-like string).  all_valid_disks() is intentionally
  avoided here because it strips letters from strings (as.disk("Pt_1")==1).

Also adds tools/benchmark_parallel.R: a standalone script that times
sequential vs parallel as.sir() across n=20/200/2000/20000 rows and
saves a ggplot2 PNG to tools/benchmark_parallel.png.

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR
2026-04-24 21:30:21 +00:00
Claude
ce79dd1f75 Add parallel computing tests to test-sir.R
Eight targeted tests verify correctness of the parallel as.sir() path:
identical SIR output vs sequential, matching log row counts, no
pre-existing history duplication, reproducibility across runs, results
consistency across max_cores values, single-column fallback, and no
per-column worker messages leaking when info = TRUE. All pass when only
1 core is available (parallel silently falls back to sequential).

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR
2026-04-24 20:42:27 +00:00
Claude
4ad3812e13 Fix parallel computing in as.sir.data.frame
Six bugs in parallel = TRUE mode:

1. PSOCK workers (Windows / R < 4.0) never had AMR loaded, so every
   exported/AMR function call failed. Added clusterEvalQ(cl, library(AMR))
   with a graceful fallback to sequential when the package cannot be loaded
   (e.g. dev-only load_all() environments).

2. clusterExport'd AMR_env was a frozen serialised copy; as.sir() on the
   worker wrote to AMR:::AMR_env while run_as_sir_column read from the stale
   copy, so the captured log was always wrong. Fixed by resolving AMR_env
   dynamically via get("AMR_env", envir = asNamespace("AMR")) inside the
   worker function, and removing AMR_env from clusterExport.

3. In the fork-based (mclapply) path each worker inherited the parent's full
   sir_interpretation_history. Capturing the whole log then combining across
   workers duplicated every pre-existing entry. Fixed by recording the log
   row count before the as.sir() call and slicing only the new rows
   afterwards.

4. run_as_sir_column used non-exported internals (%pm>%, pm_pull,
   as.sir.default) that are inaccessible on PSOCK workers after library(AMR).
   Replaced pipe chains with direct as.mic(as.character(x[, col, drop=TRUE]))
   and as.disk(...) calls, and changed as.sir.default() to as.sir() which
   dispatches correctly via S3.

5. With info = TRUE, worker forks printed per-column progress messages
   simultaneously, producing garbled interleaved console output. Per-column
   messages are now suppressed inside workers (effective_info = FALSE) while
   the outer "Running in parallel" / "DONE" messages still appear.

6. Malformed Unicode escape \u00a (3 hex digits) in the "DONE" banner was
   parsed by R as U+00AD (soft hyphen) + "ONE"; corrected to  .

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR
2026-04-24 15:31:40 +00:00
5 changed files with 304 additions and 99 deletions

View File

@@ -1,6 +1,6 @@
Package: AMR
Version: 3.0.1.9048
Date: 2026-04-22
Version: 3.0.1.9050
Date: 2026-04-24
Title: Antimicrobial Resistance Data Analysis
Description: Functions to simplify and standardise antimicrobial resistance (AMR)
data analysis and to work with microbial and antimicrobial properties by

View File

@@ -1,4 +1,4 @@
# AMR 3.0.1.9048
# AMR 3.0.1.9050
### New
* Support for clinical breakpoints of 2026 of both CLSI and EUCAST, by adding all of their over 5,700 new clinical breakpoints to the `clinical_breakpoints` data set for usage in `as.sir()`. EUCAST 2026 is now the new default guideline for all MIC and disk diffusion interpretations.
@@ -21,6 +21,7 @@
* Two new `NA` objects, `NA_ab_` and `NA_mo_`, analogous to base R's `NA_character_` and `NA_integer_`, for use in pipelines that require typed missing values
### Fixes
* Fixed multiple bugs in the `parallel = TRUE` mode of `as.sir()` for data frames: (1) PSOCK workers (Windows / R < 4.0) now correctly load the AMR package before processing, with a graceful fallback to sequential mode when the package cannot be loaded; (2) resolved stale-environment issue where the PSOCK path read a frozen copy of `AMR_env` instead of the live one, causing the wrong log entries to be captured; (3) fixed log-entry duplication in the fork-based path (`mclapply`) where pre-existing `sir_interpretation_history` rows were included in every worker's captured log; (4) removed use of non-exported internal functions (`%pm>%`, `pm_pull`, `as.sir.default`) from the worker closure, which made PSOCK workers fail; (5) suppressed per-column progress messages inside workers to prevent interleaved console output; (6) fixed a malformed Unicode escape `\u00a` (3 digits) in the "DONE" status message
* Fixed a bug in `as.sir()` where values that were purely numeric (e.g., `"1"`) and matched the broad SIR-matching regex would be incorrectly stripped of all content by the Unicode letter filter
* Fixed a bug in `as.mic()` where MIC values in scientific notation (e.g., `"1e-3"`) were incorrectly handled because the letter `e` was removed along with other Unicode letters; scientific notation `e` is now preserved
* Fixed a bug in `as.ab()` where certain AB codes containing "PH" or "TH" (such as `ETH`, `MTH`, `PHE`, `PHN`, `STH`, `THA`, `THI1`) would incorrectly return `NA` when combined in a vector with any untranslatable value (#245)
@@ -34,6 +35,8 @@
* Fixed SIR and MIC coercion of combined values, e.g. `as.sir("<= 0.002; S") ` or `as.mic("S; 0.002")` (#252)
* Fixed translation of foreign languages in `sir_df()` (#272)
* Fixed BRMO classification by including bacterial complexes (#275)
* Fixed `as.sir()` for data frames silently deleting columns whose AB class was already `<sir>` when called a second time (re-running on already-converted data) (#278)
* Fixed `as.sir()` for data frames incorrectly treating metadata columns (e.g. `patient`, `ward`) as antibiotic columns when their names coincidentally matched an antibiotic code; column content is now validated against AMR data patterns before inclusion
### Updates
* Extensive `cli` integration for better message handling and clickable links in messages and warnings (#191, #265)

192
R/sir.R
View File

@@ -716,7 +716,7 @@ as.sir.disk <- function(x,
}
#' @rdname as.sir
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. This requires no additional packages, as the used `parallel` package is part of base \R. On Windows and on \R < 4.0.0 [parallel::parLapply()] will be used, in all other cases the more efficient [parallel::mclapply()] will be used.
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. The `parallel` package is part of base \R and no additional packages are required. On Unix/macOS with \R >= 4.0.0, [parallel::mclapply()] (fork-based) is used; on Windows and \R < 4.0.0, [parallel::parLapply()] with a PSOCK cluster is used (requires the AMR package to be installed, not just loaded via `devtools::load_all()`). Parallelism distributes columns across cores; it is most beneficial when there are many antibiotic columns and a large number of rows.
#' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise.
#' @export
as.sir.data.frame <- function(x,
@@ -852,7 +852,6 @@ as.sir.data.frame <- function(x,
i <- 0
ab_cols <- colnames(x)[vapply(FUN.VALUE = logical(1), x, function(y) {
i <<- i + 1
check <- is.mic(y) | is.disk(y)
ab <- colnames(x)[i]
if (!is.null(col_mo) && ab == col_mo) {
return(FALSE)
@@ -861,13 +860,30 @@ as.sir.data.frame <- function(x,
return(FALSE)
}
if (length(sel) == 0 || (length(sel) > 0 && ab %in% sel)) {
# columns already carrying an AMR class are always included
y_bak <- x.bak[, ab, drop = TRUE]
if (is.mic(y_bak) || is.disk(y_bak) || is.sir(y_bak)) {
return(TRUE)
}
ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE))
if (is.na(ab_coerced) || (length(sel) > 0 & !ab %in% sel)) {
# not even a valid AB code
return(FALSE)
} else {
return(TRUE)
}
# Name matches an antibiotic; also verify column content resembles AMR
# data. This prevents false positives on metadata columns whose names
# happen to match a drug code (e.g. 'patient' -> OXY, 'ward' -> PRU).
# Note: all_valid_disks() is intentionally avoided here because it strips
# non-numeric characters (as.disk("Pt_1") == 1), accepting patient IDs.
y_char <- tryCatch(as.character(y), error = function(e) character(0))
y_valid <- y_char[!is.na(y_char) & nzchar(trimws(y_char))]
if (length(y_valid) == 0L) {
return(FALSE)
}
y_numeric <- suppressWarnings(as.numeric(y_valid))
all_valid_mics(y) ||
all(!is.na(y_numeric)) ||
any(y_valid %in% c("S", "SDD", "I", "R", "NI"))
} else {
return(FALSE)
}
@@ -883,7 +899,7 @@ as.sir.data.frame <- function(x,
types[vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.mic)] <- "mic"
types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_disks)] <- "disk"
types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_mics)] <- "mic"
types[types == "" & !vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.sir)] <- "sir"
types[types == ""] <- "sir"
if (any(types %in% c("mic", "disk"), na.rm = TRUE)) {
# now we need an mo column
stop_if(is.null(col_mo), "{.arg col_mo} must be set")
@@ -906,6 +922,21 @@ as.sir.data.frame <- function(x,
return(NULL)
}
)
if (!is.null(cl)) {
# Each PSOCK worker is a fresh R session — the AMR package must be loaded there
# so all exported functions (as.sir, as.mic, as.disk, ...) are available.
amr_loaded_on_workers <- tryCatch({
parallel::clusterEvalQ(cl, library(AMR, quietly = TRUE))
TRUE
}, error = function(e) FALSE)
if (!amr_loaded_on_workers) {
if (isTRUE(info)) {
message_("Could not load AMR on parallel workers (package may not be installed); falling back to single-core computation.")
}
parallel::stopCluster(cl)
cl <- NULL
}
}
if (is.null(cl)) {
n_cores <- 1
}
@@ -916,65 +947,93 @@ as.sir.data.frame <- function(x,
message_("Processing columns:", as_note = FALSE)
}
# In parallel mode suppress per-column messages: workers print simultaneously and
# their output would be interleaved on the console.
is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1
effective_info <- if (is_parallel_run) FALSE else info
run_as_sir_column <- function(i) {
# Always resolve AMR_env from the package namespace. This is essential for
# PSOCK workers (where the closure-captured AMR_env is a stale serialised copy
# while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing
# pre-existing log entries from earlier in the session when forking.
.amr_env <- get("AMR_env", envir = asNamespace("AMR"), inherits = FALSE)
# In parallel mode each worker (fork or PSOCK) has its own copy of the
# history; record the current length so we capture only the new rows added
# by the as.sir() call below, not any pre-existing entries inherited at fork
# time or carried over from earlier as.sir() calls.
if (is_parallel_run) pre_log_n <- NROW(.amr_env$sir_interpretation_history)
ab_col <- ab_cols[i]
out <- list(result = NULL, log = NULL)
if (types[i] == "mic") {
result <- x %pm>%
pm_pull(ab_col) %pm>%
as.character() %pm>%
as.mic() %pm>%
as.sir(
mo = x_mo,
mo.bak = x[, col_mo, drop = TRUE],
ab = ab_col,
guideline = guideline,
uti = uti,
capped_mic_handling = capped_mic_handling,
as_wt_nwt = as_wt_nwt,
add_intrinsic_resistance = add_intrinsic_resistance,
reference_data = reference_data,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
include_screening = include_screening,
include_PKPD = include_PKPD,
breakpoint_type = breakpoint_type,
host = host,
verbose = verbose,
info = info,
conserve_capped_values = conserve_capped_values,
is_data.frame = TRUE
)
result <- as.sir(
as.mic(as.character(x[, ab_col, drop = TRUE])),
mo = x_mo,
mo.bak = x[, col_mo, drop = TRUE],
ab = ab_col,
guideline = guideline,
uti = uti,
capped_mic_handling = capped_mic_handling,
as_wt_nwt = as_wt_nwt,
add_intrinsic_resistance = add_intrinsic_resistance,
reference_data = reference_data,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
include_screening = include_screening,
include_PKPD = include_PKPD,
breakpoint_type = breakpoint_type,
host = host,
verbose = verbose,
info = effective_info,
conserve_capped_values = conserve_capped_values,
is_data.frame = TRUE
)
out$result <- result
out$log <- AMR_env$sir_interpretation_history
AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] # reset log
if (is_parallel_run) {
full_log <- .amr_env$sir_interpretation_history
out$log <- if (pre_log_n < NROW(full_log)) {
full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE]
} else {
full_log[0L, , drop = FALSE]
}
} else {
out$log <- .amr_env$sir_interpretation_history
.amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE]
}
return(out)
} else if (types[i] == "disk") {
result <- x %pm>%
pm_pull(ab_col) %pm>%
as.character() %pm>%
as.disk() %pm>%
as.sir(
mo = x_mo,
mo.bak = x[, col_mo, drop = TRUE],
ab = ab_col,
guideline = guideline,
uti = uti,
as_wt_nwt = as_wt_nwt,
add_intrinsic_resistance = add_intrinsic_resistance,
reference_data = reference_data,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
include_screening = include_screening,
include_PKPD = include_PKPD,
breakpoint_type = breakpoint_type,
host = host,
verbose = verbose,
info = info,
is_data.frame = TRUE
)
result <- as.sir(
as.disk(as.character(x[, ab_col, drop = TRUE])),
mo = x_mo,
mo.bak = x[, col_mo, drop = TRUE],
ab = ab_col,
guideline = guideline,
uti = uti,
as_wt_nwt = as_wt_nwt,
add_intrinsic_resistance = add_intrinsic_resistance,
reference_data = reference_data,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
include_screening = include_screening,
include_PKPD = include_PKPD,
breakpoint_type = breakpoint_type,
host = host,
verbose = verbose,
info = effective_info,
is_data.frame = TRUE
)
out$result <- result
out$log <- AMR_env$sir_interpretation_history
AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE]
if (is_parallel_run) {
full_log <- .amr_env$sir_interpretation_history
out$log <- if (pre_log_n < NROW(full_log)) {
full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE]
} else {
full_log[0L, , drop = FALSE]
}
} else {
out$log <- .amr_env$sir_interpretation_history
.amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE]
}
return(out)
} else if (types[i] == "sir") {
ab <- ab_col
@@ -982,27 +1041,27 @@ as.sir.data.frame <- function(x,
show_message <- FALSE
if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) {
show_message <- TRUE
if (isTRUE(info)) {
message_("\u00a0\u00a0", AMR_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (",
if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (",
ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""),
ab_name(ab_coerced, tolower = TRUE, info = info), ")... ",
ab_name(ab_coerced, tolower = TRUE, info = effective_info), ")... ",
appendLF = FALSE,
as_note = FALSE
)
}
} else if (!is.sir(x.bak[, ab, drop = TRUE])) {
show_message <- TRUE
if (isTRUE(info)) {
message_("\u00a0\u00a0", AMR_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (",
if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", .amr_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (",
ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""),
ab_name(ab_coerced, tolower = TRUE, language = NULL, info = info), ")... ",
ab_name(ab_coerced, tolower = TRUE, language = NULL, info = effective_info), ")... ",
appendLF = FALSE,
as_note = FALSE
)
}
}
result <- as.sir.default(x = as.character(x[, ab, drop = TRUE]))
if (show_message == TRUE && isTRUE(info)) {
result <- as.sir(as.character(x[, ab, drop = TRUE]))
if (show_message == TRUE && isTRUE(effective_info)) {
message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE)
}
out$result <- result
@@ -1025,8 +1084,9 @@ as.sir.data.frame <- function(x,
"x", "x.bak", "x_mo", "ab_cols", "types",
"capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance",
"reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD",
"breakpoint_type", "guideline", "host", "uti", "info", "verbose",
"col_mo", "AMR_env", "conserve_capped_values",
"breakpoint_type", "guideline", "host", "uti", "verbose",
"col_mo", "conserve_capped_values",
"effective_info", "is_parallel_run",
"run_as_sir_column"
), envir = environment())
result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column)
@@ -1035,7 +1095,7 @@ as.sir.data.frame <- function(x,
result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores)
}
if (isTRUE(info)) {
message_(font_green_bg("\u00aDONE\u00a"), as_note = FALSE)
message_(font_green_bg("\u00a0DONE\u00a0"), as_note = FALSE)
message_(as_note = FALSE)
message_("Run {.help [{.fun sir_interpretation_history}](AMR::sir_interpretation_history)} to retrieve a logbook with all details of the breakpoint interpretations.")
}

View File

@@ -406,40 +406,111 @@ test_that("test-sir.R", {
expect_equal(out3, as.sir(c("NWT", "WT", "NWT")))
expect_equal(out4, as.sir(c("NWT", "WT", "NWT")))
# Issue #278: re-running as.sir() on already-<sir> data must preserve columns
df_already_sir <- data.frame(
mo = "B_ESCHR_COLI",
AMC = as.mic(c("1", "2", "4")),
GEN = sample(c("S", "I", "R"), 3, replace = TRUE),
stringsAsFactors = FALSE
)
first_pass <- suppressMessages(as.sir(df_already_sir, col_mo = "mo", info = FALSE))
second_pass <- suppressMessages(as.sir(first_pass, col_mo = "mo", info = FALSE))
expect_equal(ncol(first_pass), ncol(second_pass))
expect_true(is.sir(second_pass[["AMC"]]))
expect_true(is.sir(second_pass[["GEN"]]))
expect_identical(first_pass[["AMC"]], second_pass[["AMC"]])
expect_identical(first_pass[["GEN"]], second_pass[["GEN"]])
# Issue #278: metadata columns whose names coincidentally match antibiotic
# codes (e.g. 'patient' -> OXY, 'ward' -> PRU) must not be processed
df_meta <- data.frame(
mo = "B_ESCHR_COLI",
patient = paste0("Pt_", 1:20),
ward = rep(c("ICU", "Surgery", "Outpatient", "ED"), 5),
AMC = as.mic(rep(c("1", "2", "4", "8"), 5)),
stringsAsFactors = FALSE
)
df_meta_sir <- suppressMessages(as.sir(df_meta, col_mo = "mo", info = FALSE))
expect_true("patient" %in% colnames(df_meta_sir))
expect_true("ward" %in% colnames(df_meta_sir))
expect_false(is.sir(df_meta_sir[["patient"]]))
expect_false(is.sir(df_meta_sir[["ward"]]))
expect_true(is.sir(df_meta_sir[["AMC"]]))
# Parallel computing ----------------------------------------------------
# Tests must pass even when only 1 core is available; parallel = TRUE then
# silently falls back to sequential, but results must still be identical.
# MB 29 Apr 2025: I have run the code of AVC, PEI, Canada (dataset of 2854x65), and compared it like this:
set.seed(42)
n_par <- 200
df_par <- data.frame(
mo = "B_ESCHR_COLI",
AMC = as.mic(sample(c("0.25", "0.5", "1", "2", "4", "8", "16", "32"), n_par, TRUE)),
GEN = as.mic(sample(c("0.5", "1", "2", "4", "8", "16", "32", "64"), n_par, TRUE)),
CIP = as.mic(sample(c("0.001", "0.002", "0.004", "0.008", "0.016", "0.032"), n_par, TRUE)),
PEN = sample(c("S", "I", "R", NA_character_), n_par, TRUE),
stringsAsFactors = FALSE
)
# system.time({
# data_2022_2023_SIR_parallel <- data_2022_2023_clean |>
# as.sir(amikacin:tiamulin,
# col_mo = "mo",
# guideline = "CLSI 2024",
# host = "Species",
# uti = "isUTI",
# parallel = TRUE)
# })
# # user system elapsed
# # 271.424 2.767 45.762
#
# history_parallel <- sir_interpretation_history(clean = TRUE)
#
# system.time({
# data_2022_2023_SIR <- data_2022_2023_clean |>
# as.sir(amikacin:tiamulin,
# col_mo = "mo",
# guideline = "CLSI 2024",
# host = "Species",
# uti = "isUTI")
# })
# # user system elapsed
# # 120.637 5.406 128.835
# history <- sir_interpretation_history()
# clear any existing history before comparing
sir_interpretation_history(clean = TRUE)
sir_seq <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE))
log_seq <- sir_interpretation_history(clean = TRUE)
sir_par <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
log_par <- sir_interpretation_history(clean = TRUE)
# and then got this:
# identical(history[, -1], history_parallel[, -1])
#> [1] TRUE
# 1. parallel = TRUE gives identical SIR results to sequential
expect_identical(sir_seq[["AMC"]], sir_par[["AMC"]])
expect_identical(sir_seq[["GEN"]], sir_par[["GEN"]])
expect_identical(sir_seq[["CIP"]], sir_par[["CIP"]])
expect_identical(sir_seq[["PEN"]], sir_par[["PEN"]])
# so parallel on Apple M2 is 2.8x faster, with identical history -> GREAT!
# 2. same number of log rows as sequential
expect_equal(nrow(log_seq), nrow(log_par))
# 3. pre-existing log entries must not be duplicated
# run sequential once to populate the history, then run parallel and
# verify the new parallel run adds exactly as many rows as sequential
sir_interpretation_history(clean = TRUE)
suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE)) # populate history
pre_n <- nrow(sir_interpretation_history())
suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
post_n <- nrow(sir_interpretation_history())
expect_equal(post_n - pre_n, nrow(log_seq)) # exactly one run's worth of new rows
sir_interpretation_history(clean = TRUE)
# 4. two sequential runs and two parallel runs yield identical results
sir_par2 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
expect_identical(sir_par[["AMC"]], sir_par2[["AMC"]])
expect_identical(sir_par[["GEN"]], sir_par2[["GEN"]])
# 5. max_cores = 1 gives same results as default sequential
sir_mc1 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 1L))
expect_identical(sir_seq[["AMC"]], sir_mc1[["AMC"]])
expect_identical(sir_seq[["GEN"]], sir_mc1[["GEN"]])
# 6. max_cores = 2 and max_cores = 3 give same results as sequential
sir_mc2 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 2L))
sir_mc3 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 3L))
expect_identical(sir_seq[["AMC"]], sir_mc2[["AMC"]])
expect_identical(sir_seq[["GEN"]], sir_mc3[["GEN"]])
# 7. single-column data frame falls back silently to sequential
df_single <- df_par[, c("mo", "AMC")]
sir_single_seq <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE))
sir_single_par <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE, parallel = TRUE))
expect_identical(sir_single_seq[["AMC"]], sir_single_par[["AMC"]])
# 8. info = TRUE with parallel does not produce per-column worker messages
# (messages should only appear in the main process, not duplicated from workers)
msgs <- capture.output(
suppressWarnings(as.sir(df_par, col_mo = "mo", info = TRUE, parallel = TRUE)),
type = "message"
)
# each AB column name should appear at most once in all messages combined
for (ab_nm in c("AMC", "GEN", "CIP", "PEN")) {
n_mentions <- sum(grepl(ab_nm, msgs, fixed = TRUE))
expect_lte(n_mentions, 1L)
}
})

View File

@@ -0,0 +1,71 @@
# Benchmark: sequential vs parallel as.sir() across data-set sizes
#
# Run from the repo root with:
# Rscript tools/benchmark_parallel.R
# or from inside an R session:
# source("tools/benchmark_parallel.R")
#
# Requires ggplot2 for the output plot; uses devtools::load_all() so the
# package does not need to be installed.
devtools::load_all(".", quiet = TRUE)
sizes <- c(20, 200, 2000, 20000)
n_ab <- 6 # number of antibiotic columns
make_df <- function(n) {
set.seed(42)
mics <- lapply(seq_len(n_ab), function(j) {
as.mic(sample(c("0.25", "0.5", "1", "2", "4", "8", "16", "32"), n, TRUE))
})
names(mics) <- c("AMC", "GEN", "CIP", "TZP", "IPM", "MEM")
data.frame(mo = "B_ESCHR_COLI", mics, stringsAsFactors = FALSE)
}
results <- do.call(rbind, lapply(sizes, function(n) {
df <- make_df(n)
t_seq <- system.time(
suppressMessages(as.sir(df, col_mo = "mo", info = FALSE, parallel = FALSE))
)[["elapsed"]]
t_par <- system.time(
suppressMessages(as.sir(df, col_mo = "mo", info = FALSE, parallel = TRUE))
)[["elapsed"]]
message(sprintf("n = %6d seq = %.3fs par = %.3fs speedup = %.1fx",
n, t_seq, t_par, t_seq / t_par))
data.frame(n = n, mode = c("sequential", "parallel"),
seconds = c(t_seq, t_par))
}))
if (requireNamespace("ggplot2", quietly = TRUE)) {
p <- ggplot2::ggplot(results, ggplot2::aes(x = n, y = seconds,
colour = mode, group = mode)) +
ggplot2::geom_line(linewidth = 1) +
ggplot2::geom_point(size = 3) +
ggplot2::scale_x_log10(
breaks = sizes,
labels = format(sizes, big.mark = ",", scientific = FALSE)
) +
ggplot2::scale_colour_manual(
values = c(sequential = "#E05C5C", parallel = "#2E86AB")
) +
ggplot2::labs(
title = "as.sir() throughput: sequential vs parallel",
subtitle = sprintf("%d antibiotic columns, E. coli, EUCAST 2025", n_ab),
x = "Number of rows (log scale)",
y = "Wall-clock time (seconds)",
colour = NULL
) +
ggplot2::theme_minimal(base_size = 13) +
ggplot2::theme(legend.position = "top")
out_file <- "tools/benchmark_parallel.png"
ggplot2::ggsave(out_file, p, width = 7, height = 5, dpi = 150)
message("Plot saved to ", out_file)
} else {
message("Install ggplot2 to get a plot; raw results:")
print(results)
}