2 votes

Comment lire à partir de embedded-kafka avec fs2-kafka ?

J'utilise fs2-kafka à lire à partir de embedded-kafka .

Je crée le kafka intégré en utilisant withRunningKafkaOnFoundPort , créez un sujet et publiez quelques messages. Cependant, lorsque j'essaie de les relire avec fs2-kafka, j'obtiens une NullPointerException. J'ai isolé un cas de test et le code est ci-dessous.

Voici mon code :

import cats.effect._
import cats.implicits._
import cats.effect.implicits._
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer, consumerStream}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import scala.concurrent.ExecutionContext

class KafkaSuite extends FunSuite with EmbeddedKafka {

  val singleThreadExecutor = ExecutionContext.fromExecutor((task: Runnable) => task.run())
  implicit val contextShift = IO.contextShift(singleThreadExecutor)
  implicit val timer = IO.timer(singleThreadExecutor)

  val topic = "example"
  val partition = 0
  val clientId = "client"

  test("works") {
    val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

    withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
      createCustomTopic(topic)
      publishStringMessageToKafka(topic, "example-message1")
      publishStringMessageToKafka(topic, "example-message2")
      publishStringMessageToKafka(topic, "example-message3")
      publishStringMessageToKafka(topic, "example-message4")

      val broker = s"localhost:${actualConfig.kafkaPort}"

      val consumerSettings = ConsumerSettings[IO, String, String]
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers(broker)
        .withGroupId("group")
        .withClientId(clientId)

      val r = consumerStream[IO].using(consumerSettings)
        .evalTap(_.subscribeTo(topic))
        .evalTap(_.seekToBeginning)
        .flatMap { consumer =>
          consumer.stream.take(1)
        }
        .compile
        .toList

      val res = r.unsafeRunSync()
      Console.println(res)
      assert(res.size == 1)
    }
  }

}

build.sbt :

name := "test"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies ++= Seq(
  "org.scalatest" % "scalatest_2.12" % "3.1.2" % "test",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "com.github.fd4s" %% "fs2-kafka" % "1.0.0",
  "io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1.1" % Test
)

Et voici la trace de la pile :

java.lang.NullPointerException was thrown.
java.lang.NullPointerException
    at java.lang.String.<init>(String.java:515)
    at fs2.kafka.Deserializer$.$anonfun$string$1(Deserializer.scala:208)
    at fs2.kafka.Deserializer$.$anonfun$lift$1(Deserializer.scala:184)
    at fs2.kafka.Deserializer$$anon$1.deserialize(Deserializer.scala:133)
    at fs2.kafka.ConsumerRecord$.deserializeFromBytes(ConsumerRecord.scala:166)
    at fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:177)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:378)
    at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:300)
    at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:245)
    at cats.Traverse$Ops.traverse(Traverse.scala:19)
    at cats.Traverse$Ops.traverse$(Traverse.scala:19)
    at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:376)
    at cats.instances.VectorInstances$$anon$1.$anonfun$traverse$2(vector.scala:80)
    at cats.instances.VectorInstances$$anon$1.loop$2(vector.scala:43)
    at cats.instances.VectorInstances$$anon$1.$anonfun$foldRight$2(vector.scala:44)
    at cats.Eval$.advance(Eval.scala:271)
    at cats.Eval$.loop$1(Eval.scala:350)
    at cats.Eval$.cats$Eval$$evaluate(Eval.scala:368)
    at cats.Eval$Defer.value(Eval.scala:257)
    at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:79)
    at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:15)
    at cats.Traverse$Ops.traverse(Traverse.scala:19)
    at cats.Traverse$Ops.traverse$(Traverse.scala:19)
    at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
    at fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:373)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$2(KafkaConsumerActor.scala:405)
    at cats.effect.internals.IORunLoop$.liftedTree1$1(IORunLoop.scala:95)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:95)
    at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
    at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:86)
    at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
    at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
    at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
    at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
    at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:72)
    at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:52)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
    at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
    at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2voto

Lev Denisov Points 397

Il s'avère que le problème vient du fait que le type de clé dans ConsumerSettings[IO, String, String] est String mais embedded-kafka écrit Null en tant que clé, de sorte que la désérialisation de la clé se solde par une exception NullPointerException. En définissant le type de clé à Unit résout le problème avec une exception.

Un autre problème est que withRunningKafkaOnFoundPort avant que l'évaluation de l'OI ne commence. Pour le faire fonctionner, il est nécessaire de faire un Resource de embedded-kafka et d'y intégrer l'IO.

val embeddedKafka = Resource.make(IO(EmbeddedKafka.start()))((kafka) => IO(kafka.stop(true)))

Le problème suivant est que fs2-kafka ne peut pas fonctionner avec un exécuteur à un seul thread, vous devez donc lui fournir un pool d'exécuteurs (par exemple ExecutionContext.global ).

Voici un exemple complet de fonctionnement :

import cats.effect._
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, consumerStream}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.FunSuite

import scala.concurrent.ExecutionContext

class KafkaSuite extends FunSuite with EmbeddedKafka {

  implicit val ec = ExecutionContext.global
  implicit val contextShift = IO.contextShift(ec)
  implicit val timer = IO.timer(ec)

  val topic = "example"
  val partition = 0
  val clientId = "client"
  val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

  def broker(port: Long) = s"localhost:${port}"

  val consumerSettings = ConsumerSettings[IO, Unit, String]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withEnableAutoCommit(true)
    .withGroupId("group")
    .withClientId(clientId)

  val embeddedKafka = Resource.make(IO(EmbeddedKafka.start()))((kafka) => IO(kafka.stop(true)))

  test("works") {
    val r = Stream.resource(embeddedKafka).flatMap { kafka =>
      implicit val actualConfig: EmbeddedKafkaConfig = kafka.config
      createCustomTopic(topic)
      publishStringMessageToKafka(topic, "example-message1")
      publishStringMessageToKafka(topic, "example-message2")
      publishStringMessageToKafka(topic, "example-message3")
      publishStringMessageToKafka(topic, "example-message4")

      consumerStream(consumerSettings.withBootstrapServers(broker(actualConfig.kafkaPort)))
        .evalTap(_.subscribeTo(topic))
        .evalTap(_.seekToBeginning)
        .flatMap(_.stream)
        .map(_.record.value)
        .take(1)
    }
    val res = r.compile.toList.unsafeRunSync()
    assert(res.contains("example-message1"))
  }

}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X