2

Я использую следующий код для записи HbaseSaveAsHadoopDataset никогда не закрывает подключение к Zookeeper

jsonDStream.foreachRDD(new Function<JavaRDD<String>, Void>() { 

     @Override 
     public Void call(JavaRDD<String> rdd) throws Exception { 

      DataFrame jsonFrame = sqlContext.jsonRDD(rdd); 
      DataFrame selecteFieldFrame = jsonFrame.select("id_str","created_at","text"); 

      Configuration config = HBaseConfiguration.create(); 
      config.set("hbase.zookeeper.quorum", "d-9543"); 
      config.set("zookeeper.znode.parent","/hbase-unsecure"); 
      config.set("hbase.zookeeper.property.clientPort", "2181"); 
      final JobConf jobConfig=new JobConf(config,SveAsHadoopDataSetExample.class); 

      jobConfig.setOutputFormat(TableOutputFormat.class); 
      jobConfig.set(TableOutputFormat.OUTPUT_TABLE,"tableName"); 
      selecteFieldFrame.javaRDD().mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() { 

       @Override 
       public Tuple2<ImmutableBytesWritable, Put> call(Row row) throws Exception { 
        // TODO Auto-generated method stub 
        return convertToPut(row); 
       } 
      }).saveAsHadoopDataset(jobConfig); 


      return null; 
     } 
    }); 

Но когда я вижу zkDump в зоопарка соединений продолжает увеличиваться

любые предложения/указатели будут носить большая помощь!

ответ

4

У меня такая же проблема, это HBase ошибка, я могу это исправить:

изменение org.apache.hadoop.hbase.mapred.TableOutputFormat к org.apache.hadoop.hbase.mapreduce. TableOutputFormat, и использовать org.apache.hadoop.mapreduce.Job, не org.apache.hadoop.mapred.JobConf

это образец:

import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat 

val conf = HBaseConfiguration.create() 
conf.set("hbase.zookeeper.quorum", zk_hosts) 
conf.set("hbase.zookeeper.property.clientPort", zk_port) 

conf.set(TableOutputFormat.OUTPUT_TABLE, "TABLE_NAME") 
val job = Job.getInstance(conf) 
job.setOutputFormatClass(classOf[TableOutputFormat[String]]) 

formatedLines.map{ 
    case (a,b, c) => { 
    val row = Bytes.toBytes(a) 

    val put = new Put(row) 
    put.setDurability(Durability.SKIP_WAL) 

    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("node"), Bytes.toBytes(b)) 
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("topic"), Bytes.toBytes(c)) 

    (new ImmutableBytesWritable(row), put) 
    } 
}.saveAsNewAPIHadoopDataset(job.getConfiguration) 

это может вам помочь!

https://github.com/hortonworks-spark/shc/pull/20/commits/2074067c42c5a454fa4cdeec18c462b5367f23b9

+2

Хотя это теоретически может ответить на вопрос, [было бы предпочтительнее] (// meta.stackoverflow.com/q/8259), чтобы включить основные части ответа здесь, и укажите ссылку для справки. –

+0

Да, спасибо за предложение – leocook

+0

Спасибо, человек! Тем не менее большинство руководств имеют старый способ, который имеет ошибку. Я ожидал stg. job.close(). Моя проблема была такой же. – zorkaya

Смежные вопросы