/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <my_global.h>
#include <ft/bndata.h>
#include <ft/ft-internal.h>

using namespace toku;
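
// Size accounted to one klpair: the klpair struct itself, plus the key bytes,
// plus the serialized size of the leafentry it points to.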
uint32_t bn_data::klpair_disksize(const uint32_t klpair_len, const klpair_struct *klpair) const {
    return sizeof(*klpair) + keylen_from_klpair_len(klpair_len) + leafentry_disksize(get_le_from_klpair(klpair));
}

void bn_data::init_zero() {
    toku_mempool_zero(&m_buffer_mempool);
    m_disksize_of_keys = 0;
}

void bn_data::initialize_empty() {
    init_zero();
    m_buffer.create();
}

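// add_key/add_keys/remove_key maintain m_disksize_of_keys: the total serialized
// size of all keys, where each key costs a 4-byte length prefix plus its bytes.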
void bn_data::add_key(uint32_t keylen) {
    m_disksize_of_keys += sizeof(keylen) + keylen;
}

void bn_data::add_keys(uint32_t n_keys, uint32_t combined_klpair_len) {
    invariant(n_keys * sizeof(uint32_t) <= combined_klpair_len);
    m_disksize_of_keys += combined_klpair_len;
}

void bn_data::remove_key(uint32_t keylen) {
    m_disksize_of_keys -= sizeof(keylen) + keylen;
}

// Deserialize from format optimized for keys being inlined.
// Currently only supports fixed-length keys.
void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version UU(),
                                                     uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length,
                                                     uint32_t fixed_klpair_length) {
    paranoid_invariant(version >= FT_LAYOUT_VERSION_26); // Support was added @26
    uint32_t ndone_before = rb->ndone;
    init_zero();
    invariant(all_keys_same_length); // Until otherwise supported.
    const void *keys_src;
    rbuf_literal_bytes(rb, &keys_src, key_data_size);
    //Generate dmt
    this->m_buffer.create_from_sorted_memory_of_fixed_size_elements(
        keys_src, num_entries, key_data_size, fixed_klpair_length);
    toku_mempool_construct(&this->m_buffer_mempool, val_data_size);

    const void *vals_src;
    rbuf_literal_bytes(rb, &vals_src, val_data_size);

    if (num_entries > 0) {
        void *vals_dest = toku_mempool_malloc(&this->m_buffer_mempool, val_data_size);
        paranoid_invariant_notnull(vals_dest);
        memcpy(vals_dest, vals_src, val_data_size);
    }

    add_keys(num_entries, num_entries * fixed_klpair_length);

    toku_note_deserialized_basement_node(all_keys_same_length);

    invariant(rb->ndone - ndone_before == data_size);
}

static int
wbufwriteleafentry(const void* key, const uint32_t keylen, const LEAFENTRY &le, const uint32_t UU(idx), struct wbuf * const wb) {
    // need to pack the leafentry as it was in versions
    // where the key was integrated into it (< 26)
    uint32_t begin_spot UU() = wb->ndone;
    uint32_t le_disk_size = leafentry_disksize(le);
    wbuf_nocrc_uint8_t(wb, le->type);
    wbuf_nocrc_uint32_t(wb, keylen);
    if (le->type == LE_CLEAN) {
        wbuf_nocrc_uint32_t(wb, le->u.clean.vallen);
        wbuf_nocrc_literal_bytes(wb, key, keylen);
        wbuf_nocrc_literal_bytes(wb, le->u.clean.val, le->u.clean.vallen);
    }
    else {
        paranoid_invariant(le->type == LE_MVCC);
        wbuf_nocrc_uint32_t(wb, le->u.mvcc.num_cxrs);
        wbuf_nocrc_uint8_t(wb, le->u.mvcc.num_pxrs);
        wbuf_nocrc_literal_bytes(wb, key, keylen);
        wbuf_nocrc_literal_bytes(wb, le->u.mvcc.xrs, le_disk_size - (1 + 4 + 1));
    }
    uint32_t end_spot UU() = wb->ndone;
    paranoid_invariant((end_spot - begin_spot) == keylen + sizeof(keylen) + le_disk_size);
    return 0;
}

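// Serialize this basement node into wb: write the header, then either the
// keys-and-vals-separate format (fixed-length keys) or, for variable-length
// keys, each leafentry packed with its key inlined as in versions before 26.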
void bn_data::serialize_to_wbuf(struct wbuf *const wb) {
    prepare_to_serialize();
    serialize_header(wb);
    if (m_buffer.value_length_is_fixed()) {
        serialize_rest(wb);
    } else {
        //
        // iterate over leafentries and place them into the buffer
        //
        iterate<struct wbuf, wbufwriteleafentry>(wb);
    }
}

// If we have fixed-length keys, we prepare the dmt and mempool.
// The mempool is prepared by removing any fragmented space and ordering leafentries in the same order as their keys.
void bn_data::prepare_to_serialize(void) {
    if (m_buffer.value_length_is_fixed()) {
        m_buffer.prepare_for_serialize();
        dmt_compress_kvspace(0, nullptr, true); // Gets it ready for easy serialization.
    }
}

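// Write the fixed-size basement-node header: key_data_size, val_data_size,
// fixed_klpair_length, and the all_keys_same_length / keys_vals_separate flags.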
void bn_data::serialize_header(struct wbuf *wb) const {
    bool fixed = m_buffer.value_length_is_fixed();

    //key_data_size
    wbuf_nocrc_uint(wb, m_disksize_of_keys);
    //val_data_size
    wbuf_nocrc_uint(wb, toku_mempool_get_used_size(&m_buffer_mempool));
    //fixed_klpair_length
    wbuf_nocrc_uint(wb, m_buffer.get_fixed_length());
    // all_keys_same_length
    wbuf_nocrc_uint8_t(wb, fixed);
    // keys_vals_separate
    wbuf_nocrc_uint8_t(wb, fixed);
}

void bn_data::serialize_rest(struct wbuf *wb) const {
    //Write keys
    invariant(m_buffer.value_length_is_fixed()); //Assumes prepare_to_serialize was called
    m_buffer.serialize_values(m_disksize_of_keys, wb);

    //Write leafentries
    //Just ran dmt_compress_kvspace so there is no fragmentation and also leafentries are in sorted order.
    paranoid_invariant(toku_mempool_get_frag_size(&m_buffer_mempool) == 0);
    uint32_t val_data_size = toku_mempool_get_used_size(&m_buffer_mempool);
    wbuf_nocrc_literal_bytes(wb, toku_mempool_get_base(&m_buffer_mempool), val_data_size);
}

// Deserialize from rbuf
void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version) {
    uint32_t key_data_size = data_size; // overallocate if < version 26 (best guess that is guaranteed not too small)
    uint32_t val_data_size = data_size; // overallocate if < version 26 (best guess that is guaranteed not too small)

    bool all_keys_same_length = false;
    bool keys_vals_separate = false;
    uint32_t fixed_klpair_length = 0;

    // In version 25 and older there is no header. Skip reading header for old version.
    if (version >= FT_LAYOUT_VERSION_26) {
        uint32_t ndone_before = rb->ndone;
        key_data_size = rbuf_int(rb);
        val_data_size = rbuf_int(rb);
        fixed_klpair_length = rbuf_int(rb); // 0 if !all_keys_same_length
        all_keys_same_length = rbuf_char(rb);
        keys_vals_separate = rbuf_char(rb);
        invariant(all_keys_same_length == keys_vals_separate); // Until we support otherwise
        uint32_t header_size = rb->ndone - ndone_before;
        data_size -= header_size;
        invariant(header_size == HEADER_LENGTH);
        if (keys_vals_separate) {
            invariant(fixed_klpair_length >= sizeof(klpair_struct) || num_entries == 0);
            initialize_from_separate_keys_and_vals(num_entries, rb, data_size, version,
                                                   key_data_size, val_data_size, all_keys_same_length,
                                                   fixed_klpair_length);
            return;
        }
    }
    // Version >= 26 and version 25 deserialization are now identical except that <= 25 might allocate too much memory.
    const void *bytes;
    rbuf_literal_bytes(rb, &bytes, data_size);
    const unsigned char *CAST_FROM_VOIDP(buf, bytes);
    if (data_size == 0) {
        invariant_zero(num_entries);
    }
    init_zero();
    klpair_dmt_t::builder dmt_builder;
    dmt_builder.create(num_entries, key_data_size);

    // TODO(leif): clean this up (#149)
    unsigned char *newmem = nullptr;
    // add 25% extra wiggle room
    uint32_t allocated_bytes_vals = val_data_size + (val_data_size / 4);
    CAST_FROM_VOIDP(newmem, toku_xmalloc(allocated_bytes_vals));
    const unsigned char* curr_src_pos = buf;
    unsigned char* curr_dest_pos = newmem;
    for (uint32_t i = 0; i < num_entries; i++) {
        uint8_t curr_type = curr_src_pos[0];
        curr_src_pos++;
        // first thing we do is lay out the key,
        // to do so, we must extract it from the leafentry
        // and write it in
        uint32_t keylen = 0;
        const void* keyp = nullptr;
        keylen = *(uint32_t *)curr_src_pos;
        curr_src_pos += sizeof(uint32_t);
        uint32_t clean_vallen = 0;
        uint32_t num_cxrs = 0;
        uint8_t num_pxrs = 0;
        if (curr_type == LE_CLEAN) {
            clean_vallen = toku_dtoh32(*(uint32_t *)curr_src_pos);
            curr_src_pos += sizeof(clean_vallen); // val_len
            keyp = curr_src_pos;
            curr_src_pos += keylen;
        }
        else {
            paranoid_invariant(curr_type == LE_MVCC);
            num_cxrs = toku_htod32(*(uint32_t *)curr_src_pos);
            curr_src_pos += sizeof(uint32_t); // num_cxrs
            num_pxrs = curr_src_pos[0];
            curr_src_pos += sizeof(uint8_t); //num_pxrs
            keyp = curr_src_pos;
            curr_src_pos += keylen;
        }
        uint32_t le_offset = curr_dest_pos - newmem;
        dmt_builder.append(klpair_dmtwriter(keylen, le_offset, keyp));
        add_key(keylen);

        // now curr_dest_pos is pointing to where the leafentry should be packed
        curr_dest_pos[0] = curr_type;
        curr_dest_pos++;
        if (curr_type == LE_CLEAN) {
            *(uint32_t *)curr_dest_pos = toku_htod32(clean_vallen);
            curr_dest_pos += sizeof(clean_vallen);
            memcpy(curr_dest_pos, curr_src_pos, clean_vallen); // copy the val
            curr_dest_pos += clean_vallen;
            curr_src_pos += clean_vallen;
        }
        else {
            // pack num_cxrs and num_pxrs
            *(uint32_t *)curr_dest_pos = toku_htod32(num_cxrs);
            curr_dest_pos += sizeof(num_cxrs);
            *(uint8_t *)curr_dest_pos = num_pxrs;
            curr_dest_pos += sizeof(num_pxrs);
            // now we need to pack the rest of the data
            uint32_t num_rest_bytes = leafentry_rest_memsize(num_pxrs, num_cxrs, const_cast<uint8_t*>(curr_src_pos));
            memcpy(curr_dest_pos, curr_src_pos, num_rest_bytes);
            curr_dest_pos += num_rest_bytes;
            curr_src_pos += num_rest_bytes;
        }
    }
    dmt_builder.build(&this->m_buffer);
    toku_note_deserialized_basement_node(m_buffer.value_length_is_fixed());

    uint32_t num_bytes_read = (uint32_t)(curr_src_pos - buf);
    invariant(num_bytes_read == data_size);

    uint32_t num_bytes_written = curr_dest_pos - newmem + m_disksize_of_keys;
    invariant(num_bytes_written == data_size);
    toku_mempool_init(&m_buffer_mempool, newmem, (size_t)(curr_dest_pos - newmem), allocated_bytes_vals);

    invariant(get_disk_size() == data_size);
    // Versions older than 26 might have allocated too much memory. Try to shrink the mempool now that we
    // know how much memory we need.
    if (version < FT_LAYOUT_VERSION_26) {
        // Unnecessary after version 26
        // Reallocate smaller mempool to save memory
        invariant_zero(toku_mempool_get_frag_size(&m_buffer_mempool));
        toku_mempool_realloc_larger(&m_buffer_mempool, toku_mempool_get_used_size(&m_buffer_mempool));
    }
}

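// In-memory footprint: the mempool's footprint plus the dmt's own memory.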
uint64_t bn_data::get_memory_size() {
    uint64_t retval = 0;
    //TODO: Maybe ask for memory_size instead of mempool_footprint (either this todo or the next)
    // include fragmentation overhead but do not include space in the
    // mempool that has not yet been allocated for leaf entries
    size_t poolsize = toku_mempool_footprint(&m_buffer_mempool);
    retval += poolsize;
    // This one includes not-yet-allocated for nodes (just like old constant-key omt)
    //TODO: Maybe ask for mempool_footprint instead of memory_size.
    retval += m_buffer.memory_size();
    invariant(retval >= get_disk_size());
    return retval;
}

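// Remove the klpair at idx from the dmt and give its leafentry's bytes back to
// the mempool (tracked as fragmentation until the next compaction).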
void bn_data::delete_leafentry (
    uint32_t idx,
    uint32_t keylen,
    uint32_t old_le_size
    )
{
    remove_key(keylen);
    m_buffer.delete_at(idx);
    toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
}

/* mempool support */

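// Iteration callback used by dmt_compress_kvspace: copy each leafentry into the
// new mempool and update the klpair's offset to point at the copy.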
struct dmt_compressor_state {
    struct mempool *new_kvspace;
    class bn_data *bd;
};

static int move_it (const uint32_t, klpair_struct *klpair, const uint32_t idx UU(), struct dmt_compressor_state * const oc) {
    LEAFENTRY old_le = oc->bd->get_le_from_klpair(klpair);
    uint32_t size = leafentry_memsize(old_le);
    void* newdata = toku_mempool_malloc(oc->new_kvspace, size);
    paranoid_invariant_notnull(newdata); // we do this on a fresh mempool, so nothing bad should happen
    memcpy(newdata, old_le, size);
    klpair->le_offset = toku_mempool_get_offset_from_pointer_and_base(oc->new_kvspace, newdata);
    return 0;
}

// Compress things, and grow or shrink the mempool if needed.
// May (always if force_compress) have a side effect of putting contents of mempool in sorted order.
void bn_data::dmt_compress_kvspace(size_t added_size, void **maybe_free, bool force_compress) {
    uint32_t total_size_needed = toku_mempool_get_used_size(&m_buffer_mempool) + added_size;

    // If there is no fragmentation, e.g. in serial inserts, we can just increase the size
    // of the mempool and move things over with a cheap memcpy. If force_compress is true,
    // the caller needs the side effect that all contents are put in sorted order.
    bool do_compress = toku_mempool_get_frag_size(&m_buffer_mempool) > 0 || force_compress;

    void *old_mempool_base = toku_mempool_get_base(&m_buffer_mempool);
    struct mempool new_kvspace;
    if (do_compress) {
        size_t requested_size = force_compress ? total_size_needed : ((total_size_needed * 3) / 2);
        toku_mempool_construct(&new_kvspace, requested_size);
        struct dmt_compressor_state oc = { &new_kvspace, this };
        m_buffer.iterate_ptr< decltype(oc), move_it >(&oc);
    } else {
        toku_mempool_construct(&new_kvspace, total_size_needed);
        size_t old_offset_limit = toku_mempool_get_offset_limit(&m_buffer_mempool);
        void *new_mempool_base = toku_mempool_malloc(&new_kvspace, old_offset_limit);
        memcpy(new_mempool_base, old_mempool_base, old_offset_limit);
    }

    if (maybe_free) {
        *maybe_free = old_mempool_base;
    } else {
        toku_free(old_mempool_base);
    }
    m_buffer_mempool = new_kvspace;
}

// Effect: Allocate a new object of size SIZE in the mempool. If the mempool runs out of space,
// allocate a new mempool, and copy all the items from the dmt (which refer to items in the old
// mempool) into the new mempool.
// If MAYBE_FREE is nullptr then free the old mempool's space.
// Otherwise, store the old mempool's space in maybe_free.
LEAFENTRY bn_data::mempool_malloc_and_update_dmt(size_t size, void **maybe_free) {
    void *v = toku_mempool_malloc(&m_buffer_mempool, size);
    if (v == nullptr) {
        dmt_compress_kvspace(size, maybe_free, false);
        v = toku_mempool_malloc(&m_buffer_mempool, size);
        paranoid_invariant_notnull(v);
    }
    return (LEAFENTRY)v;
}

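// Reserve space for overwriting the leafentry at idx with one of new_size bytes:
// allocate the new space, release the old leafentry's bytes, and point the
// existing klpair at the new location. The key itself is unchanged.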
void bn_data::get_space_for_overwrite(
    uint32_t idx,
    const void* keyp UU(),
    uint32_t keylen UU(),
    uint32_t old_keylen,
    uint32_t old_le_size,
    uint32_t new_size,
    LEAFENTRY* new_le_space,
    void **const maybe_free
    )
{
    *maybe_free = nullptr;
    LEAFENTRY new_le = mempool_malloc_and_update_dmt(new_size, maybe_free);
    toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
    klpair_struct* klp = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klp);
    invariant_zero(r);
    paranoid_invariant(klp != nullptr);
    // Old key length should be consistent with what is stored in the DMT
    invariant(keylen_from_klpair_len(klpair_len) == old_keylen);

    size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);
    paranoid_invariant(new_le_offset <= UINT32_MAX - new_size); // Not using > 4GB
    klp->le_offset = new_le_offset;

    paranoid_invariant(new_le == get_le_from_klpair(klp));
    *new_le_space = new_le;
}

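// Reserve space for inserting a brand-new leafentry at idx: allocate size bytes
// in the mempool and insert a klpair for (keyp, keylen) pointing at that space.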
void bn_data::get_space_for_insert(
    uint32_t idx,
    const void* keyp,
    uint32_t keylen,
    size_t size,
    LEAFENTRY* new_le_space,
    void **const maybe_free
    )
{
    add_key(keylen);

    *maybe_free = nullptr;
    LEAFENTRY new_le = mempool_malloc_and_update_dmt(size, maybe_free);
    size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);

    klpair_dmtwriter kl(keylen, new_le_offset, keyp);
    m_buffer.insert_at(kl, idx);

    *new_le_space = new_le;
}

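// Helper for split_klpairs: iterated over the source dmt, it copies each klpair
// (and its leafentry) into either the left or the right basement node depending
// on whether its index is below m_split_at.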
class split_klpairs_extra {
    bn_data *const m_left_bn;
    bn_data *const m_right_bn;
    klpair_dmt_t::builder *const m_left_builder;
    klpair_dmt_t::builder *const m_right_builder;
    struct mempool *const m_left_dest_mp;
    uint32_t m_split_at;

    struct mempool *left_dest_mp(void) const { return m_left_dest_mp; }
    struct mempool *right_dest_mp(void) const { return &m_right_bn->m_buffer_mempool; }

    void copy_klpair(const uint32_t klpair_len, const klpair_struct &klpair,
                     klpair_dmt_t::builder *const builder,
                     struct mempool *const dest_mp,
                     bn_data *const bn) {
        LEAFENTRY old_le = m_left_bn->get_le_from_klpair(&klpair);
        size_t le_size = leafentry_memsize(old_le);

        void *new_le = toku_mempool_malloc(dest_mp, le_size);
        paranoid_invariant_notnull(new_le);
        memcpy(new_le, old_le, le_size);
        size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(dest_mp, new_le);
        size_t keylen = keylen_from_klpair_len(klpair_len);
        builder->append(klpair_dmtwriter(keylen, le_offset, klpair.key));

        bn->add_key(keylen);
    }

    int move_leafentry(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx) {
        m_left_bn->remove_key(keylen_from_klpair_len(klpair_len));

        if (idx < m_split_at) {
            copy_klpair(klpair_len, klpair, m_left_builder, left_dest_mp(), m_left_bn);
        } else {
            copy_klpair(klpair_len, klpair, m_right_builder, right_dest_mp(), m_right_bn);
        }
        return 0;
    }

  public:
    split_klpairs_extra(bn_data *const left_bn, bn_data *const right_bn,
                        klpair_dmt_t::builder *const left_builder,
                        klpair_dmt_t::builder *const right_builder,
                        struct mempool *const left_new_mp,
                        uint32_t split_at)
        : m_left_bn(left_bn),
          m_right_bn(right_bn),
          m_left_builder(left_builder),
          m_right_builder(right_builder),
          m_left_dest_mp(left_new_mp),
          m_split_at(split_at) {}
    static int cb(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx, split_klpairs_extra *const thisp) {
        return thisp->move_leafentry(klpair_len, klpair, idx);
    }
};

void bn_data::split_klpairs(
    bn_data* right_bd,
    uint32_t split_at //lower bound inclusive for right_bd
    )
{
    // We use move_leafentries_to during a split, and the split algorithm should never call this
    // if it's splitting on a boundary, so there must be some leafentries in the range to move.
    paranoid_invariant(split_at < num_klpairs());

    right_bd->init_zero();

    size_t mpsize = toku_mempool_get_used_size(&m_buffer_mempool); // overkill, but safe

    struct mempool new_left_mp;
    toku_mempool_construct(&new_left_mp, mpsize);

    struct mempool *right_mp = &right_bd->m_buffer_mempool;
    toku_mempool_construct(right_mp, mpsize);

    klpair_dmt_t::builder left_dmt_builder;
    left_dmt_builder.create(split_at, m_disksize_of_keys); // overkill, but safe (builder will realloc at the end)

    klpair_dmt_t::builder right_dmt_builder;
    right_dmt_builder.create(num_klpairs() - split_at, m_disksize_of_keys); // overkill, but safe (builder will realloc at the end)

    split_klpairs_extra extra(this, right_bd, &left_dmt_builder, &right_dmt_builder, &new_left_mp, split_at);

    int r = m_buffer.iterate<split_klpairs_extra, split_klpairs_extra::cb>(&extra);
    invariant_zero(r);

    m_buffer.destroy();
    toku_mempool_destroy(&m_buffer_mempool);

    m_buffer_mempool = new_left_mp;

    left_dmt_builder.build(&m_buffer);
    right_dmt_builder.build(&right_bd->m_buffer);

    // Potentially shrink memory pool for destination.
    // We overallocated ("overkill") above
    struct mempool *const left_mp = &m_buffer_mempool;
    paranoid_invariant_zero(toku_mempool_get_frag_size(left_mp));
    toku_mempool_realloc_larger(left_mp, toku_mempool_get_used_size(left_mp));
    paranoid_invariant_zero(toku_mempool_get_frag_size(right_mp));
    toku_mempool_realloc_larger(right_mp, toku_mempool_get_used_size(right_mp));
}

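// Serialized size of this basement node: all keys (with their length prefixes)
// plus the bytes currently used for leafentries in the mempool.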
uint64_t bn_data::get_disk_size() {
    return m_disksize_of_keys +
           toku_mempool_get_used_size(&m_buffer_mempool);
}

struct verify_le_in_mempool_state {
    size_t offset_limit;
    class bn_data *bd;
};

static int verify_le_in_mempool (const uint32_t, klpair_struct *klpair, const uint32_t idx UU(), struct verify_le_in_mempool_state * const state) {
    invariant(klpair->le_offset < state->offset_limit);

    LEAFENTRY le = state->bd->get_le_from_klpair(klpair);
    uint32_t size = leafentry_memsize(le);

    size_t end_offset = klpair->le_offset + size;

    invariant(end_offset <= state->offset_limit);
    return 0;
}

//This is a debug-only (paranoid) verification.
//Verifies the dmt is valid, and all leafentries are entirely in the mempool's memory.
void bn_data::verify_mempool(void) {
    //Verify the dmt itself <- paranoid and slow
    m_buffer.verify();

    verify_le_in_mempool_state state = { .offset_limit = toku_mempool_get_offset_limit(&m_buffer_mempool), .bd = this };
    //Verify every leafentry pointed to by the keys in the dmt are fully inside the mempool
    m_buffer.iterate_ptr< decltype(state), verify_le_in_mempool >(&state);
}

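// Number of key/leafentry pairs stored in this basement node.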
uint32_t bn_data::num_klpairs(void) const {
    return m_buffer.size();
}

void bn_data::destroy(void) {
    // The buffer may have been freed already, in some cases.
    m_buffer.destroy();
    toku_mempool_destroy(&m_buffer_mempool);
    m_disksize_of_keys = 0;
}

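// Populate a freshly created basement node from parallel arrays of keys and
// leafentries (already in sorted order): copy each leafentry into a new mempool
// and build the dmt of klpairs pointing at the copies.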
void bn_data::set_contents_as_clone_of_sorted_array(
    uint32_t num_les,
    const void** old_key_ptrs,
    uint32_t* old_keylens,
    LEAFENTRY* old_les,
    size_t *le_sizes,
    size_t total_key_size,
    size_t total_le_size
    )
{
    //Enforce "just created" invariant.
    paranoid_invariant_zero(m_disksize_of_keys);
    paranoid_invariant_zero(num_klpairs());
    paranoid_invariant_null(toku_mempool_get_base(&m_buffer_mempool));
    paranoid_invariant_zero(toku_mempool_get_size(&m_buffer_mempool));

    toku_mempool_construct(&m_buffer_mempool, total_le_size);
    m_buffer.destroy();
    m_disksize_of_keys = 0;

    klpair_dmt_t::builder dmt_builder;
    dmt_builder.create(num_les, total_key_size);

    for (uint32_t idx = 0; idx < num_les; idx++) {
        void* new_le = toku_mempool_malloc(&m_buffer_mempool, le_sizes[idx]);
        paranoid_invariant_notnull(new_le);
        memcpy(new_le, old_les[idx], le_sizes[idx]);
        size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(&m_buffer_mempool, new_le);
        dmt_builder.append(klpair_dmtwriter(old_keylens[idx], le_offset, old_key_ptrs[idx]));
        add_key(old_keylens[idx]);
    }
    dmt_builder.build(&this->m_buffer);
}

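// Translate a klpair's stored mempool offset into a pointer to its leafentry.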
LEAFENTRY bn_data::get_le_from_klpair(const klpair_struct *klpair) const {
    void *ptr = toku_mempool_get_pointer_from_base_and_offset(&this->m_buffer_mempool, klpair->le_offset);
    LEAFENTRY CAST_FROM_VOIDP(le, ptr);
    return le;
}


// get info about a single leafentry by index
int bn_data::fetch_le(uint32_t idx, LEAFENTRY *le) {
    klpair_struct* klpair = nullptr;
    int r = m_buffer.fetch(idx, nullptr, &klpair);
    if (r == 0) {
        *le = get_le_from_klpair(klpair);
    }
    return r;
}

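// Fetch the leafentry, key pointer, and key length for the klpair at idx.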
int bn_data::fetch_klpair(uint32_t idx, LEAFENTRY *le, uint32_t *len, void** key) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *len = keylen_from_klpair_len(klpair_len);
        *key = klpair->key;
        *le = get_le_from_klpair(klpair);
    }
    return r;
}

int bn_data::fetch_klpair_disksize(uint32_t idx, size_t *size) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *size = klpair_disksize(klpair_len, klpair);
    }
    return r;
}

int bn_data::fetch_key_and_len(uint32_t idx, uint32_t *len, void** key) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *len = keylen_from_klpair_len(klpair_len);
        *key = klpair->key;
    }
    return r;
}

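// Deep-copy orig_bn_data into this: clone the mempool, clone the dmt, and copy
// the key-size bookkeeping.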
void bn_data::clone(bn_data* orig_bn_data) {
    toku_mempool_clone(&orig_bn_data->m_buffer_mempool, &m_buffer_mempool);
    m_buffer.clone(orig_bn_data->m_buffer);
    this->m_disksize_of_keys = orig_bn_data->m_disksize_of_keys;
}