| 1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
| 2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
| 3 | #ident "$Id$" |
| 4 | /*====== |
| 5 | This file is part of PerconaFT. |
| 6 | |
| 7 | |
| 8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
| 9 | |
| 10 | PerconaFT is free software: you can redistribute it and/or modify |
| 11 | it under the terms of the GNU General Public License, version 2, |
| 12 | as published by the Free Software Foundation. |
| 13 | |
| 14 | PerconaFT is distributed in the hope that it will be useful, |
| 15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 | GNU General Public License for more details. |
| 18 | |
| 19 | You should have received a copy of the GNU General Public License |
| 20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 21 | |
| 22 | ---------------------------------------- |
| 23 | |
| 24 | PerconaFT is free software: you can redistribute it and/or modify |
| 25 | it under the terms of the GNU Affero General Public License, version 3, |
| 26 | as published by the Free Software Foundation. |
| 27 | |
| 28 | PerconaFT is distributed in the hope that it will be useful, |
| 29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 31 | GNU Affero General Public License for more details. |
| 32 | |
| 33 | You should have received a copy of the GNU Affero General Public License |
| 34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 35 | ======= */ |
| 36 | |
| 37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
| 38 | |
| 39 | #include <my_global.h> |
| 40 | #include <string.h> |
| 41 | #include <time.h> |
| 42 | #include <stdarg.h> |
| 43 | |
| 44 | #include <portability/memory.h> |
| 45 | #include <portability/toku_race_tools.h> |
| 46 | #include <portability/toku_atomic.h> |
| 47 | #include <portability/toku_pthread.h> |
| 48 | #include <portability/toku_portability.h> |
| 49 | #include <portability/toku_stdlib.h> |
| 50 | #include <portability/toku_time.h> |
| 51 | |
| 52 | #include "ft/cachetable/cachetable.h" |
| 53 | #include "ft/cachetable/cachetable-internal.h" |
| 54 | #include "ft/cachetable/checkpoint.h" |
| 55 | #include "ft/logger/log-internal.h" |
| 56 | #include "util/rwlock.h" |
| 57 | #include "util/scoped_malloc.h" |
| 58 | #include "util/status.h" |
| 59 | #include "util/context.h" |
| 60 | |
| 61 | toku_instr_key *cachetable_m_mutex_key; |
| 62 | toku_instr_key *cachetable_ev_thread_lock_mutex_key; |
| 63 | |
| 64 | toku_instr_key *cachetable_m_list_lock_key; |
| 65 | toku_instr_key *cachetable_m_pending_lock_expensive_key; |
| 66 | toku_instr_key *cachetable_m_pending_lock_cheap_key; |
| 67 | toku_instr_key *cachetable_m_lock_key; |
| 68 | |
| 69 | toku_instr_key *cachetable_value_key; |
| 70 | toku_instr_key *cachetable_disk_nb_rwlock_key; |
| 71 | |
| 72 | toku_instr_key *cachetable_p_refcount_wait_key; |
| 73 | toku_instr_key *cachetable_m_flow_control_cond_key; |
| 74 | toku_instr_key *cachetable_m_ev_thread_cond_key; |
| 75 | |
| 76 | toku_instr_key *cachetable_disk_nb_mutex_key; |
| 77 | toku_instr_key *log_internal_lock_mutex_key; |
| 78 | toku_instr_key *eviction_thread_key; |
| 79 | |
| 80 | /////////////////////////////////////////////////////////////////////////////////// |
| 81 | // Engine status |
| 82 | // |
| 83 | // Status is intended for display to humans to help understand system behavior. |
| 84 | // It does not need to be perfectly thread-safe. |
| 85 | |
| 86 | // These should be in the cachetable object, but we make them file-wide so that gdb can get them easily. |
| 87 | // They were left here after engine status cleanup (#2949), rather than moved into the status struct, |
| 88 | // so they are still easily available to the debugger and to save lots of typing. |
| 89 | static uint64_t cachetable_miss; |
| 90 | static uint64_t cachetable_misstime; // time spent waiting for disk read |
| 91 | static uint64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable? |
| 92 | static uint64_t cachetable_evictions; |
| 93 | static uint64_t cleaner_executions; // number of times the cleaner thread's loop has executed |
| 94 | |
| 95 | |
| 96 | // Note, toku_cachetable_get_status() is below, after declaration of cachetable. |
| 97 | |
| 98 | static void * const zero_value = nullptr; |
| 99 | static PAIR_ATTR const zero_attr = { |
| 100 | .size = 0, |
| 101 | .nonleaf_size = 0, |
| 102 | .leaf_size = 0, |
| 103 | .rollback_size = 0, |
| 104 | .cache_pressure_size = 0, |
| 105 | .is_valid = true |
| 106 | }; |
| 107 | |
| 108 | |
| 109 | static inline void ctpair_destroy(PAIR p) { |
| 110 | p->value_rwlock.deinit(); |
| 111 | paranoid_invariant(p->refcount == 0); |
| 112 | nb_mutex_destroy(&p->disk_nb_mutex); |
| 113 | toku_cond_destroy(&p->refcount_wait); |
| 114 | toku_free(p); |
| 115 | } |
| 116 | |
| 117 | static inline void pair_lock(PAIR p) { |
| 118 | toku_mutex_lock(p->mutex); |
| 119 | } |
| 120 | |
| 121 | static inline void pair_unlock(PAIR p) { |
| 122 | toku_mutex_unlock(p->mutex); |
| 123 | } |
| 124 | |
| 125 | // adds a reference to the PAIR |
| 126 | // on input and output, PAIR mutex is held |
| 127 | static void pair_add_ref_unlocked(PAIR p) { |
| 128 | p->refcount++; |
| 129 | } |
| 130 | |
| 131 | // releases a reference to the PAIR |
| 132 | // on input and output, PAIR mutex is held |
| 133 | static void pair_release_ref_unlocked(PAIR p) { |
| 134 | paranoid_invariant(p->refcount > 0); |
| 135 | p->refcount--; |
| 136 | if (p->refcount == 0 && p->num_waiting_on_refs > 0) { |
| 137 | toku_cond_broadcast(&p->refcount_wait); |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | static void pair_wait_for_ref_release_unlocked(PAIR p) { |
| 142 | p->num_waiting_on_refs++; |
| 143 | while (p->refcount > 0) { |
| 144 | toku_cond_wait(&p->refcount_wait, p->mutex); |
| 145 | } |
| 146 | p->num_waiting_on_refs--; |
| 147 | } |
| 148 | |
| 149 | bool toku_ctpair_is_write_locked(PAIR pair) { |
| 150 | return pair->value_rwlock.writers() == 1; |
| 151 | } |
| 152 | |
| 153 | void |
| 154 | toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) { |
| 155 | ct_status.init(); |
| 156 | CT_STATUS_VAL(CT_MISS) = cachetable_miss; |
| 157 | CT_STATUS_VAL(CT_MISSTIME) = cachetable_misstime; |
| 158 | CT_STATUS_VAL(CT_PREFETCHES) = cachetable_prefetches; |
| 159 | CT_STATUS_VAL(CT_EVICTIONS) = cachetable_evictions; |
| 160 | CT_STATUS_VAL(CT_CLEANER_EXECUTIONS) = cleaner_executions; |
| 161 | CT_STATUS_VAL(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct); |
| 162 | CT_STATUS_VAL(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct); |
| 163 | toku_kibbutz_get_status(ct->client_kibbutz, |
| 164 | &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS), |
| 165 | &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS_ACTIVE), |
| 166 | &CT_STATUS_VAL(CT_POOL_CLIENT_QUEUE_SIZE), |
| 167 | &CT_STATUS_VAL(CT_POOL_CLIENT_MAX_QUEUE_SIZE), |
| 168 | &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED), |
| 169 | &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME)); |
| 170 | toku_kibbutz_get_status(ct->ct_kibbutz, |
| 171 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS), |
| 172 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE), |
| 173 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_QUEUE_SIZE), |
| 174 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE), |
| 175 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED), |
| 176 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME)); |
| 177 | toku_kibbutz_get_status(ct->checkpointing_kibbutz, |
| 178 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS), |
| 179 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE), |
| 180 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_QUEUE_SIZE), |
| 181 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE), |
| 182 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED), |
| 183 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME)); |
| 184 | ct->ev.fill_engine_status(); |
| 185 | *statp = ct_status; |
| 186 | } |
| 187 | |
| 188 | // FIXME global with no toku prefix |
| 189 | void remove_background_job_from_cf(CACHEFILE cf) |
| 190 | { |
| 191 | bjm_remove_background_job(cf->bjm); |
| 192 | } |
| 193 | |
| 194 | // FIXME global with no toku prefix |
| 195 | void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra) |
| 196 | // The function f must call remove_background_job_from_cf when it completes |
| 197 | { |
| 198 | int r = bjm_add_background_job(cf->bjm); |
| 199 | // if client is adding a background job, then it must be done |
| 200 | // at a time when the manager is accepting background jobs, otherwise |
| 201 | // the client is screwing up |
| 202 | assert_zero(r); |
| 203 | toku_kibbutz_enq(cf->cachetable->client_kibbutz, f, extra); |
| 204 | } |
| 205 | |
| 206 | static int |
| 207 | checkpoint_thread (void *checkpointer_v) |
| 208 | // Effect: If checkpoint_period>0 then periodically run a checkpoint. |
| 209 | // If someone changes the checkpoint_period (calling toku_set_checkpoint_period), then the checkpoint will run sooner or later. |
| 210 | // If someone sets the checkpoint_shutdown boolean, then this thread exits. |
| 211 | // This thread notices those changes by waiting on a condition variable. |
| 212 | { |
| 213 | CHECKPOINTER CAST_FROM_VOIDP(cp, checkpointer_v); |
| 214 | int r = toku_checkpoint(cp, cp->get_logger(), NULL, NULL, NULL, NULL, SCHEDULED_CHECKPOINT); |
| 215 | invariant_zero(r); |
| 216 | return r; |
| 217 | } |
| 218 | |
| 219 | void toku_set_checkpoint_period (CACHETABLE ct, uint32_t new_period) { |
| 220 | ct->cp.set_checkpoint_period(new_period); |
| 221 | } |
| 222 | |
| 223 | uint32_t toku_get_checkpoint_period_unlocked (CACHETABLE ct) { |
| 224 | return ct->cp.get_checkpoint_period(); |
| 225 | } |
| 226 | |
| 227 | void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period) { |
| 228 | ct->cl.set_period(new_period); |
| 229 | } |
| 230 | |
| 231 | uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct) { |
| 232 | return ct->cl.get_period_unlocked(); |
| 233 | } |
| 234 | |
| 235 | void toku_set_cleaner_iterations (CACHETABLE ct, uint32_t new_iterations) { |
| 236 | ct->cl.set_iterations(new_iterations); |
| 237 | } |
| 238 | |
| 239 | uint32_t toku_get_cleaner_iterations (CACHETABLE ct) { |
| 240 | return ct->cl.get_iterations(); |
| 241 | } |
| 242 | |
| 243 | uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) { |
| 244 | return ct->cl.get_iterations(); |
| 245 | } |
| 246 | |
| 247 | void toku_set_enable_partial_eviction (CACHETABLE ct, bool enabled) { |
| 248 | ct->ev.set_enable_partial_eviction(enabled); |
| 249 | } |
| 250 | |
| 251 | bool toku_get_enable_partial_eviction (CACHETABLE ct) { |
| 252 | return ct->ev.get_enable_partial_eviction(); |
| 253 | } |
| 254 | |
| 255 | // reserve 25% as "unreservable". The loader cannot have it. |
| 256 | #define unreservable_memory(size) ((size)/4) |
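| | // For example, with the default 128MB size_limit used below, 32MB (128MB/4) cannot be handed to the loader. |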
| 257 | |
| 258 | int toku_cachetable_create_ex(CACHETABLE *ct_result, long size_limit, |
| 259 | unsigned long client_pool_threads, |
| 260 | unsigned long cachetable_pool_threads, |
| 261 | unsigned long checkpoint_pool_threads, |
| 262 | LSN UU(initial_lsn), TOKULOGGER logger) { |
| 263 | int result = 0; |
| 264 | int r; |
| 265 | |
| 266 | if (size_limit == 0) { |
| 267 | size_limit = 128*1024*1024; |
| 268 | } |
| 269 | |
| 270 | CACHETABLE XCALLOC(ct); |
| 271 | ct->list.init(); |
| 272 | ct->cf_list.init(); |
| 273 | |
| 274 | int num_processors = toku_os_get_number_active_processors(); |
| 275 | int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1; |
| 276 | r = toku_kibbutz_create(client_pool_threads ? client_pool_threads : num_processors, |
| 277 | &ct->client_kibbutz); |
| 278 | if (r != 0) { |
| 279 | result = r; |
| 280 | goto cleanup; |
| 281 | } |
| 282 | r = toku_kibbutz_create(cachetable_pool_threads ? cachetable_pool_threads : 2*num_processors, |
| 283 | &ct->ct_kibbutz); |
| 284 | if (r != 0) { |
| 285 | result = r; |
| 286 | goto cleanup; |
| 287 | } |
| 288 | r = toku_kibbutz_create(checkpoint_pool_threads ? checkpoint_pool_threads : checkpointing_nworkers, |
| 289 | &ct->checkpointing_kibbutz); |
| 290 | if (r != 0) { |
| 291 | result = r; |
| 292 | goto cleanup; |
| 293 | } |
| 294 | // must be done after creating ct_kibbutz |
| 295 | r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD); |
| 296 | if (r != 0) { |
| 297 | result = r; |
| 298 | goto cleanup; |
| 299 | } |
| 300 | r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list); |
| 301 | if (r != 0) { |
| 302 | result = r; |
| 303 | goto cleanup; |
| 304 | } |
| 305 | r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration |
| 306 | if (r != 0) { |
| 307 | result = r; |
| 308 | goto cleanup; |
| 309 | } |
| 310 | ct->env_dir = toku_xstrdup("."); |
| 311 | cleanup: |
| 312 | if (result == 0) { |
| 313 | *ct_result = ct; |
| 314 | } else { |
| 315 | toku_cachetable_close(&ct); |
| 316 | } |
| 317 | return result; |
| 318 | } |
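| | |
| | // A minimal usage sketch (illustrative only; passing 0 for the size limit and pool sizes |
| | // picks the defaults chosen above, and the logger may be NULL if logging is not used): |
| | // |
| | //     CACHETABLE ct; |
| | //     int r = toku_cachetable_create_ex(&ct, 0, 0, 0, 0, ZERO_LSN, NULL); |
| | //     assert_zero(r); |
| | //     ... |
| | //     toku_cachetable_close(&ct); |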
| 319 | |
| 320 | // Returns a pointer to the checkpoint contained within |
| 321 | // the given cachetable. |
| 322 | CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct) { |
| 323 | return &ct->cp; |
| 324 | } |
| 325 | |
| 326 | uint64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction, uint64_t upper_bound) { |
| 327 | uint64_t reserved_memory = ct->ev.reserve_memory(fraction, upper_bound); |
| 328 | return reserved_memory; |
| 329 | } |
| 330 | |
| 331 | void toku_cachetable_release_reserved_memory(CACHETABLE ct, uint64_t reserved_memory) { |
| 332 | ct->ev.release_reserved_memory(reserved_memory); |
| 333 | } |
| 334 | |
| 335 | void |
| 336 | toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir) { |
| 337 | toku_free(ct->env_dir); |
| 338 | ct->env_dir = toku_xstrdup(env_dir); |
| 339 | } |
| 340 | |
| 341 | // What cachefile goes with particular iname (iname relative to env)? |
| 342 | // The transaction that is adding the reference might not have a reference |
| 343 | // to the ft, therefore the cachefile might be closing. |
| 344 | // If closing, we want to return that it is not there, but must wait till after |
| 345 | // the close has finished. |
| 346 | // Once the close has finished, there must not be a cachefile with that name |
| 347 | // in the cachetable. |
| 348 | int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CACHEFILE *cf) { |
| 349 | return ct->cf_list.cachefile_of_iname_in_env(iname_in_env, cf); |
| 350 | } |
| 351 | |
| 352 | // What cachefile goes with particular fd? |
| 353 | // This function can only be called if the ft is still open, so file must |
| 354 | // still be open |
| 355 | int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) { |
| 356 | return ct->cf_list.cachefile_of_filenum(filenum, cf); |
| 357 | } |
| 358 | |
| 359 | // TEST-ONLY function |
| 360 | // If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile. |
| 361 | int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_in_env) { |
| 362 | FILENUM filenum = toku_cachetable_reserve_filenum(ct); |
| 363 | bool was_open; |
| 364 | return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_in_env, filenum, &was_open); |
| 365 | } |
| 366 | |
| 367 | // Get a unique filenum from the cachetable |
| 368 | FILENUM |
| 369 | toku_cachetable_reserve_filenum(CACHETABLE ct) { |
| 370 | return ct->cf_list.reserve_filenum(); |
| 371 | } |
| 372 | |
| 373 | static void create_new_cachefile( |
| 374 | CACHETABLE ct, |
| 375 | FILENUM filenum, |
| 376 | uint32_t hash_id, |
| 377 | int fd, |
| 378 | const char *fname_in_env, |
| 379 | struct fileid fileid, |
| 380 | CACHEFILE *cfptr |
| 381 | ) { |
| 382 | // File is not open. Make a new cachefile. |
| 383 | CACHEFILE newcf = NULL; |
| 384 | XCALLOC(newcf); |
| 385 | newcf->cachetable = ct; |
| 386 | newcf->hash_id = hash_id; |
| 387 | newcf->fileid = fileid; |
| 388 | |
| 389 | newcf->filenum = filenum; |
| 390 | newcf->fd = fd; |
| 391 | newcf->fname_in_env = toku_xstrdup(fname_in_env); |
| 392 | bjm_init(&newcf->bjm); |
| 393 | *cfptr = newcf; |
| 394 | } |
| 395 | |
| 396 | int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd, |
| 397 | const char *fname_in_env, |
| 398 | FILENUM filenum, bool* was_open) { |
| 399 | int r; |
| 400 | CACHEFILE newcf; |
| 401 | struct fileid fileid; |
| 402 | |
| 403 | assert(filenum.fileid != FILENUM_NONE.fileid); |
| 404 | r = toku_os_get_unique_file_id(fd, &fileid); |
| 405 | if (r != 0) { |
| 406 | r = get_error_errno(); |
| 407 | close(fd); |
| 408 | return r; |
| 409 | } |
| 410 | ct->cf_list.write_lock(); |
| 411 | CACHEFILE existing_cf = ct->cf_list.find_cachefile_unlocked(&fileid); |
| 412 | if (existing_cf) { |
| 413 | *was_open = true; |
| 414 | // Reuse an existing cachefile and close the caller's fd, whose |
| 415 | // responsibility has been passed to us. |
| 416 | r = close(fd); |
| 417 | assert(r == 0); |
| 418 | *cfptr = existing_cf; |
| 419 | r = 0; |
| 420 | goto exit; |
| 421 | } |
| 422 | *was_open = false; |
| 423 | ct->cf_list.verify_unused_filenum(filenum); |
| 424 | // now let's try to find it in the stale cachefiles |
| 425 | existing_cf = ct->cf_list.find_stale_cachefile_unlocked(&fileid); |
| 426 | // found the stale file, |
| 427 | if (existing_cf) { |
| 428 | // fix up the fields in the cachefile |
| 429 | existing_cf->filenum = filenum; |
| 430 | existing_cf->fd = fd; |
| 431 | existing_cf->fname_in_env = toku_xstrdup(fname_in_env); |
| 432 | bjm_init(&existing_cf->bjm); |
| 433 | |
| 434 | // now we need to move all the PAIRs in it back into the cachetable |
| 435 | ct->list.write_list_lock(); |
| 436 | for (PAIR curr_pair = existing_cf->cf_head; curr_pair; curr_pair = curr_pair->cf_next) { |
| 437 | pair_lock(curr_pair); |
| 438 | ct->list.add_to_cachetable_only(curr_pair); |
| 439 | pair_unlock(curr_pair); |
| 440 | } |
| 441 | ct->list.write_list_unlock(); |
| 442 | // move the cachefile back to the list of active cachefiles |
| 443 | ct->cf_list.remove_stale_cf_unlocked(existing_cf); |
| 444 | ct->cf_list.add_cf_unlocked(existing_cf); |
| 445 | *cfptr = existing_cf; |
| 446 | r = 0; |
| 447 | goto exit; |
| 448 | } |
| 449 | |
| 450 | create_new_cachefile( |
| 451 | ct, |
| 452 | filenum, |
| 453 | ct->cf_list.get_new_hash_id_unlocked(), |
| 454 | fd, |
| 455 | fname_in_env, |
| 456 | fileid, |
| 457 | &newcf |
| 458 | ); |
| 459 | |
| 460 | ct->cf_list.add_cf_unlocked(newcf); |
| 461 | |
| 462 | *cfptr = newcf; |
| 463 | r = 0; |
| 464 | exit: |
| 465 | ct->cf_list.write_unlock(); |
| 466 | return r; |
| 467 | } |
| 468 | |
| 469 | static void cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf, bool evict_completely); |
| 470 | |
| 471 | //TEST_ONLY_FUNCTION |
| 472 | int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in_env, int flags, mode_t mode) { |
| 473 | char *fname_in_cwd = toku_construct_full_name(2, ct->env_dir, fname_in_env); |
| 474 | int fd = open(fname_in_cwd, flags+O_BINARY, mode); |
| 475 | int r; |
| 476 | if (fd < 0) { |
| 477 | r = get_error_errno(); |
| 478 | } else { |
| 479 | r = toku_cachetable_openfd (cfptr, ct, fd, fname_in_env); |
| 480 | } |
| 481 | toku_free(fname_in_cwd); |
| 482 | return r; |
| 483 | } |
| 484 | |
| 485 | char * |
| 486 | toku_cachefile_fname_in_env (CACHEFILE cf) { |
| 487 | if (cf) { |
| 488 | return cf->fname_in_env; |
| 489 | } |
| 490 | return nullptr; |
| 491 | } |
| 492 | |
| 493 | void toku_cachefile_set_fname_in_env(CACHEFILE cf, char *new_fname_in_env) { |
| 494 | cf->fname_in_env = new_fname_in_env; |
| 495 | } |
| 496 | |
| 497 | int |
| 498 | toku_cachefile_get_fd (CACHEFILE cf) { |
| 499 | return cf->fd; |
| 500 | } |
| 501 | |
| 502 | static void cachefile_destroy(CACHEFILE cf) { |
| 503 | if (cf->free_userdata) { |
| 504 | cf->free_userdata(cf, cf->userdata); |
| 505 | } |
| 506 | toku_free(cf); |
| 507 | } |
| 508 | |
| 509 | void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) { |
| 510 | CACHEFILE cf = *cfp; |
| 511 | CACHETABLE ct = cf->cachetable; |
| 512 | |
| 513 | bjm_wait_for_jobs_to_finish(cf->bjm); |
| 514 | |
| 515 | // Clients should never attempt to close a cachefile that is being |
| 516 | // checkpointed. We notify clients this is happening in the |
| 517 | // note_pin_by_checkpoint callback. |
| 518 | assert(!cf->for_checkpoint); |
| 519 | |
| 520 | // Flush the cachefile and remove all of its pairs from the cachetable, |
| 521 | // but keep the PAIRs linked in the cachefile. We will store the cachefile |
| 522 | // away in case it gets opened again immediately |
| 523 | // |
| 524 | // if we are unlinking on close, then we want to evict completely, |
| 525 | // otherwise, we will keep the PAIRs and cachefile around in case |
| 526 | // a subsequent open comes soon |
| 527 | cachetable_flush_cachefile(ct, cf, cf->unlink_on_close); |
| 528 | |
| 529 | // Call the close userdata callback to notify the client this cachefile |
| 530 | // and its underlying file are going to be closed |
| 531 | if (cf->close_userdata) { |
| 532 | cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn); |
| 533 | } |
| 534 | // fsync and close the fd. |
| 535 | toku_file_fsync_without_accounting(cf->fd); |
| 536 | int r = close(cf->fd); |
| 537 | assert(r == 0); |
| 538 | cf->fd = -1; |
| 539 | |
| 540 | // destroy the parts of the cachefile |
| 541 | // that do not persist across opens/closes |
| 542 | bjm_destroy(cf->bjm); |
| 543 | cf->bjm = NULL; |
| 544 | |
| 545 | // remove the cf from the list of active cachefiles |
| 546 | ct->cf_list.remove_cf(cf); |
| 547 | cf->filenum = FILENUM_NONE; |
| 548 | |
| 549 | // Unlink the file if the bit was set |
| 550 | if (cf->unlink_on_close) { |
| 551 | char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(cf->cachetable, cf->fname_in_env); |
| 552 | r = unlink(fname_in_cwd); |
| 553 | assert_zero(r); |
| 554 | toku_free(fname_in_cwd); |
| 555 | } |
| 556 | toku_free(cf->fname_in_env); |
| 557 | cf->fname_in_env = NULL; |
| 558 | |
| 559 | // we destroy the cf if the unlink bit was set or if no PAIRs exist |
| 560 | // if no PAIRs exist, there is no sense in keeping the cachefile around |
| 561 | bool destroy_cf = cf->unlink_on_close || (cf->cf_head == NULL); |
| 562 | if (destroy_cf) { |
| 563 | cachefile_destroy(cf); |
| 564 | } |
| 565 | else { |
| 566 | ct->cf_list.add_stale_cf(cf); |
| 567 | } |
| 568 | } |
| 569 | |
| 570 | // This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c |
| 571 | // The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number. |
| 572 | // Instead we can use a bitmask on a table of size power of two. |
| 573 | // This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan |
| 574 | static inline uint32_t rot(uint32_t x, uint32_t k) { |
| 575 | return (x<<k) | (x>>(32-k)); |
| 576 | } |
| 577 | static inline uint32_t final (uint32_t a, uint32_t b, uint32_t c) { |
| 578 | c ^= b; c -= rot(b,14); |
| 579 | a ^= c; a -= rot(c,11); |
| 580 | b ^= a; b -= rot(a,25); |
| 581 | c ^= b; c -= rot(b,16); |
| 582 | a ^= c; a -= rot(c,4); |
| 583 | b ^= a; b -= rot(a,14); |
| 584 | c ^= b; c -= rot(b,24); |
| 585 | return c; |
| 586 | } |
| 587 | |
| 588 | uint32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key) |
| 589 | // Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two. |
| 590 | { |
| 591 | return final(cachefile->hash_id, (uint32_t)(key.b>>32), (uint32_t)key.b); |
| 592 | } |
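| | |
| | // Because the bits are mixed thoroughly, a hash table whose size is a power of two (say 2^k) |
| | // can index with (fullhash & ((1u << k) - 1)) rather than taking a modulo by a prime. |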
| 593 | |
| 594 | #define CLOCK_SATURATION 15 |
| 595 | #define CLOCK_INITIAL_COUNT 3 |
| 596 | |
| 597 | // Requires pair's mutex to be held |
| 598 | static void pair_touch (PAIR p) { |
| 599 | p->count = (p->count < CLOCK_SATURATION) ? p->count+1 : CLOCK_SATURATION; |
| 600 | } |
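| | |
| | // Each touch pushes the count toward CLOCK_SATURATION; the eviction pass decays the count, |
| | // so pairs that have been touched recently are less likely to be evicted (clock replacement). |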
| 601 | |
| 602 | // Remove a pair from the cachetable, requires write list lock to be held and p->mutex to be held |
| 603 | // Effects: the pair is removed from the LRU list and from the cachetable's hash table. |
| 604 | // The size of the objects in the cachetable is adjusted by the size of the pair being |
| 605 | // removed. |
| 606 | static void cachetable_remove_pair (pair_list* list, evictor* ev, PAIR p) { |
| 607 | list->evict_completely(p); |
| 608 | ev->remove_pair_attr(p->attr); |
| 609 | } |
| 610 | |
| 611 | static void cachetable_free_pair(PAIR p) { |
| 612 | CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback; |
| 613 | CACHEKEY key = p->key; |
| 614 | void *value = p->value_data; |
| 615 | void* disk_data = p->disk_data; |
| 616 | void *write_extraargs = p->write_extraargs; |
| 617 | PAIR_ATTR old_attr = p->attr; |
| 618 | |
| 619 | cachetable_evictions++; |
| 620 | PAIR_ATTR new_attr = p->attr; |
| 621 | // Note that flush_callback is called with write_me false, so the only purpose of this |
| 622 | // call is to tell the ft layer to evict the node (keep_me is false). |
| 623 | // Also, because we have already removed the PAIR from the cachetable in |
| 624 | // cachetable_remove_pair, we cannot pass in p->cachefile and p->cachefile->fd |
| 625 | // for the first two parameters, as these may be invalid (#5171), so, we |
| 626 | // pass in NULL and -1, dummy values |
| 627 | flush_callback(NULL, -1, key, value, &disk_data, write_extraargs, old_attr, &new_attr, false, false, true, false); |
| 628 | |
| 629 | ctpair_destroy(p); |
| 630 | } |
| 631 | |
| 632 | // assumes value_rwlock and disk_nb_mutex held on entry |
| 633 | // responsibility of this function is to only write a locked PAIR to disk |
| 634 | // and NOTHING else. We do not manipulate the state of the PAIR |
| 635 | // of the cachetable here (with the exception of ct->size_current for clones) |
| 636 | // |
| 637 | // No pair_list lock should be held, and the PAIR mutex should not be held |
| 638 | // |
| 639 | static void cachetable_only_write_locked_data( |
| 640 | evictor* ev, |
| 641 | PAIR p, |
| 642 | bool for_checkpoint, |
| 643 | PAIR_ATTR* new_attr, |
| 644 | bool is_clone |
| 645 | ) |
| 646 | { |
| 647 | CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback; |
| 648 | CACHEFILE cachefile = p->cachefile; |
| 649 | CACHEKEY key = p->key; |
| 650 | void *value = is_clone ? p->cloned_value_data : p->value_data; |
| 651 | void *disk_data = p->disk_data; |
| 652 | void *write_extraargs = p->write_extraargs; |
| 653 | PAIR_ATTR old_attr; |
| 654 | // we do this for drd. If we are a cloned pair and only |
| 655 | // have the disk_nb_mutex, it is a race to access p->attr. |
| 656 | // Luckily, old_attr here is only used for some test applications, |
| 657 | // so inaccurate non-size fields are ok. |
| 658 | if (is_clone) { |
| 659 | old_attr = make_pair_attr(p->cloned_value_size); |
| 660 | } |
| 661 | else { |
| 662 | old_attr = p->attr; |
| 663 | } |
| 664 | bool dowrite = true; |
| 665 | |
| 666 | // write callback |
| 667 | flush_callback( |
| 668 | cachefile, |
| 669 | cachefile->fd, |
| 670 | key, |
| 671 | value, |
| 672 | &disk_data, |
| 673 | write_extraargs, |
| 674 | old_attr, |
| 675 | new_attr, |
| 676 | dowrite, |
| 677 | is_clone ? false : true, // keep_me (only keep if this is not cloned pointer) |
| 678 | for_checkpoint, |
| 679 | is_clone //is_clone |
| 680 | ); |
| 681 | p->disk_data = disk_data; |
| 682 | if (is_clone) { |
| 683 | p->cloned_value_data = NULL; |
| 684 | ev->remove_cloned_data_size(p->cloned_value_size); |
| 685 | p->cloned_value_size = 0; |
| 686 | } |
| 687 | } |
| 688 | |
| 689 | |
| 690 | // |
| 691 | // This function writes a PAIR's value out to disk. Currently, it is called |
| 692 | // by get_and_pin functions that write a PAIR out for checkpoint, by |
| 693 | // evictor threads that evict dirty PAIRS, and by the checkpoint thread |
| 694 | // that needs to write out a dirty node for checkpoint. |
| 695 | // |
| 696 | // Requires on entry for p->mutex to NOT be held, otherwise |
| 697 | // calling cachetable_only_write_locked_data will be very expensive |
| 698 | // |
| 699 | static void cachetable_write_locked_pair( |
| 700 | evictor* ev, |
| 701 | PAIR p, |
| 702 | bool for_checkpoint |
| 703 | ) |
| 704 | { |
| 705 | PAIR_ATTR old_attr = p->attr; |
| 706 | PAIR_ATTR new_attr = p->attr; |
| 707 | // grabbing the disk_nb_mutex here ensures that |
| 708 | // after this point, no one is writing out a cloned value |
| 709 | // if we grab the disk_nb_mutex inside the if clause, |
| 710 | // then we may try to evict a PAIR that is in the process |
| 711 | // of having its clone be written out |
| 712 | pair_lock(p); |
| 713 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
| 714 | pair_unlock(p); |
| 715 | // make sure that assumption about cloned_value_data is true |
| 716 | // if we have grabbed the disk_nb_mutex, then that means that |
| 717 | // there should be no cloned value data |
| 718 | assert(p->cloned_value_data == NULL); |
| 719 | if (p->dirty) { |
| 720 | cachetable_only_write_locked_data(ev, p, for_checkpoint, &new_attr, false); |
| 721 | // |
| 722 | // now let's update variables |
| 723 | // |
| 724 | if (new_attr.is_valid) { |
| 725 | p->attr = new_attr; |
| 726 | ev->change_pair_attr(old_attr, new_attr); |
| 727 | } |
| 728 | } |
| 729 | // the pair is no longer dirty once written |
| 730 | p->dirty = CACHETABLE_CLEAN; |
| 731 | pair_lock(p); |
| 732 | nb_mutex_unlock(&p->disk_nb_mutex); |
| 733 | pair_unlock(p); |
| 734 | } |
| 735 | |
| 736 | // Worker thread function that writes and evicts a pair from memory to its cachefile |
| 737 | static void cachetable_evicter(void *extra) { |
| 738 | PAIR p = (PAIR)extra; |
| 739 | pair_list* pl = p->list; |
| 740 | CACHEFILE cf = p->cachefile; |
| 741 | pl->read_pending_exp_lock(); |
| 742 | bool for_checkpoint = p->checkpoint_pending; |
| 743 | p->checkpoint_pending = false; |
| 744 | // per the contract of evictor::evict_pair, |
| 745 | // the pair's mutex, p->mutex, must be held on entry |
| 746 | pair_lock(p); |
| 747 | p->ev->evict_pair(p, for_checkpoint); |
| 748 | pl->read_pending_exp_unlock(); |
| 749 | bjm_remove_background_job(cf->bjm); |
| 750 | } |
| 751 | |
| 752 | static void cachetable_partial_eviction(void *extra) { |
| 753 | PAIR p = (PAIR)extra; |
| 754 | CACHEFILE cf = p->cachefile; |
| 755 | p->ev->do_partial_eviction(p); |
| 756 | bjm_remove_background_job(cf->bjm); |
| 757 | } |
| 758 | |
| 759 | void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair) { |
| 760 | void* old_value = old_pair->value_data; |
| 761 | void* new_value = new_pair->value_data; |
| 762 | old_pair->value_data = new_value; |
| 763 | new_pair->value_data = old_value; |
| 764 | } |
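| | |
| | // Note: only the value pointers are exchanged; each PAIR keeps its own key, attr, locks, and list links. |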
| 765 | |
| 766 | void toku_cachetable_maybe_flush_some(CACHETABLE ct) { |
| 767 | // TODO: <CER> Maybe move this... |
| 768 | ct->ev.signal_eviction_thread(); |
| 769 | } |
| 770 | |
| 771 | // Initializes a pair's members. |
| 772 | // |
| 773 | void pair_init(PAIR p, |
| 774 | CACHEFILE cachefile, |
| 775 | CACHEKEY key, |
| 776 | void *value, |
| 777 | PAIR_ATTR attr, |
| 778 | enum cachetable_dirty dirty, |
| 779 | uint32_t fullhash, |
| 780 | CACHETABLE_WRITE_CALLBACK write_callback, |
| 781 | evictor *ev, |
| 782 | pair_list *list) |
| 783 | { |
| 784 | p->cachefile = cachefile; |
| 785 | p->key = key; |
| 786 | p->value_data = value; |
| 787 | p->cloned_value_data = NULL; |
| 788 | p->cloned_value_size = 0; |
| 789 | p->disk_data = NULL; |
| 790 | p->attr = attr; |
| 791 | p->dirty = dirty; |
| 792 | p->fullhash = fullhash; |
| 793 | |
| 794 | p->flush_callback = write_callback.flush_callback; |
| 795 | p->pe_callback = write_callback.pe_callback; |
| 796 | p->pe_est_callback = write_callback.pe_est_callback; |
| 797 | p->cleaner_callback = write_callback.cleaner_callback; |
| 798 | p->clone_callback = write_callback.clone_callback; |
| 799 | p->checkpoint_complete_callback = write_callback.checkpoint_complete_callback; |
| 800 | p->write_extraargs = write_callback.write_extraargs; |
| 801 | |
| 802 | p->count = 0; // <CER> Is zero the correct init value? |
| 803 | p->refcount = 0; |
| 804 | p->num_waiting_on_refs = 0; |
| 805 | toku_cond_init(*cachetable_p_refcount_wait_key, &p->refcount_wait, nullptr); |
| 806 | p->checkpoint_pending = false; |
| 807 | |
| 808 | p->mutex = list->get_mutex_for_pair(fullhash); |
| 809 | assert(p->mutex); |
| 810 | p->value_rwlock.init(p->mutex |
| 811 | #ifdef TOKU_MYSQL_WITH_PFS |
| 812 | , |
| 813 | *cachetable_value_key |
| 814 | #endif |
| 815 | ); |
| 816 | nb_mutex_init(*cachetable_disk_nb_mutex_key, |
| 817 | *cachetable_disk_nb_rwlock_key, |
| 818 | &p->disk_nb_mutex); |
| 819 | |
| 820 | p->size_evicting_estimate = 0; // <CER> Is zero the correct init value? |
| 821 | |
| 822 | p->ev = ev; |
| 823 | p->list = list; |
| 824 | |
| 825 | p->clock_next = p->clock_prev = NULL; |
| 826 | p->pending_next = p->pending_prev = NULL; |
| 827 | p->cf_next = p->cf_prev = NULL; |
| 828 | p->hash_chain = NULL; |
| 829 | } |
| 830 | |
| 831 | // has ct locked on entry |
| 832 | // This function MUST NOT release and reacquire the cachetable lock |
| 833 | // Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior. |
| 834 | // |
| 835 | // Requires pair list's write lock to be held on entry. |
| 836 | // the pair's mutex must be held as well |
| 837 | // |
| 838 | // |
| 839 | static PAIR cachetable_insert_at(CACHETABLE ct, |
| 840 | CACHEFILE cachefile, CACHEKEY key, void *value, |
| 841 | uint32_t fullhash, |
| 842 | PAIR_ATTR attr, |
| 843 | CACHETABLE_WRITE_CALLBACK write_callback, |
| 844 | enum cachetable_dirty dirty) { |
| 845 | PAIR MALLOC(p); |
| 846 | assert(p); |
| 847 | memset(p, 0, sizeof *p); |
| 848 | pair_init(p, |
| 849 | cachefile, |
| 850 | key, |
| 851 | value, |
| 852 | attr, |
| 853 | dirty, |
| 854 | fullhash, |
| 855 | write_callback, |
| 856 | &ct->ev, |
| 857 | &ct->list |
| 858 | ); |
| 859 | |
| 860 | ct->list.put(p); |
| 861 | ct->ev.add_pair_attr(attr); |
| 862 | return p; |
| 863 | } |
| 864 | |
| 865 | // on input, the write list lock must be held AND |
| 866 | // the pair's mutex must be held as well |
| 867 | static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) { |
| 868 | ct->list.put(p); |
| 869 | ct->ev.add_pair_attr(attr); |
| 870 | } |
| 871 | |
| 872 | |
| 873 | // has ct locked on entry |
| 874 | // This function MUST NOT release and reacquire the cachetable lock |
| 875 | // Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior. |
| 876 | // |
| 877 | // Requires pair list's write lock to be held on entry |
| 878 | // |
| 879 | static void cachetable_put_internal( |
| 880 | CACHEFILE cachefile, |
| 881 | PAIR p, |
| 882 | void *value, |
| 883 | PAIR_ATTR attr, |
| 884 | CACHETABLE_PUT_CALLBACK put_callback |
| 885 | ) |
| 886 | { |
| 887 | CACHETABLE ct = cachefile->cachetable; |
| 888 | // |
| 889 | // |
| 890 | // TODO: (Zardosht), make code run in debug only |
| 891 | // |
| 892 | // |
| 893 | //PAIR dummy_p = ct->list.find_pair(cachefile, key, fullhash); |
| 894 | //invariant_null(dummy_p); |
| 895 | cachetable_insert_pair_at(ct, p, attr); |
| 896 | invariant_notnull(put_callback); |
| 897 | put_callback(p->key, value, p); |
| 898 | } |
| 899 | |
| 900 | // Pair mutex (p->mutex) may or may not be held on entry; |
| 901 | // holding the pair mutex on entry is not important |
| 902 | // for performance or correctness |
| 903 | // Pair is pinned on entry |
| 904 | static void |
| 905 | clone_pair(evictor* ev, PAIR p) { |
| 906 | PAIR_ATTR old_attr = p->attr; |
| 907 | PAIR_ATTR new_attr; |
| 908 | long clone_size = 0; |
| 909 | |
| 910 | // act of cloning should be fast, |
| 911 | // not sure if we have to release |
| 912 | // and regrab the cachetable lock, |
| 913 | // but doing it for now |
| 914 | p->clone_callback( |
| 915 | p->value_data, |
| 916 | &p->cloned_value_data, |
| 917 | &clone_size, |
| 918 | &new_attr, |
| 919 | true, |
| 920 | p->write_extraargs |
| 921 | ); |
| 922 | |
| 923 | // now we need to do the same actions we would do |
| 924 | // if the PAIR had been written to disk |
| 925 | // |
| 926 | // because we hold the value_rwlock, |
| 927 | // it doesn't matter whether we clear |
| 928 | // the pending bit before the clone |
| 929 | // or after the clone |
| 930 | p->dirty = CACHETABLE_CLEAN; |
| 931 | if (new_attr.is_valid) { |
| 932 | p->attr = new_attr; |
| 933 | ev->change_pair_attr(old_attr, new_attr); |
| 934 | } |
| 935 | p->cloned_value_size = clone_size; |
| 936 | ev->add_cloned_data_size(p->cloned_value_size); |
| 937 | } |
| 938 | |
| 939 | static void checkpoint_cloned_pair(void *extra) { |
| 940 | PAIR p = (PAIR)extra; |
| 941 | CACHETABLE ct = p->cachefile->cachetable; |
| 942 | PAIR_ATTR new_attr; |
| 943 | // note that pending lock is not needed here because |
| 944 | // we KNOW we are in the middle of a checkpoint |
| 945 | // and that a begin_checkpoint cannot happen |
| 946 | cachetable_only_write_locked_data( |
| 947 | p->ev, |
| 948 | p, |
| 949 | true, //for_checkpoint |
| 950 | &new_attr, |
| 951 | true //is_clone |
| 952 | ); |
| 953 | pair_lock(p); |
| 954 | nb_mutex_unlock(&p->disk_nb_mutex); |
| 955 | pair_unlock(p); |
| 956 | ct->cp.remove_background_job(); |
| 957 | } |
| 958 | |
| 959 | static void |
| 960 | checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) { |
| 961 | toku_kibbutz_enq(ct->checkpointing_kibbutz, checkpoint_cloned_pair, p); |
| 962 | } |
| 963 | |
| 964 | |
| 965 | // |
| 966 | // Given a PAIR p with the value_rwlock already held, do the following: |
| 967 | // - If the PAIR needs to be written out to disk for checkpoint: |
| 968 | // - If the PAIR is cloneable, clone the PAIR and place the work |
| 969 | // of writing the PAIR on a background thread. |
| 970 | // - If the PAIR is not cloneable, write the PAIR to disk for checkpoint |
| 971 | // on the current thread |
| 972 | // |
| 973 | // On entry, pair's mutex is NOT held |
| 974 | // |
| 975 | static void |
| 976 | write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending) |
| 977 | { |
| 978 | if (checkpoint_pending && p->checkpoint_complete_callback) { |
| 979 | p->checkpoint_complete_callback(p->value_data); |
| 980 | } |
| 981 | if (p->dirty && checkpoint_pending) { |
| 982 | if (p->clone_callback) { |
| 983 | pair_lock(p); |
| 984 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
| 985 | pair_unlock(p); |
| 986 | assert(!p->cloned_value_data); |
| 987 | clone_pair(&ct->ev, p); |
| 988 | assert(p->cloned_value_data); |
| 989 | // place it on the background thread and continue |
| 990 | // responsibility of writer thread to release disk_nb_mutex |
| 991 | ct->cp.add_background_job(); |
| 992 | checkpoint_cloned_pair_on_writer_thread(ct, p); |
| 993 | } |
| 994 | else { |
| 995 | // The pair is not cloneable, just write the pair to disk |
| 996 | // we already have p->value_rwlock and we just do the write in our own thread. |
| 997 | cachetable_write_locked_pair(&ct->ev, p, true); // keeps the PAIR's write lock |
| 998 | } |
| 999 | } |
| 1000 | } |
| 1001 | |
| 1002 | // On entry and exit: hold the pair's mutex (p->mutex) |
| 1003 | // Method: take write lock |
| 1004 | // maybe write out the node |
| 1005 | // Else release write lock |
| 1006 | // |
| 1007 | static void |
| 1008 | write_pair_for_checkpoint_thread (evictor* ev, PAIR p) |
| 1009 | { |
| 1010 | // Grab an exclusive lock on the pair. |
| 1011 | // If we grab an expensive lock, then other threads will return |
| 1012 | // TRY_AGAIN rather than waiting. In production, the only time |
| 1013 | // another thread will check if grabbing a lock is expensive is when |
| 1014 | // we have a clone_callback (FTNODEs), so the act of checkpointing |
| 1015 | // will be cheap. Also, much of the time we'll just be clearing |
| 1016 | // pending bits and that's definitely cheap. (see #5427) |
| 1017 | p->value_rwlock.write_lock(false); |
| 1018 | if (p->checkpoint_pending && p->checkpoint_complete_callback) { |
| 1019 | p->checkpoint_complete_callback(p->value_data); |
| 1020 | } |
| 1021 | if (p->dirty && p->checkpoint_pending) { |
| 1022 | if (p->clone_callback) { |
| 1023 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
| 1024 | assert(!p->cloned_value_data); |
| 1025 | clone_pair(ev, p); |
| 1026 | assert(p->cloned_value_data); |
| 1027 | } |
| 1028 | else { |
| 1029 | // The pair is not cloneable, just write the pair to disk |
| 1030 | // we already have p->value_rwlock and we just do the write in our own thread. |
| 1031 | // this will grab and release disk_nb_mutex |
| 1032 | pair_unlock(p); |
| 1033 | cachetable_write_locked_pair(ev, p, true); // keeps the PAIR's write lock |
| 1034 | pair_lock(p); |
| 1035 | } |
| 1036 | p->checkpoint_pending = false; |
| 1037 | |
| 1038 | // now release value_rwlock, before we write the PAIR out |
| 1039 | // so that the PAIR is available to client threads |
| 1040 | p->value_rwlock.write_unlock(); // didn't call cachetable_evict_pair so we have to unlock it ourselves. |
| 1041 | if (p->clone_callback) { |
| 1042 | // note that pending lock is not needed here because |
| 1043 | // we KNOW we are in the middle of a checkpoint |
| 1044 | // and that a begin_checkpoint cannot happen |
| 1045 | PAIR_ATTR attr; |
| 1046 | pair_unlock(p); |
| 1047 | cachetable_only_write_locked_data( |
| 1048 | ev, |
| 1049 | p, |
| 1050 | true, //for_checkpoint |
| 1051 | &attr, |
| 1052 | true //is_clone |
| 1053 | ); |
| 1054 | pair_lock(p); |
| 1055 | nb_mutex_unlock(&p->disk_nb_mutex); |
| 1056 | } |
| 1057 | } |
| 1058 | else { |
| 1059 | // |
| 1060 | // we may clear the pending bit here because we have |
| 1061 | // both the cachetable lock and the PAIR lock. |
| 1062 | // The rule, as mentioned in toku_cachetable_begin_checkpoint, |
| 1063 | // is that to clear the bit, we must have both the PAIR lock |
| 1064 | // and the pending lock |
| 1065 | // |
| 1066 | p->checkpoint_pending = false; |
| 1067 | p->value_rwlock.write_unlock(); |
| 1068 | } |
| 1069 | } |
| 1070 | |
| 1071 | // |
| 1072 | // For each PAIR associated with these CACHEFILEs and CACHEKEYs |
| 1073 | // if the checkpoint_pending bit is set and the PAIR is dirty, write the PAIR |
| 1074 | // to disk. |
| 1075 | // We assume the PAIRs passed in have been locked by the client that made calls |
| 1076 | // into the cachetable that eventually make it here. |
| 1077 | // |
| 1078 | static void checkpoint_dependent_pairs( |
| 1079 | CACHETABLE ct, |
| 1080 | uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint |
| 1081 | PAIR* dependent_pairs, |
| 1082 | bool* checkpoint_pending, |
| 1083 | enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs |
| 1084 | ) |
| 1085 | { |
| 1086 | for (uint32_t i = 0; i < num_dependent_pairs; i++) { |
| 1087 | PAIR curr_dep_pair = dependent_pairs[i]; |
| 1088 | // we need to update the dirtyness of the dependent pair, |
| 1089 | // because the client may have dirtied it while holding its lock, |
| 1090 | // and if the pair is pending a checkpoint, it needs to be written out |
| 1091 | if (dependent_dirty[i]) curr_dep_pair->dirty = CACHETABLE_DIRTY; |
| 1092 | if (checkpoint_pending[i]) { |
| 1093 | write_locked_pair_for_checkpoint(ct, curr_dep_pair, checkpoint_pending[i]); |
| 1094 | } |
| 1095 | } |
| 1096 | } |
| 1097 | |
| 1098 | void toku_cachetable_put_with_dep_pairs( |
| 1099 | CACHEFILE cachefile, |
| 1100 | CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash, |
| 1101 | void *value, |
| 1102 | PAIR_ATTR attr, |
| 1103 | CACHETABLE_WRITE_CALLBACK write_callback, |
| 1104 | void *get_key_and_fullhash_extra, |
| 1105 | uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint |
| 1106 | PAIR* dependent_pairs, |
| 1107 | enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs |
| 1108 | CACHEKEY* key, |
| 1109 | uint32_t* fullhash, |
| 1110 | CACHETABLE_PUT_CALLBACK put_callback |
| 1111 | ) |
| 1112 | { |
| 1113 | // |
| 1114 | // need to get the key and fullhash |
| 1115 | // |
| 1116 | CACHETABLE ct = cachefile->cachetable; |
| 1117 | if (ct->ev.should_client_thread_sleep()) { |
| 1118 | ct->ev.wait_for_cache_pressure_to_subside(); |
| 1119 | } |
| 1120 | if (ct->ev.should_client_wake_eviction_thread()) { |
| 1121 | ct->ev.signal_eviction_thread(); |
| 1122 | } |
| 1123 | |
| 1124 | PAIR p = NULL; |
| 1125 | XMALLOC(p); |
| 1126 | memset(p, 0, sizeof *p); |
| 1127 | |
| 1128 | ct->list.write_list_lock(); |
| 1129 | get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra); |
| 1130 | pair_init( |
| 1131 | p, |
| 1132 | cachefile, |
| 1133 | *key, |
| 1134 | value, |
| 1135 | attr, |
| 1136 | CACHETABLE_DIRTY, |
| 1137 | *fullhash, |
| 1138 | write_callback, |
| 1139 | &ct->ev, |
| 1140 | &ct->list |
| 1141 | ); |
| 1142 | pair_lock(p); |
| 1143 | p->value_rwlock.write_lock(true); |
| 1144 | cachetable_put_internal( |
| 1145 | cachefile, |
| 1146 | p, |
| 1147 | value, |
| 1148 | attr, |
| 1149 | put_callback |
| 1150 | ); |
| 1151 | pair_unlock(p); |
| 1152 | bool checkpoint_pending[num_dependent_pairs]; |
| 1153 | ct->list.write_pending_cheap_lock(); |
| 1154 | for (uint32_t i = 0; i < num_dependent_pairs; i++) { |
| 1155 | checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending; |
| 1156 | dependent_pairs[i]->checkpoint_pending = false; |
| 1157 | } |
| 1158 | ct->list.write_pending_cheap_unlock(); |
| 1159 | ct->list.write_list_unlock(); |
| 1160 | |
| 1161 | // |
| 1162 | // now that we have inserted the row, let's checkpoint the |
| 1163 | // dependent nodes, if they need checkpointing |
| 1164 | // |
| 1165 | checkpoint_dependent_pairs( |
| 1166 | ct, |
| 1167 | num_dependent_pairs, |
| 1168 | dependent_pairs, |
| 1169 | checkpoint_pending, |
| 1170 | dependent_dirty |
| 1171 | ); |
| 1172 | } |
| 1173 | |
| 1174 | void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void*value, PAIR_ATTR attr, |
| 1175 | CACHETABLE_WRITE_CALLBACK write_callback, |
| 1176 | CACHETABLE_PUT_CALLBACK put_callback |
| 1177 | ) { |
| 1178 | CACHETABLE ct = cachefile->cachetable; |
| 1179 | if (ct->ev.should_client_thread_sleep()) { |
| 1180 | ct->ev.wait_for_cache_pressure_to_subside(); |
| 1181 | } |
| 1182 | if (ct->ev.should_client_wake_eviction_thread()) { |
| 1183 | ct->ev.signal_eviction_thread(); |
| 1184 | } |
| 1185 | |
| 1186 | PAIR p = NULL; |
| 1187 | XMALLOC(p); |
| 1188 | memset(p, 0, sizeof *p); |
| 1189 | |
| 1190 | ct->list.write_list_lock(); |
| 1191 | pair_init( |
| 1192 | p, |
| 1193 | cachefile, |
| 1194 | key, |
| 1195 | value, |
| 1196 | attr, |
| 1197 | CACHETABLE_DIRTY, |
| 1198 | fullhash, |
| 1199 | write_callback, |
| 1200 | &ct->ev, |
| 1201 | &ct->list |
| 1202 | ); |
| 1203 | pair_lock(p); |
| 1204 | p->value_rwlock.write_lock(true); |
| 1205 | cachetable_put_internal( |
| 1206 | cachefile, |
| 1207 | p, |
| 1208 | value, |
| 1209 | attr, |
| 1210 | put_callback |
| 1211 | ); |
| 1212 | pair_unlock(p); |
| 1213 | ct->list.write_list_unlock(); |
| 1214 | } |
| 1215 | |
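| | // Return the current wall-clock time in microseconds (used for coarse timing, e.g. miss-time accounting). |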
| 1216 | static uint64_t get_tnow(void) { |
| 1217 | struct timeval tv; |
| 1218 | int r = gettimeofday(&tv, NULL); assert(r == 0); |
| 1219 | return tv.tv_sec * 1000000ULL + tv.tv_usec; |
| 1220 | } |
| 1221 | |
| 1222 | // |
| 1223 | // No locks are held on entry (besides the rwlock write lock of the PAIR); |
| 1224 | // the pair's mutex and disk_nb_mutex are taken and released internally. |
| 1225 | // |
| 1229 | static void |
| 1230 | do_partial_fetch( |
| 1231 | CACHETABLE ct, |
| 1232 | CACHEFILE cachefile, |
| 1233 | PAIR p, |
| 1234 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
| 1235 | void *read_extraargs, |
| 1236 | bool keep_pair_locked |
| 1237 | ) |
| 1238 | { |
| 1239 | PAIR_ATTR old_attr = p->attr; |
| 1240 | PAIR_ATTR new_attr = zero_attr; |
| 1241 | // As of Dr. No, only clean PAIRs may have pieces missing, |
| 1242 | // so we do a sanity check here. |
| 1243 | assert(!p->dirty); |
| 1244 | |
| 1245 | pair_lock(p); |
| 1246 | invariant(p->value_rwlock.writers()); |
| 1247 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
| 1248 | pair_unlock(p); |
| 1249 | int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr); |
| 1250 | lazy_assert_zero(r); |
| 1251 | p->attr = new_attr; |
| 1252 | ct->ev.change_pair_attr(old_attr, new_attr); |
| 1253 | pair_lock(p); |
| 1254 | nb_mutex_unlock(&p->disk_nb_mutex); |
| 1255 | if (!keep_pair_locked) { |
| 1256 | p->value_rwlock.write_unlock(); |
| 1257 | } |
| 1258 | pair_unlock(p); |
| 1259 | } |
| 1260 | |
| 1261 | void toku_cachetable_pf_pinned_pair( |
| 1262 | void* value, |
| 1263 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
| 1264 | void* read_extraargs, |
| 1265 | CACHEFILE cf, |
| 1266 | CACHEKEY key, |
| 1267 | uint32_t fullhash |
| 1268 | ) |
| 1269 | { |
| 1270 | PAIR_ATTR attr; |
| 1271 | PAIR p = NULL; |
| 1272 | CACHETABLE ct = cf->cachetable; |
| 1273 | ct->list.pair_lock_by_fullhash(fullhash); |
| 1274 | p = ct->list.find_pair(cf, key, fullhash); |
| 1275 | assert(p != NULL); |
| 1276 | assert(p->value_data == value); |
| 1277 | assert(p->value_rwlock.writers()); |
| 1278 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
| 1279 | pair_unlock(p); |
| 1280 | |
| 1281 | int fd = cf->fd; |
| 1282 | pf_callback(value, p->disk_data, read_extraargs, fd, &attr); |
| 1283 | |
| 1284 | pair_lock(p); |
| 1285 | nb_mutex_unlock(&p->disk_nb_mutex); |
| 1286 | pair_unlock(p); |
| 1287 | } |
| 1288 | |
| 1289 | int toku_cachetable_get_and_pin ( |
| 1290 | CACHEFILE cachefile, |
| 1291 | CACHEKEY key, |
| 1292 | uint32_t fullhash, |
| 1293 | void**value, |
| 1294 | long *sizep, |
| 1295 | CACHETABLE_WRITE_CALLBACK write_callback, |
| 1296 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
| 1297 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
| 1298 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
| 1299 | bool may_modify_value, |
| 1300 | void* read_extraargs // parameter for fetch_callback, pf_req_callback, and pf_callback |
| 1301 | ) |
| 1302 | { |
| 1303 | pair_lock_type lock_type = may_modify_value ? PL_WRITE_EXPENSIVE : PL_READ; |
| 1304 | // We have separate parameters of read_extraargs and write_extraargs because |
| 1305 | // the lifetimes of the two parameters are different. write_extraargs may be used |
| 1306 | // long after this function call (e.g. after a flush to disk), whereas read_extraargs |
| 1307 | // will not be used after this function returns. As a result, the caller may allocate |
| 1308 | // read_extraargs on the stack, whereas write_extraargs must be allocated |
| 1309 | // on the heap. |
| 1310 | return toku_cachetable_get_and_pin_with_dep_pairs ( |
| 1311 | cachefile, |
| 1312 | key, |
| 1313 | fullhash, |
| 1314 | value, |
| 1315 | sizep, |
| 1316 | write_callback, |
| 1317 | fetch_callback, |
| 1318 | pf_req_callback, |
| 1319 | pf_callback, |
| 1320 | lock_type, |
| 1321 | read_extraargs, |
| 1322 | 0, // number of dependent pairs that we may need to checkpoint |
| 1323 | NULL, // array of dependent pairs |
| 1324 | NULL // array stating dirty/cleanness of dependent pairs |
| 1325 | ); |
| 1326 | } |
| 1327 | |
| 1328 | // Read a pair from a cachefile into memory using the pair's fetch callback |
| 1329 | // on entry, pair mutex (p->mutex) is NOT held, but pair is pinned |
| 1330 | static void cachetable_fetch_pair( |
| 1331 | CACHETABLE ct, |
| 1332 | CACHEFILE cf, |
| 1333 | PAIR p, |
| 1334 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
| 1335 | void* read_extraargs, |
| 1336 | bool keep_pair_locked |
| 1337 | ) |
| 1338 | { |
| 1339 | // helgrind |
| 1340 | CACHEKEY key = p->key; |
| 1341 | uint32_t fullhash = p->fullhash; |
| 1342 | |
| 1343 | void *toku_value = NULL; |
| 1344 | void *disk_data = NULL; |
| 1345 | PAIR_ATTR attr; |
| 1346 | |
| 1347 | // FIXME this should be enum cachetable_dirty, right? |
| 1348 | int dirty = 0; |
| 1349 | |
| 1350 | pair_lock(p); |
| 1351 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
| 1352 | pair_unlock(p); |
| 1353 | |
| 1354 | int r; |
| 1355 | r = fetch_callback(cf, p, cf->fd, key, fullhash, &toku_value, &disk_data, &attr, &dirty, read_extraargs); |
| 1356 | if (dirty) { |
| 1357 | p->dirty = CACHETABLE_DIRTY; |
| 1358 | } |
| 1359 | assert(r == 0); |
| 1360 | |
| 1361 | p->value_data = toku_value; |
| 1362 | p->disk_data = disk_data; |
| 1363 | p->attr = attr; |
| 1364 | ct->ev.add_pair_attr(attr); |
| 1365 | pair_lock(p); |
| 1366 | nb_mutex_unlock(&p->disk_nb_mutex); |
| 1367 | if (!keep_pair_locked) { |
| 1368 | p->value_rwlock.write_unlock(); |
| 1369 | } |
| 1370 | pair_unlock(p); |
| 1371 | } |
| 1372 | |
| 1373 | static bool get_checkpoint_pending(PAIR p, pair_list* pl) { |
| 1374 | bool checkpoint_pending = false; |
| 1375 | pl->read_pending_cheap_lock(); |
| 1376 | checkpoint_pending = p->checkpoint_pending; |
| 1377 | p->checkpoint_pending = false; |
| 1378 | pl->read_pending_cheap_unlock(); |
| 1379 | return checkpoint_pending; |
| 1380 | } |
| 1381 | |
| 1382 | static void checkpoint_pair_and_dependent_pairs( |
| 1383 | CACHETABLE ct, |
| 1384 | PAIR p, |
| 1385 | bool p_is_pending_checkpoint, |
| 1386 | uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint |
| 1387 | PAIR* dependent_pairs, |
| 1388 | bool* dependent_pairs_pending_checkpoint, |
| 1389 | enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs |
| 1390 | ) |
| 1391 | { |
| 1392 | |
| 1393 | // |
| 1394 | // A checkpoint must not begin while we are checking dependent pairs or pending bits. |
| 1395 | // Here is why. |
| 1396 | // |
| 1397 | // Now that we have all of the locks on the pairs we |
| 1398 | // care about, we can take care of the necessary checkpointing. |
| 1399 | // For each pair, we simply need to write the pair if it is |
| 1400 | // pending a checkpoint. If no pair is pending a checkpoint, |
| 1401 | // then all of this work will be done with the cachetable lock held, |
| 1402 | // so we don't need to worry about a checkpoint beginning |
| 1403 | // in the middle of any operation below. If some pair |
| 1404 | // is pending a checkpoint, then the checkpoint thread |
| 1405 | // will not complete its current checkpoint until it can |
| 1406 | // successfully grab a lock on the pending pair and |
| 1407 | // remove it from its list of pairs pending a checkpoint. |
| 1408 | // This cannot be done until we release the lock |
| 1409 | // that we have, which is not done in this function. |
| 1410 | // So, the point is, it is impossible for a checkpoint |
| 1411 | // to begin while we write any of these locked pairs |
| 1412 | // for checkpoint, even though writing a pair releases |
| 1413 | // the cachetable lock. |
| 1414 | // |
| 1415 | write_locked_pair_for_checkpoint(ct, p, p_is_pending_checkpoint); |
| 1416 | |
| 1417 | checkpoint_dependent_pairs( |
| 1418 | ct, |
| 1419 | num_dependent_pairs, |
| 1420 | dependent_pairs, |
| 1421 | dependent_pairs_pending_checkpoint, |
| 1422 | dependent_dirty |
| 1423 | ); |
| 1424 | } |
| 1425 | |
| 1426 | static void unpin_pair(PAIR p, bool read_lock_grabbed) { |
| 1427 | if (read_lock_grabbed) { |
| 1428 | p->value_rwlock.read_unlock(); |
| 1429 | } |
| 1430 | else { |
| 1431 | p->value_rwlock.write_unlock(); |
| 1432 | } |
| 1433 | } |
| 1434 | |
| 1435 | |
| 1436 | // on input, the pair's mutex is held, |
| 1437 | // on output, the pair's mutex is not held. |
| 1438 | // if true, we must try again, and pair is not pinned |
| 1439 | // if false, we succeeded, the pair is pinned |
| 1440 | static bool try_pin_pair( |
| 1441 | PAIR p, |
| 1442 | CACHETABLE ct, |
| 1443 | CACHEFILE cachefile, |
| 1444 | pair_lock_type lock_type, |
| 1445 | uint32_t num_dependent_pairs, |
| 1446 | PAIR* dependent_pairs, |
| 1447 | enum cachetable_dirty* dependent_dirty, |
| 1448 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
| 1449 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
| 1450 | void* read_extraargs, |
| 1451 | bool already_slept |
| 1452 | ) |
| 1453 | { |
| 1454 | bool dep_checkpoint_pending[num_dependent_pairs]; |
| 1455 | bool try_again = true; |
| 1456 | bool expensive = (lock_type == PL_WRITE_EXPENSIVE); |
| 1457 | if (lock_type != PL_READ) { |
| 1458 | p->value_rwlock.write_lock(expensive); |
| 1459 | } |
| 1460 | else { |
| 1461 | p->value_rwlock.read_lock(); |
| 1462 | } |
| 1463 | pair_touch(p); |
| 1464 | pair_unlock(p); |
| 1465 | |
| 1466 | bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); |
| 1467 | |
| 1468 | if (partial_fetch_required) { |
| 1469 | toku::context pf_ctx(CTX_PARTIAL_FETCH); |
| 1470 | |
| 1471 | if (ct->ev.should_client_thread_sleep() && !already_slept) { |
| 1472 | pair_lock(p); |
| 1473 | unpin_pair(p, (lock_type == PL_READ)); |
| 1474 | pair_unlock(p); |
| 1475 | try_again = true; |
| 1476 | goto exit; |
| 1477 | } |
| 1478 | if (ct->ev.should_client_wake_eviction_thread()) { |
| 1479 | ct->ev.signal_eviction_thread(); |
| 1480 | } |
| 1481 | // |
// Just because the PAIR exists does not necessarily mean that all the data the caller requires
// is in memory. A partial fetch may be required, which is evaluated above.
// If the variable is true, a partial fetch is required, so we must grab the PAIR's write lock
// and then call a callback to retrieve what we need.
| 1486 | // |
| 1487 | assert(partial_fetch_required); |
| 1488 | // As of Dr. No, only clean PAIRs may have pieces missing, |
| 1489 | // so we do a sanity check here. |
| 1490 | assert(!p->dirty); |
| 1491 | |
| 1492 | if (lock_type == PL_READ) { |
| 1493 | pair_lock(p); |
| 1494 | p->value_rwlock.read_unlock(); |
| 1495 | p->value_rwlock.write_lock(true); |
| 1496 | pair_unlock(p); |
| 1497 | } |
| 1498 | else if (lock_type == PL_WRITE_CHEAP) { |
| 1499 | pair_lock(p); |
| 1500 | p->value_rwlock.write_unlock(); |
| 1501 | p->value_rwlock.write_lock(true); |
| 1502 | pair_unlock(p); |
| 1503 | } |
| 1504 | |
| 1505 | partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); |
| 1506 | if (partial_fetch_required) { |
| 1507 | do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs, true); |
| 1508 | } |
| 1509 | if (lock_type == PL_READ) { |
| 1510 | // |
| 1511 | // TODO: Zardosht, somehow ensure that a partial eviction cannot happen |
| 1512 | // between these two calls |
| 1513 | // |
| 1514 | pair_lock(p); |
| 1515 | p->value_rwlock.write_unlock(); |
| 1516 | p->value_rwlock.read_lock(); |
| 1517 | pair_unlock(p); |
| 1518 | } |
| 1519 | else if (lock_type == PL_WRITE_CHEAP) { |
| 1520 | pair_lock(p); |
| 1521 | p->value_rwlock.write_unlock(); |
| 1522 | p->value_rwlock.write_lock(false); |
| 1523 | pair_unlock(p); |
| 1524 | } |
| 1525 | // small hack here for #5439, |
| 1526 | // for queries, pf_req_callback does some work for the caller, |
| 1527 | // that information may be out of date after a write_unlock |
| 1528 | // followed by a relock, so we do it again. |
| 1529 | bool pf_required = pf_req_callback(p->value_data,read_extraargs); |
| 1530 | assert(!pf_required); |
| 1531 | } |
| 1532 | |
| 1533 | if (lock_type != PL_READ) { |
| 1534 | ct->list.read_pending_cheap_lock(); |
| 1535 | bool p_checkpoint_pending = p->checkpoint_pending; |
| 1536 | p->checkpoint_pending = false; |
| 1537 | for (uint32_t i = 0; i < num_dependent_pairs; i++) { |
| 1538 | dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending; |
| 1539 | dependent_pairs[i]->checkpoint_pending = false; |
| 1540 | } |
| 1541 | ct->list.read_pending_cheap_unlock(); |
| 1542 | checkpoint_pair_and_dependent_pairs( |
| 1543 | ct, |
| 1544 | p, |
| 1545 | p_checkpoint_pending, |
| 1546 | num_dependent_pairs, |
| 1547 | dependent_pairs, |
| 1548 | dep_checkpoint_pending, |
| 1549 | dependent_dirty |
| 1550 | ); |
| 1551 | } |
| 1552 | |
| 1553 | try_again = false; |
| 1554 | exit: |
| 1555 | return try_again; |
| 1556 | } |
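// Summary of the lock hand-offs that try_pin_pair performs when a partial fetch is
// required (descriptive only; the authoritative behavior is the code above):
//   PL_READ:            read lock -> expensive write lock -> fetch -> read lock
//   PL_WRITE_CHEAP:     cheap write lock -> expensive write lock -> fetch -> cheap write lock
//   PL_WRITE_EXPENSIVE: already holds an expensive write lock, so no swap is needed
// Afterwards, pf_req_callback is re-run (see the #5439 note above) and is expected
// to report that no further fetch is needed.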
| 1557 | |
| 1558 | int toku_cachetable_get_and_pin_with_dep_pairs ( |
| 1559 | CACHEFILE cachefile, |
| 1560 | CACHEKEY key, |
| 1561 | uint32_t fullhash, |
| 1562 | void**value, |
| 1563 | long *sizep, |
| 1564 | CACHETABLE_WRITE_CALLBACK write_callback, |
| 1565 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
| 1566 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
| 1567 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
| 1568 | pair_lock_type lock_type, |
void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
| 1570 | uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint |
| 1571 | PAIR* dependent_pairs, |
| 1572 | enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs |
| 1573 | ) |
| 1574 | // See cachetable/cachetable.h |
| 1575 | { |
| 1576 | CACHETABLE ct = cachefile->cachetable; |
| 1577 | bool wait = false; |
| 1578 | bool already_slept = false; |
| 1579 | bool dep_checkpoint_pending[num_dependent_pairs]; |
| 1580 | |
| 1581 | // |
| 1582 | // If in the process of pinning the node we add data to the cachetable via a partial fetch |
| 1583 | // or a full fetch, we may need to first sleep because there is too much data in the |
// cachetable. In those cases, we set the bool wait to true and goto beginning, so that
| 1585 | // we can do our sleep and then restart the function. |
| 1586 | // |
| 1587 | beginning: |
| 1588 | if (wait) { |
| 1589 | // We shouldn't be holding the read list lock while |
| 1590 | // waiting for the evictor to remove pairs. |
| 1591 | already_slept = true; |
| 1592 | ct->ev.wait_for_cache_pressure_to_subside(); |
| 1593 | } |
| 1594 | |
| 1595 | ct->list.pair_lock_by_fullhash(fullhash); |
| 1596 | PAIR p = ct->list.find_pair(cachefile, key, fullhash); |
| 1597 | if (p) { |
| 1598 | // on entry, holds p->mutex (which is locked via pair_lock_by_fullhash) |
| 1599 | // on exit, does not hold p->mutex |
| 1600 | bool try_again = try_pin_pair( |
| 1601 | p, |
| 1602 | ct, |
| 1603 | cachefile, |
| 1604 | lock_type, |
| 1605 | num_dependent_pairs, |
| 1606 | dependent_pairs, |
| 1607 | dependent_dirty, |
| 1608 | pf_req_callback, |
| 1609 | pf_callback, |
| 1610 | read_extraargs, |
| 1611 | already_slept |
| 1612 | ); |
| 1613 | if (try_again) { |
| 1614 | wait = true; |
| 1615 | goto beginning; |
| 1616 | } |
| 1617 | else { |
| 1618 | goto got_value; |
| 1619 | } |
| 1620 | } |
| 1621 | else { |
| 1622 | toku::context fetch_ctx(CTX_FULL_FETCH); |
| 1623 | |
| 1624 | ct->list.pair_unlock_by_fullhash(fullhash); |
| 1625 | // we only want to sleep once per call to get_and_pin. If we have already |
| 1626 | // slept and there is still cache pressure, then we might as |
// well just complete the call, because the sleep did not help.
| 1628 | // By sleeping only once per get_and_pin, we prevent starvation and ensure |
| 1629 | // that we make progress (however slow) on each thread, which allows |
| 1630 | // assumptions of the form 'x will eventually happen'. |
| 1631 | // This happens in extreme scenarios. |
| 1632 | if (ct->ev.should_client_thread_sleep() && !already_slept) { |
| 1633 | wait = true; |
| 1634 | goto beginning; |
| 1635 | } |
| 1636 | if (ct->ev.should_client_wake_eviction_thread()) { |
| 1637 | ct->ev.signal_eviction_thread(); |
| 1638 | } |
| 1639 | // Since the pair was not found, we need the write list |
| 1640 | // lock to add it. So, we have to release the read list lock |
| 1641 | // first. |
| 1642 | ct->list.write_list_lock(); |
| 1643 | ct->list.pair_lock_by_fullhash(fullhash); |
| 1644 | p = ct->list.find_pair(cachefile, key, fullhash); |
| 1645 | if (p != NULL) { |
| 1646 | ct->list.write_list_unlock(); |
| 1647 | // on entry, holds p->mutex, |
| 1648 | // on exit, does not hold p->mutex |
| 1649 | bool try_again = try_pin_pair( |
| 1650 | p, |
| 1651 | ct, |
| 1652 | cachefile, |
| 1653 | lock_type, |
| 1654 | num_dependent_pairs, |
| 1655 | dependent_pairs, |
| 1656 | dependent_dirty, |
| 1657 | pf_req_callback, |
| 1658 | pf_callback, |
| 1659 | read_extraargs, |
| 1660 | already_slept |
| 1661 | ); |
| 1662 | if (try_again) { |
| 1663 | wait = true; |
| 1664 | goto beginning; |
| 1665 | } |
| 1666 | else { |
| 1667 | goto got_value; |
| 1668 | } |
| 1669 | } |
| 1670 | assert(p == NULL); |
| 1671 | |
| 1672 | // Insert a PAIR into the cachetable |
| 1673 | // NOTE: At this point we still have the write list lock held. |
| 1674 | p = cachetable_insert_at( |
| 1675 | ct, |
| 1676 | cachefile, |
| 1677 | key, |
| 1678 | zero_value, |
| 1679 | fullhash, |
| 1680 | zero_attr, |
| 1681 | write_callback, |
| 1682 | CACHETABLE_CLEAN |
| 1683 | ); |
| 1684 | invariant_notnull(p); |
| 1685 | |
| 1686 | // Pin the pair. |
| 1687 | p->value_rwlock.write_lock(true); |
| 1688 | pair_unlock(p); |
| 1689 | |
| 1690 | |
| 1691 | if (lock_type != PL_READ) { |
| 1692 | ct->list.read_pending_cheap_lock(); |
| 1693 | invariant(!p->checkpoint_pending); |
| 1694 | for (uint32_t i = 0; i < num_dependent_pairs; i++) { |
| 1695 | dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending; |
| 1696 | dependent_pairs[i]->checkpoint_pending = false; |
| 1697 | } |
| 1698 | ct->list.read_pending_cheap_unlock(); |
| 1699 | } |
| 1700 | // We should release the lock before we perform |
| 1701 | // these expensive operations. |
| 1702 | ct->list.write_list_unlock(); |
| 1703 | |
| 1704 | if (lock_type != PL_READ) { |
| 1705 | checkpoint_dependent_pairs( |
| 1706 | ct, |
| 1707 | num_dependent_pairs, |
| 1708 | dependent_pairs, |
| 1709 | dep_checkpoint_pending, |
| 1710 | dependent_dirty |
| 1711 | ); |
| 1712 | } |
| 1713 | uint64_t t0 = get_tnow(); |
| 1714 | |
| 1715 | // Retrieve the value of the PAIR from disk. |
| 1716 | // The pair being fetched will be marked as pending if a checkpoint happens during the |
| 1717 | // fetch because begin_checkpoint will mark as pending any pair that is locked even if it is clean. |
| 1718 | cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs, true); |
| 1719 | cachetable_miss++; |
| 1720 | cachetable_misstime += get_tnow() - t0; |
| 1721 | |
| 1722 | // If the lock_type requested was a PL_READ, we downgrade to PL_READ, |
| 1723 | // but if the request was for a PL_WRITE_CHEAP, we don't bother |
| 1724 | // downgrading, because we would have to possibly resolve the |
| 1725 | // checkpointing again, and that would just make this function even |
| 1726 | // messier. |
| 1727 | // |
| 1728 | // TODO(yoni): in case of PL_WRITE_CHEAP, write and use |
| 1729 | // p->value_rwlock.write_change_status_to_not_expensive(); (Also name it better) |
| 1730 | // to downgrade from an expensive write lock to a cheap one |
| 1731 | if (lock_type == PL_READ) { |
| 1732 | pair_lock(p); |
| 1733 | p->value_rwlock.write_unlock(); |
| 1734 | p->value_rwlock.read_lock(); |
| 1735 | pair_unlock(p); |
| 1736 | // small hack here for #5439, |
| 1737 | // for queries, pf_req_callback does some work for the caller, |
| 1738 | // that information may be out of date after a write_unlock |
| 1739 | // followed by a read_lock, so we do it again. |
| 1740 | bool pf_required = pf_req_callback(p->value_data,read_extraargs); |
| 1741 | assert(!pf_required); |
| 1742 | } |
| 1743 | goto got_value; |
| 1744 | } |
| 1745 | got_value: |
| 1746 | *value = p->value_data; |
| 1747 | if (sizep) *sizep = p->attr.size; |
| 1748 | return 0; |
| 1749 | } |
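// Illustrative caller pattern for the function above (a sketch only; the callback
// names, the write-callback struct "wc", and "my_extra" are hypothetical
// placeholders, not identifiers from this codebase):
//
//   uint32_t fullhash = toku_cachetable_hash(cf, key);
//   void *node = NULL;
//   long size = 0;
//   int r = toku_cachetable_get_and_pin_with_dep_pairs(
//       cf, key, fullhash, &node, &size,
//       wc,                          // CACHETABLE_WRITE_CALLBACK assembled by the caller
//       my_fetch_cb, my_pf_req_cb, my_pf_cb,
//       PL_WRITE_CHEAP,
//       my_extra,                    // passed through to the fetch/pf callbacks
//       1, &parent_pair, &parent_dirty);
//   assert_zero(r);                  // this function always returns 0
//   // ... use node, then unpin the corresponding PAIR with toku_cachetable_unpin ...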
| 1750 | |
// Lookup a key in the cachetable. If it is found and it is not being written, then
// acquire a read lock on the pair, update the LRU list, and return success.
//
// However, if the page is clean or has a checkpoint pending, don't return success.
// This will minimize the number of dirty nodes.
// Rationale: maybe_get_and_pin is used when the system has an alternative to modifying a node.
// In the context of checkpointing, we don't want to gratuitously dirty a page, because doing so causes an I/O.
// For example, if we can either modify a bit in a dirty parent or modify a bit in a clean child, then we should modify
// the dirty parent (which will have to do I/O eventually anyway) rather than incur a full block write to modify one bit.
| 1760 | // Similarly, if the checkpoint is actually pending, we don't want to block on it. |
| 1761 | int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) { |
| 1762 | CACHETABLE ct = cachefile->cachetable; |
| 1763 | int r = -1; |
| 1764 | ct->list.pair_lock_by_fullhash(fullhash); |
| 1765 | PAIR p = ct->list.find_pair(cachefile, key, fullhash); |
| 1766 | if (p) { |
| 1767 | const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE); |
| 1768 | bool got_lock = false; |
| 1769 | switch (lock_type) { |
| 1770 | case PL_READ: |
| 1771 | if (p->value_rwlock.try_read_lock()) { |
| 1772 | got_lock = p->dirty; |
| 1773 | |
| 1774 | if (!got_lock) { |
| 1775 | p->value_rwlock.read_unlock(); |
| 1776 | } |
| 1777 | } |
| 1778 | break; |
| 1779 | case PL_WRITE_CHEAP: |
| 1780 | case PL_WRITE_EXPENSIVE: |
| 1781 | if (p->value_rwlock.try_write_lock(lock_is_expensive)) { |
| 1782 | // we got the lock fast, so continue |
| 1783 | ct->list.read_pending_cheap_lock(); |
| 1784 | |
| 1785 | // if pending a checkpoint, then we don't want to return |
| 1786 | // the value to the user, because we are responsible for |
| 1787 | // handling the checkpointing, which we do not want to do, |
| 1788 | // because it is expensive |
| 1789 | got_lock = p->dirty && !p->checkpoint_pending; |
| 1790 | |
| 1791 | ct->list.read_pending_cheap_unlock(); |
| 1792 | if (!got_lock) { |
| 1793 | p->value_rwlock.write_unlock(); |
| 1794 | } |
| 1795 | } |
| 1796 | break; |
| 1797 | } |
| 1798 | if (got_lock) { |
| 1799 | pair_touch(p); |
| 1800 | *value = p->value_data; |
| 1801 | r = 0; |
| 1802 | } |
| 1803 | } |
| 1804 | ct->list.pair_unlock_by_fullhash(fullhash); |
| 1805 | return r; |
| 1806 | } |
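// Sketch of the intended opportunistic-pin pattern (hypothetical caller code;
// only the return-value convention comes from the function above):
//
//   void *node;
//   if (toku_cachetable_maybe_get_and_pin(cf, key, fullhash, PL_WRITE_CHEAP, &node) == 0) {
//       // The node was already dirty and not checkpoint-pending: cheap to modify.
//       // ... modify node, then unpin its PAIR ...
//   } else {
//       // Pinning would dirty a clean node or block on a checkpoint;
//       // fall back to the alternative that avoids touching this node.
//   }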
| 1807 | |
// Used by flusher threads to possibly pin a child on the client thread if pinning is cheap.
// Same as toku_cachetable_maybe_get_and_pin except that we don't care whether the node is clean or dirty (return the node regardless).
// All other conditions remain the same.
| 1811 | int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) { |
| 1812 | CACHETABLE ct = cachefile->cachetable; |
| 1813 | int r = -1; |
| 1814 | ct->list.pair_lock_by_fullhash(fullhash); |
| 1815 | PAIR p = ct->list.find_pair(cachefile, key, fullhash); |
| 1816 | if (p) { |
| 1817 | const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE); |
| 1818 | bool got_lock = false; |
| 1819 | switch (lock_type) { |
| 1820 | case PL_READ: |
| 1821 | if (p->value_rwlock.try_read_lock()) { |
| 1822 | got_lock = true; |
| 1823 | } else if (!p->value_rwlock.read_lock_is_expensive()) { |
| 1824 | p->value_rwlock.write_lock(lock_is_expensive); |
| 1825 | got_lock = true; |
| 1826 | } |
| 1827 | if (got_lock) { |
| 1828 | pair_touch(p); |
| 1829 | } |
| 1830 | pair_unlock(p); |
| 1831 | break; |
| 1832 | case PL_WRITE_CHEAP: |
| 1833 | case PL_WRITE_EXPENSIVE: |
| 1834 | if (p->value_rwlock.try_write_lock(lock_is_expensive)) { |
| 1835 | got_lock = true; |
| 1836 | } else if (!p->value_rwlock.write_lock_is_expensive()) { |
| 1837 | p->value_rwlock.write_lock(lock_is_expensive); |
| 1838 | got_lock = true; |
| 1839 | } |
| 1840 | if (got_lock) { |
| 1841 | pair_touch(p); |
| 1842 | } |
| 1843 | pair_unlock(p); |
| 1844 | if (got_lock) { |
| 1845 | bool checkpoint_pending = get_checkpoint_pending(p, &ct->list); |
| 1846 | write_locked_pair_for_checkpoint(ct, p, checkpoint_pending); |
| 1847 | } |
| 1848 | break; |
| 1849 | } |
| 1850 | if (got_lock) { |
| 1851 | *value = p->value_data; |
| 1852 | r = 0; |
| 1853 | } |
| 1854 | } else { |
| 1855 | ct->list.pair_unlock_by_fullhash(fullhash); |
| 1856 | } |
| 1857 | return r; |
| 1858 | } |
| 1859 | |
| 1860 | // |
| 1861 | // internal function to unpin a PAIR. |
// As of Clayface, this may be called in two ways:
| 1863 | // - with flush false |
| 1864 | // - with flush true |
| 1865 | // The first is for when this is run during run_unlockers in |
| 1866 | // toku_cachetable_get_and_pin_nonblocking, the second is during |
| 1867 | // normal operations. Only during normal operations do we want to possibly |
| 1868 | // induce evictions or sleep. |
| 1869 | // |
| 1870 | static int |
| 1871 | cachetable_unpin_internal( |
| 1872 | CACHEFILE cachefile, |
| 1873 | PAIR p, |
| 1874 | enum cachetable_dirty dirty, |
| 1875 | PAIR_ATTR attr, |
| 1876 | bool flush |
| 1877 | ) |
| 1878 | { |
| 1879 | invariant_notnull(p); |
| 1880 | |
| 1881 | CACHETABLE ct = cachefile->cachetable; |
| 1882 | bool added_data_to_cachetable = false; |
| 1883 | |
| 1884 | // hack for #3969, only exists in case where we run unlockers |
| 1885 | pair_lock(p); |
| 1886 | PAIR_ATTR old_attr = p->attr; |
| 1887 | PAIR_ATTR new_attr = attr; |
| 1888 | if (dirty) { |
| 1889 | p->dirty = CACHETABLE_DIRTY; |
| 1890 | } |
| 1891 | if (attr.is_valid) { |
| 1892 | p->attr = attr; |
| 1893 | } |
| 1894 | bool read_lock_grabbed = p->value_rwlock.readers() != 0; |
| 1895 | unpin_pair(p, read_lock_grabbed); |
| 1896 | pair_unlock(p); |
| 1897 | |
| 1898 | if (attr.is_valid) { |
| 1899 | if (new_attr.size > old_attr.size) { |
| 1900 | added_data_to_cachetable = true; |
| 1901 | } |
| 1902 | ct->ev.change_pair_attr(old_attr, new_attr); |
| 1903 | } |
| 1904 | |
| 1905 | // see comments above this function to understand this code |
| 1906 | if (flush && added_data_to_cachetable) { |
| 1907 | if (ct->ev.should_client_thread_sleep()) { |
| 1908 | ct->ev.wait_for_cache_pressure_to_subside(); |
| 1909 | } |
| 1910 | if (ct->ev.should_client_wake_eviction_thread()) { |
| 1911 | ct->ev.signal_eviction_thread(); |
| 1912 | } |
| 1913 | } |
| 1914 | return 0; |
| 1915 | } |
| 1916 | |
| 1917 | int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) { |
| 1918 | return cachetable_unpin_internal(cachefile, p, dirty, attr, true); |
| 1919 | } |
| 1920 | int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) { |
| 1921 | return cachetable_unpin_internal(cachefile, p, dirty, attr, false); |
| 1922 | } |
| 1923 | |
| 1924 | static void |
| 1925 | run_unlockers (UNLOCKERS unlockers) { |
| 1926 | while (unlockers) { |
| 1927 | assert(unlockers->locked); |
| 1928 | unlockers->locked = false; |
| 1929 | unlockers->f(unlockers->extra); |
| 1930 | unlockers=unlockers->next; |
| 1931 | } |
| 1932 | } |
| 1933 | |
| 1934 | // |
| 1935 | // This function tries to pin the pair without running the unlockers. |
| 1936 | // If it can pin the pair cheaply, it does so, and returns 0. |
| 1937 | // If the pin will be expensive, it runs unlockers, |
| 1938 | // pins the pair, then releases the pin, |
| 1939 | // and then returns TOKUDB_TRY_AGAIN |
| 1940 | // |
| 1941 | // on entry, pair mutex is held, |
| 1942 | // on exit, pair mutex is NOT held |
| 1943 | static int |
| 1944 | maybe_pin_pair( |
| 1945 | PAIR p, |
| 1946 | pair_lock_type lock_type, |
| 1947 | UNLOCKERS unlockers |
| 1948 | ) |
| 1949 | { |
| 1950 | int retval = 0; |
| 1951 | bool expensive = (lock_type == PL_WRITE_EXPENSIVE); |
| 1952 | |
// We are about to pin the PAIR. In each case, we check to see
// if acquiring the pin is expensive. If so, we run the unlockers, set the
// retval to TOKUDB_TRY_AGAIN, and pin AND release the PAIR.
// If not, then we pin the PAIR, keep retval at 0, and do not
// run the unlockers, as we intend to return the value to the user.
| 1958 | if (lock_type == PL_READ) { |
| 1959 | if (p->value_rwlock.read_lock_is_expensive()) { |
| 1960 | pair_add_ref_unlocked(p); |
| 1961 | pair_unlock(p); |
| 1962 | run_unlockers(unlockers); |
| 1963 | retval = TOKUDB_TRY_AGAIN; |
| 1964 | pair_lock(p); |
| 1965 | pair_release_ref_unlocked(p); |
| 1966 | } |
| 1967 | p->value_rwlock.read_lock(); |
| 1968 | } |
| 1969 | else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP){ |
| 1970 | if (p->value_rwlock.write_lock_is_expensive()) { |
| 1971 | pair_add_ref_unlocked(p); |
| 1972 | pair_unlock(p); |
| 1973 | run_unlockers(unlockers); |
| 1974 | // change expensive to false because |
// we will unpin the pair immediately
| 1976 | // after pinning it |
| 1977 | expensive = false; |
| 1978 | retval = TOKUDB_TRY_AGAIN; |
| 1979 | pair_lock(p); |
| 1980 | pair_release_ref_unlocked(p); |
| 1981 | } |
| 1982 | p->value_rwlock.write_lock(expensive); |
| 1983 | } |
| 1984 | else { |
| 1985 | abort(); |
| 1986 | } |
| 1987 | |
| 1988 | if (retval == TOKUDB_TRY_AGAIN) { |
| 1989 | unpin_pair(p, (lock_type == PL_READ)); |
| 1990 | } |
| 1991 | pair_touch(p); |
| 1992 | pair_unlock(p); |
| 1993 | return retval; |
| 1994 | } |
| 1995 | |
| 1996 | int toku_cachetable_get_and_pin_nonblocking( |
| 1997 | CACHEFILE cf, |
| 1998 | CACHEKEY key, |
| 1999 | uint32_t fullhash, |
| 2000 | void**value, |
| 2001 | long* UU(sizep), |
| 2002 | CACHETABLE_WRITE_CALLBACK write_callback, |
| 2003 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
| 2004 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
| 2005 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
| 2006 | pair_lock_type lock_type, |
void *read_extraargs,
| 2008 | UNLOCKERS unlockers |
| 2009 | ) |
| 2010 | // See cachetable/cachetable.h. |
| 2011 | { |
| 2012 | CACHETABLE ct = cf->cachetable; |
| 2013 | assert(lock_type == PL_READ || |
| 2014 | lock_type == PL_WRITE_CHEAP || |
| 2015 | lock_type == PL_WRITE_EXPENSIVE |
| 2016 | ); |
| 2017 | try_again: |
| 2018 | ct->list.pair_lock_by_fullhash(fullhash); |
| 2019 | PAIR p = ct->list.find_pair(cf, key, fullhash); |
| 2020 | if (p == NULL) { |
| 2021 | toku::context fetch_ctx(CTX_FULL_FETCH); |
| 2022 | |
| 2023 | // Not found |
| 2024 | ct->list.pair_unlock_by_fullhash(fullhash); |
| 2025 | ct->list.write_list_lock(); |
| 2026 | ct->list.pair_lock_by_fullhash(fullhash); |
| 2027 | p = ct->list.find_pair(cf, key, fullhash); |
| 2028 | if (p != NULL) { |
// we just did another search with the write list lock and
// found the pair. This means that in between our
// releasing the read list lock and grabbing the write list lock,
// another thread snuck in and inserted the PAIR into
// the cachetable. For simplicity, we just return
// to the top and restart the function.
| 2035 | ct->list.write_list_unlock(); |
| 2036 | ct->list.pair_unlock_by_fullhash(fullhash); |
| 2037 | goto try_again; |
| 2038 | } |
| 2039 | |
| 2040 | p = cachetable_insert_at( |
| 2041 | ct, |
| 2042 | cf, |
| 2043 | key, |
| 2044 | zero_value, |
| 2045 | fullhash, |
| 2046 | zero_attr, |
| 2047 | write_callback, |
| 2048 | CACHETABLE_CLEAN |
| 2049 | ); |
| 2050 | assert(p); |
| 2051 | // grab expensive write lock, because we are about to do a fetch |
| 2052 | // off disk |
| 2053 | // No one can access this pair because |
| 2054 | // we hold the write list lock and we just injected |
| 2055 | // the pair into the cachetable. Therefore, this lock acquisition |
| 2056 | // will not block. |
| 2057 | p->value_rwlock.write_lock(true); |
| 2058 | pair_unlock(p); |
| 2059 | run_unlockers(unlockers); // we hold the write list_lock. |
| 2060 | ct->list.write_list_unlock(); |
| 2061 | |
| 2062 | // at this point, only the pair is pinned, |
| 2063 | // and no pair mutex held, and |
| 2064 | // no list lock is held |
| 2065 | uint64_t t0 = get_tnow(); |
| 2066 | cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, false); |
| 2067 | cachetable_miss++; |
| 2068 | cachetable_misstime += get_tnow() - t0; |
| 2069 | |
| 2070 | if (ct->ev.should_client_thread_sleep()) { |
| 2071 | ct->ev.wait_for_cache_pressure_to_subside(); |
| 2072 | } |
| 2073 | if (ct->ev.should_client_wake_eviction_thread()) { |
| 2074 | ct->ev.signal_eviction_thread(); |
| 2075 | } |
| 2076 | |
| 2077 | return TOKUDB_TRY_AGAIN; |
| 2078 | } |
| 2079 | else { |
| 2080 | int r = maybe_pin_pair(p, lock_type, unlockers); |
| 2081 | if (r == TOKUDB_TRY_AGAIN) { |
| 2082 | return TOKUDB_TRY_AGAIN; |
| 2083 | } |
| 2084 | assert_zero(r); |
| 2085 | |
| 2086 | if (lock_type != PL_READ) { |
| 2087 | bool checkpoint_pending = get_checkpoint_pending(p, &ct->list); |
| 2088 | write_locked_pair_for_checkpoint(ct, p, checkpoint_pending); |
| 2089 | } |
| 2090 | |
| 2091 | // At this point, we have pinned the PAIR |
| 2092 | // and resolved its checkpointing. The pair's |
| 2093 | // mutex is not held. The read list lock IS held. Before |
| 2094 | // returning the PAIR to the user, we must |
| 2095 | // still check for partial fetch |
| 2096 | bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); |
| 2097 | if (partial_fetch_required) { |
| 2098 | toku::context fetch_ctx(CTX_PARTIAL_FETCH); |
| 2099 | |
| 2100 | run_unlockers(unlockers); |
| 2101 | |
| 2102 | // we are now getting an expensive write lock, because we |
| 2103 | // are doing a partial fetch. So, if we previously have |
| 2104 | // either a read lock or a cheap write lock, we need to |
| 2105 | // release and reacquire the correct lock type |
| 2106 | if (lock_type == PL_READ) { |
| 2107 | pair_lock(p); |
| 2108 | p->value_rwlock.read_unlock(); |
| 2109 | p->value_rwlock.write_lock(true); |
| 2110 | pair_unlock(p); |
| 2111 | } |
| 2112 | else if (lock_type == PL_WRITE_CHEAP) { |
| 2113 | pair_lock(p); |
| 2114 | p->value_rwlock.write_unlock(); |
| 2115 | p->value_rwlock.write_lock(true); |
| 2116 | pair_unlock(p); |
| 2117 | } |
| 2118 | |
| 2119 | // Now wait for the I/O to occur. |
| 2120 | partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); |
| 2121 | if (partial_fetch_required) { |
| 2122 | do_partial_fetch(ct, cf, p, pf_callback, read_extraargs, false); |
| 2123 | } |
| 2124 | else { |
| 2125 | pair_lock(p); |
| 2126 | p->value_rwlock.write_unlock(); |
| 2127 | pair_unlock(p); |
| 2128 | } |
| 2129 | |
| 2130 | if (ct->ev.should_client_thread_sleep()) { |
| 2131 | ct->ev.wait_for_cache_pressure_to_subside(); |
| 2132 | } |
| 2133 | if (ct->ev.should_client_wake_eviction_thread()) { |
| 2134 | ct->ev.signal_eviction_thread(); |
| 2135 | } |
| 2136 | |
| 2137 | return TOKUDB_TRY_AGAIN; |
| 2138 | } |
| 2139 | else { |
| 2140 | *value = p->value_data; |
| 2141 | return 0; |
| 2142 | } |
| 2143 | } |
| 2144 | // We should not get here. Above code should hit a return in all cases. |
| 2145 | abort(); |
| 2146 | } |
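// Illustrative retry loop for the nonblocking pin above (a sketch; the unlocker
// callback, "wc", and the caller-side locks are hypothetical):
//
//   while (true) {
//       // ... acquire the caller-side locks that the unlockers chain knows
//       //     how to release (fields: locked, f, extra, next) ...
//       int r = toku_cachetable_get_and_pin_nonblocking(
//           cf, key, fullhash, &node, NULL, wc,
//           fetch_cb, pf_req_cb, pf_cb, PL_READ, read_extra, unlockers);
//       if (r == 0) {
//           break;   // pinned; the unlockers were never run, caller-side locks still held
//       }
//       assert(r == TOKUDB_TRY_AGAIN);  // unlockers were run, so reacquire and retry
//   }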
| 2147 | |
| 2148 | struct cachefile_prefetch_args { |
| 2149 | PAIR p; |
| 2150 | CACHETABLE_FETCH_CALLBACK fetch_callback; |
void* read_extraargs;
| 2152 | }; |
| 2153 | |
| 2154 | struct cachefile_partial_prefetch_args { |
| 2155 | PAIR p; |
| 2156 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback; |
void *read_extraargs;
| 2158 | }; |
| 2159 | |
| 2160 | // Worker thread function to read a pair from a cachefile to memory |
static void cachetable_reader(void* extra) {
| 2162 | struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args*)extra; |
| 2163 | CACHEFILE cf = cpargs->p->cachefile; |
| 2164 | CACHETABLE ct = cf->cachetable; |
| 2165 | cachetable_fetch_pair( |
| 2166 | ct, |
| 2167 | cpargs->p->cachefile, |
| 2168 | cpargs->p, |
| 2169 | cpargs->fetch_callback, |
| 2170 | cpargs->read_extraargs, |
| 2171 | false |
| 2172 | ); |
| 2173 | bjm_remove_background_job(cf->bjm); |
| 2174 | toku_free(cpargs); |
| 2175 | } |
| 2176 | |
static void cachetable_partial_reader(void* extra) {
| 2178 | struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args*)extra; |
| 2179 | CACHEFILE cf = cpargs->p->cachefile; |
| 2180 | CACHETABLE ct = cf->cachetable; |
| 2181 | do_partial_fetch(ct, cpargs->p->cachefile, cpargs->p, cpargs->pf_callback, cpargs->read_extraargs, false); |
| 2182 | bjm_remove_background_job(cf->bjm); |
| 2183 | toku_free(cpargs); |
| 2184 | } |
| 2185 | |
| 2186 | int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash, |
| 2187 | CACHETABLE_WRITE_CALLBACK write_callback, |
| 2188 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
| 2189 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
| 2190 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
void *read_extraargs,
| 2192 | bool *doing_prefetch) |
| 2193 | // Effect: See the documentation for this function in cachetable/cachetable.h |
| 2194 | { |
| 2195 | int r = 0; |
| 2196 | PAIR p = NULL; |
| 2197 | if (doing_prefetch) { |
| 2198 | *doing_prefetch = false; |
| 2199 | } |
| 2200 | CACHETABLE ct = cf->cachetable; |
| 2201 | // if cachetable has too much data, don't bother prefetching |
| 2202 | if (ct->ev.should_client_thread_sleep()) { |
| 2203 | goto exit; |
| 2204 | } |
| 2205 | ct->list.pair_lock_by_fullhash(fullhash); |
| 2206 | // lookup |
| 2207 | p = ct->list.find_pair(cf, key, fullhash); |
| 2208 | // if not found then create a pair and fetch it |
| 2209 | if (p == NULL) { |
| 2210 | cachetable_prefetches++; |
| 2211 | ct->list.pair_unlock_by_fullhash(fullhash); |
| 2212 | ct->list.write_list_lock(); |
| 2213 | ct->list.pair_lock_by_fullhash(fullhash); |
| 2214 | p = ct->list.find_pair(cf, key, fullhash); |
| 2215 | if (p != NULL) { |
| 2216 | ct->list.write_list_unlock(); |
| 2217 | goto found_pair; |
| 2218 | } |
| 2219 | |
| 2220 | r = bjm_add_background_job(cf->bjm); |
| 2221 | assert_zero(r); |
| 2222 | p = cachetable_insert_at( |
| 2223 | ct, |
| 2224 | cf, |
| 2225 | key, |
| 2226 | zero_value, |
| 2227 | fullhash, |
| 2228 | zero_attr, |
| 2229 | write_callback, |
| 2230 | CACHETABLE_CLEAN |
| 2231 | ); |
| 2232 | assert(p); |
| 2233 | p->value_rwlock.write_lock(true); |
| 2234 | pair_unlock(p); |
| 2235 | ct->list.write_list_unlock(); |
| 2236 | |
| 2237 | struct cachefile_prefetch_args *MALLOC(cpargs); |
| 2238 | cpargs->p = p; |
| 2239 | cpargs->fetch_callback = fetch_callback; |
| 2240 | cpargs->read_extraargs = read_extraargs; |
| 2241 | toku_kibbutz_enq(ct->ct_kibbutz, cachetable_reader, cpargs); |
| 2242 | if (doing_prefetch) { |
| 2243 | *doing_prefetch = true; |
| 2244 | } |
| 2245 | goto exit; |
| 2246 | } |
| 2247 | |
| 2248 | found_pair: |
| 2249 | // at this point, p is found, pair's mutex is grabbed, and |
| 2250 | // no list lock is held |
// TODO(leif): should this also just go ahead and wait if all we
// would be waiting for are readers?
| 2253 | if (p->value_rwlock.try_write_lock(true)) { |
| 2254 | // nobody else is using the node, so we should go ahead and prefetch |
| 2255 | pair_touch(p); |
| 2256 | pair_unlock(p); |
| 2257 | bool partial_fetch_required = pf_req_callback(p->value_data, read_extraargs); |
| 2258 | |
| 2259 | if (partial_fetch_required) { |
| 2260 | r = bjm_add_background_job(cf->bjm); |
| 2261 | assert_zero(r); |
| 2262 | struct cachefile_partial_prefetch_args *MALLOC(cpargs); |
| 2263 | cpargs->p = p; |
| 2264 | cpargs->pf_callback = pf_callback; |
| 2265 | cpargs->read_extraargs = read_extraargs; |
| 2266 | toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_reader, cpargs); |
| 2267 | if (doing_prefetch) { |
| 2268 | *doing_prefetch = true; |
| 2269 | } |
| 2270 | } |
| 2271 | else { |
| 2272 | pair_lock(p); |
| 2273 | p->value_rwlock.write_unlock(); |
| 2274 | pair_unlock(p); |
| 2275 | } |
| 2276 | } |
| 2277 | else { |
| 2278 | // Couldn't get the write lock cheaply |
| 2279 | pair_unlock(p); |
| 2280 | } |
| 2281 | exit: |
| 2282 | return 0; |
| 2283 | } |
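// Sketch of how a caller might issue a prefetch (hypothetical names for "wc",
// the callbacks, and the extra argument):
//
//   bool doing_prefetch = false;
//   toku_cachefile_prefetch(cf, key, fullhash, wc,
//                           fetch_cb, pf_req_cb, pf_cb,
//                           read_extra, &doing_prefetch);
//   // doing_prefetch == true means a background job was queued on ct->ct_kibbutz;
//   // false means the pair was already fully in memory, someone else holds it,
//   // or the cachetable is under too much pressure to bother.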
| 2284 | |
| 2285 | void toku_cachefile_verify (CACHEFILE cf) { |
| 2286 | toku_cachetable_verify(cf->cachetable); |
| 2287 | } |
| 2288 | |
| 2289 | void toku_cachetable_verify (CACHETABLE ct) { |
| 2290 | ct->list.verify(); |
| 2291 | } |
| 2292 | |
| 2293 | |
| 2294 | |
| 2295 | struct pair_flush_for_close{ |
| 2296 | PAIR p; |
| 2297 | BACKGROUND_JOB_MANAGER bjm; |
| 2298 | }; |
| 2299 | |
static void cachetable_flush_pair_for_close(void* extra) {
| 2301 | struct pair_flush_for_close *CAST_FROM_VOIDP(args, extra); |
| 2302 | PAIR p = args->p; |
| 2303 | CACHEFILE cf = p->cachefile; |
| 2304 | CACHETABLE ct = cf->cachetable; |
| 2305 | PAIR_ATTR attr; |
| 2306 | cachetable_only_write_locked_data( |
| 2307 | &ct->ev, |
| 2308 | p, |
| 2309 | false, // not for a checkpoint, as we assert above |
| 2310 | &attr, |
| 2311 | false // not a clone |
| 2312 | ); |
| 2313 | p->dirty = CACHETABLE_CLEAN; |
| 2314 | bjm_remove_background_job(args->bjm); |
| 2315 | toku_free(args); |
| 2316 | } |
| 2317 | |
| 2318 | |
| 2319 | static void flush_pair_for_close_on_background_thread( |
| 2320 | PAIR p, |
| 2321 | BACKGROUND_JOB_MANAGER bjm, |
| 2322 | CACHETABLE ct |
| 2323 | ) |
| 2324 | { |
| 2325 | pair_lock(p); |
| 2326 | assert(p->value_rwlock.users() == 0); |
| 2327 | assert(nb_mutex_users(&p->disk_nb_mutex) == 0); |
| 2328 | assert(!p->cloned_value_data); |
| 2329 | if (p->dirty == CACHETABLE_DIRTY) { |
| 2330 | int r = bjm_add_background_job(bjm); |
| 2331 | assert_zero(r); |
| 2332 | struct pair_flush_for_close *XMALLOC(args); |
| 2333 | args->p = p; |
| 2334 | args->bjm = bjm; |
| 2335 | toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args); |
| 2336 | } |
| 2337 | pair_unlock(p); |
| 2338 | } |
| 2339 | |
| 2340 | static void remove_pair_for_close(PAIR p, CACHETABLE ct, bool completely) { |
| 2341 | pair_lock(p); |
| 2342 | assert(p->value_rwlock.users() == 0); |
| 2343 | assert(nb_mutex_users(&p->disk_nb_mutex) == 0); |
| 2344 | assert(!p->cloned_value_data); |
| 2345 | assert(p->dirty == CACHETABLE_CLEAN); |
| 2346 | assert(p->refcount == 0); |
| 2347 | if (completely) { |
| 2348 | cachetable_remove_pair(&ct->list, &ct->ev, p); |
| 2349 | pair_unlock(p); |
| 2350 | // TODO: Eventually, we should not hold the write list lock during free |
| 2351 | cachetable_free_pair(p); |
| 2352 | } |
| 2353 | else { |
// if we are not evicting completely,
// we only want to remove the PAIR from the cachetable,
// that is, remove it from the hashtable and the various linked
// lists, but we will keep the PAIRs and the linked list
// in the cachefile intact, as they will be cached away
// in case an open comes soon.
| 2360 | ct->list.evict_from_cachetable(p); |
| 2361 | pair_unlock(p); |
| 2362 | } |
| 2363 | } |
| 2364 | |
// helper function for cachetable_flush_cachefile, which happens on a close.
// Writes out the dirty pairs on background threads and returns when
// the writing is done.
| 2368 | static void write_dirty_pairs_for_close(CACHETABLE ct, CACHEFILE cf) { |
| 2369 | BACKGROUND_JOB_MANAGER bjm = NULL; |
| 2370 | bjm_init(&bjm); |
| 2371 | ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here |
| 2372 | PAIR p = NULL; |
| 2373 | // write out dirty PAIRs |
| 2374 | uint32_t i; |
| 2375 | if (cf) { |
| 2376 | for (i = 0, p = cf->cf_head; |
| 2377 | i < cf->num_pairs; |
| 2378 | i++, p = p->cf_next) |
| 2379 | { |
| 2380 | flush_pair_for_close_on_background_thread(p, bjm, ct); |
| 2381 | } |
| 2382 | } |
| 2383 | else { |
| 2384 | for (i = 0, p = ct->list.m_checkpoint_head; |
| 2385 | i < ct->list.m_n_in_table; |
| 2386 | i++, p = p->clock_next) |
| 2387 | { |
| 2388 | flush_pair_for_close_on_background_thread(p, bjm, ct); |
| 2389 | } |
| 2390 | } |
| 2391 | ct->list.write_list_unlock(); |
| 2392 | bjm_wait_for_jobs_to_finish(bjm); |
| 2393 | bjm_destroy(bjm); |
| 2394 | } |
| 2395 | |
| 2396 | static void remove_all_pairs_for_close(CACHETABLE ct, CACHEFILE cf, bool evict_completely) { |
| 2397 | ct->list.write_list_lock(); |
| 2398 | if (cf) { |
| 2399 | if (evict_completely) { |
| 2400 | // if we are evicting completely, then the PAIRs will |
| 2401 | // be removed from the linked list managed by the |
| 2402 | // cachefile, so this while loop works |
| 2403 | while (cf->num_pairs > 0) { |
| 2404 | PAIR p = cf->cf_head; |
| 2405 | remove_pair_for_close(p, ct, evict_completely); |
| 2406 | } |
| 2407 | } |
| 2408 | else { |
| 2409 | // on the other hand, if we are not evicting completely, |
| 2410 | // then the cachefile's linked list stays intact, and we must |
| 2411 | // iterate like this. |
| 2412 | for (PAIR p = cf->cf_head; p; p = p->cf_next) { |
| 2413 | remove_pair_for_close(p, ct, evict_completely); |
| 2414 | } |
| 2415 | } |
| 2416 | } |
| 2417 | else { |
| 2418 | while (ct->list.m_n_in_table > 0) { |
| 2419 | PAIR p = ct->list.m_checkpoint_head; |
// if there is no cachefile, then we had better
// be evicting completely, because we have no
// cachefile to save the PAIRs to. At least,
// we have no guarantee that the cachefile
// will remain good.
| 2425 | invariant(evict_completely); |
| 2426 | remove_pair_for_close(p, ct, true); |
| 2427 | } |
| 2428 | } |
| 2429 | ct->list.write_list_unlock(); |
| 2430 | } |
| 2431 | |
| 2432 | static void verify_cachefile_flushed(CACHETABLE ct UU(), CACHEFILE cf UU()) { |
| 2433 | #ifdef TOKU_DEBUG_PARANOID |
// Assert that the cachefile has been flushed by walking the
// pair list and checking that no remaining pair belongs to it.
| 2437 | if (cf) { |
| 2438 | ct->list.write_list_lock(); |
| 2442 | uint32_t i; |
| 2443 | PAIR p = NULL; |
| 2444 | for (i = 0, p = ct->list.m_checkpoint_head; |
| 2445 | i < ct->list.m_n_in_table; |
| 2446 | i++, p = p->clock_next) |
| 2447 | { |
| 2448 | assert(p->cachefile != cf); |
| 2449 | } |
| 2450 | ct->list.write_list_unlock(); |
| 2451 | } |
| 2452 | #endif |
| 2453 | } |
| 2454 | |
// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL).
| 2457 | // Must be holding cachetable lock on entry. |
| 2458 | // |
| 2459 | // This function assumes that no client thread is accessing or |
| 2460 | // trying to access the cachefile while this function is executing. |
| 2461 | // This implies no client thread will be trying to lock any nodes |
| 2462 | // belonging to the cachefile. |
| 2463 | // |
| 2464 | // This function also assumes that the cachefile is not in the process |
| 2465 | // of being used by a checkpoint. If a checkpoint is currently happening, |
| 2466 | // it does NOT include this cachefile. |
| 2467 | // |
| 2468 | static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf, bool evict_completely) { |
| 2469 | // |
| 2470 | // Because work on a kibbutz is always done by the client thread, |
| 2471 | // and this function assumes that no client thread is doing any work |
| 2472 | // on the cachefile, we assume that no client thread will be adding jobs |
| 2473 | // to this cachefile's kibbutz. |
| 2474 | // |
| 2475 | // The caller of this function must ensure that there are |
| 2476 | // no jobs added to the kibbutz. This implies that the only work other |
| 2477 | // threads may be doing is work by the writer threads. |
| 2478 | // |
| 2479 | // first write out dirty PAIRs |
| 2480 | write_dirty_pairs_for_close(ct, cf); |
| 2481 | |
| 2482 | // now that everything is clean, get rid of everything |
| 2483 | remove_all_pairs_for_close(ct, cf, evict_completely); |
| 2484 | |
| 2485 | verify_cachefile_flushed(ct, cf); |
| 2486 | } |
| 2487 | |
| 2488 | /* Requires that no locks be held that are used by the checkpoint logic */ |
| 2489 | void |
| 2490 | toku_cachetable_minicron_shutdown(CACHETABLE ct) { |
| 2491 | int r = ct->cp.shutdown(); |
| 2492 | assert(r==0); |
| 2493 | ct->cl.destroy(); |
| 2494 | } |
| 2495 | |
| 2496 | void toku_cachetable_prepare_close(CACHETABLE ct UU()) { |
| 2497 | extern bool toku_serialize_in_parallel; |
| 2498 | toku_unsafe_set(&toku_serialize_in_parallel, true); |
| 2499 | } |
| 2500 | |
| 2501 | /* Requires that it all be flushed. */ |
| 2502 | void toku_cachetable_close (CACHETABLE *ctp) { |
| 2503 | CACHETABLE ct = *ctp; |
| 2504 | ct->cp.destroy(); |
| 2505 | ct->cl.destroy(); |
| 2506 | ct->cf_list.free_stale_data(&ct->ev); |
| 2507 | cachetable_flush_cachefile(ct, NULL, true); |
| 2508 | ct->ev.destroy(); |
| 2509 | ct->list.destroy(); |
| 2510 | ct->cf_list.destroy(); |
| 2511 | |
| 2512 | if (ct->client_kibbutz) |
| 2513 | toku_kibbutz_destroy(ct->client_kibbutz); |
| 2514 | if (ct->ct_kibbutz) |
| 2515 | toku_kibbutz_destroy(ct->ct_kibbutz); |
| 2516 | if (ct->checkpointing_kibbutz) |
| 2517 | toku_kibbutz_destroy(ct->checkpointing_kibbutz); |
| 2518 | toku_free(ct->env_dir); |
| 2519 | toku_free(ct); |
| 2520 | *ctp = 0; |
| 2521 | } |
| 2522 | |
| 2523 | static PAIR test_get_pair(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, bool have_ct_lock) { |
| 2524 | CACHETABLE ct = cachefile->cachetable; |
| 2525 | |
| 2526 | if (!have_ct_lock) { |
| 2527 | ct->list.read_list_lock(); |
| 2528 | } |
| 2529 | |
| 2530 | PAIR p = ct->list.find_pair(cachefile, key, fullhash); |
| 2531 | assert(p != NULL); |
| 2532 | if (!have_ct_lock) { |
| 2533 | ct->list.read_list_unlock(); |
| 2534 | } |
| 2535 | return p; |
| 2536 | } |
| 2537 | |
| 2538 | //test-only wrapper |
| 2539 | int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) { |
| 2540 | // By default we don't have the lock |
| 2541 | PAIR p = test_get_pair(cachefile, key, fullhash, false); |
| 2542 | return toku_cachetable_unpin(cachefile, p, dirty, attr); // assume read lock is not grabbed, and that it is a write lock |
| 2543 | } |
| 2544 | |
| 2545 | //test-only wrapper |
| 2546 | int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) { |
| 2547 | // We hold the cachetable mutex. |
| 2548 | PAIR p = test_get_pair(cachefile, key, fullhash, true); |
| 2549 | return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr); |
| 2550 | } |
| 2551 | |
| 2552 | //test-only wrapper |
| 2553 | int toku_test_cachetable_unpin_and_remove ( |
| 2554 | CACHEFILE cachefile, |
| 2555 | CACHEKEY key, |
| 2556 | CACHETABLE_REMOVE_KEY remove_key, |
void* remove_key_extra)
| 2558 | { |
| 2559 | uint32_t fullhash = toku_cachetable_hash(cachefile, key); |
| 2560 | PAIR p = test_get_pair(cachefile, key, fullhash, false); |
| 2561 | return toku_cachetable_unpin_and_remove(cachefile, p, remove_key, remove_key_extra); |
| 2562 | } |
| 2563 | |
| 2564 | int toku_cachetable_unpin_and_remove ( |
| 2565 | CACHEFILE cachefile, |
| 2566 | PAIR p, |
| 2567 | CACHETABLE_REMOVE_KEY remove_key, |
void* remove_key_extra
| 2569 | ) |
| 2570 | { |
| 2571 | invariant_notnull(p); |
| 2572 | int r = ENOENT; |
| 2573 | CACHETABLE ct = cachefile->cachetable; |
| 2574 | |
| 2575 | p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it. |
| 2576 | // grab disk_nb_mutex to ensure any background thread writing |
| 2577 | // out a cloned value completes |
| 2578 | pair_lock(p); |
| 2579 | assert(p->value_rwlock.writers()); |
| 2580 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
| 2581 | pair_unlock(p); |
| 2582 | assert(p->cloned_value_data == NULL); |
| 2583 | |
| 2584 | // |
| 2585 | // take care of key removal |
| 2586 | // |
| 2587 | ct->list.write_list_lock(); |
| 2588 | ct->list.read_pending_cheap_lock(); |
| 2589 | bool for_checkpoint = p->checkpoint_pending; |
| 2590 | // now let's wipe out the pending bit, because we are |
| 2591 | // removing the PAIR |
| 2592 | p->checkpoint_pending = false; |
| 2593 | |
| 2594 | // For the PAIR to not be picked by the |
| 2595 | // cleaner thread, we mark the cachepressure_size to be 0 |
| 2596 | // (This is redundant since we have the write_list_lock) |
| 2597 | // This should not be an issue because we call |
| 2598 | // cachetable_remove_pair before |
| 2599 | // releasing the cachetable lock. |
| 2600 | // |
| 2601 | CACHEKEY key_to_remove = p->key; |
| 2602 | p->attr.cache_pressure_size = 0; |
| 2603 | // |
| 2604 | // callback for removing the key |
| 2605 | // for FTNODEs, this leads to calling |
| 2606 | // toku_free_blocknum |
| 2607 | // |
| 2608 | if (remove_key) { |
| 2609 | remove_key( |
| 2610 | &key_to_remove, |
| 2611 | for_checkpoint, |
| 2612 | remove_key_extra |
| 2613 | ); |
| 2614 | } |
| 2615 | ct->list.read_pending_cheap_unlock(); |
| 2616 | |
| 2617 | pair_lock(p); |
| 2618 | p->value_rwlock.write_unlock(); |
| 2619 | nb_mutex_unlock(&p->disk_nb_mutex); |
| 2620 | // |
| 2621 | // As of Clayface (6.5), only these threads may be |
| 2622 | // blocked waiting to lock this PAIR: |
| 2623 | // - the checkpoint thread (because a checkpoint is in progress |
| 2624 | // and the PAIR was in the list of pending pairs) |
| 2625 | // - a client thread running get_and_pin_nonblocking, who |
| 2626 | // ran unlockers, then waited on the PAIR lock. |
| 2627 | // While waiting on a PAIR lock, another thread comes in, |
| 2628 | // locks the PAIR, and ends up calling unpin_and_remove, |
| 2629 | // all while get_and_pin_nonblocking is waiting on the PAIR lock. |
| 2630 | // We did not realize this at first, which caused bug #4357 |
| 2631 | // The following threads CANNOT be blocked waiting on |
| 2632 | // the PAIR lock: |
| 2633 | // - a thread trying to run eviction via run_eviction. |
| 2634 | // That cannot happen because run_eviction only |
| 2635 | // attempts to lock PAIRS that are not locked, and this PAIR |
| 2636 | // is locked. |
| 2637 | // - cleaner thread, for the same reason as a thread running |
| 2638 | // eviction |
| 2639 | // - client thread doing a normal get_and_pin. The client is smart |
| 2640 | // enough to not try to lock a PAIR that another client thread |
| 2641 | // is trying to unpin and remove. Note that this includes work |
| 2642 | // done on kibbutzes. |
| 2643 | // - writer thread. Writer threads do not grab PAIR locks. They |
| 2644 | // get PAIR locks transferred to them by client threads. |
| 2645 | // |
| 2646 | |
| 2647 | // first thing we do is remove the PAIR from the various |
| 2648 | // cachetable data structures, so no other thread can possibly |
| 2649 | // access it. We do not want to risk some other thread |
| 2650 | // trying to lock this PAIR if we release the write list lock |
| 2651 | // below. If some thread is already waiting on the lock, |
| 2652 | // then we let that thread grab the lock and finish, but |
| 2653 | // we don't want any NEW threads to try to grab the PAIR |
| 2654 | // lock. |
| 2655 | // |
| 2656 | // Because we call cachetable_remove_pair and wait, |
| 2657 | // the threads that may be waiting |
| 2658 | // on this PAIR lock must be careful to do NOTHING with the PAIR |
| 2659 | // As per our analysis above, we only need |
| 2660 | // to make sure the checkpoint thread and get_and_pin_nonblocking do |
| 2661 | // nothing, and looking at those functions, it is clear they do nothing. |
| 2662 | // |
| 2663 | cachetable_remove_pair(&ct->list, &ct->ev, p); |
| 2664 | ct->list.write_list_unlock(); |
| 2665 | if (p->refcount > 0) { |
| 2666 | pair_wait_for_ref_release_unlocked(p); |
| 2667 | } |
| 2668 | if (p->value_rwlock.users() > 0) { |
| 2669 | // Need to wait for everyone else to leave |
| 2670 | // This write lock will be granted only after all waiting |
| 2671 | // threads are done. |
| 2672 | p->value_rwlock.write_lock(true); |
| 2673 | assert(p->refcount == 0); |
| 2674 | assert(p->value_rwlock.users() == 1); // us |
| 2675 | assert(!p->checkpoint_pending); |
| 2676 | assert(p->attr.cache_pressure_size == 0); |
| 2677 | p->value_rwlock.write_unlock(); |
| 2678 | } |
| 2679 | // just a sanity check |
| 2680 | assert(nb_mutex_users(&p->disk_nb_mutex) == 0); |
| 2681 | assert(p->cloned_value_data == NULL); |
| 2682 | //Remove pair. |
| 2683 | pair_unlock(p); |
| 2684 | cachetable_free_pair(p); |
| 2685 | r = 0; |
| 2686 | return r; |
| 2687 | } |
| 2688 | |
| 2689 | int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array); |
| 2690 | int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array) { |
| 2691 | array[index] = toku_cachefile_filenum(ft->cf); |
| 2692 | return 0; |
| 2693 | } |
| 2694 | |
static int log_open_txn (TOKUTXN txn, void* extra) {
| 2696 | int r; |
| 2697 | checkpointer* cp = (checkpointer *)extra; |
| 2698 | TOKULOGGER logger = txn->logger; |
| 2699 | FILENUMS open_filenums; |
| 2700 | uint32_t num_filenums = txn->open_fts.size(); |
| 2701 | FILENUM array[num_filenums]; |
| 2702 | if (toku_txn_is_read_only(txn)) { |
| 2703 | goto cleanup; |
| 2704 | } |
| 2705 | else { |
| 2706 | cp->increment_num_txns(); |
| 2707 | } |
| 2708 | |
| 2709 | open_filenums.num = num_filenums; |
| 2710 | open_filenums.filenums = array; |
| 2711 | //Fill in open_filenums |
| 2712 | r = txn->open_fts.iterate<FILENUM, set_filenum_in_array>(array); |
| 2713 | invariant(r==0); |
| 2714 | switch (toku_txn_get_state(txn)) { |
| 2715 | case TOKUTXN_LIVE:{ |
| 2716 | toku_log_xstillopen(logger, NULL, 0, txn, |
| 2717 | toku_txn_get_txnid(txn), |
| 2718 | toku_txn_get_txnid(toku_logger_txn_parent(txn)), |
| 2719 | txn->roll_info.rollentry_raw_count, |
| 2720 | open_filenums, |
| 2721 | txn->force_fsync_on_commit, |
| 2722 | txn->roll_info.num_rollback_nodes, |
| 2723 | txn->roll_info.num_rollentries, |
| 2724 | txn->roll_info.spilled_rollback_head, |
| 2725 | txn->roll_info.spilled_rollback_tail, |
| 2726 | txn->roll_info.current_rollback); |
| 2727 | goto cleanup; |
| 2728 | } |
| 2729 | case TOKUTXN_PREPARING: { |
| 2730 | TOKU_XA_XID xa_xid; |
| 2731 | toku_txn_get_prepared_xa_xid(txn, &xa_xid); |
| 2732 | toku_log_xstillopenprepared(logger, NULL, 0, txn, |
| 2733 | toku_txn_get_txnid(txn), |
| 2734 | &xa_xid, |
| 2735 | txn->roll_info.rollentry_raw_count, |
| 2736 | open_filenums, |
| 2737 | txn->force_fsync_on_commit, |
| 2738 | txn->roll_info.num_rollback_nodes, |
| 2739 | txn->roll_info.num_rollentries, |
| 2740 | txn->roll_info.spilled_rollback_head, |
| 2741 | txn->roll_info.spilled_rollback_tail, |
| 2742 | txn->roll_info.current_rollback); |
| 2743 | goto cleanup; |
| 2744 | } |
| 2745 | case TOKUTXN_RETIRED: |
| 2746 | case TOKUTXN_COMMITTING: |
| 2747 | case TOKUTXN_ABORTING: { |
| 2748 | assert(0); |
| 2749 | } |
| 2750 | } |
| 2751 | // default is an error |
| 2752 | assert(0); |
| 2753 | cleanup: |
| 2754 | return 0; |
| 2755 | } |
| 2756 | |
| 2757 | // Requires: All three checkpoint-relevant locks must be held (see checkpoint.c). |
| 2758 | // Algorithm: Write a checkpoint record to the log, noting the LSN of that record. |
| 2759 | // Use the begin_checkpoint callback to take necessary snapshots (header, btt) |
| 2760 | // Mark every dirty node as "pending." ("Pending" means that the node must be |
| 2761 | // written to disk before it can be modified.) |
| 2762 | void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, TOKULOGGER UU(logger)) { |
| 2763 | cp->begin_checkpoint(); |
| 2764 | } |
| 2765 | |
| 2766 | |
| 2767 | // This is used by the cachetable_race test. |
| 2768 | static volatile int toku_checkpointing_user_data_status = 0; |
| 2769 | static void toku_cachetable_set_checkpointing_user_data_status (int v) { |
| 2770 | toku_checkpointing_user_data_status = v; |
| 2771 | } |
| 2772 | int toku_cachetable_get_checkpointing_user_data_status (void) { |
| 2773 | return toku_checkpointing_user_data_status; |
| 2774 | } |
| 2775 | |
| 2776 | // Requires: The big checkpoint lock must be held (see checkpoint.c). |
| 2777 | // Algorithm: Write all pending nodes to disk |
| 2778 | // Use checkpoint callback to write snapshot information to disk (header, btt) |
| 2779 | // Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks |
// Note:        If testcallback is non-NULL (for testing purposes only), call it after writing the dictionary but before writing the log
| 2781 | void toku_cachetable_end_checkpoint(CHECKPOINTER cp, TOKULOGGER UU(logger), |
void (*testcallback_f)(void*), void* testextra) {
| 2783 | cp->end_checkpoint(testcallback_f, testextra); |
| 2784 | } |
| 2785 | |
| 2786 | TOKULOGGER toku_cachefile_logger (CACHEFILE cf) { |
| 2787 | return cf->cachetable->cp.get_logger(); |
| 2788 | } |
| 2789 | |
| 2790 | FILENUM toku_cachefile_filenum (CACHEFILE cf) { |
| 2791 | return cf->filenum; |
| 2792 | } |
| 2793 | |
| 2794 | // debug functions |
| 2795 | |
| 2796 | int toku_cachetable_assert_all_unpinned (CACHETABLE ct) { |
| 2797 | uint32_t i; |
| 2798 | int some_pinned=0; |
| 2799 | ct->list.read_list_lock(); |
| 2800 | for (i=0; i<ct->list.m_table_size; i++) { |
| 2801 | PAIR p; |
| 2802 | for (p=ct->list.m_table[i]; p; p=p->hash_chain) { |
| 2803 | pair_lock(p); |
| 2804 | if (p->value_rwlock.users()) { |
| 2805 | //printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data); |
| 2806 | some_pinned=1; |
| 2807 | } |
| 2808 | pair_unlock(p); |
| 2809 | } |
| 2810 | } |
| 2811 | ct->list.read_list_unlock(); |
| 2812 | return some_pinned; |
| 2813 | } |
| 2814 | |
| 2815 | int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) { |
| 2816 | assert(cf != NULL); |
| 2817 | int n_pinned=0; |
| 2818 | CACHETABLE ct = cf->cachetable; |
| 2819 | ct->list.read_list_lock(); |
| 2820 | |
| 2821 | // Iterate over all the pairs to find pairs specific to the |
| 2822 | // given cachefile. |
| 2823 | for (uint32_t i = 0; i < ct->list.m_table_size; i++) { |
| 2824 | for (PAIR p = ct->list.m_table[i]; p; p = p->hash_chain) { |
| 2825 | if (p->cachefile == cf) { |
| 2826 | pair_lock(p); |
| 2827 | if (p->value_rwlock.users()) { |
| 2828 | if (print_them) { |
| 2829 | printf("%s:%d pinned: %" PRId64 " (%p)\n" , |
| 2830 | __FILE__, |
| 2831 | __LINE__, |
| 2832 | p->key.b, |
| 2833 | p->value_data); |
| 2834 | } |
| 2835 | n_pinned++; |
| 2836 | } |
| 2837 | pair_unlock(p); |
| 2838 | } |
| 2839 | } |
| 2840 | } |
| 2841 | |
| 2842 | ct->list.read_list_unlock(); |
| 2843 | return n_pinned; |
| 2844 | } |
| 2845 | |
| 2846 | void toku_cachetable_print_state (CACHETABLE ct) { |
| 2847 | uint32_t i; |
| 2848 | ct->list.read_list_lock(); |
| 2849 | for (i=0; i<ct->list.m_table_size; i++) { |
| 2850 | PAIR p = ct->list.m_table[i]; |
| 2851 | if (p != 0) { |
| 2852 | pair_lock(p); |
| 2853 | printf("t[%u]=" , i); |
| 2854 | for (p=ct->list.m_table[i]; p; p=p->hash_chain) { |
| 2855 | printf(" {%" PRId64 ", %p, dirty=%d, pin=%d, size=%ld}" , p->key.b, p->cachefile, (int) p->dirty, p->value_rwlock.users(), p->attr.size); |
| 2856 | } |
| 2857 | printf("\n" ); |
| 2858 | pair_unlock(p); |
| 2859 | } |
| 2860 | } |
| 2861 | ct->list.read_list_unlock(); |
| 2862 | } |
| 2863 | |
| 2864 | void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) { |
| 2865 | ct->list.get_state(num_entries_ptr, hash_size_ptr); |
| 2866 | ct->ev.get_state(size_current_ptr, size_limit_ptr); |
| 2867 | } |
| 2868 | |
| 2869 | int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr, |
| 2870 | int *dirty_ptr, long long *pin_ptr, long *size_ptr) { |
| 2871 | int r = -1; |
| 2872 | uint32_t fullhash = toku_cachetable_hash(cf, key); |
| 2873 | ct->list.read_list_lock(); |
| 2874 | PAIR p = ct->list.find_pair(cf, key, fullhash); |
| 2875 | if (p) { |
| 2876 | pair_lock(p); |
| 2877 | if (value_ptr) |
| 2878 | *value_ptr = p->value_data; |
| 2879 | if (dirty_ptr) |
| 2880 | *dirty_ptr = p->dirty; |
| 2881 | if (pin_ptr) |
| 2882 | *pin_ptr = p->value_rwlock.users(); |
| 2883 | if (size_ptr) |
| 2884 | *size_ptr = p->attr.size; |
| 2885 | r = 0; |
| 2886 | pair_unlock(p); |
| 2887 | } |
| 2888 | ct->list.read_list_unlock(); |
| 2889 | return r; |
| 2890 | } |
| 2891 | |
| 2892 | void |
| 2893 | toku_cachefile_set_userdata (CACHEFILE cf, |
| 2894 | void *userdata, |
| 2895 | void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), |
| 2896 | void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), |
| 2897 | void (*free_userdata)(CACHEFILE, void*), |
| 2898 | void (*checkpoint_userdata)(CACHEFILE, int, void*), |
| 2899 | void (*begin_checkpoint_userdata)(LSN, void*), |
| 2900 | void (*end_checkpoint_userdata)(CACHEFILE, int, void*), |
| 2901 | void (*note_pin_by_checkpoint)(CACHEFILE, void*), |
| 2902 | void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) { |
| 2903 | cf->userdata = userdata; |
| 2904 | cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint; |
| 2905 | cf->close_userdata = close_userdata; |
| 2906 | cf->free_userdata = free_userdata; |
| 2907 | cf->checkpoint_userdata = checkpoint_userdata; |
| 2908 | cf->begin_checkpoint_userdata = begin_checkpoint_userdata; |
| 2909 | cf->end_checkpoint_userdata = end_checkpoint_userdata; |
| 2910 | cf->note_pin_by_checkpoint = note_pin_by_checkpoint; |
| 2911 | cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint; |
| 2912 | } |
| 2913 | |
| 2914 | void *toku_cachefile_get_userdata(CACHEFILE cf) { |
| 2915 | return cf->userdata; |
| 2916 | } |
| 2917 | |
| 2918 | CACHETABLE |
| 2919 | toku_cachefile_get_cachetable(CACHEFILE cf) { |
| 2920 | return cf->cachetable; |
| 2921 | } |
| 2922 | |
| 2923 | CACHEFILE toku_pair_get_cachefile(PAIR pair) { |
| 2924 | return pair->cachefile; |
| 2925 | } |
| 2926 | |
| 2927 | //Only called by ft_end_checkpoint |
| 2928 | //Must have access to cf->fd (must be protected) |
| 2929 | void toku_cachefile_fsync(CACHEFILE cf) { |
| 2930 | toku_file_fsync(cf->fd); |
| 2931 | } |
| 2932 | |
| 2933 | // Make it so when the cachefile closes, the underlying file is unlinked |
| 2934 | void toku_cachefile_unlink_on_close(CACHEFILE cf) { |
| 2935 | assert(!cf->unlink_on_close); |
| 2936 | cf->unlink_on_close = true; |
| 2937 | } |
| 2938 | |
| 2939 | // is this cachefile marked as unlink on close? |
| 2940 | bool toku_cachefile_is_unlink_on_close(CACHEFILE cf) { |
| 2941 | return cf->unlink_on_close; |
| 2942 | } |
| 2943 | |
| 2944 | void toku_cachefile_skip_log_recover_on_close(CACHEFILE cf) { |
| 2945 | cf->skip_log_recover_on_close = true; |
| 2946 | } |
| 2947 | |
| 2948 | void toku_cachefile_do_log_recover_on_close(CACHEFILE cf) { |
| 2949 | cf->skip_log_recover_on_close = false; |
| 2950 | } |
| 2951 | |
| 2952 | bool toku_cachefile_is_skip_log_recover_on_close(CACHEFILE cf) { |
| 2953 | return cf->skip_log_recover_on_close; |
| 2954 | } |
| 2955 | |
| 2956 | uint64_t toku_cachefile_size(CACHEFILE cf) { |
| 2957 | int64_t file_size; |
| 2958 | int fd = toku_cachefile_get_fd(cf); |
| 2959 | int r = toku_os_get_file_size(fd, &file_size); |
| 2960 | assert_zero(r); |
| 2961 | return file_size; |
| 2962 | } |
| 2963 | |
| 2964 | char * |
| 2965 | toku_construct_full_name(int count, ...) { |
| 2966 | va_list ap; |
| 2967 | char *name = NULL; |
| 2968 | size_t n = 0; |
| 2969 | int i; |
| 2970 | va_start(ap, count); |
| 2971 | for (i=0; i<count; i++) { |
| 2972 | char *arg = va_arg(ap, char *); |
| 2973 | if (arg) { |
| 2974 | n += 1 + strlen(arg) + 1; |
| 2975 | char *XMALLOC_N(n, newname); |
| 2976 | if (name && !toku_os_is_absolute_name(arg)) |
| 2977 | snprintf(newname, n, "%s/%s", name, arg);
| 2978 | else
| 2979 | snprintf(newname, n, "%s", arg);
| 2980 | toku_free(name); |
| 2981 | name = newname; |
| 2982 | } |
| 2983 | } |
| 2984 | va_end(ap); |
| 2985 | |
| 2986 | return name; |
| 2987 | } |
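| | // Illustrative usage (not from the original comments): toku_construct_full_name(2, "envdir", "db.tokudb")
| | // returns a freshly allocated "envdir/db.tokudb"; an absolute component such as "/abs/db.tokudb"
| | // discards everything accumulated before it and yields "/abs/db.tokudb". The caller owns the
| | // result and must release it with toku_free().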
| 2988 | |
| 2989 | char * |
| 2990 | toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env) { |
| 2991 | return toku_construct_full_name(2, ct->env_dir, fname_in_env); |
| 2992 | } |
| 2993 | |
| 2994 | static long |
| 2995 | cleaner_thread_rate_pair(PAIR p) |
| 2996 | { |
| 2997 | return p->attr.cache_pressure_size; |
| 2998 | } |
| 2999 | |
| 3000 | static int const CLEANER_N_TO_CHECK = 8; |
| 3001 | |
| 3002 | int toku_cleaner_thread_for_test (CACHETABLE ct) { |
| 3003 | return ct->cl.run_cleaner(); |
| 3004 | } |
| 3005 | |
| 3006 | int toku_cleaner_thread (void *cleaner_v) { |
| 3007 | cleaner* cl = (cleaner *) cleaner_v; |
| 3008 | assert(cl); |
| 3009 | return cl->run_cleaner(); |
| 3010 | } |
| 3011 | |
| 3012 | ///////////////////////////////////////////////////////////////////////// |
| 3013 | // |
| 3014 | // cleaner methods |
| 3015 | // |
| 3016 | ENSURE_POD(cleaner); |
| 3017 | |
| 3018 | int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) { |
| 3019 | // default is no cleaner, for now |
| 3020 | m_cleaner_cron_init = false; |
| 3021 | int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this); |
| 3022 | if (r == 0) { |
| 3023 | m_cleaner_cron_init = true; |
| 3024 | } |
| 3025 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations); |
| 3026 | m_cleaner_iterations = _cleaner_iterations; |
| 3027 | m_pl = _pl; |
| 3028 | m_ct = _ct; |
| 3029 | m_cleaner_init = true; |
| 3030 | return r; |
| 3031 | } |
| 3032 | |
| 3033 | // this function is allowed to be called multiple times |
| 3034 | void cleaner::destroy(void) { |
| 3035 | if (!m_cleaner_init) { |
| 3036 | return; |
| 3037 | } |
| 3038 | if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) { |
| 3039 | // for test code only, production code uses toku_cachetable_minicron_shutdown() |
| 3040 | int r = toku_minicron_shutdown(&m_cleaner_cron); |
| 3041 | assert(r==0); |
| 3042 | } |
| 3043 | } |
| 3044 | |
| 3045 | uint32_t cleaner::get_iterations(void) { |
| 3046 | return m_cleaner_iterations; |
| 3047 | } |
| 3048 | |
| 3049 | void cleaner::set_iterations(uint32_t new_iterations) { |
| 3050 | m_cleaner_iterations = new_iterations; |
| 3051 | } |
| 3052 | |
| 3053 | uint32_t cleaner::get_period_unlocked(void) { |
| 3054 | return toku_minicron_get_period_in_seconds_unlocked(&m_cleaner_cron); |
| 3055 | } |
| 3056 | |
| 3057 | // |
| 3058 | // Sets how often the cleaner thread will run, in seconds |
| 3059 | // |
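| | // Note (inferred from the call below): the underlying minicron period is kept in
| | // milliseconds, so the seconds value supplied here is scaled by 1000.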
| 3060 | void cleaner::set_period(uint32_t new_period) { |
| 3061 | toku_minicron_change_period(&m_cleaner_cron, new_period*1000); |
| 3062 | } |
| 3063 | |
| 3064 | // Effect: runs a cleaner. |
| 3065 | // |
| 3066 | // We look through some number of nodes, the first N that we see which are |
| 3067 | // unlocked and are not involved in a cachefile flush, pick one, and call |
| 3068 | // the cleaner callback. While we're picking a node, we have the |
| 3069 | // cachetable lock the whole time, so we don't need any extra |
| 3070 | // synchronization. Once we have one we want, we lock it and notify the |
| 3071 | // cachefile that we're doing some background work (so a flush won't |
| 3072 | // start). At this point, we can safely unlock the cachetable, do the |
| 3073 | // work (callback), and unlock/release our claim to the cachefile. |
| 3074 | int cleaner::run_cleaner(void) { |
| 3075 | toku::context cleaner_ctx(CTX_CLEANER); |
| 3076 | |
| 3077 | int r; |
| 3078 | uint32_t num_iterations = this->get_iterations(); |
| 3079 | for (uint32_t i = 0; i < num_iterations; ++i) { |
| 3080 | cleaner_executions++; |
| 3081 | m_pl->read_list_lock(); |
| 3082 | PAIR best_pair = NULL; |
| 3083 | int n_seen = 0; |
| 3084 | long best_score = 0; |
| 3085 | const PAIR first_pair = m_pl->m_cleaner_head; |
| 3086 | if (first_pair == NULL) { |
| 3087 | // nothing in the cachetable, just get out now |
| 3088 | m_pl->read_list_unlock(); |
| 3089 | break; |
| 3090 | } |
| 3091 | // here we select a PAIR for cleaning |
| 3092 | // look at some number of PAIRS, and |
| 3093 | // pick what we think is the best one for cleaning |
| 3094 | //***** IMPORTANT ****** |
| 3095 | // we MUST not pick a PAIR whose rating is 0. We have |
| 3096 | // numerous assumptions in other parts of the code that |
| 3097 | // this is the case: |
| 3098 | // - this is how rollback nodes and leaf nodes are not selected for cleaning |
| 3099 | // - this is how a thread that is calling unpin_and_remove will prevent |
| 3100 | // the cleaner thread from picking its PAIR (see comments in that function) |
| 3101 | do { |
| 3102 | // |
| 3103 | // We are already holding onto best_pair, if we run across a pair that |
| 3104 | // has the same mutex due to a collision in the hashtable, we need |
| 3105 | // to be careful. |
| 3106 | // |
| 3107 | if (best_pair && m_pl->m_cleaner_head->mutex == best_pair->mutex) { |
| 3108 | // Advance the cleaner head. |
| 3109 | long score = 0; |
| 3110 | // only bother with this pair if it has no current users |
| 3111 | if (m_pl->m_cleaner_head->value_rwlock.users() == 0) { |
| 3112 | score = cleaner_thread_rate_pair(m_pl->m_cleaner_head); |
| 3113 | if (score > best_score) { |
| 3114 | best_score = score; |
| 3115 | best_pair = m_pl->m_cleaner_head; |
| 3116 | } |
| 3117 | } |
| 3118 | m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next; |
| 3119 | continue; |
| 3120 | } |
| 3121 | pair_lock(m_pl->m_cleaner_head); |
| 3122 | if (m_pl->m_cleaner_head->value_rwlock.users() > 0) { |
| 3123 | pair_unlock(m_pl->m_cleaner_head); |
| 3124 | } |
| 3125 | else { |
| 3126 | n_seen++; |
| 3127 | long score = 0; |
| 3128 | score = cleaner_thread_rate_pair(m_pl->m_cleaner_head); |
| 3129 | if (score > best_score) { |
| 3130 | best_score = score; |
| 3131 | // Since we found a new best pair, we need to |
| 3132 | // free the old best pair. |
| 3133 | if (best_pair) { |
| 3134 | pair_unlock(best_pair); |
| 3135 | } |
| 3136 | best_pair = m_pl->m_cleaner_head; |
| 3137 | } |
| 3138 | else { |
| 3139 | pair_unlock(m_pl->m_cleaner_head); |
| 3140 | } |
| 3141 | } |
| 3142 | // Advance the cleaner head. |
| 3143 | m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next; |
| 3144 | } while (m_pl->m_cleaner_head != first_pair && n_seen < CLEANER_N_TO_CHECK); |
| 3145 | m_pl->read_list_unlock(); |
| 3146 | |
| 3147 | // |
| 3148 | // at this point, if we have found a PAIR for cleaning, |
| 3149 | // that is, best_pair != NULL, we do the clean |
| 3150 | // |
| 3151 | // if best_pair !=NULL, then best_pair->mutex is held |
| 3152 | // no list lock is held |
| 3153 | // |
| 3154 | if (best_pair) { |
| 3155 | CACHEFILE cf = best_pair->cachefile; |
| 3156 | // try to add a background job to the manager |
| 3157 | // if we can't, that means the cachefile is flushing, so |
| 3158 | // we simply continue the for loop and this iteration |
| 3159 | // becomes a no-op |
| 3160 | r = bjm_add_background_job(cf->bjm); |
| 3161 | if (r) { |
| 3162 | pair_unlock(best_pair); |
| 3163 | continue; |
| 3164 | } |
| 3165 | best_pair->value_rwlock.write_lock(true); |
| 3166 | pair_unlock(best_pair); |
| 3167 | // verify a key assumption. |
| 3168 | assert(cleaner_thread_rate_pair(best_pair) > 0); |
| 3169 | // check the checkpoint_pending bit |
| 3170 | m_pl->read_pending_cheap_lock(); |
| 3171 | bool checkpoint_pending = best_pair->checkpoint_pending; |
| 3172 | best_pair->checkpoint_pending = false; |
| 3173 | m_pl->read_pending_cheap_unlock(); |
| 3174 | if (checkpoint_pending) { |
| 3175 | write_locked_pair_for_checkpoint(m_ct, best_pair, true); |
| 3176 | } |
| 3177 | |
| 3178 | bool cleaner_callback_called = false; |
| 3179 | |
| 3180 | // it's theoretically possible that after writing a PAIR for checkpoint, the |
| 3181 | // PAIR's heuristic tells us nothing needs to be done. It is not possible |
| 3182 | // in Dr. Noga, but unit tests verify this behavior works properly. |
| 3183 | if (cleaner_thread_rate_pair(best_pair) > 0) { |
| 3184 | r = best_pair->cleaner_callback(best_pair->value_data, |
| 3185 | best_pair->key, |
| 3186 | best_pair->fullhash, |
| 3187 | best_pair->write_extraargs); |
| 3188 | assert_zero(r); |
| 3189 | cleaner_callback_called = true; |
| 3190 | } |
| 3191 | |
| 3192 | // The cleaner callback must have unlocked the pair, so we |
| 3193 | // don't need to unlock it if the cleaner callback is called. |
| 3194 | if (!cleaner_callback_called) { |
| 3195 | pair_lock(best_pair); |
| 3196 | best_pair->value_rwlock.write_unlock(); |
| 3197 | pair_unlock(best_pair); |
| 3198 | } |
| 3199 | // We need to make sure the cachefile sticks around so a close |
| 3200 | // can't come destroy it. That's the purpose of this |
| 3201 | // "add/remove_background_job" business, which means the |
| 3202 | // cachefile is still valid here, even though the cleaner |
| 3203 | // callback unlocks the pair. |
| 3204 | bjm_remove_background_job(cf->bjm); |
| 3205 | } |
| 3206 | else { |
| 3207 | // If we didn't find anything this time around the cachetable, |
| 3208 | // we probably won't find anything if we run around again, so |
| 3209 | // just break out from the for-loop now and |
| 3210 | // we'll try again when the cleaner thread runs again. |
| 3211 | break; |
| 3212 | } |
| 3213 | } |
| 3214 | return 0; |
| 3215 | } |
| 3216 | |
| 3217 | static_assert(std::is_pod<pair_list>::value, "pair_list isn't POD");
| 3218 | |
| 3219 | const uint32_t INITIAL_PAIR_LIST_SIZE = 1<<20; |
| 3220 | uint32_t PAIR_LOCK_SIZE = 1<<20; |
| 3221 | |
| 3222 | void toku_pair_list_set_lock_size(uint32_t num_locks) { |
| 3223 | PAIR_LOCK_SIZE = num_locks; |
| 3224 | } |
| 3225 | |
| 3226 | static void evict_pair_from_cachefile(PAIR p) { |
| 3227 | CACHEFILE cf = p->cachefile; |
| 3228 | if (p->cf_next) { |
| 3229 | p->cf_next->cf_prev = p->cf_prev; |
| 3230 | } |
| 3231 | if (p->cf_prev) { |
| 3232 | p->cf_prev->cf_next = p->cf_next; |
| 3233 | } |
| 3234 | else if (p->cachefile->cf_head == p) { |
| 3235 | cf->cf_head = p->cf_next; |
| 3236 | } |
| 3237 | p->cf_prev = p->cf_next = NULL; |
| 3238 | cf->num_pairs--; |
| 3239 | } |
| 3240 | |
| 3241 | // Allocates the hash table of pairs inside this pair list. |
| 3242 | // |
| 3243 | void pair_list::init() { |
| 3244 | m_table_size = INITIAL_PAIR_LIST_SIZE; |
| 3245 | m_num_locks = PAIR_LOCK_SIZE; |
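| | // Both sizes are assumed to be powers of two: hash buckets and pair mutexes
| | // are selected with fullhash & (size - 1), e.g. in find_pair() and
| | // pair_lock_by_fullhash() below.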
| 3246 | m_n_in_table = 0; |
| 3247 | m_clock_head = NULL; |
| 3248 | m_cleaner_head = NULL; |
| 3249 | m_checkpoint_head = NULL; |
| 3250 | m_pending_head = NULL; |
| 3251 | m_table = NULL; |
| 3252 | |
| 3253 | |
| 3254 | pthread_rwlockattr_t attr; |
| 3255 | pthread_rwlockattr_init(&attr); |
| 3256 | #if defined(HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP) |
| 3257 | pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); |
| 3258 | #else |
| 3259 | // TODO: need to figure out how to make writer-preferential rwlocks |
| 3260 | // happen on osx |
| 3261 | #endif |
| 3262 | toku_pthread_rwlock_init(*cachetable_m_list_lock_key, &m_list_lock, &attr); |
| 3263 | toku_pthread_rwlock_init(*cachetable_m_pending_lock_expensive_key, |
| 3264 | &m_pending_lock_expensive, |
| 3265 | &attr); |
| 3266 | toku_pthread_rwlock_init( |
| 3267 | *cachetable_m_pending_lock_cheap_key, &m_pending_lock_cheap, &attr); |
| 3268 | XCALLOC_N(m_table_size, m_table); |
| 3269 | XCALLOC_N(m_num_locks, m_mutexes); |
| 3270 | for (uint64_t i = 0; i < m_num_locks; i++) { |
| 3271 | toku_mutex_init( |
| 3272 | #ifdef TOKU_PFS_MUTEX_EXTENDED_CACHETABLEMMUTEX |
| 3273 | *cachetable_m_mutex_key, |
| 3274 | #else |
| 3275 | toku_uninstrumented, |
| 3276 | #endif |
| 3277 | &m_mutexes[i].aligned_mutex, |
| 3278 | nullptr); |
| 3279 | } |
| 3280 | } |
| 3281 | |
| 3282 | // Frees the pair_list hash table. It is expected to be empty by
| 3283 | // the time this is called. Asserts that no pairs remain in any
| 3284 | // of the hash table slots.
| 3285 | void pair_list::destroy() { |
| 3286 | // Check if any entries exist in the hash table. |
| 3287 | for (uint32_t i = 0; i < m_table_size; ++i) { |
| 3288 | invariant_null(m_table[i]); |
| 3289 | } |
| 3290 | for (uint64_t i = 0; i < m_num_locks; i++) { |
| 3291 | toku_mutex_destroy(&m_mutexes[i].aligned_mutex); |
| 3292 | } |
| 3293 | toku_pthread_rwlock_destroy(&m_list_lock); |
| 3294 | toku_pthread_rwlock_destroy(&m_pending_lock_expensive); |
| 3295 | toku_pthread_rwlock_destroy(&m_pending_lock_cheap); |
| 3296 | toku_free(m_table); |
| 3297 | toku_free(m_mutexes); |
| 3298 | } |
| 3299 | |
| 3300 | // adds a PAIR to the cachetable's structures, |
| 3301 | // but does NOT add it to the list maintained by |
| 3302 | // the cachefile |
| 3303 | void pair_list::add_to_cachetable_only(PAIR p) { |
| 3304 | // sanity check to make sure that the PAIR does not already exist |
| 3305 | PAIR pp = this->find_pair(p->cachefile, p->key, p->fullhash); |
| 3306 | assert(pp == NULL); |
| 3307 | |
| 3308 | this->add_to_clock(p); |
| 3309 | this->add_to_hash_chain(p); |
| 3310 | m_n_in_table++; |
| 3311 | } |
| 3312 | |
| 3313 | // This places the given pair inside of the pair list. |
| 3314 | // |
| 3315 | // requires caller to have grabbed write lock on list. |
| 3316 | // requires caller to have p->mutex held as well |
| 3317 | // |
| 3318 | void pair_list::put(PAIR p) { |
| 3319 | this->add_to_cachetable_only(p); |
| 3320 | this->add_to_cf_list(p); |
| 3321 | } |
| 3322 | |
| 3323 | // This removes the given pair completely from the pair list.
| 3324 | // |
| 3325 | // requires caller to have grabbed write lock on list, and p->mutex held |
| 3326 | // |
| 3327 | void pair_list::evict_completely(PAIR p) { |
| 3328 | this->evict_from_cachetable(p); |
| 3329 | this->evict_from_cachefile(p); |
| 3330 | } |
| 3331 | |
| 3332 | // Removes the PAIR from the cachetable's lists, |
| 3333 | // but does NOT impact the list maintained by the cachefile |
| 3334 | void pair_list::evict_from_cachetable(PAIR p) { |
| 3335 | this->pair_remove(p); |
| 3336 | this->pending_pairs_remove(p); |
| 3337 | this->remove_from_hash_chain(p); |
| 3338 | |
| 3339 | assert(m_n_in_table > 0); |
| 3340 | m_n_in_table--; |
| 3341 | } |
| 3342 | |
| 3343 | // Removes the PAIR from the cachefile's list of PAIRs |
| 3344 | void pair_list::evict_from_cachefile(PAIR p) { |
| 3345 | evict_pair_from_cachefile(p); |
| 3346 | } |
| 3347 | |
| 3348 | // |
| 3349 | // Remove pair from linked list for cleaner/clock |
| 3350 | // |
| 3351 | // |
| 3352 | // requires caller to have grabbed write lock on list. |
| 3353 | // |
| 3354 | void pair_list::pair_remove (PAIR p) { |
| 3355 | if (p->clock_prev == p) { |
| 3356 | invariant(m_clock_head == p); |
| 3357 | invariant(p->clock_next == p); |
| 3358 | invariant(m_cleaner_head == p); |
| 3359 | invariant(m_checkpoint_head == p); |
| 3360 | m_clock_head = NULL; |
| 3361 | m_cleaner_head = NULL; |
| 3362 | m_checkpoint_head = NULL; |
| 3363 | } |
| 3364 | else { |
| 3365 | if (p == m_clock_head) { |
| 3366 | m_clock_head = m_clock_head->clock_next; |
| 3367 | } |
| 3368 | if (p == m_cleaner_head) { |
| 3369 | m_cleaner_head = m_cleaner_head->clock_next; |
| 3370 | } |
| 3371 | if (p == m_checkpoint_head) { |
| 3372 | m_checkpoint_head = m_checkpoint_head->clock_next; |
| 3373 | } |
| 3374 | p->clock_prev->clock_next = p->clock_next; |
| 3375 | p->clock_next->clock_prev = p->clock_prev; |
| 3376 | } |
| 3377 | p->clock_prev = p->clock_next = NULL; |
| 3378 | } |
| 3379 | |
| 3380 | //Remove a pair from the list of pairs that were marked with the |
| 3381 | //pending bit for the in-progress checkpoint. |
| 3382 | // |
| 3383 | // requires that if the caller is the checkpoint thread, then a read lock |
| 3384 | // is grabbed on the list. Otherwise, must have write lock on list. |
| 3385 | // |
| 3386 | void pair_list::pending_pairs_remove (PAIR p) { |
| 3387 | if (p->pending_next) { |
| 3388 | p->pending_next->pending_prev = p->pending_prev; |
| 3389 | } |
| 3390 | if (p->pending_prev) { |
| 3391 | p->pending_prev->pending_next = p->pending_next; |
| 3392 | } |
| 3393 | else if (m_pending_head==p) { |
| 3394 | m_pending_head = p->pending_next; |
| 3395 | } |
| 3396 | p->pending_prev = p->pending_next = NULL; |
| 3397 | } |
| 3398 | |
| 3399 | void pair_list::remove_from_hash_chain(PAIR p) { |
| 3400 | // Remove it from the hash chain. |
| 3401 | unsigned int h = p->fullhash&(m_table_size - 1); |
| 3402 | paranoid_invariant(m_table[h] != NULL); |
| 3403 | if (m_table[h] == p) { |
| 3404 | m_table[h] = p->hash_chain; |
| 3405 | } |
| 3406 | else { |
| 3407 | PAIR curr = m_table[h]; |
| 3408 | while (curr->hash_chain != p) { |
| 3409 | curr = curr->hash_chain; |
| 3410 | } |
| 3411 | // remove p from the singly linked list
| 3412 | curr->hash_chain = p->hash_chain; |
| 3413 | } |
| 3414 | p->hash_chain = NULL; |
| 3415 | } |
| 3416 | |
| 3417 | // Returns a pair from the pair list, using the given |
| 3418 | // pair. If the pair cannot be found, null is returned. |
| 3419 | // |
| 3420 | // requires caller to have grabbed either a read lock on the list or |
| 3421 | // bucket's mutex. |
| 3422 | // |
| 3423 | PAIR pair_list::find_pair(CACHEFILE file, CACHEKEY key, uint32_t fullhash) { |
| 3424 | PAIR found_pair = nullptr; |
| 3425 | for (PAIR p = m_table[fullhash&(m_table_size - 1)]; p; p = p->hash_chain) { |
| 3426 | if (p->key.b == key.b && p->cachefile == file) { |
| 3427 | found_pair = p; |
| 3428 | break; |
| 3429 | } |
| 3430 | } |
| 3431 | return found_pair; |
| 3432 | } |
| 3433 | |
| 3434 | // Add PAIR to linked list shared by cleaner thread and clock |
| 3435 | // |
| 3436 | // requires caller to have grabbed write lock on list. |
| 3437 | // |
| 3438 | void pair_list::add_to_clock (PAIR p) { |
| 3439 | // requires that p is not currently in the table. |
| 3440 | // inserts p into the clock list at the tail. |
| 3441 | |
| 3442 | p->count = CLOCK_INITIAL_COUNT; |
| 3443 | // assert that either both head and tail are set or both are NULL
| 3444 | // head and tail already exist
| 3445 | if (m_clock_head) { |
| 3446 | assert(m_cleaner_head); |
| 3447 | assert(m_checkpoint_head); |
| 3448 | // insert right before the head |
| 3449 | p->clock_next = m_clock_head; |
| 3450 | p->clock_prev = m_clock_head->clock_prev; |
| 3451 | |
| 3452 | p->clock_prev->clock_next = p; |
| 3453 | p->clock_next->clock_prev = p; |
| 3454 | |
| 3455 | } |
| 3456 | // this is the first element in the list |
| 3457 | else { |
| 3458 | m_clock_head = p; |
| 3459 | p->clock_next = p->clock_prev = m_clock_head; |
| 3460 | m_cleaner_head = p; |
| 3461 | m_checkpoint_head = p; |
| 3462 | } |
| 3463 | } |
| 3464 | |
| 3465 | // add the pair to the linked list of PAIRs belonging
| 3466 | // to the same cachefile. This linked list is used
| 3467 | // in cachetable_flush_cachefile.
| 3468 | void pair_list::add_to_cf_list(PAIR p) { |
| 3469 | CACHEFILE cf = p->cachefile; |
| 3470 | if (cf->cf_head) { |
| 3471 | cf->cf_head->cf_prev = p; |
| 3472 | } |
| 3473 | p->cf_next = cf->cf_head; |
| 3474 | p->cf_prev = NULL; |
| 3475 | cf->cf_head = p; |
| 3476 | cf->num_pairs++; |
| 3477 | } |
| 3478 | |
| 3479 | // Add PAIR to the hashtable |
| 3480 | // |
| 3481 | // requires caller to have grabbed write lock on list |
| 3482 | // and to have grabbed the p->mutex. |
| 3483 | void pair_list::add_to_hash_chain(PAIR p) { |
| 3484 | uint32_t h = p->fullhash & (m_table_size - 1); |
| 3485 | p->hash_chain = m_table[h]; |
| 3486 | m_table[h] = p; |
| 3487 | } |
| 3488 | |
| 3489 | // test function |
| 3490 | // |
| 3491 | // grabs and releases write list lock |
| 3492 | // |
| 3493 | void pair_list::verify() { |
| 3494 | this->write_list_lock(); |
| 3495 | uint32_t num_found = 0; |
| 3496 | |
| 3497 | // First clear all the verify flags by going through the hash chains |
| 3498 | { |
| 3499 | uint32_t i; |
| 3500 | for (i = 0; i < m_table_size; i++) { |
| 3501 | PAIR p; |
| 3502 | for (p = m_table[i]; p; p = p->hash_chain) { |
| 3503 | num_found++; |
| 3504 | } |
| 3505 | } |
| 3506 | } |
| 3507 | assert(num_found == m_n_in_table); |
| 3508 | num_found = 0; |
| 3509 | // Now go through the clock chain, make sure everything in the LRU chain is hashed. |
| 3510 | { |
| 3511 | PAIR p; |
| 3512 | bool is_first = true; |
| 3513 | for (p = m_clock_head; m_clock_head != NULL && (p != m_clock_head || is_first); p=p->clock_next) { |
| 3514 | is_first=false; |
| 3515 | PAIR p2; |
| 3516 | uint32_t fullhash = p->fullhash; |
| 3517 | //assert(fullhash==toku_cachetable_hash(p->cachefile, p->key)); |
| 3518 | for (p2 = m_table[fullhash&(m_table_size-1)]; p2; p2=p2->hash_chain) { |
| 3519 | if (p2==p) { |
| 3520 | /* found it */ |
| 3521 | num_found++; |
| 3522 | goto next; |
| 3523 | } |
| 3524 | } |
| 3525 | fprintf(stderr, "Something in the clock chain is not hashed\n");
| 3526 | assert(0); |
| 3527 | next:; |
| 3528 | } |
| 3529 | assert (num_found == m_n_in_table); |
| 3530 | } |
| 3531 | this->write_list_unlock(); |
| 3532 | } |
| 3533 | |
| 3534 | // If given pointers are not null, assign the hash table size of |
| 3535 | // this pair list and the number of pairs in this pair list. |
| 3536 | // |
| 3537 | // |
| 3538 | // grabs and releases read list lock |
| 3539 | // |
| 3540 | void pair_list::get_state(int *num_entries, int *hash_size) { |
| 3541 | this->read_list_lock(); |
| 3542 | if (num_entries) { |
| 3543 | *num_entries = m_n_in_table; |
| 3544 | } |
| 3545 | if (hash_size) { |
| 3546 | *hash_size = m_table_size; |
| 3547 | } |
| 3548 | this->read_list_unlock(); |
| 3549 | } |
| 3550 | |
| 3551 | void pair_list::read_list_lock() { |
| 3552 | toku_pthread_rwlock_rdlock(&m_list_lock); |
| 3553 | } |
| 3554 | |
| 3555 | void pair_list::read_list_unlock() { |
| 3556 | toku_pthread_rwlock_rdunlock(&m_list_lock); |
| 3557 | } |
| 3558 | |
| 3559 | void pair_list::write_list_lock() { |
| 3560 | toku_pthread_rwlock_wrlock(&m_list_lock); |
| 3561 | } |
| 3562 | |
| 3563 | void pair_list::write_list_unlock() { |
| 3564 | toku_pthread_rwlock_wrunlock(&m_list_lock); |
| 3565 | } |
| 3566 | |
| 3567 | void pair_list::read_pending_exp_lock() { |
| 3568 | toku_pthread_rwlock_rdlock(&m_pending_lock_expensive); |
| 3569 | } |
| 3570 | |
| 3571 | void pair_list::read_pending_exp_unlock() { |
| 3572 | toku_pthread_rwlock_rdunlock(&m_pending_lock_expensive); |
| 3573 | } |
| 3574 | |
| 3575 | void pair_list::write_pending_exp_lock() { |
| 3576 | toku_pthread_rwlock_wrlock(&m_pending_lock_expensive); |
| 3577 | } |
| 3578 | |
| 3579 | void pair_list::write_pending_exp_unlock() { |
| 3580 | toku_pthread_rwlock_wrunlock(&m_pending_lock_expensive); |
| 3581 | } |
| 3582 | |
| 3583 | void pair_list::read_pending_cheap_lock() { |
| 3584 | toku_pthread_rwlock_rdlock(&m_pending_lock_cheap); |
| 3585 | } |
| 3586 | |
| 3587 | void pair_list::read_pending_cheap_unlock() { |
| 3588 | toku_pthread_rwlock_rdunlock(&m_pending_lock_cheap); |
| 3589 | } |
| 3590 | |
| 3591 | void pair_list::write_pending_cheap_lock() { |
| 3592 | toku_pthread_rwlock_wrlock(&m_pending_lock_cheap); |
| 3593 | } |
| 3594 | |
| 3595 | void pair_list::write_pending_cheap_unlock() { |
| 3596 | toku_pthread_rwlock_wrunlock(&m_pending_lock_cheap); |
| 3597 | } |
| 3598 | |
| 3599 | toku_mutex_t* pair_list::get_mutex_for_pair(uint32_t fullhash) { |
| 3600 | return &m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex; |
| 3601 | } |
| 3602 | |
| 3603 | void pair_list::pair_lock_by_fullhash(uint32_t fullhash) { |
| 3604 | toku_mutex_lock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex); |
| 3605 | } |
| 3606 | |
| 3607 | void pair_list::pair_unlock_by_fullhash(uint32_t fullhash) { |
| 3608 | toku_mutex_unlock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex); |
| 3609 | } |
| 3610 | |
| 3611 | |
| 3612 | ENSURE_POD(evictor); |
| 3613 | |
| 3614 | // |
| 3615 | // This is the function that runs eviction on its own thread. |
| 3616 | // |
| 3617 | static void *eviction_thread(void *evictor_v) { |
| 3618 | evictor *CAST_FROM_VOIDP(evictor, evictor_v); |
| 3619 | evictor->run_eviction_thread(); |
| 3620 | return toku_pthread_done(evictor_v); |
| 3621 | } |
| 3622 | |
| 3623 | // |
| 3624 | // Starts the eviction thread, assigns external object references, |
| 3625 | // and initializes all counters and condition variables. |
| 3626 | // |
| 3627 | int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) { |
| 3628 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running); |
| 3629 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting); |
| 3630 | |
| 3631 | // set max difference to around 500MB |
| 3632 | int64_t max_diff = (1 << 29); |
| 3633 | |
| 3634 | m_low_size_watermark = _size_limit; |
| 3635 | // these values are selected kind of arbitrarily right now as |
| 3636 | // being a percentage more than low_size_watermark, which is provided |
| 3637 | // by the caller. |
| 3638 | m_low_size_hysteresis = (11 * _size_limit)/10; //10% more |
| 3639 | if ((m_low_size_hysteresis - m_low_size_watermark) > max_diff) { |
| 3640 | m_low_size_hysteresis = m_low_size_watermark + max_diff; |
| 3641 | } |
| 3642 | m_high_size_hysteresis = (5 * _size_limit)/4; // 20% more |
| 3643 | if ((m_high_size_hysteresis - m_low_size_hysteresis) > max_diff) { |
| 3644 | m_high_size_hysteresis = m_low_size_hysteresis + max_diff; |
| 3645 | } |
| 3646 | m_high_size_watermark = (3 * _size_limit)/2; // 50% more |
| 3647 | if ((m_high_size_watermark - m_high_size_hysteresis) > max_diff) { |
| 3648 | m_high_size_watermark = m_high_size_hysteresis + max_diff; |
| 3649 | } |
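| | // Illustrative example (assuming _size_limit is 1GB, so none of the max_diff
| | // caps above apply):
| | //   low_size_watermark   = 1.00 GB  (target size; eviction aims for this)
| | //   low_size_hysteresis  = 1.10 GB
| | //   high_size_hysteresis = 1.25 GB  (sleeping clients wake at or below this)
| | //   high_size_watermark  = 1.50 GB  (client threads start sleeping above this)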
| 3650 | |
| 3651 | m_enable_partial_eviction = true; |
| 3652 | |
| 3653 | m_size_reserved = unreservable_memory(_size_limit); |
| 3654 | m_size_current = 0; |
| 3655 | m_size_cloned_data = 0; |
| 3656 | m_size_evicting = 0; |
| 3657 | |
| 3658 | m_size_nonleaf = create_partitioned_counter(); |
| 3659 | m_size_leaf = create_partitioned_counter(); |
| 3660 | m_size_rollback = create_partitioned_counter(); |
| 3661 | m_size_cachepressure = create_partitioned_counter(); |
| 3662 | m_wait_pressure_count = create_partitioned_counter(); |
| 3663 | m_wait_pressure_time = create_partitioned_counter(); |
| 3664 | m_long_wait_pressure_count = create_partitioned_counter(); |
| 3665 | m_long_wait_pressure_time = create_partitioned_counter(); |
| 3666 | |
| 3667 | m_pl = _pl; |
| 3668 | m_cf_list = _cf_list; |
| 3669 | m_kibbutz = _kibbutz; |
| 3670 | toku_mutex_init( |
| 3671 | *cachetable_ev_thread_lock_mutex_key, &m_ev_thread_lock, nullptr); |
| 3672 | toku_cond_init( |
| 3673 | *cachetable_m_flow_control_cond_key, &m_flow_control_cond, nullptr); |
| 3674 | toku_cond_init( |
| 3675 | *cachetable_m_ev_thread_cond_key, &m_ev_thread_cond, nullptr); |
| 3676 | m_num_sleepers = 0; |
| 3677 | m_ev_thread_is_running = false; |
| 3678 | m_period_in_seconds = eviction_period; |
| 3679 | |
| 3680 | unsigned int seed = (unsigned int) time(NULL); |
| 3681 | int r = myinitstate_r(seed, m_random_statebuf, sizeof m_random_statebuf, &m_random_data); |
| 3682 | assert_zero(r); |
| 3683 | |
| 3684 | // start the background thread |
| 3685 | m_run_thread = true; |
| 3686 | m_num_eviction_thread_runs = 0; |
| 3687 | m_ev_thread_init = false; |
| 3688 | r = toku_pthread_create( |
| 3689 | *eviction_thread_key, &m_ev_thread, nullptr, eviction_thread, this); |
| 3690 | if (r == 0) { |
| 3691 | m_ev_thread_init = true; |
| 3692 | } |
| 3693 | m_evictor_init = true; |
| 3694 | return r; |
| 3695 | } |
| 3696 | |
| 3697 | // |
| 3698 | // This stops the eviction thread and clears the condition variable. |
| 3699 | // |
| 3700 | // NOTE: This should only be called if there are no evictions in progress. |
| 3701 | // |
| 3702 | void evictor::destroy() { |
| 3703 | if (!m_evictor_init) { |
| 3704 | return; |
| 3705 | } |
| 3706 | assert(m_size_evicting == 0); |
| 3707 | // |
| 3708 | // commented out of Ming, because we could not finish |
| 3709 | // #5672. Once #5672 is solved, we should restore this |
| 3710 | // |
| 3711 | //assert(m_size_current == 0); |
| 3712 | |
| 3713 | // Stop the eviction thread. |
| 3714 | if (m_ev_thread_init) { |
| 3715 | toku_mutex_lock(&m_ev_thread_lock); |
| 3716 | m_run_thread = false; |
| 3717 | this->signal_eviction_thread_locked(); |
| 3718 | toku_mutex_unlock(&m_ev_thread_lock); |
| 3719 | void *ret; |
| 3720 | int r = toku_pthread_join(m_ev_thread, &ret); |
| 3721 | assert_zero(r); |
| 3722 | assert(!m_ev_thread_is_running); |
| 3723 | } |
| 3724 | destroy_partitioned_counter(m_size_nonleaf); |
| 3725 | m_size_nonleaf = NULL; |
| 3726 | destroy_partitioned_counter(m_size_leaf); |
| 3727 | m_size_leaf = NULL; |
| 3728 | destroy_partitioned_counter(m_size_rollback); |
| 3729 | m_size_rollback = NULL; |
| 3730 | destroy_partitioned_counter(m_size_cachepressure); |
| 3731 | m_size_cachepressure = NULL; |
| 3732 | |
| 3733 | destroy_partitioned_counter(m_wait_pressure_count); m_wait_pressure_count = NULL; |
| 3734 | destroy_partitioned_counter(m_wait_pressure_time); m_wait_pressure_time = NULL; |
| 3735 | destroy_partitioned_counter(m_long_wait_pressure_count); m_long_wait_pressure_count = NULL; |
| 3736 | destroy_partitioned_counter(m_long_wait_pressure_time); m_long_wait_pressure_time = NULL; |
| 3737 | |
| 3738 | toku_cond_destroy(&m_flow_control_cond); |
| 3739 | toku_cond_destroy(&m_ev_thread_cond); |
| 3740 | toku_mutex_destroy(&m_ev_thread_lock); |
| 3741 | } |
| 3742 | |
| 3743 | // |
| 3744 | // Increases status variables and the current size variable |
| 3745 | // of the evictor based on the given pair attribute. |
| 3746 | // |
| 3747 | void evictor::add_pair_attr(PAIR_ATTR attr) { |
| 3748 | assert(attr.is_valid); |
| 3749 | add_to_size_current(attr.size); |
| 3750 | increment_partitioned_counter(m_size_nonleaf, attr.nonleaf_size); |
| 3751 | increment_partitioned_counter(m_size_leaf, attr.leaf_size); |
| 3752 | increment_partitioned_counter(m_size_rollback, attr.rollback_size); |
| 3753 | increment_partitioned_counter(m_size_cachepressure, attr.cache_pressure_size); |
| 3754 | } |
| 3755 | |
| 3756 | // |
| 3757 | // Decreases status variables and the current size variable |
| 3758 | // of the evictor based on the given pair attribute. |
| 3759 | // |
| 3760 | void evictor::remove_pair_attr(PAIR_ATTR attr) { |
| 3761 | assert(attr.is_valid); |
| 3762 | remove_from_size_current(attr.size); |
| 3763 | increment_partitioned_counter(m_size_nonleaf, 0 - attr.nonleaf_size); |
| 3764 | increment_partitioned_counter(m_size_leaf, 0 - attr.leaf_size); |
| 3765 | increment_partitioned_counter(m_size_rollback, 0 - attr.rollback_size); |
| 3766 | increment_partitioned_counter(m_size_cachepressure, 0 - attr.cache_pressure_size); |
| 3767 | } |
| 3768 | |
| 3769 | // |
| 3770 | // Updates this evictor's stats to match the "new" pair attribute given |
| 3771 | // while also removing the given "old" pair attribute. |
| 3772 | // |
| 3773 | void evictor::change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr) { |
| 3774 | this->add_pair_attr(new_attr); |
| 3775 | this->remove_pair_attr(old_attr); |
| 3776 | } |
| 3777 | |
| 3778 | // |
| 3779 | // Adds the given size to the evictor's estimation of |
| 3780 | // the size of the cachetable. |
| 3781 | // |
| 3782 | void evictor::add_to_size_current(long size) { |
| 3783 | (void) toku_sync_fetch_and_add(&m_size_current, size); |
| 3784 | } |
| 3785 | |
| 3786 | // |
| 3787 | // Subtracts the given size from the evictor's current |
| 3788 | // approximation of the cachetable size. |
| 3789 | // |
| 3790 | void evictor::remove_from_size_current(long size) { |
| 3791 | (void) toku_sync_fetch_and_sub(&m_size_current, size); |
| 3792 | } |
| 3793 | |
| 3794 | // |
| 3795 | // Adds the size of cloned data to necessary variables in the evictor |
| 3796 | // |
| 3797 | void evictor::add_cloned_data_size(long size) { |
| 3798 | (void) toku_sync_fetch_and_add(&m_size_cloned_data, size); |
| 3799 | add_to_size_current(size); |
| 3800 | } |
| 3801 | |
| 3802 | // |
| 3803 | // Removes the size of cloned data to necessary variables in the evictor |
| 3804 | // |
| 3805 | void evictor::remove_cloned_data_size(long size) { |
| 3806 | (void) toku_sync_fetch_and_sub(&m_size_cloned_data, size); |
| 3807 | remove_from_size_current(size); |
| 3808 | } |
| 3809 | |
| 3810 | // |
| 3811 | // TODO: (Zardosht) comment this function |
| 3812 | // |
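| | // (Behavior inferred from the body below:) reserves `fraction` of the portion of
| | // the low watermark that is not already reserved, capped at upper_bound when
| | // upper_bound > 0. The reservation is charged to m_size_current, the eviction
| | // thread is signaled, and the caller may block until cache pressure subsides.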
| 3813 | uint64_t evictor::reserve_memory(double fraction, uint64_t upper_bound) { |
| 3814 | toku_mutex_lock(&m_ev_thread_lock); |
| 3815 | uint64_t reserved_memory = fraction * (m_low_size_watermark - m_size_reserved); |
| 3816 | if (0) { // debug |
| 3817 | fprintf(stderr, "%s %" PRIu64 " %" PRIu64 "\n", __PRETTY_FUNCTION__, reserved_memory, upper_bound);
| 3818 | } |
| 3819 | if (upper_bound > 0 && reserved_memory > upper_bound) { |
| 3820 | reserved_memory = upper_bound; |
| 3821 | } |
| 3822 | m_size_reserved += reserved_memory; |
| 3823 | (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory); |
| 3824 | this->signal_eviction_thread_locked(); |
| 3825 | toku_mutex_unlock(&m_ev_thread_lock); |
| 3826 | |
| 3827 | if (this->should_client_thread_sleep()) { |
| 3828 | this->wait_for_cache_pressure_to_subside(); |
| 3829 | } |
| 3830 | return reserved_memory; |
| 3831 | } |
| 3832 | |
| 3833 | // |
| 3834 | // TODO: (Zardosht) comment this function |
| 3835 | // |
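| | // (Behavior inferred from the body below:) returns memory previously taken via
| | // reserve_memory(): it is subtracted from m_size_current and m_size_reserved, and
| | // the eviction thread is signaled if any clients are sleeping on cache pressure.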
| 3836 | void evictor::release_reserved_memory(uint64_t reserved_memory){ |
| 3837 | (void) toku_sync_fetch_and_sub(&m_size_current, reserved_memory); |
| 3838 | toku_mutex_lock(&m_ev_thread_lock); |
| 3839 | m_size_reserved -= reserved_memory; |
| 3840 | // signal the eviction thread in order to possibly wake up sleeping clients |
| 3841 | if (m_num_sleepers > 0) { |
| 3842 | this->signal_eviction_thread_locked(); |
| 3843 | } |
| 3844 | toku_mutex_unlock(&m_ev_thread_lock); |
| 3845 | } |
| 3846 | |
| 3847 | // |
| 3848 | // This function is the eviction thread. It runs for the lifetime of |
| 3849 | // the evictor. Goes to sleep for period_in_seconds |
| 3850 | // by waiting on m_ev_thread_cond. |
| 3851 | // |
| 3852 | void evictor::run_eviction_thread(){ |
| 3853 | toku_mutex_lock(&m_ev_thread_lock); |
| 3854 | while (m_run_thread) { |
| 3855 | m_num_eviction_thread_runs++; // for test purposes only |
| 3856 | m_ev_thread_is_running = true; |
| 3857 | // responsibility of run_eviction to release and |
| 3858 | // regrab ev_thread_lock as it sees fit |
| 3859 | this->run_eviction(); |
| 3860 | m_ev_thread_is_running = false; |
| 3861 | |
| 3862 | if (m_run_thread) { |
| 3863 | // |
| 3864 | // sleep until either we are signaled |
| 3865 | // via signal_eviction_thread or |
| 3866 | // m_period_in_seconds amount of time has passed |
| 3867 | // |
| 3868 | if (m_period_in_seconds) { |
| 3869 | toku_timespec_t wakeup_time; |
| 3870 | struct timeval tv; |
| 3871 | gettimeofday(&tv, 0); |
| 3872 | wakeup_time.tv_sec = tv.tv_sec; |
| 3873 | wakeup_time.tv_nsec = tv.tv_usec * 1000LL; |
| 3874 | wakeup_time.tv_sec += m_period_in_seconds; |
| 3875 | toku_cond_timedwait( |
| 3876 | &m_ev_thread_cond, |
| 3877 | &m_ev_thread_lock, |
| 3878 | &wakeup_time |
| 3879 | ); |
| 3880 | } |
| 3881 | // for test purposes, we have an option of |
| 3882 | // not waiting on a period, but rather sleeping indefinitely |
| 3883 | else { |
| 3884 | toku_cond_wait(&m_ev_thread_cond, &m_ev_thread_lock); |
| 3885 | } |
| 3886 | } |
| 3887 | } |
| 3888 | toku_mutex_unlock(&m_ev_thread_lock); |
| 3889 | } |
| 3890 | |
| 3891 | // |
| 3892 | // runs eviction. |
| 3893 | // on entry, ev_thread_lock is grabbed, on exit, ev_thread_lock must still be grabbed |
| 3894 | // it is the responsibility of this function to release and reacquire ev_thread_lock as it sees fit. |
| 3895 | // |
| 3896 | void evictor::run_eviction(){ |
| 3897 | // |
| 3898 | // These variables will help us detect if everything in the clock is currently being accessed. |
| 3899 | // We must detect this case otherwise we will end up in an infinite loop below. |
| 3900 | // |
| 3901 | bool exited_early = false; |
| 3902 | uint32_t num_pairs_examined_without_evicting = 0; |
| 3903 | |
| 3904 | while (this->eviction_needed()) { |
| 3905 | if (m_num_sleepers > 0 && this->should_sleeping_clients_wakeup()) { |
| 3906 | toku_cond_broadcast(&m_flow_control_cond); |
| 3907 | } |
| 3908 | // release ev_thread_lock so that eviction may run without holding mutex |
| 3909 | toku_mutex_unlock(&m_ev_thread_lock); |
| 3910 | |
| 3911 | // first try to do an eviction from stale cachefiles |
| 3912 | bool some_eviction_ran = m_cf_list->evict_some_stale_pair(this); |
| 3913 | if (!some_eviction_ran) { |
| 3914 | m_pl->read_list_lock(); |
| 3915 | PAIR curr_in_clock = m_pl->m_clock_head; |
| 3916 | // if nothing to evict, we need to exit |
| 3917 | if (!curr_in_clock) { |
| 3918 | m_pl->read_list_unlock(); |
| 3919 | toku_mutex_lock(&m_ev_thread_lock); |
| 3920 | exited_early = true; |
| 3921 | goto exit; |
| 3922 | } |
| 3923 | if (num_pairs_examined_without_evicting > m_pl->m_n_in_table) { |
| 3924 | // we have a cycle where everything in the clock is in use |
| 3925 | // do not return an error |
| 3926 | // just let memory be overfull |
| 3927 | m_pl->read_list_unlock(); |
| 3928 | toku_mutex_lock(&m_ev_thread_lock); |
| 3929 | exited_early = true; |
| 3930 | goto exit; |
| 3931 | } |
| 3932 | bool eviction_run = run_eviction_on_pair(curr_in_clock); |
| 3933 | if (eviction_run) { |
| 3934 | // reset the count |
| 3935 | num_pairs_examined_without_evicting = 0; |
| 3936 | } |
| 3937 | else { |
| 3938 | num_pairs_examined_without_evicting++; |
| 3939 | } |
| 3940 | // at this point, either curr_in_clock is still in the list because it has not been fully evicted, |
| 3941 | // and we need to move ct->m_clock_head over. Otherwise, curr_in_clock has been fully evicted |
| 3942 | // and we do NOT need to move ct->m_clock_head, as the removal of curr_in_clock |
| 3943 | // modified ct->m_clock_head |
| 3944 | if (m_pl->m_clock_head && (m_pl->m_clock_head == curr_in_clock)) { |
| 3945 | m_pl->m_clock_head = m_pl->m_clock_head->clock_next; |
| 3946 | } |
| 3947 | m_pl->read_list_unlock(); |
| 3948 | } |
| 3949 | toku_mutex_lock(&m_ev_thread_lock); |
| 3950 | } |
| 3951 | |
| 3952 | exit: |
| 3953 | if (m_num_sleepers > 0 && (exited_early || this->should_sleeping_clients_wakeup())) { |
| 3954 | toku_cond_broadcast(&m_flow_control_cond); |
| 3955 | } |
| 3956 | return; |
| 3957 | } |
| 3958 | |
| 3959 | // |
| 3960 | // NOTE: Cachetable lock held on entry. |
| 3961 | // Runs eviction on the given PAIR. This may be a |
| 3962 | // partial eviction or full eviction. |
| 3963 | // |
| 3964 | // on entry, pair mutex is NOT held, but pair list's read list lock |
| 3965 | // IS held |
| 3966 | // on exit, the same conditions must apply |
| 3967 | // |
| 3968 | bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { |
| 3969 | uint32_t n_in_table; |
| 3970 | int64_t size_current; |
| 3971 | bool ret_val = false; |
| 3972 | // function meant to be called on PAIR that is not being accessed right now |
| 3973 | CACHEFILE cf = curr_in_clock->cachefile; |
| 3974 | int r = bjm_add_background_job(cf->bjm); |
| 3975 | if (r) { |
| 3976 | goto exit; |
| 3977 | } |
| 3978 | pair_lock(curr_in_clock); |
| 3979 | // these are the circumstances under which we don't run eviction on a pair: |
| 3980 | // - if other users are waiting on the lock |
| 3981 | // - if the PAIR is referenced by users |
| 3982 | // - if the PAIR's disk_nb_mutex is in use, implying that it is |
| 3983 | // undergoing a checkpoint |
| 3984 | if (curr_in_clock->value_rwlock.users() || |
| 3985 | curr_in_clock->refcount > 0 || |
| 3986 | nb_mutex_users(&curr_in_clock->disk_nb_mutex)) |
| 3987 | { |
| 3988 | pair_unlock(curr_in_clock); |
| 3989 | bjm_remove_background_job(cf->bjm); |
| 3990 | goto exit; |
| 3991 | } |
| 3992 | |
| 3993 | // extract and use these values so that we don't risk them changing |
| 3994 | // out from underneath us in calculations below. |
| 3995 | n_in_table = m_pl->m_n_in_table; |
| 3996 | size_current = m_size_current; |
| 3997 | |
| 3998 | // now that we have the pair mutex we care about, we can |
| 3999 | // release the read list lock and reacquire it at the end of the function |
| 4000 | m_pl->read_list_unlock(); |
| 4001 | ret_val = true; |
| 4002 | if (curr_in_clock->count > 0) { |
| 4003 | toku::context pe_ctx(CTX_PARTIAL_EVICTION); |
| 4004 | |
| 4005 | uint32_t curr_size = curr_in_clock->attr.size; |
| 4006 | // if the size of this PAIR is greater than the average size of PAIRs |
| 4007 | // in the cachetable, then decrement it, otherwise, decrement |
| 4008 | // probabilistically |
| 4009 | if (curr_size*n_in_table >= size_current) { |
| 4010 | curr_in_clock->count--; |
| 4011 | } else { |
| 4012 | // generate a random number between 0 and 2^16 - 1
| 4013 | assert(size_current <= (INT64_MAX / ((1<<16)-1))); // to protect against possible overflows |
| 4014 | int32_t rnd = myrandom_r(&m_random_data) % (1<<16); |
| 4015 | // The if-statement below will be true with probability of |
| 4016 | // curr_size/(average size of PAIR in cachetable) |
| 4017 | // Here is how the math is done: |
| 4018 | // average_size = size_current/n_in_table |
| 4019 | // curr_size/average_size = curr_size*n_in_table/size_current |
| 4020 | // we evaluate if a random number from 0 to 2^16 is less than |
| 4021 | // curr_size/average_size * 2^16. So, our if-clause should be
| 4022 | // if (2^16*curr_size/average_size > rnd) |
| 4023 | // this evaluates to: |
| 4024 | // if (2^16*curr_size*n_in_table/size_current > rnd) |
| 4025 | // by multiplying each side of the equation by size_current, we get |
| 4026 | // if (2^16*curr_size*n_in_table > rnd*size_current) |
| 4027 | // and dividing each side by 2^16, |
| 4028 | // we get the if-clause below |
| 4029 | // |
| 4030 | if ((((int64_t)curr_size) * n_in_table) >= (((int64_t)rnd) * size_current)>>16) { |
| 4031 | curr_in_clock->count--; |
| 4032 | } |
| 4033 | } |
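| | // Illustrative example of the math above: with size_current = 1GB and
| | // n_in_table = 4096, the average PAIR is ~256KB; a 64KB PAIR then has its
| | // count decremented with probability of roughly 64/256 = 1/4 per pass, while
| | // any PAIR of at least average size is always decremented.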
| 4034 | |
| 4035 | if (m_enable_partial_eviction) { |
| 4036 | // call the partial eviction callback |
| 4037 | curr_in_clock->value_rwlock.write_lock(true); |
| 4038 | |
| 4039 | void *value = curr_in_clock->value_data; |
| 4040 | void* disk_data = curr_in_clock->disk_data; |
| 4041 | void *write_extraargs = curr_in_clock->write_extraargs;
| 4042 | enum partial_eviction_cost cost; |
| 4043 | long bytes_freed_estimate = 0; |
| 4044 | curr_in_clock->pe_est_callback(value, disk_data, |
| 4045 | &bytes_freed_estimate, &cost, |
| 4046 | write_extraargs); |
| 4047 | if (cost == PE_CHEAP) { |
| 4048 | pair_unlock(curr_in_clock); |
| 4049 | curr_in_clock->size_evicting_estimate = 0; |
| 4050 | this->do_partial_eviction(curr_in_clock); |
| 4051 | bjm_remove_background_job(cf->bjm); |
| 4052 | } else if (cost == PE_EXPENSIVE) { |
| 4053 | // only bother running an expensive partial eviction |
| 4054 | // if it is expected to free space |
| 4055 | if (bytes_freed_estimate > 0) { |
| 4056 | pair_unlock(curr_in_clock); |
| 4057 | curr_in_clock->size_evicting_estimate = bytes_freed_estimate; |
| 4058 | toku_mutex_lock(&m_ev_thread_lock); |
| 4059 | m_size_evicting += bytes_freed_estimate; |
| 4060 | toku_mutex_unlock(&m_ev_thread_lock); |
| 4061 | toku_kibbutz_enq(m_kibbutz, cachetable_partial_eviction, |
| 4062 | curr_in_clock); |
| 4063 | } else { |
| 4064 | curr_in_clock->value_rwlock.write_unlock(); |
| 4065 | pair_unlock(curr_in_clock); |
| 4066 | bjm_remove_background_job(cf->bjm); |
| 4067 | } |
| 4068 | } else { |
| 4069 | assert(false); |
| 4070 | } |
| 4071 | } else { |
| 4072 | pair_unlock(curr_in_clock); |
| 4073 | bjm_remove_background_job(cf->bjm); |
| 4074 | } |
| 4075 | } else { |
| 4076 | toku::context pe_ctx(CTX_FULL_EVICTION); |
| 4077 | |
| 4078 | // responsibility of try_evict_pair to eventually remove background job |
| 4079 | // pair's mutex is still grabbed here |
| 4080 | this->try_evict_pair(curr_in_clock); |
| 4081 | } |
| 4082 | // regrab the read list lock, because the caller assumes |
| 4083 | // that it is held. The contract requires this. |
| 4084 | m_pl->read_list_lock(); |
| 4085 | exit: |
| 4086 | return ret_val; |
| 4087 | } |
| 4088 | |
| 4089 | struct pair_unpin_with_new_attr_extra {
| 4090 | pair_unpin_with_new_attr_extra(evictor *e, PAIR p) :
| 4091 | ev(e), pair(p) {
| 4092 | }
| 4093 | evictor *ev;
| 4094 | PAIR pair;
| 4095 | };
| 4096 | |
| 4097 | static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) {
| 4098 | struct pair_unpin_with_new_attr_extra *info = |
| 4099 | reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra); |
| 4100 | PAIR p = info->pair; |
| 4101 | evictor *ev = info->ev; |
| 4102 | |
| 4103 | // change the attr in the evictor, then update the value in the pair |
| 4104 | ev->change_pair_attr(p->attr, new_attr); |
| 4105 | p->attr = new_attr; |
| 4106 | |
| 4107 | // unpin |
| 4108 | pair_lock(p); |
| 4109 | p->value_rwlock.write_unlock(); |
| 4110 | pair_unlock(p); |
| 4111 | } |
| 4112 | |
| 4113 | // |
| 4114 | // on entry and exit, pair's mutex is not held |
| 4115 | // on exit, PAIR is unpinned |
| 4116 | // |
| 4117 | void evictor::do_partial_eviction(PAIR p) { |
| 4118 | // Copy the old attr |
| 4119 | PAIR_ATTR old_attr = p->attr; |
| 4120 | long long size_evicting_estimate = p->size_evicting_estimate; |
| 4121 | |
| 4122 | struct pair_unpin_with_new_attr_extra extra(this, p);
| 4123 | p->pe_callback(p->value_data, old_attr, p->write_extraargs, |
| 4124 | // passed as the finalize continuation, which allows the |
| 4125 | // pe_callback to unpin the node before doing expensive cleanup |
| 4126 | pair_unpin_with_new_attr, &extra); |
| 4127 | |
| 4128 | // now that the pe_callback (and its pair_unpin_with_new_attr continuation) |
| 4129 | // have finished, we can safely decrease size_evicting |
| 4130 | this->decrease_size_evicting(size_evicting_estimate); |
| 4131 | } |
| 4132 | |
| 4133 | // |
| 4134 | // CT lock held on entry |
| 4135 | // background job has been added for p->cachefile on entry |
| 4136 | // responsibility of this function to make sure that background job is removed |
| 4137 | // |
| 4138 | // on entry, pair's mutex is held, on exit, the pair's mutex is NOT held |
| 4139 | // |
| 4140 | void evictor::try_evict_pair(PAIR p) { |
| 4141 | CACHEFILE cf = p->cachefile; |
| 4142 | // evictions that do not require a write (i.e. unpinned pairs that are clean)
| 4143 | // can be run in the current thread
| 4144 | |
| 4145 | // the only caller, run_eviction_on_pair, should call this function |
| 4146 | // only if no one else is trying to use it |
| 4147 | assert(!p->value_rwlock.users()); |
| 4148 | p->value_rwlock.write_lock(true); |
| 4149 | // if the PAIR is dirty, the running eviction requires writing the |
| 4150 | // PAIR out. if the disk_nb_mutex is grabbed, then running |
| 4151 | // eviction requires waiting for the disk_nb_mutex to become available, |
| 4152 | // which may be expensive. Hence, if either is true, we |
| 4153 | // do the eviction on a writer thread |
| 4154 | if (!p->dirty && (nb_mutex_writers(&p->disk_nb_mutex) == 0)) { |
| 4155 | p->size_evicting_estimate = 0; |
| 4156 | // |
| 4157 | // This method will unpin PAIR and release PAIR mutex |
| 4158 | // |
| 4159 | // because the PAIR is not dirty, we can safely pass |
| 4160 | // false for the for_checkpoint parameter |
| 4161 | this->evict_pair(p, false); |
| 4162 | bjm_remove_background_job(cf->bjm); |
| 4163 | } |
| 4164 | else { |
| 4165 | pair_unlock(p); |
| 4166 | toku_mutex_lock(&m_ev_thread_lock); |
| 4167 | assert(m_size_evicting >= 0); |
| 4168 | p->size_evicting_estimate = p->attr.size; |
| 4169 | m_size_evicting += p->size_evicting_estimate; |
| 4170 | assert(m_size_evicting >= 0); |
| 4171 | toku_mutex_unlock(&m_ev_thread_lock); |
| 4172 | toku_kibbutz_enq(m_kibbutz, cachetable_evicter, p); |
| 4173 | } |
| 4174 | } |
| 4175 | |
| 4176 | // |
| 4177 | // Requires: This thread must hold the write lock (nb_mutex) for the pair. |
| 4178 | // The pair's mutex (p->mutex) is also held. |
| 4179 | // on exit, neither is held |
| 4180 | // |
| 4181 | void evictor::evict_pair(PAIR p, bool for_checkpoint) { |
| 4182 | if (p->dirty) { |
| 4183 | pair_unlock(p); |
| 4184 | cachetable_write_locked_pair(this, p, for_checkpoint); |
| 4185 | pair_lock(p); |
| 4186 | } |
| 4187 | // one thing we can do here is extract the size_evicting estimate, |
| 4188 | // have decrease_size_evicting take the estimate and not the pair, |
| 4189 | // and do this work after we have called |
| 4190 | // cachetable_maybe_remove_and_free_pair |
| 4191 | this->decrease_size_evicting(p->size_evicting_estimate); |
| 4192 | // if we are to remove this pair, we need the write list lock, |
| 4193 | // to get it in a way that avoids deadlocks, we must first release |
| 4194 | // the pair's mutex, then grab the write list lock, then regrab the |
| 4195 | // pair's mutex. The pair cannot go anywhere because |
| 4196 | // the pair is still pinned |
| 4197 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
| 4198 | pair_unlock(p); |
| 4199 | m_pl->write_list_lock(); |
| 4200 | pair_lock(p); |
| 4201 | p->value_rwlock.write_unlock(); |
| 4202 | nb_mutex_unlock(&p->disk_nb_mutex); |
| 4203 | // at this point, we have the pair list's write list lock |
| 4204 | // and we have the pair's mutex (p->mutex) held |
| 4205 | |
| 4206 | // this ensures that a clone running in the background first completes |
| 4207 | bool removed = false; |
| 4208 | if (p->value_rwlock.users() == 0 && p->refcount == 0) { |
| 4209 | // assumption is that if we are about to remove the pair |
| 4210 | // that no one has grabbed the disk_nb_mutex, |
| 4211 | // and that there is no cloned_value_data, because |
| 4212 | // no one is writing a cloned value out. |
| 4213 | assert(nb_mutex_users(&p->disk_nb_mutex) == 0); |
| 4214 | assert(p->cloned_value_data == NULL); |
| 4215 | cachetable_remove_pair(m_pl, this, p); |
| 4216 | removed = true; |
| 4217 | } |
| 4218 | pair_unlock(p); |
| 4219 | m_pl->write_list_unlock(); |
| 4220 | // do not want to hold the write list lock while freeing a pair |
| 4221 | if (removed) { |
| 4222 | cachetable_free_pair(p); |
| 4223 | } |
| 4224 | } |
| 4225 | |
| 4226 | // |
| 4227 | // this function handles the responsibilities for writer threads when they |
| 4228 | // decrease size_evicting. The responsibilities are: |
| 4229 | // - decrease m_size_evicting in a thread safe manner |
| 4230 | // - in some circumstances, signal the eviction thread |
| 4231 | // |
| 4232 | void evictor::decrease_size_evicting(long size_evicting_estimate) { |
| 4233 | if (size_evicting_estimate > 0) { |
| 4234 | toku_mutex_lock(&m_ev_thread_lock); |
| 4235 | int64_t buffer = m_high_size_hysteresis - m_low_size_watermark; |
| 4236 | // if size_evicting is transitioning from greater than buffer to below buffer, and |
| 4237 | // some client threads are sleeping, we need to wake up the eviction thread. |
| 4238 | // Here is why. In this scenario, we are in one of two cases: |
| 4239 | // - size_current - size_evicting < low_size_watermark |
| 4240 | // If this is true, then size_current < high_size_hysteresis, which |
| 4241 | // means we need to wake up sleeping clients |
| 4242 | // - size_current - size_evicting > low_size_watermark, |
| 4243 | // which means more evictions must be run. |
| 4244 | // The consequences of both cases are the responsibility |
| 4245 | // of the eviction thread. |
| 4246 | // |
| 4247 | bool need_to_signal_ev_thread = |
| 4248 | (m_num_sleepers > 0) && |
| 4249 | !m_ev_thread_is_running && |
| 4250 | (m_size_evicting > buffer) && |
| 4251 | ((m_size_evicting - size_evicting_estimate) <= buffer); |
| 4252 | m_size_evicting -= size_evicting_estimate; |
| 4253 | assert(m_size_evicting >= 0); |
| 4254 | if (need_to_signal_ev_thread) { |
| 4255 | this->signal_eviction_thread_locked(); |
| 4256 | } |
| 4257 | toku_mutex_unlock(&m_ev_thread_lock); |
| 4258 | } |
| 4259 | } |
| 4260 | |
| 4261 | // |
| 4262 | // Wait for cache table space to become available |
| 4263 | // size_current is number of bytes currently occupied by data (referred to by pairs) |
| 4264 | // size_evicting is number of bytes queued up to be evicted |
| 4265 | // |
| 4266 | void evictor::wait_for_cache_pressure_to_subside() { |
| 4267 | uint64_t t0 = toku_current_time_microsec(); |
| 4268 | toku_mutex_lock(&m_ev_thread_lock); |
| 4269 | m_num_sleepers++; |
| 4270 | this->signal_eviction_thread_locked(); |
| 4271 | toku_cond_wait(&m_flow_control_cond, &m_ev_thread_lock); |
| 4272 | m_num_sleepers--; |
| 4273 | toku_mutex_unlock(&m_ev_thread_lock); |
| 4274 | uint64_t t1 = toku_current_time_microsec(); |
| 4275 | increment_partitioned_counter(m_wait_pressure_count, 1); |
| 4276 | uint64_t tdelta = t1 - t0; |
| 4277 | increment_partitioned_counter(m_wait_pressure_time, tdelta); |
| 4278 | if (tdelta > 1000000) { |
| 4279 | increment_partitioned_counter(m_long_wait_pressure_count, 1); |
| 4280 | increment_partitioned_counter(m_long_wait_pressure_time, tdelta); |
| 4281 | } |
| 4282 | } |
| 4283 | |
| 4284 | // |
| 4285 | // Get the status of the current estimated size of the cachetable, |
| 4286 | // and the evictor's set limit. |
| 4287 | // |
| 4288 | void evictor::get_state(long *size_current_ptr, long *size_limit_ptr) { |
| 4289 | if (size_current_ptr) { |
| 4290 | *size_current_ptr = m_size_current; |
| 4291 | } |
| 4292 | if (size_limit_ptr) { |
| 4293 | *size_limit_ptr = m_low_size_watermark; |
| 4294 | } |
| 4295 | } |
| 4296 | |
| 4297 | // |
| 4298 | // Force the eviction thread to do some work. |
| 4299 | // |
| 4300 | // This function does not require any mutex to be held. |
| 4301 | // As a result, scheduling is not guaranteed, but that is tolerable. |
| 4302 | // |
| 4303 | void evictor::signal_eviction_thread() { |
| 4304 | toku_mutex_lock(&m_ev_thread_lock); |
| 4305 | toku_cond_signal(&m_ev_thread_cond); |
| 4306 | toku_mutex_unlock(&m_ev_thread_lock); |
| 4307 | } |
| 4308 | |
| 4309 | void evictor::signal_eviction_thread_locked() { |
| 4310 | toku_cond_signal(&m_ev_thread_cond); |
| 4311 | } |
| 4312 | |
| 4313 | // |
| 4314 | // Returns true if the cachetable is so over subscribed, that a client thread should sleep |
| 4315 | // |
| 4316 | // This function may be called in a thread-unsafe manner. Locks are not |
| 4317 | // required to read size_current. The result is that |
| 4318 | // the values may be a little off, but we think that is tolerable. |
| 4319 | // |
| 4320 | bool evictor::should_client_thread_sleep() {
| 4321 | return unsafe_read_size_current() > m_high_size_watermark; |
| 4322 | } |
| 4323 | |
| 4324 | // |
| 4325 | // Returns true if a sleeping client should be woken up because |
| 4326 | // the cachetable is not overly subscribed |
| 4327 | // |
| 4328 | // This function may be called in a thread-unsafe manner. Locks are not |
| 4329 | // required to read size_current. The result is that |
| 4330 | // the values may be a little off, but we think that is tolerable. |
| 4331 | // |
| 4332 | bool evictor::should_sleeping_clients_wakeup() { |
| 4333 | return unsafe_read_size_current() <= m_high_size_hysteresis; |
| 4334 | } |
| 4335 | |
| 4336 | // |
| 4337 | // Returns true if a client thread should try to wake up the eviction |
| 4338 | // thread because the client thread has noticed too much data taken |
| 4339 | // up in the cachetable. |
| 4340 | // |
| 4341 | // This function may be called in a thread-unsafe manner. Locks are not |
| 4342 | // required to read size_current or size_evicting. The result is that |
| 4343 | // the values may be a little off, but we think that is tolerable. |
| 4344 | // If the caller wants to ensure that ev_thread_is_running and size_evicting |
| 4345 | // are accurate, then the caller must hold ev_thread_lock before |
| 4346 | // calling this function. |
| 4347 | // |
| 4348 | bool evictor::should_client_wake_eviction_thread() { |
| 4349 | return |
| 4350 | !m_ev_thread_is_running && |
| 4351 | ((unsafe_read_size_current() - m_size_evicting) > m_low_size_hysteresis); |
| 4352 | } |
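| | // Illustrative sketch of the client-side throttling pattern these predicates
| | // support (the real call sites live elsewhere in the cachetable code):
| | //
| | //     if (ev->should_client_thread_sleep()) {
| | //         ev->wait_for_cache_pressure_to_subside();
| | //     } else if (ev->should_client_wake_eviction_thread()) {
| | //         ev->signal_eviction_thread();
| | //     }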
| 4353 | |
| 4354 | // |
| 4355 | // Determines if eviction is needed. If the current size of |
| 4356 | // the cachetable exceeds the sum of our fixed size limit and |
| 4357 | // the amount of data currently being evicted, then eviction is needed |
| 4358 | // |
| 4359 | bool evictor::eviction_needed() { |
| 4360 | return (m_size_current - m_size_evicting) > m_low_size_watermark; |
| 4361 | } |
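| | // Illustrative sketch of how the eviction thread is expected to use the
| | // predicates above (a reading of this file, not the exact loop):
| | //
| | //     while (eviction_needed()) {
| | //         ...evict a pair, adding its size to m_size_evicting...
| | //     }
| | //     if (should_sleeping_clients_wakeup()) {
| | //         ...broadcast m_flow_control_cond to wake throttled clients...
| | //     }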
| 4362 | |
| 4363 | inline int64_t evictor::unsafe_read_size_current(void) const { |
| 4364 | return m_size_current; |
| 4365 | } |
| 4366 | |
| 4367 | void evictor::fill_engine_status() { |
| 4368 | CT_STATUS_VAL(CT_SIZE_CURRENT) = m_size_current; |
| 4369 | CT_STATUS_VAL(CT_SIZE_LIMIT) = m_low_size_hysteresis; |
| 4370 | CT_STATUS_VAL(CT_SIZE_WRITING) = m_size_evicting; |
| 4371 | CT_STATUS_VAL(CT_SIZE_NONLEAF) = read_partitioned_counter(m_size_nonleaf); |
| 4372 | CT_STATUS_VAL(CT_SIZE_LEAF) = read_partitioned_counter(m_size_leaf); |
| 4373 | CT_STATUS_VAL(CT_SIZE_ROLLBACK) = read_partitioned_counter(m_size_rollback); |
| 4374 | CT_STATUS_VAL(CT_SIZE_CACHEPRESSURE) = read_partitioned_counter(m_size_cachepressure); |
| 4375 | CT_STATUS_VAL(CT_SIZE_CLONED) = m_size_cloned_data; |
| 4376 | CT_STATUS_VAL(CT_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_wait_pressure_count); |
| 4377 | CT_STATUS_VAL(CT_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_wait_pressure_time); |
| 4378 | CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_long_wait_pressure_count); |
| 4379 | CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_long_wait_pressure_time); |
| 4380 | } |
| 4381 | |
| 4382 | void evictor::set_enable_partial_eviction(bool enabled) { |
| 4383 | m_enable_partial_eviction = enabled; |
| 4384 | } |
| 4385 | |
| 4386 | bool evictor::get_enable_partial_eviction(void) const { |
| 4387 | return m_enable_partial_eviction; |
| 4388 | } |
| 4389 | |
| 4390 | //////////////////////////////////////////////////////////////////////////////// |
| 4391 | |
| 4392 | ENSURE_POD(checkpointer); |
| 4393 | |
| 4394 | // |
| 4395 | // Initializes the checkpointer: stores references to the pair list, logger, evictor, and cachefile list (this wiring is temporary), and sets up the checkpoint minicron.
| 4396 | // |
| 4397 | int checkpointer::init(pair_list *_pl, |
| 4398 | TOKULOGGER _logger, |
| 4399 | evictor *_ev, |
| 4400 | cachefile_list *files) { |
| 4401 | m_list = _pl; |
| 4402 | m_logger = _logger; |
| 4403 | m_ev = _ev; |
| 4404 | m_cf_list = files; |
| 4405 | bjm_init(&m_checkpoint_clones_bjm); |
| 4406 | |
| 4407 | // Default is no checkpointing. |
| 4408 | m_checkpointer_cron_init = false; |
| 4409 | int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this); |
| 4410 | if (r == 0) { |
| 4411 | m_checkpointer_cron_init = true; |
| 4412 | } |
| 4413 | m_checkpointer_init = true; |
| 4414 | return r; |
| 4415 | } |
| 4416 | |
| 4417 | void checkpointer::destroy() { |
| 4418 | if (!m_checkpointer_init) { |
| 4419 | return; |
| 4420 | } |
| 4421 | if (m_checkpointer_cron_init && !this->has_been_shutdown()) { |
| 4422 | // for test code only; production code uses toku_cachetable_minicron_shutdown()
| 4423 | int r = this->shutdown(); |
| 4424 | assert(r == 0); |
| 4425 | } |
| 4426 | bjm_destroy(m_checkpoint_clones_bjm); |
| 4427 | } |
| 4428 | |
| 4429 | // |
| 4430 | // Sets how often the checkpoint thread will run, in seconds (the minicron period is stored in milliseconds, hence the conversion below).
| 4431 | // |
| 4432 | void checkpointer::set_checkpoint_period(uint32_t new_period) { |
| 4433 | toku_minicron_change_period(&m_checkpointer_cron, new_period*1000); |
| 4434 | } |
| 4435 | |
| 4436 | // |
| 4437 | // Returns how often the checkpoint thread will run, in seconds.
| 4438 | // |
| 4439 | uint32_t checkpointer::get_checkpoint_period() { |
| 4440 | return toku_minicron_get_period_in_seconds_unlocked(&m_checkpointer_cron); |
| 4441 | } |
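| | // Illustrative usage (hypothetical checkpointer pointer cp):
| | //
| | //     cp->set_checkpoint_period(60);                   // checkpoint every 60 seconds
| | //     uint32_t period = cp->get_checkpoint_period();   // period == 60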
| 4442 | |
| 4443 | // |
| 4444 | // Stops the checkpoint thread. |
| 4445 | // |
| 4446 | int checkpointer::shutdown() { |
| 4447 | return toku_minicron_shutdown(&m_checkpointer_cron); |
| 4448 | } |
| 4449 | |
| 4450 | // |
| 4451 | // Returns true if the checkpoint thread has been shut down (i.e. it is no longer running).
| 4452 | // |
| 4453 | bool checkpointer::has_been_shutdown() { |
| 4454 | return toku_minicron_has_been_shutdown(&m_checkpointer_cron); |
| 4455 | } |
| 4456 | |
| 4457 | TOKULOGGER checkpointer::get_logger() { |
| 4458 | return m_logger; |
| 4459 | } |
| 4460 | |
| 4461 | void checkpointer::increment_num_txns() { |
| 4462 | m_checkpoint_num_txns++; |
| 4463 | } |
| 4464 | |
| 4465 | struct iterate_begin_checkpoint { |
| 4466 | LSN lsn_of_checkpoint_in_progress; |
| 4467 | iterate_begin_checkpoint(LSN lsn) : lsn_of_checkpoint_in_progress(lsn) { } |
| 4468 | static int fn(const CACHEFILE &cf, const uint32_t UU(idx), struct iterate_begin_checkpoint *info) { |
| 4469 | assert(cf->begin_checkpoint_userdata); |
| 4470 | if (cf->for_checkpoint) { |
| 4471 | cf->begin_checkpoint_userdata(info->lsn_of_checkpoint_in_progress, cf->userdata); |
| 4472 | } |
| 4473 | return 0; |
| 4474 | } |
| 4475 | }; |
| 4476 | |
| 4477 | // |
| 4478 | // Update the user data in any cachefiles in our checkpoint list. |
| 4479 | // |
| 4480 | void checkpointer::update_cachefiles() { |
| 4481 | struct iterate_begin_checkpoint iterate(m_lsn_of_checkpoint_in_progress); |
| 4482 | int r = m_cf_list->m_active_fileid.iterate<struct iterate_begin_checkpoint, |
| 4483 | iterate_begin_checkpoint::fn>(&iterate); |
| 4484 | assert_zero(r); |
| 4485 | } |
| 4486 | |
| 4487 | struct iterate_note_pin { |
| 4488 | static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
| 4489 | assert(cf->note_pin_by_checkpoint); |
| 4490 | cf->note_pin_by_checkpoint(cf, cf->userdata); |
| 4491 | cf->for_checkpoint = true; |
| 4492 | return 0; |
| 4493 | } |
| 4494 | }; |
| 4495 | |
| 4496 | // |
| 4497 | // Sets up and kicks off a checkpoint. |
| 4498 | // |
| 4499 | void checkpointer::begin_checkpoint() { |
| 4500 | // 1. Initialize the accountability counters. |
| 4501 | m_checkpoint_num_txns = 0; |
| 4502 | |
| 4503 | // 2. Make list of cachefiles to be included in the checkpoint. |
| 4504 | m_cf_list->read_lock(); |
| 4505 | m_cf_list->m_active_fileid.iterate<void *, iterate_note_pin::fn>(nullptr); |
| 4506 | m_checkpoint_num_files = m_cf_list->m_active_fileid.size(); |
| 4507 | m_cf_list->read_unlock(); |
| 4508 | |
| 4509 | // 3. Create log entries for this checkpoint. |
| 4510 | if (m_logger) { |
| 4511 | this->log_begin_checkpoint(); |
| 4512 | } |
| 4513 | |
| 4514 | bjm_reset(m_checkpoint_clones_bjm); |
| 4515 | |
| 4516 | m_list->write_pending_exp_lock(); |
| 4517 | m_list->read_list_lock(); |
| 4518 | m_cf_list->read_lock(); // needed for update_cachefiles |
| 4519 | m_list->write_pending_cheap_lock(); |
| 4520 | // 4. Turn on all the relevant checkpoint pending bits. |
| 4521 | this->turn_on_pending_bits(); |
| 4522 | |
| 4523 | // 5. Update the user data in the cachefiles included in the checkpoint.
| 4524 | this->update_cachefiles(); |
| 4525 | m_list->write_pending_cheap_unlock(); |
| 4526 | m_cf_list->read_unlock(); |
| 4527 | m_list->read_list_unlock(); |
| 4528 | m_list->write_pending_exp_unlock(); |
| 4529 | } |
| 4530 | |
| 4531 | struct iterate_log_fassociate { |
| 4532 | static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
| 4533 | assert(cf->log_fassociate_during_checkpoint); |
| 4534 | cf->log_fassociate_during_checkpoint(cf, cf->userdata); |
| 4535 | return 0; |
| 4536 | } |
| 4537 | }; |
| 4538 | |
| 4539 | // |
| 4540 | // Assuming the logger exists, this will write out the following
| 4541 | // information to the log.
| 4542 | //
| 4543 | // 1. Writes the BEGIN_CHECKPOINT to the log.
| 4544 | // 2. Writes the list of open dictionaries to the log.
| 4545 | // 3. Writes the list of open transactions to the log.
| 4546 | // 4. Writes the list of dictionaries that have had rollback logs suppressed.
| 4547 | //
| 4548 | // NOTE: This also has the side effect of setting the LSN
| 4549 | // of the checkpoint in progress.
| 4550 | // |
| 4551 | void checkpointer::log_begin_checkpoint() { |
| 4552 | int r = 0; |
| 4553 | |
| 4554 | // Write the BEGIN_CHECKPOINT to the log. |
| 4555 | LSN begin_lsn={ .lsn = (uint64_t) -1 }; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed. |
| 4556 | TXN_MANAGER mgr = toku_logger_get_txn_manager(m_logger); |
| 4557 | TXNID last_xid = toku_txn_manager_get_last_xid(mgr); |
| 4558 | toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid); |
| 4559 | m_lsn_of_checkpoint_in_progress = begin_lsn; |
| 4560 | |
| 4561 | // Log the list of open dictionaries. |
| 4562 | m_cf_list->m_active_fileid.iterate<void *, iterate_log_fassociate::fn>(nullptr); |
| 4563 | |
| 4564 | // Write open transactions to the log. |
| 4565 | r = toku_txn_manager_iter_over_live_txns( |
| 4566 | m_logger->txn_manager, |
| 4567 | log_open_txn, |
| 4568 | this |
| 4569 | ); |
| 4570 | assert(r == 0); |
| 4571 | } |
| 4572 | |
| 4573 | // |
| 4574 | // Sets the pending bits of EVERY PAIR in the cachetable, regardless of |
| 4575 | // whether the PAIR is clean or not. It will be the responsibility of |
| 4576 | // end_checkpoint or client threads to simply clear the pending bit |
| 4577 | // if the PAIR is clean. |
| 4578 | // |
| 4579 | // On entry and exit, the pair list's read list lock is held, and
| 4580 | // both pending locks are held.
| 4581 | // |
| 4582 | void checkpointer::turn_on_pending_bits() { |
| 4583 | PAIR p = NULL; |
| 4584 | uint32_t i; |
| 4585 | for (i = 0, p = m_list->m_checkpoint_head; i < m_list->m_n_in_table; i++, p = p->clock_next) { |
| 4586 | assert(!p->checkpoint_pending); |
| 4587 | //Only include pairs belonging to cachefiles in the checkpoint |
| 4588 | if (!p->cachefile->for_checkpoint) { |
| 4589 | continue; |
| 4590 | } |
| 4591 | // Mark everything as pending a checkpoint |
| 4592 | // |
| 4593 | // The rule for the checkpoint_pending bit is as follows: |
| 4594 | // - begin_checkpoint may set checkpoint_pending to true |
| 4595 | // even though the pair lock on the node is not held. |
| 4596 | // - any thread that wants to clear the pending bit must own |
| 4597 | // the PAIR lock. Otherwise, |
| 4598 | // we may end up clearing the pending bit before the |
| 4599 | // current lock is ever released. |
| 4600 | p->checkpoint_pending = true; |
| 4601 | if (m_list->m_pending_head) { |
| 4602 | m_list->m_pending_head->pending_prev = p; |
| 4603 | } |
| 4604 | p->pending_next = m_list->m_pending_head; |
| 4605 | p->pending_prev = NULL; |
| 4606 | m_list->m_pending_head = p; |
| 4607 | } |
| 4608 | invariant(p == m_list->m_checkpoint_head); |
| 4609 | } |
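| | // Illustrative counterpart to the pending-bit rule above (a sketch of the
| | // clearing side; see checkpoint_pending_pairs and
| | // write_pair_for_checkpoint_thread for the real code):
| | //
| | //     pair_lock(p);                       // required before touching the bit
| | //     if (p->checkpoint_pending) {
| | //         ...write the pair for the checkpoint, then clear the bit...
| | //     }
| | //     pair_unlock(p);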
| 4610 | |
| 4611 | void checkpointer::add_background_job() { |
| 4612 | int r = bjm_add_background_job(m_checkpoint_clones_bjm); |
| 4613 | assert_zero(r); |
| 4614 | } |
| 4615 | void checkpointer::remove_background_job() { |
| 4616 | bjm_remove_background_job(m_checkpoint_clones_bjm); |
| 4617 | } |
| 4618 | |
| 4619 | void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextra) {
| 4620 | toku::scoped_malloc checkpoint_cfs_buf(m_checkpoint_num_files * sizeof(CACHEFILE)); |
| 4621 | CACHEFILE *checkpoint_cfs = reinterpret_cast<CACHEFILE *>(checkpoint_cfs_buf.get()); |
| 4622 | |
| 4623 | this->fill_checkpoint_cfs(checkpoint_cfs); |
| 4624 | this->checkpoint_pending_pairs(); |
| 4625 | this->checkpoint_userdata(checkpoint_cfs); |
| 4626 | // For testing purposes only. Dictionary has been fsync-ed to disk but log has not yet been written. |
| 4627 | if (testcallback_f) { |
| 4628 | testcallback_f(testextra); |
| 4629 | } |
| 4630 | this->log_end_checkpoint(); |
| 4631 | this->end_checkpoint_userdata(checkpoint_cfs); |
| 4632 | |
| 4633 | // Delete the list of cachefiles in the checkpoint.
| 4634 | this->remove_cachefiles(checkpoint_cfs); |
| 4635 | } |
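| | // Illustrative end-to-end sequence (a sketch of how the two halves fit
| | // together; the actual driver is the checkpoint code built on top of this
| | // class):
| | //
| | //     cp.begin_checkpoint();               // pin cachefiles, set pending bits, log BEGIN_CHECKPOINT
| | //     ...clients keep working; pending pairs are written out on demand...
| | //     cp.end_checkpoint(nullptr, nullptr); // write remaining pending pairs and headers, log END_CHECKPOINT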
| 4636 | |
| 4637 | struct iterate_checkpoint_cfs { |
| 4638 | CACHEFILE *checkpoint_cfs; |
| 4639 | uint32_t checkpoint_num_files; |
| 4640 | uint32_t curr_index; |
| 4641 | iterate_checkpoint_cfs(CACHEFILE *cfs, uint32_t num_files) : |
| 4642 | checkpoint_cfs(cfs), checkpoint_num_files(num_files), curr_index(0) { |
| 4643 | } |
| 4644 | static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_checkpoint_cfs *info) { |
| 4645 | if (cf->for_checkpoint) { |
| 4646 | assert(info->curr_index < info->checkpoint_num_files); |
| 4647 | info->checkpoint_cfs[info->curr_index] = cf; |
| 4648 | info->curr_index++; |
| 4649 | } |
| 4650 | return 0; |
| 4651 | } |
| 4652 | }; |
| 4653 | |
| 4654 | void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) { |
| 4655 | struct iterate_checkpoint_cfs iterate(checkpoint_cfs, m_checkpoint_num_files); |
| 4656 | |
| 4657 | m_cf_list->read_lock(); |
| 4658 | m_cf_list->m_active_fileid.iterate<struct iterate_checkpoint_cfs, iterate_checkpoint_cfs::fn>(&iterate); |
| 4659 | assert(iterate.curr_index == m_checkpoint_num_files); |
| 4660 | m_cf_list->read_unlock(); |
| 4661 | } |
| 4662 | |
| 4663 | void checkpointer::checkpoint_pending_pairs() { |
| 4664 | PAIR p; |
| 4665 | m_list->read_list_lock(); |
| 4666 | while ((p = m_list->m_pending_head)!=0) { |
| 4667 | // <CER> TODO: Investigate why we move pending head outside of the pending_pairs_remove() call.
| 4668 | m_list->m_pending_head = m_list->m_pending_head->pending_next; |
| 4669 | m_list->pending_pairs_remove(p); |
| 4670 | // if still pending, clear the pending bit and write out the node |
| 4671 | pair_lock(p); |
| 4672 | m_list->read_list_unlock(); |
| 4673 | write_pair_for_checkpoint_thread(m_ev, p); |
| 4674 | pair_unlock(p); |
| 4675 | m_list->read_list_lock(); |
| 4676 | } |
| 4677 | assert(!m_list->m_pending_head); |
| 4678 | m_list->read_list_unlock(); |
| 4679 | bjm_wait_for_jobs_to_finish(m_checkpoint_clones_bjm); |
| 4680 | } |
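| | // Note on the loop above (a reading of the code, not a stated contract): the
| | // pair list's read lock is dropped around write_pair_for_checkpoint_thread so
| | // that a potentially long disk write is not performed while holding the list
| | // lock; the pending head is advanced and the pair removed from the pending
| | // list first, so the loop can safely resume from the (possibly changed) head
| | // after re-acquiring the lock.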
| 4681 | |
| 4682 | void checkpointer::checkpoint_userdata(CACHEFILE* checkpoint_cfs) { |
| 4683 | // have just written data blocks, so next write the translation and header for each open dictionary |
| 4684 | for (uint32_t i = 0; i < m_checkpoint_num_files; i++) { |
| 4685 | CACHEFILE cf = checkpoint_cfs[i]; |
| 4686 | assert(cf->for_checkpoint); |
| 4687 | assert(cf->checkpoint_userdata); |
| 4688 | toku_cachetable_set_checkpointing_user_data_status(1); |
| 4689 | cf->checkpoint_userdata(cf, cf->fd, cf->userdata); |
| 4690 | toku_cachetable_set_checkpointing_user_data_status(0); |
| 4691 | } |
| 4692 | } |
| 4693 | |
| 4694 | void checkpointer::log_end_checkpoint() { |
| 4695 | if (m_logger) { |
| 4696 | toku_log_end_checkpoint(m_logger, NULL, |
| 4697 | 1, // want the end_checkpoint to be fsync'd |
| 4698 | m_lsn_of_checkpoint_in_progress, |
| 4699 | 0, |
| 4700 | m_checkpoint_num_files, |
| 4701 | m_checkpoint_num_txns); |
| 4702 | toku_logger_note_checkpoint(m_logger, m_lsn_of_checkpoint_in_progress); |
| 4703 | } |
| 4704 | } |
| 4705 | |
| 4706 | void checkpointer::end_checkpoint_userdata(CACHEFILE* checkpoint_cfs) { |
| 4707 | // everything has been written to file and fsynced |
| 4708 | // ... call checkpoint-end function in block translator |
| 4709 | // to free obsolete blocks on disk used by previous checkpoint |
| 4710 | // cachefiles_in_checkpoint is protected by the checkpoint_safe_lock
| 4711 | for (uint32_t i = 0; i < m_checkpoint_num_files; i++) { |
| 4712 | CACHEFILE cf = checkpoint_cfs[i]; |
| 4713 | assert(cf->for_checkpoint); |
| 4714 | assert(cf->end_checkpoint_userdata); |
| 4715 | cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata); |
| 4716 | } |
| 4717 | } |
| 4718 | |
| 4719 | // |
| 4720 | // Removes all the cachefiles in this checkpointer's cachefile list from the checkpoint.
| 4721 | // |
| 4722 | void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) { |
| 4723 | // note_unpin_by_checkpoint may destroy the cachefile, so cf must not be used after that call
| 4724 | for (uint32_t i = 0; i < m_checkpoint_num_files; i++) { |
| 4725 | CACHEFILE cf = checkpoint_cfs[i]; |
| 4726 | // Checking for function existing so that this function |
| 4727 | // can be called from cachetable tests. |
| 4728 | assert(cf->for_checkpoint); |
| 4729 | cf->for_checkpoint = false; |
| 4730 | assert(cf->note_unpin_by_checkpoint); |
| 4731 | // Clear the bit saying this file is in the checkpoint.
| 4732 | cf->note_unpin_by_checkpoint(cf, cf->userdata); |
| 4733 | } |
| 4734 | } |
| 4735 | |
| 4736 | |
| 4737 | //////////////////////////////////////////////////////// |
| 4738 | // |
| 4739 | // cachefiles list |
| 4740 | // |
| 4741 | static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD" ); |
| 4742 | |
| 4743 | void cachefile_list::init() { |
| 4744 | m_next_filenum_to_use.fileid = 0; |
| 4745 | m_next_hash_id_to_use = 0; |
| 4746 | toku_pthread_rwlock_init(*cachetable_m_lock_key, &m_lock, nullptr); |
| 4747 | m_active_filenum.create(); |
| 4748 | m_active_fileid.create(); |
| 4749 | m_stale_fileid.create(); |
| 4750 | } |
| 4751 | |
| 4752 | void cachefile_list::destroy() { |
| 4753 | m_active_filenum.destroy(); |
| 4754 | m_active_fileid.destroy(); |
| 4755 | m_stale_fileid.destroy(); |
| 4756 | toku_pthread_rwlock_destroy(&m_lock); |
| 4757 | } |
| 4758 | |
| 4759 | void cachefile_list::read_lock() { |
| 4760 | toku_pthread_rwlock_rdlock(&m_lock); |
| 4761 | } |
| 4762 | |
| 4763 | void cachefile_list::read_unlock() { |
| 4764 | toku_pthread_rwlock_rdunlock(&m_lock); |
| 4765 | } |
| 4766 | |
| 4767 | void cachefile_list::write_lock() { |
| 4768 | toku_pthread_rwlock_wrlock(&m_lock); |
| 4769 | } |
| 4770 | |
| 4771 | void cachefile_list::write_unlock() { |
| 4772 | toku_pthread_rwlock_wrunlock(&m_lock); |
| 4773 | } |
| 4774 | |
| 4775 | struct iterate_find_iname { |
| 4776 | const char *iname_in_env; |
| 4777 | CACHEFILE found_cf; |
| 4778 | iterate_find_iname(const char *iname) : iname_in_env(iname), found_cf(nullptr) { } |
| 4779 | static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_find_iname *info) { |
| 4780 | if (cf->fname_in_env && strcmp(cf->fname_in_env, info->iname_in_env) == 0) { |
| 4781 | info->found_cf = cf; |
| 4782 | return -1; |
| 4783 | } |
| 4784 | return 0; |
| 4785 | } |
| 4786 | }; |
| 4787 | |
| 4788 | int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf) { |
| 4789 | struct iterate_find_iname iterate(iname_in_env); |
| 4790 | |
| 4791 | read_lock(); |
| 4792 | int r = m_active_fileid.iterate<iterate_find_iname, iterate_find_iname::fn>(&iterate); |
| 4793 | if (iterate.found_cf != nullptr) { |
| 4794 | assert(strcmp(iterate.found_cf->fname_in_env, iname_in_env) == 0); |
| 4795 | *cf = iterate.found_cf; |
| 4796 | r = 0; |
| 4797 | } else { |
| 4798 | r = ENOENT; |
| 4799 | } |
| 4800 | read_unlock(); |
| 4801 | return r; |
| 4802 | } |
| 4803 | |
| 4804 | static int cachefile_find_by_filenum(const CACHEFILE &a_cf, const FILENUM &b) { |
| 4805 | const FILENUM a = a_cf->filenum; |
| 4806 | if (a.fileid < b.fileid) { |
| 4807 | return -1; |
| 4808 | } else if (a.fileid == b.fileid) { |
| 4809 | return 0; |
| 4810 | } else { |
| 4811 | return 1; |
| 4812 | } |
| 4813 | } |
| 4814 | |
| 4815 | int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) { |
| 4816 | read_lock(); |
| 4817 | int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, cf, nullptr); |
| 4818 | if (r == DB_NOTFOUND) { |
| 4819 | r = ENOENT; |
| 4820 | } else { |
| 4821 | invariant_zero(r); |
| 4822 | } |
| 4823 | read_unlock(); |
| 4824 | return r; |
| 4825 | } |
| 4826 | |
| 4827 | static int cachefile_find_by_fileid(const CACHEFILE &a_cf, const struct fileid &b) { |
| 4828 | return toku_fileid_cmp(a_cf->fileid, b); |
| 4829 | } |
| 4830 | |
| 4831 | void cachefile_list::add_cf_unlocked(CACHEFILE cf) { |
| 4832 | int r; |
| 4833 | r = m_active_filenum.insert<FILENUM, cachefile_find_by_filenum>(cf, cf->filenum, nullptr); |
| 4834 | assert_zero(r); |
| 4835 | r = m_active_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr); |
| 4836 | assert_zero(r); |
| 4837 | } |
| 4838 | |
| 4839 | void cachefile_list::add_stale_cf(CACHEFILE cf) { |
| 4840 | write_lock(); |
| 4841 | int r = m_stale_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr); |
| 4842 | assert_zero(r); |
| 4843 | write_unlock(); |
| 4844 | } |
| 4845 | |
| 4846 | void cachefile_list::remove_cf(CACHEFILE cf) { |
| 4847 | write_lock(); |
| 4848 | |
| 4849 | uint32_t idx; |
| 4850 | int r; |
| 4851 | r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(cf->filenum, nullptr, &idx); |
| 4852 | assert_zero(r); |
| 4853 | r = m_active_filenum.delete_at(idx); |
| 4854 | assert_zero(r); |
| 4855 | |
| 4856 | r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx); |
| 4857 | assert_zero(r); |
| 4858 | r = m_active_fileid.delete_at(idx); |
| 4859 | assert_zero(r); |
| 4860 | |
| 4861 | write_unlock(); |
| 4862 | } |
| 4863 | |
| 4864 | void cachefile_list::remove_stale_cf_unlocked(CACHEFILE cf) { |
| 4865 | uint32_t idx; |
| 4866 | int r; |
| 4867 | r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx); |
| 4868 | assert_zero(r); |
| 4869 | r = m_stale_fileid.delete_at(idx); |
| 4870 | assert_zero(r); |
| 4871 | } |
| 4872 | |
| 4873 | FILENUM cachefile_list::reserve_filenum() { |
| 4874 | // taking a write lock because we are modifying next_filenum_to_use |
| 4875 | FILENUM filenum = FILENUM_NONE; |
| 4876 | write_lock(); |
| 4877 | while (1) { |
| 4878 | int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(m_next_filenum_to_use, nullptr, nullptr); |
| 4879 | if (r == 0) { |
| 4880 | m_next_filenum_to_use.fileid++; |
| 4881 | continue; |
| 4882 | } |
| 4883 | assert(r == DB_NOTFOUND); |
| 4884 | |
| 4885 | // skip the reserved value UINT32_MAX and wrap around to zero |
| 4886 | if (m_next_filenum_to_use.fileid == FILENUM_NONE.fileid) { |
| 4887 | m_next_filenum_to_use.fileid = 0; |
| 4888 | continue; |
| 4889 | } |
| 4890 | |
| 4891 | filenum = m_next_filenum_to_use; |
| 4892 | m_next_filenum_to_use.fileid++; |
| 4893 | break; |
| 4894 | } |
| 4895 | write_unlock(); |
| 4896 | return filenum; |
| 4897 | } |
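| | // Illustrative walk-through of the loop above: if the candidate filenum is
| | // already present in m_active_filenum it is skipped; if the counter reaches
| | // the reserved sentinel FILENUM_NONE (UINT32_MAX) it wraps back to 0; the
| | // first free, non-sentinel value is handed out, e.g. (hypothetical caller):
| | //
| | //     FILENUM fn = m_cf_list->reserve_filenum();   // fn.fileid != FILENUM_NONE.fileid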
| 4898 | |
| 4899 | uint32_t cachefile_list::get_new_hash_id_unlocked() { |
| 4900 | uint32_t retval = m_next_hash_id_to_use; |
| 4901 | m_next_hash_id_to_use++; |
| 4902 | return retval; |
| 4903 | } |
| 4904 | |
| 4905 | CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) { |
| 4906 | CACHEFILE cf = nullptr; |
| 4907 | int r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr); |
| 4908 | if (r == 0) { |
| 4909 | assert(!cf->unlink_on_close); |
| 4910 | } |
| 4911 | return cf; |
| 4912 | } |
| 4913 | |
| 4914 | CACHEFILE cachefile_list::find_stale_cachefile_unlocked(struct fileid* fileid) { |
| 4915 | CACHEFILE cf = nullptr; |
| 4916 | int r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr); |
| 4917 | if (r == 0) { |
| 4918 | assert(!cf->unlink_on_close); |
| 4919 | } |
| 4920 | return cf; |
| 4921 | } |
| 4922 | |
| 4923 | void cachefile_list::verify_unused_filenum(FILENUM filenum) { |
| 4924 | int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, nullptr, nullptr); |
| 4925 | assert(r == DB_NOTFOUND); |
| 4926 | } |
| 4927 | |
| 4928 | // returns true if some eviction ran, false otherwise |
| 4929 | bool cachefile_list::evict_some_stale_pair(evictor* ev) { |
| 4930 | write_lock(); |
| 4931 | if (m_stale_fileid.size() == 0) { |
| 4932 | write_unlock(); |
| 4933 | return false; |
| 4934 | } |
| 4935 | |
| 4936 | CACHEFILE stale_cf = nullptr; |
| 4937 | int r = m_stale_fileid.fetch(0, &stale_cf); |
| 4938 | assert_zero(r); |
| 4939 | |
| 4940 | // we should not have a cf in the stale list |
| 4941 | // that does not have any pairs |
| 4942 | PAIR p = stale_cf->cf_head; |
| 4943 | paranoid_invariant(p != NULL); |
| 4944 | evict_pair_from_cachefile(p); |
| 4945 | |
| 4946 | // now that we have evicted something, |
| 4947 | // let's check if the cachefile is needed anymore |
| 4948 | // |
| 4949 | // it is not needed if the latest eviction caused |
| 4950 | // the cf_head for that cf to become null |
| 4951 | bool destroy_cf = stale_cf->cf_head == nullptr; |
| 4952 | if (destroy_cf) { |
| 4953 | remove_stale_cf_unlocked(stale_cf); |
| 4954 | } |
| 4955 | |
| 4956 | write_unlock(); |
| 4957 | |
| 4958 | ev->remove_pair_attr(p->attr); |
| 4959 | cachetable_free_pair(p); |
| 4960 | if (destroy_cf) { |
| 4961 | cachefile_destroy(stale_cf); |
| 4962 | } |
| 4963 | return true; |
| 4964 | } |
| 4965 | |
| 4966 | void cachefile_list::free_stale_data(evictor* ev) { |
| 4967 | write_lock(); |
| 4968 | while (m_stale_fileid.size() != 0) { |
| 4969 | CACHEFILE stale_cf = nullptr; |
| 4970 | int r = m_stale_fileid.fetch(0, &stale_cf); |
| 4971 | assert_zero(r); |
| 4972 | |
| 4973 | // we should not have a cf in the stale list |
| 4974 | // that does not have any pairs |
| 4975 | PAIR p = stale_cf->cf_head; |
| 4976 | paranoid_invariant(p != NULL); |
| 4977 | |
| 4978 | evict_pair_from_cachefile(p); |
| 4979 | ev->remove_pair_attr(p->attr); |
| 4980 | cachetable_free_pair(p); |
| 4981 | |
| 4982 | // now that we have evicted something, |
| 4983 | // let's check if the cachefile is needed anymore |
| 4984 | if (stale_cf->cf_head == NULL) { |
| 4985 | remove_stale_cf_unlocked(stale_cf); |
| 4986 | cachefile_destroy(stale_cf); |
| 4987 | } |
| 4988 | } |
| 4989 | write_unlock(); |
| 4990 | } |
| 4991 | |
| 4992 | void __attribute__((__constructor__)) toku_cachetable_helgrind_ignore(void); |
| 4993 | void |
| 4994 | toku_cachetable_helgrind_ignore(void) { |
| 4995 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss); |
| 4996 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime); |
| 4997 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches); |
| 4998 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions); |
| 4999 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions); |
| 5000 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&ct_status, sizeof ct_status); |
| 5001 | } |
| 5002 | |