Одним из способов вы можете сделать, это должно иметь актера, который расширяет ActorPublisher и он подписаться на какое-то сообщение.
class MyPublisher extends ActorPublisher[MyData]{
override def preStart = {
context.system.eventStream.subscribe(self, classOf[MyData])
}
override def receive: Receive = {
case msg: MyData ⇒
if (isActive && totalDemand > 0) {
// Pushes the message onto the stream
onNext(msg)
}
}
}
object MyPublisher {
def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}
case class MyData(data:String)
Вы можете использовать этот актер в качестве источника для потока:
val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))
Вы можете создать поток из этого источника данных и применить преобразование для преобразования данных в сообщение WebSocket
val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})
Тогда вы можете использовать этот поток в своем маршруте.
path("readings") {
handleWebsocketMessages(myFlow)
}
От обработки исходного потока, вы можете опубликовать данные в потоке событий и любого экземпляра этого актера будет забрать его и положить в на поток, что их WebSocket в настоящее время обслуживаются.
val actorSystem = ActorSystem("foo")
val otherSource = Source.fromIterator(() => List(MyData("a"), MyData("b")).iterator)
otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))}
Каждый сокет будет иметь свой собственный экземпляр актера, чтобы предоставить ему все данные, поступающие из одного источника.
Попробуйте реализовать или использовать систему каналов pub/sub broker, такую как Redis ... и помните, что вы можете обрабатывать websocket только потоком или потоком и раковиной. Поток данных akka может завершиться раковиной, которая публикует данные на канал. – salc2
@ salc2 Я думаю, что мне нужен поток событий, который позволит многоуровневому коммуникационному решению. – Ciaran0