1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
// Patent notice string; only declared here (its definition is not in this part of the file).
extern const char *toku_patent_string;
// Copyright notice, defined here with external linkage.
const char *toku_copyright_string = "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.";
41
42#include <my_global.h>
43#include <db.h>
44#include <errno.h>
45#include <string.h>
46
47#include "portability/memory.h"
48#include "portability/toku_assert.h"
49#include "portability/toku_portability.h"
50#include "portability/toku_pthread.h"
51#include "portability/toku_stdlib.h"
52
53#include "ft/ft-flusher.h"
54#include "ft/cachetable/cachetable.h"
55#include "ft/cachetable/checkpoint.h"
56#include "ft/logger/log.h"
57#include "ft/loader/loader.h"
58#include "ft/log_header.h"
59#include "ft/ft.h"
60#include "ft/txn/txn_manager.h"
61#include "src/ydb.h"
62#include "src/ydb-internal.h"
63#include "src/ydb_cursor.h"
64#include "src/ydb_row_lock.h"
65#include "src/ydb_env_func.h"
66#include "src/ydb_db.h"
67#include "src/ydb_write.h"
68#include "src/ydb_txn.h"
69#include "src/loader.h"
70#include "src/indexer.h"
71#include "util/status.h"
72#include "util/context.h"
73
74#include <functional>
75
76// Include ydb_lib.cc here so that its constructor/destructor gets put into
77// ydb.o, to make sure they don't get erased at link time (when linking to
78// a static libtokufractaltree.a that was compiled with gcc). See #5094.
79#include "ydb_lib.cc"
80
// Pick the names behind DB_ENV_CREATE_FUN / DB_CREATE_FUN.
// With TOKUTRACE defined they expand to the *_toku10 names; otherwise they
// expand to the plain db_env_create / db_create names, and the two trace-file
// entry points are defined here as stubs that do nothing and return 0.
#ifdef TOKUTRACE
 #define DB_ENV_CREATE_FUN db_env_create_toku10
 #define DB_CREATE_FUN db_create_toku10
#else
 #define DB_ENV_CREATE_FUN db_env_create
 #define DB_CREATE_FUN db_create
 // Stub: ignores fname, always succeeds.
 int toku_set_trace_file (const char *fname __attribute__((__unused__))) { return 0; }
 // Stub: always succeeds.
 int toku_close_trace_file (void) { return 0; }
#endif
90
91// Set when env is panicked, never cleared.
92static int env_is_panicked = 0;
93
94void
95env_panic(DB_ENV * env, int cause, const char * msg) {
96 if (cause == 0)
97 cause = -1; // if unknown cause, at least guarantee panic
98 if (msg == NULL)
99 msg = "Unknown cause in env_panic\n";
100 env_is_panicked = cause;
101 env->i->is_panicked = cause;
102 env->i->panic_string = toku_strdup(msg);
103}
104
// Forward declaration; the definition is not in this part of the file.
static int env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp);
106
107/********************************************************************************
108 * Status is intended for display to humans to help understand system behavior.
109 * It does not need to be perfectly thread-safe.
110 */
111
// Row indices into ydb_layer_status.status[]. The last enumerator doubles as
// the array length. The entries under "#if 0" are compiled out.
typedef enum {
    YDB_LAYER_TIME_CREATION = 0, /* timestamp of environment creation, read from persistent environment */
    YDB_LAYER_TIME_STARTUP, /* timestamp of system startup */
    YDB_LAYER_TIME_NOW, /* timestamp of engine status query */
    YDB_LAYER_NUM_DB_OPEN,
    YDB_LAYER_NUM_DB_CLOSE,
    YDB_LAYER_NUM_OPEN_DBS,
    YDB_LAYER_MAX_OPEN_DBS,
    YDB_LAYER_FSYNC_LOG_PERIOD,
#if 0
    YDB_LAYER_ORIGINAL_ENV_VERSION, /* version of original environment, read from persistent environment */
    YDB_LAYER_STARTUP_ENV_VERSION, /* version of environment at this startup, read from persistent environment (curr_env_ver_key) */
    YDB_LAYER_LAST_LSN_OF_V13, /* read from persistent environment */
    YDB_LAYER_UPGRADE_V14_TIME, /* timestamp of upgrade to version 14, read from persistent environment */
    YDB_LAYER_UPGRADE_V14_FOOTPRINT, /* footprint of upgrade to version 14, read from persistent environment */
#endif
    YDB_LAYER_STATUS_NUM_ROWS /* number of rows in this status array */
} ydb_layer_status_entry;

// One status row per ydb_layer_status_entry, plus a flag that
// ydb_layer_status_init() sets once the rows have been described.
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[YDB_LAYER_STATUS_NUM_ROWS];
} YDB_LAYER_STATUS_S, *YDB_LAYER_STATUS;

// The file-local status instance for this layer.
static YDB_LAYER_STATUS_S ydb_layer_status;
// Numeric value slot of status row x.
#define STATUS_VALUE(x) ydb_layer_status.status[x].value.num

// Shorthand for TOKUFT_STATUS_INIT applied to ydb_layer_status.
#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_layer_status, k, c, t, l, inc)
140
// Describe every active row of ydb_layer_status through STATUS_INIT, record
// the current time as the startup time, and mark the struct as initialized.
static void
ydb_layer_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    STATUS_INIT(YDB_LAYER_TIME_CREATION, nullptr, UNIXTIME, "time of environment creation", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_TIME_STARTUP, nullptr, UNIXTIME, "time of engine startup", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_TIME_NOW, nullptr, UNIXTIME, "time now", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DB_OPEN, DB_OPENS, UINT64, "db opens", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DB_CLOSE, DB_CLOSES, UINT64, "db closes", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_OPEN_DBS, DB_OPEN_CURRENT, UINT64, "num open dbs now", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_MAX_OPEN_DBS, DB_OPEN_MAX, UINT64, "max open dbs", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_FSYNC_LOG_PERIOD, nullptr, UINT64, "period, in ms, that recovery log is automatically fsynced", TOKU_ENGINE_STATUS);

    // "Startup" is taken to be the moment this initializer runs.
    STATUS_VALUE(YDB_LAYER_TIME_STARTUP) = time(NULL);
    ydb_layer_status.initialized = true;
}
// STATUS_INIT is not available past this point.
#undef STATUS_INIT
159
160static void
161ydb_layer_get_status(DB_ENV* env, YDB_LAYER_STATUS statp) {
162 STATUS_VALUE(YDB_LAYER_TIME_NOW) = time(NULL);
163 STATUS_VALUE(YDB_LAYER_FSYNC_LOG_PERIOD) = toku_minicron_get_period_in_ms_unlocked(&env->i->fsync_log_cron);
164 *statp = ydb_layer_status;
165}
166
167/********************************************************************************
168 * End of ydb_layer local status section.
169 */
170
// Most recently opened env, used for engine status on crash.
// Note there are likely to be races on this if multiple threads create and
// close environments in parallel. It is declared volatile since that at least
// helps make sure the compiler doesn't optimize away certain code (e.g., if,
// while debugging, you write code that spins on most_recent_env, you'd like
// the compiler not to optimize that code away).
static DB_ENV * volatile most_recent_env;

// Forward declarations; the definitions are not in this part of the file.
static int env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt);
static int toku_maybe_get_engine_status_text (char* buff, int buffsize); // for use by toku_assert
static int toku_maybe_err_engine_status (void);
static void toku_maybe_set_env_panic(int code, const char * msg); // for use by toku_assert
177
178int
179toku_ydb_init(void) {
180 int r = 0;
181 //Lower level must be initialized first.
182 r = toku_ft_layer_init();
183 return r;
184}
185
186// Do not clean up resources if env is panicked, just exit ugly
187void
188toku_ydb_destroy(void) {
189 if (env_is_panicked == 0) {
190 toku_ft_layer_destroy();
191 }
192}
193
194static int
195ydb_getf_do_nothing(DBT const* UU(key), DBT const* UU(val), void* UU(extra)) {
196 return 0;
197}
198
199/* env methods */
200
201static void
202env_fs_report_in_yellow(DB_ENV *UU(env)) {
203 char tbuf[26];
204 time_t tnow = time(NULL);
205 fprintf(stderr, "%.24s PerconaFT file system space is low\n", ctime_r(&tnow, tbuf)); fflush(stderr);
206}
207
208static void
209env_fs_report_in_red(DB_ENV *UU(env)) {
210 char tbuf[26];
211 time_t tnow = time(NULL);
212 fprintf(stderr, "%.24s PerconaFT file system space is really low and access is restricted\n", ctime_r(&tnow, tbuf)); fflush(stderr);
213}
214
215static inline uint64_t
216env_fs_redzone(DB_ENV *env, uint64_t total) {
217 return total * env->i->redzone / 100;
218}
219
220#define ZONEREPORTLIMIT 12
221// Check the available space in the file systems used by tokuft and erect barriers when available space gets low.
222static int
223env_fs_poller(void *arg) {
224 DB_ENV *env = (DB_ENV *) arg;
225 int r;
226
227 int in_yellow; // set true to issue warning to user
228 int in_red; // set true to prevent certain operations (returning ENOSPC)
229
230 // get the fs sizes for the home dir
231 uint64_t avail_size = 0, total_size = 0;
232 r = toku_get_filesystem_sizes(env->i->dir, &avail_size, NULL, &total_size);
233 assert(r == 0);
234 in_yellow = (avail_size < 2 * env_fs_redzone(env, total_size));
235 in_red = (avail_size < env_fs_redzone(env, total_size));
236
237 // get the fs sizes for the data dir if different than the home dir
238 if (strcmp(env->i->dir, env->i->real_data_dir) != 0) {
239 r = toku_get_filesystem_sizes(env->i->real_data_dir, &avail_size, NULL, &total_size);
240 assert(r == 0);
241 in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
242 in_red += (avail_size < env_fs_redzone(env, total_size));
243 }
244
245 // get the fs sizes for the log dir if different than the home dir and data dir
246 if (strcmp(env->i->dir, env->i->real_log_dir) != 0 && strcmp(env->i->real_data_dir, env->i->real_log_dir) != 0) {
247 r = toku_get_filesystem_sizes(env->i->real_log_dir, &avail_size, NULL, &total_size);
248 assert(r == 0);
249 in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
250 in_red += (avail_size < env_fs_redzone(env, total_size));
251 }
252
253 env->i->fs_seq++; // how many times through this polling loop?
254 uint64_t now = env->i->fs_seq;
255
256 // Don't issue report if we have not been out of this fs_state for a while, unless we're at system startup
257 switch (env->i->fs_state) {
258 case FS_RED:
259 if (!in_red) {
260 if (in_yellow) {
261 env->i->fs_state = FS_YELLOW;
262 } else {
263 env->i->fs_state = FS_GREEN;
264 }
265 }
266 break;
267 case FS_YELLOW:
268 if (in_red) {
269 if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
270 env_fs_report_in_red(env);
271 env->i->fs_state = FS_RED;
272 env->i->last_seq_entered_red = now;
273 } else if (!in_yellow) {
274 env->i->fs_state = FS_GREEN;
275 }
276 break;
277 case FS_GREEN:
278 if (in_red) {
279 if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
280 env_fs_report_in_red(env);
281 env->i->fs_state = FS_RED;
282 env->i->last_seq_entered_red = now;
283 } else if (in_yellow) {
284 if ((now - env->i->last_seq_entered_yellow > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
285 env_fs_report_in_yellow(env);
286 env->i->fs_state = FS_YELLOW;
287 env->i->last_seq_entered_yellow = now;
288 }
289 break;
290 default:
291 assert(0);
292 }
293 return 0;
294}
295#undef ZONEREPORTLIMIT
296
297static void
298env_fs_init(DB_ENV *env) {
299 env->i->fs_state = FS_GREEN;
300 env->i->fs_poll_time = 5; // seconds
301 env->i->redzone = 5; // percent of total space
302 env->i->fs_poller_is_init = false;
303}
304
305// Initialize the minicron that polls file system space
306static int
307env_fs_init_minicron(DB_ENV *env) {
308 int r = toku_minicron_setup(&env->i->fs_poller, env->i->fs_poll_time*1000, env_fs_poller, env);
309 if (r == 0)
310 env->i->fs_poller_is_init = true;
311 return r;
312}
313
314// Destroy the file system space minicron
315static void
316env_fs_destroy(DB_ENV *env) {
317 if (env->i->fs_poller_is_init) {
318 int r = toku_minicron_shutdown(&env->i->fs_poller);
319 assert(r == 0);
320 env->i->fs_poller_is_init = false;
321 }
322}
323
324static int
325env_fsync_log_on_minicron(void *arg) {
326 DB_ENV *env = (DB_ENV *) arg;
327 int r = env->log_flush(env, 0);
328 assert(r == 0);
329 return 0;
330}
331
332static void
333env_fsync_log_init(DB_ENV *env) {
334 env->i->fsync_log_period_ms = 0;
335 env->i->fsync_log_cron_is_init = false;
336}
337
338static void UU()
339env_change_fsync_log_period(DB_ENV* env, uint32_t period_ms) {
340 env->i->fsync_log_period_ms = period_ms;
341 if (env->i->fsync_log_cron_is_init) {
342 toku_minicron_change_period(&env->i->fsync_log_cron, period_ms);
343 }
344}
345
346static int
347env_fsync_log_cron_init(DB_ENV *env) {
348 int r = toku_minicron_setup(&env->i->fsync_log_cron, env->i->fsync_log_period_ms, env_fsync_log_on_minicron, env);
349 if (r == 0)
350 env->i->fsync_log_cron_is_init = true;
351 return r;
352}
353
354static void
355env_fsync_log_cron_destroy(DB_ENV *env) {
356 if (env->i->fsync_log_cron_is_init) {
357 int r = toku_minicron_shutdown(&env->i->fsync_log_cron);
358 assert(r == 0);
359 env->i->fsync_log_cron_is_init = false;
360 }
361}
362
363static void
364env_setup_real_dir(DB_ENV *env, char **real_dir, const char *nominal_dir) {
365 toku_free(*real_dir);
366 *real_dir = NULL;
367
368 assert(env->i->dir);
369 if (nominal_dir)
370 *real_dir = toku_construct_full_name(2, env->i->dir, nominal_dir);
371 else
372 *real_dir = toku_strdup(env->i->dir);
373}
374
375static void
376env_setup_real_data_dir(DB_ENV *env) {
377 env_setup_real_dir(env, &env->i->real_data_dir, env->i->data_dir);
378}
379
380static void
381env_setup_real_log_dir(DB_ENV *env) {
382 env_setup_real_dir(env, &env->i->real_log_dir, env->i->lg_dir);
383}
384
385static void
386env_setup_real_tmp_dir(DB_ENV *env) {
387 env_setup_real_dir(env, &env->i->real_tmp_dir, env->i->tmp_dir);
388}
389
390static void keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable)
391{
392 env->i->cachetable = cachetable;
393}
394
395static int
396ydb_do_recovery (DB_ENV *env) {
397 assert(env->i->real_log_dir);
398 int r = tokuft_recover(env,
399 toku_keep_prepared_txn_callback,
400 keep_cachetable_callback,
401 env->i->logger,
402 env->i->dir, env->i->real_log_dir, env->i->bt_compare,
403 env->i->update_function,
404 env->i->generate_row_for_put, env->i->generate_row_for_del,
405 env->i->cachetable_size);
406 return r;
407}
408
409static int
410needs_recovery (DB_ENV *env) {
411 assert(env->i->real_log_dir);
412 int recovery_needed = tokuft_needs_recovery(env->i->real_log_dir, true);
413 return recovery_needed ? DB_RUNRECOVERY : 0;
414}
415
// Forward declaration; the definition is not in this part of the file.
static int toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte, uint32_t min, uint32_t flags);

// Keys used in persistent environment dictionary:
// Following keys added in version 12
static const char * orig_env_ver_key = "original_version";   // layout version the env was created with
static const char * curr_env_ver_key = "current_version";    // layout version currently stored for the env
// Following keys added in version 14, add more keys for future versions
static const char * creation_time_key = "creation_time";
424
// Build the key "upgrade_v<version>_time".
// The result lives in a static buffer that is overwritten by the next call,
// so the caller must use or copy it before calling again. The buffer leaves
// 12 spare bytes for the formatted version; the fit is asserted.
static char * get_upgrade_time_key(int version) {
    static char upgrade_time_key[sizeof("upgrade_v_time") + 12];
    const int written = snprintf(upgrade_time_key, sizeof(upgrade_time_key), "upgrade_v%d_time", version);
    assert(written >= 0 && written < (int)sizeof(upgrade_time_key));
    return upgrade_time_key;
}
434
// Build the key "upgrade_v<version>_footprint".
// The result lives in a static buffer that is overwritten by the next call,
// so the caller must use or copy it before calling again. The buffer leaves
// 12 spare bytes for the formatted version; the fit is asserted.
static char * get_upgrade_footprint_key(int version) {
    static char upgrade_footprint_key[sizeof("upgrade_v_footprint") + 12];
    const int written = snprintf(upgrade_footprint_key, sizeof(upgrade_footprint_key), "upgrade_v%d_footprint", version);
    assert(written >= 0 && written < (int)sizeof(upgrade_footprint_key));
    return upgrade_footprint_key;
}
444
// Build the key "upgrade_v<version>_last_lsn".
// The result lives in a static buffer that is overwritten by the next call,
// so the caller must use or copy it before calling again. The buffer leaves
// 12 spare bytes for the formatted version; the fit is asserted.
static char * get_upgrade_last_lsn_key(int version) {
    static char upgrade_last_lsn_key[sizeof("upgrade_v_last_lsn") + 12];
    const int written = snprintf(upgrade_last_lsn_key, sizeof(upgrade_last_lsn_key), "upgrade_v%d_last_lsn", version);
    assert(written >= 0 && written < (int)sizeof(upgrade_last_lsn_key));
    return upgrade_last_lsn_key;
}
454
// Values read from (or written into) persistent environment,
// kept here for read-only access from engine status.
// Note, persistent_upgrade_status info is separate in part to simplify its exclusion from engine status until relevant.
//
// Row indices into persistent_upgrade_status.status[]; the last enumerator
// doubles as the array length.
typedef enum {
    PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION = 0,
    PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP, // read from curr_env_ver_key, prev version as of this startup
    PERSISTENT_UPGRADE_LAST_LSN_OF_V13,
    PERSISTENT_UPGRADE_V14_TIME,
    PERSISTENT_UPGRADE_V14_FOOTPRINT,
    PERSISTENT_UPGRADE_STATUS_NUM_ROWS
} persistent_upgrade_status_entry;

// One status row per persistent_upgrade_status_entry, plus a flag that
// persistent_upgrade_status_init() sets once the rows have been described.
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[PERSISTENT_UPGRADE_STATUS_NUM_ROWS];
} PERSISTENT_UPGRADE_STATUS_S, *PERSISTENT_UPGRADE_STATUS;

// The file-local upgrade status instance.
static PERSISTENT_UPGRADE_STATUS_S persistent_upgrade_status;

// Shorthand for TOKUFT_STATUS_INIT applied to persistent_upgrade_status;
// every legend gets the "upgrade: " prefix by string-literal concatenation.
#define PERSISTENT_UPGRADE_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(persistent_upgrade_status, k, c, t, "upgrade: " l, inc)
475
// Describe every row of persistent_upgrade_status through
// PERSISTENT_UPGRADE_STATUS_INIT and mark the struct as initialized.
static void
persistent_upgrade_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION, nullptr, UINT64, "original version (at time of environment creation)", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP, nullptr, UINT64, "version at time of startup", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_LAST_LSN_OF_V13, nullptr, UINT64, "last LSN of version 13", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_TIME, nullptr, UNIXTIME, "time of upgrade to version 14", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_FOOTPRINT, nullptr, UINT64, "footprint from version 13 to 14", TOKU_ENGINE_STATUS);
    persistent_upgrade_status.initialized = true;
}

// Numeric value slot of upgrade status row x.
#define PERSISTENT_UPGRADE_STATUS_VALUE(x) persistent_upgrade_status.status[x].value.num
490
491// Requires: persistent environment dictionary is already open.
492// Input arg is lsn of clean shutdown of previous version,
493// or ZERO_LSN if no upgrade or if crash between log upgrade and here.
494// NOTE: To maintain compatibility with previous versions, do not change the
495// format of any information stored in the persistent environment dictionary.
496// For example, some values are stored as 32 bits, even though they are immediately
497// converted to 64 bits when read. Do not change them to be stored as 64 bits.
498//
499static int
500maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN last_lsn_of_clean_shutdown_read_from_log) {
501 int r;
502 DBT key, val;
503 DB *persistent_environment = env->i->persistent_environment;
504
505 if (!persistent_upgrade_status.initialized)
506 persistent_upgrade_status_init();
507
508 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
509 toku_init_dbt(&val);
510 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
511 assert(r == 0);
512 uint32_t stored_env_version = toku_dtoh32(*(uint32_t*)val.data);
513 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP) = stored_env_version;
514 if (stored_env_version > FT_LAYOUT_VERSION)
515 r = TOKUDB_DICTIONARY_TOO_NEW;
516 else if (stored_env_version < FT_LAYOUT_MIN_SUPPORTED_VERSION)
517 r = TOKUDB_DICTIONARY_TOO_OLD;
518 else if (stored_env_version < FT_LAYOUT_VERSION) {
519 const uint32_t curr_env_ver_d = toku_htod32(FT_LAYOUT_VERSION);
520 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
521 toku_fill_dbt(&val, &curr_env_ver_d, sizeof(curr_env_ver_d));
522 r = toku_db_put(persistent_environment, txn, &key, &val, 0, false);
523 assert_zero(r);
524
525 time_t upgrade_time_d = toku_htod64(time(NULL));
526 uint64_t upgrade_footprint_d = toku_htod64(toku_log_upgrade_get_footprint());
527 uint64_t upgrade_last_lsn_d = toku_htod64(last_lsn_of_clean_shutdown_read_from_log.lsn);
528 for (int version = stored_env_version+1; version <= FT_LAYOUT_VERSION; version++) {
529 uint32_t put_flag = DB_NOOVERWRITE;
530 if (version <= FT_LAYOUT_VERSION_19) {
531 // See #5902.
532 // To prevent a crash (and any higher complexity code) we'll simply
533 // silently not overwrite anything if it exists.
534 // The keys existing for version <= 19 is not necessarily an error.
535 // If this happens for versions > 19 it IS an error and we'll use DB_NOOVERWRITE.
536 put_flag = DB_NOOVERWRITE_NO_ERROR;
537 }
538
539
540 char* upgrade_time_key = get_upgrade_time_key(version);
541 toku_fill_dbt(&key, upgrade_time_key, strlen(upgrade_time_key));
542 toku_fill_dbt(&val, &upgrade_time_d, sizeof(upgrade_time_d));
543 r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
544 assert_zero(r);
545
546 char* upgrade_footprint_key = get_upgrade_footprint_key(version);
547 toku_fill_dbt(&key, upgrade_footprint_key, strlen(upgrade_footprint_key));
548 toku_fill_dbt(&val, &upgrade_footprint_d, sizeof(upgrade_footprint_d));
549 r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
550 assert_zero(r);
551
552 char* upgrade_last_lsn_key = get_upgrade_last_lsn_key(version);
553 toku_fill_dbt(&key, upgrade_last_lsn_key, strlen(upgrade_last_lsn_key));
554 toku_fill_dbt(&val, &upgrade_last_lsn_d, sizeof(upgrade_last_lsn_d));
555 r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
556 assert_zero(r);
557 }
558
559 }
560 return r;
561}
562
563// Capture contents of persistent_environment dictionary so that it can be read by engine status
564static void
565capture_persistent_env_contents (DB_ENV * env, DB_TXN * txn) {
566 int r;
567 DBT key, val;
568 DB *persistent_environment = env->i->persistent_environment;
569
570 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
571 toku_init_dbt(&val);
572 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
573 assert_zero(r);
574 uint32_t curr_env_version = toku_dtoh32(*(uint32_t*)val.data);
575 assert(curr_env_version == FT_LAYOUT_VERSION);
576
577 toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
578 toku_init_dbt(&val);
579 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
580 assert_zero(r);
581 uint64_t persistent_original_env_version = toku_dtoh32(*(uint32_t*)val.data);
582 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION) = persistent_original_env_version;
583 assert(persistent_original_env_version <= curr_env_version);
584
585 // make no assertions about timestamps, clock may have been reset
586 if (persistent_original_env_version >= FT_LAYOUT_VERSION_14) {
587 toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
588 toku_init_dbt(&val);
589 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
590 assert_zero(r);
591 STATUS_VALUE(YDB_LAYER_TIME_CREATION) = toku_dtoh64((*(time_t*)val.data));
592 }
593
594 if (persistent_original_env_version != curr_env_version) {
595 // an upgrade was performed at some time, capture info about the upgrade
596
597 char * last_lsn_key = get_upgrade_last_lsn_key(curr_env_version);
598 toku_fill_dbt(&key, last_lsn_key, strlen(last_lsn_key));
599 toku_init_dbt(&val);
600 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
601 assert_zero(r);
602 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_LAST_LSN_OF_V13) = toku_dtoh64(*(uint64_t*)val.data);
603
604 char * time_key = get_upgrade_time_key(curr_env_version);
605 toku_fill_dbt(&key, time_key, strlen(time_key));
606 toku_init_dbt(&val);
607 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
608 assert_zero(r);
609 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_TIME) = toku_dtoh64(*(time_t*)val.data);
610
611 char * footprint_key = get_upgrade_footprint_key(curr_env_version);
612 toku_fill_dbt(&key, footprint_key, strlen(footprint_key));
613 toku_init_dbt(&val);
614 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
615 assert_zero(r);
616 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_FOOTPRINT) = toku_dtoh64(*(uint64_t*)val.data);
617 }
618
619}
620
621// return 0 if log exists or ENOENT if log does not exist
622static int
623ydb_recover_log_exists(DB_ENV *env) {
624 int r = tokuft_recover_log_exists(env->i->real_log_dir);
625 return r;
626}
627
628// Validate that all required files are present, no side effects.
629// Return 0 if all is well, ENOENT if some files are present but at least one is
630// missing,
631// other non-zero value if some other error occurs.
632// Set *valid_newenv if creating a new environment (all files missing).
633// (Note, if special dictionaries exist, then they were created transactionally
634// and log should exist.)
635static int validate_env(DB_ENV *env,
636 bool *valid_newenv,
637 bool need_rollback_cachefile) {
638 int r;
639 bool expect_newenv = false; // set true if we expect to create a new env
640 toku_struct_stat buf;
641 char *path = NULL;
642
643 // Test for persistent environment
644 path = toku_construct_full_name(
645 2, env->i->dir, toku_product_name_strings.environmentdictionary);
646 assert(path);
647 r = toku_stat(path, &buf, toku_uninstrumented);
648 if (r == 0) {
649 expect_newenv = false; // persistent info exists
650 } else {
651 int stat_errno = get_error_errno();
652 if (stat_errno == ENOENT) {
653 expect_newenv = true;
654 r = 0;
655 } else {
656 r = toku_ydb_do_error(
657 env,
658 stat_errno,
659 "Unable to access persistent environment [%s] in [%s]\n",
660 toku_product_name_strings.environmentdictionary,
661 env->i->dir);
662 assert(r);
663 }
664 }
665 toku_free(path);
666
667 // Test for existence of rollback cachefile if it is expected to exist
668 if (r == 0 && need_rollback_cachefile) {
669 path = toku_construct_full_name(
670 2, env->i->dir, toku_product_name_strings.rollback_cachefile);
671 assert(path);
672 r = toku_stat(path, &buf, toku_uninstrumented);
673 if (r == 0) {
674 if (expect_newenv) // rollback cachefile exists, but persistent env
675 // is missing
676 r = toku_ydb_do_error(
677 env,
678 ENOENT,
679 "Persistent environment is missing while looking for "
680 "rollback cachefile [%s] in [%s]\n",
681 toku_product_name_strings.rollback_cachefile, env->i->dir);
682 } else {
683 int stat_errno = get_error_errno();
684 if (stat_errno == ENOENT) {
685 if (!expect_newenv) // rollback cachefile is missing but
686 // persistent env exists
687 r = toku_ydb_do_error(
688 env,
689 ENOENT,
690 "rollback cachefile [%s] is missing from [%s]\n",
691 toku_product_name_strings.rollback_cachefile,
692 env->i->dir);
693 else
694 r = 0; // both rollback cachefile and persistent env are
695 // missing
696 } else {
697 r = toku_ydb_do_error(
698 env,
699 stat_errno,
700 "Unable to access rollback cachefile [%s] in [%s]\n",
701 toku_product_name_strings.rollback_cachefile,
702 env->i->dir);
703 assert(r);
704 }
705 }
706 toku_free(path);
707 }
708
709 // Test for fileops directory
710 if (r == 0) {
711 path = toku_construct_full_name(
712 2, env->i->dir, toku_product_name_strings.fileopsdirectory);
713 assert(path);
714 r = toku_stat(path, &buf, toku_uninstrumented);
715 if (r == 0) {
716 if (expect_newenv) // fileops directory exists, but persistent env
717 // is missing
718 r = toku_ydb_do_error(
719 env,
720 ENOENT,
721 "Persistent environment is missing while looking for "
722 "fileops directory [%s] in [%s]\n",
723 toku_product_name_strings.fileopsdirectory,
724 env->i->dir);
725 } else {
726 int stat_errno = get_error_errno();
727 if (stat_errno == ENOENT) {
728 if (!expect_newenv) // fileops directory is missing but
729 // persistent env exists
730 r = toku_ydb_do_error(
731 env,
732 ENOENT,
733 "Fileops directory [%s] is missing from [%s]\n",
734 toku_product_name_strings.fileopsdirectory,
735 env->i->dir);
736 else
737 r = 0; // both fileops directory and persistent env are
738 // missing
739 } else {
740 r = toku_ydb_do_error(
741 env,
742 stat_errno,
743 "Unable to access fileops directory [%s] in [%s]\n",
744 toku_product_name_strings.fileopsdirectory,
745 env->i->dir);
746 assert(r);
747 }
748 }
749 toku_free(path);
750 }
751
752 // Test for recovery log
753 if ((r == 0) && (env->i->open_flags & DB_INIT_LOG)) {
754 // if using transactions, test for existence of log
755 r = ydb_recover_log_exists(env); // return 0 or ENOENT
756 if (expect_newenv && (r != ENOENT))
757 r = toku_ydb_do_error(env,
758 ENOENT,
759 "Persistent environment information is "
760 "missing (but log exists) while looking for "
761 "recovery log files in [%s]\n",
762 env->i->real_log_dir);
763 else if (!expect_newenv && r == ENOENT)
764 r = toku_ydb_do_error(env,
765 ENOENT,
766 "Recovery log is missing (persistent "
767 "environment information is present) while "
768 "looking for recovery log files in [%s]\n",
769 env->i->real_log_dir);
770 else
771 r = 0;
772 }
773
774 if (r == 0)
775 *valid_newenv = expect_newenv;
776 else
777 *valid_newenv = false;
778 return r;
779}
780
781// The version of the environment (on disk) is the version of the recovery log.
782// If the recovery log is of the current version, then there is no upgrade to be done.
783// If the recovery log is of an old version, then replacing it with a new recovery log
784// of the current version is how the upgrade is done.
785// Note, the upgrade procedure takes a checkpoint, so we must release the ydb lock.
786static int
787ydb_maybe_upgrade_env (DB_ENV *env, LSN * last_lsn_of_clean_shutdown_read_from_log, bool * upgrade_in_progress) {
788 int r = 0;
789 if (env->i->open_flags & DB_INIT_TXN && env->i->open_flags & DB_INIT_LOG) {
790 r = toku_maybe_upgrade_log(env->i->dir, env->i->real_log_dir, last_lsn_of_clean_shutdown_read_from_log, upgrade_in_progress);
791 }
792 return r;
793}
794
795static void
796unlock_single_process(DB_ENV *env) {
797 int r;
798 r = toku_single_process_unlock(&env->i->envdir_lockfd);
799 lazy_assert_zero(r);
800 r = toku_single_process_unlock(&env->i->datadir_lockfd);
801 lazy_assert_zero(r);
802 r = toku_single_process_unlock(&env->i->logdir_lockfd);
803 lazy_assert_zero(r);
804 r = toku_single_process_unlock(&env->i->tmpdir_lockfd);
805 lazy_assert_zero(r);
806}
807
808// Open the environment.
809// If this is a new environment, then create the necessary files.
810// Return 0 on success, ENOENT if any of the expected necessary files are missing.
811// (The set of necessary files is defined in the function validate_env() above.)
812static int
813env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
814 HANDLE_PANICKED_ENV(env);
815 int r;
816 bool newenv; // true iff creating a new environment
817 uint32_t unused_flags=flags;
818 CHECKPOINTER cp;
819 DB_TXN *txn = NULL;
820
821 if (env_opened(env)) {
822 r = toku_ydb_do_error(env, EINVAL, "The environment is already open\n");
823 goto cleanup;
824 }
825
826 if (env->get_check_thp(env) && toku_os_huge_pages_enabled()) {
827 r = toku_ydb_do_error(env, TOKUDB_HUGE_PAGES_ENABLED,
828 "Huge pages are enabled, disable them before continuing\n");
829 goto cleanup;
830 }
831
832 most_recent_env = NULL;
833
834 assert(sizeof(time_t) == sizeof(uint64_t));
835
836 HANDLE_EXTRA_FLAGS(env, flags,
837 DB_CREATE|DB_PRIVATE|DB_INIT_LOG|DB_INIT_TXN|DB_RECOVER|DB_INIT_MPOOL|DB_INIT_LOCK|DB_THREAD);
838
839 // DB_CREATE means create if env does not exist, and PerconaFT requires it because
840 // PerconaFT requries DB_PRIVATE.
841 if ((flags & DB_PRIVATE) && !(flags & DB_CREATE)) {
842 r = toku_ydb_do_error(env, ENOENT, "DB_PRIVATE requires DB_CREATE (seems gratuitous to us, but that's BDB's behavior\n");
843 goto cleanup;
844 }
845
846 if (!(flags & DB_PRIVATE)) {
847 r = toku_ydb_do_error(env, ENOENT, "PerconaFT requires DB_PRIVATE\n");
848 goto cleanup;
849 }
850
851 if ((flags & DB_INIT_LOG) && !(flags & DB_INIT_TXN)) {
852 r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires transactions for logging\n");
853 goto cleanup;
854 }
855
856 if (!home) home = ".";
857
858 // Verify that the home exists.
859 toku_struct_stat buf;
860 r = toku_stat(home, &buf, toku_uninstrumented);
861 if (r != 0) {
862 int e = get_error_errno();
863 r = toku_ydb_do_error(
864 env, e, "Error from toku_stat(\"%s\",...)\n", home);
865 goto cleanup;
866 }
867 unused_flags &= ~DB_PRIVATE;
868
869 if (env->i->dir) {
870 toku_free(env->i->dir);
871 }
872 env->i->dir = toku_strdup(home);
873 if (env->i->dir == 0) {
874 r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
875 goto cleanup;
876 }
877 env->i->open_flags = flags;
878 env->i->open_mode = mode;
879
880 // Instrumentation probe start
881 TOKU_PROBE_START(toku_instr_probe_1);
882
883 env_setup_real_data_dir(env);
884 env_setup_real_log_dir(env);
885 env_setup_real_tmp_dir(env);
886
887 // Instrumentation probe stop
888 toku_instr_probe_1->stop();
889
890 r = toku_single_process_lock(
891 env->i->dir, "environment", &env->i->envdir_lockfd);
892 if (r != 0)
893 goto cleanup;
894 r = toku_single_process_lock(
895 env->i->real_data_dir, "data", &env->i->datadir_lockfd);
896 if (r!=0) goto cleanup;
897 r = toku_single_process_lock(env->i->real_log_dir, "logs", &env->i->logdir_lockfd);
898 if (r!=0) goto cleanup;
899 r = toku_single_process_lock(env->i->real_tmp_dir, "temp", &env->i->tmpdir_lockfd);
900 if (r!=0) goto cleanup;
901
902 bool need_rollback_cachefile;
903 need_rollback_cachefile = false;
904 if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
905 need_rollback_cachefile = true;
906 }
907
908 ydb_layer_status_init(); // do this before possibly upgrading, so upgrade work is counted in status counters
909
910 LSN last_lsn_of_clean_shutdown_read_from_log;
911 last_lsn_of_clean_shutdown_read_from_log = ZERO_LSN;
912 bool upgrade_in_progress;
913 upgrade_in_progress = false;
914 r = ydb_maybe_upgrade_env(env, &last_lsn_of_clean_shutdown_read_from_log, &upgrade_in_progress);
915 if (r!=0) goto cleanup;
916
917 if (upgrade_in_progress) {
918 // Delete old rollback file. There was a clean shutdown, so it has nothing useful,
919 // and there is no value in upgrading it. It is simpler to just create a new one.
920 char* rollback_filename = toku_construct_full_name(2, env->i->dir, toku_product_name_strings.rollback_cachefile);
921 assert(rollback_filename);
922 r = unlink(rollback_filename);
923 if (r != 0) {
924 assert(get_error_errno() == ENOENT);
925 }
926 toku_free(rollback_filename);
927 need_rollback_cachefile = false; // we're not expecting it to exist now
928 }
929
930 r = validate_env(env, &newenv, need_rollback_cachefile); // make sure that environment is either new or complete
931 if (r != 0) goto cleanup;
932
933 unused_flags &= ~DB_INIT_TXN & ~DB_INIT_LOG;
934
935 // do recovery only if there exists a log and recovery is requested
936 // otherwise, a log is created when the logger is opened later
937 if (!newenv) {
938 if (flags & DB_INIT_LOG) {
939 // the log does exist
940 if (flags & DB_RECOVER) {
941 r = ydb_do_recovery(env);
942 if (r != 0) goto cleanup;
943 } else {
944 // the log is required to have clean shutdown if recovery is not requested
945 r = needs_recovery(env);
946 if (r != 0) goto cleanup;
947 }
948 }
949 }
950
951 toku_loader_cleanup_temp_files(env);
952
953 if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
954 assert(env->i->logger);
955 toku_logger_write_log_files(env->i->logger, (bool)((flags & DB_INIT_LOG) != 0));
956 if (!toku_logger_is_open(env->i->logger)) {
957 r = toku_logger_open(env->i->real_log_dir, env->i->logger);
958 if (r!=0) {
959 toku_ydb_do_error(env, r, "Could not open logger\n");
960 }
961 }
962 } else {
963 r = toku_logger_close(&env->i->logger); // if no logging system, then kill the logger
964 assert_zero(r);
965 }
966
967 unused_flags &= ~DB_INIT_MPOOL; // we always init an mpool.
968 unused_flags &= ~DB_CREATE; // we always do DB_CREATE
969 unused_flags &= ~DB_INIT_LOCK; // we check this later (e.g. in db->open)
970 unused_flags &= ~DB_RECOVER;
971
972// This is probably correct, but it will be pain...
973// if ((flags & DB_THREAD)==0) {
974// r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires DB_THREAD");
975// goto cleanup;
976// }
977 unused_flags &= ~DB_THREAD;
978
979 if (unused_flags!=0) {
980 r = toku_ydb_do_error(env, EINVAL, "Extra flags not understood by tokuft: %u\n", unused_flags);
981 goto cleanup;
982 }
983
984 if (env->i->cachetable==NULL) {
985 // If we ran recovery then the cachetable should be set here.
986 r = toku_cachetable_create_ex(&env->i->cachetable, env->i->cachetable_size,
987 env->i->client_pool_threads,
988 env->i->cachetable_pool_threads,
989 env->i->checkpoint_pool_threads,
990 ZERO_LSN, env->i->logger);
991 if (r != 0) {
992 r = toku_ydb_do_error(env, r, "Cant create a cachetable\n");
993 goto cleanup;
994 }
995 }
996
997 toku_cachetable_set_env_dir(env->i->cachetable, env->i->dir);
998
999 int using_txns;
1000 using_txns = env->i->open_flags & DB_INIT_TXN;
1001 if (env->i->logger) {
1002 // if this is a newborn env or if this is an upgrade, then create a brand new rollback file
1003 assert (using_txns);
1004 toku_logger_set_cachetable(env->i->logger, env->i->cachetable);
1005 if (!toku_logger_rollback_is_open(env->i->logger)) {
1006 bool create_new_rollback_file = newenv | upgrade_in_progress;
1007 r = toku_logger_open_rollback(env->i->logger, env->i->cachetable, create_new_rollback_file);
1008 if (r != 0) {
1009 r = toku_ydb_do_error(env, r, "Cant open rollback\n");
1010 goto cleanup;
1011 }
1012 }
1013 }
1014
1015 if (using_txns) {
1016 r = toku_txn_begin(env, 0, &txn, 0);
1017 assert_zero(r);
1018 }
1019
1020 {
1021 r = toku_db_create(&env->i->persistent_environment, env, 0);
1022 assert_zero(r);
1023 r = toku_db_use_builtin_key_cmp(env->i->persistent_environment);
1024 assert_zero(r);
1025 r = toku_db_open_iname(env->i->persistent_environment, txn, toku_product_name_strings.environmentdictionary, DB_CREATE, mode);
1026 if (r != 0) {
1027 r = toku_ydb_do_error(env, r, "Cant open persistent env\n");
1028 goto cleanup;
1029 }
1030 if (newenv) {
1031 // create new persistent_environment
1032 DBT key, val;
1033 uint32_t persistent_original_env_version = FT_LAYOUT_VERSION;
1034 const uint32_t environment_version = toku_htod32(persistent_original_env_version);
1035
1036 toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
1037 toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
1038 r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1039 assert_zero(r);
1040
1041 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
1042 toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
1043 r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1044 assert_zero(r);
1045
1046 time_t creation_time_d = toku_htod64(time(NULL));
1047 toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
1048 toku_fill_dbt(&val, &creation_time_d, sizeof(creation_time_d));
1049 r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1050 assert_zero(r);
1051 }
1052 else {
1053 r = maybe_upgrade_persistent_environment_dictionary(env, txn, last_lsn_of_clean_shutdown_read_from_log);
1054 assert_zero(r);
1055 }
1056 capture_persistent_env_contents(env, txn);
1057 }
1058 {
1059 r = toku_db_create(&env->i->directory, env, 0);
1060 assert_zero(r);
1061 r = toku_db_use_builtin_key_cmp(env->i->directory);
1062 assert_zero(r);
1063 r = toku_db_open_iname(env->i->directory, txn, toku_product_name_strings.fileopsdirectory, DB_CREATE, mode);
1064 if (r != 0) {
1065 r = toku_ydb_do_error(env, r, "Cant open %s\n", toku_product_name_strings.fileopsdirectory);
1066 goto cleanup;
1067 }
1068 }
1069 if (using_txns) {
1070 r = locked_txn_commit(txn, 0);
1071 assert_zero(r);
1072 txn = NULL;
1073 }
1074 cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1075 r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
1076 assert_zero(r);
1077 env_fs_poller(env); // get the file system state at startup
1078 r = env_fs_init_minicron(env);
1079 if (r != 0) {
1080 r = toku_ydb_do_error(env, r, "Cant create fs minicron\n");
1081 goto cleanup;
1082 }
1083 r = env_fsync_log_cron_init(env);
1084 if (r != 0) {
1085 r = toku_ydb_do_error(env, r, "Cant create fsync log minicron\n");
1086 goto cleanup;
1087 }
1088cleanup:
1089 if (r!=0) {
1090 if (txn) {
1091 locked_txn_abort(txn);
1092 }
1093 if (env && env->i) {
1094 unlock_single_process(env);
1095 }
1096 }
1097 if (r == 0) {
1098 set_errno(0); // tabula rasa. If there's a crash after env was successfully opened, no misleading errno will have been left around by this code.
1099 most_recent_env = env;
1100 uint64_t num_rows;
1101 env_get_engine_status_num_rows(env, &num_rows);
1102 toku_assert_set_fpointers(toku_maybe_get_engine_status_text, toku_maybe_err_engine_status, toku_maybe_set_env_panic, num_rows);
1103 }
1104 return r;
1105}
1106
// Close the environment and release everything it owns.
//
// flags may contain TOKUFT_DIRTY_SHUTDOWN, which skips the shutdown
// checkpoints; any other nonzero flag bit makes the call return EINVAL, but
// only after the environment has been fully torn down and freed.
//
// Normal path: close the two internal dictionaries, stop the fsync cron,
// checkpoint and close the cachetable, close the logger, free all owned
// strings and containers, release the directory locks and free env itself.
//
// Error path (panic_and_quit_early): the directory locks are released and
// the error is reported, but env and env->i are NOT freed on this path.
// Returns 0 on success, otherwise the error or panic code.
static int
env_close(DB_ENV * env, uint32_t flags) {
    int r = 0;
    const char * err_msg = NULL;
    bool clean_shutdown = true;

    // Consume the dirty-shutdown bit so that only unknown bits remain in flags.
    if (flags & TOKUFT_DIRTY_SHUTDOWN) {
        clean_shutdown = false;
        flags &= ~TOKUFT_DIRTY_SHUTDOWN;
    }

    most_recent_env = NULL; // Set most_recent_env to NULL so that we don't have a dangling pointer (and if there's an error, the toku assert code would try to look at the env.)

    // if panicked, or if any open transactions, or any open dbs, then do nothing.

    if (toku_env_is_panicked(env)) {
        goto panic_and_quit_early;
    }
    if (env->i->logger && toku_logger_txns_exist(env->i->logger)) {
        err_msg = "Cannot close environment due to open transactions\n";
        r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
        goto panic_and_quit_early;
    }
    if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs.
        if (env->i->open_dbs_by_dname->size() > 0) {
            err_msg = "Cannot close environment due to open DBs\n";
            r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    // Close the internal dictionaries before the cachetable is shut down.
    if (env->i->persistent_environment) {
        r = toku_db_close(env->i->persistent_environment);
        if (r) {
            err_msg = "Cannot close persistent environment dictionary (DB->close error)\n";
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    if (env->i->directory) {
        r = toku_db_close(env->i->directory);
        if (r) {
            err_msg = "Cannot close Directory dictionary (DB->close error)\n";
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    env_fsync_log_cron_destroy(env);
    if (env->i->cachetable) {
        toku_cachetable_prepare_close(env->i->cachetable);
        toku_cachetable_minicron_shutdown(env->i->cachetable);
        if (env->i->logger) {
            // cp is only assigned, and only used, when clean_shutdown is true.
            CHECKPOINTER cp = nullptr;
            if (clean_shutdown) {
                cp = toku_cachetable_get_checkpointer(env->i->cachetable);
                r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
                if (r) {
                    err_msg = "Cannot close environment (error during checkpoint)\n";
                    toku_ydb_do_error(env, r, "%s", err_msg);
                    goto panic_and_quit_early;
                }
            }
            toku_logger_close_rollback_check_empty(env->i->logger, clean_shutdown);
            if (clean_shutdown) {
                //Do a second checkpoint now that the rollback cachefile is closed.
                r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
                if (r) {
                    err_msg = "Cannot close environment (error during checkpoint)\n";
                    toku_ydb_do_error(env, r, "%s", err_msg);
                    goto panic_and_quit_early;
                }
                toku_logger_shutdown(env->i->logger);
            }
        }
        toku_cachetable_close(&env->i->cachetable);
    }
    if (env->i->logger) {
        r = toku_logger_close(&env->i->logger);
        if (r) {
            err_msg = "Cannot close environment (logger close error)\n";
            env->i->logger = NULL;
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    // Even if nothing else went wrong, but we were panicked, then raise an error.
    // But if something else went wrong then raise that error (above)
    if (toku_env_is_panicked(env)) {
        goto panic_and_quit_early;
    } else {
        assert(env->i->panic_string == 0);
    }

    // From here on the close cannot fail: free everything the env owns.
    env_fs_destroy(env);
    env->i->ltm.destroy();
    if (env->i->data_dir)
        toku_free(env->i->data_dir);
    if (env->i->lg_dir)
        toku_free(env->i->lg_dir);
    if (env->i->tmp_dir)
        toku_free(env->i->tmp_dir);
    if (env->i->real_data_dir)
        toku_free(env->i->real_data_dir);
    if (env->i->real_log_dir)
        toku_free(env->i->real_log_dir);
    if (env->i->real_tmp_dir)
        toku_free(env->i->real_tmp_dir);
    if (env->i->open_dbs_by_dname) {
        env->i->open_dbs_by_dname->destroy();
        toku_free(env->i->open_dbs_by_dname);
    }
    if (env->i->open_dbs_by_dict_id) {
        env->i->open_dbs_by_dict_id->destroy();
        toku_free(env->i->open_dbs_by_dict_id);
    }
    if (env->i->dir)
        toku_free(env->i->dir);
    toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock);

    // Immediately before freeing internal environment unlock the directories
    unlock_single_process(env);
    toku_free(env->i);
    toku_free(env);
    toku_sync_fetch_and_add(&tokuft_num_envs, -1);
    // Unknown flag bits are reported only now, after the env has been freed.
    if (flags != 0) {
        r = EINVAL;
    }
    return r;

panic_and_quit_early:
    //release lock files.
    unlock_single_process(env);
    //r is the panic error
    if (toku_env_is_panicked(env)) {
        char *panic_string = env->i->panic_string;
        r = toku_ydb_do_error(env, toku_env_is_panicked(env), "Cannot close environment due to previous error: %s\n", panic_string);
    }
    else {
        // Not panicked yet: every jump that lands here in this state set r
        // and err_msg first, so panic the env with that error.
        env_panic(env, r, err_msg);
    }
    return r;
}
1248
1249static int
1250env_log_archive(DB_ENV * env, char **list[], uint32_t flags) {
1251 return toku_logger_log_archive(env->i->logger, list, flags);
1252}
1253
1254static int
1255env_log_flush(DB_ENV * env, const DB_LSN * lsn __attribute__((__unused__))) {
1256 HANDLE_PANICKED_ENV(env);
1257 // do nothing if no logger
1258 if (env->i->logger) {
1259 // We just flush everything. MySQL uses lsn == 0 which means flush everything.
1260 // For anyone else using the log, it is correct to flush too much, so we are OK.
1261 toku_logger_fsync(env->i->logger);
1262 }
1263 return 0;
1264}
1265
1266static int
1267env_set_cachesize(DB_ENV * env, uint32_t gbytes, uint32_t bytes, int ncache) {
1268 HANDLE_PANICKED_ENV(env);
1269 if (ncache != 1) {
1270 return EINVAL;
1271 }
1272 uint64_t cs64 = ((uint64_t) gbytes << 30) + bytes;
1273 unsigned long cs = cs64;
1274 if (cs64 > cs) {
1275 return EINVAL;
1276 }
1277 env->i->cachetable_size = cs;
1278 return 0;
1279}
1280
1281static int
1282env_set_client_pool_threads(DB_ENV * env, uint32_t threads) {
1283 HANDLE_PANICKED_ENV(env);
1284 env->i->client_pool_threads = threads;
1285 return 0;
1286}
1287
1288static int
1289env_set_cachetable_pool_threads(DB_ENV * env, uint32_t threads) {
1290 HANDLE_PANICKED_ENV(env);
1291 env->i->cachetable_pool_threads = threads;
1292 return 0;
1293}
1294
1295static int
1296env_set_checkpoint_pool_threads(DB_ENV * env, uint32_t threads) {
1297 HANDLE_PANICKED_ENV(env);
1298 env->i->checkpoint_pool_threads = threads;
1299 return 0;
1300}
1301
1302static void
1303env_set_check_thp(DB_ENV * env, bool new_val) {
1304 assert(env);
1305 env->i->check_thp = new_val;
1306}
1307
1308static bool
1309env_get_check_thp(DB_ENV * env) {
1310 assert(env);
1311 return env->i->check_thp;
1312}
1313
1314static bool env_set_dir_per_db(DB_ENV *env, bool new_val) {
1315 HANDLE_PANICKED_ENV(env);
1316 bool r = env->i->dir_per_db;
1317 env->i->dir_per_db = new_val;
1318 return r;
1319}
1320
1321static bool env_get_dir_per_db(DB_ENV *env) {
1322 HANDLE_PANICKED_ENV(env);
1323 return env->i->dir_per_db;
1324}
1325
1326static const char *env_get_data_dir(DB_ENV *env) {
1327 return env->i->real_data_dir;
1328}
1329
1330static int env_dirtool_attach(DB_ENV *env,
1331 DB_TXN *txn,
1332 const char *dname,
1333 const char *iname) {
1334 int r;
1335 DBT dname_dbt;
1336 DBT iname_dbt;
1337
1338 HANDLE_PANICKED_ENV(env);
1339 if (!env_opened(env)) {
1340 return EINVAL;
1341 }
1342 HANDLE_READ_ONLY_TXN(txn);
1343 toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1344 toku_fill_dbt(&iname_dbt, iname, strlen(iname) + 1);
1345
1346 r = toku_db_put(env->i->directory,
1347 txn,
1348 &dname_dbt,
1349 &iname_dbt,
1350 0,
1351 true);
1352 return r;
1353}
1354
1355static int env_dirtool_detach(DB_ENV *env,
1356 DB_TXN *txn,
1357 const char *dname) {
1358 int r;
1359 DBT dname_dbt;
1360 DBT old_iname_dbt;
1361
1362 HANDLE_PANICKED_ENV(env);
1363 if (!env_opened(env)) {
1364 return EINVAL;
1365 }
1366 HANDLE_READ_ONLY_TXN(txn);
1367
1368 toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1369 toku_init_dbt_flags(&old_iname_dbt, DB_DBT_REALLOC);
1370
1371 r = toku_db_get(env->i->directory,
1372 txn,
1373 &dname_dbt,
1374 &old_iname_dbt,
1375 DB_SERIALIZABLE); // allocates memory for iname
1376 if (r == DB_NOTFOUND)
1377 return EEXIST;
1378 toku_free(old_iname_dbt.data);
1379
1380 r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
1381
1382 return r;
1383}
1384
1385static int env_dirtool_move(DB_ENV *env,
1386 DB_TXN *txn,
1387 const char *old_dname,
1388 const char *new_dname) {
1389 int r;
1390 DBT old_dname_dbt;
1391 DBT new_dname_dbt;
1392 DBT iname_dbt;
1393
1394 HANDLE_PANICKED_ENV(env);
1395 if (!env_opened(env)) {
1396 return EINVAL;
1397 }
1398 HANDLE_READ_ONLY_TXN(txn);
1399
1400 toku_fill_dbt(&old_dname_dbt, old_dname, strlen(old_dname) + 1);
1401 toku_fill_dbt(&new_dname_dbt, new_dname, strlen(new_dname) + 1);
1402 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
1403
1404 r = toku_db_get(env->i->directory,
1405 txn,
1406 &old_dname_dbt,
1407 &iname_dbt,
1408 DB_SERIALIZABLE); // allocates memory for iname
1409 if (r == DB_NOTFOUND)
1410 return EEXIST;
1411
1412 r = toku_db_del(
1413 env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
1414 if (r != 0)
1415 goto exit;
1416
1417 r = toku_db_put(
1418 env->i->directory, txn, &new_dname_dbt, &iname_dbt, 0, true);
1419
1420exit:
1421 toku_free(iname_dbt.data);
1422 return r;
1423}
1424
1425static int locked_env_op(DB_ENV *env,
1426 DB_TXN *txn,
1427 std::function<int(DB_TXN *)> f) {
1428 int ret, r;
1429 HANDLE_READ_ONLY_TXN(txn);
1430 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1431
1432 DB_TXN *child_txn = NULL;
1433 int using_txns = env->i->open_flags & DB_INIT_TXN;
1434 if (using_txns) {
1435 ret = toku_txn_begin(env, txn, &child_txn, 0);
1436 lazy_assert_zero(ret);
1437 }
1438
1439 // cannot begin a checkpoint
1440 toku_multi_operation_client_lock();
1441 r = f(child_txn);
1442 toku_multi_operation_client_unlock();
1443
1444 if (using_txns) {
1445 if (r == 0) {
1446 ret = locked_txn_commit(child_txn, 0);
1447 lazy_assert_zero(ret);
1448 } else {
1449 ret = locked_txn_abort(child_txn);
1450 lazy_assert_zero(ret);
1451 }
1452 }
1453 return r;
1454
1455}
1456
1457static int locked_env_dirtool_attach(DB_ENV *env,
1458 DB_TXN *txn,
1459 const char *dname,
1460 const char *iname) {
1461 auto f = std::bind(
1462 env_dirtool_attach, env, std::placeholders::_1, dname, iname);
1463 return locked_env_op(env, txn, f);
1464}
1465
1466static int locked_env_dirtool_detach(DB_ENV *env,
1467 DB_TXN *txn,
1468 const char *dname) {
1469 auto f = std::bind(
1470 env_dirtool_detach, env, std::placeholders::_1, dname);
1471 return locked_env_op(env, txn, f);
1472}
1473
1474static int locked_env_dirtool_move(DB_ENV *env,
1475 DB_TXN *txn,
1476 const char *old_dname,
1477 const char *new_dname) {
1478 auto f = std::bind(
1479 env_dirtool_move, env, std::placeholders::_1, old_dname, new_dname);
1480 return locked_env_op(env, txn, f);
1481}
1482
1483static int env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags);
1484
1485static int
1486locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
1487 int ret, r;
1488 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1489 HANDLE_READ_ONLY_TXN(txn);
1490
1491 DB_TXN *child_txn = NULL;
1492 int using_txns = env->i->open_flags & DB_INIT_TXN;
1493 if (using_txns) {
1494 ret = toku_txn_begin(env, txn, &child_txn, 0);
1495 lazy_assert_zero(ret);
1496 }
1497
1498 // cannot begin a checkpoint
1499 toku_multi_operation_client_lock();
1500 r = env_dbremove(env, child_txn, fname, dbname, flags);
1501 toku_multi_operation_client_unlock();
1502
1503 if (using_txns) {
1504 if (r == 0) {
1505 ret = locked_txn_commit(child_txn, 0);
1506 lazy_assert_zero(ret);
1507 } else {
1508 ret = locked_txn_abort(child_txn);
1509 lazy_assert_zero(ret);
1510 }
1511 }
1512 return r;
1513}
1514
1515static int env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags);
1516
1517static int
1518locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
1519 int ret, r;
1520 HANDLE_READ_ONLY_TXN(txn);
1521 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1522
1523 DB_TXN *child_txn = NULL;
1524 int using_txns = env->i->open_flags & DB_INIT_TXN;
1525 if (using_txns) {
1526 ret = toku_txn_begin(env, txn, &child_txn, 0);
1527 lazy_assert_zero(ret);
1528 }
1529
1530 // cannot begin a checkpoint
1531 toku_multi_operation_client_lock();
1532 r = env_dbrename(env, child_txn, fname, dbname, newname, flags);
1533 toku_multi_operation_client_unlock();
1534
1535 if (using_txns) {
1536 if (r == 0) {
1537 ret = locked_txn_commit(child_txn, 0);
1538 lazy_assert_zero(ret);
1539 } else {
1540 ret = locked_txn_abort(child_txn);
1541 lazy_assert_zero(ret);
1542 }
1543 }
1544 return r;
1545}
1546
1547#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
1548
1549static int
1550env_get_cachesize(DB_ENV * env, uint32_t *gbytes, uint32_t *bytes, int *ncache) {
1551 HANDLE_PANICKED_ENV(env);
1552 *gbytes = env->i->cachetable_size >> 30;
1553 *bytes = env->i->cachetable_size & ((1<<30)-1);
1554 *ncache = 1;
1555 return 0;
1556}
1557
1558#endif
1559
1560static int
1561env_set_data_dir(DB_ENV * env, const char *dir) {
1562 HANDLE_PANICKED_ENV(env);
1563 int r;
1564
1565 if (env_opened(env) || !dir) {
1566 r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir after opening the env\n");
1567 }
1568 else if (env->i->data_dir)
1569 r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir more than once.\n");
1570 else {
1571 env->i->data_dir = toku_strdup(dir);
1572 if (env->i->data_dir==NULL) {
1573 assert(get_error_errno() == ENOMEM);
1574 r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1575 }
1576 else r = 0;
1577 }
1578 return r;
1579}
1580
1581static void
1582env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) {
1583 env->i->errcall = errcall;
1584}
1585
1586static void
1587env_set_errfile(DB_ENV*env, FILE*errfile) {
1588 env->i->errfile = errfile;
1589}
1590
1591static void
1592env_set_errpfx(DB_ENV * env, const char *errpfx) {
1593 env->i->errpfx = errpfx;
1594}
1595
1596static int
1597env_set_flags(DB_ENV * env, uint32_t flags, int onoff) {
1598 HANDLE_PANICKED_ENV(env);
1599
1600 uint32_t change = 0;
1601 if (flags & DB_AUTO_COMMIT) {
1602 change |= DB_AUTO_COMMIT;
1603 flags &= ~DB_AUTO_COMMIT;
1604 }
1605 if (flags != 0 && onoff) {
1606 return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support any nonzero ENV flags other than DB_AUTO_COMMIT\n");
1607 }
1608 if (onoff) env->i->open_flags |= change;
1609 else env->i->open_flags &= ~change;
1610 return 0;
1611}
1612
1613static int
1614env_set_lg_bsize(DB_ENV * env, uint32_t bsize) {
1615 HANDLE_PANICKED_ENV(env);
1616 return toku_logger_set_lg_bsize(env->i->logger, bsize);
1617}
1618
1619static int
1620env_set_lg_dir(DB_ENV * env, const char *dir) {
1621 HANDLE_PANICKED_ENV(env);
1622 if (env_opened(env)) {
1623 return toku_ydb_do_error(env, EINVAL, "Cannot set log dir after opening the env\n");
1624 }
1625
1626 if (env->i->lg_dir) toku_free(env->i->lg_dir);
1627 if (dir) {
1628 env->i->lg_dir = toku_strdup(dir);
1629 if (!env->i->lg_dir) {
1630 return toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1631 }
1632 }
1633 else env->i->lg_dir = NULL;
1634 return 0;
1635}
1636
1637static int
1638env_set_lg_max(DB_ENV * env, uint32_t lg_max) {
1639 HANDLE_PANICKED_ENV(env);
1640 return toku_logger_set_lg_max(env->i->logger, lg_max);
1641}
1642
1643static int
1644env_get_lg_max(DB_ENV * env, uint32_t *lg_maxp) {
1645 HANDLE_PANICKED_ENV(env);
1646 return toku_logger_get_lg_max(env->i->logger, lg_maxp);
1647}
1648
1649static int
1650env_set_lk_detect(DB_ENV * env, uint32_t UU(detect)) {
1651 HANDLE_PANICKED_ENV(env);
1652 return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support set_lk_detect\n");
1653}
1654
1655static int
1656env_set_lk_max_memory(DB_ENV *env, uint64_t lock_memory_limit) {
1657 HANDLE_PANICKED_ENV(env);
1658 int r = 0;
1659 if (env_opened(env)) {
1660 r = EINVAL;
1661 } else {
1662 r = env->i->ltm.set_max_lock_memory(lock_memory_limit);
1663 }
1664 return r;
1665}
1666
1667static int
1668env_get_lk_max_memory(DB_ENV *env, uint64_t *lk_maxp) {
1669 HANDLE_PANICKED_ENV(env);
1670 uint32_t max_lock_memory = env->i->ltm.get_max_lock_memory();
1671 *lk_maxp = max_lock_memory;
1672 return 0;
1673}
1674
1675//void toku__env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) {
1676// env->i->noticecall = noticecall;
1677//}
1678
1679static int
1680env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
1681 HANDLE_PANICKED_ENV(env);
1682 if (env_opened(env)) {
1683 return toku_ydb_do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n");
1684 }
1685 if (!tmp_dir) {
1686 return toku_ydb_do_error(env, EINVAL, "Tmp dir bust be non-null\n");
1687 }
1688 if (env->i->tmp_dir)
1689 toku_free(env->i->tmp_dir);
1690 env->i->tmp_dir = toku_strdup(tmp_dir);
1691 return env->i->tmp_dir ? 0 : ENOMEM;
1692}
1693
1694static int
1695env_set_verbose(DB_ENV * env, uint32_t UU(which), int UU(onoff)) {
1696 HANDLE_PANICKED_ENV(env);
1697 return 1;
1698}
1699
1700static int
1701toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte __attribute__((__unused__)), uint32_t min __attribute__((__unused__)), uint32_t flags __attribute__((__unused__))) {
1702 CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1703 int r = toku_checkpoint(cp, env->i->logger,
1704 checkpoint_callback_f, checkpoint_callback_extra,
1705 checkpoint_callback2_f, checkpoint_callback2_extra,
1706 CLIENT_CHECKPOINT);
1707 if (r) {
1708 // Panicking the whole environment may be overkill, but I'm not sure what else to do.
1709 env_panic(env, r, "checkpoint error\n");
1710 toku_ydb_do_error(env, r, "Checkpoint\n");
1711 }
1712 return r;
1713}
1714
1715static int
1716env_txn_stat(DB_ENV * env, DB_TXN_STAT ** UU(statp), uint32_t UU(flags)) {
1717 HANDLE_PANICKED_ENV(env);
1718 return 1;
1719}
1720
1721//
1722// We can assume the client calls this function right after recovery
1723// to return a list of prepared transactions to the user. When called,
1724// we can assume that no other work is being done in the system,
1725// as we are in the state of being after recovery,
1726// but before client operations should commence
1727//
1728static int
1729env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1730 struct tokulogger_preplist *MALLOC_N(count,preps);
1731 int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1732 if (r==0) {
1733 assert(*retp<=count);
1734 for (int i=0; i<*retp; i++) {
1735 xids[i] = preps[i].xid;
1736 }
1737 }
1738 toku_free(preps);
1739 return r;
1740}
1741
1742//
1743// We can assume the client calls this function right after recovery
1744// to return a list of prepared transactions to the user. When called,
1745// we can assume that no other work is being done in the system,
1746// as we are in the state of being after recovery,
1747// but before client operations should commence
1748//
1749static int
1750env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1751 struct tokulogger_preplist *MALLOC_N(count,preps);
1752 int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1753 if (r==0) {
1754 assert(*retp<=count);
1755 for (int i=0; i<*retp; i++) {
1756 preplist[i].txn = preps[i].txn;
1757 memcpy(preplist[i].gid, preps[i].xid.data, preps[i].xid.gtrid_length + preps[i].xid.bqual_length);
1758 }
1759 }
1760 toku_free(preps);
1761 return r;
1762}
1763
1764static int
1765env_get_txn_from_xid (DB_ENV *env, /*in*/ TOKU_XA_XID *xid, /*out*/ DB_TXN **txnp) {
1766 return toku_txn_manager_get_root_txn_from_xid(toku_logger_get_txn_manager(env->i->logger), xid, txnp);
1767}
1768
1769static int
1770env_checkpointing_set_period(DB_ENV * env, uint32_t seconds) {
1771 HANDLE_PANICKED_ENV(env);
1772 int r = 0;
1773 if (!env_opened(env)) {
1774 r = EINVAL;
1775 } else {
1776 toku_set_checkpoint_period(env->i->cachetable, seconds);
1777 }
1778 return r;
1779}
1780
1781static int
1782env_cleaner_set_period(DB_ENV * env, uint32_t seconds) {
1783 HANDLE_PANICKED_ENV(env);
1784 int r = 0;
1785 if (!env_opened(env)) {
1786 r = EINVAL;
1787 } else {
1788 toku_set_cleaner_period(env->i->cachetable, seconds);
1789 }
1790 return r;
1791}
1792
1793static int
1794env_cleaner_set_iterations(DB_ENV * env, uint32_t iterations) {
1795 HANDLE_PANICKED_ENV(env);
1796 int r = 0;
1797 if (!env_opened(env)) {
1798 r = EINVAL;
1799 } else {
1800 toku_set_cleaner_iterations(env->i->cachetable, iterations);
1801 }
1802 return r;
1803}
1804
1805static int
1806env_create_loader(DB_ENV *env,
1807 DB_TXN *txn,
1808 DB_LOADER **blp,
1809 DB *src_db,
1810 int N,
1811 DB *dbs[],
1812 uint32_t db_flags[/*N*/],
1813 uint32_t dbt_flags[/*N*/],
1814 uint32_t loader_flags) {
1815 int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags, true);
1816 return r;
1817}
1818
1819static int
1820env_checkpointing_get_period(DB_ENV * env, uint32_t *seconds) {
1821 HANDLE_PANICKED_ENV(env);
1822 int r = 0;
1823 if (!env_opened(env)) r = EINVAL;
1824 else
1825 *seconds = toku_get_checkpoint_period_unlocked(env->i->cachetable);
1826 return r;
1827}
1828
1829static int
1830env_cleaner_get_period(DB_ENV * env, uint32_t *seconds) {
1831 HANDLE_PANICKED_ENV(env);
1832 int r = 0;
1833 if (!env_opened(env)) r = EINVAL;
1834 else
1835 *seconds = toku_get_cleaner_period_unlocked(env->i->cachetable);
1836 return r;
1837}
1838
1839static int
1840env_cleaner_get_iterations(DB_ENV * env, uint32_t *iterations) {
1841 HANDLE_PANICKED_ENV(env);
1842 int r = 0;
1843 if (!env_opened(env)) r = EINVAL;
1844 else
1845 *iterations = toku_get_cleaner_iterations(env->i->cachetable);
1846 return r;
1847}
1848
1849static int
1850env_evictor_set_enable_partial_eviction(DB_ENV* env, bool enabled) {
1851 HANDLE_PANICKED_ENV(env);
1852 int r = 0;
1853 if (!env_opened(env)) r = EINVAL;
1854 else toku_set_enable_partial_eviction(env->i->cachetable, enabled);
1855 return r;
1856}
1857
1858static int
1859env_evictor_get_enable_partial_eviction(DB_ENV* env, bool *enabled) {
1860 HANDLE_PANICKED_ENV(env);
1861 int r = 0;
1862 if (!env_opened(env)) r = EINVAL;
1863 else *enabled = toku_get_enable_partial_eviction(env->i->cachetable);
1864 return r;
1865}
1866
1867static int
1868env_checkpointing_postpone(DB_ENV * env) {
1869 HANDLE_PANICKED_ENV(env);
1870 int r = 0;
1871 if (!env_opened(env)) r = EINVAL;
1872 else toku_checkpoint_safe_client_lock();
1873 return r;
1874}
1875
1876static int
1877env_checkpointing_resume(DB_ENV * env) {
1878 HANDLE_PANICKED_ENV(env);
1879 int r = 0;
1880 if (!env_opened(env)) r = EINVAL;
1881 else toku_checkpoint_safe_client_unlock();
1882 return r;
1883}
1884
1885static int
1886env_checkpointing_begin_atomic_operation(DB_ENV * env) {
1887 HANDLE_PANICKED_ENV(env);
1888 int r = 0;
1889 if (!env_opened(env)) r = EINVAL;
1890 else toku_multi_operation_client_lock();
1891 return r;
1892}
1893
1894static int
1895env_checkpointing_end_atomic_operation(DB_ENV * env) {
1896 HANDLE_PANICKED_ENV(env);
1897 int r = 0;
1898 if (!env_opened(env)) r = EINVAL;
1899 else toku_multi_operation_client_unlock();
1900 return r;
1901}
1902
1903static int
1904env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
1905 HANDLE_PANICKED_ENV(env);
1906 int r = 0;
1907 if (env_opened(env)) r = EINVAL;
1908 else {
1909 env->i->bt_compare = bt_compare;
1910 }
1911 return r;
1912}
1913
1914static void
1915env_set_update (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra)) {
1916 env->i->update_function = update_function;
1917}
1918
1919static int
1920env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
1921 HANDLE_PANICKED_ENV(env);
1922 int r = 0;
1923 if (env_opened(env)) r = EINVAL;
1924 else {
1925 env->i->generate_row_for_put = generate_row_for_put;
1926 }
1927 return r;
1928}
1929
1930static int
1931env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
1932 HANDLE_PANICKED_ENV(env);
1933 int r = 0;
1934 if (env_opened(env)) r = EINVAL;
1935 else {
1936 env->i->generate_row_for_del = generate_row_for_del;
1937 }
1938 return r;
1939}
1940static int
1941env_set_redzone(DB_ENV *env, int redzone) {
1942 HANDLE_PANICKED_ENV(env);
1943 int r;
1944 if (env_opened(env))
1945 r = EINVAL;
1946 else {
1947 env->i->redzone = redzone;
1948 r = 0;
1949 }
1950 return r;
1951}
1952
1953static int env_get_lock_timeout(DB_ENV *env, uint64_t *lock_timeout_msec) {
1954 uint64_t t = env->i->default_lock_timeout_msec;
1955 if (env->i->get_lock_timeout_callback)
1956 t = env->i->get_lock_timeout_callback(t);
1957 *lock_timeout_msec = t;
1958 return 0;
1959}
1960
1961static int env_set_lock_timeout(DB_ENV *env, uint64_t default_lock_timeout_msec, uint64_t (*get_lock_timeout_callback)(uint64_t default_lock_timeout_msec)) {
1962 env->i->default_lock_timeout_msec = default_lock_timeout_msec;
1963 env->i->get_lock_timeout_callback = get_lock_timeout_callback;
1964 return 0;
1965}
1966
1967static int
1968env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
1969 env->i->lock_wait_timeout_callback = callback;
1970 return 0;
1971}
1972
1973static int
1974env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) {
1975 env->i->lock_wait_needed_callback = callback;
1976 return 0;
1977}
1978
// Format *timer into buf using ctime_r(), then strip the trailing '\n'/'\r'
// characters that ctime_r() appends.
//
// buf must provide at least 26 bytes, the minimum ctime_r() requires.
// If ctime_r() fails (it returns NULL, e.g. when the value cannot be
// represented as a calendar time), buf receives a short placeholder string
// instead of being left uninitialized.
static void
format_time(const time_t *timer, char *buf) {
    if (ctime_r(timer, buf) == NULL) {
        // Without this guard strlen() below would read an uninitialized buffer.
        strcpy(buf, "(invalid time)");
        return;
    }

    size_t len = strlen(buf);
    assert(len < 26);
    assert(len >= 1);

    // Drop line terminators from the end; at least one character must remain.
    while (buf[len - 1] == '\n' || buf[len - 1] == '\r') {
        buf[--len] = '\0';
        assert(len >= 1);
    }
}
1995
1996////////////////////////////////////////////////////////////////////////////////////////////////
1997// Local definition of status information from portability layer, which should not include db.h.
1998// Local status structs are used to concentrate file system information collected from various places
1999// and memory information collected from memory.c.
2000//
// Row indices into fsstat.status[]; fs_get_status() fills one value per entry.
typedef enum {
    FS_ENOSPC_REDZONE_STATE = 0,  // possible values are enumerated by fs_redzone_state
    FS_ENOSPC_THREADS_BLOCKED,    // how many threads currently blocked on ENOSPC
    FS_ENOSPC_REDZONE_CTR,        // number of operations rejected by enospc prevention (red zone)
    FS_ENOSPC_MOST_RECENT,        // most recent time that file system was completely full
    FS_ENOSPC_COUNT,              // total number of times ENOSPC was returned from an attempt to write
    FS_FSYNC_TIME,                // fsync time as reported by toku_get_fsync_times()
    FS_FSYNC_COUNT,               // fsync count as reported by toku_get_fsync_times()
    FS_LONG_FSYNC_TIME,           // long-fsync time as reported by toku_get_fsync_times()
    FS_LONG_FSYNC_COUNT,          // long-fsync count as reported by toku_get_fsync_times()
    FS_STATUS_NUM_ROWS            // must be last
} fs_status_entry;
2013
2014typedef struct {
2015 bool initialized;
2016 TOKU_ENGINE_STATUS_ROW_S status[FS_STATUS_NUM_ROWS];
2017} FS_STATUS_S, *FS_STATUS;
2018
2019static FS_STATUS_S fsstat;
2020
2021#define FS_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(fsstat, k, c, t, "filesystem: " l, inc)
2022
2023static void
2024fs_status_init(void) {
2025 FS_STATUS_INIT(FS_ENOSPC_REDZONE_STATE, nullptr, FS_STATE, "ENOSPC redzone state", TOKU_ENGINE_STATUS);
2026 FS_STATUS_INIT(FS_ENOSPC_THREADS_BLOCKED, FILESYSTEM_THREADS_BLOCKED_BY_FULL_DISK, UINT64, "threads currently blocked by full disk", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2027 FS_STATUS_INIT(FS_ENOSPC_REDZONE_CTR, nullptr, UINT64, "number of operations rejected by enospc prevention (red zone)", TOKU_ENGINE_STATUS);
2028 FS_STATUS_INIT(FS_ENOSPC_MOST_RECENT, nullptr, UNIXTIME, "most recent disk full", TOKU_ENGINE_STATUS);
2029 FS_STATUS_INIT(FS_ENOSPC_COUNT, nullptr, UINT64, "number of write operations that returned ENOSPC", TOKU_ENGINE_STATUS);
2030 FS_STATUS_INIT(FS_FSYNC_TIME, FILESYSTEM_FSYNC_TIME, UINT64, "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2031 FS_STATUS_INIT(FS_FSYNC_COUNT, FILESYSTEM_FSYNC_NUM, UINT64, "fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2032 FS_STATUS_INIT(FS_LONG_FSYNC_TIME, FILESYSTEM_LONG_FSYNC_TIME, UINT64, "long fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2033 FS_STATUS_INIT(FS_LONG_FSYNC_COUNT, FILESYSTEM_LONG_FSYNC_NUM, UINT64, "long fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2034 fsstat.initialized = true;
2035}
2036#undef FS_STATUS_INIT
2037
2038#define FS_STATUS_VALUE(x) fsstat.status[x].value.num
2039
2040static void
2041fs_get_status(DB_ENV * env, fs_redzone_state * redzone_state) {
2042 if (!fsstat.initialized)
2043 fs_status_init();
2044
2045 time_t enospc_most_recent_timestamp;
2046 uint64_t enospc_threads_blocked, enospc_total;
2047 toku_fs_get_write_info(&enospc_most_recent_timestamp, &enospc_threads_blocked, &enospc_total);
2048 if (enospc_threads_blocked)
2049 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = FS_BLOCKED;
2050 else
2051 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = env->i->fs_state;
2052 *redzone_state = (fs_redzone_state) FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE);
2053 FS_STATUS_VALUE(FS_ENOSPC_THREADS_BLOCKED) = enospc_threads_blocked;
2054 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_CTR) = env->i->enospc_redzone_ctr;
2055 FS_STATUS_VALUE(FS_ENOSPC_MOST_RECENT) = enospc_most_recent_timestamp;
2056 FS_STATUS_VALUE(FS_ENOSPC_COUNT) = enospc_total;
2057
2058 uint64_t fsync_count, fsync_time, long_fsync_threshold, long_fsync_count, long_fsync_time;
2059 toku_get_fsync_times(&fsync_count, &fsync_time, &long_fsync_threshold, &long_fsync_count, &long_fsync_time);
2060 FS_STATUS_VALUE(FS_FSYNC_COUNT) = fsync_count;
2061 FS_STATUS_VALUE(FS_FSYNC_TIME) = fsync_time;
2062 FS_STATUS_VALUE(FS_LONG_FSYNC_COUNT) = long_fsync_count;
2063 FS_STATUS_VALUE(FS_LONG_FSYNC_TIME) = long_fsync_time;
2064}
2065#undef FS_STATUS_VALUE
2066
// Local status struct used to get information from memory.c
// Row indices into memory_status.status[]; legends are assigned in
// memory_status_init().
typedef enum {
    MEMORY_MALLOC_COUNT = 0,    // number of malloc operations
    MEMORY_FREE_COUNT,          // number of free operations
    MEMORY_REALLOC_COUNT,       // number of realloc operations
    MEMORY_MALLOC_FAIL,         // number of malloc operations that failed
    MEMORY_REALLOC_FAIL,        // number of realloc operations that failed
    MEMORY_REQUESTED,           // number of bytes requested
    MEMORY_USED,                // number of bytes used (requested + overhead)
    MEMORY_FREED,               // number of bytes freed
    MEMORY_MAX_REQUESTED_SIZE,  // largest attempted allocation size
    MEMORY_LAST_FAILED_SIZE,    // size of the last failed allocation attempt
    MEMORY_MAX_IN_USE,          // estimated maximum memory footprint
    MEMORY_MALLOCATOR_VERSION,  // mallocator version (string-valued row)
    MEMORY_MMAP_THRESHOLD,      // mmap threshold
    MEMORY_STATUS_NUM_ROWS      // must be last
} memory_status_entry;
2084
2085typedef struct {
2086 bool initialized;
2087 TOKU_ENGINE_STATUS_ROW_S status[MEMORY_STATUS_NUM_ROWS];
2088} MEMORY_STATUS_S, *MEMORY_STATUS;
2089
2090static MEMORY_STATUS_S memory_status;
2091
2092#define STATUS_INIT(k,c,t,l) TOKUFT_STATUS_INIT(memory_status, k, c, t, "memory: " l, TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS)
2093
2094static void
2095memory_status_init(void) {
2096 // Note, this function initializes the keyname, type, and legend fields.
2097 // Value fields are initialized to zero by compiler.
2098 STATUS_INIT(MEMORY_MALLOC_COUNT, MEMORY_MALLOC_COUNT, UINT64, "number of malloc operations");
2099 STATUS_INIT(MEMORY_FREE_COUNT, MEMORY_FREE_COUNT, UINT64, "number of free operations");
2100 STATUS_INIT(MEMORY_REALLOC_COUNT, MEMORY_REALLOC_COUNT, UINT64, "number of realloc operations");
2101 STATUS_INIT(MEMORY_MALLOC_FAIL, MEMORY_MALLOC_FAIL, UINT64, "number of malloc operations that failed");
2102 STATUS_INIT(MEMORY_REALLOC_FAIL, MEMORY_REALLOC_FAIL, UINT64, "number of realloc operations that failed" );
2103 STATUS_INIT(MEMORY_REQUESTED, MEMORY_REQUESTED, UINT64, "number of bytes requested");
2104 STATUS_INIT(MEMORY_USED, MEMORY_USED, UINT64, "number of bytes used (requested + overhead)");
2105 STATUS_INIT(MEMORY_FREED, MEMORY_FREED, UINT64, "number of bytes freed");
2106 STATUS_INIT(MEMORY_MAX_REQUESTED_SIZE, MEMORY_MAX_REQUESTED_SIZE, UINT64, "largest attempted allocation size");
2107 STATUS_INIT(MEMORY_LAST_FAILED_SIZE, MEMORY_LAST_FAILED_SIZE, UINT64, "size of the last failed allocation attempt");
2108 STATUS_INIT(MEMORY_MAX_IN_USE, MEM_ESTIMATED_MAXIMUM_MEMORY_FOOTPRINT, UINT64, "estimated maximum memory footprint");
2109 STATUS_INIT(MEMORY_MALLOCATOR_VERSION, MEMORY_MALLOCATOR_VERSION, CHARSTR, "mallocator version");
2110 STATUS_INIT(MEMORY_MMAP_THRESHOLD, MEMORY_MMAP_THRESHOLD, UINT64, "mmap threshold");
2111 memory_status.initialized = true;
2112}
2113#undef STATUS_INIT
2114
2115#define MEMORY_STATUS_VALUE(x) memory_status.status[x].value.num
2116
2117static void
2118memory_get_status(void) {
2119 if (!memory_status.initialized)
2120 memory_status_init();
2121 LOCAL_MEMORY_STATUS_S local_memstat;
2122 toku_memory_get_status(&local_memstat);
2123 MEMORY_STATUS_VALUE(MEMORY_MALLOC_COUNT) = local_memstat.malloc_count;
2124 MEMORY_STATUS_VALUE(MEMORY_FREE_COUNT) = local_memstat.free_count;
2125 MEMORY_STATUS_VALUE(MEMORY_REALLOC_COUNT) = local_memstat.realloc_count;
2126 MEMORY_STATUS_VALUE(MEMORY_MALLOC_FAIL) = local_memstat.malloc_fail;
2127 MEMORY_STATUS_VALUE(MEMORY_REALLOC_FAIL) = local_memstat.realloc_fail;
2128 MEMORY_STATUS_VALUE(MEMORY_REQUESTED) = local_memstat.requested;
2129 MEMORY_STATUS_VALUE(MEMORY_USED) = local_memstat.used;
2130 MEMORY_STATUS_VALUE(MEMORY_FREED) = local_memstat.freed;
2131 MEMORY_STATUS_VALUE(MEMORY_MAX_IN_USE) = local_memstat.max_in_use;
2132 MEMORY_STATUS_VALUE(MEMORY_MMAP_THRESHOLD) = local_memstat.mmap_threshold;
2133 memory_status.status[MEMORY_MALLOCATOR_VERSION].value.str = local_memstat.mallocator_version;
2134}
2135#undef MEMORY_STATUS_VALUE
2136
2137// how many rows are in engine status?
2138static int
2139env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp) {
2140 uint64_t num_rows = 0;
2141 num_rows += YDB_LAYER_STATUS_NUM_ROWS;
2142 num_rows += YDB_C_LAYER_STATUS_NUM_ROWS;
2143 num_rows += YDB_WRITE_LAYER_STATUS_NUM_ROWS;
2144 num_rows += LE_STATUS_S::LE_STATUS_NUM_ROWS;
2145 num_rows += CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS;
2146 num_rows += CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS;
2147 num_rows += LTM_STATUS_S::LTM_STATUS_NUM_ROWS;
2148 num_rows += FT_STATUS_S::FT_STATUS_NUM_ROWS;
2149 num_rows += FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS;
2150 num_rows += FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS;
2151 num_rows += TXN_STATUS_S::TXN_STATUS_NUM_ROWS;
2152 num_rows += LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS;
2153 num_rows += MEMORY_STATUS_NUM_ROWS;
2154 num_rows += FS_STATUS_NUM_ROWS;
2155 num_rows += INDEXER_STATUS_NUM_ROWS;
2156 num_rows += LOADER_STATUS_NUM_ROWS;
2157 num_rows += CTX_STATUS_NUM_ROWS;
2158#if 0
2159 // enable when upgrade is supported
2160 num_rows += FT_UPGRADE_STATUS_NUM_ROWS;
2161 num_rows += PERSISTENT_UPGRADE_STATUS_NUM_ROWS;
2162#endif
2163 *num_rowsp = num_rows;
2164 return 0;
2165}
2166
2167// Do not take ydb lock or any other lock around or in this function.
2168// If the engine is blocked because some thread is holding a lock, this function
2169// can help diagnose the problem.
2170// This function only collects information, and it does not matter if something gets garbled
2171// because of a race condition.
2172// Note, engine status is still collected even if the environment or logger is panicked
2173static int
2174env_get_engine_status (DB_ENV * env, TOKU_ENGINE_STATUS_ROW engstat, uint64_t maxrows, uint64_t *num_rows, fs_redzone_state* redzone_state, uint64_t * env_panicp, char * env_panic_string_buf, int env_panic_string_length, toku_engine_status_include_type include_flags) {
2175 int r;
2176
2177 if (env_panic_string_buf) {
2178 if (env && env->i && env->i->is_panicked && env->i->panic_string) {
2179 strncpy(env_panic_string_buf, env->i->panic_string, env_panic_string_length);
2180 env_panic_string_buf[env_panic_string_length - 1] = '\0'; // just in case
2181 }
2182 else
2183 *env_panic_string_buf = '\0';
2184 }
2185
2186 if ( !(env) ||
2187 !(env->i) ||
2188 !(env_opened(env)) ||
2189 !num_rows ||
2190 !include_flags)
2191 r = EINVAL;
2192 else {
2193 r = 0;
2194 uint64_t row = 0; // which row to fill next
2195 *env_panicp = env->i->is_panicked;
2196
2197 {
2198 YDB_LAYER_STATUS_S ydb_stat;
2199 ydb_layer_get_status(env, &ydb_stat);
2200 for (int i = 0; i < YDB_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2201 if (ydb_stat.status[i].include & include_flags) {
2202 engstat[row++] = ydb_stat.status[i];
2203 }
2204 }
2205 }
2206 {
2207 YDB_C_LAYER_STATUS_S ydb_c_stat;
2208 ydb_c_layer_get_status(&ydb_c_stat);
2209 for (int i = 0; i < YDB_C_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2210 if (ydb_c_stat.status[i].include & include_flags) {
2211 engstat[row++] = ydb_c_stat.status[i];
2212 }
2213 }
2214 }
2215 {
2216 YDB_WRITE_LAYER_STATUS_S ydb_write_stat;
2217 ydb_write_layer_get_status(&ydb_write_stat);
2218 for (int i = 0; i < YDB_WRITE_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2219 if (ydb_write_stat.status[i].include & include_flags) {
2220 engstat[row++] = ydb_write_stat.status[i];
2221 }
2222 }
2223 }
2224 {
2225 LE_STATUS_S lestat; // Rice's vampire
2226 toku_le_get_status(&lestat);
2227 for (int i = 0; i < LE_STATUS_S::LE_STATUS_NUM_ROWS && row < maxrows; i++) {
2228 if (lestat.status[i].include & include_flags) {
2229 engstat[row++] = lestat.status[i];
2230 }
2231 }
2232 }
2233 {
2234 CHECKPOINT_STATUS_S cpstat;
2235 toku_checkpoint_get_status(env->i->cachetable, &cpstat);
2236 for (int i = 0; i < CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS && row < maxrows; i++) {
2237 if (cpstat.status[i].include & include_flags) {
2238 engstat[row++] = cpstat.status[i];
2239 }
2240 }
2241 }
2242 {
2243 CACHETABLE_STATUS_S ctstat;
2244 toku_cachetable_get_status(env->i->cachetable, &ctstat);
2245 for (int i = 0; i < CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS && row < maxrows; i++) {
2246 if (ctstat.status[i].include & include_flags) {
2247 engstat[row++] = ctstat.status[i];
2248 }
2249 }
2250 }
2251 {
2252 LTM_STATUS_S ltmstat;
2253 env->i->ltm.get_status(&ltmstat);
2254 for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS && row < maxrows; i++) {
2255 if (ltmstat.status[i].include & include_flags) {
2256 engstat[row++] = ltmstat.status[i];
2257 }
2258 }
2259 }
2260 {
2261 FT_STATUS_S ftstat;
2262 toku_ft_get_status(&ftstat);
2263 for (int i = 0; i < FT_STATUS_S::FT_STATUS_NUM_ROWS && row < maxrows; i++) {
2264 if (ftstat.status[i].include & include_flags) {
2265 engstat[row++] = ftstat.status[i];
2266 }
2267 }
2268 }
2269 {
2270 FT_FLUSHER_STATUS_S flusherstat;
2271 toku_ft_flusher_get_status(&flusherstat);
2272 for (int i = 0; i < FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS && row < maxrows; i++) {
2273 if (flusherstat.status[i].include & include_flags) {
2274 engstat[row++] = flusherstat.status[i];
2275 }
2276 }
2277 }
2278 {
2279 FT_HOT_STATUS_S hotstat;
2280 toku_ft_hot_get_status(&hotstat);
2281 for (int i = 0; i < FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS && row < maxrows; i++) {
2282 if (hotstat.status[i].include & include_flags) {
2283 engstat[row++] = hotstat.status[i];
2284 }
2285 }
2286 }
2287 {
2288 TXN_STATUS_S txnstat;
2289 toku_txn_get_status(&txnstat);
2290 for (int i = 0; i < TXN_STATUS_S::TXN_STATUS_NUM_ROWS && row < maxrows; i++) {
2291 if (txnstat.status[i].include & include_flags) {
2292 engstat[row++] = txnstat.status[i];
2293 }
2294 }
2295 }
2296 {
2297 LOGGER_STATUS_S loggerstat;
2298 toku_logger_get_status(env->i->logger, &loggerstat);
2299 for (int i = 0; i < LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS && row < maxrows; i++) {
2300 if (loggerstat.status[i].include & include_flags) {
2301 engstat[row++] = loggerstat.status[i];
2302 }
2303 }
2304 }
2305
2306 {
2307 INDEXER_STATUS_S indexerstat;
2308 toku_indexer_get_status(&indexerstat);
2309 for (int i = 0; i < INDEXER_STATUS_NUM_ROWS && row < maxrows; i++) {
2310 if (indexerstat.status[i].include & include_flags) {
2311 engstat[row++] = indexerstat.status[i];
2312 }
2313 }
2314 }
2315 {
2316 LOADER_STATUS_S loaderstat;
2317 toku_loader_get_status(&loaderstat);
2318 for (int i = 0; i < LOADER_STATUS_NUM_ROWS && row < maxrows; i++) {
2319 if (loaderstat.status[i].include & include_flags) {
2320 engstat[row++] = loaderstat.status[i];
2321 }
2322 }
2323 }
2324
2325 {
2326 // memory_status is local to this file
2327 memory_get_status();
2328 for (int i = 0; i < MEMORY_STATUS_NUM_ROWS && row < maxrows; i++) {
2329 if (memory_status.status[i].include & include_flags) {
2330 engstat[row++] = memory_status.status[i];
2331 }
2332 }
2333 }
2334 {
2335 // Note, fs_get_status() and the fsstat structure are local to this file because they
2336 // are used to concentrate file system information collected from various places.
2337 fs_get_status(env, redzone_state);
2338 for (int i = 0; i < FS_STATUS_NUM_ROWS && row < maxrows; i++) {
2339 if (fsstat.status[i].include & include_flags) {
2340 engstat[row++] = fsstat.status[i];
2341 }
2342 }
2343 }
2344 {
2345 struct context_status ctxstatus;
2346 toku_context_get_status(&ctxstatus);
2347 for (int i = 0; i < CTX_STATUS_NUM_ROWS && row < maxrows; i++) {
2348 if (ctxstatus.status[i].include & include_flags) {
2349 engstat[row++] = ctxstatus.status[i];
2350 }
2351 }
2352 }
2353#if 0
2354 // enable when upgrade is supported
2355 {
2356 for (int i = 0; i < PERSISTENT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2357 if (persistent_upgrade_status.status[i].include & include_flags) {
2358 engstat[row++] = persistent_upgrade_status.status[i];
2359 }
2360 }
2361 FT_UPGRADE_STATUS_S ft_upgradestat;
2362 toku_ft_upgrade_get_status(&ft_upgradestat);
2363 for (int i = 0; i < FT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2364 if (ft_upgradestat.status[i].include & include_flags) {
2365 engstat[row++] = ft_upgradestat.status[i];
2366 }
2367 }
2368
2369 }
2370#endif
2371 if (r==0) {
2372 *num_rows = row;
2373 }
2374 }
2375 return r;
2376}
2377
2378// Fill buff with text description of engine status up to bufsiz bytes.
2379// Intended for use by test programs that do not have the handlerton available,
2380// and for use by toku_assert logic to print diagnostic info on crash.
2381static int
2382env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
2383 uint32_t stringsize = 1024;
2384 uint64_t panic;
2385 char panicstring[stringsize];
2386 int n = 0; // number of characters printed so far
2387 uint64_t num_rows;
2388 uint64_t max_rows;
2389 fs_redzone_state redzone_state;
2390
2391 n = snprintf(buff, bufsiz - n, "BUILD_ID = %d\n", BUILD_ID);
2392
2393 (void) env_get_engine_status_num_rows (env, &max_rows);
2394 TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2395 int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2396
2397 if (r) {
2398 n += snprintf(buff + n, bufsiz - n, "Engine status not available: ");
2399 if (!env) {
2400 n += snprintf(buff + n, bufsiz - n, "no environment\n");
2401 }
2402 else if (!(env->i)) {
2403 n += snprintf(buff + n, bufsiz - n, "environment internal struct is null\n");
2404 }
2405 else if (!env_opened(env)) {
2406 n += snprintf(buff + n, bufsiz - n, "environment is not open\n");
2407 }
2408 }
2409 else {
2410 if (panic) {
2411 n += snprintf(buff + n, bufsiz - n, "Env panic code: %" PRIu64 "\n", panic);
2412 if (strlen(panicstring)) {
2413 invariant(strlen(panicstring) <= stringsize);
2414 n += snprintf(buff + n, bufsiz - n, "Env panic string: %s\n", panicstring);
2415 }
2416 }
2417
2418 for (uint64_t row = 0; row < num_rows; row++) {
2419 n += snprintf(buff + n, bufsiz - n, "%s: ", mystat[row].legend);
2420 switch (mystat[row].type) {
2421 case FS_STATE:
2422 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2423 break;
2424 case UINT64:
2425 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2426 break;
2427 case CHARSTR:
2428 n += snprintf(buff + n, bufsiz - n, "%s\n", mystat[row].value.str);
2429 break;
2430 case UNIXTIME:
2431 {
2432 char tbuf[26];
2433 format_time((time_t*)&mystat[row].value.num, tbuf);
2434 n += snprintf(buff + n, bufsiz - n, "%s\n", tbuf);
2435 }
2436 break;
2437 case TOKUTIME:
2438 {
2439 double t = tokutime_to_seconds(mystat[row].value.num);
2440 n += snprintf(buff + n, bufsiz - n, "%.6f\n", t);
2441 }
2442 break;
2443 case PARCOUNT:
2444 {
2445 uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2446 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", v);
2447 }
2448 break;
2449 default:
2450 n += snprintf(buff + n, bufsiz - n, "UNKNOWN STATUS TYPE: %d\n", mystat[row].type);
2451 break;
2452 }
2453 }
2454 }
2455
2456 if (n > bufsiz) {
2457 const char * errmsg = "BUFFER TOO SMALL\n";
2458 int len = strlen(errmsg) + 1;
2459 (void) snprintf(buff + (bufsiz - 1) - len, len, "%s", errmsg);
2460 }
2461
2462 return r;
2463}
2464
2465// prints engine status using toku_env_err line-by-line
2466static int
2467env_err_engine_status(DB_ENV * env) {
2468 uint32_t stringsize = 1024;
2469 uint64_t panic;
2470 char panicstring[stringsize];
2471 uint64_t num_rows;
2472 uint64_t max_rows;
2473 fs_redzone_state redzone_state;
2474
2475 toku_env_err(env, 0, "BUILD_ID = %d", BUILD_ID);
2476
2477 (void) env_get_engine_status_num_rows (env, &max_rows);
2478 TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2479 int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2480
2481 if (r) {
2482 toku_env_err(env, 0, "Engine status not available: ");
2483 if (!env) {
2484 toku_env_err(env, 0, "no environment");
2485 }
2486 else if (!(env->i)) {
2487 toku_env_err(env, 0, "environment internal struct is null");
2488 }
2489 else if (!env_opened(env)) {
2490 toku_env_err(env, 0, "environment is not open");
2491 }
2492 }
2493 else {
2494 if (panic) {
2495 toku_env_err(env, 0, "Env panic code: %" PRIu64, panic);
2496 if (strlen(panicstring)) {
2497 invariant(strlen(panicstring) <= stringsize);
2498 toku_env_err(env, 0, "Env panic string: %s", panicstring);
2499 }
2500 }
2501
2502 for (uint64_t row = 0; row < num_rows; row++) {
2503 switch (mystat[row].type) {
2504 case FS_STATE:
2505 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2506 break;
2507 case UINT64:
2508 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2509 break;
2510 case CHARSTR:
2511 toku_env_err(env, 0, "%s: %s", mystat[row].legend, mystat[row].value.str);
2512 break;
2513 case UNIXTIME:
2514 {
2515 char tbuf[26];
2516 format_time((time_t*)&mystat[row].value.num, tbuf);
2517 toku_env_err(env, 0, "%s: %s", mystat[row].legend, tbuf);
2518 }
2519 break;
2520 case TOKUTIME:
2521 {
2522 double t = tokutime_to_seconds(mystat[row].value.num);
2523 toku_env_err(env, 0, "%s: %.6f", mystat[row].legend, t);
2524 }
2525 break;
2526 case PARCOUNT:
2527 {
2528 uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2529 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, v);
2530 }
2531 break;
2532 default:
2533 toku_env_err(env, 0, "%s: UNKNOWN STATUS TYPE: %d", mystat[row].legend, mystat[row].type);
2534 break;
2535 }
2536 }
2537 }
2538
2539 return r;
2540}
2541
2542// intended for use by toku_assert logic, when env is not known
2543static int
2544toku_maybe_get_engine_status_text (char * buff, int buffsize) {
2545 DB_ENV * env = most_recent_env;
2546 int r;
2547 if (engine_status_enable && env != NULL) {
2548 r = env_get_engine_status_text(env, buff, buffsize);
2549 }
2550 else {
2551 r = EOPNOTSUPP;
2552 snprintf(buff, buffsize, "Engine status not available: disabled by user. This should only happen in test programs.\n");
2553 }
2554 return r;
2555}
2556
2557static int
2558toku_maybe_err_engine_status (void) {
2559 DB_ENV * env = most_recent_env;
2560 int r;
2561 if (engine_status_enable && env != NULL) {
2562 r = env_err_engine_status(env);
2563 }
2564 else {
2565 r = EOPNOTSUPP;
2566 }
2567 return r;
2568}
2569
2570// Set panic code and panic string if not already panicked,
2571// intended for use by toku_assert when about to abort().
2572static void
2573toku_maybe_set_env_panic(int code, const char * msg) {
2574 if (code == 0)
2575 code = -1;
2576 if (msg == NULL)
2577 msg = "Unknown cause from abort (failed assert)\n";
2578 env_is_panicked = code; // disable library destructor no matter what
2579 DB_ENV * env = most_recent_env;
2580 if (env &&
2581 env->i &&
2582 (env->i->is_panicked == 0)) {
2583 env_panic(env, code, msg);
2584 }
2585}
2586
2587// handlerton's call to fractal tree layer on failed assert in handlerton
2588static int
2589env_crash(DB_ENV * UU(db_env), const char* msg, const char * fun, const char* file, int line, int caller_errno) {
2590 toku_do_assert_fail(msg, fun, file, line, caller_errno);
2591 return -1; // placate compiler
2592}
2593
2594static int
2595env_get_cursor_for_persistent_environment(DB_ENV* env, DB_TXN* txn, DBC** c) {
2596 if (!env_opened(env)) {
2597 return EINVAL;
2598 }
2599 return toku_db_cursor(env->i->persistent_environment, txn, c, 0);
2600}
2601
2602static int
2603env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) {
2604 if (!env_opened(env)) {
2605 return EINVAL;
2606 }
2607 return toku_db_cursor(env->i->directory, txn, c, 0);
2608}
2609
2610static DB *
2611env_get_db_for_directory(DB_ENV* env) {
2612 if (!env_opened(env)) {
2613 return NULL;
2614 }
2615 return env->i->directory;
2616}
2617
2618struct ltm_iterate_requests_callback_extra {
2619 ltm_iterate_requests_callback_extra(DB_ENV *e,
2620 iterate_requests_callback cb,
2621 void *ex) :
2622 env(e), callback(cb), extra(ex) {
2623 }
2624 DB_ENV *env;
2625 iterate_requests_callback callback;
2626 void *extra;
2627};
2628
2629static int
2630find_db_by_dict_id(DB *const &db, const DICTIONARY_ID &dict_id_find) {
2631 DICTIONARY_ID dict_id = db->i->dict_id;
2632 if (dict_id.dictid < dict_id_find.dictid) {
2633 return -1;
2634 } else if (dict_id.dictid > dict_id_find.dictid) {
2635 return 1;
2636 } else {
2637 return 0;
2638 }
2639}
2640
2641static DB *
2642locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) {
2643 DB *db;
2644 int r = env->i->open_dbs_by_dict_id->find_zero<DICTIONARY_ID, find_db_by_dict_id>(dict_id, &db, nullptr);
2645 return r == 0 ? db : nullptr;
2646}
2647
2648static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid,
2649 const DBT *left_key,
2650 const DBT *right_key,
2651 TXNID blocking_txnid,
2652 uint64_t start_time,
2653 void *extra) {
2654 ltm_iterate_requests_callback_extra *info =
2655 reinterpret_cast<ltm_iterate_requests_callback_extra *>(extra);
2656
2657 toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2658 int r = 0;
2659 DB *db = locked_get_db_by_dict_id(info->env, dict_id);
2660 if (db != nullptr) {
2661 r = info->callback(db, txnid, left_key, right_key,
2662 blocking_txnid, start_time, info->extra);
2663 }
2664 toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2665 return r;
2666}
2667
2668static int
2669env_iterate_pending_lock_requests(DB_ENV *env,
2670 iterate_requests_callback callback,
2671 void *extra) {
2672 if (!env_opened(env)) {
2673 return EINVAL;
2674 }
2675
2676 toku::locktree_manager *mgr = &env->i->ltm;
2677 ltm_iterate_requests_callback_extra e(env, callback, extra);
2678 return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e);
2679}
2680
2681// for the lifetime of this object:
2682// - open_dbs_rwlock must be read locked (or better)
2683// - txn_mutex must be held
2684struct iter_txn_row_locks_callback_extra {
2685 iter_txn_row_locks_callback_extra(DB_ENV *e, toku::omt<txn_lt_key_ranges> *m) :
2686 env(e), current_db(nullptr), which_lt(0), lt_map(m) {
2687 if (lt_map->size() > 0) {
2688 set_iterator_and_current_db();
2689 }
2690 }
2691
2692 void set_iterator_and_current_db() {
2693 txn_lt_key_ranges ranges;
2694 const int r = lt_map->fetch(which_lt, &ranges);
2695 invariant_zero(r);
2696 current_db = locked_get_db_by_dict_id(env, ranges.lt->get_dict_id());
2697 iter = toku::range_buffer::iterator(ranges.buffer);
2698 }
2699
2700 DB_ENV *env;
2701 DB *current_db;
2702 size_t which_lt;
2703 toku::omt<txn_lt_key_ranges> *lt_map;
2704 toku::range_buffer::iterator iter;
2705 toku::range_buffer::iterator::record rec;
2706};
2707
2708static int iter_txn_row_locks_callback(DB **db, DBT *left_key, DBT *right_key, void *extra) {
2709 iter_txn_row_locks_callback_extra *info =
2710 reinterpret_cast<iter_txn_row_locks_callback_extra *>(extra);
2711
2712 while (info->which_lt < info->lt_map->size()) {
2713 const bool more = info->iter.current(&info->rec);
2714 if (more) {
2715 *db = info->current_db;
2716 // The caller should interpret data/size == 0 to mean infinity.
2717 // Therefore, when we copyref pos/neg infinity into left/right_key,
2718 // the caller knows what we're talking about.
2719 toku_copyref_dbt(left_key, *info->rec.get_left_key());
2720 toku_copyref_dbt(right_key, *info->rec.get_right_key());
2721 info->iter.next();
2722 return 0;
2723 } else {
2724 info->which_lt++;
2725 if (info->which_lt < info->lt_map->size()) {
2726 info->set_iterator_and_current_db();
2727 }
2728 }
2729 }
2730 return DB_NOTFOUND;
2731}
2732
2733struct iter_txns_callback_extra {
2734 iter_txns_callback_extra(DB_ENV *e, iterate_transactions_callback cb, void *ex) :
2735 env(e), callback(cb), extra(ex) {
2736 }
2737 DB_ENV *env;
2738 iterate_transactions_callback callback;
2739 void *extra;
2740};
2741
2742static int iter_txns_callback(TOKUTXN txn, void *extra) {
2743 int r = 0;
2744 iter_txns_callback_extra *info =
2745 reinterpret_cast<iter_txns_callback_extra *>(extra);
2746 DB_TXN *dbtxn = toku_txn_get_container_db_txn(txn);
2747 invariant_notnull(dbtxn);
2748 struct __toku_db_txn_internal *db_txn_internal __attribute__((__unused__)) = db_txn_struct_i(dbtxn);
2749 TOKU_VALGRIND_HG_DISABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2750 if (db_txn_struct_i(dbtxn)->tokutxn == txn) { // make sure that the dbtxn is fully initialized
2751 toku_mutex_lock(&db_txn_struct_i(dbtxn)->txn_mutex);
2752 toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2753
2754 iter_txn_row_locks_callback_extra e(info->env, &db_txn_struct_i(dbtxn)->lt_map);
2755 r = info->callback(dbtxn, iter_txn_row_locks_callback, &e, info->extra);
2756
2757 toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2758 toku_mutex_unlock(&db_txn_struct_i(dbtxn)->txn_mutex);
2759 }
2760 TOKU_VALGRIND_HG_ENABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2761
2762 return r;
2763}
2764
2765static int
2766env_iterate_live_transactions(DB_ENV *env,
2767 iterate_transactions_callback callback,
2768 void *extra) {
2769 if (!env_opened(env)) {
2770 return EINVAL;
2771 }
2772
2773 TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
2774 iter_txns_callback_extra e(env, callback, extra);
2775 return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e);
2776}
2777
2778static void env_set_loader_memory_size(DB_ENV *env, uint64_t (*get_loader_memory_size_callback)(void)) {
2779 env->i->get_loader_memory_size_callback = get_loader_memory_size_callback;
2780}
2781
2782static uint64_t env_get_loader_memory_size(DB_ENV *env) {
2783 uint64_t memory_size = 0;
2784 if (env->i->get_loader_memory_size_callback)
2785 memory_size = env->i->get_loader_memory_size_callback();
2786 return memory_size;
2787}
2788
2789static void env_set_killed_callback(DB_ENV *env, uint64_t default_killed_time_msec, uint64_t (*get_killed_time_callback)(uint64_t default_killed_time_msec), int (*killed_callback)(void)) {
2790 env->i->default_killed_time_msec = default_killed_time_msec;
2791 env->i->get_killed_time_callback = get_killed_time_callback;
2792 env->i->killed_callback = killed_callback;
2793}
2794
2795static void env_kill_waiter(DB_ENV *env, void *extra) {
2796 env->i->ltm.kill_waiter(extra);
2797}
2798
2799static void env_do_backtrace(DB_ENV *env) {
2800 if (env->i->errcall) {
2801 db_env_do_backtrace_errfunc((toku_env_err_func) toku_env_err, (const void *) env);
2802 }
2803 if (env->i->errfile) {
2804 db_env_do_backtrace((FILE *) env->i->errfile);
2805 } else {
2806 db_env_do_backtrace(stderr);
2807 }
2808}
2809
2810static int
2811toku_env_create(DB_ENV ** envp, uint32_t flags) {
2812 int r = ENOSYS;
2813 DB_ENV* result = NULL;
2814
2815 if (flags!=0) { r = EINVAL; goto cleanup; }
2816 MALLOC(result);
2817 if (result == 0) { r = ENOMEM; goto cleanup; }
2818 memset(result, 0, sizeof *result);
2819
2820 // locked methods
2821 result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_env_err;
2822#define SENV(name) result->name = locked_env_ ## name
2823 SENV(dbremove);
2824 SENV(dbrename);
2825 SENV(dirtool_attach);
2826 SENV(dirtool_detach);
2827 SENV(dirtool_move);
2828 //SENV(set_noticecall);
2829#undef SENV
2830#define USENV(name) result->name = env_ ## name
2831 // methods with locking done internally
2832 USENV(put_multiple);
2833 USENV(del_multiple);
2834 USENV(update_multiple);
2835 // unlocked methods
2836 USENV(open);
2837 USENV(close);
2838 USENV(set_default_bt_compare);
2839 USENV(set_update);
2840 USENV(set_generate_row_callback_for_put);
2841 USENV(set_generate_row_callback_for_del);
2842 USENV(set_lg_bsize);
2843 USENV(set_lg_dir);
2844 USENV(set_lg_max);
2845 USENV(get_lg_max);
2846 USENV(set_lk_max_memory);
2847 USENV(get_lk_max_memory);
2848 USENV(get_iname);
2849 USENV(set_errcall);
2850 USENV(set_errfile);
2851 USENV(set_errpfx);
2852 USENV(set_data_dir);
2853 USENV(checkpointing_set_period);
2854 USENV(checkpointing_get_period);
2855 USENV(cleaner_set_period);
2856 USENV(cleaner_get_period);
2857 USENV(cleaner_set_iterations);
2858 USENV(cleaner_get_iterations);
2859 USENV(evictor_set_enable_partial_eviction);
2860 USENV(evictor_get_enable_partial_eviction);
2861 USENV(set_cachesize);
2862 USENV(set_client_pool_threads);
2863 USENV(set_cachetable_pool_threads);
2864 USENV(set_checkpoint_pool_threads);
2865#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
2866 USENV(get_cachesize);
2867#endif
2868#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
2869 USENV(set_lk_max);
2870#endif
2871 USENV(set_lk_detect);
2872 USENV(set_flags);
2873 USENV(set_tmp_dir);
2874 USENV(set_verbose);
2875 USENV(txn_recover);
2876 USENV(txn_xa_recover);
2877 USENV(get_txn_from_xid);
2878 USENV(txn_stat);
2879 USENV(get_lock_timeout);
2880 USENV(set_lock_timeout);
2881 USENV(set_lock_timeout_callback);
2882 USENV(set_lock_wait_callback);
2883 USENV(set_redzone);
2884 USENV(log_flush);
2885 USENV(log_archive);
2886 USENV(create_loader);
2887 USENV(get_cursor_for_persistent_environment);
2888 USENV(get_cursor_for_directory);
2889 USENV(get_db_for_directory);
2890 USENV(iterate_pending_lock_requests);
2891 USENV(iterate_live_transactions);
2892 USENV(change_fsync_log_period);
2893 USENV(set_loader_memory_size);
2894 USENV(get_loader_memory_size);
2895 USENV(set_killed_callback);
2896 USENV(do_backtrace);
2897 USENV(set_check_thp);
2898 USENV(get_check_thp);
2899 USENV(set_dir_per_db);
2900 USENV(get_dir_per_db);
2901 USENV(get_data_dir);
2902 USENV(kill_waiter);
2903#undef USENV
2904
2905 // unlocked methods
2906 result->create_indexer = toku_indexer_create_indexer;
2907 result->txn_checkpoint = toku_env_txn_checkpoint;
2908 result->checkpointing_postpone = env_checkpointing_postpone;
2909 result->checkpointing_resume = env_checkpointing_resume;
2910 result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation;
2911 result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation;
2912 result->get_engine_status_num_rows = env_get_engine_status_num_rows;
2913 result->get_engine_status = env_get_engine_status;
2914 result->get_engine_status_text = env_get_engine_status_text;
2915 result->crash = env_crash; // handlerton's call to fractal tree layer on failed assert
2916 result->txn_begin = toku_txn_begin;
2917
2918 MALLOC(result->i);
2919 if (result->i == 0) { r = ENOMEM; goto cleanup; }
2920 memset(result->i, 0, sizeof *result->i);
2921 result->i->envdir_lockfd = -1;
2922 result->i->datadir_lockfd = -1;
2923 result->i->logdir_lockfd = -1;
2924 result->i->tmpdir_lockfd = -1;
2925 env_fs_init(result);
2926 env_fsync_log_init(result);
2927
2928 result->i->check_thp = true;
2929
2930 result->i->bt_compare = toku_builtin_compare_fun;
2931
2932 r = toku_logger_create(&result->i->logger);
2933 invariant_zero(r);
2934 invariant_notnull(result->i->logger);
2935
2936 // Create the locktree manager, passing in the create/destroy/escalate callbacks.
2937 // The extra parameter for escalation is simply a pointer to this environment.
2938 // The escalate callback will need it to translate txnids to DB_TXNs
2939 result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);
2940
2941 XMALLOC(result->i->open_dbs_by_dname);
2942 result->i->open_dbs_by_dname->create();
2943 XMALLOC(result->i->open_dbs_by_dict_id);
2944 result->i->open_dbs_by_dict_id->create();
2945 toku_pthread_rwlock_init(
2946 *result_i_open_dbs_rwlock_key, &result->i->open_dbs_rwlock, nullptr);
2947
2948 *envp = result;
2949 r = 0;
2950 toku_sync_fetch_and_add(&tokuft_num_envs, 1);
2951cleanup:
2952 if (r!=0) {
2953 if (result) {
2954 toku_free(result->i);
2955 toku_free(result);
2956 }
2957 }
2958 return r;
2959}
2960
2961int
2962DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) {
2963 int r = toku_env_create(envp, flags);
2964 return r;
2965}
2966
2967// return 0 if v and dbv refer to same db (including same dname)
2968// return <0 if v is earlier in omt than dbv
2969// return >0 if v is later in omt than dbv
2970static int
2971find_db_by_db_dname(DB *const &db, DB *const &dbfind) {
2972 int cmp;
2973 const char *dname = db->i->dname;
2974 const char *dnamefind = dbfind->i->dname;
2975 cmp = strcmp(dname, dnamefind);
2976 if (cmp != 0) return cmp;
2977 if (db < dbfind) return -1;
2978 if (db > dbfind) return 1;
2979 return 0;
2980}
2981
2982static int
2983find_db_by_db_dict_id(DB *const &db, DB *const &dbfind) {
2984 DICTIONARY_ID dict_id = db->i->dict_id;
2985 DICTIONARY_ID dict_id_find = dbfind->i->dict_id;
2986 if (dict_id.dictid < dict_id_find.dictid) {
2987 return -1;
2988 } else if (dict_id.dictid > dict_id_find.dictid) {
2989 return 1;
2990 } else if (db < dbfind) {
2991 return -1;
2992 } else if (db > dbfind) {
2993 return 1;
2994 } else {
2995 return 0;
2996 }
2997}
2998
2999// Tell env that there is a new db handle (with non-unique dname in db->i-dname)
3000void
3001env_note_db_opened(DB_ENV *env, DB *db) {
3002 toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3003 assert(db->i->dname); // internal (non-user) dictionary has no dname
3004
3005 int r;
3006 uint32_t idx;
3007
3008 r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3009 assert(r == DB_NOTFOUND);
3010 r = env->i->open_dbs_by_dname->insert_at(db, idx);
3011 assert_zero(r);
3012 r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3013 assert(r == DB_NOTFOUND);
3014 r = env->i->open_dbs_by_dict_id->insert_at(db, idx);
3015 assert_zero(r);
3016
3017 STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3018 STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++;
3019 if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) {
3020 STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS);
3021 }
3022 toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3023}
3024
3025// Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals.
3026void
3027env_note_db_closed(DB_ENV *env, DB *db) {
3028 toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3029 assert(db->i->dname); // internal (non-user) dictionary has no dname
3030 assert(env->i->open_dbs_by_dname->size() > 0);
3031 assert(env->i->open_dbs_by_dict_id->size() > 0);
3032
3033 int r;
3034 uint32_t idx;
3035
3036 r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3037 assert_zero(r);
3038 r = env->i->open_dbs_by_dname->delete_at(idx);
3039 assert_zero(r);
3040 r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3041 assert_zero(r);
3042 r = env->i->open_dbs_by_dict_id->delete_at(idx);
3043 assert_zero(r);
3044
3045 STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
3046 STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3047 toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3048}
3049
3050static int
3051find_open_db_by_dname(DB *const &db, const char *const &dnamefind) {
3052 return strcmp(db->i->dname, dnamefind);
3053}
3054
3055// return true if there is any db open with the given dname
3056static bool
3057env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
3058 DB *db;
3059 toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
3060 int r = env->i->open_dbs_by_dname->find_zero<const char *, find_open_db_by_dname>(dname, &db, nullptr);
3061 if (r == 0) {
3062 invariant(strcmp(dname, db->i->dname) == 0);
3063 } else {
3064 invariant(r == DB_NOTFOUND);
3065 }
3066 toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
3067 return r == 0 ? true : false;
3068}
3069
3070//We do not (yet?) support deleting subdbs by deleting the enclosing 'fname'
3071static int
3072env_dbremove_subdb(DB_ENV * env, DB_TXN * txn, const char *fname, const char *dbname, int32_t flags) {
3073 int r;
3074 if (!fname || !dbname) r = EINVAL;
3075 else {
3076 char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3077 int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3078 assert(bytes==(int)sizeof(subdb_full_name)-1);
3079 const char *null_subdbname = NULL;
3080 r = env_dbremove(env, txn, subdb_full_name, null_subdbname, flags);
3081 }
3082 return r;
3083}
3084
3085// see if we can acquire a table lock for the given dname.
3086// requires: write lock on dname in the directory. dictionary
3087// open, close, and begin checkpoint cannot occur.
3088// returns: zero if we could open, lock, and close a dictionary
3089// with the given dname, errno otherwise.
3090static int
3091can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env) {
3092 int r;
3093 DB *db;
3094
3095 r = toku_db_create(&db, env, 0);
3096 assert_zero(r);
3097 r = toku_db_open_iname(db, txn, iname_in_env, 0, 0);
3098 if(r) {
3099 if (r == ENAMETOOLONG)
3100 toku_ydb_do_error(env, r, "File name too long!\n");
3101 goto exit;
3102 }
3103 r = toku_db_pre_acquire_table_lock(db, txn);
3104 if (r) {
3105 r = DB_LOCK_NOTGRANTED;
3106 }
3107exit:
3108 if(db) {
3109 int r2 = toku_db_close(db);
3110 assert_zero(r2);
3111 }
3112 return r;
3113}
3114
// Remove the dictionary named fname (or the sub-database fname/dbname) from
// the environment.
//
//   env:    environment; must be open.
//   txn:    transaction used for the directory operations. When non-NULL the
//           ft is marked to be unlinked on commit; when NULL it is unlinked
//           immediately.
//   fname:  the dname to remove when dbname is NULL.
//   dbname: if non-NULL, the call is forwarded to env_dbremove_subdb().
//   flags:  must be 0.
//
// Returns 0 on success; EINVAL if the environment is not open, flags is
// non-zero, or a handle is open on the dname; ENOENT if the dname is not in
// the directory; DB_LOCK_NOTGRANTED if the table lock cannot be acquired;
// otherwise the error of the failing directory or open step.
// HANDLE_PANICKED_ENV / HANDLE_READ_ONLY_TXN are macros defined elsewhere and
// presumably return early on their own conditions -- TODO confirm.
static int
env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
    int r;
    HANDLE_PANICKED_ENV(env);
    if (!env_opened(env) || flags != 0) {
        return EINVAL;
    }
    HANDLE_READ_ONLY_TXN(txn);
    if (dbname != NULL) {
        // env_dbremove_subdb() converts (fname, dbname) to dname
        return env_dbremove_subdb(env, txn, fname, dbname, flags);
    }

    const char * dname = fname;
    assert(dbname == NULL);

    // We check for an open db here as a "fast path" to error.
    // We'll need to check again below to be sure.
    if (env_is_db_with_dname_open(env, dname)) {
        return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
    }

    DBT dname_dbt;
    DBT iname_dbt;
    // The directory key is the dname including its terminating NUL.
    toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
    toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);

    // get iname
    r = toku_db_get(env->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
    // iname and db are declared before the first `goto exit` so the cleanup
    // code at the label can always inspect them.
    char *iname = (char *) iname_dbt.data;
    DB *db = NULL;
    if (r != 0) {
        if (r == DB_NOTFOUND) {
            r = ENOENT;
        }
        goto exit;
    }
    // remove (dname,iname) from directory
    r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
    if (r != 0) {
        goto exit;
    }
    // Open an internal handle on the iname; it is used for locking/unlinking
    // below and closed at `exit`.
    r = toku_db_create(&db, env, 0);
    lazy_assert_zero(r);
    r = toku_db_open_iname(db, txn, iname, 0, 0);
    // With a txn, a failed open ends the operation: ENOENT is mapped to
    // success (r = 0), any other error is reported and returned.
    // NOTE(review): when txn is NULL a failed open is not checked here and
    // control reaches toku_ft_unlink() below -- confirm this is intended.
    if (txn && r) {
        if (r == EMFILE || r == ENFILE)
            r = toku_ydb_do_error(env, r, "toku dbremove failed because open file limit reached\n");
        else if (r != ENOENT)
            r = toku_ydb_do_error(env, r, "toku dbremove failed\n");
        else
            r = 0;
        goto exit;
    }
    if (txn) {
        // Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions)
        if (env_is_db_with_dname_open(env, dname)) {
            r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
            goto exit;
        }
        // we know a live db handle does not exist.
        //
        // use the internally opened db to try and get a table lock
        //
        // if we can't get it, then some txn needs the ft and we
        // should return lock not granted.
        //
        // otherwise, we're okay in marking this ft as remove on
        // commit. no new handles can open for this dictionary
        // because the txn has directory write locks on the dname
        r = toku_db_pre_acquire_table_lock(db, txn);
        if (r != 0) {
            r = DB_LOCK_NOTGRANTED;
            goto exit;
        }
        // The ft will be unlinked when the txn commits
        toku_ft_unlink_on_commit(db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
    }
    else {
        // unlink the ft without a txn
        toku_ft_unlink(db->i->ft_handle);
    }

exit:
    // Common cleanup: close the internal handle (if it was created) and free
    // the iname buffer filled in by the directory lookup.
    if (db) {
        int ret = toku_db_close(db);
        assert(ret == 0);
    }
    if (iname) {
        toku_free(iname);
    }
    return r;
}
3208
3209static int
3210env_dbrename_subdb(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3211 int r;
3212 if (!fname || !dbname || !newname) r = EINVAL;
3213 else {
3214 char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3215 {
3216 int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3217 assert(bytes==(int)sizeof(subdb_full_name)-1);
3218 }
3219 char new_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3220 {
3221 int bytes = snprintf(new_full_name, sizeof(new_full_name), "%s/%s", fname, dbname);
3222 assert(bytes==(int)sizeof(new_full_name)-1);
3223 }
3224 const char *null_subdbname = NULL;
3225 r = env_dbrename(env, txn, subdb_full_name, null_subdbname, new_full_name, flags);
3226 }
3227 return r;
3228}
3229
3230static int
3231env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3232 int r;
3233 HANDLE_PANICKED_ENV(env);
3234 if (!env_opened(env) || flags != 0) {
3235 return EINVAL;
3236 }
3237 HANDLE_READ_ONLY_TXN(txn);
3238 if (dbname != NULL) {
3239 // env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname
3240 return env_dbrename_subdb(env, txn, fname, dbname, newname, flags);
3241 }
3242
3243 const char * dname = fname;
3244 assert(dbname == NULL);
3245
3246 // We check for open dnames for the old and new name as a "fast path" to error.
3247 // We will need to check these again later.
3248 if (env_is_db_with_dname_open(env, dname)) {
3249 return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3250 }
3251 if (env_is_db_with_dname_open(env, newname)) {
3252 return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3253 }
3254
3255 DBT old_dname_dbt;
3256 DBT new_dname_dbt;
3257 DBT iname_dbt;
3258 toku_fill_dbt(&old_dname_dbt, dname, strlen(dname)+1);
3259 toku_fill_dbt(&new_dname_dbt, newname, strlen(newname)+1);
3260 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3261
3262 // get iname
3263 r = toku_db_get(env->i->directory, txn, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
3264 char *iname = (char *) iname_dbt.data;
3265 if (r == DB_NOTFOUND) {
3266 r = ENOENT;
3267 } else if (r == 0) {
3268 // verify that newname does not already exist
3269 r = db_getf_set(env->i->directory, txn, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL);
3270 if (r == 0) {
3271 r = EEXIST;
3272 }
3273 else if (r == DB_NOTFOUND) {
3274 DBT new_iname_dbt;
3275 // Do not rename ft file if 'dir_per_db' option is not set
3276 auto new_iname =
3277 env->get_dir_per_db(env)
3278 ? generate_iname_for_rename_or_open(
3279 env, txn, newname, false)
3280 : std::unique_ptr<char[], decltype(&toku_free)>(
3281 toku_strdup(iname), &toku_free);
3282 toku_fill_dbt(
3283 &new_iname_dbt, new_iname.get(), strlen(new_iname.get()) + 1);
3284
3285 // remove old (dname,iname) and insert (newname,iname) in directory
3286 r = toku_db_del(env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
3287 if (r != 0) { goto exit; }
3288
3289 // Do not rename ft file if 'dir_per_db' option is not set
3290 if (env->get_dir_per_db(env))
3291 r = toku_ft_rename_iname(txn,
3292 env->get_data_dir(env),
3293 iname,
3294 new_iname.get(),
3295 env->i->cachetable);
3296
3297 r = toku_db_put(env->i->directory,
3298 txn,
3299 &new_dname_dbt,
3300 &new_iname_dbt,
3301 0,
3302 true);
3303 if (r != 0) { goto exit; }
3304
3305 //Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions)
3306 if (env_is_db_with_dname_open(env, dname)) {
3307 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3308 goto exit;
3309 }
3310 if (env_is_db_with_dname_open(env, newname)) {
3311 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3312 goto exit;
3313 }
3314
3315 // we know a live db handle does not exist.
3316 //
3317 // use the internally opened db to try and get a table lock
3318 //
3319 // if we can't get it, then some txn needs the ft and we
3320 // should return lock not granted.
3321 //
3322 // otherwise, we're okay in marking this ft as remove on
3323 // commit. no new handles can open for this dictionary
3324 // because the txn has directory write locks on the dname
3325 if (txn) {
3326 r = can_acquire_table_lock(env, txn, new_iname.get());
3327 }
3328 // We don't do anything at the ft or cachetable layer for rename.
3329 // We just update entries in the environment's directory.
3330 }
3331 }
3332
3333exit:
3334 if (iname) {
3335 toku_free(iname);
3336 }
3337 return r;
3338}
3339
3340int
3341DB_CREATE_FUN (DB ** db, DB_ENV * env, uint32_t flags) {
3342 int r = toku_db_create(db, env, flags);
3343 return r;
3344}
3345
/* TODO: a reentrant db_strerror_r() is needed for use from multiple threads. */
3347
3348const char *
3349db_strerror(int error) {
3350 char *errorstr;
3351 if (error >= 0) {
3352 errorstr = strerror(error);
3353 if (errorstr)
3354 return errorstr;
3355 }
3356
3357 switch (error) {
3358 case DB_BADFORMAT:
3359 return "Database Bad Format (probably a corrupted database)";
3360 case DB_NOTFOUND:
3361 return "Not found";
3362 case TOKUDB_OUT_OF_LOCKS:
3363 return "Out of locks";
3364 case TOKUDB_DICTIONARY_TOO_OLD:
3365 return "Dictionary too old for this version of PerconaFT";
3366 case TOKUDB_DICTIONARY_TOO_NEW:
3367 return "Dictionary too new for this version of PerconaFT";
3368 case TOKUDB_CANCELED:
3369 return "User cancelled operation";
3370 case TOKUDB_NO_DATA:
3371 return "Ran out of data (not EOF)";
3372 case TOKUDB_HUGE_PAGES_ENABLED:
3373 return "Transparent huge pages are enabled but PerconaFT's memory allocator will oversubscribe main memory with transparent huge pages. This check can be disabled by setting the environment variable TOKU_HUGE_PAGES_OK.";
3374 }
3375
3376 static char unknown_result[100]; // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of null-terminated string.
3377 errorstr = unknown_result;
3378 snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error);
3379 return errorstr;
3380}
3381
3382const char *
3383db_version(int *major, int *minor, int *patch) {
3384 if (major)
3385 *major = DB_VERSION_MAJOR;
3386 if (minor)
3387 *minor = DB_VERSION_MINOR;
3388 if (patch)
3389 *patch = DB_VERSION_PATCH;
3390 return toku_product_name_strings.db_version;
3391}
3392
3393// HACK: To ensure toku_pthread_yield gets included in the .so
3394// non-static would require a prototype in a header
3395// static (since unused) would give a warning
3396// static + unused would not actually help toku_pthread_yield get in the .so
3397// static + used avoids all the warnings and makes sure toku_pthread_yield is in the .so
3398static void __attribute__((__used__))
3399include_toku_pthread_yield (void) {
3400 toku_pthread_yield();
3401}
3402
3403// For test purposes only, translate dname to iname
3404// YDB lock is NOT held when this function is called,
3405// as it is called by user
3406static int
3407env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
3408 DB *directory = env->i->directory;
3409 int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_SERIALIZABLE|DB_PRELOCKED); // allocates memory for iname
3410 return r;
3411}
3412
3413// TODO 2216: Patch out this (dangerous) function when loader is working and
3414// we don't need to test the low-level redirect anymore.
3415// for use by test programs only, just a wrapper around ft call:
3416int
3417toku_test_db_redirect_dictionary(DB * db, const char * dname_of_new_file, DB_TXN *dbtxn) {
3418 int r;
3419 DBT dname_dbt;
3420 DBT iname_dbt;
3421 char * new_iname_in_env;
3422
3423 FT_HANDLE ft_handle = db->i->ft_handle;
3424 TOKUTXN tokutxn = db_txn_struct_i(dbtxn)->tokutxn;
3425
3426 toku_fill_dbt(&dname_dbt, dname_of_new_file, strlen(dname_of_new_file)+1);
3427 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3428 r = toku_db_get(db->dbenv->i->directory, dbtxn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
3429 assert_zero(r);
3430 new_iname_in_env = (char *) iname_dbt.data;
3431
3432 toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect.
3433 r = toku_dictionary_redirect(new_iname_in_env, ft_handle, tokutxn);
3434 toku_multi_operation_client_unlock();
3435
3436 toku_free(new_iname_in_env);
3437 return r;
3438}
3439
3440//Tets only function
3441uint64_t
3442toku_test_get_latest_lsn(DB_ENV *env) {
3443 LSN rval = ZERO_LSN;
3444 if (env && env->i->logger) {
3445 rval = toku_logger_last_lsn(env->i->logger);
3446 }
3447 return rval.lsn;
3448}
3449
3450void toku_set_test_txn_sync_callback(void (* cb) (pthread_t, void *), void * extra) {
3451 set_test_txn_sync_callback(cb, extra);
3452}
3453
3454int
3455toku_test_get_checkpointing_user_data_status (void) {
3456 return toku_cachetable_get_checkpointing_user_data_status();
3457}
3458
3459#undef STATUS_VALUE
3460#undef PERSISTENT_UPGRADE_STATUS_VALUE
3461
3462#include <toku_race_tools.h>
3463void __attribute__((constructor)) toku_ydb_helgrind_ignore(void);
3464void
3465toku_ydb_helgrind_ignore(void) {
3466 TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_layer_status, sizeof ydb_layer_status);
3467}
3468