2016-10-04 3 views
1

Я боролся с заданием, которое должно выделять побочные выходы, поскольку я продолжаю получать исключение («неспособный сериализовать xxx»).Не удалось запустить задание с помощью OutputTags

Даже если я явно указан кодер для типа я работаю с, я продолжал получать ту же ошибку, поэтому я решил написать простую работу после этой документации:

https://cloud.google.com/dataflow/model/par-do#tags-for-side-outputs

К моему удивлению, , Я все равно получаю такое же исключение, и теперь я подозреваю, что я, должно быть, сделал что-то не так (но я не могу понять это сам). Что касается кода, я попытался следовать приведенному выше примеру.

Ниже я отправляю исходный код, а также сообщение об ошибке. Я получаю, когда запускаю его. Я считаю, что это воспроизводимо (измените «GCS_BUCKET» на любое ваше ведро и создайте метод main(), который вызывает «TestSideOutput» с помощью args), и будет хорошо знать, сможет ли кто-то другой воспроизвести их на своем конце. Мы используем JDK 8 и Dataflow SDK 1.7.0.

Обратите внимание, что пример в приведенной выше документации использует анонимный класс, расширяющий DoFn, который я также пытался, но получил сообщение об ошибке такого же типа; приведенный ниже код реорганизует этот класс в именованный внутренний класс («Фильтр»).

Я также попытался инициализировать TupleTags без фигурных скобок ("{}") - потому что это фактически дает предупреждение - что приводит к исключению (см. Последний фрагмент кода в этом сообщении).

Вот код, который я использовал:

package tmp.dataflow.experimental; 

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; 
import com.google.cloud.dataflow.sdk.io.TextIO; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.cloud.dataflow.sdk.values.PCollectionTuple; 
import com.google.cloud.dataflow.sdk.values.TupleTag; 
import com.google.cloud.dataflow.sdk.values.TupleTagList; 
import com.moloco.dataflow.DataflowConstants; 

public class TestSideOutput { 
    private TestOptions options; 
    private static final String GCS_BUCKET = "gs://dataflow-experimental/"; // Change to your bucket name 

    public TestSideOutput(String[] args) { 
    options = PipelineOptionsFactory.fromArgs(args).as(TestOptions.class); 
    options.setProject(DataflowConstants.PROJCET_NAME); 
    options.setStagingLocation(DataflowConstants.STAGING_BUCKET); 
    options.setRunner(BlockingDataflowPipelineRunner.class); 
    options.setJobName(options.getJob() + "-test-sideoutput"); 
    } 

    public void execute() { 
    Pipeline pipeline = Pipeline.create(options); 
    // 1. Read sample data. 
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading") 
     .from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of())); 

    // 2. Create tags for outputs. 
    final TupleTag<String> mainTag = new TupleTag<String>() {}; 
    final TupleTag<String> sideTag = new TupleTag<String>() {}; 

    // 3. Apply ParDo with side output tags. 
    Filter filter = new Filter("DATAFLOW", sideTag); 
    PCollectionTuple results = 
     profiles.apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter)); 

    // 4. Retrieve outputs. 
    PCollection<String> mainOutput = results.get(mainTag); 
    PCollection<String> sideOutput = results.get(sideTag); 

    // 5. Write to GCS. 
    mainOutput.apply(
     TextIO.Write.named("writingMain").to(GCS_BUCKET + "example/main-output/main").withCoder(StringUtf8Coder.of())); 
    sideOutput.apply(
     TextIO.Write.named("writingSide").to(GCS_BUCKET + "example/side-output/side").withCoder(StringUtf8Coder.of())); 

    // 6. Run pipeline. 
    pipeline.run(); 
    } 

    static class Filter extends DoFn<String, String> { 
    private static final long serialVersionUID = 0; 
    final TupleTag<String> sideTag; 
    String keyword; 

    public Filter(String keyword, TupleTag<String> sideTag) { 
     this.sideTag = sideTag; 
     this.keyword = keyword; 
    } 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
     String profile = c.element(); 
     if (profile.contains(keyword)) { 
     c.output(profile); 
     } else { 
     c.sideOutput(sideTag, profile); 
     } 
    } 
    } 
} 

И это команда, которую я использовал, и ошибка/исключение, которое я получил (это как раз содержит несколько аргументов командной строки, которые мы используем для нашего потока данных пакета, ничего особенного здесь, но только, чтобы дать вам идею):

dataflow-20161003.R3$ ./bin/dataflow --job=test-experimental-sideoutput --numWorkers=1 --date=0001-01-01 
Oct 04, 2016 12:37:34 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions 
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 121 files. Enable logging at DEBUG level to see which files will be staged. 
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected] 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54) 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$BoundMulti.<init>(ParDo.java:959) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:912) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:908) 
     at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:41) 
     at com.moloco.dataflow.Main.main(Main.java:152) 
Caused by: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50) 
     ... 6 more 

Кроме того, я не думаю, что это уместно, но код для класса «» TestOptions:

package tmp.dataflow.experimental; 

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.Description; 
import com.google.cloud.dataflow.sdk.options.Validation; 

public interface TestOptions extends DataflowPipelineOptions { 
    @Description("Job") 
    @Validation.Required 
    String getJob(); 

    void setJob(String value); 

    @Description("Job suffix") 
    String getJobSuffix(); 

    void setJobSuffix(String value); 

    @Description("Date") 
    @Validation.Required 
    String getDate(); 

    void setDate(String value); 
} 

Наконец, если бы я удалил фигурные скобки «{}» при создании экземпляра TupleTags, вместо этого я получил бы следующее исключение (и я нашел предложения по Stackoverflow, которые я должен создать с помощью «{}», чтобы избежать такого рода выпуск):

Oct 04, 2016 12:43:56 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions 
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 122 files. Enable logging at DEBUG level to see which files will be staged. 
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for FilterByKeyword.out1 [PCollection]. Correct one of the following root causes: 
    No Coder has been manually specified; you may do so using .setCoder(). 
    Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. If this error occurs for a side output of the producing ParDo, verify that the TupleTag for this output is constructed with proper type information (see TupleTag Javadoc) or explicitly set the Coder to use if this is not possible. 
    Using the default output Coder from the producing PTransform failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195) 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48) 
     at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137) 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88) 
     at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331) 
     at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
     at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
     at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:50) 
     at com.moloco.dataflow.Main.main(Main.java:152) 

Edit: Смотрите ответ ниже для решения этого путем выполнения() 'статическими'.

Код ниже напоминает то, что я изначально опубликовал, с двумя изменениями: Если возможно, я указываю явно (и избыточно) 'coder' снова для каждого PCollection. Кроме того, при создании экземпляров TupleTags нет фигурных скобок.

Обратите внимание, что подход (статический и этот избыточный подход) более уместен.

public void execute() { 
    Pipeline pipeline = Pipeline.create(options); 
    // 1. Read sample data. 
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading") 
     .from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of())); 

    // 2. Create tags for outputs. 
    final TupleTag<String> mainTag = new TupleTag<String>(); 
    final TupleTag<String> sideTag = new TupleTag<String>(); 

    // 3. Apply ParDo with side output tags. 
    Filter filter = new Filter("DATAFLOW", sideTag); 
    PCollectionTuple results = profiles.setCoder(StringUtf8Coder.of()) 
     .apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter)); 

    // 4. Retrieve outputs. 
    PCollection<String> mainOutput = results.get(mainTag); 
    PCollection<String> sideOutput = results.get(sideTag); 

    // 5. Write to GCS. 
    mainOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingMain") 
     .to(GCS_BUCKET + "example/main-output-from-nonstatic/main").withCoder(StringUtf8Coder.of())); 
    sideOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingSide") 
     .to(GCS_BUCKET + "example/side-output-from-nonstatic/side").withCoder(StringUtf8Coder.of())); 

    // 6. Run pipeline. 
    pipeline.run(); 
    } 

ответ

2

ошибка вы получаете, потому что ваш Filter сноски ссылается на TupleTag, что, в свою очередь, (потому что это не статический анонимный класс экземпляра из не-статической функции execute()) ссылается вмещающие TestSideOutput.

Таким образом, конвейер пытается сериализовать объект TestSideOutput, и он не является сериализуемым - о чем свидетельствует сообщение: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput.

Основной причиной является то, что метод execute() не является статическим. Создание статики должно решить проблему.

+0

Действительно, что я предложил исправить проблему, с которой я столкнулся. Спасибо! С другой стороны, у нас есть другое задание, которое имеет нестатический метод execute(), из которого мы применяем ParDo с боковыми выходными тегами, и он не генерирует исключение (и отчасти поэтому я написал пример кода выше , как мне показалось странным). Я не могу опубликовать эту стоимость в это время, но мне интересно, есть ли другой способ исправить эту проблему, не делая метод execute() статическим? –

+0

Я как-то ответил на мой вопрос о последующих действиях (см. Добавленный фрагмент кода в конце моего отредактированного вопроса). Кажется возможным сохранить execute() как нестатический, путем явного и избыточного объявления кодеров, когда это возможно. –

Смежные вопросы