операции РДД, что изменение переменных за пределами их сферы может быть частым источником путаницы ...
Основная задача состоит в том, что поведение выше код не определен. В локальном режиме с одним JVM приведенный выше код суммирует значения в RDD и сохраняет их в счетчике. Это связано с тем, что и RDD, и счетчик переменных находятся в одном и том же пространстве памяти на узле драйвера.
Однако в режиме кластера, что происходит, сложнее, и вышеуказанное может работать не так, как предполагалось. Для выполнения заданий Spark разбивает обработку операций RDD на задачи - каждый из которых управляется исполнителем. Перед выполнением Spark вычисляет замыкание. Закрытие - это те переменные и методы, которые должны быть видны исполнителю для выполнения его вычислений на RDD (в данном случае foreach()). Это закрытие сериализуется и отправляется каждому исполнителю. В локальном режиме есть только один исполнитель, поэтому все имеет одинаковое закрытие. Однако в других режимах это не так, и у исполнителей, работающих на отдельных рабочих узлах, есть своя копия закрытия.
Здесь происходит то, что переменные в закрытии, отправленные каждому исполнителю, теперь являются копиями, и поэтому, когда счетчик ссылается в пределах функции foreach, он больше не является счетчиком на узле драйвера. В памяти узла драйвера все еще есть счетчик, но это больше не видно исполнителям! Исполнители видят только копию из сериализованного закрытия. Таким образом, окончательное значение счетчика будет по-прежнему равно нулю, поскольку все операции на счетчике ссылались на значение в сериализованном закрытии.
Для обеспечения четкого поведения в подобных сценариях необходимо использовать Аккумулятор. Аккумуляторы Spark используются специально для обеспечения механизма безопасного обновления переменной, когда выполнение разбивается на рабочие узлы в кластере. Раздел «Аккумуляторы» этого руководства более подробно обсуждает их.
В общем, замыкания - конструкции, подобные циклам или локально определенные методы, не должны использоваться для изменения какого-либо глобального состояния. Spark не определяет или не гарантирует поведение мутаций для объектов, на которые ссылаются снаружи закрытий. Некоторый код, который делает это, может работать в локальном режиме, но это случайно, и такой код не будет вести себя так, как ожидалось, в распределенном режиме. Вместо этого используйте Аккумулятор, если требуется какая-то глобальная агрегация.
Просьба представить минимальный воспроизводимый пример, чтобы мы могли помочь! Что это связано с искрами? – eliasah