
SparkSQL JSON error

I'm trying to read a JSON file, create a temp table, run a simple query, and save the result as text.

I get an error, which I've pasted below the JSON file. I'm new to Spark, so please explain as much as possible. I've probably made a mistake somewhere.

This is the code:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object testSql {
  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("sparkSql")
      .setMaster("local")
    val sc = new SparkContext(conf)

    val hiveCtx = new HiveContext(sc)
    val input = hiveCtx.jsonFile("tweet.json")
    input.registerTempTable("tweets")
    val title = hiveCtx.sql("SELECT title FROM tweets")
    title.saveAsTextFile("twt.json")
  }
}

This is tweet.json:

{ 
     "glossary": { 
     "title": "example glossary", 
     "GlossDiv": { 
      "title": "S", 
      "GlossList": { 
       "GlossEntry": { 
        "ID": "SGML", 
        "SortAs": "SGML", 
        "GlossTerm": "Standard Generalized Markup Language", 
        "Acronym": "SGML", 
        "Abbrev": "ISO 8879:1986", 
        "GlossDef": { 
         "para": "A meta-markup language, used to create markup languages such as DocBook.", 
         "GlossSeeAlso": ["GML", "XML"] 
        }, 
        "GlossSee": "markup" 
       } 
      } 
     } 
    } 
} 

This is the error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/11/05 17:21:17 INFO SparkContext: Running Spark version 1.4.0 
15/11/05 17:21:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
15/11/05 17:21:18 WARN Utils: Your hostname, daniele-S551LB resolves to a loopback address: 127.0.1.1; using 192.168.1.113 instead (on interface wlan0) 
15/11/05 17:21:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
15/11/05 17:21:18 INFO SecurityManager: Changing view acls to: daniele 
15/11/05 17:21:18 INFO SecurityManager: Changing modify acls to: daniele 
15/11/05 17:21:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(daniele); users with modify permissions: Set(daniele) 
15/11/05 17:21:18 INFO Slf4jLogger: Slf4jLogger started 
15/11/05 17:21:18 INFO Remoting: Starting remoting 
15/11/05 17:21:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:43893] 
15/11/05 17:21:18 INFO Utils: Successfully started service 'sparkDriver' on port 43893. 
15/11/05 17:21:18 INFO SparkEnv: Registering MapOutputTracker 
15/11/05 17:21:18 INFO SparkEnv: Registering BlockManagerMaster 
15/11/05 17:21:18 INFO DiskBlockManager: Created local directory at /tmp/spark-f7dac555-9371-442e-8f7c-fbc21bce9978/blockmgr-90f16880-f62c-4fb1-9ee6-5c8d269ed651 
15/11/05 17:21:18 INFO MemoryStore: MemoryStore started with capacity 944.7 MB 
15/11/05 17:21:18 INFO HttpFileServer: HTTP File server directory is /tmp/spark-f7dac555-9371-442e-8f7c-fbc21bce9978/httpd-7702cb01-9ac0-49cc-ad02-86ceb05623e5 
15/11/05 17:21:18 INFO HttpServer: Starting HTTP Server 
15/11/05 17:21:18 INFO Utils: Successfully started service 'HTTP file server' on port 51255. 
15/11/05 17:21:18 INFO SparkEnv: Registering OutputCommitCoordinator 
15/11/05 17:21:19 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
15/11/05 17:21:19 INFO SparkUI: Started SparkUI at http://192.168.1.113:4040 
15/11/05 17:21:19 INFO Executor: Starting executor ID driver on host localhost 
15/11/05 17:21:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56196. 
15/11/05 17:21:19 INFO NettyBlockTransferService: Server created on 56196 
15/11/05 17:21:19 INFO BlockManagerMaster: Trying to register BlockManager 
15/11/05 17:21:19 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56196 with 944.7 MB RAM, BlockManagerId(driver, localhost, 56196) 
15/11/05 17:21:19 INFO BlockManagerMaster: Registered BlockManager 
15/11/05 17:21:20 INFO MemoryStore: ensureFreeSpace(106480) called with curMem=0, maxMem=990621204 
15/11/05 17:21:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 104.0 KB, free 944.6 MB) 
15/11/05 17:21:20 INFO MemoryStore: ensureFreeSpace(10090) called with curMem=106480, maxMem=990621204 
15/11/05 17:21:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.9 KB, free 944.6 MB) 
15/11/05 17:21:20 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56196 (size: 9.9 KB, free: 944.7 MB) 
15/11/05 17:21:20 INFO SparkContext: Created broadcast 0 from jsonFile at testSql.scala:15 
15/11/05 17:21:20 INFO FileInputFormat: Total input paths to process : 1 
15/11/05 17:21:20 INFO SparkContext: Starting job: jsonFile at testSql.scala:15 
15/11/05 17:21:20 INFO DAGScheduler: Got job 0 (jsonFile at testSql.scala:15) with 1 output partitions (allowLocal=false) 
15/11/05 17:21:20 INFO DAGScheduler: Final stage: ResultStage 0(jsonFile at testSql.scala:15) 
15/11/05 17:21:20 INFO DAGScheduler: Parents of final stage: List() 
15/11/05 17:21:20 INFO DAGScheduler: Missing parents: List() 
15/11/05 17:21:20 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at jsonFile at testSql.scala:15), which has no missing parents 
15/11/05 17:21:20 INFO MemoryStore: ensureFreeSpace(3552) called with curMem=116570, maxMem=990621204 
15/11/05 17:21:20 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.5 KB, free 944.6 MB) 
15/11/05 17:21:20 INFO MemoryStore: ensureFreeSpace(2002) called with curMem=120122, maxMem=990621204 
15/11/05 17:21:20 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2002.0 B, free 944.6 MB) 
15/11/05 17:21:20 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56196 (size: 2002.0 B, free: 944.7 MB) 
15/11/05 17:21:20 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874 
15/11/05 17:21:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at jsonFile at testSql.scala:15) 
15/11/05 17:21:20 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
15/11/05 17:21:20 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1421 bytes) 
15/11/05 17:21:20 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 
15/11/05 17:21:20 INFO HadoopRDD: Input split: file:/home/daniele/workspace/sparkSql/tweet.json:0+583 
15/11/05 17:21:20 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 
15/11/05 17:21:20 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 
15/11/05 17:21:20 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 
15/11/05 17:21:20 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 
15/11/05 17:21:20 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 
15/11/05 17:21:21 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
scala.MatchError: glossary (of class java.lang.String) 
    at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) 
    at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) 
    at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:965) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:963) 
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1801) 
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1801) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
15/11/05 17:21:21 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): scala.MatchError: glossary (of class java.lang.String) 
    at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) 
    at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) 
    at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:965) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:963) 
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1801) 
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1801) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

15/11/05 17:21:21 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
15/11/05 17:21:21 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/11/05 17:21:21 INFO TaskSchedulerImpl: Cancelling stage 0 
15/11/05 17:21:21 INFO DAGScheduler: ResultStage 0 (jsonFile at testSql.scala:15) failed in 0,683 s 
15/11/05 17:21:21 INFO DAGScheduler: Job 0 failed: jsonFile at testSql.scala:15, took 0,792765 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): scala.MatchError: glossary (of class java.lang.String) 
    at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) 
    at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) 
    at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:965) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:963) 
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1801) 
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1801) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
15/11/05 17:21:21 INFO SparkContext: Invoking stop() from shutdown hook 
15/11/05 17:21:21 INFO SparkUI: Stopped Spark web UI at http://192.168.1.113:4040 
15/11/05 17:21:21 INFO DAGScheduler: Stopping DAGScheduler 
15/11/05 17:21:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/11/05 17:21:21 INFO Utils: path = /tmp/spark-f7dac555-9371-442e-8f7c-fbc21bce9978/blockmgr-90f16880-f62c-4fb1-9ee6-5c8d269ed651, already present as root for deletion. 
15/11/05 17:21:21 INFO MemoryStore: MemoryStore cleared 
15/11/05 17:21:21 INFO BlockManager: BlockManager stopped 
15/11/05 17:21:21 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/11/05 17:21:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
15/11/05 17:21:21 INFO SparkContext: Successfully stopped SparkContext 
15/11/05 17:21:21 INFO Utils: Shutdown hook called 
15/11/05 17:21:21 INFO Utils: Deleting directory /tmp/spark-f7dac555-9371-442e-8f7c-fbc21bce9978 

Possible duplicate of [How to read a JSON file with a specific format with Spark Scala?](http://stackoverflow.com/questions/31702167/how-to-read-a-json-file-with-a-specific-format-with-spark-scala) – zero323


You can't read multi-line JSON with 'SqlContext.jsonFile'. It expects one document per line. – zero323

Answer


There isn't much to explain here. SqlContext.jsonFile expects one document per line. It cannot read documents spanning multiple lines, nor parse lists of documents as rows. Assuming a single document, correct input would look like this:

val rdd = sc.parallelize(Seq("""{"glossary": {"GlossDiv": {"GlossList": {"GlossEntry": {"GlossDef": {"GlossSeeAlso": ["GML", "XML"], "para": "A meta-markup language, used to create markup languages such as DocBook."}, "GlossSee": "markup", "Acronym": "SGML", "GlossTerm": "Standard Generalized Markup Language", "Abbrev": "ISO 8879:1986", "SortAs": "SGML", "ID": "SGML"}}, "title": "S"}, "title": "example glossary"}}""")) 

val df = sqlContext.read.json(rdd) 

df.printSchema 
// root 
// |-- glossary: struct (nullable = true) 
// | |-- GlossDiv: struct (nullable = true) 
// | | |-- GlossList: struct (nullable = true) 
// | | | |-- GlossEntry: struct (nullable = true) 
// | | | | |-- Abbrev: string (nullable = true) 
// | | | | |-- Acronym: string (nullable = true) 
// | | | | |-- GlossDef: struct (nullable = true) 
// | | | | | |-- GlossSeeAlso: array (nullable = true) 
// | | | | | | |-- element: string (containsNull = true) 
// | | | | | |-- para: string (nullable = true) 
// | | | | |-- GlossSee: string (nullable = true) 
// | | | | |-- GlossTerm: string (nullable = true) 
// | | | | |-- ID: string (nullable = true) 
// | | | | |-- SortAs: string (nullable = true) 
// | | |-- title: string (nullable = true) 
// | |-- title: string (nullable = true) 

So you simply have to preprocess the data before it can be used to create a DataFrame.
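
For example, a minimal preprocessing sketch (assuming the file is small enough to be read as a single record, and reusing the sc and sqlContext values from above): sc.wholeTextFiles yields one record per file rather than one per line, so the multi-line document can be collapsed onto a single line and handed to the JSON reader as an RDD[String].

// wholeTextFiles returns (path, content) pairs: one record per file,
// so the multi-line document is not split across records
val oneLine = sc.wholeTextFiles("tweet.json")
  .map { case (_, content) => content.replaceAll("\\s*\\n\\s*", " ") }

// read the flattened document exactly as in the single-line example above
val flattened = sqlContext.read.json(oneLine)
flattened.printSchema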


Thank you very much. So it's impossible to process JSON data until it has been formatted onto a single line? –


If you're asking about direct conversion to a DataFrame, then the answer is no. Otherwise it depends on the context. You can use a custom Hadoop input format, you can parse the data per partition, you can play with record delimiters. – zero323
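
A sketch of the record-delimiter idea mentioned above (the "---" separator and the multi-document tweets.json file are hypothetical; sc and sqlContext are assumed to be in scope): setting textinputformat.record.delimiter makes Hadoop hand Spark one whole document per record, even if each document spans several lines.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// use a custom record delimiter instead of the default newline
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter", "---")

// each record is now everything between two "---" markers, i.e. one document
val docs = sc.newAPIHadoopFile(
    "tweets.json",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    hadoopConf)
  .map { case (_, text) => text.toString.trim }
  .filter(_.nonEmpty)

val parsed = sqlContext.read.json(docs)
parsed.printSchema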
