2015-09-22 3 views
14

У меня есть DataFrame со схемойУдаление вложенного столбца из искрового DataFrame

root 
|-- label: string (nullable = true) 
|-- features: struct (nullable = true) 
| |-- feat1: string (nullable = true) 
| |-- feat2: string (nullable = true) 
| |-- feat3: string (nullable = true) 

While, я в состоянии фильтровать кадр данных с использованием

val data = rawData 
    .filter(!(rawData("features.feat1") <=> "100")) 

Я не могу отказаться от столбцов с помощью

val data = rawData 
     .drop("features.feat1") 

Это что-то, что я делаю неправильно здесь? Я также пробовал (безуспешно) делать drop(rawData("features.feat1")), хотя это не имеет большого смысла для этого.

Спасибо заранее,

Нихилу

+0

Что делать, если вместо этого вы сопоставили его с новой информационной рамкой? Я не думаю, что API DataFrame позволяет отбросить поле структуры в виде столбца структуры. –

+0

Ох. Я попытаюсь это сделать, но кажется довольно неудобным, если мне нужно сопоставить только для разрешения имени вложенного столбца следующим образом :( –

+0

Вы всегда можете получить все столбцы с помощью метода '.columns()' DataFrame, удалить ненужный столбец из последовательности и do 'select (myColumns: _ *)'. Должно быть немного короче. – Niemand

ответ

12

Это просто программирование упражнения, но вы можете попробовать что-то вроде этого:

import org.apache.spark.sql.{DataFrame, Column} 
import org.apache.spark.sql.types.{StructType, StructField} 
import org.apache.spark.sql.{functions => f} 
import scala.util.Try 

case class DFWithDropFrom(df: DataFrame) { 
    def getSourceField(source: String): Try[StructField] = { 
    Try(df.schema.fields.filter(_.name == source).head) 
    } 

    def getType(sourceField: StructField): Try[StructType] = { 
    Try(sourceField.dataType.asInstanceOf[StructType]) 
    } 

    def genOutputCol(names: Array[String], source: String): Column = { 
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*) 
    } 

    def dropFrom(source: String, toDrop: Array[String]): DataFrame = { 
    getSourceField(source) 
     .flatMap(getType) 
     .map(_.fieldNames.diff(toDrop)) 
     .map(genOutputCol(_, source)) 
     .map(df.withColumn(source, _)) 
     .getOrElse(df) 
    } 
} 

Пример использования:

scala> case class features(feat1: String, feat2: String, feat3: String) 
defined class features 

scala> case class record(label: String, features: features) 
defined class record 

scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF 
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>] 

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show 
+-------+--------+ 
| label|features| 
+-------+--------+ 
|a_label| [f2,f3]| 
+-------+--------+ 


scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show 
+-------+----------+ 
| label| features| 
+-------+----------+ 
|a_label|[f1,f2,f3]| 
+-------+----------+ 


scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show 
+-------+----------+ 
| label| features| 
+-------+----------+ 
|a_label|[f1,f2,f3]| 
+-------+----------+ 

Добавить implicit conversion, и вам хорошо идти.

9

Эта версия позволяет удалить вложенные столбцы на любом уровне:

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.{StructType, DataType} 

/** 
    * Various Spark utilities and extensions of DataFrame 
    */ 
object DataFrameUtils { 

    private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = { 
    if (fullColName.equals(dropColName)) { 
     None 
    } else { 
     colType match { 
     case colType: StructType => 
      if (dropColName.startsWith(s"${fullColName}.")) { 
      Some(struct(
       colType.fields 
       .flatMap(f => 
        dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match { 
        case Some(x) => Some(x.alias(f.name)) 
        case None => None 
        }) 
       : _*)) 
      } else { 
      Some(col) 
      } 
     case other => Some(col) 
     } 
    } 
    } 

    protected def dropColumn(df: DataFrame, colName: String): DataFrame = { 
    df.schema.fields 
     .flatMap(f => { 
     if (colName.startsWith(s"${f.name}.")) { 
      dropSubColumn(col(f.name), f.dataType, f.name, colName) match { 
      case Some(x) => Some((f.name, x)) 
      case None => None 
      } 
     } else { 
      None 
     } 
     }) 
     .foldLeft(df.drop(colName)) { 
     case (df, (colName, column)) => df.withColumn(colName, column) 
     } 
    } 

    /** 
    * Extended version of DataFrame that allows to operate on nested fields 
    */ 
    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { 
    /** 
     * Drops nested field from DataFrame 
     * 
     * @param colName Dot-separated nested field name 
     */ 
    def dropNestedColumn(colName: String): DataFrame = { 
     DataFrameUtils.dropColumn(df, colName) 
    } 
    } 
} 

Использование:

import DataFrameUtils._ 
df.dropNestedColumn("a.b.c.d") 
+1

Спасибо большое!Любой шанс, что вы обновили это, чтобы удалить поле из структуры в массиве под массивом? Был взломан в течение дня, близко, но не может его получить. то есть parent: array >>> –

+1

@alexP_Keaton Привет, вы получили решение для удаления столбца внутри массива? –

+0

Я хотел бы добавить, что этот метод не сохраняет свойство «nullable» измененной родительской структуры. В этом примере 'features' станет' struct (nullable = false) ' –

2

Следующий фрагмент кода spektom для Скале, я создал подобный код в Java. Поскольку java 8 не имеет foldLeft, я использовал forEachOder. Этот код подходит для искры 2.x (я использую 2.1) Также я заметил, что удаление столбца и добавление его с помощью withColumn с тем же именем не работает, поэтому я просто заменяю столбец, и это кажется работать.

Кодекс не полностью протестирована, надеюсь, что он работает :-)

public class DataFrameUtils { 

public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) { 
    final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame); 
    Arrays.stream(dataFrame.schema().fields()) 
     .flatMap(f -> { 
      if (columnName.startsWith(f.name() + ".")) { 
       final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName); 
       if (column.isPresent()) { 
        return Stream.of(new Tuple2<>(f.name(), column)); 
       } else { 
        return Stream.empty(); 
       } 
      } else { 
       return Stream.empty(); 
      } 
     }).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple)); 

    return dataFrameFolder.getDF(); 
} 

private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) { 
    Optional<Column> column = Optional.empty(); 
    if (!fullColumnName.equals(dropColumnName)) { 
     if (colType instanceof StructType) { 
      if (dropColumnName.startsWith(fullColumnName + ".")) { 
       column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName))); 
      } 
     } else { 
      column = Optional.of(col); 
     } 
    } 

    return column; 
} 

private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) { 
    return Arrays.stream(colType.fields()) 
     .flatMap(f -> { 
        final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(), 
          fullColumnName + "." + f.name(), dropColumnName); 
        if (column.isPresent()) { 
         return Stream.of(column.get().alias(f.name())); 
        } else { 
         return Stream.empty(); 
        } 
       } 
     ).toArray(Column[]::new); 

} 

private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> { 
    private Dataset<Row> df; 

    public DataFrameFolder(Dataset<Row> df) { 
     this.df = df; 
    } 

    public Dataset<Row> getDF() { 
     return df; 
    } 

    @Override 
    public void accept(Tuple2<String, Optional<Column>> colTuple) { 
     if (!colTuple._2().isPresent()) { 
      df = df.drop(colTuple._1()); 
     } else { 
      df = df.withColumn(colTuple._1(), colTuple._2().get()); 
     } 
    } 
} 

Пример использования:

private class Pojo { 
    private String str; 
    private Integer number; 
    private List<String> strList; 
    private Pojo2 pojo2; 

    public String getStr() { 
     return str; 
    } 

    public Integer getNumber() { 
     return number; 
    } 

    public List<String> getStrList() { 
     return strList; 
    } 

    public Pojo2 getPojo2() { 
     return pojo2; 
    } 

} 

private class Pojo2 { 
    private String str; 
    private Integer number; 
    private List<String> strList; 

    public String getStr() { 
     return str; 
    } 

    public Integer getNumber() { 
     return number; 
    } 

    public List<String> getStrList() { 
     return strList; 
    } 

} 

SQLContext context = new SQLContext(new SparkContext("local[1]", "test")); 
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class); 
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str"); 

Оригинальная структура:

root 
|-- number: integer (nullable = true) 
|-- pojo2: struct (nullable = true) 
| |-- number: integer (nullable = true) 
| |-- str: string (nullable = true) 
| |-- strList: array (nullable = true) 
| | |-- element: string (containsNull = true) 
|-- str: string (nullable = true) 
|-- strList: array (nullable = true) 
| |-- element: string (containsNull = true) 

После падения:

root 
|-- number: integer (nullable = true) 
|-- pojo2: struct (nullable = false) 
| |-- number: integer (nullable = true) 
| |-- strList: array (nullable = true) 
| | |-- element: string (containsNull = true) 
|-- str: string (nullable = true) 
|-- strList: array (nullable = true) 
| |-- element: string (containsNull = true) 
+0

добавьте простой пример того, как его называть, и я подниму вас – xXxpRoGrAmmErxXx

+1

Добавлен пример использования в запросе @xXxpRoGrAmmErxXx –

1

Расширение на спектком ответа. С поддержкой типов массивов:

object DataFrameUtils { 

    private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = { 
    if (fullColName.equals(dropColName)) { 
     None 
    } else if (dropColName.startsWith(s"$fullColName.")) { 
     colType match { 
     case colType: StructType => 
      Some(struct(
      colType.fields 
       .flatMap(f => 
       dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match { 
        case Some(x) => Some(x.alias(f.name)) 
        case None => None 
       }) 
       : _*)) 
     case colType: ArrayType => 
      colType.elementType match { 
      case innerType: StructType => 
       Some(struct(innerType.fields 
       .flatMap(f => 
        dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match { 
        case Some(x) => Some(x.alias(f.name)) 
        case None => None 
        }) 
       : _*)) 
      } 

     case other => Some(col) 
     } 
    } else { 
     Some(col) 
    } 
    } 

    protected def dropColumn(df: DataFrame, colName: String): DataFrame = { 
    df.schema.fields 
     .flatMap(f => { 
     if (colName.startsWith(s"${f.name}.")) { 
      dropSubColumn(col(f.name), f.dataType, f.name, colName) match { 
      case Some(x) => Some((f.name, x)) 
      case None => None 
      } 
     } else { 
      None 
     } 
     }) 
     .foldLeft(df.drop(colName)) { 
     case (df, (colName, column)) => df.withColumn(colName, column) 
     } 
    } 

    /** 
    * Extended version of DataFrame that allows to operate on nested fields 
    */ 
    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { 
    /** 
     * Drops nested field from DataFrame 
     * 
     * @param colName Dot-separated nested field name 
     */ 
    def dropNestedColumn(colName: String): DataFrame = { 
     DataFrameUtils.dropColumn(df, colName) 
    } 
    } 

} 
Смежные вопросы