async: Simplify IPC::Pipe.

This commit removes the 'megaclass' code that IPC::Pipe has become to
support two different dataflow paths in a single class.

Instead, we use up to 2 different IPC objects in async mode to give the
dataflow needed.

Splitting this up required moving the message boundary detection code
back to IPC::Pipe. It is now expected that a given IPC subclass is able
to pass messages with message boundaries intact, and to block while
waiting for messages if concurrency is supported. IPC::Null already
maintains message boundaries and doesn't support concurrency.

Previously IPC and IPC::Pipe cooperated to perform message boundary
detection, by inserting and removing '\n' characters. This was always
kind of a hack. Now we simply pass the message length across the pipe
followed by the message, and ignore the idea completely in IPC.

The impetus for all of this had been to improve performance, but
unfortunately that hasn't happened here, although it's still a positive
change from the code simplicity front.
wilder
Michael Pyne 13 years ago
parent 3215ea0780
commit 2ed0aafee7
  1. 169
      kdesrc-build
  2. 52
      modules/ksb/IPC.pm
  3. 137
      modules/ksb/IPC/Pipe.pm

@ -56,13 +56,14 @@ use URI; # For git-clone snapshot support
use Sys::Hostname;
use Storable 'dclone';
use IO::Handle;
use IO::Select;
use Data::Dumper;
use ksb::IPC;
use ksb::Debug;
use ksb::Util;
use ksb::Version qw(scriptVersion);
use ksb::IPC::Pipe;
use ksb::IPC 0.20;
use ksb::IPC::Pipe 0.20;
use ksb::IPC::Null;
use ksb::KDEXMLReader;
use ksb::Updater::Git;
@ -1809,6 +1810,8 @@ sub handle_updates
$hadError = !$module->update($ipc, $ctx) || $hadError;
}
$ipc->close();
info ("<<< Update Complete >>>\n");
return $hadError;
}
@ -2304,6 +2307,8 @@ EOF
print "\n"; # Space things out
}
$ipc->close();
if ($outfile)
{
close STATUS_FILE;
@ -2575,112 +2580,73 @@ sub execute_command_line_program
# process to go from start to finish without undue interruption on it waiting
# to write out its status to the build process which is usually busy.
#
# First parameter is the IPC object to use.
# First parameter is the IPC object to use to send to build process.
# Second parameter is the IPC object to use to recv from update process.
#
# Returns 0 on success, non-zero on failure.
sub handle_monitoring
{
my $ipc = shift;
# Setup some file handle sets to use in the select() call.
# The out ones are copies of the in ones since select() overwrites its
# parameters.
my ($win, $wout, $rin, $rout);
($win, $rin) = ("") x 2; # Get rid of undefined warnings.
my ($ipcToBuild, $ipcFromUpdater) = @_;
my @msgs; # Message queue.
# Perl uses vec() to setup the file handle sets. Make some local
# subroutines to make it suck less in the real code.
sub setFdInSet($$$) {
my ($set, $fh, $inSet) = @_;
vec($set, fileno($fh), 1) = $inSet;
return $set;
}
# We will write to the build process and read from the update process.
sub fdIsChosen($$) {
my ($set, $fh) = @_;
return vec($set, fileno($fh), 1) == 1;
}
my $sendFH = $ipcToBuild->{fh} || croak_runtime('??? missing pipe to build proc');
my $recvFH = $ipcFromUpdater->{fh} || croak_runtime('??? missing pipe from monitor');
# We will write to the build process and read from the update process.
$win = setFdInSet($win, $ipc->{'toBuild'}, 1);
$rin = setFdInSet($rin, $ipc->{'fromSvn'}, 1);
my $readSelector = IO::Select->new($recvFH);
my $writeSelector = IO::Select->new($sendFH);
# Start the loop. We will be waiting on either $win or $rin. Whenever
# select() returns we must check both sets.
for(;;)
# Start the loop. We will be waiting on either read or write ends.
# Whenever select() returns we must check both sets.
while (
my ($readReadyRef, $writeReadyRef) =
IO::Select->select($readSelector, $writeSelector, undef))
{
my $numFound = select($rout = $rin, $wout = $win, undef, undef);
my $selectErr = $!;
if ($numFound == -1)
# Check for source updates first.
if (@{$readReadyRef})
{
error ("r[mon]: Monitor IPC error: r[$selectErr]");
return 1;
}
undef $@;
my $msg = eval { $ipcFromUpdater->receiveMessage(); };
# Check for svn updates first.
if (fdIsChosen($rout, $ipc->{'fromSvn'}))
{
my $msg = $ipc->receiveFromUpdater();
# undef msg indicates EOF, so check for exception obj specifically
die $@ if $@;
# undef can be returned on EOF as well as error. EOF means the
# other side is presumably done.
if (not defined $msg and not $!)
if (! defined $msg)
{
$rin = setFdInSet($rin, $ipc->{'fromSvn'}, 0);
$readSelector->remove($recvFH);
last; # Select no longer needed, just output to build.
}
# Don't check for $! first, it seems to always be set to EBADF.
# Probably I'm screwing up the select() call?
if (defined $msg)
{
push @msgs, $msg;
}
else
{
error ("r[mon]: Error reading update: r[b[$selectErr]");
return 1;
push @msgs, $msg;
# We may not have been waiting for write handle to be ready if
# we were blocking on an update from updater thread.
$writeSelector->add($sendFH) unless $writeSelector->exists($sendFH);
}
}
# Now check for build updates.
if (fdIsChosen($wout, $ipc->{'toBuild'}))
if (@{$writeReadyRef})
{
# If we're here the update is still going. If we have no messages
# to send wait for that first.
if (not @msgs)
{
my ($rout2, $numFound2);
$numFound2 = select($rout2 = $rin, undef, undef, undef);
$selectErr = $!;
if ($numFound2 == -1 and $selectErr)
{
error ("r[mon]: Monitor IPC error: r[$selectErr]");
return 1;
}
# Assume EOF can happen here.
my $msg = $ipc->receiveFromUpdater();
$selectErr = $!;
if (not defined $msg and $selectErr)
$writeSelector->remove($sendFH);
}
else
{
# Send the message (if we got one).
if (!$ipcToBuild->sendMessage(shift @msgs))
{
error ("r[mon]: Monitor IPC error, unexpected disappearance of updater.");
error ("r[mon]: Mysterious circumstances: r[b[$selectErr]");
error ("r[mon]: Build process stopped too soon! r[$!]");
return 1;
}
push @msgs, $msg if $msg;
}
# Send the message (if we got one).
if (scalar @msgs and !$ipc->sendToBuilder(shift @msgs))
{
error ("r[mon]: Build process stopped too soon! r[$!]");
return 1;
}
}
}
@ -2688,7 +2654,7 @@ sub handle_monitoring
# Send all remaining messages.
while (@msgs)
{
if (!$ipc->sendToBuilder(shift @msgs))
if (!$ipcToBuild->sendMessage(shift @msgs))
{
error ("r[mon]: Build process stopped too soon! r[$!]");
return 1;
@ -2715,32 +2681,41 @@ sub handle_async_build
my ($ipc, $ctx) = @_;
my $svnPid = fork;
if ($svnPid == 0)
{ # child
$ipc->setUpdater();
# Avoid calling close subroutines in more than one routine.
POSIX::_exit (handle_updates ($ipc, $ctx));
}
my $result = 0;
my $monitorPid = fork;
if ($monitorPid == 0) {
# child
my $updaterToMonitorIPC = ksb::IPC::Pipe->new();
my $updaterPid = fork;
# Parent
my $monPid = fork;
if ($monPid == 0)
{ # monitor
$ipc->setMonitor();
# Avoid calling close subroutines in more than one routine.
POSIX::_exit (handle_monitoring ($ipc));
}
if ($updaterPid) {
$updaterToMonitorIPC->setSender();
# Still the parent, let's do the build.
$ipc->setBuilder();
my $result = handle_build ($ipc, $ctx);
# Avoid calling close subroutines in more than one routine.
POSIX::_exit (handle_updates ($updaterToMonitorIPC, $ctx));
}
else {
$ipc->setSender();
$updaterToMonitorIPC->setReceiver();
# Avoid calling close subroutines in more than one routine.
my $result = handle_monitoring ($ipc, $updaterToMonitorIPC);
waitpid ($updaterPid, 0);
$result = 1 if $? != 0;
POSIX::_exit ($result);
}
}
else {
# Still the parent, let's do the build.
$ipc->setReceiver();
my $result = handle_build ($ipc, $ctx);
}
# Exit code is in $?.
waitpid ($svnPid, 0);
$result = 1 if $? != 0;
waitpid ($monitorPid, 0);
waitpid ($monPid, 0);
$result = 1 if $? != 0;
return $result;

@ -9,7 +9,7 @@ use strict;
use warnings;
use v5.10;
our $VERSION = '0.10';
our $VERSION = '0.20';
use ksb::Util; # make_exception, list_has
use ksb::Debug;
@ -35,11 +35,7 @@ sub new
my $class = shift;
# Must bless a hash ref since subclasses expect it.
my $ref = {};
$ref->{'residue'} = ''; # Define this for later.
$ref->{'updated'} = {}; # Tracks modules we've received status for.
return bless $ref, $class;
return bless {}, $class;
}
sub notifyUpdateSuccess
@ -178,13 +174,10 @@ sub waitForStreamStart
# procedure.
sub sendIPCMessage
{
# Use shift for these to empty @_ of the parameters.
my $self = shift;
my $ipcType = shift;
my $msg = shift;
my ($self, $ipcType, $msg) = @_;
my $encodedMsg = pack("l! a*", $ipcType, $msg);
return $self->sendMessage("$encodedMsg\n", @_);
return $self->sendMessage($encodedMsg);
}
# Static class function to unpack a message.
@ -216,37 +209,9 @@ sub receiveIPCMessage
my $self = shift;
my $outBuffer = shift;
# Check if we still have data left over from last read, and if it
# contains a full message.
if ($self->{'residue'} =~ /\n/)
{
my ($first, $remainder) = split(/\n/, $self->{'residue'}, 2);
$self->{'residue'} = defined $remainder ? $remainder : '';
return unpackMsg($first, $outBuffer);
}
# Read in messages enough to get to the message separator (\n)
my $msg = '';
while($msg !~ /\n/) {
my $msgFragment = $self->receiveMessage(@_);
$msg .= $msgFragment if defined $msgFragment;
last unless defined $msgFragment;
}
my $msg = $self->receiveMessage();
return undef if not defined $msg or $msg eq '';
# We may have residue still if we had a partial husk of a message, so
# append to the residue before breaking up the message. We assume a
# newline separates the messages.
$msg = $self->{'residue'} . $msg;
my ($first, $remainder) = split(/\n/, $msg, 2);
# Save rest for later.
$self->{'residue'} = defined $remainder ? $remainder : '';
return unpackMsg($first, $outBuffer);
return ($msg ? unpackMsg($msg, $outBuffer) : undef);
}
# These must be reimplemented. They must be able to handle scalars without
@ -268,4 +233,9 @@ sub supportsConcurrency
return 0;
}
# Should be reimplemented if default does not apply.
sub close
{
}
1;

@ -1,138 +1,121 @@
package ksb::IPC::Pipe;
# IPC class that uses pipes for communication. Basically requires forking two
# children in order to communicate with. Assumes that the two children are the
# update process and a monitor process which keeps the update going and informs
# us (the build process) of the status when we're ready to hear about it.
# IPC class that uses pipes in addition to forking for IPC.
use strict;
use warnings;
use v5.10;
our $VERSION = '0.10';
our $VERSION = '0.20';
use IO::Handle;
use ksb::IPC;
our @ISA = qw(ksb::IPC);
use ksb::Util qw(croak_internal croak_runtime);
use IO::Handle;
use IO::Pipe;
use Errno qw(EINTR);
sub new
{
my $class = shift;
my $self = $class->SUPER::new;
# Define file handles.
$self->{$_} = new IO::Handle foreach qw/fromMon toMon fromSvn toBuild/;
if (not pipe($self->{'fromSvn'}, $self->{'toMon'})or
not pipe($self->{'fromMon'}, $self->{'toBuild'}))
{
return undef;
}
$self->{fh} = IO::Pipe->new();
return bless $self, $class;
}
# Must override to send to correct filehandle.
sub notifyUpdateSuccess
# Call this to let the object know it will be the update process.
sub setSender
{
my $self = shift;
my ($module, $msg) = @_;
$self->sendIPCMessage(ksb::IPC::MODULE_SUCCESS, "$module,$msg", 'toMon');
}
$self->{fh}->writer();
# Closes the given list of filehandle ids.
sub closeFilehandles
{
my $self = shift;
my @fhs = @_;
for my $fh (@fhs) {
close $self->{$fh};
$self->{$fh} = 0;
}
# Disable buffering
$self->{fh}->autoflush(1);
}
# Call this to let the object know it will be the update process.
sub setUpdater
sub setReceiver
{
my $self = shift;
$self->closeFilehandles(qw/fromSvn fromMon toBuild/);
}
sub setBuilder
{
my $self = shift;
$self->closeFilehandles(qw/fromSvn toMon toBuild/);
}
$self->{fh}->reader();
sub setMonitor
{
my $self = shift;
$self->closeFilehandles(qw/toMon fromMon/);
# Disable buffering
$self->{fh}->autoflush(1);
}
# Reimplementation of ksb::IPC::supportsConcurrency
sub supportsConcurrency
{
return 1;
}
# First parameter is the ipc Type of the message to send.
# Second parameter is the module name (or other message).
# Third parameter is the file handle id to send on.
# Required reimplementation of ksb::IPC::sendMessage
# First parameter is the (encoded) message to send.
sub sendMessage
{
my $self = shift;
my ($msg, $fh) = @_;
my ($self, $msg) = @_;
return syswrite ($self->{$fh}, $msg);
}
# Since streaming does not provide message boundaries, we will insert
# ourselves, by sending a 2-byte unsigned length, then the message.
my $encodedMsg = pack ("S a*", length($msg), $msg);
# Override of sendIPCMessage to specify which filehandle to send to.
sub sendIPCMessage
{
my $self = shift;
push @_, 'toMon'; # Add filehandle to args.
if (length($encodedMsg) != $self->{fh}->syswrite($encodedMsg)) {
croak_runtime("Unable to write full msg to pipe: $!");
}
return $self->SUPER::sendIPCMessage(@_);
return 1;
}
# Used by monitor process, so no message encoding or decoding required.
sub sendToBuilder
sub _readNumberOfBytes
{
my ($self, $msg) = @_;
return $self->sendMessage($msg, 'toBuild');
}
my ($self, $length) = @_;
# First parameter is a reference to the output buffer.
# Second parameter is the id of the filehandle to read from.
sub receiveMessage
{
my $self = shift;
my $fh = shift;
my $value;
my $fh = $self->{fh};
my $readLength = 0;
my $result;
while ($readLength < $length) {
$! = 0; # Reset errno
my $curLength = $fh->sysread ($result, ($length - $readLength), $readLength);
if ($curLength > $length) {
croak_runtime("sysread read too much: $curLength vs $length")
}
undef $!; # Clear error marker
my $result = sysread ($self->{$fh}, $value, 256);
# EINTR is OK, but check early so we don't trip 0-length check
next if (!defined $curLength && $!{EINTR});
return if (defined $curLength && $curLength == 0);
croak_runtime ("Error reading $length bytes from pipe: $!") if !$curLength;
return undef if not $result;
return $value;
$readLength += $curLength;
}
return $result;
}
# Override of receiveIPCMessage to specify which filehandle to receive from.
sub receiveIPCMessage
# Required reimplementation of ksb::IPC::receiveMessage
sub receiveMessage
{
my $self = shift;
push @_, 'fromMon'; # Add filehandle to args.
return $self->SUPER::receiveIPCMessage(@_);
# Read unsigned short with msg length, then the message
my $msgLength = $self->_readNumberOfBytes(2);
return if !$msgLength;
$msgLength = unpack ("S", $msgLength); # Decode to Perl type
return $self->_readNumberOfBytes($msgLength);
}
# Used by monitor process, so no message encoding or decoding required.
sub receiveFromUpdater
sub close
{
my $self = shift;
return $self->receiveMessage('fromSvn');
$self->{fh}->close();
}
1;

Loading…
Cancel
Save