2015-05-19 1 views
1

Я запускаю CDH 5.4, используя Spark 1.3.0 с поддержкой Spark on YARN. Когда я создаю простую паркетную таблицу в HIVE, а затем попытаюсь выполнить преобразование или агрегацию к ней с помощью PySpark, она вызывает это сообщение об ошибке. Есть предположения? Для того, чтобы воспроизвести проблему сделайте следующее ...Ошибки паркета в PySpark с использованием CDH 5.4 и Spark 1.3.0 с паркетным столом в HIVE

Hive:

CREATE TABLE IF NOT EXISTS TestTable_Parquet(
Investment_Id int, 
Identifier string, 
Package_Id int, 
AsOfDate timestamp 
) STORED AS PARQUET 
; 
INSERT INTO TABLE TestTable_Parquet 
VALUES (1, "id1", 1, "2015-01-01") 
; 

PySpark

:

test = sqlCtx.table("testtable_parquet") 
test.filter(test.identifier == "id1") 

Ошибка

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-3-690105998113> in <module>() 
     1 test = sqlCtx.table("testtable_parquet") 
----> 2 test.filter(test.identifier == "id1") 

/usr/lib/spark/python/pyspark/sql/dataframe.py in filter(self, condition) 
    627    jdf = self._jdf.filter(condition) 
    628   elif isinstance(condition, Column): 
--> 629    jdf = self._jdf.filter(condition._jc) 
    630   else: 
    631    raise TypeError("condition should be string or Column") 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling o34.filter. 
: org.apache.spark.sql.AnalysisException: resolved attributes identifier missing from investment_id,identifier,package_id,asofdate; 
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37) 
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3.apply(CheckAnalysis.scala:93) 
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3.apply(CheckAnalysis.scala:43) 
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:88) 
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.apply(CheckAnalysis.scala:43) 
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1069) 
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157) 
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:508) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
at py4j.Gateway.invoke(Gateway.java:259) 
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
at py4j.commands.CallCommand.execute(CallCommand.java:79) 
at py4j.GatewayConnection.run(GatewayConnection.java:207) 
at java.lang.Thread.run(Thread.java:745) 
+0

После недолгого игры, решение этой проблемы, как представляется, запуск этой команды множество конф первых, это требуется только когда вы хотите Спарк говорить Hive – DAE

ответ

1

После небольшой игры, появляется решение проблемы для запуска этой команды conf conf сначала, это требуется только тогда, когда вы хотите, чтобы Spark поговорила с Hive:

sqlCtx.setConf("spark.sql.hive.convertMetastoreParquet", "false") 
+0

где вы указать эту команду setconf? – G3M

Смежные вопросы