71 votes

Comment convertir une colonne de tableau (c'est-à-dire de liste) en vecteur ?

Version courte de la question !

Considérons l'extrait suivant (en supposant que spark est déjà fixé à un certain SparkSession ) :

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

Remarquez que le champ des températures est une liste de flottants. Je voudrais convertir ces listes de flottants au type MLlib Vector et j'aimerais que cette conversion soit exprimée à l'aide de la formule de base suivante DataFrame plutôt que de passer par des RDD (ce qui est inefficace car toutes les données sont envoyées de la JVM vers Python, le traitement est effectué en Python, nous ne bénéficions pas des avantages de l'optimiseur Catalyst de Spark, etc.) Comment faire ? Plus précisément :

  1. Existe-t-il un moyen de faire fonctionner une distribution directe ? Voir ci-dessous pour les détails (et une tentative ratée de contournement) ? Ou bien, existe-t-il une autre opération qui produise l'effet que je recherche ?
  2. Laquelle est la plus efficace parmi les deux solutions alternatives que je propose ci-dessous (UDF vs exploser/réassembler les éléments de la liste) ? Ou existe-t-il d'autres solutions presque parfaites, mais pas tout à fait correctes, qui sont meilleures que l'une ou l'autre ?

Un plâtre droit ne fonctionne pas

C'est ce que j'attendrais de la solution "correcte". Je veux convertir le type d'une colonne d'un type à un autre, et je dois donc utiliser un cast. Pour situer le contexte, laissez-moi vous rappeler la manière normale de convertir une colonne en un autre type :

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

Maintenant, par exemple df_with_strings.collect()[0]["temperatures"][1] est '-7.0' . Mais si je fais un lancer vers un vecteur ml, les choses ne se passent pas aussi bien :

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

Cela donne une erreur :

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Oups ! Une idée pour réparer ça ?

Alternatives possibles

Alternative 1 : utiliser VectorAssembler

Il existe un Transformer qui semble presque idéal pour ce travail : le VectorAssembler . Il prend une ou plusieurs colonnes et les concatène en un seul vecteur. Malheureusement, elle ne prend que Vector et Float les colonnes, et non Array colonnes, donc le suivi ne fonctionne pas :

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

Il donne cette erreur :

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

La meilleure solution à laquelle je pense est d'éclater la liste en plusieurs colonnes, puis d'utiliser la fonction VectorAssembler pour les rassembler à nouveau :

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

Cela semble être l'idéal, sauf que TEMPERATURE_COUNT être supérieure à 100, et parfois supérieure à 1000. (Un autre problème est que le code serait plus compliqué si vous ne connaissez pas la taille du tableau à l'avance, bien que ce ne soit pas le cas pour mes données). Spark génère-t-il réellement un ensemble de données intermédiaires avec autant de colonnes, ou considère-t-il simplement qu'il s'agit d'une étape intermédiaire que les éléments individuels traversent de manière transitoire (ou bien optimise-t-il entièrement cette étape d'éloignement lorsqu'il voit que la seule utilisation de ces colonnes est d'être assemblées en un vecteur) ?

Alternative 2 : utiliser un UDF

Une alternative plus simple consiste à utiliser un UDF pour effectuer la conversion. Cela me permet d'exprimer assez directement ce que je veux faire en une ligne de code, et ne nécessite pas de créer un ensemble de données avec un nombre fou de colonnes. Mais toutes ces données doivent être échangées entre Python et la JVM, et chaque nombre individuel doit être traité par Python (qui est notoirement lent pour itérer sur des éléments de données individuels). Voici à quoi cela ressemble :

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

Remarques ignorantes

Les autres sections de cette question décousue sont des éléments supplémentaires que j'ai trouvés en essayant de trouver une réponse. La plupart des personnes qui lisent ce document peuvent probablement les ignorer.

Ce n'est pas une solution : utilisez Vector pour commencer

Dans cet exemple trivial, il est possible de créer les données en utilisant le type vectoriel pour commencer, mais bien sûr, mes données ne sont pas vraiment une liste Python que je parallélise, mais sont plutôt lues depuis une source de données. Mais pour mémoire, voici à quoi cela ressemblerait :

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

Solution inefficace : utiliser map()

Une possibilité est d'utiliser le RDD map() pour transformer la liste en un Vector . C'est similaire à l'idée de l'UDF, sauf que c'est encore pire car le coût de la sérialisation, etc. est engagé pour tous les champs de chaque ligne, et pas seulement pour celui sur lequel on opère. Pour mémoire, voici à quoi ressemblerait cette solution :

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Échec de la tentative de contournement de l'action de cast

En désespoir de cause, j'ai remarqué que Vector est représenté en interne par un struct avec quatre champs, mais l'utilisation d'un cast traditionnel à partir de ce type de struct ne fonctionne pas non plus. Voici une illustration (où j'ai construit la structure en utilisant un udf mais l'udf n'est pas la partie importante) :

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

Cela donne l'erreur :

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"

1 votes

Quelqu'un peut-il répondre à la question de savoir comment faire avec Spark version 2.4.3+ en utilisant le dataframe ?

28voto

user6910411 Points 32156

Personnellement, j'opterais pour Python UDF et ne m'embêterais pas avec autre chose :

Mais si vous voulez vraiment d'autres options, les voici :

  • UDF Scala avec wrapper Python :

    Installer sbt en suivant les instructions données sur le site du projet.

    Créez un paquet Scala avec la structure suivante :

    .
     build.sbt
     udfs.scala

    Modifier build.sbt (à ajuster en fonction de la version de Scala et de Spark) :

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.4",
      "org.apache.spark" %% "spark-mllib" % "2.4.4"
    )

    Modifier udfs.scala :

    package com.example.spark.udfs
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }

    Paquet :

    sbt package

    et include (ou équivalent selon la version de Scala) :

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

    comme argument pour --driver-class-path lors du démarrage du shell / de la soumission de l'application.

    Dans PySpark, définissez un wrapper :

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext
    
    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

    Test :

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
  • Dump de données vers un format JSON reflétant DenseVector et le relire :

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)

1 votes

f = sc._jvm.com.example.spark.udfs.udfs.as_vector() Cette ligne dans la méthode Pyspark donne l'erreur suivante TypeError: 'JavaPackage' object is not callable . Dois-je installer un paquet java pour cela ?

0 votes

@user7348570 Cela ressemble à un problème de CLASSPATH.

0 votes

J'ai aussi cette erreur. Quelle est la solution pour y remédier ?

3voto

GGDammy Points 49

J'ai eu le même problème que vous et j'ai procédé de cette façon. Cette méthode inclut la transformation des RDD, ce qui n'est pas un facteur de performance critique, mais elle fonctionne.

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])

new_df

le résultat est,

DataFrame[city: string, temperatures: vector]

0 votes

C'est une autre option, merci de l'avoir mentionnée. Mais si les performances ne sont pas critiques, vous pouvez également utiliser un UDF comme je l'ai mentionné dans ma question, ce qui, je pense, est mieux que RDD dans la plupart des cas. La transformation RDD envoie toutes les données à Python alors que l'UDF n'envoie que la colonne concernée. La transformation RDD nécessite également plus de code parce que vous devez dire comment traiter toutes les colonnes, même celles que vous voulez laisser non affectées, contrairement à UDF. Mais la transformation RDD pourrait être meilleure si vous voulez manipuler beaucoup de colonnes.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X