1 | /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. |
2 | * |
3 | * Permission is hereby granted, free of charge, to any person obtaining a copy |
4 | * of this software and associated documentation files (the "Software"), to |
5 | * deal in the Software without restriction, including without limitation the |
6 | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
7 | * sell copies of the Software, and to permit persons to whom the Software is |
8 | * furnished to do so, subject to the following conditions: |
9 | * |
10 | * The above copyright notice and this permission notice shall be included in |
11 | * all copies or substantial portions of the Software. |
12 | * |
13 | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
14 | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
15 | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
16 | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
17 | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
18 | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
19 | * IN THE SOFTWARE. |
20 | */ |
21 | |
22 | #include "uv-common.h" |
23 | |
24 | #if !defined(_WIN32) |
25 | # include "unix/internal.h" |
26 | #endif |
27 | |
28 | #include <stdlib.h> |
29 | |
/* Upper bound on the number of worker threads; init_threads() clamps the
 * requested pool size to this value. */
#define MAX_THREADPOOL_SIZE 1024

/* Passed to uv_once() in uv__work_submit() so that init_once() runs a single
 * time; reset_once() restores it to UV_ONCE_INIT. */
static uv_once_t once = UV_ONCE_INIT;
/* Condition variable idle workers wait on; signalled by post() and worker()
 * when an entry is appended to `wq` while a thread is idle. */
static uv_cond_t cond;
/* Global threadpool mutex. Held while `wq`, `slow_io_pending_wq`,
 * `idle_threads` and `slow_io_work_running` are read or modified. */
static uv_mutex_t mutex;
/* Number of workers currently blocked in uv_cond_wait(). */
static unsigned int idle_threads;
/* Number of slow I/O work items that workers are executing right now. */
static unsigned int slow_io_work_running;
/* Number of threads in the pool; 0 until init_threads() runs and again after
 * uv__threadpool_cleanup(). */
static unsigned int nthreads;
/* Thread handles: either `default_threads` or an array obtained from
 * uv__malloc() when more threads than that are requested. */
static uv_thread_t* threads;
/* Static storage used when the pool size fits; its length is also the
 * default pool size. */
static uv_thread_t default_threads[4];
/* Sentinel queue node: when it is at the head of `wq`, workers exit. */
static QUEUE exit_message;
/* Global queue of work posted to the pool. */
static QUEUE wq;
/* Sentinel queue node placed in `wq` that tells a worker to take one item
 * from `slow_io_pending_wq`. */
static QUEUE run_slow_work_message;
/* Pending UV__WORK_SLOW_IO requests, kept apart from `wq`. */
static QUEUE slow_io_pending_wq;
44 | |
45 | static unsigned int slow_work_thread_threshold(void) { |
46 | return (nthreads + 1) / 2; |
47 | } |
48 | |
/* Marker stored in `w->work` by uv__work_cancel(); uv__work_done() compares
 * against its address to detect cancelled requests. It must never actually
 * be invoked, hence the abort(). */
static void uv__cancelled(struct uv__work* w) {
  (void) w;  /* Unused: the function only serves as a unique address. */
  abort();
}
52 | |
53 | |
/* Thread entry point for every threadpool worker.
 *
 * `arg` is the semaphore init_threads() waits on. It is posted once so the
 * creator knows this thread has started and is not touched afterwards.
 *
 * The thread then loops: it waits until `wq` has something runnable, takes
 * the head entry, runs the request's `work` callback without holding `mutex`,
 * and finally appends the request to the owning loop's `wq` and calls
 * uv_async_send() on the loop's `wq_async`. The loop terminates when
 * `exit_message` is found at the head of `wq`.
 *
 * To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;

  /* Tell init_threads() that this thread is up and running. */
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. The second condition checks that
       `run_slow_work_message` is the one and only entry in `wq`. */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = QUEUE_HEAD(&wq);
    if (q == &exit_message) {
      /* `exit_message` is deliberately left at the head of `wq`; signalling
         the condition lets another waiting worker observe it as well. */
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    QUEUE_REMOVE(q);
    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      /* Replace the sentinel with the actual slow I/O request to execute. */
      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    /* Run the work callback without holding the global mutex. */
    uv_mutex_unlock(&mutex);

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    /* Hand the finished request to its loop under the loop-local mutex and
       notify the loop through its async handle. */
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}
140 | |
141 | |
142 | static void post(QUEUE* q, enum uv__work_kind kind) { |
143 | uv_mutex_lock(&mutex); |
144 | if (kind == UV__WORK_SLOW_IO) { |
145 | /* Insert into a separate queue. */ |
146 | QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); |
147 | if (!QUEUE_EMPTY(&run_slow_work_message)) { |
148 | /* Running slow I/O tasks is already scheduled => Nothing to do here. |
149 | The worker that runs said other task will schedule this one as well. */ |
150 | uv_mutex_unlock(&mutex); |
151 | return; |
152 | } |
153 | q = &run_slow_work_message; |
154 | } |
155 | |
156 | QUEUE_INSERT_TAIL(&wq, q); |
157 | if (idle_threads > 0) |
158 | uv_cond_signal(&cond); |
159 | uv_mutex_unlock(&mutex); |
160 | } |
161 | |
162 | |
163 | void uv__threadpool_cleanup(void) { |
164 | #ifndef _WIN32 |
165 | unsigned int i; |
166 | |
167 | if (nthreads == 0) |
168 | return; |
169 | |
170 | post(&exit_message, UV__WORK_CPU); |
171 | |
172 | for (i = 0; i < nthreads; i++) |
173 | if (uv_thread_join(threads + i)) |
174 | abort(); |
175 | |
176 | if (threads != default_threads) |
177 | uv__free(threads); |
178 | |
179 | uv_mutex_destroy(&mutex); |
180 | uv_cond_destroy(&cond); |
181 | |
182 | threads = NULL; |
183 | nthreads = 0; |
184 | #endif |
185 | } |
186 | |
187 | |
188 | static void init_threads(void) { |
189 | unsigned int i; |
190 | const char* val; |
191 | uv_sem_t sem; |
192 | |
193 | nthreads = ARRAY_SIZE(default_threads); |
194 | val = getenv("UV_THREADPOOL_SIZE" ); |
195 | if (val != NULL) |
196 | nthreads = atoi(val); |
197 | if (nthreads == 0) |
198 | nthreads = 1; |
199 | if (nthreads > MAX_THREADPOOL_SIZE) |
200 | nthreads = MAX_THREADPOOL_SIZE; |
201 | |
202 | threads = default_threads; |
203 | if (nthreads > ARRAY_SIZE(default_threads)) { |
204 | threads = uv__malloc(nthreads * sizeof(threads[0])); |
205 | if (threads == NULL) { |
206 | nthreads = ARRAY_SIZE(default_threads); |
207 | threads = default_threads; |
208 | } |
209 | } |
210 | |
211 | if (uv_cond_init(&cond)) |
212 | abort(); |
213 | |
214 | if (uv_mutex_init(&mutex)) |
215 | abort(); |
216 | |
217 | QUEUE_INIT(&wq); |
218 | QUEUE_INIT(&slow_io_pending_wq); |
219 | QUEUE_INIT(&run_slow_work_message); |
220 | |
221 | if (uv_sem_init(&sem, 0)) |
222 | abort(); |
223 | |
224 | for (i = 0; i < nthreads; i++) |
225 | if (uv_thread_create(threads + i, worker, &sem)) |
226 | abort(); |
227 | |
228 | for (i = 0; i < nthreads; i++) |
229 | uv_sem_wait(&sem); |
230 | |
231 | uv_sem_destroy(&sem); |
232 | } |
233 | |
234 | |
235 | #ifndef _WIN32 |
236 | static void reset_once(void) { |
237 | uv_once_t child_once = UV_ONCE_INIT; |
238 | memcpy(&once, &child_once, sizeof(child_once)); |
239 | } |
240 | #endif |
241 | |
242 | |
243 | static void init_once(void) { |
244 | #ifndef _WIN32 |
245 | /* Re-initialize the threadpool after fork. |
246 | * Note that this discards the global mutex and condition as well |
247 | * as the work queue. |
248 | */ |
249 | if (pthread_atfork(NULL, NULL, &reset_once)) |
250 | abort(); |
251 | #endif |
252 | init_threads(); |
253 | } |
254 | |
255 | |
256 | void uv__work_submit(uv_loop_t* loop, |
257 | struct uv__work* w, |
258 | enum uv__work_kind kind, |
259 | void (*work)(struct uv__work* w), |
260 | void (*done)(struct uv__work* w, int status)) { |
261 | uv_once(&once, init_once); |
262 | w->loop = loop; |
263 | w->work = work; |
264 | w->done = done; |
265 | post(&w->wq, kind); |
266 | } |
267 | |
268 | |
269 | static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { |
270 | int cancelled; |
271 | |
272 | uv_mutex_lock(&mutex); |
273 | uv_mutex_lock(&w->loop->wq_mutex); |
274 | |
275 | cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL; |
276 | if (cancelled) |
277 | QUEUE_REMOVE(&w->wq); |
278 | |
279 | uv_mutex_unlock(&w->loop->wq_mutex); |
280 | uv_mutex_unlock(&mutex); |
281 | |
282 | if (!cancelled) |
283 | return UV_EBUSY; |
284 | |
285 | w->work = uv__cancelled; |
286 | uv_mutex_lock(&loop->wq_mutex); |
287 | QUEUE_INSERT_TAIL(&loop->wq, &w->wq); |
288 | uv_async_send(&loop->wq_async); |
289 | uv_mutex_unlock(&loop->wq_mutex); |
290 | |
291 | return 0; |
292 | } |
293 | |
294 | |
295 | void uv__work_done(uv_async_t* handle) { |
296 | struct uv__work* w; |
297 | uv_loop_t* loop; |
298 | QUEUE* q; |
299 | QUEUE wq; |
300 | int err; |
301 | |
302 | loop = container_of(handle, uv_loop_t, wq_async); |
303 | uv_mutex_lock(&loop->wq_mutex); |
304 | QUEUE_MOVE(&loop->wq, &wq); |
305 | uv_mutex_unlock(&loop->wq_mutex); |
306 | |
307 | while (!QUEUE_EMPTY(&wq)) { |
308 | q = QUEUE_HEAD(&wq); |
309 | QUEUE_REMOVE(q); |
310 | |
311 | w = container_of(q, struct uv__work, wq); |
312 | err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; |
313 | w->done(w, err); |
314 | } |
315 | } |
316 | |
317 | |
318 | static void uv__queue_work(struct uv__work* w) { |
319 | uv_work_t* req = container_of(w, uv_work_t, work_req); |
320 | |
321 | req->work_cb(req); |
322 | } |
323 | |
324 | |
325 | static void uv__queue_done(struct uv__work* w, int err) { |
326 | uv_work_t* req; |
327 | |
328 | req = container_of(w, uv_work_t, work_req); |
329 | uv__req_unregister(req->loop, req); |
330 | |
331 | if (req->after_work_cb == NULL) |
332 | return; |
333 | |
334 | req->after_work_cb(req, err); |
335 | } |
336 | |
337 | |
338 | int uv_queue_work(uv_loop_t* loop, |
339 | uv_work_t* req, |
340 | uv_work_cb work_cb, |
341 | uv_after_work_cb after_work_cb) { |
342 | if (work_cb == NULL) |
343 | return UV_EINVAL; |
344 | |
345 | uv__req_init(loop, req, UV_WORK); |
346 | req->loop = loop; |
347 | req->work_cb = work_cb; |
348 | req->after_work_cb = after_work_cb; |
349 | uv__work_submit(loop, |
350 | &req->work_req, |
351 | UV__WORK_CPU, |
352 | uv__queue_work, |
353 | uv__queue_done); |
354 | return 0; |
355 | } |
356 | |
357 | |
358 | int uv_cancel(uv_req_t* req) { |
359 | struct uv__work* wreq; |
360 | uv_loop_t* loop; |
361 | |
362 | switch (req->type) { |
363 | case UV_FS: |
364 | loop = ((uv_fs_t*) req)->loop; |
365 | wreq = &((uv_fs_t*) req)->work_req; |
366 | break; |
367 | case UV_GETADDRINFO: |
368 | loop = ((uv_getaddrinfo_t*) req)->loop; |
369 | wreq = &((uv_getaddrinfo_t*) req)->work_req; |
370 | break; |
371 | case UV_GETNAMEINFO: |
372 | loop = ((uv_getnameinfo_t*) req)->loop; |
373 | wreq = &((uv_getnameinfo_t*) req)->work_req; |
374 | break; |
375 | case UV_RANDOM: |
376 | loop = ((uv_random_t*) req)->loop; |
377 | wreq = &((uv_random_t*) req)->work_req; |
378 | break; |
379 | case UV_WORK: |
380 | loop = ((uv_work_t*) req)->loop; |
381 | wreq = &((uv_work_t*) req)->work_req; |
382 | break; |
383 | default: |
384 | return UV_EINVAL; |
385 | } |
386 | |
387 | return uv__work_cancel(loop, req, wreq); |
388 | } |
389 | |