1//
2// ThreadPool.cpp
3//
4// Library: Foundation
5// Package: Threading
6// Module: ThreadPool
7//
8// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
9// and Contributors.
10//
11// SPDX-License-Identifier: BSL-1.0
12//
13
14
15#include "Poco/ThreadPool.h"
16#include "Poco/Runnable.h"
17#include "Poco/Thread.h"
18#include "Poco/Event.h"
19#include "Poco/ThreadLocal.h"
20#include "Poco/ErrorHandler.h"
21#include <sstream>
22#include <ctime>
23#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
24#include "wce_time.h"
25#endif
26
27
28namespace Poco {
29
30
31class PooledThread: public Runnable
32{
33public:
34 PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
35 ~PooledThread();
36
37 void start(int cpu = -1);
38 void start(Thread::Priority priority, Runnable& target, int cpu = -1);
39 void start(Thread::Priority priority, Runnable& target, const std::string& name, int cpu = -1);
40 bool idle();
41 int idleTime();
42 void join();
43 void activate();
44 void release();
45 void run();
46
47private:
48 bool _idle;
49 std::time_t _idleTime;
50 Runnable* _pTarget;
51 std::string _name;
52 Thread _thread;
53 Event _targetReady;
54 Event _targetCompleted;
55 Event _started;
56 FastMutex _mutex;
57};
58
59
60PooledThread::PooledThread(const std::string& name, int stackSize):
61 _idle(true),
62 _idleTime(0),
63 _pTarget(0),
64 _name(name),
65 _thread(name),
66 _targetReady(),
67 _targetCompleted(Event::EVENT_MANUALRESET),
68 _started(),
69 _mutex()
70{
71 poco_assert_dbg (stackSize >= 0);
72 _thread.setStackSize(stackSize);
73#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
74 _idleTime = wceex_time(NULL);
75#else
76 _idleTime = std::time(NULL);
77#endif
78}
79
80
81PooledThread::~PooledThread()
82{
83}
84
85
86void PooledThread::start(int cpu)
87{
88 _thread.start(*this);
89 _started.wait();
90 if (cpu >= 0)
91 {
92 _thread.setAffinity(static_cast<unsigned>(cpu));
93 }
94}
95
96
97void PooledThread::start(Thread::Priority priority, Runnable& target, int cpu)
98{
99 FastMutex::ScopedLock lock(_mutex);
100
101 poco_assert (_pTarget == 0);
102
103 _pTarget = &target;
104 _thread.setPriority(priority);
105 _targetReady.set();
106 if (cpu >= 0)
107 {
108 _thread.setAffinity(static_cast<unsigned>(cpu));
109 }
110}
111
112
113void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name, int cpu)
114{
115 FastMutex::ScopedLock lock(_mutex);
116
117 std::string fullName(name);
118 if (name.empty())
119 {
120 fullName = _name;
121 }
122 else
123 {
124 fullName.append(" (");
125 fullName.append(_name);
126 fullName.append(")");
127 }
128 _thread.setName(fullName);
129 _thread.setPriority(priority);
130
131 poco_assert (_pTarget == 0);
132
133 _pTarget = &target;
134 _targetReady.set();
135 if (cpu >= 0)
136 {
137 _thread.setAffinity(static_cast<unsigned>(cpu));
138 }
139}
140
141
142inline bool PooledThread::idle()
143{
144 FastMutex::ScopedLock lock(_mutex);
145 return _idle;
146}
147
148
149int PooledThread::idleTime()
150{
151 FastMutex::ScopedLock lock(_mutex);
152
153#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
154 return (int) (wceex_time(NULL) - _idleTime);
155#else
156 return (int) (time(NULL) - _idleTime);
157#endif
158}
159
160
161void PooledThread::join()
162{
163 _mutex.lock();
164 Runnable* pTarget = _pTarget;
165 _mutex.unlock();
166 if (pTarget)
167 _targetCompleted.wait();
168}
169
170
171void PooledThread::activate()
172{
173 FastMutex::ScopedLock lock(_mutex);
174
175 poco_assert (_idle);
176 _idle = false;
177 _targetCompleted.reset();
178}
179
180
181void PooledThread::release()
182{
183 const long JOIN_TIMEOUT = 10000;
184
185 _mutex.lock();
186 _pTarget = 0;
187 _mutex.unlock();
188 // In case of a statically allocated thread pool (such
189 // as the default thread pool), Windows may have already
190 // terminated the thread before we got here.
191 if (_thread.isRunning())
192 _targetReady.set();
193
194 if (_thread.tryJoin(JOIN_TIMEOUT))
195 {
196 delete this;
197 }
198}
199
200
201void PooledThread::run()
202{
203 _started.set();
204 for (;;)
205 {
206 _targetReady.wait();
207 _mutex.lock();
208 if (_pTarget) // a NULL target means kill yourself
209 {
210 Runnable* pTarget = _pTarget;
211 _mutex.unlock();
212 try
213 {
214 pTarget->run();
215 }
216 catch (Exception& exc)
217 {
218 ErrorHandler::handle(exc);
219 }
220 catch (std::exception& exc)
221 {
222 ErrorHandler::handle(exc);
223 }
224 catch (...)
225 {
226 ErrorHandler::handle();
227 }
228 FastMutex::ScopedLock lock(_mutex);
229 _pTarget = 0;
230#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
231 _idleTime = wceex_time(NULL);
232#else
233 _idleTime = time(NULL);
234#endif
235 _idle = true;
236 _targetCompleted.set();
237 ThreadLocalStorage::clear();
238 _thread.setName(_name);
239 _thread.setPriority(Thread::PRIO_NORMAL);
240 }
241 else
242 {
243 _mutex.unlock();
244 break;
245 }
246 }
247}
248
249
250ThreadPool::ThreadPool(int minCapacity,
251 int maxCapacity,
252 int idleTime,
253 int stackSize,
254 ThreadAffinityPolicy affinityPolicy):
255 _minCapacity(minCapacity),
256 _maxCapacity(maxCapacity),
257 _idleTime(idleTime),
258 _serial(0),
259 _age(0),
260 _stackSize(stackSize),
261 _affinityPolicy(affinityPolicy),
262 _lastCpu(0)
263{
264 poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
265
266 int cpu = -1;
267 int cpuCount = Poco::Environment::processorCount();
268
269 for (int i = 0; i < _minCapacity; i++)
270 {
271 if (_affinityPolicy == TAP_UNIFORM_DISTRIBUTION)
272 {
273 cpu = _lastCpu.value() % cpuCount;
274 _lastCpu++;
275 }
276 PooledThread* pThread = createThread();
277 _threads.push_back(pThread);
278 pThread->start(cpu);
279 }
280}
281
282
283ThreadPool::ThreadPool(const std::string& rName,
284 int minCapacity,
285 int maxCapacity,
286 int idleTime,
287 int stackSize,
288 ThreadAffinityPolicy affinityPolicy):
289 _name(rName),
290 _minCapacity(minCapacity),
291 _maxCapacity(maxCapacity),
292 _idleTime(idleTime),
293 _serial(0),
294 _age(0),
295 _stackSize(stackSize),
296 _affinityPolicy(affinityPolicy),
297 _lastCpu(0)
298{
299 poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
300
301 int cpu = -1;
302 int cpuCount = Poco::Environment::processorCount();
303 for (int i = 0; i < _minCapacity; i++)
304 {
305 if (_affinityPolicy == TAP_UNIFORM_DISTRIBUTION)
306 {
307 cpu = _lastCpu.value() % cpuCount;
308 _lastCpu++;
309 }
310 PooledThread* pThread = createThread();
311 _threads.push_back(pThread);
312 pThread->start(cpu);
313 }
314}
315
316
317ThreadPool::~ThreadPool()
318{
319 try
320 {
321 stopAll();
322 }
323 catch (...)
324 {
325 poco_unexpected();
326 }
327}
328
329
330void ThreadPool::addCapacity(int n)
331{
332 FastMutex::ScopedLock lock(_mutex);
333
334 poco_assert (_maxCapacity + n >= _minCapacity);
335 _maxCapacity += n;
336 housekeep();
337}
338
339
340int ThreadPool::capacity() const
341{
342 FastMutex::ScopedLock lock(_mutex);
343 return _maxCapacity;
344}
345
346
347int ThreadPool::available() const
348{
349 FastMutex::ScopedLock lock(_mutex);
350
351 int count = 0;
352 for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
353 {
354 if ((*it)->idle()) ++count;
355 }
356 return (int) (count + _maxCapacity - _threads.size());
357}
358
359
360int ThreadPool::used() const
361{
362 FastMutex::ScopedLock lock(_mutex);
363
364 int count = 0;
365 for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
366 {
367 if (!(*it)->idle()) ++count;
368 }
369 return count;
370}
371
372
373int ThreadPool::allocated() const
374{
375 FastMutex::ScopedLock lock(_mutex);
376
377 return int(_threads.size());
378}
379
380
381int ThreadPool::affinity(int cpu)
382{
383 switch (static_cast<int>(_affinityPolicy))
384 {
385 case TAP_UNIFORM_DISTRIBUTION:
386 {
387 cpu = _lastCpu.value() % Environment::processorCount();
388 _lastCpu++;
389 }
390 break;
391 case TAP_DEFAULT:
392 {
393 cpu = -1;
394 }
395 break;
396 case TAP_CUSTOM:
397 {
398 if ((cpu < -1) || (cpu >= Environment::processorCount()))
399 {
400 throw InvalidArgumentException("cpu argument is invalid");
401 }
402 }
403 break;
404 }
405 return cpu;
406}
407
408
409void ThreadPool::start(Runnable& target, int cpu)
410{
411 getThread()->start(Thread::PRIO_NORMAL, target, affinity(cpu));
412}
413
414
415void ThreadPool::start(Runnable& target, const std::string& rName, int cpu)
416{
417 getThread()->start(Thread::PRIO_NORMAL, target, rName, affinity(cpu));
418}
419
420
421void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, int cpu)
422{
423 getThread()->start(priority, target, affinity(cpu));
424}
425
426
427void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& rName, int cpu)
428{
429 getThread()->start(priority, target, rName, affinity(cpu));
430}
431
432
433void ThreadPool::stopAll()
434{
435 FastMutex::ScopedLock lock(_mutex);
436
437 for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
438 {
439 (*it)->release();
440 }
441 _threads.clear();
442}
443
444
445void ThreadPool::joinAll()
446{
447 FastMutex::ScopedLock lock(_mutex);
448
449 for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
450 {
451 (*it)->join();
452 }
453 housekeep();
454}
455
456
457void ThreadPool::collect()
458{
459 FastMutex::ScopedLock lock(_mutex);
460 housekeep();
461}
462
463
464void ThreadPool::housekeep()
465{
466 _age = 0;
467 if (_threads.size() <= _minCapacity)
468 return;
469
470 ThreadVec idleThreads;
471 ThreadVec expiredThreads;
472 ThreadVec activeThreads;
473 idleThreads.reserve(_threads.size());
474 activeThreads.reserve(_threads.size());
475
476 for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
477 {
478 if ((*it)->idle())
479 {
480 if ((*it)->idleTime() < _idleTime)
481 idleThreads.push_back(*it);
482 else
483 expiredThreads.push_back(*it);
484 }
485 else activeThreads.push_back(*it);
486 }
487 int n = (int) activeThreads.size();
488 int limit = (int) idleThreads.size() + n;
489 if (limit < _minCapacity) limit = _minCapacity;
490 idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());
491 _threads.clear();
492 for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it)
493 {
494 if (n < limit)
495 {
496 _threads.push_back(*it);
497 ++n;
498 }
499 else (*it)->release();
500 }
501 _threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end());
502}
503
504
505PooledThread* ThreadPool::getThread()
506{
507 FastMutex::ScopedLock lock(_mutex);
508
509 if (++_age == 32)
510 housekeep();
511
512 PooledThread* pThread = 0;
513 for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it)
514 {
515 if ((*it)->idle())
516 pThread = *it;
517 }
518 if (!pThread)
519 {
520 if (_threads.size() < _maxCapacity)
521 {
522 pThread = createThread();
523 try
524 {
525 pThread->start();
526 _threads.push_back(pThread);
527 }
528 catch (...)
529 {
530 delete pThread;
531 throw;
532 }
533 }
534 else throw NoThreadAvailableException();
535 }
536 pThread->activate();
537 return pThread;
538}
539
540
541PooledThread* ThreadPool::createThread()
542{
543 std::ostringstream threadName;
544 threadName << _name << "[#" << ++_serial << "]";
545 return new PooledThread(threadName.str(), _stackSize);
546}
547
548
549} // namespace Poco
550