Так что я только начал работать со штормом и попытался его понять. Я пытаюсь подключиться к теме kafka, читать данные и записывать их на болт HDFS. Сначала я создал его без shuffleGrouping («stormspout»), и мой пользовательский интерфейс Storm показывал, что носик потреблял данные из этой темы, но ничего не записывалось на болт (за исключением пустых файлов, которые он создавал на HDFS). Затем я добавил shuffleGrouping («stormspout»); и теперь болт, кажется, дает ошибку. Если кто-то может помочь в этом, я буду очень благодарен.Storm HDFS Болт не работает
Спасибо, Колман
Ошибка
2015-04-13 00:02:58 skPartitionManager [INFO] Читайте информацию раздела из:/шторма/partition_0 -> нулевого 2015-04-13 00:02:58 skPartitionManager [INFO] Информация о разделе не найдена, используя конфигурацию для определения смещения 2015-04-13 00:02:58 skPartitionManager [INFO] Последнее снятие фиксации от zookeeper: 0 2015 -04-13 00:02:58 skPartitionManager [INFO] Com mit offset 0 больше, чем 9223372036854775807 сзади, сбрасывается на startOffsetTime = -2 2015-04-13 00:02:58 skPartitionManager [INFO] Запуск Kafka 192.168.134.137:0 из офтальмования 0 2015-04-13 00:02 : 58 skZkCoordinator [INFO] Задача [1/1] Готовое обновление 2015-04-13 00:02:58 bsdtask [INFO] Испушение: stormspout default [colmanblah] 2015-04-13 00:02:58 bsd Исполнитель [INFO] ТРАНСФЕРНЫЙ ТИП ЗАПУСКА: 2 TUPLE: источник: stormspout: 3, stream: default, id: {462820364856350458 = 5573117062061876630}, [colmanblah] 2015-04-13 00:02:58 bsdtask [INFO] Испускание: stormspout __ack_init [462820364856350458 5] 2015-04-13 00:02:58 bsdexecutor [INFO] ПЕРЕХОДНЫЙ кортеж ЗАДАЧА: 1 TUPLE: источник: stormspout: 3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] 2015-04-13 00:02:58 bsdexecutor [INFO] Обработка полученного сообщения FOR 1 TUPLE: source: stormspout: 3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] 2015-04-13 00 : 02: 58 bsdexecutor [INFO] BOLT ack TASK: 1 TIME: TUPLE: источник: stormspout: 3, stream: __ack_init, id: {}, [462820364856350458 55] [0] 2015-04-13 00:02:58 bsd исполнитель [INFO] Выполнение выполнено TUPLE source: stormspout: 3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] ЗАДАЧА: 1 DELTA: 2015-04-13 00:02:59 bsdexecutor [INFO] Готовый болт stormbolt: (2) 2015-04-13 00:02:59 bsdexecutor [INFO] Обработка полученного сообщения FOR 2 TUPLE: source: stormspout: 3, stream: default, id: {462820364856350458 = 5573117062061876630}, [colmanblah]
2015-04-13 00:02:59 b.s.util [ERROR] Асинхронная петля умерла!
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.NullPointerException: null
at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
... 6 common frames omitted
2015-04-08 04:26:39 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.NullPointerException: null
at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
Код:
TopologyBuilder builder = new TopologyBuilder();
Config config = new Config();
//config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 7000);
config.setNumWorkers(1);
config.setDebug(true);
//LocalCluster cluster = new LocalCluster();
//zookeeper
BrokerHosts brokerHosts = new ZkHosts("192.168.134.137:2181", "/brokers");
//spout
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "myTopic", "/kafkastorm", "KafkaSpout");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceFromStart = true;
builder.setSpout("stormspout", new KafkaSpout(spoutConfig),4);
//bolt
SyncPolicy syncPolicy = new CountSyncPolicy(10); //Synchronize data buffer with the filesystem every 10 tuples
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); // Rotate data files when they reach five MB
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/stormstuff"); // Use default, Storm-generated file names
builder.setBolt("stormbolt", new HdfsBolt()
.withFsUrl("hdfs://192.168.134.137:8020")//54310
.withSyncPolicy(syncPolicy)
.withRotationPolicy(rotationPolicy)
.withFileNameFormat(fileNameFormat),2
).shuffleGrouping("stormspout");
//cluster.submitTopology("ColmansStormTopology", config, builder.createTopology());
try {
StormSubmitter.submitTopologyWithProgressBar("ColmansStormTopology", config, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
зависимости pom.xml
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
Перед тем, как добавить 'shuffleGrouping' в spout & bolt, не было подключено. Как только вы подключите болт, фактически начните обработку данных. вам нужно будет поделиться своим кодом на болт и носик и добавить некоторые следы в болт – Mzf
Да, я думал, что это было то, что происходило, это все мой код? Должен ли я иметь что-то еще? Опять я только начал смотреть на шторм, я до сих пор использую дымоход. Cheers, Colman – Colman
Вы можете запустить штурм в локальном режиме, чтобы вы могли его отлаживать с помощью вашей IDE – Mzf