2015-01-22 4 views
4

у меня есть набор данных, который выглядит, как это, где каждый пользователь и идентификатор продукта является строка:Spark - преобразование идентификаторов строк уникальных идентификаторов целочисленных

userA, productX 
userA, productX 
userB, productY 

с ~ 2,8 млн продуктов и 300 миллионов пользователей; около 2,1 млрд. пользовательских ассоциаций.

Моя конечная цель - запустить Spark-совместную фильтрацию (ALS) в этом наборе данных. Поскольку для пользователей и продуктов требуются ключи int, первым моим шагом является назначение уникального int каждому пользователю и продукту и преобразование набора данных выше, чтобы пользователи и продукты были представлены ints.

Вот что я пытался до сих пор:

val rawInputData = sc.textFile(params.inputPath) 
    .filter { line => !(line contains "\\N") } 
    .map { line => 
     val parts = line.split("\t") 
     (parts(0), parts(1)) // user, product 
    } 

// find all unique users and assign them IDs 
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache() 

// find all unique products and assign IDs 
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache() 

idx1map.map{ case (id, idx) => id + "\t" + idx.toString 
}.saveAsTextFile(params.idx1Out) 
idx2map.map{ case (id, idx) => id + "\t" + idx.toString 
}.saveAsTextFile(params.idx2Out) 

// join with user ID map: 
// convert from (userStr, productStr) to (productStr, userIntId) 
val rev = rawInputData.cogroup(idx1map).flatMap{ 
    case (id1, (id2s, idx1s)) => 
    val idx1 = idx1s.head 
    id2s.map { (_, idx1) 
    } 
} 

// join with product ID map: 
// convert from (productStr, userIntId) to (userIntId, productIntId) 
val converted = rev.cogroup(idx2map).flatMap{ 
    case (id2, (idx1s, idx2s)) => 
    val idx2 = idx2s.head 
    idx1s.map{ (_, idx2) 
    } 
} 

// save output 
val convertedInts = converted.map{ 
    case (a,b) => a.toInt.toString + "\t" + b.toInt.toString 
} 
convertedInts.saveAsTextFile(params.outputPath) 

Когда я пытаюсь запустить это на моем кластере (40 исполнителей с 5 Гб оперативной памяти каждого), она способна производить idx1map и idx2map файлы в порядке, но он терпит неудачу с ошибками памяти и сбоями при первой плоской карте после cogroup. Я не много делал с Spark, поэтому мне интересно, есть ли лучший способ сделать это; У меня нет четкого представления о том, какие шаги в этой работе будут дорогими. Конечно, cogroup потребует перетасовки всего набора данных по сети; но что значит что-то подобное?

FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25) 

Причина, почему я не только с помощью хэш-функции является то, что я в конце концов хотел бы запустить это на гораздо больший набор данных (порядка 1 млрд товаров, 1 миллиард пользователей, 35 млрд ассоциаций) , а количество столкновений с Int-ключами станет довольно большим. Выполняет ли ALS на наборе данных этого масштаба даже близко к возможному?

ответ

2

Похоже, вы по существу собираете все списки пользователей, просто чтобы разделить их снова. Попробуйте просто использовать join вместо cogroup, и мне кажется, что вы больше похожи на то, что хотите. Например:

import org.apache.spark.SparkContext._ 
// Create some fake data 
val data = sc.parallelize(Seq(("userA", "productA"),("userA", "productB"),("userB", "productB"))) 
val userId = sc.parallelize(Seq(("userA",1),("userB",2))) 
val productId = sc.parallelize(Seq(("productA",1),("productB",2))) 

// Replace userName with ID's 
val userReplaced = data.join(userId).map{case (_,(prod,user)) => (prod,user)} 
// Replace product names with ID's 
val bothReplaced = userReplaced.join(productId).map{case (_,(user,prod)) => (user,prod)} 

// Check results: 
bothReplaced.collect()) // Array((1,1), (1,2), (2,2)) 

Пожалуйста, оставьте комментарии к тому, насколько хорошо он работает.

(я не знаю, что такое FetchFailed(...))

Смежные вопросы