2017-01-18 2 views
0

У меня есть служба REST, давайте позвоним ей MDD, в которой есть один потребитель кафки. Когда я ПЕРВЫЙ запускаю службу отдыха, другая служба сообщает потребителю MDD о подписке на определенную тему, все, кажется, идет хорошо.Одна проблема с несколькими потребителями вызывает потенциальную проблему с потоком?

Затем служба сообщает потребителю MDD о подписании другой темы. То, как я это делаю сейчас, - это метод consumer.assign(). В принципе, если вводится новая тема, которой не назначен потребитель, я назначаю эту новую тему потребителю. Таким образом, один потребитель теперь назначен на две разные темы.

Этот потребитель просматривает сообщения и откладывает их в HDFS.

Теперь, что я заметил, это когда подписка на вторую тему приходит, иногда я получаю сообщение об ошибке при отказе добавить файл в HDFS, и когда я смотрел журналы, он пытался добавить некоторые данные, которые не следует добавлять до конца. Например, данные для kafka поступают в этом порядке A, B, C. Когда MDD завершается добавлением A в HDFS, он пытается добавить C (а не B) и одновременно пытается добавить B также. Также еще одно примечание: никаких данных не поступает из первой темы на данный момент, только потоки данных из второй темы. Так что в настоящее время только одна тема кафки имеет поток данных в любой момент времени.

Кто-нибудь знает, что может быть? Существует ли потенциал для создания каких-либо проблем с потоком, когда я назначаю ОДНОГО потребителя нескольким темам? Поскольку все кажется прекрасным, когда потребитель присваивается ОДНОЙ теме, но как только он назначен более чем одной теме, мне не удается добавить файл в HDFS, потому что другой автор уже владеет арендой. Эта ошибка происходит не часто, просто очень случайным образом.

Также было бы рекомендовано исправить каждый раз, когда создается новая тема, создайте нового потребителя kafka?

ответ

0

Это определенно действительный и выполнимый, чтобы иметь только один потребитель, читающий сообщения из нескольких тем. Проблема, с которой вы столкнулись, связана с тем, что Kafka в настоящее время не поддерживает оба назначения вручную (с назначением KafkaConsumer #) и групповым управлением (с подпиской KafkaConsumer #).

Чтобы поддержать подписку на недавно созданные темы, вы можете попытаться вызвать подписку на KafkaConsumer #, на которую передается регулярное выражение, соответствующее всем вновь созданным темам.

+0

Вы можете подробнее рассказать об этом, пожалуйста: Проблема, с которой вы столкнулись, связана с тем, что Kafka в настоящее время не поддерживает оба назначения вручную (с назначением KafkaConsumer #) и групповым управлением (с подпиской KafkaConsumer #). Я все еще не понимаю, почему проблема вызвана. – StephCurry3093

+0

Проще говоря, вы не должны использовать KafkaConsumer.subscribe и KafkaConsumer.assign оба, поэтому придерживайтесь только одного из них. – amethystic

+0

Я использую только один. Сначала я использовал назначение, затем я попытался переключиться на подписку и все еще получаю ту же ошибку – StephCurry3093

Смежные вопросы