1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#define HARNESS_DEFAULT_MIN_THREADS 2
18#define HARNESS_DEFAULT_MAX_THREADS 4
19#include "harness_defs.h"
20
21#if _MSC_VER
22 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
23#endif
24
25#if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
26 // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
27 #pragma warning (disable: 4702)
28#endif
29
30#include "harness.h"
31
32// global task_scheduler_observer is an imperfect tool to find how many threads are really
33// participating. That was the hope, but it counts the entries into the marketplace,
34// not the arena.
35// #define USE_TASK_SCHEDULER_OBSERVER 1
36
// TBB_RUN_BUFFERING_TEST: 1 everywhere except non-debug builds with the Intel compiler
// in MSVC mode, where it is true only for compiler versions above 1210.
// NOTE(review): the uses of this macro are outside this section; presumably it gates
// the buffering-node tests -- confirm.
#if _MSC_VER && defined(__INTEL_COMPILER) && !TBB_USE_DEBUG
    // Parenthesized so the macro behaves as a single operand in expressions such as
    // "!TBB_RUN_BUFFERING_TEST" or "TBB_RUN_BUFFERING_TEST && X".
    #define TBB_RUN_BUFFERING_TEST (__INTEL_COMPILER > 1210)
#else
    #define TBB_RUN_BUFFERING_TEST 1
#endif
42
43#if TBB_USE_EXCEPTIONS
44#if USE_TASK_SCHEDULER_OBSERVER
45#include "tbb/task_scheduler_observer.h"
46#endif
47#include "tbb/flow_graph.h"
48#include "tbb/task_scheduler_init.h"
49#include <iostream>
50#include <vector>
51#include "harness_assert.h"
52#include "harness_checktype.h"
53
// Resolves the Existed() name used by harness_eh.h, so it has to be defined before
// that header is included. This test always reports INT_MAX.
inline intptr_t Existed() {
    const intptr_t result = INT_MAX;
    return result;
}
55
56#include "harness_eh.h"
57#include <stdexcept>
58
59#define NUM_ITEMS 15
60int g_NumItems;
61
62tbb::atomic<unsigned> nExceptions;
63tbb::atomic<intptr_t> g_TGCCancelled;
64
// Compile-time switch telling a test body whether it only waits (nonThrowing) or
// waits and then throws a test exception (isThrowing).
enum TestNodeTypeEnum {
    nonThrowing = 0,
    isThrowing  = 1
};

// Concurrency selectors. They are used as the Conc template argument of the test
// bodies and are also passed as the concurrency limit when the node under test is
// constructed.
static const size_t unlimited_type = 0;
static const size_t serial_type    = 1;
static const size_t limited_type   = 4;
70
71template<TestNodeTypeEnum T> struct TestNodeTypeName;
72template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } };
73template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } };
74
75template<size_t Conc> struct concurrencyName;
76template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } };
77template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } };
78template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } };
79
// Class that provides waiting and throwing behavior. The non-throwing variants only
// wait; the throwing variants wait and then throw.
// If serial, we can't wait for concurrency to peak on every call: we may be the
// bottleneck and would stop further processing. Serial bodies therefore start waiting
// only once their count exceeds g_NumThreads + 10 (the "10" is somewhat arbitrary, and
// just makes sure there are enough items in the graph to keep it flowing).
// Whenever a variant waits, it uses Harness::ConcurrencyTracker to do so.
85
86template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
87class WaitThrow;
88
89template<>
90class WaitThrow<serial_type,nonThrowing> {
91protected:
92 void WaitAndThrow(int cnt, const char * /*name*/) {
93 if(cnt > g_NumThreads + 10) {
94 Harness::ConcurrencyTracker ct;
95 WaitUntilConcurrencyPeaks();
96 }
97 }
98};
99
100template<>
101class WaitThrow<serial_type,isThrowing> {
102protected:
103 void WaitAndThrow(int cnt, const char * /*name*/) {
104 if(cnt > g_NumThreads + 10) {
105 Harness::ConcurrencyTracker ct;
106 WaitUntilConcurrencyPeaks();
107 ThrowTestException(1);
108 }
109 }
110};
111
// For nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
// to make sure enough other nodes wait for concurrency to peak. If we are attached to
// N successors, then for each item we pass on we will get N executions of the
// "absorbers" (because we broadcast to successors). For an odd number of threads we
// need (g_NumThreads - limited + 1) / 2 items; that will give us one extra execution
// of an "absorber", but we can't change that without changing the behavior of the node.
118template<>
119class WaitThrow<limited_type,nonThrowing> {
120protected:
121 void WaitAndThrow(int cnt, const char * /*name*/) {
122 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
123 return;
124 }
125 Harness::ConcurrencyTracker ct;
126 WaitUntilConcurrencyPeaks();
127 }
128};
129
130template<>
131class WaitThrow<limited_type,isThrowing> {
132protected:
133 void WaitAndThrow(int cnt, const char * /*name*/) {
134 Harness::ConcurrencyTracker ct;
135 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
136 return;
137 }
138 WaitUntilConcurrencyPeaks();
139 ThrowTestException(1);
140 }
141};
142
143template<>
144class WaitThrow<unlimited_type,nonThrowing> {
145protected:
146 void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
147 Harness::ConcurrencyTracker ct;
148 WaitUntilConcurrencyPeaks();
149 }
150};
151
152template<>
153class WaitThrow<unlimited_type,isThrowing> {
154protected:
155 void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
156 Harness::ConcurrencyTracker ct;
157 WaitUntilConcurrencyPeaks();
158 ThrowTestException(1);
159 }
160};
161
162void
163ResetGlobals(bool throwException = true, bool flog = false) {
164 nExceptions = 0;
165 g_TGCCancelled = 0;
166 ResetEhGlobals(throwException, flog);
167}
168
169// -------source_node body ------------------
170template <class OutputType, TestNodeTypeEnum TType>
171class test_source_body : WaitThrow<serial_type, TType> {
172 using WaitThrow<serial_type, TType>::WaitAndThrow;
173 tbb::atomic<int> *my_current_val;
174 int my_mult;
175public:
176 test_source_body(tbb::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
177 REMARK("- --------- - - - constructed %lx\n", (size_t)(my_current_val));
178 }
179
180 bool operator()(OutputType & out) {
181 UPDATE_COUNTS();
182 out = OutputType(my_mult * ++(*my_current_val));
183 REMARK("xx(%lx) out == %d\n", (size_t)(my_current_val), (int)out);
184 if(*my_current_val > g_NumItems) {
185 REMARK(" ------ End of the line!\n");
186 *my_current_val = g_NumItems;
187 return false;
188 }
189 WaitAndThrow((int)out,"test_source_body");
190 return true;
191 }
192
193 int count_value() { return (int)*my_current_val; }
194};
195
196template <TestNodeTypeEnum TType>
197class test_source_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
198 using WaitThrow<serial_type, TType>::WaitAndThrow;
199 tbb::atomic<int> *my_current_val;
200public:
201 test_source_body(tbb::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
202
203 bool operator()(tbb::flow::continue_msg & out) {
204 UPDATE_COUNTS();
205 int outint = ++(*my_current_val);
206 out = tbb::flow::continue_msg();
207 if(*my_current_val > g_NumItems) {
208 *my_current_val = g_NumItems;
209 return false;
210 }
211 WaitAndThrow(outint,"test_source_body");
212 return true;
213 }
214
215 int count_value() { return (int)*my_current_val; }
216};
217
218// -------{function/continue}_node body ------------------
219template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
220class absorber_body : WaitThrow<Conc,T> {
221 using WaitThrow<Conc,T>::WaitAndThrow;
222 tbb::atomic<int> *my_count;
223public:
224 absorber_body(tbb::atomic<int> &my_cnt) : my_count(&my_cnt) { }
225 OutputType operator()(const InputType &/*p_in*/) {
226 UPDATE_COUNTS();
227 int out = ++(*my_count);
228 WaitAndThrow(out,"absorber_body");
229 return OutputType();
230 }
231 int count_value() { return *my_count; }
232};
233
234// -------multifunction_node body ------------------
235
236// helper classes
237template<int N,class PortsType>
238struct IssueOutput {
239 typedef typename tbb::flow::tuple_element<N-1,PortsType>::type::output_type my_type;
240
241 static void issue_tuple_element( PortsType &my_ports) {
242 ASSERT(tbb::flow::get<N-1>(my_ports).try_put(my_type()), "Error putting to successor");
243 IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports);
244 }
245};
246
247template<class PortsType>
248struct IssueOutput<1,PortsType> {
249 typedef typename tbb::flow::tuple_element<0,PortsType>::type::output_type my_type;
250
251 static void issue_tuple_element( PortsType &my_ports) {
252 ASSERT(tbb::flow::get<0>(my_ports).try_put(my_type()), "Error putting to successor");
253 }
254};
255
256template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
257class multifunction_node_body : WaitThrow<Conc,T> {
258 using WaitThrow<Conc,T>::WaitAndThrow;
259 static const int N = tbb::flow::tuple_size<OutputTupleType>::value;
260 typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
261 typedef typename NodeType::output_ports_type PortsType;
262 tbb::atomic<int> *my_count;
263public:
264 multifunction_node_body(tbb::atomic<int> &my_cnt) : my_count(&my_cnt) { }
265 void operator()(const InputType& /*in*/, PortsType &my_ports) {
266 UPDATE_COUNTS();
267 int out = ++(*my_count);
268 WaitAndThrow(out,"multifunction_node_body");
269 // issue an item to each output port.
270 IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
271 }
272
273 int count_value() { return *my_count; }
274};
275
276// --------- body to sort items in sequencer_node
// --------- body to sort items in sequencer_node
// Maps an item to its zero-based sequence index: item value k yields k - 1.
// Items must be non-zero (asserted).
template<class BufferItemType>
struct sequencer_body {
    // const-qualified: the functor has no state, so it is callable through a const
    // object as well.
    size_t operator()(const BufferItemType &s) const {
        ASSERT(s, "sequencer item out of range (== 0)");
        return size_t(s) - 1;
    }
};
284
285// --------- body to compare the "priorities" of objects for priority_queue_node five priority levels 0-4.
// --------- body to compare the "priorities" of objects for priority_queue_node five priority levels 0-4.
// Orders items by (int(value) % 5).
template<class T>
struct myLess {
    // const-qualified: the comparator has no state, so it is callable through a
    // const object as well.
    bool operator()(const T &t1, const T &t2) const {
        return (int(t1) % 5) < (int(t2) % 5);
    }
};
292
293// --------- type for < comparison in priority_queue_node.
// --------- type for < comparison in priority_queue_node.
// Orders items by (int(value) % 3).
template<class ItemType>
struct less_body {
    // const-qualified: the comparator has no state, so it is callable through a
    // const object as well.
    bool operator()(const ItemType &lhs, const ItemType &rhs) const {
        return (int(lhs) % 3) < (int(rhs) % 3);
    }
};
300
301// --------- tag methods for tag_matching join_node
302template<typename TT>
303class tag_func {
304 TT my_mult;
305public:
306 tag_func(TT multiplier) : my_mult(multiplier) { }
307 void operator=( const tag_func& other){my_mult = other.my_mult;}
308 // operator() will return [0 .. Count)
309 tbb::flow::tag_value operator()( TT v) {
310 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
311 return t;
312 }
313};
314
315// --------- Source body for split_node test.
316template <class OutputTuple, TestNodeTypeEnum TType>
317class tuple_test_source_body : WaitThrow<serial_type, TType> {
318 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
319 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
320 using WaitThrow<serial_type, TType>::WaitAndThrow;
321 tbb::atomic<int> *my_current_val;
322public:
323 tuple_test_source_body(tbb::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
324
325 bool operator()(OutputTuple & out) {
326 UPDATE_COUNTS();
327 int ival = ++(*my_current_val);
328 out = OutputTuple(ItemType0(ival),ItemType1(ival));
329 if(*my_current_val > g_NumItems) {
330 *my_current_val = g_NumItems; // jam the final value; we assert on it later.
331 return false;
332 }
333 WaitAndThrow(ival,"tuple_test_source_body");
334 return true;
335 }
336
337 int count_value() { return (int)*my_current_val; }
338};
339
340// ------- end of node bodies
341
342// source_node is only-serial. source_node can throw, or the function_node can throw.
343// graph being tested is
344//
345// source_node+---+parallel function_node
346//
347// After each run the graph is reset(), to test the reset functionality.
348//
349
350
351template<class ItemType, TestNodeTypeEnum srcThrowType, TestNodeTypeEnum absorbThrowType>
352void run_one_source_node_test(bool throwException, bool flog) {
353 typedef test_source_body<ItemType,srcThrowType> src_body_type;
354 typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
355 tbb::atomic<int> source_body_count;
356 tbb::atomic<int> absorber_body_count;
357 source_body_count = 0;
358 absorber_body_count = 0;
359
360 tbb::flow::graph g;
361
362 g_Master = Harness::CurrentTid();
363
364#if USE_TASK_SCHEDULER_OBSERVER
365 eh_test_observer o;
366 o.observe(true);
367#endif
368
369 tbb::flow::source_node<ItemType> sn(g, src_body_type(source_body_count),/*is_active*/false);
370 parallel_absorb_body_type ab2(absorber_body_count);
371 tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
372 make_edge(sn, parallel_fn);
373 for(int runcnt = 0; runcnt < 2; ++runcnt) {
374 ResetGlobals(throwException,flog);
375 if(throwException) {
376 TRY();
377 sn.activate();
378 g.wait_for_all();
379 CATCH_AND_ASSERT();
380 }
381 else {
382 TRY();
383 sn.activate();
384 g.wait_for_all();
385 CATCH_AND_FAIL();
386 }
387
388 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
389 int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
390 int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
391 if(throwException) {
392 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception flag in flow::graph not set");
393 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "canceled flag not set");
394 ASSERT(src_cnt <= g_NumItems, "Too many source_node items emitted");
395 ASSERT(sink_cnt <= src_cnt, "Too many source_node items received");
396 }
397 else {
398 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
399 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
400 ASSERT(src_cnt == g_NumItems, "Incorrect # source_node items emitted");
401 ASSERT(sink_cnt == src_cnt, "Incorrect # source_node items received");
402 }
403 g.reset(); // resets the body of the source_node and the absorb_nodes.
404 source_body_count = 0;
405 absorber_body_count = 0;
406 ASSERT(!g.exception_thrown(), "Reset didn't clear exception_thrown()");
407 ASSERT(!g.is_cancelled(), "Reset didn't clear is_cancelled()");
408 src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
409 sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
410 ASSERT(src_cnt == 0, "source_node count not reset");
411 ASSERT(sink_cnt == 0, "sink_node count not reset");
412 }
413#if USE_TASK_SCHEDULER_OBSERVER
414 o.observe(false);
415#endif
416} // run_one_source_node_test
417
418
419template<class ItemType, TestNodeTypeEnum srcThrowType, TestNodeTypeEnum absorbThrowType>
420void run_source_node_test() {
421 run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(false,false);
422 run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(true,false);
423 run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(true,true);
424} // run_source_node_test
425
426void test_source_node() {
427 REMARK("Testing source_node\n");
428 check_type<int>::check_type_counter = 0;
429 g_Wakeup_Msg = "source_node(1): Missed wakeup or machine is overloaded?";
430 run_source_node_test<check_type<int>, isThrowing, nonThrowing>();
431 ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
432 g_Wakeup_Msg = "source_node(2): Missed wakeup or machine is overloaded?";
433 run_source_node_test<int, isThrowing, nonThrowing>();
434 g_Wakeup_Msg = "source_node(3): Missed wakeup or machine is overloaded?";
435 run_source_node_test<int, nonThrowing, isThrowing>();
436 g_Wakeup_Msg = "source_node(4): Missed wakeup or machine is overloaded?";
437 run_source_node_test<int, isThrowing, isThrowing>();
438 g_Wakeup_Msg = "source_node(5): Missed wakeup or machine is overloaded?";
439 run_source_node_test<check_type<int>, isThrowing, isThrowing>();
440 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
441 ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
442}
443
444// -------- utilities & types to test function_node and multifunction_node.
445
446// need to tell the template which node type I am using so it attaches successors correctly.
// Tells AttachPoint which kind of node is under test, so successors are attached to
// the right sender.
enum NodeFetchType {
    func_node_type      = 0,
    multifunc_node_type = 1
};
448
449template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
450struct AttachPoint;
451
452template<class NodeType, class ItemType, int indx>
453struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
454 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
455 return tbb::flow::output_port<indx>(n);
456 }
457};
458
459template<class NodeType, class ItemType, int indx>
460struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
461 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
462 return n;
463 }
464};
465
466
467// common template for running function_node, multifunction_node. continue_node
468// has different firing requirements, so it needs a different graph topology.
469template<
470 class SourceNodeType,
471 class SourceNodeBodyType0,
472 class SourceNodeBodyType1,
473 NodeFetchType NFT,
474 class TestNodeType,
475 class TestNodeBodyType,
476 class TypeToSink0, // what kind of item are we sending to sink0
477 class TypeToSink1, // what kind of item are we sending to sink1
478 class SinkNodeType0, // will be same for function;
479 class SinkNodeType1, // may differ for multifunction_node
480 class SinkNodeBodyType0,
481 class SinkNodeBodyType1,
482 size_t Conc
483 >
484void
485run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
486
487 char mymsg[132];
488 char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
489 tbb::flow::graph g;
490
491 tbb::atomic<int> source0_count;
492 tbb::atomic<int> source1_count;
493 tbb::atomic<int> sink0_count;
494 tbb::atomic<int> sink1_count;
495 tbb::atomic<int> test_count;
496 source0_count = source1_count = sink0_count = sink1_count = test_count = 0;
497
498#if USE_TASK_SCHEDULER_OBSERVER
499 eh_test_observer o;
500 o.observe(true);
501#endif
502
503 g_Master = Harness::CurrentTid();
504 SourceNodeType source0(g, SourceNodeBodyType0(source0_count),/*is_active*/false);
505 SourceNodeType source1(g, SourceNodeBodyType1(source1_count),/*is_active*/false);
506 TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
507 SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
508 SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
509 make_edge(source0, node_to_test);
510 make_edge(source1, node_to_test);
511 make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
512 make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
513
514 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again
515 sprintf(mymsg, "%s iter=%d, threads=%d, throw=%s, flog=%s", saved_msg, iter, g_NumThreads,
516 throwException?"T":"F", flog?"T":"F");
517 g_Wakeup_Msg = mymsg;
518 ResetGlobals(throwException,flog);
519 if(throwException) {
520 TRY();
521 source0.activate();
522 source1.activate();
523 g.wait_for_all();
524 CATCH_AND_ASSERT();
525 }
526 else {
527 TRY();
528 source0.activate();
529 source1.activate();
530 g.wait_for_all();
531 CATCH_AND_FAIL();
532 }
533 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
534 int sb0_cnt = tbb::flow::copy_body<SourceNodeBodyType0>(source0).count_value();
535 int sb1_cnt = tbb::flow::copy_body<SourceNodeBodyType1>(source1).count_value();
536 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
537 int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
538 int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
539 if(throwException) {
540 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
541 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
542 ASSERT(sb0_cnt + sb1_cnt <= 2*g_NumItems, "Too many items sent by sources");
543 ASSERT(sb0_cnt + sb1_cnt >= t_cnt, "Too many items received by test node");
544 ASSERT(nb0_cnt + nb1_cnt <= t_cnt*2, "Too many items received by sink nodes");
545 }
546 else {
547 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
548 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
549 ASSERT(sb0_cnt + sb1_cnt == 2*g_NumItems, "Missing invocations of source_nodes");
550 ASSERT(t_cnt == 2*g_NumItems, "Not all items reached test node");
551 ASSERT(nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems, "Missing items in absorbers");
552 }
553 g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes.
554 source0_count = source1_count = sink0_count = sink1_count = test_count = 0;
555 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType0>(source0).count_value(),"Reset source 0 failed");
556 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType1>(source1).count_value(),"Reset source 1 failed");
557 ASSERT(0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(),"Reset test_node failed");
558 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value(),"Reset sink 0 failed");
559 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value(),"Reset sink 1 failed");
560
561 g_Wakeup_Msg = saved_msg;
562 }
563#if USE_TASK_SCHEDULER_OBSERVER
564 o.observe(false);
565#endif
566}
567
568// Test function_node
569//
570// graph being tested is
571//
572// source_node -\ /- parallel function_node
573// \ /
574// +function_node+
575// / \ x
576// source_node -/ \- parallel function_node
577//
578// After each run the graph is reset(), to test the reset functionality.
579//
580template<
581 TestNodeTypeEnum SType1, // does source node 1 throw?
582 TestNodeTypeEnum SType2, // does source node 2 throw?
583 class Item12, // type of item passed between sources and test node
584 TestNodeTypeEnum FType, // does function node throw?
585 class Item23, // type passed from function_node to sink nodes
586 TestNodeTypeEnum NType1, // does sink node 1 throw?
587 TestNodeTypeEnum NType2, // does sink node 1 throw?
588 class NodePolicy, // rejecting,queueing
589 size_t Conc // is node concurrent? {serial | limited | unlimited}
590>
591void run_function_node_test() {
592
593 typedef test_source_body<Item12,SType1> SBodyType1;
594 typedef test_source_body<Item12,SType2> SBodyType2;
595 typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
596 typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
597 typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
598
599 typedef tbb::flow::source_node<Item12> SrcType;
600 typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
601 typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
602
603 for(int i = 0; i < 4; ++i ) {
604 if(i != 2) { // doesn't make sense to flog a non-throwing test
605 bool doThrow = (i & 0x1) != 0;
606 bool doFlog = (i & 0x2) != 0;
607 run_one_functype_node_test<
608 /*SourceNodeType*/ SrcType,
609 /*SourceNodeBodyType0*/ SBodyType1,
610 /*SourceNodeBodyType1*/ SBodyType2,
611 /* NFT */ func_node_type,
612 /*TestNodeType*/ TestType,
613 /*TestNodeBodyType*/ TestBodyType,
614 /*TypeToSink0 */ Item23,
615 /*TypeToSink1 */ Item23,
616 /*SinkNodeType0*/ SnkType,
617 /*SinkNodeType1*/ SnkType,
618 /*SinkNodeBodyType1*/ SinkBodyType1,
619 /*SinkNodeBodyType2*/ SinkBodyType2,
620 /*Conc*/ Conc>
621 (doThrow,doFlog,"function_node");
622 }
623 }
624} // run_function_node_test
625
626void test_function_node() {
627 REMARK("Testing function_node\n");
628 // serial rejecting
629 g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
630 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
631 g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
632 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
633 g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
634 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
635
636 // serial queueing
637 g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
638 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
639 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
640 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
641 check_type<int>::check_type_counter = 0;
642 run_function_node_test<nonThrowing, nonThrowing, check_type<int>, nonThrowing, check_type<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
643 ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
644
645 // unlimited parallel rejecting
646 g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
647 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
648 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
649 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
650
651 // limited parallel rejecting
652 g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
653 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
654 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
655 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
656
657 // limited parallel queueing
658 g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
659 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
660 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
661 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
662
663 // everyone throwing
664 g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
665 run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
666 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
667}
668
669// ----------------------------------- multifunction_node ----------------------------------
670// Test multifunction_node.
671//
672// graph being tested is
673//
674// source_node -\ /- parallel function_node
675// \ /
676// +multifunction_node+
677// / \ x
678// source_node -/ \- parallel function_node
679//
680// After each run the graph is reset(), to test the reset functionality. The
681// multifunction_node will put an item to each successor for every item
682// received.
683//
684template<
685 TestNodeTypeEnum SType0, // does source node 1 throw?
686 TestNodeTypeEnum SType1, // does source node 2 thorw?
687 class Item12, // type of item passed between sources and test node
688 TestNodeTypeEnum FType, // does multifunction node throw?
689 class ItemTuple, // tuple of types passed from multifunction_node to sink nodes
690 TestNodeTypeEnum NType1, // does sink node 1 throw?
691 TestNodeTypeEnum NType2, // does sink node 2 throw?
692 class NodePolicy, // rejecting,queueing
693 size_t Conc // is node concurrent? {serial | limited | unlimited}
694>
695void run_multifunction_node_test() {
696
697 typedef typename tbb::flow::tuple_element<0,ItemTuple>::type Item23Type0;
698 typedef typename tbb::flow::tuple_element<1,ItemTuple>::type Item23Type1;
699 typedef test_source_body<Item12,SType0> SBodyType1;
700 typedef test_source_body<Item12,SType1> SBodyType2;
701 typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType;
702 typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
703 typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
704
705 typedef tbb::flow::source_node<Item12> SrcType;
706 typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType;
707 typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0;
708 typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1;
709
710 for(int i = 0; i < 4; ++i ) {
711 if(i != 2) { // doesn't make sense to flog a non-throwing test
712 bool doThrow = (i & 0x1) != 0;
713 bool doFlog = (i & 0x2) != 0;
714 run_one_functype_node_test<
715 /*SourceNodeType*/ SrcType,
716 /*SourceNodeBodyType0*/ SBodyType1,
717 /*SourceNodeBodyType1*/ SBodyType2,
718 /*NFT*/ multifunc_node_type,
719 /*TestNodeType*/ TestType,
720 /*TestNodeBodyType*/ TestBodyType,
721 /*TypeToSink0*/ Item23Type0,
722 /*TypeToSink1*/ Item23Type1,
723 /*SinkNodeType0*/ SnkType0,
724 /*SinkNodeType1*/ SnkType1,
725 /*SinkNodeBodyType0*/ SinkBodyType1,
726 /*SinkNodeBodyType1*/ SinkBodyType2,
727 /*Conc*/ Conc>
728 (doThrow,doFlog,"multifunction_node");
729 }
730 }
731} // run_multifunction_node_test
732
733void test_multifunction_node() {
734 REMARK("Testing multifunction_node\n");
735 g_Wakeup_Msg = "multifunction_node(source throws,rejecting,serial): Missed wakeup or machine is overloaded?";
736 // serial rejecting
737 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
738 g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
739 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
740 g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
741 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
742
743 g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
744 // serial queueing
745 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
746 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
747 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
748 check_type<int>::check_type_counter = 0;
749 run_multifunction_node_test<nonThrowing, nonThrowing, check_type<int>, nonThrowing, tbb::flow::tuple<check_type<int>, check_type<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
750 ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
751
752 g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
753 // unlimited parallel rejecting
754 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
755 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
756 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
757
758 g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
759 // limited parallel rejecting
760 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
761 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
762 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
763
764 g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
765 // limited parallel queueing
766 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
767 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
768 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
769
770 g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
771 // everyone throwing
772 run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, tbb::flow::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
773 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
774}
775
776//
// A continue_node has T predecessors. When it has received a message (continue_msg) from each
// of its T predecessors, it executes the body of the node and forwards a message to its successors.
// However many predecessors the continue_node has, that is how many continue_msgs it must receive
// on input before forwarding a message.
781//
782// The graph will look like
783//
784// +broadcast_node+
785// / \ ___
786// source_node+------>+broadcast_node+ +continue_node+--->+absorber
787// \ /
788// +broadcast_node+
789//
790// The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
791// The absorber is parallel, so each item emitted by the source will result in one thread
792// spinning. So for N threads we pass N-1 continue_messages, then spin wait and then throw if
793// we are allowed to.
794
795template < class SourceNodeType, class SourceNodeBodyType, class TTestNodeType, class TestNodeBodyType,
796 class SinkNodeType, class SinkNodeBodyType>
797void run_one_continue_node_test (bool throwException, bool flog) {
798 tbb::flow::graph g;
799
800 tbb::atomic<int> source_count;
801 tbb::atomic<int> test_count;
802 tbb::atomic<int> sink_count;
803 source_count = test_count = sink_count = 0;
804#if USE_TASK_SCHEDULER_OBSERVER
805 eh_test_observer o;
806 o.observe(true);
807#endif
808 g_Master = Harness::CurrentTid();
809 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
810 TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
811 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
812 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
813 make_edge(source, b1);
814 make_edge(b1,b2);
815 make_edge(b1,b3);
816 make_edge(b2,node_to_test);
817 make_edge(b3,node_to_test);
818 make_edge(node_to_test, sink);
819 for(int iter = 0; iter < 2; ++iter) {
820 ResetGlobals(throwException,flog);
821 if(throwException) {
822 TRY();
823 source.activate();
824 g.wait_for_all();
825 CATCH_AND_ASSERT();
826 }
827 else {
828 TRY();
829 source.activate();
830 g.wait_for_all();
831 CATCH_AND_FAIL();
832 }
833 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
834 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
835 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
836 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
837 if(throwException) {
838 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
839 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
840 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
841 ASSERT(sb_cnt >= t_cnt, "Too many items received by test node");
842 ASSERT(nb_cnt <= t_cnt, "Too many items received by sink nodes");
843 }
844 else {
845 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
846 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
847 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
848 ASSERT(t_cnt == g_NumItems, "Not all items reached test node");
849 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
850 }
851 g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes.
852 source_count = test_count = sink_count = 0;
853 ASSERT(0 == (int)test_count, "Atomic wasn't reset properly");
854 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
855 ASSERT(0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(),"Reset test_node failed");
856 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
857 }
858#if USE_TASK_SCHEDULER_OBSERVER
859 o.observe(false);
860#endif
861}
862
863template<
864 class ItemType,
865 TestNodeTypeEnum SType, // does source node throw?
866 TestNodeTypeEnum CType, // does continue_node throw?
867 TestNodeTypeEnum AType> // does absorber throw
868void run_continue_node_test() {
869 typedef test_source_body<tbb::flow::continue_msg,SType> SBodyType;
870 typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType;
871 typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType;
872
873 typedef tbb::flow::source_node<tbb::flow::continue_msg> SrcType;
874 typedef tbb::flow::continue_node<ItemType> TestType;
875 typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType;
876
877 for(int i = 0; i < 4; ++i ) {
878 if(i == 2) continue; // don't run (false,true); it doesn't make sense.
879 bool doThrow = (i & 0x1) != 0;
880 bool doFlog = (i & 0x2) != 0;
881 run_one_continue_node_test<
882 /*SourceNodeType*/ SrcType,
883 /*SourceNodeBodyType*/ SBodyType,
884 /*TestNodeType*/ TestType,
885 /*TestNodeBodyType*/ ContBodyType,
886 /*SinkNodeType*/ SnkType,
887 /*SinkNodeBodyType*/ SinkBodyType>
888 (doThrow,doFlog);
889 }
890}
891
892//
893void test_continue_node() {
894 REMARK("Testing continue_node\n");
895 g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?";
896 run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
897 g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?";
898 run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
899 g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?";
900 run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
901 g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?";
902 run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
903 check_type<double>::check_type_counter = 0;
904 run_continue_node_test<check_type<double>,isThrowing,isThrowing,isThrowing>();
905 ASSERT(!check_type<double>::check_type_counter, "Dropped objects in continue_node test");
906 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
907}
908
909// ---------- buffer_node queue_node overwrite_node --------------
910
911template<
912 class BufferItemType, //
913 class SourceNodeType,
914 class SourceNodeBodyType,
915 class TestNodeType,
916 class SinkNodeType,
917 class SinkNodeBodyType >
918void run_one_buffer_node_test(bool throwException,bool flog) {
919 tbb::flow::graph g;
920
921 tbb::atomic<int> source_count;
922 tbb::atomic<int> sink_count;
923 source_count = sink_count = 0;
924#if USE_TASK_SCHEDULER_OBSERVER
925 eh_test_observer o;
926 o.observe(true);
927#endif
928 g_Master = Harness::CurrentTid();
929 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
930 TestNodeType node_to_test(g);
931 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
932 make_edge(source,node_to_test);
933 make_edge(node_to_test, sink);
934 for(int iter = 0; iter < 2; ++iter) {
935 ResetGlobals(throwException,flog);
936 if(throwException) {
937 TRY();
938 source.activate();
939 g.wait_for_all();
940 CATCH_AND_ASSERT();
941 }
942 else {
943 TRY();
944 source.activate();
945 g.wait_for_all();
946 CATCH_AND_FAIL();
947 }
948 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
949 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
950 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
951 if(throwException) {
952 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
953 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
954 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
955 ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
956 }
957 else {
958 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
959 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
960 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
961 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
962 }
963 if(iter == 0) {
964 remove_edge(node_to_test, sink);
965 node_to_test.try_put(BufferItemType());
966 g.wait_for_all();
967 g.reset();
968 source_count = sink_count = 0;
969 BufferItemType tmp;
970 ASSERT(!node_to_test.try_get(tmp), "node not empty");
971 make_edge(node_to_test, sink);
972 g.wait_for_all();
973 }
974 else {
975 g.reset();
976 source_count = sink_count = 0;
977 }
978 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
979 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
980 }
981
982#if USE_TASK_SCHEDULER_OBSERVER
983 o.observe(false);
984#endif
985}
986template<class BufferItemType,
987 TestNodeTypeEnum SourceThrowType,
988 TestNodeTypeEnum SinkThrowType>
989void run_buffer_queue_and_overwrite_node_test() {
990 typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
991 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
992
993 typedef tbb::flow::source_node<BufferItemType> SrcType;
994 typedef tbb::flow::buffer_node<BufferItemType> BufType;
995 typedef tbb::flow::queue_node<BufferItemType> QueType;
996 typedef tbb::flow::overwrite_node<BufferItemType> OvrType;
997 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
998
999 for(int i = 0; i < 4; ++i) {
1000 if(i == 2) continue; // no need to test flog w/o throws
1001 bool throwException = (i & 0x1) != 0;
1002 bool doFlog = (i & 0x2) != 0;
1003#if TBB_RUN_BUFFERING_TEST
1004 run_one_buffer_node_test<
1005 /* class BufferItemType*/ BufferItemType,
1006 /*class SourceNodeType*/ SrcType,
1007 /*class SourceNodeBodyType*/ SourceBodyType,
1008 /*class TestNodeType*/ BufType,
1009 /*class SinkNodeType*/ SnkType,
1010 /*class SinkNodeBodyType*/ SinkBodyType
1011 >(throwException, doFlog);
1012 run_one_buffer_node_test<
1013 /* class BufferItemType*/ BufferItemType,
1014 /*class SourceNodeType*/ SrcType,
1015 /*class SourceNodeBodyType*/ SourceBodyType,
1016 /*class TestNodeType*/ QueType,
1017 /*class SinkNodeType*/ SnkType,
1018 /*class SinkNodeBodyType*/ SinkBodyType
1019 >(throwException, doFlog);
1020#endif
1021 run_one_buffer_node_test<
1022 /* class BufferItemType*/ BufferItemType,
1023 /*class SourceNodeType*/ SrcType,
1024 /*class SourceNodeBodyType*/ SourceBodyType,
1025 /*class TestNodeType*/ OvrType,
1026 /*class SinkNodeType*/ SnkType,
1027 /*class SinkNodeBodyType*/ SinkBodyType
1028 >(throwException, doFlog);
1029 }
1030}
1031
1032void test_buffer_queue_and_overwrite_node() {
1033 REMARK("Testing buffer_node, queue_node and overwrite_node\n");
1034#if TBB_RUN_BUFFERING_TEST
1035#else
1036 REMARK("skip buffer and queue test (known issue)\n");
1037#endif
1038 g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
1039 run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
1040 g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
1041 run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
1042 g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
1043 run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
1044 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1045}
1046
1047// ---------- sequencer_node -------------------------
1048
1049
1050template<
1051 class BufferItemType, //
1052 class SourceNodeType,
1053 class SourceNodeBodyType,
1054 class TestNodeType,
1055 class SeqBodyType,
1056 class SinkNodeType,
1057 class SinkNodeBodyType >
1058void run_one_sequencer_node_test(bool throwException,bool flog) {
1059 tbb::flow::graph g;
1060
1061 tbb::atomic<int> source_count;
1062 tbb::atomic<int> sink_count;
1063 source_count = sink_count = 0;
1064#if USE_TASK_SCHEDULER_OBSERVER
1065 eh_test_observer o;
1066 o.observe(true);
1067#endif
1068 g_Master = Harness::CurrentTid();
1069 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
1070 TestNodeType node_to_test(g,SeqBodyType());
1071 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1072 make_edge(source,node_to_test);
1073 make_edge(node_to_test, sink);
1074 for(int iter = 0; iter < 2; ++iter) {
1075 ResetGlobals(throwException,flog);
1076 if(throwException) {
1077 TRY();
1078 source.activate();
1079 g.wait_for_all();
1080 CATCH_AND_ASSERT();
1081 }
1082 else {
1083 TRY();
1084 source.activate();
1085 g.wait_for_all();
1086 CATCH_AND_FAIL();
1087 }
1088 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1089 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1090 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1091 if(throwException) {
1092 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1093 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1094 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1095 ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
1096 }
1097 else {
1098 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1099 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1100 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
1101 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
1102 }
1103 if(iter == 0) {
1104 remove_edge(node_to_test, sink);
1105 node_to_test.try_put(BufferItemType(g_NumItems + 1));
1106 node_to_test.try_put(BufferItemType(1));
1107 g.wait_for_all();
1108 g.reset();
1109 source_count = sink_count = 0;
1110 make_edge(node_to_test, sink);
1111 g.wait_for_all();
1112 }
1113 else {
1114 g.reset();
1115 source_count = sink_count = 0;
1116 }
1117 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1118 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1119 }
1120
1121#if USE_TASK_SCHEDULER_OBSERVER
1122 o.observe(false);
1123#endif
1124}
1125
1126template<class BufferItemType,
1127 TestNodeTypeEnum SourceThrowType,
1128 TestNodeTypeEnum SinkThrowType>
1129void run_sequencer_node_test() {
1130 typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1131 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1132 typedef sequencer_body<BufferItemType> SeqBodyType;
1133
1134 typedef tbb::flow::source_node<BufferItemType> SrcType;
1135 typedef tbb::flow::sequencer_node<BufferItemType> SeqType;
1136 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1137
1138 for(int i = 0; i < 4; ++i) {
1139 if(i == 2) continue; // no need to test flog w/o throws
1140 bool throwException = (i & 0x1) != 0;
1141 bool doFlog = (i & 0x2) != 0;
1142 run_one_sequencer_node_test<
1143 /* class BufferItemType*/ BufferItemType,
1144 /*class SourceNodeType*/ SrcType,
1145 /*class SourceNodeBodyType*/ SourceBodyType,
1146 /*class TestNodeType*/ SeqType,
1147 /*class SeqBodyType*/ SeqBodyType,
1148 /*class SinkNodeType*/ SnkType,
1149 /*class SinkNodeBodyType*/ SinkBodyType
1150 >(throwException, doFlog);
1151 }
1152}
1153
1154
1155
1156void test_sequencer_node() {
1157 REMARK("Testing sequencer_node\n");
1158 g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
1159 run_sequencer_node_test<int, isThrowing,nonThrowing>();
1160 check_type<int>::check_type_counter = 0;
1161 g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
1162 run_sequencer_node_test<check_type<int>, nonThrowing,isThrowing>();
1163 ASSERT(!check_type<int>::check_type_counter, "Dropped objects in sequencer_node test");
1164 g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
1165 run_sequencer_node_test<int, isThrowing,isThrowing>();
1166 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1167}
1168
1169// ------------ priority_queue_node ------------------
1170
1171template<
1172 class BufferItemType,
1173 class SourceNodeType,
1174 class SourceNodeBodyType,
1175 class TestNodeType,
1176 class SinkNodeType,
1177 class SinkNodeBodyType >
1178void run_one_priority_queue_node_test(bool throwException,bool flog) {
1179 tbb::flow::graph g;
1180
1181 tbb::atomic<int> source_count;
1182 tbb::atomic<int> sink_count;
1183 source_count = sink_count = 0;
1184#if USE_TASK_SCHEDULER_OBSERVER
1185 eh_test_observer o;
1186 o.observe(true);
1187#endif
1188 g_Master = Harness::CurrentTid();
1189 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
1190
1191 TestNodeType node_to_test(g);
1192
1193 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1194
1195 make_edge(source,node_to_test);
1196 make_edge(node_to_test, sink);
1197 for(int iter = 0; iter < 2; ++iter) {
1198 ResetGlobals(throwException,flog);
1199 if(throwException) {
1200 TRY();
1201 source.activate();
1202 g.wait_for_all();
1203 CATCH_AND_ASSERT();
1204 }
1205 else {
1206 TRY();
1207 source.activate();
1208 g.wait_for_all();
1209 CATCH_AND_FAIL();
1210 }
1211 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1212 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1213 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1214 if(throwException) {
1215 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1216 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1217 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1218 ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
1219 }
1220 else {
1221 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1222 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1223 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
1224 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
1225 }
1226 if(iter == 0) {
1227 remove_edge(node_to_test, sink);
1228 node_to_test.try_put(BufferItemType(g_NumItems + 1));
1229 node_to_test.try_put(BufferItemType(g_NumItems + 2));
1230 node_to_test.try_put(BufferItemType());
1231 g.wait_for_all();
1232 g.reset();
1233 source_count = sink_count = 0;
1234 make_edge(node_to_test, sink);
1235 g.wait_for_all();
1236 }
1237 else {
1238 g.reset();
1239 source_count = sink_count = 0;
1240 }
1241 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1242 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1243 }
1244
1245#if USE_TASK_SCHEDULER_OBSERVER
1246 o.observe(false);
1247#endif
1248}
1249
1250template<class BufferItemType,
1251 TestNodeTypeEnum SourceThrowType,
1252 TestNodeTypeEnum SinkThrowType>
1253void run_priority_queue_node_test() {
1254 typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1255 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1256 typedef less_body<BufferItemType> LessBodyType;
1257
1258 typedef tbb::flow::source_node<BufferItemType> SrcType;
1259 typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType> PrqType;
1260 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1261
1262 for(int i = 0; i < 4; ++i) {
1263 if(i == 2) continue; // no need to test flog w/o throws
1264 bool throwException = (i & 0x1) != 0;
1265 bool doFlog = (i & 0x2) != 0;
1266 run_one_priority_queue_node_test<
1267 /* class BufferItemType*/ BufferItemType,
1268 /*class SourceNodeType*/ SrcType,
1269 /*class SourceNodeBodyType*/ SourceBodyType,
1270 /*class TestNodeType*/ PrqType,
1271 /*class SinkNodeType*/ SnkType,
1272 /*class SinkNodeBodyType*/ SinkBodyType
1273 >(throwException, doFlog);
1274 }
1275}
1276
1277void test_priority_queue_node() {
1278 REMARK("Testing priority_queue_node\n");
1279 g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
1280 run_priority_queue_node_test<int, isThrowing,nonThrowing>();
1281 check_type<int>::check_type_counter = 0;
1282 g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
1283 run_priority_queue_node_test<check_type<int>, nonThrowing,isThrowing>();
1284 ASSERT(!check_type<int>::check_type_counter, "Dropped objects in priority_queue_node test");
1285 g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
1286 run_priority_queue_node_test<int, isThrowing,isThrowing>();
1287 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1288}
1289
1290// ------------------- join_node ----------------
1291template<class JP> struct graph_policy_name{
1292 static const char* name() {return "unknown"; }
1293};
1294template<> struct graph_policy_name<tbb::flow::queueing> {
1295 static const char* name() {return "queueing"; }
1296};
1297template<> struct graph_policy_name<tbb::flow::reserving> {
1298 static const char* name() {return "reserving"; }
1299};
1300template<> struct graph_policy_name<tbb::flow::tag_matching> {
1301 static const char* name() {return "tag_matching"; }
1302};
1303
1304
1305template<
1306 class JP,
1307 class OutputTuple,
1308 class SourceType0,
1309 class SourceBodyType0,
1310 class SourceType1,
1311 class SourceBodyType1,
1312 class TestJoinType,
1313 class SinkType,
1314 class SinkBodyType
1315 >
1316struct run_one_join_node_test {
1317 run_one_join_node_test() {}
1318 static void execute_test(bool throwException,bool flog) {
1319 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1320 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
1321
1322 tbb::flow::graph g;
1323 tbb::atomic<int>source0_count;
1324 tbb::atomic<int>source1_count;
1325 tbb::atomic<int>sink_count;
1326 source0_count = source1_count = sink_count = 0;
1327#if USE_TASK_SCHEDULER_OBSERVER
1328 eh_test_observer o;
1329 o.observe(true);
1330#endif
1331 g_Master = Harness::CurrentTid();
1332 SourceType0 source0(g, SourceBodyType0(source0_count),/*is_active*/false);
1333 SourceType1 source1(g, SourceBodyType1(source1_count),/*is_active*/false);
1334 TestJoinType node_to_test(g);
1335 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1336 make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1337 make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1338 make_edge(node_to_test, sink);
1339 for(int iter = 0; iter < 2; ++iter) {
1340 ResetGlobals(throwException,flog);
1341 if(throwException) {
1342 TRY();
1343 source0.activate();
1344 source1.activate();
1345 g.wait_for_all();
1346 CATCH_AND_ASSERT();
1347 }
1348 else {
1349 TRY();
1350 source0.activate();
1351 source1.activate();
1352 g.wait_for_all();
1353 CATCH_AND_FAIL();
1354 }
1355 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1356 int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1357 int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1358 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1359 if(throwException) {
1360 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1361 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1362 ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1363 ASSERT(nb_cnt <= ((sb0_cnt < sb1_cnt) ? sb0_cnt : sb1_cnt), "Too many items received by sink nodes");
1364 }
1365 else {
1366 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1367 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1368 if(sb0_cnt != g_NumItems) {
1369 REMARK("throwException == %s\n", throwException ? "true" : "false");
1370 REMARK("iter == %d\n", (int)iter);
1371 REMARK("sb0_cnt == %d\n", (int)sb0_cnt);
1372 REMARK("g_NumItems == %d\n", (int)g_NumItems);
1373 }
1374 ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0"); // this one
1375 ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1376 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
1377 }
1378 if(iter == 0) {
1379 remove_edge(node_to_test, sink);
1380 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
1381 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1382 g.wait_for_all();
1383 g.reset();
1384 source0_count = source1_count = sink_count = 0;
1385 make_edge(node_to_test, sink);
1386 g.wait_for_all();
1387 }
1388 else {
1389 g.wait_for_all();
1390 g.reset();
1391 source0_count = source1_count = sink_count = 0;
1392 }
1393 ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1394 ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
1395 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1396 ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1397 }
1398
1399#if USE_TASK_SCHEDULER_OBSERVER
1400 o.observe(false);
1401#endif
1402 }
1403}; // run_one_join_node_test
1404
1405template<
1406 class OutputTuple,
1407 class SourceType0,
1408 class SourceBodyType0,
1409 class SourceType1,
1410 class SourceBodyType1,
1411 class TestJoinType,
1412 class SinkType,
1413 class SinkBodyType
1414 >
1415struct run_one_join_node_test<
1416 tbb::flow::tag_matching,
1417 OutputTuple,
1418 SourceType0,
1419 SourceBodyType0,
1420 SourceType1,
1421 SourceBodyType1,
1422 TestJoinType,
1423 SinkType,
1424 SinkBodyType
1425 > {
1426 run_one_join_node_test() {}
1427 static void execute_test(bool throwException,bool flog) {
1428 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1429 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
1430
1431 tbb::flow::graph g;
1432
1433 tbb::atomic<int>source0_count;
1434 tbb::atomic<int>source1_count;
1435 tbb::atomic<int>sink_count;
1436 source0_count = source1_count = sink_count = 0;
1437#if USE_TASK_SCHEDULER_OBSERVER
1438 eh_test_observer o;
1439 o.observe(true);
1440#endif
1441 g_Master = Harness::CurrentTid();
1442 SourceType0 source0(g, SourceBodyType0(source0_count, 2),/*is_active*/false);
1443 SourceType1 source1(g, SourceBodyType1(source1_count, 3),/*is_active*/false);
1444 TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
1445 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1446 make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1447 make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1448 make_edge(node_to_test, sink);
1449 for(int iter = 0; iter < 2; ++iter) {
1450 ResetGlobals(throwException,flog);
1451 if(throwException) {
1452 TRY();
1453 source0.activate();
1454 source1.activate();
1455 g.wait_for_all();
1456 CATCH_AND_ASSERT();
1457 }
1458 else {
1459 TRY();
1460 source0.activate();
1461 source1.activate();
1462 g.wait_for_all();
1463 CATCH_AND_FAIL();
1464 }
1465 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1466 int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1467 int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1468 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1469 if(throwException) {
1470 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1471 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1472 ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1473 ASSERT(nb_cnt <= ((sb0_cnt < sb1_cnt) ? sb0_cnt : sb1_cnt), "Too many items received by sink nodes");
1474 }
1475 else {
1476 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1477 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1478 ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0");
1479 ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1480 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
1481 }
1482 if(iter == 0) {
1483 remove_edge(node_to_test, sink);
1484 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1485 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1486 g.wait_for_all(); // have to wait for the graph to stop again....
1487 g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes.
1488 source0_count = source1_count = sink_count = 0;
1489 make_edge(node_to_test, sink);
1490 g.wait_for_all(); // have to wait for the graph to stop again....
1491 }
1492 else {
1493 g.wait_for_all();
1494 g.reset();
1495 source0_count = source1_count = sink_count = 0;
1496 }
1497 ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1498 ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
1499 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1500 ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1501 }
1502
1503#if USE_TASK_SCHEDULER_OBSERVER
1504 o.observe(false);
1505#endif
1506 }
1507}; // run_one_join_node_test<tag_matching>
1508
1509template<class JP, class OutputTuple,
1510 TestNodeTypeEnum SourceThrowType,
1511 TestNodeTypeEnum SinkThrowType>
1512void run_join_node_test() {
1513 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1514 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
1515 typedef test_source_body<ItemType0,SourceThrowType> SourceBodyType0;
1516 typedef test_source_body<ItemType1,SourceThrowType> SourceBodyType1;
1517 typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1518
1519 typedef typename tbb::flow::source_node<ItemType0> SourceType0;
1520 typedef typename tbb::flow::source_node<ItemType1> SourceType1;
1521 typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
1522 typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
1523
1524 for(int i = 0; i < 4; ++i) {
1525 if(2 == i) continue;
1526 bool throwException = (i & 0x1) != 0;
1527 bool doFlog = (i & 0x2) != 0;
1528 run_one_join_node_test<
1529 JP,
1530 OutputTuple,
1531 SourceType0,
1532 SourceBodyType0,
1533 SourceType1,
1534 SourceBodyType1,
1535 TestJoinType,
1536 SinkType,
1537 SinkBodyType>::execute_test(throwException,doFlog);
1538 }
1539}
1540
1541template<class JP>
1542void test_join_node() {
1543 REMARK("Testing join_node<%s>\n", graph_policy_name<JP>::name());
1544 // only doing two-input joins
1545 g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
1546 run_join_node_test<JP, tbb::flow::tuple<int,int>, isThrowing, nonThrowing>();
1547 check_type<int>::check_type_counter = 0;
1548 g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
1549 run_join_node_test<JP, tbb::flow::tuple<check_type<int>,int>, nonThrowing, isThrowing>();
1550 ASSERT(!check_type<int>::check_type_counter, "Dropped items in test");
1551 g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
1552 run_join_node_test<JP, tbb::flow::tuple<int,int>, isThrowing, isThrowing>();
1553 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1554}
1555
1556// ------------------- limiter_node -------------
1557
1558template<
1559 class BufferItemType, //
1560 class SourceNodeType,
1561 class SourceNodeBodyType,
1562 class TestNodeType,
1563 class SinkNodeType,
1564 class SinkNodeBodyType >
1565void run_one_limiter_node_test(bool throwException,bool flog) {
1566 tbb::flow::graph g;
1567
1568 tbb::atomic<int> source_count;
1569 tbb::atomic<int> sink_count;
1570 source_count = sink_count = 0;
1571#if USE_TASK_SCHEDULER_OBSERVER
1572 eh_test_observer o;
1573 o.observe(true);
1574#endif
1575 g_Master = Harness::CurrentTid();
1576 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
1577 TestNodeType node_to_test(g,g_NumThreads + 1);
1578 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1579 make_edge(source,node_to_test);
1580 make_edge(node_to_test, sink);
1581 for(int iter = 0; iter < 2; ++iter) {
1582 ResetGlobals(throwException,flog);
1583 if(throwException) {
1584 TRY();
1585 source.activate();
1586 g.wait_for_all();
1587 CATCH_AND_ASSERT();
1588 }
1589 else {
1590 TRY();
1591 source.activate();
1592 g.wait_for_all();
1593 CATCH_AND_FAIL();
1594 }
1595 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1596 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1597 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1598 if(throwException) {
1599 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1600 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1601 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1602 ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
1603 }
1604 else {
1605 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1606 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1607 // we stop after limiter's limit, which is g_NumThreads + 1. The source_node
1608 // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
1609 ASSERT(sb_cnt == g_NumThreads + 2, "Missing invocations of source_node");
1610 ASSERT(nb_cnt == g_NumThreads + 1, "Missing items in absorbers");
1611 }
1612 if(iter == 0) {
1613 remove_edge(node_to_test, sink);
1614 node_to_test.try_put(BufferItemType());
1615 node_to_test.try_put(BufferItemType());
1616 g.wait_for_all();
1617 g.reset();
1618 source_count = sink_count = 0;
1619 BufferItemType tmp;
1620 ASSERT(!node_to_test.try_get(tmp), "node not empty");
1621 make_edge(node_to_test, sink);
1622 g.wait_for_all();
1623 }
1624 else {
1625 g.reset();
1626 source_count = sink_count = 0;
1627 }
1628 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1629 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1630 }
1631
1632#if USE_TASK_SCHEDULER_OBSERVER
1633 o.observe(false);
1634#endif
1635}
1636
1637template<class BufferItemType,
1638 TestNodeTypeEnum SourceThrowType,
1639 TestNodeTypeEnum SinkThrowType>
1640void run_limiter_node_test() {
1641 typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1642 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1643
1644 typedef tbb::flow::source_node<BufferItemType> SrcType;
1645 typedef tbb::flow::limiter_node<BufferItemType> LmtType;
1646 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1647
1648 for(int i = 0; i < 4; ++i) {
1649 if(i == 2) continue; // no need to test flog w/o throws
1650 bool throwException = (i & 0x1) != 0;
1651 bool doFlog = (i & 0x2) != 0;
1652 run_one_limiter_node_test<
1653 /* class BufferItemType*/ BufferItemType,
1654 /*class SourceNodeType*/ SrcType,
1655 /*class SourceNodeBodyType*/ SourceBodyType,
1656 /*class TestNodeType*/ LmtType,
1657 /*class SinkNodeType*/ SnkType,
1658 /*class SinkNodeBodyType*/ SinkBodyType
1659 >(throwException, doFlog);
1660 }
1661}
1662
1663void test_limiter_node() {
1664 REMARK("Testing limiter_node\n");
1665 g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
1666 run_limiter_node_test<int,isThrowing,nonThrowing>();
1667 g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
1668 run_limiter_node_test<int,nonThrowing,isThrowing>();
1669 g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
1670 run_limiter_node_test<int,isThrowing,isThrowing>();
1671 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1672}
1673
1674// -------- split_node --------------------
1675
1676template<
1677 class InputTuple,
1678 class SourceType,
1679 class SourceBodyType,
1680 class TestSplitType,
1681 class SinkType0,
1682 class SinkBodyType0,
1683 class SinkType1,
1684 class SinkBodyType1>
1685void run_one_split_node_test(bool throwException, bool flog) {
1686
1687 tbb::flow::graph g;
1688
1689 tbb::atomic<int> source_count;
1690 tbb::atomic<int> sink0_count;
1691 tbb::atomic<int> sink1_count;
1692 source_count = sink0_count = sink1_count = 0;
1693#if USE_TASK_SCHEDULER_OBSERVER
1694 eh_test_observer o;
1695 o.observe(true);
1696#endif
1697
1698 g_Master = Harness::CurrentTid();
1699 SourceType source(g, SourceBodyType(source_count),/*is_active*/false);
1700 TestSplitType node_to_test(g);
1701 SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
1702 SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
1703 make_edge(source, node_to_test);
1704 make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
1705 make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
1706
1707 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again
1708 ResetGlobals(throwException,flog);
1709 if(throwException) {
1710 TRY();
1711 source.activate();
1712 g.wait_for_all();
1713 CATCH_AND_ASSERT();
1714 }
1715 else {
1716 TRY();
1717 source.activate();
1718 g.wait_for_all();
1719 CATCH_AND_FAIL();
1720 }
1721 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1722 int sb_cnt = tbb::flow::copy_body<SourceBodyType>(source).count_value();
1723 int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
1724 int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
1725 if(throwException) {
1726 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1727 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1728 ASSERT(sb_cnt <= 2*g_NumItems, "Too many items sent by source");
1729 ASSERT(nb0_cnt + nb1_cnt <= sb_cnt*2, "Too many items received by sink nodes");
1730 }
1731 else {
1732 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1733 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1734 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_nodes");
1735 ASSERT(nb0_cnt == g_NumItems && nb1_cnt == g_NumItems, "Missing items in absorbers");
1736 }
1737 g.reset(); // resets the body of the source_nodes and the absorb_nodes.
1738 source_count = sink0_count = sink1_count = 0;
1739 ASSERT(0 == tbb::flow::copy_body<SourceBodyType>(source).count_value(),"Reset source failed");
1740 ASSERT(0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value(),"Reset sink 0 failed");
1741 ASSERT(0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value(),"Reset sink 1 failed");
1742 }
1743#if USE_TASK_SCHEDULER_OBSERVER
1744 o.observe(false);
1745#endif
1746}
1747
1748template<class InputTuple,
1749 TestNodeTypeEnum SourceThrowType,
1750 TestNodeTypeEnum SinkThrowType>
1751void run_split_node_test() {
1752 typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1753 typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
1754 typedef tuple_test_source_body<InputTuple,SourceThrowType> SourceBodyType;
1755 typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
1756 typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
1757
1758 typedef typename tbb::flow::source_node<InputTuple> SourceType;
1759 typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
1760 typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
1761 typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
1762
1763 for(int i = 0; i < 4; ++i) {
1764 if(2 == i) continue;
1765 bool throwException = (i & 0x1) != 0;
1766 bool doFlog = (i & 0x2) != 0;
1767 run_one_split_node_test<
1768 InputTuple,
1769 SourceType,
1770 SourceBodyType,
1771 TestSplitType,
1772 SinkType0,
1773 SinkBodyType0,
1774 SinkType1,
1775 SinkBodyType1>
1776 (throwException,doFlog);
1777 }
1778}
1779
1780void test_split_node() {
1781 REMARK("Testing split_node\n");
1782 g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
1783 run_split_node_test<tbb::flow::tuple<int,int>, isThrowing, nonThrowing>();
1784 g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
1785 run_split_node_test<tbb::flow::tuple<int,int>, nonThrowing, isThrowing>();
1786 g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
1787 run_split_node_test<tbb::flow::tuple<int,int>, isThrowing, isThrowing>();
1788 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1789}
1790
1791// --------- indexer_node ----------------------
1792
1793template < class InputTuple,
1794 class SourceType0,
1795 class SourceBodyType0,
1796 class SourceType1,
1797 class SourceBodyType1,
1798 class TestNodeType,
1799 class SinkType,
1800 class SinkBodyType>
1801void run_one_indexer_node_test(bool throwException,bool flog) {
1802 typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1803 typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
1804
1805 tbb::flow::graph g;
1806
1807 tbb::atomic<int> source0_count;
1808 tbb::atomic<int> source1_count;
1809 tbb::atomic<int> sink_count;
1810 source0_count = source1_count = sink_count = 0;
1811#if USE_TASK_SCHEDULER_OBSERVER
1812 eh_test_observer o;
1813 o.observe(true);
1814#endif
1815 g_Master = Harness::CurrentTid();
1816 SourceType0 source0(g, SourceBodyType0(source0_count),/*is_active*/false);
1817 SourceType1 source1(g, SourceBodyType1(source1_count),/*is_active*/false);
1818 TestNodeType node_to_test(g);
1819 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1820 make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1821 make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1822 make_edge(node_to_test, sink);
1823 for(int iter = 0; iter < 2; ++iter) {
1824 ResetGlobals(throwException,flog);
1825 if(throwException) {
1826 TRY();
1827 source0.activate();
1828 source1.activate();
1829 g.wait_for_all();
1830 CATCH_AND_ASSERT();
1831 }
1832 else {
1833 TRY();
1834 source0.activate();
1835 source1.activate();
1836 g.wait_for_all();
1837 CATCH_AND_FAIL();
1838 }
1839 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1840 int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1841 int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1842 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1843 if(throwException) {
1844 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1845 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1846 ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1847 ASSERT(nb_cnt <= sb0_cnt + sb1_cnt, "Too many items received by sink nodes");
1848 }
1849 else {
1850 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1851 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1852 ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0");
1853 ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1854 ASSERT(nb_cnt == 2*g_NumItems, "Missing items in absorbers");
1855 }
1856 if(iter == 0) {
1857 remove_edge(node_to_test, sink);
1858 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1859 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1860 g.wait_for_all();
1861 g.reset();
1862 source0_count = source1_count = sink_count = 0;
1863 make_edge(node_to_test, sink);
1864 g.wait_for_all();
1865 }
1866 else {
1867 g.wait_for_all();
1868 g.reset();
1869 source0_count = source1_count = sink_count = 0;
1870 }
1871 ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1872 ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
1873 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1874 ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1875 }
1876
1877#if USE_TASK_SCHEDULER_OBSERVER
1878 o.observe(false);
1879#endif
1880}
1881
1882template<class InputTuple,
1883 TestNodeTypeEnum SourceThrowType,
1884 TestNodeTypeEnum SinkThrowType>
1885void run_indexer_node_test() {
1886 typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1887 typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
1888 typedef test_source_body<ItemType0,SourceThrowType> SourceBodyType0;
1889 typedef test_source_body<ItemType1,SourceThrowType> SourceBodyType1;
1890 typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
1891 typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1892
1893 typedef typename tbb::flow::source_node<ItemType0> SourceType0;
1894 typedef typename tbb::flow::source_node<ItemType1> SourceType1;
1895 typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
1896
1897 for(int i = 0; i < 4; ++i) {
1898 if(2 == i) continue;
1899 bool throwException = (i & 0x1) != 0;
1900 bool doFlog = (i & 0x2) != 0;
1901 run_one_indexer_node_test<
1902 InputTuple,
1903 SourceType0,
1904 SourceBodyType0,
1905 SourceType1,
1906 SourceBodyType1,
1907 TestNodeType,
1908 SinkType,
1909 SinkBodyType>(throwException,doFlog);
1910 }
1911}
1912
1913void test_indexer_node() {
1914 REMARK("Testing indexer_node\n");
1915 g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
1916 run_indexer_node_test<tbb::flow::tuple<int,int>, isThrowing, nonThrowing>();
1917 g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
1918 run_indexer_node_test<tbb::flow::tuple<int,int>, nonThrowing, isThrowing>();
1919 g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
1920 run_indexer_node_test<tbb::flow::tuple<int,int>, isThrowing, isThrowing>();
1921 g_Wakeup_Msg = g_Orig_Wakeup_Msg;;
1922}
1923
1924///////////////////////////////////////////////
1925// whole-graph exception test
1926
1927class Foo {
1928private:
1929 // std::vector<int>& m_vec;
1930 std::vector<int>* m_vec;
1931public:
1932 Foo(std::vector<int>& vec) : m_vec(&vec) { }
1933 void operator() (tbb::flow::continue_msg) const {
1934 ++nExceptions;
1935 m_vec->at(m_vec->size()); // Will throw out_of_range exception
1936 ASSERT(false, "Exception not thrown by invalid access");
1937 }
1938};
1939
1940// test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786
1941// exception thrown in graph node, not caught in wait_for_all()
1942void
1943test_flow_graph_exception0() {
1944 // Initializes body
1945 std::vector<int> vec;
1946 vec.push_back(0);
1947 Foo f(vec);
1948 nExceptions = 0;
1949
1950 // Construct graph and nodes
1951 tbb::flow::graph g;
1952 tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
1953 tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
1954
1955 // Construct edge
1956 tbb::flow::make_edge(start, fooNode);
1957
1958 // Execute graph
1959 ASSERT(!g.exception_thrown(), "exception_thrown flag already set");
1960 ASSERT(!g.is_cancelled(), "canceled flag already set");
1961 try {
1962 start.try_put(tbb::flow::continue_msg());
1963 g.wait_for_all();
1964 ASSERT(false, "Exception not thrown");
1965 }
1966 catch(std::out_of_range& ex) {
1967 REMARK("Exception: %s (expected)\n", ex.what());
1968 }
1969 catch(...) {
1970 REMARK("Unknown exception caught (expected)\n");
1971 }
1972 ASSERT(nExceptions > 0, "Exception caught, but no body signaled exception being thrown");
1973 nExceptions = 0;
1974 ASSERT(g.exception_thrown(), "Exception not intercepted");
1975 // if exception set, cancellation also set.
1976 ASSERT(g.is_cancelled(), "Exception cancellation not signaled");
1977 // in case we got an exception
1978 try {
1979 g.wait_for_all(); // context still signalled canceled, my_exception still set.
1980 }
1981 catch(...) {
1982 ASSERT(false, "Second exception thrown but no task executing");
1983 }
1984 ASSERT(nExceptions == 0, "body signaled exception being thrown, but no body executed");
1985 ASSERT(!g.exception_thrown(), "exception_thrown flag not reset");
1986 ASSERT(!g.is_cancelled(), "canceled flag not reset");
1987}
1988
1989void TestOneThreadNum(int nThread) {
1990 REMARK("Testing %d threads\n", nThread);
1991 g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
1992 g_NumThreads = nThread;
1993 tbb::task_scheduler_init init(nThread);
1994 // whole-graph exception catch and rethrow test
1995 test_flow_graph_exception0();
1996 for(int i = 0; i < 4; ++i) {
1997 g_ExceptionInMaster = (i & 1) != 0;
1998 g_SolitaryException = (i & 2) != 0;
1999 REMARK("g_ExceptionInMaster == %s, g_SolitaryException == %s\n",
2000 g_ExceptionInMaster ? "T":"F",
2001 g_SolitaryException ? "T":"F");
2002 test_source_node();
2003 test_function_node();
2004 test_continue_node(); // also test broadcast_node
2005 test_multifunction_node();
2006 // single- and multi-item buffering nodes
2007 test_buffer_queue_and_overwrite_node();
2008 test_sequencer_node();
2009 test_priority_queue_node();
2010
2011 // join_nodes
2012 test_join_node<tbb::flow::queueing>();
2013 test_join_node<tbb::flow::reserving>();
2014 test_join_node<tbb::flow::tag_matching>();
2015
2016 test_limiter_node();
2017 test_split_node();
2018 // graph for write_once_node will be complicated by the fact the node will
2019 // not do try_puts after it has been set. To get parallelism of N we have
2020 // to attach N successor nodes to the write_once (or play some similar game).
2021 // test_write_once_node();
2022 test_indexer_node();
2023 }
2024}
2025#endif // TBB_USE_EXCEPTIONS
2026
2027#if TBB_USE_EXCEPTIONS
2028int TestMain() {
2029 // reversing the order of tests
2030 for(int nThread=MaxThread; nThread >= MinThread; --nThread) {
2031 TestOneThreadNum(nThread);
2032 }
2033
2034 return Harness::Done;
2035}
2036#else
2037int TestMain() {
2038 return Harness::Skipped;
2039}
2040#endif // TBB_USE_EXCEPTIONS
2041