Я пытаюсь сделать что-то довольно простое. У меня есть объект datetime как часть моего фреймворка данных, и когда я делаю карту, я хотел бы форматировать дату определенным образом. Я создал пользовательскую функцию:Pyspark - Функция вызова в лямбда вызывает ошибку импорта
def format_date(dt):
"""Set this for date formatting. dt is datetime."""
return dt.strftime("%Y/%m/%d %H:%M:%S")
И затем позже, я использую это в моей карте вызова (х является DateTime объекта):
unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
.reduceByKey(lambda x,y: x+y)\
.collectAsMap()
Это вызывает следующее исключение при представлении как работу:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, preteckt1.softlayer.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named analysis
Обратите внимание, что мое имя скрипта «run_analyses.py», а импорт всех функций из «analysis.py». Я представляю работу с
/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --total-executor-cores 12 run_analyses.py
Самое странное в том, что она отлично работает, если скопировать код на интерактивной сессии pyspark (или если я удалить вызов format_date). Я могу обойти это, создав новый столбец и используя UDF в моей функции format_date, чтобы создать новый столбец, но я хотел бы знать, почему этот подход терпит неудачу.
Я наклеил более полный код ниже.
Редактировать: Кажется, что это удается, если я запускаю код непосредственно из анализа.py, но сбой, если я запустил его из run_analysis.py. Я изменил код ниже, чтобы более точно показать это.
run_analyses.py
import datetime, json, math, subprocess
from os.path import expanduser
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
from analysis import *
sc = SparkContext()
sqlCtx = HiveContext(sc)
ids = {}
...
my_func(sqlCtx,ids)
analysis.py
def my_func(sqlCtx,ids):
df = sqlCtx.read.format("org.apache.spark.sql.cassandra").load(table="table_name", keyspace="keyspace_name").select("id","t","val")
df = df.filter((df.t > last_week)&(df.t < now))
df = df.filter(df.val > 0)
write_vals(df)
...
def write_vals(df):
unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
.reduceByKey(lambda x,y: x+y)\
.collectAsMap()
...
return
Это сделал это! Спасибо :) изменение моего sc-экземпляра на sc = SparkContext (pyFiles = ['analysis.py']) исправил его. –