| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | |
| 18 | #include "tbb/parallel_reduce.h" |
| 19 | #include "tbb/atomic.h" |
| 20 | #include "harness_assert.h" |
| 21 | |
using namespace std;

// Global counters used to verify split/join bookkeeping across the whole test:
//   ForkCount    - number of splitting-constructor invocations observed so far.
//   FooBodyCount - number of currently-live FooBody objects (must return to the
//                  root-body count after each parallel_reduce call).
static tbb::atomic<long> ForkCount;
static tbb::atomic<long> FooBodyCount;
| 26 | |
| 27 | //! Class with public interface that is exactly minimal requirements for Range concept |
| 28 | class MinimalRange { |
| 29 | size_t begin, end; |
| 30 | friend class FooBody; |
| 31 | explicit MinimalRange( size_t i ) : begin(0), end(i) {} |
| 32 | friend void Flog( int nthread, bool inteference ); |
| 33 | public: |
| 34 | MinimalRange( MinimalRange& r, tbb::split ) : end(r.end) { |
| 35 | begin = r.end = (r.begin+r.end)/2; |
| 36 | } |
| 37 | bool is_divisible() const {return end-begin>=2;} |
| 38 | bool empty() const {return begin==end;} |
| 39 | }; |
| 40 | |
//! Class with public interface that is exactly minimal requirements for Body of a parallel_reduce
class FooBody {
private:
    FooBody( const FooBody& ); // Deny access
    void operator=( const FooBody& ); // Deny access
    friend void Flog( int nthread, bool interference );
    //! Parent that created this body via split operation. NULL if original body.
    FooBody* parent;
    //! Total number of index values processed by body and its children.
    size_t sum;
    //! Number of join operations done so far on this body and its children.
    long join_count;
    //! Range that has been processed so far by this body and its children.
    size_t begin, end;
    //! True if body has not yet been processed at least once by operator().
    bool is_new;
    //! 1 if body was created by split; 0 if original body.
    int forked;
    // Private default constructor: only the friend Flog creates the root body.
    FooBody() {++FooBodyCount;}
public:
    ~FooBody() {
        // Poison the fields so that accidental use of a destroyed body is noticeable.
        forked = 0xDEADBEEF;
        sum=0xDEADBEEF;
        join_count=0xDEADBEEF;
        --FooBodyCount;
    }
    //! Splitting constructor, as required by the parallel_reduce Body concept.
    FooBody( FooBody& other, tbb::split ) {
        ++FooBodyCount;
        ++ForkCount;
        sum = 0;
        parent = &other;
        join_count = 0;
        is_new = true;
        forked = 1;
    }
    //! Merge results of s into this body; s must be a child split directly from this body.
    void join( FooBody& s ) {
        ASSERT( s.forked==1, NULL );    // s was created by split and not yet joined
        ASSERT( this!=&s, NULL );
        ASSERT( this==s.parent, NULL ); // a body may only be joined with its own child
        ASSERT( end==s.begin, NULL );   // joined subranges must be adjacent
        end = s.end;
        sum += s.sum;
        join_count += s.join_count + 1;
        s.forked = 2;                   // mark s as consumed by a join
    }
    //! Process subrange r; successive calls on one body must receive consecutive subranges.
    void operator()( const MinimalRange& r ) {
        for( size_t k=r.begin; k<r.end; ++k )
            ++sum;
        if( is_new ) {
            is_new = false;
            begin = r.begin;
        } else
            ASSERT( end==r.begin, NULL ); // ranges seen by a single body must be contiguous
        end = r.end;
    }
};
| 97 | |
| 98 | #include <cstdio> |
| 99 | #include "harness.h" |
| 100 | #include "tbb/tick_count.h" |
| 101 | |
| 102 | void Flog( int nthread, bool interference=false ) { |
| 103 | for (int mode = 0; mode < 4; mode++) { |
| 104 | tbb::tick_count T0 = tbb::tick_count::now(); |
| 105 | long join_count = 0; |
| 106 | tbb::affinity_partitioner ap; |
| 107 | for( size_t i=0; i<=1000; ++i ) { |
| 108 | FooBody f; |
| 109 | f.sum = 0; |
| 110 | f.parent = NULL; |
| 111 | f.join_count = 0; |
| 112 | f.is_new = true; |
| 113 | f.forked = 0; |
| 114 | f.begin = ~size_t(0); |
| 115 | f.end = ~size_t(0); |
| 116 | ASSERT( FooBodyCount==1, NULL ); |
| 117 | switch (mode) { |
| 118 | case 0: |
| 119 | tbb::parallel_reduce( MinimalRange(i), f ); |
| 120 | break; |
| 121 | case 1: |
| 122 | tbb::parallel_reduce( MinimalRange(i), f, tbb::simple_partitioner() ); |
| 123 | break; |
| 124 | case 2: |
| 125 | tbb::parallel_reduce( MinimalRange(i), f, tbb::auto_partitioner() ); |
| 126 | break; |
| 127 | case 3: |
| 128 | tbb::parallel_reduce( MinimalRange(i), f, ap ); |
| 129 | break; |
| 130 | } |
| 131 | join_count += f.join_count; |
| 132 | ASSERT( FooBodyCount==1, NULL ); |
| 133 | ASSERT( f.sum==i, NULL ); |
| 134 | ASSERT( f.begin==(i==0 ? ~size_t(0) : 0), NULL ); |
| 135 | ASSERT( f.end==(i==0 ? ~size_t(0) : i), NULL ); |
| 136 | } |
| 137 | tbb::tick_count T1 = tbb::tick_count::now(); |
| 138 | REMARK("time=%g join_count=%ld ForkCount=%ld nthread=%d%s\n" , |
| 139 | (T1-T0).seconds(),join_count,long(ForkCount), nthread, interference ? " with interference" :"" ); |
| 140 | } |
| 141 | } |
| 142 | |
| 143 | #include "tbb/blocked_range.h" |
| 144 | |
// Older MSVC toolchains may lack uint64_t in the global namespace, so use the
// TBB-provided 64-bit typedef there; everywhere else use the standard one.
#if _MSC_VER
typedef tbb::internal::uint64_t ValueType;
#else
typedef uint64_t ValueType;
#endif
| 150 | |
//! Reduction functor for the functional form of parallel_reduce: plain addition.
struct Sum {
    template<typename T>
    T operator() ( const T& lhs, const T& rhs ) const {
        return lhs + rhs;
    }
};
| 157 | |
| 158 | struct Accumulator { |
| 159 | ValueType operator() ( const tbb::blocked_range<ValueType*>& r, ValueType value ) const { |
| 160 | for ( ValueType* pv = r.begin(); pv != r.end(); ++pv ) |
| 161 | value += *pv; |
| 162 | return value; |
| 163 | } |
| 164 | }; |
| 165 | |
//! Tests the functional (range + identity + body + reduction) form of
//! parallel_reduce by summing 1..N and comparing against the closed-form value R.
class ParallelSumTester: public NoAssign {
public:
    ParallelSumTester() : m_range(NULL, NULL) {
        m_array = new ValueType[unsigned(N)];
        for ( ValueType i = 0; i < N; ++i )
            m_array[i] = i + 1;
        m_range = tbb::blocked_range<ValueType*>( m_array, m_array + N );
    }
    ~ParallelSumTester() { delete[] m_array; }
    //! Run both the functor and (if available) lambda forms with the given partitioner type.
    template<typename Partitioner>
    void CheckParallelReduce() {
        Partitioner partitioner;
        ValueType r1 = tbb::parallel_reduce( m_range, I, Accumulator(), Sum(), partitioner );
        ASSERT( r1 == R, NULL );
#if __TBB_CPP11_LAMBDAS_PRESENT
        ValueType r2 = tbb::parallel_reduce(
            m_range, I,
            [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
                for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv )
                    value += *pv;
                return value;
            },
            Sum(),
            partitioner
        );
        ASSERT( r2 == R, NULL );
#endif /* LAMBDAS */
    }
    //! Same checks without an explicit partitioner (exercises the library default).
    void CheckParallelReduceDefault() {
        ValueType r1 = tbb::parallel_reduce( m_range, I, Accumulator(), Sum() );
        ASSERT( r1 == R, NULL );
#if __TBB_CPP11_LAMBDAS_PRESENT
        ValueType r2 = tbb::parallel_reduce(
            m_range, I,
            [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
                for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv )
                    value += *pv;
                return value;
            },
            Sum()
        );
        ASSERT( r2 == R, NULL );
#endif /* LAMBDAS */
    }
private:
    ValueType* m_array;                      // heap array holding the values 1..N
    tbb::blocked_range<ValueType*> m_range;  // whole-array range reused by every check
    // I = identity, N = element count, R = expected sum; defined below the class.
    static const ValueType I, N, R;
};
| 215 | |
// I = additive identity, N = number of summands, R = expected result (Gauss sum of 1..N).
const ValueType ParallelSumTester::I = 0;
const ValueType ParallelSumTester::N = 1000000;
const ValueType ParallelSumTester::R = N * (N + 1) / 2;
| 219 | |
| 220 | void ParallelSum () { |
| 221 | ParallelSumTester pst; |
| 222 | pst.CheckParallelReduceDefault(); |
| 223 | pst.CheckParallelReduce<tbb::simple_partitioner>(); |
| 224 | pst.CheckParallelReduce<tbb::auto_partitioner>(); |
| 225 | pst.CheckParallelReduce<tbb::affinity_partitioner>(); |
| 226 | pst.CheckParallelReduce<tbb::static_partitioner>(); |
| 227 | } |
| 228 | |
| 229 | #include "harness_concurrency_tracker.h" |
| 230 | |
//! Non-commutative, order-sensitive operation: "shift left one bit, then xor".
//! Because the result depends on evaluation order, repeated runs of
//! parallel_deterministic_reduce must produce bitwise-identical values.
class RotOp {
public:
    typedef int Type;
    int operator() ( int x, int i ) const {
        const int shifted = x << 1;
        return shifted ^ i;
    }
    int join( int x, int y ) const {
        // Joining two partial results is the same operation again.
        return (*this)( x, y );
    }
};
| 241 | |
//! Generic reduction body that folds Op over the integers of a blocked_range<int>.
template <class Op>
struct ReduceBody {
    typedef typename Op::Type result_type;
    result_type my_value;   // partial result accumulated so far

    ReduceBody() : my_value() {}
    // Splitting constructor starts the new branch from a value-initialized result.
    ReduceBody( ReduceBody &, tbb::split ) : my_value() {}

    void operator() ( const tbb::blocked_range<int>& r ) {
        Harness::ConcurrencyTracker ct;   // record that this thread is actively working
        for ( int i = r.begin(); i != r.end(); ++i ) {
            Op op;
            my_value = op(my_value, i);
        }
    }

    void join( const ReduceBody& y ) {
        Op op;
        my_value = op.join(my_value, y.my_value);
    }
};
| 263 | |
//! Type-tag for automatic testing algorithm deduction.
//! Passing this tag selects the invoker specialization that calls
//! parallel_deterministic_reduce WITHOUT a partitioner argument,
//! exercising the algorithm's built-in default.
struct harness_default_partitioner {};
| 266 | |
//! Invokes parallel_deterministic_reduce (imperative body form) with a
//! freshly constructed partitioner of the given type; returns the result.
template<typename Body, typename Partitioner>
struct parallel_deterministic_reduce_invoker {
    template<typename Range>
    static typename Body::result_type run( const Range& range ) {
        Body body;
        tbb::parallel_deterministic_reduce(range, body, Partitioner());
        return body.my_value;
    }
};

//! Specialization for the harness tag: call the overload that takes no
//! partitioner, so the library's default behavior is what gets tested.
template<typename Body>
struct parallel_deterministic_reduce_invoker<Body, harness_default_partitioner> {
    template<typename Range>
    static typename Body::result_type run( const Range& range ) {
        Body body;
        tbb::parallel_deterministic_reduce(range, body);
        return body.my_value;
    }
};
| 286 | |
//! Invokes the lambda (functional) form of parallel_deterministic_reduce with a
//! freshly constructed partitioner of the given type.
template<typename ResultType, typename Partitioner>
struct parallel_deterministic_reduce_lambda_invoker {
    template<typename Range, typename Func, typename Reduction>
    static ResultType run( const Range& range, Func f, Reduction r ) {
        return tbb::parallel_deterministic_reduce(range, ResultType(), f, r, Partitioner());
    }
};

//! Specialization for the harness tag: the no-partitioner overload is used.
template<typename ResultType>
struct parallel_deterministic_reduce_lambda_invoker<ResultType, harness_default_partitioner> {
    template<typename Range, typename Func, typename Reduction>
    static ResultType run(const Range& range, Func f, Reduction r) {
        return tbb::parallel_deterministic_reduce(range, ResultType(), f, r);
    }
};
| 302 | |
//! Define overloads of parallel_deterministic_reduce that accept "undesired" types of partitioners
// These stubs do nothing (or just return the identity). They exist so that
// TestUnsupportedPartitioners can prove namespace tbb itself does NOT provide
// such overloads: if tbb ever grew them, the calls in that test would become
// ambiguous and the file would fail to compile.
namespace unsupported {

template<typename Range, typename Body>
void parallel_deterministic_reduce(const Range&, Body&, const tbb::auto_partitioner&) { }

template<typename Range, typename Body>
void parallel_deterministic_reduce(const Range&, Body&, tbb::affinity_partitioner&) { }

template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , const tbb::auto_partitioner&) {
    return identity;
}

template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , tbb::affinity_partitioner&) {
    return identity;
}

}
| 323 | |
| 324 | struct Body { |
| 325 | float value; |
| 326 | Body() : value(0) {} |
| 327 | Body(Body&, tbb::split) { value = 0; } |
| 328 | void operator()(const tbb::blocked_range<int>&) {} |
| 329 | void join(Body&) {} |
| 330 | }; |
| 331 | |
//! Check that other types of partitioners are not supported (auto, affinity)
//! In the case of "unsupported" API unexpectedly sneaking into namespace tbb,
//! this test should result in a compilation error due to overload resolution ambiguity
static void TestUnsupportedPartitioners() {
    // Pull in both namespaces; each call below must resolve uniquely to the
    // stub in namespace unsupported for the file to compile.
    using namespace tbb;
    using namespace unsupported;
    Body body;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, tbb::auto_partitioner());

    tbb::affinity_partitioner ap;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, ap);

#if __TBB_CPP11_LAMBDAS_PRESENT
    // Same checks for the lambda (functional) form of the algorithm.
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        tbb::auto_partitioner()
    );
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        ap
    );
#endif /* LAMBDAS */
}
| 369 | |
//! Run the order-sensitive RotOp reduction repeatedly with the given
//! partitioner and verify every run yields a bitwise-identical result.
template <class Partitioner>
void TestDeterministicReductionFor() {
    const int N = 1000;
    const tbb::blocked_range<int> range(0, N);
    typedef ReduceBody<RotOp> BodyType;
    // Reference result from the first run; all later runs must reproduce it exactly.
    BodyType::result_type R1 =
        parallel_deterministic_reduce_invoker<BodyType, Partitioner>::run(range);
    for ( int i=0; i<100; ++i ) {
        BodyType::result_type R2 =
            parallel_deterministic_reduce_invoker<BodyType, Partitioner>::run(range);
        ASSERT( R1 == R2, "parallel_deterministic_reduce behaves differently from run to run" );
#if __TBB_CPP11_LAMBDAS_PRESENT
        // The lambda form must also reproduce the same deterministic value.
        typedef RotOp::Type Type;
        Type R3 = parallel_deterministic_reduce_lambda_invoker<Type, Partitioner>::run(
            range,
            [](const tbb::blocked_range<int>& br, Type value) -> Type {
                Harness::ConcurrencyTracker ct;
                for ( int ii = br.begin(); ii != br.end(); ++ii ) {
                    RotOp op;
                    value = op(value, ii);
                }
                return value;
            },
            [](const Type& v1, const Type& v2) -> Type {
                RotOp op;
                return op.join(v1,v2);
            }
        );
        ASSERT( R1 == R3, "lambda-based parallel_deterministic_reduce behaves differently from run to run" );
#endif /* LAMBDAS */
    }
}
| 402 | |
//! Run the determinism check with every partitioner that
//! parallel_deterministic_reduce supports, plus the default form.
void TestDeterministicReduction () {
    TestDeterministicReductionFor<tbb::simple_partitioner>();
    TestDeterministicReductionFor<tbb::static_partitioner>();
    TestDeterministicReductionFor<harness_default_partitioner>();
    // Only a warning: lack of observed parallelism may be a machine artifact.
    ASSERT_WARNING((Harness::ConcurrencyTracker::PeakParallelism() > 1), "no parallel execution\n" );
}
| 409 | |
| 410 | #include "tbb/task_scheduler_init.h" |
| 411 | #include "harness_cpu.h" |
| 412 | #include "test_partitioner.h" |
| 413 | |
namespace interaction_with_range_and_partitioner {

// Test checks compatibility of parallel_reduce algorithm with various range implementations

void test() {
    using namespace test_partitioner_utils::interaction_with_range_and_partitioner;

    test_partitioner_utils::SimpleReduceBody body;
    tbb::affinity_partitioner ap;

    // The two boolean flags select which splitting constructor the range
    // asserts is used: (assert_in_split, assert_in_proportional_split).
    // affinity_partitioner may split proportionally, so plain split must
    // not be asserted against for ranges that support proportions.
    parallel_reduce(Range1(/*assert_in_split*/ true, /*assert_in_proportional_split*/ false), body, ap);
    parallel_reduce(Range2(true, false), body, ap);
    parallel_reduce(Range3(true, false), body, ap);
    parallel_reduce(Range4(false, true), body, ap);
    parallel_reduce(Range5(false, true), body, ap);
    parallel_reduce(Range6(false, true), body, ap);

    // static_partitioner: same expectations as affinity_partitioner.
    parallel_reduce(Range1(/*assert_in_split*/ true, /*assert_in_proportional_split*/ false),
                    body, tbb::static_partitioner());
    parallel_reduce(Range2(true, false), body, tbb::static_partitioner());
    parallel_reduce(Range3(true, false), body, tbb::static_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::static_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::static_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::static_partitioner());

    // simple_partitioner never splits proportionally.
    parallel_reduce(Range1(/*assert_in_split*/ false, /*assert_in_proportional_split*/ true),
                    body, tbb::simple_partitioner());
    parallel_reduce(Range2(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range3(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::simple_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::simple_partitioner());

    // auto_partitioner never splits proportionally.
    parallel_reduce(Range1(/*assert_in_split*/ false, /*assert_in_proportional_split*/ true),
                    body, tbb::auto_partitioner());
    parallel_reduce(Range2(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range3(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range4(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range5(false, true), body, tbb::auto_partitioner());
    parallel_reduce(Range6(false, true), body, tbb::auto_partitioner());

    // Deterministic reduction supports only static and simple partitioners.
    parallel_deterministic_reduce(Range1(/*assert_in_split*/true, /*assert_in_proportional_split*/ false),
                                  body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range2(true, false), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range3(true, false), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range4(false, true), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range5(false, true), body, tbb::static_partitioner());
    parallel_deterministic_reduce(Range6(false, true), body, tbb::static_partitioner());

    parallel_deterministic_reduce(Range1(/*assert_in_split*/false, /*assert_in_proportional_split*/ true),
                                  body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range2(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range3(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range4(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range5(false, true), body, tbb::simple_partitioner());
    parallel_deterministic_reduce(Range6(false, true), body, tbb::simple_partitioner());
}

} // interaction_with_range_and_partitioner
| 473 | |
| 474 | int TestMain () { |
| 475 | TestUnsupportedPartitioners(); |
| 476 | if( MinThread<0 ) { |
| 477 | REPORT("Usage: nthread must be positive\n" ); |
| 478 | exit(1); |
| 479 | } |
| 480 | for( int p=MinThread; p<=MaxThread; ++p ) { |
| 481 | tbb::task_scheduler_init init( p ); |
| 482 | Flog(p); |
| 483 | ParallelSum(); |
| 484 | if ( p>=2 ) |
| 485 | TestDeterministicReduction(); |
| 486 | // Test that all workers sleep when no work |
| 487 | TestCPUUserTime(p); |
| 488 | } |
| 489 | interaction_with_range_and_partitioner::test(); |
| 490 | return Harness::Done; |
| 491 | } |
| 492 | |