Я ищу способ легко использовать потоки akka-stream.Элегантный способ повторного использования потоков akka-stream
Я лечить Flow я намерен повторно использовать как функцию, так что я хотел бы сохранить свою подпись, как:
Flow[Input, Output, NotUsed]
Теперь, когда я использую этот поток, я хотел бы быть в состоянии «вызов «этот поток и сохранить результат в сторону для дальнейшей обработки.
Итак, я хочу начать с потока, испускающего [Input]
, применить мой поток и продолжить с испусканием потока [(Input, Output)]
.
пример:
val s: Source[Int, NotUsed] = Source(1 to 10)
val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
val via: Source[(Int, String), NotUsed] = ???
Теперь это невозможно прямым путем, поскольку комбинируя поток с .via()
даст мне Проточная излучающий только [Output]
val via: Source[String, NotUsed] = s.via(stringIfEven)
Альтернатива, чтобы сделать мой многоразовый поток испускают [(Input, Output)]
но это требует, чтобы каждый поток вводил свой вход через все этапы и делал мой код плохой.
Так что я придумал объединитель, как это:
def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[In](2))
val zip = b.add(Zip[In, Out])
broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> flow ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
}
, вещающие вход в поток и, а также в параллельной линии непосредственно -> как к «Zip» где я присоединяю значения к кортежу. Это может быть элегантно применен:
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))
Все замечательно, но когда данный поток делает «фильтр» операцию - это объединитель застревает и прекращает обработку дальнейших событий.
Я предполагаю, что это связано с поведением «Zip», которое требует, чтобы все субпотоки выполняли то же самое - в моем случае одна ветвь передает данный объект напрямую, так что другой подпоток не может игнорировать этот элемент. filter(), а так как это происходит - поток останавливается, потому что Zip ждет нажатия.
Есть ли лучший способ достичь состава потока? Есть ли что-нибудь, что я могу сделать в моем tupledFlow, чтобы получить желаемое поведение, когда «поток» игнорирует элементы с помощью «фильтра»?
Основная проблема концепции является то, что 'Flow [T, U, ...]' не является функцией. Для каждого входного элемента он может возвращать 0, 1 или более выходных элементов. Он может даже удерживать элементы ввода и использовать их только позже, когда доступно больше данных. По этой причине невозможно предоставить эту функцию в общем случае, если обернутый поток не поддерживает ее сам. Он может работать в общих чертах, если строго соблюдается, что завершенный «поток» - это поток «один к одному», который фактически работает как функция (но фильтр тогда не работает). Обычно использование «mapAsync» в таких случаях является более простым способом. – jrudolph
Да, вы правы. Проблема будет, если мой многоразовый поток вернет N элементов. Заявив, что обернутый «поток» может выводить 0 или 1 элемент для каждого 1 элемента ввода, позволит написать другой семантический оператор «Zip», который будет зависеть от ввода только при завершении выводов «Flow» и пропустить все, Поток' не толкает какой-либо элемент. –
Даже это было бы трудно сделать, потому что вытягивание и толкание обернутого потока происходит не синхронно. Вы не можете проверить, является ли «обтекаемый« поток не нажатием какого-либо элемента »- он может быть просто медленным или буферизованным и т. Д. – jrudolph