2013-08-18 14 views
1

У меня есть несколько CSV-файлов, и заголовок всегда является первой строкой в ​​файле. Каков наилучший способ получить эту строку из CSV-файла в виде строки в Pig? Предварительная обработка с помощью sed, awk и т. Д. Не является вариантом.Извлечь первую строку CSV-файла в Pig

Я пробовал загружать файл с помощью обычного PigStorage и Piggy bank CsLLoader, но мне не ясно, как я могу получить эту первую строку, если вообще.

Я открыт для написания UDF, если это то, что требуется.

+2

Что вы имеете в виду под «получить эту строку из файла CSV как строки '? –

ответ

2

Отказ от ответственности: Я не очень хорошо знаком с Java.

Вам понадобится UDF. Я не уверен, что именно вы просите, но этот UDF будет брать серию файлов CSV и превращать их в карты, где ключи являются значениями в верхней части файла. Этого, должно быть, достаточно скелета, чтобы вы могли изменить его на то, что хотите.

Пара тестов, которые я сделал удаленно и локально, указывает, что это сработает.

package myudfs; 
import java.io.IOException; 
import org.apache.pig.LoadFunc; 

import java.util.Map; 
import java.util.HashMap; 
import java.util.ArrayList; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.InputFormat; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

import org.apache.pig.PigException; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; 

public class ExampleCSVLoader extends LoadFunc { 
    protected RecordReader in = null; 
    private String fieldDel = "" + '\t'; 
    private Map<String, String> outputMap = null; 
    private TupleFactory mTupleFactory = TupleFactory.getInstance(); 

    // This stores the fields that are defined in the first line of the file 
    private ArrayList<Object> topfields = null; 

    public ExampleCSVLoader() {} 

    public ExampleCSVLoader(String delimiter) { 
     this(); 
     this.fieldDel = delimiter; 
    } 

    @Override 
    public Tuple getNext() throws IOException { 
     try { 
      boolean notDone = in.nextKeyValue(); 
      if (!notDone) { 
       outputMap = null; 
       topfields = null; 
       return null; 
      } 

      String value = in.getCurrentValue().toString(); 
      String[] values = value.split(fieldDel); 
      Tuple t = mTupleFactory.newTuple(1); 

      ArrayList<Object> tf = new ArrayList<Object>(); 

      int pos = 0; 
      for (int i = 0; i < values.length; i++) { 
       if (topfields == null) { 
        tf.add(values[i]); 
       } else { 
        readField(values[i], pos); 
        pos = pos + 1; 
       } 
      } 
      if (topfields == null) { 
       topfields = tf; 
       t = mTupleFactory.newTuple(); 
      } else { 
       t.set(0, outputMap); 
      } 

      outputMap = null; 
      return t; 
     } catch (InterruptedException e) { 
      int errCode = 6018; 
      String errMsg = "Error while reading input"; 
      throw new ExecException(errMsg, errCode, 
        PigException.REMOTE_ENVIRONMENT, e); 
     } 

    } 

    // Applies foo to the appropriate value in topfields 
    private void readField(String foo, int pos) { 
     if (outputMap == null) { 
      outputMap = new HashMap<String, String>(); 
     } 
     outputMap.put((String) topfields.get(pos), foo); 
    } 

    @Override 
    public InputFormat getInputFormat() { 
     return new TextInputFormat(); 
    } 

    @Override 
    public void prepareToRead(RecordReader reader, PigSplit split) { 
     in = reader; 
    } 

    @Override 
    public void setLocation(String location, Job job) 
      throws IOException { 
     FileInputFormat.setInputPaths(job, location); 
    } 
} 

Пример вывода загрузки каталога с:

csv1.in    csv2.in 
-------   --------- 
A|B|C    D|E|F 
Hello|This|is  PLEASE|WORK|FOO 
FOO|BAR|BING  OR|EVERYTHING|WILL 
BANG|BOSH   BE|FOR|NAUGHT 

Производит этот вывод:

A: {M: map[]} 
() 
([D#PLEASE,E#WORK,F#FOO]) 
([D#OR,E#EVERYTHING,F#WILL]) 
([D#BE,E#FOR,F#NAUGHT]) 
() 
([A#Hello,B#This,C#is]) 
([A#FOO,B#BAR,C#BING]) 
([A#BANG,B#BOSH]) 

() В s верхние строки файла. getNext() требует, чтобы мы что-то возвращали, иначе файл перестанет обрабатываться. Поэтому они возвращают нулевую схему.