1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include <memory.h> |
40 | #include <toku_portability.h> |
41 | #include <stdio.h> |
42 | #include <errno.h> |
43 | #include <string.h> |
44 | |
45 | #include <toku_assert.h> |
46 | #include <toku_list.h> |
47 | #include <portability/toku_pthread.h> |
48 | |
49 | #include "threadpool.h" |
50 | |
51 | toku_instr_key *tpool_lock_mutex_key; |
52 | toku_instr_key *tp_thread_wait_key; |
53 | toku_instr_key *tp_pool_wait_free_key; |
54 | toku_instr_key *tp_internal_thread_key; |
55 | |
56 | struct toku_thread { |
57 | struct toku_thread_pool *pool; |
58 | toku_pthread_t tid; |
59 | void *(*f)(void *arg); |
60 | void *arg; |
61 | int doexit; |
62 | struct toku_list free_link; |
63 | struct toku_list all_link; |
64 | toku_cond_t wait; |
65 | }; |
66 | |
67 | struct toku_thread_pool { |
68 | int max_threads; |
69 | int cur_threads; |
70 | struct toku_list free_threads; |
71 | struct toku_list all_threads; |
72 | |
73 | toku_mutex_t lock; |
74 | toku_cond_t wait_free; |
75 | |
76 | uint64_t gets, get_blocks; |
77 | }; |
78 | |
79 | static void *toku_thread_run_internal(void *arg); |
80 | static void toku_thread_pool_lock(struct toku_thread_pool *pool); |
81 | static void toku_thread_pool_unlock(struct toku_thread_pool *pool); |
82 | |
83 | static int |
84 | toku_thread_create(struct toku_thread_pool *pool, struct toku_thread **toku_thread_return) { |
85 | int r; |
86 | struct toku_thread *MALLOC(thread); |
87 | if (thread == nullptr) { |
88 | r = get_error_errno(); |
89 | } else { |
90 | memset(thread, 0, sizeof *thread); |
91 | thread->pool = pool; |
92 | toku_cond_init(*tp_thread_wait_key, &thread->wait, nullptr); |
93 | r = toku_pthread_create(*tp_internal_thread_key, |
94 | &thread->tid, |
95 | nullptr, |
96 | toku_thread_run_internal, |
97 | thread); |
98 | if (r) { |
99 | toku_cond_destroy(&thread->wait); |
100 | toku_free(thread); |
101 | thread = nullptr; |
102 | } |
103 | *toku_thread_return = thread; |
104 | } |
105 | return r; |
106 | } |
107 | |
108 | void |
109 | toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg) { |
110 | toku_thread_pool_lock(thread->pool); |
111 | thread->f = f; |
112 | thread->arg = arg; |
113 | toku_cond_signal(&thread->wait); |
114 | toku_thread_pool_unlock(thread->pool); |
115 | } |
116 | |
117 | static void toku_thread_destroy(struct toku_thread *thread) { |
118 | int r; |
119 | void *ret; |
120 | r = toku_pthread_join(thread->tid, &ret); |
121 | invariant(r == 0 && ret == thread); |
122 | struct toku_thread_pool *pool = thread->pool; |
123 | toku_thread_pool_lock(pool); |
124 | toku_list_remove(&thread->free_link); |
125 | toku_thread_pool_unlock(pool); |
126 | toku_cond_destroy(&thread->wait); |
127 | toku_free(thread); |
128 | } |
129 | |
130 | static void |
131 | toku_thread_ask_exit(struct toku_thread *thread) { |
132 | thread->doexit = 1; |
133 | toku_cond_signal(&thread->wait); |
134 | } |
135 | |
136 | static void * |
137 | toku_thread_run_internal(void *arg) { |
138 | struct toku_thread *thread = (struct toku_thread *) arg; |
139 | struct toku_thread_pool *pool = thread->pool; |
140 | toku_thread_pool_lock(pool); |
141 | while (1) { |
142 | toku_cond_signal(&pool->wait_free); |
143 | void *(*thread_f)(void *); void *thread_arg; int doexit; |
144 | while (1) { |
145 | thread_f = thread->f; thread_arg = thread->arg; doexit = thread->doexit; // make copies of these variables to make helgrind happy |
146 | if (thread_f || doexit) |
147 | break; |
148 | toku_cond_wait(&thread->wait, &pool->lock); |
149 | } |
150 | toku_thread_pool_unlock(pool); |
151 | if (thread_f) |
152 | (void) thread_f(thread_arg); |
153 | if (doexit) |
154 | break; |
155 | toku_thread_pool_lock(pool); |
156 | thread->f = nullptr; |
157 | toku_list_push(&pool->free_threads, &thread->free_link); |
158 | } |
159 | return toku_pthread_done(arg); |
160 | } |
161 | |
162 | int toku_thread_pool_create(struct toku_thread_pool **pool_return, |
163 | int max_threads) { |
164 | int r; |
165 | struct toku_thread_pool *CALLOC(pool); |
166 | if (pool == nullptr) { |
167 | r = get_error_errno(); |
168 | } else { |
169 | toku_mutex_init(*tpool_lock_mutex_key, &pool->lock, nullptr); |
170 | toku_list_init(&pool->free_threads); |
171 | toku_list_init(&pool->all_threads); |
172 | toku_cond_init(*tp_pool_wait_free_key, &pool->wait_free, nullptr); |
173 | pool->cur_threads = 0; |
174 | pool->max_threads = max_threads; |
175 | *pool_return = pool; |
176 | r = 0; |
177 | } |
178 | return r; |
179 | } |
180 | |
181 | static void |
182 | toku_thread_pool_lock(struct toku_thread_pool *pool) { |
183 | toku_mutex_lock(&pool->lock); |
184 | } |
185 | |
186 | static void |
187 | toku_thread_pool_unlock(struct toku_thread_pool *pool) { |
188 | toku_mutex_unlock(&pool->lock); |
189 | } |
190 | |
191 | void |
192 | toku_thread_pool_destroy(struct toku_thread_pool **poolptr) { |
193 | struct toku_thread_pool *pool = *poolptr; |
194 | *poolptr = nullptr; |
195 | |
196 | // ask the threads to exit |
197 | toku_thread_pool_lock(pool); |
198 | struct toku_list *list; |
199 | for (list = pool->all_threads.next; list != &pool->all_threads; list = list->next) { |
200 | struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link); |
201 | toku_thread_ask_exit(thread); |
202 | } |
203 | toku_thread_pool_unlock(pool); |
204 | |
205 | // wait for all of the threads to exit |
206 | while (!toku_list_empty(&pool->all_threads)) { |
207 | list = toku_list_pop_head(&pool->all_threads); |
208 | struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link); |
209 | toku_thread_destroy(thread); |
210 | pool->cur_threads -= 1; |
211 | } |
212 | |
213 | invariant(pool->cur_threads == 0); |
214 | |
215 | // cleanup |
216 | toku_cond_destroy(&pool->wait_free); |
217 | toku_mutex_destroy(&pool->lock); |
218 | |
219 | toku_free(pool); |
220 | } |
221 | |
222 | static int |
223 | toku_thread_pool_add(struct toku_thread_pool *pool) { |
224 | struct toku_thread *thread = nullptr; |
225 | int r = toku_thread_create(pool, &thread); |
226 | if (r == 0) { |
227 | pool->cur_threads += 1; |
228 | toku_list_push(&pool->all_threads, &thread->all_link); |
229 | toku_list_push(&pool->free_threads, &thread->free_link); |
230 | toku_cond_signal(&pool->wait_free); |
231 | } |
232 | return r; |
233 | } |
234 | |
235 | // get one thread from the free pool. |
236 | static int |
237 | toku_thread_pool_get_one(struct toku_thread_pool *pool, int dowait, struct toku_thread **toku_thread_return) { |
238 | int r = 0; |
239 | toku_thread_pool_lock(pool); |
240 | pool->gets++; |
241 | while (1) { |
242 | if (!toku_list_empty(&pool->free_threads)) |
243 | break; |
244 | if (pool->max_threads == 0 || pool->cur_threads < pool->max_threads) |
245 | (void) toku_thread_pool_add(pool); |
246 | if (toku_list_empty(&pool->free_threads) && !dowait) { |
247 | r = EWOULDBLOCK; |
248 | break; |
249 | } |
250 | pool->get_blocks++; |
251 | toku_cond_wait(&pool->wait_free, &pool->lock); |
252 | } |
253 | if (r == 0) { |
254 | struct toku_list *list = toku_list_pop_head(&pool->free_threads); |
255 | struct toku_thread *thread = toku_list_struct(list, struct toku_thread, free_link); |
256 | *toku_thread_return = thread; |
257 | } else |
258 | *toku_thread_return = nullptr; |
259 | toku_thread_pool_unlock(pool); |
260 | return r; |
261 | } |
262 | |
263 | int |
264 | toku_thread_pool_get(struct toku_thread_pool *pool, int dowait, int *nthreads, struct toku_thread **toku_thread_return) { |
265 | int r = 0; |
266 | int n = *nthreads; |
267 | int i; |
268 | for (i = 0; i < n; i++) { |
269 | r = toku_thread_pool_get_one(pool, dowait, &toku_thread_return[i]); |
270 | if (r != 0) |
271 | break; |
272 | } |
273 | *nthreads = i; |
274 | return r; |
275 | } |
276 | |
277 | int |
278 | toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthreads, void *(*f)(void *arg), void *arg) { |
279 | int n = *nthreads; |
280 | struct toku_thread *tids[n]; |
281 | int r = toku_thread_pool_get(pool, dowait, nthreads, tids); |
282 | if (r == 0 || r == EWOULDBLOCK) { |
283 | n = *nthreads; |
284 | for (int i = 0; i < n; i++) |
285 | toku_thread_run(tids[i], f, arg); |
286 | } |
287 | return r; |
288 | } |
289 | |
290 | void |
291 | toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out) { |
292 | fprintf(out, "%s:%d %p %llu %llu\n" , __FILE__, __LINE__, pool, (long long unsigned) pool->gets, (long long unsigned) pool->get_blocks); |
293 | } |
294 | |
295 | int |
296 | toku_thread_pool_get_current_threads(struct toku_thread_pool *pool) { |
297 | return pool->cur_threads; |
298 | } |
299 | |