//--------------------------------------------------------------------------- #pragma warn -com #include #pragma warn +com #pragma hdrstop #include "UdTransferManager.h" #include "UdTransferThread.h" #include "DataModule.h" #include "UdTransferHistory.h" #include "IDevice.h" //--------------------------------------------------------------------------- namespace { AnsiString GetSQLDateTime( TDateTime datetime ) { AnsiString sqldate; sqldate.sprintf("TO_DATE('%s','YYYY-MM-DD HH24:MI:SS')", datetime.FormatString("yyyy-mm-dd hh:nn:ss").c_str()); return sqldate; } } //--------------------------------------------------------------------------- #pragma package(smart_init) UdTransferManager::UdTransferManager() : m_deviceSimulatorFactory( 0 ) { // Get required settings m_username = getenv("USERNAME"); m_hostname = getenv("COMPUTERNAME"); if( m_username.IsEmpty() ) { char name[ 256 ]; unsigned long size = sizeof( name ); if( GetUserName( name, &size ) ) { m_username = name; } } if( m_hostname.IsEmpty() ) { char name[ 256 ]; unsigned long size = sizeof( name ); if( GetComputerName( name, &size ) ) { m_hostname = name; } } m_tasksLock = new Syncobjs::TCriticalSection; // Load the device simulator library m_deviceSimulatorWrapperDll = ::LoadLibrary("DeviceSimulatorWrapperP.dll"); if( m_deviceSimulatorWrapperDll == NULL ) { MessageDlg("Failed to load library 'DeviceSimulatorWrapperP.dll'.\nDevice simulation functionality will not be available.", mtError, TMsgDlgButtons() << mbOK, 0 ); } else { // Get a device factory instance m_getDeviceFactory = reinterpret_cast< getDeviceFactory_t >( ::GetProcAddress( m_deviceSimulatorWrapperDll, "getDeviceFactory" ) ); if( m_getDeviceFactory == NULL ) { MessageDlg("Failed to get device factory from library 'DeviceSimulatorWrapperP.dll'.\nDevice simulation functionality will not be available.", mtError, TMsgDlgButtons() << mbOK, 0 ); } else { m_deviceSimulatorFactory = m_getDeviceFactory(); } } } //--------------------------------------------------------------------------- UdTransferManager::~UdTransferManager() { UdTransferThreadItem thread = m_threads.begin(); for( ; thread != m_threads.end(); thread++ ) { if( (*thread).second != NULL ) { (*thread).second->StopTransfer(); delete (*thread).second; } } m_threads.clear(); delete m_tasksLock; if( m_deviceSimulatorWrapperDll != NULL ) { // Freeing the library deletes the factory instance FreeLibrary( m_deviceSimulatorWrapperDll ); m_deviceSimulatorFactory = 0; } } //--------------------------------------------------------------------------- void UdTransferManager::AddNewTask( const UdTransferTask & task ) { // Add the new task to the database TADOQuery * txnquery = new TADOQuery(NULL); if( txnquery == NULL ) return; txnquery->Connection = Data_Module->IntegrationDBConnection; AnsiString sql_statement; sql_statement.sprintf("insert into TRANSFER_HISTORY " " (ADDED_TIME, USERNAME, HOSTNAME, STATUS, " " RAMP_TIME, SOURCE_PATH, BATCH_SIZE, " " DURATION, FREQUENCY, RECURRING, " " RAMP_TYPE, RAMP_GRADIENT, DESTINATION) " "values (sysdate, '%s', '%s', %d, " " %s, '%s', %d, " " %d, %d, %d, " " %d, %f, '%s') ", m_username, m_hostname, (int)UdTransferPending, GetSQLDateTime(task.rampTime), task.sourcePath, task.batchSize, task.duration, task.frequency, task.recurring, (int)task.rampType, task.gradient, task.destination); txnquery->SQL->Text = sql_statement; if( txnquery->ExecSQL() != 1 ) { MERROR( "Failed to add new UD transfer task to the DB" ); } txnquery->Close(); // Now get its id and add it to our list sql_statement.sprintf("select TRANSFER_NO, ADDED_TIME " "from TRANSFER_HISTORY " "where USERNAME = '%s' and " " HOSTNAME = '%s' and " " STATUS = %d and " " RAMP_TIME = %s and " " BATCH_SIZE = %d and " " DURATION = %d and " " FREQUENCY = %d and " " RECURRING = %d and " " RAMP_TYPE = %d and " " RAMP_GRADIENT = %f " "order by ADDED_TIME desc ", m_username, m_hostname, (int)UdTransferPending, GetSQLDateTime(task.rampTime), task.batchSize, task.duration, task.frequency, task.recurring, (int)task.rampType, task.gradient); txnquery->SQL->Text = sql_statement; txnquery->Open(); UdTransferTask newTask( task ); if( !txnquery->Eof ) { newTask.id = txnquery->FieldByName("TRANSFER_NO")->AsInteger; m_tasksLock->Acquire(); m_tasks.push_back( newTask ); m_tasksLock->Release(); if( !LaunchTask( newTask ) ) { MERROR( "Failed to launch newly added task (" << newTask.id << ")" ); } } else { MERROR( "Failed to retrieve ID of added UD transfer task" ); } delete txnquery; } //--------------------------------------------------------------------------- void UdTransferManager::LoadPendingTasks() { TADOQuery * txnquery = new TADOQuery(NULL); if( txnquery == NULL ) return; txnquery->Connection = Data_Module->IntegrationDBConnection; AnsiString sql_statement; sql_statement.sprintf("select TRANSFER_NO, BATCH_SIZE, DURATION, FREQUENCY, " " RECURRING, SOURCE_PATH, RAMP_TIME, RAMP_TYPE, " " RAMP_GRADIENT, STATUS, DESTINATION " "from TRANSFER_HISTORY " "where STATUS = %d and " " USERNAME = '%s' and " " HOSTNAME = '%s' ", (int)UdTransferPending, m_username, m_hostname); txnquery->SQL->Text = sql_statement; txnquery->Open(); UdTransferTask task; while( !txnquery->Eof ) { task.id = txnquery->FieldByName("TRANSFER_NO")->AsInteger; task.batchSize = txnquery->FieldByName("BATCH_SIZE")->AsInteger; task.duration = txnquery->FieldByName("DURATION")->AsInteger; task.frequency = txnquery->FieldByName("FREQUENCY")->AsInteger; task.recurring = (txnquery->FieldByName("RECURRING")->AsInteger > 0); task.sourcePath = txnquery->FieldByName("SOURCE_PATH")->AsString; task.rampTime = txnquery->FieldByName("RAMP_TIME")->AsDateTime; task.rampType = (RampTestType)txnquery->FieldByName("RAMP_TYPE")->AsInteger; task.gradient = txnquery->FieldByName("RAMP_GRADIENT")->AsFloat; task.state = (UdTransferTaskState)txnquery->FieldByName("STATUS")->AsInteger; task.destination = txnquery->FieldByName("DESTINATION")->AsString; m_tasksLock->Acquire(); m_tasks.push_back( task ); m_tasksLock->Release(); txnquery->Next(); } delete txnquery; if( !m_tasks.empty() ) { // Ask user to start any previously pending tasks AnsiString msg; if( m_tasks.size() == 1 ) { msg = "There is a UD task currently pending. Do you want to start it now?"; } else { msg = "There are " + IntToStr(m_tasks.size()) + " UD transfer tasks currently pending."; msg += "\nDo you want to start them now?"; } if( MessageDlg(msg, mtConfirmation, TMsgDlgButtons() << mbYes << mbNo, 0 ) == mbYes ) { m_tasksLock->Acquire(); try { UdTransferTaskItem it = m_tasks.begin(); for( ; it != m_tasks.end(); it++ ) { if( (*it).state == UdTransferPending ) { LaunchTask( (*it) ); } } } __finally { m_tasksLock->Release(); } } } } //--------------------------------------------------------------------------- void UdTransferManager::UpdateTask( const UdTransferTask & task, const AnsiString & comment ) { // Find our locally stored task m_tasksLock->Acquire(); try { UdTransferTaskItem it = m_tasks.begin(); for( ; it != m_tasks.end(); it++ ) { if( (*it).id == task.id ) break; } if( (it != m_tasks.end()) ) // TODO: && ((*it) != task) ) { bool taskDone = (((*it).state > UdTransferAborted) && (task.state <= UdTransferAborted)); TADOQuery * txnquery = new TADOQuery(NULL); if( txnquery == NULL ) return; txnquery->Connection = Data_Module->IntegrationDBConnection; // Find parameters that have changed and add them to the update query std::stringstream stream; stream << "update TRANSFER_HISTORY set "; if( (*it).batchSize != task.batchSize ) stream << " BATCH_SIZE = " << task.batchSize << ","; if( (*it).duration != task.duration ) stream << " DURATION = " << task.duration << ","; if( (*it).frequency != task.frequency ) stream << " FREQUENCY = " << task.frequency << ","; if( (*it).recurring != task.recurring ) stream << " RECURRING = " << task.recurring << ","; if( (*it).sourcePath != task.sourcePath ) stream << " SOURCE_PATH = '" << task.sourcePath.c_str() << "',"; if( (*it).destination != task.destination ) stream << " DESTINATION = '" << task.destination.c_str() << "',"; if( (*it).state != task.state ) stream << " STATUS = " << task.state << ","; if( (*it).rampType != task.rampType ) stream << " RAMP_TYPE = " << task.rampType << ","; if( (*it).gradient != task.gradient ) stream << " RAMP_GRADIENT = " << task.gradient << ","; if( (*it).rampTime != task.rampTime ) stream << " RAMP_TIME = " << GetSQLDateTime(task.rampTime).c_str() << ","; if( !comment.IsEmpty() ) stream << " COMMENTS = '" << comment.c_str() << "',"; if( taskDone ) stream << " END_TIME = sysdate,"; AnsiString sql_statement, temp; temp = stream.str().c_str(); temp.Delete(temp.Length(), 1); // Remove last comma sql_statement.sprintf("%s where TRANSFER_NO = %d", temp, task.id); txnquery->SQL->Text = sql_statement; if( txnquery->ExecSQL()!= 1 ) { MERROR( "Failed to update transfer history on statement: " << sql_statement ); } delete txnquery; (*it) = task; if( taskDone ) { m_tasks.erase( it ); UdTransferThreadItem thread = m_threads.find( task.id ); if( thread != m_threads.end() ) { (*thread).second->StopTransfer(); } } } } __finally { m_tasksLock->Release(); } UdTransferHistoryForm->RefreshHistory(); } //--------------------------------------------------------------------------- bool UdTransferManager::GetTaskDetails( UdTransferTask & task ) { bool found = false; m_tasksLock->Acquire(); try { UdTransferTaskItem it = m_tasks.begin(); for( ; it != m_tasks.end(); it++ ) { if( (*it).id == task.id ) { found = true; task = (*it); break; } } } __finally { m_tasksLock->Release(); } return found; } //--------------------------------------------------------------------------- bool UdTransferManager::LaunchTask( const UdTransferTask & task ) { bool launched = false; // Ensure we haven't already got a thread for this task UdTransferThreadItem thread = m_threads.find( task.id ); if( thread == m_threads.end() ) { m_threads[task.id] = new UdTransferThread( task, this, m_deviceSimulatorFactory ); launched = true; } return launched; } //--------------------------------------------------------------------------- bool UdTransferManager::GetTaskStatistics( const int & taskId, UdTransferStatistics & stats ) { bool found = false; // Check if we are currently processing the device, if so // request the stats from the task thread UdTransferThreadItem thread = m_threads.find( taskId ); if( thread != m_threads.end() ) { if( (*thread).second != NULL ) { (*thread).second->GetTransferStatistics( stats ); found = true; } } // Otherwise grab the info from the database if( !found ) { TADOQuery * txnquery = new TADOQuery(NULL); if( txnquery == NULL ) return false; txnquery->Connection = Data_Module->IntegrationDBConnection; AnsiString sql_statement; sql_statement.sprintf("select ADDED_TIME, END_TIME, NUMBER_FILES, NUMBER_BYTES, BATCH_HISTORY " "from TRANSFER_HISTORY " "where TRANSFER_NO = %d", taskId); txnquery->SQL->Text = sql_statement; txnquery->Open(); if( !txnquery->Eof ) { stats.completed = true; stats.startTime = txnquery->FieldByName("ADDED_TIME")->AsDateTime; stats.processedFiles = txnquery->FieldByName("NUMBER_FILES")->AsInteger; stats.processedBytes = txnquery->FieldByName("NUMBER_BYTES")->AsInteger; // End time may not be populated in times of program crashes. Ignore the // problem here and handle it when the statistics get used. try { stats.endTime = txnquery->FieldByName("END_TIME")->AsDateTime; } catch( EDatabaseError & e ) { } AnsiString batches = txnquery->FieldByName("BATCH_HISTORY")->AsString; if( !batches.IsEmpty() && (batches != "(null)") ) { // Tokenise our batch history string to our list TStringList * list = new TStringList; try { if( list != NULL ) { try { list->CommaText = batches; for( int i = 0; i < list->Count; i++ ) { stats.batches.push_back( list->Strings[i].ToInt() ); } } catch( EConvertError & e ) {} // Ignore } } __finally { delete list; } } found = true; } delete txnquery; } return found; } //--------------------------------------------------------------------------- void UdTransferManager::UpdateTaskStatistics( const int & taskId, const UdTransferStatistics & stats ) { // Format the batches list into a comma delimited string AnsiString batches; BatchSizeItem batch = (int *)stats.batches.begin(); for( ; batch != stats.batches.end(); batch++ ) { if( batch != stats.batches.begin() ) batches += ","; batches += IntToStr( *batch ); } TADOQuery * txnquery = new TADOQuery(NULL); if( txnquery == NULL ) return; txnquery->Connection = Data_Module->IntegrationDBConnection; AnsiString sql_statement; sql_statement.sprintf("update TRANSFER_HISTORY set " " NUMBER_FILES = %d, " " NUMBER_BYTES = %d, " " BATCH_HISTORY = '%s' " "where TRANSFER_NO = %d", stats.processedFiles, stats.processedBytes, batches, taskId); txnquery->SQL->Text = sql_statement; if( txnquery->ExecSQL() != 1 ) { MERROR( "Failed to persist transfer statistics to database for task " << taskId ); } txnquery->Close(); delete txnquery; UdTransferHistoryForm->RefreshHistory(); } //--------------------------------------------------------------------------- bool UdTransferManager::IsTaskRunning() { bool running = false; UdTransferThreadItem thread = m_threads.begin(); for( ; thread != m_threads.end(); thread++ ) { if( ((*thread).second != NULL) && (*thread).second->IsRunning() ) { running = true; break; } } return running; }