1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/executors/ThreadPoolExecutor.h> |
18 | |
19 | #include <folly/executors/GlobalThreadPoolList.h> |
20 | #include <folly/synchronization/AsymmetricMemoryBarrier.h> |
21 | |
22 | namespace folly { |
23 | |
24 | using SyncVecThreadPoolExecutors = |
25 | folly::Synchronized<std::vector<ThreadPoolExecutor*>>; |
26 | |
27 | SyncVecThreadPoolExecutors& getSyncVecThreadPoolExecutors() { |
28 | static Indestructible<SyncVecThreadPoolExecutors> storage; |
29 | return *storage; |
30 | } |
31 | |
32 | DEFINE_int64( |
33 | threadtimeout_ms, |
34 | 60000, |
35 | "Idle time before ThreadPoolExecutor threads are joined" ); |
36 | |
37 | ThreadPoolExecutor::ThreadPoolExecutor( |
38 | size_t /* maxThreads */, |
39 | size_t minThreads, |
40 | std::shared_ptr<ThreadFactory> threadFactory, |
41 | bool isWaitForAll) |
42 | : threadFactory_(std::move(threadFactory)), |
43 | isWaitForAll_(isWaitForAll), |
44 | taskStatsCallbacks_(std::make_shared<TaskStatsCallbackRegistry>()), |
45 | threadPoolHook_("folly::ThreadPoolExecutor" ), |
46 | minThreads_(minThreads), |
47 | threadTimeout_(FLAGS_threadtimeout_ms) { |
48 | getSyncVecThreadPoolExecutors()->push_back(this); |
49 | } |
50 | |
51 | ThreadPoolExecutor::~ThreadPoolExecutor() { |
52 | joinKeepAliveOnce(); |
53 | CHECK_EQ(0, threadList_.get().size()); |
54 | getSyncVecThreadPoolExecutors().withWLock([this](auto& tpe) { |
55 | tpe.erase(std::remove(tpe.begin(), tpe.end(), this), tpe.end()); |
56 | }); |
57 | } |
58 | |
59 | ThreadPoolExecutor::Task::Task( |
60 | Func&& func, |
61 | std::chrono::milliseconds expiration, |
62 | Func&& expireCallback) |
63 | : func_(std::move(func)), |
64 | expiration_(expiration), |
65 | expireCallback_(std::move(expireCallback)), |
66 | context_(folly::RequestContext::saveContext()) { |
67 | // Assume that the task in enqueued on creation |
68 | enqueueTime_ = std::chrono::steady_clock::now(); |
69 | } |
70 | |
71 | void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) { |
72 | thread->idle = false; |
73 | auto startTime = std::chrono::steady_clock::now(); |
74 | task.stats_.waitTime = startTime - task.enqueueTime_; |
75 | if (task.expiration_ > std::chrono::milliseconds(0) && |
76 | task.stats_.waitTime >= task.expiration_) { |
77 | task.stats_.expired = true; |
78 | if (task.expireCallback_ != nullptr) { |
79 | task.expireCallback_(); |
80 | } |
81 | } else { |
82 | folly::RequestContextScopeGuard rctx(task.context_); |
83 | try { |
84 | task.func_(); |
85 | } catch (const std::exception& e) { |
86 | LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " |
87 | << typeid(e).name() << " exception: " << e.what(); |
88 | } catch (...) { |
89 | LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception " |
90 | "object" ; |
91 | } |
92 | task.stats_.runTime = std::chrono::steady_clock::now() - startTime; |
93 | } |
94 | thread->idle = true; |
95 | thread->lastActiveTime = std::chrono::steady_clock::now(); |
96 | thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) { |
97 | *thread->taskStatsCallbacks->inCallback = true; |
98 | SCOPE_EXIT { |
99 | *thread->taskStatsCallbacks->inCallback = false; |
100 | }; |
101 | try { |
102 | for (auto& callback : callbacks) { |
103 | callback(task.stats_); |
104 | } |
105 | } catch (const std::exception& e) { |
106 | LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw " |
107 | "unhandled " |
108 | << typeid(e).name() << " exception: " << e.what(); |
109 | } catch (...) { |
110 | LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw " |
111 | "unhandled non-exception object" ; |
112 | } |
113 | }); |
114 | } |
115 | |
116 | size_t ThreadPoolExecutor::numThreads() { |
117 | return maxThreads_.load(std::memory_order_relaxed); |
118 | } |
119 | |
120 | size_t ThreadPoolExecutor::numActiveThreads() { |
121 | return activeThreads_.load(std::memory_order_relaxed); |
122 | } |
123 | |
124 | // Set the maximum number of running threads. |
125 | void ThreadPoolExecutor::setNumThreads(size_t numThreads) { |
126 | /* Since ThreadPoolExecutor may be dynamically adjusting the number of |
127 | threads, we adjust the relevant variables instead of changing |
128 | the number of threads directly. Roughly: |
129 | |
130 | If numThreads < minthreads reset minThreads to numThreads. |
131 | |
132 | If numThreads < active threads, reduce number of running threads. |
133 | |
134 | If the number of pending tasks is > 0, then increase the currently |
135 | active number of threads such that we can run all the tasks, or reach |
136 | numThreads. |
137 | |
138 | Note that if there are observers, we actually have to create all |
139 | the threads, because some observer implementations need to 'observe' |
140 | all thread creation (see tests for an example of this) |
141 | */ |
142 | |
143 | size_t numThreadsToJoin = 0; |
144 | { |
145 | SharedMutex::WriteHolder w{&threadListLock_}; |
146 | auto pending = getPendingTaskCountImpl(); |
147 | maxThreads_.store(numThreads, std::memory_order_relaxed); |
148 | auto active = activeThreads_.load(std::memory_order_relaxed); |
149 | auto minthreads = minThreads_.load(std::memory_order_relaxed); |
150 | if (numThreads < minthreads) { |
151 | minthreads = numThreads; |
152 | minThreads_.store(numThreads, std::memory_order_relaxed); |
153 | } |
154 | if (active > numThreads) { |
155 | numThreadsToJoin = active - numThreads; |
156 | if (numThreadsToJoin > active - minthreads) { |
157 | numThreadsToJoin = active - minthreads; |
158 | } |
159 | ThreadPoolExecutor::removeThreads(numThreadsToJoin, false); |
160 | activeThreads_.store( |
161 | active - numThreadsToJoin, std::memory_order_relaxed); |
162 | } else if (pending > 0 || observers_.size() > 0 || active < minthreads) { |
163 | size_t numToAdd = std::min(pending, numThreads - active); |
164 | if (observers_.size() > 0) { |
165 | numToAdd = numThreads - active; |
166 | } |
167 | if (active + numToAdd < minthreads) { |
168 | numToAdd = minthreads - active; |
169 | } |
170 | ThreadPoolExecutor::addThreads(numToAdd); |
171 | activeThreads_.store(active + numToAdd, std::memory_order_relaxed); |
172 | } |
173 | } |
174 | |
175 | /* We may have removed some threads, attempt to join them */ |
176 | joinStoppedThreads(numThreadsToJoin); |
177 | } |
178 | |
179 | // threadListLock_ is writelocked |
180 | void ThreadPoolExecutor::addThreads(size_t n) { |
181 | std::vector<ThreadPtr> newThreads; |
182 | for (size_t i = 0; i < n; i++) { |
183 | newThreads.push_back(makeThread()); |
184 | } |
185 | for (auto& thread : newThreads) { |
186 | // TODO need a notion of failing to create the thread |
187 | // and then handling for that case |
188 | thread->handle = threadFactory_->newThread( |
189 | std::bind(&ThreadPoolExecutor::threadRun, this, thread)); |
190 | threadList_.add(thread); |
191 | } |
192 | for (auto& thread : newThreads) { |
193 | thread->startupBaton.wait(); |
194 | } |
195 | for (auto& o : observers_) { |
196 | for (auto& thread : newThreads) { |
197 | o->threadStarted(thread.get()); |
198 | } |
199 | } |
200 | } |
201 | |
202 | // threadListLock_ is writelocked |
203 | void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) { |
204 | isJoin_ = isJoin; |
205 | stopThreads(n); |
206 | } |
207 | |
208 | void ThreadPoolExecutor::joinStoppedThreads(size_t n) { |
209 | for (size_t i = 0; i < n; i++) { |
210 | auto thread = stoppedThreads_.take(); |
211 | thread->handle.join(); |
212 | } |
213 | } |
214 | |
215 | void ThreadPoolExecutor::stop() { |
216 | joinKeepAliveOnce(); |
217 | size_t n = 0; |
218 | { |
219 | SharedMutex::WriteHolder w{&threadListLock_}; |
220 | maxThreads_.store(0, std::memory_order_release); |
221 | activeThreads_.store(0, std::memory_order_release); |
222 | n = threadList_.get().size(); |
223 | removeThreads(n, false); |
224 | n += threadsToJoin_.load(std::memory_order_relaxed); |
225 | threadsToJoin_.store(0, std::memory_order_relaxed); |
226 | } |
227 | joinStoppedThreads(n); |
228 | CHECK_EQ(0, threadList_.get().size()); |
229 | CHECK_EQ(0, stoppedThreads_.size()); |
230 | } |
231 | |
232 | void ThreadPoolExecutor::join() { |
233 | joinKeepAliveOnce(); |
234 | size_t n = 0; |
235 | { |
236 | SharedMutex::WriteHolder w{&threadListLock_}; |
237 | maxThreads_.store(0, std::memory_order_release); |
238 | activeThreads_.store(0, std::memory_order_release); |
239 | n = threadList_.get().size(); |
240 | removeThreads(n, true); |
241 | n += threadsToJoin_.load(std::memory_order_relaxed); |
242 | threadsToJoin_.store(0, std::memory_order_relaxed); |
243 | } |
244 | joinStoppedThreads(n); |
245 | CHECK_EQ(0, threadList_.get().size()); |
246 | CHECK_EQ(0, stoppedThreads_.size()); |
247 | } |
248 | |
249 | void ThreadPoolExecutor::withAll(FunctionRef<void(ThreadPoolExecutor&)> f) { |
250 | getSyncVecThreadPoolExecutors().withRLock([f](auto& tpes) { |
251 | for (auto tpe : tpes) { |
252 | f(*tpe); |
253 | } |
254 | }); |
255 | } |
256 | |
257 | ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() { |
258 | const auto now = std::chrono::steady_clock::now(); |
259 | SharedMutex::ReadHolder r{&threadListLock_}; |
260 | ThreadPoolExecutor::PoolStats stats; |
261 | size_t activeTasks = 0; |
262 | size_t idleAlive = 0; |
263 | for (auto thread : threadList_.get()) { |
264 | if (thread->idle) { |
265 | const std::chrono::nanoseconds idleTime = now - thread->lastActiveTime; |
266 | stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime); |
267 | idleAlive++; |
268 | } else { |
269 | activeTasks++; |
270 | } |
271 | } |
272 | stats.pendingTaskCount = getPendingTaskCountImpl(); |
273 | stats.totalTaskCount = stats.pendingTaskCount + activeTasks; |
274 | |
275 | stats.threadCount = maxThreads_.load(std::memory_order_relaxed); |
276 | stats.activeThreadCount = |
277 | activeThreads_.load(std::memory_order_relaxed) - idleAlive; |
278 | stats.idleThreadCount = stats.threadCount - stats.activeThreadCount; |
279 | return stats; |
280 | } |
281 | |
282 | size_t ThreadPoolExecutor::getPendingTaskCount() { |
283 | SharedMutex::ReadHolder r{&threadListLock_}; |
284 | return getPendingTaskCountImpl(); |
285 | } |
286 | |
287 | std::string ThreadPoolExecutor::getName() { |
288 | auto ntf = dynamic_cast<NamedThreadFactory*>(threadFactory_.get()); |
289 | if (ntf == nullptr) { |
290 | return folly::demangle(typeid(*this).name()).toStdString(); |
291 | } |
292 | |
293 | return ntf->getNamePrefix(); |
294 | } |
295 | |
296 | std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0); |
297 | |
298 | void ThreadPoolExecutor::subscribeToTaskStats(TaskStatsCallback cb) { |
299 | if (*taskStatsCallbacks_->inCallback) { |
300 | throw std::runtime_error("cannot subscribe in task stats callback" ); |
301 | } |
302 | taskStatsCallbacks_->callbackList.wlock()->push_back(std::move(cb)); |
303 | } |
304 | |
305 | BlockingQueueAddResult ThreadPoolExecutor::StoppedThreadQueue::add( |
306 | ThreadPoolExecutor::ThreadPtr item) { |
307 | std::lock_guard<std::mutex> guard(mutex_); |
308 | queue_.push(std::move(item)); |
309 | return sem_.post(); |
310 | } |
311 | |
312 | ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() { |
313 | while (true) { |
314 | { |
315 | std::lock_guard<std::mutex> guard(mutex_); |
316 | if (queue_.size() > 0) { |
317 | auto item = std::move(queue_.front()); |
318 | queue_.pop(); |
319 | return item; |
320 | } |
321 | } |
322 | sem_.wait(); |
323 | } |
324 | } |
325 | |
326 | folly::Optional<ThreadPoolExecutor::ThreadPtr> |
327 | ThreadPoolExecutor::StoppedThreadQueue::try_take_for( |
328 | std::chrono::milliseconds time) { |
329 | while (true) { |
330 | { |
331 | std::lock_guard<std::mutex> guard(mutex_); |
332 | if (queue_.size() > 0) { |
333 | auto item = std::move(queue_.front()); |
334 | queue_.pop(); |
335 | return item; |
336 | } |
337 | } |
338 | if (!sem_.try_wait_for(time)) { |
339 | return folly::none; |
340 | } |
341 | } |
342 | } |
343 | |
344 | size_t ThreadPoolExecutor::StoppedThreadQueue::size() { |
345 | std::lock_guard<std::mutex> guard(mutex_); |
346 | return queue_.size(); |
347 | } |
348 | |
349 | void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) { |
350 | { |
351 | SharedMutex::WriteHolder r{&threadListLock_}; |
352 | observers_.push_back(o); |
353 | for (auto& thread : threadList_.get()) { |
354 | o->threadPreviouslyStarted(thread.get()); |
355 | } |
356 | } |
357 | while (activeThreads_.load(std::memory_order_relaxed) < |
358 | maxThreads_.load(std::memory_order_relaxed)) { |
359 | ensureActiveThreads(); |
360 | } |
361 | } |
362 | |
363 | void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) { |
364 | SharedMutex::WriteHolder r{&threadListLock_}; |
365 | for (auto& thread : threadList_.get()) { |
366 | o->threadNotYetStopped(thread.get()); |
367 | } |
368 | |
369 | for (auto it = observers_.begin(); it != observers_.end(); it++) { |
370 | if (*it == o) { |
371 | observers_.erase(it); |
372 | return; |
373 | } |
374 | } |
375 | DCHECK(false); |
376 | } |
377 | |
378 | // Idle threads may have destroyed themselves, attempt to join |
379 | // them here |
380 | void ThreadPoolExecutor::ensureJoined() { |
381 | auto tojoin = threadsToJoin_.load(std::memory_order_relaxed); |
382 | if (tojoin) { |
383 | { |
384 | SharedMutex::WriteHolder w{&threadListLock_}; |
385 | tojoin = threadsToJoin_.load(std::memory_order_relaxed); |
386 | threadsToJoin_.store(0, std::memory_order_relaxed); |
387 | } |
388 | joinStoppedThreads(tojoin); |
389 | } |
390 | } |
391 | |
392 | // threadListLock_ must be write locked. |
393 | bool ThreadPoolExecutor::tryTimeoutThread() { |
394 | // Try to stop based on idle thread timeout (try_take_for), |
395 | // if there are at least minThreads running. |
396 | if (!minActive()) { |
397 | return false; |
398 | } |
399 | |
400 | // Remove thread from active count |
401 | activeThreads_.store( |
402 | activeThreads_.load(std::memory_order_relaxed) - 1, |
403 | std::memory_order_relaxed); |
404 | |
405 | // There is a memory ordering constraint w.r.t the queue |
406 | // implementation's add() and getPendingTaskCountImpl() - while many |
407 | // queues have seq_cst ordering, some do not, so add an explicit |
408 | // barrier. tryTimeoutThread is the slow path and only happens once |
409 | // every thread timeout; use asymmetric barrier to keep add() fast. |
410 | asymmetricHeavyBarrier(); |
411 | |
412 | // If this is based on idle thread timeout, then |
413 | // adjust vars appropriately (otherwise stop() or join() |
414 | // does this). |
415 | if (getPendingTaskCountImpl() > 0) { |
416 | // There are still pending tasks, we can't stop yet. |
417 | // re-up active threads and return. |
418 | activeThreads_.store( |
419 | activeThreads_.load(std::memory_order_relaxed) + 1, |
420 | std::memory_order_relaxed); |
421 | return false; |
422 | } |
423 | |
424 | threadsToJoin_.store( |
425 | threadsToJoin_.load(std::memory_order_relaxed) + 1, |
426 | std::memory_order_relaxed); |
427 | |
428 | return true; |
429 | } |
430 | |
431 | // If we can't ensure that we were able to hand off a task to a thread, |
432 | // attempt to start a thread that handled the task, if we aren't already |
433 | // running the maximum number of threads. |
434 | void ThreadPoolExecutor::ensureActiveThreads() { |
435 | ensureJoined(); |
436 | |
437 | // Matches barrier in tryTimeoutThread(). Ensure task added |
438 | // is seen before loading activeThreads_ below. |
439 | asymmetricLightBarrier(); |
440 | |
441 | // Fast path assuming we are already at max threads. |
442 | auto active = activeThreads_.load(std::memory_order_relaxed); |
443 | auto total = maxThreads_.load(std::memory_order_relaxed); |
444 | |
445 | if (active >= total) { |
446 | return; |
447 | } |
448 | |
449 | SharedMutex::WriteHolder w{&threadListLock_}; |
450 | // Double check behind lock. |
451 | active = activeThreads_.load(std::memory_order_relaxed); |
452 | total = maxThreads_.load(std::memory_order_relaxed); |
453 | if (active >= total) { |
454 | return; |
455 | } |
456 | ThreadPoolExecutor::addThreads(1); |
457 | activeThreads_.store(active + 1, std::memory_order_relaxed); |
458 | } |
459 | |
460 | // If an idle thread times out, only join it if there are at least |
461 | // minThreads threads. |
462 | bool ThreadPoolExecutor::minActive() { |
463 | return activeThreads_.load(std::memory_order_relaxed) > |
464 | minThreads_.load(std::memory_order_relaxed); |
465 | } |
466 | |
467 | } // namespace folly |
468 | |