1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "harness.h"
18
19#if __TBB_CPF_BUILD
20#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
21#endif
22
23#include "harness_graph.h"
24
25#include "tbb/task_scheduler_init.h"
26#include "tbb/tick_count.h"
27
28#define N 1000
29#define C 10
30
31template< typename T >
32void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) {
33 while ( b.try_get(value) != true ) {}
34}
35
36template< typename T >
37void check_item( T* count_value, T &value ) {
38 count_value[value / N] += value % N;
39}
40
41template< typename T >
42struct parallel_puts : NoAssign {
43
44 tbb::flow::buffer_node<T> &my_b;
45
46 parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {}
47
48 void operator()(int i) const {
49 for (int j = 0; j < N; ++j) {
50 bool msg = my_b.try_put( T(N*i + j) );
51 ASSERT( msg == true, NULL );
52 }
53 }
54};
55
56template< typename T >
57struct touches {
58
59 bool **my_touches;
60 int my_num_threads;
61
62 touches( int num_threads ) : my_num_threads(num_threads) {
63 my_touches = new bool* [my_num_threads];
64 for ( int p = 0; p < my_num_threads; ++p) {
65 my_touches[p] = new bool[N];
66 for ( int n = 0; n < N; ++n)
67 my_touches[p][n] = false;
68 }
69 }
70
71 ~touches() {
72 for ( int p = 0; p < my_num_threads; ++p) {
73 delete [] my_touches[p];
74 }
75 delete [] my_touches;
76 }
77
78 bool check( T v ) {
79 ASSERT ( my_touches[v/N][v%N] == false, NULL);
80 my_touches[v/N][v%N] = true;
81 return true;
82 }
83
84 bool validate_touches() {
85 for ( int p = 0; p < my_num_threads; ++p) {
86 for ( int n = 0; n < N; ++n) {
87 ASSERT ( my_touches[p][n] == true, NULL);
88 }
89 }
90 return true;
91 }
92};
93
94template< typename T >
95struct parallel_gets : NoAssign {
96
97 tbb::flow::buffer_node<T> &my_b;
98 touches<T> &my_touches;
99
100 parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {}
101
102 void operator()(int) const {
103 for (int j = 0; j < N; ++j) {
104 T v;
105 spin_try_get( my_b, v );
106 my_touches.check( v );
107 }
108 }
109
110};
111
112template< typename T >
113struct parallel_put_get : NoAssign {
114
115 tbb::flow::buffer_node<T> &my_b;
116 touches<T> &my_touches;
117
118 parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {}
119
120 void operator()(int tid) const {
121
122 for ( int i = 0; i < N; i+=C ) {
123 int j_end = ( N < i + C ) ? N : i + C;
124 // dump about C values into the buffer
125 for ( int j = i; j < j_end; ++j ) {
126 ASSERT( my_b.try_put( T (N*tid + j ) ) == true, NULL );
127 }
128 // receiver about C values from the buffer
129 for ( int j = i; j < j_end; ++j ) {
130 T v;
131 spin_try_get( my_b, v );
132 my_touches.check( v );
133 }
134 }
135 }
136
137};
138
139//
140// Tests
141//
142// Item can be reserved, released, consumed ( single serial receiver )
143//
144template< typename T >
145int test_reservation() {
146 tbb::flow::graph g;
147 T bogus_value(-1);
148
149 // Simple tests
150 tbb::flow::buffer_node<T> b(g);
151
152 b.try_put(T(1));
153 b.try_put(T(2));
154 b.try_put(T(3));
155
156 T v, vsum;
157 ASSERT( b.try_reserve(v) == true, NULL );
158 ASSERT( b.try_release() == true, NULL );
159 v = bogus_value;
160 g.wait_for_all();
161 ASSERT( b.try_reserve(v) == true, NULL );
162 ASSERT( b.try_consume() == true, NULL );
163 vsum += v;
164 v = bogus_value;
165 g.wait_for_all();
166
167 ASSERT( b.try_get(v) == true, NULL );
168 vsum += v;
169 v = bogus_value;
170 g.wait_for_all();
171
172 ASSERT( b.try_reserve(v) == true, NULL );
173 ASSERT( b.try_release() == true, NULL );
174 v = bogus_value;
175 g.wait_for_all();
176 ASSERT( b.try_reserve(v) == true, NULL );
177 ASSERT( b.try_consume() == true, NULL );
178 vsum += v;
179 ASSERT( vsum == T(6), NULL);
180 v = bogus_value;
181 g.wait_for_all();
182
183 return 0;
184}
185
186//
187// Tests
188//
189// multiple parallel senders, items in arbitrary order
190// multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received
191// * overlapped puts / gets
//    * all puts finished before any gets
193//
194template< typename T >
195int test_parallel(int num_threads) {
196 tbb::flow::graph g;
197 tbb::flow::buffer_node<T> b(g);
198 tbb::flow::buffer_node<T> b2(g);
199 tbb::flow::buffer_node<T> b3(g);
200 T bogus_value(-1);
201 T j = bogus_value;
202
203 NativeParallelFor( num_threads, parallel_puts<T>(b) );
204
205 T *next_value = new T[num_threads];
206 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
207
208 for (int i = 0; i < num_threads * N; ++i ) {
209 spin_try_get( b, j );
210 check_item( next_value, j );
211 j = bogus_value;
212 }
213 for (int tid = 0; tid < num_threads; ++tid) {
214 ASSERT( next_value[tid] == T((N*(N-1))/2), NULL );
215 }
216
217 j = bogus_value;
218 g.wait_for_all();
219 ASSERT( b.try_get( j ) == false, NULL );
220 ASSERT( j == bogus_value, NULL );
221
222 NativeParallelFor( num_threads, parallel_puts<T>(b) );
223
224 {
225 touches< T > t( num_threads );
226 NativeParallelFor( num_threads, parallel_gets<T>(b, t) );
227 g.wait_for_all();
228 ASSERT( t.validate_touches(), NULL );
229 }
230 j = bogus_value;
231 ASSERT( b.try_get( j ) == false, NULL );
232 ASSERT( j == bogus_value, NULL );
233
234 g.wait_for_all();
235 {
236 touches< T > t( num_threads );
237 NativeParallelFor( num_threads, parallel_put_get<T>(b, t) );
238 g.wait_for_all();
239 ASSERT( t.validate_touches(), NULL );
240 }
241 j = bogus_value;
242 ASSERT( b.try_get( j ) == false, NULL );
243 ASSERT( j == bogus_value, NULL );
244
245 tbb::flow::make_edge( b, b2 );
246 tbb::flow::make_edge( b2, b3 );
247
248 NativeParallelFor( num_threads, parallel_puts<T>(b) );
249 {
250 touches< T > t( num_threads );
251 NativeParallelFor( num_threads, parallel_gets<T>(b3, t) );
252 g.wait_for_all();
253 ASSERT( t.validate_touches(), NULL );
254 }
255 j = bogus_value;
256 g.wait_for_all();
257 ASSERT( b.try_get( j ) == false, NULL );
258 g.wait_for_all();
259 ASSERT( b2.try_get( j ) == false, NULL );
260 g.wait_for_all();
261 ASSERT( b3.try_get( j ) == false, NULL );
262 ASSERT( j == bogus_value, NULL );
263
264 // test copy constructor
265 ASSERT( b.remove_successor( b2 ), NULL );
266 // fill up b:
267 NativeParallelFor( num_threads, parallel_puts<T>(b) );
268 // copy b:
269 tbb::flow::buffer_node<T> b_copy(b);
270
271 // b_copy should be empty
272 j = bogus_value;
273 g.wait_for_all();
274 ASSERT( b_copy.try_get( j ) == false, NULL );
275
276 // hook them together:
277 ASSERT( b.register_successor(b_copy) == true, NULL );
278 // try to get content from b_copy
279 {
280 touches< T > t( num_threads );
281 NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) );
282 g.wait_for_all();
283 ASSERT( t.validate_touches(), NULL );
284 }
285 // now both should be empty
286 j = bogus_value;
287 g.wait_for_all();
288 ASSERT( b.try_get( j ) == false, NULL );
289 g.wait_for_all();
290 ASSERT( b_copy.try_get( j ) == false, NULL );
291 ASSERT( j == bogus_value, NULL );
292
293 delete [] next_value;
294 return 0;
295}
296
297//
298// Tests
299//
300// Predecessors cannot be registered
301// Empty buffer rejects item requests
302// Single serial sender, items in arbitrary order
303// Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order
304//
305
306template< typename T >
307int test_serial() {
308 tbb::flow::graph g;
309 T bogus_value(-1);
310
311 tbb::flow::buffer_node<T> b(g);
312 tbb::flow::buffer_node<T> b2(g);
313 T j = bogus_value;
314
315 //
316 // Rejects attempts to add / remove predecessor
317 // Rejects request from empty buffer
318 //
319 ASSERT( b.register_predecessor( b2 ) == false, NULL );
320 ASSERT( b.remove_predecessor( b2 ) == false, NULL );
321 ASSERT( b.try_get( j ) == false, NULL );
322 ASSERT( j == bogus_value, NULL );
323
324 //
325 // Simple puts and gets
326 //
327
328 for (int i = 0; i < N; ++i) {
329 bool msg = b.try_put( T(i) );
330 ASSERT( msg == true, NULL );
331 }
332
333 T vsum = T(0);
334 for (int i = 0; i < N; ++i) {
335 j = bogus_value;
336 spin_try_get( b, j );
337 vsum += j;
338 }
339 ASSERT( vsum == (N*(N-1))/2, NULL);
340 j = bogus_value;
341 g.wait_for_all();
342 ASSERT( b.try_get( j ) == false, NULL );
343 ASSERT( j == bogus_value, NULL );
344
345 tbb::flow::make_edge(b, b2);
346#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
347 ASSERT( b.successor_count() == 1, NULL);
348 ASSERT( b.predecessor_count() == 0, NULL);
349 ASSERT( b2.successor_count() == 0, NULL);
350 ASSERT( b2.predecessor_count() == 1, NULL);
351 typename tbb::flow::buffer_node<T>::successor_list_type my_succs;
352 b.copy_successors(my_succs);
353 ASSERT(my_succs.size() == 1, NULL);
354 typename tbb::flow::buffer_node<T>::predecessor_list_type my_preds;
355 b.copy_predecessors(my_preds);
356 ASSERT(my_preds.size() == 0, NULL);
357#endif
358
359 vsum = T(0);
360 for (int i = 0; i < N; ++i) {
361 bool msg = b.try_put( T(i) );
362 ASSERT( msg == true, NULL );
363 }
364
365 for (int i = 0; i < N; ++i) {
366 j = bogus_value;
367 spin_try_get( b2, j );
368 vsum += j;
369 }
370 ASSERT( vsum == (N*(N-1))/2, NULL);
371 j = bogus_value;
372 g.wait_for_all();
373 ASSERT( b.try_get( j ) == false, NULL );
374 g.wait_for_all();
375 ASSERT( b2.try_get( j ) == false, NULL );
376 ASSERT( j == bogus_value, NULL );
377
378 tbb::flow::remove_edge(b, b2);
379 ASSERT( b.try_put( 1 ) == true, NULL );
380 g.wait_for_all();
381 ASSERT( b2.try_get( j ) == false, NULL );
382 ASSERT( j == bogus_value, NULL );
383 g.wait_for_all();
384 ASSERT( b.try_get( j ) == true, NULL );
385 ASSERT( j == 1, NULL );
386
387 tbb::flow::buffer_node<T> b3(g);
388 tbb::flow::make_edge( b, b2 );
389 tbb::flow::make_edge( b2, b3 );
390
391 vsum = T(0);
392 for (int i = 0; i < N; ++i) {
393 bool msg = b.try_put( T(i) );
394 ASSERT( msg == true, NULL );
395 }
396
397 for (int i = 0; i < N; ++i) {
398 j = bogus_value;
399 spin_try_get( b3, j );
400 vsum += j;
401 }
402 ASSERT( vsum == (N*(N-1))/2, NULL);
403 j = bogus_value;
404 g.wait_for_all();
405 ASSERT( b.try_get( j ) == false, NULL );
406 g.wait_for_all();
407 ASSERT( b2.try_get( j ) == false, NULL );
408 g.wait_for_all();
409 ASSERT( b3.try_get( j ) == false, NULL );
410 ASSERT( j == bogus_value, NULL );
411
412 tbb::flow::remove_edge(b, b2);
413 ASSERT( b.try_put( 1 ) == true, NULL );
414 g.wait_for_all();
415 ASSERT( b2.try_get( j ) == false, NULL );
416 ASSERT( j == bogus_value, NULL );
417 g.wait_for_all();
418 ASSERT( b3.try_get( j ) == false, NULL );
419 ASSERT( j == bogus_value, NULL );
420 g.wait_for_all();
421 ASSERT( b.try_get( j ) == true, NULL );
422 ASSERT( j == 1, NULL );
423
424 return 0;
425}
426
427int TestMain() {
428 tbb::tick_count start = tbb::tick_count::now(), stop;
429 for (int p = 2; p <= 4; ++p) {
430 tbb::task_scheduler_init init(p);
431 test_serial<int>();
432 test_parallel<int>(p);
433 }
434 stop = tbb::tick_count::now();
435 REMARK("Buffer_Node Time=%6.6f\n", (stop-start).seconds());
436 test_resets<int,tbb::flow::buffer_node<int> >();
437 test_resets<float,tbb::flow::buffer_node<float> >();
438#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
439 test_buffer_extract<tbb::flow::buffer_node<int> >().run_tests();
440#endif
441 return Harness::Done;
442}
443