| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #include "old/concurrent_queue_v2.h" |
| 18 | #include "tbb/atomic.h" |
| 19 | #include "tbb/tick_count.h" |
| 20 | |
| 21 | #include "../test/harness_assert.h" |
| 22 | #include "../test/harness.h" |
| 23 | |
| 24 | static tbb::atomic<long> FooConstructed; |
| 25 | static tbb::atomic<long> FooDestroyed; |
| 26 | |
//! Liveness marker stored inside each Foo.
/** The values are distinctive bit patterns so that an object that was never
    constructed, or was already destroyed, is unlikely to look alive. */
enum state_t {
    LIVE = 0x1234,  // object has been constructed and not yet destroyed
    DEAD = 0xDEAD   // object has been destroyed (also used as a poison value for int fields)
};
| 31 | |
| 32 | class Foo { |
| 33 | state_t state; |
| 34 | public: |
| 35 | int thread_id; |
| 36 | int serial; |
| 37 | Foo() : state(LIVE) { |
| 38 | ++FooConstructed; |
| 39 | } |
| 40 | Foo( const Foo& item ) : state(LIVE) { |
| 41 | ASSERT( item.state==LIVE, NULL ); |
| 42 | ++FooConstructed; |
| 43 | thread_id = item.thread_id; |
| 44 | serial = item.serial; |
| 45 | } |
| 46 | ~Foo() { |
| 47 | ASSERT( state==LIVE, NULL ); |
| 48 | ++FooDestroyed; |
| 49 | state=DEAD; |
| 50 | thread_id=DEAD; |
| 51 | serial=DEAD; |
| 52 | } |
| 53 | void operator=( Foo& item ) { |
| 54 | ASSERT( item.state==LIVE, NULL ); |
| 55 | ASSERT( state==LIVE, NULL ); |
| 56 | thread_id = item.thread_id; |
| 57 | serial = item.serial; |
| 58 | } |
| 59 | bool is_const() {return false;} |
| 60 | bool is_const() const {return true;} |
| 61 | }; |
| 62 | |
| 63 | const size_t MAXTHREAD = 256; |
| 64 | |
| 65 | static int Sum[MAXTHREAD]; |
| 66 | |
| 67 | //! Count of various pop operations |
| 68 | /** [0] = pop_if_present that failed |
| 69 | [1] = pop_if_present that succeeded |
| 70 | [2] = pop */ |
| 71 | static tbb::atomic<long> PopKind[3]; |
| 72 | |
| 73 | const int M = 10000; |
| 74 | |
| 75 | struct Body: NoAssign { |
| 76 | tbb::concurrent_queue<Foo>* queue; |
| 77 | const int nthread; |
| 78 | Body( int nthread_ ) : nthread(nthread_) {} |
| 79 | void operator()( long thread_id ) const { |
| 80 | long pop_kind[3] = {0,0,0}; |
| 81 | int serial[MAXTHREAD+1]; |
| 82 | memset( serial, 0, nthread*sizeof(unsigned) ); |
| 83 | ASSERT( thread_id<nthread, NULL ); |
| 84 | |
| 85 | long sum = 0; |
| 86 | for( long j=0; j<M; ++j ) { |
| 87 | Foo f; |
| 88 | f.thread_id = DEAD; |
| 89 | f.serial = DEAD; |
| 90 | bool prepopped = false; |
| 91 | if( j&1 ) { |
| 92 | prepopped = queue->pop_if_present(f); |
| 93 | ++pop_kind[prepopped]; |
| 94 | } |
| 95 | Foo g; |
| 96 | g.thread_id = thread_id; |
| 97 | g.serial = j+1; |
| 98 | queue->push( g ); |
| 99 | if( !prepopped ) { |
| 100 | queue->pop(f); |
| 101 | ++pop_kind[2]; |
| 102 | } |
| 103 | ASSERT( f.thread_id<=nthread, NULL ); |
| 104 | ASSERT( f.thread_id==nthread || serial[f.thread_id]<f.serial, "partial order violation" ); |
| 105 | serial[f.thread_id] = f.serial; |
| 106 | sum += f.serial-1; |
| 107 | } |
| 108 | Sum[thread_id] = sum; |
| 109 | for( int k=0; k<3; ++k ) |
| 110 | PopKind[k] += pop_kind[k]; |
| 111 | } |
| 112 | }; |
| 113 | |
| 114 | void TestPushPop( int prefill, ptrdiff_t capacity, int nthread ) { |
| 115 | ASSERT( nthread>0, "nthread must be positive" ); |
| 116 | if( prefill+1>=capacity ) |
| 117 | return; |
| 118 | bool success = false; |
| 119 | for( int k=0; k<3; ++k ) |
| 120 | PopKind[k] = 0; |
| 121 | for( int trial=0; !success; ++trial ) { |
| 122 | FooConstructed = 0; |
| 123 | FooDestroyed = 0; |
| 124 | Body body(nthread); |
| 125 | tbb::concurrent_queue<Foo> queue; |
| 126 | queue.set_capacity( capacity ); |
| 127 | body.queue = &queue; |
| 128 | for( int i=0; i<prefill; ++i ) { |
| 129 | Foo f; |
| 130 | f.thread_id = nthread; |
| 131 | f.serial = 1+i; |
| 132 | queue.push(f); |
| 133 | ASSERT( queue.size()==i+1, NULL ); |
| 134 | ASSERT( !queue.empty(), NULL ); |
| 135 | } |
| 136 | tbb::tick_count t0 = tbb::tick_count::now(); |
| 137 | NativeParallelFor( nthread, body ); |
| 138 | tbb::tick_count t1 = tbb::tick_count::now(); |
| 139 | double timing = (t1-t0).seconds(); |
| 140 | if( Verbose ) |
| 141 | printf("prefill=%d capacity=%d time = %g = %g nsec/operation\n" , prefill, int(capacity), timing, timing/(2*M*nthread)*1.E9); |
| 142 | int sum = 0; |
| 143 | for( int k=0; k<nthread; ++k ) |
| 144 | sum += Sum[k]; |
| 145 | int expected = nthread*((M-1)*M/2) + ((prefill-1)*prefill)/2; |
| 146 | for( int i=prefill; --i>=0; ) { |
| 147 | ASSERT( !queue.empty(), NULL ); |
| 148 | Foo f; |
| 149 | queue.pop(f); |
| 150 | ASSERT( queue.size()==i, NULL ); |
| 151 | sum += f.serial-1; |
| 152 | } |
| 153 | ASSERT( queue.empty(), NULL ); |
| 154 | ASSERT( queue.size()==0, NULL ); |
| 155 | if( sum!=expected ) |
| 156 | printf("sum=%d expected=%d\n" ,sum,expected); |
| 157 | ASSERT( FooConstructed==FooDestroyed, NULL ); |
| 158 | |
| 159 | success = true; |
| 160 | if( nthread>1 && prefill==0 ) { |
| 161 | // Check that pop_if_present got sufficient exercise |
| 162 | for( int k=0; k<2; ++k ) { |
| 163 | #if (_WIN32||_WIN64) |
| 164 | // The TBB library on Windows seems to have a tough time generating |
| 165 | // the desired interleavings for pop_if_present, so the code tries longer, and settles |
| 166 | // for fewer desired interleavings. |
| 167 | const int max_trial = 100; |
| 168 | const int min_requirement = 20; |
| 169 | #else |
| 170 | const int min_requirement = 100; |
| 171 | const int max_trial = 20; |
| 172 | #endif /* _WIN32||_WIN64 */ |
| 173 | if( PopKind[k]<min_requirement ) { |
| 174 | if( trial>=max_trial ) { |
| 175 | if( Verbose ) |
| 176 | printf("Warning: %d threads had only %ld pop_if_present operations %s after %d trials (expected at least %d). " |
| 177 | "This problem may merely be unlucky scheduling. " |
| 178 | "Investigate only if it happens repeatedly.\n" , |
| 179 | nthread, long(PopKind[k]), k==0?"failed" :"succeeded" , max_trial, min_requirement); |
| 180 | else |
| 181 | printf("Warning: the number of %s pop_if_present operations is less than expected for %d threads. Investigate if it happens repeatedly.\n" , |
| 182 | k==0?"failed" :"succeeded" , nthread ); |
| 183 | } else { |
| 184 | success = false; |
| 185 | } |
| 186 | } |
| 187 | } |
| 188 | } |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | template<typename Iterator1, typename Iterator2> |
| 193 | void TestIteratorAux( Iterator1 i, Iterator2 j, int size ) { |
| 194 | Iterator1 old_i; // assigned at first iteration below |
| 195 | for( int k=0; k<size; ++k ) { |
| 196 | ASSERT( i!=j, NULL ); |
| 197 | ASSERT( !(i==j), NULL ); |
| 198 | // Test "->" |
| 199 | ASSERT( k+1==i->serial, NULL ); |
| 200 | if( k&1 ) { |
| 201 | // Test post-increment |
| 202 | Foo f = *old_i++; |
| 203 | ASSERT( k+1==f.serial, NULL ); |
| 204 | // Test assignment |
| 205 | i = old_i; |
| 206 | } else { |
| 207 | // Test pre-increment |
| 208 | if( k<size-1 ) { |
| 209 | Foo f = *++i; |
| 210 | ASSERT( k+2==f.serial, NULL ); |
| 211 | } else ++i; |
| 212 | // Test assignment |
| 213 | old_i = i; |
| 214 | } |
| 215 | } |
| 216 | ASSERT( !(i!=j), NULL ); |
| 217 | ASSERT( i==j, NULL ); |
| 218 | } |
| 219 | |
//! Check that an Iterator1 can be copy-constructed and assigned from an Iterator2.
/** In both cases the result must compare equal to the source under == and !=. */
template<typename Iterator1, typename Iterator2>
void TestIteratorAssignment( Iterator2 j ) {
    // Converting copy construction
    Iterator1 copied(j);
    ASSERT( copied==j, NULL );
    ASSERT( !(copied!=j), NULL );

    // Default construction followed by converting assignment
    Iterator1 assigned;
    assigned = j;
    ASSERT( assigned==j, NULL );
    ASSERT( !(assigned!=j), NULL );
}
| 230 | |
| 231 | //! Test the iterators for concurrent_queue |
| 232 | void TestIterator() { |
| 233 | tbb::concurrent_queue<Foo> queue; |
| 234 | tbb::concurrent_queue<Foo>& const_queue = queue; |
| 235 | for( int j=0; j<500; ++j ) { |
| 236 | TestIteratorAux( queue.begin(), queue.end(), j ); |
| 237 | TestIteratorAux( const_queue.begin(), const_queue.end(), j ); |
| 238 | TestIteratorAux( const_queue.begin(), queue.end(), j ); |
| 239 | TestIteratorAux( queue.begin(), const_queue.end(), j ); |
| 240 | Foo f; |
| 241 | f.serial = j+1; |
| 242 | queue.push(f); |
| 243 | } |
| 244 | TestIteratorAssignment<tbb::concurrent_queue<Foo>::const_iterator>( const_queue.begin() ); |
| 245 | TestIteratorAssignment<tbb::concurrent_queue<Foo>::const_iterator>( queue.begin() ); |
| 246 | TestIteratorAssignment<tbb::concurrent_queue<Foo>:: iterator>( queue.begin() ); |
| 247 | } |
| 248 | |
| 249 | void TestConcurrentQueueType() { |
| 250 | AssertSameType( tbb::concurrent_queue<Foo>::value_type(), Foo() ); |
| 251 | Foo f; |
| 252 | const Foo g; |
| 253 | tbb::concurrent_queue<Foo>::reference r = f; |
| 254 | ASSERT( &r==&f, NULL ); |
| 255 | ASSERT( !r.is_const(), NULL ); |
| 256 | tbb::concurrent_queue<Foo>::const_reference cr = g; |
| 257 | ASSERT( &cr==&g, NULL ); |
| 258 | ASSERT( cr.is_const(), NULL ); |
| 259 | } |
| 260 | |
| 261 | template<typename T> |
| 262 | void TestEmptyQueue() { |
| 263 | const tbb::concurrent_queue<T> queue; |
| 264 | ASSERT( queue.size()==0, NULL ); |
| 265 | ASSERT( queue.capacity()>0, NULL ); |
| 266 | ASSERT( size_t(queue.capacity())>=size_t(-1)/(sizeof(void*)+sizeof(T)), NULL ); |
| 267 | } |
| 268 | |
| 269 | void TestFullQueue() { |
| 270 | for( int n=0; n<10; ++n ) { |
| 271 | FooConstructed = 0; |
| 272 | FooDestroyed = 0; |
| 273 | tbb::concurrent_queue<Foo> queue; |
| 274 | queue.set_capacity(n); |
| 275 | for( int i=0; i<=n; ++i ) { |
| 276 | Foo f; |
| 277 | f.serial = i; |
| 278 | bool result = queue.push_if_not_full( f ); |
| 279 | ASSERT( result==(i<n), NULL ); |
| 280 | } |
| 281 | for( int i=0; i<=n; ++i ) { |
| 282 | Foo f; |
| 283 | bool result = queue.pop_if_present( f ); |
| 284 | ASSERT( result==(i<n), NULL ); |
| 285 | ASSERT( !result || f.serial==i, NULL ); |
| 286 | } |
| 287 | ASSERT( FooConstructed==FooDestroyed, NULL ); |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | template<typename T> |
| 292 | struct TestNegativeQueueBody: NoAssign { |
| 293 | tbb::concurrent_queue<T>& queue; |
| 294 | const int nthread; |
| 295 | TestNegativeQueueBody( tbb::concurrent_queue<T>& q, int n ) : queue(q), nthread(n) {} |
| 296 | void operator()( int k ) const { |
| 297 | if( k==0 ) { |
| 298 | int number_of_pops = nthread-1; |
| 299 | // Wait for all pops to pend. |
| 300 | while( queue.size()>-number_of_pops ) { |
| 301 | __TBB_Yield(); |
| 302 | } |
| 303 | for( int i=0; ; ++i ) { |
| 304 | ASSERT( queue.size()==i-number_of_pops, NULL ); |
| 305 | ASSERT( queue.empty()==(queue.size()<=0), NULL ); |
| 306 | if( i==number_of_pops ) break; |
| 307 | // Satisfy another pop |
| 308 | queue.push( T() ); |
| 309 | } |
| 310 | } else { |
| 311 | // Pop item from queue |
| 312 | T item; |
| 313 | queue.pop(item); |
| 314 | } |
| 315 | } |
| 316 | }; |
| 317 | |
| 318 | //! Test a queue with a negative size. |
| 319 | template<typename T> |
| 320 | void TestNegativeQueue( int nthread ) { |
| 321 | tbb::concurrent_queue<T> queue; |
| 322 | NativeParallelFor( nthread, TestNegativeQueueBody<T>(queue,nthread) ); |
| 323 | } |
| 324 | |
| 325 | int TestMain () { |
| 326 | TestEmptyQueue<char>(); |
| 327 | TestEmptyQueue<Foo>(); |
| 328 | TestFullQueue(); |
| 329 | TestConcurrentQueueType(); |
| 330 | TestIterator(); |
| 331 | |
| 332 | // Test concurrent operations |
| 333 | for( int nthread=MinThread; nthread<=MaxThread; ++nthread ) { |
| 334 | TestNegativeQueue<Foo>(nthread); |
| 335 | for( int prefill=0; prefill<64; prefill+=(1+prefill/3) ) { |
| 336 | TestPushPop(prefill,ptrdiff_t(-1),nthread); |
| 337 | TestPushPop(prefill,ptrdiff_t(1),nthread); |
| 338 | TestPushPop(prefill,ptrdiff_t(2),nthread); |
| 339 | TestPushPop(prefill,ptrdiff_t(10),nthread); |
| 340 | TestPushPop(prefill,ptrdiff_t(100),nthread); |
| 341 | } |
| 342 | } |
| 343 | return Harness::Done; |
| 344 | } |
| 345 | |