| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #define HARNESS_DEFAULT_MIN_THREADS 3 |
| 18 | #define HARNESS_DEFAULT_MAX_THREADS 4 |
| 19 | |
| 20 | #if _MSC_VER |
| 21 | #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning |
| 22 | #if _MSC_VER==1700 && !defined(__INTEL_COMPILER) |
| 23 | // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012) |
| 24 | #pragma warning (disable: 4702) |
| 25 | #endif |
| 26 | #endif |
| 27 | |
| 28 | #include "harness.h" |
| 29 | #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations) |
| 30 | |
| 31 | // need these to get proper external names for private methods in library. |
| 32 | #include "tbb/spin_mutex.h" |
| 33 | #include "tbb/spin_rw_mutex.h" |
| 34 | #include "tbb/task.h" |
| 35 | #include "tbb/task_arena.h" |
| 36 | |
| 37 | #define private public |
| 38 | #define protected public |
| 39 | #include "tbb/flow_graph.h" |
| 40 | #undef protected |
| 41 | #undef private |
| 42 | #include "tbb/task_scheduler_init.h" |
| 43 | #include "harness_graph.h" |
| 44 | |
| 45 | template<typename T> |
| 46 | struct receiverBody { |
| 47 | tbb::flow::continue_msg operator()(const T &/*in*/) { |
| 48 | return tbb::flow::continue_msg(); |
| 49 | } |
| 50 | }; |
| 51 | |
| 52 | // split_nodes cannot have predecessors |
| 53 | // they do not reject messages and always forward. |
| 54 | // they reject edge reversals from successors. |
| 55 | void TestSplitNode() { |
| 56 | typedef tbb::flow::split_node<tbb::flow::tuple<int> > snode_type; |
| 57 | tbb::flow::graph g; |
| 58 | snode_type snode(g); |
| 59 | tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>()); |
| 60 | REMARK("Testing split_node\n" ); |
| 61 | ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "Constructed split_node has successors" ); |
| 62 | // tbb::flow::output_port<0>(snode) |
| 63 | tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr); |
| 64 | ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after make_edge, split_node has no successor." ); |
| 65 | snode.try_put(tbb::flow::tuple<int>(1)); |
| 66 | g.wait_for_all(); |
| 67 | g.reset(); |
| 68 | ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(), split_node has no successor." ); |
| 69 | g.reset(tbb::flow::rf_clear_edges); |
| 70 | ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "after reset(rf_clear_edges), split_node has a successor." ); |
| 71 | } |
| 72 | |
| 73 | // buffering nodes cannot have predecessors |
| 74 | // they do not reject messages and always save or forward |
| 75 | // they allow edge reversals from successors |
| 76 | template< typename B > |
| 77 | void TestBufferingNode(const char * name) { |
| 78 | tbb::flow::graph g; |
| 79 | B bnode(g); |
| 80 | tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
| 81 | REMARK("Testing %s:" , name); |
| 82 | for(int icnt = 0; icnt < 2; icnt++) { |
| 83 | bool reverse_edge = (icnt & 0x2) != 0; |
| 84 | serial_fn_state0 = 0; // reset to waiting state. |
| 85 | REMARK(" make_edge" ); |
| 86 | tbb::flow::make_edge(bnode, fnode); |
| 87 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge" ); |
| 88 | REMARK(" try_put" ); |
| 89 | bnode.try_put(1); // will forward to the fnode |
| 90 | BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put" ); |
| 91 | if(reverse_edge) { |
| 92 | REMARK(" try_put2" ); |
| 93 | bnode.try_put(2); // will reverse the edge |
| 94 | // cannot do a wait_for_all here; the function_node is still executing |
| 95 | BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put" ); |
| 96 | // at this point the only task running is the one for the function_node. |
| 97 | ASSERT(bnode.my_successors.empty(), "successor not removed" ); |
| 98 | } |
| 99 | else { |
| 100 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message" ); |
| 101 | } |
| 102 | serial_fn_state0 = 0; // release the function_node. |
| 103 | if(reverse_edge) { |
| 104 | // have to do a second release because the function_node will get the 2nd item |
| 105 | BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put" ); |
| 106 | serial_fn_state0 = 0; // release the function_node. |
| 107 | } |
| 108 | g.wait_for_all(); |
| 109 | REMARK(" remove_edge" ); |
| 110 | tbb::flow::remove_edge(bnode, fnode); |
| 111 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge" ); |
| 112 | } |
| 113 | tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g); |
| 114 | tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task |
| 115 | g.wait_for_all(); |
| 116 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join" ); |
| 117 | REMARK(" reverse" ); |
| 118 | bnode.try_put(1); // the edge should reverse |
| 119 | g.wait_for_all(); |
| 120 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving" ); |
| 121 | REMARK(" reset()" ); |
| 122 | g.wait_for_all(); |
| 123 | g.reset(); // should be in forward direction again |
| 124 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()" ); |
| 125 | REMARK(" remove_edge" ); |
| 126 | g.reset(tbb::flow::rf_clear_edges); |
| 127 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)" ); |
| 128 | tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again |
| 129 | // reverse edge by adding to buffer. |
| 130 | bnode.try_put(1); // the edge should reverse |
| 131 | g.wait_for_all(); |
| 132 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving" ); |
| 133 | REMARK(" remove_edge(reversed)" ); |
| 134 | g.reset(tbb::flow::rf_clear_edges); |
| 135 | ASSERT(bnode.my_successors.empty(), "buffering node has no successor after reset()" ); |
| 136 | ASSERT(tbb::flow::input_port<0>(jnode).my_predecessors.empty(), "predecessor not reset" ); |
| 137 | REMARK(" done\n" ); |
| 138 | g.wait_for_all(); |
| 139 | } |
| 140 | |
| 141 | // continue_node has only predecessor count |
| 142 | // they do not have predecessors, only the counts |
| 143 | // successor edges cannot be reversed |
| 144 | void TestContinueNode() { |
| 145 | tbb::flow::graph g; |
| 146 | tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
| 147 | tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0)); |
| 148 | tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1)); |
| 149 | tbb::flow::make_edge(fnode0, cnode); |
| 150 | tbb::flow::make_edge(cnode, fnode1); |
| 151 | REMARK("Testing continue_node:" ); |
| 152 | for( int icnt = 0; icnt < 2; ++icnt ) { |
| 153 | REMARK( " initial%d" , icnt); |
| 154 | ASSERT(cnode.my_predecessor_count == 2, "predecessor addition didn't increment count" ); |
| 155 | ASSERT(!cnode.successors().empty(), "successors empty though we added one" ); |
| 156 | ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect" ); |
| 157 | serial_continue_state0 = 0; |
| 158 | serial_fn_state0 = 0; |
| 159 | serial_fn_state1 = 0; |
| 160 | |
| 161 | fnode0.try_put(1); // start the first function node. |
| 162 | BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start" ); |
| 163 | // Now the body of function_node 0 is executing. |
| 164 | serial_fn_state0 = 0; // release the node |
| 165 | // wait for node to count the message (or for the node body to execute, which would be wrong) |
| 166 | BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change" ); |
| 167 | ASSERT(serial_continue_state0 == 0, "Improperly released continue_node" ); |
| 168 | ASSERT(cnode.my_current_count == 1, "state of continue_receiver incorrect" ); |
| 169 | if(icnt == 0) { // first time through, let the continue_node fire |
| 170 | REMARK(" firing" ); |
| 171 | fnode0.try_put(1); // second message |
| 172 | BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute" ); |
| 173 | // Now the body of function_node 0 is executing. |
| 174 | serial_fn_state0 = 0; // release the node |
| 175 | |
| 176 | BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start" ); // now we wait for the continue_node. |
| 177 | ASSERT(cnode.my_current_count == 0, " my_current_count not reset before body of continue_node started" ); |
| 178 | serial_continue_state0 = 0; // release the continue_node |
| 179 | BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start" ); // wait for the successor function_node to enter body |
| 180 | serial_fn_state1 = 0; // release successor function_node. |
| 181 | g.wait_for_all(); |
| 182 | |
| 183 | // try a try_get() |
| 184 | { |
| 185 | int i; |
| 186 | ASSERT(!cnode.try_get(i), "try_get not rejected" ); |
| 187 | } |
| 188 | |
| 189 | REMARK(" reset" ); |
| 190 | ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)" ); |
| 191 | ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)" ); |
| 192 | g.reset(); // should still be the same |
| 193 | ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (after reset)" ); |
| 194 | ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (after reset)" ); |
| 195 | } |
| 196 | else { // we're going to see if the rf_clear_edges resets things. |
| 197 | g.wait_for_all(); |
| 198 | REMARK(" reset(rf_clear_edges)" ); |
| 199 | ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)" ); |
| 200 | ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)" ); |
| 201 | g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again |
| 202 | ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect after reset(rf_clear_edges)" ); |
| 203 | ASSERT(cnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)" ); |
| 204 | ASSERT(cnode.my_predecessor_count == cnode.my_initial_predecessor_count, "predecessor count not reset" ); |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | REMARK(" done\n" ); |
| 209 | |
| 210 | } |
| 211 | |
| 212 | // function_node has predecessors and successors |
| 213 | // try_get() rejects |
| 214 | // successor edges cannot be reversed |
| 215 | // predecessors will reverse (only rejecting will reverse) |
| 216 | void TestFunctionNode() { |
| 217 | tbb::flow::graph g; |
| 218 | tbb::flow::queue_node<int> qnode0(g); |
| 219 | tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
| 220 | // queueing function node |
| 221 | tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
| 222 | |
| 223 | tbb::flow::queue_node<int> qnode1(g); |
| 224 | |
| 225 | tbb::flow::make_edge(fnode0, qnode1); |
| 226 | tbb::flow::make_edge(qnode0, fnode0); |
| 227 | |
| 228 | serial_fn_state0 = 2; // just let it go |
| 229 | // see if the darned thing will work.... |
| 230 | qnode0.try_put(1); |
| 231 | g.wait_for_all(); |
| 232 | int ii; |
| 233 | ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed" ); |
| 234 | tbb::flow::remove_edge(qnode0, fnode0); |
| 235 | tbb::flow::remove_edge(fnode0, qnode1); |
| 236 | |
| 237 | tbb::flow::make_edge(fnode1, qnode1); |
| 238 | tbb::flow::make_edge(qnode0, fnode1); |
| 239 | |
| 240 | serial_fn_state0 = 2; // just let it go |
| 241 | // see if the darned thing will work.... |
| 242 | qnode0.try_put(1); |
| 243 | g.wait_for_all(); |
| 244 | ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed" ); |
| 245 | tbb::flow::remove_edge(qnode0, fnode1); |
| 246 | tbb::flow::remove_edge(fnode1, qnode1); |
| 247 | |
| 248 | // rejecting |
| 249 | serial_fn_state0 = 0; |
| 250 | tbb::flow::make_edge(fnode0, qnode1); |
| 251 | tbb::flow::make_edge(qnode0, fnode0); |
| 252 | REMARK("Testing rejecting function_node:" ); |
| 253 | ASSERT(!fnode0.my_queue, "node should have no queue" ); |
| 254 | ASSERT(!fnode0.my_successors.empty(), "successor edge not added" ); |
| 255 | qnode0.try_put(1); |
| 256 | BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start" ); |
| 257 | qnode0.try_put(2); // rejecting node should reject, reverse. |
| 258 | BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---" ); |
| 259 | serial_fn_state0 = 2; // release function_node body. |
| 260 | g.wait_for_all(); |
| 261 | REMARK(" reset" ); |
| 262 | g.reset(); // should reverse the edge from the input to the function node. |
| 263 | ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()" ); |
| 264 | ASSERT(fnode0.my_predecessors.empty(), "predecessor not reversed" ); |
| 265 | tbb::flow::remove_edge(qnode0, fnode0); |
| 266 | tbb::flow::remove_edge(fnode0, qnode1); |
| 267 | REMARK("\n" ); |
| 268 | |
| 269 | // queueing |
| 270 | tbb::flow::make_edge(fnode1, qnode1); |
| 271 | REMARK("Testing queueing function_node:" ); |
| 272 | ASSERT(fnode1.my_queue, "node should have no queue" ); |
| 273 | ASSERT(!fnode1.my_successors.empty(), "successor edge not added" ); |
| 274 | REMARK(" add_pred" ); |
| 275 | ASSERT(fnode1.register_predecessor(qnode0), "Cannot register as predecessor" ); |
| 276 | ASSERT(!fnode1.my_predecessors.empty(), "Missing predecessor" ); |
| 277 | REMARK(" reset" ); |
| 278 | g.wait_for_all(); |
| 279 | g.reset(); // should reverse the edge from the input to the function node. |
| 280 | ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()" ); |
| 281 | ASSERT(fnode1.my_predecessors.empty(), "predecessor not reversed" ); |
| 282 | tbb::flow::remove_edge(qnode0, fnode1); |
| 283 | tbb::flow::remove_edge(fnode1, qnode1); |
| 284 | REMARK("\n" ); |
| 285 | |
| 286 | serial_fn_state0 = 0; // make the function_node wait |
| 287 | tbb::flow::make_edge(qnode0, fnode0); |
| 288 | REMARK(" start_func" ); |
| 289 | qnode0.try_put(1); |
| 290 | BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put" ); |
| 291 | // now if we put an item to the queues the edges to the function_node will reverse. |
| 292 | REMARK(" put_node(2)" ); |
| 293 | qnode0.try_put(2); // start queue node. |
| 294 | // wait for the edges to reverse |
| 295 | BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting" ); |
| 296 | ASSERT(!fnode0.my_predecessors.empty(), "function_node edge not reversed" ); |
| 297 | g.my_root_task->cancel_group_execution(); |
| 298 | // release the function_node |
| 299 | serial_fn_state0 = 2; |
| 300 | g.wait_for_all(); |
| 301 | ASSERT(!fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not reversed" ); |
| 302 | g.reset(tbb::flow::rf_clear_edges); |
| 303 | ASSERT(fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not removed" ); |
| 304 | ASSERT(fnode0.my_successors.empty(), "successor to fnode not removed" ); |
| 305 | REMARK(" done\n" ); |
| 306 | } |
| 307 | |
| 308 | template<typename TT> |
| 309 | class tag_func { |
| 310 | TT my_mult; |
| 311 | public: |
| 312 | tag_func(TT multiplier) : my_mult(multiplier) { } |
| 313 | void operator=( const tag_func& other){my_mult = other.my_mult;} |
| 314 | // operator() will return [0 .. Count) |
| 315 | tbb::flow::tag_value operator()( TT v) { |
| 316 | tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); |
| 317 | return t; |
| 318 | } |
| 319 | }; |
| 320 | |
| 321 | template<typename JNODE_TYPE> |
| 322 | void |
| 323 | TestSimpleSuccessorArc(const char *name) { |
| 324 | tbb::flow::graph g; |
| 325 | { |
| 326 | REMARK("Join<%s> successor test " , name); |
| 327 | tbb::flow::join_node<tbb::flow::tuple<int>, JNODE_TYPE> qj(g); |
| 328 | tbb::flow::broadcast_node<tbb::flow::tuple<int> > bnode(g); |
| 329 | tbb::flow::make_edge(qj, bnode); |
| 330 | ASSERT(!qj.my_successors.empty(),"successor missing after linking" ); |
| 331 | g.reset(); |
| 332 | ASSERT(!qj.my_successors.empty(),"successor missing after reset()" ); |
| 333 | g.reset(tbb::flow::rf_clear_edges); |
| 334 | ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)" ); |
| 335 | } |
| 336 | } |
| 337 | |
| 338 | template<> |
| 339 | void |
| 340 | TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) { |
| 341 | tbb::flow::graph g; |
| 342 | { |
| 343 | REMARK("Join<%s> successor test " , name); |
| 344 | typedef tbb::flow::tuple<int,int> my_tuple; |
| 345 | tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g, |
| 346 | tag_func<int>(1), |
| 347 | tag_func<int>(1) |
| 348 | ); |
| 349 | tbb::flow::broadcast_node<my_tuple > bnode(g); |
| 350 | tbb::flow::make_edge(qj, bnode); |
| 351 | ASSERT(!qj.my_successors.empty(),"successor missing after linking" ); |
| 352 | g.reset(); |
| 353 | ASSERT(!qj.my_successors.empty(),"successor missing after reset()" ); |
| 354 | g.reset(tbb::flow::rf_clear_edges); |
| 355 | ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)" ); |
| 356 | } |
| 357 | } |
| 358 | |
| 359 | void |
| 360 | TestJoinNode() { |
| 361 | tbb::flow::graph g; |
| 362 | |
| 363 | TestSimpleSuccessorArc<tbb::flow::queueing>("queueing" ); |
| 364 | TestSimpleSuccessorArc<tbb::flow::reserving>("reserving" ); |
| 365 | TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching" ); |
| 366 | |
| 367 | // queueing and tagging join nodes have input queues, so the input ports do not reverse. |
| 368 | REMARK(" reserving preds" ); |
| 369 | { |
| 370 | tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> rj(g); |
| 371 | tbb::flow::queue_node<int> q0(g); |
| 372 | tbb::flow::queue_node<int> q1(g); |
| 373 | tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj)); |
| 374 | tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj)); |
| 375 | q0.try_put(1); |
| 376 | g.wait_for_all(); // quiesce |
| 377 | ASSERT(!(tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port missing predecessor" ); |
| 378 | ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred" ); |
| 379 | g.reset(); |
| 380 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port has pred after reset()" ); |
| 381 | ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()" ); |
| 382 | q1.try_put(2); |
| 383 | g.wait_for_all(); // quiesce |
| 384 | ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor" ); |
| 385 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred" ); |
| 386 | g.reset(); |
| 387 | ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()" ); |
| 388 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()" ); |
| 389 | // should reset predecessors just as regular reset. |
| 390 | q1.try_put(3); |
| 391 | g.wait_for_all(); // quiesce |
| 392 | ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor" ); |
| 393 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred" ); |
| 394 | g.reset(tbb::flow::rf_clear_edges); |
| 395 | ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()" ); |
| 396 | ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()" ); |
| 397 | ASSERT(q0.my_successors.empty(), "edge not removed by reset(rf_clear_edges)" ); |
| 398 | ASSERT(q1.my_successors.empty(), "edge not removed by reset(rf_clear_edges)" ); |
| 399 | } |
| 400 | REMARK(" done\n" ); |
| 401 | } |
| 402 | |
| 403 | void |
| 404 | TestLimiterNode() { |
| 405 | int out_int; |
| 406 | tbb::flow::graph g; |
| 407 | tbb::flow::limiter_node<int> ln(g,1); |
| 408 | REMARK("Testing limiter_node: preds and succs" ); |
| 409 | ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count" ); |
| 410 | ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count" ); |
| 411 | ASSERT(ln.decrement.my_current_count == 0, "error in current count" ); |
| 412 | #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR |
| 413 | ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors" ); |
| 414 | #endif |
| 415 | ASSERT(ln.my_threshold == 1, "error in my_threshold" ); |
| 416 | tbb::flow::queue_node<int> inq(g); |
| 417 | tbb::flow::queue_node<int> outq(g); |
| 418 | tbb::flow::broadcast_node<tbb::flow::continue_msg> bn(g); |
| 419 | |
| 420 | tbb::flow::make_edge(inq,ln); |
| 421 | tbb::flow::make_edge(ln,outq); |
| 422 | tbb::flow::make_edge(bn,ln.decrement); |
| 423 | |
| 424 | g.wait_for_all(); |
| 425 | ASSERT(!(ln.my_successors.empty()),"successors empty after make_edge" ); |
| 426 | ASSERT(ln.my_predecessors.empty(), "input edge reversed" ); |
| 427 | inq.try_put(1); |
| 428 | g.wait_for_all(); |
| 429 | ASSERT(outq.try_get(out_int) && out_int == 1, "limiter_node didn't pass first value" ); |
| 430 | ASSERT(ln.my_predecessors.empty(), "input edge reversed" ); |
| 431 | inq.try_put(2); |
| 432 | g.wait_for_all(); |
| 433 | ASSERT(!outq.try_get(out_int), "limiter_node incorrectly passed second input" ); |
| 434 | ASSERT(!ln.my_predecessors.empty(), "input edge to limiter_node not reversed" ); |
| 435 | bn.try_put(tbb::flow::continue_msg()); |
| 436 | g.wait_for_all(); |
| 437 | ASSERT(outq.try_get(out_int) && out_int == 2, "limiter_node didn't pass second value" ); |
| 438 | g.wait_for_all(); |
| 439 | ASSERT(!ln.my_predecessors.empty(), "input edge was reversed(after try_get())" ); |
| 440 | g.reset(); |
| 441 | ASSERT(ln.my_predecessors.empty(), "input edge not reset" ); |
| 442 | inq.try_put(3); |
| 443 | g.wait_for_all(); |
| 444 | ASSERT(outq.try_get(out_int) && out_int == 3, "limiter_node didn't pass third value" ); |
| 445 | |
| 446 | REMARK(" rf_clear_edges" ); |
| 447 | // currently the limiter_node will not pass another message |
| 448 | g.reset(tbb::flow::rf_clear_edges); |
| 449 | ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count" ); |
| 450 | ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count" ); |
| 451 | ASSERT(ln.decrement.my_current_count == 0, "error in current count" ); |
| 452 | #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR |
| 453 | ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors" ); |
| 454 | #endif |
| 455 | ASSERT(ln.my_threshold == 1, "error in my_threshold" ); |
| 456 | ASSERT(ln.my_predecessors.empty(), "preds not reset(rf_clear_edges)" ); |
| 457 | ASSERT(ln.my_successors.empty(), "preds not reset(rf_clear_edges)" ); |
| 458 | ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)" ); |
| 459 | ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)" ); |
| 460 | ASSERT(bn.my_successors.empty(), "control edge not removed on reset(rf_clear_edges)" ); |
| 461 | tbb::flow::make_edge(inq,ln); |
| 462 | tbb::flow::make_edge(ln,outq); |
| 463 | inq.try_put(4); |
| 464 | inq.try_put(5); |
| 465 | g.wait_for_all(); |
| 466 | ASSERT(outq.try_get(out_int),"missing output after reset(rf_clear_edges)" ); |
| 467 | ASSERT(out_int == 4, "input incorrect (4)" ); |
| 468 | bn.try_put(tbb::flow::continue_msg()); |
| 469 | g.wait_for_all(); |
| 470 | ASSERT(!outq.try_get(out_int),"second output incorrectly passed (rf_clear_edges)" ); |
| 471 | REMARK(" done\n" ); |
| 472 | } |
| 473 | |
| 474 | template<typename MF_TYPE> |
| 475 | struct mf_body { |
| 476 | tbb::atomic<int> *_flag; |
| 477 | mf_body( tbb::atomic<int> &myatomic) : _flag(&myatomic) { } |
| 478 | void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) { |
| 479 | if(*_flag == 0) { |
| 480 | *_flag = 1; |
| 481 | BACKOFF_WAIT(*_flag == 1, "multifunction_node not released" ); |
| 482 | } |
| 483 | |
| 484 | if(in & 0x1) tbb::flow::get<1>(outports).try_put(in); |
| 485 | else tbb::flow::get<0>(outports).try_put(in); |
| 486 | } |
| 487 | }; |
| 488 | |
| 489 | template<typename P, typename T> |
| 490 | struct test_reversal; |
| 491 | template<typename T> |
| 492 | struct test_reversal<tbb::flow::queueing, T> { |
| 493 | test_reversal() { REMARK("<queueing>" ); } |
| 494 | // queueing node will not reverse. |
| 495 | bool operator()( T &node) { return node.my_predecessors.empty(); } |
| 496 | }; |
| 497 | |
| 498 | template<typename T> |
| 499 | struct test_reversal<tbb::flow::rejecting, T> { |
| 500 | test_reversal() { REMARK("<rejecting>" ); } |
| 501 | bool operator()( T &node) { return !node.my_predecessors.empty(); } |
| 502 | }; |
| 503 | |
| 504 | template<typename P> |
| 505 | void |
| 506 | TestMultifunctionNode() { |
| 507 | typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int>, P> multinode_type; |
| 508 | REMARK("Testing multifunction_node" ); |
| 509 | test_reversal<P,multinode_type> my_test; |
| 510 | REMARK(":" ); |
| 511 | tbb::flow::graph g; |
| 512 | multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0)); |
| 513 | tbb::flow::queue_node<int> qin(g); |
| 514 | tbb::flow::queue_node<int> qodd_out(g); |
| 515 | tbb::flow::queue_node<int> qeven_out(g); |
| 516 | tbb::flow::make_edge(qin,mf); |
| 517 | tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out); |
| 518 | tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out); |
| 519 | g.wait_for_all(); |
| 520 | for( int ii = 0; ii < 2 ; ++ii) { |
| 521 | serial_fn_state0 = 0; |
| 522 | if(ii == 0) REMARK(" reset preds" ); else REMARK(" 2nd" ); |
| 523 | qin.try_put(0); |
| 524 | // wait for node to be active |
| 525 | BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put" ); |
| 526 | qin.try_put(1); |
| 527 | BACKOFF_WAIT((!my_test(mf)), "Timed out waiting" ); |
| 528 | ASSERT(my_test(mf), "fail second put test" ); |
| 529 | g.my_root_task->cancel_group_execution(); |
| 530 | // release node |
| 531 | serial_fn_state0 = 2; |
| 532 | g.wait_for_all(); |
| 533 | ASSERT(my_test(mf), "fail cancel group test" ); |
| 534 | if( ii == 1) { |
| 535 | REMARK(" rf_clear_edges" ); |
| 536 | g.reset(tbb::flow::rf_clear_edges); |
| 537 | ASSERT(tbb::flow::output_port<0>(mf).my_successors.empty(), "output_port<0> not reset (rf_clear_edges)" ); |
| 538 | ASSERT(tbb::flow::output_port<1>(mf).my_successors.empty(), "output_port<1> not reset (rf_clear_edges)" ); |
| 539 | } |
| 540 | else |
| 541 | { |
| 542 | g.reset(); |
| 543 | } |
| 544 | ASSERT(mf.my_predecessors.empty(), "edge didn't reset" ); |
| 545 | ASSERT((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty()), "edge didn't reset" ); |
| 546 | } |
| 547 | REMARK(" done\n" ); |
| 548 | } |
| 549 | |
| 550 | // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it |
| 551 | // never allows a successor to reverse its edge, so we only need test the successors. |
| 552 | void |
| 553 | TestIndexerNode() { |
| 554 | tbb::flow::graph g; |
| 555 | typedef tbb::flow::indexer_node< int, int > indexernode_type; |
| 556 | indexernode_type inode(g); |
| 557 | REMARK("Testing indexer_node:" ); |
| 558 | tbb::flow::queue_node<indexernode_type::output_type> qout(g); |
| 559 | tbb::flow::make_edge(inode,qout); |
| 560 | g.wait_for_all(); |
| 561 | ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing" ); |
| 562 | g.reset(); |
| 563 | ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing after reset" ); |
| 564 | g.reset(tbb::flow::rf_clear_edges); |
| 565 | ASSERT(inode.my_successors.empty(), "successor of indexer_node not removed by reset(rf_clear_edges)" ); |
| 566 | REMARK(" done\n" ); |
| 567 | } |
| 568 | |
| 569 | template<typename Node> |
| 570 | void |
| 571 | TestScalarNode(const char *name) { |
| 572 | tbb::flow::graph g; |
| 573 | Node on(g); |
| 574 | tbb::flow::queue_node<int> qout(g); |
| 575 | REMARK("Testing %s:" , name); |
| 576 | tbb::flow::make_edge(on,qout); |
| 577 | g.wait_for_all(); |
| 578 | ASSERT(!on.my_successors.empty(), "edge not added" ); |
| 579 | g.reset(); |
| 580 | ASSERT(!on.my_successors.empty(), "edge improperly removed" ); |
| 581 | g.reset(tbb::flow::rf_clear_edges); |
| 582 | ASSERT(on.my_successors.empty(), "edge not removed by reset(rf_clear_edges)" ); |
| 583 | REMARK(" done\n" ); |
| 584 | } |
| 585 | |
// Sequencing functor for the sequencer_node test: maps an item to its
// sequence number, the input divided by 3 using integer division.
struct seq_body {
    size_t operator()(const int &in) {
        const int slot = in / 3;
        return static_cast<size_t>(slot);
    }
};
| 591 | |
| 592 | // sequencer_node behaves like a queueing node, but requires a different constructor. |
| 593 | void |
| 594 | TestSequencerNode() { |
| 595 | tbb::flow::graph g; |
| 596 | tbb::flow::sequencer_node<int> bnode(g, seq_body()); |
| 597 | REMARK("Testing sequencer_node:" ); |
| 598 | tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0)); |
| 599 | REMARK("Testing sequencer_node:" ); |
| 600 | serial_fn_state0 = 0; // reset to waiting state. |
| 601 | REMARK(" make_edge" ); |
| 602 | tbb::flow::make_edge(bnode, fnode); |
| 603 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge" ); |
| 604 | REMARK(" try_put" ); |
| 605 | bnode.try_put(0); // will forward to the fnode |
| 606 | BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node" ); // wait for the function_node to fire up |
| 607 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message" ); |
| 608 | serial_fn_state0 = 0; |
| 609 | g.wait_for_all(); |
| 610 | REMARK(" remove_edge" ); |
| 611 | tbb::flow::remove_edge(bnode, fnode); |
| 612 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge" ); |
| 613 | tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g); |
| 614 | tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task |
| 615 | g.wait_for_all(); |
| 616 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join" ); |
| 617 | REMARK(" reverse" ); |
| 618 | bnode.try_put(3); // the edge should reverse |
| 619 | g.wait_for_all(); |
| 620 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving" ); |
| 621 | REMARK(" reset()" ); |
| 622 | g.wait_for_all(); |
| 623 | g.reset(); // should be in forward direction again |
| 624 | ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()" ); |
| 625 | REMARK(" remove_edge" ); |
| 626 | g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again |
| 627 | ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)" ); |
| 628 | ASSERT(fnode.my_predecessors.empty(), "buffering node reversed after reset(rf_clear_edges)" ); |
| 629 | REMARK(" done\n" ); |
| 630 | g.wait_for_all(); |
| 631 | } |
| 632 | |
// Body for the source_node test: yields 1, 2, ..., max_cnt and then reports
// exhaustion. The call counter keeps growing on every call, including the
// calls that return false.
struct snode_body {
    int max_cnt;  // number of items to produce
    int my_cnt;   // number of calls made so far

    snode_body(const int &in) : max_cnt(in), my_cnt(0) { }

    bool operator()(int &out) {
        const int calls_before = my_cnt;
        ++my_cnt;
        if (calls_before >= max_cnt) {
            return false;  // exhausted; out is left untouched
        }
        out = my_cnt;
        return true;
    }
};
| 643 | |
| 644 | void |
| 645 | TestSourceNode() { |
| 646 | tbb::flow::graph g; |
| 647 | tbb::flow::source_node<int> sn(g, snode_body(4), false); |
| 648 | REMARK("Testing source_node:" ); |
| 649 | tbb::flow::queue_node<int> qin(g); |
| 650 | tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> jn(g); |
| 651 | tbb::flow::queue_node<tbb::flow::tuple<int,int> > qout(g); |
| 652 | |
| 653 | REMARK(" make_edges" ); |
| 654 | tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn)); |
| 655 | tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); |
| 656 | tbb::flow::make_edge(jn,qout); |
| 657 | ASSERT(!sn.my_successors.empty(), "source node has no successor after make_edge" ); |
| 658 | g.wait_for_all(); |
| 659 | g.reset(); |
| 660 | ASSERT(!sn.my_successors.empty(), "source node has no successor after reset" ); |
| 661 | g.wait_for_all(); |
| 662 | g.reset(tbb::flow::rf_clear_edges); |
| 663 | ASSERT(sn.my_successors.empty(), "source node has successor after reset(rf_clear_edges)" ); |
| 664 | tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn)); |
| 665 | tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn)); |
| 666 | tbb::flow::make_edge(jn,qout); |
| 667 | g.wait_for_all(); |
| 668 | REMARK(" activate" ); |
| 669 | sn.activate(); // will forward to the fnode |
| 670 | REMARK(" wait1" ); |
| 671 | BACKOFF_WAIT( !sn.my_successors.empty(), "Timed out waiting for edge to reverse" ); |
| 672 | ASSERT(sn.my_successors.empty(), "source node has no successor after forwarding message" ); |
| 673 | |
| 674 | g.wait_for_all(); |
| 675 | g.reset(); |
| 676 | ASSERT(!sn.my_successors.empty(), "source_node has no successors after reset" ); |
| 677 | ASSERT(tbb::flow::input_port<0>(jn).my_predecessors.empty(), "successor if source_node has pred after reset." ); |
| 678 | REMARK(" done\n" ); |
| 679 | } |
| 680 | |
| 681 | int TestMain() { |
| 682 | |
| 683 | if(MinThread < 3) MinThread = 3; |
| 684 | tbb::task_scheduler_init init(MinThread); // tests presume at least three threads |
| 685 | |
| 686 | TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node" ); |
| 687 | TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node" ); |
| 688 | TestBufferingNode< tbb::flow::queue_node<int> >("queue_node" ); |
| 689 | TestSequencerNode(); |
| 690 | |
| 691 | TestMultifunctionNode<tbb::flow::rejecting>(); |
| 692 | TestMultifunctionNode<tbb::flow::queueing>(); |
| 693 | TestSourceNode(); |
| 694 | TestContinueNode(); |
| 695 | TestFunctionNode(); |
| 696 | |
| 697 | TestJoinNode(); |
| 698 | |
| 699 | TestLimiterNode(); |
| 700 | TestIndexerNode(); |
| 701 | TestSplitNode(); |
| 702 | TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node" ); |
| 703 | TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node" ); |
| 704 | TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node" ); |
| 705 | |
| 706 | return Harness::Done; |
| 707 | } |
| 708 | |
| 709 | |