2016-01-12 2 views
1

Мне было интересно, если sparkR упростит объединение больших наборов данных в отличие от «обычного R»? У меня есть 12 файлов csv, которые составляют приблизительно 500 000 строк на 40 столбцов. Эти файлы являются ежемесячными данными за 2014 год. Я хочу сделать один файл за 2014 год. Все файлы имеют одинаковые метки столбцов, и я хочу объединиться по первому столбцу (году). Однако некоторые файлы имеют больше строк, чем другие.Слияние больших наборов данных с использованием sparkR

Когда я побежал следующий код:

setwd("C:\\Users\\Anonymous\\Desktop\\Data 2014") 

file_list <- list.files() 

for (file in file_list){ 

# if the merged dataset doesn't exist, create it 
if (!exists("dataset")){ 
dataset <- read.table(file, header=TRUE, sep="\t") 
} 

# if the merged dataset does exist, append to it 
if (exists("dataset")){ 
temp_dataset <-read.table(file, header=TRUE, sep="\t") 
dataset<-rbind(dataset, temp_dataset) 
rm(temp_dataset) 
} 

} 

R разбился.

Когда я запустил этот код:

library(SparkR) 
library(magrittr) 
# setwd("C:\\Users\\Anonymous\\Desktop\\Data 2014\\Jan2014.csv") 
sc <- sparkR.init(master = "local") 
sqlContext <- sparkRSQL.init(sc) 

Jan2014_file_path <- file.path('Jan2014.csv') 

system.time(
housing_a_df <- read.df(sqlContext, 
         "C:\\Users\\Anonymous\\Desktop\\Data  2014\\Jan2014.csv", 
         header='true', 
         inferSchema='false') 
) 

я получил следующие ошибки:

Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0  in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): 

Так что было бы простой способ объединить эти файлы в sparkR?

+0

Вы читаете [это] (http://stackoverflow.com/questions/23169645/r-3-0-3-rbind-multiple-csv-files) ответ? В первом разделе все файлы в файлах 'file_list' csv? –

+0

Вы говорите, что хотите «объединиться по первому столбцу», но в вашем примере кода вы объединяете строки из разных файлов. Ответы ниже (на момент написания этой статьи) касаются слияния = объединение, а не конкатенация. – kasterma

+0

Отвечает ли кто-нибудь ниже на ваш вопрос? Если да, любезно согласитесь с ответом. Это может помочь другим разработчикам – sag

ответ

0

Вы должны прочитать файл CSV в следующем формате: Ref: https://gist.github.com/shivaram/d0cd4aa5c4381edd6f85

# Launch SparkR using 
# ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3 

# The SparkSQL context should already be created for you as sqlContext 
sqlContext 
# Java ref type org.apache.spark.sql.SQLContext id 1 

# Load the local CSV file using `read.df`. Note that we use the CSV reader Spark package here. 
Jan2014 <- read.df(sqlContext, "C:/Users/Anonymous/Desktop/Data 2014/Jan2014.csv", "com.databricks.spark.csv", header="true") 

Feb2014 <- read.df(sqlContext, "C:/Users/Anonymous/Desktop/Data 2014/Feb2014.csv", "com.databricks.spark.csv", header="true") 

#For merging/joining by year 

#join 
    jan_feb_2014 <- join(Jan2014 , Feb2014 , joinExpr = Jan2014$year == Feb2014$year1, joinType = "left_outer") 
# I used "left_outer", so i want all columns of Jan2014 and matching of columns Feb2014, based upon your requirement change the join type. 
#rename the Feb2014 column name year to year1, as it gets duplicated while joining. Then you can remove the column "jan_feb_2014$year1" after joining by the code, "jan_feb_2014$year1 <- NULL" 

Это, как вы можете присоединиться к одному по одному файлу.

+0

Соединяет ли добавить столбцы в dataframe из другого daraframe? Поскольку он хочет объединить два файла csv, я думаю, что объединение может не соответствовать ему. – sag

+0

Он хотел объединиться по первому столбцу «год», поэтому я использовал соединение. Возможно, он хочет, чтобы все месяцы были в колонках. @ SamuelAlexander –

0

После того как вы прочитали файлы как данные, вы можете использовать unionAll из SparkR, чтобы объединить данные в единый блок данных. Затем вы можете записать его в один файл csv.

Пример кода

df1 <- read.df(sqlContext, "/home/user/tmp/test1.csv", source = "com.databricks.spark.csv") 
    df2 <- read.df(sqlContext, "/home/user/tmp/test2.csv", source = "com.databricks.spark.csv") 
    mergedDF <- unionAll(df1, df2) 
    write.df(mergedDF, "merged.csv", "com.databricks.spark.csv", "overwrite") 

Я тестировал и использовал его, но не против данных вашего размера. Но я надеюсь, что это вам поможет

Смежные вопросы