2015-11-16 3 views
3

У меня есть очень большие временные ряды данных, формат данных: (arrival_time, ключ, значение), единица времени сек, например:Использование Apache-Спарк для анализа временных рядов

0.01, k, v 
0.03, k, v 
.... 
1.00, k, v 
1.10, k, v 
1.20, k, v 
1.99, k, v 
2.00, k, v 
... 

Мне нужно сделать, чтобы получить количество строк в секунду от всех данных. В настоящее время я использую pySpark и мой код, как:

linePerSec = [] 
lo = rdd.take(1)[0] 
hi = lo + 1.0 
end = rdd.collect()[-1][0] 
while(hi < end): 
    number = rdd.filter(lambda (t, k, v): t >= lo and t < hi).count() 
    linePerSec.append(number) 
    lo = hi 
    hi = lo + 1.0 

Но это очень медленно, даже медленнее, чем просто проходя через линию передачи данных по линии в цикле. Я предполагаю, что это потому, что rdd.filter() проходит через весь rdd, чтобы найти линии, соответствующие условиям фильтра. Но для временных рядов нам не нужно проходить данные после границы hi в моем коде. Есть ли какие-либо решения, позволяющие искры перестать проходить через rdd в моей ситуации? Спасибо!

+2

можете ли вы опубликовать свой ожидаемый результат? я не могу понять, что вы пытаетесь сделать. – maxymoo

ответ

2

Первые позволяет создать фиктивные данные: поле времени

rdd = sc.parallelize(
    [(0.01, "k", "v"), 
    (0.03, "k", "v"), 
    (1.00, "k", "v"), 
    (1.10, "k", "v"), 
    (1.20, "k", "v"), 
    (1.99, "k", "v"), 
    (2.00, "k", "v"), 
    (3.10, "k", "v"), 
    (4.50, "k", "v")]) 

экстракта из РДА:

def get_time(x): 
    (start, _, _) = x 
    return start 

times = rdd.map(get_time) 

Далее нам необходимы функцией отображение от времени ключа:

def get_key_(start): 
    offset = start - int(start) 
    def get_key(x): 
     w = int(x) + offset 
     return w if x >= w else int(x - 1) + offset 
    return get_key 

найти минимальное и максимальное время

start = times.takeOrdered(1)[0] 
end = times.top(1)[0] 

генерировать фактическую ключевую функцию:

get_key = get_key_(start) 

и вычислить означают

from operator import add 

total = (times 
    .map(lambda x: (get_key(x), 1)) 
    .reduceByKey(add) 
    .values() 
    .sum()) 

time_range = get_key(end) - get_key(start) + 1.0 

mean = total/time_range 

mean 
## 1.8 

Быстрая проверка:

  • [0.01, 1.01): 3
  • [1.01, 2.01): 4
  • [2.01, 3.01): 0
  • [3.01, 4.01): 1
  • [4.01, 5.01): 1

Это дает 9/5 = 1.8

кадр данных эквивалент может выглядеть следующим образом:

from pyspark.sql.functions import count, col, sum, lit, min, max 

# Select only arrival times 
arrivals = df.select("arrival_time") 

# This is almost identical as before 
start = df.agg(min("arrival_time")).first()[0] 
end = df.agg(max("arrival_time")).first()[0] 

get_key = get_key_(start) 
time_range = get_key(end) - get_key(start) + 1.0 

# But we'll need offset as well 
offset = start - int(start) 

# and define a bucket column 
bucket = (col("arrival_time") - offset).cast("integer") + offset 

line_per_sec = (df 
    .groupBy(bucket) 
    .agg(count("*").alias("cnt")) 
    .agg((sum("cnt")/lit(time_range)).alias("mean"))) 

line_per_sec.show() 

## +----+ 
## |mean| 
## +----+ 
## | 1.8| 
## +----+ 

Пожалуйста, обратите внимание, что это очень похоже на the solution предоставленной Nhor с двумя основными отличиями:

  • использует ту же начальную логику, как ваш код
  • правильно обрабатывает промежутки времени
+0

Большое спасибо, изменив время на его целую часть, затем groupByKey, очень умно! Основываясь на вашей идее, более кратким способом является rdd.map (lambda (t, k, v): (int (t), k, v)). – Dong

+0

Распаковка параметров параметра Tuple краткая, но была удалена с помощью [PEP-3113] (https://www.python.org/dev/peps/pep-3113/) и больше не является допустимым синтаксисом в 3.x. Таким образом, это не то, что вы действительно хотите в своем коде. – zero323

0

Что бы я сделал это первым, я бы пол значения времени:

from pyspark.sql.functions import * 
df = df.select(floor(col('arrival_time')).alias('arrival_time')) 

Теперь у вас есть ваш arrival_time сражен, и вы будете готовы подсчитать количество строк в каждой второй:

df = df.groupBy(col('arrival_time')).count() 

Теперь, когда вы подсчитали строки в каждой секунде, вы можете получить все строки и разделить сумму по счету, чтобы получить средние линии в секунду:

lines_sum = df.select(sum(col('count')).alias('lines_sum')).first().lines_sum 
seconds_sum = df.select(count(col('arrival_time')).alias('seconds_sum')).first().seconds_sum 
result = lines_sum/seconds_sum 
+2

Это близко, но не охватывает всю оригинальную логику. Обратите внимание, что OP отсчитывается вторым с начала. Это означает, что простой «пол» не будет достаточным. – zero323

Смежные вопросы