2016-04-20 3 views
0

Компонента для тестирования являетсямногопоточность грозового Болт параллелизм

Кафки Procucer, который считывает файл из машины, файл состоит из 1000 строк.

 String sCurrentLine; 
     br = new BufferedReader(new FileReader("D:\\jsonLogTest.txt")); 
     while ((sCurrentLine = br.readLine()) != null) { 
      //System.out.println(sCurrentLine); 
      KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sCurrentLine); 
      producer.send(message); 
     } 

Сторм Потребитель с тремя болтами, BoltOne предполагается получить поток и разделить его на два различных Streams (Stream1 & stream2). BoltTwo и BoltThree должны подписаться на эти потоки. (Проще говоря, я ищу, чтобы обработать кортеж в BoltOne Парли как Bolt2 обрабатывает первые 500 строк и BolltThree последние 500 строк.

Топология

builder.setSpout("line-reader-spout",kafkaSpout,1); 
     builder.setBolt("bolt-one", new BoltOne(),1).shuffleGrouping("line-reader-spout"); 
     builder.setBolt("bolt-two", new BoltTwo(),1).shuffleGrouping("bolt-one","stream1"); 
     builder.setBolt("bolt-three", new BoltThree(),1).shuffleGrouping("bolt-one","stream2"); 

BoltOne

collector.emit("stream1", new Values(input.getString(0))); 
      collector.emit("stream2", new Values(input.getString(0))); 
     x++;System.out.println("" + x); 
     collector.ack(input); 

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
     // TODO Auto-generated method stub 
     outputFieldsDeclarer.declareStream("stream1", new Fields("field1")); 
     outputFieldsDeclarer.declareStream("stream2", new Fields("field2")); 
    } 

BoltTwo & BoltThree

public void execute(Tuple input) { 
     String sentence = input.getString(0); 
     System.out.println("*********B2*************"); 

    } 

StackTrace

*********B2************* 
1 
*********B3************* 
2 
*********B2************* 
*********B3************* 
3 
*********B3************* 
*********B2************* 
4 
*********B3************* 
*********B2************* 
5 
*********B2************* 
*********B3************* 
6 
*********B2************* 
*********B3************* 
7 
*********B3************* 
*********B2************* 

Совершенно путают с расщепляющимися потоками и параллелизмом. Пример был бы полезен.

Обновлено Решение я придумал сейчас:

public void execute(Tuple input) { 
     @SuppressWarnings("unused") 
     String sentence = input.getString(0); 
     if (x%2==0) { 
      collector.emit("stream1", new Values(input.getString(0))); 
     } 
     else{ 
      collector.emit("stream2", new Values(input.getString(0))); 
     } 

     x++; 
     collector.ack(input); 
    } 

Я просто разделить поток на четный-нечетный основе, и время процесса становится наполовину, В то время как BoltTwo обрабатывает кортеж другой обрабатывается от BoltThree.

+0

вы прошли тщательный [это] (http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism -of-а-бури-topology.html)? – user2720864

+0

Достигнутые это как-то: общественной пустоте выполнения (вход Кортеж) { \t \t @SuppressWarnings ("неиспользуемый") \t \t Строка предложение = input.getString (0); \t \t, если (х% 2 == 0) { \t \t \t collector.emit ("Stream1", новые значения (input.getString (0))); \t \t} \t \t еще { \t \t \t collector.emit ("stream2", новые значения (input.getString (0))); \t \t} \t \t \t \t \t х ++; \t \t collector.ack (ввод); \t} –

ответ

0

Я предполагаю, что вы запускаете все, используя LocalCluster. Поскольку выполняется несколько потоков, выход через println(...) не синхронизирован, и внутренняя буферизация может испортить порядок вывода ... Таким образом, материал, который вы видите в ненадежном состоянии, сохраняется только в пределах одного желоба/болта ,

Кроме того, каково поведение, которое вы хотите получить?

Прямо сейчас, у вас есть

Spout => Bolt1 =+=> Bolt2 
       +=> Bolt3 

Т.е., выход Bolt1 дублируется и Bolt2 и Bolt3 и получать все выходной кортеж из Bolt1. Таким образом, Bolt1 рассчитывается от 1 до 7, и каждый выходной набор Bolt1 запускает execute() Bolt2 и Bolt3.

Поскольку Bolt2 и Bolt3 делают то же самое, я думаю, вы хотите иметь две копии одного и того же болта и разделить вход на оба. Для этого нужно только добавить один болт и установить параллелизм 2:

builder.setSpout("line-reader-spout",kafkaSpout,1); 
builder.setBolt("bolt-one", new BoltOne(),1).shuffleGrouping("line-reader-spout"); 
builder.setBolt("bolt-two", new BoltTwo(),2).shuffleGrouping("bolt-one","stream1"); 

Кроме того, Bolt1 нужно только объявить один выходной поток (а не два). Если вы объявляете несколько выходных потоков и записываете их в оба, вы реплицируете данные ...

Смежные вопросы