2015-06-21 2 views
-1

Я хвостохранилище файла, читаю каждую строку и подачу его в поток очереди, используя приведенный ниже кодне сможет повторно использовать потоки в Perl

use threads; 
use threads::shared; 

use Thread::Queue; 

use threads 'exit' => 'threads_only'; 

$thread_limit = 10; 

my $q = Thread::Queue->new(); 

my @thr = map { 
    threads->create(
     sub { 
      while (defined(my $item = dequeue($q))) { 
       test($item); 
      } 
     }); 
} 1 .. $thread_limit; 

my $file = File::Tail->new("/path/File.txt"); 

while (defined(my $line = $file->read)) { 

    my @j; 
    if ($line !~ /\s/) { 
     push(@j, $line); 
    } 
    else { 
     my @temp = split(/\s/, $line); 
     my $size = @temp; 
     for (my $i = 0 ; $i <= $size ; $i++) { 
      push(@j, $temp[$i]); 
     } 
    } 

    { 
     lock($q); 
     $q->enqueue(@job); 
    } 
} 

Проблема:

thread_limit = 10

теперь, как только он достигает «количества активных потоков» = 1, потоки останавливаются, хотя очередь имеет больше рабочих элементов для удаления из очереди. Не могли бы вы помочь? Чтение файла для рабочих элементов будет непрерывным.

+2

$ q-> Ставить (@job); $ q-> enqueue (@j); –

+0

Вы всегда должны *** использовать «строгое» и «использовать предупреждения» в верхней части каждой программы Perl, ans объявить все ваши переменные с 'my' как можно ближе к их первой точке использования. – Borodin

+0

. Весь вашего оператора 'if' (если вы использовали' '' 'вместо'/\ s/', как вам следует) совпадает с' my @j = split '', $ line'. И вы должны передать массив по ссылке «enqueue», поэтому вы должны написать '$ q-> enqueue (\ @ j)' – Borodin

ответ

0

Каждая из нитей вашей программы умирает с Undefined subroutine &main::dequeue, а основная программа умирает с Can't locate object method "new" via package "File::Tail".

  • Добавить use File::Tail;.
  • Изменение dequeue($q) - $q->dequeue().

Добавление use strict; use warnings; (который вы должны всегда использование) обнаруживает проблемы.

  • Изменить $q->enqueue(@job);$q->enqueue(@j);. (Всегда используйте use strict; use warnings;!)

Все еще проблемы выявляются по мере исправления вышеизложенного.

  • Удалить lock($q);. 1) Это не сработает, поскольку это не общая переменная, 2) Блокировка только в одном месте бесполезна и 3) Thread :: Queue является потокобезопасной.
  • Подпрограмма test не существует.

После применения этих исправлений и некоторые другие очистки, вы получите следующее:

use strict; 
use warnings; 
use threads; 
use File::Tail   qw(); 
use Thread::Queue 3.01 qw(); 

use constant NUM_WORKERS  => 10; 
use constant COMMAND_FILE_QFN => '...'; 
use constant DEBUG   => 1; 

sub worker { 
    my ($item) = @_; 
    print("Processed $item.\n"); 
} 

my $q = Thread::Queue->new(); 
for (1..NUM_WORKERS) { 
    async { 
     while (defined(my $item = $q->dequeue())) { 
      worker($item); 
     } 
    }; 
} 

my $tail = File::Tail->new(
    name  => COMMAND_FILE_QFN, 
    maxinterval => DEBUG ? 1 : 60, 
); 
while (defined(my $line = $tail->read())) { 
    $q->enqueue(split(' ', $line)); 
} 

$q->end(); 
$_->join() for threads->list(); 
+0

Я думаю, что Thread :: Queue's может быть заблокирован - модули предлагают сделать это, если вы собираетесь делать неатомные вещи. В этом случае это не имеет значения, но это можно сделать. – Sobrique

+0

@Sobrique, я бы не заблокировал сам объект T :: Q ('lock% $ q;', что является первым делом 'enqueue' и' dequeue' do). Не уверен, что это связано с моим ответом (или вопросом). – ikegami

Смежные вопросы