有没有办法跟踪mclapply的进度?

我喜欢plyr's llply的setting .progress = 'text' 。 然而,由于列表项目被发送到各个核心,然后在最后进行整理,所以它引起我的焦虑,不知道mclapply (从multicore包装)到多远。

我一直在输出消息,如*currently in sim_id # ....*但这不是很有帮助,因为它没有给我一个指标列表项目完成百分比(虽然这是有帮助的,知道我的脚本没有被卡住和移动)。

有人可以提出其他的想法,让我看看我的.Rout文件,并获得进步的感觉? 我想过添加一个手动计数器,但是看不到我将如何实现,因为mclapply必须完成处理所有列表项,才能发出任何反馈。

由于mclapply衍生出多个进程的事实,人们可能想要使用fifos,pipe道甚至套接字。 现在考虑下面的例子:

 library(multicore) finalResult <- local({ f <- fifo(tempfile(), open="w+b", blocking=T) if (inherits(fork(), "masterProcess")) { # Child progress <- 0.0 while (progress < 1 && !isIncomplete(f)) { msg <- readBin(f, "double") progress <- progress + as.numeric(msg) cat(sprintf("Progress: %.2f%%\n", progress * 100)) } exit() } numJobs <- 100 result <- mclapply(1:numJobs, function(...) { # Dome something fancy here # ... # Send some progress update writeBin(1/numJobs, f) # Some arbitrary result sample(1000, 1) }) close(f) result }) cat("Done\n") 

在这里,一个临时文件被用作fifo,主进程分派一个小孩,其唯一的职责就是报告当前的进度。 主进程通过调用mclapply继续,其中要被评估的expression式(更确切地说,expression式块)通过writeBin将部分进度信息写入fifo缓冲器。

因为这只是一个简单的例子,你可能不得不根据你的需要调整整个输出的东西。 HTH!

基本上增加@ fotNelson的解决scheme的另一个版本,但有一些修改:

  • 代替mclapply(支持所有mclapplyfunction)
  • 捕获ctrl-c调用并优雅地中止
  • 使用内置的进度条(txtProgressBar)
  • 选项跟踪进度与否,并使用指定风格的进度条
  • 使用parallel而不是multicore ,现在已经从CRAN中删除
  • 强迫X按照mclapply列出(所以长度(X)给出了预期的结果)
  • 在顶部的roxygen2风格的文档

希望这可以帮助别人…

 library(parallel) #------------------------------------------------------------------------------- #' Wrapper around mclapply to track progress #' #' Based on http://stackoverflow.com/questions/10984556 #' #' @param X a vector (atomic or list) or an expressions vector. Other #' objects (including classed objects) will be coerced by #' 'as.list' #' @param FUN the function to be applied to #' @param ... optional arguments to 'FUN' #' @param mc.preschedule see mclapply #' @param mc.set.seed see mclapply #' @param mc.silent see mclapply #' @param mc.cores see mclapply #' @param mc.cleanup see mclapply #' @param mc.allow.recursive see mclapply #' @param mc.progress track progress? #' @param mc.style style of progress bar (see txtProgressBar) #' #' @examples #' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01)) #' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1) #' #' dat <- lapply(1:10, function(x) rnorm(100)) #' func <- function(x, arg1) mean(x)/arg1 #' mclapply2(dat, func, arg1=10, mc.cores=2) #------------------------------------------------------------------------------- mclapply2 <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE, mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L), mc.cleanup = TRUE, mc.allow.recursive = TRUE, mc.progress=TRUE, mc.style=3) { if (!is.vector(X) || is.object(X)) X <- as.list(X) if (mc.progress) { f <- fifo(tempfile(), open="w+b", blocking=T) p <- parallel:::mcfork() pb <- txtProgressBar(0, length(X), style=mc.style) setTxtProgressBar(pb, 0) progress <- 0 if (inherits(p, "masterProcess")) { while (progress < length(X)) { readBin(f, "double") progress <- progress + 1 setTxtProgressBar(pb, progress) } cat("\n") parallel:::mcexit() } } tryCatch({ result <- mclapply(X, ..., function(...) { res <- FUN(...) if (mc.progress) writeBin(1, f) res }, mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed, mc.silent = mc.silent, mc.cores = mc.cores, mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive ) }, finally = { if (mc.progress) close(f) }) result } 

这里有一个基于@ fotNelton的解决scheme的函数,适用于任何你通常使用mcapply的地方。

 mcadply <- function(X, FUN, ...) { # Runs multicore lapply with progress indicator and transformation to # data.table output. Arguments mirror those passed to lapply. # # Args: # X: Vector. # FUN: Function to apply to each value of X. Note this is transformed to # a data.frame return if necessary. # ...: Other arguments passed to mclapply. # # Returns: # data.table stack of each mclapply return value # # Progress bar code based on https://stackoverflow.com/a/10993589 require(multicore) require(plyr) require(data.table) local({ f <- fifo(tempfile(), open="w+b", blocking=T) if (inherits(fork(), "masterProcess")) { # Child progress <- 0 print.progress <- 0 while (progress < 1 && !isIncomplete(f)) { msg <- readBin(f, "double") progress <- progress + as.numeric(msg) # Print every 1% if(progress >= print.progress + 0.01) { cat(sprintf("Progress: %.0f%%\n", progress * 100)) print.progress <- floor(progress * 100) / 100 } } exit() } newFun <- function(...) { writeBin(1 / length(X), f) return(as.data.frame(FUN(...))) } result <- as.data.table(rbind.fill(mclapply(X, newFun, ...))) close(f) cat("Done\n") return(result) }) } 

pbapply包已经在一般情况下实现了。 pblapplypbsapply都有一个cl参数。 从文档 :

并行处理可以通过cl参数来启用。 当cl是一个“ cluster ”对象时调用mclapply ,当cl是一个整数时调用mclapply 。 显示进度条会增加主进程和节点/subprocess之间的通信开销,与没有进度栏的并行等效函数相比。 当进度条被禁用时(即getOption("pboptions")$type == "none" dopb()FALSE ),函数将回退到原来的等getOption("pboptions")$type == "none" 。 当interactive()如果为FALSE (即从命令行R脚本调用interactive()时,这是默认的。

如果没有提供cl (或者传递NULL ),则使用默认的非并行lapply ,还包括一个进度条。

基于@fotNelson的答案,使用进度条而不是逐行打印并用mclapply调用外部函数。

 library('utils') library('multicore') prog.indic <- local({ #evaluates in local environment only f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection assign(x='f',value=f,envir=.GlobalEnv) pb <- txtProgressBar(min=1, max=MC,style=3) if (inherits(fork(), "masterProcess")) { #progress tracker # Child progress <- 0.0 while (progress < MC && !isIncomplete(f)){ msg <- readBin(f, "double") progress <- progress + as.numeric(msg) # Updating the progress bar. setTxtProgressBar(pb,progress) } exit() } MC <- 100 result <- mclapply(1:MC, .mcfunc) cat('\n') assign(x='result',value=result,envir=.GlobalEnv) close(f) }) .mcfunc<-function(i,...){ writeBin(1, f) return(i) } 

分配到.GlobalEnv的fifo连接对于从mclapply调用之外的函数中使用它是必要的。 感谢您提出的问题和之前的答复,我一直在想如何做一段时间。