2015-09-09 2 views
1

Выполнение конвейеров DataFlow, время от времени мы видим эти Исключения. Есть ли что-нибудь, что мы можем с ними сделать? У нас довольно простой поток, который читается из файла в GCS и создает запись на строку во входном файле - примерно 1 миллион строк во входном файле.Исключения из конвейеров Google Cloud Dataflow в Cloud Bigtable

Также что происходит с данными внутри трубопровода? Перерабатывается ли она? Или он потерян в пути к BigTable?

(609803d25ddab111): io.grpc.StatusRuntimeException: UNKNOWN 
at io.grpc.Status.asRuntimeException(Status.java:428) 
at io.grpc.stub.Calls$StreamObserverToCallListenerAdapter.onClose(Calls.java:284) 
at io.grpc.ClientInterceptors$CheckedForwardingCall.start(ClientInterceptors.java:202) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.retryCall(RetryingCall.java:123) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.runCall(RetryingCall.java:110) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.halfClose(RetryingCall.java:100) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:178) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:166) 
at io.grpc.stub.Calls.asyncUnaryCall(Calls.java:143) 
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.listenableAsyncCall(BigtableDataGrpcClient.java:244) 
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.mutateRowAsync(BigtableDataGrpcClient.java:256) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issuePutRequest(BatchExecutor.java:262) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issueRequest(BatchExecutor.java:300) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.issueRequest(BigtableBufferedMutator.java:365) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doMutation(BigtableBufferedMutator.java:360) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:335) 
at com.company.HBaseBigtableWriter.processElement(HBaseBigtableWriter.java:70) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193) 
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117) 
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketTimeoutException: connect timed out at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
at sun.net.www.http.HttpClient.New(HttpClient.java:308) 
at sun.net.www.http.HttpClient.New(HttpClient.java:326) 
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998) 
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932) 
at com.google.bigtable.repackaged.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93) 
at com.google.bigtable.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965) 
at com.google.auth.oauth2.ComputeEngineCredentials.refreshAccessToken(ComputeEngineCredentials.java:61) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.doRefresh(RefreshingOAuth2CredentialsInterceptor.java:232) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.syncRefresh(RefreshingOAuth2CredentialsInterceptor.java:166) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:302) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:299) ... 4 more 

Есть ли что-нибудь, что мы можем сделать, чтобы упростить наш код?

И сам поток данных довольно просто

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 
options.setMaxNumWorkers(20); 

Pipeline p = Pipeline.create(options); 

CloudBigtableIO.initializeForWrite(p) 
      .apply(TextIO.Read.from(options.getInputFile())) 
      .apply(ParDo.of(new HBaseBigtableWriter(options))); 
p.run(); 

ParDo выглядит следующим образом:

public class HBaseBigtableWriter extends DoFn<String, Void> { 
private Connection conn; 
private BufferedMutator mutator; 
private final CloudBigtableTableConfiguration btConfig; 

public HBaseBigtableWriter(CloudBigtableOptions options) { 
    this.btConfig = CloudBigtableTableConfiguration.fromCBTOptions(options); 

@Override 
public void startBundle(DoFn<String, Void>.Context c) throws Exception { 
    super.startBundle(c); 
    conn = new BigtableConnection(btConfig.toHBaseConfig()); 
    mutator = conn.getBufferedMutator(TableName.valueOf(btConfig.getTableId())); 
} 

@Override 
public void processElement(DoFn<String, Void>.ProcessContext c) { 
    Put put = Put(....); 
    //some of based on the input line.. no sideInputs or anything 
    p.addImmutable(...) 
    mutator.mutate(put); //mentioned line in stacktrace 
} 

@Override 
public void finishBundle(DoFn<String, Void>.Context c) throws Exception { 
    try { 
     mutator.close(); 
    } catch (RetriesExhaustedWithDetailsException e) { 
     retriesExceptionAggregator.addValue(1); 
     List<Throwable> causes = e.getCauses(); 
     if (causes.size() == 1) { 
      throw (Exception) causes.get(0); 
     } else { 
      throw e; 

     } 
    } 
    finally { 
     conn.close(); 
     super.finishBundle(c); 
    } 
} 
} 

Кроме того, это один выскакивает каждый сейчас и потом.

java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 155291] 
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 
at io.grpc.SerializingExecutor.execute(SerializingExecutor.java:112) 
at io.grpc.ChannelImpl$CallImpl$ClientStreamListenerImpl.closed(ChannelImpl.java:398) 
at io.grpc.transport.AbstractClientStream.closeListener(AbstractClientStream.java:256) 
at io.grpc.transport.AbstractClientStream.transportReportStatus(AbstractClientStream.java:230) 
at io.grpc.transport.AbstractClientStream.remoteEndClosed(AbstractClientStream.java:180) 
at io.grpc.transport.AbstractStream$1.endOfStream(AbstractStream.java:121) 
at io.grpc.transport.MessageDeframer.deliver(MessageDeframer.java:253) 
at io.grpc.transport.MessageDeframer.deframe(MessageDeframer.java:168) 
at io.grpc.transport.AbstractStream.deframe(AbstractStream.java:285) 
at io.grpc.transport.AbstractClientStream.inboundTrailersReceived(AbstractClientStream.java:175) 
at io.grpc.transport.Http2ClientStream.transportTrailersReceived(Http2ClientStream.java:162) 
at io.grpc.transport.netty.NettyClientStream.transportHeadersReceived(NettyClientStream.java:110) 
at io.grpc.transport.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:179) 
at io.grpc.transport.netty.NettyClientHandler.access$800(NettyClientHandler.java:69) 
at io.grpc.transport.netty.NettyClientHandler$LazyFrameListener.onHeadersRead(NettyClientHandler.java:424) 
at com.google.bigtable.repackaged.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:316) 

Также с классами SDK Google это выглядит как то же самое происходит - особенно под нагрузкой - т.е. Dataflow работа 2015-09-10_10_26_26-7782438171725519247

(dedc6cc776609500): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2 actions: StatusRuntimeException: 2 times, 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:408) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doFlush(BigtableBufferedMutator.java:285) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.close(BigtableBufferedMutator.java:258) 
at org.apache.hadoop.hbase.client.AbstractBigtableConnection$2.close(AbstractBigtableConnection.java:181) 
at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableWriteFn.finishBundle(CloudBigtableIO.java:613) 

Любые консультации по этим исключениям? Спасибо!

+0

Закончен с использованием 'CloudBigtableIO.writeToTable()' - и до сих пор не видел эту проблему. –

ответ

1

Закрытие соединения, а затем выполнение мутации может привести к следам стека, которые вы видите (что я угадываю, происходит, когда вы останавливаете работника, в то время как буферизованные мутации все еще ведутся).

Можете ли вы, пожалуйста, открыть ошибку на нашем трекере github? Я думаю, что это самый эффективный способ диагностики этой проблемы. https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues

Если я правильно прочитал трассировку стека, похоже, что вы не используете метод CloudBigtableIO.writeToTable() и используете пользовательский ParDo для записи своих данных. Если это так, ответы на ваши вопросы действительно зависят от того, что вы делаете в своем настраиваемом ParDo, а также о динамике «остановки рабочего».

+0

обновил вопрос. Что-нибудь, что мы делаем концептуально неправильно? С другой стороны, реализован конвейер с 'CloudBigtableIO.writeToTable()' и запустить несколько тестов, чтобы проверить, не исчезло ли исключение. –

+0

Если бы удалить тег ответа, как и в SDK Google, мы снова увидели ошибку. См. Обновленный вопрос –

+0

https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/478 как последующий –

Смежные вопросы