1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17/* Example program that computes Fibonacci numbers in different ways.
18 Arguments are: [ Number [Threads [Repeats]]]
19 The defaults are Number=500 Threads=1:4 Repeats=1.
20
21 The point of this program is to check that the library is working properly.
22 Most of the computations are deliberately silly and not expected to
23 show any speedup on multiprocessors.
24*/
25
26// enable assertions
27#ifdef NDEBUG
28#undef NDEBUG
29#endif
30
31#include <cstdio>
32#include <cstdlib>
33#include <cassert>
34#include <utility>
35#include "tbb/task.h"
36#include "tbb/task_scheduler_init.h"
37#include "tbb/tick_count.h"
38#include "tbb/blocked_range.h"
39#include "tbb/concurrent_vector.h"
40#include "tbb/concurrent_queue.h"
41#include "tbb/concurrent_hash_map.h"
42#include "tbb/parallel_while.h"
43#include "tbb/parallel_for.h"
44#include "tbb/parallel_reduce.h"
45#include "tbb/parallel_scan.h"
46#include "tbb/pipeline.h"
47#include "tbb/atomic.h"
48#include "tbb/mutex.h"
49#include "tbb/spin_mutex.h"
50#include "tbb/queuing_mutex.h"
51#include "tbb/tbb_thread.h"
52
53using namespace std;
54using namespace tbb;
55
56//! type used for Fibonacci number computations
57typedef long long value;
58
59//! Matrix 2x2 class
60struct Matrix2x2
61{
62 //! Array of values
63 value v[2][2];
64 Matrix2x2() {}
65 Matrix2x2(value v00, value v01, value v10, value v11) {
66 v[0][0] = v00; v[0][1] = v01; v[1][0] = v10; v[1][1] = v11;
67 }
68 Matrix2x2 operator * (const Matrix2x2 &to) const; //< Multiply two Matrices
69};
70//! Identity matrix
71static const Matrix2x2 MatrixIdentity(1, 0, 0, 1);
72//! Default matrix to multiply
73static const Matrix2x2 Matrix1110(1, 1, 1, 0);
74//! Raw arrays matrices multiply
75void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]);
76
77/////////////////////// Serial methods ////////////////////////
78
79//! Plain serial sum
80value SerialFib(int n)
81{
82 if(n < 2)
83 return n;
84 value a = 0, b = 1, sum; int i;
85 for( i = 2; i <= n; i++ )
86 { // n is really index of Fibonacci number
87 sum = a + b; a = b; b = sum;
88 }
89 return sum;
90}
91//! Serial n-1 matrices multiplication
92value SerialMatrixFib(int n)
93{
94 value c[2][2], a[2][2] = {{1, 1}, {1, 0}}, b[2][2] = {{1, 1}, {1, 0}}; int i;
95 for(i = 2; i < n; i++)
96 { // Using condition to prevent copying of values
97 if(i & 1) Matrix2x2Multiply(a, c, b);
98 else Matrix2x2Multiply(a, b, c);
99 }
100 return (i & 1) ? c[0][0] : b[0][0]; // get result from upper left cell
101}
102//! Recursive summing. Just for complete list of serial algorithms, not used
103value SerialRecursiveFib(int n)
104{
105 value result;
106 if(n < 2)
107 result = n;
108 else
109 result = SerialRecursiveFib(n - 1) + SerialRecursiveFib(n - 2);
110 return result;
111}
112//! Introducing of queue method in serial
113value SerialQueueFib(int n)
114{
115 concurrent_queue<Matrix2x2> Q;
116 for(int i = 1; i < n; i++)
117 Q.push(Matrix1110);
118 Matrix2x2 A, B;
119 while(true) {
120 while( !Q.try_pop(A) ) this_tbb_thread::yield();
121 if(Q.empty()) break;
122 while( !Q.try_pop(B) ) this_tbb_thread::yield();
123 Q.push(A * B);
124 }
125 return A.v[0][0];
126}
127//! Trying to use concurrent_vector
128value SerialVectorFib(int n)
129{
130 concurrent_vector<value> A;
131 A.grow_by(2);
132 A[0] = 0; A[1] = 1;
133 for( int i = 2; i <= n; i++)
134 {
135 A.grow_to_at_least(i+1);
136 A[i] = A[i-1] + A[i-2];
137 }
138 return A[n];
139}
140
141///////////////////// Parallel methods ////////////////////////
142
143// *** Serial shared by mutexes *** //
144
145//! Shared glabals
146value SharedA = 0, SharedB = 1; int SharedI = 1, SharedN;
147
148//! Template task class which computes Fibonacci numbers with shared globals
149template<typename M>
150class SharedSerialFibBody {
151 M &mutex;
152public:
153 SharedSerialFibBody( M &m ) : mutex( m ) {}
154 //! main loop
155 void operator()( const blocked_range<int>& range ) const {
156 for(;;) {
157 typename M::scoped_lock lock( mutex );
158 if(SharedI >= SharedN) break;
159 value sum = SharedA + SharedB;
160 SharedA = SharedB; SharedB = sum;
161 ++SharedI;
162 }
163 }
164};
165
166//! Root function
167template<class M>
168value SharedSerialFib(int n)
169{
170 SharedA = 0; SharedB = 1; SharedI = 1; SharedN = n; M mutex;
171 parallel_for( blocked_range<int>(0,4,1), SharedSerialFibBody<M>( mutex ) );
172 return SharedB;
173}
174
175// *** Serial shared by concurrent hash map *** //
176
177//! Hash comparer
178struct IntHashCompare {
179 bool equal( const int j, const int k ) const { return j == k; }
180 unsigned long hash( const int k ) const { return (unsigned long)k; }
181};
182//! NumbersTable type based on concurrent_hash_map
183typedef concurrent_hash_map<int, value, IntHashCompare> NumbersTable;
184//! task for serial method using shared concurrent_hash_map
185class ConcurrentHashSerialFibTask: public task {
186 NumbersTable &Fib;
187 int my_n;
188public:
189 //! constructor
190 ConcurrentHashSerialFibTask( NumbersTable &cht, int n ) : Fib(cht), my_n(n) { }
191 //! executing task
192 task* execute() /*override*/ {
193 for( int i = 2; i <= my_n; ++i ) { // there is no difference in to recycle or to make loop
194 NumbersTable::const_accessor f1, f2; // same as iterators
195 if( !Fib.find(f1, i-1) || !Fib.find(f2, i-2) ) {
196 // Something is seriously wrong, because i-1 and i-2 must have been inserted
197 // earlier by this thread or another thread.
198 assert(0);
199 }
200 value sum = f1->second + f2->second;
201 NumbersTable::const_accessor fsum;
202 Fib.insert(fsum, make_pair(i, sum)); // inserting
203 assert( fsum->second == sum ); // check value
204 }
205 return 0;
206 }
207};
208
209//! Root function
210value ConcurrentHashSerialFib(int n)
211{
212 NumbersTable Fib;
213 bool okay;
214 okay = Fib.insert( make_pair(0, 0) ); assert(okay); // assign initial values
215 okay = Fib.insert( make_pair(1, 1) ); assert(okay);
216
217 task_list list;
218 // allocate tasks
219 list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n));
220 list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n));
221 task::spawn_root_and_wait(list);
222 NumbersTable::const_accessor fresult;
223 okay = Fib.find( fresult, n );
224 assert(okay);
225 return fresult->second;
226}
227
228// *** Queue with parallel_for and parallel_while *** //
229
230//! Stream of matrices
231struct QueueStream {
232 volatile bool producer_is_done;
233 concurrent_queue<Matrix2x2> Queue;
234 //! Get pair of matricies if present
235 bool pop_if_present( pair<Matrix2x2, Matrix2x2> &mm ) {
236 // get first matrix if present
237 if(!Queue.try_pop(mm.first)) return false;
238 // get second matrix if present
239 if(!Queue.try_pop(mm.second)) {
240 // if not, then push back first matrix
241 Queue.push(mm.first); return false;
242 }
243 return true;
244 }
245};
246
247//! Functor for parallel_for which fills the queue
248struct parallel_forFibBody {
249 QueueStream &my_stream;
250 //! fill functor arguments
251 parallel_forFibBody(QueueStream &s) : my_stream(s) { }
252 //! iterate thorough range
253 void operator()( const blocked_range<int> &range ) const {
254 int i_end = range.end();
255 for( int i = range.begin(); i != i_end; ++i ) {
256 my_stream.Queue.push( Matrix1110 ); // push initial matrix
257 }
258 }
259};
260//! Functor for parallel_while which process the queue
261class parallel_whileFibBody
262{
263 QueueStream &my_stream;
264 parallel_while<parallel_whileFibBody> &my_while;
265public:
266 typedef pair<Matrix2x2, Matrix2x2> argument_type;
267 //! fill functor arguments
268 parallel_whileFibBody(parallel_while<parallel_whileFibBody> &w, QueueStream &s)
269 : my_while(w), my_stream(s) { }
270 //! process pair of matrices
271 void operator() (argument_type mm) const {
272 mm.first = mm.first * mm.second;
273 // note: it can run concurrently with QueueStream::pop_if_present()
274 if(my_stream.Queue.try_pop(mm.second))
275 my_while.add( mm ); // now, two matrices available. Add next iteration.
276 else my_stream.Queue.push( mm.first ); // or push back calculated value if queue is empty
277 }
278};
279
280//! Parallel queue's filling task
281struct QueueInsertTask: public task {
282 QueueStream &my_stream;
283 int my_n;
284 //! fill task arguments
285 QueueInsertTask( int n, QueueStream &s ) : my_n(n), my_stream(s) { }
286 //! executing task
287 task* execute() /*override*/ {
288 // Execute of parallel pushing of n-1 initial matrices
289 parallel_for( blocked_range<int>( 1, my_n, 10 ), parallel_forFibBody(my_stream) );
290 my_stream.producer_is_done = true;
291 return 0;
292 }
293};
294//! Parallel queue's processing task
295struct QueueProcessTask: public task {
296 QueueStream &my_stream;
297 //! fill task argument
298 QueueProcessTask( QueueStream &s ) : my_stream(s) { }
299 //! executing task
300 task* execute() /*override*/ {
301 while( !my_stream.producer_is_done || my_stream.Queue.unsafe_size()>1 ) {
302 parallel_while<parallel_whileFibBody> w; // run while loop in parallel
303 w.run( my_stream, parallel_whileFibBody( w, my_stream ) );
304 }
305 return 0;
306 }
307};
308//! Root function
309value ParallelQueueFib(int n)
310{
311 QueueStream stream;
312 stream.producer_is_done = false;
313 task_list list;
314 list.push_back(*new(task::allocate_root()) QueueInsertTask( n, stream ));
315 list.push_back(*new(task::allocate_root()) QueueProcessTask( stream ));
316 // If there is only a single thread, the first task in the list runs to completion
317 // before the second task in the list starts.
318 task::spawn_root_and_wait(list);
319 assert(stream.Queue.unsafe_size() == 1); // it is easy to lose some work
320 Matrix2x2 M;
321 bool result = stream.Queue.try_pop( M ); // get last matrix
322 assert( result );
323 return M.v[0][0]; // and result number
324}
325
326// *** Queue with pipeline *** //
327
328//! filter to fills queue
329class InputFilter: public filter {
330 tbb::atomic<int> N; //< index of Fibonacci number minus 1
331public:
332 concurrent_queue<Matrix2x2> Queue;
333 //! fill filter arguments
334 InputFilter( int n ) : filter(false /*is not serial*/) { N = n; }
335 //! executing filter
336 void* operator()(void*) /*override*/ {
337 int n = --N;
338 if(n <= 0) return 0;
339 Queue.push( Matrix1110 );
340 return &Queue;
341 }
342};
343//! filter to process queue
344class MultiplyFilter: public filter {
345public:
346 MultiplyFilter( ) : filter(false /*is not serial*/) { }
347 //! executing filter
348 void* operator()(void*p) /*override*/ {
349 concurrent_queue<Matrix2x2> &Queue = *static_cast<concurrent_queue<Matrix2x2> *>(p);
350 Matrix2x2 m1, m2;
351 // get two elements
352 while( !Queue.try_pop( m1 ) ) this_tbb_thread::yield();
353 while( !Queue.try_pop( m2 ) ) this_tbb_thread::yield();
354 m1 = m1 * m2; // process them
355 Queue.push( m1 ); // and push back
356 return this; // just nothing
357 }
358};
359//! Root function
360value ParallelPipeFib(int n)
361{
362 InputFilter input( n-1 );
363 MultiplyFilter process;
364 // Create the pipeline
365 pipeline pipeline;
366 // add filters
367 pipeline.add_filter( input ); // first
368 pipeline.add_filter( process ); // second
369
370 input.Queue.push( Matrix1110 );
371 // Run the pipeline
372 pipeline.run( n ); // must be larger then max threads number
373 pipeline.clear(); // do not forget clear the pipeline
374
375 assert( input.Queue.unsafe_size()==1 );
376 Matrix2x2 M;
377 bool result = input.Queue.try_pop( M ); // get last element
378 assert( result );
379 return M.v[0][0]; // get value
380}
381
382// *** parallel_reduce *** //
383
384//! Functor for parallel_reduce
385struct parallel_reduceFibBody {
386 Matrix2x2 sum;
387 int split_flag; //< flag to make one less operation for split bodies
388 //! Constructor fills sum with initial matrix
389 parallel_reduceFibBody() : sum( Matrix1110 ), split_flag(0) { }
390 //! Splitting constructor
391 parallel_reduceFibBody( parallel_reduceFibBody& other, split ) : sum( Matrix1110 ), split_flag(1/*note that it is split*/) {}
392 //! Join point
393 void join( parallel_reduceFibBody &s ) {
394 sum = sum * s.sum;
395 }
396 //! Process multiplications
397 void operator()( const blocked_range<int> &r ) {
398 for( int k = r.begin() + split_flag; k < r.end(); ++k )
399 sum = sum * Matrix1110;
400 split_flag = 0; // reset flag, because this method can be reused for next range
401 }
402};
403//! Root function
404value parallel_reduceFib(int n)
405{
406 parallel_reduceFibBody b;
407 parallel_reduce(blocked_range<int>(2, n, 3), b); // do parallel reduce on range [2, n) for b
408 return b.sum.v[0][0];
409}
410
411// *** parallel_scan *** //
412
413//! Functor for parallel_scan
414struct parallel_scanFibBody {
415 /** Though parallel_scan is usually used to accumulate running sums,
416 it can be used to accumulate running products too. */
417 Matrix2x2 product;
418 /** Pointer to output sequence */
419 value* const output;
420 //! Constructor sets product to identity matrix
421 parallel_scanFibBody(value* output_) : product( MatrixIdentity ), output(output_) {}
422 //! Splitting constructor
423 parallel_scanFibBody( parallel_scanFibBody &b, split) : product( MatrixIdentity ), output(b.output) {}
424 //! Method for merging summary information from a, which was split off from *this, into *this.
425 void reverse_join( parallel_scanFibBody &a ) {
426 // When using non-commutative reduction operation, reverse_join
427 // should put argument "a" on the left side of the operation.
428 // The reversal from the argument order is why the method is
429 // called "reverse_join" instead of "join".
430 product = a.product * product;
431 }
432 //! Method for assigning final result back to original body.
433 void assign( parallel_scanFibBody &b ) {
434 product = b.product;
435 }
436 //! Compute matrix running product.
437 /** Tag indicates whether is is the final scan over the range, or
438 just a helper "prescan" that is computing a partial reduction. */
439 template<typename Tag>
440 void operator()( const blocked_range<int> &r, Tag tag) {
441 for( int k = r.begin(); k < r.end(); ++k ) {
442 // Code performs an "exclusive" scan, which outputs a value *before* updating the product.
443 // For an "inclusive" scan, output the value after the update.
444 if( tag.is_final_scan() )
445 output[k] = product.v[0][1];
446 product = product * Matrix1110;
447 }
448 }
449};
450//! Root function
451value parallel_scanFib(int n)
452{
453 value* output = new value[n];
454 parallel_scanFibBody b(output);
455 parallel_scan(blocked_range<int>(0, n, 3), b);
456 // output[0..n-1] now contains the Fibonacci sequence (modulo integer wrap-around).
457 // Check the last two values for correctness.
458 assert( n<2 || output[n-2]+output[n-1]==b.product.v[0][1] );
459 delete[] output;
460 return b.product.v[0][1];
461}
462
463// *** Raw tasks *** //
464
465//! task class which computes Fibonacci numbers by Lucas formula
466struct FibTask: public task {
467 const int n;
468 value& sum;
469 value x, y;
470 bool second_phase; //< flag of continuation
471 // task arguments
472 FibTask( int n_, value& sum_ ) :
473 n(n_), sum(sum_), second_phase(false)
474 {}
475 //! Execute task
476 task* execute() /*override*/ {
477 // Using Lucas' formula here
478 if( second_phase ) { // children finished
479 sum = n&1 ? x*x + y*y : x*x - y*y;
480 return NULL;
481 }
482 if( n <= 2 ) {
483 sum = n!=0;
484 return NULL;
485 } else {
486 recycle_as_continuation(); // repeat this task when children finish
487 second_phase = true; // mark second phase
488 FibTask& a = *new( allocate_child() ) FibTask( n/2 + 1, x );
489 FibTask& b = *new( allocate_child() ) FibTask( n/2 - 1 + (n&1), y );
490 set_ref_count(2);
491 spawn( a );
492 return &b;
493 }
494 }
495};
496//! Root function
497value ParallelTaskFib(int n) {
498 value sum;
499 FibTask& a = *new(task::allocate_root()) FibTask(n, sum);
500 task::spawn_root_and_wait(a);
501 return sum;
502}
503
504/////////////////////////// Main ////////////////////////////////////////////////////
505
506//! A closed range of int.
507struct IntRange {
508 int low;
509 int high;
510 void set_from_string( const char* s );
511 IntRange( int low_, int high_ ) : low(low_), high(high_) {}
512};
513
514void IntRange::set_from_string( const char* s ) {
515 char* end;
516 high = low = strtol(s,&end,0);
517 switch( *end ) {
518 case ':':
519 high = strtol(end+1,0,0);
520 break;
521 case '\0':
522 break;
523 default:
524 printf("unexpected character = %c\n",*end);
525 }
526}
527
528//! Tick count for start
529static tick_count t0;
530
531//! Verbose output flag
532static bool Verbose = false;
533
534typedef value (*MeasureFunc)(int);
535//! Measure ticks count in loop [2..n]
536value Measure(const char *name, MeasureFunc func, int n)
537{
538 value result;
539 if(Verbose) printf("%s",name);
540 t0 = tick_count::now();
541 for(int number = 2; number <= n; number++)
542 result = func(number);
543 if(Verbose) printf("\t- in %f msec\n", (tick_count::now() - t0).seconds()*1000);
544 return result;
545}
546
547//! program entry
548int main(int argc, char* argv[])
549{
550 if(argc>1) Verbose = true;
551 int NumbersCount = argc>1 ? strtol(argv[1],0,0) : 500;
552 IntRange NThread(1,4);// Number of threads to use.
553 if(argc>2) NThread.set_from_string(argv[2]);
554 unsigned long ntrial = argc>3? (unsigned long)strtoul(argv[3],0,0) : 1;
555 value result, sum;
556
557 if(Verbose) printf("Fibonacci numbers example. Generating %d numbers..\n", NumbersCount);
558
559 result = Measure("Serial loop", SerialFib, NumbersCount);
560 sum = Measure("Serial matrix", SerialMatrixFib, NumbersCount); assert(result == sum);
561 sum = Measure("Serial vector", SerialVectorFib, NumbersCount); assert(result == sum);
562 sum = Measure("Serial queue", SerialQueueFib, NumbersCount); assert(result == sum);
563 // now in parallel
564 for( unsigned long i=0; i<ntrial; ++i ) {
565 for(int threads = NThread.low; threads <= NThread.high; threads *= 2)
566 {
567 task_scheduler_init scheduler_init(threads);
568 if(Verbose) printf("\nThreads number is %d\n", threads);
569
570 sum = Measure("Shared serial (mutex)\t", SharedSerialFib<mutex>, NumbersCount); assert(result == sum);
571 sum = Measure("Shared serial (spin_mutex)", SharedSerialFib<spin_mutex>, NumbersCount); assert(result == sum);
572 sum = Measure("Shared serial (queuing_mutex)", SharedSerialFib<queuing_mutex>, NumbersCount); assert(result == sum);
573 sum = Measure("Shared serial (Conc.HashTable)", ConcurrentHashSerialFib, NumbersCount); assert(result == sum);
574 sum = Measure("Parallel while+for/queue", ParallelQueueFib, NumbersCount); assert(result == sum);
575 sum = Measure("Parallel pipe/queue\t", ParallelPipeFib, NumbersCount); assert(result == sum);
576 sum = Measure("Parallel reduce\t\t", parallel_reduceFib, NumbersCount); assert(result == sum);
577 sum = Measure("Parallel scan\t\t", parallel_scanFib, NumbersCount); assert(result == sum);
578 sum = Measure("Parallel tasks\t\t", ParallelTaskFib, NumbersCount); assert(result == sum);
579 }
580
581 #ifdef __GNUC__
582 if(Verbose) printf("Fibonacci number #%d modulo 2^64 is %lld\n\n", NumbersCount, result);
583 #else
584 if(Verbose) printf("Fibonacci number #%d modulo 2^64 is %I64d\n\n", NumbersCount, result);
585 #endif
586 }
587 if(!Verbose) printf("TEST PASSED\n");
588 return 0;
589}
590
591// Utils
592
593void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2])
594{
595 for( int i = 0; i <= 1; i++)
596 for( int j = 0; j <= 1; j++)
597 c[i][j] = a[i][0]*b[0][j] + a[i][1]*b[1][j];
598}
599
600Matrix2x2 Matrix2x2::operator *(const Matrix2x2 &to) const
601{
602 Matrix2x2 result;
603 Matrix2x2Multiply(v, to.v, result.v);
604 return result;
605}
606