1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17// TO DO: Add overlapping put / receive tests
18
19#include "harness.h"
20
21#if __TBB_CPF_BUILD
22#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
23#endif
24
25#include "tbb/flow_graph.h"
26#include "tbb/task_scheduler_init.h"
27#include "tbb/tick_count.h"
28#include "harness_checktype.h"
29#include "harness_graph.h"
30
31#include <cstdio>
32
// Number of items each sending thread puts into a queue. A value v encodes its
// sender as v / N and its per-sender sequence number as v % N.
#define N 1000
// Batch size used by parallel_put_get: how many items a thread puts before it
// switches to receiving.
#define C 10
35
36template< typename T >
37void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
38 while ( q.try_get(value) != true ) ;
39}
40
41template< typename T >
42void check_item( T* next_value, T &value ) {
43 int tid = value / N;
44 int offset = value % N;
45 ASSERT( next_value[tid] == T(offset), NULL );
46 ++next_value[tid];
47}
48
49template< typename T >
50struct parallel_puts : NoAssign {
51
52 tbb::flow::queue_node<T> &my_q;
53
54 parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {}
55
56 void operator()(int i) const {
57 for (int j = 0; j < N; ++j) {
58 bool msg = my_q.try_put( T(N*i + j) );
59 ASSERT( msg == true, NULL );
60 }
61 }
62
63};
64
65
66
67template< typename T >
68struct touches {
69
70 bool **my_touches;
71 T **my_last_touch;
72 int my_num_threads;
73
74 touches( int num_threads ) : my_num_threads(num_threads) {
75 my_last_touch = new T* [my_num_threads];
76 my_touches = new bool* [my_num_threads];
77 for ( int p = 0; p < my_num_threads; ++p) {
78 my_last_touch[p] = new T[my_num_threads];
79 for ( int p2 = 0; p2 < my_num_threads; ++p2)
80 my_last_touch[p][p2] = -1;
81
82 my_touches[p] = new bool[N*my_num_threads];
83 for ( int n = 0; n < N*my_num_threads; ++n)
84 my_touches[p][n] = false;
85 }
86 }
87
88 ~touches() {
89 for ( int p = 0; p < my_num_threads; ++p) {
90 delete [] my_touches[p];
91 delete [] my_last_touch[p];
92 }
93 delete [] my_touches;
94 delete [] my_last_touch;
95 }
96
97 bool check( int tid, T v ) {
98 int v_tid = v / N;
99 if ( my_touches[tid][v] != false ) {
100 printf("Error: value seen twice by local thread\n");
101 return false;
102 }
103 if ( v <= my_last_touch[tid][v_tid] ) {
104 printf("Error: value seen in wrong order by local thread\n");
105 return false;
106 }
107 my_last_touch[tid][v_tid] = v;
108 my_touches[tid][v] = true;
109 return true;
110 }
111
112 bool validate_touches() {
113 bool *all_touches = new bool[N*my_num_threads];
114 for ( int n = 0; n < N*my_num_threads; ++n)
115 all_touches[n] = false;
116
117 for ( int p = 0; p < my_num_threads; ++p) {
118 for ( int n = 0; n < N*my_num_threads; ++n) {
119 if ( my_touches[p][n] == true ) {
120 ASSERT( all_touches[n] == false, "value see by more than one thread\n" );
121 all_touches[n] = true;
122 }
123 }
124 }
125 for ( int n = 0; n < N*my_num_threads; ++n) {
126 if ( !all_touches[n] )
127 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
128 //ASSERT( all_touches[n] == true, "value not seen by any thread\n" );
129 }
130 delete [] all_touches;
131 return true;
132 }
133
134};
135
136template< typename T >
137struct parallel_gets : NoAssign {
138
139 tbb::flow::queue_node<T> &my_q;
140 touches<T> &my_touches;
141
142 parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {}
143
144 void operator()(int tid) const {
145 for (int j = 0; j < N; ++j) {
146 T v;
147 spin_try_get( my_q, v );
148 my_touches.check( tid, v );
149 }
150 }
151
152};
153
154template< typename T >
155struct parallel_put_get : NoAssign {
156
157 tbb::flow::queue_node<T> &my_q;
158 touches<T> &my_touches;
159
160 parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {}
161
162 void operator()(int tid) const {
163
164 for ( int i = 0; i < N; i+=C ) {
165 int j_end = ( N < i + C ) ? N : i + C;
166 // dump about C values into the Q
167 for ( int j = i; j < j_end; ++j ) {
168 ASSERT( my_q.try_put( T (N*tid + j ) ) == true, NULL );
169 }
170 // receiver about C values from the Q
171 for ( int j = i; j < j_end; ++j ) {
172 T v;
173 spin_try_get( my_q, v );
174 my_touches.check( tid, v );
175 }
176 }
177 }
178
179};
180
181//
182// Tests
183//
184// Item can be reserved, released, consumed ( single serial receiver )
185//
186template< typename T >
187int test_reservation() {
188 tbb::flow::graph g;
189 T bogus_value(-1);
190
191 // Simple tests
192 tbb::flow::queue_node<T> q(g);
193
194 q.try_put(T(1));
195 q.try_put(T(2));
196 q.try_put(T(3));
197
198 T v;
199 ASSERT( q.reserve_item(v) == true, NULL );
200 ASSERT( v == T(1), NULL );
201 ASSERT( q.release_reservation() == true, NULL );
202 v = bogus_value;
203 g.wait_for_all();
204 ASSERT( q.reserve_item(v) == true, NULL );
205 ASSERT( v == T(1), NULL );
206 ASSERT( q.consume_reservation() == true, NULL );
207 v = bogus_value;
208 g.wait_for_all();
209
210 ASSERT( q.try_get(v) == true, NULL );
211 ASSERT( v == T(2), NULL );
212 v = bogus_value;
213 g.wait_for_all();
214
215 ASSERT( q.reserve_item(v) == true, NULL );
216 ASSERT( v == T(3), NULL );
217 ASSERT( q.release_reservation() == true, NULL );
218 v = bogus_value;
219 g.wait_for_all();
220 ASSERT( q.reserve_item(v) == true, NULL );
221 ASSERT( v == T(3), NULL );
222 ASSERT( q.consume_reservation() == true, NULL );
223 v = bogus_value;
224 g.wait_for_all();
225
226 return 0;
227}
228
229//
230// Tests
231//
232// multiple parallel senders, items in FIFO (relatively to sender) order
233// multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
234// * overlapped puts / gets
235// * all puts finished before any getS
236//
237template< typename T >
238int test_parallel(int num_threads) {
239 tbb::flow::graph g;
240 tbb::flow::queue_node<T> q(g);
241 tbb::flow::queue_node<T> q2(g);
242 tbb::flow::queue_node<T> q3(g);
243 {
244 Check< T > my_check;
245 T bogus_value(-1);
246 T j = bogus_value;
247 NativeParallelFor( num_threads, parallel_puts<T>(q) );
248
249 T *next_value = new T[num_threads];
250 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
251
252 for (int i = 0; i < num_threads * N; ++i ) {
253 spin_try_get( q, j );
254 check_item( next_value, j );
255 j = bogus_value;
256 }
257 for (int tid = 0; tid < num_threads; ++tid) {
258 ASSERT( next_value[tid] == T(N), NULL );
259 }
260 delete[] next_value;
261
262 j = bogus_value;
263 g.wait_for_all();
264 ASSERT( q.try_get( j ) == false, NULL );
265 ASSERT( j == bogus_value, NULL );
266
267 NativeParallelFor( num_threads, parallel_puts<T>(q) );
268
269 {
270 touches< T > t( num_threads );
271 NativeParallelFor( num_threads, parallel_gets<T>(q, t) );
272 g.wait_for_all();
273 ASSERT( t.validate_touches(), NULL );
274 }
275 j = bogus_value;
276 ASSERT( q.try_get( j ) == false, NULL );
277 ASSERT( j == bogus_value, NULL );
278
279 g.wait_for_all();
280 {
281 touches< T > t2( num_threads );
282 NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) );
283 g.wait_for_all();
284 ASSERT( t2.validate_touches(), NULL );
285 }
286 j = bogus_value;
287 ASSERT( q.try_get( j ) == false, NULL );
288 ASSERT( j == bogus_value, NULL );
289
290 tbb::flow::make_edge( q, q2 );
291 tbb::flow::make_edge( q2, q3 );
292
293 NativeParallelFor( num_threads, parallel_puts<T>(q) );
294 {
295 touches< T > t3( num_threads );
296 NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) );
297 g.wait_for_all();
298 ASSERT( t3.validate_touches(), NULL );
299 }
300 j = bogus_value;
301 g.wait_for_all();
302 ASSERT( q.try_get( j ) == false, NULL );
303 g.wait_for_all();
304 ASSERT( q2.try_get( j ) == false, NULL );
305 g.wait_for_all();
306 ASSERT( q3.try_get( j ) == false, NULL );
307 ASSERT( j == bogus_value, NULL );
308
309 // test copy constructor
310 ASSERT( q.remove_successor( q2 ), NULL );
311 NativeParallelFor( num_threads, parallel_puts<T>(q) );
312 tbb::flow::queue_node<T> q_copy(q);
313 j = bogus_value;
314 g.wait_for_all();
315 ASSERT( q_copy.try_get( j ) == false, NULL );
316 ASSERT( q.register_successor( q_copy ) == true, NULL );
317 {
318 touches< T > t( num_threads );
319 NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) );
320 g.wait_for_all();
321 ASSERT( t.validate_touches(), NULL );
322 }
323 j = bogus_value;
324 ASSERT( q.try_get( j ) == false, NULL );
325 ASSERT( j == bogus_value, NULL );
326 ASSERT( q_copy.try_get( j ) == false, NULL );
327 ASSERT( j == bogus_value, NULL );
328 }
329
330 return 0;
331}
332
333//
334// Tests
335//
336// Predecessors cannot be registered
337// Empty Q rejects item requests
338// Single serial sender, items in FIFO order
339// Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
340//
341
342template< typename T >
343int test_serial() {
344 tbb::flow::graph g;
345 tbb::flow::queue_node<T> q(g);
346 tbb::flow::queue_node<T> q2(g);
347 { // destroy the graph after manipulating it, and see if all the items in the buffers
348 // have been destroyed before the graph
349 Check<T> my_check; // if check_type< U > count constructions and destructions
350 T bogus_value(-1);
351 T j = bogus_value;
352
353 //
354 // Rejects attempts to add / remove predecessor
355 // Rejects request from empty Q
356 //
357 ASSERT( q.register_predecessor( q2 ) == false, NULL );
358 ASSERT( q.remove_predecessor( q2 ) == false, NULL );
359 ASSERT( q.try_get( j ) == false, NULL );
360 ASSERT( j == bogus_value, NULL );
361
362 //
363 // Simple puts and gets
364 //
365
366 for (int i = 0; i < N; ++i) {
367 bool msg = q.try_put( T(i) );
368 ASSERT( msg == true, NULL );
369 }
370
371
372 for (int i = 0; i < N; ++i) {
373 j = bogus_value;
374 spin_try_get( q, j );
375 ASSERT( i == j, NULL );
376 }
377 j = bogus_value;
378 g.wait_for_all();
379 ASSERT( q.try_get( j ) == false, NULL );
380 ASSERT( j == bogus_value, NULL );
381
382 tbb::flow::make_edge( q, q2 );
383
384 for (int i = 0; i < N; ++i) {
385 bool msg = q.try_put( T(i) );
386 ASSERT( msg == true, NULL );
387 }
388
389
390 for (int i = 0; i < N; ++i) {
391 j = bogus_value;
392 spin_try_get( q2, j );
393 ASSERT( i == j, NULL );
394 }
395 j = bogus_value;
396 g.wait_for_all();
397 ASSERT( q.try_get( j ) == false, NULL );
398 g.wait_for_all();
399 ASSERT( q2.try_get( j ) == false, NULL );
400 ASSERT( j == bogus_value, NULL );
401
402 tbb::flow::remove_edge( q, q2 );
403 ASSERT( q.try_put( 1 ) == true, NULL );
404 g.wait_for_all();
405 ASSERT( q2.try_get( j ) == false, NULL );
406 ASSERT( j == bogus_value, NULL );
407 g.wait_for_all();
408 ASSERT( q.try_get( j ) == true, NULL );
409 ASSERT( j == 1, NULL );
410
411 tbb::flow::queue_node<T> q3(g);
412 tbb::flow::make_edge( q, q2 );
413 tbb::flow::make_edge( q2, q3 );
414
415 for (int i = 0; i < N; ++i) {
416 bool msg = q.try_put( T(i) );
417 ASSERT( msg == true, NULL );
418 }
419
420 for (int i = 0; i < N; ++i) {
421 j = bogus_value;
422 spin_try_get( q3, j );
423 ASSERT( i == j, NULL );
424 }
425 j = bogus_value;
426 g.wait_for_all();
427 ASSERT( q.try_get( j ) == false, NULL );
428 g.wait_for_all();
429 ASSERT( q2.try_get( j ) == false, NULL );
430 g.wait_for_all();
431 ASSERT( q3.try_get( j ) == false, NULL );
432 ASSERT( j == bogus_value, NULL );
433
434 tbb::flow::remove_edge( q, q2 );
435 ASSERT( q.try_put( 1 ) == true, NULL );
436 g.wait_for_all();
437 ASSERT( q2.try_get( j ) == false, NULL );
438 ASSERT( j == bogus_value, NULL );
439 g.wait_for_all();
440 ASSERT( q3.try_get( j ) == false, NULL );
441 ASSERT( j == bogus_value, NULL );
442 g.wait_for_all();
443 ASSERT( q.try_get( j ) == true, NULL );
444 ASSERT( j == 1, NULL );
445 }
446
447 return 0;
448}
449
450int TestMain() {
451 tbb::tick_count start = tbb::tick_count::now(), stop;
452 for (int p = 2; p <= 4; ++p) {
453 tbb::task_scheduler_init init(p);
454 test_serial<int>();
455 test_serial<check_type<int> >();
456 test_parallel<int>(p);
457 test_parallel<check_type<int> >(p);
458 }
459 stop = tbb::tick_count::now();
460 REMARK("Queue_Node Time=%6.6f\n", (stop-start).seconds());
461 REMARK("Testing resets\n");
462 test_resets<int, tbb::flow::queue_node<int> >();
463 test_resets<float, tbb::flow::queue_node<float> >();
464#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
465 test_buffer_extract<tbb::flow::queue_node<int> >().run_tests();
466#endif
467 return Harness::Done;
468}
469