2016-06-02 3 views
1

Я пытаюсь обработать набор данных, который составляет приблизительно 2 тб, используя кластер с 4,5 тб оперативной памяти. Данные находятся в паркетном формате и изначально загружаются в фреймворк данных. Затем подмножество данных запрашивается и преобразуется в RDD для более сложной обработки. Первым этапом этой обработки является mapToPair для использования каждого идентификатора строк в качестве ключа в кортеже. Затем данные проходят через операцию combByKey для группировки всех значений с одним и тем же ключом. Эта операция всегда превышает максимальную память кластера, и работа в конечном итоге терпит неудачу. Хотя это перетасовка, есть много сообщений «проливать память на диск». Мне интересно, должны ли я сначала разбивать данные таким образом, чтобы все строки с одним и тем же идентификатором находились в одном и том же разделе, если ему нужно было бы перетасовать и выполнить правильно.Оптимизация Spark combinationByKey

Для начальной загрузки я использую:

sqlContext.read().parquet(inputPathArray).repartition(10000, new Column("id")); 

Я не уверен, если это правильный способ разбить dataframe так, что это мой первый вопрос выше правильно.

Мой следующий вопрос в том, что, когда я иду от dataframe к РДД с помощью:

JavaRDD<LocationRecord> locationsForSpecificKey = sqlc.sql("SELECT * FROM standardlocationrecords WHERE customerID = " + customerID + " AND partnerAppID = " + partnerAppID) 
        .toJavaRDD().map(new LocationRecordFromRow()::apply); 

является схема разделов из dataframe сохранившейся или мне нужно разметить после выполнения mapToPair с помощью:

rdd.partitionBy и передача в пользовательский HashPartitioner, который использует хэш поля ID.

Моя цель состоит в том, чтобы уменьшить перетасовку при выполнении окончательного combByKey, чтобы не допустить нехватки памяти и сбоя в работе. Любая помощь будет принята с благодарностью.

Спасибо, Натан

ответ

1

Я не уверен, что это правильный способ разбить dataframe

Это выглядит примерно правильно.

является схема разделов из dataframe сохранились

Распределение данных должны быть сохранены, что можно легко проверить, посмотрев на debugString:

val df = sqlContext.read.parquet("/tmp/foo").repartition(10000, $"id") 

df.rdd.toDebugString 
// String = 
// (10000) MapPartitionsRDD[46] at rdd at <console>:26 [] 
// | ShuffledRowRDD[45] at rdd at <console>:26 [] 
// +-(8) MapPartitionsRDD[44] at rdd at <console>:26 [] 
//  | $anon$1[43] at [] 

но нет никакого набора разметки для выход RDD:

df.rdd.partitioner 
// Option[org.apache.spark.Partitioner] = None 

, поэтому эту информацию нельзя использовать для оптимизации последующей агрегации.

Моя цель состоит в том, чтобы уменьшить перетасовки

Если это так, это не выглядит как правильный подход. Предполагая, что функция mergeValue, переданная в combineByKey, представляет собой операцию уменьшения, которую вы фактически перетасовываете больше, чем напрямую с помощью combineByKey. Если это не так, то применение combineByKey с mapSideCombine, установленным в false, вероятно, является лучшим выбором.

В зависимости от логики комбинирования вы также должны рассмотреть возможность выполнения агрегации непосредственно на DataFrame.

+0

Функция слияния просто добавляет значения в список, поэтому в конце у меня есть сопоставление клавиш со списком значений. Какая функция была бы наиболее подходящей в этом случае? Как вы примените комбинацию по ключу напрямую? –

+0

ОК, так это просто другая группаByKey? – zero323

+0

Если да, см. Обсуждение ниже http://stackoverflow.com/a/37580350/1560062 и http://stackoverflow.com/q/37189802/1560062 – zero323

Смежные вопросы