Mon jeu de données compte environ 20 millions de lignes et nécessite environ 8 Go de RAM. J'exécute mon travail avec 2 exécuteurs, 10 Go de RAM par exécuteur, 2 cœurs par exécuteur. En raison de transformations ultérieures, les données doivent être mises en cache en une seule fois.
J'ai besoin de réduire les doublons sur la base de 4 champs (choisir n'importe lequel des doublons). Deux options : utiliser groupBy
et en utilisant repartition
y mapPartitions
. La deuxième approche vous permet de spécifier le nombre de partitions, et pourrait être plus rapide dans certains cas, non ?
Pouvez-vous nous expliquer quelle option est la plus performante ? Les deux options ont-elles la même consommation de RAM ?
Utilisation de groupBy
dataSet
.groupBy(col1, col2, col3, col4)
.agg(
last(col5),
...
last(col17)
);
Utilisation de repartition
y mapPartitions
dataSet.sqlContext().createDataFrame(
dataSet
.repartition(parallelism, seq(asList(col1, col2, col3, col4)))
.toJavaRDD()
.mapPartitions(DatasetOps::reduce),
SCHEMA
);
private static Iterator<Row> reduce(Iterator<Row> itr) {
Comparator<Row> comparator = (row1, row2) -> Comparator
.comparing((Row r) -> r.getAs(name(col1)))
.thenComparing((Row r) -> r.getAs(name(col2)))
.thenComparingInt((Row r) -> r.getAs(name(col3)))
.thenComparingInt((Row r) -> r.getAs(name(col4)))
.compare(row1, row2);
List<Row> list = StreamSupport
.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.ORDERED), false)
.collect(collectingAndThen(toCollection(() -> new TreeSet<>(comparator)), ArrayList::new));
return list.iterator();
}