2016-05-20 2 views
3

После обновления до Spark 1.6.1 я начал рефакторинг приложения, чтобы заменить updateStateByKey на mapWithState.Spark mapWithState обновленные состояния вывода

Чтобы воспользоваться преимуществами производительности нового API, я не хочу звонить stateSnapshots, который загружает все состояния. Мне нужны только обновленные состояния.

API возвращает DStream из [key, input, state, output], где каждое состояние является частично обновленным после поступления материала. Как я могу извлечь только последние состояния из этого DStream (т. Е. Состояние после того, как все соответствующие входы попали/отображены)?

я могу сделать map (уронить вход и выход) и reduceByKey на MapWithStateDStream, выбирая состояние с новой отметкой времени (который я установил внутри функции обновления), но у меня нет никакой гарантии, что не будет два частичных состояния с одной и той же меткой времени, даже если используется пользовательский, ключ, разделитель.

Как я могу определить, какое частичное состояние является последним в MapWithStateDStream выводах mapWithState?

ответ

3

mapWithState будет вызываться только для каждого состояния, которое обновляется в текущей микропакете. Один из способов достижения желаемого результата - вернуть Some[S] в случае, если состояние было обновлено.

StateSpec.function принимает метод со следующей подписью:

mappingFunction: 
    (Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType] 

Что мы можем сделать, это убедиться, что наш Option[MappedType] всегда Some[MappedType], когда значение было обновлено, иначе None.

Например:

def updateState(key: Int, value: Option[Int], state: State[Int]): Option[Int] = { 
    value match { 
     case Some(something) if something > 10 => 
     val updatedVal = something * something 
     state.update(updatedVal) 
     Some(updatedVal) 
     case _ => None 
    } 
} 

И тогда вы можете сделать:

val spec = StateSpec.function(updateState _) 
ssc.mapWithState(spec).filter(!_.isEmpty).foreachRDD(/* do stuff on updated state */) 

Таким образом, вы отфильтровать NONE обновляется состояние и сохранить только обновленные снимки, которые вы ищете.

+0

Состояние всегда обновляется - каждое действие ввода обновляет состояние, поэтому я всегда возвращаю 'Some (updatedState)'. Это означает, что вывод содержит все частичные состояния, я просто хочу, чтобы он был обработан после всех связанных действий. Я думаю, я должен был упомянуть, что есть несколько действий, соответствующих одному и тому же ключу. 'mapWithState' вызывается для каждого действия, а не для каждого состояния. Его можно назвать несколько раз для одного и того же состояния. – Sepph

+0

@Seppeh Можете ли вы опубликовать [MCVE] вашего кода? Особенно ваш Spark DAG. Несмотря на то, что существует несколько действий для одного и того же ключа, если вы обновляете 'State [S]', нет необходимости выводить значение, как в 'updateStateByKey'. –

0

Одним из решений, которое могло бы работать, если это возможно для вашего алгоритма обновления, является вызов reduceByKey во входном потоке перед вызовом mapWithstate. Тогда будет только одно обновление для каждого ключа и отсутствие частичных состояний.

Смежные вопросы