1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/executors/CPUThreadPoolExecutor.h>
18
19#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
20#include <folly/portability/GFlags.h>
21
// Command-line flag read by the (numThreads, taskQueue, threadFactory)
// constructor in this file: when true, that constructor passes 0 instead of
// numThreads as the second ThreadPoolExecutor argument (presumably the minimum
// thread count -- confirm against ThreadPoolExecutor). Defaults to true.
DEFINE_bool(
    dynamic_cputhreadpoolexecutor,
    true,
    "CPUThreadPoolExecutor will dynamically create and destroy threads");
26
27namespace folly {
28
// Queue size (1 << 14 == 16384) handed to the task queues that the
// constructors in this file build when the caller supplies neither a queue
// nor an explicit maximum size.
const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
30
31CPUThreadPoolExecutor::CPUThreadPoolExecutor(
32 size_t numThreads,
33 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
34 std::shared_ptr<ThreadFactory> threadFactory)
35 : ThreadPoolExecutor(
36 numThreads,
37 FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads,
38 std::move(threadFactory)),
39 taskQueue_(std::move(taskQueue)) {
40 setNumThreads(numThreads);
41}
42
43CPUThreadPoolExecutor::CPUThreadPoolExecutor(
44 std::pair<size_t, size_t> numThreads,
45 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
46 std::shared_ptr<ThreadFactory> threadFactory)
47 : ThreadPoolExecutor(
48 numThreads.first,
49 numThreads.second,
50 std::move(threadFactory)),
51 taskQueue_(std::move(taskQueue)) {
52 setNumThreads(numThreads.first);
53}
54
55CPUThreadPoolExecutor::CPUThreadPoolExecutor(
56 size_t numThreads,
57 std::shared_ptr<ThreadFactory> threadFactory)
58 : CPUThreadPoolExecutor(
59 numThreads,
60 std::make_unique<LifoSemMPMCQueue<CPUTask>>(
61 CPUThreadPoolExecutor::kDefaultMaxQueueSize),
62 std::move(threadFactory)) {}
63
64CPUThreadPoolExecutor::CPUThreadPoolExecutor(
65 std::pair<size_t, size_t> numThreads,
66 std::shared_ptr<ThreadFactory> threadFactory)
67 : CPUThreadPoolExecutor(
68 numThreads,
69 std::make_unique<LifoSemMPMCQueue<CPUTask>>(
70 CPUThreadPoolExecutor::kDefaultMaxQueueSize),
71 std::move(threadFactory)) {}
72
73CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
74 : CPUThreadPoolExecutor(
75 numThreads,
76 std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
77
78CPUThreadPoolExecutor::CPUThreadPoolExecutor(
79 size_t numThreads,
80 int8_t numPriorities,
81 std::shared_ptr<ThreadFactory> threadFactory)
82 : CPUThreadPoolExecutor(
83 numThreads,
84 std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
85 numPriorities,
86 CPUThreadPoolExecutor::kDefaultMaxQueueSize),
87 std::move(threadFactory)) {}
88
89CPUThreadPoolExecutor::CPUThreadPoolExecutor(
90 size_t numThreads,
91 int8_t numPriorities,
92 size_t maxQueueSize,
93 std::shared_ptr<ThreadFactory> threadFactory)
94 : CPUThreadPoolExecutor(
95 numThreads,
96 std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
97 numPriorities,
98 maxQueueSize),
99 std::move(threadFactory)) {}
100
101CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
102 stop();
103 CHECK(threadsToStop_ == 0);
104}
105
106void CPUThreadPoolExecutor::add(Func func) {
107 add(std::move(func), std::chrono::milliseconds(0));
108}
109
110void CPUThreadPoolExecutor::add(
111 Func func,
112 std::chrono::milliseconds expiration,
113 Func expireCallback) {
114 auto result = taskQueue_->add(
115 CPUTask(std::move(func), expiration, std::move(expireCallback)));
116 if (!result.reusedThread) {
117 ensureActiveThreads();
118 }
119}
120
121void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
122 add(std::move(func), priority, std::chrono::milliseconds(0));
123}
124
125void CPUThreadPoolExecutor::add(
126 Func func,
127 int8_t priority,
128 std::chrono::milliseconds expiration,
129 Func expireCallback) {
130 CHECK(getNumPriorities() > 0);
131 auto result = taskQueue_->addWithPriority(
132 CPUTask(std::move(func), expiration, std::move(expireCallback)),
133 priority);
134 if (!result.reusedThread) {
135 ensureActiveThreads();
136 }
137}
138
139uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
140 return taskQueue_->getNumPriorities();
141}
142
143size_t CPUThreadPoolExecutor::getTaskQueueSize() const {
144 return taskQueue_->size();
145}
146
147BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
148CPUThreadPoolExecutor::getTaskQueue() {
149 return taskQueue_.get();
150}
151
152// threadListLock_ must be writelocked.
153bool CPUThreadPoolExecutor::tryDecrToStop() {
154 auto toStop = threadsToStop_.load(std::memory_order_relaxed);
155 if (toStop <= 0) {
156 return false;
157 }
158 threadsToStop_.store(toStop - 1, std::memory_order_relaxed);
159 return true;
160}
161
162bool CPUThreadPoolExecutor::taskShouldStop(folly::Optional<CPUTask>& task) {
163 if (tryDecrToStop()) {
164 return true;
165 }
166 if (task) {
167 return false;
168 } else {
169 return tryTimeoutThread();
170 }
171 return true;
172}
173
174void CPUThreadPoolExecutor::threadRun(ThreadPtr thread) {
175 this->threadPoolHook_.registerThread();
176
177 thread->startupBaton.post();
178 while (true) {
179 auto task = taskQueue_->try_take_for(threadTimeout_);
180 // Handle thread stopping, either by task timeout, or
181 // by 'poison' task added in join() or stop().
182 if (UNLIKELY(!task || task.value().poison)) {
183 // Actually remove the thread from the list.
184 SharedMutex::WriteHolder w{&threadListLock_};
185 if (taskShouldStop(task)) {
186 for (auto& o : observers_) {
187 o->threadStopped(thread.get());
188 }
189 threadList_.remove(thread);
190 stoppedThreads_.add(thread);
191 return;
192 } else {
193 continue;
194 }
195 }
196
197 runTask(thread, std::move(task.value()));
198
199 if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
200 SharedMutex::WriteHolder w{&threadListLock_};
201 if (tryDecrToStop()) {
202 threadList_.remove(thread);
203 stoppedThreads_.add(thread);
204 return;
205 }
206 }
207 }
208}
209
210void CPUThreadPoolExecutor::stopThreads(size_t n) {
211 threadsToStop_ += n;
212 for (size_t i = 0; i < n; i++) {
213 taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
214 }
215}
216
217// threadListLock_ is read (or write) locked.
218size_t CPUThreadPoolExecutor::getPendingTaskCountImpl() {
219 return taskQueue_->size();
220}
221
222} // namespace folly
223