2017-01-20 3 views
2

Я новичок в этой области, поэтому я не могу получить смысл этого ...Выпуск данных Спарк Streaming помещать данные в HBase

  • HBase версия: 0.98.24-hadoop2
  • Спарк ver: 2.1.0

Следующий код пытается помещать данные о получении от производителя Spark Streming-Kafka в HBase.

  • Кафка формат входных данных, как это:

    Строка1, TAG1,123
    Строка1, TAG2,134

искровым потокового процесса разделения приемной линии по ограничителю», 'затем поместите данные в HBase. Однако мое приложение встретило ошибку при вызове метода htable.put(). Может ли кто-нибудь помочь, почему приведенный ниже код порождает ошибку?

спасибо.

JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() { 
    private static final long serialVersionUID = 7113426295831342436L; 

    HTable htable; 
    public HTable set() throws IOException{ 
     Configuration hconfig = HBaseConfiguration.create(); 
     hconfig.set("hbase.zookeeper.property.clientPort", "2222"); 
     hconfig.set("hbase.zookeeper.quorum", "127.0.0.1"); 

     HConnection hconn = HConnectionManager.createConnection(hconfig); 

     htable = new HTable(hconfig, tableName); 

     return htable; 
    }; 
    @Override 
    public Iterator<String> call(String x) throws IOException { 

     ////////////// Put into HBase ///////////////////// 
     String[] data = x.split(","); 

     if (null != data && data.length > 2){ 
      SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); 
      String ts = sdf.format(new Date()); 

      Put put = new Put(Bytes.toBytes(ts)); 

      put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0])); 
      put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1])); 
      put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2])); 

/*I've checked data passed like this 
{"totalColumns":3,"row":"20170120200927", 
"families":{"TAGVALUE": 
[{"qualifier":"LINEID","vlen":3,"tag[], "timestamp":9223372036854775807}, 
{"qualifier":"TAGID","vlen":3,"tag":[],"timestamp":9223372036854775807}, 
{"qualifier":"VAL","vlen":6,"tag" [],"timestamp":9223372036854775807}]}}*/ 


//********************* ERROR *******************// 
      htable.put(put); 
      htable.close(); 


     } 

     return Arrays.asList(COLDELIM.split(x)).iterator(); 
    } 
}); 

ERRO Код:

Exception in thread "main" org.apache.spark.SparkException: Job 

aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException 
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:154) 
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:123) 
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171) 
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171) 
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353) 
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

ответ

2

вы не вызывать этот метод public HTable set() throws IOException который возвращает htable экземпляр.

Поскольку htable экземпляр является пустым, и вы пытаетесь сделать операцию на нуле

htable.put() 

вы получаете NPE как ниже

stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException 
+0

Спасибо за вашу любезную помощь. Я решил проблему, над которой я не мог продолжать работать ... –

Смежные вопросы