1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17// undefine __TBB_CPF_BUILD to simulate user's setup
18#undef __TBB_CPF_BUILD
19
20#include "tbb/tbb_config.h"
21#include "harness.h"
22
23#if __TBB_SCHEDULER_OBSERVER
24#include "tbb/task_scheduler_observer.h"
25#include "tbb/task_scheduler_init.h"
26#include "tbb/atomic.h"
27#include "tbb/task.h"
28#include "tbb/enumerable_thread_specific.h"
29#include "../tbb/tls.h"
30#include "tbb/tick_count.h"
31#include "harness_barrier.h"
32
33#if _MSC_VER && __TBB_NO_IMPLICIT_LINKAGE
34// plays around __TBB_NO_IMPLICIT_LINKAGE. __TBB_LIB_NAME should be defined (in makefiles)
35 #pragma comment(lib, __TBB_STRING(__TBB_LIB_NAME))
36#endif
37
// Highest bit index usable as a per-observer flag in ThreadState::m_flags.
const int MaxFlagIndex = sizeof(uintptr_t)*8-1;
39
40struct ObserverStats {
41 tbb::atomic<int> m_entries;
42 tbb::atomic<int> m_exits;
43 tbb::atomic<int> m_workerEntries;
44 tbb::atomic<int> m_workerExits;
45
46 void Reset () {
47 m_entries = m_exits = m_workerEntries = m_workerExits = 0;
48 }
49
50 void operator += ( const ObserverStats& s ) {
51 m_entries += s.m_entries;
52 m_exits += s.m_exits;
53 m_workerEntries += s.m_workerEntries;
54 m_workerExits += s.m_workerExits;
55 }
56};
57
58struct ThreadState {
59 uintptr_t m_flags;
60 tbb::task_scheduler_observer *m_dyingObserver;
61 bool m_isMaster;
62 ThreadState() { reset(); }
63 void reset() {
64 m_flags = 0;
65 m_dyingObserver = NULL;
66 m_isMaster = false;
67 }
68 static ThreadState &get();
69};
70
// Thread-local test state; stale entries for recycled thread ids are detected
// and wiped in ThreadState::get().
tbb::enumerable_thread_specific<ThreadState> theLocalState;
// Raw TLS slot (zero-initialized by the OS for each new thread) used to tell
// a brand-new thread apart from one that reuses a destroyed thread's id.
tbb::internal::tls<intptr_t> theThreadPrivate;
73
74ThreadState &ThreadState::get() {
75 bool exists;
76 ThreadState& state = theLocalState.local(exists);
77 // ETS will not detect that a thread was allocated with the same id as a destroyed thread
78 if( exists && theThreadPrivate.get() == 0 ) state.reset();
79 theThreadPrivate = 1; // mark thread constructed
80 return state;
81}
82
// Statistics aggregated from all observers destroyed so far in the current round.
static ObserverStats theStats;
// Number of currently alive MyObserver instances; must be zero between rounds.
static tbb::atomic<int> theNumObservers;

// Threads per test round: the scheduler default, capped by the number of
// distinct flag bits representable in an int.
const int P = min( tbb::task_scheduler_init::default_num_threads(), (int)sizeof(int) * CHAR_BIT );

enum TestMode {
    //! Ensure timely workers destruction in order to guarantee all exit notification are fired.
    tmSynchronized = 1,
    //! Use local observer.
    tmLocalObservation = 2,
    //! Observer causes autoinitialization of the scheduler
    tmAutoinitialization = 4
};

// Mode of the current round and of the previous one; some notification asserts
// are only valid when the previous round guaranteed synchronized teardown.
uintptr_t theTestMode,
          thePrevMode = 0;
99
//! Observer under test: counts entry/exit notifications and checks per-thread invariants.
class MyObserver : public tbb::task_scheduler_observer, public ObserverStats {
    uintptr_t m_flag;          // unique bit identifying this observer in ThreadState::m_flags
    tbb::atomic<bool> m_dying; // set at destructor start; late notifications are then ignored

    //! Validate and record an entry notification for the calling thread.
    void on_scheduler_entry( bool is_worker ) __TBB_override {
        ThreadState& state = ThreadState::get();
        ASSERT( is_worker==!state.m_isMaster, NULL );
        // The repeat/locality invariants only hold if the previous round tore
        // its workers down synchronously; otherwise lagging notifications from
        // that round may still arrive.
        if ( thePrevMode & tmSynchronized ) {
            ASSERT( !(state.m_flags & m_flag), "Observer repeatedly invoked for the same thread" );
            if ( theTestMode & tmLocalObservation )
                ASSERT( !state.m_flags, "Observer locality breached" );
        }
        if ( m_dying && theTestMode & tmLocalObservation ) {
            // In case of local observation a worker may enter the arena after
            // the wait for lagging on_entry calls in the MyObserver destructor
            // succeeds but before its base class tbb::task_scheduler_observer
            // destructor removes it from the internal list maintained by the
            // task scheduler. This will result in on_entry notification without,
            // subsequent on_exit as the observer is likely to be destroyed before
            // the worker discovers that the arena is empty and leaves it.
            //
            // To prevent statistics distortion, ignore the notifications for
            // observers about to be destroyed.
            ASSERT( !state.m_dyingObserver || state.m_dyingObserver != this || thePrevMode & tmSynchronized, NULL );
            state.m_dyingObserver = this;
            return;
        }
        state.m_dyingObserver = NULL;
        ++m_entries;
        state.m_flags |= m_flag;
        if ( is_worker )
            ++m_workerEntries;
    }
    //! Validate and record an exit notification for the calling thread.
    void on_scheduler_exit( bool is_worker ) __TBB_override {
        ThreadState& state = ThreadState::get();
        ASSERT( is_worker==!state.m_isMaster, NULL );
        // A matching entry ignored above (dying observer) must have its exit
        // ignored too, so the entry/exit counters stay balanced.
        if ( m_dying && state.m_dyingObserver ) {
            ASSERT( state.m_dyingObserver == this, "Exit without entry (for a dying observer)" );
            state.m_dyingObserver = NULL;
            return;
        }
        ASSERT( state.m_flags & m_flag, "Exit without entry" );
        state.m_flags &= ~m_flag;
        ++m_exits;
        if ( is_worker )
            ++m_workerExits;
    }
public:
    //! Construct an observer identified by the given flag bit.
    //! In non-local modes observation starts immediately; in local mode the
    //! caller activates it after creating the scheduler (see TestBody).
    MyObserver( uintptr_t flag )
        : tbb::task_scheduler_observer(theTestMode & tmLocalObservation ? true : false)
        , m_flag(flag)
    {
        ++theNumObservers;
        Reset();
        m_dying = false;
        // Local observer causes automatic scheduler initialization
        // in the current thread, so here, we must postpone the activation.
        if ( !(theTestMode & tmLocalObservation))
            observe(true);
    }

    //! Wait (bounded) for lagging exit notifications, then publish statistics.
    ~MyObserver () {
        m_dying = true;
        ASSERT( m_exits <= m_entries, NULL );
        if ( theTestMode & tmSynchronized ) {
            // Give workers up to 5 seconds to deliver the matching exits.
            tbb::tick_count t0 = tbb::tick_count::now();
            while ( m_exits < m_entries && (tbb::tick_count::now() - t0).seconds() < 5 )
                Harness::Sleep(10);
            if ( m_exits < m_entries )
                REPORT( "Warning: Entry/exit count mismatch (%d, %d). Observer is broken or machine is overloaded.\n", (int)m_entries, (int)m_exits );
        }
        theStats += *this;
        --theNumObservers;
        // it is recommended to disable observation before destructor of the base class starts,
        // otherwise it can lead to concurrent notification callback on partly destroyed object,
        // which in turn can harm (in addition) if derived class has new virtual methods.
        // This class has none, and for test purposes we rely on the implementation's failsafe mechanism.
        //observe(false);
    }
}; // class MyObserver
180
// Barrier rendezvousing masters with workers in tmSynchronized mode (M*T parties).
Harness::SpinBarrier theGlobalBarrier;
// True until the first worker task runs; gates the root-task barrier protocol in FibTask.
bool theGlobalBarrierActive = true;
183
//! Fibonacci-style task tree used to force workers into (and out of) the arena.
//! The root task (executed by a master) additionally drives the barrier protocol
//! that guarantees all workers have entered before entry counts are checked.
class FibTask : public tbb::task {
    const int N;            // recursion parameter; for the root task, the arena size
    uintptr_t m_flag;       // flag bit of the observer owning this task tree
    MyObserver &m_observer; // observer whose counters the root task validates
public:
    FibTask( int n, uintptr_t flags, MyObserver &obs ) : N(n), m_flag(flags), m_observer(obs) {}

    tbb::task* execute() __TBB_override {
        ThreadState& s = ThreadState::get();
        // Under local observation, the owning observer must have notified this
        // thread before any of its tasks run here.
        ASSERT( !(~s.m_flags & m_flag), NULL );
        if( N < 2 )
            return NULL;
        bool globalBarrierActive = false;
        if ( s.m_isMaster ) {
            if ( theGlobalBarrierActive ) {
                // This is the root task. Its N is equal to the number of threads.
                // Spawn a task for each worker.
                set_ref_count(N);
                for ( int i = 1; i < N; ++i )
                    spawn( *new( allocate_child() ) FibTask(20, m_flag, m_observer) );
                if ( theTestMode & tmSynchronized ) {
                    theGlobalBarrier.wait();
                    ASSERT( m_observer.m_entries >= N, "Wrong number of on_entry calls after the first barrier" );
                    // All the spawned tasks have been stolen by workers.
                    // Now wait for workers to spawn some more tasks for this thread to steal back.
                    theGlobalBarrier.wait();
                    ASSERT( !theGlobalBarrierActive, "Workers are expected to have reset this flag" );
                }
                else
                    theGlobalBarrierActive = false;
                wait_for_all();
                return NULL;
            }
        }
        else {
            if ( theGlobalBarrierActive ) {
                // First task executed by this worker: join the first rendezvous
                // and remember to release the master at the second one below.
                if ( theTestMode & tmSynchronized ) {
                    theGlobalBarrier.wait();
                    globalBarrierActive = true;
                }
                theGlobalBarrierActive = false;
            }
        }
        set_ref_count(3);
        spawn( *new( allocate_child() ) FibTask(N-1, m_flag, m_observer) );
        spawn( *new( allocate_child() ) FibTask(N-2, m_flag, m_observer) );
        if ( globalBarrierActive ) {
            // It's the first task executed by a worker. Release the master thread.
            theGlobalBarrier.wait();
        }
        wait_for_all();
        return NULL;
    }
}; // class FibTask
238
// Barrier aligning all M master threads before they start their schedulers (M parties).
Harness::SpinBarrier theMasterBarrier;
240
241class TestBody {
242 int m_numThreads;
243public:
244 TestBody( int numThreads ) : m_numThreads(numThreads) {}
245
246 void operator()( int i ) const {
247 ThreadState &state = ThreadState::get();
248 ASSERT( !state.m_isMaster, "should be newly initialized thread");
249 state.m_isMaster = true;
250 uintptr_t f = i <= MaxFlagIndex ? 1<<i : 0;
251 MyObserver o(f);
252 if ( theTestMode & tmSynchronized )
253 theMasterBarrier.wait();
254 // when mode is local observation but not synchronized and when num threads == default
255 if ( theTestMode & tmAutoinitialization )
256 o.observe(true); // test autoinitialization can be done by observer
257 // Observer in enabled state must outlive the scheduler to ensure that
258 // all exit notifications are called.
259 tbb::task_scheduler_init init(m_numThreads);
260 // when local & non-autoinitialized observation mode
261 if ( theTestMode & tmLocalObservation )
262 o.observe(true);
263 for ( int j = 0; j < 2; ++j ) {
264 tbb::task &t = *new( tbb::task::allocate_root() ) FibTask(m_numThreads, f, o);
265 tbb::task::spawn_root_and_wait(t);
266 thePrevMode = theTestMode;
267 }
268 }
269}; // class TestBody
270
//! Run one observation round with M masters, arena size T, and the given mode
//! bits, then validate the aggregated notification statistics.
void TestObserver( int M, int T, uintptr_t testMode ) {
    theLocalState.clear();
    theStats.Reset();
    theGlobalBarrierActive = true;
    theTestMode = testMode;
    NativeParallelFor( M, TestBody(T) );
    // When T (number of threads in arena, i.e. master + workers) is less than P
    // (hardware concurrency), more than T-1 workers can visit the same arena. This
    // is possible in case of imbalance or when other arenas are activated/deactivated
    // concurrently.
    ASSERT( !theNumObservers, "Unexpected alive observer(s)" );
    REMARK( "Entries %d / %d, exits %d\n", (int)theStats.m_entries, (int)theStats.m_workerEntries, (int)theStats.m_exits );
    if ( testMode & tmSynchronized ) {
        if ( testMode & tmLocalObservation ) {
            // Each of the M observers sees only its own arena's T threads.
            ASSERT( theStats.m_entries >= M * T, "Too few on_entry calls" );
            ASSERT( theStats.m_workerEntries >= M * (T - 1), "Too few worker entries" );
        }
        else {
            // Global observation: each of the M observers sees every arena.
            ASSERT( theStats.m_entries >= M * M * T, "Too few on_entry calls" );
            ASSERT( theStats.m_entries <= M * (P + 1), "Too many on_entry calls" );
            ASSERT( theStats.m_workerEntries >= M * M * (T - 1), "Too few worker entries" );
            ASSERT( theStats.m_workerEntries <= M * (P - 1), "Too many worker entries" );
        }
        // Synchronized teardown guarantees every entry got its matching exit.
        ASSERT( theStats.m_entries == theStats.m_exits, "Entries/exits mismatch" );
    }
    else {
        // Without synchronized teardown only loose bounds can be asserted.
        ASSERT( theStats.m_entries >= M, "Too few on_entry calls" );
        ASSERT( theStats.m_exits >= M || (testMode & tmAutoinitialization), "Too few on_exit calls" );
        if ( !(testMode & tmLocalObservation) ) {
            ASSERT( theStats.m_entries <= M * M * P, "Too many on_entry calls" );
            ASSERT( theStats.m_exits <= M * M * T, "Too many on_exit calls" );
        }
        ASSERT( theStats.m_entries >= theStats.m_exits, "More exits than entries" );
    }
}
306
//! Drive the observer rounds over a range of master-count/arena-size splits.
int TestMain () {
    if ( P < 2 )
        return Harness::Skipped;
    theNumObservers = 0;
    // Fully- and under-utilized mode: M masters, each with an arena of T = P/M
    // threads, for power-of-two M (plus one extra configuration when P is not
    // a power of two).
    for ( int M = 1; M < P; M <<= 1 ) {
        if ( M > P/2 ) {
            ASSERT( P & (P-1), "Can get here only in case of non power of two cores" );
            M = P/2;
            if ( M==1 || (M & (M-1)) )
                break; // Already tested this configuration
        }
        int T = P / M;
        ASSERT( T > 1, NULL );
        REMARK( "Masters: %d; Arena size: %d\n", M, T );
        theMasterBarrier.initialize(M);
        theGlobalBarrier.initialize(M * T);
        TestObserver(M, T, 0);
        TestObserver(M, T, tmSynchronized | tmLocalObservation );
        // keep tmAutoInitialization the last, as it does not release worker threads
        TestObserver(M, T, tmLocalObservation | ( T==P? tmAutoinitialization : 0) );
    }
    // Oversubscribed mode: P-1 masters each requesting a full-size arena.
    for ( int i = 0; i < 4; ++i ) {
        REMARK( "Masters: %d; Arena size: %d\n", P-1, P );
        TestObserver(P-1, P, 0);
        TestObserver(P-1, P, tmLocalObservation);
    }
    // Let lagging notifications from the last round drain before process exit.
    Harness::Sleep(20);
    return Harness::Done;
}
338
339#else /* !__TBB_SCHEDULER_OBSERVER */
340
//! Stub: the observer feature is compiled out, so there is nothing to test.
int TestMain () {
    return Harness::Skipped;
}
344#endif /* !__TBB_SCHEDULER_OBSERVER */
345