1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #define HARNESS_DEFAULT_MIN_THREADS 3 |
18 | #define HARNESS_DEFAULT_MAX_THREADS 4 |
19 | |
20 | #if _MSC_VER |
21 | #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning |
22 | #if _MSC_VER==1700 && !defined(__INTEL_COMPILER) |
23 | // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012) |
24 | #pragma warning (disable: 4702) |
25 | #endif |
26 | #endif |
27 | |
28 | #include "harness.h" |
29 | #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations) |
30 | |
31 | // need these to get proper external names for private methods in library. |
32 | #include "tbb/spin_mutex.h" |
33 | #include "tbb/spin_rw_mutex.h" |
34 | #include "tbb/task.h" |
35 | #include "tbb/task_arena.h" |
36 | |
37 | #define private public |
38 | #define protected public |
39 | #include "tbb/flow_graph.h" |
40 | #undef protected |
41 | #undef private |
42 | #include "tbb/task_scheduler_init.h" |
43 | #include "harness_graph.h" |
44 | |
45 | template<typename T> |
46 | struct receiverBody { |
47 | tbb::flow::continue_msg operator()(const T &/*in*/) { |
48 | return tbb::flow::continue_msg(); |
49 | } |
50 | }; |
51 | |
52 | // split_nodes cannot have predecessors |
53 | // they do not reject messages and always forward. |
54 | // they reject edge reversals from successors. |
55 | void TestSplitNode() { |
56 | typedef tbb::flow::split_node<tbb::flow::tuple<int> > snode_type; |
57 | tbb::flow::graph g; |
58 | snode_type snode(g); |
59 | tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>()); |
60 | REMARK("Testing split_node\n" ); |
61 | ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "Constructed split_node has successors" ); |
62 | // tbb::flow::output_port<0>(snode) |
63 | tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr); |
64 | ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after make_edge, split_node has no successor." ); |
65 | snode.try_put(tbb::flow::tuple<int>(1)); |
66 | g.wait_for_all(); |
67 | g.reset(); |
68 | ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(), split_node has no successor." ); |
69 | g.reset(tbb::flow::rf_clear_edges); |
70 | ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "after reset(rf_clear_edges), split_node has a successor." ); |
71 | } |
72 | |
73 | // buffering nodes cannot have predecessors |
74 | // they do not reject messages and always save or forward |
75 | // they allow edge reversals from successors |
76 | template< typename B > |
77 | void TestBufferingNode(const char * name) { |
78 | tbb::flow::graph g; |
79 | B bnode(g); |
80 | tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
81 | REMARK("Testing %s:" , name); |
82 | for(int icnt = 0; icnt < 2; icnt++) { |
83 | bool reverse_edge = (icnt & 0x2) != 0; |
84 | serial_fn_state0 = 0; // reset to waiting state. |
85 | REMARK(" make_edge" ); |
86 | tbb::flow::make_edge(bnode, fnode); |
87 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge" ); |
88 | REMARK(" try_put" ); |
89 | bnode.try_put(1); // will forward to the fnode |
90 | BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put" ); |
91 | if(reverse_edge) { |
92 | REMARK(" try_put2" ); |
93 | bnode.try_put(2); // will reverse the edge |
94 | // cannot do a wait_for_all here; the function_node is still executing |
95 | BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put" ); |
96 | // at this point the only task running is the one for the function_node. |
97 | ASSERT(bnode.my_successors.empty(), "successor not removed" ); |
98 | } |
99 | else { |
100 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message" ); |
101 | } |
102 | serial_fn_state0 = 0; // release the function_node. |
103 | if(reverse_edge) { |
104 | // have to do a second release because the function_node will get the 2nd item |
105 | BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put" ); |
106 | serial_fn_state0 = 0; // release the function_node. |
107 | } |
108 | g.wait_for_all(); |
109 | REMARK(" remove_edge" ); |
110 | tbb::flow::remove_edge(bnode, fnode); |
111 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge" ); |
112 | } |
113 | tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g); |
114 | tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task |
115 | g.wait_for_all(); |
116 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join" ); |
117 | REMARK(" reverse" ); |
118 | bnode.try_put(1); // the edge should reverse |
119 | g.wait_for_all(); |
120 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving" ); |
121 | REMARK(" reset()" ); |
122 | g.wait_for_all(); |
123 | g.reset(); // should be in forward direction again |
124 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()" ); |
125 | REMARK(" remove_edge" ); |
126 | g.reset(tbb::flow::rf_clear_edges); |
127 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)" ); |
128 | tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again |
129 | // reverse edge by adding to buffer. |
130 | bnode.try_put(1); // the edge should reverse |
131 | g.wait_for_all(); |
132 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving" ); |
133 | REMARK(" remove_edge(reversed)" ); |
134 | g.reset(tbb::flow::rf_clear_edges); |
135 | ASSERT(bnode.my_successors.empty(), "buffering node has no successor after reset()" ); |
136 | ASSERT(tbb::flow::input_port<0>(jnode).my_predecessors.empty(), "predecessor not reset" ); |
137 | REMARK(" done\n" ); |
138 | g.wait_for_all(); |
139 | } |
140 | |
141 | // continue_node has only predecessor count |
142 | // they do not have predecessors, only the counts |
143 | // successor edges cannot be reversed |
144 | void TestContinueNode() { |
145 | tbb::flow::graph g; |
146 | tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
147 | tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0)); |
148 | tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1)); |
149 | tbb::flow::make_edge(fnode0, cnode); |
150 | tbb::flow::make_edge(cnode, fnode1); |
151 | REMARK("Testing continue_node:" ); |
152 | for( int icnt = 0; icnt < 2; ++icnt ) { |
153 | REMARK( " initial%d" , icnt); |
154 | ASSERT(cnode.my_predecessor_count == 2, "predecessor addition didn't increment count" ); |
155 | ASSERT(!cnode.successors().empty(), "successors empty though we added one" ); |
156 | ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect" ); |
157 | serial_continue_state0 = 0; |
158 | serial_fn_state0 = 0; |
159 | serial_fn_state1 = 0; |
160 | |
161 | fnode0.try_put(1); // start the first function node. |
162 | BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start" ); |
163 | // Now the body of function_node 0 is executing. |
164 | serial_fn_state0 = 0; // release the node |
165 | // wait for node to count the message (or for the node body to execute, which would be wrong) |
166 | BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change" ); |
167 | ASSERT(serial_continue_state0 == 0, "Improperly released continue_node" ); |
168 | ASSERT(cnode.my_current_count == 1, "state of continue_receiver incorrect" ); |
169 | if(icnt == 0) { // first time through, let the continue_node fire |
170 | REMARK(" firing" ); |
171 | fnode0.try_put(1); // second message |
172 | BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute" ); |
173 | // Now the body of function_node 0 is executing. |
174 | serial_fn_state0 = 0; // release the node |
175 | |
176 | BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start" ); // now we wait for the continue_node. |
177 | ASSERT(cnode.my_current_count == 0, " my_current_count not reset before body of continue_node started" ); |
178 | serial_continue_state0 = 0; // release the continue_node |
179 | BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start" ); // wait for the successor function_node to enter body |
180 | serial_fn_state1 = 0; // release successor function_node. |
181 | g.wait_for_all(); |
182 | |
183 | // try a try_get() |
184 | { |
185 | int i; |
186 | ASSERT(!cnode.try_get(i), "try_get not rejected" ); |
187 | } |
188 | |
189 | REMARK(" reset" ); |
190 | ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)" ); |
191 | ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)" ); |
192 | g.reset(); // should still be the same |
193 | ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (after reset)" ); |
194 | ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (after reset)" ); |
195 | } |
196 | else { // we're going to see if the rf_clear_edges resets things. |
197 | g.wait_for_all(); |
198 | REMARK(" reset(rf_clear_edges)" ); |
199 | ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)" ); |
200 | ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)" ); |
201 | g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again |
202 | ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect after reset(rf_clear_edges)" ); |
203 | ASSERT(cnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)" ); |
204 | ASSERT(cnode.my_predecessor_count == cnode.my_initial_predecessor_count, "predecessor count not reset" ); |
205 | } |
206 | } |
207 | |
208 | REMARK(" done\n" ); |
209 | |
210 | } |
211 | |
212 | // function_node has predecessors and successors |
213 | // try_get() rejects |
214 | // successor edges cannot be reversed |
215 | // predecessors will reverse (only rejecting will reverse) |
216 | void TestFunctionNode() { |
217 | tbb::flow::graph g; |
218 | tbb::flow::queue_node<int> qnode0(g); |
219 | tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
220 | // queueing function node |
221 | tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
222 | |
223 | tbb::flow::queue_node<int> qnode1(g); |
224 | |
225 | tbb::flow::make_edge(fnode0, qnode1); |
226 | tbb::flow::make_edge(qnode0, fnode0); |
227 | |
228 | serial_fn_state0 = 2; // just let it go |
229 | // see if the darned thing will work.... |
230 | qnode0.try_put(1); |
231 | g.wait_for_all(); |
232 | int ii; |
233 | ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed" ); |
234 | tbb::flow::remove_edge(qnode0, fnode0); |
235 | tbb::flow::remove_edge(fnode0, qnode1); |
236 | |
237 | tbb::flow::make_edge(fnode1, qnode1); |
238 | tbb::flow::make_edge(qnode0, fnode1); |
239 | |
240 | serial_fn_state0 = 2; // just let it go |
241 | // see if the darned thing will work.... |
242 | qnode0.try_put(1); |
243 | g.wait_for_all(); |
244 | ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed" ); |
245 | tbb::flow::remove_edge(qnode0, fnode1); |
246 | tbb::flow::remove_edge(fnode1, qnode1); |
247 | |
248 | // rejecting |
249 | serial_fn_state0 = 0; |
250 | tbb::flow::make_edge(fnode0, qnode1); |
251 | tbb::flow::make_edge(qnode0, fnode0); |
252 | REMARK("Testing rejecting function_node:" ); |
253 | ASSERT(!fnode0.my_queue, "node should have no queue" ); |
254 | ASSERT(!fnode0.my_successors.empty(), "successor edge not added" ); |
255 | qnode0.try_put(1); |
256 | BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start" ); |
257 | qnode0.try_put(2); // rejecting node should reject, reverse. |
258 | BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---" ); |
259 | serial_fn_state0 = 2; // release function_node body. |
260 | g.wait_for_all(); |
261 | REMARK(" reset" ); |
262 | g.reset(); // should reverse the edge from the input to the function node. |
263 | ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()" ); |
264 | ASSERT(fnode0.my_predecessors.empty(), "predecessor not reversed" ); |
265 | tbb::flow::remove_edge(qnode0, fnode0); |
266 | tbb::flow::remove_edge(fnode0, qnode1); |
267 | REMARK("\n" ); |
268 | |
269 | // queueing |
270 | tbb::flow::make_edge(fnode1, qnode1); |
271 | REMARK("Testing queueing function_node:" ); |
272 | ASSERT(fnode1.my_queue, "node should have no queue" ); |
273 | ASSERT(!fnode1.my_successors.empty(), "successor edge not added" ); |
274 | REMARK(" add_pred" ); |
275 | ASSERT(fnode1.register_predecessor(qnode0), "Cannot register as predecessor" ); |
276 | ASSERT(!fnode1.my_predecessors.empty(), "Missing predecessor" ); |
277 | REMARK(" reset" ); |
278 | g.wait_for_all(); |
279 | g.reset(); // should reverse the edge from the input to the function node. |
280 | ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()" ); |
281 | ASSERT(fnode1.my_predecessors.empty(), "predecessor not reversed" ); |
282 | tbb::flow::remove_edge(qnode0, fnode1); |
283 | tbb::flow::remove_edge(fnode1, qnode1); |
284 | REMARK("\n" ); |
285 | |
286 | serial_fn_state0 = 0; // make the function_node wait |
287 | tbb::flow::make_edge(qnode0, fnode0); |
288 | REMARK(" start_func" ); |
289 | qnode0.try_put(1); |
290 | BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put" ); |
291 | // now if we put an item to the queues the edges to the function_node will reverse. |
292 | REMARK(" put_node(2)" ); |
293 | qnode0.try_put(2); // start queue node. |
294 | // wait for the edges to reverse |
295 | BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting" ); |
296 | ASSERT(!fnode0.my_predecessors.empty(), "function_node edge not reversed" ); |
297 | g.my_root_task->cancel_group_execution(); |
298 | // release the function_node |
299 | serial_fn_state0 = 2; |
300 | g.wait_for_all(); |
301 | ASSERT(!fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not reversed" ); |
302 | g.reset(tbb::flow::rf_clear_edges); |
303 | ASSERT(fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not removed" ); |
304 | ASSERT(fnode0.my_successors.empty(), "successor to fnode not removed" ); |
305 | REMARK(" done\n" ); |
306 | } |
307 | |
308 | template<typename TT> |
309 | class tag_func { |
310 | TT my_mult; |
311 | public: |
312 | tag_func(TT multiplier) : my_mult(multiplier) { } |
313 | void operator=( const tag_func& other){my_mult = other.my_mult;} |
314 | // operator() will return [0 .. Count) |
315 | tbb::flow::tag_value operator()( TT v) { |
316 | tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); |
317 | return t; |
318 | } |
319 | }; |
320 | |
321 | template<typename JNODE_TYPE> |
322 | void |
323 | TestSimpleSuccessorArc(const char *name) { |
324 | tbb::flow::graph g; |
325 | { |
326 | REMARK("Join<%s> successor test " , name); |
327 | tbb::flow::join_node<tbb::flow::tuple<int>, JNODE_TYPE> qj(g); |
328 | tbb::flow::broadcast_node<tbb::flow::tuple<int> > bnode(g); |
329 | tbb::flow::make_edge(qj, bnode); |
330 | ASSERT(!qj.my_successors.empty(),"successor missing after linking" ); |
331 | g.reset(); |
332 | ASSERT(!qj.my_successors.empty(),"successor missing after reset()" ); |
333 | g.reset(tbb::flow::rf_clear_edges); |
334 | ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)" ); |
335 | } |
336 | } |
337 | |
338 | template<> |
339 | void |
340 | TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) { |
341 | tbb::flow::graph g; |
342 | { |
343 | REMARK("Join<%s> successor test " , name); |
344 | typedef tbb::flow::tuple<int,int> my_tuple; |
345 | tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g, |
346 | tag_func<int>(1), |
347 | tag_func<int>(1) |
348 | ); |
349 | tbb::flow::broadcast_node<my_tuple > bnode(g); |
350 | tbb::flow::make_edge(qj, bnode); |
351 | ASSERT(!qj.my_successors.empty(),"successor missing after linking" ); |
352 | g.reset(); |
353 | ASSERT(!qj.my_successors.empty(),"successor missing after reset()" ); |
354 | g.reset(tbb::flow::rf_clear_edges); |
355 | ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)" ); |
356 | } |
357 | } |
358 | |
359 | void |
360 | TestJoinNode() { |
361 | tbb::flow::graph g; |
362 | |
363 | TestSimpleSuccessorArc<tbb::flow::queueing>("queueing" ); |
364 | TestSimpleSuccessorArc<tbb::flow::reserving>("reserving" ); |
365 | TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching" ); |
366 | |
367 | // queueing and tagging join nodes have input queues, so the input ports do not reverse. |
368 | REMARK(" reserving preds" ); |
369 | { |
370 | tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> rj(g); |
371 | tbb::flow::queue_node<int> q0(g); |
372 | tbb::flow::queue_node<int> q1(g); |
373 | tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj)); |
374 | tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj)); |
375 | q0.try_put(1); |
376 | g.wait_for_all(); // quiesce |
377 | ASSERT(!(tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port missing predecessor" ); |
378 | ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred" ); |
379 | g.reset(); |
380 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port has pred after reset()" ); |
381 | ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()" ); |
382 | q1.try_put(2); |
383 | g.wait_for_all(); // quiesce |
384 | ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor" ); |
385 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred" ); |
386 | g.reset(); |
387 | ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()" ); |
388 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()" ); |
389 | // should reset predecessors just as regular reset. |
390 | q1.try_put(3); |
391 | g.wait_for_all(); // quiesce |
392 | ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor" ); |
393 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred" ); |
394 | g.reset(tbb::flow::rf_clear_edges); |
395 | ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()" ); |
396 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()" ); |
397 | ASSERT(q0.my_successors.empty(), "edge not removed by reset(rf_clear_edges)" ); |
398 | ASSERT(q1.my_successors.empty(), "edge not removed by reset(rf_clear_edges)" ); |
399 | } |
400 | REMARK(" done\n" ); |
401 | } |
402 | |
403 | void |
404 | TestLimiterNode() { |
405 | int out_int; |
406 | tbb::flow::graph g; |
407 | tbb::flow::limiter_node<int> ln(g,1); |
408 | REMARK("Testing limiter_node: preds and succs" ); |
409 | ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count" ); |
410 | ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count" ); |
411 | ASSERT(ln.decrement.my_current_count == 0, "error in current count" ); |
412 | #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR |
413 | ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors" ); |
414 | #endif |
415 | ASSERT(ln.my_threshold == 1, "error in my_threshold" ); |
416 | tbb::flow::queue_node<int> inq(g); |
417 | tbb::flow::queue_node<int> outq(g); |
418 | tbb::flow::broadcast_node<tbb::flow::continue_msg> bn(g); |
419 | |
420 | tbb::flow::make_edge(inq,ln); |
421 | tbb::flow::make_edge(ln,outq); |
422 | tbb::flow::make_edge(bn,ln.decrement); |
423 | |
424 | g.wait_for_all(); |
425 | ASSERT(!(ln.my_successors.empty()),"successors empty after make_edge" ); |
426 | ASSERT(ln.my_predecessors.empty(), "input edge reversed" ); |
427 | inq.try_put(1); |
428 | g.wait_for_all(); |
429 | ASSERT(outq.try_get(out_int) && out_int == 1, "limiter_node didn't pass first value" ); |
430 | ASSERT(ln.my_predecessors.empty(), "input edge reversed" ); |
431 | inq.try_put(2); |
432 | g.wait_for_all(); |
433 | ASSERT(!outq.try_get(out_int), "limiter_node incorrectly passed second input" ); |
434 | ASSERT(!ln.my_predecessors.empty(), "input edge to limiter_node not reversed" ); |
435 | bn.try_put(tbb::flow::continue_msg()); |
436 | g.wait_for_all(); |
437 | ASSERT(outq.try_get(out_int) && out_int == 2, "limiter_node didn't pass second value" ); |
438 | g.wait_for_all(); |
439 | ASSERT(!ln.my_predecessors.empty(), "input edge was reversed(after try_get())" ); |
440 | g.reset(); |
441 | ASSERT(ln.my_predecessors.empty(), "input edge not reset" ); |
442 | inq.try_put(3); |
443 | g.wait_for_all(); |
444 | ASSERT(outq.try_get(out_int) && out_int == 3, "limiter_node didn't pass third value" ); |
445 | |
446 | REMARK(" rf_clear_edges" ); |
447 | // currently the limiter_node will not pass another message |
448 | g.reset(tbb::flow::rf_clear_edges); |
449 | ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count" ); |
450 | ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count" ); |
451 | ASSERT(ln.decrement.my_current_count == 0, "error in current count" ); |
452 | #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR |
453 | ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors" ); |
454 | #endif |
455 | ASSERT(ln.my_threshold == 1, "error in my_threshold" ); |
456 | ASSERT(ln.my_predecessors.empty(), "preds not reset(rf_clear_edges)" ); |
457 | ASSERT(ln.my_successors.empty(), "preds not reset(rf_clear_edges)" ); |
458 | ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)" ); |
459 | ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)" ); |
460 | ASSERT(bn.my_successors.empty(), "control edge not removed on reset(rf_clear_edges)" ); |
461 | tbb::flow::make_edge(inq,ln); |
462 | tbb::flow::make_edge(ln,outq); |
463 | inq.try_put(4); |
464 | inq.try_put(5); |
465 | g.wait_for_all(); |
466 | ASSERT(outq.try_get(out_int),"missing output after reset(rf_clear_edges)" ); |
467 | ASSERT(out_int == 4, "input incorrect (4)" ); |
468 | bn.try_put(tbb::flow::continue_msg()); |
469 | g.wait_for_all(); |
470 | ASSERT(!outq.try_get(out_int),"second output incorrectly passed (rf_clear_edges)" ); |
471 | REMARK(" done\n" ); |
472 | } |
473 | |
474 | template<typename MF_TYPE> |
475 | struct mf_body { |
476 | tbb::atomic<int> *_flag; |
477 | mf_body( tbb::atomic<int> &myatomic) : _flag(&myatomic) { } |
478 | void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) { |
479 | if(*_flag == 0) { |
480 | *_flag = 1; |
481 | BACKOFF_WAIT(*_flag == 1, "multifunction_node not released" ); |
482 | } |
483 | |
484 | if(in & 0x1) tbb::flow::get<1>(outports).try_put(in); |
485 | else tbb::flow::get<0>(outports).try_put(in); |
486 | } |
487 | }; |
488 | |
489 | template<typename P, typename T> |
490 | struct test_reversal; |
491 | template<typename T> |
492 | struct test_reversal<tbb::flow::queueing, T> { |
493 | test_reversal() { REMARK("<queueing>" ); } |
494 | // queueing node will not reverse. |
495 | bool operator()( T &node) { return node.my_predecessors.empty(); } |
496 | }; |
497 | |
498 | template<typename T> |
499 | struct test_reversal<tbb::flow::rejecting, T> { |
500 | test_reversal() { REMARK("<rejecting>" ); } |
501 | bool operator()( T &node) { return !node.my_predecessors.empty(); } |
502 | }; |
503 | |
504 | template<typename P> |
505 | void |
506 | TestMultifunctionNode() { |
507 | typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int>, P> multinode_type; |
508 | REMARK("Testing multifunction_node" ); |
509 | test_reversal<P,multinode_type> my_test; |
510 | REMARK(":" ); |
511 | tbb::flow::graph g; |
512 | multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0)); |
513 | tbb::flow::queue_node<int> qin(g); |
514 | tbb::flow::queue_node<int> qodd_out(g); |
515 | tbb::flow::queue_node<int> qeven_out(g); |
516 | tbb::flow::make_edge(qin,mf); |
517 | tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out); |
518 | tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out); |
519 | g.wait_for_all(); |
520 | for( int ii = 0; ii < 2 ; ++ii) { |
521 | serial_fn_state0 = 0; |
522 | if(ii == 0) REMARK(" reset preds" ); else REMARK(" 2nd" ); |
523 | qin.try_put(0); |
524 | // wait for node to be active |
525 | BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put" ); |
526 | qin.try_put(1); |
527 | BACKOFF_WAIT((!my_test(mf)), "Timed out waiting" ); |
528 | ASSERT(my_test(mf), "fail second put test" ); |
529 | g.my_root_task->cancel_group_execution(); |
530 | // release node |
531 | serial_fn_state0 = 2; |
532 | g.wait_for_all(); |
533 | ASSERT(my_test(mf), "fail cancel group test" ); |
534 | if( ii == 1) { |
535 | REMARK(" rf_clear_edges" ); |
536 | g.reset(tbb::flow::rf_clear_edges); |
537 | ASSERT(tbb::flow::output_port<0>(mf).my_successors.empty(), "output_port<0> not reset (rf_clear_edges)" ); |
538 | ASSERT(tbb::flow::output_port<1>(mf).my_successors.empty(), "output_port<1> not reset (rf_clear_edges)" ); |
539 | } |
540 | else |
541 | { |
542 | g.reset(); |
543 | } |
544 | ASSERT(mf.my_predecessors.empty(), "edge didn't reset" ); |
545 | ASSERT((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty()), "edge didn't reset" ); |
546 | } |
547 | REMARK(" done\n" ); |
548 | } |
549 | |
550 | // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it |
551 | // never allows a successor to reverse its edge, so we only need test the successors. |
552 | void |
553 | TestIndexerNode() { |
554 | tbb::flow::graph g; |
555 | typedef tbb::flow::indexer_node< int, int > indexernode_type; |
556 | indexernode_type inode(g); |
557 | REMARK("Testing indexer_node:" ); |
558 | tbb::flow::queue_node<indexernode_type::output_type> qout(g); |
559 | tbb::flow::make_edge(inode,qout); |
560 | g.wait_for_all(); |
561 | ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing" ); |
562 | g.reset(); |
563 | ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing after reset" ); |
564 | g.reset(tbb::flow::rf_clear_edges); |
565 | ASSERT(inode.my_successors.empty(), "successor of indexer_node not removed by reset(rf_clear_edges)" ); |
566 | REMARK(" done\n" ); |
567 | } |
568 | |
569 | template<typename Node> |
570 | void |
571 | TestScalarNode(const char *name) { |
572 | tbb::flow::graph g; |
573 | Node on(g); |
574 | tbb::flow::queue_node<int> qout(g); |
575 | REMARK("Testing %s:" , name); |
576 | tbb::flow::make_edge(on,qout); |
577 | g.wait_for_all(); |
578 | ASSERT(!on.my_successors.empty(), "edge not added" ); |
579 | g.reset(); |
580 | ASSERT(!on.my_successors.empty(), "edge improperly removed" ); |
581 | g.reset(tbb::flow::rf_clear_edges); |
582 | ASSERT(on.my_successors.empty(), "edge not removed by reset(rf_clear_edges)" ); |
583 | REMARK(" done\n" ); |
584 | } |
585 | |
586 | struct seq_body { |
587 | size_t operator()(const int &in) { |
588 | return size_t(in / 3); |
589 | } |
590 | }; |
591 | |
592 | // sequencer_node behaves like a queueing node, but requires a different constructor. |
593 | void |
594 | TestSequencerNode() { |
595 | tbb::flow::graph g; |
596 | tbb::flow::sequencer_node<int> bnode(g, seq_body()); |
597 | REMARK("Testing sequencer_node:" ); |
598 | tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
599 | REMARK("Testing sequencer_node:" ); |
600 | serial_fn_state0 = 0; // reset to waiting state. |
601 | REMARK(" make_edge" ); |
602 | tbb::flow::make_edge(bnode, fnode); |
603 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge" ); |
604 | REMARK(" try_put" ); |
605 | bnode.try_put(0); // will forward to the fnode |
606 | BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node" ); // wait for the function_node to fire up |
607 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message" ); |
608 | serial_fn_state0 = 0; |
609 | g.wait_for_all(); |
610 | REMARK(" remove_edge" ); |
611 | tbb::flow::remove_edge(bnode, fnode); |
612 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge" ); |
613 | tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g); |
614 | tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task |
615 | g.wait_for_all(); |
616 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join" ); |
617 | REMARK(" reverse" ); |
618 | bnode.try_put(3); // the edge should reverse |
619 | g.wait_for_all(); |
620 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving" ); |
621 | REMARK(" reset()" ); |
622 | g.wait_for_all(); |
623 | g.reset(); // should be in forward direction again |
624 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()" ); |
625 | REMARK(" remove_edge" ); |
626 | g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again |
627 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)" ); |
628 | ASSERT(fnode.my_predecessors.empty(), "buffering node reversed after reset(rf_clear_edges)" ); |
629 | REMARK(" done\n" ); |
630 | g.wait_for_all(); |
631 | } |
632 | |
633 | struct snode_body { |
634 | int max_cnt; |
635 | int my_cnt; |
636 | snode_body( const int &in) : max_cnt(in) { my_cnt = 0; } |
637 | bool operator()(int &out) { |
638 | if(max_cnt <= my_cnt++) return false; |
639 | out = my_cnt; |
640 | return true; |
641 | } |
642 | }; |
643 | |
644 | void |
645 | TestSourceNode() { |
646 | tbb::flow::graph g; |
647 | tbb::flow::source_node<int> sn(g, snode_body(4), false); |
648 | REMARK("Testing source_node:" ); |
649 | tbb::flow::queue_node<int> qin(g); |
650 | tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> jn(g); |
651 | tbb::flow::queue_node<tbb::flow::tuple<int,int> > qout(g); |
652 | |
653 | REMARK(" make_edges" ); |
654 | tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn)); |
655 | tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); |
656 | tbb::flow::make_edge(jn,qout); |
657 | ASSERT(!sn.my_successors.empty(), "source node has no successor after make_edge" ); |
658 | g.wait_for_all(); |
659 | g.reset(); |
660 | ASSERT(!sn.my_successors.empty(), "source node has no successor after reset" ); |
661 | g.wait_for_all(); |
662 | g.reset(tbb::flow::rf_clear_edges); |
663 | ASSERT(sn.my_successors.empty(), "source node has successor after reset(rf_clear_edges)" ); |
664 | tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn)); |
665 | tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); |
666 | tbb::flow::make_edge(jn,qout); |
667 | g.wait_for_all(); |
668 | REMARK(" activate" ); |
669 | sn.activate(); // will forward to the fnode |
670 | REMARK(" wait1" ); |
671 | BACKOFF_WAIT( !sn.my_successors.empty(), "Timed out waiting for edge to reverse" ); |
672 | ASSERT(sn.my_successors.empty(), "source node has no successor after forwarding message" ); |
673 | |
674 | g.wait_for_all(); |
675 | g.reset(); |
676 | ASSERT(!sn.my_successors.empty(), "source_node has no successors after reset" ); |
677 | ASSERT(tbb::flow::input_port<0>(jn).my_predecessors.empty(), "successor if source_node has pred after reset." ); |
678 | REMARK(" done\n" ); |
679 | } |
680 | |
681 | int TestMain() { |
682 | |
683 | if(MinThread < 3) MinThread = 3; |
684 | tbb::task_scheduler_init init(MinThread); // tests presume at least three threads |
685 | |
686 | TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node" ); |
687 | TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node" ); |
688 | TestBufferingNode< tbb::flow::queue_node<int> >("queue_node" ); |
689 | TestSequencerNode(); |
690 | |
691 | TestMultifunctionNode<tbb::flow::rejecting>(); |
692 | TestMultifunctionNode<tbb::flow::queueing>(); |
693 | TestSourceNode(); |
694 | TestContinueNode(); |
695 | TestFunctionNode(); |
696 | |
697 | TestJoinNode(); |
698 | |
699 | TestLimiterNode(); |
700 | TestIndexerNode(); |
701 | TestSplitNode(); |
702 | TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node" ); |
703 | TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node" ); |
704 | TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node" ); |
705 | |
706 | return Harness::Done; |
707 | } |
708 | |
709 | |