Я пытаюсь разделить поток Spark на основе разделителя и сохранить каждый из этих фрагментов в новый файл.Разделение искрового потока по разделителю
Кажется, что каждый из моих RDD разбит по разделителю.
У меня возникли трудности с настройкой одного сообщения разделителя на RDD или, если вы можете сохранить каждый раздел отдельно в новый файл part-000...
.
Любая помощь будет высоко оценена. Благодаря
val sparkConf = new SparkConf().setAppName("DataSink").setMaster("local[8]").set("spark.files.overwrite","false")
val ssc = new StreamingContext(sparkConf, Seconds(2))
class RouteConsumer extends Actor with ActorHelper with Consumer {
def endpointUri = "rabbitmq://server:5672/myexc?declare=false&queue=in_hl7_q"
def receive = {
case msg: CamelMessage =>
val m = msg.withBodyAs[String]
store(m.body)
}
}
val dstream = ssc.actorStream[String](Props(new RouteConsumer()), "SparkReceiverActor")
val splitStream = dstream.flatMap(_.split("MSH|^~\\&"))
splitStream.foreachRDD(rdd => rdd.saveAsTextFile("file:///home/user/spark/data"))
ssc.start()
ssc.awaitTermination()
Спасибо за ответ , Я не уверен, что понимаю, что сделает карта в первой строке цикла for. Не могли бы вы объяснить? Спасибо. – CatsLoveJazz
Извините, я перешел на большую часть вашего примера. Это строка 'val splitStream = dstream.flatMap (_. Split (" MSH |^~ \\ & "))'. Вместо того, чтобы возвращать отдельную запись для каждой полученной подстроки, она возвращает весь массив для каждой записи как новую запись, так что последующие вызовы 'map' просто проецируют нужные элементы массива. –
Спасибо. Не могли бы вы обновить свой ответ, поскольку я все еще стараюсь понять – CatsLoveJazz