//--------------------------------------------------------------------------- #pragma warn -com #include #pragma warn +com #include #pragma hdrstop #include "UdTransferThread.h" #include #include #include #include "IDeviceFactory.h" #include "IDevice.h" #include "DeviceSimulatorThread.h" namespace { const char * REG_BASE_KEY = "\\Software\\ERG\\TxnTestManager\\UD Transfer"; const char * REG_RENAME = "RenameSentFiles"; const char * REG_IGNORESENT = "IgnoreSentFiles"; const char * REG_MAXDEVICES = "MaxDeviceSimulators"; const char * REG_DEVICEID = "StartingDeviceId"; const char * REG_DEVICETYPE = "DeviceType"; const char * REG_CONNECTTIMEOUT = "DeviceConnectTimeout"; const char * SENT_STRING = "sent_"; } using namespace std; #pragma package(smart_init) //--------------------------------------------------------------------------- // Important: Methods and properties of objects in VCL can only be // used in a method called using Synchronize, for example: // // Synchronize(UpdateCaption); // // where UpdateCaption could look like: // // void __fastcall UdTransferThread::UpdateCaption() // { // Form1->Caption = "Updated in a thread"; // } //--------------------------------------------------------------------------- __fastcall UdTransferThread::UdTransferThread(const UdTransferTask & task, UdTransferManager * manager, IDeviceFactory * deviceFactory ) : TThread( false ), m_task( task ), m_manager( manager ), m_deviceFactory( deviceFactory ), m_lastDevice( 0 ), m_running( false ), m_error( false ), m_simulators( false ), m_renameSent( false ), m_ignoreSent( false ), m_maxDevices( 100 ), m_deviceId( 1 ), m_connectTimeout( 5 ), m_deviceType("000SIM") { Priority = tpHigher; } //--------------------------------------------------------------------------- void UdTransferThread::SetName() { THREADNAME_INFO info; info.dwType = 0x1000; info.szName = "UdTransferThread"; info.dwThreadID = -1; info.dwFlags = 0; __try { RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD),(DWORD*)&info ); } __except (EXCEPTION_CONTINUE_EXECUTION) { } } //--------------------------------------------------------------------------- bool UdTransferThread::InitialiseTask() { if( DirectoryExists( m_task.sourcePath ) ) { // Read generic transfer settings from the registry TRegistry * registry = new TRegistry; registry->RootKey = HKEY_CURRENT_USER; if( registry->OpenKey(REG_BASE_KEY, true) ) { if( registry->ValueExists(REG_RENAME) ) { m_renameSent = registry->ReadBool(REG_RENAME); } if( registry->ValueExists(REG_IGNORESENT) ) { m_ignoreSent = registry->ReadBool(REG_IGNORESENT); } if( registry->ValueExists(REG_MAXDEVICES) ) { m_maxDevices = registry->ReadInteger(REG_MAXDEVICES); if( m_maxDevices <= 0 ) m_maxDevices = 100; } if( registry->ValueExists(REG_DEVICEID) ) { m_deviceId = registry->ReadInteger(REG_DEVICEID); if( m_deviceId <= 0 ) m_deviceId = 1; } if( registry->ValueExists(REG_DEVICETYPE) ) { m_deviceType = registry->ReadString(REG_DEVICETYPE).SubString(1, 6); if( m_deviceType.Length() < 6 ) { m_deviceType.Insert( AnsiString::StringOfChar('0', 6 - m_deviceType.Length()), 1 ); } } if( registry->ValueExists(REG_CONNECTTIMEOUT) ) { m_connectTimeout = registry->ReadInteger(REG_CONNECTTIMEOUT); if( m_connectTimeout <= 0 ) m_connectTimeout = 5; } registry->CloseKey(); delete registry; registry = NULL; } // Initialise the file list if required if( m_task.files.empty() ) { UdFile file; TSearchRec record; if( FindFirst( m_task.sourcePath + "\\*.devud", faReadOnly, record ) == 0 ) { do { if( (record.Attr & faDirectory) == 0 ) { if( !(m_ignoreSent && (record.Name.Pos(SENT_STRING) == 1)) ) { file.name = record.Name; file.size = record.Size; file.time = FileDateToDateTime( record.Time ); m_task.files.push_back( file ); m_stats.totalBytes += file.size; } } } while( FindNext( record ) == 0 ); FindClose( record ); } } // Sort the ud files by their creation time sort( m_task.files.begin(), m_task.files.end() ); if( (m_task.batchSize == 0) || (m_task.batchSize > (int)m_task.files.size()) ) { m_task.batchSize = m_task.files.size(); } // Initialise ramp settings if( m_task.rampType != RampTestDisabled ) { m_ramp.batchSize = m_task.batchSize; m_ramp.count = 0; if( m_task.rampType == RampTestByRatio ) { m_task.gradient = (double)m_task.files.size() / m_task.duration; } } m_stats.completed = false; m_stats.totalFiles = m_task.files.size(); // Check if we require use of device simulators. This is the case if the destination // provided is not a path (no characters associated with paths). m_simulators = (m_task.destination.LastDelimiter("\\/:") == 0); if( m_simulators ) { AnsiString msg; msg = "Connecting device simulators to " + m_task.destination + "."; m_manager->UpdateTask( m_task, msg ); if( !ConnectDeviceSimulators() ) { m_error = true; m_task.state = UdTransferFailed; msg = "Failed to connect device simulators to " + m_task.destination + "."; m_manager->UpdateTask( m_task, msg ); return false; } } // Now ready to begin processing. Set the start/end time now as the previous // step may have consumed quite a bit of time. m_stats.startTime = TDateTime::CurrentDateTime(); m_stats.endTime = m_stats.startTime + (double)m_task.duration /(24*60*60); m_task.state = UdTransferInProgress; m_manager->UpdateTask( m_task, "Processing started at " + m_stats.startTime + "." ); return (m_task.batchSize > 0); } return false; } //--------------------------------------------------------------------------- void __fastcall UdTransferThread::Execute() { SetName(); m_running = true; try { try { bool complete = false; if( InitialiseTask() ) { // Start transferring files, repeat if a recurring task do { while( m_running && !complete && !m_error ) { TDateTime lastBatch = TDateTime::CurrentDateTime(); complete = TransferNextBatch(); // Check if we have reached the end of the upload period if( TDateTime::CurrentDateTime() >= m_stats.endTime ) { complete = true; m_running = false; } else if( m_running && !complete && !m_error ) { // Sleep until the next frequency period while( m_running && SecondsBetween( TDateTime::CurrentDateTime(), lastBatch ) < m_task.frequency ) { Sleep( 500 ); } RampTransferBatch(); } // Check whether the device simulator threads are still happy if( m_running && !m_error && m_simulators ) { CheckDeviceSimulatorThreads(); } } } while( m_task.recurring && m_running && !m_error ); } else if( !m_error ) { m_error = true; m_task.state = UdTransferFailed; m_manager->UpdateTask( m_task, "Failed to initialise usage data transfer task." ); } // Update task state if we are successful if( !m_error && complete ) { m_task.state = UdTransferCompleted; AnsiString msg = "Successfully transferred " + IntToStr( m_stats.processedFiles ) + " files (" + AnsiString::FloatToStrF((long double)m_stats.processedBytes, AnsiString::sffNumber, 18, 0) + " bytes)."; m_manager->UpdateTask( m_task, msg ); } // Update task statistics m_manager->UpdateTaskStatistics( m_task.id, m_stats ); } catch( ... ) { m_task.state = UdTransferFailed; m_manager->UpdateTask( m_task, "A fatal exception occurred during UD transfer processing." ); m_manager->UpdateTaskStatistics( m_task.id, m_stats ); } } __finally { m_running = false; m_stats.completed = true; // Shutdown device simulators. This is much quicker if we quickly run through // each and stop them first. vector::iterator device = m_devices.begin(); for( ; device != m_devices.end(); device++ ) { (*device)->Stop(); } for( device = m_devices.begin(); device != m_devices.end(); device++ ) { (*device)->WaitFor(); delete (*device); (*device) = 0; } m_devices.clear(); // Rename files that have been sent if required. Don't care if renames fail. if( m_renameSent ) { int count = 0; AnsiString oldFile, newFile; UdFileItem file = m_task.files.begin(); while( (++count <= m_stats.processedFiles) && (file != m_task.files.end()) ) { // Don't prepend SEND_STRING twice if( (*file).name.Pos(SENT_STRING) != 1 ) { oldFile = IncludeTrailingPathDelimiter(m_task.sourcePath) + (*file).name; newFile = IncludeTrailingPathDelimiter(m_task.sourcePath) + AnsiString(SENT_STRING) + (*file).name; MoveFile( oldFile.c_str(), newFile.c_str() ); } file++; } } } } //--------------------------------------------------------------------------- void UdTransferThread::RampTransferBatch() { if( m_task.rampType != RampTestDisabled ) { // Check if it is time to start ramping batch sizes if( !m_ramp.running ) { m_ramp.running = (TDateTime::CurrentDateTime() >= m_task.rampTime); } // Calculate the new batch size for the next transfer if( m_ramp.running ) { // Calculation is linear function, y = mx + c where: // y = Next batch size // m = Ramp gradient // x = Ramped Upload batch count // c = Batch size for upload m_ramp.batchSize = m_task.gradient * (++m_ramp.count) + m_task.batchSize; } } } //--------------------------------------------------------------------------- bool UdTransferThread::TransferNextBatch() { // Find the next file to start a batch on. Account for recurring tasks. UdFileItem file = m_task.files.begin(); advance( file, m_task.recurring ? (m_stats.processedFiles % m_stats.totalFiles) : m_stats.processedFiles ); AnsiString sourceFile; int batchSize = GetBatchSize(); int count = 0; int transferSize = 0; // Transfer files until we stop running, we reach the end of all transferrable // files, or the batch transfer size is reached while( m_running && (file != m_task.files.end()) && (count < batchSize) ) { sourceFile = IncludeTrailingPathDelimiter(m_task.sourcePath) + (*file).name; if( TransferUdFile( sourceFile ) ) { m_stats.processedBytes += (*file).size; transferSize += (*file).size; ++m_stats.processedFiles; ++file; ++count; } else { // Abort further processing m_error = true; m_task.state = UdTransferFailed; AnsiString msg = "Failed to transfer file " + (*file).name; m_manager->UpdateTask( m_task, msg ); break; } // Roll over to the start if this is a recurring transfer if( m_task.recurring && (file == m_task.files.end()) ) { file = m_task.files.begin(); } } if( !m_error ) m_stats.batches.push_back( transferSize ); // Return true if there are no more files to transfer return (file == m_task.files.end()); } //--------------------------------------------------------------------------- bool UdTransferThread::TransferUdFile( const AnsiString & filename ) { bool transferred = true; if( m_simulators ) { // Get the next device simulator to send the UD file vector::iterator device = m_devices.begin(); advance( device, ++m_lastDevice ); if( device == m_devices.end() ) { m_lastDevice = 0; device = m_devices.begin(); } // Can't check the state of the transfer as the transfer is handled // in another thread. After a batch is sent we check with all the device // simulator threads for potential transfer problems if( (*device) != NULL ) { (*device)->UploadUDFile( filename ); } } else { // Simply copy the file to the destination directory AnsiString destFile = IncludeTrailingPathDelimiter(m_task.destination) + ExtractFileName(filename); transferred = (CopyFile( filename.c_str(), destFile.c_str(), false ) != 0); } return transferred; } //--------------------------------------------------------------------------- int UdTransferThread::GetBatchSize() { return m_ramp.running ? m_ramp.batchSize : m_task.batchSize; } //--------------------------------------------------------------------------- void __fastcall UdTransferThread::GetTransferStatistics( UdTransferStatistics & stats ) { stats.completed = m_stats.completed; stats.startTime = m_stats.startTime; stats.endTime = m_stats.endTime; stats.processedFiles = m_stats.processedFiles; stats.totalFiles = m_stats.totalFiles; stats.processedBytes = m_stats.processedBytes; stats.totalBytes = m_stats.totalBytes; stats.batches = m_stats.batches; } //--------------------------------------------------------------------------- bool UdTransferThread::ConnectDeviceSimulators() { bool connected = true; int number = m_maxDevices; if( m_task.batchSize < number ) { number = m_task.batchSize; } // Create and start simulator threads for( int i = 0; i < number; i++ ) { unsigned long deviceId = m_deviceId + i; if( deviceId <= 0 ) deviceId = 1; AnsiString installationId; installationId.sprintf( "??????????-???-%s-%08d", m_deviceType, deviceId ); IDevice * device = &m_deviceFactory->createMASSDevice( m_deviceType.c_str(), deviceId, installationId.c_str() ); if( device != NULL ) { // Simulator thread object now owns the device DeviceSimulatorThread * thread = new DeviceSimulatorThread( device, m_deviceFactory, m_task.destination ); if( thread != NULL ) { m_devices.push_back( thread ); // Wait for the first connection to confirm that the destination host is // running a site computer, and that the device configuration is correct. // Subsequent disconnections (if any) will be handled by the device thread. int count = m_connectTimeout * 2; while( (count >= 0) && !thread->IsConnected() ) { count--; Sleep( 500 ); } if( count == 0 ) { connected = false; break; } // Need a slight delay to confirm that the device has connected all channels // (CMD/UD/CD) -- if not, we may overlap channels between devices and this can // take a long time to sort itself out (lots of device disconnections). Sleep( 500 ); } } if( !m_running ) break; } return connected; } //--------------------------------------------------------------------------- void UdTransferThread::CheckDeviceSimulatorThreads() { AnsiString error = ""; // We are expecting all threads to be running here, so a terminal error // has occurred if this is not the case vector::iterator device = m_devices.begin(); for( ; device != m_devices.end(); device++ ) { if( ((*device) != NULL) && !(*device)->IsRunning() ) { error = (*device)->GetErrorMessage(); if( !error.IsEmpty() ) { break; } } } // Abort further processing if( !error.IsEmpty() ) { m_error = true; m_task.state = UdTransferFailed; m_manager->UpdateTask( m_task, error ); } }