mirror of https://github.com/msberends/AMR.git synced 2026-05-31 13:41:42 +02:00

3 Commits

Author SHA1 Message Date
Claude
6ece73cb22 Fix as.sir() data.frame: preserve already-<sir> columns, exclude metadata
Issue #278: two related bugs in the column-detection / type-assignment pipeline.

Bug 1 – already-<sir> columns deleted on re-run
  Line 886 excluded already-sir columns from the type assignment (they
  stayed type "") causing the result loop to do x[,col] <- NULL, deleting
  them.  Fix: drop the !is.sir() guard so all untyped columns fall through
  to type "sir" and are re-processed correctly.

Bug 2 – metadata columns treated as antibiotics
  as.ab("patient") -> OXY, as.ab("ward") -> PRU.  The column detector
  accepted any column whose name matched an antibiotic code, regardless of
  content.  Fix: for name-matched columns that do not already carry an AMR
  class, also verify content looks like AMR data (all_valid_mics, all-
  numeric, or any SIR-like string).  all_valid_disks() is intentionally
  avoided here because it strips letters from strings (as.disk("Pt_1")==1).
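
The content check described for Bug 2 can be sketched in base R. This is a simplified illustration, not the package's code: `looks_like_amr_data()` is a hypothetical helper name, and the sketch leaves out the `all_valid_mics()` branch, so capped MIC strings such as "<=0.5" would not be accepted here.

```r
# Hypothetical helper: does a column's content resemble AMR data?
# Simplified sketch; the all_valid_mics() branch of the real check is omitted.
looks_like_amr_data <- function(y) {
  y_char <- as.character(y)
  # keep only non-missing, non-blank values
  y_valid <- y_char[!is.na(y_char) & nzchar(trimws(y_char))]
  if (length(y_valid) == 0L) {
    return(FALSE)
  }
  y_numeric <- suppressWarnings(as.numeric(y_valid))
  # accept all-numeric columns, or columns with at least one SIR-like value
  all(!is.na(y_numeric)) || any(y_valid %in% c("S", "SDD", "I", "R", "NI"))
}

patient_ids <- looks_like_amr_data(paste0("Pt_", 1:3)) # metadata, rejected
mic_like <- looks_like_amr_data(c("0.5", "2", "16"))   # all numeric, accepted
sir_like <- looks_like_amr_data(c("S", "R", NA))       # SIR-like, accepted
```

Under these assumptions a column named `patient` is skipped even though its name coerces to an antibiotic code, because none of its values are numeric or SIR-like.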

Also adds tools/benchmark_parallel.R: a standalone script that times
sequential vs parallel as.sir() across n=20/200/2000/20000 rows and
saves a ggplot2 PNG to tools/benchmark_parallel.png.

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR
2026-04-24 21:30:21 +00:00
Claude
ce79dd1f75 Add parallel computing tests to test-sir.R
Eight targeted tests verify correctness of the parallel as.sir() path:
identical SIR output vs sequential, matching log row counts, no
pre-existing history duplication, reproducibility across runs, results
consistency across max_cores values, single-column fallback, and no
per-column worker messages leaking when info = TRUE. All pass when only
1 core is available (parallel silently falls back to sequential).
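
The core idea behind these tests, comparing a sequential run with a parallel run of the same per-column function, can be sketched with base R's `parallel` package. `classify()` and `cols` are hypothetical stand-ins for the interpretation step and the antibiotic columns; this is not the test code itself.

```r
library(parallel)

# Hypothetical stand-in for a per-column interpretation step
classify <- function(v) ifelse(v <= 2, "S", ifelse(v <= 8, "I", "R"))
cols <- list(AMC = c(1, 4, 16), GEN = c(0.5, 2, 32))

# mclapply() cannot fork on Windows, so fall back to a single core there
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L

seq_res <- lapply(cols, classify)
par_res <- mclapply(cols, classify, mc.cores = n_cores)

# The check passes whether or not more than one core was actually used
same_results <- identical(seq_res, par_res)
```

Because the comparison only looks at results, it holds equally when the parallel path falls back to sequential execution.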

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR
2026-04-24 20:42:27 +00:00
Claude
4ad3812e13 Fix parallel computing in as.sir.data.frame
Six bugs in parallel = TRUE mode:

1. PSOCK workers (Windows / R < 4.0) never had AMR loaded, so every
   exported/AMR function call failed. Added clusterEvalQ(cl, library(AMR))
   with a graceful fallback to sequential when the package cannot be loaded
   (e.g. dev-only load_all() environments).

2. clusterExport'd AMR_env was a frozen serialised copy; as.sir() on the
   worker wrote to AMR:::AMR_env while run_as_sir_column read from the stale
   copy, so the captured log was always wrong. Fixed by resolving AMR_env
   dynamically via get("AMR_env", envir = asNamespace("AMR")) inside the
   worker function, and removing AMR_env from clusterExport.

3. In the fork-based (mclapply) path each worker inherited the parent's full
   sir_interpretation_history. Capturing the whole log then combining across
   workers duplicated every pre-existing entry. Fixed by recording the log
   row count before the as.sir() call and slicing only the new rows
   afterwards.

4. run_as_sir_column used non-exported internals (%pm>%, pm_pull,
   as.sir.default) that are inaccessible on PSOCK workers after library(AMR).
   Replaced pipe chains with direct as.mic(as.character(x[, col, drop=TRUE]))
   and as.disk(...) calls, and changed as.sir.default() to as.sir() which
   dispatches correctly via S3.

5. With info = TRUE, worker forks printed per-column progress messages
   simultaneously, producing garbled interleaved console output. Per-column
   messages are now suppressed inside workers (effective_info = FALSE) while
   the outer "Running in parallel" / "DONE" messages still appear.

6. Malformed Unicode escape \u00a (3 hex digits) in the "DONE" banner was
   parsed by R as U+00AD (soft hyphen) + "ONE"; corrected to \u00a0.
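
The slicing described in item 3 (record the log length first, then keep only the rows added afterwards) can be sketched in base R. `log_env`, `history` and `run_column()` are hypothetical names used for illustration; the real code works on `sir_interpretation_history` inside the package environment.

```r
# Hypothetical stand-in for the shared history log, with two pre-existing rows
log_env <- new.env()
log_env$history <- data.frame(column = c("AMC", "GEN"), stringsAsFactors = FALSE)

# Hypothetical worker step: appends one log row, then returns only the new rows
run_column <- function(col) {
  pre_n <- NROW(log_env$history) # log length before the work starts
  log_env$history <- rbind(
    log_env$history,
    data.frame(column = col, stringsAsFactors = FALSE)
  )
  full_log <- log_env$history
  if (pre_n < NROW(full_log)) {
    # slice off everything that existed before this call
    full_log[seq.int(pre_n + 1L, NROW(full_log)), , drop = FALSE]
  } else {
    full_log[0L, , drop = FALSE]
  }
}

new_rows <- run_column("CIP")
```

Here `new_rows` holds only the row for the newly processed column, so combining the slices from several workers does not repeat the two pre-existing entries.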

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR
2026-04-24 15:31:40 +00:00
5 changed files with 304 additions and 99 deletions


@@ -1,6 +1,6 @@
Package: AMR Package: AMR
Version: 3.0.1.9048 Version: 3.0.1.9050
Date: 2026-04-22 Date: 2026-04-24
Title: Antimicrobial Resistance Data Analysis Title: Antimicrobial Resistance Data Analysis
Description: Functions to simplify and standardise antimicrobial resistance (AMR) Description: Functions to simplify and standardise antimicrobial resistance (AMR)
data analysis and to work with microbial and antimicrobial properties by data analysis and to work with microbial and antimicrobial properties by


@@ -1,4 +1,4 @@
# AMR 3.0.1.9048 # AMR 3.0.1.9050
### New ### New
* Support for clinical breakpoints of 2026 of both CLSI and EUCAST, by adding all of their over 5,700 new clinical breakpoints to the `clinical_breakpoints` data set for usage in `as.sir()`. EUCAST 2026 is now the new default guideline for all MIC and disk diffusion interpretations. * Support for clinical breakpoints of 2026 of both CLSI and EUCAST, by adding all of their over 5,700 new clinical breakpoints to the `clinical_breakpoints` data set for usage in `as.sir()`. EUCAST 2026 is now the new default guideline for all MIC and disk diffusion interpretations.
@@ -21,6 +21,7 @@
* Two new `NA` objects, `NA_ab_` and `NA_mo_`, analogous to base R's `NA_character_` and `NA_integer_`, for use in pipelines that require typed missing values * Two new `NA` objects, `NA_ab_` and `NA_mo_`, analogous to base R's `NA_character_` and `NA_integer_`, for use in pipelines that require typed missing values
### Fixes ### Fixes
* Fixed multiple bugs in the `parallel = TRUE` mode of `as.sir()` for data frames: (1) PSOCK workers (Windows / R < 4.0) now correctly load the AMR package before processing, with a graceful fallback to sequential mode when the package cannot be loaded; (2) resolved stale-environment issue where the PSOCK path read a frozen copy of `AMR_env` instead of the live one, causing the wrong log entries to be captured; (3) fixed log-entry duplication in the fork-based path (`mclapply`) where pre-existing `sir_interpretation_history` rows were included in every worker's captured log; (4) removed use of non-exported internal functions (`%pm>%`, `pm_pull`, `as.sir.default`) from the worker closure, which made PSOCK workers fail; (5) suppressed per-column progress messages inside workers to prevent interleaved console output; (6) fixed a malformed Unicode escape `\u00a` (3 digits) in the "DONE" status message
* Fixed a bug in `as.sir()` where values that were purely numeric (e.g., `"1"`) and matched the broad SIR-matching regex would be incorrectly stripped of all content by the Unicode letter filter * Fixed a bug in `as.sir()` where values that were purely numeric (e.g., `"1"`) and matched the broad SIR-matching regex would be incorrectly stripped of all content by the Unicode letter filter
* Fixed a bug in `as.mic()` where MIC values in scientific notation (e.g., `"1e-3"`) were incorrectly handled because the letter `e` was removed along with other Unicode letters; scientific notation `e` is now preserved * Fixed a bug in `as.mic()` where MIC values in scientific notation (e.g., `"1e-3"`) were incorrectly handled because the letter `e` was removed along with other Unicode letters; scientific notation `e` is now preserved
* Fixed a bug in `as.ab()` where certain AB codes containing "PH" or "TH" (such as `ETH`, `MTH`, `PHE`, `PHN`, `STH`, `THA`, `THI1`) would incorrectly return `NA` when combined in a vector with any untranslatable value (#245) * Fixed a bug in `as.ab()` where certain AB codes containing "PH" or "TH" (such as `ETH`, `MTH`, `PHE`, `PHN`, `STH`, `THA`, `THI1`) would incorrectly return `NA` when combined in a vector with any untranslatable value (#245)
@@ -34,6 +35,8 @@
* Fixed SIR and MIC coercion of combined values, e.g. `as.sir("<= 0.002; S") ` or `as.mic("S; 0.002")` (#252) * Fixed SIR and MIC coercion of combined values, e.g. `as.sir("<= 0.002; S") ` or `as.mic("S; 0.002")` (#252)
* Fixed translation of foreign languages in `sir_df()` (#272) * Fixed translation of foreign languages in `sir_df()` (#272)
* Fixed BRMO classification by including bacterial complexes (#275) * Fixed BRMO classification by including bacterial complexes (#275)
* Fixed `as.sir()` for data frames silently deleting columns that already had class `<sir>` when called a second time, i.e., when re-running on already-converted data (#278)
* Fixed `as.sir()` for data frames incorrectly treating metadata columns (e.g. `patient`, `ward`) as antibiotic columns when their names coincidentally matched an antibiotic code; column content is now validated against AMR data patterns before inclusion
### Updates ### Updates
* Extensive `cli` integration for better message handling and clickable links in messages and warnings (#191, #265) * Extensive `cli` integration for better message handling and clickable links in messages and warnings (#191, #265)

192
R/sir.R

@@ -716,7 +716,7 @@ as.sir.disk <- function(x,
} }
#' @rdname as.sir #' @rdname as.sir
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. This requires no additional packages, as the used `parallel` package is part of base \R. On Windows and on \R < 4.0.0 [parallel::parLapply()] will be used, in all other cases the more efficient [parallel::mclapply()] will be used. #' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. The `parallel` package is part of base \R and no additional packages are required. On Unix/macOS with \R >= 4.0.0, [parallel::mclapply()] (fork-based) is used; on Windows and \R < 4.0.0, [parallel::parLapply()] with a PSOCK cluster is used (requires the AMR package to be installed, not just loaded via `devtools::load_all()`). Parallelism distributes columns across cores; it is most beneficial when there are many antibiotic columns and a large number of rows.
#' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise. #' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise.
#' @export #' @export
as.sir.data.frame <- function(x, as.sir.data.frame <- function(x,
@@ -852,7 +852,6 @@ as.sir.data.frame <- function(x,
i <- 0 i <- 0
ab_cols <- colnames(x)[vapply(FUN.VALUE = logical(1), x, function(y) { ab_cols <- colnames(x)[vapply(FUN.VALUE = logical(1), x, function(y) {
i <<- i + 1 i <<- i + 1
check <- is.mic(y) | is.disk(y)
ab <- colnames(x)[i] ab <- colnames(x)[i]
if (!is.null(col_mo) && ab == col_mo) { if (!is.null(col_mo) && ab == col_mo) {
return(FALSE) return(FALSE)
@@ -861,13 +860,30 @@ as.sir.data.frame <- function(x,
return(FALSE) return(FALSE)
} }
if (length(sel) == 0 || (length(sel) > 0 && ab %in% sel)) { if (length(sel) == 0 || (length(sel) > 0 && ab %in% sel)) {
# columns already carrying an AMR class are always included
y_bak <- x.bak[, ab, drop = TRUE]
if (is.mic(y_bak) || is.disk(y_bak) || is.sir(y_bak)) {
return(TRUE)
}
ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE)) ab_coerced <- suppressWarnings(as.ab(ab, info = FALSE))
if (is.na(ab_coerced) || (length(sel) > 0 & !ab %in% sel)) { if (is.na(ab_coerced) || (length(sel) > 0 & !ab %in% sel)) {
# not even a valid AB code # not even a valid AB code
return(FALSE) return(FALSE)
} else {
return(TRUE)
} }
# Name matches an antibiotic; also verify column content resembles AMR
# data. This prevents false positives on metadata columns whose names
# happen to match a drug code (e.g. 'patient' -> OXY, 'ward' -> PRU).
# Note: all_valid_disks() is intentionally avoided here because it strips
# non-numeric characters (as.disk("Pt_1") == 1), accepting patient IDs.
y_char <- tryCatch(as.character(y), error = function(e) character(0))
y_valid <- y_char[!is.na(y_char) & nzchar(trimws(y_char))]
if (length(y_valid) == 0L) {
return(FALSE)
}
y_numeric <- suppressWarnings(as.numeric(y_valid))
all_valid_mics(y) ||
all(!is.na(y_numeric)) ||
any(y_valid %in% c("S", "SDD", "I", "R", "NI"))
} else { } else {
return(FALSE) return(FALSE)
} }
@@ -883,7 +899,7 @@ as.sir.data.frame <- function(x,
types[vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.mic)] <- "mic" types[vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.mic)] <- "mic"
types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_disks)] <- "disk" types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_disks)] <- "disk"
types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_mics)] <- "mic" types[types == "" & vapply(FUN.VALUE = logical(1), x[, ab_cols, drop = FALSE], all_valid_mics)] <- "mic"
types[types == "" & !vapply(FUN.VALUE = logical(1), x.bak[, ab_cols, drop = FALSE], is.sir)] <- "sir" types[types == ""] <- "sir"
if (any(types %in% c("mic", "disk"), na.rm = TRUE)) { if (any(types %in% c("mic", "disk"), na.rm = TRUE)) {
# now we need an mo column # now we need an mo column
stop_if(is.null(col_mo), "{.arg col_mo} must be set") stop_if(is.null(col_mo), "{.arg col_mo} must be set")
@@ -906,6 +922,21 @@ as.sir.data.frame <- function(x,
return(NULL) return(NULL)
} }
) )
if (!is.null(cl)) {
# Each PSOCK worker is a fresh R session — the AMR package must be loaded there
# so all exported functions (as.sir, as.mic, as.disk, ...) are available.
amr_loaded_on_workers <- tryCatch({
parallel::clusterEvalQ(cl, library(AMR, quietly = TRUE))
TRUE
}, error = function(e) FALSE)
if (!amr_loaded_on_workers) {
if (isTRUE(info)) {
message_("Could not load AMR on parallel workers (package may not be installed); falling back to single-core computation.")
}
parallel::stopCluster(cl)
cl <- NULL
}
}
if (is.null(cl)) { if (is.null(cl)) {
n_cores <- 1 n_cores <- 1
} }
@@ -916,65 +947,93 @@ as.sir.data.frame <- function(x,
message_("Processing columns:", as_note = FALSE) message_("Processing columns:", as_note = FALSE)
} }
# In parallel mode suppress per-column messages: workers print simultaneously and
# their output would be interleaved on the console.
is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1
effective_info <- if (is_parallel_run) FALSE else info
run_as_sir_column <- function(i) { run_as_sir_column <- function(i) {
# Always resolve AMR_env from the package namespace. This is essential for
# PSOCK workers (where the closure-captured AMR_env is a stale serialised copy
# while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing
# pre-existing log entries from earlier in the session when forking.
.amr_env <- get("AMR_env", envir = asNamespace("AMR"), inherits = FALSE)
# In parallel mode each worker (fork or PSOCK) has its own copy of the
# history; record the current length so we capture only the new rows added
# by the as.sir() call below, not any pre-existing entries inherited at fork
# time or carried over from earlier as.sir() calls.
if (is_parallel_run) pre_log_n <- NROW(.amr_env$sir_interpretation_history)
ab_col <- ab_cols[i] ab_col <- ab_cols[i]
out <- list(result = NULL, log = NULL) out <- list(result = NULL, log = NULL)
if (types[i] == "mic") { if (types[i] == "mic") {
result <- x %pm>% result <- as.sir(
pm_pull(ab_col) %pm>% as.mic(as.character(x[, ab_col, drop = TRUE])),
as.character() %pm>% mo = x_mo,
as.mic() %pm>% mo.bak = x[, col_mo, drop = TRUE],
as.sir( ab = ab_col,
mo = x_mo, guideline = guideline,
mo.bak = x[, col_mo, drop = TRUE], uti = uti,
ab = ab_col, capped_mic_handling = capped_mic_handling,
guideline = guideline, as_wt_nwt = as_wt_nwt,
uti = uti, add_intrinsic_resistance = add_intrinsic_resistance,
capped_mic_handling = capped_mic_handling, reference_data = reference_data,
as_wt_nwt = as_wt_nwt, substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
add_intrinsic_resistance = add_intrinsic_resistance, include_screening = include_screening,
reference_data = reference_data, include_PKPD = include_PKPD,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, breakpoint_type = breakpoint_type,
include_screening = include_screening, host = host,
include_PKPD = include_PKPD, verbose = verbose,
breakpoint_type = breakpoint_type, info = effective_info,
host = host, conserve_capped_values = conserve_capped_values,
verbose = verbose, is_data.frame = TRUE
info = info, )
conserve_capped_values = conserve_capped_values,
is_data.frame = TRUE
)
out$result <- result out$result <- result
out$log <- AMR_env$sir_interpretation_history if (is_parallel_run) {
AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] # reset log full_log <- .amr_env$sir_interpretation_history
out$log <- if (pre_log_n < NROW(full_log)) {
full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE]
} else {
full_log[0L, , drop = FALSE]
}
} else {
out$log <- .amr_env$sir_interpretation_history
.amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE]
}
return(out) return(out)
} else if (types[i] == "disk") { } else if (types[i] == "disk") {
result <- x %pm>% result <- as.sir(
pm_pull(ab_col) %pm>% as.disk(as.character(x[, ab_col, drop = TRUE])),
as.character() %pm>% mo = x_mo,
as.disk() %pm>% mo.bak = x[, col_mo, drop = TRUE],
as.sir( ab = ab_col,
mo = x_mo, guideline = guideline,
mo.bak = x[, col_mo, drop = TRUE], uti = uti,
ab = ab_col, as_wt_nwt = as_wt_nwt,
guideline = guideline, add_intrinsic_resistance = add_intrinsic_resistance,
uti = uti, reference_data = reference_data,
as_wt_nwt = as_wt_nwt, substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
add_intrinsic_resistance = add_intrinsic_resistance, include_screening = include_screening,
reference_data = reference_data, include_PKPD = include_PKPD,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, breakpoint_type = breakpoint_type,
include_screening = include_screening, host = host,
include_PKPD = include_PKPD, verbose = verbose,
breakpoint_type = breakpoint_type, info = effective_info,
host = host, is_data.frame = TRUE
verbose = verbose, )
info = info,
is_data.frame = TRUE
)
out$result <- result out$result <- result
out$log <- AMR_env$sir_interpretation_history if (is_parallel_run) {
AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] full_log <- .amr_env$sir_interpretation_history
out$log <- if (pre_log_n < NROW(full_log)) {
full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE]
} else {
full_log[0L, , drop = FALSE]
}
} else {
out$log <- .amr_env$sir_interpretation_history
.amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE]
}
return(out) return(out)
} else if (types[i] == "sir") { } else if (types[i] == "sir") {
ab <- ab_col ab <- ab_col
@@ -982,27 +1041,27 @@ as.sir.data.frame <- function(x,
show_message <- FALSE show_message <- FALSE
if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) { if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) {
show_message <- TRUE show_message <- TRUE
if (isTRUE(info)) { if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", AMR_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (", message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (",
ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""), ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""),
ab_name(ab_coerced, tolower = TRUE, info = info), ")... ", ab_name(ab_coerced, tolower = TRUE, info = effective_info), ")... ",
appendLF = FALSE, appendLF = FALSE,
as_note = FALSE as_note = FALSE
) )
} }
} else if (!is.sir(x.bak[, ab, drop = TRUE])) { } else if (!is.sir(x.bak[, ab, drop = TRUE])) {
show_message <- TRUE show_message <- TRUE
if (isTRUE(info)) { if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", AMR_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (", message_("\u00a0\u00a0", .amr_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (",
ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""), ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""),
ab_name(ab_coerced, tolower = TRUE, language = NULL, info = info), ")... ", ab_name(ab_coerced, tolower = TRUE, language = NULL, info = effective_info), ")... ",
appendLF = FALSE, appendLF = FALSE,
as_note = FALSE as_note = FALSE
) )
} }
} }
result <- as.sir.default(x = as.character(x[, ab, drop = TRUE])) result <- as.sir(as.character(x[, ab, drop = TRUE]))
if (show_message == TRUE && isTRUE(info)) { if (show_message == TRUE && isTRUE(effective_info)) {
message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE) message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE)
} }
out$result <- result out$result <- result
@@ -1025,8 +1084,9 @@ as.sir.data.frame <- function(x,
"x", "x.bak", "x_mo", "ab_cols", "types", "x", "x.bak", "x_mo", "ab_cols", "types",
"capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance", "capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance",
"reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD", "reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD",
"breakpoint_type", "guideline", "host", "uti", "info", "verbose", "breakpoint_type", "guideline", "host", "uti", "verbose",
"col_mo", "AMR_env", "conserve_capped_values", "col_mo", "conserve_capped_values",
"effective_info", "is_parallel_run",
"run_as_sir_column" "run_as_sir_column"
), envir = environment()) ), envir = environment())
result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column) result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column)
@@ -1035,7 +1095,7 @@ as.sir.data.frame <- function(x,
result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores) result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores)
} }
if (isTRUE(info)) { if (isTRUE(info)) {
message_(font_green_bg("\u00aDONE\u00a"), as_note = FALSE) message_(font_green_bg("\u00a0DONE\u00a0"), as_note = FALSE)
message_(as_note = FALSE) message_(as_note = FALSE)
message_("Run {.help [{.fun sir_interpretation_history}](AMR::sir_interpretation_history)} to retrieve a logbook with all details of the breakpoint interpretations.") message_("Run {.help [{.fun sir_interpretation_history}](AMR::sir_interpretation_history)} to retrieve a logbook with all details of the breakpoint interpretations.")
} }


@@ -406,40 +406,111 @@ test_that("test-sir.R", {
expect_equal(out3, as.sir(c("NWT", "WT", "NWT"))) expect_equal(out3, as.sir(c("NWT", "WT", "NWT")))
expect_equal(out4, as.sir(c("NWT", "WT", "NWT"))) expect_equal(out4, as.sir(c("NWT", "WT", "NWT")))
# Issue #278: re-running as.sir() on already-<sir> data must preserve columns
df_already_sir <- data.frame(
mo = "B_ESCHR_COLI",
AMC = as.mic(c("1", "2", "4")),
GEN = sample(c("S", "I", "R"), 3, replace = TRUE),
stringsAsFactors = FALSE
)
first_pass <- suppressMessages(as.sir(df_already_sir, col_mo = "mo", info = FALSE))
second_pass <- suppressMessages(as.sir(first_pass, col_mo = "mo", info = FALSE))
expect_equal(ncol(first_pass), ncol(second_pass))
expect_true(is.sir(second_pass[["AMC"]]))
expect_true(is.sir(second_pass[["GEN"]]))
expect_identical(first_pass[["AMC"]], second_pass[["AMC"]])
expect_identical(first_pass[["GEN"]], second_pass[["GEN"]])
# Issue #278: metadata columns whose names coincidentally match antibiotic
# codes (e.g. 'patient' -> OXY, 'ward' -> PRU) must not be processed
df_meta <- data.frame(
mo = "B_ESCHR_COLI",
patient = paste0("Pt_", 1:20),
ward = rep(c("ICU", "Surgery", "Outpatient", "ED"), 5),
AMC = as.mic(rep(c("1", "2", "4", "8"), 5)),
stringsAsFactors = FALSE
)
df_meta_sir <- suppressMessages(as.sir(df_meta, col_mo = "mo", info = FALSE))
expect_true("patient" %in% colnames(df_meta_sir))
expect_true("ward" %in% colnames(df_meta_sir))
expect_false(is.sir(df_meta_sir[["patient"]]))
expect_false(is.sir(df_meta_sir[["ward"]]))
expect_true(is.sir(df_meta_sir[["AMC"]]))
# Parallel computing ---------------------------------------------------- # Parallel computing ----------------------------------------------------
# Tests must pass even when only 1 core is available; parallel = TRUE then
# silently falls back to sequential, but results must still be identical.
# MB 29 Apr 2025: I have run the code of AVC, PEI, Canada (dataset of 2854x65), and compared it like this: set.seed(42)
n_par <- 200
df_par <- data.frame(
mo = "B_ESCHR_COLI",
AMC = as.mic(sample(c("0.25", "0.5", "1", "2", "4", "8", "16", "32"), n_par, TRUE)),
GEN = as.mic(sample(c("0.5", "1", "2", "4", "8", "16", "32", "64"), n_par, TRUE)),
CIP = as.mic(sample(c("0.001", "0.002", "0.004", "0.008", "0.016", "0.032"), n_par, TRUE)),
PEN = sample(c("S", "I", "R", NA_character_), n_par, TRUE),
stringsAsFactors = FALSE
)
# system.time({ # clear any existing history before comparing
# data_2022_2023_SIR_parallel <- data_2022_2023_clean |> sir_interpretation_history(clean = TRUE)
# as.sir(amikacin:tiamulin, sir_seq <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE))
# col_mo = "mo", log_seq <- sir_interpretation_history(clean = TRUE)
# guideline = "CLSI 2024",
# host = "Species",
# uti = "isUTI",
# parallel = TRUE)
# })
# # user system elapsed
# # 271.424 2.767 45.762
#
# history_parallel <- sir_interpretation_history(clean = TRUE)
#
# system.time({
# data_2022_2023_SIR <- data_2022_2023_clean |>
# as.sir(amikacin:tiamulin,
# col_mo = "mo",
# guideline = "CLSI 2024",
# host = "Species",
# uti = "isUTI")
# })
# # user system elapsed
# # 120.637 5.406 128.835
# history <- sir_interpretation_history()
sir_par <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
log_par <- sir_interpretation_history(clean = TRUE)
# and then got this: # 1. parallel = TRUE gives identical SIR results to sequential
# identical(history[, -1], history_parallel[, -1]) expect_identical(sir_seq[["AMC"]], sir_par[["AMC"]])
#> [1] TRUE expect_identical(sir_seq[["GEN"]], sir_par[["GEN"]])
expect_identical(sir_seq[["CIP"]], sir_par[["CIP"]])
expect_identical(sir_seq[["PEN"]], sir_par[["PEN"]])
# so parallel on Apple M2 is 2.8x faster, with identical history -> GREAT! # 2. same number of log rows as sequential
expect_equal(nrow(log_seq), nrow(log_par))
# 3. pre-existing log entries must not be duplicated
# run sequential once to populate the history, then run parallel and
# verify the new parallel run adds exactly as many rows as sequential
sir_interpretation_history(clean = TRUE)
suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE)) # populate history
pre_n <- nrow(sir_interpretation_history())
suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
post_n <- nrow(sir_interpretation_history())
expect_equal(post_n - pre_n, nrow(log_seq)) # exactly one run's worth of new rows
sir_interpretation_history(clean = TRUE)
# 4. repeated parallel runs yield identical results
sir_par2 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE))
expect_identical(sir_par[["AMC"]], sir_par2[["AMC"]])
expect_identical(sir_par[["GEN"]], sir_par2[["GEN"]])
# 5. max_cores = 1 gives same results as default sequential
sir_mc1 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 1L))
expect_identical(sir_seq[["AMC"]], sir_mc1[["AMC"]])
expect_identical(sir_seq[["GEN"]], sir_mc1[["GEN"]])
# 6. max_cores = 2 and max_cores = 3 give same results as sequential
sir_mc2 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 2L))
sir_mc3 <- suppressMessages(as.sir(df_par, col_mo = "mo", info = FALSE, parallel = TRUE, max_cores = 3L))
expect_identical(sir_seq[["AMC"]], sir_mc2[["AMC"]])
expect_identical(sir_seq[["GEN"]], sir_mc3[["GEN"]])
# 7. single-column data frame falls back silently to sequential
df_single <- df_par[, c("mo", "AMC")]
sir_single_seq <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE))
sir_single_par <- suppressMessages(as.sir(df_single, col_mo = "mo", info = FALSE, parallel = TRUE))
expect_identical(sir_single_seq[["AMC"]], sir_single_par[["AMC"]])
# 8. info = TRUE with parallel does not produce per-column worker messages
# (messages should only appear in the main process, not duplicated from workers)
msgs <- capture.output(
suppressWarnings(as.sir(df_par, col_mo = "mo", info = TRUE, parallel = TRUE)),
type = "message"
)
# each AB column name should appear at most once in all messages combined
for (ab_nm in c("AMC", "GEN", "CIP", "PEN")) {
n_mentions <- sum(grepl(ab_nm, msgs, fixed = TRUE))
expect_lte(n_mentions, 1L)
}
}) })


@@ -0,0 +1,71 @@
# Benchmark: sequential vs parallel as.sir() across data-set sizes
#
# Run from the repo root with:
# Rscript tools/benchmark_parallel.R
# or from inside an R session:
# source("tools/benchmark_parallel.R")
#
# Requires ggplot2 for the output plot; uses devtools::load_all() so the
# package does not need to be installed.
devtools::load_all(".", quiet = TRUE)
sizes <- c(20, 200, 2000, 20000)
n_ab <- 6 # number of antibiotic columns
make_df <- function(n) {
set.seed(42)
mics <- lapply(seq_len(n_ab), function(j) {
as.mic(sample(c("0.25", "0.5", "1", "2", "4", "8", "16", "32"), n, TRUE))
})
names(mics) <- c("AMC", "GEN", "CIP", "TZP", "IPM", "MEM")
data.frame(mo = "B_ESCHR_COLI", mics, stringsAsFactors = FALSE)
}
results <- do.call(rbind, lapply(sizes, function(n) {
df <- make_df(n)
t_seq <- system.time(
suppressMessages(as.sir(df, col_mo = "mo", info = FALSE, parallel = FALSE))
)[["elapsed"]]
t_par <- system.time(
suppressMessages(as.sir(df, col_mo = "mo", info = FALSE, parallel = TRUE))
)[["elapsed"]]
message(sprintf("n = %6d seq = %.3fs par = %.3fs speedup = %.1fx",
n, t_seq, t_par, t_seq / t_par))
data.frame(n = n, mode = c("sequential", "parallel"),
seconds = c(t_seq, t_par))
}))
if (requireNamespace("ggplot2", quietly = TRUE)) {
p <- ggplot2::ggplot(results, ggplot2::aes(x = n, y = seconds,
colour = mode, group = mode)) +
ggplot2::geom_line(linewidth = 1) +
ggplot2::geom_point(size = 3) +
ggplot2::scale_x_log10(
breaks = sizes,
labels = format(sizes, big.mark = ",", scientific = FALSE)
) +
ggplot2::scale_colour_manual(
values = c(sequential = "#E05C5C", parallel = "#2E86AB")
) +
ggplot2::labs(
title = "as.sir() throughput: sequential vs parallel",
subtitle = sprintf("%d antibiotic columns, E. coli, EUCAST 2026", n_ab),
x = "Number of rows (log scale)",
y = "Wall-clock time (seconds)",
colour = NULL
) +
ggplot2::theme_minimal(base_size = 13) +
ggplot2::theme(legend.position = "top")
out_file <- "tools/benchmark_parallel.png"
ggplot2::ggsave(out_file, p, width = 7, height = 5, dpi = 150)
message("Plot saved to ", out_file)
} else {
message("Install ggplot2 to get a plot; raw results:")
print(results)
}