2016-04-11 2 views
1

Я определил схему AVRO и создал несколько классов с помощью авро-инструментов для схем. Теперь я хочу сериализовать данные на диск. Я нашел несколько ответов о scala для этого, но не для Java. Класс Article генерируется с помощью avro-tools и выполняется из определенной мной схемы.Как сериализовать данные в схеме AVRO в Spark (с Java)?

Вот упрощенная версия кода, как я пытаюсь сделать это:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath); 
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> { 
    // The name of the file 
    String fileName = fileNameContent._1(); 
    // The content of the file 
    String fileContent = fileNameContent._2(); 

    // An object from my avro schema 
    Article a = new Article(fileContent); 

    Processing processing = new Processing(); 
    // .... some processing of the content here ... // 

    processing.serializeArticleToDisk(avroFileName); 

    return a; 
}); 

где serializeArticleToDisk(avroFileName) определяется следующим образом:

public void serializeArticleToDisk(String filename) throws IOException{ 
    // Serialize article to disk 
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class); 
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter); 
    dataFileWriter.create(this.article.getSchema(), new File(filename)); 
    dataFileWriter.append(this.article); 
    dataFileWriter.close(); 
} 

где Article моя Avro схема.

Теперь, картограф бросает мне ошибку:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory) 
at java.io.FileOutputStream.open0(Native Method)  
at java.io.FileOutputStream.open(FileOutputStream.java:270)  
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) 
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) 
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60) 
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129) 
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129) 
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)  
. . . rest of the stacktrace ... 

хотя путь к файлу является правильным.

После этого я использую метод collect(), поэтому все остальное в функции map работает нормально (за исключением части сериализации).

Я совершенно новый с Spark, поэтому я не уверен, что это может быть что-то тривиальное на самом деле. Я подозреваю, что мне нужно использовать некоторые функции записи, а не писать в картографе (не уверен, правда ли это, правда). Любые идеи, как справиться с этим?

EDIT:

Последняя строка ошибки стека-следа я показал, на самом деле в этой части:

dataFileWriter.create(this.article.getSchema(), new File(filename));

Это та часть, которая бросает фактическую ошибку. Я предполагаю, что dataFileWriter нужно заменить чем-то другим. Есть идеи?

+0

Возможно, посмотрите обсуждения и ответы здесь: http://stackoverflow.com/questions/20612571/spark-writing-to-avro-file –

+0

Я уже видел это, меня больше интересовал Java-эквивалент , Спасибо за комментарий! – Belphegor

ответ

1

Это решение не использует данные-кадров и не бросать какие-либо ошибки:

import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.avro.mapred.AvroKey; 
import org.apache.spark.api.java.JavaPairRDD; 
import scala.Tuple2; 

    . . . . . 

// Serializing to AVRO 
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {  
    return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get()); 
}); 
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema()); 
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 
     job.getConfiguration()); 

где AvroUtils.getJobOutputKeyAvroSchema есть:

public static Job getJobOutputKeyAvroSchema(Schema avroSchema) { 
    Job job; 

    try { 
     job = new Job(); 
    } catch (IOException e) { 
     throw new RuntimeException(e); 
    } 

    AvroJob.setOutputKeySchema(job, avroSchema); 
    return job; 
} 

Аналогичные вещи для Spark + Avro можно найти здесь ->https://github.com/CeON/spark-utils.

0

Похоже, что вы используете Spark неправильно.

Map является функцией преобразования. Просто вызывая map, он не вызывает тотализацию RDD. Вы должны позвонить действию как forEach() или collect().

Также обратите внимание, что лямбда, поставляемая в map, будет сериализована у водителя и перенесена в некоторый Node в кластер.

ДОБАВЛЕНО

Попробуйте использовать Спарк SQL и Spark-Avro, чтобы сохранить Спарк DataFrame в Avro формате:

// Load a text file and convert each line to a JavaBean. 
JavaRDD<Person> people = sc.textFile("/examples/people.txt") 
    .map(Person::parse); 

// Apply a schema to an RDD 
DataFrame peopleDF = sqlContext.createDataFrame(people, Person.class); 
peopleDF.write() 
    .format("com.databricks.spark.avro") 
    .save("/output"); 
+0

Что вы говорите - 'map' абсолютно делает вызов вычисления' RDD'. 'map' возвращает новый' RDD' со всеми повторными вычислениями элементов на основе функции 'map'. –

+0

@Denis Kokorin: Я использую 'collect()' впоследствии, поэтому все в 'map' работает уже, это нормально. Все, кроме сериализации, работает в функции «map». – Belphegor

+0

Возможно, он имеет в виду, что вам следует приклеить «foreach» после «карты» и написать там свое письмо? Было бы полезно, если бы этот ответ имел пример кода. –

Смежные вопросы