2015-06-14 5 views
4

В моем коде Spark я пытаюсь создать IndexedRowMatrix из файла csv. Тем не менее, я получаю следующее сообщение об ошибке:Spark NotSerializableException

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
... 
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext 

Вот мой код:

sc = new JavaSparkContext("local", "App", 
       "/srv/spark", new String[]{"target/App.jar"}); 

JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache(); 


JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
       new Function<scala.Tuple2<String, Long>, IndexedRow>() { 
       /** 
       * 
       **/ 
       private static final long serialVersionUID = 4795273163954440089L; 

       @Override 
       public IndexedRow call(Tuple2<String, Long> tuple) 
         throws Exception { 
        String line = tuple._1; 
        long index = tuple._2; 
        String[] strings = line.split(","); 
        double[] doubles = new double[strings.length]; 
        for (int i = 0; i < strings.length; i++) { 
         doubles[i] = Double.parseDouble(strings[i]); 
        } 
        Vector v = new DenseVector(doubles); 
        return new IndexedRow(index, v); 
       } 
      }); 
+0

Вопросы просят отладки помощи должны (1) показать * всю * самодостаточную программу, демонстрирующую проблема, (2) показать трассировку * всего * стека и (3) указать линию, в которой происходит исключение. – kdgregory

+0

Тем не менее, я уверен, что 'sc' является переменной-членом содержащего класса. Не делайте этого, просто сделайте его локальной переменной *, как и все примеры в документации Spark *. – kdgregory

ответ

1

Что-то пахнет тусклым, и если вы показали нам больше коды, может быть, мы могли бы дать лучший ответ.

Во всяком случае, вы могли бы попытаться создать общественный класс в отдельном файле, который представляет вашу функцию сопоставителя:

public class Mapper implements Function<Tuple2<String,Long>, IndexedRow> { 

    @Override 
    public IndexedRow call(Tuple2<String, Long> tuple) throws Exception { 
    String line = tuple._1(); 
    long index = tuple._2(); 
    String[] strings = line.split(","); 
    double[] doubles = new double[strings.length]; 
    for (int i = 0; i < strings.length; i++) { 
     doubles[i] = Double.parseDouble(strings[i]); 
    } 
    Vector v = new DenseVector(doubles); 
    return new IndexedRow(index, v); 
    } 
} 

, а затем использовать его для отображения вашего JavaRDD:

JavaRDD<String> csv = jsc.textFile("data/matrix.csv").cache(); 
JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(new Mapper()); 

Таким образом, для этого вызова map() Spark должен только сериализовать класс Mapper, который не имеет в нем никаких несериализуемых свойств.

Тем не менее, работа может завершиться неудачей по другим причинам, которые мы не можем знать, потому что мы не можем видеть весь задействованный код.

2

Если у вас возникли проблемы с сериализацией, всегда полезно добавить следующий аргумент: -Dsun.io.serialization.extendedDebugInfo=true таким образом вы можете увидеть, где это происходит более точно.

Теперь, вот что может происходить в вашем коде. A JavaSparkContext действительно не сериализуем (по целому ряду причин, которые вы можете найти в Интернете). В вашем коде вы не сериализуете его напрямую, но вы держите ссылку на него, потому что ваш Function не является статичным и, следовательно, содержит ссылку на охватывающий класс. Итак, что в основном происходит, когда вы отправляете карту, так это то, что она попытается сериализовать также охватывающий класс, который содержит JavaSparkContext, который не является сериализуемым, вот откуда должно исходить ваше исключение. Вы можете попробовать переписать эту функцию либо статически, либо написать свою функцию как не-вложенный класс, либо сделать JavaSparkContext локальным, чтобы он не был сериализован.

Если возможно, я скорее советую вам принять последний вариант по той простой причине, что лучше всего создать этот JavaSparkContext локально, потому что в противном случае у вас будут сотни несеризуемых проблем из-за каждой ссылки (иногда сложно найти) вы можете придерживаться своего класса. Вы можете сделать это, например, путем instanciating вашего JavaSparkContext внутри основного класса:

public static void main(String[] args) { 
    JavaSparkContext sc = new JavaSparkContext(); 

    // do whatever you need to do, if you need sc inside other classes, 
    // store this sc into a static class, say Registry.set(sc) and Registry.getJSC() 

    JavaRDD<String> csv = sc.textFile("data/matrix.csv").cache(); 
    JavaRDD<IndexedRow> entries = csv.zipWithIndex().map(
      new Function<scala.Tuple2<String, Long>, IndexedRow>() { 
      private static final long serialVersionUID = 4795273163954440089L; // won't be serialized 

      @Override 
      public IndexedRow call(Tuple2<String, Long> tuple) 
        throws Exception { 
       String line = tuple._1; 
       long index = tuple._2; 
       String[] strings = line.split(","); 
       double[] doubles = new double[strings.length]; 
       for (int i = 0; i < strings.length; i++) { 
        doubles[i] = Double.parseDouble(strings[i]); 
       } 
       Vector v = new DenseVector(doubles); 
       return new IndexedRow(index, v); 
      } 
     }); 
} 

Заметит также, что статические поля не связаны с экземпляром, но в класс, так что я думаю, что ваш serialVersionUID не имеет серийный номер либо (в случае становится проблемой для вас в какой-то момент).

0

Любой код, написанный в драйвере и используемый в преобразованиях RDD, должен быть сериализован. Если у вас возникают проблемы с сериализацией, следуйте нижеуказанным принципам дизайна:

  1. Напишите весь код, который использует объекты, не связанные с сериализацией внутри преобразований (карта).
  2. Использовать forEachPartition в Spark для выполнения действий на каждый раздел. Любой код, который совместно используется преобразованиями RDD, должен всегда быть сериализуемым.
3

У меня была такая же проблема. Это заставило меня покрутить. Это ограничение Java для анонимных экземпляров и Serializability. Моим решением было объявить анонимный экземпляр функции как именованного статического класса, который реализует Serializable и создает его. Я в основном объявил библиотеку функций, которая была внешним классом, который включал определения статического внутреннего класса функций, которые я хотел использовать.

Конечно, если вы напишете его в Scala, это будет один файл, который, скорее всего, будет содержать более аккуратный код, но это не поможет вам в этом случае.

0

В целом объекты Rdd будут сериализованы при помощи искр, когда задачи будут переданы различным исполнителям. Но вы должны использовать закрытие, чтобы избежать этой ошибки.

Вы можете использовать Rdd.mapPartition() для обработки каждого раздела и поместить в него код. Таким образом, сама искра позаботится о сериализации и десериализации объектов карты.

0

создать отдельный класс для картографа и реализует Srielizable, иногда внутренние классы вызывают проблемы с компиляцией в искровом среде ..

Смежные вопросы