2016-03-10 4 views
7

У меня есть поток входов, и я хочу сделать 2 HTTPS сетевых запросов для каждого, прежде чем передавать результат в другую часть программы. Типичная пропускная способность составляет 50 в секунду.Как создать большое количество одновременных запросов HTTPS в Clojure (/ Java)

for each input: 
    HTTP request A 
    HTTP request B 
    pass event on with (A.body and B.body) 

Я использую клиент http-kit, который асинхронно по умолчанию. Он возвращает обещание и может также принимать обратный вызов. Http-kit использует Java NIO (см. here и here)

Скорость поступающих запросов в сочетании со временем подачи запроса достаточно высока, чтобы это нужно было выполнить асинхронно.

Я пробовал 3 подхода:

  1. Когда событие приходит, положить его на канале. Несколько процедур go, вытягивающих канал. Каждый из них делает запросы, которые «блокируют» goblock на deref, обещают запросы HTTP. Это не работает, потому что я не думаю, что это обещание хорошо работает с потоками.
  2. Когда приходит событие, немедленно запустите future, который «блокирует» по асинхронным обещаниям. Это дает очень хорошее использование центрального процессора. Плюс голод сетевых ресурсов как-то.
  3. Когда приходит событие, немедленно вызовите запрос http-kit для запроса A, передавая обратный вызов, который делает запрос B, передавая обратный вызов, который передает событие. Это приводит к ошибке из памяти после нескольких часов.

Все они работают и обрабатывают емкость на некоторое время. В конечном итоге все они разбиваются. Последний краш, примерно через 12 часов:

Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run 
WARNING: com[email protected]1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending 
tasks! 
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run 
WARNING: com[email protected]1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status: 
     Managed Threads: 3 
     Active Threads: 1 
     Active Tasks: 
       com[email protected]65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0) 
     Pending Tasks: 
       [email protected]0d 
Pool thread stack traces: 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main] 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560) 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main] 
       java.lang.Object.wait(Native Method) 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534) 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main] 
       java.lang.Object.wait(Native Method) 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534) 


Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen 
java.lang.OutOfMemoryError: Java heap space 
     at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77) 
     at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76) 
     at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65) 
     at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63) 
     at sun.security.ssl.Handshaker.activate(Handshaker.java:514) 
     at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717) 
     at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743) 
     at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310) 
     at org.httpkit.client.HttpClient.run(HttpClient.java:375) 
     at java.lang.Thread.run(Thread.java:745) 
Mar 10, 2016 4:56:34 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 
Mar 10, 2016 5:00:43 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 
Mar 10, 2016 4:58:25 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 

Я не знаю, в чем причина отказа. Может быть, слишком много закрытий, на которые приходится держаться, или постепенная утечка ресурсов, или головокружение нитей.

Вопросы

  1. ли делает 50 HTTP запросов в секунду, каждый из которых может принимать 200мс, а это означает, что там может быть 100 запросов в полете в любое время, звук, как чрезмерное бремя?

  2. Как это сделать таким образом, чтобы обеспечить пропускную способность и надежность?

EDIT

YourKit профайлер говорит мне, что у меня есть около 2 Гб char[] с помощью org.httpkit.client.Handler с помощью java.util.concurrent.FutureTask с которой наводит на мысль, что ссылки на старые обработчики (т.е. запросы) сохраняются лишь каким-то образом. Целая причина для использования обратных вызовов заключалась в том, чтобы избежать этого (хотя они могут как-то быть уловлены в закрытии)

+1

OutOfMemoryError указывает на то, что проблема связана с памятью ... но мы не можем не видеть ваш код или написать полное решение с нуля. Я хотел бы держать на голове бесконечную последовательность или не очищать ресурсы, такие как соединения. –

+0

Я задавался вопросом, могут ли быть сохранены буферы, но насколько я могу судить, сбор мусора должен обрабатывать освобождение памяти/внешних буферов, например, NIO. Что происходит в нисходящем потоке, это в значительной степени просто вставка базы данных и вставка на канал. – Joe

+0

Я думал о размещении кода, но он довольно запутан, и потребуется около дня, чтобы узнать, не реплицировал ли проблему проблему в упрощенной версии. – Joe

ответ

0

Альтернатива вашему методу A(), связанная с HTTP-набором возвращаемых фьючерсов внутри блока go) может быть возможность, просто делая это таким образом, чтобы не блокировать ядро.Обработчик асинхронных потоки на будущем, которое вы можете сделать, комбинируя обратные вызовы httpkit в и core.async:

(defn handle-event 
"Return a core.async channel that will contain the result of making both HTTP call A and B." 
    [event-data] 
    (let [event-a-chan (clojure.core.async/chan) 
     event-b-chan (clojure.core.async/chan) 
     return-chan (clojure.core.async/chan)] 
    (org.httpkit.client/request "https://event-a-call" 
           {:method :get :params {"param1-k" "param1-v"}} 
           (fn [resp] 
            (clojure.core.async/put! event-a-chan resp))) 
    (org.httpkit.client/request "https://event-b-call" 
           {:method :get :params {"param1-k" "param1-v"}} 
           (fn [resp] 
            (clojure.core.async/put! event-b-chan resp))) 
    (clojure.core.async/go 
     (clojure.core.async/>! return-chan {:event-a-response (clojure.core.async/<! event-a-chan) 
              :event-b-response (clojure.core.async/<! event-b-chan)})) 
    return-chan)) 
0
  1. ли делают 50 HTTP запросов в секунду, каждый из которых может принимать 200мс, а это означает, что может быть 100 запросов в полете в любой момент времени, звучать как чрезмерное бремя?

Это, безусловно, не чрезмерное на современном оборудовании.

  1. Как это сделать таким образом, чтобы обеспечить пропускную способность и надежность?

Вы можете объединить core.async трубопроводы и обратные вызовы HTTP-Kit для достижения этой цели. Вам действительно не нужно создавать процедуру go для каждого запроса (хотя это не должно повредить), потому что вы можете использовать async put! из обратного вызова http-kit.

Используйте ограниченные буферы для каждого шага конвейера, чтобы ограничить количество активных соединений, которые будут (по крайней мере) ограничены количеством эфемерных портов TCP, доступных в вашей системе.

Вот пример небольшой программы, которая делает что-то похожее на то, что вы описали. Он считывает «события» из канала —, в этом случае каждое событие имеет идентификатор «1» — и ищет эти идентификаторы в службе HTTP. Он берет ответ от этого первого вызова, просматривает ключ JSON "next" и помещает его в качестве URL-адреса для шага 2. Наконец, когда этот поиск завершен, он добавляет событие в канал out, который выполняет обычную программу go для представления статистики.

(ns concur-req.core 
    (require [clojure.core.async :as async] 
      [cheshire.core :refer [decode]] 
      [org.httpkit.client :as http])) 

(defn url-of 
    [id] 
    ;; this service responds within 100-200ms 
    (str "http://localhost:28080/" id ".json")) 

(defn retrieve-json-async 
    [url c] 
    (http/get url nil 
      (fn [{body :body status :status :as resp}] 
       (if (= 200 status) 
       (async/put! c (decode body true)) 
       (println "ERROR:" resp)) 
       (async/close! c)))) 

(defn run [parallelism stop-chan] 
    (let [;; allocate half of the parallelism to each step 
     step1-n (int (max (/ parallelism 2) 1)) 
     step2-n step1-n 
     ;; buffer to take ids, transform them into urls 
     step1-chan (async/chan step1-n (map url-of)) 
     ;; buffer for result of pulling urls from step1, xform by extracting :next url 
     step2-chan (async/chan step2-n (map :next)) 
     ;; buffer to count completed results 
     out-chan (async/chan 1 (map (constantly 1))) 
     ;; for delivering the final result 
     final-chan (async/chan) 
     start-time (System/currentTimeMillis)] 

    ;; process URLs from step1 and put the result in step2 
    (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan) 
    ;; process URLs from step2 and put the result in out 
    (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan) 

    ;; keep the input channel full until stop-chan is closed. 
    (async/go-loop [] 
     (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])] 
     (if (= c stop-chan) 
      (async/close! step1-chan) 
      (recur)))) 

    ;; count messages on out-chan until the pipeline is closed, printing 
    ;; status message every second 
    (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0] 
     (let [[v c] (async/alts! [status-timer out-chan])] 
     (cond (= c status-timer) 
       (do (println subt "records...") 
        (recur (async/timeout 1000) 0 (+ subt accu))) 

       (nil? v) 
       (async/>! final-chan (+ subt accu)) 

       :else 
       (recur status-timer (+ v subt) accu)))) 

    ;; block until done, then emit final report. 
    (let [final-total (async/<!! final-chan) 
      elapsed-ms (- (System/currentTimeMillis) start-time) 
      elapsed-s (/ elapsed-ms 1000.0)] 
     (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n" 
        final-total parallelism elapsed-s 
        (int (/ final-total elapsed-s))))))) 

(defn run-for 
    [seconds parallelism] 
    (let [stop-chan (async/chan)] 
    (future 
     (Thread/sleep (* seconds 1000)) 
     (async/close! stop-chan)) 
    (run parallelism stop-chan))) 

(do 
    ;; Warm up the connection pool, avoid somaxconn problems... 
    (doseq [p (map #(* 20 (inc %)) (range 25))] 
    (run-for 1 p)) 
    (run-for (* 60 60 6) 500)) 

Чтобы проверить это, я настроил службу HTTP, которая отвечает только после спящего случайного времени между 100-200 мс. Затем я запускал эту программу в течение 6 часов на моем Macbook Pro.

С параллелизмом, установленным в 500, я получил в среднем 1155 транзакций в секунду (2310 завершенных HTTP-запросов в секунду). Я уверен, что это может быть намного выше при некоторой настройке (и особенно при перемещении HTTP-сервиса на другую машину). Память JVM ползала до 1,5 ГБ в течение первых 30 минут, а затем поддерживала этот размер. Я использую Oracle 64-bit 1.8 JVM.

Смежные вопросы