2016-05-26 2 views
2

У меня есть существующее приложение akka, созданное на веб-сайтах socko. Связь с сокетами происходит внутри одного актера, а сообщения, выходящие и входящие в актер (входящие и исходящие сообщения, соответственно), помечены идентификатором сокета, который является свойством первого класса в socko websocket (в socko поступает запрос на соединение с меткой id, и все переходы жизненного цикла, такие как установление связи, разъединение, входящие кадры и т. д. аналогично помечены)Перемещение из socko в akka-http websockets

Я хотел бы переопределить этого одного актера, используя akka-http (socko - в наши дни, по понятным причинам), но это не так просто, потому что две библиотеки концептуально отличаются друг от друга; akka-http скрывает детали нижнего уровня подтверждения связи, отключения и т. д., просто отправляя какой-либо актер, связанный с http-сервером заголовком запроса UpgradeToWebsocket. Объект заголовка содержит метод, который берет материализованный поток как обработчик для всех сообщений, обмениваемых с клиентом.

До сих пор так хорошо; Я могу получать сообщения в веб-сокете и отвечать на них напрямую. В официальных примерах все принимают какую-то модель запроса-ответа без гражданства, поэтому я борюсь с пониманием того, как сделать следующий шаг для присвоения метки материализованному потоку, управления его жизненным циклом и состоянием соединения (мне нужно информировать других участников в приложение, когда соединение отбрасывается клиентом, а также помечены сообщениями.)

Альтернатива (ремоделирование всего приложения с использованием akka-streams) - слишком большая работа, поэтому любые советы о том, как отслеживать из сокетов будет высоко оценена.

ответ

2

Ответ от Rüdiger Klaehn был полезной отправной точкой, спасибо!

В конце я пошел с ActorPublisher после прочтения здесь другого вопроса (Pushing messages via web sockets with akka http).

Главное, что поток «материализуется» где-то под капюшоном akka-http, поэтому вам нужно передать в UpgradeToWebSocket.handleMessagesWithSinkSource пару Source/Sink, которая уже знает о существующем акторе. Поэтому я создаю актера (который реализует ActorPublisher[TextMessage.Strict]), а затем оберните его в Source.fromPublisher(ActorPublisher(myActor)).

Если вы хотите, чтобы ввести сообщение в поток из receive метода актера сначала проверить, если totalDemand > 0 (то есть поток готов принимать входные данные), и если да, вызовите onNext с содержимым сообщения.

3

Чтобы взаимодействовать с существующей системой на основе актеров, вы должны посмотреть на Source.actorRef и Sink.actorRef. Source.actorRef создает ActorRef, на который вы можете отправлять сообщения, а Sink.actorRef позволяет обрабатывать входящие сообщения с помощью актера, а также обнаруживать закрытие веб-узла.

Для подключения действующего актера, созданного Source.actorRef, используйте существующий долгоживущий актер, используйте Flow#mapMaterializedValue. Это также было бы хорошим местом для назначения уникального идентификатора для подключения сокета.

Это может быть ваше сообщение answer to a related question.

Одна вещь, о которой нужно знать. Текущая реализация websocket делает не закрывает поток сервера к клиенту, когда поток клиента к серверу закрывается с помощью websocket close message. Для реализации этого есть issue, но пока он не будет реализован, вы должны сделать это сами. Например, имея something like this в стеке протокола.

Смежные вопросы