1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | |
#include <vector>

#include "tbb/parallel_reduce.h"
#include "tbb/atomic.h"
#include "harness_assert.h"
21 | |
using namespace std;

// Running count of splitting-constructor invocations across all parallel_reduce calls.
static tbb::atomic<long> ForkCount;
// Number of FooBody objects currently alive; Flog checks it returns to 1 after each reduce.
static tbb::atomic<long> FooBodyCount;
26 | |
27 | //! Class with public interface that is exactly minimal requirements for Range concept |
28 | class MinimalRange { |
29 | size_t begin, end; |
30 | friend class FooBody; |
31 | explicit MinimalRange( size_t i ) : begin(0), end(i) {} |
32 | friend void Flog( int nthread, bool inteference ); |
33 | public: |
34 | MinimalRange( MinimalRange& r, tbb::split ) : end(r.end) { |
35 | begin = r.end = (r.begin+r.end)/2; |
36 | } |
37 | bool is_divisible() const {return end-begin>=2;} |
38 | bool empty() const {return begin==end;} |
39 | }; |
40 | |
//! Class with public interface that is exactly minimal requirements for Body of a parallel_reduce
class FooBody {
private:
    FooBody( const FooBody& ); // Deny access
    void operator=( const FooBody& ); // Deny access
    friend void Flog( int nthread, bool interference );
    //! Parent that created this body via split operation. NULL if original body.
    FooBody* parent;
    //! Total number of index values processed by body and its children.
    size_t sum;
    //! Number of join operations done so far on this body and its children.
    long join_count;
    //! Range that has been processed so far by this body and its children.
    size_t begin, end;
    //! True if body has not yet been processed at least once by operator().
    bool is_new;
    //! 1 if body was created by split; 0 if original body.
    int forked;
    // Default construction is private; only the friend Flog creates the root body.
    FooBody() {++FooBodyCount;}
public:
    //! Poison fields with 0xDEADBEEF so that any use of a destroyed body is likely
    //! to trip an assertion, and decrement the live-body count that Flog verifies.
    ~FooBody() {
        forked = 0xDEADBEEF;
        sum=0xDEADBEEF;
        join_count=0xDEADBEEF;
        --FooBodyCount;
    }
    //! Splitting constructor, invoked by parallel_reduce when work is stolen.
    //! begin/end are deliberately left unset here: is_new is true, so operator()
    //! initializes begin on the body's first use.
    FooBody( FooBody& other, tbb::split ) {
        ++FooBodyCount;
        ++ForkCount;
        sum = 0;
        parent = &other;
        join_count = 0;
        is_new = true;
        forked = 1;
    }
    //! Fold child body s into *this. s must have been split from *this (parent link)
    //! and must cover the subrange immediately to the right (end==s.begin).
    void join( FooBody& s ) {
        ASSERT( s.forked==1, NULL ); // s was created by split and not yet joined
        ASSERT( this!=&s, NULL );
        ASSERT( this==s.parent, NULL );
        ASSERT( end==s.begin, NULL ); // subranges must be adjacent
        end = s.end;
        sum += s.sum;
        join_count += s.join_count + 1;
        s.forked = 2; // mark as already joined
    }
    //! Count the indices of r, and verify subranges arrive in left-to-right order.
    void operator()( const MinimalRange& r ) {
        for( size_t k=r.begin; k<r.end; ++k )
            ++sum;
        if( is_new ) {
            is_new = false;
            begin = r.begin; // first subrange seen by this body fixes its left edge
        } else
            ASSERT( end==r.begin, NULL ); // subsequent subranges must be contiguous
        end = r.end;
    }
};
97 | |
98 | #include <cstdio> |
99 | #include "harness.h" |
100 | #include "tbb/tick_count.h" |
101 | |
102 | void Flog( int nthread, bool interference=false ) { |
103 | for (int mode = 0; mode < 4; mode++) { |
104 | tbb::tick_count T0 = tbb::tick_count::now(); |
105 | long join_count = 0; |
106 | tbb::affinity_partitioner ap; |
107 | for( size_t i=0; i<=1000; ++i ) { |
108 | FooBody f; |
109 | f.sum = 0; |
110 | f.parent = NULL; |
111 | f.join_count = 0; |
112 | f.is_new = true; |
113 | f.forked = 0; |
114 | f.begin = ~size_t(0); |
115 | f.end = ~size_t(0); |
116 | ASSERT( FooBodyCount==1, NULL ); |
117 | switch (mode) { |
118 | case 0: |
119 | tbb::parallel_reduce( MinimalRange(i), f ); |
120 | break; |
121 | case 1: |
122 | tbb::parallel_reduce( MinimalRange(i), f, tbb::simple_partitioner() ); |
123 | break; |
124 | case 2: |
125 | tbb::parallel_reduce( MinimalRange(i), f, tbb::auto_partitioner() ); |
126 | break; |
127 | case 3: |
128 | tbb::parallel_reduce( MinimalRange(i), f, ap ); |
129 | break; |
130 | } |
131 | join_count += f.join_count; |
132 | ASSERT( FooBodyCount==1, NULL ); |
133 | ASSERT( f.sum==i, NULL ); |
134 | ASSERT( f.begin==(i==0 ? ~size_t(0) : 0), NULL ); |
135 | ASSERT( f.end==(i==0 ? ~size_t(0) : i), NULL ); |
136 | } |
137 | tbb::tick_count T1 = tbb::tick_count::now(); |
138 | REMARK("time=%g join_count=%ld ForkCount=%ld nthread=%d%s\n" , |
139 | (T1-T0).seconds(),join_count,long(ForkCount), nthread, interference ? " with interference" :"" ); |
140 | } |
141 | } |
142 | |
143 | #include "tbb/blocked_range.h" |
144 | |
// Element type for the parallel-sum tests.
// NOTE(review): the MSVC branch uses TBB's internal 64-bit typedef, presumably
// because old MSVC versions lacked <stdint.h>'s uint64_t — confirm before changing.
#if _MSC_VER
typedef tbb::internal::uint64_t ValueType;
#else
typedef uint64_t ValueType;
#endif
150 | |
//! Generic binary functor returning the sum of its two operands;
//! serves as the reduction (join) operation for parallel_reduce.
struct Sum {
    template<typename T>
    T operator() ( const T& lhs, const T& rhs ) const { return lhs + rhs; }
};
157 | |
158 | struct Accumulator { |
159 | ValueType operator() ( const tbb::blocked_range<ValueType*>& r, ValueType value ) const { |
160 | for ( ValueType* pv = r.begin(); pv != r.end(); ++pv ) |
161 | value += *pv; |
162 | return value; |
163 | } |
164 | }; |
165 | |
166 | class ParallelSumTester: public NoAssign { |
167 | public: |
168 | ParallelSumTester() : m_range(NULL, NULL) { |
169 | m_array = new ValueType[unsigned(N)]; |
170 | for ( ValueType i = 0; i < N; ++i ) |
171 | m_array[i] = i + 1; |
172 | m_range = tbb::blocked_range<ValueType*>( m_array, m_array + N ); |
173 | } |
174 | ~ParallelSumTester() { delete[] m_array; } |
175 | template<typename Partitioner> |
176 | void CheckParallelReduce() { |
177 | Partitioner partitioner; |
178 | ValueType r1 = tbb::parallel_reduce( m_range, I, Accumulator(), Sum(), partitioner ); |
179 | ASSERT( r1 == R, NULL ); |
180 | #if __TBB_CPP11_LAMBDAS_PRESENT |
181 | ValueType r2 = tbb::parallel_reduce( |
182 | m_range, I, |
183 | [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType { |
184 | for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv ) |
185 | value += *pv; |
186 | return value; |
187 | }, |
188 | Sum(), |
189 | partitioner |
190 | ); |
191 | ASSERT( r2 == R, NULL ); |
192 | #endif /* LAMBDAS */ |
193 | } |
194 | void CheckParallelReduceDefault() { |
195 | ValueType r1 = tbb::parallel_reduce( m_range, I, Accumulator(), Sum() ); |
196 | ASSERT( r1 == R, NULL ); |
197 | #if __TBB_CPP11_LAMBDAS_PRESENT |
198 | ValueType r2 = tbb::parallel_reduce( |
199 | m_range, I, |
200 | [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType { |
201 | for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv ) |
202 | value += *pv; |
203 | return value; |
204 | }, |
205 | Sum() |
206 | ); |
207 | ASSERT( r2 == R, NULL ); |
208 | #endif /* LAMBDAS */ |
209 | } |
210 | private: |
211 | ValueType* m_array; |
212 | tbb::blocked_range<ValueType*> m_range; |
213 | static const ValueType I, N, R; |
214 | }; |
215 | |
// Test constants: reduction identity, element count, and the expected result
// (closed-form sum 1 + 2 + ... + N).
const ValueType ParallelSumTester::I = 0;
const ValueType ParallelSumTester::N = 1000000;
const ValueType ParallelSumTester::R = N * (N + 1) / 2;
219 | |
220 | void ParallelSum () { |
221 | ParallelSumTester pst; |
222 | pst.CheckParallelReduceDefault(); |
223 | pst.CheckParallelReduce<tbb::simple_partitioner>(); |
224 | pst.CheckParallelReduce<tbb::auto_partitioner>(); |
225 | pst.CheckParallelReduce<tbb::affinity_partitioner>(); |
226 | pst.CheckParallelReduce<tbb::static_partitioner>(); |
227 | } |
228 | |
229 | #include "harness_concurrency_tracker.h" |
230 | |
//! Order-sensitive reduction operation ((x<<1)^i is not associative), so any
//! change in reduction order changes the final value — which is exactly what
//! the determinism test relies on.
class RotOp {
public:
    typedef int Type;
    //! Shift-and-xor step. The shift is done in unsigned arithmetic: left-shifting
    //! a signed int whose high bits are set is undefined behavior, and the
    //! accumulated value can reach that state. Bit pattern is unchanged.
    int operator() ( int x, int i ) const {
        return int( (unsigned(x) << 1) ^ unsigned(i) );
    }
    int join( int x, int y ) const {
        return operator()( x, y );
    }
};
241 | |
242 | template <class Op> |
243 | struct ReduceBody { |
244 | typedef typename Op::Type result_type; |
245 | result_type my_value; |
246 | |
247 | ReduceBody() : my_value() {} |
248 | ReduceBody( ReduceBody &, tbb::split ) : my_value() {} |
249 | |
250 | void operator() ( const tbb::blocked_range<int>& r ) { |
251 | Harness::ConcurrencyTracker ct; |
252 | for ( int i = r.begin(); i != r.end(); ++i ) { |
253 | Op op; |
254 | my_value = op(my_value, i); |
255 | } |
256 | } |
257 | |
258 | void join( const ReduceBody& y ) { |
259 | Op op; |
260 | my_value = op.join(my_value, y.my_value); |
261 | } |
262 | }; |
263 | |
//! Type-tag for automatic testing algorithm deduction
//! Selects the parallel_deterministic_reduce overload that takes no partitioner argument.
struct harness_default_partitioner {};
266 | |
//! Runs the Body form of parallel_deterministic_reduce with an explicitly
//! constructed Partitioner and returns the reduced value.
template<typename Body, typename Partitioner>
struct parallel_deterministic_reduce_invoker {
    template<typename Range>
    static typename Body::result_type run( const Range& range ) {
        Body body;
        tbb::parallel_deterministic_reduce(range, body, Partitioner());
        return body.my_value;
    }
};
276 | |
//! Specialization for harness_default_partitioner: calls the overload that
//! takes no partitioner argument, exercising the library default.
template<typename Body>
struct parallel_deterministic_reduce_invoker<Body, harness_default_partitioner> {
    template<typename Range>
    static typename Body::result_type run( const Range& range ) {
        Body body;
        tbb::parallel_deterministic_reduce(range, body);
        return body.my_value;
    }
};
286 | |
//! Runs the functional (identity + leaf + reduction) form of
//! parallel_deterministic_reduce with an explicitly constructed Partitioner.
template<typename ResultType, typename Partitioner>
struct parallel_deterministic_reduce_lambda_invoker {
    template<typename Range, typename Func, typename Reduction>
    static ResultType run( const Range& range, Func f, Reduction r ) {
        return tbb::parallel_deterministic_reduce(range, ResultType(), f, r, Partitioner());
    }
};
294 | |
//! Specialization for harness_default_partitioner: functional form with no
//! partitioner argument, exercising the library default.
template<typename ResultType>
struct parallel_deterministic_reduce_lambda_invoker<ResultType, harness_default_partitioner> {
    template<typename Range, typename Func, typename Reduction>
    static ResultType run(const Range& range, Func f, Reduction r) {
        return tbb::parallel_deterministic_reduce(range, ResultType(), f, r);
    }
};
302 | |
//! Define overloads of parallel_deterministic_reduce that accept "undesired" types of partitioners
namespace unsupported {

// Each overload below is a deliberately empty decoy. TestUnsupportedPartitioners()
// calls parallel_deterministic_reduce unqualified with both `using namespace tbb`
// and `using namespace unsupported` active; overload resolution must select these
// decoys uniquely. If namespace tbb ever gained matching overloads, the calls
// would become ambiguous and the test would fail to compile.

template<typename Range, typename Body>
void parallel_deterministic_reduce(const Range&, Body&, const tbb::auto_partitioner&) { }

template<typename Range, typename Body>
void parallel_deterministic_reduce(const Range&, Body&, tbb::affinity_partitioner&) { }

// Functional-form decoys: simply return the identity value.
template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , const tbb::auto_partitioner&) {
    return identity;
}

template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , tbb::affinity_partitioner&) {
    return identity;
}

}
323 | |
324 | struct Body { |
325 | float value; |
326 | Body() : value(0) {} |
327 | Body(Body&, tbb::split) { value = 0; } |
328 | void operator()(const tbb::blocked_range<int>&) {} |
329 | void join(Body&) {} |
330 | }; |
331 | |
//! Check that other types of partitioners are not supported (auto, affinity)
//! In the case of "unsupported" API unexpectedly sneaking into namespace tbb,
//! this test should result in a compilation error due to overload resolution ambiguity
static void TestUnsupportedPartitioners() {
    // Both using-directives are essential: unqualified calls below must consider
    // overloads from tbb and from unsupported, and resolve uniquely to the decoys.
    using namespace tbb;
    using namespace unsupported;
    Body body;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, tbb::auto_partitioner());

    tbb::affinity_partitioner ap;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, ap);

#if __TBB_CPP11_LAMBDAS_PRESENT
    // Same check for the functional (lambda) form with both partitioner types.
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        tbb::auto_partitioner()
    );
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        ap
    );
#endif /* LAMBDAS */
}
369 | |
370 | template <class Partitioner> |
371 | void TestDeterministicReductionFor() { |
372 | const int N = 1000; |
373 | const tbb::blocked_range<int> range(0, N); |
374 | typedef ReduceBody<RotOp> BodyType; |
375 | BodyType::result_type R1 = |
376 | parallel_deterministic_reduce_invoker<BodyType, Partitioner>::run(range); |
377 | for ( int i=0; i<100; ++i ) { |
378 | BodyType::result_type R2 = |
379 | parallel_deterministic_reduce_invoker<BodyType, Partitioner>::run(range); |
380 | ASSERT( R1 == R2, "parallel_deterministic_reduce behaves differently from run to run" ); |
381 | #if __TBB_CPP11_LAMBDAS_PRESENT |
382 | typedef RotOp::Type Type; |
383 | Type R3 = parallel_deterministic_reduce_lambda_invoker<Type, Partitioner>::run( |
384 | range, |
385 | [](const tbb::blocked_range<int>& br, Type value) -> Type { |
386 | Harness::ConcurrencyTracker ct; |
387 | for ( int ii = br.begin(); ii != br.end(); ++ii ) { |
388 | RotOp op; |
389 | value = op(value, ii); |
390 | } |
391 | return value; |
392 | }, |
393 | [](const Type& v1, const Type& v2) -> Type { |
394 | RotOp op; |
395 | return op.join(v1,v2); |
396 | } |
397 | ); |
398 | ASSERT( R1 == R3, "lambda-based parallel_deterministic_reduce behaves differently from run to run" ); |
399 | #endif /* LAMBDAS */ |
400 | } |
401 | } |
402 | |
//! Determinism must hold for every partitioner parallel_deterministic_reduce
//! supports, including the default (no-partitioner) overload.
void TestDeterministicReduction () {
    TestDeterministicReductionFor<tbb::simple_partitioner>();
    TestDeterministicReductionFor<tbb::static_partitioner>();
    TestDeterministicReductionFor<harness_default_partitioner>();
    // Warn (but do not fail) if the scheduler never actually ran bodies concurrently,
    // since a serial run would make the determinism check vacuous.
    ASSERT_WARNING((Harness::ConcurrencyTracker::PeakParallelism() > 1), "no parallel execution\n" );
}
409 | |
410 | #include "tbb/task_scheduler_init.h" |
411 | #include "harness_cpu.h" |
412 | #include "test_partitioner.h" |
413 | |
namespace interaction_with_range_and_partitioner {

// Test checks compatibility of parallel_reduce algorithm with various range implementations

void test() {
    using namespace test_partitioner_utils::interaction_with_range_and_partitioner;

    test_partitioner_utils::SimpleReduceBody body;
    tbb::affinity_partitioner ap;

    // Each RangeN constructor takes two flags: (assert_in_split, assert_in_proportional_split).
    // NOTE(review): judging by the flag names, a 'true' flag makes the corresponding
    // splitting constructor assert, i.e. that splitting path is expected NOT to be
    // taken for the partitioner used — confirm against test_partitioner.h.

    parallel_reduce(Range1(/*assert_in_split*/ true, /*assert_in_proportional_split*/ false), body, ap);
    parallel_reduce(Range2(true, false), body, ap);
    parallel_reduce(Range3(true, false), body, ap);
    parallel_reduce(Range4(false, true), body, ap);
    parallel_reduce(Range5(false, true), body, ap);
    parallel_reduce(Range6(false, true), body, ap);

    parallel_reduce(Range1(/*assert_in_split*/ true, /*assert_in_proportional_split*/ false),
                    body, tbb::static_partitioner());
    parallel_reduce(Range2(true, false), body, tbb::static_partitioner());
    parallel_reduce(Range3(true, false), body, tbb::static_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::static_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::static_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::static_partitioner());

    parallel_reduce(Range1(/*assert_in_split*/ false, /*assert_in_proportional_split*/ true),
                    body, tbb::simple_partitioner());
    parallel_reduce(Range2(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range3(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::simple_partitioner());

    parallel_reduce(Range1(/*assert_in_split*/ false, /*assert_in_proportional_split*/ true),
                    body, tbb::auto_partitioner());
    parallel_reduce(Range2(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range3(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::auto_partitioner());

    // parallel_deterministic_reduce supports only static and simple partitioners.
    parallel_deterministic_reduce(Range1(/*assert_in_split*/true, /*assert_in_proportional_split*/ false),
                                  body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range2(true, false), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range3(true, false), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range4(false, true), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range5(false, true), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range6(false, true), body, tbb::static_partitioner());

    parallel_deterministic_reduce(Range1(/*assert_in_split*/false, /*assert_in_proportional_split*/ true),
                                  body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range2(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range3(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range4(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range5(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range6(false, true), body, tbb::simple_partitioner());
}

} // interaction_with_range_and_partitioner
473 | |
474 | int TestMain () { |
475 | TestUnsupportedPartitioners(); |
476 | if( MinThread<0 ) { |
477 | REPORT("Usage: nthread must be positive\n" ); |
478 | exit(1); |
479 | } |
480 | for( int p=MinThread; p<=MaxThread; ++p ) { |
481 | tbb::task_scheduler_init init( p ); |
482 | Flog(p); |
483 | ParallelSum(); |
484 | if ( p>=2 ) |
485 | TestDeterministicReduction(); |
486 | // Test that all workers sleep when no work |
487 | TestCPUUserTime(p); |
488 | } |
489 | interaction_with_range_and_partitioner::test(); |
490 | return Harness::Done; |
491 | } |
492 | |