2

В кассандре у меня есть тип столбца списка. Я новичок в искры и scala, и не знаю, с чего начать. В искры я хочу получить количество всех значений, возможно ли это сделать. Ниже dataframeКоличество значений в искровом окне - dataframe

+--------------------+------------+ 
|     id|  data| 
+--------------------+------------+ 
|53e5c3b0-8c83-11e...|  [b, c]| 
|508c1160-8c83-11e...|  [a, b]| 
|4d16c0c0-8c83-11e...| [a, b, c]| 
|5774dde0-8c83-11e...|[a, b, c, d]| 
+--------------------+------------+ 

Я хочу выход как

+--------------------+------------+ 
| value   |  count | 
+--------------------+------------+ 
|a     |  3  | 
|b     |  4  | 
|c     |  3  | 
|d     |  1  | 
+--------------------+------------+ 

искрового версии: 1,4

ответ

4

Здесь вы идете:

scala> val rdd = sc.parallelize(
    Seq(
    ("53e5c3b0-8c83-11e", Array("b", "c")), 
    ("53e5c3b0-8c83-11e1", Array("a", "b")), 
    ("53e5c3b0-8c83-11e2", Array("a", "b", "c")), 
    ("53e5c3b0-8c83-11e3", Array("a", "b", "c", "d")))) 
// rdd: org.apache.spark.rdd.RDD[(String, Array[String])] = ParallelCollectionRDD[22] at parallelize at <console>:27 

scala> rdd.flatMap(_._2).map((_, 1)).reduceByKey(_ + _) 
// res11: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at <console>:30 

scala> rdd.flatMap(_._2).map((_,1)).reduceByKey(_ + _).collect 
// res16: Array[(String, Int)] = Array((a,3), (b,4), (c,3), (d,1)) 

Это также на самом деле довольно легко с DataFrame API:

scala> val df = rdd.toDF("id", "data") 
// res12: org.apache.spark.sql.DataFrame = ["id": string, "data": array<string>] 

scala> df.select(explode($"data").as("value")).groupBy("value").count.show 
// +-----+-----+ 
// |value|count| 
// +-----+-----+ 
// | d| 1| 
// | c| 3| 
// | b| 4| 
// | a| 3| 
// +-----+-----+ 
2

Вам нужно что-то вроде этого (от Apache Spark Examples):

val textFile = sc.textFile("hdfs://...") 
val counts = textFile 
      .flatMap(line => line.split(" ")) 
      .map(word => (word, 1)) 
      .reduceByKey(_ + _) 

догадываясь, что у вас уже есть пары, .reduceByKey (_ + _) вернет то, что вы необходимость.

Вы также можете попробовать в искровой оболочки что-то вроде этого:

sc.parallelize(Array[Integer](1,1,1,2,2),3).map(x=>(x,1)).reduceByKey(_+_).foreach(println) 
+0

вы можете проверить правильность –

Смежные вопросы