Rev 7469 | Blame | Compare with Previous | Last modification | View Log | RSS feed
#! /usr/bin/perl######################################################################### COPYRIGHT - VIX IP PTY LTD ("VIX"). ALL RIGHTS RESERVED.## Module name : blatDaemon.pl# Module type :# Compiler(s) : Perl# Environment(s):## Description :## Usage : ARGV[0] - Path to config file for this instance##......................................................................#require 5.008_002;use strict;use warnings;use Getopt::Long;use File::Basename;use Data::Dumper;use File::Spec::Functions;use POSIX ":sys_wait_h";use File::Temp qw/tempfile/;use Digest::MD5;use FindBin; # Determine the current directoryuse lib "$FindBin::Bin/lib"; # Allow local librariesuse Utils;use StdLogger; # Log to sdtoutuse Logger; # Log to file## Database interface# Pinched from jats and modified so that this software is not dependent on JATS#use IO::Handle;use JatsRmApi;use DBI;## Globals#my $logger = StdLogger->new(); # Stdout logger. Only during config$logger->err("No config file specified") unless (defined $ARGV[0]);$logger->err("Config File does not exist: $ARGV[0]") unless (-f $ARGV[0]);my $name = basename( $ARGV[0]);$name =~ s~.conf$~~;my $now = 0;my $startTime = 0;my $tar = 'tar';my $gzip = 'gzip';my $tagDirTime = 0;my $lastDirScan = 0;my $lastReleaseScan = 0;my $releaseScanMode = 0;my $lastTagListUpdate = 0;my $lastRmConfRead = 0;my $lastRmConfFullRead = 0;my $lastRmSeqNum = 0;my $mtimeConfig = 0;my $conf;my $extraPkgs;my $excludePkgs;my %releaseData;my $comError = 0;my $yday = -1;my $RemotePkgList = {};my $targetBinDir = "$FindBin::Bin/targetBin";my $server_id;my @projectList;my @releaseList;my $isS3Target;## Contain statisics maintained while operating# Can be dumped with a kill -USR2# List here for documentation#my %statistics = (SeqNum => 0, # Bumped when $statistics are dumpedtimeStamp => 0, # DateTime when statistics are dumpedupTime => 0, # Seconds since program startCycle => 0, # Major process loop counterphase => 'Init', # Current phase of operationstate => 'OK', # Nagios statewedged => 0, # Wedge indication - main loop not cycling## The following are reset each daydayStart => 0, # DateTime when daily data was resettxCount => 0, # Packages TransferreddelCount => 0, # Packages marked for deletionstaleTags => 0, # Stale TagslinkErrors => 0, # Transfer errors## Per Cycle Data - Calculated each processing Cycletotal => 0, # Packages to be synceddelete => 0, # Packages to deleteexcluded => 0, # Packages excludedfiltered => 0, # Packages filtered outmissing => 0, # Packages missingtransfer => 0, # Packages to transferwritable => 0, # Packages still writable - thus not transferredtagCount => 0, # Packages tagged to be transferredtagDelCount => 0, # Packages tagged to be deleted## Expected from the Target# Target.Hostname => '', # Target Hostname# Target.avail => 0, # Information from 'df' 1Kblocks# Target.pcent => 0,# Target.size => 0,# Target.used => 0,# Target.iavail => 0, # Inode information from 'df'# Target.ipcent => 0,# Target.isize => 0,# Target.iused => 0,# Target.Total => 0, # Number of Package Versions in the archive# Target.Damaged => 0, # Number that are damaged# Target.Delete => 0, # Number marked for future deletion# Target.Missing => 0, # Number missing);## Describe configuration parameters#my %cdata = ('.ignore' => {'pkg\.(.+)' => 'pkgs' },'.oneOf' => [['hostname','S3Bucket']] ,'piddir' => {'mandatory' => 1 , 'fmt' => 'dir'},'sleep' => {'default' => 5 , 'fmt' => 'period'},'dpkg_archive' => {'mandatory' => 1 , 'fmt' => 'dir'},'logfile' => {'mandatory' => 1 , 'fmt' => 'vfile'},'logfile.size' => {'default' => '1M' , 'fmt' => 'size'},'logfile.count' => {'default' => 9 , 'fmt' => 'int'},'verbose' => {'default' => 0 , 'fmt' => 'int'},'rmHostName' => {'default' => undef , 'fmt' => 'text'},'sshport' => {'default' => 0 , 'fmt' => 'int'},'tagdir' => {'mandatory' => 1 , 'fmt' => 'mkdir'},'forcedirscan' => {'default' => 100 , 'fmt' => 'period'},'tagage' => {'default' => '10m' , 'fmt' => 'period'},'tagListUpdate' => {'default' => '1h' , 'fmt' => 'period'},'tagMaxPackages' => {'default' => 10 , 'fmt' => 'int'},'rmConfigCheck' => {'default' => '60' , 'fmt' => 'period'},'rmConfFullRead' => {'default' => '1h' , 'fmt' => 'period'},'synctime' => {'default' => '2h' , 'fmt' => 'period'},'syncretry' => {'default' => '5m' , 'fmt' => 'period'},'allProjects' => {'default' => 0 , 'fmt' => 'bool'},'allArchive' => {'default' => 0 , 'fmt' => 'bool'},'project' => {'mandatory' => 0 , 'fmt' => 'int_list'},'release' => {'mandatory' => 0 , 'fmt' => 'int_list'},'writewindow' => {'default' => '3h' , 'fmt' => 'period'},'maxpackages' => {'default' => 5 , 'fmt' => 'int'},'deletePackages' => {'default' => 0 , 'fmt' => 'bool'},'deleteImmediate' => {'default' => 0 , 'fmt' => 'bool'},'deleteAge' => {'default' => 0 , 'fmt' => 'period'},'packageFilter' => {'default' => undef , 'fmt' => 'text'},'active' => {'default' => 1 , 'fmt' => 'bool'},'debug' => {'default' => 0 , 'fmt' => 'bool'}, # Log to screen'txdetail' => {'default' => 0 , 'fmt' => 'bool'},'noTransfers' => {'default' => 0 , 'fmt' => 'bool'}, # Debugging option to prevent transfers'maxTarZips' => {'default' => 5 , 'fmt' => 'int'},'wedgeTime' => {'default' => '30m' , 'fmt' => 'period'},'allNewPkgs' => {'default' => 0 , 'fmt' => 'bool'},## Transfer via ssh# Cannot be used in conjunction with S3Bucket#'hostname' => {'fmt' => 'text', 'requires' => 'user,identity,bindir'},'user' => {'fmt' => 'text', 'requires' => 'hostname'},'identity' => {'fmt' => 'file'},'bindir' => {'fmt' => 'text'},## Transfer to S3 configuration items# Cannot be used in conjunction with hostname# Many other options will be ignored#'S3Bucket' => {'fmt' => 'text', 'requires' => 'S3Profile,S3Profile,S3Region'},'S3Profile' => {'fmt' => 'text', 'requires' => 'S3Bucket' },'S3Region' => {'fmt' => 'text', 'requires' => 'S3Bucket' },'S3AllowDelete' => {'default' => 0 , 'fmt' => 'bool'},);## Read in the configuration# Set up a logger# Write a pidfile - thats not used$now = $startTime = time();readConfig();Utils::writepid($conf);$logger->logmsg("Starting...");readStatistics();sighandlers();## Main processing loop# Will exit when terminated by parent#while (1){$logger->verbose3("Processing");$statistics{Cycle}++;$now = time();Utils::resetWedge();$statistics{phase} = 'ReadConfig';readConfig();if ( $conf->{'active'} ){$statistics{phase} = 'ProcessReleaseList';processReleaseList();$statistics{phase} = 'processTags';processTags();$statistics{phase} = 'maintainTagList';maintainTagList();}%releaseData = ();$statistics{phase} = 'Sleep';sleep( $conf->{'sleep'} );reapChildren();# If my PID file ceases to be, then exit the daemon# Used to force daemon to restart#unless ( -f $conf->{'pidfile'} ){$logger->logmsg("Terminate. Pid file removed");last;}}$statistics{phase} = 'Terminated';$logger->logmsg("Child End");exit 0;#-------------------------------------------------------------------------------# Function : reapChildren## Description : Reap any and all dead children# Call in major loops to prevent zombies accumulating## Inputs : None## Returns :#sub reapChildren{my $currentPhase = $statistics{phase};$statistics{phase} = 'Reaping';my $kid;do {$kid = waitpid(-1, WNOHANG);} while ( $kid > 0 );$statistics{phase} = $currentPhase;}#-------------------------------------------------------------------------------# Function : readConfig## Description : Re read the config file if it modification time has changed## Inputs : Nothing## Returns : 0 - Config not read# 1 - Config read# Config file has changed#sub readConfig{my ($mtime) = Utils::mtime($ARGV[0]);my $rv = 0;if ( $mtimeConfig != $mtime ){$logger->logmsg("Reading config file: $ARGV[0]");$mtimeConfig = $mtime;my $errors;($conf, $errors) = Utils::readconf ( $ARGV[0], \%cdata );if ( scalar @{$errors} > 0 ){warn "$_\n" foreach (@{$errors});die ("Config contained errors\n");}$isS3Target = defined $conf->{'S3Bucket'};if ($isS3Target) {if (!$conf->{'S3AllowDelete'}) {$conf->{deletePackages} = 0;}}## Reset some information# Create a new logger#$logger = Logger->new($conf) unless $conf->{debug};$conf->{logger} = $logger;$conf->{'pidfile'} = $conf->{'piddir'} . '/' . $name . '.pid';$logger->setVerbose($conf->{verbose});$logger->verbose("Log Levl: $conf->{verbose}");## Setup statistics filename$conf->{'statsfile'} = $conf->{'piddir'} . '/' . $name . '.stats';$conf->{'statsfiletmp'} = $conf->{'piddir'} . '/' . $name . '.stats.tmp';## Extract extra package config# Ignore ALL and Version info if transferring the entire archive# Honor the EXCLUDE - for bandwidth reasons## NOTE: Package inclusion will not be processed in allArchive mode# BUG: This processing should be done after the RmConfig has been included#$extraPkgs = {};$excludePkgs = {};while (my($key, $data) = each ( %{$conf->{pkgs}} )){if ( $data eq 'EXCLUDE' ) {$excludePkgs->{$key} = 1;$logger->verbose("Exclude Pkg: $key");} elsif ( $data eq 'ALL' ) {next if ( $conf->{'allArchive'} );foreach my $pver (getPackageVersions($key)){$extraPkgs->{$key}{$pver} = 1;$logger->verbose("Extra Pkg: $key -> $pver");}} else {next if ( $conf->{'allArchive'} );foreach (split(/[,\s]+/, $data)){$extraPkgs->{$key}{$_} = 1;$logger->verbose("Extra Pkg: $key -> $_");}}}$logger->verbose("Filter Packages: " . $conf->{'packageFilter'})if ( defined $conf->{'packageFilter'} );$logger->warn("Non standard ssh port: " . $conf->{'sshport'})if ( $conf->{'sshport'} );## Save Text based config for use in RmConfig#$conf->{'BaseActive'} = $conf->{'active'};## Flag config has changed / been read# Force full RM data fetch#$rv = 1;$lastRmSeqNum = 0;$lastRmConfRead = 0;}## Read the Release Manager configuration too#$rv |= ReadRmConfig();## When config is read force some actions# - Force tagList to be created# - Force release scanif ($rv) {$lastTagListUpdate = 0;$lastReleaseScan = 0;## Update global Project/Release list - only on change@projectList = split /[,\s]+/, $conf->{'project'} || '';@releaseList = split /[,\s]+/, $conf->{'release'} || '';$logger->logmsg("projectList: ". join(',',@projectList));$logger->logmsg("releaseList: ". join(',',@releaseList));#Utils::DebugDumpData ("Config", $conf);$logger->warn("Transfer session configured as not active") unless ( $conf->{'active'} );$logger->warn("Transfer all project packages") if ( $conf->{'allProjects'} );$logger->warn("Transfer entire package archive") if ( $conf->{'allArchive'} );$logger->warn("All Transfers disabled") if ( $conf->{'noTransfers'} );$logger->warn("Transfer to AWS S3 Bucket") if ( $isS3Target );}return $rv;}#-------------------------------------------------------------------------------# Function : ReadRmConfig## Description : Read Configuration information from Release Manager# If Rm configuration is to be used then it will override# the project/release configuration in the text file### Inputs : Nothing## Returns : 0 - Config not read# 1 - Config read# Config file has changed#sub ReadRmConfig{## Time to perform a database read# Will do at startup and every time period there after#return 0 unless $conf->{'rmHostName'};return 0 unless $conf->{'BaseActive'};return 0 unless ( $now > ($lastRmConfRead + $conf->{rmConfigCheck} ));$logger->verbose("ReadRmConfig");$lastRmConfRead = $now;my $rv = 0;my $RM_DB;my ($blat_seqnum, $blat_mode, $found);my $server_enabled = 1;## Read the BLAT_SERVER record from Release Manager#my $m_sqlstr = "select blat_id, blat_enable, blat_seqnum, blat_mode from RELEASE_MANAGER.blat_servers WHERE UPPER(BLAT_SERVER_NAME) = UPPER('$conf->{'rmHostName'}')";connectRM(\$RM_DB);my $sth = $RM_DB->prepare($m_sqlstr);if ( defined($sth) ){if ( $sth->execute( ) ){if ( $sth->rows ){while (my @row = $sth->fetchrow_array ){$logger->verbose2("ReadRmConfig:Data:@row");$server_id = $row[0] || 0;$server_enabled = ($row[1] || 'N') eq 'Y';$blat_seqnum = $row[2] || 0;$blat_mode = ($row[3] || 'N');$found = 1;last;}}$sth->finish();}else{$logger->warn("ReadRmConfig: SQL Execute failure");}}else{$logger->warn("ReadRmConfig: SQL Prepare failure");}disconnectRM(\$RM_DB);## Process the extracted data#if ( !$found) {$logger->warn("No Release Manager configuration for:" . $conf->{'rmHostName'});$server_id = 0;$lastRmSeqNum = 0;return 0;}# Check for a change in config#if ($lastRmSeqNum != $blat_seqnum) {$rv = 1;$lastRmSeqNum = $blat_seqnum;}## Insert server configuration into the global config table# Rm Config will override the text config#$conf->{'active'} = $server_enabled;$conf->{'allProjects'} = $blat_mode eq 'P';$conf->{'allArchive'} = $blat_mode eq 'E';## If the config has changed, then read the Project and Release records from the database# Insert these into the config provided by the text file confing#my $forceFullRead;if ( $now > ($lastRmConfFullRead + $conf->{rmConfFullRead} )) {$logger->verbose("ForceReadRmConfig");$lastRmConfFullRead = $now;$forceFullRead = 1;}if ($rv || $forceFullRead) {connectRM(\$RM_DB);my @projects;my @releases;my $m_sqlstr = "select proj_id as proj_id, null as rtag_id from release_manager.blat_projects where blat_id = $server_id and UPPER(bp_enabled) = 'Y'" ." UNION " ."select null as proj_id, rtag_id rtag_id from release_manager.blat_releases where blat_id = $server_id and UPPER(br_enabled) = 'Y'";my $sth = $RM_DB->prepare($m_sqlstr);if ( defined($sth) ){if ( $sth->execute( ) ){if ( $sth->rows ){while (my @row = $sth->fetchrow_array ){$logger->verbose2("ReadRmConfig:Data:@row");push (@projects, $row[0]) if ($row[0]);push (@releases, $row[1]) if ($row[1] );}}$sth->finish();}else{$logger->warn("ReadRmConfig: SQL Execute failure");}}else{$logger->warn("ReadRmConfig: SQL Prepare failure");}disconnectRM(\$RM_DB);## Reform the projects and releases into a comma separated list and detect changes#my $projects = join(',', sort @projects);my $releases = join(',', sort @releases);if ((($projects || '') ne ($conf->{'project'} || '')) || (($releases || '') ne ($conf->{'release'} || ''))) {$conf->{'project'} = $projects;$conf->{'release'} = $releases;$rv = 1;}}# Indicate if the config has changedreturn $rv;}#-------------------------------------------------------------------------------# Function : checkForBasicTools## Description : Check that the target has the basic tools are installed# Can populate the target's bin directory with tools## Inputs : None## Returns : Nothing#sub checkForBasicTools{return if $isS3Target;my $ph;my $found;my $tgt_cmd = "if [ -x $conf->{'bindir'}/get_plist.pl ] ; then echo :FOUND:; fi";my $ssh_cmd = sshCmd($tgt_cmd);$logger->verbose2("checkForBasicTools:ssh_cmd:$ssh_cmd");open ($ph, "$ssh_cmd |");while ( <$ph> ){chomp;if (m~:FOUND:~) {$found = 1;}$logger->verbose2("checkForBasicTools:Data: $_");}close ($ph);my $exitCode = $? >> 8;$logger->verbose2("checkForBasicTools:End: $exitCode, $?");unless ( $found ){$logger->warn("checkForBasicTools: None found, $?");## The 'get_plist.pl' program was not found# Assume that the entire directory does not exist and transfer all#transferTargetBin();}}#-------------------------------------------------------------------------------# Function : transferTargetBin## Description : Ensure that the targets 'bin' folder is upto date## Inputs : $blatBinData - Ref to array of target data file info## Returns :#sub transferTargetBin{return if $isS3Target;my ($blatBinData) = @_;my $blatBinList = getBlatBin();foreach my $file ( keys %{$blatBinList} ){if (defined $blatBinData && exists $blatBinData->{$file}) {if ($blatBinData->{$file} eq $blatBinList->{$file}) {delete $blatBinList->{$file};}}}#Utils::DebugDumpData ("blatBinList", $blatBinList);transferBlatBin($blatBinList);}#-------------------------------------------------------------------------------# Function : processReleaseList## Description : Process the release list# Determine if its time to process release list# Determine release list# Determine release content# Determine new items## Inputs : None## Returns : Nothing#sub processReleaseList{## Is Release List Processing active# Can configure blat to disable release sync# This will then allow 'new' packages to be sent#if ( $conf->{maxpackages} == 0 || $conf->{'synctime'} <= 0 ){$logger->verbose2("processReleaseList disabled");$RemotePkgList = {};return;}## Time to perform the scan# Will do at startup and every time period there after#my $wtime = $releaseScanMode ? $conf->{'syncretry'} : $conf->{'synctime'};return unless ( $now > ($lastReleaseScan + $wtime ));$logger->verbose("processReleaseList");$lastReleaseScan = $now;$releaseScanMode = 1; # Assume error## Ensure that we have the basic tools for the transfer#checkForBasicTools();## Get list of packages from Remote site#my $remotePkgList;my $remoteData;my $blatBinData;## Get Data from an S3 bucket# Can only get a part of the full data set. The timestamp can't be processed, so set it to -1#if ($isS3Target ) {my $ph;my $tgt_cmd = "aws --profile $conf->{'S3Profile'}";$tgt_cmd .= " --region $conf->{'S3Region'}" if (defined $conf->{'S3Region'});$tgt_cmd .= " s3 ls $conf->{'S3Bucket'}/";$logger->verbose2("processReleaseList:s3_cmd:$tgt_cmd");open ($ph, "$tgt_cmd |");while ( <$ph> ){chomp;if (m~.*\s(.*)__(.*).tgz$~ ) {$remotePkgList->{$1}{$2}{s3} = 1;}}close ($ph);} else {# Get Data from a dpkg_archive maintained via ssh# Invoke a program on the remote site and parse the results## Returned data looks like:# Metadata avail="140100452"# BlatBin MD5="9e2c6e45af600a20a01dbcb7570da1f1" file="stat.pl"# time="1497586865" GMT="Fri Jun 16 04:21:05 2017" pname="ERGissaccounts" pver="1.0.7169.mas"# time="1497586865" GMT="Fri Jun 16 04:21:05 2017" pname="ERGissaccounts" pver="1.0.7169.mas" "link=latest"# time="1497954104" GMT="Tue Jun 20 10:21:44 2017" pname="ERGissaccounts" pver="1.0.7178.mas" deleted="0"my $ph;my $tgt_cmd = "$conf->{'bindir'}/get_plist.pl";my $ssh_cmd = sshCmd($tgt_cmd);$logger->verbose2("processReleaseList:ssh_cmd:$ssh_cmd");open ($ph, "$ssh_cmd |");while ( <$ph> ){chomp;if ($_ =~ m~^Metadata\s+~){parsePkgMetaData($_, \%{$remoteData});}elsif ($_ =~ m~^BlatBin\s+~){parseBlatBinData($_, \%{$blatBinData})}else{if ( parsePkgList($_, \%{$remotePkgList} ) ){$logger->verbose2("processReleaseList:Data: $_");}else{$logger->warn("processReleaseList:Bad Data: $_");}}}close ($ph);}$logger->verbose("processReleaseList:End: $?");$RemotePkgList = $remotePkgList;LogTxError ($?);if ( $? != 0 ){$logger->warn("Cannot retrieve package list: $?");$statistics{state} = 'No Remote Package List';return;}#Utils::DebugDumpData ("remotePkgList", $remotePkgList);## Ensure that the target bin folder is up to date#transferTargetBin($blatBinData);## Determine the set of packages in the releases to be transferred#my $pkgList;if ( $conf->{'allArchive'} ){# Examine entire archive#$pkgList = getArchiveList();}else{# Examine Releases#my @rlist = getReleaseList();unless ( @rlist ){$logger->verbose2("No Releases to Process");$statistics{state} = 'No Releases found';# Allow config with just specified packages## return;} else {$pkgList = getPkgList(@rlist);}}## Append extra packages# These are packages that are specifically named by the user## Note: If they are symbolic links, then the target of the# link is also added.## Symlink MUST be within the same directory# Used to transfer jats2_current#while ( (my ($pname, $pvers)) = each %{$extraPkgs} ) {while ( (my ($pver, $pdata) ) = each %{$pvers} ) {my $epath = catfile( $conf->{'dpkg_archive'} , $pname, $pver );if ( -l $epath ){my $lver = readlink( $epath );if ( ! defined $lver ){$logger->warn("Can't resolve symlink: $pname, $pver");next;}if ( $lver =~ m ~/~ ){$logger->warn("Won't resolve symlink: $pname, $pver, $lver");next;}## Add the package the link points to#$logger->verbose2("Add linked package: $pname, $lver, $pdata");$pkgList->{$pname}{$lver} = $pdata;if ($isS3Target) {$logger->verbose2("Won't send symlink to S3: $pname, $pver, $lver");next;}}$logger->verbose2("Add extra package: $pname, $pver, $pdata");$pkgList->{$pname}{$pver} = $pdata;}}#Utils::DebugDumpData ("parsePkgList", $rv);#Utils::DebugDumpData ("remotePkgList", $remotePkgList);## If there are no packages to process, then assume that this is an error# condition. Retry the operation soon.#unless ( keys %{$pkgList} ){$logger->verbose2("No packages to process");$statistics{state} = 'No Packages found';return;}# ## # Useful debug code# ## while ( (my ($pname, $pvers)) = each %{$pkgList} )# {# while ( (my ($pver, $ptime) ) = each %{$pvers} )# {# print "L-- $pname, $pver, $ptime \n";## }# }## Delete Excess Packages# Packages not required on the target# KLUDGE: Don't delete links to packages# Don't delete packages marked for deletion#my $excessPkgList;my $excessPkgListCount = 0;if ( $conf->{deletePackages} ){while ( (my ($pname, $pvers)) = each %{$remotePkgList} ){while ( (my ($pver, $pdata) ) = each %{$pvers} ){if ( !exists $pkgList->{$pname}{$pver} ){if ( exists $excludePkgs->{$pname} ){$logger->verbose2("Keep Excluded package: ${pname}");next;}if ( exists $pdata->{deleted} ){if ( $conf->{deleteAge} ){if ( $pdata->{deleted} <= $conf->{deleteAge} ){$logger->verbose2("Already marked for future age deletion: ${pname}/${pver}, $pdata->{deleted}");next;}$pdata->{FORCEDELETE} = 1;}if ( !$conf->{deleteImmediate} ){$logger->verbose2("Already marked for deletion: ${pname}/${pver}");next;}}## Force deletion# deleteImmediate mode# target is a broken link#$pdata->{FORCEDELETE} = 1if ($conf->{deleteImmediate} || $pdata->{broken});$excessPkgList->{$pname}{$pver} = $pdata;$excessPkgListCount++;$logger->verbose("Excess package: ${pname}/${pver}");}# else# {# $logger->verbose3("Retain package: ${pname}/${pver}");# }}}}## Process the remote list and the local list# The remote time-stamp is the modification time of the packages descpkg file## Mark for transfer packages that# Are in the local set but not the remote set# Have a different time stamp## Ignore packages not in the local archive# Ignore packages that don't have a descpkg# Ignore packages that are writable - still being formed#my $needPkgList;my $needPkgListCount = 0;my $filteredCount = 0;my $missingCount = 0;my $writableCount = 0;my $excludeCount = 0;my $packageVersionCount = 0;while ( (my ($pname, $pvers)) = each %{$pkgList} ){## Ignore excluded packages#if ( exists $excludePkgs->{$pname} ){$excludeCount++;next;}## Ignore packages that are filtered out#if ( defined $conf->{'packageFilter'} ){unless ( $pname =~ m~$conf->{'packageFilter'}~ ){$logger->verbose3("Filtering out: ${pname}");$filteredCount++;next;}}while ( (my ($pver, $pdata) ) = each %{$pvers} ){my $must_transfer;my $existsRemote = exists($remotePkgList->{$pname}) && exists ($remotePkgList->{$pname}{$pver});## Take care not to create an entry into $remotePkgList->{$pname}{$pver}# if it does not exist. Existence of {$pname}{$pver} is used later#my $tmtime = undef;if ($existsRemote && exists ($remotePkgList->{$pname}{$pver}{time})) {$tmtime = $remotePkgList->{$pname}{$pver}{time};}$packageVersionCount++;# Package is present in both listmy $localPackage = catdir( $conf->{'dpkg_archive'} , $pname, $pver );my ($mtime, $mode) = Utils::mtime( catfile($localPackage, 'descpkg') );if ( $mtime == 0 ){# PackageVersion not in local archive (at least the descpkg file is not)# Skip now - will pick it up later$logger->verbose("Package not in dpkg_archive: $pname, $pver");$missingCount++;next;}if ( $mode & 0222 ){# Descpkg file is writable# Package may be in the process of being created# If the package has been writable for a long time, then# consider for transfermy $age = $now - $mtime;if ( $age < ($conf->{'writewindow '} || 600) ){$logger->verbose("Package is writable: $pname, $pver, ", $now - $mtime);$writableCount++;next;}}if (! $existsRemote ) {## Package does not exist in the remote, and is eligible for transfer#$logger->verbose("Package Needs to be transferred: $pname, $pver");$must_transfer = 1;} elsif ( defined $tmtime && ($mtime != $tmtime) ) {## Package exists in both source and target# If the package-time is known, then ensure that they are the same#$logger->verbose("Package Needs to be transferred: $pname, $pver, $mtime, $tmtime");$must_transfer = 1;}else{## Package exists in both source and target# Symlink test: Ensure symlinks are the same#my $localIsSymlink = -l $localPackage;my $remoteIsSymlink = exists($remotePkgList->{$pname}) && exists ($remotePkgList->{$pname}{$pver}) && exists ($remotePkgList->{$pname}{$pver}{link});if ($remoteIsSymlink && $localIsSymlink) {## Both are symlinks - check that they address the same item#my $targetLink = $remotePkgList->{$pname}{$pver}{link};$logger->verbose2("Package is symlink: $pname, $pver -> $targetLink");my $lver = readlink( $localPackage );if ( ! defined $lver ) {$logger->warn("Can't resolve symlink: $pname, $pver");next;}if ($targetLink ne $lver ) {$logger->verbose("Package symlinks differ: $pname, $pver, $targetLink, $lver");$must_transfer = 3;}} elsif ($remoteIsSymlink || $localIsSymlink ) {## Only one is a symlink - force transfer#$logger->warn("Packages versions not both symlink: $pname, $pver, L:$remoteIsSymlink R:$localIsSymlink");$must_transfer = 2;}}## If we are forcing a package transfer then flag it and also remove it from the# RemotePkgList so that it will be transferred - even if its present on target#if ($must_transfer) {# Package not present on target, or timestamps differ$needPkgList->{$pname}{$pver} = $pdata;delete $RemotePkgList->{$pname}{$pver};$needPkgListCount++;next;}}}## Debug output only# Display what we need to transfer#if ( $conf->{verbose} > 2 ){while ( (my ($pname, $pvers)) = each %{$needPkgList} ){while ( (my ($pver, $pdata) ) = each %{$pvers} ){$logger->verbose("Need to transfer: $pname, $pver, $pdata");}}}if ( $conf->{verbose} ){$logger->verbose("Packages to transfer: $needPkgListCount");$logger->verbose("Packages to delete: $excessPkgListCount");$logger->verbose("Packages filtered out: $filteredCount");$logger->verbose("Packages missing: $missingCount");$logger->verbose("Packages still writable: $writableCount");$logger->verbose("Packages excluded: $excludeCount");}## Update stats# At this point we are looking pretty good#$statistics{state} = 'OK';$statistics{total} = $packageVersionCount;$statistics{transfer} = $needPkgListCount;$statistics{delete} = $excessPkgListCount;$statistics{filtered} = $filteredCount;$statistics{missing} = $missingCount;$statistics{writable} = $writableCount;$statistics{excluded} = $excludeCount;## Time to do the real work# Transfer packages and delete excess packages# Note: Perform the transfers first# Limit the number of packages processed in one pass#my $txcount = $conf->{maxpackages};## Transfer packages that we have identified#while ( (my ($pname, $pvers)) = each %{$needPkgList} ){while ( (my ($pver, $pdata) ) = each %{$pvers} ){tagForTransfer($pname, $pver);$needPkgListCount--;}}## Delete packages that have been identified as excess#while ( (my ($pname, $pvers)) = each %{$excessPkgList} ){while ( (my ($pver, $pdata) ) = each %{$pvers} ){tagForDelete ($pname, $pver, $pdata);$excessPkgListCount--;}}## Need to transmission remove tags for packages that don't need any more# ie: Tags that have since been made unnessesary#my $taggedPackages = getTaggedPackages();if ($taggedPackages) {## Mark entries thay we still need#foreach my $pname ( keys %{$pkgList} ){foreach my $pver ( keys %{$pkgList->{$pname}} ){if (exists $taggedPackages->{$pname}{$pver}) {$taggedPackages->{$pname}{$pver} = 1;}}}## Sweep out entries that we don't need any moreforeach my $pname ( keys %{$taggedPackages} ) {foreach my $pver ( keys %{$taggedPackages->{$pname}} ) {unless ($taggedPackages->{$pname}{$pver}) {$logger->verbose("Untag: $pname $pver");unTagForTransfer($pname, $pver);}}}}## Send package list to the target#sendPackageList ($pkgList);## On a successful transfer# Force tag processing# Set scan Mode to normal#$tagDirTime = 0;$releaseScanMode = 0;}#-------------------------------------------------------------------------------# Function : tagForTransfer## Description : Tag a package to be transferred## Inputs : $pname# $pvers## Returns :#sub tagForTransfer{my ($pname, $pver) = @_;my $tag = "$pname::$pver";my $myTag = catfile($conf->{'tagdir'} , $tag);unless (-f $myTag ) {$logger->verbose2("tagForTransfer: $pname, $pver");Utils::TouchFile($conf, $myTag);}unlink 'DELD::' . $myTag;unlink 'DELF::' . $myTag;}#-------------------------------------------------------------------------------# Function : unTagForTransfer## Description : Un Tag a package to be transferred## Inputs : $pname# $pvers## Returns :#sub unTagForTransfer{my ($pname, $pver) = @_;my $tag = "$pname::$pver";my $myTag = catfile($conf->{'tagdir'} , $tag);if (-f $myTag ) {$logger->verbose2("UnTagForTransfer: $pname, $pver");unlink($myTag);}}#-------------------------------------------------------------------------------# Function : tagForDelete## Description : Tag a package to be transferred# Generate tags of the form# DELD::pname::pver - Delayed Delete# DELF::pname::pver - Forced Delete## Inputs : $pname# $pver# $pdata - Type of delete## Returns :#sub tagForDelete{my ($pname, $pver, $pdata) = @_;my $tag = "$pname::$pver";my $myTag = catfile($conf->{'tagdir'} , $tag);my $delType = $pdata->{FORCEDELETE} ? 'D' : 'F';my $myDelTag = 'DEL' . $delType . '::' . $myTag;unless (-f $myDelTag ) {$logger->verbose2("tagForDelete: $pname, $pver");Utils::TouchFile($conf, $myDelTag);}unlink $myTag;}#-------------------------------------------------------------------------------# Function : getTaggedPackages## Description : Determine the tagged packages## Inputs : None## Returns : Returns a pointer to a hash of tagged packages of the form#sub getTaggedPackages{my $taggedPackages = {};foreach ( glob ("$conf->{'tagdir'}/*::*")){next if m~/DEL.::~;m~.*/(.*)::(.*)~;$taggedPackages->{$1}{$2} = 0;}return $taggedPackages;}#-------------------------------------------------------------------------------# Function : sendPackageList## Description : Transfer package list to the target## Inputs : $pkgList - Ref to hash of package names and versions## Returns : Nothing# Don't really care about any errors from this process# Its not essential#sub sendPackageList{return if $isS3Target;my ($pkgList) = @_;my ($fh, $filename) = tempfile( "/tmp/blat.$$.XXXX", SUFFIX => '.txt');$logger->verbose("sendPackageList:TmpFile: $filename");return if $conf->{'noTransfers'};## Create a temp file with data#foreach my $pname ( sort keys %{$pkgList} ){foreach my $pver ( sort keys %{$pkgList->{$pname}} ){print $fh "$pname/$pver\n";}}close $fh;## Transfer to target# Create the process pipe to transfer the file# gzip the file and pipe the result through a ssh session to the target machine# gzip -c filename | ssh ... "./receive_file filename"#my $ph;my $gzip_cmd = "$gzip --no-name -c \"$filename\"";my $tgt_cmd = "$conf->{'bindir'}/receive_file \"ArchiveList\"";my $ssh_cmd = sshCmd($tgt_cmd);$logger->verbose2("sendPackageList:gzip_cmd:$gzip_cmd");$logger->verbose2("sendPackageList:tgt_cmd:$tgt_cmd");$logger->verbose2("sendPackageList:ssh_cmd:$ssh_cmd");open ($ph, "$gzip_cmd | $ssh_cmd |");while ( <$ph> ){chomp;$logger->verbose2("sendPackageList:Data: $_");}close ($ph);unlink $filename;$logger->verbose("sendPackageList:End: $?");LogTxError ($?);}#-------------------------------------------------------------------------------# Function : getPkgList## Description : Determine a set of package versions within the list# of provided releases## Inputs : @rlist - A list of releases to examine## Returns : Ref to a hask of package versions#sub getPkgList{my %pdata;my $RM_DB;connectRM(\$RM_DB);$logger->verbose("getPkgList");## Determine the releases that are in this project# Build up an sql query#my $m_rlist = join ',', @_;my $m_sqlstr = "SELECT DISTINCT pv.PV_ID, pkg.PKG_NAME, pv.PKG_VERSION, pv.IS_DEPLOYABLE" ." FROM RELEASE_MANAGER.RELEASE_CONTENT rc, RELEASE_MANAGER.PACKAGE_VERSIONS pv, RELEASE_MANAGER.PACKAGES pkg" ." WHERE ( RTAG_ID in ($m_rlist) ) AND rc.PV_ID = pv.PV_ID AND pv.PKG_ID = pkg.PKG_ID" ." ORDER by PKG_NAME DESC";$logger->verbose3("getPkgList:Sql:$m_sqlstr");my $sth = $RM_DB->prepare($m_sqlstr);if ( defined($sth) ){if ( $sth->execute( ) ){if ( $sth->rows ){while (my @row = $sth->fetchrow_array ){$logger->verbose2("getPkgList:Data:@row");$pdata{$row[1]}{$row[2]} = 1;}}$sth->finish();}}else{$logger->warn("getPkgList: SQL Prepare failure");}disconnectRM(\$RM_DB);return \%pdata;}#-------------------------------------------------------------------------------# Function : getReleaseList## Description : Determine the list of releases to be proccessed# From:# Convert projects to a list of releases# Configured list of releases## Inputs : None## Returns : A list of releases to be processed#sub getReleaseList{my $RM_DB;my %rlist;my $m_sqlstr;$logger->verbose("getReleaseList");## Cache data# Only for one cycle of the main loop#if ( exists $releaseData{getReleaseList} ){$logger->verbose3("getReleaseList:Cache hit");return @{$releaseData{getReleaseList}};}## All projects#if ( $conf->{'allProjects'} ){$m_sqlstr = "SELECT rt.RTAG_ID" ." FROM RELEASE_MANAGER.RELEASE_TAGS rt" ." WHERE rt.OFFICIAL != 'A'"." AND rt.OFFICIAL != 'S'";#" AND rt.OFFICIAL != 'Y'";}else{## Convert list of projects into a list of releases#if ( @projectList ){## Determine the releases that are in this project# Build up an sql query#my $m_plist = join ',', @projectList;$m_sqlstr = "SELECT rt.RTAG_ID" ." FROM RELEASE_MANAGER.RELEASE_TAGS rt" ." WHERE ( PROJ_ID in ( $m_plist) )" ." AND rt.OFFICIAL != 'A'" ." AND rt.OFFICIAL != 'S'";#" AND rt.OFFICIAL != 'Y'";}}if ( defined $m_sqlstr ){$logger->verbose3("getReleaseList:Sql:$m_sqlstr");connectRM(\$RM_DB);my $sth = $RM_DB->prepare($m_sqlstr);if ( defined($sth) ){if ( $sth->execute( ) ){if ( $sth->rows ){while (my @row = $sth->fetchrow_array ){$logger->verbose2("getReleaseList:Data:@row");$rlist{$row[0]} = 1;}}$sth->finish();} else {$logger->warn("getReleaseList: SQL Execute failure");}}else{$logger->warn("getReleaseList: SQL Prepare failure");}disconnectRM(\$RM_DB);}## Add in the user specified list of releases#$rlist{$_} = 1 foreach(@releaseList);## Sort for pretty display only#@{$releaseData{getReleaseList}} = sort {$a <=> $b} keys %rlist;return @{$releaseData{getReleaseList}};}#-------------------------------------------------------------------------------# Function : getPackageVersions## Description : Get the list of package-versions available in the package# store.## Inputs : pkgName - The package name## Returns : Array of versions#sub getPackageVersions{my ($pkgName) = @_;my @versionList;my $pkgDir = catfile($conf->{'dpkg_archive'} , $pkgName );my $dh;unless (opendir($dh, $pkgDir)){$logger->warn ("Can't opendir $pkgDir: $!");return @versionList;}## Process each entry# Ignore those that start with a .#while (my $version = readdir($dh) ){next if ( $version =~ m~^\.~ );my $file = catfile($pkgDir, $version);next unless ( -d $file );push @versionList, $version;$logger->verbose3("getPackageVersions: $pkgName, $version");}closedir $dh;return @versionList;}#-------------------------------------------------------------------------------# Function : getArchiveList## Description : Get the entire set of package versions in the archive## Inputs :## Returns : Ref to a hash of package versions#sub getArchiveList{my $pkgDir = $conf->{'dpkg_archive'};my %archiveList;my $dh;my @pkgList;unless (opendir($dh, $pkgDir)){$logger->warn ("Can't opendir $pkgDir: $!");return \%archiveList;}## Process each entry# Ignore those that start with a .# Ignore files#while (my $pkgName = readdir($dh) ){next if ( $pkgName =~ m~^\.~ );my $file = catfile($pkgDir, $pkgName);next unless ( -d $file );$logger->verbose3("getArchiveList: $pkgName");push @pkgList, $pkgName;}closedir $dh;# Now get the package versions# Sort for pretty displayforeach my $pname (sort @pkgList){foreach my $pver (getPackageVersions($pname)){$archiveList{$pname}{$pver} = 1;}}return \%archiveList;}#-------------------------------------------------------------------------------# Function : getBlatBin## Description : Get the list of files that should be in the targetbin directory## Inputs : Nothing## Returns : A hash of data#sub getBlatBin{my $data;$logger->verbose("getBlatBin: $targetBinDir");if (opendir(DIR, $targetBinDir ) ) {my @vlist = readdir(DIR);closedir DIR;foreach my $vname ( sort @vlist ){next if ( $vname eq '.' );next if ( $vname eq '..' );next unless ( -f "$targetBinDir/$vname" );if (open FILE, "$targetBinDir/$vname") {$data->{$vname} = Digest::MD5->new->addfile(*FILE)->hexdigest;close (FILE);}}} else {$logger->warn("BlatBin Not Found: $targetBinDir");}return $data;}#-------------------------------------------------------------------------------# Function : maintainTagList## Description : Maintain a data structure for the maintenance of the# tags directory## Inputs : None## Returns : Nothing#sub maintainTagList{## Time to perform the scan# Will do at startup and every time period there after#return unless ( $now > ($lastTagListUpdate + $conf->{tagListUpdate} ));$logger->verbose("maintainTagList");$lastTagListUpdate = $now;## Get list of things#my %config;## Is Tag Processing active# Can configure blat to disable tag sync#if ( $conf->{'tagMaxPackages'} > 0 ){if ($conf->{'allNewPkgs'} ) {$config{allNewPkgs} = 1}if ($conf->{'allArchive'} ){$config{allArchive} = 1}elsif ($conf->{'allProjects'} ){$config{allProjects} = 1;}else{%{$config{projects}} = map { $_ => 1 } @projectList;%{$config{releases}} = map { $_ => 1 } getReleaseList();}} else {$config{disableTagTx} = 1}## Save data#my $dump = Data::Dumper->new([\%config], [qw(*config)]);#print $dump->Dump;#$dump->Reset;## Save config data#my $conf_file = catfile( $conf->{'tagdir'},'.config' );$logger->verbose3("maintainTagList: Writting $conf_file");my $fh;open ( $fh, '>', $conf_file ) or $logger->err("Can't create $conf_file: $!");print $fh $dump->Dump;close $fh;}#-------------------------------------------------------------------------------# Function : processTags## Description : Process tags and send marked package versions to the target# Determine if new tags are present# Process each tag## Two types of tag# Transfer Requests# Delete Request# Send packages before deleting packages## Inputs : None## Returns : Nothing#sub processTags{## Determine if new tags are present by examining the time# that the directory was last modified.## Allow for a forced scan to catch packages that did not transfer# on the first attempt#my $tagCount = 0;my $delCount = 0;my ($mtime) = Utils::mtime($conf->{'tagdir'} );if ( ($mtime > $tagDirTime) || ($now > ($lastDirScan + $conf->{'forcedirscan'})) ){$logger->verbose2("processTags: ",$conf->{'tagdir'}, $mtime - $tagDirTime, $now - $lastDirScan);$tagDirTime = $mtime;$lastDirScan = $now;my $txcount = $conf->{'tagMaxPackages'};my $dh;unless (opendir($dh, $conf->{'tagdir'})){$logger->warn ("can't opendir $conf->{'tagdir'}: $!");return;}## Process each entry# Ignore those that start with a .# Attempt to keep the entries in time of creation order.#my @tagPkgList;my %deleteTags;my @sortedFiles = sort {(stat($conf->{'tagdir'} .'/'. $a))[10] <=> (stat($conf->{'tagdir'} .'/'. $b))[10]} readdir($dh);closedir $dh;foreach my $tag (@sortedFiles){next if ( $tag =~ m~^\.~ );my $file = "$conf->{'tagdir'}/$tag";$logger->verbose3("processTags: $file");next unless ( -f $file );next if ( $tag eq 'ReleaseList' );if ( $tag =~ m~^DEL(.)::(.+)::(.+)~) {$deleteTags {$2}{$3}{file} = $file;$deleteTags {$2}{$3}{mode} = $1 eq 'F' ? 1 : 0;$delCount++;}if ( $tag =~ m~(.+)::(.+)~ ){push @tagPkgList, join($;, $1, $2, $file);$tagCount++;}}$statistics{tagCount} = $tagCount;$statistics{tagDelCount} = $delCount;## Process delete requests after all transfers have occured#my $showDel = $delCount;my $showSend = $tagCount;unless ($tagCount) {delete_pkgs:while ( (my ($pname, $pvers)) = each %deleteTags ){while ( (my ($pver, $pdata) ) = each %{$pvers} ){if ( --$txcount <= 0 ){$tagDirTime = 0;last delete_pkgs;}if ( readConfig() ){$logger->warn("Config file changed");$txcount = 0;$tagDirTime = 0;last delete_pkgs;}deletePackage ($pname, $pver, $pdata->{mode});unlink $pdata->{file};$delCount--;reapChildren();Utils::resetWedge();}}}## Process the packages located in the tags area# Have attempted to keep them in creation order#my $notSent = 0;send_tags:foreach my $entry ( @tagPkgList ){my ($package, $version, $file) = split ($;, $entry);if ( --$txcount <= 0 ){$tagDirTime = 0;last send_tags;}if ( readConfig() ){$logger->warn("Config file changed");$txcount = 0;$tagDirTime = 0;last send_tags;}# ## # Don't transfer 'extra' packages# # Removed. It was casuing a tarZip to be triggered, but never transferred# ## if (exists ($extraPkgs->{$package}) )# {# $logger->warn ("Delete excess package tag: $package::$version");# unlink $file;## } elsemy ($txDone, $workDone) = transferPackage( $package, $version );$txcount++ unless $workDone;if ($txDone ) {unlink $file;}else{$notSent++;if ($conf->{'tagage'} > 0) {my ($mtime) = Utils::mtime( $file );if ( $now - $mtime > $conf->{'tagage'} ){$logger->warn ("Delete unsatisfied tag: $package::$version after $conf->{'tagage'}" );unlink $file;$statistics{staleTags}++;}}}$tagCount--;reapChildren();Utils::resetWedge();}# Nice outputif ($showSend) {$logger->warn("Max tag transfer count exceeded: $notSent transfer remaining");$logger->warn("Max tag transfer count exceeded: $delCount deletion remaining");}elsif ($showDel) {$logger->warn("Max tag transfer count exceeded: $delCount deletion remaining");}}}#-------------------------------------------------------------------------------# Function : transferBlatBin## Description : Transfer any of the Blat Bin files that are out of date# on the target## Inputs : $fileHash - A hash whose files are those that need# to be updated## Returns :#sub transferBlatBin{my ($hash) = @_;$logger->verbose("transferBlatBin");return if $conf->{'noTransfers'};foreach my $file ( sort keys %{$hash}){$logger->logmsg("transferBlatBin: $file");## Transfer one file using only 'ssh'# Create the target directory on the fly# Manipulate file permissions# Report errorsmy $tar_cmd = "cat \"$targetBinDir/$file\"";my $tgt_cmd = "mkdir -p ~/bin && if [ -f \"~/bin/$file\" ] ; then chmod +x+w \"~/bin/$file\"; fi && cat > \"~/bin/$file\" && chmod +x-w \"~/bin/$file\" || exit 1";my $ssh_cmd = sshCmd($tgt_cmd);my $cat_cmd =$logger->verbose2("transferBlatBin:tar_cmd:$tar_cmd");$logger->verbose2("transferBlatBin:tgt_cmd:$tgt_cmd");$logger->verbose2("transferBlatBin:ssh_cmd:$ssh_cmd");my $ph;open ($ph, "$tar_cmd | $ssh_cmd |");while ( <$ph> ){chomp;$logger->verbose2("transferBlatBin:Data: $_");}close ($ph);$logger->verbose("transferBlatBin:End: $?");if ( $? != 0 ){$logger->warn("transferBlatBin:Transfer Error: $file, $?");}LogTxError ($?);}}#-------------------------------------------------------------------------------# Function : calcZipRequests## Description : Determine the number of packages that I need that have# outstanding ZIP requests## Purpose is to limit the number of ZIP requests that are# outstanding at any one time to reduce disc space## ie: Iff there 1000 outstanding transfers, we don't need to zip all# of the packages right now.## Inputs :## Returns :##sub calcZipRequests{my $zipRequests = 0;$conf->{'tagdir'} =~ m~^(.*)/~;my $tagRoot = $1;my $dh;if ( opendir($dh, $conf->{'tagdir'}) ){while (my $tag = readdir($dh) ){next if ( $tag =~ m~^\.~ );my $file = "$conf->{'tagdir'}/$tag";# $logger->verbose3("calcZipRequests: $file");next unless ( -f $file );next if ( $tag eq 'ReleaseList' );next if ( $tag =~ m~^DEL(.)::~);my $zipTag = catfile($tagRoot, 'tarZip', $tag);if (-f $zipTag ) {$zipRequests++;}}closedir $dh;}$logger->verbose("zipRequests: $zipRequests");return $zipRequests;}#-------------------------------------------------------------------------------# Function : transferPackage## Description : Transfer specified package to target system# If a symlink, then a symlink will be transferred## Inputs : $pname - Name of the package# $pver - Package version## Returns : Two items# txDone - True package transferred. False Not transferred# workDone - True: work done. Flase: No work done## true - Package transferred# false - Package not transferred### PackageExcluded - Assume that its been transferred# PackageAtTarget - Package known to be on the target# PackageNotInArchive - Package not found in dpkg_archive# PackageZipRequested - Requested a Zipped version of the package# PackageZipNotRequested - Need a Zipped version, but I have too many requests outstanding# PackageTransferOK - Package has been transferred# PackageTransferError - Error in the package transfer###sub transferPackage{my ($pname, $pver ) = @_;my $rv = 0;my $cmdRv = 0;$logger->verbose("Enter transferPackage: @_");my $startTime = time;## Do not transfer excluded files#if ( exists $excludePkgs->{$pname} ){$logger->warn("transferPackage: Excluded package not transferred: $pname, $pver");return 1, 0;}## Apply package filter#if ( defined $conf->{'packageFilter'} ){unless ( $pname =~ m~$conf->{'packageFilter'}~ ){$logger->warn("transferPackage: Filtered out package not transferred: $pname, $pver");return 1, 0;}}## If its known to be in the target archive, then we don't need to transfer it again# It may have been transferred in this cycle# It may have been in the archive anyway#if ( exists($RemotePkgList->{$pname}) && exists ($RemotePkgList->{$pname}{$pver})) {$logger->verbose("transferPackage: Already in archive");#$logger->logmsg("transferPackage: $pname, $pver : Already in archive");return 1, 0;}my $sdir = catfile( $conf->{'dpkg_archive'} , $pname );my $sfile = catfile( $sdir, $pver );unless ( -d $sfile ){$logger->warn("transferPackage:Package not found: $pname, $pver");return 0, 0;}my $tzdir = catfile( $conf->{'dpkg_archive'} , '.dpkg_archive', 'tarStore' );my $tzfile = $pname . '__' . $pver . '.tgz';my $tzpath = catfile($tzdir, $tzfile);unless (-f $tzpath) {$logger->verbose("transferPackage: tarZip not found - $tzfile");$conf->{'tagdir'} =~ m~^(.*)/~;my $tagRoot = $1;my $tag = "$pname::$pver";my $zipTag = catfile($tagRoot, 'tarZip', $tag);my $myTag = catfile($conf->{'tagdir'} , $tag);# Ensure I have an outstanding request, so that the tarZip process can notify meUtils::TouchFile($conf, $myTag) unless -f $myTag;unless (-f $zipTag) {if (calcZipRequests() < $conf->{maxTarZips} ) {$logger->logmsg("transferPackage. Request Zip: @_");Utils::TouchFile($conf, $zipTag);} else {$logger->verbose("transferPackage: Max outstanding tarZip Requests");}}return 0, 0;}############################################################################ Transfer the package / symlink#$logger->logmsg("transferPackage: @_");my $tzfsize = -s $tzpath;if ($isS3Target) {$cmdRv = transferPackageS3($tzdir, $tzfile, $pname, $pver);} else {$cmdRv = transferPackageSsh($tzdir, $tzfile, $sfile, $pname, $pver);}## Display the size of the package (tarZipped)# Diagnostic use#if ($conf->{txdetail}) {my $size = sprintf "%.3f", $tzfsize / 1024 / 1024 / 1024 ;my $duration = time - $startTime;$logger->logmsg("transferPackage: Stats: $pname, $pver, $size Gb, $duration Secs");}if ( $cmdRv == 0 ) {## Mark has having been transferred in the current cycle#$RemotePkgList->{$pname}{$pver}{transferred} = 1;$rv = 1;$statistics{txCount}++;} else {$logger->warn("transferPackage:Transfer Error: $pname, $pver, $?");}LogTxError ($?);return $rv, 1;}#-------------------------------------------------------------------------------# Function : transferPackageSsh## Description : Transfer a package via an ssh connection## Inputs : $tzdir - Directory that contains the tarZip file# $tzname - Name of tarZip File# $sfile - Full path to the source file# $pname - Package Name# $pver - Package Version## Returns : Result Code# 0 - Transfer OK# <0 - Skip transfer# >0 - Command error code#sub transferPackageSsh{my ($tzdir, $tzname, $sfile, $pname, $pver) = @_;my $tgt_cmd;my $ssh_cmd;my $cmdRv = 0;my $tzfile = catfile($tzdir, $tzname);if (-l $sfile) {## Determine the value of the symlink# Only support simple symlinks - that are in the same directory#my $lver = readlink( $sfile );if ( ! defined $lver ) {$logger->warn("Can't resolve symlink: $pname, $pver");return -1;}if ( $lver =~ m ~/~ ) {$logger->warn("Won't resolve symlink: $pname, $pver, $lver");return -1;}$tgt_cmd = "$conf->{'bindir'}/receive_symlink \"$pname\" \"$pver\" \"$lver\"";$ssh_cmd = sshCmd($tgt_cmd);} else {## Create the process pipe to transfer the package# Pipe the tarZip of the package through a ssh session to the target machine# cat $tzpath | ssh ... "./receive_package pname pver"#$tgt_cmd = "$conf->{'bindir'}/receive_package \"$pname\" \"$pver\"";$ssh_cmd = sshCmd($tgt_cmd);$ssh_cmd .= " <$tzfile"}$logger->verbose2("transferPackage:tgt_cmd:$tgt_cmd");$logger->verbose2("transferPackage:ssh_cmd:$ssh_cmd");unless ($conf->{'noTransfers'}) {my $ph;open ($ph, "$ssh_cmd |");while ( <$ph> ){chomp;$logger->verbose2("transferPackage:Data: $_");}close ($ph);$cmdRv = $?;$logger->verbose("transferPackage:End: $?");}return $cmdRv;}#-------------------------------------------------------------------------------# Function : transferPackageS3## Description : Transfer a package to an AWS S3 bucket# Requires that the package already be tarZip-ed## Inputs : $tzdir - Directory that contains the tarZip file# $tzfile - Name of tarZip File# $pname - Package Name# $pver - Package Version## Returns : Result Code# 0 - Transfer OK# <0 - Skip transfer# >0 - Command error code#sub transferPackageS3{my ($tzdir, $tzfile, $pname, $pver) = @_;my $cmdRv = 0;## Locate the file on the dpkgArchive tarZip store#my $sfile = catfile($tzdir, $tzfile);if (-l $sfile) {$logger->warn("Will not transfer symlink: $pname, $pver");return -1;}## Create a command to transfer the file to AWS use the cli tools# Note: Ive seen problem with this when used from Perth to AWS (Sydney)# If this is an issue use curl - see the savePkgToS3.sh for an implementation#my $s3_cmd = "aws --profile $conf->{'S3Profile'}";$s3_cmd .= " --region $conf->{'S3Region'}" if (defined $conf->{'S3Region'});$s3_cmd .= " s3 cp $sfile s3://$conf->{'S3Bucket'}/$tzfile";$logger->verbose2("transferPackage:s3_cmd:$s3_cmd");unless ($conf->{'noTransfers'}) {my $ph;open ($ph, "$s3_cmd |");while ( <$ph> ){chomp;$logger->verbose2("transferPackage:Data: $_");}close ($ph);$cmdRv = $?;$logger->verbose("transferPackage:End: $?");}return $cmdRv;}#-------------------------------------------------------------------------------# Function : deletePackage## Description : Delete specified package to target system## Inputs : $pname - Name of the package# $pver - Package version# $mode - 1 ForcedDelete 0: Tag for delayed Delete## Returns : true - Package deleted# false - Package not deleted#sub deletePackage{my ($pname, $pver, $mode ) = @_;my $rv = 0;my $cmdRv = 0;$logger->logmsg("deletePackage: $pname, $pver");if ($isS3Target) {## S3 Transfer is being used to store packages forever# Don't normally want to be able to delete such packages, so the default# is to prevent deletion of S3 transferred packages.if ($conf->{'S3AllowDelete'}) {# Create the process pipe to delete the packagemy $tzfile = $pname . '__' . $pver . '.tgz';my $s3_cmd = "aws --profile $conf->{'S3Profile'}";$s3_cmd .= " --region $conf->{'S3Region'}" if (defined $conf->{'S3Region'});$s3_cmd .= " s3 rm s3://$conf->{'S3Bucket'}/$tzfile";$logger->verbose2("deletePackage:s3_cmd:$s3_cmd");my $ph;open ($ph, "$s3_cmd |");while ( <$ph> ){chomp;$logger->verbose2("deletePackage:Data: $_");}close ($ph);$cmdRv = $?;} else {$logger->warn("deletePackage: S3Deletion disabled: $pname, $pver");}} else {## Create the process pipe to delete the package# ssh ... "./delete_package ${rx_opts} \"$pname\" \"$pver\""#unless ($conf->{'noTransfers'}) {my $ph;my $flags = $mode ? '-T' : '';my $tgt_cmd = "$conf->{'bindir'}/delete_package $flags \"$pname\" \"$pver\"";my $ssh_cmd = sshCmd($tgt_cmd);$logger->verbose2("deletePackage:tgt_cmd:$tgt_cmd");$logger->verbose2("deletePackage:ssh_cmd:$ssh_cmd");open ($ph, "$ssh_cmd |");while ( <$ph> ){chomp;$logger->verbose2("deletePackage:Data: $_");}close ($ph);$cmdRv = $?;}}## Common code#$logger->verbose("deletePackage:End: $cmdRv");if ( $cmdRv == 0 ){$rv = 1;$statistics{delCount}++;delete $RemotePkgList->{$pname}{$pver};}else{$logger->warn("deletePackage:Error: $pname, $pver, $?");}LogTxError ($?);return $rv;}#-------------------------------------------------------------------------------# Function : sshCmd## Description : Generate a ssh based command## Inputs : Target command## Returns : An shh command string#sub sshCmd{my ($tgt_cmd) = @_;my $sshPort = '';$sshPort = "-p $conf->{'sshport'}"if ($conf->{'sshport'});return "ssh -o \"BatchMode yes\" -i $conf->{'identity'} ${sshPort} $conf->{'user'}\@$conf->{'hostname'} \"$tgt_cmd\" 2>&1";}#-------------------------------------------------------------------------------# Function : parsePkgList## Description : Parse one line from a pkgList# Lines are multiple item="data" items# time="1497586865" GMT="Fri Jun 16 04:21:05 2017" pname="ERGissaccounts" pver="1.0.7169.mas"# time="1497586865" GMT="Fri Jun 16 04:21:05 2017" pname="ERGissaccounts" pver="1.0.7169.mas" link="latest"## Inputs : $line - Line of data# $hashp - Ref to hash to populate## Returns : A hash of data items#sub parsePkgList{my ($line, $hashp) = @_;my $rv;while ( $line =~ m~\s*(.+?)="(.+?)"~ ){$rv->{$1} = $2;$line = $';}#Utils::DebugDumpData ("parsePkgList", $rv);my $pname = $rv->{pname};my $pver = $rv->{pver};return undef unless ( $pname && $pver );delete $rv->{pname};delete $rv->{pver};delete $rv->{GMT};$hashp->{$pname}{$pver} = $rv;return $hashp;}#-------------------------------------------------------------------------------# Function : parsePkgMetaData## Description : Parse one line of meta data from a pkgList# Lines are multiple item="data" items## Inputs : $line - Line of data# $hashp - Ref to hash to populate## Returns : Nothing#sub parsePkgMetaData{my ($line, $hashp) = @_;if ( $line =~ m~\s+(.+?)="(.+?)"~ ){$hashp->{$1} = $2;$statistics{'Target.' . $1} = $2;$line = $';$logger->verbose2("parsePkgMetaData: $1 = $2");}}#-------------------------------------------------------------------------------# Function : parseBlatBinData## Description : Parse one line of Blat Bin data from a pkgList# Lines are of the form:# BlatBin MD5="dbc4507f4db5b41f7358b28bce65a15d" file="ddp-gtar"## Inputs : $line - Line of data# $hashp - Ref to hash to populate## Returns : Nothing#sub parseBlatBinData{my ($line, $hashp) = @_;my $rv;$line =~ s~^\S+~~;while ( $line =~ m~\s*(.+?)="(.+?)"~ ){$rv->{$1} = $2;$line = $';}#Utils::DebugDumpData ("parseBlatBinData", $rv);my $fname = $rv->{file};my $md5 = $rv->{MD5};return undef unless ( $fname && $md5 );$logger->verbose2("parseBlatBinData: $fname : $md5");$hashp->{$fname} = $md5;}#-------------------------------------------------------------------------------# Function : resetDailyStatistics## Description : Called periodically to reset the daily statistics## Inputs : $time - Current time## Returns :#sub resetDailyStatistics{my ($time) = @_;## Detect a new day#my $today = (localtime($time))[7];if ($yday != $today){$yday = $today;$logger->logmsg('Resetting daily statistics' );# Note: Must match @recoverTags in readStatistics$statistics{dayStart} = $time;$statistics{txCount} = 0;$statistics{delCount} = 0;$statistics{staleTags} = 0;$statistics{linkErrors} = 0;}}#-------------------------------------------------------------------------------# Function : readStatistics## Description : Read in the last set of stats# Used after a restart to recover daily statistics## Inputs :## Returns :#sub readStatistics{my @recoverTags = qw(dayStart txCount delCount staleTags linkErrors);if ($conf->{'statsfile'} and -f $conf->{'statsfile'}){if (open my $fh, $conf->{'statsfile'}){while (<$fh>){m~(.*):(.*)~;if ( grep( /^$1$/, @recoverTags ) ){$statistics{$1} = $2;$logger->verbose("readStatistics $1, $2");}}close $fh;$yday = (localtime($statistics{dayStart}))[7];}}}#-------------------------------------------------------------------------------# Function : periodicStatistics## Description : Called on a regular basis to write out statistics# Used to feed information into Nagios## This function is called via an alarm and may be outside the normal# processing loop. Don't make assumptions on the value of $now## Inputs :## Returns :#sub periodicStatistics{## A few local stats#$statistics{SeqNum}++;$statistics{timeStamp} = time();$statistics{upTime} = $statistics{timeStamp} - $startTime;$statistics{wedged} = Utils::isWedged($conf);$statistics{state} = $statistics{wedged} ? 'Wedged' : $statistics{state};# Reset daily accumulations - on first use each dayresetDailyStatistics($statistics{timeStamp});## Write statistics to a file# Write to a tmp file, then rename.# Attempt to make the operation atomic - so that the file consumer# doesn't get a badly formed file.#if ($conf->{'statsfiletmp'}){my $fh;unless (open ($fh, '>', $conf->{'statsfiletmp'})){$fh = undef;$logger->warn("Cannot create temp stats file: $!");}else{foreach my $key ( sort { lc($a) cmp lc($b) } keys %statistics){print $fh $key . ':' . $statistics{$key} . "\n";$logger->verbose2('Statistics:'. $key . ':' . $statistics{$key});}close $fh;# Rename temp to real filerename $conf->{'statsfiletmp'}, $conf->{'statsfile'} ;}}}#-------------------------------------------------------------------------------# Function : sighandlers## Description : Install signal handlers## Inputs : $conf - System config## Returns : Nothing#sub sighandlers{$SIG{TERM} = sub {# On shutdown$logger->logmsg('Received SIGTERM. Shutting down....' );unlink $conf->{'pidfile'} if (-f $conf->{'pidfile'});exit 0;};$SIG{HUP} = sub {# On logrotate$logger->logmsg('Received SIGHUP.');$logger->rotatelog();};$SIG{USR1} = sub {# On Force Archive Sync$logger->logmsg('Received SIGUSR1.');$lastReleaseScan = 0;$lastTagListUpdate = 0;$lastRmConfRead = 0;};alarm 60 unless $conf->{debug};$SIG{ALRM} = sub {# On Dump Statistics$logger->verbose2('Received SIGUSR2.');periodicStatistics();alarm 60;};$SIG{__WARN__} = sub { $logger->warn("@_") };$SIG{__DIE__} = sub { $logger->err("@_") };}#-------------------------------------------------------------------------------# Function : LogTxError## Description : Detect restoration of communication and log such# Don't log failures as the user will do that## Inputs : $state - 0 - All is well# !0 - Error## Returns : Nothing#sub LogTxError{my ($state) = $@;if ( $state ){$statistics{linkErrors}++ unless($comError);$comError++;$statistics{state} = 'No Communication';}elsif ( $comError ){$logger->warn("Communication Restored");$comError = 0;$statistics{state} = 'OK';}}#-------------------------------------------------------------------------------# Function : Error, Verbose, Warning## Description : Support for JatsRmApi## Inputs : Message## Returns : Nothing#sub Error{$logger->err("@_");}sub Verbose{$logger->verbose2("@_");}sub Warning{$logger->warn("@_");}