Problem submitting a Python application to YARN from Java code

I am currently using Java (and the YARN Client) to submit jobs to a YARN cluster (in an Ubuntu/Linux environment). Submitting Java programs works fine. When I submit a Python program, however, it appears to stall in the ACCEPTED state and eventually fails with an error.
Here is the code I use to submit the program:
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
/**
* This class submits a SparkPi to a YARN from a Java client (as opposed
* to submitting a Spark job from a shell command line using spark-submit).
*
* To accomplish submitting a Spark job from a Java client, we use
* the org.apache.spark.deploy.yarn.Client class described below:
*
 *   Usage: org.apache.spark.deploy.yarn.Client [options]
 *   Options:
 *     --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster mode)
 *     --class CLASS_NAME       Name of your application's main class (required)
 *     --primary-py-file        A main Python file
 *     --arg ARG                Argument to be passed to your application's main class.
 *                              Multiple invocations are possible, each will be passed in order.
 *     --num-executors NUM      Number of executors to start (Default: 2)
 *     --executor-cores NUM     Number of cores per executor (Default: 1).
 *     --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
 *     --driver-cores NUM       Number of cores used by the driver (Default: 1).
 *     --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
 *     --name NAME              The name of your application (Default: Spark)
 *     --queue QUEUE            The hadoop queue to use for allocation requests (Default: 'default')
 *     --addJars jars           Comma separated list of local jars that want SparkContext.addJar to work with.
 *     --py-files PY_FILES      Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
 *     --files files            Comma separated list of files to be distributed with the job.
 *     --archives archives      Comma separated list of archives to be distributed with the job.
 *
 * Example of how to call this program:
 *
 *   export SPARK_HOME="/opt/spark/spark-1.6.0"
 *   java -DSPARK_HOME="$SPARK_HOME" org.dataalgorithms.client.SubmitPyYARNJobFromJava 10
 */
public class SubmitPyYARNJobFromJava {

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        // this is passed to the SparkPi program
        String slices = args[0];
        // String slices = "15";
        // String SPARK_HOME = System.getProperty("SPARK_HOME");
        String SPARK_HOME = "/opt/spark/spark-1.6.0";
        //
        pi(SPARK_HOME, slices); // ... the code being measured ...
        //
        long elapsedTime = System.currentTimeMillis() - startTime;
    }

    static void pi(String SPARK_HOME, String slices) throws Exception {
        //
        String[] args = new String[]{
            // application name
            "--name",
            "SparkPi-Python",
            // Python program
            "--primary-py-file",
            SPARK_HOME + "/examples/src/main/python/pi.py",
            // number of executors
            "--num-executors",
            "2",
            // driver memory
            "--driver-memory",
            "512m",
            // executor memory
            "--executor-memory",
            "512m",
            // executor cores
            "--executor-cores",
            "2",
            // argument 1 to my Spark program
            "--arg",
            slices,
            // argument 2 to my Spark program (helper argument to create a proper JavaSparkContext object)
            "--arg",
            "yarn-cluster"
        };

        Configuration config = new Configuration();
        //
        System.setProperty("SPARK_YARN_MODE", "true");
        //
        SparkConf sparkConf = new SparkConf();
        ClientArguments clientArgs = new ClientArguments(args, sparkConf);
        Client client = new Client(clientArgs, config, sparkConf);
        client.run();
        // done!
    }
}
I invoke the code from the command line like this:
java -cp *:. SubmitPyYARNJobFromJava 10
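One difference I noticed when comparing with what spark-submit does internally: in Spark 1.6, for a Python application in yarn-cluster mode, SparkSubmit builds a very similar Client argument list but also inserts `--class org.apache.spark.deploy.PythonRunner` and sets `spark.yarn.isPython=true` on the SparkConf; my argument list above has neither, so that difference may be relevant. A plain-Java sketch of such an argument list (no Spark dependency, untested against a live cluster; the class name `PySparkYarnArgs` is mine):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: mirrors the Client argument list spark-submit builds for a
// Python app in yarn-cluster mode (Spark 1.6). The SPARK_HOME path is the
// same placeholder used above.
public class PySparkYarnArgs {

    public static String[] build(String sparkHome, String slices) {
        List<String> args = new ArrayList<String>();
        args.add("--name");            args.add("SparkPi-Python");
        args.add("--primary-py-file"); args.add(sparkHome + "/examples/src/main/python/pi.py");
        // spark-submit inserts this main class for Python applications
        args.add("--class");           args.add("org.apache.spark.deploy.PythonRunner");
        args.add("--num-executors");   args.add("2");
        args.add("--arg");             args.add(slices);
        return args.toArray(new String[0]);
    }

    public static void main(String[] argv) {
        // Before constructing the Client, spark-submit also does the
        // equivalent of: sparkConf.set("spark.yarn.isPython", "true")
        System.out.println(String.join(" ", build("/opt/spark/spark-1.6.0", "10")));
    }
}
```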
The pi.py program is the standard example shipped with Spark 1.6.0 (built for Hadoop 2.6.0):
from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from random import random
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))
    sc.stop()
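As a local sanity check that pi.py itself is sound, the same Monte Carlo estimate can be run without Spark: points are sampled in the square [-1, 1] x [-1, 1], and the fraction landing inside the unit circle approaches pi/4. A quick Java equivalent (class name `PiEstimate` is mine):

```java
import java.util.Random;

// Local, single-threaded version of the estimator in pi.py.
public class PiEstimate {

    public static double estimate(int n, long seed) {
        Random rnd = new Random(seed);
        int count = 0;
        for (int i = 0; i < n; i++) {
            double x = rnd.nextDouble() * 2 - 1;
            double y = rnd.nextDouble() * 2 - 1;
            // count points inside the unit circle
            if (x * x + y * y < 1) count++;
        }
        // area ratio circle/square = pi/4
        return 4.0 * count / n;
    }

    public static void main(String[] args) {
        System.out.println("Pi is roughly " + estimate(100000, 42L));
    }
}
```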
When the job is submitted, it appears to go in correctly. It reaches the ACCEPTED state and then just sits there:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/02/12 16:15:24 INFO Client: Requesting a new application from cluster with 1 NodeManagers
16/02/12 16:15:24 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (2048 MB per container)
16/02/12 16:15:24 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
16/02/12 16:15:24 INFO Client: Setting up container launch context for our AM
16/02/12 16:15:24 INFO Client: Setting up the launch environment for our AM container
16/02/12 16:15:24 INFO Client: Preparing resources for our AM container
16/02/12 16:15:25 INFO Client: Source and destination file systems are the same. Not copying file:/home/shunley/workspace/rabbitmq_java_rpc/spark-assembly-1.6.0-hadoop2.6.0.jar
16/02/12 16:15:25 INFO Client: Source and destination file systems are the same. Not copying file:/tmp/spark-7dbbb73f-e5bc-4fc1-a535-02a60cb68b16/__spark_conf__6244658246692860568.zip
16/02/12 16:15:25 INFO SecurityManager: Changing view acls to: shunley
16/02/12 16:15:25 INFO SecurityManager: Changing modify acls to: shunley
16/02/12 16:15:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(shunley); users with modify permissions: Set(shunley)
16/02/12 16:15:26 INFO Client: Submitting application 8 to ResourceManager
16/02/12 16:15:26 INFO YarnClientImpl: Submitted application application_1455307995259_0008
16/02/12 16:15:27 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:27 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1455311726233
final status: UNDEFINED
tracking URL: http://shunley-VirtualBox:8088/proxy/application_1455307995259_0008/
user: shunley
16/02/12 16:15:28 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:29 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:30 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:31 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:32 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:33 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:34 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
Then it finally FAILS, and the error it gives is the following:
16/02/12 16:43:56 INFO Client: Application report for application_1455307995259_0009 (state: FAILED)
16/02/12 16:43:56 INFO Client:
client token: N/A
diagnostics: Application application_1455307995259_0009 failed 2 times due to AM Container for appattempt_1455307995259_0009_000002 exited with exitCode: 10
For more detailed output, check application tracking page:http://shunley-VirtualBox:8088/proxy/application_1455307995259_0009/Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1455307995259_0009_02_000001
Exit code: 10
Stack trace: ExitCodeException exitCode=10:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 10
Failing this attempt. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1455313224060
final status: FAILED
tracking URL: http://shunley-VirtualBox:8088/cluster/app/application_1455307995259_0009
user: shunley
Exception in thread "main" org.apache.spark.SparkException: Application application_1455307995259_0009 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1029)
at SubmitPyYARNJobFromJava.pi(SubmitPyYARNJobFromJava.java:101)
at SubmitPyYARNJobFromJava.main(SubmitPyYARNJobFromJava.java:52)
16/02/12 16:43:56 INFO ShutdownHookManager: Shutdown hook called
16/02/12 16:43:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-f5f15d4f-7383-4a97-b2ff-5734148d8a29
I have tried searching all over Google for anything like this, to no avail. Has anyone seen something like this before? I need to be able to submit both Python and Java applications to YARN from code. So far, Python is the only one that doesn't work. I can submit Java and Scala (haven't tried R yet), but Python, which our data scientists use for machine learning, fails.

Any help or pointers would be GREATLY appreciated!

Thanks.
Thanks for the class; it opened the path to a solution where we can handle all of this in Python without needing a Java client! – sehunley