2016-04-01 4 views
2

У меня есть поток потока, который я хочу сохранить в HDFS через искру. Ниже искра коды, который я бегFlume + Spark - Хранение DStream в HDFS

object FlumePull { 
    def main(args: Array[String]) { 
    if (args.length < 2) { 
     System.err.println(
     "Usage: FlumePollingEventCount <host> <port>") 
     System.exit(1) 
    } 

    val batchInterval = Milliseconds(60000) 
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") 
    val ssc = new StreamingContext(sparkConf, batchInterval) 
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999) 

    stream.map(x => x + "!!!!") 
      .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout") 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

Когда я начинаю spsark потоковой работу, он делает хранит выход в HDFS, но выводится что-то вроде этого:

[[email protected] ~]# hadoop fs -cat /user/root/spark/flume_Map_-1459450380000._Mapout/part-00000 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
org.apache.spark.streaming.flume.SparkF[email protected]!!!! 
[email protected]!!!! 

Он хранит водовод событие вместо данные, поступающие из Flume. Как получить данные из этого?

Благодаря

ответ

0

Вам необходимо извлечь основной буфер из SparkFlumeEvent и спасти. Например, если ваше тело события является String:

stream.map(x => new String(x.event.getBody.array) + "!!!!") 
     .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout") 
Смежные вопросы