3 votes

Spark. Diviser le RDD en lots

J'ai un RDD, où chaque enregistrement est un int :

[0,1,2,3,4,5,6,7,8]

Tout ce que je dois faire, c'est diviser ce RDD en lots. C'est-à-dire créer un autre RDD où chaque élément est une liste d'éléments de taille fixe :

[[0,1,2], [3,4,5], [6,7,8]]

Cela semble trivial, cependant, je suis perplexe depuis plusieurs jours et je ne trouve rien d'autre que la solution suivante :

  1. Utilisez ZipWithIndex pour énumérer les enregistrements dans un RDD :

    [0,1,2,3,4,5] -> [(0, 0),(1, 1),(2, 2),(3, 3),(4, 4),(5, 5)]

  2. Itérer sur ce RDD en utilisant map() et calculer l'index comme suit index = int(index / batchSize)

    [1,2,3,4,5,6] -> [(0, 0),(0, 1),(0, 2),(1, 3),(1, 4),(1, 5)]

  3. Regrouper ensuite par index généré.

    [(0, [0,1,2]), (1, [3,4,5])]

Cela me permettra d'obtenir ce dont j'ai besoin, mais je ne veux pas utiliser le groupe par ici. C'est trivial lorsque vous utilisez Map Reduce ou une abstraction comme Apache Crunch. Mais existe-t-il un moyen de produire un résultat similaire dans Spark sans utiliser de groupe par ?

0voto

khachik Points 12589

Vous n'avez pas clairement expliqué pourquoi vous avez besoin de RDD de taille fixe, en fonction de ce que vous essayez d'accomplir, il pourrait y avoir une meilleure solution, mais pour répondre à la question telle qu'elle a été posée, je vois les options suivantes :
1) mettre en œuvre des filtres basés sur le nombre d'éléments et la taille des lots. Par exemple, si vous avez 1000 éléments dans le RDD original et que vous souhaitez les diviser en 10 lots, vous devrez appliquer 10 filtres, le premier vérifiant si l'index est [0, 99], le second [100, 199] et ainsi de suite. Après avoir appliqué chaque filtre, vous aurez un seul RDD. Il est important de noter que le RDD original peut être mis en cache avant le filtrage. Avantages : chaque RDD résultant peut être traité séparément et n'a pas besoin d'être entièrement alloué sur un nœud. Inconvénients : cette approche devient plus lente avec le nombre de lots.
2) Logiquement similaire, mais au lieu d'un filtre, vous implémentez simplement un partitionneur personnalisé qui renvoie un identifiant de partition basé sur l'index (clé) comme décrit ici : Séparateur sur mesure pour des cloisons de taille égale . Avantages : plus rapide que les filtres. Inconvénients : chaque partition doit être placée dans un nœud.
3) Si l'ordre dans le RDD d'origine n'est pas important et que vous avez juste besoin qu'il soit divisé en morceaux à peu près égaux, vous pouvez le coalescer/repartitionner, comme expliqué ici. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html

0voto

Voleking Points 1

Vous pouvez peut-être utiliser aggregateByKey Il est beaucoup plus rapide et plus léger que l'application groupByKey dans ce cas. J'ai essayé de diviser 500 millions de données en lots de taille 256 sur 10 exécuteurs, et cela ne prend qu'une demi-heure.

data = data.zipWithIndex().map(lambda x: (x[1] / 256, x[0]))
data = data.aggregateByKey(list(), lambda x, y: x + [y], add)

Pour plus d'informations, voir Différence d'étincelle entre reduceByKey vs groupByKey vs aggregateByKey vs combineByKey

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X