2013-02-16 3 views
0

Задача переполнения стека здесь ... попытается включить как можно больше деталей.Возможно, Flume Avro подключен к серверу Node.js?

Я пытаюсь получить данные журнала ловушек Apache, переданные через приемник Avro на сервер Node.js, прислушиваясь к определенному порту. Я намерен использовать Collective Media's node-avro library, чтобы помочь с сериализацией между двоичным форматом Avro и JSON, поэтому я могу работать с данными в Node.js (я передаю его клиентам через socket.io pub/sub).

Я уверен, что у меня есть Flume, правильно настроенный, так как я вижу данные, проходящие через канал и выводящиеся на консоль (только для отладки, у меня также есть данные для консоли). Когда я включаю Avro мойку и воспитывать сервер Node.js, который прослушивает тот же самый порт, хотя, Flume бросает исключение, когда он пытается сделать Avro перевод:

2013-02-15 22:06:09,858 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: Failed to send events 
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:325) 
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) 
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) 
    at java.lang.Thread.run(Thread.java:722) 
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Failed to send batch 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236) 
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:309) 
    ... 3 more 
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Exception thrown from remote handler 
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:318) 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295) 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224) 
    ... 4 more 
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: NettyTransceiver closed 
    at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128) 
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310) 
    ... 6 more 
Caused by: java.io.IOException: NettyTransceiver closed 
    at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:338) 
    at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:59) 
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:496) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:236) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:93) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) 
    at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:476) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:623) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:101) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38) 
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    ... 1 more 
2013-02-15 22:06:14,895 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink k1: Building RpcClient with hostname: 127.0.0.1, port: 4242 

То, что я не уверен, , как выяснить, если моя служба Node.js по крайней мере получает сообщение. Я довольно новыми для Node.js, так что не поможет, но вот фрагмент кода, который настраивает слушателя:

var flumeSink = require('http').createServer(flumeHandler); 
flumeSink.listen(8000); 
function flumeHandler (req, res) { 
    console.log("Got it!"); 
    //var schema = avro.prepareSchema("string"); 
    //var buffer = schema.encode("foo"); 
    //var value = schema.decode(buffer); 
} 

Я думаю, я создал сторону Node.js неправильно. Я использую HTTP-модуль, который, возможно, не является правильным модулем. Возможно, мне нужно рассмотреть возможность создания пользовательской раковины в Node.js? Указатели/помощь оценены!

+0

Читайте больше об этом, я вижу, что Avro работает через RPC, поэтому я думаю, что HTTP-вещь действительно ошибочна. – ErikB

+0

Попробовал заменить http-прослушиватель на dnode, но получил ту же ошибку. – ErikB

ответ

0

Возможно, avro-раковина - это не то, что вам нужно в этом случае, поскольку оно предназначено для передачи Flume в Flume (именно так вы строите топологию, связанную с потоком).

Если вы хотите создать раковину, которая не входит в стандартный список, вам необходимо создать пользовательскую раковину и использовать настраиваемую конфигурацию, как определено в https://flume.apache.org/FlumeUserGuide.html#custom-sink , что я пробовал, и он отлично работает.

или использовать что-то, что существует:

https://github.com/josealvarezmuguerza/flume-http-sink

я никогда не использовал этот модуль. просто пришел с поиском google.

для части Avro, просто используйте морфины для преобразования источника в avro, а затем отправляйте каждое событие на ваш сервер node.js.

надеюсь, что это даст немного света.

go код!