2015-01-30 2 views
2

Я имею RDD данные, которые я конвертирована в JavaDStream, теперь я хочу, чтобы отправить его Кафка тему, Я не хочу Кафка отправки кода, только мне нужна реализация foreachRDD, мой код похож, каккак отправить данные с помощью foreachRDD с помощью Java

public void publishtoKafka(ITblStream t) 
    { 
     MyTopicProducer MTP = ProducerFactory.createProducer(hostname+":"+port); 
     JavaDStream<?> rdd = (JavaDStream<?>) t.getRDD(); 

     rdd.foreachRDD(new Function<String, String>() { 
      @Override 
      public Void call(JavaRDD<String> rdd) throws Exception { 
      KafkaUtils.sendDataAsString(MTP,topicName, "String RDDData"); 
      return null; 
      } 
      }); 
     log.debug("------------------------sent to kafka: ------------------"); 

    } 

здесь myTopicproducer создаст производителя, который работает отлично KafkaUtils.sendDataAsString является метод, который будет публиковать данные Кафке тему также работает нормально,

у меня есть только одна проблема, я я не могу преобразовать JavaDS Tream РДД в виде строки с использованием Еогеаспа или foreachRDD наконец, мне нужно Струнное сообщение от РДА, любезно предложить Java код только и я не хочу, чтобы использовать анонимные классы,

Спасибо заранее,

ответ

3

Я не проверял, что, но должно работать нормально:

public Void call(JavaRDD<String> rdd) throws Exception { 
    for (rddData : rdd.collect()) { 
     KafkaUtils.sendDataAsString(MTP,topicName, rddData); 
    } 
    return null; 
} 

Дело в том, что вам нужно collect ваш RDD (javadoc here), чтобы получить фактический сбор данных Струнные из вашего РДУ.

+0

@ serejja-его не работает, пожалуйста, вы можете отправить мне только часть отправить JavaDStream RDD с помощью foreachRDD, используя вызов функции, остальное я буду манипулировать. –

+0

Можете ли вы указать, что не работает? Его обычно проще для других, если вы укажете, какая ошибка вы получаете, когда говорите «не работает». –

+0

@Tathagata Das- когда я пытаюсь отправить JavaDStream rdd, как указано выше, в моем коде, данные не публикуются в теме kafka, а также я пробовал в соответствии с комментариями serejja, а также –

1

проблема пчелы решена, , когда я использовал

rdd.foreachRDD(new Function<JavaRDD<String>, Void>() { 
       @Override 
       public Void call(JavaRDD<String> rdd) throws Exception { 
       if(rdd!=null) 
      { 
      List<String> result = rdd.collect(); 
      KafkaUtil.sendString(p,topic,result.get(0)); 
KafkaUtils.sendDataAsString(MTP,topicName, result.get(0)); 

      } 
Смежные вопросы