From 4ad3812e13372df22fd3e1b522ce4cded605da89 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 15:31:40 +0000 Subject: [PATCH] Fix parallel computing in as.sir.data.frame MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Six bugs in parallel = TRUE mode: 1. PSOCK workers (Windows / R < 4.0) never had AMR loaded, so every exported/AMR function call failed. Added clusterEvalQ(cl, library(AMR)) with a graceful fallback to sequential when the package cannot be loaded (e.g. dev-only load_all() environments). 2. clusterExport'd AMR_env was a frozen serialised copy; as.sir() on the worker wrote to AMR:::AMR_env while run_as_sir_column read from the stale copy, so the captured log was always wrong. Fixed by resolving AMR_env dynamically via get("AMR_env", envir = asNamespace("AMR")) inside the worker function, and removing AMR_env from clusterExport. 3. In the fork-based (mclapply) path each worker inherited the parent's full sir_interpretation_history. Capturing the whole log then combining across workers duplicated every pre-existing entry. Fixed by recording the log row count before the as.sir() call and slicing only the new rows afterwards. 4. run_as_sir_column used non-exported internals (%pm>%, pm_pull, as.sir.default) that are inaccessible on PSOCK workers after library(AMR). Replaced pipe chains with direct as.mic(as.character(x[, col, drop=TRUE])) and as.disk(...) calls, and changed as.sir.default() to as.sir() which dispatches correctly via S3. 5. With info = TRUE, worker forks printed per-column progress messages simultaneously, producing garbled interleaved console output. Per-column messages are now suppressed inside workers (effective_info = FALSE) while the outer "Running in parallel" / "DONE" messages still appear. 6. Malformed Unicode escape \u00a (3 hex digits) in the "DONE" banner was parsed by R as U+00AD (soft hyphen) + "ONE"; corrected to  . https://claude.ai/code/session_012DXCXbZUC54Zij1z9bFiHR --- DESCRIPTION | 4 +- NEWS.md | 3 +- R/sir.R | 168 +++++++++++++++++++++++++++++++++------------------- 3 files changed, 110 insertions(+), 65 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index db313c43a..81618ee49 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: AMR -Version: 3.0.1.9048 -Date: 2026-04-22 +Version: 3.0.1.9050 +Date: 2026-04-24 Title: Antimicrobial Resistance Data Analysis Description: Functions to simplify and standardise antimicrobial resistance (AMR) data analysis and to work with microbial and antimicrobial properties by diff --git a/NEWS.md b/NEWS.md index c5c7ad602..e9b122393 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# AMR 3.0.1.9048 +# AMR 3.0.1.9050 ### New * Support for clinical breakpoints of 2026 of both CLSI and EUCAST, by adding all of their over 5,700 new clinical breakpoints to the `clinical_breakpoints` data set for usage in `as.sir()`. EUCAST 2026 is now the new default guideline for all MIC and disk diffusion interpretations. @@ -21,6 +21,7 @@ * Two new `NA` objects, `NA_ab_` and `NA_mo_`, analogous to base R's `NA_character_` and `NA_integer_`, for use in pipelines that require typed missing values ### Fixes +* Fixed multiple bugs in the `parallel = TRUE` mode of `as.sir()` for data frames: (1) PSOCK workers (Windows / R < 4.0) now correctly load the AMR package before processing, with a graceful fallback to sequential mode when the package cannot be loaded; (2) resolved stale-environment issue where the PSOCK path read a frozen copy of `AMR_env` instead of the live one, causing the wrong log entries to be captured; (3) fixed log-entry duplication in the fork-based path (`mclapply`) where pre-existing `sir_interpretation_history` rows were included in every worker's captured log; (4) removed use of non-exported internal functions (`%pm>%`, `pm_pull`, `as.sir.default`) from the worker closure, which made PSOCK workers fail; (5) suppressed per-column progress messages inside workers to prevent interleaved console output; (6) fixed a malformed Unicode escape `\u00a` (3 digits) in the "DONE" status message * Fixed a bug in `as.sir()` where values that were purely numeric (e.g., `"1"`) and matched the broad SIR-matching regex would be incorrectly stripped of all content by the Unicode letter filter * Fixed a bug in `as.mic()` where MIC values in scientific notation (e.g., `"1e-3"`) were incorrectly handled because the letter `e` was removed along with other Unicode letters; scientific notation `e` is now preserved * Fixed a bug in `as.ab()` where certain AB codes containing "PH" or "TH" (such as `ETH`, `MTH`, `PHE`, `PHN`, `STH`, `THA`, `THI1`) would incorrectly return `NA` when combined in a vector with any untranslatable value (#245) diff --git a/R/sir.R b/R/sir.R index 4020acf2e..c1c07b2c6 100755 --- a/R/sir.R +++ b/R/sir.R @@ -716,7 +716,7 @@ as.sir.disk <- function(x, } #' @rdname as.sir -#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. This requires no additional packages, as the used `parallel` package is part of base \R. On Windows and on \R < 4.0.0 [parallel::parLapply()] will be used, in all other cases the more efficient [parallel::mclapply()] will be used. +#' @param parallel A [logical] to indicate if parallel computing must be used, defaults to `FALSE`. The `parallel` package is part of base \R and no additional packages are required. On Unix/macOS with \R >= 4.0.0, [parallel::mclapply()] (fork-based) is used; on Windows and \R < 4.0.0, [parallel::parLapply()] with a PSOCK cluster is used (requires the AMR package to be installed, not just loaded via `devtools::load_all()`). Parallelism distributes columns across cores; it is most beneficial when there are many antibiotic columns and a large number of rows. #' @param max_cores Maximum number of cores to use if `parallel = TRUE`. Use a negative value to subtract that number from the available number of cores, e.g. a value of `-2` on an 8-core machine means that at most 6 cores will be used. Defaults to `-1`. There will never be used more cores than variables to analyse. The available number of cores are detected using [parallelly::availableCores()] if that package is installed, and base \R's [parallel::detectCores()] otherwise. #' @export as.sir.data.frame <- function(x, @@ -906,6 +906,21 @@ as.sir.data.frame <- function(x, return(NULL) } ) + if (!is.null(cl)) { + # Each PSOCK worker is a fresh R session — the AMR package must be loaded there + # so all exported functions (as.sir, as.mic, as.disk, ...) are available. + amr_loaded_on_workers <- tryCatch({ + parallel::clusterEvalQ(cl, library(AMR, quietly = TRUE)) + TRUE + }, error = function(e) FALSE) + if (!amr_loaded_on_workers) { + if (isTRUE(info)) { + message_("Could not load AMR on parallel workers (package may not be installed); falling back to single-core computation.") + } + parallel::stopCluster(cl) + cl <- NULL + } + } if (is.null(cl)) { n_cores <- 1 } @@ -916,65 +931,93 @@ as.sir.data.frame <- function(x, message_("Processing columns:", as_note = FALSE) } + # In parallel mode suppress per-column messages: workers print simultaneously and + # their output would be interleaved on the console. + is_parallel_run <- isTRUE(parallel) && n_cores > 1 && length(ab_cols) > 1 + effective_info <- if (is_parallel_run) FALSE else info + run_as_sir_column <- function(i) { + # Always resolve AMR_env from the package namespace. This is essential for + # PSOCK workers (where the closure-captured AMR_env is a stale serialised copy + # while as.sir() writes to the live AMR:::AMR_env) and also avoids capturing + # pre-existing log entries from earlier in the session when forking. + .amr_env <- get("AMR_env", envir = asNamespace("AMR"), inherits = FALSE) + # In parallel mode each worker (fork or PSOCK) has its own copy of the + # history; record the current length so we capture only the new rows added + # by the as.sir() call below, not any pre-existing entries inherited at fork + # time or carried over from earlier as.sir() calls. + if (is_parallel_run) pre_log_n <- NROW(.amr_env$sir_interpretation_history) + ab_col <- ab_cols[i] out <- list(result = NULL, log = NULL) if (types[i] == "mic") { - result <- x %pm>% - pm_pull(ab_col) %pm>% - as.character() %pm>% - as.mic() %pm>% - as.sir( - mo = x_mo, - mo.bak = x[, col_mo, drop = TRUE], - ab = ab_col, - guideline = guideline, - uti = uti, - capped_mic_handling = capped_mic_handling, - as_wt_nwt = as_wt_nwt, - add_intrinsic_resistance = add_intrinsic_resistance, - reference_data = reference_data, - substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, - include_screening = include_screening, - include_PKPD = include_PKPD, - breakpoint_type = breakpoint_type, - host = host, - verbose = verbose, - info = info, - conserve_capped_values = conserve_capped_values, - is_data.frame = TRUE - ) + result <- as.sir( + as.mic(as.character(x[, ab_col, drop = TRUE])), + mo = x_mo, + mo.bak = x[, col_mo, drop = TRUE], + ab = ab_col, + guideline = guideline, + uti = uti, + capped_mic_handling = capped_mic_handling, + as_wt_nwt = as_wt_nwt, + add_intrinsic_resistance = add_intrinsic_resistance, + reference_data = reference_data, + substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, + include_screening = include_screening, + include_PKPD = include_PKPD, + breakpoint_type = breakpoint_type, + host = host, + verbose = verbose, + info = effective_info, + conserve_capped_values = conserve_capped_values, + is_data.frame = TRUE + ) out$result <- result - out$log <- AMR_env$sir_interpretation_history - AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] # reset log + if (is_parallel_run) { + full_log <- .amr_env$sir_interpretation_history + out$log <- if (pre_log_n < NROW(full_log)) { + full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE] + } else { + full_log[0L, , drop = FALSE] + } + } else { + out$log <- .amr_env$sir_interpretation_history + .amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE] + } return(out) } else if (types[i] == "disk") { - result <- x %pm>% - pm_pull(ab_col) %pm>% - as.character() %pm>% - as.disk() %pm>% - as.sir( - mo = x_mo, - mo.bak = x[, col_mo, drop = TRUE], - ab = ab_col, - guideline = guideline, - uti = uti, - as_wt_nwt = as_wt_nwt, - add_intrinsic_resistance = add_intrinsic_resistance, - reference_data = reference_data, - substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, - include_screening = include_screening, - include_PKPD = include_PKPD, - breakpoint_type = breakpoint_type, - host = host, - verbose = verbose, - info = info, - is_data.frame = TRUE - ) + result <- as.sir( + as.disk(as.character(x[, ab_col, drop = TRUE])), + mo = x_mo, + mo.bak = x[, col_mo, drop = TRUE], + ab = ab_col, + guideline = guideline, + uti = uti, + as_wt_nwt = as_wt_nwt, + add_intrinsic_resistance = add_intrinsic_resistance, + reference_data = reference_data, + substitute_missing_r_breakpoint = substitute_missing_r_breakpoint, + include_screening = include_screening, + include_PKPD = include_PKPD, + breakpoint_type = breakpoint_type, + host = host, + verbose = verbose, + info = effective_info, + is_data.frame = TRUE + ) out$result <- result - out$log <- AMR_env$sir_interpretation_history - AMR_env$sir_interpretation_history <- AMR_env$sir_interpretation_history[0, , drop = FALSE] + if (is_parallel_run) { + full_log <- .amr_env$sir_interpretation_history + out$log <- if (pre_log_n < NROW(full_log)) { + full_log[seq.int(pre_log_n + 1L, NROW(full_log)), , drop = FALSE] + } else { + full_log[0L, , drop = FALSE] + } + } else { + out$log <- .amr_env$sir_interpretation_history + .amr_env$sir_interpretation_history <- .amr_env$sir_interpretation_history[0L, , drop = FALSE] + } return(out) } else if (types[i] == "sir") { ab <- ab_col @@ -982,27 +1025,27 @@ as.sir.data.frame <- function(x, show_message <- FALSE if (!all(x[, ab, drop = TRUE] %in% c("S", "SDD", "I", "R", "NI", NA), na.rm = TRUE)) { show_message <- TRUE - if (isTRUE(info)) { - message_("\u00a0\u00a0", AMR_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (", + if (isTRUE(effective_info)) { + message_("\u00a0\u00a0", .amr_env$bullet_icon, " Cleaning values in column ", paste0("{.field ", font_bold(ab), "}"), " (", ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""), - ab_name(ab_coerced, tolower = TRUE, info = info), ")... ", + ab_name(ab_coerced, tolower = TRUE, info = effective_info), ")... ", appendLF = FALSE, as_note = FALSE ) } } else if (!is.sir(x.bak[, ab, drop = TRUE])) { show_message <- TRUE - if (isTRUE(info)) { - message_("\u00a0\u00a0", AMR_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (", + if (isTRUE(effective_info)) { + message_("\u00a0\u00a0", .amr_env$bullet_icon, " Assigning class {.cls sir} to already clean column ", paste0("{.field ", font_bold(ab), "}"), " (", ifelse(ab_coerced != toupper(ab), paste0(ab_coerced, ", "), ""), - ab_name(ab_coerced, tolower = TRUE, language = NULL, info = info), ")... ", + ab_name(ab_coerced, tolower = TRUE, language = NULL, info = effective_info), ")... ", appendLF = FALSE, as_note = FALSE ) } } - result <- as.sir.default(x = as.character(x[, ab, drop = TRUE])) - if (show_message == TRUE && isTRUE(info)) { + result <- as.sir(as.character(x[, ab, drop = TRUE])) + if (show_message == TRUE && isTRUE(effective_info)) { message_(font_green_bg("\u00a0OK\u00a0"), as_note = FALSE) } out$result <- result @@ -1025,8 +1068,9 @@ as.sir.data.frame <- function(x, "x", "x.bak", "x_mo", "ab_cols", "types", "capped_mic_handling", "as_wt_nwt", "add_intrinsic_resistance", "reference_data", "substitute_missing_r_breakpoint", "include_screening", "include_PKPD", - "breakpoint_type", "guideline", "host", "uti", "info", "verbose", - "col_mo", "AMR_env", "conserve_capped_values", + "breakpoint_type", "guideline", "host", "uti", "verbose", + "col_mo", "conserve_capped_values", + "effective_info", "is_parallel_run", "run_as_sir_column" ), envir = environment()) result_list <- parallel::parLapply(cl, seq_along(ab_cols), run_as_sir_column) @@ -1035,7 +1079,7 @@ as.sir.data.frame <- function(x, result_list <- parallel::mclapply(seq_along(ab_cols), run_as_sir_column, mc.cores = n_cores) } if (isTRUE(info)) { - message_(font_green_bg("\u00aDONE\u00a"), as_note = FALSE) + message_(font_green_bg("\u00a0DONE\u00a0"), as_note = FALSE) message_(as_note = FALSE) message_("Run {.help [{.fun sir_interpretation_history}](AMR::sir_interpretation_history)} to retrieve a logbook with all details of the breakpoint interpretations.") }