2017-01-06 3 views
0

Так я работаю с Java, чтобы развернуть в искру, это был мой исходный код:Serialized задача превышает максимально допустимый, Спарк - Кластер

List<Float> data = some_data; 
JavaRDD<Float> dataAsRDD = javaSparkContext.parallelize(data); 
JavaRDD<Float> dataWithoutNaN = dataAsRDD.filter(number -> !number.isNaN()); 
JavaDoubleRDD dataAsDouble = dataWithoutNaN.mapToDouble(number -> (double) number); 
logger.info("\t\t\tMean: " + dataAsDouble.mean()); 

Таким образом, это может работать в апача Искре автономный режим с предупреждение, но в кластере режим сохраняет подножку в ошибку: (строка 86 является dataAsDouble.mean())

17/01/06 17:54:24 INFO DAGScheduler: Job 0 failed: mean at Cluster.java:86, took 12.678086 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 45337325 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values. 

Я следую инструкциям, предложенные в исключении, Exceeding spark.akka.frameSize when saving Word2VecModel и Spark broadcast error: exceeds spark.akka.frameSize Consider using broadcast, я использовал переменное вещание:

List<Float> dataAsList = some_data; 
Broadcast<List<Float>> broadcast = javaSparkContext.broadcast(dataAsList); 
JavaRDD<Float> dataAsRDD = javaSparkContext.parallelize(broadcast.value()); 
JavaRDD<Float> dataWithoutNaN = dataAsRDD.filter(number -> !number.isNaN()); 
JavaDoubleRDD dataAsDouble = dataWithoutNaN.mapToDouble(number -> (double) number); 
logger.info("\t\t\tMean: " + dataAsDouble.mean()); 

Но я продолжаю вести ту же ошибку, что я делаю неправильно?

Заранее благодарен!

+0

В чем смысл использования Spark, когда вы просто распараллеливаете некоторую структуру данных и вызывают метод 'mean' в этой структуре? Не проще ли вычислять среднее значение с помощью регулярных операций Java? –

+0

Привет @ PawełJurczenko, я строю POC, используя [SparkTS] (http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series- data-with-apache-spark /), а мой код немного сложнее, но я использовал 'mean' для иллюстрации моей проблемы. – acastavi

ответ

0

Вы используете трансляции не по назначению. Они не должны быть немедленно преобразованы в RDD (как в javaSparkContext.parallelize(broadcast.value())), но они должны быть переданы в операции с RDD. Поскольку вы распараллеливаете некоторую структуру данных только для того, чтобы вычислить ее среднее значение (я полагаю, вы просто экспериментируете с Spark), вы не можете добиться этого с помощью трансляций. Как предупреждает сообщение, у вас есть альтернативное решение: увеличение spark.akka.frameSize. Это может быть достигнуто путем передачи этого к вашей искровой оболочке/искровому представить:

--conf spark.akka.frameSize=64 

что позволит увеличить размер кадра до 64 МБ.

+0

Привет @PawelJurczenko, вы могли бы быть немного более конкретным о _they должны быть переданы в операции над RDD_, код, который я написал, был основан на http://spark.apache.org/docs/1.3.1/programming-guide. html # broadcast-variables – acastavi

+0

Трансляции должны передаваться таким операциям, как фильтр, карта, flatMap и т. д. Например, вы можете транслировать массив целых чисел (тип будет «Broadcast '), и вы можете использовать его внутри операции фильтра (что может, например, проверить, присутствует ли текущий обработанный элемент в широковещательном массиве) –

Смежные вопросы