| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | // |
| 18 | // Test for counting semaphore. |
| 19 | // |
| 20 | // set semaphore to N |
| 21 | // create N + M threads |
| 22 | // have each thread |
| 23 | // A. P() |
| 24 | // B. increment atomic count |
| 25 | // C. spin for awhile checking the value of the count; make sure it doesn't exceed N |
| 26 | // D. decrement atomic count |
| 27 | // E. V() |
| 28 | // |
| 29 | |
| 30 | #include "../tbb/semaphore.h" |
| 31 | #include "tbb/atomic.h" |
| 32 | #include "tbb/blocked_range.h" |
| 33 | |
| 34 | #include <vector> |
| 35 | using std::vector; |
| 36 | |
| 37 | #include "harness_assert.h" |
| 38 | #include "harness.h" |
| 39 | |
| 40 | using tbb::internal::semaphore; |
| 41 | |
| 42 | #include "harness_barrier.h" |
| 43 | |
| 44 | tbb::atomic<int> pCount; |
| 45 | |
| 46 | Harness::SpinBarrier sBarrier; |
| 47 | |
| 48 | #include "tbb/tick_count.h" |
| 49 | // semaphore basic function: |
| 50 | // set semaphore to initial value |
| 51 | // see that semaphore only allows that number of threads to be active |
| 52 | class Body: NoAssign { |
| 53 | const int nIters; |
| 54 | tbb::internal::semaphore &mySem; |
| 55 | vector<int> &ourCounts; |
| 56 | vector<double> &tottime; |
| 57 | static const int tickCounts = 1; // millisecond |
| 58 | static const int innerWait = 5; // millisecond |
| 59 | public: |
| 60 | Body(int nThread_, int nIter_, semaphore &mySem_, |
| 61 | vector<int>& ourCounts_, |
| 62 | vector<double>& tottime_ |
| 63 | ) : nIters(nIter_), mySem(mySem_), ourCounts(ourCounts_), tottime(tottime_) { sBarrier.initialize(nThread_); pCount = 0; } |
| 64 | void operator()(const int tid) const { |
| 65 | sBarrier.wait(); |
| 66 | for(int i=0; i < nIters; ++i) { |
| 67 | Harness::Sleep( tid * tickCounts ); |
| 68 | tbb::tick_count t0 = tbb::tick_count::now(); |
| 69 | mySem.P(); |
| 70 | tbb::tick_count t1 = tbb::tick_count::now(); |
| 71 | tottime[tid] += (t1-t0).seconds(); |
| 72 | int curval = ++pCount; |
| 73 | if(curval > ourCounts[tid]) ourCounts[tid] = curval; |
| 74 | Harness::Sleep( innerWait ); |
| 75 | --pCount; |
| 76 | ASSERT((int)pCount >= 0, NULL); |
| 77 | mySem.V(); |
| 78 | } |
| 79 | } |
| 80 | }; |
| 81 | |
| 82 | |
| 83 | void testSemaphore( int semInitCnt, int ) { |
| 84 | semaphore my_sem(semInitCnt); |
| 85 | // tbb::task_scheduler_init init(tbb::task_scheduler_init::deferred); |
| 86 | int nThreads = semInitCnt + extraThreads; |
| 87 | vector<int> maxVals(nThreads); |
| 88 | vector<double> totTimes(nThreads); |
| 89 | int nIters = 10; |
| 90 | Body myBody(nThreads, nIters, my_sem, maxVals, totTimes); |
| 91 | |
| 92 | REMARK( " sem(%d) with %d extra threads\n" , semInitCnt, extraThreads); |
| 93 | pCount = 0; |
| 94 | NativeParallelFor(nThreads, myBody); |
| 95 | if(extraThreads == 0) { |
| 96 | double allPWaits = 0; |
| 97 | for(vector<double>::const_iterator j = totTimes.begin(); j != totTimes.end(); ++j) { |
| 98 | allPWaits += *j; |
| 99 | } |
| 100 | allPWaits /= static_cast<double>(nThreads * nIters); |
| 101 | REMARK("Average wait for P() in uncontested case for nThreads = %d is %g\n" , nThreads, allPWaits); |
| 102 | } |
| 103 | ASSERT(!pCount, "not all threads decremented pCount" ); |
| 104 | int maxCount = -1; |
| 105 | for(vector<int>::const_iterator i=maxVals.begin(); i!= maxVals.end();++i) { |
| 106 | maxCount = max(maxCount,*i); |
| 107 | } |
| 108 | ASSERT(maxCount <= semInitCnt,"too many threads in semaphore-protected increment" ); |
| 109 | if(maxCount < semInitCnt) { |
| 110 | REMARK("Not enough threads in semaphore-protected region (%d < %d)\n" , static_cast<int>(maxCount), semInitCnt); |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | #include "../tbb/semaphore.cpp" |
| 115 | #if _WIN32||_WIN64 |
| 116 | #include "../tbb/dynamic_link.cpp" |
| 117 | |
// Windows only.  When __TBB_USE_SRWLOCK is enabled and the OS major version
// is at least 6 (or __TBB_WIN8UI_SUPPORT is set), asserts that the
// binary-semaphore entry points no longer point at the event-based
// implementations.  Compiles to an empty function otherwise.
void testOSVersion() {
#if __TBB_USE_SRWLOCK
    BOOL vistaOrLater;
#if __TBB_WIN8UI_SUPPORT
    vistaOrLater = true;
#else
    OSVERSIONINFO versionInfo;

    memset( (void*)&versionInfo, 0, sizeof(OSVERSIONINFO) );
    versionInfo.dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
    GetVersionEx(&versionInfo);
    vistaOrLater = (versionInfo.dwMajorVersion >= 6 );
#endif

    if( !vistaOrLater )
        return;

    REMARK("Checking SRWLock is loaded\n");
    // NOTE(review): presumably constructing a binary_semaphore is what
    // triggers selection of the implementation checked below - confirm.
    tbb::internal::binary_semaphore s;
    ASSERT( (uintptr_t)tbb::internal::__TBB_init_binsem!=(uintptr_t)&tbb::internal::init_binsem_using_event, NULL );
    ASSERT( (uintptr_t)tbb::internal::__TBB_acquire_binsem!=(uintptr_t)&tbb::internal::acquire_binsem_using_event, NULL );
    ASSERT( (uintptr_t)tbb::internal::__TBB_release_binsem!=(uintptr_t)&tbb::internal::release_binsem_using_event, NULL );
#endif /* __TBB_USE_SRWLOCK */
}
| 141 | #endif /* _WIN32||_WIN64 */ |
| 142 | |
| 143 | #define N_TIMES 1000 |
| 144 | |
// Shared state for the binary-semaphore test: a counter that starts at zero
// and a default-constructed semaphore of type Semaphore kept alongside it.
template<typename Semaphore>
struct Counter {
    volatile long value;   // incremented by the worker threads
    Semaphore my_sem;      // acquired/released around each increment

    Counter()
        : value(0)
    {}
};
| 151 | |
| 152 | //! Function object for use with parallel_for.h. |
| 153 | template<typename C> |
| 154 | struct AddOne: NoAssign { |
| 155 | C& my_counter; |
| 156 | /** Increments counter once for each iteration in the iteration space. */ |
| 157 | void operator()( int /*tid*/ ) const { |
| 158 | for( size_t i=0; i<N_TIMES; ++i ) { |
| 159 | my_counter.my_sem.P(); |
| 160 | my_counter.value = my_counter.value + 1; |
| 161 | my_counter.my_sem.V(); |
| 162 | } |
| 163 | } |
| 164 | AddOne( C& c_ ) : my_counter(c_) { my_counter.my_sem.V(); } |
| 165 | }; |
| 166 | |
| 167 | void testBinarySemaphore( int nThreads ) { |
| 168 | REMARK("Testing binary semaphore\n" ); |
| 169 | Counter<tbb::internal::binary_semaphore> counter; |
| 170 | AddOne<Counter<tbb::internal::binary_semaphore> > myAddOne(counter); |
| 171 | NativeParallelFor( nThreads, myAddOne ); |
| 172 | ASSERT( nThreads*N_TIMES==counter.value, "Binary semaphore operations P()/V() have a race" ); |
| 173 | } |
| 174 | |
| 175 | // Power of 2, the most tokens that can be in flight. |
| 176 | #define MAX_TOKENS 32 |
// Role a FilterBase instance plays in the producer/consumer test.
enum FilterType {
    imaProducer,    // operator() dispatches to FilterBase::Produce
    imaConsumer     // operator() dispatches to FilterBase::Consume
};
| 178 | class FilterBase : NoAssign { |
| 179 | protected: |
| 180 | FilterType ima; |
| 181 | unsigned totTokens; // total number of tokens to be emitted, only used by producer |
| 182 | tbb::atomic<unsigned>& myTokens; |
| 183 | tbb::atomic<unsigned>& otherTokens; |
| 184 | unsigned myWait; |
| 185 | semaphore &mySem; |
| 186 | semaphore &nextSem; |
| 187 | unsigned* myBuffer; |
| 188 | unsigned* nextBuffer; |
| 189 | unsigned curToken; |
| 190 | public: |
| 191 | FilterBase( FilterType ima_ |
| 192 | ,unsigned totTokens_ |
| 193 | ,tbb::atomic<unsigned>& myTokens_ |
| 194 | ,tbb::atomic<unsigned>& otherTokens_ |
| 195 | ,unsigned myWait_ |
| 196 | ,semaphore &mySem_ |
| 197 | ,semaphore &nextSem_ |
| 198 | ,unsigned* myBuffer_ |
| 199 | ,unsigned* nextBuffer_ |
| 200 | ) |
| 201 | : ima(ima_),totTokens(totTokens_),myTokens(myTokens_),otherTokens(otherTokens_),myWait(myWait_),mySem(mySem_), |
| 202 | nextSem(nextSem_),myBuffer(myBuffer_),nextBuffer(nextBuffer_) |
| 203 | { |
| 204 | curToken = 0; |
| 205 | } |
| 206 | void Produce(const int tid); |
| 207 | void Consume(const int tid); |
| 208 | void operator()(const int tid) { if(ima == imaConsumer) Consume(tid); else Produce(tid); } |
| 209 | }; |
| 210 | |
| 211 | class ProduceConsumeBody { |
| 212 | FilterBase** myFilters; |
| 213 | public: |
| 214 | ProduceConsumeBody(FilterBase** myFilters_) : myFilters(myFilters_) {} |
| 215 | void operator()(const int tid) const { |
| 216 | myFilters[tid]->operator()(tid); |
| 217 | } |
| 218 | }; |
| 219 | |
| 220 | // send a bunch of non-Null "tokens" to consumer, then a NULL. |
| 221 | void FilterBase::Produce(const int /*tid*/) { |
| 222 | nextBuffer[0] = 0; // just in case we provide no tokens |
| 223 | sBarrier.wait(); |
| 224 | while(totTokens) { |
| 225 | while(!myTokens) |
| 226 | mySem.P(); |
| 227 | // we have a slot available. |
| 228 | --myTokens; // moving this down reduces spurious wakeups |
| 229 | --totTokens; |
| 230 | if(totTokens) |
| 231 | nextBuffer[curToken&(MAX_TOKENS-1)] = curToken*3+1; |
| 232 | else |
| 233 | nextBuffer[curToken&(MAX_TOKENS-1)] = 0; |
| 234 | ++curToken; |
| 235 | Harness::Sleep(myWait); |
| 236 | unsigned temp = ++otherTokens; |
| 237 | if(temp == 1) |
| 238 | nextSem.V(); |
| 239 | } |
| 240 | nextSem.V(); // final wakeup |
| 241 | } |
| 242 | |
| 243 | void FilterBase::Consume(const int /*tid*/) { |
| 244 | unsigned myToken; |
| 245 | sBarrier.wait(); |
| 246 | do { |
| 247 | while(!myTokens) |
| 248 | mySem.P(); |
| 249 | // we have a slot available. |
| 250 | --myTokens; // moving this down reduces spurious wakeups |
| 251 | myToken = myBuffer[curToken&(MAX_TOKENS-1)]; |
| 252 | if(myToken) { |
| 253 | ASSERT(myToken == curToken*3+1, "Error in received token" ); |
| 254 | ++curToken; |
| 255 | Harness::Sleep(myWait); |
| 256 | unsigned temp = ++otherTokens; |
| 257 | if(temp == 1) |
| 258 | nextSem.V(); |
| 259 | } |
| 260 | } while(myToken); |
| 261 | // end of processing |
| 262 | ASSERT(curToken + 1 == totTokens, "Didn't receive enough tokens" ); |
| 263 | } |
| 264 | |
| 265 | // -- test of producer/consumer with atomic buffer cnt and semaphore |
| 266 | // nTokens are total number of tokens through the pipe |
| 267 | // pWait is the wait time for the producer |
| 268 | // cWait is the wait time for the consumer |
| 269 | void testProducerConsumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait) { |
| 270 | semaphore pSem; |
| 271 | semaphore cSem; |
| 272 | tbb::atomic<unsigned> pTokens; |
| 273 | tbb::atomic<unsigned> cTokens; |
| 274 | cTokens = 0; |
| 275 | unsigned cBuffer[MAX_TOKENS]; |
| 276 | FilterBase* myFilters[2]; // one producer, one consumer |
| 277 | REMARK("Testing producer/consumer with %lu total tokens, %lu tokens at a time, producer wait(%lu), consumer wait (%lu)\n" , totTokens, nTokens, pWait, cWait); |
| 278 | ASSERT(nTokens <= MAX_TOKENS, "Not enough slots for tokens" ); |
| 279 | myFilters[0] = new FilterBase(imaProducer, totTokens, pTokens, cTokens, pWait, cSem, pSem, (unsigned *)NULL, &(cBuffer[0])); |
| 280 | myFilters[1] = new FilterBase(imaConsumer, totTokens, cTokens, pTokens, cWait, pSem, cSem, cBuffer, (unsigned *)NULL); |
| 281 | pTokens = nTokens; |
| 282 | ProduceConsumeBody myBody(myFilters); |
| 283 | sBarrier.initialize(2); |
| 284 | NativeParallelFor(2, myBody); |
| 285 | delete myFilters[0]; |
| 286 | delete myFilters[1]; |
| 287 | } |
| 288 | |
| 289 | int TestMain() { |
| 290 | REMARK("Started\n" ); |
| 291 | #if _WIN32||_WIN64 |
| 292 | testOSVersion(); |
| 293 | #endif |
| 294 | if(MaxThread > 0) { |
| 295 | testBinarySemaphore( MaxThread ); |
| 296 | for(int semSize = 1; semSize <= MaxThread; ++semSize) { |
| 297 | for(int exThreads = 0; exThreads <= MaxThread - semSize; ++exThreads) { |
| 298 | testSemaphore( semSize, exThreads ); |
| 299 | } |
| 300 | } |
| 301 | } |
| 302 | // Test producer/consumer with varying execution times and buffer sizes |
| 303 | // ( total tokens, tokens in buffer, sleep for producer, sleep for consumer ) |
| 304 | testProducerConsumer( 10, 2, 5, 5 ); |
| 305 | testProducerConsumer( 10, 2, 20, 5 ); |
| 306 | testProducerConsumer( 10, 2, 5, 20 ); |
| 307 | testProducerConsumer( 10, 1, 5, 5 ); |
| 308 | testProducerConsumer( 20, 10, 5, 20 ); |
| 309 | testProducerConsumer( 64, 32, 1, 20 ); |
| 310 | return Harness::Done; |
| 311 | } |
| 312 | |