7 votes

Les messages d'Offset sont réinitialisés en mode streaming structuré dans Spark

Spark (v2.4) Fonction du programme :

  • Lire les données JSON à partir de Kafka file d'attente en mode streaming structuré dans spark
  • Imprimer les données lues sur la console telles qu'elles sont

Les questions qui se posent :
- Obtenir Resetting offset for partition nifi-log-batch-0 to offset 2826180.

Code source :

package io.xyz.streaming

import org.apache.spark.sql.avro._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._

object readKafkaJson {
    private val topic = "nifi-log-batch"
    private val kafkaUrl = "http://<hostname>:9092"
    private val chk = "/home/xyz/tmp/checkpoint"
    private val outputFileLocation = "/home/xyz/abc/data"
    private val sparkSchema = StructType(Array(
                StructField("timestamp", StringType),
                StructField("level", StringType),
                StructField("thread", StringType),
                StructField("class", StringType),
                StructField("message", StringType),
                StructField("updatedOn", StringType),
                StructField("stackTrace", StringType)))

    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder
            .appName("ConfluentConsumer")
            .master("local[*]")
            .getOrCreate()

        import spark.implicits._ 

        // ===================Read Kafka data in JSON==================
        val df = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUrl)
            .option("startingOffsets", "latest")
            .option("subscribe", topic)
            .load()

        val dfs1 = df
            .selectExpr("CAST(value AS STRING)")
            .select(from_json(col("value"), sparkSchema).alias("my_column"))
            .select("my_column.*")

        // ===================Write to console==================
        dfs1
            .writeStream
            .format("console")
            .start()
            .awaitTermination()

    }
}

Journal détaillé du problème sur la console :

2019-04-10 01:12:58 INFO  WriteToDataSourceV2Exec:54 - Start processing data source writer: org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@622d0057. The input RDD has 0 partitions.
2019-04-10 01:12:58 INFO  SparkContext:54 - Starting job: start at readKafkaJson.scala:70
2019-04-10 01:12:58 INFO  DAGScheduler:54 - Job 0 finished: start at readKafkaJson.scala:70, took 0.003870 s
2019-04-10 01:12:58 INFO  WriteToDataSourceV2Exec:54 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@622d0057 is committing.
-------------------------------------------
Batch: 0
-------------------------------------------
2019-04-10 01:12:58 INFO  CodeGenerator:54 - Code generated in 41.952695 ms
+---------+-----+------+-----+-------+---------+----------+
|timestamp|level|thread|class|message|updatedOn|stackTrace|
+---------+-----+------+-----+-------+---------+----------+
+---------+-----+------+-----+-------+---------+----------+

2019-04-10 01:12:58 INFO  WriteToDataSourceV2Exec:54 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@622d0057 committed.
2019-04-10 01:12:58 INFO  SparkContext:54 - Starting job: start at readKafkaJson.scala:70
2019-04-10 01:12:58 INFO  DAGScheduler:54 - Job 1 finished: start at readKafkaJson.scala:70, took 0.000104 s
2019-04-10 01:12:58 INFO  CheckpointFileManager:54 - Writing atomically to file:/tmp/temporary-df2fea18-7b2f-4146-bcfd-7923cfab65e7/commits/0 using temp file file:/tmp/temporary-df2fea18-7b2f-4146-bcfd-7923cfab65e7/commits/.0.eb290a31-1965-40e7-9028-d18f2eea0627.tmp
2019-04-10 01:12:58 INFO  CheckpointFileManager:54 - Renamed temp file file:/tmp/temporary-df2fea18-7b2f-4146-bcfd-7923cfab65e7/commits/.0.eb290a31-1965-40e7-9028-d18f2eea0627.tmp to file:/tmp/temporary-df2fea18-7b2f-4146-bcfd-7923cfab65e7/commits/0
2019-04-10 01:12:58 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "fb44fbef-5d05-4bb8-ae72-3327b98af261",
  "runId" : "ececfe49-bbc6-4964-8798-78980cbec525",
  "name" : null,
  "timestamp" : "2019-04-10T06:12:56.414Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1324,
    "getBatch" : 10,
    "getEndOffset" : 1,
    "queryPlanning" : 386,
    "setOffsetRange" : 609,
    "triggerExecution" : 2464,
    "walCommit" : 55
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[nifi-log-batch]]",
    "startOffset" : null,
    "endOffset" : {
      "nifi-log-batch" : {
        "0" : 2826180
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@6ced6212"
  }
}
2019-04-10 01:12:58 INFO  Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-9a027b2b-0a3a-4773-a356-a585e488062c--81433247-driver-0] Resetting offset for partition nifi-log-batch-0 to offset 2826180.
2019-04-10 01:12:58 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "fb44fbef-5d05-4bb8-ae72-3327b98af261",
  "runId" : "ececfe49-bbc6-4964-8798-78980cbec525",
  "name" : null,
  "timestamp" : "2019-04-10T06:12:58.935Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getEndOffset" : 1,
    "setOffsetRange" : 11,
    "triggerExecution" : 15
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[nifi-log-batch]]",
    "startOffset" : {
      "nifi-log-batch" : {
        "0" : 2826180
      }
    },
    "endOffset" : {
      "nifi-log-batch" : {
        "0" : 2826180
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@6ced6212"
  }
}
2019-04-10 01:12:58 INFO  Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-9a027b2b-0a3a-4773-a356-a585e488062c--81433247-driver-0] Resetting offset for partition nifi-log-batch-0 to offset 2826180.
2019-04-10 01:12:58 INFO  Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-9a027b2b-0a3a-4773-a356-a585e488062c--81433247-driver-0] Resetting offset for partition nifi-log-batch-0 to offset 2826180.
2019-04-10 01:12:58 INFO  Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-9a027b2b-0a3a-4773-a356-a585e488062c--81433247-driver-0] Resetting offset for partition nifi-log-batch-0 to offset 2826180.

Même lorsque j'exécute un code équivalent dans pySpark Je suis également confronté au même problème.
Veuillez nous indiquer comment résoudre ce problème.

  • Kafka : v2.1.0 cpl, confluent
  • Étincelle : 2,4

Le travail a été soumis par le biais de la commande suivante :

 spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --jars /home/xyz/Softwares/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar --class io.xyz.streaming.readKafkaJson --master local[*] /home/xyz/ScalaCode/target/SparkSchemaKafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar

0voto

Dennis Jaheruddin Points 10154

Il semble que l'auteur de la demande ait déjà trouvé la solution, voici les passages pertinents des commentaires :

Résolution principale

Il s'agissait d'un problème de structure de schéma en Scala. Après avoir corrigé le le problème a été résolu.

Thème secondaire

dans le code Pyspark, le traitement a lieu mais les messages ne sont pas transmis s'arrêtent pas, c'est-à-dire que je suis capable d'exécuter le code et d'écrire les données du flux dans un fichier JSON, mais les messages ne s'arrêtent pas. dans un fichier JSON, mais les messages de la console sont remplis des messages mentionnés ci-dessus Resetting offset for ... les messages du journal

Le problème avec pyspark était en fait que des messages INFO étaient imprimés, ce que j'ai désactivé

Après quoi, tout est rentré dans l'ordre.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X