3 votes

Spark groupBy vs repartition plus mapPartitions

Mon jeu de données compte environ 20 millions de lignes et nécessite environ 8 Go de RAM. J'exécute mon travail avec 2 exécuteurs, 10 Go de RAM par exécuteur, 2 cœurs par exécuteur. En raison de transformations ultérieures, les données doivent être mises en cache en une seule fois.

J'ai besoin de réduire les doublons sur la base de 4 champs (choisir n'importe lequel des doublons). Deux options : utiliser groupBy et en utilisant repartition y mapPartitions . La deuxième approche vous permet de spécifier le nombre de partitions, et pourrait être plus rapide dans certains cas, non ?

Pouvez-vous nous expliquer quelle option est la plus performante ? Les deux options ont-elles la même consommation de RAM ?

Utilisation de groupBy

dataSet
    .groupBy(col1, col2, col3, col4)
    .agg(
        last(col5),
        ...
        last(col17)
    );

Utilisation de repartition y mapPartitions

dataSet.sqlContext().createDataFrame(
    dataSet
        .repartition(parallelism, seq(asList(col1, col2, col3, col4)))
        .toJavaRDD()
        .mapPartitions(DatasetOps::reduce),
    SCHEMA
);

private static Iterator<Row> reduce(Iterator<Row> itr) {
    Comparator<Row> comparator = (row1, row2) -> Comparator
        .comparing((Row r) -> r.getAs(name(col1)))
        .thenComparing((Row r) -> r.getAs(name(col2)))
        .thenComparingInt((Row r) -> r.getAs(name(col3)))
        .thenComparingInt((Row r) -> r.getAs(name(col4)))
        .compare(row1, row2);

    List<Row> list = StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.ORDERED), false)
        .collect(collectingAndThen(toCollection(() -> new TreeSet<>(comparator)), ArrayList::new));

    return list.iterator();
}

4voto

La deuxième approche vous permet de spécifier le nombre de partitions, et pourrait être plus rapide dans certains cas, non ?

Pas vraiment. Les deux approches vous permettent de spécifier le nombre de partitions - dans le premier cas par le biais de spark.sql.shuffle.partitions

spark.conf.set("spark.sql.shuffle.partitions", parallelism)

Cependant, la deuxième approche est intrinsèquement moins efficace si les doublons sont fréquents, car elle mélange d'abord, et réduit ensuite, en sautant la réduction côté carte (en d'autres termes, c'est encore un autre groupe par clé). Si les doublons sont rares, cela ne fera pas une grande différence.

A propos Dataset fournit déjà dropDuplicates variantes qui prennent un ensemble de colonnes, et first / last n'a pas de signification particulière ici (voir la discussion dans la rubrique Comment sélectionner la première ligne de chaque groupe ? ).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X