2017-02-07 2 views
3

Я проверяю искру потокового приложения с помощью «com.holdenkarau.spark-испытательной базы» и scalatest.

import com.holdenkarau.spark.testing.StreamingSuiteBase 
import org.apache.spark.rdd.RDD 
import org.scalatest.{ BeforeAndAfter, FunSuite } 

class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase { 

    var delim: String = "," 

    before { 
    System.clearProperty("spark.driver.port") 
    } 

    test(“This Fails“) { 

    val source = scala.io.Source.fromURL(getClass.getResource(“/some_logs.csv")) 
    val input = source.getLines.toList 

    val rowRDDOut = Calculator.do(sc.parallelize(input)) //Returns DataFrame 

    val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + delim + row.getAs[String](1)) 

    source.close 
    } 
} 

я получаю исключение сериализации для поля 'DELIM':

org.apache.spark.SparkException: Task not serializable 
[info] at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
[info] at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
[info] at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
[info] at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
[info] at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324) 
[info] at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323) 
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
[info] at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
[info] at org.apache.spark.rdd.RDD.map(RDD.scala:323) 
[info] ... 
[info] Cause: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper 
[info] Serialization stack: 
[info] - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: [email protected]) 
[info] - field (class: org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper) 

Если я заменю 'DELIM' на значение Строка в месте, он отлично работает.

val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + “,” + row.getAs[String](1)) 

В чем разница между первой и второй версией?

Заранее благодарен!

ответ

4

Проблема не в типе delim (String), это delim.

Не пытайтесь определить переменные за пределами методов test(). Если вы определяете delm внутри своего test, он должен работать.

test(“This Fails“) { 
    val delim = "," 
    ... 
} 

Теперь вы можете спросить, почему? Ну, когда вы ссылаетесь на delim из внешнего объема, Scala попытается объединить окружающий объект class Test. Этот объект содержит ссылку на org.scalatest.Assertions$AssertionsHelper, что он не является Serializable (см. Ваш стек).

+1

Whoa Sir! Не мог даже подумать! Благодаря! Работал как шарм. –

Смежные вопросы