2015-09-02 2 views
0

Так что название этого должно быть достаточно запутанным, поэтому я сделаю все возможное, чтобы объяснить. Я пытаюсь разбить эту функцию на определенные функции, чтобы лучше видеть, как aggregateByKey работает для других команд, которые будут писать мой код. У меня есть следующий агрегат:Spark aggregateByKey использует карту и определяет типы данных для функций

val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
     (accumCount, value) => accumCount.get(value.head) match { 
     case None => accumCount + (value.head -> 1) 
     case Some(count) => accumCount + (value.head -> (count + 1)) 
     }, 
     (accum1, accum2) => accum1 ++ accum2.map{case(k,v) => k -> (v + accum1.getOrElse(k, 0))} 
    ).collect() 

Я давно хотел, чтобы разорвать этот следующим образом:

val firstLet = Map[Char, Int] 
    def fSeq(accumCount:?, value:?) = { 
    accumCount.get(value.head) match { 
     case None => accumCount + (value.head -> 1) 
     case Some(count) => accumCount + (value.head -> (count + 1)) 
    } 
    } 
    def fComb(accum1:?, accum2:?) = { 
    accum1 ++ accum2.map{case(k,v) => k -> (v + accum1.getOrElse(k, 0)) 
    } 

В связи с начальным значением ЯВЛЯЮЩЕЙСЯ Map [Char, Int] Я не уверен, что для создания типов учетных данных, значений для определения. Я пробовал разные вещи, но ничего не получается. Может ли кто-нибудь помочь мне определить типы данных и объяснить, как вы это определили?

+0

И что здесь вводит? 'RDD [(T, String)]'? – zero323

ответ

1
  • seqOp принимает аккумулятор того же типа в качестве начального значения в качестве первого аргумента и значение того же типа, как значения в вашем RDD.
  • combOp принимает два аккумулятора одного и того же типа начального значения.

Предполагая, что вы хотите агрегировать RDD[(T,U)]:

def fSeq(accumCount: Map[Char, Int], value: U): Map[Char, Int] = ??? 
def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = ??? 

Я думаю, в вашем случае U просто, как String, так что вы должны настроить fSeq подпись.

Кстати, вы можете использовать обеспечить отображение по умолчанию и упростить функции:

val firstLet = Map[Char, Int]().withDefault(x => 0) 

def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] = { 
    accumCount + (value.head -> (accumCount(value.head) + 1)) 
} 

def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = { 
    val accum = (accum1.keys ++ accum2.keys).map(k => (k, accum1(k) + accum2(k))) 
    accum.toMap.withDefault(x => 0) 
} 

Наконец, может быть более эффективно использовать scala.collection.mutable.Map:

import scala.collection.mutable.{Map => MMap} 

def firstLetM = MMap[Char, Int]().withDefault(x => 0) 

def fSeqM(accumCount: MMap[Char, Int], value: String): MMap[Char, Int] = { 
    accumCount += (value.head -> (accumCount(value.head) + 1)) 
} 

def fCombM(accum1: MMap[Char, Int], accum2: MMap[Char, Int]): MMap[Char, Int] = { 
    accum2.foreach{case (k, v) => accum1 += (k -> (accum1(k) + v))} 
    accum1 
} 

Тест:

def randomChar() = (scala.util.Random.nextInt.abs % 58 + 65).toChar 
def randomString() = { 
    (Seq(randomChar) ++ Iterator.iterate(randomChar)(_ => randomChar) 
     .takeWhile(_ => scala.util.Random.nextFloat > 0.1)).mkString 
} 

val stringRdd = sc.parallelize(
    (1 to 500000).map(_ => (scala.util.Random.nextInt.abs % 60, randomString))) 


val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
    (accumCount, value) => accumCount.get(value.head) match { 
    case None => accumCount + (value.head -> 1) 
    case Some(count) => accumCount + (value.head -> (count + 1)) 
    }, 
    (accum1, accum2) => accum1 ++ accum2.map{ 
    case(k,v) => k -> (v + accum1.getOrElse(k, 0))} 
).collectAsMap() 

val firstLetter2 = stringRDD 
    .aggregateByKey(firstLet)(fSeq, fComb) 
    .collectAsMap 

val firstLetter3 = stringRDD 
    .aggregateByKey(firstLetM)(fSeqM, fCombM) 
    .mapValues(_.toMap) 
    .collectAsMap 


firstLetter == val firstLetter2 
firstLetter == val firstLetter3 
+0

Благодарим за добавленное обновление, которое упрощает чтение и использование кода. – theMadKing

+0

Кстати, этот код не работает должным образом, старый код подсчитывал все ключи и coutns, этот код по какой-либо причине останавливается на 50 отсчетов. Мой старый код этого не делает! – theMadKing

+0

Не могли бы вы представить пример ввода? – zero323

Смежные вопросы