2016-05-07 7 views
6

Это должно быть легко, но ... с использованием Spark 1.6.1 .... У меня есть DataFrame # 1 со столбцами A, B, C . с значением:Создание нового Spark DataFrame с новым значением столбца на основе столбца в первом фрейме данных Java

A B C 
1 2 A 
2 2 A 
3 2 B 
4 2 C 

Я затем создать новый dataframe с новым столбцом D так:

DataFrame df2 = df1.withColumn("D", df1.col("C")); 

до сих пор так хорошо, но я на самом деле хочу, значение в столбце D, чтобы быть условной, а именно:

// pseudo code 
if (col C = "A") the col D = "X" 
else if (col C = "B") the col D = "Y" 
else col D = "Z" 

Затем я закрою столбец C и переименуйте D на C. Я пробовал смотреть на функции столбца, но ничего не похоже на счет; Я думал об использовании df1.rdd(). Map() и итерации по строкам, но, кроме того, что на самом деле не удалось заставить его работать, я думал, что вся точка DataFrames должна отойти от абстракции RDD?

К сожалению, я должен сделать это на Java (и, конечно, Spark с Java не является оптимальным !!). Похоже, что я пропускаю очевидное, и я счастлив, что меня показывают, что он идиот, когда ему представлено решение!

ответ

12

Я считаю, что вы можете использовать when для достижения этого. Кроме того, вы, вероятно, можете заменить старый столбец напрямую. Для примера, код будет что-то вроде:

import static org.apache.spark.sql.functions.*; 

Column newCol = when(col("C").equalTo("A"), "X") 
    .when(col("C").equalTo("B"), "Y") 
    .otherwise("Z"); 

DataFrame df2 = df1.withColumn("C", newCol); 

Для получения более подробной информации о when, проверьте Column Javadoc.

+1

Спасибо за это - я действительно смотрел очевидное в лицо: s - то, что я пропускал был статический импорт из SQL функций, то есть: импорт статических org.apache. spark.sql.functions. * – user1128482

+0

@ user1128482 Прошу прощения, я забыл об импорте. Хорошо знать, что вы узнали в конце. –

2

Благодаря Даниилу я решил это :)

Недостающий кусок был статический импорт из SQL функций

import static org.apache.spark.sql.functions.*; 

Должно быть, я попробовал миллион различных способов использования, когда, но получил компилировать сбои/ошибки выполнения, потому что я не делал импорт. Когда-то импорт Даниэля был на месте!

1

Вы также можете использовать udf для выполнения той же работы. Просто написать простой, если потом еще структура

import org.apache.spark.sql.functions.udf 
val customFunct = udf { d => 
     //if then else construct 
    } 

val new_DF= df.withColumn(column_name, customFunct(df("data_column")))