Так что название этого должно быть достаточно запутанным, поэтому я сделаю все возможное, чтобы объяснить. Я пытаюсь разбить эту функцию на определенные функции, чтобы лучше видеть, как aggregateByKey работает для других команд, которые будут писать мой код. У меня есть следующий агрегат:Spark aggregateByKey использует карту и определяет типы данных для функций
val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
(accumCount, value) => accumCount.get(value.head) match {
case None => accumCount + (value.head -> 1)
case Some(count) => accumCount + (value.head -> (count + 1))
},
(accum1, accum2) => accum1 ++ accum2.map{case(k,v) => k -> (v + accum1.getOrElse(k, 0))}
).collect()
Я давно хотел, чтобы разорвать этот следующим образом:
val firstLet = Map[Char, Int]
def fSeq(accumCount:?, value:?) = {
accumCount.get(value.head) match {
case None => accumCount + (value.head -> 1)
case Some(count) => accumCount + (value.head -> (count + 1))
}
}
def fComb(accum1:?, accum2:?) = {
accum1 ++ accum2.map{case(k,v) => k -> (v + accum1.getOrElse(k, 0))
}
В связи с начальным значением ЯВЛЯЮЩЕЙСЯ Map [Char, Int] Я не уверен, что для создания типов учетных данных, значений для определения. Я пробовал разные вещи, но ничего не получается. Может ли кто-нибудь помочь мне определить типы данных и объяснить, как вы это определили?
И что здесь вводит? 'RDD [(T, String)]'? – zero323