2015-08-22 3 views
6

У меня есть искра (для 1.4.1), получающая поток событий кафки. Я хотел бы сохранить их непрерывно, как паркет на тахионе.Отключить резюме метаданных паркета в Искра

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 

lines.window(Seconds(1), Seconds(1)).foreachRDD { (rdd, time) => 
    if (rdd.count() > 0) { 
    val mil = time.floor(Duration(86400000)).milliseconds 
    hiveContext.read.json(rdd).toDF().write.mode(SaveMode.Append).parquet(s"tachyon://192.168.1.12:19998/persisted5$mil") 
    hiveContext.sql(s"CREATE TABLE IF NOT EXISTS persisted5$mil USING org.apache.spark.sql.parquet OPTIONS (path 'tachyon://192.168.1.12:19998/persisted5$mil')") 
    } 
} 

однако я вижу, что с течением времени, на каждой паркетной записи, искра проходит через каждую 1 секунду части паркетных, которые получают все медленнее и медленнее

15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-db03b24d-6f98-4b5d-bb40-530f35b82633.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-3a7857e2-0435-4ee0-ab2c-6d40224f8842.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-47ff2ac1-da00-4473-b3f7-52640014bc5b.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-61625436-7353-4b1e-bb8d-e8afad3a582e.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-e711aa9a-9bf5-41d5-8523-f5edafa69626.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-4e0cca38-cf75-4771-8965-20a30c863100.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-d1510ed4-2c99-43e2-b3d1-38d3d54e626d.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-022d1918-392d-433f-a7f4-074e46b4460f.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-cf71f5d2-ba0e-4729-9aa1-41dad5d1d08f.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-ce990b1e-82cc-4feb-a162-ac3ddc275609.gz.parquet, 65536) 

я пришел к выводу, что это из-за обновления сводных данных, я считаю, что искры не используют их. поэтому я хотел бы отключить его

parquet sources показывает, что я должен установить false для паркета.enable.summary-metadata.

сейчас, я попытался установить его, как это, сразу же после создания hiveContext

hiveContext.sparkContext.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false) 
hiveContext.sparkContext.hadoopConfiguration.setInt("parquet.metadata.read.parallelism", 10) 

, но без успеха, я все еще получаю журналы, показывающий параллельность 5 (по умолчанию).

Каков правильный способ отключения сводных данных в искре с паркетом?

ответ

9

установка «паркет.enable.summary-metadata» как текст («false», а не false), похоже, сработает для нас.

Кстати Спарк действительно использует файл _common_metadata (копируем, что над вручную для повторяющихся заданий)

+0

Если я отключу shemaMerging, используются ли метаданные? похоже, работает нормально с beeline. – Pierre

+0

в Spark 1.3 нет и в 1.4.1 он по-прежнему ищет _common_metadata (возможно, что ошибка) –

+0

Кажется, что откат для чтения нижнего колонтитула детали, когда я обновляюсь, если метаданные отсутствуют. Но работает (хотя и медленно, хотя, по сути, переместил проблему из вставки в обновление) – Pierre

7

Спарк 2.0 не сохраняет сводки метаданных по умолчанию больше, см SPARK-15719.

Если вы работаете с данными, размещенными на S3, то вы можете найти may все еще найти производительность паркета, поразив сам паркет, пытаясь отсканировать хвост всех объектов, чтобы проверить их схемы. Это может быть отключено явно

sparkConf.set("spark.sql.parquet.mergeSchema", "false") 
Смежные вопросы