У меня есть Актер, который был разработан для работы с akka-io acking, , так что он будет ждать Ack при отправке сообщений вверх по течению (в сеть ). Этот актер является интерфейсом для асинхронного приложения в бэкэнд.Интеграция актера с акк-потоком
Я хотел бы иметь оболочку слой, который позволяет мне преобразовать этот актера в Акка-потоки Flow[Incoming, Outgoing, ???]
так что она может быть интегрированы с новыми библиотеками, которые ожидают такую подписи.
(Входящие сообщения от вверх по течению редко, поэтому мы все равно слишком много о backpressuring там, но это не было бы плохо для есть.)
sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack
// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
def receive = {
case in: Incoming if sender() == upstream =>
// does some work in response to upstream
case other =>
// does some work in response to downstream
// including sending messages to upstream and
// `becoming` a stashing state waiting for Ack
// to `unbecome`, then sending Ack downstream
// (which will respect the backpressure).
}
}
Я его на хороший авторитет из списка рассылки akka-user, который не содержит кода в акк-потоках, который объединяет актеров с потоками , и для того, чтобы подключить Актера к потоку и сохранить Противодавление на основе Ack, нужно было бы реализовать PushPullStage.
кажется, что мы на самом деле нужно два PushPullStage
s здесь ... один для upstream => SimpleActor
и один для SimpleActor => upstream
.
Мои вопросы:
- Существуют ли какие-либо библиотеки, которые предлагают объединения, такие как это между актерами и потоками?
- Есть ли более простой способ сделать это, чем реализовать двунаправленный
PushPullStage
с нуля? - Существует ли какая-либо существующая тестовая структура, которая позволила бы протестировать такую реализацию?
Вы пробовали использовать актер ask и 'mapAsync'? Если нет, то писать «PushPullStage» все еще намного проще, чем писать «ActorProcessor». – jrudolph
@jrudolph, но у меня нет ответа на запрос (если бы это было так, я бы использовал REST вместо WebSockets). Издатель может отправлять сообщения в любое время. – fommil
Я предполагаю, что «в любое время» вы подразумеваете, когда это допустимо для противодавления? Как это работает? Если вы создаете поток выходов для каждого элемента ввода, вы можете просто моделировать его таким образом и затем сгладить поток. – jrudolph