2016-04-18 3 views
1

У меня есть DataFrame, который содержит различные столбцы. Один столбец содержит карту [Integer, Integer []]. Похоже, что { 2345 -> [1,34,2]; 543 -> [12,3,2,5]; 2 -> [3,4]} Теперь мне нужно отфильтровать некоторые ключи. У меня есть множество целых чисел (javaIntSet) в Java, с которым я должен фильтровать такие, чтоextract или filter MapType of Spark DataFrame

col(x).keySet.isin(javaIntSet) 

т.е.. приведенная выше карта должна содержать только ключи 2 и 543, но не две другие и должна выглядеть как {543 -> [12,3,2,5]; 2 -> [3,4]} после фильтрации.

Документация о том, как использовать класс столбцов Java, является разреженной. Как извлечь col (x), чтобы я мог просто фильтровать его в java, а затем заменить данные ячейки на отфильтрованную карту. Или есть какие-то полезные функции столбцов, которые я пропускаю. Могу ли я написать UDF2<Map<Integer, Integer[]>,Set<Integer>,Map<Integer,Integer[]> Я могу написать UDF1<String,String>, но я не уверен, как он работает с более сложными параметрами.

Обычно javaIntSet - это всего лишь дюжина и обычно менее 100 значений. Карта обычно также имеет только несколько записей (обычно 0-5).

Я должен сделать это на Java (к сожалению), но я знаком с Scala. Ответ Scala, который я перевел на Java, уже будет очень полезен.

ответ

2

Вам не нужен UDF. Может быть чище с одним, но вы можете так же легко сделать это с DataFrame.explode:

case class MapTest(id: Int, map: Map[Int,Int]) 
val mapDf = Seq(
    MapTest(1, Map((1,3),(2,10),(3,2))), 
    MapTest(2, Map((1,12),(2,333),(3,543))) 
).toDF("id", "map") 

mapDf.show 
+---+--------------------+ 
| id|     map| 
+---+--------------------+ 
| 1|Map(1 -> 3, 2 -> ...| 
| 2|Map(1 -> 12, 2 ->...| 
+---+--------------------+ 

Затем вы можете использовать взрываются:

mapDf.explode($"map"){ 
    case Row(map: Map[Int,Int] @unchecked) => { 
    val newMap = map.filter(m => m._1 != 1) // <-- do filtering here 
    Seq(Tuple1(newMap)) 
    } 
}.show 
+---+--------------------+--------------------+ 
| id|     map|     _1| 
+---+--------------------+--------------------+ 
| 1|Map(1 -> 3, 2 -> ...|Map(2 -> 10, 3 -> 2)| 
| 2|Map(1 -> 12, 2 ->...|Map(2 -> 333, 3 -...| 
+---+--------------------+--------------------+ 

Если вы действительно хотели сделать UDF, это будет выглядеть это:

val mapFilter = udf[Map[Int,Int],Map[Int,Int]](map => { 
    val newMap = map.filter(m => m._1 != 1) // <-- do filtering here 
    newMap 
}) 

mapDf.withColumn("newMap", mapFilter($"map")).show 
+---+--------------------+--------------------+ 
| id|     map|    newMap| 
+---+--------------------+--------------------+ 
| 1|Map(1 -> 3, 2 -> ...|Map(2 -> 10, 3 -> 2)| 
| 2|Map(1 -> 12, 2 ->...|Map(2 -> 333, 3 -...| 
+---+--------------------+--------------------+ 

DataFrame.explode является немного более сложным, но в конечном счете, более гибким. Например, вы можете разделить исходную строку на две строки: одну, содержащую карту с отфильтрованными элементами, а другую - с обратным - элементы, которые были отфильтрованы.

+0

Позор ОП не проверял этот ответ! Очень хорошие примеры Давид спасибо – dedpo