3 votes

Comment lire des fichiers .csv avec spark streaming et écrire dans un fichier parquet avec Scala ?

J'essaie de lire un fichier en utilisant le programme spark 2.1.0 SparkStreaming. Les fichiers csv sont stockés dans un répertoire sur ma machine locale et j'essaie d'utiliser le parquet writestream avec un nouveau fichier sur ma machine locale. Mais à chaque fois que j'essaie, il y a toujours une erreur dans .parquet ou des dossiers vides.

Voici mon code :

case class TDCS_M05A(TimeInterval:String ,GantryFrom:String ,GantryTo:String ,VehicleType:Integer ,SpaceMeanSpeed:Integer ,CarTimes:Integer)

object Streamingcsv {
  def main(args: Array[String]) {

    val spark = SparkSession
      .builder
      .appName("Streamingcsv")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    import org.apache.spark.sql.types._

    val schema = StructType(
        StructField("TimeInterval",DateType, false) ::
        StructField("GantryFrom", StringType, false) ::
        StructField("GantryTo", StringType, false) ::
        StructField("VehicleType", IntegerType, false) ::
        StructField("SpaceMeanSpeed", IntegerType, false) ::
        StructField("CarTimes", IntegerType, false) ::  Nil)

    import org.apache.spark.sql.Encoders

    val usrschema = Encoders.product[TDCS_M05A].schema

    val csvDF = spark.readStream
      .schema(usrschema) // Specify schema of the csv files
      .csv("/home/hduser/IdeaProjects/spark2.1/data/*.csv")

    val query = csvDF.select("GantryFrom").where("CarTimes > 0")

    query
      .writeStream
      .outputMode("append")
      .format("parquet")
      .option("checkpointLocation", "checkpoint")
      .start("/home/hduser/IdeaProjects/spark2.1/output/")
      //.parquet("/home/hduser/IdeaProjects/spark2.1/output/")
      //.start()

    query.awaitTermination()
  }

Je me réfère à la page Comment lire un fichier en utilisant sparkstreaming et écrire dans un fichier simple en utilisant Scala ? et cela ne fonctionne pas, s'il vous plaît aidez-moi ,merci.

0voto

Gabe Church Points 73

Vous devez vous assurer que le répertoire de point de contrôle existe (ou le créer) avant de commencer. Votre implémentation de query doit également inclure un val pour query qui est séparé de votre DF.

Si vous n'en avez pas besoin, ne mettez pas à l'intérieur un objet contenant des args vides afin de pouvoir accéder directement à d'autres méthodes telles que

 query.lastProgress
 query.stop

(pour n'en citer que quelques-uns)

Changez la seconde moitié de votre code pour qu'elle ressemble à ceci.

import org.apache.spark.sql.streaming.OutputMode

val csvQueriedDF = csvDF.select("GantryFrom").where("CarTimes > 0")

val query = csvQueriedDF
  .writeStream
  .outputMode(OutputMode.Append())
  .format("parquet")
  .option("checkpointLocation", "/home/hduser/IdeaProjects/spark2.1/partialTempOutput")
  .option("path", "/home/hduser/IdeaProjects/spark2.1/output/")
  .start()

Bonne chance !

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X