1/****************************************************************************
2**
3** Copyright (C) 2016 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtConcurrent module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40#ifndef QTCONCURRENT_REDUCEKERNEL_H
41#define QTCONCURRENT_REDUCEKERNEL_H
42
43#include <QtConcurrent/qtconcurrent_global.h>
44
45#if !defined(QT_NO_CONCURRENT) || defined(Q_CLANG_QDOC)
46
47#include <QtCore/qatomic.h>
48#include <QtCore/qlist.h>
49#include <QtCore/qmap.h>
50#include <QtCore/qmutex.h>
51#include <QtCore/qthread.h>
52#include <QtCore/qthreadpool.h>
53#include <QtCore/qvector.h>
54
55#include <mutex>
56
57QT_BEGIN_NAMESPACE
58
59
60namespace QtConcurrent {
61
62/*
63 The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
64 limit the reduce queue size for MapReduce. When the number of
65 reduce blocks in the queue exceeds ReduceQueueStartLimit,
66 MapReduce won't start any new threads, and when it exceeds
67 ReduceQueueThrottleLimit running threads will be stopped.
68*/
69#ifdef Q_CLANG_QDOC
70enum ReduceQueueLimits {
71 ReduceQueueStartLimit = 20,
72 ReduceQueueThrottleLimit = 30
73};
74#else
75enum {
76 ReduceQueueStartLimit = 20,
77 ReduceQueueThrottleLimit = 30
78};
79#endif
80
81// IntermediateResults holds a block of intermediate results from a
82// map or filter functor. The begin/end offsets indicates the origin
83// and range of the block.
84template <typename T>
85class IntermediateResults
86{
87public:
88 int begin, end;
89 QVector<T> vector;
90};
91
92enum ReduceOption {
93 UnorderedReduce = 0x1,
94 OrderedReduce = 0x2,
95 SequentialReduce = 0x4
96 // ParallelReduce = 0x8
97};
98Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
99#ifndef Q_CLANG_QDOC
100Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
101#endif
102// supports both ordered and out-of-order reduction
103template <typename ReduceFunctor, typename ReduceResultType, typename T>
104class ReduceKernel
105{
106 typedef QMap<int, IntermediateResults<T> > ResultsMap;
107
108 const ReduceOptions reduceOptions;
109
110 QMutex mutex;
111 int progress, resultsMapSize, threadCount;
112 ResultsMap resultsMap;
113
114 bool canReduce(int begin) const
115 {
116 return (((reduceOptions & UnorderedReduce)
117 && progress == 0)
118 || ((reduceOptions & OrderedReduce)
119 && progress == begin));
120 }
121
122 void reduceResult(ReduceFunctor &reduce,
123 ReduceResultType &r,
124 const IntermediateResults<T> &result)
125 {
126 for (int i = 0; i < result.vector.size(); ++i) {
127 reduce(r, result.vector.at(i));
128 }
129 }
130
131 void reduceResults(ReduceFunctor &reduce,
132 ReduceResultType &r,
133 ResultsMap &map)
134 {
135 typename ResultsMap::iterator it = map.begin();
136 while (it != map.end()) {
137 reduceResult(reduce, r, it.value());
138 ++it;
139 }
140 }
141
142public:
143 ReduceKernel(ReduceOptions _reduceOptions)
144 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
145 threadCount(QThreadPool::globalInstance()->maxThreadCount())
146 { }
147
148 void runReduce(ReduceFunctor &reduce,
149 ReduceResultType &r,
150 const IntermediateResults<T> &result)
151 {
152 std::unique_lock<QMutex> locker(mutex);
153 if (!canReduce(result.begin)) {
154 ++resultsMapSize;
155 resultsMap.insert(result.begin, result);
156 return;
157 }
158
159 if (reduceOptions & UnorderedReduce) {
160 // UnorderedReduce
161 progress = -1;
162
163 // reduce this result
164 locker.unlock();
165 reduceResult(reduce, r, result);
166 locker.lock();
167
168 // reduce all stored results as well
169 while (!resultsMap.isEmpty()) {
170 ResultsMap resultsMapCopy = resultsMap;
171 resultsMap.clear();
172
173 locker.unlock();
174 reduceResults(reduce, r, resultsMapCopy);
175 locker.lock();
176
177 resultsMapSize -= resultsMapCopy.size();
178 }
179
180 progress = 0;
181 } else {
182 // reduce this result
183 locker.unlock();
184 reduceResult(reduce, r, result);
185 locker.lock();
186
187 // OrderedReduce
188 progress += result.end - result.begin;
189
190 // reduce as many other results as possible
191 typename ResultsMap::iterator it = resultsMap.begin();
192 while (it != resultsMap.end()) {
193 if (it.value().begin != progress)
194 break;
195
196 locker.unlock();
197 reduceResult(reduce, r, it.value());
198 locker.lock();
199
200 --resultsMapSize;
201 progress += it.value().end - it.value().begin;
202 it = resultsMap.erase(it);
203 }
204 }
205 }
206
207 // final reduction
208 void finish(ReduceFunctor &reduce, ReduceResultType &r)
209 {
210 reduceResults(reduce, r, resultsMap);
211 }
212
213 inline bool shouldThrottle()
214 {
215 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
216 }
217
218 inline bool shouldStartThread()
219 {
220 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
221 }
222};
223
224template <typename Sequence, typename Base, typename Functor1, typename Functor2>
225struct SequenceHolder2 : public Base
226{
227 SequenceHolder2(const Sequence &_sequence,
228 Functor1 functor1,
229 Functor2 functor2,
230 ReduceOptions reduceOptions)
231 : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
232 sequence(_sequence)
233 { }
234
235 Sequence sequence;
236
237 void finish() override
238 {
239 Base::finish();
240 // Clear the sequence to make sure all temporaries are destroyed
241 // before finished is signaled.
242 sequence = Sequence();
243 }
244};
245
246} // namespace QtConcurrent
247
248QT_END_NAMESPACE
249
250#endif // QT_NO_CONCURRENT
251
252#endif
253