1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "tbb/parallel_scan.h"
18#include "tbb/blocked_range.h"
19#include "harness_assert.h"
20#include <vector>
21
22typedef tbb::blocked_range<long> Range;
23
24static volatile bool ScanIsRunning = false;
25
//! Sum of 0..i with wrap around on overflow.
/** Computes i*(i+1)/2 without the division: whichever of i, i+1 is even is halved first.
    The multiplication is carried out in unsigned arithmetic so that overflow wraps modulo 2^N
    (well defined) instead of being signed-overflow undefined behaviour. Computing the halved
    factor with signed shifts first keeps results for negative i identical to plain signed math. */
inline int TriangularSum( int i ) {
    const unsigned product = (i & 1)
        ? unsigned((i >> 1) + 1) * unsigned(i)   // i odd: (i+1) is even, halve it
        : unsigned(i >> 1) * unsigned(i + 1);    // i even: halve i itself
    // Converting back to int: modular on all mainstream compilers (guaranteed since C++20).
    return int(product);
}
30
31#include "harness.h"
32
33//! Verify that sum is init plus sum of integers in closed interval [0..finish_index].
34/** line should be the source line of the caller */
35void VerifySum( int init, long finish_index, int sum, int line ) {
36 int expected = init + TriangularSum(finish_index);
37 if (expected != sum) {
38 REPORT("line %d: sum[0..%ld] should be = %d, but was computed as %d\n",
39 line, finish_index, expected, sum);
40 abort();
41 }
42}
43
//! Size of the addend/sum arrays; TestAccumulator scans every prefix [0..n) with n <= MAXN.
const int MAXN = 2000;

//! Bit flags OR-ed into AddendHistory as the scan passes consume an addend.
enum AddendFlag {
    UNUSED        = 0,  //!< not consumed by any pass yet
    USED_NONFINAL = 1,  //!< consumed by a pre-scan (lookahead) pass
    USED_FINAL    = 2   //!< consumed by the final scan pass
};

//! Per-addend record of AddendFlag bits describing how each addend was used.
/** Stored as 'unsigned char' rather than AddendFlag to keep the array compact. */
static unsigned char AddendHistory[MAXN];
55
//! Set to 1 for debugging output
#define PRINT_DEBUG 0

// tbb::atomic is needed unconditionally: NumberOfLiveStorage is a tbb::atomic<long>.
#include "tbb/atomic.h"
#if PRINT_DEBUG
#include <stdio.h>
#include "harness_report.h"
// Source of the per-object ids assigned by BodyId when debugging output is enabled.
tbb::atomic<long> NextBodyId;
#endif /* PRINT_DEBUG */
65
//! Base class of Accumulator that supplies an identifier for trace output.
/** Empty unless PRINT_DEBUG is non-zero; then every instance takes the next value of NextBodyId. */
struct BodyId {
#if PRINT_DEBUG
    const int id;
    BodyId() : id(int(NextBodyId++)) {}
#endif /* PRINT_DEBUG */
};
72
//! Number of Storage objects currently alive.
/** Storage's constructors increment it and its destructor decrements it. TestMain resets it to 0
    before each run and asserts it is 0 again afterwards, i.e. every Storage copy made during the
    scan was destroyed. No explicit initializer: relies on static zero-initialization. */
tbb::atomic<long> NumberOfLiveStorage;
74
//! Check that ScanIsRunning has the value the caller expects at this point.
/** Despite its name, this function does not sleep; it only performs the ASSERT below. */
static void Snooze( bool scan_should_be_running ) {
    ASSERT( ScanIsRunning==scan_should_be_running, NULL );
}
78
79template<typename T>
80struct Storage {
81 T my_total;
82 Range my_range;
83 Storage(T init) :
84 my_total(init), my_range(-1, -1, 1) {
85 ++NumberOfLiveStorage;
86 }
87 ~Storage() {
88 --NumberOfLiveStorage;
89 }
90 Storage(const Storage& strg) :
91 my_total(strg.my_total), my_range(strg.my_range) {
92 ++NumberOfLiveStorage;
93 }
94 Storage & operator=(const Storage& strg) {
95 my_total = strg.my_total;
96 my_range = strg.my_range;
97 return *this;
98 }
99};
100
101template<typename T>
102void JoinStorages(const Storage<T>& left, Storage<T>& right) {
103 Snooze(true);
104 ASSERT(ScanIsRunning, NULL);
105 ASSERT(left.my_range.end() == right.my_range.begin(), NULL);
106 right.my_total += left.my_total;
107 right.my_range = Range(left.my_range.begin(), right.my_range.end(), 1);
108 ASSERT(ScanIsRunning, NULL);
109 Snooze(true);
110 ASSERT(ScanIsRunning, NULL);
111}
112
113template<typename T>
114void Scan(const Range & r, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) {
115 ASSERT(!is_final || (storage.my_range.begin() == 0 && storage.my_range.end() == r.begin()) || (storage.my_range.empty() && r.begin() == 0), NULL);
116 for (long i = r.begin(); i < r.end(); ++i) {
117 storage.my_total += addend[i];
118 if (is_final) {
119 ASSERT(AddendHistory[i] < USED_FINAL, "addend used 'finally' twice?");
120 AddendHistory[i] |= USED_FINAL;
121 sum[i] = storage.my_total;
122 VerifySum(42, i, int(sum[i]), __LINE__);
123 }
124 else {
125 ASSERT(AddendHistory[i] == UNUSED, "addend used too many times");
126 AddendHistory[i] |= USED_NONFINAL;
127 }
128 }
129 if (storage.my_range.empty())
130 storage.my_range = r;
131 else
132 storage.my_range = Range(storage.my_range.begin(), r.end(), 1);
133 Snooze(true);
134}
135
136template<typename T>
137Storage<T> ScanWithInit(const Range & r, T init, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) {
138 if (r.begin() == 0)
139 storage.my_total = init;
140 Scan(r, is_final, storage, sum, addend);
141 return storage;
142}
143
//! Imperative-form body for tbb::parallel_scan(range, body[, partitioner]), used by TestAccumulator.
/** Each instance tracks a protocol state (see state_type), and every operation ASSERTs that it is
    invoked only in a state where it is legal, so misuse of the body protocol is caught. */
template<typename T>
class Accumulator: BodyId {
    //! Addends to be summed; indexed by the range positions passed to operator().
    const std::vector<T> &my_array;
    //! Output prefix sums; written by Scan only during final-scan passes.
    std::vector<T> & my_sum;
    //! Running total and the range of indices it covers.
    Storage<T> storage;
    enum state_type {
        full, // Accumulator has sufficient information for final scan,
              // i.e. has seen all iterations to its left.
              // It's either the original Accumulator provided by the user
              // or an Accumulator constructed by a splitting constructor *and* subsequently
              // subjected to a reverse_join with a full accumulator.

        partial, // Accumulator has only enough information for pre_scan.
                 // i.e. has not seen all iterations to its left.
                 // It's an Accumulator created by a splitting constructor that
                 // has not yet been subjected to a reverse_join with a full accumulator.

        summary, // Accumulator has summary of iterations processed, but not necessarily
                 // the information required for a final_scan or pre_scan.
                 // It's the result of "assign".

        trash // Accumulator with possibly no useful information.
              // It was the source for "assign".

    };
    //! Current protocol state; mutable because assign() marks its (const) source as trash.
    mutable state_type my_state;
    //! Equals this while object is fully constructed, NULL otherwise.
    /** Used to detect premature destruction and accidental bitwise copy. */
    Accumulator* self;
    //! Declared private and not defined in this file: Accumulator objects are not assignable.
    Accumulator& operator= (const Accumulator& other);
public:
    //! Root body: starts in state full with running total init.
    Accumulator( T init, const std::vector<T> & array, std::vector<T> & sum ) :
        my_array(array), my_sum(sum), storage(init), my_state(full)
    {
        // Set self as last action of constructor, to indicate that object is fully constructed.
        self = this;
    }
#if PRINT_DEBUG
    void print() const {
        REPORT("%d [%ld..%ld)\n", id, storage.my_range.begin(), storage.my_range.end() );
    }
#endif /* PRINT_DEBUG */
    ~Accumulator() {
#if PRINT_DEBUG
        REPORT("%d [%ld..%ld) destroyed\n",id, storage.my_range.begin(), storage.my_range.end() );
#endif /* PRINT_DEBUG */
        // Clear self as first action of destructor, to indicate that object is not fully constructed.
        self = 0;
    }
    //! Splitting constructor: the new body starts partial with total 0, so it may only pre-scan
    //! until a reverse_join with a full body to its left supplies the missing prefix.
    Accumulator( Accumulator& a, tbb::split ) :
        my_array(a.my_array), my_sum(a.my_sum), storage(0), my_state(partial)
    {
        ASSERT(a.my_state==full || a.my_state==partial, NULL);
#if PRINT_DEBUG
        REPORT("%d forked from %d\n",id,a.id);
#endif /* PRINT_DEBUG */
        Snooze(true);
        // Set self as last action of constructor, to indicate that object is fully constructed.
        self = this;
    }
    //! Scan pass over r: a final pass requires state full, a pre-scan (lookahead) requires partial.
    template<typename Tag>
    void operator()( const Range& r, Tag /*tag*/ ) {
        ASSERT( Tag::is_final_scan() ? my_state==full : my_state==partial, NULL );
#if PRINT_DEBUG
        if(storage.my_range.empty() )
            REPORT("%d computing %s [%ld..%ld)\n",id,Tag::is_final_scan()?"final":"lookahead",r.begin(),r.end() );
        else
            REPORT("%d computing %s [%ld..%ld) [%ld..%ld)\n",id,Tag::is_final_scan()?"final":"lookahead", storage.my_range.begin(), storage.my_range.end(),r.begin(),r.end());
#endif /* PRINT_DEBUG */
        Scan(r, Tag::is_final_scan(), storage, my_sum, my_array);
        ASSERT( self==this, "this Accumulator corrupted or prematurely destroyed" );
    }
    //! Fold left_body's summary into this (right-hand, partial) body.
    //! This body then takes over left_body's state, so it becomes full only if left_body was full.
    void reverse_join( const Accumulator& left_body) {
#if PRINT_DEBUG
        REPORT("reverse join %d [%ld..%ld) %d [%ld..%ld)\n",
               left_body.id, left_body.storage.my_range.begin(), left_body.storage.my_range.end(),
               id, storage.my_range.begin(), storage.my_range.end());
#endif /* PRINT_DEBUG */
        const Storage<T> & left = left_body.storage;
        Storage<T> & right = storage;
        ASSERT(my_state==partial, NULL );
        ASSERT(left_body.my_state==full || left_body.my_state==partial, NULL );

        JoinStorages(left, right);

        ASSERT(left_body.self==&left_body, NULL );
        my_state = left_body.my_state;
    }
    //! Copy the summary of other into this body (both must be full).
    //! Afterwards this body is in state summary and other is marked trash.
    void assign( const Accumulator& other ) {
        ASSERT(other.my_state==full, NULL);
        ASSERT(my_state==full, NULL);
        storage.my_total = other.storage.my_total;
        storage.my_range = other.storage.my_range;
        ASSERT( self==this, NULL );
        ASSERT( other.self==&other, "other Accumulator corrupted or prematurely destroyed" );
        my_state = summary;
        other.my_state = trash;
    }
    //! Running total accumulated so far (includes the initial value for the root body).
    T get_total() {
        return storage.my_total;
    }
};
246
247#include "tbb/tick_count.h"
248
249template<typename T, typename Scan, typename ReverseJoin>
250T ParallelScanFunctionalInvoker(const Range& range, T idx, const Scan& scan, const ReverseJoin& reverse_join, int mode) {
251 switch (mode%3) {
252 case 0:
253 return tbb::parallel_scan(range, idx, scan, reverse_join);
254 break;
255 case 1:
256 return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::simple_partitioner());
257 break;
258 default:
259 return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::auto_partitioner());
260 }
261}
262
263template<typename T>
264class ScanBody {
265 const std::vector<T> &my_addend;
266 std::vector<T> &my_sum;
267 const T my_init;
268 ScanBody& operator= (const ScanBody&);
269public:
270 ScanBody(T init, const std::vector<T> &addend, std::vector<T> &sum) :my_addend(addend), my_sum(sum), my_init(init) {}
271 template<typename Tag>
272 Storage<T> operator()(const Range& r, Storage<T> storage, Tag) const {
273 return ScanWithInit(r, my_init, Tag::is_final_scan(), storage, my_sum, my_addend);
274 }
275};
276
277template<typename T>
278class JoinBody {
279public:
280 Storage<T> operator()(const Storage<T>& left, Storage<T>& right) const {
281 JoinStorages(left, right);
282 return right;
283 }
284};
285
286template<typename T>
287T ParallelScanTemplateFunctor(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
288 for (long i = 0; i<MAXN; ++i) {
289 AddendHistory[i] = UNUSED;
290 }
291 ScanIsRunning = true;
292 ScanBody<T> sb(init, addend, sum);
293 JoinBody<T> jb;
294 Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), sb, jb, mode);
295 ScanIsRunning = false;
296 if (range.empty())
297 res.my_total = init;
298 return res.my_total;
299}
300
301#if __TBB_CPP11_LAMBDAS_PRESENT
302template<typename T>
303T ParallelScanLambda(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
304 for (long i = 0; i<MAXN; ++i) {
305 AddendHistory[i] = UNUSED;
306 }
307 ScanIsRunning = true;
308 Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0),
309 [&addend, &sum, init](const Range& r, Storage<T> storage, bool is_final_scan /*tag*/) -> Storage<T> {
310 return ScanWithInit(r, init, is_final_scan, storage, sum, addend);
311 },
312 [](const Storage<T>& left, Storage<T>& right) -> Storage<T> {
313 JoinStorages(left, right);
314 return right;
315 },
316 mode);
317 ScanIsRunning = false;
318 if (range.empty())
319 res.my_total = init;
320 return res.my_total;
321}
322
323#if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT
324template<typename T>
325T ParallelScanGenericLambda(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
326 for (long i = 0; i<MAXN; ++i) {
327 AddendHistory[i] = UNUSED;
328 }
329 ScanIsRunning = true;
330 Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0),
331 [&addend, &sum, init](const Range& rng, Storage<T> storage, auto scan_tag) {
332 return ScanWithInit(rng, init, scan_tag.is_final_scan(), storage, sum, addend);
333 },
334 [](const Storage<T>& left, Storage<T>& right) {
335 JoinStorages(left, right);
336 return right;
337 },
338 mode);
339 ScanIsRunning = false;
340 if (range.empty())
341 res.my_total = init;
342 return res.my_total;
343}
344#endif/* GENERIC_LAMBDAS */
345#endif/* LAMBDAS */
346
347void TestAccumulator( int mode, int nthread ) {
348 typedef int T;
349 std::vector<T> addend(MAXN);
350 std::vector<T> sum(MAXN);
351 for( long n=0; n<=MAXN; ++n ) {
352 for( long i=0; i<MAXN; ++i ) {
353 addend[i] = -1;
354 sum[i] = -2;
355 AddendHistory[i] = UNUSED;
356 }
357 for( long i=0; i<n; ++i )
358 addend[i] = i;
359
360 Accumulator<T> acc( 42, addend, sum );
361 tbb::tick_count t0 = tbb::tick_count::now();
362#if PRINT_DEBUG
363 REPORT("--------- mode=%d range=[0..%ld)\n",mode,n);
364#endif /* PRINT_DEBUG */
365 ScanIsRunning = true;
366
367 switch (mode) {
368 case 0:
369 tbb::parallel_scan( Range( 0, n, 1 ), acc );
370 break;
371 case 1:
372 tbb::parallel_scan( Range( 0, n, 1 ), acc, tbb::simple_partitioner() );
373 break;
374 case 2:
375 tbb::parallel_scan( Range( 0, n, 1 ), acc, tbb::auto_partitioner() );
376 break;
377 }
378
379 ScanIsRunning = false;
380#if PRINT_DEBUG
381 REPORT("=========\n");
382#endif /* PRINT_DEBUG */
383 Snooze(false);
384 tbb::tick_count t1 = tbb::tick_count::now();
385 long used_once_count = 0;
386 for( long i=0; i<n; ++i )
387 if( !(AddendHistory[i]&USED_FINAL) ) {
388 REPORT("failed to use addend[%ld] %s\n",i,AddendHistory[i]&USED_NONFINAL?"(but used nonfinal)":"");
389 }
390 for( long i=0; i<n; ++i ) {
391 VerifySum( 42, i, sum[i], __LINE__ );
392 used_once_count += AddendHistory[i]==USED_FINAL;
393 }
394 if( n )
395 ASSERT( acc.get_total()==sum[n-1], NULL );
396 else
397 ASSERT( acc.get_total()==42, NULL );
398 REMARK("time [n=%ld] = %g\tused_once%% = %g\tnthread=%d\n",n,(t1-t0).seconds(), n==0 ? 0 : 100.0*used_once_count/n,nthread);
399
400
401 std::vector<T> sum_tmplt(MAXN);
402 for (long i = 0; i<MAXN; ++i)
403 sum_tmplt[i] = -2;
404 T total_tmplt = ParallelScanTemplateFunctor(Range(0, n, 1), 42, addend, sum_tmplt, mode);
405
406 ASSERT(acc.get_total() == total_tmplt, "Parallel prefix sum with lambda interface is not equal to body interface");
407 ASSERT(sum == sum_tmplt, "Parallel prefix vector with lambda interface is not equal to body interface");
408
409#if __TBB_CPP11_LAMBDAS_PRESENT
410 std::vector<T> sum_lambda(MAXN);
411 for (long i = 0; i<MAXN; ++i)
412 sum_lambda[i] = -2;
413 T total_lambda = ParallelScanLambda(Range(0, n, 1), 42, addend, sum_lambda, mode);
414
415 ASSERT(acc.get_total() == total_lambda, "Parallel prefix sum with lambda interface is not equal to body interface");
416 ASSERT(sum == sum_lambda, "Parallel prefix vector with lambda interface is not equal to body interface");
417
418#if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT
419 std::vector<T> sum_generic_lambda(MAXN);
420 for (long i = 0; i<MAXN; ++i)
421 sum_generic_lambda[i] = -2;
422 T total_generic_lambda = ParallelScanGenericLambda(Range(0, n, 1), 42, addend, sum_generic_lambda, mode);
423
424 ASSERT(acc.get_total() == total_generic_lambda, "Parallel prefix sum with lambda (generic) interface is not equal to body interface");
425 ASSERT(sum == sum_generic_lambda, "Parallel prefix vector with lambda (generic) interface is not equal to body interface");
426
427#endif /* GENERIC_LAMBDAS */
428#endif /* LAMBDAS */
429 }
430}
431
//! Check that pre_scan_tag reports a non-final pass and final_scan_tag a final pass, both through
//! the static is_final_scan() query and when a tag object is compared with a bool
//! (presumably via the tag's conversion to bool).
static void TestScanTags() {
    ASSERT( tbb::pre_scan_tag::is_final_scan()==false, NULL );
    ASSERT( tbb::final_scan_tag::is_final_scan()==true, NULL );
    ASSERT( tbb::pre_scan_tag() == false, NULL );
    ASSERT( tbb::final_scan_tag() == true, NULL );
}
438
439#include "tbb/task_scheduler_init.h"
440#include "harness_cpu.h"
441
442int TestMain () {
443 TestScanTags();
444 for( int p=MinThread; p<=MaxThread; ++p ) {
445 for (int mode = 0; mode < 3; mode++) {
446 tbb::task_scheduler_init init(p);
447 NumberOfLiveStorage = 0;
448 TestAccumulator(mode, p);
449 // Test that all workers sleep when no work
450 TestCPUUserTime(p);
451
452 // Checking has to be done late, because when parallel_scan makes copies of
453 // the user's "Body", the copies might be destroyed slightly after parallel_scan
454 // returns.
455 ASSERT( NumberOfLiveStorage==0, NULL );
456 }
457 }
458 return Harness::Done;
459}
460