У меня есть RDD, связанный с таблицей HBase. Каждая строка (ключ) представляет собой местоположение GPS. Теперь я написал функцию для вычисления расстояния между двумя точками. Функция должна вызываться с текущей строкой и ее предшественником [i-1]Функциональный подход при последовательной обработке RDD [Apache Spark]
Теперь я изо всех сил пытаюсь сделать это с помощью функций RDD, чтобы я мог распараллелить его.
Мой быстрый и грязный подход к первым создать массив
val rows = rdd.collect()
val rowCount = rdd.count() - 1 //since the first row has no distance
val rowArray = new Array[(String, Point, Point)](rowCount.asInstanceOf[Int])
var i = 0 //can be better solved in scala, I know ;)
rows.foreach(row => {
if (predecssorPoint == null) {
predecssorPoint = getPointByRow(row._2)
}
else {
currentPoint = getPointByRow(row._2)
rowArray(i) = Tuple3(row._1, predecssorPoint, currentPoint)
i += 1
predecssorPoint = currentPoint
}
})
return rowArray
Тогда я распараллелить массив и вычислить расстояние
//create a parallel-enabled data set
val parallelDataSet = sc.parallelize(rows)
parallelDataSet.foreach(row => {
Functions.logDistance(row)
})
Это работает, но это некрасиво и, конечно, неэффективно.
Моя идея была в том, чтобы использовать rdd.reduce(), чтобы избавиться от цикла foreach, и это может сработать, если функция расстояния обрабатывает проблему, когда порядок (a + b) не гарантируется.
В любом случае, есть ли лучшее решение? Я понимаю, что нет возможности иметь (эффективный) индексный доступ при работе с RDD.
Спасибо.