1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include <my_global.h> |
40 | #include <string.h> |
41 | #include <time.h> |
42 | #include <stdarg.h> |
43 | |
44 | #include <portability/memory.h> |
45 | #include <portability/toku_race_tools.h> |
46 | #include <portability/toku_atomic.h> |
47 | #include <portability/toku_pthread.h> |
48 | #include <portability/toku_portability.h> |
49 | #include <portability/toku_stdlib.h> |
50 | #include <portability/toku_time.h> |
51 | |
52 | #include "ft/cachetable/cachetable.h" |
53 | #include "ft/cachetable/cachetable-internal.h" |
54 | #include "ft/cachetable/checkpoint.h" |
55 | #include "ft/logger/log-internal.h" |
56 | #include "util/rwlock.h" |
57 | #include "util/scoped_malloc.h" |
58 | #include "util/status.h" |
59 | #include "util/context.h" |
60 | |
61 | toku_instr_key *cachetable_m_mutex_key; |
62 | toku_instr_key *cachetable_ev_thread_lock_mutex_key; |
63 | |
64 | toku_instr_key *cachetable_m_list_lock_key; |
65 | toku_instr_key *cachetable_m_pending_lock_expensive_key; |
66 | toku_instr_key *cachetable_m_pending_lock_cheap_key; |
67 | toku_instr_key *cachetable_m_lock_key; |
68 | |
69 | toku_instr_key *cachetable_value_key; |
70 | toku_instr_key *cachetable_disk_nb_rwlock_key; |
71 | |
72 | toku_instr_key *cachetable_p_refcount_wait_key; |
73 | toku_instr_key *cachetable_m_flow_control_cond_key; |
74 | toku_instr_key *cachetable_m_ev_thread_cond_key; |
75 | |
76 | toku_instr_key *cachetable_disk_nb_mutex_key; |
77 | toku_instr_key *log_internal_lock_mutex_key; |
78 | toku_instr_key *eviction_thread_key; |
79 | |
80 | /////////////////////////////////////////////////////////////////////////////////// |
81 | // Engine status |
82 | // |
83 | // Status is intended for display to humans to help understand system behavior. |
84 | // It does not need to be perfectly thread-safe. |
85 | |
86 | // These should be in the cachetable object, but we make them file-wide so that gdb can get them easily. |
// They were left here (rather than moved into the status struct) after the engine status cleanup (#2949)
// so they remain easily available to the debugger and save a lot of typing.
89 | static uint64_t cachetable_miss; |
90 | static uint64_t cachetable_misstime; // time spent waiting for disk read |
91 | static uint64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable? |
92 | static uint64_t cachetable_evictions; |
93 | static uint64_t cleaner_executions; // number of times the cleaner thread's loop has executed |
94 | |
95 | |
96 | // Note, toku_cachetable_get_status() is below, after declaration of cachetable. |
97 | |
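// Empty defaults for a PAIR whose contents have not yet been read in;
// zero_attr, for example, is used to seed new_attr in do_partial_fetch below.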
98 | static void * const zero_value = nullptr; |
99 | static PAIR_ATTR const zero_attr = { |
100 | .size = 0, |
101 | .nonleaf_size = 0, |
102 | .leaf_size = 0, |
103 | .rollback_size = 0, |
104 | .cache_pressure_size = 0, |
105 | .is_valid = true |
106 | }; |
107 | |
108 | |
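// Destroy a PAIR once its last reference has been released: requires refcount == 0;
// tears down its locks and condition variable before freeing the struct.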
109 | static inline void ctpair_destroy(PAIR p) { |
110 | p->value_rwlock.deinit(); |
111 | paranoid_invariant(p->refcount == 0); |
112 | nb_mutex_destroy(&p->disk_nb_mutex); |
113 | toku_cond_destroy(&p->refcount_wait); |
114 | toku_free(p); |
115 | } |
116 | |
117 | static inline void pair_lock(PAIR p) { |
118 | toku_mutex_lock(p->mutex); |
119 | } |
120 | |
121 | static inline void pair_unlock(PAIR p) { |
122 | toku_mutex_unlock(p->mutex); |
123 | } |
124 | |
125 | // adds a reference to the PAIR |
126 | // on input and output, PAIR mutex is held |
127 | static void pair_add_ref_unlocked(PAIR p) { |
128 | p->refcount++; |
129 | } |
130 | |
131 | // releases a reference to the PAIR |
132 | // on input and output, PAIR mutex is held |
133 | static void pair_release_ref_unlocked(PAIR p) { |
134 | paranoid_invariant(p->refcount > 0); |
135 | p->refcount--; |
136 | if (p->refcount == 0 && p->num_waiting_on_refs > 0) { |
137 | toku_cond_broadcast(&p->refcount_wait); |
138 | } |
139 | } |
140 | |
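// Block until every reference to the PAIR has been released.
// The PAIR mutex must be held; toku_cond_wait releases it while sleeping.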
141 | static void pair_wait_for_ref_release_unlocked(PAIR p) { |
142 | p->num_waiting_on_refs++; |
143 | while (p->refcount > 0) { |
144 | toku_cond_wait(&p->refcount_wait, p->mutex); |
145 | } |
146 | p->num_waiting_on_refs--; |
147 | } |
148 | |
149 | bool toku_ctpair_is_write_locked(PAIR pair) { |
150 | return pair->value_rwlock.writers() == 1; |
151 | } |
152 | |
153 | void |
154 | toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) { |
155 | ct_status.init(); |
156 | CT_STATUS_VAL(CT_MISS) = cachetable_miss; |
157 | CT_STATUS_VAL(CT_MISSTIME) = cachetable_misstime; |
158 | CT_STATUS_VAL(CT_PREFETCHES) = cachetable_prefetches; |
159 | CT_STATUS_VAL(CT_EVICTIONS) = cachetable_evictions; |
160 | CT_STATUS_VAL(CT_CLEANER_EXECUTIONS) = cleaner_executions; |
161 | CT_STATUS_VAL(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct); |
162 | CT_STATUS_VAL(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct); |
163 | toku_kibbutz_get_status(ct->client_kibbutz, |
164 | &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS), |
165 | &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS_ACTIVE), |
166 | &CT_STATUS_VAL(CT_POOL_CLIENT_QUEUE_SIZE), |
167 | &CT_STATUS_VAL(CT_POOL_CLIENT_MAX_QUEUE_SIZE), |
168 | &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED), |
169 | &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME)); |
170 | toku_kibbutz_get_status(ct->ct_kibbutz, |
171 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS), |
172 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE), |
173 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_QUEUE_SIZE), |
174 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE), |
175 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED), |
176 | &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME)); |
177 | toku_kibbutz_get_status(ct->checkpointing_kibbutz, |
178 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS), |
179 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE), |
180 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_QUEUE_SIZE), |
181 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE), |
182 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED), |
183 | &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME)); |
184 | ct->ev.fill_engine_status(); |
185 | *statp = ct_status; |
186 | } |
187 | |
188 | // FIXME global with no toku prefix |
189 | void remove_background_job_from_cf(CACHEFILE cf) |
190 | { |
191 | bjm_remove_background_job(cf->bjm); |
192 | } |
193 | |
194 | // FIXME global with no toku prefix |
void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra)
196 | // The function f must call remove_background_job_from_cf when it completes |
197 | { |
198 | int r = bjm_add_background_job(cf->bjm); |
199 | // if client is adding a background job, then it must be done |
200 | // at a time when the manager is accepting background jobs, otherwise |
201 | // the client is screwing up |
202 | assert_zero(r); |
203 | toku_kibbutz_enq(cf->cachetable->client_kibbutz, f, extra); |
204 | } |
205 | |
206 | static int |
207 | checkpoint_thread (void *checkpointer_v) |
// Effect: If checkpoint_period>0 then periodically run a checkpoint.
209 | // If someone changes the checkpoint_period (calling toku_set_checkpoint_period), then the checkpoint will run sooner or later. |
//  If someone sets the checkpoint_shutdown boolean, then this thread exits.
211 | // This thread notices those changes by waiting on a condition variable. |
212 | { |
213 | CHECKPOINTER CAST_FROM_VOIDP(cp, checkpointer_v); |
214 | int r = toku_checkpoint(cp, cp->get_logger(), NULL, NULL, NULL, NULL, SCHEDULED_CHECKPOINT); |
215 | invariant_zero(r); |
216 | return r; |
217 | } |
218 | |
219 | void toku_set_checkpoint_period (CACHETABLE ct, uint32_t new_period) { |
220 | ct->cp.set_checkpoint_period(new_period); |
221 | } |
222 | |
223 | uint32_t toku_get_checkpoint_period_unlocked (CACHETABLE ct) { |
224 | return ct->cp.get_checkpoint_period(); |
225 | } |
226 | |
227 | void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period) { |
228 | ct->cl.set_period(new_period); |
229 | } |
230 | |
231 | uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct) { |
232 | return ct->cl.get_period_unlocked(); |
233 | } |
234 | |
235 | void toku_set_cleaner_iterations (CACHETABLE ct, uint32_t new_iterations) { |
236 | ct->cl.set_iterations(new_iterations); |
237 | } |
238 | |
239 | uint32_t toku_get_cleaner_iterations (CACHETABLE ct) { |
240 | return ct->cl.get_iterations(); |
241 | } |
242 | |
243 | uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) { |
244 | return ct->cl.get_iterations(); |
245 | } |
246 | |
247 | void toku_set_enable_partial_eviction (CACHETABLE ct, bool enabled) { |
248 | ct->ev.set_enable_partial_eviction(enabled); |
249 | } |
250 | |
251 | bool toku_get_enable_partial_eviction (CACHETABLE ct) { |
252 | return ct->ev.get_enable_partial_eviction(); |
253 | } |
254 | |
255 | // reserve 25% as "unreservable". The loader cannot have it. |
256 | #define unreservable_memory(size) ((size)/4) |
257 | |
258 | int toku_cachetable_create_ex(CACHETABLE *ct_result, long size_limit, |
259 | unsigned long client_pool_threads, |
260 | unsigned long cachetable_pool_threads, |
261 | unsigned long checkpoint_pool_threads, |
262 | LSN UU(initial_lsn), TOKULOGGER logger) { |
263 | int result = 0; |
264 | int r; |
265 | |
266 | if (size_limit == 0) { |
267 | size_limit = 128*1024*1024; |
268 | } |
269 | |
270 | CACHETABLE XCALLOC(ct); |
271 | ct->list.init(); |
272 | ct->cf_list.init(); |
273 | |
274 | int num_processors = toku_os_get_number_active_processors(); |
275 | int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1; |
276 | r = toku_kibbutz_create(client_pool_threads ? client_pool_threads : num_processors, |
277 | &ct->client_kibbutz); |
278 | if (r != 0) { |
279 | result = r; |
280 | goto cleanup; |
281 | } |
282 | r = toku_kibbutz_create(cachetable_pool_threads ? cachetable_pool_threads : 2*num_processors, |
283 | &ct->ct_kibbutz); |
284 | if (r != 0) { |
285 | result = r; |
286 | goto cleanup; |
287 | } |
288 | r = toku_kibbutz_create(checkpoint_pool_threads ? checkpoint_pool_threads : checkpointing_nworkers, |
289 | &ct->checkpointing_kibbutz); |
290 | if (r != 0) { |
291 | result = r; |
292 | goto cleanup; |
293 | } |
294 | // must be done after creating ct_kibbutz |
295 | r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD); |
296 | if (r != 0) { |
297 | result = r; |
298 | goto cleanup; |
299 | } |
300 | r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list); |
301 | if (r != 0) { |
302 | result = r; |
303 | goto cleanup; |
304 | } |
305 | r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration |
306 | if (r != 0) { |
307 | result = r; |
308 | goto cleanup; |
309 | } |
310 | ct->env_dir = toku_xstrdup("." ); |
311 | cleanup: |
312 | if (result == 0) { |
313 | *ct_result = ct; |
314 | } else { |
315 | toku_cachetable_close(&ct); |
316 | } |
317 | return result; |
318 | } |
319 | |
// Returns a pointer to the checkpointer contained within
321 | // the given cachetable. |
322 | CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct) { |
323 | return &ct->cp; |
324 | } |
325 | |
326 | uint64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction, uint64_t upper_bound) { |
327 | uint64_t reserved_memory = ct->ev.reserve_memory(fraction, upper_bound); |
328 | return reserved_memory; |
329 | } |
330 | |
331 | void toku_cachetable_release_reserved_memory(CACHETABLE ct, uint64_t reserved_memory) { |
332 | ct->ev.release_reserved_memory(reserved_memory); |
333 | } |
334 | |
335 | void |
336 | toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir) { |
337 | toku_free(ct->env_dir); |
338 | ct->env_dir = toku_xstrdup(env_dir); |
339 | } |
340 | |
341 | // What cachefile goes with particular iname (iname relative to env)? |
342 | // The transaction that is adding the reference might not have a reference |
343 | // to the ft, therefore the cachefile might be closing. |
344 | // If closing, we want to return that it is not there, but must wait till after |
345 | // the close has finished. |
346 | // Once the close has finished, there must not be a cachefile with that name |
347 | // in the cachetable. |
348 | int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CACHEFILE *cf) { |
349 | return ct->cf_list.cachefile_of_iname_in_env(iname_in_env, cf); |
350 | } |
351 | |
352 | // What cachefile goes with particular fd? |
353 | // This function can only be called if the ft is still open, so file must |
354 | // still be open |
355 | int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) { |
356 | return ct->cf_list.cachefile_of_filenum(filenum, cf); |
357 | } |
358 | |
359 | // TEST-ONLY function |
360 | // If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile. |
361 | int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_in_env) { |
362 | FILENUM filenum = toku_cachetable_reserve_filenum(ct); |
363 | bool was_open; |
364 | return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_in_env, filenum, &was_open); |
365 | } |
366 | |
367 | // Get a unique filenum from the cachetable |
368 | FILENUM |
369 | toku_cachetable_reserve_filenum(CACHETABLE ct) { |
370 | return ct->cf_list.reserve_filenum(); |
371 | } |
372 | |
373 | static void create_new_cachefile( |
374 | CACHETABLE ct, |
375 | FILENUM filenum, |
376 | uint32_t hash_id, |
377 | int fd, |
378 | const char *fname_in_env, |
379 | struct fileid fileid, |
380 | CACHEFILE *cfptr |
381 | ) { |
382 | // File is not open. Make a new cachefile. |
383 | CACHEFILE newcf = NULL; |
384 | XCALLOC(newcf); |
385 | newcf->cachetable = ct; |
386 | newcf->hash_id = hash_id; |
387 | newcf->fileid = fileid; |
388 | |
389 | newcf->filenum = filenum; |
390 | newcf->fd = fd; |
391 | newcf->fname_in_env = toku_xstrdup(fname_in_env); |
392 | bjm_init(&newcf->bjm); |
393 | *cfptr = newcf; |
394 | } |
395 | |
396 | int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd, |
397 | const char *fname_in_env, |
398 | FILENUM filenum, bool* was_open) { |
399 | int r; |
400 | CACHEFILE newcf; |
401 | struct fileid fileid; |
402 | |
403 | assert(filenum.fileid != FILENUM_NONE.fileid); |
404 | r = toku_os_get_unique_file_id(fd, &fileid); |
405 | if (r != 0) { |
406 | r = get_error_errno(); |
407 | close(fd); |
408 | return r; |
409 | } |
410 | ct->cf_list.write_lock(); |
411 | CACHEFILE existing_cf = ct->cf_list.find_cachefile_unlocked(&fileid); |
412 | if (existing_cf) { |
413 | *was_open = true; |
414 | // Reuse an existing cachefile and close the caller's fd, whose |
415 | // responsibility has been passed to us. |
416 | r = close(fd); |
417 | assert(r == 0); |
418 | *cfptr = existing_cf; |
419 | r = 0; |
420 | goto exit; |
421 | } |
422 | *was_open = false; |
423 | ct->cf_list.verify_unused_filenum(filenum); |
424 | // now let's try to find it in the stale cachefiles |
425 | existing_cf = ct->cf_list.find_stale_cachefile_unlocked(&fileid); |
    // found a stale cachefile; revive it
427 | if (existing_cf) { |
428 | // fix up the fields in the cachefile |
429 | existing_cf->filenum = filenum; |
430 | existing_cf->fd = fd; |
431 | existing_cf->fname_in_env = toku_xstrdup(fname_in_env); |
432 | bjm_init(&existing_cf->bjm); |
433 | |
434 | // now we need to move all the PAIRs in it back into the cachetable |
435 | ct->list.write_list_lock(); |
436 | for (PAIR curr_pair = existing_cf->cf_head; curr_pair; curr_pair = curr_pair->cf_next) { |
437 | pair_lock(curr_pair); |
438 | ct->list.add_to_cachetable_only(curr_pair); |
439 | pair_unlock(curr_pair); |
440 | } |
441 | ct->list.write_list_unlock(); |
442 | // move the cachefile back to the list of active cachefiles |
443 | ct->cf_list.remove_stale_cf_unlocked(existing_cf); |
444 | ct->cf_list.add_cf_unlocked(existing_cf); |
445 | *cfptr = existing_cf; |
446 | r = 0; |
447 | goto exit; |
448 | } |
449 | |
450 | create_new_cachefile( |
451 | ct, |
452 | filenum, |
453 | ct->cf_list.get_new_hash_id_unlocked(), |
454 | fd, |
455 | fname_in_env, |
456 | fileid, |
457 | &newcf |
458 | ); |
459 | |
460 | ct->cf_list.add_cf_unlocked(newcf); |
461 | |
462 | *cfptr = newcf; |
463 | r = 0; |
464 | exit: |
465 | ct->cf_list.write_unlock(); |
466 | return r; |
467 | } |
468 | |
469 | static void cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf, bool evict_completely); |
470 | |
471 | //TEST_ONLY_FUNCTION |
472 | int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in_env, int flags, mode_t mode) { |
473 | char *fname_in_cwd = toku_construct_full_name(2, ct->env_dir, fname_in_env); |
474 | int fd = open(fname_in_cwd, flags+O_BINARY, mode); |
475 | int r; |
476 | if (fd < 0) { |
477 | r = get_error_errno(); |
478 | } else { |
479 | r = toku_cachetable_openfd (cfptr, ct, fd, fname_in_env); |
480 | } |
481 | toku_free(fname_in_cwd); |
482 | return r; |
483 | } |
484 | |
485 | char * |
486 | toku_cachefile_fname_in_env (CACHEFILE cf) { |
487 | if (cf) { |
488 | return cf->fname_in_env; |
489 | } |
490 | return nullptr; |
491 | } |
492 | |
493 | void toku_cachefile_set_fname_in_env(CACHEFILE cf, char *new_fname_in_env) { |
494 | cf->fname_in_env = new_fname_in_env; |
495 | } |
496 | |
497 | int |
498 | toku_cachefile_get_fd (CACHEFILE cf) { |
499 | return cf->fd; |
500 | } |
501 | |
502 | static void cachefile_destroy(CACHEFILE cf) { |
503 | if (cf->free_userdata) { |
504 | cf->free_userdata(cf, cf->userdata); |
505 | } |
506 | toku_free(cf); |
507 | } |
508 | |
509 | void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) { |
510 | CACHEFILE cf = *cfp; |
511 | CACHETABLE ct = cf->cachetable; |
512 | |
513 | bjm_wait_for_jobs_to_finish(cf->bjm); |
514 | |
515 | // Clients should never attempt to close a cachefile that is being |
516 | // checkpointed. We notify clients this is happening in the |
517 | // note_pin_by_checkpoint callback. |
518 | assert(!cf->for_checkpoint); |
519 | |
520 | // Flush the cachefile and remove all of its pairs from the cachetable, |
521 | // but keep the PAIRs linked in the cachefile. We will store the cachefile |
    // away in case it gets opened again immediately.
523 | // |
524 | // if we are unlinking on close, then we want to evict completely, |
525 | // otherwise, we will keep the PAIRs and cachefile around in case |
526 | // a subsequent open comes soon |
527 | cachetable_flush_cachefile(ct, cf, cf->unlink_on_close); |
528 | |
529 | // Call the close userdata callback to notify the client this cachefile |
530 | // and its underlying file are going to be closed |
531 | if (cf->close_userdata) { |
532 | cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn); |
533 | } |
534 | // fsync and close the fd. |
535 | toku_file_fsync_without_accounting(cf->fd); |
536 | int r = close(cf->fd); |
537 | assert(r == 0); |
538 | cf->fd = -1; |
539 | |
540 | // destroy the parts of the cachefile |
541 | // that do not persist across opens/closes |
542 | bjm_destroy(cf->bjm); |
543 | cf->bjm = NULL; |
544 | |
545 | // remove the cf from the list of active cachefiles |
546 | ct->cf_list.remove_cf(cf); |
547 | cf->filenum = FILENUM_NONE; |
548 | |
549 | // Unlink the file if the bit was set |
550 | if (cf->unlink_on_close) { |
551 | char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(cf->cachetable, cf->fname_in_env); |
552 | r = unlink(fname_in_cwd); |
553 | assert_zero(r); |
554 | toku_free(fname_in_cwd); |
555 | } |
556 | toku_free(cf->fname_in_env); |
557 | cf->fname_in_env = NULL; |
558 | |
559 | // we destroy the cf if the unlink bit was set or if no PAIRs exist |
560 | // if no PAIRs exist, there is no sense in keeping the cachefile around |
561 | bool destroy_cf = cf->unlink_on_close || (cf->cf_head == NULL); |
562 | if (destroy_cf) { |
563 | cachefile_destroy(cf); |
564 | } |
565 | else { |
566 | ct->cf_list.add_stale_cf(cf); |
567 | } |
568 | } |
569 | |
570 | // This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c |
571 | // The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number. |
572 | // Instead we can use a bitmask on a table of size power of two. |
573 | // This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan |
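// For example, with a table of 2^k buckets the bucket index can be computed as
// (hash & (2^k - 1)) rather than (hash % some_prime).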
574 | static inline uint32_t rot(uint32_t x, uint32_t k) { |
575 | return (x<<k) | (x>>(32-k)); |
576 | } |
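// Final mixing step from Jenkins' lookup3: combines three 32-bit words into one hash value.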
577 | static inline uint32_t final (uint32_t a, uint32_t b, uint32_t c) { |
578 | c ^= b; c -= rot(b,14); |
579 | a ^= c; a -= rot(c,11); |
580 | b ^= a; b -= rot(a,25); |
581 | c ^= b; c -= rot(b,16); |
582 | a ^= c; a -= rot(c,4); |
583 | b ^= a; b -= rot(a,14); |
584 | c ^= b; c -= rot(b,24); |
585 | return c; |
586 | } |
587 | |
588 | uint32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key) |
589 | // Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two. |
590 | { |
591 | return final(cachefile->hash_id, (uint32_t)(key.b>>32), (uint32_t)key.b); |
592 | } |
593 | |
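// Clock-eviction reference counts: pair_touch (below) bumps a PAIR's count up to
// CLOCK_SATURATION, so pairs that are touched more often are treated as hotter.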
594 | #define CLOCK_SATURATION 15 |
595 | #define CLOCK_INITIAL_COUNT 3 |
596 | |
597 | // Requires pair's mutex to be held |
598 | static void pair_touch (PAIR p) { |
599 | p->count = (p->count < CLOCK_SATURATION) ? p->count+1 : CLOCK_SATURATION; |
600 | } |
601 | |
602 | // Remove a pair from the cachetable, requires write list lock to be held and p->mutex to be held |
603 | // Effects: the pair is removed from the LRU list and from the cachetable's hash table. |
604 | // The size of the objects in the cachetable is adjusted by the size of the pair being |
605 | // removed. |
606 | static void cachetable_remove_pair (pair_list* list, evictor* ev, PAIR p) { |
607 | list->evict_completely(p); |
608 | ev->remove_pair_attr(p->attr); |
609 | } |
610 | |
611 | static void cachetable_free_pair(PAIR p) { |
612 | CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback; |
613 | CACHEKEY key = p->key; |
614 | void *value = p->value_data; |
615 | void* disk_data = p->disk_data; |
    void *write_extraargs = p->write_extraargs;
617 | PAIR_ATTR old_attr = p->attr; |
618 | |
619 | cachetable_evictions++; |
620 | PAIR_ATTR new_attr = p->attr; |
621 | // Note that flush_callback is called with write_me false, so the only purpose of this |
622 | // call is to tell the ft layer to evict the node (keep_me is false). |
623 | // Also, because we have already removed the PAIR from the cachetable in |
624 | // cachetable_remove_pair, we cannot pass in p->cachefile and p->cachefile->fd |
625 | // for the first two parameters, as these may be invalid (#5171), so, we |
626 | // pass in NULL and -1, dummy values |
627 | flush_callback(NULL, -1, key, value, &disk_data, write_extraargs, old_attr, &new_attr, false, false, true, false); |
628 | |
629 | ctpair_destroy(p); |
630 | } |
631 | |
632 | // assumes value_rwlock and disk_nb_mutex held on entry |
633 | // responsibility of this function is to only write a locked PAIR to disk |
634 | // and NOTHING else. We do not manipulate the state of the PAIR |
635 | // of the cachetable here (with the exception of ct->size_current for clones) |
636 | // |
637 | // No pair_list lock should be held, and the PAIR mutex should not be held |
638 | // |
639 | static void cachetable_only_write_locked_data( |
640 | evictor* ev, |
641 | PAIR p, |
642 | bool for_checkpoint, |
643 | PAIR_ATTR* new_attr, |
644 | bool is_clone |
645 | ) |
646 | { |
647 | CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback; |
648 | CACHEFILE cachefile = p->cachefile; |
649 | CACHEKEY key = p->key; |
650 | void *value = is_clone ? p->cloned_value_data : p->value_data; |
651 | void *disk_data = p->disk_data; |
    void *write_extraargs = p->write_extraargs;
653 | PAIR_ATTR old_attr; |
654 | // we do this for drd. If we are a cloned pair and only |
655 | // have the disk_nb_mutex, it is a race to access p->attr. |
656 | // Luckily, old_attr here is only used for some test applications, |
657 | // so inaccurate non-size fields are ok. |
658 | if (is_clone) { |
659 | old_attr = make_pair_attr(p->cloned_value_size); |
660 | } |
661 | else { |
662 | old_attr = p->attr; |
663 | } |
664 | bool dowrite = true; |
665 | |
666 | // write callback |
667 | flush_callback( |
668 | cachefile, |
669 | cachefile->fd, |
670 | key, |
671 | value, |
672 | &disk_data, |
673 | write_extraargs, |
674 | old_attr, |
675 | new_attr, |
676 | dowrite, |
677 | is_clone ? false : true, // keep_me (only keep if this is not cloned pointer) |
678 | for_checkpoint, |
679 | is_clone //is_clone |
680 | ); |
681 | p->disk_data = disk_data; |
682 | if (is_clone) { |
683 | p->cloned_value_data = NULL; |
684 | ev->remove_cloned_data_size(p->cloned_value_size); |
685 | p->cloned_value_size = 0; |
686 | } |
687 | } |
688 | |
689 | |
690 | // |
691 | // This function writes a PAIR's value out to disk. Currently, it is called |
692 | // by get_and_pin functions that write a PAIR out for checkpoint, by |
693 | // evictor threads that evict dirty PAIRS, and by the checkpoint thread |
694 | // that needs to write out a dirty node for checkpoint. |
695 | // |
696 | // Requires on entry for p->mutex to NOT be held, otherwise |
697 | // calling cachetable_only_write_locked_data will be very expensive |
698 | // |
699 | static void cachetable_write_locked_pair( |
700 | evictor* ev, |
701 | PAIR p, |
702 | bool for_checkpoint |
703 | ) |
704 | { |
705 | PAIR_ATTR old_attr = p->attr; |
706 | PAIR_ATTR new_attr = p->attr; |
707 | // grabbing the disk_nb_mutex here ensures that |
708 | // after this point, no one is writing out a cloned value |
709 | // if we grab the disk_nb_mutex inside the if clause, |
710 | // then we may try to evict a PAIR that is in the process |
711 | // of having its clone be written out |
712 | pair_lock(p); |
713 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
714 | pair_unlock(p); |
715 | // make sure that assumption about cloned_value_data is true |
716 | // if we have grabbed the disk_nb_mutex, then that means that |
717 | // there should be no cloned value data |
718 | assert(p->cloned_value_data == NULL); |
719 | if (p->dirty) { |
720 | cachetable_only_write_locked_data(ev, p, for_checkpoint, &new_attr, false); |
721 | // |
722 | // now let's update variables |
723 | // |
724 | if (new_attr.is_valid) { |
725 | p->attr = new_attr; |
726 | ev->change_pair_attr(old_attr, new_attr); |
727 | } |
728 | } |
729 | // the pair is no longer dirty once written |
730 | p->dirty = CACHETABLE_CLEAN; |
731 | pair_lock(p); |
732 | nb_mutex_unlock(&p->disk_nb_mutex); |
733 | pair_unlock(p); |
734 | } |
735 | |
// Worker thread function that writes and evicts a pair from memory to its cachefile
static void cachetable_evicter(void* extra) {
738 | PAIR p = (PAIR)extra; |
739 | pair_list* pl = p->list; |
740 | CACHEFILE cf = p->cachefile; |
741 | pl->read_pending_exp_lock(); |
742 | bool for_checkpoint = p->checkpoint_pending; |
743 | p->checkpoint_pending = false; |
744 | // per the contract of evictor::evict_pair, |
745 | // the pair's mutex, p->mutex, must be held on entry |
746 | pair_lock(p); |
747 | p->ev->evict_pair(p, for_checkpoint); |
748 | pl->read_pending_exp_unlock(); |
749 | bjm_remove_background_job(cf->bjm); |
750 | } |
751 | |
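// Worker thread function that performs a partial eviction of a pair and then
// removes the background job from the cachefile's background job manager.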
static void cachetable_partial_eviction(void* extra) {
753 | PAIR p = (PAIR)extra; |
754 | CACHEFILE cf = p->cachefile; |
755 | p->ev->do_partial_eviction(p); |
756 | bjm_remove_background_job(cf->bjm); |
757 | } |
758 | |
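// Exchange the in-memory values of two PAIRs.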
759 | void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair) { |
760 | void* old_value = old_pair->value_data; |
761 | void* new_value = new_pair->value_data; |
762 | old_pair->value_data = new_value; |
763 | new_pair->value_data = old_value; |
764 | } |
765 | |
766 | void toku_cachetable_maybe_flush_some(CACHETABLE ct) { |
767 | // TODO: <CER> Maybe move this... |
768 | ct->ev.signal_eviction_thread(); |
769 | } |
770 | |
771 | // Initializes a pair's members. |
772 | // |
773 | void pair_init(PAIR p, |
774 | CACHEFILE cachefile, |
775 | CACHEKEY key, |
776 | void *value, |
777 | PAIR_ATTR attr, |
778 | enum cachetable_dirty dirty, |
779 | uint32_t fullhash, |
780 | CACHETABLE_WRITE_CALLBACK write_callback, |
781 | evictor *ev, |
782 | pair_list *list) |
783 | { |
784 | p->cachefile = cachefile; |
785 | p->key = key; |
786 | p->value_data = value; |
787 | p->cloned_value_data = NULL; |
788 | p->cloned_value_size = 0; |
789 | p->disk_data = NULL; |
790 | p->attr = attr; |
791 | p->dirty = dirty; |
792 | p->fullhash = fullhash; |
793 | |
794 | p->flush_callback = write_callback.flush_callback; |
795 | p->pe_callback = write_callback.pe_callback; |
796 | p->pe_est_callback = write_callback.pe_est_callback; |
797 | p->cleaner_callback = write_callback.cleaner_callback; |
798 | p->clone_callback = write_callback.clone_callback; |
799 | p->checkpoint_complete_callback = write_callback.checkpoint_complete_callback; |
800 | p->write_extraargs = write_callback.write_extraargs; |
801 | |
802 | p->count = 0; // <CER> Is zero the correct init value? |
803 | p->refcount = 0; |
804 | p->num_waiting_on_refs = 0; |
805 | toku_cond_init(*cachetable_p_refcount_wait_key, &p->refcount_wait, nullptr); |
806 | p->checkpoint_pending = false; |
807 | |
808 | p->mutex = list->get_mutex_for_pair(fullhash); |
809 | assert(p->mutex); |
810 | p->value_rwlock.init(p->mutex |
811 | #ifdef TOKU_MYSQL_WITH_PFS |
812 | , |
813 | *cachetable_value_key |
814 | #endif |
815 | ); |
816 | nb_mutex_init(*cachetable_disk_nb_mutex_key, |
817 | *cachetable_disk_nb_rwlock_key, |
818 | &p->disk_nb_mutex); |
819 | |
820 | p->size_evicting_estimate = 0; // <CER> Is zero the correct init value? |
821 | |
822 | p->ev = ev; |
823 | p->list = list; |
824 | |
825 | p->clock_next = p->clock_prev = NULL; |
826 | p->pending_next = p->pending_prev = NULL; |
827 | p->cf_next = p->cf_prev = NULL; |
828 | p->hash_chain = NULL; |
829 | } |
830 | |
831 | // has ct locked on entry |
832 | // This function MUST NOT release and reacquire the cachetable lock |
833 | // Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior. |
834 | // |
835 | // Requires pair list's write lock to be held on entry. |
// the pair's mutex must be held as well
837 | // |
838 | // |
839 | static PAIR cachetable_insert_at(CACHETABLE ct, |
840 | CACHEFILE cachefile, CACHEKEY key, void *value, |
841 | uint32_t fullhash, |
842 | PAIR_ATTR attr, |
843 | CACHETABLE_WRITE_CALLBACK write_callback, |
844 | enum cachetable_dirty dirty) { |
845 | PAIR MALLOC(p); |
846 | assert(p); |
847 | memset(p, 0, sizeof *p); |
848 | pair_init(p, |
849 | cachefile, |
850 | key, |
851 | value, |
852 | attr, |
853 | dirty, |
854 | fullhash, |
855 | write_callback, |
856 | &ct->ev, |
857 | &ct->list |
858 | ); |
859 | |
860 | ct->list.put(p); |
861 | ct->ev.add_pair_attr(attr); |
862 | return p; |
863 | } |
864 | |
865 | // on input, the write list lock must be held AND |
// the pair's mutex must be held as well
867 | static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) { |
868 | ct->list.put(p); |
869 | ct->ev.add_pair_attr(attr); |
870 | } |
871 | |
872 | |
873 | // has ct locked on entry |
874 | // This function MUST NOT release and reacquire the cachetable lock |
875 | // Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior. |
876 | // |
877 | // Requires pair list's write lock to be held on entry |
878 | // |
879 | static void cachetable_put_internal( |
880 | CACHEFILE cachefile, |
881 | PAIR p, |
882 | void *value, |
883 | PAIR_ATTR attr, |
884 | CACHETABLE_PUT_CALLBACK put_callback |
885 | ) |
886 | { |
887 | CACHETABLE ct = cachefile->cachetable; |
888 | // |
889 | // |
890 | // TODO: (Zardosht), make code run in debug only |
891 | // |
892 | // |
893 | //PAIR dummy_p = ct->list.find_pair(cachefile, key, fullhash); |
894 | //invariant_null(dummy_p); |
895 | cachetable_insert_pair_at(ct, p, attr); |
896 | invariant_notnull(put_callback); |
897 | put_callback(p->key, value, p); |
898 | } |
899 | |
// The pair mutex (p->mutex) may or may not be held on entry;
// holding it on entry is not important for performance or correctness.
903 | // Pair is pinned on entry |
904 | static void |
905 | clone_pair(evictor* ev, PAIR p) { |
906 | PAIR_ATTR old_attr = p->attr; |
907 | PAIR_ATTR new_attr; |
908 | long clone_size = 0; |
909 | |
910 | // act of cloning should be fast, |
911 | // not sure if we have to release |
912 | // and regrab the cachetable lock, |
913 | // but doing it for now |
914 | p->clone_callback( |
915 | p->value_data, |
916 | &p->cloned_value_data, |
917 | &clone_size, |
918 | &new_attr, |
919 | true, |
920 | p->write_extraargs |
921 | ); |
922 | |
923 | // now we need to do the same actions we would do |
924 | // if the PAIR had been written to disk |
925 | // |
926 | // because we hold the value_rwlock, |
927 | // it doesn't matter whether we clear |
928 | // the pending bit before the clone |
929 | // or after the clone |
930 | p->dirty = CACHETABLE_CLEAN; |
931 | if (new_attr.is_valid) { |
932 | p->attr = new_attr; |
933 | ev->change_pair_attr(old_attr, new_attr); |
934 | } |
935 | p->cloned_value_size = clone_size; |
936 | ev->add_cloned_data_size(p->cloned_value_size); |
937 | } |
938 | |
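// Worker thread function that writes a cloned PAIR's value out for checkpoint,
// releases the PAIR's disk_nb_mutex, and removes the background job from the checkpointer.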
static void checkpoint_cloned_pair(void* extra) {
940 | PAIR p = (PAIR)extra; |
941 | CACHETABLE ct = p->cachefile->cachetable; |
942 | PAIR_ATTR new_attr; |
943 | // note that pending lock is not needed here because |
944 | // we KNOW we are in the middle of a checkpoint |
945 | // and that a begin_checkpoint cannot happen |
946 | cachetable_only_write_locked_data( |
947 | p->ev, |
948 | p, |
949 | true, //for_checkpoint |
950 | &new_attr, |
951 | true //is_clone |
952 | ); |
953 | pair_lock(p); |
954 | nb_mutex_unlock(&p->disk_nb_mutex); |
955 | pair_unlock(p); |
956 | ct->cp.remove_background_job(); |
957 | } |
958 | |
959 | static void |
960 | checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) { |
961 | toku_kibbutz_enq(ct->checkpointing_kibbutz, checkpoint_cloned_pair, p); |
962 | } |
963 | |
964 | |
965 | // |
// Given a PAIR p with the value_rwlock already held, do the following:
967 | // - If the PAIR needs to be written out to disk for checkpoint: |
968 | // - If the PAIR is cloneable, clone the PAIR and place the work |
969 | // of writing the PAIR on a background thread. |
970 | // - If the PAIR is not cloneable, write the PAIR to disk for checkpoint |
971 | // on the current thread |
972 | // |
973 | // On entry, pair's mutex is NOT held |
974 | // |
975 | static void |
976 | write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending) |
977 | { |
978 | if (checkpoint_pending && p->checkpoint_complete_callback) { |
979 | p->checkpoint_complete_callback(p->value_data); |
980 | } |
981 | if (p->dirty && checkpoint_pending) { |
982 | if (p->clone_callback) { |
983 | pair_lock(p); |
984 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
985 | pair_unlock(p); |
986 | assert(!p->cloned_value_data); |
987 | clone_pair(&ct->ev, p); |
988 | assert(p->cloned_value_data); |
989 | // place it on the background thread and continue |
990 | // responsibility of writer thread to release disk_nb_mutex |
991 | ct->cp.add_background_job(); |
992 | checkpoint_cloned_pair_on_writer_thread(ct, p); |
993 | } |
994 | else { |
995 | // The pair is not cloneable, just write the pair to disk |
996 | // we already have p->value_rwlock and we just do the write in our own thread. |
997 | cachetable_write_locked_pair(&ct->ev, p, true); // keeps the PAIR's write lock |
998 | } |
999 | } |
1000 | } |
1001 | |
1002 | // On entry and exit: hold the pair's mutex (p->mutex) |
1003 | // Method: take write lock |
1004 | // maybe write out the node |
1005 | // Else release write lock |
1006 | // |
1007 | static void |
1008 | write_pair_for_checkpoint_thread (evictor* ev, PAIR p) |
1009 | { |
1010 | // Grab an exclusive lock on the pair. |
1011 | // If we grab an expensive lock, then other threads will return |
1012 | // TRY_AGAIN rather than waiting. In production, the only time |
1013 | // another thread will check if grabbing a lock is expensive is when |
1014 | // we have a clone_callback (FTNODEs), so the act of checkpointing |
1015 | // will be cheap. Also, much of the time we'll just be clearing |
1016 | // pending bits and that's definitely cheap. (see #5427) |
1017 | p->value_rwlock.write_lock(false); |
1018 | if (p->checkpoint_pending && p->checkpoint_complete_callback) { |
1019 | p->checkpoint_complete_callback(p->value_data); |
1020 | } |
1021 | if (p->dirty && p->checkpoint_pending) { |
1022 | if (p->clone_callback) { |
1023 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
1024 | assert(!p->cloned_value_data); |
1025 | clone_pair(ev, p); |
1026 | assert(p->cloned_value_data); |
1027 | } |
1028 | else { |
1029 | // The pair is not cloneable, just write the pair to disk |
1030 | // we already have p->value_rwlock and we just do the write in our own thread. |
1031 | // this will grab and release disk_nb_mutex |
1032 | pair_unlock(p); |
1033 | cachetable_write_locked_pair(ev, p, true); // keeps the PAIR's write lock |
1034 | pair_lock(p); |
1035 | } |
1036 | p->checkpoint_pending = false; |
1037 | |
1038 | // now release value_rwlock, before we write the PAIR out |
1039 | // so that the PAIR is available to client threads |
1040 | p->value_rwlock.write_unlock(); // didn't call cachetable_evict_pair so we have to unlock it ourselves. |
1041 | if (p->clone_callback) { |
1042 | // note that pending lock is not needed here because |
1043 | // we KNOW we are in the middle of a checkpoint |
1044 | // and that a begin_checkpoint cannot happen |
1045 | PAIR_ATTR attr; |
1046 | pair_unlock(p); |
1047 | cachetable_only_write_locked_data( |
1048 | ev, |
1049 | p, |
1050 | true, //for_checkpoint |
1051 | &attr, |
1052 | true //is_clone |
1053 | ); |
1054 | pair_lock(p); |
1055 | nb_mutex_unlock(&p->disk_nb_mutex); |
1056 | } |
1057 | } |
1058 | else { |
1059 | // |
1060 | // we may clear the pending bit here because we have |
1061 | // both the cachetable lock and the PAIR lock. |
1062 | // The rule, as mentioned in toku_cachetable_begin_checkpoint, |
1063 | // is that to clear the bit, we must have both the PAIR lock |
1064 | // and the pending lock |
1065 | // |
1066 | p->checkpoint_pending = false; |
1067 | p->value_rwlock.write_unlock(); |
1068 | } |
1069 | } |
1070 | |
1071 | // |
1072 | // For each PAIR associated with these CACHEFILEs and CACHEKEYs |
1073 | // if the checkpoint_pending bit is set and the PAIR is dirty, write the PAIR |
1074 | // to disk. |
1075 | // We assume the PAIRs passed in have been locked by the client that made calls |
1076 | // into the cachetable that eventually make it here. |
1077 | // |
1078 | static void checkpoint_dependent_pairs( |
1079 | CACHETABLE ct, |
1080 | uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint |
1081 | PAIR* dependent_pairs, |
1082 | bool* checkpoint_pending, |
1083 | enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs |
1084 | ) |
1085 | { |
1086 | for (uint32_t i =0; i < num_dependent_pairs; i++) { |
1087 | PAIR curr_dep_pair = dependent_pairs[i]; |
        // we need to update the dirtiness of the dependent pair,
1089 | // because the client may have dirtied it while holding its lock, |
1090 | // and if the pair is pending a checkpoint, it needs to be written out |
1091 | if (dependent_dirty[i]) curr_dep_pair->dirty = CACHETABLE_DIRTY; |
1092 | if (checkpoint_pending[i]) { |
1093 | write_locked_pair_for_checkpoint(ct, curr_dep_pair, checkpoint_pending[i]); |
1094 | } |
1095 | } |
1096 | } |
1097 | |
1098 | void toku_cachetable_put_with_dep_pairs( |
1099 | CACHEFILE cachefile, |
1100 | CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash, |
1101 | void *value, |
1102 | PAIR_ATTR attr, |
1103 | CACHETABLE_WRITE_CALLBACK write_callback, |
1104 | void *get_key_and_fullhash_extra, |
1105 | uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint |
1106 | PAIR* dependent_pairs, |
1107 | enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs |
1108 | CACHEKEY* key, |
1109 | uint32_t* fullhash, |
1110 | CACHETABLE_PUT_CALLBACK put_callback |
1111 | ) |
1112 | { |
1113 | // |
    // need to get the key and fullhash
1115 | // |
1116 | CACHETABLE ct = cachefile->cachetable; |
1117 | if (ct->ev.should_client_thread_sleep()) { |
1118 | ct->ev.wait_for_cache_pressure_to_subside(); |
1119 | } |
1120 | if (ct->ev.should_client_wake_eviction_thread()) { |
1121 | ct->ev.signal_eviction_thread(); |
1122 | } |
1123 | |
1124 | PAIR p = NULL; |
1125 | XMALLOC(p); |
1126 | memset(p, 0, sizeof *p); |
1127 | |
1128 | ct->list.write_list_lock(); |
1129 | get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra); |
1130 | pair_init( |
1131 | p, |
1132 | cachefile, |
1133 | *key, |
1134 | value, |
1135 | attr, |
1136 | CACHETABLE_DIRTY, |
1137 | *fullhash, |
1138 | write_callback, |
1139 | &ct->ev, |
1140 | &ct->list |
1141 | ); |
1142 | pair_lock(p); |
1143 | p->value_rwlock.write_lock(true); |
1144 | cachetable_put_internal( |
1145 | cachefile, |
1146 | p, |
1147 | value, |
1148 | attr, |
1149 | put_callback |
1150 | ); |
1151 | pair_unlock(p); |
1152 | bool checkpoint_pending[num_dependent_pairs]; |
1153 | ct->list.write_pending_cheap_lock(); |
1154 | for (uint32_t i = 0; i < num_dependent_pairs; i++) { |
1155 | checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending; |
1156 | dependent_pairs[i]->checkpoint_pending = false; |
1157 | } |
1158 | ct->list.write_pending_cheap_unlock(); |
1159 | ct->list.write_list_unlock(); |
1160 | |
1161 | // |
1162 | // now that we have inserted the row, let's checkpoint the |
1163 | // dependent nodes, if they need checkpointing |
1164 | // |
1165 | checkpoint_dependent_pairs( |
1166 | ct, |
1167 | num_dependent_pairs, |
1168 | dependent_pairs, |
1169 | checkpoint_pending, |
1170 | dependent_dirty |
1171 | ); |
1172 | } |
1173 | |
1174 | void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void*value, PAIR_ATTR attr, |
1175 | CACHETABLE_WRITE_CALLBACK write_callback, |
1176 | CACHETABLE_PUT_CALLBACK put_callback |
1177 | ) { |
1178 | CACHETABLE ct = cachefile->cachetable; |
1179 | if (ct->ev.should_client_thread_sleep()) { |
1180 | ct->ev.wait_for_cache_pressure_to_subside(); |
1181 | } |
1182 | if (ct->ev.should_client_wake_eviction_thread()) { |
1183 | ct->ev.signal_eviction_thread(); |
1184 | } |
1185 | |
1186 | PAIR p = NULL; |
1187 | XMALLOC(p); |
1188 | memset(p, 0, sizeof *p); |
1189 | |
1190 | ct->list.write_list_lock(); |
1191 | pair_init( |
1192 | p, |
1193 | cachefile, |
1194 | key, |
1195 | value, |
1196 | attr, |
1197 | CACHETABLE_DIRTY, |
1198 | fullhash, |
1199 | write_callback, |
1200 | &ct->ev, |
1201 | &ct->list |
1202 | ); |
1203 | pair_lock(p); |
1204 | p->value_rwlock.write_lock(true); |
1205 | cachetable_put_internal( |
1206 | cachefile, |
1207 | p, |
1208 | value, |
1209 | attr, |
1210 | put_callback |
1211 | ); |
1212 | pair_unlock(p); |
1213 | ct->list.write_list_unlock(); |
1214 | } |
1215 | |
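// Return the current wall-clock time in microseconds since the epoch.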
1216 | static uint64_t get_tnow(void) { |
1217 | struct timeval tv; |
1218 | int r = gettimeofday(&tv, NULL); assert(r == 0); |
1219 | return tv.tv_sec * 1000000ULL + tv.tv_usec; |
1220 | } |
1221 | |
//
// Perform a partial fetch of the PAIR's value using pf_callback.
// No locks are held on entry (besides the value_rwlock write lock of the PAIR);
// on exit the write lock is released unless keep_pair_locked is true.
//
1229 | static void |
1230 | do_partial_fetch( |
1231 | CACHETABLE ct, |
1232 | CACHEFILE cachefile, |
1233 | PAIR p, |
1234 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
    void *read_extraargs,
1236 | bool keep_pair_locked |
1237 | ) |
1238 | { |
1239 | PAIR_ATTR old_attr = p->attr; |
1240 | PAIR_ATTR new_attr = zero_attr; |
1241 | // As of Dr. No, only clean PAIRs may have pieces missing, |
1242 | // so we do a sanity check here. |
1243 | assert(!p->dirty); |
1244 | |
1245 | pair_lock(p); |
1246 | invariant(p->value_rwlock.writers()); |
1247 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
1248 | pair_unlock(p); |
1249 | int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr); |
1250 | lazy_assert_zero(r); |
1251 | p->attr = new_attr; |
1252 | ct->ev.change_pair_attr(old_attr, new_attr); |
1253 | pair_lock(p); |
1254 | nb_mutex_unlock(&p->disk_nb_mutex); |
1255 | if (!keep_pair_locked) { |
1256 | p->value_rwlock.write_unlock(); |
1257 | } |
1258 | pair_unlock(p); |
1259 | } |
1260 | |
1261 | void toku_cachetable_pf_pinned_pair( |
1262 | void* value, |
1263 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
    void* read_extraargs,
1265 | CACHEFILE cf, |
1266 | CACHEKEY key, |
1267 | uint32_t fullhash |
1268 | ) |
1269 | { |
1270 | PAIR_ATTR attr; |
1271 | PAIR p = NULL; |
1272 | CACHETABLE ct = cf->cachetable; |
1273 | ct->list.pair_lock_by_fullhash(fullhash); |
1274 | p = ct->list.find_pair(cf, key, fullhash); |
1275 | assert(p != NULL); |
1276 | assert(p->value_data == value); |
1277 | assert(p->value_rwlock.writers()); |
1278 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
1279 | pair_unlock(p); |
1280 | |
1281 | int fd = cf->fd; |
1282 | pf_callback(value, p->disk_data, read_extraargs, fd, &attr); |
1283 | |
1284 | pair_lock(p); |
1285 | nb_mutex_unlock(&p->disk_nb_mutex); |
1286 | pair_unlock(p); |
1287 | } |
1288 | |
1289 | int toku_cachetable_get_and_pin ( |
1290 | CACHEFILE cachefile, |
1291 | CACHEKEY key, |
1292 | uint32_t fullhash, |
1293 | void**value, |
1294 | long *sizep, |
1295 | CACHETABLE_WRITE_CALLBACK write_callback, |
1296 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
1297 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
1298 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
1299 | bool may_modify_value, |
    void* read_extraargs // parameter for fetch_callback, pf_req_callback, and pf_callback
1301 | ) |
1302 | { |
1303 | pair_lock_type lock_type = may_modify_value ? PL_WRITE_EXPENSIVE : PL_READ; |
1304 | // We have separate parameters of read_extraargs and write_extraargs because |
1305 | // the lifetime of the two parameters are different. write_extraargs may be used |
1306 | // long after this function call (e.g. after a flush to disk), whereas read_extraargs |
1307 | // will not be used after this function returns. As a result, the caller may allocate |
1308 | // read_extraargs on the stack, whereas write_extraargs must be allocated |
1309 | // on the heap. |
1310 | return toku_cachetable_get_and_pin_with_dep_pairs ( |
1311 | cachefile, |
1312 | key, |
1313 | fullhash, |
1314 | value, |
1315 | sizep, |
1316 | write_callback, |
1317 | fetch_callback, |
1318 | pf_req_callback, |
1319 | pf_callback, |
1320 | lock_type, |
1321 | read_extraargs, |
1322 | 0, // number of dependent pairs that we may need to checkpoint |
1323 | NULL, // array of dependent pairs |
1324 | NULL // array stating dirty/cleanness of dependent pairs |
1325 | ); |
1326 | } |
1327 | |
1328 | // Read a pair from a cachefile into memory using the pair's fetch callback |
1329 | // on entry, pair mutex (p->mutex) is NOT held, but pair is pinned |
1330 | static void cachetable_fetch_pair( |
1331 | CACHETABLE ct, |
1332 | CACHEFILE cf, |
1333 | PAIR p, |
1334 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
    void* read_extraargs,
1336 | bool keep_pair_locked |
1337 | ) |
1338 | { |
1339 | // helgrind |
1340 | CACHEKEY key = p->key; |
1341 | uint32_t fullhash = p->fullhash; |
1342 | |
1343 | void *toku_value = NULL; |
1344 | void *disk_data = NULL; |
1345 | PAIR_ATTR attr; |
1346 | |
1347 | // FIXME this should be enum cachetable_dirty, right? |
1348 | int dirty = 0; |
1349 | |
1350 | pair_lock(p); |
1351 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
1352 | pair_unlock(p); |
1353 | |
1354 | int r; |
1355 | r = fetch_callback(cf, p, cf->fd, key, fullhash, &toku_value, &disk_data, &attr, &dirty, read_extraargs); |
1356 | if (dirty) { |
1357 | p->dirty = CACHETABLE_DIRTY; |
1358 | } |
1359 | assert(r == 0); |
1360 | |
1361 | p->value_data = toku_value; |
1362 | p->disk_data = disk_data; |
1363 | p->attr = attr; |
1364 | ct->ev.add_pair_attr(attr); |
1365 | pair_lock(p); |
1366 | nb_mutex_unlock(&p->disk_nb_mutex); |
1367 | if (!keep_pair_locked) { |
1368 | p->value_rwlock.write_unlock(); |
1369 | } |
1370 | pair_unlock(p); |
1371 | } |
1372 | |
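// Read and clear the PAIR's checkpoint_pending bit under the pending-cheap read lock;
// returns the bit's previous value.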
1373 | static bool get_checkpoint_pending(PAIR p, pair_list* pl) { |
1374 | bool checkpoint_pending = false; |
1375 | pl->read_pending_cheap_lock(); |
1376 | checkpoint_pending = p->checkpoint_pending; |
1377 | p->checkpoint_pending = false; |
1378 | pl->read_pending_cheap_unlock(); |
1379 | return checkpoint_pending; |
1380 | } |
1381 | |
1382 | static void checkpoint_pair_and_dependent_pairs( |
1383 | CACHETABLE ct, |
1384 | PAIR p, |
1385 | bool p_is_pending_checkpoint, |
1386 | uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint |
1387 | PAIR* dependent_pairs, |
1388 | bool* dependent_pairs_pending_checkpoint, |
1389 | enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs |
1390 | ) |
1391 | { |
1392 | |
1393 | // |
1394 | // A checkpoint must not begin while we are checking dependent pairs or pending bits. |
1395 | // Here is why. |
1396 | // |
1397 | // Now that we have all of the locks on the pairs we |
1398 | // care about, we can take care of the necessary checkpointing. |
1399 | // For each pair, we simply need to write the pair if it is |
1400 | // pending a checkpoint. If no pair is pending a checkpoint, |
1401 | // then all of this work will be done with the cachetable lock held, |
1402 | // so we don't need to worry about a checkpoint beginning |
1403 | // in the middle of any operation below. If some pair |
1404 | // is pending a checkpoint, then the checkpoint thread |
1405 | // will not complete its current checkpoint until it can |
1406 | // successfully grab a lock on the pending pair and |
1407 | // remove it from its list of pairs pending a checkpoint. |
1408 | // This cannot be done until we release the lock |
1409 | // that we have, which is not done in this function. |
1410 | // So, the point is, it is impossible for a checkpoint |
1411 | // to begin while we write any of these locked pairs |
1412 | // for checkpoint, even though writing a pair releases |
1413 | // the cachetable lock. |
1414 | // |
1415 | write_locked_pair_for_checkpoint(ct, p, p_is_pending_checkpoint); |
1416 | |
1417 | checkpoint_dependent_pairs( |
1418 | ct, |
1419 | num_dependent_pairs, |
1420 | dependent_pairs, |
1421 | dependent_pairs_pending_checkpoint, |
1422 | dependent_dirty |
1423 | ); |
1424 | } |
1425 | |
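// Release the pin on a PAIR: the read lock if read_lock_grabbed is true, otherwise the write lock.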
1426 | static void unpin_pair(PAIR p, bool read_lock_grabbed) { |
1427 | if (read_lock_grabbed) { |
1428 | p->value_rwlock.read_unlock(); |
1429 | } |
1430 | else { |
1431 | p->value_rwlock.write_unlock(); |
1432 | } |
1433 | } |
1434 | |
1435 | |
1436 | // on input, the pair's mutex is held, |
1437 | // on output, the pair's mutex is not held. |
// Returns true if the caller must try again (the pair was not pinned);
// returns false on success (the pair is pinned).
1440 | static bool try_pin_pair( |
1441 | PAIR p, |
1442 | CACHETABLE ct, |
1443 | CACHEFILE cachefile, |
1444 | pair_lock_type lock_type, |
1445 | uint32_t num_dependent_pairs, |
1446 | PAIR* dependent_pairs, |
1447 | enum cachetable_dirty* dependent_dirty, |
1448 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
1449 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
    void* read_extraargs,
1451 | bool already_slept |
1452 | ) |
1453 | { |
1454 | bool dep_checkpoint_pending[num_dependent_pairs]; |
1455 | bool try_again = true; |
1456 | bool expensive = (lock_type == PL_WRITE_EXPENSIVE); |
1457 | if (lock_type != PL_READ) { |
1458 | p->value_rwlock.write_lock(expensive); |
1459 | } |
1460 | else { |
1461 | p->value_rwlock.read_lock(); |
1462 | } |
1463 | pair_touch(p); |
1464 | pair_unlock(p); |
1465 | |
1466 | bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); |
1467 | |
1468 | if (partial_fetch_required) { |
1469 | toku::context pf_ctx(CTX_PARTIAL_FETCH); |
1470 | |
1471 | if (ct->ev.should_client_thread_sleep() && !already_slept) { |
1472 | pair_lock(p); |
1473 | unpin_pair(p, (lock_type == PL_READ)); |
1474 | pair_unlock(p); |
1475 | try_again = true; |
1476 | goto exit; |
1477 | } |
1478 | if (ct->ev.should_client_wake_eviction_thread()) { |
1479 | ct->ev.signal_eviction_thread(); |
1480 | } |
1481 | // |
        // Just because the PAIR exists does not necessarily mean that all the data the
        // caller requires is in memory. A partial fetch may be required, which is evaluated
        // above. If the variable is true, a partial fetch is required, so we must grab the
        // PAIR's write lock and then call a callback to retrieve what we need.
1486 | // |
1487 | assert(partial_fetch_required); |
1488 | // As of Dr. No, only clean PAIRs may have pieces missing, |
1489 | // so we do a sanity check here. |
1490 | assert(!p->dirty); |
1491 | |
1492 | if (lock_type == PL_READ) { |
1493 | pair_lock(p); |
1494 | p->value_rwlock.read_unlock(); |
1495 | p->value_rwlock.write_lock(true); |
1496 | pair_unlock(p); |
1497 | } |
1498 | else if (lock_type == PL_WRITE_CHEAP) { |
1499 | pair_lock(p); |
1500 | p->value_rwlock.write_unlock(); |
1501 | p->value_rwlock.write_lock(true); |
1502 | pair_unlock(p); |
1503 | } |
1504 | |
1505 | partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); |
1506 | if (partial_fetch_required) { |
1507 | do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs, true); |
1508 | } |
1509 | if (lock_type == PL_READ) { |
1510 | // |
1511 | // TODO: Zardosht, somehow ensure that a partial eviction cannot happen |
1512 | // between these two calls |
1513 | // |
1514 | pair_lock(p); |
1515 | p->value_rwlock.write_unlock(); |
1516 | p->value_rwlock.read_lock(); |
1517 | pair_unlock(p); |
1518 | } |
1519 | else if (lock_type == PL_WRITE_CHEAP) { |
1520 | pair_lock(p); |
1521 | p->value_rwlock.write_unlock(); |
1522 | p->value_rwlock.write_lock(false); |
1523 | pair_unlock(p); |
1524 | } |
        // Small hack here for #5439: for queries, pf_req_callback does some work for
        // the caller, and that information may be out of date after a write_unlock
        // followed by a relock, so we do it again.
1529 | bool pf_required = pf_req_callback(p->value_data,read_extraargs); |
1530 | assert(!pf_required); |
1531 | } |
1532 | |
1533 | if (lock_type != PL_READ) { |
1534 | ct->list.read_pending_cheap_lock(); |
1535 | bool p_checkpoint_pending = p->checkpoint_pending; |
1536 | p->checkpoint_pending = false; |
1537 | for (uint32_t i = 0; i < num_dependent_pairs; i++) { |
1538 | dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending; |
1539 | dependent_pairs[i]->checkpoint_pending = false; |
1540 | } |
1541 | ct->list.read_pending_cheap_unlock(); |
1542 | checkpoint_pair_and_dependent_pairs( |
1543 | ct, |
1544 | p, |
1545 | p_checkpoint_pending, |
1546 | num_dependent_pairs, |
1547 | dependent_pairs, |
1548 | dep_checkpoint_pending, |
1549 | dependent_dirty |
1550 | ); |
1551 | } |
1552 | |
1553 | try_again = false; |
1554 | exit: |
1555 | return try_again; |
1556 | } |
1557 | |
1558 | int toku_cachetable_get_and_pin_with_dep_pairs ( |
1559 | CACHEFILE cachefile, |
1560 | CACHEKEY key, |
1561 | uint32_t fullhash, |
1562 | void**value, |
1563 | long *sizep, |
1564 | CACHETABLE_WRITE_CALLBACK write_callback, |
1565 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
1566 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
1567 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
1568 | pair_lock_type lock_type, |
    void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
1570 | uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint |
1571 | PAIR* dependent_pairs, |
1572 | enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs |
1573 | ) |
1574 | // See cachetable/cachetable.h |
1575 | { |
1576 | CACHETABLE ct = cachefile->cachetable; |
1577 | bool wait = false; |
1578 | bool already_slept = false; |
1579 | bool dep_checkpoint_pending[num_dependent_pairs]; |
1580 | |
1581 | // |
1582 | // If in the process of pinning the node we add data to the cachetable via a partial fetch |
1583 | // or a full fetch, we may need to first sleep because there is too much data in the |
1584 | // cachetable. In those cases, we set the bool wait to true and goto try_again, so that |
1585 | // we can do our sleep and then restart the function. |
1586 | // |
1587 | beginning: |
1588 | if (wait) { |
1589 | // We shouldn't be holding the read list lock while |
1590 | // waiting for the evictor to remove pairs. |
1591 | already_slept = true; |
1592 | ct->ev.wait_for_cache_pressure_to_subside(); |
1593 | } |
1594 | |
1595 | ct->list.pair_lock_by_fullhash(fullhash); |
1596 | PAIR p = ct->list.find_pair(cachefile, key, fullhash); |
1597 | if (p) { |
1598 | // on entry, holds p->mutex (which is locked via pair_lock_by_fullhash) |
1599 | // on exit, does not hold p->mutex |
1600 | bool try_again = try_pin_pair( |
1601 | p, |
1602 | ct, |
1603 | cachefile, |
1604 | lock_type, |
1605 | num_dependent_pairs, |
1606 | dependent_pairs, |
1607 | dependent_dirty, |
1608 | pf_req_callback, |
1609 | pf_callback, |
1610 | read_extraargs, |
1611 | already_slept |
1612 | ); |
1613 | if (try_again) { |
1614 | wait = true; |
1615 | goto beginning; |
1616 | } |
1617 | else { |
1618 | goto got_value; |
1619 | } |
1620 | } |
1621 | else { |
1622 | toku::context fetch_ctx(CTX_FULL_FETCH); |
1623 | |
1624 | ct->list.pair_unlock_by_fullhash(fullhash); |
        // We only want to sleep once per call to get_and_pin. If we have already
        // slept and there is still cache pressure, then we might as well just
        // complete the call, because the sleep did not help.
        // By sleeping only once per get_and_pin, we prevent starvation and ensure
        // that we make progress (however slow) on each thread, which allows
        // assumptions of the form 'x will eventually happen'.
        // This happens only in extreme scenarios.
1632 | if (ct->ev.should_client_thread_sleep() && !already_slept) { |
1633 | wait = true; |
1634 | goto beginning; |
1635 | } |
1636 | if (ct->ev.should_client_wake_eviction_thread()) { |
1637 | ct->ev.signal_eviction_thread(); |
1638 | } |
1639 | // Since the pair was not found, we need the write list |
1640 | // lock to add it. So, we have to release the read list lock |
1641 | // first. |
1642 | ct->list.write_list_lock(); |
1643 | ct->list.pair_lock_by_fullhash(fullhash); |
1644 | p = ct->list.find_pair(cachefile, key, fullhash); |
1645 | if (p != NULL) { |
1646 | ct->list.write_list_unlock(); |
1647 | // on entry, holds p->mutex, |
1648 | // on exit, does not hold p->mutex |
1649 | bool try_again = try_pin_pair( |
1650 | p, |
1651 | ct, |
1652 | cachefile, |
1653 | lock_type, |
1654 | num_dependent_pairs, |
1655 | dependent_pairs, |
1656 | dependent_dirty, |
1657 | pf_req_callback, |
1658 | pf_callback, |
1659 | read_extraargs, |
1660 | already_slept |
1661 | ); |
1662 | if (try_again) { |
1663 | wait = true; |
1664 | goto beginning; |
1665 | } |
1666 | else { |
1667 | goto got_value; |
1668 | } |
1669 | } |
1670 | assert(p == NULL); |
1671 | |
1672 | // Insert a PAIR into the cachetable |
1673 | // NOTE: At this point we still have the write list lock held. |
1674 | p = cachetable_insert_at( |
1675 | ct, |
1676 | cachefile, |
1677 | key, |
1678 | zero_value, |
1679 | fullhash, |
1680 | zero_attr, |
1681 | write_callback, |
1682 | CACHETABLE_CLEAN |
1683 | ); |
1684 | invariant_notnull(p); |
1685 | |
1686 | // Pin the pair. |
1687 | p->value_rwlock.write_lock(true); |
1688 | pair_unlock(p); |
1689 | |
1690 | |
1691 | if (lock_type != PL_READ) { |
1692 | ct->list.read_pending_cheap_lock(); |
1693 | invariant(!p->checkpoint_pending); |
1694 | for (uint32_t i = 0; i < num_dependent_pairs; i++) { |
1695 | dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending; |
1696 | dependent_pairs[i]->checkpoint_pending = false; |
1697 | } |
1698 | ct->list.read_pending_cheap_unlock(); |
1699 | } |
1700 | // We should release the lock before we perform |
1701 | // these expensive operations. |
1702 | ct->list.write_list_unlock(); |
1703 | |
1704 | if (lock_type != PL_READ) { |
1705 | checkpoint_dependent_pairs( |
1706 | ct, |
1707 | num_dependent_pairs, |
1708 | dependent_pairs, |
1709 | dep_checkpoint_pending, |
1710 | dependent_dirty |
1711 | ); |
1712 | } |
1713 | uint64_t t0 = get_tnow(); |
1714 | |
1715 | // Retrieve the value of the PAIR from disk. |
1716 | // The pair being fetched will be marked as pending if a checkpoint happens during the |
1717 | // fetch because begin_checkpoint will mark as pending any pair that is locked even if it is clean. |
1718 | cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs, true); |
1719 | cachetable_miss++; |
1720 | cachetable_misstime += get_tnow() - t0; |
1721 | |
1722 | // If the lock_type requested was a PL_READ, we downgrade to PL_READ, |
1723 | // but if the request was for a PL_WRITE_CHEAP, we don't bother |
1724 | // downgrading, because we would have to possibly resolve the |
1725 | // checkpointing again, and that would just make this function even |
1726 | // messier. |
1727 | // |
1728 | // TODO(yoni): in case of PL_WRITE_CHEAP, write and use |
1729 | // p->value_rwlock.write_change_status_to_not_expensive(); (Also name it better) |
1730 | // to downgrade from an expensive write lock to a cheap one |
1731 | if (lock_type == PL_READ) { |
1732 | pair_lock(p); |
1733 | p->value_rwlock.write_unlock(); |
1734 | p->value_rwlock.read_lock(); |
1735 | pair_unlock(p); |
            // Small hack here for #5439: for queries, pf_req_callback does some work
            // for the caller, and that information may be out of date after a
            // write_unlock followed by a read_lock, so we do it again.
1740 | bool pf_required = pf_req_callback(p->value_data,read_extraargs); |
1741 | assert(!pf_required); |
1742 | } |
1743 | goto got_value; |
1744 | } |
1745 | got_value: |
1746 | *value = p->value_data; |
1747 | if (sizep) *sizep = p->attr.size; |
1748 | return 0; |
1749 | } |
1750 | |
// Lookup a key in the cachetable. If it is found and it is not being written, then
// acquire a read lock on the pair, update the LRU list, and return success.
//
// However, if the page is clean or has a checkpoint pending, don't return success.
// This will minimize the number of dirty nodes.
// Rationale: maybe_get_and_pin is used when the system has an alternative to modifying a node.
// In the context of checkpointing, we don't want to gratuitously dirty a page, because that causes an I/O.
// For example, if we could either modify a bit in a dirty parent or a bit in a clean child, we should modify
// the dirty parent (which will have to do I/O eventually anyway) rather than incur a full block write to modify one bit.
// Similarly, if the checkpoint is actually pending, we don't want to block on it.
1761 | int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) { |
1762 | CACHETABLE ct = cachefile->cachetable; |
1763 | int r = -1; |
1764 | ct->list.pair_lock_by_fullhash(fullhash); |
1765 | PAIR p = ct->list.find_pair(cachefile, key, fullhash); |
1766 | if (p) { |
1767 | const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE); |
1768 | bool got_lock = false; |
1769 | switch (lock_type) { |
1770 | case PL_READ: |
1771 | if (p->value_rwlock.try_read_lock()) { |
1772 | got_lock = p->dirty; |
1773 | |
1774 | if (!got_lock) { |
1775 | p->value_rwlock.read_unlock(); |
1776 | } |
1777 | } |
1778 | break; |
1779 | case PL_WRITE_CHEAP: |
1780 | case PL_WRITE_EXPENSIVE: |
1781 | if (p->value_rwlock.try_write_lock(lock_is_expensive)) { |
1782 | // we got the lock fast, so continue |
1783 | ct->list.read_pending_cheap_lock(); |
1784 | |
1785 | // if pending a checkpoint, then we don't want to return |
1786 | // the value to the user, because we are responsible for |
1787 | // handling the checkpointing, which we do not want to do, |
1788 | // because it is expensive |
1789 | got_lock = p->dirty && !p->checkpoint_pending; |
1790 | |
1791 | ct->list.read_pending_cheap_unlock(); |
1792 | if (!got_lock) { |
1793 | p->value_rwlock.write_unlock(); |
1794 | } |
1795 | } |
1796 | break; |
1797 | } |
1798 | if (got_lock) { |
1799 | pair_touch(p); |
1800 | *value = p->value_data; |
1801 | r = 0; |
1802 | } |
1803 | } |
1804 | ct->list.pair_unlock_by_fullhash(fullhash); |
1805 | return r; |
1806 | } |
1807 | |
1808 | //Used by flusher threads to possibly pin child on client thread if pinning is cheap |
1809 | //Same as toku_cachetable_maybe_get_and_pin except that we don't care if the node is clean or dirty (return the node regardless). |
1810 | //All other conditions remain the same. |
1811 | int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) { |
1812 | CACHETABLE ct = cachefile->cachetable; |
1813 | int r = -1; |
1814 | ct->list.pair_lock_by_fullhash(fullhash); |
1815 | PAIR p = ct->list.find_pair(cachefile, key, fullhash); |
1816 | if (p) { |
1817 | const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE); |
1818 | bool got_lock = false; |
1819 | switch (lock_type) { |
1820 | case PL_READ: |
1821 | if (p->value_rwlock.try_read_lock()) { |
1822 | got_lock = true; |
1823 | } else if (!p->value_rwlock.read_lock_is_expensive()) { |
1824 | p->value_rwlock.write_lock(lock_is_expensive); |
1825 | got_lock = true; |
1826 | } |
1827 | if (got_lock) { |
1828 | pair_touch(p); |
1829 | } |
1830 | pair_unlock(p); |
1831 | break; |
1832 | case PL_WRITE_CHEAP: |
1833 | case PL_WRITE_EXPENSIVE: |
1834 | if (p->value_rwlock.try_write_lock(lock_is_expensive)) { |
1835 | got_lock = true; |
1836 | } else if (!p->value_rwlock.write_lock_is_expensive()) { |
1837 | p->value_rwlock.write_lock(lock_is_expensive); |
1838 | got_lock = true; |
1839 | } |
1840 | if (got_lock) { |
1841 | pair_touch(p); |
1842 | } |
1843 | pair_unlock(p); |
1844 | if (got_lock) { |
1845 | bool checkpoint_pending = get_checkpoint_pending(p, &ct->list); |
1846 | write_locked_pair_for_checkpoint(ct, p, checkpoint_pending); |
1847 | } |
1848 | break; |
1849 | } |
1850 | if (got_lock) { |
1851 | *value = p->value_data; |
1852 | r = 0; |
1853 | } |
1854 | } else { |
1855 | ct->list.pair_unlock_by_fullhash(fullhash); |
1856 | } |
1857 | return r; |
1858 | } |
1859 | |
1860 | // |
1861 | // internal function to unpin a PAIR. |
// As of Clayface, this may be called in two ways:
1863 | // - with flush false |
1864 | // - with flush true |
1865 | // The first is for when this is run during run_unlockers in |
1866 | // toku_cachetable_get_and_pin_nonblocking, the second is during |
1867 | // normal operations. Only during normal operations do we want to possibly |
1868 | // induce evictions or sleep. |
1869 | // |
1870 | static int |
1871 | cachetable_unpin_internal( |
1872 | CACHEFILE cachefile, |
1873 | PAIR p, |
1874 | enum cachetable_dirty dirty, |
1875 | PAIR_ATTR attr, |
1876 | bool flush |
1877 | ) |
1878 | { |
1879 | invariant_notnull(p); |
1880 | |
1881 | CACHETABLE ct = cachefile->cachetable; |
1882 | bool added_data_to_cachetable = false; |
1883 | |
1884 | // hack for #3969, only exists in case where we run unlockers |
1885 | pair_lock(p); |
1886 | PAIR_ATTR old_attr = p->attr; |
1887 | PAIR_ATTR new_attr = attr; |
1888 | if (dirty) { |
1889 | p->dirty = CACHETABLE_DIRTY; |
1890 | } |
1891 | if (attr.is_valid) { |
1892 | p->attr = attr; |
1893 | } |
1894 | bool read_lock_grabbed = p->value_rwlock.readers() != 0; |
1895 | unpin_pair(p, read_lock_grabbed); |
1896 | pair_unlock(p); |
1897 | |
1898 | if (attr.is_valid) { |
1899 | if (new_attr.size > old_attr.size) { |
1900 | added_data_to_cachetable = true; |
1901 | } |
1902 | ct->ev.change_pair_attr(old_attr, new_attr); |
1903 | } |
1904 | |
1905 | // see comments above this function to understand this code |
1906 | if (flush && added_data_to_cachetable) { |
1907 | if (ct->ev.should_client_thread_sleep()) { |
1908 | ct->ev.wait_for_cache_pressure_to_subside(); |
1909 | } |
1910 | if (ct->ev.should_client_wake_eviction_thread()) { |
1911 | ct->ev.signal_eviction_thread(); |
1912 | } |
1913 | } |
1914 | return 0; |
1915 | } |
1916 | |
1917 | int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) { |
1918 | return cachetable_unpin_internal(cachefile, p, dirty, attr, true); |
1919 | } |
1920 | int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) { |
1921 | return cachetable_unpin_internal(cachefile, p, dirty, attr, false); |
1922 | } |
1923 | |
1924 | static void |
1925 | run_unlockers (UNLOCKERS unlockers) { |
1926 | while (unlockers) { |
1927 | assert(unlockers->locked); |
1928 | unlockers->locked = false; |
1929 | unlockers->f(unlockers->extra); |
1930 | unlockers=unlockers->next; |
1931 | } |
1932 | } |
1933 | |
1934 | // |
1935 | // This function tries to pin the pair without running the unlockers. |
1936 | // If it can pin the pair cheaply, it does so, and returns 0. |
1937 | // If the pin will be expensive, it runs unlockers, |
1938 | // pins the pair, then releases the pin, |
1939 | // and then returns TOKUDB_TRY_AGAIN |
1940 | // |
1941 | // on entry, pair mutex is held, |
1942 | // on exit, pair mutex is NOT held |
1943 | static int |
1944 | maybe_pin_pair( |
1945 | PAIR p, |
1946 | pair_lock_type lock_type, |
1947 | UNLOCKERS unlockers |
1948 | ) |
1949 | { |
1950 | int retval = 0; |
1951 | bool expensive = (lock_type == PL_WRITE_EXPENSIVE); |
1952 | |
1953 | // we can pin the PAIR. In each case, we check to see |
1954 | // if acquiring the pin is expensive. If so, we run the unlockers, set the |
1955 | // retval to TOKUDB_TRY_AGAIN, pin AND release the PAIR. |
1956 | // If not, then we pin the PAIR, keep retval at 0, and do not |
1957 | // run the unlockers, as we intend to return the value to the user |
1958 | if (lock_type == PL_READ) { |
1959 | if (p->value_rwlock.read_lock_is_expensive()) { |
1960 | pair_add_ref_unlocked(p); |
1961 | pair_unlock(p); |
1962 | run_unlockers(unlockers); |
1963 | retval = TOKUDB_TRY_AGAIN; |
1964 | pair_lock(p); |
1965 | pair_release_ref_unlocked(p); |
1966 | } |
1967 | p->value_rwlock.read_lock(); |
1968 | } |
1969 | else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP){ |
1970 | if (p->value_rwlock.write_lock_is_expensive()) { |
1971 | pair_add_ref_unlocked(p); |
1972 | pair_unlock(p); |
1973 | run_unlockers(unlockers); |
1974 | // change expensive to false because |
            // we will unpin the pair immediately
1976 | // after pinning it |
1977 | expensive = false; |
1978 | retval = TOKUDB_TRY_AGAIN; |
1979 | pair_lock(p); |
1980 | pair_release_ref_unlocked(p); |
1981 | } |
1982 | p->value_rwlock.write_lock(expensive); |
1983 | } |
1984 | else { |
1985 | abort(); |
1986 | } |
1987 | |
1988 | if (retval == TOKUDB_TRY_AGAIN) { |
1989 | unpin_pair(p, (lock_type == PL_READ)); |
1990 | } |
1991 | pair_touch(p); |
1992 | pair_unlock(p); |
1993 | return retval; |
1994 | } |
1995 | |
1996 | int toku_cachetable_get_and_pin_nonblocking( |
1997 | CACHEFILE cf, |
1998 | CACHEKEY key, |
1999 | uint32_t fullhash, |
2000 | void**value, |
2001 | long* UU(sizep), |
2002 | CACHETABLE_WRITE_CALLBACK write_callback, |
2003 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
2004 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
2005 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
2006 | pair_lock_type lock_type, |
    void *read_extraargs,
2008 | UNLOCKERS unlockers |
2009 | ) |
2010 | // See cachetable/cachetable.h. |
2011 | { |
2012 | CACHETABLE ct = cf->cachetable; |
2013 | assert(lock_type == PL_READ || |
2014 | lock_type == PL_WRITE_CHEAP || |
2015 | lock_type == PL_WRITE_EXPENSIVE |
2016 | ); |
2017 | try_again: |
2018 | ct->list.pair_lock_by_fullhash(fullhash); |
2019 | PAIR p = ct->list.find_pair(cf, key, fullhash); |
2020 | if (p == NULL) { |
2021 | toku::context fetch_ctx(CTX_FULL_FETCH); |
2022 | |
2023 | // Not found |
2024 | ct->list.pair_unlock_by_fullhash(fullhash); |
2025 | ct->list.write_list_lock(); |
2026 | ct->list.pair_lock_by_fullhash(fullhash); |
2027 | p = ct->list.find_pair(cf, key, fullhash); |
2028 | if (p != NULL) { |
            // We just did another search with the write list lock and
            // found the pair. This means that in between our
            // releasing the read list lock and grabbing the write list lock,
            // another thread snuck in and inserted the PAIR into
            // the cachetable. For simplicity, we just return
            // to the top and restart the function.
2035 | ct->list.write_list_unlock(); |
2036 | ct->list.pair_unlock_by_fullhash(fullhash); |
2037 | goto try_again; |
2038 | } |
2039 | |
2040 | p = cachetable_insert_at( |
2041 | ct, |
2042 | cf, |
2043 | key, |
2044 | zero_value, |
2045 | fullhash, |
2046 | zero_attr, |
2047 | write_callback, |
2048 | CACHETABLE_CLEAN |
2049 | ); |
2050 | assert(p); |
2051 | // grab expensive write lock, because we are about to do a fetch |
2052 | // off disk |
2053 | // No one can access this pair because |
2054 | // we hold the write list lock and we just injected |
2055 | // the pair into the cachetable. Therefore, this lock acquisition |
2056 | // will not block. |
2057 | p->value_rwlock.write_lock(true); |
2058 | pair_unlock(p); |
2059 | run_unlockers(unlockers); // we hold the write list_lock. |
2060 | ct->list.write_list_unlock(); |
2061 | |
2062 | // at this point, only the pair is pinned, |
2063 | // and no pair mutex held, and |
2064 | // no list lock is held |
2065 | uint64_t t0 = get_tnow(); |
2066 | cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, false); |
2067 | cachetable_miss++; |
2068 | cachetable_misstime += get_tnow() - t0; |
2069 | |
2070 | if (ct->ev.should_client_thread_sleep()) { |
2071 | ct->ev.wait_for_cache_pressure_to_subside(); |
2072 | } |
2073 | if (ct->ev.should_client_wake_eviction_thread()) { |
2074 | ct->ev.signal_eviction_thread(); |
2075 | } |
2076 | |
2077 | return TOKUDB_TRY_AGAIN; |
2078 | } |
2079 | else { |
2080 | int r = maybe_pin_pair(p, lock_type, unlockers); |
2081 | if (r == TOKUDB_TRY_AGAIN) { |
2082 | return TOKUDB_TRY_AGAIN; |
2083 | } |
2084 | assert_zero(r); |
2085 | |
2086 | if (lock_type != PL_READ) { |
2087 | bool checkpoint_pending = get_checkpoint_pending(p, &ct->list); |
2088 | write_locked_pair_for_checkpoint(ct, p, checkpoint_pending); |
2089 | } |
2090 | |
2091 | // At this point, we have pinned the PAIR |
2092 | // and resolved its checkpointing. The pair's |
2093 | // mutex is not held. The read list lock IS held. Before |
2094 | // returning the PAIR to the user, we must |
2095 | // still check for partial fetch |
2096 | bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); |
2097 | if (partial_fetch_required) { |
2098 | toku::context fetch_ctx(CTX_PARTIAL_FETCH); |
2099 | |
2100 | run_unlockers(unlockers); |
2101 | |
2102 | // we are now getting an expensive write lock, because we |
2103 | // are doing a partial fetch. So, if we previously have |
2104 | // either a read lock or a cheap write lock, we need to |
2105 | // release and reacquire the correct lock type |
2106 | if (lock_type == PL_READ) { |
2107 | pair_lock(p); |
2108 | p->value_rwlock.read_unlock(); |
2109 | p->value_rwlock.write_lock(true); |
2110 | pair_unlock(p); |
2111 | } |
2112 | else if (lock_type == PL_WRITE_CHEAP) { |
2113 | pair_lock(p); |
2114 | p->value_rwlock.write_unlock(); |
2115 | p->value_rwlock.write_lock(true); |
2116 | pair_unlock(p); |
2117 | } |
2118 | |
2119 | // Now wait for the I/O to occur. |
2120 | partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); |
2121 | if (partial_fetch_required) { |
2122 | do_partial_fetch(ct, cf, p, pf_callback, read_extraargs, false); |
2123 | } |
2124 | else { |
2125 | pair_lock(p); |
2126 | p->value_rwlock.write_unlock(); |
2127 | pair_unlock(p); |
2128 | } |
2129 | |
2130 | if (ct->ev.should_client_thread_sleep()) { |
2131 | ct->ev.wait_for_cache_pressure_to_subside(); |
2132 | } |
2133 | if (ct->ev.should_client_wake_eviction_thread()) { |
2134 | ct->ev.signal_eviction_thread(); |
2135 | } |
2136 | |
2137 | return TOKUDB_TRY_AGAIN; |
2138 | } |
2139 | else { |
2140 | *value = p->value_data; |
2141 | return 0; |
2142 | } |
2143 | } |
2144 | // We should not get here. Above code should hit a return in all cases. |
2145 | abort(); |
2146 | } |
2147 | |
2148 | struct cachefile_prefetch_args { |
2149 | PAIR p; |
2150 | CACHETABLE_FETCH_CALLBACK fetch_callback; |
    void* read_extraargs;
2152 | }; |
2153 | |
2154 | struct cachefile_partial_prefetch_args { |
2155 | PAIR p; |
2156 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback; |
    void *read_extraargs;
2158 | }; |
2159 | |
2160 | // Worker thread function to read a pair from a cachefile to memory |
static void cachetable_reader(void* extra) {
2162 | struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args*)extra; |
2163 | CACHEFILE cf = cpargs->p->cachefile; |
2164 | CACHETABLE ct = cf->cachetable; |
2165 | cachetable_fetch_pair( |
2166 | ct, |
2167 | cpargs->p->cachefile, |
2168 | cpargs->p, |
2169 | cpargs->fetch_callback, |
2170 | cpargs->read_extraargs, |
2171 | false |
2172 | ); |
2173 | bjm_remove_background_job(cf->bjm); |
2174 | toku_free(cpargs); |
2175 | } |
2176 | |
static void cachetable_partial_reader(void* extra) {
2178 | struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args*)extra; |
2179 | CACHEFILE cf = cpargs->p->cachefile; |
2180 | CACHETABLE ct = cf->cachetable; |
2181 | do_partial_fetch(ct, cpargs->p->cachefile, cpargs->p, cpargs->pf_callback, cpargs->read_extraargs, false); |
2182 | bjm_remove_background_job(cf->bjm); |
2183 | toku_free(cpargs); |
2184 | } |
2185 | |
2186 | int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash, |
2187 | CACHETABLE_WRITE_CALLBACK write_callback, |
2188 | CACHETABLE_FETCH_CALLBACK fetch_callback, |
2189 | CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, |
2190 | CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, |
                            void *read_extraargs,
2192 | bool *doing_prefetch) |
2193 | // Effect: See the documentation for this function in cachetable/cachetable.h |
2194 | { |
2195 | int r = 0; |
2196 | PAIR p = NULL; |
2197 | if (doing_prefetch) { |
2198 | *doing_prefetch = false; |
2199 | } |
2200 | CACHETABLE ct = cf->cachetable; |
2201 | // if cachetable has too much data, don't bother prefetching |
2202 | if (ct->ev.should_client_thread_sleep()) { |
2203 | goto exit; |
2204 | } |
2205 | ct->list.pair_lock_by_fullhash(fullhash); |
2206 | // lookup |
2207 | p = ct->list.find_pair(cf, key, fullhash); |
2208 | // if not found then create a pair and fetch it |
2209 | if (p == NULL) { |
2210 | cachetable_prefetches++; |
2211 | ct->list.pair_unlock_by_fullhash(fullhash); |
2212 | ct->list.write_list_lock(); |
2213 | ct->list.pair_lock_by_fullhash(fullhash); |
2214 | p = ct->list.find_pair(cf, key, fullhash); |
2215 | if (p != NULL) { |
2216 | ct->list.write_list_unlock(); |
2217 | goto found_pair; |
2218 | } |
2219 | |
2220 | r = bjm_add_background_job(cf->bjm); |
2221 | assert_zero(r); |
2222 | p = cachetable_insert_at( |
2223 | ct, |
2224 | cf, |
2225 | key, |
2226 | zero_value, |
2227 | fullhash, |
2228 | zero_attr, |
2229 | write_callback, |
2230 | CACHETABLE_CLEAN |
2231 | ); |
2232 | assert(p); |
2233 | p->value_rwlock.write_lock(true); |
2234 | pair_unlock(p); |
2235 | ct->list.write_list_unlock(); |
2236 | |
2237 | struct cachefile_prefetch_args *MALLOC(cpargs); |
2238 | cpargs->p = p; |
2239 | cpargs->fetch_callback = fetch_callback; |
2240 | cpargs->read_extraargs = read_extraargs; |
2241 | toku_kibbutz_enq(ct->ct_kibbutz, cachetable_reader, cpargs); |
2242 | if (doing_prefetch) { |
2243 | *doing_prefetch = true; |
2244 | } |
2245 | goto exit; |
2246 | } |
2247 | |
2248 | found_pair: |
2249 | // at this point, p is found, pair's mutex is grabbed, and |
2250 | // no list lock is held |
    // TODO(leif): should this also just go ahead and wait if all we have
    // to wait for are readers?
2253 | if (p->value_rwlock.try_write_lock(true)) { |
2254 | // nobody else is using the node, so we should go ahead and prefetch |
2255 | pair_touch(p); |
2256 | pair_unlock(p); |
2257 | bool partial_fetch_required = pf_req_callback(p->value_data, read_extraargs); |
2258 | |
2259 | if (partial_fetch_required) { |
2260 | r = bjm_add_background_job(cf->bjm); |
2261 | assert_zero(r); |
2262 | struct cachefile_partial_prefetch_args *MALLOC(cpargs); |
2263 | cpargs->p = p; |
2264 | cpargs->pf_callback = pf_callback; |
2265 | cpargs->read_extraargs = read_extraargs; |
2266 | toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_reader, cpargs); |
2267 | if (doing_prefetch) { |
2268 | *doing_prefetch = true; |
2269 | } |
2270 | } |
2271 | else { |
2272 | pair_lock(p); |
2273 | p->value_rwlock.write_unlock(); |
2274 | pair_unlock(p); |
2275 | } |
2276 | } |
2277 | else { |
2278 | // Couldn't get the write lock cheaply |
2279 | pair_unlock(p); |
2280 | } |
2281 | exit: |
2282 | return 0; |
2283 | } |
2284 | |
2285 | void toku_cachefile_verify (CACHEFILE cf) { |
2286 | toku_cachetable_verify(cf->cachetable); |
2287 | } |
2288 | |
2289 | void toku_cachetable_verify (CACHETABLE ct) { |
2290 | ct->list.verify(); |
2291 | } |
2292 | |
2293 | |
2294 | |
2295 | struct pair_flush_for_close{ |
2296 | PAIR p; |
2297 | BACKGROUND_JOB_MANAGER bjm; |
2298 | }; |
2299 | |
static void cachetable_flush_pair_for_close(void* extra) {
2301 | struct pair_flush_for_close *CAST_FROM_VOIDP(args, extra); |
2302 | PAIR p = args->p; |
2303 | CACHEFILE cf = p->cachefile; |
2304 | CACHETABLE ct = cf->cachetable; |
2305 | PAIR_ATTR attr; |
2306 | cachetable_only_write_locked_data( |
2307 | &ct->ev, |
2308 | p, |
2309 | false, // not for a checkpoint, as we assert above |
2310 | &attr, |
2311 | false // not a clone |
2312 | ); |
2313 | p->dirty = CACHETABLE_CLEAN; |
2314 | bjm_remove_background_job(args->bjm); |
2315 | toku_free(args); |
2316 | } |
2317 | |
2318 | |
2319 | static void flush_pair_for_close_on_background_thread( |
2320 | PAIR p, |
2321 | BACKGROUND_JOB_MANAGER bjm, |
2322 | CACHETABLE ct |
2323 | ) |
2324 | { |
2325 | pair_lock(p); |
2326 | assert(p->value_rwlock.users() == 0); |
2327 | assert(nb_mutex_users(&p->disk_nb_mutex) == 0); |
2328 | assert(!p->cloned_value_data); |
2329 | if (p->dirty == CACHETABLE_DIRTY) { |
2330 | int r = bjm_add_background_job(bjm); |
2331 | assert_zero(r); |
2332 | struct pair_flush_for_close *XMALLOC(args); |
2333 | args->p = p; |
2334 | args->bjm = bjm; |
2335 | toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args); |
2336 | } |
2337 | pair_unlock(p); |
2338 | } |
2339 | |
2340 | static void remove_pair_for_close(PAIR p, CACHETABLE ct, bool completely) { |
2341 | pair_lock(p); |
2342 | assert(p->value_rwlock.users() == 0); |
2343 | assert(nb_mutex_users(&p->disk_nb_mutex) == 0); |
2344 | assert(!p->cloned_value_data); |
2345 | assert(p->dirty == CACHETABLE_CLEAN); |
2346 | assert(p->refcount == 0); |
2347 | if (completely) { |
2348 | cachetable_remove_pair(&ct->list, &ct->ev, p); |
2349 | pair_unlock(p); |
2350 | // TODO: Eventually, we should not hold the write list lock during free |
2351 | cachetable_free_pair(p); |
2352 | } |
2353 | else { |
2354 | // if we are not evicting completely, |
2355 | // we only want to remove the PAIR from the cachetable, |
2356 | // that is, remove from the hashtable and various linked |
2357 | // list, but we will keep the PAIRS and the linked list |
2358 | // in the cachefile intact, as they will be cached away |
2359 | // in case an open comes soon. |
2360 | ct->list.evict_from_cachetable(p); |
2361 | pair_unlock(p); |
2362 | } |
2363 | } |
2364 | |
2365 | // helper function for cachetable_flush_cachefile, which happens on a close |
2366 | // writes out the dirty pairs on background threads and returns when |
2367 | // the writing is done |
2368 | static void write_dirty_pairs_for_close(CACHETABLE ct, CACHEFILE cf) { |
2369 | BACKGROUND_JOB_MANAGER bjm = NULL; |
2370 | bjm_init(&bjm); |
2371 | ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here |
2372 | PAIR p = NULL; |
2373 | // write out dirty PAIRs |
2374 | uint32_t i; |
2375 | if (cf) { |
2376 | for (i = 0, p = cf->cf_head; |
2377 | i < cf->num_pairs; |
2378 | i++, p = p->cf_next) |
2379 | { |
2380 | flush_pair_for_close_on_background_thread(p, bjm, ct); |
2381 | } |
2382 | } |
2383 | else { |
2384 | for (i = 0, p = ct->list.m_checkpoint_head; |
2385 | i < ct->list.m_n_in_table; |
2386 | i++, p = p->clock_next) |
2387 | { |
2388 | flush_pair_for_close_on_background_thread(p, bjm, ct); |
2389 | } |
2390 | } |
2391 | ct->list.write_list_unlock(); |
2392 | bjm_wait_for_jobs_to_finish(bjm); |
2393 | bjm_destroy(bjm); |
2394 | } |
2395 | |
2396 | static void remove_all_pairs_for_close(CACHETABLE ct, CACHEFILE cf, bool evict_completely) { |
2397 | ct->list.write_list_lock(); |
2398 | if (cf) { |
2399 | if (evict_completely) { |
2400 | // if we are evicting completely, then the PAIRs will |
2401 | // be removed from the linked list managed by the |
2402 | // cachefile, so this while loop works |
2403 | while (cf->num_pairs > 0) { |
2404 | PAIR p = cf->cf_head; |
2405 | remove_pair_for_close(p, ct, evict_completely); |
2406 | } |
2407 | } |
2408 | else { |
2409 | // on the other hand, if we are not evicting completely, |
2410 | // then the cachefile's linked list stays intact, and we must |
2411 | // iterate like this. |
2412 | for (PAIR p = cf->cf_head; p; p = p->cf_next) { |
2413 | remove_pair_for_close(p, ct, evict_completely); |
2414 | } |
2415 | } |
2416 | } |
2417 | else { |
2418 | while (ct->list.m_n_in_table > 0) { |
2419 | PAIR p = ct->list.m_checkpoint_head; |
            // If there is no cachefile, then we had better
            // be evicting completely, because we have no
            // cachefile to save the PAIRs to; at least,
            // we have no guarantee that the cachefile
            // will remain good.
2425 | invariant(evict_completely); |
2426 | remove_pair_for_close(p, ct, true); |
2427 | } |
2428 | } |
2429 | ct->list.write_list_unlock(); |
2430 | } |
2431 | |
2432 | static void verify_cachefile_flushed(CACHETABLE ct UU(), CACHEFILE cf UU()) { |
2433 | #ifdef TOKU_DEBUG_PARANOID |
2434 | // assert here that cachefile is flushed by checking |
2435 | // pair_list and finding no pairs belonging to this cachefile |
2436 | // Make a list of pairs that belong to this cachefile. |
2437 | if (cf) { |
2438 | ct->list.write_list_lock(); |
2439 | // assert here that cachefile is flushed by checking |
2440 | // pair_list and finding no pairs belonging to this cachefile |
2441 | // Make a list of pairs that belong to this cachefile. |
2442 | uint32_t i; |
2443 | PAIR p = NULL; |
2444 | for (i = 0, p = ct->list.m_checkpoint_head; |
2445 | i < ct->list.m_n_in_table; |
2446 | i++, p = p->clock_next) |
2447 | { |
2448 | assert(p->cachefile != cf); |
2449 | } |
2450 | ct->list.write_list_unlock(); |
2451 | } |
2452 | #endif |
2453 | } |
2454 | |
// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL).
2457 | // Must be holding cachetable lock on entry. |
2458 | // |
2459 | // This function assumes that no client thread is accessing or |
2460 | // trying to access the cachefile while this function is executing. |
2461 | // This implies no client thread will be trying to lock any nodes |
2462 | // belonging to the cachefile. |
2463 | // |
2464 | // This function also assumes that the cachefile is not in the process |
2465 | // of being used by a checkpoint. If a checkpoint is currently happening, |
2466 | // it does NOT include this cachefile. |
2467 | // |
2468 | static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf, bool evict_completely) { |
2469 | // |
2470 | // Because work on a kibbutz is always done by the client thread, |
2471 | // and this function assumes that no client thread is doing any work |
2472 | // on the cachefile, we assume that no client thread will be adding jobs |
2473 | // to this cachefile's kibbutz. |
2474 | // |
2475 | // The caller of this function must ensure that there are |
2476 | // no jobs added to the kibbutz. This implies that the only work other |
2477 | // threads may be doing is work by the writer threads. |
2478 | // |
2479 | // first write out dirty PAIRs |
2480 | write_dirty_pairs_for_close(ct, cf); |
2481 | |
2482 | // now that everything is clean, get rid of everything |
2483 | remove_all_pairs_for_close(ct, cf, evict_completely); |
2484 | |
2485 | verify_cachefile_flushed(ct, cf); |
2486 | } |
2487 | |
2488 | /* Requires that no locks be held that are used by the checkpoint logic */ |
2489 | void |
2490 | toku_cachetable_minicron_shutdown(CACHETABLE ct) { |
2491 | int r = ct->cp.shutdown(); |
2492 | assert(r==0); |
2493 | ct->cl.destroy(); |
2494 | } |
2495 | |
2496 | void toku_cachetable_prepare_close(CACHETABLE ct UU()) { |
2497 | extern bool toku_serialize_in_parallel; |
2498 | toku_unsafe_set(&toku_serialize_in_parallel, true); |
2499 | } |
2500 | |
2501 | /* Requires that it all be flushed. */ |
2502 | void toku_cachetable_close (CACHETABLE *ctp) { |
2503 | CACHETABLE ct = *ctp; |
2504 | ct->cp.destroy(); |
2505 | ct->cl.destroy(); |
2506 | ct->cf_list.free_stale_data(&ct->ev); |
2507 | cachetable_flush_cachefile(ct, NULL, true); |
2508 | ct->ev.destroy(); |
2509 | ct->list.destroy(); |
2510 | ct->cf_list.destroy(); |
2511 | |
2512 | if (ct->client_kibbutz) |
2513 | toku_kibbutz_destroy(ct->client_kibbutz); |
2514 | if (ct->ct_kibbutz) |
2515 | toku_kibbutz_destroy(ct->ct_kibbutz); |
2516 | if (ct->checkpointing_kibbutz) |
2517 | toku_kibbutz_destroy(ct->checkpointing_kibbutz); |
2518 | toku_free(ct->env_dir); |
2519 | toku_free(ct); |
2520 | *ctp = 0; |
2521 | } |
2522 | |
2523 | static PAIR test_get_pair(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, bool have_ct_lock) { |
2524 | CACHETABLE ct = cachefile->cachetable; |
2525 | |
2526 | if (!have_ct_lock) { |
2527 | ct->list.read_list_lock(); |
2528 | } |
2529 | |
2530 | PAIR p = ct->list.find_pair(cachefile, key, fullhash); |
2531 | assert(p != NULL); |
2532 | if (!have_ct_lock) { |
2533 | ct->list.read_list_unlock(); |
2534 | } |
2535 | return p; |
2536 | } |
2537 | |
2538 | //test-only wrapper |
2539 | int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) { |
2540 | // By default we don't have the lock |
2541 | PAIR p = test_get_pair(cachefile, key, fullhash, false); |
2542 | return toku_cachetable_unpin(cachefile, p, dirty, attr); // assume read lock is not grabbed, and that it is a write lock |
2543 | } |
2544 | |
2545 | //test-only wrapper |
2546 | int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) { |
2547 | // We hold the cachetable mutex. |
2548 | PAIR p = test_get_pair(cachefile, key, fullhash, true); |
2549 | return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr); |
2550 | } |
2551 | |
2552 | //test-only wrapper |
2553 | int toku_test_cachetable_unpin_and_remove ( |
2554 | CACHEFILE cachefile, |
2555 | CACHEKEY key, |
2556 | CACHETABLE_REMOVE_KEY remove_key, |
    void* remove_key_extra)
2558 | { |
2559 | uint32_t fullhash = toku_cachetable_hash(cachefile, key); |
2560 | PAIR p = test_get_pair(cachefile, key, fullhash, false); |
2561 | return toku_cachetable_unpin_and_remove(cachefile, p, remove_key, remove_key_extra); |
2562 | } |
2563 | |
2564 | int toku_cachetable_unpin_and_remove ( |
2565 | CACHEFILE cachefile, |
2566 | PAIR p, |
2567 | CACHETABLE_REMOVE_KEY remove_key, |
    void* remove_key_extra
2569 | ) |
2570 | { |
2571 | invariant_notnull(p); |
2572 | int r = ENOENT; |
2573 | CACHETABLE ct = cachefile->cachetable; |
2574 | |
2575 | p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it. |
2576 | // grab disk_nb_mutex to ensure any background thread writing |
2577 | // out a cloned value completes |
2578 | pair_lock(p); |
2579 | assert(p->value_rwlock.writers()); |
2580 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
2581 | pair_unlock(p); |
2582 | assert(p->cloned_value_data == NULL); |
2583 | |
2584 | // |
2585 | // take care of key removal |
2586 | // |
2587 | ct->list.write_list_lock(); |
2588 | ct->list.read_pending_cheap_lock(); |
2589 | bool for_checkpoint = p->checkpoint_pending; |
2590 | // now let's wipe out the pending bit, because we are |
2591 | // removing the PAIR |
2592 | p->checkpoint_pending = false; |
2593 | |
    // For the PAIR not to be picked by the
    // cleaner thread, we mark the cachepressure_size as 0.
    // (This is redundant since we hold the write_list_lock.)
    // This should not be an issue because we call
    // cachetable_remove_pair before
    // releasing the cachetable lock.
2600 | // |
2601 | CACHEKEY key_to_remove = p->key; |
2602 | p->attr.cache_pressure_size = 0; |
2603 | // |
2604 | // callback for removing the key |
2605 | // for FTNODEs, this leads to calling |
2606 | // toku_free_blocknum |
2607 | // |
2608 | if (remove_key) { |
2609 | remove_key( |
2610 | &key_to_remove, |
2611 | for_checkpoint, |
2612 | remove_key_extra |
2613 | ); |
2614 | } |
2615 | ct->list.read_pending_cheap_unlock(); |
2616 | |
2617 | pair_lock(p); |
2618 | p->value_rwlock.write_unlock(); |
2619 | nb_mutex_unlock(&p->disk_nb_mutex); |
2620 | // |
2621 | // As of Clayface (6.5), only these threads may be |
2622 | // blocked waiting to lock this PAIR: |
2623 | // - the checkpoint thread (because a checkpoint is in progress |
2624 | // and the PAIR was in the list of pending pairs) |
2625 | // - a client thread running get_and_pin_nonblocking, who |
2626 | // ran unlockers, then waited on the PAIR lock. |
2627 | // While waiting on a PAIR lock, another thread comes in, |
2628 | // locks the PAIR, and ends up calling unpin_and_remove, |
2629 | // all while get_and_pin_nonblocking is waiting on the PAIR lock. |
2630 | // We did not realize this at first, which caused bug #4357 |
2631 | // The following threads CANNOT be blocked waiting on |
2632 | // the PAIR lock: |
2633 | // - a thread trying to run eviction via run_eviction. |
2634 | // That cannot happen because run_eviction only |
2635 | // attempts to lock PAIRS that are not locked, and this PAIR |
2636 | // is locked. |
2637 | // - cleaner thread, for the same reason as a thread running |
2638 | // eviction |
2639 | // - client thread doing a normal get_and_pin. The client is smart |
2640 | // enough to not try to lock a PAIR that another client thread |
2641 | // is trying to unpin and remove. Note that this includes work |
2642 | // done on kibbutzes. |
2643 | // - writer thread. Writer threads do not grab PAIR locks. They |
2644 | // get PAIR locks transferred to them by client threads. |
2645 | // |
2646 | |
2647 | // first thing we do is remove the PAIR from the various |
2648 | // cachetable data structures, so no other thread can possibly |
2649 | // access it. We do not want to risk some other thread |
2650 | // trying to lock this PAIR if we release the write list lock |
2651 | // below. If some thread is already waiting on the lock, |
2652 | // then we let that thread grab the lock and finish, but |
2653 | // we don't want any NEW threads to try to grab the PAIR |
2654 | // lock. |
2655 | // |
2656 | // Because we call cachetable_remove_pair and wait, |
2657 | // the threads that may be waiting |
2658 | // on this PAIR lock must be careful to do NOTHING with the PAIR |
2659 | // As per our analysis above, we only need |
2660 | // to make sure the checkpoint thread and get_and_pin_nonblocking do |
2661 | // nothing, and looking at those functions, it is clear they do nothing. |
2662 | // |
2663 | cachetable_remove_pair(&ct->list, &ct->ev, p); |
2664 | ct->list.write_list_unlock(); |
2665 | if (p->refcount > 0) { |
2666 | pair_wait_for_ref_release_unlocked(p); |
2667 | } |
2668 | if (p->value_rwlock.users() > 0) { |
2669 | // Need to wait for everyone else to leave |
2670 | // This write lock will be granted only after all waiting |
2671 | // threads are done. |
2672 | p->value_rwlock.write_lock(true); |
2673 | assert(p->refcount == 0); |
2674 | assert(p->value_rwlock.users() == 1); // us |
2675 | assert(!p->checkpoint_pending); |
2676 | assert(p->attr.cache_pressure_size == 0); |
2677 | p->value_rwlock.write_unlock(); |
2678 | } |
2679 | // just a sanity check |
2680 | assert(nb_mutex_users(&p->disk_nb_mutex) == 0); |
2681 | assert(p->cloned_value_data == NULL); |
2682 | //Remove pair. |
2683 | pair_unlock(p); |
2684 | cachetable_free_pair(p); |
2685 | r = 0; |
2686 | return r; |
2687 | } |
2688 | |
2689 | int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array); |
2690 | int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array) { |
2691 | array[index] = toku_cachefile_filenum(ft->cf); |
2692 | return 0; |
2693 | } |
2694 | |
static int log_open_txn (TOKUTXN txn, void* extra) {
2696 | int r; |
2697 | checkpointer* cp = (checkpointer *)extra; |
2698 | TOKULOGGER logger = txn->logger; |
2699 | FILENUMS open_filenums; |
2700 | uint32_t num_filenums = txn->open_fts.size(); |
2701 | FILENUM array[num_filenums]; |
2702 | if (toku_txn_is_read_only(txn)) { |
2703 | goto cleanup; |
2704 | } |
2705 | else { |
2706 | cp->increment_num_txns(); |
2707 | } |
2708 | |
2709 | open_filenums.num = num_filenums; |
2710 | open_filenums.filenums = array; |
2711 | //Fill in open_filenums |
2712 | r = txn->open_fts.iterate<FILENUM, set_filenum_in_array>(array); |
2713 | invariant(r==0); |
2714 | switch (toku_txn_get_state(txn)) { |
2715 | case TOKUTXN_LIVE:{ |
2716 | toku_log_xstillopen(logger, NULL, 0, txn, |
2717 | toku_txn_get_txnid(txn), |
2718 | toku_txn_get_txnid(toku_logger_txn_parent(txn)), |
2719 | txn->roll_info.rollentry_raw_count, |
2720 | open_filenums, |
2721 | txn->force_fsync_on_commit, |
2722 | txn->roll_info.num_rollback_nodes, |
2723 | txn->roll_info.num_rollentries, |
2724 | txn->roll_info.spilled_rollback_head, |
2725 | txn->roll_info.spilled_rollback_tail, |
2726 | txn->roll_info.current_rollback); |
2727 | goto cleanup; |
2728 | } |
2729 | case TOKUTXN_PREPARING: { |
2730 | TOKU_XA_XID xa_xid; |
2731 | toku_txn_get_prepared_xa_xid(txn, &xa_xid); |
2732 | toku_log_xstillopenprepared(logger, NULL, 0, txn, |
2733 | toku_txn_get_txnid(txn), |
2734 | &xa_xid, |
2735 | txn->roll_info.rollentry_raw_count, |
2736 | open_filenums, |
2737 | txn->force_fsync_on_commit, |
2738 | txn->roll_info.num_rollback_nodes, |
2739 | txn->roll_info.num_rollentries, |
2740 | txn->roll_info.spilled_rollback_head, |
2741 | txn->roll_info.spilled_rollback_tail, |
2742 | txn->roll_info.current_rollback); |
2743 | goto cleanup; |
2744 | } |
2745 | case TOKUTXN_RETIRED: |
2746 | case TOKUTXN_COMMITTING: |
2747 | case TOKUTXN_ABORTING: { |
2748 | assert(0); |
2749 | } |
2750 | } |
2751 | // default is an error |
2752 | assert(0); |
2753 | cleanup: |
2754 | return 0; |
2755 | } |
2756 | |
2757 | // Requires: All three checkpoint-relevant locks must be held (see checkpoint.c). |
2758 | // Algorithm: Write a checkpoint record to the log, noting the LSN of that record. |
2759 | // Use the begin_checkpoint callback to take necessary snapshots (header, btt) |
2760 | // Mark every dirty node as "pending." ("Pending" means that the node must be |
2761 | // written to disk before it can be modified.) |
2762 | void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, TOKULOGGER UU(logger)) { |
2763 | cp->begin_checkpoint(); |
2764 | } |
2765 | |
2766 | |
2767 | // This is used by the cachetable_race test. |
2768 | static volatile int toku_checkpointing_user_data_status = 0; |
2769 | static void toku_cachetable_set_checkpointing_user_data_status (int v) { |
2770 | toku_checkpointing_user_data_status = v; |
2771 | } |
2772 | int toku_cachetable_get_checkpointing_user_data_status (void) { |
2773 | return toku_checkpointing_user_data_status; |
2774 | } |
2775 | |
2776 | // Requires: The big checkpoint lock must be held (see checkpoint.c). |
2777 | // Algorithm: Write all pending nodes to disk |
2778 | // Use checkpoint callback to write snapshot information to disk (header, btt) |
2779 | // Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks |
// Note: If testcallback is non-null (used for testing purposes only), call it after writing the dictionary but before writing the log
2781 | void toku_cachetable_end_checkpoint(CHECKPOINTER cp, TOKULOGGER UU(logger), |
                                    void (*testcallback_f)(void*), void* testextra) {
2783 | cp->end_checkpoint(testcallback_f, testextra); |
2784 | } |
2785 | |
2786 | TOKULOGGER toku_cachefile_logger (CACHEFILE cf) { |
2787 | return cf->cachetable->cp.get_logger(); |
2788 | } |
2789 | |
2790 | FILENUM toku_cachefile_filenum (CACHEFILE cf) { |
2791 | return cf->filenum; |
2792 | } |
2793 | |
2794 | // debug functions |
2795 | |
2796 | int toku_cachetable_assert_all_unpinned (CACHETABLE ct) { |
2797 | uint32_t i; |
2798 | int some_pinned=0; |
2799 | ct->list.read_list_lock(); |
2800 | for (i=0; i<ct->list.m_table_size; i++) { |
2801 | PAIR p; |
2802 | for (p=ct->list.m_table[i]; p; p=p->hash_chain) { |
2803 | pair_lock(p); |
2804 | if (p->value_rwlock.users()) { |
2805 | //printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data); |
2806 | some_pinned=1; |
2807 | } |
2808 | pair_unlock(p); |
2809 | } |
2810 | } |
2811 | ct->list.read_list_unlock(); |
2812 | return some_pinned; |
2813 | } |
2814 | |
2815 | int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) { |
2816 | assert(cf != NULL); |
2817 | int n_pinned=0; |
2818 | CACHETABLE ct = cf->cachetable; |
2819 | ct->list.read_list_lock(); |
2820 | |
2821 | // Iterate over all the pairs to find pairs specific to the |
2822 | // given cachefile. |
2823 | for (uint32_t i = 0; i < ct->list.m_table_size; i++) { |
2824 | for (PAIR p = ct->list.m_table[i]; p; p = p->hash_chain) { |
2825 | if (p->cachefile == cf) { |
2826 | pair_lock(p); |
2827 | if (p->value_rwlock.users()) { |
2828 | if (print_them) { |
2829 | printf("%s:%d pinned: %" PRId64 " (%p)\n" , |
2830 | __FILE__, |
2831 | __LINE__, |
2832 | p->key.b, |
2833 | p->value_data); |
2834 | } |
2835 | n_pinned++; |
2836 | } |
2837 | pair_unlock(p); |
2838 | } |
2839 | } |
2840 | } |
2841 | |
2842 | ct->list.read_list_unlock(); |
2843 | return n_pinned; |
2844 | } |
2845 | |
2846 | void toku_cachetable_print_state (CACHETABLE ct) { |
2847 | uint32_t i; |
2848 | ct->list.read_list_lock(); |
2849 | for (i=0; i<ct->list.m_table_size; i++) { |
2850 | PAIR p = ct->list.m_table[i]; |
2851 | if (p != 0) { |
2852 | pair_lock(p); |
2853 | printf("t[%u]=" , i); |
2854 | for (p=ct->list.m_table[i]; p; p=p->hash_chain) { |
2855 | printf(" {%" PRId64 ", %p, dirty=%d, pin=%d, size=%ld}" , p->key.b, p->cachefile, (int) p->dirty, p->value_rwlock.users(), p->attr.size); |
2856 | } |
2857 | printf("\n" ); |
2858 | pair_unlock(p); |
2859 | } |
2860 | } |
2861 | ct->list.read_list_unlock(); |
2862 | } |
2863 | |
2864 | void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) { |
2865 | ct->list.get_state(num_entries_ptr, hash_size_ptr); |
2866 | ct->ev.get_state(size_current_ptr, size_limit_ptr); |
2867 | } |
2868 | |
2869 | int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr, |
2870 | int *dirty_ptr, long long *pin_ptr, long *size_ptr) { |
2871 | int r = -1; |
2872 | uint32_t fullhash = toku_cachetable_hash(cf, key); |
2873 | ct->list.read_list_lock(); |
2874 | PAIR p = ct->list.find_pair(cf, key, fullhash); |
2875 | if (p) { |
2876 | pair_lock(p); |
2877 | if (value_ptr) |
2878 | *value_ptr = p->value_data; |
2879 | if (dirty_ptr) |
2880 | *dirty_ptr = p->dirty; |
2881 | if (pin_ptr) |
2882 | *pin_ptr = p->value_rwlock.users(); |
2883 | if (size_ptr) |
2884 | *size_ptr = p->attr.size; |
2885 | r = 0; |
2886 | pair_unlock(p); |
2887 | } |
2888 | ct->list.read_list_unlock(); |
2889 | return r; |
2890 | } |
2891 | |
2892 | void |
2893 | toku_cachefile_set_userdata (CACHEFILE cf, |
2894 | void *userdata, |
2895 | void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), |
2896 | void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), |
2897 | void (*free_userdata)(CACHEFILE, void*), |
2898 | void (*checkpoint_userdata)(CACHEFILE, int, void*), |
2899 | void (*begin_checkpoint_userdata)(LSN, void*), |
2900 | void (*end_checkpoint_userdata)(CACHEFILE, int, void*), |
2901 | void (*note_pin_by_checkpoint)(CACHEFILE, void*), |
2902 | void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) { |
2903 | cf->userdata = userdata; |
2904 | cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint; |
2905 | cf->close_userdata = close_userdata; |
2906 | cf->free_userdata = free_userdata; |
2907 | cf->checkpoint_userdata = checkpoint_userdata; |
2908 | cf->begin_checkpoint_userdata = begin_checkpoint_userdata; |
2909 | cf->end_checkpoint_userdata = end_checkpoint_userdata; |
2910 | cf->note_pin_by_checkpoint = note_pin_by_checkpoint; |
2911 | cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint; |
2912 | } |
2913 | |
2914 | void *toku_cachefile_get_userdata(CACHEFILE cf) { |
2915 | return cf->userdata; |
2916 | } |
2917 | |
2918 | CACHETABLE |
2919 | toku_cachefile_get_cachetable(CACHEFILE cf) { |
2920 | return cf->cachetable; |
2921 | } |
2922 | |
2923 | CACHEFILE toku_pair_get_cachefile(PAIR pair) { |
2924 | return pair->cachefile; |
2925 | } |
2926 | |
2927 | //Only called by ft_end_checkpoint |
2928 | //Must have access to cf->fd (must be protected) |
2929 | void toku_cachefile_fsync(CACHEFILE cf) { |
2930 | toku_file_fsync(cf->fd); |
2931 | } |
2932 | |
2933 | // Make it so when the cachefile closes, the underlying file is unlinked |
2934 | void toku_cachefile_unlink_on_close(CACHEFILE cf) { |
2935 | assert(!cf->unlink_on_close); |
2936 | cf->unlink_on_close = true; |
2937 | } |
2938 | |
2939 | // is this cachefile marked as unlink on close? |
2940 | bool toku_cachefile_is_unlink_on_close(CACHEFILE cf) { |
2941 | return cf->unlink_on_close; |
2942 | } |
2943 | |
2944 | void toku_cachefile_skip_log_recover_on_close(CACHEFILE cf) { |
2945 | cf->skip_log_recover_on_close = true; |
2946 | } |
2947 | |
2948 | void toku_cachefile_do_log_recover_on_close(CACHEFILE cf) { |
2949 | cf->skip_log_recover_on_close = false; |
2950 | } |
2951 | |
2952 | bool toku_cachefile_is_skip_log_recover_on_close(CACHEFILE cf) { |
2953 | return cf->skip_log_recover_on_close; |
2954 | } |
2955 | |
2956 | uint64_t toku_cachefile_size(CACHEFILE cf) { |
2957 | int64_t file_size; |
2958 | int fd = toku_cachefile_get_fd(cf); |
2959 | int r = toku_os_get_file_size(fd, &file_size); |
2960 | assert_zero(r); |
2961 | return file_size; |
2962 | } |
2963 | |
2964 | char * |
2965 | toku_construct_full_name(int count, ...) { |
2966 | va_list ap; |
2967 | char *name = NULL; |
2968 | size_t n = 0; |
2969 | int i; |
2970 | va_start(ap, count); |
2971 | for (i=0; i<count; i++) { |
2972 | char *arg = va_arg(ap, char *); |
2973 | if (arg) { |
2974 | n += 1 + strlen(arg) + 1; |
2975 | char *XMALLOC_N(n, newname); |
2976 | if (name && !toku_os_is_absolute_name(arg)) |
2977 | snprintf(newname, n, "%s/%s" , name, arg); |
2978 | else |
2979 | snprintf(newname, n, "%s" , arg); |
2980 | toku_free(name); |
2981 | name = newname; |
2982 | } |
2983 | } |
2984 | va_end(ap); |
2985 | |
2986 | return name; |
2987 | } |
2988 | |
2989 | char * |
2990 | toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env) { |
2991 | return toku_construct_full_name(2, ct->env_dir, fname_in_env); |
2992 | } |
2993 | |
2994 | static long |
2995 | cleaner_thread_rate_pair(PAIR p) |
2996 | { |
2997 | return p->attr.cache_pressure_size; |
2998 | } |
2999 | |
3000 | static int const CLEANER_N_TO_CHECK = 8; |
3001 | |
3002 | int toku_cleaner_thread_for_test (CACHETABLE ct) { |
3003 | return ct->cl.run_cleaner(); |
3004 | } |
3005 | |
3006 | int toku_cleaner_thread (void *cleaner_v) { |
3007 | cleaner* cl = (cleaner *) cleaner_v; |
3008 | assert(cl); |
3009 | return cl->run_cleaner(); |
3010 | } |
3011 | |
3012 | ///////////////////////////////////////////////////////////////////////// |
3013 | // |
3014 | // cleaner methods |
3015 | // |
3016 | ENSURE_POD(cleaner); |
3017 | |
3018 | int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) { |
3019 | // default is no cleaner, for now |
3020 | m_cleaner_cron_init = false; |
3021 | int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this); |
3022 | if (r == 0) { |
3023 | m_cleaner_cron_init = true; |
3024 | } |
3025 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations); |
3026 | m_cleaner_iterations = _cleaner_iterations; |
3027 | m_pl = _pl; |
3028 | m_ct = _ct; |
3029 | m_cleaner_init = true; |
3030 | return r; |
3031 | } |
3032 | |
3033 | // this function is allowed to be called multiple times |
3034 | void cleaner::destroy(void) { |
3035 | if (!m_cleaner_init) { |
3036 | return; |
3037 | } |
3038 | if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) { |
3039 | // for test code only, production code uses toku_cachetable_minicron_shutdown() |
3040 | int r = toku_minicron_shutdown(&m_cleaner_cron); |
3041 | assert(r==0); |
3042 | } |
3043 | } |
3044 | |
3045 | uint32_t cleaner::get_iterations(void) { |
3046 | return m_cleaner_iterations; |
3047 | } |
3048 | |
3049 | void cleaner::set_iterations(uint32_t new_iterations) { |
3050 | m_cleaner_iterations = new_iterations; |
3051 | } |
3052 | |
3053 | uint32_t cleaner::get_period_unlocked(void) { |
3054 | return toku_minicron_get_period_in_seconds_unlocked(&m_cleaner_cron); |
3055 | } |
3056 | |
3057 | // |
3058 | // Sets how often the cleaner thread will run, in seconds |
3059 | // |
3060 | void cleaner::set_period(uint32_t new_period) { |
3061 | toku_minicron_change_period(&m_cleaner_cron, new_period*1000); |
3062 | } |
3063 | |
3064 | // Effect: runs a cleaner. |
3065 | // |
3066 | // We look through some number of nodes, the first N that we see which are |
3067 | // unlocked and are not involved in a cachefile flush, pick one, and call |
3068 | // the cleaner callback. While we're picking a node, we have the |
3069 | // cachetable lock the whole time, so we don't need any extra |
3070 | // synchronization. Once we have one we want, we lock it and notify the |
3071 | // cachefile that we're doing some background work (so a flush won't |
3072 | // start). At this point, we can safely unlock the cachetable, do the |
3073 | // work (callback), and unlock/release our claim to the cachefile. |
3074 | int cleaner::run_cleaner(void) { |
3075 | toku::context cleaner_ctx(CTX_CLEANER); |
3076 | |
3077 | int r; |
3078 | uint32_t num_iterations = this->get_iterations(); |
3079 | for (uint32_t i = 0; i < num_iterations; ++i) { |
3080 | cleaner_executions++; |
3081 | m_pl->read_list_lock(); |
3082 | PAIR best_pair = NULL; |
3083 | int n_seen = 0; |
3084 | long best_score = 0; |
3085 | const PAIR first_pair = m_pl->m_cleaner_head; |
3086 | if (first_pair == NULL) { |
3087 | // nothing in the cachetable, just get out now |
3088 | m_pl->read_list_unlock(); |
3089 | break; |
3090 | } |
3091 | // here we select a PAIR for cleaning |
3092 | // look at some number of PAIRS, and |
3093 | // pick what we think is the best one for cleaning |
3094 | //***** IMPORTANT ****** |
3095 | // we MUST not pick a PAIR whose rating is 0. We have |
3096 | // numerous assumptions in other parts of the code that |
3097 | // this is the case: |
3098 | // - this is how rollback nodes and leaf nodes are not selected for cleaning |
3099 | // - this is how a thread that is calling unpin_and_remove will prevent |
3100 | // the cleaner thread from picking its PAIR (see comments in that function) |
3101 | do { |
3102 | // |
3103 | // We are already holding onto best_pair, if we run across a pair that |
3104 | // has the same mutex due to a collision in the hashtable, we need |
3105 | // to be careful. |
3106 | // |
3107 | if (best_pair && m_pl->m_cleaner_head->mutex == best_pair->mutex) { |
                // We already hold this PAIR's mutex (it shares best_pair's
                // mutex), so evaluate it here without re-taking the pair lock.
3109 | long score = 0; |
3110 | // only bother with this pair if it has no current users |
3111 | if (m_pl->m_cleaner_head->value_rwlock.users() == 0) { |
3112 | score = cleaner_thread_rate_pair(m_pl->m_cleaner_head); |
3113 | if (score > best_score) { |
3114 | best_score = score; |
3115 | best_pair = m_pl->m_cleaner_head; |
3116 | } |
3117 | } |
3118 | m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next; |
3119 | continue; |
3120 | } |
3121 | pair_lock(m_pl->m_cleaner_head); |
3122 | if (m_pl->m_cleaner_head->value_rwlock.users() > 0) { |
3123 | pair_unlock(m_pl->m_cleaner_head); |
3124 | } |
3125 | else { |
3126 | n_seen++; |
3127 | long score = 0; |
3128 | score = cleaner_thread_rate_pair(m_pl->m_cleaner_head); |
3129 | if (score > best_score) { |
3130 | best_score = score; |
3131 | // Since we found a new best pair, we need to |
3132 | // free the old best pair. |
3133 | if (best_pair) { |
3134 | pair_unlock(best_pair); |
3135 | } |
3136 | best_pair = m_pl->m_cleaner_head; |
3137 | } |
3138 | else { |
3139 | pair_unlock(m_pl->m_cleaner_head); |
3140 | } |
3141 | } |
3142 | // Advance the cleaner head. |
3143 | m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next; |
3144 | } while (m_pl->m_cleaner_head != first_pair && n_seen < CLEANER_N_TO_CHECK); |
3145 | m_pl->read_list_unlock(); |
3146 | |
3147 | // |
3148 | // at this point, if we have found a PAIR for cleaning, |
3149 | // that is, best_pair != NULL, we do the clean |
3150 | // |
3151 | // if best_pair !=NULL, then best_pair->mutex is held |
3152 | // no list lock is held |
3153 | // |
3154 | if (best_pair) { |
3155 | CACHEFILE cf = best_pair->cachefile; |
3156 | // try to add a background job to the manager |
3157 | // if we can't, that means the cachefile is flushing, so |
3158 | // we simply continue the for loop and this iteration |
3159 | // becomes a no-op |
3160 | r = bjm_add_background_job(cf->bjm); |
3161 | if (r) { |
3162 | pair_unlock(best_pair); |
3163 | continue; |
3164 | } |
3165 | best_pair->value_rwlock.write_lock(true); |
3166 | pair_unlock(best_pair); |
3167 | // verify a key assumption. |
3168 | assert(cleaner_thread_rate_pair(best_pair) > 0); |
3169 | // check the checkpoint_pending bit |
3170 | m_pl->read_pending_cheap_lock(); |
3171 | bool checkpoint_pending = best_pair->checkpoint_pending; |
3172 | best_pair->checkpoint_pending = false; |
3173 | m_pl->read_pending_cheap_unlock(); |
3174 | if (checkpoint_pending) { |
3175 | write_locked_pair_for_checkpoint(m_ct, best_pair, true); |
3176 | } |
3177 | |
3178 | bool cleaner_callback_called = false; |
3179 | |
3180 | // it's theoretically possible that after writing a PAIR for checkpoint, the |
3181 | // PAIR's heuristic tells us nothing needs to be done. It is not possible |
3182 | // in Dr. Noga, but unit tests verify this behavior works properly. |
3183 | if (cleaner_thread_rate_pair(best_pair) > 0) { |
3184 | r = best_pair->cleaner_callback(best_pair->value_data, |
3185 | best_pair->key, |
3186 | best_pair->fullhash, |
3187 | best_pair->write_extraargs); |
3188 | assert_zero(r); |
3189 | cleaner_callback_called = true; |
3190 | } |
3191 | |
3192 | // The cleaner callback must have unlocked the pair, so we |
3193 | // don't need to unlock it if the cleaner callback is called. |
3194 | if (!cleaner_callback_called) { |
3195 | pair_lock(best_pair); |
3196 | best_pair->value_rwlock.write_unlock(); |
3197 | pair_unlock(best_pair); |
3198 | } |
3199 | // We need to make sure the cachefile sticks around so a close |
3200 | // can't come destroy it. That's the purpose of this |
3201 | // "add/remove_background_job" business, which means the |
3202 | // cachefile is still valid here, even though the cleaner |
3203 | // callback unlocks the pair. |
3204 | bjm_remove_background_job(cf->bjm); |
3205 | } |
3206 | else { |
3207 | // If we didn't find anything this time around the cachetable, |
3208 | // we probably won't find anything if we run around again, so |
3209 | // just break out from the for-loop now and |
3210 | // we'll try again when the cleaner thread runs again. |
3211 | break; |
3212 | } |
3213 | } |
3214 | return 0; |
3215 | } |
3216 | |
3217 | static_assert(std::is_pod<pair_list>::value, "pair_list isn't POD" ); |
3218 | |
3219 | const uint32_t INITIAL_PAIR_LIST_SIZE = 1<<20; |
3220 | uint32_t PAIR_LOCK_SIZE = 1<<20; |
3221 | |
3222 | void toku_pair_list_set_lock_size(uint32_t num_locks) { |
3223 | PAIR_LOCK_SIZE = num_locks; |
3224 | } |
3225 | |
3226 | static void evict_pair_from_cachefile(PAIR p) { |
3227 | CACHEFILE cf = p->cachefile; |
3228 | if (p->cf_next) { |
3229 | p->cf_next->cf_prev = p->cf_prev; |
3230 | } |
3231 | if (p->cf_prev) { |
3232 | p->cf_prev->cf_next = p->cf_next; |
3233 | } |
3234 | else if (p->cachefile->cf_head == p) { |
3235 | cf->cf_head = p->cf_next; |
3236 | } |
3237 | p->cf_prev = p->cf_next = NULL; |
3238 | cf->num_pairs--; |
3239 | } |
3240 | |
3241 | // Allocates the hash table of pairs inside this pair list. |
3242 | // |
3243 | void pair_list::init() { |
3244 | m_table_size = INITIAL_PAIR_LIST_SIZE; |
3245 | m_num_locks = PAIR_LOCK_SIZE; |
3246 | m_n_in_table = 0; |
3247 | m_clock_head = NULL; |
3248 | m_cleaner_head = NULL; |
3249 | m_checkpoint_head = NULL; |
3250 | m_pending_head = NULL; |
3251 | m_table = NULL; |
3252 | |
3253 | |
3254 | pthread_rwlockattr_t attr; |
3255 | pthread_rwlockattr_init(&attr); |
3256 | #if defined(HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP) |
3257 | pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); |
3258 | #else |
3259 | // TODO: need to figure out how to make writer-preferential rwlocks |
3260 | // happen on osx |
3261 | #endif |
3262 | toku_pthread_rwlock_init(*cachetable_m_list_lock_key, &m_list_lock, &attr); |
3263 | toku_pthread_rwlock_init(*cachetable_m_pending_lock_expensive_key, |
3264 | &m_pending_lock_expensive, |
3265 | &attr); |
3266 | toku_pthread_rwlock_init( |
3267 | *cachetable_m_pending_lock_cheap_key, &m_pending_lock_cheap, &attr); |
3268 | XCALLOC_N(m_table_size, m_table); |
3269 | XCALLOC_N(m_num_locks, m_mutexes); |
3270 | for (uint64_t i = 0; i < m_num_locks; i++) { |
3271 | toku_mutex_init( |
3272 | #ifdef TOKU_PFS_MUTEX_EXTENDED_CACHETABLEMMUTEX |
3273 | *cachetable_m_mutex_key, |
3274 | #else |
3275 | toku_uninstrumented, |
3276 | #endif |
3277 | &m_mutexes[i].aligned_mutex, |
3278 | nullptr); |
3279 | } |
3280 | } |
3281 | |
3282 | // Frees the pair_list hash table. It is expected to be empty by |
// the time this is called; it asserts that every hash table
// slot is empty before freeing.
3285 | void pair_list::destroy() { |
3286 | // Check if any entries exist in the hash table. |
3287 | for (uint32_t i = 0; i < m_table_size; ++i) { |
3288 | invariant_null(m_table[i]); |
3289 | } |
3290 | for (uint64_t i = 0; i < m_num_locks; i++) { |
3291 | toku_mutex_destroy(&m_mutexes[i].aligned_mutex); |
3292 | } |
3293 | toku_pthread_rwlock_destroy(&m_list_lock); |
3294 | toku_pthread_rwlock_destroy(&m_pending_lock_expensive); |
3295 | toku_pthread_rwlock_destroy(&m_pending_lock_cheap); |
3296 | toku_free(m_table); |
3297 | toku_free(m_mutexes); |
3298 | } |
3299 | |
3300 | // adds a PAIR to the cachetable's structures, |
3301 | // but does NOT add it to the list maintained by |
3302 | // the cachefile |
3303 | void pair_list::add_to_cachetable_only(PAIR p) { |
3304 | // sanity check to make sure that the PAIR does not already exist |
3305 | PAIR pp = this->find_pair(p->cachefile, p->key, p->fullhash); |
3306 | assert(pp == NULL); |
3307 | |
3308 | this->add_to_clock(p); |
3309 | this->add_to_hash_chain(p); |
3310 | m_n_in_table++; |
3311 | } |
3312 | |
3313 | // This places the given pair inside of the pair list. |
3314 | // |
3315 | // requires caller to have grabbed write lock on list. |
3316 | // requires caller to have p->mutex held as well |
3317 | // |
3318 | void pair_list::put(PAIR p) { |
3319 | this->add_to_cachetable_only(p); |
3320 | this->add_to_cf_list(p); |
3321 | } |
3322 | |
// This removes the given pair completely from the pair list.
3324 | // |
3325 | // requires caller to have grabbed write lock on list, and p->mutex held |
3326 | // |
3327 | void pair_list::evict_completely(PAIR p) { |
3328 | this->evict_from_cachetable(p); |
3329 | this->evict_from_cachefile(p); |
3330 | } |
3331 | |
3332 | // Removes the PAIR from the cachetable's lists, |
3333 | // but does NOT impact the list maintained by the cachefile |
3334 | void pair_list::evict_from_cachetable(PAIR p) { |
3335 | this->pair_remove(p); |
3336 | this->pending_pairs_remove(p); |
3337 | this->remove_from_hash_chain(p); |
3338 | |
3339 | assert(m_n_in_table > 0); |
3340 | m_n_in_table--; |
3341 | } |
3342 | |
3343 | // Removes the PAIR from the cachefile's list of PAIRs |
3344 | void pair_list::evict_from_cachefile(PAIR p) { |
3345 | evict_pair_from_cachefile(p); |
3346 | } |
3347 | |
3348 | // |
3349 | // Remove pair from linked list for cleaner/clock |
3350 | // |
3351 | // |
3352 | // requires caller to have grabbed write lock on list. |
3353 | // |
3354 | void pair_list::pair_remove (PAIR p) { |
3355 | if (p->clock_prev == p) { |
3356 | invariant(m_clock_head == p); |
3357 | invariant(p->clock_next == p); |
3358 | invariant(m_cleaner_head == p); |
3359 | invariant(m_checkpoint_head == p); |
3360 | m_clock_head = NULL; |
3361 | m_cleaner_head = NULL; |
3362 | m_checkpoint_head = NULL; |
3363 | } |
3364 | else { |
3365 | if (p == m_clock_head) { |
3366 | m_clock_head = m_clock_head->clock_next; |
3367 | } |
3368 | if (p == m_cleaner_head) { |
3369 | m_cleaner_head = m_cleaner_head->clock_next; |
3370 | } |
3371 | if (p == m_checkpoint_head) { |
3372 | m_checkpoint_head = m_checkpoint_head->clock_next; |
3373 | } |
3374 | p->clock_prev->clock_next = p->clock_next; |
3375 | p->clock_next->clock_prev = p->clock_prev; |
3376 | } |
3377 | p->clock_prev = p->clock_next = NULL; |
3378 | } |
3379 | |
3380 | //Remove a pair from the list of pairs that were marked with the |
3381 | //pending bit for the in-progress checkpoint. |
3382 | // |
3383 | // requires that if the caller is the checkpoint thread, then a read lock |
3384 | // is grabbed on the list. Otherwise, must have write lock on list. |
3385 | // |
3386 | void pair_list::pending_pairs_remove (PAIR p) { |
3387 | if (p->pending_next) { |
3388 | p->pending_next->pending_prev = p->pending_prev; |
3389 | } |
3390 | if (p->pending_prev) { |
3391 | p->pending_prev->pending_next = p->pending_next; |
3392 | } |
3393 | else if (m_pending_head==p) { |
3394 | m_pending_head = p->pending_next; |
3395 | } |
3396 | p->pending_prev = p->pending_next = NULL; |
3397 | } |
3398 | |
3399 | void pair_list::remove_from_hash_chain(PAIR p) { |
3400 | // Remove it from the hash chain. |
3401 | unsigned int h = p->fullhash&(m_table_size - 1); |
3402 | paranoid_invariant(m_table[h] != NULL); |
3403 | if (m_table[h] == p) { |
3404 | m_table[h] = p->hash_chain; |
3405 | } |
3406 | else { |
3407 | PAIR curr = m_table[h]; |
3408 | while (curr->hash_chain != p) { |
3409 | curr = curr->hash_chain; |
3410 | } |
            // remove p from the singly-linked hash chain
3412 | curr->hash_chain = p->hash_chain; |
3413 | } |
3414 | p->hash_chain = NULL; |
3415 | } |
3416 | |
3417 | // Returns a pair from the pair list, using the given |
3418 | // pair. If the pair cannot be found, null is returned. |
3419 | // |
3420 | // requires caller to have grabbed either a read lock on the list or |
3421 | // bucket's mutex. |
3422 | // |
3423 | PAIR pair_list::find_pair(CACHEFILE file, CACHEKEY key, uint32_t fullhash) { |
3424 | PAIR found_pair = nullptr; |
3425 | for (PAIR p = m_table[fullhash&(m_table_size - 1)]; p; p = p->hash_chain) { |
3426 | if (p->key.b == key.b && p->cachefile == file) { |
3427 | found_pair = p; |
3428 | break; |
3429 | } |
3430 | } |
3431 | return found_pair; |
3432 | } |
3433 | |
3434 | // Add PAIR to linked list shared by cleaner thread and clock |
3435 | // |
3436 | // requires caller to have grabbed write lock on list. |
3437 | // |
3438 | void pair_list::add_to_clock (PAIR p) { |
3439 | // requires that p is not currently in the table. |
3440 | // inserts p into the clock list at the tail. |
3441 | |
3442 | p->count = CLOCK_INITIAL_COUNT; |
3443 | //assert either both head and tail are set or they are both NULL |
3444 | // tail and head exist |
3445 | if (m_clock_head) { |
3446 | assert(m_cleaner_head); |
3447 | assert(m_checkpoint_head); |
3448 | // insert right before the head |
3449 | p->clock_next = m_clock_head; |
3450 | p->clock_prev = m_clock_head->clock_prev; |
3451 | |
3452 | p->clock_prev->clock_next = p; |
3453 | p->clock_next->clock_prev = p; |
3454 | |
3455 | } |
3456 | // this is the first element in the list |
3457 | else { |
3458 | m_clock_head = p; |
3459 | p->clock_next = p->clock_prev = m_clock_head; |
3460 | m_cleaner_head = p; |
3461 | m_checkpoint_head = p; |
3462 | } |
3463 | } |
3464 | |
// add the pair to the linked list of PAIRs belonging
3466 | // to the same cachefile. This linked list is used |
3467 | // in cachetable_flush_cachefile. |
3468 | void pair_list::add_to_cf_list(PAIR p) { |
3469 | CACHEFILE cf = p->cachefile; |
3470 | if (cf->cf_head) { |
3471 | cf->cf_head->cf_prev = p; |
3472 | } |
3473 | p->cf_next = cf->cf_head; |
3474 | p->cf_prev = NULL; |
3475 | cf->cf_head = p; |
3476 | cf->num_pairs++; |
3477 | } |
3478 | |
3479 | // Add PAIR to the hashtable |
3480 | // |
3481 | // requires caller to have grabbed write lock on list |
3482 | // and to have grabbed the p->mutex. |
3483 | void pair_list::add_to_hash_chain(PAIR p) { |
3484 | uint32_t h = p->fullhash & (m_table_size - 1); |
3485 | p->hash_chain = m_table[h]; |
3486 | m_table[h] = p; |
3487 | } |
3488 | |
3489 | // test function |
3490 | // |
3491 | // grabs and releases write list lock |
3492 | // |
3493 | void pair_list::verify() { |
3494 | this->write_list_lock(); |
3495 | uint32_t num_found = 0; |
3496 | |
3497 | // First clear all the verify flags by going through the hash chains |
3498 | { |
3499 | uint32_t i; |
3500 | for (i = 0; i < m_table_size; i++) { |
3501 | PAIR p; |
3502 | for (p = m_table[i]; p; p = p->hash_chain) { |
3503 | num_found++; |
3504 | } |
3505 | } |
3506 | } |
3507 | assert(num_found == m_n_in_table); |
3508 | num_found = 0; |
3509 | // Now go through the clock chain, make sure everything in the LRU chain is hashed. |
3510 | { |
3511 | PAIR p; |
3512 | bool is_first = true; |
3513 | for (p = m_clock_head; m_clock_head != NULL && (p != m_clock_head || is_first); p=p->clock_next) { |
3514 | is_first=false; |
3515 | PAIR p2; |
3516 | uint32_t fullhash = p->fullhash; |
3517 | //assert(fullhash==toku_cachetable_hash(p->cachefile, p->key)); |
3518 | for (p2 = m_table[fullhash&(m_table_size-1)]; p2; p2=p2->hash_chain) { |
3519 | if (p2==p) { |
3520 | /* found it */ |
3521 | num_found++; |
3522 | goto next; |
3523 | } |
3524 | } |
3525 | fprintf(stderr, "Something in the clock chain is not hashed\n" ); |
3526 | assert(0); |
3527 | next:; |
3528 | } |
3529 | assert (num_found == m_n_in_table); |
3530 | } |
3531 | this->write_list_unlock(); |
3532 | } |
3533 | |
// If the given pointers are not null, report the number of pairs in
// this pair list and the size of its hash table.
3536 | // |
3537 | // |
3538 | // grabs and releases read list lock |
3539 | // |
3540 | void pair_list::get_state(int *num_entries, int *hash_size) { |
3541 | this->read_list_lock(); |
3542 | if (num_entries) { |
3543 | *num_entries = m_n_in_table; |
3544 | } |
3545 | if (hash_size) { |
3546 | *hash_size = m_table_size; |
3547 | } |
3548 | this->read_list_unlock(); |
3549 | } |
3550 | |
3551 | void pair_list::read_list_lock() { |
3552 | toku_pthread_rwlock_rdlock(&m_list_lock); |
3553 | } |
3554 | |
3555 | void pair_list::read_list_unlock() { |
3556 | toku_pthread_rwlock_rdunlock(&m_list_lock); |
3557 | } |
3558 | |
3559 | void pair_list::write_list_lock() { |
3560 | toku_pthread_rwlock_wrlock(&m_list_lock); |
3561 | } |
3562 | |
3563 | void pair_list::write_list_unlock() { |
3564 | toku_pthread_rwlock_wrunlock(&m_list_lock); |
3565 | } |
3566 | |
3567 | void pair_list::read_pending_exp_lock() { |
3568 | toku_pthread_rwlock_rdlock(&m_pending_lock_expensive); |
3569 | } |
3570 | |
3571 | void pair_list::read_pending_exp_unlock() { |
3572 | toku_pthread_rwlock_rdunlock(&m_pending_lock_expensive); |
3573 | } |
3574 | |
3575 | void pair_list::write_pending_exp_lock() { |
3576 | toku_pthread_rwlock_wrlock(&m_pending_lock_expensive); |
3577 | } |
3578 | |
3579 | void pair_list::write_pending_exp_unlock() { |
3580 | toku_pthread_rwlock_wrunlock(&m_pending_lock_expensive); |
3581 | } |
3582 | |
3583 | void pair_list::read_pending_cheap_lock() { |
3584 | toku_pthread_rwlock_rdlock(&m_pending_lock_cheap); |
3585 | } |
3586 | |
3587 | void pair_list::read_pending_cheap_unlock() { |
3588 | toku_pthread_rwlock_rdunlock(&m_pending_lock_cheap); |
3589 | } |
3590 | |
3591 | void pair_list::write_pending_cheap_lock() { |
3592 | toku_pthread_rwlock_wrlock(&m_pending_lock_cheap); |
3593 | } |
3594 | |
3595 | void pair_list::write_pending_cheap_unlock() { |
3596 | toku_pthread_rwlock_wrunlock(&m_pending_lock_cheap); |
3597 | } |
3598 | |
3599 | toku_mutex_t* pair_list::get_mutex_for_pair(uint32_t fullhash) { |
3600 | return &m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex; |
3601 | } |
3602 | |
3603 | void pair_list::pair_lock_by_fullhash(uint32_t fullhash) { |
3604 | toku_mutex_lock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex); |
3605 | } |
3606 | |
3607 | void pair_list::pair_unlock_by_fullhash(uint32_t fullhash) { |
3608 | toku_mutex_unlock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex); |
3609 | } |
3610 | |
3611 | |
3612 | ENSURE_POD(evictor); |
3613 | |
3614 | // |
3615 | // This is the function that runs eviction on its own thread. |
3616 | // |
3617 | static void *eviction_thread(void *evictor_v) { |
3618 | evictor *CAST_FROM_VOIDP(evictor, evictor_v); |
3619 | evictor->run_eviction_thread(); |
3620 | return toku_pthread_done(evictor_v); |
3621 | } |
3622 | |
3623 | // |
3624 | // Starts the eviction thread, assigns external object references, |
3625 | // and initializes all counters and condition variables. |
3626 | // |
3627 | int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) { |
3628 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running); |
3629 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting); |
3630 | |
3631 | // set max difference to around 500MB |
3632 | int64_t max_diff = (1 << 29); |
3633 | |
3634 | m_low_size_watermark = _size_limit; |
3635 | // these values are selected kind of arbitrarily right now as |
3636 | // being a percentage more than low_size_watermark, which is provided |
3637 | // by the caller. |
3638 | m_low_size_hysteresis = (11 * _size_limit)/10; //10% more |
3639 | if ((m_low_size_hysteresis - m_low_size_watermark) > max_diff) { |
3640 | m_low_size_hysteresis = m_low_size_watermark + max_diff; |
3641 | } |
    m_high_size_hysteresis = (5 * _size_limit)/4; // 25% more
3643 | if ((m_high_size_hysteresis - m_low_size_hysteresis) > max_diff) { |
3644 | m_high_size_hysteresis = m_low_size_hysteresis + max_diff; |
3645 | } |
3646 | m_high_size_watermark = (3 * _size_limit)/2; // 50% more |
3647 | if ((m_high_size_watermark - m_high_size_hysteresis) > max_diff) { |
3648 | m_high_size_watermark = m_high_size_hysteresis + max_diff; |
3649 | } |
3650 | |
3651 | m_enable_partial_eviction = true; |
3652 | |
3653 | m_size_reserved = unreservable_memory(_size_limit); |
3654 | m_size_current = 0; |
3655 | m_size_cloned_data = 0; |
3656 | m_size_evicting = 0; |
3657 | |
3658 | m_size_nonleaf = create_partitioned_counter(); |
3659 | m_size_leaf = create_partitioned_counter(); |
3660 | m_size_rollback = create_partitioned_counter(); |
3661 | m_size_cachepressure = create_partitioned_counter(); |
3662 | m_wait_pressure_count = create_partitioned_counter(); |
3663 | m_wait_pressure_time = create_partitioned_counter(); |
3664 | m_long_wait_pressure_count = create_partitioned_counter(); |
3665 | m_long_wait_pressure_time = create_partitioned_counter(); |
3666 | |
3667 | m_pl = _pl; |
3668 | m_cf_list = _cf_list; |
3669 | m_kibbutz = _kibbutz; |
3670 | toku_mutex_init( |
3671 | *cachetable_ev_thread_lock_mutex_key, &m_ev_thread_lock, nullptr); |
3672 | toku_cond_init( |
3673 | *cachetable_m_flow_control_cond_key, &m_flow_control_cond, nullptr); |
3674 | toku_cond_init( |
3675 | *cachetable_m_ev_thread_cond_key, &m_ev_thread_cond, nullptr); |
3676 | m_num_sleepers = 0; |
3677 | m_ev_thread_is_running = false; |
3678 | m_period_in_seconds = eviction_period; |
3679 | |
3680 | unsigned int seed = (unsigned int) time(NULL); |
3681 | int r = myinitstate_r(seed, m_random_statebuf, sizeof m_random_statebuf, &m_random_data); |
3682 | assert_zero(r); |
3683 | |
3684 | // start the background thread |
3685 | m_run_thread = true; |
3686 | m_num_eviction_thread_runs = 0; |
3687 | m_ev_thread_init = false; |
3688 | r = toku_pthread_create( |
3689 | *eviction_thread_key, &m_ev_thread, nullptr, eviction_thread, this); |
3690 | if (r == 0) { |
3691 | m_ev_thread_init = true; |
3692 | } |
3693 | m_evictor_init = true; |
3694 | return r; |
3695 | } |
3696 | |
3697 | // |
3698 | // This stops the eviction thread and clears the condition variable. |
3699 | // |
3700 | // NOTE: This should only be called if there are no evictions in progress. |
3701 | // |
3702 | void evictor::destroy() { |
3703 | if (!m_evictor_init) { |
3704 | return; |
3705 | } |
3706 | assert(m_size_evicting == 0); |
3707 | // |
3708 | // commented out of Ming, because we could not finish |
3709 | // #5672. Once #5672 is solved, we should restore this |
3710 | // |
3711 | //assert(m_size_current == 0); |
3712 | |
3713 | // Stop the eviction thread. |
3714 | if (m_ev_thread_init) { |
3715 | toku_mutex_lock(&m_ev_thread_lock); |
3716 | m_run_thread = false; |
3717 | this->signal_eviction_thread_locked(); |
3718 | toku_mutex_unlock(&m_ev_thread_lock); |
3719 | void *ret; |
3720 | int r = toku_pthread_join(m_ev_thread, &ret); |
3721 | assert_zero(r); |
3722 | assert(!m_ev_thread_is_running); |
3723 | } |
3724 | destroy_partitioned_counter(m_size_nonleaf); |
3725 | m_size_nonleaf = NULL; |
3726 | destroy_partitioned_counter(m_size_leaf); |
3727 | m_size_leaf = NULL; |
3728 | destroy_partitioned_counter(m_size_rollback); |
3729 | m_size_rollback = NULL; |
3730 | destroy_partitioned_counter(m_size_cachepressure); |
3731 | m_size_cachepressure = NULL; |
3732 | |
3733 | destroy_partitioned_counter(m_wait_pressure_count); m_wait_pressure_count = NULL; |
3734 | destroy_partitioned_counter(m_wait_pressure_time); m_wait_pressure_time = NULL; |
3735 | destroy_partitioned_counter(m_long_wait_pressure_count); m_long_wait_pressure_count = NULL; |
3736 | destroy_partitioned_counter(m_long_wait_pressure_time); m_long_wait_pressure_time = NULL; |
3737 | |
3738 | toku_cond_destroy(&m_flow_control_cond); |
3739 | toku_cond_destroy(&m_ev_thread_cond); |
3740 | toku_mutex_destroy(&m_ev_thread_lock); |
3741 | } |
3742 | |
3743 | // |
3744 | // Increases status variables and the current size variable |
3745 | // of the evictor based on the given pair attribute. |
3746 | // |
3747 | void evictor::add_pair_attr(PAIR_ATTR attr) { |
3748 | assert(attr.is_valid); |
3749 | add_to_size_current(attr.size); |
3750 | increment_partitioned_counter(m_size_nonleaf, attr.nonleaf_size); |
3751 | increment_partitioned_counter(m_size_leaf, attr.leaf_size); |
3752 | increment_partitioned_counter(m_size_rollback, attr.rollback_size); |
3753 | increment_partitioned_counter(m_size_cachepressure, attr.cache_pressure_size); |
3754 | } |
3755 | |
3756 | // |
3757 | // Decreases status variables and the current size variable |
3758 | // of the evictor based on the given pair attribute. |
3759 | // |
3760 | void evictor::remove_pair_attr(PAIR_ATTR attr) { |
3761 | assert(attr.is_valid); |
3762 | remove_from_size_current(attr.size); |
3763 | increment_partitioned_counter(m_size_nonleaf, 0 - attr.nonleaf_size); |
3764 | increment_partitioned_counter(m_size_leaf, 0 - attr.leaf_size); |
3765 | increment_partitioned_counter(m_size_rollback, 0 - attr.rollback_size); |
3766 | increment_partitioned_counter(m_size_cachepressure, 0 - attr.cache_pressure_size); |
3767 | } |
3768 | |
3769 | // |
3770 | // Updates this evictor's stats to match the "new" pair attribute given |
3771 | // while also removing the given "old" pair attribute. |
3772 | // |
3773 | void evictor::change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr) { |
3774 | this->add_pair_attr(new_attr); |
3775 | this->remove_pair_attr(old_attr); |
3776 | } |
3777 | |
3778 | // |
3779 | // Adds the given size to the evictor's estimation of |
3780 | // the size of the cachetable. |
3781 | // |
3782 | void evictor::add_to_size_current(long size) { |
3783 | (void) toku_sync_fetch_and_add(&m_size_current, size); |
3784 | } |
3785 | |
3786 | // |
3787 | // Subtracts the given size from the evictor's current |
3788 | // approximation of the cachetable size. |
3789 | // |
3790 | void evictor::remove_from_size_current(long size) { |
3791 | (void) toku_sync_fetch_and_sub(&m_size_current, size); |
3792 | } |
3793 | |
3794 | // |
3795 | // Adds the size of cloned data to necessary variables in the evictor |
3796 | // |
3797 | void evictor::add_cloned_data_size(long size) { |
3798 | (void) toku_sync_fetch_and_add(&m_size_cloned_data, size); |
3799 | add_to_size_current(size); |
3800 | } |
3801 | |
3802 | // |
// Removes the size of cloned data from the necessary variables in the evictor
3804 | // |
3805 | void evictor::remove_cloned_data_size(long size) { |
3806 | (void) toku_sync_fetch_and_sub(&m_size_cloned_data, size); |
3807 | remove_from_size_current(size); |
3808 | } |
3809 | |
3810 | // |
3811 | // TODO: (Zardosht) comment this function |
3812 | // |
3813 | uint64_t evictor::reserve_memory(double fraction, uint64_t upper_bound) { |
3814 | toku_mutex_lock(&m_ev_thread_lock); |
3815 | uint64_t reserved_memory = fraction * (m_low_size_watermark - m_size_reserved); |
3816 | if (0) { // debug |
3817 | fprintf(stderr, "%s %" PRIu64 " %" PRIu64 "\n" , __PRETTY_FUNCTION__, reserved_memory, upper_bound); |
3818 | } |
3819 | if (upper_bound > 0 && reserved_memory > upper_bound) { |
3820 | reserved_memory = upper_bound; |
3821 | } |
3822 | m_size_reserved += reserved_memory; |
3823 | (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory); |
3824 | this->signal_eviction_thread_locked(); |
3825 | toku_mutex_unlock(&m_ev_thread_lock); |
3826 | |
3827 | if (this->should_client_thread_sleep()) { |
3828 | this->wait_for_cache_pressure_to_subside(); |
3829 | } |
3830 | return reserved_memory; |
3831 | } |
3832 | |
3833 | // |
3834 | // TODO: (Zardosht) comment this function |
3835 | // |
3836 | void evictor::release_reserved_memory(uint64_t reserved_memory){ |
3837 | (void) toku_sync_fetch_and_sub(&m_size_current, reserved_memory); |
3838 | toku_mutex_lock(&m_ev_thread_lock); |
3839 | m_size_reserved -= reserved_memory; |
3840 | // signal the eviction thread in order to possibly wake up sleeping clients |
3841 | if (m_num_sleepers > 0) { |
3842 | this->signal_eviction_thread_locked(); |
3843 | } |
3844 | toku_mutex_unlock(&m_ev_thread_lock); |
3845 | } |
3846 | |
3847 | // |
3848 | // This function is the eviction thread. It runs for the lifetime of |
3849 | // the evictor. Goes to sleep for period_in_seconds |
3850 | // by waiting on m_ev_thread_cond. |
3851 | // |
3852 | void evictor::run_eviction_thread(){ |
3853 | toku_mutex_lock(&m_ev_thread_lock); |
3854 | while (m_run_thread) { |
3855 | m_num_eviction_thread_runs++; // for test purposes only |
3856 | m_ev_thread_is_running = true; |
3857 | // responsibility of run_eviction to release and |
3858 | // regrab ev_thread_lock as it sees fit |
3859 | this->run_eviction(); |
3860 | m_ev_thread_is_running = false; |
3861 | |
3862 | if (m_run_thread) { |
3863 | // |
3864 | // sleep until either we are signaled |
3865 | // via signal_eviction_thread or |
3866 | // m_period_in_seconds amount of time has passed |
3867 | // |
3868 | if (m_period_in_seconds) { |
3869 | toku_timespec_t wakeup_time; |
3870 | struct timeval tv; |
3871 | gettimeofday(&tv, 0); |
3872 | wakeup_time.tv_sec = tv.tv_sec; |
3873 | wakeup_time.tv_nsec = tv.tv_usec * 1000LL; |
3874 | wakeup_time.tv_sec += m_period_in_seconds; |
3875 | toku_cond_timedwait( |
3876 | &m_ev_thread_cond, |
3877 | &m_ev_thread_lock, |
3878 | &wakeup_time |
3879 | ); |
3880 | } |
3881 | // for test purposes, we have an option of |
3882 | // not waiting on a period, but rather sleeping indefinitely |
3883 | else { |
3884 | toku_cond_wait(&m_ev_thread_cond, &m_ev_thread_lock); |
3885 | } |
3886 | } |
3887 | } |
3888 | toku_mutex_unlock(&m_ev_thread_lock); |
3889 | } |
3890 | |
3891 | // |
3892 | // runs eviction. |
3893 | // on entry, ev_thread_lock is grabbed, on exit, ev_thread_lock must still be grabbed |
3894 | // it is the responsibility of this function to release and reacquire ev_thread_lock as it sees fit. |
3895 | // |
3896 | void evictor::run_eviction(){ |
3897 | // |
3898 | // These variables will help us detect if everything in the clock is currently being accessed. |
3899 | // We must detect this case otherwise we will end up in an infinite loop below. |
3900 | // |
3901 | bool exited_early = false; |
3902 | uint32_t num_pairs_examined_without_evicting = 0; |
3903 | |
3904 | while (this->eviction_needed()) { |
3905 | if (m_num_sleepers > 0 && this->should_sleeping_clients_wakeup()) { |
3906 | toku_cond_broadcast(&m_flow_control_cond); |
3907 | } |
3908 | // release ev_thread_lock so that eviction may run without holding mutex |
3909 | toku_mutex_unlock(&m_ev_thread_lock); |
3910 | |
3911 | // first try to do an eviction from stale cachefiles |
3912 | bool some_eviction_ran = m_cf_list->evict_some_stale_pair(this); |
3913 | if (!some_eviction_ran) { |
3914 | m_pl->read_list_lock(); |
3915 | PAIR curr_in_clock = m_pl->m_clock_head; |
3916 | // if nothing to evict, we need to exit |
3917 | if (!curr_in_clock) { |
3918 | m_pl->read_list_unlock(); |
3919 | toku_mutex_lock(&m_ev_thread_lock); |
3920 | exited_early = true; |
3921 | goto exit; |
3922 | } |
3923 | if (num_pairs_examined_without_evicting > m_pl->m_n_in_table) { |
3924 | // we have a cycle where everything in the clock is in use |
3925 | // do not return an error |
3926 | // just let memory be overfull |
3927 | m_pl->read_list_unlock(); |
3928 | toku_mutex_lock(&m_ev_thread_lock); |
3929 | exited_early = true; |
3930 | goto exit; |
3931 | } |
3932 | bool eviction_run = run_eviction_on_pair(curr_in_clock); |
3933 | if (eviction_run) { |
3934 | // reset the count |
3935 | num_pairs_examined_without_evicting = 0; |
3936 | } |
3937 | else { |
3938 | num_pairs_examined_without_evicting++; |
3939 | } |
3940 | // at this point, either curr_in_clock is still in the list because it has not been fully evicted, |
3941 | // and we need to move ct->m_clock_head over. Otherwise, curr_in_clock has been fully evicted |
3942 | // and we do NOT need to move ct->m_clock_head, as the removal of curr_in_clock |
3943 | // modified ct->m_clock_head |
3944 | if (m_pl->m_clock_head && (m_pl->m_clock_head == curr_in_clock)) { |
3945 | m_pl->m_clock_head = m_pl->m_clock_head->clock_next; |
3946 | } |
3947 | m_pl->read_list_unlock(); |
3948 | } |
3949 | toku_mutex_lock(&m_ev_thread_lock); |
3950 | } |
3951 | |
3952 | exit: |
3953 | if (m_num_sleepers > 0 && (exited_early || this->should_sleeping_clients_wakeup())) { |
3954 | toku_cond_broadcast(&m_flow_control_cond); |
3955 | } |
3956 | return; |
3957 | } |
3958 | |
3959 | // |
3960 | // NOTE: Cachetable lock held on entry. |
3961 | // Runs eviction on the given PAIR. This may be a |
3962 | // partial eviction or full eviction. |
3963 | // |
3964 | // on entry, pair mutex is NOT held, but pair list's read list lock |
3965 | // IS held |
3966 | // on exit, the same conditions must apply |
3967 | // |
3968 | bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { |
3969 | uint32_t n_in_table; |
3970 | int64_t size_current; |
3971 | bool ret_val = false; |
3972 | // function meant to be called on PAIR that is not being accessed right now |
3973 | CACHEFILE cf = curr_in_clock->cachefile; |
3974 | int r = bjm_add_background_job(cf->bjm); |
3975 | if (r) { |
3976 | goto exit; |
3977 | } |
3978 | pair_lock(curr_in_clock); |
3979 | // these are the circumstances under which we don't run eviction on a pair: |
3980 | // - if other users are waiting on the lock |
3981 | // - if the PAIR is referenced by users |
3982 | // - if the PAIR's disk_nb_mutex is in use, implying that it is |
3983 | // undergoing a checkpoint |
3984 | if (curr_in_clock->value_rwlock.users() || |
3985 | curr_in_clock->refcount > 0 || |
3986 | nb_mutex_users(&curr_in_clock->disk_nb_mutex)) |
3987 | { |
3988 | pair_unlock(curr_in_clock); |
3989 | bjm_remove_background_job(cf->bjm); |
3990 | goto exit; |
3991 | } |
3992 | |
3993 | // extract and use these values so that we don't risk them changing |
3994 | // out from underneath us in calculations below. |
3995 | n_in_table = m_pl->m_n_in_table; |
3996 | size_current = m_size_current; |
3997 | |
3998 | // now that we have the pair mutex we care about, we can |
3999 | // release the read list lock and reacquire it at the end of the function |
4000 | m_pl->read_list_unlock(); |
4001 | ret_val = true; |
4002 | if (curr_in_clock->count > 0) { |
4003 | toku::context pe_ctx(CTX_PARTIAL_EVICTION); |
4004 | |
4005 | uint32_t curr_size = curr_in_clock->attr.size; |
4006 | // if the size of this PAIR is greater than the average size of PAIRs |
4007 | // in the cachetable, then decrement it, otherwise, decrement |
4008 | // probabilistically |
4009 | if (curr_size*n_in_table >= size_current) { |
4010 | curr_in_clock->count--; |
4011 | } else { |
4012 | // generate a random number between 0 and 2^16 |
4013 | assert(size_current <= (INT64_MAX / ((1<<16)-1))); // to protect against possible overflows |
4014 | int32_t rnd = myrandom_r(&m_random_data) % (1<<16); |
4015 | // The if-statement below will be true with probability of |
4016 | // curr_size/(average size of PAIR in cachetable) |
4017 | // Here is how the math is done: |
4018 | // average_size = size_current/n_in_table |
4019 | // curr_size/average_size = curr_size*n_in_table/size_current |
            // we evaluate if a random number from 0 to 2^16 is less
4021 | // than curr_size/average_size * 2^16. So, our if-clause should be |
4022 | // if (2^16*curr_size/average_size > rnd) |
4023 | // this evaluates to: |
4024 | // if (2^16*curr_size*n_in_table/size_current > rnd) |
4025 | // by multiplying each side of the equation by size_current, we get |
4026 | // if (2^16*curr_size*n_in_table > rnd*size_current) |
4027 | // and dividing each side by 2^16, |
4028 | // we get the if-clause below |
4029 | // |
4030 | if ((((int64_t)curr_size) * n_in_table) >= (((int64_t)rnd) * size_current)>>16) { |
4031 | curr_in_clock->count--; |
4032 | } |
4033 | } |
4034 | |
4035 | if (m_enable_partial_eviction) { |
4036 | // call the partial eviction callback |
4037 | curr_in_clock->value_rwlock.write_lock(true); |
4038 | |
4039 | void *value = curr_in_clock->value_data; |
4040 | void* disk_data = curr_in_clock->disk_data; |
            void *write_extraargs = curr_in_clock->write_extraargs;
4042 | enum partial_eviction_cost cost; |
4043 | long bytes_freed_estimate = 0; |
4044 | curr_in_clock->pe_est_callback(value, disk_data, |
4045 | &bytes_freed_estimate, &cost, |
4046 | write_extraargs); |
4047 | if (cost == PE_CHEAP) { |
4048 | pair_unlock(curr_in_clock); |
4049 | curr_in_clock->size_evicting_estimate = 0; |
4050 | this->do_partial_eviction(curr_in_clock); |
4051 | bjm_remove_background_job(cf->bjm); |
4052 | } else if (cost == PE_EXPENSIVE) { |
4053 | // only bother running an expensive partial eviction |
4054 | // if it is expected to free space |
4055 | if (bytes_freed_estimate > 0) { |
4056 | pair_unlock(curr_in_clock); |
4057 | curr_in_clock->size_evicting_estimate = bytes_freed_estimate; |
4058 | toku_mutex_lock(&m_ev_thread_lock); |
4059 | m_size_evicting += bytes_freed_estimate; |
4060 | toku_mutex_unlock(&m_ev_thread_lock); |
4061 | toku_kibbutz_enq(m_kibbutz, cachetable_partial_eviction, |
4062 | curr_in_clock); |
4063 | } else { |
4064 | curr_in_clock->value_rwlock.write_unlock(); |
4065 | pair_unlock(curr_in_clock); |
4066 | bjm_remove_background_job(cf->bjm); |
4067 | } |
4068 | } else { |
4069 | assert(false); |
4070 | } |
4071 | } else { |
4072 | pair_unlock(curr_in_clock); |
4073 | bjm_remove_background_job(cf->bjm); |
4074 | } |
4075 | } else { |
4076 | toku::context pe_ctx(CTX_FULL_EVICTION); |
4077 | |
4078 | // responsibility of try_evict_pair to eventually remove background job |
4079 | // pair's mutex is still grabbed here |
4080 | this->try_evict_pair(curr_in_clock); |
4081 | } |
4082 | // regrab the read list lock, because the caller assumes |
4083 | // that it is held. The contract requires this. |
4084 | m_pl->read_list_lock(); |
4085 | exit: |
4086 | return ret_val; |
4087 | } |
4088 | |
struct pair_unpin_with_new_attr_extra {
    pair_unpin_with_new_attr_extra(evictor *e, PAIR p) :
        ev(e), pair(p) {
    }
    evictor *ev;
    PAIR pair;
};
4096 | |
static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) {
4098 | struct pair_unpin_with_new_attr_extra *info = |
4099 | reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra); |
4100 | PAIR p = info->pair; |
4101 | evictor *ev = info->ev; |
4102 | |
4103 | // change the attr in the evictor, then update the value in the pair |
4104 | ev->change_pair_attr(p->attr, new_attr); |
4105 | p->attr = new_attr; |
4106 | |
4107 | // unpin |
4108 | pair_lock(p); |
4109 | p->value_rwlock.write_unlock(); |
4110 | pair_unlock(p); |
4111 | } |
4112 | |
4113 | // |
4114 | // on entry and exit, pair's mutex is not held |
4115 | // on exit, PAIR is unpinned |
4116 | // |
4117 | void evictor::do_partial_eviction(PAIR p) { |
4118 | // Copy the old attr |
4119 | PAIR_ATTR old_attr = p->attr; |
4120 | long long size_evicting_estimate = p->size_evicting_estimate; |
4121 | |
    struct pair_unpin_with_new_attr_extra extra(this, p);
4123 | p->pe_callback(p->value_data, old_attr, p->write_extraargs, |
4124 | // passed as the finalize continuation, which allows the |
4125 | // pe_callback to unpin the node before doing expensive cleanup |
4126 | pair_unpin_with_new_attr, &extra); |
4127 | |
4128 | // now that the pe_callback (and its pair_unpin_with_new_attr continuation) |
4129 | // have finished, we can safely decrease size_evicting |
4130 | this->decrease_size_evicting(size_evicting_estimate); |
4131 | } |
4132 | |
4133 | // |
4134 | // CT lock held on entry |
4135 | // background job has been added for p->cachefile on entry |
4136 | // responsibility of this function to make sure that background job is removed |
4137 | // |
4138 | // on entry, pair's mutex is held, on exit, the pair's mutex is NOT held |
4139 | // |
4140 | void evictor::try_evict_pair(PAIR p) { |
4141 | CACHEFILE cf = p->cachefile; |
    // evictions of clean, unpinned pairs (those that do not require a write)
    // can be run on the current thread
4144 | |
4145 | // the only caller, run_eviction_on_pair, should call this function |
4146 | // only if no one else is trying to use it |
4147 | assert(!p->value_rwlock.users()); |
4148 | p->value_rwlock.write_lock(true); |
4149 | // if the PAIR is dirty, the running eviction requires writing the |
4150 | // PAIR out. if the disk_nb_mutex is grabbed, then running |
4151 | // eviction requires waiting for the disk_nb_mutex to become available, |
4152 | // which may be expensive. Hence, if either is true, we |
4153 | // do the eviction on a writer thread |
4154 | if (!p->dirty && (nb_mutex_writers(&p->disk_nb_mutex) == 0)) { |
4155 | p->size_evicting_estimate = 0; |
4156 | // |
4157 | // This method will unpin PAIR and release PAIR mutex |
4158 | // |
4159 | // because the PAIR is not dirty, we can safely pass |
4160 | // false for the for_checkpoint parameter |
4161 | this->evict_pair(p, false); |
4162 | bjm_remove_background_job(cf->bjm); |
4163 | } |
4164 | else { |
4165 | pair_unlock(p); |
4166 | toku_mutex_lock(&m_ev_thread_lock); |
4167 | assert(m_size_evicting >= 0); |
4168 | p->size_evicting_estimate = p->attr.size; |
4169 | m_size_evicting += p->size_evicting_estimate; |
4170 | assert(m_size_evicting >= 0); |
4171 | toku_mutex_unlock(&m_ev_thread_lock); |
4172 | toku_kibbutz_enq(m_kibbutz, cachetable_evicter, p); |
4173 | } |
4174 | } |
4175 | |
4176 | // |
4177 | // Requires: This thread must hold the write lock (nb_mutex) for the pair. |
4178 | // The pair's mutex (p->mutex) is also held. |
4179 | // on exit, neither is held |
4180 | // |
4181 | void evictor::evict_pair(PAIR p, bool for_checkpoint) { |
4182 | if (p->dirty) { |
4183 | pair_unlock(p); |
4184 | cachetable_write_locked_pair(this, p, for_checkpoint); |
4185 | pair_lock(p); |
4186 | } |
4187 | // one thing we can do here is extract the size_evicting estimate, |
4188 | // have decrease_size_evicting take the estimate and not the pair, |
4189 | // and do this work after we have called |
4190 | // cachetable_maybe_remove_and_free_pair |
4191 | this->decrease_size_evicting(p->size_evicting_estimate); |
4192 | // if we are to remove this pair, we need the write list lock, |
4193 | // to get it in a way that avoids deadlocks, we must first release |
4194 | // the pair's mutex, then grab the write list lock, then regrab the |
4195 | // pair's mutex. The pair cannot go anywhere because |
4196 | // the pair is still pinned |
4197 | nb_mutex_lock(&p->disk_nb_mutex, p->mutex); |
4198 | pair_unlock(p); |
4199 | m_pl->write_list_lock(); |
4200 | pair_lock(p); |
4201 | p->value_rwlock.write_unlock(); |
4202 | nb_mutex_unlock(&p->disk_nb_mutex); |
4203 | // at this point, we have the pair list's write list lock |
4204 | // and we have the pair's mutex (p->mutex) held |
4205 | |
4206 | // this ensures that a clone running in the background first completes |
4207 | bool removed = false; |
4208 | if (p->value_rwlock.users() == 0 && p->refcount == 0) { |
4209 | // assumption is that if we are about to remove the pair |
4210 | // that no one has grabbed the disk_nb_mutex, |
4211 | // and that there is no cloned_value_data, because |
4212 | // no one is writing a cloned value out. |
4213 | assert(nb_mutex_users(&p->disk_nb_mutex) == 0); |
4214 | assert(p->cloned_value_data == NULL); |
4215 | cachetable_remove_pair(m_pl, this, p); |
4216 | removed = true; |
4217 | } |
4218 | pair_unlock(p); |
4219 | m_pl->write_list_unlock(); |
4220 | // do not want to hold the write list lock while freeing a pair |
4221 | if (removed) { |
4222 | cachetable_free_pair(p); |
4223 | } |
4224 | } |
4225 | |
4226 | // |
4227 | // this function handles the responsibilities for writer threads when they |
4228 | // decrease size_evicting. The responsibilities are: |
4229 | // - decrease m_size_evicting in a thread safe manner |
4230 | // - in some circumstances, signal the eviction thread |
4231 | // |
4232 | void evictor::decrease_size_evicting(long size_evicting_estimate) { |
4233 | if (size_evicting_estimate > 0) { |
4234 | toku_mutex_lock(&m_ev_thread_lock); |
4235 | int64_t buffer = m_high_size_hysteresis - m_low_size_watermark; |
4236 | // if size_evicting is transitioning from greater than buffer to below buffer, and |
4237 | // some client threads are sleeping, we need to wake up the eviction thread. |
4238 | // Here is why. In this scenario, we are in one of two cases: |
4239 | // - size_current - size_evicting < low_size_watermark |
4240 | // If this is true, then size_current < high_size_hysteresis, which |
4241 | // means we need to wake up sleeping clients |
4242 | // - size_current - size_evicting > low_size_watermark, |
4243 | // which means more evictions must be run. |
4244 | // The consequences of both cases are the responsibility |
4245 | // of the eviction thread. |
4246 | // |
4247 | bool need_to_signal_ev_thread = |
4248 | (m_num_sleepers > 0) && |
4249 | !m_ev_thread_is_running && |
4250 | (m_size_evicting > buffer) && |
4251 | ((m_size_evicting - size_evicting_estimate) <= buffer); |
4252 | m_size_evicting -= size_evicting_estimate; |
4253 | assert(m_size_evicting >= 0); |
4254 | if (need_to_signal_ev_thread) { |
4255 | this->signal_eviction_thread_locked(); |
4256 | } |
4257 | toku_mutex_unlock(&m_ev_thread_lock); |
4258 | } |
4259 | } |
4260 | |
4261 | // |
4262 | // Wait for cache table space to become available |
4263 | // size_current is number of bytes currently occupied by data (referred to by pairs) |
4264 | // size_evicting is number of bytes queued up to be evicted |
4265 | // |
4266 | void evictor::wait_for_cache_pressure_to_subside() { |
4267 | uint64_t t0 = toku_current_time_microsec(); |
4268 | toku_mutex_lock(&m_ev_thread_lock); |
4269 | m_num_sleepers++; |
4270 | this->signal_eviction_thread_locked(); |
4271 | toku_cond_wait(&m_flow_control_cond, &m_ev_thread_lock); |
4272 | m_num_sleepers--; |
4273 | toku_mutex_unlock(&m_ev_thread_lock); |
4274 | uint64_t t1 = toku_current_time_microsec(); |
4275 | increment_partitioned_counter(m_wait_pressure_count, 1); |
4276 | uint64_t tdelta = t1 - t0; |
4277 | increment_partitioned_counter(m_wait_pressure_time, tdelta); |
4278 | if (tdelta > 1000000) { |
4279 | increment_partitioned_counter(m_long_wait_pressure_count, 1); |
4280 | increment_partitioned_counter(m_long_wait_pressure_time, tdelta); |
4281 | } |
4282 | } |
4283 | |
4284 | // |
4285 | // Get the status of the current estimated size of the cachetable, |
4286 | // and the evictor's set limit. |
4287 | // |
4288 | void evictor::get_state(long *size_current_ptr, long *size_limit_ptr) { |
4289 | if (size_current_ptr) { |
4290 | *size_current_ptr = m_size_current; |
4291 | } |
4292 | if (size_limit_ptr) { |
4293 | *size_limit_ptr = m_low_size_watermark; |
4294 | } |
4295 | } |
4296 | |
4297 | // |
4298 | // Force the eviction thread to do some work. |
4299 | // |
4300 | // This function does not require any mutex to be held. |
4301 | // As a result, scheduling is not guaranteed, but that is tolerable. |
4302 | // |
4303 | void evictor::signal_eviction_thread() { |
4304 | toku_mutex_lock(&m_ev_thread_lock); |
4305 | toku_cond_signal(&m_ev_thread_cond); |
4306 | toku_mutex_unlock(&m_ev_thread_lock); |
4307 | } |
4308 | |
4309 | void evictor::signal_eviction_thread_locked() { |
4310 | toku_cond_signal(&m_ev_thread_cond); |
4311 | } |
4312 | |
4313 | // |
// Returns true if the cachetable is so oversubscribed that a client thread should sleep
4315 | // |
4316 | // This function may be called in a thread-unsafe manner. Locks are not |
4317 | // required to read size_current. The result is that |
4318 | // the values may be a little off, but we think that is tolerable. |
4319 | // |
4320 | bool evictor::should_client_thread_sleep(){ |
4321 | return unsafe_read_size_current() > m_high_size_watermark; |
4322 | } |
4323 | |
4324 | // |
4325 | // Returns true if a sleeping client should be woken up because |
4326 | // the cachetable is not overly subscribed |
4327 | // |
4328 | // This function may be called in a thread-unsafe manner. Locks are not |
4329 | // required to read size_current. The result is that |
4330 | // the values may be a little off, but we think that is tolerable. |
4331 | // |
4332 | bool evictor::should_sleeping_clients_wakeup() { |
4333 | return unsafe_read_size_current() <= m_high_size_hysteresis; |
4334 | } |
4335 | |
4336 | // |
4337 | // Returns true if a client thread should try to wake up the eviction |
4338 | // thread because the client thread has noticed too much data taken |
4339 | // up in the cachetable. |
4340 | // |
4341 | // This function may be called in a thread-unsafe manner. Locks are not |
4342 | // required to read size_current or size_evicting. The result is that |
4343 | // the values may be a little off, but we think that is tolerable. |
4344 | // If the caller wants to ensure that ev_thread_is_running and size_evicting |
4345 | // are accurate, then the caller must hold ev_thread_lock before |
4346 | // calling this function. |
4347 | // |
4348 | bool evictor::should_client_wake_eviction_thread() { |
4349 | return |
4350 | !m_ev_thread_is_running && |
4351 | ((unsafe_read_size_current() - m_size_evicting) > m_low_size_hysteresis); |
4352 | } |
4353 | |
4354 | // |
4355 | // Determines if eviction is needed. If the current size of |
4356 | // the cachetable exceeds the sum of our fixed size limit and |
4357 | // the amount of data currently being evicted, then eviction is needed |
4358 | // |
4359 | bool evictor::eviction_needed() { |
4360 | return (m_size_current - m_size_evicting) > m_low_size_watermark; |
4361 | } |
4362 | |
4363 | inline int64_t evictor::unsafe_read_size_current(void) const { |
4364 | return m_size_current; |
4365 | } |
4366 | |
4367 | void evictor::fill_engine_status() { |
4368 | CT_STATUS_VAL(CT_SIZE_CURRENT) = m_size_current; |
4369 | CT_STATUS_VAL(CT_SIZE_LIMIT) = m_low_size_hysteresis; |
4370 | CT_STATUS_VAL(CT_SIZE_WRITING) = m_size_evicting; |
4371 | CT_STATUS_VAL(CT_SIZE_NONLEAF) = read_partitioned_counter(m_size_nonleaf); |
4372 | CT_STATUS_VAL(CT_SIZE_LEAF) = read_partitioned_counter(m_size_leaf); |
4373 | CT_STATUS_VAL(CT_SIZE_ROLLBACK) = read_partitioned_counter(m_size_rollback); |
4374 | CT_STATUS_VAL(CT_SIZE_CACHEPRESSURE) = read_partitioned_counter(m_size_cachepressure); |
4375 | CT_STATUS_VAL(CT_SIZE_CLONED) = m_size_cloned_data; |
4376 | CT_STATUS_VAL(CT_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_wait_pressure_count); |
4377 | CT_STATUS_VAL(CT_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_wait_pressure_time); |
4378 | CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_long_wait_pressure_count); |
4379 | CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_long_wait_pressure_time); |
4380 | } |
4381 | |
4382 | void evictor::set_enable_partial_eviction(bool enabled) { |
4383 | m_enable_partial_eviction = enabled; |
4384 | } |
4385 | |
4386 | bool evictor::get_enable_partial_eviction(void) const { |
4387 | return m_enable_partial_eviction; |
4388 | } |
4389 | |
4390 | //////////////////////////////////////////////////////////////////////////////// |
4391 | |
4392 | ENSURE_POD(checkpointer); |
4393 | |
4394 | // |
// Initializes this checkpointer: stores references to the pair list, logger,
// evictor, and cachefile list, and sets up the checkpoint minicron with a
// default period of 0 (no periodic checkpoints).
4396 | // |
4397 | int checkpointer::init(pair_list *_pl, |
4398 | TOKULOGGER _logger, |
4399 | evictor *_ev, |
4400 | cachefile_list *files) { |
4401 | m_list = _pl; |
4402 | m_logger = _logger; |
4403 | m_ev = _ev; |
4404 | m_cf_list = files; |
4405 | bjm_init(&m_checkpoint_clones_bjm); |
4406 | |
4407 | // Default is no checkpointing. |
4408 | m_checkpointer_cron_init = false; |
4409 | int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this); |
4410 | if (r == 0) { |
4411 | m_checkpointer_cron_init = true; |
4412 | } |
4413 | m_checkpointer_init = true; |
4414 | return r; |
4415 | } |
4416 | |
4417 | void checkpointer::destroy() { |
4418 | if (!m_checkpointer_init) { |
4419 | return; |
4420 | } |
4421 | if (m_checkpointer_cron_init && !this->has_been_shutdown()) { |
4422 | // for test code only, production code uses toku_cachetable_minicron_shutdown() |
4423 | int r = this->shutdown(); |
4424 | assert(r == 0); |
4425 | } |
4426 | bjm_destroy(m_checkpoint_clones_bjm); |
4427 | } |
4428 | |
4429 | // |
4430 | // Sets how often the checkpoint thread will run, in seconds |
4431 | // |
4432 | void checkpointer::set_checkpoint_period(uint32_t new_period) { |
4433 | toku_minicron_change_period(&m_checkpointer_cron, new_period*1000); |
4434 | } |
4435 | |
4436 | // |
// Returns how often the checkpoint thread will run, in seconds.
4438 | // |
4439 | uint32_t checkpointer::get_checkpoint_period() { |
4440 | return toku_minicron_get_period_in_seconds_unlocked(&m_checkpointer_cron); |
4441 | } |
4442 | |
4443 | // |
4444 | // Stops the checkpoint thread. |
4445 | // |
4446 | int checkpointer::shutdown() { |
4447 | return toku_minicron_shutdown(&m_checkpointer_cron); |
4448 | } |
4449 | |
4450 | // |
// Returns true if the checkpoint thread has been shut down;
// if checkpointing is still running, this returns false.
4452 | // |
4453 | bool checkpointer::has_been_shutdown() { |
4454 | return toku_minicron_has_been_shutdown(&m_checkpointer_cron); |
4455 | } |
4456 | |
4457 | TOKULOGGER checkpointer::get_logger() { |
4458 | return m_logger; |
4459 | } |
4460 | |
4461 | void checkpointer::increment_num_txns() { |
4462 | m_checkpoint_num_txns++; |
4463 | } |
4464 | |
4465 | struct iterate_begin_checkpoint { |
4466 | LSN lsn_of_checkpoint_in_progress; |
4467 | iterate_begin_checkpoint(LSN lsn) : lsn_of_checkpoint_in_progress(lsn) { } |
4468 | static int fn(const CACHEFILE &cf, const uint32_t UU(idx), struct iterate_begin_checkpoint *info) { |
4469 | assert(cf->begin_checkpoint_userdata); |
4470 | if (cf->for_checkpoint) { |
4471 | cf->begin_checkpoint_userdata(info->lsn_of_checkpoint_in_progress, cf->userdata); |
4472 | } |
4473 | return 0; |
4474 | } |
4475 | }; |
4476 | |
4477 | // |
4478 | // Update the user data in any cachefiles in our checkpoint list. |
4479 | // |
4480 | void checkpointer::update_cachefiles() { |
4481 | struct iterate_begin_checkpoint iterate(m_lsn_of_checkpoint_in_progress); |
4482 | int r = m_cf_list->m_active_fileid.iterate<struct iterate_begin_checkpoint, |
4483 | iterate_begin_checkpoint::fn>(&iterate); |
4484 | assert_zero(r); |
4485 | } |
4486 | |
4487 | struct iterate_note_pin { |
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
4489 | assert(cf->note_pin_by_checkpoint); |
4490 | cf->note_pin_by_checkpoint(cf, cf->userdata); |
4491 | cf->for_checkpoint = true; |
4492 | return 0; |
4493 | } |
4494 | }; |
4495 | |
4496 | // |
4497 | // Sets up and kicks off a checkpoint. |
4498 | // |
4499 | void checkpointer::begin_checkpoint() { |
4500 | // 1. Initialize the accountability counters. |
4501 | m_checkpoint_num_txns = 0; |
4502 | |
4503 | // 2. Make list of cachefiles to be included in the checkpoint. |
4504 | m_cf_list->read_lock(); |
4505 | m_cf_list->m_active_fileid.iterate<void *, iterate_note_pin::fn>(nullptr); |
4506 | m_checkpoint_num_files = m_cf_list->m_active_fileid.size(); |
4507 | m_cf_list->read_unlock(); |
4508 | |
4509 | // 3. Create log entries for this checkpoint. |
4510 | if (m_logger) { |
4511 | this->log_begin_checkpoint(); |
4512 | } |
4513 | |
4514 | bjm_reset(m_checkpoint_clones_bjm); |
4515 | |
4516 | m_list->write_pending_exp_lock(); |
4517 | m_list->read_list_lock(); |
4518 | m_cf_list->read_lock(); // needed for update_cachefiles |
4519 | m_list->write_pending_cheap_lock(); |
4520 | // 4. Turn on all the relevant checkpoint pending bits. |
4521 | this->turn_on_pending_bits(); |
4522 | |
4523 | // 5. |
4524 | this->update_cachefiles(); |
4525 | m_list->write_pending_cheap_unlock(); |
4526 | m_cf_list->read_unlock(); |
4527 | m_list->read_list_unlock(); |
4528 | m_list->write_pending_exp_unlock(); |
4529 | } |
4530 | |
4531 | struct iterate_log_fassociate { |
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
4533 | assert(cf->log_fassociate_during_checkpoint); |
4534 | cf->log_fassociate_during_checkpoint(cf, cf->userdata); |
4535 | return 0; |
4536 | } |
4537 | }; |
4538 | |
4539 | // |
// Assuming the logger exists, this will write out the following
// information to the log.
//
// 1. Writes the BEGIN_CHECKPOINT entry to the log.
// 2. Writes the list of open dictionaries to the log.
// 3. Writes the list of open transactions to the log.
// 4. Writes the list of dictionaries that have had their rollback logs suppressed.
//
// NOTE: This also has the side effect of setting the LSN
// of the checkpoint in progress.
4550 | // |
4551 | void checkpointer::log_begin_checkpoint() { |
4552 | int r = 0; |
4553 | |
4554 | // Write the BEGIN_CHECKPOINT to the log. |
4555 | LSN begin_lsn={ .lsn = (uint64_t) -1 }; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed. |
4556 | TXN_MANAGER mgr = toku_logger_get_txn_manager(m_logger); |
4557 | TXNID last_xid = toku_txn_manager_get_last_xid(mgr); |
4558 | toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid); |
4559 | m_lsn_of_checkpoint_in_progress = begin_lsn; |
4560 | |
4561 | // Log the list of open dictionaries. |
4562 | m_cf_list->m_active_fileid.iterate<void *, iterate_log_fassociate::fn>(nullptr); |
4563 | |
4564 | // Write open transactions to the log. |
4565 | r = toku_txn_manager_iter_over_live_txns( |
4566 | m_logger->txn_manager, |
4567 | log_open_txn, |
4568 | this |
4569 | ); |
4570 | assert(r == 0); |
4571 | } |
4572 | |
4573 | // |
4574 | // Sets the pending bits of EVERY PAIR in the cachetable, regardless of |
4575 | // whether the PAIR is clean or not. It will be the responsibility of |
4576 | // end_checkpoint or client threads to simply clear the pending bit |
4577 | // if the PAIR is clean. |
4578 | // |
// On entry and exit, the pair list's read list lock is held, and
// both pending locks are held.
4581 | // |
4582 | void checkpointer::turn_on_pending_bits() { |
4583 | PAIR p = NULL; |
4584 | uint32_t i; |
4585 | for (i = 0, p = m_list->m_checkpoint_head; i < m_list->m_n_in_table; i++, p = p->clock_next) { |
4586 | assert(!p->checkpoint_pending); |
        // Only include pairs belonging to cachefiles in the checkpoint.
4588 | if (!p->cachefile->for_checkpoint) { |
4589 | continue; |
4590 | } |
4591 | // Mark everything as pending a checkpoint |
4592 | // |
4593 | // The rule for the checkpoint_pending bit is as follows: |
4594 | // - begin_checkpoint may set checkpoint_pending to true |
4595 | // even though the pair lock on the node is not held. |
4596 | // - any thread that wants to clear the pending bit must own |
4597 | // the PAIR lock. Otherwise, |
4598 | // we may end up clearing the pending bit before the |
4599 | // current lock is ever released. |
4600 | p->checkpoint_pending = true; |
4601 | if (m_list->m_pending_head) { |
4602 | m_list->m_pending_head->pending_prev = p; |
4603 | } |
4604 | p->pending_next = m_list->m_pending_head; |
4605 | p->pending_prev = NULL; |
4606 | m_list->m_pending_head = p; |
4607 | } |
4608 | invariant(p == m_list->m_checkpoint_head); |
4609 | } |
4610 | |
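// The background job manager (m_checkpoint_clones_bjm) tracks outstanding
// writebacks of cloned pairs; checkpoint_pending_pairs() waits for these jobs
// to finish before the checkpoint can complete.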
4611 | void checkpointer::add_background_job() { |
4612 | int r = bjm_add_background_job(m_checkpoint_clones_bjm); |
4613 | assert_zero(r); |
4614 | } |
4615 | void checkpointer::remove_background_job() { |
4616 | bjm_remove_background_job(m_checkpoint_clones_bjm); |
4617 | } |
4618 | |
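// Completes the checkpoint started by begin_checkpoint: writes out the pending
// pairs, checkpoints each participating cachefile's user data, logs the
// END_CHECKPOINT entry, and finally unpins the cachefiles in the checkpoint.
// The test callback, if provided, runs after the dictionaries are written but
// before END_CHECKPOINT is logged.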
void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextra) {
4620 | toku::scoped_malloc checkpoint_cfs_buf(m_checkpoint_num_files * sizeof(CACHEFILE)); |
4621 | CACHEFILE *checkpoint_cfs = reinterpret_cast<CACHEFILE *>(checkpoint_cfs_buf.get()); |
4622 | |
4623 | this->fill_checkpoint_cfs(checkpoint_cfs); |
4624 | this->checkpoint_pending_pairs(); |
4625 | this->checkpoint_userdata(checkpoint_cfs); |
    // For testing purposes only. The dictionaries have been fsynced to disk, but the log has not yet been written.
4627 | if (testcallback_f) { |
4628 | testcallback_f(testextra); |
4629 | } |
4630 | this->log_end_checkpoint(); |
4631 | this->end_checkpoint_userdata(checkpoint_cfs); |
4632 | |
    // Delete the list of cachefiles in the checkpoint.
4634 | this->remove_cachefiles(checkpoint_cfs); |
4635 | } |
4636 | |
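// Iteration callback: collects the cachefiles participating in the checkpoint
// into a caller-provided array of size checkpoint_num_files.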
4637 | struct iterate_checkpoint_cfs { |
4638 | CACHEFILE *checkpoint_cfs; |
4639 | uint32_t checkpoint_num_files; |
4640 | uint32_t curr_index; |
4641 | iterate_checkpoint_cfs(CACHEFILE *cfs, uint32_t num_files) : |
4642 | checkpoint_cfs(cfs), checkpoint_num_files(num_files), curr_index(0) { |
4643 | } |
4644 | static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_checkpoint_cfs *info) { |
4645 | if (cf->for_checkpoint) { |
4646 | assert(info->curr_index < info->checkpoint_num_files); |
4647 | info->checkpoint_cfs[info->curr_index] = cf; |
4648 | info->curr_index++; |
4649 | } |
4650 | return 0; |
4651 | } |
4652 | }; |
4653 | |
4654 | void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) { |
4655 | struct iterate_checkpoint_cfs iterate(checkpoint_cfs, m_checkpoint_num_files); |
4656 | |
4657 | m_cf_list->read_lock(); |
4658 | m_cf_list->m_active_fileid.iterate<struct iterate_checkpoint_cfs, iterate_checkpoint_cfs::fn>(&iterate); |
4659 | assert(iterate.curr_index == m_checkpoint_num_files); |
4660 | m_cf_list->read_unlock(); |
4661 | } |
4662 | |
4663 | void checkpointer::checkpoint_pending_pairs() { |
4664 | PAIR p; |
4665 | m_list->read_list_lock(); |
4666 | while ((p = m_list->m_pending_head)!=0) { |
        // <CER> TODO: Investigate why we move the pending head outside of the pending_pairs_remove() call.
4668 | m_list->m_pending_head = m_list->m_pending_head->pending_next; |
4669 | m_list->pending_pairs_remove(p); |
4670 | // if still pending, clear the pending bit and write out the node |
4671 | pair_lock(p); |
4672 | m_list->read_list_unlock(); |
4673 | write_pair_for_checkpoint_thread(m_ev, p); |
4674 | pair_unlock(p); |
4675 | m_list->read_list_lock(); |
4676 | } |
4677 | assert(!m_list->m_pending_head); |
4678 | m_list->read_list_unlock(); |
4679 | bjm_wait_for_jobs_to_finish(m_checkpoint_clones_bjm); |
4680 | } |
4681 | |
4682 | void checkpointer::checkpoint_userdata(CACHEFILE* checkpoint_cfs) { |
4683 | // have just written data blocks, so next write the translation and header for each open dictionary |
4684 | for (uint32_t i = 0; i < m_checkpoint_num_files; i++) { |
4685 | CACHEFILE cf = checkpoint_cfs[i]; |
4686 | assert(cf->for_checkpoint); |
4687 | assert(cf->checkpoint_userdata); |
4688 | toku_cachetable_set_checkpointing_user_data_status(1); |
4689 | cf->checkpoint_userdata(cf, cf->fd, cf->userdata); |
4690 | toku_cachetable_set_checkpointing_user_data_status(0); |
4691 | } |
4692 | } |
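// Writes the END_CHECKPOINT entry (fsynced) and notes the completed
// checkpoint's LSN with the logger.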
4693 | |
4694 | void checkpointer::log_end_checkpoint() { |
4695 | if (m_logger) { |
4696 | toku_log_end_checkpoint(m_logger, NULL, |
4697 | 1, // want the end_checkpoint to be fsync'd |
4698 | m_lsn_of_checkpoint_in_progress, |
4699 | 0, |
4700 | m_checkpoint_num_files, |
4701 | m_checkpoint_num_txns); |
4702 | toku_logger_note_checkpoint(m_logger, m_lsn_of_checkpoint_in_progress); |
4703 | } |
4704 | } |
4705 | |
4706 | void checkpointer::end_checkpoint_userdata(CACHEFILE* checkpoint_cfs) { |
4707 | // everything has been written to file and fsynced |
4708 | // ... call checkpoint-end function in block translator |
4709 | // to free obsolete blocks on disk used by previous checkpoint |
    // cachefiles_in_checkpoint is protected by the checkpoint_safe_lock
4711 | for (uint32_t i = 0; i < m_checkpoint_num_files; i++) { |
4712 | CACHEFILE cf = checkpoint_cfs[i]; |
4713 | assert(cf->for_checkpoint); |
4714 | assert(cf->end_checkpoint_userdata); |
4715 | cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata); |
4716 | } |
4717 | } |
4718 | |
4719 | // |
// Deletes all the cachefiles in this checkpointer's cachefile list.
4721 | // |
4722 | void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) { |
    // note_unpin_by_checkpoint may destroy the cachefile, so do not touch cf after the callback runs.
4724 | for (uint32_t i = 0; i < m_checkpoint_num_files; i++) { |
4725 | CACHEFILE cf = checkpoint_cfs[i]; |
        // Check that the callback exists so that this function
        // can be called from cachetable tests.
4728 | assert(cf->for_checkpoint); |
4729 | cf->for_checkpoint = false; |
4730 | assert(cf->note_unpin_by_checkpoint); |
        // Clear the bit saying this file is in the checkpoint.
4732 | cf->note_unpin_by_checkpoint(cf, cf->userdata); |
4733 | } |
4734 | } |
4735 | |
4736 | |
4737 | //////////////////////////////////////////////////////// |
4738 | // |
4739 | // cachefiles list |
4740 | // |
static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD");
4742 | |
4743 | void cachefile_list::init() { |
4744 | m_next_filenum_to_use.fileid = 0; |
4745 | m_next_hash_id_to_use = 0; |
4746 | toku_pthread_rwlock_init(*cachetable_m_lock_key, &m_lock, nullptr); |
4747 | m_active_filenum.create(); |
4748 | m_active_fileid.create(); |
4749 | m_stale_fileid.create(); |
4750 | } |
4751 | |
4752 | void cachefile_list::destroy() { |
4753 | m_active_filenum.destroy(); |
4754 | m_active_fileid.destroy(); |
4755 | m_stale_fileid.destroy(); |
4756 | toku_pthread_rwlock_destroy(&m_lock); |
4757 | } |
4758 | |
4759 | void cachefile_list::read_lock() { |
4760 | toku_pthread_rwlock_rdlock(&m_lock); |
4761 | } |
4762 | |
4763 | void cachefile_list::read_unlock() { |
4764 | toku_pthread_rwlock_rdunlock(&m_lock); |
4765 | } |
4766 | |
4767 | void cachefile_list::write_lock() { |
4768 | toku_pthread_rwlock_wrlock(&m_lock); |
4769 | } |
4770 | |
4771 | void cachefile_list::write_unlock() { |
4772 | toku_pthread_rwlock_wrunlock(&m_lock); |
4773 | } |
4774 | |
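// Iteration callback: searches the active cachefiles for one whose fname_in_env
// matches the given iname; returning -1 stops the iteration once it is found.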
4775 | struct iterate_find_iname { |
4776 | const char *iname_in_env; |
4777 | CACHEFILE found_cf; |
4778 | iterate_find_iname(const char *iname) : iname_in_env(iname), found_cf(nullptr) { } |
4779 | static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_find_iname *info) { |
4780 | if (cf->fname_in_env && strcmp(cf->fname_in_env, info->iname_in_env) == 0) { |
4781 | info->found_cf = cf; |
4782 | return -1; |
4783 | } |
4784 | return 0; |
4785 | } |
4786 | }; |
4787 | |
4788 | int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf) { |
4789 | struct iterate_find_iname iterate(iname_in_env); |
4790 | |
4791 | read_lock(); |
4792 | int r = m_active_fileid.iterate<iterate_find_iname, iterate_find_iname::fn>(&iterate); |
4793 | if (iterate.found_cf != nullptr) { |
4794 | assert(strcmp(iterate.found_cf->fname_in_env, iname_in_env) == 0); |
4795 | *cf = iterate.found_cf; |
4796 | r = 0; |
4797 | } else { |
4798 | r = ENOENT; |
4799 | } |
4800 | read_unlock(); |
4801 | return r; |
4802 | } |
4803 | |
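// Comparison function used to keep m_active_filenum ordered by filenum.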
4804 | static int cachefile_find_by_filenum(const CACHEFILE &a_cf, const FILENUM &b) { |
4805 | const FILENUM a = a_cf->filenum; |
4806 | if (a.fileid < b.fileid) { |
4807 | return -1; |
4808 | } else if (a.fileid == b.fileid) { |
4809 | return 0; |
4810 | } else { |
4811 | return 1; |
4812 | } |
4813 | } |
4814 | |
4815 | int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) { |
4816 | read_lock(); |
4817 | int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, cf, nullptr); |
4818 | if (r == DB_NOTFOUND) { |
4819 | r = ENOENT; |
4820 | } else { |
4821 | invariant_zero(r); |
4822 | } |
4823 | read_unlock(); |
4824 | return r; |
4825 | } |
4826 | |
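// Comparison function used to keep m_active_fileid and m_stale_fileid ordered
// by on-disk fileid.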
4827 | static int cachefile_find_by_fileid(const CACHEFILE &a_cf, const struct fileid &b) { |
4828 | return toku_fileid_cmp(a_cf->fileid, b); |
4829 | } |
4830 | |
4831 | void cachefile_list::add_cf_unlocked(CACHEFILE cf) { |
4832 | int r; |
4833 | r = m_active_filenum.insert<FILENUM, cachefile_find_by_filenum>(cf, cf->filenum, nullptr); |
4834 | assert_zero(r); |
4835 | r = m_active_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr); |
4836 | assert_zero(r); |
4837 | } |
4838 | |
4839 | void cachefile_list::add_stale_cf(CACHEFILE cf) { |
4840 | write_lock(); |
4841 | int r = m_stale_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr); |
4842 | assert_zero(r); |
4843 | write_unlock(); |
4844 | } |
4845 | |
4846 | void cachefile_list::remove_cf(CACHEFILE cf) { |
4847 | write_lock(); |
4848 | |
4849 | uint32_t idx; |
4850 | int r; |
4851 | r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(cf->filenum, nullptr, &idx); |
4852 | assert_zero(r); |
4853 | r = m_active_filenum.delete_at(idx); |
4854 | assert_zero(r); |
4855 | |
4856 | r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx); |
4857 | assert_zero(r); |
4858 | r = m_active_fileid.delete_at(idx); |
4859 | assert_zero(r); |
4860 | |
4861 | write_unlock(); |
4862 | } |
4863 | |
4864 | void cachefile_list::remove_stale_cf_unlocked(CACHEFILE cf) { |
4865 | uint32_t idx; |
4866 | int r; |
4867 | r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx); |
4868 | assert_zero(r); |
4869 | r = m_stale_fileid.delete_at(idx); |
4870 | assert_zero(r); |
4871 | } |
4872 | |
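// Returns a filenum that is not currently in use: scans forward from
// m_next_filenum_to_use, skipping filenums already present in m_active_filenum
// and the reserved FILENUM_NONE value, wrapping around to zero if needed.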
4873 | FILENUM cachefile_list::reserve_filenum() { |
    // taking a write lock because we are modifying m_next_filenum_to_use
4875 | FILENUM filenum = FILENUM_NONE; |
4876 | write_lock(); |
4877 | while (1) { |
4878 | int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(m_next_filenum_to_use, nullptr, nullptr); |
4879 | if (r == 0) { |
4880 | m_next_filenum_to_use.fileid++; |
4881 | continue; |
4882 | } |
4883 | assert(r == DB_NOTFOUND); |
4884 | |
4885 | // skip the reserved value UINT32_MAX and wrap around to zero |
4886 | if (m_next_filenum_to_use.fileid == FILENUM_NONE.fileid) { |
4887 | m_next_filenum_to_use.fileid = 0; |
4888 | continue; |
4889 | } |
4890 | |
4891 | filenum = m_next_filenum_to_use; |
4892 | m_next_filenum_to_use.fileid++; |
4893 | break; |
4894 | } |
4895 | write_unlock(); |
4896 | return filenum; |
4897 | } |
4898 | |
4899 | uint32_t cachefile_list::get_new_hash_id_unlocked() { |
4900 | uint32_t retval = m_next_hash_id_to_use; |
4901 | m_next_hash_id_to_use++; |
4902 | return retval; |
4903 | } |
4904 | |
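// Looks up an active cachefile by its on-disk fileid; returns nullptr if none
// is found. The _unlocked suffix indicates the caller is responsible for
// holding the cachefile list lock.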
4905 | CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) { |
4906 | CACHEFILE cf = nullptr; |
4907 | int r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr); |
4908 | if (r == 0) { |
4909 | assert(!cf->unlink_on_close); |
4910 | } |
4911 | return cf; |
4912 | } |
4913 | |
4914 | CACHEFILE cachefile_list::find_stale_cachefile_unlocked(struct fileid* fileid) { |
4915 | CACHEFILE cf = nullptr; |
4916 | int r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr); |
4917 | if (r == 0) { |
4918 | assert(!cf->unlink_on_close); |
4919 | } |
4920 | return cf; |
4921 | } |
4922 | |
4923 | void cachefile_list::verify_unused_filenum(FILENUM filenum) { |
4924 | int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, nullptr, nullptr); |
4925 | assert(r == DB_NOTFOUND); |
4926 | } |
4927 | |
4928 | // returns true if some eviction ran, false otherwise |
4929 | bool cachefile_list::evict_some_stale_pair(evictor* ev) { |
4930 | write_lock(); |
4931 | if (m_stale_fileid.size() == 0) { |
4932 | write_unlock(); |
4933 | return false; |
4934 | } |
4935 | |
4936 | CACHEFILE stale_cf = nullptr; |
4937 | int r = m_stale_fileid.fetch(0, &stale_cf); |
4938 | assert_zero(r); |
4939 | |
4940 | // we should not have a cf in the stale list |
4941 | // that does not have any pairs |
4942 | PAIR p = stale_cf->cf_head; |
4943 | paranoid_invariant(p != NULL); |
4944 | evict_pair_from_cachefile(p); |
4945 | |
4946 | // now that we have evicted something, |
4947 | // let's check if the cachefile is needed anymore |
4948 | // |
4949 | // it is not needed if the latest eviction caused |
4950 | // the cf_head for that cf to become null |
4951 | bool destroy_cf = stale_cf->cf_head == nullptr; |
4952 | if (destroy_cf) { |
4953 | remove_stale_cf_unlocked(stale_cf); |
4954 | } |
4955 | |
4956 | write_unlock(); |
4957 | |
4958 | ev->remove_pair_attr(p->attr); |
4959 | cachetable_free_pair(p); |
4960 | if (destroy_cf) { |
4961 | cachefile_destroy(stale_cf); |
4962 | } |
4963 | return true; |
4964 | } |
4965 | |
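// Evicts every pair belonging to the stale cachefiles and destroys each stale
// cachefile once its last pair has been evicted.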
4966 | void cachefile_list::free_stale_data(evictor* ev) { |
4967 | write_lock(); |
4968 | while (m_stale_fileid.size() != 0) { |
4969 | CACHEFILE stale_cf = nullptr; |
4970 | int r = m_stale_fileid.fetch(0, &stale_cf); |
4971 | assert_zero(r); |
4972 | |
4973 | // we should not have a cf in the stale list |
4974 | // that does not have any pairs |
4975 | PAIR p = stale_cf->cf_head; |
4976 | paranoid_invariant(p != NULL); |
4977 | |
4978 | evict_pair_from_cachefile(p); |
4979 | ev->remove_pair_attr(p->attr); |
4980 | cachetable_free_pair(p); |
4981 | |
4982 | // now that we have evicted something, |
4983 | // let's check if the cachefile is needed anymore |
4984 | if (stale_cf->cf_head == NULL) { |
4985 | remove_stale_cf_unlocked(stale_cf); |
4986 | cachefile_destroy(stale_cf); |
4987 | } |
4988 | } |
4989 | write_unlock(); |
4990 | } |
4991 | |
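// Runs at load time (via the constructor attribute) and tells Helgrind/DRD
// not to check these global status counters for data races.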
4992 | void __attribute__((__constructor__)) toku_cachetable_helgrind_ignore(void); |
4993 | void |
4994 | toku_cachetable_helgrind_ignore(void) { |
4995 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss); |
4996 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime); |
4997 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches); |
4998 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions); |
4999 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions); |
5000 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&ct_status, sizeof ct_status); |
5001 | } |
5002 | |