1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/executors/CPUThreadPoolExecutor.h> |
18 | |
19 | #include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h> |
20 | #include <folly/portability/GFlags.h> |
21 | |
22 | DEFINE_bool( |
23 | dynamic_cputhreadpoolexecutor, |
24 | true, |
25 | "CPUThreadPoolExecutor will dynamically create and destroy threads" ); |
26 | |
27 | namespace folly { |
28 | |
29 | const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14; |
30 | |
31 | CPUThreadPoolExecutor::CPUThreadPoolExecutor( |
32 | size_t numThreads, |
33 | std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, |
34 | std::shared_ptr<ThreadFactory> threadFactory) |
35 | : ThreadPoolExecutor( |
36 | numThreads, |
37 | FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads, |
38 | std::move(threadFactory)), |
39 | taskQueue_(std::move(taskQueue)) { |
40 | setNumThreads(numThreads); |
41 | } |
42 | |
43 | CPUThreadPoolExecutor::CPUThreadPoolExecutor( |
44 | std::pair<size_t, size_t> numThreads, |
45 | std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, |
46 | std::shared_ptr<ThreadFactory> threadFactory) |
47 | : ThreadPoolExecutor( |
48 | numThreads.first, |
49 | numThreads.second, |
50 | std::move(threadFactory)), |
51 | taskQueue_(std::move(taskQueue)) { |
52 | setNumThreads(numThreads.first); |
53 | } |
54 | |
55 | CPUThreadPoolExecutor::CPUThreadPoolExecutor( |
56 | size_t numThreads, |
57 | std::shared_ptr<ThreadFactory> threadFactory) |
58 | : CPUThreadPoolExecutor( |
59 | numThreads, |
60 | std::make_unique<LifoSemMPMCQueue<CPUTask>>( |
61 | CPUThreadPoolExecutor::kDefaultMaxQueueSize), |
62 | std::move(threadFactory)) {} |
63 | |
64 | CPUThreadPoolExecutor::CPUThreadPoolExecutor( |
65 | std::pair<size_t, size_t> numThreads, |
66 | std::shared_ptr<ThreadFactory> threadFactory) |
67 | : CPUThreadPoolExecutor( |
68 | numThreads, |
69 | std::make_unique<LifoSemMPMCQueue<CPUTask>>( |
70 | CPUThreadPoolExecutor::kDefaultMaxQueueSize), |
71 | std::move(threadFactory)) {} |
72 | |
73 | CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads) |
74 | : CPUThreadPoolExecutor( |
75 | numThreads, |
76 | std::make_shared<NamedThreadFactory>("CPUThreadPool" )) {} |
77 | |
78 | CPUThreadPoolExecutor::CPUThreadPoolExecutor( |
79 | size_t numThreads, |
80 | int8_t numPriorities, |
81 | std::shared_ptr<ThreadFactory> threadFactory) |
82 | : CPUThreadPoolExecutor( |
83 | numThreads, |
84 | std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>( |
85 | numPriorities, |
86 | CPUThreadPoolExecutor::kDefaultMaxQueueSize), |
87 | std::move(threadFactory)) {} |
88 | |
89 | CPUThreadPoolExecutor::CPUThreadPoolExecutor( |
90 | size_t numThreads, |
91 | int8_t numPriorities, |
92 | size_t maxQueueSize, |
93 | std::shared_ptr<ThreadFactory> threadFactory) |
94 | : CPUThreadPoolExecutor( |
95 | numThreads, |
96 | std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>( |
97 | numPriorities, |
98 | maxQueueSize), |
99 | std::move(threadFactory)) {} |
100 | |
101 | CPUThreadPoolExecutor::~CPUThreadPoolExecutor() { |
102 | stop(); |
103 | CHECK(threadsToStop_ == 0); |
104 | } |
105 | |
106 | void CPUThreadPoolExecutor::add(Func func) { |
107 | add(std::move(func), std::chrono::milliseconds(0)); |
108 | } |
109 | |
110 | void CPUThreadPoolExecutor::add( |
111 | Func func, |
112 | std::chrono::milliseconds expiration, |
113 | Func expireCallback) { |
114 | auto result = taskQueue_->add( |
115 | CPUTask(std::move(func), expiration, std::move(expireCallback))); |
116 | if (!result.reusedThread) { |
117 | ensureActiveThreads(); |
118 | } |
119 | } |
120 | |
121 | void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) { |
122 | add(std::move(func), priority, std::chrono::milliseconds(0)); |
123 | } |
124 | |
125 | void CPUThreadPoolExecutor::add( |
126 | Func func, |
127 | int8_t priority, |
128 | std::chrono::milliseconds expiration, |
129 | Func expireCallback) { |
130 | CHECK(getNumPriorities() > 0); |
131 | auto result = taskQueue_->addWithPriority( |
132 | CPUTask(std::move(func), expiration, std::move(expireCallback)), |
133 | priority); |
134 | if (!result.reusedThread) { |
135 | ensureActiveThreads(); |
136 | } |
137 | } |
138 | |
139 | uint8_t CPUThreadPoolExecutor::getNumPriorities() const { |
140 | return taskQueue_->getNumPriorities(); |
141 | } |
142 | |
143 | size_t CPUThreadPoolExecutor::getTaskQueueSize() const { |
144 | return taskQueue_->size(); |
145 | } |
146 | |
147 | BlockingQueue<CPUThreadPoolExecutor::CPUTask>* |
148 | CPUThreadPoolExecutor::getTaskQueue() { |
149 | return taskQueue_.get(); |
150 | } |
151 | |
152 | // threadListLock_ must be writelocked. |
153 | bool CPUThreadPoolExecutor::tryDecrToStop() { |
154 | auto toStop = threadsToStop_.load(std::memory_order_relaxed); |
155 | if (toStop <= 0) { |
156 | return false; |
157 | } |
158 | threadsToStop_.store(toStop - 1, std::memory_order_relaxed); |
159 | return true; |
160 | } |
161 | |
162 | bool CPUThreadPoolExecutor::taskShouldStop(folly::Optional<CPUTask>& task) { |
163 | if (tryDecrToStop()) { |
164 | return true; |
165 | } |
166 | if (task) { |
167 | return false; |
168 | } else { |
169 | return tryTimeoutThread(); |
170 | } |
171 | return true; |
172 | } |
173 | |
174 | void CPUThreadPoolExecutor::threadRun(ThreadPtr thread) { |
175 | this->threadPoolHook_.registerThread(); |
176 | |
177 | thread->startupBaton.post(); |
178 | while (true) { |
179 | auto task = taskQueue_->try_take_for(threadTimeout_); |
180 | // Handle thread stopping, either by task timeout, or |
181 | // by 'poison' task added in join() or stop(). |
182 | if (UNLIKELY(!task || task.value().poison)) { |
183 | // Actually remove the thread from the list. |
184 | SharedMutex::WriteHolder w{&threadListLock_}; |
185 | if (taskShouldStop(task)) { |
186 | for (auto& o : observers_) { |
187 | o->threadStopped(thread.get()); |
188 | } |
189 | threadList_.remove(thread); |
190 | stoppedThreads_.add(thread); |
191 | return; |
192 | } else { |
193 | continue; |
194 | } |
195 | } |
196 | |
197 | runTask(thread, std::move(task.value())); |
198 | |
199 | if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) { |
200 | SharedMutex::WriteHolder w{&threadListLock_}; |
201 | if (tryDecrToStop()) { |
202 | threadList_.remove(thread); |
203 | stoppedThreads_.add(thread); |
204 | return; |
205 | } |
206 | } |
207 | } |
208 | } |
209 | |
210 | void CPUThreadPoolExecutor::stopThreads(size_t n) { |
211 | threadsToStop_ += n; |
212 | for (size_t i = 0; i < n; i++) { |
213 | taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI); |
214 | } |
215 | } |
216 | |
217 | // threadListLock_ is read (or write) locked. |
218 | size_t CPUThreadPoolExecutor::getPendingTaskCountImpl() { |
219 | return taskQueue_->size(); |
220 | } |
221 | |
222 | } // namespace folly |
223 | |