groupByKey:
Syntaxe:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)) )
groupByKey peut causer des problèmes de disque que les données sont envoyées sur le réseau et recueillies sur l'réduire les travailleurs.
reduceByKey:
Syntaxe:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
Les données sont combinées à chaque partition , une seule sortie pour une touche à chaque partition à envoyer sur le réseau.
reduceByKey nécessaire de combiner toutes vos valeurs en une autre valeur, avec exactement le même type.
aggregateByKey:
même que reduceByKey, qui prend une valeur initiale.
3 paramètres en entrée
j'. valeur initiale
ii. Combinateurs de la logique
iii. séquence op logique
*Example:* `
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
`
sortie:
Agrégation Par Clé somme des Résultats
bar -> 3
foo -> 5
combineByKey:
3 paramètres en entrée
- Valeur initiale : contrairement à aggregateByKey, n'a pas besoin de passer constante toujours, nous pouvons passer d'une fonction une fonction qui renvoie une nouvelle valeur.
- la fonction de fusion
- combiner la fonction
Exemple:`
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( { case (k,v) => (k,v._1/v._2.toDouble) })
result.collect.foreach(println)
`
reduceByKey,aggregateByKey,combineByKey préféré sur groupByKey
Référence:
Éviter groupByKey