1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17// have to expose the reset_node method to be able to reset a function_body
18#include "harness.h"
19
20#if __TBB_CPF_BUILD
21#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
22#endif
23
24#include "harness_graph.h"
25#include "tbb/flow_graph.h"
26#include "tbb/task.h"
27#include "tbb/task_scheduler_init.h"
28
29const int N = 1000;
30
31template< typename T >
32class test_push_receiver : public tbb::flow::receiver<T>, NoAssign {
33
34 tbb::atomic<int> my_counters[N];
35 tbb::flow::graph& my_graph;
36
37public:
38
39 test_push_receiver(tbb::flow::graph& g) : my_graph(g) {
40 for (int i = 0; i < N; ++i )
41 my_counters[i] = 0;
42 }
43
44 int get_count( int i ) {
45 int v = my_counters[i];
46 return v;
47 }
48
49 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
50
51#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
52 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
53 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
54 built_predecessors_type bpt;
55 built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
56 void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
57 void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
58 void copy_predecessors( predecessor_list_type & ) __TBB_override { }
59 size_t predecessor_count() __TBB_override { return 0; }
60#endif
61
62 tbb::task *try_put_task( const T &v ) __TBB_override {
63 int i = (int)v;
64 ++my_counters[i];
65 return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED);
66 }
67
68 tbb::flow::graph& graph_reference() __TBB_override {
69 return my_graph;
70 }
71
72 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {}
73};
74
75template< typename T >
76class source_body {
77
78 tbb::atomic<int> my_count;
79 int *ninvocations;
80
81public:
82
83 source_body() : ninvocations(NULL) { my_count = 0; }
84 source_body(int &_inv) : ninvocations(&_inv) { my_count = 0; }
85
86 bool operator()( T &v ) {
87 v = (T)my_count.fetch_and_increment();
88 if(ninvocations) ++(*ninvocations);
89 if ( (int)v < N )
90 return true;
91 else
92 return false;
93 }
94
95};
96
97template< typename T >
98class function_body {
99
100 tbb::atomic<int> *my_counters;
101
102public:
103
104 function_body( tbb::atomic<int> *counters ) : my_counters(counters) {
105 for (int i = 0; i < N; ++i )
106 my_counters[i] = 0;
107 }
108
109 bool operator()( T v ) {
110 ++my_counters[(int)v];
111 return true;
112 }
113
114};
115
116template< typename T >
117void test_single_dest() {
118
119 // push only
120 tbb::flow::graph g;
121 tbb::flow::source_node<T> src(g, source_body<T>() );
122 test_push_receiver<T> dest(g);
123 tbb::flow::make_edge( src, dest );
124 g.wait_for_all();
125 for (int i = 0; i < N; ++i ) {
126 ASSERT( dest.get_count(i) == 1, NULL );
127 }
128
129 // push only
130 tbb::atomic<int> counters3[N];
131 tbb::flow::source_node<T> src3(g, source_body<T>() );
132 function_body<T> b3( counters3 );
133 tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 );
134 tbb::flow::make_edge( src3, dest3 );
135 g.wait_for_all();
136 for (int i = 0; i < N; ++i ) {
137 int v = counters3[i];
138 ASSERT( v == 1, NULL );
139 }
140
141 // push & pull
142 tbb::flow::source_node<T> src2(g, source_body<T>() );
143 tbb::atomic<int> counters2[N];
144 function_body<T> b2( counters2 );
145 tbb::flow::function_node<T,bool> dest2(g, tbb::flow::serial, b2 );
146 tbb::flow::make_edge( src2, dest2 );
147#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
148 ASSERT(src2.successor_count() == 1, NULL);
149 typename tbb::flow::source_node<T>::successor_list_type my_succs;
150 src2.copy_successors(my_succs);
151 ASSERT(my_succs.size() == 1, NULL);
152#endif
153 g.wait_for_all();
154 for (int i = 0; i < N; ++i ) {
155 int v = counters2[i];
156 ASSERT( v == 1, NULL );
157 }
158
159 // test copy constructor
160 tbb::flow::source_node<T> src_copy(src);
161 test_push_receiver<T> dest_c(g);
162 ASSERT( src_copy.register_successor(dest_c), NULL );
163 g.wait_for_all();
164 for (int i = 0; i < N; ++i ) {
165 ASSERT( dest_c.get_count(i) == 1, NULL );
166 }
167}
168
169void test_reset() {
170 // source_node -> function_node
171 tbb::flow::graph g;
172 tbb::atomic<int> counters3[N];
173 tbb::flow::source_node<int> src3(g, source_body<int>() );
174 tbb::flow::source_node<int> src_inactive(g, source_body<int>(), /*active*/ false );
175 function_body<int> b3( counters3 );
176 tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3 );
177 tbb::flow::make_edge( src3, dest3 );
178 // source_node already in active state. Let the graph run,
179 g.wait_for_all();
180 // check the array for each value.
181 for (int i = 0; i < N; ++i ) {
182 int v = counters3[i];
183 ASSERT( v == 1, NULL );
184 counters3[i] = 0;
185 }
186 g.reset(tbb::flow::rf_reset_bodies); // <-- re-initializes the counts.
187 // and spawns task to run source
188 g.wait_for_all();
189 // check output queue again. Should be the same contents.
190 for (int i = 0; i < N; ++i ) {
191 int v = counters3[i];
192 ASSERT( v == 1, NULL );
193 counters3[i] = 0;
194 }
195 g.reset(); // doesn't reset the source_node_body to initial state, but does spawn a task
196 // to run the source_node.
197
198 g.wait_for_all();
199 // array should be all zero
200 for (int i = 0; i < N; ++i ) {
201 int v = counters3[i];
202 ASSERT( v == 0, NULL );
203 }
204
205 remove_edge(src3, dest3);
206 make_edge(src_inactive, dest3);
207
208 // src_inactive doesn't run
209 g.wait_for_all();
210 for (int i = 0; i < N; ++i ) {
211 int v = counters3[i];
212 ASSERT( v == 0, NULL );
213 }
214
215 // run graph
216 src_inactive.activate();
217 g.wait_for_all();
218 // check output
219 for (int i = 0; i < N; ++i ) {
220 int v = counters3[i];
221 ASSERT( v == 1, NULL );
222 counters3[i] = 0;
223 }
224 g.reset(tbb::flow::rf_reset_bodies); // <-- reinitializes the counts
225 // src_inactive doesn't run
226 g.wait_for_all();
227 for (int i = 0; i < N; ++i ) {
228 int v = counters3[i];
229 ASSERT( v == 0, NULL );
230 }
231
232 // start it up
233 src_inactive.activate();
234 g.wait_for_all();
235 for (int i = 0; i < N; ++i ) {
236 int v = counters3[i];
237 ASSERT( v == 1, NULL );
238 counters3[i] = 0;
239 }
240 g.reset(); // doesn't reset the source_node_body to initial state, and doesn't
241 // spawn a task to run the source_node.
242
243 g.wait_for_all();
244 // array should be all zero
245 for (int i = 0; i < N; ++i ) {
246 int v = counters3[i];
247 ASSERT( v == 0, NULL );
248 }
249 src_inactive.activate();
250 // source_node_body is already in final state, so source_node will not forward a message.
251 g.wait_for_all();
252 for (int i = 0; i < N; ++i ) {
253 int v = counters3[i];
254 ASSERT( v == 0, NULL );
255 }
256}
257
258#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
259void test_extract() {
260 int counts = 0;
261 tbb::flow::tuple<int,int> dont_care;
262 tbb::flow::graph g;
263 typedef tbb::flow::source_node<int> snode_type;
264 typedef snode_type::successor_list_type successor_list_type;
265 snode_type s0(g, source_body<int>(counts), /*is_active*/false );
266 tbb::flow::join_node< tbb::flow::tuple<int,int>, tbb::flow::reserving > j0(g);
267 tbb::flow::join_node< tbb::flow::tuple<int,int>, tbb::flow::reserving > j1(g);
268 tbb::flow::join_node< tbb::flow::tuple<int,int>, tbb::flow::reserving > j2(g);
269 tbb::flow::queue_node<int> q0(g);
270 tbb::flow::queue_node<tbb::flow::tuple<int,int> > q1(g);
271 tbb::flow::make_edge(s0, tbb::flow::get<0>(j0.input_ports()));
272 /* s0 ----+ */
273 /* | j0 */
274 /* + */
275 ASSERT(!counts, "source_node activated too soon");
276 s0.activate();
277 g.wait_for_all(); // should produce one value, buffer it.
278 ASSERT(counts == 1, "source_node did not react to activation");
279
280 g.reset(tbb::flow::rf_reset_bodies);
281 counts = 0;
282 s0.extract();
283 /* s0 + */
284 /* | j0 */
285 /* + */
286 s0.activate();
287 g.wait_for_all(); // no successors, so the body will not execute
288 ASSERT(counts == 0, "source_node shouldn't forward (no successors)");
289 g.reset(tbb::flow::rf_reset_bodies);
290
291 tbb::flow::make_edge(s0, tbb::flow::get<0>(j0.input_ports()));
292 tbb::flow::make_edge(s0, tbb::flow::get<0>(j1.input_ports()));
293 tbb::flow::make_edge(s0, tbb::flow::get<0>(j2.input_ports()));
294
295 /* /+ */
296 /* / | j0 */
297 /* / + */
298 /* / */
299 /* / /--+ */
300 /* s0-/ | j1 */
301 /* \ + */
302 /* \ */
303 /* \--+ */
304 /* | j2 */
305 /* + */
306
307 // do all joins appear in successor list?
308 successor_list_type jv1;
309 jv1.push_back(&(tbb::flow::get<0>(j0.input_ports())));
310 jv1.push_back(&(tbb::flow::get<0>(j1.input_ports())));
311 jv1.push_back(&(tbb::flow::get<0>(j2.input_ports())));
312 snode_type::successor_list_type sv;
313 s0.copy_successors(sv);
314 ASSERT(lists_match(sv, jv1), "mismatch in successor list");
315
316 tbb::flow::make_edge(q0, tbb::flow::get<1>(j2.input_ports()));
317 tbb::flow::make_edge(j2, q1);
318 s0.activate();
319
320 /* /+ */
321 /* / | j0 */
322 /* / + */
323 /* / */
324 /* / /--+ */
325 /* s0-/ | j1 */
326 /* \ + */
327 /* \ */
328 /* \--+ */
329 /* | j2----q1 */
330 /* q0-----+ */
331
332 q0.try_put(1);
333 g.wait_for_all();
334 ASSERT(q1.try_get(dont_care), "join did not emit result");
335 j2.extract();
336 tbb::flow::make_edge(q0, tbb::flow::get<1>(j2.input_ports()));
337 tbb::flow::make_edge(j2, q1);
338
339 /* /+ */
340 /* / | j0 */
341 /* / + */
342 /* / */
343 /* / /--+ */
344 /* s0-/ | j1 */
345 /* + */
346 /* */
347 /* + */
348 /* | j2----q1 */
349 /* q0-----+ */
350
351 jv1.clear();
352 jv1.push_back(&(tbb::flow::get<0>(j0.input_ports())));
353 jv1.push_back(&(tbb::flow::get<0>(j1.input_ports())));
354 s0.copy_successors(sv);
355 ASSERT(lists_match(sv, jv1), "mismatch in successor list");
356
357 q0.try_put(1);
358 g.wait_for_all();
359 ASSERT(!q1.try_get(dont_care), "extract of successor did not remove pred link");
360
361 s0.extract();
362
363 /* + */
364 /* | j0 */
365 /* + */
366 /* */
367 /* + */
368 /* s0 | j1 */
369 /* + */
370 /* */
371 /* + */
372 /* | j2----q1 */
373 /* q0-----+ */
374
375 ASSERT(s0.successor_count() == 0, "successor list not cleared");
376 s0.copy_successors(sv);
377 ASSERT(sv.size() == 0, "non-empty successor list");
378
379 tbb::flow::make_edge(s0, tbb::flow::get<0>(j2.input_ports()));
380
381 /* + */
382 /* | j0 */
383 /* + */
384 /* */
385 /* + */
386 /* s0 | j1 */
387 /* \ + */
388 /* \ */
389 /* \--+ */
390 /* | j2----q1 */
391 /* q0-----+ */
392
393 jv1.clear();
394 jv1.push_back(&(tbb::flow::get<0>(j2.input_ports())));
395 s0.copy_successors(sv);
396 ASSERT(lists_match(sv, jv1), "mismatch in successor list");
397
398 q0.try_put(1);
399 g.wait_for_all();
400 ASSERT(!q1.try_get(dont_care), "extract of successor did not remove pred link");
401}
402#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
403
404int TestMain() {
405 if( MinThread<1 ) {
406 REPORT("number of threads must be positive\n");
407 exit(1);
408 }
409 for ( int p = MinThread; p < MaxThread; ++p ) {
410 tbb::task_scheduler_init init(p);
411 test_single_dest<int>();
412 test_single_dest<float>();
413 }
414 test_reset();
415#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
416 test_extract();
417#endif
418 return Harness::Done;
419}
420
421