Nous essayons d'allouer le même exécuteur et le même partitionneur pour les RDD afin d'éviter tout trafic réseau. De plus, les opérations de brassage telles que les cogroupes et les jointures n'ont pas de limites d'étape et toutes les transformations sont réalisées en une seule étape.
Pour cela, nous enveloppons le RDD avec notre classe RDD personnalisée (ExtendRDD.class) en Java qui a une fonction getPreferredLocation surchargée de RDD.class (en scala) comme :
public Seq<String> getPreferredLocations(Partition split){
listString.add("11.113.57.142");
listString.add("11.113.57.163");
listString.add("11.113.57.150");
List<String> finalList = new ArrayList<String>();
finalList.add(listString.get(split.index() % listString.size()));
Seq<String> toReturnListString = scala.collection.JavaConversions.asScalaBuffer(finalList).toSeq();
return toReturnListString;
}
Grâce à cela, nous sommes en mesure de contrôler le comportement de l'étincelle en ce qui concerne le nœud sur lequel il place le RDD dans le cluster. Mais le problème maintenant est, puisque le partitionneur pour ces RDDs étant différent, spark les considère comme dépendant du shuffle et crée à nouveau plusieurs étapes pour ces opérations de shuffle. Nous avons essayé de remplacer la méthode partitioner de la même classe RDD.class dans le même RDD personnalisé comme :
public Option<Partitioner> partitioner() {
Option<Partitioner> optionPartitioner = new Some<Partitioner>(this.getPartitioner());
return optionPartitioner;
}
Pour que spark les place sous la même étape, il doit considérer que ces RDDs proviennent du même partitionneur. Notre méthode de partitionnement ne semble pas fonctionner car spark prend le partitionnement différent pour 2 RDDs et crée plusieurs étapes pour les opérations de shuffle.
Nous avons enveloppé le RDD de scala avec notre RDD personnalisé comme :
ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
RDD<String> distFile1 = jsc.textFile("SomePath/data.txt",1);
ExtendRDD<String> extendRDD = new ExtendRDD<String>(distFile1, tag);
Nous créons un autre RDD personnalisé de manière similaire et obtenons un PairRDD(pairRDD2) à partir de ce RDD. Ensuite, nous essayons d'appliquer le même partitionneur que dans l'objet extendRDD à l'objet PairRDDFunction en utilisant la fonction partitionBy, puis nous appliquons le cogroupe à cet objet :
RDD<Tuple2<String, String>> pairRDD = extendRDD.keyBy(new KeyByImpl());
PairRDDFunctions<String, String> pair = new PairRDDFunctions<String, String>(pairRDD, tag, tag, null);
pair.partitionBy(extendRDD2.getPartitioner());
pair.cogroup(pairRDD2);
Tout cela ne semble pas fonctionner car l'étincelle crée de multiples étapes lorsqu'elle rencontre la transformation du cogroupe.
Avez-vous des suggestions sur la manière d'appliquer le même partitionneur aux RDD ?