1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#define HARNESS_DEFAULT_MIN_THREADS 3
18#define HARNESS_DEFAULT_MAX_THREADS 4
19
20#if _MSC_VER
21 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
22 #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
23 // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
24 #pragma warning (disable: 4702)
25 #endif
26#endif
27
28#include "harness.h"
29#include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
30
31// need these to get proper external names for private methods in library.
32#include "tbb/spin_mutex.h"
33#include "tbb/spin_rw_mutex.h"
34#include "tbb/task.h"
35#include "tbb/task_arena.h"
36
37#define private public
38#define protected public
39#include "tbb/flow_graph.h"
40#undef protected
41#undef private
42#include "tbb/task_scheduler_init.h"
43#include "harness_graph.h"
44
45template<typename T>
46struct receiverBody {
47 tbb::flow::continue_msg operator()(const T &/*in*/) {
48 return tbb::flow::continue_msg();
49 }
50};
51
52// split_nodes cannot have predecessors
53// they do not reject messages and always forward.
54// they reject edge reversals from successors.
55void TestSplitNode() {
56 typedef tbb::flow::split_node<tbb::flow::tuple<int> > snode_type;
57 tbb::flow::graph g;
58 snode_type snode(g);
59 tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
60 REMARK("Testing split_node\n");
61 ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "Constructed split_node has successors");
62 // tbb::flow::output_port<0>(snode)
63 tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
64 ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after make_edge, split_node has no successor.");
65 snode.try_put(tbb::flow::tuple<int>(1));
66 g.wait_for_all();
67 g.reset();
68 ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(), split_node has no successor.");
69 g.reset(tbb::flow::rf_clear_edges);
70 ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "after reset(rf_clear_edges), split_node has a successor.");
71}
72
73// buffering nodes cannot have predecessors
74// they do not reject messages and always save or forward
75// they allow edge reversals from successors
76template< typename B >
77void TestBufferingNode(const char * name) {
78 tbb::flow::graph g;
79 B bnode(g);
80 tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
81 REMARK("Testing %s:", name);
82 for(int icnt = 0; icnt < 2; icnt++) {
83 bool reverse_edge = (icnt & 0x2) != 0;
84 serial_fn_state0 = 0; // reset to waiting state.
85 REMARK(" make_edge");
86 tbb::flow::make_edge(bnode, fnode);
87 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
88 REMARK(" try_put");
89 bnode.try_put(1); // will forward to the fnode
90 BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put");
91 if(reverse_edge) {
92 REMARK(" try_put2");
93 bnode.try_put(2); // will reverse the edge
94 // cannot do a wait_for_all here; the function_node is still executing
95 BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put");
96 // at this point the only task running is the one for the function_node.
97 ASSERT(bnode.my_successors.empty(), "successor not removed");
98 }
99 else {
100 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
101 }
102 serial_fn_state0 = 0; // release the function_node.
103 if(reverse_edge) {
104 // have to do a second release because the function_node will get the 2nd item
105 BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put");
106 serial_fn_state0 = 0; // release the function_node.
107 }
108 g.wait_for_all();
109 REMARK(" remove_edge");
110 tbb::flow::remove_edge(bnode, fnode);
111 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
112 }
113 tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
114 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task
115 g.wait_for_all();
116 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
117 REMARK(" reverse");
118 bnode.try_put(1); // the edge should reverse
119 g.wait_for_all();
120 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
121 REMARK(" reset()");
122 g.wait_for_all();
123 g.reset(); // should be in forward direction again
124 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
125 REMARK(" remove_edge");
126 g.reset(tbb::flow::rf_clear_edges);
127 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
128 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again
129 // reverse edge by adding to buffer.
130 bnode.try_put(1); // the edge should reverse
131 g.wait_for_all();
132 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
133 REMARK(" remove_edge(reversed)");
134 g.reset(tbb::flow::rf_clear_edges);
135 ASSERT(bnode.my_successors.empty(), "buffering node has no successor after reset()");
136 ASSERT(tbb::flow::input_port<0>(jnode).my_predecessors.empty(), "predecessor not reset");
137 REMARK(" done\n");
138 g.wait_for_all();
139}
140
141// continue_node has only predecessor count
142// they do not have predecessors, only the counts
143// successor edges cannot be reversed
144void TestContinueNode() {
145 tbb::flow::graph g;
146 tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
147 tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0));
148 tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
149 tbb::flow::make_edge(fnode0, cnode);
150 tbb::flow::make_edge(cnode, fnode1);
151 REMARK("Testing continue_node:");
152 for( int icnt = 0; icnt < 2; ++icnt ) {
153 REMARK( " initial%d", icnt);
154 ASSERT(cnode.my_predecessor_count == 2, "predecessor addition didn't increment count");
155 ASSERT(!cnode.successors().empty(), "successors empty though we added one");
156 ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect");
157 serial_continue_state0 = 0;
158 serial_fn_state0 = 0;
159 serial_fn_state1 = 0;
160
161 fnode0.try_put(1); // start the first function node.
162 BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start");
163 // Now the body of function_node 0 is executing.
164 serial_fn_state0 = 0; // release the node
165 // wait for node to count the message (or for the node body to execute, which would be wrong)
166 BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change");
167 ASSERT(serial_continue_state0 == 0, "Improperly released continue_node");
168 ASSERT(cnode.my_current_count == 1, "state of continue_receiver incorrect");
169 if(icnt == 0) { // first time through, let the continue_node fire
170 REMARK(" firing");
171 fnode0.try_put(1); // second message
172 BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute");
173 // Now the body of function_node 0 is executing.
174 serial_fn_state0 = 0; // release the node
175
176 BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start"); // now we wait for the continue_node.
177 ASSERT(cnode.my_current_count == 0, " my_current_count not reset before body of continue_node started");
178 serial_continue_state0 = 0; // release the continue_node
179 BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start"); // wait for the successor function_node to enter body
180 serial_fn_state1 = 0; // release successor function_node.
181 g.wait_for_all();
182
183 // try a try_get()
184 {
185 int i;
186 ASSERT(!cnode.try_get(i), "try_get not rejected");
187 }
188
189 REMARK(" reset");
190 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
191 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
192 g.reset(); // should still be the same
193 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (after reset)" );
194 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (after reset)");
195 }
196 else { // we're going to see if the rf_clear_edges resets things.
197 g.wait_for_all();
198 REMARK(" reset(rf_clear_edges)");
199 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
200 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
201 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again
202 ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect after reset(rf_clear_edges)");
203 ASSERT(cnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
204 ASSERT(cnode.my_predecessor_count == cnode.my_initial_predecessor_count, "predecessor count not reset");
205 }
206 }
207
208 REMARK(" done\n");
209
210}
211
212// function_node has predecessors and successors
213// try_get() rejects
214// successor edges cannot be reversed
215// predecessors will reverse (only rejecting will reverse)
216void TestFunctionNode() {
217 tbb::flow::graph g;
218 tbb::flow::queue_node<int> qnode0(g);
219 tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
220 // queueing function node
221 tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
222
223 tbb::flow::queue_node<int> qnode1(g);
224
225 tbb::flow::make_edge(fnode0, qnode1);
226 tbb::flow::make_edge(qnode0, fnode0);
227
228 serial_fn_state0 = 2; // just let it go
229 // see if the darned thing will work....
230 qnode0.try_put(1);
231 g.wait_for_all();
232 int ii;
233 ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
234 tbb::flow::remove_edge(qnode0, fnode0);
235 tbb::flow::remove_edge(fnode0, qnode1);
236
237 tbb::flow::make_edge(fnode1, qnode1);
238 tbb::flow::make_edge(qnode0, fnode1);
239
240 serial_fn_state0 = 2; // just let it go
241 // see if the darned thing will work....
242 qnode0.try_put(1);
243 g.wait_for_all();
244 ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
245 tbb::flow::remove_edge(qnode0, fnode1);
246 tbb::flow::remove_edge(fnode1, qnode1);
247
248 // rejecting
249 serial_fn_state0 = 0;
250 tbb::flow::make_edge(fnode0, qnode1);
251 tbb::flow::make_edge(qnode0, fnode0);
252 REMARK("Testing rejecting function_node:");
253 ASSERT(!fnode0.my_queue, "node should have no queue");
254 ASSERT(!fnode0.my_successors.empty(), "successor edge not added");
255 qnode0.try_put(1);
256 BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start");
257 qnode0.try_put(2); // rejecting node should reject, reverse.
258 BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---");
259 serial_fn_state0 = 2; // release function_node body.
260 g.wait_for_all();
261 REMARK(" reset");
262 g.reset(); // should reverse the edge from the input to the function node.
263 ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
264 ASSERT(fnode0.my_predecessors.empty(), "predecessor not reversed");
265 tbb::flow::remove_edge(qnode0, fnode0);
266 tbb::flow::remove_edge(fnode0, qnode1);
267 REMARK("\n");
268
269 // queueing
270 tbb::flow::make_edge(fnode1, qnode1);
271 REMARK("Testing queueing function_node:");
272 ASSERT(fnode1.my_queue, "node should have no queue");
273 ASSERT(!fnode1.my_successors.empty(), "successor edge not added");
274 REMARK(" add_pred");
275 ASSERT(fnode1.register_predecessor(qnode0), "Cannot register as predecessor");
276 ASSERT(!fnode1.my_predecessors.empty(), "Missing predecessor");
277 REMARK(" reset");
278 g.wait_for_all();
279 g.reset(); // should reverse the edge from the input to the function node.
280 ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
281 ASSERT(fnode1.my_predecessors.empty(), "predecessor not reversed");
282 tbb::flow::remove_edge(qnode0, fnode1);
283 tbb::flow::remove_edge(fnode1, qnode1);
284 REMARK("\n");
285
286 serial_fn_state0 = 0; // make the function_node wait
287 tbb::flow::make_edge(qnode0, fnode0);
288 REMARK(" start_func");
289 qnode0.try_put(1);
290 BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put");
291 // now if we put an item to the queues the edges to the function_node will reverse.
292 REMARK(" put_node(2)");
293 qnode0.try_put(2); // start queue node.
294 // wait for the edges to reverse
295 BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting");
296 ASSERT(!fnode0.my_predecessors.empty(), "function_node edge not reversed");
297 g.my_root_task->cancel_group_execution();
298 // release the function_node
299 serial_fn_state0 = 2;
300 g.wait_for_all();
301 ASSERT(!fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not reversed");
302 g.reset(tbb::flow::rf_clear_edges);
303 ASSERT(fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not removed");
304 ASSERT(fnode0.my_successors.empty(), "successor to fnode not removed");
305 REMARK(" done\n");
306}
307
308template<typename TT>
309class tag_func {
310 TT my_mult;
311public:
312 tag_func(TT multiplier) : my_mult(multiplier) { }
313 void operator=( const tag_func& other){my_mult = other.my_mult;}
314 // operator() will return [0 .. Count)
315 tbb::flow::tag_value operator()( TT v) {
316 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
317 return t;
318 }
319};
320
321template<typename JNODE_TYPE>
322void
323TestSimpleSuccessorArc(const char *name) {
324 tbb::flow::graph g;
325 {
326 REMARK("Join<%s> successor test ", name);
327 tbb::flow::join_node<tbb::flow::tuple<int>, JNODE_TYPE> qj(g);
328 tbb::flow::broadcast_node<tbb::flow::tuple<int> > bnode(g);
329 tbb::flow::make_edge(qj, bnode);
330 ASSERT(!qj.my_successors.empty(),"successor missing after linking");
331 g.reset();
332 ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
333 g.reset(tbb::flow::rf_clear_edges);
334 ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
335 }
336}
337
338template<>
339void
340TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
341 tbb::flow::graph g;
342 {
343 REMARK("Join<%s> successor test ", name);
344 typedef tbb::flow::tuple<int,int> my_tuple;
345 tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
346 tag_func<int>(1),
347 tag_func<int>(1)
348 );
349 tbb::flow::broadcast_node<my_tuple > bnode(g);
350 tbb::flow::make_edge(qj, bnode);
351 ASSERT(!qj.my_successors.empty(),"successor missing after linking");
352 g.reset();
353 ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
354 g.reset(tbb::flow::rf_clear_edges);
355 ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
356 }
357}
358
359void
360TestJoinNode() {
361 tbb::flow::graph g;
362
363 TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
364 TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
365 TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
366
367 // queueing and tagging join nodes have input queues, so the input ports do not reverse.
368 REMARK(" reserving preds");
369 {
370 tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> rj(g);
371 tbb::flow::queue_node<int> q0(g);
372 tbb::flow::queue_node<int> q1(g);
373 tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
374 tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
375 q0.try_put(1);
376 g.wait_for_all(); // quiesce
377 ASSERT(!(tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port missing predecessor");
378 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred");
379 g.reset();
380 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
381 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
382 q1.try_put(2);
383 g.wait_for_all(); // quiesce
384 ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
385 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
386 g.reset();
387 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
388 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
389 // should reset predecessors just as regular reset.
390 q1.try_put(3);
391 g.wait_for_all(); // quiesce
392 ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
393 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
394 g.reset(tbb::flow::rf_clear_edges);
395 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
396 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
397 ASSERT(q0.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
398 ASSERT(q1.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
399 }
400 REMARK(" done\n");
401}
402
403void
404TestLimiterNode() {
405 int out_int;
406 tbb::flow::graph g;
407 tbb::flow::limiter_node<int> ln(g,1);
408 REMARK("Testing limiter_node: preds and succs");
409 ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
410 ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
411 ASSERT(ln.decrement.my_current_count == 0, "error in current count");
412#if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
413 ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
414#endif
415 ASSERT(ln.my_threshold == 1, "error in my_threshold");
416 tbb::flow::queue_node<int> inq(g);
417 tbb::flow::queue_node<int> outq(g);
418 tbb::flow::broadcast_node<tbb::flow::continue_msg> bn(g);
419
420 tbb::flow::make_edge(inq,ln);
421 tbb::flow::make_edge(ln,outq);
422 tbb::flow::make_edge(bn,ln.decrement);
423
424 g.wait_for_all();
425 ASSERT(!(ln.my_successors.empty()),"successors empty after make_edge");
426 ASSERT(ln.my_predecessors.empty(), "input edge reversed");
427 inq.try_put(1);
428 g.wait_for_all();
429 ASSERT(outq.try_get(out_int) && out_int == 1, "limiter_node didn't pass first value");
430 ASSERT(ln.my_predecessors.empty(), "input edge reversed");
431 inq.try_put(2);
432 g.wait_for_all();
433 ASSERT(!outq.try_get(out_int), "limiter_node incorrectly passed second input");
434 ASSERT(!ln.my_predecessors.empty(), "input edge to limiter_node not reversed");
435 bn.try_put(tbb::flow::continue_msg());
436 g.wait_for_all();
437 ASSERT(outq.try_get(out_int) && out_int == 2, "limiter_node didn't pass second value");
438 g.wait_for_all();
439 ASSERT(!ln.my_predecessors.empty(), "input edge was reversed(after try_get())");
440 g.reset();
441 ASSERT(ln.my_predecessors.empty(), "input edge not reset");
442 inq.try_put(3);
443 g.wait_for_all();
444 ASSERT(outq.try_get(out_int) && out_int == 3, "limiter_node didn't pass third value");
445
446 REMARK(" rf_clear_edges");
447 // currently the limiter_node will not pass another message
448 g.reset(tbb::flow::rf_clear_edges);
449 ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
450 ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
451 ASSERT(ln.decrement.my_current_count == 0, "error in current count");
452#if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
453 ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
454#endif
455 ASSERT(ln.my_threshold == 1, "error in my_threshold");
456 ASSERT(ln.my_predecessors.empty(), "preds not reset(rf_clear_edges)");
457 ASSERT(ln.my_successors.empty(), "preds not reset(rf_clear_edges)");
458 ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
459 ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
460 ASSERT(bn.my_successors.empty(), "control edge not removed on reset(rf_clear_edges)");
461 tbb::flow::make_edge(inq,ln);
462 tbb::flow::make_edge(ln,outq);
463 inq.try_put(4);
464 inq.try_put(5);
465 g.wait_for_all();
466 ASSERT(outq.try_get(out_int),"missing output after reset(rf_clear_edges)");
467 ASSERT(out_int == 4, "input incorrect (4)");
468 bn.try_put(tbb::flow::continue_msg());
469 g.wait_for_all();
470 ASSERT(!outq.try_get(out_int),"second output incorrectly passed (rf_clear_edges)");
471 REMARK(" done\n");
472}
473
474template<typename MF_TYPE>
475struct mf_body {
476 tbb::atomic<int> *_flag;
477 mf_body( tbb::atomic<int> &myatomic) : _flag(&myatomic) { }
478 void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) {
479 if(*_flag == 0) {
480 *_flag = 1;
481 BACKOFF_WAIT(*_flag == 1, "multifunction_node not released");
482 }
483
484 if(in & 0x1) tbb::flow::get<1>(outports).try_put(in);
485 else tbb::flow::get<0>(outports).try_put(in);
486 }
487};
488
489template<typename P, typename T>
490struct test_reversal;
491template<typename T>
492struct test_reversal<tbb::flow::queueing, T> {
493 test_reversal() { REMARK("<queueing>"); }
494 // queueing node will not reverse.
495 bool operator()( T &node) { return node.my_predecessors.empty(); }
496};
497
498template<typename T>
499struct test_reversal<tbb::flow::rejecting, T> {
500 test_reversal() { REMARK("<rejecting>"); }
501 bool operator()( T &node) { return !node.my_predecessors.empty(); }
502};
503
504template<typename P>
505void
506TestMultifunctionNode() {
507 typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int>, P> multinode_type;
508 REMARK("Testing multifunction_node");
509 test_reversal<P,multinode_type> my_test;
510 REMARK(":");
511 tbb::flow::graph g;
512 multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
513 tbb::flow::queue_node<int> qin(g);
514 tbb::flow::queue_node<int> qodd_out(g);
515 tbb::flow::queue_node<int> qeven_out(g);
516 tbb::flow::make_edge(qin,mf);
517 tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
518 tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
519 g.wait_for_all();
520 for( int ii = 0; ii < 2 ; ++ii) {
521 serial_fn_state0 = 0;
522 if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");
523 qin.try_put(0);
524 // wait for node to be active
525 BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put");
526 qin.try_put(1);
527 BACKOFF_WAIT((!my_test(mf)), "Timed out waiting");
528 ASSERT(my_test(mf), "fail second put test");
529 g.my_root_task->cancel_group_execution();
530 // release node
531 serial_fn_state0 = 2;
532 g.wait_for_all();
533 ASSERT(my_test(mf), "fail cancel group test");
534 if( ii == 1) {
535 REMARK(" rf_clear_edges");
536 g.reset(tbb::flow::rf_clear_edges);
537 ASSERT(tbb::flow::output_port<0>(mf).my_successors.empty(), "output_port<0> not reset (rf_clear_edges)");
538 ASSERT(tbb::flow::output_port<1>(mf).my_successors.empty(), "output_port<1> not reset (rf_clear_edges)");
539 }
540 else
541 {
542 g.reset();
543 }
544 ASSERT(mf.my_predecessors.empty(), "edge didn't reset");
545 ASSERT((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty()), "edge didn't reset");
546 }
547 REMARK(" done\n");
548}
549
550// indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
551// never allows a successor to reverse its edge, so we only need test the successors.
552void
553TestIndexerNode() {
554 tbb::flow::graph g;
555 typedef tbb::flow::indexer_node< int, int > indexernode_type;
556 indexernode_type inode(g);
557 REMARK("Testing indexer_node:");
558 tbb::flow::queue_node<indexernode_type::output_type> qout(g);
559 tbb::flow::make_edge(inode,qout);
560 g.wait_for_all();
561 ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing");
562 g.reset();
563 ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing after reset");
564 g.reset(tbb::flow::rf_clear_edges);
565 ASSERT(inode.my_successors.empty(), "successor of indexer_node not removed by reset(rf_clear_edges)");
566 REMARK(" done\n");
567}
568
569template<typename Node>
570void
571TestScalarNode(const char *name) {
572 tbb::flow::graph g;
573 Node on(g);
574 tbb::flow::queue_node<int> qout(g);
575 REMARK("Testing %s:", name);
576 tbb::flow::make_edge(on,qout);
577 g.wait_for_all();
578 ASSERT(!on.my_successors.empty(), "edge not added");
579 g.reset();
580 ASSERT(!on.my_successors.empty(), "edge improperly removed");
581 g.reset(tbb::flow::rf_clear_edges);
582 ASSERT(on.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
583 REMARK(" done\n");
584}
585
586struct seq_body {
587 size_t operator()(const int &in) {
588 return size_t(in / 3);
589 }
590};
591
592// sequencer_node behaves like a queueing node, but requires a different constructor.
593void
594TestSequencerNode() {
595 tbb::flow::graph g;
596 tbb::flow::sequencer_node<int> bnode(g, seq_body());
597 REMARK("Testing sequencer_node:");
598 tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
599 REMARK("Testing sequencer_node:");
600 serial_fn_state0 = 0; // reset to waiting state.
601 REMARK(" make_edge");
602 tbb::flow::make_edge(bnode, fnode);
603 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
604 REMARK(" try_put");
605 bnode.try_put(0); // will forward to the fnode
606 BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node"); // wait for the function_node to fire up
607 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
608 serial_fn_state0 = 0;
609 g.wait_for_all();
610 REMARK(" remove_edge");
611 tbb::flow::remove_edge(bnode, fnode);
612 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
613 tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
614 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task
615 g.wait_for_all();
616 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
617 REMARK(" reverse");
618 bnode.try_put(3); // the edge should reverse
619 g.wait_for_all();
620 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
621 REMARK(" reset()");
622 g.wait_for_all();
623 g.reset(); // should be in forward direction again
624 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
625 REMARK(" remove_edge");
626 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again
627 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
628 ASSERT(fnode.my_predecessors.empty(), "buffering node reversed after reset(rf_clear_edges)");
629 REMARK(" done\n");
630 g.wait_for_all();
631}
632
633struct snode_body {
634 int max_cnt;
635 int my_cnt;
636 snode_body( const int &in) : max_cnt(in) { my_cnt = 0; }
637 bool operator()(int &out) {
638 if(max_cnt <= my_cnt++) return false;
639 out = my_cnt;
640 return true;
641 }
642};
643
644void
645TestSourceNode() {
646 tbb::flow::graph g;
647 tbb::flow::source_node<int> sn(g, snode_body(4), false);
648 REMARK("Testing source_node:");
649 tbb::flow::queue_node<int> qin(g);
650 tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> jn(g);
651 tbb::flow::queue_node<tbb::flow::tuple<int,int> > qout(g);
652
653 REMARK(" make_edges");
654 tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
655 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
656 tbb::flow::make_edge(jn,qout);
657 ASSERT(!sn.my_successors.empty(), "source node has no successor after make_edge");
658 g.wait_for_all();
659 g.reset();
660 ASSERT(!sn.my_successors.empty(), "source node has no successor after reset");
661 g.wait_for_all();
662 g.reset(tbb::flow::rf_clear_edges);
663 ASSERT(sn.my_successors.empty(), "source node has successor after reset(rf_clear_edges)");
664 tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
665 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
666 tbb::flow::make_edge(jn,qout);
667 g.wait_for_all();
668 REMARK(" activate");
669 sn.activate(); // will forward to the fnode
670 REMARK(" wait1");
671 BACKOFF_WAIT( !sn.my_successors.empty(), "Timed out waiting for edge to reverse");
672 ASSERT(sn.my_successors.empty(), "source node has no successor after forwarding message");
673
674 g.wait_for_all();
675 g.reset();
676 ASSERT(!sn.my_successors.empty(), "source_node has no successors after reset");
677 ASSERT(tbb::flow::input_port<0>(jn).my_predecessors.empty(), "successor if source_node has pred after reset.");
678 REMARK(" done\n");
679}
680
681int TestMain() {
682
683 if(MinThread < 3) MinThread = 3;
684 tbb::task_scheduler_init init(MinThread); // tests presume at least three threads
685
686 TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
687 TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
688 TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
689 TestSequencerNode();
690
691 TestMultifunctionNode<tbb::flow::rejecting>();
692 TestMultifunctionNode<tbb::flow::queueing>();
693 TestSourceNode();
694 TestContinueNode();
695 TestFunctionNode();
696
697 TestJoinNode();
698
699 TestLimiterNode();
700 TestIndexerNode();
701 TestSplitNode();
702 TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
703 TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
704 TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
705
706 return Harness::Done;
707}
708
709