/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#define HARNESS_DEFAULT_MIN_THREADS 6
#define HARNESS_DEFAULT_MAX_THREADS 8

#include "tbb/concurrent_monitor.h"
#include "tbb/atomic.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/parallel_for.h"
#include "tbb/blocked_range.h"
#include "harness.h"
#if _WIN32||_WIN64
#include "tbb/dynamic_link.cpp"
#endif

#include "tbb/semaphore.cpp"
#include "tbb/concurrent_monitor.cpp"
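// The semaphore and concurrent_monitor implementation sources are compiled
// directly into the test because concurrent_monitor lives in tbb::internal
// and its symbols are not exported from the TBB binary.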

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress the "conditional expression is constant" warning that MSVC
    // emits for the compile-time constants used below.
    #pragma warning (disable: 4127)
#endif

using namespace tbb;

//! Queuing lock with concurrent_monitor; to test concurrent_monitor::notify( Predicate p )
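// This is an MCS-style queuing mutex: each contender appends itself to a
// singly linked queue and waits for its predecessor to set its "going" flag.
// Rather than spinning indefinitely, a blocked contender parks itself in the
// shared concurrent_monitor, and the releasing thread wakes exactly its
// successor with a predicated notify.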
class QueuingMutex {
public:
    //! Construct unacquired mutex.
    QueuingMutex() { q_tail = NULL; }

    //! The scoped locking pattern
    class ScopedLock: internal::no_copy {
        void Initialize() { mutex = NULL; }
    public:
        ScopedLock() {Initialize();}
        ScopedLock( QueuingMutex& m, size_t test_mode ) { Initialize(); Acquire(m,test_mode); }
        ~ScopedLock() { if( mutex ) Release(); }
        void Acquire( QueuingMutex& m, size_t test_mode );
        void Release();
        void SleepPerhaps();

    private:
        QueuingMutex* mutex;
        ScopedLock* next;
        uintptr_t going;
        internal::concurrent_monitor::thread_context thr_ctx;
    };

    friend class ScopedLock;
private:
    //! The last competitor requesting the lock
    atomic<ScopedLock*> q_tail;
    internal::concurrent_monitor waitq;
};

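// Matches the waiter whose wait context (the ScopedLock address registered
// via prepare_wait/wait) equals the given value, so that notify() wakes
// exactly the intended successor.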
struct PredicateEq {
    uintptr_t p;
    PredicateEq( uintptr_t p_ ) : p(p_) {}
    bool operator() ( uintptr_t v ) const {return p==v;}
};

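// Functor equivalents of the lambdas used in case 1 of Acquire(), for
// compilers without C++11 lambda support: the context functor supplies the
// per-waiter tag, and the "until" functor is the wakeup condition.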
struct QueuingMutex_Context {
    const QueuingMutex::ScopedLock* lck;
    QueuingMutex_Context( QueuingMutex::ScopedLock* l_ ) : lck(l_) {}
    uintptr_t operator()() { return uintptr_t(lck); }
};

struct QueuingMutex_Until : NoAssign {
    uintptr_t& flag;
    QueuingMutex_Until( uintptr_t& f_ ) : flag(f_) {}
    bool operator()() { return flag!=0ul; }
};

//! A method to acquire QueuingMutex lock
void QueuingMutex::ScopedLock::Acquire( QueuingMutex& m, size_t test_mode )
{
    // Must set all fields before the fetch_and_store, because once the
    // fetch_and_store executes, *this becomes accessible to other threads.
    mutex = &m;
    next = NULL;
    going = 0;

    // The fetch_and_store must have release semantics, because we are
    // "sending" the fields initialized above to other processors.
    ScopedLock* pred = m.q_tail.fetch_and_store<tbb::release>(this);
    if( pred ) {
#if TBB_USE_ASSERT
        __TBB_control_consistency_helper(); // on "m.q_tail"
        ASSERT( !pred->next, "the predecessor has another successor!");
#endif
        pred->next = this;
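        // Spin briefly in the hope that the predecessor releases the lock
        // soon, so that the cost of parking in the monitor can be avoided.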
        for( int i=0; i<16; ++i ) {
            if( going!=0ul ) break;
            __TBB_Yield();
        }
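        // Cycle through the three blocking paths so each run exercises the
        // functor-based wait, the lambda-based wait, and the hand-rolled
        // prepare_wait/commit_wait protocol in SleepPerhaps().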
        int x = int( test_mode%3 );
        switch( x ) {
        case 0:
            mutex->waitq.wait( QueuingMutex_Until(going), QueuingMutex_Context(this) );
            break;
#if __TBB_CPP11_LAMBDAS_PRESENT
        case 1:
            mutex->waitq.wait( [&](){ return going!=0ul; }, [=]() { return (uintptr_t)this; } );
            break;
#endif
        default:
            SleepPerhaps();
            break;
        }
    }

    // Acquire critical section indirectly from previous owner or directly from predecessor.
    __TBB_control_consistency_helper(); // on either "m.q_tail" or "going"
}

//! A method to release QueuingMutex lock
void QueuingMutex::ScopedLock::Release( )
{
    if( !next ) {
        if( this == mutex->q_tail.compare_and_swap<tbb::release>(NULL, this) ) {
            // this was the only item in the queue, and the queue is now empty.
            goto done;
        }
        // Someone in the queue
        spin_wait_while_eq( next, (ScopedLock*)0 );
    }
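    // Hand the lock to the successor: set its "going" flag, then wake it in
    // case it has already parked in the monitor under its own address as tag.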
    __TBB_store_with_release(next->going, 1);
    mutex->waitq.notify( PredicateEq(uintptr_t(next)) );
done:
    Initialize();
}

//! Yield and block; go to sleep
void QueuingMutex::ScopedLock::SleepPerhaps()
{
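    // Two-phase wait protocol: prepare_wait() registers the thread in the
    // monitor's wait queue, the condition is re-checked, and commit_wait()
    // actually blocks. If the loop exits without having blocked, a pending
    // registration must be undone with cancel_wait().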
    bool slept = false;
    internal::concurrent_monitor& mq = mutex->waitq;
    mq.prepare_wait( thr_ctx, uintptr_t(this) );
    while( going==0ul ) {
        slept = mq.commit_wait( thr_ctx );
        if( slept && going!=0ul )
            break;
        slept = false;
        mq.prepare_wait( thr_ctx, uintptr_t(this) );
    }
    if( !slept )
        mq.cancel_wait( thr_ctx );
}

//! Spin lock with concurrent_monitor; to test concurrent_monitor::notify_all() and concurrent_monitor::notify()
class SpinMutex {
public:
    //! Construct unacquired mutex.
    SpinMutex() : toggle(false) { flag = 0; }

    //! The scoped locking pattern
    class ScopedLock: internal::no_copy {
        void Initialize() { mutex = NULL; }
    public:
        ScopedLock() {Initialize();}
        ScopedLock( SpinMutex& m, size_t test_mode ) { Initialize(); Acquire(m,test_mode); }
        ~ScopedLock() { if( mutex ) Release(); }
        void Acquire( SpinMutex& m, size_t test_mode );
        void Release();
        void SleepPerhaps();

    private:
        SpinMutex* mutex;
        internal::concurrent_monitor::thread_context thr_ctx;
    };

    friend class ScopedLock;
    friend struct SpinMutex_Until;
private:
    tbb::atomic<unsigned> flag;
    bool toggle;
    internal::concurrent_monitor waitq;
};

struct SpinMutex_Context {
    const SpinMutex::ScopedLock* lck;
    SpinMutex_Context( SpinMutex::ScopedLock* l_ ) : lck(l_) {}
    uintptr_t operator()() { return uintptr_t(lck); }
};

struct SpinMutex_Until {
    const SpinMutex* mtx;
    SpinMutex_Until( SpinMutex* m_ ) : mtx(m_) {}
    bool operator()() { return mtx->flag==0; }
};

//! A method to acquire SpinMutex lock
void SpinMutex::ScopedLock::Acquire( SpinMutex& m, size_t test_mode )
{
    mutex = &m;
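    // A woken waiter is not guaranteed the lock: another thread may grab it
    // first, so retry the compare_and_swap until this thread wins.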
retry:
    if( m.flag.compare_and_swap( 1, 0 )!=0 ) {
        int x = int( test_mode%3 );
        switch( x ) {
        case 0:
            mutex->waitq.wait( SpinMutex_Until(mutex), SpinMutex_Context(this) );
            break;
#if __TBB_CPP11_LAMBDAS_PRESENT
        case 1:
            mutex->waitq.wait( [&](){ return mutex->flag==0; }, [=]() { return (uintptr_t)this; } );
            break;
#endif
        default:
            SleepPerhaps();
            break;
        }
        goto retry;
    }
}

//! A method to release SpinMutex lock
void SpinMutex::ScopedLock::Release()
{
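    // Alternate between notify_one() and notify_all() on successive releases
    // so that both wakeup paths of concurrent_monitor are exercised.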
    bool old_toggle = mutex->toggle;
    mutex->toggle = !mutex->toggle;
    mutex->flag = 0;
    if( old_toggle )
        mutex->waitq.notify_one();
    else
        mutex->waitq.notify_all();
}

//! Yield and block; go to sleep
void SpinMutex::ScopedLock::SleepPerhaps()
{
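    // Unlike the QueuingMutex variant, this loop exits as soon as a wait was
    // committed; the lock condition itself is re-checked by the retry loop in
    // Acquire().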
    bool slept = false;
    internal::concurrent_monitor& mq = mutex->waitq;
    mq.prepare_wait( thr_ctx, uintptr_t(this) );
    while( mutex->flag ) {
        slept = mq.commit_wait( thr_ctx );
        if( slept )
            break;
        mq.prepare_wait( thr_ctx, uintptr_t(this) );
    }
    if( !slept )
        mq.cancel_wait( thr_ctx );
}

//! A value protected by a mutex.
template<typename M>
struct Counter {
    typedef M mutex_type;
    M mutex;
    long value;
};

//! Function object for use with parallel_for.h.
template<typename C, int D>
struct AddOne: NoAssign {
    C& counter;
    /** Increments counter once for each iteration in the iteration space. */
    void operator()( tbb::blocked_range<size_t>& range ) const {
        for( size_t i=range.begin(); i!=range.end(); ++i ) {
            typename C::mutex_type::ScopedLock lock(counter.mutex, i);
            counter.value = counter.value+1;
            if( D>0 )
                for( int j=0; j<D; ++j ) __TBB_Yield();
        }
    }
    AddOne( C& counter_ ) : counter(counter_) {}
};

//! Generic test with TBB mutex type M, max range R, and delay D.
template<typename M,int R, int D>
void Test( int p ) {
    Counter<M> counter;
    counter.value = 0;
    const int n = R;
    tbb::task_scheduler_init init(p);
    tbb::parallel_for(tbb::blocked_range<size_t>(0,n,n/10),AddOne<Counter<M>,D>(counter));
    if( counter.value!=n )
        REPORT("ERROR : counter.value=%ld (instead of %ld)\n",counter.value,long(n));
}

#if TBB_USE_EXCEPTIONS
#define NTHRS_USED_IN_DESTRUCTOR_TEST 8

atomic<size_t> n_sleepers;

#if defined(_MSC_VER) && defined(_Wp64)
    // Workaround for overzealous compiler warnings in /Wp64 mode
    #pragma warning (disable: 4244 4267)
#endif

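// Body for the destructor test: thread 0 destroys the monitor while all other
// threads are blocked in it. Destroying a concurrent_monitor aborts its
// waiters, so the sleepers are expected to be woken with a tbb::user_abort
// exception rather than left hanging.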
struct AllButOneSleep : NoAssign {
    internal::concurrent_monitor*& mon;
    static const size_t VLN = 1024*1024;
    void operator()( int i ) const {
        internal::concurrent_monitor::thread_context thr_ctx;

        if( i==0 ) {
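            // Thread 0: wait until the other threads have registered as
            // sleepers, then raise n_sleepers far past VLN so that no sleeper
            // re-enters its wait loop, give them time to actually block, and
            // destroy the monitor out from under them.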
            size_t n_expected_sleepers = NTHRS_USED_IN_DESTRUCTOR_TEST-1;
            while( n_sleepers<n_expected_sleepers )
                __TBB_Yield();
            while( n_sleepers.compare_and_swap( VLN+NTHRS_USED_IN_DESTRUCTOR_TEST, n_expected_sleepers )!=n_expected_sleepers )
                __TBB_Yield();

            for( int j=0; j<100; ++j )
                Harness::Sleep( 1 );
            delete mon;
            mon = NULL;
        } else {
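            // Sleeper threads: park in the monitor until either the counter
            // is pushed past VLN by thread 0 or the monitor's destruction
            // aborts the wait with tbb::user_abort.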
            mon->prepare_wait( thr_ctx, uintptr_t(this) );
            while( n_sleepers<VLN ) {
                try {
                    ++n_sleepers;
                    mon->commit_wait( thr_ctx );
                    if( --n_sleepers>VLN )
                        break;
                } catch( tbb::user_abort& ) {
                    // can no longer access 'mon'
                    break;
                }
                mon->prepare_wait( thr_ctx, uintptr_t(this) );
            }
        }
    }
    AllButOneSleep( internal::concurrent_monitor*& m_ ) : mon(m_) {}
};
#endif /* TBB_USE_EXCEPTIONS */

void TestDestructor() {
#if TBB_USE_EXCEPTIONS
    tbb::task_scheduler_init init(NTHRS_USED_IN_DESTRUCTOR_TEST);
    internal::concurrent_monitor* my_mon = new internal::concurrent_monitor;
    REMARK( "testing the destructor\n" );
    n_sleepers = 0;
    NativeParallelFor(NTHRS_USED_IN_DESTRUCTOR_TEST,AllButOneSleep(my_mon));
    ASSERT( my_mon==NULL, "my_mon should have been deleted and reset by thread 0" );
#endif /* TBB_USE_EXCEPTIONS */
}

int TestMain () {
    for( int p=MinThread; p<=MaxThread; ++p ) {
        REMARK( "testing with %d workers\n", p );
        // test the predicated notify
        Test<QueuingMutex,100000,0>( p );
        Test<QueuingMutex,1000,10000>( p );
        // test notify_one() and notify_all()
        Test<SpinMutex,100000,0>( p );
        Test<SpinMutex,1000,10000>( p );
        REMARK( "calling destructor for task_scheduler_init\n" );
    }
    TestDestructor();
    return Harness::Done;
}
366