2017-01-07 6 views
2

Я новичок Спарк и у меня есть файл CSV с такими данными:pyspark подстроки и агрегация

date,   accidents, injured 
2015/20/03 18:00 15,   5 
2015/20/03 18:30 25,   4 
2015/20/03 21:10 14,   7 
2015/20/02 21:00 15,   6 

Я хотел бы объединить эти данные на определенный час, когда это произошло. Моя идея - подстроить дату на «год/месяц/день hh» без минут, поэтому я могу сделать это ключом. Я хотел, чтобы каждый час приводил среднее число несчастных случаев и раненых. Может быть, есть другой, более умный способ с pyspark?

Спасибо, ребята!

ответ

4

Ну, это зависит от того, что вы собираетесь делать потом, я думаю.

Самый простой способ будет делать, как вы предлагаете: подстроки строку с датой, а затем агрегат:

data = [('2015/20/03 18:00', 15, 5), 
    ('2015/20/03 18:30', 25, 4), 
    ('2015/20/03 21:10', 14, 7), 
    ('2015/20/02 21:00', 15, 6)] 
df = spark.createDataFrame(data, ['date', 'accidents', 'injured']) 

df.withColumn('date_hr', 
       df['date'].substr(1, 13) 
    ).groupby('date_hr')\ 
     .agg({'accidents': 'avg', 'injured': 'avg'})\ 
     .show() 

Если вы, однако, хотите сделать некоторые вычисления позже, вы можете анализировать данные в a TimestampType(), а затем извлечь из него дату и час.

import pyspark.sql.types as typ 
from pyspark.sql.functions import col, udf 
from datetime import datetime 

parseString = udf(lambda x: datetime.strptime(x, '%Y/%d/%m %H:%M'), typ.TimestampType()) 
getDate = udf(lambda x: x.date(), typ.DateType()) 
getHour = udf(lambda x: int(x.hour), typ.IntegerType()) 

df.withColumn('date_parsed', parseString(col('date'))) \ 
    .withColumn('date_only', getDate(col('date_parsed'))) \ 
    .withColumn('hour', getHour(col('date_parsed'))) \ 
    .groupby('date_only', 'hour') \ 
    .agg({'accidents': 'avg', 'injured': 'avg'})\ 
    .show() 
+0

удалось легко подстроить при сопоставлении с помощью y [0] [: 13]. верьте, что ваше решение выглядит более элегантно. Спасибо! У вас есть еще один вопрос, если бы у меня был другой файл с другими данными, скажем, с другого года, как бы я получил среднее количество несчастных случаев и травм? помещая все в один файл, а затем выполняя вычисления? – sampak

+0

Я бы либо прочитал этот файл, либо выполнил агрегацию только по этим данным, либо, если вам нужно, чтобы результаты были получены за один раз (и если вы работаете с Spark 2.0), вы можете: .union (...) ' два (или более) 'DataFrames' вместе. http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.union – TDrabas

Смежные вопросы