Прежде всего, RDD не являются итерабельными (не реализуйте ISeq
), поэтому вы не можете использовать reductions
. Игнорирование того, что вся идея доступа к предыдущей записи довольно сложная. Прежде всего, вы не можете напрямую получать доступ к значениям из другого раздела. Более того, только преобразования, которые не требуют перетасовки, сохраняют порядок.
Самый простой способ здесь - использовать функции Data Frames и Window с явным порядком, но насколько я знаю, Flambo не реализует требуемые методы. Всегда можно использовать raw SQL или получить доступ к API Java/Scala, но если вы хотите этого избежать, вы можете попробовать следующий конвейер.
Первая позволяет создать переменную широковещательный с последних значений в раздел:
(require '[flambo.broadcast :as bd])
(import org.apache.spark.TaskContext)
(def last-per-part (f/fn [it]
(let [context (TaskContext/get) xs (iterator-seq it)]
[[(.partitionId context) (last xs)]])))
(def last-vals-bd
(bd/broadcast sc
(into {} (-> rdd (f/map-partitions last-per-part) (f/collect)))))
Далее некоторые помощник для фактической работы:
(defn fill-pair [col]
(fn [x] (let [[a b] x] (if (empty? (nth b col)) (assoc b col (nth a col)) b))))
(def fill-pairs
(f/fn [it] (let [part-id (.partitionId (TaskContext/get)) ;; Get partion ID
xs (iterator-seq it) ;; Convert input to seq
prev (if (zero? part-id) ;; Find previous element
(first xs) ((bd/value last-vals-bd) part-id))
;; Create seq of pairs (prev, current)
pairs (partition 2 1 (cons prev xs))
;; Same as before
{:keys [ col ]} input
;; Prepare mapping function
mapper (fill-pair col)]
(map mapper pairs))))
Наконец, вы можете использовать fill-pairs
для map-partitions
:
(-> rdd (f/map-partitions fill-pairs) (f/collect))
Скрытое предположение заключается в том, что порядок разбиения следуют порядку значений. Это может быть или не быть в общем случае, но без явного заказа это, вероятно, лучшее, что вы можете получить.
Альтернативный подход заключается в zipWithIndex
, порядок подкачки значений и выполнять соединение со смещением.
(require '[flambo.tuple :as tp])
(def rdd-idx (f/map-to-pair (.zipWithIndex rdd) #(.swap %)))
(def rdd-idx-offset
(f/map-to-pair rdd-idx
(fn [t] (let [p (f/untuple t)] (tp/tuple (dec' (first p)) (second p))))))
(f/map (f/values (.rightOuterJoin rdd-idx-offset rdd-idx)) f/untuple)
Дальше вы можете сопоставить, используя аналогичный подход, как и раньше.
Редактировать
on using atoms Быстрое примечание. В чем проблема отсутствия ссылочной прозрачности и что вы используете случайные свойства данной реализации, а не контракт. В семантике map
нет ничего, что требует обработки элементов в заданном порядке. Если внутренняя реализация изменится, она может быть недействительной. Использование Clojure
(defn foo [x] (let [aa @a] (swap! a (fn [&args] x)) aa))
(def a (atom 0))
(map foo (range 1 20))
по сравнению с:
(def a (atom 0))
(pmap foo (range 1 20))
@Jyd Я добавил краткий комментарий о атомах. Я думаю, вы найдете это полезным. – zero323
Я очень ценю усилия, которые вы вложили в это, я не был уверен в использовании изменчивых структур данных в разработке clojure, как советовал, его нет пути, если я мутирую структуру данных, просто пытаясь избежать использования измененных данных, особенно сейчас, когда я нахожусь на раннем этапе clojure dev. или что вы посоветуете? – Jyd
Спасибо, сейчас – Jyd