2016-03-15 2 views
3

Я использую Kafka 0.8.2, и у меня возникает ошибка в моем потребителе, говоря, что «offset commit failed with ...». При просмотре темы «__consumer_offsets». Я видел, что у него было 50 разделов. Это нормально? Я смог решить эту проблему, удалив все журналы Kafka и перезапустив свой сервер Kafka. Есть ли способ удалить эту тему, когда она достигнет определенного количества разделов, или я неправильно делаю свои смещения?Тема kafka __consumer_offsets имеет чрезмерное количество разделов

Вот как я посвящаю мои коррекции:

public void commitOffsets(BlockingChannel channel, String topic, String groupid, int partition, String clientName, int corrilationid, long offset) throws Exception{ 

    if (commitTryCount > 100){ 
     throw new Exception("Offset commit failed with " + channel.host()); 
    } 

    long now = System.currentTimeMillis(); 
    Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>(); 
    //for (int i = 0; i < this.totalPartitions; i++){ 
     TopicAndPartition topicPartition = new TopicAndPartition(topic, partition); 
     offsets.put(topicPartition, new OffsetAndMetadata(offset, topic, now)); 
    //}  

    //initialize offset commit 
    OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupid, offsets, corrilationid, clientName, (short) 1); 
    channel.send(commitRequest.underlying()); 
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer()); 
    if (commitResponse.hasError()){   
     for (Object partitionErrorCode: commitResponse.errors().values()){ 
      if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.OffsetMetadataTooLargeCode()){ 
       //reduce the size of the metadata and retry 
       offset--; 
       commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset); 
       commitTryCount++; 
      } else if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.NotCoordinatorForConsumerCode() 
        || Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { 
       //discover new coordinator and retry 
       int newCorrilation = corrilationid; 
       newCorrilation++; 
       this.channel = discoverChannel(channel.host(), port, groupid, clientName, newCorrilation); 
       commitOffsets(this.channel, topic, groupid, partition, clientName, newCorrilation, offset); 
       commitTryCount++; 
      } else{ 
       //retry 
       commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset); 
       commitTryCount++; 
      }//end of else    
     }//end of for 
    }//end of if 
}//end of method 

ответ

1

Я понял это после того, как я отправил свой код. Я забыл установить переменную «commitTryCount» равной 0, когда фиксация была успешной. Мне все еще интересно, нормально ли, что тема __consumer_offsets имеет 50 разделов?

+0

50 является значением по умолчанию для 'offsets.topic.num.partitions' config, так что это нормально. Вы можете посмотреть настройки по умолчанию [здесь] (http://kafka.apache.org/documentation.html) – serejja

0

Да, 50 разделов для смещений потребителей по умолчанию. Чтобы изменить, установите свойство offsets.topic.num.partitions.

Смежные вопросы