2017-01-19 4 views
0

Я работаю над скриптом Spark на Python (используя Pyspark). У меня есть функция, которая возвращает Ro ш с некоторыми полями, в то числеЗапись метки времени в Postgres с Pyspark

timestamp=datetime.strptime(processed_data[1], DATI_REGEX) 

processed_data [1] является допустимой строкой даты и времени.

Редактировать, чтобы показать полный код:

DATI_REGEX = "%Y-%m-%dT%H:%M:%S" 

class UserActivity(object): 
    def __init__(self, user, rows): 
     self.user = int(user) 
     self.rows = sorted(rows, key=operator.attrgetter('timestamp')) 

    def write(self): 
     return Row(
      user=self.user, 
      timestamp=self.rows[-1].timestamp, 
     ) 

def parse_log_line(logline): 
    try: 
     entries = logline.split('\\t') 
     processed_data = entries[0].split('\t') + entries[1:] 

     return Row(
      ip_address=processed_data[9], 
      user=int(processed_data[10]), 
      timestamp=datetime.strptime(processed_data[1], DATI_REGEX), 
     ) 
    except (IndexError, ValueError): 
      return None 


logFile = sc.textFile(...) 
rows = (log_file.map(parse_log_line).filter(None) 
     .filter(lambda x: current_day <= x.timestamp < next_day)) 
user_rows = rows.map(lambda x: (x.user, x)).groupByKey() 
user_dailies = user_rows.map(lambda x: UserActivity(current_day, x[0], x[1]).write()) 

Проблема возникает, когда я пытаюсь написать, что на PostgreSQL БД, выполнив следующие действия:

fields = [ 
    StructField("user_id", IntegerType(), False), 
    StructField("timestamp", TimestampType(), False), 
] 
schema = StructType(fields) 
user_dailies_schema = SQLContext(sc).createDataFrame(user_dailies, schema) 
user_dailies_schema.write.jdbc(
    "jdbc:postgresql:.......", 
    "tablename") 

Я получаю следующее сообщение об ошибке:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 576, in toInternal 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 576, in <genexpr> 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 436, in toInternal 
    return self.dataType.toInternal(obj) 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 190, in toInternal 
    seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo 
AttributeError: 'int' object has no attribute 'tzinfo' 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

Любая идея о том, как ее решить?

ответ

1

Проблема относительно проста. PySpark - это tuple, упорядоченный по именам полей. Это означает, что при создании:

Row(user=self.user, timestamp=self.rows[-1].timestamp) 

выходная структура упорядочен:

Row(timestamp, user) 

StructType с другой стороны заказана как есть. В результате вы код пытается использовать идентификатор пользователя как метку времени. Вы должны либо вернуть простой tuple:

class UserActivity(object): 
    ... 
    def write(self): 
     return (self.user, timestamp) 

или использовать лексикографически упорядоченную схему:

schema = StructType(sorted(fields, key=operator.attrgetter("name"))) 

Наконец, вы можете использовать namedtuple для достижения как доступ атрибутов и предопределенный порядок.

На боковой ноте не использовать groupByKey вот так. Это типичный случай, когда можно было бы использовать reduceByKey:

(log_file.map(parse_log_line) 
    .map(operator.attrgetter("user", "timestamp")) 
    .reduceByKey(max)) 

с несколькими полями:

from functools import partial 

(log_file.map(parse_log_line) 
    .map(lambda x: (x.user, x)) 
    .reduceByKey(partial(max, key=operator.itemgetter("timestamp"))) 
    .values()) 

или DataFrame скоплениями:

from pyspark.sql import functions as f 

(sqlContext 
    .createDataFrame(
     log_file.map(parse_log_line) 
      # Another way to handle ordering is to choose fields 
      # before you call createDataFrame 
      .map(operator.attrgetter("user", "timestamp")), 
     schema) 
    .groupBy("user_id") 
    .agg(f.max("timestamp").alias("timestamp"))) 

Кроме того, если вы хотите получить SQLContext вы должны использовать заводской метод:

SQLContext.getOrCreate(sc) 

Создание нового контекста, как и у вас, может иметь неожиданные побочные эффекты.

Смежные вопросы