2014-02-03 5 views
0

Я пытаюсь запустить программу WordCount MapReduce для чтения и подсчета данных, хранящихся в таблице Cassandra (Column Family), но когда я скомпилирую свою программу, я получил ту же ошибку, что и повторяющиеся времена. Ниже мой исходный код и ошибка, которые я получил. Может ли кто-нибудь помочь мне решить эту проблему? Заранее спасибо.Ошибка компиляции MapReduce-Cassandra: ConfigHelper не найден

import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.util.*; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

import org.apache.cassandra.db.IColumn; 
import org.apache.cassandra.hadoop.*; 
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; 
import org.apache.cassandra.hadoop.ConfigHelper; 
import org.apache.cassandra.thrift.*; 
import org.apache.cassandra.utils.ByteBufferUtil; 

/** 
* This sums the word count stored in the input_words_count ColumnFamily for the key "key-if-verse1". 
* 
* Output is written to a text file. 
*/ 
public class WordCountCounters extends Configured implements Tool 
{ 
    private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class); 

    static final String COUNTER_COLUMN_FAMILY = "input_words"; 
    private static final String OUTPUT_PATH_PREFIX = "/Users/Deepu/Documents/dse-3.2.4/dse-data/word_count_counters"; 


    public static void main(String[] args) throws Exception 
    { 
    // Let ToolRunner handle generic command-line options 
    ToolRunner.run(new Configuration(), new WordCountCounters(), args); 
    System.exit(0); 
    } 

public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable> 
{ 
    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException 
    { 
     long sum = 0; 
     for (IColumn column : columns.values()) 
     { 
      logger.debug("read " + key + ":" + column.name() + " from " + context.getInputSplit()); 
      sum += ByteBufferUtil.toLong(column.value()); 
     } 
     context.write(new Text(ByteBufferUtil.string(key)), new LongWritable(sum)); 
    } 
} 

public int run(String[] args) throws Exception 
{ 
    Job job = new Job(getConf(), "wordcountcounters"); 
    job.setJarByClass(WordCountCounters.class); 
    job.setMapperClass(SumMapper.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(LongWritable.class); 
    FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX)); 


    job.setInputFormatClass(ColumnFamilyInputFormat.class); 


    ConfigHelper.setRpcPort(job.getConfiguration(), "9160"); 
    ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost"); 
    ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); 
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCountCounters.COUNTER_COLUMN_FAMILY); 
    SlicePredicate predicate = new SlicePredicate().setSlice_range(
                    new SliceRange(). 
                    setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER). 
                    setFinish(ByteBufferUtil.EMPTY_BYTE_BUFFER). 
                    setCount(100)); 
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate); 

    job.waitForCompletion(true); 
    return 0; 
} 
} 

Compiation Ошибки:

compilation error

ответ

0

Потому что вы закомментирована этих двух линий возможно:

 
//import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; 
//import org.apache.cassandra.hadoop.ConfigHelper; 
+0

Я пробовал уже, но не использовать. – Anudeep

Смежные вопросы