2016-08-28 2 views
4

Мое оригинальное решение для https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top сохраняет копию данных на диске.Perl: читать факс неблокируясь

Теперь я сделал вторую версию, которая буферизует строку в памяти.

Работает, но для начала необходимо подключить все фифы. Это работает:

window1$ mkfifo {1..100} 
window1$ parcat {1..100} | pv >/dev/null 

window2$ parallel -j0 'cat bigfile > ' ::: * 

Это не дает никаких выходных данных (потому что 100 не подключен):

window1$ mkfifo {1..100} 
window1$ parcat {1..100} | pv >/dev/null 

window2$ parallel -j0 'cat bigfile > ' ::: {1..99} 

Я попытался с помощью open '+<'. Это решает вышеупомянутую проблему, но теперь она не останавливается на EOF.

Как это сделать?

Минимальная версия (не поддерживает большие линии и не ждать отката-):

#!/usr/bin/perl 

use Symbol qw(gensym); 
use IPC::Open3; 
use POSIX qw(:errno_h); 
use Fcntl qw(:DEFAULT :flock); 

for (@ARGV) { 
    open($fh{$_},"<",$_) || die; 
    # set fh non blocking($fh{$_}); 
    my $flags; 
    fcntl($fh{$_}, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle 
    $flags |= &O_NONBLOCK; # Add non-blocking to the flags 
    fcntl($fh{$_}, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle 
} 

while(keys %fh) { 
    for(keys %fh) { 
     my($string,$something_read) = non_blocking_read($_); 
    print $string; 
    } 
    # Sleep 1 ms 
    select(undef, undef, undef, 1/1000); 
} 

{ 
    my %buffer; 

    sub non_blocking_read { 

     my $file = shift; 
     my $in = $fh{$file}; 
     my $rv = sysread($in, substr($buffer{$file},length $buffer{$file}), 327680); 
     if (!$rv) { 
      if($! == EAGAIN) { 
      # Would block: Nothing read 
      return(undef,undef); 
      } else { 
      # This file is done 
      close $in; 
      delete $fh{$file}; 
      my $buf = $buffer{$file}; 
      delete $buffer{$file}; 
      return ($buf,1); 
      } 
     } 

     # Find \n for full line 
     my $i = (rindex($buffer{$file},"\n")+1); 
     if($i) { 
      # Return full line 
      # Remove full line from $buffer 
      return(substr($buffer{$file},0,$i), 
       1,substr($buffer{$file},0,$i) = ""); 
     } else { 
      # Something read, but not a full line 
      return("",1); 
     } 
    } 
} 

Полной версии: Важный код в первых 40 строках: Остальное хорошо протестированный код.

#!/usr/bin/perl 

use Symbol qw(gensym); 
use IPC::Open3; 

for (@ARGV) { 
    open($fh{$_},"<",$_) || die; 
    set_fh_non_blocking($fh{$_}); 
} 

$ms = 1; 
while(keys %fh) { 
    for(keys %fh) { 
    my($string,$something_read) = non_blocking_read($_); 
    if($something_read) { 
     $ms = 0.1; 
     print $string; 
    } 
    } 
    $ms = exp_usleep($ms); 
} 

{ 
    my %buffer; 
    my $ms; 

    sub non_blocking_read { 
    use POSIX qw(:errno_h); 

    my $file = shift; 
    my $in = $fh{$file}; 
    my $rv = read($in, substr($buffer{$file},length $buffer{$file}), 327680); 
    if (!$rv) { 
     if($! == EAGAIN) { 
     # Would block: Nothing read 
     return(undef,undef); 
     } else { 
     # This file is done 
     close $in; 
     delete $fh{$file}; 
     my $buf = $buffer{$file}; 
     delete $buffer{$file}; 
     return ($buf,1); 
     } 
    } 

    #### Well-tested code below               

    # Find \n or \r for full line 
    my $i = (::rindex64(\$buffer{$file},"\n")+1) || 
     (::rindex64(\$buffer{$file},"\r")+1); 
    if($i) { 
     # Return full line 
     # Remove full line from $buffer 
     return(substr($buffer{$file},0,$i), 
      1,substr($buffer{$file},0,$i) = ""); 
    } else { 
     # Something read, but not a full line 
     return("",1); 
    } 
    } 
} 

sub rindex64 { 
    # Do rindex on strings > 2GB. 
    # rindex in Perl < v5.22 does not work for > 2GB 
    # Input: 
    # as rindex except STR which must be passed as a reference 
    # Output: 
    # as rindex 
    my $ref = shift; 
    my $match = shift; 
    my $pos = shift; 
    my $block_size = 2**31-1; 
    my $strlen = length($$ref); 
    # Default: search from end 
    $pos = defined $pos ? $pos : $strlen; 
    # No point in doing extra work if we don't need to. 
    if($strlen < $block_size) { 
    return rindex($$ref, $match, $pos); 
    } 

    my $matchlen = length($match); 
    my $ret; 
    my $offset = $pos - $block_size + $matchlen; 
    if($offset < 0) { 
    # The offset is less than a $block_size 
    # Set the $offset to 0 and 
    # Adjust block_size accordingly 
    $block_size = $block_size + $offset; 
    $offset = 0; 
    } 
    while($offset >= 0) { 
    $ret = rindex(
     substr($$ref, $offset, $block_size), 
     $match); 
    if($ret != -1) { 
     return $ret + $offset; 
    } 
    $offset -= ($block_size - $matchlen - 1); 
    } 
    return -1; 
} 

sub exp_usleep { 
    # Sleep this many milliseconds. 
    # Input: 
    # $ms = milliseconds to sleep 
    # Returns: 
    # $ms + 10% 
    my $ms = shift; 
    select(undef, undef, undef, $ms/1000); 
    return (($ms < 1000) ? ($ms * 1.1) : ($ms)); 
} 

sub set_fh_non_blocking { 
    # Set filehandle as non-blocking 
    # Inputs: 
    # $fh = filehandle to be blocking 
    # Returns: 
    # N/A 
    my $fh = shift; 
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; 
    my $flags; 
    fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle 
    $flags |= &O_NONBLOCK; # Add non-blocking to the flags 
    fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle 
} 
+3

может кто-то пожалуйста, объясните мне 'close' голосовать? Я не совсем понимаю, как это не подходит ... – stevieb

+1

Попробуйте использовать sysread вместо чтения. sysread реализует небуферизацию чтения (т. е. похожее на чтение в режиме syscall), в то время как read реализует буферизованное чтение и пытается прочитать полный буфер (например, fread). Также я рекомендую использовать select, чтобы узнать, в каком файловом дескрипторе имеются данные. –

+0

'eval" использовать Fcntl qw (: DEFAULT: flock); 1; "' не имеет смысла за пределами блока 'BEGIN'. Заменить '$ Global :: use {" Fcntl "} || = eval" использовать Fcntl qw (: DEFAULT: flock); 1; ";' с 'require Fcntl;'. [Это не поможет с вашей проблемой.] – ikegami

ответ

1

Это решение открывает фальшивый писатель, который закрывается, как только будут получены какие-либо данные. Это делает правильно, за исключением того, что она не заканчивается, если вход пуст:

mkfifo {1..100} 
parcat {1..100} & 
parallel -j2 echo works '>' {} ::: {1..100} 

parcat {1..100} & 
# Fails (parcat does not exit) 
parallel -j2 cat /dev/null '>' {} ::: {1..100} 

Код:

#!/usr/bin/perl 

use Symbol qw(gensym); 
use IPC::Open3; 
use POSIX qw(:errno_h); 
use IO::Select; 
use strict; 

my $s = IO::Select->new(); 
my %fhr; 
my %fhw; 

for (@ARGV) { 
    # Open the file with a fake writer that will never write 
    open(my $fhw,"+<",$_) || die; 
    # Open the file for real 
    open(my $fhr,"<",$_) || die; 
    set_fh_non_blocking($fhr); 
    $s->add($fhr); 
    $fhr{$fhr}++; 
    $fhw{$fhr}=$fhw; 
} 

my %buffer; 
while(keys %fhr) { 
    for my $file ($s->can_read(undef)) { 
     my $rv = sysread($file, substr($buffer{$file},length $buffer{$file}), 327680); 
     if (!$rv) { 
      if($! == EAGAIN) { 
      # Would block: Nothing read 
     next; 
      } else { 
      # This file is done 
     $s->remove($file); 
      delete $fhr{$file}; 
     print $buffer{$file}; 
      delete $buffer{$file}; 
     # Closing the $file causes it to block 
     # close $file; 
     next; 
      } 
     } 
    if($fhw{$file}) { 
     # We have received data from $file: 
     # Close the fake writer 
     close $fhw{$file}; 
     delete $fhw{$file}; 
    } 

     # Find \n or \r for full line 
     my $i = (::rindex64(\$buffer{$file},"\n")+1) || 
      (::rindex64(\$buffer{$file},"\r")+1); 
     if($i) { 
      # Print full line 
      # Remove full line from $buffer 
     print substr($buffer{$file},0,$i); 
     substr($buffer{$file},0,$i) = ""; 
     next; 
     } else { 
      # Something read, but not a full line 
     next; 
     } 
    } 
} 

sub rindex64 { 
    # Do rindex on strings > 2GB. 
    # rindex in Perl < v5.22 does not work for > 2GB 
    # Input: 
    # as rindex except STR which must be passed as a reference 
    # Output: 
    # as rindex 
    my $ref = shift; 
    my $match = shift; 
    my $pos = shift; 
    my $block_size = 2**31-1; 
    my $strlen = length($$ref); 
    # Default: search from end 
    $pos = defined $pos ? $pos : $strlen; 
    # No point in doing extra work if we don't need to. 
    if($strlen < $block_size) { 
     return rindex($$ref, $match, $pos); 
    } 

    my $matchlen = length($match); 
    my $ret; 
    my $offset = $pos - $block_size + $matchlen; 
    if($offset < 0) { 
     # The offset is less than a $block_size 
     # Set the $offset to 0 and 
     # Adjust block_size accordingly 
     $block_size = $block_size + $offset; 
     $offset = 0; 
    } 
    while($offset >= 0) { 
     $ret = rindex(
      substr($$ref, $offset, $block_size), 
      $match); 
     if($ret != -1) { 
      return $ret + $offset; 
     } 
     $offset -= ($block_size - $matchlen - 1); 
    } 
    return -1; 
} 

sub set_fh_non_blocking { 
    # Set filehandle as non-blocking 
    # Inputs: 
    # $fh = filehandle to be blocking 
    # Returns: 
    # N/A 
    my $fh = shift; 
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; 
    my $flags; 
    fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle 
    $flags |= &O_NONBLOCK; # Add non-blocking to the flags 
    fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle 
} 
Смежные вопросы