1//
2// SocketReactor.cpp
3//
4// Library: Net
5// Package: Reactor
6// Module: SocketReactor
7//
8// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
9// and Contributors.
10//
11// SPDX-License-Identifier: BSL-1.0
12//
13
14
15#include "Poco/Net/SocketReactor.h"
16#include "Poco/Net/SocketNotification.h"
17#include "Poco/Net/SocketNotifier.h"
18#include "Poco/ErrorHandler.h"
19#include "Poco/Thread.h"
20#include "Poco/Exception.h"
21#ifdef max
22#undef max
23#endif
24
25using Poco::FastMutex;
26using Poco::Exception;
27using Poco::ErrorHandler;
28
29
30namespace Poco {
31namespace Net {
32
33
34SocketReactor::SocketReactor():
35 _stop(false),
36 _timeout(DEFAULT_TIMEOUT),
37 _pReadableNotification(new ReadableNotification(this)),
38 _pWritableNotification(new WritableNotification(this)),
39 _pErrorNotification(new ErrorNotification(this)),
40 _pTimeoutNotification(new TimeoutNotification(this)),
41 _pIdleNotification(new IdleNotification(this)),
42 _pShutdownNotification(new ShutdownNotification(this)),
43 _pThread(0)
44{
45}
46
47
48SocketReactor::SocketReactor(const Poco::Timespan& timeout):
49 _stop(false),
50 _timeout(timeout),
51 _pReadableNotification(new ReadableNotification(this)),
52 _pWritableNotification(new WritableNotification(this)),
53 _pErrorNotification(new ErrorNotification(this)),
54 _pTimeoutNotification(new TimeoutNotification(this)),
55 _pIdleNotification(new IdleNotification(this)),
56 _pShutdownNotification(new ShutdownNotification(this)),
57 _pThread(0)
58{
59}
60
61
62SocketReactor::~SocketReactor()
63{
64}
65
66
67void SocketReactor::run()
68{
69 _pThread = Thread::current();
70
71 Socket::SocketList readable;
72 Socket::SocketList writable;
73 Socket::SocketList except;
74
75 while (!_stop)
76 {
77 try
78 {
79 readable.clear();
80 writable.clear();
81 except.clear();
82 int nSockets = 0;
83 {
84 FastMutex::ScopedLock lock(_mutex);
85 for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
86 {
87 if (it->second->accepts(_pReadableNotification))
88 {
89 readable.push_back(it->first);
90 nSockets++;
91 }
92 if (it->second->accepts(_pWritableNotification))
93 {
94 writable.push_back(it->first);
95 nSockets++;
96 }
97 if (it->second->accepts(_pErrorNotification))
98 {
99 except.push_back(it->first);
100 nSockets++;
101 }
102 }
103 }
104 if (nSockets == 0)
105 {
106 onIdle();
107 Timespan::TimeDiff ms = _timeout.totalMilliseconds();
108 poco_assert_dbg(ms <= std::numeric_limits<long>::max());
109 Thread::trySleep(static_cast<long>(ms));
110 }
111 else if (Socket::select(readable, writable, except, _timeout))
112 {
113 onBusy();
114
115 for (Socket::SocketList::iterator it = readable.begin(); it != readable.end(); ++it)
116 dispatch(*it, _pReadableNotification);
117 for (Socket::SocketList::iterator it = writable.begin(); it != writable.end(); ++it)
118 dispatch(*it, _pWritableNotification);
119 for (Socket::SocketList::iterator it = except.begin(); it != except.end(); ++it)
120 dispatch(*it, _pErrorNotification);
121 }
122 else onTimeout();
123 }
124 catch (Exception& exc)
125 {
126 ErrorHandler::handle(exc);
127 }
128 catch (std::exception& exc)
129 {
130 ErrorHandler::handle(exc);
131 }
132 catch (...)
133 {
134 ErrorHandler::handle();
135 }
136 }
137 onShutdown();
138}
139
140
141void SocketReactor::stop()
142{
143 _stop = true;
144}
145
146
147void SocketReactor::wakeUp()
148{
149 if (_pThread) _pThread->wakeUp();
150}
151
152
153void SocketReactor::setTimeout(const Poco::Timespan& timeout)
154{
155 _timeout = timeout;
156}
157
158
159const Poco::Timespan& SocketReactor::getTimeout() const
160{
161 return _timeout;
162}
163
164
165void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
166{
167 NotifierPtr pNotifier;
168 {
169 FastMutex::ScopedLock lock(_mutex);
170
171 EventHandlerMap::iterator it = _handlers.find(socket);
172 if (it == _handlers.end())
173 {
174 pNotifier = new SocketNotifier(socket);
175 _handlers[socket] = pNotifier;
176 }
177 else pNotifier = it->second;
178
179 if (!pNotifier->hasObserver(observer))
180 pNotifier->addObserver(this, observer);
181 }
182}
183
184
185bool SocketReactor::hasEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
186{
187 NotifierPtr pNotifier;
188 {
189 FastMutex::ScopedLock lock(_mutex);
190
191 EventHandlerMap::iterator it = _handlers.find(socket);
192 if (it != _handlers.end())
193 {
194 if (it->second->hasObserver(observer))
195 return true;
196 }
197 }
198
199 return false;
200}
201
202
203void SocketReactor::removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
204{
205 NotifierPtr pNotifier;
206 {
207 FastMutex::ScopedLock lock(_mutex);
208
209 EventHandlerMap::iterator it = _handlers.find(socket);
210 if (it != _handlers.end())
211 {
212 pNotifier = it->second;
213 if (pNotifier->hasObserver(observer) && pNotifier->countObservers() == 1)
214 {
215 _handlers.erase(it);
216 }
217 }
218
219 if (pNotifier && pNotifier->hasObserver(observer))
220 {
221 pNotifier->removeObserver(this, observer);
222 }
223 }
224}
225
226
227void SocketReactor::onTimeout()
228{
229 dispatch(_pTimeoutNotification);
230}
231
232
233void SocketReactor::onIdle()
234{
235 dispatch(_pIdleNotification);
236}
237
238
239void SocketReactor::onShutdown()
240{
241 dispatch(_pShutdownNotification);
242}
243
244
245void SocketReactor::onBusy()
246{
247}
248
249
250void SocketReactor::dispatch(const Socket& socket, SocketNotification* pNotification)
251{
252 NotifierPtr pNotifier;
253 {
254 FastMutex::ScopedLock lock(_mutex);
255 EventHandlerMap::iterator it = _handlers.find(socket);
256 if (it != _handlers.end())
257 pNotifier = it->second;
258 else
259 return;
260 }
261 dispatch(pNotifier, pNotification);
262}
263
264
265void SocketReactor::dispatch(SocketNotification* pNotification)
266{
267 std::vector<NotifierPtr> delegates;
268 delegates.reserve(_handlers.size());
269 {
270 FastMutex::ScopedLock lock(_mutex);
271 for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
272 delegates.push_back(it->second);
273 }
274 for (std::vector<NotifierPtr>::iterator it = delegates.begin(); it != delegates.end(); ++it)
275 {
276 dispatch(*it, pNotification);
277 }
278}
279
280
281void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification)
282{
283 try
284 {
285 pNotifier->dispatch(pNotification);
286 }
287 catch (Exception& exc)
288 {
289 ErrorHandler::handle(exc);
290 }
291 catch (std::exception& exc)
292 {
293 ErrorHandler::handle(exc);
294 }
295 catch (...)
296 {
297 ErrorHandler::handle();
298 }
299}
300
301
302} } // namespace Poco::Net
303