2015-08-06 3 views
0

У меня следующий RDDискровых Эффективный способ Scala рассчитывать на каждом уровне группы

(000740C7AD5274,8884165739991289,0) 
(000740C7AD5274,5247914560952402,1) 
(000740C7AD5274,6366183814312296,0) 
(000740C7AD5274,8416039242203850,1) 
(000740C7AD5274,8767784019249585,0) 
(000740C7AD5274,8875366436847528,0) 
(000740C7AD5274,6878583261589229,0) 
(000740C7AD5274,7480419089929113,1) 
(000740C7AD5274,7480419089929113,0) 
(000740C7AD5274,8848143710281107,0) 
(000740C7AD5274,7617664942496492,1) 
(000740C7AD5274,4905980213247549,0) 
(000740C7AD5274,6806506896473929,1) 

это представляет USERID, PRODUCTID, BuyorNot информацию. Я хочу создать набор статистических данных из таких данных, как. Количество товаров, купленных каждым пользователем, и количество пользователей на продукт.

Я начал следующим образом:

val userProduct = userProductRDD.groupBy(x => (x._1, x._2)).flatMap(k => (k._1, if (k._2._3) != 0) 1 else 0)) 

Но это не дает (userId, distinct_bought_count) Некоторые рекомендации было бы здорово, чтобы двигаться вперед.

ответ

1

Просто:

val userProducts: RDD[(String, Long)] = 
    userProductRDD.filter(_._3 == 1)     // buys only 
       .map { case (u, p, _, _) => (u, p) } // drop buys and price 
       .distinct       // keep distinct (user, product) 
       .map { case (u, _) => (u, 1L) } // word count problem 
       .reduceByKey(_ + _) 

// similarly 
val productUsers: RDD[(Long, Long)] = 
    userProductRDD.filter(_._3 == 1) 
       .map { case (u, p, _, _) => (p, u) } 
       .distinct 
       .map { case (p, _) => (p, 1L) } 
       .reduceByKey(_ + _) 
+0

Небольшая вещь, которую я пропустил, если если у меня есть несколько элементов, после третьего значения, такие как '000740C7AD5274,8884165739991289,0, price', что мне нужно изменить? –

+0

, потому что я получаю следующую ошибку: 'found: (T1, T2, T3, T4) required: (String, String, Int, String, String)' –

+0

Я отредактировал мой ответ –