2

У меня есть некоторые данные JSON, связанные с продажей, в моем кластере ElasticSearch, и я бы хотел использовать Spark Streaming (используя Spark 1.4.1) для динамического объединения входящих продажи с моего веб-сайта электронной коммерции через Kafka, чтобы иметь текущее представление об общем объеме продаж пользователя (с точки зрения доходов и продуктов).Как загружать данные истории при запуске процесса Spark Streaming и вычислять скользящие агрегаты

Что мне не совсем ясно из документов, которые я прочитал, является то, как я могу загрузить данные истории из ElasticSearch в начале приложения Spark и рассчитать, например, общий доход для каждого пользователя (на основе истории и входящие продажи от Kafka).

У меня есть следующий код (рабочий) для подключения к примеру Кафки и получить JSON документы:

import kafka.serializer.StringDecoder 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.SQLContext 

object ReadFromKafka { 
    def main(args: Array[String]) { 

    val checkpointDirectory = "/tmp" 
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]") 
    val topicsSet = Array("tracking").toSet 

    val sc = new SparkContext(conf) 
    val ssc = new StreamingContext(sc, Seconds(10)) 

    // Create direct kafka stream with brokers and topics 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092") 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicsSet) 

    //Iterate 
    messages.foreachRDD { rdd => 

     //If data is present, continue 
     if (rdd.count() > 0) { 

     //Create SQLContect and parse JSON 
     val sqlContext = new SQLContext(sc) 
     val trackingEvents = sqlContext.read.json(rdd.values) 

     //Sample aggregation of incoming data 
     trackingEvents.groupBy("type").count().show() 

     } 

    } 

    // Start the computation 
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

Я знаю, что есть плагин для ElasticSearch (https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read), но это не совсем понятно мне как интегрировать чтение при запуске и процесс вычисления потоковой передачи для агрегирования данных истории с потоковыми данными.

Помощь очень кровопролитная! Заранее спасибо.

ответ

1

RDD являются неизменяемыми, поэтому после их создания вы не можете добавлять к ним данные, например, обновлять доходы новыми событиями.

Что вы можете сделать, это объединить существующие данные с новыми событиями для создания нового RDD, который затем можно использовать в качестве текущей суммы. Например ...

var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch 
messages.foreachRDD { rdd => 
    currentTotal = currentTotal.union(rdd) 
} 

В этом случае мы делаем currentTotal в var, поскольку она будет заменена ссылкой на новый РД, когда он получает unioned с поступающими данными.

После объединения вы можете выполнить некоторые дополнительные операции, такие как уменьшение значений, принадлежащих одному и тому же ключу, но вы получите изображение.

Если вы используете эту технику, обратите внимание, что линия ваших RDD будет расти, так как каждый вновь созданный RDD будет ссылаться на своего родителя. Это может привести к возникновению проблемы с потоком в стиле переполнения стека. Чтобы исправить это, вы можете периодически звонить checkpoint() на RDD.

+0

Большое спасибо за ваш ответ. Я уже подозревал, что 'rdd.union()' будет хорошим началом. Будет ли 'updateStateByKey()' также быть способ сделать это? Поскольку я хочу, чтобы сохранить скопления, я думал, что это, возможно, будет удобно ... – Tobi

+0

Я думаю, вы также можете использовать 'updateStateByKey()'. Раньше не было никакого способа указать начальное состояние для 'updateStateByKey()', но я считаю, что это было добавлено, поэтому оно также должно быть жизнеспособным решением. –

+0

Можно ли сделать это с помощью DataFrames? Как-то я не могу заставить его работать ... – Tobi

Смежные вопросы