Subversion Repositories DevTools

Rev

Blame | Last modification | View Log | RSS feed

//---------------------------------------------------------------------------

#pragma warn -com
#include <LoggingMacros.h>
#pragma warn +com

#include <vcl.h>
#pragma hdrstop

#include "UdTransferThread.h"
#include <algorithm>
#include <dateutils.hpp>
#include <Registry.hpp>

#include "IDeviceFactory.h"
#include "IDevice.h"
#include "DeviceSimulatorThread.h"

// File-local constants: registry location/value names holding the generic
// transfer settings, and the prefix used to mark files that have been sent.
// The pointers themselves are const so they cannot be re-seated by accident.
namespace
{
    // Registry key (opened under HKEY_CURRENT_USER by InitialiseTask)
    const char * const REG_BASE_KEY       =   "\\Software\\ERG\\TxnTestManager\\UD Transfer";

    // Value names read from REG_BASE_KEY
    const char * const REG_RENAME         =   "RenameSentFiles";
    const char * const REG_IGNORESENT     =   "IgnoreSentFiles";
    const char * const REG_MAXDEVICES     =   "MaxDeviceSimulators";
    const char * const REG_DEVICEID       =   "StartingDeviceId";
    const char * const REG_DEVICETYPE     =   "DeviceType";
    const char * const REG_CONNECTTIMEOUT =   "DeviceConnectTimeout";

    // Prefix prepended to the name of a file once it has been sent
    const char * const SENT_STRING        =   "sent_";
}

using namespace std;

#pragma package(smart_init)
//---------------------------------------------------------------------------

//   Important: Methods and properties of objects in VCL can only be
//   used in a method called using Synchronize, for example:
//
//      Synchronize(UpdateCaption);
//
//   where UpdateCaption could look like:
//
//      void __fastcall UdTransferThread::UpdateCaption()
//      {
//        Form1->Caption = "Updated in a thread";
//      }
//---------------------------------------------------------------------------

// Builds the worker thread for one usage data (UD) transfer task.
//
//   task          - copied into m_task
//   manager       - kept in m_manager; used throughout this file to report
//                   task updates and statistics
//   deviceFactory - kept in m_deviceFactory; used when device simulators are
//                   created
//
// The TThread base is constructed with 'false' -- presumably the
// CreateSuspended flag, i.e. the thread is not created suspended (confirm
// against the VCL version in use).
//
// The values given below to m_renameSent, m_ignoreSent, m_maxDevices,
// m_deviceId, m_connectTimeout and m_deviceType are only defaults;
// InitialiseTask() overwrites them with registry values when those exist.
//
// NOTE(review): m_stats and m_ramp are not initialised in this list, yet
// m_ramp.running and the m_stats counters are read/accumulated elsewhere in
// this file -- presumably their types initialise themselves; confirm.
__fastcall UdTransferThread::UdTransferThread(const UdTransferTask & task,
                                              UdTransferManager * manager,
                                              IDeviceFactory * deviceFactory ) :
    TThread( false ),
    m_task( task ),
    m_manager( manager ),
    m_deviceFactory( deviceFactory ),
    m_lastDevice( 0 ),
    m_running( false ),
    m_error( false ),
    m_simulators( false ),
    m_renameSent( false ),
    m_ignoreSent( false ),
    m_maxDevices( 100 ),
    m_deviceId( 1 ),
    m_connectTimeout( 5 ),
    m_deviceType("000SIM")

{
    // Run this thread above the default priority
    Priority = tpHigher;
}
//---------------------------------------------------------------------------

void UdTransferThread::SetName()
{
    THREADNAME_INFO info;
    info.dwType = 0x1000;
    info.szName = "UdTransferThread";
    info.dwThreadID = -1;
    info.dwFlags = 0;

    __try
    {
         RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD),(DWORD*)&info );
    }
    __except (EXCEPTION_CONTINUE_EXECUTION)
    {
    }
}
//---------------------------------------------------------------------------

bool UdTransferThread::InitialiseTask()
{
    if( DirectoryExists( m_task.sourcePath ) )
    {
        // Read generic transfer settings from the registry

        TRegistry * registry = new TRegistry;
        registry->RootKey = HKEY_CURRENT_USER;
        if( registry->OpenKey(REG_BASE_KEY, true) )
        {
            if( registry->ValueExists(REG_RENAME) )
            {
                m_renameSent = registry->ReadBool(REG_RENAME);
            }
            if( registry->ValueExists(REG_IGNORESENT) )
            {
                m_ignoreSent = registry->ReadBool(REG_IGNORESENT);
            }
            if( registry->ValueExists(REG_MAXDEVICES) )
            {
                m_maxDevices = registry->ReadInteger(REG_MAXDEVICES);
                if( m_maxDevices <= 0 ) m_maxDevices = 100;
            }
            if( registry->ValueExists(REG_DEVICEID) )
            {
                m_deviceId = registry->ReadInteger(REG_DEVICEID);
                if( m_deviceId <= 0 ) m_deviceId = 1;
            }
            if( registry->ValueExists(REG_DEVICETYPE) )
            {
                m_deviceType = registry->ReadString(REG_DEVICETYPE).SubString(1, 6);
                if( m_deviceType.Length() < 6 )
                {
                    m_deviceType.Insert( AnsiString::StringOfChar('0', 6 - m_deviceType.Length()), 1 );
                }
            }
            if( registry->ValueExists(REG_CONNECTTIMEOUT) )
            {
                m_connectTimeout = registry->ReadInteger(REG_CONNECTTIMEOUT);
                if( m_connectTimeout <= 0 ) m_connectTimeout = 5;
            }

            registry->CloseKey();
            delete registry;
            registry = NULL;
        }

        // Initialise the file list if required

        if( m_task.files.empty() )
        {
            UdFile file;        
            TSearchRec record;
            if( FindFirst( m_task.sourcePath + "\\*.devud", faReadOnly, record ) == 0 )
            {
                do
                {
                    if( (record.Attr & faDirectory) == 0 )
                    {
                        if( !(m_ignoreSent && (record.Name.Pos(SENT_STRING) == 1)) )
                        {
                            file.name = record.Name;
                            file.size = record.Size;
                            file.time = FileDateToDateTime( record.Time );

                            m_task.files.push_back( file );
                            m_stats.totalBytes += file.size;
                        }
                    }
                } while( FindNext( record ) == 0 );

                FindClose( record );
            }
        }

        // Sort the ud files by their creation time

        sort( m_task.files.begin(), m_task.files.end() );
        if( (m_task.batchSize == 0) || (m_task.batchSize > (int)m_task.files.size()) )
        {
            m_task.batchSize = m_task.files.size();
        }

        // Initialise ramp settings

        if( m_task.rampType != RampTestDisabled )
        {
            m_ramp.batchSize = m_task.batchSize;
            m_ramp.count     = 0;

            if( m_task.rampType == RampTestByRatio )
            {
                m_task.gradient = (double)m_task.files.size() / m_task.duration;
            }
        }

        m_stats.completed = false;
        m_stats.totalFiles = m_task.files.size();

        // Check if we require use of device simulators. This is the case if the destination
        // provided is not a path (no characters associated with paths).

        m_simulators = (m_task.destination.LastDelimiter("\\/:") == 0);
        if( m_simulators )
        {
            AnsiString msg;
            msg = "Connecting device simulators to " + m_task.destination + ".";
            m_manager->UpdateTask( m_task, msg );

            if( !ConnectDeviceSimulators() )
            {
                m_error = true;
                m_task.state = UdTransferFailed;
                msg = "Failed to connect device simulators to " + m_task.destination + ".";
                m_manager->UpdateTask( m_task, msg );
                return false;
            }
        }

        // Now ready to begin processing. Set the start/end time now as the previous
        // step may have consumed quite a bit of time.

        m_stats.startTime = TDateTime::CurrentDateTime();
        m_stats.endTime = m_stats.startTime + (double)m_task.duration /(24*60*60);
                
        m_task.state = UdTransferInProgress;
        m_manager->UpdateTask( m_task, "Processing started at " + m_stats.startTime + "." );

        return (m_task.batchSize > 0);
    }

    return false;
}
//---------------------------------------------------------------------------

// Thread body. Initialises the task, then transfers batches of files until
// m_running is cleared, an error is flagged, all files have been sent, or
// the end of the upload period (m_stats.endTime) is reached. Progress and
// the final outcome are reported through m_manager. Whatever the outcome,
// the __finally section stops and deletes the device simulator threads and,
// if configured, renames the files that were processed.
//
// NOTE(review): m_running is only set/cleared by this method within the code
// shown here; presumably another method clears it to request a stop -- confirm.
void __fastcall UdTransferThread::Execute()
{
    SetName();

    m_running = true;

    try
    {
        try
        {
            bool complete = false;

            if( InitialiseTask() )
            {
                // Start transferring files, repeat if a recurring task
                do
                {
                    while( m_running && !complete && !m_error )
                    {
                        // Timestamp taken before the batch, so the frequency
                        // period below is measured from the batch start
                        TDateTime lastBatch = TDateTime::CurrentDateTime();
                        complete = TransferNextBatch();

                        // Check if we have reached the end of the upload period

                        if( TDateTime::CurrentDateTime() >= m_stats.endTime )
                        {
                            complete = true;
                            m_running = false;
                        }
                        else if( m_running && !complete && !m_error )
                        {
                            // Sleep until the next frequency period. Polls in
                            // 500 ms steps so a cleared m_running is noticed
                            // without waiting out the whole period.

                            while( m_running &&
                                   SecondsBetween( TDateTime::CurrentDateTime(), lastBatch ) < m_task.frequency )
                            {
                                Sleep( 500 );
                            }

                            // Adjust the size of the next batch if ramping
                            RampTransferBatch();
                        }

                        // Check whether the device simulator threads are still happy

                        if( m_running && !m_error && m_simulators )
                        {
                            CheckDeviceSimulatorThreads();
                        }
                    }

                } while( m_task.recurring && m_running && !m_error );
            }
            else if( !m_error )
            {
                // InitialiseTask() failed without reporting a reason itself
                m_error = true;
                m_task.state = UdTransferFailed;
                m_manager->UpdateTask( m_task, "Failed to initialise usage data transfer task." );
            }

            // Update task state if we are successful

            if( !m_error && complete )
            {
                m_task.state = UdTransferCompleted;
                AnsiString msg = "Successfully transferred " + IntToStr( m_stats.processedFiles ) +
                    " files (" +
                    AnsiString::FloatToStrF((long double)m_stats.processedBytes, AnsiString::sffNumber, 18, 0) +
                    " bytes).";
                m_manager->UpdateTask( m_task, msg );
            }

            // Update task statistics

            m_manager->UpdateTaskStatistics( m_task.id, m_stats );
        }
        catch( ... )
        {
            // Any exception from the processing above marks the task as
            // failed; statistics gathered so far are still reported
            m_task.state = UdTransferFailed;
            m_manager->UpdateTask( m_task, "A fatal exception occurred during UD transfer processing." );
            m_manager->UpdateTaskStatistics( m_task.id, m_stats );
        }
    }
    __finally
    {
        m_running = false;
        m_stats.completed = true;

        // Shutdown device simulators. This is much quicker if we quickly run through
        // each and stop them first.

        vector<DeviceSimulatorThread *>::iterator device = m_devices.begin();
        for( ; device != m_devices.end(); device++ )
        {
            (*device)->Stop();
        }
        // Second pass: wait for each stopped thread to finish, then free it
        for( device = m_devices.begin(); device != m_devices.end(); device++ )
        {
            (*device)->WaitFor();
            delete (*device);
            (*device) = 0;
        }
        m_devices.clear();

        // Rename files that have been sent if required. Don't care if renames fail.

        if( m_renameSent )
        {
            int count = 0;
            AnsiString oldFile, newFile;
            UdFileItem file = m_task.files.begin();

            // Walk the first m_stats.processedFiles entries of the file list,
            // stopping at the end of the list if the counter is larger

            while( (++count <= m_stats.processedFiles) &&
                   (file != m_task.files.end()) )
            {
                // Don't prepend SENT_STRING twice

                if( (*file).name.Pos(SENT_STRING) != 1 )
                {
                    oldFile = IncludeTrailingPathDelimiter(m_task.sourcePath) + (*file).name;
                    newFile = IncludeTrailingPathDelimiter(m_task.sourcePath) +
                        AnsiString(SENT_STRING) + (*file).name;

                    // Return value deliberately ignored (best effort)
                    MoveFile( oldFile.c_str(), newFile.c_str() );
                }
                file++;
            }
        }
    }
}
//---------------------------------------------------------------------------

// Recalculates the ramped batch size used for the next transfer. Does
// nothing when ramping is disabled for the task, or while the ramp start
// time (m_task.rampTime) has not yet been reached.
void UdTransferThread::RampTransferBatch()
{
    if( m_task.rampType == RampTestDisabled )
    {
        return;
    }

    // Switch ramping on once the ramp start time has passed; once on it
    // stays on

    if( !m_ramp.running )
    {
        if( TDateTime::CurrentDateTime() >= m_task.rampTime )
        {
            m_ramp.running = true;
        }
        else
        {
            m_ramp.running = false;
        }
    }

    if( !m_ramp.running )
    {
        return;
    }

    // Linear ramp, y = mx + c:
    //  y = batch size for the next transfer
    //  m = m_task.gradient
    //  x = number of ramped batches so far (incremented here)
    //  c = the task's base batch size

    ++m_ramp.count;
    m_ramp.batchSize = m_task.gradient * m_ramp.count + m_task.batchSize;
}
//---------------------------------------------------------------------------

bool UdTransferThread::TransferNextBatch()
{
    // Find the next file to start a batch on. Account for recurring tasks.

    UdFileItem file = m_task.files.begin();
    advance( file, m_task.recurring
                        ? (m_stats.processedFiles % m_stats.totalFiles)
                        : m_stats.processedFiles );

    AnsiString sourceFile;
    int batchSize = GetBatchSize();
    int count = 0;
    int transferSize = 0;

    // Transfer files until we stop running, we reach the end of all transferrable
    // files, or the batch transfer size is reached

    while( m_running &&
          (file != m_task.files.end()) &&
          (count < batchSize) )
    {
        sourceFile = IncludeTrailingPathDelimiter(m_task.sourcePath) + (*file).name;

        if( TransferUdFile( sourceFile ) )
        {
            m_stats.processedBytes  += (*file).size;
            transferSize            += (*file).size;

            ++m_stats.processedFiles;
            ++file;
            ++count;
        }
        else
        {
            // Abort further processing

            m_error = true;
            m_task.state = UdTransferFailed;
            AnsiString msg = "Failed to transfer file " + (*file).name;
            m_manager->UpdateTask( m_task, msg );
            break;
        }

        // Roll over to the start if this is a recurring transfer

        if( m_task.recurring &&
            (file == m_task.files.end()) )
        {
            file = m_task.files.begin();
        }
    }

    if( !m_error ) m_stats.batches.push_back( transferSize );

    // Return true if there are no more files to transfer

    return (file == m_task.files.end());
}
//---------------------------------------------------------------------------

bool UdTransferThread::TransferUdFile( const AnsiString & filename )
{
    bool transferred = true;

    if( m_simulators )
    {
        // Get the next device simulator to send the UD file

        vector<DeviceSimulatorThread *>::iterator device = m_devices.begin();
        advance( device, ++m_lastDevice );
        if( device == m_devices.end() )
        {
            m_lastDevice = 0;
            device = m_devices.begin();
        }

        // Can't check the state of the transfer as the transfer is handled
        // in another thread. After a batch is sent we check with all the device
        // simulator threads for potential transfer problems

        if( (*device) != NULL )
        {
            (*device)->UploadUDFile( filename );
        }
    }
    else
    {
        // Simply copy the file to the destination directory

        AnsiString destFile = IncludeTrailingPathDelimiter(m_task.destination) +
            ExtractFileName(filename);
        transferred = (CopyFile( filename.c_str(), destFile.c_str(), false ) != 0);
    }

    return transferred;
}
//---------------------------------------------------------------------------

// Returns the number of files to send in the next batch: the ramped batch
// size while ramping is running, otherwise the task's own batch size.
int  UdTransferThread::GetBatchSize()
{
    if( m_ramp.running )
    {
        return m_ramp.batchSize;
    }

    return m_task.batchSize;
}
//---------------------------------------------------------------------------

// Copies the current transfer statistics held by this thread into 'stats'.
// Only the fields listed below are written.
void __fastcall UdTransferThread::GetTransferStatistics( UdTransferStatistics & stats )
{
    const UdTransferStatistics & current = m_stats;

    // State and timing
    stats.completed      = current.completed;
    stats.startTime      = current.startTime;
    stats.endTime        = current.endTime;

    // File counters
    stats.processedFiles = current.processedFiles;
    stats.totalFiles     = current.totalFiles;

    // Byte counters
    stats.processedBytes = current.processedBytes;
    stats.totalBytes     = current.totalBytes;

    // Per-batch history
    stats.batches        = current.batches;
}
//---------------------------------------------------------------------------

// Creates the device simulator threads for this task and waits for each one
// to establish its first connection to the destination.
//
// The number of simulators is the smaller of m_maxDevices and the task's
// batch size. Device ids are assigned consecutively from m_deviceId. Every
// thread created is stored in m_devices (also when it fails to connect), so
// the clean-up in Execute() releases it.
//
// Returns false as soon as one simulator has not connected within
// m_connectTimeout seconds; true otherwise (including when creation was cut
// short because m_running is false).
bool UdTransferThread::ConnectDeviceSimulators()
{
    bool connected = true;

    int number = m_maxDevices;
    if( m_task.batchSize < number )
    {
        number = m_task.batchSize;
    }

    // Create and start simulator threads

    for( int i = 0; i < number; i++ )
    {
        // Device id 0 is not used; substitute 1
        unsigned long deviceId = m_deviceId + i;
        if( deviceId == 0 ) deviceId = 1;

        // Pass the character data (not the AnsiString object) to the
        // variadic sprintf, and format the unsigned long id with %lu.
        // NOTE(review): the literal contains question-mark runs followed by
        // '-', which a compiler with trigraph translation enabled would
        // alter -- confirm the build does not translate trigraphs.
        AnsiString installationId;
        installationId.sprintf( "??????????-???-%s-%08lu", m_deviceType.c_str(), deviceId );

        IDevice * device = &m_deviceFactory->createMASSDevice(
            m_deviceType.c_str(), deviceId, installationId.c_str() );
        if( device != NULL )
        {
            // Simulator thread object now owns the device

            DeviceSimulatorThread * thread =
                new DeviceSimulatorThread( device, m_deviceFactory, m_task.destination );
            if( thread != NULL )
            {
                m_devices.push_back( thread );

                // Wait for the first connection to confirm that the destination host is
                // running a site computer, and that the device configuration is correct.
                // Subsequent disconnections (if any) will be handled by the device thread.
                // Polls twice a second for m_connectTimeout seconds.

                int attempts = m_connectTimeout * 2;

                while( (attempts > 0) && !thread->IsConnected() )
                {
                    --attempts;
                    Sleep( 500 );
                }

                // Decide on the connection state itself rather than on the
                // counter value, so a timeout is always detected and a
                // connection made on the last poll is not treated as failure
                if( !thread->IsConnected() )
                {
                    connected = false;
                    break;
                }

                // Need a slight delay to confirm that the device has connected all channels
                // (CMD/UD/CD) -- if not, we may overlap channels between devices and this can
                // take a long time to sort itself out (lots of device disconnections).

                Sleep( 500 );
            }
        }
        if( !m_running ) break;
    }

    return connected;
}
//---------------------------------------------------------------------------

void UdTransferThread::CheckDeviceSimulatorThreads()
{
    AnsiString error = "";

    // We are expecting all threads to be running here, so a terminal error
    // has occurred if this is not the case

    vector<DeviceSimulatorThread *>::iterator device = m_devices.begin();
    for( ; device != m_devices.end(); device++ )
    {
        if( ((*device) != NULL) &&
            !(*device)->IsRunning() )
        {
            error = (*device)->GetErrorMessage();
            if( !error.IsEmpty() )
            {
                break;
            }
        }
    }

    // Abort further processing

    if( !error.IsEmpty() )
    {
        m_error = true;
        m_task.state = UdTransferFailed;
        m_manager->UpdateTask( m_task, error );
    }
}