2015-10-26 1 views
1

Я передаю данные из Kafka с помощью пакетной потоковой передачи (maxRatePerPartition 10.000). Поэтому в каждой партии я обрабатываю 10.000 сообщений кафки.Spark пакетная передача kafka в один файл

В рамках этого пакетного запуска обрабатываю каждое сообщение, создавая DataFrame из rdd. После обработки я сохраняю каждую обработанную запись в том же файле, используя: dataFrame.write.mode (SaveMode.append). Таким образом, он добавляет все сообщения в один и тот же файл.

Это нормально, пока он работает в рамках одного пакетного запуска. Но после выполнения следующего пакетного запуска (обрабатывается следующее 10.000 сообщений), он создает новый файл для следующих 10.000 сообщений.

Проблема в следующем: каждый файл (блок) резервирует 50 МБ файловой системы, но содержит только около 1 мб (10.000 сообщений). Вместо того, чтобы создавать новые файлы для каждого запуска, я предпочел бы, чтобы все это было добавлено к одному файлу, если оно не превышает 50 МБ.

Вы знаете, как это сделать или почему это не работает в моем примере? Вы можете посмотреть мою кодировку здесь:

import kafka.serializer.{DefaultDecoder, StringDecoder} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.{SQLContext, SaveMode} 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.streaming.{Seconds, StreamingContext, Time} 
import org.apache.spark.{SparkConf, SparkContext} 

import scala.collection.immutable.Set 


object SparkStreaming extends Constants { 


    def main(args: Array[String]) { 

//create a new Spark configuration... 
val conf = new SparkConf() 
    .setMaster("local[2]") // ...using 2 cores 
    .setAppName("Streaming") 
    .set("spark.streaming.kafka.maxRatePerPartition", "10000") //... processing max. 10000 messages per second 

//create a streaming context for micro batch 
val ssc = new StreamingContext(conf, Seconds(1)) //Note: processing max. 1*10000 messages (see config above.) 

//Setup up Kafka DStream 
val kafkaParams = Map("metadata.broker.list" -> "sandbox.hortonworks.com:6667", 
    "auto.offset.reset" -> "smallest") //Start from the beginning 
val kafkaTopics = Set(KAFKA_TOPIC_PARQUET) 

val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, 
    kafkaParams, kafkaTopics) 

val records = directKafkaStream.map(Source => StreamingFunctions.transformAvroSource(Source)) 


records.foreachRDD((rdd: RDD[TimeseriesRddRecord], time: Time) => { 
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) // Worker node singleton 
    import sqlContext.implicits._ 

    val dataFrame = rdd.toDF() 

    dataFrame.write.mode(SaveMode.Append).partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*).parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL) 
    println(s"Written entries: ${dataFrame.count()}") 
} 
) 


//start streaming until the process is killed 
ssc.start() 
ssc.awaitTermination() 

    } 


    /** Case class for converting RDD to DataFrame */ 
    case class DataFrameRecord(thingId: String, timestamp: Long, propertyName: String, propertyValue: Double) 


    /** Lazily instantiated singleton instance of SQLContext */ 
    object SQLContextSingleton { 

@transient private var instance: SQLContext = _ 

def getInstance(sparkContext: SparkContext): SQLContext = { 
    if (instance == null) { 
    instance = new SQLContext(sparkContext) 
    } 
    instance 
    } 
    } 

} 

Я был бы рад получить ваши мысли об этом. Спасибо, Alex

ответ

1

Это можно сделать, используя функцию coalesce, а затем перезаписав существующий файл.

Но как обсуждалось в потоке Spark coalesce looses file when program is aborted, это связано с ошибками при прерывании программы.

Так что пока этого недостаточно, чтобы реализовать такую ​​логику.