2016-08-08 4 views
0

У меня есть следующий код perl, который делает асинхронные вызовы для двух внешних программ биоинформатики. Во-первых, он запускает blastJob, а затем он принимает результаты от этого и запускает exonerateJob. Я адаптировал этот код от previous question о переносе моего кода на многопоточный подход.Perl Многопоточность - рабочие потоки перестают работать

Проблема расстраивает, потому что это происходит только после нескольких часов работы. Я оставлю программу работать на ночь и рано утром обнаруживаю, что exonerateJobs больше не работают, но новые blastJobs все еще прокручиваются. Нет никаких сообщений об ошибках или что-то еще. Еще одним лакомым кусочком информации является то, что я вернулся и протестировал входные запросы, где журналы показывают, что exonerateJobs перестали работать. Программа заканчивается просто отлично, если я запускаю небольшое количество запросов через нее, даже если они являются запросами, которые, казалось, вызвали проблемы ранее. Поскольку я не слишком хорошо знаком с правилами многопоточности, я хотел бы знать, есть ли проблема с моим подходом или если это потенциально проблема с внешними программами, которые вызываются. Вот фрагмент кода:

#Asynchronous calls to blast and exonerate 
{ 
    my $blast_request_queue = Thread::Queue->new(); 
    my $exonerate_request_queue = Thread::Queue->new(); 

    my @blast_threads; 
    for (1..NUM_BLAST_WORKERS) { 
     push @blast_threads, async { 
     while (my $q = $blast_request_queue->dequeue()) { 
      my @results = blastJob($q, $blastopts_ref); 
      foreach (@results) { 
       my @args = ($q, $_); 
       $exonerate_request_queue->enqueue(\@args); 
      } 
     } 

     $exonerate_request_queue->end(); # I've tried with and without this line, the result seems to be the same 
     }; 
    } 

    my @exonerate_threads; 
    for (1..NUM_EXONERATE_WORKERS) { 
     push @exonerate_threads, async { 
     while (my $args_ref = $exonerate_request_queue->dequeue()) { 
      my ($queryFile, $targetName) = @$args_ref; #De-reference args 
      my $regex = qr/\Q$targetName\E/; 
      #Check for target file 
      my ($file_match) = grep { $_ =~ $regex } keys %targets; 
      if ($file_match) { 
       my $targetFile = $options{'t'} . $file_match; 
       my $result = exonerateJob($queryFile, $targetFile, $exonopts_ref); 
       #Print result to file after job is finished 
       my ($Qname, $Qpath, $Qsuffix) = fileparse($queryFile); 
       my $outFN = $Qname . ".exonerate_out"; 
       open(OUTFH, ">>$outFN") or print STDERR "Can't open $outFN: $!"; 
       print OUTFH $result; 
      } else { 
       print STDERR "Target file not found: $targetName. Can't run exonerate"; 
      } 
     } 
     }; 
    } 

    foreach (@queries) { 
     #Concatenate query path with name 
     my $queryFile = $options{'q'} . $_; 
     $blast_request_queue->enqueue($queryFile); 
    } 
    #my $queryFile = $options{'q'} . $queries[3]; 
    #$blast_request_queue->enqueue($queryFile); 

    $blast_request_queue->end();  
    $_->join() for @blast_threads; 
    $exonerate_request_queue->end(); 
    $_->join() for @exonerate_threads; 
} 

#I'm using IPC::Run to launch the programs. 
#There is some error handling which I believe should catch any probs 
sub blastJob { 
    my ($query, $blastopts_ref) = @_; 
    #De-reference blast options 
    my @blastCmd = @$blastopts_ref; 
    my ($blastOut, $err); #for blast output 
    #Add query information after first blast option 
    splice(@blastCmd, 1, 0, ("-query", $query)); 
    my ($Qname, $Qpath, $Qsuffix) = fileparse($query); 
    print "Running $blastCmd[0]: query $Qname...\n"; 
    run \@blastCmd, \undef, \$blastOut, \$err; 
    if ($err) { 
     print "Error in BLAST query $Qname. $err\n"; 
    } 
    my @results = split("\n", $blastOut); 
    return uniq(@results); 
} 

sub exonerateJob { 
    my ($query, $target, $exonopts_ref) = @_; 
    #De-reference exonerate options 
    my @exonCmd = @$exonopts_ref; 
    my ($exonOut, $err); #for exonerate output 
    #Add program, query, and target information to exonerate options 
    unshift (@exonCmd, ("exonerate", "-q", $query, "-t", $target)); 
    my ($Qname, $Qpath, $Qsuffix) = fileparse($query); 
    my ($Tname, $Tpath, $Tsuffix) = fileparse($target); 
    eval { 
     print "Running exonerate: query $Qname, target $Tname...\n"; 
     run \@exonCmd, \undef, \$exonOut, \$err, timeout(240); 
     if ($err) { 
      print STDERR "Error in exonerate query $Qname, target $Tname. $err\n"; 
     } 
    }; 
    if ([email protected] =~ /timeout/) { 
     print STDERR "Error: timeout in exonerate query $Qname, target $Tname\n"; 
    } 

    return $exonOut; 
} 
+1

Совет: 'warn (" x \ n ")' не только короче, чем 'print (STDERR" x \ n ")', также вызывает '$ SIG {__ WARN __}' для вызова, если он установлен. Это лучший подход. – ikegami

+0

Это имеет смысл. Спасибо за совет! – Tsaari

+1

Вы можете распечатать сообщение после циклов dequeue, чтобы узнать, вышел ли поток, потому что dequeue вернул false или по какой-то другой причине. – ikegami

ответ

1

Без вашей информации источника, я не могу проверить это, - но мои деньги будут на первый раз, когда вы:

$exonerate_request_queue->end(); 

В этом асинхронном блоке.

Потому что я думаю, что это вполне возможно, там, что как только как вы закрыть

$blast_request_queue->end(); 

Затем нить выйдет вскоре после того, закройте очередь «выход», и при этом - значит, что вы потерять все, что ожидало, потому что очередь закрыта.

+1

'$ exonerate_request_queue-> end();' действительно вызывается слишком рано, хотя это не обязательно называется, как только вызывается '$ blast_request_queue-> end();'. Код, на котором основывался его ОП, не пострадал от этой проблемы. Тем не менее, я не уверен, что это проблема, о которой спрашивает OP. – ikegami

+0

Я должен был заметить, что я попытался удалить эту строку. Я отредактирую свой Q, чтобы это отразить. Результат был тем же, разочаровывающим. – Tsaari

1

Вы столкнулись с множеством проблем, чтобы избежать die, когда вы просто должны добавить eval BLOCK вокруг рабочего кода.

Изменить

my $result = job1($job); 
$job2_request_queue->enqueue($result); 

в

my $result = eval { job1($job) }; 
if ([email protected]) { 
    warn("Job failed: [email protected]"); 
} else { 
    $job2_request_queue->enqueue($result); 
} 

Это гораздо надежнее. Например, run может генерировать исключение, которое убьет вашего ребенка.


Кроме того, как уже упоминалось Sobrique, не должен был добавлен верхний экземпляр $exonerate_request_queue->end();. Это предотвращает добавление дополнительной работы в очередь (и сигнализирует, что работники-экзонаты должны выйти, как только была выполнена вся работа, выполняемая в настоящее время в очереди). Это должно быть сделано только после выхода каждого взрывателя, но это добавление заставляет это делать, как только выйдет первый взрывной рабочий.

+0

Спасибо Ikegami, я попробую этот подход, а также удалю эту строку. Чтобы уточнить, вы бы также посоветовали изменить способ обработки ошибок внутри подпрограммы, чтобы они умерли, а не печатали в STDERR? – Tsaari

+2

Вы имеете в виду, как вы печатаете на 'open', но затем пытаетесь использовать дескриптор? Да, это нужно изменить. – ikegami

+0

Вау, да, я согласен, что это довольно вопиющее. Я определенно избегал «умирать», в основном из-за невежества. Кажется, я вижу, как он теперь будет вместе. – Tsaari

Смежные вопросы