2015-04-01 2 views
93

Предположим, что я делаю что-то вроде:Как изменить типы столбцов в DataFrame Spark SQL?

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true")) 
df.printSchema() 

root 
|-- year: string (nullable = true) 
|-- make: string (nullable = true) 
|-- model: string (nullable = true) 
|-- comment: string (nullable = true) 
|-- blank: string (nullable = true) 

df.show() 
year make model comment    blank 
2012 Tesla S  No comment     
1997 Ford E350 Go get one now th... 

, но я действительно хотел year в Int (и, возможно, трансформировать некоторые другие столбцы).

Лучшее, что я мог придумать это

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank) 
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string] 

который немного запутанным.

Я иду из R, и я привык к тому, что могу писать, например.

df2 <- df %>% 
    mutate(year = year %>% as.integer, 
      make = make %>% toupper) 

Я, вероятно, что-то отсутствует, так как должен быть лучший способ сделать это в свече/Скале ...

ответ

79

[EDIT: март 2016: спасибо за голоса! Хотя на самом деле это не лучший ответ, я думаю, что решения, основанные на withColumn, withColumnRenamed и cast, выдвинутые msemelman, Martin Senne и другими, проще и чище].

Я думаю, что ваш подход хорошо, напомню, что искра DataFrame является (неизменным) РДДОМ рядов, так что мы никогда действительно заменяющего колонна, просто создавая новые DataFrame каждый раз с новой схемой.

Если у вас есть оригинальный ФР со следующей схемой:

scala> df.printSchema 
root 
|-- Year: string (nullable = true) 
|-- Month: string (nullable = true) 
|-- DayofMonth: string (nullable = true) 
|-- DayOfWeek: string (nullable = true) 
|-- DepDelay: string (nullable = true) 
|-- Distance: string (nullable = true) 
|-- CRSDepTime: string (nullable = true) 

И некоторые UDF, определенных на одной или нескольких колонок:

import org.apache.spark.sql.functions._ 

val toInt = udf[Int, String](_.toInt) 
val toDouble = udf[Double, String](_.toDouble) 
val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt) 
val days_since_nearest_holidays = udf( 
    (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12 
) 

Изменение типов столбцов или даже строительство нового DataFrame от другого можно написать следующим образом:

val featureDf = df 
.withColumn("departureDelay", toDouble(df("DepDelay"))) 
.withColumn("departureHour", toHour(df("CRSDepTime"))) 
.withColumn("dayOfWeek",  toInt(df("DayOfWeek")))    
.withColumn("dayOfMonth",  toInt(df("DayofMonth")))    
.withColumn("month",   toInt(df("Month")))    
.withColumn("distance",  toDouble(df("Distance")))    
.withColumn("nearestHoliday", days_since_nearest_holidays(
       df("Year"), df("Month"), df("DayofMonth")) 
      )    
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
     "month", "distance", "nearestHoliday")    

который дает:

scala> df.printSchema 
root 
|-- departureDelay: double (nullable = true) 
|-- departureHour: integer (nullable = true) 
|-- dayOfWeek: integer (nullable = true) 
|-- dayOfMonth: integer (nullable = true) 
|-- month: integer (nullable = true) 
|-- distance: double (nullable = true) 
|-- nearestHoliday: integer (nullable = true) 

Это довольно близко к вашему собственному решению. Просто, сохраняя изменения типа и другие преобразования как отдельные udf val, сделайте код более читаемым и повторно используемым.

+8

Это небезопасно и эффективно. __Не безопасно, потому что одна ошибка «NULL» или неправильная запись приведет к сбою всей работы. __Не эффективно, потому что UDF не прозрачны для Catalyst. Использование UDF для сложных операций просто отлично, но нет оснований использовать их для базового типа. Вот почему у нас есть метод «cast» (см. [Ответ Мартина Сенне] (http://stackoverflow.com/a/32634826/1560062)). Сделать вещи прозрачными для Catalyst требует больше работы, но базовая безопасность - это просто вопрос использования 'Try' и' Option'. – zero323

+0

Я не видел ничего, связанного с преобразованием строки на сегодняшний день, например, «05-APR-2015» – dbspace

+3

Есть ли способ уменьшить раздел 'withColumn()' к универсальному, который выполняет итерацию через все столбцы? – Boern

9

Вы можете использовать selectExpr, чтобы сделать его немного очистителя:

df.selectExpr("cast(year as int) as year", "upper(make) as make", 
    "model", "comment", "blank") 
8

Чтобы преобразовать год из строки в целое, вы можете добавить следующие опции для читателя CSV: «inferSchema» -> «истинный» см DataBricks documentation

+4

Это работает хорошо, но уловка заключается в том, что читатель должен сделать второй проход вашего файла – beefyhalo

+0

@beefyhalo абсолютно спот на, есть ли какой-нибудь способ? –

23

Во-первых, если вы хотите литая Тип

import org.apache.spark.sql 
df.withColumn("year", $"year".cast(sql.types.IntegerType)) 

с таким же именем столбца, столбец будет заменен новым, вам не нужно т o добавить и удалить.

Во-вторых, о Scala vs R. код Scala наиболее похож на R, что я могу достигнуть:

val df2 = df.select(
    df.columns.map { 
    case year @ "year" => df(year).cast(IntegerType).as(year) 
    case make @ "make" => functions.upper(df(make)).as(make) 
    case other   => df(other) 
    }: _* 
) 

Хотя длина немного больше, чем R-х. Обратите внимание, что mutate является функцией для кадра данных R, поэтому Scala очень хороша в выразительной мощности, заданной без использования специальной функции.

(df.columns удивительно массив [String] вместо массива [Колонка], может быть, они хотят это выглядеть dataframe Python панд'S.)

+1

Не могли бы вы предоставить эквивалент для pyspark? –

+0

Я получаю «незаконный старт определения» .withColumn («возраст», «возраст» .cast (sql.types.DoubleType)) для моего «возраста». Любое предложение? – BlueDolphin

47

Как cast операции доступна Спарк Column-х (и, как я лично не жалуют udf «s, предложенные @Svend на данный момент), как о:

df.select(df("year").cast(IntegerType).as("year"), ...) 

приведение к запрашиваемому типу? В качестве аккуратного побочного эффекта значения, не зависящие/«конвертируемые» в этом смысле, станут null.

В случае, если вам это нужно, как вспомогательный метод, использование:

object DFHelper{ 
    def castColumnTo(df: DataFrame, cn: String, tpe: DataType) : DataFrame = { 
    df.withColumn(cn, df(cn).cast(tpe)) 
    } 
} 

, который используется как:

import DFHelper._ 
val df2 = castColumnTo(df, "year", IntegerType) 
+2

Можете ли вы посоветовать мне, как действовать, если мне нужно наложить и переименовать целую кучу столбцов (у меня есть 50 столбцов и довольно новый для scala, не уверен, что это лучший способ приблизиться к нему, не создавая массивного дублирования) ? Некоторые столбцы должны содержать String, некоторые должны быть отнесены к Float. –

+0

как преобразовать строку в дату, например «25-APR-2016» в столбце и «20160302» – dbspace

+0

@DmitrySmirnov Вы когда-нибудь получали ответ? У меня такой же вопрос. ;) –

87

С Спарк версии 1.4 вы можете применить метод произнесения с ТипДанных на колонка:

import org.apache.spark.sql.types.IntegerType 
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType)) 
    .drop("year") 
    .withColumnRenamed("yearTmp", "year") 

Если вы ar е с помощью SQL выражений вы также можете сделать:

val df2 = df.selectExpr("cast(year as int) year", 
         "make", 
         "model", 
         "comment", 
         "blank") 

Для получения дополнительной информации проверьте документы: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

+1

Почему вы использовали withColumn с последующим падением? Не проще просто использовать withColumn с исходным именем столбца? –

+0

@AmebaSpugnosa Я думаю, что к тому моменту, когда я использовал его, Spark разбился, если у него были повторные имена столбцов. Не когда вы их создаете, но когда используете их. – msemelman

+1

нет необходимости отбрасывать столбец, за которым следует переименование. Вы можете сделать в одной строке 'df.withColumn (« ctr », temp (« ctr »). Cast (DecimalType (decimalPrecision, decimalScale)))' – ruhong

6

Так что это только действительно работает, если ваши имеющие проблемы сохранения в драйвер JDBC, как SQLServer, но это действительно полезно для ошибок вы столкнетесь с синтаксисом и типами.

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect} 
import org.apache.spark.sql.jdbc.JdbcType 
val SQLServerDialect = new JdbcDialect { 
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver") 

    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { 
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR)) 
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT)) 
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) 
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT)) 
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE)) 
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL)) 
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) 
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) 
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY)) 
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) 
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) 
    //  case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) 
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL)) 
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC") 
    } 
} 

JdbcDialects.registerDialect(SQLServerDialect) 
+0

Можете ли вы помочь мне реализовать тот же код на Java?и как зарегистрировать customJdbcDialect в DataFrame – abhijitcaps

+0

Ницца, я сделал то же самое с Vertica, но с искры 2.1. JDbcUtil вам нужно реализовать только конкретный тип данных, который вам нужен. getOrse ( throw new IllegalArgumentException (s) Невозможно получить тип JDBC для $ {dt.simpleString} ")) –

4

Java-код для изменения типа данных в DataFrame из строки в целое

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType)) 

Это просто отбрасывать существующую (String тип данных) в целое.

+1

В' sql. 'нет 'DataTypes'. types'! это 'DataType'. Кроме того, можно просто импортировать IntegerType и cast. –

+0

@ EhsanM.Kermani фактически DatyaTypes.IntegerType - это законная ссылка. – Cupitor

+1

@Cupitor 'DataTypes.IntegerType' раньше находился в [режиме DeveloperAPI] (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/types/IntegerType.html) и [стабильно в v.2.1.0] (https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/types/IntegerType.html) –

2

ответы, предлагающие использовать литые, FYI, метод отливки в искры 1.4.1 нарушен.

например, dataframe с строка столбца, имеющего значение «8182175552014127960», когда литым к BIGINT имеет значение «8182175552014128100»

df.show 
+-------------------+ 
|     a| 
+-------------------+ 
|8182175552014127960| 
+-------------------+ 

    df.selectExpr("cast(a as bigint) a").show 
+-------------------+ 
|     a| 
+-------------------+ 
|8182175552014128100| 
+-------------------+ 

Нам пришлось столкнуться много вопроса, прежде чем найти эту ошибку, потому что мы имели BIGINT колонны в производстве.

+3

psst, обновите свою искру – msemelman

+1

@msemelman смешно, что нужно обновить до новой версии искры в производстве для небольшой ошибки. – sauraI3h

+0

https://issues.apache.org/jira/browse/SPARK-8052 – msemelman

0
val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as("Data_Value_Std_Err")).rdd 
    //Schema to be applied to the table 
    val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType) 

    val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates() 
0

Можно изменить тип данных столбца с помощью бросание в искровом SQL. имя таблицы - это таблица, и в ней есть только два столбца: только столбцы column1 и column2 и тип данных столбца1 должны быть изменены. ex-spark.sql ("select cast (column1 as Double) column1NewName, column2 from table") Вместо двойной записи введите свой тип данных.

2

Этот метод удаляет старый столбец и создает новые столбцы с одинаковыми значениями и новым типом данных. Мои первоначальные типы данных, когда была создана DataFrame были: -

root 
|-- id: integer (nullable = true) 
|-- flag1: string (nullable = true) 
|-- flag2: string (nullable = true) 
|-- name: string (nullable = true) 
|-- flag3: string (nullable = true) 

После этого я побежал следующий код, чтобы изменить тип данных: -

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3 
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>) 

После этого мой результат вышел быть: -

root 
|-- id: integer (nullable = true) 
|-- flag2: string (nullable = true) 
|-- name: string (nullable = true) 
|-- flag1: boolean (nullable = true) 
|-- flag3: boolean (nullable = true) 
0

Вы можете использовать нижеследующий код.

df.withColumn("year", df("year").cast(IntegerType)) 

Какой будет конвертировать год колонка IntegerType колонка.