у меня есть набор данных, который выглядит, как это, где каждый пользователь и идентификатор продукта является строка:Spark - преобразование идентификаторов строк уникальных идентификаторов целочисленных
userA, productX
userA, productX
userB, productY
с ~ 2,8 млн продуктов и 300 миллионов пользователей; около 2,1 млрд. пользовательских ассоциаций.
Моя конечная цель - запустить Spark-совместную фильтрацию (ALS) в этом наборе данных. Поскольку для пользователей и продуктов требуются ключи int, первым моим шагом является назначение уникального int каждому пользователю и продукту и преобразование набора данных выше, чтобы пользователи и продукты были представлены ints.
Вот что я пытался до сих пор:
val rawInputData = sc.textFile(params.inputPath)
.filter { line => !(line contains "\\N") }
.map { line =>
val parts = line.split("\t")
(parts(0), parts(1)) // user, product
}
// find all unique users and assign them IDs
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache()
// find all unique products and assign IDs
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache()
idx1map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx1Out)
idx2map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx2Out)
// join with user ID map:
// convert from (userStr, productStr) to (productStr, userIntId)
val rev = rawInputData.cogroup(idx1map).flatMap{
case (id1, (id2s, idx1s)) =>
val idx1 = idx1s.head
id2s.map { (_, idx1)
}
}
// join with product ID map:
// convert from (productStr, userIntId) to (userIntId, productIntId)
val converted = rev.cogroup(idx2map).flatMap{
case (id2, (idx1s, idx2s)) =>
val idx2 = idx2s.head
idx1s.map{ (_, idx2)
}
}
// save output
val convertedInts = converted.map{
case (a,b) => a.toInt.toString + "\t" + b.toInt.toString
}
convertedInts.saveAsTextFile(params.outputPath)
Когда я пытаюсь запустить это на моем кластере (40 исполнителей с 5 Гб оперативной памяти каждого), она способна производить idx1map и idx2map файлы в порядке, но он терпит неудачу с ошибками памяти и сбоями при первой плоской карте после cogroup. Я не много делал с Spark, поэтому мне интересно, есть ли лучший способ сделать это; У меня нет четкого представления о том, какие шаги в этой работе будут дорогими. Конечно, cogroup потребует перетасовки всего набора данных по сети; но что значит что-то подобное?
FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25)
Причина, почему я не только с помощью хэш-функции является то, что я в конце концов хотел бы запустить это на гораздо больший набор данных (порядка 1 млрд товаров, 1 миллиард пользователей, 35 млрд ассоциаций) , а количество столкновений с Int-ключами станет довольно большим. Выполняет ли ALS на наборе данных этого масштаба даже близко к возможному?