2014-10-28 3 views
0

Мне нужно сгруппировать набор строк csv определенным столбцом и выполнить некоторую обработку в каждой группе.Обработка данных Spark с группировкой

JavaRDD<String> lines = sc.textFile 
         ("somefile.csv"); 
       JavaPairRDD<String, String> pairRDD = lines.mapToPair(new SomeParser()); 
       List<String> keys = pairRDD.keys().distinct().collect(); 
       for (String key : keys) 
       { 
       List<String> rows = pairRDD.lookup(key); 

      noOfVisits = rows.size(); 
      country = COMMA.split(rows.get(0))[6]; 
      accessDuration = getAccessDuration(rows,timeFormat); 
      Map<String,Integer> counts = getCounts(rows); 
      whitepapers = counts.get("whitepapers"); 
      tutorials = counts.get("tutorials"); 
      workshops = counts.get("workshops"); 
      casestudies = counts.get("casestudies"); 
      productPages = counts.get("productpages");   
      } 

    private static long dateParser(String dateString) throws ParseException { 
     SimpleDateFormat format = new SimpleDateFormat("MMM dd yyyy HH:mma"); 
     Date date = format.parse(dateString); 
     return date.getTime(); 
    } 
dateParser is called for each row. Then min and max for the group is calculated to get the access duration. Others are string matches. 

параRDD.lookup чрезвычайно медленный .. Есть ли лучший способ сделать это с помощью искры.

ответ

2

Я думаю, вы могли бы просто использовать эту колонку в качестве ключа и сделать groupByKey. В этих строках нет упоминания об операции. Если это функция, которая каким-то образом объединяет эти строки, вы даже можете использовать reduceByKey.

Что-то вроде:

import org.apache.spark.SparkContext._ // implicit pair functions 
val pairs = lines.map(parser _) 
val grouped = pairs.groupByKey 
// here grouped is of the form: (key, Iterator[String]) 

* EDIT * После просмотра этого процесса, я думаю, было бы более эффективным для отображения каждой строки в данные, которые он вносит свой вклад, а затем использовать aggregateByKey, чтобы уменьшить их всех к итогу. aggregateByKey занимает 2 функции и нуль:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, 
     combOp: (U, U) => U): RDD[(K, U)] 

Первой функцией является разбиение агрегатора и будет эффективно работать через локальные разделы, создавая локальные агрегированные партиалы за перегородку. Комбинация будет выполнять те частичные агрегации и объединить их для получения конечного результата.

Что-то вроде этого:

val lines = sc.textFile("somefile.csv") 
// parse returns a key and a decomposed Record of values tracked:(key, Record("country", timestamp,"whitepaper",...)) 

val records = lines.map(parse(_)) 

val totals = records.aggregateByKey((0,Set[String].empty,Long.MaxValue, Long.MinValue, Map[String,Int].empty), 
(record, (count, countrySet, minTime, maxTime, counterMap)) => (count+1,countrySet + record.country, math.min(minTime,record.timestamp), math.max(maxTime, record.timestamp), ...) 
(cumm1, cumm2) => ??? // add each field of the cummulator 
) 

Это наиболее эффективный метод в Искре, чтобы сделать ключевые основе агрегирования.

+0

Я уже пробовал это. Он еще медленнее. Одна операция разбора столбца даты для каждой группы для вычисления продолжительности. – lochi

+0

Не могли бы вы добавить подробности к вопросу о выполняемой операции над значениями для каждого ключа? 'reduceByKey' более эффективен, чем' groupByKey' и может быть лучшим выбором. – maasg

+0

смотрите выше .. спасибо. – lochi

Смежные вопросы