2014-11-03 3 views
3

Работая с парами JavaPairRDD (ключ, значение), я хотел бы обрабатывать значения, связанные с каждым ключом в определенном порядке (компаратор значений). Возможно ли это в Apache Spark?Вторичный сорт в Spark

Используя Hadoop, я бы использовал шаблон secondary sort. Я ищу решение, которое может обрабатывать набор значений, которые не подходят в памяти (даже набор значений с одним и тем же ключом).

+0

Что касается вашей первой потребности, единственным способом, я знаю будет использовать значение ключа шаблон преобразования (я попробовал это в моей юности, см http://codereview.stackexchange.com/questions/56641/producing-a-sorted-wordcount-with-spark). Однако я не уверен, что он подходит для использования с большой памятью. – fxm

ответ

3

Существует проблема с добавлением дополнительной функции сортировки. До тех пор способ вторичной сортировки

rdd.map(row => (row.value, row.key)).sortByKey().map(row => (row.value, row.key)) 

sortByKey не слипаются ключи, чтобы вы могли иметь кратные одного и того же значения.

+0

Сортировка по ключу идет намного дальше, чем 1.2 – aaronman

+0

Сортировка по ключу всегда там, но сортировка по значению будет введена. – Anant

+0

где вы видите это, просто любопытно – aaronman

4

Вот реализация от Sandy Ryza продвинутая Analytics с искрой:

github

я переименовал некоторые переменные и добавлены некоторые комментарии, так что имеет смысл в более общем контексте (этот фрагмент используется в книга для анализа данных такси и некоторые переменные были названы соответственно).

def groupByKeyAndSortValues[K: Ordering : ClassTag, V: ClassTag, S](
    rdd: RDD[(K,V)], 
    secondaryKeyFunc: (V) => S, 
    splitFunc: (V, V) => Boolean, 
    numPartitions: Int): RDD[(K, List[V])] = { 
    // Extract the secondary key by applying a function to the value. 
    val presess = rdd.map { 
     case (key, value) => { 
     ((key, secondaryKeyFunc(value)), value) 
     } 
    } 
    // Define a partitioner that gets a partition by the first 
    // element of the new tuple key. 
    val partitioner = new FirstKeyPartitioner[K, S](numPartitions) 

    // Set the implicit ordering by the first element of the new 
    // tuple key 
    implicit val ordering: Ordering[(K, S)] = Ordering.by(_._1) 

    presess.repartitionAndSortWithinPartitions(partitioner).mapPartitions(groupSorted(_, splitFunc)) 
    } 
    /** 
    * Groups the given iterator according to the split function. Assumes 
    * the data comes in sorted. 
    */ 
    def groupSorted[K, V, S](
    it: Iterator[((K, S), V)], 
    splitFunc: (V, V) => Boolean): Iterator[(K, List[V])] = { 

    val res = List[(K, ArrayBuffer[V])]() 
    it.foldLeft(res)((list, next) => list match { 
     case Nil => { 
     val ((key, _), value) = next 
     List((key, ArrayBuffer(value))) 
     } 
     case cur :: rest => { 
     val (curKey, valueBuf) = cur 
     val ((key, _), value) = next 
     if (!key.equals(curLic) || splitFunc(valueBuf.last, value)) { 
      (key, ArrayBuffer(value)) :: list 
     } else { 
      valueBuf.append(value) 
      list 
     } 
     } 
    }).map { case (key, buf) => (key, buf.toList) }.iterator 
    } 

Вот Разметка:

class FirstKeyPartitioner[K1, K2](partitions: Int) extends 
     Partitioner { 
    val delegate = new HashPartitioner(partitions) 
    override def numPartitions = delegate.numPartitions 
    override def getPartition(key: Any): Int = { 
     val k = key.asInstanceOf[(K1, K2)] 
     delegate.getPartition(k._1) 
    } 

    } 
+1

, даже если эта ссылка содержит ответ, лучше скопировать наиболее важные части здесь. – MZaragoza

+0

Правильно, я помнил это сразу после публикации, но отвлекался. Я продолжу и отредактирую его сейчас, спасибо за напоминание! – ryjm

Смежные вопросы