1 | // |
2 | // SocketReactor.cpp |
3 | // |
4 | // Library: Net |
5 | // Package: Reactor |
6 | // Module: SocketReactor |
7 | // |
8 | // Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. |
9 | // and Contributors. |
10 | // |
11 | // SPDX-License-Identifier: BSL-1.0 |
12 | // |
13 | |
14 | |
15 | #include "Poco/Net/SocketReactor.h" |
16 | #include "Poco/Net/SocketNotification.h" |
17 | #include "Poco/Net/SocketNotifier.h" |
18 | #include "Poco/ErrorHandler.h" |
19 | #include "Poco/Thread.h" |
20 | #include "Poco/Exception.h" |
21 | #ifdef max |
22 | #undef max |
23 | #endif |
24 | |
25 | using Poco::Exception; |
26 | using Poco::ErrorHandler; |
27 | |
28 | |
29 | namespace Poco { |
30 | namespace Net { |
31 | |
32 | |
33 | SocketReactor::SocketReactor(): |
34 | _stop(false), |
35 | _timeout(DEFAULT_TIMEOUT), |
36 | _pReadableNotification(new ReadableNotification(this)), |
37 | _pWritableNotification(new WritableNotification(this)), |
38 | _pErrorNotification(new ErrorNotification(this)), |
39 | _pTimeoutNotification(new TimeoutNotification(this)), |
40 | _pIdleNotification(new IdleNotification(this)), |
41 | _pShutdownNotification(new ShutdownNotification(this)), |
42 | _pThread(0) |
43 | { |
44 | } |
45 | |
46 | |
47 | SocketReactor::SocketReactor(const Poco::Timespan& timeout): |
48 | _stop(false), |
49 | _timeout(timeout), |
50 | _pReadableNotification(new ReadableNotification(this)), |
51 | _pWritableNotification(new WritableNotification(this)), |
52 | _pErrorNotification(new ErrorNotification(this)), |
53 | _pTimeoutNotification(new TimeoutNotification(this)), |
54 | _pIdleNotification(new IdleNotification(this)), |
55 | _pShutdownNotification(new ShutdownNotification(this)), |
56 | _pThread(0) |
57 | { |
58 | } |
59 | |
60 | |
61 | SocketReactor::~SocketReactor() |
62 | { |
63 | } |
64 | |
65 | |
66 | void SocketReactor::run() |
67 | { |
68 | _pThread = Thread::current(); |
69 | while (!_stop) |
70 | { |
71 | try |
72 | { |
73 | if (!hasSocketHandlers()) |
74 | { |
75 | onIdle(); |
76 | Timespan::TimeDiff ms = _timeout.totalMilliseconds(); |
77 | poco_assert_dbg(ms <= std::numeric_limits<long>::max()); |
78 | Thread::trySleep(static_cast<long>(ms)); |
79 | } |
80 | else |
81 | { |
82 | bool readable = false; |
83 | PollSet::SocketModeMap sm = _pollSet.poll(_timeout); |
84 | if (sm.size() > 0) |
85 | { |
86 | onBusy(); |
87 | PollSet::SocketModeMap::iterator it = sm.begin(); |
88 | PollSet::SocketModeMap::iterator end = sm.end(); |
89 | for (; it != end; ++it) |
90 | { |
91 | if (it->second & PollSet::POLL_READ) |
92 | { |
93 | dispatch(it->first, _pReadableNotification); |
94 | readable = true; |
95 | } |
96 | if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification); |
97 | if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification); |
98 | } |
99 | } |
100 | if (!readable) onTimeout(); |
101 | } |
102 | } |
103 | catch (Exception& exc) |
104 | { |
105 | ErrorHandler::handle(exc); |
106 | } |
107 | catch (std::exception& exc) |
108 | { |
109 | ErrorHandler::handle(exc); |
110 | } |
111 | catch (...) |
112 | { |
113 | ErrorHandler::handle(); |
114 | } |
115 | } |
116 | onShutdown(); |
117 | } |
118 | |
119 | |
120 | bool SocketReactor::hasSocketHandlers() |
121 | { |
122 | if (!_pollSet.empty()) |
123 | { |
124 | ScopedLock lock(_mutex); |
125 | for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it) |
126 | { |
127 | if (it->second->accepts(_pReadableNotification) || |
128 | it->second->accepts(_pWritableNotification) || |
129 | it->second->accepts(_pErrorNotification)) return true; |
130 | } |
131 | } |
132 | |
133 | return false; |
134 | } |
135 | |
136 | |
137 | void SocketReactor::stop() |
138 | { |
139 | _stop = true; |
140 | } |
141 | |
142 | |
143 | void SocketReactor::wakeUp() |
144 | { |
145 | if (_pThread) _pThread->wakeUp(); |
146 | } |
147 | |
148 | |
149 | void SocketReactor::setTimeout(const Poco::Timespan& timeout) |
150 | { |
151 | _timeout = timeout; |
152 | } |
153 | |
154 | |
155 | const Poco::Timespan& SocketReactor::getTimeout() const |
156 | { |
157 | return _timeout; |
158 | } |
159 | |
160 | |
161 | void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) |
162 | { |
163 | NotifierPtr pNotifier = getNotifier(socket, true); |
164 | |
165 | if (!pNotifier->hasObserver(observer)) pNotifier->addObserver(this, observer); |
166 | |
167 | int mode = 0; |
168 | if (pNotifier->accepts(_pReadableNotification)) mode |= PollSet::POLL_READ; |
169 | if (pNotifier->accepts(_pWritableNotification)) mode |= PollSet::POLL_WRITE; |
170 | if (pNotifier->accepts(_pErrorNotification)) mode |= PollSet::POLL_ERROR; |
171 | if (mode) _pollSet.add(socket, mode); |
172 | } |
173 | |
174 | |
175 | bool SocketReactor::hasEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) |
176 | { |
177 | NotifierPtr pNotifier = getNotifier(socket); |
178 | if (!pNotifier) return false; |
179 | if (pNotifier->hasObserver(observer)) return true; |
180 | return false; |
181 | } |
182 | |
183 | |
184 | SocketReactor::NotifierPtr SocketReactor::getNotifier(const Socket& socket, bool makeNew) |
185 | { |
186 | ScopedLock lock(_mutex); |
187 | |
188 | EventHandlerMap::iterator it = _handlers.find(socket); |
189 | if (it != _handlers.end()) return it->second; |
190 | else if (makeNew) return (_handlers[socket] = new SocketNotifier(socket)); |
191 | |
192 | return 0; |
193 | } |
194 | |
195 | |
196 | void SocketReactor::removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) |
197 | { |
198 | NotifierPtr pNotifier = getNotifier(socket); |
199 | if (pNotifier && pNotifier->hasObserver(observer)) |
200 | { |
201 | if(pNotifier->countObservers() == 1) |
202 | { |
203 | { |
204 | ScopedLock lock(_mutex); |
205 | _handlers.erase(socket); |
206 | } |
207 | _pollSet.remove(socket); |
208 | } |
209 | pNotifier->removeObserver(this, observer); |
210 | } |
211 | } |
212 | |
213 | |
214 | bool SocketReactor::has(const Socket& socket) const |
215 | { |
216 | return _pollSet.has(socket); |
217 | } |
218 | |
219 | |
220 | void SocketReactor::onTimeout() |
221 | { |
222 | dispatch(_pTimeoutNotification); |
223 | } |
224 | |
225 | |
226 | void SocketReactor::onIdle() |
227 | { |
228 | dispatch(_pIdleNotification); |
229 | } |
230 | |
231 | |
232 | void SocketReactor::onShutdown() |
233 | { |
234 | dispatch(_pShutdownNotification); |
235 | } |
236 | |
237 | |
238 | void SocketReactor::onBusy() |
239 | { |
240 | } |
241 | |
242 | |
243 | void SocketReactor::dispatch(const Socket& socket, SocketNotification* pNotification) |
244 | { |
245 | NotifierPtr pNotifier = getNotifier(socket); |
246 | if (!pNotifier) return; |
247 | dispatch(pNotifier, pNotification); |
248 | } |
249 | |
250 | |
251 | void SocketReactor::dispatch(SocketNotification* pNotification) |
252 | { |
253 | std::vector<NotifierPtr> delegates; |
254 | { |
255 | ScopedLock lock(_mutex); |
256 | delegates.reserve(_handlers.size()); |
257 | for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it) |
258 | delegates.push_back(it->second); |
259 | } |
260 | for (std::vector<NotifierPtr>::iterator it = delegates.begin(); it != delegates.end(); ++it) |
261 | { |
262 | dispatch(*it, pNotification); |
263 | } |
264 | } |
265 | |
266 | |
267 | void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification) |
268 | { |
269 | try |
270 | { |
271 | pNotifier->dispatch(pNotification); |
272 | } |
273 | catch (Exception& exc) |
274 | { |
275 | ErrorHandler::handle(exc); |
276 | } |
277 | catch (std::exception& exc) |
278 | { |
279 | ErrorHandler::handle(exc); |
280 | } |
281 | catch (...) |
282 | { |
283 | ErrorHandler::handle(); |
284 | } |
285 | } |
286 | |
287 | |
288 | } } // namespace Poco::Net |
289 | |