Version courte de la question !
Considérons l'extrait suivant (en supposant que spark
est déjà fixé à un certain SparkSession
) :
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Remarquez que le champ des températures est une liste de flottants. Je voudrais convertir ces listes de flottants au type MLlib Vector
et j'aimerais que cette conversion soit exprimée à l'aide de la formule de base suivante DataFrame
plutôt que de passer par des RDD (ce qui est inefficace car toutes les données sont envoyées de la JVM vers Python, le traitement est effectué en Python, nous ne bénéficions pas des avantages de l'optimiseur Catalyst de Spark, etc.) Comment faire ? Plus précisément :
- Existe-t-il un moyen de faire fonctionner une distribution directe ? Voir ci-dessous pour les détails (et une tentative ratée de contournement) ? Ou bien, existe-t-il une autre opération qui produise l'effet que je recherche ?
- Laquelle est la plus efficace parmi les deux solutions alternatives que je propose ci-dessous (UDF vs exploser/réassembler les éléments de la liste) ? Ou existe-t-il d'autres solutions presque parfaites, mais pas tout à fait correctes, qui sont meilleures que l'une ou l'autre ?
Un plâtre droit ne fonctionne pas
C'est ce que j'attendrais de la solution "correcte". Je veux convertir le type d'une colonne d'un type à un autre, et je dois donc utiliser un cast. Pour situer le contexte, laissez-moi vous rappeler la manière normale de convertir une colonne en un autre type :
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Maintenant, par exemple df_with_strings.collect()[0]["temperatures"][1]
est '-7.0'
. Mais si je fais un lancer vers un vecteur ml, les choses ne se passent pas aussi bien :
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Cela donne une erreur :
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
Oups ! Une idée pour réparer ça ?
Alternatives possibles
Alternative 1 : utiliser VectorAssembler
Il existe un Transformer
qui semble presque idéal pour ce travail : le VectorAssembler
. Il prend une ou plusieurs colonnes et les concatène en un seul vecteur. Malheureusement, elle ne prend que Vector
et Float
les colonnes, et non Array
colonnes, donc le suivi ne fonctionne pas :
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
Il donne cette erreur :
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
La meilleure solution à laquelle je pense est d'éclater la liste en plusieurs colonnes, puis d'utiliser la fonction VectorAssembler
pour les rassembler à nouveau :
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
Cela semble être l'idéal, sauf que TEMPERATURE_COUNT
être supérieure à 100, et parfois supérieure à 1000. (Un autre problème est que le code serait plus compliqué si vous ne connaissez pas la taille du tableau à l'avance, bien que ce ne soit pas le cas pour mes données). Spark génère-t-il réellement un ensemble de données intermédiaires avec autant de colonnes, ou considère-t-il simplement qu'il s'agit d'une étape intermédiaire que les éléments individuels traversent de manière transitoire (ou bien optimise-t-il entièrement cette étape d'éloignement lorsqu'il voit que la seule utilisation de ces colonnes est d'être assemblées en un vecteur) ?
Alternative 2 : utiliser un UDF
Une alternative plus simple consiste à utiliser un UDF pour effectuer la conversion. Cela me permet d'exprimer assez directement ce que je veux faire en une ligne de code, et ne nécessite pas de créer un ensemble de données avec un nombre fou de colonnes. Mais toutes ces données doivent être échangées entre Python et la JVM, et chaque nombre individuel doit être traité par Python (qui est notoirement lent pour itérer sur des éléments de données individuels). Voici à quoi cela ressemble :
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
Remarques ignorantes
Les autres sections de cette question décousue sont des éléments supplémentaires que j'ai trouvés en essayant de trouver une réponse. La plupart des personnes qui lisent ce document peuvent probablement les ignorer.
Ce n'est pas une solution : utilisez Vector
pour commencer
Dans cet exemple trivial, il est possible de créer les données en utilisant le type vectoriel pour commencer, mais bien sûr, mes données ne sont pas vraiment une liste Python que je parallélise, mais sont plutôt lues depuis une source de données. Mais pour mémoire, voici à quoi cela ressemblerait :
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
Solution inefficace : utiliser map()
Une possibilité est d'utiliser le RDD map()
pour transformer la liste en un Vector
. C'est similaire à l'idée de l'UDF, sauf que c'est encore pire car le coût de la sérialisation, etc. est engagé pour tous les champs de chaque ligne, et pas seulement pour celui sur lequel on opère. Pour mémoire, voici à quoi ressemblerait cette solution :
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
Échec de la tentative de contournement de l'action de cast
En désespoir de cause, j'ai remarqué que Vector
est représenté en interne par un struct avec quatre champs, mais l'utilisation d'un cast traditionnel à partir de ce type de struct ne fonctionne pas non plus. Voici une illustration (où j'ai construit la structure en utilisant un udf mais l'udf n'est pas la partie importante) :
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
Cela donne l'erreur :
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
1 votes
Quelqu'un peut-il répondre à la question de savoir comment faire avec Spark version 2.4.3+ en utilisant le dataframe ?