Я создал писателя для BucketingSink. Раковина и писатель работают без ошибок, но когда дело доходит до писателя, пишущего avro genericrecord для паркета, файл был создан из незавершенного процесса, ожидающего завершения. Но файлы пустые с 0 байтами. Может ли кто-нибудь сказать мне, что не так с кодом? Я попытался поставить инициализацию AvroParquetWriter на метод open(), но результат все тот же.Flink BucketingSink с пользовательским AvroParquetWriter создать пустой файл
При отладке кода, я подтверждаю, что writer.write (элемент) не выполняется, и элемент содержит AVRO данные genericrecord
потоковые данные
BucketingSink<DataEventRecord> sink =
new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/");
sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<DataEventRecord>());
ParquetSinkWriter
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import com.any.DataEventRecord;
public class ParquetSinkWriter<T> extends StreamWriterBase<T> {
private transient ParquetWriter<GenericRecord> writer;
private Path path;
private FileSystem fs;
private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
private final int blockSize = 256 * 1024 * 1024;
private final int pageSize = 64 * 1024;
@Override
// workaround
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
this.path = path;
this.fs = fs;
}
@Override
public void write(T event) throws IOException {
DataEventRecord element = (DataEventRecord) event;
if (writer == null) {
writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(), compressionCodecName, blockSize, pageSize);
}
if (writer != null) {
GenericRecord datum = element.getRecord();
writer.write(datum);
}
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close();
}
super.close();
}
@Override
public Writer<T> duplicate() {
return new ParquetSinkWriter<T>();
}
}
Мне удалось решить проблему. Существует проблема, когда вызов super.open (fs, path) в то же время создает экземпляр AvroParquetWRiter во время процесса записи. Открытое событие уже создает файл, и автор также пытается создать тот же файл, но не может, потому что файл уже существует. Поэтому в файл всегда записывается 0 записей, так как автор Avro не записывает в уже существующий файл. Удаление super.open приведет к сбою базового класса из-за того, что «Writer не открыт». В конечном итоге я расширяю свой собственный класс раковины на основе BucketingSink, и теперь все работает нормально. – jlim
Не могли бы вы показать какой-то ссылочный код о том, как вы его решили? Я также придерживаюсь той же проблемы – neoeahit
Не можете ли вы просто реализовать интерфейс 'Writer' вместо использования' StreamWriterBase'? 'StreamWriterBase' открывает' FSDataOutputStream' файл, который вам не нужен. –