2010-03-23 3 views
110

Во многих реальных ситуациях, когда вы применяете MapReduce, конечные алгоритмы заканчиваются несколькими шагами MapReduce.Цепочка нескольких заданий MapReduce в Hadoop

i.e Map1, Reduce1, Map2, Reduce2 и т. Д.

Итак, у вас есть результат последнего сокращения, необходимого в качестве входа для следующей карты.

Промежуточные данные - это то, что вы (в общем) не хотите сохранять, как только трубопровод будет успешно завершен. Кроме того, поскольку эти промежуточные данные в целом представляют собой некоторую структуру данных (например, «карта» или «набор»), вы не хотите прикладывать слишком много усилий при написании и чтении этих пар ключ-значение.

Каков рекомендуемый способ сделать это в Hadoop?

Есть ли (простой) пример, показывающий, как правильно обрабатывать эти промежуточные данные, включая очистку после?

+2

с помощью которых рамки MapReduce? – skaffman

+1

Я отредактировал вопрос, чтобы уточнить, что я говорю о Hadoop. –

+0

Я бы порекомендовал камень для свиней: https://github.com/Ganglion/swineherd best, Tobias – Tobias

ответ

52

Я думаю, что это учебник по сети разработчиков Yahoo поможет вам в этом: Chaining Jobs

Вы используете JobClient.runJob(). Выходной путь данных из первого задания становится входным путем для вашего второго задания. Они должны быть переданы в качестве аргументов для ваших заданий с соответствующим кодом для их анализа и настройки параметров для задания.

Я думаю, что вышеупомянутый метод мог бы быть тем, как это сделал ранее старый API-интерфейс, но он все равно должен работать. В новом API-интерфейсе mapreduce будет аналогичный метод, но я не уверен, что это такое.

Что касается удаления промежуточных данных после завершения работы, вы можете сделать это в своем коде. Путь я сделал это перед тем, как использовать что-то вроде:

FileSystem.delete(Path f, boolean recursive); 

Где путь является расположение на HDFS данных. Вы должны убедиться, что вы удаляете только эти данные, как только это не требует другой работы.

+2

Спасибо за ссылку на учебник Yahoo. Целевые задания действительно то, что вы хотите, если они находятся в одном и том же режиме.То, что я искал, - это то, что вам нужно сделать, если вы хотите иметь возможность запускать их отдельно. В упомянутом учебнике я нашел SequenceFileOutputFormat «Записывает двоичные файлы, подходящие для чтения в последующие задания MapReduce» и соответствующий SequenceFileInputFormat, что делает его очень простым. Благодарю. –

7

На самом деле существует несколько способов сделать это. Я сосредоточусь на двух.

1 через Riffle (http://github.com/cwensel/riffle) аннотационную библиотеку для идентификации зависимых вещей и «выполнения» их в зависимости (топологическом) порядке.

Или вы можете использовать каскад (и MapReduceFlow) в каскадировании (http://www.cascading.org/). Будущая версия будет поддерживать аннотации Riffle, но теперь она отлично работает с необработанными заданиями MR JobConf.

Вариант заключается в том, чтобы вручную не выполнять задания MR, но разрабатывайте приложение с использованием Cascading API. Затем JobConf и цепочка заданий обрабатываются внутренне через каскадные планировщики и классы Flow.

Таким образом, вы уделяете время сосредоточению внимания на своей проблеме, а не на механизме управления рабочими местами Hadoop и т. Д. Вы даже можете сложить различные языки сверху (например, clojure или jruby), чтобы еще больше упростить разработку и приложения. http://www.cascading.org/modules.html

17

Существует много способов сделать это.

(1) Каскадные работы

Создание объекта JobConf «job1» для первого задания и установить все параметры с «входом» в качестве inputdirectory и «Темп» в качестве выходного каталога. Выполнить эту работу:

JobClient.run(job1). 

Сразу под ним, создать объект JobConf «job2» для второй работы и установить все параметры с «Темп», как inputdirectory и «выход» в качестве выходного каталога. Выполнить эту работу:

JobClient.run(job2). 

(2) Создание двух объектов JobConf и установить все параметры в них так же, как (1) за исключением того, что вы не используете JobClient.run.

Затем создайте два задания объекты с jobconfs в качестве параметров:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2); 

Использование объекта jobControl, необходимо указать зависимости задания, а затем запустить задания:

JobControl jbcntrl=new JobControl("jbcntrl"); 
jbcntrl.addJob(job1); 
jbcntrl.addJob(job2); 
job2.addDependingJob(job1); 
jbcntrl.run(); 

(3) Если вам нужна структура, похожая на Map + | Уменьшить | Map *, вы можете использовать классы ChainMapper и ChainReducer, которые поставляются с версией Hadoop 0.19 и далее.

Приветствия

1

Хотя есть комплекс серверов на базе двигателей документооборота Hadoop, например, oozie, у меня есть простой библиотеки Java, которая позволяет выполнение нескольких Hadoop рабочих мест в качестве рабочего процесса. Конфигурация заданий и рабочий процесс, определяющие зависимость между заданиями, настраиваются в файле JSON. Все настраивается извне и не требует каких-либо изменений в существующей реализации сокращения карты, чтобы быть частью рабочего процесса.

Детали можно найти здесь. Исходный код и банку доступны в github.

http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

Pranab

1

Я думаю, что oozie помогает последующие рабочие места, чтобы получить входы непосредственно из предыдущего задания. Это позволяет избежать операции ввода-вывода, выполняемой с помощью управления заданиями.

3

Мы можем использовать метод задания Job для определения зависимости между заданием.

В моем сценарии у меня было 3 задания, которые зависели друг от друга. В классе драйвера я использовал приведенный ниже код и работает так, как ожидалось.

public static void main(String[] args) throws Exception { 
     // TODO Auto-generated method stub 

     CCJobExecution ccJobExecution = new CCJobExecution(); 

     Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]); 
     Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]); 
     Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]); 

     System.out.println("****************Started Executing distanceTimeFraudJob ================"); 
     distanceTimeFraudJob.submit(); 
     if(distanceTimeFraudJob.waitForCompletion(true)) 
     { 
      System.out.println("=================Completed DistanceTimeFraudJob================= "); 
      System.out.println("=================Started Executing spendingFraudJob ================"); 
      spendingFraudJob.submit(); 
      if(spendingFraudJob.waitForCompletion(true)) 
      { 
       System.out.println("=================Completed spendingFraudJob================= "); 
       System.out.println("=================Started locationFraudJob================= "); 
       locationFraudJob.submit(); 
       if(locationFraudJob.waitForCompletion(true)) 
       { 
        System.out.println("=================Completed locationFraudJob================="); 
       } 
      } 
     } 
    } 
+0

Ваш ответ о том, как присоединиться к этим заданиям с точки зрения исполнения. Первоначальный вопрос касался лучших данных. Поэтому ваш ответ не имеет отношения к этому конкретному вопросу. –

1

Если вы хотите программно объединить свои задания, вы будете использовать JobControl. Использование довольно просто:

JobControl jobControl = new JobControl(name); 

После этого вы добавляете экземпляры ControlledJob. ControlledJob определяет задание с его зависимостями, таким образом автоматически подключая входы и выходы в соответствии с «цепочкой» заданий.

jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2)); 

    jobControl.run(); 

начинает цепь. Вы захотите поместить это в поток ответов. Это позволяет проверить состояние вашей цепи whil он работает:

while (!jobControl.allFinished()) { 
     System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size()); 
     System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size()); 
     System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size()); 
     List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList(); 
     System.out.println("Jobs in success state: " + successfulJobList.size()); 
     List<ControlledJob> failedJobList = jobControl.getFailedJobList(); 
     System.out.println("Jobs in failed state: " + failedJobList.size()); 
    } 
5

Я сделал работу цепочки использования с объектами JobConf один за другим. Я взял пример WordCount для цепочки заданий. Одна работа определяет, сколько раз слово повторяется в данном выпуске. Второе задание принимает первый результат работы как входной сигнал и вычисляет общие слова в данном вводе. Ниже приведен код, который необходимо поместить в класс драйвера.

//First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class); 
    job1.setJobName("WordCount"); 

    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(IntWritable.class); 

    job1.setMapperClass(WordCountMapper.class); 
    job1.setCombinerClass(WordCountReducer.class); 
    job1.setReducerClass(WordCountReducer.class); 

    job1.setInputFormat(TextInputFormat.class); 
    job1.setOutputFormat(TextOutputFormat.class); 

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files 
    FileInputFormat.setInputPaths(job1, new Path("input_data")); 

    //"first_job_output" contains data that how many times a word occurred in the given file 
    //This will be the input to the second job. For second job, input data name should be 
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output")); 

    JobClient.runJob(job1); 


    //Second Job - Counts total number of words in a given file 

    JobConf job2 = new JobConf(TotalWords.class); 
    job2.setJobName("TotalWords"); 

    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(IntWritable.class); 

    job2.setMapperClass(TotalWordsMapper.class); 
    job2.setCombinerClass(TotalWordsReducer.class); 
    job2.setReducerClass(TotalWordsReducer.class); 

    job2.setInputFormat(TextInputFormat.class); 
    job2.setOutputFormat(TextOutputFormat.class); 

    //Path name for this job should match first job's output path name 
    FileInputFormat.setInputPaths(job2, new Path("first_job_output")); 

    //This will contain the final output. If you want to send this jobs output 
    //as input to third job, then third jobs input path name should be "second_job_output" 
    //In this way, jobs can be chained, sending output one to other as input and get the 
    //final output 
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output")); 

    JobClient.runJob(job2); 

Команда для выполнения этих заданий является:

бен/Hadoop банку TotalWords.

Нам нужно указать имя конечной работы для команды. В приведенном выше случае это TotalWords.

4

Вы можете использовать MR-цепь так, как указано в коде.

ОБРАТИТЕ ВНИМАНИЕ: только код драйвера был обеспечен

public class WordCountSorting { 
// here the word keys shall be sorted 
     //let us write the wordcount logic first 

     public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException { 
      //THE DRIVER CODE FOR MR CHAIN 
      Configuration conf1=new Configuration(); 
      Job j1=Job.getInstance(conf1); 
      j1.setJarByClass(WordCountSorting.class); 
      j1.setMapperClass(MyMapper.class); 
      j1.setReducerClass(MyReducer.class); 

      j1.setMapOutputKeyClass(Text.class); 
      j1.setMapOutputValueClass(IntWritable.class); 
      j1.setOutputKeyClass(LongWritable.class); 
      j1.setOutputValueClass(Text.class); 
      Path outputPath=new Path("FirstMapper"); 
      FileInputFormat.addInputPath(j1,new Path(args[0])); 
        FileOutputFormat.setOutputPath(j1,outputPath); 
        outputPath.getFileSystem(conf1).delete(outputPath); 
      j1.waitForCompletion(true); 
        Configuration conf2=new Configuration(); 
        Job j2=Job.getInstance(conf2); 
        j2.setJarByClass(WordCountSorting.class); 
        j2.setMapperClass(MyMapper2.class); 
        j2.setNumReduceTasks(0); 
        j2.setOutputKeyClass(Text.class); 
        j2.setOutputValueClass(IntWritable.class); 
        Path outputPath1=new Path(args[1]); 
        FileInputFormat.addInputPath(j2, outputPath); 
        FileOutputFormat.setOutputPath(j2, outputPath1); 
        outputPath1.getFileSystem(conf2).delete(outputPath1, true); 
        System.exit(j2.waitForCompletion(true)?0:1); 
     } 

} 

последовательность является

(JOB1) MAP-> REDUCE-> (JOB2) MAP
Это было сделано чтобы получить ключи отсортированы еще есть больше способов, таких как использование treemap
Тем не менее, я хочу сосредоточить ваше внимание на том, как Вакансии были прикованы !!
Спасибо

2

Нового класса org.apache.hadoop.mapreduce.lib.chain.ChainMapper поможет этому сценарию

+1

Ответ хороший - но вы должны добавить более подробную информацию о том, что он делает, или, по крайней мере, ссылку на ссылку API, чтобы люди могли проголосовать. –

+0

ChainMapper и ChainReducer используются, чтобы иметь 1 или более карт перед сокращением и 0 или более mappers после сокращения, spec. (Mapper +) Уменьшить (Mapper *). Исправьте меня, если я ошибаюсь, но я не думаю, что этот подход обеспечивает последовательную цепочку заданий, как задал ОП. – rahul1210

0

Как вы упомянули в вашем требовании, что вы хотите о/р MRJob1 быть я/p из MRJob2 и т. д., вы можете рассмотреть возможность использования рабочего процесса oozie для этой утилиты. Также вы можете подумать о том, чтобы записать промежуточные данные в HDFS, поскольку он будет использоваться следующим MRJob. И после завершения работы вы можете очистить свои промежуточные данные.

<start to="mr-action1"/> 
<action name="mr-action1"> 
    <!-- action for MRJob1--> 
    <!-- set output path = /tmp/intermediate/mr1--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="mr-action2"> 
    <!-- action for MRJob2--> 
    <!-- set input path = /tmp/intermediate/mr1--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="success"> 
     <!-- action for success--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="fail"> 
     <!-- action for fail--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<end name="end"/> 

Смежные вопросы