2015-12-13 3 views
0

Я очень новичок в Apache Spark и пытаюсь использовать SchemaRDD с текстовым файлом, ограниченным текстом. У меня есть отдельная установка Spark 1.5.2 на моем Mac с помощью Scala 10. У меня есть файл CSV со следующими репрезентативными данными, и я пытаюсь разбить их на 4 разных файла на основе первого значения (столбца) записи , Я бы очень признателен за любую помощь, которую я смогу с этим справиться.Apache Spark, работающий с файлами CSV с разделителями каналов

1|1.8|20140801T081137|115810740 
2|20140714T060000|335|22159892|3657|0.00|||181 
2|20140714T061500|335|22159892|3657|0.00|||157 
2|20140714T063000|335|22159892|3657|0.00|||156 
2|20140714T064500|335|22159892|3657|0.00|||66 
2|20140714T070000|335|22159892|3657|0.01|||633 
2|20140714T071500|335|22159892|3657|0.01|||1087 
3|34|Starz 
3|35|VH1 
3|36|CSPAN: Cable Satellite Public Affairs Network 
3|37|Encore 
3|278|CMT: Country Music Television 
3|281|Telehit 
4|625363|1852400|Matlock|9212|The Divorce 
4|625719|1852400|Matlock|16|The Rat Pack 
4|625849|1846952|Smallville|43|Calling 
+2

Добро пожаловать в SO. Если вы включите свои собственные попытки, у вас будет гораздо больше шансов получить ответ. – zero323

ответ

5

Примечание: Ваш файл CSV не имеют одинаковое количество полей в каждой строке - это не может быть проанализирован как в DataFrame. (SchemaRDD был переименован в DataFrame.) Вот что вы можете сделать, если ваш файл csv был хорошо сформирован:

запуск искровой оболочки или искра - отправить с помощью --packages com.databricks: spark-csv_2.10: 1.3.0, чтобы легко разбирать файлы csv (see here). В Scala, ваш код будет, предполагая, что файл CSV имеет заголовок - если да, то проще ссылаться на столбцы:

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", '|').load("/path/to/file.csv") 
// assume 1st column has name col1 
val df1 = df.filter(df("col1") === 1) // 1st DataFrame 
val df2 = df.filter(df("col1") === 2) // 2nd DataFrame etc... 

Поскольку ваш файл не хорошо сформирован, вы должны проанализировать каждый из разные линии по-разному, например, выполните следующие действия:

val lines = sc.textFile("/path/to/file.csv") 

case class RowRecord1(col1:Int, col2:Double, col3:String, col4:Int) 
def parseRowRecord1(arr:Array[String]) = RowRecord1(arr(0).toInt, arr(1).toDouble, arr(2), arr(3).toInt) 

case class RowRecord2(col1:Int, col2:String, col3:Int, col4:Int, col5:Int, col6:Double, col7:Int) 
def parseRowRecord2(arr:Array[String]) = RowRecord2(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt, arr(4).toInt, arr(5).toDouble, arr(8).toInt) 

val df1 = lines.filter(_.startsWith("1")).map(_.split('|')).map(arr => parseRowRecord1(arr)).toDF 
val df2 = lines.filter(_.startsWith("2")).map(_.split('|')).map(arr => parseRowRecord2(arr)).toDF 
+0

Привет, KrisP, Большое вам спасибо за помощь. Я пробовал ваши первые несколько строк кода, и он отлично работал! Я собираюсь попробовать остальную часть вашего примера, а затем разделить файлы (которые имеют разные столбцы num) на несколько файлов с одинаковым количеством столбцов на основе значения COL0 ... – Edward

+0

Hi KrisP Привет, вы также знаете, как сохранить выходные данные в файлах с разделителями каналов? Я думаю, что вывод команды df2.write.format ("com.databricks.spark.csv"). Save ("/ Users/temp/parsed1.txt") по умолчанию разделен запятой и разбивает вещи на несколько файлов. Если возможно, я также пытаюсь записать результаты непосредственно в Amazon Redshift, чтобы сделать рабочий процесс более упорядоченным. Большое спасибо за вашу помощь. – Edward

+3

Это работает только для меня, если я использую двойные кавычки для разделителя: .option ("delimiter", "|") В противном случае я получаю ошибку: java.lang.IllegalArgumentException: разделитель не может быть более одного символа – Brian

Смежные вопросы