1//
2// TCPServerDispatcher.cpp
3//
4// Library: Net
5// Package: TCPServer
6// Module: TCPServerDispatcher
7//
8// Copyright (c) 2005-2007, Applied Informatics Software Engineering GmbH.
9// and Contributors.
10//
11// SPDX-License-Identifier: BSL-1.0
12//
13
14
15#include "Poco/Net/TCPServerDispatcher.h"
16#include "Poco/Net/TCPServerConnectionFactory.h"
17#include "Poco/Notification.h"
18#include "Poco/AutoPtr.h"
19#include "Poco/ErrorHandler.h"
20#include <memory>
21
22
23using Poco::Notification;
24using Poco::FastMutex;
25using Poco::AutoPtr;
26
27
28namespace Poco {
29namespace Net {
30
31
32class TCPConnectionNotification: public Notification
33{
34public:
35 TCPConnectionNotification(const StreamSocket& socket):
36 _socket(socket)
37 {
38 }
39
40 ~TCPConnectionNotification()
41 {
42 }
43
44 const StreamSocket& socket() const
45 {
46 return _socket;
47 }
48
49private:
50 StreamSocket _socket;
51};
52
53
54TCPServerDispatcher::TCPServerDispatcher(TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool& threadPool, TCPServerParams::Ptr pParams):
55 _rc(1),
56 _pParams(pParams),
57 _currentThreads(0),
58 _totalConnections(0),
59 _currentConnections(0),
60 _maxConcurrentConnections(0),
61 _refusedConnections(0),
62 _stopped(false),
63 _pConnectionFactory(pFactory),
64 _threadPool(threadPool)
65{
66 poco_check_ptr (pFactory);
67
68 if (!_pParams)
69 _pParams = new TCPServerParams;
70
71 if (_pParams->getMaxThreads() == 0)
72 _pParams->setMaxThreads(threadPool.capacity());
73}
74
75
76TCPServerDispatcher::~TCPServerDispatcher()
77{
78}
79
80
81void TCPServerDispatcher::duplicate()
82{
83 ++_rc;
84}
85
86
87void TCPServerDispatcher::release()
88{
89 if (--_rc == 0) delete this;
90}
91
92
93void TCPServerDispatcher::run()
94{
95 AutoPtr<TCPServerDispatcher> guard(this, true); // ensure object stays alive
96
97 int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
98
99 for (;;)
100 {
101 {
102 ThreadCountWatcher tcw(this);
103 try
104 {
105 AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
106 if (pNf)
107 {
108 TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
109 if (pCNf)
110 {
111 std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
112 poco_check_ptr(pConnection.get());
113 beginConnection();
114 pConnection->start();
115 endConnection();
116 }
117 }
118 }
119 catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
120 catch (std::exception &exc) { ErrorHandler::handle(exc); }
121 catch (...) { ErrorHandler::handle(); }
122 }
123 if (_stopped || (_currentThreads > 1 && _queue.empty())) break;
124 }
125}
126
127
128namespace
129{
130 static const std::string threadName("TCPServerConnection");
131}
132
133
134void TCPServerDispatcher::enqueue(const StreamSocket& socket)
135{
136 FastMutex::ScopedLock lock(_mutex);
137
138 if (_queue.size() < _pParams->getMaxQueued())
139 {
140 if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
141 {
142 try
143 {
144 _threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
145 ++_currentThreads;
146 }
147 catch (Poco::Exception&)
148 {
149 ++_refusedConnections;
150 return;
151 }
152 }
153 _queue.enqueueNotification(new TCPConnectionNotification(socket));
154 }
155 else
156 {
157 ++_refusedConnections;
158 }
159}
160
161
162void TCPServerDispatcher::stop()
163{
164 _stopped = true;
165 _queue.clear();
166 _queue.wakeUpAll();
167}
168
169
170int TCPServerDispatcher::currentThreads() const
171{
172 return _currentThreads;
173}
174
175int TCPServerDispatcher::maxThreads() const
176{
177 FastMutex::ScopedLock lock(_mutex);
178
179 return _threadPool.capacity();
180}
181
182
183int TCPServerDispatcher::totalConnections() const
184{
185 return _totalConnections;
186}
187
188
189int TCPServerDispatcher::currentConnections() const
190{
191 return _currentConnections;
192}
193
194
195int TCPServerDispatcher::maxConcurrentConnections() const
196{
197 return _maxConcurrentConnections;
198}
199
200
201int TCPServerDispatcher::queuedConnections() const
202{
203 return _queue.size();
204}
205
206
207int TCPServerDispatcher::refusedConnections() const
208{
209 return _refusedConnections;
210}
211
212
213void TCPServerDispatcher::beginConnection()
214{
215 FastMutex::ScopedLock lock(_mutex);
216
217 ++_totalConnections;
218 ++_currentConnections;
219 if (_currentConnections > _maxConcurrentConnections)
220 _maxConcurrentConnections.store(_currentConnections);
221}
222
223
224void TCPServerDispatcher::endConnection()
225{
226 --_currentConnections;
227}
228
229
230} } // namespace Poco::Net
231