2016-12-02 3 views
1

Аналогично, но немного иначе, как этот вопрос: KStream batch process windows, я хочу, чтобы сообщения от KStream, прежде чем нажимать их на потребителей.Как собрать KStream в список фиксированного размера?

Однако это нажатие не должно быть запланировано на фиксированное временное окно, но на фиксированное пороговое значение количества сообщений на ключ.

Для начала 2 вопроса приходят на ум:

1) Является ли обычай AbstractProcessor путь этот должен быть обработан? Что-то вдоль линий:

@Override 
public void punctuate(long streamTime) { 
    KeyValueIterator<String, Message[]> it = messageStore.all(); 
    while (it.hasNext()) 
     KeyValue<String, Message[]> entry = it.next(); 
     if (entry.value.length > 10) { 
      this.context.forward(entry.key, entry.value); 
      entry.value = new Message[10](); 
     } 
    } 
} 

2) Поскольку в StateStore потенциально взрываются (в случае, если входное значение не достигает порога, чтобы быть перенаправлены), что это лучший способ «мусора-Collect» это ? Я мог бы выполнить расписание по времени и удалить слишком старые ключи ... но это выглядит очень разумно и подвержено ошибкам.

ответ

2

Я думаю, это сработает. Звучит разумно также применение временной сборки мусора. И да, использование API-интерфейса Processor вместо DSL имеет некоторый колорит DIY - это в первую очередь цель PAPI (дать возможность пользователю делать все, что необходимо).

Несколько замечаний, хотя:

  • Вам потребуется более сложную структуру данных: потому что punctuate() называется на основе прогресса в поток времени, это может случиться так, что у вас есть более 10 записей для одного ключа между двумя звонки. Таким образом, вам понадобится что-то вроде KeyValueIterator<String, List<Message[]>> it = messageStore.all();, чтобы иметь возможность хранить несколько партий на ключ.
  • Я бы предположил, что вам нужно будет точно настроить расписание для пунктуации, что будет сложно: если ваше расписание слишком жесткое, многие партии могут быть еще не завершены, а вы теряете процессор - если ваше расписание слишком свободно, вам понадобится много памяти, и ваши операторы нисходящего потока получат много данных, поскольку вы сразу же испускаете много вещей. Отправка пакета данных вниз по течению может стать проблемой.
  • Сканирование всего магазина стоит дорого - кажется, неплохо попытаться «сортировать» ваши пары ключ-значение в соответствии с их размером партии. Это должно позволить вам касаться только клавиш, которые завершили партии, а не всех клавиш. Возможно, вы можете сохранить список в памяти, в котором есть пакеты со скидкой, и только искать их (при сбое вам нужно сделать один проход по всем ключам из хранилища, чтобы воссоздать этот список в памяти).
+0

Спасибо за эти ценные замечания. Без сомнения, последуют более конкретные вопросы. – Raf