1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include <toku_race_tools.h> |
40 | #include <sys/types.h> |
41 | #include <pthread.h> |
42 | |
43 | #include "memory.h" |
44 | #include "partitioned_counter.h" |
45 | #include "doubly_linked_list.h" |
46 | #include "growable_array.h" |
47 | #include <portability/toku_atomic.h> |
48 | |
49 | #ifdef __APPLE__ |
50 | // TODO(leif): The __thread declspec is broken in ways I don't understand |
51 | // on Darwin. Partitioned counters use them and it would be prohibitive |
52 | // to tease them apart before a week after 6.5.0, so instead, we're just |
53 | // not going to use them in the most brutal way possible. This is a |
54 | // terrible implementation of the API in partitioned_counter.h but it |
55 | // should be correct enough to release a non-performant version on OSX for |
56 | // development. Soon, we need to either make portable partitioned |
57 | // counters, or we need to do this disabling in a portable way. |
58 | |
// Degenerate OSX representation: one shared 64-bit value, updated with an
// atomic fetch-and-add instead of thread-local parts.
struct partitioned_counter {
    uint64_t v;
};
62 | |
PARTITIONED_COUNTER create_partitioned_counter(void) {
    // XCALLOC declares `counter` and zero-allocates it, so the counter starts at 0.
    PARTITIONED_COUNTER XCALLOC(counter);
    return counter;
}
67 | |
68 | void destroy_partitioned_counter(PARTITIONED_COUNTER counter) { |
69 | toku_free(counter); |
70 | } |
71 | |
72 | void increment_partitioned_counter(PARTITIONED_COUNTER counter, uint64_t delta) { |
73 | (void) toku_sync_fetch_and_add(&counter->v, delta); |
74 | } |
75 | |
uint64_t read_partitioned_counter(PARTITIONED_COUNTER counter) {
    // Plain load of a value that other threads update with atomic
    // fetch-and-add.  NOTE(review): this relies on aligned 64-bit loads being
    // atomic on the target -- confirm, or read via an atomic primitive.
    return counter->v;
}
79 | |
// The degenerate implementation keeps no global state, so init/destroy are no-ops.
void partitioned_counters_init(void) {}
void partitioned_counters_destroy(void) {}
82 | |
83 | #else // __APPLE__ |
84 | |
85 | //****************************************************************************** |
86 | // |
87 | // Representation: The representation of a partitioned counter comprises a |
88 | // sum, called sum_of_dead; an index, called the ckey, which indexes into a |
89 | // thread-local array to find a thread-local part of the counter; and a |
90 | // linked list of thread-local parts. |
91 | // |
92 | // There is also a linked list, for each thread that has a thread-local part |
93 | // of any counter, of all the thread-local parts of all the counters. |
94 | // |
95 | // There is a pthread_key which gives us a hook to clean up thread-local |
96 | // state when a thread terminates. For each thread-local part of a counter |
97 | // that the thread has, we add in the thread-local sum into the sum_of_dead. |
98 | // |
99 | // Finally there is a list of all the thread-local arrays so that when we |
100 | // destroy the partitioned counter before the threads are done, we can find |
101 | // and destroy the thread_local_arrays before destroying the pthread_key. |
102 | // |
103 | // Abstraction function: The sum is represented by the sum of _sum and the |
104 | // sum's of the thread-local parts of the counter. |
105 | // |
106 | // Representation invariant: Every thread-local part is in the linked list of |
107 | // the thread-local parts of its counter, as well as in the linked list of |
//   the counters of the thread.
109 | // |
110 | //****************************************************************************** |
111 | |
112 | //****************************************************************************** |
113 | // The mutex for the PARTITIONED_COUNTER |
114 | // We have a single mutex for all the counters because |
115 | // (a) the mutex is obtained infrequently, and |
116 | // (b) it helps us avoid race conditions when destroying the counters. |
117 | // The alternative that I couldn't make work is to have a mutex per counter. |
118 | // But the problem is that the counter can be destroyed before threads |
119 | // terminate, or maybe a thread terminates before the counter is destroyed. |
120 | // If the counter is destroyed first, then the mutex is no longer available. |
121 | //****************************************************************************** |
122 | |
123 | using namespace toku; |
124 | |
125 | static pthread_mutex_t partitioned_counter_mutex = PTHREAD_MUTEX_INITIALIZER; |
126 | |
127 | static void pc_lock (void) |
128 | // Effect: Lock the mutex. |
129 | { |
130 | int r = pthread_mutex_lock(&partitioned_counter_mutex); |
131 | assert(r==0); |
132 | } |
133 | |
134 | static void pc_unlock (void) |
135 | // Effect: Unlock the mutex. |
136 | { |
137 | int r = pthread_mutex_unlock(&partitioned_counter_mutex); |
138 | assert(r==0); |
139 | } |
140 | |
141 | //****************************************************************************** |
142 | // Key creation primitives |
143 | //****************************************************************************** |
static void pk_create (pthread_key_t *key, void (*destructor)(void*)) {
    // Create a pthread key whose destructor runs at thread exit; abort on failure.
    int err = pthread_key_create(key, destructor);
    assert(err == 0);
}
148 | |
static void pk_delete (pthread_key_t key) {
    // Dispose of a pthread key (its destructors no longer run); abort on failure.
    int err = pthread_key_delete(key);
    assert(err == 0);
}
153 | |
static void pk_setspecific (pthread_key_t key, const void *value) {
    // Bind value to key for the calling thread; abort on failure.
    int err = pthread_setspecific(key, value);
    assert(err == 0);
}
158 | |
159 | //****************************************************************************** |
160 | // The counter itself. |
// The thread local part of a counter, comprising the thread-local sum, a
// pointer to the partitioned_counter, a pointer to the thread_local list head,
// and two linked lists.  One of the lists is all the thread-local parts that
// belong to the same counter, and the other is all the thread-local parts that
// belong to the same thread.
166 | //****************************************************************************** |
167 | |
168 | struct local_counter; |
169 | |
// The shared part of one counter: the sum contributed by threads that have
// exited, the counter's slot index in every thread's local array, and the
// list of live thread-local parts.
struct partitioned_counter {
    uint64_t sum_of_dead; // The sum of all thread-local counts from threads that have terminated.
    uint64_t pc_key; // A unique integer among all counters that have been created but not yet destroyed.
    DoublyLinkedList<struct local_counter *> ll_counter_head; // A linked list of all the thread-local information for this counter.
};
175 | |
// One thread's part of one counter.
struct local_counter {
    uint64_t sum; // The thread-local sum.
    PARTITIONED_COUNTER owner_pc; // The partitioned counter that this is part of.
    GrowableArray<struct local_counter *> *thread_local_array; // The thread local array for this thread holds this local_counter at offset owner_pc->pc_key.
    LinkedListElement<struct local_counter *> ll_in_counter; // Element for the doubly-linked list of thread-local information for this PARTITIONED_COUNTER.
};
182 | |
// Try to get it into one cache line by aligning it.
// NOTE(review): no explicit alignment attribute is actually applied below --
// confirm whether an alignment specifier was intended here.
static __thread GrowableArray<struct local_counter *> thread_local_array; // This thread's local parts, indexed by pc_key.
static __thread bool thread_local_array_inited = false; // True once this thread has registered its array and exit destructor.

static DoublyLinkedList<GrowableArray<struct local_counter *> *> all_thread_local_arrays; // Every thread's array, so global destroy can reach them all.
static __thread LinkedListElement<GrowableArray<struct local_counter *> *> thread_local_ll_elt; // This thread's element on the global list above.
189 | |
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me);
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me __attribute__((__unused__)))
// Effect: This function is called whenever a thread terminates using the
// destructor of the thread_destructor_key (defined below).  First grab the
// lock, then go through all the partitioned counters and removes the part that
// is local to this thread.  We don't actually need the contents of the
// thread_destructor_key except to cause this function to run.  The content of
// the key is a static string, so don't try to free it.
{
    pc_lock();
    for (size_t i=0; i<thread_local_array.get_size(); i++) {
        struct local_counter *lc = thread_local_array.fetch_unchecked(i);
        if (lc==NULL) continue; // slot i belongs to a counter this thread never used (or one already destroyed)
        PARTITIONED_COUNTER owner = lc->owner_pc;
        owner->sum_of_dead += lc->sum; // merge this thread's contribution into the counter's persistent sum
        owner->ll_counter_head.remove(&lc->ll_in_counter); // unlink from the counter's list of thread-local parts
        toku_free(lc);
    }
    all_thread_local_arrays.remove(&thread_local_ll_elt); // this thread's array is going away; unpublish it
    thread_local_array_inited = false;
    thread_local_array.deinit();
    pc_unlock();
}
213 | |
214 | //****************************************************************************** |
215 | // We employ a system-wide pthread_key simply to get a notification when a |
216 | // thread terminates. The key will simply contain a constant string (it's "dont |
// care", but it doesn't matter what it is, as long as it's not NULL).  We need
// a constructor function to set up the pthread_key.  We used a constructor
// function instead of a C++ constructor because that's what we are used to,
220 | // rather than because it's necessarily better. Whenever a thread tries to |
221 | // increment a partitioned_counter for the first time, it sets the |
222 | // pthread_setspecific for the thread_destructor_key. It's OK if the key gets |
223 | // setspecific multiple times, it's always the same value. When a thread (that |
224 | // has created a thread-local part of any partitioned counter) terminates, the |
225 | // destroy_thread_local_part_of_partitioned_counters will run. It may run |
226 | // before or after other pthread_key destructors, but the thread-local |
227 | // ll_thread_head variable is still present until the thread is completely done |
228 | // running. |
229 | //****************************************************************************** |
230 | |
231 | static pthread_key_t thread_destructor_key; |
232 | |
233 | //****************************************************************************** |
234 | // We don't like using up pthread_keys (macos provides only 128 of them), |
235 | // so we built our own. Also, looking at the source code for linux libc, |
236 | // it looks like pthread_keys get slower if there are a lot of them. |
237 | // So we use only one pthread_key. |
238 | //****************************************************************************** |
239 | |
240 | GrowableArray<bool> counters_in_use; |
241 | |
242 | static uint64_t allocate_counter (void) |
243 | // Effect: Find an unused counter number, and allocate it, returning the counter number. |
244 | // Grabs the pc_lock. |
245 | { |
246 | uint64_t ret; |
247 | pc_lock(); |
248 | size_t size = counters_in_use.get_size(); |
249 | for (uint64_t i=0; i<size; i++) { |
250 | if (!counters_in_use.fetch_unchecked(i)) { |
251 | counters_in_use.store_unchecked(i, true); |
252 | ret = i; |
253 | goto unlock; |
254 | } |
255 | } |
256 | counters_in_use.push(true); |
257 | ret = size; |
258 | unlock: |
259 | pc_unlock(); |
260 | return ret; |
261 | } |
262 | |
263 | |
static void free_counter(uint64_t counternum)
// Effect: Free a counter.
// Requires: The pc mutex is held before calling.
{
    assert(counternum < counters_in_use.get_size()); // must have been allocated
    assert(counters_in_use.fetch_unchecked(counternum)); // must not be double-freed
    counters_in_use.store_unchecked(counternum, false); // mark the slot reusable
}
272 | |
static void destroy_counters (void) {
    // Release the storage behind the counter-slot bitmap.  Called from
    // partitioned_counters_destroy() with the pc lock held.
    counters_in_use.deinit();
}
276 | |
277 | |
278 | //****************************************************************************** |
279 | // Now for the code that actually creates a counter. |
280 | //****************************************************************************** |
281 | |
282 | PARTITIONED_COUNTER create_partitioned_counter(void) |
283 | // Effect: Create a counter, initialized to zero. |
284 | { |
285 | PARTITIONED_COUNTER XMALLOC(result); |
286 | result->sum_of_dead = 0; |
287 | result->pc_key = allocate_counter(); |
288 | result->ll_counter_head.init(); |
289 | return result; |
290 | } |
291 | |
void destroy_partitioned_counter(PARTITIONED_COUNTER pc)
// Effect: Destroy the counter.  No operations on this counter are permitted after.
// Implementation note: Since we have a global lock, we can destroy all the thread-local
// versions as well.
{
    pc_lock();
    uint64_t pc_key = pc->pc_key; // save the key: pc is freed below but the key is needed afterwards
    LinkedListElement<struct local_counter *> *first;
    while (pc->ll_counter_head.pop(&first)) {
        // We just removed first from the counter list, now we must remove it from the thread-local array.
        struct local_counter *lc = first->get_container();
        assert(pc == lc->owner_pc);
        GrowableArray<struct local_counter *> *tla = lc->thread_local_array;
        tla->store_unchecked(pc_key, NULL); // NULL the owning thread's slot so that thread never touches the freed lc
        toku_free(lc);
    }
    toku_free(pc);
    free_counter(pc_key); // requires the pc lock, which we hold
    pc_unlock();
}
312 | |
313 | static inline struct local_counter *get_thread_local_counter(uint64_t pc_key, GrowableArray<struct local_counter *> *a) |
314 | { |
315 | if (pc_key >= a->get_size()) { |
316 | return NULL; |
317 | } else { |
318 | return a->fetch_unchecked(pc_key); |
319 | } |
320 | } |
321 | |
322 | static struct local_counter *get_or_alloc_thread_local_counter(PARTITIONED_COUNTER pc) |
323 | { |
324 | // Only this thread is allowed to modify thread_local_array, except for setting tla->array[pc_key] to NULL |
325 | // when a counter is destroyed (and in that case there should be no race because no other thread should be |
326 | // trying to access the same local counter at the same time. |
327 | uint64_t pc_key = pc->pc_key; |
328 | struct local_counter *lc = get_thread_local_counter(pc->pc_key, &thread_local_array); |
329 | if (lc == NULL) { |
330 | XMALLOC(lc); // Might as well do the malloc without holding the pc lock. But most of the rest of this work needs the lock. |
331 | pc_lock(); |
332 | |
333 | // Set things up so that this thread terminates, the thread-local parts of the counter will be destroyed and merged into their respective counters. |
334 | if (!thread_local_array_inited) { |
335 | pk_setspecific(thread_destructor_key, "dont care" ); |
336 | thread_local_array_inited=true; |
337 | thread_local_array.init(); |
338 | all_thread_local_arrays.insert(&thread_local_ll_elt, &thread_local_array); |
339 | } |
340 | |
341 | lc->sum = 0; |
342 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&lc->sum, sizeof(lc->sum)); // the counter increment is kind of racy. |
343 | lc->owner_pc = pc; |
344 | lc->thread_local_array = &thread_local_array; |
345 | |
346 | // Grow the array if needed, filling in NULLs |
347 | while (thread_local_array.get_size() <= pc_key) { |
348 | thread_local_array.push(NULL); |
349 | } |
350 | thread_local_array.store_unchecked(pc_key, lc); |
351 | pc->ll_counter_head.insert(&lc->ll_in_counter, lc); |
352 | pc_unlock(); |
353 | } |
354 | return lc; |
355 | } |
356 | |
357 | void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount) |
358 | // Effect: Increment the counter by amount. |
359 | // Requires: No overflows. This is a 64-bit unsigned counter. |
360 | { |
361 | struct local_counter *lc = get_or_alloc_thread_local_counter(pc); |
362 | lc->sum += amount; |
363 | } |
364 | |
365 | static int sumit(struct local_counter *lc, uint64_t *sum) { |
366 | (*sum)+=lc->sum; |
367 | return 0; |
368 | } |
369 | |
370 | uint64_t read_partitioned_counter(PARTITIONED_COUNTER pc) |
371 | // Effect: Return the current value of the counter. |
372 | // Implementation note: Sum all the thread-local counts along with the sum_of_the_dead. |
373 | { |
374 | pc_lock(); |
375 | uint64_t sum = pc->sum_of_dead; |
376 | int r = pc->ll_counter_head.iterate<uint64_t *>(sumit, &sum); |
377 | assert(r==0); |
378 | pc_unlock(); |
379 | return sum; |
380 | } |
381 | |
void partitioned_counters_init(void)
// Effect: Initialize any partitioned counters data structures that must be set up before any partitioned counters run.
{
    // The key's destructor runs at each thread's exit and folds that thread's
    // local sums back into their counters.
    pk_create(&thread_destructor_key, destroy_thread_local_part_of_partitioned_counters);
    all_thread_local_arrays.init();
}
388 | |
void partitioned_counters_destroy(void)
// Effect: Destroy any partitioned counters data structures.
{
    pc_lock();
    LinkedListElement<GrowableArray<struct local_counter *> *> *a_ll;
    // Tear down every thread's local array now, before deleting the pthread
    // key whose per-thread destructor would otherwise reach them.
    while (all_thread_local_arrays.pop(&a_ll)) {
        a_ll->get_container()->deinit();
    }

    pk_delete(thread_destructor_key);
    destroy_counters();
    pc_unlock();
}
402 | |
403 | #endif // __APPLE__ |
404 | |