Rev 7033 | Blame | Compare with Previous | Last modification | View Log | RSS feed
package com.erggroup.buildtool.daemon;import com.erggroup.buildtool.ripple.Package;import com.erggroup.buildtool.ripple.ReleaseConfig;import com.erggroup.buildtool.ripple.ReleaseManager;import java.io.File;import java.io.IOException;import java.net.ServerSocket;import java.sql.SQLException;import java.util.ArrayList;import java.util.Iterator;import java.util.Map;import java.util.concurrent.ConcurrentLinkedQueue;import org.slf4j.Logger ;import org.slf4j.LoggerFactory;/**BuildDaemon sub component and entry point (main BuildDaemon thread)*/public class BuildDaemon{/**hostname* @attribute*/static String mHostname;/**GBE_LOG* @attribute*/static String mGbeLog;/** mShutDown* @attribute* Request to shutdown the build system gracefully*/static boolean mShutDown = false;/**Logger* @attribute*/protected static Logger mLogger = LoggerFactory.getLogger(BuildDaemon.class);/**Collection of ThreadIdentifier objects.* Using a ConcurrentLinkedQueue because we add and remove items from the collection* and it is being accessed from multiple threads.* @attribute*/private ConcurrentLinkedQueue<ThreadIdentifier> mThreadCollection = new ConcurrentLinkedQueue<ThreadIdentifier>();/** Last time we did a poll for new builds**/private long mLastBuildPoll = 0;/** Build Process poll time in seconds* Modified on error to slow the poll rate*/private int mPollTime = 3;/**Nagios* @attribute*/ServerSocket nagiosSrv;NagiosThread nagiosChecker;/** Local class to assist in reporting Nagios Status**/public static class NagiosInfo {int threadCount = 0;int threadAliveCount = 0;int masterCount = 0;int slaveCount = 0;ArrayList<String> extendedReason = new ArrayList<String>();}/**mThreadCollection items*/private class ThreadIdentifier{/**rcon_id associated with the thread* @attribute*/private final int mRconId;/**thread identifier* @attribute*/private final BuildThread mThread;/**constructor*/ThreadIdentifier(int rconId, BuildThread thread){mLogger.debug("ThreadIdentifier {}", rconId);mRconId = rconId;mThread = thread;}/**accessor*/int getRconId(){mLogger.info("get_rcon_id returned {}", mRconId);return mRconId;}/**accessor*/BuildThread getThread(){mLogger.debug("get_thread");return mThread;}}/**Exception thrown to indicate an uncorrectable error*/public class BuildException extends Exception{BuildException(String msg){mLogger.error(msg);}private static final long serialVersionUID = 1L;}/**main method for the Build Daemon program* instantiates a BuildDaemon object*/public static void main(String[] args){mLogger.debug("main");mHostname = testEnvVar("GBE_HOSTNAME");testEnvVar("ANT_HOME");testEnvVar("GBE_UNC");testEnvVar("GBE_DPKG_SSH_PROPERTIES");mGbeLog = testEnvVar("GBE_LOG");File gl = new File( mGbeLog );if ( !gl.isDirectory() ){mLogger.error("main GBE_LOG is not a directory");System.exit(1);}// Connection information for the databaseString connectionString = System.getenv("GBE_RM_LOCATION");String username = System.getenv("GBE_RM_USERNAME");String password = System.getenv("GBE_RM_PASSWORD");boolean showHelp = false;int consumed = 0;for (int optind = 0; optind < args.length; optind += 1 + consumed){consumed = 0;boolean argsRemains = optind < (args.length - 1);if (args[optind].equals("-c") && argsRemains ){connectionString = args[optind+1];consumed ++;}else if (args[optind].equals("-u") && argsRemains){username = args[optind+1];consumed ++;}else if (args[optind].equals("-p") && argsRemains){password = args[optind+1];consumed ++;}else{showHelp = true;}}if ( showHelp ||connectionString == null ||connectionString.length() == 0 ||username == null ||username.length() == 0 ||password == null ||password.length() == 0){mLogger.error("Usage: java -jar abtdD.jar -c connectionString -u username -p password");System.exit(1);}BuildDaemon buildDaemon = new BuildDaemon(connectionString, username, password);buildDaemon.cleanUp();}/*** Test a named EnvVar to ensure that it has been set* Call after the mLogger has been setup* @param varName - Name of string to examine* @return - String value of the parameter*/private static String testEnvVar(String varName) {String envVar = System.getenv(varName);if ( envVar == null ){mLogger.error("main {} not set", varName);System.exit(1);}return envVar;}/**constructor, implements the sequence diagram spawn thread*/public BuildDaemon(String connectionString, String username, String password){this(new ReleaseManager(connectionString, username + "[release_manager]", password));}public BuildDaemon(ReleaseManager releaseManager){String utf = null;mLogger.warn("BuildDaemon");try{// Flag UTF in progressif ( releaseManager.mConnectionString.compareTo("unit test spawn thread") == 0 ){utf = releaseManager.mConnectionString;}if ( Package.mGenericMachtype == null ){throw new BuildException("run GBE_MACHTYPE not set");}if ( Package.mGbeDpkg == null ){throw new BuildException("run GBE_DPKG not set");}// Set the default handler invoked when a thread abruptly terminates due to an// uncaught exception, and no other handler has been defined for that thread.//Thread.setDefaultUncaughtExceptionHandler(new BuildDaemonUncaughtExceptionHandler());//// Start the Nagios Interface Thread// Unless performing a unit test//if ( utf == null ){startNagiosServer();}// Discover new build daemons to be started on the current host//while (!mShutDown){mPollTime = 3;// Poll for slave build requestslookForBuildRequests(releaseManager, utf);// Poll for new builds every few (5) minutesif ( System.currentTimeMillis() - mLastBuildPoll > 5 * 60 * 1000) {lookForNewBuildDaemons(releaseManager, utf);mLastBuildPoll = System.currentTimeMillis();}// In UTF mode we only execute the loop onceif ( utf != null ) {break;}// Wait 3 seconds before polling againdaemonSleepSecs(mPollTime);}mLogger.error("BuildDaemon daemon spawning shutdown");if (mShutDown){notifyAllBuildThreads();cleanUp();}}catch( BuildException e ){mLogger.error("BuildDaemon caught Exception");}}/*** Used by the Polling Thread to notify slaves of an active build request* The slaves will 'wait' for notification in order to proceed* Thus the polling is done once, rather than each slave thread polling individually* @param rconId*/public void notifyBuildThread(Integer rconId) {for (Iterator<ThreadIdentifier> it = mThreadCollection.iterator(); it.hasNext(); ){ThreadIdentifier threadIdentifier = it.next();if (threadIdentifier.mThread.isAlive()){BuildThread bt = threadIdentifier.mThread;if (bt.mRconId == rconId ) {mLogger.warn("Notify RtagId:{}, bmlId:{}", bt.mRtagId, bt.mRconId );synchronized (bt.mActiveBuildMonitor) {bt.mActiveBuildMonitor.notifyAll();}}}}}/*** Notify all build threads of some activity* This will release all threads waiting on the ActiveBuildMonitor*/public void notifyAllBuildThreads() {mLogger.error("notifyAllBuildThreads");for (Iterator<ThreadIdentifier> it = mThreadCollection.iterator(); it.hasNext(); ){ThreadIdentifier threadIdentifier = it.next();if (threadIdentifier.mThread.isAlive()){BuildThread bt = threadIdentifier.mThread;synchronized (bt.mActiveBuildMonitor) {bt.mActiveBuildMonitor.notifyAll();}}}}/** Look for changes in the build daemons required* Startup new daemons* Shutdown ones that are no longer required** @param releaseManager - The release Manager instance* @param utf - Unit Test information*/private void lookForNewBuildDaemons (ReleaseManager releaseManager, String utf) {mLogger.error("lookForNewBuildDaemons" );try{// Create a list of all machines that are configured to run on this machine//releaseManager.queryReleaseConfig(mHostname);//// Iterate over all the configured machines// Start up new build threads for new machines//for (Iterator<ReleaseConfig> it = releaseManager.mReleaseConfigCollection.mReleaseConfig.iterator(); it.hasNext(); ){ReleaseConfig rc = it.next();if (!isActive(rc.getRconId())){//// Ensure the Release is configured with exactly one master//int masterCount = releaseManager.queryMasterCount( rc.getRtagId() );if ( masterCount != 1 ){mLogger.error("BuildDaemon activating. Invalid Masters {} MasterCount: {}", rc.getRtagId(), masterCount );continue;}mLogger.warn("BuildDaemon activating {} {} {}",rc.getRtagId(), rc.getRconId(), rc.getDaemonMode());//// Clone the BuildDaemons ReleaseManager thread// Done so that we can perform unit testing by extending the ReleaseManager class// Need to be able to have the Master/Slave threads process through the overridden// methods within the extended ReleaseManager class//ReleaseManager threadReleaseManager = (ReleaseManager) releaseManager.clone();// spawn and run the BuildThreadif (rc.getDaemonMode() == 'M'){MasterThread thread = new MasterThread(rc.getRtagId(), rc.getRconId(), threadReleaseManager, utf);ThreadIdentifier threadIdentifier = new ThreadIdentifier(rc.getRconId(), thread);mThreadCollection.add(threadIdentifier);thread.start();}else if (rc.getDaemonMode() == 'S'){SlaveThread thread = new SlaveThread(rc.getRtagId(), rc.getRconId(), threadReleaseManager, utf);ThreadIdentifier threadIdentifier = new ThreadIdentifier(rc.getRconId(), thread);mThreadCollection.add(threadIdentifier);thread.start();}}}//// Clean out terminated threads from the thread collection//cleanupTerminatedThreads();}catch (SQLException e){mLogger.warn("BuildDaemon caught SQLException");}catch (InterruptedException e){mLogger.warn("BuildDaemon caught InterruptedException");Thread.currentThread().interrupt();}catch (Exception e){mLogger.warn("BuildDaemon caught Exception");}}/** Look for new build requests for slaves on this machine* It is better to have one thread poll every 3 seconds, than have all the slave* threads poll every 3 seconds** @param releaseManager* @param utf*/private void lookForBuildRequests(ReleaseManager releaseManager, String utf) {if (utf != null) {return;}mLogger.debug("Poll Cycle");try {// Generate a list of all rconIds that are currently active on this machine// Will be used to limit the query//StringBuilder rconIdList = new StringBuilder();String joiner = null;for (Iterator<ThreadIdentifier> it = mThreadCollection.iterator(); it.hasNext(); ){ThreadIdentifier threadIdentifier = it.next();if (threadIdentifier.mThread.isAlive()){BuildThread bt = threadIdentifier.mThread;if( joiner != null) {rconIdList.append(joiner);}rconIdList.append(bt.mRconId);joiner = ",";}}if (joiner != null) {//// Locate slaves that have an outstanding build request and master that have forced poll requests// Notify the threads of this conditionArrayList<Integer> rv = releaseManager.queryActivatedBuilds(mHostname, rconIdList.toString());for (Integer rconId : rv) {mLogger.warn("Activate:{}", rconId);notifyBuildThread(rconId);}}} catch (Exception e) {mLogger.error("lookForBuildRequests Exception: {}", e.getMessage());mPollTime = 30;}}/*** Start up the Nagios server* @throws BuildException*/private void startNagiosServer() throws BuildException {try {nagiosSrv = new ServerSocket(1111);nagiosChecker = new NagiosThread(nagiosSrv, this);nagiosChecker.start();} catch ( IOException e ) {throw new BuildException("Nagios port in use");}}/*** Clean out terminated threads from the thread collection* Examine all the build threads and get rid of those that have been terminated*/private void cleanupTerminatedThreads(){for (Iterator<ThreadIdentifier> it = mThreadCollection.iterator(); it.hasNext(); ){ThreadIdentifier threadIdentifier = it.next();if (!threadIdentifier.getThread().isAlive()){BuildThread bt = threadIdentifier.getThread();mLogger.warn("BuildDaemon removal {} {} {}", bt.mRtagId, bt.mRconId, bt.getMode());it.remove();}}}/**calls isAlive on the Thread object associated with the rcon_id*/public boolean isActive(final int rcon_id){mLogger.debug("isActive {}", rcon_id);boolean retVal = false;boolean found = false;for (Iterator<ThreadIdentifier> it = mThreadCollection.iterator(); it.hasNext(); ){ThreadIdentifier threadIdentifier = it.next();if (threadIdentifier.getRconId() == rcon_id){found = true;if (threadIdentifier.getThread().isAlive()){retVal = true;break;}else{mLogger.warn("isActive found dead thread {}", rcon_id );}}}if ( !found ){mLogger.warn("isActive thread not found {}", rcon_id);}mLogger.debug("isActive returned {}", retVal);return retVal;}/*** Nagios interface* Must have one active thread* Examine all threads - for logging*/void checkThreads( NagiosInfo nagInfo){mLogger.info("checkThreads called");if (!mThreadCollection.isEmpty()){for (Iterator<ThreadIdentifier> it = mThreadCollection.iterator(); it.hasNext();){ThreadIdentifier threadIdentifier = it.next();nagInfo.threadCount ++;if ( threadIdentifier.getThread().isAlive()) {nagInfo.threadAliveCount ++;}threadIdentifier.getThread().checkThread(nagInfo);}}else{nagInfo.extendedReason.add("No Build Threads configured");}}/*** Nagios interface* Provide extended information on all threads* @param estatus*/public void extendedStatus(Map<String, Object> estatus){NagiosInfo nagInfo = new NagiosInfo();for (Iterator<ThreadIdentifier> it = mThreadCollection.iterator(); it.hasNext();){ThreadIdentifier threadIdentifier = it.next();threadIdentifier.getThread().extendedStatus(nagInfo, estatus);}}/*** daemonSleepSecs* Sleep for a specified number of seconds allowing for* a termination request** The function will sleep for 5 seconds at a time,* then check for a termination request** @param sleepSecs The number of seconds to sleep** @return True : Termination requested** False: No Termination requested*/public static boolean daemonSleepSecs(int sleepSecs){while (sleepSecs > 0){if (mShutDown){mLogger.error("daemonSleepSecs detected termiantion request");return true;}int sleepTime = sleepSecs;if (sleepTime > 5)sleepTime = 5;try{Thread.sleep(sleepTime * 1000L);}catch (InterruptedException e){mLogger.warn("daemonSleepSecs sleep caught InterruptedException");Thread.currentThread().interrupt();break;}sleepSecs -= sleepTime;}return false;}/**terminates all BuildThreads*/public void cleanUp(){mLogger.warn("cleanUp");for (Iterator<ThreadIdentifier> it = mThreadCollection.iterator(); it.hasNext(); ){ThreadIdentifier threadIdentifier = it.next();if (threadIdentifier.getThread().isAlive()){try{threadIdentifier.getThread().interrupt();threadIdentifier.getThread().join();}catch( InterruptedException e ){Thread.currentThread().interrupt();}}}if ( nagiosChecker != null ){nagiosChecker.terminate();}// If a shutdown has been requested, then exit the program// The only thing left running should be a timer taskif (mShutDown){System.exit(6);}}}class BuildDaemonUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{private static final Logger mLogger = LoggerFactory.getLogger(BuildDaemonUncaughtExceptionHandler.class);//Implements Thread.UncaughtExceptionHandler.uncaughtException()public void uncaughtException(Thread th, Throwable ex){System.out.println("You crashed thread " + th.getName());System.out.println("Exception was: " + ex.toString());mLogger.error("UncaughtException ThreadName: {}", th.getName());mLogger.error("UncaughtException was: {}", ex);}}