2013-02-27 3 views
1

Я пытаюсь реализовать многопоточное приложение на основе слегка измененной модели босса/работника. В основном основной поток создает несколько потоков босса, которые, в свою очередь, порождают два рабочих потока каждый (возможно, больше). Это потому, что потоки босса имеют дело с одним хостом или сетевым устройством, и рабочие потоки могут занять некоторое время, чтобы завершить свою работу.Странное поведение переменных с использованием Perl ithreads

Я использую Thread::Pool для реализации этой концепции, и до сих пор она работает достаточно хорошо; Я также не думаю, что моя проблема связана с Thread::Pool (см. Ниже). Очень упрощенный псевдокод вперед:

use strict; 
use warnings; 

my $bosspool = create_bosspool(); # spawns all boss threads 
my $taskpool = undef;    # created in each boss thread at 
            # creation of each boss thread 

# give device jobs to boss threads 
while (1) { 
    foreach my $device (@devices) { 
    $bosspool->job($device); 
    } 

    sleep(1); 
} 

# This sub is called for jobs passed to the $bosspool 
sub process_boss 
{ 
    my $device = shift; 

    foreach my $task ($device->{tasks}) { 
    # process results as they become available 
    process_result() while ($taskpool->results); 
    # give task jobs to task threads 
    scalar $taskpool->job($device, $task); 
    sleep(1); ### HACK ### 
    } 

    # process remaining results/wait for all tasks to finish 
    process_result() while ($taskpool->results || $taskpool->todo); 

    # happy result processing 
} 

sub process_result 
{ 
    my $result = $taskpool->result_any(); 

    # mangle $result 
} 

# This sub is called for jobs passed to the $taskpool of each boss thread 
sub process_task 
{ 
    # not so important stuff 

    return $result; 
} 

Кстати, почему я не использовать monitor() -routine потому, что я должен ждать для всех рабочих мест в $taskpool до конца. Теперь этот код работает просто замечательно, если вы не удалите строку ### HACK ###. Без сна $taskpool->todo() не достанется правильному количеству заданий, которые будут открыты, если вы добавите их или получите их результаты слишком «быстро». Например, вы добавили 4 задания, но $taskpool->todo() вернется только после 2 (без ожидающих результатов). Это приводит к разным интересным эффектам.

ОК, так Thread::Pool->todo() это дерьмо, давайте попробуем обходной путь:

sub process_boss 
{ 
    my $device = shift; 

    my $todo = 0; 

    foreach my $task ($device->{tasks}) { 
    # process results as they become available 
    while ($taskpool->results) { 
     process_result(); 
     $todo--; 
    } 
    # give task jobs to task threads 
    scalar $taskpool->job($device, $task); 
    $todo++; 
    } 

    # process remaining results/wait for all tasks to finish 
    while ($todo) { 
    process_result(); 
    sleep(1); ### HACK ### 
    $todo--; 
    } 
} 

Это также будет работать хорошо, до тех пор, пока я держу ### HACK ### линию. Без этой строки этот код будет воспроизводить проблемы Thread::Pool->todo(), так как $todo не только уменьшается на 1, но и на 2 или даже больше.

Я тестировал этот код только с одной нитью босса, поэтому в большинстве случаев не было многопоточности (когда дело доходит до этой подпрограммы). $bosspool, $taskpool и особенно $todo не :shared, никаких побочных эффектов не возможно, правда? Что происходит в этой подпрограмме, которая выполняется только одним потоком босса без общих переменных, семафоров и т. Д.?

+5

Можете ли вы создать простой комплект кода примера, который демонстрирует проблему? Трудно оценить, что здесь происходит, потому что мы не можем видеть ключевые части вашей программы. – 2013-02-27 14:19:13

+0

Вы пытались с реальными данными или вы сначала создали тестовую установку? – didierc

ответ

0

Я бы предположил, что наилучший способ реализации модели потоков работника - Thread::Queue. Проблема с чем-то подобным заключается в том, чтобы выяснить, когда очереди завершены, или элементы отложены и ожидающие обработки.

С помощью Thread::Queue вы можете использовать цикл while для извлечения элементов из очереди и end очереди, так что цикл while возвращает undef и потоки завершаются.

Таким образом, вам не всегда нужны несколько потоков «босса», вы можете просто использовать несколько разных вкусов worker и ввести очереди. Я бы поставил под вопрос, зачем вам нужна модель потока босса в этом случае. Это кажется ненужным.

Со ссылкой на: Perl daemonize with child daemons

#!/usr/bin/perl 

use strict; 
use warnings; 
use threads; 
use Thread::Queue; 

my $nthreads = 4; 

my @targets = qw (device1 device2 device3 device4); 

my $task_one_q = Thread::Queue->new(); 
my $task_two_q = Thread::Queue->new(); 

my $results_q = Thread::Queue->new(); 

sub task_one_worker { 
    while (my $item = task_one_q->dequeue) { 

     #do something with $item 

     $results_q->enqueue("$item task_one complete"); 
    } 
} 

sub task_two_worker { 
    while (my $item = task_two_q->dequeue) { 

     #do something with $item 

     $results_q->enqueue("$item task_two complete"); 
    } 
} 

#start threads; 

for (1 .. $nthreads) { 
    threads->create(\&task_one_worker); 
    threads->create(\&task_two_worker); 
} 

foreach my $target (@targets) { 
    $task_one_q->enqueue($target); 
    $task_two_q->enqueue($target); 
} 

$task_one_q->end; 
$task_two_q->end; 

#Wait for threads to exit. 

foreach my $thr (threads->list()) { 
    threads->join(); 
} 

$results_q->end(); 

while (my $item = $results_q->dequeue()) { 
    print $item, "\n"; 
} 

Вы могли бы сделать что-то подобное с boss нитью, если вы жаждущий - вы можете создать очереди за boss и передать его по отношению к рабочим. Я не уверен, что это необходимо.

Смежные вопросы