Я пытаюсь подключить akka-stream
Source
с помощью websocket.Source.actorRef не потоковая передача в websocket с akka-stream
object TestWebServer {
val source1 = Source.actorRef[WsMessage](10, OverflowStrategy.dropHead)
.map { case [email protected](a,b,c,d,e,f) => println("Received from stream" + msg);TextMessage(c) }
import scala.concurrent.duration._
val source2 = Source.tick(initialDelay = 0 second, interval = 1 second, tick = TextMessage("tick"))
def main(args: Array[String]) {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val requestHandler: HttpRequest => HttpResponse = {
case [email protected](HttpMethods.GET, Uri.Path("/ws"), _, _, _) =>
req.header[UpgradeToWebSocket] match {
case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source1)
case None => HttpResponse(400, entity = "Not a valid websocket request!")
}
case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
}
val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
}
Использование клиента, таких как Simple Web Socket Client
(SWSC), я могу видеть, что
- если я подключаю WebSocket с
source1
,upgrade.handleMessagesWithSinkSource(Sink.ignore, source1)
я не вижу что-нибудь с SWSC - если я подключу websocket с
source2
,upgrade.handleMessagesWithSinkSource(Sink.ignore, source2)
Я вижу сообщениеtick
, отображаемое на консоли SVSC каждые 1 секунду (ожидается)
Когда я отправляю сообщение source1
, я могу видеть сообщение Received from stream
. Поэтому я считаю, что source1
настроен правильно.
Кто-нибудь знает, как я могу заставить source1
вести себя как source2
? Что-то специальное для подключения source1
к?
Спасибо
Обновлено:
- Я обновил код. Я фактически объявил 2 источника за пределами
main
, чтобы я мог использовать его из другой системы Actor для отправки сообщения. Это правильный способ поделиться ссылкой для отправки сообщенияsource1
или использовать что-то вродеactorSelection
или вариант? - Как упоминалось Владимир Матвеев, я попытался:
source1.mapMaterializedValue { ref => ref ! WsMessage(..., "x", ...); ref ! WsMessage(..., "y", ...) }
, но я до сих пор не может видеть обновления в SWSC
Вот код моего тестового клиента:
object Test {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("my-system2")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val source1Client = TestWebServer.source1
source1Client.mapMaterializedValue { ref => ref ! WsMessage(DateTime.now(), "x", "xx", 0, 0, 0); ref ! WsMessage(DateTime.now(), "y", "yy", 0, 0, 0) }
val source11Client = TestWebServer.source1
val actorRefClient = source11.to(Sink.ignore).run()
actorRef2 ! WsMessage(DateTime.now(), "x", "xx", 0, 0, 0)
}
}
source1
изTest
не достигаетSource.actorRef
(source1
вTestWebServer
)actorRefClient
делает достичьsource1
вTestWebServer
в печати в консолиReceived from streamWsMessage(...)
Ваш код работает правильно для меня. Вы уверены, что отправляете сообщения правильному актеру? Я сделал mapMaterializedValue {ref => ref! WsMessage (..., "x", ...); ref! WsMessage (..., "y", ...)} 'на источнике, и я могу видеть' x' и 'y' на выходе соединения websocket. –
Владимир, посмотри мои обновления - Спасибо! – ccheneson