У меня есть случай, когда sink
(или промежуточный flow
) может фактически производить некоторые побочные эффекты, которые необходимо оттолкнуть (или добавить) в Source
. Есть ли способ сделать это с использованием потокового DSL? Я мог бы использовать некоторую блокирующую очередь или ее сортировку для создания source
, а затем передавать данные непосредственно в эту очередь, однако это то, что нарушает абстракции потоков. Возможно, есть лучшее решение, о котором я не знаю?Акка поток - подключить раковину к источнику?
ответ
Как сказал Виктор, вы можете использовать круговые графы.
Например, этап partition
позволяет вам выбрать определенные элементы вашего потока.
def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.foreach[Int](println)
val addOne = Flow[Int].map(_ + 1)
val partition = builder.add(Partition[Int](2, partitionFunction))
val merge = builder.add(Merge[Int](2))
in ~> merge ~> partition
partition.out(0) ~> addOne ~> merge
partition.out(1) ~> out
ClosedShape
})
В этом примере источник in
подключен к одному входу merge
. Затем целые числа проходят через этап partition
, который будет разделяться четным и нечетным.
Даже проходят через поток addOne
, а затем на второй вход merge
(который вернет их обратно на этап partition
).
Нечетные непосредственно идут к раковине out
.
Это позволяет вернуть некоторые значения обратно в график, но это может легко привести к циклу (поэтому здесь важна стадия addOne
, без нее четные числа были бы захвачены на графике).
Reactive-kafka сделал что-то вроде этого (в версии 0.8 по крайней мере): он передает сообщения, потребляемые Раковиной обратно в источник (потребитель Кафки).
KafkaCommitterSink - это реализация. На самом деле это не круговой график, он скорее «обновляет» источник независимо от потока, насколько я понимаю.
- 1. Как подключить представление таблицы к сортируемому источнику данных
- 2. Акка-поток и обработка делегирования актеру
- 3. Как подключить буферизованный входной поток к строке?
- 4. Android подключить к MJPEG поток - Отказано Ошибка
- 5. C++ подключить выходной поток к потоку ввода
- 6. Акка поток для нескольких запросов http
- 7. Как "contramap" Акка-потоки Раковина
- 8. IRandomAccessStreamReference к источнику изображения
- 9. Запуск перезагрузки Акка Акка
- 10. Проверка акка Акка
- 11. Акка Акка не инициализируется
- 12. Акка Акка - разделяющая ответственность
- 13. Акка Акка, Фьючерсы и затворы
- 14. Binding WPF DatagridCell к источнику
- 15. Подключение к источнику данных DDE
- 16. WPF привязка к источнику предка
- 17. PyCharm Перейти к источнику Python
- 18. Подключиться к другому источнику данных
- 19. AspxCombox привязка к источнику данных
- 20. GPS, привязанный к одному источнику
- 21. правильный путь к источнику js
- 22. Назначают Uri к источнику изображения
- 23. Акка HTTP-поток слушатель прекращает обработку databytes через некоторое время
- 24. Получение разницы в самооценке акка Акка
- 25. Акка акка всегда в ожидании будущего
- 26. Акка Акка, возвращающая разные типы вывода
- 27. Как подключить входной поток к потоку вывода в Java?
- 28. Акка-поток, действие триггера, когда я получил несколько предметов?
- 29. Понимание резьбы аккеров Акка
- 30. Акка Акка, не получающая Массив [Byte]?
Если «слив» производит выход, то это не раковина, а скорее поток. –
@ViktorKlang в порядке, поэтому я могу связать «поток» с его «источником» условно, поэтому при определенных обстоятельствах событие, испущенное этим «потоком», пройдет через корень Графа, поскольку оно испускается графом 'Source'? – jdevelop
Да, вы используете GraphDSL и включаете круговые графы. Имейте в виду, что круговые графы с пониженным давлением требуют немного глубокого мышления, чтобы получить право. –