2015-01-30 3 views
1

У меня есть список файлов csv, каждый из которых содержит группы имен категорий в качестве столбцов заголовка. Каждая строка представляет собой список пользователей с логическим значением (0, 1), являются ли они частью этой категории или нет. Каждый из файлов csv не имеет одинакового набора категорий заголовков.Доступ к глобальному поиску Apache Spark

Я хочу создать композитную CSV во всех файлах, которая имеет следующий вывод:

  1. Header является объединение всех заголовков
  2. Каждая строка представляет собой уникальный пользователь с логическое значение, соответствующее колонка категории

Способ, которым я хотел решить это, - создать кортеж user_id и уникальный category_id для каждой ячейки с «1». Затем уменьшите все эти столбцы для каждого пользователя, чтобы получить окончательный вывод.

Как создать кортеж для начала? Могу ли я получить глобальный поиск по всем категориям?

Пример данных:

File 1 
user_id,cat1,cat2,cat3 
21321,,,1, 
21322,1,1,1, 
21323,1,,, 

Файл 2

user_id,cat4,cat5 
21321,1,,, 
21323,,1,, 

Выход

user_id,cat1,cat2,cat3,cat4,cat5 
21321,,1,1,,, 
21322,1,1,1,,, 
21323,1,1,,,, 
+0

Пример данных поможет проиллюстрировать ваши исходные данные и желаемый результат. – maasg

+0

Добавлен пример. – user1927059

+0

Что вы делаете в случае конфликта? напримеродин csv говорит user_id, cat2 = 0, а другой user_id, cat2 = 1?. Кроме того, что вы пробовали до сих пор? – maasg

ответ

-1

Вам не нужно делать это как 2 шага процесса, если все, что вам нужно, это результирующие значения , Возможная конструкция: 1/Разберитесь csv. Вы не указываете, находятся ли ваши данные в распределенной FS, поэтому я предполагаю, что это не так. 2/Введите ваши (K, V) пары в изменяемую параллельную (чтобы воспользоваться Spark) картой. псевдо-код:

val directory = .. 
mutable.ParHashMap map = new mutable.ParHashMap() 
while (files[i] != null) 
{ 
    val file = directory.spark.textFile("/myfile...") 
    val cols = file.map(_.split(",")) 

    map.put(col[0], col[i++]) 
} 

, а затем вы можете получить доступ к (K/V) кортежей путем итератора на карте.

+0

Я добавил пример, который уточняет вопрос немного больше. Это не решит его. – user1927059

+0

пример путается: я думал о возможных значениях ячеек, где в (0,1), но похоже, что есть значения «21322»? –

+0

Мой плохой, если это было непонятно. Номер - это идентификатор пользователя. Все остальное равно 1 или «соответствует», если значение присутствует или отсутствует. – user1927059

0

Возможно, название вопроса вводит в заблуждение в том смысле, что оно передает определенный выбор реализации, поскольку нет необходимости в глобальном поиске, чтобы решить проблему.

В больших данных существует основной принцип, определяющий большинство решений: разделяйте и властвуйте. В этом случае входные CSV-файлы можно разделить на кортежи (пользователь, категория). Любое количество файлов CSV, содержащих произвольное количество категорий, может быть преобразовано в этот простой формат. Полученные результаты CSV объединения предыдущего шага, извлечение всего количества присутствующих категорий и некоторые преобразования данных, чтобы получить его в желаемом формате.

В коде этот алгоритм будет выглядеть следующим образом:

import org.apache.spark.SparkContext._ 

val file1 = """user_id,cat1,cat2,cat3|21321,,,1|21322,1,1,1|21323,1,,""".split("\\|") 
val file2 = """user_id,cat4,cat5|21321,1,|21323,,1""".split("\\|") 
val csv1 = sparkContext.parallelize(file1) 
val csv2 = sparkContext.parallelize(file2) 

import org.apache.spark.rdd.RDD 
def toTuples(csv:RDD[String]):RDD[(String, String)] = { 
    val headerLine = csv.first 
    val header = headerLine.split(",") 
    val data = csv.filter(_ != headerLine).map(line => line.split(",")) 
    data.flatMap{elem => 
    val merged = elem.zip(header) 
    val id = elem.head 
    merged.tail.collect{case (v,cat) if v == "1" => (id, cat)} 
    } 
} 

val data1 = toTuples(csv1) 
val data2 = toTuples(csv2) 
val union = data1.union(data2) 
val categories = union.map{case (id, cat) => cat}.distinct.collect.sorted //sorted category names 
val categoriesByUser = union.groupByKey.mapValues(v=>v.toSet) 
val numericCategoriesByUser = categoriesByUser.mapValues{catSet => categories.map(cat=> if (catSet(cat)) "1" else "")} 
    val asCsv = numericCategoriesByUser.collect.map{case (id, cats)=> id + "," + cats.mkString(",")} 

Результаты в:

21321,,,1,1, 
21322,1,1,1,, 
21323,1,,,,1 

(Генерация заголовка проста и оставляется в качестве упражнения для читателя)

Смежные вопросы