2014-12-17 4 views
7

Использование scalaz-потока - это возможность разделить/форвард, а затем снова присоединиться к потоку?Разбиение процесса с использованием скаляз-потока на два дочерних потока

В качестве примера, скажем, у меня есть следующая функция

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10) 

val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add) 
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add) 

zip(sumOfEven, sumOfOdd).to(someEffectfulFunction) 

С scalaz-потоком, в данном примере результаты будут, как вы ожидаете - кортеж из чисел от 1 до 10 передается на приемник ,

Однако, если мы заменим streamOfNumbers на что-то, что требует ввода-вывода, оно фактически выполнит операцию ввода-вывода дважды.

Использование Topic Я могу создать паб/подпроцесс, который правильно дублирует элементы в потоке, однако он не буферизует - он просто максимально быстро загружает весь источник, независимо от того, как его поглощают.

Я могу обернуть это в ограниченную очередь, однако конечный результат кажется намного более сложным, чем это должно быть.

Есть ли более простой способ разделения потока в потоке scalaz без дублирования операций ввода-вывода из источника?

ответ

6

Также уточнить предыдущий ответ с требованием «расщепления». Решение вашей конкретной проблемы может быть без необходимости разделения потоков:

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10) 
val oddOrEven: Process[Task,Int\/Int] = streamOfNumbers.map { 
    case even if even % 2 == 0 => right(even) 
    case odd => left(odd) 
} 
val summed = oddOrEven.pipeW(sump1).pipeO(sump1) 

val evenSink: Sink[Task,Int] = ??? 
val oddSink: Sink[Task,Int] = ??? 

summed 
.drainW(evenSink) 
.to(oddSink) 
+0

Я вижу в вашем ответе общий комбинатор: 'def partition [A] (p: A => Boolean): Process1 [A, A \/A] = process1.lift (a => if (p (a)) right (a) else left (a)) '. Должен ли он быть добавлен? –

+0

Фрэнк Я думаю, что мы не видим более эффективного 'partialTee' для этих случаев использования. Но да, ваше предложение кажется мне прекрасным. Я бы хотел увидеть что-то вроде наблюдения [A, B] (pf: PartialFunction [A, B]) (Раковина [T, B]). –

+0

Спасибо. Мой реальный сценарий выглядит довольно сложным, поэтому его нельзя было упростить таким образом, но это дает мне еще несколько потоков, которые могут решить проблему. –

2

Возможно, вы все еще можете использовать тему и просто заверить, что детские процессы будут подписаны, прежде чем вы будете настаивать на теме.

Однако, пожалуйста, обратите внимание, что это решение не имеет никаких ограничений на него, то есть если вы будете слишком быстро нажимать, вы можете столкнуться с ошибкой OOM.

def split[A](source:Process[Task,A]): Process[Task,(Process[Task,A], Proces[Task,A])]] = { 
    val topic = async.topic[A] 

    val sub1 = topic.subscribe 
    val sub2 = topic.subscribe 

    merge.mergeN(Process(emit(sub1->sub2),(source to topic.publish).drain)) 
} 
+0

Для динамического числа разрывов будет возвращать тип '' 'Процесс [Задача, Массив [Процесс [Задача, A]]]' '' имеет смысл? Кроме того, как я могу изменить это, чтобы использовать '' 'Queue''', а не' '' Topic''' и предоставлять уникальные элементы потока для каждого дочернего процесса? Спасибо за ваше время –

0

Мне также нужна эта функциональность. Моя ситуация была довольно сложной, не позволяя мне обойти это таким образом.

Благодаря ответу Даниила Сплекака в this thread, я смог получить следующее, чтобы работать. Я улучшил его решение, добавив onHalt, поэтому мое приложение выйдет после завершения Process.

def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = { 
    val left = async.boundedQueue[A](limit) 
    val right = async.boundedQueue[A](limit) 

    val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause => 
    Process.await(Task.gatherUnordered(Seq(left.close, right.close))){ _ => Halt(cause) } 
    } 
    val dequeue = Process((left.dequeue, right.dequeue)) 

    enqueue merge dequeue 
} 
Смежные вопросы