Простой подход включает в себя создание арифметической последовательности для нужных индексов и последующее сопоставление ее с потоком. Метод применяет вытащит соответствующие значения:
def f[A](s:Stream[A], n:Int) =
0 until n map (i => Iterator.iterate(0)(_+n) map (s drop i))
f(Stream from 1, 3) map (_ take 4 mkString ",")
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12)
Более производительное решение было бы использовать итератор, чей следующий метод просто возвращает значение из потока на следующий индексе в арифметической последовательности:
def comb[A](s:Stream[A], first:Int, step:Int):Iterator[A] = new Iterator {
var i = first - step
def hasNext = true
def next = { i += step; s(i) }
}
def g[A](s:Stream[A], n:Int) =
0 until n map (i => comb(s,i,n))
g(Stream from 1, 3) map (_ take 4 mkString ",")
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12)
Вы упомянули, что это было для актеров, хотя - если это Акка, возможно, вы могли бы использовать round-robin router.
ОБНОВЛЕНИЕ: Вышеприведенное (видимо, неправильно) предполагает, что при работе программы может быть больше работы, поэтому hasNext всегда возвращает true; см. ответ Михаила для версии, которая также работает с конечными потоками.
ОБНОВЛЕНИЕ: Михаил определил, что у this answer to a prior StackOverflow question есть ответ, который работает для конечных и бесконечных потоков (хотя это не похоже на то, что он будет работать как с итератором).
Да, я думал об этом. Мне нужно объединить результаты с актерами, и проблема в том, что промежуточные результаты тоже потребляют много памяти, и я хочу, чтобы было несколько актеров и одинаковое количество задач/результатов.Но тем не менее я мог бы модифицировать актеров для повторного использования результатов предыдущих задач, и я буду следовать этому пути, если нет простого способа разделить поток. –