2015-09-29 6 views
3

Данные, которые в data.csv файле:Как мы можем сортировать и группировать данные из Spark RDD?

07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48 
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66 
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47 
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49 
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33 
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:48:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:36:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48 
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66 
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47 
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49 
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33 
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 

Это мой код:

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 

object ScalaApp { 
def main(args: Array[String]) { 
val sc = new SparkContext("local[4]", "Program") 

    // we take the raw data in CSV format and convert it into a 

    val data = sc.textFile("data.csv") 
.map(line => line.split(",")) 

.map(GroupRecord => (GroupRecord(0), 
GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5))) 

val numPurchases = data.count() 
val d1=data.groupByKey(GroupRecord(2)) // here is the error 

println("No: " + numPurchases) 
println("Grouped Data" + d1) 

} 
} 

Я просто хочу те же данные, что группа по источникам-IP (вторая колонка) и порядка по времени (1-й столбец). Итак, мои требуют данных является:

07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66 
    07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47 
    07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
    07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66 
    07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47 
    07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
    07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
    07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49 
    07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
    07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33 
    07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
    07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49 
    07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
    07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33 
    07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
    07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
    07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48 
    07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48 

, но у меня есть проблемы с моим кодом так плз помочь мне!

ответ

2

Как Глени отметили, вы не создаете ключ-значение пара для операции groupByKey. Однако вы также можете использовать groupBy(_._3), чтобы получить тот же результат. Чтобы отсортировать каждую группу по первому столбцу, вы можете применить flatMapValues после группировки для сортировки элементов в каждой группе. Следующий код делает именно то, что:

def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]") 
    val sc = new SparkContext(sparkConf) 

    val data = sc.textFile("data.csv") 
     .map(line => line.split("\\s+")) 
     .map(GroupRecord => (GroupRecord(2), (GroupRecord(0), GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))) 

    // sort the groups by the first tuple field 
    val result = data.groupByKey.flatMapValues(x => x.toList.sortBy(_._1)) 

    // assign the partition ID to each item to see that each group is sorted 
    val resultWithPartitionID = result.mapPartitionsWithIndex((id, it) => it.map(x => (id, x))) 

    // print the contents of the RDD, elements of different partitions might be interleaved 
    resultWithPartitionID foreach println 

    val collectedResult = resultWithPartitionID.collect.sortBy(_._1).map(_._2) 

    // print collected results 
    println(collectedResult.mkString("\n")) 
    } 
+1

Thanks @Till Rohrmann –

3

Ваша проблема заключается в том, что ваш второй map создает вместо пары ключ-значение Tuple6, что и требуется, если вы хотите выполнить операцию xxxByKey. Если вы хотите сгруппировать по 2-м столбце, вы должны сделать GroupRecord(1) ключ и значения покоя, а затем вызвать groupByKey, как это:

data 
    .map(GroupRecord => (GroupRecord(1),(GroupRecord(0),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5))) 
    .groupByKey() 
+1

ошибка по-прежнему дает: _italic_ ** val data = sc.textFile ("data.csv"). map (line => line.split (",")) .map (GroupRecord => (GroupRecord (1), (GroupRecord (0), GroupRecord (2), GroupRecord (3), GroupRecord (4), GroupRecord (5))). GroupByKey() println («Сгруппированные данные» + данные) ** –

+0

Вы не можете напечатать такой RDD. Вместо этого попробуйте 'data foreach println'. –

0

Здесь мы требуем, чтобы преобразовать его в ключевом, пар значений, чтобы применить механизм GroupByKey и после того, что значения будут получать преобразования в Iterable установить и применить сортировку к значениям каждому мы должны преобразовать его в последовательность, а затем применить функцию сортировки, и после этого функция flatMap сгладит последовательные значения для наборов строк.

data.csv ->

07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48 
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66 
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47 
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49 
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33 
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:48:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:36:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48 
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66 
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47 
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49 
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33 
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48 
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48 

код ->

val data = sc.textFile("src/Data.csv") 
    .map(line => { 
    val GroupRecord = line.split("\t") 

    ((GroupRecord(1)), (GroupRecord(0), GroupRecord(2), GroupRecord(3), GroupRecord(4), GroupRecord(5))) 
    }) 

val numPurchases = data.count() 

val d1 = data.groupByKey().map(f => (f._1, f._2.toSeq.sortBy(f => f._1))).flatMapValues(f => f).map(f => (f._2._1, f._1, f._2._2, f._2._3, f._2._4, f._2._5)) 

d1 foreach (println(_)) 

println("No: " + numPurchases) 
+1

Downvoted для пояснений к изменению –

+1

Ошибка: [trace] Stack trace suppressed: run last compile: run for the full output. [ошибка] (скомпилируйте: запустите) Неверный код выхода: 1 [error] Общее время: 13 с, завершено 30 сентября 2015 г. 10:44:20 –

+1

Sangeen, пожалуйста, ознакомьтесь с выполненными изменениями. –

3

Вышеупомянутое решение будет работать нормально. Но при увеличении данных большого размера я не уверен, как и нужно обращаться с перестановками

Лучший способ создать dataframe и используя порядок sqlContext BY IP-адреса и времени

Смежные вопросы