1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef tbb_test_join_node_H
18#define tbb_test_join_node_H
19
20#if _MSC_VER
21// Suppress "decorated name length exceeded, name was truncated" warning
22#if __INTEL_COMPILER
23#pragma warning( disable: 2586 )
24#else
25#pragma warning( disable: 4503 )
26#endif
27#endif
28
29#include "harness_graph.h"
30#include "harness_checktype.h"
31
32#include "tbb/flow_graph.h"
33#include "tbb/task_scheduler_init.h"
34
35#define __TBB_MIC_OFFLOAD_TEST_COMPILATION_BROKEN __TBB_MIC_OFFLOAD
36
// Pool of first names used to build std::string keys.
// Laid out as six alphabetical runs (A..Z), one run per row.
const char *names[] = {
    "Adam", "Bruce", "Charles", "Daniel", "Evan", "Frederich", "George", "Hiram", "Ichabod", "John", "Kevin", "Leonard", "Michael", "Ned", "Olin", "Paul", "Quentin", "Ralph", "Steven", "Thomas", "Ulysses", "Victor", "Walter", "Xerxes", "Yitzhak", "Zebediah",
    "Anne", "Bethany", "Clarisse", "Dorothy", "Erin", "Fatima", "Gabrielle", "Helen", "Irene", "Jacqueline", "Katherine", "Lana", "Marilyn", "Noelle", "Okiilani", "Pauline", "Querida", "Rose", "Sybil", "Tatiana", "Umiko", "Victoria", "Wilma", "Xena", "Yolanda", "Zoe",
    "Algernon", "Benjamin", "Caleb", "Dylan", "Ezra", "Felix", "Gabriel", "Henry", "Issac", "Jasper", "Keifer", "Lincoln", "Milo", "Nathaniel", "Owen", "Peter", "Quincy", "Ronan", "Silas", "Theodore", "Uriah", "Vincent", "Wilbur", "Xavier", "Yoda", "Zachary",
    "Amelia", "Brielle", "Charlotte", "Daphne", "Emma", "Fiona", "Grace", "Hazel", "Isla", "Juliet", "Keira", "Lily", "Mia", "Nora", "Olivia", "Penelope", "Quintana", "Ruby", "Sophia", "Tessa", "Ursula", "Violet", "Willow", "Xanthe", "Yvonne", "ZsaZsa",
    "Asher", "Bennett", "Connor", "Dominic", "Ethan", "Finn", "Grayson", "Hudson", "Ian", "Jackson", "Kent", "Liam", "Matthew", "Noah", "Oliver", "Parker", "Quinn", "Rhys", "Sebastian", "Taylor", "Umberto", "Vito", "William", "Xanto", "Yogi", "Zane",
    "Ava", "Brenda", "Chloe", "Delilah", "Ella", "Felicity", "Genevieve", "Hannah", "Isabella", "Josephine", "Kacie", "Lucy", "Madeline", "Natalie", "Octavia", "Piper", "Qismah", "Rosalie", "Scarlett", "Tanya", "Uta", "Vivian", "Wendy", "Xola", "Yaritza", "Zanthe"
};

// Number of entries in names[].
static const int NameCnt = sizeof(names) / sizeof(names[0]);
58
// Functor turning a test index into a key of type K: index i becomes 3*i + 1,
// converted to K.
template<typename K>
struct index_to_key {
    K operator()(const int indx) {
        const int raw_key = 3 * indx + 1;
        return (K)raw_key;
    }
};
65
66template<>
67struct index_to_key<std::string> {
68 std::string operator()(const int indx) {
69 return std::string(names[indx % NameCnt]);
70 }
71};
72
// K_deref<K>::type is K with one lvalue-reference layer removed.

// General case: K is not a reference, so the type is passed through unchanged.
template<typename K>
struct K_deref {
    typedef K type;
};

// Reference case: K& maps to K.
template<typename K>
struct K_deref<K&> {
    typedef K type;
};
82
83template<typename K, typename V>
84struct MyKeyFirst {
85 K my_key;
86 V my_value;
87 MyKeyFirst(int i = 0, int v = 0): my_key(index_to_key<K>()(i)), my_value((V)v) {
88 }
89 void print_val() const {
90 REMARK("MyKeyFirst{"); print_my_value(my_key); REMARK(","); print_my_value(my_value); REMARK("}");
91 }
92 operator int() const { return (int)my_value; }
93};
94
95template<typename K, typename V>
96struct MyKeySecond {
97 V my_value;
98 K my_key;
99 MyKeySecond(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) {
100 }
101 void print_val() const {
102 REMARK("MyKeySecond{"); print_my_value(my_key); REMARK(","); print_my_value(my_value); REMARK("}");
103 }
104 operator int() const { return (int)my_value; }
105};
106
107template<typename K, typename V>
108struct MyMessageKeyWithoutKey {
109 V my_value;
110 K my_message_key;
111 MyMessageKeyWithoutKey(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) {
112 }
113 void print_val() const {
114 REMARK("MyMessageKeyWithoutKey{"); print_my_value(my_message_key); REMARK(","); print_my_value(my_value); REMARK("}");
115 }
116 operator int() const { return (int)my_value; }
117 const K& key() const {
118 return my_message_key;
119 }
120};
121
122template<typename K, typename V>
123struct MyMessageKeyWithBrokenKey {
124 V my_value;
125 K my_key;
126 K my_message_key;
127 MyMessageKeyWithBrokenKey(int i = 0, int v = 0): my_value((V)v), my_key(), my_message_key(index_to_key<K>()(i)) {
128 }
129 void print_val() const {
130 REMARK("MyMessageKeyWithBrokenKey{"); print_my_value(my_message_key); REMARK(","); print_my_value(my_value); REMARK("}");
131 }
132 operator int() const { return (int)my_value; }
133 const K& key() const {
134 return my_message_key;
135 }
136
137};
138
139template<typename K, typename V>
140struct MyKeyWithBrokenMessageKey {
141 V my_value;
142 K my_key;
143 MyKeyWithBrokenMessageKey(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) {
144 }
145 void print_val() const {
146 REMARK("MyKeyWithBrokenMessageKey{"); print_my_value(my_key); REMARK(","); print_my_value(my_value); REMARK("}");
147 }
148 operator int() const { return (int)my_value; }
149 K key() const {
150 ASSERT(false, "The method should never be called");
151 return K();
152 }
153};
154
155template<typename K, typename V>
156struct MyMessageKeyWithoutKeyMethod {
157 V my_value;
158 K my_message_key;
159 MyMessageKeyWithoutKeyMethod(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) {
160 }
161 void print_val() const {
162 REMARK("MyMessageKeyWithoutKeyMethod{"); print_my_value(my_message_key); REMARK(","); print_my_value(my_value); REMARK("}");
163 }
164 operator int() const { return (int)my_value; }
165#if __TBB_COMPLICATED_ADL_BROKEN
166 const K& key() const { return my_message_key; }
167#endif
168 //K key() const; // Do not define
169};
170
171// Overload for MyMessageKeyWithoutKeyMethod
172template <typename K, typename V>
173K key_from_message(const MyMessageKeyWithoutKeyMethod<typename tbb::internal::strip<K>::type, V> &m) {
174 return m.my_message_key;
175}
176
177
// Value factory for the tag_matching and key_matching tests: builds the value for
// tuple position INDEX from an integer i.

// Plain types: the value is i * (INDEX+1), converted to TT.
template<typename TT, size_t INDEX>
struct make_thingie {
    TT operator()(int const &i) {
        const size_t scale = INDEX + 1;
        return TT(i * scale);
    }
};

// Two-parameter message templates T<K, V>: constructed from (i, i*(INDEX+1)).
template<template <typename, typename> class T, typename K, typename V, size_t INDEX>
struct make_thingie<T<K, V>, INDEX> {
    T<K, V> operator()(int const &i) {
        typedef T<K, V> message_t;
        return message_t(i, i * (INDEX + 1));
    }
};
192
// cast_from<T>::my_int_val(i) yields the integer view of a value.
// Generic case: a plain conversion of the value to int.
template<typename T>
struct cast_from {
    static int my_int_val(T const &i) {
        const int as_int = (int)i;
        return as_int;
    }
};
198
199template<typename K, typename V>
200struct cast_from<MyKeyFirst<K, V> > {
201 static int my_int_val(MyKeyFirst<K, V> const &i) { return (int)(i.my_value); }
202};
203
204template<typename K, typename V>
205struct cast_from<MyKeySecond<K, V> > {
206 static int my_int_val(MyKeySecond<K, V> const &i) { return (int)(i.my_value); }
207};
208
209template<typename T>
210void print_my_value(T const &i) {
211 REMARK(" %d ", cast_from<T>::my_int_val(i));
212}
213
214template<typename K, typename V>
215void print_my_value(MyKeyFirst<K, V> const &i) {
216 i.print_val();
217}
218
219template<typename K, typename V>
220void print_my_value(MyKeySecond<K, V> const &i) {
221 i.print_val();
222}
223
224template<>
225void print_my_value(std::string const &i) {
226 REMARK("\"%s\"", i.c_str());
227}
228
229//
230// Tests
231//
232
//!
// my_struct_key: key extractor for any type V that has a field named my_key.

// By-value flavour: returns a copy of mv.my_key as K.
template<class K, typename V>
struct my_struct_key {
    K operator()(const V& mv) {
        return mv.my_key;
    }
};

// By-reference flavour (key type K&): returns a const reference to mv.my_key.
template<class K, typename V>
struct my_struct_key<K&, V> {
    const K& operator()(const V& mv) {
        const K &key_ref = mv.my_key;
        return key_ref;
    }
};
249
250using tbb::internal::is_ref;
251
252template<class K, class V> struct VtoKFB {
253 typedef tbb::flow::interface10::internal::type_to_key_function_body<V, K> type;
254};
255
256template<typename K> struct make_hash_compare { typedef typename tbb::tbb_hash_compare<K> type; };
257
258template<typename K, class V>
259void hash_buffer_test(const char *sname) {
260 typedef typename K_deref<K>::type KnoR;
261 tbb::flow::interface10::internal::hash_buffer<
262 K,
263 V,
264 typename VtoKFB<K, V>::type,
265 tbb::tbb_hash_compare<KnoR>
266 > my_hash_buffer;
267 const bool k_is_ref = is_ref<K>::value;
268 typedef tbb::flow::interface10::internal::type_to_key_function_body_leaf<
269 V, K, my_struct_key<K, V> > my_func_body_type;
270 typename VtoKFB<K, V>::type *kp = new my_func_body_type(my_struct_key<K, V>());
271 my_hash_buffer.set_key_func(kp);
272 REMARK("Running hash_buffer test on %s; is ref == %s\n", sname, k_is_ref ? "true" : "false");
273 V mv1, mv0;
274 bool res;
275 for(int cnt = 0; cnt < 2; ++cnt) {
276 // insert 50 items after checking they are not already in the table
277 for(int i = 0; i < 50; ++i) {
278 KnoR kk = index_to_key<KnoR>()(i);
279 mv1.my_key = kk;
280 mv1.my_value = 0.5*i;
281 res = my_hash_buffer.find_with_key(kk, mv0);
282 ASSERT(!res, "Found non-inserted item");
283 res = my_hash_buffer.insert_with_key(mv1);
284 ASSERT(res, "insert failed");
285 res = my_hash_buffer.find_with_key(kk, mv0);
286 ASSERT(res, "not found after insert");
287 ASSERT(mv0.my_value==mv1.my_value, "result not correct");
288 }
289 // go backwards checking they are still there.
290 for(int i = 49; i>=0; --i) {
291 KnoR kk = index_to_key<KnoR>()(i);
292 double value = 0.5*i;
293 res = my_hash_buffer.find_with_key(kk, mv0);
294 ASSERT(res, "find failed");
295 ASSERT(mv0.my_value==value, "result not correct");
296 }
297 // delete every third item, check they are gone
298 for(int i = 0; i < 50; i += 3) {
299 KnoR kk = index_to_key<KnoR>()(i);
300 my_hash_buffer.delete_with_key(kk);
301 res = my_hash_buffer.find_with_key(kk, mv0);
302 ASSERT(!res, "Found deleted item");
303 }
304 // check the deleted items are gone, the non-deleted items are there.
305 for(int i = 0; i < 50; ++i) {
306 KnoR kk = index_to_key<KnoR>()(i);
307 double value = 0.5*i;
308 if(i%3==0) {
309 res = my_hash_buffer.find_with_key(kk, mv0);
310 ASSERT(!res, "found an item that was previously deleted");
311 }
312 else {
313 res = my_hash_buffer.find_with_key(kk, mv0);
314 ASSERT(res, "find failed");
315 ASSERT(mv0.my_value==value, "result not correct");
316 }
317 }
318 // insert new items, check the deleted items return true, the non-deleted items return false.
319 for(int i = 0; i < 50; ++i) {
320 KnoR kk = index_to_key<KnoR>()(i);
321 double value = 1.5*i;
322 mv1.my_key = kk;
323 mv1.my_value = value;
324 res = my_hash_buffer.insert_with_key(mv1);
325 if(i%3==0) {
326 ASSERT(res, "didn't insert in empty slot");
327 }
328 else {
329 ASSERT(!res, "slot was empty on insert");
330 }
331 }
332 // delete all items
333 for(int i = 0; i < 50; ++i) {
334 KnoR kk = index_to_key<KnoR>()(i);
335 my_hash_buffer.delete_with_key(kk);
336 res = my_hash_buffer.find_with_key(kk, mv0);
337 ASSERT(!res, "Found deleted item");
338 }
339 } // perform tasks twice
340}
341
342void
343TestTaggedBuffers() {
344 hash_buffer_test<int, MyKeyFirst<int, double> >("MyKeyFirst<int,double>");
345 hash_buffer_test<int&, MyKeyFirst<int, double> >("MyKeyFirst<int,double> with int&");
346 hash_buffer_test<int, MyKeySecond<int, double> >("MyKeySecond<int,double>");
347
348 hash_buffer_test<std::string, MyKeyFirst<std::string, double> >("MyKeyFirst<std::string,double>");
349 hash_buffer_test<std::string&, MyKeySecond<std::string, double> >("MyKeySecond<std::string,double> with std::string&");
350}
351
352#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
353template< typename T, typename NODE_TYPE >
354class test_join_base_extract : NoAssign {
355protected:
356 typedef typename NODE_TYPE::output_type tuple_t;
357 typedef tbb::flow::queue_node<T> in_queue_t;
358 typedef tbb::flow::queue_node<tuple_t> out_queue_t;
359
360 tbb::flow::graph &g;
361 in_queue_t &in0;
362 in_queue_t &in1;
363 in_queue_t &in2;
364 NODE_TYPE &middle;
365 out_queue_t &out0;
366 out_queue_t &out1;
367 in_queue_t *ins[3];
368 out_queue_t *outs[2];
369 typename in_queue_t::successor_type *ms_p0_ptr;
370 typename in_queue_t::successor_type *ms_p1_ptr;
371 typename out_queue_t::predecessor_type *mp_ptr;
372 typename in_queue_t::predecessor_list_type in0_p_list;
373 typename in_queue_t::successor_list_type in0_s_list;
374 typename in_queue_t::predecessor_list_type in1_p_list;
375 typename in_queue_t::successor_list_type in1_s_list;
376 typename in_queue_t::predecessor_list_type in2_p_list;
377 typename in_queue_t::successor_list_type in2_s_list;
378 typename out_queue_t::predecessor_list_type out0_p_list;
379 typename out_queue_t::successor_list_type out0_s_list;
380 typename out_queue_t::predecessor_list_type out1_p_list;
381 typename out_queue_t::successor_list_type out1_s_list;
382 typename in_queue_t::predecessor_list_type mp0_list;
383 typename in_queue_t::predecessor_list_type mp1_list;
384 typename out_queue_t::successor_list_type ms_list;
385
386 virtual void set_up_lists() {
387 in0_p_list.clear();
388 in0_s_list.clear();
389 in1_p_list.clear();
390 in1_s_list.clear();
391 in2_p_list.clear();
392 in2_s_list.clear();
393 out0_p_list.clear();
394 out0_s_list.clear();
395 out1_p_list.clear();
396 out1_s_list.clear();
397 mp0_list.clear();
398 mp1_list.clear();
399 ms_list.clear();
400
401 in0.copy_predecessors(in0_p_list);
402 in0.copy_successors(in0_s_list);
403 in1.copy_predecessors(in1_p_list);
404 in1.copy_successors(in1_s_list);
405 in2.copy_predecessors(in2_p_list);
406 in2.copy_successors(in2_s_list);
407 tbb::flow::input_port<0>(middle).copy_predecessors(mp0_list);
408 tbb::flow::input_port<1>(middle).copy_predecessors(mp1_list);
409 middle.copy_successors(ms_list);
410 out0.copy_predecessors(out0_p_list);
411 out0.copy_successors(out0_s_list);
412 out1.copy_predecessors(out1_p_list);
413 out1.copy_successors(out1_s_list);
414 }
415
416 void check_tuple(T &r, tuple_t &v) {
417 T t0 = tbb::flow::get<0>(v);
418 T t1 = tbb::flow::get<1>(v);
419 ASSERT((t0==1||t0==2)&&(t0&r)==0, "duplicate value");
420 r |= t0;
421 ASSERT((t1==4||t1==8)&&(t1&r)==0, "duplicate value");
422 r |= t1;
423 }
424
425 void make_and_validate_full_graph() {
426 /* in0 */
427 /* \ */
428 /* port0 out0 */
429 /* / | / */
430 /* in1 middle */
431 /* | \ */
432 /* in2 - port1 out1 */
433 tbb::flow::make_edge(in0, tbb::flow::input_port<0>(middle));
434 tbb::flow::make_edge(in1, tbb::flow::input_port<0>(middle));
435 tbb::flow::make_edge(in2, tbb::flow::input_port<1>(middle));
436 tbb::flow::make_edge(middle, out0);
437 tbb::flow::make_edge(middle, out1);
438
439 set_up_lists();
440
441 ASSERT(in0.predecessor_count()==0&&in0_p_list.size()==0, "expected 0 predecessors");
442 ASSERT(in0.successor_count()==1&&in0_s_list.size()==1&&*(in0_s_list.begin())==ms_p0_ptr, "expected 1 successor");
443 ASSERT(in1.predecessor_count()==0&&in1_p_list.size()==0, "expected 0 predecessors");
444 ASSERT(in1.successor_count()==1&&in1_s_list.size()==1&&*(in1_s_list.begin())==ms_p0_ptr, "expected 1 successor");
445 ASSERT(in2.predecessor_count()==0&&in2_p_list.size()==0, "expected 0 predecessors");
446 ASSERT(in2.successor_count()==1&&in2_s_list.size()==1&&*(in2_s_list.begin())==ms_p1_ptr, "expected 1 successor");
447 ASSERT(tbb::flow::input_port<0>(middle).predecessor_count()==2&&mp0_list.size()==2, "expected 2 predecessors");
448 ASSERT(tbb::flow::input_port<1>(middle).predecessor_count()==1&&mp1_list.size()==1, "expected 1 predecessors");
449 ASSERT(middle.successor_count()==2&&ms_list.size()==2, "expected 2 successors");
450 ASSERT(out0.predecessor_count()==1&&out0_p_list.size()==1&&*(out0_p_list.begin())==mp_ptr, "expected 1 predecessor");
451 ASSERT(out0.successor_count()==0&&out0_s_list.size()==0, "expected 0 successors");
452 ASSERT(out1.predecessor_count()==1&&out1_p_list.size()==1&&*(out1_p_list.begin())==mp_ptr, "expected 1 predecessor");
453 ASSERT(out1.successor_count()==0&&out1_s_list.size()==0, "expected 0 successors");
454
455 typename in_queue_t::predecessor_list_type::iterator mp0_list_iter = mp0_list.begin(); ++mp0_list_iter;
456 int first_pred = *(mp0_list.begin())==ins[0] ? 0 : (*(mp0_list.begin())==ins[1] ? 1 : -1);
457 int second_pred = *mp0_list_iter==ins[0] ? 0 : (*mp0_list_iter==ins[1] ? 1 : -1);
458 ASSERT(first_pred!=-1&&second_pred!=-1&&first_pred!=second_pred, "bad predecessor(s) for middle port 0");
459
460 ASSERT(*(mp1_list.begin())==ins[2], "bad predecessor for middle port 1");
461
462 typename out_queue_t::successor_list_type::iterator ms_list_iter = ms_list.begin(); ++ms_list_iter;
463 int first_succ = *(ms_list.begin())==outs[0] ? 0 : (*(ms_list.begin())==outs[1] ? 1 : -1);
464 int second_succ = *ms_list_iter==outs[0] ? 0 : (*ms_list_iter==outs[1] ? 1 : -1);
465 ASSERT(first_succ!=-1&&second_succ!=-1&&first_succ!=second_succ, "bad successor(s) for middle");
466
467 in0.try_put(1);
468 in1.try_put(2);
469 in2.try_put(8);
470 in2.try_put(4);
471 g.wait_for_all();
472
473 T v_in;
474 tuple_t v;
475
476 ASSERT(in0.try_get(v_in)==false, "buffer should not have a value");
477 ASSERT(in1.try_get(v_in)==false, "buffer should not have a value");
478 ASSERT(in1.try_get(v_in)==false, "buffer should not have a value");
479 ASSERT(in2.try_get(v_in)==false, "buffer should not have a value");
480 ASSERT(in2.try_get(v_in)==false, "buffer should not have a value");
481
482 T r = 0;
483 while(out0.try_get(v)) {
484 check_tuple(r, v);
485 g.wait_for_all();
486 }
487 ASSERT(r==15, "not all values received");
488
489 r = 0;
490 while(out1.try_get(v)) {
491 check_tuple(r, v);
492 g.wait_for_all();
493 }
494 ASSERT(r==15, "not all values received");
495 g.wait_for_all();
496 }
497
498 void validate_partial_graph() {
499 /* in0 */
500 /* */
501 /* port0 out0 */
502 /* / | */
503 /* in1 middle */
504 /* | \ */
505 /* in2 - port1 out1 */
506 set_up_lists();
507
508 ASSERT(in0.predecessor_count()==0&&in0_p_list.size()==0, "expected 0 predecessors");
509 ASSERT(in0.successor_count()==0&&in0_s_list.size()==0, "expected 0 successors");
510 ASSERT(in1.predecessor_count()==0&&in1_p_list.size()==0, "expected 0 predecessors");
511 ASSERT(in1.successor_count()==1&&in1_s_list.size()==1&&*(in1_s_list.begin())==ms_p0_ptr, "expected 1 successor");
512 ASSERT(in2.predecessor_count()==0&&in2_p_list.size()==0, "expected 0 predecessors");
513 ASSERT(in2.successor_count()==1&&in2_s_list.size()==1&&*(in2_s_list.begin())==ms_p1_ptr, "expected 1 successor");
514 ASSERT(tbb::flow::input_port<0>(middle).predecessor_count()==1&&mp0_list.size()==1&&*(mp0_list.begin())==ins[1], "expected 1 predecessor");
515 ASSERT(tbb::flow::input_port<1>(middle).predecessor_count()==1&&mp1_list.size()==1&&*(mp1_list.begin())==ins[2], "expected 1 predecessor");
516 ASSERT(middle.successor_count()==1&&ms_list.size()==1&&*(ms_list.begin())==outs[1], "expected 1 successor");
517 ASSERT(out0.predecessor_count()==0&&out0_p_list.size()==0, "expected 1 predecessor");
518 ASSERT(out0.successor_count()==0&&out0_s_list.size()==0, "expected 0 successors");
519 ASSERT(out1.predecessor_count()==1&&out1_p_list.size()==1&&*(out1_p_list.begin())==mp_ptr, "expected 1 predecessor");
520 ASSERT(out1.successor_count()==0&&out1_s_list.size()==0, "expected 0 successors");
521
522 in0.try_put(1);
523 in1.try_put(2);
524 in2.try_put(8);
525 in2.try_put(4);
526 g.wait_for_all();
527
528 T v_in;
529 tuple_t v;
530
531 ASSERT(in0.try_get(v_in)==true&&v_in==1, "buffer should have a value of 1");
532 ASSERT(in1.try_get(v_in)==false, "buffer should not have a value");
533 ASSERT(out0.try_get(v)==false, "buffer should not have a value");
534 ASSERT(out1.try_get(v)==true&&tbb::flow::get<0>(v)==2&&tbb::flow::get<1>(v)==8, "buffer should have a value of < 2, 8 >");
535 ASSERT(in0.try_get(v_in)==false, "buffer should not have a value");
536 g.wait_for_all();
537 g.reset(); // for queueing and tag_matching the 4 is now in the join
538 }
539
540 void validate_empty_graph() {
541 /* in0 */
542 /* */
543 /* port0 out0 */
544 /* | */
545 /* in1 middle */
546 /* | */
547 /* in2 port1 out1 */
548 set_up_lists();
549
550 ASSERT(in0.predecessor_count()==0&&in0_p_list.size()==0, "expected 0 predecessors");
551 ASSERT(in0.successor_count()==0&&in0_s_list.size()==0, "expected 0 successors");
552 ASSERT(in1.predecessor_count()==0&&in1_p_list.size()==0, "expected 0 predecessors");
553 ASSERT(in1.successor_count()==0&&in1_s_list.size()==0, "expected 0 successors");
554 ASSERT(in2.predecessor_count()==0&&in2_p_list.size()==0, "expected 0 predecessors");
555 ASSERT(in2.successor_count()==0&&in2_s_list.size()==0, "expected 0 successors");
556 ASSERT(tbb::flow::input_port<0>(middle).predecessor_count()==0&&mp0_list.size()==0, "expected 0 predecessors");
557 ASSERT(tbb::flow::input_port<1>(middle).predecessor_count()==0&&mp1_list.size()==0, "expected 0 predecessors");
558 ASSERT(middle.successor_count()==0&&ms_list.size()==0, "expected 0 successors");
559 ASSERT(out0.predecessor_count()==0&&out0_p_list.size()==0, "expected 0 predecessors");
560 ASSERT(out0.successor_count()==0&&out0_s_list.size()==0, "expected 0 successors");
561 ASSERT(out1.predecessor_count()==0&&out1_p_list.size()==0, "expected 0 predecessors");
562 ASSERT(out1.successor_count()==0&&out1_s_list.size()==0, "expected 0 successors");
563
564 in0.try_put(1);
565 in1.try_put(2);
566 in2.try_put(8);
567 in2.try_put(4);
568 g.wait_for_all();
569
570 T v_in;
571 tuple_t v;
572
573 ASSERT(in0.try_get(v_in)==true&&v_in==1, "buffer should have a value of 1");
574 ASSERT(in1.try_get(v_in)==true&&v_in==2, "buffer should have a value of 2");
575 ASSERT(in2.try_get(v_in)==true&&v_in==8, "buffer should have a value of 8");
576 ASSERT(in2.try_get(v_in)==true&&v_in==4, "buffer should have a value of 4");
577 ASSERT(out0.try_get(v)==false, "buffer should not have a value");
578 ASSERT(out1.try_get(v)==false, "buffer should not have a value");
579 g.wait_for_all();
580 g.reset(); // NOTE: this should not be necessary!!!!! But it is!!!!
581 }
582
583public:
584
585 test_join_base_extract(tbb::flow::graph &_g, in_queue_t &_in0, in_queue_t &_in1, in_queue_t &_in2, NODE_TYPE &m, out_queue_t &_out0, out_queue_t &_out1):
586 g(_g), in0(_in0), in1(_in1), in2(_in2), middle(m), out0(_out0), out1(_out1) {
587 ins[0] = &in0;
588 ins[1] = &in1;
589 ins[2] = &in2;
590 outs[0] = &out0;
591 outs[1] = &out1;
592 ms_p0_ptr = static_cast< typename in_queue_t::successor_type * >(&tbb::flow::input_port<0>(middle));
593 ms_p1_ptr = static_cast< typename in_queue_t::successor_type * >(&tbb::flow::input_port<1>(middle));
594 mp_ptr = static_cast< typename out_queue_t::predecessor_type *>(&middle);
595 }
596
597 virtual ~test_join_base_extract() {}
598
599 void run_tests() {
600 REMARK("full graph\n");
601 make_and_validate_full_graph();
602
603 in0.extract();
604 out0.extract();
605 REMARK("partial graph\n");
606 validate_partial_graph();
607
608 in1.extract();
609 in2.extract();
610 out1.extract();
611 REMARK("empty graph\n");
612 validate_empty_graph();
613
614 REMARK("full graph\n");
615 make_and_validate_full_graph();
616
617 middle.extract();
618 REMARK("empty graph\n");
619 validate_empty_graph();
620
621 REMARK("full graph\n");
622 make_and_validate_full_graph();
623
624 in0.extract();
625 in1.extract();
626 in2.extract();
627 middle.extract();
628 REMARK("empty graph\n");
629 validate_empty_graph();
630
631 REMARK("full graph\n");
632 make_and_validate_full_graph();
633
634 out0.extract();
635 out1.extract();
636 middle.extract();
637 REMARK("empty graph\n");
638 validate_empty_graph();
639
640 REMARK("full graph\n");
641 make_and_validate_full_graph();
642 }
643};
644
645template< typename T, typename NODE_TYPE >
646class test_join_extract : public test_join_base_extract< T, NODE_TYPE > {
647protected:
648 typedef typename NODE_TYPE::output_type tuple_t;
649 typedef tbb::flow::queue_node<T> in_queue_t;
650 typedef tbb::flow::queue_node<tuple_t> out_queue_t;
651
652 tbb::flow::graph my_g;
653 in_queue_t my_in0;
654 in_queue_t my_in1;
655 in_queue_t my_in2;
656 NODE_TYPE my_middle;
657 out_queue_t my_out0;
658 out_queue_t my_out1;
659
660public:
661 test_join_extract(): test_join_base_extract<T, NODE_TYPE>(my_g, my_in0, my_in1, my_in2, my_middle, my_out0, my_out1),
662 my_in0(my_g), my_in1(my_g), my_in2(my_g), my_middle(my_g), my_out0(my_g), my_out1(my_g) { }
663};
664
665template< typename T >
666class test_join_extract<T, tbb::flow::join_node< tbb::flow::tuple<T, T>, tbb::flow::tag_matching> > :
667 public test_join_base_extract< T, tbb::flow::join_node< tbb::flow::tuple<T, T>, tbb::flow::tag_matching> > {
668protected:
669 typedef tbb::flow::join_node< tbb::flow::tuple<T, T>, tbb::flow::tag_matching> my_node_t;
670
671 typedef typename my_node_t::output_type tuple_t;
672 typedef tbb::flow::queue_node<T> in_queue_t;
673 typedef tbb::flow::queue_node<tuple_t> out_queue_t;
674
675 tbb::flow::graph my_g;
676 in_queue_t my_in0;
677 in_queue_t my_in1;
678 in_queue_t my_in2;
679 my_node_t my_middle;
680 out_queue_t my_out0;
681 out_queue_t my_out1;
682 struct tag_match_0 { size_t operator()(T v) { return v; } };
683 struct tag_match_1 { size_t operator()(T v) { return v/4; } };
684public:
685 test_join_extract(): test_join_base_extract<T, my_node_t>(my_g, my_in0, my_in1, my_in2, my_middle, my_out0, my_out1),
686 my_in0(my_g), my_in1(my_g), my_in2(my_g), my_middle(my_g, tag_match_0(), tag_match_1()), my_out0(my_g), my_out1(my_g) { }
687};
688#endif
689
// Three-byte value type. The low 24 bits of an int are stored one byte per field
// (b1 = bits 0-7, b2 = bits 8-15, b3 = bits 16-23); converting back to int
// reassembles them.
struct threebyte {
    unsigned char b1;
    unsigned char b2;
    unsigned char b3;

    threebyte(int i = 0)
        : b1((unsigned char)(i & 0xFF))
        , b2((unsigned char)((i >> 8) & 0xFF))
        , b3((unsigned char)((i >> 16) & 0xFF))
    {}

    threebyte(const threebyte &other) {
        b1 = other.b1;
        b2 = other.b2;
        b3 = other.b3;
    }

    operator int() const {
        const int low = b1;
        const int middle = b2 << 8;
        const int high = b3 << 16;
        return (int)(low + middle + high);
    }
};
702
// Test sizing constants.
const int Count = 150;           // upper bound on the values each source body emits
const int Recirc_count = 1000;   // number of tuples to be generated
const int MaxPorts = 10;         // first dimension of outputCheck
const int MaxNSources = 5;       // max # of source_nodes to register for each join_node input in parallel test

// for checking output: one flag per (port, item) pair
bool outputCheck[MaxPorts][Count];
709
710void
711check_outputCheck(int nUsed, int maxCnt) {
712 for(int i = 0; i < nUsed; ++i) {
713 for(int j = 0; j < maxCnt; ++j) {
714 ASSERT(outputCheck[i][j], NULL);
715 }
716 }
717}
718
719void
720reset_outputCheck(int nUsed, int maxCnt) {
721 for(int i = 0; i < nUsed; ++i) {
722 for(int j = 0; j < maxCnt; ++j) {
723 outputCheck[i][j] = false;
724 }
725 }
726}
727
728template<typename T>
729class name_of {
730public:
731 static const char* name() { return "Unknown"; }
732};
733template<typename T>
734class name_of<check_type<T> > {
735public:
736 static const char* name() { return "checktype"; }
737};
738template<>
739class name_of<int> {
740public:
741 static const char* name() { return "int"; }
742};
743template<>
744class name_of<float> {
745public:
746 static const char* name() { return "float"; }
747};
748template<>
749class name_of<double> {
750public:
751 static const char* name() { return "double"; }
752};
753template<>
754class name_of<long> {
755public:
756 static const char* name() { return "long"; }
757};
758template<>
759class name_of<short> {
760public:
761 static const char* name() { return "short"; }
762};
763template<>
764class name_of<threebyte> {
765public:
766 static const char* name() { return "threebyte"; }
767};
768template<>
769class name_of<std::string> {
770public:
771 static const char* name() { return "std::string"; }
772};
773template<typename K, typename V>
774class name_of<MyKeyFirst<K, V> > {
775public:
776 static const char* name() { return "MyKeyFirst<K,V>"; }
777};
778template<typename K, typename V>
779class name_of<MyKeySecond<K, V> > {
780public:
781 static const char* name() { return "MyKeySecond<K,V>"; }
782};
783
784// The additional policy to differ message based key matching from usual key matching.
785// It only has sense for the test because join_node is created with the key_matching policy for the both cases.
786template <typename K, typename KHash = tbb::tbb_hash_compare<typename tbb::internal::strip<K>::type > >
787struct message_based_key_matching {};
788
789// test for key_matching
790template<class JP>
791struct is_key_matching_join {
792 static const bool value;
793 typedef int key_type; // have to define it to something
794};
795
796template<class JP>
797const bool is_key_matching_join<JP>::value = false;
798
799template<class K, class KHash>
800struct is_key_matching_join<tbb::flow::key_matching<K, KHash> > {
801 static const bool value;
802 typedef K key_type;
803};
804
805template<class K, class KHash>
806const bool is_key_matching_join<tbb::flow::key_matching<K, KHash> >::value = true;
807
808template<class K, class KHash>
809struct is_key_matching_join<message_based_key_matching<K, KHash> > {
810 static const bool value;
811 typedef K key_type;
812};
813
814template<class K, class KHash>
815const bool is_key_matching_join<message_based_key_matching<K, KHash> >::value = true;
816
817// for recirculating tags, input is tuple<index,continue_msg>
818// output is index*my_mult cast to the right type
819template<typename TT>
820class recirc_func_body {
821 TT my_mult;
822public:
823 typedef tbb::flow::tuple<int, tbb::flow::continue_msg> input_type;
824 recirc_func_body(TT multiplier): my_mult(multiplier) {}
825 recirc_func_body(const recirc_func_body &other): my_mult(other.my_mult) { }
826 void operator=(const recirc_func_body &other) { my_mult = other.my_mult; }
827 TT operator()(const input_type &v) {
828 return TT(tbb::flow::get<0>(v)) * my_mult;
829 }
830};
831
832static int input_count; // source_nodes are serial
833
834// emit input_count continue_msg
835class recirc_source_node_body {
836public:
837 bool operator()(tbb::flow::continue_msg &v) {
838 --input_count;
839 v = tbb::flow::continue_msg();
840 return 0<=input_count;
841 }
842};
843
844// T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10,
845// so the max number generated right now is 1500 or so.) Source will generate a series of TT with value
846// (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body. We are attaching addend
847// source nodes to a join_port, and each will generate part of the numerical series the port is expecting
848// to receive. If there is only one source node, the series order will be maintained; if more than one,
849// this is not guaranteed.
850template<typename TT, size_t INDEX>
851class source_body {
852 int my_count;
853 int addend;
854public:
855 source_body(int init_val, int addto): my_count(init_val), addend(addto) { }
856 void operator=(const source_body& other) { my_count = other.my_count; addend = other.addend; }
857 bool operator()(TT &v) {
858 int lc = my_count;
859 v = make_thingie<TT, INDEX>()(my_count);
860 my_count += addend;
861 return lc < Count;
862 }
863};
864
865template<typename TT>
866class tag_func {
867 TT my_mult;
868public:
869 tag_func(TT multiplier): my_mult(multiplier) { }
870 void operator=(const tag_func& other) { my_mult = other.my_mult; }
871 // operator() will return [0 .. Count)
872 tbb::flow::tag_value operator()(TT v) {
873 tbb::flow::tag_value t = tbb::flow::tag_value(v/my_mult);
874 return t;
875 }
876};
877
878template <class JP>
879struct filter_out_message_based_key_matching {
880 typedef JP policy;
881};
882
883template <typename K, typename KHash>
884struct filter_out_message_based_key_matching<message_based_key_matching<K, KHash> > {
885 // To have message based key matching in join_node, the key_matchig policy should be specified.
886 typedef tbb::flow::key_matching<K, KHash> policy;
887};
888
889// allocator for join_node. This is specialized for tag_matching and key_matching joins because they require a variable number
890// of tag_value methods passed to the constructor
891
892template<int N, typename JType, class JP>
893class makeJoin {
894public:
895 static JType *create(tbb::flow::graph& g) {
896 JType *temp = new JType(g);
897 return temp;
898 }
899 static void destroy(JType *p) { delete p; }
900};
901
902// for general key_matching case, each type in the tuple is a class that has the my_key field and the my_value field.
903//
904template<typename JType, typename K, typename KHash>
905class makeJoin<2, JType, tbb::flow::key_matching<K, KHash> > {
906 typedef typename JType::output_type TType;
907 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
908 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
909public:
910 static JType *create(tbb::flow::graph& g) {
911 JType *temp = new JType(g,
912 my_struct_key<K, T0>(),
913 my_struct_key<K, T1>()
914 );
915 return temp;
916 }
917 static void destroy(JType *p) { delete p; }
918};
919
920template<typename JType>
921class makeJoin<2, JType, tbb::flow::tag_matching> {
922 typedef typename JType::output_type TType;
923 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
924 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
925public:
926 static JType *create(tbb::flow::graph& g) {
927 JType *temp = new JType(g,
928 tag_func<T0>(T0(2)),
929 tag_func<T1>(T1(3))
930 );
931 return temp;
932 }
933 static void destroy(JType *p) { delete p; }
934};
935
936#if MAX_TUPLE_TEST_SIZE >= 3
937template<typename JType, typename K, typename KHash>
938class makeJoin<3, JType, tbb::flow::key_matching<K, KHash> > {
939 typedef typename JType::output_type TType;
940 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
941 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
942 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
943public:
944 static JType *create(tbb::flow::graph& g) {
945 JType *temp = new JType(g,
946 my_struct_key<K, T0>(),
947 my_struct_key<K, T1>(),
948 my_struct_key<K, T2>()
949 );
950 return temp;
951 }
952 static void destroy(JType *p) { delete p; }
953};
954
955template<typename JType>
956class makeJoin<3, JType, tbb::flow::tag_matching> {
957 typedef typename JType::output_type TType;
958 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
959 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
960 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
961public:
962 static JType *create(tbb::flow::graph& g) {
963 JType *temp = new JType(g,
964 tag_func<T0>(T0(2)),
965 tag_func<T1>(T1(3)),
966 tag_func<T2>(T2(4))
967 );
968 return temp;
969 }
970 static void destroy(JType *p) { delete p; }
971};
972
973#endif
974#if MAX_TUPLE_TEST_SIZE >= 4
975
976template<typename JType, typename K, typename KHash>
977class makeJoin<4, JType, tbb::flow::key_matching<K, KHash> > {
978 typedef typename JType::output_type TType;
979 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
980 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
981 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
982 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
983public:
984 static JType *create(tbb::flow::graph& g) {
985 JType *temp = new JType(g,
986 my_struct_key<K, T0>(),
987 my_struct_key<K, T1>(),
988 my_struct_key<K, T2>(),
989 my_struct_key<K, T3>()
990 );
991 return temp;
992 }
993 static void destroy(JType *p) { delete p; }
994};
995
996template<typename JType>
997class makeJoin<4, JType, tbb::flow::tag_matching> {
998 typedef typename JType::output_type TType;
999 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1000 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1001 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1002 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1003public:
1004 static JType *create(tbb::flow::graph& g) {
1005 JType *temp = new JType(g,
1006 tag_func<T0>(T0(2)),
1007 tag_func<T1>(T1(3)),
1008 tag_func<T2>(T2(4)),
1009 tag_func<T3>(T3(5))
1010 );
1011 return temp;
1012 }
1013 static void destroy(JType *p) { delete p; }
1014};
1015
1016#endif
1017#if MAX_TUPLE_TEST_SIZE >= 5
1018template<typename JType, typename K, typename KHash>
1019class makeJoin<5, JType, tbb::flow::key_matching<K, KHash> > {
1020 typedef typename JType::output_type TType;
1021 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1022 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1023 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1024 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1025 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1026public:
1027 static JType *create(tbb::flow::graph& g) {
1028 JType *temp = new JType(g,
1029 my_struct_key<K, T0>(),
1030 my_struct_key<K, T1>(),
1031 my_struct_key<K, T2>(),
1032 my_struct_key<K, T3>(),
1033 my_struct_key<K, T4>()
1034 );
1035 return temp;
1036 }
1037 static void destroy(JType *p) { delete p; }
1038};
1039
1040template<typename JType>
1041class makeJoin<5, JType, tbb::flow::tag_matching> {
1042 typedef typename JType::output_type TType;
1043 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1044 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1045 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1046 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1047 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1048public:
1049 static JType *create(tbb::flow::graph& g) {
1050 JType *temp = new JType(g,
1051 tag_func<T0>(T0(2)),
1052 tag_func<T1>(T1(3)),
1053 tag_func<T2>(T2(4)),
1054 tag_func<T3>(T3(5)),
1055 tag_func<T4>(T4(6))
1056 );
1057 return temp;
1058 }
1059 static void destroy(JType *p) { delete p; }
1060};
1061#endif
1062#if MAX_TUPLE_TEST_SIZE >= 6
1063template<typename JType, typename K, typename KHash>
1064class makeJoin<6, JType, tbb::flow::key_matching<K, KHash> > {
1065 typedef typename JType::output_type TType;
1066 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1067 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1068 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1069 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1070 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1071 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1072public:
1073 static JType *create(tbb::flow::graph& g) {
1074 JType *temp = new JType(g,
1075 my_struct_key<K, T0>(),
1076 my_struct_key<K, T1>(),
1077 my_struct_key<K, T2>(),
1078 my_struct_key<K, T3>(),
1079 my_struct_key<K, T4>(),
1080 my_struct_key<K, T5>()
1081 );
1082 return temp;
1083 }
1084 static void destroy(JType *p) { delete p; }
1085};
1086
1087template<typename JType>
1088class makeJoin<6, JType, tbb::flow::tag_matching> {
1089 typedef typename JType::output_type TType;
1090 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1091 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1092 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1093 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1094 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1095 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1096public:
1097 static JType *create(tbb::flow::graph& g) {
1098 JType *temp = new JType(g,
1099 tag_func<T0>(T0(2)),
1100 tag_func<T1>(T1(3)),
1101 tag_func<T2>(T2(4)),
1102 tag_func<T3>(T3(5)),
1103 tag_func<T4>(T4(6)),
1104 tag_func<T5>(T5(7))
1105 );
1106 return temp;
1107 }
1108 static void destroy(JType *p) { delete p; }
1109};
1110#endif
1111
1112#if MAX_TUPLE_TEST_SIZE >= 7
1113template<typename JType, typename K, typename KHash>
1114class makeJoin<7, JType, tbb::flow::key_matching<K, KHash> > {
1115 typedef typename JType::output_type TType;
1116 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1117 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1118 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1119 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1120 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1121 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1122 typedef typename tbb::flow::tuple_element<6, TType>::type T6;
1123public:
1124 static JType *create(tbb::flow::graph& g) {
1125 JType *temp = new JType(g,
1126 my_struct_key<K, T0>(),
1127 my_struct_key<K, T1>(),
1128 my_struct_key<K, T2>(),
1129 my_struct_key<K, T3>(),
1130 my_struct_key<K, T4>(),
1131 my_struct_key<K, T5>(),
1132 my_struct_key<K, T6>()
1133 );
1134 return temp;
1135 }
1136 static void destroy(JType *p) { delete p; }
1137};
1138
1139template<typename JType>
1140class makeJoin<7, JType, tbb::flow::tag_matching> {
1141 typedef typename JType::output_type TType;
1142 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1143 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1144 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1145 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1146 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1147 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1148 typedef typename tbb::flow::tuple_element<6, TType>::type T6;
1149public:
1150 static JType *create(tbb::flow::graph& g) {
1151 JType *temp = new JType(g,
1152 tag_func<T0>(T0(2)),
1153 tag_func<T1>(T1(3)),
1154 tag_func<T2>(T2(4)),
1155 tag_func<T3>(T3(5)),
1156 tag_func<T4>(T4(6)),
1157 tag_func<T5>(T5(7)),
1158 tag_func<T6>(T6(8))
1159 );
1160 return temp;
1161 }
1162 static void destroy(JType *p) { delete p; }
1163};
1164#endif
1165
1166#if MAX_TUPLE_TEST_SIZE >= 8
1167template<typename JType, typename K, typename KHash>
1168class makeJoin<8, JType, tbb::flow::key_matching<K, KHash> > {
1169 typedef typename JType::output_type TType;
1170 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1171 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1172 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1173 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1174 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1175 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1176 typedef typename tbb::flow::tuple_element<6, TType>::type T6;
1177 typedef typename tbb::flow::tuple_element<7, TType>::type T7;
1178public:
1179 static JType *create(tbb::flow::graph& g) {
1180 JType *temp = new JType(g,
1181 my_struct_key<K, T0>(),
1182 my_struct_key<K, T1>(),
1183 my_struct_key<K, T2>(),
1184 my_struct_key<K, T3>(),
1185 my_struct_key<K, T4>(),
1186 my_struct_key<K, T5>(),
1187 my_struct_key<K, T6>(),
1188 my_struct_key<K, T7>()
1189 );
1190 return temp;
1191 }
1192 static void destroy(JType *p) { delete p; }
1193};
1194
1195template<typename JType>
1196class makeJoin<8, JType, tbb::flow::tag_matching> {
1197 typedef typename JType::output_type TType;
1198 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1199 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1200 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1201 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1202 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1203 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1204 typedef typename tbb::flow::tuple_element<6, TType>::type T6;
1205 typedef typename tbb::flow::tuple_element<7, TType>::type T7;
1206public:
1207 static JType *create(tbb::flow::graph& g) {
1208 JType *temp = new JType(g,
1209 tag_func<T0>(T0(2)),
1210 tag_func<T1>(T1(3)),
1211 tag_func<T2>(T2(4)),
1212 tag_func<T3>(T3(5)),
1213 tag_func<T4>(T4(6)),
1214 tag_func<T5>(T5(7)),
1215 tag_func<T6>(T6(8)),
1216 tag_func<T7>(T7(9))
1217 );
1218 return temp;
1219 }
1220 static void destroy(JType *p) { delete p; }
1221};
1222#endif
1223
1224#if MAX_TUPLE_TEST_SIZE >= 9
1225template<typename JType, typename K, typename KHash>
1226class makeJoin<9, JType, tbb::flow::key_matching<K, KHash> > {
1227 typedef typename JType::output_type TType;
1228 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1229 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1230 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1231 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1232 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1233 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1234 typedef typename tbb::flow::tuple_element<6, TType>::type T6;
1235 typedef typename tbb::flow::tuple_element<7, TType>::type T7;
1236 typedef typename tbb::flow::tuple_element<8, TType>::type T8;
1237public:
1238 static JType *create(tbb::flow::graph& g) {
1239 JType *temp = new JType(g,
1240 my_struct_key<K, T0>(),
1241 my_struct_key<K, T1>(),
1242 my_struct_key<K, T2>(),
1243 my_struct_key<K, T3>(),
1244 my_struct_key<K, T4>(),
1245 my_struct_key<K, T5>(),
1246 my_struct_key<K, T6>(),
1247 my_struct_key<K, T7>(),
1248 my_struct_key<K, T8>()
1249 );
1250 return temp;
1251 }
1252 static void destroy(JType *p) { delete p; }
1253};
1254
1255template<typename JType>
1256class makeJoin<9, JType, tbb::flow::tag_matching> {
1257 typedef typename JType::output_type TType;
1258 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1259 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1260 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1261 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1262 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1263 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1264 typedef typename tbb::flow::tuple_element<6, TType>::type T6;
1265 typedef typename tbb::flow::tuple_element<7, TType>::type T7;
1266 typedef typename tbb::flow::tuple_element<8, TType>::type T8;
1267public:
1268 static JType *create(tbb::flow::graph& g) {
1269 JType *temp = new JType(g,
1270 tag_func<T0>(T0(2)),
1271 tag_func<T1>(T1(3)),
1272 tag_func<T2>(T2(4)),
1273 tag_func<T3>(T3(5)),
1274 tag_func<T4>(T4(6)),
1275 tag_func<T5>(T5(7)),
1276 tag_func<T6>(T6(8)),
1277 tag_func<T7>(T7(9)),
1278 tag_func<T8>(T8(10))
1279 );
1280 return temp;
1281 }
1282 static void destroy(JType *p) { delete p; }
1283};
1284#endif
1285
1286#if MAX_TUPLE_TEST_SIZE >= 10
1287template<typename JType, typename K, typename KHash>
1288class makeJoin<10, JType, tbb::flow::key_matching<K, KHash> > {
1289 typedef typename JType::output_type TType;
1290 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1291 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1292 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1293 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1294 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1295 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1296 typedef typename tbb::flow::tuple_element<6, TType>::type T6;
1297 typedef typename tbb::flow::tuple_element<7, TType>::type T7;
1298 typedef typename tbb::flow::tuple_element<8, TType>::type T8;
1299 typedef typename tbb::flow::tuple_element<9, TType>::type T9;
1300public:
1301 static JType *create(tbb::flow::graph& g) {
1302 JType *temp = new JType(g,
1303 my_struct_key<K, T0>(),
1304 my_struct_key<K, T1>(),
1305 my_struct_key<K, T2>(),
1306 my_struct_key<K, T3>(),
1307 my_struct_key<K, T4>(),
1308 my_struct_key<K, T5>(),
1309 my_struct_key<K, T6>(),
1310 my_struct_key<K, T7>(),
1311 my_struct_key<K, T8>(),
1312 my_struct_key<K, T9>()
1313 );
1314 return temp;
1315 }
1316 static void destroy(JType *p) { delete p; }
1317};
1318
1319template<typename JType>
1320class makeJoin<10, JType, tbb::flow::tag_matching> {
1321 typedef typename JType::output_type TType;
1322 typedef typename tbb::flow::tuple_element<0, TType>::type T0;
1323 typedef typename tbb::flow::tuple_element<1, TType>::type T1;
1324 typedef typename tbb::flow::tuple_element<2, TType>::type T2;
1325 typedef typename tbb::flow::tuple_element<3, TType>::type T3;
1326 typedef typename tbb::flow::tuple_element<4, TType>::type T4;
1327 typedef typename tbb::flow::tuple_element<5, TType>::type T5;
1328 typedef typename tbb::flow::tuple_element<6, TType>::type T6;
1329 typedef typename tbb::flow::tuple_element<7, TType>::type T7;
1330 typedef typename tbb::flow::tuple_element<8, TType>::type T8;
1331 typedef typename tbb::flow::tuple_element<9, TType>::type T9;
1332public:
1333 static JType *create(tbb::flow::graph& g) {
1334 JType *temp = new JType(g,
1335 tag_func<T0>(T0(2)),
1336 tag_func<T1>(T1(3)),
1337 tag_func<T2>(T2(4)),
1338 tag_func<T3>(T3(5)),
1339 tag_func<T4>(T4(6)),
1340 tag_func<T5>(T5(7)),
1341 tag_func<T6>(T6(8)),
1342 tag_func<T7>(T7(9)),
1343 tag_func<T8>(T8(10)),
1344 tag_func<T9>(T9(11))
1345 );
1346 return temp;
1347 }
1348 static void destroy(JType *p) { delete p; }
1349};
1350#endif
1351
1352// holder for source_node pointers for eventual deletion
1353
1354static void* all_source_nodes[MaxPorts][MaxNSources];
1355
1356template<int ELEM, typename JNT>
1357class source_node_helper {
1358public:
1359 typedef JNT join_node_type;
1360 typedef tbb::flow::join_node<tbb::flow::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type;
1361 typedef typename join_node_type::output_type TT;
1362 typedef typename tbb::flow::tuple_element<ELEM-1, TT>::type IT;
1363 typedef typename tbb::flow::source_node<IT> my_source_node_type;
1364 typedef typename tbb::flow::function_node<tbb::flow::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type;
1365 static void print_remark(const char * str) {
1366 source_node_helper<ELEM-1, JNT>::print_remark(str);
1367 REMARK(", %s", name_of<IT>::name());
1368 }
1369 static void add_source_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) {
1370 for(int i = 0; i < nInputs; ++i) {
1371 my_source_node_type *new_node = new my_source_node_type(g, source_body<IT, ELEM>(i, nInputs));
1372 tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join));
1373 all_source_nodes[ELEM-1][i] = (void *)new_node;
1374 }
1375 // add the next source_node
1376 source_node_helper<ELEM-1, JNT>::add_source_nodes(my_join, g, nInputs);
1377 }
1378
1379 static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) {
1380 my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(ELEM+1)));
1381 tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join));
1382 tbb::flow::make_edge(my_input, *new_node);
1383 all_source_nodes[ELEM-1][0] = (void *)new_node;
1384 source_node_helper<ELEM-1, JNT>::add_recirc_func_nodes(my_join, my_input, g);
1385 }
1386
1387 static void only_check_value(const int i, const TT &v) {
1388 ASSERT(tbb::flow::get<ELEM-1>(v)==(IT)(i*(ELEM+1)), NULL);
1389 source_node_helper<ELEM-1, JNT>::only_check_value(i, v);
1390 }
1391
1392 static void check_value(int i, TT &v, bool is_serial) {
1393 // the fetched value will match only if there is only one source_node.
1394 ASSERT(!is_serial||tbb::flow::get<ELEM-1>(v)==(IT)(i*(ELEM+1)), NULL);
1395 // tally the fetched value.
1396 int ival = (int)tbb::flow::get<ELEM-1>(v);
1397 ASSERT(!(ival%(ELEM+1)), NULL);
1398 ival /= (ELEM+1);
1399 ASSERT(!outputCheck[ELEM-1][ival], NULL);
1400 outputCheck[ELEM-1][ival] = true;
1401 source_node_helper<ELEM-1, JNT>::check_value(i, v, is_serial);
1402 }
1403 static void remove_source_nodes(join_node_type& my_join, int nInputs) {
1404 for(int i = 0; i< nInputs; ++i) {
1405 my_source_node_type *dp = reinterpret_cast<my_source_node_type *>(all_source_nodes[ELEM-1][i]);
1406 tbb::flow::remove_edge(*dp, tbb::flow::input_port<ELEM-1>(my_join));
1407 delete dp;
1408 }
1409 source_node_helper<ELEM-1, JNT>::remove_source_nodes(my_join, nInputs);
1410 }
1411
1412 static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) {
1413 my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_source_nodes[ELEM-1][0]);
1414 tbb::flow::remove_edge(*fn, tbb::flow::input_port<ELEM-1>(my_join));
1415 tbb::flow::remove_edge(my_input, *fn);
1416 delete fn;
1417 source_node_helper<ELEM-1, JNT>::remove_recirc_func_nodes(my_join, my_input);
1418 }
1419};
1420
1421template<typename JNT>
1422class source_node_helper<1, JNT> {
1423 typedef JNT join_node_type;
1424 typedef tbb::flow::join_node<tbb::flow::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type;
1425 typedef typename join_node_type::output_type TT;
1426 typedef typename tbb::flow::tuple_element<0, TT>::type IT;
1427 typedef typename tbb::flow::source_node<IT> my_source_node_type;
1428 typedef typename tbb::flow::function_node<tbb::flow::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type;
1429public:
1430 static void print_remark(const char * str) {
1431 REMARK("%s< %s", str, name_of<IT>::name());
1432 }
1433 static void add_source_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) {
1434 for(int i = 0; i < nInputs; ++i) {
1435 my_source_node_type *new_node = new my_source_node_type(g, source_body<IT, 1>(i, nInputs));
1436 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1437 all_source_nodes[0][i] = (void *)new_node;
1438 }
1439 }
1440
1441 static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) {
1442 my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(2)));
1443 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1444 tbb::flow::make_edge(my_input, *new_node);
1445 all_source_nodes[0][0] = (void *)new_node;
1446 }
1447
1448 static void only_check_value(const int i, const TT &v) {
1449 ASSERT(tbb::flow::get<0>(v)==(IT)(i*2), NULL);
1450 }
1451
1452 static void check_value(int i, TT &v, bool is_serial) {
1453 ASSERT(!is_serial||tbb::flow::get<0>(v)==(IT)(i*(2)), NULL);
1454 int ival = (int)tbb::flow::get<0>(v);
1455 ASSERT(!(ival%2), NULL);
1456 ival /= 2;
1457 ASSERT(!outputCheck[0][ival], NULL);
1458 outputCheck[0][ival] = true;
1459 }
1460 static void remove_source_nodes(join_node_type& my_join, int nInputs) {
1461 for(int i = 0; i < nInputs; ++i) {
1462 my_source_node_type *dp = reinterpret_cast<my_source_node_type *>(all_source_nodes[0][i]);
1463 tbb::flow::remove_edge(*dp, tbb::flow::input_port<0>(my_join));
1464 delete dp;
1465 }
1466 }
1467
1468 static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) {
1469 my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_source_nodes[0][0]);
1470 tbb::flow::remove_edge(*fn, tbb::flow::input_port<0>(my_join));
1471 tbb::flow::remove_edge(my_input, *fn);
1472 delete fn;
1473 }
1474};
1475
1476#if _MSC_VER && !defined(__INTEL_COMPILER)
1477// Suppress "conditional expression is constant" warning.
1478#pragma warning( push )
1479#pragma warning( disable: 4127 )
1480#endif
1481
1482template<typename JType, class JP>
1483class parallel_test {
1484public:
1485 typedef typename JType::output_type TType;
1486 typedef typename is_key_matching_join<JP>::key_type key_type;
1487 static void test() {
1488 const int TUPLE_SIZE = tbb::flow::tuple_size<TType>::value;
1489 const bool is_key_matching = is_key_matching_join<JP>::value;
1490
1491 TType v;
1492 source_node_helper<TUPLE_SIZE, JType>::print_remark("Parallel test of join_node");
1493 REMARK(" > ");
1494 if(is_key_matching) {
1495 REMARK("with K == %s", name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name());
1496 if(is_ref<typename is_key_matching_join<JP>::key_type>::value) {
1497 REMARK("&");
1498 }
1499 }
1500 REMARK("\n");
1501 for(int i = 0; i < MaxPorts; ++i) {
1502 for(int j = 0; j < MaxNSources; ++j) {
1503 all_source_nodes[i][j] = NULL;
1504 }
1505 }
1506 for(int nInputs = 1; nInputs<=MaxNSources; ++nInputs) {
1507 tbb::flow::graph g;
1508 bool not_out_of_order = (nInputs==1)&&(!is_key_matching);
1509 JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g);
1510 tbb::flow::queue_node<TType> outq1(g);
1511 tbb::flow::queue_node<TType> outq2(g);
1512
1513 tbb::flow::make_edge(*my_join, outq1);
1514 tbb::flow::make_edge(*my_join, outq2);
1515
1516 source_node_helper<TUPLE_SIZE, JType>::add_source_nodes((*my_join), g, nInputs);
1517
1518 g.wait_for_all();
1519
1520 reset_outputCheck(TUPLE_SIZE, Count);
1521 for(int i = 0; i < Count; ++i) {
1522 ASSERT(outq1.try_get(v), NULL);
1523 source_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order);
1524 }
1525
1526 check_outputCheck(TUPLE_SIZE, Count);
1527 reset_outputCheck(TUPLE_SIZE, Count);
1528
1529 for(int i = 0; i < Count; i++) {
1530 ASSERT(outq2.try_get(v), NULL);;
1531 source_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order);
1532 }
1533 check_outputCheck(TUPLE_SIZE, Count);
1534
1535 ASSERT(!outq1.try_get(v), NULL);
1536 ASSERT(!outq2.try_get(v), NULL);
1537
1538 source_node_helper<TUPLE_SIZE, JType>::remove_source_nodes((*my_join), nInputs);
1539 tbb::flow::remove_edge(*my_join, outq1);
1540 tbb::flow::remove_edge(*my_join, outq2);
1541 makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join);
1542 }
1543 }
1544};
1545
1546
1547template<int ELEM, typename JType>
1548class serial_queue_helper {
1549public:
1550 typedef typename JType::output_type TT;
1551 typedef typename tbb::flow::tuple_element<ELEM-1, TT>::type IT;
1552 typedef typename tbb::flow::queue_node<IT> my_queue_node_type;
1553 static void print_remark() {
1554 serial_queue_helper<ELEM-1, JType>::print_remark();
1555 REMARK(", %s", name_of<IT>::name());
1556 }
1557 static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) {
1558 serial_queue_helper<ELEM-1, JType>::add_queue_nodes(g, my_join);
1559 my_queue_node_type *new_node = new my_queue_node_type(g);
1560 tbb::flow::make_edge(*new_node, tbb::flow::get<ELEM-1>(my_join.input_ports()));
1561 all_source_nodes[ELEM-1][0] = (void *)new_node;
1562 }
1563
1564 static void fill_one_queue(int maxVal) {
1565 // fill queue to "left" of me
1566 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_source_nodes[ELEM-1][0]);
1567 serial_queue_helper<ELEM-1, JType>::fill_one_queue(maxVal);
1568 for(int i = 0; i < maxVal; ++i) {
1569 ASSERT(qptr->try_put(make_thingie<IT, ELEM>()(i)), NULL);
1570 }
1571 }
1572
1573 static void put_one_queue_val(int myVal) {
1574 // put this val to my "left".
1575 serial_queue_helper<ELEM-1, JType>::put_one_queue_val(myVal);
1576 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_source_nodes[ELEM-1][0]);
1577 ASSERT(qptr->try_put(make_thingie<IT, ELEM>()(myVal)), NULL);
1578 }
1579
1580 static void check_queue_value(int i, TT &v) {
1581 serial_queue_helper<ELEM-1, JType>::check_queue_value(i, v);
1582 ASSERT(cast_from<IT>::my_int_val(tbb::flow::get<ELEM-1>(v))==i * (ELEM+1), NULL);
1583 }
1584
1585 static void remove_queue_nodes(JType &my_join) {
1586 my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_source_nodes[ELEM-1][0]);
1587 tbb::flow::remove_edge(*vptr, tbb::flow::get<ELEM-1>(my_join.input_ports()));
1588 serial_queue_helper<ELEM-1, JType>::remove_queue_nodes(my_join);
1589 delete vptr;
1590 }
1591};
1592
1593template<typename JType>
1594class serial_queue_helper<1, JType> {
1595public:
1596 typedef typename JType::output_type TT;
1597 typedef typename tbb::flow::tuple_element<0, TT>::type IT;
1598 typedef typename tbb::flow::queue_node<IT> my_queue_node_type;
1599 static void print_remark() {
1600 REMARK("Serial test of join_node< %s", name_of<IT>::name());
1601 }
1602
1603 static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) {
1604 my_queue_node_type *new_node = new my_queue_node_type(g);
1605 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1606 all_source_nodes[0][0] = (void *)new_node;
1607 }
1608
1609 static void fill_one_queue(int maxVal) {
1610 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_source_nodes[0][0]);
1611 for(int i = 0; i < maxVal; ++i) {
1612 ASSERT(qptr->try_put(make_thingie<IT, 1>()(i)), NULL);
1613 }
1614 }
1615
1616 static void put_one_queue_val(int myVal) {
1617 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_source_nodes[0][0]);
1618 IT my_val = make_thingie<IT, 1>()(myVal);
1619 ASSERT(qptr->try_put(my_val), NULL);
1620 }
1621
1622 static void check_queue_value(int i, TT &v) {
1623 ASSERT(cast_from<IT>::my_int_val(tbb::flow::get<0>(v))==i*2, NULL);
1624 }
1625
1626 static void remove_queue_nodes(JType &my_join) {
1627 my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_source_nodes[0][0]);
1628 tbb::flow::remove_edge(*vptr, tbb::flow::get<0>(my_join.input_ports()));
1629 delete vptr;
1630 }
1631};
1632
1633//
1634// Single reservable predecessor at each port, single accepting successor
1635// * put to buffer before port0, then put to buffer before port1, ...
1636// * fill buffer before port0 then fill buffer before port1, ...
1637
1638template<typename JType, class JP>
1639void test_one_serial(JType &my_join, tbb::flow::graph &g) {
1640 typedef typename JType::output_type TType;
1641 static const int TUPLE_SIZE = tbb::flow::tuple_size<TType>::value;
1642 bool is_key_matching = is_key_matching_join<JP>::value;
1643 std::vector<bool> flags;
1644 serial_queue_helper<TUPLE_SIZE, JType>::add_queue_nodes(g, my_join);
1645 typedef TType q3_input_type;
1646 tbb::flow::queue_node< q3_input_type > q3(g);
1647
1648 tbb::flow::make_edge(my_join, q3);
1649
1650 // fill each queue with its value one-at-a-time
1651 flags.clear();
1652 for(int i = 0; i < Count; ++i) {
1653 serial_queue_helper<TUPLE_SIZE, JType>::put_one_queue_val(i);
1654 flags.push_back(false);
1655 }
1656
1657 g.wait_for_all();
1658 for(int i = 0; i < Count; ++i) {
1659 q3_input_type v;
1660 g.wait_for_all();
1661 ASSERT(q3.try_get(v), "Error in try_get()");
1662 if(is_key_matching) {
1663 // because we look up tags in the hash table, the output may be out of order.
1664 int j = int(tbb::flow::get<0>(v))/2; // figure what the index should be
1665 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v);
1666 flags[j] = true;
1667 }
1668 else {
1669 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v);
1670 }
1671 }
1672
1673 if(is_key_matching) {
1674 for(int i = 0; i < Count; ++i) {
1675 ASSERT(flags[i], NULL);
1676 flags[i] = false;
1677 }
1678 }
1679
1680 // fill each queue completely before filling the next.
1681 serial_queue_helper<TUPLE_SIZE, JType>::fill_one_queue(Count);
1682
1683 g.wait_for_all();
1684 for(int i = 0; i < Count; ++i) {
1685 q3_input_type v;
1686 g.wait_for_all();
1687 ASSERT(q3.try_get(v), "Error in try_get()");
1688 if(is_key_matching) {
1689 int j = int(tbb::flow::get<0>(v))/2;
1690 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v);
1691 flags[i] = true;
1692 }
1693 else {
1694 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v);
1695 }
1696 }
1697
1698 if(is_key_matching) {
1699 for(int i = 0; i < Count; ++i) {
1700 ASSERT(flags[i], NULL);
1701 }
1702 }
1703
1704 serial_queue_helper<TUPLE_SIZE, JType>::remove_queue_nodes(my_join);
1705
1706}
1707
1708template<typename JType, class JP>
1709class serial_test {
1710 typedef typename JType::output_type TType;
1711public:
1712 static void test() {
1713 tbb::flow::graph g;
1714 std::vector<bool> flags;
1715 bool is_key_matching = is_key_matching_join<JP>::value;
1716 flags.reserve(Count);
1717
1718 const int TUPLE_SIZE = tbb::flow::tuple_size<TType>::value;
1719 static const int ELEMS = 3;
1720
1721 JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g);
1722 test_input_ports_return_ref(*my_join);
1723 serial_queue_helper<TUPLE_SIZE, JType>::print_remark(); REMARK(" >");
1724 if(is_key_matching) {
1725 REMARK("with K == %s", name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name());
1726 if(is_ref<typename is_key_matching_join<JP>::key_type>::value) {
1727 REMARK("&");
1728 }
1729 }
1730 REMARK("\n");
1731
1732 test_one_serial<JType, JP>(*my_join, g);
1733 // build the vector with copy construction from the used join node.
1734 std::vector<JType>join_vector(ELEMS, *my_join);
1735 // destroy the tired old join_node in case we're accidentally reusing pieces of it.
1736 makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join);
1737
1738 for(int e = 0; e < ELEMS; ++e) { // exercise each of the vector elements
1739 test_one_serial<JType, JP>(join_vector[e], g);
1740 }
1741 }
1742
1743}; // serial_test
1744
1745#if _MSC_VER && !defined(__INTEL_COMPILER)
1746#pragma warning( pop )
1747#endif
1748
1749template<
1750 template<typename, class > class TestType, // serial_test or parallel_test
1751 typename OutputTupleType, // type of the output of the join
1752 class J> // graph_buffer_policy (reserving, queueing, tag_matching or key_matching)
1753 class generate_test {
1754 public:
1755 typedef tbb::flow::join_node<OutputTupleType, typename filter_out_message_based_key_matching<J>::policy> join_node_type;
1756 static void do_test() {
1757 TestType<join_node_type, J>::test();
1758 }
1759};
1760
// Input-port policy test, selected by join policy. Only declared here; the
// explicit specializations (reserving, queueing, tag_matching) provide the bodies.
template<class Policy>
void test_input_port_policies();
1763
1764// join_node (reserving) does not consume inputs until an item is available at
1765// every input. It tries to reserve each input, and if any fails it releases the
1766// reservation. When it builds a tuple it broadcasts to all its successors and
1767// consumes all the inputs.
1768//
1769// So our test will put an item at one input port, then attach another node to the
1770// same node (a queue node in this case). The second successor should receive the
1771// item in the queue, emptying it.
1772//
1773// We then place an item in the second input queue, and check the output queues; they
1774// should still be empty. Then we place an item in the first queue; the output queues
1775// should then receive a tuple.
1776//
1777// we then attach another function node to the second input. It should not receive
1778// an item, verifying that the item in the queue is consumed.
1779template<>
1780void test_input_port_policies<tbb::flow::reserving>() {
1781 tbb::flow::graph g;
1782 typedef tbb::flow::join_node<tbb::flow::tuple<int, int>, tbb::flow::reserving > JType; // two-phase is the default policy
1783 // create join_node<type0,type1> jn
1784 JType jn(g);
1785 // create output_queue oq0, oq1
1786 typedef JType::output_type OQType;
1787 tbb::flow::queue_node<OQType> oq0(g);
1788 tbb::flow::queue_node<OQType> oq1(g);
1789 // create iq0, iq1
1790 typedef tbb::flow::queue_node<int> IQType;
1791 IQType iq0(g);
1792 IQType iq1(g);
1793 // create qnp, qnq
1794 IQType qnp(g);
1795 IQType qnq(g);
1796 REMARK("Testing policies of join_node<reserving> input ports\n");
1797 // attach jn to oq0, oq1
1798 tbb::flow::make_edge(jn, oq0);
1799 tbb::flow::make_edge(jn, oq1);
1800#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1801 ASSERT(jn.successor_count()==2, NULL);
1802 JType::successor_list_type my_succs;
1803 jn.copy_successors(my_succs);
1804 ASSERT(my_succs.size()==2, NULL);
1805#endif
1806 // attach iq0, iq1 to jn
1807 tbb::flow::make_edge(iq0, tbb::flow::get<0>(jn.input_ports()));
1808 tbb::flow::make_edge(iq1, tbb::flow::get<1>(jn.input_ports()));
1809#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1810 ASSERT(tbb::flow::get<0>(jn.input_ports()).predecessor_count()==1, NULL);
1811 tbb::flow::tuple_element<0, JType::input_type>::type::predecessor_list_type my_0preds;
1812 tbb::flow::input_port<0>(jn).copy_predecessors(my_0preds);
1813 ASSERT(my_0preds.size()==1, NULL);
1814#endif
1815 for(int loop = 0; loop < 3; ++loop) {
1816 // place one item in iq0
1817 ASSERT(iq0.try_put(1), "Error putting to iq1");
1818 // attach iq0 to qnp
1819 tbb::flow::make_edge(iq0, qnp);
1820 // qnp should have an item in it.
1821 g.wait_for_all();
1822 {
1823 int i;
1824 ASSERT(qnp.try_get(i)&&i==1, "Error in item fetched by qnp");
1825 }
1826 // place item in iq1
1827 ASSERT(iq1.try_put(2), "Error putting to iq1");
1828 // oq0, oq1 should be empty
1829 g.wait_for_all();
1830 {
1831 OQType t1;
1832 ASSERT(!oq0.try_get(t1)&&!oq1.try_get(t1), "oq0 and oq1 not empty");
1833 }
1834 // detach qnp from iq0
1835 tbb::flow::remove_edge(iq0, qnp); // if we don't remove qnp it will gobble any values we put in iq0
1836 // place item in iq0
1837 ASSERT(iq0.try_put(3), "Error on second put to iq0");
1838 // oq0, oq1 should have items in them
1839 g.wait_for_all();
1840 {
1841 OQType t0;
1842 OQType t1;
1843 ASSERT(oq0.try_get(t0)&&tbb::flow::get<0>(t0)==3&&tbb::flow::get<1>(t0)==2, "Error in oq0 output");
1844 ASSERT(oq1.try_get(t1)&&tbb::flow::get<0>(t1)==3&&tbb::flow::get<1>(t1)==2, "Error in oq1 output");
1845 }
1846 // attach qnp to iq0, qnq to iq1
1847 // qnp and qnq should be empty
1848 tbb::flow::make_edge(iq0, qnp);
1849 tbb::flow::make_edge(iq1, qnq);
1850 g.wait_for_all();
1851 {
1852 int i;
1853 ASSERT(!qnp.try_get(i), "iq0 still had value in it");
1854 ASSERT(!qnq.try_get(i), "iq1 still had value in it");
1855 }
1856 tbb::flow::remove_edge(iq0, qnp);
1857 tbb::flow::remove_edge(iq1, qnq);
1858 } // for ( int loop ...
1859}
1860
1861// join_node (queueing) consumes inputs as soon as they are available at
1862// any input. When it builds a tuple it broadcasts to all its successors and
1863// discards the broadcast values.
1864//
1865// So our test will put an item at one input port, then attach another node to the
1866// same node (a queue node in this case). The second successor should not receive
1867// an item (because the join consumed it).
1868//
1869// We then place an item in the second input queue, and check the output queues; they
1870// should each have a tuple.
1871//
1872// we then attach another function node to the second input. It should not receive
1873// an item, verifying that the item in the queue is consumed.
1874template<>
1875void test_input_port_policies<tbb::flow::queueing>() {
1876 tbb::flow::graph g;
1877 typedef tbb::flow::join_node<tbb::flow::tuple<int, int>, tbb::flow::queueing > JType;
1878 // create join_node<type0,type1> jn
1879 JType jn(g);
1880 // create output_queue oq0, oq1
1881 typedef JType::output_type OQType;
1882 tbb::flow::queue_node<OQType> oq0(g);
1883 tbb::flow::queue_node<OQType> oq1(g);
1884 // create iq0, iq1
1885 typedef tbb::flow::queue_node<int> IQType;
1886 IQType iq0(g);
1887 IQType iq1(g);
1888 // create qnp, qnq
1889 IQType qnp(g);
1890 IQType qnq(g);
1891 REMARK("Testing policies of join_node<queueing> input ports\n");
1892 // attach jn to oq0, oq1
1893 tbb::flow::make_edge(jn, oq0);
1894 tbb::flow::make_edge(jn, oq1);
1895#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1896 ASSERT(jn.successor_count()==2, NULL);
1897 JType::successor_list_type my_succs;
1898 jn.copy_successors(my_succs);
1899 ASSERT(my_succs.size()==2, NULL);
1900#endif
1901 // attach iq0, iq1 to jn
1902 tbb::flow::make_edge(iq0, tbb::flow::get<0>(jn.input_ports()));
1903 tbb::flow::make_edge(iq1, tbb::flow::get<1>(jn.input_ports()));
1904#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1905 ASSERT(tbb::flow::get<0>(jn.input_ports()).predecessor_count()==1, NULL);
1906 tbb::flow::tuple_element<0, JType::input_type>::type::predecessor_list_type my_0preds;
1907 tbb::flow::input_port<0>(jn).copy_predecessors(my_0preds);
1908 ASSERT(my_0preds.size()==1, NULL);
1909#endif
1910 for(int loop = 0; loop < 3; ++loop) {
1911 // place one item in iq0
1912 ASSERT(iq0.try_put(1), "Error putting to iq1");
1913 // attach iq0 to qnp
1914 tbb::flow::make_edge(iq0, qnp);
1915 // qnp should have an item in it.
1916 g.wait_for_all();
1917 {
1918 int i;
1919 ASSERT(!qnp.try_get(i), "Item was received by qnp");
1920 }
1921 // place item in iq1
1922 ASSERT(iq1.try_put(2), "Error putting to iq1");
1923 // oq0, oq1 should have items
1924 g.wait_for_all();
1925 {
1926 OQType t0;
1927 OQType t1;
1928 ASSERT(oq0.try_get(t0)&&tbb::flow::get<0>(t0)==1&&tbb::flow::get<1>(t0)==2, "Error in oq0 output");
1929 ASSERT(oq1.try_get(t1)&&tbb::flow::get<0>(t1)==1&&tbb::flow::get<1>(t1)==2, "Error in oq1 output");
1930 }
1931 // attach qnq to iq1
1932 // qnp and qnq should be empty
1933 tbb::flow::make_edge(iq1, qnq);
1934 g.wait_for_all();
1935 {
1936 int i;
1937 ASSERT(!qnp.try_get(i), "iq0 still had value in it");
1938 ASSERT(!qnq.try_get(i), "iq1 still had value in it");
1939 }
1940 tbb::flow::remove_edge(iq0, qnp);
1941 tbb::flow::remove_edge(iq1, qnq);
1942 } // for ( int loop ...
1943}
1944
1945template<typename T>
1946struct myTagValue {
1947 tbb::flow::tag_value operator()(T i) { return tbb::flow::tag_value(i); }
1948};
1949
1950template<>
1951struct myTagValue<check_type<int> > {
1952 tbb::flow::tag_value operator()(check_type<int> i) { return tbb::flow::tag_value((int)i); }
1953};
1954
1955// join_node (tag_matching) consumes inputs as soon as they are available at
1956// any input. When it builds a tuple it broadcasts to all its successors and
1957// discards the broadcast values.
1958//
1959// It chooses the tuple it broadcasts by matching the tag values returned by the
1960// methods given the constructor of the join, in this case the method just casts
1961// the value in each port to tag_value.
1962//
1963// So our test will put an item at one input port, then attach another node to the
1964// same node (a queue node in this case). The second successor should not receive
1965// an item (because the join consumed it).
1966//
1967// We then place an item in the second input queue, and check the output queues; they
1968// should each have a tuple.
1969//
1970// we then attach another queue node to the second input. It should not receive
1971// an item, verifying that the item in the queue is consumed.
1972//
1973// We will then exercise the join with a bunch of values, and the output order should
1974// be determined by the order we insert items into the second queue. (Each tuple set
1975// corresponding to a tag will be complete when the second item is inserted.)
1976template<>
1977void test_input_port_policies<tbb::flow::tag_matching>() {
1978 tbb::flow::graph g;
1979 typedef tbb::flow::join_node<tbb::flow::tuple<int, check_type<int> >, tbb::flow::tag_matching > JoinNodeType;
1980 typedef JoinNodeType::output_type CheckTupleType;
1981 JoinNodeType testJoinNode(g, myTagValue<int>(), myTagValue<check_type<int> >());
1982 tbb::flow::queue_node<CheckTupleType> checkTupleQueue0(g);
1983 tbb::flow::queue_node<CheckTupleType> checkTupleQueue1(g);
1984 {
1985 Check<check_type<int> > my_check;
1986
1987
1988 typedef tbb::flow::queue_node<int> IntQueueType;
1989 typedef tbb::flow::queue_node<check_type<int> > CheckQueueType;
1990 IntQueueType intInputQueue(g);
1991 CheckQueueType checkInputQueue(g);
1992 IntQueueType intEmptyTestQueue(g);
1993 CheckQueueType checkEmptyTestQueue(g);
1994 REMARK("Testing policies of join_node<tag_matching> input ports\n");
1995 // attach testJoinNode to checkTupleQueue0, checkTupleQueue1
1996 tbb::flow::make_edge(testJoinNode, checkTupleQueue0);
1997 tbb::flow::make_edge(testJoinNode, checkTupleQueue1);
1998#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1999 ASSERT(testJoinNode.successor_count()==2, NULL);
2000 JoinNodeType::successor_list_type my_succs;
2001 testJoinNode.copy_successors(my_succs);
2002 ASSERT(my_succs.size()==2, NULL);
2003#endif
2004 // attach intInputQueue, checkInputQueue to testJoinNode
2005 tbb::flow::make_edge(intInputQueue, tbb::flow::input_port<0>(testJoinNode));
2006 tbb::flow::make_edge(checkInputQueue, tbb::flow::input_port<1>(testJoinNode));
2007#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2008 ASSERT(tbb::flow::input_port<0>(testJoinNode).predecessor_count()==1, NULL);
2009 tbb::flow::tuple_element<0, JoinNodeType::input_type>::type::predecessor_list_type my_0preds;
2010 tbb::flow::input_port<0>(testJoinNode).copy_predecessors(my_0preds);
2011 ASSERT(my_0preds.size()==1, NULL);
2012#endif
2013
2014 // we'll put four discrete values in the inputs to the join_node. Each
2015 // set of inputs should result in one output.
2016 for(int loop = 0; loop < 4; ++loop) {
2017 // place one item in intInputQueue
2018 ASSERT(intInputQueue.try_put(loop), "Error putting to intInputQueue");
2019 // attach intInputQueue to intEmptyTestQueue
2020 tbb::flow::make_edge(intInputQueue, intEmptyTestQueue);
2021 // intEmptyTestQueue should not have an item in it. (the join consumed it.)
2022 g.wait_for_all();
2023 {
2024 int intVal0;
2025 ASSERT(!intEmptyTestQueue.try_get(intVal0), "Item was received by intEmptyTestQueue");
2026 }
2027 // place item in checkInputQueue
2028 check_type<int> checkVal0(loop);
2029 ASSERT(checkInputQueue.try_put(checkVal0), "Error putting to checkInputQueue");
2030 // checkTupleQueue0, checkTupleQueue1 should have items
2031 g.wait_for_all();
2032 {
2033 CheckTupleType t0;
2034 CheckTupleType t1;
2035 ASSERT(checkTupleQueue0.try_get(t0)&&tbb::flow::get<0>(t0)==loop&&(int)tbb::flow::get<1>(t0)==loop, "Error in checkTupleQueue0 output");
2036 ASSERT(checkTupleQueue1.try_get(t1)&&tbb::flow::get<0>(t1)==loop&&(int)tbb::flow::get<1>(t1)==loop, "Error in checkTupleQueue1 output");
2037 ASSERT(!checkTupleQueue0.try_get(t0), "extra object in output queue checkTupleQueue0");
2038 ASSERT(!checkTupleQueue1.try_get(t0), "extra object in output queue checkTupleQueue1");
2039 }
2040 // attach checkEmptyTestQueue to checkInputQueue
2041 // intEmptyTestQueue and checkEmptyTestQueue should be empty
2042 tbb::flow::make_edge(checkInputQueue, checkEmptyTestQueue);
2043 g.wait_for_all();
2044 {
2045 int intVal1;
2046 check_type<int> checkVal1;
2047 ASSERT(!intEmptyTestQueue.try_get(intVal1), "intInputQueue still had value in it");
2048 ASSERT(!checkEmptyTestQueue.try_get(checkVal1), "checkInputQueue still had value in it");
2049 }
2050 tbb::flow::remove_edge(intInputQueue, intEmptyTestQueue);
2051 tbb::flow::remove_edge(checkInputQueue, checkEmptyTestQueue);
2052 } // for ( int loop ...
2053
2054 // Now we'll put [4 .. nValues - 1] in intInputQueue, and then put [4 .. nValues - 1] in checkInputQueue in
2055 // a different order. We should see tuples in the output queues in the order we inserted
2056 // the integers into checkInputQueue.
2057 const int nValues = 100;
2058 const int nIncr = 31; // relatively prime to nValues
2059
2060 for(int loop = 4; loop < 4+nValues; ++loop) {
2061 // place one item in intInputQueue
2062 ASSERT(intInputQueue.try_put(loop), "Error putting to intInputQueue");
2063 g.wait_for_all();
2064 {
2065 CheckTupleType t3;
2066 ASSERT(!checkTupleQueue0.try_get(t3), "Object in output queue");
2067 ASSERT(!checkTupleQueue1.try_get(t3), "Object in output queue");
2068 }
2069 } // for ( int loop ...
2070
2071 for(int loop = 1; loop<=nValues; ++loop) {
2072 int lp1 = 4+(loop * nIncr)%nValues;
2073 // place item in checkInputQueue
2074 ASSERT(checkInputQueue.try_put(lp1), "Error putting to checkInputQueue");
2075 // checkTupleQueue0, checkTupleQueue1 should have items
2076 g.wait_for_all();
2077 {
2078 CheckTupleType t0;
2079 CheckTupleType t1;
2080 ASSERT(checkTupleQueue0.try_get(t0)&&tbb::flow::get<0>(t0)==lp1 && tbb::flow::get<1>(t0)==lp1, "Error in checkTupleQueue0 output");
2081 ASSERT(checkTupleQueue1.try_get(t1)&&tbb::flow::get<0>(t1)==lp1 && tbb::flow::get<1>(t1)==lp1, "Error in checkTupleQueue1 output");
2082 ASSERT(!checkTupleQueue0.try_get(t0), "extra object in output queue checkTupleQueue0");
2083 ASSERT(!checkTupleQueue1.try_get(t0), "extra object in output queue checkTupleQueue1");
2084 }
2085 } // for ( int loop ...
2086 } // Check
2087}
2088
// Trace-message provider keyed by join policy. The primary template is empty;
// the messages come from the per-policy specializations.
template<typename Policy>
struct policy_name {
};
2090
2091template<> struct policy_name<tbb::flow::queueing> {
2092const char* msg_beg() { return "queueing\n";}
2093const char* msg_end() { return "test queueing extract\n";}
2094};
2095
2096template<> struct policy_name<tbb::flow::reserving> {
2097const char* msg_beg() { return "reserving\n";}
2098const char* msg_end() { return "test reserving extract\n";}
2099};
2100
2101template<> struct policy_name<tbb::flow::tag_matching> {
2102const char* msg_beg() { return "tag_matching\n";}
2103const char* msg_end() { return "test tag_matching extract\n";}
2104};
2105
2106template<typename Policy>
2107void test_main() {
2108 test_input_port_policies<Policy>();
2109 for(int p = 0; p < 2; ++p) {
2110 REMARK(policy_name<Policy>().msg_beg());
2111 generate_test<serial_test, tbb::flow::tuple<threebyte, double>, Policy>::do_test();
2112#if MAX_TUPLE_TEST_SIZE >= 4
2113 {
2114 Check<check_type<int> > my_check;
2115 generate_test<serial_test, tbb::flow::tuple<float, double, check_type<int>, long>, Policy>::do_test();
2116 }
2117#endif
2118#if MAX_TUPLE_TEST_SIZE >= 6
2119 generate_test<serial_test, tbb::flow::tuple<double, double, int, long, int, short>, Policy>::do_test();
2120#endif
2121#if MAX_TUPLE_TEST_SIZE >= 8
2122 generate_test<serial_test, tbb::flow::tuple<float, double, double, double, float, int, float, long>, Policy>::do_test();
2123#endif
2124#if MAX_TUPLE_TEST_SIZE >= 10
2125 generate_test<serial_test, tbb::flow::tuple<float, double, int, double, double, float, long, int, float, long>, Policy>::do_test();
2126#endif
2127 {
2128 Check<check_type<int> > my_check1;
2129 generate_test<parallel_test, tbb::flow::tuple<float, check_type<int> >, Policy>::do_test();
2130 }
2131#if MAX_TUPLE_TEST_SIZE >= 3
2132 generate_test<parallel_test, tbb::flow::tuple<float, int, long>, Policy>::do_test();
2133#endif
2134#if MAX_TUPLE_TEST_SIZE >= 5
2135 generate_test<parallel_test, tbb::flow::tuple<double, double, int, int, short>, Policy>::do_test();
2136#endif
2137#if MAX_TUPLE_TEST_SIZE >= 7
2138 generate_test<parallel_test, tbb::flow::tuple<float, int, double, float, long, float, long>, Policy>::do_test();
2139#endif
2140#if MAX_TUPLE_TEST_SIZE >= 9
2141 generate_test<parallel_test, tbb::flow::tuple<float, double, int, double, double, long, int, float, long>, Policy>::do_test();
2142#endif
2143 }
2144
2145#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2146 REMARK(policy_name<Policy>().msg_end());
2147 test_join_extract<int, tbb::flow::join_node< tbb::flow::tuple<int, int>, Policy> >().run_tests();
2148#endif
2149}
2150
2151#endif /* tbb_test_join_node_H */
2152