HBase read/write using PySpark

I am trying to read from and write to HBase using PySpark.
Code
from pyspark import SparkContext
import json

sc = SparkContext(appName="HBaseInputFormat")
host = "localhost"
table = "posts"

# Read-side configuration and converters (converter classes come from the
# Spark examples jar).
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

def save_record(rdd):
    # Write-side converters and configuration; the output table is xxxx19.
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    conf1 = {"hbase.zookeeper.quorum": "localhost",
             "hbase.mapred.outputtable": "xxxx19",
             "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
             "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
             "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    row_rdd = rdd.map(lambda x: x.split("\n")[0])
    # Build (row_key, [row_key, column_family, qualifier, value]) pairs.
    datamap = row_rdd.map(lambda x: str(json.loads(x)["row"])) \
                     .map(lambda row: (row, [row, "p", "cats_json", "lolva"]))
    datamap.saveAsNewAPIHadoopDataset(conf=conf1, keyConverter=keyConv, valueConverter=valueConv)

hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=conf)
message_rdd = hbase_rdd.map(lambda x: x[1])  # x[0] would give only the row key
save_record(message_rdd)
messages = message_rdd.take(1)
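For context, StringListToPutConverter (one of the converter classes from the Spark examples jar used above) turns a list of strings into an HBase Put, so each record handed to saveAsNewAPIHadoopDataset should be a (row_key, [row_key, column_family, qualifier, value]) pair. That is the shape datamap produces; the values here are illustrative:

# Shape consumed by StringListToPutConverter: the value is a list of
# [row_key, column_family, qualifier, value] (all strings; example data).
example_record = ("post123", ["post123", "p", "cats_json", "lolva"])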
Exception
17/02/02 16:02:25 INFO mapreduce.TableInputFormatBase: Input split length: 187 M bytes.
Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
raise EOFError
EOFError
17/02/02 16:02:26 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 3)
java.lang.IllegalArgumentException: Must specify table name
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.setConf(TableOutputFormat.java:193)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1099)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/02/02 16:02:26 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3, localhost): java.lang.IllegalArgumentException: Must specify table name
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.setConf(TableOutputFormat.java:193)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1099)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/02/02 16:02:26 ERROR scheduler.TaskSetManager: Task 1 in stage 2.0 failed 1 times; aborting job
17/02/02 16:02:26 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2
17/02/02 16:02:26 INFO executor.Executor: Executor is trying to kill task 0.0 in stage 2.0 (TID 2)
17/02/02 16:02:26 INFO scheduler.TaskSchedulerImpl: Stage 2 was cancelled
17/02/02 16:02:26 INFO scheduler.DAGScheduler: ResultStage 2 (saveAsNewAPIHadoopDataset at PythonRDD.scala:804) failed in 0.908 s
17/02/02 16:02:26 INFO scheduler.DAGScheduler: Job 2 failed: saveAsNewAPIHadoopDataset at PythonRDD.scala:804, took 0.977607 s
Traceback (most recent call last):
File "/home/sahil/Desktop/Relation_Extraction/pyspark_test.py", line 33, in <module>
save_record(message_rdd)
File "/home/sahil/Desktop/Relation_Extraction/pyspark_test.py", line 22, in save_record
datamap.saveAsNewAPIHadoopDataset(conf=conf1, keyConverter=keyConv, valueConverter=valueConv)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1346, in saveAsNewAPIHadoopDataset
File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaErrorTraceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
raise EOFError
EOFError
17/02/02 16:02:26 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.IllegalArgumentException: Must specify table name
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.setConf(TableOutputFormat.java:193)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1099)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/02/02 16:02:26 INFO scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) on executor localhost: java.lang.IllegalArgumentException (Must specify table name) [duplicate 1]
17/02/02 16:02:26 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 3, localhost): java.lang.IllegalArgumentException: Must specify table name
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.setConf(TableOutputFormat.java:193)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1099)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1074)
at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:804)
at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Must specify table name
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.setConf(TableOutputFormat.java:193)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1099)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
As you can see, I have set the output table name to xxxx19, so the "Must specify table name" error is puzzling. Any help would be appreciated.
Thanks in advance.
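For reference, here is a minimal write-only sketch that exercises just the save path, independent of the HBase read. This is an assumption-laden repro, not a known fix: it assumes the converter classes from the Spark examples jar are on the classpath (as in the code above) and that the table xxxx19 exists and is enabled.

from pyspark import SparkContext

sc = SparkContext(appName="HBaseWriteRepro")

# Same converters and conf keys as in save_record above.
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
write_conf = {
    "hbase.zookeeper.quorum": "localhost",
    "hbase.mapred.outputtable": "xxxx19",
    "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",
}

# A single hand-built record, bypassing the JSON parsing and the read RDD.
sc.parallelize([("row1", ["row1", "p", "cats_json", "lolva"])]) \
  .saveAsNewAPIHadoopDataset(conf=write_conf, keyConverter=keyConv, valueConverter=valueConv)

If this sketch succeeds while the full script fails, the problem is in how the configuration reaches the RDD derived from the read, not in the write conf itself.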
Random guess: the RDD uses the configuration values it was given at creation time. Try adding "hbase.mapred.outputtable": "xxxx19" to the first conf object and let's see whether that helps (sketched after these comments). – AdamSkywalker
That didn't work :( – wadhwasahil
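For clarity, the suggestion in the comment above amounts to declaring the output table in the read-side conf up front, roughly as follows (as the reply notes, this alone did not make the error go away here):

# Read conf with the output table added at RDD creation time, per
# AdamSkywalker's suggestion; it did not resolve the error in this case.
conf = {
    "hbase.zookeeper.quorum": "localhost",
    "hbase.mapreduce.inputtable": "posts",
    "hbase.mapred.outputtable": "xxxx19",
}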