1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "tbb/pipeline.h"
18#include "tbb/spin_mutex.h"
19#include "tbb/atomic.h"
20#include "tbb/tbb_thread.h"
21#include <cstdlib>
22#include <cstdio>
23#include "harness.h"
24
25// In the test, variables related to token counting are declared
26// as unsigned long to match definition of tbb::internal::Token.
27
28//! Id of thread that first executes work on non-thread-bound stages
29tbb::tbb_thread::id thread_id;
30//! Zero thread id
31tbb::tbb_thread::id id0;
32//! True if non-thread-bound stages must be executed on one thread
33bool is_serial_execution;
34double sleeptime; // how long is a non-thread-bound stage to sleep?
35
//! One item travelling through the test pipeline.
struct Buffer {
    //! Sentinel stored in id and sequence_number while they hold no value.
    static const unsigned long unused = ~0ul;
    //! Index of the item in the input stream, or unused.
    unsigned long id;
    //! True while the buffer is in use.
    bool is_busy;
    //! Token number assigned by an ordered stage, or unused.
    unsigned long sequence_number;

    //! Creates a free buffer that carries no item.
    Buffer() {
        id = unused;
        is_busy = false;
        sequence_number = unused;
    }
};
45
//! Decides, by counting calls, when the input stage should emulate waiting for input.
class waiting_probe {
    //! Number of times required() has been called.
    size_t check_counter;
public:
    waiting_probe() : check_counter(0) {}

    //! Counts the call; returns true when the low 15 bits of (count+1) are all zero.
    bool required( ) {
        check_counter += 1;
        const size_t low_bits = (check_counter + 1) & size_t(0x7FFF);
        return low_bits == 0;
    }

    void probe( ); // defined below
};
56
57static const unsigned StreamSize = 10;
58//! Maximum number of filters allowed
59static const unsigned MaxFilters = 4;
60static const unsigned MaxBuffer = 8;
61static bool Done[MaxFilters][StreamSize];
62static waiting_probe WaitTest;
63static unsigned out_of_order_count;
64
65#include "harness_concurrency_tracker.h"
66
67template<typename T>
68class BaseFilter: public T {
69 bool* const my_done;
70 const bool my_is_last;
71 bool concurrency_observed;
72 tbb::atomic<int> running_count;
73public:
74 tbb::atomic<tbb::internal::Token> current_token;
75 BaseFilter( tbb::filter::mode type, bool done[], bool is_last ) :
76 T(type),
77 my_done(done),
78 my_is_last(is_last),
79 concurrency_observed(false),
80 current_token()
81 {
82 running_count = 0;
83 }
84 ~BaseFilter() {
85 if( this->is_serial() || is_serial_execution )
86 ASSERT( !concurrency_observed, "Unexpected concurrency in a [serial] filter" );
87 else if( sleeptime > 0 )
88 ASSERT( concurrency_observed, "No concurrency in a parallel filter" );
89 }
90 virtual Buffer* get_buffer( void* item ) {
91 current_token++;
92 return static_cast<Buffer*>(item);
93 }
94 void* operator()( void* item ) __TBB_override {
95 // Check if work is done only on one thread when ntokens==1 or
96 // when pipeline has only one filter that is serial and non-thread-bound
97 if( is_serial_execution && !this->is_bound() ) {
98 // Get id of current thread
99 tbb::tbb_thread::id id = tbb::this_tbb_thread::get_id();
100 // At first execution, set thread_id to current thread id.
101 // Serialized execution is expected, so there should be no race.
102 if( thread_id == id0 )
103 thread_id = id;
104 // Check if work is done on one thread
105 ASSERT( thread_id == id, "non-thread-bound stages executed on different threads when must be executed on a single one");
106 }
107 Harness::ConcurrencyTracker ct;
108 concurrency_observed = concurrency_observed || (running_count++ > 0);
109 if( this->is_serial() )
110 ASSERT( !concurrency_observed, "premature entry to serial stage" );
111
112 Buffer* b = get_buffer(item);
113 if( b ) {
114 if(!this->is_bound() && sleeptime > 0) {
115 if(this->is_serial()) {
116 Harness::Sleep((int)sleeptime);
117 } else {
118 // early parallel tokens sleep longer
119 int i = (int)((5 - (int)b->sequence_number) * sleeptime);
120 if(i < (int)sleeptime) i = (int)sleeptime;
121 Harness::Sleep(i);
122 }
123 }
124 if( this->is_ordered() ) {
125 if( b->sequence_number == Buffer::unused )
126 b->sequence_number = current_token-1;
127 else
128 ASSERT( b->sequence_number==current_token-1, "item arrived out of order" );
129 } else if( this->is_serial() ) {
130 if( b->sequence_number != current_token-1 && b->sequence_number != Buffer::unused )
131 out_of_order_count++;
132 }
133 ASSERT( b->id < StreamSize, NULL );
134 ASSERT( !my_done[b->id], "duplicate processing of token?" );
135 ASSERT( b->is_busy, NULL );
136 my_done[b->id] = true;
137 if( my_is_last ) {
138 b->id = Buffer::unused;
139 b->sequence_number = Buffer::unused;
140 __TBB_store_with_release(b->is_busy, false);
141 }
142 }
143 concurrency_observed = concurrency_observed || (--running_count > 0);
144 return b;
145 }
146};
147
148template<typename T>
149class InputFilter: public BaseFilter<T> {
150 tbb::spin_mutex input_lock;
151 Buffer buffer[MaxBuffer];
152 const tbb::internal::Token my_number_of_tokens;
153public:
154 InputFilter( tbb::filter::mode type, tbb::internal::Token ntokens, bool done[], bool is_last ) :
155 BaseFilter<T>(type, done, is_last),
156 my_number_of_tokens(ntokens)
157 {}
158 Buffer* get_buffer( void* ) __TBB_override {
159 unsigned long next_input;
160 unsigned free_buffer = 0;
161 { // lock protected scope
162 tbb::spin_mutex::scoped_lock lock(input_lock);
163 if( this->current_token>=StreamSize )
164 return NULL;
165 next_input = this->current_token++;
166 // once in a while, emulate waiting for input; this only makes sense for serial input
167 if( this->is_serial() && WaitTest.required() )
168 WaitTest.probe( );
169 while( free_buffer<MaxBuffer )
170 if( __TBB_load_with_acquire(buffer[free_buffer].is_busy) )
171 ++free_buffer;
172 else {
173 buffer[free_buffer].is_busy = true;
174 break;
175 }
176 }
177 ASSERT( free_buffer<my_number_of_tokens, "premature reuse of buffer" );
178 Buffer* b = &buffer[free_buffer];
179 ASSERT( &buffer[0] <= b, NULL );
180 ASSERT( b <= &buffer[MaxBuffer-1], NULL );
181 ASSERT( b->id == Buffer::unused, NULL);
182 b->id = next_input;
183 ASSERT( b->sequence_number == Buffer::unused, NULL);
184 return b;
185 }
186};
187
188class process_loop {
189public:
190 void operator()( tbb::thread_bound_filter* tbf ) {
191 tbb::thread_bound_filter::result_type flag;
192 do
193 flag = tbf->process_item();
194 while( flag != tbb::thread_bound_filter::end_of_stream );
195 }
196};
197
198//! The struct below repeats layout of tbb::pipeline.
199struct hacked_pipeline {
200 tbb::filter* filter_list;
201 tbb::filter* filter_end;
202 tbb::empty_task* end_counter;
203 tbb::atomic<tbb::internal::Token> input_tokens;
204 tbb::atomic<tbb::internal::Token> global_token_counter;
205 bool end_of_input;
206 bool has_thread_bound_filters;
207
208 virtual ~hacked_pipeline();
209};
210
211//! The struct below repeats layout of tbb::internal::ordered_buffer.
212struct hacked_ordered_buffer {
213 void* array; // This should be changed to task_info* if ever used
214 tbb::internal::Token array_size;
215 tbb::internal::Token low_token;
216 tbb::spin_mutex array_mutex;
217 tbb::internal::Token high_token;
218 bool is_ordered;
219 bool is_bound;
220};
221
222//! The struct below repeats layout of tbb::filter.
223struct hacked_filter {
224 tbb::filter* next_filter_in_pipeline;
225 hacked_ordered_buffer* input_buffer;
226 unsigned char my_filter_mode;
227 tbb::filter* prev_filter_in_pipeline;
228 tbb::pipeline* my_pipeline;
229 tbb::filter* next_segment;
230
231 virtual ~hacked_filter();
232};
233
234#if _MSC_VER && !defined(__INTEL_COMPILER)
235 // Workaround for overzealous compiler warnings
236 // Suppress compiler warning about constant conditional expression
237 #pragma warning (disable: 4127)
238#endif
239
240void clear_global_state() {
241 Harness::ConcurrencyTracker::Reset();
242 memset( Done, 0, sizeof(Done) );
243 thread_id = id0;
244 is_serial_execution = false;
245}
246
247
248class PipelineTest {
249 // There are 3 non-thread-bound filter types: serial_in_order and serial_out_of_order, parallel
250 static const tbb::filter::mode non_tb_filters_table[3]; // = { tbb::filter::serial_in_order, tbb::filter::serial_out_of_order, tbb::filter::parallel};
251 // There are 2 thread-bound filter types: serial_in_order and serial_out_of_order
252 static const tbb::filter::mode tb_filters_table[2]; // = { tbb::filter::serial_in_order, tbb::filter::serial_out_of_order };
253
254 static const unsigned number_of_non_tb_filter_types = sizeof(non_tb_filters_table)/sizeof(non_tb_filters_table[0]);
255 static const unsigned number_of_tb_filter_types = sizeof(tb_filters_table)/sizeof(tb_filters_table[0]);
256 static const unsigned number_of_filter_types = number_of_non_tb_filter_types + number_of_tb_filter_types;
257 // static unsigned my_nthread;
258 public:
259 static double TestOneConfiguration( unsigned numeral, unsigned nthread, unsigned number_of_filters, tbb::internal::Token ntokens);
260 static void TestTrivialPipeline( unsigned nthread, unsigned number_of_filters );
261 static void TestIdleSpinning(unsigned nthread);
262
263 static void PrintConfiguration(unsigned numeral, unsigned nFilters) {
264 REMARK( "{ ");
265 for( unsigned i = 0; i < nFilters; ++i) {
266 switch( numeral % number_of_filter_types ) {
267 case 0: REMARK("s "); break;
268 case 1: REMARK("B "); break;
269 case 2: REMARK("o "); break;
270 case 3: REMARK("Bo "); break;
271 case 4: REMARK("P "); break;
272 default: REMARK(" ** ERROR** "); break;
273 }
274 numeral /= number_of_filter_types;
275 }
276 REMARK("}");
277 }
278 static bool ContainsBoundFilter(unsigned numeral) {
279 for( ;numeral != 0; numeral /= number_of_filter_types)
280 if(numeral & 0x1) return true;
281 return false;
282 }
283};
284
285const tbb::filter::mode PipelineTest::non_tb_filters_table[3] = {
286 tbb::filter::serial_in_order, // 0
287 tbb::filter::serial_out_of_order, // 2
288 tbb::filter::parallel // 4
289};
290const tbb::filter::mode PipelineTest::tb_filters_table[2] = {
291 tbb::filter::serial_in_order, // 1
292 tbb::filter::serial_out_of_order // 3
293};
294
295#include "harness_cpu.h"
296
297double PipelineTest::TestOneConfiguration(unsigned numeral, unsigned nthread, unsigned number_of_filters, tbb::internal::Token ntokens)
298{
299 // Build pipeline
300 tbb::pipeline pipeline;
301 tbb::filter* filter[MaxFilters];
302 unsigned temp = numeral;
303 // parallelism_limit is the upper bound on the possible parallelism
304 unsigned parallelism_limit = 0;
305 // number of thread-bound-filters in the current sequence
306 unsigned number_of_tb_filters = 0;
307 // ordinal numbers of thread-bound-filters in the current sequence
308 unsigned array_of_tb_filter_numbers[MaxFilters];
309 if(!ContainsBoundFilter(numeral)) return 0.0;
310 for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types ) {
311 bool is_bound = temp%number_of_filter_types&0x1;
312 tbb::filter::mode filter_type;
313 if( is_bound ) {
314 filter_type = tb_filters_table[temp%number_of_filter_types/number_of_non_tb_filter_types];
315 } else
316 filter_type = non_tb_filters_table[temp%number_of_filter_types/number_of_tb_filter_types];
317 const bool is_last = i==number_of_filters-1;
318 if( is_bound ) {
319 if( i == 0 )
320 filter[i] = new InputFilter<tbb::thread_bound_filter>(filter_type,ntokens,Done[i],is_last);
321 else
322 filter[i] = new BaseFilter<tbb::thread_bound_filter>(filter_type,Done[i],is_last);
323 array_of_tb_filter_numbers[number_of_tb_filters] = i;
324 number_of_tb_filters++;
325 } else {
326 if( i == 0 )
327 filter[i] = new InputFilter<tbb::filter>(filter_type,ntokens,Done[i],is_last);
328 else
329 filter[i] = new BaseFilter<tbb::filter>(filter_type,Done[i],is_last);
330 }
331 pipeline.add_filter(*filter[i]);
332 if ( filter[i]->is_serial() ) {
333 parallelism_limit += 1;
334 } else {
335 parallelism_limit = nthread;
336 }
337 }
338 ASSERT(number_of_tb_filters,NULL);
339 clear_global_state();
340 // Account for clipping of parallelism.
341 if( parallelism_limit>nthread )
342 parallelism_limit = nthread;
343 if( parallelism_limit>ntokens )
344 parallelism_limit = (unsigned)ntokens;
345
346 for( unsigned i=0; i<number_of_filters; ++i ) {
347 static_cast<BaseFilter<tbb::filter>*>(filter[i])->current_token=0;
348 }
349 tbb::tbb_thread* t[MaxFilters];
350 for( unsigned j = 0; j<number_of_tb_filters; j++)
351 t[j] = new tbb::tbb_thread(process_loop(), static_cast<tbb::thread_bound_filter*>(filter[array_of_tb_filter_numbers[j]]));
352 if( ntokens == 1 || ( number_of_filters == 1 && number_of_tb_filters == 0 && filter[0]->is_serial() ))
353 is_serial_execution = true;
354 double strttime = GetCPUUserTime();
355 pipeline.run( ntokens );
356 double endtime = GetCPUUserTime();
357 for( unsigned j = 0; j<number_of_tb_filters; j++)
358 t[j]->join();
359 ASSERT( !Harness::ConcurrencyTracker::InstantParallelism(), "filter still running?" );
360 for( unsigned i=0; i<number_of_filters; ++i )
361 ASSERT( static_cast<BaseFilter<tbb::filter>*>(filter[i])->current_token==StreamSize, NULL );
362 for( unsigned i=0; i<MaxFilters; ++i )
363 for( unsigned j=0; j<StreamSize; ++j ) {
364 ASSERT( Done[i][j]==(i<number_of_filters), NULL );
365 }
366 if( Harness::ConcurrencyTracker::PeakParallelism() < parallelism_limit )
367 REMARK( "nthread=%lu ntokens=%lu MaxParallelism=%lu parallelism_limit=%lu\n",
368 nthread, ntokens, Harness::ConcurrencyTracker::PeakParallelism(), parallelism_limit );
369 for( unsigned i=0; i < number_of_filters; ++i ) {
370 delete filter[i];
371 filter[i] = NULL;
372 }
373 for( unsigned j = 0; j<number_of_tb_filters; j++)
374 delete t[j];
375 pipeline.clear();
376 return endtime - strttime;
377} // TestOneConfiguration
378
379void PipelineTest::TestTrivialPipeline( unsigned nthread, unsigned number_of_filters ) {
380
381 REMARK( "testing with %lu threads and %lu filters\n", nthread, number_of_filters );
382 ASSERT( number_of_filters<=MaxFilters, "too many filters" );
383 tbb::internal::Token max_tokens = nthread < MaxBuffer ? nthread : MaxBuffer;
384 // The loop has 1 iteration if max_tokens=1 and 2 iterations if max_tokens>1:
385 // one iteration for ntokens=1 and second for ntokens=max_tokens
386 // Iteration for ntokens=1 is required in each test case to check if pipeline run only on one thread
387 unsigned max_iteration = max_tokens > 1 ? 2 : 1;
388 tbb::internal::Token ntokens = 1;
389 for( unsigned iteration = 0; iteration < max_iteration; iteration++) {
390 if( iteration > 0 )
391 ntokens = max_tokens;
392 // Count maximum iterations number
393 unsigned limit = 1;
394 for( unsigned i=0; i<number_of_filters; ++i)
395 limit *= number_of_filter_types;
396 // Iterate over possible filter sequences
397 for( unsigned numeral=0; numeral<limit; ++numeral ) {
398 REMARK( "testing configuration %lu of %lu\n", numeral, limit );
399 (void)TestOneConfiguration(numeral, nthread, number_of_filters, ntokens);
400 }
401 }
402}
403
404// varying times for sleep result in different user times for all pipelines.
405// So we compare the running time of an all non-TBF pipeline with different (with
406// luck representative) TBF configurations.
407//
408// We run the tests multiple times and compare the average runtimes for those cases
409// that don't return 0 user time. configurations that exceed the allowable extra
410// time are reported.
411void PipelineTest::TestIdleSpinning( unsigned nthread) {
412 unsigned sample_setups[] = {
413 // in the comments below, s == serial, o == serial out-of-order,
414 // B == thread bound, Bo == thread bound out-of-order, p == parallel
415 1, // B s s s
416 5, // s B s s
417 25, // s s B s
418 125, // s s s B
419 6, // B B s s
420 26, // B s B s
421 126, // B s s B
422 30, // s B B s
423 130, // s B s B
424 150, // s s B B
425 31, // B B B s
426 131, // B B s B
427 155, // s B B B
428 495, // s p p Bo
429 71, // B p o s
430 355, // s B p o
431 95, // s p Bo s
432 475, // s s p Bo
433 };
434 const int nsetups = sizeof(sample_setups) / sizeof(unsigned);
435 const int ntests = 4;
436 const double bignum = 1000000000.0;
437 const double allowable_slowdown = 3.5;
438 unsigned zero_count = 0;
439
440 REMARK( "testing idle spinning with %lu threads\n", nthread );
441 tbb::internal::Token max_tokens = nthread < MaxBuffer ? nthread : MaxBuffer;
442 for( int i=0; i<nsetups; ++i ) {
443 unsigned numeral = sample_setups[i];
444 unsigned temp = numeral;
445 unsigned nbound = 0;
446 while(temp) {
447 if((temp%number_of_filter_types)&0x01) nbound++;
448 temp /= number_of_filter_types;
449 }
450 sleeptime = 20.0;
451 double s0 = bignum;
452 double s1 = bignum;
453 int v0cnt = 0;
454 int v1cnt = 0;
455 double s0sum = 0.0;
456 double s1sum = 0.0;
457 REMARK(" TestOneConfiguration, pipeline == ");
458 PrintConfiguration(numeral, MaxFilters);
459 REMARK(", max_tokens== %d\n", (int)max_tokens);
460 for(int j = 0; j < ntests; ++j) {
461 double s1a = TestOneConfiguration(numeral, nthread, MaxFilters, max_tokens);
462 double s0a = TestOneConfiguration((unsigned)0, nthread, MaxFilters, max_tokens);
463 s1sum += s1a;
464 s0sum += s0a;
465 if(s0a > 0.0) {
466 ++v0cnt;
467 s0 = (s0a < s0) ? s0a : s0;
468 } else {
469 ++zero_count;
470 }
471 if(s1a > 0.0) {
472 ++v1cnt;
473 s1 = (s1a < s1) ? s1a : s1;
474 } else {
475 ++zero_count;
476 }
477 }
478 if(s0 == bignum || s1 == bignum) continue;
479 s0sum /= (double)v0cnt;
480 s1sum /= (double)v1cnt;
481 double slowdown = (s1sum-s0sum)/s0sum;
482 if(slowdown > allowable_slowdown)
483 REMARK( "with %lu threads configuration %lu has slowdown > %g (%g)\n", nthread, numeral, allowable_slowdown, slowdown );
484 }
485 REMARK("Total of %lu zero times\n", zero_count);
486}
487
488static int nthread; // knowing number of threads is necessary to call TestCPUUserTime
489
490void waiting_probe::probe( ) {
491 if( nthread==1 ) return;
492 REMARK("emulating wait for input\n");
493 // Test that threads sleep while no work.
494 // The master doesn't sleep so there could be 2 active threads if a worker is waiting for input
495 TestCPUUserTime(nthread, 2);
496}
497
498#include "tbb/task_scheduler_init.h"
499
500int TestMain () {
501 out_of_order_count = 0;
502 if( MinThread<1 ) {
503 REPORT("must have at least one thread");
504 exit(1);
505 }
506
507 // Test with varying number of threads.
508 for( nthread=MinThread; nthread<=MaxThread; ++nthread ) {
509 // Initialize TBB task scheduler
510 tbb::task_scheduler_init init(nthread);
511 sleeptime = 0.0; // msec : 0 == no_timing, > 0, each filter stage sleeps for sleeptime
512
513 // Test pipelines with 1 and maximal number of filters
514 for( unsigned n=1; n<=MaxFilters; n*=MaxFilters ) {
515 // Thread-bound stages are serviced by user-created threads; those
516 // don't run the pipeline and don't service non-thread-bound stages
517 PipelineTest::TestTrivialPipeline(nthread,n);
518 }
519
520 // Test that all workers sleep when no work
521 TestCPUUserTime(nthread);
522 if((unsigned)nthread >= MaxFilters) // test works when number of threads >= number of stages
523 PipelineTest::TestIdleSpinning(nthread);
524 }
525 if( !out_of_order_count )
526 REPORT("Warning: out of order serial filter received tokens in order\n");
527 return Harness::Done;
528}
529