2015-05-15 3 views
9

Недавно я познакомился с Apache Kafka и получил рабочий пример производителя-потребителя.Рабочий пример KafkaSpout

Следующий шаг - интегрировать Kafka с Spout и Bolt, и мне трудно найти доступные примеры (они в основном старые), работающие локально.

Я получил следующий пример рабочей штормовой книги/examples-ch02-get_started, которая считывает данные из локального текстового файла.

В том же репо есть пример для штормовой книги/примеров-ch04-spouts kafka-spout, но я не могу заставить его работать.

Я попробовал следующий пример, а cep.kafka, но получил следующее error-

5034 [Thread-11-words] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting 
5047 [Thread-11-words] ERROR backtype.storm.util - Async loop died! 
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V 
     at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169) ~[curator-framework-2.4.0.jar:na] 
     at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.ConnectionState.start(ConnectionState.java:103) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234) ~[curator-framework-2.4.0.jar:na] 
     at storm.kafka.ZkState.<init>(ZkState.java:62) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] 
     at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] 
     at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4] 
     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4] 
     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05] 
5049 [Thread-11-words] ERROR backtype.storm.daemon.executor - 
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V 
     at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169) ~[curator-framework-2.4.0.jar:na] 
     at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.ConnectionState.start(ConnectionState.java:103) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188) ~[curator-client-2.4.0.jar:na] 
     at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234) ~[curator-framework-2.4.0.jar:na] 
     at storm.kafka.ZkState.<init>(ZkState.java:62) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] 
     at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating] 
     at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4] 
     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4] 
     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05] 
5088 [Thread-11-words] ERROR backtype.storm.util - Halting process: ("Worker died") 
java.lang.RuntimeException: ("Worker died") 
     at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4] 
     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] 
     at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4] 
     at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4] 
     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4] 
     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05] 

ответ

10

Когда у меня была такая же проблема обучения, как запустить создать и запустить Кафка носиком, я нашел это Github repo очень полезно, и я смог забрать свои кепки KafkaSpout на остальные болты.

Это образец высокого уровня по созданию моей топологии для этого.

public class TestTopology { 

    public static void main(String[] args) { 

     String zkIp = "192.168.59.103"; 

     String nimbusHost = "192.168.59.103"; 

     String zookeeperHost = zkIp +":2181"; 

     ZkHosts zkHosts = new ZkHosts(zookeeperHost); 

     SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "myKafkaTopic", "", "storm"); 

     kafkaConfig.scheme = new SchemeAsMultiScheme(new JsonScheme() { 
      @Override 
      public Fields getOutputFields() { 
       return new Fields("events"); 
      } 
     }); 

     KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); 

     TopologyBuilder builder = new TopologyBuilder(); 

     builder.setSpout("eventsEmitter", kafkaSpout, 8); 

     builder.setBolt("eventsProcessor", new RollingCountBolt(2, 1), 8) 
       .fieldsGrouping("requestsEmitter", new Fields("request")); 

     //More bolts stuffzz 

     Config config = new Config(); 

     config.setMaxTaskParallelism(5); 
     config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2); 
     config.put(Config.NIMBUS_HOST, nimbusHost); 
     config.put(Config.NIMBUS_THRIFT_PORT, 6627); 
     config.put(Config.STORM_ZOOKEEPER_PORT, 2181); 
     config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp)); 

     try { 
      StormSubmitter.submitTopology("my-topology", config, builder.createTopology()); 
     } catch (Exception e) { 
      throw new IllegalStateException("Couldn't initialize the topology", e); 
     } 
    } 

} 

Надеется, что это помогает,

Хосе Луис

+0

Привет Jose, Большой вам спасибо за ссылку на repo..i может запустить примеры формирования там успешно. – user3798920

+0

Рад, что это помогло! – jbarrueta

+0

Когда я использую LocalCluster, данные потребляются блотами и кафкой. но когда я использую данные StormSubmitter, Kafka не потребляется. Я нажимаю данные в kafka с помощью консольного производителя Kafka и использую Storm vs 1.0.3 –

Смежные вопросы