/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <toku_race_tools.h>
#include <sys/types.h>
#include <pthread.h>

#include "memory.h"
#include "partitioned_counter.h"
#include "doubly_linked_list.h"
#include "growable_array.h"
#include <portability/toku_atomic.h>

#ifdef __APPLE__
// TODO(leif): The __thread declspec is broken in ways I don't understand
// on Darwin. Partitioned counters use them and it would be prohibitive
// to tease them apart before a week after 6.5.0, so instead, we're just
// not going to use them in the most brutal way possible. This is a
// terrible implementation of the API in partitioned_counter.h but it
// should be correct enough to release a non-performant version on OSX for
// development. Soon, we need to either make portable partitioned
// counters, or we need to do this disabling in a portable way.

struct partitioned_counter {
    uint64_t v;
};

PARTITIONED_COUNTER create_partitioned_counter(void) {
    PARTITIONED_COUNTER XCALLOC(counter);
    return counter;
}

void destroy_partitioned_counter(PARTITIONED_COUNTER counter) {
    toku_free(counter);
}

void increment_partitioned_counter(PARTITIONED_COUNTER counter, uint64_t delta) {
    (void) toku_sync_fetch_and_add(&counter->v, delta);
}

uint64_t read_partitioned_counter(PARTITIONED_COUNTER counter) {
    return counter->v;
}

void partitioned_counters_init(void) {}
void partitioned_counters_destroy(void) {}

#else // __APPLE__

//******************************************************************************
//
// Representation: The representation of a partitioned counter comprises a
// sum, called sum_of_dead; an index, called the pc_key, which indexes into a
// thread-local array to find a thread-local part of the counter; and a
// linked list of thread-local parts.
//
// There is also, for each thread that has a thread-local part of any counter,
// a thread-local array holding all of that thread's parts, indexed by pc_key.
//
// There is a pthread_key which gives us a hook to clean up thread-local
// state when a thread terminates. For each thread-local part of a counter
// that the thread has, we add the thread-local sum into the sum_of_dead.
//
// Finally there is a list of all the thread-local arrays so that when we
// destroy the partitioned counter subsystem before the threads are done, we can
// find and destroy the thread-local arrays before destroying the pthread_key.
//
// Abstraction function: The value of the counter is sum_of_dead plus the
// sums of the thread-local parts of the counter.
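//
// For example, if two live threads hold thread-local sums of 3 and 4 for a
// counter whose sum_of_dead is 5 (from threads that have already terminated),
// then read_partitioned_counter() returns 3 + 4 + 5 = 12.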
//
// Representation invariant: Every thread-local part is in the linked list of
// the thread-local parts of its counter, as well as in the thread-local array
// of the thread that owns it.
//
//******************************************************************************

//******************************************************************************
// The mutex for the PARTITIONED_COUNTER
// We have a single mutex for all the counters because
// (a) the mutex is obtained infrequently, and
// (b) it helps us avoid race conditions when destroying the counters.
// The alternative that I couldn't make work is to have a mutex per counter.
// But the problem is that the counter can be destroyed before threads
// terminate, or maybe a thread terminates before the counter is destroyed.
// If the counter is destroyed first, then the mutex is no longer available.
//******************************************************************************

using namespace toku;

static pthread_mutex_t partitioned_counter_mutex = PTHREAD_MUTEX_INITIALIZER;

static void pc_lock (void)
// Effect: Lock the mutex.
{
    int r = pthread_mutex_lock(&partitioned_counter_mutex);
    assert(r==0);
}

static void pc_unlock (void)
// Effect: Unlock the mutex.
{
    int r = pthread_mutex_unlock(&partitioned_counter_mutex);
    assert(r==0);
}

//******************************************************************************
// Key creation primitives
//******************************************************************************
static void pk_create (pthread_key_t *key, void (*destructor)(void*)) {
    int r = pthread_key_create(key, destructor);
    assert(r==0);
}

static void pk_delete (pthread_key_t key) {
    int r = pthread_key_delete(key);
    assert(r==0);
}

static void pk_setspecific (pthread_key_t key, const void *value) {
    int r = pthread_setspecific(key, value);
    assert(r==0);
}

//******************************************************************************
// The counter itself, and the thread-local part of a counter.
// The thread-local part comprises the thread-local sum, a pointer to the owning
// partitioned_counter, a pointer to this thread's thread-local array (which
// holds this part at offset owner_pc->pc_key), and a linked-list element that
// chains this part into the counter's list of thread-local parts.
//******************************************************************************

struct local_counter;

struct partitioned_counter {
    uint64_t sum_of_dead; // The sum of all thread-local counts from threads that have terminated.
    uint64_t pc_key; // A unique integer among all counters that have been created but not yet destroyed.
    DoublyLinkedList<struct local_counter *> ll_counter_head; // A linked list of all the thread-local information for this counter.
};

struct local_counter {
    uint64_t sum; // The thread-local sum.
    PARTITIONED_COUNTER owner_pc; // The partitioned counter that this is part of.
    GrowableArray<struct local_counter *> *thread_local_array; // The thread local array for this thread holds this local_counter at offset owner_pc->pc_key.
    LinkedListElement<struct local_counter *> ll_in_counter; // Element for the doubly-linked list of thread-local information for this PARTITIONED_COUNTER.
};

// Try to get it into one cache line by aligning it.
static __thread GrowableArray<struct local_counter *> thread_local_array;
static __thread bool thread_local_array_inited = false;

static DoublyLinkedList<GrowableArray<struct local_counter *> *> all_thread_local_arrays;
static __thread LinkedListElement<GrowableArray<struct local_counter *> *> thread_local_ll_elt;

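// For a counter whose pc_key is, say, 7, this thread's local part (if any)
// lives at thread_local_array.fetch_unchecked(7); get_thread_local_counter()
// below returns NULL when the array is not yet that large or the slot is NULL.
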
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me);
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me __attribute__((__unused__)))
// Effect: This function is called, via the destructor of the
// thread_destructor_key (defined below), whenever a thread terminates. First
// grab the lock, then go through this thread's thread-local parts and fold each
// one into its counter's sum_of_dead. We don't actually need the contents of
// the thread_destructor_key except to cause this function to run. The content
// of the key is a static string, so don't try to free it.
{
    pc_lock();
    for (size_t i=0; i<thread_local_array.get_size(); i++) {
        struct local_counter *lc = thread_local_array.fetch_unchecked(i);
        if (lc==NULL) continue;
        PARTITIONED_COUNTER owner = lc->owner_pc;
        owner->sum_of_dead += lc->sum;
        owner->ll_counter_head.remove(&lc->ll_in_counter);
        toku_free(lc);
    }
    all_thread_local_arrays.remove(&thread_local_ll_elt);
    thread_local_array_inited = false;
    thread_local_array.deinit();
    pc_unlock();
}

//******************************************************************************
// We employ a system-wide pthread_key simply to get a notification when a
// thread terminates. The key simply contains a constant string (it's "dont
// care", but it doesn't matter what it is, as long as it's not NULL). We need
// a constructor function to set up the pthread_key. We used a constructor
// function instead of a C++ constructor because that's what we are used to,
// rather than because it's necessarily better. Whenever a thread tries to
// increment a partitioned_counter for the first time, it calls
// pthread_setspecific for the thread_destructor_key. It's OK if the key gets
// set multiple times; it's always the same value. When a thread (that has
// created a thread-local part of any partitioned counter) terminates,
// destroy_thread_local_part_of_partitioned_counters will run. It may run
// before or after other pthread_key destructors, but the thread-local state
// (thread_local_array) is still present until the thread is completely done
// running.
//******************************************************************************

static pthread_key_t thread_destructor_key;

//******************************************************************************
// We don't like using up pthread_keys (macos provides only 128 of them),
// so we built our own. Also, looking at the source code for linux libc,
// it looks like pthread_keys get slower if there are a lot of them.
// So we use only one pthread_key.
//******************************************************************************

GrowableArray<bool> counters_in_use; // Entry k is true iff pc_key k is currently allocated.

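// For example, if counters with pc_key values 0..3 are live and the counter with
// pc_key 1 is destroyed, free_counter(1) clears slot 1 of counters_in_use, and the
// next create_partitioned_counter() gets pc_key 1 back from allocate_counter()
// rather than growing the array.
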
static uint64_t allocate_counter (void)
// Effect: Find an unused counter number, and allocate it, returning the counter number.
// Grabs the pc_lock.
{
    uint64_t ret;
    pc_lock();
    size_t size = counters_in_use.get_size();
    for (uint64_t i=0; i<size; i++) {
        if (!counters_in_use.fetch_unchecked(i)) {
            counters_in_use.store_unchecked(i, true);
            ret = i;
            goto unlock;
        }
    }
    counters_in_use.push(true);
    ret = size;
unlock:
    pc_unlock();
    return ret;
}


static void free_counter(uint64_t counternum)
// Effect: Free a counter.
// Requires: The pc mutex is held before calling.
{
    assert(counternum < counters_in_use.get_size());
    assert(counters_in_use.fetch_unchecked(counternum));
    counters_in_use.store_unchecked(counternum, false);
}

static void destroy_counters (void) {
    counters_in_use.deinit();
}


//******************************************************************************
// Now for the code that actually creates a counter.
//******************************************************************************

PARTITIONED_COUNTER create_partitioned_counter(void)
// Effect: Create a counter, initialized to zero.
{
    PARTITIONED_COUNTER XMALLOC(result);
    result->sum_of_dead = 0;
    result->pc_key = allocate_counter();
    result->ll_counter_head.init();
    return result;
}
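
// Usage sketch (illustrative; see partitioned_counter.h for the API contract):
//
//     partitioned_counters_init();                     // once, before any counters are used
//     PARTITIONED_COUNTER pc = create_partitioned_counter();
//     increment_partitioned_counter(pc, 1);            // cheap: bumps this thread's local sum
//     uint64_t total = read_partitioned_counter(pc);   // sum_of_dead + all live thread-local sums
//     destroy_partitioned_counter(pc);
//     partitioned_counters_destroy();                  // once, at shutdown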

void destroy_partitioned_counter(PARTITIONED_COUNTER pc)
// Effect: Destroy the counter. No operations on this counter are permitted after.
// Implementation note: Since we have a global lock, we can destroy all the thread-local
// versions as well.
{
    pc_lock();
    uint64_t pc_key = pc->pc_key;
    LinkedListElement<struct local_counter *> *first;
    while (pc->ll_counter_head.pop(&first)) {
        // We just removed first from the counter list; now we must remove it from the thread-local array.
        struct local_counter *lc = first->get_container();
        assert(pc == lc->owner_pc);
        GrowableArray<struct local_counter *> *tla = lc->thread_local_array;
        tla->store_unchecked(pc_key, NULL);
        toku_free(lc);
    }
    toku_free(pc);
    free_counter(pc_key);
    pc_unlock();
}

static inline struct local_counter *get_thread_local_counter(uint64_t pc_key, GrowableArray<struct local_counter *> *a)
{
    if (pc_key >= a->get_size()) {
        return NULL;
    } else {
        return a->fetch_unchecked(pc_key);
    }
}

static struct local_counter *get_or_alloc_thread_local_counter(PARTITIONED_COUNTER pc)
{
    // Only this thread is allowed to modify thread_local_array, except for setting the entry at pc_key to NULL
    // when a counter is destroyed (and in that case there should be no race, because no other thread should be
    // trying to access the same local counter at the same time).
    uint64_t pc_key = pc->pc_key;
    struct local_counter *lc = get_thread_local_counter(pc->pc_key, &thread_local_array);
    if (lc == NULL) {
        XMALLOC(lc); // Might as well do the malloc without holding the pc lock. But most of the rest of this work needs the lock.
        pc_lock();

        // Set things up so that when this thread terminates, the thread-local parts of the counter
        // will be destroyed and merged into their respective counters.
        if (!thread_local_array_inited) {
            pk_setspecific(thread_destructor_key, "dont care");
            thread_local_array_inited = true;
            thread_local_array.init();
            all_thread_local_arrays.insert(&thread_local_ll_elt, &thread_local_array);
        }

        lc->sum = 0;
        TOKU_VALGRIND_HG_DISABLE_CHECKING(&lc->sum, sizeof(lc->sum)); // the counter increment is kind of racy.
        lc->owner_pc = pc;
        lc->thread_local_array = &thread_local_array;

        // Grow the array if needed, filling in NULLs
        while (thread_local_array.get_size() <= pc_key) {
            thread_local_array.push(NULL);
        }
        thread_local_array.store_unchecked(pc_key, lc);
        pc->ll_counter_head.insert(&lc->ll_in_counter, lc);
        pc_unlock();
    }
    return lc;
}

void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
// Effect: Increment the counter by amount.
// Requires: No overflows. This is a 64-bit unsigned counter.
{
    struct local_counter *lc = get_or_alloc_thread_local_counter(pc);
    lc->sum += amount;
}

static int sumit(struct local_counter *lc, uint64_t *sum) {
    (*sum) += lc->sum;
    return 0;
}

uint64_t read_partitioned_counter(PARTITIONED_COUNTER pc)
// Effect: Return the current value of the counter.
// Implementation note: Sum all the thread-local counts along with the sum_of_dead.
{
    pc_lock();
    uint64_t sum = pc->sum_of_dead;
    int r = pc->ll_counter_head.iterate<uint64_t *>(sumit, &sum);
    assert(r==0);
    pc_unlock();
    return sum;
}

void partitioned_counters_init(void)
// Effect: Initialize any partitioned counters data structures that must be set up before any partitioned counters run.
{
    pk_create(&thread_destructor_key, destroy_thread_local_part_of_partitioned_counters);
    all_thread_local_arrays.init();
}

void partitioned_counters_destroy(void)
// Effect: Destroy any partitioned counters data structures.
{
    pc_lock();
    LinkedListElement<GrowableArray<struct local_counter *> *> *a_ll;
    while (all_thread_local_arrays.pop(&a_ll)) {
        a_ll->get_container()->deinit();
    }

    pk_delete(thread_destructor_key);
    destroy_counters();
    pc_unlock();
}

#endif // __APPLE__