2016-11-08 5 views
0

Я получаю задание не сериализуемую ошибку в Spark. Я искал и пытался использовать статическую функцию, как это было предложено в некоторых сообщениях, но она по-прежнему дает ту же ошибку.Задача не Serializable - Spark Java

код, как показано ниже:

public class Rating implements Serializable { 
    private SparkSession spark; 
    private SparkConf sparkConf; 
    private JavaSparkContext jsc; 
    private static Function<String, Rating> mapFunc; 

    public Rating() { 
     mapFunc = new Function<String, Rating>() { 
      public Rating call(String str) { 
       return Rating.parseRating(str); 
      } 
     }; 
    } 

    public void runProcedure() { 
     sparkConf = new SparkConf().setAppName("Filter Example").setMaster("local"); 
     jsc = new JavaSparkContext(sparkConf); 
     SparkSession spark = SparkSession.builder().master("local").appName("Word Count") 
      .config("spark.some.config.option", "some-value").getOrCreate();   

     JavaRDD<Rating> ratingsRDD = spark.read().textFile("sample_movielens_ratings.txt") 
       .javaRDD() 
       .map(mapFunc); 
    } 

    public static void main(String[] args) { 
     Rating newRating = new Rating(); 
     newRating.runProcedure(); 
    } 
} 

ошибка дает: enter image description here

Как решить эту ошибку? Спасибо заранее.

ответ

7

Ясно Rating не может быть Serializable, потому что она содержит ссылки на Спарк структуры (т.е. SparkSession, SparkConf и т.д.) в качестве атрибутов.

Проблема здесь заключается в

JavaRDD<Rating> ratingsRD = spark.read().textFile("sample_movielens_ratings.txt") 
      .javaRDD() 
      .map(mapFunc); 

Если посмотреть на определение mapFunc, вы возвращающая Rating объект.

mapFunc = new Function<String, Rating>() { 
    public Rating call(String str) { 
     return Rating.parseRating(str); 
    } 
}; 

Эта функция используется внутри mapтрансформации в терминах Спарк). Поскольку преобразования выполняются непосредственно в рабочих узлах, а не в узле драйвера, их код должен быть сериализуемым. Это заставляет Spark попробовать сериализовать класс Rating, но это невозможно.

Попробуйте извлечь необходимые функции с Rating и разместите их в другом классе, который не имеет структуры Spark. Наконец, используйте этот новый класс как возвращаемый тип вашей функции mapFunc.

+0

Отделив рейтинг и процедуру на два класса! Благодаря :) – Fleur

Смежные вопросы