2017-01-03 3 views
0

У меня проблема с моим reduceByKey(). Я не отображать результат ... У меня есть ключи, ценности ... Но невозможно использовать reduceByKey ...Ошибка Pyspark ReduceByKey

data_test_bis = data_textfile.map(lambda x: (x.split(",")[8].encode("utf-8").replace('"','').replace("'",''), 1)).filter(lambda x: x[0].startswith('Ru'))#.reduceByKey(lambda x, y: x + y) 
#data_test_filter = data_test_bis.filter(lambda x: x[0].startswith('"R')) 
print("TEST FILTER !") 
print(type(data_test_bis)) 
print(data_test_bis.take(5)) 
print(data_test_bis.keys().take(10)) 
print(data_test_bis.values().take(10)) 

Результаты:

TEST FILTER ! 
<class 'pyspark.rdd.PipelinedRDD'> 
[('Rueil-Malmaison', 1), ('Ruse', 1), ('Rueil Malmaison', 1), ('Rueil-Malmaison', 1), ('Ruda Slaska', 1)] 
['Rueil-Malmaison', 'Ruse', 'Rueil Malmaison', 'Rueil-Malmaison', 'Ruda Slaska', 'Ruisbroek (Belgique)', 'Ruda \xc3\x85\xc5\xa1l\xc3\x84\xe2\x80\xa6ska', 'Rueil malmaison', 'Rueil', 'Ruisbroek'] 
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 

Когда я попробовать это, Есть ошибка:

print(data_test_bis.reduceByKey(add).take(10)) 

или

print(data_test_bis.reduceByKey(lambda x, y: x + y).take(10)) 

Ошибка:

17/01/03 17:47:09 ERROR scheduler.TaskSetManager: Task 18 in stage 3.0 failed 4 times; aborting job 
Traceback (most recent call last): 
    File "/home/spark/julien/Test_.py", line 89, in <module> 
    test() 
    File "/home/spark/julien/Test_.py", line 33, in test 
    print(data_test_bis.reduceByKey(lambda x, y:x+y).take(10)) 
    File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1297, in take 
    File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 939, in runJob 
    File "/home/spark/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__ 
    File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco 
    File "/home/spark/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 3.0 failed 4 times, most recent failure: Lost task 18.3 in stage 3.0 (TID 67, 10.0.15.7): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func 
    File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func 
    File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func 
    File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1776, in combineLocally 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues 
    for k, v in iterator: 
    File "/home/spark/julien/Test_.py", line 25, in <lambda> 
IndexError: list index out of range 

Я не понимаю, почему у меня есть IndexError ...

ответ

0

Просто повторяйте за мной: Я никогда не буду считать, что неструктурированный источник данных также формируется.

вещи, как:

... .map(lambda x: (x.split(",")[8].encode("utf-8") ...) 

отлично подходит для быстрых учебников, но бесполезны на практике. В целом никогда не зависеть от предположений, что:

  • Данные имеют определенную форму (например, 9 полей, разделенных запятыми).
  • Кодирование/декодирование будет успешным (здесь мы действительно можем, но в целом это не так).

По крайней мере, включают в себя минималистичный обработку исключений:

def parse_to_pair(line): 
    try: 
     key = (line 
      .split(",")[8] 
      .encode("utf-8") 
      .replace('"', '') 
      .replace("'", '')) 

     return [(key, 1)] 
    except: 
     return [] 

и использовать flatMap:

data_textfile.flatMap(parse_to_pair) 

Примечания:

  • Вы можете пропустить encode по телефону SparkContext.textFile с use_unicode установлен на False. Он:

    • Использование str вместо unicode в Python 2.
    • Использование bytes в Python 3.
  • Вы должны не только убедиться, что строка содержит по крайней мере 9 полей, но он содержит ожидаемое количество полей.

  • Если у вас есть csv в качестве источника ввода csv читатель.
Смежные вопросы