2016-11-03 3 views
2

Я написал простую программу Spark и хочу развернуть ее на распределенные серверы. Это довольно просто:Spark: Плохая производительность на распределенной системе. Как улучшить>

получить данные-> упорядочить данные-> данные поезда-> повторно подать заявку на обучение.

Входные данные - это всего лишь 10 тыс. Строк, с тремя функциями. Сначала я запускал свою локальную машину, используя «local [*]». Он длится около 3 минут. Теперь, когда я развертываю кластер, он работает очень медленно: полчаса не закончено. На стадии обучения он становится очень медленным.

Мне любопытно, если я сделал что-то не так. Пожалуйста, помогите мне проверить. Я использую Spark 1.6.1.

Я утверждаю:

spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 orderprediction_2.11-1.0.jar --driver-cores 1 --driver-memory 4g --executor-cores 8 --executor-memory 4g 

Код здесь:

def main(args: Array[String]) { 
    // Set the log level to only print errors 
    Logger.getLogger("org").setLevel(Level.ERROR) 

    val conf = new SparkConf() 
     .setAppName("My Prediction") 
     //.setMaster("local[*]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val data = sqlContext.read 
     .option("header","true") 
     .option("delimiter", "\t") 
     .format("com.databricks.spark.csv") 
     .option("inferSchema","true") 
     .load("mydata.txt") 

    data.printSchema() 
    data.show() 

    val dataDF = data.toDF().filter("clicks >=10") 
    dataDF.show() 

    val assembler = new VectorAssembler() 
     .setInputCols(Array("feature1", "feature2", "feature3")) 
     .setOutputCol("features") 

    val trainset = assembler.transform(dataDF).select("target", "features") 
    trainset.printSchema() 
    val trainset2 = trainset.withColumnRenamed("target", "label") 

    trainset2.printSchema() 
    val trainset3 = trainset2.withColumn("label", trainset2.col("label").cast(DataTypes.DoubleType)) 
    trainset3.cache() // cache data into memory 
    trainset3.printSchema() 
    trainset3.show() 

    // Train a RandomForest model. 

    println("training Random Forest") 

    val rf = new RandomForestRegressor() 
     .setLabelCol("label") 
     .setFeaturesCol("features") 
     .setNumTrees(1000) 

    val rfmodel = rf.fit(trainset3) 

    println("prediction") 
    val result = rfmodel.transform(trainset3) 

    result.show() 
} 

Update: После расследования, я обнаружил, что застряла в

collectAsMap at RandomForest.scala:525 

Это уже 1,1 часа провели на этой линии еще не закончена. Данные, я считаю, всего несколько мегабайт.

+0

Сколько количества исполнителей, которые вы используете в кластере? Вы пытались увеличить память исполнителей или посмотрели ли вы на этапы, где в кластере больше времени? –

+0

--driver-core 1 --driver-memory 4g -executor-core 8 --executor-memory 4g. Фактически, файл составляет всего 180 М, поэтому память должна быть более чем достаточно. – lserlohn

+0

Это было на этапе обучения. В обучении дерева решений – lserlohn

ответ

0

Вы строите RandomForest из 1000 RandomTrees, которые будут обучать 1000 экземпляров.

В коде collectAsMap первое действие, а все остальные - преобразования (оцениваются лениво). Поэтому, пока вы видите это , висит в этой строке, потому что теперь оцениваются все maps, flatMaps, filters, groupBy, и т. Д.

+0

, но почему это так много времени? – lserlohn

Смежные вопросы