Subversion Repositories DevTools

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2263 kivins 1
//---------------------------------------------------------------------------
2
 
3
#pragma warn -com
4
#include <LoggingMacros.h>
5
#pragma warn +com
6
 
7
#include <vcl.h>
8
#pragma hdrstop
9
 
10
#include "UdTransferThread.h"
11
#include <algorithm>
12
#include <dateutils.hpp>
13
#include <Registry.hpp>
14
 
15
#include "IDeviceFactory.h"
16
#include "IDevice.h"
17
#include "DeviceSimulatorThread.h"
18
 
19
// File-local configuration names. Both the characters and the pointers are
// const so that none of these can be re-pointed by accident at run time.
namespace
{
    // Registry key (opened under HKEY_CURRENT_USER by InitialiseTask) that
    // holds the generic UD transfer settings below.
    const char * const REG_BASE_KEY       =   "\\Software\\ERG\\TxnTestManager\\UD Transfer";

    // Boolean: rename successfully sent files (prefix SENT_STRING) on completion.
    const char * const REG_RENAME         =   "RenameSentFiles";
    // Boolean: skip files whose name already starts with SENT_STRING.
    const char * const REG_IGNORESENT     =   "IgnoreSentFiles";
    // Integer: upper bound on the number of device simulator threads.
    const char * const REG_MAXDEVICES     =   "MaxDeviceSimulators";
    // Integer: device id given to the first simulator; later ones count up.
    const char * const REG_DEVICEID       =   "StartingDeviceId";
    // String: device type, truncated / zero-padded to 6 characters when read.
    const char * const REG_DEVICETYPE     =   "DeviceType";
    // Integer: how long to wait for a simulator to connect. InitialiseTask
    // only stores it; the unit follows from the polling loop in
    // ConnectDeviceSimulators.
    const char * const REG_CONNECTTIMEOUT =   "DeviceConnectTimeout";

    // Prefix that marks a file as already sent.
    const char * const SENT_STRING        =   "sent_";
}
31
 
32
using namespace std;
33
 
34
#pragma package(smart_init)
35
//---------------------------------------------------------------------------
36
 
37
//   Important: Methods and properties of objects in VCL can only be
38
//   used in a method called using Synchronize, for example:
39
//
40
//      Synchronize(UpdateCaption);
41
//
42
//   where UpdateCaption could look like:
43
//
44
//      void __fastcall UdTransferThread::UpdateCaption()
45
//      {
46
//        Form1->Caption = "Updated in a thread";
47
//      }
48
//---------------------------------------------------------------------------
49
 
50
// Builds a transfer thread for one task.
//
//   task          - description of the transfer; copied into m_task.
//   manager       - receives task state / statistics updates; stored, not copied.
//   deviceFactory - used later to create device simulators; stored, not copied.
//
// The base TThread is constructed with 'false' (VCL CreateSuspended flag).
// The numeric / string members below are defaults only; InitialiseTask
// replaces them with registry values when those are present.
__fastcall UdTransferThread::UdTransferThread(
        const UdTransferTask & task,
        UdTransferManager * manager,
        IDeviceFactory * deviceFactory )
    : TThread( false )
    , m_task( task )
    , m_manager( manager )
    , m_deviceFactory( deviceFactory )
    , m_lastDevice( 0 )
    , m_running( false )
    , m_error( false )
    , m_simulators( false )
    , m_renameSent( false )
    , m_ignoreSent( false )
    , m_maxDevices( 100 )
    , m_deviceId( 1 )
    , m_connectTimeout( 5 )
    , m_deviceType("000SIM")
{
    Priority = tpHigher;
}
71
//---------------------------------------------------------------------------
72
 
73
void UdTransferThread::SetName()
74
{
75
    THREADNAME_INFO info;
76
    info.dwType = 0x1000;
77
    info.szName = "UdTransferThread";
78
    info.dwThreadID = -1;
79
    info.dwFlags = 0;
80
 
81
    __try
82
    {
83
         RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD),(DWORD*)&info );
84
    }
85
    __except (EXCEPTION_CONTINUE_EXECUTION)
86
    {
87
    }
88
}
89
//---------------------------------------------------------------------------
90
 
91
bool UdTransferThread::InitialiseTask()
92
{
93
    if( DirectoryExists( m_task.sourcePath ) )
94
    {
95
        // Read generic transfer settings from the registry
96
 
97
        TRegistry * registry = new TRegistry;
98
        registry->RootKey = HKEY_CURRENT_USER;
99
        if( registry->OpenKey(REG_BASE_KEY, true) )
100
        {
101
            if( registry->ValueExists(REG_RENAME) )
102
            {
103
                m_renameSent = registry->ReadBool(REG_RENAME);
104
            }
105
            if( registry->ValueExists(REG_IGNORESENT) )
106
            {
107
                m_ignoreSent = registry->ReadBool(REG_IGNORESENT);
108
            }
109
            if( registry->ValueExists(REG_MAXDEVICES) )
110
            {
111
                m_maxDevices = registry->ReadInteger(REG_MAXDEVICES);
112
                if( m_maxDevices <= 0 ) m_maxDevices = 100;
113
            }
114
            if( registry->ValueExists(REG_DEVICEID) )
115
            {
116
                m_deviceId = registry->ReadInteger(REG_DEVICEID);
117
                if( m_deviceId <= 0 ) m_deviceId = 1;
118
            }
119
            if( registry->ValueExists(REG_DEVICETYPE) )
120
            {
121
                m_deviceType = registry->ReadString(REG_DEVICETYPE).SubString(1, 6);
122
                if( m_deviceType.Length() < 6 )
123
                {
124
                    m_deviceType.Insert( AnsiString::StringOfChar('0', 6 - m_deviceType.Length()), 1 );
125
                }
126
            }
127
            if( registry->ValueExists(REG_CONNECTTIMEOUT) )
128
            {
129
                m_connectTimeout = registry->ReadInteger(REG_CONNECTTIMEOUT);
130
                if( m_connectTimeout <= 0 ) m_connectTimeout = 5;
131
            }
132
 
133
            registry->CloseKey();
134
            delete registry;
135
            registry = NULL;
136
        }
137
 
138
        // Initialise the file list if required
139
 
140
        if( m_task.files.empty() )
141
        {
142
            UdFile file;        
143
            TSearchRec record;
144
            if( FindFirst( m_task.sourcePath + "\\*.devud", faReadOnly, record ) == 0 )
145
            {
146
                do
147
                {
148
                    if( (record.Attr & faDirectory) == 0 )
149
                    {
150
                        if( !(m_ignoreSent && (record.Name.Pos(SENT_STRING) == 1)) )
151
                        {
152
                            file.name = record.Name;
153
                            file.size = record.Size;
154
                            file.time = FileDateToDateTime( record.Time );
155
 
156
                            m_task.files.push_back( file );
157
                            m_stats.totalBytes += file.size;
158
                        }
159
                    }
160
                } while( FindNext( record ) == 0 );
161
 
162
                FindClose( record );
163
            }
164
        }
165
 
166
        // Sort the ud files by their creation time
167
 
168
        sort( m_task.files.begin(), m_task.files.end() );
169
        if( (m_task.batchSize == 0) || (m_task.batchSize > (int)m_task.files.size()) )
170
        {
171
            m_task.batchSize = m_task.files.size();
172
        }
173
 
174
        // Initialise ramp settings
175
 
176
        if( m_task.rampType != RampTestDisabled )
177
        {
178
            m_ramp.batchSize = m_task.batchSize;
179
            m_ramp.count     = 0;
180
 
181
            if( m_task.rampType == RampTestByRatio )
182
            {
183
                m_task.gradient = (double)m_task.files.size() / m_task.duration;
184
            }
185
        }
186
 
187
        m_stats.completed = false;
188
        m_stats.totalFiles = m_task.files.size();
189
 
190
        // Check if we require use of device simulators. This is the case if the destination
191
        // provided is not a path (no characters associated with paths).
192
 
193
        m_simulators = (m_task.destination.LastDelimiter("\\/:") == 0);
194
        if( m_simulators )
195
        {
196
            AnsiString msg;
197
            msg = "Connecting device simulators to " + m_task.destination + ".";
198
            m_manager->UpdateTask( m_task, msg );
199
 
200
            if( !ConnectDeviceSimulators() )
201
            {
202
                m_error = true;
203
                m_task.state = UdTransferFailed;
204
                msg = "Failed to connect device simulators to " + m_task.destination + ".";
205
                m_manager->UpdateTask( m_task, msg );
206
                return false;
207
            }
208
        }
209
 
210
        // Now ready to begin processing. Set the start/end time now as the previous
211
        // step may have consumed quite a bit of time.
212
 
213
        m_stats.startTime = TDateTime::CurrentDateTime();
214
        m_stats.endTime = m_stats.startTime + (double)m_task.duration /(24*60*60);
215
 
216
        m_task.state = UdTransferInProgress;
217
        m_manager->UpdateTask( m_task, "Processing started at " + m_stats.startTime + "." );
218
 
219
        return (m_task.batchSize > 0);
220
    }
221
 
222
    return false;
223
}
224
//---------------------------------------------------------------------------
225
 
226
// Thread entry point. Initialises the task, then sends batches until the file
// list is exhausted, the upload period ends, the thread is asked to stop or an
// error is flagged. The outcome and the statistics are reported to the manager.
// Whatever happens, the __finally section stops and frees the device
// simulators and, if configured, renames the files that were sent.
void __fastcall UdTransferThread::Execute()
{
    SetName();

    m_running = true;

    try
    {
        try
        {
            bool finished = false;

            if( !InitialiseTask() )
            {
                // A simulator connection failure has already been reported by
                // InitialiseTask (m_error set); report every other failure here.
                if( !m_error )
                {
                    m_error = true;
                    m_task.state = UdTransferFailed;
                    m_manager->UpdateTask( m_task, "Failed to initialise usage data transfer task." );
                }
            }
            else
            {
                // Start transferring files, repeat if a recurring task
                do
                {
                    while( m_running && !finished && !m_error )
                    {
                        TDateTime batchStart = TDateTime::CurrentDateTime();
                        finished = TransferNextBatch();

                        if( TDateTime::CurrentDateTime() >= m_stats.endTime )
                        {
                            // End of the upload period reached
                            finished = true;
                            m_running = false;
                        }
                        else if( m_running && !finished && !m_error )
                        {
                            // Wait out the remainder of the frequency period,
                            // polling so that a stop request is noticed.
                            while( m_running &&
                                   SecondsBetween( TDateTime::CurrentDateTime(), batchStart ) < m_task.frequency )
                            {
                                Sleep( 500 );
                            }

                            RampTransferBatch();
                        }

                        // Check whether the device simulator threads are still happy
                        if( m_running && !m_error && m_simulators )
                        {
                            CheckDeviceSimulatorThreads();
                        }
                    }

                } while( m_task.recurring && m_running && !m_error );
            }

            // Update task state if we are successful
            if( !m_error && finished )
            {
                m_task.state = UdTransferCompleted;
                AnsiString msg = "Successfully transferred " + IntToStr( m_stats.processedFiles ) +
                    " files (" +
                    AnsiString::FloatToStrF((long double)m_stats.processedBytes, AnsiString::sffNumber, 18, 0) +
                    " bytes).";
                m_manager->UpdateTask( m_task, msg );
            }

            // Update task statistics
            m_manager->UpdateTaskStatistics( m_task.id, m_stats );
        }
        catch( ... )
        {
            m_task.state = UdTransferFailed;
            m_manager->UpdateTask( m_task, "A fatal exception occurred during UD transfer processing." );
            m_manager->UpdateTaskStatistics( m_task.id, m_stats );
        }
    }
    __finally
    {
        m_running = false;
        m_stats.completed = true;

        // Shutdown device simulators. This is much quicker if we first ask
        // every simulator to stop and only then wait for each one in turn.

        vector<DeviceSimulatorThread *>::iterator sim;
        for( sim = m_devices.begin(); sim != m_devices.end(); ++sim )
        {
            (*sim)->Stop();
        }
        for( sim = m_devices.begin(); sim != m_devices.end(); ++sim )
        {
            (*sim)->WaitFor();
            delete *sim;
            *sim = 0;
        }
        m_devices.clear();

        // Rename files that have been sent if required. Don't care if renames fail.

        if( m_renameSent )
        {
            const AnsiString folder = IncludeTrailingPathDelimiter(m_task.sourcePath);
            AnsiString currentName, sentName;
            UdFileItem entry = m_task.files.begin();

            // Visit at most processedFiles entries, never running past the list
            for( int visited = 1;
                 (visited <= m_stats.processedFiles) && (entry != m_task.files.end());
                 ++visited, ++entry )
            {
                // Don't prepend SENT_STRING twice
                if( entry->name.Pos(SENT_STRING) == 1 )
                {
                    continue;
                }

                currentName = folder + entry->name;
                sentName    = folder + AnsiString(SENT_STRING) + entry->name;

                MoveFile( currentName.c_str(), sentName.c_str() );
            }
        }
    }
}
355
//---------------------------------------------------------------------------
356
 
357
void UdTransferThread::RampTransferBatch()
358
{
359
    if( m_task.rampType != RampTestDisabled )
360
    {
361
        // Check if it is time to start ramping batch sizes
362
 
363
        if( !m_ramp.running )
364
        {
365
            m_ramp.running = (TDateTime::CurrentDateTime() >= m_task.rampTime);
366
        }
367
 
368
        // Calculate the new batch size for the next transfer
369
 
370
        if( m_ramp.running )
371
        {
372
            // Calculation is linear function, y = mx + c where:
373
            //  y = Next batch size
374
            //  m = Ramp gradient
375
            //  x = Ramped Upload batch count
376
            //  c = Batch size for upload
377
 
378
            m_ramp.batchSize = m_task.gradient * (++m_ramp.count) + m_task.batchSize;
379
        }
380
    }
381
}
382
//---------------------------------------------------------------------------
383
 
384
bool UdTransferThread::TransferNextBatch()
385
{
386
    // Find the next file to start a batch on. Account for recurring tasks.
387
 
388
    UdFileItem file = m_task.files.begin();
389
    advance( file, m_task.recurring
390
                        ? (m_stats.processedFiles % m_stats.totalFiles)
391
                        : m_stats.processedFiles );
392
 
393
    AnsiString sourceFile;
394
    int batchSize = GetBatchSize();
395
    int count = 0;
396
    int transferSize = 0;
397
 
398
    // Transfer files until we stop running, we reach the end of all transferrable
399
    // files, or the batch transfer size is reached
400
 
401
    while( m_running &&
402
          (file != m_task.files.end()) &&
403
          (count < batchSize) )
404
    {
405
        sourceFile = IncludeTrailingPathDelimiter(m_task.sourcePath) + (*file).name;
406
 
407
        if( TransferUdFile( sourceFile ) )
408
        {
409
            m_stats.processedBytes  += (*file).size;
410
            transferSize            += (*file).size;
411
 
412
            ++m_stats.processedFiles;
413
            ++file;
414
            ++count;
415
        }
416
        else
417
        {
418
            // Abort further processing
419
 
420
            m_error = true;
421
            m_task.state = UdTransferFailed;
422
            AnsiString msg = "Failed to transfer file " + (*file).name;
423
            m_manager->UpdateTask( m_task, msg );
424
            break;
425
        }
426
 
427
        // Roll over to the start if this is a recurring transfer
428
 
429
        if( m_task.recurring &&
430
            (file == m_task.files.end()) )
431
        {
432
            file = m_task.files.begin();
433
        }
434
    }
435
 
436
    if( !m_error ) m_stats.batches.push_back( transferSize );
437
 
438
    // Return true if there are no more files to transfer
439
 
440
    return (file == m_task.files.end());
441
}
442
//---------------------------------------------------------------------------
443
 
444
bool UdTransferThread::TransferUdFile( const AnsiString & filename )
445
{
446
    bool transferred = true;
447
 
448
    if( m_simulators )
449
    {
450
        // Get the next device simulator to send the UD file
451
 
452
        vector<DeviceSimulatorThread *>::iterator device = m_devices.begin();
453
        advance( device, ++m_lastDevice );
454
        if( device == m_devices.end() )
455
        {
456
            m_lastDevice = 0;
457
            device = m_devices.begin();
458
        }
459
 
460
        // Can't check the state of the transfer as the transfer is handled
461
        // in another thread. After a batch is sent we check with all the device
462
        // simulator threads for potential transfer problems
463
 
464
        if( (*device) != NULL )
465
        {
466
            (*device)->UploadUDFile( filename );
467
        }
468
    }
469
    else
470
    {
471
        // Simply copy the file to the destination directory
472
 
473
        AnsiString destFile = IncludeTrailingPathDelimiter(m_task.destination) +
474
            ExtractFileName(filename);
475
        transferred = (CopyFile( filename.c_str(), destFile.c_str(), false ) != 0);
476
    }
477
 
478
    return transferred;
479
}
480
//---------------------------------------------------------------------------
481
 
482
int  UdTransferThread::GetBatchSize()
483
{
484
    return m_ramp.running ? m_ramp.batchSize : m_task.batchSize;
485
}
486
//---------------------------------------------------------------------------
487
 
488
// Copies the thread's current transfer statistics into 'stats', field by
// field. No locking is performed in this function.
void __fastcall UdTransferThread::GetTransferStatistics( UdTransferStatistics & stats )
{
    // State and timing
    stats.completed      = m_stats.completed;
    stats.startTime      = m_stats.startTime;
    stats.endTime        = m_stats.endTime;

    // File counters
    stats.processedFiles = m_stats.processedFiles;
    stats.totalFiles     = m_stats.totalFiles;

    // Byte counters
    stats.processedBytes = m_stats.processedBytes;
    stats.totalBytes     = m_stats.totalBytes;

    // Per-batch history
    stats.batches        = m_stats.batches;
}
499
//---------------------------------------------------------------------------
500
 
501
bool UdTransferThread::ConnectDeviceSimulators()
502
{
503
    bool connected = true;
504
 
505
    int number = m_maxDevices;
506
    if( m_task.batchSize < number )
507
    {
508
        number = m_task.batchSize;
509
    }
510
 
511
    // Create and start simulator threads
512
 
513
    for( int i = 0; i < number; i++ )
514
    {
515
        unsigned long deviceId = m_deviceId + i;
516
        if( deviceId <= 0 ) deviceId = 1;
517
 
518
        AnsiString installationId;
519
        installationId.sprintf( "??????????-???-%s-%08d", m_deviceType, deviceId );
520
 
521
        IDevice * device = &m_deviceFactory->createMASSDevice(
522
            m_deviceType.c_str(), deviceId, installationId.c_str() );
523
        if( device != NULL )
524
        {
525
            // Simulator thread object now owns the device
526
 
527
            DeviceSimulatorThread * thread =
528
                new DeviceSimulatorThread( device, m_deviceFactory, m_task.destination );
529
            if( thread != NULL )
530
            {
531
                m_devices.push_back( thread );
532
 
533
                // Wait for the first connection to confirm that the destination host is
534
                // running a site computer, and that the device configuration is correct.
535
                // Subsequent disconnections (if any) will be handled by the device thread.
536
 
537
                int count = m_connectTimeout * 2;
538
 
539
                while( (count >= 0) && !thread->IsConnected() )
540
                {
541
                    count--;
542
                    Sleep( 500 );
543
                }
544
                if( count == 0 )
545
                {
546
                    connected = false;
547
                    break;
548
                }
549
 
550
                // Need a slight delay to confirm that the device has connected all channels
551
                // (CMD/UD/CD) -- if not, we may overlap channels between devices and this can
552
                // take a long time to sort itself out (lots of device disconnections).
553
 
554
                Sleep( 500 );
555
            }
556
        }
557
        if( !m_running ) break;
558
    }
559
 
560
    return connected;
561
}
562
//---------------------------------------------------------------------------
563
 
564
void UdTransferThread::CheckDeviceSimulatorThreads()
565
{
566
    AnsiString error = "";
567
 
568
    // We are expecting all threads to be running here, so a terminal error
569
    // has occurred if this is not the case
570
 
571
    vector<DeviceSimulatorThread *>::iterator device = m_devices.begin();
572
    for( ; device != m_devices.end(); device++ )
573
    {
574
        if( ((*device) != NULL) &&
575
            !(*device)->IsRunning() )
576
        {
577
            error = (*device)->GetErrorMessage();
578
            if( !error.IsEmpty() )
579
            {
580
                break;
581
            }
582
        }
583
    }
584
 
585
    // Abort further processing
586
 
587
    if( !error.IsEmpty() )
588
    {
589
        m_error = true;
590
        m_task.state = UdTransferFailed;
591
        m_manager->UpdateTask( m_task, error );
592
    }
593
}
594
 
595