2014-11-18 2 views
7

У меня есть простая программа:Объединение входных процессов scalaz-поток, кажется, «ждать» на стандартный ввод

import scalaz._ 
import stream._ 

object Play extends App { 
    val in1 = io.linesR("C:/tmp/as.txt") 
    val in2 = io.linesR("C:/tmp/bs.txt") 

    val p = (in1 merge in2) to io.stdOutLines 
    p.run.run 
} 

Файл as.txt содержит пять a с и файл bs.txt содержит 3 b с. Я вижу этот вид продукции:

a 
b 
b 
a 
a 
b 
a 
a 
a 

Однако, когда я изменить объявление о in2 следующим образом:

val in2 = io.stdInLines 

Тогда я получаю то, что я думаю, что это неожиданное поведение. Согласно документации 1, программа должна извлекать данные не определенно из каждого потока в зависимости от того, какой поток быстрее загружает материал. Это должно означать, что я вижу, что на консоль сразу же печатается куча a, но это совсем не то, что происходит вообще.

Действительно, пока я не нажимаю ENTER, ничего не происходит. Совершенно очевидно, что поведение очень похоже на то, что я ожидал бы, если бы я случайно выбрал поток для получения следующего элемента, а затем, если бы этот поток блокировался, объединенные блоки процессов тоже (даже если другой поток содержит данные).

Что происходит?

1 - ну, хорошо, там очень мало документации, но Dan Spiewak очень четко в his talk сказал, что было бы захватить тот, кто был первым, чтобы предоставить данные

ответ

6

Проблема заключается в осуществлении stdInLines. Он блокирует, никогда не Task.fork s нить.

Попробуйте изменить implentation из stdInLines этого:

def stdInLines: Process[Task,String] = 
    Process.repeatEval(Task.apply { 
    Option(scala.Console.readLine()) 
    .getOrElse(throw Cause.Terminated(Cause.End)) 
}) 

оригинальный io.stdInLines работает в readLine() в том же потоке, так что всегда ждет там до тех пор, пока что-то типа.

Смежные вопросы