2015-12-14 6 views
0

Мне нужно отсортировать ключи в RDD, но нет естественного порядка сортировки (не по возрастанию или по убыванию). Я даже не знаю, как написать Comparator для этого. Скажем, у меня была карта яблок, груш, апельсинов и винограда, я хочу сортировать апельсины, яблоки, виноград и груши.Сортировка ключей в RDD

Любые идеи о том, как это сделать в Spark/Scala? Благодаря!

ответ

4

В Scala вам нужно найти признак Ordering[T], а не интерфейс Comparator - в основном косметическая разница, так что фокус находится на атрибуте данных, а не на вещи, которая сравнивает два экземпляра данных. Для реализации признака необходимо определить метод compare(T,T). Очень явный вариант перечисленном сравнения может быть:

object fruitOrdering extends Ordering[String] { 
    def compare(lhs: String, rhs: String): Int = (lhs, rhs) match { 
    case ("orange", "orange") => 0 
    case ("orange", _)  => -1 
    case ("apple", "orange") => 1 
    case ("apple", "apple") => 0 
    case ("apple", _)   => -1 
    case ("grape", "orange") => 1 
    case ("grape", "apple") => 1 
    case ("grape", "grape") => 0 
    case ("grape", _)   => -1 
    case ("pear", "orange") => 1 
    case ("pear", "apple") => 1 
    case ("pear", "grape") => 1 
    case ("pear", "pear")  => 0 
    case ("pear", _)   => -1 
    case _ => 0 
    } 
} 

Или, слегка адаптировать zero323's answer:

object fruitOrdering2 extends Ordering[String] { 
    private val values = Seq("orange", "apple", "grape", "pear") 
    // generate the map based off of indices so we don't have to worry about human error during updates 
    private val ordinalMap = values.zipWithIndex.toMap.withDefaultValue(Int.MaxValue) 

    def compare(lhs: String, rhs: String): Int = ordinalMap(lhs).compare(ordinalMap(rhs)) 
} 

Теперь, когда у вас есть экземпляр Ordering[String], вам необходимо сообщить метод sortBy использовать этот а не встроенный. Если вы посмотрите на подписи для RDD#sortBy вы увидите полной подпись

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 

что во втором списке параметров неявного Ordering[K] обычно посмотрело компилятор для предопределенных порядков - вот как она знает, что естественный порядок должен быть. Однако любому неявному параметру может быть присвоено явное значение. Обратите внимание: если вы укажете одно неявное значение, то вам нужно предоставить все, поэтому в этом случае нам также необходимо предоставить ClassTag[K]. Это всегда генерируется компилятором, но может быть легко явным образом сгенерировано с использованием scala.reflect.classTag.

Указание все, что призывание будет выглядеть следующим образом:

import scala.reflect.classTag 
rdd.sortBy { case (key, _) => key }(fruitOrdering, classOf[String]) 

Это все еще довольно грязно, хотя, не так ли? К счастью, мы можем использовать неявные классы, чтобы забрать много трещин. Вот отрывок, который я использую довольно часто:

package com.example.spark 

import scala.reflect.ClassTag 
import org.apache.spark.rdd.RDD 

package object implicits { 
    implicit class RichSortingRDD[A : ClassTag](underlying: RDD[A]) { 
    def sorted(implicit ord: Ordering[A]): RDD[A] = 
     underlying.sortBy(identity)(ord, implicitly[ClassTag[A]]) 

    def sortWith(fn: (A, A) => Int): RDD[A] = { 
     val ord = new Ordering[A] { def compare(lhs: A, rhs: A): Int = fn(lhs, rhs) } 
     sorted(ord) 
    } 
    } 

    implicit class RichSortingPairRDD[K : ClassTag, V](underlying: RDD[(K, V)]) { 
    def sortByKey(implicit ord: Ordering[K]): RDD[(K, V)] = 
     underlying.sortBy { case (key, _) => key } (ord, implicitly[ClassTag[K]]) 

    def sortByKeyWith(fn: (K, K) => Int): RDD[(K, V)] = { 
     val ord = new Ordering[K] { def compare(lhs: K, rhs: K): Int = fn(lhs, rhs) } 
     sortByKey(ord) 
    } 
    } 
} 

И в действии:

import com.example.spark.implicits._ 

val rdd = sc.parallelize(Seq(("grape", 0.3), ("apple", 5.0), ("orange", 5.6))) 
rdd.sortByKey(fruitOrdering).collect 
// Array[(String, Double)] = Array((orange,5.6), (apple,5.0), (grape,0.3)) 

rdd.sortByKey.collect // Natural ordering by default 
// Array[(String, Double)] = Array((apple,5.0), (grape,0.3), (orange,5.6)) 

rdd.sortWith(_._2 compare _._2).collect // sort by the value instead 
// Array[(String, Double)] = Array((grape,0.3), (apple,5.0), (orange,5.6)) 
+0

Удивительный ответ. благодаря – user1660256

1

Я не знаю, о искры, но с чистой коллекции Scala, которые бы

_.sortBy(_.fruitType) 

Например,

val l: List[String] = List("the", "big", "bang") 
val sortedByFirstLetter = l.sortBy(_.head) 
// List(big, bang, the) 
+0

Спасибо за быстрый ответ, но я не совсем понимаю. Как это позволяет мне указать, какую часть фруктов я хочу на первом месте, и так далее? – user1660256

+0

Извините, я случайно написал 'groupBy' вместо' sortBy'. Обновлен ответ. – VasyaNovikov

1

Существует sortBy метод Спарк, который позволяет определить произвольный порядок и хотите ли вы восходящего или нисходящего. Например.

scala> val rdd = sc.parallelize(Seq (("a", 1), ("z", 7), ("p", 3), ("a", 13) )) 
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[331] at parallelize at <console>:70 

scala> rdd.sortBy(_._2, ascending = false) .collect.mkString("\n") 
res34: String = 
(a,13) 
(z,7) 
(p,3) 
(a,1) 

scala> rdd.sortBy(_._1, ascending = false) .collect.mkString("\n") 
res35: String = 
(z,7) 
(p,3) 
(a,1) 
(a,13) 

scala> rdd.sortBy 

def sortBy[K](f: T => K, ascending: Boolean, numPartitions: Int)(implicit ord: scala.math.Ordering[K], ctag: scala.reflect.ClassTag[K]): RDD[T] 

Последняя часть сообщает вам, что такое подпись sortBy. Порядок, используемый в предыдущих примерах, - это первая и вторая части пары.

Edit: ответил слишком быстро, без проверки на ваш вопрос, извините ... Во всяком случае, вы должны определить свой порядок, как в вашем примере:

def myord(fruit:String) = fruit match { 
    case "oranges" => 1 ; 
    case "apples" => 2; 
    case "grapes" =>3; 
    case "pears" => 4; 
    case _ => 5} 

val rdd = sc.parallelize(Seq("apples", "oranges" , "pears", "grapes" , "other")) 

Тогда результат упорядочения будет:

scala> rdd.sortBy[Int](myord, ascending = true).collect.mkString("\n") 
res1: String = 
oranges 
apples 
grapes 
pears 
other 
+0

Это выглядит очень близко к тому, что я хочу. Но вместо Seq у меня есть карта плодов («яблоки» -> «хорошие», «апельсины» -> «лучше», «груши» -> «великие»). Так как же я тогда сортировать k из k, v? – user1660256

2

Если единственным способом вы можете описать порядок является перечисление, то просто перечислить:

val order = Map("orange" -> 0L, "apple" -> 1L, "grape" -> 2L, "pear" -> 3L) 
val rdd = sc.parallelize(Seq(("grape", 0.3), ("apple", 5.0), ("orange", 5.6))) 
val sorted = rdd.sortBy{case (key, _) => order.getOrElse(key, Long.MaxValue)} 
sorted.collect 

// Array[(String, Double)] = Array((orange,5.6), (apple,5.0), (grape,0.3)) 
+0

Да, я так думал! Но я уже работаю с ключами -> парами значений. Где я могу добавить счетчик? – user1660256

+0

Если он будет повторно использован, вам следует подумать о создании переменной широковещания, иначе код, как указано выше, должен работать нормально. Любая переменная, на которую ссылаются внутренние блокировки, автоматически сериализуется и передается всем работникам. – zero323

Смежные вопросы