2016-07-20 2 views
15

Я хочу переписать определенные разделы вместо всех в искровом режиме. Я пытаюсь следующая команда:Перезаписать конкретные разделы в методе записи данных с помощью свечей.

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4') 

где ФР dataframe имея дополнительные данные, которые будут перезаписаны.

hdfs-base-path содержит основные данные.

Когда я пытаюсь выполнить приведенную выше команду, она удаляет все разделы и вставляет те, которые присутствуют в df по пути hdfs.

В чем заключается мое требование переписать только те разделы, которые присутствуют в df по указанному пути hdfs. Может кто-нибудь, пожалуйста, помогите мне в этом?

ответ

13

Это распространенная проблема. Единственное решение с искрой до 2,0, чтобы написать непосредственно в каталоге раздела, например,

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value") 

Если вы используете Спарк до 2.0, вам необходимо остановить Огонек из излучающих файлы метаданных (потому что они будут сломать автоматическое обнаружение разделов) с помощью:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") 

Если вы используете Спарк до 1.6.2, вам также необходимо удалить файл _SUCCESS в /root/path/to/data/partition_col=value или ее присутствие нарушит автоматическое обнаружение разделов. (Я настоятельно рекомендую использовать 1.6.2 или новее.)

Вы можете получить еще несколько сведений о том, как управлять большими секционированными таблицами из моей беседы Spark Summit на Bulletproof Jobs.

+0

Большое спасибо Sim для ответа , Несколько сомнений больше, если предположить, что исходный dataframe имеет данные для около 100 разделов, тогда мне нужно разбить этот фрейм данных на еще 100 фреймов данных с соответствующим значением раздела и вставить непосредственно в каталог разделов. Можно ли сохранить эти 100 разделов параллельно? Также я использую Spark 1.6.1 Если я использую формат файла orc, как я могу прекратить выдавать файлы метаданных для этого, это то же самое, что вы упомянули для паркета? – yatin

+0

Re: metadata, no, ORC - это другой формат, и я не думаю, что он создает файлы без данных. С 1.6.1 вам нужны только файлы ORC в подкаталогах дерева разделов. Поэтому вам придется удалить '_SUCCESS' вручную. Вы можете писать параллельно нескольким разделам, но не с одной и той же работы. Запуск нескольких заданий на основе возможностей вашей платформы, например, с использованием REST API. – Sim

+3

Любое обновление об этом? Сохраняет ли saveToTable() только отдельные разделы? Является ли искра достаточно умной, чтобы выяснить, какие разделы были переписаны? –

4

Использование Спарк 1.6 ...

HiveContext может упростить этот процесс значительно. Ключ состоит в том, что вы должны сначала создать таблицу в Hive, используя оператор CREATE EXTERNAL TABLE с определенным разделением. Например:

# Hive SQL 
CREATE EXTERNAL TABLE test 
(name STRING) 
PARTITIONED BY 
(age INT) 
STORED AS PARQUET 
LOCATION 'hdfs:///tmp/tables/test' 

здесь, скажем, у вас есть Dataframe с новыми записями в ней для конкретного раздела (или несколько разделов). Вы можете использовать оператор HiveContext SQL для выполнения INSERT OVERWRITE с помощью этого Dataframe, который перепишет таблицу только для разделов, содержащихся в Dataframe:

# PySpark 
hiveContext = HiveContext(sc) 
update_dataframe.registerTempTable('update_dataframe') 

hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age) 
        SELECT name, age 
        FROM update_dataframe""") 

Примечание: update_dataframe в этом примере имеет схему, которая соответствует таковому цель test таблица.

Простой способ сделать этот подход - пропустить шаг CREATE EXTERNAL TABLE в Hive и просто сделать таблицу с использованием методов записи API Dataframe. В частности, для таблиц на основе Паркета таблица не будет определена надлежащим образом для поддержки функции INSERT OVERWRITE... PARTITION Hive.

Надеюсь, это поможет.

+0

Я пробовал описанный выше подход, я получаю ошибку, например '' Dynamic partition strict mode требует хотя бы одного статического столбца раздела. Чтобы отключить это, установите hive.exec.dynamic.partition.mode = nonstrict' – Shankar

+0

У меня нет статических столбцов раздела – Shankar

0

Если вы используете DataFrame, возможно, вы хотите использовать таблицу Hive над данными. В этом случае вам нужно просто вызвать метод

df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name) 

Это будет перезаписывать разделы, которые содержат DataFrame.

Нет необходимости указывать формат (orc), так как Spark будет использовать формат таблицы Hive.

Он отлично работает в Спарк версии 1.6

0

Вы могли бы сделать что-то вроде этого, чтобы сделать работу реентерабельные (идемпотентную): (пробовал это на искру 2.2)

# drop the partition 
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition) 
print drop_query 
spark.sql(drop_query) 

# delete directory 
dbutils.fs.rm(<partition_directoy>,recurse=True) 

# Load the partition 
df.write\ 
    .partitionBy("partition_col")\ 
    .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>) 
Смежные вопросы