В то время как ответ, предоставленный KrisP, подчеркивает все важные отличия, я думаю, стоит отметить, что mapPartitions
- это всего лишь строительный блок низкого уровня за трансформациями более высокого уровня, а не метод достижения общего состояния.
Хотя mapPartitions
может использоваться, чтобы сделать общедоступное состояние явным, он технически не используется (его срок службы ограничен mapPartitions
закрытием`), и есть другие способы его достижения. В частности, переменные, на которые ссылаются внутренние блоки, разделяются внутри раздела. Для того, чтобы проиллюстрировать, что позволяет играть немного с одноплодной:
object DummySharedState {
var i = 0L
def get(x: Any) = {
i += 1L
i
}
}
sc.parallelize(1 to 100, 1).map(DummySharedState.get).max
// res3: Long = 100
sc.parallelize(1 to 100, 2).map(DummySharedState.get).max
// res4: Long = 50
sc.parallelize(1 to 100, 50).map(DummySharedState.get).max
// res5: Long = 2
и подобная вещь в PySpark:
модуль одноточечно dummy_shared_state.py
:
i = 0
def get(x):
global i
i += 1
return i
Основной сценарий:
from pyspark import SparkConf, SparkContext
import dummy_shared_state
master = "spark://..."
conf = (SparkConf()
.setMaster(master)
.set("spark.python.worker.reuse", "false"))
sc.addPyFile("dummy_shared_state.py")
sc.parallelize(range(100), 1).map(dummy_shared_state.get).max()
## 100
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 50
Обратите внимание, что для параметра spark.python.worker.reuse
установлено значение false.Если оставить значение по умолчанию, вы на самом деле увидеть что-то вроде этого:
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 50
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 100
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 150
В конце дня вы должны различать три разных вещи:
- широковещательных переменные, которые предназначены для уменьшения сети трассировать объем памяти, сохраняя копию переменной на рабочем месте, а не отправляя ее с каждой задачей.
- переменные, определенные внешним закрытием, и ссылочные внутренние блокировки, которые должны быть отправлены с каждой задачей и совместно используются для этой задачи
- переменные, определенные внутри замыкания, которые не являются общими
Кроме того, существуют некоторые специфические ошибки Python, связанные с использованием постоянных интерпретаторов.
По-прежнему нет практической разницы между map
(filter
или другими преобразованиями) и mapPartitions
, когда дело доходит до переменной продолжительности жизни.
Я вижу, что основное отличие - это правильный уровень? широковещательная передача находится на уровне узла, но mapPartitions находится на уровне разделов. – xuanyue
В некотором смысле, да. Однако использование (в частности, синтаксис) настолько отличается, что я бы смутился, чтобы провести параллель между этими двумя случаями. Обычно вы передаете существующий массив данных, но в mapPartitions вы создаете новый объект на этом уровне. Btw, широковещательная передача находится на уровне кластера, а не уровне узла. – KrisP