1/****************************************************************************
2**
3** Copyright (C) 2020 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtCore module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40// qfutureinterface.h included from qfuture.h
41#include "qfuture.h"
42#include "qfutureinterface_p.h"
43
44#include <QtCore/qatomic.h>
45#include <QtCore/qthread.h>
46#include <private/qthreadpool_p.h>
47
48#ifdef interface
49# undef interface
50#endif
51
52QT_BEGIN_NAMESPACE
53
// Cap on the number of progress notifications per second. The code below
// uses 1000 / MaxProgressEmitsPerSecond as the minimum number of
// milliseconds between two progress emits.
enum { MaxProgressEmitsPerSecond = 25 };
57
58namespace {
59class ThreadPoolThreadReleaser {
60 QThreadPool *m_pool;
61public:
62 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
63 : m_pool(pool)
64 { if (pool) pool->releaseThread(); }
65 ~ThreadPoolThreadReleaser()
66 { if (m_pool) m_pool->reserveThread(); }
67};
68
69const auto suspendingOrSuspended =
70 QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended;
71
72} // unnamed namespace
73
74
75QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
76 : d(new QFutureInterfaceBasePrivate(initialState))
77{ }
78
79QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
80 : d(other.d)
81{
82 d->refCount.ref();
83}
84
85QFutureInterfaceBase::~QFutureInterfaceBase()
86{
87 if (!d->refCount.deref())
88 delete d;
89}
90
91static inline int switch_on(QAtomicInt &a, int which)
92{
93 return a.fetchAndOrRelaxed(which) | which;
94}
95
96static inline int switch_off(QAtomicInt &a, int which)
97{
98 return a.fetchAndAndRelaxed(~which) & ~which;
99}
100
101static inline int switch_from_to(QAtomicInt &a, int from, int to)
102{
103 int newValue;
104 int expected = a.loadRelaxed();
105 do {
106 newValue = (expected & ~from) | to;
107 } while (!a.testAndSetRelaxed(expected, newValue, expected));
108 return newValue;
109}
110
111void QFutureInterfaceBase::cancel()
112{
113 QMutexLocker locker(&d->m_mutex);
114 if (d->state.loadRelaxed() & Canceled)
115 return;
116
117 switch_from_to(d->state, suspendingOrSuspended, Canceled);
118 d->waitCondition.wakeAll();
119 d->pausedWaitCondition.wakeAll();
120 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
121 d->isValid = false;
122}
123
124void QFutureInterfaceBase::setSuspended(bool suspend)
125{
126 QMutexLocker locker(&d->m_mutex);
127 if (suspend) {
128 switch_on(d->state, Suspending);
129 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
130 } else {
131 switch_off(d->state, suspendingOrSuspended);
132 d->pausedWaitCondition.wakeAll();
133 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
134 }
135}
136
137void QFutureInterfaceBase::toggleSuspended()
138{
139 QMutexLocker locker(&d->m_mutex);
140 if (d->state.loadRelaxed() & suspendingOrSuspended) {
141 switch_off(d->state, suspendingOrSuspended);
142 d->pausedWaitCondition.wakeAll();
143 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
144 } else {
145 switch_on(d->state, Suspending);
146 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
147 }
148}
149
150void QFutureInterfaceBase::reportSuspended() const
151{
152 // Needs to be called when pause is in effect,
153 // i.e. no more events will be reported.
154
155 QMutexLocker locker(&d->m_mutex);
156 const int state = d->state;
157 if (!(state & Suspending) || (state & Suspended))
158 return;
159
160 switch_from_to(d->state, Suspending, Suspended);
161 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
162}
163
164void QFutureInterfaceBase::setThrottled(bool enable)
165{
166 QMutexLocker lock(&d->m_mutex);
167 if (enable) {
168 switch_on(d->state, Throttled);
169 } else {
170 switch_off(d->state, Throttled);
171 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
172 d->pausedWaitCondition.wakeAll();
173 }
174}
175
176
177bool QFutureInterfaceBase::isRunning() const
178{
179 return queryState(Running);
180}
181
182bool QFutureInterfaceBase::isStarted() const
183{
184 return queryState(Started);
185}
186
187bool QFutureInterfaceBase::isCanceled() const
188{
189 return queryState(Canceled);
190}
191
192bool QFutureInterfaceBase::isFinished() const
193{
194 return queryState(Finished);
195}
196
197bool QFutureInterfaceBase::isSuspending() const
198{
199 return queryState(Suspending);
200}
201
202#if QT_DEPRECATED_SINCE(6, 0)
203bool QFutureInterfaceBase::isPaused() const
204{
205 return queryState(static_cast<State>(suspendingOrSuspended));
206}
207#endif
208
209bool QFutureInterfaceBase::isSuspended() const
210{
211 return queryState(Suspended);
212}
213
214bool QFutureInterfaceBase::isThrottled() const
215{
216 return queryState(Throttled);
217}
218
219bool QFutureInterfaceBase::isResultReadyAt(int index) const
220{
221 QMutexLocker lock(&d->m_mutex);
222 return d->internal_isResultReadyAt(index);
223}
224
225bool QFutureInterfaceBase::isValid() const
226{
227 const QMutexLocker lock(&d->m_mutex);
228 return d->isValid;
229}
230
231bool QFutureInterfaceBase::isRunningOrPending() const
232{
233 return queryState(static_cast<State>(Running | Pending));
234}
235
236bool QFutureInterfaceBase::waitForNextResult()
237{
238 QMutexLocker lock(&d->m_mutex);
239 return d->internal_waitForNextResult();
240}
241
242void QFutureInterfaceBase::waitForResume()
243{
244 // return early if possible to avoid taking the mutex lock.
245 {
246 const int state = d->state.loadRelaxed();
247 if (!(state & suspendingOrSuspended) || (state & Canceled))
248 return;
249 }
250
251 QMutexLocker lock(&d->m_mutex);
252 const int state = d->state.loadRelaxed();
253 if (!(state & suspendingOrSuspended) || (state & Canceled))
254 return;
255
256 // decrease active thread count since this thread will wait.
257 const ThreadPoolThreadReleaser releaser(d->pool());
258
259 d->pausedWaitCondition.wait(&d->m_mutex);
260}
261
262void QFutureInterfaceBase::suspendIfRequested()
263{
264 const auto canSuspend = [] (int state) {
265 // can suspend only if 1) in any suspend-related state; 2) not canceled
266 return (state & suspendingOrSuspended) && !(state & Canceled);
267 };
268
269 // return early if possible to avoid taking the mutex lock.
270 {
271 const int state = d->state.loadRelaxed();
272 if (!canSuspend(state))
273 return;
274 }
275
276 QMutexLocker lock(&d->m_mutex);
277 const int state = d->state.loadRelaxed();
278 if (!canSuspend(state))
279 return;
280
281 // Note: expecting that Suspending and Suspended are mutually exclusive
282 if (!(state & Suspended)) {
283 // switch state in case this is the first invocation
284 switch_from_to(d->state, Suspending, Suspended);
285 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
286 }
287
288 // decrease active thread count since this thread will wait.
289 const ThreadPoolThreadReleaser releaser(d->pool());
290 d->pausedWaitCondition.wait(&d->m_mutex);
291}
292
293int QFutureInterfaceBase::progressValue() const
294{
295 const QMutexLocker lock(&d->m_mutex);
296 return d->m_progressValue;
297}
298
299int QFutureInterfaceBase::progressMinimum() const
300{
301 const QMutexLocker lock(&d->m_mutex);
302 return d->m_progressMinimum;
303}
304
305int QFutureInterfaceBase::progressMaximum() const
306{
307 const QMutexLocker lock(&d->m_mutex);
308 return d->m_progressMaximum;
309}
310
311int QFutureInterfaceBase::resultCount() const
312{
313 QMutexLocker lock(&d->m_mutex);
314 return d->internal_resultCount();
315}
316
317QString QFutureInterfaceBase::progressText() const
318{
319 QMutexLocker locker(&d->m_mutex);
320 return d->m_progressText;
321}
322
323bool QFutureInterfaceBase::isProgressUpdateNeeded() const
324{
325 QMutexLocker locker(&d->m_mutex);
326 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
327}
328
329void QFutureInterfaceBase::reportStarted()
330{
331 QMutexLocker locker(&d->m_mutex);
332 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
333 return;
334 d->setState(State(Started | Running));
335 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
336 d->isValid = true;
337}
338
339void QFutureInterfaceBase::reportCanceled()
340{
341 cancel();
342}
343
344#ifndef QT_NO_EXCEPTIONS
345void QFutureInterfaceBase::reportException(const QException &exception)
346{
347 try {
348 exception.raise();
349 } catch (...) {
350 reportException(std::current_exception());
351 }
352}
353
354void QFutureInterfaceBase::reportException(std::exception_ptr exception)
355{
356 QMutexLocker locker(&d->m_mutex);
357 if (d->state.loadRelaxed() & (Canceled|Finished))
358 return;
359
360 d->m_exceptionStore.setException(exception);
361 switch_on(d->state, Canceled);
362 d->waitCondition.wakeAll();
363 d->pausedWaitCondition.wakeAll();
364 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
365}
366#endif
367
368void QFutureInterfaceBase::reportFinished()
369{
370 QMutexLocker locker(&d->m_mutex);
371 if (!isFinished()) {
372 switch_from_to(d->state, Running, Finished);
373 d->waitCondition.wakeAll();
374 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
375 }
376}
377
378void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
379{
380 if (d->manualProgress == false)
381 setProgressRange(0, resultCount);
382 d->m_expectedResultCount = resultCount;
383}
384
385int QFutureInterfaceBase::expectedResultCount()
386{
387 return d->m_expectedResultCount;
388}
389
390bool QFutureInterfaceBase::queryState(State state) const
391{
392 return d->state.loadRelaxed() & state;
393}
394
395int QFutureInterfaceBase::loadState() const
396{
397 return d->state.loadRelaxed();
398}
399
400void QFutureInterfaceBase::waitForResult(int resultIndex)
401{
402 d->m_exceptionStore.throwPossibleException();
403
404 QMutexLocker lock(&d->m_mutex);
405 if (!isRunningOrPending())
406 return;
407 lock.unlock();
408
409 // To avoid deadlocks and reduce the number of threads used, try to
410 // run the runnable in the current thread.
411 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
412
413 lock.relock();
414
415 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
416 while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
417 d->waitCondition.wait(&d->m_mutex);
418
419 d->m_exceptionStore.throwPossibleException();
420}
421
422void QFutureInterfaceBase::waitForFinished()
423{
424 QMutexLocker lock(&d->m_mutex);
425 const bool alreadyFinished = isFinished();
426 lock.unlock();
427
428 if (!alreadyFinished) {
429 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
430
431 lock.relock();
432
433 while (!isFinished())
434 d->waitCondition.wait(&d->m_mutex);
435 }
436
437 d->m_exceptionStore.throwPossibleException();
438}
439
440void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
441{
442 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
443 return;
444
445 d->waitCondition.wakeAll();
446
447 if (d->manualProgress == false) {
448 if (d->internal_updateProgress(d->m_progressValue + endIndex - beginIndex) == false) {
449 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
450 beginIndex,
451 endIndex));
452 return;
453 }
454
455 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
456 d->m_progressValue,
457 d->m_progressText),
458 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
459 beginIndex,
460 endIndex));
461 return;
462 }
463 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
464}
465
466void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
467{
468 d->runnable = runnable;
469}
470
471void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
472{
473 d->m_pool = pool;
474}
475
476QThreadPool *QFutureInterfaceBase::threadPool() const
477{
478 return d->m_pool;
479}
480
481void QFutureInterfaceBase::setFilterMode(bool enable)
482{
483 QMutexLocker locker(&d->m_mutex);
484 resultStoreBase().setFilterMode(enable);
485}
486
487void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
488{
489 QMutexLocker locker(&d->m_mutex);
490 d->m_progressMinimum = minimum;
491 d->m_progressMaximum = maximum;
492 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
493}
494
495void QFutureInterfaceBase::setProgressValue(int progressValue)
496{
497 setProgressValueAndText(progressValue, QString());
498}
499
500void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
501 const QString &progressText)
502{
503 QMutexLocker locker(&d->m_mutex);
504 if (d->manualProgress == false)
505 d->manualProgress = true;
506 if (d->m_progressValue >= progressValue)
507 return;
508
509 if (d->state.loadRelaxed() & (Canceled|Finished))
510 return;
511
512 if (d->internal_updateProgress(progressValue, progressText)) {
513 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
514 d->m_progressValue,
515 d->m_progressText));
516 }
517}
518
519QMutex &QFutureInterfaceBase::mutex() const
520{
521 return d->m_mutex;
522}
523
524QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore()
525{
526 return d->m_exceptionStore;
527}
528
529QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
530{
531 return d->m_results;
532}
533
534const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
535{
536 return d->m_results;
537}
538
539QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
540{
541 other.d->refCount.ref();
542 if (!d->refCount.deref())
543 delete d;
544 d = other.d;
545 return *this;
546}
547
548void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept
549{
550 qSwap(d, other.d);
551}
552
553bool QFutureInterfaceBase::refT() const
554{
555 return d->refCount.refT();
556}
557
558bool QFutureInterfaceBase::derefT() const
559{
560 return d->refCount.derefT();
561}
562
563void QFutureInterfaceBase::reset()
564{
565 d->m_progressValue = 0;
566 d->m_progressMinimum = 0;
567 d->m_progressMaximum = 0;
568 d->setState(QFutureInterfaceBase::NoState);
569 d->progressTime.invalidate();
570 d->isValid = false;
571}
572
573QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
574 : refCount(1), m_progressValue(0), m_progressMinimum(0), m_progressMaximum(0),
575 state(initialState),
576 manualProgress(false), m_expectedResultCount(0), runnable(nullptr), m_pool(nullptr)
577{
578 progressTime.invalidate();
579}
580
581int QFutureInterfaceBasePrivate::internal_resultCount() const
582{
583 return m_results.count(); // ### subtract canceled results.
584}
585
586bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const
587{
588 return (m_results.contains(index));
589}
590
591bool QFutureInterfaceBasePrivate::internal_waitForNextResult()
592{
593 if (m_results.hasNextResult())
594 return true;
595
596 while ((state.loadRelaxed() & QFutureInterfaceBase::Running) && m_results.hasNextResult() == false)
597 waitCondition.wait(&m_mutex);
598
599 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled) && m_results.hasNextResult();
600}
601
602bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress,
603 const QString &progressText)
604{
605 if (m_progressValue >= progress)
606 return false;
607
608 m_progressValue = progress;
609 m_progressText = progressText;
610
611 if (progressTime.isValid() && m_progressValue != m_progressMaximum) // make sure the first and last steps are emitted.
612 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
613 return false;
614
615 progressTime.start();
616 return true;
617}
618
619void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable)
620{
621 // bail out if we are not changing the state
622 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
623 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
624 return;
625
626 // change the state
627 if (enable) {
628 switch_on(state, QFutureInterfaceBase::Throttled);
629 } else {
630 switch_off(state, QFutureInterfaceBase::Throttled);
631 if (!(state.loadRelaxed() & suspendingOrSuspended))
632 pausedWaitCondition.wakeAll();
633 }
634}
635
636void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
637{
638 if (outputConnections.isEmpty())
639 return;
640
641 for (int i = 0; i < outputConnections.count(); ++i)
642 outputConnections.at(i)->postCallOutEvent(callOutEvent);
643}
644
645void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
646 const QFutureCallOutEvent &callOutEvent2)
647{
648 if (outputConnections.isEmpty())
649 return;
650
651 for (int i = 0; i < outputConnections.count(); ++i) {
652 QFutureCallOutInterface *interface = outputConnections.at(i);
653 interface->postCallOutEvent(callOutEvent1);
654 interface->postCallOutEvent(callOutEvent2);
655 }
656}
657
658// This function connects an output interface (for example a QFutureWatcher)
659// to this future. While holding the lock we check the state and ready results
660// and add the appropriate callouts to the queue. In order to avoid deadlocks,
661// the actual callouts are made at the end while not holding the lock.
662void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface)
663{
664 QMutexLocker locker(&m_mutex);
665
666 const auto currentState = state.loadRelaxed();
667 if (currentState & QFutureInterfaceBase::Started) {
668 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
669 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
670 m_progressMinimum,
671 m_progressMaximum));
672 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
673 m_progressValue,
674 m_progressText));
675 }
676
677 QtPrivate::ResultIteratorBase it = m_results.begin();
678 while (it != m_results.end()) {
679 const int begin = it.resultIndex();
680 const int end = begin + it.batchSize();
681 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
682 begin,
683 end));
684 it.batchedAdvance();
685 }
686
687 if (currentState & QFutureInterfaceBase::Suspended)
688 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
689 else if (currentState & QFutureInterfaceBase::Suspending)
690 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
691
692 if (currentState & QFutureInterfaceBase::Canceled)
693 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
694
695 if (currentState & QFutureInterfaceBase::Finished)
696 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
697
698 outputConnections.append(interface);
699}
700
701void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface)
702{
703 QMutexLocker lock(&m_mutex);
704 const int index = outputConnections.indexOf(interface);
705 if (index == -1)
706 return;
707 outputConnections.removeAt(index);
708
709 interface->callOutInterfaceDisconnected();
710}
711
712void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
713{
714 state.storeRelaxed(newState);
715}
716
717void QFutureInterfaceBase::setContinuation(std::function<void()> func)
718{
719 QMutexLocker lock(&d->continuationMutex);
720 // If the state is ready, run continuation immediately,
721 // otherwise save it for later.
722 if (isFinished()) {
723 lock.unlock();
724 func();
725 } else {
726 d->continuation = std::move(func);
727 }
728}
729
730void QFutureInterfaceBase::runContinuation() const
731{
732 QMutexLocker lock(&d->continuationMutex);
733 if (d->continuation) {
734 lock.unlock();
735 d->continuation();
736 }
737}
738
739void QFutureInterfaceBase::setLaunchAsync(bool value)
740{
741 d->launchAsync = value;
742}
743
744bool QFutureInterfaceBase::launchAsync() const
745{
746 return d->launchAsync;
747}
748
749QT_END_NAMESPACE
750