[interchange-cvs] interchange - racke modified 4 files
interchange-cvs at icdevgroup.org
interchange-cvs at icdevgroup.org
Sun Jul 27 13:06:00 EDT 2003
User: racke
Date: 2003-07-27 16:06:53 GMT
Modified: . WHATSNEW
Modified: lib/Vend Config.pm Control.pm Server.pm
Log:
* Jobs are now listed and queued in $Global::RunDir/jobs instead
of $Global::RunDir/reconfig
* Add global Jobs configuration directive:
Jobs MaxLifetime 3600
Jobs MaxServers 3
After a job has been running for MaxLifetime seconds, it will be
removed by the next housekeeping run. The default MaxLifetime is 10
minutes. This setting is only available if PIDcheck has been set.
MaxServers is the number of jobs allowed to run simultaneously,
which should be significantly smaller than the value of the global
MaxServers directive to avoid Interchange becoming inaccessible for
users. The default for Jobs MaxServers is 1.
TODO:
* don't check jobs on every housekeeping run
* introduce a delay for jobs, which serves as a method to expire jobs
(delay = 4 minutes for jobs called from cron every 5 minutes)
Revision Changes Path
2.146 +17 -0 interchange/WHATSNEW
rev 2.146, prev_rev 2.145
Index: WHATSNEW
===================================================================
RCS file: /var/cvs/interchange/WHATSNEW,v
retrieving revision 2.145
retrieving revision 2.146
diff -u -r2.145 -r2.146
--- WHATSNEW 21 Jul 2003 18:40:06 -0000 2.145
+++ WHATSNEW 27 Jul 2003 16:06:53 -0000 2.146
@@ -315,6 +315,23 @@
* filter option added for Job output
+* Jobs are now listed and queued in $Global::RunDir/jobs instead
+ of $Global::RunDir/reconfig
+
+* Add global Jobs configuration directive:
+
+ Jobs MaxLifetime 3600
+ Jobs MaxServers 3
+
+ After a job has been running for MaxLifetime seconds, it will be
+ removed by next housekeeping run. The default MaxLifetime is 10
+ minutes. This setting is only available if PIDcheck has been set.
+
+ MaxServers is the number of jobs allowed to be run simultaneously,
+ which should be significantly smaller than the value in the
+ MaxServers directive to avoid inaccessibility of Interchange for
+ users. The default for MaxServers is 1.
+
SQL Parsing
-----------
2.124 +3 -2 interchange/lib/Vend/Config.pm
rev 2.124, prev_rev 2.123
Index: Config.pm
===================================================================
RCS file: /var/cvs/interchange/lib/Vend/Config.pm,v
retrieving revision 2.123
retrieving revision 2.124
diff -u -r2.123 -r2.124
--- Config.pm 26 Jul 2003 20:25:43 -0000 2.123
+++ Config.pm 27 Jul 2003 16:06:53 -0000 2.124
@@ -1,6 +1,6 @@
# Vend::Config - Configure Interchange
#
-# $Id: Config.pm,v 2.123 2003/07/26 20:25:43 mheins Exp $
+# $Id: Config.pm,v 2.124 2003/07/27 16:06:53 racke Exp $
#
# Copyright (C) 2002-2003 Interchange Development Group
# Copyright (C) 1996-2002 Red Hat, Inc.
@@ -48,7 +48,7 @@
use Vend::File;
use Vend::Data;
-$VERSION = substr(q$Revision: 2.123 $, 10);
+$VERSION = substr(q$Revision: 2.124 $, 10);
my %CDname;
my %CPname;
@@ -305,6 +305,7 @@
['SOAP_StartServers', 'integer', 1],
['SOAP_Host', undef, 'localhost 127.0.0.1'],
['SOAP_Control', 'action', ''],
+ ['Jobs', 'hash', 'MaxLifetime 600 MaxServers 1'],
['IPCsocket', undef, "$Global::VendRoot/etc/socket.ipc"],
['HouseKeeping', 'integer', 60],
['Mall', 'yesno', 'No'],
2.8 +9 -3 interchange/lib/Vend/Control.pm
rev 2.8, prev_rev 2.7
Index: Control.pm
===================================================================
RCS file: /var/cvs/interchange/lib/Vend/Control.pm,v
retrieving revision 2.7
retrieving revision 2.8
diff -u -r2.7 -r2.8
--- Control.pm 18 Jun 2003 17:34:44 -0000 2.7
+++ Control.pm 27 Jul 2003 16:06:53 -0000 2.8
@@ -1,6 +1,6 @@
# Vend::Control - Routines that alter the running Interchange daemon
#
-# $Id: Control.pm,v 2.7 2003/06/18 17:34:44 jon Exp $
+# $Id: Control.pm,v 2.8 2003/07/27 16:06:53 racke Exp $
#
# Copyright (C) 2002-2003 Interchange Development Group
# Copyright (C) 1996-2002 Red Hat, Inc.
@@ -55,14 +55,20 @@
shift;
$Vend::mode = 'jobs';
my $arg = shift;
- my ($cat, $job) = split /\s*=\s*/, $arg, 2;
+ my ($cat, $job, $delay) = split /\s*=\s*/, $arg, 3;
+
$Vend::JobsCat = $cat;
+ if ($delay =~ /^(\d+)$/) {
+ $delay + time;
+ } else {
+ $delay = 0;
+ }
#::logGlobal("signal_jobs: called cat=$cat job=$job");
$job = join ",", $job, $Vend::JobsJob;
$job =~ s/^,+//;
$job =~ s/,+$//;
$Vend::JobsJob = $job;
- Vend::Util::writefile("$Global::RunDir/restart", "jobs $cat $job\n");
+ Vend::Util::writefile("$Global::RunDir/jobs", "jobs $cat $delay $job\n");
#::logGlobal("signal_jobs: wrote file, ready to control_interchange");
control_interchange('jobs', 'HUP');
}
2.38 +92 -9 interchange/lib/Vend/Server.pm
rev 2.38, prev_rev 2.37
Index: Server.pm
===================================================================
RCS file: /var/cvs/interchange/lib/Vend/Server.pm,v
retrieving revision 2.37
retrieving revision 2.38
diff -u -r2.37 -r2.38
--- Server.pm 26 Jul 2003 21:55:58 -0000 2.37
+++ Server.pm 27 Jul 2003 16:06:53 -0000 2.38
@@ -1,6 +1,6 @@
# Vend::Server - Listen for Interchange CGI requests as a background server
#
-# $Id: Server.pm,v 2.37 2003/07/26 21:55:58 mheins Exp $
+# $Id: Server.pm,v 2.38 2003/07/27 16:06:53 racke Exp $
#
# Copyright (C) 2002-2003 Interchange Development Group
# Copyright (C) 1996-2002 Red Hat, Inc.
@@ -26,7 +26,7 @@
package Vend::Server;
use vars qw($VERSION);
-$VERSION = substr(q$Revision: 2.37 $, 10);
+$VERSION = substr(q$Revision: 2.38 $, 10);
use POSIX qw(setsid strftime);
use Vend::Util;
@@ -865,6 +865,8 @@
%Page_pids
$SOAP_servers
%SOAP_pids
+ $Job_servers
+ %Lifetime
$vector
$p_vector
$s_vector
@@ -875,6 +877,8 @@
}
$Num_servers = 0;
$SOAP_servers = 0;
+$Job_servers = 0;
+%Lifetime = ();
# might also trap: QUIT
@@ -963,7 +967,7 @@
rand();
$Last_housekeeping = $now;
- my ($c, $num,$reconfig, $restart, @files);
+ my ($c, $num,$reconfig, $restart, $jobs, @files);
my @pids;
if($Global::PreFork) {
@@ -1013,6 +1017,7 @@
($reconfig) = grep $_ eq 'reconfig', @files;
($restart) = grep $_ eq 'restart', @files
if $Signal_Restart || $Global::Windows;
+ ($jobs) = grep $_ eq 'jobs', @files;
if($Global::PIDcheck) {
$Num_servers = 0;
@pids = grep /^pid\.\d+$/, @files;
@@ -1052,11 +1057,6 @@
{
::remove_catalog($1);
}
- elsif( $directive =~ /^jobs$/i) {
- my ($cat, @jobs) = grep /\S/, split /[\s,\0]+/, $value;
-#::logGlobal("restart line found value='$value'");
- run_jobs($cat, @jobs);
- }
else {
::change_global_directive($directive, $value);
}
@@ -1124,6 +1124,66 @@
$respawn = 1;
}
+ if (defined $jobs) {
+ my (@scheduled_jobs, @queued_jobs);
+ open(Vend::Server::JOBS, "+<$Global::RunDir/jobs")
+ or die "open $Global::RunDir/jobs: $!\n";
+ lockfile(\*Vend::Server::JOBS, 1, 1)
+ or die "lock $Global::RunDir/jobs: $!\n";
+ while(<Vend::Server::JOBS>) {
+ chomp;
+ my ($directive,$value) = split /\s+/, $_, 2;
+ my ($cat, $delay, @jobs) = grep /\S/, split /[\s,\0]+/, $value;
+ if ($delay && $delay < time()) {
+ # job expired
+#::logDebug ("Jobs @jobs expired ($delay vs $now)\n");
+ } elsif ($Job_servers++ >= $Global::Jobs->{MaxServers}) {
+ # no slot for job
+ $Job_servers--;
+#::logDebug ("Jobs @jobs queued, already %d jobs running/scheduled", $Job_servers);
+ push(@queued_jobs, "$directive $value");
+ } else {
+#::logDebug ("Scheduled job @jobs for running");
+ push (@scheduled_jobs, [$cat, @jobs]);
+ }
+ if (@queued_jobs > 20) {
+ ::logGlobal({ level => 'notice' }, "Excessive size of job queue, stopping");
+ last;
+ }
+ }
+
+ truncate(Vend::Server::JOBS, 0)
+ or die "truncate $Global::RunDir/jobs: $!\n";
+ seek(Vend::Server::JOBS, 0, 0)
+ or die "seek $Global::RunDir/jobs: $!\n";
+
+ if (@queued_jobs) {
+#::logDebug("Size of queue $$: %s", scalar(@queued_jobs));
+ print Vend::Server::JOBS join("\n", @queued_jobs, '');
+ unlockfile(\*Vend::Server::JOBS)
+ or die "unlock $Global::RunDir/jobs: $!\n";
+ close(Vend::Server::JOBS)
+ or die "close $Global::RunDir/jobs: $!\n";
+ } else {
+ unlockfile(\*Vend::Server::JOBS)
+ or die "unlock $Global::RunDir/jobs: $!\n";
+ close(Vend::Server::JOBS)
+ or die "close $Global::RunDir/jobs: $!\n";
+ unlink "$Global::RunDir/jobs"
+ or die "unlink $Global::RunDir/jobs: $!\n";
+ }
+
+ # now we run the scheduled jobs
+ for my $jobref (@scheduled_jobs) {
+ eval {
+ run_jobs (@$jobref);
+ };
+
+ if($@) {
+ ::logGlobal({ level => 'notice' }, $@);
+ }
+ }
+ }
if($respawn) {
if($Global::PreFork) {
@@ -1169,8 +1229,20 @@
my $fn = "$Global::RunDir/$_";
($Num_servers--, next) if ! -f $fn;
my $runtime = $now - (stat(_))[9];
- next if $runtime < $Global::PIDcheck;
s/^pid\.//;
+ my ($lifetime, $isjob);
+ if (exists $Lifetime{$_}) {
+ $lifetime = $Lifetime{$_};
+ $isjob = 1;
+ } else {
+ $lifetime = $Global::PIDcheck;
+ }
+ next if $runtime < $lifetime;
+ if ($isjob) {
+ delete $Lifetime{$_};
+ $Job_servers--;
+ }
+
if(kill 9, $_) {
unlink $fn and $Num_servers--;
::logGlobal({ level => 'error' }, "hammered PID %s running %s seconds", $_, $runtime);
@@ -1830,6 +1902,15 @@
$SOAP_servers--;
start_soap(undef, 1);
}
+ elsif ($thing =~ /^running job (\d+)/) {
+#::logDebug("registered job pid $1");
+ $Lifetime{$1} = $Global::Jobs->{MaxLifetime} || 30;
+ }
+ elsif ($thing =~ /^finishing job (\d+)/) {
+#::logDebug("finished job pid $1");
+ $Job_servers--;
+ delete $Lifetime{$1};
+ }
elsif($thing =~ /^\d+$/) {
close $fh;
$Num_servers++;
@@ -2293,6 +2374,7 @@
#fork again
unless ($pid = fork) {
+ send_ipc("running job $$");
reset_per_fork();
$::Instance = {};
eval {
@@ -2307,6 +2389,7 @@
if defined $Vend::Cfg->{ErrorFile};
}
clean_up_after_fork();
+ send_ipc("finishing job $$");
undef $::Instance;
select(undef,undef,undef,0.050) until &$ppidsub == 1;
More information about the interchange-cvs
mailing list