Я разработал один проект сейчас, где у меня было то же самое требование, я использовал пьедестал в сочетании с core.async для реализации SSE, и он работает очень хорошо.
К сожалению, я не могу с открытым исходным кодом выполнять эту работу сейчас, но в основном, я сделал что-то вроде приведенных ниже фрагментов, только сложнее из-за аутентификации, что не особенно просто в SSE из браузера, потому что вы не можете передавать любые пользовательские заголовки в новый EventSource (SOME_URI); вызов.
Так сниппет:
(ns chat-service.service
(:require [clojure.set :as set]
[clojure.core.async :as async :refer [<!! >!! <! >!]]
[cheshire.core :as json]
[io.pedestal.service.http :as bootstrap]
[io.pedestal.service.log :as log]
[io.pedestal.service.http.route :as route]
[io.pedestal.service.http.sse :as sse]
[io.pedestal.service.http.route.definition :refer [defroutes]]))
(def ^{:private true :doc "Formatting opts"} json-opts {:date-format "MMM dd, yyyy HH:mm:ss Z"})
(def ^{:private true :doc "Users to notification channels"} subscribers->notifications (atom {}))
;; private helper functions
(def ^:private generate-id #(.toString (java.util.UUID/randomUUID)))
(defn- sse-msg [event msg-data]
{:event event :msg msg-data})
;; service functions
(defn- remove-subscriber
"Removes transport channel from atom subscribers->notifications and tears down
SSE connection."
[transport-channel context]
(let [subscriber (get (set/map-invert @subscribers->notifications) transport-channel)]
(log/info :msg (str "Removing SSE connection for subscriber with ID : " subscriber))
(swap! subscribers->notifications dissoc subscriber)
(sse/end-event-stream context)))
(defn send-event
"Sends updates via SSE connection, takes also transport channel to close it
in case of the exception."
[transport-channel context {:keys [event msg]}]
(try
(log/info :msg "calling event sending fn")
(sse/send-event context event (json/generate-string msg json-opts))
(catch java.io.IOException ioe
(async/close! transport-channel))))
(defn create-transport-channel
"Creates transport channel with receiving end pushing updates to SSE connection.
Associates this transport channel in atom subscribers->notifications under random
generated UUID."
[context]
(let [temporary-id (generate-id)
channel (async/chan)]
(swap! subscribers->notifications assoc temporary-id channel)
(async/go-loop []
(when-let [payload (<! channel)]
(send-event channel context payload)
(recur))
(remove-subscriber channel context))
(async/put! channel (sse-msg "eventsourceVerification"
{:handshakeToken temporary-id}))))
(defn subscribe
"Subscribes anonymous user to SSE connection. Transport channel with timeout set up
will be created for pushing any new data to this connection."
[context]
(create-transport-channel context))
(defroutes routes
[[["/notifications/chat"
{:get [::subscribe (sse/start-event-stream subscribe)]}]]])
(def service {:env :prod
::bootstrap/routes routes
::bootstrap/resource-path "/public"
::bootstrap/type :jetty
::bootstrap/port 8081})
One «проблема» я столкнулся это путь по умолчанию, как пьедестал ручка упала соединения SSE.
Из-за запланированного задания биения он регистрирует исключение, когда соединение отбрасывается, и вы не вызывали контекст конечного потока событий.
Я хочу, чтобы был способ отключить/настроить это поведение или, по крайней мере, предоставить мою собственную функцию срыва, которая будет вызываться всякий раз, когда работа с биением прерывается с EofException.
см. Https://github.com/sunng87/ring-jetty9-adapter и http-kit также поддерживают ws – edbond
Поддержка websockets = http://caniuse.com/websockets http-kit docs: http: // http- kit.org/server.html#channel Как только вы получили сообщение, положите! это для канала, и вы сделали. – edbond