2013-11-24 2 views
0

Я написал следующий Scala-код для вычисления расстояния матрицы:Scala вложенными для перебрать Streams

def dist(fasta: Stream[FastaRecord], f: (FastaRecord, FastaRecord) => Int) = { 
    val inF = fasta.par 
    for (i <- inF; j <- inF) 
    yield (f(i, j)) 
} 

Этот код прекрасно работает в том смысле, что я получаю отличную параллельность. К сожалению, я выполняю в два раза больше работы, чем мне нужно, поскольку f (i, j) совпадает с f (j, i). То, что я хочу сделать, - запустить j в i + 1 в потоке. Я могу сделать это с помощью индексов:

for (i <- 0 until inF.length - 1; j <- i+1 until inF.length) 
    yield(f(inF(i), inF(j))) 

Однако просить inF.length я слышал не хорошо на поток, и это не дает мне параллельность.

Я думаю, что должен быть способ сделать эту итерацию, однако я еще ничего не придумал.

спасибо! джим

+0

Streams кешировать их результаты, как в [этом примере расчетов фибоначчи] (http://www.derekwyatt.org/2011/07/29/understanding-scala-streams-through-fibonacci/). Таким образом, после того, как первый раз «j» пересекает весь поток, он должен быть таким же быстрым, как обычный список. Смысл я думаю, что вам лучше оценить ваш поток с длиной 'в начале, а затем сделать только половину числа параллельных вычислений с вашей функцией' f'. Я комментирую это, чтобы вы понимали производительность Streams __after__, они были повторены один раз. –

+1

Я думаю, что большая проблема здесь в том, что потоки не оптимизированы для случайного доступа, поэтому операции 'inF (i)' и 'inF (j)' будут медленными. – DaoWen

ответ

1

Я думаю, что с помощью zipWithIndex могли бы получить то, что вы ищете:

def dist(fasta: Stream[FastaRecord], f: (FastaRecord, FastaRecord) => Int) = { 
    val inF = fasta.zipWithIndex.par 
    for ((x, i) <- inF; (y, j) <- inF; if i <= j) 
    yield f(x, y) 
} 

Путем фильтрации i <= j вы можете устранить повторяющиеся (зеркальные) случаев. Тем не менее, я получаю предупреждение, когда я компилирую это:

warning: `withFilter' method does not yet exist on scala.collection.parallel.immutable.ParSeq[(FastaRecord, Int)], using `filter' method instead 

Я не думаю, что это действительно проблема, но я не знаю, как подавить ошибку ...

Смежные вопросы