2015-10-12 4 views
1

Я новичок в scala & Функциональное программирование. У меня следующий искровой фрагмент кода:Можем ли мы использовать внешний объект карты в функции spark.map

case class SPR(symbol:String, splitOrg:Double, splitAdj:Double, timeStamp: String, unx_tt: Int) 

var oldFct = 15.0 
val splitMap = collection.mutable.Map[String, Double]() 

val tmp = splitsData.map{ row=> 
    var newFct = 1.0; 
    var sym = row(0).toString; 
    oldFct = splitMap.getOrElse(sym, 1.0) 
    newFct = row(12).toString.toDouble * oldFct 
    splitMap += (sym->newFct) 
    SPR(row(0).toString, row(12).toString.toDouble, newFct, row(10).toString, row(13).toString.toInt) 
}.collect() 

println("MAP ===========" + splitMap.size) 

По моим наблюдениям, я могу использовать примитивный тип данных внутри блока, но в случае объекта на карте, я всегда получаю размер как 0. Таким образом, кажется, нет ключа, значение пара добавлено.

Заранее спасибо.

+0

Просьба представить минимальный воспроизводимый пример, чтобы мы могли помочь! Что это связано с искрами? – eliasah

ответ

2

Прочитать Understanding closures в документации Spark. Большинство соответствующих частей (просто заменить counter с splitMap):

операции РДД, что изменение переменных за пределами их сферы может быть частым источником путаницы ...

Основная задача состоит в том, что поведение выше код не определен. В локальном режиме с одним JVM приведенный выше код суммирует значения в RDD и сохраняет их в счетчике. Это связано с тем, что и RDD, и счетчик переменных находятся в одном и том же пространстве памяти на узле драйвера.

Однако в режиме кластера, что происходит, сложнее, и вышеуказанное может работать не так, как предполагалось. Для выполнения заданий Spark разбивает обработку операций RDD на задачи - каждый из которых управляется исполнителем. Перед выполнением Spark вычисляет замыкание. Закрытие - это те переменные и методы, которые должны быть видны исполнителю для выполнения его вычислений на RDD (в данном случае foreach()). Это закрытие сериализуется и отправляется каждому исполнителю. В локальном режиме есть только один исполнитель, поэтому все имеет одинаковое закрытие. Однако в других режимах это не так, и у исполнителей, работающих на отдельных рабочих узлах, есть своя копия закрытия.

Здесь происходит то, что переменные в закрытии, отправленные каждому исполнителю, теперь являются копиями, и поэтому, когда счетчик ссылается в пределах функции foreach, он больше не является счетчиком на узле драйвера. В памяти узла драйвера все еще есть счетчик, но это больше не видно исполнителям! Исполнители видят только копию из сериализованного закрытия. Таким образом, окончательное значение счетчика будет по-прежнему равно нулю, поскольку все операции на счетчике ссылались на значение в сериализованном закрытии.

Для обеспечения четкого поведения в подобных сценариях необходимо использовать Аккумулятор. Аккумуляторы Spark используются специально для обеспечения механизма безопасного обновления переменной, когда выполнение разбивается на рабочие узлы в кластере. Раздел «Аккумуляторы» этого руководства более подробно обсуждает их.

В общем, замыкания - конструкции, подобные циклам или локально определенные методы, не должны использоваться для изменения какого-либо глобального состояния. Spark не определяет или не гарантирует поведение мутаций для объектов, на которые ссылаются снаружи закрытий. Некоторый код, который делает это, может работать в локальном режиме, но это случайно, и такой код не будет вести себя так, как ожидалось, в распределенном режиме. Вместо этого используйте Аккумулятор, если требуется какая-то глобальная агрегация.

Смежные вопросы