2016-11-02 3 views
1

Я пытаюсь создать пользовательскую раковину для распаковки файлов.Google Dataflow: java.lang.IllegalArgumentException: Can not setCoder (null)

Имея этот простой код:

public static class ZipIO{  
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<String> { 

    private static final long serialVersionUID = -7414200726778377175L; 
    private final String unzipTarget; 

     public Sink withDestinationPath(String s){ 
     if(s!=""){ 
      return new Sink(s); 
     } 
     else { 
      throw new IllegalArgumentException("must assign destination path"); 
     } 

     } 

     protected Sink(String path){ 
      this.unzipTarget = path; 
     } 

     @Override 
     public void validate(PipelineOptions po){ 
      if(unzipTarget==null){ 
       throw new RuntimeException(); 
      } 
     } 

     @Override 
     public ZipFileWriteOperation createWriteOperation(PipelineOptions po){ 
      return new ZipFileWriteOperation(this); 
     } 

    } 

    private static class ZipFileWriteOperation extends WriteOperation<String, UnzipResult>{ 

    private static final long serialVersionUID = 7976541367499831605L; 
    private final ZipIO.Sink sink; 

     public ZipFileWriteOperation(ZipIO.Sink sink){ 
      this.sink = sink; 
     } 



     @Override 
     public void initialize(PipelineOptions po) throws Exception{ 

     } 

     @Override 
     public void finalize(Iterable<UnzipResult> writerResults, PipelineOptions po) throws Exception { 
     long totalFiles = 0; 
     for(UnzipResult r:writerResults){ 
      totalFiles +=r.filesUnziped; 
     } 
     LOG.info("Unzipped {} Files",totalFiles); 
     } 

     @Override 
     public ZipIO.Sink getSink(){ 
      return sink; 
     } 

     @Override 
     public ZipWriter createWriter(PipelineOptions po) throws Exception{ 
      return new ZipWriter(this); 
     } 

    } 

    private static class ZipWriter extends Writer<String, UnzipResult>{ 
     private final ZipFileWriteOperation writeOp; 
     public long totalUnzipped = 0; 

     ZipWriter(ZipFileWriteOperation writeOp){ 
      this.writeOp = writeOp; 
     } 

     @Override 
     public void open(String uID) throws Exception{ 
     } 

     @Override 
     public void write(String p){ 
      System.out.println(p); 
     } 

     @Override 
     public UnzipResult close() throws Exception{ 
      return new UnzipResult(this.totalUnzipped); 
     } 

     @Override 
     public ZipFileWriteOperation getWriteOperation(){ 
      return writeOp; 
     } 


    } 

    private static class UnzipResult implements Serializable{ 
    private static final long serialVersionUID = -8504626439217544799L; 
    public long filesUnziped=0;  
     public UnzipResult(long filesUnziped){ 
      this.filesUnziped=filesUnziped; 
     } 
    } 
} 

}

Обработка с ошибкой:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot setCoder(null) at com.google.cloud.dataflow.sdk.values.TypedPValue.setCoder(TypedPValue.java:67) at com.google.cloud.dataflow.sdk.values.PCollection.setCoder(PCollection.java:150) at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:380) at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:112) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$BatchWrite.apply(DataflowPipelineRunner.java:2118) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$BatchWrite.apply(DataflowPipelineRunner.java:2099) at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:75) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:465) at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.apply(BlockingDataflowPipelineRunner.java:169) at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368) at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:275) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:463) at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.apply(BlockingDataflowPipelineRunner.java:169) at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368) at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:291) at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174) at com.mcd.de.tlogdataflow.StarterPipeline.main(StarterPipeline.java:93)

Любая помощь приветствуется.

Благодаря & BR Philipp

ответ

0

Это падение вызвано ошибкой в ​​Dataflow Java SDK (specifically, this line), который также присутствовал в Apache Beam (инкубирования) Java SDK.

Метод Sink.WriterOperation#getWriterResultCoder() всегда должен быть переопределен, но мы не смогли его пометить abstract. Он исправлен в Beam, но не изменяется в SDK Dataflow. Вы должны переопределить этот метод и вернуть соответствующий кодер.

У вас есть несколько вариантов, чтобы придумать с кодером:

  1. Написать свой собственный небольшой класс кодировщиков, обертывание один из VarLongCoder или BigEndianLongCoder
  2. Просто используйте long вместо UnzipResult структуры, так что вы можете использовать те как есть.
  3. менее желательно из-за избыточного размера, можно использовать SerializableCoder.of(UnzipResult.class)
+0

Hi Кенн, спасибо за вашу помощь. Я изменил его на длинную структуру, когда операция close() вернула длинную. Тем не менее, я получаю то же самое не может setCoder (null) Exception. – bigdataclown

+0

Привет, Кенн, вы нашли решение для этого? Спасибо & BR Philipp – bigdataclown

+0

Вы переопределили 'getWriterResultCoder()'? Это то, что вам нужно будет сделать. –