Аналогично, но немного иначе, как этот вопрос: KStream batch process windows, я хочу, чтобы сообщения от KStream
, прежде чем нажимать их на потребителей.Как собрать KStream в список фиксированного размера?
Однако это нажатие не должно быть запланировано на фиксированное временное окно, но на фиксированное пороговое значение количества сообщений на ключ.
Для начала 2 вопроса приходят на ум:
1) Является ли обычай AbstractProcessor
путь этот должен быть обработан? Что-то вдоль линий:
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, Message[]> it = messageStore.all();
while (it.hasNext())
KeyValue<String, Message[]> entry = it.next();
if (entry.value.length > 10) {
this.context.forward(entry.key, entry.value);
entry.value = new Message[10]();
}
}
}
2) Поскольку в StateStore
потенциально взрываются (в случае, если входное значение не достигает порога, чтобы быть перенаправлены), что это лучший способ «мусора-Collect» это ? Я мог бы выполнить расписание по времени и удалить слишком старые ключи ... но это выглядит очень разумно и подвержено ошибкам.
Спасибо за эти ценные замечания. Без сомнения, последуют более конкретные вопросы. – Raf