Blame | Last modification | View Log | RSS feed
//---------------------------------------------------------------------------#pragma warn -com#include <LoggingMacros.h>#pragma warn +com#include <vcl.h>#pragma hdrstop#include "UdTransferThread.h"#include <algorithm>#include <dateutils.hpp>#include <Registry.hpp>#include "IDeviceFactory.h"#include "IDevice.h"#include "DeviceSimulatorThread.h"namespace{const char * REG_BASE_KEY = "\\Software\\ERG\\TxnTestManager\\UD Transfer";const char * REG_RENAME = "RenameSentFiles";const char * REG_IGNORESENT = "IgnoreSentFiles";const char * REG_MAXDEVICES = "MaxDeviceSimulators";const char * REG_DEVICEID = "StartingDeviceId";const char * REG_DEVICETYPE = "DeviceType";const char * REG_CONNECTTIMEOUT = "DeviceConnectTimeout";const char * SENT_STRING = "sent_";}using namespace std;#pragma package(smart_init)//---------------------------------------------------------------------------// Important: Methods and properties of objects in VCL can only be// used in a method called using Synchronize, for example://// Synchronize(UpdateCaption);//// where UpdateCaption could look like://// void __fastcall UdTransferThread::UpdateCaption()// {// Form1->Caption = "Updated in a thread";// }//---------------------------------------------------------------------------__fastcall UdTransferThread::UdTransferThread(const UdTransferTask & task,UdTransferManager * manager,IDeviceFactory * deviceFactory ) :TThread( false ),m_task( task ),m_manager( manager ),m_deviceFactory( deviceFactory ),m_lastDevice( 0 ),m_running( false ),m_error( false ),m_simulators( false ),m_renameSent( false ),m_ignoreSent( false ),m_maxDevices( 100 ),m_deviceId( 1 ),m_connectTimeout( 5 ),m_deviceType("000SIM"){Priority = tpHigher;}//---------------------------------------------------------------------------void UdTransferThread::SetName(){THREADNAME_INFO info;info.dwType = 0x1000;info.szName = "UdTransferThread";info.dwThreadID = -1;info.dwFlags = 0;__try{RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD),(DWORD*)&info );}__except (EXCEPTION_CONTINUE_EXECUTION){}}//---------------------------------------------------------------------------bool UdTransferThread::InitialiseTask(){if( DirectoryExists( m_task.sourcePath ) ){// Read generic transfer settings from the registryTRegistry * registry = new TRegistry;registry->RootKey = HKEY_CURRENT_USER;if( registry->OpenKey(REG_BASE_KEY, true) ){if( registry->ValueExists(REG_RENAME) ){m_renameSent = registry->ReadBool(REG_RENAME);}if( registry->ValueExists(REG_IGNORESENT) ){m_ignoreSent = registry->ReadBool(REG_IGNORESENT);}if( registry->ValueExists(REG_MAXDEVICES) ){m_maxDevices = registry->ReadInteger(REG_MAXDEVICES);if( m_maxDevices <= 0 ) m_maxDevices = 100;}if( registry->ValueExists(REG_DEVICEID) ){m_deviceId = registry->ReadInteger(REG_DEVICEID);if( m_deviceId <= 0 ) m_deviceId = 1;}if( registry->ValueExists(REG_DEVICETYPE) ){m_deviceType = registry->ReadString(REG_DEVICETYPE).SubString(1, 6);if( m_deviceType.Length() < 6 ){m_deviceType.Insert( AnsiString::StringOfChar('0', 6 - m_deviceType.Length()), 1 );}}if( registry->ValueExists(REG_CONNECTTIMEOUT) ){m_connectTimeout = registry->ReadInteger(REG_CONNECTTIMEOUT);if( m_connectTimeout <= 0 ) m_connectTimeout = 5;}registry->CloseKey();delete registry;registry = NULL;}// Initialise the file list if requiredif( m_task.files.empty() ){UdFile file;TSearchRec record;if( FindFirst( m_task.sourcePath + "\\*.devud", faReadOnly, record ) == 0 ){do{if( (record.Attr & faDirectory) == 0 ){if( !(m_ignoreSent && (record.Name.Pos(SENT_STRING) == 1)) ){file.name = record.Name;file.size = record.Size;file.time = FileDateToDateTime( record.Time );m_task.files.push_back( file );m_stats.totalBytes += file.size;}}} while( FindNext( record ) == 0 );FindClose( record );}}// Sort the ud files by their creation timesort( m_task.files.begin(), m_task.files.end() );if( (m_task.batchSize == 0) || (m_task.batchSize > (int)m_task.files.size()) ){m_task.batchSize = m_task.files.size();}// Initialise ramp settingsif( m_task.rampType != RampTestDisabled ){m_ramp.batchSize = m_task.batchSize;m_ramp.count = 0;if( m_task.rampType == RampTestByRatio ){m_task.gradient = (double)m_task.files.size() / m_task.duration;}}m_stats.completed = false;m_stats.totalFiles = m_task.files.size();// Check if we require use of device simulators. This is the case if the destination// provided is not a path (no characters associated with paths).m_simulators = (m_task.destination.LastDelimiter("\\/:") == 0);if( m_simulators ){AnsiString msg;msg = "Connecting device simulators to " + m_task.destination + ".";m_manager->UpdateTask( m_task, msg );if( !ConnectDeviceSimulators() ){m_error = true;m_task.state = UdTransferFailed;msg = "Failed to connect device simulators to " + m_task.destination + ".";m_manager->UpdateTask( m_task, msg );return false;}}// Now ready to begin processing. Set the start/end time now as the previous// step may have consumed quite a bit of time.m_stats.startTime = TDateTime::CurrentDateTime();m_stats.endTime = m_stats.startTime + (double)m_task.duration /(24*60*60);m_task.state = UdTransferInProgress;m_manager->UpdateTask( m_task, "Processing started at " + m_stats.startTime + "." );return (m_task.batchSize > 0);}return false;}//---------------------------------------------------------------------------void __fastcall UdTransferThread::Execute(){SetName();m_running = true;try{try{bool complete = false;if( InitialiseTask() ){// Start transferring files, repeat if a recurring taskdo{while( m_running && !complete && !m_error ){TDateTime lastBatch = TDateTime::CurrentDateTime();complete = TransferNextBatch();// Check if we have reached the end of the upload periodif( TDateTime::CurrentDateTime() >= m_stats.endTime ){complete = true;m_running = false;}else if( m_running && !complete && !m_error ){// Sleep until the next frequency periodwhile( m_running &&SecondsBetween( TDateTime::CurrentDateTime(), lastBatch ) < m_task.frequency ){Sleep( 500 );}RampTransferBatch();}// Check whether the device simulator threads are still happyif( m_running && !m_error && m_simulators ){CheckDeviceSimulatorThreads();}}} while( m_task.recurring && m_running && !m_error );}else if( !m_error ){m_error = true;m_task.state = UdTransferFailed;m_manager->UpdateTask( m_task, "Failed to initialise usage data transfer task." );}// Update task state if we are successfulif( !m_error && complete ){m_task.state = UdTransferCompleted;AnsiString msg = "Successfully transferred " + IntToStr( m_stats.processedFiles ) +" files (" +AnsiString::FloatToStrF((long double)m_stats.processedBytes, AnsiString::sffNumber, 18, 0) +" bytes).";m_manager->UpdateTask( m_task, msg );}// Update task statisticsm_manager->UpdateTaskStatistics( m_task.id, m_stats );}catch( ... ){m_task.state = UdTransferFailed;m_manager->UpdateTask( m_task, "A fatal exception occurred during UD transfer processing." );m_manager->UpdateTaskStatistics( m_task.id, m_stats );}}__finally{m_running = false;m_stats.completed = true;// Shutdown device simulators. This is much quicker if we quickly run through// each and stop them first.vector<DeviceSimulatorThread *>::iterator device = m_devices.begin();for( ; device != m_devices.end(); device++ ){(*device)->Stop();}for( device = m_devices.begin(); device != m_devices.end(); device++ ){(*device)->WaitFor();delete (*device);(*device) = 0;}m_devices.clear();// Rename files that have been sent if required. Don't care if renames fail.if( m_renameSent ){int count = 0;AnsiString oldFile, newFile;UdFileItem file = m_task.files.begin();while( (++count <= m_stats.processedFiles) &&(file != m_task.files.end()) ){// Don't prepend SEND_STRING twiceif( (*file).name.Pos(SENT_STRING) != 1 ){oldFile = IncludeTrailingPathDelimiter(m_task.sourcePath) + (*file).name;newFile = IncludeTrailingPathDelimiter(m_task.sourcePath) +AnsiString(SENT_STRING) + (*file).name;MoveFile( oldFile.c_str(), newFile.c_str() );}file++;}}}}//---------------------------------------------------------------------------void UdTransferThread::RampTransferBatch(){if( m_task.rampType != RampTestDisabled ){// Check if it is time to start ramping batch sizesif( !m_ramp.running ){m_ramp.running = (TDateTime::CurrentDateTime() >= m_task.rampTime);}// Calculate the new batch size for the next transferif( m_ramp.running ){// Calculation is linear function, y = mx + c where:// y = Next batch size// m = Ramp gradient// x = Ramped Upload batch count// c = Batch size for uploadm_ramp.batchSize = m_task.gradient * (++m_ramp.count) + m_task.batchSize;}}}//---------------------------------------------------------------------------bool UdTransferThread::TransferNextBatch(){// Find the next file to start a batch on. Account for recurring tasks.UdFileItem file = m_task.files.begin();advance( file, m_task.recurring? (m_stats.processedFiles % m_stats.totalFiles): m_stats.processedFiles );AnsiString sourceFile;int batchSize = GetBatchSize();int count = 0;int transferSize = 0;// Transfer files until we stop running, we reach the end of all transferrable// files, or the batch transfer size is reachedwhile( m_running &&(file != m_task.files.end()) &&(count < batchSize) ){sourceFile = IncludeTrailingPathDelimiter(m_task.sourcePath) + (*file).name;if( TransferUdFile( sourceFile ) ){m_stats.processedBytes += (*file).size;transferSize += (*file).size;++m_stats.processedFiles;++file;++count;}else{// Abort further processingm_error = true;m_task.state = UdTransferFailed;AnsiString msg = "Failed to transfer file " + (*file).name;m_manager->UpdateTask( m_task, msg );break;}// Roll over to the start if this is a recurring transferif( m_task.recurring &&(file == m_task.files.end()) ){file = m_task.files.begin();}}if( !m_error ) m_stats.batches.push_back( transferSize );// Return true if there are no more files to transferreturn (file == m_task.files.end());}//---------------------------------------------------------------------------bool UdTransferThread::TransferUdFile( const AnsiString & filename ){bool transferred = true;if( m_simulators ){// Get the next device simulator to send the UD filevector<DeviceSimulatorThread *>::iterator device = m_devices.begin();advance( device, ++m_lastDevice );if( device == m_devices.end() ){m_lastDevice = 0;device = m_devices.begin();}// Can't check the state of the transfer as the transfer is handled// in another thread. After a batch is sent we check with all the device// simulator threads for potential transfer problemsif( (*device) != NULL ){(*device)->UploadUDFile( filename );}}else{// Simply copy the file to the destination directoryAnsiString destFile = IncludeTrailingPathDelimiter(m_task.destination) +ExtractFileName(filename);transferred = (CopyFile( filename.c_str(), destFile.c_str(), false ) != 0);}return transferred;}//---------------------------------------------------------------------------int UdTransferThread::GetBatchSize(){return m_ramp.running ? m_ramp.batchSize : m_task.batchSize;}//---------------------------------------------------------------------------void __fastcall UdTransferThread::GetTransferStatistics( UdTransferStatistics & stats ){stats.completed = m_stats.completed;stats.startTime = m_stats.startTime;stats.endTime = m_stats.endTime;stats.processedFiles = m_stats.processedFiles;stats.totalFiles = m_stats.totalFiles;stats.processedBytes = m_stats.processedBytes;stats.totalBytes = m_stats.totalBytes;stats.batches = m_stats.batches;}//---------------------------------------------------------------------------bool UdTransferThread::ConnectDeviceSimulators(){bool connected = true;int number = m_maxDevices;if( m_task.batchSize < number ){number = m_task.batchSize;}// Create and start simulator threadsfor( int i = 0; i < number; i++ ){unsigned long deviceId = m_deviceId + i;if( deviceId <= 0 ) deviceId = 1;AnsiString installationId;installationId.sprintf( "??????????-???-%s-%08d", m_deviceType, deviceId );IDevice * device = &m_deviceFactory->createMASSDevice(m_deviceType.c_str(), deviceId, installationId.c_str() );if( device != NULL ){// Simulator thread object now owns the deviceDeviceSimulatorThread * thread =new DeviceSimulatorThread( device, m_deviceFactory, m_task.destination );if( thread != NULL ){m_devices.push_back( thread );// Wait for the first connection to confirm that the destination host is// running a site computer, and that the device configuration is correct.// Subsequent disconnections (if any) will be handled by the device thread.int count = m_connectTimeout * 2;while( (count >= 0) && !thread->IsConnected() ){count--;Sleep( 500 );}if( count == 0 ){connected = false;break;}// Need a slight delay to confirm that the device has connected all channels// (CMD/UD/CD) -- if not, we may overlap channels between devices and this can// take a long time to sort itself out (lots of device disconnections).Sleep( 500 );}}if( !m_running ) break;}return connected;}//---------------------------------------------------------------------------void UdTransferThread::CheckDeviceSimulatorThreads(){AnsiString error = "";// We are expecting all threads to be running here, so a terminal error// has occurred if this is not the casevector<DeviceSimulatorThread *>::iterator device = m_devices.begin();for( ; device != m_devices.end(); device++ ){if( ((*device) != NULL) &&!(*device)->IsRunning() ){error = (*device)->GetErrorMessage();if( !error.IsEmpty() ){break;}}}// Abort further processingif( !error.IsEmpty() ){m_error = true;m_task.state = UdTransferFailed;m_manager->UpdateTask( m_task, error );}}