1/****************************************************************************
2**
3** Copyright (C) 2016 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtCore module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40#include "qthreadpool.h"
41#include "qthreadpool_p.h"
42#include "qdeadlinetimer.h"
43#include "qcoreapplication.h"
44
45#include <algorithm>
46
47QT_BEGIN_NAMESPACE
48
49/*
50 QThread wrapper, provides synchronization against a ThreadPool
51*/
52class QThreadPoolThread : public QThread
53{
54 Q_OBJECT
55public:
56 QThreadPoolThread(QThreadPoolPrivate *manager);
57 void run() override;
58 void registerThreadInactive();
59
60 QWaitCondition runnableReady;
61 QThreadPoolPrivate *manager;
62 QRunnable *runnable;
63};
64
65/*
66 QThreadPool private class.
67*/
68
69
70/*!
71 \internal
72*/
73QThreadPoolThread::QThreadPoolThread(QThreadPoolPrivate *manager)
74 :manager(manager), runnable(nullptr)
75{
76 setStackSize(manager->stackSize);
77}
78
79/*
80 \internal
81*/
82void QThreadPoolThread::run()
83{
84 QMutexLocker locker(&manager->mutex);
85 for(;;) {
86 QRunnable *r = runnable;
87 runnable = nullptr;
88
89 do {
90 if (r) {
91 // If autoDelete() is false, r might already be deleted after run(), so check status now.
92 const bool del = r->autoDelete();
93
94 // run the task
95 locker.unlock();
96#ifndef QT_NO_EXCEPTIONS
97 try {
98#endif
99 r->run();
100#ifndef QT_NO_EXCEPTIONS
101 } catch (...) {
102 qWarning("Qt Concurrent has caught an exception thrown from a worker thread.\n"
103 "This is not supported, exceptions thrown in worker threads must be\n"
104 "caught before control returns to Qt Concurrent.");
105 registerThreadInactive();
106 throw;
107 }
108#endif
109
110 if (del)
111 delete r;
112 locker.relock();
113 }
114
115 // if too many threads are active, expire this thread
116 if (manager->tooManyThreadsActive())
117 break;
118
119 if (manager->queue.isEmpty()) {
120 r = nullptr;
121 break;
122 }
123
124 QueuePage *page = manager->queue.first();
125 r = page->pop();
126
127 if (page->isFinished()) {
128 manager->queue.removeFirst();
129 delete page;
130 }
131 } while (true);
132
133 // if too many threads are active, expire this thread
134 bool expired = manager->tooManyThreadsActive();
135 if (!expired) {
136 manager->waitingThreads.enqueue(this);
137 registerThreadInactive();
138 // wait for work, exiting after the expiry timeout is reached
139 runnableReady.wait(locker.mutex(), QDeadlineTimer(manager->expiryTimeout));
140 ++manager->activeThreads;
141 if (manager->waitingThreads.removeOne(this))
142 expired = true;
143 if (!manager->allThreads.contains(this)) {
144 registerThreadInactive();
145 break;
146 }
147 }
148 if (expired) {
149 manager->expiredThreads.enqueue(this);
150 registerThreadInactive();
151 break;
152 }
153 }
154}
155
156void QThreadPoolThread::registerThreadInactive()
157{
158 if (--manager->activeThreads == 0)
159 manager->noActiveThreads.wakeAll();
160}
161
162
163/*
164 \internal
165*/
166QThreadPoolPrivate:: QThreadPoolPrivate()
167{ }
168
169bool QThreadPoolPrivate::tryStart(QRunnable *task)
170{
171 Q_ASSERT(task != nullptr);
172 if (allThreads.isEmpty()) {
173 // always create at least one thread
174 startThread(task);
175 return true;
176 }
177
178 // can't do anything if we're over the limit
179 if (activeThreadCount() >= maxThreadCount)
180 return false;
181
182 if (waitingThreads.count() > 0) {
183 // recycle an available thread
184 enqueueTask(task);
185 waitingThreads.takeFirst()->runnableReady.wakeOne();
186 return true;
187 }
188
189 if (!expiredThreads.isEmpty()) {
190 // restart an expired thread
191 QThreadPoolThread *thread = expiredThreads.dequeue();
192 Q_ASSERT(thread->runnable == nullptr);
193
194 ++activeThreads;
195
196 thread->runnable = task;
197 thread->start();
198 return true;
199 }
200
201 // start a new thread
202 startThread(task);
203 return true;
204}
205
206inline bool comparePriority(int priority, const QueuePage *p)
207{
208 return p->priority() < priority;
209}
210
211void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority)
212{
213 Q_ASSERT(runnable != nullptr);
214 for (QueuePage *page : qAsConst(queue)) {
215 if (page->priority() == priority && !page->isFull()) {
216 page->push(runnable);
217 return;
218 }
219 }
220 auto it = std::upper_bound(queue.constBegin(), queue.constEnd(), priority, comparePriority);
221 queue.insert(std::distance(queue.constBegin(), it), new QueuePage(runnable, priority));
222}
223
224int QThreadPoolPrivate::activeThreadCount() const
225{
226 return (allThreads.count()
227 - expiredThreads.count()
228 - waitingThreads.count()
229 + reservedThreads);
230}
231
232void QThreadPoolPrivate::tryToStartMoreThreads()
233{
234 // try to push tasks on the queue to any available threads
235 while (!queue.isEmpty()) {
236 QueuePage *page = queue.first();
237 if (!tryStart(page->first()))
238 break;
239
240 page->pop();
241
242 if (page->isFinished()) {
243 queue.removeFirst();
244 delete page;
245 }
246 }
247}
248
249bool QThreadPoolPrivate::tooManyThreadsActive() const
250{
251 const int activeThreadCount = this->activeThreadCount();
252 return activeThreadCount > maxThreadCount && (activeThreadCount - reservedThreads) > 1;
253}
254
255/*!
256 \internal
257*/
258void QThreadPoolPrivate::startThread(QRunnable *runnable)
259{
260 Q_ASSERT(runnable != nullptr);
261 QScopedPointer<QThreadPoolThread> thread(new QThreadPoolThread(this));
262 thread->setObjectName(QLatin1String("Thread (pooled)"));
263 Q_ASSERT(!allThreads.contains(thread.data())); // if this assert hits, we have an ABA problem (deleted threads don't get removed here)
264 allThreads.insert(thread.data());
265 ++activeThreads;
266
267 thread->runnable = runnable;
268 thread.take()->start();
269}
270
271/*!
272 \internal
273
274 Helper function only to be called from waitForDone(int)
275*/
276void QThreadPoolPrivate::reset()
277{
278 // move the contents of the set out so that we can iterate without the lock
279 QSet<QThreadPoolThread *> allThreadsCopy;
280 allThreadsCopy.swap(allThreads);
281 expiredThreads.clear();
282 waitingThreads.clear();
283 mutex.unlock();
284
285 for (QThreadPoolThread *thread : qAsConst(allThreadsCopy)) {
286 if (!thread->isFinished()) {
287 thread->runnableReady.wakeAll();
288 thread->wait();
289 }
290 delete thread;
291 }
292
293 mutex.lock();
294}
295
296/*!
297 \internal
298
299 Helper function only to be called from waitForDone(int)
300*/
301bool QThreadPoolPrivate::waitForDone(const QDeadlineTimer &timer)
302{
303 while (!(queue.isEmpty() && activeThreads == 0) && !timer.hasExpired())
304 noActiveThreads.wait(&mutex, timer);
305
306 return queue.isEmpty() && activeThreads == 0;
307}
308
309bool QThreadPoolPrivate::waitForDone(int msecs)
310{
311 QMutexLocker locker(&mutex);
312 QDeadlineTimer timer(msecs);
313 do {
314 if (!waitForDone(timer))
315 return false;
316 reset();
317 // More threads can be started during reset(), in that case continue
318 // waiting if we still have time left.
319 } while ((!queue.isEmpty() || activeThreads) && !timer.hasExpired());
320
321 return queue.isEmpty() && activeThreads == 0;
322}
323
324void QThreadPoolPrivate::clear()
325{
326 QMutexLocker locker(&mutex);
327 while (!queue.isEmpty()) {
328 auto *page = queue.takeLast();
329 while (!page->isFinished()) {
330 QRunnable *r = page->pop();
331 if (r && r->autoDelete()) {
332 locker.unlock();
333 delete r;
334 locker.relock();
335 }
336 }
337 delete page;
338 }
339}
340
341/*!
342 \since 5.9
343
344 Attempts to remove the specified \a runnable from the queue if it is not yet started.
345 If the runnable had not been started, returns \c true, and ownership of \a runnable
346 is transferred to the caller (even when \c{runnable->autoDelete() == true}).
347 Otherwise returns \c false.
348
349 \note If \c{runnable->autoDelete() == true}, this function may remove the wrong
350 runnable. This is known as the \l{https://en.wikipedia.org/wiki/ABA_problem}{ABA problem}:
351 the original \a runnable may already have executed and has since been deleted.
352 The memory is re-used for another runnable, which then gets removed instead of
353 the intended one. For this reason, we recommend calling this function only for
354 runnables that are not auto-deleting.
355
356 \sa start(), QRunnable::autoDelete()
357*/
358bool QThreadPool::tryTake(QRunnable *runnable)
359{
360 Q_D(QThreadPool);
361
362 if (runnable == nullptr)
363 return false;
364
365 QMutexLocker locker(&d->mutex);
366 for (QueuePage *page : qAsConst(d->queue)) {
367 if (page->tryTake(runnable)) {
368 if (page->isFinished()) {
369 d->queue.removeOne(page);
370 delete page;
371 }
372 return true;
373 }
374 }
375
376 return false;
377}
378
379 /*!
380 \internal
381 Searches for \a runnable in the queue, removes it from the queue and
382 runs it if found. This function does not return until the runnable
383 has completed.
384 */
385void QThreadPoolPrivate::stealAndRunRunnable(QRunnable *runnable)
386{
387 Q_Q(QThreadPool);
388 if (!q->tryTake(runnable))
389 return;
390 // If autoDelete() is false, runnable might already be deleted after run(), so check status now.
391 const bool del = runnable->autoDelete();
392
393 runnable->run();
394
395 if (del)
396 delete runnable;
397}
398
399/*!
400 \class QThreadPool
401 \inmodule QtCore
402 \brief The QThreadPool class manages a collection of QThreads.
403 \since 4.4
404 \threadsafe
405
406 \ingroup thread
407
    QThreadPool manages and recycles individual QThread objects to help reduce
409 thread creation costs in programs that use threads. Each Qt application
410 has one global QThreadPool object, which can be accessed by calling
411 globalInstance().
412
413 To use one of the QThreadPool threads, subclass QRunnable and implement
414 the run() virtual function. Then create an object of that class and pass
415 it to QThreadPool::start().
416
417 \snippet code/src_corelib_concurrent_qthreadpool.cpp 0
418
419 QThreadPool deletes the QRunnable automatically by default. Use
420 QRunnable::setAutoDelete() to change the auto-deletion flag.
421
422 QThreadPool supports executing the same QRunnable more than once
423 by calling tryStart(this) from within QRunnable::run().
424 If autoDelete is enabled the QRunnable will be deleted when
425 the last thread exits the run function. Calling start()
426 multiple times with the same QRunnable when autoDelete is enabled
427 creates a race condition and is not recommended.
428
429 Threads that are unused for a certain amount of time will expire. The
430 default expiry timeout is 30000 milliseconds (30 seconds). This can be
431 changed using setExpiryTimeout(). Setting a negative expiry timeout
432 disables the expiry mechanism.
433
434 Call maxThreadCount() to query the maximum number of threads to be used.
435 If needed, you can change the limit with setMaxThreadCount(). The default
436 maxThreadCount() is QThread::idealThreadCount(). The activeThreadCount()
437 function returns the number of threads currently doing work.
438
439 The reserveThread() function reserves a thread for external
    use. Use releaseThread() when you are done with the thread, so
441 that it may be reused. Essentially, these functions temporarily
442 increase or reduce the active thread count and are useful when
443 implementing time-consuming operations that are not visible to the
444 QThreadPool.
445
446 Note that QThreadPool is a low-level class for managing threads, see
447 the Qt Concurrent module for higher level alternatives.
448
449 \sa QRunnable
450*/
451
452/*!
453 Constructs a thread pool with the given \a parent.
454*/
455QThreadPool::QThreadPool(QObject *parent)
456 : QObject(*new QThreadPoolPrivate, parent)
457{ }
458
459/*!
460 Destroys the QThreadPool.
461 This function will block until all runnables have been completed.
462*/
463QThreadPool::~QThreadPool()
464{
465 waitForDone();
466}
467
468/*!
469 Returns the global QThreadPool instance.
470*/
471QThreadPool *QThreadPool::globalInstance()
472{
473 static QPointer<QThreadPool> theInstance;
474 static QBasicMutex theMutex;
475
476 const QMutexLocker locker(&theMutex);
477 if (theInstance.isNull() && !QCoreApplication::closingDown())
478 theInstance = new QThreadPool();
479 return theInstance;
480}
481
482/*!
483 Reserves a thread and uses it to run \a runnable, unless this thread will
484 make the current thread count exceed maxThreadCount(). In that case,
485 \a runnable is added to a run queue instead. The \a priority argument can
486 be used to control the run queue's order of execution.
487
488 Note that the thread pool takes ownership of the \a runnable if
489 \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns \c true,
490 and the \a runnable will be deleted automatically by the thread
491 pool after the \l{QRunnable::run()}{runnable->run()} returns. If
492 \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns \c false,
493 ownership of \a runnable remains with the caller. Note that
494 changing the auto-deletion on \a runnable after calling this
    function results in undefined behavior.
496*/
497void QThreadPool::start(QRunnable *runnable, int priority)
498{
499 if (!runnable)
500 return;
501
502 Q_D(QThreadPool);
503 QMutexLocker locker(&d->mutex);
504
505 if (!d->tryStart(runnable)) {
506 d->enqueueTask(runnable, priority);
507
508 if (!d->waitingThreads.isEmpty())
509 d->waitingThreads.takeFirst()->runnableReady.wakeOne();
510 }
511}
512
513/*!
514 \overload
515 \since 5.15
516
517 Reserves a thread and uses it to run \a functionToRun, unless this thread will
518 make the current thread count exceed maxThreadCount(). In that case,
519 \a functionToRun is added to a run queue instead. The \a priority argument can
520 be used to control the run queue's order of execution.
521*/
522void QThreadPool::start(std::function<void()> functionToRun, int priority)
523{
524 if (!functionToRun)
525 return;
526 start(QRunnable::create(std::move(functionToRun)), priority);
527}
528
529/*!
530 Attempts to reserve a thread to run \a runnable.
531
532 If no threads are available at the time of calling, then this function
533 does nothing and returns \c false. Otherwise, \a runnable is run immediately
534 using one available thread and this function returns \c true.
535
536 Note that on success the thread pool takes ownership of the \a runnable if
537 \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns \c true,
538 and the \a runnable will be deleted automatically by the thread
539 pool after the \l{QRunnable::run()}{runnable->run()} returns. If
540 \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns \c false,
541 ownership of \a runnable remains with the caller. Note that
542 changing the auto-deletion on \a runnable after calling this
543 function results in undefined behavior.
544*/
545bool QThreadPool::tryStart(QRunnable *runnable)
546{
547 if (!runnable)
548 return false;
549
550 Q_D(QThreadPool);
551 QMutexLocker locker(&d->mutex);
552 if (d->tryStart(runnable))
553 return true;
554
555 return false;
556}
557
558/*!
559 \overload
560 \since 5.15
561 Attempts to reserve a thread to run \a functionToRun.
562
563 If no threads are available at the time of calling, then this function
564 does nothing and returns \c false. Otherwise, \a functionToRun is run immediately
565 using one available thread and this function returns \c true.
566*/
567bool QThreadPool::tryStart(std::function<void()> functionToRun)
568{
569 if (!functionToRun)
570 return false;
571
572 Q_D(QThreadPool);
573 QMutexLocker locker(&d->mutex);
574 if (!d->allThreads.isEmpty() && d->activeThreadCount() >= d->maxThreadCount)
575 return false;
576
577 QRunnable *runnable = QRunnable::create(std::move(functionToRun));
578 if (d->tryStart(runnable))
579 return true;
580 delete runnable;
581 return false;
582}
583
584/*! \property QThreadPool::expiryTimeout
585
586 Threads that are unused for \a expiryTimeout milliseconds are considered
587 to have expired and will exit. Such threads will be restarted as needed.
588 The default \a expiryTimeout is 30000 milliseconds (30 seconds). If
    \a expiryTimeout is negative, newly created threads will not expire, i.e.,
590 they will not exit until the thread pool is destroyed.
591
592 Note that setting \a expiryTimeout has no effect on already running
593 threads. Only newly created threads will use the new \a expiryTimeout.
594 We recommend setting the \a expiryTimeout immediately after creating the
595 thread pool, but before calling start().
596*/
597
598int QThreadPool::expiryTimeout() const
599{
600 Q_D(const QThreadPool);
601 return d->expiryTimeout;
602}
603
604void QThreadPool::setExpiryTimeout(int expiryTimeout)
605{
606 Q_D(QThreadPool);
607 if (d->expiryTimeout == expiryTimeout)
608 return;
609 d->expiryTimeout = expiryTimeout;
610}
611
612/*! \property QThreadPool::maxThreadCount
613
614 This property represents the maximum number of threads used by the thread
615 pool.
616
617 \note The thread pool will always use at least 1 thread, even if
618 \a maxThreadCount limit is zero or negative.
619
620 The default \a maxThreadCount is QThread::idealThreadCount().
621*/
622
623int QThreadPool::maxThreadCount() const
624{
625 Q_D(const QThreadPool);
626 return d->maxThreadCount;
627}
628
629void QThreadPool::setMaxThreadCount(int maxThreadCount)
630{
631 Q_D(QThreadPool);
632 QMutexLocker locker(&d->mutex);
633
634 if (maxThreadCount == d->maxThreadCount)
635 return;
636
637 d->maxThreadCount = maxThreadCount;
638 d->tryToStartMoreThreads();
639}
640
641/*! \property QThreadPool::activeThreadCount
642
643 This property represents the number of active threads in the thread pool.
644
645 \note It is possible for this function to return a value that is greater
646 than maxThreadCount(). See reserveThread() for more details.
647
648 \sa reserveThread(), releaseThread()
649*/
650
651int QThreadPool::activeThreadCount() const
652{
653 Q_D(const QThreadPool);
654 QMutexLocker locker(&d->mutex);
655 return d->activeThreadCount();
656}
657
658/*!
659 Reserves one thread, disregarding activeThreadCount() and maxThreadCount().
660
661 Once you are done with the thread, call releaseThread() to allow it to be
662 reused.
663
664 \note This function will always increase the number of active threads.
665 This means that by using this function, it is possible for
    activeThreadCount() to return a value greater than maxThreadCount().
667
668 \sa releaseThread()
669 */
670void QThreadPool::reserveThread()
671{
672 Q_D(QThreadPool);
673 QMutexLocker locker(&d->mutex);
674 ++d->reservedThreads;
675}
676
677/*! \property QThreadPool::stackSize
678
679 This property contains the stack size for the thread pool worker
680 threads.
681
682 The value of the property is only used when the thread pool creates
683 new threads. Changing it has no effect for already created
684 or running threads.
685
686 The default value is 0, which makes QThread use the operating
687 system default stack size.
688
689 \since 5.10
690*/
691void QThreadPool::setStackSize(uint stackSize)
692{
693 Q_D(QThreadPool);
694 d->stackSize = stackSize;
695}
696
697uint QThreadPool::stackSize() const
698{
699 Q_D(const QThreadPool);
700 return d->stackSize;
701}
702
703/*!
704 Releases a thread previously reserved by a call to reserveThread().
705
706 \note Calling this function without previously reserving a thread
707 temporarily increases maxThreadCount(). This is useful when a
708 thread goes to sleep waiting for more work, allowing other threads
709 to continue. Be sure to call reserveThread() when done waiting, so
710 that the thread pool can correctly maintain the
711 activeThreadCount().
712
713 \sa reserveThread()
714*/
715void QThreadPool::releaseThread()
716{
717 Q_D(QThreadPool);
718 QMutexLocker locker(&d->mutex);
719 --d->reservedThreads;
720 d->tryToStartMoreThreads();
721}
722
723/*!
724 Waits up to \a msecs milliseconds for all threads to exit and removes all
725 threads from the thread pool. Returns \c true if all threads were removed;
726 otherwise it returns \c false. If \a msecs is -1 (the default), the timeout
727 is ignored (waits for the last thread to exit).
728*/
729bool QThreadPool::waitForDone(int msecs)
730{
731 Q_D(QThreadPool);
732 return d->waitForDone(msecs);
733}
734
735/*!
736 \since 5.2
737
738 Removes the runnables that are not yet started from the queue.
739 The runnables for which \l{QRunnable::autoDelete()}{runnable->autoDelete()}
740 returns \c true are deleted.
741
742 \sa start()
743*/
744void QThreadPool::clear()
745{
746 Q_D(QThreadPool);
747 d->clear();
748}
749
750/*!
751 \since 6.0
752
753 Returns \c true if \a thread is a thread managed by this thread pool.
754*/
755bool QThreadPool::contains(const QThread *thread) const
756{
757 Q_D(const QThreadPool);
758 const QThreadPoolThread *poolThread = qobject_cast<const QThreadPoolThread *>(thread);
759 if (!poolThread)
760 return false;
761 return d->allThreads.contains(const_cast<QThreadPoolThread *>(poolThread));
762}
763
764QT_END_NAMESPACE
765
766#include "moc_qthreadpool.cpp"
767#include "qthreadpool.moc"
768