1
0
mirror of https://github.com/msberends/AMR.git synced 2026-05-31 13:41:42 +02:00

Fix parallel computing in as.sir.data.frame

Six bugs in parallel = TRUE mode:

1. PSOCK workers (Windows / R < 4.0) never had AMR loaded, so every
   exported/AMR function call failed. Added clusterEvalQ(cl, library(AMR))
   with a graceful fallback to sequential when the package cannot be loaded
   (e.g. dev-only load_all() environments).

2. clusterExport'd AMR_env was a frozen serialised copy; as.sir() on the
   worker wrote to AMR:::AMR_env while run_as_sir_column read from the stale
   copy, so the captured log was always wrong. Fixed by resolving AMR_env
   dynamically via get("AMR_env", envir = asNamespace("AMR")) inside the
   worker function, and removing AMR_env from clusterExport.

3. In the fork-based (mclapply) path each worker inherited the parent's full
   sir_interpretation_history. Capturing the whole log then combining across
   workers duplicated every pre-existing entry. Fixed by recording the log
   row count before the as.sir() call and slicing only the new rows
   afterwards.

4. run_as_sir_column used non-exported internals (%pm>%, pm_pull,
   as.sir.default) that are inaccessible on PSOCK workers after library(AMR).
   Replaced pipe chains with direct as.mic(as.character(x[, col, drop=TRUE]))
   and as.disk(...) calls, and changed as.sir.default() to as.sir() which
   dispatches correctly via S3.

5. With info = TRUE, worker forks printed per-column progress messages
   simultaneously, producing garbled interleaved console output. Per-column
   messages are now suppressed inside workers (effective_info = FALSE) while
   the outer "Running in parallel" / "DONE" messages still appear.

6. Malformed Unicode escape \u00a (3 hex digits) in the "DONE" banner was
   parsed by R as U+00AD (soft hyphen) + "ONE"; corrected to  .

https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR
This commit is contained in:
Claude
2026-04-24 15:31:40 +00:00
parent e7780b6d5f
commit 4ad3812e13
3 changed files with 110 additions and 65 deletions

View File

@@ -1,6 +1,6 @@
Package: AMR Package: AMR
Version: 3.0.1.9048 Version: 3.0.1.9050
Date: 2026-04-22 Date: 2026-04-24
Title: Antimicrobial Resistance Data Analysis Title: Antimicrobial Resistance Data Analysis
Description: Functions to simplify and standardise antimicrobial resistance (AMR) Description: Functions to simplify and standardise antimicrobial resistance (AMR)
data analysis and to work with microbial and antimicrobial properties by data analysis and to work with microbial and antimicrobial properties by

View File

@@ -1,4 +1,4 @@
# AMR 3.0.1.9048 # AMR 3.0.1.9050
### New ### New
* Support for clinical breakpoints of 2026 of both CLSI and EUCAST, by adding all of their over 5,700 new clinical breakpoints to the `clinical_breakpoints` data set for usage in `as.sir()`. EUCAST 2026 is now the new default guideline for all MIC and disk diffusion interpretations. * Support for clinical breakpoints of 2026 of both CLSI and EUCAST, by adding all of their over 5,700 new clinical breakpoints to the `clinical_breakpoints` data set for usage in `as.sir()`. EUCAST 2026 is now the new default guideline for all MIC and disk diffusion interpretations.
@@ -21,6 +21,7 @@
* Two new `NA` objects, `NA_ab_` and `NA_mo_`, analogous to base R's `NA_character_` and `NA_integer_`, for use in pipelines that require typed missing values * Two new `NA` objects, `NA_ab_` and `NA_mo_`, analogous to base R's `NA_character_` and `NA_integer_`, for use in pipelines that require typed missing values
### Fixes ### Fixes
* Fixed multiple bugs in the `parallel = TRUE` mode of `as.sir()` for data frames: (1) PSOCK workers (Windows / R < 4.0) now correctly load the AMR package before processing, with a graceful fallback to sequential mode when the package cannot be loaded; (2) resolved stale-environment issue where the PSOCK path read a frozen copy of `AMR_env` instead of the live one, causing the wrong log entries to be captured; (3) fixed log-entry duplication in the fork-based path (`mclapply`) where pre-existing `sir_interpretation_history` rows were included in every worker's captured log; (4) removed use of non-exported internal functions (`%pm>%`, `pm_pull`, `as.sir.default`) from the worker closure, which made PSOCK workers fail; (5) suppressed per-column progress messages inside workers to prevent interleaved console output; (6) fixed a malformed Unicode escape `\u00a` (3 digits) in the "DONE" status message
* Fixed a bug in `as.sir()` where values that were purely numeric (e.g., `"1"`) and matched the broad SIR-matching regex would be incorrectly stripped of all content by the Unicode letter filter * Fixed a bug in `as.sir()` where values that were purely numeric (e.g., `"1"`) and matched the broad SIR-matching regex would be incorrectly stripped of all content by the Unicode letter filter
* Fixed a bug in `as.mic()` where MIC values in scientific notation (e.g., `"1e-3"`) were incorrectly handled because the letter `e` was removed along with other Unicode letters; scientific notation `e` is now preserved * Fixed a bug in `as.mic()` where MIC values in scientific notation (e.g., `"1e-3"`) were incorrectly handled because the letter `e` was removed along with other Unicode letters; scientific notation `e` is now preserved
* Fixed a bug in `as.ab()` where certain AB codes containing "PH" or "TH" (such as `ETH`, `MTH`, `PHE`, `PHN`, `STH`, `THA`, `THI1`) would incorrectly return `NA` when combined in a vector with any untranslatable value (#245) * Fixed a bug in `as.ab()` where certain AB codes containing "PH" or "TH" (such as `ETH`, `MTH`, `PHE`, `PHN`, `STH`, `THA`, `THI1`) would incorrectly return `NA` when combined in a vector with any untranslatable value (#245)

168
R/sir.R
View File

@@ -716,7 +716,7 @@ as.sir.disk <- function(x,
} }
#' @rdname as.sir #' @rdname as.sir
#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. This requires no additional packages, as the used `parallel` package is part of base \R. On Windows and on \R < 4.0.0 [parallel::parLapply()] will be used, in all other cases the more efficient [parallel::mclapply()] will be used. #' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. The `parallel` package is part of base \R and no additional packages are required. On Unix/macOS with \R >= 4.0.0, [parallel::mclapply()] (fork-based) is used; on Windows and \R < 4.0.0, [parallel::parLapply()] with a PSOCK cluster is used (requires the AMR package to be installed, not just loaded via `devtools::load_all()`). Parallelism distributes columns across cores; it is most beneficial when there are many antibiotic columns and a large number of rows.
#' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise. #' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise.
#' @export #' @export
as.sir.data.frame <- function(x, as.sir.data.frame <- function(x,
@@ -906,6 +906,21 @@ as.sir.data.frame <- function(x,
return(NULL) return(NULL)
} }
) )
if (!is.null(cl)) {
# Each PSOCK worker is a fresh R session — the AMR package must be loaded there
# so all exported functions (as.sir, as.mic, as.disk, ...) are available.
amr_loaded_on_workers <- tryCatch({
parallel::clusterEvalQ(cl, library(AMR, quietly = TRUE))
TRUE
}, error = function(e) FALSE)
if (!amr_loaded_on_workers) {
if (isTRUE(info)) {
message_("Could not load AMR on parallel workers (package may not be installed); falling back to single-core computation.")
}
parallel::stopCluster(cl)
cl <- NULL
}
}
if (is.null(cl)) { if (is.null(cl)) {
n_cores <- 1 n_cores <- 1
} }
@@ -916,65 +931,93 @@ as.sir.data.frame <- function(x,
message_("Processing columns:", as_note = FALSE) message_("Processing columns:", as_note = FALSE)
} }
# In parallel mode suppress per-column messages: workers print simultaneously and
# their output would be interleaved on the console.
is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1
effective_info <- if (is_parallel_run) FALSE else info
run_as_sir_column <- function(i) { run_as_sir_column <- function(i) {
# Always resolve AMR_env from the package namespace. This is essential for
# PSOCK workers (where the closure-captured AMR_env is a stale serialised copy
# while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing
# pre-existing log entries from earlier in the session when forking.
.amr_env <- get("AMR_env", envir = asNamespace("AMR"), inherits = FALSE)
# In parallel mode each worker (fork or PSOCK) has its own copy of the
# history; record the current length so we capture only the new rows added
# by the as.sir() call below, not any pre-existing entries inherited at fork
# time or carried over from earlier as.sir() calls.
if (is_parallel_run) pre_log_n <- NROW(.amr_env$sir_interpretation_history)
ab_col <- ab_cols[i] ab_col <- ab_cols[i]
out <- list(result = NULL, log = NULL) out <- list(result = NULL, log = NULL)
if (types[i] == "mic") { if (types[i] == "mic") {
result <- x %pm>% result <- as.sir(
pm_pull(ab_col) %pm>% as.mic(as.character(x[, ab_col, drop = TRUE])),
as.character() %pm>% mo = x_mo,
as.mic() %pm>% mo.bak = x[, col_mo, drop = TRUE],
as.sir( ab = ab_col,
mo = x_mo, guideline = guideline,
mo.bak = x[, col_mo, drop = TRUE], uti = uti,
ab = ab_col, capped_mic_handling = capped_mic_handling,
guideline = guideline, as_wt_nwt = as_wt_nwt,
uti = uti, add_intrinsic_resistance = add_intrinsic_resistance,
capped_mic_handling = capped_mic_handling, reference_data = reference_data,
as_wt_nwt = as_wt_nwt, substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
add_intrinsic_resistance = add_intrinsic_resistance, include_screening = include_screening,
reference_data = reference_data, include_PKPD = include_PKPD,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, breakpoint_type = breakpoint_type,
include_screening = include_screening, host = host,
include_PKPD = include_PKPD, verbose = verbose,
breakpoint_type = breakpoint_type, info = effective_info,
host = host, conserve_capped_values = conserve_capped_values,
verbose = verbose, is_data.frame = TRUE
info = info, )
conserve_capped_values = conserve_capped_values,
is_data.frame = TRUE
)
out$result <- result out$result <- result
out$log <- AMR_env$sir_interpretation_history if (is_parallel_run) {
AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] # reset log full_log <- .amr_env$sir_interpretation_history
out$log <- if (pre_log_n < NROW(full_log)) {
full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE]
} else {
full_log[0L, , drop = FALSE]
}
} else {
out$log <- .amr_env$sir_interpretation_history
.amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE]
}
return(out) return(out)
} else if (types[i] == "disk") { } else if (types[i] == "disk") {
result <- x %pm>% result <- as.sir(
pm_pull(ab_col) %pm>% as.disk(as.character(x[, ab_col, drop = TRUE])),
as.character() %pm>% mo = x_mo,
as.disk() %pm>% mo.bak = x[, col_mo, drop = TRUE],
as.sir( ab = ab_col,
mo = x_mo, guideline = guideline,
mo.bak = x[, col_mo, drop = TRUE], uti = uti,
ab = ab_col, as_wt_nwt = as_wt_nwt,
guideline = guideline, add_intrinsic_resistance = add_intrinsic_resistance,
uti = uti, reference_data = reference_data,
as_wt_nwt = as_wt_nwt, substitute_missing_r_breakpoint = substitute_missing_r_breakpoint,
add_intrinsic_resistance = add_intrinsic_resistance, include_screening = include_screening,
reference_data = reference_data, include_PKPD = include_PKPD,
substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, breakpoint_type = breakpoint_type,
include_screening = include_screening, host = host,
include_PKPD = include_PKPD, verbose = verbose,
breakpoint_type = breakpoint_type, info = effective_info,
host = host, is_data.frame = TRUE
verbose = verbose, )
info = info,
is_data.frame = TRUE
)
out$result <- result out$result <- result
out$log <- AMR_env$sir_interpretation_history if (is_parallel_run) {
AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] full_log <- .amr_env$sir_interpretation_history
out$log <- if (pre_log_n < NROW(full_log)) {
full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE]
} else {
full_log[0L, , drop = FALSE]
}
} else {
out$log <- .amr_env$sir_interpretation_history
.amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE]
}
return(out) return(out)
} else if (types[i] == "sir") { } else if (types[i] == "sir") {
ab <- ab_col ab <- ab_col
@@ -982,27 +1025,27 @@ as.sir.data.frame <- function(x,
show_message <- FALSE show_message <- FALSE
if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) { if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) {
show_message <- TRUE show_message <- TRUE
if (isTRUE(info)) { if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", AMR_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (", message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (",
ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""), ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""),
ab_name(ab_coerced, tolower = TRUE, info = info), ")... ", ab_name(ab_coerced, tolower = TRUE, info = effective_info), ")... ",
appendLF = FALSE, appendLF = FALSE,
as_note = FALSE as_note = FALSE
) )
} }
} else if (!is.sir(x.bak[, ab, drop = TRUE])) { } else if (!is.sir(x.bak[, ab, drop = TRUE])) {
show_message <- TRUE show_message <- TRUE
if (isTRUE(info)) { if (isTRUE(effective_info)) {
message_("\u00a0\u00a0", AMR_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (", message_("\u00a0\u00a0", .amr_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (",
ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""), ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""),
ab_name(ab_coerced, tolower = TRUE, language = NULL, info = info), ")... ", ab_name(ab_coerced, tolower = TRUE, language = NULL, info = effective_info), ")... ",
appendLF = FALSE, appendLF = FALSE,
as_note = FALSE as_note = FALSE
) )
} }
} }
result <- as.sir.default(x = as.character(x[, ab, drop = TRUE])) result <- as.sir(as.character(x[, ab, drop = TRUE]))
if (show_message == TRUE && isTRUE(info)) { if (show_message == TRUE && isTRUE(effective_info)) {
message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE) message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE)
} }
out$result <- result out$result <- result
@@ -1025,8 +1068,9 @@ as.sir.data.frame <- function(x,
"x", "x.bak", "x_mo", "ab_cols", "types", "x", "x.bak", "x_mo", "ab_cols", "types",
"capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance", "capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance",
"reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD", "reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD",
"breakpoint_type", "guideline", "host", "uti", "info", "verbose", "breakpoint_type", "guideline", "host", "uti", "verbose",
"col_mo", "AMR_env", "conserve_capped_values", "col_mo", "conserve_capped_values",
"effective_info", "is_parallel_run",
"run_as_sir_column" "run_as_sir_column"
), envir = environment()) ), envir = environment())
result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column) result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column)
@@ -1035,7 +1079,7 @@ as.sir.data.frame <- function(x,
result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores) result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores)
} }
if (isTRUE(info)) { if (isTRUE(info)) {
message_(font_green_bg("\u00aDONE\u00a"), as_note = FALSE) message_(font_green_bg("\u00a0DONE\u00a0"), as_note = FALSE)
message_(as_note = FALSE) message_(as_note = FALSE)
message_("Run {.help [{.fun sir_interpretation_history}](AMR::sir_interpretation_history)} to retrieve a logbook with all details of the breakpoint interpretations.") message_("Run {.help [{.fun sir_interpretation_history}](AMR::sir_interpretation_history)} to retrieve a logbook with all details of the breakpoint interpretations.")
} }