2015-07-24 4 views
2

Мне нужно подсчитать время, когда в каждом столбце встречается пример «2».Spark Scala суммирует элементы столбца набора данных?

Мой набор данных имеет такую ​​структуру:

1 1 2 0 0 0 2 
0 2 0 1 1 1 1 
1 2 1 0 2 2 2 
0 0 0 0 1 1 2 

Я импортирован файл:

val ip = sc.textFile("/home/../data-scala.txt").map(line => line.split(" ")) 

Как я могу суммировать значение, равное "2" в каждом столбце? я бы ожидать, чтобы иметь результат массив элементов как

[0,2,1,0,1,1,3] 
+0

это как вы хотите получить ответ [0, 2, 1,0,1,1,3]? –

+3

Вы можете [транспонировать] (http://stackoverflow.com/questions/29390717/how-to-transpose-an-rdd-in-spark) свой RDD, а затем подсчитать '' '' '' '' 's' .count (_ == "2")). –

+0

@PeterNeyens Транспонирование дорогое, не всегда выполнимое, и в этом нет необходимости. – zero323

ответ

2

Вы можете map существование 2 в каждой позиции первого, давая вам

[ 0 0 1 0 0 0 1 ] 
[ 0 1 0 0 0 0 0 ] 
[ 0 1 0 0 1 1 1 ] 
[ 0 0 0 0 0 0 1 ] 

Тогда просто свертка постепенно SUM каждый столбец ,

без привлечения Спарк, это выглядит примерно так:

val list = Seq(
    Seq(1, 1, 2, 0, 0, 0, 2), 
    Seq(0, 2, 0, 1, 1, 1, 1), 
    Seq(1, 2, 1, 0, 2, 2, 2), 
    Seq(0, 0, 0, 0, 1, 1, 2) 
) 

list. 
    map(_.map(v => if(v == 2) 1 else 0)). 
    reduce((a,b) => a.zip(b).map(t => t._1 +t._2)) 

Нахождение оптимальной версии этого однострочника, вероятно, немного кода гольф вызов.

+0

Это аккуратно сделано , –

3

Как о чем-то вроде этого:

import breeze.linalg.DenseVector 

def toInd(s: String): DenseVector[Int] = { 
    DenseVector[Int](s.split(" ").map(x => if(x == "2") 1 else 0)) 
} 

sc.textFile("/path/to/file").map(toInd).reduce(_ + _) 

Если вы предполагаете значительное количество столбцов, сумма которых равна нулю, вы можете заменить DenseVector с SparseVector.

Для решения вышеизложенного требуется новый объект DenseVector для каждого элемента RDD. По причине производительности вы можете рассмотреть возможность использования aggregate и векторной мутации:

def seqOp(acc: DenseVector[Int] , cols: Array[String]): DenseVector[Int] = { 
    cols.zipWithIndex.foreach{ case (x, i) => if(x == "2") acc(i) += 1} 
    acc 
} 

def combOp(acc1: DenseVector[Int], acc2: DenseVector[Int]): DenseVector[Int] = { 
    acc1 += acc2 
    acc1 
} 

val n = ip.first.length 
ip.aggregate(DenseVector.zeros[Int](n))(seqOp, combOp) 

Вы можете легко заменить DenseVector с редкой один или scala.collection.mutable.Map если вы хотите.

Если вы спросите меня, это довольно уродливо, поэтому я предоставляю его только для того, чтобы сделать ответ полным.

Смежные вопросы