2016-02-12 2 views
1

В настоящее время я использую Java (и Yarn Client) для отправки заданий в кластер пряжи (в среде Ubunutu/Linux). При отправке программ на Java все работает отлично. При отправке программы на Python она, похоже, останавливается в состоянии ACCEPTED и, в конечном итоге, выдается ошибка.Проблема Подача приложения Python на пряжу из кода Java

Вот код, который я использую, чтобы представить программу:

import org.apache.spark.SparkConf; 
import org.apache.spark.deploy.yarn.Client; 
import org.apache.spark.deploy.yarn.ClientArguments; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.log4j.Logger; 

/** 
* This class submits a SparkPi to a YARN from a Java client (as opposed 
* to submitting a Spark job from a shell command line using spark-submit). 
* 
* To accomplish submitting a Spark job from a Java client, we use 
* the org.apache.spark.deploy.yarn.Client class described below: 
* 
Usage: org.apache.spark.deploy.yarn.Client [options] 
Options: 
    --jar JAR_PATH   Path to your application's JAR file (required in yarn-cluster mode) 
    --class CLASS_NAME  Name of your application's main class (required) 
    --primary-py-file  A main Python file 
    --arg ARG    Argument to be passed to your application's main class. 
        Multiple invocations are possible, each will be passed in order. 
    --num-executors NUM  Number of executors to start (Default: 2) 
    --executor-cores NUM  Number of cores per executor (Default: 1). 
    --driver-memory MEM  Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb) 
    --driver-cores NUM  Number of cores used by the driver (Default: 1). 
    --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G) 
    --name NAME    The name of your application (Default: Spark) 
    --queue QUEUE   The hadoop queue to use for allocation requests (Default: 'default') 
    --addJars jars   Comma separated list of local jars that want SparkContext.addJar to work with. 
    --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. 
    --files files   Comma separated list of files to be distributed with the job. 
    --archives archives  Comma separated list of archives to be distributed with the job. 

    How to call this program example: 

    export SPARK_HOME="/opt/spark/spark-1.6.0" 
    java -DSPARK_HOME="$SPARK_HOME" org.dataalgorithms.client.SubmitYARNJobFromJava 10 
*/ 
public class SubmitPyYARNJobFromJava { 

    public static void main(String[] args) throws Exception { 
    long startTime = System.currentTimeMillis(); 

    // this is passed to SparkPi program 
    String slices = args[0]; 
    // String slices = "15"; 


    // String SPARK_HOME = System.getProperty("SPARK_HOME"); 
    String SPARK_HOME = "/opt/spark/spark-1.6.0"; 

    // 
    pi(SPARK_HOME, slices); // ... the code being measured ... 
    // 
    long elapsedTime = System.currentTimeMillis() - startTime; 
    } 

    static void pi(String SPARK_HOME, String slices) throws Exception { 
    //  
    String[] args = new String[]{ 
     // application name 
     "--name", 
     "SparkPi-Python", 

     // Python Program 
     "--primary-py-file", 
     SPARK_HOME + "/examples/src/main/python/pi.py", 

     // number of executors 
     "--num-executors", 
     "2", 

     // driver memory 
     "--driver-memory", 
     "512m", 

     // executor memory 
     "--executor-memory", 
     "512m", 

     // executor cores 
     "--executor-cores", 
     "2", 

     // argument 1 to my Spark program 
     "--arg", 
     slices, 

     // argument 2 to my Spark program (helper argument to create a proper JavaSparkContext object) 
     "--arg", 
     "yarn-cluster" 
    }; 

    Configuration config = new Configuration(); 
    // 
    System.setProperty("SPARK_YARN_MODE", "true"); 
    // 
    SparkConf sparkConf = new SparkConf(); 
    ClientArguments clientArgs = new ClientArguments(args, sparkConf); 
    Client client = new Client(clientArgs, config, sparkConf); 

    client.run(); 
    // done! 
    } 
} 

Я вызова кода из командной строки следующим образом:

java -cp *:. SubmitPyYARNJobFromJava 10 

Программа Pi.py является стандартная программа, поставляемая с Spark-1.6.0, созданная для Hadoop-2.6.0.

from __future__ import print_function 
# 
# Licensed to the Apache Software Foundation (ASF) under one or more 
# contributor license agreements. See the NOTICE file distributed with 
# this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0 
# (the "License"); you may not use this file except in compliance with 
# the License. You may obtain a copy of the License at 
# 
# http://www.apache.org/licenses/LICENSE-2.0 
# 
# Unless required by applicable law or agreed to in writing, software 
# distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and 
# limitations under the License. 
# 

import sys 
from random import random 
from operator import add 

from pyspark import SparkContext 


if __name__ == "__main__": 
    """ 
    Usage: pi [partitions] 
    """ 
    sc = SparkContext(appName="PythonPi") 
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 
    n = 100000 * partitions 

    def f(_): 
    x = random() * 2 - 1 
    y = random() * 2 - 1 
    return 1 if x ** 2 + y ** 2 < 1 else 0 

    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add) 
    print("Pi is roughly %f" % (4.0 * count/n)) 

    sc.stop() 

Когда работа подана, похоже, что она будет отправлена ​​правильно. Он добирается до принятого состояния, а затем он останавливается.

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell). 
log4j:WARN Please initialize the log4j system properly. 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
16/02/12 16:15:24 INFO Client: Requesting a new application from cluster with 1 NodeManagers 
16/02/12 16:15:24 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (2048 MB per container) 
16/02/12 16:15:24 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 
16/02/12 16:15:24 INFO Client: Setting up container launch context for our AM 
16/02/12 16:15:24 INFO Client: Setting up the launch environment for our AM container 
16/02/12 16:15:24 INFO Client: Preparing resources for our AM container 
16/02/12 16:15:25 INFO Client: Source and destination file systems are the same. Not copying file:/home/shunley/workspace/rabbitmq_java_rpc/spark-assembly-1.6.0-hadoop2.6.0.jar 
16/02/12 16:15:25 INFO Client: Source and destination file systems are the same. Not copying file:/tmp/spark-7dbbb73f-e5bc-4fc1-a535-02a60cb68b16/__spark_conf__6244658246692860568.zip 
16/02/12 16:15:25 INFO SecurityManager: Changing view acls to: shunley 
16/02/12 16:15:25 INFO SecurityManager: Changing modify acls to: shunley 
16/02/12 16:15:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(shunley); users with modify permissions: Set(shunley) 
16/02/12 16:15:26 INFO Client: Submitting application 8 to ResourceManager 
16/02/12 16:15:26 INFO YarnClientImpl: Submitted application application_1455307995259_0008 
16/02/12 16:15:27 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED) 
16/02/12 16:15:27 INFO Client: 
    client token: N/A 
    diagnostics: N/A 
    ApplicationMaster host: N/A 
    ApplicationMaster RPC port: -1 
    queue: default 
    start time: 1455311726233 
    final status: UNDEFINED 
    tracking URL: http://shunley-VirtualBox:8088/proxy/application_1455307995259_0008/ 
    user: shunley 
16/02/12 16:15:28 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED) 
16/02/12 16:15:29 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED) 
16/02/12 16:15:30 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED) 
16/02/12 16:15:31 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED) 
16/02/12 16:15:32 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED) 
16/02/12 16:15:33 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED) 
16/02/12 16:15:34 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED) 

Тогда наконец FAILS и ошибка, которая дается следующая:

16/02/12 16:43:56 INFO Client: Application report for application_1455307995259_0009 (state: FAILED) 
16/02/12 16:43:56 INFO Client: 
    client token: N/A 
    diagnostics: Application application_1455307995259_0009 failed 2 times due to AM Container for appattempt_1455307995259_0009_000002 exited with exitCode: 10 
For more detailed output, check application tracking page:http://shunley-VirtualBox:8088/proxy/application_1455307995259_0009/Then, click on links to logs of each attempt. 
Diagnostics: Exception from container-launch. 
Container id: container_1455307995259_0009_02_000001 
Exit code: 10 
Stack trace: ExitCodeException exitCode=10: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:538) 
    at org.apache.hadoop.util.Shell.run(Shell.java:455) 
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) 
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211) 
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) 
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 


Container exited with a non-zero exit code 10 
Failing this attempt. Failing the application. 
    ApplicationMaster host: N/A 
    ApplicationMaster RPC port: -1 
    queue: default 
    start time: 1455313224060 
    final status: FAILED 
    tracking URL: http://shunley-VirtualBox:8088/cluster/app/application_1455307995259_0009 
    user: shunley 
Exception in thread "main" org.apache.spark.SparkException: Application application_1455307995259_0009 finished with failed status 
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1029) 
    at SubmitPyYARNJobFromJava.pi(SubmitPyYARNJobFromJava.java:101) 
    at SubmitPyYARNJobFromJava.main(SubmitPyYARNJobFromJava.java:52) 
16/02/12 16:43:56 INFO ShutdownHookManager: Shutdown hook called 
16/02/12 16:43:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-f5f15d4f-7383-4a97-b2ff-5734148d8a29 

Я пытался искать везде на гугле что-нибудь подобное, но безрезультатно. Кто-нибудь видел что-то подобное раньше? Мне нужно иметь возможность отправлять приложения python и java в Yarn через код. Пока что Python - единственный, который не работает. Я могу представить Java и Scala (еще не пробовал R), но Python, который использует наши Ученые Data для машинного обучения, не работает.

Любая помощь или указатели, которые помогут, будут НАСТОЯТЕЛЬНО оценены!

Спасибо.

ответ

1

Аргумент вашего клиента пропустил «--class» и «--py-files».

Для отправки сценариев python класс должен быть «org.apache.spark.deploy.PythonRunner». Кроме того, также необходимо подключить библиотеку pyspark и py4j, чтобы драйвер мог правильно исправить искру.

Таким образом, ваш клиент конфигурации должен выглядеть следующим образом:

String[] args = new String[]{ 
    // application name 
    "--name", 
    "SparkPi-Python", 

    "--class", 
    "org.apache.spark.deploy.PythonRunner", 

    "--py-files", 
    SPARK_HOME + "/python/lib/pyspark.zip,"+ SPARK_HOME +"/python/lib/py4j-0.9-src.zip", 

    // Python Program 
    "--primary-py-file", 
    SPARK_HOME + "/examples/src/main/python/pi.py", 

    // number of executors 
    "--num-executors", 
    "2", 

    // driver memory 
    "--driver-memory", 
    "512m", 

    // executor memory 
    "--executor-memory", 
    "512m", 

    // executor cores 
    "--executor-cores", 
    "2", 

    // argument 1 to my Spark program 
    "--arg", 
    slices, 

    // argument 2 to my Spark program (helper argument to create a proper JavaSparkContext object) 
    "--arg", 
    "yarn-cluster" 
}; 
+0

Спасибо за класс, открывший путь к решению, в котором мы можем обрабатывать все это в python без необходимости java-клиента! – sehunley

0

было что-то всемогущее напечатано в STDERR? check by running yarn log -applicationId appid

oozie теперь поддерживает искровое действие, возможно, вы могли бы попробовать использовать oozie, а не писать свой собственный податель.

+0

Ну, стандартный вывод был в основном один и тот же вывод, что и консоль. Я смотрю на Hue и Livy Server для использования для представления работы. От взгляда на код кажется, что проблема связана с pyspark от моего java-клиента. Он запускает Scala и Java (jars) нормально, поскольку они работают через JVM? Но у python и R есть свои собственные среды, которые нужно вызвать? – sehunley

+0

Ну, в конце концов, я отказался от отправки заданий python YARN из кода Java. Гораздо проще было использовать Livy-Server для подачи заявок RESTful на Python, Java, Scala и R. Scott – sehunley

Смежные вопросы