1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#if __TBB_CPF_BUILD
18#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
19#endif
20
21#include "harness_graph.h"
22
23#include "tbb/flow_graph.h"
24#include "tbb/task_scheduler_init.h"
25
26#define N 1000
27#define MAX_NODES 4
28#define C 8
29
30struct empty_no_assign : private NoAssign {
31 empty_no_assign() {}
32 empty_no_assign( int ) {}
33 operator int() { return 0; }
34};
35
36// A class to use as a fake predecessor of continue_node
37struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
38{
39 typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
40 // Define implementations of virtual methods that are abstract in the base class
41 bool register_successor( successor_type& ) __TBB_override { return false; }
42 bool remove_successor( successor_type& ) __TBB_override { return false; }
43#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
44 typedef tbb::flow::sender<tbb::flow::continue_msg>::built_successors_type built_successors_type;
45 built_successors_type bst;
46 built_successors_type &built_successors() __TBB_override { return bst; }
47 void internal_add_built_successor( successor_type &) __TBB_override { }
48 void internal_delete_built_successor( successor_type &) __TBB_override { }
49 void copy_successors(successor_list_type &) __TBB_override {}
50 size_t successor_count() __TBB_override {return 0;}
51#endif
52};
53
54template< typename InputType >
55struct parallel_puts : private NoAssign {
56
57 tbb::flow::receiver< InputType > * const my_exe_node;
58
59 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
60
61 void operator()( int ) const {
62 for ( int i = 0; i < N; ++i ) {
63 // the nodes will accept all puts
64 ASSERT( my_exe_node->try_put( InputType() ) == true, NULL );
65 }
66 }
67
68};
69
70template< typename OutputType >
71void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
72 fake_continue_sender fake_sender;
73 for (size_t i = 0; i < N; ++i) {
74 n.register_predecessor( fake_sender );
75 }
76
77 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
78 std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
79 harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
80
81 for (size_t r = 0; r < num_receivers; ++r ) {
82 tbb::flow::make_edge( n, receivers[r] );
83 }
84#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
85 ASSERT(n.successor_count() == (size_t)num_receivers, NULL);
86 ASSERT(n.predecessor_count() == 0, NULL);
87 typename tbb::flow::continue_node<OutputType>::successor_list_type my_succs;
88 typedef typename tbb::flow::continue_node<OutputType>::successor_list_type::iterator sv_iter_type;
89 n.copy_successors(my_succs);
90 ASSERT(my_succs.size() == num_receivers, NULL);
91#endif
92
93 NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
94 g.wait_for_all();
95
96 // 2) the nodes will receive puts from multiple predecessors simultaneously,
97 size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
98 ASSERT( (int)ec == p, NULL );
99 for (size_t r = 0; r < num_receivers; ++r ) {
100 size_t c = receivers[r].my_count;
101 // 3) the nodes will send to multiple successors.
102 ASSERT( (int)c == p, NULL );
103 }
104
105#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
106 for(sv_iter_type si=my_succs.begin(); si != my_succs.end(); ++si) {
107 tbb::flow::remove_edge( n, **si );
108 }
109#else
110 for (size_t r = 0; r < num_receivers; ++r ) {
111 tbb::flow::remove_edge( n, receivers[r] );
112 }
113#endif
114 }
115}
116
117template< typename OutputType, typename Body >
118void continue_nodes( Body body ) {
119 for (int p = 1; p < 2*MaxThread; ++p) {
120 tbb::flow::graph g;
121 tbb::flow::continue_node< OutputType > exe_node( g, body );
122 run_continue_nodes( p, g, exe_node);
123 exe_node.try_put(tbb::flow::continue_msg());
124 tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
125 run_continue_nodes( p, g, exe_node_copy);
126 }
127}
128
129const size_t Offset = 123;
130tbb::atomic<size_t> global_execute_count;
131
132template< typename OutputType >
133struct inc_functor {
134
135 tbb::atomic<size_t> local_execute_count;
136 inc_functor( ) { local_execute_count = 0; }
137 inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; }
138 void operator=(const inc_functor &f) { local_execute_count = f.local_execute_count; }
139
140 OutputType operator()( tbb::flow::continue_msg ) {
141 ++global_execute_count;
142 ++local_execute_count;
143 return OutputType();
144 }
145
146};
147
148template< typename OutputType >
149void continue_nodes_with_copy( ) {
150
151 for (int p = 1; p < 2*MaxThread; ++p) {
152 tbb::flow::graph g;
153 inc_functor<OutputType> cf;
154 cf.local_execute_count = Offset;
155 global_execute_count = Offset;
156
157 tbb::flow::continue_node< OutputType > exe_node( g, cf );
158 fake_continue_sender fake_sender;
159 for (size_t i = 0; i < N; ++i) {
160 exe_node.register_predecessor( fake_sender );
161 }
162
163 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
164 std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
165
166 for (size_t r = 0; r < num_receivers; ++r ) {
167 tbb::flow::make_edge( exe_node, receivers[r] );
168 }
169
170 NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
171 g.wait_for_all();
172
173 // 2) the nodes will receive puts from multiple predecessors simultaneously,
174 for (size_t r = 0; r < num_receivers; ++r ) {
175 size_t c = receivers[r].my_count;
176 // 3) the nodes will send to multiple successors.
177 ASSERT( (int)c == p, NULL );
178 }
179 for (size_t r = 0; r < num_receivers; ++r ) {
180 tbb::flow::remove_edge( exe_node, receivers[r] );
181 }
182 }
183
184 // validate that the local body matches the global execute_count and both are correct
185 inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
186 const size_t expected_count = p*MAX_NODES + Offset;
187 size_t global_count = global_execute_count;
188 size_t inc_count = body_copy.local_execute_count;
189 ASSERT( global_count == expected_count && global_count == inc_count, NULL );
190 g.reset(tbb::flow::rf_reset_bodies);
191 body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
192 inc_count = body_copy.local_execute_count;
193 ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
194
195 }
196}
197
198template< typename OutputType >
199void run_continue_nodes() {
200 harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
201 #if __TBB_CPP11_LAMBDAS_PRESENT
202 continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
203 #endif
204 continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
205 continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
206 continue_nodes_with_copy<OutputType>();
207}
208
209//! Tests limited concurrency cases for nodes that accept data messages
210void test_concurrency(int num_threads) {
211 tbb::task_scheduler_init init(num_threads);
212 run_continue_nodes<tbb::flow::continue_msg>();
213 run_continue_nodes<int>();
214 run_continue_nodes<empty_no_assign>();
215}
216/*
217 * Connection of two graphs is not currently supported, but works to some limited extent.
218 * This test is included to check for backward compatibility. It checks that a continue_node
219 * with predecessors in two different graphs receives the required
220 * number of continue messages before it executes.
221 */
222using namespace tbb::flow;
223
224struct add_to_counter {
225 int* counter;
226 add_to_counter(int& var):counter(&var){}
227 void operator()(continue_msg){*counter+=1;}
228};
229
230void test_two_graphs(){
231 int count=0;
232
233 //graph g with broadcast_node and continue_node
234 graph g;
235 broadcast_node<continue_msg> start_g(g);
236 continue_node<continue_msg> first_g(g, add_to_counter(count));
237
238 //graph h with broadcast_node
239 graph h;
240 broadcast_node<continue_msg> start_h(h);
241
242 //making two edges to first_g from the two graphs
243 make_edge(start_g,first_g);
244 make_edge(start_h, first_g);
245
246 //two try_puts from the two graphs
247 start_g.try_put(continue_msg());
248 start_h.try_put(continue_msg());
249 g.wait_for_all();
250 ASSERT(count==1, "Not all continue messages received");
251
252 //two try_puts from the graph that doesn't contain the node
253 count=0;
254 start_h.try_put(continue_msg());
255 start_h.try_put(continue_msg());
256 g.wait_for_all();
257 ASSERT(count==1, "Not all continue messages received -1");
258
259 //only one try_put
260 count=0;
261 start_g.try_put(continue_msg());
262 g.wait_for_all();
263 ASSERT(count==0, "Node executed without waiting for all predecessors");
264}
265
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Exercises extract() on a small graph (two broadcast_nodes feeding one
// continue_node, which feeds a queue_node). After every topology change the
// predecessor/successor counts of all four nodes are asserted, and messages are
// put to confirm that the continue_node fires (or stays silent) as the current
// edges dictate. The scenario is executed twice; the end of each pass rebuilds
// the initial topology and clears the counter.
void test_extract() {
    int my_count = 0;
    tbb::flow::continue_msg cm;
    tbb::flow::graph g;
    tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g);
    tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g);
    tbb::flow::continue_node<tbb::flow::continue_msg> c0(g, add_to_counter(my_count));
    tbb::flow::queue_node<tbb::flow::continue_msg> q0(g);

    tbb::flow::make_edge(b0, c0);
    tbb::flow::make_edge(b1, c0);
    tbb::flow::make_edge(c0, q0);
    for( int i = 0; i < 2; ++i ) {
        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(c0.predecessor_count() == 2 && c0.successor_count() == 1, "c0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");

        /*  b0         */
        /*    \        */
        /*     c0 - q0 */
        /*    /        */
        /*  b1         */

        // With two predecessors the node must wait for a message from each.
        b0.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 0, "continue_node fired too soon");
        b1.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 1, "continue_node didn't fire");
        ASSERT(q0.try_get(cm), "continue_node didn't forward");

        b0.extract();

        /*  b0         */
        /*             */
        /*     c0 - q0 */
        /*    /        */
        /*  b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 1, "c0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
        // b0 is disconnected now, so its messages must not reach c0 ...
        b0.try_put(tbb::flow::continue_msg());
        b0.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 1, "b0 messages being forwarded to continue_node even though it is disconnected");
        // ... while a single message from b1, the only remaining predecessor, fires it.
        b1.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 2, "continue_node didn't fire though it has only one predecessor");
        ASSERT(q0.try_get(cm), "continue_node didn't forward second time");

        c0.extract();

        /*  b0         */
        /*             */
        /*     c0   q0 */
        /*             */
        /*  b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
        ASSERT(c0.predecessor_count() == 0 && c0.successor_count() == 0, "c0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
        // Every node is isolated: nothing put into b0 or b1 may reach c0 or q0.
        b0.try_put(tbb::flow::continue_msg());
        b0.try_put(tbb::flow::continue_msg());
        b1.try_put(tbb::flow::continue_msg());
        b1.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        ASSERT(my_count == 2, "continue_node fired though it has no predecessors");
        ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't");
        tbb::flow::make_edge(b0, c0);

        /*  b0         */
        /*    \        */
        /*     c0   q0 */
        /*             */
        /*  b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
        ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 0, "c0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");

        // c0 has a single predecessor again but no successor: it fires, q0 stays empty.
        b0.try_put(tbb::flow::continue_msg());
        g.wait_for_all();

        ASSERT(my_count == 3, "continue didn't fire though it has only one predecessor");
        ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't");

        // Restore the initial topology and counter for the next pass.
        tbb::flow::make_edge(b1, c0);
        tbb::flow::make_edge(c0, q0);
        my_count = 0;
    }
}
#endif
364
365struct lightweight_policy_body : NoAssign {
366 const tbb::tbb_thread::id my_thread_id;
367 tbb::atomic<size_t> my_count;
368
369 lightweight_policy_body() : my_thread_id(tbb::this_tbb_thread::get_id()) {
370 my_count = 0;
371 }
372 void operator()(tbb::flow::continue_msg) {
373 ++my_count;
374 tbb::tbb_thread::id body_thread_id = tbb::this_tbb_thread::get_id();
375 ASSERT(body_thread_id == my_thread_id, "Body executed as not lightweight");
376 }
377};
378
379void test_lightweight_policy() {
380 tbb::flow::graph g;
381 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node1(g, lightweight_policy_body());
382 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node2(g, lightweight_policy_body());
383
384 tbb::flow::make_edge(node1, node2);
385 const size_t n = 10;
386 for(size_t i = 0; i < n; ++i) {
387 node1.try_put(tbb::flow::continue_msg());
388 }
389 g.wait_for_all();
390
391 lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
392 lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
393 ASSERT(body1.my_count == n, "Body of the first node needs to be executed N times");
394 ASSERT(body2.my_count == n, "Body of the second node needs to be executed N times");
395}
396
397int TestMain() {
398 if( MinThread<1 ) {
399 REPORT("number of threads must be positive\n");
400 exit(1);
401 }
402 for( int p=MinThread; p<=MaxThread; ++p ) {
403 test_concurrency(p);
404 }
405 test_two_graphs();
406#if __TBB_PREVIEW_LIGHTWEIGHT_POLICY
407 test_lightweight_policy();
408#endif
409#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
410 test_extract();
411#endif
412 return Harness::Done;
413}
414
415