2013-08-21 3 views
2

Я совершенно новый для Perl, особенно Perl Threads. я хочу сделать:Perl Queue and Threads abnormal exit

  1. Есть 5 потоков, которые будут данные ан-очереди (номера) Случайные в Thread :: очереди
  2. есть 3 темы, которые будут ДЭ-очереди данных из Thread :: очередь.

Полный код, который я написал для того, чтобы достичь выше миссии:

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @Enquing_threads; 
our @Dequeuing_threads; 

sub buildQueue 
{ 
    my $TotalEntry=1000; 
    while($TotalEntry-- >0) 
    { 
     my $query = rand(10000); 
     $queue->enqueue($query); 
     print "Enque thread with TID " .threads->tid . " got $query,"; 
     print "Queue Size: " . $queue->pending . "\n"; 
    } 
} 
sub process_Queue 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Dequeu thread with TID " .threads->tid . " got $query\n"; 
    } 
} 
push @Enquing_threads,threads->create(\&buildQueue) for 1..5; 
push @Dequeuing_threads,threads->create(\&process_Queue) for 1..3; 

Вопросы, которые я Облицовочные:

  • Нити не работают, как одновременно, как и ожидалось.
  • Вся программа ненормально выход с следующий вывод консоли:

Perl вышел с активными потоками: 8 работает и несвязанные

0 finished and unjoined 
    0 running and detached 

    Enque thread with TID 5 got 6646.13585023883,Queue Size: 595 
    Enque thread with TID 1 got 3573.84104215917,Queue Size: 595 

Любая помощь на код-оптимизации ценится.

+0

Почему вы использовали 'our' вместо' my'? Вам никогда не придется использовать 'our', кроме случаев принудительного (' @ ISA', '@ EXPORT') – ikegami

ответ

4

Ожидаемое поведение: Когда основной поток выходит, все остальные потоки также выходят. Если вам все равно, вы можете сделать $thread->detach. В противном случае вам нужно вручную указать $thread->join, что мы и сделаем.

$thread->join ожидает завершения потока и возвращает возвращаемое значение (потоки могут возвращать значения, как и подпрограммы, хотя контекст (list/void/scalar) должен быть зафиксирован на время появления).

Мы detach потоки, Епдиеие данные:

threads->create(\&buildQueue)->detach for 1..5; 

Теперь для освобождения пакета из очереди потоков, мы поместили их в лексической переменной (? Почему вы используете глобал), так что мы можем из очереди их позже:

my @dequeue_threads = map threads->create(\&process_queue), 1 .. 3; 

Тогда ждать их завершения:

$_->join for @dequeue_threads; 

Мы знаем, что отдельные потоки завершат выполнение до того, как программа выйдет, потому что единственный выход для выходящих из очереди потоков - выключение очереди.

За исключением полутора ошибок. Понимаете, есть разница между пустой очередью и готовой очередью. Если очередь просто пуста, то декомпрессионные потоки будут блокироваться на $queue->dequeue, пока они не получат некоторый ввод. Традиционное решение - dequeue, а значение, которое они получают, определяется. Мы можем разбить цикл, поставив столько же undef значений в очереди, сколько потоков из очереди. Более современная версия Thread::Queue имеет метод end, что делает dequeue return undef для всех последующих вызовов.

Проблема заключается в , когда, чтобы положить конец очереди. Мы должны это сделать после того, как вышли из очереди очереди. Это означает, что мы должны ждать их вручную. Вздох.

my @enqueueing = map threads->create(\&enqueue), 1..5; 
my @dequeueing = map threads->create(\&dequeue), 1..3; 
$_->join for @enqueueing; 
$queue->enqueue(undef) for 1..3; 
$_->join for @dequeueing; 

И в sub dequeuing: while(defined(my $item = $queue->dequeue)) { ... }.

Использование теста defined еще одна ошибка: rand может вернуть ноль, хотя это маловероятно и пропустит большинство тестов. Договор заключается в том, что он возвращает псевдослучайное число с плавающей запятой между включением нуля и исключением некоторой верхней границы: Число из интервала [0, x). По умолчанию значение по умолчанию: 1.

Если вы не хотите присоединяться к очереди очереди вручную, вы можете использовать семафор для завершения ввода сигнала. Семафор - это многопоточный примитив, который может быть увеличен и уменьшен, но не ниже нуля. Если операция декремента позволила бы подсчету падения ниже нуля, вызов блокируется до тех пор, пока другой поток не увеличит счетчик. Если начальный счет равен 1, он может использоваться как флаг для блокировки ресурсов.

Мы также можем начать с отрицательного значения 1 - $NUM_THREADS, и каждый поток увеличит значение, так что только когда все потоки выйдут, он может быть уменьшен снова.

use threads; # make a habit of importing `threads` as the first thing 

use strict; use warnings; 
use feature 'say'; 

use Thread::Queue; 
use Thread::Semaphore; 

use constant { 
    NUM_ENQUEUE_THREADS => 5, # it's good to fix the thread counts early 
    NUM_DEQUEUE_THREADS => 3, 
}; 

sub enqueue { 
    my ($out_queue, $finished_semaphore) = @_; 
    my $tid = threads->tid; 

    # iterate over ranges instead of using the while($maxval --> 0) idiom 
    for (1 .. 1000) { 
    $out_queue->enqueue(my $val = rand 10_000); 
    say "Thread $tid enqueued $val"; 
    } 

    $finished_semaphore->up; 
    # try a non-blocking decrement. Returns true only for the last thread exiting. 
    if ($finished_semaphore->down_nb) { 
    $out_queue->end; # for sufficiently modern versions of Thread::Queue 
    # $out_queue->enqueue(undef) for 1 .. NUM_DEQUEUE_THREADS; 
    } 
} 

sub dequeue { 
    my ($in_queue) = @_; 
    my $tid = threads->tid; 
    while(defined(my $item = $in_queue->dequeue)) { 
    say "thread $tid dequeued $item"; 
    } 
} 

# create the queue and the semaphore 
my $queue = Thread::Queue->new; 
my $enqueuers_ended_semaphore = Thread::Semaphore->new(1 - NUM_ENQUEUE_THREADS); 

# kick off the enqueueing threads -- they handle themself 
threads->create(\&enqueue, $queue, $enqueuers_ended_semaphore)->detach for 1..NUM_ENQUEUE_THREADS; 

# start and join the dequeuing threads 
my @dequeuers = map threads->create(\&dequeue, $queue), 1 .. NUM_DEQUEUE_THREADS; 
$_->join for @dequeuers; 

Не удивляйтесь, если потоки, кажется, не работают параллельно, но последовательно: Эта задача (enqueuing случайного числа) очень быстро, и не очень хорошо подходит для многопоточности (enqueueing дороже чем создание случайного числа).

Вот пример работы, где каждый только создает Очередь два значения:

Thread 1 enqueued 6.39390993005694 
Thread 1 enqueued 0.337993319585337 
Thread 2 enqueued 4.34504733960242 
Thread 2 enqueued 2.89158054485114 
Thread 3 enqueued 9.4947585773571 
Thread 3 enqueued 3.17079715055542 
Thread 4 enqueued 8.86408863197179 
Thread 5 enqueued 5.13654995317669 
Thread 5 enqueued 4.2210886147538 
Thread 4 enqueued 6.94064174636395 
thread 6 dequeued 6.39390993005694 
thread 6 dequeued 0.337993319585337 
thread 6 dequeued 4.34504733960242 
thread 6 dequeued 2.89158054485114 
thread 6 dequeued 9.4947585773571 
thread 6 dequeued 3.17079715055542 
thread 6 dequeued 8.86408863197179 
thread 6 dequeued 5.13654995317669 
thread 6 dequeued 4.2210886147538 
thread 6 dequeued 6.94064174636395 

Вы можете увидеть, что 5 удалось епдиеие несколько вещей, прежде чем 4. Нити 7 и 8 ничего не могут удалить, 6 слишком быстро. Кроме того, все защитники завершаются до того, как порождаются декомпоненты (для такого небольшого количества входов).

+1

' use Thread :: Queue 1.03; 'будет обеспечивать' -> end'. – ikegami

+0

@amon: Еще одна вещь, что 3 здания (enquing) и 5 ​​потоков обработки (обработки), подпадает под тип модели потоков? Это, модель потока Boss-Worker? –

+0

@ritesh_NITW В модели * boss-worker * один поток распределяет нагрузку на несколько рабочих потоков (часто из пула потоков). Наша программа не слишком хорошо соответствует этому описанию. Я бы скорее описал это как архитектуру * производитель-потребитель * или * конвейер *. – amon