1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #include "tbb/parallel_scan.h" |
18 | #include "tbb/blocked_range.h" |
19 | #include "harness_assert.h" |
20 | #include <vector> |
21 | |
//! Iteration space used by every scan in this test: a blocked range of long indices.
typedef tbb::blocked_range<long> Range;

//! True only while a parallel_scan is in progress.
/** The test drivers set it immediately before calling tbb::parallel_scan and clear it right
    after the call returns; scan and join callbacks check it (directly or via Snooze) to confirm
    they are only invoked while the scan is running. */
static volatile bool ScanIsRunning = false;
25 | |
//! Sum of the integers 0..i, wrapping around (modulo 2^bits of unsigned) on overflow.
/** Precondition: i >= 0.
    The arithmetic is carried out in unsigned so that the documented wrap-around is well
    defined; the previous signed-int arithmetic made overflow undefined behavior.
    Halving is applied to whichever factor (i or i+1) is even, so no division is needed.
    The final unsigned->int conversion is modular on all mainstream compilers (and by
    definition since C++20). */
inline int TriangularSum( int i ) {
    const unsigned n = static_cast<unsigned>(i);
    const unsigned total = (n & 1u) ? ((n >> 1) + 1u) * n : (n >> 1) * (n + 1u);
    return static_cast<int>(total);
}
30 | |
31 | #include "harness.h" |
32 | |
33 | //! Verify that sum is init plus sum of integers in closed interval [0..finish_index]. |
34 | /** line should be the source line of the caller */ |
35 | void VerifySum( int init, long finish_index, int sum, int line ) { |
36 | int expected = init + TriangularSum(finish_index); |
37 | if (expected != sum) { |
38 | REPORT("line %d: sum[0..%ld] should be = %d, but was computed as %d\n" , |
39 | line, finish_index, expected, sum); |
40 | abort(); |
41 | } |
42 | } |
43 | |
//! Largest problem size; addend/sum vectors and AddendHistory all have this many slots.
const int MAXN = 2000;

//! Bit flags recording how an addend was consumed by the scan.
enum AddendFlag {
    UNUSED=0,        // not yet visited by any scan pass
    USED_NONFINAL=1, // visited by a pre-scan (lookahead) pass
    USED_FINAL=2     // visited by a final-scan pass
};

//! Array recording how each addend was used.
/** 'unsigned char' instead of AddendFlag for sake of compactness.
    Entries are OR-ed with AddendFlag values, so a slot can hold USED_NONFINAL|USED_FINAL. */
static unsigned char AddendHistory[MAXN];

//! Set to 1 for debugging output
#define PRINT_DEBUG 0
58 | |
59 | #include "tbb/atomic.h" |
#if PRINT_DEBUG
#include <stdio.h>
#include "harness_report.h"
//! Source of unique ids for bodies in debug trace output.
tbb::atomic<long> NextBodyId;
#endif /* PRINT_DEBUG */

//! Base class of Accumulator; in debug builds it gives each body a unique id for tracing.
/** Has no members when PRINT_DEBUG is 0. */
struct BodyId {
#if PRINT_DEBUG
    const int id;
    BodyId() : id(NextBodyId++) {}
#endif /* PRINT_DEBUG */
};

//! Number of Storage objects currently alive.
/** Incremented by every Storage constructor and decremented by its destructor.
    TestMain resets it to 0 before each run and asserts it is back to 0 afterwards. */
tbb::atomic<long> NumberOfLiveStorage;
74 | |
//! Asserts that the global ScanIsRunning flag equals scan_should_be_running.
/** Scan/join callbacks call it with true; the test driver calls it with false after the scan.
    Despite its name it does not sleep or yield; it only performs the check. */
static void Snooze( bool scan_should_be_running ) {
    ASSERT( ScanIsRunning==scan_should_be_running, NULL );
}
78 | |
79 | template<typename T> |
80 | struct Storage { |
81 | T my_total; |
82 | Range my_range; |
83 | Storage(T init) : |
84 | my_total(init), my_range(-1, -1, 1) { |
85 | ++NumberOfLiveStorage; |
86 | } |
87 | ~Storage() { |
88 | --NumberOfLiveStorage; |
89 | } |
90 | Storage(const Storage& strg) : |
91 | my_total(strg.my_total), my_range(strg.my_range) { |
92 | ++NumberOfLiveStorage; |
93 | } |
94 | Storage & operator=(const Storage& strg) { |
95 | my_total = strg.my_total; |
96 | my_range = strg.my_range; |
97 | return *this; |
98 | } |
99 | }; |
100 | |
101 | template<typename T> |
102 | void JoinStorages(const Storage<T>& left, Storage<T>& right) { |
103 | Snooze(true); |
104 | ASSERT(ScanIsRunning, NULL); |
105 | ASSERT(left.my_range.end() == right.my_range.begin(), NULL); |
106 | right.my_total += left.my_total; |
107 | right.my_range = Range(left.my_range.begin(), right.my_range.end(), 1); |
108 | ASSERT(ScanIsRunning, NULL); |
109 | Snooze(true); |
110 | ASSERT(ScanIsRunning, NULL); |
111 | } |
112 | |
113 | template<typename T> |
114 | void Scan(const Range & r, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) { |
115 | ASSERT(!is_final || (storage.my_range.begin() == 0 && storage.my_range.end() == r.begin()) || (storage.my_range.empty() && r.begin() == 0), NULL); |
116 | for (long i = r.begin(); i < r.end(); ++i) { |
117 | storage.my_total += addend[i]; |
118 | if (is_final) { |
119 | ASSERT(AddendHistory[i] < USED_FINAL, "addend used 'finally' twice?" ); |
120 | AddendHistory[i] |= USED_FINAL; |
121 | sum[i] = storage.my_total; |
122 | VerifySum(42, i, int(sum[i]), __LINE__); |
123 | } |
124 | else { |
125 | ASSERT(AddendHistory[i] == UNUSED, "addend used too many times" ); |
126 | AddendHistory[i] |= USED_NONFINAL; |
127 | } |
128 | } |
129 | if (storage.my_range.empty()) |
130 | storage.my_range = r; |
131 | else |
132 | storage.my_range = Range(storage.my_range.begin(), r.end(), 1); |
133 | Snooze(true); |
134 | } |
135 | |
136 | template<typename T> |
137 | Storage<T> ScanWithInit(const Range & r, T init, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) { |
138 | if (r.begin() == 0) |
139 | storage.my_total = init; |
140 | Scan(r, is_final, storage, sum, addend); |
141 | return storage; |
142 | } |
143 | |
//! Body-form parallel_scan body that computes prefix sums of my_array into my_sum.
/** Tracks its own lifecycle state (see state_type) and asserts that tbb::parallel_scan only
    invokes each operation in a state where that operation is legal. The running total and
    the index range it covers are kept in storage. */
template<typename T>
class Accumulator: BodyId {
    //! Addends being scanned (read-only).
    const std::vector<T> &my_array;
    //! Output prefix sums; written only during final-scan passes (see Scan).
    std::vector<T> & my_sum;
    //! Running total and the range of indices folded into it.
    Storage<T> storage;
    enum state_type {
        full,   // Accumulator has sufficient information for final scan,
                // i.e. has seen all iterations to its left.
                // It's either the original Accumulator provided by the user
                // or an Accumulator constructed by a splitting constructor *and* subsequently
                // subjected to a reverse_join with a full accumulator.

        partial, // Accumulator has only enough information for pre_scan.
                 // i.e. has not seen all iterations to its left.
                 // It's an Accumulator created by a splitting constructor that
                 // has not yet been subjected to a reverse_join with a full accumulator.

        summary, // Accumulator has summary of iterations processed, but not necessarily
                 // the information required for a final_scan or pre_scan.
                 // It's the result of "assign".

        trash    // Accumulator with possibly no useful information.
                 // It was the source for "assign".

    };
    //! Current lifecycle state; mutable because assign() marks its const source as trash.
    mutable state_type my_state;
    //! Equals this while object is fully constructed, NULL otherwise.
    /** Used to detect premature destruction and accidental bitwise copy. */
    Accumulator* self;
    //! Declared private and (as far as this file shows) never defined, so bodies cannot be copy-assigned.
    Accumulator& operator= (const Accumulator& other);
public:
    //! Creates the user's original body: full state, running total init.
    Accumulator( T init, const std::vector<T> & array, std::vector<T> & sum ) :
        my_array(array), my_sum(sum), storage(init), my_state(full)
    {
        // Set self as last action of constructor, to indicate that object is fully constructed.
        self = this;
    }
#if PRINT_DEBUG
    void print() const {
        REPORT("%d [%ld..%ld)\n", id, storage.my_range.begin(), storage.my_range.end() );
    }
#endif /* PRINT_DEBUG */
    ~Accumulator() {
#if PRINT_DEBUG
        REPORT("%d [%ld..%ld) destroyed\n",id, storage.my_range.begin(), storage.my_range.end() );
#endif /* PRINT_DEBUG */
        // Clear self as first action of destructor, to indicate that object is not fully constructed.
        self = 0;
    }
    //! Splitting constructor: an empty partial body (total 0) sharing a's input and output vectors.
    Accumulator( Accumulator& a, tbb::split ) :
        my_array(a.my_array), my_sum(a.my_sum), storage(0), my_state(partial)
    {
        ASSERT(a.my_state==full || a.my_state==partial, NULL);
#if PRINT_DEBUG
        REPORT("%d forked from %d\n",id,a.id);
#endif /* PRINT_DEBUG */
        Snooze(true);
        // Set self as last action of constructor, to indicate that object is fully constructed.
        self = this;
    }
    //! Processes r as a pre-scan or final scan according to Tag.
    /** A final scan requires the full state; a pre-scan requires the partial state. */
    template<typename Tag>
    void operator()( const Range& r, Tag /*tag*/ ) {
        ASSERT( Tag::is_final_scan() ? my_state==full : my_state==partial, NULL );
#if PRINT_DEBUG
        if(storage.my_range.empty() )
            REPORT("%d computing %s [%ld..%ld)\n",id,Tag::is_final_scan()?"final":"lookahead",r.begin(),r.end() );
        else
            REPORT("%d computing %s [%ld..%ld) [%ld..%ld)\n",id,Tag::is_final_scan()?"final":"lookahead", storage.my_range.begin(), storage.my_range.end(),r.begin(),r.end());
#endif /* PRINT_DEBUG */
        Scan(r, Tag::is_final_scan(), storage, my_sum, my_array);
        ASSERT( self==this, "this Accumulator corrupted or prematurely destroyed" );
    }
    //! Folds left_body (the body for the adjacent range on the left) into this partial body.
    /** Afterwards this body takes on left_body's state, so joining with a full body makes it full. */
    void reverse_join( const Accumulator& left_body) {
#if PRINT_DEBUG
        REPORT("reverse join %d [%ld..%ld) %d [%ld..%ld)\n",
               left_body.id, left_body.storage.my_range.begin(), left_body.storage.my_range.end(),
               id, storage.my_range.begin(), storage.my_range.end());
#endif /* PRINT_DEBUG */
        const Storage<T> & left = left_body.storage;
        Storage<T> & right = storage;
        ASSERT(my_state==partial, NULL );
        ASSERT(left_body.my_state==full || left_body.my_state==partial, NULL );

        JoinStorages(left, right);

        ASSERT(left_body.self==&left_body, NULL );
        my_state = left_body.my_state;
    }
    //! Copies other's total and range into this body; both must be full.
    /** Afterwards this body is in the summary state and other is marked trash. */
    void assign( const Accumulator& other ) {
        ASSERT(other.my_state==full, NULL);
        ASSERT(my_state==full, NULL);
        storage.my_total = other.storage.my_total;
        storage.my_range = other.storage.my_range;
        ASSERT( self==this, NULL );
        ASSERT( other.self==&other, "other Accumulator corrupted or prematurely destroyed" );
        my_state = summary;
        other.my_state = trash;
    }
    //! Returns the running total held by this body.
    T get_total() {
        return storage.my_total;
    }
};
246 | |
247 | #include "tbb/tick_count.h" |
248 | |
249 | template<typename T, typename Scan, typename ReverseJoin> |
250 | T ParallelScanFunctionalInvoker(const Range& range, T idx, const Scan& scan, const ReverseJoin& reverse_join, int mode) { |
251 | switch (mode%3) { |
252 | case 0: |
253 | return tbb::parallel_scan(range, idx, scan, reverse_join); |
254 | break; |
255 | case 1: |
256 | return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::simple_partitioner()); |
257 | break; |
258 | default: |
259 | return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::auto_partitioner()); |
260 | } |
261 | } |
262 | |
263 | template<typename T> |
264 | class ScanBody { |
265 | const std::vector<T> &my_addend; |
266 | std::vector<T> &my_sum; |
267 | const T my_init; |
268 | ScanBody& operator= (const ScanBody&); |
269 | public: |
270 | ScanBody(T init, const std::vector<T> &addend, std::vector<T> &sum) :my_addend(addend), my_sum(sum), my_init(init) {} |
271 | template<typename Tag> |
272 | Storage<T> operator()(const Range& r, Storage<T> storage, Tag) const { |
273 | return ScanWithInit(r, my_init, Tag::is_final_scan(), storage, my_sum, my_addend); |
274 | } |
275 | }; |
276 | |
277 | template<typename T> |
278 | class JoinBody { |
279 | public: |
280 | Storage<T> operator()(const Storage<T>& left, Storage<T>& right) const { |
281 | JoinStorages(left, right); |
282 | return right; |
283 | } |
284 | }; |
285 | |
286 | template<typename T> |
287 | T ParallelScanTemplateFunctor(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) { |
288 | for (long i = 0; i<MAXN; ++i) { |
289 | AddendHistory[i] = UNUSED; |
290 | } |
291 | ScanIsRunning = true; |
292 | ScanBody<T> sb(init, addend, sum); |
293 | JoinBody<T> jb; |
294 | Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), sb, jb, mode); |
295 | ScanIsRunning = false; |
296 | if (range.empty()) |
297 | res.my_total = init; |
298 | return res.my_total; |
299 | } |
300 | |
301 | #if __TBB_CPP11_LAMBDAS_PRESENT |
302 | template<typename T> |
303 | T ParallelScanLambda(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) { |
304 | for (long i = 0; i<MAXN; ++i) { |
305 | AddendHistory[i] = UNUSED; |
306 | } |
307 | ScanIsRunning = true; |
308 | Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), |
309 | [&addend, &sum, init](const Range& r, Storage<T> storage, bool is_final_scan /*tag*/) -> Storage<T> { |
310 | return ScanWithInit(r, init, is_final_scan, storage, sum, addend); |
311 | }, |
312 | [](const Storage<T>& left, Storage<T>& right) -> Storage<T> { |
313 | JoinStorages(left, right); |
314 | return right; |
315 | }, |
316 | mode); |
317 | ScanIsRunning = false; |
318 | if (range.empty()) |
319 | res.my_total = init; |
320 | return res.my_total; |
321 | } |
322 | |
323 | #if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT |
324 | template<typename T> |
325 | T ParallelScanGenericLambda(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) { |
326 | for (long i = 0; i<MAXN; ++i) { |
327 | AddendHistory[i] = UNUSED; |
328 | } |
329 | ScanIsRunning = true; |
330 | Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), |
331 | [&addend, &sum, init](const Range& rng, Storage<T> storage, auto scan_tag) { |
332 | return ScanWithInit(rng, init, scan_tag.is_final_scan(), storage, sum, addend); |
333 | }, |
334 | [](const Storage<T>& left, Storage<T>& right) { |
335 | JoinStorages(left, right); |
336 | return right; |
337 | }, |
338 | mode); |
339 | ScanIsRunning = false; |
340 | if (range.empty()) |
341 | res.my_total = init; |
342 | return res.my_total; |
343 | } |
344 | #endif/* GENERIC_LAMBDAS */ |
345 | #endif/* LAMBDAS */ |
346 | |
347 | void TestAccumulator( int mode, int nthread ) { |
348 | typedef int T; |
349 | std::vector<T> addend(MAXN); |
350 | std::vector<T> sum(MAXN); |
351 | for( long n=0; n<=MAXN; ++n ) { |
352 | for( long i=0; i<MAXN; ++i ) { |
353 | addend[i] = -1; |
354 | sum[i] = -2; |
355 | AddendHistory[i] = UNUSED; |
356 | } |
357 | for( long i=0; i<n; ++i ) |
358 | addend[i] = i; |
359 | |
360 | Accumulator<T> acc( 42, addend, sum ); |
361 | tbb::tick_count t0 = tbb::tick_count::now(); |
362 | #if PRINT_DEBUG |
363 | REPORT("--------- mode=%d range=[0..%ld)\n" ,mode,n); |
364 | #endif /* PRINT_DEBUG */ |
365 | ScanIsRunning = true; |
366 | |
367 | switch (mode) { |
368 | case 0: |
369 | tbb::parallel_scan( Range( 0, n, 1 ), acc ); |
370 | break; |
371 | case 1: |
372 | tbb::parallel_scan( Range( 0, n, 1 ), acc, tbb::simple_partitioner() ); |
373 | break; |
374 | case 2: |
375 | tbb::parallel_scan( Range( 0, n, 1 ), acc, tbb::auto_partitioner() ); |
376 | break; |
377 | } |
378 | |
379 | ScanIsRunning = false; |
380 | #if PRINT_DEBUG |
381 | REPORT("=========\n" ); |
382 | #endif /* PRINT_DEBUG */ |
383 | Snooze(false); |
384 | tbb::tick_count t1 = tbb::tick_count::now(); |
385 | long used_once_count = 0; |
386 | for( long i=0; i<n; ++i ) |
387 | if( !(AddendHistory[i]&USED_FINAL) ) { |
388 | REPORT("failed to use addend[%ld] %s\n" ,i,AddendHistory[i]&USED_NONFINAL?"(but used nonfinal)" :"" ); |
389 | } |
390 | for( long i=0; i<n; ++i ) { |
391 | VerifySum( 42, i, sum[i], __LINE__ ); |
392 | used_once_count += AddendHistory[i]==USED_FINAL; |
393 | } |
394 | if( n ) |
395 | ASSERT( acc.get_total()==sum[n-1], NULL ); |
396 | else |
397 | ASSERT( acc.get_total()==42, NULL ); |
398 | REMARK("time [n=%ld] = %g\tused_once%% = %g\tnthread=%d\n" ,n,(t1-t0).seconds(), n==0 ? 0 : 100.0*used_once_count/n,nthread); |
399 | |
400 | |
401 | std::vector<T> sum_tmplt(MAXN); |
402 | for (long i = 0; i<MAXN; ++i) |
403 | sum_tmplt[i] = -2; |
404 | T total_tmplt = ParallelScanTemplateFunctor(Range(0, n, 1), 42, addend, sum_tmplt, mode); |
405 | |
406 | ASSERT(acc.get_total() == total_tmplt, "Parallel prefix sum with lambda interface is not equal to body interface" ); |
407 | ASSERT(sum == sum_tmplt, "Parallel prefix vector with lambda interface is not equal to body interface" ); |
408 | |
409 | #if __TBB_CPP11_LAMBDAS_PRESENT |
410 | std::vector<T> sum_lambda(MAXN); |
411 | for (long i = 0; i<MAXN; ++i) |
412 | sum_lambda[i] = -2; |
413 | T total_lambda = ParallelScanLambda(Range(0, n, 1), 42, addend, sum_lambda, mode); |
414 | |
415 | ASSERT(acc.get_total() == total_lambda, "Parallel prefix sum with lambda interface is not equal to body interface" ); |
416 | ASSERT(sum == sum_lambda, "Parallel prefix vector with lambda interface is not equal to body interface" ); |
417 | |
418 | #if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT |
419 | std::vector<T> sum_generic_lambda(MAXN); |
420 | for (long i = 0; i<MAXN; ++i) |
421 | sum_generic_lambda[i] = -2; |
422 | T total_generic_lambda = ParallelScanGenericLambda(Range(0, n, 1), 42, addend, sum_generic_lambda, mode); |
423 | |
424 | ASSERT(acc.get_total() == total_generic_lambda, "Parallel prefix sum with lambda (generic) interface is not equal to body interface" ); |
425 | ASSERT(sum == sum_generic_lambda, "Parallel prefix vector with lambda (generic) interface is not equal to body interface" ); |
426 | |
427 | #endif /* GENERIC_LAMBDAS */ |
428 | #endif /* LAMBDAS */ |
429 | } |
430 | } |
431 | |
//! Checks that pre_scan_tag/final_scan_tag report the right is_final_scan() value and convert to the matching bool.
static void TestScanTags() {
    ASSERT( tbb::pre_scan_tag::is_final_scan()==false, NULL );
    ASSERT( tbb::final_scan_tag::is_final_scan()==true, NULL );
    ASSERT( tbb::pre_scan_tag() == false, NULL );
    ASSERT( tbb::final_scan_tag() == true, NULL );
}
438 | |
439 | #include "tbb/task_scheduler_init.h" |
440 | #include "harness_cpu.h" |
441 | |
442 | int TestMain () { |
443 | TestScanTags(); |
444 | for( int p=MinThread; p<=MaxThread; ++p ) { |
445 | for (int mode = 0; mode < 3; mode++) { |
446 | tbb::task_scheduler_init init(p); |
447 | NumberOfLiveStorage = 0; |
448 | TestAccumulator(mode, p); |
449 | // Test that all workers sleep when no work |
450 | TestCPUUserTime(p); |
451 | |
452 | // Checking has to be done late, because when parallel_scan makes copies of |
453 | // the user's "Body", the copies might be destroyed slightly after parallel_scan |
454 | // returns. |
455 | ASSERT( NumberOfLiveStorage==0, NULL ); |
456 | } |
457 | } |
458 | return Harness::Done; |
459 | } |
460 | |