1 | // |
2 | // TaskManagerTest.cpp |
3 | // |
4 | // Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH. |
5 | // and Contributors. |
6 | // |
7 | // SPDX-License-Identifier: BSL-1.0 |
8 | // |
9 | |
10 | |
11 | #include "TaskManagerTest.h" |
12 | #include "Poco/CppUnit/TestCaller.h" |
13 | #include "Poco/CppUnit/TestSuite.h" |
14 | #include "Poco/Exception.h" |
15 | #include "Poco/TaskManager.h" |
16 | #include "Poco/Task.h" |
17 | #include "Poco/TaskNotification.h" |
18 | #include "Poco/NotificationCenter.h" |
19 | #include "Poco/Thread.h" |
20 | #include "Poco/ThreadPool.h" |
21 | #include "Poco/Event.h" |
22 | #include "Poco/Observer.h" |
23 | #include "Poco/Exception.h" |
24 | #include "Poco/AutoPtr.h" |
25 | |
26 | |
27 | using Poco::TaskManager; |
28 | using Poco::Task; |
29 | using Poco::NotificationCenter; |
30 | using Poco::TaskStartedNotification; |
31 | using Poco::TaskCancelledNotification; |
32 | using Poco::TaskFinishedNotification; |
33 | using Poco::TaskFailedNotification; |
34 | using Poco::TaskProgressNotification; |
35 | using Poco::TaskCustomNotification; |
36 | using Poco::Thread; |
37 | using Poco::ThreadPool; |
38 | using Poco::Event; |
39 | using Poco::Observer; |
40 | using Poco::Exception; |
41 | using Poco::NoThreadAvailableException; |
42 | using Poco::SystemException; |
43 | using Poco::NullPointerException; |
44 | using Poco::AutoPtr; |
45 | |
46 | |
47 | namespace |
48 | { |
49 | class TestTask: public Task |
50 | { |
51 | public: |
52 | TestTask(): |
53 | Task("TestTask" ), |
54 | _fail(false) |
55 | { |
56 | } |
57 | |
58 | void runTask() |
59 | { |
60 | _event.wait(); |
61 | setProgress(0.5); |
62 | _event.wait(); |
63 | if (isCancelled()) |
64 | return; |
65 | if (_fail) |
66 | throw SystemException("warp core breach detected" ); |
67 | setProgress(1.0); |
68 | _event.wait(); |
69 | } |
70 | |
71 | void fail() |
72 | { |
73 | _fail = true; |
74 | } |
75 | |
76 | void cont() |
77 | { |
78 | _event.set(); |
79 | } |
80 | |
81 | private: |
82 | Event _event; |
83 | bool _fail; |
84 | }; |
85 | |
86 | class SimpleTask: public Task |
87 | { |
88 | public: |
89 | SimpleTask(): Task("SimpleTask" ) |
90 | { |
91 | } |
92 | |
93 | void runTask() |
94 | { |
95 | sleep(10000); |
96 | } |
97 | }; |
98 | |
99 | class IncludingTask: public Task |
100 | { |
101 | public: |
102 | IncludingTask(): Task("IncludingTask" ) |
103 | { |
104 | } |
105 | |
106 | void runTask() |
107 | { |
108 | setProgress(0.5); |
109 | getOwner()->startSync(new SimpleTask); |
110 | setProgress(1.0); |
111 | } |
112 | }; |
113 | |
114 | class TaskObserver |
115 | { |
116 | public: |
117 | TaskObserver(): |
118 | _started(false), |
119 | _cancelled(false), |
120 | _finished(false), |
121 | _pException(0), |
122 | _progress(0.0) |
123 | { |
124 | } |
125 | |
126 | ~TaskObserver() |
127 | { |
128 | delete _pException; |
129 | } |
130 | |
131 | void taskStarted(TaskStartedNotification* pNf) |
132 | { |
133 | _started = true; |
134 | pNf->release(); |
135 | } |
136 | |
137 | void taskCancelled(TaskCancelledNotification* pNf) |
138 | { |
139 | _cancelled = true; |
140 | pNf->release(); |
141 | } |
142 | |
143 | void taskFinished(TaskFinishedNotification* pNf) |
144 | { |
145 | _finished = true; |
146 | pNf->release(); |
147 | } |
148 | |
149 | void taskFailed(TaskFailedNotification* pNf) |
150 | { |
151 | _pException = pNf->reason().clone(); |
152 | pNf->release(); |
153 | } |
154 | |
155 | void taskProgress(TaskProgressNotification* pNf) |
156 | { |
157 | _progress = pNf->progress(); |
158 | pNf->release(); |
159 | } |
160 | |
161 | bool started() const |
162 | { |
163 | return _started; |
164 | } |
165 | |
166 | bool cancelled() const |
167 | { |
168 | return _cancelled; |
169 | } |
170 | |
171 | bool finished() const |
172 | { |
173 | return _finished; |
174 | } |
175 | |
176 | float progress() const |
177 | { |
178 | return _progress; |
179 | } |
180 | |
181 | Exception* error() const |
182 | { |
183 | return _pException; |
184 | } |
185 | |
186 | private: |
187 | bool _started; |
188 | bool _cancelled; |
189 | bool _finished; |
190 | Exception* _pException; |
191 | float _progress; |
192 | }; |
193 | |
194 | |
195 | template <typename T> |
196 | class CustomNotificationTask: public Task |
197 | { |
198 | public: |
199 | CustomNotificationTask(const T& t): |
200 | Task("CustomNotificationTask" ), |
201 | _custom(t) |
202 | { |
203 | } |
204 | |
205 | void runTask() |
206 | { |
207 | sleep(10000); |
208 | } |
209 | |
210 | void setCustom(const T& custom) |
211 | { |
212 | _custom = custom; |
213 | postNotification(new TaskCustomNotification<T>(this, _custom)); |
214 | } |
215 | |
216 | private: |
217 | T _custom; |
218 | }; |
219 | |
220 | |
221 | template <class C> |
222 | class CustomTaskObserver |
223 | { |
224 | public: |
225 | CustomTaskObserver(const C& custom): _custom(custom) |
226 | { |
227 | } |
228 | |
229 | ~CustomTaskObserver() |
230 | { |
231 | } |
232 | |
233 | void taskCustom(TaskCustomNotification<C>* pNf) |
234 | { |
235 | _custom = pNf->custom(); |
236 | pNf->release(); |
237 | } |
238 | |
239 | const C& custom() const |
240 | { |
241 | return _custom; |
242 | } |
243 | |
244 | private: |
245 | C _custom; |
246 | }; |
247 | |
248 | class SimpleTaskQueue |
249 | { |
250 | public: |
251 | SimpleTaskQueue(TaskManager& tm): _tm(tm) |
252 | { |
253 | _tm.addObserver(Observer<SimpleTaskQueue, TaskFinishedNotification>(*this, &SimpleTaskQueue::taskFinished)); |
254 | } |
255 | |
256 | void enqueue(Task* pTask) |
257 | { |
258 | _tasks.push_back(pTask); |
259 | } |
260 | |
261 | void startQueue() |
262 | { |
263 | if (_tm.count() == 0 && _tasks.size()) |
264 | { |
265 | Task* pTask = _tasks.back(); |
266 | // not thread-safe |
267 | _tasks.pop_back(); |
268 | _tm.start(pTask); |
269 | } |
270 | } |
271 | |
272 | void taskFinished(TaskFinishedNotification* pNf) |
273 | { |
274 | if (_tasks.size()) |
275 | { |
276 | Task* pTask = _tasks.back(); |
277 | // not thread-safe |
278 | _tasks.pop_back(); |
279 | _tm.startSync(pTask); |
280 | } |
281 | pNf->release(); |
282 | } |
283 | |
284 | private: |
285 | std::vector<Task*> _tasks; |
286 | TaskManager& _tm; |
287 | }; |
288 | } |
289 | |
290 | |
291 | TaskManagerTest::TaskManagerTest(const std::string& rName): CppUnit::TestCase(rName) |
292 | { |
293 | } |
294 | |
295 | |
296 | TaskManagerTest::~TaskManagerTest() |
297 | { |
298 | } |
299 | |
300 | |
301 | void TaskManagerTest::testFinish() |
302 | { |
303 | TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); |
304 | TaskObserver to; |
305 | tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted)); |
306 | tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled)); |
307 | tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed)); |
308 | tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished)); |
309 | tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress)); |
310 | AutoPtr<TestTask> pTT = new TestTask; |
311 | tm.start(pTT.duplicate()); |
312 | assertTrue (pTT->progress() == 0); |
313 | Thread::sleep(200); |
314 | pTT->cont(); |
315 | while (pTT->progress() != 0.5) Thread::sleep(50); |
316 | assertTrue (to.progress() == 0.5); |
317 | assertTrue (to.started()); |
318 | assertTrue (pTT->state() == Task::TASK_RUNNING); |
319 | TaskManager::TaskList list = tm.taskList(); |
320 | assertTrue (list.size() == 1); |
321 | assertTrue (tm.count() == 1); |
322 | pTT->cont(); |
323 | while (pTT->progress() != 1.0) Thread::sleep(50); |
324 | pTT->cont(); |
325 | while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); |
326 | assertTrue (pTT->state() == Task::TASK_FINISHED); |
327 | while (!to.finished()) Thread::sleep(50); |
328 | assertTrue (to.finished()); |
329 | while (tm.count() == 1) Thread::sleep(50); |
330 | list = tm.taskList(); |
331 | assertTrue (list.empty()); |
332 | assertTrue (!to.error()); |
333 | } |
334 | |
335 | |
336 | void TaskManagerTest::testCancel() |
337 | { |
338 | TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); |
339 | TaskObserver to; |
340 | tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted)); |
341 | tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled)); |
342 | tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed)); |
343 | tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished)); |
344 | tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress)); |
345 | AutoPtr<TestTask> pTT = new TestTask; |
346 | tm.start(pTT.duplicate()); |
347 | assertTrue (pTT->progress() == 0); |
348 | Thread::sleep(200); |
349 | pTT->cont(); |
350 | while (pTT->progress() != 0.5) Thread::sleep(50); |
351 | assertTrue (to.progress() == 0.5); |
352 | assertTrue (to.started()); |
353 | assertTrue (pTT->state() == Task::TASK_RUNNING); |
354 | TaskManager::TaskList list = tm.taskList(); |
355 | assertTrue (list.size() == 1); |
356 | assertTrue (tm.count() == 1); |
357 | tm.cancelAll(); |
358 | assertTrue (to.cancelled()); |
359 | pTT->cont(); |
360 | while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); |
361 | assertTrue (pTT->state() == Task::TASK_FINISHED); |
362 | assertTrue (to.finished()); |
363 | while (tm.count() == 1) Thread::sleep(50); |
364 | list = tm.taskList(); |
365 | assertTrue (list.empty()); |
366 | assertTrue (!to.error()); |
367 | } |
368 | |
369 | |
370 | void TaskManagerTest::testError() |
371 | { |
372 | TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); |
373 | TaskObserver to; |
374 | tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted)); |
375 | tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled)); |
376 | tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed)); |
377 | tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished)); |
378 | tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress)); |
379 | AutoPtr<TestTask> pTT = new TestTask; |
380 | tm.start(pTT.duplicate()); |
381 | assertTrue (pTT->progress() == 0); |
382 | Thread::sleep(200); |
383 | pTT->cont(); |
384 | while (pTT->progress() != 0.5) Thread::sleep(50); |
385 | assertTrue (to.progress() == 0.5); |
386 | assertTrue (to.started()); |
387 | assertTrue (pTT->state() == Task::TASK_RUNNING); |
388 | TaskManager::TaskList list = tm.taskList(); |
389 | assertTrue (list.size() == 1); |
390 | assertTrue (tm.count() == 1); |
391 | pTT->fail(); |
392 | pTT->cont(); |
393 | while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); |
394 | assertTrue (pTT->state() == Task::TASK_FINISHED); |
395 | assertTrue (to.finished()); |
396 | assertTrue (to.error() != 0); |
397 | while (tm.count() == 1) Thread::sleep(50); |
398 | list = tm.taskList(); |
399 | assertTrue (list.empty()); |
400 | } |
401 | |
402 | |
403 | void TaskManagerTest::testCustom() |
404 | { |
405 | TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); |
406 | |
407 | CustomTaskObserver<int> ti(0); |
408 | tm.addObserver( |
409 | Observer<CustomTaskObserver<int>, TaskCustomNotification<int> > |
410 | (ti, &CustomTaskObserver<int>::taskCustom)); |
411 | |
412 | AutoPtr<CustomNotificationTask<int> > pCNT1 = new CustomNotificationTask<int>(0); |
413 | tm.start(pCNT1.duplicate()); |
414 | assertTrue (ti.custom() == 0); |
415 | |
416 | for (int i = 1; i < 10; ++i) |
417 | { |
418 | pCNT1->setCustom(i); |
419 | assertTrue (ti.custom() == i); |
420 | } |
421 | |
422 | CustomTaskObserver<std::string> ts("" ); |
423 | tm.addObserver( |
424 | Observer<CustomTaskObserver<std::string>, TaskCustomNotification<std::string> > |
425 | (ts, &CustomTaskObserver<std::string>::taskCustom)); |
426 | |
427 | AutoPtr<CustomNotificationTask<std::string> > pCNT2 = new CustomNotificationTask<std::string>("" ); |
428 | tm.start(pCNT2.duplicate()); |
429 | assertTrue (tm.taskList().size() == 2); |
430 | assertTrue (ts.custom() == "" ); |
431 | std::string str("notify me" ); |
432 | pCNT2->setCustom(str); |
433 | assertTrue (ts.custom() == str); |
434 | |
435 | S s; |
436 | s.i = 0; |
437 | s.str = "" ; |
438 | |
439 | CustomTaskObserver<S*> ptst(&s); |
440 | |
441 | tm.addObserver( |
442 | Observer<CustomTaskObserver<S*>, TaskCustomNotification<S*> > |
443 | (ptst, &CustomTaskObserver<S*>::taskCustom)); |
444 | |
445 | AutoPtr<CustomNotificationTask<S*> > pCNT3 = new CustomNotificationTask<S*>(&s); |
446 | tm.start(pCNT3.duplicate()); |
447 | assertTrue (tm.taskList().size() == 3); |
448 | assertTrue (ptst.custom()->i == 0); |
449 | assertTrue (ptst.custom()->str == "" ); |
450 | s.i = 123; |
451 | s.str = "123" ; |
452 | pCNT3->setCustom(&s); |
453 | assertTrue (ptst.custom()->i == 123); |
454 | assertTrue (ptst.custom()->str == "123" ); |
455 | |
456 | s.i = 0; |
457 | s.str = "" ; |
458 | |
459 | CustomTaskObserver<S> tst(s); |
460 | |
461 | tm.addObserver( |
462 | Observer<CustomTaskObserver<S>, TaskCustomNotification<S> > |
463 | (tst, &CustomTaskObserver<S>::taskCustom)); |
464 | |
465 | AutoPtr<CustomNotificationTask<S> > pCNT4 = new CustomNotificationTask<S>(s); |
466 | tm.start(pCNT4.duplicate()); |
467 | assertTrue (tm.taskList().size() == 4); |
468 | assertTrue (tst.custom().i == 0); |
469 | assertTrue (tst.custom().str == "" ); |
470 | s.i = 123; |
471 | s.str = "123" ; |
472 | pCNT4->setCustom(s); |
473 | assertTrue (tst.custom().i == 123); |
474 | assertTrue (tst.custom().str == "123" ); |
475 | |
476 | AutoPtr<SimpleTask> pST = new SimpleTask; |
477 | tm.start(pST.duplicate()); |
478 | assertTrue (tm.taskList().size() == 5); |
479 | |
480 | tm.cancelAll(); |
481 | while (tm.count() > 0) Thread::sleep(50); |
482 | assertTrue (tm.count() == 0); |
483 | } |
484 | |
485 | |
486 | void TaskManagerTest::testMultiTasks() |
487 | { |
488 | TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); |
489 | tm.start(new SimpleTask); |
490 | tm.start(new SimpleTask); |
491 | tm.start(new SimpleTask); |
492 | |
493 | TaskManager::TaskList list = tm.taskList(); |
494 | assertTrue (list.size() == 3); |
495 | |
496 | tm.cancelAll(); |
497 | while (tm.count() > 0) Thread::sleep(100); |
498 | assertTrue (tm.count() == 0); |
499 | } |
500 | |
501 | |
502 | void TaskManagerTest::testTaskInclusion() |
503 | { |
504 | TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); |
505 | IncludingTask* pTask = new IncludingTask; |
506 | |
507 | pTask->duplicate(); |
508 | |
509 | tm.start(pTask); |
510 | // wait for the included task to be started |
511 | while (pTask->progress() < 0.5) |
512 | { |
513 | Thread::sleep(100); |
514 | } |
515 | Thread::sleep(100); |
516 | assertTrue (tm.count() == 2); |
517 | |
518 | tm.cancelAll(); |
519 | while (tm.count() > 0) Thread::sleep(100); |
520 | assertTrue (tm.count() == 0); |
521 | } |
522 | |
523 | |
524 | void TaskManagerTest::testTaskQueue() |
525 | { |
526 | TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); |
527 | SimpleTaskQueue tq(tm); |
528 | |
529 | Task* pT1 = new SimpleTask; |
530 | Task* pT2 = new SimpleTask; |
531 | Task* pT3 = new SimpleTask; |
532 | tq.enqueue(pT1); |
533 | tq.enqueue(pT2); |
534 | tq.startQueue(); |
535 | |
536 | assertTrue (tm.count() == 1); |
537 | Thread::sleep(500); |
538 | assertTrue (pT1->state() == Task::TASK_RUNNING); |
539 | assertTrue (pT2->state() == Task::TASK_IDLE); |
540 | |
541 | tq.enqueue(pT3); |
542 | pT1->cancel(); |
543 | Thread::sleep(500); |
544 | assertTrue (tm.count() == 1); |
545 | assertTrue (pT2->state() == Task::TASK_RUNNING); |
546 | assertTrue (pT3->state() == Task::TASK_IDLE); |
547 | |
548 | pT2->cancel(); |
549 | Thread::sleep(500); |
550 | assertTrue (tm.count() == 1); |
551 | assertTrue (pT3->state() == Task::TASK_RUNNING); |
552 | |
553 | tm.cancelAll(); |
554 | while (tm.count() > 0) Thread::sleep(100); |
555 | assertTrue (tm.count() == 0); |
556 | } |
557 | |
558 | |
559 | void TaskManagerTest::testCustomThreadPool() |
560 | { |
561 | ThreadPool tp(2, 5, 120, POCO_THREAD_STACK_SIZE, ThreadPool::TAP_UNIFORM_DISTRIBUTION); |
562 | TaskManager tm(tp); |
563 | |
564 | // fill up the thread pool |
565 | for (int i=0; i < tp.capacity(); ++i) |
566 | { |
567 | tm.start(new SimpleTask); |
568 | } |
569 | assertTrue (tp.allocated() == tp.capacity()); |
570 | assertTrue (tm.count() == tp.allocated()); |
571 | |
572 | // the next one should fail |
573 | try |
574 | { |
575 | tm.start(new SimpleTask); |
576 | failmsg("thread pool exhausted - must throw exception" ); |
577 | } |
578 | catch (NoThreadAvailableException const&) |
579 | { |
580 | } |
581 | catch (...) |
582 | { |
583 | failmsg("wrong exception thrown" ); |
584 | } |
585 | |
586 | assertTrue (tm.count() == tp.allocated()); |
587 | |
588 | tp.joinAll(); |
589 | } |
590 | |
591 | void TaskManagerTest::setUp() |
592 | { |
593 | } |
594 | |
595 | |
596 | void TaskManagerTest::tearDown() |
597 | { |
598 | } |
599 | |
600 | |
601 | CppUnit::Test* TaskManagerTest::suite() |
602 | { |
603 | CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("TaskManagerTest" ); |
604 | |
605 | CppUnit_addTest(pSuite, TaskManagerTest, testFinish); |
606 | CppUnit_addTest(pSuite, TaskManagerTest, testCancel); |
607 | CppUnit_addTest(pSuite, TaskManagerTest, testError); |
608 | CppUnit_addTest(pSuite, TaskManagerTest, testMultiTasks); |
609 | CppUnit_addTest(pSuite, TaskManagerTest, testCustom); |
610 | CppUnit_addTest(pSuite, TaskManagerTest, testCustomThreadPool); |
611 | |
612 | return pSuite; |
613 | } |
614 | |