1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "harness_graph.h"
18#include "harness_barrier.h"
19#include "tbb/flow_graph.h"
20#include "tbb/task_scheduler_init.h"
21
22const int T = 4;
23const int W = 4;
24
25struct decrement_wait : NoAssign {
26
27 tbb::flow::graph * const my_graph;
28 bool * const my_done_flag;
29
30 decrement_wait( tbb::flow::graph &h, bool *done_flag ) : my_graph(&h), my_done_flag(done_flag) {}
31
32 void operator()(int i) const {
33 Harness::Sleep(10*i);
34 my_done_flag[i] = true;
35 my_graph->decrement_wait_count();
36 }
37};
38
39static void test_wait_count() {
40 tbb::flow::graph h;
41 for (int i = 0; i < T; ++i ) {
42 bool done_flag[W];
43 for (int j = 0; j < W; ++j ) {
44 for ( int w = 0; w < W; ++w ) done_flag[w] = false;
45 for ( int w = 0; w < j; ++w ) h.increment_wait_count();
46
47 NativeParallelFor( j, decrement_wait(h, done_flag) );
48 h.wait_for_all();
49 for ( int w = 0; w < W; ++w ) {
50 if ( w < j ) ASSERT( done_flag[w] == true, NULL );
51 else ASSERT( done_flag[w] == false, NULL );
52 }
53 }
54 }
55}
56
57const int F = 100;
58
59#if __TBB_CPP11_LAMBDAS_PRESENT
60bool lambda_flag[F];
61#endif
62bool functor_flag[F];
63
64struct set_functor {
65 int my_i;
66 set_functor( int i ) : my_i(i) {}
67 void operator()() { functor_flag[my_i] = true; }
68};
69
70struct return_functor {
71 int my_i;
72 return_functor( int i ) : my_i(i) {}
73 int operator()() { return my_i; }
74};
75
76static void test_run() {
77 tbb::flow::graph h;
78 for (int i = 0; i < T; ++i ) {
79
80 // Create receivers and flag arrays
81 #if __TBB_CPP11_LAMBDAS_PRESENT
82 harness_mapped_receiver<int> lambda_r(h);
83 lambda_r.initialize_map( F, 1 );
84 #endif
85 harness_mapped_receiver<int> functor_r(h);
86 functor_r.initialize_map( F, 1 );
87
88 // Initialize flag arrays
89 for (int j = 0; j < F; ++j ) {
90 #if __TBB_CPP11_LAMBDAS_PRESENT
91 lambda_flag[j] = false;
92 #endif
93 functor_flag[j] = false;
94 }
95
96 for ( int j = 0; j < F; ++j ) {
97 #if __TBB_CPP11_LAMBDAS_PRESENT
98 h.run( [=]() { lambda_flag[j] = true; } );
99 h.run( lambda_r, [=]() { return j; } );
100 #endif
101 h.run( set_functor(j) );
102 h.run( functor_r, return_functor(j) );
103 }
104 h.wait_for_all();
105 for ( int j = 0; j < F; ++j ) {
106 #if __TBB_CPP11_LAMBDAS_PRESENT
107 ASSERT( lambda_flag[i] == true, NULL );
108 #endif
109 ASSERT( functor_flag[i] == true, NULL );
110 }
111 #if __TBB_CPP11_LAMBDAS_PRESENT
112 lambda_r.validate();
113 #endif
114 functor_r.validate();
115 }
116}
117
118// Encapsulate object we want to store in vector (because contained type must have
119// copy constructor and assignment operator
120class my_int_buffer {
121 tbb::flow::buffer_node<int> *b;
122 tbb::flow::graph& my_graph;
123public:
124 my_int_buffer(tbb::flow::graph &g) : my_graph(g) { b = new tbb::flow::buffer_node<int>(my_graph); }
125 my_int_buffer(const my_int_buffer& other) : my_graph(other.my_graph) {
126 b = new tbb::flow::buffer_node<int>(my_graph);
127 }
128 ~my_int_buffer() { delete b; }
129 my_int_buffer& operator=(const my_int_buffer& /*other*/) {
130 return *this;
131 }
132};
133
134// test the graph iterator, delete nodes from graph, test again
135void test_iterator() {
136 tbb::flow::graph g;
137 my_int_buffer a_buffer(g);
138 my_int_buffer b_buffer(g);
139 my_int_buffer c_buffer(g);
140 my_int_buffer *d_buffer = new my_int_buffer(g);
141 my_int_buffer e_buffer(g);
142 std::vector< my_int_buffer > my_buffer_vector(10, c_buffer);
143
144 int count = 0;
145 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
146 count++;
147 }
148 ASSERT(count==15, "error in iterator count");
149
150 delete d_buffer;
151
152 count = 0;
153 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
154 count++;
155 }
156 ASSERT(count==14, "error in iterator count");
157
158 my_buffer_vector.clear();
159
160 count = 0;
161 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
162 count++;
163 }
164 ASSERT(count==4, "error in iterator count");
165}
166
167class AddRemoveBody : NoAssign {
168 tbb::flow::graph& g;
169 int nThreads;
170 Harness::SpinBarrier &barrier;
171public:
172 AddRemoveBody(int nthr, Harness::SpinBarrier &barrier_, tbb::flow::graph& _g) :
173 g(_g), nThreads(nthr), barrier(barrier_)
174 {}
175 void operator()(const int /*threadID*/) const {
176 my_int_buffer b(g);
177 {
178 std::vector<my_int_buffer> my_buffer_vector(100, b);
179 barrier.wait(); // wait until all nodes are created
180 // now test that the proper number of nodes were created
181 int count = 0;
182 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
183 count++;
184 }
185 ASSERT(count==101*nThreads, "error in iterator count");
186 barrier.wait(); // wait until all threads are done counting
187 } // all nodes but for the initial node on this thread are deleted
188 barrier.wait(); // wait until all threads have deleted all nodes in their vectors
189 // now test that all the nodes were deleted except for the initial node
190 int count = 0;
191 for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
192 count++;
193 }
194 ASSERT(count==nThreads, "error in iterator count");
195 barrier.wait(); // wait until all threads are done counting
196 } // initial node gets deleted
197};
198
199void test_parallel(int nThreads) {
200 tbb::flow::graph g;
201 Harness::SpinBarrier barrier(nThreads);
202 AddRemoveBody body(nThreads, barrier, g);
203 NativeParallelFor(nThreads, body);
204}
205
206/*
207 * Functors for graph arena spawn tests
208 */
209
210inline void check_arena(tbb::task_arena* a) {
211 ASSERT(a->max_concurrency() == 2, NULL);
212 ASSERT(tbb::this_task_arena::max_concurrency() == 1, NULL);
213}
214
215struct run_functor {
216 tbb::task_arena* my_a;
217 int return_value;
218 run_functor(tbb::task_arena* a) : my_a(a), return_value(1) {}
219 int operator()() {
220 check_arena(my_a);
221 return return_value;
222 }
223};
224
225template < typename T >
226struct function_body {
227 tbb::task_arena* my_a;
228 function_body(tbb::task_arena* a) : my_a(a) {}
229 tbb::flow::continue_msg operator()(const T& /*arg*/) {
230 check_arena(my_a);
231 return tbb::flow::continue_msg();
232 }
233};
234
235typedef tbb::flow::multifunction_node< int, tbb::flow::tuple< int > > mf_node;
236
237struct multifunction_body {
238 tbb::task_arena* my_a;
239 multifunction_body(tbb::task_arena* a) : my_a(a) {}
240 void operator()(const int& /*arg*/, mf_node::output_ports_type& /*outports*/) {
241 check_arena(my_a);
242 }
243};
244
245struct source_body {
246 tbb::task_arena* my_a;
247 int counter;
248 source_body(tbb::task_arena* a) : my_a(a), counter(0) {}
249 bool operator()(const int& /*i*/) {
250 check_arena(my_a);
251 if (counter < 1) {
252 ++counter;
253 return true;
254 }
255 return false;
256 }
257};
258
259struct run_test_functor : tbb::internal::no_assign {
260 tbb::task_arena* fg_arena;
261 tbb::flow::graph& my_graph;
262
263 run_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : fg_arena(a), my_graph(g) {}
264 void operator()() const {
265 harness_mapped_receiver<int> functor_r(my_graph);
266 functor_r.initialize_map(F, 1);
267
268 my_graph.run(run_functor(fg_arena));
269 my_graph.run(functor_r, run_functor(fg_arena));
270
271 my_graph.wait_for_all();
272 }
273};
274
275struct nodes_test_functor : tbb::internal::no_assign {
276 tbb::task_arena* fg_arena;
277 tbb::flow::graph& my_graph;
278
279 nodes_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : fg_arena(a), my_graph(g) {}
280 void operator()() const {
281
282 // Define test nodes
283 // Continue, function, source nodes
284 tbb::flow::continue_node< tbb::flow::continue_msg > c_n(my_graph, function_body<tbb::flow::continue_msg>(fg_arena));
285 tbb::flow::function_node< int > f_n(my_graph, tbb::flow::unlimited, function_body<int>(fg_arena));
286 tbb::flow::source_node< int > s_n(my_graph, source_body(fg_arena), false);
287
288 // Multifunction node
289 mf_node m_n(my_graph, tbb::flow::unlimited, multifunction_body(fg_arena));
290
291 // Join node
292 tbb::flow::function_node< tbb::flow::tuple< int, int > > join_f_n(my_graph, tbb::flow::unlimited, function_body< tbb::flow::tuple< int, int > >(fg_arena));
293 tbb::flow::join_node< tbb::flow::tuple< int, int > > j_n(my_graph);
294 make_edge(j_n, join_f_n);
295
296 // Split node
297 tbb::flow::function_node< int > split_f_n1 = f_n;
298 tbb::flow::function_node< int > split_f_n2 = f_n;
299 tbb::flow::split_node< tbb::flow::tuple< int, int > > sp_n(my_graph);
300 make_edge(tbb::flow::output_port<0>(sp_n), split_f_n1);
301 make_edge(tbb::flow::output_port<1>(sp_n), split_f_n2);
302
303 // Overwrite node
304 tbb::flow::function_node< int > ow_f_n = f_n;
305 tbb::flow::overwrite_node< int > ow_n(my_graph);
306 make_edge(ow_n, ow_f_n);
307
308 // Write once node
309 tbb::flow::function_node< int > w_f_n = f_n;
310 tbb::flow::write_once_node< int > w_n(my_graph);
311 make_edge(w_n, w_f_n);
312
313 // Buffer node
314 tbb::flow::function_node< int > buf_f_n = f_n;
315 tbb::flow::buffer_node< int > buf_n(my_graph);
316 make_edge(w_n, buf_f_n);
317
318 // Limiter node
319 tbb::flow::function_node< int > l_f_n = f_n;
320 tbb::flow::limiter_node< int > l_n(my_graph, 1);
321 make_edge(l_n, l_f_n);
322
323 // Execute nodes
324 c_n.try_put( tbb::flow::continue_msg() );
325 f_n.try_put(1);
326 m_n.try_put(1);
327 s_n.activate();
328
329 tbb::flow::input_port<0>(j_n).try_put(1);
330 tbb::flow::input_port<1>(j_n).try_put(1);
331
332 tbb::flow::tuple< int, int > sp_tuple(1, 1);
333 sp_n.try_put(sp_tuple);
334
335 ow_n.try_put(1);
336 w_n.try_put(1);
337 buf_n.try_put(1);
338 l_n.try_put(1);
339
340 my_graph.wait_for_all();
341 }
342};
343
344void test_graph_arena() {
345 // There is only one thread for execution (master thread).
346 // So, if graph's tasks get spawned in different arena
347 // master thread won't be able to find them in its own arena.
348 // In this case test should hang.
349 tbb::task_scheduler_init init(1);
350
351 tbb::flow::graph g;
352 tbb::task_arena fg_arena;
353 fg_arena.initialize(2);
354 fg_arena.execute(run_test_functor(&fg_arena, g));
355 fg_arena.execute(nodes_test_functor(&fg_arena, g));
356}
357
358int TestMain() {
359 if( MinThread<1 ) {
360 REPORT("number of threads must be positive\n");
361 exit(1);
362 }
363 for( int p=MinThread; p<=MaxThread; ++p ) {
364 tbb::task_scheduler_init init(p);
365 test_wait_count();
366 test_run();
367 test_iterator();
368 test_parallel(p);
369 }
370 test_graph_arena();
371 return Harness::Done;
372}
373