2016-02-21 1 views
1

У меня есть входные линии, как показано нижеСпарк потоковой группы по пользовательской функции

t1, file1, 1, 1, 1 
t1, file1, 1, 2, 3 
t1, file2, 2, 2, 2, 2 
t2, file1, 5, 5, 5 
t2, file2, 1, 1, 2, 2 

и я хочу, чтобы достичь выхода, как показано ниже строк, который представляет собой вертикальное сложение соответствующих чисел.

file1 : [ 1+1+5, 1+2+5, 1+3+5 ] 
file2 : [ 2+1, 2+1, 2+2, 2+2 ] 

Я в контексте искры потокового и я с трудом пытаясь понять, каким образом агрегировать по имени файла.

Кажется, мне нужно будет использовать что-то вроде ниже, я не уверен, как добраться до правильного синтаксиса. Любые входы будут полезны.

myDStream.foreachRDD(rdd => rdd.groupBy()) или myDStream.foreachRDD(rdd => rdd.aggregate())

Я знаю, как сделать вертикальную сумму массива данных чисел, но я не уверен, как кормить эту функцию агрегатора.

def compute_counters(counts : ArrayBuffer[List[Int]]) = { 
    counts.toList.transpose.map(_.sum) 
} 

ответ

2

Во-первых, вам нужно извлечь соответствующий ключ и значения из разделенных запятыми, разобрать их и создать кортеж, содержащий ключ и список целых чисел, используя InputDStream.map. Затем используйте PairRDDFunctions.reduceByKey применить сумму за ключ:

dStream 
.map(line => { 
    val splitLines = line.split(", ") 
    (splitLines(1), splitLines.slice(2, splitLines.length).map(_.toInt)) 
}) 
.reduceByKey((first, second) => (first._1, Array(first._2.sum + second._2.sum)) 
.foreachRDD((key, sum) => println(s"Key: $key, sum: ${sum.head}") 

Уменьш даст кортеж (String, Array[Int]), где строка содержит идентификатор (будь то «test1» или «test2»), и массив с одно значение, содержащее сумму за ключ.

1

Спасибо Yuval, я смог сделать это, используя ваш подход. Обновление моего последнего рабочего кода:

def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("HBaseStream") 
    val sc = new SparkContext(conf) 
    // create a StreamingContext, the main entry point for all streaming functionality 
    val ssc = new StreamingContext(sc, Seconds(2)) 
    val inputStream = ssc.socketTextStream("hostname", 9999) 
    val parsedDstream = inputStream 
     .map(line => { 
     val splitLines = line.split(",") 
     (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt)) 
     }) 
     .reduceByKey((first, second) => { 
     val listOfArrays = ArrayBuffer(first, second) 
     listOfArrays.toList.transpose.map(_.sum).toArray 
     }) 
     .foreachRDD(rdd => rdd.foreach(Blaher.blah)) 
} 
Смежные вопросы