2016-05-30 2 views
1

Я читаю от kafka как строки из 2 разных файлов по 2 разные темы. Пример линий:kafka muliple topic seggregation in spark

например: FILE1: 2015-04-15T18:44:14+01:00,192.168.11.42,%ASA-2-106007:
Файл2: "04/15/2012","18:44:14",,"Start","Unknown","Unknown",,"192.168.63.128","444","2","7","192.168.63.128",,,,,,,,,,,,,,,,,

Я могу читать из искры из двух разных topics.Code, как показано ниже:

SparkConf sparkConfig = new SparkConf().setAppName("KafkaStreaming").setMaster("local[5]"); 
     JavaStreamingContext jsc = new JavaStreamingContext(sparkConfig,Durations.seconds(5)); 
     final HiveContext sqlContext = new HiveContext(jsc.sc()); 
     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jsc, 
                         prop.getProperty("zookeeper.connect"), 
                         prop.getProperty("group.id"), 
                         topicMap 
                         ); 

     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 

        private static final long serialVersionUID = 1L; 

        public String call(Tuple2<String, String> tuple2) { 
         return tuple2._2(); 
        } 
       }); 

Проблема я см. сейчас:

lines rdd содержит обе строки, которые очевидны. Как я могу отделить te или узнать, какие записи из какой темы или какого файла.
Причина в том, что я хочу применить различную логику для разных тем, которые исходят от искры. Но рдд имеет все строки во время

Цените любое предложение

ответ

1

Вы должны выбрать createDirectStream перегруженный метод, который принимает в качестве параметра scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler. Затем вам нужно передать как messageHandler функцию, которая - получение объекта MessageAndMetadata в качестве входных данных - возвращает фактическое сообщение и тему.

Здесь я отправляю вам код, написанный в Scala. Вы можете легко адаптировать его в Java:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String,String)](ssc, 
     kafkaParams, 
     topicOffsetsMap, 
     (m:MessageAndMetadata[String, String])=> (m.topic,m.message()) 
     ) 
Смежные вопросы