3 votes

Analyse rapide de JSON avec de grandes régions pouvant être sautées

J'ai un JSON qui a la structure suivante (l'exemple du monde réel est ici https://gist.github.com/PavelPenkov/3432fe522e02aa3a8a597020d4ee7361 ):

{
  "metadata": { /* Huge TYPED object */ },
  "payload": { /* Small flat UNTYPED object */
    "field_1": 1
    "field_2": "Alice"
  }
}

Je veux extraire payload Le fichier est énorme et son analyse en classe de cas est plutôt lente (5000 op/s sur mon ordinateur portable). Jusqu'à présent, j'ai essayé

  1. Parse le document entier dans la classe de cas avec Jackson.

  2. Parse en AST avec Jackson et extrait seulement payload champ - légèrement plus rapide.

  3. scala-jsoniter alors qu'il peut probablement analyser la partie typée plus rapidement, il est incapable d'analyser les champs non typés par conception.

Existe-t-il d'autres options accessibles depuis Java ou (de préférence) Scala ?

2voto

Andriy Plokhotnyuk Points 3309

L'omission des valeurs JSON non désirées est le domaine dans lequel jsoniter-scala brille. Oui, il ne fournit pas de modèle AST pour JSON, mais vous pouvez le construire vous-même ou utiliser celui fourni par une bibliothèque tierce. Voici un exemple du codec personnalisé pour circe AST :

package io.circe

import java.util

import com.github.plokhotnyuk.jsoniter_scala.core._
import io.circe.Json._

object CirceJsoniter {
  implicit val codec: JsonValueCodec[Json] = new JsonValueCodec[Json] {
    override def decodeValue(in: JsonReader, default: Json): Json = {
      var b = in.nextToken()
      if (b == 'n') in.readNullOrError(default, "expected `null` value")
      else if (b == '"') {
        in.rollbackToken()
        new JString(in.readString(null))
      } else if (b == 'f' || b == 't') {
        in.rollbackToken()
        if (in.readBoolean()) Json.True
        else Json.False
      } else if ((b >= '0' && b <= '9') || b == '-') {
        new JNumber({
          in.rollbackToken()
          in.setMark() // TODO: add in.readNumberAsString() to Core API of jsoniter-scala
          try {
            do b = in.nextByte()
            while (b >= '0' && b <= '9')
          } catch { case _: JsonReaderException => /* ignore end of input error */} finally in.rollbackToMark()
          if (b == '.' || b == 'e' || b == 'E') new JsonDouble(in.readDouble())
          else new JsonLong(in.readLong())
        })
      } else if (b == '[') {
        new JArray(if (in.isNextToken(']')) Vector.empty
        else {
          in.rollbackToken()
          var x = new Array[Json](4)
          var i = 0
          do {
            if (i == x.length) x = java.util.Arrays.copyOf(x, i << 1)
            x(i) = decodeValue(in, default)
            i += 1
          } while (in.isNextToken(','))
          (if (in.isCurrentToken(']'))
            if (i == x.length) x
            else java.util.Arrays.copyOf(x, i)
          else in.arrayEndOrCommaError()).to[Vector]
        })
      } else if (b == '{') {
        new JObject(if (in.isNextToken('}')) JsonObject.empty
        else {
          val x = new util.LinkedHashMap[String, Json]
          in.rollbackToken()
          do x.put(in.readKeyAsString(), decodeValue(in, default))
          while (in.isNextToken(','))
          if (!in.isCurrentToken('}')) in.objectEndOrCommaError()
          JsonObject.fromLinkedHashMap(x)
        })
      } else in.decodeError("expected JSON value")
    }

    override def encodeValue(x: Json, out: JsonWriter): Unit = x match {
      case JNull => out.writeNull()
      case JString(s) => out.writeVal(s)
      case JBoolean(b) => out.writeVal(b)
      case JNumber(n) => n match {
        case JsonLong(l) => out.writeVal(l)
        case _ => out.writeVal(n.toDouble)
      }
      case JArray(a) =>
        out.writeArrayStart()
        a.foreach(v => encodeValue(v, out))
        out.writeArrayEnd()
      case JObject(o) =>
        out.writeObjectStart()
        o.toIterable.foreach { case (k, v) =>
          out.writeKey(k)
          encodeValue(v, out)
        }
        out.writeObjectEnd()
    }

    override def nullValue: Json = Json.Null
  }
}

Une autre option, si vous avez juste besoin d'extraire des octets de valeurs de charge utile, vous pouvez utiliser un code comme celui-ci pour le faire avec un taux de ~300000 messages par seconde pour l'échantillon fourni :

import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import scala.reflect.io.Streamable
import scala.util.hashing.MurmurHash3

case class Payload private(bs: Array[Byte]) {
  def this(s: String) = this(s.getBytes(UTF_8))

  override lazy val hashCode: Int = MurmurHash3.arrayHash(bs)

  override def equals(obj: Any): Boolean = obj match {
    case that: Payload => java.util.Arrays.equals(bs, that.bs)
    case _ => false
  }

  override def toString: String = new String(bs, UTF_8)
}

object Payload {
  def apply(s: String) = new Payload(s.getBytes)

  implicit val codec: JsonValueCodec[Payload] = new JsonValueCodec[Payload] {
    override def decodeValue(in: JsonReader, default: Payload): Payload = new Payload(in.readRawValAsBytes())

    override def encodeValue(x: Payload, out: JsonWriter): Unit = out.writeRawVal(x.bs)

    override val nullValue: Payload = new Payload(new Array[Byte](0))
  }
}

case class MessageWithPayload(payload: Payload)

object MessageWithPayload {
  implicit val codec: JsonValueCodec[MessageWithPayload] = JsonCodecMaker.make(CodecMakerConfig())

  val jsonBytes: Array[Byte] = Streamable.bytes(getClass.getResourceAsStream("debezium.json"))
}

@State(Scope.Thread)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1, jvmArgs = Array(
"-server",
"-Xms2g",
"-Xmx2g",
"-XX:NewSize=1g",
"-XX:MaxNewSize=1g",
"-XX:InitialCodeCacheSize=512m",
"-XX:ReservedCodeCacheSize=512m",
"-XX:+UseParallelGC",
"-XX:-UseBiasedLocking",
"-XX:+AlwaysPreTouch"
))
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class ExtractPayloadReading {
  @Benchmark
  def jsoniterScala(): MessageWithPayload = readFromArray[MessageWithPayload](MessageWithPayload.jsonBytes)
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X