1

У меня есть 3 раздела: 0, 1, 2. Таким образом, сообщения могут быть классифицированы как 0, 1, 2.Может ли поток, искра, шторм прочитываться у Кафки круговым способом?

например:

1 сообщение в разделе 0: 0

3 сообщений в раздел 1: 111

2 сообщений в разделе 2: 22

Как сделать потребитель потреблять сообщения в порядке 012x12x1x (х означает отсутствие сообщений в то время). Порядок потребления сообщений выглядит так: 012121. Я хотел бы сделать это как на C++, так и на Python. Обращаясь к существующим клиентам, сообщения можно создавать круговым способом, но их нельзя использовать в круговом стиле.

Любая идея?

Существует раздел.assignment.strategy в потребительских конфигурациях Kafka (http://kafka.apache.org/documentation.html#consumerconfigs). Я ищу некоторые инструменты, которые реализуют эту конфигурацию (, такие как лоток, искра, шторм), чтобы читать данные из кафки, переупорядочивать их и снова писать в кафку. Продолжите приведенный выше пример. Переставленные сообщения выглядеть следующим образом: 012121 (012x12x1x)

UPDATE

Теперь я могу сделать это в клиенте C++ Кафки (https://github.com/edenhill/librdkafka).

for(int i = 0; i < 2; i++) 
    { 
     RdKafka::Message *msg = m_consumer->consume(m_topic, i, 1000); 
     // Do something about msg here... 
    } 

Выход:

Reading from 1=>4953---1--- 
Reading from 0=>46164---0--- 
Reading from 1=>4954---1--- 
Reading from 0=>46165---0--- 
Reading from 1=>4955---1--- 
Reading from 0=>46166---0--- 
Reading from 1=>4956---1--- 
Reading from 0=>46167---0--- 
Reading from 1=>4957---1--- 
Reading from 0=>46168---0--- 
+0

Мой вопрос переиздан. Благодарю. – BAE

ответ

-1

В Java, вы можете использовать high level consumer.

Если вы используете все 3 раздела в том же процессе, используя тот же group.id, вы можете создать 3 рабочих потока, которые действуют как потребители, и циклически перемещаться между ними.

Я понимаю, что вы явно упомянули Python и C++ как ваши целевые языки.
Из моего собственного опыта работы на Python нет эквивалента потребителю высокого уровня, предоставляемому на Java.

Таким образом, вы можете каким-то образом обернуть его, например, создать новый Java-процесс, который действует как сервер, и потреблять сообщения через этот процесс, или вы можете попытаться перенести потребителя высокого уровня на Python и/или C++.

Другим вариантом является использование каждого раздела, запись данных на какой-либо другой носитель, такой как MySQL (или любая другая СУБД), и использование сообщений с использованием SQL для циклического развертывания и, в конечном итоге, их удаление из соответствующей таблицы.

В любом случае, я предлагаю вам пересмотреть Кафку как ваш транспортный уровень. Ваши требования (потребляющие циклически) не совпадают с основным дизайном/архитектурой Kafka. Вот почему:
Пока количество разделов меньше доступных ядер на машине пользователя - тогда можно использовать сообщения на этой единственной (!) Машине круглым способом, как описано выше.

Однако Kafka также предназначен для распределенной нагрузки (потребителей). Это мотивация для создания разделов для одной темы. Масштабируемость - ключевая концепция в Кафке.

Так что, если у вас больше разделов, чем доступных ядер на бытовой машине, возможно, вы не сможете правильно перебирать разделы и потреблять сообщения циклически.

Пример: говорят, что у вас есть 20 разделов в определенной теме. В этом разделе все время появляется много сообщений. Теперь предположим, что у вас есть одна машина с 4 ядрами процессора, которые будут потреблять из этой темы. По дизайну вы сможете потреблять не более 4 разделов за раз. Преобразование этого в циклическое потребление может быть достигнуто путем буферизации значимого количества сообщений в памяти или какого-либо другого механизма (что может привести к другим проблемам, таким как латентность). Именно тогда мы предполагаем, что все разделы доступны все время, без проблем с сетью или проблем с дисками, которые влияют на Kafka. Вот почему я думаю, что «объединение назад» разделов в какой-то другой поток сообщений - это не тривиальная вещь, которую нужно делать в больших масштабах.

+0

В настройках для покупок Kafka существует раздел.assignment.strategy (http://kafka.apache.org/documentation.html#consumerconfigs). Я ищу некоторые инструменты, которые реализуют эту конфигурацию (например, плагин kafka logstash), чтобы читать данные из kafka, переупорядочивать их и снова писать в kafka. Есть ли у вас какие-либо предложения? – BAE

+0

[a] Я не знаю таких инструментов; [b] Я знаю плагин Logstash Kafka, но его стратегия потребления довольно прямолинейна; [c] Я действительно считаю, что то, что вы пытаетесь достичь, противоречит основным проектным решениям Кафки. [d] Поэтому я смиренно предлагаю вам использовать данные из Kafka, использовать некоторые другие среды, такие как очереди или РСУБД, переупорядочить по мере необходимости и, наконец, вернуться к Кафке в другой теме. –

+0

Можно написать собственный входной плагин kafka? В клиенте Java Kafka поддерживается циклическое потребление. Таким образом, этот клиент может использоваться для моего собственного плагина. – BAE

Смежные вопросы