61 votes

Comment faire fondre Spark DataFrame?

Existe-t-il un équivalent de Pandas Melt Function dans Apache Spark dans PySpark ou au moins dans Scala?

J'exécutais un exemple de jeu de données jusqu'à présent en python et maintenant je veux utiliser Spark pour l'ensemble de données.

Merci d'avance.

110voto

user6910411 Points 32156

Il n'existe pas de fonction intégrée (si vous travaillez avec SQL et la Ruche est activé, vous pouvez utiliser stack de la fonction, mais elle n'est pas exposée dans l'Étincelle et n'a pas d'implémentation native), mais il est trivial de rouler votre propre. Nécessaire des importations:

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

Exemple de mise en œuvre:

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

Et certains tests (basé sur les Pandas doctests):

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

Remarque: Pour une utilisation avec d'anciennes versions de Python supprimer des annotations de type.

Connexes:

34voto

Ahue Points 51

Je suis tombé sur cette question dans ma recherche d'une implémentation de melt dans Spark pour Scala.

Publier mon port Scala au cas où quelqu'un tomberait également dessus.

 import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
     *  @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
     *  
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted columns names
     *  @param value_name the name for the column holding the values of the melted columns
     *
     */

    def melt(
            id_vars: Seq[String], value_vars: Seq[String], 
            var_name: String = "variable", value_name: String = "value") : DataFrame = {

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}

        return _tmp.select(cols: _*)

    }
}
 

Étant donné que je ne suis pas très avancé compte tenu de Scala , je suis sûr qu'il y a place à amélioration.

Tout commentaire est le bienvenu.

3voto

Anton Alekseev Points 171

Pour une trame de données très large, les performances diminuent lors de la génération de _vars_and_vals à partir de la réponse de l'utilisateur 6910411.

Il était utile de mettre en œuvre la fusion via selectExpr

 columns=['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  7|  9|  8|
|  7|  8|  9|  1|  2|  4|
|  8|  3|  9|  8|  7|  4|
+---+---+---+---+---+---+

cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols))))
+---+----+----+
|  a|col0|col1|
+---+----+----+
|  1|   b|   2|
|  1|   c|   3|
|  1|   d|   4|
|  1|   e|   5|
|  1|   f|   6|
|  4|   b|   5|
|  4|   c|   6|
|  4|   d|   7|
|  4|   e|   9|
|  4|   f|   8|
|  7|   b|   8|
|  7|   c|   9|
...
 

-2voto

user18101 Points 304

Voici la version pyspark

 def melt(df, id_vars, value_vars, new_col_name="new_column"):
    var_name = "variable"
    _vars_and_vals = F.array(*(
        F.struct(F.lit(c).alias(var_name), F.col(c).alias(new_col_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", F.explode(_vars_and_vals))

    cols = id_vars + [
            F.col("_vars_and_vals")[x].alias(x) for x in [var_name, new_col_name]]
    return _tmp.select(*cols)
 

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X