2015-07-31 2 views
12

Я пытаюсь отправить JAR с помощью Spark в кластер YARN из Java-кода. Я использую SparkLauncher представить SparkPi пример:Spark Launcher ждет завершения работы бесконечно

Process spark = new SparkLauncher() 
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar") 
    .setMainClass("org.apache.spark.examples.SparkPi") 
    .setMaster("yarn-cluster") 
    .launch(); 
System.out.println("Waiting for finish..."); 
int exitCode = spark.waitFor(); 
System.out.println("Finished! Exit code:" + exitCode); 

Есть две проблемы:

  1. При подаче в режиме «Нить-кластер», заявка успешно представленные на пряжу и выполняется успешно (это видимый в пользовательском интерфейсе YARN, сообщаемый как SUCCESS, и pi печатается на выходе). Однако подающее приложение никогда не уведомляется о завершении обработки - оно бесконечно зависает после печати «Ожидание до завершения ...». Журнал контейнера можно найти here
  2. При отправке в режиме «пряжа-клиент» приложение не отображается в пользовательском интерфейсе YARN, а подающая приложение зависает в «Ожидание до завершения ...». Когда висящий код убит, приложение отображается в пользовательском интерфейсе YARN и сообщается как SUCCESS, но выход пуст (pi не печатается вне). Журнал контейнера можно найти here

Я попытался запустить приложение подавшее заявку как с Oracle Java 7 и 8.

ответ

14

я получил помощь в списке рассылки Спарк. Ключ должен читать/очищать getInputStream и getErrorStream() в процессе. Детский процесс может заполнить буфер и вызвать тупик - см. Oracle docs regarding Process. Потоки должны быть считаны в отдельных потоках:

Process spark = new SparkLauncher() 
    .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6") 
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar") 
    .setMainClass("org.apache.spark.examples.SparkPi").setMaster("yarn-cluster").launch(); 

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input"); 
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input"); 
inputThread.start(); 

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error"); 
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error"); 
errorThread.start(); 

System.out.println("Waiting for finish..."); 
int exitCode = spark.waitFor(); 
System.out.println("Finished! Exit code:" + exitCode); 

где InputStreamReaderRunnable класс:

public class InputStreamReaderRunnable implements Runnable { 

    private BufferedReader reader; 

    private String name; 

    public InputStreamReaderRunnable(InputStream is, String name) { 
     this.reader = new BufferedReader(new InputStreamReader(is)); 
     this.name = name; 
    } 

    public void run() { 
     System.out.println("InputStream " + name + ":"); 
     try { 
      String line = reader.readLine(); 
      while (line != null) { 
       System.out.println(line); 
       line = reader.readLine(); 
      } 
      reader.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
+0

В моем случае у меня была проблема с классом, поэтому искра немедленно вышла. Итак, если кому-то кажется, что он просто не звонит в ваше искровое приложение, этот ответ также работает. – jmmut

7

Поскольку это старый пост, я хотел бы добавить обновление, которое могло бы помочь кому-либо прочитать этот пост после. В искрах 1.6.0 есть некоторые дополнительные функции в классе SparkLauncher. Что:

def startApplication(listeners: <repeated...>[Listener]): SparkAppHandle 

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher

Вы можете запустить приложение из необходимости дополнительных потоков для стандартного вывода и стандартный вывод обработки плюшевого есть хорошие отчеты о состоянии запущенного приложения. Используйте этот код:

val env = Map(
     "HADOOP_CONF_DIR" -> hadoopConfDir, 
     "YARN_CONF_DIR" -> yarnConfDir 
    ) 
    val handler = new SparkLauncher(env.asJava) 
     .setSparkHome(sparkHome) 
     .setAppResource("Jar/location/.jar") 
     .setMainClass("path.to.the.main.class") 
     .setMaster("yarn-client") 
     .setConf("spark.app.id", "AppID if you have one") 
     .setConf("spark.driver.memory", "8g") 
     .setConf("spark.akka.frameSize", "200") 
     .setConf("spark.executor.memory", "2g") 
     .setConf("spark.executor.instances", "32") 
     .setConf("spark.executor.cores", "32") 
     .setConf("spark.default.parallelism", "100") 
     .setConf("spark.driver.allowMultipleContexts","true") 
     .setVerbose(true) 
     .startApplication() 
println(handle.getAppId) 
println(handle.getState) 

Вы можете сохранить завоевание состояния, если искровое приложение пока оно не даст успеха. Информацию о том, как работает Spark Launcher server в версии 1.6.0. см. эту ссылку: https://github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java

+3

Я хотел бы подчеркнуть, что это работает только в клиентском режиме. – msemelman

+0

@msemelman Большое спасибо за это разъяснение, застрял на этом. Как вы узнали этот факт? –

+0

Работает в режиме кластера. Я использую Spark-1.6.1 – Tariq

3

Я реализовал с использованием CountDownLatch, и он работает как ожидается. Это для SparkLauncher версии 2.0.1, и он также работает в режиме с прямыми кластерами.

... 
final CountDownLatch countDownLatch = new CountDownLatch(1); 
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch); 
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener); 
Thread sparkAppListenerThread = new Thread(sparkAppListener); 
sparkAppListenerThread.start(); 
long timeout = 120; 
countDownLatch.await(timeout, TimeUnit.SECONDS);  
    ... 

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable { 
    private static final Log log = LogFactory.getLog(SparkAppListener.class); 
    private final CountDownLatch countDownLatch; 
    public SparkAppListener(CountDownLatch countDownLatch) { 
     this.countDownLatch = countDownLatch; 
    } 
    @Override 
    public void stateChanged(SparkAppHandle handle) { 
     String sparkAppId = handle.getAppId(); 
     State appState = handle.getState(); 
     if (sparkAppId != null) { 
      log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - " 
        + SPARK_STATE_MSG.get(appState)); 
     } else { 
      log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState)); 
     } 
     if (appState != null && appState.isFinal()) { 
      countDownLatch.countDown(); 
     } 
    } 
    @Override 
    public void infoChanged(SparkAppHandle handle) {} 
    @Override 
    public void run() {} 
} 
+0

Это действительно комментарий, а не ответ. Когда вы достигнете 50 [репутации] (// stackoverflow.com/help/whats-reputation), вы сможете [прокомментировать все сообщения] (// stackoverflow.com/privileges/comment). – dorukayhan

Смежные вопросы