// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

#include "vm/thread_pool.h"

#include "vm/dart.h"
#include "vm/flags.h"
#include "vm/lockers.h"

namespace dart {

DEFINE_FLAG(int,
            worker_timeout_millis,
            5000,
            "Free workers when they have been idle for this amount of time.");

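// Computes how long an idle worker should sleep before timing out, given the
// time at which it became idle. Returns 0 when FLAG_worker_timeout_millis is
// non-positive, which the wait below treats as "no timeout"; otherwise
// returns the number of microseconds left until the idle timeout expires
// (at least 1).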
static int64_t ComputeTimeout(int64_t idle_start) {
  int64_t worker_timeout_micros =
      FLAG_worker_timeout_millis * kMicrosecondsPerMillisecond;
  if (worker_timeout_micros <= 0) {
    // No timeout.
    return 0;
  } else {
    int64_t waited = OS::GetCurrentMonotonicMicros() - idle_start;
    if (waited >= worker_timeout_micros) {
      // We must have gotten a spurious wakeup just before we timed
      // out. Give the worker one last desperate chance to live. We
      // are merciful.
      return 1;
    } else {
      return worker_timeout_micros - waited;
    }
  }
}

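// A |max_pool_size| of zero is treated as "no limit" throughout this file;
// every size check below is guarded by |max_pool_size_ > 0|.
//
// A rough usage sketch (this assumes the templated Run<T>() helper declared
// in thread_pool.h and a hypothetical Task subclass MyTask):
//
//   ThreadPool pool(/*max_pool_size=*/4);
//   pool.Run<MyTask>(args);  // Enqueue work; may spawn a worker thread.
//   pool.Shutdown();         // Drain remaining work and join all workers.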
ThreadPool::ThreadPool(uintptr_t max_pool_size)
    : all_workers_dead_(false), max_pool_size_(max_pool_size) {}

ThreadPool::~ThreadPool() {
  Shutdown();
}

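// Shuts the pool down in phases: mark the pool as shutting down so that no
// new tasks can be scheduled, wake all workers so they drain the remaining
// work and transition to dead, wait on |exit_monitor_| until the last worker
// has died, and finally join the dead workers' OS threads.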
void ThreadPool::Shutdown() {
  {
    MonitorLocker ml(&pool_monitor_);

    // Prevent scheduling of new tasks.
    shutting_down_ = true;

    if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
      // All workers have already died.
      all_workers_dead_ = true;
    } else {
      // Tell workers to drain remaining work and then shut down.
      ml.NotifyAll();
    }
  }

  // Wait until all workers are dead. Any new death will notify the exit
  // monitor.
  {
    MonitorLocker eml(&exit_monitor_);
    while (!all_workers_dead_) {
      eml.Wait();
    }
  }
  ASSERT(count_idle_ == 0);
  ASSERT(count_running_ == 0);
  ASSERT(idle_workers_.IsEmpty());
  ASSERT(running_workers_.IsEmpty());

  WorkerList dead_workers_to_join;
  {
    MonitorLocker ml(&pool_monitor_);
    ObtainDeadWorkersLocked(&dead_workers_to_join);
  }
  JoinDeadWorkersLocked(&dead_workers_to_join);

  ASSERT(count_dead_ == 0);
  ASSERT(dead_workers_.IsEmpty());
}

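// Returns false if the pool is already shutting down; otherwise enqueues the
// task. If ScheduleTaskLocked decided to grow the pool, the new worker's OS
// thread is started here, after |pool_monitor_| has been released.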
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
  Worker* new_worker = nullptr;
  {
    MonitorLocker ml(&pool_monitor_);
    if (shutting_down_) {
      return false;
    }
    new_worker = ScheduleTaskLocked(&ml, std::move(task));
  }
  if (new_worker != nullptr) {
    new_worker->StartThread();
  }
  return true;
}

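// Returns true iff the calling thread is a worker owned by this particular
// pool (as opposed to a worker of some other pool, or a non-pool thread).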
bool ThreadPool::CurrentThreadIsWorker() {
  auto worker =
      static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
  return worker != nullptr && worker->pool_ == this;
}

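// Called when the current worker is about to block (for example while
// waiting on something outside the pool) and can therefore not make progress
// on tasks. While the pool is bounded, the limit is raised by one so the
// blocked worker no longer counts against it, and a replacement worker is
// spawned right away if tasks are already pending and no idle worker is
// available.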
void ThreadPool::MarkCurrentWorkerAsBlocked() {
  auto worker =
      static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
  Worker* new_worker = nullptr;
  if (worker != nullptr) {
    MonitorLocker ml(&pool_monitor_);
    ASSERT(!worker->is_blocked_);
    worker->is_blocked_ = true;
    if (max_pool_size_ > 0) {
      ++max_pool_size_;
      // This thread is blocked and therefore no longer usable as a worker.
      // If we have pending tasks and there are no idle workers, we will
      // spawn a new thread (temporarily allow exceeding the maximum pool
      // size) to handle the pending tasks.
      if (idle_workers_.IsEmpty() && pending_tasks_ > 0) {
        new_worker = new Worker(this);
        idle_workers_.Append(new_worker);
        count_idle_++;
      }
    }
  }
  if (new_worker != nullptr) {
    new_worker->StartThread();
  }
}

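// Undoes MarkCurrentWorkerAsBlocked: once the worker can make progress
// again, the temporarily raised pool limit is lowered back by one.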
void ThreadPool::MarkCurrentWorkerAsUnBlocked() {
  auto worker =
      static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
  if (worker != nullptr) {
    MonitorLocker ml(&pool_monitor_);
    if (worker->is_blocked_) {
      worker->is_blocked_ = false;
      if (max_pool_size_ > 0) {
        --max_pool_size_;
        ASSERT(max_pool_size_ > 0);
      }
    }
  }
}

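// The main loop of every worker thread. A worker repeatedly drains the task
// queue (releasing |pool_monitor_| around each task->Run()), returns to the
// idle list, and then sleeps until it is woken by a new task, times out, or
// the pool shuts down. On timeout or shutdown it moves itself to the dead
// list and leaves the loop; its OS thread is later joined by the next dying
// worker or by Shutdown().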
void ThreadPool::WorkerLoop(Worker* worker) {
  WorkerList dead_workers_to_join;

  while (true) {
    MonitorLocker ml(&pool_monitor_);

    if (!tasks_.IsEmpty()) {
      IdleToRunningLocked(worker);
      while (!tasks_.IsEmpty()) {
        std::unique_ptr<Task> task(tasks_.RemoveFirst());
        pending_tasks_--;
        MonitorLeaveScope mls(&ml);
        task->Run();
        ASSERT(Isolate::Current() == nullptr);
        task.reset();
      }
      RunningToIdleLocked(worker);
    }

    if (running_workers_.IsEmpty()) {
      ASSERT(tasks_.IsEmpty());
      OnEnterIdleLocked(&ml);
      if (!tasks_.IsEmpty()) {
        continue;
      }
    }

    if (shutting_down_) {
      ObtainDeadWorkersLocked(&dead_workers_to_join);
      IdleToDeadLocked(worker);
      break;
    }

    // Sleep until we get a new task, we time out, or we're shut down.
    const int64_t idle_start = OS::GetCurrentMonotonicMicros();
    bool done = false;
    while (!done) {
      const auto result = ml.WaitMicros(ComputeTimeout(idle_start));

      // We have to drain all pending tasks.
      if (!tasks_.IsEmpty()) break;

      if (shutting_down_ || result == Monitor::kTimedOut) {
        done = true;
        break;
      }
    }
    if (done) {
      ObtainDeadWorkersLocked(&dead_workers_to_join);
      IdleToDeadLocked(worker);
      break;
    }
  }

  // Before we transitioned to dead we obtained the list of previously dead
  // workers, which we join here. Since every dying worker joins the workers
  // that died before it, the number of pending, non-joined [dead_workers_]
  // stays effectively at one.
  JoinDeadWorkersLocked(&dead_workers_to_join);
}

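// The three transitions below keep the |idle_workers_|, |running_workers_|,
// and |dead_workers_| lists and their counters in sync. All of them expect
// |pool_monitor_| to be held by the caller.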
void ThreadPool::IdleToRunningLocked(Worker* worker) {
  ASSERT(idle_workers_.ContainsForDebugging(worker));
  idle_workers_.Remove(worker);
  running_workers_.Append(worker);
  count_idle_--;
  count_running_++;
}

void ThreadPool::RunningToIdleLocked(Worker* worker) {
  ASSERT(tasks_.IsEmpty());

  ASSERT(running_workers_.ContainsForDebugging(worker));
  running_workers_.Remove(worker);
  idle_workers_.Append(worker);
  count_running_--;
  count_idle_++;
}

void ThreadPool::IdleToDeadLocked(Worker* worker) {
  ASSERT(tasks_.IsEmpty());

  ASSERT(idle_workers_.ContainsForDebugging(worker));
  idle_workers_.Remove(worker);
  dead_workers_.Append(worker);
  count_idle_--;
  count_dead_++;

  // Notify shutdown thread that the worker thread is about to finish.
  if (shutting_down_) {
    if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
      all_workers_dead_ = true;
      MonitorLocker eml(&exit_monitor_);
      eml.Notify();
    }
  }
}

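// Moves every worker that has died so far from |dead_workers_| into the
// caller-provided list, so they can be joined after |pool_monitor_| has
// been released.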
void ThreadPool::ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join) {
  dead_workers_to_join->AppendList(&dead_workers_);
  ASSERT(dead_workers_.IsEmpty());
  count_dead_ = 0;
}

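// Joins and deletes the given dead workers. This only touches the caller's
// local list, so it does not require |pool_monitor_| to be held (and neither
// call site holds it).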
void ThreadPool::JoinDeadWorkersLocked(WorkerList* dead_workers_to_join) {
  auto it = dead_workers_to_join->begin();
  while (it != dead_workers_to_join->end()) {
    Worker* worker = *it;
    it = dead_workers_to_join->Erase(it);

    OSThread::Join(worker->join_id_);
    delete worker;
  }
  ASSERT(dead_workers_to_join->IsEmpty());
}

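// Enqueues |task| (with |pool_monitor_| held) and decides how it will be
// executed: wake an idle worker if enough of them exist, leave the task
// queued if the pool is already at its maximum size, or create a new idle
// worker. A newly created worker is returned so that the caller can start
// its OS thread after releasing the lock.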
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
                                                   std::unique_ptr<Task> task) {
  // Enqueue the new task.
  tasks_.Append(task.release());
  pending_tasks_++;
  ASSERT(pending_tasks_ >= 1);

  // Notify existing idle worker (if available).
  if (count_idle_ >= pending_tasks_) {
    ASSERT(!idle_workers_.IsEmpty());
    ml->Notify();
    return nullptr;
  }

  // If we have maxed out the number of threads running, we will not start a
  // new one.
  if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
    if (!idle_workers_.IsEmpty()) {
      ml->Notify();
    }
    return nullptr;
  }

  // Otherwise start a new worker.
  auto new_worker = new Worker(this);
  idle_workers_.Append(new_worker);
  count_idle_++;
  return new_worker;
}

ThreadPool::Worker::Worker(ThreadPool* pool)
    : pool_(pool), join_id_(OSThread::kInvalidThreadJoinId) {}

void ThreadPool::Worker::StartThread() {
  int result = OSThread::Start("DartWorker", &Worker::Main,
                               reinterpret_cast<uword>(this));
  if (result != 0) {
    FATAL1("Could not start worker thread: result = %d.", result);
  }
}

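// Entry point of a worker's OS thread. Links the OSThread and the Worker to
// each other, records the join id that a later OSThread::Join() will use,
// runs the worker loop, and finally invokes the embedder's thread-exit
// callback, if one was registered.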
void ThreadPool::Worker::Main(uword args) {
  OSThread* os_thread = OSThread::Current();
  ASSERT(os_thread != nullptr);

  Worker* worker = reinterpret_cast<Worker*>(args);
  ThreadPool* pool = worker->pool_;

  os_thread->owning_thread_pool_worker_ = worker;
  worker->os_thread_ = os_thread;

  // Once the worker quits, it needs to be joined.
  worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);

#if defined(DEBUG)
  {
    MonitorLocker ml(&pool->pool_monitor_);
    ASSERT(pool->idle_workers_.ContainsForDebugging(worker));
  }
#endif

  pool->WorkerLoop(worker);

  worker->os_thread_ = nullptr;
  os_thread->owning_thread_pool_worker_ = nullptr;

  // Call the thread exit hook here to notify the embedder that the
  // thread pool thread is exiting.
  if (Dart::thread_exit_callback() != nullptr) {
    (*Dart::thread_exit_callback())();
  }
}

}  // namespace dart