2012-05-07 4 views
7

Мой Perl скрипт необходимо запускать несколько потоков одновременно ...Как реализовать взаимодействие потоков семафора в Perl?

use threads ('yield', 'exit' => 'threads_only'); 
use threads::shared; 
use strict; 
use warnings; 
no warnings 'threads'; 
use LWP::UserAgent; 
use HTTP::Request; 
use HTTP::Async; 
use ... 

... и такие потоки должны получить некоторую информацию из Интернета, так HTTP::Async используется.

my $request = HTTP::Request->new; 
    $request->protocol('HTTP/1.1'); 
    $request->method('GET'); 
    $request->header('User-Agent' => '...'); 

my $async = HTTP::Async->new(slots   => 100, 
           timeout   => REQUEST_TIMEOUT, 
           max_request_time => REQUEST_TIMEOUT); 

Но некоторые темы, необходимо получить доступ к веб-только тогда, когда другой поток (s) так говорит.

my $start = [Time::HiRes::gettimeofday()]; 
my @threads =(); 
foreach ... { 
    $thread = threads->create(
    sub { 
      local $SIG{KILL} = sub { threads->exit }; 
      my $url = shift; 
      if ($url ...) { 
      # wait for "go" signal from other threads 
      } 
      my ($response, $data); 
      $request->url($url); 
      $data = ''; 
      $async->add($request); 
      while ($response = $async->wait_for_next_response) { 
      threads->yield(); 
      $data .= $response->as_string; 
      } 
      if ($data ...) { 
      # send "go" signal to waiting threads 
      } 
     } 
     }, $_); 

    if (defined $thread) { 
    $thread->detach; 
    push (@threads, $thread); 
    } 
} 

Там может быть один или несколько нитей ждет для «идти» сигнал и там может быть один или более нити, такие «идти» сигнал может отправить. В начале статус семафора «wait», и как только он превращается в «go», он останется таким.

Наконец, приложение проверяет максимальное время работы. Если потоки работают слишком долго, передается сигнал самонастройки.

my $running; 
do { 
    $running = 0; 
    foreach my $thread (@threads) { 
    $running++ if $thread->is_running(); 
    } 
    threads->yield(); 
} until (($running == 0) || 
     (Time::HiRes::tv_interval($start) > MAX_RUN_TIME)); 
$running = 0; 
foreach my $thread (@threads) { 
    if ($thread->is_running()) { 
    $thread->kill('KILL'); 
    $running++; 
    } 
} 
threads->yield(); 

Теперь к делу. Мои вопросы:

  1. Как я могу наиболее эффективно код ожидания «семафор» в сценарии (см комментарии в скрипте выше). Должен ли я просто использовать только общую переменную с некоторым манекеномsleeploop?

  2. Мне нужно добавить некоторыеsleepпетли в конце приложения, чтобы дать время для потоков для самоуничтожения ли?

+0

Правильно ли я понимаю, что вы используете отдельный HTTP :: объекты ASync (копируются, а не общий, новыми нитями) для получения более одного URL за раз в потоке? – pilcrow

+0

@pilcrow - Да, похоже. Это трата ресурсов? –

+0

Это может быть или не быть менее экономичным или экономичным во времени, но это реальный сток на циклах программиста. :) Дизайн трудно понять, и, следовательно, возможно, чтобы изменить/расширить безопасно, потому что компоненты не кажутся совершенно правильными. – pilcrow

ответ

3

Вы можете посмотреть на Thread::Queue, чтобы выполнить эту работу. Вы можете настроить очередь, которая будет обрабатывать сигнализацию между потоками, ожидающими сигнал «идти», и потоки, посылающие сигнал «идти». Вот быстрый макет, что я не проверял:

... 
use Thread::Queue; 
... 
# In main body 
my $q = Thread::Queue->new(); 
... 
$thread = threads->create(
    sub { 
      local $SIG{KILL} = sub { threads->exit }; 
      my $url = shift; 
      if ($url ...) { 
      # wait for "go" signal from other threads 
      my $mesg = $q->dequeue(); 
      # you could put in some termination code if the $mesg isn't 'go' 
      if ($mesg ne 'go') { ... } 
      } 
      ... 
      if ($data ...) { 
      # send "go" signal to waiting threads 
      $q->enqueue('go'); 
      } 
     } 
     }, $_); 
... 

Нити, которые должны ждать «идти» сигнал будет ждать на методе DEQUEUE пока что-то не входит в очередь. Как только сообщение входит в очередь, один поток и только один поток будут захватывать сообщение и обрабатывать его.

Если вы хотите остановить потоки, чтобы они не запускались, вы можете вставить сообщение остановки в голову очереди.

$q->insert(0, 'stop') foreach (@threads); 

Есть примеры Thread :: Queue и threads распределения CPAN, которые показывают это более подробно.

В ответ на ваш второй вопрос ответ, к сожалению, зависит. Когда вы продолжаете прекращать свои потоки, какая очистка требуется для чистого выключения? Каков худший сценарий, который может произойти, если коврик вытащил из-под потока? Вы хотели бы планировать в любое время, чтобы произошла чистка. Другой вариант, который вы можете сделать, - это подождать на каждом потоке, чтобы закончить.

Причина, по которой мой комментарий спрашивает, можете ли вы удалить вызов detach, заключается в том, что этот метод позволяет основному потоку выйти и не заботится о том, что происходит с дочерними потоками. Вместо этого, если вы удалите этот вызов, и добавить:

$_->join() foreach threads->list(); 

до конца основного блока, для этого потребуется основное приложение ждать каждый поток на самом деле завершен.

Если вы оставите метод detach на месте, вам необходимо будет спать в конце вашего кода, если вам нужны ваши потоки для выполнения любой очистки. Когда вы вызываете detach в потоке, то, что вы рассказываете о Perl, заключается в том, что вам все равно, что делает поток, когда ваш основной поток завершается. Если главный поток выходит и есть потоки, которые все еще работают, которые были отсоединены, программа завершится без предупреждений. Однако, если вы не нуждаетесь в очистке, и вы все еще звоните detach, не стесняйтесь уходить, когда захотите.

+0

У этого вопроса есть открытая щедрость, которая стоит +50 репутации. Пожалуйста, ** улучшите свой ответ **. Я нашел ваше сообщение интересным, однако вы не ответили на второй вопрос в моей публикации (если/как ** ждать ** для самонастройки потоков) –

+0

@ user1215106 Я замечаю в вашем коде, что вы делаете ' $ резьбовых> отрывать; '. Обычно вы используете это, чтобы игнорировать поток и не беспокоиться о его завершении или нет. У вас есть причина для этого присутствия или его можно удалить? – Joel

+0

Я считаю, что его можно удалить –

-1

Попробуйте что-то вроде этого ....

#!/usr/bin/perl 

use threads; 
use threads::shared; 

$|=1; 

my ($global):shared; 
my (@threads); 

push(@threads, threads->new(\&mySub,1)); 
push(@threads, threads->new(\&mySub,2)); 
push(@threads, threads->new(\&mySub,3)); 

$i = 0; 

foreach my $myThread(@threads) 

{ 
    my @ReturnData = $myTread->join ; 
    print "Thread $i returned: @ReturnData\n"; 
    $i++; 
} 

sub mySub 
{ 
    my ($threadID) = @_; 

    for(0..1000) 
    { 
     $global++; 
     print "Thread ID: $threadID >> $_ >> GLB: $global\n"; 
     sleep(1); 
    } 
    return($id); 
} 
Смежные вопросы