2014-10-23 4 views
8

Контекст: Я использую Apache Spark для агрегирования количества запусков различных типов событий из журналов. Журналы хранятся как в Cassandra для целей исторического анализа, так и в Kafka для анализа в реальном времени. Каждый журнал имеет дату и тип события. Для простоты предположим, что я хотел отслеживать количество журналов одного типа за каждый день.Объединить результаты пакетной RDD с потоковой RDD в Apache Spark

У нас есть два RDD, RDD пакетных данных из Cassandra и еще одно потоковое RDD от Kafka. псевдокод:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type"); 

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() { 
    @Override 
    public Tuple2<String, Integer> call(CassandraRow row) { 
     return new Tuple2<String, Integer>(row.getString("date"), 1); 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}); 

save(batchRDD) // Assume this saves the batch RDD somewhere 

... 

// Assume we read a chunk of logs from the Kafka stream every x seconds. 
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...); 
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() { 
    @Override 
    public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) { 
     String jsonString = data._2; 
     JSON jsonObj = JSON.parse(jsonString); 
     Date eventDate = ... // get date from json object 
     // Assume startTime is broadcast variable that is set to the time when the job started. 
     if (eventDate.after(startTime.value())) { 
      ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>(); 
      pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1)); 
      return pairs; 
     } else { 
      return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs 
     } 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() { 
    @Override 
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) { 
     Integer previousValue = state.or(0l); 
     Integer currentValue = ... // Sum of counts 
     return Optional.of(previousValue + currentValue); 
    } 
}); 
save(streamRDD); // Assume this saves the stream RDD somewhere 

sc.start(); 
sc.awaitTermination(); 

Вопрос: Как объединить результаты из streamRDD с batchRDD? Допустим, что batchRDD имеет следующие данные и эта работа была запущена на 2014-10-16:

("2014-10-15", 1000000) 
("2014-10-16", 2000000) 

Поскольку запрос Cassandra только включены все данные до времени начала пакетного запроса, мы должны прочитайте из Kafka, когда запрос будет завершен, только учитывая журналы после времени начала работы. Мы предполагаем, что запрос занимает много времени. Это означает, что мне нужно совместить исторические результаты с результатами потоковой передачи.

Для иллюстрации:

|------------------------|-------------|--------------|---------> 
tBatchStart    tStreamStart streamBatch1 streamBatch2 

Тогда предположим, что в первом потоке партии мы получили эти данные:

("2014-10-19", 1000) 

Тогда я хочу, чтобы объединить партии RDD с этим потоком РДУ, так что поток RDD теперь имеет значение:

("2014-10-19", 2001000) 

Тогда предположим, что во второй партии потока w е получил эти данные:

("2014-10-19", 4000) 

Затем поток РДД должен быть обновлен, чтобы иметь значение:

("2014-10-19", 2005000) 

и так далее ...

Можно использовать streamRDD.transformToPair(...) объединить streamRDD данные с данными batchRDD с использованием join, но если мы сделаем это для каждого потока потока, то мы будем добавлять счетчик из batchRDD для каждого потока потока, делая значение состояния «double counted», когда оно должно быть добавлено только к первый поток.

ответ

4

Для решения этого дела, я бы профсоюзу базы РДДА с результатом агрегированного StateDStream который держит итоги потоковых данных. Это фактически обеспечивает базовую линию для данных, сообщаемых в каждом интервале потоковой передачи, без учета упомянутой базовой линии x раз.

Я пробовал эту идею, используя образец WordCount, и он работает.Оставьте это на РЕПЛ для живого примера:

(использование nc -lk 9876 на отдельной оболочке, чтобы обеспечить ввод в socketTextStream)

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.storage.StorageLevel 

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7) 
val defaultRdd = sc.parallelize(defaults) 

@transient val ssc = new StreamingContext(sc, Seconds(10)) 
ssc.checkpoint("/tmp/spark") 

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER) 
val words = lines.flatMap(_.split(" ")) 
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _) 
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) => 
    Some(newValues.sum + runningCount.getOrElse(0)) 
} 
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey(_+_) 

wordCount.print() 
historicCount.print() 
runningTotal.print() 
ssc.start() 
+1

Спасибо. Я хотел бы добавить, что вместо использования 'rdd.union (defaultRdd)' в преобразовании я закончил использование 'rdd.leftOuterJoin (defaultRdd)', так что 'runningTotal' не включает пары, которые не были изменены. Тогда мне нужно сохранить пары, в которых их значения изменились. – Bobby

0

Вы могли бы дать updateStateByKey попробовать:

def main(args: Array[String]) { 

    val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
     val currentCount = values.foldLeft(0)(_ + _) 
     val previousCount = state.getOrElse(0) 
     Some(currentCount + previousCount) 
    } 

    // stream 
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1)) 
    ssc.checkpoint(".") 
    val lines = ssc.socketTextStream("127.0.0.1", 9999) 
    val words = lines.flatMap(_.split(" ")) 
    val pairs = words.map(word => (word, 1)) 
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc) 
    stateWordCounts.print() 
    ssc.start() 
    ssc.awaitTermination() 
} 
+0

Я уже использую его. Проблема заключается в том, что если необязательное значение состояния равно null, то мне нужно значение по умолчанию. В идеале это будет значение, вычисленное из пакетного RDD. Проблема в том, что 'updateStateByKey()' не проходит в ключе, поэтому я не могу выполнить поиск значения, вычисленного из пакетного RDD. – Bobby

Смежные вопросы