2016-05-29 2 views
1

Я пытаюсь объединить два потока, один из которых должен быть с сохранением состояния (например, статические данные с не частыми обновлениями):Apache Спарк слияния после updateStateByKey()

SparkConf conf = new SparkConf().setAppName("Test Application").setMaster("local[*]"); 
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10)); 
context.checkpoint("."); 
JavaDStream<String> dataStream = context.socketTextStream("localhost", 9998); 
JavaDStream<String> refDataStream = context.socketTextStream("localhost", 9999); 

JavaPairDStream<String, String> pairDataStream = dataStream.mapToPair(e -> { 
    String[] tmp = e.split(" "); 
    return new Tuple2<>(tmp[0], tmp[1]); 
}); 

JavaPairDStream<String, String> pairRefDataStream = refDataStream.mapToPair(e -> { 
    String[] tmp = e.split(" "); 
    return new Tuple2<>(tmp[0], tmp[1]); 
}).updateStateByKey((Function2<List<String>, Optional<String>, Optional<String>>) (strings, stringOptional) -> { 
    if (!strings.isEmpty()) { 
     return Optional.of(strings.get(0)); 
    } 
    return Optional.absent(); 
}); 

pairDataStream.join(pairRefDataStream).print(); 


context.start(); 
context.awaitTermination(); 

Когда я пишу 1 aaa в первый поток и 1 111 во второй сразу все работает нормально, я вижу результат слияния. Но, когда я пишу 1 bbb в первый поток через минуту, я ничего не вижу.

Я правильно понял, что делает updateStateByKey()? Или я ошибаюсь?

ответ

3

updateStateByKey делает именно то, о чем вы просите. В частности, если текущее окно не содержит данных (strings.isEmpty()) вы учите его забыть (возвращение Optional.absent();):

if (!strings.isEmpty()) { 
    return Optional.of(strings.get(0)); 
} 
return Optional.absent(); 

в то время как то, что вы, вероятно, хотите, чтобы вернуть прежнее состояние:

if (!strings.isEmpty()) { 
    return Optional.of(strings.get(0)); 
} 
return stringOptional; 
Смежные вопросы