J'ai un cadre de données et je veux appliquer une fonction à chaque ligne. Cette fonction dépend d'autres dataframes.
Exemple simplifié. J'ai trois dataframes comme ci-dessous :
df = sc.parallelize([
['a', 'b', 1],
['c', 'd', 3]
]).toDF(('feat1', 'feat2', 'value'))
df_other_1 = sc.parallelize([
['a', 0, 1, 0.0],
['a', 1, 3, 0.1],
['a', 3, 10, 1.0],
['c', 0, 10, 0.2],
['c', 10, 25, 0.5]
]).toDF(('feat1', 'lower', 'upper', 'score'))
df_other_2 = sc.parallelize([
['b', 0, 4, 0.1],
['b', 4, 20, 0.5],
['b', 20, 30, 1.0],
['d', 0, 5, 0.05],
['d', 5, 22, 0.9]
]).toDF(('feat1', 'lower', 'upper', 'score'))
Pour chaque ligne de df
je veux collecter les valeurs supérieures uniques pour feat1
y feat2
de df_other_1
y df_other_2
Par exemple, pour la première ligne, les valeurs uniques sont (1, 3, 10, 4, 20, 30). Ensuite, je vais les trier comme (30, 20, 10, 4, 3, 1) et ajouter au début, un numéro au-dessus du premier. Le site df
deviendrait ainsi :
df = sc.parallelize([
['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]],
['c', 'd', 3, [26, 25, 22, 10, 5]]
]).toDF(('feat1', 'feat2', 'value', 'lst'))
Ensuite, pour chaque ligne de df
et pour chacune des valeurs respectives de la lst
Je veux calculer la somme de score
des deux df_other_1
y df_other_2
où chaque valeur de lst
relève de upper
y lower
. Mon but est de trouver la valeur la plus basse dans chaque lst
dont le score total est supérieur à un certain seuil (par exemple, 1,4).
Voici comment calculer le score total. Donc, pour la première ligne de df
la première valeur de lst
a 31 ans. Sur le site df_other_1
para feat1
il se trouve au-dessus du seau le plus élevé et obtient donc la note 1. Il en va de même pour df_other_2
. Le score total serait donc de 1+1 =2. Pour la valeur 10 (toujours pour la première ligne), le score total serait de 1 + 0,5 = 1,5.
C'est ainsi que le df
ressemblerait à la fin :
df = sc.parallelize([
['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4],
['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25]
]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value'))
Je cherche en fait à trouver ces valeurs cibles 4
y 25
. Les étapes intermédiaires n'ont pas vraiment d'importance.
\==========================================================================
Voici ce que j'ai essayé jusqu'à présent :
def get_threshold_for_row(feat1, feat2, threshold):
this_df_other_1 = df_other_1.filter(col('feat1') == feat1)
this_df_other_2 = df_other_2.filter(col('feat1') == feat2)
values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()]
values_feat_1.append(values_feat_1[-1] + 1)
values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()]
values_feat_2.append(values_feat_2[-1] + 1)
values = values_feat_1 + values_feat_2
values = list(set(values)) #Keep unique values
values.sort(reverse=True) #Sort from largest to smallest
df_1_score = df_2_score = 0
prev_value = 10000 #Any large number
prev_score = 10000
for value in values:
df_1_score = get_score_for_key(this_df_other_1, 'feat_1', feat_1, value)
df_2_score = get_score_for_key(this_df_other_2, 'feat_1', feat_2, value)
total_score = df_1_score + df_2_score
if total_score < threshold and prev_score >= threshold:
return prev_value
prev_score = total_score
prev_value = value
def is_dataframe_empty(df):
return len(df.take(1)) == 0
def get_score_for_key(scores_df, grouping_key, this_id, value):
if is_dataframe_empty(scores_df):
return 0.0
w = Window.partitionBy([grouping_key]).orderBy(col('upper'))
scores_df_tmp = scores_df.withColumn("prev_value", lead(scores_df.upper).over(w))\
.withColumn("is_last", when(col('prev_value').isNull(), 1).otherwise(0))\
.drop('prev_value')
scores_df_tmp = scores_df_tmp.withColumn("next_value", lag(scores_df_tmp.upper).over(w))\
.withColumn("is_first", when(col('next_value').isNull(), 1).otherwise(0))\
.drop('next_value').cache()
grouping_key_score = scores_df_tmp.filter((col(grouping_key) == this_id) &
(((value >= col('from_value')) & (value < col('to_value'))) |
((value >= col('to_value')) & (col('is_last') == 1)) |
((value < col('from_value')) & (col('is_first') == 1)) |
(col('from_value').isNull()))) \
.withColumn('final_score', when(value <= col('to_value'), col('score')).otherwise(1.0)) \
.collect()[0]['final_score']
return grouping_key_score
df.rdd.map(lambda r: (r['feat_1'], r['feat_2'])) \
.map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4)))
.toDF()
Mais j'y arrive : AttributeError: 'Py4JError' object has no attribute 'message'
Désolé pour le long post. Des idées ?