1//
2// NotificationQueue.cpp
3//
4// This sample demonstrates the NotificationQueue, ThreadPool,
5// FastMutex and ScopedLock classes.
6//
7// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
8// and Contributors.
9//
10// SPDX-License-Identifier: BSL-1.0
11//
12
13
14#include "Poco/Notification.h"
15#include "Poco/NotificationQueue.h"
16#include "Poco/ThreadPool.h"
17#include "Poco/Thread.h"
18#include "Poco/Runnable.h"
19#include "Poco/Mutex.h"
20#include "Poco/Random.h"
21#include "Poco/AutoPtr.h"
22#include <iostream>
23
24
25using Poco::Notification;
26using Poco::NotificationQueue;
27using Poco::ThreadPool;
28using Poco::Thread;
29using Poco::Runnable;
30using Poco::FastMutex;
31using Poco::AutoPtr;
32
33
34class WorkNotification: public Notification
35 // The notification sent to worker threads.
36{
37public:
38 typedef AutoPtr<WorkNotification> Ptr;
39
40 WorkNotification(int workData):
41 _data(workData)
42 {
43 }
44
45 int data() const
46 {
47 return _data;
48 }
49
50private:
51 int _data;
52};
53
54
55class Worker: public Runnable
56 // A worker thread that gets work items
57 // from a NotificationQueue.
58{
59public:
60 Worker(const std::string& name, NotificationQueue& queue):
61 _name(name),
62 _queue(queue)
63 {
64 }
65
66 void run()
67 {
68 Poco::Random rnd;
69 for (;;)
70 {
71 Notification::Ptr pNf(_queue.waitDequeueNotification());
72 if (pNf)
73 {
74 WorkNotification::Ptr pWorkNf = pNf.cast<WorkNotification>();
75 if (pWorkNf)
76 {
77 {
78 FastMutex::ScopedLock lock(_mutex);
79 std::cout << _name << " got work notification " << pWorkNf->data() << std::endl;
80 }
81 Thread::sleep(rnd.next(200));
82 }
83 }
84 else break;
85 }
86 }
87
88private:
89 std::string _name;
90 NotificationQueue& _queue;
91 static FastMutex _mutex;
92};
93
94
95FastMutex Worker::_mutex;
96
97
98int main(int argc, char** argv)
99{
100 NotificationQueue queue;
101
102 // create some worker threads
103 Worker worker1("Worker 1", queue);
104 Worker worker2("Worker 2", queue);
105 Worker worker3("Worker 3", queue);
106
107 // start worker threads
108 ThreadPool::defaultPool().start(worker1);
109 ThreadPool::defaultPool().start(worker2);
110 ThreadPool::defaultPool().start(worker3);
111
112 // distribute some work
113 for (int i = 0; i < 50; ++i)
114 {
115 queue.enqueueNotification(new WorkNotification(i));
116 }
117
118 // wait until queue is empty and all threads are
119 // waiting for new work.
120 while (!queue.empty()) Thread::sleep(200);
121 Thread::sleep(500);
122
123 // stop all worker threads
124 queue.wakeUpAll();
125 ThreadPool::defaultPool().joinAll();
126
127 return 0;
128}
129