Компонента для тестирования являетсямногопоточность грозового Болт параллелизм
Кафки Procucer, который считывает файл из машины, файл состоит из 1000 строк.
String sCurrentLine;
br = new BufferedReader(new FileReader("D:\\jsonLogTest.txt"));
while ((sCurrentLine = br.readLine()) != null) {
//System.out.println(sCurrentLine);
KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sCurrentLine);
producer.send(message);
}
Сторм Потребитель с тремя болтами, BoltOne предполагается получить поток и разделить его на два различных Streams (Stream1 & stream2). BoltTwo и BoltThree должны подписаться на эти потоки. (Проще говоря, я ищу, чтобы обработать кортеж в BoltOne Парли как Bolt2 обрабатывает первые 500 строк и BolltThree последние 500 строк.
Топология
builder.setSpout("line-reader-spout",kafkaSpout,1);
builder.setBolt("bolt-one", new BoltOne(),1).shuffleGrouping("line-reader-spout");
builder.setBolt("bolt-two", new BoltTwo(),1).shuffleGrouping("bolt-one","stream1");
builder.setBolt("bolt-three", new BoltThree(),1).shuffleGrouping("bolt-one","stream2");
BoltOne
collector.emit("stream1", new Values(input.getString(0)));
collector.emit("stream2", new Values(input.getString(0)));
x++;System.out.println("" + x);
collector.ack(input);
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// TODO Auto-generated method stub
outputFieldsDeclarer.declareStream("stream1", new Fields("field1"));
outputFieldsDeclarer.declareStream("stream2", new Fields("field2"));
}
BoltTwo & BoltThree
public void execute(Tuple input) {
String sentence = input.getString(0);
System.out.println("*********B2*************");
}
StackTrace
*********B2*************
1
*********B3*************
2
*********B2*************
*********B3*************
3
*********B3*************
*********B2*************
4
*********B3*************
*********B2*************
5
*********B2*************
*********B3*************
6
*********B2*************
*********B3*************
7
*********B3*************
*********B2*************
Совершенно путают с расщепляющимися потоками и параллелизмом. Пример был бы полезен.
Обновлено Решение я придумал сейчас:
public void execute(Tuple input) {
@SuppressWarnings("unused")
String sentence = input.getString(0);
if (x%2==0) {
collector.emit("stream1", new Values(input.getString(0)));
}
else{
collector.emit("stream2", new Values(input.getString(0)));
}
x++;
collector.ack(input);
}
Я просто разделить поток на четный-нечетный основе, и время процесса становится наполовину, В то время как BoltTwo обрабатывает кортеж другой обрабатывается от BoltThree.
вы прошли тщательный [это] (http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism -of-а-бури-topology.html)? – user2720864
Достигнутые это как-то: общественной пустоте выполнения (вход Кортеж) { \t \t @SuppressWarnings ("неиспользуемый") \t \t Строка предложение = input.getString (0); \t \t, если (х% 2 == 0) { \t \t \t collector.emit ("Stream1", новые значения (input.getString (0))); \t \t} \t \t еще { \t \t \t collector.emit ("stream2", новые значения (input.getString (0))); \t \t} \t \t \t \t \t х ++; \t \t collector.ack (ввод); \t} –