1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #include "old/concurrent_queue_v2.h" |
18 | #include "tbb/atomic.h" |
19 | #include "tbb/tick_count.h" |
20 | |
21 | #include "../test/harness_assert.h" |
22 | #include "../test/harness.h" |
23 | |
//! Running count of Foo objects constructed (default or copy); tests reset it to 0 before a run.
static tbb::atomic<long> FooConstructed;
//! Running count of Foo objects destroyed; tests assert it equals FooConstructed after a run.
static tbb::atomic<long> FooDestroyed;
26 | |
//! Liveness marker stored inside every Foo.
/** LIVE is written by the constructors; DEAD is written by the destructor,
    which also stamps it into the payload fields. */
enum state_t {
    LIVE = 0x1234,
    DEAD = 0xDEAD
};
31 | |
32 | class Foo { |
33 | state_t state; |
34 | public: |
35 | int thread_id; |
36 | int serial; |
37 | Foo() : state(LIVE) { |
38 | ++FooConstructed; |
39 | } |
40 | Foo( const Foo& item ) : state(LIVE) { |
41 | ASSERT( item.state==LIVE, NULL ); |
42 | ++FooConstructed; |
43 | thread_id = item.thread_id; |
44 | serial = item.serial; |
45 | } |
46 | ~Foo() { |
47 | ASSERT( state==LIVE, NULL ); |
48 | ++FooDestroyed; |
49 | state=DEAD; |
50 | thread_id=DEAD; |
51 | serial=DEAD; |
52 | } |
53 | void operator=( Foo& item ) { |
54 | ASSERT( item.state==LIVE, NULL ); |
55 | ASSERT( state==LIVE, NULL ); |
56 | thread_id = item.thread_id; |
57 | serial = item.serial; |
58 | } |
59 | bool is_const() {return false;} |
60 | bool is_const() const {return true;} |
61 | }; |
62 | |
//! Upper bound on worker threads; sizes Sum and the per-thread serial table inside Body.
const size_t MAXTHREAD = 256;

//! Per-thread checksums: Body stores its total at Sum[thread_id]; TestPushPop adds them up.
static int Sum[MAXTHREAD];

//! Count of various pop operations
/** [0] = pop_if_present that failed
    [1] = pop_if_present that succeeded
    [2] = pop */
static tbb::atomic<long> PopKind[3];

//! Number of push/pop iterations each thread performs in Body.
const int M = 10000;
74 | |
75 | struct Body: NoAssign { |
76 | tbb::concurrent_queue<Foo>* queue; |
77 | const int nthread; |
78 | Body( int nthread_ ) : nthread(nthread_) {} |
79 | void operator()( long thread_id ) const { |
80 | long pop_kind[3] = {0,0,0}; |
81 | int serial[MAXTHREAD+1]; |
82 | memset( serial, 0, nthread*sizeof(unsigned) ); |
83 | ASSERT( thread_id<nthread, NULL ); |
84 | |
85 | long sum = 0; |
86 | for( long j=0; j<M; ++j ) { |
87 | Foo f; |
88 | f.thread_id = DEAD; |
89 | f.serial = DEAD; |
90 | bool prepopped = false; |
91 | if( j&1 ) { |
92 | prepopped = queue->pop_if_present(f); |
93 | ++pop_kind[prepopped]; |
94 | } |
95 | Foo g; |
96 | g.thread_id = thread_id; |
97 | g.serial = j+1; |
98 | queue->push( g ); |
99 | if( !prepopped ) { |
100 | queue->pop(f); |
101 | ++pop_kind[2]; |
102 | } |
103 | ASSERT( f.thread_id<=nthread, NULL ); |
104 | ASSERT( f.thread_id==nthread || serial[f.thread_id]<f.serial, "partial order violation" ); |
105 | serial[f.thread_id] = f.serial; |
106 | sum += f.serial-1; |
107 | } |
108 | Sum[thread_id] = sum; |
109 | for( int k=0; k<3; ++k ) |
110 | PopKind[k] += pop_kind[k]; |
111 | } |
112 | }; |
113 | |
114 | void TestPushPop( int prefill, ptrdiff_t capacity, int nthread ) { |
115 | ASSERT( nthread>0, "nthread must be positive" ); |
116 | if( prefill+1>=capacity ) |
117 | return; |
118 | bool success = false; |
119 | for( int k=0; k<3; ++k ) |
120 | PopKind[k] = 0; |
121 | for( int trial=0; !success; ++trial ) { |
122 | FooConstructed = 0; |
123 | FooDestroyed = 0; |
124 | Body body(nthread); |
125 | tbb::concurrent_queue<Foo> queue; |
126 | queue.set_capacity( capacity ); |
127 | body.queue = &queue; |
128 | for( int i=0; i<prefill; ++i ) { |
129 | Foo f; |
130 | f.thread_id = nthread; |
131 | f.serial = 1+i; |
132 | queue.push(f); |
133 | ASSERT( queue.size()==i+1, NULL ); |
134 | ASSERT( !queue.empty(), NULL ); |
135 | } |
136 | tbb::tick_count t0 = tbb::tick_count::now(); |
137 | NativeParallelFor( nthread, body ); |
138 | tbb::tick_count t1 = tbb::tick_count::now(); |
139 | double timing = (t1-t0).seconds(); |
140 | if( Verbose ) |
141 | printf("prefill=%d capacity=%d time = %g = %g nsec/operation\n" , prefill, int(capacity), timing, timing/(2*M*nthread)*1.E9); |
142 | int sum = 0; |
143 | for( int k=0; k<nthread; ++k ) |
144 | sum += Sum[k]; |
145 | int expected = nthread*((M-1)*M/2) + ((prefill-1)*prefill)/2; |
146 | for( int i=prefill; --i>=0; ) { |
147 | ASSERT( !queue.empty(), NULL ); |
148 | Foo f; |
149 | queue.pop(f); |
150 | ASSERT( queue.size()==i, NULL ); |
151 | sum += f.serial-1; |
152 | } |
153 | ASSERT( queue.empty(), NULL ); |
154 | ASSERT( queue.size()==0, NULL ); |
155 | if( sum!=expected ) |
156 | printf("sum=%d expected=%d\n" ,sum,expected); |
157 | ASSERT( FooConstructed==FooDestroyed, NULL ); |
158 | |
159 | success = true; |
160 | if( nthread>1 && prefill==0 ) { |
161 | // Check that pop_if_present got sufficient exercise |
162 | for( int k=0; k<2; ++k ) { |
163 | #if (_WIN32||_WIN64) |
164 | // The TBB library on Windows seems to have a tough time generating |
165 | // the desired interleavings for pop_if_present, so the code tries longer, and settles |
166 | // for fewer desired interleavings. |
167 | const int max_trial = 100; |
168 | const int min_requirement = 20; |
169 | #else |
170 | const int min_requirement = 100; |
171 | const int max_trial = 20; |
172 | #endif /* _WIN32||_WIN64 */ |
173 | if( PopKind[k]<min_requirement ) { |
174 | if( trial>=max_trial ) { |
175 | if( Verbose ) |
176 | printf("Warning: %d threads had only %ld pop_if_present operations %s after %d trials (expected at least %d). " |
177 | "This problem may merely be unlucky scheduling. " |
178 | "Investigate only if it happens repeatedly.\n" , |
179 | nthread, long(PopKind[k]), k==0?"failed" :"succeeded" , max_trial, min_requirement); |
180 | else |
181 | printf("Warning: the number of %s pop_if_present operations is less than expected for %d threads. Investigate if it happens repeatedly.\n" , |
182 | k==0?"failed" :"succeeded" , nthread ); |
183 | } else { |
184 | success = false; |
185 | } |
186 | } |
187 | } |
188 | } |
189 | } |
190 | } |
191 | |
192 | template<typename Iterator1, typename Iterator2> |
193 | void TestIteratorAux( Iterator1 i, Iterator2 j, int size ) { |
194 | Iterator1 old_i; // assigned at first iteration below |
195 | for( int k=0; k<size; ++k ) { |
196 | ASSERT( i!=j, NULL ); |
197 | ASSERT( !(i==j), NULL ); |
198 | // Test "->" |
199 | ASSERT( k+1==i->serial, NULL ); |
200 | if( k&1 ) { |
201 | // Test post-increment |
202 | Foo f = *old_i++; |
203 | ASSERT( k+1==f.serial, NULL ); |
204 | // Test assignment |
205 | i = old_i; |
206 | } else { |
207 | // Test pre-increment |
208 | if( k<size-1 ) { |
209 | Foo f = *++i; |
210 | ASSERT( k+2==f.serial, NULL ); |
211 | } else ++i; |
212 | // Test assignment |
213 | old_i = i; |
214 | } |
215 | } |
216 | ASSERT( !(i!=j), NULL ); |
217 | ASSERT( i==j, NULL ); |
218 | } |
219 | |
//! Check that an Iterator1 can be both constructed from and assigned an Iterator2.
/** In either case the result must compare equal to the source iterator. */
template<typename Iterator1, typename Iterator2>
void TestIteratorAssignment( Iterator2 j ) {
    // Converting construction
    Iterator1 constructed(j);
    ASSERT( constructed==j, NULL );
    ASSERT( !(constructed!=j), NULL );

    // Default construction followed by converting assignment
    Iterator1 assigned;
    assigned = j;
    ASSERT( assigned==j, NULL );
    ASSERT( !(assigned!=j), NULL );
}
230 | |
231 | //! Test the iterators for concurrent_queue |
232 | void TestIterator() { |
233 | tbb::concurrent_queue<Foo> queue; |
234 | tbb::concurrent_queue<Foo>& const_queue = queue; |
235 | for( int j=0; j<500; ++j ) { |
236 | TestIteratorAux( queue.begin(), queue.end(), j ); |
237 | TestIteratorAux( const_queue.begin(), const_queue.end(), j ); |
238 | TestIteratorAux( const_queue.begin(), queue.end(), j ); |
239 | TestIteratorAux( queue.begin(), const_queue.end(), j ); |
240 | Foo f; |
241 | f.serial = j+1; |
242 | queue.push(f); |
243 | } |
244 | TestIteratorAssignment<tbb::concurrent_queue<Foo>::const_iterator>( const_queue.begin() ); |
245 | TestIteratorAssignment<tbb::concurrent_queue<Foo>::const_iterator>( queue.begin() ); |
246 | TestIteratorAssignment<tbb::concurrent_queue<Foo>:: iterator>( queue.begin() ); |
247 | } |
248 | |
249 | void TestConcurrentQueueType() { |
250 | AssertSameType( tbb::concurrent_queue<Foo>::value_type(), Foo() ); |
251 | Foo f; |
252 | const Foo g; |
253 | tbb::concurrent_queue<Foo>::reference r = f; |
254 | ASSERT( &r==&f, NULL ); |
255 | ASSERT( !r.is_const(), NULL ); |
256 | tbb::concurrent_queue<Foo>::const_reference cr = g; |
257 | ASSERT( &cr==&g, NULL ); |
258 | ASSERT( cr.is_const(), NULL ); |
259 | } |
260 | |
261 | template<typename T> |
262 | void TestEmptyQueue() { |
263 | const tbb::concurrent_queue<T> queue; |
264 | ASSERT( queue.size()==0, NULL ); |
265 | ASSERT( queue.capacity()>0, NULL ); |
266 | ASSERT( size_t(queue.capacity())>=size_t(-1)/(sizeof(void*)+sizeof(T)), NULL ); |
267 | } |
268 | |
269 | void TestFullQueue() { |
270 | for( int n=0; n<10; ++n ) { |
271 | FooConstructed = 0; |
272 | FooDestroyed = 0; |
273 | tbb::concurrent_queue<Foo> queue; |
274 | queue.set_capacity(n); |
275 | for( int i=0; i<=n; ++i ) { |
276 | Foo f; |
277 | f.serial = i; |
278 | bool result = queue.push_if_not_full( f ); |
279 | ASSERT( result==(i<n), NULL ); |
280 | } |
281 | for( int i=0; i<=n; ++i ) { |
282 | Foo f; |
283 | bool result = queue.pop_if_present( f ); |
284 | ASSERT( result==(i<n), NULL ); |
285 | ASSERT( !result || f.serial==i, NULL ); |
286 | } |
287 | ASSERT( FooConstructed==FooDestroyed, NULL ); |
288 | } |
289 | } |
290 | |
291 | template<typename T> |
292 | struct TestNegativeQueueBody: NoAssign { |
293 | tbb::concurrent_queue<T>& queue; |
294 | const int nthread; |
295 | TestNegativeQueueBody( tbb::concurrent_queue<T>& q, int n ) : queue(q), nthread(n) {} |
296 | void operator()( int k ) const { |
297 | if( k==0 ) { |
298 | int number_of_pops = nthread-1; |
299 | // Wait for all pops to pend. |
300 | while( queue.size()>-number_of_pops ) { |
301 | __TBB_Yield(); |
302 | } |
303 | for( int i=0; ; ++i ) { |
304 | ASSERT( queue.size()==i-number_of_pops, NULL ); |
305 | ASSERT( queue.empty()==(queue.size()<=0), NULL ); |
306 | if( i==number_of_pops ) break; |
307 | // Satisfy another pop |
308 | queue.push( T() ); |
309 | } |
310 | } else { |
311 | // Pop item from queue |
312 | T item; |
313 | queue.pop(item); |
314 | } |
315 | } |
316 | }; |
317 | |
318 | //! Test a queue with a negative size. |
319 | template<typename T> |
320 | void TestNegativeQueue( int nthread ) { |
321 | tbb::concurrent_queue<T> queue; |
322 | NativeParallelFor( nthread, TestNegativeQueueBody<T>(queue,nthread) ); |
323 | } |
324 | |
325 | int TestMain () { |
326 | TestEmptyQueue<char>(); |
327 | TestEmptyQueue<Foo>(); |
328 | TestFullQueue(); |
329 | TestConcurrentQueueType(); |
330 | TestIterator(); |
331 | |
332 | // Test concurrent operations |
333 | for( int nthread=MinThread; nthread<=MaxThread; ++nthread ) { |
334 | TestNegativeQueue<Foo>(nthread); |
335 | for( int prefill=0; prefill<64; prefill+=(1+prefill/3) ) { |
336 | TestPushPop(prefill,ptrdiff_t(-1),nthread); |
337 | TestPushPop(prefill,ptrdiff_t(1),nthread); |
338 | TestPushPop(prefill,ptrdiff_t(2),nthread); |
339 | TestPushPop(prefill,ptrdiff_t(10),nthread); |
340 | TestPushPop(prefill,ptrdiff_t(100),nthread); |
341 | } |
342 | } |
343 | return Harness::Done; |
344 | } |
345 | |