1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #include "tbb/tbb_stddef.h" |
18 | #include "tbb/pipeline.h" |
19 | #include "tbb/spin_mutex.h" |
20 | #include "tbb/atomic.h" |
21 | #include <cstdlib> |
22 | #include <cstdio> |
23 | #include "harness.h" |
24 | |
25 | // In the test, variables related to token counting are declared |
26 | // as unsigned long to match definition of tbb::internal::Token. |
27 | |
//! One reusable slot that carries a stream item through the pipeline.
struct Buffer {
    //! Sentinel meaning "no id / no sequence number assigned".
    static const unsigned long unused = ~0ul;
    //! Id of the stream item currently held, or `unused` when free.
    unsigned long id;
    //! True while the buffer is owned by an in-flight token.
    bool is_busy;
    //! Ordering index recorded by the first ordered filter, or `unused`.
    unsigned long sequence_number;
    //! A fresh buffer starts out free with nothing assigned.
    Buffer() {
        id = unused;
        is_busy = false;
        sequence_number = unused;
    }
};
37 | |
//! Periodic trigger used to occasionally emulate "waiting for input".
class waiting_probe {
    //! How many times required() has been invoked so far.
    size_t my_check_count;
public:
    waiting_probe() : my_check_count(0) {}
    //! Returns true roughly once per 0x8000 calls (when count+1 hits a multiple of 0x8000).
    bool required( ) {
        ++my_check_count;
        return ((my_check_count + 1) & size_t(0x7FFF)) == 0;
    }
    void probe( ); // defined below
};
48 | |
//! Absolute cap on the number of items pushed through the pipeline in one run.
static const unsigned MaxStreamSize = 8000;
//! Per-thread item cap, so high thread counts do not blow up test runtime.
static const unsigned MaxStreamItemsPerThread = 1000;
//! Maximum number of filters allowed
static const unsigned MaxFilters = 5;
//! Items fed through the pipeline in the current run; varies across iterations.
static unsigned StreamSize;
//! Number of buffers the input filter recycles; also bounds usable tokens.
static const unsigned MaxBuffer = 8;
//! Done[f][i] is set once filter f has processed item i (detects skips/duplicates).
static bool Done[MaxFilters][MaxStreamSize];
//! Probe that occasionally emulates "waiting for input" in a serial input filter.
static waiting_probe WaitTest;
//! Counts items an out-of-order serial filter actually received out of order.
static unsigned out_of_order_count;
58 | |
59 | #include "harness_concurrency_tracker.h" |
60 | |
//! Generic pipeline stage: counts tokens, checks ordering invariants, and
//! records each processed item in the global Done matrix.
class BaseFilter: public tbb::filter {
    //! Row of the Done matrix owned by this filter.
    bool* const my_done;
    //! True for the last stage; it is responsible for recycling the buffer.
    const bool my_is_last;
    //! Set while operator() runs; detects concurrent entry to a serial stage.
    bool my_is_running;
public:
    //! Number of items that have entered this filter so far.
    tbb::atomic<tbb::internal::Token> current_token;
    BaseFilter( tbb::filter::mode type, bool done[], bool is_last ) :
        filter(type),
        my_done(done),
        my_is_last(is_last),
        my_is_running(false),
        current_token()
    {}
    //! Middle/last stages just count the token and pass the buffer through.
    //! (InputFilter overrides this to allocate a buffer instead.)
    virtual Buffer* get_buffer( void* item ) {
        current_token++;
        return static_cast<Buffer*>(item);
    }
    void* operator()( void* item ) __TBB_override {
        Harness::ConcurrencyTracker ct;
        // A serial filter must never be entered by two threads at once.
        if( is_serial() )
            ASSERT( !my_is_running, "premature entry to serial stage" );
        my_is_running = true;
        Buffer* b = get_buffer(item);
        if( b ) {
            if( is_ordered() ) {
                // First ordered stage stamps the item; later ordered stages verify it.
                if( b->sequence_number == Buffer::unused )
                    b->sequence_number = current_token-1;
                else
                    ASSERT( b->sequence_number==current_token-1, "item arrived out of order" );
            } else if( is_serial() ) {
                // serial_out_of_order: just tally genuine out-of-order arrivals.
                if( b->sequence_number != current_token-1 && b->sequence_number != Buffer::unused )
                    out_of_order_count++;
            }
            ASSERT( b->id < StreamSize, NULL );
            ASSERT( !my_done[b->id], "duplicate processing of token?" );
            ASSERT( b->is_busy, NULL );
            my_done[b->id] = true;
            if( my_is_last ) {
                // Recycle the buffer: reset it, then publish "free" with a release
                // store so the input filter's acquire load sees a fully reset buffer.
                b->id = Buffer::unused;
                b->sequence_number = Buffer::unused;
                __TBB_store_with_release(b->is_busy, false);
            }
        }
        my_is_running = false;
        return b;
    }
};
108 | |
//! First pipeline stage: generates the stream by handing out recycled buffers.
class InputFilter: public BaseFilter {
    //! Protects token generation and buffer acquisition when the stage is parallel.
    tbb::spin_mutex input_lock;
    //! Pool of buffers cycled through the pipeline.
    Buffer buffer[MaxBuffer];
    //! Token limit passed to pipeline.run(); at most this many buffers may be live.
    const tbb::internal::Token my_number_of_tokens;
public:
    InputFilter( tbb::filter::mode type, tbb::internal::Token ntokens, bool done[], bool is_last ) :
        BaseFilter(type, done, is_last),
        my_number_of_tokens(ntokens)
    {}
    //! Returns the next stream item in a free buffer, or NULL at end of stream.
    Buffer* get_buffer( void* ) __TBB_override {
        unsigned long next_input;
        unsigned free_buffer = 0;
        { // lock protected scope
            tbb::spin_mutex::scoped_lock lock(input_lock);
            if( current_token>=StreamSize )
                return NULL;
            next_input = current_token++;
            // once in a while, emulate waiting for input; this only makes sense for serial input
            if( is_serial() && WaitTest.required() )
                WaitTest.probe( );
            // Find a free buffer; the acquire load pairs with the last filter's
            // release store so the reset fields are visible before reuse.
            while( free_buffer<MaxBuffer )
                if( __TBB_load_with_acquire(buffer[free_buffer].is_busy) )
                    ++free_buffer;
                else {
                    buffer[free_buffer].is_busy = true;
                    break;
                }
        }
        // The pipeline must never have more than ntokens items in flight, so a
        // buffer with index >= ntokens would indicate premature reuse.
        ASSERT( free_buffer<my_number_of_tokens, "premature reuse of buffer" );
        Buffer* b = &buffer[free_buffer];
        ASSERT( &buffer[0] <= b, NULL );
        ASSERT( b <= &buffer[MaxBuffer-1], NULL );
        ASSERT( b->id == Buffer::unused, NULL);
        b->id = next_input;
        ASSERT( b->sequence_number == Buffer::unused, NULL);
        return b;
    }
};
147 | |
//! The struct below repeats layout of tbb::pipeline.
//! Used only to reach into private members for the wrap-around test; the
//! sizeof check in TestTrivialPipeline guards against layout drift.
struct hacked_pipeline {
    tbb::filter* filter_list;
    tbb::filter* filter_end;
    tbb::empty_task* end_counter;
    tbb::atomic<tbb::internal::Token> input_tokens;
    tbb::atomic<tbb::internal::Token> token_counter;
    bool end_of_input;
    bool has_thread_bound_filters;

    // Matches tbb::pipeline's virtual destructor so the vtable pointer is accounted for.
    virtual ~hacked_pipeline();
};
160 | |
//! The struct below repeats layout of tbb::internal::input_buffer.
//! Only low_token/high_token are poked by the test; the rest exist to keep
//! offsets identical to the real class.
struct hacked_input_buffer {
    void* array; // This should be changed to task_info* if ever used
    void* my_sem; // This should be changed to semaphore* if ever used
    tbb::internal::Token array_size;
    tbb::internal::Token low_token;
    tbb::spin_mutex array_mutex;
    tbb::internal::Token high_token;
    bool is_ordered;
    bool is_bound;
};
172 | |
//! The struct below repeats layout of tbb::filter.
//! Gives the test access to my_input_buffer; the sizeof check in
//! TestTrivialPipeline guards against layout drift.
struct hacked_filter {
    tbb::filter* next_filter_in_pipeline;
    hacked_input_buffer* my_input_buffer;
    unsigned char my_filter_mode;
    tbb::filter* prev_filter_in_pipeline;
    tbb::pipeline* my_pipeline;
    tbb::filter* next_segment;

    // Matches tbb::filter's virtual destructor so the vtable pointer is accounted for.
    virtual ~hacked_filter();
};
184 | |
//! Layout-hacking tests are disabled when the runtime is newer than the headers
//! (see TestMain), since the internal layouts may then differ.
bool do_hacking_tests = true;
//! Token counters are pre-set so that they wrap around after this many tokens.
const tbb::internal::Token tokens_before_wraparound = 0xF;
187 | |
//! Exhaustively tests every sequence of filter modes of the given length,
//! running each pipeline over a range of stream sizes and verifying that every
//! item passes through every filter exactly once and in the required order.
void TestTrivialPipeline( unsigned nthread, unsigned number_of_filters ) {
    // There are 3 filter types: parallel, serial_in_order and serial_out_of_order
    static const tbb::filter::mode filter_table[] = { tbb::filter::parallel, tbb::filter::serial_in_order, tbb::filter::serial_out_of_order};
    const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);
    REMARK( "testing with %lu threads and %lu filters\n" , nthread, number_of_filters );
    ASSERT( number_of_filters<=MaxFilters, "too many filters" );
    // The layout hacks below are only valid if the mirrored structs still match.
    ASSERT( sizeof(hacked_pipeline) == sizeof(tbb::pipeline), "layout changed for tbb::pipeline?" );
    ASSERT( sizeof(hacked_filter) == sizeof(tbb::filter), "layout changed for tbb::filter?" );
    tbb::internal::Token ntokens = nthread<MaxBuffer ? nthread : MaxBuffer;
    // Count maximum iterations number
    unsigned limit = 1;
    for( unsigned i=0; i<number_of_filters; ++i)
        limit *= number_of_filter_types;
    // Iterate over possible filter sequences
    for( unsigned numeral=0; numeral<limit; ++numeral ) {
        // Build pipeline
        tbb::pipeline pipeline;
        if( do_hacking_tests ) {
            // A private member of pipeline is hacked there for sake of testing wrap-around immunity.
            tbb::internal::punned_cast<hacked_pipeline*>(&pipeline)->token_counter = ~tokens_before_wraparound;
        }
        tbb::filter* filter[MaxFilters];
        // `numeral` encodes the filter-mode sequence as a base-3 number; each
        // loop iteration peels off one digit.
        unsigned temp = numeral;
        // parallelism_limit is the upper bound on the possible parallelism
        unsigned parallelism_limit = 0;
        for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types ) {
            tbb::filter::mode filter_type = filter_table[temp%number_of_filter_types];
            const bool is_last = i==number_of_filters-1;
            if( i==0 )
                filter[i] = new InputFilter(filter_type,ntokens,Done[i],is_last);
            else
                filter[i] = new BaseFilter(filter_type,Done[i],is_last);
            pipeline.add_filter(*filter[i]);
            // The ordered buffer of serial filters is hacked as well.
            if ( filter[i]->is_serial() ) {
                if( do_hacking_tests ) {
                    ((hacked_filter*)(void*)filter[i])->my_input_buffer->low_token = ~tokens_before_wraparound;
                    ((hacked_filter*)(void*)filter[i])->my_input_buffer->high_token = ~tokens_before_wraparound;
                }
                // A serial filter contributes at most one concurrent worker.
                parallelism_limit += 1;
            } else {
                parallelism_limit = nthread;
            }
        }
        // Account for clipping of parallelism.
        if( parallelism_limit>nthread )
            parallelism_limit = nthread;
        if( parallelism_limit>ntokens )
            parallelism_limit = (unsigned)ntokens;
        Harness::ConcurrencyTracker::Reset();
        unsigned streamSizeLimit = min( MaxStreamSize, nthread * MaxStreamItemsPerThread );
        for( StreamSize=0; StreamSize<=streamSizeLimit; ) {
            // Reset per-run bookkeeping before each pipeline.run().
            memset( Done, 0, sizeof(Done) );
            for( unsigned i=0; i<number_of_filters; ++i ) {
                static_cast<BaseFilter*>(filter[i])->current_token=0;
            }
            pipeline.run( ntokens );
            ASSERT( !Harness::ConcurrencyTracker::InstantParallelism(), "filter still running?" );
            // Every filter must have seen exactly StreamSize items...
            for( unsigned i=0; i<number_of_filters; ++i )
                ASSERT( static_cast<BaseFilter*>(filter[i])->current_token==StreamSize, NULL );
            // ...and each item must be marked done by active filters only.
            for( unsigned i=0; i<MaxFilters; ++i )
                for( unsigned j=0; j<StreamSize; ++j ) {
                    ASSERT( Done[i][j]==(i<number_of_filters), NULL );
                }
            // Dense coverage for small streams, geometric growth afterwards.
            if( StreamSize < min(nthread*8, 32u) ) {
                ++StreamSize;
            } else {
                StreamSize = StreamSize*8/3;
            }
        }
        // Not reaching the limit is legal (scheduling-dependent), so only remark.
        if( Harness::ConcurrencyTracker::PeakParallelism() < parallelism_limit )
            REMARK( "nthread=%lu ntokens=%lu MaxParallelism=%lu parallelism_limit=%lu\n" ,
                nthread, ntokens, Harness::ConcurrencyTracker::PeakParallelism(), parallelism_limit );
        for( unsigned i=0; i < number_of_filters; ++i ) {
            delete filter[i];
            filter[i] = NULL;
        }
        pipeline.clear();
    }
}
268 | |
269 | #include "harness_cpu.h" |
270 | |
//! Current thread count of the loop in TestMain; read by waiting_probe::probe.
static int nthread; // knowing number of threads is necessary to call TestCPUUserTime
272 | |
273 | void waiting_probe::probe( ) { |
274 | if( nthread==1 ) return; |
275 | REMARK("emulating wait for input\n" ); |
276 | // Test that threads sleep while no work. |
277 | // The master doesn't sleep so there could be 2 active threads if a worker is waiting for input |
278 | TestCPUUserTime(nthread, 2); |
279 | } |
280 | |
281 | #include "tbb/task_scheduler_init.h" |
282 | |
283 | int TestMain () { |
284 | out_of_order_count = 0; |
285 | if( MinThread<1 ) { |
286 | REPORT("must have at least one thread" ); |
287 | exit(1); |
288 | } |
289 | if( tbb::TBB_runtime_interface_version()>TBB_INTERFACE_VERSION) { |
290 | REMARK("Warning: implementation dependent tests disabled\n" ); |
291 | do_hacking_tests = false; |
292 | } |
293 | |
294 | // Test with varying number of threads. |
295 | for( nthread=MinThread; nthread<=MaxThread; ++nthread ) { |
296 | // Initialize TBB task scheduler |
297 | tbb::task_scheduler_init init(nthread); |
298 | |
299 | // Test pipelines with n filters |
300 | for( unsigned n=0; n<=MaxFilters; ++n ) |
301 | TestTrivialPipeline(nthread,n); |
302 | |
303 | // Test that all workers sleep when no work |
304 | TestCPUUserTime(nthread); |
305 | } |
306 | if( !out_of_order_count ) |
307 | REPORT("Warning: out of order serial filter received tokens in order\n" ); |
308 | return Harness::Done; |
309 | } |
310 | |