2 votes

Arrêter le traitement des gros fichiers texte dans Apache Spark après un certain nombre d'erreurs

Je suis très novice en matière de Spark. Je travaille en 1.6.1. Imaginons que j'ai un gros fichier, je le lis dans RDD[String] via textFile. Ensuite, je veux valider chaque ligne dans une fonction. Parce que le fichier est énorme, je veux arrêter le traitement lorsque j'ai atteint un certain nombre d'erreurs, disons 1000 lignes. Quelque chose comme

val rdd = sparkContext.textFile(fileName) rdd.map(line => myValidator.validate(line))

voici la fonction de validation :

def validate(line:String) : (String, String) = { // 1st in Tuple for resulted line, 2nd ,say, for validation error. }

Comment calculer les erreurs dans 'validate' ? Il est en fait exécuté en parallèle sur plusieurs nœuds ? Des diffusions ? Accumulateurs ?

2voto

Tzach Zohar Points 6701

Vous pouvez obtenir ce comportement en utilisant la paresse de Spark en "divisant" le résultat de l'analyse syntaxique en succès et échecs, en appelant take(n) sur les échecs, et n'utiliser les données de réussite que s'il y a moins de 10 ans. n échecs.

Pour y parvenir de manière plus pratique, je suggère de modifier la signature de la fonction validate pour retourner un type qui peut facilement distinguer le succès de l'échec, par ex. scala.util.Try :

def validate(line:String) : Try[String] = {
    // returns Success[String] on success, 
    // Failure (with details in the exception object) otherwise 
}

Et ensuite, quelque chose comme :

val maxFailures = 1000
val rdd = sparkContext.textFile(fileName)
val parsed: RDD[Try[String]] = rdd.map(line => myValidator.validate(line)).cache()

val failures: Array[Throwable] = parsed.collect { case Failure(e) => e }.take(maxFailures)

if (failures.size == maxFailures) { 
  // report failures... 
} else {
  val success: RDD[String] = parsed.collect { case Success(s) => s }
  // continue here...
}

Pourquoi cela fonctionnerait-il ?

  • Si le nombre d'échecs est inférieur à 1000, l'ensemble des données sera analysé lorsque l'utilisateur aura terminé son travail. take(maxFailures) est appelé, les données réussies seront mises en cache et prêtes à être utilisées.
  • Si le nombre d'échecs est égal ou supérieur à 1000, l'analyse s'arrête là, car la fonction take l'opération ne nécessitera plus de lecture

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X