2015-07-08 4 views
2

Я пытаюсь получить отфильтрованный список списка аукционов во время конкретных выигрышных аукционов при использовании искры. Выигрышная аукцион РДД и полные аукционы DD состоит из тематических классов в формате:
case class auction(id: String, prodID: String, timestamp: Long)Spark inested transformations SPARK-5063

Я хотел бы, чтобы отфильтровать полные аукционы РДДА, где торги происходили в течение 10 секунд после победы аукциона, на тот же идентификатор продукта и получить полный RDD.

Я попытался отфильтровать это следующим образом:

val specificmessages = winningauction.map(it => 
    allauctions.filter(x => 
    x.timestamp > it.timestamp - 10 && 
    x.timestamp < it.timestamp + 10 && 
    x.productID == it.productID 
) 
) 

Есть ли способ, чтобы выполнить это, как вложенные преобразования не представляется возможным?

есть еще один ответ, но это в основном имеет дело с вложенными картами SPARK-5603 nested map funcitons

+0

Постарайся смотреть на 'cartesian' https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD, чтобы создать новый RDD и применить к нему свой фильтр. – ccheneson

+1

Untested: val specificmessages = allauctions.cartesian (victoryauction) .filter ( \t (x, y) => x.timestamp> y.timestamp - 10 && x.timestamp ccheneson

+0

yep, спасибо, что напомнили мне о декартовой функции. Это безупречное удовольствие. Можете ли вы добавить его в качестве ответа? – eboni

ответ

5

Попробуйте посмотреть на метод cartesian построить новый RDD и применить фильтр к нему

val specificmessages = allauctions.cartesian(winningauction) 
            .filter((x, y) => x.timestamp > y.timestamp - 10 && 
               x.timestamp < y.timestamp + 10 && 
               x.productID == y.productID)