1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | // |
18 | // Test for counting semaphore. |
19 | // |
20 | // set semaphore to N |
21 | // create N + M threads |
22 | // have each thread |
23 | // A. P() |
24 | // B. increment atomic count |
//     C. spin for a while checking the value of the count; make sure it doesn't exceed N
26 | // D. decrement atomic count |
27 | // E. V() |
28 | // |
29 | |
30 | #include "../tbb/semaphore.h" |
31 | #include "tbb/atomic.h" |
32 | #include "tbb/blocked_range.h" |
33 | |
34 | #include <vector> |
35 | using std::vector; |
36 | |
37 | #include "harness_assert.h" |
38 | #include "harness.h" |
39 | |
40 | using tbb::internal::semaphore; |
41 | |
42 | #include "harness_barrier.h" |
43 | |
// Number of threads currently between P() and V() in Body::operator():
// incremented right after P() returns and decremented right before V().
tbb::atomic<int> pCount;

// Barrier the worker bodies in this file wait on before starting their loops.
// It is (re)initialized with the participant count before each parallel run.
Harness::SpinBarrier sBarrier;
47 | |
48 | #include "tbb/tick_count.h" |
49 | // semaphore basic function: |
50 | // set semaphore to initial value |
51 | // see that semaphore only allows that number of threads to be active |
52 | class Body: NoAssign { |
53 | const int nIters; |
54 | tbb::internal::semaphore &mySem; |
55 | vector<int> &ourCounts; |
56 | vector<double> &tottime; |
57 | static const int tickCounts = 1; // millisecond |
58 | static const int innerWait = 5; // millisecond |
59 | public: |
60 | Body(int nThread_, int nIter_, semaphore &mySem_, |
61 | vector<int>& ourCounts_, |
62 | vector<double>& tottime_ |
63 | ) : nIters(nIter_), mySem(mySem_), ourCounts(ourCounts_), tottime(tottime_) { sBarrier.initialize(nThread_); pCount = 0; } |
64 | void operator()(const int tid) const { |
65 | sBarrier.wait(); |
66 | for(int i=0; i < nIters; ++i) { |
67 | Harness::Sleep( tid * tickCounts ); |
68 | tbb::tick_count t0 = tbb::tick_count::now(); |
69 | mySem.P(); |
70 | tbb::tick_count t1 = tbb::tick_count::now(); |
71 | tottime[tid] += (t1-t0).seconds(); |
72 | int curval = ++pCount; |
73 | if(curval > ourCounts[tid]) ourCounts[tid] = curval; |
74 | Harness::Sleep( innerWait ); |
75 | --pCount; |
76 | ASSERT((int)pCount >= 0, NULL); |
77 | mySem.V(); |
78 | } |
79 | } |
80 | }; |
81 | |
82 | |
83 | void testSemaphore( int semInitCnt, int ) { |
84 | semaphore my_sem(semInitCnt); |
85 | // tbb::task_scheduler_init init(tbb::task_scheduler_init::deferred); |
86 | int nThreads = semInitCnt + extraThreads; |
87 | vector<int> maxVals(nThreads); |
88 | vector<double> totTimes(nThreads); |
89 | int nIters = 10; |
90 | Body myBody(nThreads, nIters, my_sem, maxVals, totTimes); |
91 | |
92 | REMARK( " sem(%d) with %d extra threads\n" , semInitCnt, extraThreads); |
93 | pCount = 0; |
94 | NativeParallelFor(nThreads, myBody); |
95 | if(extraThreads == 0) { |
96 | double allPWaits = 0; |
97 | for(vector<double>::const_iterator j = totTimes.begin(); j != totTimes.end(); ++j) { |
98 | allPWaits += *j; |
99 | } |
100 | allPWaits /= static_cast<double>(nThreads * nIters); |
101 | REMARK("Average wait for P() in uncontested case for nThreads = %d is %g\n" , nThreads, allPWaits); |
102 | } |
103 | ASSERT(!pCount, "not all threads decremented pCount" ); |
104 | int maxCount = -1; |
105 | for(vector<int>::const_iterator i=maxVals.begin(); i!= maxVals.end();++i) { |
106 | maxCount = max(maxCount,*i); |
107 | } |
108 | ASSERT(maxCount <= semInitCnt,"too many threads in semaphore-protected increment" ); |
109 | if(maxCount < semInitCnt) { |
110 | REMARK("Not enough threads in semaphore-protected region (%d < %d)\n" , static_cast<int>(maxCount), semInitCnt); |
111 | } |
112 | } |
113 | |
114 | #include "../tbb/semaphore.cpp" |
115 | #if _WIN32||_WIN64 |
116 | #include "../tbb/dynamic_link.cpp" |
117 | |
// When built with __TBB_USE_SRWLOCK, checks on Windows Vista or later that the
// binary-semaphore function pointers do not point at the event-based
// implementations. Without __TBB_USE_SRWLOCK this function does nothing.
void testOSVersion() {
#if __TBB_USE_SRWLOCK
#if __TBB_WIN8UI_SUPPORT
    BOOL vistaOrLater = true;
#else
    // Query the OS version; major version 6 or higher is treated as Vista or later.
    OSVERSIONINFO osvi;
    memset( (void*)&osvi, 0, sizeof(osvi) );
    osvi.dwOSVersionInfoSize = sizeof(osvi);
    GetVersionEx(&osvi);
    BOOL vistaOrLater = (osvi.dwMajorVersion >= 6 );
#endif

    if( !vistaOrLater )
        return;

    REMARK("Checking SRWLock is loaded\n");
    // A binary_semaphore is constructed before the pointers are inspected.
    tbb::internal::binary_semaphore s;
    ASSERT( (uintptr_t)tbb::internal::__TBB_init_binsem!=(uintptr_t)&tbb::internal::init_binsem_using_event, NULL );
    ASSERT( (uintptr_t)tbb::internal::__TBB_acquire_binsem!=(uintptr_t)&tbb::internal::acquire_binsem_using_event, NULL );
    ASSERT( (uintptr_t)tbb::internal::__TBB_release_binsem!=(uintptr_t)&tbb::internal::release_binsem_using_event, NULL );
#endif /* __TBB_USE_SRWLOCK */
}
141 | #endif /* _WIN32||_WIN64 */ |
142 | |
// Number of P()/increment/V() rounds each thread performs in AddOne.
#define N_TIMES 1000
144 | |
//! A counter bundled with the semaphore (of type S) used to guard it.
/** The counter starts at zero; the semaphore is default-constructed. */
template<typename S>
struct Counter {
    volatile long value;    // running total, updated between P() and V()
    S my_sem;               // semaphore guarding `value`

    Counter()
        : value(0)
    {}
};
151 | |
152 | //! Function object for use with parallel_for.h. |
153 | template<typename C> |
154 | struct AddOne: NoAssign { |
155 | C& my_counter; |
156 | /** Increments counter once for each iteration in the iteration space. */ |
157 | void operator()( int /*tid*/ ) const { |
158 | for( size_t i=0; i<N_TIMES; ++i ) { |
159 | my_counter.my_sem.P(); |
160 | my_counter.value = my_counter.value + 1; |
161 | my_counter.my_sem.V(); |
162 | } |
163 | } |
164 | AddOne( C& c_ ) : my_counter(c_) { my_counter.my_sem.V(); } |
165 | }; |
166 | |
167 | void testBinarySemaphore( int nThreads ) { |
168 | REMARK("Testing binary semaphore\n" ); |
169 | Counter<tbb::internal::binary_semaphore> counter; |
170 | AddOne<Counter<tbb::internal::binary_semaphore> > myAddOne(counter); |
171 | NativeParallelFor( nThreads, myAddOne ); |
172 | ASSERT( nThreads*N_TIMES==counter.value, "Binary semaphore operations P()/V() have a race" ); |
173 | } |
174 | |
// The most tokens that can be in flight at once; also the size of the token
// buffer. Must be a power of 2 because buffer slots are selected with
// `index & (MAX_TOKENS-1)`.
#define MAX_TOKENS 32

// Role a FilterBase instance plays in the producer/consumer test.
enum FilterType {
    imaProducer,
    imaConsumer
};
178 | class FilterBase : NoAssign { |
179 | protected: |
180 | FilterType ima; |
181 | unsigned totTokens; // total number of tokens to be emitted, only used by producer |
182 | tbb::atomic<unsigned>& myTokens; |
183 | tbb::atomic<unsigned>& otherTokens; |
184 | unsigned myWait; |
185 | semaphore &mySem; |
186 | semaphore &nextSem; |
187 | unsigned* myBuffer; |
188 | unsigned* nextBuffer; |
189 | unsigned curToken; |
190 | public: |
191 | FilterBase( FilterType ima_ |
192 | ,unsigned totTokens_ |
193 | ,tbb::atomic<unsigned>& myTokens_ |
194 | ,tbb::atomic<unsigned>& otherTokens_ |
195 | ,unsigned myWait_ |
196 | ,semaphore &mySem_ |
197 | ,semaphore &nextSem_ |
198 | ,unsigned* myBuffer_ |
199 | ,unsigned* nextBuffer_ |
200 | ) |
201 | : ima(ima_),totTokens(totTokens_),myTokens(myTokens_),otherTokens(otherTokens_),myWait(myWait_),mySem(mySem_), |
202 | nextSem(nextSem_),myBuffer(myBuffer_),nextBuffer(nextBuffer_) |
203 | { |
204 | curToken = 0; |
205 | } |
206 | void Produce(const int tid); |
207 | void Consume(const int tid); |
208 | void operator()(const int tid) { if(ima == imaConsumer) Consume(tid); else Produce(tid); } |
209 | }; |
210 | |
211 | class ProduceConsumeBody { |
212 | FilterBase** myFilters; |
213 | public: |
214 | ProduceConsumeBody(FilterBase** myFilters_) : myFilters(myFilters_) {} |
215 | void operator()(const int tid) const { |
216 | myFilters[tid]->operator()(tid); |
217 | } |
218 | }; |
219 | |
220 | // send a bunch of non-Null "tokens" to consumer, then a NULL. |
221 | void FilterBase::Produce(const int /*tid*/) { |
222 | nextBuffer[0] = 0; // just in case we provide no tokens |
223 | sBarrier.wait(); |
224 | while(totTokens) { |
225 | while(!myTokens) |
226 | mySem.P(); |
227 | // we have a slot available. |
228 | --myTokens; // moving this down reduces spurious wakeups |
229 | --totTokens; |
230 | if(totTokens) |
231 | nextBuffer[curToken&(MAX_TOKENS-1)] = curToken*3+1; |
232 | else |
233 | nextBuffer[curToken&(MAX_TOKENS-1)] = 0; |
234 | ++curToken; |
235 | Harness::Sleep(myWait); |
236 | unsigned temp = ++otherTokens; |
237 | if(temp == 1) |
238 | nextSem.V(); |
239 | } |
240 | nextSem.V(); // final wakeup |
241 | } |
242 | |
243 | void FilterBase::Consume(const int /*tid*/) { |
244 | unsigned myToken; |
245 | sBarrier.wait(); |
246 | do { |
247 | while(!myTokens) |
248 | mySem.P(); |
249 | // we have a slot available. |
250 | --myTokens; // moving this down reduces spurious wakeups |
251 | myToken = myBuffer[curToken&(MAX_TOKENS-1)]; |
252 | if(myToken) { |
253 | ASSERT(myToken == curToken*3+1, "Error in received token" ); |
254 | ++curToken; |
255 | Harness::Sleep(myWait); |
256 | unsigned temp = ++otherTokens; |
257 | if(temp == 1) |
258 | nextSem.V(); |
259 | } |
260 | } while(myToken); |
261 | // end of processing |
262 | ASSERT(curToken + 1 == totTokens, "Didn't receive enough tokens" ); |
263 | } |
264 | |
265 | // -- test of producer/consumer with atomic buffer cnt and semaphore |
266 | // nTokens are total number of tokens through the pipe |
267 | // pWait is the wait time for the producer |
268 | // cWait is the wait time for the consumer |
269 | void testProducerConsumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait) { |
270 | semaphore pSem; |
271 | semaphore cSem; |
272 | tbb::atomic<unsigned> pTokens; |
273 | tbb::atomic<unsigned> cTokens; |
274 | cTokens = 0; |
275 | unsigned cBuffer[MAX_TOKENS]; |
276 | FilterBase* myFilters[2]; // one producer, one consumer |
277 | REMARK("Testing producer/consumer with %lu total tokens, %lu tokens at a time, producer wait(%lu), consumer wait (%lu)\n" , totTokens, nTokens, pWait, cWait); |
278 | ASSERT(nTokens <= MAX_TOKENS, "Not enough slots for tokens" ); |
279 | myFilters[0] = new FilterBase(imaProducer, totTokens, pTokens, cTokens, pWait, cSem, pSem, (unsigned *)NULL, &(cBuffer[0])); |
280 | myFilters[1] = new FilterBase(imaConsumer, totTokens, cTokens, pTokens, cWait, pSem, cSem, cBuffer, (unsigned *)NULL); |
281 | pTokens = nTokens; |
282 | ProduceConsumeBody myBody(myFilters); |
283 | sBarrier.initialize(2); |
284 | NativeParallelFor(2, myBody); |
285 | delete myFilters[0]; |
286 | delete myFilters[1]; |
287 | } |
288 | |
289 | int TestMain() { |
290 | REMARK("Started\n" ); |
291 | #if _WIN32||_WIN64 |
292 | testOSVersion(); |
293 | #endif |
294 | if(MaxThread > 0) { |
295 | testBinarySemaphore( MaxThread ); |
296 | for(int semSize = 1; semSize <= MaxThread; ++semSize) { |
297 | for(int exThreads = 0; exThreads <= MaxThread - semSize; ++exThreads) { |
298 | testSemaphore( semSize, exThreads ); |
299 | } |
300 | } |
301 | } |
302 | // Test producer/consumer with varying execution times and buffer sizes |
303 | // ( total tokens, tokens in buffer, sleep for producer, sleep for consumer ) |
304 | testProducerConsumer( 10, 2, 5, 5 ); |
305 | testProducerConsumer( 10, 2, 20, 5 ); |
306 | testProducerConsumer( 10, 2, 5, 20 ); |
307 | testProducerConsumer( 10, 1, 5, 5 ); |
308 | testProducerConsumer( 20, 10, 5, 20 ); |
309 | testProducerConsumer( 64, 32, 1, 20 ); |
310 | return Harness::Done; |
311 | } |
312 | |