2015-10-06 4 views
0

Я новичок в Spark, может кто-нибудь мне помочь?DStrream [String] .foreachrdd на Spark cluster

def streamStart() { 
val sparkConf = new SparkConf().setAppName("kafkaStreamingNew!!").setMaster("spark://husnain:7077").setJars(Array("/home/husnain/Downloads/ScalaWorkspace/KafkaStreaming/target/KafkaStreaming-1.1.0-jar-with-dependencies.jar")) //,"/home/husnain/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.4.1/spark-streaming-kafka_2.10-1.4.1.jar" , "/home/husnain/.m2/repository/org/apache/spark/spark-streaming_2.10/1.4.1/spark-streaming_2.10-1.4.1.jar" ,"/home/husnain/.m2/repository/org/apache/spark/spark-core_2.10/1.4.1/spark-core_2.10-1.4.1.jar")) 
val ssc = new StreamingContext(sparkConf, Seconds(1)) 

val topics = "test"; 
ssc.checkpoint("checkpoint") 
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark", Map("test" -> 1)).map(_._2) 
lines.print() 
println("*****************************************************************************") 
lines.foreachRDD(
    iter => iter.foreach(
    x => println(x + "\n***-------------------------------------------------------***\n"))) 
println("-----------------------------------------------------------------------------") 
ssc.start() 
ssc.awaitTermination() 

На Спарк автономного кластера, что код не работает, но на местном [*], она работает правильно:

lines.foreachRDD(
    iter => iter.foreach(
    x => println(x + "\n***-------------------------------------------------------***\n") 
    ) 
    ) 
+2

И что это значит _it не work_? Или _works правильно_ в этом отношении. – zero323

+0

Я подозреваю, что вы подаете свое заявление не так. Вы используете spark-submit? Невозможно (на самом деле очень сложно) отправить приложение в кластер из кода. Посмотрите здесь: http://spark.apache.org/docs/latest/submitting-applications.html –

+0

@dwysakowicz да я отправляю работу через spark-submit –

ответ

0

Я полагаю, что его называют, как «работает правильно» что вы видите println s на консоли.

Когда вы отправляете один и тот же код в кластер, println на консоль происходит локально на каждом исполнителе, поэтому, если все остальное работает, отсутствие вывода является единственным результатом распределенного выполнения.

Посмотрите на выходе исполнителей для тех, кто println s

+0

Я написал 'println' здесь, чтобы сократить код. моя фактическая задача - сохранить входящие данные в hbase после некоторых вычислений. на локальном уровне он работает (значит, сэкономить до hbase успешно), но в режиме автономного кластера в исправном режиме он не работает :( –

+0

вы упомянули hbase .. вы уверены, что используете правильную конфигурацию? Вы указываете параметры подключения в коде или через * .xml? На рабочих может не подобрать правильную конфигурацию. –

Смежные вопросы