2015-12-28 2 views
3

В Spark мы используем широковещательную переменную, чтобы каждая машина имела только копию только для чтения. Обычно мы создаем переменную трансляции вне закрытия (например, таблицу поиска, необходимую для закрытия) для повышения производительности.В искры Apache, в чем разница между использованием mapPartitions и объединением использования широковещательной переменной и карты

У нас также есть оператор преобразования искры, называемый mapPartitions, который пытался достичь того же (использовать общую переменную для повышения производительности). Например, в mapPartitions мы можем совместно использовать соединение с базой данных для каждого раздела.

Так в чем же разница между этими двумя? Можем ли мы использовать это interchangebly только для общих переменных?

ответ

4

broadcast предназначен для отправки объекта каждому рабочему узлу. Этот объект будет разделяться между всеми разделами на этом узле (и объект value/i.e будет одинаковым для каждого узла в кластере). Цель вещания - экономить на сетевых расходах, когда вы используете одни и те же данные во многих разных задачах/разделах на рабочем узле.

mapPartitions, наоборот, является методом, доступным на RDD, и работает как map, только на разделах. Да, вы можете определить новые объекты, такие как соединение jdbc, которые затем будут уникальными для каждого раздела. Однако вы не можете делиться им между разными разделами и тем более между разными узлами.

+0

Я вижу, что основное отличие - это правильный уровень? широковещательная передача находится на уровне узла, но mapPartitions находится на уровне разделов. – xuanyue

+2

В некотором смысле, да. Однако использование (в частности, синтаксис) настолько отличается, что я бы смутился, чтобы провести параллель между этими двумя случаями. Обычно вы передаете существующий массив данных, но в mapPartitions вы создаете новый объект на этом уровне. Btw, широковещательная передача находится на уровне кластера, а не уровне узла. – KrisP

3

В то время как ответ, предоставленный KrisP, подчеркивает все важные отличия, я думаю, стоит отметить, что mapPartitions - это всего лишь строительный блок низкого уровня за трансформациями более высокого уровня, а не метод достижения общего состояния.

Хотя mapPartitions может использоваться, чтобы сделать общедоступное состояние явным, он технически не используется (его срок службы ограничен mapPartitions закрытием`), и есть другие способы его достижения. В частности, переменные, на которые ссылаются внутренние блоки, разделяются внутри раздела. Для того, чтобы проиллюстрировать, что позволяет играть немного с одноплодной:

object DummySharedState { 
    var i = 0L 
    def get(x: Any) = { 
    i += 1L 
    i 
    } 
} 

sc.parallelize(1 to 100, 1).map(DummySharedState.get).max 
// res3: Long = 100 
sc.parallelize(1 to 100, 2).map(DummySharedState.get).max 
// res4: Long = 50 
sc.parallelize(1 to 100, 50).map(DummySharedState.get).max 
// res5: Long = 2 

и подобная вещь в PySpark:

  • модуль одноточечно dummy_shared_state.py:

    i = 0 
    def get(x): 
        global i 
        i += 1 
        return i 
    
  • Основной сценарий:

    from pyspark import SparkConf, SparkContext 
    import dummy_shared_state 
    
    master = "spark://..." 
    conf = (SparkConf() 
        .setMaster(master) 
        .set("spark.python.worker.reuse", "false")) 
    
    sc.addPyFile("dummy_shared_state.py") 
    sc.parallelize(range(100), 1).map(dummy_shared_state.get).max() 
    ## 100 
    sc.parallelize(range(100), 2).map(dummy_shared_state.get).max() 
    ## 50 
    

Обратите внимание, что для параметра spark.python.worker.reuse установлено значение false.Если оставить значение по умолчанию, вы на самом деле увидеть что-то вроде этого:

sc.parallelize(range(100), 2).map(dummy_shared_state.get).max() 
## 50 
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max() 
## 100 
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max() 
## 150 

В конце дня вы должны различать три разных вещи:

  • широковещательных переменные, которые предназначены для уменьшения сети трассировать объем памяти, сохраняя копию переменной на рабочем месте, а не отправляя ее с каждой задачей.
  • переменные, определенные внешним закрытием, и ссылочные внутренние блокировки, которые должны быть отправлены с каждой задачей и совместно используются для этой задачи
  • переменные, определенные внутри замыкания, которые не являются общими

Кроме того, существуют некоторые специфические ошибки Python, связанные с использованием постоянных интерпретаторов.

По-прежнему нет практической разницы между map (filter или другими преобразованиями) и mapPartitions, когда дело доходит до переменной продолжительности жизни.

+0

Это действительно полезно. Мне интересно другое различие между картами и картами. карта работает на основе одной строки, а mapPartitions имеет доступ ко всем строкам, отправленным в этот раздел через итератор. Возможно ли иметь преимущество доступа к глобальным переменным в соответствии с вашим примером выше, а также все строки, отправленные в раздел? Я предполагаю, что mapPartition не может использовать глобальные переменные сферы, как вы показали здесь. – retrocookie

+0

@retrocookie 'map' реализуется с использованием' mapPartitions', поэтому здесь нет никакой разницы. Однако я был бы осторожен. Это больше, чтобы показать, каковы последствия сохранения постоянного интерпретатора, чем что-либо еще. – zero323

+0

. Доступны ли глобальные переменные через mapPartition, совместно используемые на уровне узла, в отличие от уровня раздела? – retrocookie

Смежные вопросы