2016-06-08 4 views
2

У меня есть код, как следующее:Спарк РДД: несколько reducebykey или только один раз

// make a rd according to an id 
def makeRDD(id:Int, data:RDD[(VertexId, Double)]):RDD[(Long, Double)] = { ... } 
val data:RDD[(VertexId, Double)] = ... // loading from hdfs 
val idList = (1 to 100) 
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_) 
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_)) 

RST1 и RST2 получить результат выборки. Я думал, что rst1 требует больше памяти (100 раз), но только один reduceByKey tranform; однако rst2 требует меньше памяти, но более сокращаетByKey tranforms (99 раз). Итак, это игра времени и пространства?

Мой вопрос: правильно ли мой анализ выше, или Spark treat переводить действия таким же образом внутри?

P.S .: rst1 union all sub rdd then reduceByKey, which reduceByKey снаружи уменьшить. rst2 reduceByKey один за другим, который уменьшаетByKey внутри уменьшить.

+1

Я не уверен, что я понимаю ваш вопрос. rst1 и rst2 имеют один и тот же код, но один использует заполнители для сокращения, а другой - нет. – eliasah

+0

rst1 union all sub rdd then reduceByKey, which reduceByKey ** вне ** уменьшить. rst2 reduceByKey один за другим, которые уменьшаютByKey ** внутри ** уменьшают. – bourneli

+0

Извините, я думал, что reduceByKey в rst2 тоже снаружи. – eliasah

ответ

3

Короткий длинный рассказ обоих решений относительно неэффективен, но второй из них хуже, чем первый.

Начнем с ответа на последний вопрос. Для низкого уровня RDD API существует только два типа глобальных автоматических оптимизаций (вместо):

  • с использованием явно или неявно кэшированные задачами результатов вместо полного пересчета родословной
  • Объединения нескольких преобразований, которые не требуют перетасовать в одиночный ShuffleMapStage

Все остальное в значительной степени является последовательным преобразованием, которое определяет DAG. Это противоречит более ограничительному, высокоуровневому API (DataFrame), который делает конкретные предположения об изменениях и выполняет глобальную оптимизацию плана выполнения.

Что касается вашего кода. Самая большая проблема с первым решением - растущая линия, когда вы применяете итеративный union. Это делает некоторые вещи, такие как восстановление отказа дорогостоящим, и поскольку RDD определены рекурсивно, может завершиться с ошибкой StackOverflow. Менее серьезным побочным эффектом является растущее число разделов, которое, по-видимому, не компенсируется в последующей редукции *. Вы найдете более подробное объяснение в моем ответе на Stackoverflow due to long RDD Lineage но то, что вам действительно нужно здесь есть один union так:

sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_) 

Это на самом деле является оптимальным решением при условии, вы применяете действительно уменьшая функцию.

Второе решение, очевидно, испытывает одну и ту же проблему, тем не менее ухудшается. В то время как первый подход требует только двух этапов с одним перетасовкой, для этого требуется перетасовка для каждого RDD. Поскольку число разделов растет, и вы используете по умолчанию HashPartitioner, каждая часть данных должна быть записана на диск несколько раз и, скорее всего, перетасована по сети несколько раз. Игнорируя вычисления низкого уровня, каждая запись перетасовывается O (N) раз, где N - это количество RDD, с которыми вы сливаетесь.

Что касается использования памяти, то это не очевидно, не зная больше о распределении данных, но в худшем случае второй метод может выражать значительно худшее поведение.

Если + работает с постоянным пространством, единственным требованием для сокращения является хэш-карта для хранения результатов объединения стороны карты.Поскольку разделы обрабатываются как поток данных без чтения полного содержимого в память, это означает, что общий объем памяти для каждой задачи будет пропорционален количеству уникальных ключей, а не количеству данных. Поскольку для второго метода требуется больше задач, общее использование памяти будет выше, чем в первом случае. В среднем это может быть немного лучше из-за частичной организации данных, но вряд ли компенсирует дополнительные расходы.


* Если вы хотите узнать, как это может повлиять на общую производительность вы можете увидеть Spark iteration time increasing exponentially when using join Это немного другая проблема, но должна дать вам некоторое представление о том, почему контрольный ряд вопросов разделов.

+2

sc.union явно не улучшает работу. Тем не менее, воодушевленные общими стратегиями оптимизации: уменьшите родословную и shullfe, я сохранил средний результат, тщательно использовал ** persist ** и ** checkpoint ** и удалил последнее сокращениеByKey. Производительность значительно улучшилась. Спасибо @ zero323. А также, спасибо за eliasah, за ваши примеры. – bourneli

Смежные вопросы