2017-02-21 11 views
0

Я запускаю несколько сотен параллельных запросов http-kit.client/get, снабженных обратным вызовом для записи результатов в один файл.Самый простой способ использовать обратный вызов i/o в параллельном http-kit/get экземплярах

Что было бы хорошим способом борьбы с потоками? С использованием chan и <!! от core.asyc?

Вот код, который я хотел бы рассмотреть:

(defn launch-async [channel url]                                 
    (http/get url {:timeout 5000                                 
       :user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}            
      (fn [{:keys [status headers body error]}]                            
      (if error                                   
       (put! channel (json/generate-string {:url url :headers headers :status status}))                 
       (put! channel (json/generate-string body))))))                          

(defn process-async [channel func]                                
    (when-let [response (<!! channel)]                                
    (func response)))                                   

(defn http-gets-async [func urls]                                
    (let [channel (chan)]                                   
    (doall (map #(launch-async channel %) urls))                             
    (process-async channel func)))  

Спасибо за ваши идеи.

ответ

3

Поскольку вы уже используете core.async в своем примере, я подумал, что хочу указать несколько вопросов и как вы можете их решить. В другом ответе упоминается использование более основополагающего подхода, и я полностью согласен с тем, что более простой подход просто прекрасен. Однако с каналами у вас есть простой способ использования данных, которые не связаны с отображением над вектором, который также будет увеличиваться с течением времени, если у вас много ответов. Рассмотрите следующие проблемы и как их исправить:

(1) Ваша текущая версия сработает, если ваш URL-адрес содержит более 1024 элементов. Существует внутренний буфер для puts и принимает асинхронные (т. Е. put! и take! не блокируют, а всегда возвращают сразу), а ограничение равно 1024. Это необходимо для предотвращения неограниченного асинхронного использования канала. Чтобы убедиться в этом, позвоните по телефону (http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com")).

Что вы хотите сделать, это только поместить что-то на канал, когда есть место для этого. Это называется обратным давлением. Принимая страницу из отличной вики на go block best practices, один из умных способов сделать это из вашего обратного вызова http-kit - использовать функцию обратного вызова put!, чтобы запустить следующий http-получение; это будет происходить только тогда, когда put! сразу удается, так что вы никогда не будете иметь ситуацию, в которой вы можете выйти за пределы буфера канала:

(defn launch-async 
    [channel [url & urls]] 
    (when url 
    (http/get url {:timeout 5000 
        :user-agent "Mozilla"} 
       (fn [{:keys [status headers body error]}] 
       (let [put-on-chan (if error 
            (json/generate-string {:url url :headers headers :status status}) 
            (json/generate-string body))] 
        (put! channel put-on-chan (fn [_] (launch-async channel urls)))))))) 

(2) Далее, вы, кажется, только один ответ обработки. Вместо этого используйте идти-цикл:

(defn process-async 
    [channel func] 
    (go-loop [] 
    (when-let [response (<! channel)] 
     (func response) 
     (recur)))) 

(3) Вот ваша http-gets-async функция. Я не вижу никакого вреда в добавлении буфера здесь, как это должно помочь вам выстрелить хороший всплеск запросов в начале:

(defn http-gets-async 
    [func urls] 
    (let [channel (chan 1000)] 
    (launch-async channel urls) 
    (process-async channel func))) 

Теперь у вас есть возможность обрабатывать бесконечное число адресов, с бэк- давление. Чтобы проверить это, определите счетчик, а затем сделайте, чтобы ваша функция обработки увеличила этот счетчик, чтобы увидеть ваш прогресс. Используя локальный URL, который легко барабанить (не рекомендовал бы стреляя сотни тысяч запросов, скажем, Google и т.д.):

(def responses (atom 0)) 
(http-gets-async (fn [_] (swap! responses inc)) 
       (repeat 1000000 "http://localhost:8000")) 

Как все это асинхронный, ваша функция будет немедленно вернуться и вы можете посмотреть на @responses grow.

Еще одна интересная вещь, которую вы можете сделать, - это вместо того, чтобы выполнять функцию обработки в process-async, вы можете по желанию применить ее как датчик на самом канале.

(defn process-async 
    [channel] 
    (go-loop [] 
    (when-let [_ (<! channel)] 
     (recur)))) 

(defn http-gets-async 
    [func urls] 
    (let [channel (chan 10000 (map func))] ;; <-- transducer on channel 
    (launch-async channel urls) 
    (process-async channel))) 

Есть много способов сделать это, в том числе строительства его так, что закрывает канал (обратите внимание, что выше, он остается открытым). У вас есть java.util.concurrent примитивов, чтобы помочь в этом отношении, если хотите, и они довольно просты в использовании. Возможности очень многочисленны.

+0

Я бы ожидал, что «запуск-asyc» будет переполнен при наличии большой последовательности URL-адресов. Почему это не так? (Я предполагаю, потому что это дано как официальное предложение). Thanks – user3639782

+0

другой предмет. Он записывает файл до тех пор, пока я отправляю код в repl (boot repl), но ничего не происходит, когда я обертываю один и тот же код внутри '-main' и запускаю его как скрипт. Должно ли это так вести себя? Спасибо – user3639782

+1

@ user3639782 каждый элемент, взятый из последовательности, сгенерированной функцией 'repeat', создается« по требованию », то есть последовательность является ленивой и фактически может быть бесконечной. Таким образом, список URL-адресов практически не имеет памяти. Что касается вашего другого вопроса, я не уверен, что вы хотите сказать о записи в файл. – Josh

1

Это достаточно просто, что я бы не использовал core.async для этого. Вы можете сделать это при хранении атома, используя вектор ответов, а затем отдельный поток, читающий содержимое атома, пока он не увидит все ответы. Затем, в вашем обратном вызове с помощью http-kit, вы можете просто получить ответ в атоме прямо swap!.

Если вы хотите использовать core.async, я бы рекомендовал буферный канал, чтобы не блокировать пул потоков http-kit.

Смежные вопросы