1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "harness.h"
18
19#if __TBB_CPF_BUILD
20#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
21#endif
22
23#include "tbb/flow_graph.h"
24#include "tbb/task_scheduler_init.h"
25#include "tbb/tick_count.h"
26#include "tbb/atomic.h"
27#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
28#include "harness_graph.h"
29#endif
30
31#include <cstdio>
32
// Number of distinct sequence numbers (values 0..N-1) pushed through the nodes in each test.
#define N 1000
// Chunk size each thread claims from the shared counter in parallel_put_get.
#define C 10
35
// Sequencer body for the tests: every value carries its own sequence number,
// i.e. the value converted to size_t is the position it must be released at.
template< typename T >
struct seq_inspector {
    size_t operator()( const T &item ) const {
        const size_t sequence_number = static_cast<size_t>( item );
        return sequence_number;
    }
};
40
41template< typename T >
42bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
43 g.wait_for_all();
44 return q.try_get(value);
45}
46
47template< typename T >
48void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
49 while ( q.try_get(value) != true ) ;
50}
51
52template< typename T >
53struct parallel_puts : NoAssign {
54
55 tbb::flow::sequencer_node<T> &my_q;
56 int my_num_threads;
57
58 parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
59
60 void operator()(int tid) const {
61 for (int j = tid; j < N; j+=my_num_threads) {
62 bool msg = my_q.try_put( T(j) );
63 ASSERT( msg == true, NULL );
64 }
65 }
66
67};
68
69template< typename T >
70struct touches {
71
72 bool **my_touches;
73 T *my_last_touch;
74 int my_num_threads;
75
76 touches( int num_threads ) : my_num_threads(num_threads) {
77 my_last_touch = new T[my_num_threads];
78 my_touches = new bool* [my_num_threads];
79 for ( int p = 0; p < my_num_threads; ++p) {
80 my_last_touch[p] = T(-1);
81 my_touches[p] = new bool[N];
82 for ( int n = 0; n < N; ++n)
83 my_touches[p][n] = false;
84 }
85 }
86
87 ~touches() {
88 for ( int p = 0; p < my_num_threads; ++p) {
89 delete [] my_touches[p];
90 }
91 delete [] my_touches;
92 delete [] my_last_touch;
93 }
94
95 bool check( int tid, T v ) {
96 if ( my_touches[tid][v] != false ) {
97 printf("Error: value seen twice by local thread\n");
98 return false;
99 }
100 if ( v <= my_last_touch[tid] ) {
101 printf("Error: value seen in wrong order by local thread\n");
102 return false;
103 }
104 my_last_touch[tid] = v;
105 my_touches[tid][v] = true;
106 return true;
107 }
108
109 bool validate_touches() {
110 bool *all_touches = new bool[N];
111 for ( int n = 0; n < N; ++n)
112 all_touches[n] = false;
113
114 for ( int p = 0; p < my_num_threads; ++p) {
115 for ( int n = 0; n < N; ++n) {
116 if ( my_touches[p][n] == true ) {
117 ASSERT( all_touches[n] == false, "value see by more than one thread\n" );
118 all_touches[n] = true;
119 }
120 }
121 }
122 for ( int n = 0; n < N; ++n) {
123 if ( !all_touches[n] )
124 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
125 //ASSERT( all_touches[n] == true, "value not seen by any thread\n" );
126 }
127 delete [] all_touches;
128 return true;
129 }
130
131};
132
133template< typename T >
134struct parallel_gets : NoAssign {
135
136 tbb::flow::sequencer_node<T> &my_q;
137 int my_num_threads;
138 touches<T> &my_touches;
139
140 parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
141
142 void operator()(int tid) const {
143 for (int j = tid; j < N; j+=my_num_threads) {
144 T v;
145 spin_try_get( my_q, v );
146 my_touches.check( tid, v );
147 }
148 }
149
150};
151
152template< typename T >
153struct parallel_put_get : NoAssign {
154
155 tbb::flow::sequencer_node<T> &my_s1;
156 tbb::flow::sequencer_node<T> &my_s2;
157 int my_num_threads;
158 tbb::atomic< int > &my_counter;
159 touches<T> &my_touches;
160
161 parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
162 tbb::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
163
164 void operator()(int tid) const {
165 int i_start = 0;
166
167 while ( (i_start = my_counter.fetch_and_add(C)) < N ) {
168 int i_end = ( N < i_start + C ) ? N : i_start + C;
169 for (int i = i_start; i < i_end; ++i) {
170 bool msg = my_s1.try_put( T(i) );
171 ASSERT( msg == true, NULL );
172 }
173
174 for (int i = i_start; i < i_end; ++i) {
175 T v;
176 spin_try_get( my_s2, v );
177 my_touches.check( tid, v );
178 }
179 }
180 }
181
182};
183
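//
// Tests
//
// Multiple parallel senders followed by multiple parallel receivers; values are properly sequenced (relative to each receiver) at output.
// Three chained sequencers; each thread puts a chunk of values and then gets the same number back; properly sequenced (relative to each receiver) at output.
// A copy-constructed sequencer fed by parallel senders delivers all values in sequence order.
//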
190
191template< typename T >
192int test_parallel(int num_threads) {
193 tbb::flow::graph g;
194
195 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
196 NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
197 {
198 touches<T> t( num_threads );
199 NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
200 g.wait_for_all();
201 ASSERT( t.validate_touches(), NULL );
202 }
203 T bogus_value(-1);
204 T j = bogus_value;
205 ASSERT( s.try_get( j ) == false, NULL );
206 ASSERT( j == bogus_value, NULL );
207 g.wait_for_all();
208
209 tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
210 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
211 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
212 tbb::flow::make_edge( s1, s2 );
213 tbb::flow::make_edge( s2, s3 );
214
215 {
216 touches<T> t( num_threads );
217 tbb::atomic<int> counter;
218 counter = 0;
219 NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
220 g.wait_for_all();
221 t.validate_touches();
222 }
223 g.wait_for_all();
224 ASSERT( s1.try_get( j ) == false, NULL );
225 g.wait_for_all();
226 ASSERT( s2.try_get( j ) == false, NULL );
227 g.wait_for_all();
228 ASSERT( s3.try_get( j ) == false, NULL );
229 ASSERT( j == bogus_value, NULL );
230
231 // test copy constructor
232 tbb::flow::sequencer_node<T> s_copy(s);
233 NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
234 for (int i = 0; i < N; ++i) {
235 j = bogus_value;
236 spin_try_get( s_copy, j );
237 ASSERT( i == j, NULL );
238 }
239 j = bogus_value;
240 g.wait_for_all();
241 ASSERT( s_copy.try_get( j ) == false, NULL );
242 ASSERT( j == bogus_value, NULL );
243
244 return 0;
245}
246
247
248//
249// Tests
250//
251// No predecessors can be registered
252// Request from empty buffer fails
253// In-order puts, single sender, single receiver, properly sequenced at output
254// Reverse-order puts, single sender, single receiver, properly sequenced at output
255// Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
256//
257
// Single-threaded sequencer_node checks. Returns 0; any failed expectation
// aborts via ASSERT. Statement order matters: each section relies on the
// node state left by the previous one.
template< typename T >
int test_serial() {
    tbb::flow::graph g;
    // Sentinel used to detect that a failed try_get left its output argument untouched.
    T bogus_value(-1);

    tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
    tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
    T j = bogus_value;

    //
    // Rejects attempts to add / remove predecessor
    // Rejects request from empty Q
    //
    ASSERT( s.register_predecessor( s2 ) == false, NULL );
    ASSERT( s.remove_predecessor( s2 ) == false, NULL );
    ASSERT( s.try_get( j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );

    //
    // In-order simple puts and gets
    //

    for (int i = 0; i < N; ++i) {
        bool msg = s.try_put( T(i) );
        ASSERT( msg == true, NULL );
        ASSERT(!s.try_put( T(i) ), NULL); // second attempt to put should reject
    }


    for (int i = 0; i < N; ++i) {
        j = bogus_value;
        ASSERT(wait_try_get( g, s, j ) == true, NULL);
        ASSERT( i == j, NULL );
        ASSERT(!s.try_put( T(i) ),NULL ); // after retrieving value, subsequent put should fail
    }
    // Buffer is now drained: a further get must fail and leave j untouched.
    j = bogus_value;
    g.wait_for_all();
    ASSERT( s.try_get( j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );

    //
    // Reverse-order simple puts and gets
    //

    // Values arrive N-1..0 but must come out 0..N-1.
    for (int i = N-1; i >= 0; --i) {
        bool msg = s2.try_put( T(i) );
        ASSERT( msg == true, NULL );
    }

    for (int i = 0; i < N; ++i) {
        j = bogus_value;
        ASSERT(wait_try_get( g, s2, j ) == true, NULL);
        ASSERT( i == j, NULL );
    }
    j = bogus_value;
    g.wait_for_all();
    ASSERT( s2.try_get( j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );

    //
    // Chained in-order simple puts and gets
    //

    // s3 -> s4 -> s5: values put into s3 are forwarded along the edges and read from s5.
    tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
    tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
    tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
    tbb::flow::make_edge( s3, s4 );
    tbb::flow::make_edge( s4, s5 );

    for (int i = 0; i < N; ++i) {
        bool msg = s3.try_put( T(i) );
        ASSERT( msg == true, NULL );
    }

    for (int i = 0; i < N; ++i) {
        j = bogus_value;
        ASSERT(wait_try_get( g, s5, j ) == true, NULL);
        ASSERT( i == j, NULL );
    }
    // Nothing may remain buffered anywhere in the chain.
    j = bogus_value;
    ASSERT( wait_try_get( g, s3, j ) == false, NULL );
    ASSERT( wait_try_get( g, s4, j ) == false, NULL );
    ASSERT( wait_try_get( g, s5, j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );

    // After removing the s3 -> s4 edge, the next value (sequence number N)
    // must stay in s3 instead of being forwarded.
    g.wait_for_all();
    tbb::flow::remove_edge( s3, s4 );
    ASSERT( s3.try_put( N ) == true, NULL );
    ASSERT( wait_try_get( g, s4, j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );
    ASSERT( wait_try_get( g, s5, j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );
    ASSERT( wait_try_get( g, s3, j ) == true, NULL );
    ASSERT( j == N, NULL );

    //
    // Chained reverse-order simple puts and gets
    //

    // s6 -> s7 -> s8, fed in reverse order; s8 must still emit 0..N-1.
    tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
    tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
    tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
    tbb::flow::make_edge( s6, s7 );
    tbb::flow::make_edge( s7, s8 );

    for (int i = N-1; i >= 0; --i) {
        bool msg = s6.try_put( T(i) );
        ASSERT( msg == true, NULL );
    }

    for (int i = 0; i < N; ++i) {
        j = bogus_value;
        ASSERT( wait_try_get( g, s8, j ) == true, NULL );
        ASSERT( i == j, NULL );
    }
    j = bogus_value;
    ASSERT( wait_try_get( g, s6, j ) == false, NULL );
    ASSERT( wait_try_get( g, s7, j ) == false, NULL );
    ASSERT( wait_try_get( g, s8, j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );

    // Same edge-removal check as above, for the s6 -> s7 edge.
    g.wait_for_all();
    tbb::flow::remove_edge( s6, s7 );
    ASSERT( s6.try_put( N ) == true, NULL );
    ASSERT( wait_try_get( g, s7, j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );
    ASSERT( wait_try_get( g, s8, j ) == false, NULL );
    ASSERT( j == bogus_value, NULL );
    ASSERT( wait_try_get( g, s6, j ) == true, NULL );
    ASSERT( j == N, NULL );

    return 0;
}
391
392int TestMain() {
393 tbb::tick_count start = tbb::tick_count::now(), stop;
394 for (int p = 2; p <= 4; ++p) {
395 tbb::task_scheduler_init init(p);
396 test_serial<int>();
397 test_parallel<int>(p);
398 }
399#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
400 test_buffer_extract<tbb::flow::sequencer_node<int> >().run_tests();
401#endif
402 stop = tbb::tick_count::now();
403 REMARK("Sequencer_Node Time=%6.6f\n", (stop-start).seconds());
404 return Harness::Done;
405}
406