2015-05-19 2 views
2

Я использую storm-kafka-0.9.3 для чтения данных из Kafka и обработки этих данных в Storm. Ниже я использую Kafka Spout. Но проблема в том, что я убиваю кластер Storm, он не читает старые данные, которые были отправлены за время, когда он был мертв, и начал читать с последнего смещения.Storm Kafka Spout Невозможно прочесть последний отзыв

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST); 

SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME 
     , "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,UUID.randomUUID().toString()); 
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
//Never should make this true 
spoutConfig.forceFromStart=false; 
spoutConfig.startOffsetTime =-2; 

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 
return kafkaSpout; 
+0

Пожалуйста, вы можете попробовать закомментировать 'spoutConfig.forceFromStart = ложь;' строки или с помощью набора 'spoutConfig.forceFromStart = true' – user2720864

+0

Пытался что, но тот же вопрос, на самом деле увидеть Предположим у меня есть 100 сообщений в Кафки, Штормовая обработка, которая, предположим, после 100-го сообщения Storm пошла вниз, и моя конечная точка http нажала еще 300 сообщений в Kafka, поскольку Storm обработала только 100 сообщений, которые я ожидаю, когда Storm просыпается, она должна начать обработку из 101 сообщения, где оно осталось. – user1249655

+0

так что происходит? в вашем посте вы упомянули, что начинаете читать с последнего смещения .. не то, что вы ищете? – user2720864

ответ

2

Спасибо Все, Поскольку я работала в топологию в локальном режиме, Storm не хранит Offset в ZK, когда я побежал топология в режиме Prod.

Sougata

+0

Итак, какие конфигурации помогли вам решить проблему? – Sankalp

1

Я считаю, что это может произойти, потому что в то время как топология работает, оно используется для хранения всей информации о состоянии в Zookeeper используя следующий путь SpoutConfig.zkRoot+ "/" + SpoutConfig.id так, что в случае неудачи он может возобновить от последней письменного смещения в зоопарке.

Получил это от дока

Важны: При повторном развертывании топологии убедитесь, что настройки для SpoutConfig.zkRoot и SpoutConfig.id не были изменены, в противном случае носик не сможет читать предыдущую информацию о состоянии пользователя (т. е. смещения) от ZooKeeper, что может привести к неожиданному поведению и/или к потере данных в зависимости от вашего варианта использования.

В вашем случае, как SpoutConfig.id является случайной величиной UUID.randomUUID().toString() Ее не в состоянии восстановить последнее совершившие смещение.

Также читайте на той же странице

когда топология запустить один раз заходящего KafkaConfig.startOffsetTime не будет иметь эффект для последующих запусках топологии, потому что теперь топология будет опираться на состояние потребительского информацию (смещения) в ZooKeeper, чтобы определить, откуда она должна начинаться (точнее: возобновить) чтение. Если вы хотите заставить носик игнорировать информацию о состоянии пользователя, хранящуюся в ZooKeeper, тогда вы должны установить параметр KafkaConfig.ignoreZkOffsets равным true. Если это правда, носик всегда начинает чтение со смещением определяется KafkaConfig.startOffsetTime, как описано выше

Вы могли бы использовать статический id, чтобы увидеть, если он может получить.

+0

Спасибо за ваш ответ. Все еще не работает должным образом SpoutConfig spoutConfig = новый SpoutConfig (хосты, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME , "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME, "Test"); spoutConfig.forceFromStart = true; – user1249655

+0

в соответствии с моим пониманием, вы делаете это - 1) Начните kafka-spout читать 100 msg (2) Убейте топологию (3) Нажмите еще 100 msg в очередь (4) снова запустите топологию ... и начните чтение с 201 смещение .. это правильно? – user2720864

+0

Да, это начинается с 201, но я хочу, чтобы он начинал с 101, так как он не обрабатывал последние 100 сообщений. Кстати, я использовал трезубец и его ту же проблему. Он всегда читается с 201. – user1249655

0

Вы должны установить spoutConfig.zkServers и spoutConfig.zkPort:

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST); 
SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME 
    , "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,"test"); 

spoutConfig.zkPort=Constants.ZOOKEEPER_PORT; 
spoutConfig.zkServers=Constants.ZOOKEEPER_SERVERS; 

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 
return kafkaSpout; 
Смежные вопросы