1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include <memory.h> |
40 | |
41 | #include <portability/toku_config.h> |
42 | #include <portability/toku_time.h> |
43 | #include <toku_pthread.h> |
44 | |
45 | #include "kibbutz.h" |
46 | |
47 | // A Kibbutz is a collection of workers and some work to do. |
// One queued unit of work: a callback together with the argument to pass it.
// Items are linked into a doubly-linked list hanging off a struct kibbutz.
struct todo {
    void (*f)(void *);  // callback invoked by a worker thread
    void *extra;        // opaque argument handed to f
    struct todo *next;  // neighbor toward the tail (the item enqueued just before this one)
    struct todo *prev;  // neighbor toward the head (the item enqueued just after this one)
};
54 | |
// Per-worker argument block. toku_kibbutz_create passes one of these to each
// thread it starts, and work_on_kibbutz reads it to find its kibbutz.
struct kid {
    struct kibbutz *k;  // the kibbutz this worker belongs to
};
58 | |
59 | struct kibbutz { |
60 | toku_mutex_t mutex; |
61 | toku_cond_t cond; |
62 | bool please_shutdown; |
63 | struct todo *head, *tail; // head is the next thing to do. |
64 | int n_workers; |
65 | pthread_t *workers; // an array of n_workers |
66 | struct kid *ids; // pass this in when creating a worker so it knows who it is. |
67 | |
68 | uint64_t threads_active; |
69 | uint64_t queue_size; |
70 | uint64_t max_queue_size; |
71 | uint64_t total_items_processed; |
72 | uint64_t total_execution_time; |
73 | }; |
74 | |
// Forward declaration: the worker thread entry point is defined further down
// but is referenced by toku_kibbutz_create.
static void *work_on_kibbutz(void *);

// Instrumentation keys that toku_kibbutz_create dereferences when it sets up
// the mutex, the condition variable, and the worker threads. Nothing in this
// file assigns them; presumably they are initialized elsewhere before the
// first kibbutz is created.
toku_instr_key *kibbutz_mutex_key;
toku_instr_key *kibbutz_k_cond_key;
toku_instr_key *kibbutz_thread_key;
80 | |
81 | int toku_kibbutz_create(int n_workers, KIBBUTZ *kb_ret) { |
82 | int r = 0; |
83 | *kb_ret = NULL; |
84 | KIBBUTZ XCALLOC(k); |
85 | toku_mutex_init(*kibbutz_mutex_key, &k->mutex, nullptr); |
86 | toku_cond_init(*kibbutz_k_cond_key, &k->cond, nullptr); |
87 | k->please_shutdown = false; |
88 | k->head = NULL; |
89 | k->tail = NULL; |
90 | k->n_workers = n_workers; |
91 | k->threads_active = 0; |
92 | k->queue_size = 0; |
93 | k->max_queue_size = 0; |
94 | k->total_items_processed = 0; |
95 | k->total_execution_time = 0; |
96 | XMALLOC_N(n_workers, k->workers); |
97 | XMALLOC_N(n_workers, k->ids); |
98 | for (int i = 0; i < n_workers; i++) { |
99 | k->ids[i].k = k; |
100 | r = toku_pthread_create(*kibbutz_thread_key, |
101 | &k->workers[i], |
102 | nullptr, |
103 | work_on_kibbutz, |
104 | &k->ids[i]); |
105 | if (r != 0) { |
106 | k->n_workers = i; |
107 | toku_kibbutz_destroy(k); |
108 | break; |
109 | } |
110 | } |
111 | if (r == 0) { |
112 | *kb_ret = k; |
113 | } |
114 | return r; |
115 | } |
116 | |
// Acquires the kibbutz mutex.
static void klock (KIBBUTZ k) {
    toku_mutex_lock(&k->mutex);
}
// Releases the kibbutz mutex.
static void kunlock (KIBBUTZ k) {
    toku_mutex_unlock(&k->mutex);
}
// Waits on the kibbutz condition variable, passing it the kibbutz mutex.
// The only caller, work_on_kibbutz, holds that mutex when it calls this.
static void kwait (KIBBUTZ k) {
    toku_cond_wait(&k->cond, &k->mutex);
}
// Signals the kibbutz condition variable.
static void ksignal (KIBBUTZ k) {
    toku_cond_signal(&k->cond);
}
129 | |
130 | // |
131 | // pops the tail of the kibbutz off the list and works on it |
132 | // Note that in toku_kibbutz_enq, items are enqueued at the head, |
133 | // making the work be done in FIFO order. This is necessary |
134 | // to avoid deadlocks in flusher threads. |
135 | // |
136 | static void *work_on_kibbutz (void *kidv) { |
137 | struct kid *CAST_FROM_VOIDP(kid, kidv); |
138 | KIBBUTZ k = kid->k; |
139 | klock(k); |
140 | while (1) { |
141 | while (k->tail) { |
142 | struct todo *item = k->tail; |
143 | k->tail = item->prev; |
144 | toku_sync_sub_and_fetch(&k->queue_size, 1); |
145 | if (k->tail==NULL) { |
146 | k->head=NULL; |
147 | } else { |
148 | // if there are other things to do, then wake up the next guy, if there is one. |
149 | ksignal(k); |
150 | } |
151 | kunlock(k); |
152 | toku_sync_add_and_fetch(&k->threads_active, 1); |
153 | uint64_t starttime = toku_current_time_microsec(); |
154 | item->f(item->extra); |
155 | uint64_t duration = toku_current_time_microsec() - starttime; |
156 | toku_sync_add_and_fetch(&k->total_execution_time, duration); |
157 | toku_sync_add_and_fetch(&k->total_items_processed, 1); |
158 | toku_sync_sub_and_fetch(&k->threads_active, 1); |
159 | toku_free(item); |
160 | klock(k); |
161 | // if there's another item on k->head, then we'll just go grab it now, without waiting for a signal. |
162 | } |
163 | if (k->please_shutdown) { |
164 | // Don't follow this unless the work is all done, so that when we |
165 | // set please_shutdown, all the work finishes before any threads |
166 | // quit. |
167 | ksignal(k); // must wake up anyone else who is waiting, so they can |
168 | // shut down. |
169 | kunlock(k); |
170 | toku_instr_delete_current_thread(); |
171 | return nullptr; |
172 | } |
173 | // There is no work to do and it's not time to shutdown, so wait. |
174 | kwait(k); |
175 | } |
176 | } |
177 | |
178 | // |
179 | // adds work to the head of the kibbutz |
180 | // Note that in work_on_kibbutz, items are popped off the tail for work, |
181 | // making the work be done in FIFO order. This is necessary |
182 | // to avoid deadlocks in flusher threads. |
183 | // |
184 | void toku_kibbutz_enq (KIBBUTZ k, void (*f)(void*), void *) { |
185 | struct todo *XMALLOC(td); |
186 | td->f = f; |
187 | td->extra = extra; |
188 | klock(k); |
189 | assert(!k->please_shutdown); |
190 | td->next = k->head; |
191 | td->prev = NULL; |
192 | if (k->head) { |
193 | assert(k->head->prev == NULL); |
194 | k->head->prev = td; |
195 | } |
196 | k->head = td; |
197 | if (k->tail==NULL) k->tail = td; |
198 | |
199 | uint64_t newsize = toku_sync_add_and_fetch(&k->queue_size, 1); |
200 | // not exactly precise but we'll live with it |
201 | if (newsize > k->max_queue_size) k->max_queue_size = k->queue_size; |
202 | |
203 | ksignal(k); |
204 | kunlock(k); |
205 | } |
206 | |
207 | void toku_kibbutz_get_status(KIBBUTZ k, |
208 | uint64_t *num_threads, |
209 | uint64_t *num_threads_active, |
210 | uint64_t *queue_size, |
211 | uint64_t *max_queue_size, |
212 | uint64_t *total_items_processed, |
213 | uint64_t *total_execution_time) { |
214 | *num_threads = k->n_workers; |
215 | *num_threads_active = k->threads_active; |
216 | *queue_size = k->queue_size; |
217 | *max_queue_size = k->max_queue_size; |
218 | *total_items_processed = k->total_items_processed; |
219 | *total_execution_time = k->total_execution_time / 1000; // return in ms. |
220 | } |
221 | |
222 | void toku_kibbutz_destroy (KIBBUTZ k) |
223 | // Effect: wait for all the enqueued work to finish, and then destroy the kibbutz. |
224 | // Note: It is an error for to perform kibbutz_enq operations after this is called. |
225 | { |
226 | klock(k); |
227 | assert(!k->please_shutdown); |
228 | k->please_shutdown = true; |
229 | ksignal(k); // must wake everyone up to tell them to shutdown. |
230 | kunlock(k); |
231 | for (int i=0; i<k->n_workers; i++) { |
232 | void *result; |
233 | int r = toku_pthread_join(k->workers[i], &result); |
234 | assert(r==0); |
235 | assert(result==NULL); |
236 | } |
237 | toku_free(k->workers); |
238 | toku_free(k->ids); |
239 | toku_cond_destroy(&k->cond); |
240 | toku_mutex_destroy(&k->mutex); |
241 | toku_free(k); |
242 | } |
243 | |