Я имею RDD данные, которые я конвертирована в JavaDStream, теперь я хочу, чтобы отправить его Кафка тему, Я не хочу Кафка отправки кода, только мне нужна реализация foreachRDD, мой код похож, каккак отправить данные с помощью foreachRDD с помощью Java
public void publishtoKafka(ITblStream t)
{
MyTopicProducer MTP = ProducerFactory.createProducer(hostname+":"+port);
JavaDStream<?> rdd = (JavaDStream<?>) t.getRDD();
rdd.foreachRDD(new Function<String, String>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
KafkaUtils.sendDataAsString(MTP,topicName, "String RDDData");
return null;
}
});
log.debug("------------------------sent to kafka: ------------------");
}
здесь myTopicproducer создаст производителя, который работает отлично KafkaUtils.sendDataAsString является метод, который будет публиковать данные Кафке тему также работает нормально,
у меня есть только одна проблема, я я не могу преобразовать JavaDS Tream РДД в виде строки с использованием Еогеаспа или foreachRDD наконец, мне нужно Струнное сообщение от РДА, любезно предложить Java код только и я не хочу, чтобы использовать анонимные классы,
Спасибо заранее,
@ serejja-его не работает, пожалуйста, вы можете отправить мне только часть отправить JavaDStream RDD с помощью foreachRDD, используя вызов функции, остальное я буду манипулировать. –
Можете ли вы указать, что не работает? Его обычно проще для других, если вы укажете, какая ошибка вы получаете, когда говорите «не работает». –
@Tathagata Das- когда я пытаюсь отправить JavaDStream > rdd, как указано выше, в моем коде, данные не публикуются в теме kafka, а также я пробовал в соответствии с комментариями serejja, а также –