2017-01-16 2 views
0

У меня есть проблема с производительностью с кодом, который я редактирую, каждый раз будет давать OOM при выполнении счета. Я думаю, что я нашел проблему, в основном после keyBy tranformation, исполняемый aggregateByKey. Проблема заключается в том, что почти 98% элементов RDD имеют один и тот же ключ, поэтому aggregationByKey генерирует shuffle, помещает почти все записи в один и тот же перегородка, нижняя строка: всего несколько исполнителей работает, и для этого требуется много давления памяти.избежать перегородки дисбаланс Spark

Это код:

val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies 
    .keyBy(po => po.getProcessCreator.name) 
    .aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_) 
    .map {case(name,list) => 
     val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID)) 
     val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))} 
     lastOfGroupByKeys.flatMap(f => f._2) 
    } 
    .flatMap(f => f) 

log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count) 

Я бы способ выполнить операцию в более параллельно, что позволяет всем исполнителям работать почти одинаково. Как я могу это сделать?

Должен ли я использовать пользовательский разделитель?

+0

* «Проблема заключается в том, что почти 98% элементов RDD имеет один и тот же ключ» * Есть ли причина, так много элементы имеют один и тот же ключ? Это бизнес-требование? –

+1

На самом деле я не знаю, у меня нет функциональных знаний, я просто пытаюсь найти узкое место производительности. Я должен подумать, что они думали об этом, и разделение правильное. – Giorgio

+0

Возможно, если бы генерация ключей была лучше и идеально одинаковой, у вас не было бы проблем, когда один раздел был бы таким большим. –

ответ

1

Если наблюдение верно и

98% элементов RDD имеет тот же ключ

тогда изменение секционирования не поможет вам на всех. По определению разделителя 98% данных должны обрабатываться одним исполнителем.

К счастью, плохой код, вероятно, представляет большую проблему, чем перекос. Пропустив через:

.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_) 

который просто народная магия, похоже, весь трубопровод можно переписать в виде простого reuceByKey. Псевдокод:

  • Объединить name и локальные ключи в одном ключе:

    def key(po: AnomalyPO) = (
        // "major" key 
        po.getProcessCreator.name, 
        // "minor" key 
        po.getPodId, po.getAnomalyCode, 
        po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID 
    ) 
    

    ключ, содержащий имя, дата и дополнительные поля должны иметь гораздо более высокую мощность, чем только имя.

  • Карта пар и сократить ключ:

    rddAnomalies 
        .map(po => (key(po), po)) 
        .reduceByKey((x, y) => 
        if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y 
    ) 
    
Смежные вопросы