Я Сбор данных из приложения для обмена сообщениями, я в настоящее время с помощью Flume, он посылает прибл 50 миллионов записей в деньУпорно Спарк Streaming выход
Я хотел бы использовать Кафку, потреблять от Кафки с помощью искровой Streaming и сохраняется его Hadoop и запрос с импало
у меня возникли проблемы с каждым подходом я пробовал ..
подход 1 - Сохранить РДД, как паркет, указать внешнюю улей паркет таблицу в каталог паркетного
// scala
val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {
// 1 - Create a SchemaRDD object from the rdd and specify the schema
val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)
// 2 - register it as a spark sql table
SchemaRDD1.registerTempTable("sparktable")
// 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
val finalParquet = sqlContext.sql(sql)
finalParquet.saveAsParquetFile(dir)
Проблема в том, что finalParquet. saveAsParquetFile выдает огромный нет. файлов, Dstream, полученный от Kafka, выдает более 200 файлов для 1-минутного размера партии. Причина, по которой он выводит много файлов, заключается в том, что вычисление распределяется, как объяснено в другом сообщении how to make saveAsTextFile NOT split output into multiple file? . Репортерские решения не кажутся мне оптимальными, например. как утверждает один пользователь. Наличие одного выходного файла является хорошей идеей, если у вас очень мало данных.
Подход 2 - Использование Hivecontext. вставить РДД данных непосредственно в таблицу улья
# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)
def sendRecord(rdd):
sql = "INSERT INTO TABLE table select * from beacon_sparktable"
# 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
beaconDF = sqlContext.jsonRDD(rdd,schema)
# 2- Register the DataFrame as a spark sql table.
beaconDF.registerTempTable("beacon_sparktable")
# 3 - insert to hive directly from a qry on the spark sql table
sqlContext.sql(sql);
Это хорошо работает, он вставляет непосредственно к паркетным таблице, но есть диспетчеризация задержки для партий, как время обработки превышает интервал времени пакетной обработки. Потребитель не может идти в ногу с тем, что производится, а партии для обработки начинают стоять в очереди.
похоже, что улей медленно. ive попытался настроить размер пакета intervla, запуская больше экземпляров потребителей.
В заключении
Что является лучшим способом сохраняется большим данные Спарк Streaming, учитывая, что есть проблемы с несколькими файлами и потенциальной задержкой с письменной формой Hive? Что делают другие люди?
Аналогичный вопрос был здесь спросили, но у него есть проблемы с каталогами проставления слишком много файлов How to make Spark Streaming write its output so that Impala can read it?
Большого спасибо за любую помощь
Вы можете установить другое окно для выходного потока. 'val lines = KafkaUtils.createStream (ssc, zkQuorum, group, topicMap) .map (_._ 2) .window (Minutes (15)). foreachRDD (rdd =>' – ssedano
это мне кажется очень распространенным прецедентом , Я удивлен, что никто не ответил на это.Думаю, я бы предложил использовать базу данных, так как Spark сама по себе не может ее заменить. Попробуйте Cassandra или HBase (очень крутая кривая обучения для HBase). – avloss