2015-05-27 2 views
0

У меня есть Актер, который был разработан для работы с akka-io acking, , так что он будет ждать Ack при отправке сообщений вверх по течению (в сеть ). Этот актер является интерфейсом для асинхронного приложения в бэкэнд.Интеграция актера с акк-потоком

Я хотел бы иметь оболочку слой, который позволяет мне преобразовать этот актера в Акка-потоки Flow[Incoming, Outgoing, ???] так что она может быть интегрированы с новыми библиотеками, которые ожидают такую ​​ подписи.

(Входящие сообщения от вверх по течению редко, поэтому мы все равно слишком много о backpressuring там, но это не было бы плохо для есть.)

sealed trait Incoming //... with implementations 
sealed trait Outgoing //... with implementations 
object Ack 

// `upstream` is an akka-io connection actor that will send Ack 
// when it writes an Outgoing message to the socket 
class SimpleActor(upstream: Actor) extends Actor { 
    def receive = { 
    case in: Incoming if sender() == upstream => 
     // does some work in response to upstream 
    case other => 
     // does some work in response to downstream 
     // including sending messages to upstream and 
     // `becoming` a stashing state waiting for Ack 
     // to `unbecome`, then sending Ack downstream 
     // (which will respect the backpressure). 
    } 
} 

Я его на хороший авторитет из списка рассылки akka-user, который не содержит кода в акк-потоках, который объединяет актеров с потоками , и для того, чтобы подключить Актера к потоку и сохранить Противодавление на основе Ack, нужно было бы реализовать PushPullStage.

кажется, что мы на самом деле нужно два PushPullStage s здесь ... один для upstream => SimpleActor и один для SimpleActor => upstream.

Мои вопросы:

  1. Существуют ли какие-либо библиотеки, которые предлагают объединения, такие как это между актерами и потоками?
  2. Есть ли более простой способ сделать это, чем реализовать двунаправленный PushPullStage с нуля?
  3. Существует ли какая-либо существующая тестовая структура, которая позволила бы протестировать такую ​​реализацию?
+0

Вы пробовали использовать актер ask и 'mapAsync'? Если нет, то писать «PushPullStage» все еще намного проще, чем писать «ActorProcessor». – jrudolph

+0

@jrudolph, но у меня нет ответа на запрос (если бы это было так, я бы использовал REST вместо WebSockets). Издатель может отправлять сообщения в любое время. – fommil

+0

Я предполагаю, что «в любое время» вы подразумеваете, когда это допустимо для противодавления? Как это работает? Если вы создаете поток выходов для каждого элемента ввода, вы можете просто моделировать его таким образом и затем сгладить поток. – jrudolph

ответ

1

Да, вы можете интегрировать актеров с потоками.
Для этой цели есть специальные актеры: актер-издатель и актер-подписчик.

Это все здесь: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

Конечно, вы должны написать актер таким образом, что он работает с противодавлением потоков. Но вам не нужна стадия выталкивания.

+0

Это звучит намного проще, чем внедрение новой фазы и реализация ее методов. Я надеялся избежать необходимости писать промежуточного актера, но похоже, что это единственный путь вперед - реализация как издателя, так и подписчика. Это будет ** лот ** шаблона: - / – fommil

5

Я думаю, что философия акка-потока состоит в том, чтобы обеспечить кирпичи низкого уровня и построить над ними более высокоуровневые инструменты. Если вы посмотрите на нашу недавно выпущенную библиотеку с открытым исходным кодом https://github.com/MfgLabs/akka-stream-extensions, вы увидите, что мы это точно сделали. Мы предоставляем некоторые полезные структуры, чтобы упростить управление лимитерами скорости, процессорами с сохранением состояния, лень & генераторы и т. Д. Для интеграции с актером, я думаю, что должно быть возможно создать своего рода помощников, чтобы упростить интеграцию актеров с аккой -stream пытается распространять противодавление. Акка-Стрим еще молод и экосистема продолжает расти;)

Смежные вопросы