Мое оригинальное решение для https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top сохраняет копию данных на диске.Perl: читать факс неблокируясь
Теперь я сделал вторую версию, которая буферизует строку в памяти.
Работает, но для начала необходимо подключить все фифы. Это работает:
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null
window2$ parallel -j0 'cat bigfile > ' ::: *
Это не дает никаких выходных данных (потому что 100 не подключен):
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null
window2$ parallel -j0 'cat bigfile > ' ::: {1..99}
Я попытался с помощью open '+<'
. Это решает вышеупомянутую проблему, но теперь она не останавливается на EOF.
Как это сделать?
Минимальная версия (не поддерживает большие линии и не ждать отката-):
#!/usr/bin/perl
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(:DEFAULT :flock);
for (@ARGV) {
open($fh{$_},"<",$_) || die;
# set fh non blocking($fh{$_});
my $flags;
fcntl($fh{$_}, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh{$_}, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
while(keys %fh) {
for(keys %fh) {
my($string,$something_read) = non_blocking_read($_);
print $string;
}
# Sleep 1 ms
select(undef, undef, undef, 1/1000);
}
{
my %buffer;
sub non_blocking_read {
my $file = shift;
my $in = $fh{$file};
my $rv = sysread($in, substr($buffer{$file},length $buffer{$file}), 327680);
if (!$rv) {
if($! == EAGAIN) {
# Would block: Nothing read
return(undef,undef);
} else {
# This file is done
close $in;
delete $fh{$file};
my $buf = $buffer{$file};
delete $buffer{$file};
return ($buf,1);
}
}
# Find \n for full line
my $i = (rindex($buffer{$file},"\n")+1);
if($i) {
# Return full line
# Remove full line from $buffer
return(substr($buffer{$file},0,$i),
1,substr($buffer{$file},0,$i) = "");
} else {
# Something read, but not a full line
return("",1);
}
}
}
Полной версии: Важный код в первых 40 строках: Остальное хорошо протестированный код.
#!/usr/bin/perl
use Symbol qw(gensym);
use IPC::Open3;
for (@ARGV) {
open($fh{$_},"<",$_) || die;
set_fh_non_blocking($fh{$_});
}
$ms = 1;
while(keys %fh) {
for(keys %fh) {
my($string,$something_read) = non_blocking_read($_);
if($something_read) {
$ms = 0.1;
print $string;
}
}
$ms = exp_usleep($ms);
}
{
my %buffer;
my $ms;
sub non_blocking_read {
use POSIX qw(:errno_h);
my $file = shift;
my $in = $fh{$file};
my $rv = read($in, substr($buffer{$file},length $buffer{$file}), 327680);
if (!$rv) {
if($! == EAGAIN) {
# Would block: Nothing read
return(undef,undef);
} else {
# This file is done
close $in;
delete $fh{$file};
my $buf = $buffer{$file};
delete $buffer{$file};
return ($buf,1);
}
}
#### Well-tested code below
# Find \n or \r for full line
my $i = (::rindex64(\$buffer{$file},"\n")+1) ||
(::rindex64(\$buffer{$file},"\r")+1);
if($i) {
# Return full line
# Remove full line from $buffer
return(substr($buffer{$file},0,$i),
1,substr($buffer{$file},0,$i) = "");
} else {
# Something read, but not a full line
return("",1);
}
}
}
sub rindex64 {
# Do rindex on strings > 2GB.
# rindex in Perl < v5.22 does not work for > 2GB
# Input:
# as rindex except STR which must be passed as a reference
# Output:
# as rindex
my $ref = shift;
my $match = shift;
my $pos = shift;
my $block_size = 2**31-1;
my $strlen = length($$ref);
# Default: search from end
$pos = defined $pos ? $pos : $strlen;
# No point in doing extra work if we don't need to.
if($strlen < $block_size) {
return rindex($$ref, $match, $pos);
}
my $matchlen = length($match);
my $ret;
my $offset = $pos - $block_size + $matchlen;
if($offset < 0) {
# The offset is less than a $block_size
# Set the $offset to 0 and
# Adjust block_size accordingly
$block_size = $block_size + $offset;
$offset = 0;
}
while($offset >= 0) {
$ret = rindex(
substr($$ref, $offset, $block_size),
$match);
if($ret != -1) {
return $ret + $offset;
}
$offset -= ($block_size - $matchlen - 1);
}
return -1;
}
sub exp_usleep {
# Sleep this many milliseconds.
# Input:
# $ms = milliseconds to sleep
# Returns:
# $ms + 10%
my $ms = shift;
select(undef, undef, undef, $ms/1000);
return (($ms < 1000) ? ($ms * 1.1) : ($ms));
}
sub set_fh_non_blocking {
# Set filehandle as non-blocking
# Inputs:
# $fh = filehandle to be blocking
# Returns:
# N/A
my $fh = shift;
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
my $flags;
fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
может кто-то пожалуйста, объясните мне 'close' голосовать? Я не совсем понимаю, как это не подходит ... – stevieb
Попробуйте использовать sysread вместо чтения. sysread реализует небуферизацию чтения (т. е. похожее на чтение в режиме syscall), в то время как read реализует буферизованное чтение и пытается прочитать полный буфер (например, fread). Также я рекомендую использовать select, чтобы узнать, в каком файловом дескрипторе имеются данные. –
'eval" использовать Fcntl qw (: DEFAULT: flock); 1; "' не имеет смысла за пределами блока 'BEGIN'. Заменить '$ Global :: use {" Fcntl "} || = eval" использовать Fcntl qw (: DEFAULT: flock); 1; ";' с 'require Fcntl;'. [Это не поможет с вашей проблемой.] – ikegami