2015-09-06 4 views
2

Я загружаю некоторые данные в sparkR (Spark version 1.4.0, работающий на fedora21), над которым я запускаю некоторый алгоритм, который генерирует три разных числа. Мой алгоритм содержит множество параметров, и я хочу использовать разные параметры для одних и тех же данных. Формат вывода должен быть dataframe (или список CSV), столбцы которой являются параметры алгоритма и три числа моего Algo Вычисляет, т.е.собирать sparkr в dataframe

mypar1, mypar2, mypar3, myres1, myres2, myres3 
    1  1.5  1.2  5.6  8.212 5.9 
    2  1.8  1.7  5.1  7.78 8.34 

будет выходом для двух различных значений параметров. Я написал сценарий, ниже которого parallelises бегущего над различными настройками В параметре: он принимает входной файл со значениями параметров в качестве аргумента, что для приведенного выше примера будет выглядеть следующим образом:

1,1.5,1.2 
2,1.8,1.7 

так одну комбинации параметров в каждой строку.

Вот моя проблема: вместо того, чтобы получать по одному параметру, все числа объединены в один длинный список. Функция cv_spark возвращает data.frame (в основном одну строку). Как я могу сказать, что искра объединяет вывод cv_spark в фрейм данных (т. Е. Что-то вроде rbind?) Или список списка?

#!/home/myname/Spark/spark-1.4.0/bin/sparkR 

library(SparkR) 

sparkcontext <- sparkR.init("local[3]","cvspark",sparkEnvir=list(spark.executor.memory="1g")) 

cv_spark <- function(indata) { 
    cv_params <- strsplit(indata, split=",")[[1]] 
    param.par1 = as.integer(cv_params[1]) 
    param.par2 = as.numeric(cv_params[2]) 
    param.par3 = as.numeric(cv_params[3]) 
    predictions <- rep(NA, 1) 
    ## here I run some calculation on some data that I load to my SparkR session, 
    ## but for illustration purpose I'm just filling up with some random numbers 
    mypred = base:::sample(seq(5,10,by=0.01),3) 
    predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3]) 
    return(as.data.frame(predictions)) 
} 

args <- commandArgs(trailingOnly=TRUE) 
print(paste("args ", args)) 
cvpar = readLines(args[[1]]) 

rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=4) 
myerr <- SparkR:::flatMap(rdd,cv_spark) 
output <- SparkR:::collect(myerr) 
print("final output") 
print(output) 

outfile = "spark_output.csv" 
write.csv(output,outfile,quote=FALSE,row.names=FALSE) 

ответ

1

мне удалось получить то, что я хотел с помощью flatMapValues вместо flatMap, а также путем создания (key, value) пары моих различных параметров настройки (в основном ключ номер строки в моем входном файле параметров и значением является параметры на том, что линия). Затем я вызываю reduceByKey, который по существу содержит одну строку за ключ. Измененный скрипт выглядит следующим образом:

#!/home/myname/Spark/spark-1.4.0/bin/sparkR 

library(SparkR) 

sparkcontext <- sparkR.init("local[4]","cvspark",sparkEnvir=list(spark.executor.memory="1g")) 

cv_spark <- function(indata) { 
    cv_params <- unlist(strsplit(indata[[1]], split=",")) 
    param.par1 = as.integer(cv_params[1]) 
    param.par2 = as.numeric(cv_params[2]) 
    param.par3 = as.integer(cv_params[3]) 
    predictions <- rep(NA, 1) 
    ## here I run some calculation on some data that I load to my SparkR session, 
    ## but for illustration purpose I'm just filling up with some random numbers 
    mypred = base:::sample(seq(5,10,by=0.01),3) 
    predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3]) 
    return(as.data.frame(predictions)) 
} 

args <- commandArgs(trailingOnly=TRUE) 
print(paste("args ", args)) 
cvpar = readLines(args[[1]]) 
## Creates (key, value) pairs 
cvpar <- Map(list,seq(1,length(cvpar)),cvpar) 

rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=1) 
myerr <- SparkR:::flatMapValues(rdd,cv_spark) 
myerr <- SparkR:::reduceByKey(myerr,"c", 2L) 
output <- SparkR:::collect(myerr) 

myres <- sapply(output,`[`,2) 
df_res <- do.call("rbind",myres) 
colnames(df_res) <- c("Element","sigdf","sigq","err","err.sse","err.mse") 

outfile = "spark_output.csv" 
write.csv(df_res,outfile,quote=FALSE,row.names=FALSE) 

Это работает, как ожидалось, то есть на выходе будет dataframe (или CSV-файла) с таким же числом строк, как во входном файле указанного выше сценария (то есть число различных параметры значений параметров), но, возможно, есть более эффективный способ сделать это.

+0

Hi Hadron, Можете ли вы предоставить мне команду для запуска этой программы. –

+0

@Vijay_Shinde './myexample.R myparameterfile.txt' где myexample.R - это сценарий выше. убедитесь, что вы исправили shebang в своем скрипте. myparameterfile.txt содержит 3 числа, разделенные запятыми, на строку. – hadron

Смежные вопросы