3 votes

Comment passer un DataFrame en entrée d'un UDF Spark ?

J'ai un cadre de données et je veux appliquer une fonction à chaque ligne. Cette fonction dépend d'autres dataframes.

Exemple simplifié. J'ai trois dataframes comme ci-dessous :

df = sc.parallelize([
    ['a', 'b', 1],
    ['c', 'd', 3]
    ]).toDF(('feat1', 'feat2', 'value'))

df_other_1 = sc.parallelize([
        ['a', 0, 1, 0.0],
        ['a', 1, 3, 0.1],
        ['a', 3, 10, 1.0],
        ['c', 0, 10, 0.2],
        ['c', 10, 25, 0.5]
        ]).toDF(('feat1', 'lower', 'upper', 'score'))

df_other_2 = sc.parallelize([
        ['b', 0, 4, 0.1],
        ['b', 4, 20, 0.5],
        ['b', 20, 30, 1.0],
        ['d', 0, 5, 0.05],
        ['d', 5, 22, 0.9]
        ]).toDF(('feat1', 'lower', 'upper', 'score'))

Pour chaque ligne de df je veux collecter les valeurs supérieures uniques pour feat1 y feat2 de df_other_1 y df_other_2 Par exemple, pour la première ligne, les valeurs uniques sont (1, 3, 10, 4, 20, 30). Ensuite, je vais les trier comme (30, 20, 10, 4, 3, 1) et ajouter au début, un numéro au-dessus du premier. Le site df deviendrait ainsi :

df = sc.parallelize([
        ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]],
        ['c', 'd', 3, [26, 25, 22, 10, 5]]
        ]).toDF(('feat1', 'feat2', 'value', 'lst'))

Ensuite, pour chaque ligne de df et pour chacune des valeurs respectives de la lst Je veux calculer la somme de score des deux df_other_1 y df_other_2 où chaque valeur de lst relève de upper y lower . Mon but est de trouver la valeur la plus basse dans chaque lst dont le score total est supérieur à un certain seuil (par exemple, 1,4).

Voici comment calculer le score total. Donc, pour la première ligne de df la première valeur de lst a 31 ans. Sur le site df_other_1 para feat1 il se trouve au-dessus du seau le plus élevé et obtient donc la note 1. Il en va de même pour df_other_2 . Le score total serait donc de 1+1 =2. Pour la valeur 10 (toujours pour la première ligne), le score total serait de 1 + 0,5 = 1,5.

C'est ainsi que le df ressemblerait à la fin :

df = sc.parallelize([
            ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4],
            ['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25]
            ]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value'))

Je cherche en fait à trouver ces valeurs cibles 4 y 25 . Les étapes intermédiaires n'ont pas vraiment d'importance.

\==========================================================================

Voici ce que j'ai essayé jusqu'à présent :

def get_threshold_for_row(feat1, feat2, threshold):

    this_df_other_1 = df_other_1.filter(col('feat1') == feat1)
    this_df_other_2 = df_other_2.filter(col('feat1') == feat2)

    values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()]
    values_feat_1.append(values_feat_1[-1] + 1)
    values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()]
    values_feat_2.append(values_feat_2[-1] + 1)

    values = values_feat_1 + values_feat_2
    values = list(set(values)) #Keep unique values
    values.sort(reverse=True)  #Sort from largest to smallest

    df_1_score = df_2_score = 0
    prev_value = 10000 #Any large number
    prev_score = 10000

    for value in values:
        df_1_score = get_score_for_key(this_df_other_1, 'feat_1', feat_1, value)
        df_2_score = get_score_for_key(this_df_other_2, 'feat_1', feat_2, value)

        total_score = df_1_score + df_2_score

        if total_score < threshold and prev_score >= threshold:
            return prev_value

        prev_score = total_score
        prev_value = value

def is_dataframe_empty(df):
    return len(df.take(1)) == 0

def get_score_for_key(scores_df, grouping_key, this_id, value):

    if is_dataframe_empty(scores_df):
        return 0.0

    w = Window.partitionBy([grouping_key]).orderBy(col('upper'))

    scores_df_tmp = scores_df.withColumn("prev_value", lead(scores_df.upper).over(w))\
                        .withColumn("is_last", when(col('prev_value').isNull(), 1).otherwise(0))\
                        .drop('prev_value')

    scores_df_tmp = scores_df_tmp.withColumn("next_value", lag(scores_df_tmp.upper).over(w))\
                        .withColumn("is_first", when(col('next_value').isNull(), 1).otherwise(0))\
                        .drop('next_value').cache()

    grouping_key_score = scores_df_tmp.filter((col(grouping_key) == this_id) & 
                              (((value >= col('from_value')) & (value < col('to_value'))) | 
                                ((value >= col('to_value')) & (col('is_last') == 1)) |
                                ((value < col('from_value')) & (col('is_first') == 1)) |
                                (col('from_value').isNull()))) \
                    .withColumn('final_score', when(value <= col('to_value'), col('score')).otherwise(1.0)) \
                    .collect()[0]['final_score']

    return grouping_key_score

df.rdd.map(lambda r: (r['feat_1'], r['feat_2'])) \
    .map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4)))
    .toDF()

Mais j'y arrive : AttributeError: 'Py4JError' object has no attribute 'message'

Désolé pour le long post. Des idées ?

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X