1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #define HARNESS_DEFAULT_MIN_THREADS 2 |
18 | #define HARNESS_DEFAULT_MAX_THREADS 4 |
19 | #include "harness_defs.h" |
20 | |
21 | #if _MSC_VER |
22 | #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning |
23 | #endif |
24 | |
25 | #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED |
26 | // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer) |
27 | #pragma warning (disable: 4702) |
28 | #endif |
29 | |
30 | #include "harness.h" |
31 | |
32 | // global task_scheduler_observer is an imperfect tool to find how many threads are really |
33 | // participating. That was the hope, but it counts the entries into the marketplace, |
34 | // not the arena. |
35 | // #define USE_TASK_SCHEDULER_OBSERVER 1 |
36 | |
// TBB_RUN_BUFFERING_TEST is a compile-time switch (consumed outside this part of the file).
// It is 1 everywhere except release builds made with the Intel compiler in MSVC mode,
// where it is true only for compiler versions newer than 1210.
#if _MSC_VER && defined(__INTEL_COMPILER) && !TBB_USE_DEBUG
    // Parenthesized so the expansion stays a single operand wherever the macro is used
    // (e.g. "!TBB_RUN_BUFFERING_TEST" must negate the whole comparison).
    #define TBB_RUN_BUFFERING_TEST (__INTEL_COMPILER > 1210)
#else
    #define TBB_RUN_BUFFERING_TEST 1
#endif
42 | |
43 | #if TBB_USE_EXCEPTIONS |
44 | #if USE_TASK_SCHEDULER_OBSERVER |
45 | #include "tbb/task_scheduler_observer.h" |
46 | #endif |
47 | #include "tbb/flow_graph.h" |
48 | #include "tbb/task_scheduler_init.h" |
49 | #include <iostream> |
50 | #include <vector> |
51 | #include "harness_assert.h" |
52 | #include "harness_checktype.h" |
53 | |
// Definition of the Existed() hook that harness_eh.h expects to find; it always reports INT_MAX.
inline intptr_t Existed() {
    const intptr_t reported = INT_MAX;
    return reported;
}
55 | |
56 | #include "harness_eh.h" |
57 | #include <stdexcept> |
58 | |
// NOTE(review): NUM_ITEMS is not referenced in the visible part of this file; presumably it
// is the value given to g_NumItems elsewhere - confirm.
#define NUM_ITEMS 15
// Upper bound on the number of items a source body emits per run (the source bodies below
// compare their counters against it).
int g_NumItems;

// Both counters are zeroed by ResetGlobals() before every run.
tbb::atomic<unsigned> nExceptions;
tbb::atomic<intptr_t> g_TGCCancelled;

// Template tag selecting whether a test body throws (chooses the WaitThrow specialization).
enum TestNodeTypeEnum { nonThrowing, isThrowing };

// Concurrency tags used as the Conc template argument; the same value is also handed to the
// node under test as its concurrency limit.
static const size_t unlimited_type = 0;
static const size_t serial_type = 1;
static const size_t limited_type = 4;
70 | |
71 | template<TestNodeTypeEnum T> struct TestNodeTypeName; |
72 | template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing" ; } }; |
73 | template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing" ; } }; |
74 | |
75 | template<size_t Conc> struct concurrencyName; |
76 | template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial" ; } }; |
77 | template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited" ; } }; |
78 | template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited" ; } }; |
79 | |
// WaitThrow provides the waiting and (optionally) throwing behavior of the test bodies.
// A non-throwing body only waits; a throwing body waits and then throws.
// If serial, we can't wait for concurrency to peak on every call; we may be the bottleneck and
// would stop further processing. A serial body therefore does nothing until it has executed
// more than g_NumThreads + 10 times (the "10" is somewhat arbitrary, and just makes sure there
// are enough items in the graph to keep it flowing).
// Whenever a body does wait, it uses Harness::ConcurrencyTracker and WaitUntilConcurrencyPeaks().
85 | |
// Primary template, declared only: every (concurrency, throwing) combination used by the tests
// is supplied as an explicit specialization below. Each specialization exposes a protected
// WaitAndThrow(int cnt, const char *name) that the node bodies inherit.
template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
class WaitThrow;
88 | |
89 | template<> |
90 | class WaitThrow<serial_type,nonThrowing> { |
91 | protected: |
92 | void WaitAndThrow(int cnt, const char * /*name*/) { |
93 | if(cnt > g_NumThreads + 10) { |
94 | Harness::ConcurrencyTracker ct; |
95 | WaitUntilConcurrencyPeaks(); |
96 | } |
97 | } |
98 | }; |
99 | |
100 | template<> |
101 | class WaitThrow<serial_type,isThrowing> { |
102 | protected: |
103 | void WaitAndThrow(int cnt, const char * /*name*/) { |
104 | if(cnt > g_NumThreads + 10) { |
105 | Harness::ConcurrencyTracker ct; |
106 | WaitUntilConcurrencyPeaks(); |
107 | ThrowTestException(1); |
108 | } |
109 | } |
110 | }; |
111 | |
// For nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
// to make sure enough other nodes wait for concurrency to peak. If we are attached to
// N successors, then for each item we pass to a successor we will get N executions of the
// "absorbers" (because we broadcast to successors). For an odd number of threads we
// need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
// of an "absorber", but we can't change that without changing the behavior of the node).
118 | template<> |
119 | class WaitThrow<limited_type,nonThrowing> { |
120 | protected: |
121 | void WaitAndThrow(int cnt, const char * /*name*/) { |
122 | if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) { |
123 | return; |
124 | } |
125 | Harness::ConcurrencyTracker ct; |
126 | WaitUntilConcurrencyPeaks(); |
127 | } |
128 | }; |
129 | |
130 | template<> |
131 | class WaitThrow<limited_type,isThrowing> { |
132 | protected: |
133 | void WaitAndThrow(int cnt, const char * /*name*/) { |
134 | Harness::ConcurrencyTracker ct; |
135 | if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) { |
136 | return; |
137 | } |
138 | WaitUntilConcurrencyPeaks(); |
139 | ThrowTestException(1); |
140 | } |
141 | }; |
142 | |
143 | template<> |
144 | class WaitThrow<unlimited_type,nonThrowing> { |
145 | protected: |
146 | void WaitAndThrow(int /*cnt*/, const char * /*name*/) { |
147 | Harness::ConcurrencyTracker ct; |
148 | WaitUntilConcurrencyPeaks(); |
149 | } |
150 | }; |
151 | |
152 | template<> |
153 | class WaitThrow<unlimited_type,isThrowing> { |
154 | protected: |
155 | void WaitAndThrow(int /*cnt*/, const char * /*name*/) { |
156 | Harness::ConcurrencyTracker ct; |
157 | WaitUntilConcurrencyPeaks(); |
158 | ThrowTestException(1); |
159 | } |
160 | }; |
161 | |
162 | void |
163 | ResetGlobals(bool throwException = true, bool flog = false) { |
164 | nExceptions = 0; |
165 | g_TGCCancelled = 0; |
166 | ResetEhGlobals(throwException, flog); |
167 | } |
168 | |
169 | // -------source_node body ------------------ |
170 | template <class OutputType, TestNodeTypeEnum TType> |
171 | class test_source_body : WaitThrow<serial_type, TType> { |
172 | using WaitThrow<serial_type, TType>::WaitAndThrow; |
173 | tbb::atomic<int> *my_current_val; |
174 | int my_mult; |
175 | public: |
176 | test_source_body(tbb::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) { |
177 | REMARK("- --------- - - - constructed %lx\n" , (size_t)(my_current_val)); |
178 | } |
179 | |
180 | bool operator()(OutputType & out) { |
181 | UPDATE_COUNTS(); |
182 | out = OutputType(my_mult * ++(*my_current_val)); |
183 | REMARK("xx(%lx) out == %d\n" , (size_t)(my_current_val), (int)out); |
184 | if(*my_current_val > g_NumItems) { |
185 | REMARK(" ------ End of the line!\n" ); |
186 | *my_current_val = g_NumItems; |
187 | return false; |
188 | } |
189 | WaitAndThrow((int)out,"test_source_body" ); |
190 | return true; |
191 | } |
192 | |
193 | int count_value() { return (int)*my_current_val; } |
194 | }; |
195 | |
196 | template <TestNodeTypeEnum TType> |
197 | class test_source_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> { |
198 | using WaitThrow<serial_type, TType>::WaitAndThrow; |
199 | tbb::atomic<int> *my_current_val; |
200 | public: |
201 | test_source_body(tbb::atomic<int> &my_cnt) : my_current_val(&my_cnt) { } |
202 | |
203 | bool operator()(tbb::flow::continue_msg & out) { |
204 | UPDATE_COUNTS(); |
205 | int outint = ++(*my_current_val); |
206 | out = tbb::flow::continue_msg(); |
207 | if(*my_current_val > g_NumItems) { |
208 | *my_current_val = g_NumItems; |
209 | return false; |
210 | } |
211 | WaitAndThrow(outint,"test_source_body" ); |
212 | return true; |
213 | } |
214 | |
215 | int count_value() { return (int)*my_current_val; } |
216 | }; |
217 | |
218 | // -------{function/continue}_node body ------------------ |
219 | template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc> |
220 | class absorber_body : WaitThrow<Conc,T> { |
221 | using WaitThrow<Conc,T>::WaitAndThrow; |
222 | tbb::atomic<int> *my_count; |
223 | public: |
224 | absorber_body(tbb::atomic<int> &my_cnt) : my_count(&my_cnt) { } |
225 | OutputType operator()(const InputType &/*p_in*/) { |
226 | UPDATE_COUNTS(); |
227 | int out = ++(*my_count); |
228 | WaitAndThrow(out,"absorber_body" ); |
229 | return OutputType(); |
230 | } |
231 | int count_value() { return *my_count; } |
232 | }; |
233 | |
234 | // -------multifunction_node body ------------------ |
235 | |
236 | // helper classes |
237 | template<int N,class PortsType> |
238 | struct IssueOutput { |
239 | typedef typename tbb::flow::tuple_element<N-1,PortsType>::type::output_type my_type; |
240 | |
241 | static void issue_tuple_element( PortsType &my_ports) { |
242 | ASSERT(tbb::flow::get<N-1>(my_ports).try_put(my_type()), "Error putting to successor" ); |
243 | IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports); |
244 | } |
245 | }; |
246 | |
247 | template<class PortsType> |
248 | struct IssueOutput<1,PortsType> { |
249 | typedef typename tbb::flow::tuple_element<0,PortsType>::type::output_type my_type; |
250 | |
251 | static void issue_tuple_element( PortsType &my_ports) { |
252 | ASSERT(tbb::flow::get<0>(my_ports).try_put(my_type()), "Error putting to successor" ); |
253 | } |
254 | }; |
255 | |
256 | template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc> |
257 | class multifunction_node_body : WaitThrow<Conc,T> { |
258 | using WaitThrow<Conc,T>::WaitAndThrow; |
259 | static const int N = tbb::flow::tuple_size<OutputTupleType>::value; |
260 | typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType; |
261 | typedef typename NodeType::output_ports_type PortsType; |
262 | tbb::atomic<int> *my_count; |
263 | public: |
264 | multifunction_node_body(tbb::atomic<int> &my_cnt) : my_count(&my_cnt) { } |
265 | void operator()(const InputType& /*in*/, PortsType &my_ports) { |
266 | UPDATE_COUNTS(); |
267 | int out = ++(*my_count); |
268 | WaitAndThrow(out,"multifunction_node_body" ); |
269 | // issue an item to each output port. |
270 | IssueOutput<N,PortsType>::issue_tuple_element(my_ports); |
271 | } |
272 | |
273 | int count_value() { return *my_count; } |
274 | }; |
275 | |
276 | // --------- body to sort items in sequencer_node |
// Sequencing functor for sequencer_node: item value s maps to slot s - 1, so items are
// expected to be numbered from 1 (a zero item trips the assertion).
template<class BufferItemType>
struct sequencer_body {
    // const-qualified: the functor has no state, so it must also be callable on a const object.
    size_t operator()(const BufferItemType &s) const {
        ASSERT(s, "sequencer item out of range (== 0)");
        return size_t(s) - 1;
    }
};
284 | |
285 | // --------- body to compare the "priorities" of objects for priority_queue_node five priority levels 0-4. |
// Comparison functor: orders items by (int(value) % 5), i.e. by "priority level".
// Returns true when t1's level is strictly lower than t2's.
template<class T>
struct myLess {
    // const-qualified: a stateless comparator must be usable through a const object/reference.
    bool operator()(const T &t1, const T &t2) const {
        return (int(t1) % 5) < (int(t2) % 5);
    }
};
292 | |
293 | // --------- type for < comparison in priority_queue_node. |
// Comparison functor: orders items by (int(value) % 3).
// Returns true when lhs's remainder is strictly lower than rhs's.
template<class ItemType>
struct less_body {
    // const-qualified: a stateless comparator must be usable through a const object/reference.
    bool operator()(const ItemType &lhs, const ItemType &rhs) const {
        return (int(lhs) % 3) < (int(rhs) % 3);
    }
};
300 | |
301 | // --------- tag methods for tag_matching join_node |
302 | template<typename TT> |
303 | class tag_func { |
304 | TT my_mult; |
305 | public: |
306 | tag_func(TT multiplier) : my_mult(multiplier) { } |
307 | void operator=( const tag_func& other){my_mult = other.my_mult;} |
308 | // operator() will return [0 .. Count) |
309 | tbb::flow::tag_value operator()( TT v) { |
310 | tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); |
311 | return t; |
312 | } |
313 | }; |
314 | |
315 | // --------- Source body for split_node test. |
316 | template <class OutputTuple, TestNodeTypeEnum TType> |
317 | class tuple_test_source_body : WaitThrow<serial_type, TType> { |
318 | typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0; |
319 | typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1; |
320 | using WaitThrow<serial_type, TType>::WaitAndThrow; |
321 | tbb::atomic<int> *my_current_val; |
322 | public: |
323 | tuple_test_source_body(tbb::atomic<int> &my_cnt) : my_current_val(&my_cnt) { } |
324 | |
325 | bool operator()(OutputTuple & out) { |
326 | UPDATE_COUNTS(); |
327 | int ival = ++(*my_current_val); |
328 | out = OutputTuple(ItemType0(ival),ItemType1(ival)); |
329 | if(*my_current_val > g_NumItems) { |
330 | *my_current_val = g_NumItems; // jam the final value; we assert on it later. |
331 | return false; |
332 | } |
333 | WaitAndThrow(ival,"tuple_test_source_body" ); |
334 | return true; |
335 | } |
336 | |
337 | int count_value() { return (int)*my_current_val; } |
338 | }; |
339 | |
340 | // ------- end of node bodies |
341 | |
342 | // source_node is only-serial. source_node can throw, or the function_node can throw. |
343 | // graph being tested is |
344 | // |
345 | // source_node+---+parallel function_node |
346 | // |
347 | // After each run the graph is reset(), to test the reset functionality. |
348 | // |
349 | |
350 | |
351 | template<class ItemType, TestNodeTypeEnum srcThrowType, TestNodeTypeEnum absorbThrowType> |
352 | void run_one_source_node_test(bool throwException, bool flog) { |
353 | typedef test_source_body<ItemType,srcThrowType> src_body_type; |
354 | typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type; |
355 | tbb::atomic<int> source_body_count; |
356 | tbb::atomic<int> absorber_body_count; |
357 | source_body_count = 0; |
358 | absorber_body_count = 0; |
359 | |
360 | tbb::flow::graph g; |
361 | |
362 | g_Master = Harness::CurrentTid(); |
363 | |
364 | #if USE_TASK_SCHEDULER_OBSERVER |
365 | eh_test_observer o; |
366 | o.observe(true); |
367 | #endif |
368 | |
369 | tbb::flow::source_node<ItemType> sn(g, src_body_type(source_body_count),/*is_active*/false); |
370 | parallel_absorb_body_type ab2(absorber_body_count); |
371 | tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2); |
372 | make_edge(sn, parallel_fn); |
373 | for(int runcnt = 0; runcnt < 2; ++runcnt) { |
374 | ResetGlobals(throwException,flog); |
375 | if(throwException) { |
376 | TRY(); |
377 | sn.activate(); |
378 | g.wait_for_all(); |
379 | CATCH_AND_ASSERT(); |
380 | } |
381 | else { |
382 | TRY(); |
383 | sn.activate(); |
384 | g.wait_for_all(); |
385 | CATCH_AND_FAIL(); |
386 | } |
387 | |
388 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
389 | int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value(); |
390 | int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value(); |
391 | if(throwException) { |
392 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception flag in flow::graph not set" ); |
393 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "canceled flag not set" ); |
394 | ASSERT(src_cnt <= g_NumItems, "Too many source_node items emitted" ); |
395 | ASSERT(sink_cnt <= src_cnt, "Too many source_node items received" ); |
396 | } |
397 | else { |
398 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
399 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
400 | ASSERT(src_cnt == g_NumItems, "Incorrect # source_node items emitted" ); |
401 | ASSERT(sink_cnt == src_cnt, "Incorrect # source_node items received" ); |
402 | } |
403 | g.reset(); // resets the body of the source_node and the absorb_nodes. |
404 | source_body_count = 0; |
405 | absorber_body_count = 0; |
406 | ASSERT(!g.exception_thrown(), "Reset didn't clear exception_thrown()" ); |
407 | ASSERT(!g.is_cancelled(), "Reset didn't clear is_cancelled()" ); |
408 | src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value(); |
409 | sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value(); |
410 | ASSERT(src_cnt == 0, "source_node count not reset" ); |
411 | ASSERT(sink_cnt == 0, "sink_node count not reset" ); |
412 | } |
413 | #if USE_TASK_SCHEDULER_OBSERVER |
414 | o.observe(false); |
415 | #endif |
416 | } // run_one_source_node_test |
417 | |
418 | |
419 | template<class ItemType, TestNodeTypeEnum srcThrowType, TestNodeTypeEnum absorbThrowType> |
420 | void run_source_node_test() { |
421 | run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(false,false); |
422 | run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(true,false); |
423 | run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(true,true); |
424 | } // run_source_node_test |
425 | |
426 | void test_source_node() { |
427 | REMARK("Testing source_node\n" ); |
428 | check_type<int>::check_type_counter = 0; |
429 | g_Wakeup_Msg = "source_node(1): Missed wakeup or machine is overloaded?" ; |
430 | run_source_node_test<check_type<int>, isThrowing, nonThrowing>(); |
431 | ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test" ); |
432 | g_Wakeup_Msg = "source_node(2): Missed wakeup or machine is overloaded?" ; |
433 | run_source_node_test<int, isThrowing, nonThrowing>(); |
434 | g_Wakeup_Msg = "source_node(3): Missed wakeup or machine is overloaded?" ; |
435 | run_source_node_test<int, nonThrowing, isThrowing>(); |
436 | g_Wakeup_Msg = "source_node(4): Missed wakeup or machine is overloaded?" ; |
437 | run_source_node_test<int, isThrowing, isThrowing>(); |
438 | g_Wakeup_Msg = "source_node(5): Missed wakeup or machine is overloaded?" ; |
439 | run_source_node_test<check_type<int>, isThrowing, isThrowing>(); |
440 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
441 | ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test" ); |
442 | } |
443 | |
444 | // -------- utilities & types to test function_node and multifunction_node. |
445 | |
446 | // need to tell the template which node type I am using so it attaches successors correctly. |
// Tag telling AttachPoint whether the node under test is a function_node (the node itself is
// the sender) or a multifunction_node (each output port is a sender).
enum NodeFetchType { func_node_type, multifunc_node_type };

// Primary template, declared only; the two specializations below provide
// GetSender(NodeType&), which returns the sender to connect successor number indx to.
template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
struct AttachPoint;
451 | |
452 | template<class NodeType, class ItemType, int indx> |
453 | struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> { |
454 | static tbb::flow::sender<ItemType> &GetSender(NodeType &n) { |
455 | return tbb::flow::output_port<indx>(n); |
456 | } |
457 | }; |
458 | |
459 | template<class NodeType, class ItemType, int indx> |
460 | struct AttachPoint<NodeType,ItemType,indx,func_node_type> { |
461 | static tbb::flow::sender<ItemType> &GetSender(NodeType &n) { |
462 | return n; |
463 | } |
464 | }; |
465 | |
466 | |
467 | // common template for running function_node, multifunction_node. continue_node |
468 | // has different firing requirements, so it needs a different graph topology. |
469 | template< |
470 | class SourceNodeType, |
471 | class SourceNodeBodyType0, |
472 | class SourceNodeBodyType1, |
473 | NodeFetchType NFT, |
474 | class TestNodeType, |
475 | class TestNodeBodyType, |
476 | class TypeToSink0, // what kind of item are we sending to sink0 |
477 | class TypeToSink1, // what kind of item are we sending to sink1 |
478 | class SinkNodeType0, // will be same for function; |
479 | class SinkNodeType1, // may differ for multifunction_node |
480 | class SinkNodeBodyType0, |
481 | class SinkNodeBodyType1, |
482 | size_t Conc |
483 | > |
484 | void |
485 | run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) { |
486 | |
487 | char mymsg[132]; |
488 | char *saved_msg = const_cast<char *>(g_Wakeup_Msg); |
489 | tbb::flow::graph g; |
490 | |
491 | tbb::atomic<int> source0_count; |
492 | tbb::atomic<int> source1_count; |
493 | tbb::atomic<int> sink0_count; |
494 | tbb::atomic<int> sink1_count; |
495 | tbb::atomic<int> test_count; |
496 | source0_count = source1_count = sink0_count = sink1_count = test_count = 0; |
497 | |
498 | #if USE_TASK_SCHEDULER_OBSERVER |
499 | eh_test_observer o; |
500 | o.observe(true); |
501 | #endif |
502 | |
503 | g_Master = Harness::CurrentTid(); |
504 | SourceNodeType source0(g, SourceNodeBodyType0(source0_count),/*is_active*/false); |
505 | SourceNodeType source1(g, SourceNodeBodyType1(source1_count),/*is_active*/false); |
506 | TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count)); |
507 | SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count)); |
508 | SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count)); |
509 | make_edge(source0, node_to_test); |
510 | make_edge(source1, node_to_test); |
511 | make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0); |
512 | make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1); |
513 | |
514 | for(int iter = 0; iter < 2; ++iter) { // run, reset, run again |
515 | sprintf(mymsg, "%s iter=%d, threads=%d, throw=%s, flog=%s" , saved_msg, iter, g_NumThreads, |
516 | throwException?"T" :"F" , flog?"T" :"F" ); |
517 | g_Wakeup_Msg = mymsg; |
518 | ResetGlobals(throwException,flog); |
519 | if(throwException) { |
520 | TRY(); |
521 | source0.activate(); |
522 | source1.activate(); |
523 | g.wait_for_all(); |
524 | CATCH_AND_ASSERT(); |
525 | } |
526 | else { |
527 | TRY(); |
528 | source0.activate(); |
529 | source1.activate(); |
530 | g.wait_for_all(); |
531 | CATCH_AND_FAIL(); |
532 | } |
533 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
534 | int sb0_cnt = tbb::flow::copy_body<SourceNodeBodyType0>(source0).count_value(); |
535 | int sb1_cnt = tbb::flow::copy_body<SourceNodeBodyType1>(source1).count_value(); |
536 | int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(); |
537 | int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value(); |
538 | int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value(); |
539 | if(throwException) { |
540 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
541 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
542 | ASSERT(sb0_cnt + sb1_cnt <= 2*g_NumItems, "Too many items sent by sources" ); |
543 | ASSERT(sb0_cnt + sb1_cnt >= t_cnt, "Too many items received by test node" ); |
544 | ASSERT(nb0_cnt + nb1_cnt <= t_cnt*2, "Too many items received by sink nodes" ); |
545 | } |
546 | else { |
547 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
548 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
549 | ASSERT(sb0_cnt + sb1_cnt == 2*g_NumItems, "Missing invocations of source_nodes" ); |
550 | ASSERT(t_cnt == 2*g_NumItems, "Not all items reached test node" ); |
551 | ASSERT(nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems, "Missing items in absorbers" ); |
552 | } |
553 | g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes. |
554 | source0_count = source1_count = sink0_count = sink1_count = test_count = 0; |
555 | ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType0>(source0).count_value(),"Reset source 0 failed" ); |
556 | ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType1>(source1).count_value(),"Reset source 1 failed" ); |
557 | ASSERT(0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(),"Reset test_node failed" ); |
558 | ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value(),"Reset sink 0 failed" ); |
559 | ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value(),"Reset sink 1 failed" ); |
560 | |
561 | g_Wakeup_Msg = saved_msg; |
562 | } |
563 | #if USE_TASK_SCHEDULER_OBSERVER |
564 | o.observe(false); |
565 | #endif |
566 | } |
567 | |
568 | // Test function_node |
569 | // |
570 | // graph being tested is |
571 | // |
572 | // source_node -\ /- parallel function_node |
573 | // \ / |
574 | // +function_node+ |
575 | // / \ x |
576 | // source_node -/ \- parallel function_node |
577 | // |
578 | // After each run the graph is reset(), to test the reset functionality. |
579 | // |
580 | template< |
581 | TestNodeTypeEnum SType1, // does source node 1 throw? |
582 | TestNodeTypeEnum SType2, // does source node 2 throw? |
583 | class Item12, // type of item passed between sources and test node |
584 | TestNodeTypeEnum FType, // does function node throw? |
585 | class Item23, // type passed from function_node to sink nodes |
586 | TestNodeTypeEnum NType1, // does sink node 1 throw? |
587 | TestNodeTypeEnum NType2, // does sink node 1 throw? |
588 | class NodePolicy, // rejecting,queueing |
589 | size_t Conc // is node concurrent? {serial | limited | unlimited} |
590 | > |
591 | void run_function_node_test() { |
592 | |
593 | typedef test_source_body<Item12,SType1> SBodyType1; |
594 | typedef test_source_body<Item12,SType2> SBodyType2; |
595 | typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType; |
596 | typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1; |
597 | typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2; |
598 | |
599 | typedef tbb::flow::source_node<Item12> SrcType; |
600 | typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType; |
601 | typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType; |
602 | |
603 | for(int i = 0; i < 4; ++i ) { |
604 | if(i != 2) { // doesn't make sense to flog a non-throwing test |
605 | bool doThrow = (i & 0x1) != 0; |
606 | bool doFlog = (i & 0x2) != 0; |
607 | run_one_functype_node_test< |
608 | /*SourceNodeType*/ SrcType, |
609 | /*SourceNodeBodyType0*/ SBodyType1, |
610 | /*SourceNodeBodyType1*/ SBodyType2, |
611 | /* NFT */ func_node_type, |
612 | /*TestNodeType*/ TestType, |
613 | /*TestNodeBodyType*/ TestBodyType, |
614 | /*TypeToSink0 */ Item23, |
615 | /*TypeToSink1 */ Item23, |
616 | /*SinkNodeType0*/ SnkType, |
617 | /*SinkNodeType1*/ SnkType, |
618 | /*SinkNodeBodyType1*/ SinkBodyType1, |
619 | /*SinkNodeBodyType2*/ SinkBodyType2, |
620 | /*Conc*/ Conc> |
621 | (doThrow,doFlog,"function_node" ); |
622 | } |
623 | } |
624 | } // run_function_node_test |
625 | |
626 | void test_function_node() { |
627 | REMARK("Testing function_node\n" ); |
628 | // serial rejecting |
629 | g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?" ; |
630 | run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); |
631 | g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?" ; |
632 | run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); |
633 | g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?" ; |
634 | run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); |
635 | |
636 | // serial queueing |
637 | g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?" ; |
638 | run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); |
639 | run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); |
640 | run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); |
641 | check_type<int>::check_type_counter = 0; |
642 | run_function_node_test<nonThrowing, nonThrowing, check_type<int>, nonThrowing, check_type<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); |
643 | ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test" ); |
644 | |
645 | // unlimited parallel rejecting |
646 | g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?" ; |
647 | run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); |
648 | run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); |
649 | run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); |
650 | |
651 | // limited parallel rejecting |
652 | g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?" ; |
653 | run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>(); |
654 | run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>(); |
655 | run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>(); |
656 | |
657 | // limited parallel queueing |
658 | g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?" ; |
659 | run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); |
660 | run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); |
661 | run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>(); |
662 | |
663 | // everyone throwing |
664 | g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?" ; |
665 | run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); |
666 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
667 | } |
668 | |
669 | // ----------------------------------- multifunction_node ---------------------------------- |
670 | // Test multifunction_node. |
671 | // |
672 | // graph being tested is |
673 | // |
674 | // source_node -\ /- parallel function_node |
675 | // \ / |
676 | // +multifunction_node+ |
677 | // / \ x |
678 | // source_node -/ \- parallel function_node |
679 | // |
680 | // After each run the graph is reset(), to test the reset functionality. The |
681 | // multifunction_node will put an item to each successor for every item |
682 | // received. |
683 | // |
684 | template< |
685 | TestNodeTypeEnum SType0, // does source node 1 throw? |
686 | TestNodeTypeEnum SType1, // does source node 2 thorw? |
687 | class Item12, // type of item passed between sources and test node |
688 | TestNodeTypeEnum FType, // does multifunction node throw? |
689 | class ItemTuple, // tuple of types passed from multifunction_node to sink nodes |
690 | TestNodeTypeEnum NType1, // does sink node 1 throw? |
691 | TestNodeTypeEnum NType2, // does sink node 2 throw? |
692 | class NodePolicy, // rejecting,queueing |
693 | size_t Conc // is node concurrent? {serial | limited | unlimited} |
694 | > |
695 | void run_multifunction_node_test() { |
696 | |
697 | typedef typename tbb::flow::tuple_element<0,ItemTuple>::type Item23Type0; |
698 | typedef typename tbb::flow::tuple_element<1,ItemTuple>::type Item23Type1; |
699 | typedef test_source_body<Item12,SType0> SBodyType1; |
700 | typedef test_source_body<Item12,SType1> SBodyType2; |
701 | typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType; |
702 | typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1; |
703 | typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2; |
704 | |
705 | typedef tbb::flow::source_node<Item12> SrcType; |
706 | typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType; |
707 | typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0; |
708 | typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1; |
709 | |
710 | for(int i = 0; i < 4; ++i ) { |
711 | if(i != 2) { // doesn't make sense to flog a non-throwing test |
712 | bool doThrow = (i & 0x1) != 0; |
713 | bool doFlog = (i & 0x2) != 0; |
714 | run_one_functype_node_test< |
715 | /*SourceNodeType*/ SrcType, |
716 | /*SourceNodeBodyType0*/ SBodyType1, |
717 | /*SourceNodeBodyType1*/ SBodyType2, |
718 | /*NFT*/ multifunc_node_type, |
719 | /*TestNodeType*/ TestType, |
720 | /*TestNodeBodyType*/ TestBodyType, |
721 | /*TypeToSink0*/ Item23Type0, |
722 | /*TypeToSink1*/ Item23Type1, |
723 | /*SinkNodeType0*/ SnkType0, |
724 | /*SinkNodeType1*/ SnkType1, |
725 | /*SinkNodeBodyType0*/ SinkBodyType1, |
726 | /*SinkNodeBodyType1*/ SinkBodyType2, |
727 | /*Conc*/ Conc> |
728 | (doThrow,doFlog,"multifunction_node" ); |
729 | } |
730 | } |
731 | } // run_multifunction_node_test |
732 | |
733 | void test_multifunction_node() { |
734 | REMARK("Testing multifunction_node\n" ); |
735 | g_Wakeup_Msg = "multifunction_node(source throws,rejecting,serial): Missed wakeup or machine is overloaded?" ; |
736 | // serial rejecting |
737 | run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); |
738 | g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?" ; |
739 | run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); |
740 | g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?" ; |
741 | run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>(); |
742 | |
743 | g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?" ; |
744 | // serial queueing |
745 | run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); |
746 | run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); |
747 | run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); |
748 | check_type<int>::check_type_counter = 0; |
749 | run_multifunction_node_test<nonThrowing, nonThrowing, check_type<int>, nonThrowing, tbb::flow::tuple<check_type<int>, check_type<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>(); |
750 | ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test" ); |
751 | |
752 | g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?" ; |
753 | // unlimited parallel rejecting |
754 | run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); |
755 | run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>(); |
756 | run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); |
757 | |
758 | g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?" ; |
759 | // limited parallel rejecting |
760 | run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>(); |
761 | run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>(); |
762 | run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>(); |
763 | |
764 | g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?" ; |
765 | // limited parallel queueing |
766 | run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); |
767 | run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>(); |
768 | run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>(); |
769 | |
770 | g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?" ; |
771 | // everyone throwing |
772 | run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, tbb::flow::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>(); |
773 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
774 | } |
775 | |
776 | // |
// A continue_node has T predecessors. Once it has received a message (continue_msg)
// from each of its T predecessors, it executes the body of the node and forwards
// a continue_msg to its successors. In other words, however many predecessors the
// continue_node has, that is how many continue_msgs it receives on input before
// forwarding a message.
781 | // |
782 | // The graph will look like |
783 | // |
784 | // +broadcast_node+ |
785 | // / \ ___ |
786 | // source_node+------>+broadcast_node+ +continue_node+--->+absorber |
787 | // \ / |
788 | // +broadcast_node+ |
789 | // |
790 | // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors. |
791 | // The absorber is parallel, so each item emitted by the source will result in one thread |
792 | // spinning. So for N threads we pass N-1 continue_messages, then spin wait and then throw if |
793 | // we are allowed to. |
794 | |
795 | template < class SourceNodeType, class SourceNodeBodyType, class TTestNodeType, class TestNodeBodyType, |
796 | class SinkNodeType, class SinkNodeBodyType> |
797 | void run_one_continue_node_test (bool throwException, bool flog) { |
798 | tbb::flow::graph g; |
799 | |
800 | tbb::atomic<int> source_count; |
801 | tbb::atomic<int> test_count; |
802 | tbb::atomic<int> sink_count; |
803 | source_count = test_count = sink_count = 0; |
804 | #if USE_TASK_SCHEDULER_OBSERVER |
805 | eh_test_observer o; |
806 | o.observe(true); |
807 | #endif |
808 | g_Master = Harness::CurrentTid(); |
809 | SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false); |
810 | TTestNodeType node_to_test(g, TestNodeBodyType(test_count)); |
811 | SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); |
812 | tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g); |
813 | make_edge(source, b1); |
814 | make_edge(b1,b2); |
815 | make_edge(b1,b3); |
816 | make_edge(b2,node_to_test); |
817 | make_edge(b3,node_to_test); |
818 | make_edge(node_to_test, sink); |
819 | for(int iter = 0; iter < 2; ++iter) { |
820 | ResetGlobals(throwException,flog); |
821 | if(throwException) { |
822 | TRY(); |
823 | source.activate(); |
824 | g.wait_for_all(); |
825 | CATCH_AND_ASSERT(); |
826 | } |
827 | else { |
828 | TRY(); |
829 | source.activate(); |
830 | g.wait_for_all(); |
831 | CATCH_AND_FAIL(); |
832 | } |
833 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
834 | int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(); |
835 | int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(); |
836 | int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); |
837 | if(throwException) { |
838 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
839 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
840 | ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources" ); |
841 | ASSERT(sb_cnt >= t_cnt, "Too many items received by test node" ); |
842 | ASSERT(nb_cnt <= t_cnt, "Too many items received by sink nodes" ); |
843 | } |
844 | else { |
845 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
846 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
847 | ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node" ); |
848 | ASSERT(t_cnt == g_NumItems, "Not all items reached test node" ); |
849 | ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers" ); |
850 | } |
851 | g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes. |
852 | source_count = test_count = sink_count = 0; |
853 | ASSERT(0 == (int)test_count, "Atomic wasn't reset properly" ); |
854 | ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed" ); |
855 | ASSERT(0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(),"Reset test_node failed" ); |
856 | ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed" ); |
857 | } |
858 | #if USE_TASK_SCHEDULER_OBSERVER |
859 | o.observe(false); |
860 | #endif |
861 | } |
862 | |
863 | template< |
864 | class ItemType, |
865 | TestNodeTypeEnum SType, // does source node throw? |
866 | TestNodeTypeEnum CType, // does continue_node throw? |
867 | TestNodeTypeEnum AType> // does absorber throw |
868 | void run_continue_node_test() { |
869 | typedef test_source_body<tbb::flow::continue_msg,SType> SBodyType; |
870 | typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType; |
871 | typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType; |
872 | |
873 | typedef tbb::flow::source_node<tbb::flow::continue_msg> SrcType; |
874 | typedef tbb::flow::continue_node<ItemType> TestType; |
875 | typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType; |
876 | |
877 | for(int i = 0; i < 4; ++i ) { |
878 | if(i == 2) continue; // don't run (false,true); it doesn't make sense. |
879 | bool doThrow = (i & 0x1) != 0; |
880 | bool doFlog = (i & 0x2) != 0; |
881 | run_one_continue_node_test< |
882 | /*SourceNodeType*/ SrcType, |
883 | /*SourceNodeBodyType*/ SBodyType, |
884 | /*TestNodeType*/ TestType, |
885 | /*TestNodeBodyType*/ ContBodyType, |
886 | /*SinkNodeType*/ SnkType, |
887 | /*SinkNodeBodyType*/ SinkBodyType> |
888 | (doThrow,doFlog); |
889 | } |
890 | } |
891 | |
892 | // |
893 | void test_continue_node() { |
894 | REMARK("Testing continue_node\n" ); |
895 | g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?" ; |
896 | run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>(); |
897 | g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?" ; |
898 | run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>(); |
899 | g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?" ; |
900 | run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>(); |
901 | g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?" ; |
902 | run_continue_node_test<int,isThrowing,isThrowing,isThrowing>(); |
903 | check_type<double>::check_type_counter = 0; |
904 | run_continue_node_test<check_type<double>,isThrowing,isThrowing,isThrowing>(); |
905 | ASSERT(!check_type<double>::check_type_counter, "Dropped objects in continue_node test" ); |
906 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
907 | } |
908 | |
909 | // ---------- buffer_node queue_node overwrite_node -------------- |
910 | |
911 | template< |
912 | class BufferItemType, // |
913 | class SourceNodeType, |
914 | class SourceNodeBodyType, |
915 | class TestNodeType, |
916 | class SinkNodeType, |
917 | class SinkNodeBodyType > |
918 | void run_one_buffer_node_test(bool throwException,bool flog) { |
919 | tbb::flow::graph g; |
920 | |
921 | tbb::atomic<int> source_count; |
922 | tbb::atomic<int> sink_count; |
923 | source_count = sink_count = 0; |
924 | #if USE_TASK_SCHEDULER_OBSERVER |
925 | eh_test_observer o; |
926 | o.observe(true); |
927 | #endif |
928 | g_Master = Harness::CurrentTid(); |
929 | SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false); |
930 | TestNodeType node_to_test(g); |
931 | SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); |
932 | make_edge(source,node_to_test); |
933 | make_edge(node_to_test, sink); |
934 | for(int iter = 0; iter < 2; ++iter) { |
935 | ResetGlobals(throwException,flog); |
936 | if(throwException) { |
937 | TRY(); |
938 | source.activate(); |
939 | g.wait_for_all(); |
940 | CATCH_AND_ASSERT(); |
941 | } |
942 | else { |
943 | TRY(); |
944 | source.activate(); |
945 | g.wait_for_all(); |
946 | CATCH_AND_FAIL(); |
947 | } |
948 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
949 | int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(); |
950 | int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); |
951 | if(throwException) { |
952 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
953 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
954 | ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources" ); |
955 | ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes" ); |
956 | } |
957 | else { |
958 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
959 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
960 | ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node" ); |
961 | ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers" ); |
962 | } |
963 | if(iter == 0) { |
964 | remove_edge(node_to_test, sink); |
965 | node_to_test.try_put(BufferItemType()); |
966 | g.wait_for_all(); |
967 | g.reset(); |
968 | source_count = sink_count = 0; |
969 | BufferItemType tmp; |
970 | ASSERT(!node_to_test.try_get(tmp), "node not empty" ); |
971 | make_edge(node_to_test, sink); |
972 | g.wait_for_all(); |
973 | } |
974 | else { |
975 | g.reset(); |
976 | source_count = sink_count = 0; |
977 | } |
978 | ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed" ); |
979 | ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed" ); |
980 | } |
981 | |
982 | #if USE_TASK_SCHEDULER_OBSERVER |
983 | o.observe(false); |
984 | #endif |
985 | } |
986 | template<class BufferItemType, |
987 | TestNodeTypeEnum SourceThrowType, |
988 | TestNodeTypeEnum SinkThrowType> |
989 | void run_buffer_queue_and_overwrite_node_test() { |
990 | typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType; |
991 | typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; |
992 | |
993 | typedef tbb::flow::source_node<BufferItemType> SrcType; |
994 | typedef tbb::flow::buffer_node<BufferItemType> BufType; |
995 | typedef tbb::flow::queue_node<BufferItemType> QueType; |
996 | typedef tbb::flow::overwrite_node<BufferItemType> OvrType; |
997 | typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; |
998 | |
999 | for(int i = 0; i < 4; ++i) { |
1000 | if(i == 2) continue; // no need to test flog w/o throws |
1001 | bool throwException = (i & 0x1) != 0; |
1002 | bool doFlog = (i & 0x2) != 0; |
1003 | #if TBB_RUN_BUFFERING_TEST |
1004 | run_one_buffer_node_test< |
1005 | /* class BufferItemType*/ BufferItemType, |
1006 | /*class SourceNodeType*/ SrcType, |
1007 | /*class SourceNodeBodyType*/ SourceBodyType, |
1008 | /*class TestNodeType*/ BufType, |
1009 | /*class SinkNodeType*/ SnkType, |
1010 | /*class SinkNodeBodyType*/ SinkBodyType |
1011 | >(throwException, doFlog); |
1012 | run_one_buffer_node_test< |
1013 | /* class BufferItemType*/ BufferItemType, |
1014 | /*class SourceNodeType*/ SrcType, |
1015 | /*class SourceNodeBodyType*/ SourceBodyType, |
1016 | /*class TestNodeType*/ QueType, |
1017 | /*class SinkNodeType*/ SnkType, |
1018 | /*class SinkNodeBodyType*/ SinkBodyType |
1019 | >(throwException, doFlog); |
1020 | #endif |
1021 | run_one_buffer_node_test< |
1022 | /* class BufferItemType*/ BufferItemType, |
1023 | /*class SourceNodeType*/ SrcType, |
1024 | /*class SourceNodeBodyType*/ SourceBodyType, |
1025 | /*class TestNodeType*/ OvrType, |
1026 | /*class SinkNodeType*/ SnkType, |
1027 | /*class SinkNodeBodyType*/ SinkBodyType |
1028 | >(throwException, doFlog); |
1029 | } |
1030 | } |
1031 | |
1032 | void test_buffer_queue_and_overwrite_node() { |
1033 | REMARK("Testing buffer_node, queue_node and overwrite_node\n" ); |
1034 | #if TBB_RUN_BUFFERING_TEST |
1035 | #else |
1036 | REMARK("skip buffer and queue test (known issue)\n" ); |
1037 | #endif |
1038 | g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?" ; |
1039 | run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>(); |
1040 | g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?" ; |
1041 | run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>(); |
1042 | g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?" ; |
1043 | run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>(); |
1044 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
1045 | } |
1046 | |
1047 | // ---------- sequencer_node ------------------------- |
1048 | |
1049 | |
1050 | template< |
1051 | class BufferItemType, // |
1052 | class SourceNodeType, |
1053 | class SourceNodeBodyType, |
1054 | class TestNodeType, |
1055 | class SeqBodyType, |
1056 | class SinkNodeType, |
1057 | class SinkNodeBodyType > |
1058 | void run_one_sequencer_node_test(bool throwException,bool flog) { |
1059 | tbb::flow::graph g; |
1060 | |
1061 | tbb::atomic<int> source_count; |
1062 | tbb::atomic<int> sink_count; |
1063 | source_count = sink_count = 0; |
1064 | #if USE_TASK_SCHEDULER_OBSERVER |
1065 | eh_test_observer o; |
1066 | o.observe(true); |
1067 | #endif |
1068 | g_Master = Harness::CurrentTid(); |
1069 | SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false); |
1070 | TestNodeType node_to_test(g,SeqBodyType()); |
1071 | SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); |
1072 | make_edge(source,node_to_test); |
1073 | make_edge(node_to_test, sink); |
1074 | for(int iter = 0; iter < 2; ++iter) { |
1075 | ResetGlobals(throwException,flog); |
1076 | if(throwException) { |
1077 | TRY(); |
1078 | source.activate(); |
1079 | g.wait_for_all(); |
1080 | CATCH_AND_ASSERT(); |
1081 | } |
1082 | else { |
1083 | TRY(); |
1084 | source.activate(); |
1085 | g.wait_for_all(); |
1086 | CATCH_AND_FAIL(); |
1087 | } |
1088 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
1089 | int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(); |
1090 | int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); |
1091 | if(throwException) { |
1092 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
1093 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
1094 | ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources" ); |
1095 | ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes" ); |
1096 | } |
1097 | else { |
1098 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
1099 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
1100 | ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node" ); |
1101 | ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers" ); |
1102 | } |
1103 | if(iter == 0) { |
1104 | remove_edge(node_to_test, sink); |
1105 | node_to_test.try_put(BufferItemType(g_NumItems + 1)); |
1106 | node_to_test.try_put(BufferItemType(1)); |
1107 | g.wait_for_all(); |
1108 | g.reset(); |
1109 | source_count = sink_count = 0; |
1110 | make_edge(node_to_test, sink); |
1111 | g.wait_for_all(); |
1112 | } |
1113 | else { |
1114 | g.reset(); |
1115 | source_count = sink_count = 0; |
1116 | } |
1117 | ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed" ); |
1118 | ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed" ); |
1119 | } |
1120 | |
1121 | #if USE_TASK_SCHEDULER_OBSERVER |
1122 | o.observe(false); |
1123 | #endif |
1124 | } |
1125 | |
1126 | template<class BufferItemType, |
1127 | TestNodeTypeEnum SourceThrowType, |
1128 | TestNodeTypeEnum SinkThrowType> |
1129 | void run_sequencer_node_test() { |
1130 | typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType; |
1131 | typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; |
1132 | typedef sequencer_body<BufferItemType> SeqBodyType; |
1133 | |
1134 | typedef tbb::flow::source_node<BufferItemType> SrcType; |
1135 | typedef tbb::flow::sequencer_node<BufferItemType> SeqType; |
1136 | typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; |
1137 | |
1138 | for(int i = 0; i < 4; ++i) { |
1139 | if(i == 2) continue; // no need to test flog w/o throws |
1140 | bool throwException = (i & 0x1) != 0; |
1141 | bool doFlog = (i & 0x2) != 0; |
1142 | run_one_sequencer_node_test< |
1143 | /* class BufferItemType*/ BufferItemType, |
1144 | /*class SourceNodeType*/ SrcType, |
1145 | /*class SourceNodeBodyType*/ SourceBodyType, |
1146 | /*class TestNodeType*/ SeqType, |
1147 | /*class SeqBodyType*/ SeqBodyType, |
1148 | /*class SinkNodeType*/ SnkType, |
1149 | /*class SinkNodeBodyType*/ SinkBodyType |
1150 | >(throwException, doFlog); |
1151 | } |
1152 | } |
1153 | |
1154 | |
1155 | |
1156 | void test_sequencer_node() { |
1157 | REMARK("Testing sequencer_node\n" ); |
1158 | g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?" ; |
1159 | run_sequencer_node_test<int, isThrowing,nonThrowing>(); |
1160 | check_type<int>::check_type_counter = 0; |
1161 | g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?" ; |
1162 | run_sequencer_node_test<check_type<int>, nonThrowing,isThrowing>(); |
1163 | ASSERT(!check_type<int>::check_type_counter, "Dropped objects in sequencer_node test" ); |
1164 | g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?" ; |
1165 | run_sequencer_node_test<int, isThrowing,isThrowing>(); |
1166 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
1167 | } |
1168 | |
1169 | // ------------ priority_queue_node ------------------ |
1170 | |
1171 | template< |
1172 | class BufferItemType, |
1173 | class SourceNodeType, |
1174 | class SourceNodeBodyType, |
1175 | class TestNodeType, |
1176 | class SinkNodeType, |
1177 | class SinkNodeBodyType > |
1178 | void run_one_priority_queue_node_test(bool throwException,bool flog) { |
1179 | tbb::flow::graph g; |
1180 | |
1181 | tbb::atomic<int> source_count; |
1182 | tbb::atomic<int> sink_count; |
1183 | source_count = sink_count = 0; |
1184 | #if USE_TASK_SCHEDULER_OBSERVER |
1185 | eh_test_observer o; |
1186 | o.observe(true); |
1187 | #endif |
1188 | g_Master = Harness::CurrentTid(); |
1189 | SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false); |
1190 | |
1191 | TestNodeType node_to_test(g); |
1192 | |
1193 | SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); |
1194 | |
1195 | make_edge(source,node_to_test); |
1196 | make_edge(node_to_test, sink); |
1197 | for(int iter = 0; iter < 2; ++iter) { |
1198 | ResetGlobals(throwException,flog); |
1199 | if(throwException) { |
1200 | TRY(); |
1201 | source.activate(); |
1202 | g.wait_for_all(); |
1203 | CATCH_AND_ASSERT(); |
1204 | } |
1205 | else { |
1206 | TRY(); |
1207 | source.activate(); |
1208 | g.wait_for_all(); |
1209 | CATCH_AND_FAIL(); |
1210 | } |
1211 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
1212 | int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(); |
1213 | int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); |
1214 | if(throwException) { |
1215 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
1216 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
1217 | ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources" ); |
1218 | ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes" ); |
1219 | } |
1220 | else { |
1221 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
1222 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
1223 | ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node" ); |
1224 | ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers" ); |
1225 | } |
1226 | if(iter == 0) { |
1227 | remove_edge(node_to_test, sink); |
1228 | node_to_test.try_put(BufferItemType(g_NumItems + 1)); |
1229 | node_to_test.try_put(BufferItemType(g_NumItems + 2)); |
1230 | node_to_test.try_put(BufferItemType()); |
1231 | g.wait_for_all(); |
1232 | g.reset(); |
1233 | source_count = sink_count = 0; |
1234 | make_edge(node_to_test, sink); |
1235 | g.wait_for_all(); |
1236 | } |
1237 | else { |
1238 | g.reset(); |
1239 | source_count = sink_count = 0; |
1240 | } |
1241 | ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed" ); |
1242 | ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed" ); |
1243 | } |
1244 | |
1245 | #if USE_TASK_SCHEDULER_OBSERVER |
1246 | o.observe(false); |
1247 | #endif |
1248 | } |
1249 | |
1250 | template<class BufferItemType, |
1251 | TestNodeTypeEnum SourceThrowType, |
1252 | TestNodeTypeEnum SinkThrowType> |
1253 | void run_priority_queue_node_test() { |
1254 | typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType; |
1255 | typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; |
1256 | typedef less_body<BufferItemType> LessBodyType; |
1257 | |
1258 | typedef tbb::flow::source_node<BufferItemType> SrcType; |
1259 | typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType> PrqType; |
1260 | typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; |
1261 | |
1262 | for(int i = 0; i < 4; ++i) { |
1263 | if(i == 2) continue; // no need to test flog w/o throws |
1264 | bool throwException = (i & 0x1) != 0; |
1265 | bool doFlog = (i & 0x2) != 0; |
1266 | run_one_priority_queue_node_test< |
1267 | /* class BufferItemType*/ BufferItemType, |
1268 | /*class SourceNodeType*/ SrcType, |
1269 | /*class SourceNodeBodyType*/ SourceBodyType, |
1270 | /*class TestNodeType*/ PrqType, |
1271 | /*class SinkNodeType*/ SnkType, |
1272 | /*class SinkNodeBodyType*/ SinkBodyType |
1273 | >(throwException, doFlog); |
1274 | } |
1275 | } |
1276 | |
1277 | void test_priority_queue_node() { |
1278 | REMARK("Testing priority_queue_node\n" ); |
1279 | g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?" ; |
1280 | run_priority_queue_node_test<int, isThrowing,nonThrowing>(); |
1281 | check_type<int>::check_type_counter = 0; |
1282 | g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?" ; |
1283 | run_priority_queue_node_test<check_type<int>, nonThrowing,isThrowing>(); |
1284 | ASSERT(!check_type<int>::check_type_counter, "Dropped objects in priority_queue_node test" ); |
1285 | g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?" ; |
1286 | run_priority_queue_node_test<int, isThrowing,isThrowing>(); |
1287 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
1288 | } |
1289 | |
1290 | // ------------------- join_node ---------------- |
1291 | template<class JP> struct graph_policy_name{ |
1292 | static const char* name() {return "unknown" ; } |
1293 | }; |
1294 | template<> struct graph_policy_name<tbb::flow::queueing> { |
1295 | static const char* name() {return "queueing" ; } |
1296 | }; |
1297 | template<> struct graph_policy_name<tbb::flow::reserving> { |
1298 | static const char* name() {return "reserving" ; } |
1299 | }; |
1300 | template<> struct graph_policy_name<tbb::flow::tag_matching> { |
1301 | static const char* name() {return "tag_matching" ; } |
1302 | }; |
1303 | |
1304 | |
1305 | template< |
1306 | class JP, |
1307 | class OutputTuple, |
1308 | class SourceType0, |
1309 | class SourceBodyType0, |
1310 | class SourceType1, |
1311 | class SourceBodyType1, |
1312 | class TestJoinType, |
1313 | class SinkType, |
1314 | class SinkBodyType |
1315 | > |
1316 | struct run_one_join_node_test { |
1317 | run_one_join_node_test() {} |
1318 | static void execute_test(bool throwException,bool flog) { |
1319 | typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0; |
1320 | typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1; |
1321 | |
1322 | tbb::flow::graph g; |
1323 | tbb::atomic<int>source0_count; |
1324 | tbb::atomic<int>source1_count; |
1325 | tbb::atomic<int>sink_count; |
1326 | source0_count = source1_count = sink_count = 0; |
1327 | #if USE_TASK_SCHEDULER_OBSERVER |
1328 | eh_test_observer o; |
1329 | o.observe(true); |
1330 | #endif |
1331 | g_Master = Harness::CurrentTid(); |
1332 | SourceType0 source0(g, SourceBodyType0(source0_count),/*is_active*/false); |
1333 | SourceType1 source1(g, SourceBodyType1(source1_count),/*is_active*/false); |
1334 | TestJoinType node_to_test(g); |
1335 | SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); |
1336 | make_edge(source0,tbb::flow::input_port<0>(node_to_test)); |
1337 | make_edge(source1,tbb::flow::input_port<1>(node_to_test)); |
1338 | make_edge(node_to_test, sink); |
1339 | for(int iter = 0; iter < 2; ++iter) { |
1340 | ResetGlobals(throwException,flog); |
1341 | if(throwException) { |
1342 | TRY(); |
1343 | source0.activate(); |
1344 | source1.activate(); |
1345 | g.wait_for_all(); |
1346 | CATCH_AND_ASSERT(); |
1347 | } |
1348 | else { |
1349 | TRY(); |
1350 | source0.activate(); |
1351 | source1.activate(); |
1352 | g.wait_for_all(); |
1353 | CATCH_AND_FAIL(); |
1354 | } |
1355 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
1356 | int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value(); |
1357 | int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value(); |
1358 | int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); |
1359 | if(throwException) { |
1360 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
1361 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
1362 | ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources" ); |
1363 | ASSERT(nb_cnt <= ((sb0_cnt < sb1_cnt) ? sb0_cnt : sb1_cnt), "Too many items received by sink nodes" ); |
1364 | } |
1365 | else { |
1366 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
1367 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
1368 | if(sb0_cnt != g_NumItems) { |
1369 | REMARK("throwException == %s\n" , throwException ? "true" : "false" ); |
1370 | REMARK("iter == %d\n" , (int)iter); |
1371 | REMARK("sb0_cnt == %d\n" , (int)sb0_cnt); |
1372 | REMARK("g_NumItems == %d\n" , (int)g_NumItems); |
1373 | } |
1374 | ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0" ); // this one |
1375 | ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1" ); |
1376 | ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers" ); |
1377 | } |
1378 | if(iter == 0) { |
1379 | remove_edge(node_to_test, sink); |
1380 | tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1)); |
1381 | tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); |
1382 | g.wait_for_all(); |
1383 | g.reset(); |
1384 | source0_count = source1_count = sink_count = 0; |
1385 | make_edge(node_to_test, sink); |
1386 | g.wait_for_all(); |
1387 | } |
1388 | else { |
1389 | g.wait_for_all(); |
1390 | g.reset(); |
1391 | source0_count = source1_count = sink_count = 0; |
1392 | } |
1393 | ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed" ); |
1394 | ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed" ); |
1395 | nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); |
1396 | ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed" ); |
1397 | } |
1398 | |
1399 | #if USE_TASK_SCHEDULER_OBSERVER |
1400 | o.observe(false); |
1401 | #endif |
1402 | } |
1403 | }; // run_one_join_node_test |
1404 | |
1405 | template< |
1406 | class OutputTuple, |
1407 | class SourceType0, |
1408 | class SourceBodyType0, |
1409 | class SourceType1, |
1410 | class SourceBodyType1, |
1411 | class TestJoinType, |
1412 | class SinkType, |
1413 | class SinkBodyType |
1414 | > |
1415 | struct run_one_join_node_test< |
1416 | tbb::flow::tag_matching, |
1417 | OutputTuple, |
1418 | SourceType0, |
1419 | SourceBodyType0, |
1420 | SourceType1, |
1421 | SourceBodyType1, |
1422 | TestJoinType, |
1423 | SinkType, |
1424 | SinkBodyType |
1425 | > { |
1426 | run_one_join_node_test() {} |
1427 | static void execute_test(bool throwException,bool flog) { |
1428 | typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0; |
1429 | typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1; |
1430 | |
1431 | tbb::flow::graph g; |
1432 | |
1433 | tbb::atomic<int>source0_count; |
1434 | tbb::atomic<int>source1_count; |
1435 | tbb::atomic<int>sink_count; |
1436 | source0_count = source1_count = sink_count = 0; |
1437 | #if USE_TASK_SCHEDULER_OBSERVER |
1438 | eh_test_observer o; |
1439 | o.observe(true); |
1440 | #endif |
1441 | g_Master = Harness::CurrentTid(); |
1442 | SourceType0 source0(g, SourceBodyType0(source0_count, 2),/*is_active*/false); |
1443 | SourceType1 source1(g, SourceBodyType1(source1_count, 3),/*is_active*/false); |
1444 | TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3))); |
1445 | SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); |
1446 | make_edge(source0,tbb::flow::input_port<0>(node_to_test)); |
1447 | make_edge(source1,tbb::flow::input_port<1>(node_to_test)); |
1448 | make_edge(node_to_test, sink); |
1449 | for(int iter = 0; iter < 2; ++iter) { |
1450 | ResetGlobals(throwException,flog); |
1451 | if(throwException) { |
1452 | TRY(); |
1453 | source0.activate(); |
1454 | source1.activate(); |
1455 | g.wait_for_all(); |
1456 | CATCH_AND_ASSERT(); |
1457 | } |
1458 | else { |
1459 | TRY(); |
1460 | source0.activate(); |
1461 | source1.activate(); |
1462 | g.wait_for_all(); |
1463 | CATCH_AND_FAIL(); |
1464 | } |
1465 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
1466 | int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value(); |
1467 | int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value(); |
1468 | int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); |
1469 | if(throwException) { |
1470 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
1471 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
1472 | ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources" ); |
1473 | ASSERT(nb_cnt <= ((sb0_cnt < sb1_cnt) ? sb0_cnt : sb1_cnt), "Too many items received by sink nodes" ); |
1474 | } |
1475 | else { |
1476 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
1477 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
1478 | ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0" ); |
1479 | ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1" ); |
1480 | ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers" ); |
1481 | } |
1482 | if(iter == 0) { |
1483 | remove_edge(node_to_test, sink); |
1484 | tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4)); |
1485 | tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); |
1486 | g.wait_for_all(); // have to wait for the graph to stop again.... |
1487 | g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes. |
1488 | source0_count = source1_count = sink_count = 0; |
1489 | make_edge(node_to_test, sink); |
1490 | g.wait_for_all(); // have to wait for the graph to stop again.... |
1491 | } |
1492 | else { |
1493 | g.wait_for_all(); |
1494 | g.reset(); |
1495 | source0_count = source1_count = sink_count = 0; |
1496 | } |
1497 | ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed" ); |
1498 | ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed" ); |
1499 | nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); |
1500 | ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed" ); |
1501 | } |
1502 | |
1503 | #if USE_TASK_SCHEDULER_OBSERVER |
1504 | o.observe(false); |
1505 | #endif |
1506 | } |
1507 | }; // run_one_join_node_test<tag_matching> |
1508 | |
1509 | template<class JP, class OutputTuple, |
1510 | TestNodeTypeEnum SourceThrowType, |
1511 | TestNodeTypeEnum SinkThrowType> |
1512 | void run_join_node_test() { |
1513 | typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0; |
1514 | typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1; |
1515 | typedef test_source_body<ItemType0,SourceThrowType> SourceBodyType0; |
1516 | typedef test_source_body<ItemType1,SourceThrowType> SourceBodyType1; |
1517 | typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; |
1518 | |
1519 | typedef typename tbb::flow::source_node<ItemType0> SourceType0; |
1520 | typedef typename tbb::flow::source_node<ItemType1> SourceType1; |
1521 | typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType; |
1522 | typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType; |
1523 | |
1524 | for(int i = 0; i < 4; ++i) { |
1525 | if(2 == i) continue; |
1526 | bool throwException = (i & 0x1) != 0; |
1527 | bool doFlog = (i & 0x2) != 0; |
1528 | run_one_join_node_test< |
1529 | JP, |
1530 | OutputTuple, |
1531 | SourceType0, |
1532 | SourceBodyType0, |
1533 | SourceType1, |
1534 | SourceBodyType1, |
1535 | TestJoinType, |
1536 | SinkType, |
1537 | SinkBodyType>::execute_test(throwException,doFlog); |
1538 | } |
1539 | } |
1540 | |
1541 | template<class JP> |
1542 | void test_join_node() { |
1543 | REMARK("Testing join_node<%s>\n" , graph_policy_name<JP>::name()); |
1544 | // only doing two-input joins |
1545 | g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?" ; |
1546 | run_join_node_test<JP, tbb::flow::tuple<int,int>, isThrowing, nonThrowing>(); |
1547 | check_type<int>::check_type_counter = 0; |
1548 | g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?" ; |
1549 | run_join_node_test<JP, tbb::flow::tuple<check_type<int>,int>, nonThrowing, isThrowing>(); |
1550 | ASSERT(!check_type<int>::check_type_counter, "Dropped items in test" ); |
1551 | g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?" ; |
1552 | run_join_node_test<JP, tbb::flow::tuple<int,int>, isThrowing, isThrowing>(); |
1553 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
1554 | } |
1555 | |
1556 | // ------------------- limiter_node ------------- |
1557 | |
1558 | template< |
1559 | class BufferItemType, // |
1560 | class SourceNodeType, |
1561 | class SourceNodeBodyType, |
1562 | class TestNodeType, |
1563 | class SinkNodeType, |
1564 | class SinkNodeBodyType > |
1565 | void run_one_limiter_node_test(bool throwException,bool flog) { |
1566 | tbb::flow::graph g; |
1567 | |
1568 | tbb::atomic<int> source_count; |
1569 | tbb::atomic<int> sink_count; |
1570 | source_count = sink_count = 0; |
1571 | #if USE_TASK_SCHEDULER_OBSERVER |
1572 | eh_test_observer o; |
1573 | o.observe(true); |
1574 | #endif |
1575 | g_Master = Harness::CurrentTid(); |
1576 | SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false); |
1577 | TestNodeType node_to_test(g,g_NumThreads + 1); |
1578 | SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count)); |
1579 | make_edge(source,node_to_test); |
1580 | make_edge(node_to_test, sink); |
1581 | for(int iter = 0; iter < 2; ++iter) { |
1582 | ResetGlobals(throwException,flog); |
1583 | if(throwException) { |
1584 | TRY(); |
1585 | source.activate(); |
1586 | g.wait_for_all(); |
1587 | CATCH_AND_ASSERT(); |
1588 | } |
1589 | else { |
1590 | TRY(); |
1591 | source.activate(); |
1592 | g.wait_for_all(); |
1593 | CATCH_AND_FAIL(); |
1594 | } |
1595 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
1596 | int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(); |
1597 | int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(); |
1598 | if(throwException) { |
1599 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
1600 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
1601 | ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources" ); |
1602 | ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes" ); |
1603 | } |
1604 | else { |
1605 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
1606 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
1607 | // we stop after limiter's limit, which is g_NumThreads + 1. The source_node |
1608 | // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2. |
1609 | ASSERT(sb_cnt == g_NumThreads + 2, "Missing invocations of source_node" ); |
1610 | ASSERT(nb_cnt == g_NumThreads + 1, "Missing items in absorbers" ); |
1611 | } |
1612 | if(iter == 0) { |
1613 | remove_edge(node_to_test, sink); |
1614 | node_to_test.try_put(BufferItemType()); |
1615 | node_to_test.try_put(BufferItemType()); |
1616 | g.wait_for_all(); |
1617 | g.reset(); |
1618 | source_count = sink_count = 0; |
1619 | BufferItemType tmp; |
1620 | ASSERT(!node_to_test.try_get(tmp), "node not empty" ); |
1621 | make_edge(node_to_test, sink); |
1622 | g.wait_for_all(); |
1623 | } |
1624 | else { |
1625 | g.reset(); |
1626 | source_count = sink_count = 0; |
1627 | } |
1628 | ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed" ); |
1629 | ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed" ); |
1630 | } |
1631 | |
1632 | #if USE_TASK_SCHEDULER_OBSERVER |
1633 | o.observe(false); |
1634 | #endif |
1635 | } |
1636 | |
1637 | template<class BufferItemType, |
1638 | TestNodeTypeEnum SourceThrowType, |
1639 | TestNodeTypeEnum SinkThrowType> |
1640 | void run_limiter_node_test() { |
1641 | typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType; |
1642 | typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; |
1643 | |
1644 | typedef tbb::flow::source_node<BufferItemType> SrcType; |
1645 | typedef tbb::flow::limiter_node<BufferItemType> LmtType; |
1646 | typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType; |
1647 | |
1648 | for(int i = 0; i < 4; ++i) { |
1649 | if(i == 2) continue; // no need to test flog w/o throws |
1650 | bool throwException = (i & 0x1) != 0; |
1651 | bool doFlog = (i & 0x2) != 0; |
1652 | run_one_limiter_node_test< |
1653 | /* class BufferItemType*/ BufferItemType, |
1654 | /*class SourceNodeType*/ SrcType, |
1655 | /*class SourceNodeBodyType*/ SourceBodyType, |
1656 | /*class TestNodeType*/ LmtType, |
1657 | /*class SinkNodeType*/ SnkType, |
1658 | /*class SinkNodeBodyType*/ SinkBodyType |
1659 | >(throwException, doFlog); |
1660 | } |
1661 | } |
1662 | |
1663 | void test_limiter_node() { |
1664 | REMARK("Testing limiter_node\n" ); |
1665 | g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?" ; |
1666 | run_limiter_node_test<int,isThrowing,nonThrowing>(); |
1667 | g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?" ; |
1668 | run_limiter_node_test<int,nonThrowing,isThrowing>(); |
1669 | g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?" ; |
1670 | run_limiter_node_test<int,isThrowing,isThrowing>(); |
1671 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
1672 | } |
1673 | |
1674 | // -------- split_node -------------------- |
1675 | |
1676 | template< |
1677 | class InputTuple, |
1678 | class SourceType, |
1679 | class SourceBodyType, |
1680 | class TestSplitType, |
1681 | class SinkType0, |
1682 | class SinkBodyType0, |
1683 | class SinkType1, |
1684 | class SinkBodyType1> |
1685 | void run_one_split_node_test(bool throwException, bool flog) { |
1686 | |
1687 | tbb::flow::graph g; |
1688 | |
1689 | tbb::atomic<int> source_count; |
1690 | tbb::atomic<int> sink0_count; |
1691 | tbb::atomic<int> sink1_count; |
1692 | source_count = sink0_count = sink1_count = 0; |
1693 | #if USE_TASK_SCHEDULER_OBSERVER |
1694 | eh_test_observer o; |
1695 | o.observe(true); |
1696 | #endif |
1697 | |
1698 | g_Master = Harness::CurrentTid(); |
1699 | SourceType source(g, SourceBodyType(source_count),/*is_active*/false); |
1700 | TestSplitType node_to_test(g); |
1701 | SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count)); |
1702 | SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count)); |
1703 | make_edge(source, node_to_test); |
1704 | make_edge(tbb::flow::output_port<0>(node_to_test), sink0); |
1705 | make_edge(tbb::flow::output_port<1>(node_to_test), sink1); |
1706 | |
1707 | for(int iter = 0; iter < 2; ++iter) { // run, reset, run again |
1708 | ResetGlobals(throwException,flog); |
1709 | if(throwException) { |
1710 | TRY(); |
1711 | source.activate(); |
1712 | g.wait_for_all(); |
1713 | CATCH_AND_ASSERT(); |
1714 | } |
1715 | else { |
1716 | TRY(); |
1717 | source.activate(); |
1718 | g.wait_for_all(); |
1719 | CATCH_AND_FAIL(); |
1720 | } |
1721 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
1722 | int sb_cnt = tbb::flow::copy_body<SourceBodyType>(source).count_value(); |
1723 | int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value(); |
1724 | int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value(); |
1725 | if(throwException) { |
1726 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
1727 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
1728 | ASSERT(sb_cnt <= 2*g_NumItems, "Too many items sent by source" ); |
1729 | ASSERT(nb0_cnt + nb1_cnt <= sb_cnt*2, "Too many items received by sink nodes" ); |
1730 | } |
1731 | else { |
1732 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
1733 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
1734 | ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_nodes" ); |
1735 | ASSERT(nb0_cnt == g_NumItems && nb1_cnt == g_NumItems, "Missing items in absorbers" ); |
1736 | } |
1737 | g.reset(); // resets the body of the source_nodes and the absorb_nodes. |
1738 | source_count = sink0_count = sink1_count = 0; |
1739 | ASSERT(0 == tbb::flow::copy_body<SourceBodyType>(source).count_value(),"Reset source failed" ); |
1740 | ASSERT(0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value(),"Reset sink 0 failed" ); |
1741 | ASSERT(0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value(),"Reset sink 1 failed" ); |
1742 | } |
1743 | #if USE_TASK_SCHEDULER_OBSERVER |
1744 | o.observe(false); |
1745 | #endif |
1746 | } |
1747 | |
1748 | template<class InputTuple, |
1749 | TestNodeTypeEnum SourceThrowType, |
1750 | TestNodeTypeEnum SinkThrowType> |
1751 | void run_split_node_test() { |
1752 | typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0; |
1753 | typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1; |
1754 | typedef tuple_test_source_body<InputTuple,SourceThrowType> SourceBodyType; |
1755 | typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0; |
1756 | typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1; |
1757 | |
1758 | typedef typename tbb::flow::source_node<InputTuple> SourceType; |
1759 | typedef typename tbb::flow::split_node<InputTuple> TestSplitType; |
1760 | typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0; |
1761 | typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1; |
1762 | |
1763 | for(int i = 0; i < 4; ++i) { |
1764 | if(2 == i) continue; |
1765 | bool throwException = (i & 0x1) != 0; |
1766 | bool doFlog = (i & 0x2) != 0; |
1767 | run_one_split_node_test< |
1768 | InputTuple, |
1769 | SourceType, |
1770 | SourceBodyType, |
1771 | TestSplitType, |
1772 | SinkType0, |
1773 | SinkBodyType0, |
1774 | SinkType1, |
1775 | SinkBodyType1> |
1776 | (throwException,doFlog); |
1777 | } |
1778 | } |
1779 | |
1780 | void test_split_node() { |
1781 | REMARK("Testing split_node\n" ); |
1782 | g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?" ; |
1783 | run_split_node_test<tbb::flow::tuple<int,int>, isThrowing, nonThrowing>(); |
1784 | g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?" ; |
1785 | run_split_node_test<tbb::flow::tuple<int,int>, nonThrowing, isThrowing>(); |
1786 | g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?" ; |
1787 | run_split_node_test<tbb::flow::tuple<int,int>, isThrowing, isThrowing>(); |
1788 | g_Wakeup_Msg = g_Orig_Wakeup_Msg; |
1789 | } |
1790 | |
1791 | // --------- indexer_node ---------------------- |
1792 | |
1793 | template < class InputTuple, |
1794 | class SourceType0, |
1795 | class SourceBodyType0, |
1796 | class SourceType1, |
1797 | class SourceBodyType1, |
1798 | class TestNodeType, |
1799 | class SinkType, |
1800 | class SinkBodyType> |
1801 | void run_one_indexer_node_test(bool throwException,bool flog) { |
1802 | typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0; |
1803 | typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1; |
1804 | |
1805 | tbb::flow::graph g; |
1806 | |
1807 | tbb::atomic<int> source0_count; |
1808 | tbb::atomic<int> source1_count; |
1809 | tbb::atomic<int> sink_count; |
1810 | source0_count = source1_count = sink_count = 0; |
1811 | #if USE_TASK_SCHEDULER_OBSERVER |
1812 | eh_test_observer o; |
1813 | o.observe(true); |
1814 | #endif |
1815 | g_Master = Harness::CurrentTid(); |
1816 | SourceType0 source0(g, SourceBodyType0(source0_count),/*is_active*/false); |
1817 | SourceType1 source1(g, SourceBodyType1(source1_count),/*is_active*/false); |
1818 | TestNodeType node_to_test(g); |
1819 | SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count)); |
1820 | make_edge(source0,tbb::flow::input_port<0>(node_to_test)); |
1821 | make_edge(source1,tbb::flow::input_port<1>(node_to_test)); |
1822 | make_edge(node_to_test, sink); |
1823 | for(int iter = 0; iter < 2; ++iter) { |
1824 | ResetGlobals(throwException,flog); |
1825 | if(throwException) { |
1826 | TRY(); |
1827 | source0.activate(); |
1828 | source1.activate(); |
1829 | g.wait_for_all(); |
1830 | CATCH_AND_ASSERT(); |
1831 | } |
1832 | else { |
1833 | TRY(); |
1834 | source0.activate(); |
1835 | source1.activate(); |
1836 | g.wait_for_all(); |
1837 | CATCH_AND_FAIL(); |
1838 | } |
1839 | bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException; |
1840 | int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value(); |
1841 | int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value(); |
1842 | int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); |
1843 | if(throwException) { |
1844 | ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph" ); |
1845 | ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph" ); |
1846 | ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources" ); |
1847 | ASSERT(nb_cnt <= sb0_cnt + sb1_cnt, "Too many items received by sink nodes" ); |
1848 | } |
1849 | else { |
1850 | ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred" ); |
1851 | ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred" ); |
1852 | ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0" ); |
1853 | ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1" ); |
1854 | ASSERT(nb_cnt == 2*g_NumItems, "Missing items in absorbers" ); |
1855 | } |
1856 | if(iter == 0) { |
1857 | remove_edge(node_to_test, sink); |
1858 | tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4)); |
1859 | tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2)); |
1860 | g.wait_for_all(); |
1861 | g.reset(); |
1862 | source0_count = source1_count = sink_count = 0; |
1863 | make_edge(node_to_test, sink); |
1864 | g.wait_for_all(); |
1865 | } |
1866 | else { |
1867 | g.wait_for_all(); |
1868 | g.reset(); |
1869 | source0_count = source1_count = sink_count = 0; |
1870 | } |
1871 | ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed" ); |
1872 | ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed" ); |
1873 | nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value(); |
1874 | ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed" ); |
1875 | } |
1876 | |
1877 | #if USE_TASK_SCHEDULER_OBSERVER |
1878 | o.observe(false); |
1879 | #endif |
1880 | } |
1881 | |
1882 | template<class InputTuple, |
1883 | TestNodeTypeEnum SourceThrowType, |
1884 | TestNodeTypeEnum SinkThrowType> |
1885 | void run_indexer_node_test() { |
1886 | typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0; |
1887 | typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1; |
1888 | typedef test_source_body<ItemType0,SourceThrowType> SourceBodyType0; |
1889 | typedef test_source_body<ItemType1,SourceThrowType> SourceBodyType1; |
1890 | typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType; |
1891 | typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType; |
1892 | |
1893 | typedef typename tbb::flow::source_node<ItemType0> SourceType0; |
1894 | typedef typename tbb::flow::source_node<ItemType1> SourceType1; |
1895 | typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType; |
1896 | |
1897 | for(int i = 0; i < 4; ++i) { |
1898 | if(2 == i) continue; |
1899 | bool throwException = (i & 0x1) != 0; |
1900 | bool doFlog = (i & 0x2) != 0; |
1901 | run_one_indexer_node_test< |
1902 | InputTuple, |
1903 | SourceType0, |
1904 | SourceBodyType0, |
1905 | SourceType1, |
1906 | SourceBodyType1, |
1907 | TestNodeType, |
1908 | SinkType, |
1909 | SinkBodyType>(throwException,doFlog); |
1910 | } |
1911 | } |
1912 | |
1913 | void test_indexer_node() { |
1914 | REMARK("Testing indexer_node\n" ); |
1915 | g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?" ; |
1916 | run_indexer_node_test<tbb::flow::tuple<int,int>, isThrowing, nonThrowing>(); |
1917 | g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?" ; |
1918 | run_indexer_node_test<tbb::flow::tuple<int,int>, nonThrowing, isThrowing>(); |
1919 | g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?" ; |
1920 | run_indexer_node_test<tbb::flow::tuple<int,int>, isThrowing, isThrowing>(); |
1921 | g_Wakeup_Msg = g_Orig_Wakeup_Msg;; |
1922 | } |
1923 | |
1924 | /////////////////////////////////////////////// |
1925 | // whole-graph exception test |
1926 | |
1927 | class Foo { |
1928 | private: |
1929 | // std::vector<int>& m_vec; |
1930 | std::vector<int>* m_vec; |
1931 | public: |
1932 | Foo(std::vector<int>& vec) : m_vec(&vec) { } |
1933 | void operator() (tbb::flow::continue_msg) const { |
1934 | ++nExceptions; |
1935 | m_vec->at(m_vec->size()); // Will throw out_of_range exception |
1936 | ASSERT(false, "Exception not thrown by invalid access" ); |
1937 | } |
1938 | }; |
1939 | |
1940 | // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786 |
1941 | // exception thrown in graph node, not caught in wait_for_all() |
1942 | void |
1943 | test_flow_graph_exception0() { |
1944 | // Initializes body |
1945 | std::vector<int> vec; |
1946 | vec.push_back(0); |
1947 | Foo f(vec); |
1948 | nExceptions = 0; |
1949 | |
1950 | // Construct graph and nodes |
1951 | tbb::flow::graph g; |
1952 | tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g); |
1953 | tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f); |
1954 | |
1955 | // Construct edge |
1956 | tbb::flow::make_edge(start, fooNode); |
1957 | |
1958 | // Execute graph |
1959 | ASSERT(!g.exception_thrown(), "exception_thrown flag already set" ); |
1960 | ASSERT(!g.is_cancelled(), "canceled flag already set" ); |
1961 | try { |
1962 | start.try_put(tbb::flow::continue_msg()); |
1963 | g.wait_for_all(); |
1964 | ASSERT(false, "Exception not thrown" ); |
1965 | } |
1966 | catch(std::out_of_range& ex) { |
1967 | REMARK("Exception: %s (expected)\n" , ex.what()); |
1968 | } |
1969 | catch(...) { |
1970 | REMARK("Unknown exception caught (expected)\n" ); |
1971 | } |
1972 | ASSERT(nExceptions > 0, "Exception caught, but no body signaled exception being thrown" ); |
1973 | nExceptions = 0; |
1974 | ASSERT(g.exception_thrown(), "Exception not intercepted" ); |
1975 | // if exception set, cancellation also set. |
1976 | ASSERT(g.is_cancelled(), "Exception cancellation not signaled" ); |
1977 | // in case we got an exception |
1978 | try { |
1979 | g.wait_for_all(); // context still signalled canceled, my_exception still set. |
1980 | } |
1981 | catch(...) { |
1982 | ASSERT(false, "Second exception thrown but no task executing" ); |
1983 | } |
1984 | ASSERT(nExceptions == 0, "body signaled exception being thrown, but no body executed" ); |
1985 | ASSERT(!g.exception_thrown(), "exception_thrown flag not reset" ); |
1986 | ASSERT(!g.is_cancelled(), "canceled flag not reset" ); |
1987 | } |
1988 | |
1989 | void TestOneThreadNum(int nThread) { |
1990 | REMARK("Testing %d threads\n" , nThread); |
1991 | g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS); |
1992 | g_NumThreads = nThread; |
1993 | tbb::task_scheduler_init init(nThread); |
1994 | // whole-graph exception catch and rethrow test |
1995 | test_flow_graph_exception0(); |
1996 | for(int i = 0; i < 4; ++i) { |
1997 | g_ExceptionInMaster = (i & 1) != 0; |
1998 | g_SolitaryException = (i & 2) != 0; |
1999 | REMARK("g_ExceptionInMaster == %s, g_SolitaryException == %s\n" , |
2000 | g_ExceptionInMaster ? "T" :"F" , |
2001 | g_SolitaryException ? "T" :"F" ); |
2002 | test_source_node(); |
2003 | test_function_node(); |
2004 | test_continue_node(); // also test broadcast_node |
2005 | test_multifunction_node(); |
2006 | // single- and multi-item buffering nodes |
2007 | test_buffer_queue_and_overwrite_node(); |
2008 | test_sequencer_node(); |
2009 | test_priority_queue_node(); |
2010 | |
2011 | // join_nodes |
2012 | test_join_node<tbb::flow::queueing>(); |
2013 | test_join_node<tbb::flow::reserving>(); |
2014 | test_join_node<tbb::flow::tag_matching>(); |
2015 | |
2016 | test_limiter_node(); |
2017 | test_split_node(); |
2018 | // graph for write_once_node will be complicated by the fact the node will |
2019 | // not do try_puts after it has been set. To get parallelism of N we have |
2020 | // to attach N successor nodes to the write_once (or play some similar game). |
2021 | // test_write_once_node(); |
2022 | test_indexer_node(); |
2023 | } |
2024 | } |
2025 | #endif // TBB_USE_EXCEPTIONS |
2026 | |
2027 | #if TBB_USE_EXCEPTIONS |
2028 | int TestMain() { |
2029 | // reversing the order of tests |
2030 | for(int nThread=MaxThread; nThread >= MinThread; --nThread) { |
2031 | TestOneThreadNum(nThread); |
2032 | } |
2033 | |
2034 | return Harness::Done; |
2035 | } |
2036 | #else |
2037 | int TestMain() { |
2038 | return Harness::Skipped; |
2039 | } |
2040 | #endif // TBB_USE_EXCEPTIONS |
2041 | |