2015-09-10 2 views
0

У меня есть приложение MapReduce, который обрабатывается данные из HDFS и хранит выходные данные в HDFSкак хранить обработанные данные из HDFS с использованием MapReduce в MongoDB в качестве выходного сигнала

но теперь мне нужно хранить выходные данные в MongoDB InstEd из храня его в HDFS

Может ли кто-нибудь дать мне знать, как это сделать?

Спасибо

MAPPER КЛАСС

package com.mapReduce; 

import java.io.IOException; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

public class FMapper extends Mapper<LongWritable, Text, Text, Text> { 
    private String pART; 
    private String actual; 
    private String fdate; 
    public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { 
     String tempString = ivalue.toString(); 
     String[] data = tempString.split(","); 
     pART=data[1]; 
     try{ 
      fdate=convertyymmdd(data[0]); 
      /**IF ACTUAL IS LAST HEADER 
      * actual=data[2]; 
      * */ 
      actual=data[data.length-1]; 
      context.write(new Text(pART), new Text(fdate+","+actual+","+dynamicVariables(data))); 
     }catch(ArrayIndexOutOfBoundsException ae){ 
      System.err.println(ae.getMessage()); 
     } 

    } 


    public static String convertyymmdd(String date){ 

     String dateInString=null; 
     String data[] =date.split("/"); 
     String month=data[0]; 
     String day=data[1]; 
     String year=data[2]; 
     dateInString =year+"/"+month+"/"+day; 
     System.out.println(dateInString); 
     return dateInString; 
    } 

    public static String dynamicVariables(String[] data){ 
     StringBuilder str=new StringBuilder(); 
     boolean isfirst=true; 
    /** IF ACTUAL IS LAST HEADER 
    * for(int i=3;i<data.length;i++){ */ 
     for(int i=2;i<data.length-1;i++){ 

      if(isfirst){ 
       str.append(data[i]); 
       isfirst=false; 
      } 
      else 
      str.append(","+data[i]); 
     } 
     return str.toString(); 
     } 

} 

REDUCER КЛАСС

package com.mapReduce; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.Comparator; 
import java.util.List; 

import javax.faces.bean.ApplicationScoped; 
import javax.faces.bean.ManagedBean; 
import javax.faces.bean.ManagedProperty; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

import com.ihub.bo.ForcastBO; 
import com.ihub.service.ForcastService; 
public class FReducer extends Reducer<Text, Text, Text, Text> { 
    private String pART; 
    private List<ForcastBO> list = null; 
    private List<List<String>> listOfList = null; 
    private List<String> vals = null; 
    private static List<ForcastBO> forcastBos=new ArrayList<ForcastBO>(); 

    @Override 
    public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 
    // TODO Auto-generated method stub 
     pART = _key.toString(); 
     // process values 
     for (Text val : values) { 
      String tempString = val.toString(); 
      String[] data = tempString.split(","); 
      ForcastBO fb=new ForcastBO(); 
      fb.setPart(pART); 
      fb.setDate(data[0]); 
      fb.setActual(data[1]); 
      fb.setW0(data[2]); 
      fb.setW1(data[3]); 
      fb.setW2(data[4]); 
      fb.setW3(data[5]); 
      fb.setW4(data[6]); 
      fb.setW5(data[7]); 
      fb.setW6(data[8]); 
      fb.setW7(data[9]); 
      try { 
       list.add(fb); 
      } catch (Exception ae) { 
       System.out.println(ae.getStackTrace() + "****" + ae.getMessage() + "*****" + ae.getLocalizedMessage()); 
      } 
     } 
    } 

    @Override 
    public void run(Context context) throws IOException, InterruptedException { 
     setup(context); 
     try { 
      while (context.nextKey()) { 

     listOfList = new ArrayList<List<String>>(); 
     list=new ArrayList<ForcastBO>(); 
      reduce(context.getCurrentKey(), context.getValues(), context); 
      files_WE(listOfList, list, context); 

      } 

      }finally { 
       cleanup(context); 
      } 
    } 


    public void files_WE(List<List<String>> listOfList, List<ForcastBO> list, Context context) { 

     Collections.sort(list); 

      try { 
       setData(listOfList, list); 

       Collections.sort(listOfList, new Comparator<List<String>>() { 
        @Override 
        public int compare(List<String> o1, List<String> o2) { 
         return o1.get(0).compareTo(o2.get(0)); 
        } 
       }); 

       for (int i = listOfList.size() - 1; i > -1; i--) { 
        List<String> list1 = listOfList.get(i); 
        int k = 1; 
        for (int j = 3; j < list1.size(); j++) { 
         try { 
          list1.set(j, listOfList.get(i - k).get(j)); 
         } catch (Exception ex) { 
          list1.set(j, null); 
         } 
         k++; 
        } 

       } 
      } catch (Exception e) { 
       //e.getLocalizedMessage(); 
      } 

      for(List<String> ls:listOfList){ 
       System.out.println(ls.get(0)); 
       ForcastBO forcastBO=new ForcastBO(); 
       try{ 
        forcastBO.setPart(ls.get(0)); 
        forcastBO.setDate(ls.get(1)); 
        forcastBO.setActual(ls.get(2)); 
        forcastBO.setW0(ls.get(3)); 
        forcastBO.setW1(ls.get(4)); 
        forcastBO.setW2(ls.get(5)); 
        forcastBO.setW3(ls.get(6)); 
        forcastBO.setW4(ls.get(7)); 
        forcastBO.setW5(ls.get(8)); 
        forcastBO.setW6(ls.get(9)); 
        forcastBO.setW7(ls.get(10)); 
        forcastBos.add(forcastBO); 
        }catch(Exception e){ 
         forcastBos.add(forcastBO); 
        } 
       try{ 
        System.out.println(forcastBO); 
        //service.setForcastBOs(forcastBos); 
      }catch(Exception e){ 
       System.out.println("FB::::"+e.getStackTrace()); 
      } 
      } 
    } 





     public void setData(List<List<String>> listOfList, List<ForcastBO> list) { 
      List<List<String>> temListOfList=new ArrayList<List<String>>(); 
      for (ForcastBO str : list) { 
       vals = new ArrayList<String>(); 
       vals.add(str.getPart()); 
       vals.add(str.getDate()); 
       vals.add(str.getActual()); 
       vals.add(str.getW0()); 
       vals.add(str.getW1()); 
       vals.add(str.getW2()); 
       vals.add(str.getW3()); 
       vals.add(str.getW4()); 
       vals.add(str.getW5()); 
       vals.add(str.getW6()); 
       vals.add(str.getW7()); 
       temListOfList.add(vals); 
      } 


      Collections.sort(temListOfList, new Comparator<List<String>>() { 
       @Override 
       public int compare(List<String> o1, List<String> o2) { 
        return o1.get(1).compareTo(o2.get(1)); 
       } 
      }); 

      for(List<String> ls:temListOfList){ 
       System.out.println(ls); 
       listOfList.add(ls); 
       } 
     } 

     public static List<ForcastBO> getForcastBos() { 
      return forcastBos; 
     } 



    } 

ВОДИТЕЛЯ КЛАСС

package com.mapReduce; 

import java.net.URI; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


public class MRDriver { 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     Job job = Job.getInstance(conf, "JobName"); 
     job.setJarByClass(MRDriver.class); 
     // TODO: specify a mapper 
     job.setMapperClass(FMapper.class); 
     // TODO: specify a reducer 
     job.setReducerClass(FReducer.class); 

     // TODO: specify output types 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     // TODO: delete temp file 
     FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"), 
       conf); 
     Path workingDir=hdfs.getWorkingDirectory(); 

     Path newFolderPath= new Path("/sd1"); 
     newFolderPath=Path.mergePaths(workingDir, newFolderPath); 
     if(hdfs.exists(newFolderPath)) 

     { 
      hdfs.delete(newFolderPath); //Delete existing Directory 

     } 
     // TODO: specify input and output DIRECTORIES (not files) 

     FileInputFormat.setInputPaths(job,new Path("hdfs://localhost:9000/Forcast/SampleData")); 
     FileOutputFormat.setOutputPath(job, newFolderPath); 

     if (!job.waitForCompletion(true)) 
      return; 
    } 
} 
+0

FIrst вам нужен код для «чтения» из HDFS, а затем вам понадобится драйвер MongoDB и введите код «write» в MongoDB или просто выйдите непосредственно в MongoDB с вашего «редуктора» или на заключительный этап, если требуется. В основном получить драйвер для вашего языка (hadoop поддерживает пару в разных режимах, но, возможно, вы имеете в виду Java), затем подключайтесь и пишите. Сначала изучите драйвер. –

+0

Какой формат обрабатываемых данных? Вы всегда можете вызвать клиента MongoDB в редукторе и навалом записать данные в части очистки (например). Пожалуйста, предоставьте более подробную информацию, если вы ожидаете от нас помощи. – void

+0

обработанные данные в LIST formate –

ответ

0

В основном то, что вам нужно, это изменить «класс формат вывода », И у вас есть несколько способов: там

  1. Использование MongoDB Connector для Hadoop: http://docs.mongodb.org/ecosystem/tools/hadoop/?_ga=1.111209414.370990604.1441913822
  2. Реализовать свой собственный OUTPUTFORMAT: https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/OutputFormat.html (вместо того, чтобы использовать FileOutputFormat).
  3. Выполнить MongoDB запросы внутри редуктора вместо того, чтобы писать в контексте MapReduce (не Ницца, вы можете закончить с пустыми outut файлов в HDFS в зависимости от OUTPUTFORMAT указанного в драйвере)

В моих вариантах мнения 1 - лучший вариант, но я не использовал разъем MongoDB, чтобы сказать, достаточно ли он стабилен и функциональен. Вариант 2 требует, чтобы вы действительно понимали, как работает underoop underoop, чтобы избежать конца с множеством открытых подключений и проблем с транзакциями и попытками выполнения задач adoop.

+0

спасибо Rojo за ценную информацию ...... это действительно помогает мне lot –

+0

Rojo Знаете ли вы, как хранить выходные данные в объекте, а не хранить его в файловой системе –

+0

Не могли бы вы рассказать о своем вопросе? И если это другой вопрос, чем оригинал, не могли бы вы создать новый вопрос, чтобы позволить другим с той же проблемой найти его проще? Я постараюсь помочь вам, если я знаю ответ – RojoSam

Смежные вопросы