1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #include "harness_graph.h" |
18 | #include "harness_barrier.h" |
19 | #include "tbb/flow_graph.h" |
20 | #include "tbb/task_scheduler_init.h" |
21 | |
// Number of times test_wait_count and test_run repeat their scenario.
const int T = 4;
// Size of the done-flag array in test_wait_count; the number of native
// threads launched per round ranges over [0, W).
const int W = 4;
24 | |
25 | struct decrement_wait : NoAssign { |
26 | |
27 | tbb::flow::graph * const my_graph; |
28 | bool * const my_done_flag; |
29 | |
30 | decrement_wait( tbb::flow::graph &h, bool *done_flag ) : my_graph(&h), my_done_flag(done_flag) {} |
31 | |
32 | void operator()(int i) const { |
33 | Harness::Sleep(10*i); |
34 | my_done_flag[i] = true; |
35 | my_graph->decrement_wait_count(); |
36 | } |
37 | }; |
38 | |
39 | static void test_wait_count() { |
40 | tbb::flow::graph h; |
41 | for (int i = 0; i < T; ++i ) { |
42 | bool done_flag[W]; |
43 | for (int j = 0; j < W; ++j ) { |
44 | for ( int w = 0; w < W; ++w ) done_flag[w] = false; |
45 | for ( int w = 0; w < j; ++w ) h.increment_wait_count(); |
46 | |
47 | NativeParallelFor( j, decrement_wait(h, done_flag) ); |
48 | h.wait_for_all(); |
49 | for ( int w = 0; w < W; ++w ) { |
50 | if ( w < j ) ASSERT( done_flag[w] == true, NULL ); |
51 | else ASSERT( done_flag[w] == false, NULL ); |
52 | } |
53 | } |
54 | } |
55 | } |
56 | |
// Number of bodies of each kind submitted per repetition in test_run; also
// the size of the flag arrays below and the first argument passed to
// initialize_map() on the mapped receivers.
const int F = 100;

#if __TBB_CPP11_LAMBDAS_PRESENT
// lambda_flag[j] is set to true by the j-th flag-setting lambda run in test_run.
bool lambda_flag[F];
#endif
// functor_flag[j] is set to true by set_functor(j).
bool functor_flag[F];
63 | |
64 | struct set_functor { |
65 | int my_i; |
66 | set_functor( int i ) : my_i(i) {} |
67 | void operator()() { functor_flag[my_i] = true; } |
68 | }; |
69 | |
// Nullary body for graph::run(receiver, body): when invoked, returns the
// integer it was constructed with.
struct return_functor {
    int my_i;   // value returned by operator()

    return_functor( int i )
        : my_i(i)
    {}

    int operator()() {
        return my_i;
    }
};
75 | |
76 | static void test_run() { |
77 | tbb::flow::graph h; |
78 | for (int i = 0; i < T; ++i ) { |
79 | |
80 | // Create receivers and flag arrays |
81 | #if __TBB_CPP11_LAMBDAS_PRESENT |
82 | harness_mapped_receiver<int> lambda_r(h); |
83 | lambda_r.initialize_map( F, 1 ); |
84 | #endif |
85 | harness_mapped_receiver<int> functor_r(h); |
86 | functor_r.initialize_map( F, 1 ); |
87 | |
88 | // Initialize flag arrays |
89 | for (int j = 0; j < F; ++j ) { |
90 | #if __TBB_CPP11_LAMBDAS_PRESENT |
91 | lambda_flag[j] = false; |
92 | #endif |
93 | functor_flag[j] = false; |
94 | } |
95 | |
96 | for ( int j = 0; j < F; ++j ) { |
97 | #if __TBB_CPP11_LAMBDAS_PRESENT |
98 | h.run( [=]() { lambda_flag[j] = true; } ); |
99 | h.run( lambda_r, [=]() { return j; } ); |
100 | #endif |
101 | h.run( set_functor(j) ); |
102 | h.run( functor_r, return_functor(j) ); |
103 | } |
104 | h.wait_for_all(); |
105 | for ( int j = 0; j < F; ++j ) { |
106 | #if __TBB_CPP11_LAMBDAS_PRESENT |
107 | ASSERT( lambda_flag[i] == true, NULL ); |
108 | #endif |
109 | ASSERT( functor_flag[i] == true, NULL ); |
110 | } |
111 | #if __TBB_CPP11_LAMBDAS_PRESENT |
112 | lambda_r.validate(); |
113 | #endif |
114 | functor_r.validate(); |
115 | } |
116 | } |
117 | |
// Encapsulate the object we want to store in a vector (because the contained type
// must have a copy constructor and an assignment operator).
120 | class my_int_buffer { |
121 | tbb::flow::buffer_node<int> *b; |
122 | tbb::flow::graph& my_graph; |
123 | public: |
124 | my_int_buffer(tbb::flow::graph &g) : my_graph(g) { b = new tbb::flow::buffer_node<int>(my_graph); } |
125 | my_int_buffer(const my_int_buffer& other) : my_graph(other.my_graph) { |
126 | b = new tbb::flow::buffer_node<int>(my_graph); |
127 | } |
128 | ~my_int_buffer() { delete b; } |
129 | my_int_buffer& operator=(const my_int_buffer& /*other*/) { |
130 | return *this; |
131 | } |
132 | }; |
133 | |
134 | // test the graph iterator, delete nodes from graph, test again |
135 | void test_iterator() { |
136 | tbb::flow::graph g; |
137 | my_int_buffer a_buffer(g); |
138 | my_int_buffer b_buffer(g); |
139 | my_int_buffer c_buffer(g); |
140 | my_int_buffer *d_buffer = new my_int_buffer(g); |
141 | my_int_buffer e_buffer(g); |
142 | std::vector< my_int_buffer > my_buffer_vector(10, c_buffer); |
143 | |
144 | int count = 0; |
145 | for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { |
146 | count++; |
147 | } |
148 | ASSERT(count==15, "error in iterator count" ); |
149 | |
150 | delete d_buffer; |
151 | |
152 | count = 0; |
153 | for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { |
154 | count++; |
155 | } |
156 | ASSERT(count==14, "error in iterator count" ); |
157 | |
158 | my_buffer_vector.clear(); |
159 | |
160 | count = 0; |
161 | for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { |
162 | count++; |
163 | } |
164 | ASSERT(count==4, "error in iterator count" ); |
165 | } |
166 | |
167 | class AddRemoveBody : NoAssign { |
168 | tbb::flow::graph& g; |
169 | int nThreads; |
170 | Harness::SpinBarrier &barrier; |
171 | public: |
172 | AddRemoveBody(int nthr, Harness::SpinBarrier &barrier_, tbb::flow::graph& _g) : |
173 | g(_g), nThreads(nthr), barrier(barrier_) |
174 | {} |
175 | void operator()(const int /*threadID*/) const { |
176 | my_int_buffer b(g); |
177 | { |
178 | std::vector<my_int_buffer> my_buffer_vector(100, b); |
179 | barrier.wait(); // wait until all nodes are created |
180 | // now test that the proper number of nodes were created |
181 | int count = 0; |
182 | for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { |
183 | count++; |
184 | } |
185 | ASSERT(count==101*nThreads, "error in iterator count" ); |
186 | barrier.wait(); // wait until all threads are done counting |
187 | } // all nodes but for the initial node on this thread are deleted |
188 | barrier.wait(); // wait until all threads have deleted all nodes in their vectors |
189 | // now test that all the nodes were deleted except for the initial node |
190 | int count = 0; |
191 | for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) { |
192 | count++; |
193 | } |
194 | ASSERT(count==nThreads, "error in iterator count" ); |
195 | barrier.wait(); // wait until all threads are done counting |
196 | } // initial node gets deleted |
197 | }; |
198 | |
199 | void test_parallel(int nThreads) { |
200 | tbb::flow::graph g; |
201 | Harness::SpinBarrier barrier(nThreads); |
202 | AddRemoveBody body(nThreads, barrier, g); |
203 | NativeParallelFor(nThreads, body); |
204 | } |
205 | |
206 | /* |
207 | * Functors for graph arena spawn tests |
208 | */ |
209 | |
// Asserts the arena configuration expected by the graph-arena tests:
// the arena passed in reports a max concurrency of 2, while the arena the
// calling thread is currently executing in reports a max concurrency of 1.
// Called from inside task/node bodies to check which arena they run in.
inline void check_arena(tbb::task_arena* a) {
    ASSERT(a->max_concurrency() == 2, NULL);
    ASSERT(tbb::this_task_arena::max_concurrency() == 1, NULL);
}
214 | |
215 | struct run_functor { |
216 | tbb::task_arena* my_a; |
217 | int return_value; |
218 | run_functor(tbb::task_arena* a) : my_a(a), return_value(1) {} |
219 | int operator()() { |
220 | check_arena(my_a); |
221 | return return_value; |
222 | } |
223 | }; |
224 | |
225 | template < typename T > |
226 | struct function_body { |
227 | tbb::task_arena* my_a; |
228 | function_body(tbb::task_arena* a) : my_a(a) {} |
229 | tbb::flow::continue_msg operator()(const T& /*arg*/) { |
230 | check_arena(my_a); |
231 | return tbb::flow::continue_msg(); |
232 | } |
233 | }; |
234 | |
// Multifunction node type used by the arena tests: int input, one int output port.
typedef tbb::flow::multifunction_node< int, tbb::flow::tuple< int > > mf_node;
236 | |
237 | struct multifunction_body { |
238 | tbb::task_arena* my_a; |
239 | multifunction_body(tbb::task_arena* a) : my_a(a) {} |
240 | void operator()(const int& /*arg*/, mf_node::output_ports_type& /*outports*/) { |
241 | check_arena(my_a); |
242 | } |
243 | }; |
244 | |
245 | struct source_body { |
246 | tbb::task_arena* my_a; |
247 | int counter; |
248 | source_body(tbb::task_arena* a) : my_a(a), counter(0) {} |
249 | bool operator()(const int& /*i*/) { |
250 | check_arena(my_a); |
251 | if (counter < 1) { |
252 | ++counter; |
253 | return true; |
254 | } |
255 | return false; |
256 | } |
257 | }; |
258 | |
259 | struct run_test_functor : tbb::internal::no_assign { |
260 | tbb::task_arena* fg_arena; |
261 | tbb::flow::graph& my_graph; |
262 | |
263 | run_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : fg_arena(a), my_graph(g) {} |
264 | void operator()() const { |
265 | harness_mapped_receiver<int> functor_r(my_graph); |
266 | functor_r.initialize_map(F, 1); |
267 | |
268 | my_graph.run(run_functor(fg_arena)); |
269 | my_graph.run(functor_r, run_functor(fg_arena)); |
270 | |
271 | my_graph.wait_for_all(); |
272 | } |
273 | }; |
274 | |
275 | struct nodes_test_functor : tbb::internal::no_assign { |
276 | tbb::task_arena* fg_arena; |
277 | tbb::flow::graph& my_graph; |
278 | |
279 | nodes_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : fg_arena(a), my_graph(g) {} |
280 | void operator()() const { |
281 | |
282 | // Define test nodes |
283 | // Continue, function, source nodes |
284 | tbb::flow::continue_node< tbb::flow::continue_msg > c_n(my_graph, function_body<tbb::flow::continue_msg>(fg_arena)); |
285 | tbb::flow::function_node< int > f_n(my_graph, tbb::flow::unlimited, function_body<int>(fg_arena)); |
286 | tbb::flow::source_node< int > s_n(my_graph, source_body(fg_arena), false); |
287 | |
288 | // Multifunction node |
289 | mf_node m_n(my_graph, tbb::flow::unlimited, multifunction_body(fg_arena)); |
290 | |
291 | // Join node |
292 | tbb::flow::function_node< tbb::flow::tuple< int, int > > join_f_n(my_graph, tbb::flow::unlimited, function_body< tbb::flow::tuple< int, int > >(fg_arena)); |
293 | tbb::flow::join_node< tbb::flow::tuple< int, int > > j_n(my_graph); |
294 | make_edge(j_n, join_f_n); |
295 | |
296 | // Split node |
297 | tbb::flow::function_node< int > split_f_n1 = f_n; |
298 | tbb::flow::function_node< int > split_f_n2 = f_n; |
299 | tbb::flow::split_node< tbb::flow::tuple< int, int > > sp_n(my_graph); |
300 | make_edge(tbb::flow::output_port<0>(sp_n), split_f_n1); |
301 | make_edge(tbb::flow::output_port<1>(sp_n), split_f_n2); |
302 | |
303 | // Overwrite node |
304 | tbb::flow::function_node< int > ow_f_n = f_n; |
305 | tbb::flow::overwrite_node< int > ow_n(my_graph); |
306 | make_edge(ow_n, ow_f_n); |
307 | |
308 | // Write once node |
309 | tbb::flow::function_node< int > w_f_n = f_n; |
310 | tbb::flow::write_once_node< int > w_n(my_graph); |
311 | make_edge(w_n, w_f_n); |
312 | |
313 | // Buffer node |
314 | tbb::flow::function_node< int > buf_f_n = f_n; |
315 | tbb::flow::buffer_node< int > buf_n(my_graph); |
316 | make_edge(w_n, buf_f_n); |
317 | |
318 | // Limiter node |
319 | tbb::flow::function_node< int > l_f_n = f_n; |
320 | tbb::flow::limiter_node< int > l_n(my_graph, 1); |
321 | make_edge(l_n, l_f_n); |
322 | |
323 | // Execute nodes |
324 | c_n.try_put( tbb::flow::continue_msg() ); |
325 | f_n.try_put(1); |
326 | m_n.try_put(1); |
327 | s_n.activate(); |
328 | |
329 | tbb::flow::input_port<0>(j_n).try_put(1); |
330 | tbb::flow::input_port<1>(j_n).try_put(1); |
331 | |
332 | tbb::flow::tuple< int, int > sp_tuple(1, 1); |
333 | sp_n.try_put(sp_tuple); |
334 | |
335 | ow_n.try_put(1); |
336 | w_n.try_put(1); |
337 | buf_n.try_put(1); |
338 | l_n.try_put(1); |
339 | |
340 | my_graph.wait_for_all(); |
341 | } |
342 | }; |
343 | |
344 | void test_graph_arena() { |
345 | // There is only one thread for execution (master thread). |
346 | // So, if graph's tasks get spawned in different arena |
347 | // master thread won't be able to find them in its own arena. |
348 | // In this case test should hang. |
349 | tbb::task_scheduler_init init(1); |
350 | |
351 | tbb::flow::graph g; |
352 | tbb::task_arena fg_arena; |
353 | fg_arena.initialize(2); |
354 | fg_arena.execute(run_test_functor(&fg_arena, g)); |
355 | fg_arena.execute(nodes_test_functor(&fg_arena, g)); |
356 | } |
357 | |
358 | int TestMain() { |
359 | if( MinThread<1 ) { |
360 | REPORT("number of threads must be positive\n" ); |
361 | exit(1); |
362 | } |
363 | for( int p=MinThread; p<=MaxThread; ++p ) { |
364 | tbb::task_scheduler_init init(p); |
365 | test_wait_count(); |
366 | test_run(); |
367 | test_iterator(); |
368 | test_parallel(p); |
369 | } |
370 | test_graph_arena(); |
371 | return Harness::Done; |
372 | } |
373 | |