2013-02-17 2 views
8

Я запускаю работу hadoop во многих входных файлах. Но если один из файлов поврежден, вся работа завершается неудачно.Как предотвратить работу hadoop с ошибкой на поврежденном входном файле

Как я могу заставить работу игнорировать поврежденный файл? возможно написать для меня некоторый счетчика/журнал ошибок, но не преминула всю работу

ответ

6

Это зависит от того, где вашей работы заключается в отсутствии - если линия повреждена, и где-то в вашем методе карты генерируется исключение, то вы должны просто быть в состоянии обернуть тело вашего метода карты с TRY/поймать и просто зарегистрировать ошибку:

protected void map(LongWritable key, Text value, Context context) { 
    try { 
    // parse value to a long 
    int val = Integer.parseInt(value.toString()); 

    // do something with key and val.. 
    } catch (NumberFormatException nfe) { 
    // log error and continue 
    } 
} 

Но если ошибка брошена RecordReader вашего InputFormat, то вы должны будете изменить картостроитель run(..) метода - кто выполняет по умолчанию следующее:

public void run(Context context) { 
    setup(context); 
    while (context.nextKeyValue()) { 
    map(context.getCurrentKey(), context.getCurrentValue(), context); 
    } 
    cleanup(context); 
} 

Таким образом, вы можете исправить это, чтобы попытаться поймать исключение по вызову context.nextKeyValue(), но вы должны быть осторожны, просто игнорируя любые ошибки, вызванные читателем. Например, IOExeption не может быть «пропускаемым», просто игнорируя ошибку ,

Если вы написали свой собственный InputFormat/RecordReader, и у вас есть конкретные исключения, обозначающие сбой записи, но позволит вам пропустить и продолжить разбор, то что-то, как это будет, вероятно, работать:

public void run(Context context) { 
    setup(context); 
    while (true) { 
    try { 
     if (!context.nextKeyValue()) { 
     break; 
     } else { 
     map(context.getCurrentKey(), context.getCurrentValue(), context); 
     } 
    } catch (SkippableRecordException sre) { 
     // log error 
    } 

    } 
    cleanup(context); 
} 

Но просто для повторного использования - ваш RecordReader должен иметь возможность восстанавливаться при ошибке, иначе приведенный выше код может отправить вас в бесконечный цикл.

Для вашего конкретного случая - если вы просто хотите, чтобы игнорировать файл при первой неудачи, то вы можете обновить метод запуска к чему-то гораздо проще:

public void run(Context context) { 
    setup(context); 
    try { 
    while (context.nextKeyValue()) { 
     map(context.getCurrentKey(), context.getCurrentValue(), context); 
    } 
    cleanup(context); 
    } catch (Exception e) { 
    // log error 
    } 
} 

Некоторые заключительные слова предупреждения:

  • Вам необходимо убедиться, что это не ваш код-карп, который вызывает исключение, в противном случае вы будете игнорировать файлы по неправильной причине.
  • GZip сжатые файлы, которые не сжаты GZip, фактически будут l в инициализации устройства чтения с записью - поэтому выше не будет улавливать этот тип или ошибку (вам нужно будет написать свою собственную программу чтения записей). Это верно для любой ошибки файла, выбрасываемая при создании чтения звукозаписывающего
+0

Он бросает мне: cascading.tuple.TupleException: невозможно прочитать из идентификатора входа: HDFS: //master/input/2013_1_17/file.close.uploaded.gz \t в cascading.tuple.TupleEntrySchemeIterator.hasNext (TupleEntrySchemeIterator .java: 127) Вызванный: java.io.EOFException: Неожиданный конец входного потока \t на org.apache.hadoop.io.compress.DecompressorStream.getCompressedData (DecompressorStream.java:99) – Julias

+0

Я также не для gunzip этого файла на linux – Julias

+0

Звучит как файл GZip, который не был закрыт должным образом. Вы можете читать строки за строкой, пока не достигнете ошибки, и просто проигнорируйте ошибку (у вас будет некоторая потеря данных). Вы также должны выяснить, почему файл GZip был неправильно закрыт. –

2

Это то, что Failure Ловушка используется для в каскадном:

Whenever an operation fails and throws an exception, if there is an associated trap, the offending Tuple is saved to the resource specified by the trap Tap. This allows the job to continue processing without any data loss.

Это будет существенно препятствовать вашей работе продолжится, и пусть вас проверить поврежденные файлы позже

Если вы немного знакомы с каскадными в выписке определения потока:

new FlowDef().addTrap(String branchName, Tap trap); 

Failure Traps

0

Существует также другой возможный способ. Вы можете использовать опцию конфигурации mapred.max.map.failures.percent.Конечно, этот способ решения этой проблемы может также скрыть некоторые другие проблемы, возникающие во время фазы карты.

Смежные вопросы