2016-12-28 2 views
6

Я ищу способ легко использовать потоки akka-stream.Элегантный способ повторного использования потоков akka-stream

Я лечить Flow я намерен повторно использовать как функцию, так что я хотел бы сохранить свою подпись, как:

Flow[Input, Output, NotUsed]

Теперь, когда я использую этот поток, я хотел бы быть в состоянии «вызов «этот поток и сохранить результат в сторону для дальнейшей обработки.

Итак, я хочу начать с потока, испускающего [Input], применить мой поток и продолжить с испусканием потока [(Input, Output)].

пример:

val s: Source[Int, NotUsed] = Source(1 to 10) 

val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString) 

val via: Source[(Int, String), NotUsed] = ??? 

Теперь это невозможно прямым путем, поскольку комбинируя поток с .via() даст мне Проточная излучающий только [Output]

val via: Source[String, NotUsed] = s.via(stringIfEven) 

Альтернатива, чтобы сделать мой многоразовый поток испускают [(Input, Output)] но это требует, чтобы каждый поток вводил свой вход через все этапы и делал мой код плохой.

Так что я придумал объединитель, как это:

def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = { 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[In](2)) 
    val zip = b.add(Zip[In, Out]) 

    broadcast.out(0) ~> zip.in0 
    broadcast.out(1) ~> flow ~> zip.in1 

    FlowShape(broadcast.in, zip.out) 
}) 

}

, вещающие вход в поток и, а также в параллельной линии непосредственно -> как к «Zip» где я присоединяю значения к кортежу. Это может быть элегантно применен:

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven)) 

Все замечательно, но когда данный поток делает «фильтр» операцию - это объединитель застревает и прекращает обработку дальнейших событий.

Я предполагаю, что это связано с поведением «Zip», которое требует, чтобы все субпотоки выполняли то же самое - в моем случае одна ветвь передает данный объект напрямую, так что другой подпоток не может игнорировать этот элемент. filter(), а так как это происходит - поток останавливается, потому что Zip ждет нажатия.

Есть ли лучший способ достичь состава потока? Есть ли что-нибудь, что я могу сделать в моем tupledFlow, чтобы получить желаемое поведение, когда «поток» игнорирует элементы с помощью «фильтра»?

+0

Основная проблема концепции является то, что 'Flow [T, U, ...]' не является функцией. Для каждого входного элемента он может возвращать 0, 1 или более выходных элементов. Он может даже удерживать элементы ввода и использовать их только позже, когда доступно больше данных. По этой причине невозможно предоставить эту функцию в общем случае, если обернутый поток не поддерживает ее сам. Он может работать в общих чертах, если строго соблюдается, что завершенный «поток» - это поток «один к одному», который фактически работает как функция (но фильтр тогда не работает). Обычно использование «mapAsync» в таких случаях является более простым способом. – jrudolph

+0

Да, вы правы. Проблема будет, если мой многоразовый поток вернет N элементов. Заявив, что обернутый «поток» может выводить 0 или 1 элемент для каждого 1 элемента ввода, позволит написать другой семантический оператор «Zip», который будет зависеть от ввода только при завершении выводов «Flow» и пропустить все, Поток' не толкает какой-либо элемент. –

+0

Даже это было бы трудно сделать, потому что вытягивание и толкание обернутого потока происходит не синхронно. Вы не можете проверить, является ли «обтекаемый« поток не нажатием какого-либо элемента »- он может быть просто медленным или буферизованным и т. Д. – jrudolph

ответ

3

два возможных подхода - с дискуссионной элегантность - это:

1) избегать использования ступеней фильтрации, мутирует свой фильтр в Flow[Int, Option[Int], NotUsed]. Таким образом, вы можете применить свою оберточную ленту вокруг всего графика, как и ваш первоначальный план. Тем не менее, код выглядит более испорченным, и добавляется накладные расходы, проходя вокруг None с.

val stringIfEvenOrNone = Flow[Int].map{ 
    case x if x % 2 == 0 => Some(x.toString) 
    case _ => None 
} 

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEvenOrNone)).collect{ 
    case (num, Some(str)) => (num,str) 
} 

2) отделяет фильтрацию и трансформирующие стадии, и применять те фильтрации перед вашими проносясь обертками. Вероятно, более легкий и лучший компромисс.

val filterEven = Flow[Int].filter(_ % 2 == 0) 

val toString = Flow[Int].map(_.toString) 

val tupled: Source[(Int, String), NotUsed] = s.via(filterEven).via(tupledFlow(toString)) 

EDIT

3) Проводка другого решения здесь для ясности, как в дискуссии в комментариях.

Эта обтекатель потоков позволяет испускать каждый элемент из заданного потока в сочетании с исходным элементом ввода, который сгенерировал его. Он работает для любого внутреннего потока (излучая 0, 1 или более элементов для каждого входа).

def tupledFlow[In,Out](flow: Flow[In, Out, _]): Flow[In, (In,Out), NotUsed] = 
    Flow[In].flatMapConcat(in => Source.single(in).via(flow).map(out => in -> out)) 
+0

Да, опция 0) является чем-то значительно менее вредным для всех. Однако всегда опасно, когда у вас есть api, который доступен, но * не должен * использоваться при вызове в обертке 'TupledFlow'. Не то, что мы ожидаем от композиционных фрагментов кода. –

+0

Я думаю, что конечным решением является выделенный оператор, который будет вести себя так, как я описал в http://stackoverflow.com/questions/41366030/elegant-way-of-reusing-akka-stream-flows#comment69952575_41366030 –

+0

Согласен, учитывая сильные предположения, это TupledFlow, безусловно, не то, что я бы разделил - например - автономная библиотека. Но это все равно будет иметь смысл в качестве этапа многоразового использования в вашем проекте. –

1

я пришел с реализацией TupledFlow, который работает, когда обернутый Flow использует filter() или mapAsync() и когда обернутый Flow испускает 0,1 или N элементов для каждого входа:

def tupledFlow[In,Out](flow: Flow[In, Out, _])(implicit materializer: Materializer, executionContext: ExecutionContext):Flow[In, (In,Out), NotUsed] = { 
    val v:Flow[In, Seq[(In, Out)], NotUsed] = Flow[In].mapAsync(4) { in: In => 
    val outFuture: Future[Seq[Out]] = Source.single(in).via(flow).runWith(Sink.seq) 
    val bothFuture: Future[Seq[(In,Out)]] = outFuture.map(seqOfOut => seqOfOut.map((in,_))) 
    bothFuture 
    } 
    val onlyDefined: Flow[In, (In, Out), NotUsed] = v.mapConcat[(In, Out)](seq => seq.to[scala.collection.immutable.Iterable]) 
    onlyDefined 
} 

единственным недостатком я см. здесь, что я создаю и материализую поток для одного объекта - просто чтобы получить понятие «вызов потока как функции».

Я не проводил никаких проверок эффективности, однако, поскольку тяжелая работа выполняется в обернутом Flow, который выполнен в будущем, я считаю, что это будет нормально.

Эта реализация проходит все тесты из https://gist.github.com/kretes/8d5f2925de55b2a274148b69f79e55ac#file-tupledflowspec-scala

+2

Если это то, что вам нужно, вы, возможно, сойдете с 'def tupledFlow [In, Out] (поток : Flow [In, Out, _]): Flow [In, (In, Out), NotUsed] = { Поток [In] .flatMapConcat (in => Source.single (in) .via (flow) .map (out => in -> out)) } ' –

+0

Да, я думаю, это то, что мне нужно. Он удовлетворяет каждую потребность, и он будет правильно обрабатывать каждое поведение завернутого потока. –

+0

Ваша реализация краткая и минимальная. Спасибо за это. Думаю, это правильный ответ на мой вопрос –

Смежные вопросы