| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #ifndef TBB_PREVIEW_AGGREGATOR |
| 18 | #define TBB_PREVIEW_AGGREGATOR 1 |
| 19 | #endif |
| 20 | |
| 21 | #include "tbb/aggregator.h" |
| 22 | #include "harness.h" |
| 23 | #include <queue> |
| 24 | |
// Max-heap of ints shared by every test in this file. The underlying container
// (std::vector<int>) and comparator (std::less<int>) are the library defaults.
typedef std::priority_queue<int> pq_t;
| 26 | |
// Number of push operations (and, separately, pop operations) each thread
// issues per test; set by TestMain before every round.
int N = 0;
// Per-thread-id counters incremented when an id is popped from the queue.
// Allocated in TestMain; indexed by the popped element.
int* shared_data = 0;
| 29 | |
| 30 | // Code for testing basic interface using function objects |
| 31 | class push_fnobj : NoAssign, Harness::NoAfterlife { |
| 32 | pq_t& pq; |
| 33 | int threadID; |
| 34 | public: |
| 35 | push_fnobj(pq_t& pq_, int tid) : pq(pq_), threadID(tid) {} |
| 36 | void operator()() const { |
| 37 | AssertLive(); |
| 38 | pq.push(threadID); |
| 39 | } |
| 40 | }; |
| 41 | |
| 42 | class pop_fnobj : NoAssign, Harness::NoAfterlife { |
| 43 | pq_t& pq; |
| 44 | public: |
| 45 | pop_fnobj(pq_t& pq_) : pq(pq_) {} |
| 46 | void operator()() const { |
| 47 | AssertLive(); |
| 48 | ASSERT(!pq.empty(), "queue should not be empty yet" ); |
| 49 | int elem = pq.top(); |
| 50 | pq.pop(); |
| 51 | shared_data[elem]++; |
| 52 | } |
| 53 | }; |
| 54 | |
| 55 | class BasicBody : NoAssign { |
| 56 | pq_t& pq; |
| 57 | tbb::aggregator& agg; |
| 58 | public: |
| 59 | BasicBody(pq_t& pq_, tbb::aggregator& agg_) : pq(pq_), agg(agg_) {} |
| 60 | void operator()(const int threadID) const { |
| 61 | for (int i=0; i<N; ++i) agg.execute( push_fnobj(pq, threadID) ); |
| 62 | for (int i=0; i<N; ++i) agg.execute( pop_fnobj(pq) ); |
| 63 | } |
| 64 | }; |
| 65 | |
| 66 | void TestBasicInterface(int nThreads) { |
| 67 | pq_t my_pq; |
| 68 | tbb::aggregator agg; |
| 69 | for (int i=0; i<MaxThread; ++i) shared_data[i] = 0; |
| 70 | REMARK("Testing aggregator basic interface.\n" ); |
| 71 | NativeParallelFor(nThreads, BasicBody(my_pq, agg)); |
| 72 | for (int i=0; i<nThreads; ++i) |
| 73 | ASSERT(shared_data[i] == N, "wrong number of elements pushed" ); |
| 74 | REMARK("Done testing aggregator basic interface.\n" ); |
| 75 | } |
| 76 | // End of code for testing basic interface using function objects |
| 77 | |
| 78 | |
| 79 | // Code for testing basic interface using lambda expressions |
| 80 | #if __TBB_CPP11_LAMBDAS_PRESENT |
| 81 | void TestBasicLambdaInterface(int nThreads) { |
| 82 | pq_t my_pq; |
| 83 | tbb::aggregator agg; |
| 84 | for (int i=0; i<MaxThread; ++i) shared_data[i] = 0; |
| 85 | REMARK("Testing aggregator basic lambda interface.\n" ); |
| 86 | NativeParallelFor(nThreads, [&agg, &my_pq](const int threadID) { |
| 87 | for (int i=0; i<N; ++i) |
| 88 | agg.execute( [&, threadID]() { my_pq.push(threadID); } ); |
| 89 | for (int i=0; i<N; ++i) { |
| 90 | agg.execute( [&]() { |
| 91 | ASSERT(!my_pq.empty(), "queue should not be empty yet" ); |
| 92 | int elem = my_pq.top(); |
| 93 | my_pq.pop(); |
| 94 | shared_data[elem]++; |
| 95 | } ); |
| 96 | } |
| 97 | } ); |
| 98 | for (int i=0; i<nThreads; ++i) |
| 99 | ASSERT(shared_data[i] == N, "wrong number of elements pushed" ); |
| 100 | REMARK("Done testing aggregator basic lambda interface.\n" ); |
| 101 | } |
| 102 | #endif /* __TBB_CPP11_LAMBDAS_PRESENT */ |
| 103 | // End of code for testing basic interface using lambda expressions |
| 104 | |
| 105 | // Code for testing expert interface |
| 106 | class op_data : public tbb::aggregator_operation, NoAssign { |
| 107 | public: |
| 108 | const int tid; |
| 109 | op_data(const int tid_=-1) : tbb::aggregator_operation(), tid(tid_) {} |
| 110 | }; |
| 111 | |
| 112 | class my_handler { |
| 113 | pq_t *pq; |
| 114 | public: |
| 115 | my_handler() {} |
| 116 | my_handler(pq_t *pq_) : pq(pq_) {} |
| 117 | void operator()(tbb::aggregator_operation* op_list) const { |
| 118 | while (op_list) { |
| 119 | op_data& request = static_cast<op_data&>(*op_list); |
| 120 | op_list = op_list->next(); |
| 121 | request.start(); |
| 122 | if (request.tid >= 0) pq->push(request.tid); |
| 123 | else { |
| 124 | ASSERT(!pq->empty(), "queue should not be empty!" ); |
| 125 | int elem = pq->top(); |
| 126 | pq->pop(); |
| 127 | shared_data[elem]++; |
| 128 | } |
| 129 | request.finish(); |
| 130 | } |
| 131 | } |
| 132 | }; |
| 133 | |
| 134 | class ExpertBody : NoAssign { |
| 135 | pq_t& pq; |
| 136 | tbb::aggregator_ext<my_handler>& agg; |
| 137 | public: |
| 138 | ExpertBody(pq_t& pq_, tbb::aggregator_ext<my_handler>& agg_) : pq(pq_), agg(agg_) {} |
| 139 | void operator()(const int threadID) const { |
| 140 | for (int i=0; i<N; ++i) { |
| 141 | op_data to_push(threadID); |
| 142 | agg.process( &to_push ); |
| 143 | } |
| 144 | for (int i=0; i<N; ++i) { |
| 145 | op_data to_pop; |
| 146 | agg.process( &to_pop ); |
| 147 | } |
| 148 | } |
| 149 | }; |
| 150 | |
| 151 | void TestExpertInterface(int nThreads) { |
| 152 | pq_t my_pq; |
| 153 | tbb::aggregator_ext<my_handler> agg((my_handler(&my_pq))); |
| 154 | for (int i=0; i<MaxThread; ++i) shared_data[i] = 0; |
| 155 | REMARK("Testing aggregator expert interface.\n" ); |
| 156 | NativeParallelFor(nThreads, ExpertBody(my_pq, agg)); |
| 157 | for (int i=0; i<nThreads; ++i) |
| 158 | ASSERT(shared_data[i] == N, "wrong number of elements pushed" ); |
| 159 | REMARK("Done testing aggregator expert interface.\n" ); |
| 160 | } |
| 161 | // End of code for testing expert interface |
| 162 | |
| 163 | int TestMain() { |
| 164 | if (MinThread < 1) |
| 165 | MinThread = 1; |
| 166 | shared_data = new int[MaxThread]; |
| 167 | for (int p = MinThread; p <= MaxThread; ++p) { |
| 168 | REMARK("Testing on %d threads.\n" , p); |
| 169 | N = 0; |
| 170 | while (N <= 100) { |
| 171 | REMARK("Testing with N=%d\n" , N); |
| 172 | TestBasicInterface(p); |
| 173 | #if __TBB_CPP11_LAMBDAS_PRESENT |
| 174 | TestBasicLambdaInterface(p); |
| 175 | #endif |
| 176 | TestExpertInterface(p); |
| 177 | N = N ? N*10 : 1; |
| 178 | } |
| 179 | } |
| 180 | return Harness::Done; |
| 181 | } |
| 182 | |