1//************************************ bs::framework - Copyright 2018 Marko Pintera **************************************//
2//*********** Licensed under the MIT license. See LICENSE.md for full terms. This notice is not to be removed. ***********//
3#include "Threading/BsThreadPool.h"
4#include "Debug/BsDebug.h"
5
6#if BS_PLATFORM == BS_PLATFORM_WIN32
7#include "windows.h"
8
9#if BS_COMPILER == BS_COMPILER_MSVC
10// disable: nonstandard extension used: 'X' uses SEH and 'Y' has destructor
11// We don't care about this as any exception is meant to crash the program.
12#pragma warning(disable: 4509)
13#endif // BS_COMPILER == BS_COMPILER_MSVC
14
15#endif // BS_PLATFORM == BS_PLATFORM_WIN32
16
17namespace bs
18{
/**
 * Number of getThread() calls between two checks for unused threads: getThread() calls clearUnused() when its
 * call counter reaches this value, and clearUnused() resets the counter.
 */
static constexpr int UNUSED_CHECK_PERIOD = 32;
21
22 HThread::HThread(ThreadPool* pool, UINT32 threadId)
23 :mThreadId(threadId), mPool(pool)
24 { }
25
26 void HThread::blockUntilComplete()
27 {
28 PooledThread* parentThread = nullptr;
29
30 {
31 Lock lock(mPool->mMutex);
32
33 for (auto& thread : mPool->mThreads)
34 {
35 if (thread->getId() == mThreadId)
36 {
37 parentThread = thread;
38 break;
39 }
40 }
41 }
42
43 if (parentThread != nullptr)
44 {
45 Lock lock(parentThread->mMutex);
46
47 if (parentThread->mId == mThreadId) // Check again in case it changed
48 {
49 while (!parentThread->mIdle)
50 parentThread->mWorkerEndedCond.wait(lock);
51 }
52 }
53 }
54
55 void PooledThread::initialize()
56 {
57 mThread = bs_new<Thread>(std::bind(&PooledThread::run, this));
58
59 Lock lock(mMutex);
60
61 while(!mThreadStarted)
62 mStartedCond.wait(lock);
63 }
64
65 void PooledThread::start(std::function<void()> workerMethod, UINT32 id)
66 {
67 {
68 Lock lock(mMutex);
69
70 mWorkerMethod = workerMethod;
71 mIdle = false;
72 mIdleTime = std::time(nullptr);
73 mThreadReady = true;
74 mId = id;
75 }
76
77 mReadyCond.notify_one();
78 }
79
80 void PooledThread::run()
81 {
82 onThreadStarted(mName);
83
84 {
85 Lock lock(mMutex);
86 mThreadStarted = true;
87 }
88
89 mStartedCond.notify_one();
90
91 while(true)
92 {
93 std::function<void()> worker = nullptr;
94
95 {
96 {
97 Lock lock(mMutex);
98
99 while (!mThreadReady)
100 mReadyCond.wait(lock);
101
102 worker = mWorkerMethod;
103 }
104
105 if (worker == nullptr)
106 {
107 onThreadEnded(mName);
108 return;
109 }
110 }
111
112#if BS_PLATFORM == BS_PLATFORM_WIN32
113 __try
114 {
115 worker();
116 }
117 __except (gCrashHandler().reportCrash(GetExceptionInformation()))
118 {
119 PlatformUtility::terminate(true);
120 }
121#else
122 worker();
123#endif
124
125 {
126 Lock lock(mMutex);
127
128 mIdle = true;
129 mIdleTime = std::time(nullptr);
130 mThreadReady = false;
131 mWorkerMethod = nullptr; // Make sure to clear as it could have bound shared pointers and similar
132
133 mWorkerEndedCond.notify_one();
134 }
135 }
136 }
137
138 void PooledThread::destroy()
139 {
140 blockUntilComplete();
141
142 {
143 Lock lock(mMutex);
144 mWorkerMethod = nullptr;
145 mThreadReady = true;
146 }
147
148 mReadyCond.notify_one();
149 mThread->join();
150 bs_delete(mThread);
151 }
152
153 void PooledThread::blockUntilComplete()
154 {
155 Lock lock(mMutex);
156
157 while (!mIdle)
158 mWorkerEndedCond.wait(lock);
159 }
160
161 bool PooledThread::isIdle()
162 {
163 Lock lock(mMutex);
164
165 return mIdle;
166 }
167
168 time_t PooledThread::idleTime()
169 {
170 Lock lock(mMutex);
171
172 return (time(nullptr) - mIdleTime);
173 }
174
/**
 * Stores the name that run() passes to onThreadStarted()/onThreadEnded().
 *
 * NOTE(review): unlike the other accessors this does not take mMutex - confirm callers only rename a thread
 * that is not concurrently reading its name.
 */
void PooledThread::setName(const String& name)
{
	mName = name;
}
179
180 UINT32 PooledThread::getId() const
181 {
182 Lock lock(mMutex);
183
184 return mId;
185 }
186
187 ThreadPool::ThreadPool(UINT32 threadCapacity, UINT32 maxCapacity, UINT32 idleTimeout)
188 :mDefaultCapacity(threadCapacity), mMaxCapacity(maxCapacity), mIdleTimeout(idleTimeout)
189 {
190
191 }
192
/** Destroys every thread still owned by the pool; stopAll() waits for running work to finish first. */
ThreadPool::~ThreadPool()
{
	stopAll();
}
197
198 HThread ThreadPool::run(const String& name, std::function<void()> workerMethod)
199 {
200 PooledThread* thread = getThread(name);
201 thread->start(workerMethod, mUniqueId++);
202
203 return HThread(this, thread->getId());
204 }
205
206 void ThreadPool::stopAll()
207 {
208 Lock lock(mMutex);
209 for(auto& thread : mThreads)
210 {
211 destroyThread(thread);
212 }
213
214 mThreads.clear();
215 }
216
217 void ThreadPool::clearUnused()
218 {
219 Lock lock(mMutex);
220 mAge = 0;
221
222 if(mThreads.size() <= mDefaultCapacity)
223 return;
224
225 Vector<PooledThread*> idleThreads;
226 Vector<PooledThread*> expiredThreads;
227 Vector<PooledThread*> activeThreads;
228
229 idleThreads.reserve(mThreads.size());
230 expiredThreads.reserve(mThreads.size());
231 activeThreads.reserve(mThreads.size());
232
233 for(auto& thread : mThreads)
234 {
235 if(thread->isIdle())
236 {
237 if(thread->idleTime() >= mIdleTimeout)
238 expiredThreads.push_back(thread);
239 else
240 idleThreads.push_back(thread);
241 }
242 else
243 activeThreads.push_back(thread);
244 }
245
246 idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());
247 UINT32 limit = std::min((UINT32)idleThreads.size(), mDefaultCapacity);
248
249 UINT32 i = 0;
250 mThreads.clear();
251
252 for(auto& thread : idleThreads)
253 {
254 if (i < limit)
255 {
256 mThreads.push_back(thread);
257 i++;
258 }
259 else
260 destroyThread(thread);
261 }
262
263 mThreads.insert(mThreads.end(), activeThreads.begin(), activeThreads.end());
264 }
265
/**
 * Shuts down and frees a single pooled thread. PooledThread::destroy() waits for running work and joins the
 * underlying thread before the object itself is deleted. Does not remove the pointer from mThreads.
 */
void ThreadPool::destroyThread(PooledThread* thread)
{
	thread->destroy();
	bs_delete(thread);
}
271
272 PooledThread* ThreadPool::getThread(const String& name)
273 {
274 UINT32 age = 0;
275 {
276 Lock lock(mMutex);
277 age = ++mAge;
278 }
279
280 if(age == UNUSED_CHECK_PERIOD)
281 clearUnused();
282
283 Lock lock(mMutex);
284
285 for(auto& thread : mThreads)
286 {
287 if(thread->isIdle())
288 {
289 thread->setName(name);
290 return thread;
291 }
292 }
293
294 if(mThreads.size() >= mMaxCapacity)
295 BS_EXCEPT(InvalidStateException, "Unable to create a new thread in the pool because maximum capacity has been reached.");
296
297 PooledThread* newThread = createThread(name);
298 mThreads.push_back(newThread);
299
300 return newThread;
301 }
302
303 UINT32 ThreadPool::getNumAvailable() const
304 {
305 UINT32 numAvailable = mMaxCapacity;
306
307 Lock lock(mMutex);
308 for(auto& thread : mThreads)
309 {
310 if(!thread->isIdle())
311 numAvailable--;
312 }
313
314 return numAvailable;
315 }
316
317 UINT32 ThreadPool::getNumActive() const
318 {
319 UINT32 numActive = 0;
320
321 Lock lock(mMutex);
322 for(auto& thread : mThreads)
323 {
324 if(!thread->isIdle())
325 numActive++;
326 }
327
328 return numActive;
329 }
330
331 UINT32 ThreadPool::getNumAllocated() const
332 {
333 Lock lock(mMutex);
334
335 return (UINT32)mThreads.size();
336 }
337}
338