1/****************************************************************************
2**
3** Copyright (C) 2016 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtConcurrent module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40#ifndef QTCONCURRENT_REDUCEKERNEL_H
41#define QTCONCURRENT_REDUCEKERNEL_H
42
43#include <QtConcurrent/qtconcurrent_global.h>
44#include <QtConcurrent/qtconcurrentfunctionwrappers.h>
45
46#if !defined(QT_NO_CONCURRENT) || defined(Q_CLANG_QDOC)
47
48#include <QtCore/qatomic.h>
49#include <QtCore/qlist.h>
50#include <QtCore/qmap.h>
51#include <QtCore/qmutex.h>
52#include <QtCore/qthread.h>
53#include <QtCore/qthreadpool.h>
54
55#include <mutex>
56
57QT_BEGIN_NAMESPACE
58
59
60namespace QtConcurrent {
61
/*
    ReduceQueueStartLimit and ReduceQueueThrottleLimit bound the size of the
    reduce queue used by MapReduce. ReduceKernel multiplies each constant by
    its thread count before comparing it with the number of queued reduce
    blocks: once the queue grows past the scaled ReduceQueueStartLimit,
    MapReduce won't start any new threads, and once it grows past the scaled
    ReduceQueueThrottleLimit, running threads will be stopped.

    The enumerators are declared once; the enum only carries the name
    ReduceQueueLimits when Q_CLANG_QDOC is defined and is anonymous otherwise.
*/
enum
#ifdef Q_CLANG_QDOC
ReduceQueueLimits
#endif
{
    ReduceQueueStartLimit = 20,
    ReduceQueueThrottleLimit = 30
};
80
81// IntermediateResults holds a block of intermediate results from a
82// map or filter functor. The begin/end offsets indicates the origin
83// and range of the block.
84template <typename T>
85class IntermediateResults
86{
87public:
88 int begin, end;
89 QList<T> vector;
90};
91
92enum ReduceOption {
93 UnorderedReduce = 0x1,
94 OrderedReduce = 0x2,
95 SequentialReduce = 0x4
96 // ParallelReduce = 0x8
97};
98Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
99#ifndef Q_CLANG_QDOC
100Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
101#endif
102// supports both ordered and out-of-order reduction
103template <typename ReduceFunctor, typename ReduceResultType, typename T>
104class ReduceKernel
105{
106 typedef QMap<int, IntermediateResults<T> > ResultsMap;
107
108 const ReduceOptions reduceOptions;
109
110 QMutex mutex;
111 int progress, resultsMapSize;
112 const int threadCount;
113 ResultsMap resultsMap;
114
115 bool canReduce(int begin) const
116 {
117 return (((reduceOptions & UnorderedReduce)
118 && progress == 0)
119 || ((reduceOptions & OrderedReduce)
120 && progress == begin));
121 }
122
123 void reduceResult(ReduceFunctor &reduce,
124 ReduceResultType &r,
125 const IntermediateResults<T> &result)
126 {
127 for (int i = 0; i < result.vector.size(); ++i) {
128 std::invoke(reduce, r, result.vector.at(i));
129 }
130 }
131
132 void reduceResults(ReduceFunctor &reduce,
133 ReduceResultType &r,
134 ResultsMap &map)
135 {
136 typename ResultsMap::iterator it = map.begin();
137 while (it != map.end()) {
138 reduceResult(reduce, r, it.value());
139 ++it;
140 }
141 }
142
143public:
144 ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions)
145 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
146 threadCount(pool->maxThreadCount())
147 { }
148
149 void runReduce(ReduceFunctor &reduce,
150 ReduceResultType &r,
151 const IntermediateResults<T> &result)
152 {
153 std::unique_lock<QMutex> locker(mutex);
154 if (!canReduce(result.begin)) {
155 ++resultsMapSize;
156 resultsMap.insert(result.begin, result);
157 return;
158 }
159
160 if (reduceOptions & UnorderedReduce) {
161 // UnorderedReduce
162 progress = -1;
163
164 // reduce this result
165 locker.unlock();
166 reduceResult(reduce, r, result);
167 locker.lock();
168
169 // reduce all stored results as well
170 while (!resultsMap.isEmpty()) {
171 ResultsMap resultsMapCopy = resultsMap;
172 resultsMap.clear();
173
174 locker.unlock();
175 reduceResults(reduce, r, resultsMapCopy);
176 locker.lock();
177
178 resultsMapSize -= resultsMapCopy.size();
179 }
180
181 progress = 0;
182 } else {
183 // reduce this result
184 locker.unlock();
185 reduceResult(reduce, r, result);
186 locker.lock();
187
188 // OrderedReduce
189 progress += result.end - result.begin;
190
191 // reduce as many other results as possible
192 typename ResultsMap::iterator it = resultsMap.begin();
193 while (it != resultsMap.end()) {
194 if (it.value().begin != progress)
195 break;
196
197 locker.unlock();
198 reduceResult(reduce, r, it.value());
199 locker.lock();
200
201 --resultsMapSize;
202 progress += it.value().end - it.value().begin;
203 it = resultsMap.erase(it);
204 }
205 }
206 }
207
208 // final reduction
209 void finish(ReduceFunctor &reduce, ReduceResultType &r)
210 {
211 reduceResults(reduce, r, resultsMap);
212 }
213
214 inline bool shouldThrottle()
215 {
216 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
217 }
218
219 inline bool shouldStartThread()
220 {
221 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
222 }
223};
224
225template <typename Sequence, typename Base, typename Functor1, typename Functor2>
226struct SequenceHolder2 : private QtPrivate::SequenceHolder<Sequence>, public Base
227{
228 SequenceHolder2(QThreadPool *pool, const Sequence &_sequence, Functor1 functor1,
229 Functor2 functor2, ReduceOptions reduceOptions)
230 : QtPrivate::SequenceHolder<Sequence>(_sequence),
231 Base(pool, this->sequence.cbegin(), this->sequence.cend(), functor1, functor2,
232 reduceOptions)
233 { }
234
235 SequenceHolder2(QThreadPool *pool, Sequence &&_sequence, Functor1 functor1, Functor2 functor2,
236 ReduceOptions reduceOptions)
237 : QtPrivate::SequenceHolder<Sequence>(std::move(_sequence)),
238 Base(pool, this->sequence.cbegin(), this->sequence.cend(), functor1, functor2,
239 reduceOptions)
240 { }
241
242 template<typename InitialValueType>
243 SequenceHolder2(QThreadPool *pool, const Sequence &_sequence, Functor1 functor1,
244 Functor2 functor2, InitialValueType &&initialValue, ReduceOptions reduceOptions)
245 : QtPrivate::SequenceHolder<Sequence>(_sequence),
246 Base(pool, this->sequence.cbegin(), this->sequence.cend(), functor1, functor2,
247 std::forward<InitialValueType>(initialValue), reduceOptions)
248 { }
249
250 template<typename InitialValueType>
251 SequenceHolder2(QThreadPool *pool, Sequence &&_sequence, Functor1 functor1, Functor2 functor2,
252 InitialValueType &&initialValue, ReduceOptions reduceOptions)
253 : QtPrivate::SequenceHolder<Sequence>(std::move(_sequence)),
254 Base(pool, this->sequence.cbegin(), this->sequence.cend(), functor1, functor2,
255 std::forward<InitialValueType>(initialValue), reduceOptions)
256 { }
257
258 void finish() override
259 {
260 Base::finish();
261 // Clear the sequence to make sure all temporaries are destroyed
262 // before finished is signaled.
263 this->sequence = Sequence();
264 }
265};
266
267} // namespace QtConcurrent
268
269QT_END_NAMESPACE
270
271#endif // QT_NO_CONCURRENT
272
273#endif
274