1/****************************************************************************
2**
3** Copyright (C) 2016 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtConcurrent module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40#ifndef QTCONCURRENT_ITERATEKERNEL_H
41#define QTCONCURRENT_ITERATEKERNEL_H
42
43#include <QtConcurrent/qtconcurrent_global.h>
44
45#if !defined(QT_NO_CONCURRENT) || defined(Q_CLANG_QDOC)
46
47#include <QtCore/qatomic.h>
48#include <QtConcurrent/qtconcurrentmedian.h>
49#include <QtConcurrent/qtconcurrentthreadengine.h>
50
51#include <iterator>
52
53QT_BEGIN_NAMESPACE
54
55
56
57namespace QtConcurrent {
58
59/*
60 The BlockSizeManager class manages how many iterations a thread should
61 reserve and process at a time. This is done by measuring the time spent
62 in the user code versus the control part code, and then increasing
63 the block size if the ratio between them is to small. The block size
64 management is done on the basis of the median of several timing measuremens,
65 and it is done induvidualy for each thread.
66*/
67class Q_CONCURRENT_EXPORT BlockSizeManager
68{
69public:
70 BlockSizeManager(int iterationCount);
71 void timeBeforeUser();
72 void timeAfterUser();
73 int blockSize();
74private:
75 inline bool blockSizeMaxed()
76 {
77 return (m_blockSize >= maxBlockSize);
78 }
79
80 const int maxBlockSize;
81 qint64 beforeUser;
82 qint64 afterUser;
83 Median<double> controlPartElapsed;
84 Median<double> userPartElapsed;
85 int m_blockSize;
86
87 Q_DISABLE_COPY(BlockSizeManager)
88};
89
90// ### Qt6: Replace BlockSizeManager with V2 implementation
91class Q_CONCURRENT_EXPORT BlockSizeManagerV2
92{
93public:
94 explicit BlockSizeManagerV2(int iterationCount);
95
96 void timeBeforeUser();
97 void timeAfterUser();
98 int blockSize();
99
100private:
101 inline bool blockSizeMaxed()
102 {
103 return (m_blockSize >= maxBlockSize);
104 }
105
106 const int maxBlockSize;
107 qint64 beforeUser;
108 qint64 afterUser;
109 MedianDouble controlPartElapsed;
110 MedianDouble userPartElapsed;
111 int m_blockSize;
112
113 Q_DISABLE_COPY(BlockSizeManagerV2)
114};
115
116template <typename T>
117class ResultReporter
118{
119public:
120 ResultReporter(ThreadEngine<T> *_threadEngine)
121 :threadEngine(_threadEngine)
122 {
123
124 }
125
126 void reserveSpace(int resultCount)
127 {
128 currentResultCount = resultCount;
129 vector.resize(qMax(resultCount, vector.count()));
130 }
131
132 void reportResults(int begin)
133 {
134 const int useVectorThreshold = 4; // Tunable parameter.
135 if (currentResultCount > useVectorThreshold) {
136 vector.resize(currentResultCount);
137 threadEngine->reportResults(vector, begin);
138 } else {
139 for (int i = 0; i < currentResultCount; ++i)
140 threadEngine->reportResult(&vector.at(i), begin + i);
141 }
142 }
143
144 inline T * getPointer()
145 {
146 return vector.data();
147 }
148
149 int currentResultCount;
150 ThreadEngine<T> *threadEngine;
151 QVector<T> vector;
152};
153
154template <>
155class ResultReporter<void>
156{
157public:
158 inline ResultReporter(ThreadEngine<void> *) { }
159 inline void reserveSpace(int) { }
160 inline void reportResults(int) { }
161 inline void * getPointer() { return nullptr; }
162};
163
// Iteration strategy for bidirectional iterators: false selects the "while"
// strategy (IterateKernel stores this value in forIteration).
inline bool selectIteration(std::bidirectional_iterator_tag)
{
    const bool useForIteration = false; // while
    return useForIteration;
}
168
// Iteration strategy for forward iterators: false selects the "while"
// strategy (IterateKernel stores this value in forIteration).
inline bool selectIteration(std::forward_iterator_tag)
{
    const bool useForIteration = false; // while
    return useForIteration;
}
173
// Iteration strategy for random-access iterators: true selects the "for"
// strategy (IterateKernel stores this value in forIteration).
inline bool selectIteration(std::random_access_iterator_tag)
{
    const bool useForIteration = true; // for
    return useForIteration;
}
178
179template <typename Iterator, typename T>
180class IterateKernel : public ThreadEngine<T>
181{
182public:
183 typedef T ResultType;
184
185 IterateKernel(Iterator _begin, Iterator _end)
186 : begin(_begin), end(_end), current(_begin), currentIndex(0),
187 forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category())), progressReportingEnabled(true)
188 {
189 iterationCount = forIteration ? std::distance(_begin, _end) : 0;
190 }
191
192 virtual ~IterateKernel() { }
193
194 virtual bool runIteration(Iterator it, int index , T *result)
195 { Q_UNUSED(it); Q_UNUSED(index); Q_UNUSED(result); return false; }
196 virtual bool runIterations(Iterator _begin, int beginIndex, int endIndex, T *results)
197 { Q_UNUSED(_begin); Q_UNUSED(beginIndex); Q_UNUSED(endIndex); Q_UNUSED(results); return false; }
198
199 void start() override
200 {
201 progressReportingEnabled = this->isProgressReportingEnabled();
202 if (progressReportingEnabled && iterationCount > 0)
203 this->setProgressRange(0, iterationCount);
204 }
205
206 bool shouldStartThread() override
207 {
208 if (forIteration)
209 return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
210 else // whileIteration
211 return (iteratorThreads.loadRelaxed() == 0);
212 }
213
214 ThreadFunctionResult threadFunction() override
215 {
216 if (forIteration)
217 return this->forThreadFunction();
218 else // whileIteration
219 return this->whileThreadFunction();
220 }
221
222 ThreadFunctionResult forThreadFunction()
223 {
224 BlockSizeManagerV2 blockSizeManager(iterationCount);
225 ResultReporter<T> resultReporter(this);
226
227 for(;;) {
228 if (this->isCanceled())
229 break;
230
231 const int currentBlockSize = blockSizeManager.blockSize();
232
233 if (currentIndex.loadRelaxed() >= iterationCount)
234 break;
235
236 // Atomically reserve a block of iterationCount for this thread.
237 const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
238 const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
239
240 if (beginIndex >= endIndex) {
241 // No more work
242 break;
243 }
244
245 this->waitForResume(); // (only waits if the qfuture is paused.)
246
247 if (shouldStartThread())
248 this->startThread();
249
250 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
251 resultReporter.reserveSpace(finalBlockSize);
252
253 // Call user code with the current iteration range.
254 blockSizeManager.timeBeforeUser();
255 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
256 blockSizeManager.timeAfterUser();
257
258 if (resultsAvailable)
259 resultReporter.reportResults(beginIndex);
260
261 // Report progress if progress reporting enabled.
262 if (progressReportingEnabled) {
263 completed.fetchAndAddAcquire(finalBlockSize);
264 this->setProgressValue(this->completed.loadRelaxed());
265 }
266
267 if (this->shouldThrottleThread())
268 return ThrottleThread;
269 }
270 return ThreadFinished;
271 }
272
273 ThreadFunctionResult whileThreadFunction()
274 {
275 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
276 return ThreadFinished;
277
278 ResultReporter<T> resultReporter(this);
279 resultReporter.reserveSpace(1);
280
281 while (current != end) {
282 // The following two lines breaks support for input iterators according to
283 // the sgi docs: dereferencing prev after calling ++current is not allowed
284 // on input iterators. (prev is dereferenced inside user.runIteration())
285 Iterator prev = current;
286 ++current;
287 int index = currentIndex.fetchAndAddRelaxed(1);
288 iteratorThreads.testAndSetRelease(1, 0);
289
290 this->waitForResume(); // (only waits if the qfuture is paused.)
291
292 if (shouldStartThread())
293 this->startThread();
294
295 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
296 if (resultAavailable)
297 resultReporter.reportResults(index);
298
299 if (this->shouldThrottleThread())
300 return ThrottleThread;
301
302 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
303 return ThreadFinished;
304 }
305
306 return ThreadFinished;
307 }
308
309
310public:
311 const Iterator begin;
312 const Iterator end;
313 Iterator current;
314 QAtomicInt currentIndex;
315 bool forIteration;
316 QAtomicInt iteratorThreads;
317 int iterationCount;
318
319 bool progressReportingEnabled;
320 QAtomicInt completed;
321};
322
323} // namespace QtConcurrent
324
325
326QT_END_NAMESPACE
327
328#endif // QT_NO_CONCURRENT
329
330#endif
331