2015-09-16 3 views
0

Я написал код MapReduce для преобразования XML в CSV. Но после запуска задания в выходном каталоге нет результата. Я не уверен, в чём причина: входной файл не читается или результат не записывается. Я новичок в Hadoop MapReduce. (Тема: анализ XML в Hadoop MapReduce.)

Помогите, пожалуйста, разобраться.

Вот весь мой код.

public class XmlParser11 
{ 
     // Shared mutable static. Nothing in the visible code assigns it (Map.print declares a
     // local with the same name that shadows it), so it stays null unless set elsewhere.
     public static String outvalue; 
     /**
      * Input format that turns every byte range delimited by a configured start tag and
      * end tag into one record. The tags are read from the job configuration under
      * {@link #START_TAG_KEY} and {@link #END_TAG_KEY} and are matched byte-for-byte.
      */
     public static class XmlInputFormat1 extends TextInputFormat {
       /** Configuration key holding the literal start tag of a record. */
       public static final String START_TAG_KEY = "xmlinput.start";
       /** Configuration key holding the literal end tag of a record. */
       public static final String END_TAG_KEY = "xmlinput.end";

       /**
        * Creates the reader that scans a split for tag-delimited records.
        *
        * @param split the split to read (unused here; passed to the reader's initialize)
        * @param context the task context (unused here)
        * @return a new, uninitialized {@link XmlRecordReader}
        */
       @Override
       public RecordReader<LongWritable, Text> createRecordReader(
           InputSplit split, TaskAttemptContext context) {
         return new XmlRecordReader();
       }

       /**
        * Record reader whose key is the stream position just after the record's end tag
        * and whose value is the raw bytes from the start tag through the end tag.
        */
       public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
         private byte[] startTag;
         private byte[] endTag;
         private long start;
         private long end;
         private FSDataInputStream fsin;
         private final DataOutputBuffer buffer = new DataOutputBuffer();

         private final LongWritable key = new LongWritable();
         private final Text value = new Text();

         /**
          * Reads the tag settings and opens the split's file positioned at the split start.
          *
          * @param split the split to read; must be a {@code FileSplit}
          * @param context supplies the job configuration
          * @throws IOException if a tag setting is missing or empty, or the file cannot be opened
          */
         @Override
         public void initialize(InputSplit split, TaskAttemptContext context)
             throws IOException, InterruptedException {
           Configuration conf = context.getConfiguration();
           startTag = requiredTag(conf, START_TAG_KEY);
           endTag = requiredTag(conf, END_TAG_KEY);

           FileSplit fileSplit = (FileSplit) split;
           start = fileSplit.getStart();
           end = start + fileSplit.getLength();

           // Open the file and seek to the start of the split.
           Path file = fileSplit.getPath();
           FileSystem fs = file.getFileSystem(conf);
           fsin = fs.open(file);
           fsin.seek(start);
         }

         /**
          * Fails with a descriptive message instead of a bare NullPointerException or
          * array-index error when a tag setting is absent or empty.
          */
         private static byte[] requiredTag(Configuration conf, String name) throws IOException {
           String tag = conf.get(name);
           if (tag == null || tag.isEmpty()) {
             throw new IOException("Missing or empty job configuration value: " + name);
           }
           return tag.getBytes("utf-8");
         }

         /**
          * Advances to the next tag-delimited record.
          *
          * @return {@code true} if a complete record (start tag through end tag) was read
          */
         @Override
         public boolean nextKeyValue() throws IOException, InterruptedException {
           if (fsin.getPos() >= end) {
             return false;
           }
           if (!readUntilMatch(startTag, false)) {
             return false;
           }
           try {
             buffer.write(startTag);
             if (!readUntilMatch(endTag, true)) {
               // End of file before the end tag: the partial record is dropped.
               return false;
             }
             key.set(fsin.getPos());
             value.set(buffer.getData(), 0, buffer.getLength());
             return true;
           } finally {
             // The buffer is reused for the next record.
             buffer.reset();
           }
         }

         @Override
         public LongWritable getCurrentKey() throws IOException, InterruptedException {
           return key;
         }

         @Override
         public Text getCurrentValue() throws IOException, InterruptedException {
           return value;
         }

         /** Closes the input stream if it was opened. */
         @Override
         public void close() throws IOException {
           if (fsin != null) {
             fsin.close();
           }
         }

         /**
          * @return the fraction of the split consumed, within 0..1; 0 for an empty split
          */
         @Override
         public float getProgress() throws IOException {
           long length = end - start;
           if (length <= 0) {
             return 0.0f;
           }
           // A record may finish beyond the split end, so cap the ratio at 1.
           return Math.min(1.0f, (fsin.getPos() - start) / (float) length);
         }

         /**
          * Consumes bytes until {@code match} has been seen.
          *
          * @param match the byte sequence to look for; must not be empty
          * @param withinBlock when {@code true}, every consumed byte is appended to the
          *     record buffer and the split end is ignored, so a record that begins inside
          *     the split is read to its end tag
          * @return {@code true} once {@code match} was read; {@code false} at end of file,
          *     or (outside a record) once the split end is passed with no partial match
          */
         private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
           int matched = 0;
           while (true) {
             int b = fsin.read();
             if (b == -1) {
               return false;
             }
             if (withinBlock) {
               buffer.write(b);
             }
             // read() yields 0..255 while the tag bytes are signed, so compare as bytes;
             // otherwise a tag containing non-ASCII bytes can never match.
             if ((byte) b == match[matched]) {
               matched++;
             } else {
               // The byte that broke a partial match may itself begin a new one
               // (for example the second '<' in "<<FICHER>").
               matched = ((byte) b == match[0]) ? 1 : 0;
             }
             if (matched >= match.length) {
               return true;
             }
             // Stop looking for a new record once the split end is passed.
             if (!withinBlock && matched == 0 && fsin.getPos() >= end) {
               return false;
             }
           }
         }
       }
     }


     /**
      * Mapper that turns one XML record into one comma-separated output line.
      *
      * <p>For every attribute of every element a field {@code element:attribute-value} is
      * produced; non-blank character data becomes a field of its own. Fields are neither
      * quoted nor escaped. The output key is always {@code null}, so only the line itself
      * is passed to the output format.
      *
      * <p>The input key type is {@code LongWritable} because that is what the record
      * reader returned by {@code XmlInputFormat1} emits.
      */
     public static class Map extends Mapper<LongWritable, Text, Text, Text> {

       /** Parser factory, created once per mapper instance instead of once per record. */
       private final XMLInputFactory xmlFactory = createFactory();

       /** Builds a factory with DTDs and external entities disabled, since the input is untrusted. */
       private static XMLInputFactory createFactory() {
         XMLInputFactory factory = XMLInputFactory.newInstance();
         factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
         factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
         return factory;
       }

       /**
        * Parses the record text as XML and writes the resulting line, if it is non-empty.
        *
        * @param key the key supplied by the record reader (used only in the error message)
        * @param value the XML text of one record
        * @param context receives the output line and the malformed-record counter
        * @throws IOException if writing the output fails
        */
       @Override
       protected void map(LongWritable key, Text value, Context context)
           throws IOException, InterruptedException {
         StringBuilder line = new StringBuilder();
         XMLStreamReader xmlr = null;
         try {
           // The value holds the XML itself, not a file name, so parse it from memory.
           xmlr = xmlFactory.createXMLStreamReader(new java.io.StringReader(value.toString()));
           while (xmlr.hasNext()) {
             appendEvent(xmlr, line);
             xmlr.next();
           }
         } catch (XMLStreamException e) {
           // Keep the job running, but make skipped records visible in the counters.
           context.getCounter("XmlParser11", "MALFORMED_RECORDS").increment(1);
           System.err.println("Skipping malformed XML record (key " + key.get() + "): "
               + e.getMessage());
           return;
         } finally {
           closeQuietly(xmlr);
         }
         if (line.length() > 0) {
           context.write(null, new Text(line.toString()));
         }
       }

       /** Appends the fields contributed by the reader's current event to {@code line}. */
       private static void appendEvent(XMLStreamReader xmlr, StringBuilder line) {
         switch (xmlr.getEventType()) {
           case XMLStreamConstants.START_ELEMENT: {
             String element = xmlr.getLocalName();
             for (int i = 0; i < xmlr.getAttributeCount(); i++) {
               appendField(line, element + ":" + xmlr.getAttributeLocalName(i)
                   + "-" + xmlr.getAttributeValue(i));
             }
             break;
           }
           case XMLStreamConstants.CHARACTERS: {
             String text = xmlr.getText().trim();
             if (!text.isEmpty()) {
               // Collapse whitespace so embedded newlines cannot split the output line.
               appendField(line, text.replaceAll("\\s+", " "));
             }
             break;
           }
           default:
             break;
         }
       }

       /** Appends one field, preceded by a comma unless it is the first field. */
       private static void appendField(StringBuilder line, String field) {
         if (line.length() > 0) {
           line.append(',');
         }
         line.append(field);
       }

       /** Closes the reader; a failure to close is ignored because the data was already consumed. */
       private static void closeQuietly(XMLStreamReader xmlr) {
         if (xmlr == null) {
           return;
         }
         try {
           xmlr.close();
         } catch (XMLStreamException ignored) {
           // Nothing useful can be done if closing an in-memory reader fails.
         }
       }
     }
     /**
      * Configures and runs the map-only job.
      *
      * @param args {@code <input path> <output path> [start tag] [end tag]}; the tags
      *     default to {@code <FICHER>} and {@code </FICHER>}
      * @throws Exception if the job cannot be set up or submitted
      */
     public static void main(String[] args) throws Exception {
       if (args.length < 2) {
         System.err.println(
             "Usage: XmlParser11 <input path> <output path> [start tag] [end tag]");
         System.exit(2);
       }

       // Tags are matched byte-for-byte. If the record element carries attributes, pass the
       // start tag without its closing '>' (e.g. "<FICHER"), otherwise no record is found.
       String startTag = args.length > 2 ? args[2] : "<FICHER>";
       String endTag = args.length > 3 ? args[3] : "</FICHER>";

       Configuration conf = new Configuration();
       conf.set("xmlinput.start", startTag);
       conf.set("xmlinput.end", endTag);

       Job job = Job.getInstance(conf, "XmlParser11");
       job.setJarByClass(XmlParser11.class);
       job.setMapperClass(XmlParser11.Map.class);
       job.setNumReduceTasks(0);

       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Text.class);

       job.setInputFormatClass(XmlInputFormat1.class);
       job.setOutputFormatClass(TextOutputFormat.class);
       FileInputFormat.addInputPath(job, new Path(args[0]));
       FileOutputFormat.setOutputPath(job, new Path(args[1]));

       // Report job failure to the caller through the exit status.
       System.exit(job.waitForCompletion(true) ? 0 : 1);
     }

Вот вывод из PuTTY:

 
File System Counters 
     FILE: Number of bytes read=0
     FILE: Number of bytes written=120678 
     FILE: Number of read operations=0 
     FILE: Number of large read operations=0 
     FILE: Number of write operations=0 
     HDFS: Number of bytes read=1762671 
     HDFS: Number of bytes written=0 
     HDFS: Number of read operations=5 
     HDFS: Number of large read operations=0 
     HDFS: Number of write operations=2 
Job Counters 
     Launched map tasks=1 
     Rack-local map tasks=1 
     Total time spent by all maps in occupied slots (ms)=15960 
     Total time spent by all reduces in occupied slots (ms)=0 
     Total time spent by all map tasks (ms)=3990 
     Total vcore-seconds taken by all map tasks=3990 
     Total megabyte-seconds taken by all map tasks=16343040 
Map-Reduce Framework 
     Map input records=0 
     Map output records=0 
     Input split bytes=124 
     Spilled Records=0 
     Failed Shuffles=0 
     Merged Map outputs=0 
     GC time elapsed (ms)=0 
     CPU time spent (ms)=1390 
     Physical memory (bytes) snapshot=513449984 
     Virtual memory (bytes) snapshot=4122763264 
     Total committed heap usage (bytes)=2058354688 
File Input Format Counters 
     Bytes Read=1762547 
File Output Format Counters 
     Bytes Written=0 

ответ

0

Я думаю, что проблема заключается в открывающем (начальном) теге.

conf.set("xmlinput.start", "<FICHER");
conf.set("xmlinput.end", "</FICHER>"); 

Надеюсь, это вам поможет.