2014-11-15 1 views
7

Я использую hadoop 2.4.1 и Spark 1.1.0. Я загрузил набор данные обзора пищи в HDFS от here, а затем я использовал следующий код, чтобы прочитать файл и обработать его на свечу оболочке:Зачем искровой оболочке бросать ArrayIndexOutOfBoundsException при чтении большого файла из HDFS?

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

var path = "hdfs:///user/hduser/finefoods.txt" 
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", "\n\n") 
var dataset = sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString) 
var datasetObj = dataset.map{ rowStr => rowStr.split("\n")}  
var tupleSet = datasetObj.map(strArr => strArr.map(elm => elm.split(": ")(1))).map(arr => (arr(0),arr(1),arr(4).toDouble)) 
tupleSet.groupBy(t => t._2) 

Когда я запускаю последнюю строку tupleSet.groupBy(t => t._2), искра оболочка кидает следующее исключение:

scala> tupleSet.groupBy(t => t._2).first() 
14/11/15 22:46:59 INFO spark.SparkContext: Starting job: first at <console>:28 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Registering RDD 11 (groupBy at <console>:28) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Got job 1 (first at <console>:28) with 1 output partitions (allowLocal=true) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Final stage: Stage 1(first at <console>:28) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 2) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Missing parents: List(Stage 2) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[11] at groupBy at <console>:28), which has no missing parents 
14/11/15 22:46:59 INFO storage.MemoryStore: ensureFreeSpace(3592) called with curMem=221261, maxMem=278302556 
14/11/15 22:46:59 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 265.2 MB) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 2 (MappedRDD[11] at groupBy at <console>:28) 
14/11/15 22:46:59 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 3 tasks 
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, localhost, ANY, 1221 bytes) 
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 4, localhost, ANY, 1221 bytes) 
14/11/15 22:46:59 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:46:59 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 4) 
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:0+134217728 
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:134217728+134217728 
14/11/15 22:47:02 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 4) 
java.lang.ArrayIndexOutOfBoundsException 
14/11/15 22:47:02 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 5, localhost, ANY, 1221 bytes) 
14/11/15 22:47:02 INFO executor.Executor: Running task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:268435456+102361028 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 

14/11/15 22:47:02 ERROR scheduler.TaskSetManager: Task 1 in stage 2.0 failed 1 times; aborting job 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Stage 2 was cancelled 
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:47:02 INFO scheduler.DAGScheduler: Failed to run first at <console>:28 
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally) 
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 2.0 (TID 5, localhost): TaskKilled (killed intentionally) 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Но когда я использую фиктивный набор данных как следующий, он хорошо работает:

var tupleSet = sc.parallelize(List(
("B001E4KFG0","A3SGXH7AUHU8GW",3.0), 
("B001E4KFG1","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG2","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG3","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG4","A3SGXH7AUHU8GW",5.0), 
("B001E4KFG5","A3SGXH7AUHU8GW",5.0), 
("B001E4KFG0","bbb",5.0) 
)) 

Любая идея ш гип?

ответ

9

В наборе данных есть запись, которая не соответствует формату и поэтому: elm.split(": ")(1) не работает, потому что в этом индексе нет элемента.

Вы можете избежать этой ошибки, проверив результаты разделения перед доступом к индексу (1). Один из способов сделать это может быть что-то вроде этого:

var tupleSet = datasetObj.map(elem => elm.split(": ")).collect{case x if (x.length>1) x(1)} 

Одно замечание: Ваши примеры, кажется, не соответствует трубопровода синтаксического анализа в коде. Они не содержат токенов «:».

Поскольку преобразования являются ленивыми, Spark не расскажет вам многое о вашем наборе данных ввода (и вы его не заметите) только до выполнения действия, такого как groupBy().

0

Это может быть связано с пустым/пустым строками в вашем наборе данных. И вы применяете функцию split для данных. В этом случае отфильтруйте пустые строки.

Например: myrdd.filter (. _ Непустые) .map (...)

0

У меня была аналогичная проблема, когда я был преобразования данных журнала в dataframe с помощью pySpark.

Когда запись в журнале недействительна, я вернул нулевое значение вместо экземпляра строки. Перед преобразованием в dataframe я отфильтровал эти значения. Но, тем не менее, я получил вышеупомянутую проблему. Наконец, ошибка исчезла, когда я вернул строку с нулевыми значениями вместо единственного нулевого значения.

Псевдо код ниже:

Didnt работы:

rdd = Parse log (log lines to Rows if valid else None) 
filtered_rdd = rdd.filter(lambda x:x!=None) 
logs = sqlContext.inferSchema(filtered_rdd) 

Работал:

rdd = Parse log (log lines to Rows if valid else Row(None,None,...)) 
logs = sqlContext.inferSchema(rdd) 
filtered_rdd = logs.filter(logs['id'].isNotNull()) 
Смежные вопросы