1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <memory.h>
40#include <toku_portability.h>
41#include <stdio.h>
42#include <errno.h>
43#include <string.h>
44
45#include <toku_assert.h>
46#include <toku_list.h>
47#include <portability/toku_pthread.h>
48
49#include "threadpool.h"
50
51toku_instr_key *tpool_lock_mutex_key;
52toku_instr_key *tp_thread_wait_key;
53toku_instr_key *tp_pool_wait_free_key;
54toku_instr_key *tp_internal_thread_key;
55
56struct toku_thread {
57 struct toku_thread_pool *pool;
58 toku_pthread_t tid;
59 void *(*f)(void *arg);
60 void *arg;
61 int doexit;
62 struct toku_list free_link;
63 struct toku_list all_link;
64 toku_cond_t wait;
65};
66
67struct toku_thread_pool {
68 int max_threads;
69 int cur_threads;
70 struct toku_list free_threads;
71 struct toku_list all_threads;
72
73 toku_mutex_t lock;
74 toku_cond_t wait_free;
75
76 uint64_t gets, get_blocks;
77};
78
// Forward declarations for helpers that are used before they are defined.

// Entry point executed by every worker thread.
static void *
toku_thread_run_internal(void *arg);

// Acquire / release the pool mutex.
static void
toku_thread_pool_lock(struct toku_thread_pool *pool);

static void
toku_thread_pool_unlock(struct toku_thread_pool *pool);
82
83static int
84toku_thread_create(struct toku_thread_pool *pool, struct toku_thread **toku_thread_return) {
85 int r;
86 struct toku_thread *MALLOC(thread);
87 if (thread == nullptr) {
88 r = get_error_errno();
89 } else {
90 memset(thread, 0, sizeof *thread);
91 thread->pool = pool;
92 toku_cond_init(*tp_thread_wait_key, &thread->wait, nullptr);
93 r = toku_pthread_create(*tp_internal_thread_key,
94 &thread->tid,
95 nullptr,
96 toku_thread_run_internal,
97 thread);
98 if (r) {
99 toku_cond_destroy(&thread->wait);
100 toku_free(thread);
101 thread = nullptr;
102 }
103 *toku_thread_return = thread;
104 }
105 return r;
106}
107
108void
109toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg) {
110 toku_thread_pool_lock(thread->pool);
111 thread->f = f;
112 thread->arg = arg;
113 toku_cond_signal(&thread->wait);
114 toku_thread_pool_unlock(thread->pool);
115}
116
117static void toku_thread_destroy(struct toku_thread *thread) {
118 int r;
119 void *ret;
120 r = toku_pthread_join(thread->tid, &ret);
121 invariant(r == 0 && ret == thread);
122 struct toku_thread_pool *pool = thread->pool;
123 toku_thread_pool_lock(pool);
124 toku_list_remove(&thread->free_link);
125 toku_thread_pool_unlock(pool);
126 toku_cond_destroy(&thread->wait);
127 toku_free(thread);
128}
129
130static void
131toku_thread_ask_exit(struct toku_thread *thread) {
132 thread->doexit = 1;
133 toku_cond_signal(&thread->wait);
134}
135
136static void *
137toku_thread_run_internal(void *arg) {
138 struct toku_thread *thread = (struct toku_thread *) arg;
139 struct toku_thread_pool *pool = thread->pool;
140 toku_thread_pool_lock(pool);
141 while (1) {
142 toku_cond_signal(&pool->wait_free);
143 void *(*thread_f)(void *); void *thread_arg; int doexit;
144 while (1) {
145 thread_f = thread->f; thread_arg = thread->arg; doexit = thread->doexit; // make copies of these variables to make helgrind happy
146 if (thread_f || doexit)
147 break;
148 toku_cond_wait(&thread->wait, &pool->lock);
149 }
150 toku_thread_pool_unlock(pool);
151 if (thread_f)
152 (void) thread_f(thread_arg);
153 if (doexit)
154 break;
155 toku_thread_pool_lock(pool);
156 thread->f = nullptr;
157 toku_list_push(&pool->free_threads, &thread->free_link);
158 }
159 return toku_pthread_done(arg);
160}
161
162int toku_thread_pool_create(struct toku_thread_pool **pool_return,
163 int max_threads) {
164 int r;
165 struct toku_thread_pool *CALLOC(pool);
166 if (pool == nullptr) {
167 r = get_error_errno();
168 } else {
169 toku_mutex_init(*tpool_lock_mutex_key, &pool->lock, nullptr);
170 toku_list_init(&pool->free_threads);
171 toku_list_init(&pool->all_threads);
172 toku_cond_init(*tp_pool_wait_free_key, &pool->wait_free, nullptr);
173 pool->cur_threads = 0;
174 pool->max_threads = max_threads;
175 *pool_return = pool;
176 r = 0;
177 }
178 return r;
179}
180
181static void
182toku_thread_pool_lock(struct toku_thread_pool *pool) {
183 toku_mutex_lock(&pool->lock);
184}
185
186static void
187toku_thread_pool_unlock(struct toku_thread_pool *pool) {
188 toku_mutex_unlock(&pool->lock);
189}
190
191void
192toku_thread_pool_destroy(struct toku_thread_pool **poolptr) {
193 struct toku_thread_pool *pool = *poolptr;
194 *poolptr = nullptr;
195
196 // ask the threads to exit
197 toku_thread_pool_lock(pool);
198 struct toku_list *list;
199 for (list = pool->all_threads.next; list != &pool->all_threads; list = list->next) {
200 struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link);
201 toku_thread_ask_exit(thread);
202 }
203 toku_thread_pool_unlock(pool);
204
205 // wait for all of the threads to exit
206 while (!toku_list_empty(&pool->all_threads)) {
207 list = toku_list_pop_head(&pool->all_threads);
208 struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link);
209 toku_thread_destroy(thread);
210 pool->cur_threads -= 1;
211 }
212
213 invariant(pool->cur_threads == 0);
214
215 // cleanup
216 toku_cond_destroy(&pool->wait_free);
217 toku_mutex_destroy(&pool->lock);
218
219 toku_free(pool);
220}
221
222static int
223toku_thread_pool_add(struct toku_thread_pool *pool) {
224 struct toku_thread *thread = nullptr;
225 int r = toku_thread_create(pool, &thread);
226 if (r == 0) {
227 pool->cur_threads += 1;
228 toku_list_push(&pool->all_threads, &thread->all_link);
229 toku_list_push(&pool->free_threads, &thread->free_link);
230 toku_cond_signal(&pool->wait_free);
231 }
232 return r;
233}
234
235// get one thread from the free pool.
236static int
237toku_thread_pool_get_one(struct toku_thread_pool *pool, int dowait, struct toku_thread **toku_thread_return) {
238 int r = 0;
239 toku_thread_pool_lock(pool);
240 pool->gets++;
241 while (1) {
242 if (!toku_list_empty(&pool->free_threads))
243 break;
244 if (pool->max_threads == 0 || pool->cur_threads < pool->max_threads)
245 (void) toku_thread_pool_add(pool);
246 if (toku_list_empty(&pool->free_threads) && !dowait) {
247 r = EWOULDBLOCK;
248 break;
249 }
250 pool->get_blocks++;
251 toku_cond_wait(&pool->wait_free, &pool->lock);
252 }
253 if (r == 0) {
254 struct toku_list *list = toku_list_pop_head(&pool->free_threads);
255 struct toku_thread *thread = toku_list_struct(list, struct toku_thread, free_link);
256 *toku_thread_return = thread;
257 } else
258 *toku_thread_return = nullptr;
259 toku_thread_pool_unlock(pool);
260 return r;
261}
262
263int
264toku_thread_pool_get(struct toku_thread_pool *pool, int dowait, int *nthreads, struct toku_thread **toku_thread_return) {
265 int r = 0;
266 int n = *nthreads;
267 int i;
268 for (i = 0; i < n; i++) {
269 r = toku_thread_pool_get_one(pool, dowait, &toku_thread_return[i]);
270 if (r != 0)
271 break;
272 }
273 *nthreads = i;
274 return r;
275}
276
277int
278toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthreads, void *(*f)(void *arg), void *arg) {
279 int n = *nthreads;
280 struct toku_thread *tids[n];
281 int r = toku_thread_pool_get(pool, dowait, nthreads, tids);
282 if (r == 0 || r == EWOULDBLOCK) {
283 n = *nthreads;
284 for (int i = 0; i < n; i++)
285 toku_thread_run(tids[i], f, arg);
286 }
287 return r;
288}
289
290void
291toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out) {
292 fprintf(out, "%s:%d %p %llu %llu\n", __FILE__, __LINE__, pool, (long long unsigned) pool->gets, (long long unsigned) pool->get_blocks);
293}
294
295int
296toku_thread_pool_get_current_threads(struct toku_thread_pool *pool) {
297 return pool->cur_threads;
298}
299