Я интегрирую Kafka и Spark, используя искрообразование. Я создал тему в качестве продюсера Кафки:Kafka Spark streaming: не удалось прочитать сообщения
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Я издаю сообщения в Кафка и пытаюсь читать их с помощью искрового потокового кода Java и отображения их на экране.
Демоны все: Искра-мастер, рабочий; работник зоопарка; Кафка.
Я пишу код Java для выполнения его, используя KafkaUtils.createStream
код ниже:
public class SparkStream {
public static void main(String args[])
{
if(args.length != 3)
{
System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
System.exit(1);
}
Map<String,Integer> topicMap = new HashMap<String,Integer>();
String[] topic = args[2].split(",");
for(String t: topic)
{
topicMap.put(t, new Integer(1));
}
JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
System.out.println("Connection done++++++++++++++");
JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
{
public String call(Tuple2<String, String> message)
{
System.out.println("NewMessage: "+message._2()+"++++++++++++++++++");
return message._2();
}
}
);
data.print();
jssc.start();
jssc.awaitTermination();
}
}
Я бегу работу, а на другом терминале я бегу Кафка-производителя публиковать сообщения:
Hi kafka
second message
another message
но выходные журналы на искру потоковой консоли не показывает сообщения, но показывает нулевые блоки получили:
-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------
2014-12-01 08:03:08,008 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
Почему не получает блок данных? я пробовал использовать kafka производитель-потребитель на консоли bin/kafka-console-producer....
и bin/kafka-console-consumer...
его работала идеально, но почему бы не мой код ... любая идея?
@ aiman- Не могли бы вы рассказать о своем ответе. Какой искровой файл конфигурации. Где я могу найти этот файл? – kit
изменить количество ядер (скрипт conf/spark-env.sh) на каждом узле или изменить его глобально в conf/spark-defaults.conf – aiman