2016-02-03 4 views
4

Мои аккорды-актеры продолжают учиться. Я хотел бы интегрировать приложение akka-streams с akka-cluster and DistributedPubSubMediator.akka-streams с акка-кластером

Добавление поддержки публикации достаточно прямолинейно, но в разделе «Подписка» у меня возникают проблемы.

Для справки, абоненту дается следующим образом в Typesafe sample:

class ChatClient(name: String) extends Actor { 
    val mediator = DistributedPubSub(context.system).mediator 
    mediator ! Subscribe("some topic", self) 

    def receive = { 
    case ChatClient.Message(from, text) => 
     ...process message... 
    } 
} 

Мой вопрос, как я должен интегрировать этот актер с моим потоком, и как я должен убедиться, что я получаю публиковать сообщения в отсутствие противодавления потока?

Я пытаюсь выполнить модель pubsub, где один поток может опубликовать сообщение, а другой поток будет потреблять его (если он подписан).

+0

Я хотел бы закрыть этот, потому что у меня возникает другой вопрос, который более конкретен. –

ответ

8

Возможно, вы захотите, чтобы ваш актер расширил ActorPublisher. Затем вы можете создать источник из него и интегрировать его в свой поток.

Смотрите документацию на ActorPublisher здесь: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html

+0

Часть, которую я не получаю, состоит в том, что вы обычно создаете одного участника за подписку. Поэтому, когда я обрабатываю подписные сообщения, сам поток становится актерской фабрикой. Поток создавал бы одного актера для каждой желаемой подписки, и эти участники были бы привязаны к жизни потока (если только я не получаю запрос на отмену подписки). Я перечитаю документацию, возможно, лампочка погаснет. –

+0

Я выяснил, как создать актера для потока, все еще работая над работой с двумя источниками. Спасибо за предложение, я обновлю. –

2

Существует очень хороший YouTube presentation на этой самой теме. Интеграция с кластером подходит к концу, но весь разговор достаточно информативен.

+0

Спасибо. Я все еще не понимаю, как заставить ActorPublisher записывать в мой поток HTTP. У меня есть источник actorPublisher (src), как в Flow [A] .to (Sink.ignore) .runWith (src), но я думаю, что мне нужно отправить его на мой приемник akka-http вместо Sink.ignore. Но я не уверен, как получить эту раковину, или если это правильный способ сделать это. Все работает, за исключением того, что я не могу понять часть инъекции. –

0

Другие ответы устарели: они предлагают использовать ActorPublisher, который устарел с версии 2.5.0.

Для тех, кто интересуется текущим подходом, Колин Брек написал замечательную серию в своем блоге об интеграции аккордовых актеров и акков Акка. В течение серии Брек формирует систему, которая начинается с аккских потоков и простых актеров, затем включает в себя кластер Акка и упорство Акки. Первое сообщение в серии - here (часть обработки распределенного потока находится в part 3).

Смежные вопросы