1//
2// TaskManagerTest.cpp
3//
4// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
5// and Contributors.
6//
7// SPDX-License-Identifier: BSL-1.0
8//
9
10
11#include "TaskManagerTest.h"
12#include "Poco/CppUnit/TestCaller.h"
13#include "Poco/CppUnit/TestSuite.h"
14#include "Poco/Exception.h"
15#include "Poco/TaskManager.h"
16#include "Poco/Task.h"
17#include "Poco/TaskNotification.h"
18#include "Poco/NotificationCenter.h"
19#include "Poco/Thread.h"
20#include "Poco/ThreadPool.h"
21#include "Poco/Event.h"
22#include "Poco/Observer.h"
23#include "Poco/Exception.h"
24#include "Poco/AutoPtr.h"
25
26
27using Poco::TaskManager;
28using Poco::Task;
29using Poco::NotificationCenter;
30using Poco::TaskStartedNotification;
31using Poco::TaskCancelledNotification;
32using Poco::TaskFinishedNotification;
33using Poco::TaskFailedNotification;
34using Poco::TaskProgressNotification;
35using Poco::TaskCustomNotification;
36using Poco::Thread;
37using Poco::ThreadPool;
38using Poco::Event;
39using Poco::Observer;
40using Poco::Exception;
41using Poco::NoThreadAvailableException;
42using Poco::SystemException;
43using Poco::NullPointerException;
44using Poco::AutoPtr;
45
46
47namespace
48{
49 class TestTask: public Task
50 {
51 public:
52 TestTask():
53 Task("TestTask"),
54 _fail(false)
55 {
56 }
57
58 void runTask()
59 {
60 _event.wait();
61 setProgress(0.5);
62 _event.wait();
63 if (isCancelled())
64 return;
65 if (_fail)
66 throw SystemException("warp core breach detected");
67 setProgress(1.0);
68 _event.wait();
69 }
70
71 void fail()
72 {
73 _fail = true;
74 }
75
76 void cont()
77 {
78 _event.set();
79 }
80
81 private:
82 Event _event;
83 bool _fail;
84 };
85
86 class SimpleTask: public Task
87 {
88 public:
89 SimpleTask(): Task("SimpleTask")
90 {
91 }
92
93 void runTask()
94 {
95 sleep(10000);
96 }
97 };
98
99 class IncludingTask: public Task
100 {
101 public:
102 IncludingTask(): Task("IncludingTask")
103 {
104 }
105
106 void runTask()
107 {
108 setProgress(0.5);
109 getOwner()->startSync(new SimpleTask);
110 setProgress(1.0);
111 }
112 };
113
114 class TaskObserver
115 {
116 public:
117 TaskObserver():
118 _started(false),
119 _cancelled(false),
120 _finished(false),
121 _pException(0),
122 _progress(0.0)
123 {
124 }
125
126 ~TaskObserver()
127 {
128 delete _pException;
129 }
130
131 void taskStarted(TaskStartedNotification* pNf)
132 {
133 _started = true;
134 pNf->release();
135 }
136
137 void taskCancelled(TaskCancelledNotification* pNf)
138 {
139 _cancelled = true;
140 pNf->release();
141 }
142
143 void taskFinished(TaskFinishedNotification* pNf)
144 {
145 _finished = true;
146 pNf->release();
147 }
148
149 void taskFailed(TaskFailedNotification* pNf)
150 {
151 _pException = pNf->reason().clone();
152 pNf->release();
153 }
154
155 void taskProgress(TaskProgressNotification* pNf)
156 {
157 _progress = pNf->progress();
158 pNf->release();
159 }
160
161 bool started() const
162 {
163 return _started;
164 }
165
166 bool cancelled() const
167 {
168 return _cancelled;
169 }
170
171 bool finished() const
172 {
173 return _finished;
174 }
175
176 float progress() const
177 {
178 return _progress;
179 }
180
181 Exception* error() const
182 {
183 return _pException;
184 }
185
186 private:
187 bool _started;
188 bool _cancelled;
189 bool _finished;
190 Exception* _pException;
191 float _progress;
192 };
193
194
195 template <typename T>
196 class CustomNotificationTask: public Task
197 {
198 public:
199 CustomNotificationTask(const T& t):
200 Task("CustomNotificationTask"),
201 _custom(t)
202 {
203 }
204
205 void runTask()
206 {
207 sleep(10000);
208 }
209
210 void setCustom(const T& custom)
211 {
212 _custom = custom;
213 postNotification(new TaskCustomNotification<T>(this, _custom));
214 }
215
216 private:
217 T _custom;
218 };
219
220
221 template <class C>
222 class CustomTaskObserver
223 {
224 public:
225 CustomTaskObserver(const C& custom): _custom(custom)
226 {
227 }
228
229 ~CustomTaskObserver()
230 {
231 }
232
233 void taskCustom(TaskCustomNotification<C>* pNf)
234 {
235 _custom = pNf->custom();
236 pNf->release();
237 }
238
239 const C& custom() const
240 {
241 return _custom;
242 }
243
244 private:
245 C _custom;
246 };
247
248 class SimpleTaskQueue
249 {
250 public:
251 SimpleTaskQueue(TaskManager& tm): _tm(tm)
252 {
253 _tm.addObserver(Observer<SimpleTaskQueue, TaskFinishedNotification>(*this, &SimpleTaskQueue::taskFinished));
254 }
255
256 void enqueue(Task* pTask)
257 {
258 _tasks.push_back(pTask);
259 }
260
261 void startQueue()
262 {
263 if (_tm.count() == 0 && _tasks.size())
264 {
265 Task* pTask = _tasks.back();
266 // not thread-safe
267 _tasks.pop_back();
268 _tm.start(pTask);
269 }
270 }
271
272 void taskFinished(TaskFinishedNotification* pNf)
273 {
274 if (_tasks.size())
275 {
276 Task* pTask = _tasks.back();
277 // not thread-safe
278 _tasks.pop_back();
279 _tm.startSync(pTask);
280 }
281 pNf->release();
282 }
283
284 private:
285 std::vector<Task*> _tasks;
286 TaskManager& _tm;
287 };
288}
289
290
291TaskManagerTest::TaskManagerTest(const std::string& rName): CppUnit::TestCase(rName)
292{
293}
294
295
296TaskManagerTest::~TaskManagerTest()
297{
298}
299
300
301void TaskManagerTest::testFinish()
302{
303 TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
304 TaskObserver to;
305 tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
306 tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
307 tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
308 tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
309 tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
310 AutoPtr<TestTask> pTT = new TestTask;
311 tm.start(pTT.duplicate());
312 assertTrue (pTT->progress() == 0);
313 Thread::sleep(200);
314 pTT->cont();
315 while (pTT->progress() != 0.5) Thread::sleep(50);
316 assertTrue (to.progress() == 0.5);
317 assertTrue (to.started());
318 assertTrue (pTT->state() == Task::TASK_RUNNING);
319 TaskManager::TaskList list = tm.taskList();
320 assertTrue (list.size() == 1);
321 assertTrue (tm.count() == 1);
322 pTT->cont();
323 while (pTT->progress() != 1.0) Thread::sleep(50);
324 pTT->cont();
325 while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
326 assertTrue (pTT->state() == Task::TASK_FINISHED);
327 while (!to.finished()) Thread::sleep(50);
328 assertTrue (to.finished());
329 while (tm.count() == 1) Thread::sleep(50);
330 list = tm.taskList();
331 assertTrue (list.empty());
332 assertTrue (!to.error());
333}
334
335
336void TaskManagerTest::testCancel()
337{
338 TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
339 TaskObserver to;
340 tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
341 tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
342 tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
343 tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
344 tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
345 AutoPtr<TestTask> pTT = new TestTask;
346 tm.start(pTT.duplicate());
347 assertTrue (pTT->progress() == 0);
348 Thread::sleep(200);
349 pTT->cont();
350 while (pTT->progress() != 0.5) Thread::sleep(50);
351 assertTrue (to.progress() == 0.5);
352 assertTrue (to.started());
353 assertTrue (pTT->state() == Task::TASK_RUNNING);
354 TaskManager::TaskList list = tm.taskList();
355 assertTrue (list.size() == 1);
356 assertTrue (tm.count() == 1);
357 tm.cancelAll();
358 assertTrue (to.cancelled());
359 pTT->cont();
360 while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
361 assertTrue (pTT->state() == Task::TASK_FINISHED);
362 assertTrue (to.finished());
363 while (tm.count() == 1) Thread::sleep(50);
364 list = tm.taskList();
365 assertTrue (list.empty());
366 assertTrue (!to.error());
367}
368
369
370void TaskManagerTest::testError()
371{
372 TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
373 TaskObserver to;
374 tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
375 tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
376 tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
377 tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
378 tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
379 AutoPtr<TestTask> pTT = new TestTask;
380 tm.start(pTT.duplicate());
381 assertTrue (pTT->progress() == 0);
382 Thread::sleep(200);
383 pTT->cont();
384 while (pTT->progress() != 0.5) Thread::sleep(50);
385 assertTrue (to.progress() == 0.5);
386 assertTrue (to.started());
387 assertTrue (pTT->state() == Task::TASK_RUNNING);
388 TaskManager::TaskList list = tm.taskList();
389 assertTrue (list.size() == 1);
390 assertTrue (tm.count() == 1);
391 pTT->fail();
392 pTT->cont();
393 while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
394 assertTrue (pTT->state() == Task::TASK_FINISHED);
395 assertTrue (to.finished());
396 assertTrue (to.error() != 0);
397 while (tm.count() == 1) Thread::sleep(50);
398 list = tm.taskList();
399 assertTrue (list.empty());
400}
401
402
403void TaskManagerTest::testCustom()
404{
405 TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
406
407 CustomTaskObserver<int> ti(0);
408 tm.addObserver(
409 Observer<CustomTaskObserver<int>, TaskCustomNotification<int> >
410 (ti, &CustomTaskObserver<int>::taskCustom));
411
412 AutoPtr<CustomNotificationTask<int> > pCNT1 = new CustomNotificationTask<int>(0);
413 tm.start(pCNT1.duplicate());
414 assertTrue (ti.custom() == 0);
415
416 for (int i = 1; i < 10; ++i)
417 {
418 pCNT1->setCustom(i);
419 assertTrue (ti.custom() == i);
420 }
421
422 CustomTaskObserver<std::string> ts("");
423 tm.addObserver(
424 Observer<CustomTaskObserver<std::string>, TaskCustomNotification<std::string> >
425 (ts, &CustomTaskObserver<std::string>::taskCustom));
426
427 AutoPtr<CustomNotificationTask<std::string> > pCNT2 = new CustomNotificationTask<std::string>("");
428 tm.start(pCNT2.duplicate());
429 assertTrue (tm.taskList().size() == 2);
430 assertTrue (ts.custom() == "");
431 std::string str("notify me");
432 pCNT2->setCustom(str);
433 assertTrue (ts.custom() == str);
434
435 S s;
436 s.i = 0;
437 s.str = "";
438
439 CustomTaskObserver<S*> ptst(&s);
440
441 tm.addObserver(
442 Observer<CustomTaskObserver<S*>, TaskCustomNotification<S*> >
443 (ptst, &CustomTaskObserver<S*>::taskCustom));
444
445 AutoPtr<CustomNotificationTask<S*> > pCNT3 = new CustomNotificationTask<S*>(&s);
446 tm.start(pCNT3.duplicate());
447 assertTrue (tm.taskList().size() == 3);
448 assertTrue (ptst.custom()->i == 0);
449 assertTrue (ptst.custom()->str == "");
450 s.i = 123;
451 s.str = "123";
452 pCNT3->setCustom(&s);
453 assertTrue (ptst.custom()->i == 123);
454 assertTrue (ptst.custom()->str == "123");
455
456 s.i = 0;
457 s.str = "";
458
459 CustomTaskObserver<S> tst(s);
460
461 tm.addObserver(
462 Observer<CustomTaskObserver<S>, TaskCustomNotification<S> >
463 (tst, &CustomTaskObserver<S>::taskCustom));
464
465 AutoPtr<CustomNotificationTask<S> > pCNT4 = new CustomNotificationTask<S>(s);
466 tm.start(pCNT4.duplicate());
467 assertTrue (tm.taskList().size() == 4);
468 assertTrue (tst.custom().i == 0);
469 assertTrue (tst.custom().str == "");
470 s.i = 123;
471 s.str = "123";
472 pCNT4->setCustom(s);
473 assertTrue (tst.custom().i == 123);
474 assertTrue (tst.custom().str == "123");
475
476 AutoPtr<SimpleTask> pST = new SimpleTask;
477 tm.start(pST.duplicate());
478 assertTrue (tm.taskList().size() == 5);
479
480 tm.cancelAll();
481 while (tm.count() > 0) Thread::sleep(50);
482 assertTrue (tm.count() == 0);
483}
484
485
486void TaskManagerTest::testMultiTasks()
487{
488 TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
489 tm.start(new SimpleTask);
490 tm.start(new SimpleTask);
491 tm.start(new SimpleTask);
492
493 TaskManager::TaskList list = tm.taskList();
494 assertTrue (list.size() == 3);
495
496 tm.cancelAll();
497 while (tm.count() > 0) Thread::sleep(100);
498 assertTrue (tm.count() == 0);
499}
500
501
502void TaskManagerTest::testTaskInclusion()
503{
504 TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
505 IncludingTask* pTask = new IncludingTask;
506
507 pTask->duplicate();
508
509 tm.start(pTask);
510 // wait for the included task to be started
511 while (pTask->progress() < 0.5)
512 {
513 Thread::sleep(100);
514 }
515 Thread::sleep(100);
516 assertTrue (tm.count() == 2);
517
518 tm.cancelAll();
519 while (tm.count() > 0) Thread::sleep(100);
520 assertTrue (tm.count() == 0);
521}
522
523
524void TaskManagerTest::testTaskQueue()
525{
526 TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
527 SimpleTaskQueue tq(tm);
528
529 Task* pT1 = new SimpleTask;
530 Task* pT2 = new SimpleTask;
531 Task* pT3 = new SimpleTask;
532 tq.enqueue(pT1);
533 tq.enqueue(pT2);
534 tq.startQueue();
535
536 assertTrue (tm.count() == 1);
537 Thread::sleep(500);
538 assertTrue (pT1->state() == Task::TASK_RUNNING);
539 assertTrue (pT2->state() == Task::TASK_IDLE);
540
541 tq.enqueue(pT3);
542 pT1->cancel();
543 Thread::sleep(500);
544 assertTrue (tm.count() == 1);
545 assertTrue (pT2->state() == Task::TASK_RUNNING);
546 assertTrue (pT3->state() == Task::TASK_IDLE);
547
548 pT2->cancel();
549 Thread::sleep(500);
550 assertTrue (tm.count() == 1);
551 assertTrue (pT3->state() == Task::TASK_RUNNING);
552
553 tm.cancelAll();
554 while (tm.count() > 0) Thread::sleep(100);
555 assertTrue (tm.count() == 0);
556}
557
558
559void TaskManagerTest::testCustomThreadPool()
560{
561 ThreadPool tp(2, 5, 120, POCO_THREAD_STACK_SIZE, ThreadPool::TAP_UNIFORM_DISTRIBUTION);
562 TaskManager tm(tp);
563
564 // fill up the thread pool
565 for (int i=0; i < tp.capacity(); ++i)
566 {
567 tm.start(new SimpleTask);
568 }
569 assertTrue (tp.allocated() == tp.capacity());
570 assertTrue (tm.count() == tp.allocated());
571
572 // the next one should fail
573 try
574 {
575 tm.start(new SimpleTask);
576 failmsg("thread pool exhausted - must throw exception");
577 }
578 catch (NoThreadAvailableException const&)
579 {
580 }
581 catch (...)
582 {
583 failmsg("wrong exception thrown");
584 }
585
586 assertTrue (tm.count() == tp.allocated());
587
588 tp.joinAll();
589}
590
591void TaskManagerTest::setUp()
592{
593}
594
595
596void TaskManagerTest::tearDown()
597{
598}
599
600
601CppUnit::Test* TaskManagerTest::suite()
602{
603 CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("TaskManagerTest");
604
605 CppUnit_addTest(pSuite, TaskManagerTest, testFinish);
606 CppUnit_addTest(pSuite, TaskManagerTest, testCancel);
607 CppUnit_addTest(pSuite, TaskManagerTest, testError);
608 CppUnit_addTest(pSuite, TaskManagerTest, testMultiTasks);
609 CppUnit_addTest(pSuite, TaskManagerTest, testCustom);
610 CppUnit_addTest(pSuite, TaskManagerTest, testCustomThreadPool);
611
612 return pSuite;
613}
614