2014-11-28 2 views
4

Я интегрирую Kafka и Spark, используя искрообразование. Я создал тему в качестве продюсера Кафки:Kafka Spark streaming: не удалось прочитать сообщения

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

Я издаю сообщения в Кафка и пытаюсь читать их с помощью искрового потокового кода Java и отображения их на экране.
Демоны все: Искра-мастер, рабочий; работник зоопарка; Кафка.
Я пишу код Java для выполнения его, используя KafkaUtils.createStream
код ниже:

public class SparkStream { 
    public static void main(String args[]) 
    { 
     if(args.length != 3) 
     { 
      System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>"); 
      System.exit(1); 
     } 


     Map<String,Integer> topicMap = new HashMap<String,Integer>(); 
     String[] topic = args[2].split(","); 
     for(String t: topic) 
     { 
      topicMap.put(t, new Integer(1)); 
     } 

     JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000)); 
     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); 

     System.out.println("Connection done++++++++++++++"); 
     JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() 
               { 
                public String call(Tuple2<String, String> message) 
                { 
                 System.out.println("NewMessage: "+message._2()+"++++++++++++++++++"); 
                 return message._2(); 
                } 
               } 
               ); 
     data.print(); 

     jssc.start(); 
     jssc.awaitTermination(); 

    } 
} 

Я бегу работу, а на другом терминале я бегу Кафка-производителя публиковать сообщения:

Hi kafka 
second message 
another message 

но выходные журналы на искру потоковой консоли не показывает сообщения, но показывает нулевые блоки получили:

------------------------------------------- 
Time: 1417438988000 ms 
------------------------------------------- 

2014-12-01 08:03:08,008 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms 
2014-12-01 08:03:08,008 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms 
2014-12-01 08:03:08,009 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s) 
2014-12-01 08:03:08,010 INFO [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms 
2014-12-01 08:03:08,015 INFO [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list 
2014-12-01 08:03:08,024 INFO [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39 
2014-12-01 08:03:08,027 INFO [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list 
2014-12-01 08:03:08,031 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38 
2014-12-01 08:03:08,033 INFO [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms 
2014-12-01 08:03:09,002 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks 

Почему не получает блок данных? я пробовал использовать kafka производитель-потребитель на консоли bin/kafka-console-producer.... и bin/kafka-console-consumer... его работала идеально, но почему бы не мой код ... любая идея?

ответ

7

Проблема решена.

приведенный выше код. Мы добавим еще две строки, чтобы подавить созданные [INFO] и [WARN]. Таким образом, окончательный код:

package com.spark; 

import scala.Tuple2; 
import org.apache.log4j.Logger; 
import org.apache.log4j.Level; 
import kafka.serializer.Decoder; 
import kafka.serializer.Encoder; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.*; 
import org.apache.spark.api.java.function.*; 
import org.apache.spark.api.java.*; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.kafka.*; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; 
import java.util.Map; 
import java.util.HashMap; 

public class SparkStream { 
    public static void main(String args[]) 
    { 
     if(args.length != 3) 
     { 
      System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>"); 
      System.exit(1); 
     } 

     Logger.getLogger("org").setLevel(Level.OFF); 
     Logger.getLogger("akka").setLevel(Level.OFF); 
     Map<String,Integer> topicMap = new HashMap<String,Integer>(); 
     String[] topic = args[2].split(","); 
     for(String t: topic) 
     { 
      topicMap.put(t, new Integer(3)); 
     } 

     JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000)); 
     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); 

     System.out.println("Connection done++++++++++++++"); 
     JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() 
               { 
                public String call(Tuple2<String, String> message) 
                { 
                 return message._2(); 
                } 
               } 
               ); 
     data.print(); 

     jssc.start(); 
     jssc.awaitTermination(); 

    } 
} 

Также необходимо добавить зависимость в pom.xml:

<dependency> 
<groupId>com.msiops.footing</groupId> 
<artifactId>footing-tuple</artifactId> 
<version>0.2</version> 
</dependency> 

Эта зависимость используется для использования в scala.Tuple2
Ошибка Stream 0 received 0 block объясняется к искровому рабочему, недоступному, и ядро-искрообразователь было установлено равным 1. Для искрообразования нам нужно, чтобы ядро ​​составляло> = 2. Итак, нам нужно внести изменения в файл spark-config. Обратитесь к руководству по установке. для добавления строки export SPARK_WORKER_CORE=5 Также измените SPARK_MASTER='hostname' на SPARK_MASTER=<your local IP>. Этот локальный ip - это то, что вы видите в BOLD, когда вы идете на веб-консоль Spark UI ... что-то вроде: spark://192.168..:<port>. Нам здесь не нужен порт. требуется только IP.
Теперь перезагрузите искровой-мастер и искровым рабочего и начать потоковое :)

выход:

------------------------------------------- 
Time: 1417443060000 ms 
------------------------------------------- 
message 1 

------------------------------------------- 
Time: 1417443061000 ms 
------------------------------------------- 
message 2 

------------------------------------------- 
Time: 1417443063000 ms 
------------------------------------------- 
message 3 
message 4 

------------------------------------------- 
Time: 1417443064000 ms 
------------------------------------------- 
message 5 
message 6 
messag 7 

------------------------------------------- 
Time: 1417443065000 ms 
------------------------------------------- 
message 8 
+0

@ aiman- Не могли бы вы рассказать о своем ответе. Какой искровой файл конфигурации. Где я могу найти этот файл? – kit

+0

изменить количество ядер (скрипт conf/spark-env.sh) на каждом узле или изменить его глобально в conf/spark-defaults.conf – aiman

2

Да, вам необходимо получить доступ к контенту из DStream.

messages.foreachRDD(<<processing for the input received in the interval>>); 
+0

Привет Виджай. Я уже добавил строку messages.print(). это не работает? как использовать логику печати внутри foreachRDD()? – aiman

+0

Hi Vijay, согласно вашему предложению, я отредактировал код выше, т. Е. Добавил эти строки: – aiman

+0

JavaDStream data = messages.карта (новая функция , String>() \t \t \t \t \t \t \t \t \t \t \t \t { \t \t \t \t \t \t \t \t \t \t \t \t \t общественного Строка вызова (Tuple2 < String, String>) \t \t \t \t \t \t \t \t \t \t \t \t \t {\t \t \t \t \t \t \t \t \t \t \t \t \t \t System.out.println ("NewMessage:" + message._2()); \t \t \t \t \t \t \t \t \t \t \t \t \t \t возвращение message._2(); \t \t \t \t \t \t \t \t \t \t \t \t \t} \t \t \t \t \t \t \t \t \t \t \t \t} \t \t \t \t \t \t \t \t \t \t \t \t); \t \t данные.print(); – aiman

Смежные вопросы