1

У меня есть некоторые проблемы с Cloudera VM и Spark. Прежде всего, я совершенно новичок в Spark, и мой босс попросил меня запустить Spark на Scala на виртуальной машине для какого-то теста.SparkSQL-Scala с POM

Я загрузил виртуальную машину в среду виртуального ящика, поэтому я открываю Eclipse, и у меня появился новый проект на Maven. Непосредственно после запуска среды Cloudera и запуска всех сервисов, таких как Spark, Yarn, Hive и т. Д. Все услуги работают нормально, и все проверки в услугах Cloudera являются зелеными. Я провел некоторое испытание с Impala, и это работает отлично.

С Eclipse, и средой Scala-Maven, вещи стали хуже всего: это мой очень простой код в Scala:

package org.test.spark 

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SQLContext 

object TestSelectAlgorithm { 

    def main(args: Array[String]) = { 
    val conf = new SparkConf() 
     .setAppName("TestSelectAlgorithm") 
     .setMaster("local") 
    val sc = new SparkContext(conf) 

    val sqlContext = new SQLContext(sc) 

    val df = sqlContext.sql("SELECT * FROM products").show() 
    } 
} 

Тест очень прост, так как таблица «продукты» существует: если я копирую -and-вставить тот же запрос на Impala, запрос работает отлично!

В среде Eclipse, в противном случае, у меня есть некоторые проблемы:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
16/06/30 05:43:17 INFO SparkContext: Running Spark version 1.6.0 
16/06/30 05:43:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
16/06/30 05:43:18 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0) 
16/06/30 05:43:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
16/06/30 05:43:18 INFO SecurityManager: Changing view acls to: cloudera 
16/06/30 05:43:18 INFO SecurityManager: Changing modify acls to: cloudera 
16/06/30 05:43:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera) 
16/06/30 05:43:19 INFO Utils: Successfully started service 'sparkDriver' on port 53730. 
16/06/30 05:43:19 INFO Slf4jLogger: Slf4jLogger started 
16/06/30 05:43:19 INFO Remoting: Starting remoting 
16/06/30 05:43:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:39288] 
16/06/30 05:43:19 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 39288. 
16/06/30 05:43:19 INFO SparkEnv: Registering MapOutputTracker 
16/06/30 05:43:19 INFO SparkEnv: Registering BlockManagerMaster 
16/06/30 05:43:19 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7d685fc0-ea88-423a-9335-42ca12db85da 
16/06/30 05:43:19 INFO MemoryStore: MemoryStore started with capacity 1619.3 MB 
16/06/30 05:43:20 INFO SparkEnv: Registering OutputCommitCoordinator 
16/06/30 05:43:20 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
16/06/30 05:43:20 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040 
16/06/30 05:43:20 INFO Executor: Starting executor ID driver on host localhost 
16/06/30 05:43:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57294. 
16/06/30 05:43:20 INFO NettyBlockTransferService: Server created on 57294 
16/06/30 05:43:20 INFO BlockManagerMaster: Trying to register BlockManager 
16/06/30 05:43:20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:57294 with 1619.3 MB RAM, BlockManagerId(driver, localhost, 57294) 
16/06/30 05:43:20 INFO BlockManagerMaster: Registered BlockManager 
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: products; 
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:306) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:315) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:310) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:310) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:300) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) 
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
    at scala.collection.immutable.List.foldLeft(List.scala:84) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) 
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36) 
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36) 
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34) 
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52) 
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817) 
    at org.test.spark.TestSelectAlgorithm$.main(TestSelectAlgorithm.scala:18) 
    at org.test.spark.TestSelectAlgorithm.main(TestSelectAlgorithm.scala) 
16/06/30 05:43:22 INFO SparkContext: Invoking stop() from shutdown hook 
16/06/30 05:43:22 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4040 
16/06/30 05:43:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/06/30 05:43:22 INFO MemoryStore: MemoryStore cleared 
16/06/30 05:43:22 INFO BlockManager: BlockManager stopped 
16/06/30 05:43:22 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/06/30 05:43:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/06/30 05:43:22 INFO SparkContext: Successfully stopped SparkContext 
16/06/30 05:43:22 INFO ShutdownHookManager: Shutdown hook called 
16/06/30 05:43:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-29d381e9-b5e7-485c-92f2-55dc57ca7d25 

Основная ошибка это (для меня):

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: products; 

я искал на другом сайте и документации, и я основал что проблема связана с таблицей Hive ... но я не использую таблицу Hive, я использую SparkSql ...

Может ли кто-нибудь помочь мне, пожалуйста? Спасибо за любой ответ.

+0

где эта таблица 'products' существует? в реляционном db? или вы пытаетесь прочитать файл из hdfs? –

+0

из hdfs: возможно, я выполняю тот же запрос на http: //quickstart.cloudera: 8888/impala/execute/query/8 # query/results ==> IMPALA, в виртуальной машине - и это работает отлично. – Alessandro

+0

вам нужно будет использовать dataframe или 'create the schema> register temp table> run query' - этот код даст вам некоторый намек - для формата текстового файла: https://gist.github.com/InvisibleTech/c71cb88b2390eb2223a8 для формата jsonfile : http: //www.tutorialspoint.com/spark_sql/spark_sql_dataframes.htm –

ответ

1

Вы можете проверить /user/cloudera/.sparkStaging/stagingArea местоположение существует или содержит .avro файл ?? И, пожалуйста, измените «Ваш адрес местонахождения» по месту нахождения каталога.
Для получения более подробной информации, пожалуйста, проверьте страницу avro github. https://github.com/databricks/spark-avro

2

In spark, Для impala нет прямой поддержки, поскольку улей имеет. Так что вам нужно загрузить файл. Если CSV вы можете использовать искровой CSV,

val df = sqlContext.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .option("inferSchema", "true") 
     .load("your .csv file location") 

import sqlContext.implicits._ 
import sqlContext._ 

df.registerTempTable("products") 

sqlContext.sql("select * from products").show() 

П зависимость для искрового Csv

<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10 --> 
<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-csv_2.10</artifactId> 
    <version>1.4.0</version> 
</dependency> 

для Авро есть искровой Avro

val sqlContext = new SQLContext(sc) 

val df = sqlContext.read.avro("your .avro file location") 

import sqlContext.implicits._ 
import sqlContext._ 

df.registerTempTable("products") 


val result= sqlContext.sql("select * from products") 
val result.show() 

result.write 
    .format("com.databricks.spark.avro") 
    .save("Your ouput location") 

П зависимость для AVRO

<!-- http://mvnrepository.com/artifact/com.databricks/spark-avro_2.10 --> 
      <dependency> 
       <groupId>com.databricks</groupId> 
       <artifactId>spark-avro_2.10</artifactId> 
       <version>2.0.1</version> 
      </dependency> 

и паркетная искра имеет встроенный d поддержка

val sqlContext = new SQLContext(sc) 
    val parquetFile = sqlContext.read.parquet("your parquet file location") 

    parquetFile.registerTempTable("products") 

    sqlContext.sql("select * from products").show() 
+0

ОК, в первую очередь: большое спасибо! Теперь я должен понять, какой путь я могу поместить в «местоположение вашего файла»! Теперь создаются таблицы «продукты», для чего проблема решена !!! ;-) Но, если вы захотите помочь мне в другой раз, можете ли вы сказать мне, где я нашел путь для вывода формата avro? – Alessandro

+0

В этом коде мы не сохранили файл в любом месте, для которого вам нужно выполнить дополнительную работу. проверьте обновленный код. –

Смежные вопросы