1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <my_global.h>
40#include <memory.h>
41#include <ctype.h>
42#include <limits.h>
43#include <unistd.h>
44
45#include "ft/serialize/block_table.h"
46#include "ft/ft.h"
47#include "ft/logger/log-internal.h"
48#include "ft/txn/txn_manager.h"
49#include "ft/txn/rollback_log_node_cache.h"
50
51#include "util/status.h"
52
// Log format version; open_logfile() writes it (in network byte order) into
// the header of every log file it creates.
static const int log_format_version = TOKU_LOG_VERSION;

// Instrumentation keys handed to toku_mutex_init(), toku_cond_init() and
// toku_os_open() in this file. They are only defined here.
// NOTE(review): presumably assigned elsewhere before first use -- confirm.
toku_instr_key *result_output_condition_lock_mutex_key;
toku_instr_key *result_output_condition_key;
toku_instr_key *tokudb_file_log_key;

// Forward declarations of file-local helpers defined further down.
static int open_logfile(TOKULOGGER logger);
static void logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn);
static void delete_logfile(TOKULOGGER logger,
                           long long index,
                           uint32_t version);
static void grab_output(TOKULOGGER logger, LSN *fsynced_lsn);
static void release_output(TOKULOGGER logger, LSN fsynced_lsn);
66
// Print `len` bytes of `data` to `outf` as a double-quoted string.
// The characters '"', '\\' and newline are backslash-escaped, other printable
// bytes are emitted verbatim, and everything else is written as a
// three-digit octal escape (\ooo).
static void toku_print_bytes (FILE *outf, uint32_t len, char *data) {
    fprintf(outf, "\"");
    for (uint32_t i = 0; i < len; i++) {
        // <ctype.h> classification functions are only defined for values
        // representable as unsigned char (or EOF), so never pass a plain
        // (possibly negative) char to isprint().
        const unsigned char c = (unsigned char) data[i];
        switch (c) {
        case '"':  fprintf(outf, "\\\""); break;
        case '\\': fprintf(outf, "\\\\"); break;
        case '\n': fprintf(outf, "\\n"); break;
        default:
            if (isprint(c)) fprintf(outf, "%c", c);
            else fprintf(outf, "\\%03o", (unsigned) c);
        }
    }
    fprintf(outf, "\"");
}
82
83static bool is_a_logfile_any_version (const char *name, uint64_t *number_result, uint32_t *version_of_log) {
84 bool rval = true;
85 uint64_t result;
86 int n;
87 int r;
88 uint32_t version;
89 r = sscanf(name, "log%" SCNu64 ".tokulog%" SCNu32 "%n", &result, &version, &n);
90 if (r!=2 || name[n]!='\0' || version <= TOKU_LOG_VERSION_1) {
91 //Version 1 does NOT append 'version' to end of '.tokulog'
92 version = TOKU_LOG_VERSION_1;
93 r = sscanf(name, "log%" SCNu64 ".tokulog%n", &result, &n);
94 if (r!=1 || name[n]!='\0') {
95 rval = false;
96 }
97 }
98 if (rval) {
99 *number_result = result;
100 *version_of_log = version;
101 }
102
103 return rval;
104}
105
106// added for #2424, improved for #2521
107static bool is_a_logfile (const char *name, long long *number_result) {
108 bool rval;
109 uint64_t result;
110 uint32_t version;
111 rval = is_a_logfile_any_version(name, &result, &version);
112 if (rval && version != TOKU_LOG_VERSION)
113 rval = false;
114 if (rval)
115 *number_result = result;
116 return rval;
117}
118
119
120// TODO: can't fail
121int toku_logger_create (TOKULOGGER *resultp) {
122 TOKULOGGER CALLOC(result);
123 if (result==0) return get_error_errno();
124 result->is_open=false;
125 result->write_log_files = true;
126 result->trim_log_files = true;
127 result->directory=0;
128 // fd is uninitialized on purpose
129 // ct is uninitialized on purpose
130 result->lg_max = 100<<20; // 100MB default
131 // lsn is uninitialized
132 result->inbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
133 result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
134 // written_lsn is uninitialized
135 // fsynced_lsn is uninitialized
136 result->last_completed_checkpoint_lsn = ZERO_LSN;
137 // next_log_file_number is uninitialized
138 // n_in_file is uninitialized
139 result->write_block_size = FT_DEFAULT_NODE_SIZE; // default logging size is the same as the default ft block size
140 toku_logfilemgr_create(&result->logfilemgr);
141 *resultp = result;
142 ml_init(&result->input_lock);
143 toku_mutex_init(*result_output_condition_lock_mutex_key,
144 &result->output_condition_lock,
145 nullptr);
146 toku_cond_init(
147 *result_output_condition_key, &result->output_condition, nullptr);
148 result->rollback_cachefile = NULL;
149 result->output_is_available = true;
150 toku_txn_manager_init(&result->txn_manager);
151 return 0;
152}
153
154static void fsync_logdir(TOKULOGGER logger) {
155 toku_fsync_dirfd_without_accounting(logger->dir);
156}
157
158static int open_logdir(TOKULOGGER logger, const char *directory) {
159 if (toku_os_is_absolute_name(directory)) {
160 logger->directory = toku_strdup(directory);
161 } else {
162 char cwdbuf[PATH_MAX];
163 char *cwd = getcwd(cwdbuf, PATH_MAX);
164 if (cwd == NULL)
165 return -1;
166 char *MALLOC_N(strlen(cwd) + strlen(directory) + 2, new_log_dir);
167 if (new_log_dir == NULL) {
168 return -2;
169 }
170 sprintf(new_log_dir, "%s/%s", cwd, directory);
171 logger->directory = new_log_dir;
172 }
173 if (logger->directory==0) return get_error_errno();
174
175 logger->dir = opendir(logger->directory);
176 if ( logger->dir == NULL ) return -1;
177 return 0;
178}
179
180static int close_logdir(TOKULOGGER logger) {
181 return closedir(logger->dir);
182}
183
184int
185toku_logger_open_with_last_xid(const char *directory, TOKULOGGER logger, TXNID last_xid) {
186 if (logger->is_open) return EINVAL;
187
188 int r;
189 TXNID last_xid_if_clean_shutdown = TXNID_NONE;
190 r = toku_logfilemgr_init(logger->logfilemgr, directory, &last_xid_if_clean_shutdown);
191 if ( r!=0 )
192 return r;
193 logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr);
194 logger->written_lsn = logger->lsn;
195 logger->fsynced_lsn = logger->lsn;
196 logger->inbuf.max_lsn_in_buf = logger->lsn;
197 logger->outbuf.max_lsn_in_buf = logger->lsn;
198
199 // open directory, save pointer for fsyncing t:2445
200 r = open_logdir(logger, directory);
201 if (r!=0) return r;
202
203 long long nexti;
204 r = toku_logger_find_next_unused_log_file(logger->directory, &nexti);
205 if (r!=0) return r;
206
207 logger->next_log_file_number = nexti;
208 r = open_logfile(logger);
209 if (r!=0) return r;
210 if (last_xid == TXNID_NONE) {
211 last_xid = last_xid_if_clean_shutdown;
212 }
213 toku_txn_manager_set_last_xid_from_logger(logger->txn_manager, last_xid);
214
215 logger->is_open = true;
216 return 0;
217}
218
219int toku_logger_open (const char *directory, TOKULOGGER logger) {
220 return toku_logger_open_with_last_xid(directory, logger, TXNID_NONE);
221}
222
223bool toku_logger_rollback_is_open (TOKULOGGER logger) {
224 return logger->rollback_cachefile != NULL;
225}
226
227#define MAX_CACHED_ROLLBACK_NODES 4096
228
229void toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft) {
230 ft->blocktable.free_unused_blocknums(ft->h->root_blocknum);
231 logger->rollback_cache.init(MAX_CACHED_ROLLBACK_NODES);
232}
233
234int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create) {
235 assert(logger->is_open);
236 assert(!logger->rollback_cachefile);
237
238 FT_HANDLE ft_handle = nullptr; // Note, there is no DB associated with this FT.
239 toku_ft_handle_create(&ft_handle);
240 int r = toku_ft_handle_open(ft_handle, toku_product_name_strings.rollback_cachefile, create, create, cachetable, nullptr);
241 if (r == 0) {
242 FT ft = ft_handle->ft;
243 logger->rollback_cachefile = ft->cf;
244 toku_logger_initialize_rollback_cache(logger, ft_handle->ft);
245
246 // Verify it is empty
247 // Must have no data blocks (rollback logs or otherwise).
248 ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum);
249 bool is_empty = toku_ft_is_empty_fast(ft_handle);
250 assert(is_empty);
251 } else {
252 toku_ft_handle_close(ft_handle);
253 }
254 return r;
255}
256
257
258// Requires: Rollback cachefile can only be closed immediately after a checkpoint,
259// so it will always be clean (!h->dirty) when about to be closed.
260// Rollback log can only be closed when there are no open transactions,
261// so it will always be empty (no data blocks) when about to be closed.
262void toku_logger_close_rollback_check_empty(TOKULOGGER logger, bool clean_shutdown) {
263 CACHEFILE cf = logger->rollback_cachefile; // stored in logger at rollback cachefile open
264 if (cf) {
265 FT_HANDLE ft_to_close;
266 { //Find "ft_to_close"
267 logger->rollback_cache.destroy();
268 FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
269 if (clean_shutdown) {
270 //Verify it is safe to close it.
271 assert(!ft->h->dirty); //Must not be dirty.
272 ft->blocktable.free_unused_blocknums(ft->h->root_blocknum);
273 // Must have no data blocks (rollback logs or otherwise).
274 ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum);
275 assert(!ft->h->dirty);
276 } else {
277 ft->h->dirty = 0;
278 }
279 ft_to_close = toku_ft_get_only_existing_ft_handle(ft);
280 if (clean_shutdown) {
281 bool is_empty;
282 is_empty = toku_ft_is_empty_fast(ft_to_close);
283 assert(is_empty);
284 assert(!ft->h->dirty); // it should not have been dirtied by the toku_ft_is_empty test.
285 }
286 }
287
288 toku_ft_handle_close(ft_to_close);
289 //Set as dealt with already.
290 logger->rollback_cachefile = NULL;
291 }
292}
293
294void toku_logger_close_rollback(TOKULOGGER logger) {
295 toku_logger_close_rollback_check_empty(logger, true);
296}
297
298// No locks held on entry
299// No locks held on exit.
300// No locks are needed, since you cannot legally close the log concurrently with doing anything else.
301// TODO: can't fail
302int toku_logger_close(TOKULOGGER *loggerp) {
303 int r;
304 TOKULOGGER logger = *loggerp;
305 if (!logger->is_open) {
306 goto is_closed;
307 }
308 ml_lock(&logger->input_lock);
309 LSN fsynced_lsn;
310 grab_output(logger, &fsynced_lsn);
311 logger_write_buffer(logger, &fsynced_lsn);
312 if (logger->fd!=-1) {
313 if (logger->write_log_files) {
314 toku_file_fsync_without_accounting(logger->fd);
315 }
316 r = toku_os_close(logger->fd);
317 assert(r == 0);
318 }
319 r = close_logdir(logger);
320 assert(r == 0);
321 logger->fd=-1;
322 release_output(logger, fsynced_lsn);
323
324is_closed:
325 toku_free(logger->inbuf.buf);
326 toku_free(logger->outbuf.buf);
327 // before destroying locks they must be left in the unlocked state.
328 ml_destroy(&logger->input_lock);
329 toku_mutex_destroy(&logger->output_condition_lock);
330 toku_cond_destroy(&logger->output_condition);
331 toku_txn_manager_destroy(logger->txn_manager);
332 if (logger->directory) toku_free(logger->directory);
333 toku_logfilemgr_destroy(&logger->logfilemgr);
334 toku_free(logger);
335 *loggerp=0;
336 return 0;
337}
338
339void toku_logger_shutdown(TOKULOGGER logger) {
340 if (logger->is_open) {
341 TXN_MANAGER mgr = logger->txn_manager;
342 if (toku_txn_manager_num_live_root_txns(mgr) == 0) {
343 TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
344 toku_log_shutdown(logger, NULL, true, 0, last_xid);
345 }
346 }
347}
348
349static int close_and_open_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
350// Effect: close the current file, and open the next one.
351// Entry: This thread has permission to modify the output.
352// Exit: This thread has permission to modify the output.
353{
354 int r;
355 if (logger->write_log_files) {
356 toku_file_fsync_without_accounting(logger->fd);
357 *fsynced_lsn = logger->written_lsn;
358 toku_logfilemgr_update_last_lsn(logger->logfilemgr,
359 logger->written_lsn); // fixes t:2294
360 }
361 r = toku_os_close(logger->fd);
362
363 if (r != 0)
364 return get_error_errno();
365 return open_logfile(logger);
366}
367
// Return the larger of two ints (b when they are equal).
static int
max_int (int a, int b)
{
    return (a > b) ? a : b;
}
374
375// ***********************************************************
376// output mutex/condition manipulation routines
377// ***********************************************************
378
379static void
380wait_till_output_available (TOKULOGGER logger)
381// Effect: Wait until output becomes available.
382// Implementation hint: Use a pthread_cond_wait.
383// Entry: Holds the output_condition_lock (but not the inlock)
384// Exit: Holds the output_condition_lock and logger->output_is_available
385//
386{
387 tokutime_t t0 = toku_time_now();
388 while (!logger->output_is_available) {
389 toku_cond_wait(&logger->output_condition, &logger->output_condition_lock);
390 }
391 if (tokutime_to_seconds(toku_time_now() - t0) >= 0.100) {
392 logger->num_wait_buf_long++;
393 }
394}
395
396static void
397grab_output(TOKULOGGER logger, LSN *fsynced_lsn)
398// Effect: Wait until output becomes available and get permission to modify output.
399// Entry: Holds no lock (including not holding the input lock, since we never hold both at once).
400// Exit: Hold permission to modify output (but none of the locks).
401{
402 toku_mutex_lock(&logger->output_condition_lock);
403 wait_till_output_available(logger);
404 logger->output_is_available = false;
405 if (fsynced_lsn) {
406 *fsynced_lsn = logger->fsynced_lsn;
407 }
408 toku_mutex_unlock(&logger->output_condition_lock);
409}
410
411static bool
412wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger, LSN lsn, LSN *fsynced_lsn)
413// Effect: Wait until either the output is available or the lsn has been written.
414// Return true iff the lsn has been written.
415// If returning true, then on exit we don't hold output permission.
416// If returning false, then on exit we do hold output permission.
417// Entry: Hold no locks.
418// Exit: Hold the output permission if returns false.
419{
420 bool result;
421 toku_mutex_lock(&logger->output_condition_lock);
422 while (1) {
423 if (logger->fsynced_lsn.lsn >= lsn.lsn) { // we can look at the fsynced lsn since we have the lock.
424 result = true;
425 break;
426 }
427 if (logger->output_is_available) {
428 logger->output_is_available = false;
429 result = false;
430 break;
431 }
432 // otherwise wait for a good time to look again.
433 toku_cond_wait(&logger->output_condition, &logger->output_condition_lock);
434 }
435 *fsynced_lsn = logger->fsynced_lsn;
436 toku_mutex_unlock(&logger->output_condition_lock);
437 return result;
438}
439
440static void
441release_output (TOKULOGGER logger, LSN fsynced_lsn)
442// Effect: Release output permission.
443// Entry: Holds output permissions, but no locks.
444// Exit: Holds neither locks nor output permission.
445{
446 toku_mutex_lock(&logger->output_condition_lock);
447 logger->output_is_available = true;
448 if (logger->fsynced_lsn.lsn < fsynced_lsn.lsn) {
449 logger->fsynced_lsn = fsynced_lsn;
450 }
451 toku_cond_broadcast(&logger->output_condition);
452 toku_mutex_unlock(&logger->output_condition_lock);
453}
454
455static void
456swap_inbuf_outbuf (TOKULOGGER logger)
457// Effect: Swap the inbuf and outbuf
458// Entry and exit: Hold the input lock and permission to modify output.
459{
460 struct logbuf tmp = logger->inbuf;
461 logger->inbuf = logger->outbuf;
462 logger->outbuf = tmp;
463 assert(logger->inbuf.n_in_buf == 0);
464}
465
466static void
467write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
468// Effect: Write the contents of outbuf to logfile. Don't necessarily fsync (but it might, in which case fynced_lsn is updated).
469// If the logfile gets too big, open the next one (that's the case where an fsync might happen).
470// Entry and exit: Holds permission to modify output (and doesn't let it go, so it's ok to also hold the inlock).
471{
472 if (logger->outbuf.n_in_buf>0) {
473 // Write the outbuf to disk, take accounting measurements
474 tokutime_t io_t0 = toku_time_now();
475 toku_os_full_write(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
476 tokutime_t io_t1 = toku_time_now();
477 logger->num_writes_to_disk++;
478 logger->bytes_written_to_disk += logger->outbuf.n_in_buf;
479 logger->time_spent_writing_to_disk += (io_t1 - io_t0);
480
481 assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
482 logger->written_lsn = logger->outbuf.max_lsn_in_buf;
483 logger->n_in_file += logger->outbuf.n_in_buf;
484 logger->outbuf.n_in_buf = 0;
485 }
486 // If the file got too big, then open a new file.
487 if (logger->n_in_file > logger->lg_max) {
488 int r = close_and_open_logfile(logger, fsynced_lsn);
489 assert_zero(r);
490 }
491}
492
void
toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
// Entry: Holds the inlock
// Exit:  Holds the inlock
// Effect: Upon exit, the inlock is held and there are at least n_bytes_needed in the buffer.
//  May release the inlock (and then reacquire it), so this is not atomic.
//  May obtain the output lock and output permission (but if it does so, it will have released the inlock, since we don't hold both locks at once).
//   (But may hold output permission and inlock at the same time.)
// Implementation hint: Makes space in the inbuf, possibly by writing the inbuf to disk or increasing the size of the inbuf.  There might not be an fsync.
// Note: the "is there room" test compares against LOGGER_MIN_BUF_SIZE, not
//  against the current inbuf.buf_size.
// Arguments:  logger:         the logger (side effects)
//             n_bytes_needed: how many bytes to make space for.
{
    // Fast path: the request fits, nothing to do and no lock traffic.
    if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
        return;
    }
    // Drop the inlock before waiting for output permission (grab_output
    // takes the output condition lock and must be entered without the inlock).
    ml_unlock(&logger->input_lock);
    LSN fsynced_lsn;
    grab_output(logger, &fsynced_lsn);

    ml_lock(&logger->input_lock);
    // Some other thread may have written the log out while we didn't have the lock.  If we have space now, then be happy.
    if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
        release_output(logger, fsynced_lsn);
        return;
    }
    if (logger->inbuf.n_in_buf > 0) {
        // There isn't enough space, and there is something in the buffer, so write the inbuf.
        swap_inbuf_outbuf(logger);

        // Don't release the inlock in this case, because we don't want to get starved.
        write_outbuf_to_logfile(logger, &fsynced_lsn);
    }
    // the inbuf is empty.  Make it big enough (just in case it is somehow smaller than a single log entry).
    if (n_bytes_needed > logger->inbuf.buf_size) {
        assert(n_bytes_needed < (1<<30)); // it seems unlikely to work if a logentry gets that big.
        int new_size = max_int(logger->inbuf.buf_size * 2, n_bytes_needed); // make it at least twice as big, and big enough for n_bytes
        assert(new_size < (1<<30));
        XREALLOC_N(new_size, logger->inbuf.buf);
        logger->inbuf.buf_size = new_size;
    }
    // Give up output permission (and publish any newer fsynced_lsn); the inlock stays held.
    release_output(logger, fsynced_lsn);
}
535
536void toku_logger_fsync(TOKULOGGER logger)
537// Effect: This is the exported fsync used by ydb.c for env_log_flush. Group commit doesn't have to work.
538// Entry: Holds no locks
539// Exit: Holds no locks
540// Implementation note: Acquire the output condition lock, then the output permission, then release the output condition lock, then get the input lock.
541// Then release everything. Hold the input lock while reading the current max lsn in buf to make drd happy that there is no data race.
542{
543 ml_lock(&logger->input_lock);
544 const LSN max_lsn_in_buf = logger->inbuf.max_lsn_in_buf;
545 ml_unlock(&logger->input_lock);
546
547 toku_logger_maybe_fsync(logger, max_lsn_in_buf, true, false);
548}
549
550void toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) {
551 if (logger->write_log_files) {
552 toku_logger_maybe_fsync(logger, lsn, true, false);
553 }
554}
555
556int toku_logger_is_open(TOKULOGGER logger) {
557 if (logger==0) return 0;
558 return logger->is_open;
559}
560
561void toku_logger_set_cachetable (TOKULOGGER logger, CACHETABLE ct) {
562 logger->ct = ct;
563}
564
565int toku_logger_set_lg_max(TOKULOGGER logger, uint32_t lg_max) {
566 if (logger==0) return EINVAL; // no logger
567 if (logger->is_open) return EINVAL;
568 if (lg_max>(1<<30)) return EINVAL; // too big
569 logger->lg_max = lg_max;
570 return 0;
571}
572int toku_logger_get_lg_max(TOKULOGGER logger, uint32_t *lg_maxp) {
573 if (logger==0) return EINVAL; // no logger
574 *lg_maxp = logger->lg_max;
575 return 0;
576}
577
578int toku_logger_set_lg_bsize(TOKULOGGER logger, uint32_t bsize) {
579 if (logger==0) return EINVAL; // no logger
580 if (logger->is_open) return EINVAL;
581 if (bsize<=0 || bsize>(1<<30)) return EINVAL;
582 logger->write_block_size = bsize;
583 return 0;
584}
585
586int toku_logger_find_next_unused_log_file(const char *directory, long long *result)
587// This is called during logger initialalization, and no locks are required.
588{
589 DIR *d=opendir(directory);
590 long long maxf=-1; *result = maxf;
591 struct dirent *de;
592 if (d==0) return get_error_errno();
593 while ((de=readdir(d))) {
594 if (de==0) return get_error_errno();
595 long long thisl = -1;
596 if ( is_a_logfile(de->d_name, &thisl) ) {
597 if ((long long)thisl > maxf) maxf = thisl;
598 }
599 }
600 *result=maxf+1;
601 int r = closedir(d);
602 return r;
603}
604
// TODO: Put this in portability layer when ready
// in: file pathname that may have a dirname prefix
// return: pointer into `pathname` just past its last '/', or `pathname`
//         itself when it contains no '/'
static char * fileleafname(char *pathname) {
    char *last_slash = strrchr(pathname, '/');
    return (last_slash != NULL) ? last_slash + 1 : pathname;
}
617
618static int logfilenamecompare (const void *ap, const void *bp) {
619 char *a=*(char**)ap;
620 char *a_leafname = fileleafname(a);
621 char *b=*(char**)bp;
622 char * b_leafname = fileleafname(b);
623 int rval;
624 bool valid;
625 uint64_t num_a = 0; // placate compiler
626 uint64_t num_b = 0;
627 uint32_t ver_a = 0;
628 uint32_t ver_b = 0;
629 valid = is_a_logfile_any_version(a_leafname, &num_a, &ver_a);
630 invariant(valid);
631 valid = is_a_logfile_any_version(b_leafname, &num_b, &ver_b);
632 invariant(valid);
633 if (ver_a < ver_b) rval = -1;
634 else if (ver_a > ver_b) rval = +1;
635 else if (num_a < num_b) rval = -1;
636 else if (num_a > num_b) rval = +1;
637 else rval = 0;
638 return rval;
639}
640
641// Return the log files in sorted order
642// Return a null_terminated array of strings, and also return the number of strings in the array.
643// Requires: Race conditions must be dealt with by caller. Either call during initialization or grab the output permission.
644int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles)
645{
646 int result_limit=2;
647 int n_results=0;
648 char **MALLOC_N(result_limit, result);
649 assert(result!= NULL);
650 struct dirent *de;
651 DIR *d=opendir(directory);
652 if (d==0) {
653 int er = get_error_errno();
654 toku_free(result);
655 return er;
656 }
657 int dirnamelen = strlen(directory);
658 while ((de=readdir(d))) {
659 uint64_t thisl;
660 uint32_t version_ignore;
661 if ( !(is_a_logfile_any_version(de->d_name, &thisl, &version_ignore)) ) continue; //#2424: Skip over files that don't match the exact logfile template
662 if (n_results+1>=result_limit) {
663 result_limit*=2;
664 XREALLOC_N(result_limit, result);
665 }
666 int fnamelen = dirnamelen + strlen(de->d_name) + 2; // One for the slash and one for the trailing NUL.
667 char *XMALLOC_N(fnamelen, fname);
668 snprintf(fname, fnamelen, "%s/%s", directory, de->d_name);
669 result[n_results++] = fname;
670 }
671 // Return them in increasing order. Set width to allow for newer log file names ("xxx.tokulog13")
672 // which are one character longer than old log file names ("xxx.tokulog2"). The comparison function
673 // won't look beyond the terminating NUL, so an extra character in the comparison string doesn't matter.
674 // Allow room for terminating NUL after "xxx.tokulog13" even if result[0] is of form "xxx.tokulog2."
675 int width = sizeof(result[0]+2);
676 qsort(result, n_results, width, logfilenamecompare);
677 *resultp = result;
678 *n_logfiles = n_results;
679 result[n_results]=0; // make a trailing null
680 return d ? closedir(d) : 0;
681}
682
683void toku_logger_free_logfiles(char **logfiles, int n_logfiles) {
684 for (int i = 0; i < n_logfiles; i++)
685 toku_free(logfiles[i]);
686 toku_free(logfiles);
687}
688
689static int open_logfile (TOKULOGGER logger)
690// Entry and Exit: This thread has permission to modify the output.
691{
692 int fnamelen = strlen(logger->directory)+50;
693 char fname[fnamelen];
694 snprintf(fname,
695 fnamelen,
696 "%s/log%012lld.tokulog%d",
697 logger->directory,
698 logger->next_log_file_number,
699 TOKU_LOG_VERSION);
700 long long index = logger->next_log_file_number;
701 if (logger->write_log_files) {
702 logger->fd =
703 toku_os_open(fname,
704 O_CREAT + O_WRONLY + O_TRUNC + O_EXCL + O_BINARY,
705 S_IRUSR + S_IWUSR,
706 *tokudb_file_log_key);
707 if (logger->fd == -1) {
708 return get_error_errno();
709 }
710 fsync_logdir(logger);
711 logger->next_log_file_number++;
712 } else {
713 logger->fd = toku_os_open(
714 DEV_NULL_FILE, O_WRONLY + O_BINARY, S_IWUSR, *tokudb_file_log_key);
715 if (logger->fd == -1) {
716 return get_error_errno();
717 }
718 }
719 toku_os_full_write(logger->fd, "tokulogg", 8);
720 int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
721 toku_os_full_write(logger->fd, &version_l, 4);
722 if ( logger->write_log_files ) {
723 TOKULOGFILEINFO XMALLOC(lf_info);
724 lf_info->index = index;
725 lf_info->maxlsn = logger->written_lsn;
726 lf_info->version = TOKU_LOG_VERSION;
727 toku_logfilemgr_add_logfile_info(logger->logfilemgr, lf_info);
728 }
729 logger->fsynced_lsn = logger->written_lsn;
730 logger->n_in_file = 12;
731 return 0;
732}
733
734static void delete_logfile(TOKULOGGER logger, long long index, uint32_t version)
735// Entry and Exit: This thread has permission to modify the output.
736{
737 int fnamelen = strlen(logger->directory)+50;
738 char fname[fnamelen];
739 snprintf(fname, fnamelen, "%s/log%012lld.tokulog%d", logger->directory, index, version);
740 int r = remove(fname);
741 invariant_zero(r);
742}
743
744void toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn)
745// On entry and exit: No logger locks held.
746// Acquires and releases output permission.
747{
748 LSN fsynced_lsn;
749 grab_output(logger, &fsynced_lsn);
750 TOKULOGFILEMGR lfm = logger->logfilemgr;
751 int n_logfiles = toku_logfilemgr_num_logfiles(lfm);
752
753 TOKULOGFILEINFO lf_info = NULL;
754
755 if ( logger->write_log_files && logger->trim_log_files) {
756 while ( n_logfiles > 1 ) { // don't delete current logfile
757 uint32_t log_version;
758 lf_info = toku_logfilemgr_get_oldest_logfile_info(lfm);
759 log_version = lf_info->version;
760 if ( lf_info->maxlsn.lsn >= trim_lsn.lsn ) {
761 // file contains an open LSN, can't delete this or any newer log files
762 break;
763 }
764 // need to save copy - toku_logfilemgr_delete_oldest_logfile_info free's the lf_info
765 long index = lf_info->index;
766 toku_logfilemgr_delete_oldest_logfile_info(lfm);
767 n_logfiles--;
768 delete_logfile(logger, index, log_version);
769 }
770 }
771 release_output(logger, fsynced_lsn);
772}
773
774void toku_logger_write_log_files (TOKULOGGER logger, bool write_log_files)
775// Called only during initialization (or just after recovery), so no locks are needed.
776{
777 logger->write_log_files = write_log_files;
778}
779
780void toku_logger_trim_log_files (TOKULOGGER logger, bool trim_log_files)
781// Called only during initialization, so no locks are needed.
782{
783 logger->trim_log_files = trim_log_files;
784}
785
786bool toku_logger_txns_exist(TOKULOGGER logger)
787// Called during close of environment to ensure that transactions don't exist
788{
789 return toku_txn_manager_txns_exist(logger->txn_manager);
790}
791
792
void toku_logger_maybe_fsync(TOKULOGGER logger, LSN lsn, int do_fsync, bool holds_input_lock)
// Effect: If do_fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn.
//         If do_fsync is zero, only the input lock (if held) is released.
// Entry: Holds input lock iff 'holds_input_lock'.  The log entry has already been written to the input buffer.
// Exit:  Holds no locks.
// The input lock may be released and then reacquired.  Thus this function does not run atomically with respect to other threads.
{
    // The input lock is always given up first; output permission is never
    // waited for while holding it.
    if (holds_input_lock) {
        ml_unlock(&logger->input_lock);
    }
    if (do_fsync) {
        // reacquire the locks (acquire output permission first)
        LSN fsynced_lsn;
        bool already_done = wait_till_output_already_written_or_output_buffer_available(logger, lsn, &fsynced_lsn);
        if (already_done) {
            // Some other thread already synced past lsn; we did not take output permission.
            return;
        }

        // otherwise we now own the output permission, and our lsn isn't outputed.

        ml_lock(&logger->input_lock);

        swap_inbuf_outbuf(logger);

        ml_unlock(&logger->input_lock); // release the input lock now, so other threads can fill the inbuf.  (Thus enabling group commit.)

        write_outbuf_to_logfile(logger, &fsynced_lsn);
        if (fsynced_lsn.lsn < lsn.lsn) {
            // it may have gotten fsynced by the write_outbuf_to_logfile.
            // It did not (or not far enough), so fsync here.
            toku_file_fsync_without_accounting(logger->fd);
            assert(fsynced_lsn.lsn <= logger->written_lsn.lsn);
            fsynced_lsn = logger->written_lsn;
        }
        // the last lsn is only accessed while holding output permission or else when the log file is old.
        if (logger->write_log_files) {
            toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
        }
        // Publishes fsynced_lsn and wakes waiters.
        release_output(logger, fsynced_lsn);
    }
}
832
833static void
834logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn)
835// Entry: Holds the input lock and permission to modify output.
836// Exit: Holds only the permission to modify output.
837// Effect: Write the buffers to the output. If DO_FSYNC is true, then fsync.
838// Note: Only called during single-threaded activity from toku_logger_restart, so locks aren't really needed.
839{
840 swap_inbuf_outbuf(logger);
841 ml_unlock(&logger->input_lock);
842 write_outbuf_to_logfile(logger, fsynced_lsn);
843 if (logger->write_log_files) {
844 toku_file_fsync_without_accounting(logger->fd);
845 toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); // t:2294
846 }
847}
848
849int toku_logger_restart(TOKULOGGER logger, LSN lastlsn)
850// Entry and exit: Holds no locks (this is called only during single-threaded activity, such as initial start).
851{
852 int r;
853
854 // flush out the log buffer
855 LSN fsynced_lsn;
856 grab_output(logger, &fsynced_lsn);
857 ml_lock(&logger->input_lock);
858 logger_write_buffer(logger, &fsynced_lsn);
859
860 // close the log file
861 if (logger->write_log_files) { // fsyncs don't work to /dev/null
862 toku_file_fsync_without_accounting(logger->fd);
863 }
864 r = toku_os_close(logger->fd);
865 assert(r == 0);
866 logger->fd = -1;
867
868 // reset the LSN's to the lastlsn when the logger was opened
869 logger->lsn = logger->written_lsn = logger->fsynced_lsn = lastlsn;
870 logger->write_log_files = true;
871 logger->trim_log_files = true;
872
873 // open a new log file
874 r = open_logfile(logger);
875 release_output(logger, fsynced_lsn);
876 return r;
877}
878
879// fname is the iname
880void toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, uint32_t mode,
881 uint32_t treeflags, uint32_t nodesize, uint32_t basementnodesize,
882 enum toku_compression_method compression_method) {
883 if (txn) {
884 BYTESTRING bs_fname = { .len = (uint32_t) strlen(fname), .data = (char *) fname };
885 // fsync log on fcreate
886 toku_log_fcreate (txn->logger, (LSN*)0, 1, txn, toku_txn_get_txnid(txn), filenum,
887 bs_fname, mode, treeflags, nodesize, basementnodesize, compression_method);
888 }
889}
890
891
892// We only do fdelete on open ft's, so we pass the filenum here
893void toku_logger_log_fdelete (TOKUTXN txn, FILENUM filenum) {
894 if (txn) {
895 //No fsync.
896 toku_log_fdelete (txn->logger, (LSN*)0, 0, txn, toku_txn_get_txnid(txn), filenum);
897 }
898}
899
900
901
902/* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */
903void toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum, uint32_t treeflags) {
904 if (txn) {
905 BYTESTRING bs;
906 bs.len = strlen(fname);
907 bs.data = (char*)fname;
908 toku_log_fopen (txn->logger, (LSN*)0, 0, bs, filenum, treeflags);
909 }
910}
911
// Read a single byte from f into *v without touching any checksum or length
// counter.  Returns 0 on success, -1 if fgetc() reports EOF (in which case *v
// is not written).
static int toku_fread_uint8_t_nocrclen (FILE *f, uint8_t *v) {
    const int ch = fgetc(f);
    if (ch == EOF) {
        return -1;
    }
    *v = (uint8_t) ch;
    return 0;
}
919
920int toku_fread_uint8_t (FILE *f, uint8_t *v, struct x1764 *mm, uint32_t *len) {
921 int vi=fgetc(f);
922 if (vi==EOF) return -1;
923 uint8_t vc=(uint8_t)vi;
924 toku_x1764_add(mm, &vc, 1);
925 (*len)++;
926 *v = vc;
927 return 0;
928}
929
930int toku_fread_uint32_t_nocrclen (FILE *f, uint32_t *v) {
931 uint32_t result;
932 uint8_t *cp = (uint8_t*)&result;
933 int r;
934 r = toku_fread_uint8_t_nocrclen (f, cp+0); if (r!=0) return r;
935 r = toku_fread_uint8_t_nocrclen (f, cp+1); if (r!=0) return r;
936 r = toku_fread_uint8_t_nocrclen (f, cp+2); if (r!=0) return r;
937 r = toku_fread_uint8_t_nocrclen (f, cp+3); if (r!=0) return r;
938 *v = toku_dtoh32(result);
939
940 return 0;
941}
942int toku_fread_uint32_t (FILE *f, uint32_t *v, struct x1764 *checksum, uint32_t *len) {
943 uint32_t result;
944 uint8_t *cp = (uint8_t*)&result;
945 int r;
946 r = toku_fread_uint8_t (f, cp+0, checksum, len); if(r!=0) return r;
947 r = toku_fread_uint8_t (f, cp+1, checksum, len); if(r!=0) return r;
948 r = toku_fread_uint8_t (f, cp+2, checksum, len); if(r!=0) return r;
949 r = toku_fread_uint8_t (f, cp+3, checksum, len); if(r!=0) return r;
950 *v = toku_dtoh32(result);
951 return 0;
952}
953
954int toku_fread_uint64_t (FILE *f, uint64_t *v, struct x1764 *checksum, uint32_t *len) {
955 uint32_t v1,v2;
956 int r;
957 r=toku_fread_uint32_t(f, &v1, checksum, len); if (r!=0) return r;
958 r=toku_fread_uint32_t(f, &v2, checksum, len); if (r!=0) return r;
959 *v = (((uint64_t)v1)<<32 ) | ((uint64_t)v2);
960 return 0;
961}
962
963int toku_fread_bool (FILE *f, bool *v, struct x1764 *mm, uint32_t *len) {
964 uint8_t iv;
965 int r = toku_fread_uint8_t(f, &iv, mm, len);
966 if (r == 0) {
967 *v = (iv!=0);
968 }
969 return r;
970}
971
972int toku_fread_LSN (FILE *f, LSN *lsn, struct x1764 *checksum, uint32_t *len) {
973 return toku_fread_uint64_t (f, &lsn->lsn, checksum, len);
974}
975
976int toku_fread_BLOCKNUM (FILE *f, BLOCKNUM *b, struct x1764 *checksum, uint32_t *len) {
977 return toku_fread_uint64_t (f, (uint64_t*)&b->b, checksum, len);
978}
979
980int toku_fread_FILENUM (FILE *f, FILENUM *filenum, struct x1764 *checksum, uint32_t *len) {
981 return toku_fread_uint32_t (f, &filenum->fileid, checksum, len);
982}
983
984int toku_fread_TXNID (FILE *f, TXNID *txnid, struct x1764 *checksum, uint32_t *len) {
985 return toku_fread_uint64_t (f, txnid, checksum, len);
986}
987
988int toku_fread_TXNID_PAIR (FILE *f, TXNID_PAIR *txnid, struct x1764 *checksum, uint32_t *len) {
989 TXNID parent;
990 TXNID child;
991 int r;
992 r = toku_fread_TXNID(f, &parent, checksum, len); if (r != 0) { return r; }
993 r = toku_fread_TXNID(f, &child, checksum, len); if (r != 0) { return r; }
994 txnid->parent_id64 = parent;
995 txnid->child_id64 = child;
996 return 0;
997}
998
999
1000int toku_fread_XIDP (FILE *f, XIDP *xidp, struct x1764 *checksum, uint32_t *len) {
1001 // These reads are verbose because XA defined the fields as "long", but we use 4 bytes, 1 byte and 1 byte respectively.
1002 TOKU_XA_XID *XMALLOC(xid);
1003 {
1004 uint32_t formatID;
1005 int r = toku_fread_uint32_t(f, &formatID, checksum, len);
1006 if (r!=0) return r;
1007 xid->formatID = formatID;
1008 }
1009 {
1010 uint8_t gtrid_length;
1011 int r = toku_fread_uint8_t (f, &gtrid_length, checksum, len);
1012 if (r!=0) return r;
1013 xid->gtrid_length = gtrid_length;
1014 }
1015 {
1016 uint8_t bqual_length;
1017 int r = toku_fread_uint8_t (f, &bqual_length, checksum, len);
1018 if (r!=0) return r;
1019 xid->bqual_length = bqual_length;
1020 }
1021 for (int i=0; i< xid->gtrid_length + xid->bqual_length; i++) {
1022 uint8_t byte;
1023 int r = toku_fread_uint8_t(f, &byte, checksum, len);
1024 if (r!=0) return r;
1025 xid->data[i] = byte;
1026 }
1027 *xidp = xid;
1028 return 0;
1029}
1030
1031// fills in the bs with malloced data.
1032int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, uint32_t *len) {
1033 int r=toku_fread_uint32_t(f, (uint32_t*)&bs->len, checksum, len);
1034 if (r!=0) return r;
1035 XMALLOC_N(bs->len, bs->data);
1036 uint32_t i;
1037 for (i=0; i<bs->len; i++) {
1038 r=toku_fread_uint8_t(f, (uint8_t*)&bs->data[i], checksum, len);
1039 if (r!=0) {
1040 toku_free(bs->data);
1041 bs->data=0;
1042 return r;
1043 }
1044 }
1045 return 0;
1046}
1047
1048// fills in the fs with malloced data.
1049int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, uint32_t *len) {
1050 int r=toku_fread_uint32_t(f, (uint32_t*)&fs->num, checksum, len);
1051 if (r!=0) return r;
1052 XMALLOC_N(fs->num, fs->filenums);
1053 uint32_t i;
1054 for (i=0; i<fs->num; i++) {
1055 r=toku_fread_FILENUM (f, &fs->filenums[i], checksum, len);
1056 if (r!=0) {
1057 toku_free(fs->filenums);
1058 fs->filenums=0;
1059 return r;
1060 }
1061 }
1062 return 0;
1063}
1064
1065int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1066 LSN v;
1067 int r = toku_fread_LSN(inf, &v, checksum, len);
1068 if (r!=0) return r;
1069 fprintf(outf, " %s=%" PRIu64, fieldname, v.lsn);
1070 return 0;
1071}
1072
1073int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1074 TXNID v;
1075 int r = toku_fread_TXNID(inf, &v, checksum, len);
1076 if (r!=0) return r;
1077 fprintf(outf, " %s=%" PRIu64, fieldname, v);
1078 return 0;
1079}
1080
1081int toku_logprint_TXNID_PAIR (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1082 TXNID_PAIR v;
1083 int r = toku_fread_TXNID_PAIR(inf, &v, checksum, len);
1084 if (r!=0) return r;
1085 fprintf(outf, " %s=%" PRIu64 ",%" PRIu64, fieldname, v.parent_id64, v.child_id64);
1086 return 0;
1087}
1088
1089int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1090 XIDP vp;
1091 int r = toku_fread_XIDP(inf, &vp, checksum, len);
1092 if (r!=0) return r;
1093 fprintf(outf, " %s={formatID=0x%lx gtrid_length=%ld bqual_length=%ld data=", fieldname, vp->formatID, vp->gtrid_length, vp->bqual_length);
1094 toku_print_bytes(outf, vp->gtrid_length + vp->bqual_length, vp->data);
1095 fprintf(outf, "}");
1096 toku_free(vp);
1097 return 0;
1098}
1099
1100int toku_logprint_uint8_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1101 uint8_t v;
1102 int r = toku_fread_uint8_t(inf, &v, checksum, len);
1103 if (r!=0) return r;
1104 fprintf(outf, " %s=%d", fieldname, v);
1105 if (format) fprintf(outf, format, v);
1106 else if (v=='\'') fprintf(outf, "('\'')");
1107 else if (isprint(v)) fprintf(outf, "('%c')", v);
1108 else {}/*nothing*/
1109 return 0;
1110}
1111
1112int toku_logprint_uint32_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1113 uint32_t v;
1114 int r = toku_fread_uint32_t(inf, &v, checksum, len);
1115 if (r!=0) return r;
1116 fprintf(outf, " %s=", fieldname);
1117 fprintf(outf, format ? format : "%d", v);
1118 return 0;
1119}
1120
1121int toku_logprint_uint64_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1122 uint64_t v;
1123 int r = toku_fread_uint64_t(inf, &v, checksum, len);
1124 if (r!=0) return r;
1125 fprintf(outf, " %s=", fieldname);
1126 fprintf(outf, format ? format : "%" PRId64, v);
1127 return 0;
1128}
1129
1130int toku_logprint_bool (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1131 bool v;
1132 int r = toku_fread_bool(inf, &v, checksum, len);
1133 if (r!=0) return r;
1134 fprintf(outf, " %s=%s", fieldname, v ? "true" : "false");
1135 return 0;
1136
1137}
1138
1139void toku_print_BYTESTRING (FILE *outf, uint32_t len, char *data) {
1140 fprintf(outf, "{len=%u data=", len);
1141 toku_print_bytes(outf, len, data);
1142 fprintf(outf, "}");
1143
1144}
1145
1146int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1147 BYTESTRING bs;
1148 int r = toku_fread_BYTESTRING(inf, &bs, checksum, len);
1149 if (r!=0) return r;
1150 fprintf(outf, " %s=", fieldname);
1151 toku_print_BYTESTRING(outf, bs.len, bs.data);
1152 toku_free(bs.data);
1153 return 0;
1154}
1155
1156int toku_logprint_BLOCKNUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1157 return toku_logprint_uint64_t(outf, inf, fieldname, checksum, len, format);
1158
1159}
1160
1161int toku_logprint_FILENUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1162 return toku_logprint_uint32_t(outf, inf, fieldname, checksum, len, format);
1163
1164}
1165
1166static void
1167toku_print_FILENUMS (FILE *outf, uint32_t num, FILENUM *filenums) {
1168 fprintf(outf, "{num=%u filenums=\"", num);
1169 uint32_t i;
1170 for (i=0; i<num; i++) {
1171 if (i>0)
1172 fprintf(outf, ",");
1173 fprintf(outf, "0x%" PRIx32, filenums[i].fileid);
1174 }
1175 fprintf(outf, "\"}");
1176
1177}
1178
1179int toku_logprint_FILENUMS (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1180 FILENUMS bs;
1181 int r = toku_fread_FILENUMS(inf, &bs, checksum, len);
1182 if (r!=0) return r;
1183 fprintf(outf, " %s=", fieldname);
1184 toku_print_FILENUMS(outf, bs.num, bs.filenums);
1185 toku_free(bs.filenums);
1186 return 0;
1187}
1188
1189int toku_read_and_print_logmagic (FILE *f, uint32_t *versionp) {
1190 {
1191 char magic[8];
1192 int r=fread(magic, 1, 8, f);
1193 if (r!=8) {
1194 return DB_BADFORMAT;
1195 }
1196 if (memcmp(magic, "tokulogg", 8)!=0) {
1197 return DB_BADFORMAT;
1198 }
1199 }
1200 {
1201 int version;
1202 int r=fread(&version, 1, 4, f);
1203 if (r!=4) {
1204 return DB_BADFORMAT;
1205 }
1206 printf("tokulog v.%u\n", toku_ntohl(version));
1207 //version MUST be in network order regardless of disk order
1208 *versionp=toku_ntohl(version);
1209 }
1210 return 0;
1211}
1212
1213int toku_read_logmagic (FILE *f, uint32_t *versionp) {
1214 {
1215 char magic[8];
1216 int r=fread(magic, 1, 8, f);
1217 if (r!=8) {
1218 return DB_BADFORMAT;
1219 }
1220 if (memcmp(magic, "tokulogg", 8)!=0) {
1221 return DB_BADFORMAT;
1222 }
1223 }
1224 {
1225 int version;
1226 int r=fread(&version, 1, 4, f);
1227 if (r!=4) {
1228 return DB_BADFORMAT;
1229 }
1230 *versionp=toku_ntohl(version);
1231 }
1232 return 0;
1233}
1234
1235TXNID_PAIR toku_txn_get_txnid (TOKUTXN txn) {
1236 TXNID_PAIR tp = { .parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE};
1237 if (txn==0) return tp;
1238 else return txn->txnid;
1239}
1240
1241LSN toku_logger_last_lsn(TOKULOGGER logger) {
1242 return logger->lsn;
1243}
1244
1245TOKULOGGER toku_txn_logger (TOKUTXN txn) {
1246 return txn ? txn->logger : 0;
1247}
1248
1249void toku_txnid2txn(TOKULOGGER logger, TXNID_PAIR txnid, TOKUTXN *result) {
1250 TOKUTXN root_txn = NULL;
1251 toku_txn_manager_suspend(logger->txn_manager);
1252 toku_txn_manager_id2txn_unlocked(logger->txn_manager, txnid, &root_txn);
1253 if (root_txn == NULL || root_txn->txnid.child_id64 == txnid.child_id64) {
1254 *result = root_txn;
1255 }
1256 else if (root_txn != NULL) {
1257 root_txn->child_manager->suspend();
1258 root_txn->child_manager->find_tokutxn_by_xid_unlocked(txnid, result);
1259 root_txn->child_manager->resume();
1260 }
1261 toku_txn_manager_resume(logger->txn_manager);
1262}
1263
1264// Find the earliest LSN in a log. No locks are needed.
1265static int peek_at_log(TOKULOGGER logger, char *filename, LSN *first_lsn) {
1266 int fd = toku_os_open(
1267 filename, O_RDONLY + O_BINARY, S_IRUSR, *tokudb_file_log_key);
1268 if (fd < 0) {
1269 int er = get_error_errno();
1270 if (logger->write_log_files)
1271 printf("couldn't open: %s\n", strerror(er));
1272 return er;
1273 }
1274 enum { SKIP = 12+1+4 }; // read the 12 byte header, the first message, and the first len
1275 unsigned char header[SKIP+8];
1276 int r = read(fd, header, SKIP+8);
1277 if (r!=SKIP+8) return 0; // cannot determine that it's archivable, so we'll assume no. If a later-log is archivable is then this one will be too.
1278
1279 uint64_t lsn;
1280 {
1281 struct rbuf rb;
1282 rb.buf = header+SKIP;
1283 rb.size = 8;
1284 rb.ndone = 0;
1285 lsn = rbuf_ulonglong(&rb);
1286 }
1287
1288 r = toku_os_close(fd);
1289
1290 if (r != 0) {
1291 return 0;
1292 }
1293
1294 first_lsn->lsn = lsn;
1295 return 0;
1296}
1297
1298// Return a malloc'd array of malloc'd strings which are the filenames that can be archived.
1299// Output permission are obtained briefly so we can get a list of the log files without conflicting.
1300int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
1301 if (flags!=0) return EINVAL; // don't know what to do.
1302 int all_n_logs;
1303 int i;
1304 char **all_logs;
1305 int n_logfiles;
1306 LSN fsynced_lsn;
1307 grab_output(logger, &fsynced_lsn);
1308 int r = toku_logger_find_logfiles (logger->directory, &all_logs, &n_logfiles);
1309 release_output(logger, fsynced_lsn);
1310 if (r!=0) return r;
1311
1312 for (i=0; all_logs[i]; i++);
1313 all_n_logs=i;
1314 // get them into increasing order
1315 qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
1316
1317 LSN save_lsn = logger->last_completed_checkpoint_lsn;
1318
1319 // Now starting at the last one, look for archivable ones.
1320 // Count the total number of bytes, because we have to return a single big array. (That's the BDB interface. Bleah...)
1321 LSN earliest_lsn_in_logfile={(unsigned long long)(-1LL)};
1322 r = peek_at_log(logger, all_logs[all_n_logs-1], &earliest_lsn_in_logfile); // try to find the lsn that's in the most recent log
1323 if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) {
1324 i=all_n_logs-1;
1325 } else {
1326 for (i=all_n_logs-2; i>=0; i--) { // start at all_n_logs-2 because we never archive the most recent log
1327 r = peek_at_log(logger, all_logs[i], &earliest_lsn_in_logfile);
1328 if (r!=0) continue; // In case of error, just keep going
1329
1330 if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) {
1331 break;
1332 }
1333 }
1334 }
1335
1336 // all log files up to, but but not including, i can be archived.
1337 int n_to_archive=i;
1338 int count_bytes=0;
1339 for (i=0; i<n_to_archive; i++) {
1340 count_bytes+=1+strlen(all_logs[i]);
1341 }
1342 char **result;
1343 if (i==0) {
1344 result=0;
1345 } else {
1346 CAST_FROM_VOIDP(result, toku_xmalloc((1+n_to_archive)*sizeof(*result) + count_bytes));
1347 char *base = (char*)(result+1+n_to_archive);
1348 for (i=0; i<n_to_archive; i++) {
1349 int len=1+strlen(all_logs[i]);
1350 result[i]=base;
1351 memcpy(base, all_logs[i], len);
1352 base+=len;
1353 }
1354 result[n_to_archive]=0;
1355 }
1356 for (i=0; all_logs[i]; i++) {
1357 toku_free(all_logs[i]);
1358 }
1359 toku_free(all_logs);
1360 *logs_p = result;
1361 return 0;
1362}
1363
1364
1365TOKUTXN toku_logger_txn_parent (TOKUTXN txn) {
1366 return txn->parent;
1367}
1368
1369void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn) {
1370 logger->last_completed_checkpoint_lsn = lsn;
1371}
1372
1373void
1374toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS statp) {
1375 log_status.init();
1376 if (logger) {
1377 LOG_STATUS_VAL(LOGGER_NEXT_LSN) = logger->lsn.lsn;
1378 LOG_STATUS_VAL(LOGGER_NUM_WRITES) = logger->num_writes_to_disk;
1379 LOG_STATUS_VAL(LOGGER_BYTES_WRITTEN) = logger->bytes_written_to_disk;
1380 // No compression on logfiles so the uncompressed size is just number of bytes written
1381 LOG_STATUS_VAL(LOGGER_UNCOMPRESSED_BYTES_WRITTEN) = logger->bytes_written_to_disk;
1382 LOG_STATUS_VAL(LOGGER_TOKUTIME_WRITES) = logger->time_spent_writing_to_disk;
1383 LOG_STATUS_VAL(LOGGER_WAIT_BUF_LONG) = logger->num_wait_buf_long;
1384 }
1385 *statp = log_status;
1386}
1387
1388
1389
1390//////////////////////////////////////////////////////////////////////////////////////////////////////
1391// Used for upgrade:
1392// if any valid log files exist in log_dir, then
1393// set *found_any_logs to true and set *version_found to version number of latest log
1394int
1395toku_get_version_of_logs_on_disk(const char *log_dir, bool *found_any_logs, uint32_t *version_found) {
1396 bool found = false;
1397 uint32_t highest_version = 0;
1398 int r = 0;
1399
1400 struct dirent *de;
1401 DIR *d=opendir(log_dir);
1402 if (d==NULL) {
1403 r = get_error_errno();
1404 }
1405 else {
1406 // Examine every file in the directory and find highest version
1407 while ((de=readdir(d))) {
1408 uint32_t this_log_version;
1409 uint64_t this_log_number;
1410 bool is_log = is_a_logfile_any_version(de->d_name, &this_log_number, &this_log_version);
1411 if (is_log) {
1412 if (!found) { // first log file found
1413 found = true;
1414 highest_version = this_log_version;
1415 }
1416 else
1417 highest_version = highest_version > this_log_version ? highest_version : this_log_version;
1418 }
1419 }
1420 int r2 = closedir(d);
1421 if (r==0) r = r2;
1422 }
1423 if (r==0) {
1424 *found_any_logs = found;
1425 if (found)
1426 *version_found = highest_version;
1427 }
1428 return r;
1429}
1430
1431TXN_MANAGER toku_logger_get_txn_manager(TOKULOGGER logger) {
1432 return logger->txn_manager;
1433}
1434