| 1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
| 2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
| 3 | #ident "$Id$" |
| 4 | /*====== |
| 5 | This file is part of PerconaFT. |
| 6 | |
| 7 | |
| 8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
| 9 | |
| 10 | PerconaFT is free software: you can redistribute it and/or modify |
| 11 | it under the terms of the GNU General Public License, version 2, |
| 12 | as published by the Free Software Foundation. |
| 13 | |
| 14 | PerconaFT is distributed in the hope that it will be useful, |
| 15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 | GNU General Public License for more details. |
| 18 | |
| 19 | You should have received a copy of the GNU General Public License |
| 20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 21 | |
| 22 | ---------------------------------------- |
| 23 | |
| 24 | PerconaFT is free software: you can redistribute it and/or modify |
| 25 | it under the terms of the GNU Affero General Public License, version 3, |
| 26 | as published by the Free Software Foundation. |
| 27 | |
| 28 | PerconaFT is distributed in the hope that it will be useful, |
| 29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 31 | GNU Affero General Public License for more details. |
| 32 | |
| 33 | You should have received a copy of the GNU Affero General Public License |
| 34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 35 | ======= */ |
| 36 | |
| 37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
| 38 | |
| 39 | #include <memory.h> |
| 40 | #include <toku_portability.h> |
| 41 | #include <stdio.h> |
| 42 | #include <errno.h> |
| 43 | #include <string.h> |
| 44 | |
| 45 | #include <toku_assert.h> |
| 46 | #include <toku_list.h> |
| 47 | #include <portability/toku_pthread.h> |
| 48 | |
| 49 | #include "threadpool.h" |
| 50 | |
// Instrumentation keys used by the thread pool. They are defined here as
// pointers without an initializer and are dereferenced when the pool creates
// its mutex, condition variables and worker threads, so they must be pointed
// at valid keys before the first pool is created.
// NOTE(review): where these are assigned is not visible in this file -- confirm.
toku_instr_key *tpool_lock_mutex_key;    // passed to toku_mutex_init for the pool lock
toku_instr_key *tp_thread_wait_key;      // passed to toku_cond_init for each worker's wait condition
toku_instr_key *tp_pool_wait_free_key;   // passed to toku_cond_init for the pool's wait_free condition
toku_instr_key *tp_internal_thread_key;  // passed to toku_pthread_create for each worker thread
| 55 | |
// One worker thread owned by a thread pool.
struct toku_thread {
    struct toku_thread_pool *pool;  // pool this worker belongs to; set when the worker is created
    toku_pthread_t tid;             // thread id filled in by toku_pthread_create; joined on destroy
    void *(*f)(void *arg);          // work function assigned by toku_thread_run; the worker resets it to nullptr after running it
    void *arg;                      // argument handed to f
    int doexit;                     // set to 1 by toku_thread_ask_exit to make the worker leave its loop
    struct toku_list free_link;     // link in pool->free_threads
    struct toku_list all_link;      // link in pool->all_threads
    toku_cond_t wait;               // the worker waits on this (with the pool lock) for work or an exit request
};
| 66 | |
// A pool of worker threads that is grown on demand.
struct toku_thread_pool {
    int max_threads;                 // upper bound on cur_threads; 0 means no limit
    int cur_threads;                 // number of workers created so far and not yet destroyed
    struct toku_list free_threads;   // idle workers, linked through toku_thread::free_link
    struct toku_list all_threads;    // every worker, linked through toku_thread::all_link

    toku_mutex_t lock;               // taken via toku_thread_pool_lock/unlock around list and worker state changes
    toku_cond_t wait_free;           // signalled when a worker is added or loops back to idle; getters wait on it

    uint64_t gets, get_blocks;       // statistics: calls to get_one, and how many times a getter waited on wait_free
};
| 78 | |
// Forward declarations: these helpers are used by functions defined before
// their own definitions further down in this file.
static void *toku_thread_run_internal(void *arg);
static void toku_thread_pool_lock(struct toku_thread_pool *pool);
static void toku_thread_pool_unlock(struct toku_thread_pool *pool);
| 82 | |
| 83 | static int |
| 84 | toku_thread_create(struct toku_thread_pool *pool, struct toku_thread **toku_thread_return) { |
| 85 | int r; |
| 86 | struct toku_thread *MALLOC(thread); |
| 87 | if (thread == nullptr) { |
| 88 | r = get_error_errno(); |
| 89 | } else { |
| 90 | memset(thread, 0, sizeof *thread); |
| 91 | thread->pool = pool; |
| 92 | toku_cond_init(*tp_thread_wait_key, &thread->wait, nullptr); |
| 93 | r = toku_pthread_create(*tp_internal_thread_key, |
| 94 | &thread->tid, |
| 95 | nullptr, |
| 96 | toku_thread_run_internal, |
| 97 | thread); |
| 98 | if (r) { |
| 99 | toku_cond_destroy(&thread->wait); |
| 100 | toku_free(thread); |
| 101 | thread = nullptr; |
| 102 | } |
| 103 | *toku_thread_return = thread; |
| 104 | } |
| 105 | return r; |
| 106 | } |
| 107 | |
| 108 | void |
| 109 | toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg) { |
| 110 | toku_thread_pool_lock(thread->pool); |
| 111 | thread->f = f; |
| 112 | thread->arg = arg; |
| 113 | toku_cond_signal(&thread->wait); |
| 114 | toku_thread_pool_unlock(thread->pool); |
| 115 | } |
| 116 | |
| 117 | static void toku_thread_destroy(struct toku_thread *thread) { |
| 118 | int r; |
| 119 | void *ret; |
| 120 | r = toku_pthread_join(thread->tid, &ret); |
| 121 | invariant(r == 0 && ret == thread); |
| 122 | struct toku_thread_pool *pool = thread->pool; |
| 123 | toku_thread_pool_lock(pool); |
| 124 | toku_list_remove(&thread->free_link); |
| 125 | toku_thread_pool_unlock(pool); |
| 126 | toku_cond_destroy(&thread->wait); |
| 127 | toku_free(thread); |
| 128 | } |
| 129 | |
| 130 | static void |
| 131 | toku_thread_ask_exit(struct toku_thread *thread) { |
| 132 | thread->doexit = 1; |
| 133 | toku_cond_signal(&thread->wait); |
| 134 | } |
| 135 | |
// Start routine of every pool worker.  `arg` is the worker's struct toku_thread.
//
// Each iteration of the outer loop, with the pool lock held:
//   1. signals pool->wait_free (the condition getters wait on),
//   2. waits on the worker's own condition until a work function has been
//      assigned or an exit has been requested,
//   3. drops the pool lock and runs the work function, if there is one,
//   4. exits if an exit request was observed in step 2; otherwise re-takes the
//      lock, clears the work function and pushes itself on the free list.
//
// The exit flag is sampled before the work function runs, so an exit requested
// while the function is executing is only observed on the next iteration.
// The pool lock is not held when the loop is left.
//
// Returns whatever toku_pthread_done(arg) returns; toku_thread_destroy checks
// that the value obtained from the join equals the worker pointer.
static void *
toku_thread_run_internal(void *arg) {
    struct toku_thread *thread = (struct toku_thread *) arg;
    struct toku_thread_pool *pool = thread->pool;
    toku_thread_pool_lock(pool);
    while (1) {
        // Wake one getter that may be blocked waiting on wait_free.
        toku_cond_signal(&pool->wait_free);
        void *(*thread_f)(void *); void *thread_arg; int doexit;
        while (1) {
            thread_f = thread->f; thread_arg = thread->arg; doexit = thread->doexit; // make copies of these variables to make helgrind happy
            if (thread_f || doexit)
                break;
            // Nothing to do yet: wait for toku_thread_run or toku_thread_ask_exit to signal.
            toku_cond_wait(&thread->wait, &pool->lock);
        }
        // Run the work function without holding the pool lock.
        toku_thread_pool_unlock(pool);
        if (thread_f)
            (void) thread_f(thread_arg);
        if (doexit)
            break;
        // Back to idle: clear the work function and rejoin the free list.
        toku_thread_pool_lock(pool);
        thread->f = nullptr;
        toku_list_push(&pool->free_threads, &thread->free_link);
    }
    return toku_pthread_done(arg);
}
| 161 | |
| 162 | int toku_thread_pool_create(struct toku_thread_pool **pool_return, |
| 163 | int max_threads) { |
| 164 | int r; |
| 165 | struct toku_thread_pool *CALLOC(pool); |
| 166 | if (pool == nullptr) { |
| 167 | r = get_error_errno(); |
| 168 | } else { |
| 169 | toku_mutex_init(*tpool_lock_mutex_key, &pool->lock, nullptr); |
| 170 | toku_list_init(&pool->free_threads); |
| 171 | toku_list_init(&pool->all_threads); |
| 172 | toku_cond_init(*tp_pool_wait_free_key, &pool->wait_free, nullptr); |
| 173 | pool->cur_threads = 0; |
| 174 | pool->max_threads = max_threads; |
| 175 | *pool_return = pool; |
| 176 | r = 0; |
| 177 | } |
| 178 | return r; |
| 179 | } |
| 180 | |
| 181 | static void |
| 182 | toku_thread_pool_lock(struct toku_thread_pool *pool) { |
| 183 | toku_mutex_lock(&pool->lock); |
| 184 | } |
| 185 | |
| 186 | static void |
| 187 | toku_thread_pool_unlock(struct toku_thread_pool *pool) { |
| 188 | toku_mutex_unlock(&pool->lock); |
| 189 | } |
| 190 | |
| 191 | void |
| 192 | toku_thread_pool_destroy(struct toku_thread_pool **poolptr) { |
| 193 | struct toku_thread_pool *pool = *poolptr; |
| 194 | *poolptr = nullptr; |
| 195 | |
| 196 | // ask the threads to exit |
| 197 | toku_thread_pool_lock(pool); |
| 198 | struct toku_list *list; |
| 199 | for (list = pool->all_threads.next; list != &pool->all_threads; list = list->next) { |
| 200 | struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link); |
| 201 | toku_thread_ask_exit(thread); |
| 202 | } |
| 203 | toku_thread_pool_unlock(pool); |
| 204 | |
| 205 | // wait for all of the threads to exit |
| 206 | while (!toku_list_empty(&pool->all_threads)) { |
| 207 | list = toku_list_pop_head(&pool->all_threads); |
| 208 | struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link); |
| 209 | toku_thread_destroy(thread); |
| 210 | pool->cur_threads -= 1; |
| 211 | } |
| 212 | |
| 213 | invariant(pool->cur_threads == 0); |
| 214 | |
| 215 | // cleanup |
| 216 | toku_cond_destroy(&pool->wait_free); |
| 217 | toku_mutex_destroy(&pool->lock); |
| 218 | |
| 219 | toku_free(pool); |
| 220 | } |
| 221 | |
| 222 | static int |
| 223 | toku_thread_pool_add(struct toku_thread_pool *pool) { |
| 224 | struct toku_thread *thread = nullptr; |
| 225 | int r = toku_thread_create(pool, &thread); |
| 226 | if (r == 0) { |
| 227 | pool->cur_threads += 1; |
| 228 | toku_list_push(&pool->all_threads, &thread->all_link); |
| 229 | toku_list_push(&pool->free_threads, &thread->free_link); |
| 230 | toku_cond_signal(&pool->wait_free); |
| 231 | } |
| 232 | return r; |
| 233 | } |
| 234 | |
| 235 | // get one thread from the free pool. |
| 236 | static int |
| 237 | toku_thread_pool_get_one(struct toku_thread_pool *pool, int dowait, struct toku_thread **toku_thread_return) { |
| 238 | int r = 0; |
| 239 | toku_thread_pool_lock(pool); |
| 240 | pool->gets++; |
| 241 | while (1) { |
| 242 | if (!toku_list_empty(&pool->free_threads)) |
| 243 | break; |
| 244 | if (pool->max_threads == 0 || pool->cur_threads < pool->max_threads) |
| 245 | (void) toku_thread_pool_add(pool); |
| 246 | if (toku_list_empty(&pool->free_threads) && !dowait) { |
| 247 | r = EWOULDBLOCK; |
| 248 | break; |
| 249 | } |
| 250 | pool->get_blocks++; |
| 251 | toku_cond_wait(&pool->wait_free, &pool->lock); |
| 252 | } |
| 253 | if (r == 0) { |
| 254 | struct toku_list *list = toku_list_pop_head(&pool->free_threads); |
| 255 | struct toku_thread *thread = toku_list_struct(list, struct toku_thread, free_link); |
| 256 | *toku_thread_return = thread; |
| 257 | } else |
| 258 | *toku_thread_return = nullptr; |
| 259 | toku_thread_pool_unlock(pool); |
| 260 | return r; |
| 261 | } |
| 262 | |
| 263 | int |
| 264 | toku_thread_pool_get(struct toku_thread_pool *pool, int dowait, int *nthreads, struct toku_thread **toku_thread_return) { |
| 265 | int r = 0; |
| 266 | int n = *nthreads; |
| 267 | int i; |
| 268 | for (i = 0; i < n; i++) { |
| 269 | r = toku_thread_pool_get_one(pool, dowait, &toku_thread_return[i]); |
| 270 | if (r != 0) |
| 271 | break; |
| 272 | } |
| 273 | *nthreads = i; |
| 274 | return r; |
| 275 | } |
| 276 | |
| 277 | int |
| 278 | toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthreads, void *(*f)(void *arg), void *arg) { |
| 279 | int n = *nthreads; |
| 280 | struct toku_thread *tids[n]; |
| 281 | int r = toku_thread_pool_get(pool, dowait, nthreads, tids); |
| 282 | if (r == 0 || r == EWOULDBLOCK) { |
| 283 | n = *nthreads; |
| 284 | for (int i = 0; i < n; i++) |
| 285 | toku_thread_run(tids[i], f, arg); |
| 286 | } |
| 287 | return r; |
| 288 | } |
| 289 | |
| 290 | void |
| 291 | toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out) { |
| 292 | fprintf(out, "%s:%d %p %llu %llu\n" , __FILE__, __LINE__, pool, (long long unsigned) pool->gets, (long long unsigned) pool->get_blocks); |
| 293 | } |
| 294 | |
| 295 | int |
| 296 | toku_thread_pool_get_current_threads(struct toku_thread_pool *pool) { |
| 297 | return pool->cur_threads; |
| 298 | } |
| 299 | |