4

Я использую AWS Kinesis Client Library.Kinesis: Какой лучший/безопасный способ закрыть рабочего?

Мне нужен способ отключения рабочего потока Kinesis во время развертывания, чтобы я остановился на контрольно-пропускном пункте, а не в середине processRecords().

Я вижу выключение булевского присутствия в Worker.java, но оно сделано частным.

Причина, по которой мне нужно, что контрольная точка и идемпотентность важны для меня, и я не хочу убивать процесс посередине партии.

[EDIT]

Благодаря @CaptainMurphy, я заметил, что Worker.java выставляет shutdown() метод, который надежно закрывает вниз работника и LeaseCoordinator. То, что он не делает, это вызов shutdown() задачи в IRecordProcessor. Он внезапно заканчивает IRecordProcessor, не беспокоясь о состоянии.

Я понимаю, что Идемпотентность между контрольно-пропускными пунктами не гарантируется KCL и разработчик должен сделать дизайн вино терпимо, но я чувствую, что IRecordProcessor должен быть правильно завершить работу перед LeaseCoordinator останавливается независимо от того, что.

+0

Публичная функция установки флага выключения определена в строке 471. – engineerC

+1

public void shutdown() { this.shutdown = true; } – engineerC

+0

@CaptainMurphy: спасибо. это отключает работника, который, в свою очередь, отключает «LeaseCoordinator». Я надеялся, что Работник будет ждать завершения процесса processRecords. Теперь я планирую вручную «контрольно-пропускные пункты» осколков, а затем прекратить работу. Пожалуйста, совет, если есть более рекомендуемый способ сделать это. –

ответ

0

Метод выключения процессора записи действительно вызван, когда вы вызываете завершение работы Рабочего. Вы можете отследить его обратно из класса ShutdownTask, который создается классом ShardConsumer, который закрыт рабочим.

Таким образом, вы можете контрольно-пропускную точку на последней принятой записи, прослушивая вызов выключения, передавая контрольный указатель последней полученной последовательности в точке итерации вашего последнего обработанного значения. Например. в вашем переопределенном процессеRecords():

for(Record currRecord : records) 
{ 
    someProcessSingleRecordMethod(currRecord) 
    if(shutdown) 
    { 
     checkpointer.checkpoint(currRecord.getSequenceNumber()); 
     return; 
    } 
} 

Где ваш метод выключения устанавливает флаг выключения в значение true.

Обратите внимание на то, что для приложений Kinesis по-прежнему лучше всего разрабатывать «по крайней мере один раз» в случае неуважительного отключения, например, для завершения экземпляра. Получение и обработка «только один раз» не может быть хорошим прецедентом для Kinesis.

0

С версии 1.7.1 (см. Примечание ниже) приложение может запросить изящное завершение работы, а записи процессоров, которые реализуют IShutdownNotificationAware, будут предоставлены возможность контрольной точки перед выключением.

  • убедитесь, что запись процессор реализует IShutdownNotificationAware интерфейс в дополнение к одному из IRecordProcessor интерфейсов. Контрольная точка вызова в методе shutdownRequested(IRecordProcessorCheckpointer checkpointer). Обратите внимание - shutdown метод IRecordProcessor должен вызвать пункт пропуска только если отключение причиной является ПРЕКРАТИТЬ
  • на завершение работы приложения инициировать процесс выключения уборщица

    Future<Void> shutdown = worker.requestShutdown(); 
    shutdown.get(); // wait for shutdown complete 
    

PS: Kinesis клиента перед тем версия 1.7.4 содержит гонки условие, предотвращает правильное выключение. Поэтому используйте версию 1.7.4 или новее.

Смежные вопросы