3 votes

Extraire la valeur d'un sujet spécifique de RDD

J'essaie de lire dans spark des sujets kafka comme ci-dessous :

Map<TopicAndPartition, Long> map = new HashMap<>();
        map.put(new TopicAndPartition("A", 0), 1L);
        map.put(new TopicAndPartition("B", 0), 1L);

        JavaInputDStream<Map.Entry> topicMessages = KafkaUtils.createDirectStream(
                                                                            jssc,
                                                                            String.class,
                                                                            String.class,
                                                                            StringDecoder.class,
                                                                            StringDecoder.class,
                                                                            Map.Entry.class,
                                                                            kafkaParams,
                                                                            map,
                                                                            messageAndMetadata -> 
                                                                                new AbstractMap.SimpleEntry<>(messageAndMetadata.topic(),
                                                                                                              messageAndMetadata.message())
                                                                          );

Maintenant, le topicMessage a toutes les valeurs dans le format clé et valeur comme ci-dessous :

A="04/15/2015","18:44:28"
A="04/15/2015","18:44:28"
A="04/15/2015","18:44:28"
B="04/15/2016","18:44:28"
B="04/15/2014","18:44:28"  

Comment puis-je extraire les valeurs de sujets particuliers.
Quelque chose comme ci-dessous pour le sujet nommé B

"04/15/2016","18:44:28"
"04/15/2014","18:44:28"

2voto

mgaido Points 2204

Si vous voulez les rangs pour un sujet donné, il suffit de faire :

JavaPairDStream<String> rowsFromTopicB = topicMessages.filter( entry -> entry.getKey().toString().equals("B")).map(entry -> entry.getValue().toString())

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X