2016-06-26 3 views
2

Я пытаюсь подключить akka-streamSource с помощью websocket.Source.actorRef не потоковая передача в websocket с akka-stream

object TestWebServer { 

    val source1 = Source.actorRef[WsMessage](10, OverflowStrategy.dropHead) 
    .map { case [email protected](a,b,c,d,e,f) => println("Received from stream" + msg);TextMessage(c) } 

    import scala.concurrent.duration._ 
    val source2 = Source.tick(initialDelay = 0 second, interval = 1 second, tick = TextMessage("tick")) 



    def main(args: Array[String]) { 

    implicit val system = ActorSystem("my-system") 
    implicit val materializer = ActorMaterializer() 
    // needed for the future flatMap/onComplete in the end 
    implicit val executionContext = system.dispatcher 


    val requestHandler: HttpRequest => HttpResponse = { 
     case [email protected](HttpMethods.GET, Uri.Path("/ws"), _, _, _) => 
     req.header[UpgradeToWebSocket] match { 
      case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source1) 
      case None => HttpResponse(400, entity = "Not a valid websocket request!") 
     } 
     case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!") 
    } 


    val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() // let it run until user presses return 
    bindingFuture 
     .flatMap(_.unbind()) // trigger unbinding from the port 
     .onComplete(_ => system.terminate()) // and shutdown when done 
    } 
} 

Использование клиента, таких как Simple Web Socket Client (SWSC), я могу видеть, что

  • если я подключаю WebSocket с source1, upgrade.handleMessagesWithSinkSource(Sink.ignore, source1) я не вижу что-нибудь с SWSC
  • если я подключу websocket с source2, upgrade.handleMessagesWithSinkSource(Sink.ignore, source2) Я вижу сообщение tick, отображаемое на консоли SVSC каждые 1 секунду (ожидается)

Когда я отправляю сообщение source1, я могу видеть сообщение Received from stream. Поэтому я считаю, что source1 настроен правильно.

Кто-нибудь знает, как я могу заставить source1 вести себя как source2? Что-то специальное для подключения source1 к?

Спасибо

Обновлено:

  • Я обновил код. Я фактически объявил 2 источника за пределами main, чтобы я мог использовать его из другой системы Actor для отправки сообщения. Это правильный способ поделиться ссылкой для отправки сообщения source1 или использовать что-то вроде actorSelection или вариант?
  • Как упоминалось Владимир Матвеев, я попытался: source1.mapMaterializedValue { ref => ref ! WsMessage(..., "x", ...); ref ! WsMessage(..., "y", ...) }, но я до сих пор не может видеть обновления в SWSC

Вот код моего тестового клиента:

object Test { 

    def main(args: Array[String]): Unit = { 

    implicit val system = ActorSystem("my-system2") 
    implicit val materializer = ActorMaterializer() 
    // needed for the future flatMap/onComplete in the end 
    implicit val executionContext = system.dispatcher 

    val source1Client = TestWebServer.source1 
    source1Client.mapMaterializedValue { ref => ref ! WsMessage(DateTime.now(), "x", "xx", 0, 0, 0); ref ! WsMessage(DateTime.now(), "y", "yy", 0, 0, 0) } 

    val source11Client = TestWebServer.source1 
    val actorRefClient = source11.to(Sink.ignore).run() 
    actorRef2 ! WsMessage(DateTime.now(), "x", "xx", 0, 0, 0) 

    } 
} 
  • source1 из Test не достигает Source.actorRef (source1 в TestWebServer)
  • actorRefClient делает достичь source1 в TestWebServer в печати в консоли Received from streamWsMessage(...)
+0

Ваш код работает правильно для меня. Вы уверены, что отправляете сообщения правильному актеру? Я сделал mapMaterializedValue {ref => ref! WsMessage (..., "x", ...); ref! WsMessage (..., "y", ...)} 'на источнике, и я могу видеть' x' и 'y' на выходе соединения websocket. –

+0

Владимир, посмотри мои обновления - Спасибо! – ccheneson

ответ

1

Мне очень жаль, но ваши обновления не помогают. Ниже приведен пример программа, которая работает хорошо для меня, и когда я использую Акку-клиент WebSockets клиента и, когда я использую некоторые внешние средства, как wsta:

import java.time.Instant 
import scala.io.StdIn 

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import akka.http.scaladsl.model.ws.{TextMessage, UpgradeToWebSocket, WebSocketRequest} 
import akka.stream.{ActorMaterializer, OverflowStrategy} 
import akka.stream.scaladsl.{Flow, Sink, Source} 

case class WsMessage(a: Instant, b: String, c: String, d: Int, e: Int, f: Int) 

object MainServer extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext = actorSystem.dispatcher 

    val source = Source.actorRef[WsMessage](10, OverflowStrategy.dropHead) 
    .map { 
     case [email protected](_, _, c, _, _, _) => 
     println(s"Received from stream: $msg") 
     TextMessage(c) 
    } 
    .mapMaterializedValue { ref => 
     ref ! WsMessage(Instant.now(), "a", "x", 0, 0, 0) 
     ref ! WsMessage(Instant.now(), "b", "y", 0, 0, 0) 
    } 

    val requestHandler: HttpRequest => HttpResponse = { 
    case [email protected](HttpMethods.GET, Uri.Path("/ws"), _, _, _) => 
     req.header[UpgradeToWebSocket] match { 
     case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source) 
     case None => HttpResponse(StatusCodes.BadRequest, entity = "Not a valid websocket request!") 
     } 
    case _ => 
     HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!") 
    } 

    val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() 
    bindingFuture 
    .flatMap(_.unbind()) 
    .onComplete(_ => actorSystem.terminate()) 
} 

object MainClient extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext = actorSystem.dispatcher 

    Http() 
    .singleWebSocketRequest(WebSocketRequest(Uri("ws://localhost:8080/ws")), Flow.fromSinkAndSource(Sink.foreach(println), Source.empty)) 
    Thread.sleep(5000) 
    actorSystem.terminate() 
} 

Я не вижу какое-либо важное различия между этой программой и вашим. Эта программа также включает в себя как сервер, так и клиент, поэтому вы можете запустить сервер, а затем запустить клиент несколько раз.Например, вот мой вывод сервера после двух пробегов клиента:

Server online at http://localhost:8080/ 
Press RETURN to stop... 
Received from stream: WsMessage(2016-06-28T08:58:21.478Z,a,x,0,0,0) 
Received from stream: WsMessage(2016-06-28T08:58:21.478Z,b,y,0,0,0) 
Received from stream: WsMessage(2016-06-28T08:58:29.925Z,a,x,0,0,0) 
Received from stream: WsMessage(2016-06-28T08:58:29.925Z,b,y,0,0,0) 

А вот один из выхода клиентов:

TextMessage.Strict(x) 
TextMessage.Strict(y) 

я вижу нечто подобное, когда я бегу wsta ws://localhost:8080/ws.

Кроме того, это не имеет значения, где вы объявляете Source с, Sink с или Flow S: они являются непреложными чертежи, которые только могут «действовать», когда они run().

+0

Теперь я вижу сообщения, распечатанные, если я использую ваш код. Единственное различие, которое я вижу между моим кодом и вашим, заключается в том, что ваш источник отправляет сообщение из своего определения. В моем коде я получил «источник», и я отправляю ему сообщение (в основном я хочу, чтобы внешний участник отправлял сообщение в «источник»). Я думаю, что проблема может возникнуть при запуске web-сервера в 1 JVM, и я запускаю свой тестовый клиент в другой JVM (другая программа). Может быть? – ccheneson

+0

@ccheneson, каков ваш тестовый клиент? В моем примере выше есть объект «MainClient», который является отдельным приложением. Естественно, когда я тестировал его, работая вместе с «MainServer», они запускались в отдельных JVM. –

+0

@ccheneson, также, как именно вы «извлекаете» исходного актера? Я не вижу возможности сделать это без «mapMaterializedValue» и хранить его где-то внутри «var» или что-то вроде этого, что, естественно, не очень приятно. –

Смежные вопросы