1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef TBB_PREVIEW_FLOW_GRAPH_FEATURES
18 #define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1
19#endif
20
21#include "tbb/tbb_config.h"
22
23#if __TBB_PREVIEW_ASYNC_MSG
24
25#if _MSC_VER
26#pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
27#endif
28
29#include "tbb/flow_graph.h"
30#include "tbb/tbb_thread.h"
31#include "tbb/concurrent_queue.h"
32
33#include "harness.h"
34#include "harness_graph.h"
35#include "harness_barrier.h"
36
37#include <sstream> // std::ostringstream
38#include <type_traits> // std::is_base_of
39
40static const int USE_N = 1000;
41static const int ACTIVITY_PAUSE_MS_NODE1 = 0;//500;
42static const int ACTIVITY_PAUSE_MS_NODE2 = 0;//100;
43
44#define _TRACE_(msg) { \
45 if (Verbose) { \
46 std::ostringstream os; \
47 os << "[TID=" << tbb::this_tbb_thread::get_id() << "] " << msg; \
48 REMARK("%s\n", os.str().c_str()); \
49 } \
50}
51
52class UserAsyncActivity // Singleton
53{
54public:
55 static UserAsyncActivity* create(const tbb::flow::async_msg<int>& msg, int timeoutMS) {
56 ASSERT(s_Activity == NULL, "created twice");
57 _TRACE_( "Create UserAsyncActivity" );
58 s_Activity = new UserAsyncActivity(msg, timeoutMS);
59 _TRACE_( "CREATED! UserAsyncActivity" );
60 return s_Activity;
61 }
62
63 static void destroy() {
64 _TRACE_( "Start UserAsyncActivity::destroy()" );
65 ASSERT(s_Activity != NULL, "destroyed twice");
66 s_Activity->myThread.join();
67 delete s_Activity;
68 s_Activity = NULL;
69 _TRACE_( "End UserAsyncActivity::destroy()" );
70 }
71
72 static int s_Result;
73
74private:
75 static void threadFunc(UserAsyncActivity* activity) {
76 _TRACE_( "UserAsyncActivity::threadFunc" );
77
78 Harness::Sleep(activity->myTimeoutMS);
79
80 const int result = static_cast<int>(reinterpret_cast<size_t>(activity)) & 0xFF; // just different random results
81 s_Result = result;
82
83 _TRACE_( "UserAsyncActivity::threadFunc - returned result " << result );
84
85 activity->returnActivityResults(result);
86 }
87
88 UserAsyncActivity(const tbb::flow::async_msg<int>& msg, int timeoutMS) : myMsg(msg), myTimeoutMS(timeoutMS)
89 , myThread(threadFunc, this)
90 {
91 // Start local thread here...
92 _TRACE_( "Started AsyncActivity" );
93 }
94
95 // Will be called from working thread
96 void returnActivityResults(int result) {
97 myMsg.set(result);
98 }
99
100private: // DATA
101 tbb::flow::async_msg<int> myMsg;
102 int myTimeoutMS;
103 tbb::tbb_thread myThread;
104
105 static UserAsyncActivity* s_Activity;
106};
107
108UserAsyncActivity* UserAsyncActivity::s_Activity = NULL;
109int UserAsyncActivity::s_Result = -1;
110
111class UserAsyncMsg1 : public tbb::flow::async_msg<int>
112{
113public:
114 typedef tbb::flow::async_msg<int> base;
115
116 UserAsyncMsg1() : base() {}
117 UserAsyncMsg1(int value) : base(value) {}
118 UserAsyncMsg1(const UserAsyncMsg1& msg) : base(msg) {}
119};
120
121struct F2_body : tbb::internal::no_assign
122{
123 static int s_FinalResult;
124
125 int& myI;
126 bool myAlive;
127
128 F2_body(int& i) : myI(i), myAlive(true) {}
129
130 F2_body(const F2_body& b) : myI(b.myI), myAlive(true) {}
131
132 ~F2_body() {
133 myAlive = false;
134 _TRACE_( "~F2_body" );
135 }
136
137 void operator () (int result) {
138 __TBB_ASSERT(myAlive, "dead node");
139
140 // Handle async activity result here
141 s_FinalResult = result;
142 _TRACE_( "F2: Got async_msg result = " << result );
143 }
144};
145
146// static
147int F2_body::s_FinalResult = -2;
148
149static bool testSimplestCase() {
150 bool bOk = true;
151 _TRACE_( "--- SAMPLE 1 (simple case 3-in-1: F1(A<T>) ---> F2(T)) " );
152
153 for (int i = 0; i <= 2; ++i) {
154 _TRACE_( "CASE " << i + 1 << ": data is " << (i > 0 ? "NOT " : "") << "ready in storage" << (i > 1 ? " NO WAITING in graph" : "") );
155 _TRACE_( "MAIN THREAD" );
156
157 {
158 tbb::flow::graph g;
159 tbb::flow::function_node< tbb::flow::continue_msg, UserAsyncMsg1 > f1( g, tbb::flow::unlimited,
160 [&]( tbb::flow::continue_msg ) -> UserAsyncMsg1 {
161 _TRACE_( "F1: Created async_msg" );
162
163 UserAsyncMsg1 a;
164 UserAsyncActivity::create(a, (i == 0 ? 0 : 1)*ACTIVITY_PAUSE_MS_NODE1);
165
166 Harness::Sleep(ACTIVITY_PAUSE_MS_NODE2); // let activity to finish
167 return a;
168 }
169 );
170
171
172 tbb::flow::function_node< int > f2( g, tbb::flow::unlimited,
173 F2_body(i)
174 );
175
176 make_edge(f1, f2);
177 f1.try_put( tbb::flow::continue_msg() );
178 g.wait_for_all();
179 UserAsyncActivity::destroy();
180 _TRACE_( "Done UserAsyncActivity::destroy" );
181 g.wait_for_all();
182 _TRACE_( "Done g.wait_for_all()" );
183 }
184
185 _TRACE_( "--- THE END --- " );
186
187 if (F2_body::s_FinalResult >= 0 && UserAsyncActivity::s_Result == F2_body::s_FinalResult) {
188 _TRACE_( "CASE " << i + 1 << ": " << "PASSED" );
189 }
190 else {
191 _TRACE_( "CASE " << i + 1 << ": " << "FAILED! " << UserAsyncActivity::s_Result << " != " << F2_body::s_FinalResult );
192 bOk = false;
193 ASSERT(0, "testSimplestCase failed");
194 }
195 }
196
197 return bOk;
198}
199
200// ========================================================
201
202class UserAsyncActivityChaining;
203
204class UserAsyncMsg : public tbb::flow::async_msg<int>
205{
206public:
207 typedef tbb::flow::async_msg<int> base;
208
209 UserAsyncMsg() : base() {}
210 UserAsyncMsg(int value) : base(value) {}
211 UserAsyncMsg(const UserAsyncMsg& msg) : base(msg) {}
212
213 // Notify AsyncActivity that it must return result because async calculation chain is over
214 void finalize() const __TBB_override;
215};
216
217class UserAsyncActivityChaining // Singleton: task queue in worker thread
218{
219public:
220 static UserAsyncActivityChaining* instance() {
221 if (s_Activity == NULL) {
222 s_Activity = new UserAsyncActivityChaining();
223 }
224
225 return s_Activity;
226 }
227
228 static void destroy() {
229 ASSERT(s_Activity != NULL, "destroyed twice");
230 s_Activity->myThread.join();
231 delete s_Activity;
232 s_Activity = NULL;
233 }
234
235 static void finish(const UserAsyncMsg& msg) {
236 ASSERT(UserAsyncActivityChaining::s_Activity != NULL, "activity must be alive");
237 UserAsyncActivityChaining::s_Activity->finishTaskQueue(msg);
238 }
239
240 void addWork(int addValue, int timeout = 0) {
241 myQueue.push( MyTask(addValue, timeout) );
242 }
243
244 void finishTaskQueue(const UserAsyncMsg& msg) {
245 myMsg = msg;
246 myQueue.push( MyTask(0, 0, true) );
247 }
248
249 static int s_Result;
250
251private:
252 struct MyTask
253 {
254 MyTask(int addValue = 0, int timeout = 0, bool finishFlag = false)
255 : myAddValue(addValue), myTimeout(timeout), myFinishFlag(finishFlag) {}
256
257 int myAddValue;
258 int myTimeout;
259 bool myFinishFlag;
260 };
261
262 static void threadFunc(UserAsyncActivityChaining* activity)
263 {
264 _TRACE_( "UserAsyncActivityChaining::threadFunc" );
265
266 for (;;)
267 {
268 // Process task queue
269 MyTask work;
270 activity->myQueue.pop(work); // Waits until it can succeed
271
272 _TRACE_( "UserAsyncActivityChaining::threadFunc - work: add "
273 << work.myAddValue << " (timeout = " << work.myTimeout << ")" << (work.myFinishFlag ? " FINAL" : "") );
274
275 // 'finish flag' task is not real task, just end of queue flag
276 Harness::Sleep(work.myTimeout);
277
278 if (work.myFinishFlag) {
279 break;
280 }
281
282 activity->myQueueSum += work.myAddValue;
283 }
284
285 s_Result = activity->myQueueSum;
286 _TRACE_( "UserAsyncActivityChaining::threadFunc - returned result " << activity->myQueueSum );
287
288 // Get result back to Flow Graph
289 activity->myMsg.set(activity->myQueueSum);
290 }
291
292 UserAsyncActivityChaining()
293 : myQueueSum(0)
294 , myThread(threadFunc, this)
295 {
296 // Start local thread here...
297 _TRACE_( "Started AsyncActivityChaining" );
298 }
299
300private: // DATA
301 tbb::concurrent_bounded_queue<MyTask> myQueue;
302 int myQueueSum;
303 UserAsyncMsg myMsg;
304
305 tbb::tbb_thread myThread;
306
307 static UserAsyncActivityChaining* s_Activity;
308};
309
310// static
311UserAsyncActivityChaining* UserAsyncActivityChaining::s_Activity = NULL;
312// static
313int UserAsyncActivityChaining::s_Result = -4;
314
315// override
316void UserAsyncMsg::finalize() const {
317 _TRACE_( "UserAsyncMsg::finalize()" );
318 UserAsyncActivityChaining::finish(*this);
319}
320
321struct F3_body : tbb::internal::no_assign
322{
323 static int s_FinalResult;
324
325 int& myI;
326 bool myAlive;
327
328 F3_body(int& _i) : myI(_i), myAlive(true) {}
329
330 F3_body(const F3_body& b) : myI(b.myI), myAlive(true) {}
331
332 ~F3_body() {
333 myAlive = false;
334 _TRACE_( "~F3_body" );
335 }
336
337 void operator () (int result) {
338 __TBB_ASSERT(myAlive, "dead node");
339 // Handle async activity result here
340 s_FinalResult = result;
341 _TRACE_( "F3: Got async_msg result = " << result );
342 }
343};
344
345// static
346int F3_body::s_FinalResult = -8;
347
348static bool testChaining() {
349 bool bOk = true;
350 _TRACE_( "--- SAMPLE 2 (case with chaining: F1(A<T>) ---> F2(A<T>) ---> F3(T)) " );
351
352 for (int i = 0; i <= 2; ++i) {
353 _TRACE_( "CASE " << i + 1 << ": data is " << (i > 0 ? "NOT " : "") << "ready in storage" << (i > 1 ? " NO WAITING in graph" : "") );
354 _TRACE_( "MAIN THREAD" );
355
356 tbb::flow::graph g;
357 tbb::flow::function_node< tbb::flow::continue_msg, UserAsyncMsg > f1( g, tbb::flow::unlimited,
358 [&]( tbb::flow::continue_msg ) -> UserAsyncMsg {
359 _TRACE_( "F1: Created UserAsyncMsg" );
360
361 UserAsyncMsg a;
362 UserAsyncActivityChaining::instance()->addWork(11, (i == 0 ? 0 : 1)*ACTIVITY_PAUSE_MS_NODE1);
363
364 return a;
365 }
366 );
367
368 tbb::flow::function_node< UserAsyncMsg, UserAsyncMsg > f2( g, tbb::flow::unlimited,
369 [&]( UserAsyncMsg a) -> UserAsyncMsg {
370 _TRACE_( "F2: resend UserAsyncMsg" );
371
372 UserAsyncActivityChaining::instance()->addWork(22, (i == 0 ? 0 : 1)*ACTIVITY_PAUSE_MS_NODE1);
373
374 Harness::Sleep(ACTIVITY_PAUSE_MS_NODE2); // let activity to finish
375 return a;
376 }
377 );
378
379 tbb::flow::function_node< int > f3( g, tbb::flow::unlimited,
380 F3_body(i)
381 );
382
383 make_edge(f1, f2);
384 make_edge(f2, f3);
385 f1.try_put( tbb::flow::continue_msg() );
386 g.wait_for_all();
387
388 UserAsyncActivityChaining::destroy();
389 _TRACE_( "Done UserAsyncActivityChaining::destroy" );
390 g.wait_for_all();
391 _TRACE_( "Done g.wait_for_all()" );
392
393 _TRACE_( "--- THE END ---" );
394
395 if (F3_body::s_FinalResult >= 0 && UserAsyncActivityChaining::s_Result == F3_body::s_FinalResult) {
396 _TRACE_( "CASE " << i + 1 << ": " << "PASSED" );
397 }
398 else {
399 _TRACE_( "CASE " << i + 1 << ": " << "FAILED! " << UserAsyncActivityChaining::s_Result << " != " << F3_body::s_FinalResult );
400 bOk = false;
401 ASSERT(0, "testChaining failed");
402 }
403 }
404
405 return bOk;
406}
407
408// ========================================================
409namespace testFunctionsAvailabilityNS {
410
411using namespace tbb::flow;
412using tbb::flow::interface10::internal::untyped_sender;
413using tbb::flow::interface10::internal::untyped_receiver;
414
415using tbb::internal::is_same_type;
416using tbb::internal::strip;
417using tbb::flow::interface10::internal::wrap_tuple_elements;
418using tbb::flow::interface10::internal::async_helpers;
419
420class A {}; // Any type (usually called 'T')
421struct ImpossibleType {};
422
423template <typename T>
424struct UserAsync_T : public async_msg<T> {
425 UserAsync_T() {}
426 UserAsync_T(const T& t) : async_msg<T>(t) {}
427};
428
429typedef UserAsync_T<int > UserAsync_int;
430typedef UserAsync_T<float> UserAsync_float;
431typedef UserAsync_T<A > UserAsync_A;
432
433typedef tuple< UserAsync_A, UserAsync_float, UserAsync_int, async_msg<A>, async_msg<float>, async_msg<int>, A, float, int > TypeTuple;
434
435static int g_CheckerCounter = 0;
436
437template <typename T, typename U>
438struct CheckerTryPut {
439 static ImpossibleType check( ... );
440
441 template <typename C>
442 static auto check( C* p, U* q ) -> decltype(p->try_put(*q));
443
444 static const bool value = !is_same_type<decltype(check(static_cast<T*>(0), 0)), ImpossibleType>::value;
445};
446
447template <typename T1, typename T2>
448struct CheckerMakeEdge {
449 static ImpossibleType checkMake( ... );
450 static ImpossibleType checkRemove( ... );
451
452 template <typename N1, typename N2>
453 static auto checkMake( N1* n1, N2* n2 ) -> decltype(tbb::flow::make_edge(*n1, *n2));
454
455 template <typename N1, typename N2>
456 static auto checkRemove( N1* n1, N2* n2 ) -> decltype(tbb::flow::remove_edge(*n1, *n2));
457
458 static const bool valueMake = !is_same_type<decltype(checkMake (static_cast<T1*>(0), static_cast<T2*>(0))), ImpossibleType>::value;
459 static const bool valueRemove = !is_same_type<decltype(checkRemove(static_cast<T1*>(0), static_cast<T2*>(0))), ImpossibleType>::value;
460
461 __TBB_STATIC_ASSERT( valueMake == valueRemove, "make_edge() availability is NOT equal to remove_edge() availability" );
462
463 static const bool value = valueMake;
464};
465
466template <typename T1, typename T2>
467struct TypeChecker {
468 TypeChecker() {
469 ++g_CheckerCounter;
470
471 REMARK("%d: %s -> %s: %s %s \n", g_CheckerCounter, typeid(T1).name(), typeid(T2).name(),
472 (bAllowed ? "YES" : "no"), (bConvertible ? " (Convertible)" : ""));
473 }
474
475//
476// Check connection: function_node<continue_msg, SENDING_TYPE> <-> function_node<RECEIVING_TYPE>
477// R E C E I V I N G T Y P E
478// S 'bAllowed' | int | float | A | async_msg | async_msg | async_msg | UserAsync | UserAsync | UserAsync |
479// E value | | | | <int> | <float> | <A> | _int | _float | _A |
480// N -------------------------------------------------------------------------------------------------------------
481// D int | Y | | | Y | | | Y | | |
482// I float | | Y | | | Y | | | Y | |
483// N A | | | Y | | | Y | | | Y |
484// G async_msg<int> | Y | | | Y | | | | | |
485// async_msg<float> | | Y | | | Y | | | | |
486// T async_msg<A> | | | Y | | | Y | | | |
487// Y UserAsync_int | Y | | | | | | Y | | |
488// P UserAsync_float | | Y | | | | | | Y | |
489// E UserAsync_A | | | Y | | | | | | Y |
490//
491 // Test make_edge() & remove_edge() availability
492 static const bool bAllowed = is_same_type<T1, T2>::value
493 || is_same_type<typename async_helpers<T1>::filtered_type, T2>::value
494 || is_same_type<T1, typename async_helpers<T2>::filtered_type>::value;
495
496 static const bool bConvertible = bAllowed
497 || std::is_base_of<T1, T2>::value
498 || (is_same_type<typename async_helpers<T1>::filtered_type, int>::value && is_same_type<T2, float>::value)
499 || (is_same_type<typename async_helpers<T1>::filtered_type, float>::value && is_same_type<T2, int>::value);
500
501 __TBB_STATIC_ASSERT( (bAllowed == CheckerMakeEdge<function_node<continue_msg, T1>, function_node<T2> >::value), "invalid connection Fn<T1> -> Fn<T2>" );
502 __TBB_STATIC_ASSERT( (bAllowed == CheckerMakeEdge<queue_node<T1>, function_node<T2> >::value), "invalid connection Queue<T1> -> Fn<T2>" );
503
504 // Test make_edge() & remove_edge() availability with output_port<N>(node&)
505 __TBB_STATIC_ASSERT( (bAllowed == CheckerMakeEdge<typename strip< decltype(
506 output_port<0>( *static_cast<multifunction_node< continue_msg, tuple<T1, int> >*>(0) ) ) >::type,
507 function_node<T2> >::value), "invalid connection MultuFn<0><T1,int> -> Fn<T2>" );
508
509 __TBB_STATIC_ASSERT( (bAllowed == CheckerMakeEdge<typename strip< decltype(
510 output_port<1>( *static_cast<multifunction_node< continue_msg, tuple<int, T1> >*>(0) ) ) >::type,
511 function_node<T2> >::value), "invalid connection MultuFn<1><int, T1> -> Fn<T2>" );
512
513 // Test untyped_sender connections
514 __TBB_STATIC_ASSERT( (true == CheckerMakeEdge< untyped_sender, function_node<T1> >::value), "cannot connect UntypedSender -> Fn<T1>" );
515 // Test untyped_receiver connections
516 __TBB_STATIC_ASSERT( (true == CheckerMakeEdge< function_node<continue_msg, T1>, untyped_receiver >::value), "cannot connect F<.., T1> -> UntypedReceiver" );
517
518 // Test untyped_receiver->try_put(T2) availability
519 __TBB_STATIC_ASSERT( (true == CheckerTryPut<untyped_receiver, T2>::value), "untyped_receiver cannot try_put(T2)" );
520 // Test receiver<T1>->try_put(T2) availability
521 __TBB_STATIC_ASSERT( (bConvertible == CheckerTryPut<receiver<T1>, T2>::value), "invalid availability of receiver<T1>->try_put(T2)" );
522};
523
524template <typename T1>
525struct WrappedChecker {
526 WrappedChecker() {} // Workaround for compilation error
527
528 template <typename T2>
529 struct T1T2Checker : TypeChecker<T1, T2> {};
530
531 typename wrap_tuple_elements< tuple_size<TypeTuple>::value, T1T2Checker, TypeTuple >::type a;
532};
533
534typedef wrap_tuple_elements< tuple_size<TypeTuple>::value, WrappedChecker, TypeTuple >::type Checker;
535
536} // namespace testFunctionsAvailabilityNS
537
538static void testTryPut() {
539 {
540 tbb::flow::graph g;
541 tbb::flow::function_node< int > f(g, tbb::flow::unlimited, [&](int) {});
542
543 ASSERT(f.try_put(5), "try_put(int) must return true");
544 ASSERT(f.try_put(7), "try_put(int) must return true");
545
546 tbb::flow::async_msg<int> a1, a2;
547 a1.set(5);
548 ASSERT(f.try_put(a1), "try_put(async_msg) must return true");
549 ASSERT(f.try_put(a2), "try_put(async_msg) must return true");
550 a2.set(7);
551 g.wait_for_all();
552 }
553 {
554 tbb::flow::graph g;
555 typedef tbb::flow::indexer_node< int >::output_type output_type;
556 tbb::flow::indexer_node< int > i(g);
557 tbb::flow::function_node< output_type > f(g, tbb::flow::unlimited, [&](output_type) {});
558 make_edge(i, f);
559
560 ASSERT(tbb::flow::input_port<0>(i).try_put(5), "try_put(int) must return true");
561 ASSERT(tbb::flow::input_port<0>(i).try_put(7), "try_put(int) must return true");
562
563 tbb::flow::async_msg<int> a1(5), a2(7);
564 ASSERT(tbb::flow::input_port<0>(i).try_put(a1), "try_put(async_msg) must return true");
565 ASSERT(tbb::flow::input_port<0>(i).try_put(a2), "try_put(async_msg) must return true");
566 g.wait_for_all();
567 }
568}
569
570int TestMain() {
571 REMARK(" *** CHECKING FUNCTIONS: make_edge/remove_edge(node<.., T1>, node<T2>) & node<T1>->try_put(T2) ***\n");
572 testFunctionsAvailabilityNS::Checker a;
573 const int typeTupleSize = tbb::flow::tuple_size<testFunctionsAvailabilityNS::TypeTuple>::value;
574 ASSERT(testFunctionsAvailabilityNS::g_CheckerCounter == typeTupleSize*typeTupleSize, "Type checker counter value is incorrect");
575
576 testTryPut();
577
578 // NOTE: Use '-v' command line argument to get traces & remarks
579 tbb::task_scheduler_init init(4);
580 bool bOk = true;
581
582 for (int i = 0; i < USE_N; ++i) {
583 if (i > 0 && i%1000 == 0) {
584 REPORT(" *** Starting TEST %d... ***\n", i);
585 }
586
587 REMARK(" *** TEST %d ***\n", i);
588 bOk = bOk && testSimplestCase();
589 bOk = bOk && testChaining();
590 }
591
592 _TRACE_( " *** " << USE_N << " tests: " << (bOk ? "all tests passed" : "TESTS FAILED !!!") << " ***" );
593 return (bOk ? Harness::Done : Harness::Unknown);
594}
595
596#else // __TBB_PREVIEW_ASYNC_MSG
597
598#include "harness.h"
599
600int TestMain() {
601 return Harness::Skipped;
602}
603
604#endif // __TBB_PREVIEW_ASYNC_MSG
605