2015-04-08 3 views
0

Так что я только начал работать со штормом и попытался его понять. Я пытаюсь подключиться к теме kafka, читать данные и записывать их на болт HDFS. Сначала я создал его без shuffleGrouping («stormspout»), и мой пользовательский интерфейс Storm показывал, что носик потреблял данные из этой темы, но ничего не записывалось на болт (за исключением пустых файлов, которые он создавал на HDFS). Затем я добавил shuffleGrouping («stormspout»); и теперь болт, кажется, дает ошибку. Если кто-то может помочь в этом, я буду очень благодарен.Storm HDFS Болт не работает

Спасибо, Колман

Ошибка

2015-04-13 00:02:58 skPartitionManager [INFO] Читайте информацию раздела из:/шторма/partition_0 -> нулевого 2015-04-13 00:02:58 skPartitionManager [INFO] Информация о разделе не найдена, используя конфигурацию для определения смещения 2015-04-13 00:02:58 skPartitionManager [INFO] Последнее снятие фиксации от zookeeper: 0 2015 -04-13 00:02:58 skPartitionManager [INFO] Com mit offset 0 больше, чем 9223372036854775807 сзади, сбрасывается на startOffsetTime = -2 2015-04-13 00:02:58 skPartitionManager [INFO] Запуск Kafka 192.168.134.137:0 из офтальмования 0 2015-04-13 00:02 : 58 skZkCoordinator [INFO] Задача [1/1] Готовое обновление 2015-04-13 00:02:58 bsdtask [INFO] Испушение: stormspout default [colmanblah] 2015-04-13 00:02:58 bsd Исполнитель [INFO] ТРАНСФЕРНЫЙ ТИП ЗАПУСКА: 2 TUPLE: источник: stormspout: 3, stream: default, id: {462820364856350458 = 5573117062061876630}, [colmanblah] 2015-04-13 00:02:58 bsdtask [INFO] Испускание: stormspout __ack_init [462820364856350458 5] 2015-04-13 00:02:58 bsdexecutor [INFO] ПЕРЕХОДНЫЙ кортеж ЗАДАЧА: 1 TUPLE: источник: stormspout: 3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] 2015-04-13 00:02:58 bsdexecutor [INFO] Обработка полученного сообщения FOR 1 TUPLE: source: stormspout: 3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] 2015-04-13 00 : 02: 58 bsdexecutor [INFO] BOLT ack TASK: 1 TIME: TUPLE: источник: stormspout: 3, stream: __ack_init, id: {}, [462820364856350458 55] [0] 2015-04-13 00:02:58 bsd исполнитель [INFO] Выполнение выполнено TUPLE source: stormspout: 3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] ЗАДАЧА: 1 DELTA: 2015-04-13 00:02:59 bsdexecutor [INFO] Готовый болт stormbolt: (2) 2015-04-13 00:02:59 bsdexecutor [INFO] Обработка полученного сообщения FOR 2 TUPLE: source: stormspout: 3, stream: default, id: {462820364856350458 = 5573117062061876630}, [colmanblah]

2015-04-13 00:02:59 b.s.util [ERROR] Асинхронная петля умерла!

  java.lang.RuntimeException: java.lang.NullPointerException 
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71] 
      Caused by: java.lang.NullPointerException: null 
        at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        ... 6 common frames omitted 
      2015-04-08 04:26:39 b.s.d.executor [ERROR] 
      java.lang.RuntimeException: java.lang.NullPointerException 
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71] 
      Caused by: java.lang.NullPointerException: null 
        at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041] 

Код:

TopologyBuilder builder = new TopologyBuilder();  
    Config config = new Config(); 
    //config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 7000); 
    config.setNumWorkers(1); 
    config.setDebug(true); 
    //LocalCluster cluster = new LocalCluster(); 


    //zookeeper 
    BrokerHosts brokerHosts = new ZkHosts("192.168.134.137:2181", "/brokers");  

    //spout 
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "myTopic", "/kafkastorm", "KafkaSpout"); 
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
    spoutConfig.forceFromStart = true; 
    builder.setSpout("stormspout", new KafkaSpout(spoutConfig),4); 

    //bolt 
    SyncPolicy syncPolicy = new CountSyncPolicy(10); //Synchronize data buffer with the filesystem every 10 tuples 
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); // Rotate data files when they reach five MB 
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/stormstuff"); // Use default, Storm-generated file names 
    builder.setBolt("stormbolt", new HdfsBolt() 
           .withFsUrl("hdfs://192.168.134.137:8020")//54310 
           .withSyncPolicy(syncPolicy) 
           .withRotationPolicy(rotationPolicy) 
           .withFileNameFormat(fileNameFormat),2 
        ).shuffleGrouping("stormspout");   


    //cluster.submitTopology("ColmansStormTopology", config, builder.createTopology());  

    try { 
     StormSubmitter.submitTopologyWithProgressBar("ColmansStormTopology", config, builder.createTopology()); 

    } catch (AlreadyAliveException e) { 
     e.printStackTrace(); 
    } catch (InvalidTopologyException e) { 
     e.printStackTrace(); 
    } 

зависимости pom.xml

   <dependencies> 
      <dependency> 
       <groupId>org.apache.storm</groupId> 
       <artifactId>storm-core</artifactId> 
       <version>0.9.3</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.storm</groupId> 
       <artifactId>storm-kafka</artifactId> 
       <version>0.9.3</version> 
      </dependency> 
      <dependency> 
       <groupId>log4j</groupId> 
       <artifactId>log4j</artifactId> 
       <version>1.2.17</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.storm</groupId> 
       <artifactId>storm-hdfs</artifactId> 
       <version>0.9.3</version> 
      </dependency> 
        <dependency> 
         <groupId>org.apache.kafka</groupId> 
         <artifactId>kafka_2.10</artifactId> 
         <version>0.8.1.1</version> 
         <exclusions> 
          <exclusion> 
           <groupId>log4j</groupId> 
           <artifactId>log4j</artifactId> 
          </exclusion> 
          <exclusion> 
           <groupId>org.slf4j</groupId> 
           <artifactId>slf4j-simple</artifactId> 
          </exclusion> 
         </exclusions> 
        </dependency> 
       </dependencies> 
+0

Перед тем, как добавить 'shuffleGrouping' в spout & bolt, не было подключено. Как только вы подключите болт, фактически начните обработку данных. вам нужно будет поделиться своим кодом на болт и носик и добавить некоторые следы в болт – Mzf

+0

Да, я думал, что это было то, что происходило, это все мой код? Должен ли я иметь что-то еще? Опять я только начал смотреть на шторм, я до сих пор использую дымоход. Cheers, Colman – Colman

+0

Вы можете запустить штурм в локальном режиме, чтобы вы могли его отлаживать с помощью вашей IDE – Mzf

ответ

0

Прежде всего попытаться испускают значения из метода выполнения, если вы испуская из разных рабочий поток, затем пусть все рабочие потоки будут подавать данные в LinkedBlockingQueue и только один рабочий thr ead позволит испускать значения из LinkedBlockingQueue.

Во-вторых, попробуйте установить Config.setMaxSpoutPending на некоторое значение и снова попробуйте запустить код и проверьте, не перестает ли сценарий уменьшать это значение.

Ссылка - Config.TOPOLOGY_MAX_SPOUT_PENDING: задает максимальное количество кортежей-носиков, которые могут быть отложены одновременно на одной задаче на носик (в ожидании означает, что кортеж еще не был отменен или не был завершен). Настоятельно рекомендуется установить эту конфигурацию для предотвращения взрыва очереди.

+0

Я установил max_spout_pending, и это не повлияло, я ничего не испускаю, поскольку все это делается в рамках. Вы говорите о том, чтобы преодолеть метод исполнения KafkaSpout? Я действительно чувствую, что это несоответствие версии каким-то образом. Может быть, какой-то файл jar тянет неправильную зависимость или, может быть, мой pom, xml ошибочен? Я добавил в зависимости от pom-файлов выше, если бы вы могли взглянуть? – Colman

+0

Я включил блок ошибок перед ошибкой, может ли кто-нибудь помочь с этим ?! – Colman

0

В итоге я понял это, пройдя исходный код шторма.

Я не заходило

RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); 

и включая его, как

builder.setBolt("stormbolt", new HdfsBolt() 
             .withFsUrl("hdfs://192.168.134.137:8020")//54310 
           .withSyncPolicy(syncPolicy) 
           .withRecordFormat(format) 
           .withRotationPolicy(rotationPolicy) 
           .withFileNameFormat(fileNameFormat),1 
        ).shuffleGrouping("stormspout");  

В классе HDFSBolt.Java, он пытается использовать это и в основном падает, если его не установлена. Именно здесь и появился NPE.

Надеюсь, это поможет кому-то еще, убедитесь, что вы установили все биты, которые требуются в этом классе. Более полезное сообщение об ошибке, такое как «RecordFormat not set», было бы неплохо.

Смежные вопросы