2016-04-07 2 views
1

Я пытаюсь сделать что-то довольно простое. У меня есть объект datetime как часть моего фреймворка данных, и когда я делаю карту, я хотел бы форматировать дату определенным образом. Я создал пользовательскую функцию:Pyspark - Функция вызова в лямбда вызывает ошибку импорта

def format_date(dt): 
    """Set this for date formatting. dt is datetime.""" 
    return dt.strftime("%Y/%m/%d %H:%M:%S") 

И затем позже, я использую это в моей карте вызова (х является DateTime объекта):

unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\ 
     .reduceByKey(lambda x,y: x+y)\ 
     .collectAsMap() 

Это вызывает следующее исключение при представлении как работу:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, preteckt1.softlayer.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main 
    command = pickleSer._read_with_length(infile) 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
ImportError: No module named analysis 

Обратите внимание, что мое имя скрипта «run_analyses.py», а импорт всех функций из «analysis.py». Я представляю работу с

/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --total-executor-cores 12 run_analyses.py 

Самое странное в том, что она отлично работает, если скопировать код на интерактивной сессии pyspark (или если я удалить вызов format_date). Я могу обойти это, создав новый столбец и используя UDF в моей функции format_date, чтобы создать новый столбец, но я хотел бы знать, почему этот подход терпит неудачу.

Я наклеил более полный код ниже.

Редактировать: Кажется, что это удается, если я запускаю код непосредственно из анализа.py, но сбой, если я запустил его из run_analysis.py. Я изменил код ниже, чтобы более точно показать это.

run_analyses.py

import datetime, json, math, subprocess 
from os.path import expanduser 
from pyspark import SparkContext 
from pyspark.sql import SQLContext, HiveContext 
from analysis import * 

sc = SparkContext() 
sqlCtx = HiveContext(sc) 
ids = {} 
... 
my_func(sqlCtx,ids) 

analysis.py

def my_func(sqlCtx,ids): 
    df = sqlCtx.read.format("org.apache.spark.sql.cassandra").load(table="table_name", keyspace="keyspace_name").select("id","t","val") 
    df = df.filter((df.t > last_week)&(df.t < now)) 
    df = df.filter(df.val > 0) 
    write_vals(df) 
    ... 

def write_vals(df): 
    unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\ 
      .reduceByKey(lambda x,y: x+y)\ 
      .collectAsMap() 
    ... 
    return 

ответ

2

Ключ в TRACEBACK:

ImportError: No module named analysis 

PySpark говорит вам, что рабочий процесс не имеет доступ к анализу. При инициализации SparkContext вы можете передать список файлов, которые должны быть скопированы на рабочий:

sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) 

Больше информации: https://spark.apache.org/docs/0.9.0/python-programming-guide.html#standalone-use

+0

Это сделал это! Спасибо :) изменение моего sc-экземпляра на sc = SparkContext (pyFiles = ['analysis.py']) исправил его. –

Смежные вопросы