2016-01-21 7 views
1

Я получаю NullPointerException с помощью метода backtype.storm.utils.DisruptorQueue.consumeBatchToCursor при запуске моей топологии, в частности в болте. Резервуары выполнены надлежащим образом.NullPointerException in Storm при запуске топологии

Устранение неисправностей Storm page говорит, что это может быть связано с несколькими методами выдачи потоков на OutputCollector. Тем не менее, я не вижу, что это касается моего дела.

Вот код для носика:

(defspout stub-spout ["stub-spout"] 
    [conf context collector] 
    (spout 
    (nextTuple [] 
     (let [channel-value (<!! storm-async-channel)] 
     (emit-spout! collector [channel-value]))) 
    (ack [id] 
    )))) 

и для болта:

(defbolt stub-bolt ["stub-bolt"] [tuple collector] 
    (println "Invocation!") 
    (let [obj (get tuple "object") 
     do-some-calculations (resolve 'calclib/do-some-calculations) 
     new-obj (do-some-calculations obj)] 
    (emit-bolt! collector new-obj))) 

После некоторого исследования выяснилось, что вызов resolve возвращает нулевое значение (я должен resolve во время выполнения поскольку некоторые вычисления происходят в макросе, расположенном в calclib).

Код работает правильно в локальном кластере. Почему это происходит?

Будем благодарны за любые предложения. Спасибо!

ответ

1

Я думаю, что нашел решение. Определение болта изменяется на подготовленный болт:

(defbolt stub-bolt ["stub-bolt"] 
    {:prepare true} 
    [conf context collector] 
    (let [f (load "/calclib/core") 
     do-some-calculations (resolve 'calclib/do-some-calculations)] 
    (bolt 
     (execute [tuple] 
     (let [obj (get tuple "object") 
       new-obj (do-some-calculations obj)] 
      (emit-bolt! collector new-obj)))))) 

Ключа вызов load. Интересно, есть ли более элегантный подход.

Смежные вопросы