Я использую потоки Akka в контексте, где раковины для одного источника приходят и уходят. По этой причине я создаю издатель из источника и прикрепляя абонент как потребность возникает:Как установить inputBuffer издателя потока akka?
val publisher= mySource.runWith(Sink.publisher(true))
с
publisher.subscribe(subscriber1)// There will be others
Некоторые абонентов будет быстрее, чем другие, и я хотел бы позволяют быстрее работать независимо от самого медленного, по крайней мере, до уровня, разрешенного входным буфером издателя. Этот буфер описывается комментарием на Sink.publisher (истинного) метода:
Если
fanout
являетсяtrue
, материализованнаяPublisher
будет поддерживать несколькоSubscriber
с и размеромinputBuffer
настроенных на данном этапе становится максимальное количество элементов, которое самый быстрый [[org.reactivestreams.Subscriber]] может опережать самый медленный, прежде чем замедлить обработку вниз из-за противодавления.
Моя проблема в том, что я не знаю, как установить это значение InputBuffer для этого этапа. Самое близкое, что я видел, описано в разделе «Отбрасывание широковещания» от this article, но это, похоже, настаивает на использовании Flow DSL. Я считаю, что я не могу использовать DSL из-за моей потребности постоянно прикреплять новых подписчиков.
В результате моя общая скорость потока сдерживается самым медленным абонентом. Связанный аспект того, что я пытаюсь сделать, связан с тем, чтобы разные подписчики работали на разных потоках (без создания явных участников в качестве подписчиков).