2015-01-16 4 views
1

В моей заявке, когда вы принимаете номера перфоманса, groupby много времени уходит.Spark: groupBy занимает много времени

Мой РДД имеет ниже Страницы данного раздела:

JavaPairRDD<CustomTuple, Map<String, Double>> 

CustomTuple: Этот объект содержит информацию о текущей строке в РДУ, как уже которую неделю, месяц, город и т.д.

public class CustomTuple implements Serializable{ 

private Map hierarchyMap = null; 
private Map granularMap = null; 
private String timePeriod = null; 
private String sourceKey = null; 
} 

Карта

Эта карта содержит статистические данные данные о том, что ряд, как, сколько инвестиций, сколько ВРП и т.д.

<"Inv", 20> 

<"GRP", 30> 

Я выполнял ниже DAG на этом РДУ

  1. применить фильтр по этому РДУ и объема из соответствующих строк: Фильтр
  2. применить фильтр по этому РДУ и рамок из соответствующих строк: фильтр
  3. Вступи РД: Регистрация
  4. применять фазы карты для расчета инвестиций: Map
  5. применяют фазу GroupBy для группировки данных в соответствии с желаемым представлением: GroupBy
  6. применяет фазу карты для агрегирования данных в соответствии с группировкой, достигнутой на вышеуказанном этапе (например, данные просмотра в промежуток времени), а также создавать новые объекты на основе набора результатов желательно собирать: Карта
  7. собрать результат: Collect

Таким образом, если пользователь хочет просмотреть инвестиции через периоды времени, то ниже список возвращается (это было достигнуто в шаге 4):

<timeperiod1, value> 

Когда я проверил время, проведенное в операциях, GroupBy брал 90% времени, затраченного на выполнение всей DAG.

IMO, мы можем заменить GroupBy и последующие операции с Map на единицу сокращения. Но сокращение будет работать на объект типа JavaPairRDD>. Итак, мой снимок будет подобен T reduce (T, T, T), где T будет CustomTuple, Map.

Или, может быть, после шага 3 в вышеприведённой DAG я запустил еще одну функцию карты, которая возвращает мне RDD типа для метрики, которая должна быть агрегирована, а затем запустить сокращение.

Кроме того, я не уверен, как работает агрегатная функция, и сможет ли она мне помочь в этом случае.

Во-вторых, мое приложение получит запрос на различные ключи. В моем текущем дизайне RDD каждый запрос потребует от меня переделки или перегруппировки моего RDD по этому ключу. Это означает, что для каждого запроса группировка/повторное разбиение займет 95% моего времени, чтобы вычислить задание.

<"market1", 20> 
<"market2", 30> 

Это очень обескураживает, поскольку текущая производительность приложения без искры в 10 раз лучше, чем производительность с помощью Spark.

Любое понимание оценено.

[EDIT] Мы также заметили, что JOIN занимал много времени. Возможно, именно поэтому группа взяла время. [EDIT]

TIA!

ответ

5

Документация Spark рекомендует избегать операций groupBy операций, вместо этого они предлагают combByKey или некоторые из его деривативных операций (reduceByKey или aggregateByKey). Вы должны использовать эту операцию, чтобы сделать агрегацию до и после перетасовки (на этапе «Карта» и в фазе «Уменьшить», если мы используем терминологию Hadoop), поэтому время выполнения будет улучшено (я не знаю, будет ли он 10 раз лучше, но это должно быть лучше)

Если я понимаю вашей обработку я думаю, что вы можете использовать одну операцию combineByKey объяснения следующего кода выполнен для кода лестницы, но вы можете перевести на Java код, не слишком много усилий ,

combineByKey имеют три аргумента: combineByKey [C] (createCombiner: (V) ⇒ C, mergeValue: (С, В) ⇒ С, mergeCombiners: (C, C) ⇒ C): РДД [(K, C)]

  • createCombiner: в этой операции создается новый класс для того, чтобы объединить ваши данные, чтобы вы могли агрегировать данные CustomTuple в новый класс CustomTupleCombiner (я не знаете, хотите ли вы только сделать сумму или, может быть, хотите применить сом е процесс к этим данным, но любому варианту может быть сделан в этой операции)

  • mergeValue: В этой операции вы должны описать, как CustomTuple является суммой к другому CustumTupleCombiner (опять же я предполагающий простую операцию реферирования). Например, если вы хотите суммировать данные по ключу, вы будете иметь в своем классе CustumTupleCombiner карту, поэтому операция должна быть примерно такой: CustumTupleCombiner.sum (CustomTuple), которые делают CustumTupleCombiner.Map (CustomTuple.key) -> CustomTuple.Map (CustomTuple.key) + CustumTupleCombiner.value

  • mergeCombiners: в этой операции вы должны определить, как объединить два класса объединитель, CustumTupleCombiner в моем примере. Так что это будет что-то вроде CustumTupleCombiner1.merge (CustumTupleCombiner2), что будет что-то вроде CustumTupleCombiner1.Map.keys.foreach (к -> CustumTupleCombiner1.Map (к) + CustumTupleCombiner2.Map (к)) или что-то подобное

Записанный код не подтвержден (это даже не компилируется, потому что я сделал это с помощью vim), но я думаю, что это может сработать для вашего сценария.

Надеюсь, это будет полезно

+0

Спасибо jlopezmat. Можете ли вы разработать пример из моего варианта использования. Это прояснилось бы лучше. Скажем, например, я хочу, чтобы мои значения были агрегированы с использованием некоторого поля в карте CustomTuple, скажем, на рынке. Это было бы действительно полезно. Я хочу понять, как он заменит groupby, в котором я верну рынок в качестве ключа в том же сценарии, а затем сделаю агрегат в фазе карты. ТИА! – user1441849

0

Перетасовка инициируется каких-либо изменений в тональности A [K, V], пара, или с помощью repartition() вызова. Разбиение рассчитывается на основе значения K (key). По умолчанию разбиение на разделы вычисляется с использованием значения Хэш вашего ключа, реализуемого методом hashCode(). В вашем случае ваш ключ содержит две переменные экземпляра Map.По умолчанию реализация метода hashCode() также должна будет рассчитать hashCode() этих карт, в результате чего итерация будет происходить по всем ее элементам, чтобы по очереди снова вычислить hashCode() этих элементов.

Растворы:

  1. Не включайте Map экземпляры в вашем Key. Это кажется очень необычным.
  2. Внесите и отмените свой собственный hashCode(), который избегает прохождения через переменные экземпляра Map.
  3. Возможно, вы можете полностью отказаться от использования объектов Map. Если это то, что разделяется между несколькими элементами, вам может потребоваться использовать широковещательные переменные в искровом режиме. Накладные расходы на сериализацию ваших карт во время перетасовки также могут стать большим фактором.
  4. Избегайте перетасовки, настраивая хеширование между двумя последовательными групповыми.
  5. Храните перетасованный узел локальным, выбрав Partitioner, который будет иметь сродство к тому, чтобы разделы были локальными во время последовательного использования.

Хорошее чтение на hashCode(), в том числе ссылки на цитаты Джош Блох можно найти в wiki.

Смежные вопросы