1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | // TO DO: Add overlapping put / receive tests |
18 | |
19 | #include "harness.h" |
20 | |
21 | #if __TBB_CPF_BUILD |
22 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
23 | #endif |
24 | |
25 | #include "tbb/flow_graph.h" |
26 | #include "harness_checktype.h" |
27 | #include "tbb/task_scheduler_init.h" |
28 | #include "tbb/tick_count.h" |
29 | #include "harness_graph.h" |
30 | |
31 | #include <cstdio> |
32 | |
33 | #define N 10 |
34 | #define C 10 |
35 | |
36 | template< typename T > |
37 | void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) { |
38 | while ( q.try_get(value) != true ) ; |
39 | } |
40 | |
41 | template< typename T > |
42 | void check_item( T* next_value, T &value ) { |
43 | int tid = value / N; |
44 | int offset = value % N; |
45 | ASSERT( next_value[tid] == T(offset), NULL ); |
46 | ++next_value[tid]; |
47 | } |
48 | |
49 | template< typename T > |
50 | struct parallel_puts : NoAssign { |
51 | tbb::flow::priority_queue_node<T> &my_q; |
52 | parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {} |
53 | void operator()(int i) const { |
54 | for (int j = 0; j < N; ++j) { |
55 | bool msg = my_q.try_put( T(N*i + j) ); |
56 | ASSERT( msg == true, NULL ); |
57 | } |
58 | } |
59 | }; |
60 | |
61 | template< typename T > |
62 | struct parallel_gets : NoAssign { |
63 | tbb::flow::priority_queue_node<T> &my_q; |
64 | parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {} |
65 | void operator()(int) const { |
66 | T prev; |
67 | spin_try_get( my_q, prev ); |
68 | for (int j = 0; j < N-1; ++j) { |
69 | T v; |
70 | spin_try_get( my_q, v ); |
71 | ASSERT(v < prev, NULL); |
72 | } |
73 | } |
74 | }; |
75 | |
76 | template< typename T > |
77 | struct parallel_put_get : NoAssign { |
78 | tbb::flow::priority_queue_node<T> &my_q; |
79 | parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {} |
80 | void operator()(int tid) const { |
81 | for ( int i = 0; i < N; i+=C ) { |
82 | int j_end = ( N < i + C ) ? N : i + C; |
83 | // dump about C values into the Q |
84 | for ( int j = i; j < j_end; ++j ) { |
85 | ASSERT( my_q.try_put( T (N*tid + j ) ) == true, NULL ); |
86 | } |
87 | // receive about C values from the Q |
88 | for ( int j = i; j < j_end; ++j ) { |
89 | T v; |
90 | spin_try_get( my_q, v ); |
91 | } |
92 | } |
93 | } |
94 | }; |
95 | |
96 | // |
97 | // Tests |
98 | // |
99 | // Item can be reserved, released, consumed ( single serial receiver ) |
100 | // |
101 | template< typename T > |
102 | int test_reservation(int) { |
103 | tbb::flow::graph g; |
104 | |
105 | // Simple tests |
106 | tbb::flow::priority_queue_node<T> q(g); |
107 | |
108 | { |
109 | |
110 | T bogus_value(-1); |
111 | |
112 | q.try_put(T(1)); |
113 | q.try_put(T(2)); |
114 | q.try_put(T(3)); |
115 | g.wait_for_all(); |
116 | |
117 | T v=bogus_value, w=bogus_value; |
118 | ASSERT( q.try_reserve(v) == true, NULL ); |
119 | ASSERT( v == T(3), NULL ); |
120 | ASSERT( q.try_release() == true, NULL ); |
121 | v = bogus_value; |
122 | g.wait_for_all(); |
123 | ASSERT( q.try_reserve(v) == true, NULL ); |
124 | ASSERT( v == T(3), NULL ); |
125 | ASSERT( q.try_consume() == true, NULL ); |
126 | v = bogus_value; |
127 | g.wait_for_all(); |
128 | |
129 | ASSERT( q.try_get(v) == true, NULL ); |
130 | ASSERT( v == T(2), NULL ); |
131 | v = bogus_value; |
132 | g.wait_for_all(); |
133 | |
134 | ASSERT( q.try_reserve(v) == true, NULL ); |
135 | ASSERT( v == T(1), NULL ); |
136 | ASSERT( q.try_reserve(w) == false, NULL ); |
137 | ASSERT( w == bogus_value, NULL ); |
138 | ASSERT( q.try_get(w) == false, NULL ); |
139 | ASSERT( w == bogus_value, NULL ); |
140 | ASSERT( q.try_release() == true, NULL ); |
141 | v = bogus_value; |
142 | g.wait_for_all(); |
143 | ASSERT( q.try_reserve(v) == true, NULL ); |
144 | ASSERT( v == T(1), NULL ); |
145 | ASSERT( q.try_consume() == true, NULL ); |
146 | v = bogus_value; |
147 | g.wait_for_all(); |
148 | ASSERT( q.try_get(v) == false, NULL ); |
149 | } |
150 | return 0; |
151 | } |
152 | |
153 | // |
154 | // Tests |
155 | // |
// multiple parallel senders, single receiver: items are received in priority (descending value) order
// multiple parallel senders, multiple parallel receivers: each receiver sees items in descending order and all items are received
// * overlapped puts / gets
// * all puts finished before any gets
160 | // |
161 | template< typename T > |
162 | int test_parallel(int num_threads) { |
163 | tbb::flow::graph g; |
164 | tbb::flow::priority_queue_node<T> q(g); |
165 | tbb::flow::priority_queue_node<T> q2(g); |
166 | tbb::flow::priority_queue_node<T> q3(g); |
167 | T bogus_value(-1); |
168 | T j = bogus_value; |
169 | |
170 | NativeParallelFor( num_threads, parallel_puts<T>(q) ); |
171 | for (int i = num_threads*N -1; i>=0; --i) { |
172 | spin_try_get( q, j ); |
173 | ASSERT(j == i, NULL); |
174 | j = bogus_value; |
175 | } |
176 | g.wait_for_all(); |
177 | ASSERT( q.try_get( j ) == false, NULL ); |
178 | ASSERT( j == bogus_value, NULL ); |
179 | |
180 | NativeParallelFor( num_threads, parallel_puts<T>(q) ); |
181 | g.wait_for_all(); |
182 | NativeParallelFor( num_threads, parallel_gets<T>(q) ); |
183 | g.wait_for_all(); |
184 | j = bogus_value; |
185 | ASSERT( q.try_get( j ) == false, NULL ); |
186 | ASSERT( j == bogus_value, NULL ); |
187 | |
188 | NativeParallelFor( num_threads, parallel_put_get<T>(q) ); |
189 | g.wait_for_all(); |
190 | j = bogus_value; |
191 | ASSERT( q.try_get( j ) == false, NULL ); |
192 | ASSERT( j == bogus_value, NULL ); |
193 | |
194 | tbb::flow::make_edge( q, q2 ); |
195 | tbb::flow::make_edge( q2, q3 ); |
196 | NativeParallelFor( num_threads, parallel_puts<T>(q) ); |
197 | g.wait_for_all(); |
198 | NativeParallelFor( num_threads, parallel_gets<T>(q3) ); |
199 | g.wait_for_all(); |
200 | j = bogus_value; |
201 | ASSERT( q.try_get( j ) == false, NULL ); |
202 | ASSERT( j == bogus_value, NULL ); |
203 | ASSERT( q2.try_get( j ) == false, NULL ); |
204 | ASSERT( j == bogus_value, NULL ); |
205 | ASSERT( q3.try_get( j ) == false, NULL ); |
206 | ASSERT( j == bogus_value, NULL ); |
207 | |
208 | // test copy constructor |
209 | ASSERT( q.remove_successor( q2 ) == true, NULL ); |
210 | NativeParallelFor( num_threads, parallel_puts<T>(q) ); |
211 | tbb::flow::priority_queue_node<T> q_copy(q); |
212 | g.wait_for_all(); |
213 | j = bogus_value; |
214 | ASSERT( q_copy.try_get( j ) == false, NULL ); |
215 | ASSERT( q.register_successor( q_copy ) == true, NULL ); |
216 | for (int i = num_threads*N -1; i>=0; --i) { |
217 | spin_try_get( q_copy, j ); |
218 | ASSERT(j == i, NULL); |
219 | j = bogus_value; |
220 | } |
221 | g.wait_for_all(); |
222 | ASSERT( q.try_get( j ) == false, NULL ); |
223 | ASSERT( j == bogus_value, NULL ); |
224 | ASSERT( q_copy.try_get( j ) == false, NULL ); |
225 | ASSERT( j == bogus_value, NULL ); |
226 | |
227 | return 0; |
228 | } |
229 | |
230 | // |
231 | // Tests |
232 | // |
233 | // Predecessors cannot be registered |
234 | // Empty Q rejects item requests |
// Single serial sender, items received in priority order (highest value first)
// Chained Qs ( 2 & 3 ), single sender, items at last Q in priority order (highest value first)
237 | // |
238 | |
239 | template< typename T > |
240 | int test_serial() { |
241 | tbb::flow::graph g; |
242 | T bogus_value(-1); |
243 | |
244 | tbb::flow::priority_queue_node<T> q(g); |
245 | tbb::flow::priority_queue_node<T> q2(g); |
246 | T j = bogus_value; |
247 | |
248 | // |
249 | // Rejects attempts to add / remove predecessor |
250 | // Rejects request from empty Q |
251 | // |
252 | ASSERT( q.register_predecessor( q2 ) == false, NULL ); |
253 | ASSERT( q.remove_predecessor( q2 ) == false, NULL ); |
254 | ASSERT( q.try_get( j ) == false, NULL ); |
255 | ASSERT( j == bogus_value, NULL ); |
256 | |
257 | // |
258 | // Simple puts and gets |
259 | // |
260 | |
261 | for (int i = 0; i < N; ++i) |
262 | ASSERT( q.try_put( T(i) ), NULL ); |
263 | for (int i = N-1; i >=0; --i) { |
264 | j = bogus_value; |
265 | spin_try_get( q, j ); |
266 | ASSERT( i == j, NULL ); |
267 | } |
268 | j = bogus_value; |
269 | g.wait_for_all(); |
270 | ASSERT( q.try_get( j ) == false, NULL ); |
271 | ASSERT( j == bogus_value, NULL ); |
272 | |
273 | tbb::flow::make_edge( q, q2 ); |
274 | |
275 | for (int i = 0; i < N; ++i) |
276 | ASSERT( q.try_put( T(i) ), NULL ); |
277 | g.wait_for_all(); |
278 | for (int i = N-1; i >= 0; --i) { |
279 | j = bogus_value; |
280 | spin_try_get( q2, j ); |
281 | ASSERT( i == j, NULL ); |
282 | } |
283 | j = bogus_value; |
284 | g.wait_for_all(); |
285 | ASSERT( q.try_get( j ) == false, NULL ); |
286 | g.wait_for_all(); |
287 | ASSERT( q2.try_get( j ) == false, NULL ); |
288 | ASSERT( j == bogus_value, NULL ); |
289 | |
290 | tbb::flow::remove_edge( q, q2 ); |
291 | ASSERT( q.try_put( 1 ) == true, NULL ); |
292 | g.wait_for_all(); |
293 | ASSERT( q2.try_get( j ) == false, NULL ); |
294 | ASSERT( j == bogus_value, NULL ); |
295 | g.wait_for_all(); |
296 | ASSERT( q.try_get( j ) == true, NULL ); |
297 | ASSERT( j == 1, NULL ); |
298 | |
299 | tbb::flow::priority_queue_node<T> q3(g); |
300 | tbb::flow::make_edge( q, q2 ); |
301 | tbb::flow::make_edge( q2, q3 ); |
302 | |
303 | for (int i = 0; i < N; ++i) |
304 | ASSERT( q.try_put( T(i) ), NULL ); |
305 | g.wait_for_all(); |
306 | for (int i = N-1; i >= 0; --i) { |
307 | j = bogus_value; |
308 | spin_try_get( q3, j ); |
309 | ASSERT( i == j, NULL ); |
310 | } |
311 | j = bogus_value; |
312 | g.wait_for_all(); |
313 | ASSERT( q.try_get( j ) == false, NULL ); |
314 | g.wait_for_all(); |
315 | ASSERT( q2.try_get( j ) == false, NULL ); |
316 | g.wait_for_all(); |
317 | ASSERT( q3.try_get( j ) == false, NULL ); |
318 | ASSERT( j == bogus_value, NULL ); |
319 | |
320 | tbb::flow::remove_edge( q, q2 ); |
321 | ASSERT( q.try_put( 1 ) == true, NULL ); |
322 | g.wait_for_all(); |
323 | ASSERT( q2.try_get( j ) == false, NULL ); |
324 | ASSERT( j == bogus_value, NULL ); |
325 | g.wait_for_all(); |
326 | ASSERT( q3.try_get( j ) == false, NULL ); |
327 | ASSERT( j == bogus_value, NULL ); |
328 | g.wait_for_all(); |
329 | ASSERT( q.try_get( j ) == true, NULL ); |
330 | ASSERT( j == 1, NULL ); |
331 | |
332 | return 0; |
333 | } |
334 | |
335 | int TestMain() { |
336 | tbb::tick_count start = tbb::tick_count::now(), stop; |
337 | for (int p = 2; p <= 4; ++p) { |
338 | tbb::task_scheduler_init init(p); |
339 | test_serial<int>(); |
340 | test_reservation<int>(p); |
341 | test_reservation<check_type<int> >(p); |
342 | test_parallel<int>(p); |
343 | } |
344 | stop = tbb::tick_count::now(); |
345 | REMARK("Priority_Queue_Node Time=%6.6f\n" , (stop-start).seconds()); |
346 | REMARK("Testing resets\n" ); |
347 | test_resets<int,tbb::flow::priority_queue_node<int> >(); |
348 | test_resets<float,tbb::flow::priority_queue_node<float> >(); |
349 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
350 | test_buffer_extract<tbb::flow::priority_queue_node<int> >().run_tests(); |
351 | #endif |
352 | return Harness::Done; |
353 | } |
354 | |