2015-12-29 3 views
1

Я использую потоки Akka в контексте, где раковины для одного источника приходят и уходят. По этой причине я создаю издатель из источника и прикрепляя абонент как потребность возникает:Как установить inputBuffer издателя потока akka?

val publisher= mySource.runWith(Sink.publisher(true)) 

с

publisher.subscribe(subscriber1)// There will be others 

Некоторые абонентов будет быстрее, чем другие, и я хотел бы позволяют быстрее работать независимо от самого медленного, по крайней мере, до уровня, разрешенного входным буфером издателя. Этот буфер описывается комментарием на Sink.publisher (истинного) метода:

Если fanout является true, материализованная Publisher будет поддерживать несколько Subscriber с и размером inputBuffer настроенных на данном этапе становится максимальное количество элементов, которое самый быстрый [[org.reactivestreams.Subscriber]] может опережать самый медленный, прежде чем замедлить обработку вниз из-за противодавления.

Моя проблема в том, что я не знаю, как установить это значение InputBuffer для этого этапа. Самое близкое, что я видел, описано в разделе «Отбрасывание широковещания» от this article, но это, похоже, настаивает на использовании Flow DSL. Я считаю, что я не могу использовать DSL из-за моей потребности постоянно прикреплять новых подписчиков.

В результате моя общая скорость потока сдерживается самым медленным абонентом. Связанный аспект того, что я пытаюсь сделать, связан с тем, чтобы разные подписчики работали на разных потоках (без создания явных участников в качестве подписчиков).

ответ

2

Было бы выглядеть примерно так (для Akka Streams 2.0.1):

Sink.asPublisher(true).addAttributes(Attributes.inputBuffer(initialSize, maxSize))