Я написал простую программу Spark и хочу развернуть ее на распределенные серверы. Это довольно просто:Spark: Плохая производительность на распределенной системе. Как улучшить>
получить данные-> упорядочить данные-> данные поезда-> повторно подать заявку на обучение.
Входные данные - это всего лишь 10 тыс. Строк, с тремя функциями. Сначала я запускал свою локальную машину, используя «local [*]». Он длится около 3 минут. Теперь, когда я развертываю кластер, он работает очень медленно: полчаса не закончено. На стадии обучения он становится очень медленным.
Мне любопытно, если я сделал что-то не так. Пожалуйста, помогите мне проверить. Я использую Spark 1.6.1.
Я утверждаю:
spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 orderprediction_2.11-1.0.jar --driver-cores 1 --driver-memory 4g --executor-cores 8 --executor-memory 4g
Код здесь:
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("My Prediction")
//.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val data = sqlContext.read
.option("header","true")
.option("delimiter", "\t")
.format("com.databricks.spark.csv")
.option("inferSchema","true")
.load("mydata.txt")
data.printSchema()
data.show()
val dataDF = data.toDF().filter("clicks >=10")
dataDF.show()
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val trainset = assembler.transform(dataDF).select("target", "features")
trainset.printSchema()
val trainset2 = trainset.withColumnRenamed("target", "label")
trainset2.printSchema()
val trainset3 = trainset2.withColumn("label", trainset2.col("label").cast(DataTypes.DoubleType))
trainset3.cache() // cache data into memory
trainset3.printSchema()
trainset3.show()
// Train a RandomForest model.
println("training Random Forest")
val rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(1000)
val rfmodel = rf.fit(trainset3)
println("prediction")
val result = rfmodel.transform(trainset3)
result.show()
}
Update: После расследования, я обнаружил, что застряла в
collectAsMap at RandomForest.scala:525
Это уже 1,1 часа провели на этой линии еще не закончена. Данные, я считаю, всего несколько мегабайт.
Сколько количества исполнителей, которые вы используете в кластере? Вы пытались увеличить память исполнителей или посмотрели ли вы на этапы, где в кластере больше времени? –
--driver-core 1 --driver-memory 4g -executor-core 8 --executor-memory 4g. Фактически, файл составляет всего 180 М, поэтому память должна быть более чем достаточно. – lserlohn
Это было на этапе обучения. В обучении дерева решений – lserlohn