2015-08-25 3 views
4

У меня есть файл, содержащий данные, которые необходимо загрузить в таблицу Hive. Я написал пользовательский SerDe (который в основном является модификацией Regex Serde, уже доступной с помощью Hive), чтобы помочь мне загрузить данные.Ошибка при попытке создать таблицу Hive с использованием пользовательского SerDe

Это SerDe, что я написал

package my.hive.customserde; 

public class FIASC2 extends AbstractSerDe { 

    public static final Log LOG = LogFactory.getLog(FIASC2.class.getName()); 

    int colwidths[] = {1, 10, 6, 12, 8, 14, 16, 6, 6, 2, 10, 10, 19, 2, 2, 6, 8, 1}; 
    String outputformat = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s %10$s %11$s %12$s %13$s %14$s %15$s " 
     + "%16$s %17$s %18$s"; 


    int datetimecols[] = {5}; 
    int datecols[] = {17}; 
    String cols; 
    int numColumns; 
    int totalcolwidth = 0; 

    List<String> columnNames; 
    List<TypeInfo> columnTypes; 

    ArrayList<String> row; 
    StructObjectInspector rowOI; 

    Object[] outputFields; 
    Text outputRowText; 


@Override 
    public void initialize(Configuration conf, Properties tbl) throws SerDeException { 
     LOG.debug("Initializing SerDe"); 
     // Get column names 
     String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); 
     String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); 
     LOG.debug("Columns : " + columnNameProperty + "Types : " + columnTypeProperty); 

     if(columnNameProperty.length() == 0) { 
      columnNames = new ArrayList<String>(); 
     } 
     else { 
      columnNames = Arrays.asList(columnNameProperty.split(",")); 
     } 

     columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); 

     assert columnNames.size() == columnTypes.size(); 
     assert colwidths.length == columnNames.size(); 

     numColumns = columnNames.size(); 

     for(int i = 0; i < numColumns; i++) { 
      totalcolwidth += i; 
     } 

     List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(columnNames.size()); 

     for (int i = 0; i < numColumns; i++) { 
      columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); 
     } 

     rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs); 

     row = new ArrayList<String>(numColumns); 

     for(int i = 0; i < numColumns; i++) { 
      row.add(null); 
     } 

     outputFields = new Object[numColumns]; 
     outputRowText = new Text(); 
} 


@Override 
    public Object deserialize(Writable blob) throws SerDeException { 
     // TODO Auto-generated method stub 
     Text rowText = (Text) blob; 
     int index = 0; 

     if(rowText.toString().length() < totalcolwidth) { 
      return null; 
     } 

     if((rowText.toString().substring(0, 1) == "H") || (rowText.toString().substring(0, 1) == "T")) { 
      return null; 
     } 

     for(int i = 0; i < numColumns; i++) { 
      int len = colwidths[i]; 
      String col = rowText.toString().substring(index, index + len); 
     // Convert the datetime string into the correct format so that it can be uploaded to the hive table 
      if(Arrays.asList(datetimecols).contains(i)) { 
       DateTimeFormatConverter dtc = new DateTimeFormatConverter(); 
       try { 
        col = dtc.convertCurrToNew(col); 
       } catch (ParseException e) { 
        LOG.error("Unable to parse Date Time string : " + col); 
        e.printStackTrace(); 
       } 
      } 
      if(Arrays.asList(datecols).contains(i)) { 
       DateFormatConverter dtc = new DateFormatConverter(); 
       try { 
        col = dtc.convertCurrToNew(col); 
       } catch (ParseException e) { 
        LOG.error("Unable to parse Date String : " + col); 
        e.printStackTrace(); 
       } 
      } 
      row.set(i, col); 
      index += len; 
     } 

     return row; 
    } 


@Override 
    public ObjectInspector getObjectInspector() throws SerDeException { 
     return rowOI; 
    } 


    @Override 
    public SerDeStats getSerDeStats() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

@Override 
    public Class<? extends Writable> getSerializedClass() { 
     return Text.class; 
    } 


    @Override 
    public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { 
     if(outputformat == null) { 
      throw new SerDeException("Cannot write into table because no output format was specified"); 
     } 

     StructObjectInspector outputRowOI = (StructObjectInspector) objInspector; 
     List<? extends StructField> outputFieldRefs = outputRowOI.getAllStructFieldRefs(); 

     if(outputFieldRefs.size() != numColumns) { 
      throw new SerDeException("Output format does not have the same number fields as the number of columns"); 
     } 

     for(int i = 0; i < numColumns; i++) { 
      Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(i)); 
      ObjectInspector fieldOI = outputFieldRefs.get(i).getFieldObjectInspector(); 

      StringObjectInspector fieldStringOI = (StringObjectInspector) fieldOI; 


      outputFields[i] = fieldStringOI.getPrimitiveJavaObject(field); 
     } 

     String outputRowString = null; 

     try { 
      outputRowString = String.format(outputformat, outputFields); 
     } catch (MissingFormatArgumentException e) { 
      throw new SerDeException("The table contains " + numColumns + "columns but the output format requires more", e); 
     } 

     outputRowText.set(outputRowString); 

     return outputRowText; 
    } 

}

Вы можете быть уверены, что я импортировал каждый класс, который должен быть импортирован.

Когда я пытаюсь создать таблицу, я получаю сообщение об ошибке сказав «Не удалось получить поле из serde: my.hive.customserde.FIASC2»

здесь является StackTrace

2015-08-25 15:57:51,995 ERROR [HiveServer2-Background-Pool: Thread-57]: metadata.Table (Table.java:getCols(608)) - Unable to get field from serde: my.hive.customserde.FIASC2 
java.lang.NullPointerException 
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.getFieldsFromDeserializer(MetaStoreUtils.java:1257) 
    at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:605) 
    at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:694) 
    at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4135) 
    at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:306) 
    at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160) 
    at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88) 
    at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1653) 
    at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1412) 
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1195) 
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059) 
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1054) 
    at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:154) 
    at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:71) 
    at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:206) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) 
    at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:218) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
2015-08-25 15:57:51,996 ERROR [HiveServer2-Background-Pool: Thread-57]: exec.DDLTask (DDLTask.java:failed(520)) - org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.NullPointerException 
    at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:720) 
    at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4135) 
    at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:306) 
    at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160) 
    at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88) 
    at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1653) 
    at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1412) 
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1195) 
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059) 
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1054) 
    at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:154) 
    at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:71) 
    at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:206) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) 
    at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:218) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.NullPointerException 
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.getFieldsFromDeserializer(MetaStoreUtils.java:1257) 
    at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:695) 
    ... 21 more 

Я понимаю что создание таблицы не удалось. Но кто-нибудь знает, почему я получаю эту ошибку? Я попробовал поиск в Интернете, но не получил большой помощи.

Если это какая-либо помощь, вот скрипт create table, который я использую.

create table if not exists fiasc2(
record_type varchar(1), 
fin_id varchar(16), 
corp_id varchar(8), 
merc_id varchar(16), 
term_id varchar(8), 
tran_time timestamp, 
cashcard_number varchar(16), 
ttc varchar(8), 
tcc varchar(8), 
tran_type varchar(2), 
tran_amount varchar(16), 
deposit_amount varchar(16), 
pan varchar(32), 
account_type varchar(2), 
response_code varchar(2), 
card_balance varchar(8), 
settlement_date date, 
tran_mode varchar(1)) 
row format serde 'my.hive.customserde.FIASC2' 
location '/user/hive/fiasc2_test'; 

ответ

0

Звучит знакомо. Вы возвращаетесьSerDeStats, и это единственное, что я видел, что может быть пустым, и он звонит в колокольчик. Думаю, у меня была такая же проблема с моим JSON SerDe, когда они представили SerdeStats несколько версий улья назад. Попытка:

// add this to the members 
private SerDeStats stats; 
// ... 
public void initialize(Configuration conf, Properties tbl) throws SerDeException { 
.. 
// add this in initialize() 
stats = new SerDeStats(); 

// and of course here 
@Override 
public SerDeStats getSerDeStats() { 
    return stats; 
} 
Смежные вопросы