2016-10-28 2 views
0

Я пытаюсь разархивировать некоторые файлы (которые сами содержат zipfiles) из хранилища google в хранилище Google.Google Dataflow: Coder для ZipInputStream

Таким образом, у меня есть следующий DoFn собрать ZipInputStreams:

static class UnzipFilesFN extends DoFn<GcsPath,ZipInputStream>{ 

private static final long serialVersionUID = 7373250969860890761L; 
public void processElement(ProcessContext c){ 
    GcsPath p = c.element(); 
    try{ 
     ZipInputStream zis = new ZipInputStream(new FileInputStream(p.toString())); 
     c.output(zis); 

    } 
    catch (FileNotFoundException fnfe){ 
     // 
    } 
    } 

}

И следующий пользовательский мойку, чтобы сделать распаковки и письменной части:

public static class ZipIO{  
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<ZipInputStream> { 

    private static final long serialVersionUID = -7414200726778377175L; 
    final String unzipTarget; 

     public Sink withDestinationPath(String s){ 
     if(s!=""){ 
      return new Sink(s); 
     } 
     else { 
      throw new IllegalArgumentException("must assign destination path"); 
     } 

     } 

     protected Sink(String path){ 
      this.unzipTarget = path; 
     } 

     @Override 
     public void validate(PipelineOptions po){ 
      if(unzipTarget==null){ 
       throw new RuntimeException(); 
      } 
     } 

     @Override 
     public ZipFileWriteOperation createWriteOperation(PipelineOptions po){ 
      return new ZipFileWriteOperation(this); 
     } 

    } 

    private static class ZipFileWriteOperation extends WriteOperation<ZipInputStream, UnzipResult>{ 

    private static final long serialVersionUID = 7976541367499831605L; 
    private final ZipIO.Sink sink; 

     public ZipFileWriteOperation(ZipIO.Sink sink){ 
      this.sink = sink; 
     } 



     @Override 
     public void initialize(PipelineOptions po) throws Exception{ 

     } 

     @Override 
     public void finalize(Iterable<UnzipResult> writerResults, PipelineOptions po) throws Exception { 
     long totalFiles = 0; 
     for(UnzipResult r:writerResults){ 
      totalFiles +=r.filesUnziped; 
     } 
     LOG.info("Unzipped {} Files",totalFiles); 
     } 

     @Override 
     public ZipIO.Sink getSink(){ 
      return sink; 
     } 

     @Override 
     public ZipWriter createWriter(PipelineOptions po) throws Exception{ 
      return new ZipWriter(this); 
     } 

    } 

    private static class ZipWriter extends Writer<ZipInputStream, UnzipResult>{ 
     private final ZipFileWriteOperation writeOp; 
     private long totalUnzipped = 0; 

     ZipWriter(ZipFileWriteOperation writeOp){ 
      this.writeOp = writeOp; 
     } 

     @Override 
     public void open(String uID) throws Exception{ 
     } 

     @Override 
     public void write(ZipInputStream zis){ 
      byte[] buffer = new byte[1024]; 
      try{ 
       ZipEntry ze = zis.getNextEntry(); 
       while(ze!=null){ 
        File f = new File(writeOp.sink.unzipTarget + "/" + ze.getName()); 
        FileOutputStream fos = new FileOutputStream(f); 
        int len; 
        while((len=zis.read(buffer))>0){ 
         fos.write(buffer, 0, len); 
        } 
        fos.close(); 
        this.totalUnzipped++; 
       } 
       zis.closeEntry(); 
       zis.close(); 
      } 
      catch(Exception e){ 
       // 
      } 

     } 

     @Override 
     public UnzipResult close() throws Exception{ 
      return new UnzipResult(this.totalUnzipped); 
     } 

     @Override 
     public ZipFileWriteOperation getWriteOperation(){ 
      return writeOp; 
     } 


    } 

    private static class UnzipResult implements Serializable{ 
    private static final long serialVersionUID = -8504626439217544799L; 
    final long filesUnziped;  
     public UnzipResult(long filesUnziped){ 
      this.filesUnziped=filesUnziped; 
     } 
    } 
} 

}

Когда я пытаюсь запустить конвейер, я получаю несколько ошибок:

Построение Coder от запасного варианта CoderProvider: не могу предоставить кодировщик для типа java.util.zip.ZipInputStream: [email protected]37 не может обеспечить Кодер для типа java.util.zip.ZipInputStream: не может обеспечить ProtoCoder, потому что java.util.zip.ZipInputStream не является подклассом com.google.protobuf.Message; [email protected] не смог предоставить кодер для типа java.util.zip.ZipInputStream: не может предоставить SerializableCoder, потому что java.util.zip.ZipInputStream не реализует Serializable. на com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail (TypedPValue.java:195) на com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder (TypedPValue.java:48) на com.google.cloud.dataflow.sdk.values.PCollection.getCoder (PCollection.java:137) по адресу com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying (TypedPValue.java:88) at com. google.cloud.dataflow.sdk.Pipeline.applyInternal (Pipeline.java:332) at com.google.cloud.dataflow.sdk.Pipeline.applyTransform (Pipeline.java:291) в com.google.cloud.dataflow. sdk.values.PCollection.apply (PCollection.java:174)

Какой код Мне нужно назначить для обработки ZipInputStreams?

Благодаря & BR Philipp

ответ

0

Кодеры необходимы для того, чтобы бегун может материализовать PCollection для временного хранения и читать его обратно, а не держа его в памяти. Я не могу придумать разумный способ материализовать объект ZipInputStream - это принципиальная концептуальная проблема, а не проблема API Coder.

Однако, в вашем конкретном случае, я думаю, вы можете просто открыть ZipInputStream в вашей ZipWriter.write() функции, и сделать ZipIO.Sink быть Sink<GcsPath>, а не Sink<ZipInputStream>.

Еще одна вещь, которую я заметил в вашем коде: я предполагаю, что вы планируете использовать этот код с файлами, расположенными на GCS, и с бегуном Cloud Dataflow, а не только с бегунком и локальными файлами в памяти. В этом случае java.io.File не будет прозрачно обрабатывать чтение/запись в GCS - для этого вам нужно использовать GcsUtil.

+0

Привет, Я получаю ту же ошибку кодера при использовании GcsPath вместо ZipInputStream. Спасибо & BR Philipp – bigdataclown

+0

Oh. Мои извинения, GcsPath не отмечен Serializable (хотя я думаю, что это должно быть). Похоже, вам нужно будет указать путь как 'String'. – jkff

Смежные вопросы