2014-12-11 1 views
0

Мы оцениваем Kinesis, и я нашел следующие поведения. У меня есть простой тест с использованием Kinesis для проверки точности и базовой функциональности.Kinesis отправленные записи не равны потребляемым записям

Тест производит элемент в поток следующим образом:

PutRecordRequest putRecordRequest = new PutRecordRequest(); 
    putRecordRequest.setStreamName(streamName); 
    putRecordRequest.setData(ByteBuffer.wrap(event.getBytes())); 
    putRecordRequest.setPartitionKey(message.getEventList().getEvents().get(0).getLicenseKey()); 

    UsageServiceStatistics.instance().getKinesisSent().increase(); 
    PutRecordResult putRecordResult = kinesisManager.getConnection().putRecord(putRecordRequest); 

Затем я использую клиентскую библиотеку Amazon Kinesis (KCL) следующим образом:

@Override 
public void processRecords(List<Record> records, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) 
{ 
    logger.debug("Received a list of records for processing with size:" + records.size()); 

    for (Record record : records) 
    { 
     UsageServiceStatistics.instance().getKinesisConsumed().increase(); 
     logger.debug("Kinesis consumed:" + UsageServiceStatistics.instance().getKinesisConsumed()); 
     if (!processRecord(record)) 
     { 
      logger.error("Couldn't process record " + record + ". Skipping the record."); 
     } 
    } 

    checkpointManager.checkpoint(iRecordProcessorCheckpointer); 
} 

Я вижу несоответствие между количеством произведенного по сравнению с количеством потребляемой записи. Например, при отправке серии 2000 пунктов 3 раза подряд я вижу следующее:

Kinesis sent:counter=2000 
Kinesis consumed:1999 

Kinesis sent:counter=4000 
Kinesis consumed:counter=3994 

Kinesis sent:counter=6000 
Kinesis consumed:counter=5999 

Почему я не вижу, точно такое же количество полученного против потребляются? Почему после второго прохода не было 6 предметов, и я получил записи 2006 года только при запуске 3, хотя я ждал не менее 2 минут между запуском 2 и запуском 3.

Наконец, я сделал набор тестов перед этим с более высокой частотой контрольной точки, а затем расхождения были еще больше? Каково правило, что Amazon KCL использует, чтобы инициировать отправку записей в conumer? Зачем ему прекращать отправку и хранение элементов в очереди (например, от 2 до 3)? Где последний элемент из 6000, который был отправлен?

Thx заблаговременно

+0

Вы проверили исключения для записи событий в Кинезисе. Возможно, вас задушили для некоторых вызовов put_record. – Guy

+0

Я проверил, и исключений нет даже при отладке уровня журнала кинези. –

+0

Как вы отложили уровень журнала кинезита. Можете ли вы опубликовать сделанные шаги/образец? –

ответ

2

Я нашел причину.

Это было ошибкой в ​​моем коде.

KCL создает ряд процессоров записи, которые равны количеству осколков в конкретном потоке.

Однако я ввел ошибку, указав, что они используют один и тот же объект Checkpointer в многопоточной среде. Когда я исправил его, чтобы каждый процессор записи имел свой собственный checkpointer, он работал отлично, и подсчеты были согласованы.

Смежные вопросы