У меня есть некоторые данные JSON, связанные с продажей, в моем кластере ElasticSearch, и я бы хотел использовать Spark Streaming (используя Spark 1.4.1) для динамического объединения входящих продажи с моего веб-сайта электронной коммерции через Kafka, чтобы иметь текущее представление об общем объеме продаж пользователя (с точки зрения доходов и продуктов).Как загружать данные истории при запуске процесса Spark Streaming и вычислять скользящие агрегаты
Что мне не совсем ясно из документов, которые я прочитал, является то, как я могу загрузить данные истории из ElasticSearch в начале приложения Spark и рассчитать, например, общий доход для каждого пользователя (на основе истории и входящие продажи от Kafka).
У меня есть следующий код (рабочий) для подключения к примеру Кафки и получить JSON документы:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
object ReadFromKafka {
def main(args: Array[String]) {
val checkpointDirectory = "/tmp"
val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
val topicsSet = Array("tracking").toSet
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
// Create direct kafka stream with brokers and topics
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
//Iterate
messages.foreachRDD { rdd =>
//If data is present, continue
if (rdd.count() > 0) {
//Create SQLContect and parse JSON
val sqlContext = new SQLContext(sc)
val trackingEvents = sqlContext.read.json(rdd.values)
//Sample aggregation of incoming data
trackingEvents.groupBy("type").count().show()
}
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
Я знаю, что есть плагин для ElasticSearch (https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read), но это не совсем понятно мне как интегрировать чтение при запуске и процесс вычисления потоковой передачи для агрегирования данных истории с потоковыми данными.
Помощь очень кровопролитная! Заранее спасибо.
Большое спасибо за ваш ответ. Я уже подозревал, что 'rdd.union()' будет хорошим началом. Будет ли 'updateStateByKey()' также быть способ сделать это? Поскольку я хочу, чтобы сохранить скопления, я думал, что это, возможно, будет удобно ... – Tobi
Я думаю, вы также можете использовать 'updateStateByKey()'. Раньше не было никакого способа указать начальное состояние для 'updateStateByKey()', но я считаю, что это было добавлено, поэтому оно также должно быть жизнеспособным решением. –
Можно ли сделать это с помощью DataFrames? Как-то я не могу заставить его работать ... – Tobi