я реализовал циклическом разметки следующим образом:Кафка - проверить количество сообщений в каждом разделе
public class KafkaRoundRobinPartitioner implements Partitioner {
private static final Logger log = Logger.getLogger(KafkaRoundRobinPartitioner.class);
final AtomicInteger counter = new AtomicInteger(0);
public KafkaRoundRobinPartitioner() {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionsCount = partitions.size();
int partitionId = counter.incrementAndGet() % partitionsCount;
if (counter.get() > 65536) {
counter.set(partitionId);
}
return partitionId;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
Теперь я хочу, чтобы проверить, что каждый раздел имеет одинаковое число сообщений. Например, если у меня есть 1 тема с 32 разделами, и я отправляю 32 сообщения в эту тему, я ожидаю, что каждый раздел будет иметь ровно 1 сообщение.
Я хочу сделать что-то вроде следующего:
KafkaPartitions allPartitions = new KafkaTopic("topic_name");
for (KafkaPartition partition : allPartitions) {
int msgCount = partition.getMessagesCount();
// do asserts
}
Насколько я знаю, Кафка Java API не предоставляет нам такую функциональность, но я, возможно, потерял что-то в документации.
Есть ли способ реализовать его изящно?
Обновление Я нашел только базовое решение. Так как я с помощью модели многоковшовые потребителя, я могу просто сделать следующее для каждого потребителя:
consumer.assignment().size();
После того, что я могу сделать:
consumer.poll(100);
И убедитесь, что каждый потребитель имеет сообщение. И в этом случае я не должен сталкиваться с ситуацией, когда один потребитель получает сообщения для другого из своего раздела, потому что, поскольку у меня одинаковое количество потребителей и разделов, Kafka должен распределять разделы между потребителями круговым способом.