0

я реализовал циклическом разметки следующим образом:Кафка - проверить количество сообщений в каждом разделе

public class KafkaRoundRobinPartitioner implements Partitioner { 

    private static final Logger log = Logger.getLogger(KafkaRoundRobinPartitioner.class); 

    final AtomicInteger counter = new AtomicInteger(0); 

    public KafkaRoundRobinPartitioner() {} 

    @Override 
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 
     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 
     int partitionsCount = partitions.size(); 

     int partitionId = counter.incrementAndGet() % partitionsCount; 
     if (counter.get() > 65536) { 
      counter.set(partitionId); 
     } 
     return partitionId; 
    } 

    @Override 
    public void close() { 
    } 

    @Override 
    public void configure(Map<String, ?> map) { 
    } 
} 

Теперь я хочу, чтобы проверить, что каждый раздел имеет одинаковое число сообщений. Например, если у меня есть 1 тема с 32 разделами, и я отправляю 32 сообщения в эту тему, я ожидаю, что каждый раздел будет иметь ровно 1 сообщение.

Я хочу сделать что-то вроде следующего:

KafkaPartitions allPartitions = new KafkaTopic("topic_name"); 
for (KafkaPartition partition : allPartitions) { 
    int msgCount = partition.getMessagesCount(); 
    // do asserts 
} 

Насколько я знаю, Кафка Java API не предоставляет нам такую ​​функциональность, но я, возможно, потерял что-то в документации.

Есть ли способ реализовать его изящно?

Обновление Я нашел только базовое решение. Так как я с помощью модели многоковшовые потребителя, я могу просто сделать следующее для каждого потребителя:

consumer.assignment().size(); 

После того, что я могу сделать:

consumer.poll(100); 

И убедитесь, что каждый потребитель имеет сообщение. И в этом случае я не должен сталкиваться с ситуацией, когда один потребитель получает сообщения для другого из своего раздела, потому что, поскольку у меня одинаковое количество потребителей и разделов, Kafka должен распределять разделы между потребителями круговым способом.

ответ

0

Наконец-то я написал что-то вроде следующего.

работник Мои KafkaConsumer имеет следующий код:

public void run() { 
    while (keepProcessing) { 
     try { 
      ConsumerRecords<byte[], byte[]> records = consumer.poll(100); 
      for (ConsumerRecord<byte[], byte[]> record : records) { 
       // do processing 
       consumer.commitSync(); 
      } 
     } catch (Exception e) { 
      logger.error("Couldn't process message", e); 
     } 
    } 
} 

И в моих тестах я решил проверить, что каждый потребитель сделал ровно один коммит, что означает, что распределение сообщений в циклическом порядке , Код проверки:

public class KafkaIntegrationTest { 

private int consumersAndPartitionsNumber; 
private final CountDownLatch latch = new CountDownLatch(consumersAndPartitionsNumber); 

@Test 
public void testPartitions() throws Exception { 
    consumersAndPartitionsNumber = Config.getConsumerThreadAmount(); // it's 5 
    KafkaMessageQueue kafkaMessageQueue = new KafkaMessageQueue(); // just a class with Producer configuration 
    String groupId = Config.getGroupId(); 
    List<KafkaConsumer<byte[], byte[]>> consumers = new ArrayList<>(consumersAndPartitionsNumber); 

    for (int i = 0; i < consumersAndPartitionsNumber; i++) { 
     consumers.add(spy(new KafkaConsumer<>(KafkaManager.createKafkaConsumerConfig(groupId)))); 
    } 

    ExecutorService executor = Executors.newFixedThreadPool(consumersAndPartitionsNumber); 
    for (KafkaConsumer<byte[], byte[]> consumer : consumers) { 
     executor.submit(new TestKafkaWorker(consumer)); 
    } 

    for (int i = 0; i < consumersAndPartitionsNumber; i++) { 
     // send messages to topic 
     kafkaMessageQueue.send(new PostMessage("pageid", "channel", "token", "POST", null, "{}")); 
    } 

    latch.await(60, TimeUnit.SECONDS); 

    for (KafkaConsumer<byte[], byte[]> consumer : consumers) { 
     verify(consumer).commitSync(); 
    } 
} 

class TestKafkaWorker implements Runnable { 

    private final KafkaConsumer<byte[], byte[]> consumer; 
    private boolean keepProcessing = true; 

    TestKafkaWorker(KafkaConsumer<byte[], byte[]> consumer) { 
     this.consumer = consumer; 
     consumer.subscribe(Arrays.asList(Config.getTaskProcessingTopic())); 
    } 

    public void run() { 
     while (keepProcessing) { 
      try { 
       ConsumerRecords<byte[], byte[]> records = consumer.poll(100); 
       for (ConsumerRecord<byte[], byte[]> record : records) { 
        consumer.commitSync(); 
        keepProcessing = false; 
        latch.countDown(); 
       } 
      } catch (Exception e) { 
      } 
     } 
    } 
} 
} 
0

Вы можете использовать seekToBeginning() и seekToEnd() и вычислить разницу смещений, которые вы получаете для каждого раздела.

Смежные вопросы