2015-01-22 4 views
2

У меня есть Iterable [String], и я хочу передать это внешнему процессу и вернуть Iterable [String] для вывода.Поток ввода для внешнего процесса в Scala

Я чувствую, как это должно работать, как он компилирует

import scala.sys.process._ 

object PipeUtils { 
    implicit class IteratorStream(s: TraversableOnce[String]) { 
    def pipe(cmd: String) = s.toStream.#>(cmd).lines 
    def run(cmd: String) = s.toStream.#>(cmd).! 
    } 
} 

Однако Scala пытается выполнить содержимое с вместо передать их в стандарт. Может кто-нибудь пожалуйста, скажите мне, что я делаю неправильно ?

UPDATE:

Я думаю, что моя первоначальная проблема заключалась в том, что s.toStream был быть неявно преобразован в ProcessBuilder, а затем выполняется. Это неверно, так как это вход для процесса.

У меня появилось следующее решение. Это кажется очень взломанным и неправильным, но, похоже, сейчас работает. Я не пишу это как ответ, потому что я чувствую, что ответ должен быть одной строкой, а не такой гигантской.

object PipeUtils { 

    /** 
    * This class feels wrong. I think that for the pipe command it actually loads all of the output 
    * into memory. This could blow up the machine if used wrong, however, I cannot figure out how to get it to 
    * work properly. Hopefully http://stackoverflow.com/questions/28095469/stream-input-to-external-process-in-scala 
    * will get some good responses. 
    * @param s 
    */ 
    implicit class IteratorStream(s: TraversableOnce[String]) { 

    val in = (in: OutputStream) => { 
     s.foreach(x => in.write((x + "\n").getBytes)) 
     in.close 
    } 

    def pipe(cmd: String) = { 
     val output = ListBuffer[String]() 
     val io = new ProcessIO(in, 
     out => {Source.fromInputStream(out).getLines.foreach(output += _)}, 
     err => {Source.fromInputStream(err).getLines.foreach(println)}) 

     cmd.run(io).exitValue 
     output.toIterable 
    } 

    def run(cmd: String) = { 
     cmd.run(BasicIO.standard(in)).exitValue 
    } 
    } 
} 

EDIT

Мотивация это происходит от использования функции .pipe искры на РДУ. Я хочу, чтобы эта точно такая же функциональность в моем локальном коде.

+0

Вы правы неявного преобразования из 's.toStream' к ProcessBuilder. В любом случае, не будет 'def pipe (cmd: String): Stream [String] = (cmd +: s.toSeq) .lineStream' тоже работает или я что-то упускаю? – edi

+0

Как это работает для бесконечного потока в качестве входных данных? Или в случае реального мира очень большой поток, который слишком велик, чтобы вписаться в Seq? – Jon

+0

Хорошо, мне было непонятно, что ваш вход потенциально может быть очень большим. – edi

ответ

3

Вот решение, демонстрирующее, как написать код процесса, чтобы он передавал как входные, так и выходные данные. Ключ должен создать java.io.PipedInputStream, который передается на вход процесса. Этот поток асинхронно заполняется из итератора с помощью java.io.PipedOutputStream. Очевидно, не стесняйтесь изменять тип ввода неявного класса на Iterable.

Вот итератор, используемый для показа этих работ.

/** 
* An iterator with pauses used to illustrate data streaming to the process to be run. 
*/ 
class PausingIterator[A](zero: A, until: A, pauseMs: Int)(subsequent: A => A) 
extends Iterator[A] { 
    private[this] var current = zero 
    def hasNext = current != until 
    def next(): A = { 
    if (!hasNext) throw new NoSuchElementException 
    val r = current 
    current = subsequent(current) 
    Thread.sleep(pauseMs) 
    r 
    } 
} 

Вот фактический код, который вы хотите

import java.io.PipedOutputStream 
import java.io.PipedInputStream 
import java.io.InputStream 
import java.io.PrintWriter 

// For process stuff 
import scala.sys.process._ 
import scala.language.postfixOps 

// For asynchronous stream writing. 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 

/** 
* A streaming version of the original class. This does not block to wait for the entire 
* input or output to be constructed. This allows the process to get data ASAP and allows 
* the process to return information back to the scala environment ASAP. 
* 
* NOTE: Don't forget about error handling in the final production code. 
*/ 
implicit class X(it: Iterator[String]) { 
    def pipe(cmd: String) = cmd #< iter2is(it) lineStream 

    /** 
    * Convert an iterator to an InputStream for use in the pipe function. 
    * @param it an iterator to convert 
    */ 
    private[this] def iter2is[A](it: Iterator[A]): InputStream = { 
    // What is written to the output stream will appear in the input stream. 
    val pos = new PipedOutputStream 
    val pis = new PipedInputStream(pos) 
    val w = new PrintWriter(pos, true) 

    // Scala 2.11 (scala 2.10, use 'future'). Executes asynchrously. 
    // Fill the stream, then close. 
    Future { 
     it foreach w.println 
     w.close 
    } 

    // Return possibly before pis is fully written to. 
    pis 
    } 
} 

Окончательного вызов будет показывать дисплей от 0 до 9 и пауз в течение 3 секунд между отображением каждого номера (второй паузой на стороне лестницы, 1 секунда на стороне сценария оболочки).

// echo-sleep.sh is the same script as in my previous post 
new PausingIterator(0, 10, 2000)(_ + 1) 
    .map(_.toString) 
    .pipe("echo-sleep.sh") 
    .foreach(println) 

Выход

0   [ pause 3 secs ] 
1   [ pause 3 secs ] 
... 
8   [ pause 3 secs ] 
9   [ pause 3 secs ] 
+0

Спасибо @deaktator, это сработало отлично. – Jon

+0

Следуя этому примеру, нам удалось сделать потоковое IO очень эффективным. Ключ должен читать из другого потока, так как lineStream в противном случае является блокирующим вызовом –

4

Предполагая, что scala 2.11+, вы должны использовать lineStream, как предложено компанией @edi. Причина в том, что вы получаете потоковый ответ, поскольку он становится доступным вместо пакетного ответа. Скажем, у меня есть сценарий оболочки echo-sleep.sh:

#/usr/bin/env bash 
# echo-sleep.sh 
while read line; do echo $line; sleep 1; done 

и мы хотим вызвать его из Скале, используя код, как в следующем:

import scala.sys.process._ 
import scala.language.postfixOps 
import java.io.ByteArrayInputStream 

implicit class X(in: TraversableOnce[String]) { 
    // Don't do the BAOS construction in real code. Just for illustration. 
    def pipe(cmd: String) = 
    cmd #< new ByteArrayInputStream(in.mkString("\n").getBytes) lineStream 
} 

Тогда, если мы делаем окончательный вызов, как:

1 to 10 map (_.toString) pipe "echo-sleep.sh" foreach println 

число в последовательности отображается в STDOUT каждые 1 секунду. Если вы буферизируете и конвертируете в Iterable, как в вашем примере, вы потеряете эту отзывчивость.

+0

Как это будет работать, когда вход очень большой? Это потоковый вход? – Jon

+0

Кроме того, я знаю, вы предложили не использовать BOAS для реального, так как бы вы сделали это по-настоящему? – Jon

Смежные вопросы