2015-07-22 2 views
2

Я пытаюсь разделить поток Spark на основе разделителя и сохранить каждый из этих фрагментов в новый файл.Разделение искрового потока по разделителю

Кажется, что каждый из моих RDD разбит по разделителю.

У меня возникли трудности с настройкой одного сообщения разделителя на RDD или, если вы можете сохранить каждый раздел отдельно в новый файл part-000....

Любая помощь будет высоко оценена. Благодаря

val sparkConf = new SparkConf().setAppName("DataSink").setMaster("local[8]").set("spark.files.overwrite","false") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 

class RouteConsumer extends Actor with ActorHelper with Consumer { 
    def endpointUri = "rabbitmq://server:5672/myexc?declare=false&queue=in_hl7_q" 
    def receive = { 
     case msg: CamelMessage => 
      val m = msg.withBodyAs[String] 
      store(m.body) 
    } 
} 

val dstream = ssc.actorStream[String](Props(new RouteConsumer()), "SparkReceiverActor") 
val splitStream = dstream.flatMap(_.split("MSH|^~\\&")) 
splitStream.foreachRDD(rdd => rdd.saveAsTextFile("file:///home/user/spark/data")) 

ssc.start() 
ssc.awaitTermination() 

ответ

2

Вы не можете контролировать part-NNNNN файл (раздел) получает какой выход, но вы можете написать в разных каталогах. «Простой» способ сделать этот вид колонки расщепления с отдельными операторами карты (как SELECT заявления), что-то вроде этого, если вы будете иметь n элементов массива после расщепления:

... val dstream2 = dstream.map(_.split("...")) // like above, but with map dstream2.cache() // very important for what follows, repeated reads of this... val dstreams = new Array[DStream[String]](n) for (i <- 0 to n-1) { dstreams[i] = dstream2.map(array => array[i] /* or similar */) dstreams[i].saveAsTextFiles(rootDir+"/"+i) } ssc.start() ssc.awaitTermination()

+0

Спасибо за ответ , Я не уверен, что понимаю, что сделает карта в первой строке цикла for. Не могли бы вы объяснить? Спасибо. – CatsLoveJazz

+0

Извините, я перешел на большую часть вашего примера. Это строка 'val splitStream = dstream.flatMap (_. Split (" MSH |^~ \\ & "))'. Вместо того, чтобы возвращать отдельную запись для каждой полученной подстроки, она возвращает весь массив для каждой записи как новую запись, так что последующие вызовы 'map' просто проецируют нужные элементы массива. –

+0

Спасибо. Не могли бы вы обновить свой ответ, поскольку я все еще стараюсь понять – CatsLoveJazz