1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #include "harness.h" |
18 | |
19 | #if __TBB_CPF_BUILD |
20 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
21 | #endif |
22 | |
23 | #include "tbb/flow_graph.h" |
24 | #include "tbb/task_scheduler_init.h" |
25 | #include "tbb/tick_count.h" |
26 | #include "tbb/atomic.h" |
27 | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES |
28 | #include "harness_graph.h" |
29 | #endif |
30 | |
31 | #include <cstdio> |
32 | |
33 | #define N 1000 |
34 | #define C 10 |
35 | |
// Sequencer function object for sequencer_node<T>: the sequence number of a
// message is the message value itself, converted to size_t.
template< typename T >
struct seq_inspector {
    size_t operator()(const T &v) const {
        const size_t sequence_number = static_cast<size_t>(v);
        return sequence_number;
    }
};
40 | |
41 | template< typename T > |
42 | bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) { |
43 | g.wait_for_all(); |
44 | return q.try_get(value); |
45 | } |
46 | |
47 | template< typename T > |
48 | void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { |
49 | while ( q.try_get(value) != true ) ; |
50 | } |
51 | |
52 | template< typename T > |
53 | struct parallel_puts : NoAssign { |
54 | |
55 | tbb::flow::sequencer_node<T> &my_q; |
56 | int my_num_threads; |
57 | |
58 | parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {} |
59 | |
60 | void operator()(int tid) const { |
61 | for (int j = tid; j < N; j+=my_num_threads) { |
62 | bool msg = my_q.try_put( T(j) ); |
63 | ASSERT( msg == true, NULL ); |
64 | } |
65 | } |
66 | |
67 | }; |
68 | |
69 | template< typename T > |
70 | struct touches { |
71 | |
72 | bool **my_touches; |
73 | T *my_last_touch; |
74 | int my_num_threads; |
75 | |
76 | touches( int num_threads ) : my_num_threads(num_threads) { |
77 | my_last_touch = new T[my_num_threads]; |
78 | my_touches = new bool* [my_num_threads]; |
79 | for ( int p = 0; p < my_num_threads; ++p) { |
80 | my_last_touch[p] = T(-1); |
81 | my_touches[p] = new bool[N]; |
82 | for ( int n = 0; n < N; ++n) |
83 | my_touches[p][n] = false; |
84 | } |
85 | } |
86 | |
87 | ~touches() { |
88 | for ( int p = 0; p < my_num_threads; ++p) { |
89 | delete [] my_touches[p]; |
90 | } |
91 | delete [] my_touches; |
92 | delete [] my_last_touch; |
93 | } |
94 | |
95 | bool check( int tid, T v ) { |
96 | if ( my_touches[tid][v] != false ) { |
97 | printf("Error: value seen twice by local thread\n" ); |
98 | return false; |
99 | } |
100 | if ( v <= my_last_touch[tid] ) { |
101 | printf("Error: value seen in wrong order by local thread\n" ); |
102 | return false; |
103 | } |
104 | my_last_touch[tid] = v; |
105 | my_touches[tid][v] = true; |
106 | return true; |
107 | } |
108 | |
109 | bool validate_touches() { |
110 | bool *all_touches = new bool[N]; |
111 | for ( int n = 0; n < N; ++n) |
112 | all_touches[n] = false; |
113 | |
114 | for ( int p = 0; p < my_num_threads; ++p) { |
115 | for ( int n = 0; n < N; ++n) { |
116 | if ( my_touches[p][n] == true ) { |
117 | ASSERT( all_touches[n] == false, "value see by more than one thread\n" ); |
118 | all_touches[n] = true; |
119 | } |
120 | } |
121 | } |
122 | for ( int n = 0; n < N; ++n) { |
123 | if ( !all_touches[n] ) |
124 | printf("No touch at %d, my_num_threads = %d\n" , n, my_num_threads); |
125 | //ASSERT( all_touches[n] == true, "value not seen by any thread\n" ); |
126 | } |
127 | delete [] all_touches; |
128 | return true; |
129 | } |
130 | |
131 | }; |
132 | |
133 | template< typename T > |
134 | struct parallel_gets : NoAssign { |
135 | |
136 | tbb::flow::sequencer_node<T> &my_q; |
137 | int my_num_threads; |
138 | touches<T> &my_touches; |
139 | |
140 | parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {} |
141 | |
142 | void operator()(int tid) const { |
143 | for (int j = tid; j < N; j+=my_num_threads) { |
144 | T v; |
145 | spin_try_get( my_q, v ); |
146 | my_touches.check( tid, v ); |
147 | } |
148 | } |
149 | |
150 | }; |
151 | |
152 | template< typename T > |
153 | struct parallel_put_get : NoAssign { |
154 | |
155 | tbb::flow::sequencer_node<T> &my_s1; |
156 | tbb::flow::sequencer_node<T> &my_s2; |
157 | int my_num_threads; |
158 | tbb::atomic< int > &my_counter; |
159 | touches<T> &my_touches; |
160 | |
161 | parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads, |
162 | tbb::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {} |
163 | |
164 | void operator()(int tid) const { |
165 | int i_start = 0; |
166 | |
167 | while ( (i_start = my_counter.fetch_and_add(C)) < N ) { |
168 | int i_end = ( N < i_start + C ) ? N : i_start + C; |
169 | for (int i = i_start; i < i_end; ++i) { |
170 | bool msg = my_s1.try_put( T(i) ); |
171 | ASSERT( msg == true, NULL ); |
172 | } |
173 | |
174 | for (int i = i_start; i < i_end; ++i) { |
175 | T v; |
176 | spin_try_get( my_s2, v ); |
177 | my_touches.check( tid, v ); |
178 | } |
179 | } |
180 | } |
181 | |
182 | }; |
183 | |
184 | // |
185 | // Tests |
186 | // |
187 | // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output |
188 | // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output |
189 | // |
190 | |
191 | template< typename T > |
192 | int test_parallel(int num_threads) { |
193 | tbb::flow::graph g; |
194 | |
195 | tbb::flow::sequencer_node<T> s(g, seq_inspector<T>()); |
196 | NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) ); |
197 | { |
198 | touches<T> t( num_threads ); |
199 | NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) ); |
200 | g.wait_for_all(); |
201 | ASSERT( t.validate_touches(), NULL ); |
202 | } |
203 | T bogus_value(-1); |
204 | T j = bogus_value; |
205 | ASSERT( s.try_get( j ) == false, NULL ); |
206 | ASSERT( j == bogus_value, NULL ); |
207 | g.wait_for_all(); |
208 | |
209 | tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>()); |
210 | tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>()); |
211 | tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>()); |
212 | tbb::flow::make_edge( s1, s2 ); |
213 | tbb::flow::make_edge( s2, s3 ); |
214 | |
215 | { |
216 | touches<T> t( num_threads ); |
217 | tbb::atomic<int> counter; |
218 | counter = 0; |
219 | NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) ); |
220 | g.wait_for_all(); |
221 | t.validate_touches(); |
222 | } |
223 | g.wait_for_all(); |
224 | ASSERT( s1.try_get( j ) == false, NULL ); |
225 | g.wait_for_all(); |
226 | ASSERT( s2.try_get( j ) == false, NULL ); |
227 | g.wait_for_all(); |
228 | ASSERT( s3.try_get( j ) == false, NULL ); |
229 | ASSERT( j == bogus_value, NULL ); |
230 | |
231 | // test copy constructor |
232 | tbb::flow::sequencer_node<T> s_copy(s); |
233 | NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) ); |
234 | for (int i = 0; i < N; ++i) { |
235 | j = bogus_value; |
236 | spin_try_get( s_copy, j ); |
237 | ASSERT( i == j, NULL ); |
238 | } |
239 | j = bogus_value; |
240 | g.wait_for_all(); |
241 | ASSERT( s_copy.try_get( j ) == false, NULL ); |
242 | ASSERT( j == bogus_value, NULL ); |
243 | |
244 | return 0; |
245 | } |
246 | |
247 | |
248 | // |
249 | // Tests |
250 | // |
251 | // No predecessors can be registered |
252 | // Request from empty buffer fails |
253 | // In-order puts, single sender, single receiver, properly sequenced at output |
254 | // Reverse-order puts, single sender, single receiver, properly sequenced at output |
255 | // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output |
256 | // |
257 | |
258 | template< typename T > |
259 | int test_serial() { |
260 | tbb::flow::graph g; |
261 | T bogus_value(-1); |
262 | |
263 | tbb::flow::sequencer_node<T> s(g, seq_inspector<T>()); |
264 | tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>()); |
265 | T j = bogus_value; |
266 | |
267 | // |
268 | // Rejects attempts to add / remove predecessor |
269 | // Rejects request from empty Q |
270 | // |
271 | ASSERT( s.register_predecessor( s2 ) == false, NULL ); |
272 | ASSERT( s.remove_predecessor( s2 ) == false, NULL ); |
273 | ASSERT( s.try_get( j ) == false, NULL ); |
274 | ASSERT( j == bogus_value, NULL ); |
275 | |
276 | // |
277 | // In-order simple puts and gets |
278 | // |
279 | |
280 | for (int i = 0; i < N; ++i) { |
281 | bool msg = s.try_put( T(i) ); |
282 | ASSERT( msg == true, NULL ); |
283 | ASSERT(!s.try_put( T(i) ), NULL); // second attempt to put should reject |
284 | } |
285 | |
286 | |
287 | for (int i = 0; i < N; ++i) { |
288 | j = bogus_value; |
289 | ASSERT(wait_try_get( g, s, j ) == true, NULL); |
290 | ASSERT( i == j, NULL ); |
291 | ASSERT(!s.try_put( T(i) ),NULL ); // after retrieving value, subsequent put should fail |
292 | } |
293 | j = bogus_value; |
294 | g.wait_for_all(); |
295 | ASSERT( s.try_get( j ) == false, NULL ); |
296 | ASSERT( j == bogus_value, NULL ); |
297 | |
298 | // |
299 | // Reverse-order simple puts and gets |
300 | // |
301 | |
302 | for (int i = N-1; i >= 0; --i) { |
303 | bool msg = s2.try_put( T(i) ); |
304 | ASSERT( msg == true, NULL ); |
305 | } |
306 | |
307 | for (int i = 0; i < N; ++i) { |
308 | j = bogus_value; |
309 | ASSERT(wait_try_get( g, s2, j ) == true, NULL); |
310 | ASSERT( i == j, NULL ); |
311 | } |
312 | j = bogus_value; |
313 | g.wait_for_all(); |
314 | ASSERT( s2.try_get( j ) == false, NULL ); |
315 | ASSERT( j == bogus_value, NULL ); |
316 | |
317 | // |
318 | // Chained in-order simple puts and gets |
319 | // |
320 | |
321 | tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>()); |
322 | tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>()); |
323 | tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>()); |
324 | tbb::flow::make_edge( s3, s4 ); |
325 | tbb::flow::make_edge( s4, s5 ); |
326 | |
327 | for (int i = 0; i < N; ++i) { |
328 | bool msg = s3.try_put( T(i) ); |
329 | ASSERT( msg == true, NULL ); |
330 | } |
331 | |
332 | for (int i = 0; i < N; ++i) { |
333 | j = bogus_value; |
334 | ASSERT(wait_try_get( g, s5, j ) == true, NULL); |
335 | ASSERT( i == j, NULL ); |
336 | } |
337 | j = bogus_value; |
338 | ASSERT( wait_try_get( g, s3, j ) == false, NULL ); |
339 | ASSERT( wait_try_get( g, s4, j ) == false, NULL ); |
340 | ASSERT( wait_try_get( g, s5, j ) == false, NULL ); |
341 | ASSERT( j == bogus_value, NULL ); |
342 | |
343 | g.wait_for_all(); |
344 | tbb::flow::remove_edge( s3, s4 ); |
345 | ASSERT( s3.try_put( N ) == true, NULL ); |
346 | ASSERT( wait_try_get( g, s4, j ) == false, NULL ); |
347 | ASSERT( j == bogus_value, NULL ); |
348 | ASSERT( wait_try_get( g, s5, j ) == false, NULL ); |
349 | ASSERT( j == bogus_value, NULL ); |
350 | ASSERT( wait_try_get( g, s3, j ) == true, NULL ); |
351 | ASSERT( j == N, NULL ); |
352 | |
353 | // |
354 | // Chained reverse-order simple puts and gets |
355 | // |
356 | |
357 | tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>()); |
358 | tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>()); |
359 | tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>()); |
360 | tbb::flow::make_edge( s6, s7 ); |
361 | tbb::flow::make_edge( s7, s8 ); |
362 | |
363 | for (int i = N-1; i >= 0; --i) { |
364 | bool msg = s6.try_put( T(i) ); |
365 | ASSERT( msg == true, NULL ); |
366 | } |
367 | |
368 | for (int i = 0; i < N; ++i) { |
369 | j = bogus_value; |
370 | ASSERT( wait_try_get( g, s8, j ) == true, NULL ); |
371 | ASSERT( i == j, NULL ); |
372 | } |
373 | j = bogus_value; |
374 | ASSERT( wait_try_get( g, s6, j ) == false, NULL ); |
375 | ASSERT( wait_try_get( g, s7, j ) == false, NULL ); |
376 | ASSERT( wait_try_get( g, s8, j ) == false, NULL ); |
377 | ASSERT( j == bogus_value, NULL ); |
378 | |
379 | g.wait_for_all(); |
380 | tbb::flow::remove_edge( s6, s7 ); |
381 | ASSERT( s6.try_put( N ) == true, NULL ); |
382 | ASSERT( wait_try_get( g, s7, j ) == false, NULL ); |
383 | ASSERT( j == bogus_value, NULL ); |
384 | ASSERT( wait_try_get( g, s8, j ) == false, NULL ); |
385 | ASSERT( j == bogus_value, NULL ); |
386 | ASSERT( wait_try_get( g, s6, j ) == true, NULL ); |
387 | ASSERT( j == N, NULL ); |
388 | |
389 | return 0; |
390 | } |
391 | |
392 | int TestMain() { |
393 | tbb::tick_count start = tbb::tick_count::now(), stop; |
394 | for (int p = 2; p <= 4; ++p) { |
395 | tbb::task_scheduler_init init(p); |
396 | test_serial<int>(); |
397 | test_parallel<int>(p); |
398 | } |
399 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
400 | test_buffer_extract<tbb::flow::sequencer_node<int> >().run_tests(); |
401 | #endif |
402 | stop = tbb::tick_count::now(); |
403 | REMARK("Sequencer_Node Time=%6.6f\n" , (stop-start).seconds()); |
404 | return Harness::Done; |
405 | } |
406 | |