2016-11-15 2 views
1

У меня есть список ISBN, которые выполняют поиск в Amazon. Я уже решил это последовательно, поэтому теперь задача заключается в реализации параллелизма. Я попробовал это, используя core.async. Проблема, с которой я столкнулась, заключается в том, что после завершения поиска мне нужно объединить все книги в коллекцию, чтобы я мог сортировать их по рангу книги. Поскольку я использую один канал с размером буфера 10, я не уверен, как это сделать. Мой подход может быть совершенно неправильным. Помощь приветствуется.Слияние буфера асинхронного канала

Вот функция для параллельности

(def book_channel (chan 10)) 

(defn concurrency_test [list_of_isbns] 
     ([doseq [isbn list_of_isbns] 
     (go(>! book_channel(get_title_and_rank_for_one_isbn(amazon_search isbn))))]) 
    ) 
) 

название прибудете:

(defn get_title_and_rank_for_one_isbn [amazon_report] 
    (def book_title (get-in amazon_report [:items 0 :item-atributes :title])) 
    (def sales_rank(get-in amazon_report [:items 0 :SalesRank])) 
    (def book_isbn(get-in amazon_report [:items 0 :asin])) 
    (reduce into [[book_title] [book_isbn] [sales_rank]])) 

и вызов:

(def list_of_isbns (split_isbns "src/clj_amazon/isbn_list.txt")) 
(concurrency_test list_of_isbns) 

ответ

0

Вы должны быть в состоянии использовать

(async/reduce conj '() book-chan) 

, чтобы создать коллекцию всех элементов в канале, но не забудьте закрыть канал, потому что уменьшение не вернет результат до тех пор, пока канал не будет закрыт.

0

Если ваша цель состоит в том, чтобы совместить все связанные с I/O задачи (например, amazon-search) и распараллелить все связанные с ЦПУ задачи (например, разбор отчета и т. Д.), Тогда вы должны изучить функции конвейера. Ниже приведены некоторые псевдокоды, иллюстрирующие их использование:

(let [isbn> (chan) 
     report> (chan) 
     out> (chan)] 

    ;; pipeline-async will take isbn from isbn> channel, invoke 
    ;; amazon-search-async and pipe the result to report> channel. 

    (pipeline-async 10 ;; up to 10 I/O bound requests 
        report> 
        amazon-search-asyn 
        isbn>) 

    ;; pipeline will take report from report> channel and feed it 
    ;; to the transducer (map get-title-and-rank-etc) for processing, 
    ;; the processed report will be pushed to the out> channel. 

    (pipeline (.. Runtime getRuntime availableProcessors) 
      out> 
      (map get-title-and-rank-etc) 
      report>) 

    ;; read isbn from file and push it to isbn> channel 
    (->> "isbn_list.txt" 
     io/reader 
     line-seq 
     (onto-chan isbn>)) 

    ;; take all report from out> channel and sort it by rank & title 
    (sort-by (juxt :rank :title) (<!! (async/into [] out>)))) 
+0

oh аккуратный! плохо посмотрите, работает ли это для меня. –

Смежные вопросы