28 votes

Itérer sur les lignes d'un fichier en parallèle (Scala)?

Je sais que sur le parallèle collections Scala. Ils sont à portée de main! Cependant, je tiens à parcourir les lignes d'un fichier est trop volumineux pour la mémoire en parallèle. J'ai pu créer des threads et mis en place un verrou sur un Scanner, par exemple, mais ce serait génial si je pouvais exécuter le code tel que:

Source.fromFile(path).getLines.par foreach { line =>

Malheureusement, cependant

error: value par is not a member of Iterator[String]

Quelle est la manière la plus simple d'accomplir un certain parallélisme ici? Pour l'instant, je vais lire en quelques lignes et de les gérer en parallèle.

25voto

Joshua Hartman Points 535

Vous pouvez utiliser le regroupement pour découper facilement l'itérateur en morceaux que vous pouvez charger en mémoire puis traiter en parallèle.

 val chunkSize = 128 * 1024
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines => 
    lines.par.foreach { line => process(line) }
}
 

À mon avis, quelque chose comme ça est la façon la plus simple de le faire.

8voto

Dan Simon Points 5408

Je vais mettre cela comme une réponse distincte car elle est fondamentalement différente de mon dernier (et ça marche)

Voici un aperçu d'une solution à l'aide d'acteurs, qui est essentiellement ce que Kim Stebel commentaire décrit. Il y a deux classes acteur, un seul FileReader acteur qui lit les lignes dans le fichier sur demande, et plusieurs Travailleurs acteurs. Les travailleurs de tous les envoyer des demandes de lignes pour le lecteur, et les lignes de production en parallèle comme ils sont lus dans le fichier.

Je suis à l'aide d'Akka acteurs ici, mais en utilisant une autre application est fondamentalement la même idée.

case object LineRequest
case object BeginProcessing

class FileReader extends Actor {

  //reads a single line from the file or returns None if EOF
  def getLine:Option[String] = ...

  def receive = {
    case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef]
  }
}

class Worker(reader: ActorRef) extends Actor {

  def process(line:String) ...

  def receive = {
    case BeginProcessing => reader ! LineRequest
    case Some(line) => {
      process(line)
      reader ! LineRequest
    }
    case None => self.stop
  }
}

val reader = actorOf[FileReader].start    
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach{_ ! BeginProcessing}
//wait for the workers to stop...

De cette façon, pas plus de 4 (ou de l'cependant, de nombreux travailleurs que vous avez) non transformés, les lignes sont en mémoire à la fois.

1voto

Dan Simon Points 5408

La solution la plus simple consiste simplement à convertir l'itérateur en liste

 Source.fromFile(path).getLines.toList.par foreach { line =>
 

Edit: j'ai raté la partie sur le fichier étant trop volumineux pour être entièrement traité en mémoire. Dans ce cas, vous devez probablement utiliser un BufferedReader Java ordinaire pour lire les lignes une par une. Ensuite, vous pouvez utiliser une file d'attente ou une liste modifiable pour créer une collection de taille raisonnable, puis utiliser le foreach parallèle.

1voto

Ian McLaird Points 2487

Les commentaires sur la réponse de Dan Simon m'ont fait réfléchir. Pourquoi n'essayons-nous pas d'encapsuler la source dans un flux:

 def src(source: Source) = Stream[String] = {
  if (source.hasNext) Stream.cons(source.takeWhile( _ != '\n' ).mkString)
  else Stream.empty
}
 

Ensuite, vous pouvez le consommer en parallèle comme ceci:

 src(Source.fromFile(path)).par foreach process
 

J'ai essayé cela, et il se compile et s'exécute en tout cas. Je ne suis pas vraiment sûr de savoir s'il charge le fichier entier en mémoire ou non, mais je ne pense pas que ce soit le cas.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X