1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17// TO DO: Add overlapping put / receive tests
18
19#include "harness.h"
20
21#if __TBB_CPF_BUILD
22#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
23#endif
24
25#include "tbb/flow_graph.h"
26#include "harness_checktype.h"
27#include "tbb/task_scheduler_init.h"
28#include "tbb/tick_count.h"
29#include "harness_graph.h"
30
31#include <cstdio>
32
33#define N 10
34#define C 10
35
36template< typename T >
37void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) {
38 while ( q.try_get(value) != true ) ;
39}
40
41template< typename T >
42void check_item( T* next_value, T &value ) {
43 int tid = value / N;
44 int offset = value % N;
45 ASSERT( next_value[tid] == T(offset), NULL );
46 ++next_value[tid];
47}
48
49template< typename T >
50struct parallel_puts : NoAssign {
51 tbb::flow::priority_queue_node<T> &my_q;
52 parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
53 void operator()(int i) const {
54 for (int j = 0; j < N; ++j) {
55 bool msg = my_q.try_put( T(N*i + j) );
56 ASSERT( msg == true, NULL );
57 }
58 }
59};
60
61template< typename T >
62struct parallel_gets : NoAssign {
63 tbb::flow::priority_queue_node<T> &my_q;
64 parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {}
65 void operator()(int) const {
66 T prev;
67 spin_try_get( my_q, prev );
68 for (int j = 0; j < N-1; ++j) {
69 T v;
70 spin_try_get( my_q, v );
71 ASSERT(v < prev, NULL);
72 }
73 }
74};
75
76template< typename T >
77struct parallel_put_get : NoAssign {
78 tbb::flow::priority_queue_node<T> &my_q;
79 parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
80 void operator()(int tid) const {
81 for ( int i = 0; i < N; i+=C ) {
82 int j_end = ( N < i + C ) ? N : i + C;
83 // dump about C values into the Q
84 for ( int j = i; j < j_end; ++j ) {
85 ASSERT( my_q.try_put( T (N*tid + j ) ) == true, NULL );
86 }
87 // receive about C values from the Q
88 for ( int j = i; j < j_end; ++j ) {
89 T v;
90 spin_try_get( my_q, v );
91 }
92 }
93 }
94};
95
96//
97// Tests
98//
99// Item can be reserved, released, consumed ( single serial receiver )
100//
101template< typename T >
102int test_reservation(int) {
103 tbb::flow::graph g;
104
105 // Simple tests
106 tbb::flow::priority_queue_node<T> q(g);
107
108 {
109
110 T bogus_value(-1);
111
112 q.try_put(T(1));
113 q.try_put(T(2));
114 q.try_put(T(3));
115 g.wait_for_all();
116
117 T v=bogus_value, w=bogus_value;
118 ASSERT( q.try_reserve(v) == true, NULL );
119 ASSERT( v == T(3), NULL );
120 ASSERT( q.try_release() == true, NULL );
121 v = bogus_value;
122 g.wait_for_all();
123 ASSERT( q.try_reserve(v) == true, NULL );
124 ASSERT( v == T(3), NULL );
125 ASSERT( q.try_consume() == true, NULL );
126 v = bogus_value;
127 g.wait_for_all();
128
129 ASSERT( q.try_get(v) == true, NULL );
130 ASSERT( v == T(2), NULL );
131 v = bogus_value;
132 g.wait_for_all();
133
134 ASSERT( q.try_reserve(v) == true, NULL );
135 ASSERT( v == T(1), NULL );
136 ASSERT( q.try_reserve(w) == false, NULL );
137 ASSERT( w == bogus_value, NULL );
138 ASSERT( q.try_get(w) == false, NULL );
139 ASSERT( w == bogus_value, NULL );
140 ASSERT( q.try_release() == true, NULL );
141 v = bogus_value;
142 g.wait_for_all();
143 ASSERT( q.try_reserve(v) == true, NULL );
144 ASSERT( v == T(1), NULL );
145 ASSERT( q.try_consume() == true, NULL );
146 v = bogus_value;
147 g.wait_for_all();
148 ASSERT( q.try_get(v) == false, NULL );
149 }
150 return 0;
151}
152
153//
154// Tests
155//
156// multiple parallel senders, items in FIFO (relatively to sender) order
157// multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
158// * overlapped puts / gets
159// * all puts finished before any getS
160//
161template< typename T >
162int test_parallel(int num_threads) {
163 tbb::flow::graph g;
164 tbb::flow::priority_queue_node<T> q(g);
165 tbb::flow::priority_queue_node<T> q2(g);
166 tbb::flow::priority_queue_node<T> q3(g);
167 T bogus_value(-1);
168 T j = bogus_value;
169
170 NativeParallelFor( num_threads, parallel_puts<T>(q) );
171 for (int i = num_threads*N -1; i>=0; --i) {
172 spin_try_get( q, j );
173 ASSERT(j == i, NULL);
174 j = bogus_value;
175 }
176 g.wait_for_all();
177 ASSERT( q.try_get( j ) == false, NULL );
178 ASSERT( j == bogus_value, NULL );
179
180 NativeParallelFor( num_threads, parallel_puts<T>(q) );
181 g.wait_for_all();
182 NativeParallelFor( num_threads, parallel_gets<T>(q) );
183 g.wait_for_all();
184 j = bogus_value;
185 ASSERT( q.try_get( j ) == false, NULL );
186 ASSERT( j == bogus_value, NULL );
187
188 NativeParallelFor( num_threads, parallel_put_get<T>(q) );
189 g.wait_for_all();
190 j = bogus_value;
191 ASSERT( q.try_get( j ) == false, NULL );
192 ASSERT( j == bogus_value, NULL );
193
194 tbb::flow::make_edge( q, q2 );
195 tbb::flow::make_edge( q2, q3 );
196 NativeParallelFor( num_threads, parallel_puts<T>(q) );
197 g.wait_for_all();
198 NativeParallelFor( num_threads, parallel_gets<T>(q3) );
199 g.wait_for_all();
200 j = bogus_value;
201 ASSERT( q.try_get( j ) == false, NULL );
202 ASSERT( j == bogus_value, NULL );
203 ASSERT( q2.try_get( j ) == false, NULL );
204 ASSERT( j == bogus_value, NULL );
205 ASSERT( q3.try_get( j ) == false, NULL );
206 ASSERT( j == bogus_value, NULL );
207
208 // test copy constructor
209 ASSERT( q.remove_successor( q2 ) == true, NULL );
210 NativeParallelFor( num_threads, parallel_puts<T>(q) );
211 tbb::flow::priority_queue_node<T> q_copy(q);
212 g.wait_for_all();
213 j = bogus_value;
214 ASSERT( q_copy.try_get( j ) == false, NULL );
215 ASSERT( q.register_successor( q_copy ) == true, NULL );
216 for (int i = num_threads*N -1; i>=0; --i) {
217 spin_try_get( q_copy, j );
218 ASSERT(j == i, NULL);
219 j = bogus_value;
220 }
221 g.wait_for_all();
222 ASSERT( q.try_get( j ) == false, NULL );
223 ASSERT( j == bogus_value, NULL );
224 ASSERT( q_copy.try_get( j ) == false, NULL );
225 ASSERT( j == bogus_value, NULL );
226
227 return 0;
228}
229
230//
231// Tests
232//
233// Predecessors cannot be registered
234// Empty Q rejects item requests
235// Single serial sender, items in FIFO order
236// Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
237//
238
239template< typename T >
240int test_serial() {
241 tbb::flow::graph g;
242 T bogus_value(-1);
243
244 tbb::flow::priority_queue_node<T> q(g);
245 tbb::flow::priority_queue_node<T> q2(g);
246 T j = bogus_value;
247
248 //
249 // Rejects attempts to add / remove predecessor
250 // Rejects request from empty Q
251 //
252 ASSERT( q.register_predecessor( q2 ) == false, NULL );
253 ASSERT( q.remove_predecessor( q2 ) == false, NULL );
254 ASSERT( q.try_get( j ) == false, NULL );
255 ASSERT( j == bogus_value, NULL );
256
257 //
258 // Simple puts and gets
259 //
260
261 for (int i = 0; i < N; ++i)
262 ASSERT( q.try_put( T(i) ), NULL );
263 for (int i = N-1; i >=0; --i) {
264 j = bogus_value;
265 spin_try_get( q, j );
266 ASSERT( i == j, NULL );
267 }
268 j = bogus_value;
269 g.wait_for_all();
270 ASSERT( q.try_get( j ) == false, NULL );
271 ASSERT( j == bogus_value, NULL );
272
273 tbb::flow::make_edge( q, q2 );
274
275 for (int i = 0; i < N; ++i)
276 ASSERT( q.try_put( T(i) ), NULL );
277 g.wait_for_all();
278 for (int i = N-1; i >= 0; --i) {
279 j = bogus_value;
280 spin_try_get( q2, j );
281 ASSERT( i == j, NULL );
282 }
283 j = bogus_value;
284 g.wait_for_all();
285 ASSERT( q.try_get( j ) == false, NULL );
286 g.wait_for_all();
287 ASSERT( q2.try_get( j ) == false, NULL );
288 ASSERT( j == bogus_value, NULL );
289
290 tbb::flow::remove_edge( q, q2 );
291 ASSERT( q.try_put( 1 ) == true, NULL );
292 g.wait_for_all();
293 ASSERT( q2.try_get( j ) == false, NULL );
294 ASSERT( j == bogus_value, NULL );
295 g.wait_for_all();
296 ASSERT( q.try_get( j ) == true, NULL );
297 ASSERT( j == 1, NULL );
298
299 tbb::flow::priority_queue_node<T> q3(g);
300 tbb::flow::make_edge( q, q2 );
301 tbb::flow::make_edge( q2, q3 );
302
303 for (int i = 0; i < N; ++i)
304 ASSERT( q.try_put( T(i) ), NULL );
305 g.wait_for_all();
306 for (int i = N-1; i >= 0; --i) {
307 j = bogus_value;
308 spin_try_get( q3, j );
309 ASSERT( i == j, NULL );
310 }
311 j = bogus_value;
312 g.wait_for_all();
313 ASSERT( q.try_get( j ) == false, NULL );
314 g.wait_for_all();
315 ASSERT( q2.try_get( j ) == false, NULL );
316 g.wait_for_all();
317 ASSERT( q3.try_get( j ) == false, NULL );
318 ASSERT( j == bogus_value, NULL );
319
320 tbb::flow::remove_edge( q, q2 );
321 ASSERT( q.try_put( 1 ) == true, NULL );
322 g.wait_for_all();
323 ASSERT( q2.try_get( j ) == false, NULL );
324 ASSERT( j == bogus_value, NULL );
325 g.wait_for_all();
326 ASSERT( q3.try_get( j ) == false, NULL );
327 ASSERT( j == bogus_value, NULL );
328 g.wait_for_all();
329 ASSERT( q.try_get( j ) == true, NULL );
330 ASSERT( j == 1, NULL );
331
332 return 0;
333}
334
335int TestMain() {
336 tbb::tick_count start = tbb::tick_count::now(), stop;
337 for (int p = 2; p <= 4; ++p) {
338 tbb::task_scheduler_init init(p);
339 test_serial<int>();
340 test_reservation<int>(p);
341 test_reservation<check_type<int> >(p);
342 test_parallel<int>(p);
343 }
344 stop = tbb::tick_count::now();
345 REMARK("Priority_Queue_Node Time=%6.6f\n", (stop-start).seconds());
346 REMARK("Testing resets\n");
347 test_resets<int,tbb::flow::priority_queue_node<int> >();
348 test_resets<float,tbb::flow::priority_queue_node<float> >();
349#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
350 test_buffer_extract<tbb::flow::priority_queue_node<int> >().run_tests();
351#endif
352 return Harness::Done;
353}
354