1/****************************************************************************
2**
3** Copyright (C) 2016 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtConcurrent module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40#ifndef QTCONCURRENT_ITERATEKERNEL_H
41#define QTCONCURRENT_ITERATEKERNEL_H
42
43#include <QtConcurrent/qtconcurrent_global.h>
44
45#if !defined(QT_NO_CONCURRENT) || defined(Q_CLANG_QDOC)
46
47#include <QtCore/qatomic.h>
48#include <QtConcurrent/qtconcurrentmedian.h>
49#include <QtConcurrent/qtconcurrentthreadengine.h>
50
51#include <iterator>
52
53QT_BEGIN_NAMESPACE
54
55
56
57namespace QtConcurrent {
58
/*
    The BlockSizeManager class manages how many iterations a thread should
    reserve and process at a time. This is done by measuring the time spent
    in the user code versus the control part code, and then increasing
    the block size if the ratio between them is too small. The block size
    management is done on the basis of the median of several timing measurements,
    and it is done individually for each thread.
*/
67class Q_CONCURRENT_EXPORT BlockSizeManager
68{
69public:
70 explicit BlockSizeManager(QThreadPool *pool, int iterationCount);
71
72 void timeBeforeUser();
73 void timeAfterUser();
74 int blockSize();
75
76private:
77 inline bool blockSizeMaxed()
78 {
79 return (m_blockSize >= maxBlockSize);
80 }
81
82 const int maxBlockSize;
83 qint64 beforeUser;
84 qint64 afterUser;
85 Median controlPartElapsed;
86 Median userPartElapsed;
87 int m_blockSize;
88
89 Q_DISABLE_COPY(BlockSizeManager)
90};
91
92template <typename T>
93class ResultReporter
94{
95public:
96 ResultReporter(ThreadEngine<T> *_threadEngine)
97 :threadEngine(_threadEngine)
98 {
99
100 }
101
102 void reserveSpace(int resultCount)
103 {
104 currentResultCount = resultCount;
105 vector.resize(qMax(resultCount, vector.count()));
106 }
107
108 void reportResults(int begin)
109 {
110 const int useVectorThreshold = 4; // Tunable parameter.
111 if (currentResultCount > useVectorThreshold) {
112 vector.resize(currentResultCount);
113 threadEngine->reportResults(vector, begin);
114 } else {
115 for (int i = 0; i < currentResultCount; ++i)
116 threadEngine->reportResult(&vector.at(i), begin + i);
117 }
118 }
119
120 inline T * getPointer()
121 {
122 return vector.data();
123 }
124
125 int currentResultCount;
126 ThreadEngine<T> *threadEngine;
127 QList<T> vector;
128};
129
130template <>
131class ResultReporter<void>
132{
133public:
134 inline ResultReporter(ThreadEngine<void> *) { }
135 inline void reserveSpace(int) { }
136 inline void reportResults(int) { }
137 inline void * getPointer() { return nullptr; }
138};
139
// Iteration strategy for bidirectional iterators: returns false, which makes
// IterateKernel use its "while" iteration instead of the "for" iteration.
inline bool selectIteration(std::bidirectional_iterator_tag)
{
    const bool useForIteration = false; // while
    return useForIteration;
}
144
// Iteration strategy for forward iterators: returns false, which makes
// IterateKernel use its "while" iteration instead of the "for" iteration.
inline bool selectIteration(std::forward_iterator_tag)
{
    const bool useForIteration = false; // while
    return useForIteration;
}
149
// Iteration strategy for random access iterators: returns true, which makes
// IterateKernel use its index-based "for" iteration.
inline bool selectIteration(std::random_access_iterator_tag)
{
    const bool useForIteration = true; // for
    return useForIteration;
}
154
155template <typename Iterator, typename T>
156class IterateKernel : public ThreadEngine<T>
157{
158public:
159 typedef T ResultType;
160
161 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
162 : ThreadEngine<T>(pool), begin(_begin), end(_end), current(_begin), currentIndex(0)
163 , forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category()))
164 , iterationCount(forIteration ? std::distance(_begin, _end) : 0)
165 , progressReportingEnabled(true)
166 {
167 }
168
169 virtual ~IterateKernel() { }
170
171 virtual bool runIteration(Iterator, int , T *) { return false; }
172 virtual bool runIterations(Iterator, int, int, T *) { return false; }
173
174 void start() override
175 {
176 progressReportingEnabled = this->isProgressReportingEnabled();
177 if (progressReportingEnabled && iterationCount > 0)
178 this->setProgressRange(0, iterationCount);
179 }
180
181 bool shouldStartThread() override
182 {
183 if (forIteration)
184 return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
185 else // whileIteration
186 return (iteratorThreads.loadRelaxed() == 0);
187 }
188
189 ThreadFunctionResult threadFunction() override
190 {
191 if (forIteration)
192 return this->forThreadFunction();
193 else // whileIteration
194 return this->whileThreadFunction();
195 }
196
197 ThreadFunctionResult forThreadFunction()
198 {
199 BlockSizeManager blockSizeManager(ThreadEngineBase::threadPool, iterationCount);
200 ResultReporter<T> resultReporter(this);
201
202 for(;;) {
203 if (this->isCanceled())
204 break;
205
206 const int currentBlockSize = blockSizeManager.blockSize();
207
208 if (currentIndex.loadRelaxed() >= iterationCount)
209 break;
210
211 // Atomically reserve a block of iterationCount for this thread.
212 const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
213 const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
214
215 if (beginIndex >= endIndex) {
216 // No more work
217 break;
218 }
219
220 this->waitForResume(); // (only waits if the qfuture is paused.)
221
222 if (shouldStartThread())
223 this->startThread();
224
225 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
226 resultReporter.reserveSpace(finalBlockSize);
227
228 // Call user code with the current iteration range.
229 blockSizeManager.timeBeforeUser();
230 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
231 blockSizeManager.timeAfterUser();
232
233 if (resultsAvailable)
234 resultReporter.reportResults(beginIndex);
235
236 // Report progress if progress reporting enabled.
237 if (progressReportingEnabled) {
238 completed.fetchAndAddAcquire(finalBlockSize);
239 this->setProgressValue(this->completed.loadRelaxed());
240 }
241
242 if (this->shouldThrottleThread())
243 return ThrottleThread;
244 }
245 return ThreadFinished;
246 }
247
248 ThreadFunctionResult whileThreadFunction()
249 {
250 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
251 return ThreadFinished;
252
253 ResultReporter<T> resultReporter(this);
254 resultReporter.reserveSpace(1);
255
256 while (current != end) {
257 // The following two lines breaks support for input iterators according to
258 // the sgi docs: dereferencing prev after calling ++current is not allowed
259 // on input iterators. (prev is dereferenced inside user.runIteration())
260 Iterator prev = current;
261 ++current;
262 int index = currentIndex.fetchAndAddRelaxed(1);
263 iteratorThreads.testAndSetRelease(1, 0);
264
265 this->waitForResume(); // (only waits if the qfuture is paused.)
266
267 if (shouldStartThread())
268 this->startThread();
269
270 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
271 if (resultAavailable)
272 resultReporter.reportResults(index);
273
274 if (this->shouldThrottleThread())
275 return ThrottleThread;
276
277 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
278 return ThreadFinished;
279 }
280
281 return ThreadFinished;
282 }
283
284
285public:
286 const Iterator begin;
287 const Iterator end;
288 Iterator current;
289 QAtomicInt currentIndex;
290 const bool forIteration;
291 QAtomicInt iteratorThreads;
292 const int iterationCount;
293
294 bool progressReportingEnabled;
295 QAtomicInt completed;
296};
297
298} // namespace QtConcurrent
299
300
301QT_END_NAMESPACE
302
303#endif // QT_NO_CONCURRENT
304
305#endif
306