я пытаюсь сделать некоторые тесты на Apache Спарке (v1.3.0), я простой Java 8 класс:Apache Spark - Простое Количество слов получает: SparkException: Задача не сериализуемый
public class WordCount {
private JavaSparkContext ctx;
private String inputFile, outputFile;
public WordCount(String inputFile, String outputFile) {
this.inputFile = inputFile;
this.outputFile = outputFile;
// Initialize Spark Conf
ctx = new JavaSparkContext("local", "WordCount",
System.getenv("SPARK_HOME"), System.getenv("JARS"));
}
public static void main(String... args) {
String inputFile = "/home/workspace/spark/src/main/resources/inferno.txt";//args[0];
String outputFile = "/home/workspace/spark/src/main/resources/dv";//args[1];
WordCount wc = new WordCount(inputFile, outputFile);
wc.doWordCount();
wc.close();
}
public void doWordCount() {
long start = System.currentTimeMillis();
JavaRDD<String> inputRdd = ctx.textFile(inputFile);
JavaPairRDD<String, Integer> count = inputRdd.flatMapToPair((String s) -> {
List<Tuple2<String, Integer>> list = new ArrayList<>();
Arrays.asList(s.split(" ")).forEach(s1 -> list.add(new Tuple2<String, Integer>(s1, 1)));
return list;
}).reduceByKey((x, y) -> x + y);
List<Tuple2<String, Integer>> list = count.takeOrdered(10,
(o1, o2) -> o2._2() - o1._2());
list.forEach(t2 -> System.out.println(t2._1()));
// count.saveAsTextFile(outputFile);
long end = System.currentTimeMillis();
System.out.println(String.format("Time in ms is: %d", end - start));
}
public void close() {
ctx.stop();
}
}
Когда я запустить его получаю следующее исключение:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1231)
at org.apache.spark.api.java.JavaRDDLike$class.takeOrdered(JavaRDDLike.scala:578)
at org.apache.spark.api.java.JavaPairRDD.takeOrdered(JavaPairRDD.scala:45)
at it.conker.spark.base.WordCount.doWordCount2(WordCount.java:65)
at it.conker.spark.base.WordCount.main(WordCount.java:37)
Caused by: java.io.NotSerializableException: it.conker.spark.base.WordCount$$Lambda$10/1541232265
Serialization stack:
- object not serializable (class: it.conker.spark.base.WordCount$$Lambda$10/1541232265, value: it.conker.spark.base.WordCount$$Lambda$10/[email protected])
- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, [email protected])
- field (class: org.apache.spark.rdd.RDD$$anonfun$34, name: ord$1, type: interface scala.math.Ordering)
- object (class org.apache.spark.rdd.RDD$$anonfun$34, <function1>)
- field (class: org.apache.spark.rdd.RDD$$anonfun$14, name: f$3, type: interface scala.Function1)
- object (class org.apache.spark.rdd.RDD$$anonfun$14, <function3>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 8 more
это ми POM файл:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>it.conker.spark</groupId>
<artifactId>learning-spark-by-example</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Learning Spark by example</name>
<packaging>jar</packaging>
<version>0.0.1</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<java.version>1.8</java.version>
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Я бегу класс в затмении. Может кто-нибудь сказать мне, где я был неправ?
Edit: Как mark91 спросить, заменив строку:
List<Tuple2<String, Integer>> list = count.takeOrdered(10,
(o1, o2) -> o2._2() - o1._2());
с:
List<Tuple2<String, Integer>> list = count.takeOrdered(10);
Я получил это исключение:
java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable
at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35)
at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1234)
at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1231)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/27 17:58:55 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable
at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35)
at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1234)
at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1231)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/27 17:58:55 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
15/03/27 17:58:55 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/03/27 17:58:55 INFO TaskSchedulerImpl: Cancelling stage 1
15/03/27 17:58:56 INFO DAGScheduler: Job 0 failed: takeOrdered at WordCount.java:66, took 14.117721 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable
at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35)
at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1234)
at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1231)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Что произойдет, если вы удалите свою лямбда-функцию, переданную как второй параметр в ваш takeOrdered? – mgaido
Я обновил сообщение! – conker84