2015-08-27 4 views
1

Я очень новичок в программировании. Я пытаюсь реализовать карту и reduceByKey для следующего набора данных с 15 полями.IndexError: строка index out of range pyspark

rdd=sc.parallelize([ 
("West", "Apple", 2.0, 10,2.0, 10,2.0, 10,2.0, 10,2.0, 10,2.0,2.0, 10), 
("West", "Apple", 3.0, 10,2.0, 10,2.0, 10,2.0, 10,2.0, 10,2.0, 10,2.0]) 

Это моя функция карты, где я пытаюсь создать кортеж с несколькими ключами и значениями

rdd1 = rdd.map(lambda x: ((x[0],x[1]),(x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14]))) 

Следующего шага я пытаюсь reduceByKey (для реализации SQL, как совокупности значения в выше кортежа)

rdd2 = rdd1.reduceByKey(lambda x,y: (x[1]+','+y[1])) 

Эта функция снижения работает как и ожидалось значения индекса кортеж 0-4, но при попытке значение индекса кортеж 5-14, я получаю IndexError.

rdd2 = rdd1.reduceByKey(lambda x,y: (x[10]+','+y[10])) 
Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
File "/opt/spark/python/pyspark/rdd.py", line 1277, in take 
res = self.context.runJob(self, takeUpToNumLeft, p, True) 
File "/opt/spark/python/pyspark/context.py", line 897, in runJob 
allowLocal) 
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError15/08/28 01:23:22 WARN TaskSetManager: Lost task 1.0 in stage 78.0 (TID 91, localhost): TaskKilled (killed intentionally) 
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 78.0 failed 1 times, most recent failure: Lost task 0.0 in stage 78.0 (TID 90, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
process() 
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
serializer.dump_stream(func(split_index, iterator), outfile) 
File "/opt/spark/python/pyspark/rdd.py", line 2330, in pipeline_func 
return func(split, prev_func(split, iterator)) 
File "/opt/spark/python/pyspark/rdd.py", line 2330, in pipeline_func 
return func(split, prev_func(split, iterator)) 
File "/opt/spark/python/pyspark/rdd.py", line 316, in func 
return f(iterator) 
File "/opt/spark/python/pyspark/rdd.py", line 1758, in combineLocally 
merger.mergeValues(iterator) 
File "/opt/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 268, in mergeValues 
d[k] = comb(d[k], v) if k in d else creator(v) 
File "<stdin>", line 1, in <lambda> 
IndexError: string index out of range 

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138) 
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179) 
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) 
at org.apache.spark.scheduler.Task.run(Task.scala:70) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
at  java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) 
at java.lang.Thread.run(Thread.java:695) 

Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Это похоже на очень нетривиальную ошибку. Я не уверен, что эта ошибка связана с оборудованием моей машины или моей реализацией функции уменьшения или что-то с искровым.

Любая помощь приветствуется.

ответ

0
File "<stdin>", line 1, in <lambda> 
IndexError: string index out of range 

Ошибка возникает в вашей лямбда-функции. Тип последовательности (кортеж, список, строка), который у вас есть, не имеет такого количества элементов, как вы закодировали свою функцию.