2015-06-24 2 views
3

Я новичок в искру, и я пытаюсь достичь определенного манипулирования данными, основанный на подсчете - проблема, как это - у меня есть текстовый файл с информацией, которая выглядит следующим образом -Манипулирование данными в Спарк

john, apple 
john, apple 
john, orange 
jill, apple 
jill, orange 
jill, orange 

что я хочу сделать, просто: я хочу посчитать количество раз, когда каждый плод появляется для каждого человека и делит это число на общее количество плодов среди двух человек, поэтому результат будет выглядеть так:

john, apple, 2, 3 
jill, apple, 1, 3 
john, orange, 1, 3 
jill orange, 2, 3 

Затем я могу разделить ряд 3 на ряд 4 для этого конечного продукта -

john, apple, 2, 3, 2/3 
jill, apple, 1, 3, 1/3 
john, orange, 1, 3, 1/3 
jill orange, 2, 3, 2/3 

Я попробовал несколько вещей в scala, как это -

var persons = sc.textFile("path_to_directory").map(_.split(",")).map(x=>(x(0),x(1))) 
persons.map{case(person, fruit)=>((person, fruit), 1)}.reduceByKey(_+_).collect 

Выходной сигнал этого обеспечивает -

((jill,orange),2) 
((jill,apple),1) 
((john,orange),1) 
((john,apple),2) 

Это похоже на хороший старт, но тогда я не знать, как исходить отсюда. Любая помощь или подсказки были бы высоко оценены!

UPDATE:

У меня есть предлагаемое решение этой проблемы -

var persons = sc.textFile("path_to_directory").map(_.split(",")).map(x=>(x(0),x(1))) 

var count = persons.map{case(name, fruit)=>((name,fruit),1)}.reduceByKey(_+_) 

var total = persons.map{case(name, fruit)=>(fruit,1)}.reduceByKey(_+_) 

var fruit = count.map{case((name, fruit), count)=>(fruit, (name, count))} 

fruit.join(total).map{case((fruit,((name, count), total)))=>(name, fruit, count, total, count.toDouble/total.toDouble)}.collect.foreach(println) 

Выход для этого лестницу кода искры -

(jill,orange,2,3,0.6666666666666666) 
(john,orange,1,3,0.3333333333333333) 
(jill,apple,1,3,0.3333333333333333) 
(john,apple,2,3,0.6666666666666666) 
+0

Я думаю, что вы не можете не делать две функции агрегации. Один для подсчета частот первого столбца, а второй - для частот пар (как вы в своем примере). Затем вы можете объединить два агрегата в конце. – marios

ответ

1

Одно из возможных решений:

def getFreqs(x: String, vals: Iterable[String]) = { 
    val counts = vals.groupBy(identity).mapValues(_.size) 
    val sum = counts.values.sum.toDouble 
    counts.map { case (k, v) => (x, k, v, sum.toInt, v/sum) } 
} 

persons.groupByKey.flatMap { case(k, v) => getFreqs(k, v) } 

И еще:

val fruitsPerPerson = sc.broadcast(persons.countByKey) 

persons.groupBy(identity).map { case (k, v) => { 
    val sum: Float = fruitsPerPerson.value.get(k._1) match { 
     case Some(x) => x 
     case _ => 1 
    } 
    (k._1, k._2, v.size, sum.toInt, v.size/sum) 
}} 

Оба groupByKey и groupBy могут быть весьма неэффективны, так что если вы ищете более надежное решение, вы можете рассмотреть возможность использования combineByKey:

def create(value: String) = Map(value -> 1) 

def mergeVals(x: Map[String, Int], value: String) = { 
    val count = x.getOrElse(value, 0) + 1 
    x ++ Map(value -> count) 
} 

def mergeCombs(x: Map[String, Int], y: Map[String, Int]) = { 
    val keys = x.keys ++ y.keys 
    keys.map((k: String) => (k -> (x.getOrElse(k, 0) + y.getOrElse(k, 0)))).toMap 
} 

val counts = persons.combineByKey(create, mergeVals, mergeCombs) 

counts.flatMap { case (x: String, counts: Map[String, Int]) => { 
    val sum = counts.values.sum.toDouble 
    counts.map { case (k: String, v: Int) => (x, k, v, sum.toInt, v/sum) } 
}} 
+0

Спасибо за ваш ответ! Я также новичок в scala, возможно, вы можете немного объяснить, что такое 'vals: Iterable [String]' используется и как он работает в коде? – RDizzl3

+0

'groupByKey' transforms принимает' (RDD [(K, T)]) 'и возвращает' RDD [(K Iterable [T])] ', co в вашем случае вы получаете' RDD [(String, Iterable [String]) ] '. Следовательно, 'vals: Iterable [String]', как аргумент. – zero323

+0

большое спасибо! Это отличная помощь! – RDizzl3

Смежные вопросы