Я читаю от kafka как строки из 2 разных файлов по 2 разные темы. Пример линий:kafka muliple topic seggregation in spark
например: FILE1: 2015-04-15T18:44:14+01:00,192.168.11.42,%ASA-2-106007:
Файл2: "04/15/2012","18:44:14",,"Start","Unknown","Unknown",,"192.168.63.128","444","2","7","192.168.63.128",,,,,,,,,,,,,,,,,
Я могу читать из искры из двух разных topics.Code, как показано ниже:
SparkConf sparkConfig = new SparkConf().setAppName("KafkaStreaming").setMaster("local[5]");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConfig,Durations.seconds(5));
final HiveContext sqlContext = new HiveContext(jsc.sc());
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jsc,
prop.getProperty("zookeeper.connect"),
prop.getProperty("group.id"),
topicMap
);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
Проблема я см. сейчас:
lines rdd содержит обе строки, которые очевидны. Как я могу отделить te или узнать, какие записи из какой темы или какого файла.
Причина в том, что я хочу применить различную логику для разных тем, которые исходят от искры. Но рдд имеет все строки во время
Цените любое предложение