1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17//
18// Test for counting semaphore.
19//
20// set semaphore to N
21// create N + M threads
22// have each thread
23// A. P()
24// B. increment atomic count
// C. record the resulting count, then sleep for a while; the recorded maximum must not exceed N
26// D. decrement atomic count
27// E. V()
28//
29
30#include "../tbb/semaphore.h"
31#include "tbb/atomic.h"
32#include "tbb/blocked_range.h"
33
34#include <vector>
35using std::vector;
36
37#include "harness_assert.h"
38#include "harness.h"
39
40using tbb::internal::semaphore;
41
42#include "harness_barrier.h"
43
// Number of threads currently between P() and V() in Body::operator():
// incremented right after the semaphore is acquired and decremented right
// before it is released.
tbb::atomic<int> pCount;

// Start barrier shared by the tests in this file. It is re-initialized with the
// participating thread count by Body's constructor and by testProducerConsumer
// before their threads are launched.
Harness::SpinBarrier sBarrier;
47
48#include "tbb/tick_count.h"
49// semaphore basic function:
50// set semaphore to initial value
51// see that semaphore only allows that number of threads to be active
52class Body: NoAssign {
53 const int nIters;
54 tbb::internal::semaphore &mySem;
55 vector<int> &ourCounts;
56 vector<double> &tottime;
57 static const int tickCounts = 1; // millisecond
58 static const int innerWait = 5; // millisecond
59public:
60 Body(int nThread_, int nIter_, semaphore &mySem_,
61 vector<int>& ourCounts_,
62 vector<double>& tottime_
63 ) : nIters(nIter_), mySem(mySem_), ourCounts(ourCounts_), tottime(tottime_) { sBarrier.initialize(nThread_); pCount = 0; }
64void operator()(const int tid) const {
65 sBarrier.wait();
66 for(int i=0; i < nIters; ++i) {
67 Harness::Sleep( tid * tickCounts );
68 tbb::tick_count t0 = tbb::tick_count::now();
69 mySem.P();
70 tbb::tick_count t1 = tbb::tick_count::now();
71 tottime[tid] += (t1-t0).seconds();
72 int curval = ++pCount;
73 if(curval > ourCounts[tid]) ourCounts[tid] = curval;
74 Harness::Sleep( innerWait );
75 --pCount;
76 ASSERT((int)pCount >= 0, NULL);
77 mySem.V();
78 }
79}
80};
81
82
83void testSemaphore( int semInitCnt, int extraThreads ) {
84 semaphore my_sem(semInitCnt);
85 // tbb::task_scheduler_init init(tbb::task_scheduler_init::deferred);
86 int nThreads = semInitCnt + extraThreads;
87 vector<int> maxVals(nThreads);
88 vector<double> totTimes(nThreads);
89 int nIters = 10;
90 Body myBody(nThreads, nIters, my_sem, maxVals, totTimes);
91
92 REMARK( " sem(%d) with %d extra threads\n", semInitCnt, extraThreads);
93 pCount = 0;
94 NativeParallelFor(nThreads, myBody);
95 if(extraThreads == 0) {
96 double allPWaits = 0;
97 for(vector<double>::const_iterator j = totTimes.begin(); j != totTimes.end(); ++j) {
98 allPWaits += *j;
99 }
100 allPWaits /= static_cast<double>(nThreads * nIters);
101 REMARK("Average wait for P() in uncontested case for nThreads = %d is %g\n", nThreads, allPWaits);
102 }
103 ASSERT(!pCount, "not all threads decremented pCount");
104 int maxCount = -1;
105 for(vector<int>::const_iterator i=maxVals.begin(); i!= maxVals.end();++i) {
106 maxCount = max(maxCount,*i);
107 }
108 ASSERT(maxCount <= semInitCnt,"too many threads in semaphore-protected increment");
109 if(maxCount < semInitCnt) {
110 REMARK("Not enough threads in semaphore-protected region (%d < %d)\n", static_cast<int>(maxCount), semInitCnt);
111 }
112}
113
114#include "../tbb/semaphore.cpp"
115#if _WIN32||_WIN64
116#include "../tbb/dynamic_link.cpp"
117
// When SRW-lock support is compiled in and the OS reports major version 6 or
// later, verifies that constructing a binary_semaphore has switched the
// binary-semaphore entry points away from the event-based fallbacks.
void testOSVersion() {
#if __TBB_USE_SRWLOCK
    BOOL bIsWindowsVistaOrLater;
#if __TBB_WIN8UI_SUPPORT
    bIsWindowsVistaOrLater = true;
#else
    OSVERSIONINFO osvi;
    memset( &osvi, 0, sizeof(osvi) );
    osvi.dwOSVersionInfoSize = sizeof(osvi);
    GetVersionEx(&osvi);
    bIsWindowsVistaOrLater = (osvi.dwMajorVersion >= 6 );
#endif

    if( !bIsWindowsVistaOrLater )
        return;

    REMARK("Checking SRWLock is loaded\n");
    // Constructing a semaphore before the checks, as the original test does.
    tbb::internal::binary_semaphore s;
    ASSERT( (uintptr_t)tbb::internal::__TBB_init_binsem!=(uintptr_t)&tbb::internal::init_binsem_using_event, NULL );
    ASSERT( (uintptr_t)tbb::internal::__TBB_acquire_binsem!=(uintptr_t)&tbb::internal::acquire_binsem_using_event, NULL );
    ASSERT( (uintptr_t)tbb::internal::__TBB_release_binsem!=(uintptr_t)&tbb::internal::release_binsem_using_event, NULL );
#endif /* __TBB_USE_SRWLOCK */
}
141#endif /* _WIN32||_WIN64 */
142
// Number of protected increments each thread performs in the binary semaphore test.
#define N_TIMES 1000

// A plain counter bundled with the semaphore (of type S) that guards it.
// The counter starts at zero; the semaphore is default-constructed.
template<typename S>
struct Counter {
    volatile long value;    // updated only between my_sem.P() and my_sem.V()
    S my_sem;

    Counter()
        : value(0)
    {}
};
151
152//! Function object for use with parallel_for.h.
153template<typename C>
154struct AddOne: NoAssign {
155 C& my_counter;
156 /** Increments counter once for each iteration in the iteration space. */
157 void operator()( int /*tid*/ ) const {
158 for( size_t i=0; i<N_TIMES; ++i ) {
159 my_counter.my_sem.P();
160 my_counter.value = my_counter.value + 1;
161 my_counter.my_sem.V();
162 }
163 }
164 AddOne( C& c_ ) : my_counter(c_) { my_counter.my_sem.V(); }
165};
166
167void testBinarySemaphore( int nThreads ) {
168 REMARK("Testing binary semaphore\n");
169 Counter<tbb::internal::binary_semaphore> counter;
170 AddOne<Counter<tbb::internal::binary_semaphore> > myAddOne(counter);
171 NativeParallelFor( nThreads, myAddOne );
172 ASSERT( nThreads*N_TIMES==counter.value, "Binary semaphore operations P()/V() have a race");
173}
174
175// Power of 2, the most tokens that can be in flight.
176#define MAX_TOKENS 32
177enum FilterType { imaProducer, imaConsumer };
178class FilterBase : NoAssign {
179protected:
180 FilterType ima;
181 unsigned totTokens; // total number of tokens to be emitted, only used by producer
182 tbb::atomic<unsigned>& myTokens;
183 tbb::atomic<unsigned>& otherTokens;
184 unsigned myWait;
185 semaphore &mySem;
186 semaphore &nextSem;
187 unsigned* myBuffer;
188 unsigned* nextBuffer;
189 unsigned curToken;
190public:
191 FilterBase( FilterType ima_
192 ,unsigned totTokens_
193 ,tbb::atomic<unsigned>& myTokens_
194 ,tbb::atomic<unsigned>& otherTokens_
195 ,unsigned myWait_
196 ,semaphore &mySem_
197 ,semaphore &nextSem_
198 ,unsigned* myBuffer_
199 ,unsigned* nextBuffer_
200 )
201 : ima(ima_),totTokens(totTokens_),myTokens(myTokens_),otherTokens(otherTokens_),myWait(myWait_),mySem(mySem_),
202 nextSem(nextSem_),myBuffer(myBuffer_),nextBuffer(nextBuffer_)
203 {
204 curToken = 0;
205 }
206 void Produce(const int tid);
207 void Consume(const int tid);
208 void operator()(const int tid) { if(ima == imaConsumer) Consume(tid); else Produce(tid); }
209};
210
211class ProduceConsumeBody {
212 FilterBase** myFilters;
213 public:
214 ProduceConsumeBody(FilterBase** myFilters_) : myFilters(myFilters_) {}
215 void operator()(const int tid) const {
216 myFilters[tid]->operator()(tid);
217 }
218};
219
220// send a bunch of non-Null "tokens" to consumer, then a NULL.
221void FilterBase::Produce(const int /*tid*/) {
222 nextBuffer[0] = 0; // just in case we provide no tokens
223 sBarrier.wait();
224 while(totTokens) {
225 while(!myTokens)
226 mySem.P();
227 // we have a slot available.
228 --myTokens; // moving this down reduces spurious wakeups
229 --totTokens;
230 if(totTokens)
231 nextBuffer[curToken&(MAX_TOKENS-1)] = curToken*3+1;
232 else
233 nextBuffer[curToken&(MAX_TOKENS-1)] = 0;
234 ++curToken;
235 Harness::Sleep(myWait);
236 unsigned temp = ++otherTokens;
237 if(temp == 1)
238 nextSem.V();
239 }
240 nextSem.V(); // final wakeup
241}
242
243void FilterBase::Consume(const int /*tid*/) {
244 unsigned myToken;
245 sBarrier.wait();
246 do {
247 while(!myTokens)
248 mySem.P();
249 // we have a slot available.
250 --myTokens; // moving this down reduces spurious wakeups
251 myToken = myBuffer[curToken&(MAX_TOKENS-1)];
252 if(myToken) {
253 ASSERT(myToken == curToken*3+1, "Error in received token");
254 ++curToken;
255 Harness::Sleep(myWait);
256 unsigned temp = ++otherTokens;
257 if(temp == 1)
258 nextSem.V();
259 }
260 } while(myToken);
261 // end of processing
262 ASSERT(curToken + 1 == totTokens, "Didn't receive enough tokens");
263}
264
265// -- test of producer/consumer with atomic buffer cnt and semaphore
266// nTokens are total number of tokens through the pipe
267// pWait is the wait time for the producer
268// cWait is the wait time for the consumer
269void testProducerConsumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait) {
270 semaphore pSem;
271 semaphore cSem;
272 tbb::atomic<unsigned> pTokens;
273 tbb::atomic<unsigned> cTokens;
274 cTokens = 0;
275 unsigned cBuffer[MAX_TOKENS];
276 FilterBase* myFilters[2]; // one producer, one consumer
277 REMARK("Testing producer/consumer with %lu total tokens, %lu tokens at a time, producer wait(%lu), consumer wait (%lu)\n", totTokens, nTokens, pWait, cWait);
278 ASSERT(nTokens <= MAX_TOKENS, "Not enough slots for tokens");
279 myFilters[0] = new FilterBase(imaProducer, totTokens, pTokens, cTokens, pWait, cSem, pSem, (unsigned *)NULL, &(cBuffer[0]));
280 myFilters[1] = new FilterBase(imaConsumer, totTokens, cTokens, pTokens, cWait, pSem, cSem, cBuffer, (unsigned *)NULL);
281 pTokens = nTokens;
282 ProduceConsumeBody myBody(myFilters);
283 sBarrier.initialize(2);
284 NativeParallelFor(2, myBody);
285 delete myFilters[0];
286 delete myFilters[1];
287}
288
289int TestMain() {
290 REMARK("Started\n");
291#if _WIN32||_WIN64
292 testOSVersion();
293#endif
294 if(MaxThread > 0) {
295 testBinarySemaphore( MaxThread );
296 for(int semSize = 1; semSize <= MaxThread; ++semSize) {
297 for(int exThreads = 0; exThreads <= MaxThread - semSize; ++exThreads) {
298 testSemaphore( semSize, exThreads );
299 }
300 }
301 }
302 // Test producer/consumer with varying execution times and buffer sizes
303 // ( total tokens, tokens in buffer, sleep for producer, sleep for consumer )
304 testProducerConsumer( 10, 2, 5, 5 );
305 testProducerConsumer( 10, 2, 20, 5 );
306 testProducerConsumer( 10, 2, 5, 20 );
307 testProducerConsumer( 10, 1, 5, 5 );
308 testProducerConsumer( 20, 10, 5, 20 );
309 testProducerConsumer( 64, 32, 1, 20 );
310 return Harness::Done;
311}
312