2015-10-01 3 views
7

Я Сбор данных из приложения для обмена сообщениями, я в настоящее время с помощью Flume, он посылает прибл 50 миллионов записей в деньУпорно Спарк Streaming выход

Я хотел бы использовать Кафку, потреблять от Кафки с помощью искровой Streaming и сохраняется его Hadoop и запрос с импало

у меня возникли проблемы с каждым подходом я пробовал ..

подход 1 - Сохранить РДД, как паркет, указать внешнюю улей паркет таблицу в каталог паркетного

// scala 
val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt)) 
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 
lines.foreachRDD(rdd => { 

    // 1 - Create a SchemaRDD object from the rdd and specify the schema 
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema) 

    // 2 - register it as a spark sql table 
    SchemaRDD1.registerTempTable("sparktable") 

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files 
    val finalParquet = sqlContext.sql(sql) 
    finalParquet.saveAsParquetFile(dir) 

Проблема в том, что finalParquet. saveAsParquetFile выдает огромный нет. файлов, Dstream, полученный от Kafka, выдает более 200 файлов для 1-минутного размера партии. Причина, по которой он выводит много файлов, заключается в том, что вычисление распределяется, как объяснено в другом сообщении how to make saveAsTextFile NOT split output into multiple file? . Репортерские решения не кажутся мне оптимальными, например. как утверждает один пользователь. Наличие одного выходного файла является хорошей идеей, если у вас очень мало данных.

Подход 2 - Использование Hivecontext. вставить РДД данных непосредственно в таблицу улья

# python 
sqlContext = HiveContext(sc) 
ssc = StreamingContext(sc, int(batch_interval)) 
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1}) 
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER) 
lines.foreachRDD(sendRecord) 

def sendRecord(rdd): 

    sql = "INSERT INTO TABLE table select * from beacon_sparktable" 

    # 1 - Apply the schema to the RDD creating a data frame 'beaconDF' 
    beaconDF = sqlContext.jsonRDD(rdd,schema) 

    # 2- Register the DataFrame as a spark sql table. 
    beaconDF.registerTempTable("beacon_sparktable") 

    # 3 - insert to hive directly from a qry on the spark sql table 
    sqlContext.sql(sql); 

Это хорошо работает, он вставляет непосредственно к паркетным таблице, но есть диспетчеризация задержки для партий, как время обработки превышает интервал времени пакетной обработки. Потребитель не может идти в ногу с тем, что производится, а партии для обработки начинают стоять в очереди.

похоже, что улей медленно. ive попытался настроить размер пакета intervla, запуская больше экземпляров потребителей.

В заключении

Что является лучшим способом сохраняется большим данные Спарк Streaming, учитывая, что есть проблемы с несколькими файлами и потенциальной задержкой с письменной формой Hive? Что делают другие люди?

Аналогичный вопрос был здесь спросили, но у него есть проблемы с каталогами проставления слишком много файлов How to make Spark Streaming write its output so that Impala can read it?

Большого спасибо за любую помощь

+0

Вы можете установить другое окно для выходного потока. 'val lines = KafkaUtils.createStream (ssc, zkQuorum, group, topicMap) .map (_._ 2) .window (Minutes (15)). foreachRDD (rdd =>' – ssedano

+0

это мне кажется очень распространенным прецедентом , Я удивлен, что никто не ответил на это.Думаю, я бы предложил использовать базу данных, так как Spark сама по себе не может ее заменить. Попробуйте Cassandra или HBase (очень крутая кривая обучения для HBase). – avloss

ответ

0

в растворе № 2, количество файлов, созданного может управляться через количество разделов каждого RDD.

Смотрите этот пример:

// create a Hive table (assume it's already existing) 
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET") 

// create a RDD with 2 records and only 1 partition 
val rdd = sc.parallelize(List(List(1, "hello"), List(2, "world")), 1) 

// create a DataFrame from the RDD 
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = false), 
StructField("txt", StringType, nullable = false) 
)) 
val df = sqlContext.createDataFrame(rdd.map(Row(_:_*)), schema) 

// this creates a single file, because the RDD has 1 partition 
df.write.mode("append").saveAsTable("test") 

Теперь, я думаю, вы можете играть с частотой, на которой вы тянете данные от Кафки, и числом разделов каждого RDD (по умолчанию, разделы вашего Кафке темы , что вы можете уменьшить путем переделки).

Я использую Spark 1.5 из CDH 5.5.1, и получаю тот же результат, используя либо df.write.mode("append").saveAsTable("test"), либо вашу строку SQL.