1//
2// SocketReactor.cpp
3//
4// Library: Net
5// Package: Reactor
6// Module: SocketReactor
7//
8// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
9// and Contributors.
10//
11// SPDX-License-Identifier: BSL-1.0
12//
13
14
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/SocketNotifier.h"
#include "Poco/ErrorHandler.h"
#include "Poco/Thread.h"
#include "Poco/Exception.h"
#ifdef max
#undef max
#endif
#include <limits>
24
25using Poco::Exception;
26using Poco::ErrorHandler;
27
28
29namespace Poco {
30namespace Net {
31
32
33SocketReactor::SocketReactor():
34 _stop(false),
35 _timeout(DEFAULT_TIMEOUT),
36 _pReadableNotification(new ReadableNotification(this)),
37 _pWritableNotification(new WritableNotification(this)),
38 _pErrorNotification(new ErrorNotification(this)),
39 _pTimeoutNotification(new TimeoutNotification(this)),
40 _pIdleNotification(new IdleNotification(this)),
41 _pShutdownNotification(new ShutdownNotification(this)),
42 _pThread(0)
43{
44}
45
46
47SocketReactor::SocketReactor(const Poco::Timespan& timeout):
48 _stop(false),
49 _timeout(timeout),
50 _pReadableNotification(new ReadableNotification(this)),
51 _pWritableNotification(new WritableNotification(this)),
52 _pErrorNotification(new ErrorNotification(this)),
53 _pTimeoutNotification(new TimeoutNotification(this)),
54 _pIdleNotification(new IdleNotification(this)),
55 _pShutdownNotification(new ShutdownNotification(this)),
56 _pThread(0)
57{
58}
59
60
61SocketReactor::~SocketReactor()
62{
63}
64
65
66void SocketReactor::run()
67{
68 _pThread = Thread::current();
69 while (!_stop)
70 {
71 try
72 {
73 if (!hasSocketHandlers())
74 {
75 onIdle();
76 Timespan::TimeDiff ms = _timeout.totalMilliseconds();
77 poco_assert_dbg(ms <= std::numeric_limits<long>::max());
78 Thread::trySleep(static_cast<long>(ms));
79 }
80 else
81 {
82 bool readable = false;
83 PollSet::SocketModeMap sm = _pollSet.poll(_timeout);
84 if (sm.size() > 0)
85 {
86 onBusy();
87 PollSet::SocketModeMap::iterator it = sm.begin();
88 PollSet::SocketModeMap::iterator end = sm.end();
89 for (; it != end; ++it)
90 {
91 if (it->second & PollSet::POLL_READ)
92 {
93 dispatch(it->first, _pReadableNotification);
94 readable = true;
95 }
96 if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification);
97 if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification);
98 }
99 }
100 if (!readable) onTimeout();
101 }
102 }
103 catch (Exception& exc)
104 {
105 ErrorHandler::handle(exc);
106 }
107 catch (std::exception& exc)
108 {
109 ErrorHandler::handle(exc);
110 }
111 catch (...)
112 {
113 ErrorHandler::handle();
114 }
115 }
116 onShutdown();
117}
118
119
120bool SocketReactor::hasSocketHandlers()
121{
122 if (!_pollSet.empty())
123 {
124 ScopedLock lock(_mutex);
125 for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
126 {
127 if (it->second->accepts(_pReadableNotification) ||
128 it->second->accepts(_pWritableNotification) ||
129 it->second->accepts(_pErrorNotification)) return true;
130 }
131 }
132
133 return false;
134}
135
136
137void SocketReactor::stop()
138{
139 _stop = true;
140}
141
142
143void SocketReactor::wakeUp()
144{
145 if (_pThread) _pThread->wakeUp();
146}
147
148
149void SocketReactor::setTimeout(const Poco::Timespan& timeout)
150{
151 _timeout = timeout;
152}
153
154
155const Poco::Timespan& SocketReactor::getTimeout() const
156{
157 return _timeout;
158}
159
160
161void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
162{
163 NotifierPtr pNotifier = getNotifier(socket, true);
164
165 if (!pNotifier->hasObserver(observer)) pNotifier->addObserver(this, observer);
166
167 int mode = 0;
168 if (pNotifier->accepts(_pReadableNotification)) mode |= PollSet::POLL_READ;
169 if (pNotifier->accepts(_pWritableNotification)) mode |= PollSet::POLL_WRITE;
170 if (pNotifier->accepts(_pErrorNotification)) mode |= PollSet::POLL_ERROR;
171 if (mode) _pollSet.add(socket, mode);
172}
173
174
175bool SocketReactor::hasEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
176{
177 NotifierPtr pNotifier = getNotifier(socket);
178 if (!pNotifier) return false;
179 if (pNotifier->hasObserver(observer)) return true;
180 return false;
181}
182
183
184SocketReactor::NotifierPtr SocketReactor::getNotifier(const Socket& socket, bool makeNew)
185{
186 ScopedLock lock(_mutex);
187
188 EventHandlerMap::iterator it = _handlers.find(socket);
189 if (it != _handlers.end()) return it->second;
190 else if (makeNew) return (_handlers[socket] = new SocketNotifier(socket));
191
192 return 0;
193}
194
195
196void SocketReactor::removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
197{
198 NotifierPtr pNotifier = getNotifier(socket);
199 if (pNotifier && pNotifier->hasObserver(observer))
200 {
201 if(pNotifier->countObservers() == 1)
202 {
203 {
204 ScopedLock lock(_mutex);
205 _handlers.erase(socket);
206 }
207 _pollSet.remove(socket);
208 }
209 pNotifier->removeObserver(this, observer);
210 }
211}
212
213
214bool SocketReactor::has(const Socket& socket) const
215{
216 return _pollSet.has(socket);
217}
218
219
220void SocketReactor::onTimeout()
221{
222 dispatch(_pTimeoutNotification);
223}
224
225
226void SocketReactor::onIdle()
227{
228 dispatch(_pIdleNotification);
229}
230
231
232void SocketReactor::onShutdown()
233{
234 dispatch(_pShutdownNotification);
235}
236
237
238void SocketReactor::onBusy()
239{
240}
241
242
243void SocketReactor::dispatch(const Socket& socket, SocketNotification* pNotification)
244{
245 NotifierPtr pNotifier = getNotifier(socket);
246 if (!pNotifier) return;
247 dispatch(pNotifier, pNotification);
248}
249
250
251void SocketReactor::dispatch(SocketNotification* pNotification)
252{
253 std::vector<NotifierPtr> delegates;
254 {
255 ScopedLock lock(_mutex);
256 delegates.reserve(_handlers.size());
257 for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
258 delegates.push_back(it->second);
259 }
260 for (std::vector<NotifierPtr>::iterator it = delegates.begin(); it != delegates.end(); ++it)
261 {
262 dispatch(*it, pNotification);
263 }
264}
265
266
267void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification)
268{
269 try
270 {
271 pNotifier->dispatch(pNotification);
272 }
273 catch (Exception& exc)
274 {
275 ErrorHandler::handle(exc);
276 }
277 catch (std::exception& exc)
278 {
279 ErrorHandler::handle(exc);
280 }
281 catch (...)
282 {
283 ErrorHandler::handle();
284 }
285}
286
287
288} } // namespace Poco::Net
289