2017-01-06 2 views
4

У меня проблемы с удалением строк из dataframe на основе двух столбцов списка элементов для фильтрации. Например, для этого dataframe:Невозможно создать литерал массива в spark/pyspark

df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id']) 
df.show() 
+------+------+---+ 
|number|letter| id| 
+------+------+---+ 
| 100|  A|304| 
| 200|  B|305| 
| 300|  C|306| 
+------+------+---+ 

можно легко удалить строки с помощью isin на одной колонке:

df.where(~col('number').isin([100, 200])).show() 
+------+------+---+ 
|number|letter| id| 
+------+------+---+ 
| 300|  C|306| 
+------+------+---+ 

Но когда я пытаюсь удалить их двумя колонками я получаю исключение:

df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show() 

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit. 
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A] 
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57) 
    at org.apache.spark.sql.functions$.lit(functions.scala:101) 
    at org.apache.spark.sql.functions.lit(functions.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 

После некоторого расследования я понял, что основной причиной проблемы является создание литералов из непримиримых типов. Я попытался следующий код в pyspark:

lit((100, 'A')) 
lit([100, 'A']) 

и следующее в Искре лестницу:

lit((100, "A")) 
lit(List(100, "A")) 
lit(Seq(100, "A")) 
lit(Array(100, "A")) 

, но не повезло ... Кто-нибудь знает, как создать массив буквальный в свече/pyspark ? Или есть другой способ фильтрации данных на два столбца?

ответ

1

Прежде всего вы, вероятно, хотите struct не arrays. Помните, что Spark SQL не поддерживает гетерогенные массивы, поэтому array(1, 'a') отлита до array<string>.

Так запрос может выглядеть следующим образом:

choices = [(100, 'A'), (200, 'B')] 

target = [ 
    struct(
     lit(number).alias("number").cast("long"), 
     lit(letter).alias("letter").cast("string")) 
    for number, letter in choices] 

query = struct("number", "letter").isin(target) 

Это, кажется, генерировать корректное выражение:

query 
Column<b'(named_struct(NamePlaceholder(), number, NamePlaceholder(), letter) IN (named_struct(col1, CAST(100 AS `number` AS BIGINT), col2, CAST(A AS `letter` AS STRING)), named_struct(col1, CAST(200 AS `number` AS BIGINT), col2, CAST(B AS `letter` AS STRING))))'> 

Но по какой-то причине не удается на анализаторе:

df.where(~query) 
AnalysisException       Traceback (most recent call last) 
... 
AnalysisException: "cannot resolve '(named_struct('number', `number`, 'letter', `letter`) IN (named_struct('col1', CAST(100 AS BIGINT), 'col2', CAST('A' AS STRING)), named_struct('col1', CAST(200 AS BIGINT), 'col2', CAST('B' AS STRING))))' due to data type mismatch: Arguments must be same type;;\n'Filter NOT named_struct(number, number#0L, letter, letter#1) IN (named_struct(col1, cast(100 as bigint), col2, cast(A as string)),named_struct(col1, cast(200 as bigint), col2, cast(B as string)))\n+- LogicalRDD [number#0L, letter#1, id#2L]\n" 

Как ни странно с SQL следующий сбой, а также:

df.createOrReplaceTempView("df") 

spark.sql("SELECT * FROM df WHERE struct(letter, letter) IN (struct(CAST(1 AS bigint), 'a'))") 
AnalysisException: "cannot resolve '(named_struct('letter', df.`letter`, 'letter', df.`letter`) IN (named_struct('col1', CAST(1 AS BIGINT), 'col2', 'a')))' due to data type mismatch: Arguments must be same type; line 1 pos 46;\n'Project [*]\n+- 'Filter named_struct(letter, letter#1, letter, letter#1) IN (named_struct(col1, cast(1 as bigint), col2, a))\n +- SubqueryAlias df\n  +- LogicalRDD [number#0L, letter#1, id#2L]\n" 

, но при замене буквенные с обеих сторон:

spark.sql("SELECT * FROM df WHERE struct(CAST(1 AS bigint), 'a') IN (struct(CAST(1 AS bigint), 'a'))") 
DataFrame[number: bigint, letter: string, id: bigint] 

работает отлично, так это выглядит как ошибка.

Это сказанное осталось анти присоединиться должны хорошо работать здесь:

from pyspark.sql.functions import broadcast 

df.join(
    broadcast(spark.createDataFrame(choices, ("number", "letter"))), 
    ["number", "letter"], 
    "leftanti" 
) 
+------+------+---+ 
|number|letter| id| 
+------+------+---+ 
| 300|  C|306| 
+------+------+---+ 
1

Для создания массива буквального в искре вам нужно создать массив из ряда колонн, где столбец создается из функции lit:

scala> array(lit(100), lit("A")) 
res1: org.apache.spark.sql.Column = array(100, A) 
Смежные вопросы