2016-09-12 4 views
2

Я новичок в Apache Spark (Pyspark) и был бы рад получить помощь в решении этой проблемы. В настоящее время я использую Pyspark 1.6 (если бы не было 2.0, поскольку поддержка MQTT отсутствует).Apache Spark Count State Change

Итак, у меня есть кадр данных, который имеет следующую информацию,

+----------+-----------+ 
|  time|door_status| 
+----------+-----------+ 
|1473678846|   2| 
|1473678852|   1| 
|1473679029|   3| 
|1473679039|   3| 
|1473679045|   2| 
|1473679055|   1| 

В основном это время и статус двери. Мне нужно рассчитать количество открывшихся дверей & закрыто. Поэтому мне нужно определить изменение состояния и сохранить независимые счетчики для каждого статуса.

Поскольку я новичок в этом, мне очень сложно понять, как это реализовать. Если кто-то может предложить идею &, то указывайте мне в правильном направлении, это будет здорово.

Спасибо заранее!

ответ

0

В этом случае использование аккумулятора должно решить проблему. В принципе, вы создаете три разных аккумулятора для трех статусов.

status_1 = sc.accumulator(0) 
status_2 = sc.accumulator(0) 
status_3 = sc.accumulator(0) 

, то вы можете сделать что-то вроде следующего

if (status == 1): 
    status_1 += 1 
1

Там не эффективный метод, который может выполнять операции, как этот вне коробки. Вы можете использовать функции окна:

from pyspark.sql.window import Window 
from pyspark.sql.functions import col, lag 

df = sc.parallelize([ 
    (1473678846, 2), (1473678852, 1), 
    (1473679029, 3), (1473679039, 3), 
    (1473679045, 2), (1473679055, 1) 
]).toDF(["time", "door_status"]) 


w = Window().orderBy("time") 
(df 
    .withColumn("prev_status", lag("door_status", 1).over(w)) 
    .where(col("door_status") != col("prev_status")) 
    .groupBy("door_status", "prev_status") 
    .count()) 

но это просто не будет масштабироваться. Вы можете попробовать mapParitions. Кулак давайте определим функцию, мы будем использовать для отображения разделов:

from collections import Counter 

def process_partition(iter): 
    """Given an iterator of (time, state) 
    return the first state, the last state and 
    a counter of state changes 

    >>> process_partition([]) 
    [(None, None, Counter())] 
    >>> process_partition(enumerate([1, 1, 1])) 
    [(1, 1, Counter())] 
    >>> process_partition(enumerate([1, 2, 3])) 
    [(1, 3, Counter({(1, 2): 1, (2, 3): 1}))] 
    """ 

    first = None 
    prev = None 
    cnt = Counter() 

    for i, (_, x) in enumerate(iter): 
     # Store the first object per partition 
     if i == 0: 
      first = x 

     # If change of state update couter 
     if prev is not None and prev != x: 
      cnt[(prev, x)] += 1 

     prev = x 

    return [(first, prev, cnt)] 

и простые функции слияния:

def merge(x, y): 
    """Given a pair of tuples: 
    (first-state, last-state, counter_of changes) 
    return a tuple of the same shape representing aggregated results 

    >>> merge((None, None, Counter()), (1, 1, Counter())) 
    (None, 1, Counter()) 
    >>> merge((1, 2, Counter([(1, 2)])), (2, 2, Counter())) 
    (None, 2, Counter({(1, 2): 1})) 
    >>> merge((1, 2, Counter([(1, 2)])), (3, 2, Counter([(3, 2)]) 
    (None, 2, Counter({(1, 2): 1, (2, 3): 1, (3, 2): 1})) 
    """ 
    (_, last_x, cnt_x), (first_y, last_y, cnt_y) = x, y 

    # If previous partition wasn't empty update counter 
    if last_x is not None and first_y is not None and last_x != first_y: 
     cnt_y[(last_x, first_y)] += 1 

    # Merge counters 
    cnt_y.update(cnt_x) 

    return (None, last_y, cnt_y) 

С этими двумя мы можем выполнить операцию так:

partials = (df.rdd 
    .mapPartitions(process_partition) 
    .collect()) 

reduce(merge, [(None, None, Counter())] + partials) 
+0

@ zero323 Спасибо. Первый метод работал. Как уже упоминалось, для больших объемов данных он кажется медленным. Мне еще предстоит попробовать второй метод. – Sisyphus

+0

@DavidArenburg Я не уверен, следую ли я. Не могли бы вы уточнить? – zero323

+1

@DavidArenburg Нет, конечно нет. Я использовал его для запуска набора тестов с разным расположением разделов. Спасибо! – zero323

0

Вы можете попробовать следующее решение:

import org.apache.spark.sql.expressions.Window 

var data = Seq((1473678846, 2), (1473678852, 1), (1473679029, 3), (1473679039, 3), (1473679045, 2), (1473679055, 1),(1473779045, 1), (1474679055, 2), (1475679055, 1), (1476679055, 3)).toDF("time","door_status") 

data. 
    select(
    $"*", 
    coalesce(lead($"door_status", 1).over(Window.orderBy($"time")), $"door_status").as("next_door_status") 
). 
    groupBy($"door_status"). 
    agg(
    sum(($"door_status" !== $"next_door_status").cast("int")).as("door_changes") 
). 
    show 

Это на языке scala. Вы должны сделать то же самое в python.

0

Я попробовал это в Java, но на самом деле это также должно быть возможным в python API-интерфейсов данных сопоставимым образом.

Doing следующее:

  • загрузить данные в dataframe/наборе данных
  • регистр dataframe как временная таблица
  • выполнить этот запрос: SELECT состояния, COUNT (*) FROM doorstates GROUP BY состояния

Обязательно удалите заголовок