1//
2// TCPServerDispatcher.cpp
3//
4// Library: Net
5// Package: TCPServer
6// Module: TCPServerDispatcher
7//
8// Copyright (c) 2005-2007, Applied Informatics Software Engineering GmbH.
9// and Contributors.
10//
11// SPDX-License-Identifier: BSL-1.0
12//
13
14
15#include "Poco/Net/TCPServerDispatcher.h"
16#include "Poco/Net/TCPServerConnectionFactory.h"
17#include "Poco/Notification.h"
18#include "Poco/AutoPtr.h"
19#include "Poco/ErrorHandler.h"
20#include <memory>
21
22
23using Poco::Notification;
24using Poco::FastMutex;
25using Poco::AutoPtr;
26
27
28namespace Poco {
29namespace Net {
30
31
32class TCPConnectionNotification: public Notification
33{
34public:
35 TCPConnectionNotification(const StreamSocket& socket):
36 _socket(socket)
37 {
38 }
39
40 ~TCPConnectionNotification()
41 {
42 }
43
44 const StreamSocket& socket() const
45 {
46 return _socket;
47 }
48
49private:
50 StreamSocket _socket;
51};
52
53
54TCPServerDispatcher::TCPServerDispatcher(TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool& threadPool, TCPServerParams::Ptr pParams):
55 _rc(1),
56 _pParams(pParams),
57 _currentThreads(0),
58 _totalConnections(0),
59 _currentConnections(0),
60 _maxConcurrentConnections(0),
61 _refusedConnections(0),
62 _stopped(false),
63 _pConnectionFactory(pFactory),
64 _threadPool(threadPool)
65{
66 poco_check_ptr (pFactory);
67
68 if (!_pParams)
69 _pParams = new TCPServerParams;
70
71 if (_pParams->getMaxThreads() == 0)
72 _pParams->setMaxThreads(threadPool.capacity());
73}
74
75
76TCPServerDispatcher::~TCPServerDispatcher()
77{
78}
79
80
81void TCPServerDispatcher::duplicate()
82{
83 ++_rc;
84}
85
86
87void TCPServerDispatcher::release()
88{
89 if (--_rc == 0) delete this;
90}
91
92
93void TCPServerDispatcher::run()
94{
95 AutoPtr<TCPServerDispatcher> guard(this, false); // ensure _rc is decreased when function exits
96
97 int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
98
99 for (;;)
100 {
101 {
102 ThreadCountWatcher tcw(this);
103 try
104 {
105 AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
106 if (pNf)
107 {
108 TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
109 if (pCNf)
110 {
111 std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
112 poco_check_ptr(pConnection.get());
113 beginConnection();
114 pConnection->start();
115 endConnection();
116 }
117 }
118 }
119 catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
120 catch (std::exception &exc) { ErrorHandler::handle(exc); }
121 catch (...) { ErrorHandler::handle(); }
122 }
123 if (_stopped || (_currentThreads > 1 && _queue.empty())) break;
124 }
125}
126
127
namespace
{
    // Name given to every pool thread started by enqueue().
    // The unnamed namespace already gives it internal linkage.
    const std::string threadName = "TCPServerConnection";
}
132
133
134void TCPServerDispatcher::enqueue(const StreamSocket& socket)
135{
136 FastMutex::ScopedLock lock(_mutex);
137
138 if (_queue.size() < _pParams->getMaxQueued())
139 {
140 _queue.enqueueNotification(new TCPConnectionNotification(socket));
141 if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
142 {
143 try
144 {
145 _threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
146 ++_currentThreads;
147 // Ensure this object lives at least until run() starts
148 // Small chance of leaking if threadpool is stopped before this
149 // work runs, but better than a dangling pointer and crash!
150 duplicate();
151 }
152 catch (Poco::Exception&)
153 {
154 // no problem here, connection is already queued
155 // and a new thread might be available later.
156 }
157 }
158 }
159 else
160 {
161 ++_refusedConnections;
162 }
163}
164
165
166void TCPServerDispatcher::stop()
167{
168 _stopped = true;
169 _queue.clear();
170 _queue.wakeUpAll();
171}
172
173
174int TCPServerDispatcher::currentThreads() const
175{
176 return _currentThreads;
177}
178
179int TCPServerDispatcher::maxThreads() const
180{
181 FastMutex::ScopedLock lock(_mutex);
182
183 return _threadPool.capacity();
184}
185
186
187int TCPServerDispatcher::totalConnections() const
188{
189 return _totalConnections;
190}
191
192
193int TCPServerDispatcher::currentConnections() const
194{
195 return _currentConnections;
196}
197
198
199int TCPServerDispatcher::maxConcurrentConnections() const
200{
201 return _maxConcurrentConnections;
202}
203
204
205int TCPServerDispatcher::queuedConnections() const
206{
207 return _queue.size();
208}
209
210
211int TCPServerDispatcher::refusedConnections() const
212{
213 return _refusedConnections;
214}
215
216
217void TCPServerDispatcher::beginConnection()
218{
219 FastMutex::ScopedLock lock(_mutex);
220
221 ++_totalConnections;
222 ++_currentConnections;
223 if (_currentConnections > _maxConcurrentConnections)
224 _maxConcurrentConnections.store(_currentConnections);
225}
226
227
228void TCPServerDispatcher::endConnection()
229{
230 --_currentConnections;
231}
232
233
234} } // namespace Poco::Net
235