/*
 * Copyright 2017-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/executors/ThreadPoolExecutor.h>

#include <folly/executors/GlobalThreadPoolList.h>
#include <folly/synchronization/AsymmetricMemoryBarrier.h>

namespace folly {

using SyncVecThreadPoolExecutors =
    folly::Synchronized<std::vector<ThreadPoolExecutor*>>;

SyncVecThreadPoolExecutors& getSyncVecThreadPoolExecutors() {
  static Indestructible<SyncVecThreadPoolExecutors> storage;
  return *storage;
}

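// Note (hedged): DEFINE_int64 below creates a standard gflags flag, so the
// idle timeout can be overridden at process startup (e.g.
// --threadtimeout_ms=30000), assuming gflags command-line parsing is wired
// up in main().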
DEFINE_int64(
    threadtimeout_ms,
    60000,
    "Idle time before ThreadPoolExecutor threads are joined");

ThreadPoolExecutor::ThreadPoolExecutor(
    size_t /* maxThreads */,
    size_t minThreads,
    std::shared_ptr<ThreadFactory> threadFactory,
    bool isWaitForAll)
    : threadFactory_(std::move(threadFactory)),
      isWaitForAll_(isWaitForAll),
      taskStatsCallbacks_(std::make_shared<TaskStatsCallbackRegistry>()),
      threadPoolHook_("folly::ThreadPoolExecutor"),
      minThreads_(minThreads),
      threadTimeout_(FLAGS_threadtimeout_ms) {
  getSyncVecThreadPoolExecutors()->push_back(this);
}

ThreadPoolExecutor::~ThreadPoolExecutor() {
  joinKeepAliveOnce();
  CHECK_EQ(0, threadList_.get().size());
  getSyncVecThreadPoolExecutors().withWLock([this](auto& tpe) {
    tpe.erase(std::remove(tpe.begin(), tpe.end(), this), tpe.end());
  });
}

ThreadPoolExecutor::Task::Task(
    Func&& func,
    std::chrono::milliseconds expiration,
    Func&& expireCallback)
    : func_(std::move(func)),
      expiration_(expiration),
      expireCallback_(std::move(expireCallback)),
      context_(folly::RequestContext::saveContext()) {
  // Assume that the task is enqueued on creation.
  enqueueTime_ = std::chrono::steady_clock::now();
}

void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
  thread->idle = false;
  auto startTime = std::chrono::steady_clock::now();
  task.stats_.waitTime = startTime - task.enqueueTime_;
  if (task.expiration_ > std::chrono::milliseconds(0) &&
      task.stats_.waitTime >= task.expiration_) {
    task.stats_.expired = true;
    if (task.expireCallback_ != nullptr) {
      task.expireCallback_();
    }
  } else {
    folly::RequestContextScopeGuard rctx(task.context_);
    try {
      task.func_();
    } catch (const std::exception& e) {
      LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled "
                 << typeid(e).name() << " exception: " << e.what();
    } catch (...) {
      LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
                    "object";
    }
    task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
  }
  thread->idle = true;
  thread->lastActiveTime = std::chrono::steady_clock::now();
  thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
    *thread->taskStatsCallbacks->inCallback = true;
    SCOPE_EXIT {
      *thread->taskStatsCallbacks->inCallback = false;
    };
    try {
      for (auto& callback : callbacks) {
        callback(task.stats_);
      }
    } catch (const std::exception& e) {
      LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
                    "unhandled "
                 << typeid(e).name() << " exception: " << e.what();
    } catch (...) {
      LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
                    "unhandled non-exception object";
    }
  });
}
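
// Usage sketch (hedged; concrete subclasses such as CPUThreadPoolExecutor
// expose an add() overload taking an expiration, and `pool` is a hypothetical
// executor instance):
//   pool.add(
//       [] { doWork(); },
//       std::chrono::milliseconds(100), // expiration
//       [] { LOG(WARNING) << "task expired before running"; });
// If the task waits in the queue longer than its expiration, runTask() above
// skips func_ and invokes the expire callback instead.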

size_t ThreadPoolExecutor::numThreads() {
  return maxThreads_.load(std::memory_order_relaxed);
}

size_t ThreadPoolExecutor::numActiveThreads() {
  return activeThreads_.load(std::memory_order_relaxed);
}

// Set the maximum number of running threads.
void ThreadPoolExecutor::setNumThreads(size_t numThreads) {
  /* Since ThreadPoolExecutor may be dynamically adjusting the number of
     threads, we adjust the relevant variables instead of changing
     the number of threads directly. Roughly:

     If numThreads < minThreads, reset minThreads to numThreads.

     If numThreads < active threads, reduce the number of running threads.

     If the number of pending tasks is > 0, then increase the currently
     active number of threads such that we can run all the tasks, or reach
     numThreads.

     Note that if there are observers, we actually have to create all
     the threads, because some observer implementations need to 'observe'
     all thread creation (see the tests for an example of this).
   */

  size_t numThreadsToJoin = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    auto pending = getPendingTaskCountImpl();
    maxThreads_.store(numThreads, std::memory_order_relaxed);
    auto active = activeThreads_.load(std::memory_order_relaxed);
    auto minthreads = minThreads_.load(std::memory_order_relaxed);
    if (numThreads < minthreads) {
      minthreads = numThreads;
      minThreads_.store(numThreads, std::memory_order_relaxed);
    }
    if (active > numThreads) {
      numThreadsToJoin = active - numThreads;
      if (numThreadsToJoin > active - minthreads) {
        numThreadsToJoin = active - minthreads;
      }
      ThreadPoolExecutor::removeThreads(numThreadsToJoin, false);
      activeThreads_.store(
          active - numThreadsToJoin, std::memory_order_relaxed);
    } else if (pending > 0 || observers_.size() > 0 || active < minthreads) {
      size_t numToAdd = std::min(pending, numThreads - active);
      if (observers_.size() > 0) {
        numToAdd = numThreads - active;
      }
      if (active + numToAdd < minthreads) {
        numToAdd = minthreads - active;
      }
      ThreadPoolExecutor::addThreads(numToAdd);
      activeThreads_.store(active + numToAdd, std::memory_order_relaxed);
    }
  }

  /* We may have removed some threads; attempt to join them. */
  joinStoppedThreads(numThreadsToJoin);
}
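
// Example (hedged sketch; `pool` is a hypothetical executor instance):
//   folly::CPUThreadPoolExecutor pool(8);
//   pool.setNumThreads(2);  // shrink: surplus threads are stopped and joined
//   pool.setNumThreads(16); // grow: threads are added eagerly only if tasks
//                           // are pending or observers are attached;
//                           // otherwise ensureActiveThreads() creates them
//                           // lazily as work arrives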
// threadListLock_ must be held in write mode.
void ThreadPoolExecutor::addThreads(size_t n) {
  std::vector<ThreadPtr> newThreads;
  for (size_t i = 0; i < n; i++) {
    newThreads.push_back(makeThread());
  }
  for (auto& thread : newThreads) {
    // TODO need a notion of failing to create the thread
    // and then handling for that case
    thread->handle = threadFactory_->newThread(
        std::bind(&ThreadPoolExecutor::threadRun, this, thread));
    threadList_.add(thread);
  }
  for (auto& thread : newThreads) {
    thread->startupBaton.wait();
  }
  for (auto& o : observers_) {
    for (auto& thread : newThreads) {
      o->threadStarted(thread.get());
    }
  }
}

// threadListLock_ must be held in write mode.
void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
  isJoin_ = isJoin;
  stopThreads(n);
}

void ThreadPoolExecutor::joinStoppedThreads(size_t n) {
  for (size_t i = 0; i < n; i++) {
    auto thread = stoppedThreads_.take();
    thread->handle.join();
  }
}

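// Note (hedged): stop() and join() below differ only in the isJoin flag they
// pass to removeThreads(). With isJoin = true, worker threads may drain any
// remaining queued work before exiting; with isJoin = false they exit as soon
// as possible. The exact semantics depend on the subclass's stopThreads()
// implementation.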
void ThreadPoolExecutor::stop() {
  joinKeepAliveOnce();
  size_t n = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    maxThreads_.store(0, std::memory_order_release);
    activeThreads_.store(0, std::memory_order_release);
    n = threadList_.get().size();
    removeThreads(n, false);
    n += threadsToJoin_.load(std::memory_order_relaxed);
    threadsToJoin_.store(0, std::memory_order_relaxed);
  }
  joinStoppedThreads(n);
  CHECK_EQ(0, threadList_.get().size());
  CHECK_EQ(0, stoppedThreads_.size());
}

void ThreadPoolExecutor::join() {
  joinKeepAliveOnce();
  size_t n = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    maxThreads_.store(0, std::memory_order_release);
    activeThreads_.store(0, std::memory_order_release);
    n = threadList_.get().size();
    removeThreads(n, true);
    n += threadsToJoin_.load(std::memory_order_relaxed);
    threadsToJoin_.store(0, std::memory_order_relaxed);
  }
  joinStoppedThreads(n);
  CHECK_EQ(0, threadList_.get().size());
  CHECK_EQ(0, stoppedThreads_.size());
}

void ThreadPoolExecutor::withAll(FunctionRef<void(ThreadPoolExecutor&)> f) {
  getSyncVecThreadPoolExecutors().withRLock([f](auto& tpes) {
    for (auto tpe : tpes) {
      f(*tpe);
    }
  });
}
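
// Example (hedged): visit every live pool in the process, e.g. to log stats.
//   ThreadPoolExecutor::withAll([](ThreadPoolExecutor& ex) {
//     LOG(INFO) << ex.getName() << ": "
//               << ex.getPoolStats().pendingTaskCount << " pending tasks";
//   });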
ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() {
  const auto now = std::chrono::steady_clock::now();
  SharedMutex::ReadHolder r{&threadListLock_};
  ThreadPoolExecutor::PoolStats stats;
  size_t activeTasks = 0;
  size_t idleAlive = 0;
  for (auto thread : threadList_.get()) {
    if (thread->idle) {
      const std::chrono::nanoseconds idleTime = now - thread->lastActiveTime;
      stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime);
      idleAlive++;
    } else {
      activeTasks++;
    }
  }
  stats.pendingTaskCount = getPendingTaskCountImpl();
  stats.totalTaskCount = stats.pendingTaskCount + activeTasks;

  stats.threadCount = maxThreads_.load(std::memory_order_relaxed);
  stats.activeThreadCount =
      activeThreads_.load(std::memory_order_relaxed) - idleAlive;
  stats.idleThreadCount = stats.threadCount - stats.activeThreadCount;
  return stats;
}
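
// Example (hedged; `pool` is a hypothetical executor instance):
//   auto stats = pool.getPoolStats();
//   LOG(INFO) << stats.activeThreadCount << " active / " << stats.threadCount
//             << " threads, " << stats.pendingTaskCount << " pending tasks";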
size_t ThreadPoolExecutor::getPendingTaskCount() {
  SharedMutex::ReadHolder r{&threadListLock_};
  return getPendingTaskCountImpl();
}

std::string ThreadPoolExecutor::getName() {
  auto ntf = dynamic_cast<NamedThreadFactory*>(threadFactory_.get());
  if (ntf == nullptr) {
    return folly::demangle(typeid(*this).name()).toStdString();
  }

  return ntf->getNamePrefix();
}

std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);

void ThreadPoolExecutor::subscribeToTaskStats(TaskStatsCallback cb) {
  if (*taskStatsCallbacks_->inCallback) {
    throw std::runtime_error("cannot subscribe in task stats callback");
  }
  taskStatsCallbacks_->callbackList.wlock()->push_back(std::move(cb));
}
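
// Example (hedged; `pool` is a hypothetical executor instance): flag tasks
// that sat in the queue for more than a second.
//   pool.subscribeToTaskStats([](ThreadPoolExecutor::TaskStats stats) {
//     if (stats.waitTime > std::chrono::seconds(1)) {
//       LOG(WARNING) << "slow task dispatch";
//     }
//   });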
BlockingQueueAddResult ThreadPoolExecutor::StoppedThreadQueue::add(
    ThreadPoolExecutor::ThreadPtr item) {
  std::lock_guard<std::mutex> guard(mutex_);
  queue_.push(std::move(item));
  return sem_.post();
}

ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
  while (true) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (queue_.size() > 0) {
        auto item = std::move(queue_.front());
        queue_.pop();
        return item;
      }
    }
    sem_.wait();
  }
}

folly::Optional<ThreadPoolExecutor::ThreadPtr>
ThreadPoolExecutor::StoppedThreadQueue::try_take_for(
    std::chrono::milliseconds time) {
  while (true) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (queue_.size() > 0) {
        auto item = std::move(queue_.front());
        queue_.pop();
        return item;
      }
    }
    if (!sem_.try_wait_for(time)) {
      return folly::none;
    }
  }
}

size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
  std::lock_guard<std::mutex> guard(mutex_);
  return queue_.size();
}

void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    observers_.push_back(o);
    for (auto& thread : threadList_.get()) {
      o->threadPreviouslyStarted(thread.get());
    }
  }
  while (activeThreads_.load(std::memory_order_relaxed) <
         maxThreads_.load(std::memory_order_relaxed)) {
    ensureActiveThreads();
  }
}
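
// Example (hedged sketch, assuming Observer exposes threadStarted()/
// threadStopped() hooks taking a ThreadHandle*; `pool` is a hypothetical
// executor instance): a minimal Observer that counts live pool threads.
//   struct ThreadCounter : ThreadPoolExecutor::Observer {
//     std::atomic<int> alive{0};
//     void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
//       ++alive;
//     }
//     void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
//       --alive;
//     }
//   };
//   pool.addObserver(std::make_shared<ThreadCounter>());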
void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
  SharedMutex::WriteHolder w{&threadListLock_};
  for (auto& thread : threadList_.get()) {
    o->threadNotYetStopped(thread.get());
  }

  for (auto it = observers_.begin(); it != observers_.end(); it++) {
    if (*it == o) {
      observers_.erase(it);
      return;
    }
  }
  DCHECK(false);
}

// Idle threads may have stopped themselves; attempt to join them here.
void ThreadPoolExecutor::ensureJoined() {
  auto tojoin = threadsToJoin_.load(std::memory_order_relaxed);
  if (tojoin) {
    {
      SharedMutex::WriteHolder w{&threadListLock_};
      tojoin = threadsToJoin_.load(std::memory_order_relaxed);
      threadsToJoin_.store(0, std::memory_order_relaxed);
    }
    joinStoppedThreads(tojoin);
  }
}

// threadListLock_ must be held in write mode.
bool ThreadPoolExecutor::tryTimeoutThread() {
  // Try to stop a thread based on the idle-thread timeout (try_take_for),
  // but only if stopping one still leaves at least minThreads running.
  if (!minActive()) {
    return false;
  }

  // Remove thread from active count
  activeThreads_.store(
      activeThreads_.load(std::memory_order_relaxed) - 1,
      std::memory_order_relaxed);

  // There is a memory ordering constraint w.r.t the queue
  // implementation's add() and getPendingTaskCountImpl() - while many
  // queues have seq_cst ordering, some do not, so add an explicit
  // barrier. tryTimeoutThread is the slow path and only happens once
  // every thread timeout; use asymmetric barrier to keep add() fast.
  asymmetricHeavyBarrier();

  // If this stop is driven by the idle-thread timeout, adjust the counters
  // here (otherwise stop() or join() does this).
  if (getPendingTaskCountImpl() > 0) {
    // There are still pending tasks; we can't stop yet.
    // Re-up the active-thread count and return.
    activeThreads_.store(
        activeThreads_.load(std::memory_order_relaxed) + 1,
        std::memory_order_relaxed);
    return false;
  }

  threadsToJoin_.store(
      threadsToJoin_.load(std::memory_order_relaxed) + 1,
      std::memory_order_relaxed);

  return true;
}
// If we couldn't be sure that a task was handed off to an existing thread,
// try to start a new thread to handle it, unless we are already running the
// maximum number of threads.
void ThreadPoolExecutor::ensureActiveThreads() {
  ensureJoined();

  // Matches barrier in tryTimeoutThread(). Ensure task added
  // is seen before loading activeThreads_ below.
  asymmetricLightBarrier();

  // Fast path assuming we are already at max threads.
  auto active = activeThreads_.load(std::memory_order_relaxed);
  auto total = maxThreads_.load(std::memory_order_relaxed);

  if (active >= total) {
    return;
  }

  SharedMutex::WriteHolder w{&threadListLock_};
  // Double check behind lock.
  active = activeThreads_.load(std::memory_order_relaxed);
  total = maxThreads_.load(std::memory_order_relaxed);
  if (active >= total) {
    return;
  }
  ThreadPoolExecutor::addThreads(1);
  activeThreads_.store(active + 1, std::memory_order_relaxed);
}
// When an idle thread times out, only let it stop if doing so still leaves
// at least minThreads threads running.
bool ThreadPoolExecutor::minActive() {
  return activeThreads_.load(std::memory_order_relaxed) >
      minThreads_.load(std::memory_order_relaxed);
}

} // namespace folly