2015-08-25 2 views
1

Я разработал функцию в clojure, чтобы заполнить пустой столбец из последнего непустого значения, я предполагаю, что это работает, учитываяПреобразование из clojure.lang.LazySeq для ввода org.apache.spark.api.java.JavaRDD

(:require [flambo.api :as f]) 

(defn replicate-val 
    [ rdd input ] 
    (let [{:keys [ col ]} input 
    result (reductions (fn [a b] 
         (if (empty? (nth b col)) 
          (assoc b col (nth a col)) 
          b)) rdd)] 
(println "Result type is: "(type result)))) 

Got это:

;=> "Result type is: clojure.lang.LazySeq" 

вопрос заключается в том, как преобразовать это назад к типу JavaRDD, используя flambo (искровой обертку)

Я попытался (f/map result #(.toJavaRDD %)) в let формы, чтобы попытаться преобразовать JavaRDD типа

Я получил эту ошибку

"No matching method found: map for class clojure.lang.LazySeq" 

, который, как ожидается, так как результат типа clojure.lang.LazySeq

Вопрос в том, как я делаю это преобразование, или как я могу реорганизуйте код, чтобы разместить это.

Вот входной образец рдд:

(type rdd) ;=> "org.apache.spark.api.java.JavaRDD" 

Но выглядит следующим образом:

[["04" "2" "3"] ["04" "" "5"] ["5" "16" ""] ["07" "" "36"] ["07" "" "34"] ["07" "25" "34"]] 

Обязательный выход:

[["04" "2" "3"] ["04" "2" "5"] ["5" "16" ""] ["07" "16" "36"] ["07" "16" "34"] ["07" "25" "34"]] 

Спасибо.

ответ

1

Прежде всего, RDD не являются итерабельными (не реализуйте ISeq), поэтому вы не можете использовать reductions. Игнорирование того, что вся идея доступа к предыдущей записи довольно сложная. Прежде всего, вы не можете напрямую получать доступ к значениям из другого раздела. Более того, только преобразования, которые не требуют перетасовки, сохраняют порядок.

Самый простой способ здесь - использовать функции Data Frames и Window с явным порядком, но насколько я знаю, Flambo не реализует требуемые методы. Всегда можно использовать raw SQL или получить доступ к API Java/Scala, но если вы хотите этого избежать, вы можете попробовать следующий конвейер.

Первая позволяет создать переменную широковещательный с последних значений в раздел:

(require '[flambo.broadcast :as bd]) 
(import org.apache.spark.TaskContext) 

(def last-per-part (f/fn [it] 
    (let [context (TaskContext/get) xs (iterator-seq it)] 
    [[(.partitionId context) (last xs)]]))) 

(def last-vals-bd 
(bd/broadcast sc 
    (into {} (-> rdd (f/map-partitions last-per-part) (f/collect))))) 

Далее некоторые помощник для фактической работы:

(defn fill-pair [col] 
    (fn [x] (let [[a b] x] (if (empty? (nth b col)) (assoc b col (nth a col)) b)))) 

(def fill-pairs 
    (f/fn [it] (let [part-id (.partitionId (TaskContext/get)) ;; Get partion ID 
        xs (iterator-seq it) ;; Convert input to seq 
        prev (if (zero? part-id) ;; Find previous element 
        (first xs) ((bd/value last-vals-bd) part-id))   
        ;; Create seq of pairs (prev, current) 
        pairs (partition 2 1 (cons prev xs)) 
        ;; Same as before 
        {:keys [ col ]} input 
        ;; Prepare mapping function 
        mapper (fill-pair col)] 
       (map mapper pairs)))) 

Наконец, вы можете использовать fill-pairs для map-partitions:

(-> rdd (f/map-partitions fill-pairs) (f/collect)) 

Скрытое предположение заключается в том, что порядок разбиения следуют порядку значений. Это может быть или не быть в общем случае, но без явного заказа это, вероятно, лучшее, что вы можете получить.

Альтернативный подход заключается в zipWithIndex, порядок подкачки значений и выполнять соединение со смещением.

(require '[flambo.tuple :as tp]) 

(def rdd-idx (f/map-to-pair (.zipWithIndex rdd) #(.swap %))) 

(def rdd-idx-offset 
    (f/map-to-pair rdd-idx 
    (fn [t] (let [p (f/untuple t)] (tp/tuple (dec' (first p)) (second p)))))) 

(f/map (f/values (.rightOuterJoin rdd-idx-offset rdd-idx)) f/untuple) 

Дальше вы можете сопоставить, используя аналогичный подход, как и раньше.

Редактировать

on using atoms Быстрое примечание. В чем проблема отсутствия ссылочной прозрачности и что вы используете случайные свойства данной реализации, а не контракт. В семантике map нет ничего, что требует обработки элементов в заданном порядке. Если внутренняя реализация изменится, она может быть недействительной. Использование Clojure

(defn foo [x] (let [aa @a] (swap! a (fn [&args] x)) aa)) 

(def a (atom 0)) 
(map foo (range 1 20)) 

по сравнению с:

(def a (atom 0)) 
(pmap foo (range 1 20)) 
+0

@Jyd Я добавил краткий комментарий о атомах. Я думаю, вы найдете это полезным. – zero323

+0

Я очень ценю усилия, которые вы вложили в это, я не был уверен в использовании изменчивых структур данных в разработке clojure, как советовал, его нет пути, если я мутирую структуру данных, просто пытаясь избежать использования измененных данных, особенно сейчас, когда я нахожусь на раннем этапе clojure dev. или что вы посоветуете? – Jyd

+0

Спасибо, сейчас – Jyd

Смежные вопросы