47 votes

Existe-t-il un moyen de suivre les progrès sur un mclapply?

J'aime le paramètre .progress = 'text' en plyr's llply. Cependant, il provoque mon beaucoup d'anxiété de ne pas savoir dans quelle mesure le long d'un mclapply (de colis multicore) est, depuis les éléments de la liste sont envoyés à différents coeurs et alors réuni à la fin.

J'ai été la sortie des messages comme *currently in sim_id # ....* mais ce n'est pas très utile, parce qu'elle ne veut pas me donner un indicateur du pourcentage des éléments de la liste sont complètes (bien qu'il est utile de savoir que mon script n'est pas coincé et se déplaçant le long).

Quelqu'un peut-il suggérer d'autres idées qui me permettrait de regarder mes .Rout le fichier et avoir une idée de progrès? J'ai pensé à l'ajout d'un manuel de comptoir, mais ne peut pas voir comment j'allais le mettre en œuvre que depuis mclapply doit terminer le traitement de tous les éléments de la liste avant de pouvoir donner de la rétroaction.

26voto

fotNelton Points 2329

En raison du fait qu' mclapply engendre de multiples processus, on pourrait vouloir utiliser fifo, des tuyaux, ou même des sockets. Considérons maintenant l'exemple suivant:

library(multicore)

finalResult <- local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
        # Child
        progress <- 0.0
        while (progress < 1 && !isIncomplete(f)) {
            msg <- readBin(f, "double")
            progress <- progress + as.numeric(msg)
            cat(sprintf("Progress: %.2f%%\n", progress * 100))
        } 
        exit()
    }
    numJobs <- 100
    result <- mclapply(1:numJobs, function(...) {
        # Dome something fancy here
        # ...
        # Send some progress update
        writeBin(1/numJobs, f)
        # Some arbitrary result
        sample(1000, 1)
    })
    close(f)
    result
})

cat("Done\n")

Ici, un fichier temporaire est utilisé comme fifo, et les principaux processus de fourches d'un enfant dont la seule tâche est de faire rapport sur les progrès. Les principaux processus se poursuit en appelant mclapply où l'expression (plus précisément, l'expression de bloc) qui doit être évalué à l'écrit partielle des informations sur la progression de la mémoire tampon fifo par le biais de l' writeBin.

Comme ce n'est qu'un simple exemple, vous aurez probablement à adapter l'ensemble du résultat des trucs à vos besoins. HTH!

7voto

Max Ghenis Points 400

Voici une fonction basée sur la solution de @ fotNelton à appliquer partout où vous utiliseriez normalement mcapply.

 mcadply <- function(X, FUN) {                                                                                                                                                  
  # Runs multicore lapply with progress indicator and transformation to                                                                                                        
  # data.table output                                                                                                                                                          
  #                                                                                                                                                                            
  # Arguments (same as lapply):                                                                                                                                                
  # X:   Vector                                                                                                                                                                
  # FUN: Function to apply to each value of X                                                                                                                                  
  #                                                                                                                                                                            
  # Output: data.table stack of each mclapply return value                                                                                                                     
  #         Note FUN is transformed to a data.frame return if necessary                                                                                                        
  #                                                                                                                                                                            
  # Progress bar code based on http://stackoverflow.com/a/10993589                                                                                                             

  require(multicore)                                                                                                                                                            
  require(plyr)                                                                                                                                                                
  require(data.table)                                                                                                                                                          

  local({                                                                                                                                                                      
    f <- fifo(tempfile(), open="w+b", blocking=T)                                                                                                                              
    if (inherits(fork(), "masterProcess")) {                                                                                                                                   
      # Child                                                                                                                                                                  
      progress <- 0                                                                                                                                                            
      print.progress <- 0                                                                                                                                                      
      while (progress < 1 && !isIncomplete(f)) {                                                                                                                               
        msg <- readBin(f, "double")                                                                                                                                            
        progress <- progress + as.numeric(msg)                                                                                                                                 
        # Print every 1%                                                                                                                                                       
        if(progress >= print.progress + 0.01) {                                                                                                                                
          cat(sprintf("Progress: %.0f%%\n", progress * 100))                                                                                                                   
          print.progress <- floor(progress * 100) / 100                                                                                                                        
        }                                                                                                                                                                      
      }                                                                                                                                                                        
      exit()                                                                                                                                                                   
    }                                                                                                                                                                          

    newFun <- function(...) {                                                                                                                                                  
      writeBin(1 / length(X), f)                                                                                                                                               
      return(as.data.frame(FUN(...)))                                                                                                                                          
    }                                                                                                                                                                          

    result <- as.data.table(rbind.fill(mclapply(X, newFun)))                                                                                                                   
    close(f)                                                                                                                                                                   
    cat("Done\n")                                                                                                                                                              
    return(result)                                                                                                                                                             
  })                                                                                                                                                                           
}
 

4voto

kamula Points 118

Ceux-ci ont fière allure, sauf qu'ils ne vous permettent pas de passer des arguments supplémentaires à mclapply, j'en ai juste ajouté quelques-uns ... pour corriger cela:

 mcadply <- function(X, FUN, ...) {                                                                                                                                                  
  # Runs multicore lapply with progress indicator and transformation to                                                                                                        
  # data.table output                                                                                                                                                          
  #                                                                                                                                                                            
  # Arguments (same as lapply):                                                                                                                                                
  # X:   Vector                                                                                                                                                                
  # FUN: Function to apply to each value of X                                                                                                                                  
  #                                                                                                                                                                            
  # Output: data.table stack of each mclapply return value                                                                                                                     
  #         Note FUN is transformed to a data.frame return if necessary                                                                                                        
  #                                                                                                                                                                            
  # Progress bar code based on http://stackoverflow.com/a/10993589                                                                                                             

  require(multicore)                                                                                                                                                            
  require(plyr)                                                                                                                                                                
  require(data.table)                                                                                                                                                          

  local({                                                                                                                                                                      
    f <- fifo(tempfile(), open="w+b", blocking=T)                                                                                                                              
    if (inherits(fork(), "masterProcess")) {                                                                                                                                   
      # Child                                                                                                                                                                  
      progress <- 0                                                                                                                                                            
      print.progress <- 0                                                                                                                                                      
      while (progress < 1 && !isIncomplete(f)) {                                                                                                                               
        msg <- readBin(f, "double")                                                                                                                                            
        progress <- progress + as.numeric(msg)                                                                                                                                 
        # Print every 1%                                                                                                                                                       
        if(progress >= print.progress + 0.01) {                                                                                                                                
          cat(sprintf("Progress: %.0f%%\n", progress * 100))                                                                                                                   
          print.progress <- floor(progress * 100) / 100                                                                                                                        
        }                                                                                                                                                                      
      }                                                                                                                                                                        
      exit()                                                                                                                                                                   
    }                                                                                                                                                                          

    newFun <- function(...) {                                                                                                                                                  
      writeBin(1 / length(X), f)                                                                                                                                               
      return(as.data.frame(FUN(...)))                                                                                                                                          
    }                                                                                                                                                                          

    result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))                                                                                                                   
    close(f)                                                                                                                                                                   
    cat("Done\n")                                                                                                                                                              
    return(result)                                                                                                                                                             
  })                                                                                                                                                                           
}
 

2voto

lcallot Points 87

Basé sur la réponse de @fotNelson, en utilisant une barre de progression au lieu de l'impression ligne par ligne et en appelant une fonction externe avec mclapply.

 library('utils')
library('multicore')

prog.indic <- local({ #evaluates in local environment only
    f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection
    assign(x='f',value=f,envir=.GlobalEnv)
    pb <- txtProgressBar(min=1, max=MC,style=3)

    if (inherits(fork(), "masterProcess")) { #progress tracker
        # Child
        progress <- 0.0
        while (progress < MC && !isIncomplete(f)){ 
            msg <- readBin(f, "double")
                progress <- progress + as.numeric(msg)

            # Updating the progress bar.
            setTxtProgressBar(pb,progress)
            } 


        exit()
        }
   MC <- 100
   result <- mclapply(1:MC, .mcfunc)

    cat('\n')
    assign(x='result',value=result,envir=.GlobalEnv)
    close(f)
    })

.mcfunc<-function(i,...){
        writeBin(1, f)
        return(i)
    }
 

L'affectation de la connexion fifo au .GlobalEnv est nécessaire pour l'utiliser à partir d'une fonction en dehors de l'appel mclapply. Merci pour les questions et les réponses précédentes, je me demandais comment faire depuis un moment.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X