2016-08-18 5 views
1

У меня есть dataframe в pyspark. Вот как это выглядит,Вставка записей в световой информационный кадр

+---------+---------+ 
|timestamp| price | 
+---------+---------+ 
|670098928| 50  | 
|670098930| 53  | 
|670098934| 55  | 
+---------+---------+ 

Я хочу, чтобы заполнить пробелы в метку времени с предыдущего состояния, так что я могу получить идеальный набор для расчета времени, взвешенные средние значения. Вот то, что результат должен быть как -

+---------+---------+ 
|timestamp| price | 
+---------+---------+ 
|670098928| 50  | 
|670098929| 50  | 
|670098930| 53  | 
|670098931| 53  | 
|670098932| 53  | 
|670098933| 53  | 
|670098934| 55  | 
+---------+---------+ 

В конце концов, я хочу, чтобы упорствовать этот новый dataframe на диске и визуализировать мой анализ.

Как это сделать в pyspark? (Для простоты, я просто держал 2 колонки. Моя текущая dataframe имеет 89 столбцов с ~ 670 миллионов записей до заполнения пробелов.)

+0

Вы можете сделать интерполяцию с помощью scipy. Я не слишком уверен, что PySpark может делать то, что вы хотите –

+0

@ cricket_007 искра не может этого сделать. Винет, я не знаю, почему ты хочешь это сделать? – eliasah

+0

@eliasah Я пытаюсь создать dataframe с записью для каждой отметки времени (ранжирование самого низкого уровня), так что, если я хочу делать средневзвешенные средние значения, это очень удобно. – Veenit

ответ

1

Вы можете создать диапазон временных меток, расплющить их и выбрать строки

import pyspark.sql.functions as func 

from pyspark.sql.types import IntegerType, ArrayType 


a=sc.parallelize([[670098928, 50],[670098930, 53], [670098934, 55]])\ 
.toDF(['timestamp','price']) 

f=func.udf(lambda x:range(x,x+5),ArrayType(IntegerType())) 

a.withColumn('timestamp',f(a.timestamp))\ 
.withColumn('timestamp',func.explode(func.col('timestamp')))\ 
.groupBy('timestamp')\ 
.agg(func.max(func.col('price')))\ 
.show() 

+---------+----------+ 
|timestamp|max(price)| 
+---------+----------+ 
|670098928|  50| 
|670098929|  50| 
|670098930|  53| 
|670098931|  53| 
|670098932|  53| 
|670098933|  53| 
|670098934|  55| 
|670098935|  55| 
|670098936|  55| 
|670098937|  55| 
|670098938|  55| 
+---------+----------+ 
+0

Я получаю 'AttributeError: объект JavaMember 'не имеет атрибута' parseDataType'', когда я выполняю 'f = func.udf (lambda x: range (x, x + 5), ArrayType (IntegerType()))' – Veenit

+0

код работает, проверяет ваш pyspark.sql импорт – marmouset

+0

Нет. Это не так. В какой версии Spark вы? Я на 2.0.0 – Veenit

Смежные вопросы