2008-09-24 2 views
4

Я пытаюсь запустить процесс и делать материал с его потоками ввода, вывода и ошибок. Очевидный способ сделать это - использовать что-то вроде select(), но единственное, что я могу найти в Java, это Selector.select(), которое принимает Channel. Это не представляется возможным получить Channel от InputStream или OutputStream (FileStream имеет метод getChannel(), но это не помогает здесь)Как обрабатывать несколько потоков в Java?

Итак, вместо этого я написал код, чтобы опрашивать все потоки:

while(!out_eof || !err_eof) 
{ 
    while(out_str.available()) 
    { 
     if((bytes = out_str.read(buf)) != -1) 
     { 
      // Do something with output stream 
     } 
     else 
      out_eof = true; 
    } 
    while(err_str.available()) 
    { 
     if((bytes = err_str.read(buf)) != -1) 
     { 
      // Do something with error stream 
     } 
     else 
      err_eof = true; 
    } 
    sleep(100); 
} 

который работает, за исключением того, что он никогда не заканчивается. Когда один из потоков достигает конца файла, available() возвращает ноль, поэтому read() не вызывается, и мы никогда не получаем возврат -1, который указывает на EOF.

Одним из решений является неблокирующий способ обнаружения EOF. Я не вижу его нигде в документах. Или есть лучший способ сделать то, что я хочу сделать?

Я вижу этот вопрос здесь: link text и, хотя это не совсем то, что я хочу, я, вероятно, могу использовать эту идею, нерестовые отдельные потоки для каждого потока, для конкретной проблемы, у меня сейчас. Но, конечно, это не единственный способ сделать это? Разумеется, должен быть способ чтения из нескольких потоков без использования потока для каждого?

ответ

4

Как вы сказали, решение outlined in this Answer - это традиционный способ чтения как stdout, так и stderr из процесса. Поток для потока - путь, хотя это немного раздражает.

2

Вам действительно нужно идти по пути нереста нити для каждого потока, который вы хотите контролировать. Если ваш случай использования позволяет комбинировать как stdout, так и stderr рассматриваемого процесса, вам нужен только один поток, в противном случае нужны два.

Мне потребовалось некоторое время, чтобы понять это в одном из наших проектов, где мне нужно запустить внешний процесс, выполнить его вывод и что-то сделать с ним, в то же время найти ошибки и завершить процесс, а также может прекратить его, когда пользователь приложения java отменяет операцию.

Я создал довольно простой класс, чтобы инкапсулировать смотреть партию, бег() метод выглядит примерно так:

public void run() { 
    BufferedReader tStreamReader = null; 
    try { 
     while (externalCommand == null && !shouldHalt) { 
      logger.warning("ExtProcMonitor(" 
          + (watchStdErr ? "err" : "out") 
          + ") Sleeping until external command is found"); 
      Thread.sleep(500); 
     } 
     if (externalCommand == null) { 
      return; 
     } 
     tStreamReader = 
       new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream() 
         : externalCommand.getInputStream())); 
     String tLine; 
     while ((tLine = tStreamReader.readLine()) != null) { 
      logger.severe(tLine); 
      if (filter != null) { 
       if (filter.matches(tLine)) { 
        informFilterListeners(tLine); 
        return; 
       } 
      } 
     } 
    } catch (IOException e) { 
     logger.logExceptionMessage(e, "IOException stderr"); 
    } catch (InterruptedException e) { 
     logger.logExceptionMessage(e, "InterruptedException waiting for external process"); 
    } finally { 
     if (tStreamReader != null) { 
      try { 
       tStreamReader.close(); 
      } catch (IOException e) { 
       // ignore 
      } 
     } 
    } 
} 

На вызывающей стороне это выглядит следующим образом:

Thread tExtMonitorThread = new Thread(new Runnable() { 

     public void run() { 
      try { 
       while (externalCommand == null) { 
        getLogger().warning("Monitor: Sleeping until external command is found"); 
        Thread.sleep(500); 
        if (isStopRequested()) { 
         getLogger() 
           .warning("Terminating external process on user request"); 
         if (externalCommand != null) { 
          externalCommand.destroy(); 
         } 
         return; 
        } 
       } 
       int tReturnCode = externalCommand.waitFor(); 
       getLogger().warning("External command exited with code " + tReturnCode); 
      } catch (InterruptedException e) { 
       getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit"); 
      } 
     } 
    }, "ExtCommandWaiter"); 

    ExternalProcessOutputHandlerThread tExtErrThread = 
      new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true); 
    ExternalProcessOutputHandlerThread tExtOutThread = 
      new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true); 
    tExtMonitorThread.start(); 
    tExtOutThread.start(); 
    tExtErrThread.start(); 
    tExtErrThread.setFilter(new FilterFunctor() { 

     public boolean matches(Object o) { 
      String tLine = (String)o; 
      return tLine.indexOf("Error") > -1; 
     } 
    }); 

    FilterListener tListener = new FilterListener() { 
     private boolean abortFlag = false; 

     public boolean shouldAbort() { 
      return abortFlag; 
     } 

     public void matched(String aLine) { 
      abortFlag = abortFlag || (aLine.indexOf("Error") > -1); 
     } 

    }; 

    tExtErrThread.addFilterListener(tListener); 
    externalCommand = new ProcessBuilder(aCommand).start(); 
    tExtErrThread.setProcess(externalCommand); 
    try { 
     tExtMonitorThread.join(); 
     tExtErrThread.join(); 
     tExtOutThread.join(); 
    } catch (InterruptedException e) { 
     // when this happens try to bring the external process down 
     getLogger().severe("Aborted because auf InterruptedException."); 
     getLogger().severe("Killing external command..."); 
     externalCommand.destroy(); 
     getLogger().severe("External command killed."); 
     externalCommand = null; 
     return -42; 
    } 
    int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue(); 

    externalCommand = null; 
    try { 
     getLogger().warning("command exit code: " + tRetVal); 
    } catch (IllegalThreadStateException ex) { 
     getLogger().warning("command exit code: unknown"); 
    } 
    return tRetVal; 

К сожалению, я не обязательно для автономного запускаемого примера, но, возможно, это помогает. Если бы мне пришлось это сделать снова, я бы еще раз посмотрел на использование метода Thread.interrupt() вместо самозапускаемого флажка остановки (ум, чтобы объявить его volatile!), Но я оставляю это в другое время. :)

Смежные вопросы