2016-04-25 4 views
3

Все примеры, которые я видел для переменных Spark broadcast, определяют их в рамках используемых ими функций (, join() и т. Д.). Я хотел бы использовать как функцию map(), так и функцию mapPartitions(), которая ссылается на широковещательную переменную, но я хотел бы их модулировать, поэтому я могу использовать те же функции для модульного тестирования.Как ссылаться на переменные трансляции Spark Вне области видимости

  • Как это сделать?

мысли, у меня было снискать функцию, чтобы я передать ссылку на переменные вещания при использовании либо map или mapPartitions вызова.

  • Есть ли какие-либо последствия для производительности путем передачи вокруг ссылки на широковещательную переменную, которые обычно не обнаруживаются при определении функций внутри исходной области?

У меня было что-то вроде этого в виду (псевдо-код):

// firstFile.scala 
// --------------- 

def mapper(bcast: Broadcast)(row: SomeRow): Int = { 
    bcast.value(row._1) 
} 

def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator { 
    val broadcastVariable = bcast.value 

    for { 
    i <- iter 
    } yield broadcastVariable(i) 
}) 


// secondFile.scala 
// ---------------- 

import firstFile.{mapMyPartition, mapper} 

val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3)) 

rdd 
.map(mapper(bcastVariable)) 
.mapPartitions(mapMyPartition(bcastVariable)) 

ответ

2

Ваше решение должно работать нормально. В обоих случаях функция, переданная в map{Partitions}, будет содержать ссылку на переменную широковещательной передачи при сериализации, но не на ее значение, и только вызывать bcast.value при вычислении на узле.

Что нужно избегать нечто вроде

def mapper(bcast: Broadcast): SomeRow => Int = { 
    val value = bcast.value 
    row => value(row._1) 
} 
1

Вы делаете это правильно. Вам просто нужно помнить, чтобы передать широковещательную ссылку, а не само ее значение. Использование примера разница может быть показаны следующим образом:

а) эффективный способ:

// the whole Map[Int, Int] is serialized and sent to every worker 
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3)) 

rdd 
.map(mapper(bcastVariable)) // only the reference to the Map[Int, Int] is serialized and sent to every worker 
.mapPartitions(mapMyPartition(bcastVariable)) // only the reference to the Map[Int, Int] is serialized and sent to every worker 

б) неэффективный способ:

// the whole Map[Int, Int] is serialized and sent to every worker 
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3)) 

rdd 
.map(mapper(bcastVariable.value)) // the whole Map[Int, Int] is serialized and sent to every worker 
.mapPartitions(mapMyPartition(bcastVariable.value)) // the whole Map[Int, Int] is serialized and sent to every worker 

Конечно во втором примере mapper и mapMyPartition бы слегка отличающаяся подпись.

Смежные вопросы