1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef TBB_PREVIEW_AGGREGATOR
18 #define TBB_PREVIEW_AGGREGATOR 1
19#endif
20
21#include "tbb/aggregator.h"
22#include "harness.h"
23#include <queue>
24
// Max-heap of ints shared by all tests (default container and comparator:
// std::vector<int> and std::less<int>).
typedef std::priority_queue<int> pq_t;

// Number of pushes, and then pops, each worker thread performs; set in TestMain.
int N = 0;
// Per-element pop counters, indexed by the value taken off the queue;
// the array itself is allocated in TestMain.
int* shared_data = NULL;
29
30// Code for testing basic interface using function objects
31class push_fnobj : NoAssign, Harness::NoAfterlife {
32 pq_t& pq;
33 int threadID;
34public:
35 push_fnobj(pq_t& pq_, int tid) : pq(pq_), threadID(tid) {}
36 void operator()() const {
37 AssertLive();
38 pq.push(threadID);
39 }
40};
41
42class pop_fnobj : NoAssign, Harness::NoAfterlife {
43 pq_t& pq;
44public:
45 pop_fnobj(pq_t& pq_) : pq(pq_) {}
46 void operator()() const {
47 AssertLive();
48 ASSERT(!pq.empty(), "queue should not be empty yet");
49 int elem = pq.top();
50 pq.pop();
51 shared_data[elem]++;
52 }
53};
54
55class BasicBody : NoAssign {
56 pq_t& pq;
57 tbb::aggregator& agg;
58public:
59 BasicBody(pq_t& pq_, tbb::aggregator& agg_) : pq(pq_), agg(agg_) {}
60 void operator()(const int threadID) const {
61 for (int i=0; i<N; ++i) agg.execute( push_fnobj(pq, threadID) );
62 for (int i=0; i<N; ++i) agg.execute( pop_fnobj(pq) );
63 }
64};
65
66void TestBasicInterface(int nThreads) {
67 pq_t my_pq;
68 tbb::aggregator agg;
69 for (int i=0; i<MaxThread; ++i) shared_data[i] = 0;
70 REMARK("Testing aggregator basic interface.\n");
71 NativeParallelFor(nThreads, BasicBody(my_pq, agg));
72 for (int i=0; i<nThreads; ++i)
73 ASSERT(shared_data[i] == N, "wrong number of elements pushed");
74 REMARK("Done testing aggregator basic interface.\n");
75}
76// End of code for testing basic interface using function objects
77
78
79// Code for testing basic interface using lambda expressions
80#if __TBB_CPP11_LAMBDAS_PRESENT
81void TestBasicLambdaInterface(int nThreads) {
82 pq_t my_pq;
83 tbb::aggregator agg;
84 for (int i=0; i<MaxThread; ++i) shared_data[i] = 0;
85 REMARK("Testing aggregator basic lambda interface.\n");
86 NativeParallelFor(nThreads, [&agg, &my_pq](const int threadID) {
87 for (int i=0; i<N; ++i)
88 agg.execute( [&, threadID]() { my_pq.push(threadID); } );
89 for (int i=0; i<N; ++i) {
90 agg.execute( [&]() {
91 ASSERT(!my_pq.empty(), "queue should not be empty yet");
92 int elem = my_pq.top();
93 my_pq.pop();
94 shared_data[elem]++;
95 } );
96 }
97 } );
98 for (int i=0; i<nThreads; ++i)
99 ASSERT(shared_data[i] == N, "wrong number of elements pushed");
100 REMARK("Done testing aggregator basic lambda interface.\n");
101}
102#endif /* __TBB_CPP11_LAMBDAS_PRESENT */
103// End of code for testing basic interface using lambda expressions
104
105// Code for testing expert interface
106class op_data : public tbb::aggregator_operation, NoAssign {
107public:
108 const int tid;
109 op_data(const int tid_=-1) : tbb::aggregator_operation(), tid(tid_) {}
110};
111
112class my_handler {
113 pq_t *pq;
114public:
115 my_handler() {}
116 my_handler(pq_t *pq_) : pq(pq_) {}
117 void operator()(tbb::aggregator_operation* op_list) const {
118 while (op_list) {
119 op_data& request = static_cast<op_data&>(*op_list);
120 op_list = op_list->next();
121 request.start();
122 if (request.tid >= 0) pq->push(request.tid);
123 else {
124 ASSERT(!pq->empty(), "queue should not be empty!");
125 int elem = pq->top();
126 pq->pop();
127 shared_data[elem]++;
128 }
129 request.finish();
130 }
131 }
132};
133
134class ExpertBody : NoAssign {
135 pq_t& pq;
136 tbb::aggregator_ext<my_handler>& agg;
137public:
138 ExpertBody(pq_t& pq_, tbb::aggregator_ext<my_handler>& agg_) : pq(pq_), agg(agg_) {}
139 void operator()(const int threadID) const {
140 for (int i=0; i<N; ++i) {
141 op_data to_push(threadID);
142 agg.process( &to_push );
143 }
144 for (int i=0; i<N; ++i) {
145 op_data to_pop;
146 agg.process( &to_pop );
147 }
148 }
149};
150
151void TestExpertInterface(int nThreads) {
152 pq_t my_pq;
153 tbb::aggregator_ext<my_handler> agg((my_handler(&my_pq)));
154 for (int i=0; i<MaxThread; ++i) shared_data[i] = 0;
155 REMARK("Testing aggregator expert interface.\n");
156 NativeParallelFor(nThreads, ExpertBody(my_pq, agg));
157 for (int i=0; i<nThreads; ++i)
158 ASSERT(shared_data[i] == N, "wrong number of elements pushed");
159 REMARK("Done testing aggregator expert interface.\n");
160}
161// End of code for testing expert interface
162
163int TestMain() {
164 if (MinThread < 1)
165 MinThread = 1;
166 shared_data = new int[MaxThread];
167 for (int p = MinThread; p <= MaxThread; ++p) {
168 REMARK("Testing on %d threads.\n", p);
169 N = 0;
170 while (N <= 100) {
171 REMARK("Testing with N=%d\n", N);
172 TestBasicInterface(p);
173#if __TBB_CPP11_LAMBDAS_PRESENT
174 TestBasicLambdaInterface(p);
175#endif
176 TestExpertInterface(p);
177 N = N ? N*10 : 1;
178 }
179 }
180 return Harness::Done;
181}
182