// Subversion Repositories DevTools
//
// Rev
//
// Rev 7033 | Blame | Compare with Previous | Last modification | View Log | RSS feed

package com.erggroup.buildtool.daemon;

import com.erggroup.buildtool.ripple.Package;
import com.erggroup.buildtool.ripple.ReleaseConfig;
import com.erggroup.buildtool.ripple.ReleaseManager;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.slf4j.Logger ;
import org.slf4j.LoggerFactory;

/**BuildDaemon sub component and entry point (main BuildDaemon thread)
 */
public class BuildDaemon
{

    /**Name of the host this daemon is running on.
     * Populated by main() from the GBE_HOSTNAME environment variable and used to
     * limit the Release Manager queries to this machine.
     * @attribute
     */
    static String mHostname;

    /**Log directory.
     * Populated by main() from the GBE_LOG environment variable; main() exits when
     * the value does not name a directory.
     * @attribute
     */
    static String mGbeLog;

    /**Request to shutdown the build system gracefully.
     * Polled by the daemon's main loop and by daemonSleepSecs(), while the flag is
     * raised from outside this class. It is volatile so that a request made by one
     * thread is guaranteed to become visible to the threads polling it.
     * @attribute
     */
    static volatile boolean mShutDown = false;

    /**Logger
     * @attribute
     */
    protected static Logger mLogger =  LoggerFactory.getLogger(BuildDaemon.class);

    /**Collection of ThreadIdentifier objects.
     * Using a ConcurrentLinkedQueue because we add and remove items from the collection
     * and it is being accessed from multiple threads.
     * @attribute
     */
    private ConcurrentLinkedQueue<ThreadIdentifier> mThreadCollection = new ConcurrentLinkedQueue<ThreadIdentifier>();

    /**Time (System.currentTimeMillis) of the last poll for new build daemons.
     * Zero until the first poll has been performed.
     */
    private long mLastBuildPoll = 0;

    /**Build Process poll time in seconds.
     * Reset to 3 at the start of every poll cycle and raised to 30 when polling for
     * build requests fails, in order to slow the poll rate.
     */
    private int mPollTime = 3;

    /**Nagios interface: the listening socket and the thread that services it.
     * Both remain null until startNagiosServer() has been called.
     * @attribute
     */
    ServerSocket nagiosSrv;
    NagiosThread nagiosChecker;
    
    /** Local class to assist in reporting Nagios Status.
     *  A plain accumulator that is handed to checkThreads() / extendedStatus() and
     *  passed on to each build thread so that the results can be collected in one place.
     */
    public static class NagiosInfo {
        // Number of entries in the thread collection (checkThreads adds one per entry)
        int        threadCount = 0;
        // Number of those entries whose thread reported isAlive()
        int        threadAliveCount = 0;
        // Not updated anywhere in BuildDaemon - presumably maintained by the build
        // threads' checkThread(); TODO confirm against BuildThread
        int        masterCount = 0;
        int        slaveCount = 0;
        // Free-text reasons; checkThreads adds "No Build Threads configured" when the
        // thread collection is empty
        ArrayList<String> extendedReason  = new ArrayList<String>();
    }

    /**mThreadCollection items
     */
    private class ThreadIdentifier
    {
        /**rcon_id associated with the thread
         * @attribute
         */
        private final int mRconId;

        /**thread identifier
         * @attribute
         */
        private final BuildThread mThread;

        /**constructor
         */
        ThreadIdentifier(int rconId, BuildThread thread)
        {
            mLogger.debug("ThreadIdentifier {}", rconId);
            mRconId = rconId;
            mThread = thread;
        }

        /**accessor
         */
        int getRconId()
        {
            mLogger.info("get_rcon_id returned {}", mRconId);
            return mRconId;
        }

        /**accessor
         */
        BuildThread getThread()
        {
            mLogger.debug("get_thread");
            return mThread;
        }
    }
    
    /**Exception thrown to indicate an uncorrectable error.
     * The message is logged at error level as soon as the exception is created and is
     * also retained, so that it is available through getMessage() and in stack traces.
     */
    public class BuildException extends Exception
    {
        /**constructor
         * @param msg - Description of the error
         */
        BuildException(String msg)
        {
            // Hand the text to Exception; without this getMessage() is always null
            super(msg);
            mLogger.error(msg);
        }

      private static final long serialVersionUID = 1L;
    }

    /**Program entry point for the Build Daemon.
     * Verifies the mandatory environment (GBE_HOSTNAME, ANT_HOME, GBE_UNC,
     * GBE_DPKG_SSH_PROPERTIES and GBE_LOG, which must name a directory), works out the
     * database connection details and then instantiates a BuildDaemon object.
     *
     * The connection details default to the GBE_RM_LOCATION, GBE_RM_USERNAME and
     * GBE_RM_PASSWORD environment variables and may be overridden with the
     * -c, -u and -p options. The program exits with status 1 on any unknown option,
     * an option without a value, or an empty/missing connection detail.
     *
     * @param args - Command line arguments
     */
    public static void main(String[] args)
    {
        mLogger.debug("main");

        // Mandatory environment. testEnvVar exits the program if a variable is not set
        mHostname = testEnvVar("GBE_HOSTNAME");
        testEnvVar("ANT_HOME");
        testEnvVar("GBE_UNC");
        testEnvVar("GBE_DPKG_SSH_PROPERTIES");
        mGbeLog = testEnvVar("GBE_LOG");

        if ( !new File( mGbeLog ).isDirectory() )
        {
            mLogger.error("main GBE_LOG is not a directory");
            System.exit(1);
        }

        // Connection information for the database - the environment provides the defaults
        String connectionString = System.getenv("GBE_RM_LOCATION");
        String username = System.getenv("GBE_RM_USERNAME");
        String password = System.getenv("GBE_RM_PASSWORD");
        boolean usageError = false;

        // Walk the command line; each recognised option also consumes the value after it
        int index = 0;
        while (index < args.length)
        {
            final String option = args[index];
            final boolean hasValue = (index + 1) < args.length;

            if (option.equals("-c") && hasValue)
            {
                connectionString = args[index + 1];
                index += 2;
            }
            else if (option.equals("-u") && hasValue)
            {
                username = args[index + 1];
                index += 2;
            }
            else if (option.equals("-p") && hasValue)
            {
                password = args[index + 1];
                index += 2;
            }
            else
            {
                // Unknown option, or an option that is missing its value
                usageError = true;
                index += 1;
            }
        }

        final boolean detailsComplete =
                connectionString != null && connectionString.length() != 0 &&
                username != null && username.length() != 0 &&
                password != null && password.length() != 0;

        if ( usageError || !detailsComplete )
        {
            mLogger.error("Usage: java -jar abtdD.jar -c connectionString -u username -p password");
            System.exit(1);
        }

        // The constructor only returns once the daemon's polling loop has ended
        BuildDaemon buildDaemon = new BuildDaemon(connectionString, username, password);
        buildDaemon.cleanUp();

    }

    /**
     * Fetch a named EnvVar, insisting that it has been set.
     * An unset variable is reported through the logger and the program exits
     * with status 1. Call after the mLogger has been setup.
     *  @param varName - Name of the environment variable to examine
     *  @return - String value of the environment variable
     */
    private static String testEnvVar(String varName) {
        final String value = System.getenv(varName);
        if ( value != null )
        {
            return value;
        }

        mLogger.error("main {} not set", varName);
        System.exit(1);
        return null;
    }

    /**Convenience constructor used by main().
     * Builds a ReleaseManager from the connection details, with "[release_manager]"
     * appended to the user name, and hands it to the ReleaseManager based constructor
     * which implements the sequence diagram spawn thread.
     *
     * @param connectionString - Database connection string
     * @param username         - Database user name, without the "[release_manager]" suffix
     * @param password         - Database password
     */
    public BuildDaemon(String connectionString, String username, String password)
    {
        this(
            new ReleaseManager(
                connectionString,
                username + "[release_manager]",
                password));
    }

    public BuildDaemon(ReleaseManager releaseManager)
    {
        String utf = null;
        mLogger.warn("BuildDaemon");

        try
        {
            // Flag UTF in progress
            if ( releaseManager.mConnectionString.compareTo("unit test spawn thread") == 0 )
            {
                utf = releaseManager.mConnectionString;
            }

            if ( Package.mGenericMachtype == null )
            {
                throw new BuildException("run GBE_MACHTYPE not set");
            }

            if ( Package.mGbeDpkg == null )
            {
                throw new BuildException("run GBE_DPKG not set");
            }
            
            //  Set the default handler invoked when a thread abruptly terminates due to an
            //  uncaught exception, and no other handler has been defined for that thread.
            //
            Thread.setDefaultUncaughtExceptionHandler(new BuildDaemonUncaughtExceptionHandler());

            //
            //  Start the Nagios Interface Thread
            //    Unless performing a unit test
            //
            if ( utf == null )
            {
                startNagiosServer();
            }

            //  Discover new build daemons to be started on the current host
            //
            
            while (!mShutDown)
            {
                mPollTime = 3;
                
                // Poll for slave build requests
                lookForBuildRequests(releaseManager, utf);
               
                // Poll for new builds every few (5) minutes
                if ( System.currentTimeMillis() - mLastBuildPoll > 5 * 60 * 1000) {
                    lookForNewBuildDaemons(releaseManager, utf);
                    mLastBuildPoll = System.currentTimeMillis();
                }
                
                //  In UTF mode we only execute the loop once
                if ( utf != null ) {
                    break;
                }
                
                // Wait 3 seconds before polling again
                daemonSleepSecs(mPollTime);
            }
            
            mLogger.error("BuildDaemon daemon spawning shutdown");
            if (mShutDown)
            {
                notifyAllBuildThreads();
                cleanUp();
            }
        }
        catch( BuildException e )
        {
            mLogger.error("BuildDaemon caught Exception");
        }
    }
    
    /**
     * Used by the Polling Thread to notify slaves of an active build request
     * The slaves will 'wait' for notification in order to proceed
     * Thus the polling is done once, rather than each slave thread polling individually
     *
     * Every live build thread whose mRconId matches is woken through a notifyAll
     * on its mActiveBuildMonitor. A null rconId is logged and otherwise ignored.
     *
     * @param rconId - rcon_id of the build thread(s) to be notified
     */
    public void notifyBuildThread(Integer rconId) {
        if (rconId == null) {
            mLogger.warn("notifyBuildThread called without an rconId");
            return;
        }

        // Unbox once so that the test below is always a comparison of values,
        // never of object references
        final int wantedRconId = rconId.intValue();

        for (ThreadIdentifier threadIdentifier : mThreadCollection)
        {
            BuildThread bt = threadIdentifier.mThread;

            if (bt.isAlive() && bt.mRconId == wantedRconId)
            {
                mLogger.warn("Notify RtagId:{}, bmlId:{}", bt.mRtagId, bt.mRconId );
                synchronized (bt.mActiveBuildMonitor) {
                    bt.mActiveBuildMonitor.notifyAll();
                }
            }
        }
    }
    
    /**
     * Wake every live build thread.
     * Performs a notifyAll on the ActiveBuildMonitor of each thread in the collection
     * that is still alive, releasing all threads waiting on those monitors.
     */
    public void notifyAllBuildThreads() {
        mLogger.error("notifyAllBuildThreads");

        for (ThreadIdentifier entry : mThreadCollection)
        {
            // Threads that have already terminated have nothing to be woken from
            if (!entry.mThread.isAlive())
            {
                continue;
            }

            final BuildThread worker = entry.mThread;
            synchronized (worker.mActiveBuildMonitor) {
                worker.mActiveBuildMonitor.notifyAll();
            }
        }
    }

    /** Look for changes in the build daemons required
     *  Startup new daemons
     *  Remove terminated ones from the thread collection
     *
     *  Queries the release configurations for this host and, for every configuration
     *  without a live thread, starts a MasterThread (mode 'M') or SlaveThread (mode 'S'),
     *  provided the release has exactly one master. Any other mode is logged and skipped.
     *  Exceptions are logged, with their stack trace, and end this poll; the daemon
     *  carries on and tries again on the next poll.
     *
     *  @param  releaseManager - The release Manager instance
     *  @param  utf            - Unit Test information
     */
    private void lookForNewBuildDaemons (ReleaseManager releaseManager, String utf) {
        mLogger.error("lookForNewBuildDaemons" );
        try
        {
            // Create a list of all machines that are configured to run on this machine
            //
            releaseManager.queryReleaseConfig(mHostname);

            //
            //  Iterate over all the configured machines
            //  Start up new build threads for new machines
            //
            for (Iterator<ReleaseConfig> it = releaseManager.mReleaseConfigCollection.mReleaseConfig.iterator(); it.hasNext(); )
            {
                ReleaseConfig rc = it.next();

                if (isActive(rc.getRconId()))
                {
                    // Already being serviced by a live thread
                    continue;
                }

                //
                //  Ensure the Release is configured with exactly one master
                //
                int masterCount = releaseManager.queryMasterCount( rc.getRtagId() );
                if ( masterCount != 1 )
                {
                    mLogger.error("BuildDaemon activating. Invalid Masters {} MasterCount: {}", rc.getRtagId(),  masterCount );
                    continue;
                }

                mLogger.warn("BuildDaemon activating {} {} {}",rc.getRtagId(), rc.getRconId(), rc.getDaemonMode());

                //
                //    Clone the BuildDaemons ReleaseManager thread
                //    Done so that we can perform unit testing by extending the ReleaseManager class
                //    Need to be able to have the Master/Slave threads process through the overridden
                //    methods within the extended ReleaseManager class
                //
                ReleaseManager threadReleaseManager = (ReleaseManager) releaseManager.clone();

                // Create the BuildThread that matches the configured mode
                BuildThread thread;
                if (rc.getDaemonMode() == 'M')
                {
                    thread = new MasterThread(rc.getRtagId(), rc.getRconId(), threadReleaseManager, utf);
                }
                else if (rc.getDaemonMode() == 'S')
                {
                    thread = new SlaveThread(rc.getRtagId(), rc.getRconId(), threadReleaseManager, utf);
                }
                else
                {
                    // Previously ignored without a trace; make the bad configuration visible
                    mLogger.warn("BuildDaemon unsupported daemon mode {} for {} {}", rc.getDaemonMode(), rc.getRtagId(), rc.getRconId());
                    continue;
                }

                // Register the thread before starting it, so it can be found as soon as it runs
                mThreadCollection.add(new ThreadIdentifier(rc.getRconId(), thread));
                thread.start();
            }

            //
            //  Clean out terminated threads from the thread collection
            //
            cleanupTerminatedThreads();

        }
        catch (SQLException e)
        {
            mLogger.warn("BuildDaemon caught SQLException", e);
        }
        catch (InterruptedException e)
        {
            mLogger.warn("BuildDaemon caught InterruptedException", e);
            // Restore the interrupt status for the benefit of the caller
            Thread.currentThread().interrupt();
        }
        catch (Exception e)
        {
            mLogger.warn("BuildDaemon caught Exception", e);
        }
    }
    
    /** Look for new build requests for the build threads on this machine
     *  It is better to have one thread poll every 3 seconds, than have all the slave
     *  threads poll every 3 seconds
     *
     *  Does nothing in unit test mode or when no build thread is alive. Otherwise the
     *  Release Manager is asked which of the live rconIds have been activated and the
     *  matching build threads are notified. Any exception is logged and the poll time
     *  is raised to 30 seconds.
     *
     * @param releaseManager - The release Manager instance to query
     * @param utf            - Unit Test information; polling is skipped when not null
     */
    private void lookForBuildRequests(ReleaseManager releaseManager, String utf) {
        if (utf != null) {
            return;
        }

        mLogger.debug("Poll Cycle");
        try {

            //  Comma separated list of the rconIds of all live threads on this machine
            //  Will be used to limit the query
            //
            final StringBuilder liveRconIds = new StringBuilder();
            for (ThreadIdentifier entry : mThreadCollection)
            {
                if (!entry.mThread.isAlive())
                {
                    continue;
                }

                if (liveRconIds.length() > 0) {
                    liveRconIds.append(",");
                }
                liveRconIds.append(entry.mThread.mRconId);
            }

            //  Nothing alive - nothing to ask about
            if (liveRconIds.length() == 0) {
                return;
            }

            //
            //  Locate slaves that have an outstanding build request and master that have forced poll requests
            //  Notify the threads of this condition
            ArrayList<Integer> activated = releaseManager.queryActivatedBuilds(mHostname, liveRconIds.toString());
            for (Integer activeRconId : activated) {
                mLogger.warn("Activate:{}", activeRconId);
                notifyBuildThread(activeRconId);
            }

        } catch (Exception e) {
            mLogger.error("lookForBuildRequests Exception: {}", e.getMessage());
            mPollTime = 30;
        }

    }

    /**
     * Start up the Nagios server
     * Opens a server socket on port 1111 and starts a NagiosThread to service it.
     * On failure the underlying IOException is logged, the socket (if it was opened)
     * is closed again and a BuildException is raised.
     *
     * @throws BuildException if an IOException prevents the interface from being started
     */
    private void startNagiosServer() throws BuildException {
        try {
            nagiosSrv = new ServerSocket(1111);
            nagiosChecker = new NagiosThread(nagiosSrv, this);
            nagiosChecker.start();
        } catch ( IOException e ) {
            // BuildException only carries a message - record the real cause here
            mLogger.error("startNagiosServer unable to start the Nagios interface", e);

            // Don't leave the port bound if the failure occurred after the socket was opened
            if ( nagiosSrv != null ) {
                try {
                    nagiosSrv.close();
                } catch ( IOException ignored ) {
                    // Best effort only; the original failure is the one being reported
                }
                nagiosSrv = null;
            }

            throw new BuildException("Nagios port in use");
        }
    }

    /**
     * Purge the thread collection of build threads that are no longer alive.
     * Each removal is logged with the thread's rtag id, rcon id and mode.
     */
    private void cleanupTerminatedThreads()
    {
        Iterator<ThreadIdentifier> cursor = mThreadCollection.iterator();
        while (cursor.hasNext())
        {
            final ThreadIdentifier entry = cursor.next();
            if (entry.getThread().isAlive())
            {
                // Still running - leave it in the collection
                continue;
            }

            final BuildThread finished = entry.getThread();
            mLogger.warn("BuildDaemon removal {} {} {}", finished.mRtagId, finished.mRconId, finished.getMode());
            cursor.remove();
        }
    }

    /**Determine if a live build thread exists for an rcon_id.
     * Calls isAlive on the Thread objects associated with the rcon_id, stopping at
     * the first one that is alive. Dead threads, and the absence of any thread for
     * the rcon_id, are logged as warnings.
     *
     * @param rcon_id - rcon_id to look for
     * @return true if a thread associated with the rcon_id is alive
     */
    public boolean isActive(final int rcon_id)
    {
        mLogger.debug("isActive {}", rcon_id);
        boolean alive = false;
        boolean seen = false;

        for (ThreadIdentifier candidate : mThreadCollection)
        {
            if (candidate.getRconId() != rcon_id)
            {
                continue;
            }

            seen = true;
            if (!candidate.getThread().isAlive())
            {
                // Keep looking, there may be a live thread further on
                mLogger.warn("isActive found dead thread {}", rcon_id );
                continue;
            }

            alive = true;
            break;
        }

        if ( !seen )
        {
            mLogger.warn("isActive thread not found {}", rcon_id);
        }

        mLogger.debug("isActive returned {}", alive);
        return alive;
    }

    /**
     *  Nagios interface
     *          Counts the build threads, and how many of them are alive
     *          Asks every thread to add its own status - for logging
     *          Records a reason when there are no build threads at all
     *
     * @param nagInfo - Status accumulator that is updated
     */
    void checkThreads( NagiosInfo nagInfo)
    {
        mLogger.info("checkThreads called");

        if (mThreadCollection.isEmpty())
        {
            nagInfo.extendedReason.add("No Build Threads configured");
            return;
        }

        for (ThreadIdentifier entry : mThreadCollection)
        {
            nagInfo.threadCount += 1;
            if ( entry.getThread().isAlive())
            {
                nagInfo.threadAliveCount += 1;
            }

            entry.getThread().checkThread(nagInfo);
        }
    }
    
    /**
     *  Nagios interface
     *      Collect extended information from all threads
     *      Every build thread in the collection is asked to add its details
     *
     * @param estatus - Map handed to each thread to be filled in
     */
    public void extendedStatus(Map<String, Object> estatus)
    {
        // Scratch accumulator, required by the per-thread call and discarded afterwards
        final NagiosInfo scratch = new NagiosInfo();

        for (ThreadIdentifier entry : mThreadCollection)
        {
            entry.getThread().extendedStatus(scratch, estatus);
        }
    }


    /**
     * daemonSleepSecs
     * Sleep for a specified number of seconds allowing for
     * a termination request
     *
     * The function will sleep for at most 5 seconds at a time,
     * checking for a termination request before each sleep and
     * once more before returning. If the thread is interrupted the
     * sleep is abandoned and the interrupt status is restored.
     *
     * @param sleepSecs The number of seconds to sleep. Zero or less: no sleep
     *
     * @return True :    Termination requested
     *
     *         False:    No Termination requested
     */
    public static boolean daemonSleepSecs(int sleepSecs)
    {
        int remainingSecs = sleepSecs;

        while (remainingSecs > 0 && !mShutDown)
        {
            final int sleepTime = Math.min(remainingSecs, 5);

            try
            {
                Thread.sleep(sleepTime * 1000L);
            }
            catch (InterruptedException e)
            {
                mLogger.warn("daemonSleepSecs sleep caught InterruptedException");
                Thread.currentThread().interrupt();
                break;
            }
            remainingSecs -= sleepTime;
        }

        // Check on every way out of the loop, so that a request made during the final
        // sleep, or together with an interrupt, is reported as promised
        if (mShutDown)
        {
            mLogger.error("daemonSleepSecs detected termination request");
            return true;
        }

        return false;
    }

    /**Terminates all BuildThreads.
     * Every live build thread is interrupted and then waited for. The Nagios thread,
     * if one was started, is asked to terminate. When a shutdown has been requested
     * the program then exits with status 6.
     */
    public void cleanUp()
    {
        mLogger.warn("cleanUp");

        for (ThreadIdentifier entry : mThreadCollection)
        {
            if (!entry.getThread().isAlive())
            {
                continue;
            }

            try
            {
                entry.getThread().interrupt();
                entry.getThread().join();
            }
            catch( InterruptedException e )
            {
                // Restore the interrupt status of this thread
                Thread.currentThread().interrupt();
            }
        }

        if ( nagiosChecker != null )
        {
            nagiosChecker.terminate();
        }

        //  If a shutdown has been requested, then exit the program
        //  The only thing left running should be a timer task

        if (mShutDown)
        {
            System.exit(6);
        }
    }
}

/**Handler for exceptions that are not caught by the thread that raised them.
 * Installed by BuildDaemon as the default uncaught exception handler. Reports the name
 * of the thread and the exception on standard output and through the logger.
 */
class BuildDaemonUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler
{
    private static final Logger mLogger = LoggerFactory.getLogger(BuildDaemonUncaughtExceptionHandler.class);

    /**Implements Thread.UncaughtExceptionHandler.uncaughtException()
     * @param th - The thread that is terminating
     * @param ex - The exception that was not caught
     */
    @Override
    public void uncaughtException(Thread th, Throwable ex)
    {
        System.out.println("You crashed thread " + th.getName());
        System.out.println("Exception was: " + ex.toString());

        mLogger.error("UncaughtException ThreadName: {}", th.getName());

        // The text fills the placeholder; the trailing Throwable is logged with its
        // stack trace. A lone Throwable argument does not reliably do both.
        mLogger.error("UncaughtException was: {}", ex.toString(), ex);
    }
}