2015-02-12 3 views
2

Можно ли дважды вызвать UpdateStateByKey на том же RDD. Мое требование следующее.Вызов updateStateByKey дважды на том же RDD

  1. Получить поток событий от Кафки
  2. UpdateStateByKey агрегировать и набор фильтров событий на основе временной метки
  3. некоторую обработку и сохранение в Cassandra БД
  4. UpdateStateByKey удалить ключ на основе типСобытия

Я попытался присвоить результаты с шага 2 VAR и переназначить его на обновленное значение на шаге 4. Но похоже, что это не работает. Я новичок в искры и не знаю, как такое поведение возможно.

Цените любую помощь.

+0

этот ответ должен объяснить, как состояние обновления на ключевых работ, http://stackoverflow.com/questions/24771823/spark-streaming-accumulated-word-count/24771886#24771886, и да, вы должен иметь возможность вызывать updateStateByKey более одного раза на одном и том же RDD, вы можете захотеть кешировать, если вы это сделаете, чтобы получить дополнительную помощь, вы должны опубликовать попытку того, что вы сделали – aaronman

+0

вы имеете в виду, я должен использовать широковещательную переменную или RDD persistence like, cache(), persist(), не уверен, какой из них точно поможет в приведенном выше случае – Harsha

ответ

0

Я решил эту проблему, выполнив очистку, которую хотел сделать во втором вызове updateStateByKey() в начале моего метода обновления состояния. Небольшой пример:

private static Optional<State> updateState(
     final List<Events> allEvents, 
     final Optional<State> state) { 
    State state = state.or(State::new); 
    state.clearAccumulatedValues(); 

    // Do some work... 
    state.addValue("Purple Elephants!"); 

    return Optional.fromNullable(state.isEmpty() ? null : state); 
} 
Смежные вопросы