2015-10-02 4 views
2

Я столкнулся с проблемой с потребителем высокого уровня kafka (0.8.2.0) - после потребления некоторого количества данных один из наших потребителей останавливается. После перезагрузки он потребляет несколько сообщений и останавливается снова без ошибок/исключений или предупреждений.Apache Kafka с потребителем высокого уровня: пропустить поврежденные сообщения

После некоторого исследования я обнаружил, что проблема с потребителем было это исключение:

ERROR c.u.u.e.impl.kafka.KafkaConsumer - Error consuming message stream: 
kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3801080313, computed crc = 2728178222) 

Любые идеи, как я могу просто пропускать такие сообщения на всех?

+1

Я не думаю, что есть способ пропустить поврежденное сообщение. нашел дискуссию [здесь] (http://grokbase.com/t/kafka/users/133k6n4qf6/kafka-throw-invalidmessageexception-and-lost-data) может стоить взглянуть – user2720864

ответ

2

Итак, отвечая на мой вопрос. После некоторой отладки Кафки потребителя, я нашел одно из возможных решений:

  1. Создать подкласс kafka.consumer.ConsumerIterator
  2. Override makeNext -метода. В этом методе поймайте InvalidMessageException и верните некоторый фиктивный-заполнитель.
  3. В вашем while -loop вам необходимо преобразовать kafka.consumer.ConsumerIterator в вашу реализацию. К сожалению, все поля kafka.consumer.ConsumerIterator являются частными, поэтому вам нужно использовать отражение.

Так что это пример кода:

val skipIt = createKafkaSkippingIterator(ks.iterator()) 

while(skipIt.hasNext()) { 
    val messageAndTopic = skipIt.next() 

    if (messageNotCorrupt(messageAndTopic)) { 
    consumeFn(messageAndTopic) 
    } 
} 

messageNotCorrupt -метода просто проверяет, является ли аргумент равен манекеном-сообщения.

2

другое решение, возможно, проще, используя клиент Kafka 0.8.2.

try { 
    val m = it.next() 
    //... 
} catch { 
    case e: kafka.message.InvalidMessageException ⇒ 
    log.warn("Corrupted message. Skipping.", e) 
    resetIteratorState(it) 
} 

//... 

def resetIteratorState(it: ConsumerIterator[Array[Byte], Array[Byte]]): Unit = { 
    val method = classOf[IteratorTemplate[_]].getDeclaredMethod("resetState") 
    method.setAccessible(true) 
    method.invoke(it) 
} 
Смежные вопросы