2017-01-27 2 views
1

У меня есть RDD как следующийPYSPARK: использование сортировки с reduceByKey

dataSource = sc.parallelize([("user1", (3, "blue")), ("user1", (4, "black")), ("user2", (5, "white"), ("user2", (3, "black")), ("user2", (6, "red")), ("user1", (1, "red"))]) 

Я хочу использовать reduceByKey найти Top 2 цвета для каждого пользователя, так что выход был бы РДД нравится:

sc.parallelize([("user1", ["black", "blue"]), ("user2", ["red", "white"])]) 

поэтому мне нужно уменьшить по клавише, а затем отсортировать значения каждой клавиши, т. Е. (Число, цвет) по числу и вернуть верхние цвета.

Я не хочу использовать groupBy. Если есть что-то лучше, чем reduceByKey кроме groupBy, было бы здорово :)

ответ

1

Вы можете, например, использовать heap queue. Необходимые импорт:

import heapq 
from functools import partial 

Вспомогательные функции:

def zero_value(n): 
    """Initialize a queue. If n is large 
    it could be more efficient to track a number of the elements 
    on heap (cnt, heap) and switch between heappush and heappushpop 
    if we exceed n. I leave this as an exercise for the reader.""" 
    return [(float("-inf"), None) for _ in range(n)] 

def seq_func(acc, x): 
    heapq.heappushpop(acc, x) 
    return acc 

def merge_func(acc1, acc2, n): 
    return heapq.nlargest(n, heapq.merge(acc1, acc2)) 

def finalize(kvs): 
    return [v for (k, v) in kvs if k != float("-inf")] 

данных:

rdd = sc.parallelize([ 
    ("user1", (3, "blue")), ("user1", (4, "black")), 
    ("user2", (5, "white")), ("user2", (3, "black")), 
    ("user2", (6, "red")), ("user1", (1, "red"))]) 

Решение:

(rdd 
    .aggregateByKey(zero_value(2), seq_func, partial(merge_func, n=2)) 
    .mapValues(finalize) 
    .collect()) 

Результат:

[('user2', ['red', 'white']), ('user1', ['black', 'blue'])] 
Смежные вопросы