Contexte
Comme indiqué dans cette question, j'utilise les itérations Scalaz 7 pour traiter un flux de données volumineux (c'est-à-dire illimité) dans un espace mémoire constant.
Mon code ressemble à ceci :
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk, idx: Long): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
rs ++ vs map {
case (c, i) => processChunk(c, i)
}
} &= (data.zipWithIndex mapE Iteratee.group(P))
Le Problème
Il semble y avoir une fuite de mémoire, mais je ne connais pas assez bien Scalaz/FP pour savoir si le bug se trouve dans Scalaz ou dans mon code. Intuitivement, je m'attends à ce que ce code ne nécessite qu'un espace de la taille de P fois la taille de Chunk
.
Remarque : J'ai trouvé une question similaire dans laquelle un OutOfMemoryError
a été rencontré, mais mon code n'utilise pas consume
.
Test
J'ai effectué quelques tests pour essayer d'isoler le problème. En résumé, la fuite semble survenir uniquement lorsque à la fois zipWithIndex
et group
sont utilisés.
// pas de zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296
// seulement le grouping
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296
// zipping et grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space
// seulement le zipping
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296
// pas de zipping/grouping, tableaux plus grands
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184
// seulement le zipping, tableaux plus grands
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184
Code pour les tests :
import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._
// définir un énumérateur qui produit un flux de nouveaux tableaux remplis de zéros
def enumArrs(sz: Int, n: Int) =
Iteratee.enumIterator[Array[Int], IO](
Iterator.continually(Array.fill(sz)(0)).take(n))
// définir un iterateur qui consomme un flux de tableaux
// et calcule sa longueur
val i1 = Iteratee.fold[Array[Int], IO, Long](0) {
(c, a) => c + a.length
}
// définir un iterateur qui consomme un flux de tableaux groupés
// et calcule sa longueur
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) {
(c, as) => c + as.map(_.length).sum
}
// définir un iterateur qui consomme un flux de tableaux groupés/zippés
// et calcule sa longueur
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
(c, vs) => c + vs.map(_._1.length).sum
}
// définir un iterateur qui consomme un flux de tableaux zippés
// et calcule sa longueur
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
(c, v) => c + v._1.length
}
Questions
- Le bug est-il dans mon code ?
- Comment puis-je faire fonctionner cela dans un espace mémoire constant ?