1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17
18#include "tbb/parallel_reduce.h"
19#include "tbb/atomic.h"
20#include "harness_assert.h"
21
22using namespace std;
23
//! Net count of FooBody splitting constructions performed across all tests.
static tbb::atomic<long> ForkCount;
//! Number of FooBody objects currently alive; used to detect body leaks.
static tbb::atomic<long> FooBodyCount;
26
27//! Class with public interface that is exactly minimal requirements for Range concept
28class MinimalRange {
29 size_t begin, end;
30 friend class FooBody;
31 explicit MinimalRange( size_t i ) : begin(0), end(i) {}
32 friend void Flog( int nthread, bool inteference );
33public:
34 MinimalRange( MinimalRange& r, tbb::split ) : end(r.end) {
35 begin = r.end = (r.begin+r.end)/2;
36 }
37 bool is_divisible() const {return end-begin>=2;}
38 bool empty() const {return begin==end;}
39};
40
41//! Class with public interface that is exactly minimal requirements for Body of a parallel_reduce
//! Class with public interface that is exactly minimal requirements for Body of a parallel_reduce
class FooBody {
private:
    FooBody( const FooBody& ); // Deny access
    void operator=( const FooBody& ); // Deny access
    friend void Flog( int nthread, bool interference );
    //! Parent that created this body via split operation. NULL if original body.
    FooBody* parent;
    //! Total number of index values processed by body and its children.
    size_t sum;
    //! Number of join operations done so far on this body and its children.
    long join_count;
    //! Range that has been processed so far by this body and its children.
    size_t begin, end;
    //! True if body has not yet been processed at least once by operator().
    bool is_new;
    //! 1 if body was created by split; 0 if original body.
    int forked;
    //! Default constructor is private; only friend Flog creates the original body.
    FooBody() {++FooBodyCount;}
public:
    //! Destructor poisons the fields so that use-after-destroy is detectable.
    ~FooBody() {
        forked = 0xDEADBEEF;
        sum=0xDEADBEEF;
        join_count=0xDEADBEEF;
        --FooBodyCount;
    }
    //! Splitting constructor, as required by the parallel_reduce Body concept.
    /** begin/end are deliberately left uninitialized here: operator() sets them
        on first invocation because is_new is true. */
    FooBody( FooBody& other, tbb::split ) {
        ++FooBodyCount;
        ++ForkCount;
        sum = 0;
        parent = &other;
        join_count = 0;
        is_new = true;
        forked = 1;
    }
    //! Merge results of s into *this; s must be a direct child split from *this.
    void join( FooBody& s ) {
        ASSERT( s.forked==1, NULL );
        ASSERT( this!=&s, NULL );
        ASSERT( this==s.parent, NULL );
        ASSERT( end==s.begin, NULL ); // joined subranges must be adjacent
        end = s.end;
        sum += s.sum;
        join_count += s.join_count + 1;
        s.forked = 2; // mark as consumed so it cannot be joined twice
    }
    //! Process subrange r; subranges must arrive in left-to-right order without gaps.
    void operator()( const MinimalRange& r ) {
        for( size_t k=r.begin; k<r.end; ++k )
            ++sum;
        if( is_new ) {
            is_new = false;
            begin = r.begin;
        } else
            ASSERT( end==r.begin, NULL ); // no gap/overlap between consecutive calls
        end = r.end;
    }
};
97
98#include <cstdio>
99#include "harness.h"
100#include "tbb/tick_count.h"
101
102void Flog( int nthread, bool interference=false ) {
103 for (int mode = 0; mode < 4; mode++) {
104 tbb::tick_count T0 = tbb::tick_count::now();
105 long join_count = 0;
106 tbb::affinity_partitioner ap;
107 for( size_t i=0; i<=1000; ++i ) {
108 FooBody f;
109 f.sum = 0;
110 f.parent = NULL;
111 f.join_count = 0;
112 f.is_new = true;
113 f.forked = 0;
114 f.begin = ~size_t(0);
115 f.end = ~size_t(0);
116 ASSERT( FooBodyCount==1, NULL );
117 switch (mode) {
118 case 0:
119 tbb::parallel_reduce( MinimalRange(i), f );
120 break;
121 case 1:
122 tbb::parallel_reduce( MinimalRange(i), f, tbb::simple_partitioner() );
123 break;
124 case 2:
125 tbb::parallel_reduce( MinimalRange(i), f, tbb::auto_partitioner() );
126 break;
127 case 3:
128 tbb::parallel_reduce( MinimalRange(i), f, ap );
129 break;
130 }
131 join_count += f.join_count;
132 ASSERT( FooBodyCount==1, NULL );
133 ASSERT( f.sum==i, NULL );
134 ASSERT( f.begin==(i==0 ? ~size_t(0) : 0), NULL );
135 ASSERT( f.end==(i==0 ? ~size_t(0) : i), NULL );
136 }
137 tbb::tick_count T1 = tbb::tick_count::now();
138 REMARK("time=%g join_count=%ld ForkCount=%ld nthread=%d%s\n",
139 (T1-T0).seconds(),join_count,long(ForkCount), nthread, interference ? " with interference":"");
140 }
141}
142
143#include "tbb/blocked_range.h"
144
// Element type for the ParallelSum test: 64-bit so the sum of 10^6 values cannot overflow.
#if _MSC_VER
    typedef tbb::internal::uint64_t ValueType;
#else
    typedef uint64_t ValueType;
#endif
150
//! Reduction functor: generic binary addition of two values.
struct Sum {
    template<typename T>
    T operator() ( const T& lhs, const T& rhs ) const {
        return lhs + rhs;
    }
};
157
158struct Accumulator {
159 ValueType operator() ( const tbb::blocked_range<ValueType*>& r, ValueType value ) const {
160 for ( ValueType* pv = r.begin(); pv != r.end(); ++pv )
161 value += *pv;
162 return value;
163 }
164};
165
166class ParallelSumTester: public NoAssign {
167public:
168 ParallelSumTester() : m_range(NULL, NULL) {
169 m_array = new ValueType[unsigned(N)];
170 for ( ValueType i = 0; i < N; ++i )
171 m_array[i] = i + 1;
172 m_range = tbb::blocked_range<ValueType*>( m_array, m_array + N );
173 }
174 ~ParallelSumTester() { delete[] m_array; }
175 template<typename Partitioner>
176 void CheckParallelReduce() {
177 Partitioner partitioner;
178 ValueType r1 = tbb::parallel_reduce( m_range, I, Accumulator(), Sum(), partitioner );
179 ASSERT( r1 == R, NULL );
180#if __TBB_CPP11_LAMBDAS_PRESENT
181 ValueType r2 = tbb::parallel_reduce(
182 m_range, I,
183 [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
184 for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv )
185 value += *pv;
186 return value;
187 },
188 Sum(),
189 partitioner
190 );
191 ASSERT( r2 == R, NULL );
192#endif /* LAMBDAS */
193 }
194 void CheckParallelReduceDefault() {
195 ValueType r1 = tbb::parallel_reduce( m_range, I, Accumulator(), Sum() );
196 ASSERT( r1 == R, NULL );
197#if __TBB_CPP11_LAMBDAS_PRESENT
198 ValueType r2 = tbb::parallel_reduce(
199 m_range, I,
200 [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
201 for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv )
202 value += *pv;
203 return value;
204 },
205 Sum()
206 );
207 ASSERT( r2 == R, NULL );
208#endif /* LAMBDAS */
209 }
210private:
211 ValueType* m_array;
212 tbb::blocked_range<ValueType*> m_range;
213 static const ValueType I, N, R;
214};
215
// Identity element, problem size, and the expected sum 1+2+...+N.
const ValueType ParallelSumTester::I = 0;
const ValueType ParallelSumTester::N = 1000000;
const ValueType ParallelSumTester::R = N * (N + 1) / 2;
219
//! Verify that parallel_reduce computes the known sum with every supported partitioner.
void ParallelSum () {
    ParallelSumTester pst;
    pst.CheckParallelReduceDefault();
    pst.CheckParallelReduce<tbb::simple_partitioner>();
    pst.CheckParallelReduce<tbb::auto_partitioner>();
    pst.CheckParallelReduce<tbb::affinity_partitioner>();
    pst.CheckParallelReduce<tbb::static_partitioner>();
}
228
229#include "harness_concurrency_tracker.h"
230
//! Deliberately non-commutative reduction operation (shift then xor).
/** Because the operation is order-sensitive, any change in split or join
    order changes the result, which is what the determinism test relies on. */
class RotOp {
public:
    typedef int Type;
    int operator() ( int x, int i ) const {
        const int shifted = x << 1;
        return shifted ^ i;
    }
    int join( int x, int y ) const {
        return (*this)( x, y );
    }
};
241
//! Generic reduction body that folds operation Op over an integer range into my_value.
template <class Op>
struct ReduceBody {
    typedef typename Op::Type result_type;
    // Accumulated result; value-initialized, which serves as the identity here.
    result_type my_value;

    ReduceBody() : my_value() {}
    ReduceBody( ReduceBody &, tbb::split ) : my_value() {}

    void operator() ( const tbb::blocked_range<int>& r ) {
        Harness::ConcurrencyTracker ct;  // records the level of actual parallelism
        for ( int i = r.begin(); i != r.end(); ++i ) {
            Op op;
            my_value = op(my_value, i);
        }
    }

    void join( const ReduceBody& y ) {
        Op op;
        my_value = op.join(my_value, y.my_value);
    }
};
263
//! Type-tag selecting the parallel_deterministic_reduce overload with no partitioner argument
struct harness_default_partitioner {};
266
267template<typename Body, typename Partitioner>
268struct parallel_deterministic_reduce_invoker {
269 template<typename Range>
270 static typename Body::result_type run( const Range& range ) {
271 Body body;
272 tbb::parallel_deterministic_reduce(range, body, Partitioner());
273 return body.my_value;
274 }
275};
276
277template<typename Body>
278struct parallel_deterministic_reduce_invoker<Body, harness_default_partitioner> {
279 template<typename Range>
280 static typename Body::result_type run( const Range& range ) {
281 Body body;
282 tbb::parallel_deterministic_reduce(range, body);
283 return body.my_value;
284 }
285};
286
//! Runs lambda-form parallel_deterministic_reduce with an explicitly constructed partitioner.
template<typename ResultType, typename Partitioner>
struct parallel_deterministic_reduce_lambda_invoker {
    template<typename Range, typename Func, typename Reduction>
    static ResultType run( const Range& range, Func f, Reduction r ) {
        return tbb::parallel_deterministic_reduce(range, ResultType(), f, r, Partitioner());
    }
};
294
//! Specialization for the default-partitioner overload of the lambda form.
template<typename ResultType>
struct parallel_deterministic_reduce_lambda_invoker<ResultType, harness_default_partitioner> {
    template<typename Range, typename Func, typename Reduction>
    static ResultType run(const Range& range, Func f, Reduction r) {
        return tbb::parallel_deterministic_reduce(range, ResultType(), f, r);
    }
};
302
//! Define overloads of parallel_deterministic_reduce that accept "undesired" types of partitioners
namespace unsupported {

    // Each stub matches a partitioner type that tbb::parallel_deterministic_reduce
    // must NOT provide. If tbb ever gained such an overload, the unqualified calls
    // in TestUnsupportedPartitioners() would become ambiguous and fail to compile.

    template<typename Range, typename Body>
    void parallel_deterministic_reduce(const Range&, Body&, const tbb::auto_partitioner&) { }

    template<typename Range, typename Body>
    void parallel_deterministic_reduce(const Range&, Body&, tbb::affinity_partitioner&) { }

    template<typename Range, typename Value, typename RealBody, typename Reduction>
    Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , const tbb::auto_partitioner&) {
        return identity;
    }

    template<typename Range, typename Value, typename RealBody, typename Reduction>
    Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , tbb::affinity_partitioner&) {
        return identity;
    }

}
323
324struct Body {
325 float value;
326 Body() : value(0) {}
327 Body(Body&, tbb::split) { value = 0; }
328 void operator()(const tbb::blocked_range<int>&) {}
329 void join(Body&) {}
330};
331
//! Check that other types of partitioners are not supported (auto, affinity)
//! In the case of "unsupported" API unexpectedly sneaking into namespace tbb,
//! this test should result in a compilation error due to overload resolution ambiguity
static void TestUnsupportedPartitioners() {
    // Both namespaces are pulled in so that, if tbb provided these overloads,
    // each call below would be ambiguous and the file would not compile.
    using namespace tbb;
    using namespace unsupported;
    Body body;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, tbb::auto_partitioner());

    tbb::affinity_partitioner ap;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, ap);

#if __TBB_CPP11_LAMBDAS_PRESENT
    // Same checks for the lambda (identity + body + reduction) form.
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        tbb::auto_partitioner()
    );
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        ap
    );
#endif /* LAMBDAS */
}
369
//! Check that parallel_deterministic_reduce yields an identical result on every run.
/** Uses the non-commutative RotOp, so any variation in split or join order
    would change the result. Partitioner must be one of the types accepted by
    parallel_deterministic_reduce (simple, static), or harness_default_partitioner
    to select the overload with no partitioner argument. */
template <class Partitioner>
void TestDeterministicReductionFor() {
    const int N = 1000;
    const tbb::blocked_range<int> range(0, N);
    typedef ReduceBody<RotOp> BodyType;
    // Reference result from the first run; all later runs must reproduce it.
    BodyType::result_type R1 =
        parallel_deterministic_reduce_invoker<BodyType, Partitioner>::run(range);
    for ( int i=0; i<100; ++i ) {
        BodyType::result_type R2 =
            parallel_deterministic_reduce_invoker<BodyType, Partitioner>::run(range);
        ASSERT( R1 == R2, "parallel_deterministic_reduce behaves differently from run to run" );
#if __TBB_CPP11_LAMBDAS_PRESENT
        // The lambda form must produce the same value as the body form.
        typedef RotOp::Type Type;
        Type R3 = parallel_deterministic_reduce_lambda_invoker<Type, Partitioner>::run(
            range,
            [](const tbb::blocked_range<int>& br, Type value) -> Type {
                Harness::ConcurrencyTracker ct;
                for ( int ii = br.begin(); ii != br.end(); ++ii ) {
                    RotOp op;
                    value = op(value, ii);
                }
                return value;
            },
            [](const Type& v1, const Type& v2) -> Type {
                RotOp op;
                return op.join(v1,v2);
            }
        );
        ASSERT( R1 == R3, "lambda-based parallel_deterministic_reduce behaves differently from run to run" );
#endif /* LAMBDAS */
    }
}
402
//! Run the determinism check with each partitioner accepted by parallel_deterministic_reduce.
void TestDeterministicReduction () {
    TestDeterministicReductionFor<tbb::simple_partitioner>();
    TestDeterministicReductionFor<tbb::static_partitioner>();
    TestDeterministicReductionFor<harness_default_partitioner>();
    // Warn (not fail) if the scheduler never actually ran bodies concurrently.
    ASSERT_WARNING((Harness::ConcurrencyTracker::PeakParallelism() > 1), "no parallel execution\n");
}
409
410#include "tbb/task_scheduler_init.h"
411#include "harness_cpu.h"
412#include "test_partitioner.h"
413
namespace interaction_with_range_and_partitioner {

// Test checks compatibility of parallel_reduce algorithm with various range implementations

void test() {
    using namespace test_partitioner_utils::interaction_with_range_and_partitioner;

    test_partitioner_utils::SimpleReduceBody body;
    tbb::affinity_partitioner ap;

    // affinity_partitioner: expected to use plain split for Range1-3,
    // proportional split for Range4-6 (per each Range's assertion flags).
    parallel_reduce(Range1(/*assert_in_split*/ true, /*assert_in_proportional_split*/ false), body, ap);
    parallel_reduce(Range2(true, false), body, ap);
    parallel_reduce(Range3(true, false), body, ap);
    parallel_reduce(Range4(false, true), body, ap);
    parallel_reduce(Range5(false, true), body, ap);
    parallel_reduce(Range6(false, true), body, ap);

    // static_partitioner: same split-kind expectations as affinity_partitioner.
    parallel_reduce(Range1(/*assert_in_split*/ true, /*assert_in_proportional_split*/ false),
                    body, tbb::static_partitioner());
    parallel_reduce(Range2(true, false), body, tbb::static_partitioner());
    parallel_reduce(Range3(true, false), body, tbb::static_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::static_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::static_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::static_partitioner());

    // simple_partitioner: expected never to use proportional split.
    parallel_reduce(Range1(/*assert_in_split*/ false, /*assert_in_proportional_split*/ true),
                    body, tbb::simple_partitioner());
    parallel_reduce(Range2(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range3(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::simple_partitioner());

    // auto_partitioner: expected never to use proportional split.
    parallel_reduce(Range1(/*assert_in_split*/ false, /*assert_in_proportional_split*/ true),
                    body, tbb::auto_partitioner());
    parallel_reduce(Range2(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range3(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::auto_partitioner());

    // parallel_deterministic_reduce supports only static and simple partitioners.
    parallel_deterministic_reduce(Range1(/*assert_in_split*/true, /*assert_in_proportional_split*/ false),
                                  body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range2(true, false), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range3(true, false), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range4(false, true), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range5(false, true), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range6(false, true), body, tbb::static_partitioner());

    parallel_deterministic_reduce(Range1(/*assert_in_split*/false, /*assert_in_proportional_split*/ true),
                                  body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range2(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range3(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range4(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range5(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range6(false, true), body, tbb::simple_partitioner());
}

} // interaction_with_range_and_partitioner
473
//! Harness entry point: runs all tests across the requested range of thread counts.
int TestMain () {
    TestUnsupportedPartitioners();
    if( MinThread<0 ) {
        REPORT("Usage: nthread must be positive\n");
        exit(1);
    }
    for( int p=MinThread; p<=MaxThread; ++p ) {
        tbb::task_scheduler_init init( p );
        Flog(p);
        ParallelSum();
        // The determinism check is only meaningful with real parallelism.
        if ( p>=2 )
            TestDeterministicReduction();
        // Test that all workers sleep when no work
        TestCPUUserTime(p);
    }
    interaction_with_range_and_partitioner::test();
    return Harness::Done;
}
492