2014-09-26 5 views
3

Я новичок в Spark, а также в SparkR. Я успешно установил Spark и SparkR.Как построить модель логистической регрессии в SparkR

Когда я попытался построить модель логистической регрессии с R и Spark над файлом CSV, хранящимся в HDFS, я получил ошибку «неправильное количество измерений».

Мой код:

points <- cache(lapplyPartition(textFile(sc, "hdfs://localhost:54310/Henry/data.csv"), readPartition)) 

collect(points) 

w <- runif(n=D, min = -1, max = 1) 

cat("Initial w: ", w, "\n") 

# Compute logistic regression gradient for a matrix of data points 
gradient <- function(partition) { 
    partition = partition[[1]] 
    Y <- partition[, 1] # point labels (first column of input file) 
    X <- partition[, -1] # point coordinates 
    # For each point (x, y), compute gradient function 

    dot <- X %*% w 
    logit <- 1/(1 + exp(-Y * dot)) 
    grad <- t(X) %*% ((logit - 1) * Y) 
    list(grad) 
} 


for (i in 1:iterations) { 
    cat("On iteration ", i, "\n") 
    w <- w - reduce(lapplyPartition(points, gradient), "+") 
} 

Сообщение об ошибке является:

On iteration 1 
Error in partition[, 1] : incorrect number of dimensions 
Calls: do.call ... func -> FUN -> FUN -> Reduce -> <Anonymous> -> FUN -> FUN 
Execution halted 
14/09/27 01:38:13 ERROR Executor: Exception in task 0.0 in stage 181.0 (TID 189) 
java.lang.NullPointerException 
    at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:54) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:701) 
14/09/27 01:38:13 WARN TaskSetManager: Lost task 0.0 in stage 181.0 (TID 189, localhost): java.lang.NullPointerException: 
     edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125) 
     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
     org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     org.apache.spark.scheduler.Task.run(Task.scala:54) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:701) 
14/09/27 01:38:13 ERROR TaskSetManager: Task 0 in stage 181.0 failed 1 times; aborting job 
Error in .jcall(getJRDD(rdd), "Ljava/util/List;", "collect") : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 181.0 failed 1 times, most recent failure: Lost task 0.0 in stage 181.0 (TID 189, localhost): java.lang.NullPointerException: edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:701) Driver stacktrace: 

Размер данных (образец):

data <- read.csv("/home/Henry/data.csv") 

dim(data) 

[1] 17 541

Что может быть возможной причиной этого ошибка?

+0

Я думаю, вы забыли показать нам результат 'dim (data)'. – voidHead

+0

@voidHead, я добавил вывод dim (data) – Hanry

ответ

0

Проблема заключается в том, что textFile() читает некоторые текстовые данные и возвращает распределенную коллекцию из строк, каждая из которых соответствует строке текстового файла. Поэтому позже в программе partition[, -1] терпит неудачу. Фактическое намерение программы, по-видимому, относится к points как к распределенной коллекции данных. В настоящее время мы работаем над предоставлением поддержки фреймов данных в SparkR (SPARKR-1).

Для решения проблемы просто манипулируйте своим partition с помощью строковых операций, чтобы правильно извлечь X, Y. Некоторые другие способы включают (я думаю, вы, вероятно, видели это раньше), производя другой тип распределенной коллекции с самого начала, как это делается здесь: examples/logistic_regression.R.

+0

В то же время снижается и lapplyPartition удаляются (https://issues.apache.org/jira/browse/SPARK-7230) из интерфейса, поэтому, когда DataFrames будут доступны, эта программа будет полностью нефункциональной – piccolbo

Смежные вопросы