3 votes

Pyspark affiche la valeur maximale (S) et le triage multiple

J'ai besoin d'aide. J'utilise Pyspark (je ne peux pas utiliser SQL). J'ai donc une liste de tuples stockés sous forme de paires RDD :

[(('City1', '2020-03-27', 'X1'), 44),

(('City1', '2020-03-28', 'X1'), 44),

(('City3', '2020-03-28', 'X3'), 15),

(('City4', '2020-03-27', 'X4'), 5),

(('City4', '2020-03-26', 'X4'), 4),

(('City2', '2020-03-26', 'X2'), 14),

(('City2', '2020-03-25', 'X2'), 4),

(('City4', '2020-03-25', 'X4'), 1),

(('City1', '2020-03-29', 'X1'), 1),

(('City5', '2020-03-25', 'X5'), 15)]

Avec par exemple ('City5', '2020-03-25', 'X5') comme Clé, et 15 comme valeur de la dernière paire.

Je voudrais obtenir le résultat suivant :

City1, X1, 2020-03-27, 44

City1, X1, 2020-03-28, 44

City5, X3, 2020-03-25, 15

City3, X3, 2020-03-28, 15

City2, X2, 2020-03-26, 14

City4, X4, 2020-03-27, 5

Veuillez noter que le résultat s'affiche :

  • La ou les clés avec la valeur maximale pour chaque ville (c'est la partie la plus difficile, pour afficher la même ville deux fois si elles ont des valeurs maximales similaires à des dates différentes, je suppose qu'on ne peut pas utiliser ReduceByKey() car la clé n'est pas unique, peut-être GroupBy() ou Filter() ?

  • Dans la séquence d'ordre/de tri suivante :

  1. Valeur maximale décroissante
  2. Date ascendante
  3. Nom de la ville en ordre décroissant (ex : City1)

J'ai donc essayé le code suivant :

res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])

Bien qu'il me donne les villes avec la valeur maximale, il ne renvoie pas deux fois la même ville (parce que réduite par Key : City) si deux villes ont une valeur maximale similaire à deux dates différentes.

J'espère que c'est assez clair, n'hésitez pas à demander des précisions ! Merci beaucoup !

1voto

Vincent Doba Points 831

Pour garder toutes les villes dont la valeur est égale à la valeur maximale, vous pouvez toujours utiliser reduceByKey mais sur des tableaux au lieu de sur des valeurs :

  • vous transformez vos lignes en clé/valeur, la valeur étant un tableau de tuple au lieu d'un tuple
  • vous réduisez par clé, en fusionnant les tableaux s'ils contiennent la même valeur, sinon en gardant le tableau qui a la valeur maximale, avec reduceByKey
  • vous aplatissez vos tableaux de valeurs, en fusionnant la clé avec eux, avec flatMap
  • enfin vous effectuez votre tri

Le code complet serait le suivant :

def merge(array1, array2):
    if array1[0][2] > array2[0][2]:
        return array1
    elif array1[0][2] == array2[0][2]:
        return array1 + array2
    else:
        return array2

res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))

Et ensuite vous pouvez imprimer votre RDD :

[print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]

Cela, avec votre entrée, vous donne la sortie suivante :

City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5

0voto

Luiz Viola Points 97

Pouvez-vous travailler/sortir avec des Dataframes ?

List = [(('City1', '2020-03-27', 'X1'), 44),
        (('City1', '2020-03-28', 'X1'), 44),
        (('City3', '2020-03-28', 'X3'), 15),
        (('City4', '2020-03-27', 'X4'), 5),
        (('City4', '2020-03-26', 'X4'), 4),
        (('City2', '2020-03-26', 'X2'), 14),
        (('City2', '2020-03-25', 'X2'), 4),
        (('City4', '2020-03-25', 'X4'), 1),
        (('City1', '2020-03-29', 'X1'), 1),
        (('City5', '2020-03-25', 'X5'), 15)]

rdd = sc.parallelize(List)

import pyspark.sql.functions as F

df = rdd\
        .toDF()\
        .select('_1.*', F.col('_2').alias('value'))\
        .orderBy(F.desc('value'), F.asc('_2'), F.desc('_1'))

df.show(truncate=False)

+-----+----------+---+-----+
|_1   |_2        |_3 |value|
+-----+----------+---+-----+
|City1|2020-03-27|X1 |44   |
|City1|2020-03-28|X1 |44   |
|City5|2020-03-25|X5 |15   |
|City3|2020-03-28|X3 |15   |
|City2|2020-03-26|X2 |14   |
|City4|2020-03-27|X4 |5    |
|City2|2020-03-25|X2 |4    |
|City4|2020-03-26|X4 |4    |
|City4|2020-03-25|X4 |1    |
|City1|2020-03-29|X1 |1    |
+-----+----------+---+-----+

0voto

Vincent Doba Points 831

Vous pouvez transformer votre rdd en un Cadre de données et ensuite utiliser un La fenêtre de Spark pour obtenir la valeur maximale pour chaque ville, filtrer les lignes en utilisant cette valeur et enfin ordonner votre cadre de données comme vous le souhaitez :

from pyspark.sql import functions as F
from pyspark.sql import Window

window = Window.partitionBy('City').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = rdd.toDF().select(
 F.col('_1._1').alias('city'),
 F.col('_1._2').alias('date'),
 F.col('_1._3').alias('key'),
 F.col('_2').alias('value'),
).withColumn('max_value', F.max('value').over(window))\
 .filter(F.col('value') == F.col('max_value'))\
 .drop('max_value')\
 .orderBy(F.desc('value'), F.asc('date'), F.asc('city'))

Et vous obtenez le dataframe suivant avec votre entrée rdd :

+-----+----------+---+-----+
|city |date      |key|value|
+-----+----------+---+-----+
|City1|2020-03-27|X1 |44   |
|City1|2020-03-28|X1 |44   |
|City5|2020-03-25|X5 |15   |
|City3|2020-03-28|X3 |15   |
|City2|2020-03-26|X2 |14   |
|City4|2020-03-27|X4 |5    |
+-----+----------+---+-----+

Si vous avez besoin d'un RDD à la fin du processus, vous pouvez le récupérer en utilisant .rdd méthode :

df.rdd

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X