1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "old/concurrent_queue_v2.h"
18#include "tbb/atomic.h"
19#include "tbb/tick_count.h"
20
21#include "../test/harness_assert.h"
22#include "../test/harness.h"
23
24static tbb::atomic<long> FooConstructed;
25static tbb::atomic<long> FooDestroyed;
26
27enum state_t{
28 LIVE=0x1234,
29 DEAD=0xDEAD
30};
31
32class Foo {
33 state_t state;
34public:
35 int thread_id;
36 int serial;
37 Foo() : state(LIVE) {
38 ++FooConstructed;
39 }
40 Foo( const Foo& item ) : state(LIVE) {
41 ASSERT( item.state==LIVE, NULL );
42 ++FooConstructed;
43 thread_id = item.thread_id;
44 serial = item.serial;
45 }
46 ~Foo() {
47 ASSERT( state==LIVE, NULL );
48 ++FooDestroyed;
49 state=DEAD;
50 thread_id=DEAD;
51 serial=DEAD;
52 }
53 void operator=( Foo& item ) {
54 ASSERT( item.state==LIVE, NULL );
55 ASSERT( state==LIVE, NULL );
56 thread_id = item.thread_id;
57 serial = item.serial;
58 }
59 bool is_const() {return false;}
60 bool is_const() const {return true;}
61};
62
63const size_t MAXTHREAD = 256;
64
65static int Sum[MAXTHREAD];
66
67//! Count of various pop operations
68/** [0] = pop_if_present that failed
69 [1] = pop_if_present that succeeded
70 [2] = pop */
71static tbb::atomic<long> PopKind[3];
72
73const int M = 10000;
74
75struct Body: NoAssign {
76 tbb::concurrent_queue<Foo>* queue;
77 const int nthread;
78 Body( int nthread_ ) : nthread(nthread_) {}
79 void operator()( long thread_id ) const {
80 long pop_kind[3] = {0,0,0};
81 int serial[MAXTHREAD+1];
82 memset( serial, 0, nthread*sizeof(unsigned) );
83 ASSERT( thread_id<nthread, NULL );
84
85 long sum = 0;
86 for( long j=0; j<M; ++j ) {
87 Foo f;
88 f.thread_id = DEAD;
89 f.serial = DEAD;
90 bool prepopped = false;
91 if( j&1 ) {
92 prepopped = queue->pop_if_present(f);
93 ++pop_kind[prepopped];
94 }
95 Foo g;
96 g.thread_id = thread_id;
97 g.serial = j+1;
98 queue->push( g );
99 if( !prepopped ) {
100 queue->pop(f);
101 ++pop_kind[2];
102 }
103 ASSERT( f.thread_id<=nthread, NULL );
104 ASSERT( f.thread_id==nthread || serial[f.thread_id]<f.serial, "partial order violation" );
105 serial[f.thread_id] = f.serial;
106 sum += f.serial-1;
107 }
108 Sum[thread_id] = sum;
109 for( int k=0; k<3; ++k )
110 PopKind[k] += pop_kind[k];
111 }
112};
113
114void TestPushPop( int prefill, ptrdiff_t capacity, int nthread ) {
115 ASSERT( nthread>0, "nthread must be positive" );
116 if( prefill+1>=capacity )
117 return;
118 bool success = false;
119 for( int k=0; k<3; ++k )
120 PopKind[k] = 0;
121 for( int trial=0; !success; ++trial ) {
122 FooConstructed = 0;
123 FooDestroyed = 0;
124 Body body(nthread);
125 tbb::concurrent_queue<Foo> queue;
126 queue.set_capacity( capacity );
127 body.queue = &queue;
128 for( int i=0; i<prefill; ++i ) {
129 Foo f;
130 f.thread_id = nthread;
131 f.serial = 1+i;
132 queue.push(f);
133 ASSERT( queue.size()==i+1, NULL );
134 ASSERT( !queue.empty(), NULL );
135 }
136 tbb::tick_count t0 = tbb::tick_count::now();
137 NativeParallelFor( nthread, body );
138 tbb::tick_count t1 = tbb::tick_count::now();
139 double timing = (t1-t0).seconds();
140 if( Verbose )
141 printf("prefill=%d capacity=%d time = %g = %g nsec/operation\n", prefill, int(capacity), timing, timing/(2*M*nthread)*1.E9);
142 int sum = 0;
143 for( int k=0; k<nthread; ++k )
144 sum += Sum[k];
145 int expected = nthread*((M-1)*M/2) + ((prefill-1)*prefill)/2;
146 for( int i=prefill; --i>=0; ) {
147 ASSERT( !queue.empty(), NULL );
148 Foo f;
149 queue.pop(f);
150 ASSERT( queue.size()==i, NULL );
151 sum += f.serial-1;
152 }
153 ASSERT( queue.empty(), NULL );
154 ASSERT( queue.size()==0, NULL );
155 if( sum!=expected )
156 printf("sum=%d expected=%d\n",sum,expected);
157 ASSERT( FooConstructed==FooDestroyed, NULL );
158
159 success = true;
160 if( nthread>1 && prefill==0 ) {
161 // Check that pop_if_present got sufficient exercise
162 for( int k=0; k<2; ++k ) {
163#if (_WIN32||_WIN64)
164 // The TBB library on Windows seems to have a tough time generating
165 // the desired interleavings for pop_if_present, so the code tries longer, and settles
166 // for fewer desired interleavings.
167 const int max_trial = 100;
168 const int min_requirement = 20;
169#else
170 const int min_requirement = 100;
171 const int max_trial = 20;
172#endif /* _WIN32||_WIN64 */
173 if( PopKind[k]<min_requirement ) {
174 if( trial>=max_trial ) {
175 if( Verbose )
176 printf("Warning: %d threads had only %ld pop_if_present operations %s after %d trials (expected at least %d). "
177 "This problem may merely be unlucky scheduling. "
178 "Investigate only if it happens repeatedly.\n",
179 nthread, long(PopKind[k]), k==0?"failed":"succeeded", max_trial, min_requirement);
180 else
181 printf("Warning: the number of %s pop_if_present operations is less than expected for %d threads. Investigate if it happens repeatedly.\n",
182 k==0?"failed":"succeeded", nthread );
183 } else {
184 success = false;
185 }
186 }
187 }
188 }
189 }
190}
191
192template<typename Iterator1, typename Iterator2>
193void TestIteratorAux( Iterator1 i, Iterator2 j, int size ) {
194 Iterator1 old_i; // assigned at first iteration below
195 for( int k=0; k<size; ++k ) {
196 ASSERT( i!=j, NULL );
197 ASSERT( !(i==j), NULL );
198 // Test "->"
199 ASSERT( k+1==i->serial, NULL );
200 if( k&1 ) {
201 // Test post-increment
202 Foo f = *old_i++;
203 ASSERT( k+1==f.serial, NULL );
204 // Test assignment
205 i = old_i;
206 } else {
207 // Test pre-increment
208 if( k<size-1 ) {
209 Foo f = *++i;
210 ASSERT( k+2==f.serial, NULL );
211 } else ++i;
212 // Test assignment
213 old_i = i;
214 }
215 }
216 ASSERT( !(i!=j), NULL );
217 ASSERT( i==j, NULL );
218}
219
220template<typename Iterator1, typename Iterator2>
221void TestIteratorAssignment( Iterator2 j ) {
222 Iterator1 i(j);
223 ASSERT( i==j, NULL );
224 ASSERT( !(i!=j), NULL );
225 Iterator1 k;
226 k = j;
227 ASSERT( k==j, NULL );
228 ASSERT( !(k!=j), NULL );
229}
230
231//! Test the iterators for concurrent_queue
232void TestIterator() {
233 tbb::concurrent_queue<Foo> queue;
234 tbb::concurrent_queue<Foo>& const_queue = queue;
235 for( int j=0; j<500; ++j ) {
236 TestIteratorAux( queue.begin(), queue.end(), j );
237 TestIteratorAux( const_queue.begin(), const_queue.end(), j );
238 TestIteratorAux( const_queue.begin(), queue.end(), j );
239 TestIteratorAux( queue.begin(), const_queue.end(), j );
240 Foo f;
241 f.serial = j+1;
242 queue.push(f);
243 }
244 TestIteratorAssignment<tbb::concurrent_queue<Foo>::const_iterator>( const_queue.begin() );
245 TestIteratorAssignment<tbb::concurrent_queue<Foo>::const_iterator>( queue.begin() );
246 TestIteratorAssignment<tbb::concurrent_queue<Foo>:: iterator>( queue.begin() );
247}
248
249void TestConcurrentQueueType() {
250 AssertSameType( tbb::concurrent_queue<Foo>::value_type(), Foo() );
251 Foo f;
252 const Foo g;
253 tbb::concurrent_queue<Foo>::reference r = f;
254 ASSERT( &r==&f, NULL );
255 ASSERT( !r.is_const(), NULL );
256 tbb::concurrent_queue<Foo>::const_reference cr = g;
257 ASSERT( &cr==&g, NULL );
258 ASSERT( cr.is_const(), NULL );
259}
260
261template<typename T>
262void TestEmptyQueue() {
263 const tbb::concurrent_queue<T> queue;
264 ASSERT( queue.size()==0, NULL );
265 ASSERT( queue.capacity()>0, NULL );
266 ASSERT( size_t(queue.capacity())>=size_t(-1)/(sizeof(void*)+sizeof(T)), NULL );
267}
268
269void TestFullQueue() {
270 for( int n=0; n<10; ++n ) {
271 FooConstructed = 0;
272 FooDestroyed = 0;
273 tbb::concurrent_queue<Foo> queue;
274 queue.set_capacity(n);
275 for( int i=0; i<=n; ++i ) {
276 Foo f;
277 f.serial = i;
278 bool result = queue.push_if_not_full( f );
279 ASSERT( result==(i<n), NULL );
280 }
281 for( int i=0; i<=n; ++i ) {
282 Foo f;
283 bool result = queue.pop_if_present( f );
284 ASSERT( result==(i<n), NULL );
285 ASSERT( !result || f.serial==i, NULL );
286 }
287 ASSERT( FooConstructed==FooDestroyed, NULL );
288 }
289}
290
291template<typename T>
292struct TestNegativeQueueBody: NoAssign {
293 tbb::concurrent_queue<T>& queue;
294 const int nthread;
295 TestNegativeQueueBody( tbb::concurrent_queue<T>& q, int n ) : queue(q), nthread(n) {}
296 void operator()( int k ) const {
297 if( k==0 ) {
298 int number_of_pops = nthread-1;
299 // Wait for all pops to pend.
300 while( queue.size()>-number_of_pops ) {
301 __TBB_Yield();
302 }
303 for( int i=0; ; ++i ) {
304 ASSERT( queue.size()==i-number_of_pops, NULL );
305 ASSERT( queue.empty()==(queue.size()<=0), NULL );
306 if( i==number_of_pops ) break;
307 // Satisfy another pop
308 queue.push( T() );
309 }
310 } else {
311 // Pop item from queue
312 T item;
313 queue.pop(item);
314 }
315 }
316};
317
318//! Test a queue with a negative size.
319template<typename T>
320void TestNegativeQueue( int nthread ) {
321 tbb::concurrent_queue<T> queue;
322 NativeParallelFor( nthread, TestNegativeQueueBody<T>(queue,nthread) );
323}
324
325int TestMain () {
326 TestEmptyQueue<char>();
327 TestEmptyQueue<Foo>();
328 TestFullQueue();
329 TestConcurrentQueueType();
330 TestIterator();
331
332 // Test concurrent operations
333 for( int nthread=MinThread; nthread<=MaxThread; ++nthread ) {
334 TestNegativeQueue<Foo>(nthread);
335 for( int prefill=0; prefill<64; prefill+=(1+prefill/3) ) {
336 TestPushPop(prefill,ptrdiff_t(-1),nthread);
337 TestPushPop(prefill,ptrdiff_t(1),nthread);
338 TestPushPop(prefill,ptrdiff_t(2),nthread);
339 TestPushPop(prefill,ptrdiff_t(10),nthread);
340 TestPushPop(prefill,ptrdiff_t(100),nthread);
341 }
342 }
343 return Harness::Done;
344}
345