2015-11-23 2 views
0

У меня есть 2 комплекта, один с положительным и один с отрицательными образцами. Первое значение в массиве - это идентификация элемента, второе значение - сумма таких элементов.Spark - уменьшить с помощью оператора деления

positive: Array[(String, Int)] 

negative: Array[(String, Int)] 

Я хотел бы построить результат массива, который будет содержать имя элемента и его положительного до отрицательного коэффициента как числа с плавающей точкой. Приведенная ниже команда возвращает мне только целочисленное отношение.

val result = positive.union(negativeCount).reduceByKey((a, b) => (a/b) 

Не могли бы вы посоветовать, как сделать соотношение числа поплавка?

Спасибо.

ответ

2

Насколько я понимаю ваши намерения, вы должны использовать join не union

val positive = sc.parallelize(Seq(("a", 1), ("b", 2))) 
val negative = sc.parallelize(Seq(("a", 4), ("b", 1))) 

val ratios = positive 
    .join(negative) 
    .mapValues{case (x: Int, y: Int) => x.toFloat/y} 

ratios.collect 
// Array[(String, Float)] = Array((a,0.25), (b,2.0)) 

С DataFrames:

val ratiosDF = positive.toDF("pk", "pv") 
    .join(negative.toDF("nk", "nv"), $"pk" === $"nk") 
    .select($"pk".alias("k"), $"pv".divide($"nv").alias("v")) 
ratiosDF.show 

// +---+----+ 
// | k| v| 
// +---+----+ 
// | a|0.25| 
// | b| 2.0| 
// +---+----+ 

Использование union, а затем reduceByKey не имеет смысла и не дает никаких сильных гарантий относительно порядка значений.

-1

Выполните одно из чисел с плавающей точкой, используя toFloat

val result = positive.union(negativeCount) 
    .mapValues(_.toFloat) 
    .reduceByKey((a, b) => (a/b)) 
+0

Я пробовал это, но он возвращает меня : 32: ошибка: тип несоответствия; найдено: Float required: Int – volk

+0

О да, потому что для уменьшения по ключу требуется тот же тип вывода, что и тип ввода. Делайте 'map (_. ToFloat)' перед запуском reduceByKey 'val result = positive.union (negativeCount) .mapValues ​​(_. ToFloat) .reduceByKey ((a, b) => (a/b)' –

+0

Спасибо, он работает отлично! – volk