Надеюсь, я правильно понял вопрос, что вы хотите, чтобы сообщения для определенного объекта переходили в один и тот же раздел, все еще имея масштабируемое решение.
Самый простой способ (на мой взгляд) сделать это - указать раздел на стороне производителя.
new ProducerRecord(topicName, partitionId,messageKey,message)
Если конкретная тема в вопросе приходит из-за пределы вашей системы, и вы не можете, таким образом, создать свою собственную логику производителя, я бы просто добавить потребитель, который производит сообщения в другую тему, так что указан раздел ,
Продолжая свой пример, предположим, что у вас есть some_topic с миллионами сообщений и 200 тыс. Сущностей, у вас может быть потребитель с высокой пропускной способностью, который потребляет все и производит для some_topic_2, так что сообщение для определенного объекта всегда создается для того же раздел.
Затем вы можете использовать другого высокопроизводительного пользователя, который потребляет от some_topic_2, и будет выполнять описанную вами логику, то есть сохранять вкладки, по которым объекты следует игнорировать и обрабатывать другие.
Конечно, если вам не нужна высокопроизводительная система, вы можете использовать тему kafka с одним разделом и выполнять всю обработку, используя один потребитель для этой темы.
Соответствующие Блогпост: http://www.javaworld.com/article/3066873/big-data/big-data-messaging-with-kafka-part-2.html
Дополнительные мысли:
Другой способ сделать это, если вы используете по крайней мере Кафка 0,10 должны использовать Кафку Streams (http://kafka.apache.org/documentation/streams).
[...] возможность сохранять состояние открывает множество возможностей для сложных приложений для обработки потоков: вы можете присоединяться к входным потокам или группировать и собирать записи данных.
К сожалению, я не работал с API Kafka Streams, но не могу указать подход.
Надеюсь, другие ответчики могут предоставить дополнительную информацию.
Вы все еще можете «использовать» и игнорировать сообщение, если хотите его пропустить - для последующей обработки. Если вы измените свое решение, вы можете запомнить смещение сообщения, а затем '#seek()' для этого смещения, чтобы обработать сообщение позже. –
Как сказал @ MatthiasJ.Sax - вы действительно можете пропустить проблемное сообщение, когда вам нужно. Следовательно, это ложное предположение, что «все последующие сообщения будут заблокированы».Возможно, вам нужно обновить свой вопрос несколькими подробностями, например, что вы добавили в свой комментарий ниже? –
Извинения за недопущение информации. Поскольку для объекта требуется упорядочение, если я удаляю сообщение, последующие сообщения для этих объектов также должны быть отклонены. Поскольку @ ossu54 предложил мне отслеживать сущности, имеющие проблемы, и каждый раз, когда я использую сообщение, я могу проверить этот список и принять решение о его обработке или нет. См. Мои проблемы с этим в комментарии к ответу ossu5 Проясняет ли он miguno и Matthias J. Sax – anfab