Использование scalaz-потока - это возможность разделить/форвард, а затем снова присоединиться к потоку?Разбиение процесса с использованием скаляз-потока на два дочерних потока
В качестве примера, скажем, у меня есть следующая функция
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)
zip(sumOfEven, sumOfOdd).to(someEffectfulFunction)
С scalaz-потоком, в данном примере результаты будут, как вы ожидаете - кортеж из чисел от 1 до 10 передается на приемник ,
Однако, если мы заменим streamOfNumbers
на что-то, что требует ввода-вывода, оно фактически выполнит операцию ввода-вывода дважды.
Использование Topic
Я могу создать паб/подпроцесс, который правильно дублирует элементы в потоке, однако он не буферизует - он просто максимально быстро загружает весь источник, независимо от того, как его поглощают.
Я могу обернуть это в ограниченную очередь, однако конечный результат кажется намного более сложным, чем это должно быть.
Есть ли более простой способ разделения потока в потоке scalaz без дублирования операций ввода-вывода из источника?
Я вижу в вашем ответе общий комбинатор: 'def partition [A] (p: A => Boolean): Process1 [A, A \/A] = process1.lift (a => if (p (a)) right (a) else left (a)) '. Должен ли он быть добавлен? –
Фрэнк Я думаю, что мы не видим более эффективного 'partialTee' для этих случаев использования. Но да, ваше предложение кажется мне прекрасным. Я бы хотел увидеть что-то вроде наблюдения [A, B] (pf: PartialFunction [A, B]) (Раковина [T, B]). –
Спасибо. Мой реальный сценарий выглядит довольно сложным, поэтому его нельзя было упростить таким образом, но это дает мне еще несколько потоков, которые могут решить проблему. –