2017-02-23 15 views
3

Как создать несколько потоков из одной главной темы? Когда я делаю что-то вроде этого:Несколько потоков из одной основной темы

KStreamBuilder builder = new KStreamBuilder(); 

builder.stream(Serdes.String(), Serdes.String(), "master") 
      /* Filtering logic */ 
      .to(Serdes.String(), Serdes.String(), "output1"); 

builder.stream(Serdes.String(), Serdes.String(), "master") 
      /* Filtering logic */ 
      .to(Serdes.String(), Serdes.String(), "output2"); 

KafkaStreams streams = new KafkaStreams(builder, /* config */); 
streams.start(); 

Я получаю следующее сообщение об ошибке:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source. 
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347) 
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92) 

мне нужно сделать еще один экземпляр KafkaStreams для каждого потока от «хозяина» делать?

ответ

7

Вы можете создать KStream, что вы можете повторно использовать:

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master"); 

, то вы можете использовать его:

inputStream.filter(..logic1) 
     .to(Serdes.String(), Serdes.String(), "output1"); 
inputStream.filter(..logic2) 
     .to(Serdes.String(), Serdes.String(), "output2"); 

KafkaStreams streams = new KafkaStreams(builder, /* config */); 
streams.start(); 
+2

Если у вас нет совпадения в вас фильтры, вы можете также использовать ' inputStream.branch() ', которые возвращают неперекрывающиеся субпотоки. –

Смежные вопросы