2015-03-15 2 views
2

Итак, я понимаю, что Spark может выполнять итеративные алгоритмы на одиночных RDD, например, логистической регрессии.Итеративные алгоритмы с потоком Spark

val points = spark.textFile(...).map(parsePoint).cache() 
    var w = Vector.random(D) // current separating plane 
    for (i <- 1 to ITERATIONS) { 
     val gradient = points.map(p => 
     (1/(1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x 
    ).reduce(_ + _) 
     w -= gradient 
    } 

Приведенный выше пример является итеративным, поскольку он поддерживает глобальное состояние w, который обновляется после каждой итерации и ее обновленное значение используется в следующей итерации. Возможно ли использование этой функции в потоке Spark? Рассмотрим тот же пример, за исключением того, что теперь points - это DStream. В этом случае, вы можете создать новый DStream, который вычисляет градиент с

val gradient = points.map(p => 
      (1/(1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x 
     ).reduce(_ + _) 

Но как бы вы справиться с глобальным состоянием w. Похоже, w тоже должен был бы быть DStream (возможно, с помощью updateStateByKey), но тогда его последнее значение каким-то образом должно было быть передано в картографическую функцию , которая, как я думаю, невозможна. Я не думаю, что DStreams могут общаться таким образом. Правильно ли, или можно ли проводить итеративные вычисления в Spark Streaming?

ответ

-1

Хм ... вы можете что-то достичь, распараллеливая свой итератор, а затем складывая его, чтобы обновить свой градиент.

Также ... Я думаю, вы должны оставить Spark Streaming из него, поскольку эта проблема не похожа на наличие какой-либо функции, которая связывает ее с любыми требованиями к потоковой передаче.

// So, assuming... points is somehow a RDD[ Point ] 
val points = sc.textFile(...).map(parsePoint).cache() 
var w = Vector.random(D) 

// since fold is (T)((T, T) => T) => T 
val temps = sc.parallelize(1 to ITERATIONS).map(w) 

// now fold over temps. 
val gradient = temps.fold(w)((acc, v) => { 
    val gradient = points.map(p => 
    (1/(1 + exp(-p.y*(acc dot p.x))) - 1) * p.y * p.x 
).reduce(_ + _) 
    acc - gradient 
} 
+0

Вопрос конкретно касался искрового потока. – user1893354

+0

Удачи вам в искрообразовании. –

2

Я только что узнал, что это очень просто с функцией foreachRDD. MLlib фактически предоставляет модели, которые вы можете тренировать с помощью DStreams, и я нашел ответ в коде streamingLinearAlgorithm. Похоже, вы можете просто локально сохранить свою глобальную переменную обновления в драйвере и обновить ее внутри .foreachRDD, поэтому нет необходимости преобразовывать ее в сам DStream. Таким образом, вы можете применить это к примеру, который я привел в качестве примера, приведенного в качестве примера:

points.foreachRDD{(rdd,time) => 

    val gradient=rdd.map(p=> (1/(1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x 
    )).reduce(_ + _) 

    w -= gradient 

    } 
+0

Кажется, что вы используете точки, которые из MLlib, знаете ли вы, какой способ сделать такое итеративное программирование, используя обычную искрообразование? У меня есть аналогичное требование, где я хотел бы сохранить некоторое состояние gobal, обрабатывая итеративно через микробатчи в упорядоченном dstream. – tsar2512

Смежные вопросы