2015-03-27 3 views
0

я пытаюсь сделать некоторые тесты на Apache Спарке (v1.3.0), я простой Java 8 класс:Apache Spark - Простое Количество слов получает: SparkException: Задача не сериализуемый

public class WordCount { 
    private JavaSparkContext ctx; 
    private String inputFile, outputFile; 
    public WordCount(String inputFile, String outputFile) { 
     this.inputFile = inputFile; 
     this.outputFile = outputFile; 
     // Initialize Spark Conf 
     ctx = new JavaSparkContext("local", "WordCount", 
       System.getenv("SPARK_HOME"), System.getenv("JARS")); 

    } 

    public static void main(String... args) { 
     String inputFile = "/home/workspace/spark/src/main/resources/inferno.txt";//args[0]; 
     String outputFile = "/home/workspace/spark/src/main/resources/dv";//args[1]; 
     WordCount wc = new WordCount(inputFile, outputFile); 
     wc.doWordCount(); 
     wc.close(); 
    } 

    public void doWordCount() { 
     long start = System.currentTimeMillis(); 
     JavaRDD<String> inputRdd = ctx.textFile(inputFile); 
     JavaPairRDD<String, Integer> count = inputRdd.flatMapToPair((String s) -> { 
      List<Tuple2<String, Integer>> list = new ArrayList<>(); 
      Arrays.asList(s.split(" ")).forEach(s1 -> list.add(new Tuple2<String, Integer>(s1, 1))); 
      return list; 
     }).reduceByKey((x, y) -> x + y); 
     List<Tuple2<String, Integer>> list = count.takeOrdered(10, 
       (o1, o2) -> o2._2() - o1._2()); 
     list.forEach(t2 -> System.out.println(t2._1())); 
//  count.saveAsTextFile(outputFile); 
     long end = System.currentTimeMillis(); 
     System.out.println(String.format("Time in ms is: %d", end - start)); 
    } 

    public void close() { 
     ctx.stop(); 
    } 

} 

Когда я запустить его получаю следующее исключение:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1622) 
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635) 
    at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1231) 
    at org.apache.spark.api.java.JavaRDDLike$class.takeOrdered(JavaRDDLike.scala:578) 
    at org.apache.spark.api.java.JavaPairRDD.takeOrdered(JavaPairRDD.scala:45) 
    at it.conker.spark.base.WordCount.doWordCount2(WordCount.java:65) 
    at it.conker.spark.base.WordCount.main(WordCount.java:37) 
Caused by: java.io.NotSerializableException: it.conker.spark.base.WordCount$$Lambda$10/1541232265 
Serialization stack: 
    - object not serializable (class: it.conker.spark.base.WordCount$$Lambda$10/1541232265, value: it.conker.spark.base.WordCount$$Lambda$10/[email protected]) 
    - field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator) 
    - object (class scala.math.LowPriorityOrderingImplicits$$anon$7, [email protected]) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$34, name: ord$1, type: interface scala.math.Ordering) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$34, <function1>) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$14, name: f$3, type: interface scala.Function1) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$14, <function3>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
    ... 8 more 

это ми POM файл:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <groupId>it.conker.spark</groupId> 
    <artifactId>learning-spark-by-example</artifactId> 
    <modelVersion>4.0.0</modelVersion> 
    <name>Learning Spark by example</name> 
    <packaging>jar</packaging> 
    <version>0.0.1</version> 
    <dependencies> 
     <dependency> <!-- Spark dependency --> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.10</artifactId> 
      <version>1.3.0</version> 
      <scope>provided</scope> 
     </dependency> 

     <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>4.11</version> 
      <scope>test</scope> 
     </dependency> 
    </dependencies> 
    <properties> 
     <java.version>1.8</java.version> 
    </properties> 
    <build> 
     <pluginManagement> 
      <plugins> 
       <plugin> 
        <groupId>org.apache.maven.plugins</groupId> 
        <artifactId>maven-compiler-plugin</artifactId> 
        <version>3.1</version> 
        <configuration> 
         <source>${java.version}</source> 
         <target>${java.version}</target> 
        </configuration> 
       </plugin> 
      </plugins> 
     </pluginManagement> 
    </build> 
</project> 

Я бегу класс в затмении. Может кто-нибудь сказать мне, где я был неправ?

Edit: Как mark91 спросить, заменив строку:

List<Tuple2<String, Integer>> list = count.takeOrdered(10, 
      (o1, o2) -> o2._2() - o1._2()); 

с:

List<Tuple2<String, Integer>> list = count.takeOrdered(10); 

Я получил это исключение:

java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable 
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28) 
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153) 
    at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35) 
    at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672) 
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37) 
    at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1234) 
    at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1231) 
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
15/03/27 17:58:55 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable 
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28) 
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153) 
    at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35) 
    at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672) 
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37) 
    at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1234) 
    at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1231) 
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

15/03/27 17:58:55 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job 
15/03/27 17:58:55 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/03/27 17:58:55 INFO TaskSchedulerImpl: Cancelling stage 1 
15/03/27 17:58:56 INFO DAGScheduler: Job 0 failed: takeOrdered at WordCount.java:66, took 14.117721 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable 
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28) 
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153) 
    at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35) 
    at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672) 
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37) 
    at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1234) 
    at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1231) 
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
+0

Что произойдет, если вы удалите свою лямбда-функцию, переданную как второй параметр в ваш takeOrdered? – mgaido

+0

Я обновил сообщение! – conker84

ответ

0

Хорошо, причина в том, что все классы, которые вы используете в своей прецессии (Т.е. объекты, хранящиеся в вашем RDD, и классы, которые являются функциями, которые должны быть переданы искру) должны быть Serializable. Это означает, что им необходимо реализовать интерфейс Serializable, или вам нужно предоставить другой способ сериализации их как Kryo. На самом деле я не знаю, почему функция лямбда, которую вы определяете, не считается сериализуемой, но я считаю, что это связано с тем, что Java Comparator принимает параметр.

Однако способ сделать его работы является определение Serializable comaprator, такие как:

public class WordCountComparator implements Comparator<Tuple2<String, Integer>>, Serializable { 

@Override 
public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) { 
    // TODO Auto-generated method stub 
    return o2._2()-o1._2(); 
} 



} 

и затем передать его экземпляр в вашей takeOrdered функции в качестве второго параметра.

+0

Ничего себе это работает. Вы думаете, что это искра? Может быть, я могу открыть проблему в их github – conker84

+0

Нет, я так не думаю, но я не знаю, почему это так ведет себя ... Я думаю, причина в том, что для параметра вы должны перейти на интерфейс 'Comparator', но Я не уверен. – mgaido

+0

Попробуйте это. Как отмечалось в @ mark91, функция, которую вы передаете, должна быть сериализована. вы можете сделать свою лямбду сериализованной, указав ее явно. Список > list = count.takeOrdered (10, (Comparator & Serializable) (o1, o2) -> o2._2() - o1._2()); – user1182253

Смежные вопросы