Я работаю над скриптом Spark на Python (используя Pyspark). У меня есть функция, которая возвращает Ro
ш с некоторыми полями, в то числеЗапись метки времени в Postgres с Pyspark
timestamp=datetime.strptime(processed_data[1], DATI_REGEX)
processed_data [1] является допустимой строкой даты и времени.
Редактировать, чтобы показать полный код:
DATI_REGEX = "%Y-%m-%dT%H:%M:%S"
class UserActivity(object):
def __init__(self, user, rows):
self.user = int(user)
self.rows = sorted(rows, key=operator.attrgetter('timestamp'))
def write(self):
return Row(
user=self.user,
timestamp=self.rows[-1].timestamp,
)
def parse_log_line(logline):
try:
entries = logline.split('\\t')
processed_data = entries[0].split('\t') + entries[1:]
return Row(
ip_address=processed_data[9],
user=int(processed_data[10]),
timestamp=datetime.strptime(processed_data[1], DATI_REGEX),
)
except (IndexError, ValueError):
return None
logFile = sc.textFile(...)
rows = (log_file.map(parse_log_line).filter(None)
.filter(lambda x: current_day <= x.timestamp < next_day))
user_rows = rows.map(lambda x: (x.user, x)).groupByKey()
user_dailies = user_rows.map(lambda x: UserActivity(current_day, x[0], x[1]).write())
Проблема возникает, когда я пытаюсь написать, что на PostgreSQL БД, выполнив следующие действия:
fields = [
StructField("user_id", IntegerType(), False),
StructField("timestamp", TimestampType(), False),
]
schema = StructType(fields)
user_dailies_schema = SQLContext(sc).createDataFrame(user_dailies, schema)
user_dailies_schema.write.jdbc(
"jdbc:postgresql:.......",
"tablename")
Я получаю следующее сообщение об ошибке:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 576, in toInternal
File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 576, in <genexpr>
File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 436, in toInternal
return self.dataType.toInternal(obj)
File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 190, in toInternal
seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
AttributeError: 'int' object has no attribute 'tzinfo'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Любая идея о том, как ее решить?