1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #if __TBB_CPF_BUILD |
18 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
19 | #endif |
20 | |
21 | #include "harness_graph.h" |
22 | |
23 | #include "tbb/flow_graph.h" |
24 | #include "tbb/task_scheduler_init.h" |
25 | |
// Workload parameters used by the tests in this file.
// MAX_NODES: the largest number of successor receivers attached to a node under test.
#define MAX_NODES 4
// N: how many puts each parallel_puts invocation issues, and how many fake
// predecessors get registered on a continue_node before it is exercised.
#define N 1000
// C: not referenced by any code visible in this file.
#define C 8
29 | |
30 | struct empty_no_assign : private NoAssign { |
31 | empty_no_assign() {} |
32 | empty_no_assign( int ) {} |
33 | operator int() { return 0; } |
34 | }; |
35 | |
36 | // A class to use as a fake predecessor of continue_node |
37 | struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg> |
38 | { |
39 | typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type; |
40 | // Define implementations of virtual methods that are abstract in the base class |
41 | bool register_successor( successor_type& ) __TBB_override { return false; } |
42 | bool remove_successor( successor_type& ) __TBB_override { return false; } |
43 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
44 | typedef tbb::flow::sender<tbb::flow::continue_msg>::built_successors_type built_successors_type; |
45 | built_successors_type bst; |
46 | built_successors_type &built_successors() __TBB_override { return bst; } |
47 | void internal_add_built_successor( successor_type &) __TBB_override { } |
48 | void internal_delete_built_successor( successor_type &) __TBB_override { } |
49 | void copy_successors(successor_list_type &) __TBB_override {} |
50 | size_t successor_count() __TBB_override {return 0;} |
51 | #endif |
52 | }; |
53 | |
54 | template< typename InputType > |
55 | struct parallel_puts : private NoAssign { |
56 | |
57 | tbb::flow::receiver< InputType > * const my_exe_node; |
58 | |
59 | parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} |
60 | |
61 | void operator()( int ) const { |
62 | for ( int i = 0; i < N; ++i ) { |
63 | // the nodes will accept all puts |
64 | ASSERT( my_exe_node->try_put( InputType() ) == true, NULL ); |
65 | } |
66 | } |
67 | |
68 | }; |
69 | |
70 | template< typename OutputType > |
71 | void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) { |
72 | fake_continue_sender fake_sender; |
73 | for (size_t i = 0; i < N; ++i) { |
74 | n.register_predecessor( fake_sender ); |
75 | } |
76 | |
77 | for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { |
78 | std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g)); |
79 | harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0; |
80 | |
81 | for (size_t r = 0; r < num_receivers; ++r ) { |
82 | tbb::flow::make_edge( n, receivers[r] ); |
83 | } |
84 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
85 | ASSERT(n.successor_count() == (size_t)num_receivers, NULL); |
86 | ASSERT(n.predecessor_count() == 0, NULL); |
87 | typename tbb::flow::continue_node<OutputType>::successor_list_type my_succs; |
88 | typedef typename tbb::flow::continue_node<OutputType>::successor_list_type::iterator sv_iter_type; |
89 | n.copy_successors(my_succs); |
90 | ASSERT(my_succs.size() == num_receivers, NULL); |
91 | #endif |
92 | |
93 | NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) ); |
94 | g.wait_for_all(); |
95 | |
96 | // 2) the nodes will receive puts from multiple predecessors simultaneously, |
97 | size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count; |
98 | ASSERT( (int)ec == p, NULL ); |
99 | for (size_t r = 0; r < num_receivers; ++r ) { |
100 | size_t c = receivers[r].my_count; |
101 | // 3) the nodes will send to multiple successors. |
102 | ASSERT( (int)c == p, NULL ); |
103 | } |
104 | |
105 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
106 | for(sv_iter_type si=my_succs.begin(); si != my_succs.end(); ++si) { |
107 | tbb::flow::remove_edge( n, **si ); |
108 | } |
109 | #else |
110 | for (size_t r = 0; r < num_receivers; ++r ) { |
111 | tbb::flow::remove_edge( n, receivers[r] ); |
112 | } |
113 | #endif |
114 | } |
115 | } |
116 | |
117 | template< typename OutputType, typename Body > |
118 | void continue_nodes( Body body ) { |
119 | for (int p = 1; p < 2*MaxThread; ++p) { |
120 | tbb::flow::graph g; |
121 | tbb::flow::continue_node< OutputType > exe_node( g, body ); |
122 | run_continue_nodes( p, g, exe_node); |
123 | exe_node.try_put(tbb::flow::continue_msg()); |
124 | tbb::flow::continue_node< OutputType > exe_node_copy( exe_node ); |
125 | run_continue_nodes( p, g, exe_node_copy); |
126 | } |
127 | } |
128 | |
129 | const size_t Offset = 123; |
130 | tbb::atomic<size_t> global_execute_count; |
131 | |
132 | template< typename OutputType > |
133 | struct inc_functor { |
134 | |
135 | tbb::atomic<size_t> local_execute_count; |
136 | inc_functor( ) { local_execute_count = 0; } |
137 | inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; } |
138 | void operator=(const inc_functor &f) { local_execute_count = f.local_execute_count; } |
139 | |
140 | OutputType operator()( tbb::flow::continue_msg ) { |
141 | ++global_execute_count; |
142 | ++local_execute_count; |
143 | return OutputType(); |
144 | } |
145 | |
146 | }; |
147 | |
148 | template< typename OutputType > |
149 | void continue_nodes_with_copy( ) { |
150 | |
151 | for (int p = 1; p < 2*MaxThread; ++p) { |
152 | tbb::flow::graph g; |
153 | inc_functor<OutputType> cf; |
154 | cf.local_execute_count = Offset; |
155 | global_execute_count = Offset; |
156 | |
157 | tbb::flow::continue_node< OutputType > exe_node( g, cf ); |
158 | fake_continue_sender fake_sender; |
159 | for (size_t i = 0; i < N; ++i) { |
160 | exe_node.register_predecessor( fake_sender ); |
161 | } |
162 | |
163 | for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { |
164 | std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g)); |
165 | |
166 | for (size_t r = 0; r < num_receivers; ++r ) { |
167 | tbb::flow::make_edge( exe_node, receivers[r] ); |
168 | } |
169 | |
170 | NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) ); |
171 | g.wait_for_all(); |
172 | |
173 | // 2) the nodes will receive puts from multiple predecessors simultaneously, |
174 | for (size_t r = 0; r < num_receivers; ++r ) { |
175 | size_t c = receivers[r].my_count; |
176 | // 3) the nodes will send to multiple successors. |
177 | ASSERT( (int)c == p, NULL ); |
178 | } |
179 | for (size_t r = 0; r < num_receivers; ++r ) { |
180 | tbb::flow::remove_edge( exe_node, receivers[r] ); |
181 | } |
182 | } |
183 | |
184 | // validate that the local body matches the global execute_count and both are correct |
185 | inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); |
186 | const size_t expected_count = p*MAX_NODES + Offset; |
187 | size_t global_count = global_execute_count; |
188 | size_t inc_count = body_copy.local_execute_count; |
189 | ASSERT( global_count == expected_count && global_count == inc_count, NULL ); |
190 | g.reset(tbb::flow::rf_reset_bodies); |
191 | body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); |
192 | inc_count = body_copy.local_execute_count; |
193 | ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" ); |
194 | |
195 | } |
196 | } |
197 | |
198 | template< typename OutputType > |
199 | void run_continue_nodes() { |
200 | harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0; |
201 | #if __TBB_CPP11_LAMBDAS_PRESENT |
202 | continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } ); |
203 | #endif |
204 | continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func ); |
205 | continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() ); |
206 | continue_nodes_with_copy<OutputType>(); |
207 | } |
208 | |
209 | //! Tests limited concurrency cases for nodes that accept data messages |
210 | void test_concurrency(int num_threads) { |
211 | tbb::task_scheduler_init init(num_threads); |
212 | run_continue_nodes<tbb::flow::continue_msg>(); |
213 | run_continue_nodes<int>(); |
214 | run_continue_nodes<empty_no_assign>(); |
215 | } |
216 | /* |
217 | * Connection of two graphs is not currently supported, but works to some limited extent. |
218 | * This test is included to check for backward compatibility. It checks that a continue_node |
219 | * with predecessors in two different graphs receives the required |
220 | * number of continue messages before it executes. |
221 | */ |
222 | using namespace tbb::flow; |
223 | |
224 | struct add_to_counter { |
225 | int* counter; |
226 | add_to_counter(int& var):counter(&var){} |
227 | void operator()(continue_msg){*counter+=1;} |
228 | }; |
229 | |
230 | void test_two_graphs(){ |
231 | int count=0; |
232 | |
233 | //graph g with broadcast_node and continue_node |
234 | graph g; |
235 | broadcast_node<continue_msg> start_g(g); |
236 | continue_node<continue_msg> first_g(g, add_to_counter(count)); |
237 | |
238 | //graph h with broadcast_node |
239 | graph h; |
240 | broadcast_node<continue_msg> start_h(h); |
241 | |
242 | //making two edges to first_g from the two graphs |
243 | make_edge(start_g,first_g); |
244 | make_edge(start_h, first_g); |
245 | |
246 | //two try_puts from the two graphs |
247 | start_g.try_put(continue_msg()); |
248 | start_h.try_put(continue_msg()); |
249 | g.wait_for_all(); |
250 | ASSERT(count==1, "Not all continue messages received" ); |
251 | |
252 | //two try_puts from the graph that doesn't contain the node |
253 | count=0; |
254 | start_h.try_put(continue_msg()); |
255 | start_h.try_put(continue_msg()); |
256 | g.wait_for_all(); |
257 | ASSERT(count==1, "Not all continue messages received -1" ); |
258 | |
259 | //only one try_put |
260 | count=0; |
261 | start_g.try_put(continue_msg()); |
262 | g.wait_for_all(); |
263 | ASSERT(count==0, "Node executed without waiting for all predecessors" ); |
264 | } |
265 | |
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
//! Checks extract() on the neighbours of a continue_node and on the node itself.
/** Topology: broadcast_nodes b0 and b1 feed continue_node c0, which feeds queue_node q0.
    The scenario is executed twice; the end of each pass restores the b1->c0 and
    c0->q0 edges and zeroes the counter so that the second pass starts from the
    same state as the first. Each pass goes through four phases:
      1. full topology: c0 fires only after both b0 and b1 have sent a message;
      2. after b0.extract(): c0 fires on a message from b1 alone;
      3. after c0.extract(): nothing reaches c0 or q0;
      4. after make_edge(b0, c0): c0 fires on b0 alone but forwards nothing to q0.
    Predecessor and successor counts of all four nodes are verified at every phase. */
void test_extract() {
    int my_count = 0;
    tbb::flow::continue_msg cm;
    tbb::flow::graph g;
    tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g);
    tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g);
    tbb::flow::continue_node<tbb::flow::continue_msg> c0(g, add_to_counter(my_count));
    tbb::flow::queue_node<tbb::flow::continue_msg> q0(g);

    tbb::flow::make_edge(b0, c0);
    tbb::flow::make_edge(b1, c0);
    tbb::flow::make_edge(c0, q0);
    for( int i = 0; i < 2; ++i ) {
        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(c0.predecessor_count() == 2 && c0.successor_count() == 1, "c0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" );

        /* b0         */
        /*   \        */
        /*    c0 - q0 */
        /*   /        */
        /* b1         */

        // with two predecessors, c0 must wait for a message from each of them
        b0.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 0, "continue_node fired too soon" );
        b1.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 1, "continue_node didn't fire" );
        ASSERT(q0.try_get(cm), "continue_node didn't forward" );

        b0.extract();

        /* b0         */
        /*            */
        /*    c0 - q0 */
        /*   /        */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 1, "c0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" );
        // b0 is disconnected, so its messages must not be counted by c0
        b0.try_put(tbb::flow::continue_msg());
        b0.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 1, "b0 messages being forwarded to continue_node even though it is disconnected" );
        // b1 is now the only predecessor, so one message from it is enough
        b1.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 2, "continue_node didn't fire though it has only one predecessor" );
        ASSERT(q0.try_get(cm), "continue_node didn't forward second time" );

        c0.extract();

        /* b0         */
        /*            */
        /*    c0   q0 */
        /*            */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" );
        ASSERT(c0.predecessor_count() == 0 && c0.successor_count() == 0, "c0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );
        // every node is isolated: the counter must stay at 2 and q0 must stay empty
        b0.try_put(tbb::flow::continue_msg());
        b0.try_put(tbb::flow::continue_msg());
        b1.try_put(tbb::flow::continue_msg());
        b1.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 2, "continue_node fired though it has no predecessors" );
        ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't" );
        make_edge(b0, c0);

        /* b0         */
        /*   \        */
        /*    c0   q0 */
        /*            */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" );
        ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 0, "c0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );

        // c0 has b0 as its single predecessor and no successor
        b0.try_put(tbb::flow::continue_msg());
        g.wait_for_all();

        ASSERT(my_count == 3, "continue didn't fire though it has only one predecessor" );
        ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't" );

        // restore the remaining edges and the counter for the next pass
        tbb::flow::make_edge(b1, c0);
        tbb::flow::make_edge(c0, q0);
        my_count = 0;
    }
}
#endif
364 | |
365 | struct lightweight_policy_body : NoAssign { |
366 | const tbb::tbb_thread::id my_thread_id; |
367 | tbb::atomic<size_t> my_count; |
368 | |
369 | lightweight_policy_body() : my_thread_id(tbb::this_tbb_thread::get_id()) { |
370 | my_count = 0; |
371 | } |
372 | void operator()(tbb::flow::continue_msg) { |
373 | ++my_count; |
374 | tbb::tbb_thread::id body_thread_id = tbb::this_tbb_thread::get_id(); |
375 | ASSERT(body_thread_id == my_thread_id, "Body executed as not lightweight" ); |
376 | } |
377 | }; |
378 | |
379 | void test_lightweight_policy() { |
380 | tbb::flow::graph g; |
381 | tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node1(g, lightweight_policy_body()); |
382 | tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node2(g, lightweight_policy_body()); |
383 | |
384 | tbb::flow::make_edge(node1, node2); |
385 | const size_t n = 10; |
386 | for(size_t i = 0; i < n; ++i) { |
387 | node1.try_put(tbb::flow::continue_msg()); |
388 | } |
389 | g.wait_for_all(); |
390 | |
391 | lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1); |
392 | lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2); |
393 | ASSERT(body1.my_count == n, "Body of the first node needs to be executed N times" ); |
394 | ASSERT(body2.my_count == n, "Body of the second node needs to be executed N times" ); |
395 | } |
396 | |
397 | int TestMain() { |
398 | if( MinThread<1 ) { |
399 | REPORT("number of threads must be positive\n" ); |
400 | exit(1); |
401 | } |
402 | for( int p=MinThread; p<=MaxThread; ++p ) { |
403 | test_concurrency(p); |
404 | } |
405 | test_two_graphs(); |
406 | #if __TBB_PREVIEW_LIGHTWEIGHT_POLICY |
407 | test_lightweight_policy(); |
408 | #endif |
409 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
410 | test_extract(); |
411 | #endif |
412 | return Harness::Done; |
413 | } |
414 | |
415 | |