Rev 7406 | Blame | Compare with Previous | Last modification | View Log | RSS feed
#! /usr/bin/perl
########################################################################
# COPYRIGHT - VIX IP PTY LTD ("VIX"). ALL RIGHTS RESERVED.
#
# Module name   : blatTarZip.pl
# Module type   :
# Compiler(s)   : Perl
# Environment(s):
#
# Description   : This is a blat related task that will monitor a
#                 directory for requests to tarZip a package
#
# Usage         : ARGV[0] - Path to config file for this instance
#
#......................................................................#

require 5.008_002;
use strict;
use warnings;

use Getopt::Long;
use File::Basename;
use Data::Dumper;
use File::Spec::Functions;
use POSIX ":sys_wait_h";
use File::Temp qw/tempfile/;
use Digest::MD5;

use FindBin;                            # Determine the current directory
use lib "$FindBin::Bin/lib";            # Allow local libraries

use Utils;
use StdLogger;                          # Log to stdout
use Logger;                             # Log to file

#
#   Database interface
#   Pinched from jats and modified so that this software is not dependent on JATS
#
use IO::Handle;
use JatsRmApi;
use DBI;

#
#   Globals
#
my $logger = StdLogger->new();          # Stdout logger. Only during config
$logger->err("No config file specified") unless (defined $ARGV[0]);
$logger->err("Config File does not exist: $ARGV[0]") unless (-f $ARGV[0]);

#   Instance name is the config file name without its '.conf' suffix.
#   The dot is escaped so that only a literal '.conf' is stripped.
my $name = basename( $ARGV[0]);
$name =~ s~\.conf$~~;

my $now = 0;                            # Time at the start of the current cycle
my $startTime = 0;                      # Time the program started
my $tagDirTime = 0;                     # mtime of the tag directory at the last scan
my $lastDirScan = 0;                    # Time of the last tag directory scan
my $lastCleanScan = 0;                  # Time of the last cleanZipStore scan
my $mtimeConfig = 0;                    # mtime of the config file when last read
my $conf;                               # Current configuration (hash ref)
my $yday = -1;                          # Day of year used to detect a new day
my $tagRoot;                            # Parent directory of the configured tagdir

#
#   Contain statistics maintained while operating
#       Written to the stats file by periodicStatistics(), which is
#       driven by a 60 second alarm installed in sighandlers()
#       List here for documentation
#
my %statistics = (
    SeqNum      => 0,                   # Bumped when $statistics are dumped
    timeStamp   => 0,                   # DateTime when statistics are dumped
    upTime      => 0,                   # Seconds since program start
    Cycle       => 0,                   # Major process loop counter
    phase       => 'Init',              # Current phase of operation
    state       => 'OK',                # Nagios state
                                        #
                                        # The following are reset each day
    dayStart    => 0,                   # DateTime when daily data was reset
    txCount     => 0,                   # Packages Transferred
    delCount    => 0,                   # Packages marked for deletion
    staleTags   => 0,                   # Stale Tags
    linkErrors  => 0,                   # Transfer (zip) errors
                                        #
                                        # Per Cycle Data - Calculated each processing Cycle
    total       => 0,                   # Packages to be synced
    delete      => 0,                   # Packages to delete
    excluded    => 0,                   # Packages excluded
    filtered    => 0,                   # Packages filtered out
    missing     => 0,                   # Packages missing
    transfer    => 0,                   # Packages to transfer
    writable    => 0,                   # Packages still writable - thus not transferred
    tagCount    => 0,                   # Packages tagged to be transferred
                                        #
);

#
#   Describe configuration parameters
#
my %cdata = (
    'piddir'          => {'mandatory' => 1      , 'fmt' => 'dir'},
    'sleep'           => {'default'   => 5      , 'fmt' => 'period'},
    'dpkg_archive'    => {'mandatory' => 1      , 'fmt' => 'dir'},
    'logfile'         => {'mandatory' => 1      , 'fmt' => 'vfile'},
    'logfile.size'    => {'default'   => '1M'   , 'fmt' => 'size'},
    'logfile.count'   => {'default'   => 9      , 'fmt' => 'int'},
    'verbose'         => {'default'   => 0      , 'fmt' => 'int'},
    'active'          => {'default'   => 1      , 'fmt' => 'bool'},
    'debug'           => {'default'   => 0      , 'fmt' => 'bool'},     # Log to screen
    'txdetail'        => {'default'   => 0      , 'fmt' => 'bool'},
    'tagdir'          => {'mandatory' => 1      , 'fmt' => 'mkdir'},
    'forcedirscan'    => {'default'   => 100    , 'fmt' => 'period'},
    'tagMaxPackages'  => {'default'   => 10     , 'fmt' => 'int'},
    'tagage'          => {'default'   => '10d'  , 'fmt' => 'period'},
    'cleanPeriod'     => {'default'   => '30m'  , 'fmt' => 'period'},
    'maxFileAge'      => {'default'   => '24h'  , 'fmt' => 'period'},
);

#
#   Read in the configuration
#       Set up a logger
#       Write a pidfile - thats not used
$now = $startTime = time();
readConfig();
Utils::writepid($conf);
$logger->logmsg("Starting...");
readStatistics();
sighandlers();

#
#   Main processing loop
#   Will exit when terminated by parent
#
while (1)
{
    $logger->verbose3("Processing");
    $statistics{Cycle}++;
    $now = time();

    $statistics{phase} = 'ReadConfig';
    readConfig();
    if ( $conf->{'active'} )
    {
        $statistics{phase} = 'Monitor Tags';
        processRequests();
        cleanZipStore();
    }

    $statistics{phase} = 'Sleep';
    sleep( $conf->{'sleep'} );
    reapChildren();

    #   If my PID file ceases to be, then exit the daemon
    #   Used to force daemon to restart
    #
    unless ( -f $conf->{'pidfile'} )
    {
        $logger->logmsg("Terminate. Pid file removed");
        last;
    }
}
$statistics{phase} = 'Terminated';
$logger->logmsg("Child End");
exit 0;

#-------------------------------------------------------------------------------
# Function        : reapChildren
#
# Description     : Reap any and all dead children
#                   Call in major loops to prevent zombies accumulating
#
# Inputs          : None
#
# Returns         : Nothing
#
sub reapChildren
{
    my $currentPhase = $statistics{phase};
    $statistics{phase} = 'Reaping';

    #   Non-blocking wait: loop while a child was actually collected
    my $kid;
    do {
        $kid = waitpid(-1, WNOHANG);
    } while ( $kid > 0 );

    $statistics{phase} = $currentPhase;
}

#-------------------------------------------------------------------------------
# Function        : readConfig
#
# Description     : Re read the config file if its modification time has changed
#
# Inputs          : Nothing
#
# Returns         : 0 - Config not read
#                   1 - Config read
#                       Config file has changed
#
sub readConfig
{
    my ($mtime) = Utils::mtime($ARGV[0]);
    my $rv = 0;

    if ( $mtimeConfig != $mtime )
    {
        $logger->logmsg("Reading config file: $ARGV[0]");
        $mtimeConfig = $mtime;
        my $errors;
        ($conf, $errors) = Utils::readconf ( $ARGV[0], \%cdata );
        if ( scalar @{$errors} > 0 )
        {
            warn "$_\n" foreach (@{$errors});
            die ("Config contained errors\n");
        }

        #
        #   Reset some information
        #   Create a new logger
        #
        $logger = Logger->new($conf) unless $conf->{debug};
        $conf->{logger} = $logger;
        $conf->{'pidfile'} = $conf->{'piddir'} . '/' . $name . '.pid';
        $logger->setVerbose($conf->{verbose});
        $logger->verbose("Log Levl: $conf->{verbose}");

        #
        #   Setup statistics filename
        $conf->{'statsfile'} = $conf->{'piddir'} . '/' . $name . '.stats';
        $conf->{'statsfiletmp'} = $conf->{'piddir'} . '/' . $name . '.stats.tmp';

        #
        #   Calculate the base of the tags directory
        #   ASSUME all tagdirs are in the same tree as my tags dir
        #   Only update $tagRoot on a successful match so that a stale $1
        #   from an earlier regex is never picked up
        #
        if ( $conf->{'tagdir'} =~ m~^(.*)/~ )
        {
            $tagRoot = $1;
        }

        #   Tell the caller that the configuration was (re)read.
        #   processRequests() relies on this to abandon its current pass.
        $rv = 1;
    }

    #
    #   Performed on every call, not only when the file was re-read
    #
    Utils::DebugDumpData ("Config", $conf);
    $logger->warn("Tar Zip is inactive") unless ( $conf->{'active'} );
    return $rv;
}

#-------------------------------------------------------------------------------
# Function        : processRequests
#
# Description     : Process tags and generate tarZip files as required
#                   Determine if new tags are present
#                   Process each tag
#
# Inputs          : None
#
# Returns         : Nothing
#
sub processRequests
{
    #
    #   Determine if new tags are present by examining the time
    #   that the directory was last modified.
    #
    #   Allow for a forced scan to catch packages that did not transfer
    #   on the first attempt
    #
    my $tagCount = 0;
    my ($mtime) = Utils::mtime($conf->{'tagdir'} );
    return unless ( ($mtime > $tagDirTime) || ($now > ($lastDirScan + $conf->{'forcedirscan'})) );

    $logger->verbose2("processTags: ,$conf->{'tagdir'}");
    $tagDirTime = $mtime;
    $lastDirScan = $now;
    my $txcount = $conf->{'tagMaxPackages'};

    my $dh;
    unless (opendir($dh, $conf->{'tagdir'}))
    {
        $logger->warn ("can't opendir $conf->{'tagdir'}: $!");
        return;
    }

    #
    #   Process each entry
    #   Ignore those that start with a .
    #   Explicit defined() so that an entry called '0' does not end the scan
    #
    my %tagPkgList;
    while ( defined(my $tag = readdir($dh)) )
    {
        next if ( $tag =~ m~^\.~ );
        my $file = "$conf->{'tagdir'}/$tag";
        $logger->verbose3("processTags: $file");

        #   Tag file names have the form package::version
        if ( $tag =~ m~(.+)::(.+)~ )
        {
            my $package = $1;
            my $version = $2;
            $tagCount++;
            $tagPkgList{$package}{$version} = $file;
        }
    }
    $statistics{tagCount} = $tagCount;
    closedir $dh;

    #
    #   Process the packages located in the tags area
    #
    send_tags:
    while ( my ($package, $pvers) = each %tagPkgList )
    {
        while ( my ($version, $file) = each %{$pvers} )
        {
            #   Post-decrement: exactly tagMaxPackages tags are processed
            #   per pass before the limit trips
            if ( $txcount-- <= 0 )
            {
                $logger->warn("Max tag transfer count exceeded: $tagCount transfer remaining");
                $tagDirTime = 0;            # Force a rescan on the next cycle
                last send_tags;
            }

            if ( readConfig() )
            {
                $logger->warn("Config file changed");
                $txcount = 0;
                $tagDirTime = 0;            # Force a rescan on the next cycle
                last send_tags;
            }

            if ( ! zipRequestNeeded($package, $version) )
            {
                #   Nobody else is waiting for this package
                $logger->verbose2("NoZip request outstanding: $package, $version");
                unlink $file;
            }
            elsif ( tarZipPackage( $package, $version ))
            {
                unlink $file;
                triggerTransfers($package, $version);
            }
            elsif ( $conf->{'tagage'} > 0 )
            {
                #   TarZip failed: age out tags that cannot be satisfied
                my ($tagMtime) = Utils::mtime( $file );
                if ( $now - $tagMtime > $conf->{'tagage'} )
                {
                    $logger->warn ("Delete unsatisfied tag: ${package}::${version} after $conf->{'tagage'}" );
                    unlink $file;
                    $statistics{staleTags}++;
                }
            }

            $tagCount--;
            reapChildren();
        }
    }
}

#-------------------------------------------------------------------------------
# Function        : zipRequestNeeded
#
# Description     : Scan tags and ensure that the zip request is still needed
#                   Another task must still have need for this package to be
#                   tarZipped
#
# Inputs          : $pname
#                   $version
#
# Returns         : True if still needed
#
sub zipRequestNeeded
{
    my ($pname, $version) = @_;

    #
    #   Find the tag in all blat transfer areas
    #
    my $tag = "${pname}::${version}";
    my @tagList = glob ("$tagRoot/*/$tag");

    #
    #   Expect to find my own tag, so a count of 1 indicates not found elsewhere
    #
    my $count = scalar @tagList;
    return $count > 1;
}

#-------------------------------------------------------------------------------
# Function        : cleanZipStore
#
# Description     : Cleanup the store of zipped packages
#                   If a tarZip is no longer needed for a transfer, then we can consider
#                   removing it from the store
#
#                   May want to keep it for some time, but ...
#
#                   May need to remove them if they are too old.
#                       When a connection is wedged, then we may end up with a lot of packages
#                       May be that the ssh tx process will age out old tags, but I don't think so
#
#                   Should be run periodically
#
# Inputs          : None
#
# Returns         : Nothing
#
sub cleanZipStore
{
    if ( $conf->{cleanPeriod} == 0 )
    {
        $logger->verbose2("cleanZipStore disabled");
        return;
    }

    #
    #   Time to perform the scan
    #   Will do at startup and every time period there after
    #
    return unless ( $now > ($lastCleanScan + $conf->{cleanPeriod} ));
    $logger->verbose("cleanZipStore");
    $lastCleanScan = $now;
    my %tagPkgList;

    #
    #   Process all active tag directories ( these have a .config file )
    #   Locate all active tags and create a hash of known tags
    #   These are packages that we have queued to be transferred
    #
    if (opendir(my $dh, $tagRoot) )
    {
        while ( defined(my $tagDir = readdir($dh)) )
        {
            next unless -f catfile($tagRoot, $tagDir, '.config');
            $logger->verbose3 ("TagScan: $tagDir");

            my @tagList = glob ("$tagRoot/$tagDir/*::*");
            foreach my $entry ( @tagList )
            {
                #   Map tag 'package::version' to its store name 'package__version.tgz'
                next unless ( $entry =~ m~.*/(.*)::(.*)$~ );
                $tagPkgList{$1 .'__'.$2 . '.tgz'} = 1;
            }
        }
        closedir $dh;
    }

    #
    #   Iterate over all the stored tarZips and remove them if they are no longer needed
    #
    my @tarZipFiles = glob (catfile( $conf->{'dpkg_archive'}, '.dpkg_archive', 'tarStore', '*.tgz' ));
    foreach my $entry (@tarZipFiles)
    {
        next unless ( $entry =~ m~.*/(.*)$~ );
        my $fname = $1;

        if ( exists $tagPkgList{$fname} )
        {
            $logger->verbose("cleanZipStore. Retain: $fname");
            next;
        }

        #   Not referenced by any tag: remove once it is old enough
        my ($mtime) = Utils::mtime($entry);
        my $age = time() - $mtime;
        $logger->verbose3( "File Age: $age, $conf->{maxFileAge}");
        if ( $age > $conf->{maxFileAge} )
        {
            $logger->logmsg("cleanZipStore. Remove: $fname");
            unlink $entry;
            $logger->warn("cleanZipStore. Cannot Remove: $fname") if (-f $entry);
        }
    }
}

#-------------------------------------------------------------------------------
# Function        : tarZipPackage
#
# Description     : Perform the tar Zip operation
#                   The archive is created under a temporary name and then
#                   renamed so that its creation appears atomic
#
# Inputs          : $pname
#                   $version
#
# Returns         : 1 - TarZip complete
#                   0 - TarZip not complete
#
sub tarZipPackage
{
    my ($pname, $version) = @_;
    $logger->logmsg("TarZip $pname $version");

    my $srcDir = catdir( $conf->{'dpkg_archive'}, $pname, $version);
    my $tgtdir = catdir( $conf->{'dpkg_archive'}, '.dpkg_archive', 'tarStore' );
    my $zfile = $pname . '__' . $version . '.tgz';
    my $tfile = catfile($tgtdir, $zfile);
    my $tfileTmp = $tfile . '.TEMP';
    my $startTime = time;

    #
    #   Does the source exist
    if (! -d $srcDir)
    {
        $logger->warn("Package not found: $pname, $version");
        return 0;
    }

    #
    #   If the target zip is already present, then assume the job has been done
    #
    if ( -f $tfile )
    {
        $logger->verbose("tarZipPackage: Already done: $pname, $version");
        $logger->verbose2("tarZipPackage: Already done: $pname, $version - $tfile");
        return 1;
    }

    #
    #   Tar zip the file
    #   TarZip into a temp file, then rename it
    #   The command is run in list form so that no shell is involved and
    #   package names or versions cannot be interpreted as shell syntax
    #
    my @tar_cmd = ('tar', '-czf', $tfileTmp, '-C', $conf->{'dpkg_archive'}, "$pname/$version");
    $logger->verbose2("tarZipPackage:tar_cmd:@tar_cmd");

    my $cmdRv;
    if ( open(my $ph, '-|', @tar_cmd) )
    {
        while ( my $line = <$ph> )
        {
            chomp $line;
            $logger->verbose2("tarZipPackage:Data: $line");
        }
        close ($ph);
        $cmdRv = $?;                    # Exit status of tar, set by close on a pipe
    }
    else
    {
        $logger->warn("tarZipPackage: Cannot run tar: $!");
        $cmdRv = -1;
    }
    $logger->verbose("tarZipPackage:End: $cmdRv");

    #   Rename the TEMP file, so that the tgz file creation appears atomic
    #   Low precedence 'or' so that the warning is tied to the rename itself
    if ($cmdRv == 0 && -f $tfileTmp)
    {
        rename($tfileTmp, $tfile) or $logger->warn("Rename error: $tfileTmp");
    }

    #
    #   Display the size of the package (tarZipped)
    #       Diagnostic use
    #
    if ( -f $tfile && $conf->{txdetail})
    {
        my $tzfsize = -s $tfile;
        my $size = sprintf "%.3f", $tzfsize / 1024 / 1024 / 1024 ;
        my $duration = time - $startTime;
        $logger->logmsg("tarZipPackage: Stats: $pname, $version, $size Gb, $duration Secs");
    }

    #
    #   Success is defined by the presence of the final tar file
    #
    if ( -f $tfile )
    {
        $statistics{txCount}++;
        $logger->verbose2("tarZipPackage:Done: $pname/$version");
        return 1;
    }

    #   Failure: discard any partial temp file
    unlink $tfileTmp;
    $statistics{linkErrors}++;
    $logger->verbose2("tarZipPackage:Error: $pname/$version");
    return 0;
}

#-------------------------------------------------------------------------------
# Function        : triggerTransfers
#
# Description     : Trigger transfers for other blat tasks that may be waiting for this
#                   tarZip to have been performed
#
# Inputs          : $pname
#                   $version
#
# Returns         : Even less
#
sub triggerTransfers
{
    my ($pname, $version) = @_;

    #
    #   Find the tag in all blat transfer areas
    #
    my $tag = "${pname}::${version}";
    my @tagList = glob ("$tagRoot/*/$tag");
    $logger->verbose2("triggerTransfer: $tagRoot/*/$tag: @tagList");

    foreach my $target ( @tagList )
    {
        $logger->verbose2("triggerTransfer: $target");
        next unless ( $target =~ m~^(.*)/~ );
        my $tagDir = $1;
        my $configFile = catfile($tagDir, '.config');
        my $triggerFile = catfile($tagDir, '.trigger');

        if ( -f $configFile)
        {
            #   Directory has a .config file: touch its trigger file
            Utils::TouchFile($conf, $triggerFile);
        }
        else
        {
            #   No .config file: the tag is removed
            unlink $target;
        }
    }
}

#-------------------------------------------------------------------------------
# Function        : resetDailyStatistics
#
# Description     : Called periodically to reset the daily statistics
#
# Inputs          : $time       - Current time
#
# Returns         : Nothing
#
sub resetDailyStatistics
{
    my ($time) = @_;

    #
    #   Detect a new day
    #
    my $today = (localtime($time))[7];
    return if ( $yday == $today );

    $yday = $today;
    $logger->logmsg('Resetting daily statistics' );

    # Note: Must match @recoverTags in readStatistics
    $statistics{dayStart} = $time;
    $statistics{txCount} = 0;
    $statistics{delCount} = 0;
    $statistics{staleTags} = 0;
    $statistics{linkErrors} = 0;
}

#-------------------------------------------------------------------------------
# Function        : readStatistics
#
# Description     : Read in the last set of stats
#                   Used after a restart to recover daily statistics
#
# Inputs          : None
#
# Returns         : Nothing
#
sub readStatistics
{
    # Note: Must match the items reset in resetDailyStatistics
    my @recoverTags = qw(dayStart txCount delCount staleTags linkErrors);

    return unless ( $conf->{'statsfile'} and -f $conf->{'statsfile'} );

    my $fh;
    unless ( open($fh, '<', $conf->{'statsfile'}) )
    {
        $logger->warn("Cannot read stats file: $!");
        return;
    }

    #   Each line is of the form key:value
    while ( my $line = <$fh> )
    {
        next unless ( $line =~ m~(.*):(.*)~ );
        my ($key, $value) = ($1, $2);
        next unless ( grep { $_ eq $key } @recoverTags );

        $statistics{$key} = $value;
        $logger->verbose("readStatistics $key, $value");
    }
    close $fh;

    #   Resume day tracking from the recovered start-of-day time
    $yday = (localtime($statistics{dayStart}))[7];
}

#-------------------------------------------------------------------------------
# Function        : periodicStatistics
#
# Description     : Called on a regular basis to write out statistics
#                   Used to feed information into Nagios
#
#                   This function is called via an alarm and may be outside the normal
#                   processing loop. Don't make assumptions on the value of $now
#
# Inputs          : None
#
# Returns         : Nothing
#
sub periodicStatistics
{
    #
    #   A few local stats
    #
    $statistics{SeqNum}++;
    $statistics{timeStamp} = time();
    $statistics{upTime} = $statistics{timeStamp} - $startTime;

    #   Reset daily accumulations - on first use each day
    resetDailyStatistics($statistics{timeStamp});

    #
    #   Write statistics to a file
    #       Write to a tmp file, then rename.
    #       Attempt to make the operation atomic - so that the file consumer
    #       doesn't get a badly formed file.
    #
    return unless ( $conf->{'statsfiletmp'} );

    my $fh;
    unless (open ($fh, '>', $conf->{'statsfiletmp'}))
    {
        $logger->warn("Cannot create temp stats file: $!");
        return;
    }

    foreach my $key ( sort { lc($a) cmp lc($b) } keys %statistics)
    {
        print $fh $key . ':' . $statistics{$key} . "\n";
        $logger->verbose2('Statistics:'. $key . ':' . $statistics{$key});
    }
    close $fh;

    #   Rename temp to real file
    rename($conf->{'statsfiletmp'}, $conf->{'statsfile'})
        or $logger->warn("Cannot rename temp stats file: $!");
}

#-------------------------------------------------------------------------------
# Function        : sighandlers
#
# Description     : Install signal handlers
#                       TERM     - Remove the pid file and exit
#                       HUP      - Rotate the log file
#                       USR1     - Force a cleanZipStore scan on the next cycle
#                       ALRM     - Write statistics, re-armed every 60 seconds
#                       __WARN__ / __DIE__ - Route perl warnings and errors to the logger
#
# Inputs          : None
#
# Returns         : Nothing
#
sub sighandlers
{
    $SIG{TERM} = sub {
        # On shutdown
        $logger->logmsg('Received SIGTERM. Shutting down....' );
        unlink $conf->{'pidfile'} if (-f $conf->{'pidfile'});
        exit 0;
    };

    $SIG{HUP} = sub {
        # On logrotate
        $logger->logmsg('Received SIGHUP.');
        $logger->rotatelog();
    };

    $SIG{USR1} = sub {
        # On Force Cache Clean
        $logger->logmsg('Received SIGUSR1.');
        $lastCleanScan = 0;
    };

    $SIG{ALRM} = sub {
        # On Dump Statistics
        $logger->verbose2('Received SIGALRM.');
        periodicStatistics();
        alarm 60;
    };

    #   Arm the timer only once its handler is in place
    alarm 60;

    $SIG{__WARN__} = sub { $logger->warn("@_") };
    $SIG{__DIE__} = sub { $logger->err("@_") };
}

#-------------------------------------------------------------------------------
# Function        : Error, Verbose, Warning
#
# Description     : Support for JatsRmApi
#
# Inputs          : Message
#
# Returns         : Nothing
#
sub Error
{
    $logger->err("@_");
}

sub Verbose
{
    $logger->verbose2("@_");
}

sub Warning
{
    $logger->warn("@_");
}