2015-02-06 2 views
0

Я построение DataFlow трубопровода, например, в основном, на основе коды на https://cloud.google.com/dataflow/java-sdk/combineNotSerializableException: org.apache.avro.io.DecoderFactory в трубопроводе Google Cloud Dataflow

Но когда я запускаю мой код, я переживаю следующее исключение:

исключение в потоке "основного" java.lang.IllegalArgumentException: неспособно сериализовать com.google.[email protected]139982de на com.google. cloud.dataflow .sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:51) на com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable (SerializableUtils.java:81) на com.google.cloud.dataflow .sdk.runners.DirectPipelineRunner $ Evaluator.ensureSerializable (DirectPipelineRunner.java:784) на com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper (ParDo.java:1025) в com.google.cloud .dataflow.sdk.transforms.ParDo.evaluateSingleHelper (ParDo.java:963) на com.google.cloud.dataflow.sdk.transforms.ParDo.access $ 000 (ParDo.java:441) на com.google. cloud.dataflow.sdk.transforms.ParDo $ 1. Вычислить значение (Par Do.java:951) на com.google.cloud.dataflow.sdk.transforms.ParDo $ 1. Вычислить значение (ParDo.java:946) на com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner $ оценщика .visitTransform (DirectPipelineRunner.java:611) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit (TransformTreeNode.java:200) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode .visit (TransformTreeNode.java:196) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit (TransformTreeNode.java:196) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode .visit (TransformTreeNode.java:196) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit (TransformTreeNode.java:196) на com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:109) на com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically (Pipeline.java:204) в com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner $ Evaluator.run (DirectPipelineRunner.java:584) на com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run (DirectPipelineRunner.java:328) на com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run (DirectPipelineRunner.java:70) по адресу com.google.cloud.dataflow.sdk.Pip eline.run (Pipeline.java:145) в com.google.cloud.dataflow.examples.CalcMeanExample.main (CalcMeanExample.java:50) Вызвано: java.io.NotSerializableException: org.apache.avro.io .DecoderFactory на java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1184) на java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) на java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java: 1509) по адресу java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) по адресу java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.Java: 1178) на java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) на java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) на java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream .java: 1432) на java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178) на java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348) на com.google.cloud.dataflow. sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:47) ... 20 еще

Мой код выглядит следующим образом:

package com.google.cloud.dataflow.examples; 

import java.io.Serializable; 

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.AvroCoder; 
import com.google.cloud.dataflow.sdk.coders.DefaultCoder; 
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; 
import com.google.cloud.dataflow.sdk.io.TextIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.Default; 
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; 
import com.google.cloud.dataflow.sdk.options.Description; 
import com.google.cloud.dataflow.sdk.options.PipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.transforms.Combine; 
import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; 
import com.google.cloud.dataflow.sdk.values.PCollection; 


public class CalcMeanExample 

{

public static void main(String[] args) 
{ 
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 
    Pipeline p = Pipeline.create(options); 

    PCollection<String> numbers = p.apply(TextIO.Read.named("ReadLines").withCoder(StringUtf8Coder.of()).from(options.getInput())); 

    numbers.apply(ParDo.of(new DoFn<String,String>(){ 
     @Override 
     public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { 

      System.out.println(c.element()); 

     } 
    })); 

    PCollection<String> average = numbers.apply(Combine.globally(new AverageFn())); 


    average.apply(TextIO.Write.named("WriteAverage") 
      .to(options.getOutput()) 
      .withNumShards(options.getNumShards())); 

    p.run(); 

    System.out.println("done"); 
} 


public static class AverageFn extends CombineFn<String, AverageFn.Accum, String> { 
     @DefaultCoder(AvroCoder.class) 
     public static class Accum implements Serializable { 
     int sum = 0; 
     int count = 0; 
     } 
     public Accum createAccumulator() { return new Accum(); } 
     public void addInput(Accum accum, String input) { 
      accum.sum += Integer.parseInt(input); 
      accum.count++; 
     } 
     public Accum mergeAccumulators(Iterable<Accum> accums) { 
     Accum merged = createAccumulator(); 
     for (Accum accum : accums) { 
      merged.sum += accum.sum; 
      merged.count += accum.count; 
     } 
     return merged; 
     } 
     public String extractOutput(Accum accum) { 
     return Double.toString(((double) accum.sum)/accum.count); 
     } 
    } 



    /** 
    * Options supported by {@link WordCount}. 
    * <p> 
    * Inherits standard configuration options. 
    */ 
    public static interface Options extends PipelineOptions { 
    @Description("Path of the file to read from") 
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") 
    String getInput(); 
    void setInput(String value); 

    @Description("Path of the file to write to") 
    @Default.InstanceFactory(OutputFactory.class) 
    String getOutput(); 
    void setOutput(String value); 

    /** 
    * Returns gs://${STAGING_LOCATION}/"sorts.txt" as the default destination. 
    */ 
    public static class OutputFactory implements DefaultValueFactory<String> { 
     @Override 
     public String create(PipelineOptions options) { 
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); 
     if (dataflowOptions.getStagingLocation() != null) { 
      return GcsPath.fromUri(dataflowOptions.getStagingLocation()) 
       .resolve("sorts.txt").toString(); 
     } else { 
      throw new IllegalArgumentException("Must specify --output or --stagingLocation"); 
     } 
     } 
    } 

    /** 
    * By default (numShards == 0), the system will choose the shard count. 
    * Most programs will not need this option. 
    */ 
    @Description("Number of output shards (0 if the system should choose automatically)") 
    @Default.Integer(1) 
    int getNumShards(); 
    void setNumShards(int value); 
    }  

}

Любые мысли о том, что будет причиной этого?

ответ

1

Мы знаем об этой проблеме и работаем над исправлением, которое должно быть доступно в ближайшее время.

На данный момент вы можете использовать SerializableCoder вместо AvroCoder для аккумулятора.

Смежные вопросы