1

Выполнение конвейеров DataFlow, время от времени мы видим эти Исключения. Мы можем что-то сделать с ними? У нас довольно простой поток, который считывает данные из запроса BigQuery и заполняет данные в BigTable.Исключения из конвейеров Google Cloud Dataflow от BigQuery до Cloud Bigtable

Также что происходит с данными внутри трубопровода? Перерабатывается ли она? Или он потерян в пути к BigTable?

CloudBigtableIO.initializeForWrite(p); 
    p.apply(BigQueryIO.Read.fromQuery(getQuery())) 
    .apply(ParDo.of(new DoFn<TableRow, Mutation>() { 
      public void processElement(ProcessContext c) { 
      Mutation output = convertDataToRow(c.element()); 
      c.output(output); 
      } 

      })) 
     .apply(CloudBigtableIO.writeToTable(config)); 


private static Mutation convertDataToRow(TableRow element) { 
    LOG.info("element: "+ element); 
    LOG.info("BASM_segment_id: "+ element.get("BASM_segment_id")); 
    if(element.get("BASM_AID") != null){ 
     Put obj = new Put(getRowKey(element).getBytes()).addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME, ((String)element.get("BAS_category")).getBytes()); 
       obj.addColumn(USER_FAMILY, "AID".getBytes(), ((String)element.get("BASM_AID")).getBytes()); 
     if(element.get("BASM_segment_id") != null){ 
       obj.addColumn(SEGMENT_FAMILY, "segment_id".getBytes(), ((String)element.get("BASM_segment_id")).getBytes()); 
     } 
     if(element.get("BAS_sub_category") != null){ 
       obj.addColumn(SEGMENT_FAMILY, "sub_category".getBytes(), ((String)element.get("BAS_sub_category")).getBytes()); 
     } 
     if(element.get("BAS_name") != null){ 
       obj.addColumn(SEGMENT_FAMILY, "name".getBytes(), ((String)element.get("BAS_name")).getBytes()); 
     } 
     if(element.get("BAS_description") != null){ 
       obj.addColumn(SEGMENT_FAMILY, "description".getBytes(), ((String)element.get("BAS_description")).getBytes()); 
     } 
     if(element.get("BAS_last_compute_day") != null){obj.addColumn(USER_FAMILY, "Krux_User_id".getBytes(), ((String)element.get("BASM_krux_user_id")).getBytes()); 
       obj.addColumn(SEGMENT_FAMILY, "last_compute_day".getBytes(), ((String)element.get("BAS_last_compute_day")).getBytes()); 
     } 
     if(element.get("BAS_type") != null){ 
       obj.addColumn(SEGMENT_FAMILY, "type".getBytes(), ((String)element.get("BAS_type")).getBytes()); 
     }  
     if(element.get("BASM_REGID") != null){ 
       obj.addColumn(USER_FAMILY, "REGID".getBytes(), ((String)element.get("BASM_REGID")).getBytes()); 
     } 
     return obj; 
    }else{ 
     return null; 
    } 
    } 

Ниже приводится исключение, которое мы получаем:

2016-08-22T21:47:33.469Z: Error: (84707221e08b977b): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeExc ption: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData$2.processElement(BigTableSegmentData.java:70) Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsExcept on: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35) at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:368) at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn. ava:47) at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData$2.processElement(BigTableSegmentData.java:70) at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn. ava:47) at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226) at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:167) at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:71) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

Вызванное:

org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:389) at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:274) at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableBufferedWriteFn.processElement(CloudBigtabl IO.java:966)

Исключения скопированного из Dataflow консоли

(7e75740160102c05): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData$2.processElement(BigTableSegmentData.java:70) Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35) at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:368) at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData$2.processElement(BigTableSegmentData.java:70) at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226) at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:167) at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:71) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:389) at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:274) at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableBufferedWriteFn.processElement(CloudBigtableIO.java:966) 

2016-08-23 (13:17:54) java.lang.Runti meException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud. dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop ... . 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java. lang.RuntimeException: com.google.cloud.dataflow.sdk.util.User CodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop. ... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08- 23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com. google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk. util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13 : 17: 54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang .RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud .dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException : org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .. .. 2016-08-23 (13:17:54) java.lang.Run timeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud. dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016- 08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17: 54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop .... 2016-08-23 (13:17:54) java.lang.RuntimeException:

Спасибо заранее, что

+0

Какую версию клиента вы используете? 0.9.1? –

+0

@ LesVogel-GoogleDevRel да, мы используем версию 0.9.1 для bigtable-hbase-dataflow – Amandeep

+0

Я просил кого-то в технике комментировать - это должно быть позже сегодня. –

ответ

1

Мы разговаривали в автономном режиме. Проблема здесь в том, что у вас слишком много сотрудников Dataflow по сравнению с количеством узлов Cloud Bigtable в вашем кластере. Вы должны изменить это соотношение либо reducing Dataflow workers, либо связаться с нашей командой, чтобы увеличить ресурсы Cloud Bigtable.

Bigtable превосходно работал с количеством облачных Bigtable Nodes, которые у вас были, но загрузка из Dataflow была слишком высокой для надежной обработки.

Вы можете просмотреть свое использование на графике "CPU Usage" в консоли Google Cloud. Все, что может превысить 80% ваших возможностей, может вызвать проблемы. Если вы получите больше Bigtable Quota, вы можете увеличить количество узлов, которые у вас есть до запуска задания Dataflow, и уменьшить его после выполнения задания. Например, Scio does that.

==

Что касается «? Кроме того, что происходит с данными внутри трубопровода ли переработаны Или он потерял в пути к BigTable?»:

Dataflow пытается отправить данные Bigtable снова. В таких случаях механизм повторных запросов Dataflow будет исправлять временные проблемы.

К сожалению, когда проблема оказывается перегружаемой облачной большой таблицей, повторные попытки осложняют проблему, отправляя больше трафика на Bigtable, тем самым усугубляя проблему.

+0

Спасибо @Solomon Duskis, я сократил количество работников, которых я не вижу исключение, которое происходит раньше, но, очевидно, моя работа теперь занимает больше времени, чтобы закончить. – Amandeep

+0

Yup. Сокращение рабочих сделает это. Я быстро просмотрел ваши диаграммы, и похоже, вы можете добавить еще пару человек. В качестве альтернативы вы можете попросить нас увеличить свою квоту и добавить больше узлов Bigtable перед началом потока данных и уменьшить число после завершения. Чем больше Bigtable узлов, тем больше пропускной способности вы можете сделать, и тем быстрее ваша работа будет завершена. –

Смежные вопросы