2015-08-26 3 views
2

Для некоторых больших сообщений, которые я столкнулся следующее сообщение об ошибке:Кафка kafka.common.MessageSizeTooLargeException на потребителе

kafka.common.MessageSizeTooLargeException: Message size is 1185198 bytes which exceeds the maximum configured message size of 1000012. 

Так, согласно this thread увеличил размер сообщения на брокере и потребителе:

fetch.message.max.bytes=10485760 
replica.fetch.max.bytes=10485760 
message.max.bytes=10485760 


добавлено к config/server.properties

Но тогда сообщения проходят, но c onsumer без ошибок:

[2015-08-26 21:08:08,722] ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$) 
kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic xyz partition 0 at fetch offset 29. Increase the fetch size, or decrease the maximum message size the broker will allow. 
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90) 
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) 
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) 
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:660) 
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) 
    at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25) 
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:166) 
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) 
Consumed 3 messages 

кажется потребитель не поднимая fetch.message.max.bytes=10485760

kafka_2.9.1-0.8.2.1

Любые указатели?

ответ

4

Вы не должны ставить fetch.message.max.bytes на номер config/server.properties, но на ваш ConsumerConfig (см. here). Если вы используете консольный потребитель, вы можете передать флаг --consumer.config consumer.properties, где файл consumer.properties будет содержать это значение конфигурации.

+0

Как я могу получить носик Storm, чтобы подчиняться более высоким размерам выборки. Есть идеи? @serejja –

+0

@KedarParikh не знаю, никогда не использовал Storm, что до – serejja

+0

SpoutConfig.fetchSizeBytes представляется опцией –

Смежные вопросы