2014-12-18 2 views
1

я получил эти журналы ошибок на консолиHadoop: java.io.IOException: Передам Удалить или пут

java.io.IOException: Pass a Delete or a Put 
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125) 
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84) 
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586) 
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) 
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156) 
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177) 
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) 
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) 
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398) 
15/01/06 14:13:34 INFO mapred.JobClient: Job complete: job_local259887539_0001 
15/01/06 14:13:34 INFO mapred.JobClient: Counters: 19 
15/01/06 14:13:34 INFO mapred.JobClient: File Input Format Counters 
15/01/06 14:13:34 INFO mapred.JobClient:  Bytes Read=0 
15/01/06 14:13:34 INFO mapred.JobClient: FileSystemCounters 
15/01/06 14:13:34 INFO mapred.JobClient:  FILE_BYTES_READ=12384691 
15/01/06 14:13:34 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=12567287 
15/01/06 14:13:34 INFO mapred.JobClient: Map-Reduce Framework 
15/01/06 14:13:34 INFO mapred.JobClient:  Reduce input groups=0 
15/01/06 14:13:34 INFO mapred.JobClient:  Map output materialized bytes=8188 
15/01/06 14:13:34 INFO mapred.JobClient:  Combine output records=0 
15/01/06 14:13:34 INFO mapred.JobClient:  Map input records=285 
15/01/06 14:13:34 INFO mapred.JobClient:  Reduce shuffle bytes=0 
15/01/06 14:13:34 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
15/01/06 14:13:34 INFO mapred.JobClient:  Reduce output records=0 
15/01/06 14:13:34 INFO mapred.JobClient:  Spilled Records=285 
15/01/06 14:13:34 INFO mapred.JobClient:  Map output bytes=7612 
15/01/06 14:13:34 INFO mapred.JobClient:  Total committed heap usage (bytes)=1029046272 
15/01/06 14:13:34 INFO mapred.JobClient:  CPU time spent (ms)=0 
15/01/06 14:13:34 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
15/01/06 14:13:34 INFO mapred.JobClient:  SPLIT_RAW_BYTES=77 
15/01/06 14:13:34 INFO mapred.JobClient:  Map output records=285 
15/01/06 14:13:34 INFO mapred.JobClient:  Combine input records=0 
15/01/06 14:13:34 INFO mapred.JobClient:  Reduce input records=0 

Когда я пытаюсь сделать CopyTable с реализацией Scala на основе http://hbase.apache.org/book/mapreduce.example.html#mapreduce.example.readwrite

Вот например, мой код, в любом случае лучше, чем делать это?

package com.example 

import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.client.HBaseAdmin 
import org.apache.hadoop.hbase.client.HTable 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.client.Get 
import java.io.IOException 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.hbase._ 
import org.apache.hadoop.hbase.client._ 
import org.apache.hadoop.hbase.io._ 
import org.apache.hadoop.hbase.mapreduce._ 
import org.apache.hadoop.io._ 
import org.apache.hadoop.mapreduce._ 
import scala.collection.JavaConversions._ 

case class HString(name: String) { 
     lazy val bytes = name.getBytes 
     override def toString = name 
} 
object HString { 
     import scala.language.implicitConversions 
     implicit def hstring2String(src: HString): String = src.name 
     implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes 
} 

object Families { 
     val stream = HString("stream") 
     val identity = HString("identity") 
} 
object Qualifiers { 
     val title = HString("title") 
     val url = HString("url") 
     val media = HString("media") 
     val media_source = HString("media_source") 
     val content = HString("content") 
     val nolimitid_timestamp = HString("nolimitid.timestamp") 
     val original_id = HString("original_id") 
     val timestamp = HString("timestamp") 
     val date_created = HString("date_created") 
     val count = HString("count") 
} 
object Tables { 
     val rawstream100 = HString("raw_stream_1.0.0") 
     val rawstream = HString("rawstream") 
} 

class tmapper extends TableMapper[ImmutableBytesWritable, Put]{ 
    def map (row: ImmutableBytesWritable, value: Result, context: Context) { 
    val put = new Put(row.get()) 
    for (kv <- value.raw()) { 
     put.add(kv) 
    } 
    context.write(row, put) 
    } 
} 

object Hello { 
    val hbaseMaster = "127.0.0.1:60000" 
    val hbaseZookeper = "127.0.0.1" 
    def main(args: Array[String]): Unit = { 
     val conf = HBaseConfiguration.create() 
    conf.set("hbase.master", hbaseMaster) 
    conf.set("hbase.zookeeper.quorum", hbaseZookeper) 
    val hbaseAdmin = new HBaseAdmin(conf) 

    val job = Job.getInstance(conf, "CopyTable") 
    job.setJarByClass(classOf[Hello]) 
    job.setMapperClass(classOf[tmapper]) 
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) 
    job.setMapOutputValueClass(classOf[Result]) 
    // 
    job.setOutputKeyClass(classOf[ImmutableBytesWritable]) 
    job.setOutputValueClass(classOf[Put]) 

     val scan = new Scan() 
     scan.setCaching(500)   // 1 is the default in Scan, which will be bad for MapReduce jobs 
     scan.setCacheBlocks(false) // don't set to true for MR jobs 

     TableMapReduceUtil.initTableMapperJob(
      Tables.rawstream100.bytes,  // input HBase table name 
      scan,      // Scan instance to control CF and attribute selection 
      classOf[tmapper], // mapper class 
      null,    // mapper output key class 
      null,  // mapper output value class 
      job 
     ) 

     TableMapReduceUtil.initTableReducerJob(
      Tables.rawstream,   // Table name 
      null, // Reducer class 
      job 
     ) 
     val b = job.waitForCompletion(true); 
     if (!b) { 
      throw new IOException("error with job!"); 
     } 
    } 
} 

class Hello {} 

Еще раз спасибо

+0

Если вам не нужен прокомментированный код, пожалуйста, удалите его из вопроса. – Shekhar

+0

aah спасибо за советы – ans4175

ответ

1

Если ваша задача просто скопировать таблицы (не осуществлять MapReduce в HBase через Скале), вы можете использовать CopyTable класс в пакете HBase-сервера, например:

import org.apache.hadoop.hbase.mapreduce.CopyTable 
CopyTable.main(Array("--peer.adr=127.0.0.1:2181:/hbase", "--new.name=rawstream", "raw_stream_1.0.0")) 

Посмотрите на CopyTable documentation для получения дополнительных параметров.

+1

ah спасибо, что я попытался, и он работает как шарм для самостоятельной установки :) Но мне нужно знать, что случилось с моим кодом раньше, кроме того, я хочу узнать, как MapReduce. Вы выяснили, в чем проблема? – ans4175

Смежные вопросы