105 votes

Éviter les fuites de mémoire avec les zipWithIndex/group enumeratees de Scalaz 7

Contexte

Comme indiqué dans cette question, j'utilise les itérations Scalaz 7 pour traiter un flux de données volumineux (c'est-à-dire illimité) dans un espace mémoire constant.

Mon code ressemble à ceci :

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))

Le Problème

Il semble y avoir une fuite de mémoire, mais je ne connais pas assez bien Scalaz/FP pour savoir si le bug se trouve dans Scalaz ou dans mon code. Intuitivement, je m'attends à ce que ce code ne nécessite qu'un espace de la taille de P fois la taille de Chunk.

Remarque : J'ai trouvé une question similaire dans laquelle un OutOfMemoryError a été rencontré, mais mon code n'utilise pas consume.

Test

J'ai effectué quelques tests pour essayer d'isoler le problème. En résumé, la fuite semble survenir uniquement lorsque à la fois zipWithIndex et group sont utilisés.

// pas de zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// seulement le grouping
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping et grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// seulement le zipping
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// pas de zipping/grouping, tableaux plus grands
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// seulement le zipping, tableaux plus grands
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Code pour les tests :

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// définir un énumérateur qui produit un flux de nouveaux tableaux remplis de zéros
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// définir un iterateur qui consomme un flux de tableaux 
// et calcule sa longueur
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// définir un iterateur qui consomme un flux de tableaux groupés 
// et calcule sa longueur
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// définir un iterateur qui consomme un flux de tableaux groupés/zippés
// et calcule sa longueur
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// définir un iterateur qui consomme un flux de tableaux zippés
// et calcule sa longueur
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

Questions

  • Le bug est-il dans mon code ?
  • Comment puis-je faire fonctionner cela dans un espace mémoire constant ?

4voto

Aaron Novstrup Points 10742

Cela ne constituera qu'un maigre réconfort pour ceux qui sont coincés avec l'ancienne API iteratee, mais j'ai récemment vérifié qu'un test équivalent passe avec l'API scalaz-stream. Il s'agit d'une API plus récente de traitement de flux qui est censée remplacer iteratee.

Pour des raisons de complétude, voici le code de test :

// Créer un flux contenant `n` tableaux avec `sz` entiers dans chacun
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

Ceci devrait fonctionner avec n'importe quelle valeur pour le paramètre n (à condition que vous soyez prêt à attendre suffisamment longtemps) -- j'ai testé avec des tableaux de 2^14 32 MiB (c'est-à-dire un total de la moitié de TiB de mémoire allouée au fil du temps).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X