1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <my_global.h>
40#include <ctype.h>
41
42#include <db.h>
43#include <locktree/locktree.h>
44#include <ft/ft.h>
45#include <ft/ft-flusher.h>
46#include <ft/cachetable/checkpoint.h>
47
48#include "ydb_cursor.h"
49#include "ydb_row_lock.h"
50#include "ydb_db.h"
51#include "ydb_write.h"
52#include "ydb-internal.h"
53#include "ydb_load.h"
54#include "indexer.h"
55#include <portability/toku_atomic.h>
56#include <util/status.h>
57#include <ft/le-cursor.h>
58
59static YDB_DB_LAYER_STATUS_S ydb_db_layer_status;
60#ifdef STATUS_VALUE
61#undef STATUS_VALUE
62#endif
63#define STATUS_VALUE(x) ydb_db_layer_status.status[x].value.num
64
65#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_db_layer_status, k, c, t, l, inc)
66
67static void
68ydb_db_layer_status_init (void) {
69 // Note, this function initializes the keyname, type, and legend fields.
70 // Value fields are initialized to zero by compiler.
71
72 STATUS_INIT(YDB_LAYER_DIRECTORY_WRITE_LOCKS, nullptr, UINT64, "directory write locks", TOKU_ENGINE_STATUS);
73 STATUS_INIT(YDB_LAYER_DIRECTORY_WRITE_LOCKS_FAIL, nullptr, UINT64, "directory write locks fail", TOKU_ENGINE_STATUS);
74 STATUS_INIT(YDB_LAYER_LOGSUPPRESS, nullptr, UINT64, "log suppress", TOKU_ENGINE_STATUS);
75 STATUS_INIT(YDB_LAYER_LOGSUPPRESS_FAIL, nullptr, UINT64, "log suppress fail", TOKU_ENGINE_STATUS);
76 ydb_db_layer_status.initialized = true;
77}
78#undef STATUS_INIT
79
80void
81ydb_db_layer_get_status(YDB_DB_LAYER_STATUS statp) {
82 if (!ydb_db_layer_status.initialized)
83 ydb_db_layer_status_init();
84 *statp = ydb_db_layer_status;
85}
86
87void create_iname_hint(DB_ENV *env, const char *dname, char *hint) {
88 //Requires: size of hint array must be > strlen(dname)
89 //Copy alphanumeric characters only.
90 //Replace strings of non-alphanumeric characters with a single underscore.
91 if (env->get_dir_per_db(env) && !toku_os_is_absolute_name(dname)) {
92 assert(dname);
93 if (*dname == '.')
94 ++dname;
95 if (*dname == '/')
96 ++dname;
97 bool underscored = false;
98 bool dbdir_is_parsed = false;
99 // Do not change the first '/' because this is
100 // delimiter which splits name into database dir
101 // and table dir.
102 while (*dname) {
103 if (isalnum(*dname) || (*dname == '/' && !dbdir_is_parsed)) {
104 char c = *dname++;
105 *hint++ = c;
106 if (c == '/')
107 dbdir_is_parsed = true;
108 underscored = false;
109 } else if (!dbdir_is_parsed) {
110 char c = *dname++;
111 *hint++ = c;
112 } else {
113 if (!underscored)
114 *hint++ = '_';
115 dname++;
116 underscored = true;
117 }
118 }
119 *hint = '\0';
120 } else {
121 bool underscored = false;
122 while (*dname) {
123 if (isalnum(*dname)) {
124 char c = *dname++;
125 *hint++ = c;
126 underscored = false;
127 }
128 else {
129 if (!underscored)
130 *hint++ = '_';
131 dname++;
132 underscored = true;
133 }
134 }
135 *hint = '\0';
136 }
137}
138
139// n < 0 means to ignore mark and ignore n
140// n >= 0 means to include mark ("_B_" or "_P_") with hex value of n in iname
141// (intended for use by loader, which will create many inames using one txnid).
142char *create_iname(DB_ENV *env,
143 uint64_t id1,
144 uint64_t id2,
145 char *hint,
146 const char *mark,
147 int n) {
148 int bytes;
149 char inamebase[strlen(hint) +
150 8 + // hex file format version
151 24 + // hex id (normally the txnid's parent and child)
152 8 + // hex value of n if non-neg
153 sizeof("_B___.") + // extra pieces
154 strlen(toku_product_name)];
155 if (n < 0)
156 bytes = snprintf(inamebase, sizeof(inamebase),
157 "%s_%" PRIx64 "_%" PRIx64 "_%" PRIx32 ".%s",
158 hint, id1, id2, FT_LAYOUT_VERSION, toku_product_name);
159 else {
160 invariant(strlen(mark) == 1);
161 bytes = snprintf(inamebase, sizeof(inamebase),
162 "%s_%" PRIx64 "_%" PRIx64 "_%" PRIx32 "_%s_%" PRIx32 ".%s",
163 hint, id1, id2, FT_LAYOUT_VERSION, mark, n, toku_product_name);
164 }
165 assert(bytes>0);
166 assert(bytes<=(int)sizeof(inamebase)-1);
167 char *rval;
168 if (env->i->data_dir)
169 rval = toku_construct_full_name(2, env->i->data_dir, inamebase);
170 else
171 rval = toku_construct_full_name(1, inamebase);
172 assert(rval);
173 return rval;
174}
175
176static uint64_t nontransactional_open_id = 0;
177
178std::unique_ptr<char[], decltype(&toku_free)> generate_iname_for_rename_or_open(
179 DB_ENV *env,
180 DB_TXN *txn,
181 const char *dname,
182 bool is_open) {
183 std::unique_ptr<char[], decltype(&toku_free)> result(nullptr, &toku_free);
184 char hint[strlen(dname) + 1];
185 uint64_t id1 = 0;
186 uint64_t id2 = 0;
187
188 if (txn) {
189 id1 = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn).parent_id64;
190 id2 = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn).child_id64;
191 } else if (is_open)
192 id1 = toku_sync_fetch_and_add(&nontransactional_open_id, 1);
193
194 create_iname_hint(env, dname, hint);
195
196 result.reset(create_iname(env, id1, id2, hint, NULL, -1));
197
198 return result;
199}
200
201static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode);
202
203// Effect: Do the work required of DB->close().
204// requires: the multi_operation client lock is held.
205int
206toku_db_close(DB * db) {
207 int r = 0;
208 if (db_opened(db) && db->i->dname) {
209 // internal (non-user) dictionary has no dname
210 env_note_db_closed(db->dbenv, db); // tell env that this db is no longer in use by the user of this api (user-closed, may still be in use by fractal tree internals)
211 }
212 // close the ft handle, and possibly close the locktree
213 toku_ft_handle_close(db->i->ft_handle);
214 if (db->i->lt) {
215 db->dbenv->i->ltm.release_lt(db->i->lt);
216 }
217 toku_sdbt_cleanup(&db->i->skey);
218 toku_sdbt_cleanup(&db->i->sval);
219 if (db->i->dname) {
220 toku_free(db->i->dname);
221 }
222 toku_free(db->i);
223 toku_free(db);
224 return r;
225}
226
227///////////
228//db_getf_XXX is equivalent to c_getf_XXX, without a persistent cursor
229
230int
231db_getf_set(DB *db, DB_TXN *txn, uint32_t flags, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
232 HANDLE_PANICKED_DB(db);
233 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
234 DBC c;
235 uint32_t create_flags = flags & (DB_ISOLATION_FLAGS | DB_RMW);
236 flags &= ~DB_ISOLATION_FLAGS;
237 int r = toku_db_cursor_internal(db, txn, &c, create_flags | DBC_DISABLE_PREFETCHING, 1);
238 if (r==0) {
239 r = toku_c_getf_set(&c, flags, key, f, extra);
240 int r2 = toku_c_close_internal(&c);
241 if (r==0) r = r2;
242 }
243 return r;
244}
245
246static inline int
247db_thread_need_flags(DBT *dbt) {
248 return (dbt->flags & (DB_DBT_MALLOC+DB_DBT_REALLOC+DB_DBT_USERMEM)) == 0;
249}
250
251int
252toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, uint32_t flags) {
253 HANDLE_PANICKED_DB(db);
254 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
255 int r;
256 uint32_t iso_flags = flags & DB_ISOLATION_FLAGS;
257
258 if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data))
259 return EINVAL;
260
261 uint32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
262 flags &= ~lock_flags;
263 flags &= ~DB_ISOLATION_FLAGS;
264 // And DB_GET_BOTH is no longer supported. #2862.
265 if (flags != 0) return EINVAL;
266
267 DBC dbc;
268 r = toku_db_cursor_internal(db, txn, &dbc, iso_flags | DBC_DISABLE_PREFETCHING, 1);
269 if (r!=0) return r;
270 uint32_t c_get_flags = DB_SET;
271 r = toku_c_get(&dbc, key, data, c_get_flags | lock_flags);
272 int r2 = toku_c_close_internal(&dbc);
273 return r ? r : r2;
274}
275
276static int
277db_open_subdb(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
278 int r;
279 if (!fname || !dbname) r = EINVAL;
280 else {
281 char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
282 int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
283 assert(bytes==(int)sizeof(subdb_full_name)-1);
284 const char *null_subdbname = NULL;
285 r = toku_db_open(db, txn, subdb_full_name, null_subdbname, dbtype, flags, mode);
286 }
287 return r;
288}
289
290// inames are created here.
291// algorithm:
292// begin txn
293// convert dname to iname (possibly creating new iname)
294// open file (toku_ft_handle_open() will handle logging)
295// close txn
296// if created a new iname, take full range lock
297// Requires: no checkpoint may take place during this function, which is enforced by holding the multi_operation_client_lock.
298static int
299toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
300 HANDLE_PANICKED_DB(db);
301 HANDLE_READ_ONLY_TXN(txn);
302 if (dbname != NULL) {
303 return db_open_subdb(db, txn, fname, dbname, dbtype, flags, mode);
304 }
305
306 // at this point fname is the dname
307 //This code ONLY supports single-db files.
308 assert(dbname == NULL);
309 const char * dname = fname; // db_open_subdb() converts (fname, dbname) to dname
310
311 ////////////////////////////// do some level of parameter checking.
312 uint32_t unused_flags = flags;
313 int r;
314 if (dbtype!=DB_BTREE && dbtype!=DB_UNKNOWN) return EINVAL;
315 int is_db_excl = flags & DB_EXCL; unused_flags&=~DB_EXCL;
316 int is_db_create = flags & DB_CREATE; unused_flags&=~DB_CREATE;
317 int is_db_hot_index = flags & DB_IS_HOT_INDEX; unused_flags&=~DB_IS_HOT_INDEX;
318
319 //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
320 unused_flags&=~DB_READ_UNCOMMITTED;
321 unused_flags&=~DB_READ_COMMITTED;
322 unused_flags&=~DB_SERIALIZABLE;
323
324 // DB_THREAD is implicitly supported and DB_BLACKHOLE is supported at the ft-layer
325 unused_flags &= ~DB_THREAD;
326 unused_flags &= ~DB_BLACKHOLE;
327
328 // check for unknown or conflicting flags
329 if (unused_flags) return EINVAL; // unknown flags
330 if (is_db_excl && !is_db_create) return EINVAL;
331 if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL;
332
333 if (db_opened(db)) {
334 // it was already open
335 return EINVAL;
336 }
337 //////////////////////////////
338
339 // convert dname to iname
340 // - look up dname, get iname
341 // - if dname does not exist, create iname and make entry in directory
342 DBT dname_dbt; // holds dname
343 DBT iname_dbt; // holds iname_in_env
344 toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
345 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
346 r = toku_db_get(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
347 std::unique_ptr<char[], decltype(&toku_free)> iname(
348 static_cast<char *>(iname_dbt.data), &toku_free);
349 if (r == DB_NOTFOUND && !is_db_create) {
350 r = ENOENT;
351 } else if (r==0 && is_db_excl) {
352 r = EEXIST;
353 } else if (r == DB_NOTFOUND) {
354 iname = generate_iname_for_rename_or_open(db->dbenv, txn, dname, true);
355 toku_fill_dbt(&iname_dbt, iname.get(), strlen(iname.get()) + 1);
356 //
357 // put_flags will be 0 for performance only, avoid unnecessary query
358 // if we are creating a hot index, per #3166, we do not want the write lock in directory grabbed.
359 // directory read lock is grabbed in toku_db_get above
360 //
361 uint32_t put_flags = 0 | ((is_db_hot_index) ? DB_PRELOCKED_WRITE : 0);
362 r = toku_db_put(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, put_flags, true);
363 }
364
365 // we now have an iname
366 if (r == 0) {
367 r = toku_db_open_iname(db, txn, iname.get(), flags, mode);
368 if (r == 0) {
369 db->i->dname = toku_xstrdup(dname);
370 env_note_db_opened(db->dbenv, db); // tell env that a new db handle is open (using dname)
371 }
372 }
373
374 return r;
375}
376
377// set the descriptor and cmp_descriptor to the
378// descriptors from the given ft, updating the
379// locktree's descriptor pointer if necessary
380static void
381db_set_descriptors(DB *db, FT_HANDLE ft_handle) {
382 const toku::comparator &cmp = toku_ft_get_comparator(ft_handle);
383 db->descriptor = toku_ft_get_descriptor(ft_handle);
384 db->cmp_descriptor = toku_ft_get_cmp_descriptor(ft_handle);
385 invariant(db->cmp_descriptor == cmp.get_descriptor());
386 if (db->i->lt) {
387 db->i->lt->set_comparator(cmp);
388 }
389}
390
391// callback that sets the descriptors when
392// a dictionary is redirected at the ft layer
393static void
394db_on_redirect_callback(FT_HANDLE ft_handle, void* extra) {
395 DB *db = (DB *) extra;
396 db_set_descriptors(db, ft_handle);
397}
398
399// when a locktree is created, clone a ft handle and store it
400// as userdata so we can close it later.
401int toku_db_lt_on_create_callback(toku::locktree *lt, void *extra) {
402 int r;
403 struct lt_on_create_callback_extra *info = (struct lt_on_create_callback_extra *) extra;
404 TOKUTXN ttxn = info->txn ? db_txn_struct_i(info->txn)->tokutxn : NULL;
405 FT_HANDLE ft_handle = info->ft_handle;
406
407 FT_HANDLE cloned_ft_handle;
408 r = toku_ft_handle_clone(&cloned_ft_handle, ft_handle, ttxn);
409 if (r == 0) {
410 assert(lt->get_userdata() == NULL);
411 lt->set_userdata(cloned_ft_handle);
412 }
413 return r;
414}
415
416// when a locktree is about to be destroyed,
417// close the ft handle stored as userdata.
418void toku_db_lt_on_destroy_callback(toku::locktree *lt) {
419 FT_HANDLE ft_handle = (FT_HANDLE) lt->get_userdata();
420 assert(ft_handle);
421 toku_ft_handle_close(ft_handle);
422}
423
424// Instruct db to use the default (built-in) key comparison function
425// by setting the flag bits in the db and ft structs
426int toku_db_use_builtin_key_cmp(DB *db) {
427 HANDLE_PANICKED_DB(db);
428 int r = 0;
429 if (db_opened(db)) {
430 r = toku_ydb_do_error(db->dbenv, EINVAL, "Comparison functions cannot be set after DB open.\n");
431 } else if (db->i->key_compare_was_set) {
432 r = toku_ydb_do_error(db->dbenv, EINVAL, "Key comparison function already set.\n");
433 } else {
434 uint32_t tflags;
435 toku_ft_get_flags(db->i->ft_handle, &tflags);
436
437 tflags |= TOKU_DB_KEYCMP_BUILTIN;
438 toku_ft_set_flags(db->i->ft_handle, tflags);
439 db->i->key_compare_was_set = true;
440 }
441 return r;
442}
443
444int toku_db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, int mode) {
445 //Set comparison functions if not yet set.
446 HANDLE_READ_ONLY_TXN(txn);
447 if (!db->i->key_compare_was_set && db->dbenv->i->bt_compare) {
448 toku_ft_set_bt_compare(db->i->ft_handle, db->dbenv->i->bt_compare);
449 db->i->key_compare_was_set = true;
450 }
451 if (db->dbenv->i->update_function) {
452 toku_ft_set_update(db->i->ft_handle,db->dbenv->i->update_function);
453 }
454 toku_ft_set_redirect_callback(
455 db->i->ft_handle,
456 db_on_redirect_callback,
457 db
458 );
459 bool need_locktree = (bool)((db->dbenv->i->open_flags & DB_INIT_LOCK) &&
460 (db->dbenv->i->open_flags & DB_INIT_TXN));
461
462 int is_db_excl = flags & DB_EXCL; flags&=~DB_EXCL;
463 int is_db_create = flags & DB_CREATE; flags&=~DB_CREATE;
464 //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
465 flags&=~DB_READ_UNCOMMITTED;
466 flags&=~DB_READ_COMMITTED;
467 flags&=~DB_SERIALIZABLE;
468 flags&=~DB_IS_HOT_INDEX;
469 // unknown or conflicting flags are bad
470 int unknown_flags = flags & ~DB_THREAD;
471 unknown_flags &= ~DB_BLACKHOLE;
472 if (unknown_flags || (is_db_excl && !is_db_create)) {
473 return EINVAL;
474 }
475
476 if (db_opened(db)) {
477 return EINVAL; /* It was already open. */
478 }
479
480 db->i->open_flags = flags;
481 db->i->open_mode = mode;
482
483 FT_HANDLE ft_handle = db->i->ft_handle;
484 int r = toku_ft_handle_open(ft_handle, iname_in_env,
485 is_db_create, is_db_excl,
486 db->dbenv->i->cachetable,
487 txn ? db_txn_struct_i(txn)->tokutxn : nullptr);
488 if (r != 0) {
489 goto out;
490 }
491
492 // if the dictionary was opened as a blackhole, mark the
493 // fractal tree as blackhole too.
494 if (flags & DB_BLACKHOLE) {
495 toku_ft_set_blackhole(ft_handle);
496 }
497
498 db->i->opened = 1;
499
500 // now that the handle has successfully opened, a valid descriptor
501 // is in the ft. we need to set the db's descriptor pointers
502 db_set_descriptors(db, ft_handle);
503
504 if (need_locktree) {
505 db->i->dict_id = toku_ft_get_dictionary_id(db->i->ft_handle);
506 struct lt_on_create_callback_extra on_create_extra = {
507 .txn = txn,
508 .ft_handle = db->i->ft_handle,
509 };
510 db->i->lt = db->dbenv->i->ltm.get_lt(db->i->dict_id,
511 toku_ft_get_comparator(db->i->ft_handle),
512 &on_create_extra);
513 if (db->i->lt == nullptr) {
514 r = errno;
515 if (r == 0) {
516 r = EINVAL;
517 }
518 goto out;
519 }
520 }
521 r = 0;
522
523out:
524 if (r != 0) {
525 db->i->dict_id = DICTIONARY_ID_NONE;
526 db->i->opened = 0;
527 if (db->i->lt) {
528 db->dbenv->i->ltm.release_lt(db->i->lt);
529 db->i->lt = nullptr;
530 }
531 }
532 return r;
533}
534
535// Return the maximum key and val size in
536// *key_size and *val_size respectively
537static void
538toku_db_get_max_row_size(DB * UU(db), uint32_t * max_key_size, uint32_t * max_val_size) {
539 *max_key_size = 0;
540 *max_val_size = 0;
541 toku_ft_get_maximum_advised_key_value_lengths(max_key_size, max_val_size);
542}
543
544int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
545 // bad hack because some environment dictionaries do not have a dname
546 char *dname = db->i->dname;
547 if (!dname)
548 return 0;
549
550 DBT key_in_directory = { .data = dname, .size = (uint32_t) strlen(dname)+1 };
551 //Left end of range == right end of range (point lock)
552 int r = toku_db_get_range_lock(db->dbenv->i->directory, txn,
553 &key_in_directory, &key_in_directory,
554 toku::lock_request::type::WRITE);
555 if (r == 0)
556 STATUS_VALUE(YDB_LAYER_DIRECTORY_WRITE_LOCKS)++; // accountability
557 else
558 STATUS_VALUE(YDB_LAYER_DIRECTORY_WRITE_LOCKS_FAIL)++; // accountability
559 return r;
560}
561
562//
563// This function is used both to set an initial descriptor of a DB and to
564// change a descriptor. (only way to set a descriptor of a DB)
565//
566// Requires:
567// - The caller must not call put_multiple, del_multiple, or update_multiple concurrently
568// - The caller must not have a hot index running concurrently on db
569// - If the caller has passed DB_UPDATE_CMP_DESCRIPTOR as a flag, then he is calling this function
570// ONLY immediately after creating the dictionary and before doing any actual work on the dictionary.
571//
572static int
573toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, uint32_t flags) {
574 HANDLE_PANICKED_DB(db);
575 HANDLE_READ_ONLY_TXN(txn);
576 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
577 int r = 0;
578 TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
579 bool is_db_hot_index = ((flags & DB_IS_HOT_INDEX) != 0);
580 bool update_cmp_descriptor = ((flags & DB_UPDATE_CMP_DESCRIPTOR) != 0);
581
582 DBT old_descriptor_dbt;
583 toku_init_dbt(&old_descriptor_dbt);
584
585 if (!db_opened(db) || !descriptor || (descriptor->size>0 && !descriptor->data)){
586 r = EINVAL;
587 goto cleanup;
588 }
589 // For a hot index, this is an initial descriptor.
590 // We do not support (yet) hcad with hot index concurrently on a single table, which
591 // would require changing a descriptor for a hot index.
592 if (!is_db_hot_index) {
593 r = toku_db_pre_acquire_table_lock(db, txn);
594 if (r != 0) { goto cleanup; }
595 }
596
597 toku_clone_dbt(&old_descriptor_dbt, db->descriptor->dbt);
598 toku_ft_change_descriptor(db->i->ft_handle, &old_descriptor_dbt, descriptor,
599 true, ttxn, update_cmp_descriptor);
600
601cleanup:
602 toku_destroy_dbt(&old_descriptor_dbt);
603 return r;
604}
605
606static int
607toku_db_set_flags(DB *db, uint32_t flags) {
608 HANDLE_PANICKED_DB(db);
609
610 /* the following matches BDB */
611 if (db_opened(db) && flags != 0) return EINVAL;
612
613 return 0;
614}
615
616static int
617toku_db_get_flags(DB *db, uint32_t *pflags) {
618 HANDLE_PANICKED_DB(db);
619 if (!pflags) return EINVAL;
620 *pflags = 0;
621 return 0;
622}
623
624static int
625toku_db_change_pagesize(DB *db, uint32_t pagesize) {
626 HANDLE_PANICKED_DB(db);
627 if (!db_opened(db)) return EINVAL;
628 toku_ft_handle_set_nodesize(db->i->ft_handle, pagesize);
629 return 0;
630}
631
632static int
633toku_db_set_pagesize(DB *db, uint32_t pagesize) {
634 HANDLE_PANICKED_DB(db);
635 if (db_opened(db)) return EINVAL;
636 toku_ft_handle_set_nodesize(db->i->ft_handle, pagesize);
637 return 0;
638}
639
640static int
641toku_db_get_pagesize(DB *db, uint32_t *pagesize_ptr) {
642 HANDLE_PANICKED_DB(db);
643 toku_ft_handle_get_nodesize(db->i->ft_handle, pagesize_ptr);
644 return 0;
645}
646
647static int
648toku_db_change_readpagesize(DB *db, uint32_t readpagesize) {
649 HANDLE_PANICKED_DB(db);
650 if (!db_opened(db)) return EINVAL;
651 toku_ft_handle_set_basementnodesize(db->i->ft_handle, readpagesize);
652 return 0;
653}
654
655static int
656toku_db_set_readpagesize(DB *db, uint32_t readpagesize) {
657 HANDLE_PANICKED_DB(db);
658 if (db_opened(db)) return EINVAL;
659 toku_ft_handle_set_basementnodesize(db->i->ft_handle, readpagesize);
660 return 0;
661}
662
663static int
664toku_db_get_readpagesize(DB *db, uint32_t *readpagesize_ptr) {
665 HANDLE_PANICKED_DB(db);
666 toku_ft_handle_get_basementnodesize(db->i->ft_handle, readpagesize_ptr);
667 return 0;
668}
669
670static int
671toku_db_change_compression_method(DB *db, enum toku_compression_method compression_method) {
672 HANDLE_PANICKED_DB(db);
673 if (!db_opened(db)) return EINVAL;
674 toku_ft_handle_set_compression_method(db->i->ft_handle, compression_method);
675 return 0;
676}
677
678static int
679toku_db_set_compression_method(DB *db, enum toku_compression_method compression_method) {
680 HANDLE_PANICKED_DB(db);
681 if (db_opened(db)) return EINVAL;
682 toku_ft_handle_set_compression_method(db->i->ft_handle, compression_method);
683 return 0;
684}
685
686static int
687toku_db_get_compression_method(DB *db, enum toku_compression_method *compression_method_ptr) {
688 HANDLE_PANICKED_DB(db);
689 toku_ft_handle_get_compression_method(db->i->ft_handle, compression_method_ptr);
690 return 0;
691}
692
693static int
694toku_db_change_fanout(DB *db, unsigned int fanout) {
695 HANDLE_PANICKED_DB(db);
696 if (!db_opened(db)) return EINVAL;
697 toku_ft_handle_set_fanout(db->i->ft_handle, fanout);
698 return 0;
699}
700
701static int
702toku_db_set_fanout(DB *db, unsigned int fanout) {
703 HANDLE_PANICKED_DB(db);
704 if (db_opened(db)) return EINVAL;
705 toku_ft_handle_set_fanout(db->i->ft_handle, fanout);
706 return 0;
707}
708
709static int
710toku_db_get_fanout(DB *db, unsigned int *fanout) {
711 HANDLE_PANICKED_DB(db);
712 toku_ft_handle_get_fanout(db->i->ft_handle, fanout);
713 return 0;
714}
715
716static int
717toku_db_set_memcmp_magic(DB *db, uint8_t magic) {
718 HANDLE_PANICKED_DB(db);
719 if (db_opened(db)) {
720 return EINVAL;
721 }
722 return toku_ft_handle_set_memcmp_magic(db->i->ft_handle, magic);
723}
724
725static int
726toku_db_get_fractal_tree_info64(DB *db, uint64_t *num_blocks_allocated, uint64_t *num_blocks_in_use, uint64_t *size_allocated, uint64_t *size_in_use) {
727 HANDLE_PANICKED_DB(db);
728 struct ftinfo64 ftinfo;
729 toku_ft_handle_get_fractal_tree_info64(db->i->ft_handle, &ftinfo);
730 *num_blocks_allocated = ftinfo.num_blocks_allocated;
731 *num_blocks_in_use = ftinfo.num_blocks_in_use;
732 *size_allocated = ftinfo.size_allocated;
733 *size_in_use = ftinfo.size_in_use;
734 return 0;
735}
736
737static int
738toku_db_iterate_fractal_tree_block_map(DB *db, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
739 HANDLE_PANICKED_DB(db);
740 return toku_ft_handle_iterate_fractal_tree_block_map(db->i->ft_handle, iter, iter_extra);
741}
742
743static int
744toku_db_stat64(DB * db, DB_TXN *txn, DB_BTREE_STAT64 *s) {
745 HANDLE_PANICKED_DB(db);
746 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
747 struct ftstat64_s ftstat;
748 TOKUTXN tokutxn = NULL;
749 if (txn != NULL) {
750 tokutxn = db_txn_struct_i(txn)->tokutxn;
751 }
752 toku_ft_handle_stat64(db->i->ft_handle, tokutxn, &ftstat);
753 s->bt_nkeys = ftstat.nkeys;
754 s->bt_ndata = ftstat.ndata;
755 s->bt_dsize = ftstat.dsize;
756 s->bt_fsize = ftstat.fsize;
757 s->bt_create_time_sec = ftstat.create_time_sec;
758 s->bt_modify_time_sec = ftstat.modify_time_sec;
759 s->bt_verify_time_sec = ftstat.verify_time_sec;
760 return 0;
761}
762
763static const char *
764toku_db_get_dname(DB *db) {
765 if (!db_opened(db)) {
766 return nullptr;
767 }
768 if (db->i->dname == nullptr) {
769 return "";
770 }
771 return db->i->dname;
772}
773
774static int
775toku_db_keys_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* keyleft, DBT* keyright, uint64_t* less, uint64_t* left, uint64_t* between, uint64_t *right, uint64_t *greater, bool* middle_3_exact) {
776 HANDLE_PANICKED_DB(db);
777 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
778
779 // note that we ignore the txn param. It would be more complicated to support it.
780 // TODO(yoni): Maybe add support for txns later? How would we do this? ydb lock comment about db_keyrange64 is obsolete.
781 toku_ft_keysrange(db->i->ft_handle, keyleft, keyright, less, left, between, right, greater, middle_3_exact);
782 return 0;
783}
784
785static int
786toku_db_key_range64(DB* db, DB_TXN* txn, DBT* key, uint64_t* less_p, uint64_t* equal_p, uint64_t* greater_p, int* is_exact) {
787 uint64_t less, equal_left, middle, equal_right, greater;
788 bool ignore;
789 int r = toku_db_keys_range64(db, txn, key, NULL, &less, &equal_left, &middle, &equal_right, &greater, &ignore);
790 if (r == 0) {
791 *less_p = less;
792 *equal_p = equal_left;
793 *greater_p = middle;
794 paranoid_invariant_zero(greater); // no keys are greater than positive infinity
795 paranoid_invariant_zero(equal_right); // no keys are equal to positive infinity
796 // toku_ft_keysrange does not know when all 3 are exact, so set is_exact to false
797 *is_exact = false;
798 }
799 return 0;
800}
801
802static int toku_db_get_key_after_bytes(DB *db, DB_TXN *txn, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *end_key, uint64_t actually_skipped, void *extra), void *cb_extra, uint32_t UU(flags)) {
803 HANDLE_PANICKED_DB(db);
804 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
805 return toku_ft_get_key_after_bytes(db->i->ft_handle, start_key, skip_len, callback, cb_extra);
806}
807
808// needed by loader.c
809int
810toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
811 HANDLE_PANICKED_DB(db);
812 if (!db->i->lt || !txn) return 0;
813 int r;
814 r = toku_db_get_range_lock(db, txn,
815 toku_dbt_negative_infinity(), toku_dbt_positive_infinity(),
816 toku::lock_request::type::WRITE);
817 return r;
818}
819
820static int
821locked_db_close(DB * db, uint32_t UU(flags)) {
822 // cannot begin a checkpoint
823 toku_multi_operation_client_lock();
824 int r = toku_db_close(db);
825 toku_multi_operation_client_unlock();
826 return r;
827}
828
829int
830autotxn_db_get(DB* db, DB_TXN* txn, DBT* key, DBT* data, uint32_t flags) {
831 bool changed; int r;
832 r = toku_db_construct_autotxn(db, &txn, &changed, false);
833 if (r!=0) return r;
834 r = toku_db_get(db, txn, key, data, flags);
835 return toku_db_destruct_autotxn(txn, r, changed);
836}
837
838static inline int
839autotxn_db_getf_set (DB *db, DB_TXN *txn, uint32_t flags, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
840 bool changed; int r;
841 r = toku_db_construct_autotxn(db, &txn, &changed, false);
842 if (r!=0) return r;
843 r = db_getf_set(db, txn, flags, key, f, extra);
844 return toku_db_destruct_autotxn(txn, r, changed);
845}
846
847static int
848locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
849 int ret, r;
850 HANDLE_READ_ONLY_TXN(txn);
851 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
852
853 //
854 // Note that this function opens a db with a transaction. Should
855 // the transaction abort, the user is responsible for closing the DB
856 // before aborting the transaction. Not doing so results in undefined
857 // behavior.
858 //
859 DB_ENV *env = db->dbenv;
860 DB_TXN *child_txn = NULL;
861 int using_txns = env->i->open_flags & DB_INIT_TXN;
862 if (using_txns) {
863 ret = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
864 invariant_zero(ret);
865 }
866
867 // cannot begin a checkpoint
868 toku_multi_operation_client_lock();
869 r = toku_db_open(db, child_txn, fname, dbname, dbtype, flags & ~DB_AUTO_COMMIT, mode);
870 toku_multi_operation_client_unlock();
871
872 if (using_txns) {
873 if (r == 0) {
874 ret = locked_txn_commit(child_txn, DB_TXN_NOSYNC);
875 invariant_zero(ret);
876 } else {
877 ret = locked_txn_abort(child_txn);
878 invariant_zero(ret);
879 }
880 }
881 return r;
882}
883
884static int
885locked_db_change_descriptor(DB *db, DB_TXN *txn, const DBT *descriptor, uint32_t flags) {
886 // cannot begin a checkpoint
887 toku_multi_operation_client_lock();
888 int r = toku_db_change_descriptor(db, txn, descriptor, flags);
889 toku_multi_operation_client_unlock();
890 return r;
891}
892
893static int
894autotxn_db_change_descriptor(DB *db, DB_TXN *txn, const DBT *descriptor, uint32_t flags) {
895 bool changed; int r;
896 r = toku_db_construct_autotxn(db, &txn, &changed, false);
897 if (r != 0) { return r; }
898 r = locked_db_change_descriptor(db, txn, descriptor, flags);
899 return toku_db_destruct_autotxn(txn, r, changed);
900}
901
902static void
903toku_db_set_errfile (DB *db, FILE *errfile) {
904 db->dbenv->set_errfile(db->dbenv, errfile);
905}
906
907// TODO 2216 delete this
908static int
909toku_db_fd(DB * UU(db), int * UU(fdp)) {
910 return 0;
911}
912
913static const DBT* toku_db_dbt_pos_infty(void) __attribute__((pure));
914static const DBT*
915toku_db_dbt_pos_infty(void) {
916 return toku_dbt_positive_infinity();
917}
918
919static const DBT* toku_db_dbt_neg_infty(void) __attribute__((pure));
920static const DBT*
921toku_db_dbt_neg_infty(void) {
922 return toku_dbt_negative_infinity();
923}
924
925static int
926toku_db_optimize(DB *db) {
927 HANDLE_PANICKED_DB(db);
928 toku_ft_optimize(db->i->ft_handle);
929 return 0;
930}
931
932static int
933toku_db_hot_optimize(DB *db, DBT* left, DBT* right,
934 int (*progress_callback)(void *extra, float progress),
935 void *progress_extra, uint64_t* loops_run)
936{
937 HANDLE_PANICKED_DB(db);
938 int r = 0;
939 r = toku_ft_hot_optimize(db->i->ft_handle, left, right,
940 progress_callback,
941 progress_extra, loops_run);
942
943 return r;
944}
945
946static int
947locked_db_optimize(DB *db) {
948 // need to protect from checkpointing because
949 // toku_db_optimize does a message injection
950 toku_multi_operation_client_lock(); //Cannot begin checkpoint
951 int r = toku_db_optimize(db);
952 toku_multi_operation_client_unlock();
953 return r;
954}
955
956
957struct last_key_extra {
958 YDB_CALLBACK_FUNCTION func;
959 void* extra;
960};
961
962static int
963db_get_last_key_callback(uint32_t keylen, const void *key, uint32_t vallen UU(), const void *val UU(), void *extra, bool lock_only) {
964 if (!lock_only) {
965 DBT keydbt;
966 toku_fill_dbt(&keydbt, key, keylen);
967 struct last_key_extra * CAST_FROM_VOIDP(info, extra);
968 info->func(&keydbt, NULL, info->extra);
969 }
970 return 0;
971}
972
973static int
974toku_db_get_last_key(DB * db, DB_TXN *txn, YDB_CALLBACK_FUNCTION func, void* extra) {
975 int r;
976 LE_CURSOR cursor = nullptr;
977 struct last_key_extra last_extra = { .func = func, .extra = extra };
978
979 r = toku_le_cursor_create(&cursor, db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
980 if (r != 0) { goto cleanup; }
981
982 // Goes in reverse order. First key returned is last in dictionary.
983 r = toku_le_cursor_next(cursor, db_get_last_key_callback, &last_extra);
984 if (r != 0) { goto cleanup; }
985
986cleanup:
987 if (cursor) {
988 toku_le_cursor_close(cursor);
989 }
990 return r;
991}
992
993static int
994autotxn_db_get_last_key(DB* db, YDB_CALLBACK_FUNCTION func, void* extra) {
995 bool changed; int r;
996 DB_TXN *txn = nullptr;
997 // Cursors inside require transactions, but this is _not_ a transactional function.
998 // Create transaction in a wrapper and then later close it.
999 r = toku_db_construct_autotxn(db, &txn, &changed, false);
1000 if (r!=0) return r;
1001 r = toku_db_get_last_key(db, txn, func, extra);
1002 return toku_db_destruct_autotxn(txn, r, changed);
1003}
1004
1005static int
1006toku_db_get_fragmentation(DB * db, TOKU_DB_FRAGMENTATION report) {
1007 HANDLE_PANICKED_DB(db);
1008 int r;
1009 if (!db_opened(db))
1010 r = toku_ydb_do_error(db->dbenv, EINVAL, "Fragmentation report available only on open DBs.\n");
1011 else
1012 r = toku_ft_get_fragmentation(db->i->ft_handle, report);
1013 return r;
1014}
1015
1016int
1017toku_db_set_indexer(DB *db, DB_INDEXER * indexer) {
1018 int r = 0;
1019 if ( db->i->indexer != NULL && indexer != NULL ) {
1020 // you are trying to overwrite a valid indexer
1021 r = EINVAL;
1022 }
1023 else {
1024 db->i->indexer = indexer;
1025 }
1026 return r;
1027}
1028
1029DB_INDEXER *
1030toku_db_get_indexer(DB *db) {
1031 return db->i->indexer;
1032}
1033
1034static void
1035db_get_indexer(DB *db, DB_INDEXER **indexer_ptr) {
1036 *indexer_ptr = toku_db_get_indexer(db);
1037}
1038
// Carries the caller-supplied progress callback (which may be NULL) and its
// argument from toku_db_verify_with_progress to
// ydb_verify_progress_callback.
struct ydb_verify_context {
    int (*progress_callback)(void *extra, float progress);  // user callback
    void *progress_extra;                                   // passed as `extra` to the callback
};
1043
1044static int
1045ydb_verify_progress_callback(void *extra, float progress) {
1046 struct ydb_verify_context *context = (struct ydb_verify_context *) extra;
1047 int r = 0;
1048 if (context->progress_callback) {
1049 r = context->progress_callback(context->progress_extra, progress);
1050 }
1051 return r;
1052}
1053
1054static int
1055toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_going) {
1056 struct ydb_verify_context context = { progress_callback, progress_extra };
1057 int r = toku_verify_ft_with_progress(db->i->ft_handle, ydb_verify_progress_callback, &context, verbose, keep_going);
1058 return r;
1059}
1060
1061
1062static int
1063toku_db_recount_rows(DB* db, int (*progress_callback)(uint64_t count,
1064 uint64_t deleted,
1065 void* progress_extra),
1066 void* progress_extra) {
1067
1068 HANDLE_PANICKED_DB(db);
1069 int r = 0;
1070 r =
1071 toku_ft_recount_rows(
1072 db->i->ft_handle,
1073 progress_callback,
1074 progress_extra);
1075
1076 return r;
1077}
1078
1079
1080int toku_setup_db_internal (DB **dbp, DB_ENV *env, uint32_t flags, FT_HANDLE ft_handle, bool is_open) {
1081 if (flags || env == NULL)
1082 return EINVAL;
1083
1084 if (!env_opened(env))
1085 return EINVAL;
1086
1087 DB *MALLOC(result);
1088 if (result == 0) {
1089 return ENOMEM;
1090 }
1091 memset(result, 0, sizeof *result);
1092 result->dbenv = env;
1093 MALLOC(result->i);
1094 if (result->i == 0) {
1095 toku_free(result);
1096 return ENOMEM;
1097 }
1098 memset(result->i, 0, sizeof *result->i);
1099 result->i->ft_handle = ft_handle;
1100 result->i->opened = is_open;
1101 *dbp = result;
1102 return 0;
1103}
1104
1105int
1106toku_db_create(DB ** db, DB_ENV * env, uint32_t flags) {
1107 if (flags || env == NULL)
1108 return EINVAL;
1109
1110 if (!env_opened(env))
1111 return EINVAL;
1112
1113
1114 FT_HANDLE ft_handle;
1115 toku_ft_handle_create(&ft_handle);
1116
1117 int r = toku_setup_db_internal(db, env, flags, ft_handle, false);
1118 if (r != 0) return r;
1119
1120 DB *result=*db;
1121 // methods that grab the ydb lock
1122#define SDB(name) result->name = locked_db_ ## name
1123 SDB(close);
1124 SDB(open);
1125 SDB(optimize);
1126#undef SDB
1127 // methods that do not take the ydb lock
1128#define USDB(name) result->name = toku_db_ ## name
1129 USDB(set_errfile);
1130 USDB(set_pagesize);
1131 USDB(get_pagesize);
1132 USDB(change_pagesize);
1133 USDB(set_readpagesize);
1134 USDB(get_readpagesize);
1135 USDB(change_readpagesize);
1136 USDB(set_compression_method);
1137 USDB(get_compression_method);
1138 USDB(change_compression_method);
1139 USDB(set_fanout);
1140 USDB(get_fanout);
1141 USDB(set_memcmp_magic);
1142 USDB(change_fanout);
1143 USDB(set_flags);
1144 USDB(get_flags);
1145 USDB(fd);
1146 USDB(get_max_row_size);
1147 USDB(set_indexer);
1148 USDB(pre_acquire_table_lock);
1149 USDB(pre_acquire_fileops_lock);
1150 USDB(key_range64);
1151 USDB(keys_range64);
1152 USDB(get_key_after_bytes);
1153 USDB(hot_optimize);
1154 USDB(stat64);
1155 USDB(get_fractal_tree_info64);
1156 USDB(iterate_fractal_tree_block_map);
1157 USDB(get_dname);
1158 USDB(verify_with_progress);
1159 USDB(cursor);
1160 USDB(dbt_pos_infty);
1161 USDB(dbt_neg_infty);
1162 USDB(get_fragmentation);
1163 USDB(recount_rows);
1164#undef USDB
1165 result->get_indexer = db_get_indexer;
1166 result->del = autotxn_db_del;
1167 result->put = autotxn_db_put;
1168 result->update = autotxn_db_update;
1169 result->update_broadcast = autotxn_db_update_broadcast;
1170 result->change_descriptor = autotxn_db_change_descriptor;
1171 result->get_last_key = autotxn_db_get_last_key;
1172
1173 // unlocked methods
1174 result->get = autotxn_db_get;
1175 result->getf_set = autotxn_db_getf_set;
1176
1177 result->i->dict_id = DICTIONARY_ID_NONE;
1178 result->i->opened = 0;
1179 result->i->open_flags = 0;
1180 result->i->open_mode = 0;
1181 result->i->indexer = NULL;
1182 *db = result;
1183 return 0;
1184}
1185
1186// When the loader is created, it makes this call (toku_env_load_inames).
1187// For each dictionary to be loaded, replace old iname in directory
1188// with a newly generated iname. This will also take a write lock
1189// on the directory entries. The write lock will be released when
1190// the transaction of the loader is completed.
1191// If the transaction commits, the new inames are in place.
1192// If the transaction aborts, the old inames will be restored.
1193// The new inames are returned to the caller.
1194// It is the caller's responsibility to free them.
1195// If "mark_as_loader" is true, then include a mark in the iname
1196// to indicate that the file is created by the ft loader.
1197// Return 0 on success (could fail if write lock not available).
1198static int
1199load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], const char * new_inames_in_env[/*N*/], LSN *load_lsn, bool mark_as_loader) {
1200 int rval = 0;
1201 int i;
1202
1203 TXNID_PAIR xid = TXNID_PAIR_NONE;
1204 DBT dname_dbt; // holds dname
1205 DBT iname_dbt; // holds new iname
1206
1207 const char *mark;
1208
1209 if (mark_as_loader) {
1210 mark = "B";
1211 } else {
1212 mark = "P";
1213 }
1214
1215 for (i=0; i<N; i++) {
1216 new_inames_in_env[i] = NULL;
1217 }
1218
1219 if (txn) {
1220 xid = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn);
1221 }
1222 for (i = 0; i < N; i++) {
1223 char * dname = dbs[i]->i->dname;
1224 toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
1225
1226 // now create new iname
1227 char hint[strlen(dname) + 1];
1228 create_iname_hint(env, dname, hint);
1229
1230 // allocates memory for iname_in_env
1231 const char *new_iname =
1232 create_iname(env, xid.parent_id64, xid.child_id64, hint, mark, i);
1233 new_inames_in_env[i] = new_iname;
1234
1235 // iname_in_env goes in directory
1236 toku_fill_dbt(&iname_dbt, new_iname, strlen(new_iname) + 1);
1237 rval = toku_db_put(env->i->directory, txn, &dname_dbt, &iname_dbt, 0, true);
1238 if (rval) break;
1239 }
1240
1241 // Generate load log entries.
1242 if (!rval && txn) {
1243 TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
1244 int do_fsync = 0;
1245 LSN *get_lsn = NULL;
1246 for (i = 0; i < N; i++) {
1247 FT_HANDLE ft_handle = dbs[i]->i->ft_handle;
1248 //Fsync is necessary for the last one only.
1249 if (i==N-1) {
1250 do_fsync = 1; //We only need a single fsync of logs.
1251 get_lsn = load_lsn; //Set pointer to capture the last lsn.
1252 }
1253 toku_ft_load(ft_handle, ttxn, new_inames_in_env[i], do_fsync, get_lsn);
1254 }
1255 }
1256 return rval;
1257}
1258
1259int
1260locked_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], char * new_inames_in_env[/*N*/], LSN *load_lsn, bool mark_as_loader) {
1261 int r;
1262 HANDLE_READ_ONLY_TXN(txn);
1263
1264 // cannot begin a checkpoint
1265 toku_multi_operation_client_lock();
1266 r = load_inames(env, txn, N, dbs, (const char **) new_inames_in_env, load_lsn, mark_as_loader);
1267 toku_multi_operation_client_unlock();
1268
1269 return r;
1270
1271}
1272
1273#undef STATUS_VALUE
1274
1275#include <toku_race_tools.h>
1276void __attribute__((constructor)) toku_ydb_db_helgrind_ignore(void);
1277void
1278toku_ydb_db_helgrind_ignore(void) {
1279 TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_db_layer_status, sizeof ydb_db_layer_status);
1280}
1281