2013-07-30 3 views
3

Я новичок в Hadoop, но это был мой учебный проект за последний месяц.Hadoop 1 входной файл = 1 выходной файл, только для карт

В попытке сохранить это достаточно расплывчатым, чтобы быть полезным для других, позвольте мне выбросить основную цель первого .... Предположим:

  1. У вас есть большой набор данных (очевидно), миллионов основных текстовых файлов ASCII.
    • Каждый файл является «записью».
  2. Записи сохраняются в структуре каталога для идентификации клиента & даты
    • например/Пользователь/hduser/данные/customer1/YYYY-MM-DD,/пользователь/hduser/данные/customer2/YYYY-MM-DD
  3. Вы хотите, чтобы имитировать структуру ввода для вывода структуры
    • например/Пользователь/hduser/выход/customer1/YYYY-MM-DD,/пользователь/hduser/выход/customer2/YYYY-MM-DD

Я посмотрел на несколько потоков:

И многое другое .. Я также читал книгу Хэдоопа Том Уайта. Я с нетерпением пытаюсь это узнать. и я часто обмениваюсь между новым API и старым API, что добавляет к путанице попытки узнать об этом.

Многие указали на MultipleOutputs (или на старые версии api), но я, похоже, не могу произвести свой желаемый результат - например, MultipleOutputs, похоже, не принимает «/» для создания структуры каталогов в write()

Какие шаги необходимо предпринять для создания файла с желаемой структурой вывода? В настоящее время у меня есть WholeFileInputFormat класс, и связанной с ним RecordReader, который имеет (NullWritable K, ByteWritable V) пара (который можно изменить при необходимости)

Моя установка карта:

public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { 
    private Text filenameKey; 
    private MultipleOutputs<NullWritable, Text> mos; 

    @Override 
    protected void setup(Context context) throws IOException, InterruptedException { 
     InputSplit split = context.getInputSplit(); 
     Path path = ((FileSplit) split).getPath(); 
     filenameKey = new Text(path.toString().substring(38)); // bad hackjob, until i figure out a better way.. removes hdfs://master:port/user/hduser/path/ 
     mos = new MultipleOutputs(context); 
    } 
} 

Существует также очистки() функция, которая вызывает mos.close() и карта() функция в настоящее время неизвестно (что мне нужна помощь здесь)

является ли это достаточно информации указать новичку в направлении ответа? Мои последующие мысли заключались в создании объекта MultipleOutputs() в каждой задаче map(), каждая из которых имеет новую базовую строку String, но я не уверен, что это эффективный или даже правильный вид действий.

Совет будет оценен, что-либо в программе может измениться на данный момент, за исключением ввода - я просто пытаюсь изучить фреймворк, - но я хотел бы как можно ближе подойти к этому результату (позже на Я, вероятно, посмотрю на объединение записей с большими файлами, но они уже имеют значение 20 МБ на запись, и я хочу убедиться, что он работает до того, как я сделаю невозможным чтение в Notepad.

Редактировать: может ли эта проблема быть решена путем изменения/расширение TextOutputFormat.class? Кажется, у него могут быть некоторые из методов, которые могут работать, но я не уверен, какие методы мне нужно переопределить ...

+0

Я не пробовал это, но книга «Hadoop окончательное руководство» говорит MultipleOutputs из нового API поддерживает использование путь к файлу разделитель (/). Вы говорите, что это не работает? – Rags

+0

@Rags Вероятно, ошибка в моем выполнении MultipleOutputs – Pseudo

ответ

5

Если вы отключите спекулятивное выполнение, там i s ничего не мешает вам вручную создавать структуру папок/файлов в вашем картографе и записывать записи в них (игнорируя выходной контекст/коллекционер)

Например, расширяя фрагмент (способ настройки), вы можете сделать что-то вроде это (который является в основном то, что несколько выходов делает, но при условии, что спекулятивное выполнение выключен, чтобы избежать столкновений файлов, где две карты задачи пытаются написать тот же выходной файл):

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

public class MultiOutputsMapper extends 
     Mapper<LongWritable, Text, NullWritable, NullWritable> { 
    protected String filenameKey; 
    private RecordWriter<Text, Text> writer; 
    private Text outputValue; 
    private Text outputKey; 

    @Override 
    protected void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     // operate on the input record 
     // ... 

     // write to output file using writer rather than context 
     writer.write(outputKey, outputValue); 
    } 

    @Override 
    protected void setup(Context context) throws IOException, 
      InterruptedException { 
     InputSplit split = context.getInputSplit(); 
     Path path = ((FileSplit) split).getPath(); 

     // extract parent folder and filename 
     filenameKey = path.getParent().getName() + "/" + path.getName(); 

     // base output folder 
     final Path baseOutputPath = FileOutputFormat.getOutputPath(context); 
     // output file name 
     final Path outputFilePath = new Path(baseOutputPath, filenameKey); 

     // We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder 
     TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() { 
      @Override 
      public Path getDefaultWorkFile(TaskAttemptContext context, 
        String extension) throws IOException { 
       return outputFilePath; 
      } 
     }; 

     // create a record writer that will write to the desired output subfolder 
     writer = tof.getRecordWriter(context); 
    } 

    @Override 
    protected void cleanup(Context context) throws IOException, 
      InterruptedException { 
     writer.close(context); 
    } 
} 

Некоторые моменты для рассмотрения:

  • Являются ли файлы customerx/yyyy-MM-dd файлами или папками файлов (если это папки с файлами, то вам необходимо внести соответствующие изменения), эта реализация предполагает, что для даты есть один файл, а имя файла - yyyy-MM-dd)
  • Вы возможно, пожелает посмотреть в LazyOutputFormat для предотвращения пустых файлов вывода карты создается
+0

Я использовал ваш скелет и многому научился у него ... и в качестве учебного инструмента ваш код был превосходным ** .. Вы правы, 'yyyy- MM-dd' - это еще одна папка с файлом внутри нее. Немного поиграл, но он работал, один из сложнейших битов был источником ввода, который должен был быть '/ user/hduser/data/*' (со звездой), чтобы отобразить задачу для всех файлов в подкаталогах. Я также реализовал 'NullOutputFormat' (вместо Lazy) в конфигурации заданий и использовал' TextOutputFormat', как вы настраивали (хотя ленивый - удобный формат, чтобы знать!) Большое вам спасибо за указателей Криса! – Pseudo

+0

@Chris, только для уточнения, Не может ли эта проблема также быть решена с помощью MultipleOutputs (новый API)? (используя WholeFileInputFormat (пользовательский класс с isSplittable false и с использованием пути из FileSplit)? – Rags

+0

@Rags, возможно, но у меня есть некоторые смутные воспоминания о попытке сделать то же самое, но с проблемами с разделителем пути в базовом пути вывода. Возможно, это исправлено в более поздних версиях. Конечно, стоит попробовать. –