1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
// TODO: Add overlapping put / receive tests
18 | |
19 | #include "harness.h" |
20 | |
21 | #if __TBB_CPF_BUILD |
22 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
23 | #endif |
24 | |
25 | #include "tbb/flow_graph.h" |
26 | #include "tbb/task_scheduler_init.h" |
27 | #include "tbb/tick_count.h" |
28 | #include "harness_checktype.h" |
29 | #include "harness_graph.h" |
30 | |
31 | #include <cstdio> |
32 | |
// Number of items each thread puts into (or pulls from) a queue in one pass.
// Values are encoded as N*tid + offset, so N is also the stride between the
// value ranges of consecutive threads.
#define N 1000
// Chunk size used by parallel_put_get: how many items a thread puts before it
// starts pulling the same number back out.
#define C 10
35 | |
36 | template< typename T > |
37 | void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { |
38 | while ( q.try_get(value) != true ) ; |
39 | } |
40 | |
41 | template< typename T > |
42 | void check_item( T* next_value, T &value ) { |
43 | int tid = value / N; |
44 | int offset = value % N; |
45 | ASSERT( next_value[tid] == T(offset), NULL ); |
46 | ++next_value[tid]; |
47 | } |
48 | |
49 | template< typename T > |
50 | struct parallel_puts : NoAssign { |
51 | |
52 | tbb::flow::queue_node<T> &my_q; |
53 | |
54 | parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {} |
55 | |
56 | void operator()(int i) const { |
57 | for (int j = 0; j < N; ++j) { |
58 | bool msg = my_q.try_put( T(N*i + j) ); |
59 | ASSERT( msg == true, NULL ); |
60 | } |
61 | } |
62 | |
63 | }; |
64 | |
65 | |
66 | |
67 | template< typename T > |
68 | struct touches { |
69 | |
70 | bool **my_touches; |
71 | T **my_last_touch; |
72 | int my_num_threads; |
73 | |
74 | touches( int num_threads ) : my_num_threads(num_threads) { |
75 | my_last_touch = new T* [my_num_threads]; |
76 | my_touches = new bool* [my_num_threads]; |
77 | for ( int p = 0; p < my_num_threads; ++p) { |
78 | my_last_touch[p] = new T[my_num_threads]; |
79 | for ( int p2 = 0; p2 < my_num_threads; ++p2) |
80 | my_last_touch[p][p2] = -1; |
81 | |
82 | my_touches[p] = new bool[N*my_num_threads]; |
83 | for ( int n = 0; n < N*my_num_threads; ++n) |
84 | my_touches[p][n] = false; |
85 | } |
86 | } |
87 | |
88 | ~touches() { |
89 | for ( int p = 0; p < my_num_threads; ++p) { |
90 | delete [] my_touches[p]; |
91 | delete [] my_last_touch[p]; |
92 | } |
93 | delete [] my_touches; |
94 | delete [] my_last_touch; |
95 | } |
96 | |
97 | bool check( int tid, T v ) { |
98 | int v_tid = v / N; |
99 | if ( my_touches[tid][v] != false ) { |
100 | printf("Error: value seen twice by local thread\n" ); |
101 | return false; |
102 | } |
103 | if ( v <= my_last_touch[tid][v_tid] ) { |
104 | printf("Error: value seen in wrong order by local thread\n" ); |
105 | return false; |
106 | } |
107 | my_last_touch[tid][v_tid] = v; |
108 | my_touches[tid][v] = true; |
109 | return true; |
110 | } |
111 | |
112 | bool validate_touches() { |
113 | bool *all_touches = new bool[N*my_num_threads]; |
114 | for ( int n = 0; n < N*my_num_threads; ++n) |
115 | all_touches[n] = false; |
116 | |
117 | for ( int p = 0; p < my_num_threads; ++p) { |
118 | for ( int n = 0; n < N*my_num_threads; ++n) { |
119 | if ( my_touches[p][n] == true ) { |
120 | ASSERT( all_touches[n] == false, "value see by more than one thread\n" ); |
121 | all_touches[n] = true; |
122 | } |
123 | } |
124 | } |
125 | for ( int n = 0; n < N*my_num_threads; ++n) { |
126 | if ( !all_touches[n] ) |
127 | printf("No touch at %d, my_num_threads = %d\n" , n, my_num_threads); |
128 | //ASSERT( all_touches[n] == true, "value not seen by any thread\n" ); |
129 | } |
130 | delete [] all_touches; |
131 | return true; |
132 | } |
133 | |
134 | }; |
135 | |
136 | template< typename T > |
137 | struct parallel_gets : NoAssign { |
138 | |
139 | tbb::flow::queue_node<T> &my_q; |
140 | touches<T> &my_touches; |
141 | |
142 | parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {} |
143 | |
144 | void operator()(int tid) const { |
145 | for (int j = 0; j < N; ++j) { |
146 | T v; |
147 | spin_try_get( my_q, v ); |
148 | my_touches.check( tid, v ); |
149 | } |
150 | } |
151 | |
152 | }; |
153 | |
154 | template< typename T > |
155 | struct parallel_put_get : NoAssign { |
156 | |
157 | tbb::flow::queue_node<T> &my_q; |
158 | touches<T> &my_touches; |
159 | |
160 | parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {} |
161 | |
162 | void operator()(int tid) const { |
163 | |
164 | for ( int i = 0; i < N; i+=C ) { |
165 | int j_end = ( N < i + C ) ? N : i + C; |
166 | // dump about C values into the Q |
167 | for ( int j = i; j < j_end; ++j ) { |
168 | ASSERT( my_q.try_put( T (N*tid + j ) ) == true, NULL ); |
169 | } |
170 | // receiver about C values from the Q |
171 | for ( int j = i; j < j_end; ++j ) { |
172 | T v; |
173 | spin_try_get( my_q, v ); |
174 | my_touches.check( tid, v ); |
175 | } |
176 | } |
177 | } |
178 | |
179 | }; |
180 | |
181 | // |
182 | // Tests |
183 | // |
184 | // Item can be reserved, released, consumed ( single serial receiver ) |
185 | // |
186 | template< typename T > |
187 | int test_reservation() { |
188 | tbb::flow::graph g; |
189 | T bogus_value(-1); |
190 | |
191 | // Simple tests |
192 | tbb::flow::queue_node<T> q(g); |
193 | |
194 | q.try_put(T(1)); |
195 | q.try_put(T(2)); |
196 | q.try_put(T(3)); |
197 | |
198 | T v; |
199 | ASSERT( q.reserve_item(v) == true, NULL ); |
200 | ASSERT( v == T(1), NULL ); |
201 | ASSERT( q.release_reservation() == true, NULL ); |
202 | v = bogus_value; |
203 | g.wait_for_all(); |
204 | ASSERT( q.reserve_item(v) == true, NULL ); |
205 | ASSERT( v == T(1), NULL ); |
206 | ASSERT( q.consume_reservation() == true, NULL ); |
207 | v = bogus_value; |
208 | g.wait_for_all(); |
209 | |
210 | ASSERT( q.try_get(v) == true, NULL ); |
211 | ASSERT( v == T(2), NULL ); |
212 | v = bogus_value; |
213 | g.wait_for_all(); |
214 | |
215 | ASSERT( q.reserve_item(v) == true, NULL ); |
216 | ASSERT( v == T(3), NULL ); |
217 | ASSERT( q.release_reservation() == true, NULL ); |
218 | v = bogus_value; |
219 | g.wait_for_all(); |
220 | ASSERT( q.reserve_item(v) == true, NULL ); |
221 | ASSERT( v == T(3), NULL ); |
222 | ASSERT( q.consume_reservation() == true, NULL ); |
223 | v = bogus_value; |
224 | g.wait_for_all(); |
225 | |
226 | return 0; |
227 | } |
228 | |
229 | // |
230 | // Tests |
231 | // |
232 | // multiple parallel senders, items in FIFO (relatively to sender) order |
233 | // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received |
234 | // * overlapped puts / gets |
235 | // * all puts finished before any getS |
236 | // |
237 | template< typename T > |
238 | int test_parallel(int num_threads) { |
239 | tbb::flow::graph g; |
240 | tbb::flow::queue_node<T> q(g); |
241 | tbb::flow::queue_node<T> q2(g); |
242 | tbb::flow::queue_node<T> q3(g); |
243 | { |
244 | Check< T > my_check; |
245 | T bogus_value(-1); |
246 | T j = bogus_value; |
247 | NativeParallelFor( num_threads, parallel_puts<T>(q) ); |
248 | |
249 | T *next_value = new T[num_threads]; |
250 | for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); |
251 | |
252 | for (int i = 0; i < num_threads * N; ++i ) { |
253 | spin_try_get( q, j ); |
254 | check_item( next_value, j ); |
255 | j = bogus_value; |
256 | } |
257 | for (int tid = 0; tid < num_threads; ++tid) { |
258 | ASSERT( next_value[tid] == T(N), NULL ); |
259 | } |
260 | delete[] next_value; |
261 | |
262 | j = bogus_value; |
263 | g.wait_for_all(); |
264 | ASSERT( q.try_get( j ) == false, NULL ); |
265 | ASSERT( j == bogus_value, NULL ); |
266 | |
267 | NativeParallelFor( num_threads, parallel_puts<T>(q) ); |
268 | |
269 | { |
270 | touches< T > t( num_threads ); |
271 | NativeParallelFor( num_threads, parallel_gets<T>(q, t) ); |
272 | g.wait_for_all(); |
273 | ASSERT( t.validate_touches(), NULL ); |
274 | } |
275 | j = bogus_value; |
276 | ASSERT( q.try_get( j ) == false, NULL ); |
277 | ASSERT( j == bogus_value, NULL ); |
278 | |
279 | g.wait_for_all(); |
280 | { |
281 | touches< T > t2( num_threads ); |
282 | NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) ); |
283 | g.wait_for_all(); |
284 | ASSERT( t2.validate_touches(), NULL ); |
285 | } |
286 | j = bogus_value; |
287 | ASSERT( q.try_get( j ) == false, NULL ); |
288 | ASSERT( j == bogus_value, NULL ); |
289 | |
290 | tbb::flow::make_edge( q, q2 ); |
291 | tbb::flow::make_edge( q2, q3 ); |
292 | |
293 | NativeParallelFor( num_threads, parallel_puts<T>(q) ); |
294 | { |
295 | touches< T > t3( num_threads ); |
296 | NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) ); |
297 | g.wait_for_all(); |
298 | ASSERT( t3.validate_touches(), NULL ); |
299 | } |
300 | j = bogus_value; |
301 | g.wait_for_all(); |
302 | ASSERT( q.try_get( j ) == false, NULL ); |
303 | g.wait_for_all(); |
304 | ASSERT( q2.try_get( j ) == false, NULL ); |
305 | g.wait_for_all(); |
306 | ASSERT( q3.try_get( j ) == false, NULL ); |
307 | ASSERT( j == bogus_value, NULL ); |
308 | |
309 | // test copy constructor |
310 | ASSERT( q.remove_successor( q2 ), NULL ); |
311 | NativeParallelFor( num_threads, parallel_puts<T>(q) ); |
312 | tbb::flow::queue_node<T> q_copy(q); |
313 | j = bogus_value; |
314 | g.wait_for_all(); |
315 | ASSERT( q_copy.try_get( j ) == false, NULL ); |
316 | ASSERT( q.register_successor( q_copy ) == true, NULL ); |
317 | { |
318 | touches< T > t( num_threads ); |
319 | NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) ); |
320 | g.wait_for_all(); |
321 | ASSERT( t.validate_touches(), NULL ); |
322 | } |
323 | j = bogus_value; |
324 | ASSERT( q.try_get( j ) == false, NULL ); |
325 | ASSERT( j == bogus_value, NULL ); |
326 | ASSERT( q_copy.try_get( j ) == false, NULL ); |
327 | ASSERT( j == bogus_value, NULL ); |
328 | } |
329 | |
330 | return 0; |
331 | } |
332 | |
333 | // |
334 | // Tests |
335 | // |
336 | // Predecessors cannot be registered |
337 | // Empty Q rejects item requests |
338 | // Single serial sender, items in FIFO order |
339 | // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order |
340 | // |
341 | |
342 | template< typename T > |
343 | int test_serial() { |
344 | tbb::flow::graph g; |
345 | tbb::flow::queue_node<T> q(g); |
346 | tbb::flow::queue_node<T> q2(g); |
347 | { // destroy the graph after manipulating it, and see if all the items in the buffers |
348 | // have been destroyed before the graph |
349 | Check<T> my_check; // if check_type< U > count constructions and destructions |
350 | T bogus_value(-1); |
351 | T j = bogus_value; |
352 | |
353 | // |
354 | // Rejects attempts to add / remove predecessor |
355 | // Rejects request from empty Q |
356 | // |
357 | ASSERT( q.register_predecessor( q2 ) == false, NULL ); |
358 | ASSERT( q.remove_predecessor( q2 ) == false, NULL ); |
359 | ASSERT( q.try_get( j ) == false, NULL ); |
360 | ASSERT( j == bogus_value, NULL ); |
361 | |
362 | // |
363 | // Simple puts and gets |
364 | // |
365 | |
366 | for (int i = 0; i < N; ++i) { |
367 | bool msg = q.try_put( T(i) ); |
368 | ASSERT( msg == true, NULL ); |
369 | } |
370 | |
371 | |
372 | for (int i = 0; i < N; ++i) { |
373 | j = bogus_value; |
374 | spin_try_get( q, j ); |
375 | ASSERT( i == j, NULL ); |
376 | } |
377 | j = bogus_value; |
378 | g.wait_for_all(); |
379 | ASSERT( q.try_get( j ) == false, NULL ); |
380 | ASSERT( j == bogus_value, NULL ); |
381 | |
382 | tbb::flow::make_edge( q, q2 ); |
383 | |
384 | for (int i = 0; i < N; ++i) { |
385 | bool msg = q.try_put( T(i) ); |
386 | ASSERT( msg == true, NULL ); |
387 | } |
388 | |
389 | |
390 | for (int i = 0; i < N; ++i) { |
391 | j = bogus_value; |
392 | spin_try_get( q2, j ); |
393 | ASSERT( i == j, NULL ); |
394 | } |
395 | j = bogus_value; |
396 | g.wait_for_all(); |
397 | ASSERT( q.try_get( j ) == false, NULL ); |
398 | g.wait_for_all(); |
399 | ASSERT( q2.try_get( j ) == false, NULL ); |
400 | ASSERT( j == bogus_value, NULL ); |
401 | |
402 | tbb::flow::remove_edge( q, q2 ); |
403 | ASSERT( q.try_put( 1 ) == true, NULL ); |
404 | g.wait_for_all(); |
405 | ASSERT( q2.try_get( j ) == false, NULL ); |
406 | ASSERT( j == bogus_value, NULL ); |
407 | g.wait_for_all(); |
408 | ASSERT( q.try_get( j ) == true, NULL ); |
409 | ASSERT( j == 1, NULL ); |
410 | |
411 | tbb::flow::queue_node<T> q3(g); |
412 | tbb::flow::make_edge( q, q2 ); |
413 | tbb::flow::make_edge( q2, q3 ); |
414 | |
415 | for (int i = 0; i < N; ++i) { |
416 | bool msg = q.try_put( T(i) ); |
417 | ASSERT( msg == true, NULL ); |
418 | } |
419 | |
420 | for (int i = 0; i < N; ++i) { |
421 | j = bogus_value; |
422 | spin_try_get( q3, j ); |
423 | ASSERT( i == j, NULL ); |
424 | } |
425 | j = bogus_value; |
426 | g.wait_for_all(); |
427 | ASSERT( q.try_get( j ) == false, NULL ); |
428 | g.wait_for_all(); |
429 | ASSERT( q2.try_get( j ) == false, NULL ); |
430 | g.wait_for_all(); |
431 | ASSERT( q3.try_get( j ) == false, NULL ); |
432 | ASSERT( j == bogus_value, NULL ); |
433 | |
434 | tbb::flow::remove_edge( q, q2 ); |
435 | ASSERT( q.try_put( 1 ) == true, NULL ); |
436 | g.wait_for_all(); |
437 | ASSERT( q2.try_get( j ) == false, NULL ); |
438 | ASSERT( j == bogus_value, NULL ); |
439 | g.wait_for_all(); |
440 | ASSERT( q3.try_get( j ) == false, NULL ); |
441 | ASSERT( j == bogus_value, NULL ); |
442 | g.wait_for_all(); |
443 | ASSERT( q.try_get( j ) == true, NULL ); |
444 | ASSERT( j == 1, NULL ); |
445 | } |
446 | |
447 | return 0; |
448 | } |
449 | |
450 | int TestMain() { |
451 | tbb::tick_count start = tbb::tick_count::now(), stop; |
452 | for (int p = 2; p <= 4; ++p) { |
453 | tbb::task_scheduler_init init(p); |
454 | test_serial<int>(); |
455 | test_serial<check_type<int> >(); |
456 | test_parallel<int>(p); |
457 | test_parallel<check_type<int> >(p); |
458 | } |
459 | stop = tbb::tick_count::now(); |
460 | REMARK("Queue_Node Time=%6.6f\n" , (stop-start).seconds()); |
461 | REMARK("Testing resets\n" ); |
462 | test_resets<int, tbb::flow::queue_node<int> >(); |
463 | test_resets<float, tbb::flow::queue_node<float> >(); |
464 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
465 | test_buffer_extract<tbb::flow::queue_node<int> >().run_tests(); |
466 | #endif |
467 | return Harness::Done; |
468 | } |
469 | |