2016-07-07 3 views
0

У меня есть скрипт perl, который запускает две внешние программы, зависящие друг от друга, для серии наборов данных. В настоящее время я просто делаю это для каждого набора данных по одному, запускаю его через первую программу, собираю результаты с помощью qx и использую эти результаты для запуска второй программы. Данные добавляются в выходной файл с результатами второй программы, по одному файлу для каждого набора данных. Я создал простой воспроизводимый пример, который мы надеемся, улавливает мой текущий подход:Perl - параллельное программирование - запуск двух внешних программ

#!/usr/bin/perl 
# 
# stackoverflow_q_7-7-2016.pl 

use warnings; 
use strict; 

my @queries_list = (2, 4, 3, 1); 

foreach my $query (@queries_list) { 
    #Command meant to simulate the first, shorter process, and return a list of results for the next process 
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-8 -n 3"; 
    print "Running program_1 on query $query...\n"; 
    my @results = qx($cmd_1); 

    foreach (@results) { 
     chomp $_; 
     #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query 
     my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 5 . " >> $query.output"; 
     print "\tRunning program_2 on query $query with input param $_...\n"; 
     system($cmd_2);   } 
} 

Поскольку первая программа обычно завершается быстрее, чем второй, я думал, что это, вероятно, можно ускорить всю эту сделку, продолжая работать новый запросов через program_1 одновременно с тем, что program_2 также работает в предыдущем запросе. Было бы здорово ускорить это, так как в настоящее время требуется много часов обработки. Однако я не уверен, как это сделать. Будет ли что-то вроде Parallel :: ForkManager иметь решение? или используя потоки в Perl?

Теперь в моем фактическом коде я делаю некоторую обработку ошибок и устанавливаю тайм-аут для program_2. Для этого я использую fork, exec и $ SIG {ALRM}, но я не знаю, что я делаю с те. Важно, что у меня все еще есть возможность сделать это, иначе программа_2 может застрять или неадекватно сообщить о том, почему она не удалась. Вот как выглядит код с обработкой ошибок. Я не думаю, что это работает так, как должно быть на воспроизводимом примере, но, по крайней мере, вы, надеюсь, увидите, что я пытаюсь сделать. Вот с обработкой ошибок:

#!/usr/bin/perl 
# 
# stackoverflow_q_7-7-2016.pl 

use warnings; 
use strict; 

my @queries_list = (2, 4, 3, 1); 

foreach my $query (@queries_list) { 
    #Command meant to simulate the first, shorter process, and return a list of results for the next process 
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-15 -n 3"; 
    print "Running program_1 on query $query...\n"; 
    my @results = qx($cmd_1); 

    foreach (@results) { 
     chomp $_; 
     #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query 
     my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 3 . " >> $query.output"; 
     print "\tRunning program_2 on query $query with input param $_...\n"; 

     my $childPid; 
     eval { 
      local $SIG{ALRM} = sub { die "Timed out" }; 
      alarm 10; 
      if ($childPid = fork()) { 
       wait(); 
      } else { 
       exec($cmd_2); 
      } 
      alarm 0; 
     }; 
     if ($? != 0) { 
      my $exitCode = $? >> 8; 
      print "Program_2 exited with error code $exitCode. Retry...\n"; 
     } 
     if ([email protected] =~ /Timed out/) { 
      print "\tProgram_2 timed out. Skipping...\n"; 
      kill 2, $childPid; 
      wait; 
     }; 
    } 
} 

Вся помощь приветствуется.

ответ

3

Одно из решений:

use threads; 

use Thread::Queue; # 3.01+ 

sub job1 { ... } 
sub job2 { ... } 

{ 
    my $job1_request_queue = Thread::Queue->new(); 
    my $job2_request_queue = Thread::Queue->new(); 

    my $job1_thread = async { 
     while (my $job = $job1_request_queue->dequeue()) { 
     my $result = job1($job); 
     $job2_request_queue->enqueue($result); 
     } 

     $job2_request_queue->end(); 
    }; 

    my $job2_thread = async { 
     while (my $job = $job2_request_queue->dequeue()) { 
     job2($job); 
     } 
    }; 

    $job1_request_queue->enqueue($_) for ...; 

    $job1_request_queue->end();  
    $_->join() for $job1_thread, $job2_thread; 
} 

Можно даже иметь несколько работник или/оба типа.

use threads; 

use Thread::Queue; # 3.01+ 

use constant NUM_JOB1_WORKERS => 1; 
use constant NUM_JOB2_WORKERS => 3; 

sub job1 { ... } 
sub job2 { ... } 

{ 
    my $job1_request_queue = Thread::Queue->new(); 
    my $job2_request_queue = Thread::Queue->new(); 

    my @job1_threads; 
    for (1..NUM_JOB1_WORKERS) { 
     push @job1_threads, async { 
     while (my $job = $job1_request_queue->dequeue()) { 
      my $result = job1($job); 
      $job2_request_queue->enqueue($result); 
     } 
     }; 
    } 

    my @job2_threads; 
    for (1..NUM_JOB2_WORKERS) { 
     push @job2_threads, async { 
     while (my $job = $job2_request_queue->dequeue()) { 
      job2($job); 
     } 
     }; 
    } 

    $job1_request_queue->enqueue($_) for ...; 

    $job1_request_queue->end();  
    $_->join() for @job1_threads; 
    $job2_request_queue->end(); 
    $_->join() for @job2_threads; 
} 

Использование IPC::Run вместо qx добавить тайм-аут. Нет необходимости в сигналах.

+0

Привет, Икегами, спасибо за помощь. Можете ли вы объяснить окончание и объединение потоков? Когда я пытаюсь использовать многопользовательский подход, я получаю сообщение об ошибке «Perl exited with active threads», большинство из которых работает и не работает, с некоторыми готовыми и несвязанными. Я могу опубликовать свой последний код в качестве ответа ниже. – Tsaari

+0

Это говорит, что работникам больше не нужно работать раньше, а затем ждет их завершения. В противном случае программа закончится преждевременно. – ikegami

+0

Исправлена ​​ошибка в моем коде. ('@ job1_threads' и' @ job2_threads' не были заполнены) – ikegami

Смежные вопросы