1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#if __TBB_CPF_BUILD
18#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
19#endif
20
21#include "test_join_node.h"
22
23static tbb::atomic<int> output_count;
24
25// get the tag from the output tuple and emit it.
26// the first tuple component is tag * 2 cast to the type
27template<typename OutputTupleType>
28class recirc_output_func_body {
29public:
30 // we only need this to use source_node_helper
31 typedef typename tbb::flow::join_node<OutputTupleType, tbb::flow::tag_matching> join_node_type;
32 static const int N = tbb::flow::tuple_size<OutputTupleType>::value;
33 int operator()(const OutputTupleType &v) {
34 int out = int(tbb::flow::get<0>(v))/2;
35 source_node_helper<N, join_node_type>::only_check_value(out, v);
36 ++output_count;
37 return out;
38 }
39};
40
41template<typename JType>
42class tag_recirculation_test {
43public:
44 typedef typename JType::output_type TType;
45 typedef typename tbb::flow::tuple<int, tbb::flow::continue_msg> input_tuple_type;
46 typedef tbb::flow::join_node<input_tuple_type, tbb::flow::reserving> input_join_type;
47 static const int N = tbb::flow::tuple_size<TType>::value;
48 static void test() {
49 source_node_helper<N, JType>::print_remark("Recirculation test of tag-matching join");
50 REMARK(" >\n");
51 for(int maxTag = 1; maxTag <10; maxTag *= 3) {
52 for(int i = 0; i < N; ++i) all_source_nodes[i][0] = NULL;
53
54 tbb::flow::graph g;
55 // this is the tag-matching join we're testing
56 JType * my_join = makeJoin<N, JType, tbb::flow::tag_matching>::create(g);
57 // source_node for continue messages
58 tbb::flow::source_node<tbb::flow::continue_msg> snode(g, recirc_source_node_body(), false);
59 // reserving join that matches recirculating tags with continue messages.
60 input_join_type * my_input_join = makeJoin<2, input_join_type, tbb::flow::reserving>::create(g);
61 // tbb::flow::make_edge(snode, tbb::flow::input_port<1>(*my_input_join));
62 tbb::flow::make_edge(snode, tbb::flow::get<1>(my_input_join->input_ports()));
63 // queue to hold the tags
64 tbb::flow::queue_node<int> tag_queue(g);
65 tbb::flow::make_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join));
66 // add all the function_nodes that are inputs to the tag-matching join
67 source_node_helper<N, JType>::add_recirc_func_nodes(*my_join, *my_input_join, g);
68 // add the function_node that accepts the output of the join and emits the int tag it was based on
69 tbb::flow::function_node<TType, int> recreate_tag(g, tbb::flow::unlimited, recirc_output_func_body<TType>());
70 tbb::flow::make_edge(*my_join, recreate_tag);
71 // now the recirculating part (output back to the queue)
72 tbb::flow::make_edge(recreate_tag, tag_queue);
73
74 // put the tags into the queue
75 for(int t = 1; t<=maxTag; ++t) tag_queue.try_put(t);
76
77 input_count = Recirc_count;
78 output_count = 0;
79
80 // start up the source node to get things going
81 snode.activate();
82
83 // wait for everything to stop
84 g.wait_for_all();
85
86 ASSERT(output_count==Recirc_count, "not all instances were received");
87
88 int j;
89 // grab the tags from the queue, record them
90 std::vector<bool> out_tally(maxTag, false);
91 for(int i = 0; i < maxTag; ++i) {
92 ASSERT(tag_queue.try_get(j), "not enough tags in queue");
93 ASSERT(!out_tally.at(j-1), "duplicate tag from queue");
94 out_tally[j-1] = true;
95 }
96 ASSERT(!tag_queue.try_get(j), "Extra tags in recirculation queue");
97
98 // deconstruct graph
99 source_node_helper<N, JType>::remove_recirc_func_nodes(*my_join, *my_input_join);
100 tbb::flow::remove_edge(*my_join, recreate_tag);
101 makeJoin<N, JType, tbb::flow::tag_matching>::destroy(my_join);
102 tbb::flow::remove_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join));
103 tbb::flow::remove_edge(snode, tbb::flow::input_port<1>(*my_input_join));
104 makeJoin<2, input_join_type, tbb::flow::reserving>::destroy(my_input_join);
105 }
106 }
107};
108
109template<typename JType>
110class generate_recirc_test {
111public:
112 typedef tbb::flow::join_node<JType, tbb::flow::tag_matching> join_node_type;
113 static void do_test() {
114 tag_recirculation_test<join_node_type>::test();
115 }
116};
117
118int TestMain() {
119#if __TBB_USE_TBB_TUPLE
120 REMARK(" Using TBB tuple\n");
121#else
122 REMARK(" Using platform tuple\n");
123#endif
124
125 TestTaggedBuffers();
126 test_main<tbb::flow::queueing>();
127 test_main<tbb::flow::reserving>();
128 test_main<tbb::flow::tag_matching>();
129 return Harness::Done;
130}
131