2015-01-30 1 views
3

У меня есть RDD, связанный с таблицей HBase. Каждая строка (ключ) представляет собой местоположение GPS. Теперь я написал функцию для вычисления расстояния между двумя точками. Функция должна вызываться с текущей строкой и ее предшественником [i-1]Функциональный подход при последовательной обработке RDD [Apache Spark]

Теперь я изо всех сил пытаюсь сделать это с помощью функций RDD, чтобы я мог распараллелить его.

Мой быстрый и грязный подход к первым создать массив

val rows = rdd.collect() 
val rowCount = rdd.count() - 1 //since the first row has no distance 
val rowArray = new Array[(String, Point, Point)](rowCount.asInstanceOf[Int]) 
var i = 0 //can be better solved in scala, I know ;) 

rows.foreach(row => { 
    if (predecssorPoint == null) { 
    predecssorPoint = getPointByRow(row._2) 
    } 
    else { 
    currentPoint = getPointByRow(row._2) 
    rowArray(i) = Tuple3(row._1, predecssorPoint, currentPoint) 

    i += 1 
    predecssorPoint = currentPoint 
    } 
}) 

return rowArray 

Тогда я распараллелить массив и вычислить расстояние

//create a parallel-enabled data set 
    val parallelDataSet = sc.parallelize(rows) 

    parallelDataSet.foreach(row => {  
    Functions.logDistance(row) 
}) 

Это работает, но это некрасиво и, конечно, неэффективно.

Моя идея была в том, чтобы использовать rdd.reduce(), чтобы избавиться от цикла foreach, и это может сработать, если функция расстояния обрабатывает проблему, когда порядок (a + b) не гарантируется.

В любом случае, есть ли лучшее решение? Я понимаю, что нет возможности иметь (эффективный) индексный доступ при работе с RDD.

Спасибо.

ответ

2

Учитывая, что заказ имеет ключевое значение, хорошим способом продолжения может быть, прежде всего, индексирование RDD. Затем, используя индекс, мы можем имитировать zip и иметь кортежи, разделенные по кластеру. Что-то вроде этого:

val indexed = rdd.zipWithIndex.map(_.swap) // 
val shifted = indexed.map{case (k,v) => (k-1,v)} 
val joined = indexed.join(shifted) 
val distanceRDD = joined.map{(k,(v1,v2)) => distanceFunction(v1,v2)} 

(*) Пример кода - не тестировалось

Смежные вопросы