2013-06-14 4 views
7

Я хотел бы знать, если есть элегантный способ добиться чего-то вроде этого:разделить поток во многих

val l = Stream.from(1) 

val parts = l.some_function(3) //any number 

parts.foreach(println(_)) 

> 1,4,7,10... 
> 2,5,8,11... 
> 3,6,9,12... 

На самом деле мне нужна такая операция на Streams для распараллеливания - для разделения данных между несколькими субъектами, не загружая весь материал в память.

ответ

4

Answe r от Split a scala list into n interleaving lists полностью соответствует условиям, немного изменен в соответствии с потоками:

def round[A](seq: Iterable[A], n: Int) = { 
    (0 until n).map(i => seq.drop(i).sliding(1, n).flatten) 
} 
round(Stream.from(1),3).foreach(i => println(i.take(3).toList)) 
List(1, 4, 7) 
List(2, 5, 8) 
List(3, 6, 9) 
2

Единственное, что я могу думать:

def distribute[T](n: Int)(x: Stream[T]) = (0 until n).map { p => 
    x.zipWithIndex.collect { 
    case (e,i) if i % n == p => e 
    } 
} 

Это своего рода некрасиво, потому что каждый из подпотоков должен полностью пройти через основной поток. Но я не думаю, что вы можете смягчить это, сохраняя (очевидную) неизменность.

Рассматривали ли вы отдельные задачи для участников и наличие «распределителя задач», который выполняет именно это?

+0

Да, я думал об этом. Мне нужно объединить результаты с актерами, и проблема в том, что промежуточные результаты тоже потребляют много памяти, и я хочу, чтобы было несколько актеров и одинаковое количество задач/результатов.Но тем не менее я мог бы модифицировать актеров для повторного использования результатов предыдущих задач, и я буду следовать этому пути, если нет простого способа разделить поток. –

0
scala> (1 to 30 grouped 3).toList.transpose foreach println 
List(1, 4, 7, 10, 13, 16, 19, 22, 25, 28) 
List(2, 5, 8, 11, 14, 17, 20, 23, 26, 29) 
List(3, 6, 9, 12, 15, 18, 21, 24, 27, 30) 
+0

Хорошо ли это работает с Streams? – gzm0

+0

измените 'toList' на' toStream' и найдите его самостоятельно ... – sschaef

+1

'Stream.from (1) .grouped (3) .toStream.transpose foreach println' висит в бесконечном цикле ... – gzm0

2

Простой подход включает в себя создание арифметической последовательности для нужных индексов и последующее сопоставление ее с потоком. Метод применяет вытащит соответствующие значения:

def f[A](s:Stream[A], n:Int) = 
    0 until n map (i => Iterator.iterate(0)(_+n) map (s drop i)) 

f(Stream from 1, 3) map (_ take 4 mkString ",") 
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12) 

Более производительное решение было бы использовать итератор, чей следующий метод просто возвращает значение из потока на следующий индексе в арифметической последовательности:

def comb[A](s:Stream[A], first:Int, step:Int):Iterator[A] = new Iterator { 
    var i  = first - step 
    def hasNext = true 
    def next = { i += step; s(i) } 
} 
def g[A](s:Stream[A], n:Int) = 
    0 until n map (i => comb(s,i,n)) 

g(Stream from 1, 3) map (_ take 4 mkString ",") 
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12) 

Вы упомянули, что это было для актеров, хотя - если это Акка, возможно, вы могли бы использовать round-robin router.

ОБНОВЛЕНИЕ: Вышеприведенное (видимо, неправильно) предполагает, что при работе программы может быть больше работы, поэтому hasNext всегда возвращает true; см. ответ Михаила для версии, которая также работает с конечными потоками.

ОБНОВЛЕНИЕ: Михаил определил, что у this answer to a prior StackOverflow question есть ответ, который работает для конечных и бесконечных потоков (хотя это не похоже на то, что он будет работать как с итератором).

+0

Создание итераторов выглядит хорошо. Единственное, что в вашей реализации hasNext всегда возвращает true - оно обрабатывает только бесконечные коллекции, для обычного случая код будет более сложным. Я использовал актеров из стандартной библиотеки Scala, но кажется, что Akka стоит ее изучить, спасибо. –

+0

См. Также http://stackoverflow.com/questions/11132788/split-a-scala-list-into-n-interleaving-lists?lq=1 для конечного случая. – AmigoNico

+0

Ой! «Скользящая» функция с шагом сделала трюк. Он подходит и для ручьев. Поэтому можно было бы избежать написания пользовательского итератора. –

0

Я не нашел такой функции в библиотеке Scala, поэтому я модифицировал вариант итератора ответа AmigoNico. В коде рассматриваются как конечные, так и бесконечные коллекции.

def splitRoundRobin[A](s: Iterable[A], n: Int) = { 
    def comb[A](s: Iterable[A], first: Int, step: Int): Iterator[A] = new Iterator[A] { 
     val iter = s.iterator 
     var nextElem: Option[A] = iterToNext(first) 
     def iterToNext(elemsToSkip: Int) = { 
     iterToNextRec(None, elemsToSkip) 
     } 
     def iterToNextRec(next: Option[A], repeat: Int): Option[A] = repeat match { 
     case 0 => next 
     case _ => if (iter.hasNext) iterToNextRec(Some(iter.next()), repeat - 1) else None 
     } 
     def hasNext = nextElem.isDefined || { 
     nextElem = iterToNext(step) 
     nextElem.isDefined 
     } 
     def next = { 
     var result = if (nextElem.isDefined) nextElem.get else throw new IllegalStateException("No next") 
     nextElem = None 
     result 
     } 
    } 
    0 until n map (i => comb(s, i, n)) 
    } 

    splitRoundRobin(1 to 12 toStream, 3) map (_.toList.mkString(",")) 
// Vector(3,6,9,12, 1,4,7,10, 2,5,8,11) 

    splitRoundRobin(Stream from 1, 3) map (_.take(4).mkString(",")) 
//> Vector(3,6,9,12, 1,4,7,10, 2,5,8,11) 
0
def roundRobin[T](n: Int, xs: Stream[T]) = { 
    val groups = xs.grouped(n).map(_.toIndexedSeq).toStream 
    (0 until n).map(i => groups.flatMap(_.lift(i))) 
} 

работает в бесконечном случае:

scala> roundRobin(3, Stream.from(0)).map(_.take(3).force.mkString).mkString(" ") 
res6: String = 036 147 258 

с использованием flatMap/lift вместо простого map/apply означает, что он работает, даже если вход конечен и длина не кратна от n:

scala> roundRobin(3, Stream.from(0).take(10)).map(_.mkString).mkString(" ") 
res5: String = 0369 147 258 
Смежные вопросы