1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef tbb_tests_harness_concurrency_tracker_H
18#define tbb_tests_harness_concurrency_tracker_H
19
20#include "harness_assert.h"
21#include "harness_barrier.h"
22#include "tbb/atomic.h"
23#include "../tbb/tls.h"
24// Note: This file is used by RML tests which do not link TBB.
25// Functionality that requires TBB binaries must be guarded by !__TBB_NO_IMPLICIT_LINKAGE
26#if !defined(__TBB_NO_IMPLICIT_LINKAGE)
27#include "tbb/mutex.h"
28#include "tbb/task.h"
29#include "tbb/combinable.h"
30#include "tbb/parallel_for.h"
31#include <functional> // for std::plus
32#include "harness.h" // for Harness::NoCopy
33#endif
34
35namespace Harness {
36
37static tbb::atomic<unsigned> ctInstantParallelism;
38static tbb::atomic<unsigned> ctPeakParallelism;
39static tbb::internal::tls<uintptr_t> ctNested;
40
41class ConcurrencyTracker {
42 bool m_Outer;
43
44 static void Started () {
45 unsigned p = ++ctInstantParallelism;
46 unsigned q = ctPeakParallelism;
47 while( q<p ) {
48 q = ctPeakParallelism.compare_and_swap(p,q);
49 }
50 }
51
52 static void Stopped () {
53 ASSERT ( ctInstantParallelism > 0, "Mismatched call to ConcurrencyTracker::Stopped()" );
54 --ctInstantParallelism;
55 }
56public:
57 ConcurrencyTracker() : m_Outer(false) {
58 uintptr_t nested = ctNested;
59 ASSERT (nested == 0 || nested == 1, NULL);
60 if ( !ctNested ) {
61 Started();
62 m_Outer = true;
63 ctNested = 1;
64 }
65 }
66 ~ConcurrencyTracker() {
67 if ( m_Outer ) {
68 Stopped();
69 ctNested = 0;
70 }
71 }
72
73 static unsigned PeakParallelism() { return ctPeakParallelism; }
74 static unsigned InstantParallelism() { return ctInstantParallelism; }
75
76 static void Reset() {
77 ASSERT (ctInstantParallelism == 0, "Reset cannot be called when concurrency tracking is underway");
78 ctInstantParallelism = ctPeakParallelism = 0;
79 }
80}; // ConcurrencyTracker
81
82#if !defined(__TBB_NO_IMPLICIT_LINKAGE)
83struct ExactConcurrencyLevel : NoCopy {
84 typedef tbb::combinable<size_t> Combinable;
85private:
86 Harness::SpinBarrier *myBarrier;
87 // count unique worker threads
88 Combinable *myUniqueThreads;
89 mutable tbb::atomic<size_t> myActiveBodyCnt;
90 // output parameter for parallel_for body to report that max is reached
91 mutable bool myReachedMax;
92 // zero timeout means no barrier is used during concurrency level detection
93 const double myTimeout;
94 const size_t myConcLevel;
95 const bool myCrashOnFail;
96
97 static tbb::mutex global_mutex;
98
99 ExactConcurrencyLevel(double timeout, size_t concLevel, Combinable *uniq, bool crashOnFail) :
100 myBarrier(NULL), myUniqueThreads(uniq), myReachedMax(false),
101 myTimeout(timeout), myConcLevel(concLevel), myCrashOnFail(crashOnFail) {
102 myActiveBodyCnt = 0;
103 }
104 bool run() {
105 const int LOOP_ITERS = 100;
106 tbb::combinable<size_t> uniq;
107 Harness::SpinBarrier barrier((unsigned)myConcLevel, /*throwaway=*/true);
108 if (myTimeout != 0.)
109 myBarrier = &barrier;
110 if (!myUniqueThreads)
111 myUniqueThreads = &uniq;
112 tbb::parallel_for((size_t)0, myConcLevel*LOOP_ITERS, *this, tbb::simple_partitioner());
113 return myReachedMax;
114 }
115public:
116 void operator()(size_t) const {
117 size_t v = ++myActiveBodyCnt;
118 ASSERT(v <= myConcLevel, "Number of active bodies is too high.");
119 if (v == myConcLevel) // record that the max expected concurrency was observed
120 myReachedMax = true;
121 // try to get barrier when 1st time in the thread
122 if (myBarrier && !myBarrier->timed_wait_noerror(myTimeout))
123 ASSERT(!myCrashOnFail, "Timeout was detected.");
124
125 myUniqueThreads->local() = 1;
126 for (int i=0; i<100; i++)
127 __TBB_Pause(1);
128 --myActiveBodyCnt;
129 }
130
131 enum Mode {
132 None,
133 // When multiple blocking checks are performed, there might be not enough
134 // concurrency for all of them. Serialize check() calls.
135 Serialize
136 };
137
138 // check that we have never got more than concLevel threads,
139 // and that in some moment we saw exactly concLevel threads
140 static void check(size_t concLevel, Mode m = None) {
141 ExactConcurrencyLevel o(30., concLevel, NULL, /*crashOnFail=*/true);
142
143 tbb::mutex::scoped_lock lock;
144 if (m == Serialize)
145 lock.acquire(global_mutex);
146 bool ok = o.run();
147 ASSERT(ok, NULL);
148 }
149
150 static bool isEqual(size_t concLevel) {
151 ExactConcurrencyLevel o(3., concLevel, NULL, /*crashOnFail=*/false);
152 return o.run();
153 }
154
155 static void checkLessOrEqual(size_t concLevel, tbb::combinable<size_t> *unique) {
156 ExactConcurrencyLevel o(0., concLevel, unique, /*crashOnFail=*/true);
157
158 o.run(); // ignore result, as without a barrier it is not reliable
159 const size_t num = unique->combine(std::plus<size_t>());
160 ASSERT(num<=concLevel, "Too many workers observed.");
161 }
162};
163
164tbb::mutex ExactConcurrencyLevel::global_mutex;
165
166#endif /* !defined(__TBB_NO_IMPLICIT_LINKAGE) */
167
168} // namespace Harness
169
170#endif /* tbb_tests_harness_concurrency_tracker_H */
171