2016-01-19 2 views
0

Приложение читает сообщения из одной темы Kafka и после хранения в MongoDB и делает некоторые проверки, пишет в другую тему. Здесь я столкнулся с проблемой, так как приложение переходит в бесконечный цикл. код у меня есть ниже:Как написать испущенный кортеж в тему kafka

Hosts zkHosts = new ZkHosts("localhost:2181"); 
String zkRoot = "/brokers/topics" ; 
String clientRequestID = "reqtest"; 
String clientPendingID = "pendtest"; 
SpoutConfig kafkaRequestConfig = new SpoutConfig(zkHosts,"reqtest",zkRoot,clientRequestID); 
SpoutConfig kafkaPendingConfig = new SpoutConfig(zkHosts,"pendtest",zkRoot,clientPendingID); 

kafkaRequestConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
kafkaPendingConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
KafkaSpout kafkaRequestSpout = new KafkaSpout(kafkaRequestConfig); 
KafkaSpout kafkaPendingSpout = new KafkaSpout(kafkaPendingConfig); 

MongoBolt mongoBolt = new MongoBolt() ; 
DeviceFilterBolt deviceFilterBolt = new DeviceFilterBolt() ; 
KafkaRequestBolt kafkaReqBolt = new KafkaRequestBolt() ; 
abc1DeviceBolt abc1DevBolt = new abc1DeviceBolt() ; 
DefaultTopicSelector defTopicSelector = new DefaultTopicSelector(xyzKafkaTopic.RESPONSE.name()) ; 
KafkaBolt kafkaRespBolt = new KafkaBolt() 
    .withTopicSelector(defTopicSelector) 
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()) ; 

TopologyBuilder topoBuilder = new TopologyBuilder(); 
topoBuilder.setSpout(xyzComponent.KAFKA_REQUEST_SPOUT.name(), kafkaRequestSpout); 
topoBuilder.setSpout(xyzComponent.KAFKA_PENDING_SPOUT.name(), kafkaPendingSpout); 
topoBuilder.setBolt(xyzComponent.KAFKA_PENDING_BOLT.name(), 
    deviceFilterBolt, 1) 
    .shuffleGrouping(xyzComponent.KAFKA_PENDING_SPOUT.name()) ; 
topoBuilder.setBolt(xyzComponent.abc1_DEVICE_BOLT.name(), 
    abc1DevBolt, 1) 
    .shuffleGrouping(xyzComponent.KAFKA_PENDING_BOLT.name(), 
     xyzDevice.abc1.name()) ; 
topoBuilder.setBolt(xyzComponent.MONGODB_BOLT.name(), 
    mongoBolt, 1) 
    .shuffleGrouping(xyzComponent.abc1_DEVICE_BOLT.name(), 
     xyzStreamID.KAFKARESP.name()); 
topoBuilder.setBolt(xyzComponent.KAFKA_RESPONSE_BOLT.name(), 
    kafkaRespBolt, 1) 
    .shuffleGrouping(xyzComponent.abc1_DEVICE_BOLT.name(), 
     xyzStreamID.KAFKARESP.name()); 

Config config = new Config() ; 
config.setDebug(true); 
config.setNumWorkers(1); 

Properties props = new Properties(); 
props.put("metadata.broker.list", "localhost:9092"); 
props.put("serializer.class", "kafka.serializer.StringEncoder"); 
props.put("request.required.acks", "1"); 
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); 

LocalCluster cluster = new LocalCluster(); 
try{ 
    cluster.submitTopology("demo", config, topoBuilder.createTopology()); 
} 

В приведенном выше коде, KAFKA_RESPONSE_BOLT пишет данные в тему. abc1_DEVICE_BOLT кормит эту KAFKA_RESPONSE_BOLT испуская данные, как:

@Override 
public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    Fields respFields = IoTFields.getKafkaResponseFieldsRTEXY(); 
    ofd.declareStream(IoTStreamID.KAFKARESP.name(), respFields); 
} 

@Override 
public void execute(Tuple tuple, BasicOutputCollector collector) { 
    List<Object> newTuple = new ArrayList<Object>() ; 
    String params = tuple.getStringByField("params") ; 
    newTuple.add(3, params); 
    ---- 
    collector.emit(IoTStreamID.KAFKARESP.name(), newTuple); 
} 
+0

что вы имеете в виду «как приложение будет в бесконечный цикл»? Какое поведение вы получаете? Какое поведение вы ожидаете? –

+0

@ MatthiasJ.Sax, то же самое сообщение ввода снова и снова читается из темы pendtest, я ожидаю, что мне нужно поместить прочитанный ввод в тему resptest, изменив структуру и сохранить в mongo. –

+0

Я могу изменить и сохранить его в mongo, но не умеет писать на тему –

ответ

1

Я обеспокоен тем же вопросом в течение длительного времени, ответ очень прост ... вы не поверите.

Насколько я понимаю, реализация KafkaBolt должна получать кортежи, имеет название поля «сообщение», независимо от того, является ли это болтом или носиком. Поэтому вам нужно внести некоторые изменения в свой код, который я еще не видел тщательно. (Но я считаю, что это помогло бы!)

конкретная причина, как говорят в https://mail-archives.apache.org/mod_mbox/incubator-storm-user/201409.mbox/%[email protected]%3E

+0

Это помогло мне поблагодарить. – Matt

+0

Не могли бы вы помочь мне, как вы исправили эту проблему. Я сталкиваюсь с тем же. благодаря! – user3244172

Смежные вопросы