J'essaie de lire un fichier en utilisant le programme spark 2.1.0 SparkStreaming. Les fichiers csv sont stockés dans un répertoire sur ma machine locale et j'essaie d'utiliser le parquet writestream avec un nouveau fichier sur ma machine locale. Mais à chaque fois que j'essaie, il y a toujours une erreur dans .parquet ou des dossiers vides.
Voici mon code :
case class TDCS_M05A(TimeInterval:String ,GantryFrom:String ,GantryTo:String ,VehicleType:Integer ,SpaceMeanSpeed:Integer ,CarTimes:Integer)
object Streamingcsv {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("Streamingcsv")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.types._
val schema = StructType(
StructField("TimeInterval",DateType, false) ::
StructField("GantryFrom", StringType, false) ::
StructField("GantryTo", StringType, false) ::
StructField("VehicleType", IntegerType, false) ::
StructField("SpaceMeanSpeed", IntegerType, false) ::
StructField("CarTimes", IntegerType, false) :: Nil)
import org.apache.spark.sql.Encoders
val usrschema = Encoders.product[TDCS_M05A].schema
val csvDF = spark.readStream
.schema(usrschema) // Specify schema of the csv files
.csv("/home/hduser/IdeaProjects/spark2.1/data/*.csv")
val query = csvDF.select("GantryFrom").where("CarTimes > 0")
query
.writeStream
.outputMode("append")
.format("parquet")
.option("checkpointLocation", "checkpoint")
.start("/home/hduser/IdeaProjects/spark2.1/output/")
//.parquet("/home/hduser/IdeaProjects/spark2.1/output/")
//.start()
query.awaitTermination()
}
Je me réfère à la page Comment lire un fichier en utilisant sparkstreaming et écrire dans un fichier simple en utilisant Scala ? et cela ne fonctionne pas, s'il vous plaît aidez-moi ,merci.