Я построение DataFlow трубопровода, например, в основном, на основе коды на https://cloud.google.com/dataflow/java-sdk/combineNotSerializableException: org.apache.avro.io.DecoderFactory в трубопроводе Google Cloud Dataflow
Но когда я запускаю мой код, я переживаю следующее исключение:
исключение в потоке "основного" java.lang.IllegalArgumentException: неспособно сериализовать com.google.[email protected]139982de на com.google. cloud.dataflow .sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:51) на com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable (SerializableUtils.java:81) на com.google.cloud.dataflow .sdk.runners.DirectPipelineRunner $ Evaluator.ensureSerializable (DirectPipelineRunner.java:784) на com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper (ParDo.java:1025) в com.google.cloud .dataflow.sdk.transforms.ParDo.evaluateSingleHelper (ParDo.java:963) на com.google.cloud.dataflow.sdk.transforms.ParDo.access $ 000 (ParDo.java:441) на com.google. cloud.dataflow.sdk.transforms.ParDo $ 1. Вычислить значение (Par Do.java:951) на com.google.cloud.dataflow.sdk.transforms.ParDo $ 1. Вычислить значение (ParDo.java:946) на com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner $ оценщика .visitTransform (DirectPipelineRunner.java:611) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit (TransformTreeNode.java:200) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode .visit (TransformTreeNode.java:196) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit (TransformTreeNode.java:196) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode .visit (TransformTreeNode.java:196) на com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit (TransformTreeNode.java:196) на com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:109) на com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically (Pipeline.java:204) в com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner $ Evaluator.run (DirectPipelineRunner.java:584) на com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run (DirectPipelineRunner.java:328) на com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run (DirectPipelineRunner.java:70) по адресу com.google.cloud.dataflow.sdk.Pip eline.run (Pipeline.java:145) в com.google.cloud.dataflow.examples.CalcMeanExample.main (CalcMeanExample.java:50) Вызвано: java.io.NotSerializableException: org.apache.avro.io .DecoderFactory на java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1184) на java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) на java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java: 1509) по адресу java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) по адресу java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.Java: 1178) на java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) на java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) на java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream .java: 1432) на java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178) на java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348) на com.google.cloud.dataflow. sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:47) ... 20 еще
Мой код выглядит следующим образом:
package com.google.cloud.dataflow.examples;
import java.io.Serializable;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.PCollection;
public class CalcMeanExample
{
public static void main(String[] args)
{
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<String> numbers = p.apply(TextIO.Read.named("ReadLines").withCoder(StringUtf8Coder.of()).from(options.getInput()));
numbers.apply(ParDo.of(new DoFn<String,String>(){
@Override
public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
System.out.println(c.element());
}
}));
PCollection<String> average = numbers.apply(Combine.globally(new AverageFn()));
average.apply(TextIO.Write.named("WriteAverage")
.to(options.getOutput())
.withNumShards(options.getNumShards()));
p.run();
System.out.println("done");
}
public static class AverageFn extends CombineFn<String, AverageFn.Accum, String> {
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
int sum = 0;
int count = 0;
}
public Accum createAccumulator() { return new Accum(); }
public void addInput(Accum accum, String input) {
accum.sum += Integer.parseInt(input);
accum.count++;
}
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
merged.sum += accum.sum;
merged.count += accum.count;
}
return merged;
}
public String extractOutput(Accum accum) {
return Double.toString(((double) accum.sum)/accum.count);
}
}
/**
* Options supported by {@link WordCount}.
* <p>
* Inherits standard configuration options.
*/
public static interface Options extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
String getInput();
void setInput(String value);
@Description("Path of the file to write to")
@Default.InstanceFactory(OutputFactory.class)
String getOutput();
void setOutput(String value);
/**
* Returns gs://${STAGING_LOCATION}/"sorts.txt" as the default destination.
*/
public static class OutputFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
if (dataflowOptions.getStagingLocation() != null) {
return GcsPath.fromUri(dataflowOptions.getStagingLocation())
.resolve("sorts.txt").toString();
} else {
throw new IllegalArgumentException("Must specify --output or --stagingLocation");
}
}
}
/**
* By default (numShards == 0), the system will choose the shard count.
* Most programs will not need this option.
*/
@Description("Number of output shards (0 if the system should choose automatically)")
@Default.Integer(1)
int getNumShards();
void setNumShards(int value);
}
}
Любые мысли о том, что будет причиной этого?