2013-12-15 5 views
1

Я пытаюсь написать сценарий, который будет загружать не более N файлов одновременно через HTTP.Загрузите M файлов с N одновременными асинхронными HTTP-клиентами, где M является большим и N настраивается

Раньше я использовал AnyEvent::Worker::Pool для управления пулом задач БЛОКИРОВКИ. Я также использовал AnyEvent::HTTP в сочетании с AnyEvent->condvar для управления загрузками NON BLOCKING по отдельности.

Я думал, что должно быть довольно просто объединить два подхода, чтобы AnyEvent->condvar запустил AnyEvent :: HTTP :: http_get BLOCKING с точки зрения AnyEvent::Worker::Pool.

Однако, я получаю некоторые ошибки, которые я не понимаю, по-видимому, из-за деталей реализации AnyEvent::Worker. Вот действительно урезанные варианты сценария, который демонстрирует проблему:

use EV; 
use AnyEvent 5; 
use AnyEvent::Worker::Pool; 
use AnyEvent::HTTP; 
use 5.10.0; 
use strict; 

my $pool_size = 2; 
my $num_jobs = 7; 

# Create a pool of $pool_size workers 
my $workers = AnyEvent::Worker::Pool->new($pool_size, sub { 
    my ($job) = @_; 
    eval { 
    my $cv = AnyEvent->condvar; 
    print "worker starting download [$job] ...\n"; 
    http_get 'http://download.thinkbroadband.com/5MB.zip', sub { 
     my ($data, $headers) = @_; 
     if ($headers->{Status} =~ /^2/) { 
     print "download [$job] succeeded.\n"; 
     } else { 
     print "download [$job] failed.\n"; 
     } 
     $cv->send; # notification of download complete/exit. 
    }; 

    $cv->recv; # wait for download to complete/exit before returning to pool 
    }; if ([email protected]) { 
    print "worker payload error: [email protected]\n"; 
    } 
    return 1; 
}); 

# dispatch the full list of downloads 
my ($need,$done) = ($num_jobs, 0); 
for my $job (0 .. ($need - 1)) { 
    print "dispatching job $job...\n"; 
    $workers->do($job, sub { 
    print "worker [$job] payload threw exception: [email protected]\n" if [email protected]; 
    print "worker [$job] payload completed successfully!\n" unless [email protected]; 
    EV::unloop if ++$done == $need; 
    }); 
} 

EV::loop; # wait here for all downloads to complete 
print "We're done!\n"; # some useful code to follow here... 

Демо выход заключается в следующем:

[email protected]:~$ ./test.pl 
dispatching job 0... 
dispatching job 1... 
dispatching job 2... 
dispatching job 3... 
dispatching job 4... 
dispatching job 5... 
dispatching job 6... 
worker starting download [0] ... 
worker starting download [1] ... 
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46 
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46 
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46 
worker [6] payload threw exception: no worker connection 
EV: error in callback (ignoring): no worker connection at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 60 

^C 
[email protected]:~$ 
[email protected]:~$ 
[email protected]:~$ download [1] failed. 
unable to write results: Broken pipe at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 139. 
    ...caught at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 145. 

Почему AnyEvent::HTTP?

В моем реальном сценарии я использую еще много функций AnyEvent::HTTP; в частности, я комбинирую обратный вызов on_body с Term::StatusBar, чтобы показать индикатор выполнения для конечного пользователя скрипта; Кроме того, я стратегически «приостанавливаю» в обратном вызове on_body, так что я поддерживаю скорость передачи, равную или меньшую, чем скорость, предварительно определенная конечным пользователем.

Пожалуйста, не стесняйтесь предложить альтернативу с теми функциями (или простой способ взломать их!)

Почему AnyEvent::Worker::Pool?

Я уже был знаком с этим. Альтернативные предложения приветствуются.

Почему EV?

Это быстро. Опять же, предложения альтернатив приветствуются.

+0

скриптов на Perl, как правило, использовать глобальные переменные для дескрипторов файлов. Это не потокобезопасно. –

+0

@DavidKnipe Я не думаю, что 'EV', ни' AnyEvent', ни 'AnyEvent :: HTTP' использовать perl-потоки. Есть ли у вас доказательства того, что они делают? –

+0

Нет, я совсем не знаком с библиотеками, которые вы используете, но вопрос звучал связанным с потоком. В принципе, не обращайте на меня слишком много внимания :-) –

ответ

2

Вы не должны использовать AnyEvent :: Worker :: Poll для этой задачи.
И я порекомендую вам не использовать специфичные для цикла функции, такие как EV :: loop EV :: unloop. Это делает ваш код несовместимым с реализацией других циклов.

Ваш код может быть переписан как этого

use strict; 
use AnyEvent; 
use AnyEvent::HTTP; 

my $pool_size = 2; 
my $num_jobs = 7; 
my $cur_job = 0; 

my $cv = AnyEvent->condvar; 
$cv->begin(); 

for (1..($pool_size < $num_jobs ? $pool_size : $num_jobs)) { 
    $cv->begin(); 
    make_job($cur_job++); 
} 

$cv->end(); 

sub make_job { 
    my $job = shift; 
    $num_jobs--; 

    http_get 'http://download.thinkbroadband.com/5MB.zip', sub { 
     my ($data, $headers) = @_; 
     if ($headers->{Status} =~ /^2/) { 
      print "download [$job] succeeded.\n"; 
     } else { 
      print "download [$job] failed.\n"; 
     } 

     if ($num_jobs > 0) { 
      make_job($cur_job++); 
     } 
     else { 
      $cv->end(); 
     } 
    }; 
} 

$cv->recv(); 
+0

Это намного проще, чем 'AnyEvent :: Worker'. Красиво сделано. Вы хоть представляете, что с ним происходит? –

+0

Я думаю, вы не можете полагаться на тот же цикл событий в дочернем процессе. Это не безопасно –

+0

О, спасибо. Я даже не знал, что под-процессы AnyEvent :: Worker созданы. Я предполагаю, что это должно было быть очевидно, учитывая предупреждения сокета домена в выводе. : -/ –

Смежные вопросы