Контекст: Я использую Apache Spark для агрегирования количества запусков различных типов событий из журналов. Журналы хранятся как в Cassandra для целей исторического анализа, так и в Kafka для анализа в реальном времени. Каждый журнал имеет дату и тип события. Для простоты предположим, что я хотел отслеживать количество журналов одного типа за каждый день.Объединить результаты пакетной RDD с потоковой RDD в Apache Spark
У нас есть два RDD, RDD пакетных данных из Cassandra и еще одно потоковое RDD от Kafka. псевдокод:
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
save(batchRDD) // Assume this saves the batch RDD somewhere
...
// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is broadcast variable that is set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0l);
Integer currentValue = ... // Sum of counts
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere
sc.start();
sc.awaitTermination();
Вопрос: Как объединить результаты из streamRDD с batchRDD? Допустим, что batchRDD
имеет следующие данные и эта работа была запущена на 2014-10-16:
("2014-10-15", 1000000)
("2014-10-16", 2000000)
Поскольку запрос Cassandra только включены все данные до времени начала пакетного запроса, мы должны прочитайте из Kafka, когда запрос будет завершен, только учитывая журналы после времени начала работы. Мы предполагаем, что запрос занимает много времени. Это означает, что мне нужно совместить исторические результаты с результатами потоковой передачи.
Для иллюстрации:
|------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2
Тогда предположим, что в первом потоке партии мы получили эти данные:
("2014-10-19", 1000)
Тогда я хочу, чтобы объединить партии RDD с этим потоком РДУ, так что поток RDD теперь имеет значение:
("2014-10-19", 2001000)
Тогда предположим, что во второй партии потока w е получил эти данные:
("2014-10-19", 4000)
Затем поток РДД должен быть обновлен, чтобы иметь значение:
("2014-10-19", 2005000)
и так далее ...
Можно использовать streamRDD.transformToPair(...)
объединить streamRDD данные с данными batchRDD с использованием join
, но если мы сделаем это для каждого потока потока, то мы будем добавлять счетчик из batchRDD для каждого потока потока, делая значение состояния «double counted», когда оно должно быть добавлено только к первый поток.
Спасибо. Я хотел бы добавить, что вместо использования 'rdd.union (defaultRdd)' в преобразовании я закончил использование 'rdd.leftOuterJoin (defaultRdd)', так что 'runningTotal' не включает пары, которые не были изменены. Тогда мне нужно сохранить пары, в которых их значения изменились. – Bobby