/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "harness_task.h"
#include "harness_barrier.h"
#include "tbb/atomic.h"
#include "tbb/tbb_thread.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tick_count.h"

////////////////////////////////////////////////////////////////////////////////
// Test for basic FIFO scheduling functionality

const int PairsPerTrack = 100;

class EnqueuedTask : public tbb::task {
    task* my_successor;
    int my_enqueue_order;
    int* my_track;
    tbb::task* execute() __TBB_override {
        // Capture the execution order at the very beginning
        int execution_order = 2 - my_successor->decrement_ref_count();
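        // (decrement_ref_count() returns the updated count: the first of the two peer
        // tasks to get here sees 1, i.e. order 1, and the second sees 0, i.e. order 2.)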
        // Create some local work.
        TaskGenerator& p = *new( allocate_root() ) TaskGenerator(2,2);
        spawn_root_and_wait(p);
        if( execution_order==2 ) { // the "slower" of two peer tasks
            ++nCompletedPairs;
            // Of course execution order can differ from dequeue order.
            // But there is no better approximation at hand; and a single worker
            // will execute in dequeue order, which is enough for our check.
            if (my_enqueue_order==execution_order)
                ++nOrderedPairs;
            FireTwoTasks(my_track);
            destroy(*my_successor);
        }
        return NULL;
    }
public:
    EnqueuedTask( task* successor, int enq_order, int* track )
        : my_successor(successor), my_enqueue_order(enq_order), my_track(track) {}

    // Create and enqueue two tasks
    static void FireTwoTasks( int* track ) {
        int progress = ++*track;
        if( progress < PairsPerTrack ) {
            task* successor = new (allocate_root()) tbb::empty_task;
            successor->set_ref_count(2);
            enqueue( *new (allocate_root()) EnqueuedTask(successor, 1, track) );
            enqueue( *new (allocate_root()) EnqueuedTask(successor, 2, track) );
        }
    }

    static tbb::atomic<int> nCompletedPairs;
    static tbb::atomic<int> nOrderedPairs;
};

tbb::atomic<int> EnqueuedTask::nCompletedPairs;
tbb::atomic<int> EnqueuedTask::nOrderedPairs;

const int nTracks = 10;
static int TaskTracks[nTracks];
const int stall_threshold = 1000000; // 1 sec

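// Keeps the calling thread out of the scheduler for at least pause_time seconds
// by sleeping in a loop until the interval has elapsed.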
void TimedYield( double pause_time ) {
    tbb::tick_count start = tbb::tick_count::now();
    while( (tbb::tick_count::now()-start).seconds() < pause_time )
        tbb::this_tbb_thread::sleep(tbb::tick_count::interval_t(pause_time));
}

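// Runs on a separate native thread: periodically snapshots the per-track progress
// counters and asserts if the enqueued tasks stop making progress (stall) or if only
// a subset of the tracks keeps progressing for too long (uneven progress).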
class ProgressMonitor {
public:
    void operator() ( ) {
        int track_snapshot[nTracks];
        int stall_count = 0, uneven_progress_count = 0, last_progress_mask = 0;
        for(int i=0; i<nTracks; ++i)
            track_snapshot[i]=0;
        bool completed;
        do {
            // Yield repeatedly for at least 1 usec
            TimedYield( 1E-6 );
            int overall_progress = 0, progress_mask = 0;
            const int all_progressed = (1<<nTracks) - 1;
            completed = true;
            for(int i=0; i<nTracks; ++i) {
                int ti = TaskTracks[i];
                int pi = ti-track_snapshot[i];
                if( pi ) progress_mask |= 1<<i;
                overall_progress += pi;
                completed = completed && ti==PairsPerTrack;
                track_snapshot[i]=ti;
            }
            // The constants in the next assertions are subjective and may need correction.
            if( overall_progress )
                stall_count=0;
            else {
                ++stall_count;
                // No progress at all; consider the test deadlocked.
                ASSERT(stall_count < stall_threshold, "no progress on enqueued tasks; deadlock, or the machine is heavily oversubscribed?");
            }
            if( progress_mask==all_progressed || progress_mask^last_progress_mask ) {
                uneven_progress_count = 0;
                last_progress_mask = progress_mask;
            }
            else if ( overall_progress > 2 ) {
                ++uneven_progress_count;
                // The threshold of 32 is 4x bigger than what was observed on an 8-core machine with oversubscription.
                ASSERT_WARNING(uneven_progress_count < 32,
                    "some enqueued tasks seem to be stalling; no simultaneous progress, or the machine is oversubscribed? Investigate if repeated");
            }
        } while( !completed );
    }
};

void TestEnqueue( int p ) {
    REMARK("Testing task::enqueue for %d threads\n", p);
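    // mode 0: the master simply waits for the monitor thread to finish;
    // mode 1: the master additionally spawns local parallel work in the meantime;
    // mode 2: the master additionally enqueues a batch of empty tasks and terminates
    //         its scheduler before joining the monitor thread.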
    for(int mode=0;mode<3;++mode) {
        tbb::task_scheduler_init init(p);
        EnqueuedTask::nCompletedPairs = EnqueuedTask::nOrderedPairs = 0;
        for(int i=0; i<nTracks; ++i) {
            TaskTracks[i] = -1; // to account for the initial FireTwoTasks call below
            EnqueuedTask::FireTwoTasks(TaskTracks+i);
        }
        ProgressMonitor pm;
        tbb::tbb_thread thr( pm );
        if(mode==1) {
            // do some parallel work in the meantime
            for(int i=0; i<10; i++) {
                TaskGenerator& g = *new( tbb::task::allocate_root() ) TaskGenerator(2,5);
                tbb::task::spawn_root_and_wait(g);
                TimedYield( 1E-6 );
            }
        }
        if( mode==2 ) {
            // Additionally enqueue a bunch of empty tasks. The goal is to test that tasks
            // allocated and enqueued by a thread are safe to use after the thread leaves TBB.
            tbb::task* root = new (tbb::task::allocate_root()) tbb::empty_task;
            root->set_ref_count(100);
            for( int i=0; i<100; ++i )
                tbb::task::enqueue( *new (root->allocate_child()) tbb::empty_task );
            init.terminate(); // master thread deregistered
        }
        thr.join();
        ASSERT(EnqueuedTask::nCompletedPairs==nTracks*PairsPerTrack, NULL);
        ASSERT(EnqueuedTask::nOrderedPairs<EnqueuedTask::nCompletedPairs,
               "all task pairs executed in enqueue order; de facto guarantee is too strong?");
    }
}

////////////////////////////////////////////////////////////////////////////////
// Tests for Fire-And-Forget scheduling functionality

int NumRepeats = 200;
const int MaxNumThreads = 16;
static volatile bool Finished[MaxNumThreads] = {};

static volatile bool CanStart;

//! Custom user task interface
class ITask {
public:
    virtual ~ITask() {}
    virtual void Execute() = 0;
    virtual void Release() { delete this; }
};

class TestTask : public ITask {
    volatile bool *m_pDone;
public:
    TestTask ( volatile bool *pDone ) : m_pDone(pDone) {}

    void Execute() __TBB_override {
        *m_pDone = true;
    }
};

class CarrierTask : public tbb::task {
    ITask* m_pTask;
public:
    CarrierTask(ITask* pTask) : m_pTask(pTask) {}

    task* execute() __TBB_override {
        m_pTask->Execute();
        m_pTask->Release();
        return NULL;
    }
};

class SpawnerTask : public ITask {
    ITask* m_taskToSpawn;
public:
    SpawnerTask(ITask* job) : m_taskToSpawn(job) {}

    void Execute() __TBB_override {
        while ( !CanStart )
            __TBB_Yield();
        Harness::Sleep(10); // increases the probability of reproducing the bug
        tbb::task::enqueue( *new( tbb::task::allocate_root() ) CarrierTask(m_taskToSpawn) );
    }
};

class EnqueuerBody {
public:
    void operator() ( int id ) const {
        tbb::task_scheduler_init init(tbb::task_scheduler_init::default_num_threads() + 1);

        SpawnerTask* pTask = new SpawnerTask( new TestTask(Finished + id) );
        tbb::task::enqueue( *new( tbb::task::allocate_root() ) CarrierTask(pTask) );
    }
};

//! Regression test for a bug that caused premature arena destruction
void TestCascadedEnqueue () {
    REMARK("Testing cascaded enqueue\n");
    tbb::task_scheduler_init init(tbb::task_scheduler_init::default_num_threads() + 1);

    int minNumThreads = min(tbb::task_scheduler_init::default_num_threads(), MaxNumThreads) / 2;
    int maxNumThreads = min(tbb::task_scheduler_init::default_num_threads() * 2, MaxNumThreads);

    for ( int numThreads = minNumThreads; numThreads <= maxNumThreads; ++numThreads ) {
        for ( int i = 0; i < NumRepeats; ++i ) {
            CanStart = false;
            __TBB_Yield();
            NativeParallelFor( numThreads, EnqueuerBody() );
            CanStart = true;
            int j = 0;
            while ( j < numThreads ) {
                if ( Finished[j] )
                    ++j;
                else
                    __TBB_Yield();
            }
            for ( j = 0; j < numThreads; ++j )
                Finished[j] = false;
            REMARK("\r%02d threads; Iteration %03d", numThreads, i);
        }
    }
    REMARK( "\r \r" );
}

class DummyTask : public tbb::task {
public:
    task *execute() __TBB_override {
        Harness::Sleep(1);
        return NULL;
    }
};

class SharedRootBody {
    tbb::task *my_root;
public:
    SharedRootBody ( tbb::task *root ) : my_root(root) {}

    void operator() ( int ) const {
        tbb::task::enqueue( *new( tbb::task::allocate_additional_child_of(*my_root) ) DummyTask );
    }
};

//! Test for enqueuing children of the same root from different master threads
void TestSharedRoot ( int p ) {
    REMARK("Testing enqueuing siblings from different masters\n");
    tbb::task_scheduler_init init(p);
    tbb::task *root = new ( tbb::task::allocate_root() ) tbb::empty_task;
    root->set_ref_count(1);
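    // The initial count of 1 is the reference held for wait_for_all(); each
    // allocate_additional_child_of() call below atomically bumps the root's
    // reference count, so wait_for_all() returns only after all enqueued
    // DummyTask children have completed.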
    for( int n = MinThread; n <= MaxThread; ++n ) {
        REMARK("%d masters, %d requested workers\r", n, p-1);
        NativeParallelFor( n, SharedRootBody(root) );
    }
    REMARK( " \r" );
    root->wait_for_all();
    tbb::task::destroy(*root);
}

class BlockingTask : public tbb::task {
    Harness::SpinBarrier &m_Barrier;

    tbb::task* execute () __TBB_override {
        m_Barrier.wait();
        return 0;
    }

public:
    BlockingTask ( Harness::SpinBarrier& bar ) : m_Barrier(bar) {}
};

//! Test making sure that masters can dequeue tasks
/** Success criterion is not hanging. **/
void TestDequeueByMaster () {
    REMARK("Testing task dequeuing by master\n");
    tbb::task_scheduler_init init(1);
    Harness::SpinBarrier bar(2);
    tbb::task &r = *new ( tbb::task::allocate_root() ) tbb::empty_task;
    r.set_ref_count(3);
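    // ref_count = 2 children + 1 for wait_for_all(). Both children block on the same
    // two-party barrier, so with a single-thread scheduler the test can only finish
    // if the master participates in executing the enqueued tasks; the success
    // criterion is simply not hanging.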
    tbb::task::enqueue( *new(r.allocate_child()) BlockingTask(bar) );
    tbb::task::enqueue( *new(r.allocate_child()) BlockingTask(bar) );
    r.wait_for_all();
    tbb::task::destroy(r);
}

////////////////////// Missed wake-ups ///////
#include "tbb/blocked_range.h"
#include "tbb/parallel_for.h"

static const int NUM_TASKS = 4;
static const size_t NUM_REPEATS = TBB_USE_DEBUG ? 50000 : 100000;
static tbb::task_group_context persistent_context(tbb::task_group_context::isolated);

struct Functor : NoAssign
{
    Harness::SpinBarrier &my_barrier;
    Functor(Harness::SpinBarrier &a_barrier) : my_barrier(a_barrier) { }
    void operator()(const tbb::blocked_range<int>& r) const
    {
        ASSERT(r.size() == 1, NULL);
        // allocate_root() would use the current context of the parallel_for, which is destroyed
        // when the parallel_for finishes, while the enqueued task may outlive it. Thus a
        // persistent context is used instead.
        tbb::task *t = new(tbb::task::allocate_root(persistent_context)) tbb::empty_task();
        tbb::task::enqueue(*t); // ensure there are no missed wake-ups
        my_barrier.timed_wait(10, "Attention: poorly reproducible event; if seen, stress testing is required" );
    }
};

void TestWakeups()
{
    tbb::task_scheduler_init my(tbb::task_scheduler_init::deferred);
    if( tbb::task_scheduler_init::default_num_threads() <= NUM_TASKS )
        my.initialize(NUM_TASKS*2);
    else // work around issue #1996 for TestCascadedEnqueue
        my.initialize(tbb::task_scheduler_init::default_num_threads()+1);
    Harness::SpinBarrier barrier(NUM_TASKS);
    REMARK("Missing wake-up: affinity_partitioner\n");
    tbb::affinity_partitioner aff;
    for (size_t i = 0; i < NUM_REPEATS; ++i)
        tbb::parallel_for(tbb::blocked_range<int>(0, NUM_TASKS), Functor(barrier), aff);
    REMARK("Missing wake-up: simple_partitioner\n");
    for (size_t i = 0; i < NUM_REPEATS; ++i)
        tbb::parallel_for(tbb::blocked_range<int>(0, NUM_TASKS), Functor(barrier), tbb::simple_partitioner());
    REMARK("Missing wake-up: auto_partitioner\n");
    for (size_t i = 0; i < NUM_REPEATS; ++i)
        tbb::parallel_for(tbb::blocked_range<int>(0, NUM_TASKS), Functor(barrier)); // auto_partitioner is the default
}

#include "tbb/global_control.h"

int TestMain () {

    TestWakeups(); // run first because it requests oversubscription
    for (int i=0; i<2; i++) {
        tbb::global_control *c = i?
            new tbb::global_control(tbb::global_control::max_allowed_parallelism, 1) : NULL;
        if (i) // decrease the workload for max_allowed_parallelism == 1
            NumRepeats = 10;

        TestCascadedEnqueue(); // needs oversubscription
        if (!c)
            TestDequeueByMaster(); // no oversubscription needed
        for( int p=MinThread; p<=MaxThread; ++p ) {
            TestEnqueue(p);
            TestSharedRoot(p);
        }
        delete c;
    }
    return Harness::Done;
}