1

Я встречаюсь с этой ошибкой при работе штормовой топологии в локальном режиме. У меня есть простая программа, которая проверяет, является ли число простым или нет. Я использую KafkaSpout как источник и шторм для его обработки. Кафка версия 2.10-0.8.2.1 шторм версия 0.9.4 Zookeeper 3.4.6java.lang.RuntimeException: java.lang.ClassCastException: [B не может быть добавлено в java.lang.String

Ниже мой болт, который проверяет простое число

public class PrimeNumberBolt extends BaseRichBolt 
{ 
    private static final long serialVersionUID = 1L; 
    private OutputCollector collector; 



    public void prepare(Map conf, TopologyContext context, OutputCollector collector) 
    { 
     this.collector = collector; 
    } 

    public void execute(Tuple tuple) 
    { 
     //System.out.println(tuple.getFields()); 
     //System.out.println(tuple.getString(0)); 
     String num = tuple.getString(0); 
     //int number = tuple.getInteger(0); 
     int number = Integer.parseInt(num); 
     //System.out.println("IN Primenumber bolt = "+number); 

     if(isPrime(number)) 
     { 
      System.out.println(number); 

     } 
     collector.ack(tuple); 
    } 

    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    { 
     declarer.declare(new Fields("number")); 
    } 

    private boolean isPrime(int n) 
    { 
     if(n == 1) 
     { 
      return false; 
     } 
     if(n == 2 || n == 3) 
     { 
      return true; 
     } 

     // Is n an even number? 
     if(n % 2 == 0) 
     { 
      return false; 
     } 

     //if not, then just check the odds 
     for(int i=3; i*i<=n; i+=2) 
     { 
      if(n % i == 0) 
      { 
       return false; 
      } 
     } 
     return true; 
    } 
} 

Ошибка:

**18156 [Thread-11-prime] ERROR backtype.storm.util - Async loop died! 
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String** 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String 
    **at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.9.4.jar:0.9.4] 
    at com.geekcap.storm_test.PrimeNumberBolt.execute(PrimeNumberBolt.java:40) ~[classes/:na]** 
    at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4] 
    ... 6 common frames omitted 
**18158 [Thread-11-prime] ERROR backtype.storm.daemon.executor - 
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String** 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 
**Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String** 
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.9.4.jar:0.9.4] 
    at com.geekcap.storm_test.PrimeNumberBolt.execute(PrimeNumberBolt.java:40) ~[classes/:na] 
    at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4] 
    ... 6 common frames omitted 
18375 [Thread-11-prime] ERROR backtype.storm.util - Halting process: ("Worker died") 
java.lang.RuntimeException: ("Worker died") 
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4] 
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] 
    at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4] 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4] 
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 

Пожалуйста, предложите мне изменения, которые мне нужно сделать в коде. Спасибо в Advance !!

+0

какой tuple.getString (0); возвращается? –

+1

возвращает строку. В моем случае в кластере kafka данные хранятся как строки (например, 232,12 и т. Д.). Я читаю и разбираю его в int. –

ответ

4

Кажется, что ваш кафковый носик считывает данные в формате байтовой матрицы.

Попробуйте использовать String Scheme, установив spoutconfig.scheme, как показано ниже.

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
+1

Это мой носок кафки ________________________ частный статический KafkaSpout buildKafkaSentenceSpout() { String zkHostPort = "localhost: 2181"; Тема строки = "cust3"; Строка zkRoot = "/ prime"; Строка zkSpoutId = "prime-spout"; ZkHosts zkHosts = новый ZkHosts (zkHostPort); SpoutConfig.scheme = new SchemeAsMultiScheme (новый StringScheme()); SpoutConfig spoutCfg = new SpoutConfig (zkHosts, topic, zkRoot, zkSpoutId); KafkaSpout kafkaSpout = новый KafkaSpout (spoutCfg); возвращение kafkaSpout; } –

+1

Я добавил его. Но он бросает ошибку, поскольку не может сделать статическую ссылку на нестатическое поле SpoutConfig.scheme –

+1

В вашем случае я думаю, что вы можете объявить наконец, как spoutCfg.scheme = new SchemeAsMultiScheme (новый StringScheme()); –

Смежные вопросы