2012-04-25 3 views
1

В настоящее время у меня есть два задания hadoop, где второе задание требует вывода первого для добавления в распределенный кеш. в настоящее время я запускаю их вручную, поэтому после завершения первого задания я передаю выходной файл в качестве аргумента во второе задание, а его драйвер добавляет его в кеш.Запуск зависимых заданий hadoop в одном драйвере

Первое задание - это просто простая работа с картой, и я надеялся, что смогу запустить одну команду при выполнении обоих заданий в последовательности.

Может ли кто-нибудь помочь мне с кодом, чтобы получить результат первого задания, помещенного в распределенный кеш, чтобы он мог быть передан во второе задание?

Благодаря

Edit: Это текущий драйвер для работы 1:

public class PlaceDriver { 

public static void main(String[] args) throws Exception { 

    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 2) { 
     System.err.println("Usage: PlaceMapper <in> <out>"); 
     System.exit(2); 
    } 
    Job job = new Job(conf, "Place Mapper"); 
    job.setJarByClass(PlaceDriver.class); 
    job.setMapperClass(PlaceMapper.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    TextInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

Это драйвер для job2. Выход задания 1, передаваемый на работу 2 в качестве первого аргумента и загружается в кэш

public class LocalityDriver { 

public static void main(String[] args) throws Exception { 

    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 3) { 
     System.err.println("Usage: LocalityDriver <cache> <in> <out>"); 
     System.exit(2); 
    } 
    Job job = new Job(conf, "Job Name Here"); 
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),job.getConfiguration()); 
    job.setNumReduceTasks(1); //TODO: Will change 
    job.setJarByClass(LocalityDriver.class); 
    job.setMapperClass(LocalityMapper.class); 
    job.setCombinerClass(TopReducer.class); 
    job.setReducerClass(TopReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    TextInputFormat.addInputPath(job, new Path(otherArgs[1])); 
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 
+0

Вы можете начать писать здесь свой код, который вызывает два рабочих места, и тогда люди могут помочь вам изменить его. – adranale

+0

Хорошо, я добавил его –

ответ

0

Простой ответ будет извлечь код из обоего основных методов двух отдельных методов, например: boolean job1() и boolean job2() и называть их в основном методе после того, как друг с другом, как это:

public static void main(String[] args) throws Exception { 
    if (job1()) { 
     jobs2(); 
    } 
} 

где возвращаемое значением job1 и job2 вызовов является результатом вызова job.waitForCompletion(true)

+0

Как мне загрузить результат работы1 в распределенный кеш job2? –

+0

Как я понял, 'new Path (otherArgs [1])' корректно задан как выход 1-го и 2-го задания. – adranale

+0

Другими словами, вывести первое задание в временную директорию? –

0

Цепочка работы в MapReduce - довольно распространенный сценарий. Вы можете попробовать cascading, программное обеспечение для управления рабочими процессами с открытым исходным кодом MapReduce. И есть некоторые дискуссии о каскадировании на here. Или вы можете проверить похожие обсуждения, как ваши here.

1

Создайте два объекта задания в том же главном. Перед тем, как запустить другой, нужно дождаться завершения.

public class DefaultTest extends Configured implements Tool{ 


    public int run(String[] args) throws Exception { 

     Job job = new Job(); 

     job.setJobName("DefaultTest-blockx15"); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(IntWritable.class); 

     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 

     job.setNumReduceTasks(15); 

     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.setInputPaths(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.setJarByClass(DefaultTest.class); 

     job.waitForCompletion(true): 

       job2 = new Job(); 

       // define your second job with the input path defined as the output of the previous job. 


     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
     ToolRunner.run(new DefaultTest(), otherArgs); 
    } 
} 
0

Вы также можете использовать ChainMapper, JobControl и ControlledJob контролировать вашу работу потока

Configuration config = getConf(); 

Job j1 = new Job(config); 
Job j2 = new Job(config); 
Job j3 = new Job(config); 

j1.waitForCompletion(true); 


JobControl jobFlow = new JobControl("j2"); 
ControlledJob cj3 = new ControlledJob(j2, null); 
jobFlow.addJob(cj3); 
jobFlow.addJob(new ControlledJob(j2, Lists.newArrayList(cj3))); 
jobFlow.addJob(new ControlledJob(j3, null)); 
jobFlow.run(); 
Смежные вопросы