2016-02-08 3 views
3

У меня есть WebSocket, с которым клиенты могут подключаться. У меня также есть поток данных с использованием akka-потоков. Как я могу сделать так, чтобы все клиенты получали одни и те же данные. В настоящий момент они, похоже, мчатся за данные.Akka-Http Websockets: Как отправить потребителям тот же поток данных

Благодаря

+0

Попробуйте реализовать или использовать систему каналов pub/sub broker, такую ​​как Redis ... и помните, что вы можете обрабатывать websocket только потоком или потоком и раковиной. Поток данных akka может завершиться раковиной, которая публикует данные на канал. – salc2

+0

@ salc2 Я думаю, что мне нужен поток событий, который позволит многоуровневому коммуникационному решению. – Ciaran0

ответ

7

Одним из способов вы можете сделать, это должно иметь актера, который расширяет ActorPublisher и он подписаться на какое-то сообщение.

class MyPublisher extends ActorPublisher[MyData]{ 

    override def preStart = { 
    context.system.eventStream.subscribe(self, classOf[MyData]) 
    } 

    override def receive: Receive = { 

    case msg: MyData ⇒ 
     if (isActive && totalDemand > 0) { 
     // Pushes the message onto the stream 
     onNext(msg) 
     } 
    } 
} 

object MyPublisher { 
    def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher()) 
} 

case class MyData(data:String) 

Вы можете использовать этот актер в качестве источника для потока:

val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext)) 

Вы можете создать поток из этого источника данных и применить преобразование для преобразования данных в сообщение WebSocket

val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)}) 

Тогда вы можете использовать этот поток в своем маршруте.

path("readings") { 
    handleWebsocketMessages(myFlow) 
} 

От обработки исходного потока, вы можете опубликовать данные в потоке событий и любого экземпляра этого актера будет забрать его и положить в на поток, что их WebSocket в настоящее время обслуживаются.

val actorSystem = ActorSystem("foo") 

    val otherSource = Source.fromIterator(() => List(MyData("a"), MyData("b")).iterator) 

    otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))} 

Каждый сокет будет иметь свой собственный экземпляр актера, чтобы предоставить ему все данные, поступающие из одного источника.

+0

Спасибо Al. Этот тип решения я попытался избежать в комментариях выше. Это кажется очень хорошим решением проблемы. В настоящее время я пытаюсь реализовать это. Спасибо за пример. – Ciaran0