From 2ed0aafee761158251731406c5cfac9ce479992d Mon Sep 17 00:00:00 2001 From: Michael Pyne Date: Sat, 9 Feb 2013 16:52:25 -0500 Subject: [PATCH] async: Simplify IPC::Pipe. This commit removes the 'megaclass' code that IPC::Pipe has become to support two different dataflow paths in a single class. Instead, we use up to 2 different IPC objects in async mode to give the dataflow needed. Splitting this up required moving the message boundary detection code back to IPC::Pipe. It is now expected that a given IPC subclass is able to pass messages with message boundaries intact, and to block while waiting for messages if concurrency is supported. IPC::Null already maintains message boundaries and doesn't support concurrency. Previously IPC and IPC::Pipe cooperated to perform message boundary detection, by inserting and removing '\n' characters. This was always kind of a hack. Now we simply pass the message length across the pipe followed by the message, and ignore the idea completely in IPC. The impetus for all of this had been to improve performance, but unfortunately that hasn't happened here, although it's still a positive change from the code simplicity front. --- kdesrc-build | 169 +++++++++++++++++----------------------- modules/ksb/IPC.pm | 52 +++---------- modules/ksb/IPC/Pipe.pm | 137 ++++++++++++++------------------ 3 files changed, 143 insertions(+), 215 deletions(-) diff --git a/kdesrc-build b/kdesrc-build index e331ac1..27b51ac 100755 --- a/kdesrc-build +++ b/kdesrc-build @@ -56,13 +56,14 @@ use URI; # For git-clone snapshot support use Sys::Hostname; use Storable 'dclone'; use IO::Handle; +use IO::Select; use Data::Dumper; -use ksb::IPC; use ksb::Debug; use ksb::Util; use ksb::Version qw(scriptVersion); -use ksb::IPC::Pipe; +use ksb::IPC 0.20; +use ksb::IPC::Pipe 0.20; use ksb::IPC::Null; use ksb::KDEXMLReader; use ksb::Updater::Git; @@ -1809,6 +1810,8 @@ sub handle_updates $hadError = !$module->update($ipc, $ctx) || $hadError; } + $ipc->close(); + info ("<<< Update Complete >>>\n"); return $hadError; } @@ -2304,6 +2307,8 @@ EOF print "\n"; # Space things out } + $ipc->close(); + if ($outfile) { close STATUS_FILE; @@ -2575,112 +2580,73 @@ sub execute_command_line_program # process to go from start to finish without undue interruption on it waiting # to write out its status to the build process which is usually busy. # -# First parameter is the IPC object to use. +# First parameter is the IPC object to use to send to build process. +# Second parameter is the IPC object to use to recv from update process. # # Returns 0 on success, non-zero on failure. sub handle_monitoring { - my $ipc = shift; - - # Setup some file handle sets to use in the select() call. - # The out ones are copies of the in ones since select() overwrites its - # parameters. - my ($win, $wout, $rin, $rout); - ($win, $rin) = ("") x 2; # Get rid of undefined warnings. + my ($ipcToBuild, $ipcFromUpdater) = @_; my @msgs; # Message queue. - # Perl uses vec() to setup the file handle sets. Make some local - # subroutines to make it suck less in the real code. - sub setFdInSet($$$) { - my ($set, $fh, $inSet) = @_; - vec($set, fileno($fh), 1) = $inSet; - return $set; - } + # We will write to the build process and read from the update process. - sub fdIsChosen($$) { - my ($set, $fh) = @_; - return vec($set, fileno($fh), 1) == 1; - } + my $sendFH = $ipcToBuild->{fh} || croak_runtime('??? missing pipe to build proc'); + my $recvFH = $ipcFromUpdater->{fh} || croak_runtime('??? missing pipe from monitor'); - # We will write to the build process and read from the update process. - $win = setFdInSet($win, $ipc->{'toBuild'}, 1); - $rin = setFdInSet($rin, $ipc->{'fromSvn'}, 1); + my $readSelector = IO::Select->new($recvFH); + my $writeSelector = IO::Select->new($sendFH); - # Start the loop. We will be waiting on either $win or $rin. Whenever - # select() returns we must check both sets. - for(;;) + # Start the loop. We will be waiting on either read or write ends. + # Whenever select() returns we must check both sets. + while ( + my ($readReadyRef, $writeReadyRef) = + IO::Select->select($readSelector, $writeSelector, undef)) { - my $numFound = select($rout = $rin, $wout = $win, undef, undef); - my $selectErr = $!; - - if ($numFound == -1) + # Check for source updates first. + if (@{$readReadyRef}) { - error ("r[mon]: Monitor IPC error: r[$selectErr]"); - return 1; - } + undef $@; + my $msg = eval { $ipcFromUpdater->receiveMessage(); }; - # Check for svn updates first. - if (fdIsChosen($rout, $ipc->{'fromSvn'})) - { - my $msg = $ipc->receiveFromUpdater(); + # undef msg indicates EOF, so check for exception obj specifically + die $@ if $@; # undef can be returned on EOF as well as error. EOF means the # other side is presumably done. - if (not defined $msg and not $!) + if (! defined $msg) { - $rin = setFdInSet($rin, $ipc->{'fromSvn'}, 0); + $readSelector->remove($recvFH); last; # Select no longer needed, just output to build. } - - # Don't check for $! first, it seems to always be set to EBADF. - # Probably I'm screwing up the select() call? - if (defined $msg) - { - push @msgs, $msg; - } else { - error ("r[mon]: Error reading update: r[b[$selectErr]"); - return 1; + push @msgs, $msg; + + # We may not have been waiting for write handle to be ready if + # we were blocking on an update from updater thread. + $writeSelector->add($sendFH) unless $writeSelector->exists($sendFH); } } # Now check for build updates. - if (fdIsChosen($wout, $ipc->{'toBuild'})) + if (@{$writeReadyRef}) { # If we're here the update is still going. If we have no messages # to send wait for that first. if (not @msgs) { - my ($rout2, $numFound2); - $numFound2 = select($rout2 = $rin, undef, undef, undef); - $selectErr = $!; - - if ($numFound2 == -1 and $selectErr) - { - error ("r[mon]: Monitor IPC error: r[$selectErr]"); - return 1; - } - - # Assume EOF can happen here. - my $msg = $ipc->receiveFromUpdater(); - $selectErr = $!; - if (not defined $msg and $selectErr) + $writeSelector->remove($sendFH); + } + else + { + # Send the message (if we got one). + if (!$ipcToBuild->sendMessage(shift @msgs)) { - error ("r[mon]: Monitor IPC error, unexpected disappearance of updater."); - error ("r[mon]: Mysterious circumstances: r[b[$selectErr]"); + error ("r[mon]: Build process stopped too soon! r[$!]"); return 1; } - - push @msgs, $msg if $msg; - } - - # Send the message (if we got one). - if (scalar @msgs and !$ipc->sendToBuilder(shift @msgs)) - { - error ("r[mon]: Build process stopped too soon! r[$!]"); - return 1; } } } @@ -2688,7 +2654,7 @@ sub handle_monitoring # Send all remaining messages. while (@msgs) { - if (!$ipc->sendToBuilder(shift @msgs)) + if (!$ipcToBuild->sendMessage(shift @msgs)) { error ("r[mon]: Build process stopped too soon! r[$!]"); return 1; @@ -2715,32 +2681,41 @@ sub handle_async_build my ($ipc, $ctx) = @_; - my $svnPid = fork; - if ($svnPid == 0) - { # child - $ipc->setUpdater(); - # Avoid calling close subroutines in more than one routine. - POSIX::_exit (handle_updates ($ipc, $ctx)); - } + my $result = 0; + my $monitorPid = fork; + if ($monitorPid == 0) { + # child + my $updaterToMonitorIPC = ksb::IPC::Pipe->new(); + my $updaterPid = fork; - # Parent - my $monPid = fork; - if ($monPid == 0) - { # monitor - $ipc->setMonitor(); - # Avoid calling close subroutines in more than one routine. - POSIX::_exit (handle_monitoring ($ipc)); - } + if ($updaterPid) { + $updaterToMonitorIPC->setSender(); - # Still the parent, let's do the build. - $ipc->setBuilder(); - my $result = handle_build ($ipc, $ctx); + # Avoid calling close subroutines in more than one routine. + POSIX::_exit (handle_updates ($updaterToMonitorIPC, $ctx)); + } + else { + $ipc->setSender(); + $updaterToMonitorIPC->setReceiver(); + + # Avoid calling close subroutines in more than one routine. + my $result = handle_monitoring ($ipc, $updaterToMonitorIPC); + + waitpid ($updaterPid, 0); + $result = 1 if $? != 0; + + POSIX::_exit ($result); + } + } + else { + # Still the parent, let's do the build. + $ipc->setReceiver(); + my $result = handle_build ($ipc, $ctx); + } # Exit code is in $?. - waitpid ($svnPid, 0); - $result = 1 if $? != 0; + waitpid ($monitorPid, 0); - waitpid ($monPid, 0); $result = 1 if $? != 0; return $result; diff --git a/modules/ksb/IPC.pm b/modules/ksb/IPC.pm index 00dcb72..b760177 100644 --- a/modules/ksb/IPC.pm +++ b/modules/ksb/IPC.pm @@ -9,7 +9,7 @@ use strict; use warnings; use v5.10; -our $VERSION = '0.10'; +our $VERSION = '0.20'; use ksb::Util; # make_exception, list_has use ksb::Debug; @@ -35,11 +35,7 @@ sub new my $class = shift; # Must bless a hash ref since subclasses expect it. - my $ref = {}; - $ref->{'residue'} = ''; # Define this for later. - $ref->{'updated'} = {}; # Tracks modules we've received status for. - - return bless $ref, $class; + return bless {}, $class; } sub notifyUpdateSuccess @@ -178,13 +174,10 @@ sub waitForStreamStart # procedure. sub sendIPCMessage { - # Use shift for these to empty @_ of the parameters. - my $self = shift; - my $ipcType = shift; - my $msg = shift; + my ($self, $ipcType, $msg) = @_; my $encodedMsg = pack("l! a*", $ipcType, $msg); - return $self->sendMessage("$encodedMsg\n", @_); + return $self->sendMessage($encodedMsg); } # Static class function to unpack a message. @@ -216,37 +209,9 @@ sub receiveIPCMessage my $self = shift; my $outBuffer = shift; - # Check if we still have data left over from last read, and if it - # contains a full message. - if ($self->{'residue'} =~ /\n/) - { - my ($first, $remainder) = split(/\n/, $self->{'residue'}, 2); - $self->{'residue'} = defined $remainder ? $remainder : ''; - - return unpackMsg($first, $outBuffer); - } - - # Read in messages enough to get to the message separator (\n) - my $msg = ''; - while($msg !~ /\n/) { - my $msgFragment = $self->receiveMessage(@_); - $msg .= $msgFragment if defined $msgFragment; - - last unless defined $msgFragment; - } + my $msg = $self->receiveMessage(); - return undef if not defined $msg or $msg eq ''; - - # We may have residue still if we had a partial husk of a message, so - # append to the residue before breaking up the message. We assume a - # newline separates the messages. - $msg = $self->{'residue'} . $msg; - my ($first, $remainder) = split(/\n/, $msg, 2); - - # Save rest for later. - $self->{'residue'} = defined $remainder ? $remainder : ''; - - return unpackMsg($first, $outBuffer); + return ($msg ? unpackMsg($msg, $outBuffer) : undef); } # These must be reimplemented. They must be able to handle scalars without @@ -268,4 +233,9 @@ sub supportsConcurrency return 0; } +# Should be reimplemented if default does not apply. +sub close +{ +} + 1; diff --git a/modules/ksb/IPC/Pipe.pm b/modules/ksb/IPC/Pipe.pm index 5c7aac9..284e308 100644 --- a/modules/ksb/IPC/Pipe.pm +++ b/modules/ksb/IPC/Pipe.pm @@ -1,138 +1,121 @@ package ksb::IPC::Pipe; -# IPC class that uses pipes for communication. Basically requires forking two -# children in order to communicate with. Assumes that the two children are the -# update process and a monitor process which keeps the update going and informs -# us (the build process) of the status when we're ready to hear about it. +# IPC class that uses pipes in addition to forking for IPC. use strict; use warnings; use v5.10; -our $VERSION = '0.10'; +our $VERSION = '0.20'; -use IO::Handle; use ksb::IPC; our @ISA = qw(ksb::IPC); +use ksb::Util qw(croak_internal croak_runtime); + +use IO::Handle; +use IO::Pipe; +use Errno qw(EINTR); + sub new { my $class = shift; my $self = $class->SUPER::new; # Define file handles. - $self->{$_} = new IO::Handle foreach qw/fromMon toMon fromSvn toBuild/; - - if (not pipe($self->{'fromSvn'}, $self->{'toMon'})or - not pipe($self->{'fromMon'}, $self->{'toBuild'})) - { - return undef; - } + $self->{fh} = IO::Pipe->new(); return bless $self, $class; } -# Must override to send to correct filehandle. -sub notifyUpdateSuccess +# Call this to let the object know it will be the update process. +sub setSender { my $self = shift; - my ($module, $msg) = @_; - $self->sendIPCMessage(ksb::IPC::MODULE_SUCCESS, "$module,$msg", 'toMon'); -} + $self->{fh}->writer(); -# Closes the given list of filehandle ids. -sub closeFilehandles -{ - my $self = shift; - my @fhs = @_; - - for my $fh (@fhs) { - close $self->{$fh}; - $self->{$fh} = 0; - } + # Disable buffering + $self->{fh}->autoflush(1); } -# Call this to let the object know it will be the update process. -sub setUpdater +sub setReceiver { my $self = shift; - $self->closeFilehandles(qw/fromSvn fromMon toBuild/); -} -sub setBuilder -{ - my $self = shift; - $self->closeFilehandles(qw/fromSvn toMon toBuild/); -} + $self->{fh}->reader(); -sub setMonitor -{ - my $self = shift; - $self->closeFilehandles(qw/toMon fromMon/); + # Disable buffering + $self->{fh}->autoflush(1); } +# Reimplementation of ksb::IPC::supportsConcurrency sub supportsConcurrency { return 1; } -# First parameter is the ipc Type of the message to send. -# Second parameter is the module name (or other message). -# Third parameter is the file handle id to send on. +# Required reimplementation of ksb::IPC::sendMessage +# First parameter is the (encoded) message to send. sub sendMessage { - my $self = shift; - my ($msg, $fh) = @_; + my ($self, $msg) = @_; - return syswrite ($self->{$fh}, $msg); -} + # Since streaming does not provide message boundaries, we will insert + # ourselves, by sending a 2-byte unsigned length, then the message. + my $encodedMsg = pack ("S a*", length($msg), $msg); -# Override of sendIPCMessage to specify which filehandle to send to. -sub sendIPCMessage -{ - my $self = shift; - push @_, 'toMon'; # Add filehandle to args. + if (length($encodedMsg) != $self->{fh}->syswrite($encodedMsg)) { + croak_runtime("Unable to write full msg to pipe: $!"); + } - return $self->SUPER::sendIPCMessage(@_); + return 1; } -# Used by monitor process, so no message encoding or decoding required. -sub sendToBuilder +sub _readNumberOfBytes { - my ($self, $msg) = @_; - return $self->sendMessage($msg, 'toBuild'); -} + my ($self, $length) = @_; -# First parameter is a reference to the output buffer. -# Second parameter is the id of the filehandle to read from. -sub receiveMessage -{ - my $self = shift; - my $fh = shift; - my $value; + my $fh = $self->{fh}; + my $readLength = 0; + my $result; + + while ($readLength < $length) { + $! = 0; # Reset errno + + my $curLength = $fh->sysread ($result, ($length - $readLength), $readLength); + if ($curLength > $length) { + croak_runtime("sysread read too much: $curLength vs $length") + } - undef $!; # Clear error marker - my $result = sysread ($self->{$fh}, $value, 256); + # EINTR is OK, but check early so we don't trip 0-length check + next if (!defined $curLength && $!{EINTR}); + return if (defined $curLength && $curLength == 0); + croak_runtime ("Error reading $length bytes from pipe: $!") if !$curLength; - return undef if not $result; - return $value; + $readLength += $curLength; + } + + return $result; } -# Override of receiveIPCMessage to specify which filehandle to receive from. -sub receiveIPCMessage +# Required reimplementation of ksb::IPC::receiveMessage +sub receiveMessage { my $self = shift; - push @_, 'fromMon'; # Add filehandle to args. - return $self->SUPER::receiveIPCMessage(@_); + # Read unsigned short with msg length, then the message + my $msgLength = $self->_readNumberOfBytes(2); + return if !$msgLength; + + $msgLength = unpack ("S", $msgLength); # Decode to Perl type + return $self->_readNumberOfBytes($msgLength); } -# Used by monitor process, so no message encoding or decoding required. -sub receiveFromUpdater +sub close { my $self = shift; - return $self->receiveMessage('fromSvn'); + $self->{fh}->close(); } 1;