2015-04-30 2 views
1

Я пытаюсь прочитать файл CVS с помощью Spark, а затем сохранить его в Cassandra. Сохранение в Кассандре работает, когда я использую тривиальные значения.Прочтите файл CSV в Spark и напишите его Cassandra

У меня есть файл со следующими значениями:

id,name,tag1|tag2|tag3

Я хочу сохранить его в Кассандре таблице:

id bigint, name varchar, tags set

Я определил класс случай для этого:

case class Item(id:Integer,name:String,tag:Set[String])

Затем я использую это выражение для получения RDD из файла CVS

val items = sc.textFile("items.csv").map(l => l.split(",") match {case Array (a,b,c) => Item(Integer.parseInt(a),b,c.split("\\|").toSet)})

Когда я теперь называю collect или saveToCassandra по пунктам (который начинает обработку) я получаю следующее сообщение об ошибке:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 29.0 failed 1 times, most recent failure: Lost task 1.0 in stage 29.0 (TID 38, localhost): scala.MatchError: [Ljava.lang.String;@6030bbe6 (of class [Ljava.lang.String;) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

+0

Что такое класс class Item (id: Integer, name: String, tag [String])? Является ли тегом Set [String]? –

+0

Да, его набор [Строка]. Я изменил его выше – mniehoff

ответ

2

Как уже упоминалось, проблема заключается в том, что разделение на некоторых входах создает массив, который имеет меньше или больше, чем 3 элемента, используемые в совпадении.

Но partialFuntion, используемый для соответствия, может быть использован для фильтрации по элементам, которые do соответствуют критериям соответствия. rdd.collect{partialFunction} точно означает для этого:

val data = sc.textFile("items.csv") 
val arrayData = data.map(l => l.split(",")) 
val items = arrayData.collect{case Array (a,b,c) => Item(Integer.parseInt(a),b,c.split("\\|").toSet)}) 
items.saveToCassandra(...) 
  • Note1: вы должны также защитить от грязных значений. например ParseInt по значению, это не ИНТ номер, ...)
  • Примечание2: rdd.collect{partialFunc} (фильтры/картографических данных с помощью частичной функции) не следует путать с rdd.collect (вернись данных в драйвере))
+0

, вы оба правы, формат данных был неправильным. Я отметил это как принятый ответ, поскольку он обеспечивает приятное обходное решение, когда вы не полагаетесь на все данные. – mniehoff

1

Вы получите ошибку совпадения, если ваш вход не массив из 3 записей, например

String("a,b").split(",") match { 
    case Array(a,b,c) => .... 
} 

, так что я подозреваю, что это какой-то вопрос ввода данных, и вы должны удовлетворить его в вашем match.

Смежные вопросы