1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#define TBB_PREVIEW_WAITING_FOR_WORKERS 1
18#include "tbb/task_scheduler_init.h"
19#include "tbb/blocked_range.h"
20#include "tbb/cache_aligned_allocator.h"
21#include "tbb/parallel_for.h"
22
23#define HARNESS_DEFAULT_MIN_THREADS (tbb::task_scheduler_init::default_num_threads())
24#define HARNESS_DEFAULT_MAX_THREADS (4*tbb::task_scheduler_init::default_num_threads())
25#if __bg__
26// CNK does not support fork()
27#define HARNESS_SKIP_TEST 1
28#endif
29#include "harness.h"
30
31#if _WIN32||_WIN64
32#include "tbb/concurrent_hash_map.h"
33
34HANDLE getCurrentThreadHandle()
35{
36 HANDLE hProc = GetCurrentProcess(), hThr = INVALID_HANDLE_VALUE;
37#if TBB_USE_ASSERT
38 BOOL res =
39#endif
40 DuplicateHandle( hProc, GetCurrentThread(), hProc, &hThr, 0, FALSE, DUPLICATE_SAME_ACCESS );
41 __TBB_ASSERT( res, "Retrieving current thread handle failed" );
42 return hThr;
43}
44
45bool threadTerminated(HANDLE h)
46{
47 DWORD ret = WaitForSingleObjectEx(h, 0, FALSE);
48 return WAIT_OBJECT_0 == ret;
49}
50
51struct Data {
52 HANDLE h;
53};
54
55typedef tbb::concurrent_hash_map<DWORD, Data> TidTableType;
56
57static TidTableType tidTable;
58
59#else
60
61#if __sun || __SUNPRO_CC
62#define _POSIX_PTHREAD_SEMANTICS 1 // to get standard-conforming sigwait(2)
63#endif
64#include <signal.h>
65#include <sys/types.h>
66#include <unistd.h>
67#include <sys/wait.h>
68#include <sched.h>
69
70#include "tbb/tick_count.h"
71
// Intentionally empty signal handler; the signal number is ignored.
void SigHandler(int /*signum*/)
{
}
73
74#endif // _WIN32||_WIN64
75
76class AllocTask {
77public:
78 void operator() (const tbb::blocked_range<int> &r) const {
79#if _WIN32||_WIN64
80 HANDLE h = getCurrentThreadHandle();
81 DWORD tid = GetCurrentThreadId();
82 {
83 TidTableType::accessor acc;
84 if (tidTable.insert(acc, tid)) {
85 acc->second.h = h;
86 }
87 }
88#endif
89 for (int y = r.begin(); y != r.end(); ++y) {
90 void *p = tbb::internal::NFS_Allocate(1, 7000, NULL);
91 tbb::internal::NFS_Free(p);
92 }
93 }
94 AllocTask() {}
95};
96
97void CallParallelFor()
98{
99 tbb::parallel_for(tbb::blocked_range<int>(0, 10000, 1), AllocTask(),
100 tbb::simple_partitioner());
101}
102
/* Regression test against data race between termination of workers
   and setting blocking termination mode in main thread. */
105class RunWorkersBody : NoAssign {
106 bool wait_workers;
107public:
108 RunWorkersBody(bool waitWorkers) : wait_workers(waitWorkers) {}
109 void operator()(const int /*threadID*/) const {
110 tbb::task_scheduler_init sch(MaxThread);
111 CallParallelFor();
112 if (wait_workers) {
113 bool ok = sch.blocking_terminate(std::nothrow);
114 ASSERT(ok, NULL);
115 }
116 }
117};
118
119void TestBlockNonblock()
120{
121 for (int i=0; i<100; i++) {
122 REMARK("\rIteration %d ", i);
123 NativeParallelFor(4, RunWorkersBody(/*wait_workers=*/false));
124 RunWorkersBody(/*wait_workers=*/true)(0);
125 }
126}
127
128class RunInNativeThread : NoAssign {
129 bool create_tsi,
130 blocking;
131public:
132 RunInNativeThread(bool create_tsi_, bool blocking_) :
133 create_tsi(create_tsi_), blocking(blocking_) {}
134 void operator()(const int /*threadID*/) const {
135 // nested TSI or auto-initialized TSI can be terminated when
136 // wait_workers is true (deferred TSI means auto-initialization)
137 tbb::task_scheduler_init tsi(create_tsi? 2 : tbb::task_scheduler_init::deferred);
138 CallParallelFor();
139 if (blocking) {
140 bool ok = tsi.blocking_terminate(std::nothrow);
141 // all usages are nested
142 ASSERT(!ok, "Nested blocking terminate must fail.");
143 }
144 }
145};
146
147void TestTasksInThread()
148{
149 tbb::task_scheduler_init sch(2);
150 CallParallelFor();
151 for (int i=0; i<2; i++)
152 NativeParallelFor(2, RunInNativeThread(/*create_tsi=*/1==i, /*blocking=*/false));
153 bool ok = sch.blocking_terminate(std::nothrow);
154 ASSERT(ok, NULL);
155}
156
157#include "harness_memory.h"
158
159// check for memory leak during TBB task scheduler init/terminate life cycle
160// TODO: move to test_task_scheduler_init after workers waiting productization
161void TestSchedulerMemLeaks()
162{
163 const int ITERS = 10;
164 int it;
165
166 for (it=0; it<ITERS; it++) {
167 size_t memBefore = GetMemoryUsage();
168#if _MSC_VER && _DEBUG
169 // _CrtMemCheckpoint() and _CrtMemDifference are non-empty only in _DEBUG
170 _CrtMemState stateBefore, stateAfter, diffState;
171 _CrtMemCheckpoint(&stateBefore);
172#endif
173 for (int i=0; i<100; i++) {
174 tbb::task_scheduler_init sch(1);
175 for (int k=0; k<10; k++) {
176 tbb::empty_task *t = new( tbb::task::allocate_root() ) tbb::empty_task();
177 tbb::task::enqueue(*t);
178 }
179 bool ok = sch.blocking_terminate(std::nothrow);
180 ASSERT(ok, NULL);
181 }
182#if _MSC_VER && _DEBUG
183 _CrtMemCheckpoint(&stateAfter);
184 int ret = _CrtMemDifference(&diffState, &stateBefore, &stateAfter);
185 ASSERT(!ret, "It must be no memory leaks at this point.");
186#endif
187 if (GetMemoryUsage() <= memBefore)
188 break;
189 }
190 ASSERT(it < ITERS, "Memory consumption has not stabilized. Memory Leak?");
191}
192
193void TestNestingTSI()
194{
195 // nesting with and without blocking is possible
196 for (int i=0; i<2; i++) {
197 tbb::task_scheduler_init schBlock(2);
198 CallParallelFor();
199 tbb::task_scheduler_init schBlock1(2);
200 CallParallelFor();
201 if (i)
202 schBlock1.terminate();
203 else {
204 bool ok = schBlock1.blocking_terminate(std::nothrow);
205 ASSERT(!ok, "Nested blocking terminate must fail.");
206 }
207 bool ok = schBlock.blocking_terminate(std::nothrow);
208 ASSERT(ok, NULL);
209 }
210 {
211 tbb::task_scheduler_init schBlock(2);
212 NativeParallelFor(1, RunInNativeThread(/*create_tsi=*/true, /*blocking=*/true));
213 bool ok = schBlock.blocking_terminate(std::nothrow);
214 ASSERT(ok, NULL);
215 }
216}
217
218void TestAutoInit()
219{
220 CallParallelFor(); // autoinit
221 // creation of blocking scheduler is possible, but one is not block
222 NativeParallelFor(1, RunInNativeThread(/*create_tsi=*/true, /*blocking=*/true));
223}
224
225int TestMain()
226{
227 using namespace Harness;
228
229 TestNestingTSI();
230 TestBlockNonblock();
231 TestTasksInThread();
232 TestSchedulerMemLeaks();
233
234 bool child = false;
235#if _WIN32||_WIN64
236 DWORD masterTid = GetCurrentThreadId();
237#else
238 struct sigaction sa;
239 sigset_t sig_set;
240
241 sigemptyset(&sa.sa_mask);
242 sa.sa_flags = 0;
243 sa.sa_handler = SigHandler;
244 if (sigaction(SIGCHLD, &sa, NULL))
245 ASSERT(0, "sigaction failed");
246 if (sigaction(SIGALRM, &sa, NULL))
247 ASSERT(0, "sigaction failed");
248 // block SIGCHLD and SIGALRM, the mask is inherited by worker threads
249 sigemptyset(&sig_set);
250 sigaddset(&sig_set, SIGCHLD);
251 sigaddset(&sig_set, SIGALRM);
252 if (pthread_sigmask(SIG_BLOCK, &sig_set, NULL))
253 ASSERT(0, "pthread_sigmask failed");
254#endif
255 for (int threads=MinThread; threads<=MaxThread; threads+=MinThread) {
256 for (int i=0; i<20; i++) {
257 if (!child)
258 REMARK("\rThreads %d %d ", threads, i);
259 {
260 tbb::task_scheduler_init sch(threads);
261 bool ok = sch.blocking_terminate(std::nothrow);
262 ASSERT(ok, NULL);
263 }
264 tbb::task_scheduler_init sch(threads);
265
266 CallParallelFor();
267 bool ok = sch.blocking_terminate(std::nothrow);
268 ASSERT(ok, NULL);
269
270#if _WIN32||_WIN64
271 // check that there is no alive threads after terminate()
272 for (TidTableType::const_iterator it = tidTable.begin();
273 it != tidTable.end(); ++it) {
274 if (masterTid != it->first) {
275 ASSERT(threadTerminated(it->second.h), NULL);
276 }
277 }
278 tidTable.clear();
279#else // _WIN32||_WIN64
280 if (child)
281 exit(0);
282 else {
283 pid_t pid = fork();
284 if (!pid) {
285 i = -1;
286 child = true;
287 } else {
288 int sig;
289 pid_t w_ret = 0;
290 // wait for SIGCHLD up to timeout
291 alarm(30);
292 if (0 != sigwait(&sig_set, &sig))
293 ASSERT(0, "sigwait failed");
294 alarm(0);
295 w_ret = waitpid(pid, NULL, WNOHANG);
296 ASSERT(w_ret>=0, "waitpid failed");
297 if (!w_ret) {
298 ASSERT(!kill(pid, SIGKILL), NULL);
299 w_ret = waitpid(pid, NULL, 0);
300 ASSERT(w_ret!=-1, "waitpid failed");
301
302 ASSERT(0, "Hang after fork");
303 }
304 // clean pending signals (if any occurs since sigwait)
305 sigset_t p_mask;
306 for (;;) {
307 sigemptyset(&p_mask);
308 sigpending(&p_mask);
309 if (sigismember(&p_mask, SIGALRM)
310 || sigismember(&p_mask, SIGCHLD)) {
311 if (0 != sigwait(&p_mask, &sig))
312 ASSERT(0, "sigwait failed");
313 } else
314 break;
315 }
316 }
317 }
318#endif // _WIN32||_WIN64
319 }
320 }
321 // auto initialization at this point
322 TestAutoInit();
323
324 return Harness::Done;
325}
326
327