1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <my_global.h>
40#include <string.h>
41#include <time.h>
42#include <stdarg.h>
43
44#include <portability/memory.h>
45#include <portability/toku_race_tools.h>
46#include <portability/toku_atomic.h>
47#include <portability/toku_pthread.h>
48#include <portability/toku_portability.h>
49#include <portability/toku_stdlib.h>
50#include <portability/toku_time.h>
51
52#include "ft/cachetable/cachetable.h"
53#include "ft/cachetable/cachetable-internal.h"
54#include "ft/cachetable/checkpoint.h"
55#include "ft/logger/log-internal.h"
56#include "util/rwlock.h"
57#include "util/scoped_malloc.h"
58#include "util/status.h"
59#include "util/context.h"
60
61toku_instr_key *cachetable_m_mutex_key;
62toku_instr_key *cachetable_ev_thread_lock_mutex_key;
63
64toku_instr_key *cachetable_m_list_lock_key;
65toku_instr_key *cachetable_m_pending_lock_expensive_key;
66toku_instr_key *cachetable_m_pending_lock_cheap_key;
67toku_instr_key *cachetable_m_lock_key;
68
69toku_instr_key *cachetable_value_key;
70toku_instr_key *cachetable_disk_nb_rwlock_key;
71
72toku_instr_key *cachetable_p_refcount_wait_key;
73toku_instr_key *cachetable_m_flow_control_cond_key;
74toku_instr_key *cachetable_m_ev_thread_cond_key;
75
76toku_instr_key *cachetable_disk_nb_mutex_key;
77toku_instr_key *log_internal_lock_mutex_key;
78toku_instr_key *eviction_thread_key;
79
80///////////////////////////////////////////////////////////////////////////////////
81// Engine status
82//
83// Status is intended for display to humans to help understand system behavior.
84// It does not need to be perfectly thread-safe.
85
86// These should be in the cachetable object, but we make them file-wide so that gdb can get them easily.
// They were left here after the engine status cleanup (#2949) rather than moved into the status struct,
88// so they are still easily available to the debugger and to save lots of typing.
89static uint64_t cachetable_miss;
90static uint64_t cachetable_misstime; // time spent waiting for disk read
91static uint64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
92static uint64_t cachetable_evictions;
93static uint64_t cleaner_executions; // number of times the cleaner thread's loop has executed
94
95
96// Note, toku_cachetable_get_status() is below, after declaration of cachetable.
97
98static void * const zero_value = nullptr;
99static PAIR_ATTR const zero_attr = {
100 .size = 0,
101 .nonleaf_size = 0,
102 .leaf_size = 0,
103 .rollback_size = 0,
104 .cache_pressure_size = 0,
105 .is_valid = true
106};
107
108
109static inline void ctpair_destroy(PAIR p) {
110 p->value_rwlock.deinit();
111 paranoid_invariant(p->refcount == 0);
112 nb_mutex_destroy(&p->disk_nb_mutex);
113 toku_cond_destroy(&p->refcount_wait);
114 toku_free(p);
115}
116
117static inline void pair_lock(PAIR p) {
118 toku_mutex_lock(p->mutex);
119}
120
121static inline void pair_unlock(PAIR p) {
122 toku_mutex_unlock(p->mutex);
123}
124
125// adds a reference to the PAIR
126// on input and output, PAIR mutex is held
127static void pair_add_ref_unlocked(PAIR p) {
128 p->refcount++;
129}
130
131// releases a reference to the PAIR
132// on input and output, PAIR mutex is held
133static void pair_release_ref_unlocked(PAIR p) {
134 paranoid_invariant(p->refcount > 0);
135 p->refcount--;
136 if (p->refcount == 0 && p->num_waiting_on_refs > 0) {
137 toku_cond_broadcast(&p->refcount_wait);
138 }
139}
140
141static void pair_wait_for_ref_release_unlocked(PAIR p) {
142 p->num_waiting_on_refs++;
143 while (p->refcount > 0) {
144 toku_cond_wait(&p->refcount_wait, p->mutex);
145 }
146 p->num_waiting_on_refs--;
147}
148
149bool toku_ctpair_is_write_locked(PAIR pair) {
150 return pair->value_rwlock.writers() == 1;
151}
152
153void
154toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
155 ct_status.init();
156 CT_STATUS_VAL(CT_MISS) = cachetable_miss;
157 CT_STATUS_VAL(CT_MISSTIME) = cachetable_misstime;
158 CT_STATUS_VAL(CT_PREFETCHES) = cachetable_prefetches;
159 CT_STATUS_VAL(CT_EVICTIONS) = cachetable_evictions;
160 CT_STATUS_VAL(CT_CLEANER_EXECUTIONS) = cleaner_executions;
161 CT_STATUS_VAL(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct);
162 CT_STATUS_VAL(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct);
163 toku_kibbutz_get_status(ct->client_kibbutz,
164 &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS),
165 &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS_ACTIVE),
166 &CT_STATUS_VAL(CT_POOL_CLIENT_QUEUE_SIZE),
167 &CT_STATUS_VAL(CT_POOL_CLIENT_MAX_QUEUE_SIZE),
168 &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED),
169 &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME));
170 toku_kibbutz_get_status(ct->ct_kibbutz,
171 &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS),
172 &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE),
173 &CT_STATUS_VAL(CT_POOL_CACHETABLE_QUEUE_SIZE),
174 &CT_STATUS_VAL(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE),
175 &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED),
176 &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME));
177 toku_kibbutz_get_status(ct->checkpointing_kibbutz,
178 &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS),
179 &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE),
180 &CT_STATUS_VAL(CT_POOL_CHECKPOINT_QUEUE_SIZE),
181 &CT_STATUS_VAL(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE),
182 &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED),
183 &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME));
184 ct->ev.fill_engine_status();
185 *statp = ct_status;
186}
187
188// FIXME global with no toku prefix
189void remove_background_job_from_cf(CACHEFILE cf)
190{
191 bjm_remove_background_job(cf->bjm);
192}
193
194// FIXME global with no toku prefix
195void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra)
196// The function f must call remove_background_job_from_cf when it completes
197{
198 int r = bjm_add_background_job(cf->bjm);
199 // if client is adding a background job, then it must be done
200 // at a time when the manager is accepting background jobs, otherwise
201 // the client is screwing up
202 assert_zero(r);
203 toku_kibbutz_enq(cf->cachetable->client_kibbutz, f, extra);
204}
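
// Illustrative usage sketch (not part of this file; the job function and its
// extra struct are hypothetical): the enqueued function must finish by
// releasing the background job it implicitly holds, e.g.
//
//     struct flush_extra { CACHEFILE cf; /* ... description of the work ... */ };
//     static void background_flush(void *extra) {
//         struct flush_extra *fe = static_cast<struct flush_extra *>(extra);
//         // ... do the deferred work against fe->cf ...
//         remove_background_job_from_cf(fe->cf);  // required on completion
//     }
//     // enqueue it on the cachefile's client kibbutz:
//     cachefile_kibbutz_enq(cf, background_flush, fe);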
205
206static int
207checkpoint_thread (void *checkpointer_v)
// Effect: If checkpoint_period>0 then periodically run a checkpoint.
// If someone changes the checkpoint_period (calling toku_set_checkpoint_period), then the checkpoint will run sooner or later.
// If someone sets the checkpoint_shutdown boolean, then this thread exits.
211// This thread notices those changes by waiting on a condition variable.
212{
213 CHECKPOINTER CAST_FROM_VOIDP(cp, checkpointer_v);
214 int r = toku_checkpoint(cp, cp->get_logger(), NULL, NULL, NULL, NULL, SCHEDULED_CHECKPOINT);
215 invariant_zero(r);
216 return r;
217}
218
219void toku_set_checkpoint_period (CACHETABLE ct, uint32_t new_period) {
220 ct->cp.set_checkpoint_period(new_period);
221}
222
223uint32_t toku_get_checkpoint_period_unlocked (CACHETABLE ct) {
224 return ct->cp.get_checkpoint_period();
225}
226
227void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period) {
228 ct->cl.set_period(new_period);
229}
230
231uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct) {
232 return ct->cl.get_period_unlocked();
233}
234
235void toku_set_cleaner_iterations (CACHETABLE ct, uint32_t new_iterations) {
236 ct->cl.set_iterations(new_iterations);
237}
238
239uint32_t toku_get_cleaner_iterations (CACHETABLE ct) {
240 return ct->cl.get_iterations();
241}
242
243uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) {
244 return ct->cl.get_iterations();
245}
246
247void toku_set_enable_partial_eviction (CACHETABLE ct, bool enabled) {
248 ct->ev.set_enable_partial_eviction(enabled);
249}
250
251bool toku_get_enable_partial_eviction (CACHETABLE ct) {
252 return ct->ev.get_enable_partial_eviction();
253}
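
// Illustrative tuning sketch (the values are arbitrary): a client can adjust
// how often the cleaner thread wakes up, how much work it does per wakeup,
// and whether partial eviction is allowed:
//
//     toku_set_cleaner_period(ct, 2);       // wake the cleaner every 2 seconds
//     toku_set_cleaner_iterations(ct, 5);   // up to 5 cleaner passes per wakeup
//     toku_set_enable_partial_eviction(ct, true);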
254
255// reserve 25% as "unreservable". The loader cannot have it.
256#define unreservable_memory(size) ((size)/4)
257
258int toku_cachetable_create_ex(CACHETABLE *ct_result, long size_limit,
259 unsigned long client_pool_threads,
260 unsigned long cachetable_pool_threads,
261 unsigned long checkpoint_pool_threads,
262 LSN UU(initial_lsn), TOKULOGGER logger) {
263 int result = 0;
264 int r;
265
266 if (size_limit == 0) {
267 size_limit = 128*1024*1024;
268 }
269
270 CACHETABLE XCALLOC(ct);
271 ct->list.init();
272 ct->cf_list.init();
273
274 int num_processors = toku_os_get_number_active_processors();
275 int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1;
276 r = toku_kibbutz_create(client_pool_threads ? client_pool_threads : num_processors,
277 &ct->client_kibbutz);
278 if (r != 0) {
279 result = r;
280 goto cleanup;
281 }
282 r = toku_kibbutz_create(cachetable_pool_threads ? cachetable_pool_threads : 2*num_processors,
283 &ct->ct_kibbutz);
284 if (r != 0) {
285 result = r;
286 goto cleanup;
287 }
288 r = toku_kibbutz_create(checkpoint_pool_threads ? checkpoint_pool_threads : checkpointing_nworkers,
289 &ct->checkpointing_kibbutz);
290 if (r != 0) {
291 result = r;
292 goto cleanup;
293 }
294 // must be done after creating ct_kibbutz
295 r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD);
296 if (r != 0) {
297 result = r;
298 goto cleanup;
299 }
300 r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list);
301 if (r != 0) {
302 result = r;
303 goto cleanup;
304 }
305 r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration
306 if (r != 0) {
307 result = r;
308 goto cleanup;
309 }
310 ct->env_dir = toku_xstrdup(".");
311cleanup:
312 if (result == 0) {
313 *ct_result = ct;
314 } else {
315 toku_cachetable_close(&ct);
316 }
317 return result;
318}
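
// Minimal creation sketch (test-style usage; ZERO_LSN and the null logger are
// assumptions here, and initial_lsn is unused (UU) by this function anyway;
// passing 0 for the three pool sizes selects the processor-count based
// defaults computed above):
//
//     CACHETABLE ct = NULL;
//     int r = toku_cachetable_create_ex(&ct, 1L << 30 /* 1 GiB limit */,
//                                       0, 0, 0, ZERO_LSN, nullptr);
//     assert_zero(r);
//     // ... use the cachetable ...
//     toku_cachetable_close(&ct);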
319
320// Returns a pointer to the checkpoint contained within
321// the given cachetable.
322CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct) {
323 return &ct->cp;
324}
325
326uint64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction, uint64_t upper_bound) {
327 uint64_t reserved_memory = ct->ev.reserve_memory(fraction, upper_bound);
328 return reserved_memory;
329}
330
331void toku_cachetable_release_reserved_memory(CACHETABLE ct, uint64_t reserved_memory) {
332 ct->ev.release_reserved_memory(reserved_memory);
333}
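
// Usage sketch (the fraction and bound are arbitrary illustration values): a
// large consumer such as the bulk loader can carve memory out of the
// cachetable's budget and must hand exactly that amount back when finished:
//
//     uint64_t reserved = toku_cachetable_reserve_memory(ct, 0.5, 100 << 20);
//     // ... allocate and use up to `reserved` bytes outside the cachetable ...
//     toku_cachetable_release_reserved_memory(ct, reserved);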
334
335void
336toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir) {
337 toku_free(ct->env_dir);
338 ct->env_dir = toku_xstrdup(env_dir);
339}
340
341// What cachefile goes with particular iname (iname relative to env)?
342// The transaction that is adding the reference might not have a reference
343// to the ft, therefore the cachefile might be closing.
344// If closing, we want to return that it is not there, but must wait till after
345// the close has finished.
346// Once the close has finished, there must not be a cachefile with that name
347// in the cachetable.
348int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CACHEFILE *cf) {
349 return ct->cf_list.cachefile_of_iname_in_env(iname_in_env, cf);
350}
351
352// What cachefile goes with particular fd?
353// This function can only be called if the ft is still open, so file must
354// still be open
355int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
356 return ct->cf_list.cachefile_of_filenum(filenum, cf);
357}
358
359// TEST-ONLY function
360// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
361int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_in_env) {
362 FILENUM filenum = toku_cachetable_reserve_filenum(ct);
363 bool was_open;
364 return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_in_env, filenum, &was_open);
365}
366
367// Get a unique filenum from the cachetable
368FILENUM
369toku_cachetable_reserve_filenum(CACHETABLE ct) {
370 return ct->cf_list.reserve_filenum();
371}
372
373static void create_new_cachefile(
374 CACHETABLE ct,
375 FILENUM filenum,
376 uint32_t hash_id,
377 int fd,
378 const char *fname_in_env,
379 struct fileid fileid,
380 CACHEFILE *cfptr
381 ) {
382 // File is not open. Make a new cachefile.
383 CACHEFILE newcf = NULL;
384 XCALLOC(newcf);
385 newcf->cachetable = ct;
386 newcf->hash_id = hash_id;
387 newcf->fileid = fileid;
388
389 newcf->filenum = filenum;
390 newcf->fd = fd;
391 newcf->fname_in_env = toku_xstrdup(fname_in_env);
392 bjm_init(&newcf->bjm);
393 *cfptr = newcf;
394}
395
396int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd,
397 const char *fname_in_env,
398 FILENUM filenum, bool* was_open) {
399 int r;
400 CACHEFILE newcf;
401 struct fileid fileid;
402
403 assert(filenum.fileid != FILENUM_NONE.fileid);
404 r = toku_os_get_unique_file_id(fd, &fileid);
405 if (r != 0) {
406 r = get_error_errno();
407 close(fd);
408 return r;
409 }
410 ct->cf_list.write_lock();
411 CACHEFILE existing_cf = ct->cf_list.find_cachefile_unlocked(&fileid);
412 if (existing_cf) {
413 *was_open = true;
414 // Reuse an existing cachefile and close the caller's fd, whose
415 // responsibility has been passed to us.
416 r = close(fd);
417 assert(r == 0);
418 *cfptr = existing_cf;
419 r = 0;
420 goto exit;
421 }
422 *was_open = false;
423 ct->cf_list.verify_unused_filenum(filenum);
424 // now let's try to find it in the stale cachefiles
425 existing_cf = ct->cf_list.find_stale_cachefile_unlocked(&fileid);
    // if we found a stale cachefile, reuse it
427 if (existing_cf) {
428 // fix up the fields in the cachefile
429 existing_cf->filenum = filenum;
430 existing_cf->fd = fd;
431 existing_cf->fname_in_env = toku_xstrdup(fname_in_env);
432 bjm_init(&existing_cf->bjm);
433
434 // now we need to move all the PAIRs in it back into the cachetable
435 ct->list.write_list_lock();
436 for (PAIR curr_pair = existing_cf->cf_head; curr_pair; curr_pair = curr_pair->cf_next) {
437 pair_lock(curr_pair);
438 ct->list.add_to_cachetable_only(curr_pair);
439 pair_unlock(curr_pair);
440 }
441 ct->list.write_list_unlock();
442 // move the cachefile back to the list of active cachefiles
443 ct->cf_list.remove_stale_cf_unlocked(existing_cf);
444 ct->cf_list.add_cf_unlocked(existing_cf);
445 *cfptr = existing_cf;
446 r = 0;
447 goto exit;
448 }
449
450 create_new_cachefile(
451 ct,
452 filenum,
453 ct->cf_list.get_new_hash_id_unlocked(),
454 fd,
455 fname_in_env,
456 fileid,
457 &newcf
458 );
459
460 ct->cf_list.add_cf_unlocked(newcf);
461
462 *cfptr = newcf;
463 r = 0;
464 exit:
465 ct->cf_list.write_unlock();
466 return r;
467}
468
469static void cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf, bool evict_completely);
470
471//TEST_ONLY_FUNCTION
472int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in_env, int flags, mode_t mode) {
473 char *fname_in_cwd = toku_construct_full_name(2, ct->env_dir, fname_in_env);
474 int fd = open(fname_in_cwd, flags+O_BINARY, mode);
475 int r;
476 if (fd < 0) {
477 r = get_error_errno();
478 } else {
479 r = toku_cachetable_openfd (cfptr, ct, fd, fname_in_env);
480 }
481 toku_free(fname_in_cwd);
482 return r;
483}
484
485char *
486toku_cachefile_fname_in_env (CACHEFILE cf) {
487 if (cf) {
488 return cf->fname_in_env;
489 }
490 return nullptr;
491}
492
493void toku_cachefile_set_fname_in_env(CACHEFILE cf, char *new_fname_in_env) {
494 cf->fname_in_env = new_fname_in_env;
495}
496
497int
498toku_cachefile_get_fd (CACHEFILE cf) {
499 return cf->fd;
500}
501
502static void cachefile_destroy(CACHEFILE cf) {
503 if (cf->free_userdata) {
504 cf->free_userdata(cf, cf->userdata);
505 }
506 toku_free(cf);
507}
508
509void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
510 CACHEFILE cf = *cfp;
511 CACHETABLE ct = cf->cachetable;
512
513 bjm_wait_for_jobs_to_finish(cf->bjm);
514
515 // Clients should never attempt to close a cachefile that is being
516 // checkpointed. We notify clients this is happening in the
517 // note_pin_by_checkpoint callback.
518 assert(!cf->for_checkpoint);
519
520 // Flush the cachefile and remove all of its pairs from the cachetable,
521 // but keep the PAIRs linked in the cachefile. We will store the cachefile
    // away in case it gets opened immediately
523 //
524 // if we are unlinking on close, then we want to evict completely,
525 // otherwise, we will keep the PAIRs and cachefile around in case
526 // a subsequent open comes soon
527 cachetable_flush_cachefile(ct, cf, cf->unlink_on_close);
528
529 // Call the close userdata callback to notify the client this cachefile
530 // and its underlying file are going to be closed
531 if (cf->close_userdata) {
532 cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn);
533 }
534 // fsync and close the fd.
535 toku_file_fsync_without_accounting(cf->fd);
536 int r = close(cf->fd);
537 assert(r == 0);
538 cf->fd = -1;
539
540 // destroy the parts of the cachefile
541 // that do not persist across opens/closes
542 bjm_destroy(cf->bjm);
543 cf->bjm = NULL;
544
545 // remove the cf from the list of active cachefiles
546 ct->cf_list.remove_cf(cf);
547 cf->filenum = FILENUM_NONE;
548
549 // Unlink the file if the bit was set
550 if (cf->unlink_on_close) {
551 char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(cf->cachetable, cf->fname_in_env);
552 r = unlink(fname_in_cwd);
553 assert_zero(r);
554 toku_free(fname_in_cwd);
555 }
556 toku_free(cf->fname_in_env);
557 cf->fname_in_env = NULL;
558
559 // we destroy the cf if the unlink bit was set or if no PAIRs exist
560 // if no PAIRs exist, there is no sense in keeping the cachefile around
561 bool destroy_cf = cf->unlink_on_close || (cf->cf_head == NULL);
562 if (destroy_cf) {
563 cachefile_destroy(cf);
564 }
565 else {
566 ct->cf_list.add_stale_cf(cf);
567 }
568}
569
570// This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c
571// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number.
572// Instead we can use a bitmask on a table of size power of two.
573// This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan
574static inline uint32_t rot(uint32_t x, uint32_t k) {
575 return (x<<k) | (x>>(32-k));
576}
577static inline uint32_t final (uint32_t a, uint32_t b, uint32_t c) {
578 c ^= b; c -= rot(b,14);
579 a ^= c; a -= rot(c,11);
580 b ^= a; b -= rot(a,25);
581 c ^= b; c -= rot(b,16);
582 a ^= c; a -= rot(c,4);
583 b ^= a; b -= rot(a,14);
584 c ^= b; c -= rot(b,24);
585 return c;
586}
587
588uint32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key)
// Effect: Return a 32-bit hash key. The hash key is suitable for use with bitmasking into a table whose size is a power of two.
590{
591 return final(cachefile->hash_id, (uint32_t)(key.b>>32), (uint32_t)key.b);
592}
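
// Illustrative bucket selection (num_buckets is a hypothetical name): because
// the bits are well mixed, the hash can be reduced with a mask instead of a
// modulo by a prime, provided the table size is a power of two:
//
//     uint32_t fullhash = toku_cachetable_hash(cf, blocknum);
//     uint32_t bucket   = fullhash & (num_buckets - 1);  // num_buckets == 2^k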
593
594#define CLOCK_SATURATION 15
595#define CLOCK_INITIAL_COUNT 3
596
597// Requires pair's mutex to be held
598static void pair_touch (PAIR p) {
599 p->count = (p->count < CLOCK_SATURATION) ? p->count+1 : CLOCK_SATURATION;
600}
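
// (pair_touch is the "touch" half of a clock replacement scheme: each access
// bumps p->count toward CLOCK_SATURATION, and the evictor's sweep is expected
// to decay it, so frequently touched pairs take longer to become eviction
// candidates. The decay itself lives in the evictor, not here.)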
601
602// Remove a pair from the cachetable, requires write list lock to be held and p->mutex to be held
603// Effects: the pair is removed from the LRU list and from the cachetable's hash table.
604// The size of the objects in the cachetable is adjusted by the size of the pair being
605// removed.
606static void cachetable_remove_pair (pair_list* list, evictor* ev, PAIR p) {
607 list->evict_completely(p);
608 ev->remove_pair_attr(p->attr);
609}
610
611static void cachetable_free_pair(PAIR p) {
612 CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
613 CACHEKEY key = p->key;
614 void *value = p->value_data;
615 void* disk_data = p->disk_data;
616 void *write_extraargs = p->write_extraargs;
617 PAIR_ATTR old_attr = p->attr;
618
619 cachetable_evictions++;
620 PAIR_ATTR new_attr = p->attr;
621 // Note that flush_callback is called with write_me false, so the only purpose of this
622 // call is to tell the ft layer to evict the node (keep_me is false).
623 // Also, because we have already removed the PAIR from the cachetable in
624 // cachetable_remove_pair, we cannot pass in p->cachefile and p->cachefile->fd
625 // for the first two parameters, as these may be invalid (#5171), so, we
626 // pass in NULL and -1, dummy values
627 flush_callback(NULL, -1, key, value, &disk_data, write_extraargs, old_attr, &new_attr, false, false, true, false);
628
629 ctpair_destroy(p);
630}
631
632// assumes value_rwlock and disk_nb_mutex held on entry
633// responsibility of this function is to only write a locked PAIR to disk
// and NOTHING else. We do not manipulate the state of the PAIR
// or of the cachetable here (with the exception of ct->size_current for clones).
636//
637// No pair_list lock should be held, and the PAIR mutex should not be held
638//
639static void cachetable_only_write_locked_data(
640 evictor* ev,
641 PAIR p,
642 bool for_checkpoint,
643 PAIR_ATTR* new_attr,
644 bool is_clone
645 )
646{
647 CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
648 CACHEFILE cachefile = p->cachefile;
649 CACHEKEY key = p->key;
650 void *value = is_clone ? p->cloned_value_data : p->value_data;
651 void *disk_data = p->disk_data;
652 void *write_extraargs = p->write_extraargs;
653 PAIR_ATTR old_attr;
654 // we do this for drd. If we are a cloned pair and only
655 // have the disk_nb_mutex, it is a race to access p->attr.
656 // Luckily, old_attr here is only used for some test applications,
657 // so inaccurate non-size fields are ok.
658 if (is_clone) {
659 old_attr = make_pair_attr(p->cloned_value_size);
660 }
661 else {
662 old_attr = p->attr;
663 }
664 bool dowrite = true;
665
666 // write callback
667 flush_callback(
668 cachefile,
669 cachefile->fd,
670 key,
671 value,
672 &disk_data,
673 write_extraargs,
674 old_attr,
675 new_attr,
676 dowrite,
677 is_clone ? false : true, // keep_me (only keep if this is not cloned pointer)
678 for_checkpoint,
679 is_clone //is_clone
680 );
681 p->disk_data = disk_data;
682 if (is_clone) {
683 p->cloned_value_data = NULL;
684 ev->remove_cloned_data_size(p->cloned_value_size);
685 p->cloned_value_size = 0;
686 }
687}
688
689
690//
691// This function writes a PAIR's value out to disk. Currently, it is called
692// by get_and_pin functions that write a PAIR out for checkpoint, by
693// evictor threads that evict dirty PAIRS, and by the checkpoint thread
694// that needs to write out a dirty node for checkpoint.
695//
696// Requires on entry for p->mutex to NOT be held, otherwise
697// calling cachetable_only_write_locked_data will be very expensive
698//
699static void cachetable_write_locked_pair(
700 evictor* ev,
701 PAIR p,
702 bool for_checkpoint
703 )
704{
705 PAIR_ATTR old_attr = p->attr;
706 PAIR_ATTR new_attr = p->attr;
707 // grabbing the disk_nb_mutex here ensures that
708 // after this point, no one is writing out a cloned value
709 // if we grab the disk_nb_mutex inside the if clause,
710 // then we may try to evict a PAIR that is in the process
711 // of having its clone be written out
712 pair_lock(p);
713 nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
714 pair_unlock(p);
715 // make sure that assumption about cloned_value_data is true
716 // if we have grabbed the disk_nb_mutex, then that means that
717 // there should be no cloned value data
718 assert(p->cloned_value_data == NULL);
719 if (p->dirty) {
720 cachetable_only_write_locked_data(ev, p, for_checkpoint, &new_attr, false);
721 //
722 // now let's update variables
723 //
724 if (new_attr.is_valid) {
725 p->attr = new_attr;
726 ev->change_pair_attr(old_attr, new_attr);
727 }
728 }
729 // the pair is no longer dirty once written
730 p->dirty = CACHETABLE_CLEAN;
731 pair_lock(p);
732 nb_mutex_unlock(&p->disk_nb_mutex);
733 pair_unlock(p);
734}
735
// Worker thread function that writes and evicts a pair from memory to its cachefile
737static void cachetable_evicter(void* extra) {
738 PAIR p = (PAIR)extra;
739 pair_list* pl = p->list;
740 CACHEFILE cf = p->cachefile;
741 pl->read_pending_exp_lock();
742 bool for_checkpoint = p->checkpoint_pending;
743 p->checkpoint_pending = false;
744 // per the contract of evictor::evict_pair,
745 // the pair's mutex, p->mutex, must be held on entry
746 pair_lock(p);
747 p->ev->evict_pair(p, for_checkpoint);
748 pl->read_pending_exp_unlock();
749 bjm_remove_background_job(cf->bjm);
750}
751
752static void cachetable_partial_eviction(void* extra) {
753 PAIR p = (PAIR)extra;
754 CACHEFILE cf = p->cachefile;
755 p->ev->do_partial_eviction(p);
756 bjm_remove_background_job(cf->bjm);
757}
758
759void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair) {
760 void* old_value = old_pair->value_data;
761 void* new_value = new_pair->value_data;
762 old_pair->value_data = new_value;
763 new_pair->value_data = old_value;
764}
765
766void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
767 // TODO: <CER> Maybe move this...
768 ct->ev.signal_eviction_thread();
769}
770
771// Initializes a pair's members.
772//
773void pair_init(PAIR p,
774 CACHEFILE cachefile,
775 CACHEKEY key,
776 void *value,
777 PAIR_ATTR attr,
778 enum cachetable_dirty dirty,
779 uint32_t fullhash,
780 CACHETABLE_WRITE_CALLBACK write_callback,
781 evictor *ev,
782 pair_list *list)
783{
784 p->cachefile = cachefile;
785 p->key = key;
786 p->value_data = value;
787 p->cloned_value_data = NULL;
788 p->cloned_value_size = 0;
789 p->disk_data = NULL;
790 p->attr = attr;
791 p->dirty = dirty;
792 p->fullhash = fullhash;
793
794 p->flush_callback = write_callback.flush_callback;
795 p->pe_callback = write_callback.pe_callback;
796 p->pe_est_callback = write_callback.pe_est_callback;
797 p->cleaner_callback = write_callback.cleaner_callback;
798 p->clone_callback = write_callback.clone_callback;
799 p->checkpoint_complete_callback = write_callback.checkpoint_complete_callback;
800 p->write_extraargs = write_callback.write_extraargs;
801
802 p->count = 0; // <CER> Is zero the correct init value?
803 p->refcount = 0;
804 p->num_waiting_on_refs = 0;
805 toku_cond_init(*cachetable_p_refcount_wait_key, &p->refcount_wait, nullptr);
806 p->checkpoint_pending = false;
807
808 p->mutex = list->get_mutex_for_pair(fullhash);
809 assert(p->mutex);
810 p->value_rwlock.init(p->mutex
811#ifdef TOKU_MYSQL_WITH_PFS
812 ,
813 *cachetable_value_key
814#endif
815 );
816 nb_mutex_init(*cachetable_disk_nb_mutex_key,
817 *cachetable_disk_nb_rwlock_key,
818 &p->disk_nb_mutex);
819
820 p->size_evicting_estimate = 0; // <CER> Is zero the correct init value?
821
822 p->ev = ev;
823 p->list = list;
824
825 p->clock_next = p->clock_prev = NULL;
826 p->pending_next = p->pending_prev = NULL;
827 p->cf_next = p->cf_prev = NULL;
828 p->hash_chain = NULL;
829}
830
831// has ct locked on entry
832// This function MUST NOT release and reacquire the cachetable lock
833// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
834//
835// Requires pair list's write lock to be held on entry.
// the pair's mutex must be held as well
837//
838//
839static PAIR cachetable_insert_at(CACHETABLE ct,
840 CACHEFILE cachefile, CACHEKEY key, void *value,
841 uint32_t fullhash,
842 PAIR_ATTR attr,
843 CACHETABLE_WRITE_CALLBACK write_callback,
844 enum cachetable_dirty dirty) {
845 PAIR MALLOC(p);
846 assert(p);
847 memset(p, 0, sizeof *p);
848 pair_init(p,
849 cachefile,
850 key,
851 value,
852 attr,
853 dirty,
854 fullhash,
855 write_callback,
856 &ct->ev,
857 &ct->list
858 );
859
860 ct->list.put(p);
861 ct->ev.add_pair_attr(attr);
862 return p;
863}
864
865// on input, the write list lock must be held AND
// the pair's mutex must be held as well
867static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) {
868 ct->list.put(p);
869 ct->ev.add_pair_attr(attr);
870}
871
872
873// has ct locked on entry
874// This function MUST NOT release and reacquire the cachetable lock
875// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
876//
877// Requires pair list's write lock to be held on entry
878//
879static void cachetable_put_internal(
880 CACHEFILE cachefile,
881 PAIR p,
882 void *value,
883 PAIR_ATTR attr,
884 CACHETABLE_PUT_CALLBACK put_callback
885 )
886{
887 CACHETABLE ct = cachefile->cachetable;
888 //
889 //
890 // TODO: (Zardosht), make code run in debug only
891 //
892 //
893 //PAIR dummy_p = ct->list.find_pair(cachefile, key, fullhash);
894 //invariant_null(dummy_p);
895 cachetable_insert_pair_at(ct, p, attr);
896 invariant_notnull(put_callback);
897 put_callback(p->key, value, p);
898}
899
// Pair mutex (p->mutex) may or may not be held on entry;
// holding the pair mutex on entry is not important
// for performance or correctness.
903// Pair is pinned on entry
904static void
905clone_pair(evictor* ev, PAIR p) {
906 PAIR_ATTR old_attr = p->attr;
907 PAIR_ATTR new_attr;
908 long clone_size = 0;
909
910 // act of cloning should be fast,
911 // not sure if we have to release
912 // and regrab the cachetable lock,
913 // but doing it for now
914 p->clone_callback(
915 p->value_data,
916 &p->cloned_value_data,
917 &clone_size,
918 &new_attr,
919 true,
920 p->write_extraargs
921 );
922
923 // now we need to do the same actions we would do
924 // if the PAIR had been written to disk
925 //
926 // because we hold the value_rwlock,
927 // it doesn't matter whether we clear
928 // the pending bit before the clone
929 // or after the clone
930 p->dirty = CACHETABLE_CLEAN;
931 if (new_attr.is_valid) {
932 p->attr = new_attr;
933 ev->change_pair_attr(old_attr, new_attr);
934 }
935 p->cloned_value_size = clone_size;
936 ev->add_cloned_data_size(p->cloned_value_size);
937}
938
939static void checkpoint_cloned_pair(void* extra) {
940 PAIR p = (PAIR)extra;
941 CACHETABLE ct = p->cachefile->cachetable;
942 PAIR_ATTR new_attr;
943 // note that pending lock is not needed here because
944 // we KNOW we are in the middle of a checkpoint
945 // and that a begin_checkpoint cannot happen
946 cachetable_only_write_locked_data(
947 p->ev,
948 p,
949 true, //for_checkpoint
950 &new_attr,
951 true //is_clone
952 );
953 pair_lock(p);
954 nb_mutex_unlock(&p->disk_nb_mutex);
955 pair_unlock(p);
956 ct->cp.remove_background_job();
957}
958
959static void
960checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) {
961 toku_kibbutz_enq(ct->checkpointing_kibbutz, checkpoint_cloned_pair, p);
962}
963
964
965//
// Given a PAIR p with the value_rwlock already held, do the following:
967// - If the PAIR needs to be written out to disk for checkpoint:
968// - If the PAIR is cloneable, clone the PAIR and place the work
969// of writing the PAIR on a background thread.
970// - If the PAIR is not cloneable, write the PAIR to disk for checkpoint
971// on the current thread
972//
973// On entry, pair's mutex is NOT held
974//
975static void
976write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending)
977{
978 if (checkpoint_pending && p->checkpoint_complete_callback) {
979 p->checkpoint_complete_callback(p->value_data);
980 }
981 if (p->dirty && checkpoint_pending) {
982 if (p->clone_callback) {
983 pair_lock(p);
984 nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
985 pair_unlock(p);
986 assert(!p->cloned_value_data);
987 clone_pair(&ct->ev, p);
988 assert(p->cloned_value_data);
989 // place it on the background thread and continue
990 // responsibility of writer thread to release disk_nb_mutex
991 ct->cp.add_background_job();
992 checkpoint_cloned_pair_on_writer_thread(ct, p);
993 }
994 else {
995 // The pair is not cloneable, just write the pair to disk
996 // we already have p->value_rwlock and we just do the write in our own thread.
997 cachetable_write_locked_pair(&ct->ev, p, true); // keeps the PAIR's write lock
998 }
999 }
1000}
1001
1002// On entry and exit: hold the pair's mutex (p->mutex)
// Method: take the pair's write lock,
//         write out the node if it needs checkpointing,
//         then release the write lock
1006//
1007static void
1008write_pair_for_checkpoint_thread (evictor* ev, PAIR p)
1009{
1010 // Grab an exclusive lock on the pair.
1011 // If we grab an expensive lock, then other threads will return
1012 // TRY_AGAIN rather than waiting. In production, the only time
1013 // another thread will check if grabbing a lock is expensive is when
1014 // we have a clone_callback (FTNODEs), so the act of checkpointing
1015 // will be cheap. Also, much of the time we'll just be clearing
1016 // pending bits and that's definitely cheap. (see #5427)
1017 p->value_rwlock.write_lock(false);
1018 if (p->checkpoint_pending && p->checkpoint_complete_callback) {
1019 p->checkpoint_complete_callback(p->value_data);
1020 }
1021 if (p->dirty && p->checkpoint_pending) {
1022 if (p->clone_callback) {
1023 nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
1024 assert(!p->cloned_value_data);
1025 clone_pair(ev, p);
1026 assert(p->cloned_value_data);
1027 }
1028 else {
1029 // The pair is not cloneable, just write the pair to disk
1030 // we already have p->value_rwlock and we just do the write in our own thread.
1031 // this will grab and release disk_nb_mutex
1032 pair_unlock(p);
1033 cachetable_write_locked_pair(ev, p, true); // keeps the PAIR's write lock
1034 pair_lock(p);
1035 }
1036 p->checkpoint_pending = false;
1037
1038 // now release value_rwlock, before we write the PAIR out
1039 // so that the PAIR is available to client threads
1040 p->value_rwlock.write_unlock(); // didn't call cachetable_evict_pair so we have to unlock it ourselves.
1041 if (p->clone_callback) {
1042 // note that pending lock is not needed here because
1043 // we KNOW we are in the middle of a checkpoint
1044 // and that a begin_checkpoint cannot happen
1045 PAIR_ATTR attr;
1046 pair_unlock(p);
1047 cachetable_only_write_locked_data(
1048 ev,
1049 p,
1050 true, //for_checkpoint
1051 &attr,
1052 true //is_clone
1053 );
1054 pair_lock(p);
1055 nb_mutex_unlock(&p->disk_nb_mutex);
1056 }
1057 }
1058 else {
1059 //
1060 // we may clear the pending bit here because we have
1061 // both the cachetable lock and the PAIR lock.
1062 // The rule, as mentioned in toku_cachetable_begin_checkpoint,
1063 // is that to clear the bit, we must have both the PAIR lock
1064 // and the pending lock
1065 //
1066 p->checkpoint_pending = false;
1067 p->value_rwlock.write_unlock();
1068 }
1069}
1070
1071//
1072// For each PAIR associated with these CACHEFILEs and CACHEKEYs
1073// if the checkpoint_pending bit is set and the PAIR is dirty, write the PAIR
1074// to disk.
1075// We assume the PAIRs passed in have been locked by the client that made calls
1076// into the cachetable that eventually make it here.
1077//
1078static void checkpoint_dependent_pairs(
1079 CACHETABLE ct,
1080 uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
1081 PAIR* dependent_pairs,
1082 bool* checkpoint_pending,
1083 enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
1084 )
1085{
1086 for (uint32_t i =0; i < num_dependent_pairs; i++) {
1087 PAIR curr_dep_pair = dependent_pairs[i];
        // we need to update the dirtiness of the dependent pair,
1089 // because the client may have dirtied it while holding its lock,
1090 // and if the pair is pending a checkpoint, it needs to be written out
1091 if (dependent_dirty[i]) curr_dep_pair->dirty = CACHETABLE_DIRTY;
1092 if (checkpoint_pending[i]) {
1093 write_locked_pair_for_checkpoint(ct, curr_dep_pair, checkpoint_pending[i]);
1094 }
1095 }
1096}
1097
1098void toku_cachetable_put_with_dep_pairs(
1099 CACHEFILE cachefile,
1100 CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
1101 void *value,
1102 PAIR_ATTR attr,
1103 CACHETABLE_WRITE_CALLBACK write_callback,
1104 void *get_key_and_fullhash_extra,
1105 uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
1106 PAIR* dependent_pairs,
1107 enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs
1108 CACHEKEY* key,
1109 uint32_t* fullhash,
1110 CACHETABLE_PUT_CALLBACK put_callback
1111 )
1112{
1113 //
    // need to get the key and fullhash
1115 //
1116 CACHETABLE ct = cachefile->cachetable;
1117 if (ct->ev.should_client_thread_sleep()) {
1118 ct->ev.wait_for_cache_pressure_to_subside();
1119 }
1120 if (ct->ev.should_client_wake_eviction_thread()) {
1121 ct->ev.signal_eviction_thread();
1122 }
1123
1124 PAIR p = NULL;
1125 XMALLOC(p);
1126 memset(p, 0, sizeof *p);
1127
1128 ct->list.write_list_lock();
1129 get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
1130 pair_init(
1131 p,
1132 cachefile,
1133 *key,
1134 value,
1135 attr,
1136 CACHETABLE_DIRTY,
1137 *fullhash,
1138 write_callback,
1139 &ct->ev,
1140 &ct->list
1141 );
1142 pair_lock(p);
1143 p->value_rwlock.write_lock(true);
1144 cachetable_put_internal(
1145 cachefile,
1146 p,
1147 value,
1148 attr,
1149 put_callback
1150 );
1151 pair_unlock(p);
1152 bool checkpoint_pending[num_dependent_pairs];
1153 ct->list.write_pending_cheap_lock();
1154 for (uint32_t i = 0; i < num_dependent_pairs; i++) {
1155 checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
1156 dependent_pairs[i]->checkpoint_pending = false;
1157 }
1158 ct->list.write_pending_cheap_unlock();
1159 ct->list.write_list_unlock();
1160
1161 //
1162 // now that we have inserted the row, let's checkpoint the
1163 // dependent nodes, if they need checkpointing
1164 //
1165 checkpoint_dependent_pairs(
1166 ct,
1167 num_dependent_pairs,
1168 dependent_pairs,
1169 checkpoint_pending,
1170 dependent_dirty
1171 );
1172}
1173
1174void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void*value, PAIR_ATTR attr,
1175 CACHETABLE_WRITE_CALLBACK write_callback,
1176 CACHETABLE_PUT_CALLBACK put_callback
1177 ) {
1178 CACHETABLE ct = cachefile->cachetable;
1179 if (ct->ev.should_client_thread_sleep()) {
1180 ct->ev.wait_for_cache_pressure_to_subside();
1181 }
1182 if (ct->ev.should_client_wake_eviction_thread()) {
1183 ct->ev.signal_eviction_thread();
1184 }
1185
1186 PAIR p = NULL;
1187 XMALLOC(p);
1188 memset(p, 0, sizeof *p);
1189
1190 ct->list.write_list_lock();
1191 pair_init(
1192 p,
1193 cachefile,
1194 key,
1195 value,
1196 attr,
1197 CACHETABLE_DIRTY,
1198 fullhash,
1199 write_callback,
1200 &ct->ev,
1201 &ct->list
1202 );
1203 pair_lock(p);
1204 p->value_rwlock.write_lock(true);
1205 cachetable_put_internal(
1206 cachefile,
1207 p,
1208 value,
1209 attr,
1210 put_callback
1211 );
1212 pair_unlock(p);
1213 ct->list.write_list_unlock();
1214}
1215
1216static uint64_t get_tnow(void) {
1217 struct timeval tv;
1218 int r = gettimeofday(&tv, NULL); assert(r == 0);
1219 return tv.tv_sec * 1000000ULL + tv.tv_usec;
1220}
1221
//
// No locks are held on entry (besides the rwlock write lock of the PAIR);
// the pair's mutex is taken and released internally as needed.
//
1229static void
1230do_partial_fetch(
1231 CACHETABLE ct,
1232 CACHEFILE cachefile,
1233 PAIR p,
1234 CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
1235 void *read_extraargs,
1236 bool keep_pair_locked
1237 )
1238{
1239 PAIR_ATTR old_attr = p->attr;
1240 PAIR_ATTR new_attr = zero_attr;
1241 // As of Dr. No, only clean PAIRs may have pieces missing,
1242 // so we do a sanity check here.
1243 assert(!p->dirty);
1244
1245 pair_lock(p);
1246 invariant(p->value_rwlock.writers());
1247 nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
1248 pair_unlock(p);
1249 int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr);
1250 lazy_assert_zero(r);
1251 p->attr = new_attr;
1252 ct->ev.change_pair_attr(old_attr, new_attr);
1253 pair_lock(p);
1254 nb_mutex_unlock(&p->disk_nb_mutex);
1255 if (!keep_pair_locked) {
1256 p->value_rwlock.write_unlock();
1257 }
1258 pair_unlock(p);
1259}
1260
1261void toku_cachetable_pf_pinned_pair(
1262 void* value,
1263 CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
1264 void* read_extraargs,
1265 CACHEFILE cf,
1266 CACHEKEY key,
1267 uint32_t fullhash
1268 )
1269{
1270 PAIR_ATTR attr;
1271 PAIR p = NULL;
1272 CACHETABLE ct = cf->cachetable;
1273 ct->list.pair_lock_by_fullhash(fullhash);
1274 p = ct->list.find_pair(cf, key, fullhash);
1275 assert(p != NULL);
1276 assert(p->value_data == value);
1277 assert(p->value_rwlock.writers());
1278 nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
1279 pair_unlock(p);
1280
1281 int fd = cf->fd;
1282 pf_callback(value, p->disk_data, read_extraargs, fd, &attr);
1283
1284 pair_lock(p);
1285 nb_mutex_unlock(&p->disk_nb_mutex);
1286 pair_unlock(p);
1287}
1288
1289int toku_cachetable_get_and_pin (
1290 CACHEFILE cachefile,
1291 CACHEKEY key,
1292 uint32_t fullhash,
1293 void**value,
1294 long *sizep,
1295 CACHETABLE_WRITE_CALLBACK write_callback,
1296 CACHETABLE_FETCH_CALLBACK fetch_callback,
1297 CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
1298 CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
1299 bool may_modify_value,
1300 void* read_extraargs // parameter for fetch_callback, pf_req_callback, and pf_callback
1301 )
1302{
1303 pair_lock_type lock_type = may_modify_value ? PL_WRITE_EXPENSIVE : PL_READ;
1304 // We have separate parameters of read_extraargs and write_extraargs because
1305 // the lifetime of the two parameters are different. write_extraargs may be used
1306 // long after this function call (e.g. after a flush to disk), whereas read_extraargs
1307 // will not be used after this function returns. As a result, the caller may allocate
1308 // read_extraargs on the stack, whereas write_extraargs must be allocated
1309 // on the heap.
1310 return toku_cachetable_get_and_pin_with_dep_pairs (
1311 cachefile,
1312 key,
1313 fullhash,
1314 value,
1315 sizep,
1316 write_callback,
1317 fetch_callback,
1318 pf_req_callback,
1319 pf_callback,
1320 lock_type,
1321 read_extraargs,
1322 0, // number of dependent pairs that we may need to checkpoint
1323 NULL, // array of dependent pairs
1324 NULL // array stating dirty/cleanness of dependent pairs
1325 );
1326}
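
// Minimal usage sketch (the write-callback struct and the fetch/partial-fetch
// callbacks named below are hypothetical stand-ins for the ft-layer callbacks
// a real caller supplies):
//
//     uint32_t fullhash = toku_cachetable_hash(cf, key);
//     void *node = NULL;
//     long size = 0;
//     int r = toku_cachetable_get_and_pin(cf, key, fullhash, &node, &size,
//                                         wc, my_fetch_cb, my_pf_req_cb, my_pf_cb,
//                                         true /* may_modify_value */, my_read_extra);
//     if (r == 0) {
//         // ... read or modify the node ...
//         // unpin the pair through the cachetable's unpin API when done
//     }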
1327
1328// Read a pair from a cachefile into memory using the pair's fetch callback
1329// on entry, pair mutex (p->mutex) is NOT held, but pair is pinned
1330static void cachetable_fetch_pair(
1331 CACHETABLE ct,
1332 CACHEFILE cf,
1333 PAIR p,
1334 CACHETABLE_FETCH_CALLBACK fetch_callback,
1335 void* read_extraargs,
1336 bool keep_pair_locked
1337 )
1338{
1339 // helgrind
1340 CACHEKEY key = p->key;
1341 uint32_t fullhash = p->fullhash;
1342
1343 void *toku_value = NULL;
1344 void *disk_data = NULL;
1345 PAIR_ATTR attr;
1346
1347 // FIXME this should be enum cachetable_dirty, right?
1348 int dirty = 0;
1349
1350 pair_lock(p);
1351 nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
1352 pair_unlock(p);
1353
1354 int r;
1355 r = fetch_callback(cf, p, cf->fd, key, fullhash, &toku_value, &disk_data, &attr, &dirty, read_extraargs);
1356 if (dirty) {
1357 p->dirty = CACHETABLE_DIRTY;
1358 }
1359 assert(r == 0);
1360
1361 p->value_data = toku_value;
1362 p->disk_data = disk_data;
1363 p->attr = attr;
1364 ct->ev.add_pair_attr(attr);
1365 pair_lock(p);
1366 nb_mutex_unlock(&p->disk_nb_mutex);
1367 if (!keep_pair_locked) {
1368 p->value_rwlock.write_unlock();
1369 }
1370 pair_unlock(p);
1371}
1372
1373static bool get_checkpoint_pending(PAIR p, pair_list* pl) {
1374 bool checkpoint_pending = false;
1375 pl->read_pending_cheap_lock();
1376 checkpoint_pending = p->checkpoint_pending;
1377 p->checkpoint_pending = false;
1378 pl->read_pending_cheap_unlock();
1379 return checkpoint_pending;
1380}
1381
1382static void checkpoint_pair_and_dependent_pairs(
1383 CACHETABLE ct,
1384 PAIR p,
1385 bool p_is_pending_checkpoint,
1386 uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
1387 PAIR* dependent_pairs,
1388 bool* dependent_pairs_pending_checkpoint,
1389 enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
1390 )
1391{
1392
1393 //
1394 // A checkpoint must not begin while we are checking dependent pairs or pending bits.
1395 // Here is why.
1396 //
1397 // Now that we have all of the locks on the pairs we
1398 // care about, we can take care of the necessary checkpointing.
1399 // For each pair, we simply need to write the pair if it is
1400 // pending a checkpoint. If no pair is pending a checkpoint,
1401 // then all of this work will be done with the cachetable lock held,
1402 // so we don't need to worry about a checkpoint beginning
1403 // in the middle of any operation below. If some pair
1404 // is pending a checkpoint, then the checkpoint thread
1405 // will not complete its current checkpoint until it can
1406 // successfully grab a lock on the pending pair and
1407 // remove it from its list of pairs pending a checkpoint.
1408 // This cannot be done until we release the lock
1409 // that we have, which is not done in this function.
1410 // So, the point is, it is impossible for a checkpoint
1411 // to begin while we write any of these locked pairs
1412 // for checkpoint, even though writing a pair releases
1413 // the cachetable lock.
1414 //
1415 write_locked_pair_for_checkpoint(ct, p, p_is_pending_checkpoint);
1416
1417 checkpoint_dependent_pairs(
1418 ct,
1419 num_dependent_pairs,
1420 dependent_pairs,
1421 dependent_pairs_pending_checkpoint,
1422 dependent_dirty
1423 );
1424}
1425
1426static void unpin_pair(PAIR p, bool read_lock_grabbed) {
1427 if (read_lock_grabbed) {
1428 p->value_rwlock.read_unlock();
1429 }
1430 else {
1431 p->value_rwlock.write_unlock();
1432 }
1433}
1434
1435
1436// on input, the pair's mutex is held,
1437// on output, the pair's mutex is not held.
1438// if true, we must try again, and pair is not pinned
1439// if false, we succeeded, the pair is pinned
1440static bool try_pin_pair(
1441 PAIR p,
1442 CACHETABLE ct,
1443 CACHEFILE cachefile,
1444 pair_lock_type lock_type,
1445 uint32_t num_dependent_pairs,
1446 PAIR* dependent_pairs,
1447 enum cachetable_dirty* dependent_dirty,
1448 CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
1449 CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
1450 void* read_extraargs,
1451 bool already_slept
1452 )
1453{
1454 bool dep_checkpoint_pending[num_dependent_pairs];
1455 bool try_again = true;
1456 bool expensive = (lock_type == PL_WRITE_EXPENSIVE);
1457 if (lock_type != PL_READ) {
1458 p->value_rwlock.write_lock(expensive);
1459 }
1460 else {
1461 p->value_rwlock.read_lock();
1462 }
1463 pair_touch(p);
1464 pair_unlock(p);
1465
1466 bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
1467
1468 if (partial_fetch_required) {
1469 toku::context pf_ctx(CTX_PARTIAL_FETCH);
1470
1471 if (ct->ev.should_client_thread_sleep() && !already_slept) {
1472 pair_lock(p);
1473 unpin_pair(p, (lock_type == PL_READ));
1474 pair_unlock(p);
1475 try_again = true;
1476 goto exit;
1477 }
1478 if (ct->ev.should_client_wake_eviction_thread()) {
1479 ct->ev.signal_eviction_thread();
1480 }
1481 //
        // Just because the PAIR exists does not necessarily mean that all the data the caller requires
        // is in memory. A partial fetch may be required, which is evaluated above;
        // if the variable is true, a partial fetch is required, so we must grab the PAIR's write lock
        // and then call a callback to retrieve what we need.
1486 //
1487 assert(partial_fetch_required);
1488 // As of Dr. No, only clean PAIRs may have pieces missing,
1489 // so we do a sanity check here.
1490 assert(!p->dirty);
1491
1492 if (lock_type == PL_READ) {
1493 pair_lock(p);
1494 p->value_rwlock.read_unlock();
1495 p->value_rwlock.write_lock(true);
1496 pair_unlock(p);
1497 }
1498 else if (lock_type == PL_WRITE_CHEAP) {
1499 pair_lock(p);
1500 p->value_rwlock.write_unlock();
1501 p->value_rwlock.write_lock(true);
1502 pair_unlock(p);
1503 }
1504
1505 partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
1506 if (partial_fetch_required) {
1507 do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs, true);
1508 }
1509 if (lock_type == PL_READ) {
1510 //
1511 // TODO: Zardosht, somehow ensure that a partial eviction cannot happen
1512 // between these two calls
1513 //
1514 pair_lock(p);
1515 p->value_rwlock.write_unlock();
1516 p->value_rwlock.read_lock();
1517 pair_unlock(p);
1518 }
1519 else if (lock_type == PL_WRITE_CHEAP) {
1520 pair_lock(p);
1521 p->value_rwlock.write_unlock();
1522 p->value_rwlock.write_lock(false);
1523 pair_unlock(p);
1524 }
1525 // small hack here for #5439,
1526 // for queries, pf_req_callback does some work for the caller,
1527 // that information may be out of date after a write_unlock
1528 // followed by a relock, so we do it again.
1529 bool pf_required = pf_req_callback(p->value_data,read_extraargs);
1530 assert(!pf_required);
1531 }
1532
1533 if (lock_type != PL_READ) {
1534 ct->list.read_pending_cheap_lock();
1535 bool p_checkpoint_pending = p->checkpoint_pending;
1536 p->checkpoint_pending = false;
1537 for (uint32_t i = 0; i < num_dependent_pairs; i++) {
1538 dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
1539 dependent_pairs[i]->checkpoint_pending = false;
1540 }
1541 ct->list.read_pending_cheap_unlock();
1542 checkpoint_pair_and_dependent_pairs(
1543 ct,
1544 p,
1545 p_checkpoint_pending,
1546 num_dependent_pairs,
1547 dependent_pairs,
1548 dep_checkpoint_pending,
1549 dependent_dirty
1550 );
1551 }
1552
1553 try_again = false;
1554exit:
1555 return try_again;
1556}
1557
1558int toku_cachetable_get_and_pin_with_dep_pairs (
1559 CACHEFILE cachefile,
1560 CACHEKEY key,
1561 uint32_t fullhash,
1562 void**value,
1563 long *sizep,
1564 CACHETABLE_WRITE_CALLBACK write_callback,
1565 CACHETABLE_FETCH_CALLBACK fetch_callback,
1566 CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
1567 CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
1568 pair_lock_type lock_type,
1569 void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
1570 uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
1571 PAIR* dependent_pairs,
1572 enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
1573 )
1574// See cachetable/cachetable.h
1575{
1576 CACHETABLE ct = cachefile->cachetable;
1577 bool wait = false;
1578 bool already_slept = false;
1579 bool dep_checkpoint_pending[num_dependent_pairs];
1580
1581 //
1582 // If in the process of pinning the node we add data to the cachetable via a partial fetch
1583 // or a full fetch, we may need to first sleep because there is too much data in the
1584 // cachetable. In those cases, we set the bool wait to true and goto try_again, so that
1585 // we can do our sleep and then restart the function.
1586 //
1587beginning:
1588 if (wait) {
1589 // We shouldn't be holding the read list lock while
1590 // waiting for the evictor to remove pairs.
1591 already_slept = true;
1592 ct->ev.wait_for_cache_pressure_to_subside();
1593 }
1594
1595 ct->list.pair_lock_by_fullhash(fullhash);
1596 PAIR p = ct->list.find_pair(cachefile, key, fullhash);
1597 if (p) {
1598 // on entry, holds p->mutex (which is locked via pair_lock_by_fullhash)
1599 // on exit, does not hold p->mutex
1600 bool try_again = try_pin_pair(
1601 p,
1602 ct,
1603 cachefile,
1604 lock_type,
1605 num_dependent_pairs,
1606 dependent_pairs,
1607 dependent_dirty,
1608 pf_req_callback,
1609 pf_callback,
1610 read_extraargs,
1611 already_slept
1612 );
1613 if (try_again) {
1614 wait = true;
1615 goto beginning;
1616 }
1617 else {
1618 goto got_value;
1619 }
1620 }
1621 else {
1622 toku::context fetch_ctx(CTX_FULL_FETCH);
1623
1624 ct->list.pair_unlock_by_fullhash(fullhash);
1625 // we only want to sleep once per call to get_and_pin. If we have already
1626 // slept and there is still cache pressure, then we might as
1627 // well just complete the call, because the sleep did not help
1628 // By sleeping only once per get_and_pin, we prevent starvation and ensure
1629 // that we make progress (however slow) on each thread, which allows
1630 // assumptions of the form 'x will eventually happen'.
1631 // This happens in extreme scenarios.
1632 if (ct->ev.should_client_thread_sleep() && !already_slept) {
1633 wait = true;
1634 goto beginning;
1635 }
1636 if (ct->ev.should_client_wake_eviction_thread()) {
1637 ct->ev.signal_eviction_thread();
1638 }
1639 // Since the pair was not found, we need the write list
1640 // lock to add it. So, we have to release the read list lock
1641 // first.
1642 ct->list.write_list_lock();
1643 ct->list.pair_lock_by_fullhash(fullhash);
1644 p = ct->list.find_pair(cachefile, key, fullhash);
1645 if (p != NULL) {
1646 ct->list.write_list_unlock();
1647 // on entry, holds p->mutex,
1648 // on exit, does not hold p->mutex
1649 bool try_again = try_pin_pair(
1650 p,
1651 ct,
1652 cachefile,
1653 lock_type,
1654 num_dependent_pairs,
1655 dependent_pairs,
1656 dependent_dirty,
1657 pf_req_callback,
1658 pf_callback,
1659 read_extraargs,
1660 already_slept
1661 );
1662 if (try_again) {
1663 wait = true;
1664 goto beginning;
1665 }
1666 else {
1667 goto got_value;
1668 }
1669 }
1670 assert(p == NULL);
1671
1672 // Insert a PAIR into the cachetable
1673 // NOTE: At this point we still have the write list lock held.
1674 p = cachetable_insert_at(
1675 ct,
1676 cachefile,
1677 key,
1678 zero_value,
1679 fullhash,
1680 zero_attr,
1681 write_callback,
1682 CACHETABLE_CLEAN
1683 );
1684 invariant_notnull(p);
1685
1686 // Pin the pair.
1687 p->value_rwlock.write_lock(true);
1688 pair_unlock(p);
1689
1690
1691 if (lock_type != PL_READ) {
1692 ct->list.read_pending_cheap_lock();
1693 invariant(!p->checkpoint_pending);
1694 for (uint32_t i = 0; i < num_dependent_pairs; i++) {
1695 dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
1696 dependent_pairs[i]->checkpoint_pending = false;
1697 }
1698 ct->list.read_pending_cheap_unlock();
1699 }
1700 // We should release the lock before we perform
1701 // these expensive operations.
1702 ct->list.write_list_unlock();
1703
1704 if (lock_type != PL_READ) {
1705 checkpoint_dependent_pairs(
1706 ct,
1707 num_dependent_pairs,
1708 dependent_pairs,
1709 dep_checkpoint_pending,
1710 dependent_dirty
1711 );
1712 }
1713 uint64_t t0 = get_tnow();
1714
1715 // Retrieve the value of the PAIR from disk.
1716 // The pair being fetched will be marked as pending if a checkpoint happens during the
1717 // fetch because begin_checkpoint will mark as pending any pair that is locked even if it is clean.
1718 cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs, true);
1719 cachetable_miss++;
1720 cachetable_misstime += get_tnow() - t0;
1721
1722 // If the lock_type requested was a PL_READ, we downgrade to PL_READ,
1723 // but if the request was for a PL_WRITE_CHEAP, we don't bother
1724 // downgrading, because we would have to possibly resolve the
1725 // checkpointing again, and that would just make this function even
1726 // messier.
1727 //
1728 // TODO(yoni): in case of PL_WRITE_CHEAP, write and use
1729 // p->value_rwlock.write_change_status_to_not_expensive(); (Also name it better)
1730 // to downgrade from an expensive write lock to a cheap one
1731 if (lock_type == PL_READ) {
1732 pair_lock(p);
1733 p->value_rwlock.write_unlock();
1734 p->value_rwlock.read_lock();
1735 pair_unlock(p);
        // Small hack for #5439: for queries, pf_req_callback does some work
        // for the caller, and that information may be out of date after a
        // write_unlock followed by a read_lock, so we call it again.
1740 bool pf_required = pf_req_callback(p->value_data,read_extraargs);
1741 assert(!pf_required);
1742 }
1743 goto got_value;
1744 }
1745got_value:
1746 *value = p->value_data;
1747 if (sizep) *sizep = p->attr.size;
1748 return 0;
1749}
1750
// Lookup a key in the cachetable. If it is found and it is not being written, then
// acquire a read lock on the pair, update the LRU list, and return success.
//
// However, if the page is clean or has a checkpoint pending, don't return success.
// This will minimize the number of dirty nodes.
// Rationale: maybe_get_and_pin is used when the system has an alternative to modifying a node.
// In the context of checkpointing, we don't want to gratuitously dirty a page, because that causes an I/O.
// For example, if we could either modify a bit in a dirty parent or modify a bit in a clean child, we should modify
// the dirty parent (which will have to do I/O eventually anyway) rather than incur a full block write just to modify one bit.
// Similarly, if the checkpoint is actually pending, we don't want to block on it.
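//
// Example caller pattern (illustrative sketch only; cf, key, and the way the
// caller tracks the node's PAIR handle are hypothetical caller state):
//
//   uint32_t fullhash = toku_cachetable_hash(cf, key);
//   void *node;
//   if (toku_cachetable_maybe_get_and_pin(cf, key, fullhash, PL_WRITE_CHEAP, &node) == 0) {
//       // The node was already dirty (and not checkpoint-pending), so modifying
//       // it adds no extra I/O. Mutate it, then unpin with toku_cachetable_unpin(),
//       // passing the PAIR handle the caller keeps for this node.
//   } else {
//       // Not cheaply pinnable: fall back to the caller's alternative plan.
//   }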
1761int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) {
1762 CACHETABLE ct = cachefile->cachetable;
1763 int r = -1;
1764 ct->list.pair_lock_by_fullhash(fullhash);
1765 PAIR p = ct->list.find_pair(cachefile, key, fullhash);
1766 if (p) {
1767 const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
1768 bool got_lock = false;
1769 switch (lock_type) {
1770 case PL_READ:
1771 if (p->value_rwlock.try_read_lock()) {
1772 got_lock = p->dirty;
1773
1774 if (!got_lock) {
1775 p->value_rwlock.read_unlock();
1776 }
1777 }
1778 break;
1779 case PL_WRITE_CHEAP:
1780 case PL_WRITE_EXPENSIVE:
1781 if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
1782 // we got the lock fast, so continue
1783 ct->list.read_pending_cheap_lock();
1784
1785 // if pending a checkpoint, then we don't want to return
1786 // the value to the user, because we are responsible for
1787 // handling the checkpointing, which we do not want to do,
1788 // because it is expensive
1789 got_lock = p->dirty && !p->checkpoint_pending;
1790
1791 ct->list.read_pending_cheap_unlock();
1792 if (!got_lock) {
1793 p->value_rwlock.write_unlock();
1794 }
1795 }
1796 break;
1797 }
1798 if (got_lock) {
1799 pair_touch(p);
1800 *value = p->value_data;
1801 r = 0;
1802 }
1803 }
1804 ct->list.pair_unlock_by_fullhash(fullhash);
1805 return r;
1806}
1807
// Used by flusher threads to possibly pin a child on the client thread if pinning is cheap.
// Same as toku_cachetable_maybe_get_and_pin except that we don't care whether the node is clean or dirty (we return the node regardless).
// All other conditions remain the same.
1811int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) {
1812 CACHETABLE ct = cachefile->cachetable;
1813 int r = -1;
1814 ct->list.pair_lock_by_fullhash(fullhash);
1815 PAIR p = ct->list.find_pair(cachefile, key, fullhash);
1816 if (p) {
1817 const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
1818 bool got_lock = false;
1819 switch (lock_type) {
1820 case PL_READ:
1821 if (p->value_rwlock.try_read_lock()) {
1822 got_lock = true;
1823 } else if (!p->value_rwlock.read_lock_is_expensive()) {
1824 p->value_rwlock.write_lock(lock_is_expensive);
1825 got_lock = true;
1826 }
1827 if (got_lock) {
1828 pair_touch(p);
1829 }
1830 pair_unlock(p);
1831 break;
1832 case PL_WRITE_CHEAP:
1833 case PL_WRITE_EXPENSIVE:
1834 if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
1835 got_lock = true;
1836 } else if (!p->value_rwlock.write_lock_is_expensive()) {
1837 p->value_rwlock.write_lock(lock_is_expensive);
1838 got_lock = true;
1839 }
1840 if (got_lock) {
1841 pair_touch(p);
1842 }
1843 pair_unlock(p);
1844 if (got_lock) {
1845 bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
1846 write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
1847 }
1848 break;
1849 }
1850 if (got_lock) {
1851 *value = p->value_data;
1852 r = 0;
1853 }
1854 } else {
1855 ct->list.pair_unlock_by_fullhash(fullhash);
1856 }
1857 return r;
1858}
1859
1860//
1861// internal function to unpin a PAIR.
// As of Clayface, this may be called in two ways:
1863// - with flush false
1864// - with flush true
1865// The first is for when this is run during run_unlockers in
1866// toku_cachetable_get_and_pin_nonblocking, the second is during
1867// normal operations. Only during normal operations do we want to possibly
1868// induce evictions or sleep.
1869//
1870static int
1871cachetable_unpin_internal(
1872 CACHEFILE cachefile,
1873 PAIR p,
1874 enum cachetable_dirty dirty,
1875 PAIR_ATTR attr,
1876 bool flush
1877 )
1878{
1879 invariant_notnull(p);
1880
1881 CACHETABLE ct = cachefile->cachetable;
1882 bool added_data_to_cachetable = false;
1883
1884 // hack for #3969, only exists in case where we run unlockers
1885 pair_lock(p);
1886 PAIR_ATTR old_attr = p->attr;
1887 PAIR_ATTR new_attr = attr;
1888 if (dirty) {
1889 p->dirty = CACHETABLE_DIRTY;
1890 }
1891 if (attr.is_valid) {
1892 p->attr = attr;
1893 }
1894 bool read_lock_grabbed = p->value_rwlock.readers() != 0;
1895 unpin_pair(p, read_lock_grabbed);
1896 pair_unlock(p);
1897
1898 if (attr.is_valid) {
1899 if (new_attr.size > old_attr.size) {
1900 added_data_to_cachetable = true;
1901 }
1902 ct->ev.change_pair_attr(old_attr, new_attr);
1903 }
1904
1905 // see comments above this function to understand this code
1906 if (flush && added_data_to_cachetable) {
1907 if (ct->ev.should_client_thread_sleep()) {
1908 ct->ev.wait_for_cache_pressure_to_subside();
1909 }
1910 if (ct->ev.should_client_wake_eviction_thread()) {
1911 ct->ev.signal_eviction_thread();
1912 }
1913 }
1914 return 0;
1915}
1916
1917int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
1918 return cachetable_unpin_internal(cachefile, p, dirty, attr, true);
1919}
1920int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
1921 return cachetable_unpin_internal(cachefile, p, dirty, attr, false);
1922}
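
// Example (illustrative sketch; cf, p, and new_size are hypothetical caller
// state, and make_pair_attr() is assumed to be the cachetable.h helper that builds
// a PAIR_ATTR of the given size):
//
//   PAIR_ATTR attr = make_pair_attr(new_size);
//   int r = toku_cachetable_unpin(cf, p, CACHETABLE_DIRTY, attr);
//   assert_zero(r);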
1923
1924static void
1925run_unlockers (UNLOCKERS unlockers) {
1926 while (unlockers) {
1927 assert(unlockers->locked);
1928 unlockers->locked = false;
1929 unlockers->f(unlockers->extra);
1930 unlockers=unlockers->next;
1931 }
1932}
1933
1934//
1935// This function tries to pin the pair without running the unlockers.
1936// If it can pin the pair cheaply, it does so, and returns 0.
1937// If the pin will be expensive, it runs unlockers,
1938// pins the pair, then releases the pin,
1939// and then returns TOKUDB_TRY_AGAIN
1940//
1941// on entry, pair mutex is held,
1942// on exit, pair mutex is NOT held
1943static int
1944maybe_pin_pair(
1945 PAIR p,
1946 pair_lock_type lock_type,
1947 UNLOCKERS unlockers
1948 )
1949{
1950 int retval = 0;
1951 bool expensive = (lock_type == PL_WRITE_EXPENSIVE);
1952
    // We pin the PAIR here. In each case (read or write), we check whether
    // acquiring the pin is expensive. If so, we run the unlockers, set the
    // retval to TOKUDB_TRY_AGAIN, and pin AND release the PAIR.
    // If not, then we pin the PAIR, keep retval at 0, and do not
    // run the unlockers, as we intend to return the value to the user.
1958 if (lock_type == PL_READ) {
1959 if (p->value_rwlock.read_lock_is_expensive()) {
1960 pair_add_ref_unlocked(p);
1961 pair_unlock(p);
1962 run_unlockers(unlockers);
1963 retval = TOKUDB_TRY_AGAIN;
1964 pair_lock(p);
1965 pair_release_ref_unlocked(p);
1966 }
1967 p->value_rwlock.read_lock();
1968 }
1969 else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP){
1970 if (p->value_rwlock.write_lock_is_expensive()) {
1971 pair_add_ref_unlocked(p);
1972 pair_unlock(p);
1973 run_unlockers(unlockers);
            // change expensive to false because
            // we will unpin the pair immediately
            // after pinning it
1977 expensive = false;
1978 retval = TOKUDB_TRY_AGAIN;
1979 pair_lock(p);
1980 pair_release_ref_unlocked(p);
1981 }
1982 p->value_rwlock.write_lock(expensive);
1983 }
1984 else {
1985 abort();
1986 }
1987
1988 if (retval == TOKUDB_TRY_AGAIN) {
1989 unpin_pair(p, (lock_type == PL_READ));
1990 }
1991 pair_touch(p);
1992 pair_unlock(p);
1993 return retval;
1994}
1995
1996int toku_cachetable_get_and_pin_nonblocking(
1997 CACHEFILE cf,
1998 CACHEKEY key,
1999 uint32_t fullhash,
2000 void**value,
2001 long* UU(sizep),
2002 CACHETABLE_WRITE_CALLBACK write_callback,
2003 CACHETABLE_FETCH_CALLBACK fetch_callback,
2004 CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
2005 CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
2006 pair_lock_type lock_type,
2007 void *read_extraargs,
2008 UNLOCKERS unlockers
2009 )
2010// See cachetable/cachetable.h.
2011{
2012 CACHETABLE ct = cf->cachetable;
2013 assert(lock_type == PL_READ ||
2014 lock_type == PL_WRITE_CHEAP ||
2015 lock_type == PL_WRITE_EXPENSIVE
2016 );
2017try_again:
2018 ct->list.pair_lock_by_fullhash(fullhash);
2019 PAIR p = ct->list.find_pair(cf, key, fullhash);
2020 if (p == NULL) {
2021 toku::context fetch_ctx(CTX_FULL_FETCH);
2022
2023 // Not found
2024 ct->list.pair_unlock_by_fullhash(fullhash);
2025 ct->list.write_list_lock();
2026 ct->list.pair_lock_by_fullhash(fullhash);
2027 p = ct->list.find_pair(cf, key, fullhash);
2028 if (p != NULL) {
            // We just did another search with the write list lock and
            // found the pair. This means that in between our
            // releasing the pair's bucket lock and grabbing the write list lock,
            // another thread snuck in and inserted the PAIR into
            // the cachetable. For simplicity, we just return
            // to the top and restart the function.
2035 ct->list.write_list_unlock();
2036 ct->list.pair_unlock_by_fullhash(fullhash);
2037 goto try_again;
2038 }
2039
2040 p = cachetable_insert_at(
2041 ct,
2042 cf,
2043 key,
2044 zero_value,
2045 fullhash,
2046 zero_attr,
2047 write_callback,
2048 CACHETABLE_CLEAN
2049 );
2050 assert(p);
2051 // grab expensive write lock, because we are about to do a fetch
2052 // off disk
2053 // No one can access this pair because
2054 // we hold the write list lock and we just injected
2055 // the pair into the cachetable. Therefore, this lock acquisition
2056 // will not block.
2057 p->value_rwlock.write_lock(true);
2058 pair_unlock(p);
2059 run_unlockers(unlockers); // we hold the write list_lock.
2060 ct->list.write_list_unlock();
2061
2062 // at this point, only the pair is pinned,
2063 // and no pair mutex held, and
2064 // no list lock is held
2065 uint64_t t0 = get_tnow();
2066 cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, false);
2067 cachetable_miss++;
2068 cachetable_misstime += get_tnow() - t0;
2069
2070 if (ct->ev.should_client_thread_sleep()) {
2071 ct->ev.wait_for_cache_pressure_to_subside();
2072 }
2073 if (ct->ev.should_client_wake_eviction_thread()) {
2074 ct->ev.signal_eviction_thread();
2075 }
2076
2077 return TOKUDB_TRY_AGAIN;
2078 }
2079 else {
2080 int r = maybe_pin_pair(p, lock_type, unlockers);
2081 if (r == TOKUDB_TRY_AGAIN) {
2082 return TOKUDB_TRY_AGAIN;
2083 }
2084 assert_zero(r);
2085
2086 if (lock_type != PL_READ) {
2087 bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
2088 write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
2089 }
2090
2091 // At this point, we have pinned the PAIR
2092 // and resolved its checkpointing. The pair's
2093 // mutex is not held. The read list lock IS held. Before
2094 // returning the PAIR to the user, we must
2095 // still check for partial fetch
2096 bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
2097 if (partial_fetch_required) {
2098 toku::context fetch_ctx(CTX_PARTIAL_FETCH);
2099
2100 run_unlockers(unlockers);
2101
2102 // we are now getting an expensive write lock, because we
2103 // are doing a partial fetch. So, if we previously have
2104 // either a read lock or a cheap write lock, we need to
2105 // release and reacquire the correct lock type
2106 if (lock_type == PL_READ) {
2107 pair_lock(p);
2108 p->value_rwlock.read_unlock();
2109 p->value_rwlock.write_lock(true);
2110 pair_unlock(p);
2111 }
2112 else if (lock_type == PL_WRITE_CHEAP) {
2113 pair_lock(p);
2114 p->value_rwlock.write_unlock();
2115 p->value_rwlock.write_lock(true);
2116 pair_unlock(p);
2117 }
2118
            // Check again whether the partial fetch is still required, now that
            // we hold the expensive write lock; another thread may have already
            // done the work while we waited.
2120 partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
2121 if (partial_fetch_required) {
2122 do_partial_fetch(ct, cf, p, pf_callback, read_extraargs, false);
2123 }
2124 else {
2125 pair_lock(p);
2126 p->value_rwlock.write_unlock();
2127 pair_unlock(p);
2128 }
2129
2130 if (ct->ev.should_client_thread_sleep()) {
2131 ct->ev.wait_for_cache_pressure_to_subside();
2132 }
2133 if (ct->ev.should_client_wake_eviction_thread()) {
2134 ct->ev.signal_eviction_thread();
2135 }
2136
2137 return TOKUDB_TRY_AGAIN;
2138 }
2139 else {
2140 *value = p->value_data;
2141 return 0;
2142 }
2143 }
2144 // We should not get here. Above code should hit a return in all cases.
2145 abort();
2146}
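
// Typical caller pattern (illustrative sketch; the callbacks, extra arguments, and
// the unlockers list are hypothetical caller state; real callers live in the ft
// layer and supply their own):
//
//   void *node;
//   int r;
//   do {
//       r = toku_cachetable_get_and_pin_nonblocking(
//           cf, key, fullhash, &node, NULL,
//           wc, fetch_cb, pf_req_cb, pf_cb,
//           PL_READ, read_extraargs, unlockers);
//       // On TOKUDB_TRY_AGAIN the unlockers were run, so the caller must
//       // re-establish whatever state it released before retrying.
//   } while (r == TOKUDB_TRY_AGAIN);
//   assert_zero(r);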
2147
2148struct cachefile_prefetch_args {
2149 PAIR p;
2150 CACHETABLE_FETCH_CALLBACK fetch_callback;
2151 void* read_extraargs;
2152};
2153
2154struct cachefile_partial_prefetch_args {
2155 PAIR p;
2156 CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback;
2157 void *read_extraargs;
2158};
2159
2160// Worker thread function to read a pair from a cachefile to memory
2161static void cachetable_reader(void* extra) {
2162 struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args*)extra;
2163 CACHEFILE cf = cpargs->p->cachefile;
2164 CACHETABLE ct = cf->cachetable;
2165 cachetable_fetch_pair(
2166 ct,
2167 cpargs->p->cachefile,
2168 cpargs->p,
2169 cpargs->fetch_callback,
2170 cpargs->read_extraargs,
2171 false
2172 );
2173 bjm_remove_background_job(cf->bjm);
2174 toku_free(cpargs);
2175}
2176
2177static void cachetable_partial_reader(void* extra) {
2178 struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args*)extra;
2179 CACHEFILE cf = cpargs->p->cachefile;
2180 CACHETABLE ct = cf->cachetable;
2181 do_partial_fetch(ct, cpargs->p->cachefile, cpargs->p, cpargs->pf_callback, cpargs->read_extraargs, false);
2182 bjm_remove_background_job(cf->bjm);
2183 toku_free(cpargs);
2184}
2185
2186int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
2187 CACHETABLE_WRITE_CALLBACK write_callback,
2188 CACHETABLE_FETCH_CALLBACK fetch_callback,
2189 CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
2190 CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
2191 void *read_extraargs,
2192 bool *doing_prefetch)
2193// Effect: See the documentation for this function in cachetable/cachetable.h
2194{
2195 int r = 0;
2196 PAIR p = NULL;
2197 if (doing_prefetch) {
2198 *doing_prefetch = false;
2199 }
2200 CACHETABLE ct = cf->cachetable;
2201 // if cachetable has too much data, don't bother prefetching
2202 if (ct->ev.should_client_thread_sleep()) {
2203 goto exit;
2204 }
2205 ct->list.pair_lock_by_fullhash(fullhash);
2206 // lookup
2207 p = ct->list.find_pair(cf, key, fullhash);
2208 // if not found then create a pair and fetch it
2209 if (p == NULL) {
2210 cachetable_prefetches++;
2211 ct->list.pair_unlock_by_fullhash(fullhash);
2212 ct->list.write_list_lock();
2213 ct->list.pair_lock_by_fullhash(fullhash);
2214 p = ct->list.find_pair(cf, key, fullhash);
2215 if (p != NULL) {
2216 ct->list.write_list_unlock();
2217 goto found_pair;
2218 }
2219
2220 r = bjm_add_background_job(cf->bjm);
2221 assert_zero(r);
2222 p = cachetable_insert_at(
2223 ct,
2224 cf,
2225 key,
2226 zero_value,
2227 fullhash,
2228 zero_attr,
2229 write_callback,
2230 CACHETABLE_CLEAN
2231 );
2232 assert(p);
2233 p->value_rwlock.write_lock(true);
2234 pair_unlock(p);
2235 ct->list.write_list_unlock();
2236
2237 struct cachefile_prefetch_args *MALLOC(cpargs);
2238 cpargs->p = p;
2239 cpargs->fetch_callback = fetch_callback;
2240 cpargs->read_extraargs = read_extraargs;
2241 toku_kibbutz_enq(ct->ct_kibbutz, cachetable_reader, cpargs);
2242 if (doing_prefetch) {
2243 *doing_prefetch = true;
2244 }
2245 goto exit;
2246 }
2247
2248found_pair:
2249 // at this point, p is found, pair's mutex is grabbed, and
2250 // no list lock is held
2251 // TODO(leif): should this also just go ahead and wait if all there
2252 // are to wait for are readers?
2253 if (p->value_rwlock.try_write_lock(true)) {
2254 // nobody else is using the node, so we should go ahead and prefetch
2255 pair_touch(p);
2256 pair_unlock(p);
2257 bool partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);
2258
2259 if (partial_fetch_required) {
2260 r = bjm_add_background_job(cf->bjm);
2261 assert_zero(r);
2262 struct cachefile_partial_prefetch_args *MALLOC(cpargs);
2263 cpargs->p = p;
2264 cpargs->pf_callback = pf_callback;
2265 cpargs->read_extraargs = read_extraargs;
2266 toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_reader, cpargs);
2267 if (doing_prefetch) {
2268 *doing_prefetch = true;
2269 }
2270 }
2271 else {
2272 pair_lock(p);
2273 p->value_rwlock.write_unlock();
2274 pair_unlock(p);
2275 }
2276 }
2277 else {
2278 // Couldn't get the write lock cheaply
2279 pair_unlock(p);
2280 }
2281exit:
2282 return 0;
2283}
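
// Example (illustrative sketch; the callbacks and extra arguments are hypothetical
// caller state):
//
//   bool doing_prefetch = false;
//   int r = toku_cachefile_prefetch(cf, key, fullhash,
//                                   wc, fetch_cb, pf_req_cb, pf_cb,
//                                   read_extraargs, &doing_prefetch);
//   assert_zero(r);
//   // doing_prefetch reports whether a background fetch was enqueued; if false,
//   // no background work was needed (already cached) or possible (cache
//   // pressure, or the pair is currently in use).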
2284
2285void toku_cachefile_verify (CACHEFILE cf) {
2286 toku_cachetable_verify(cf->cachetable);
2287}
2288
2289void toku_cachetable_verify (CACHETABLE ct) {
2290 ct->list.verify();
2291}
2292
2293
2294
struct pair_flush_for_close {
2296 PAIR p;
2297 BACKGROUND_JOB_MANAGER bjm;
2298};
2299
2300static void cachetable_flush_pair_for_close(void* extra) {
2301 struct pair_flush_for_close *CAST_FROM_VOIDP(args, extra);
2302 PAIR p = args->p;
2303 CACHEFILE cf = p->cachefile;
2304 CACHETABLE ct = cf->cachetable;
2305 PAIR_ATTR attr;
2306 cachetable_only_write_locked_data(
2307 &ct->ev,
2308 p,
2309 false, // not for a checkpoint, as we assert above
2310 &attr,
2311 false // not a clone
2312 );
2313 p->dirty = CACHETABLE_CLEAN;
2314 bjm_remove_background_job(args->bjm);
2315 toku_free(args);
2316}
2317
2318
2319static void flush_pair_for_close_on_background_thread(
2320 PAIR p,
2321 BACKGROUND_JOB_MANAGER bjm,
2322 CACHETABLE ct
2323 )
2324{
2325 pair_lock(p);
2326 assert(p->value_rwlock.users() == 0);
2327 assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
2328 assert(!p->cloned_value_data);
2329 if (p->dirty == CACHETABLE_DIRTY) {
2330 int r = bjm_add_background_job(bjm);
2331 assert_zero(r);
2332 struct pair_flush_for_close *XMALLOC(args);
2333 args->p = p;
2334 args->bjm = bjm;
2335 toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args);
2336 }
2337 pair_unlock(p);
2338}
2339
2340static void remove_pair_for_close(PAIR p, CACHETABLE ct, bool completely) {
2341 pair_lock(p);
2342 assert(p->value_rwlock.users() == 0);
2343 assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
2344 assert(!p->cloned_value_data);
2345 assert(p->dirty == CACHETABLE_CLEAN);
2346 assert(p->refcount == 0);
2347 if (completely) {
2348 cachetable_remove_pair(&ct->list, &ct->ev, p);
2349 pair_unlock(p);
2350 // TODO: Eventually, we should not hold the write list lock during free
2351 cachetable_free_pair(p);
2352 }
2353 else {
        // if we are not evicting completely,
        // we only want to remove the PAIR from the cachetable,
        // that is, remove it from the hashtable and the various linked
        // lists, but we keep the PAIRs and the linked list
        // in the cachefile intact, as they will be cached away
        // in case an open comes soon.
2360 ct->list.evict_from_cachetable(p);
2361 pair_unlock(p);
2362 }
2363}
2364
// Helper function for cachetable_flush_cachefile, which happens on a close.
// Writes out the dirty pairs on background threads and returns when
// the writing is done.
2368static void write_dirty_pairs_for_close(CACHETABLE ct, CACHEFILE cf) {
2369 BACKGROUND_JOB_MANAGER bjm = NULL;
2370 bjm_init(&bjm);
2371 ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here
2372 PAIR p = NULL;
2373 // write out dirty PAIRs
2374 uint32_t i;
2375 if (cf) {
2376 for (i = 0, p = cf->cf_head;
2377 i < cf->num_pairs;
2378 i++, p = p->cf_next)
2379 {
2380 flush_pair_for_close_on_background_thread(p, bjm, ct);
2381 }
2382 }
2383 else {
2384 for (i = 0, p = ct->list.m_checkpoint_head;
2385 i < ct->list.m_n_in_table;
2386 i++, p = p->clock_next)
2387 {
2388 flush_pair_for_close_on_background_thread(p, bjm, ct);
2389 }
2390 }
2391 ct->list.write_list_unlock();
2392 bjm_wait_for_jobs_to_finish(bjm);
2393 bjm_destroy(bjm);
2394}
2395
2396static void remove_all_pairs_for_close(CACHETABLE ct, CACHEFILE cf, bool evict_completely) {
2397 ct->list.write_list_lock();
2398 if (cf) {
2399 if (evict_completely) {
2400 // if we are evicting completely, then the PAIRs will
2401 // be removed from the linked list managed by the
2402 // cachefile, so this while loop works
2403 while (cf->num_pairs > 0) {
2404 PAIR p = cf->cf_head;
2405 remove_pair_for_close(p, ct, evict_completely);
2406 }
2407 }
2408 else {
2409 // on the other hand, if we are not evicting completely,
2410 // then the cachefile's linked list stays intact, and we must
2411 // iterate like this.
2412 for (PAIR p = cf->cf_head; p; p = p->cf_next) {
2413 remove_pair_for_close(p, ct, evict_completely);
2414 }
2415 }
2416 }
2417 else {
2418 while (ct->list.m_n_in_table > 0) {
2419 PAIR p = ct->list.m_checkpoint_head;
2420 // if there is no cachefile, then we better
2421 // be evicting completely because we have no
2422 // cachefile to save the PAIRs to. At least,
2423 // we have no guarantees that the cachefile
2424 // will remain good
2425 invariant(evict_completely);
2426 remove_pair_for_close(p, ct, true);
2427 }
2428 }
2429 ct->list.write_list_unlock();
2430}
2431
2432static void verify_cachefile_flushed(CACHETABLE ct UU(), CACHEFILE cf UU()) {
2433#ifdef TOKU_DEBUG_PARANOID
    // assert that the cachefile has been flushed by checking the
    // pair_list and finding no pairs belonging to this cachefile
    if (cf) {
        ct->list.write_list_lock();
2442 uint32_t i;
2443 PAIR p = NULL;
2444 for (i = 0, p = ct->list.m_checkpoint_head;
2445 i < ct->list.m_n_in_table;
2446 i++, p = p->clock_next)
2447 {
2448 assert(p->cachefile != cf);
2449 }
2450 ct->list.write_list_unlock();
2451 }
2452#endif
2453}
2454
// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL).
// Must be holding the cachetable lock on entry.
2458//
2459// This function assumes that no client thread is accessing or
2460// trying to access the cachefile while this function is executing.
2461// This implies no client thread will be trying to lock any nodes
2462// belonging to the cachefile.
2463//
2464// This function also assumes that the cachefile is not in the process
2465// of being used by a checkpoint. If a checkpoint is currently happening,
2466// it does NOT include this cachefile.
2467//
2468static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf, bool evict_completely) {
2469 //
2470 // Because work on a kibbutz is always done by the client thread,
2471 // and this function assumes that no client thread is doing any work
2472 // on the cachefile, we assume that no client thread will be adding jobs
2473 // to this cachefile's kibbutz.
2474 //
2475 // The caller of this function must ensure that there are
2476 // no jobs added to the kibbutz. This implies that the only work other
2477 // threads may be doing is work by the writer threads.
2478 //
2479 // first write out dirty PAIRs
2480 write_dirty_pairs_for_close(ct, cf);
2481
2482 // now that everything is clean, get rid of everything
2483 remove_all_pairs_for_close(ct, cf, evict_completely);
2484
2485 verify_cachefile_flushed(ct, cf);
2486}
2487
2488/* Requires that no locks be held that are used by the checkpoint logic */
2489void
2490toku_cachetable_minicron_shutdown(CACHETABLE ct) {
2491 int r = ct->cp.shutdown();
2492 assert(r==0);
2493 ct->cl.destroy();
2494}
2495
2496void toku_cachetable_prepare_close(CACHETABLE ct UU()) {
2497 extern bool toku_serialize_in_parallel;
2498 toku_unsafe_set(&toku_serialize_in_parallel, true);
2499}
2500
2501/* Requires that it all be flushed. */
2502void toku_cachetable_close (CACHETABLE *ctp) {
2503 CACHETABLE ct = *ctp;
2504 ct->cp.destroy();
2505 ct->cl.destroy();
2506 ct->cf_list.free_stale_data(&ct->ev);
2507 cachetable_flush_cachefile(ct, NULL, true);
2508 ct->ev.destroy();
2509 ct->list.destroy();
2510 ct->cf_list.destroy();
2511
2512 if (ct->client_kibbutz)
2513 toku_kibbutz_destroy(ct->client_kibbutz);
2514 if (ct->ct_kibbutz)
2515 toku_kibbutz_destroy(ct->ct_kibbutz);
2516 if (ct->checkpointing_kibbutz)
2517 toku_kibbutz_destroy(ct->checkpointing_kibbutz);
2518 toku_free(ct->env_dir);
2519 toku_free(ct);
2520 *ctp = 0;
2521}
2522
2523static PAIR test_get_pair(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, bool have_ct_lock) {
2524 CACHETABLE ct = cachefile->cachetable;
2525
2526 if (!have_ct_lock) {
2527 ct->list.read_list_lock();
2528 }
2529
2530 PAIR p = ct->list.find_pair(cachefile, key, fullhash);
2531 assert(p != NULL);
2532 if (!have_ct_lock) {
2533 ct->list.read_list_unlock();
2534 }
2535 return p;
2536}
2537
2538//test-only wrapper
2539int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
2540 // By default we don't have the lock
2541 PAIR p = test_get_pair(cachefile, key, fullhash, false);
2542 return toku_cachetable_unpin(cachefile, p, dirty, attr); // assume read lock is not grabbed, and that it is a write lock
2543}
2544
2545//test-only wrapper
2546int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
2547 // We hold the cachetable mutex.
2548 PAIR p = test_get_pair(cachefile, key, fullhash, true);
2549 return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr);
2550}
2551
2552//test-only wrapper
2553int toku_test_cachetable_unpin_and_remove (
2554 CACHEFILE cachefile,
2555 CACHEKEY key,
2556 CACHETABLE_REMOVE_KEY remove_key,
2557 void* remove_key_extra)
2558{
2559 uint32_t fullhash = toku_cachetable_hash(cachefile, key);
2560 PAIR p = test_get_pair(cachefile, key, fullhash, false);
2561 return toku_cachetable_unpin_and_remove(cachefile, p, remove_key, remove_key_extra);
2562}
2563
2564int toku_cachetable_unpin_and_remove (
2565 CACHEFILE cachefile,
2566 PAIR p,
2567 CACHETABLE_REMOVE_KEY remove_key,
2568 void* remove_key_extra
2569 )
2570{
2571 invariant_notnull(p);
2572 int r = ENOENT;
2573 CACHETABLE ct = cachefile->cachetable;
2574
2575 p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
2576 // grab disk_nb_mutex to ensure any background thread writing
2577 // out a cloned value completes
2578 pair_lock(p);
2579 assert(p->value_rwlock.writers());
2580 nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
2581 pair_unlock(p);
2582 assert(p->cloned_value_data == NULL);
2583
2584 //
2585 // take care of key removal
2586 //
2587 ct->list.write_list_lock();
2588 ct->list.read_pending_cheap_lock();
2589 bool for_checkpoint = p->checkpoint_pending;
2590 // now let's wipe out the pending bit, because we are
2591 // removing the PAIR
2592 p->checkpoint_pending = false;
2593
    // To keep the PAIR from being picked by the
    // cleaner thread, we set its cache_pressure_size to 0.
    // (This is redundant since we hold the write_list_lock.)
    // This should not be an issue because we call
    // cachetable_remove_pair before
    // releasing the cachetable lock.
    //
2601 CACHEKEY key_to_remove = p->key;
2602 p->attr.cache_pressure_size = 0;
2603 //
2604 // callback for removing the key
2605 // for FTNODEs, this leads to calling
2606 // toku_free_blocknum
2607 //
2608 if (remove_key) {
2609 remove_key(
2610 &key_to_remove,
2611 for_checkpoint,
2612 remove_key_extra
2613 );
2614 }
2615 ct->list.read_pending_cheap_unlock();
2616
2617 pair_lock(p);
2618 p->value_rwlock.write_unlock();
2619 nb_mutex_unlock(&p->disk_nb_mutex);
2620 //
2621 // As of Clayface (6.5), only these threads may be
2622 // blocked waiting to lock this PAIR:
2623 // - the checkpoint thread (because a checkpoint is in progress
2624 // and the PAIR was in the list of pending pairs)
2625 // - a client thread running get_and_pin_nonblocking, who
2626 // ran unlockers, then waited on the PAIR lock.
2627 // While waiting on a PAIR lock, another thread comes in,
2628 // locks the PAIR, and ends up calling unpin_and_remove,
2629 // all while get_and_pin_nonblocking is waiting on the PAIR lock.
2630 // We did not realize this at first, which caused bug #4357
2631 // The following threads CANNOT be blocked waiting on
2632 // the PAIR lock:
2633 // - a thread trying to run eviction via run_eviction.
2634 // That cannot happen because run_eviction only
2635 // attempts to lock PAIRS that are not locked, and this PAIR
2636 // is locked.
2637 // - cleaner thread, for the same reason as a thread running
2638 // eviction
2639 // - client thread doing a normal get_and_pin. The client is smart
2640 // enough to not try to lock a PAIR that another client thread
2641 // is trying to unpin and remove. Note that this includes work
2642 // done on kibbutzes.
2643 // - writer thread. Writer threads do not grab PAIR locks. They
2644 // get PAIR locks transferred to them by client threads.
2645 //
2646
2647 // first thing we do is remove the PAIR from the various
2648 // cachetable data structures, so no other thread can possibly
2649 // access it. We do not want to risk some other thread
2650 // trying to lock this PAIR if we release the write list lock
2651 // below. If some thread is already waiting on the lock,
2652 // then we let that thread grab the lock and finish, but
2653 // we don't want any NEW threads to try to grab the PAIR
2654 // lock.
2655 //
2656 // Because we call cachetable_remove_pair and wait,
2657 // the threads that may be waiting
2658 // on this PAIR lock must be careful to do NOTHING with the PAIR
2659 // As per our analysis above, we only need
2660 // to make sure the checkpoint thread and get_and_pin_nonblocking do
2661 // nothing, and looking at those functions, it is clear they do nothing.
2662 //
2663 cachetable_remove_pair(&ct->list, &ct->ev, p);
2664 ct->list.write_list_unlock();
2665 if (p->refcount > 0) {
2666 pair_wait_for_ref_release_unlocked(p);
2667 }
2668 if (p->value_rwlock.users() > 0) {
2669 // Need to wait for everyone else to leave
2670 // This write lock will be granted only after all waiting
2671 // threads are done.
2672 p->value_rwlock.write_lock(true);
2673 assert(p->refcount == 0);
2674 assert(p->value_rwlock.users() == 1); // us
2675 assert(!p->checkpoint_pending);
2676 assert(p->attr.cache_pressure_size == 0);
2677 p->value_rwlock.write_unlock();
2678 }
2679 // just a sanity check
2680 assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
2681 assert(p->cloned_value_data == NULL);
2682 //Remove pair.
2683 pair_unlock(p);
2684 cachetable_free_pair(p);
2685 r = 0;
2686 return r;
2687}
2688
2689int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array);
2690int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array) {
2691 array[index] = toku_cachefile_filenum(ft->cf);
2692 return 0;
2693}
2694
2695static int log_open_txn (TOKUTXN txn, void* extra) {
2696 int r;
2697 checkpointer* cp = (checkpointer *)extra;
2698 TOKULOGGER logger = txn->logger;
2699 FILENUMS open_filenums;
2700 uint32_t num_filenums = txn->open_fts.size();
2701 FILENUM array[num_filenums];
2702 if (toku_txn_is_read_only(txn)) {
2703 goto cleanup;
2704 }
2705 else {
2706 cp->increment_num_txns();
2707 }
2708
2709 open_filenums.num = num_filenums;
2710 open_filenums.filenums = array;
2711 //Fill in open_filenums
2712 r = txn->open_fts.iterate<FILENUM, set_filenum_in_array>(array);
2713 invariant(r==0);
2714 switch (toku_txn_get_state(txn)) {
2715 case TOKUTXN_LIVE:{
2716 toku_log_xstillopen(logger, NULL, 0, txn,
2717 toku_txn_get_txnid(txn),
2718 toku_txn_get_txnid(toku_logger_txn_parent(txn)),
2719 txn->roll_info.rollentry_raw_count,
2720 open_filenums,
2721 txn->force_fsync_on_commit,
2722 txn->roll_info.num_rollback_nodes,
2723 txn->roll_info.num_rollentries,
2724 txn->roll_info.spilled_rollback_head,
2725 txn->roll_info.spilled_rollback_tail,
2726 txn->roll_info.current_rollback);
2727 goto cleanup;
2728 }
2729 case TOKUTXN_PREPARING: {
2730 TOKU_XA_XID xa_xid;
2731 toku_txn_get_prepared_xa_xid(txn, &xa_xid);
2732 toku_log_xstillopenprepared(logger, NULL, 0, txn,
2733 toku_txn_get_txnid(txn),
2734 &xa_xid,
2735 txn->roll_info.rollentry_raw_count,
2736 open_filenums,
2737 txn->force_fsync_on_commit,
2738 txn->roll_info.num_rollback_nodes,
2739 txn->roll_info.num_rollentries,
2740 txn->roll_info.spilled_rollback_head,
2741 txn->roll_info.spilled_rollback_tail,
2742 txn->roll_info.current_rollback);
2743 goto cleanup;
2744 }
2745 case TOKUTXN_RETIRED:
2746 case TOKUTXN_COMMITTING:
2747 case TOKUTXN_ABORTING: {
2748 assert(0);
2749 }
2750 }
2751 // default is an error
2752 assert(0);
2753cleanup:
2754 return 0;
2755}
2756
2757// Requires: All three checkpoint-relevant locks must be held (see checkpoint.c).
2758// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
2759// Use the begin_checkpoint callback to take necessary snapshots (header, btt)
2760// Mark every dirty node as "pending." ("Pending" means that the node must be
2761// written to disk before it can be modified.)
2762void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, TOKULOGGER UU(logger)) {
2763 cp->begin_checkpoint();
2764}
2765
2766
2767// This is used by the cachetable_race test.
2768static volatile int toku_checkpointing_user_data_status = 0;
2769static void toku_cachetable_set_checkpointing_user_data_status (int v) {
2770 toku_checkpointing_user_data_status = v;
2771}
2772int toku_cachetable_get_checkpointing_user_data_status (void) {
2773 return toku_checkpointing_user_data_status;
2774}
2775
2776// Requires: The big checkpoint lock must be held (see checkpoint.c).
2777// Algorithm: Write all pending nodes to disk
2778// Use checkpoint callback to write snapshot information to disk (header, btt)
2779// Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks
// Note: If testcallback is non-null (used for testing purposes only), call it after writing the dictionary but before writing the log
2781void toku_cachetable_end_checkpoint(CHECKPOINTER cp, TOKULOGGER UU(logger),
2782 void (*testcallback_f)(void*), void* testextra) {
2783 cp->end_checkpoint(testcallback_f, testextra);
2784}
2785
2786TOKULOGGER toku_cachefile_logger (CACHEFILE cf) {
2787 return cf->cachetable->cp.get_logger();
2788}
2789
2790FILENUM toku_cachefile_filenum (CACHEFILE cf) {
2791 return cf->filenum;
2792}
2793
2794// debug functions
2795
2796int toku_cachetable_assert_all_unpinned (CACHETABLE ct) {
2797 uint32_t i;
2798 int some_pinned=0;
2799 ct->list.read_list_lock();
2800 for (i=0; i<ct->list.m_table_size; i++) {
2801 PAIR p;
2802 for (p=ct->list.m_table[i]; p; p=p->hash_chain) {
2803 pair_lock(p);
2804 if (p->value_rwlock.users()) {
2805 //printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data);
2806 some_pinned=1;
2807 }
2808 pair_unlock(p);
2809 }
2810 }
2811 ct->list.read_list_unlock();
2812 return some_pinned;
2813}
2814
2815int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
2816 assert(cf != NULL);
2817 int n_pinned=0;
2818 CACHETABLE ct = cf->cachetable;
2819 ct->list.read_list_lock();
2820
2821 // Iterate over all the pairs to find pairs specific to the
2822 // given cachefile.
2823 for (uint32_t i = 0; i < ct->list.m_table_size; i++) {
2824 for (PAIR p = ct->list.m_table[i]; p; p = p->hash_chain) {
2825 if (p->cachefile == cf) {
2826 pair_lock(p);
2827 if (p->value_rwlock.users()) {
2828 if (print_them) {
2829 printf("%s:%d pinned: %" PRId64 " (%p)\n",
2830 __FILE__,
2831 __LINE__,
2832 p->key.b,
2833 p->value_data);
2834 }
2835 n_pinned++;
2836 }
2837 pair_unlock(p);
2838 }
2839 }
2840 }
2841
2842 ct->list.read_list_unlock();
2843 return n_pinned;
2844}
2845
2846void toku_cachetable_print_state (CACHETABLE ct) {
2847 uint32_t i;
2848 ct->list.read_list_lock();
2849 for (i=0; i<ct->list.m_table_size; i++) {
2850 PAIR p = ct->list.m_table[i];
2851 if (p != 0) {
2852 pair_lock(p);
2853 printf("t[%u]=", i);
2854 for (p=ct->list.m_table[i]; p; p=p->hash_chain) {
2855 printf(" {%" PRId64 ", %p, dirty=%d, pin=%d, size=%ld}", p->key.b, p->cachefile, (int) p->dirty, p->value_rwlock.users(), p->attr.size);
2856 }
2857 printf("\n");
2858 pair_unlock(p);
2859 }
2860 }
2861 ct->list.read_list_unlock();
2862}
2863
2864void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) {
2865 ct->list.get_state(num_entries_ptr, hash_size_ptr);
2866 ct->ev.get_state(size_current_ptr, size_limit_ptr);
2867}
2868
2869int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
2870 int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
2871 int r = -1;
2872 uint32_t fullhash = toku_cachetable_hash(cf, key);
2873 ct->list.read_list_lock();
2874 PAIR p = ct->list.find_pair(cf, key, fullhash);
2875 if (p) {
2876 pair_lock(p);
2877 if (value_ptr)
2878 *value_ptr = p->value_data;
2879 if (dirty_ptr)
2880 *dirty_ptr = p->dirty;
2881 if (pin_ptr)
2882 *pin_ptr = p->value_rwlock.users();
2883 if (size_ptr)
2884 *size_ptr = p->attr.size;
2885 r = 0;
2886 pair_unlock(p);
2887 }
2888 ct->list.read_list_unlock();
2889 return r;
2890}
2891
2892void
2893toku_cachefile_set_userdata (CACHEFILE cf,
2894 void *userdata,
2895 void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
2896 void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
2897 void (*free_userdata)(CACHEFILE, void*),
2898 void (*checkpoint_userdata)(CACHEFILE, int, void*),
2899 void (*begin_checkpoint_userdata)(LSN, void*),
2900 void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
2901 void (*note_pin_by_checkpoint)(CACHEFILE, void*),
2902 void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
2903 cf->userdata = userdata;
2904 cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
2905 cf->close_userdata = close_userdata;
2906 cf->free_userdata = free_userdata;
2907 cf->checkpoint_userdata = checkpoint_userdata;
2908 cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
2909 cf->end_checkpoint_userdata = end_checkpoint_userdata;
2910 cf->note_pin_by_checkpoint = note_pin_by_checkpoint;
2911 cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint;
2912}
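
// Example registration (illustrative sketch; the my_* callbacks and the ft
// userdata pointer are hypothetical; in practice the ft layer registers its own
// callbacks when a dictionary is opened):
//
//   toku_cachefile_set_userdata(cf, ft,
//                               my_log_fassociate,
//                               my_close, my_free, my_checkpoint,
//                               my_begin_checkpoint, my_end_checkpoint,
//                               my_note_pin, my_note_unpin);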
2913
2914void *toku_cachefile_get_userdata(CACHEFILE cf) {
2915 return cf->userdata;
2916}
2917
2918CACHETABLE
2919toku_cachefile_get_cachetable(CACHEFILE cf) {
2920 return cf->cachetable;
2921}
2922
2923CACHEFILE toku_pair_get_cachefile(PAIR pair) {
2924 return pair->cachefile;
2925}
2926
// Only called by ft_end_checkpoint.
// Must have access to cf->fd (must be protected).
2929void toku_cachefile_fsync(CACHEFILE cf) {
2930 toku_file_fsync(cf->fd);
2931}
2932
2933// Make it so when the cachefile closes, the underlying file is unlinked
2934void toku_cachefile_unlink_on_close(CACHEFILE cf) {
2935 assert(!cf->unlink_on_close);
2936 cf->unlink_on_close = true;
2937}
2938
2939// is this cachefile marked as unlink on close?
2940bool toku_cachefile_is_unlink_on_close(CACHEFILE cf) {
2941 return cf->unlink_on_close;
2942}
2943
2944void toku_cachefile_skip_log_recover_on_close(CACHEFILE cf) {
2945 cf->skip_log_recover_on_close = true;
2946}
2947
2948void toku_cachefile_do_log_recover_on_close(CACHEFILE cf) {
2949 cf->skip_log_recover_on_close = false;
2950}
2951
2952bool toku_cachefile_is_skip_log_recover_on_close(CACHEFILE cf) {
2953 return cf->skip_log_recover_on_close;
2954}
2955
2956uint64_t toku_cachefile_size(CACHEFILE cf) {
2957 int64_t file_size;
2958 int fd = toku_cachefile_get_fd(cf);
2959 int r = toku_os_get_file_size(fd, &file_size);
2960 assert_zero(r);
2961 return file_size;
2962}
2963
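// Build a path by joining count string components with '/'.
// NULL components are skipped, and an absolute component discards whatever has
// been accumulated so far and starts over from that component.
// The returned string is freshly allocated; the caller must toku_free() it.
// Example: toku_construct_full_name(2, "/env", "data.db") yields "/env/data.db".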
2964char *
2965toku_construct_full_name(int count, ...) {
2966 va_list ap;
2967 char *name = NULL;
2968 size_t n = 0;
2969 int i;
2970 va_start(ap, count);
2971 for (i=0; i<count; i++) {
2972 char *arg = va_arg(ap, char *);
2973 if (arg) {
2974 n += 1 + strlen(arg) + 1;
2975 char *XMALLOC_N(n, newname);
2976 if (name && !toku_os_is_absolute_name(arg))
2977 snprintf(newname, n, "%s/%s", name, arg);
2978 else
2979 snprintf(newname, n, "%s", arg);
2980 toku_free(name);
2981 name = newname;
2982 }
2983 }
2984 va_end(ap);
2985
2986 return name;
2987}
2988
2989char *
2990toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env) {
2991 return toku_construct_full_name(2, ct->env_dir, fname_in_env);
2992}
2993
2994static long
2995cleaner_thread_rate_pair(PAIR p)
2996{
2997 return p->attr.cache_pressure_size;
2998}
2999
3000static int const CLEANER_N_TO_CHECK = 8;
3001
3002int toku_cleaner_thread_for_test (CACHETABLE ct) {
3003 return ct->cl.run_cleaner();
3004}
3005
3006int toku_cleaner_thread (void *cleaner_v) {
3007 cleaner* cl = (cleaner *) cleaner_v;
3008 assert(cl);
3009 return cl->run_cleaner();
3010}
3011
3012/////////////////////////////////////////////////////////////////////////
3013//
3014// cleaner methods
3015//
3016ENSURE_POD(cleaner);
3017
3018int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) {
3019 // default is no cleaner, for now
3020 m_cleaner_cron_init = false;
3021 int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this);
3022 if (r == 0) {
3023 m_cleaner_cron_init = true;
3024 }
3025 TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations);
3026 m_cleaner_iterations = _cleaner_iterations;
3027 m_pl = _pl;
3028 m_ct = _ct;
3029 m_cleaner_init = true;
3030 return r;
3031}
3032
3033// this function is allowed to be called multiple times
3034void cleaner::destroy(void) {
3035 if (!m_cleaner_init) {
3036 return;
3037 }
3038 if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) {
3039 // for test code only, production code uses toku_cachetable_minicron_shutdown()
3040 int r = toku_minicron_shutdown(&m_cleaner_cron);
3041 assert(r==0);
3042 }
3043}
3044
3045uint32_t cleaner::get_iterations(void) {
3046 return m_cleaner_iterations;
3047}
3048
3049void cleaner::set_iterations(uint32_t new_iterations) {
3050 m_cleaner_iterations = new_iterations;
3051}
3052
3053uint32_t cleaner::get_period_unlocked(void) {
3054 return toku_minicron_get_period_in_seconds_unlocked(&m_cleaner_cron);
3055}
3056
3057//
3058// Sets how often the cleaner thread will run, in seconds
3059//
3060void cleaner::set_period(uint32_t new_period) {
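    // toku_minicron periods are expressed in milliseconds, so convert the given
    // number of seconds here.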
3061 toku_minicron_change_period(&m_cleaner_cron, new_period*1000);
3062}
3063
3064// Effect: runs a cleaner.
3065//
3066// We look through some number of nodes, the first N that we see which are
3067// unlocked and are not involved in a cachefile flush, pick one, and call
3068// the cleaner callback. While we're picking a node, we have the
3069// cachetable lock the whole time, so we don't need any extra
3070// synchronization. Once we have one we want, we lock it and notify the
3071// cachefile that we're doing some background work (so a flush won't
3072// start). At this point, we can safely unlock the cachetable, do the
3073// work (callback), and unlock/release our claim to the cachefile.
3074int cleaner::run_cleaner(void) {
3075 toku::context cleaner_ctx(CTX_CLEANER);
3076
3077 int r;
3078 uint32_t num_iterations = this->get_iterations();
3079 for (uint32_t i = 0; i < num_iterations; ++i) {
3080 cleaner_executions++;
3081 m_pl->read_list_lock();
3082 PAIR best_pair = NULL;
3083 int n_seen = 0;
3084 long best_score = 0;
3085 const PAIR first_pair = m_pl->m_cleaner_head;
3086 if (first_pair == NULL) {
3087 // nothing in the cachetable, just get out now
3088 m_pl->read_list_unlock();
3089 break;
3090 }
3091 // here we select a PAIR for cleaning
3092 // look at some number of PAIRS, and
3093 // pick what we think is the best one for cleaning
3094 //***** IMPORTANT ******
3095 // we MUST not pick a PAIR whose rating is 0. We have
3096 // numerous assumptions in other parts of the code that
3097 // this is the case:
3098 // - this is how rollback nodes and leaf nodes are not selected for cleaning
3099 // - this is how a thread that is calling unpin_and_remove will prevent
3100 // the cleaner thread from picking its PAIR (see comments in that function)
3101 do {
3102 //
            // We are already holding onto best_pair. If we run across a pair that
            // has the same mutex due to a collision in the hashtable, we need
            // to be careful.
3106 //
3107 if (best_pair && m_pl->m_cleaner_head->mutex == best_pair->mutex) {
3108 // Advance the cleaner head.
3109 long score = 0;
3110 // only bother with this pair if it has no current users
3111 if (m_pl->m_cleaner_head->value_rwlock.users() == 0) {
3112 score = cleaner_thread_rate_pair(m_pl->m_cleaner_head);
3113 if (score > best_score) {
3114 best_score = score;
3115 best_pair = m_pl->m_cleaner_head;
3116 }
3117 }
3118 m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next;
3119 continue;
3120 }
3121 pair_lock(m_pl->m_cleaner_head);
3122 if (m_pl->m_cleaner_head->value_rwlock.users() > 0) {
3123 pair_unlock(m_pl->m_cleaner_head);
3124 }
3125 else {
3126 n_seen++;
3127 long score = 0;
3128 score = cleaner_thread_rate_pair(m_pl->m_cleaner_head);
3129 if (score > best_score) {
3130 best_score = score;
                    // Since we found a new best pair, we need to
                    // release (unlock) the old best pair.
3133 if (best_pair) {
3134 pair_unlock(best_pair);
3135 }
3136 best_pair = m_pl->m_cleaner_head;
3137 }
3138 else {
3139 pair_unlock(m_pl->m_cleaner_head);
3140 }
3141 }
3142 // Advance the cleaner head.
3143 m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next;
3144 } while (m_pl->m_cleaner_head != first_pair && n_seen < CLEANER_N_TO_CHECK);
3145 m_pl->read_list_unlock();
3146
3147 //
3148 // at this point, if we have found a PAIR for cleaning,
3149 // that is, best_pair != NULL, we do the clean
3150 //
3151 // if best_pair !=NULL, then best_pair->mutex is held
3152 // no list lock is held
3153 //
3154 if (best_pair) {
3155 CACHEFILE cf = best_pair->cachefile;
3156 // try to add a background job to the manager
3157 // if we can't, that means the cachefile is flushing, so
3158 // we simply continue the for loop and this iteration
3159 // becomes a no-op
3160 r = bjm_add_background_job(cf->bjm);
3161 if (r) {
3162 pair_unlock(best_pair);
3163 continue;
3164 }
3165 best_pair->value_rwlock.write_lock(true);
3166 pair_unlock(best_pair);
3167 // verify a key assumption.
3168 assert(cleaner_thread_rate_pair(best_pair) > 0);
3169 // check the checkpoint_pending bit
3170 m_pl->read_pending_cheap_lock();
3171 bool checkpoint_pending = best_pair->checkpoint_pending;
3172 best_pair->checkpoint_pending = false;
3173 m_pl->read_pending_cheap_unlock();
3174 if (checkpoint_pending) {
3175 write_locked_pair_for_checkpoint(m_ct, best_pair, true);
3176 }
3177
3178 bool cleaner_callback_called = false;
3179
3180 // it's theoretically possible that after writing a PAIR for checkpoint, the
3181 // PAIR's heuristic tells us nothing needs to be done. It is not possible
3182 // in Dr. Noga, but unit tests verify this behavior works properly.
3183 if (cleaner_thread_rate_pair(best_pair) > 0) {
3184 r = best_pair->cleaner_callback(best_pair->value_data,
3185 best_pair->key,
3186 best_pair->fullhash,
3187 best_pair->write_extraargs);
3188 assert_zero(r);
3189 cleaner_callback_called = true;
3190 }
3191
3192 // The cleaner callback must have unlocked the pair, so we
3193 // don't need to unlock it if the cleaner callback is called.
3194 if (!cleaner_callback_called) {
3195 pair_lock(best_pair);
3196 best_pair->value_rwlock.write_unlock();
3197 pair_unlock(best_pair);
3198 }
3199 // We need to make sure the cachefile sticks around so a close
3200 // can't come destroy it. That's the purpose of this
3201 // "add/remove_background_job" business, which means the
3202 // cachefile is still valid here, even though the cleaner
3203 // callback unlocks the pair.
3204 bjm_remove_background_job(cf->bjm);
3205 }
3206 else {
3207 // If we didn't find anything this time around the cachetable,
3208 // we probably won't find anything if we run around again, so
3209 // just break out from the for-loop now and
3210 // we'll try again when the cleaner thread runs again.
3211 break;
3212 }
3213 }
3214 return 0;
3215}
3216
3217static_assert(std::is_pod<pair_list>::value, "pair_list isn't POD");
3218
3219const uint32_t INITIAL_PAIR_LIST_SIZE = 1<<20;
3220uint32_t PAIR_LOCK_SIZE = 1<<20;
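
// Note: the pair table size must remain a power of two, because bucket selection
// masks the full hash with (m_table_size - 1) rather than taking a modulus.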
3221
3222void toku_pair_list_set_lock_size(uint32_t num_locks) {
3223 PAIR_LOCK_SIZE = num_locks;
3224}
3225
3226static void evict_pair_from_cachefile(PAIR p) {
3227 CACHEFILE cf = p->cachefile;
3228 if (p->cf_next) {
3229 p->cf_next->cf_prev = p->cf_prev;
3230 }
3231 if (p->cf_prev) {
3232 p->cf_prev->cf_next = p->cf_next;
3233 }
3234 else if (p->cachefile->cf_head == p) {
3235 cf->cf_head = p->cf_next;
3236 }
3237 p->cf_prev = p->cf_next = NULL;
3238 cf->num_pairs--;
3239}
3240
3241// Allocates the hash table of pairs inside this pair list.
3242//
3243void pair_list::init() {
3244 m_table_size = INITIAL_PAIR_LIST_SIZE;
3245 m_num_locks = PAIR_LOCK_SIZE;
3246 m_n_in_table = 0;
3247 m_clock_head = NULL;
3248 m_cleaner_head = NULL;
3249 m_checkpoint_head = NULL;
3250 m_pending_head = NULL;
3251 m_table = NULL;
3252
3253
3254 pthread_rwlockattr_t attr;
3255 pthread_rwlockattr_init(&attr);
3256#if defined(HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP)
3257 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3258#else
3259// TODO: need to figure out how to make writer-preferential rwlocks
3260// happen on osx
3261#endif
3262 toku_pthread_rwlock_init(*cachetable_m_list_lock_key, &m_list_lock, &attr);
3263 toku_pthread_rwlock_init(*cachetable_m_pending_lock_expensive_key,
3264 &m_pending_lock_expensive,
3265 &attr);
3266 toku_pthread_rwlock_init(
3267 *cachetable_m_pending_lock_cheap_key, &m_pending_lock_cheap, &attr);
3268 XCALLOC_N(m_table_size, m_table);
3269 XCALLOC_N(m_num_locks, m_mutexes);
3270 for (uint64_t i = 0; i < m_num_locks; i++) {
3271 toku_mutex_init(
3272#ifdef TOKU_PFS_MUTEX_EXTENDED_CACHETABLEMMUTEX
3273 *cachetable_m_mutex_key,
3274#else
3275 toku_uninstrumented,
3276#endif
3277 &m_mutexes[i].aligned_mutex,
3278 nullptr);
3279 }
3280}
3281
// Frees the pair_list hash table. It is expected to be empty by
// the time this is called. Asserts that there are no
// pairs left in any of the hash table slots.
3285void pair_list::destroy() {
3286 // Check if any entries exist in the hash table.
3287 for (uint32_t i = 0; i < m_table_size; ++i) {
3288 invariant_null(m_table[i]);
3289 }
3290 for (uint64_t i = 0; i < m_num_locks; i++) {
3291 toku_mutex_destroy(&m_mutexes[i].aligned_mutex);
3292 }
3293 toku_pthread_rwlock_destroy(&m_list_lock);
3294 toku_pthread_rwlock_destroy(&m_pending_lock_expensive);
3295 toku_pthread_rwlock_destroy(&m_pending_lock_cheap);
3296 toku_free(m_table);
3297 toku_free(m_mutexes);
3298}
3299
3300// adds a PAIR to the cachetable's structures,
3301// but does NOT add it to the list maintained by
3302// the cachefile
3303void pair_list::add_to_cachetable_only(PAIR p) {
3304 // sanity check to make sure that the PAIR does not already exist
3305 PAIR pp = this->find_pair(p->cachefile, p->key, p->fullhash);
3306 assert(pp == NULL);
3307
3308 this->add_to_clock(p);
3309 this->add_to_hash_chain(p);
3310 m_n_in_table++;
3311}
3312
3313// This places the given pair inside of the pair list.
3314//
3315// requires caller to have grabbed write lock on list.
3316// requires caller to have p->mutex held as well
3317//
3318void pair_list::put(PAIR p) {
3319 this->add_to_cachetable_only(p);
3320 this->add_to_cf_list(p);
3321}
3322
// This removes the given pair completely from the pair list.
3324//
3325// requires caller to have grabbed write lock on list, and p->mutex held
3326//
3327void pair_list::evict_completely(PAIR p) {
3328 this->evict_from_cachetable(p);
3329 this->evict_from_cachefile(p);
3330}
3331
3332// Removes the PAIR from the cachetable's lists,
3333// but does NOT impact the list maintained by the cachefile
3334void pair_list::evict_from_cachetable(PAIR p) {
3335 this->pair_remove(p);
3336 this->pending_pairs_remove(p);
3337 this->remove_from_hash_chain(p);
3338
3339 assert(m_n_in_table > 0);
3340 m_n_in_table--;
3341}
3342
3343// Removes the PAIR from the cachefile's list of PAIRs
3344void pair_list::evict_from_cachefile(PAIR p) {
3345 evict_pair_from_cachefile(p);
3346}
3347
3348//
3349// Remove pair from linked list for cleaner/clock
3350//
3351//
3352// requires caller to have grabbed write lock on list.
3353//
3354void pair_list::pair_remove (PAIR p) {
3355 if (p->clock_prev == p) {
3356 invariant(m_clock_head == p);
3357 invariant(p->clock_next == p);
3358 invariant(m_cleaner_head == p);
3359 invariant(m_checkpoint_head == p);
3360 m_clock_head = NULL;
3361 m_cleaner_head = NULL;
3362 m_checkpoint_head = NULL;
3363 }
3364 else {
3365 if (p == m_clock_head) {
3366 m_clock_head = m_clock_head->clock_next;
3367 }
3368 if (p == m_cleaner_head) {
3369 m_cleaner_head = m_cleaner_head->clock_next;
3370 }
3371 if (p == m_checkpoint_head) {
3372 m_checkpoint_head = m_checkpoint_head->clock_next;
3373 }
3374 p->clock_prev->clock_next = p->clock_next;
3375 p->clock_next->clock_prev = p->clock_prev;
3376 }
3377 p->clock_prev = p->clock_next = NULL;
3378}
3379
// Remove a pair from the list of pairs that were marked with the
// pending bit for the in-progress checkpoint.
3382//
3383// requires that if the caller is the checkpoint thread, then a read lock
3384// is grabbed on the list. Otherwise, must have write lock on list.
3385//
3386void pair_list::pending_pairs_remove (PAIR p) {
3387 if (p->pending_next) {
3388 p->pending_next->pending_prev = p->pending_prev;
3389 }
3390 if (p->pending_prev) {
3391 p->pending_prev->pending_next = p->pending_next;
3392 }
3393 else if (m_pending_head==p) {
3394 m_pending_head = p->pending_next;
3395 }
3396 p->pending_prev = p->pending_next = NULL;
3397}
3398
3399void pair_list::remove_from_hash_chain(PAIR p) {
3400 // Remove it from the hash chain.
3401 unsigned int h = p->fullhash&(m_table_size - 1);
3402 paranoid_invariant(m_table[h] != NULL);
3403 if (m_table[h] == p) {
3404 m_table[h] = p->hash_chain;
3405 }
3406 else {
3407 PAIR curr = m_table[h];
3408 while (curr->hash_chain != p) {
3409 curr = curr->hash_chain;
3410 }
        // remove p from the singly linked hash chain
3412 curr->hash_chain = p->hash_chain;
3413 }
3414 p->hash_chain = NULL;
3415}
3416
// Returns the pair from the pair list that matches the given
// cachefile, key, and fullhash. If the pair cannot be found, null is returned.
3419//
3420// requires caller to have grabbed either a read lock on the list or
3421// bucket's mutex.
3422//
3423PAIR pair_list::find_pair(CACHEFILE file, CACHEKEY key, uint32_t fullhash) {
3424 PAIR found_pair = nullptr;
3425 for (PAIR p = m_table[fullhash&(m_table_size - 1)]; p; p = p->hash_chain) {
3426 if (p->key.b == key.b && p->cachefile == file) {
3427 found_pair = p;
3428 break;
3429 }
3430 }
3431 return found_pair;
3432}
3433
3434// Add PAIR to linked list shared by cleaner thread and clock
3435//
3436// requires caller to have grabbed write lock on list.
3437//
3438void pair_list::add_to_clock (PAIR p) {
3439 // requires that p is not currently in the table.
3440 // inserts p into the clock list at the tail.
3441
3442 p->count = CLOCK_INITIAL_COUNT;
    // Either the clock list is empty, or the clock, cleaner, and
    // checkpoint heads are all non-NULL.
3445 if (m_clock_head) {
3446 assert(m_cleaner_head);
3447 assert(m_checkpoint_head);
3448 // insert right before the head
3449 p->clock_next = m_clock_head;
3450 p->clock_prev = m_clock_head->clock_prev;
3451
3452 p->clock_prev->clock_next = p;
3453 p->clock_next->clock_prev = p;
3454
3455 }
3456 // this is the first element in the list
3457 else {
3458 m_clock_head = p;
3459 p->clock_next = p->clock_prev = m_clock_head;
3460 m_cleaner_head = p;
3461 m_checkpoint_head = p;
3462 }
3463}
3464
// Add the pair to the linked list of PAIRs belonging
// to the same cachefile. This linked list is used
// in cachetable_flush_cachefile.
3468void pair_list::add_to_cf_list(PAIR p) {
3469 CACHEFILE cf = p->cachefile;
3470 if (cf->cf_head) {
3471 cf->cf_head->cf_prev = p;
3472 }
3473 p->cf_next = cf->cf_head;
3474 p->cf_prev = NULL;
3475 cf->cf_head = p;
3476 cf->num_pairs++;
3477}
3478
3479// Add PAIR to the hashtable
3480//
3481// requires caller to have grabbed write lock on list
3482// and to have grabbed the p->mutex.
3483void pair_list::add_to_hash_chain(PAIR p) {
3484 uint32_t h = p->fullhash & (m_table_size - 1);
3485 p->hash_chain = m_table[h];
3486 m_table[h] = p;
3487}
3488
3489// test function
3490//
3491// grabs and releases write list lock
3492//
3493void pair_list::verify() {
3494 this->write_list_lock();
3495 uint32_t num_found = 0;
3496
3497 // First clear all the verify flags by going through the hash chains
3498 {
3499 uint32_t i;
3500 for (i = 0; i < m_table_size; i++) {
3501 PAIR p;
3502 for (p = m_table[i]; p; p = p->hash_chain) {
3503 num_found++;
3504 }
3505 }
3506 }
3507 assert(num_found == m_n_in_table);
3508 num_found = 0;
    // Now go through the clock chain and make sure everything in it is hashed.
3510 {
3511 PAIR p;
3512 bool is_first = true;
3513 for (p = m_clock_head; m_clock_head != NULL && (p != m_clock_head || is_first); p=p->clock_next) {
3514 is_first=false;
3515 PAIR p2;
3516 uint32_t fullhash = p->fullhash;
3517 //assert(fullhash==toku_cachetable_hash(p->cachefile, p->key));
3518 for (p2 = m_table[fullhash&(m_table_size-1)]; p2; p2=p2->hash_chain) {
3519 if (p2==p) {
3520 /* found it */
3521 num_found++;
3522 goto next;
3523 }
3524 }
3525 fprintf(stderr, "Something in the clock chain is not hashed\n");
3526 assert(0);
3527 next:;
3528 }
3529 assert (num_found == m_n_in_table);
3530 }
3531 this->write_list_unlock();
3532}
3533
3534// If given pointers are not null, assign the hash table size of
3535// this pair list and the number of pairs in this pair list.
3536//
3537//
3538// grabs and releases read list lock
3539//
3540void pair_list::get_state(int *num_entries, int *hash_size) {
3541 this->read_list_lock();
3542 if (num_entries) {
3543 *num_entries = m_n_in_table;
3544 }
3545 if (hash_size) {
3546 *hash_size = m_table_size;
3547 }
3548 this->read_list_unlock();
3549}
3550
3551void pair_list::read_list_lock() {
3552 toku_pthread_rwlock_rdlock(&m_list_lock);
3553}
3554
3555void pair_list::read_list_unlock() {
3556 toku_pthread_rwlock_rdunlock(&m_list_lock);
3557}
3558
3559void pair_list::write_list_lock() {
3560 toku_pthread_rwlock_wrlock(&m_list_lock);
3561}
3562
3563void pair_list::write_list_unlock() {
3564 toku_pthread_rwlock_wrunlock(&m_list_lock);
3565}
3566
3567void pair_list::read_pending_exp_lock() {
3568 toku_pthread_rwlock_rdlock(&m_pending_lock_expensive);
3569}
3570
3571void pair_list::read_pending_exp_unlock() {
3572 toku_pthread_rwlock_rdunlock(&m_pending_lock_expensive);
3573}
3574
3575void pair_list::write_pending_exp_lock() {
3576 toku_pthread_rwlock_wrlock(&m_pending_lock_expensive);
3577}
3578
3579void pair_list::write_pending_exp_unlock() {
3580 toku_pthread_rwlock_wrunlock(&m_pending_lock_expensive);
3581}
3582
3583void pair_list::read_pending_cheap_lock() {
3584 toku_pthread_rwlock_rdlock(&m_pending_lock_cheap);
3585}
3586
3587void pair_list::read_pending_cheap_unlock() {
3588 toku_pthread_rwlock_rdunlock(&m_pending_lock_cheap);
3589}
3590
3591void pair_list::write_pending_cheap_lock() {
3592 toku_pthread_rwlock_wrlock(&m_pending_lock_cheap);
3593}
3594
3595void pair_list::write_pending_cheap_unlock() {
3596 toku_pthread_rwlock_wrunlock(&m_pending_lock_cheap);
3597}
3598
3599toku_mutex_t* pair_list::get_mutex_for_pair(uint32_t fullhash) {
3600 return &m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex;
3601}
3602
3603void pair_list::pair_lock_by_fullhash(uint32_t fullhash) {
3604 toku_mutex_lock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex);
3605}
3606
3607void pair_list::pair_unlock_by_fullhash(uint32_t fullhash) {
3608 toku_mutex_unlock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex);
3609}
3610
3611
3612ENSURE_POD(evictor);
3613
3614//
3615// This is the function that runs eviction on its own thread.
3616//
3617static void *eviction_thread(void *evictor_v) {
3618 evictor *CAST_FROM_VOIDP(evictor, evictor_v);
3619 evictor->run_eviction_thread();
3620 return toku_pthread_done(evictor_v);
3621}
3622
3623//
3624// Starts the eviction thread, assigns external object references,
3625// and initializes all counters and condition variables.
3626//
3627int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) {
3628 TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running);
3629 TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting);
3630
    // cap the gap between consecutive thresholds at 512MB (1 << 29 bytes)
3632 int64_t max_diff = (1 << 29);
3633
3634 m_low_size_watermark = _size_limit;
3635 // these values are selected kind of arbitrarily right now as
3636 // being a percentage more than low_size_watermark, which is provided
3637 // by the caller.
3638 m_low_size_hysteresis = (11 * _size_limit)/10; //10% more
3639 if ((m_low_size_hysteresis - m_low_size_watermark) > max_diff) {
3640 m_low_size_hysteresis = m_low_size_watermark + max_diff;
3641 }
    m_high_size_hysteresis = (5 * _size_limit)/4; // 25% more
3643 if ((m_high_size_hysteresis - m_low_size_hysteresis) > max_diff) {
3644 m_high_size_hysteresis = m_low_size_hysteresis + max_diff;
3645 }
3646 m_high_size_watermark = (3 * _size_limit)/2; // 50% more
3647 if ((m_high_size_watermark - m_high_size_hysteresis) > max_diff) {
3648 m_high_size_watermark = m_high_size_hysteresis + max_diff;
3649 }
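    // For illustration (hypothetical numbers, not enforced anywhere): with a
    // 1GB size limit none of the clamps apply, giving
    //   low_size_watermark   = 1GB,    low_size_hysteresis ~= 1.1GB,
    //   high_size_hysteresis = 1.25GB, high_size_watermark  = 1.5GB.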
3650
3651 m_enable_partial_eviction = true;
3652
3653 m_size_reserved = unreservable_memory(_size_limit);
3654 m_size_current = 0;
3655 m_size_cloned_data = 0;
3656 m_size_evicting = 0;
3657
3658 m_size_nonleaf = create_partitioned_counter();
3659 m_size_leaf = create_partitioned_counter();
3660 m_size_rollback = create_partitioned_counter();
3661 m_size_cachepressure = create_partitioned_counter();
3662 m_wait_pressure_count = create_partitioned_counter();
3663 m_wait_pressure_time = create_partitioned_counter();
3664 m_long_wait_pressure_count = create_partitioned_counter();
3665 m_long_wait_pressure_time = create_partitioned_counter();
3666
3667 m_pl = _pl;
3668 m_cf_list = _cf_list;
3669 m_kibbutz = _kibbutz;
3670 toku_mutex_init(
3671 *cachetable_ev_thread_lock_mutex_key, &m_ev_thread_lock, nullptr);
3672 toku_cond_init(
3673 *cachetable_m_flow_control_cond_key, &m_flow_control_cond, nullptr);
3674 toku_cond_init(
3675 *cachetable_m_ev_thread_cond_key, &m_ev_thread_cond, nullptr);
3676 m_num_sleepers = 0;
3677 m_ev_thread_is_running = false;
3678 m_period_in_seconds = eviction_period;
3679
3680 unsigned int seed = (unsigned int) time(NULL);
3681 int r = myinitstate_r(seed, m_random_statebuf, sizeof m_random_statebuf, &m_random_data);
3682 assert_zero(r);
3683
3684 // start the background thread
3685 m_run_thread = true;
3686 m_num_eviction_thread_runs = 0;
3687 m_ev_thread_init = false;
3688 r = toku_pthread_create(
3689 *eviction_thread_key, &m_ev_thread, nullptr, eviction_thread, this);
3690 if (r == 0) {
3691 m_ev_thread_init = true;
3692 }
3693 m_evictor_init = true;
3694 return r;
3695}
3696
3697//
// This stops the eviction thread and destroys the evictor's counters,
// condition variables, and mutex.
3699//
3700// NOTE: This should only be called if there are no evictions in progress.
3701//
3702void evictor::destroy() {
3703 if (!m_evictor_init) {
3704 return;
3705 }
3706 assert(m_size_evicting == 0);
3707 //
    // commented out as of Ming, because we could not finish
    // #5672. Once #5672 is solved, we should restore this assert
3710 //
3711 //assert(m_size_current == 0);
3712
3713 // Stop the eviction thread.
3714 if (m_ev_thread_init) {
3715 toku_mutex_lock(&m_ev_thread_lock);
3716 m_run_thread = false;
3717 this->signal_eviction_thread_locked();
3718 toku_mutex_unlock(&m_ev_thread_lock);
3719 void *ret;
3720 int r = toku_pthread_join(m_ev_thread, &ret);
3721 assert_zero(r);
3722 assert(!m_ev_thread_is_running);
3723 }
3724 destroy_partitioned_counter(m_size_nonleaf);
3725 m_size_nonleaf = NULL;
3726 destroy_partitioned_counter(m_size_leaf);
3727 m_size_leaf = NULL;
3728 destroy_partitioned_counter(m_size_rollback);
3729 m_size_rollback = NULL;
3730 destroy_partitioned_counter(m_size_cachepressure);
3731 m_size_cachepressure = NULL;
3732
3733 destroy_partitioned_counter(m_wait_pressure_count); m_wait_pressure_count = NULL;
3734 destroy_partitioned_counter(m_wait_pressure_time); m_wait_pressure_time = NULL;
3735 destroy_partitioned_counter(m_long_wait_pressure_count); m_long_wait_pressure_count = NULL;
3736 destroy_partitioned_counter(m_long_wait_pressure_time); m_long_wait_pressure_time = NULL;
3737
3738 toku_cond_destroy(&m_flow_control_cond);
3739 toku_cond_destroy(&m_ev_thread_cond);
3740 toku_mutex_destroy(&m_ev_thread_lock);
3741}
3742
3743//
3744// Increases status variables and the current size variable
3745// of the evictor based on the given pair attribute.
3746//
3747void evictor::add_pair_attr(PAIR_ATTR attr) {
3748 assert(attr.is_valid);
3749 add_to_size_current(attr.size);
3750 increment_partitioned_counter(m_size_nonleaf, attr.nonleaf_size);
3751 increment_partitioned_counter(m_size_leaf, attr.leaf_size);
3752 increment_partitioned_counter(m_size_rollback, attr.rollback_size);
3753 increment_partitioned_counter(m_size_cachepressure, attr.cache_pressure_size);
3754}
3755
3756//
3757// Decreases status variables and the current size variable
3758// of the evictor based on the given pair attribute.
3759//
3760void evictor::remove_pair_attr(PAIR_ATTR attr) {
3761 assert(attr.is_valid);
3762 remove_from_size_current(attr.size);
3763 increment_partitioned_counter(m_size_nonleaf, 0 - attr.nonleaf_size);
3764 increment_partitioned_counter(m_size_leaf, 0 - attr.leaf_size);
3765 increment_partitioned_counter(m_size_rollback, 0 - attr.rollback_size);
3766 increment_partitioned_counter(m_size_cachepressure, 0 - attr.cache_pressure_size);
3767}
3768
3769//
3770// Updates this evictor's stats to match the "new" pair attribute given
3771// while also removing the given "old" pair attribute.
3772//
3773void evictor::change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr) {
3774 this->add_pair_attr(new_attr);
3775 this->remove_pair_attr(old_attr);
3776}
3777
3778//
3779// Adds the given size to the evictor's estimation of
3780// the size of the cachetable.
3781//
3782void evictor::add_to_size_current(long size) {
3783 (void) toku_sync_fetch_and_add(&m_size_current, size);
3784}
3785
3786//
3787// Subtracts the given size from the evictor's current
3788// approximation of the cachetable size.
3789//
3790void evictor::remove_from_size_current(long size) {
3791 (void) toku_sync_fetch_and_sub(&m_size_current, size);
3792}
3793
3794//
3795// Adds the size of cloned data to necessary variables in the evictor
3796//
3797void evictor::add_cloned_data_size(long size) {
3798 (void) toku_sync_fetch_and_add(&m_size_cloned_data, size);
3799 add_to_size_current(size);
3800}
3801
3802//
// Removes the size of cloned data from the necessary variables in the evictor
3804//
3805void evictor::remove_cloned_data_size(long size) {
3806 (void) toku_sync_fetch_and_sub(&m_size_cloned_data, size);
3807 remove_from_size_current(size);
3808}
3809
//
// Reserves a fraction of the unreserved portion of the low size watermark.
// The reservation is capped by upper_bound (if nonzero), is accounted for in
// m_size_reserved and m_size_current, and the caller may be made to wait for
// cache pressure to subside before the reserved amount is returned.
//
3813uint64_t evictor::reserve_memory(double fraction, uint64_t upper_bound) {
3814 toku_mutex_lock(&m_ev_thread_lock);
3815 uint64_t reserved_memory = fraction * (m_low_size_watermark - m_size_reserved);
3816 if (0) { // debug
3817 fprintf(stderr, "%s %" PRIu64 " %" PRIu64 "\n", __PRETTY_FUNCTION__, reserved_memory, upper_bound);
3818 }
3819 if (upper_bound > 0 && reserved_memory > upper_bound) {
3820 reserved_memory = upper_bound;
3821 }
3822 m_size_reserved += reserved_memory;
3823 (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory);
3824 this->signal_eviction_thread_locked();
3825 toku_mutex_unlock(&m_ev_thread_lock);
3826
3827 if (this->should_client_thread_sleep()) {
3828 this->wait_for_cache_pressure_to_subside();
3829 }
3830 return reserved_memory;
3831}
3832
//
// Returns previously reserved memory: subtracts it from m_size_current and
// m_size_reserved, and signals the eviction thread so that sleeping clients
// may be woken up.
//
3836void evictor::release_reserved_memory(uint64_t reserved_memory){
3837 (void) toku_sync_fetch_and_sub(&m_size_current, reserved_memory);
3838 toku_mutex_lock(&m_ev_thread_lock);
3839 m_size_reserved -= reserved_memory;
3840 // signal the eviction thread in order to possibly wake up sleeping clients
3841 if (m_num_sleepers > 0) {
3842 this->signal_eviction_thread_locked();
3843 }
3844 toku_mutex_unlock(&m_ev_thread_lock);
3845}
3846
3847//
3848// This function is the eviction thread. It runs for the lifetime of
3849// the evictor. Goes to sleep for period_in_seconds
3850// by waiting on m_ev_thread_cond.
3851//
3852void evictor::run_eviction_thread(){
3853 toku_mutex_lock(&m_ev_thread_lock);
3854 while (m_run_thread) {
3855 m_num_eviction_thread_runs++; // for test purposes only
3856 m_ev_thread_is_running = true;
3857 // responsibility of run_eviction to release and
3858 // regrab ev_thread_lock as it sees fit
3859 this->run_eviction();
3860 m_ev_thread_is_running = false;
3861
3862 if (m_run_thread) {
3863 //
3864 // sleep until either we are signaled
3865 // via signal_eviction_thread or
3866 // m_period_in_seconds amount of time has passed
3867 //
3868 if (m_period_in_seconds) {
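                // compute an absolute wakeup time m_period_in_seconds from
                // now; the timed wait below blocks until that deadline unless
                // the thread is signaled earlier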
3869 toku_timespec_t wakeup_time;
3870 struct timeval tv;
3871 gettimeofday(&tv, 0);
3872 wakeup_time.tv_sec = tv.tv_sec;
3873 wakeup_time.tv_nsec = tv.tv_usec * 1000LL;
3874 wakeup_time.tv_sec += m_period_in_seconds;
3875 toku_cond_timedwait(
3876 &m_ev_thread_cond,
3877 &m_ev_thread_lock,
3878 &wakeup_time
3879 );
3880 }
3881 // for test purposes, we have an option of
3882 // not waiting on a period, but rather sleeping indefinitely
3883 else {
3884 toku_cond_wait(&m_ev_thread_cond, &m_ev_thread_lock);
3885 }
3886 }
3887 }
3888 toku_mutex_unlock(&m_ev_thread_lock);
3889}
3890
3891//
// Runs eviction.
// On entry, ev_thread_lock is held; on exit, ev_thread_lock must still be held.
// It is the responsibility of this function to release and reacquire ev_thread_lock as it sees fit.
3895//
3896void evictor::run_eviction(){
3897 //
3898 // These variables will help us detect if everything in the clock is currently being accessed.
3899 // We must detect this case otherwise we will end up in an infinite loop below.
3900 //
3901 bool exited_early = false;
3902 uint32_t num_pairs_examined_without_evicting = 0;
3903
3904 while (this->eviction_needed()) {
3905 if (m_num_sleepers > 0 && this->should_sleeping_clients_wakeup()) {
3906 toku_cond_broadcast(&m_flow_control_cond);
3907 }
3908 // release ev_thread_lock so that eviction may run without holding mutex
3909 toku_mutex_unlock(&m_ev_thread_lock);
3910
3911 // first try to do an eviction from stale cachefiles
3912 bool some_eviction_ran = m_cf_list->evict_some_stale_pair(this);
3913 if (!some_eviction_ran) {
3914 m_pl->read_list_lock();
3915 PAIR curr_in_clock = m_pl->m_clock_head;
3916 // if nothing to evict, we need to exit
3917 if (!curr_in_clock) {
3918 m_pl->read_list_unlock();
3919 toku_mutex_lock(&m_ev_thread_lock);
3920 exited_early = true;
3921 goto exit;
3922 }
3923 if (num_pairs_examined_without_evicting > m_pl->m_n_in_table) {
3924 // we have a cycle where everything in the clock is in use
3925 // do not return an error
3926 // just let memory be overfull
3927 m_pl->read_list_unlock();
3928 toku_mutex_lock(&m_ev_thread_lock);
3929 exited_early = true;
3930 goto exit;
3931 }
3932 bool eviction_run = run_eviction_on_pair(curr_in_clock);
3933 if (eviction_run) {
3934 // reset the count
3935 num_pairs_examined_without_evicting = 0;
3936 }
3937 else {
3938 num_pairs_examined_without_evicting++;
3939 }
            // at this point, either curr_in_clock is still in the list because it has
            // not been fully evicted, in which case we need to move m_pl->m_clock_head
            // over; or curr_in_clock has been fully evicted, in which case we do NOT
            // need to move m_pl->m_clock_head, because removing curr_in_clock already
            // modified it
3944 if (m_pl->m_clock_head && (m_pl->m_clock_head == curr_in_clock)) {
3945 m_pl->m_clock_head = m_pl->m_clock_head->clock_next;
3946 }
3947 m_pl->read_list_unlock();
3948 }
3949 toku_mutex_lock(&m_ev_thread_lock);
3950 }
3951
3952exit:
3953 if (m_num_sleepers > 0 && (exited_early || this->should_sleeping_clients_wakeup())) {
3954 toku_cond_broadcast(&m_flow_control_cond);
3955 }
3956 return;
3957}
3958
3959//
3960// NOTE: Cachetable lock held on entry.
3961// Runs eviction on the given PAIR. This may be a
3962// partial eviction or full eviction.
3963//
3964// on entry, pair mutex is NOT held, but pair list's read list lock
3965// IS held
3966// on exit, the same conditions must apply
3967//
3968bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
3969 uint32_t n_in_table;
3970 int64_t size_current;
3971 bool ret_val = false;
3972 // function meant to be called on PAIR that is not being accessed right now
3973 CACHEFILE cf = curr_in_clock->cachefile;
3974 int r = bjm_add_background_job(cf->bjm);
3975 if (r) {
3976 goto exit;
3977 }
3978 pair_lock(curr_in_clock);
3979 // these are the circumstances under which we don't run eviction on a pair:
3980 // - if other users are waiting on the lock
3981 // - if the PAIR is referenced by users
3982 // - if the PAIR's disk_nb_mutex is in use, implying that it is
3983 // undergoing a checkpoint
3984 if (curr_in_clock->value_rwlock.users() ||
3985 curr_in_clock->refcount > 0 ||
3986 nb_mutex_users(&curr_in_clock->disk_nb_mutex))
3987 {
3988 pair_unlock(curr_in_clock);
3989 bjm_remove_background_job(cf->bjm);
3990 goto exit;
3991 }
3992
3993 // extract and use these values so that we don't risk them changing
3994 // out from underneath us in calculations below.
3995 n_in_table = m_pl->m_n_in_table;
3996 size_current = m_size_current;
3997
3998 // now that we have the pair mutex we care about, we can
3999 // release the read list lock and reacquire it at the end of the function
4000 m_pl->read_list_unlock();
4001 ret_val = true;
4002 if (curr_in_clock->count > 0) {
4003 toku::context pe_ctx(CTX_PARTIAL_EVICTION);
4004
4005 uint32_t curr_size = curr_in_clock->attr.size;
4006 // if the size of this PAIR is greater than the average size of PAIRs
4007 // in the cachetable, then decrement it, otherwise, decrement
4008 // probabilistically
4009 if (curr_size*n_in_table >= size_current) {
4010 curr_in_clock->count--;
4011 } else {
            // generate a random number in [0, 2^16)
4013 assert(size_current <= (INT64_MAX / ((1<<16)-1))); // to protect against possible overflows
4014 int32_t rnd = myrandom_r(&m_random_data) % (1<<16);
4015 // The if-statement below will be true with probability of
4016 // curr_size/(average size of PAIR in cachetable)
4017 // Here is how the math is done:
4018 // average_size = size_current/n_in_table
4019 // curr_size/average_size = curr_size*n_in_table/size_current
            // we evaluate whether a random number from 0 to 2^16 is less
            // than curr_size/average_size * 2^16. So, our if-clause should be
4022 // if (2^16*curr_size/average_size > rnd)
4023 // this evaluates to:
4024 // if (2^16*curr_size*n_in_table/size_current > rnd)
            // by multiplying each side of the inequality by size_current, we get
4026 // if (2^16*curr_size*n_in_table > rnd*size_current)
4027 // and dividing each side by 2^16,
4028 // we get the if-clause below
4029 //
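            // Illustrative numbers (hypothetical): with curr_size = 64KB,
            // n_in_table = 1000, and size_current = 128MB, the average pair
            // size is ~128KB, so the count is decremented with probability
            // roughly 64KB/128KB = 1/2.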
4030 if ((((int64_t)curr_size) * n_in_table) >= (((int64_t)rnd) * size_current)>>16) {
4031 curr_in_clock->count--;
4032 }
4033 }
4034
4035 if (m_enable_partial_eviction) {
4036 // call the partial eviction callback
4037 curr_in_clock->value_rwlock.write_lock(true);
4038
4039 void *value = curr_in_clock->value_data;
4040 void* disk_data = curr_in_clock->disk_data;
4041 void *write_extraargs = curr_in_clock->write_extraargs;
4042 enum partial_eviction_cost cost;
4043 long bytes_freed_estimate = 0;
4044 curr_in_clock->pe_est_callback(value, disk_data,
4045 &bytes_freed_estimate, &cost,
4046 write_extraargs);
4047 if (cost == PE_CHEAP) {
4048 pair_unlock(curr_in_clock);
4049 curr_in_clock->size_evicting_estimate = 0;
4050 this->do_partial_eviction(curr_in_clock);
4051 bjm_remove_background_job(cf->bjm);
4052 } else if (cost == PE_EXPENSIVE) {
4053 // only bother running an expensive partial eviction
4054 // if it is expected to free space
4055 if (bytes_freed_estimate > 0) {
4056 pair_unlock(curr_in_clock);
4057 curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
4058 toku_mutex_lock(&m_ev_thread_lock);
4059 m_size_evicting += bytes_freed_estimate;
4060 toku_mutex_unlock(&m_ev_thread_lock);
4061 toku_kibbutz_enq(m_kibbutz, cachetable_partial_eviction,
4062 curr_in_clock);
4063 } else {
4064 curr_in_clock->value_rwlock.write_unlock();
4065 pair_unlock(curr_in_clock);
4066 bjm_remove_background_job(cf->bjm);
4067 }
4068 } else {
4069 assert(false);
4070 }
4071 } else {
4072 pair_unlock(curr_in_clock);
4073 bjm_remove_background_job(cf->bjm);
4074 }
4075 } else {
4076 toku::context pe_ctx(CTX_FULL_EVICTION);
4077
4078 // responsibility of try_evict_pair to eventually remove background job
4079 // pair's mutex is still grabbed here
4080 this->try_evict_pair(curr_in_clock);
4081 }
4082 // regrab the read list lock, because the caller assumes
4083 // that it is held. The contract requires this.
4084 m_pl->read_list_lock();
4085exit:
4086 return ret_val;
4087}
4088
4089struct pair_unpin_with_new_attr_extra {
4090 pair_unpin_with_new_attr_extra(evictor *e, PAIR p) :
4091 ev(e), pair(p) {
4092 }
4093 evictor *ev;
4094 PAIR pair;
4095};
4096
4097static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) {
4098 struct pair_unpin_with_new_attr_extra *info =
4099 reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra);
4100 PAIR p = info->pair;
4101 evictor *ev = info->ev;
4102
4103 // change the attr in the evictor, then update the value in the pair
4104 ev->change_pair_attr(p->attr, new_attr);
4105 p->attr = new_attr;
4106
4107 // unpin
4108 pair_lock(p);
4109 p->value_rwlock.write_unlock();
4110 pair_unlock(p);
4111}
4112
4113//
4114// on entry and exit, pair's mutex is not held
4115// on exit, PAIR is unpinned
4116//
4117void evictor::do_partial_eviction(PAIR p) {
4118 // Copy the old attr
4119 PAIR_ATTR old_attr = p->attr;
4120 long long size_evicting_estimate = p->size_evicting_estimate;
4121
4122 struct pair_unpin_with_new_attr_extra extra(this, p);
4123 p->pe_callback(p->value_data, old_attr, p->write_extraargs,
4124 // passed as the finalize continuation, which allows the
4125 // pe_callback to unpin the node before doing expensive cleanup
4126 pair_unpin_with_new_attr, &extra);
4127
4128 // now that the pe_callback (and its pair_unpin_with_new_attr continuation)
4129 // have finished, we can safely decrease size_evicting
4130 this->decrease_size_evicting(size_evicting_estimate);
4131}
4132
4133//
4134// CT lock held on entry
4135// background job has been added for p->cachefile on entry
4136// responsibility of this function to make sure that background job is removed
4137//
4138// on entry, pair's mutex is held, on exit, the pair's mutex is NOT held
4139//
4140void evictor::try_evict_pair(PAIR p) {
4141 CACHEFILE cf = p->cachefile;
    // evictions that do not require a write (i.e. of clean, unpinned pairs)
    // can be run on the current thread
4144
4145 // the only caller, run_eviction_on_pair, should call this function
4146 // only if no one else is trying to use it
4147 assert(!p->value_rwlock.users());
4148 p->value_rwlock.write_lock(true);
4149 // if the PAIR is dirty, the running eviction requires writing the
4150 // PAIR out. if the disk_nb_mutex is grabbed, then running
4151 // eviction requires waiting for the disk_nb_mutex to become available,
4152 // which may be expensive. Hence, if either is true, we
4153 // do the eviction on a writer thread
4154 if (!p->dirty && (nb_mutex_writers(&p->disk_nb_mutex) == 0)) {
4155 p->size_evicting_estimate = 0;
4156 //
4157 // This method will unpin PAIR and release PAIR mutex
4158 //
4159 // because the PAIR is not dirty, we can safely pass
4160 // false for the for_checkpoint parameter
4161 this->evict_pair(p, false);
4162 bjm_remove_background_job(cf->bjm);
4163 }
4164 else {
4165 pair_unlock(p);
4166 toku_mutex_lock(&m_ev_thread_lock);
4167 assert(m_size_evicting >= 0);
4168 p->size_evicting_estimate = p->attr.size;
4169 m_size_evicting += p->size_evicting_estimate;
4170 assert(m_size_evicting >= 0);
4171 toku_mutex_unlock(&m_ev_thread_lock);
4172 toku_kibbutz_enq(m_kibbutz, cachetable_evicter, p);
4173 }
4174}
4175
4176//
// Requires: This thread must hold the write lock on the pair's value (value_rwlock).
4178// The pair's mutex (p->mutex) is also held.
4179// on exit, neither is held
4180//
4181void evictor::evict_pair(PAIR p, bool for_checkpoint) {
4182 if (p->dirty) {
4183 pair_unlock(p);
4184 cachetable_write_locked_pair(this, p, for_checkpoint);
4185 pair_lock(p);
4186 }
4187 // one thing we can do here is extract the size_evicting estimate,
4188 // have decrease_size_evicting take the estimate and not the pair,
4189 // and do this work after we have called
4190 // cachetable_maybe_remove_and_free_pair
4191 this->decrease_size_evicting(p->size_evicting_estimate);
4192 // if we are to remove this pair, we need the write list lock,
4193 // to get it in a way that avoids deadlocks, we must first release
4194 // the pair's mutex, then grab the write list lock, then regrab the
4195 // pair's mutex. The pair cannot go anywhere because
4196 // the pair is still pinned
4197 nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
4198 pair_unlock(p);
4199 m_pl->write_list_lock();
4200 pair_lock(p);
4201 p->value_rwlock.write_unlock();
4202 nb_mutex_unlock(&p->disk_nb_mutex);
4203 // at this point, we have the pair list's write list lock
4204 // and we have the pair's mutex (p->mutex) held
4205
4206 // this ensures that a clone running in the background first completes
4207 bool removed = false;
4208 if (p->value_rwlock.users() == 0 && p->refcount == 0) {
4209 // assumption is that if we are about to remove the pair
4210 // that no one has grabbed the disk_nb_mutex,
4211 // and that there is no cloned_value_data, because
4212 // no one is writing a cloned value out.
4213 assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
4214 assert(p->cloned_value_data == NULL);
4215 cachetable_remove_pair(m_pl, this, p);
4216 removed = true;
4217 }
4218 pair_unlock(p);
4219 m_pl->write_list_unlock();
4220 // do not want to hold the write list lock while freeing a pair
4221 if (removed) {
4222 cachetable_free_pair(p);
4223 }
4224}
4225
4226//
4227// this function handles the responsibilities for writer threads when they
4228// decrease size_evicting. The responsibilities are:
4229// - decrease m_size_evicting in a thread safe manner
4230// - in some circumstances, signal the eviction thread
4231//
4232void evictor::decrease_size_evicting(long size_evicting_estimate) {
4233 if (size_evicting_estimate > 0) {
4234 toku_mutex_lock(&m_ev_thread_lock);
4235 int64_t buffer = m_high_size_hysteresis - m_low_size_watermark;
4236 // if size_evicting is transitioning from greater than buffer to below buffer, and
4237 // some client threads are sleeping, we need to wake up the eviction thread.
4238 // Here is why. In this scenario, we are in one of two cases:
4239 // - size_current - size_evicting < low_size_watermark
        //   If this is true, then because size_evicting has just dropped to at most
        //   buffer (= high_size_hysteresis - low_size_watermark), we have
        //   size_current < high_size_hysteresis, which means we need to wake up
        //   sleeping clients
4242 // - size_current - size_evicting > low_size_watermark,
4243 // which means more evictions must be run.
4244 // The consequences of both cases are the responsibility
4245 // of the eviction thread.
4246 //
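        // Illustration (hypothetical numbers, assuming the default factors from
        // init and no clamping): with a 1GB low_size_watermark,
        // high_size_hysteresis ~= 1.25GB, so buffer ~= 256MB; the eviction thread
        // is signaled when size_evicting drops back to 256MB or less while
        // clients are sleeping.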
4247 bool need_to_signal_ev_thread =
4248 (m_num_sleepers > 0) &&
4249 !m_ev_thread_is_running &&
4250 (m_size_evicting > buffer) &&
4251 ((m_size_evicting - size_evicting_estimate) <= buffer);
4252 m_size_evicting -= size_evicting_estimate;
4253 assert(m_size_evicting >= 0);
4254 if (need_to_signal_ev_thread) {
4255 this->signal_eviction_thread_locked();
4256 }
4257 toku_mutex_unlock(&m_ev_thread_lock);
4258 }
4259}
4260
4261//
// Wait for cachetable space to become available
4263// size_current is number of bytes currently occupied by data (referred to by pairs)
4264// size_evicting is number of bytes queued up to be evicted
4265//
4266void evictor::wait_for_cache_pressure_to_subside() {
4267 uint64_t t0 = toku_current_time_microsec();
4268 toku_mutex_lock(&m_ev_thread_lock);
4269 m_num_sleepers++;
4270 this->signal_eviction_thread_locked();
4271 toku_cond_wait(&m_flow_control_cond, &m_ev_thread_lock);
4272 m_num_sleepers--;
4273 toku_mutex_unlock(&m_ev_thread_lock);
4274 uint64_t t1 = toku_current_time_microsec();
4275 increment_partitioned_counter(m_wait_pressure_count, 1);
4276 uint64_t tdelta = t1 - t0;
4277 increment_partitioned_counter(m_wait_pressure_time, tdelta);
4278 if (tdelta > 1000000) {
4279 increment_partitioned_counter(m_long_wait_pressure_count, 1);
4280 increment_partitioned_counter(m_long_wait_pressure_time, tdelta);
4281 }
4282}
4283
4284//
4285// Get the status of the current estimated size of the cachetable,
4286// and the evictor's set limit.
4287//
4288void evictor::get_state(long *size_current_ptr, long *size_limit_ptr) {
4289 if (size_current_ptr) {
4290 *size_current_ptr = m_size_current;
4291 }
4292 if (size_limit_ptr) {
4293 *size_limit_ptr = m_low_size_watermark;
4294 }
4295}
4296
4297//
4298// Force the eviction thread to do some work.
4299//
4300// This function does not require any mutex to be held.
4301// As a result, scheduling is not guaranteed, but that is tolerable.
4302//
4303void evictor::signal_eviction_thread() {
4304 toku_mutex_lock(&m_ev_thread_lock);
4305 toku_cond_signal(&m_ev_thread_cond);
4306 toku_mutex_unlock(&m_ev_thread_lock);
4307}
4308
4309void evictor::signal_eviction_thread_locked() {
4310 toku_cond_signal(&m_ev_thread_cond);
4311}
4312
4313//
// Returns true if the cachetable is so oversubscribed that a client thread should sleep
4315//
4316// This function may be called in a thread-unsafe manner. Locks are not
4317// required to read size_current. The result is that
4318// the values may be a little off, but we think that is tolerable.
4319//
4320bool evictor::should_client_thread_sleep(){
4321 return unsafe_read_size_current() > m_high_size_watermark;
4322}
4323
4324//
4325// Returns true if a sleeping client should be woken up because
4326// the cachetable is not overly subscribed
4327//
4328// This function may be called in a thread-unsafe manner. Locks are not
4329// required to read size_current. The result is that
4330// the values may be a little off, but we think that is tolerable.
4331//
4332bool evictor::should_sleeping_clients_wakeup() {
4333 return unsafe_read_size_current() <= m_high_size_hysteresis;
4334}
4335
4336//
4337// Returns true if a client thread should try to wake up the eviction
4338// thread because the client thread has noticed too much data taken
4339// up in the cachetable.
4340//
4341// This function may be called in a thread-unsafe manner. Locks are not
4342// required to read size_current or size_evicting. The result is that
4343// the values may be a little off, but we think that is tolerable.
4344// If the caller wants to ensure that ev_thread_is_running and size_evicting
4345// are accurate, then the caller must hold ev_thread_lock before
4346// calling this function.
4347//
4348bool evictor::should_client_wake_eviction_thread() {
4349 return
4350 !m_ev_thread_is_running &&
4351 ((unsafe_read_size_current() - m_size_evicting) > m_low_size_hysteresis);
4352}
4353
4354//
4355// Determines if eviction is needed. If the current size of
4356// the cachetable exceeds the sum of our fixed size limit and
4357// the amount of data currently being evicted, then eviction is needed
4358//
4359bool evictor::eviction_needed() {
4360 return (m_size_current - m_size_evicting) > m_low_size_watermark;
4361}
4362
4363inline int64_t evictor::unsafe_read_size_current(void) const {
4364 return m_size_current;
4365}
4366
4367void evictor::fill_engine_status() {
4368 CT_STATUS_VAL(CT_SIZE_CURRENT) = m_size_current;
4369 CT_STATUS_VAL(CT_SIZE_LIMIT) = m_low_size_hysteresis;
4370 CT_STATUS_VAL(CT_SIZE_WRITING) = m_size_evicting;
4371 CT_STATUS_VAL(CT_SIZE_NONLEAF) = read_partitioned_counter(m_size_nonleaf);
4372 CT_STATUS_VAL(CT_SIZE_LEAF) = read_partitioned_counter(m_size_leaf);
4373 CT_STATUS_VAL(CT_SIZE_ROLLBACK) = read_partitioned_counter(m_size_rollback);
4374 CT_STATUS_VAL(CT_SIZE_CACHEPRESSURE) = read_partitioned_counter(m_size_cachepressure);
4375 CT_STATUS_VAL(CT_SIZE_CLONED) = m_size_cloned_data;
4376 CT_STATUS_VAL(CT_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_wait_pressure_count);
4377 CT_STATUS_VAL(CT_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_wait_pressure_time);
4378 CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_long_wait_pressure_count);
4379 CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_long_wait_pressure_time);
4380}
4381
4382void evictor::set_enable_partial_eviction(bool enabled) {
4383 m_enable_partial_eviction = enabled;
4384}
4385
4386bool evictor::get_enable_partial_eviction(void) const {
4387 return m_enable_partial_eviction;
4388}
4389
4390////////////////////////////////////////////////////////////////////////////////
4391
4392ENSURE_POD(checkpointer);
4393
4394//
// Initializes this checkpointer: stores references to the pair list, logger,
// evictor, and cachefile list, and sets up the checkpoint minicron
// (with periodic checkpointing initially disabled).
4396//
4397int checkpointer::init(pair_list *_pl,
4398 TOKULOGGER _logger,
4399 evictor *_ev,
4400 cachefile_list *files) {
4401 m_list = _pl;
4402 m_logger = _logger;
4403 m_ev = _ev;
4404 m_cf_list = files;
4405 bjm_init(&m_checkpoint_clones_bjm);
4406
4407 // Default is no checkpointing.
4408 m_checkpointer_cron_init = false;
4409 int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this);
4410 if (r == 0) {
4411 m_checkpointer_cron_init = true;
4412 }
4413 m_checkpointer_init = true;
4414 return r;
4415}
4416
4417void checkpointer::destroy() {
4418 if (!m_checkpointer_init) {
4419 return;
4420 }
4421 if (m_checkpointer_cron_init && !this->has_been_shutdown()) {
4422 // for test code only, production code uses toku_cachetable_minicron_shutdown()
4423 int r = this->shutdown();
4424 assert(r == 0);
4425 }
4426 bjm_destroy(m_checkpoint_clones_bjm);
4427}
4428
4429//
4430// Sets how often the checkpoint thread will run, in seconds
4431//
4432void checkpointer::set_checkpoint_period(uint32_t new_period) {
4433 toku_minicron_change_period(&m_checkpointer_cron, new_period*1000);
4434}
4435
4436//
// Returns how often the checkpoint thread will run, in seconds.
4438//
4439uint32_t checkpointer::get_checkpoint_period() {
4440 return toku_minicron_get_period_in_seconds_unlocked(&m_checkpointer_cron);
4441}
4442
4443//
4444// Stops the checkpoint thread.
4445//
4446int checkpointer::shutdown() {
4447 return toku_minicron_shutdown(&m_checkpointer_cron);
4448}
4449
4450//
// Returns true if the checkpoint thread has been shut down; while
// checkpointing is still running, this returns false.
4452//
4453bool checkpointer::has_been_shutdown() {
4454 return toku_minicron_has_been_shutdown(&m_checkpointer_cron);
4455}
4456
4457TOKULOGGER checkpointer::get_logger() {
4458 return m_logger;
4459}
4460
4461void checkpointer::increment_num_txns() {
4462 m_checkpoint_num_txns++;
4463}
4464
4465struct iterate_begin_checkpoint {
4466 LSN lsn_of_checkpoint_in_progress;
4467 iterate_begin_checkpoint(LSN lsn) : lsn_of_checkpoint_in_progress(lsn) { }
4468 static int fn(const CACHEFILE &cf, const uint32_t UU(idx), struct iterate_begin_checkpoint *info) {
4469 assert(cf->begin_checkpoint_userdata);
4470 if (cf->for_checkpoint) {
4471 cf->begin_checkpoint_userdata(info->lsn_of_checkpoint_in_progress, cf->userdata);
4472 }
4473 return 0;
4474 }
4475};
4476
4477//
4478// Update the user data in any cachefiles in our checkpoint list.
4479//
4480void checkpointer::update_cachefiles() {
4481 struct iterate_begin_checkpoint iterate(m_lsn_of_checkpoint_in_progress);
4482 int r = m_cf_list->m_active_fileid.iterate<struct iterate_begin_checkpoint,
4483 iterate_begin_checkpoint::fn>(&iterate);
4484 assert_zero(r);
4485}
4486
4487struct iterate_note_pin {
4488 static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
4489 assert(cf->note_pin_by_checkpoint);
4490 cf->note_pin_by_checkpoint(cf, cf->userdata);
4491 cf->for_checkpoint = true;
4492 return 0;
4493 }
4494};
4495
4496//
4497// Sets up and kicks off a checkpoint.
4498//
4499void checkpointer::begin_checkpoint() {
4500 // 1. Initialize the accountability counters.
4501 m_checkpoint_num_txns = 0;
4502
4503 // 2. Make list of cachefiles to be included in the checkpoint.
4504 m_cf_list->read_lock();
4505 m_cf_list->m_active_fileid.iterate<void *, iterate_note_pin::fn>(nullptr);
4506 m_checkpoint_num_files = m_cf_list->m_active_fileid.size();
4507 m_cf_list->read_unlock();
4508
4509 // 3. Create log entries for this checkpoint.
4510 if (m_logger) {
4511 this->log_begin_checkpoint();
4512 }
4513
4514 bjm_reset(m_checkpoint_clones_bjm);
4515
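    // Lock acquisition order: pending-expensive lock (write), pair list (read),
    // cachefile list (read), then pending-cheap lock (write); they are released
    // below in the reverse order.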
4516 m_list->write_pending_exp_lock();
4517 m_list->read_list_lock();
4518 m_cf_list->read_lock(); // needed for update_cachefiles
4519 m_list->write_pending_cheap_lock();
4520 // 4. Turn on all the relevant checkpoint pending bits.
4521 this->turn_on_pending_bits();
4522
    // 5. Update the user data in the cachefiles included in the checkpoint.
4524 this->update_cachefiles();
4525 m_list->write_pending_cheap_unlock();
4526 m_cf_list->read_unlock();
4527 m_list->read_list_unlock();
4528 m_list->write_pending_exp_unlock();
4529}
4530
4531struct iterate_log_fassociate {
4532 static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
4533 assert(cf->log_fassociate_during_checkpoint);
4534 cf->log_fassociate_during_checkpoint(cf, cf->userdata);
4535 return 0;
4536 }
4537};
4538
4539//
// Assuming the logger exists, this will write out the following
4541// information to the log.
4542//
4543// 1. Writes the BEGIN_CHECKPOINT to the log.
4544// 2. Writes the list of open dictionaries to the log.
4545// 3. Writes the list of open transactions to the log.
// 4. Writes the list of dictionaries that have had rollback logs suppressed.
4547//
// NOTE: This also has the side effect of setting the LSN
// of the checkpoint in progress.
4550//
4551void checkpointer::log_begin_checkpoint() {
4552 int r = 0;
4553
4554 // Write the BEGIN_CHECKPOINT to the log.
4555 LSN begin_lsn={ .lsn = (uint64_t) -1 }; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
4556 TXN_MANAGER mgr = toku_logger_get_txn_manager(m_logger);
4557 TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
4558 toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid);
4559 m_lsn_of_checkpoint_in_progress = begin_lsn;
4560
4561 // Log the list of open dictionaries.
4562 m_cf_list->m_active_fileid.iterate<void *, iterate_log_fassociate::fn>(nullptr);
4563
4564 // Write open transactions to the log.
4565 r = toku_txn_manager_iter_over_live_txns(
4566 m_logger->txn_manager,
4567 log_open_txn,
4568 this
4569 );
4570 assert(r == 0);
4571}
4572
4573//
4574// Sets the pending bits of EVERY PAIR in the cachetable, regardless of
4575// whether the PAIR is clean or not. It will be the responsibility of
4576// end_checkpoint or client threads to simply clear the pending bit
4577// if the PAIR is clean.
4578//
// On entry and exit, the pair list's read list lock is grabbed, and
4580// both pending locks are grabbed
4581//
4582void checkpointer::turn_on_pending_bits() {
4583 PAIR p = NULL;
4584 uint32_t i;
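    // Walk the circular clock list for exactly m_n_in_table steps starting at
    // the checkpoint head; the invariant after the loop checks that the walk
    // wrapped all the way back around.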
4585 for (i = 0, p = m_list->m_checkpoint_head; i < m_list->m_n_in_table; i++, p = p->clock_next) {
4586 assert(!p->checkpoint_pending);
4587 //Only include pairs belonging to cachefiles in the checkpoint
4588 if (!p->cachefile->for_checkpoint) {
4589 continue;
4590 }
4591 // Mark everything as pending a checkpoint
4592 //
4593 // The rule for the checkpoint_pending bit is as follows:
4594 // - begin_checkpoint may set checkpoint_pending to true
4595 // even though the pair lock on the node is not held.
4596 // - any thread that wants to clear the pending bit must own
4597 // the PAIR lock. Otherwise,
4598 // we may end up clearing the pending bit before the
4599 // current lock is ever released.
4600 p->checkpoint_pending = true;
4601 if (m_list->m_pending_head) {
4602 m_list->m_pending_head->pending_prev = p;
4603 }
4604 p->pending_next = m_list->m_pending_head;
4605 p->pending_prev = NULL;
4606 m_list->m_pending_head = p;
4607 }
4608 invariant(p == m_list->m_checkpoint_head);
4609}
4610
4611void checkpointer::add_background_job() {
4612 int r = bjm_add_background_job(m_checkpoint_clones_bjm);
4613 assert_zero(r);
4614}
4615void checkpointer::remove_background_job() {
4616 bjm_remove_background_job(m_checkpoint_clones_bjm);
4617}
4618
4619void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextra) {
4620 toku::scoped_malloc checkpoint_cfs_buf(m_checkpoint_num_files * sizeof(CACHEFILE));
4621 CACHEFILE *checkpoint_cfs = reinterpret_cast<CACHEFILE *>(checkpoint_cfs_buf.get());
4622
4623 this->fill_checkpoint_cfs(checkpoint_cfs);
4624 this->checkpoint_pending_pairs();
4625 this->checkpoint_userdata(checkpoint_cfs);
4626 // For testing purposes only. Dictionary has been fsync-ed to disk but log has not yet been written.
4627 if (testcallback_f) {
4628 testcallback_f(testextra);
4629 }
4630 this->log_end_checkpoint();
4631 this->end_checkpoint_userdata(checkpoint_cfs);
4632
    // Delete the list of cachefiles in the checkpoint.
4634 this->remove_cachefiles(checkpoint_cfs);
4635}
4636
4637struct iterate_checkpoint_cfs {
4638 CACHEFILE *checkpoint_cfs;
4639 uint32_t checkpoint_num_files;
4640 uint32_t curr_index;
4641 iterate_checkpoint_cfs(CACHEFILE *cfs, uint32_t num_files) :
4642 checkpoint_cfs(cfs), checkpoint_num_files(num_files), curr_index(0) {
4643 }
4644 static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_checkpoint_cfs *info) {
4645 if (cf->for_checkpoint) {
4646 assert(info->curr_index < info->checkpoint_num_files);
4647 info->checkpoint_cfs[info->curr_index] = cf;
4648 info->curr_index++;
4649 }
4650 return 0;
4651 }
4652};
4653
4654void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) {
4655 struct iterate_checkpoint_cfs iterate(checkpoint_cfs, m_checkpoint_num_files);
4656
4657 m_cf_list->read_lock();
4658 m_cf_list->m_active_fileid.iterate<struct iterate_checkpoint_cfs, iterate_checkpoint_cfs::fn>(&iterate);
4659 assert(iterate.curr_index == m_checkpoint_num_files);
4660 m_cf_list->read_unlock();
4661}
4662
4663void checkpointer::checkpoint_pending_pairs() {
4664 PAIR p;
4665 m_list->read_list_lock();
4666 while ((p = m_list->m_pending_head)!=0) {
        // <CER> TODO: Investigate why we move pending head outside of the pending_pairs_remove() call.
4668 m_list->m_pending_head = m_list->m_pending_head->pending_next;
4669 m_list->pending_pairs_remove(p);
4670 // if still pending, clear the pending bit and write out the node
4671 pair_lock(p);
4672 m_list->read_list_unlock();
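        // the read list lock is dropped while the pair is written out, since
        // writing the pair may involve disk I/O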
4673 write_pair_for_checkpoint_thread(m_ev, p);
4674 pair_unlock(p);
4675 m_list->read_list_lock();
4676 }
4677 assert(!m_list->m_pending_head);
4678 m_list->read_list_unlock();
4679 bjm_wait_for_jobs_to_finish(m_checkpoint_clones_bjm);
4680}
4681
4682void checkpointer::checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
4683 // have just written data blocks, so next write the translation and header for each open dictionary
4684 for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
4685 CACHEFILE cf = checkpoint_cfs[i];
4686 assert(cf->for_checkpoint);
4687 assert(cf->checkpoint_userdata);
4688 toku_cachetable_set_checkpointing_user_data_status(1);
4689 cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
4690 toku_cachetable_set_checkpointing_user_data_status(0);
4691 }
4692}
4693
4694void checkpointer::log_end_checkpoint() {
4695 if (m_logger) {
4696 toku_log_end_checkpoint(m_logger, NULL,
4697 1, // want the end_checkpoint to be fsync'd
4698 m_lsn_of_checkpoint_in_progress,
4699 0,
4700 m_checkpoint_num_files,
4701 m_checkpoint_num_txns);
4702 toku_logger_note_checkpoint(m_logger, m_lsn_of_checkpoint_in_progress);
4703 }
4704}
4705
4706void checkpointer::end_checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
4707 // everything has been written to file and fsynced
4708 // ... call checkpoint-end function in block translator
4709 // to free obsolete blocks on disk used by previous checkpoint
4710 //cachefiles_in_checkpoint is protected by the checkpoint_safe_lock
4711 for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
4712 CACHEFILE cf = checkpoint_cfs[i];
4713 assert(cf->for_checkpoint);
4714 assert(cf->end_checkpoint_userdata);
4715 cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
4716 }
4717}
4718
4719//
// Removes all the cachefiles in this checkpointer's cachefile list from the checkpoint.
4721//
4722void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) {
    // note_unpin_by_checkpoint may destroy the cachefile, so cf must not be used after that callback returns
4724 for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
4725 CACHEFILE cf = checkpoint_cfs[i];
4726 // Checking for function existing so that this function
4727 // can be called from cachetable tests.
4728 assert(cf->for_checkpoint);
4729 cf->for_checkpoint = false;
4730 assert(cf->note_unpin_by_checkpoint);
        // Clear the bit saying this file is in the checkpoint.
4732 cf->note_unpin_by_checkpoint(cf, cf->userdata);
4733 }
4734}
4735
4736
4737////////////////////////////////////////////////////////
4738//
4739// cachefiles list
4740//
4741static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD");
4742
4743void cachefile_list::init() {
4744 m_next_filenum_to_use.fileid = 0;
4745 m_next_hash_id_to_use = 0;
4746 toku_pthread_rwlock_init(*cachetable_m_lock_key, &m_lock, nullptr);
4747 m_active_filenum.create();
4748 m_active_fileid.create();
4749 m_stale_fileid.create();
4750}
4751
4752void cachefile_list::destroy() {
4753 m_active_filenum.destroy();
4754 m_active_fileid.destroy();
4755 m_stale_fileid.destroy();
4756 toku_pthread_rwlock_destroy(&m_lock);
4757}
4758
4759void cachefile_list::read_lock() {
4760 toku_pthread_rwlock_rdlock(&m_lock);
4761}
4762
4763void cachefile_list::read_unlock() {
4764 toku_pthread_rwlock_rdunlock(&m_lock);
4765}
4766
4767void cachefile_list::write_lock() {
4768 toku_pthread_rwlock_wrlock(&m_lock);
4769}
4770
4771void cachefile_list::write_unlock() {
4772 toku_pthread_rwlock_wrunlock(&m_lock);
4773}
4774
4775struct iterate_find_iname {
4776 const char *iname_in_env;
4777 CACHEFILE found_cf;
4778 iterate_find_iname(const char *iname) : iname_in_env(iname), found_cf(nullptr) { }
4779 static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_find_iname *info) {
4780 if (cf->fname_in_env && strcmp(cf->fname_in_env, info->iname_in_env) == 0) {
4781 info->found_cf = cf;
4782 return -1;
4783 }
4784 return 0;
4785 }
4786};
4787
4788int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf) {
4789 struct iterate_find_iname iterate(iname_in_env);
4790
4791 read_lock();
4792 int r = m_active_fileid.iterate<iterate_find_iname, iterate_find_iname::fn>(&iterate);
4793 if (iterate.found_cf != nullptr) {
4794 assert(strcmp(iterate.found_cf->fname_in_env, iname_in_env) == 0);
4795 *cf = iterate.found_cf;
4796 r = 0;
4797 } else {
4798 r = ENOENT;
4799 }
4800 read_unlock();
4801 return r;
4802}
4803
4804static int cachefile_find_by_filenum(const CACHEFILE &a_cf, const FILENUM &b) {
4805 const FILENUM a = a_cf->filenum;
4806 if (a.fileid < b.fileid) {
4807 return -1;
4808 } else if (a.fileid == b.fileid) {
4809 return 0;
4810 } else {
4811 return 1;
4812 }
4813}
4814
4815int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
4816 read_lock();
4817 int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, cf, nullptr);
4818 if (r == DB_NOTFOUND) {
4819 r = ENOENT;
4820 } else {
4821 invariant_zero(r);
4822 }
4823 read_unlock();
4824 return r;
4825}
4826
4827static int cachefile_find_by_fileid(const CACHEFILE &a_cf, const struct fileid &b) {
4828 return toku_fileid_cmp(a_cf->fileid, b);
4829}
4830
4831void cachefile_list::add_cf_unlocked(CACHEFILE cf) {
4832 int r;
4833 r = m_active_filenum.insert<FILENUM, cachefile_find_by_filenum>(cf, cf->filenum, nullptr);
4834 assert_zero(r);
4835 r = m_active_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
4836 assert_zero(r);
4837}
4838
4839void cachefile_list::add_stale_cf(CACHEFILE cf) {
4840 write_lock();
4841 int r = m_stale_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
4842 assert_zero(r);
4843 write_unlock();
4844}
4845
4846void cachefile_list::remove_cf(CACHEFILE cf) {
4847 write_lock();
4848
4849 uint32_t idx;
4850 int r;
4851 r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(cf->filenum, nullptr, &idx);
4852 assert_zero(r);
4853 r = m_active_filenum.delete_at(idx);
4854 assert_zero(r);
4855
4856 r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx);
4857 assert_zero(r);
4858 r = m_active_fileid.delete_at(idx);
4859 assert_zero(r);
4860
4861 write_unlock();
4862}
4863
4864void cachefile_list::remove_stale_cf_unlocked(CACHEFILE cf) {
4865 uint32_t idx;
4866 int r;
4867 r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx);
4868 assert_zero(r);
4869 r = m_stale_fileid.delete_at(idx);
4870 assert_zero(r);
4871}
4872
4873FILENUM cachefile_list::reserve_filenum() {
4874 // taking a write lock because we are modifying next_filenum_to_use
4875 FILENUM filenum = FILENUM_NONE;
4876 write_lock();
4877 while (1) {
4878 int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(m_next_filenum_to_use, nullptr, nullptr);
4879 if (r == 0) {
4880 m_next_filenum_to_use.fileid++;
4881 continue;
4882 }
4883 assert(r == DB_NOTFOUND);
4884
4885 // skip the reserved value UINT32_MAX and wrap around to zero
4886 if (m_next_filenum_to_use.fileid == FILENUM_NONE.fileid) {
4887 m_next_filenum_to_use.fileid = 0;
4888 continue;
4889 }
4890
4891 filenum = m_next_filenum_to_use;
4892 m_next_filenum_to_use.fileid++;
4893 break;
4894 }
4895 write_unlock();
4896 return filenum;
4897}
4898
4899uint32_t cachefile_list::get_new_hash_id_unlocked() {
4900 uint32_t retval = m_next_hash_id_to_use;
4901 m_next_hash_id_to_use++;
4902 return retval;
4903}
4904
4905CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) {
4906 CACHEFILE cf = nullptr;
4907 int r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr);
4908 if (r == 0) {
4909 assert(!cf->unlink_on_close);
4910 }
4911 return cf;
4912}
4913
4914CACHEFILE cachefile_list::find_stale_cachefile_unlocked(struct fileid* fileid) {
4915 CACHEFILE cf = nullptr;
4916 int r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr);
4917 if (r == 0) {
4918 assert(!cf->unlink_on_close);
4919 }
4920 return cf;
4921}
4922
4923void cachefile_list::verify_unused_filenum(FILENUM filenum) {
4924 int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, nullptr, nullptr);
4925 assert(r == DB_NOTFOUND);
4926}
4927
4928// returns true if some eviction ran, false otherwise
4929bool cachefile_list::evict_some_stale_pair(evictor* ev) {
4930 write_lock();
4931 if (m_stale_fileid.size() == 0) {
4932 write_unlock();
4933 return false;
4934 }
4935
4936 CACHEFILE stale_cf = nullptr;
4937 int r = m_stale_fileid.fetch(0, &stale_cf);
4938 assert_zero(r);
4939
4940 // we should not have a cf in the stale list
4941 // that does not have any pairs
4942 PAIR p = stale_cf->cf_head;
4943 paranoid_invariant(p != NULL);
4944 evict_pair_from_cachefile(p);
4945
4946 // now that we have evicted something,
4947 // let's check if the cachefile is needed anymore
4948 //
4949 // it is not needed if the latest eviction caused
4950 // the cf_head for that cf to become null
4951 bool destroy_cf = stale_cf->cf_head == nullptr;
4952 if (destroy_cf) {
4953 remove_stale_cf_unlocked(stale_cf);
4954 }
4955
4956 write_unlock();
4957
4958 ev->remove_pair_attr(p->attr);
4959 cachetable_free_pair(p);
4960 if (destroy_cf) {
4961 cachefile_destroy(stale_cf);
4962 }
4963 return true;
4964}
4965
4966void cachefile_list::free_stale_data(evictor* ev) {
4967 write_lock();
4968 while (m_stale_fileid.size() != 0) {
4969 CACHEFILE stale_cf = nullptr;
4970 int r = m_stale_fileid.fetch(0, &stale_cf);
4971 assert_zero(r);
4972
4973 // we should not have a cf in the stale list
4974 // that does not have any pairs
4975 PAIR p = stale_cf->cf_head;
4976 paranoid_invariant(p != NULL);
4977
4978 evict_pair_from_cachefile(p);
4979 ev->remove_pair_attr(p->attr);
4980 cachetable_free_pair(p);
4981
4982 // now that we have evicted something,
4983 // let's check if the cachefile is needed anymore
4984 if (stale_cf->cf_head == NULL) {
4985 remove_stale_cf_unlocked(stale_cf);
4986 cachefile_destroy(stale_cf);
4987 }
4988 }
4989 write_unlock();
4990}
4991
4992void __attribute__((__constructor__)) toku_cachetable_helgrind_ignore(void);
4993void
4994toku_cachetable_helgrind_ignore(void) {
4995 TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss);
4996 TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
4997 TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
4998 TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions);
4999 TOKU_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions);
5000 TOKU_VALGRIND_HG_DISABLE_CHECKING(&ct_status, sizeof ct_status);
5001}
5002