2016-11-23 4 views
-2

Я новичок в Apache Spark.Агрегация над искровым потоком

Мой код Scala использует сообщения JSON как строки из темы Kafka в Apache Spark.

Теперь я хочу заполнить определенное поле в моем JSON. Какие у меня варианты?

+0

http://spark.apache.org/docs/latest/streaming-programming-guide.html – maasg

ответ

1

Вы можете поместить JSON в dataframe/dataset и выполнить следующие агрегатные операции.

  • группеПо
  • groupByKey
  • Накопительный
  • куб

Спарк SQL может автоматически вывести схему набора данных JSON и загрузить его как Dataset [Роу]. Это преобразование может быть выполнено с использованием SparkSession.read.json() либо в RDD String, либо в файле JSON.

val json_path = "dir/example.json" 
val jsonDF = spark.read.json(json_path) 
jsonDF.groupBy("col1").count().show() 
+0

благодаря @Arvind .. это работает для меня ... теперь есть еще одна проблема. Я хочу, чтобы среднее значение определенного числового поля превышало весь пакетный интервал, но данные в этих временных окнах разделены на несколько RDD. И я могу сделать все эти компиляции по foreachRDD .. есть ли способ, которым я могу применить что-то на всех RDD вместе? –

+1

Вы можете объединить все все RDD или Dataframes, зарегистрировать его как таблицу Temp и выполнить SQL для выполнения агрегированных операций. –

Смежные вопросы