2015-08-06 2 views
0

Я хочу читать данные с FTP-сервера. Я предоставляю путь к файлу, который находится на FTP-сервере в формате ftp://Username:[email protected]/path. Когда я использую программу уменьшения карты для чтения данных из файла, она отлично работает. Я хочу читать данные из того же файла через каскадную структуру. Я использую Hfs tap каскадной структуры для чтения данных. Он бросает следующее исключениеЧтение данных с FTP-сервера в Hadoop/Cascading

java.io.IOException: Stream closed 
    at org.apache.hadoop.fs.ftp.FTPInputStream.close(FTPInputStream.java:98) 
    at java.io.FilterInputStream.close(Unknown Source) 
    at org.apache.hadoop.util.LineReader.close(LineReader.java:83) 
    at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:168) 
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:254) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:440) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212) 

Ниже приведен код каскадной структуры, где я читаю файлы:

public class FTPWithHadoopDemo { 
    public static void main(String args[]) { 
     Tap source = new Hfs(new TextLine(new Fields("line")), "ftp://user:[email protected]//input1"); 
     Tap sink = new Hfs(new TextLine(new Fields("line1")), "OP\\op", SinkMode.REPLACE); 
     Pipe pipe = new Pipe("First"); 
     pipe = new Each(pipe, new RegexSplitGenerator("\\s+")); 
     pipe = new GroupBy(pipe); 
     Pipe tailpipe = new Every(pipe, new Count()); 
     FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(tailpipe, sink); 
     new HadoopFlowConnector().connect(flowDef).complete(); 
    } 
} 

Я попытался посмотреть в Hadoop Исходный код для того же исключения. Я обнаружил, что в классе MapTask есть один метод runOldMapper, который имеет дело с потоком. И в том же методе есть, наконец, блок, где поток закрывается (in.close()). Когда я удаляю эту строку из блока finally, она работает нормально. Ниже приведен код:

private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, 
      final TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
        throws IOException, InterruptedException, ClassNotFoundException { 
     InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); 

     updateJobWithSplit(job, inputSplit); 
     reporter.setInputSplit(inputSplit); 

     RecordReader<INKEY, INVALUE> in = isSkipping() 
       ? new SkippingRecordReader<INKEY, INVALUE>(inputSplit, umbilical, reporter) 
       : new TrackedRecordReader<INKEY, INVALUE>(inputSplit, job, reporter); 
     job.setBoolean("mapred.skip.on", isSkipping()); 

     int numReduceTasks = conf.getNumReduceTasks(); 
     LOG.info("numReduceTasks: " + numReduceTasks); 
     MapOutputCollector collector = null; 
     if (numReduceTasks > 0) { 
      collector = new MapOutputBuffer(umbilical, job, reporter); 
     } else { 
      collector = new DirectMapOutputCollector(umbilical, job, reporter); 
     } 
     MapRunnable<INKEY, INVALUE, OUTKEY, OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), 
       job); 

     try { 
      runner.run(in, new OldOutputCollector(collector, conf), reporter); 
      collector.flush(); 
     } finally { 
      // close 
      in.close(); // close input 
      collector.close(); 
     } 
    } 

, пожалуйста, помогите мне в решении этой проблемы.

Спасибо, Arshadali

ответ

0

После некоторых усилий я узнал, что Hadoop использует org.apache.hadoop.fs.ftp.FTPFileSystem класс для FTP.
Этот класс не поддерживает поиск, т. Е. Ищет заданное смещение от начала файла. Данные считываются в одном блоке, а затем файловая система ищет следующий блок для чтения. Размер блока по умолчанию составляет 4 КБ для FTPFileSystem. Поскольку поиск не поддерживается, он может читать только данные, не превышающие или равные 4 КБ.

Смежные вопросы