2016-06-14 3 views
2

Я обновился до нового потока данных Google версии 1.6, и когда я тестирую на локальном компьютере, я получаю java.lang.IllegalStateException в конце моего конвейера. У меня не было этой проблемы с версией 1.5.1.Почему я получаю java.lang.IllegalStateException в Google Dataflow?

Это не происходит в живой среде только в местном масштабе. Это ошибка новой версии? Нужно ли делать изменения в моем коде, чтобы избежать этих ошибок?

Я приложил часть своего конвейера, чтобы попытаться найти проблему.

private static void getTableRowAndWrite(final PCollection<KV<Integer, Iterable<byte[]>>> groupedTransactions, final String tableName) { 
    // Get the tableRow element from the PCollection 
    groupedTransactions 
      .apply(ParDo 
        .of(((tableName.equals("avail")) ? new GetTableRowAvail() : new GetTableRowReservation())) //Get a TableRow 
        .named("Get " + tableName + " TableRows")) 
      .apply(BigQueryIO 
        .Write 
        .named("Write to BigQuery " + tableName) //Write to BigQuery 
        .withSchema(createTableSchema()) 
        .to((SerializableFunction<BoundedWindow, String>) window -> { 
         String date = window.toString(); 
         String date2 = date.substring(1, 5) + date.substring(6, 8) + date.substring(9, 11); 
         return "travelinsights-1056:hotel." + tableName + "_full_" + (TEST ? "test_" : "") + date2; 
        }) 
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
      ); 
} 

Ошибка:

Exception in thread "main" java.lang.IllegalStateException: Cleanup time 294293-06-23T12:00:54.774Z is beyond end-of-time 
at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199) 
at com.google.cloud.dataflow.sdk.util.ReduceFnRunner.onTimer(ReduceFnRunner.java:642) 
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advance(BatchTimerInternals.java:134) 
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advanceInputWatermark(BatchTimerInternals.java:110) 
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:91) 
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) 
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457) 
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084) 
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96) 
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) 
+1

Я считаю, что проблема кроется в другом месте вашего трубопровода. Трассировка стека подразумевает, что у вас есть окно, так что конец окна плюс допустимая задержка превышают максимальную временную метку, допустимую в потоке данных. Готовы ли вы поделиться частью своего конвейера, который помещает метки времени на ваши элементы и помещает их в окна? –

ответ

3

Вы нашли ошибку!

Это было зарегистрировано как BEAM-341, и исправление рассматривается как #464, которое будет перенесено в SDK Dataflow Java сразу после проверки.

Не видя кода, который устанавливает окно, запускает и разрешает опоздание, я не могу быть уверенным, как это влияет на вас. Но есть простой способ обхода, который будет работать, если у вас есть неглобальное окно и очень большая допустимая задержка, так что окно не истекает до «конца времени». В этом случае вы можете обновить свою работу с разрешенной задержкой, которая просто очень велика (например, сотни лет), а не эффективно бесконечна.

Смежные вопросы