2017-02-17 7 views
3

Я создал писателя для BucketingSink. Раковина и писатель работают без ошибок, но когда дело доходит до писателя, пишущего avro genericrecord для паркета, файл был создан из незавершенного процесса, ожидающего завершения. Но файлы пустые с 0 байтами. Может ли кто-нибудь сказать мне, что не так с кодом? Я попытался поставить инициализацию AvroParquetWriter на метод open(), но результат все тот же.Flink BucketingSink с пользовательским AvroParquetWriter создать пустой файл

При отладке кода, я подтверждаю, что writer.write (элемент) не выполняется, и элемент содержит AVRO данные genericrecord

потоковые данные

BucketingSink<DataEventRecord> sink = 
    new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/"); 

sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm")); 
sink.setWriter(new ParquetSinkWriter<DataEventRecord>()); 

ParquetSinkWriter

import java.io.File; 
import java.io.IOException; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.flink.streaming.connectors.fs.StreamWriterBase; 
import org.apache.flink.streaming.connectors.fs.Writer; 
import org.apache.parquet.avro.AvroParquetWriter; 
import org.apache.parquet.hadoop.ParquetWriter; 
import org.apache.parquet.hadoop.metadata.CompressionCodecName; 
import com.any.DataEventRecord; 

public class ParquetSinkWriter<T> extends StreamWriterBase<T> { 

    private transient ParquetWriter<GenericRecord> writer; 

    private Path path; 
    private FileSystem fs; 
    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY; 
    private final int blockSize = 256 * 1024 * 1024; 
    private final int pageSize = 64 * 1024; 


    @Override 
    // workaround 
    public void open(FileSystem fs, Path path) throws IOException { 
    super.open(fs, path); 
    this.path = path; 
    this.fs = fs; 
    } 

    @Override 
    public void write(T event) throws IOException { 
    DataEventRecord element = (DataEventRecord) event; 

    if (writer == null) { 
     writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(), compressionCodecName, blockSize, pageSize); 
    } 

    if (writer != null) { 
     GenericRecord datum = element.getRecord(); 
     writer.write(datum); 
    } 
    } 

    @Override 
    public void close() throws IOException { 
    if (writer != null) { 
     writer.close(); 
    } 
    super.close(); 
    } 

    @Override 
    public Writer<T> duplicate() { 
    return new ParquetSinkWriter<T>(); 
    } 

} 
+0

Мне удалось решить проблему. Существует проблема, когда вызов super.open (fs, path) в то же время создает экземпляр AvroParquetWRiter во время процесса записи. Открытое событие уже создает файл, и автор также пытается создать тот же файл, но не может, потому что файл уже существует. Поэтому в файл всегда записывается 0 записей, так как автор Avro не записывает в уже существующий файл. Удаление super.open приведет к сбою базового класса из-за того, что «Writer не открыт». В конечном итоге я расширяю свой собственный класс раковины на основе BucketingSink, и теперь все работает нормально. – jlim

+0

Не могли бы вы показать какой-то ссылочный код о том, как вы его решили? Я также придерживаюсь той же проблемы – neoeahit

+0

Не можете ли вы просто реализовать интерфейс 'Writer' вместо использования' StreamWriterBase'? 'StreamWriterBase' открывает' FSDataOutputStream' файл, который вам не нужен. –

ответ

0

Непосредственно реализации Writer должно выглядеть как

import org.apache.flink.util.Preconditions; 

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.parquet.avro.AvroParquetWriter; 
import org.apache.parquet.hadoop.ParquetWriter; 
import org.apache.parquet.hadoop.metadata.CompressionCodecName; 

import java.io.IOException; 

/** 
* Parquet writer. 
* 
* @param <T> 
*/ 
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> { 

    private static final long serialVersionUID = -975302556515811398L; 

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY; 
    private final int pageSize = 64 * 1024; 

    private final String schemaRepresentation; 

    private transient Schema schema; 
    private transient ParquetWriter<GenericRecord> writer; 
    private transient Path path; 

    private int position; 

    public ParquetSinkWriter(String schemaRepresentation) { 
     this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation); 
    } 

    @Override 
    public void open(FileSystem fs, Path path) throws IOException { 
     this.position = 0; 
     this.path = path; 

     if (writer != null) { 
      writer.close(); 
     } 

     writer = createWriter(); 
    } 

    @Override 
    public long flush() throws IOException { 
     Preconditions.checkNotNull(writer); 
     position += writer.getDataSize(); 
     writer.close(); 
     writer = createWriter(); 

     return position; 
    } 

    @Override 
    public long getPos() throws IOException { 
     Preconditions.checkNotNull(writer); 
     return position + writer.getDataSize(); 
    } 

    @Override 
    public void close() throws IOException { 
     if (writer != null) { 
      writer.close(); 
      writer = null; 
     } 
    } 

    @Override 
    public void write(T element) throws IOException { 
     Preconditions.checkNotNull(writer); 
     writer.write(element); 
    } 

    @Override 
    public Writer<T> duplicate() { 
     return new ParquetSinkWriter<>(schemaRepresentation); 
    } 

    private ParquetWriter<GenericRecord> createWriter() throws IOException { 
     if (schema == null) { 
      schema = new Schema.Parser().parse(schemaRepresentation); 
     } 

     return AvroParquetWriter.<GenericRecord>builder(path) 
      .withSchema(schema) 
      .withDataModel(new GenericData()) 
      .withCompressionCodec(compressionCodecName) 
      .withPageSize(pageSize) 
      .build(); 
    } 
} 
+0

Вызвано: org.apache.hadoop.fs.FileAlreadyExistsException: /clicks-json/partitionkey=2018-01-12--16-30/_part-6-0.in-progress для клиента 127.0.0.1 уже существует \t на org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal (FSNamesystem.java:2563) на org.apache.parquet.hadoop.ParquetWriter $ Builder.build (ParquetWriter.java:495) \t at org. apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState (AbstractUdfStreamOperator.java:90) \t at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState (AbstractStreamOperator.java:357) –

Смежные вопросы