1// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "vm/thread_pool.h"
6
7#include "vm/dart.h"
8#include "vm/flags.h"
9#include "vm/lockers.h"
10
11namespace dart {
12
13DEFINE_FLAG(int,
14 worker_timeout_millis,
15 5000,
16 "Free workers when they have been idle for this amount of time.");
17
18static int64_t ComputeTimeout(int64_t idle_start) {
19 int64_t worker_timeout_micros =
20 FLAG_worker_timeout_millis * kMicrosecondsPerMillisecond;
21 if (worker_timeout_micros <= 0) {
22 // No timeout.
23 return 0;
24 } else {
25 int64_t waited = OS::GetCurrentMonotonicMicros() - idle_start;
26 if (waited >= worker_timeout_micros) {
27 // We must have gotten a spurious wakeup just before we timed
28 // out. Give the worker one last desperate chance to live. We
29 // are merciful.
30 return 1;
31 } else {
32 return worker_timeout_micros - waited;
33 }
34 }
35}
36
37ThreadPool::ThreadPool(uintptr_t max_pool_size)
38 : all_workers_dead_(false), max_pool_size_(max_pool_size) {}
39
40ThreadPool::~ThreadPool() {
41 Shutdown();
42}
43
44void ThreadPool::Shutdown() {
45 {
46 MonitorLocker ml(&pool_monitor_);
47
48 // Prevent scheduling of new tasks.
49 shutting_down_ = true;
50
51 if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
52 // All workers have already died.
53 all_workers_dead_ = true;
54 } else {
55 // Tell workers to drain remaining work and then shut down.
56 ml.NotifyAll();
57 }
58 }
59
60 // Wait until all workers are dead. Any new death will notify the exit
61 // monitor.
62 {
63 MonitorLocker eml(&exit_monitor_);
64 while (!all_workers_dead_) {
65 eml.Wait();
66 }
67 }
68 ASSERT(count_idle_ == 0);
69 ASSERT(count_running_ == 0);
70 ASSERT(idle_workers_.IsEmpty());
71 ASSERT(running_workers_.IsEmpty());
72
73 WorkerList dead_workers_to_join;
74 {
75 MonitorLocker ml(&pool_monitor_);
76 ObtainDeadWorkersLocked(&dead_workers_to_join);
77 }
78 JoinDeadWorkersLocked(&dead_workers_to_join);
79
80 ASSERT(count_dead_ == 0);
81 ASSERT(dead_workers_.IsEmpty());
82}
83
84bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
85 Worker* new_worker = nullptr;
86 {
87 MonitorLocker ml(&pool_monitor_);
88 if (shutting_down_) {
89 return false;
90 }
91 new_worker = ScheduleTaskLocked(&ml, std::move(task));
92 }
93 if (new_worker != nullptr) {
94 new_worker->StartThread();
95 }
96 return true;
97}
98
99bool ThreadPool::CurrentThreadIsWorker() {
100 auto worker =
101 static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
102 return worker != nullptr && worker->pool_ == this;
103}
104
105void ThreadPool::MarkCurrentWorkerAsBlocked() {
106 auto worker =
107 static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
108 Worker* new_worker = nullptr;
109 if (worker != nullptr) {
110 MonitorLocker ml(&pool_monitor_);
111 ASSERT(!worker->is_blocked_);
112 worker->is_blocked_ = true;
113 if (max_pool_size_ > 0) {
114 ++max_pool_size_;
115 // This thread is blocked and therefore no longer usable as a worker.
116 // If we have pending tasks and there are no idle workers, we will spawn a
117 // new thread (temporarily allow exceeding the maximum pool size) to
118 // handle the pending tasks.
119 if (idle_workers_.IsEmpty() && pending_tasks_ > 0) {
120 new_worker = new Worker(this);
121 idle_workers_.Append(new_worker);
122 count_idle_++;
123 }
124 }
125 }
126 if (new_worker != nullptr) {
127 new_worker->StartThread();
128 }
129}
130
131void ThreadPool::MarkCurrentWorkerAsUnBlocked() {
132 auto worker =
133 static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
134 if (worker != nullptr) {
135 MonitorLocker ml(&pool_monitor_);
136 if (worker->is_blocked_) {
137 worker->is_blocked_ = false;
138 if (max_pool_size_ > 0) {
139 --max_pool_size_;
140 ASSERT(max_pool_size_ > 0);
141 }
142 }
143 }
144}
145
146void ThreadPool::WorkerLoop(Worker* worker) {
147 WorkerList dead_workers_to_join;
148
149 while (true) {
150 MonitorLocker ml(&pool_monitor_);
151
152 if (!tasks_.IsEmpty()) {
153 IdleToRunningLocked(worker);
154 while (!tasks_.IsEmpty()) {
155 std::unique_ptr<Task> task(tasks_.RemoveFirst());
156 pending_tasks_--;
157 MonitorLeaveScope mls(&ml);
158 task->Run();
159 ASSERT(Isolate::Current() == nullptr);
160 task.reset();
161 }
162 RunningToIdleLocked(worker);
163 }
164
165 if (running_workers_.IsEmpty()) {
166 ASSERT(tasks_.IsEmpty());
167 OnEnterIdleLocked(&ml);
168 if (!tasks_.IsEmpty()) {
169 continue;
170 }
171 }
172
173 if (shutting_down_) {
174 ObtainDeadWorkersLocked(&dead_workers_to_join);
175 IdleToDeadLocked(worker);
176 break;
177 }
178
179 // Sleep until we get a new task, we time out or we're shutdown.
180 const int64_t idle_start = OS::GetCurrentMonotonicMicros();
181 bool done = false;
182 while (!done) {
183 const auto result = ml.WaitMicros(ComputeTimeout(idle_start));
184
185 // We have to drain all pending tasks.
186 if (!tasks_.IsEmpty()) break;
187
188 if (shutting_down_ || result == Monitor::kTimedOut) {
189 done = true;
190 break;
191 }
192 }
193 if (done) {
194 ObtainDeadWorkersLocked(&dead_workers_to_join);
195 IdleToDeadLocked(worker);
196 break;
197 }
198 }
199
200 // Before we transitioned to dead we obtained the list of previously died dead
201 // workers, which we join here. Since every death of a worker will join
202 // previously died workers, we keep the pending non-joined [dead_workers_] to
203 // effectively 1.
204 JoinDeadWorkersLocked(&dead_workers_to_join);
205}
206
207void ThreadPool::IdleToRunningLocked(Worker* worker) {
208 ASSERT(idle_workers_.ContainsForDebugging(worker));
209 idle_workers_.Remove(worker);
210 running_workers_.Append(worker);
211 count_idle_--;
212 count_running_++;
213}
214
215void ThreadPool::RunningToIdleLocked(Worker* worker) {
216 ASSERT(tasks_.IsEmpty());
217
218 ASSERT(running_workers_.ContainsForDebugging(worker));
219 running_workers_.Remove(worker);
220 idle_workers_.Append(worker);
221 count_running_--;
222 count_idle_++;
223}
224
225void ThreadPool::IdleToDeadLocked(Worker* worker) {
226 ASSERT(tasks_.IsEmpty());
227
228 ASSERT(idle_workers_.ContainsForDebugging(worker));
229 idle_workers_.Remove(worker);
230 dead_workers_.Append(worker);
231 count_idle_--;
232 count_dead_++;
233
234 // Notify shutdown thread that the worker thread is about to finish.
235 if (shutting_down_) {
236 if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
237 all_workers_dead_ = true;
238 MonitorLocker eml(&exit_monitor_);
239 eml.Notify();
240 }
241 }
242}
243
244void ThreadPool::ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join) {
245 dead_workers_to_join->AppendList(&dead_workers_);
246 ASSERT(dead_workers_.IsEmpty());
247 count_dead_ = 0;
248}
249
250void ThreadPool::JoinDeadWorkersLocked(WorkerList* dead_workers_to_join) {
251 auto it = dead_workers_to_join->begin();
252 while (it != dead_workers_to_join->end()) {
253 Worker* worker = *it;
254 it = dead_workers_to_join->Erase(it);
255
256 OSThread::Join(worker->join_id_);
257 delete worker;
258 }
259 ASSERT(dead_workers_to_join->IsEmpty());
260}
261
262ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
263 std::unique_ptr<Task> task) {
264 // Enqueue the new task.
265 tasks_.Append(task.release());
266 pending_tasks_++;
267 ASSERT(pending_tasks_ >= 1);
268
269 // Notify existing idle worker (if available).
270 if (count_idle_ >= pending_tasks_) {
271 ASSERT(!idle_workers_.IsEmpty());
272 ml->Notify();
273 return nullptr;
274 }
275
276 // If we have maxed out the number of threads running, we will not start a
277 // new one.
278 if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
279 if (!idle_workers_.IsEmpty()) {
280 ml->Notify();
281 }
282 return nullptr;
283 }
284
285 // Otherwise start a new worker.
286 auto new_worker = new Worker(this);
287 idle_workers_.Append(new_worker);
288 count_idle_++;
289 return new_worker;
290}
291
292ThreadPool::Worker::Worker(ThreadPool* pool)
293 : pool_(pool), join_id_(OSThread::kInvalidThreadJoinId) {}
294
295void ThreadPool::Worker::StartThread() {
296 int result = OSThread::Start("DartWorker", &Worker::Main,
297 reinterpret_cast<uword>(this));
298 if (result != 0) {
299 FATAL1("Could not start worker thread: result = %d.", result);
300 }
301}
302
303void ThreadPool::Worker::Main(uword args) {
304 OSThread* os_thread = OSThread::Current();
305 ASSERT(os_thread != nullptr);
306
307 Worker* worker = reinterpret_cast<Worker*>(args);
308 ThreadPool* pool = worker->pool_;
309
310 os_thread->owning_thread_pool_worker_ = worker;
311 worker->os_thread_ = os_thread;
312
313 // Once the worker quits it needs to be joined.
314 worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);
315
316#if defined(DEBUG)
317 {
318 MonitorLocker ml(&pool->pool_monitor_);
319 ASSERT(pool->idle_workers_.ContainsForDebugging(worker));
320 }
321#endif
322
323 pool->WorkerLoop(worker);
324
325 worker->os_thread_ = nullptr;
326 os_thread->owning_thread_pool_worker_ = nullptr;
327
328 // Call the thread exit hook here to notify the embedder that the
329 // thread pool thread is exiting.
330 if (Dart::thread_exit_callback() != NULL) {
331 (*Dart::thread_exit_callback())();
332 }
333}
334
335} // namespace dart
336