2 votes

Comment convertir les Seq en RDD lorsqu'on travaille avec le contexte de streaming de spark ?

J'utilise TestSuiteBase pour créer des tests avec spark-streaming (en utilisant le contexte de streaming spark scc ). Ensuite, je crée des données fictives en utilisant output: Seq[Seq[(Double, Double)]] . Enfin, je veux appliquer une fonction à output mais cette fonction accepte RDD[(Double, Double)] pas Seq[Seq[(Double, Double)]] .

Pour résoudre ce problème, j'envisage d'utiliser val rdd: RDD[(Double, Double)] = sc.parallelize(output.flatten) mais comment et où obtenir le contexte de l'étincelle ? sc de scc ? Ou, peut-être, existe-t-il un moyen de créer directement des données fictives dans le fichier RDD sans utiliser Seq ?

class StreamingTestLR  extends SparkFunSuite
                       with TestSuiteBase {

  // use longer wait time to ensure job completion
  override def maxWaitTimeMillis: Int = 20000

  var ssc: StreamingContext = _

  override def afterFunction() {
    super.afterFunction()
    if (ssc != null) {
      ssc.stop()
    }
  }

//...

val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)

// THE PROBLEM IS HERE!!!
// val metrics = new SomeFuncThatAcceptsRDD(rdd)

}

UPDATE

  // Test if the prediction accuracy of increases when using hyper-parameter optimization
  // in order to learn Y = 10*X1 + 10*X2 on streaming data
  test("Test 1") {
    // create model initialized with zero weights
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(0.0, 0.0))
      .setStepSize(0.2)
      .setNumIterations(25)

    // generate sequence of simulated data for testing
    val numBatches = 10
    val nPoints = 100
    val testInput = (0 until numBatches).map { i =>
      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
    }
    val inputDStream = DStream[LabeledPoint]

    withStreamingContext(setupStreams(testInput, inputDStream)) { ssc =>
      model.trainOn(inputDStream)
      model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
      val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)

      val rdd: RDD[(Double, Double)] = ssc.sparkContext.parallelize(output.flatten)

      // Instantiate metrics object
      val metrics = new RegressionMetrics(rdd)

      // Squared error
      println(s"MSE = ${metrics.meanSquaredError}")
      println(s"RMSE = ${metrics.rootMeanSquaredError}")

      // R-squared
      println(s"R-squared = ${metrics.r2}")

      // Mean absolute error
      println(s"MAE = ${metrics.meanAbsoluteError}")

      // Explained variance
      println(s"Explained variance = ${metrics.explainedVariance}")
    }
  }

3voto

Vitalii Kotliarenko Points 2388

Essayez ça :

 class MyTestSuite extends TestSuiteBase with BeforeAndAfter {

  test("my test") {
    withTestServer(new TestServer()) { testServer =>
      // Start the server
      testServer.start()
      // Set up the streaming context and input streams
      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
        val rdd = ssc.sparkContext.parallelize(output.flatten)
        // your code here 
        testServer.stop()
        ssc.stop()
      }
     }
    }
 }

Plus de détails ici : https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X