1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <memory.h>
40
41#include <portability/toku_config.h>
42#include <portability/toku_time.h>
43#include <toku_pthread.h>
44
45#include "kibbutz.h"
46
47// A Kibbutz is a collection of workers and some work to do.
// One queued unit of work: a callback, its argument, and the links that
// place it in the kibbutz's doubly-linked queue.
struct todo {
    void (*f)(void *extra);  // callback a worker thread invokes
    void *extra;             // argument handed to f
    struct todo *next;       // item enqueued just before this one (toward the tail)
    struct todo *prev;       // item enqueued just after this one (toward the head)
};
54
// Argument block handed to each worker thread at creation time; it tells the
// worker which kibbutz it belongs to.
struct kid {
    struct kibbutz *k;  // owning kibbutz
};
58
59struct kibbutz {
60 toku_mutex_t mutex;
61 toku_cond_t cond;
62 bool please_shutdown;
63 struct todo *head, *tail; // head is the next thing to do.
64 int n_workers;
65 pthread_t *workers; // an array of n_workers
66 struct kid *ids; // pass this in when creating a worker so it knows who it is.
67
68 uint64_t threads_active;
69 uint64_t queue_size;
70 uint64_t max_queue_size;
71 uint64_t total_items_processed;
72 uint64_t total_execution_time;
73};
74
75static void *work_on_kibbutz(void *);
76
77toku_instr_key *kibbutz_mutex_key;
78toku_instr_key *kibbutz_k_cond_key;
79toku_instr_key *kibbutz_thread_key;
80
81int toku_kibbutz_create(int n_workers, KIBBUTZ *kb_ret) {
82 int r = 0;
83 *kb_ret = NULL;
84 KIBBUTZ XCALLOC(k);
85 toku_mutex_init(*kibbutz_mutex_key, &k->mutex, nullptr);
86 toku_cond_init(*kibbutz_k_cond_key, &k->cond, nullptr);
87 k->please_shutdown = false;
88 k->head = NULL;
89 k->tail = NULL;
90 k->n_workers = n_workers;
91 k->threads_active = 0;
92 k->queue_size = 0;
93 k->max_queue_size = 0;
94 k->total_items_processed = 0;
95 k->total_execution_time = 0;
96 XMALLOC_N(n_workers, k->workers);
97 XMALLOC_N(n_workers, k->ids);
98 for (int i = 0; i < n_workers; i++) {
99 k->ids[i].k = k;
100 r = toku_pthread_create(*kibbutz_thread_key,
101 &k->workers[i],
102 nullptr,
103 work_on_kibbutz,
104 &k->ids[i]);
105 if (r != 0) {
106 k->n_workers = i;
107 toku_kibbutz_destroy(k);
108 break;
109 }
110 }
111 if (r == 0) {
112 *kb_ret = k;
113 }
114 return r;
115}
116
117static void klock (KIBBUTZ k) {
118 toku_mutex_lock(&k->mutex);
119}
120static void kunlock (KIBBUTZ k) {
121 toku_mutex_unlock(&k->mutex);
122}
123static void kwait (KIBBUTZ k) {
124 toku_cond_wait(&k->cond, &k->mutex);
125}
126static void ksignal (KIBBUTZ k) {
127 toku_cond_signal(&k->cond);
128}
129
130//
131// pops the tail of the kibbutz off the list and works on it
132// Note that in toku_kibbutz_enq, items are enqueued at the head,
133// making the work be done in FIFO order. This is necessary
134// to avoid deadlocks in flusher threads.
135//
136static void *work_on_kibbutz (void *kidv) {
137 struct kid *CAST_FROM_VOIDP(kid, kidv);
138 KIBBUTZ k = kid->k;
139 klock(k);
140 while (1) {
141 while (k->tail) {
142 struct todo *item = k->tail;
143 k->tail = item->prev;
144 toku_sync_sub_and_fetch(&k->queue_size, 1);
145 if (k->tail==NULL) {
146 k->head=NULL;
147 } else {
148 // if there are other things to do, then wake up the next guy, if there is one.
149 ksignal(k);
150 }
151 kunlock(k);
152 toku_sync_add_and_fetch(&k->threads_active, 1);
153 uint64_t starttime = toku_current_time_microsec();
154 item->f(item->extra);
155 uint64_t duration = toku_current_time_microsec() - starttime;
156 toku_sync_add_and_fetch(&k->total_execution_time, duration);
157 toku_sync_add_and_fetch(&k->total_items_processed, 1);
158 toku_sync_sub_and_fetch(&k->threads_active, 1);
159 toku_free(item);
160 klock(k);
161 // if there's another item on k->head, then we'll just go grab it now, without waiting for a signal.
162 }
163 if (k->please_shutdown) {
164 // Don't follow this unless the work is all done, so that when we
165 // set please_shutdown, all the work finishes before any threads
166 // quit.
167 ksignal(k); // must wake up anyone else who is waiting, so they can
168 // shut down.
169 kunlock(k);
170 toku_instr_delete_current_thread();
171 return nullptr;
172 }
173 // There is no work to do and it's not time to shutdown, so wait.
174 kwait(k);
175 }
176}
177
178//
179// adds work to the head of the kibbutz
180// Note that in work_on_kibbutz, items are popped off the tail for work,
181// making the work be done in FIFO order. This is necessary
182// to avoid deadlocks in flusher threads.
183//
184void toku_kibbutz_enq (KIBBUTZ k, void (*f)(void*), void *extra) {
185 struct todo *XMALLOC(td);
186 td->f = f;
187 td->extra = extra;
188 klock(k);
189 assert(!k->please_shutdown);
190 td->next = k->head;
191 td->prev = NULL;
192 if (k->head) {
193 assert(k->head->prev == NULL);
194 k->head->prev = td;
195 }
196 k->head = td;
197 if (k->tail==NULL) k->tail = td;
198
199 uint64_t newsize = toku_sync_add_and_fetch(&k->queue_size, 1);
200 // not exactly precise but we'll live with it
201 if (newsize > k->max_queue_size) k->max_queue_size = k->queue_size;
202
203 ksignal(k);
204 kunlock(k);
205}
206
207void toku_kibbutz_get_status(KIBBUTZ k,
208 uint64_t *num_threads,
209 uint64_t *num_threads_active,
210 uint64_t *queue_size,
211 uint64_t *max_queue_size,
212 uint64_t *total_items_processed,
213 uint64_t *total_execution_time) {
214 *num_threads = k->n_workers;
215 *num_threads_active = k->threads_active;
216 *queue_size = k->queue_size;
217 *max_queue_size = k->max_queue_size;
218 *total_items_processed = k->total_items_processed;
219 *total_execution_time = k->total_execution_time / 1000; // return in ms.
220}
221
222void toku_kibbutz_destroy (KIBBUTZ k)
223// Effect: wait for all the enqueued work to finish, and then destroy the kibbutz.
224// Note: It is an error for to perform kibbutz_enq operations after this is called.
225{
226 klock(k);
227 assert(!k->please_shutdown);
228 k->please_shutdown = true;
229 ksignal(k); // must wake everyone up to tell them to shutdown.
230 kunlock(k);
231 for (int i=0; i<k->n_workers; i++) {
232 void *result;
233 int r = toku_pthread_join(k->workers[i], &result);
234 assert(r==0);
235 assert(result==NULL);
236 }
237 toku_free(k->workers);
238 toku_free(k->ids);
239 toku_cond_destroy(&k->cond);
240 toku_mutex_destroy(&k->mutex);
241 toku_free(k);
242}
243