| 1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
| 2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
| 3 | #ident "$Id$" |
| 4 | /*====== |
| 5 | This file is part of PerconaFT. |
| 6 | |
| 7 | |
| 8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
| 9 | |
| 10 | PerconaFT is free software: you can redistribute it and/or modify |
| 11 | it under the terms of the GNU General Public License, version 2, |
| 12 | as published by the Free Software Foundation. |
| 13 | |
| 14 | PerconaFT is distributed in the hope that it will be useful, |
| 15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 | GNU General Public License for more details. |
| 18 | |
| 19 | You should have received a copy of the GNU General Public License |
| 20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 21 | |
| 22 | ---------------------------------------- |
| 23 | |
| 24 | PerconaFT is free software: you can redistribute it and/or modify |
| 25 | it under the terms of the GNU Affero General Public License, version 3, |
| 26 | as published by the Free Software Foundation. |
| 27 | |
| 28 | PerconaFT is distributed in the hope that it will be useful, |
| 29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 31 | GNU Affero General Public License for more details. |
| 32 | |
| 33 | You should have received a copy of the GNU Affero General Public License |
| 34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 35 | ======= */ |
| 36 | |
| 37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
| 38 | |
| 39 | #include <memory.h> |
| 40 | |
| 41 | #include <portability/toku_config.h> |
| 42 | #include <portability/toku_time.h> |
| 43 | #include <toku_pthread.h> |
| 44 | |
| 45 | #include "kibbutz.h" |
| 46 | |
| 47 | // A Kibbutz is a collection of workers and some work to do. |
// One unit of queued work, kept in a doubly linked list.
struct todo {
    void (*f)(void *);  // callback to run; invoked as f(extra)
    void *extra;        // opaque argument handed to f
    struct todo *next;  // neighbour towards the tail (the item enqueued just before this one)
    struct todo *prev;  // neighbour towards the head (the item enqueued just after this one)
};
| 54 | |
// Per-worker argument record: each worker thread is started with a pointer
// to its own `struct kid` and uses it to find the kibbutz it serves.
struct kid {
    struct kibbutz *k;  // the kibbutz this worker belongs to
};
| 58 | |
| 59 | struct kibbutz { |
| 60 | toku_mutex_t mutex; |
| 61 | toku_cond_t cond; |
| 62 | bool please_shutdown; |
| 63 | struct todo *head, *tail; // head is the next thing to do. |
| 64 | int n_workers; |
| 65 | pthread_t *workers; // an array of n_workers |
| 66 | struct kid *ids; // pass this in when creating a worker so it knows who it is. |
| 67 | |
| 68 | uint64_t threads_active; |
| 69 | uint64_t queue_size; |
| 70 | uint64_t max_queue_size; |
| 71 | uint64_t total_items_processed; |
| 72 | uint64_t total_execution_time; |
| 73 | }; |
| 74 | |
| 75 | static void *work_on_kibbutz(void *); |
| 76 | |
| 77 | toku_instr_key *kibbutz_mutex_key; |
| 78 | toku_instr_key *kibbutz_k_cond_key; |
| 79 | toku_instr_key *kibbutz_thread_key; |
| 80 | |
| 81 | int toku_kibbutz_create(int n_workers, KIBBUTZ *kb_ret) { |
| 82 | int r = 0; |
| 83 | *kb_ret = NULL; |
| 84 | KIBBUTZ XCALLOC(k); |
| 85 | toku_mutex_init(*kibbutz_mutex_key, &k->mutex, nullptr); |
| 86 | toku_cond_init(*kibbutz_k_cond_key, &k->cond, nullptr); |
| 87 | k->please_shutdown = false; |
| 88 | k->head = NULL; |
| 89 | k->tail = NULL; |
| 90 | k->n_workers = n_workers; |
| 91 | k->threads_active = 0; |
| 92 | k->queue_size = 0; |
| 93 | k->max_queue_size = 0; |
| 94 | k->total_items_processed = 0; |
| 95 | k->total_execution_time = 0; |
| 96 | XMALLOC_N(n_workers, k->workers); |
| 97 | XMALLOC_N(n_workers, k->ids); |
| 98 | for (int i = 0; i < n_workers; i++) { |
| 99 | k->ids[i].k = k; |
| 100 | r = toku_pthread_create(*kibbutz_thread_key, |
| 101 | &k->workers[i], |
| 102 | nullptr, |
| 103 | work_on_kibbutz, |
| 104 | &k->ids[i]); |
| 105 | if (r != 0) { |
| 106 | k->n_workers = i; |
| 107 | toku_kibbutz_destroy(k); |
| 108 | break; |
| 109 | } |
| 110 | } |
| 111 | if (r == 0) { |
| 112 | *kb_ret = k; |
| 113 | } |
| 114 | return r; |
| 115 | } |
| 116 | |
| 117 | static void klock (KIBBUTZ k) { |
| 118 | toku_mutex_lock(&k->mutex); |
| 119 | } |
| 120 | static void kunlock (KIBBUTZ k) { |
| 121 | toku_mutex_unlock(&k->mutex); |
| 122 | } |
| 123 | static void kwait (KIBBUTZ k) { |
| 124 | toku_cond_wait(&k->cond, &k->mutex); |
| 125 | } |
| 126 | static void ksignal (KIBBUTZ k) { |
| 127 | toku_cond_signal(&k->cond); |
| 128 | } |
| 129 | |
| 130 | // |
| 131 | // pops the tail of the kibbutz off the list and works on it |
| 132 | // Note that in toku_kibbutz_enq, items are enqueued at the head, |
| 133 | // making the work be done in FIFO order. This is necessary |
| 134 | // to avoid deadlocks in flusher threads. |
| 135 | // |
| 136 | static void *work_on_kibbutz (void *kidv) { |
| 137 | struct kid *CAST_FROM_VOIDP(kid, kidv); |
| 138 | KIBBUTZ k = kid->k; |
| 139 | klock(k); |
| 140 | while (1) { |
| 141 | while (k->tail) { |
| 142 | struct todo *item = k->tail; |
| 143 | k->tail = item->prev; |
| 144 | toku_sync_sub_and_fetch(&k->queue_size, 1); |
| 145 | if (k->tail==NULL) { |
| 146 | k->head=NULL; |
| 147 | } else { |
| 148 | // if there are other things to do, then wake up the next guy, if there is one. |
| 149 | ksignal(k); |
| 150 | } |
| 151 | kunlock(k); |
| 152 | toku_sync_add_and_fetch(&k->threads_active, 1); |
| 153 | uint64_t starttime = toku_current_time_microsec(); |
| 154 | item->f(item->extra); |
| 155 | uint64_t duration = toku_current_time_microsec() - starttime; |
| 156 | toku_sync_add_and_fetch(&k->total_execution_time, duration); |
| 157 | toku_sync_add_and_fetch(&k->total_items_processed, 1); |
| 158 | toku_sync_sub_and_fetch(&k->threads_active, 1); |
| 159 | toku_free(item); |
| 160 | klock(k); |
| 161 | // if there's another item on k->head, then we'll just go grab it now, without waiting for a signal. |
| 162 | } |
| 163 | if (k->please_shutdown) { |
| 164 | // Don't follow this unless the work is all done, so that when we |
| 165 | // set please_shutdown, all the work finishes before any threads |
| 166 | // quit. |
| 167 | ksignal(k); // must wake up anyone else who is waiting, so they can |
| 168 | // shut down. |
| 169 | kunlock(k); |
| 170 | toku_instr_delete_current_thread(); |
| 171 | return nullptr; |
| 172 | } |
| 173 | // There is no work to do and it's not time to shutdown, so wait. |
| 174 | kwait(k); |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | // |
| 179 | // adds work to the head of the kibbutz |
| 180 | // Note that in work_on_kibbutz, items are popped off the tail for work, |
| 181 | // making the work be done in FIFO order. This is necessary |
| 182 | // to avoid deadlocks in flusher threads. |
| 183 | // |
| 184 | void toku_kibbutz_enq (KIBBUTZ k, void (*f)(void*), void *) { |
| 185 | struct todo *XMALLOC(td); |
| 186 | td->f = f; |
| 187 | td->extra = extra; |
| 188 | klock(k); |
| 189 | assert(!k->please_shutdown); |
| 190 | td->next = k->head; |
| 191 | td->prev = NULL; |
| 192 | if (k->head) { |
| 193 | assert(k->head->prev == NULL); |
| 194 | k->head->prev = td; |
| 195 | } |
| 196 | k->head = td; |
| 197 | if (k->tail==NULL) k->tail = td; |
| 198 | |
| 199 | uint64_t newsize = toku_sync_add_and_fetch(&k->queue_size, 1); |
| 200 | // not exactly precise but we'll live with it |
| 201 | if (newsize > k->max_queue_size) k->max_queue_size = k->queue_size; |
| 202 | |
| 203 | ksignal(k); |
| 204 | kunlock(k); |
| 205 | } |
| 206 | |
| 207 | void toku_kibbutz_get_status(KIBBUTZ k, |
| 208 | uint64_t *num_threads, |
| 209 | uint64_t *num_threads_active, |
| 210 | uint64_t *queue_size, |
| 211 | uint64_t *max_queue_size, |
| 212 | uint64_t *total_items_processed, |
| 213 | uint64_t *total_execution_time) { |
| 214 | *num_threads = k->n_workers; |
| 215 | *num_threads_active = k->threads_active; |
| 216 | *queue_size = k->queue_size; |
| 217 | *max_queue_size = k->max_queue_size; |
| 218 | *total_items_processed = k->total_items_processed; |
| 219 | *total_execution_time = k->total_execution_time / 1000; // return in ms. |
| 220 | } |
| 221 | |
| 222 | void toku_kibbutz_destroy (KIBBUTZ k) |
| 223 | // Effect: wait for all the enqueued work to finish, and then destroy the kibbutz. |
| 224 | // Note: It is an error for to perform kibbutz_enq operations after this is called. |
| 225 | { |
| 226 | klock(k); |
| 227 | assert(!k->please_shutdown); |
| 228 | k->please_shutdown = true; |
| 229 | ksignal(k); // must wake everyone up to tell them to shutdown. |
| 230 | kunlock(k); |
| 231 | for (int i=0; i<k->n_workers; i++) { |
| 232 | void *result; |
| 233 | int r = toku_pthread_join(k->workers[i], &result); |
| 234 | assert(r==0); |
| 235 | assert(result==NULL); |
| 236 | } |
| 237 | toku_free(k->workers); |
| 238 | toku_free(k->ids); |
| 239 | toku_cond_destroy(&k->cond); |
| 240 | toku_mutex_destroy(&k->mutex); |
| 241 | toku_free(k); |
| 242 | } |
| 243 | |