2015-03-14 2 views
1

У меня есть новая установка искры 1.2.1 над кластером mapr, и при ее тестировании я нахожу, что он работает хорошо в локальном режиме, но в режимах пряжи он, похоже, не может получить доступ к переменным, ни при трансляции. Чтобы быть точным, следующий контрольный кодИскры карты RDD в режиме пряжи не позволяют получить доступ к переменным?

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object JustSpark extends App { 
val conf = new org.apache.spark.SparkConf().setAppName("SimpleApplication") 
val sc = new SparkContext(conf) 
val a = List(1,3,4,5,6) 
val b = List("a","b","c") 
val bBC= sc.broadcast(b) 
val data = sc.parallelize(a) 
val transform = data map (t => { "hi" }) 
transform.take(3) foreach (println _) 
val transformx2 = data map (t => { bBC.value.size }) 
transformx2.take(3) foreach (println _) 
//val transform2 = data map (t => { b.size }) 
//transform2.take(3) foreach (println _) 
} 

работает в локальном режиме, но не работает. Точнее, оба метода: transform2 и transformx2, fail, и все они работают, если --master local[8].

Я компиляции с SBT и отправки с инструментом представить

/opt/mapr/spark/spark-1.2.1/bin/spark-submit --class JustSpark --master yarn target/scala-2.10/simulator_2.10-1.0.jar 

Любая идея, что происходит? Сообщение о сбое просто утверждает, что имеет исключение нулевого указателя java в том месте, где он должен получить доступ к переменной. Есть ли другой способ передать переменные внутри карт RDD?

+0

Определить «сбой»? это важно, и вы не сказали, какую строку или какую ошибку. –

ответ

2

Я собираюсь сделать довольно хорошее предположение: это потому, что вы используете App. См. https://issues.apache.org/jira/browse/SPARK-4170. Вместо этого напишите main().

+0

Это! Я понял свой ответ, узнав об этом. Благодарю. – arivero

+0

А, обратите внимание, что я все еще могу использовать приложение, просто понижая до уровня, чтобы искровой код работал, по крайней мере, в этом случае. На самом деле первая попытка заключалась в том, чтобы написать основную часть, затем я немного экспериментировал. – arivero

0

Я полагаю, преступник были

val transform2 = data map (t => { b.size }) 

В частности, доступ к локальной переменной б. Фактически вы можете видеть в своих файлах журнала java.io.NotSerializableException.

Что должно произойти: Spark будет пытаться сериализовать любой объект ссылки. Это означает, что в данном случае весь класс JustSpark - поскольку упоминается один из его членов.

Зачем это произошло? Ваш класс не является Serializable. Поэтому Spark не может отправить его по кабелю. В частности, у вас есть ссылка на SparkContext - который не распространяется Serializable

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient { 

So - ваш первый код - который делает Broadcast только значение переменной - это правильный путь.

+0

Извините, что неясно: также сбой передачи – arivero

+0

'b' определенно сериализуется здесь как список строк. Думаю, это не ошибка. Он говорит, что это NPE где-то. –

+0

okey dokey. Я никогда не использовал приложение, поэтому не знал его недостатков. – javadba

0

Это оригинальный пример трансляции, из искровых источников, измененных использовать списки вместо массивов:

import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkConf, SparkContext} 
object MultiBroadcastTest { 
def main(args: Array[String]) { 
val sparkConf = new SparkConf().setAppName("Multi-Broadcast Test") 
val sc = new SparkContext(sparkConf) 
val slices = if (args.length > 0) args(0).toInt else 2 
val num = if (args.length > 1) args(1).toInt else 1000000 
val arr1 = (1 to num).toList 
val arr2 = (1 to num).toList 
val barr1 = sc.broadcast(arr1) 
val barr2 = sc.broadcast(arr2) 
val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ => 
    (barr1.value.size, barr2.value.size) 
} 
observedSizes.collect().foreach(i => println(i)) 
sc.stop() 
}} 

Я собирал его в своей среде, и это работает.

Так в чем же разница?

Проблемный пример использует extends App, в то время как исходный пример - простой одноэлементный.

Так что я понижен код к "Doit()" функции

object JustDoSpark extends App{ 
def doIt() { 
... 
} 
doIt() 

и угадать, что. Это сработало.

Несомненно, проблема связана с сериализацией действительно, но по-другому. Наличие кода в теле объекта, по-видимому, вызывает проблемы.

+1

Это действительно любопытное из-за delayedInit из App, а не сама сериализация. –

+0

И в соответствии с https://github.com/jongwook даже def может быть затенен, просто 'extends App {{...}}' будет работать. См. Комментарии в https://github.com/apache/spark/pull/3497 – arivero

Смежные вопросы