Ожидаемое поведение: Когда основной поток выходит, все остальные потоки также выходят. Если вам все равно, вы можете сделать $thread->detach
. В противном случае вам нужно вручную указать $thread->join
, что мы и сделаем.
$thread->join
ожидает завершения потока и возвращает возвращаемое значение (потоки могут возвращать значения, как и подпрограммы, хотя контекст (list/void/scalar) должен быть зафиксирован на время появления).
Мы detach
потоки, Епдиеие данные:
threads->create(\&buildQueue)->detach for 1..5;
Теперь для освобождения пакета из очереди потоков, мы поместили их в лексической переменной (? Почему вы используете глобал), так что мы можем из очереди их позже:
my @dequeue_threads = map threads->create(\&process_queue), 1 .. 3;
Тогда ждать их завершения:
$_->join for @dequeue_threads;
Мы знаем, что отдельные потоки завершат выполнение до того, как программа выйдет, потому что единственный выход для выходящих из очереди потоков - выключение очереди.
За исключением полутора ошибок. Понимаете, есть разница между пустой очередью и готовой очередью. Если очередь просто пуста, то декомпрессионные потоки будут блокироваться на $queue->dequeue
, пока они не получат некоторый ввод. Традиционное решение - dequeue
, а значение, которое они получают, определяется. Мы можем разбить цикл, поставив столько же undef
значений в очереди, сколько потоков из очереди. Более современная версия Thread::Queue
имеет метод end
, что делает dequeue
return undef
для всех последующих вызовов.
Проблема заключается в , когда, чтобы положить конец очереди. Мы должны это сделать после того, как вышли из очереди очереди. Это означает, что мы должны ждать их вручную. Вздох.
my @enqueueing = map threads->create(\&enqueue), 1..5;
my @dequeueing = map threads->create(\&dequeue), 1..3;
$_->join for @enqueueing;
$queue->enqueue(undef) for 1..3;
$_->join for @dequeueing;
И в sub dequeuing
: while(defined(my $item = $queue->dequeue)) { ... }
.
Использование теста defined
еще одна ошибка: rand
может вернуть ноль, хотя это маловероятно и пропустит большинство тестов. Договор заключается в том, что он возвращает псевдослучайное число с плавающей запятой между включением нуля и исключением некоторой верхней границы: Число из интервала [0, x)
. По умолчанию значение по умолчанию: 1
.
Если вы не хотите присоединяться к очереди очереди вручную, вы можете использовать семафор для завершения ввода сигнала. Семафор - это многопоточный примитив, который может быть увеличен и уменьшен, но не ниже нуля. Если операция декремента позволила бы подсчету падения ниже нуля, вызов блокируется до тех пор, пока другой поток не увеличит счетчик. Если начальный счет равен 1
, он может использоваться как флаг для блокировки ресурсов.
Мы также можем начать с отрицательного значения 1 - $NUM_THREADS
, и каждый поток увеличит значение, так что только когда все потоки выйдут, он может быть уменьшен снова.
use threads; # make a habit of importing `threads` as the first thing
use strict; use warnings;
use feature 'say';
use Thread::Queue;
use Thread::Semaphore;
use constant {
NUM_ENQUEUE_THREADS => 5, # it's good to fix the thread counts early
NUM_DEQUEUE_THREADS => 3,
};
sub enqueue {
my ($out_queue, $finished_semaphore) = @_;
my $tid = threads->tid;
# iterate over ranges instead of using the while($maxval --> 0) idiom
for (1 .. 1000) {
$out_queue->enqueue(my $val = rand 10_000);
say "Thread $tid enqueued $val";
}
$finished_semaphore->up;
# try a non-blocking decrement. Returns true only for the last thread exiting.
if ($finished_semaphore->down_nb) {
$out_queue->end; # for sufficiently modern versions of Thread::Queue
# $out_queue->enqueue(undef) for 1 .. NUM_DEQUEUE_THREADS;
}
}
sub dequeue {
my ($in_queue) = @_;
my $tid = threads->tid;
while(defined(my $item = $in_queue->dequeue)) {
say "thread $tid dequeued $item";
}
}
# create the queue and the semaphore
my $queue = Thread::Queue->new;
my $enqueuers_ended_semaphore = Thread::Semaphore->new(1 - NUM_ENQUEUE_THREADS);
# kick off the enqueueing threads -- they handle themself
threads->create(\&enqueue, $queue, $enqueuers_ended_semaphore)->detach for 1..NUM_ENQUEUE_THREADS;
# start and join the dequeuing threads
my @dequeuers = map threads->create(\&dequeue, $queue), 1 .. NUM_DEQUEUE_THREADS;
$_->join for @dequeuers;
Не удивляйтесь, если потоки, кажется, не работают параллельно, но последовательно: Эта задача (enqueuing случайного числа) очень быстро, и не очень хорошо подходит для многопоточности (enqueueing дороже чем создание случайного числа).
Вот пример работы, где каждый только создает Очередь два значения:
Thread 1 enqueued 6.39390993005694
Thread 1 enqueued 0.337993319585337
Thread 2 enqueued 4.34504733960242
Thread 2 enqueued 2.89158054485114
Thread 3 enqueued 9.4947585773571
Thread 3 enqueued 3.17079715055542
Thread 4 enqueued 8.86408863197179
Thread 5 enqueued 5.13654995317669
Thread 5 enqueued 4.2210886147538
Thread 4 enqueued 6.94064174636395
thread 6 dequeued 6.39390993005694
thread 6 dequeued 0.337993319585337
thread 6 dequeued 4.34504733960242
thread 6 dequeued 2.89158054485114
thread 6 dequeued 9.4947585773571
thread 6 dequeued 3.17079715055542
thread 6 dequeued 8.86408863197179
thread 6 dequeued 5.13654995317669
thread 6 dequeued 4.2210886147538
thread 6 dequeued 6.94064174636395
Вы можете увидеть, что 5
удалось епдиеие несколько вещей, прежде чем 4
. Нити 7
и 8
ничего не могут удалить, 6
слишком быстро. Кроме того, все защитники завершаются до того, как порождаются декомпоненты (для такого небольшого количества входов).
Почему вы использовали 'our' вместо' my'? Вам никогда не придется использовать 'our', кроме случаев принудительного (' @ ISA', '@ EXPORT') – ikegami