[interchange-cvs] interchange - racke modified 4 files

interchange-cvs at icdevgroup.org interchange-cvs at icdevgroup.org
Sun Jul 27 13:06:00 EDT 2003


User:      racke
Date:      2003-07-27 16:06:53 GMT
Modified:  .        WHATSNEW
Modified:  lib/Vend Config.pm Control.pm Server.pm
Log:
* Jobs are now listed and queued in $Global::RunDir/jobs instead
  of $Global::RunDir/reconfig

* Add global Jobs configuration directive:

  Jobs MaxLifetime 3600
  Jobs MaxServers  3

  After a job has been running for MaxLifetime seconds, it will be
  removed by next housekeeping run. The default MaxLifetime is 10
  minutes. This setting is only available if PIDcheck has been set.

  MaxServers is the number of jobs allowed to be run simulatenously,
  which should be significantly smaller than the value in the
  MaxServers directive to avoid unaccessibility of Interchange for
  users. The default for MaxServers is 1.

TODO:

* don't check jobs on any housekeeping run

* introduce delay for jobs which serves as method to expire jobs
  (delay = 4 minutes for jobs called from cron every 5 minutes)

Revision  Changes    Path
2.146     +17 -0     interchange/WHATSNEW


rev 2.146, prev_rev 2.145
Index: WHATSNEW
===================================================================
RCS file: /var/cvs/interchange/WHATSNEW,v
retrieving revision 2.145
retrieving revision 2.146
diff -u -r2.145 -r2.146
--- WHATSNEW	21 Jul 2003 18:40:06 -0000	2.145
+++ WHATSNEW	27 Jul 2003 16:06:53 -0000	2.146
@@ -315,6 +315,23 @@
 
 * filter option added for Job output
 
+* Jobs are now listed and queued in $Global::RunDir/jobs instead
+  of $Global::RunDir/reconfig
+
+* Add global Jobs configuration directive:
+ 
+  Jobs MaxLifetime 3600
+  Jobs MaxServers  3
+
+  After a job has been running for MaxLifetime seconds, it will be
+  removed by next housekeeping run. The default MaxLifetime is 10
+  minutes. This setting is only available if PIDcheck has been set.
+
+  MaxServers is the number of jobs allowed to be run simulatenously,
+  which should be significantly smaller than the value in the
+  MaxServers directive to avoid unaccessibility of Interchange for
+  users. The default for MaxServers is 1.
+  
 SQL Parsing
 -----------
 



2.124     +3 -2      interchange/lib/Vend/Config.pm


rev 2.124, prev_rev 2.123
Index: Config.pm
===================================================================
RCS file: /var/cvs/interchange/lib/Vend/Config.pm,v
retrieving revision 2.123
retrieving revision 2.124
diff -u -r2.123 -r2.124
--- Config.pm	26 Jul 2003 20:25:43 -0000	2.123
+++ Config.pm	27 Jul 2003 16:06:53 -0000	2.124
@@ -1,6 +1,6 @@
 # Vend::Config - Configure Interchange
 #
-# $Id: Config.pm,v 2.123 2003/07/26 20:25:43 mheins Exp $
+# $Id: Config.pm,v 2.124 2003/07/27 16:06:53 racke Exp $
 #
 # Copyright (C) 2002-2003 Interchange Development Group
 # Copyright (C) 1996-2002 Red Hat, Inc.
@@ -48,7 +48,7 @@
 use Vend::File;
 use Vend::Data;
 
-$VERSION = substr(q$Revision: 2.123 $, 10);
+$VERSION = substr(q$Revision: 2.124 $, 10);
 
 my %CDname;
 my %CPname;
@@ -305,6 +305,7 @@
 	['SOAP_StartServers', 'integer',          1],
 	['SOAP_Host',         undef,              'localhost 127.0.0.1'],
 	['SOAP_Control',     'action',           ''],
+	['Jobs',		 	 'hash',     	 	 'MaxLifetime 600 MaxServers 1'],
 	['IPCsocket',		 undef,	     	 	 "$Global::VendRoot/etc/socket.ipc"],
 	['HouseKeeping',     'integer',          60],
 	['Mall',	          'yesno',           'No'],



2.8       +9 -3      interchange/lib/Vend/Control.pm


rev 2.8, prev_rev 2.7
Index: Control.pm
===================================================================
RCS file: /var/cvs/interchange/lib/Vend/Control.pm,v
retrieving revision 2.7
retrieving revision 2.8
diff -u -r2.7 -r2.8
--- Control.pm	18 Jun 2003 17:34:44 -0000	2.7
+++ Control.pm	27 Jul 2003 16:06:53 -0000	2.8
@@ -1,6 +1,6 @@
 # Vend::Control - Routines that alter the running Interchange daemon
 # 
-# $Id: Control.pm,v 2.7 2003/06/18 17:34:44 jon Exp $
+# $Id: Control.pm,v 2.8 2003/07/27 16:06:53 racke Exp $
 #
 # Copyright (C) 2002-2003 Interchange Development Group
 # Copyright (C) 1996-2002 Red Hat, Inc.
@@ -55,14 +55,20 @@
 	shift;
 	$Vend::mode = 'jobs';
 	my $arg = shift;
-	my ($cat, $job) = split /\s*=\s*/, $arg, 2;
+	my ($cat, $job, $delay) = split /\s*=\s*/, $arg, 3;
+	
 	$Vend::JobsCat = $cat;
+	if ($delay =~ /^(\d+)$/) {
+		$delay + time;
+	} else {
+		$delay = 0;
+	}
 #::logGlobal("signal_jobs: called cat=$cat job=$job");
 	$job = join ",", $job, $Vend::JobsJob;
 	$job =~ s/^,+//;
 	$job =~ s/,+$//;
 	$Vend::JobsJob = $job;
-	Vend::Util::writefile("$Global::RunDir/restart", "jobs $cat $job\n");
+	Vend::Util::writefile("$Global::RunDir/jobs", "jobs $cat $delay $job\n");
 #::logGlobal("signal_jobs: wrote file, ready to control_interchange");
 	control_interchange('jobs', 'HUP');
 }



2.38      +92 -9     interchange/lib/Vend/Server.pm


rev 2.38, prev_rev 2.37
Index: Server.pm
===================================================================
RCS file: /var/cvs/interchange/lib/Vend/Server.pm,v
retrieving revision 2.37
retrieving revision 2.38
diff -u -r2.37 -r2.38
--- Server.pm	26 Jul 2003 21:55:58 -0000	2.37
+++ Server.pm	27 Jul 2003 16:06:53 -0000	2.38
@@ -1,6 +1,6 @@
 # Vend::Server - Listen for Interchange CGI requests as a background server
 #
-# $Id: Server.pm,v 2.37 2003/07/26 21:55:58 mheins Exp $
+# $Id: Server.pm,v 2.38 2003/07/27 16:06:53 racke Exp $
 #
 # Copyright (C) 2002-2003 Interchange Development Group
 # Copyright (C) 1996-2002 Red Hat, Inc.
@@ -26,7 +26,7 @@
 package Vend::Server;
 
 use vars qw($VERSION);
-$VERSION = substr(q$Revision: 2.37 $, 10);
+$VERSION = substr(q$Revision: 2.38 $, 10);
 
 use POSIX qw(setsid strftime);
 use Vend::Util;
@@ -865,6 +865,8 @@
 			%Page_pids
 			$SOAP_servers
 			%SOAP_pids
+			$Job_servers
+			%Lifetime
 			$vector
 			$p_vector
 			$s_vector
@@ -875,6 +877,8 @@
 }
 $Num_servers = 0;
 $SOAP_servers = 0;
+$Job_servers = 0;
+%Lifetime = ();
 
 # might also trap: QUIT
 
@@ -963,7 +967,7 @@
 	rand();
 	$Last_housekeeping = $now;
 
-	my ($c, $num,$reconfig, $restart, @files);
+	my ($c, $num,$reconfig, $restart, $jobs, @files);
 	my @pids;
 
 		if($Global::PreFork) {
@@ -1013,6 +1017,7 @@
 		($reconfig) = grep $_ eq 'reconfig', @files;
 		($restart) = grep $_ eq 'restart', @files
 			if $Signal_Restart || $Global::Windows;
+		($jobs) = grep $_ eq 'jobs', @files;
 		if($Global::PIDcheck) {
 			$Num_servers = 0;
 			@pids = grep /^pid\.\d+$/, @files;
@@ -1052,11 +1057,6 @@
 					{
 						::remove_catalog($1);
 					}
-					elsif( $directive =~ /^jobs$/i) {
-						my ($cat, @jobs) = grep /\S/, split /[\s,\0]+/, $value;
-#::logGlobal("restart line found value='$value'");
-						run_jobs($cat, @jobs);
-					}
 					else {
 						::change_global_directive($directive, $value);
 					}
@@ -1124,6 +1124,66 @@
 			$respawn = 1;
 			
 		}
+		if (defined $jobs) {
+			my (@scheduled_jobs, @queued_jobs);
+			open(Vend::Server::JOBS, "+<$Global::RunDir/jobs")
+				or die "open $Global::RunDir/jobs: $!\n";
+			lockfile(\*Vend::Server::JOBS, 1, 1)
+				or die "lock $Global::RunDir/jobs: $!\n";
+			while(<Vend::Server::JOBS>) {
+				chomp;
+				my ($directive,$value) = split /\s+/, $_, 2;
+				my ($cat, $delay, @jobs) = grep /\S/, split /[\s,\0]+/, $value;
+				if ($delay && $delay < time()) {
+					# job expired
+#::logDebug ("Jobs @jobs expired ($delay vs $now)\n");
+				} elsif ($Job_servers++ >= $Global::Jobs->{MaxServers}) {
+						# no slot for job
+						$Job_servers--;
+#::logDebug ("Jobs @jobs queued, already %d jobs running/scheduled", $Job_servers);
+                        push(@queued_jobs, "$directive $value");
+                } else {
+#::logDebug ("Scheduled job @jobs for running");
+					push (@scheduled_jobs, [$cat, @jobs]);
+				}
+                if (@queued_jobs > 20) {
+					::logGlobal({ level => 'notice' }, "Excessive size of job queue, stopping");
+					last;
+				}
+			}
+
+			truncate(Vend::Server::JOBS, 0)
+				or die "truncate $Global::RunDir/jobs: $!\n";
+            seek(Vend::Server::JOBS, 0, 0)
+                or die "seek $Global::RunDir/jobs: $!\n";
+
+            if (@queued_jobs) {
+#::logDebug("Size of queue $$: %s", scalar(@queued_jobs));
+				print Vend::Server::JOBS join("\n", @queued_jobs, '');
+                unlockfile(\*Vend::Server::JOBS)
+					or die "unlock $Global::RunDir/jobs: $!\n";
+				close(Vend::Server::JOBS)
+					or die "close $Global::RunDir/jobs: $!\n";
+			} else {
+				unlockfile(\*Vend::Server::JOBS)
+					or die "unlock $Global::RunDir/jobs: $!\n";
+				close(Vend::Server::JOBS)
+					or die "close $Global::RunDir/jobs: $!\n";
+				unlink "$Global::RunDir/jobs"
+					or die "unlink $Global::RunDir/jobs: $!\n";
+			}
+
+			# now we run the scheduled jobs
+			for my $jobref (@scheduled_jobs) {
+				eval {
+					run_jobs (@$jobref);
+				};
+
+				if($@) {
+					::logGlobal({ level => 'notice' }, $@);
+				}
+			}
+		}
 
 		if($respawn) {
 			if($Global::PreFork) {
@@ -1169,8 +1229,20 @@
             my $fn = "$Global::RunDir/$_";
             ($Num_servers--, next) if ! -f $fn;
             my $runtime = $now - (stat(_))[9];
-            next if $runtime < $Global::PIDcheck;
             s/^pid\.//;
+            my ($lifetime, $isjob);
+            if (exists $Lifetime{$_}) {
+ 				$lifetime = $Lifetime{$_};
+				$isjob = 1;
+			} else {
+				$lifetime = $Global::PIDcheck;
+			}
+            next if $runtime < $lifetime;
+			if ($isjob) {
+	            delete $Lifetime{$_};
+    	        $Job_servers--;
+			}
+
             if(kill 9, $_) {
                 unlink $fn and $Num_servers--;
                 ::logGlobal({ level => 'error' }, "hammered PID %s running %s seconds", $_, $runtime);
@@ -1830,6 +1902,15 @@
 		$SOAP_servers--;
 		start_soap(undef, 1);
 	}
+	elsif ($thing =~ /^running job (\d+)/) {
+#::logDebug("registered job pid $1");
+		$Lifetime{$1} = $Global::Jobs->{MaxLifetime} || 30;
+	}
+	elsif ($thing =~ /^finishing job (\d+)/) {
+#::logDebug("finished job pid $1");
+		$Job_servers--;
+		delete $Lifetime{$1};
+	}
 	elsif($thing =~ /^\d+$/) {
 		close $fh;
 		$Num_servers++;
@@ -2293,6 +2374,7 @@
 		#fork again
 		unless ($pid = fork) {
 
+			send_ipc("running job $$");
 			reset_per_fork();
 			$::Instance = {};
 			eval { 
@@ -2307,6 +2389,7 @@
 					if defined $Vend::Cfg->{ErrorFile};
 			}
 			clean_up_after_fork();
+			send_ipc("finishing job $$");
 
 			undef $::Instance;
 			select(undef,undef,undef,0.050) until &$ppidsub == 1;







More information about the interchange-cvs mailing list