1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <string.h>
40#include <db.h>
41
42#include <portability/memory.h>
43#include <limits.h>
44
45namespace toku {
46
47template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
48void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::create(void) {
49 toku_mempool_zero(&this->mp);
50 this->values_same_size = true;
51 this->value_length = 0;
52 this->is_array = true;
53 this->d.a.num_values = 0;
54 //TODO: maybe allocate enough space for something by default?
55 // We may be relying on not needing to allocate space the first time (due to limited time spent while a lock is held)
56}
57
58/**
59 * Note: create_from_sorted_memory_of_fixed_size_elements does not take ownership of 'mem'.
60 * Owner is still responsible for freeing it.
61 * While in the OMT a similar function would steal ownership, this doesn't make sense for the DMT because
62 * we (usually) have to add padding for alignment (mem has all of the elements PACKED).
63 * Also all current uses (as of Jan 12, 2014) of this function would require mallocing a new array
64 * in order to allow stealing.
65 */
66template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
67void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::create_from_sorted_memory_of_fixed_size_elements(
68 const void *mem,
69 const uint32_t numvalues,
70 const uint32_t mem_length,
71 const uint32_t fixed_value_length) {
72 this->values_same_size = true;
73 this->value_length = fixed_value_length;
74 this->is_array = true;
75 this->d.a.num_values = numvalues;
76 const uint8_t pad_bytes = get_fixed_length_alignment_overhead();
77 uint32_t aligned_memsize = mem_length + numvalues * pad_bytes;
78 toku_mempool_construct(&this->mp, aligned_memsize);
79 if (aligned_memsize > 0) {
80 paranoid_invariant(numvalues > 0);
81 void *ptr = toku_mempool_malloc(&this->mp, aligned_memsize);
82 paranoid_invariant_notnull(ptr);
83 uint8_t * const CAST_FROM_VOIDP(dest, ptr);
84 const uint8_t * const CAST_FROM_VOIDP(src, mem);
85 if (pad_bytes == 0) {
86 paranoid_invariant(aligned_memsize == mem_length);
87 memcpy(dest, src, aligned_memsize);
88 } else {
89 // TODO(leif): check what vectorizes best: multiplying like this or adding to offsets
90 const uint32_t fixed_len = this->value_length;
91 const uint32_t fixed_aligned_len = align(this->value_length);
92 paranoid_invariant(this->d.a.num_values*fixed_len == mem_length);
93 for (uint32_t i = 0; i < this->d.a.num_values; i++) {
94 memcpy(&dest[i*fixed_aligned_len], &src[i*fixed_len], fixed_len);
95 }
96 }
97 }
98}
99
100template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
101void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::clone(const dmt &src) {
102 *this = src;
103 toku_mempool_clone(&src.mp, &this->mp);
104}
105
106template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
107void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::clear(void) {
108 this->is_array = true;
109 this->d.a.num_values = 0;
110 this->values_same_size = true; // Reset state
111 this->value_length = 0;
112 //TODO(leif): Note that this can mess with our memory_footprint calculation (we may touch past what is marked as 'used' in the mempool)
113 // One 'fix' is for mempool to also track what was touched, and reset() shouldn't reset that, though realloc() might.
114 toku_mempool_reset(&this->mp);
115}
116
117template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
118void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::destroy(void) {
119 this->clear();
120 toku_mempool_destroy(&this->mp);
121}
122
123template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
124uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::size(void) const {
125 if (this->is_array) {
126 return this->d.a.num_values;
127 } else {
128 return this->nweight(this->d.t.root);
129 }
130}
131
132template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
133uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::nweight(const subtree &subtree) const {
134 if (subtree.is_null()) {
135 return 0;
136 } else {
137 const dmt_node & node = get_node(subtree);
138 return node.weight;
139 }
140}
141
142template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
143template<typename dmtcmp_t, int (*h)(const uint32_t size, const dmtdata_t &, const dmtcmp_t &)>
144int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert(const dmtwriter_t &value, const dmtcmp_t &v, uint32_t *const idx) {
145 int r;
146 uint32_t insert_idx;
147
148 r = this->find_zero<dmtcmp_t, h>(v, nullptr, nullptr, &insert_idx);
149 if (r==0) {
150 if (idx) *idx = insert_idx;
151 return DB_KEYEXIST;
152 }
153 if (r != DB_NOTFOUND) return r;
154
155 if ((r = this->insert_at(value, insert_idx))) return r;
156 if (idx) *idx = insert_idx;
157
158 return 0;
159}
160
161template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
162int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_at(const dmtwriter_t &value, const uint32_t idx) {
163 if (idx > this->size()) { return EINVAL; }
164
165 bool same_size = this->values_same_size && (this->size() == 0 || value.get_size() == this->value_length);
166 if (this->is_array) {
167 if (same_size && idx == this->d.a.num_values) {
168 return this->insert_at_array_end<true>(value);
169 }
170 this->convert_from_array_to_tree();
171 }
172 // Is a tree.
173 paranoid_invariant(!is_array);
174 if (!same_size) {
175 this->values_same_size = false;
176 this->value_length = 0;
177 }
178
179 this->maybe_resize_tree(&value);
180 subtree *rebalance_subtree = nullptr;
181 this->insert_internal(&this->d.t.root, value, idx, &rebalance_subtree);
182 if (rebalance_subtree != nullptr) {
183 this->rebalance(rebalance_subtree);
184 }
185 return 0;
186}
187
188template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
189template<bool with_resize>
190int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_at_array_end(const dmtwriter_t& value_in) {
191 paranoid_invariant(this->is_array);
192 paranoid_invariant(this->values_same_size);
193 if (this->d.a.num_values == 0) {
194 this->value_length = value_in.get_size();
195 }
196 paranoid_invariant(this->value_length == value_in.get_size());
197
198 if (with_resize) {
199 this->maybe_resize_array_for_insert();
200 }
201 dmtdata_t *dest = this->alloc_array_value_end();
202 value_in.write_to(dest);
203 return 0;
204}
205
206template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
207dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::alloc_array_value_end(void) {
208 paranoid_invariant(this->is_array);
209 paranoid_invariant(this->values_same_size);
210 this->d.a.num_values++;
211
212 void *ptr = toku_mempool_malloc(&this->mp, align(this->value_length));
213 paranoid_invariant_notnull(ptr);
214 paranoid_invariant(reinterpret_cast<size_t>(ptr) % ALIGNMENT == 0);
215 dmtdata_t *CAST_FROM_VOIDP(n, ptr);
216 paranoid_invariant(n == get_array_value(this->d.a.num_values - 1));
217 return n;
218}
219
220template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
221dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_array_value(const uint32_t idx) const {
222 paranoid_invariant(this->is_array);
223 paranoid_invariant(this->values_same_size);
224
225 paranoid_invariant(idx < this->d.a.num_values);
226 return get_array_value_internal(&this->mp, idx);
227}
228
229template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
230dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_array_value_internal(const struct mempool *mempool, const uint32_t idx) const {
231 void* ptr = toku_mempool_get_pointer_from_base_and_offset(mempool, idx * align(this->value_length));
232 dmtdata_t *CAST_FROM_VOIDP(value, ptr);
233 return value;
234}
235
236//TODO(leif) write microbenchmarks to compare growth factor. Note: growth factor here is actually 2.5 because of mempool_construct
237template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
238void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::maybe_resize_array_for_insert(void) {
239 bool space_available = toku_mempool_get_free_size(&this->mp) >= align(this->value_length);
240
241 if (!space_available) {
242 const uint32_t n = this->d.a.num_values + 1;
243 const uint32_t new_n = n <=2 ? 4 : 2*n;
244 const uint32_t new_space = align(this->value_length) * new_n;
245
246 struct mempool new_kvspace;
247 toku_mempool_construct(&new_kvspace, new_space);
248 size_t copy_bytes = this->d.a.num_values * align(this->value_length);
249 invariant(copy_bytes + align(this->value_length) <= new_space);
250 paranoid_invariant(copy_bytes <= toku_mempool_get_used_size(&this->mp));
251 // Copy over to new mempool
252 if (this->d.a.num_values > 0) {
253 void* dest = toku_mempool_malloc(&new_kvspace, copy_bytes);
254 invariant(dest!=nullptr);
255 memcpy(dest, get_array_value(0), copy_bytes);
256 }
257 toku_mempool_destroy(&this->mp);
258 this->mp = new_kvspace;
259 }
260}
261
262template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
263uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::align(const uint32_t x) const {
264 return roundup_to_multiple(ALIGNMENT, x);
265}
266
267template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
268void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::prepare_for_serialize(void) {
269 if (!this->is_array) {
270 this->convert_from_tree_to_array();
271 }
272}
273
274template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
275void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::convert_from_tree_to_array(void) {
276 paranoid_invariant(!this->is_array);
277 paranoid_invariant(this->values_same_size);
278
279 const uint32_t num_values = this->size();
280
281 node_offset *tmp_array;
282 bool malloced = false;
283 tmp_array = alloc_temp_node_offsets(num_values);
284 if (!tmp_array) {
285 malloced = true;
286 XMALLOC_N(num_values, tmp_array);
287 }
288 this->fill_array_with_subtree_offsets(tmp_array, this->d.t.root);
289
290 struct mempool new_mp;
291 const uint32_t fixed_len = this->value_length;
292 const uint32_t fixed_aligned_len = align(this->value_length);
293 size_t mem_needed = num_values * fixed_aligned_len;
294 toku_mempool_construct(&new_mp, mem_needed);
295 uint8_t* CAST_FROM_VOIDP(dest, toku_mempool_malloc(&new_mp, mem_needed));
296 paranoid_invariant_notnull(dest);
297 for (uint32_t i = 0; i < num_values; i++) {
298 const dmt_node &n = get_node(tmp_array[i]);
299 memcpy(&dest[i*fixed_aligned_len], &n.value, fixed_len);
300 }
301 toku_mempool_destroy(&this->mp);
302 this->mp = new_mp;
303 this->is_array = true;
304 this->d.a.num_values = num_values;
305
306 if (malloced) toku_free(tmp_array);
307}
308
309template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
310void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::convert_from_array_to_tree(void) {
311 paranoid_invariant(this->is_array);
312 paranoid_invariant(this->values_same_size);
313
314 //save array-format information to locals
315 const uint32_t num_values = this->d.a.num_values;
316
317 node_offset *tmp_array;
318 bool malloced = false;
319 tmp_array = alloc_temp_node_offsets(num_values);
320 if (!tmp_array) {
321 malloced = true;
322 XMALLOC_N(num_values, tmp_array);
323 }
324
325 struct mempool old_mp = this->mp;
326 size_t mem_needed = num_values * align(this->value_length + __builtin_offsetof(dmt_node, value));
327 toku_mempool_construct(&this->mp, mem_needed);
328
329 for (uint32_t i = 0; i < num_values; i++) {
330 dmtwriter_t writer(this->value_length, get_array_value_internal(&old_mp, i));
331 tmp_array[i] = node_malloc_and_set_value(writer);
332 }
333 this->is_array = false;
334 this->rebuild_subtree_from_offsets(&this->d.t.root, tmp_array, num_values);
335
336 if (malloced) toku_free(tmp_array);
337 toku_mempool_destroy(&old_mp);
338}
339
340template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
341int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::delete_at(const uint32_t idx) {
342 uint32_t n = this->size();
343 if (idx >= n) { return EINVAL; }
344
345 if (n == 1) {
346 this->clear(); //Emptying out the entire dmt.
347 return 0;
348 }
349 if (this->is_array) {
350 this->convert_from_array_to_tree();
351 }
352 paranoid_invariant(!is_array);
353
354 subtree *rebalance_subtree = nullptr;
355 this->delete_internal(&this->d.t.root, idx, nullptr, &rebalance_subtree);
356 if (rebalance_subtree != nullptr) {
357 this->rebalance(rebalance_subtree);
358 }
359 this->maybe_resize_tree(nullptr);
360 return 0;
361}
362
363template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
364template<typename iterate_extra_t,
365 int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
366int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate(iterate_extra_t *const iterate_extra) const {
367 return this->iterate_on_range<iterate_extra_t, f>(0, this->size(), iterate_extra);
368}
369
370template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
371template<typename iterate_extra_t,
372 int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
373int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_on_range(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) const {
374 if (right > this->size()) { return EINVAL; }
375 if (left == right) { return 0; }
376 if (this->is_array) {
377 return this->iterate_internal_array<iterate_extra_t, f>(left, right, iterate_extra);
378 }
379 return this->iterate_internal<iterate_extra_t, f>(left, right, this->d.t.root, 0, iterate_extra);
380}
381
382template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
383void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::verify(void) const {
384 uint32_t num_values = this->size();
385 invariant(num_values < UINT32_MAX);
386 size_t pool_used = toku_mempool_get_used_size(&this->mp);
387 size_t pool_size = toku_mempool_get_size(&this->mp);
388 size_t pool_frag = toku_mempool_get_frag_size(&this->mp);
389 invariant(pool_used <= pool_size);
390 if (this->is_array) {
391 invariant(this->values_same_size);
392 invariant(num_values == this->d.a.num_values);
393
394 // We know exactly how much memory should be used.
395 invariant(pool_used == num_values * align(this->value_length));
396
397 // Array form must have 0 fragmentation in mempool.
398 invariant(pool_frag == 0);
399 } else {
400 if (this->values_same_size) {
401 // We know exactly how much memory should be used.
402 invariant(pool_used == num_values * align(this->value_length + __builtin_offsetof(dmt_node, value)));
403 } else {
404 // We can only do a lower bound on memory usage.
405 invariant(pool_used >= num_values * __builtin_offsetof(dmt_node, value));
406 }
407 std::vector<bool> touched(pool_size, false);
408 verify_internal(this->d.t.root, &touched);
409 size_t bytes_used = 0;
410 for (size_t i = 0; i < pool_size; i++) {
411 if (touched.at(i)) {
412 ++bytes_used;
413 }
414 }
415 invariant(bytes_used == pool_used);
416 }
417}
418
419// Verifies all weights are internally consistent.
420template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
421void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::verify_internal(const subtree &subtree, std::vector<bool> *touched) const {
422 if (subtree.is_null()) {
423 return;
424 }
425 const dmt_node &node = get_node(subtree);
426
427 if (this->values_same_size) {
428 invariant(node.value_length == this->value_length);
429 }
430
431 size_t offset = toku_mempool_get_offset_from_pointer_and_base(&this->mp, &node);
432 size_t node_size = align(__builtin_offsetof(dmt_node, value) + node.value_length);
433 invariant(offset <= touched->size());
434 invariant(offset+node_size <= touched->size());
435 invariant(offset % ALIGNMENT == 0);
436 // Mark memory as touched and never allocated to multiple nodes.
437 for (size_t i = offset; i < offset+node_size; ++i) {
438 invariant(!touched->at(i));
439 touched->at(i) = true;
440 }
441
442 const uint32_t leftweight = this->nweight(node.left);
443 const uint32_t rightweight = this->nweight(node.right);
444
445 invariant(leftweight + rightweight + 1 == this->nweight(subtree));
446 verify_internal(node.left, touched);
447 verify_internal(node.right, touched);
448}
449
450template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
451template<typename iterate_extra_t,
452 int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
453void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr(iterate_extra_t *const iterate_extra) {
454 if (this->is_array) {
455 this->iterate_ptr_internal_array<iterate_extra_t, f>(0, this->size(), iterate_extra);
456 } else {
457 this->iterate_ptr_internal<iterate_extra_t, f>(0, this->size(), this->d.t.root, 0, iterate_extra);
458 }
459}
460
461template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
462int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch(const uint32_t idx, uint32_t *const value_len, dmtdataout_t *const value) const {
463 if (idx >= this->size()) { return EINVAL; }
464 if (this->is_array) {
465 this->fetch_internal_array(idx, value_len, value);
466 } else {
467 this->fetch_internal(this->d.t.root, idx, value_len, value);
468 }
469 return 0;
470}
471
472template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
473template<typename dmtcmp_t,
474 int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
475int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_zero(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
476 uint32_t tmp_index;
477 uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
478 int r;
479 if (this->is_array) {
480 r = this->find_internal_zero_array<dmtcmp_t, h>(extra, value_len, value, child_idxp);
481 }
482 else {
483 r = this->find_internal_zero<dmtcmp_t, h>(this->d.t.root, extra, value_len, value, child_idxp);
484 }
485 return r;
486}
487
488template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
489template<typename dmtcmp_t,
490 int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
491int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find(const dmtcmp_t &extra, int direction, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
492 uint32_t tmp_index;
493 uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
494 paranoid_invariant(direction != 0);
495 if (direction < 0) {
496 if (this->is_array) {
497 return this->find_internal_minus_array<dmtcmp_t, h>(extra, value_len, value, child_idxp);
498 } else {
499 return this->find_internal_minus<dmtcmp_t, h>(this->d.t.root, extra, value_len, value, child_idxp);
500 }
501 } else {
502 if (this->is_array) {
503 return this->find_internal_plus_array<dmtcmp_t, h>(extra, value_len, value, child_idxp);
504 } else {
505 return this->find_internal_plus<dmtcmp_t, h>(this->d.t.root, extra, value_len, value, child_idxp);
506 }
507 }
508}
509
510template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
511size_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::memory_size(void) {
512 return (sizeof *this) + toku_mempool_get_size(&this->mp);
513}
514
515template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
516dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_node(const subtree &subtree) const {
517 paranoid_invariant(!subtree.is_null());
518 return get_node(subtree.get_offset());
519}
520
521template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
522dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_node(const node_offset offset) const {
523 void* ptr = toku_mempool_get_pointer_from_base_and_offset(&this->mp, offset);
524 dmt_node *CAST_FROM_VOIDP(node, ptr);
525 return *node;
526}
527
528template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
529void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_set_value(dmt_node * n, const dmtwriter_t &value) {
530 n->value_length = value.get_size();
531 value.write_to(&n->value);
532}
533
534template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
535node_offset dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_malloc_and_set_value(const dmtwriter_t &value) {
536 size_t val_size = value.get_size();
537 size_t size_to_alloc = __builtin_offsetof(dmt_node, value) + val_size;
538 size_to_alloc = align(size_to_alloc);
539 void* np = toku_mempool_malloc(&this->mp, size_to_alloc);
540 paranoid_invariant_notnull(np);
541 dmt_node *CAST_FROM_VOIDP(n, np);
542 node_set_value(n, value);
543
544 return toku_mempool_get_offset_from_pointer_and_base(&this->mp, np);
545}
546
547template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
548void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_free(const subtree &st) {
549 dmt_node &n = get_node(st);
550 size_t size_to_free = __builtin_offsetof(dmt_node, value) + n.value_length;
551 size_to_free = align(size_to_free);
552 toku_mempool_mfree(&this->mp, &n, size_to_free);
553}
554
555template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
556void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::maybe_resize_tree(const dmtwriter_t * value) {
557 const ssize_t curr_capacity = toku_mempool_get_size(&this->mp);
558 const ssize_t curr_free = toku_mempool_get_free_size(&this->mp);
559 const ssize_t curr_used = toku_mempool_get_used_size(&this->mp);
560 ssize_t add_size = 0;
561 if (value) {
562 add_size = __builtin_offsetof(dmt_node, value) + value->get_size();
563 add_size = align(add_size);
564 }
565
566 const ssize_t need_size = curr_used + add_size;
567 paranoid_invariant(need_size <= UINT32_MAX);
568 //TODO(leif) consider different growth rates
569 const ssize_t new_size = 2*need_size;
570 paranoid_invariant(new_size <= UINT32_MAX);
571
572 if ((curr_capacity / 2 >= new_size) || // Way too much allocated
573 (curr_free < add_size)) { // No room in mempool
574 // Copy all memory and reconstruct dmt in new mempool.
575 if (curr_free < add_size && toku_mempool_get_frag_size(&this->mp) == 0) {
576 // TODO(yoni) or TODO(leif) consider doing this not just when frag size is zero, but also when it is a small percentage of the total mempool size
577 // Offsets remain the same in the new mempool so we can just realloc.
578 toku_mempool_realloc_larger(&this->mp, new_size);
579 } else if (!this->d.t.root.is_null()) {
580 struct mempool new_kvspace;
581 toku_mempool_construct(&new_kvspace, new_size);
582
583 const dmt_node &n = get_node(this->d.t.root);
584 node_offset *tmp_array;
585 bool malloced = false;
586 tmp_array = alloc_temp_node_offsets(n.weight);
587 if (!tmp_array) {
588 malloced = true;
589 XMALLOC_N(n.weight, tmp_array);
590 }
591 this->fill_array_with_subtree_offsets(tmp_array, this->d.t.root);
592 for (node_offset i = 0; i < n.weight; i++) {
593 dmt_node &node = get_node(tmp_array[i]);
594 const size_t bytes_to_copy = __builtin_offsetof(dmt_node, value) + node.value_length;
595 const size_t bytes_to_alloc = align(bytes_to_copy);
596 void* newdata = toku_mempool_malloc(&new_kvspace, bytes_to_alloc);
597 memcpy(newdata, &node, bytes_to_copy);
598 tmp_array[i] = toku_mempool_get_offset_from_pointer_and_base(&new_kvspace, newdata);
599 }
600
601 struct mempool old_kvspace = this->mp;
602 this->mp = new_kvspace;
603 this->rebuild_subtree_from_offsets(&this->d.t.root, tmp_array, n.weight);
604 if (malloced) toku_free(tmp_array);
605 toku_mempool_destroy(&old_kvspace);
606 } else {
607 toku_mempool_destroy(&this->mp);
608 toku_mempool_construct(&this->mp, new_size);
609 }
610 }
611}
612
613template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
614bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::will_need_rebalance(const subtree &subtree, const int leftmod, const int rightmod) const {
615 if (subtree.is_null()) { return false; }
616 const dmt_node &n = get_node(subtree);
617 // one of the 1's is for the root.
618 // the other is to take ceil(n/2)
619 const uint32_t weight_left = this->nweight(n.left) + leftmod;
620 const uint32_t weight_right = this->nweight(n.right) + rightmod;
621 return ((1+weight_left < (1+1+weight_right)/2)
622 ||
623 (1+weight_right < (1+1+weight_left)/2));
624}
625
626template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
627void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_internal(subtree *const subtreep, const dmtwriter_t &value, const uint32_t idx, subtree **const rebalance_subtree) {
628 if (subtreep->is_null()) {
629 paranoid_invariant_zero(idx);
630 const node_offset newoffset = this->node_malloc_and_set_value(value);
631 dmt_node &newnode = get_node(newoffset);
632 newnode.weight = 1;
633 newnode.left.set_to_null();
634 newnode.right.set_to_null();
635 subtreep->set_offset(newoffset);
636 } else {
637 dmt_node &n = get_node(*subtreep);
638 n.weight++;
639 if (idx <= this->nweight(n.left)) {
640 if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 1, 0)) {
641 *rebalance_subtree = subtreep;
642 }
643 this->insert_internal(&n.left, value, idx, rebalance_subtree);
644 } else {
645 if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, 1)) {
646 *rebalance_subtree = subtreep;
647 }
648 const uint32_t sub_index = idx - this->nweight(n.left) - 1;
649 this->insert_internal(&n.right, value, sub_index, rebalance_subtree);
650 }
651 }
652}
653
654template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
655void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::delete_internal(subtree *const subtreep, const uint32_t idx, subtree *const subtree_replace, subtree **const rebalance_subtree) {
656 paranoid_invariant_notnull(subtreep);
657 paranoid_invariant_notnull(rebalance_subtree);
658 paranoid_invariant(!subtreep->is_null());
659 dmt_node &n = get_node(*subtreep);
660 const uint32_t leftweight = this->nweight(n.left);
661 if (idx < leftweight) {
662 n.weight--;
663 if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, -1, 0)) {
664 *rebalance_subtree = subtreep;
665 }
666 this->delete_internal(&n.left, idx, subtree_replace, rebalance_subtree);
667 } else if (idx == leftweight) {
668 // Found the correct index.
669 if (n.left.is_null()) {
670 paranoid_invariant_zero(idx);
671 // Delete n and let parent point to n.right
672 subtree ptr_this = *subtreep;
673 *subtreep = n.right;
674 subtree to_free;
675 if (subtree_replace != nullptr) {
676 // Swap self with the other node. Taking over all responsibility.
677 to_free = *subtree_replace;
678 dmt_node &ancestor = get_node(*subtree_replace);
679 if (*rebalance_subtree == &ancestor.right) {
680 // Take over rebalance responsibility.
681 *rebalance_subtree = &n.right;
682 }
683 n.weight = ancestor.weight;
684 n.left = ancestor.left;
685 n.right = ancestor.right;
686 *subtree_replace = ptr_this;
687 } else {
688 to_free = ptr_this;
689 }
690 this->node_free(to_free);
691 } else if (n.right.is_null()) {
692 // Delete n and let parent point to n.left
693 subtree to_free = *subtreep;
694 *subtreep = n.left;
695 paranoid_invariant(idx>0);
696 paranoid_invariant_null(subtree_replace); // To be recursive, we're looking for index 0. n is index > 0 here.
697 this->node_free(to_free);
698 } else {
699 if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, -1)) {
700 *rebalance_subtree = subtreep;
701 }
702 // don't need to copy up value, it's only used by this
703 // next call, and when that gets to the bottom there
704 // won't be any more recursion
705 n.weight--;
706 this->delete_internal(&n.right, 0, subtreep, rebalance_subtree);
707 }
708 } else {
709 n.weight--;
710 if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, -1)) {
711 *rebalance_subtree = subtreep;
712 }
713 this->delete_internal(&n.right, idx - leftweight - 1, subtree_replace, rebalance_subtree);
714 }
715}
716
717template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
718template<typename iterate_extra_t,
719 int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
720int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_internal_array(const uint32_t left, const uint32_t right,
721 iterate_extra_t *const iterate_extra) const {
722 int r;
723 for (uint32_t i = left; i < right; ++i) {
724 r = f(this->value_length, *get_array_value(i), i, iterate_extra);
725 if (r != 0) {
726 return r;
727 }
728 }
729 return 0;
730}
731
732template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
733template<typename iterate_extra_t,
734 int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
735void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr_internal(const uint32_t left, const uint32_t right,
736 const subtree &subtree, const uint32_t idx,
737 iterate_extra_t *const iterate_extra) {
738 if (!subtree.is_null()) {
739 dmt_node &n = get_node(subtree);
740 const uint32_t idx_root = idx + this->nweight(n.left);
741 if (left < idx_root) {
742 this->iterate_ptr_internal<iterate_extra_t, f>(left, right, n.left, idx, iterate_extra);
743 }
744 if (left <= idx_root && idx_root < right) {
745 int r = f(n.value_length, &n.value, idx_root, iterate_extra);
746 lazy_assert_zero(r);
747 }
748 if (idx_root + 1 < right) {
749 this->iterate_ptr_internal<iterate_extra_t, f>(left, right, n.right, idx_root + 1, iterate_extra);
750 }
751 }
752}
753
754template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
755template<typename iterate_extra_t,
756 int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
757void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr_internal_array(const uint32_t left, const uint32_t right,
758 iterate_extra_t *const iterate_extra) {
759 for (uint32_t i = left; i < right; ++i) {
760 int r = f(this->value_length, get_array_value(i), i, iterate_extra);
761 lazy_assert_zero(r);
762 }
763}
764
765template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
766template<typename iterate_extra_t,
767 int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
768int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_internal(const uint32_t left, const uint32_t right,
769 const subtree &subtree, const uint32_t idx,
770 iterate_extra_t *const iterate_extra) const {
771 if (subtree.is_null()) { return 0; }
772 int r;
773 const dmt_node &n = get_node(subtree);
774 const uint32_t idx_root = idx + this->nweight(n.left);
775 if (left < idx_root) {
776 r = this->iterate_internal<iterate_extra_t, f>(left, right, n.left, idx, iterate_extra);
777 if (r != 0) { return r; }
778 }
779 if (left <= idx_root && idx_root < right) {
780 r = f(n.value_length, n.value, idx_root, iterate_extra);
781 if (r != 0) { return r; }
782 }
783 if (idx_root + 1 < right) {
784 return this->iterate_internal<iterate_extra_t, f>(left, right, n.right, idx_root + 1, iterate_extra);
785 }
786 return 0;
787}
788
789template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
790void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch_internal_array(const uint32_t i, uint32_t *const value_len, dmtdataout_t *const value) const {
791 copyout(value_len, value, this->value_length, get_array_value(i));
792}
793
794template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
795void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch_internal(const subtree &subtree, const uint32_t i, uint32_t *const value_len, dmtdataout_t *const value) const {
796 dmt_node &n = get_node(subtree);
797 const uint32_t leftweight = this->nweight(n.left);
798 if (i < leftweight) {
799 this->fetch_internal(n.left, i, value_len, value);
800 } else if (i == leftweight) {
801 copyout(value_len, value, &n);
802 } else {
803 this->fetch_internal(n.right, i - leftweight - 1, value_len, value);
804 }
805}
806
807template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
808void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fill_array_with_subtree_offsets(node_offset *const array, const subtree &subtree) const {
809 if (!subtree.is_null()) {
810 const dmt_node &tree = get_node(subtree);
811 this->fill_array_with_subtree_offsets(&array[0], tree.left);
812 array[this->nweight(tree.left)] = subtree.get_offset();
813 this->fill_array_with_subtree_offsets(&array[this->nweight(tree.left) + 1], tree.right);
814 }
815}
816
817template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
818void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::rebuild_subtree_from_offsets(subtree *const subtree, const node_offset *const offsets, const uint32_t numvalues) {
819 if (numvalues==0) {
820 subtree->set_to_null();
821 } else {
822 uint32_t halfway = numvalues/2;
823 subtree->set_offset(offsets[halfway]);
824 dmt_node &newnode = get_node(offsets[halfway]);
825 newnode.weight = numvalues;
826 // value is already in there.
827 this->rebuild_subtree_from_offsets(&newnode.left, &offsets[0], halfway);
828 this->rebuild_subtree_from_offsets(&newnode.right, &offsets[halfway+1], numvalues-(halfway+1));
829 }
830}
831
832//TODO(leif): Note that this can mess with our memory_footprint calculation (we may touch past what is marked as 'used' in the mempool)
833template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
834node_offset* dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::alloc_temp_node_offsets(uint32_t num_offsets) {
835 size_t mem_needed = num_offsets * sizeof(node_offset);
836 size_t mem_free;
837 mem_free = toku_mempool_get_free_size(&this->mp);
838 node_offset* CAST_FROM_VOIDP(tmp, toku_mempool_get_next_free_ptr(&this->mp));
839 if (mem_free >= mem_needed) {
840 return tmp;
841 }
842 return nullptr;
843}
844
845template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
846void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::rebalance(subtree *const subtree) {
847 paranoid_invariant(!subtree->is_null());
848
849 // There is a possible "optimization" here:
850 // if (this->values_same_size && subtree == &this->d.t.root) {
851 // this->convert_from_tree_to_array();
852 // return;
853 // }
854 // but we don't want to do it because it involves actually copying values around
855 // as opposed to stopping in the middle of rebalancing (like in the OMT)
856
857 node_offset offset = subtree->get_offset();
858 const dmt_node &n = get_node(offset);
859 node_offset *tmp_array;
860 bool malloced = false;
861 tmp_array = alloc_temp_node_offsets(n.weight);
862 if (!tmp_array) {
863 malloced = true;
864 XMALLOC_N(n.weight, tmp_array);
865 }
866 this->fill_array_with_subtree_offsets(tmp_array, *subtree);
867 this->rebuild_subtree_from_offsets(subtree, tmp_array, n.weight);
868 if (malloced) toku_free(tmp_array);
869}
870
871template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
872void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t *const out, const dmt_node *const n) {
873 if (outlen) {
874 *outlen = n->value_length;
875 }
876 if (out) {
877 *out = n->value;
878 }
879}
880
881template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
882void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t **const out, dmt_node *const n) {
883 if (outlen) {
884 *outlen = n->value_length;
885 }
886 if (out) {
887 *out = &n->value;
888 }
889}
890
891template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
892void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t *const out, const uint32_t len, const dmtdata_t *const stored_value_ptr) {
893 if (outlen) {
894 *outlen = len;
895 }
896 if (out) {
897 *out = *stored_value_ptr;
898 }
899}
900
901template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
902void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t **const out, const uint32_t len, dmtdata_t *const stored_value_ptr) {
903 if (outlen) {
904 *outlen = len;
905 }
906 if (out) {
907 *out = stored_value_ptr;
908 }
909}
910
911template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
912template<typename dmtcmp_t,
913 int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
914int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_zero_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
915 paranoid_invariant_notnull(idxp);
916 uint32_t min = 0;
917 uint32_t limit = this->d.a.num_values;
918 uint32_t best_pos = subtree::NODE_NULL;
919 uint32_t best_zero = subtree::NODE_NULL;
920
921 while (min!=limit) {
922 uint32_t mid = (min + limit) / 2;
923 int hv = h(this->value_length, *get_array_value(mid), extra);
924 if (hv<0) {
925 min = mid+1;
926 }
927 else if (hv>0) {
928 best_pos = mid;
929 limit = mid;
930 }
931 else {
932 best_zero = mid;
933 limit = mid;
934 }
935 }
936 if (best_zero!=subtree::NODE_NULL) {
937 //Found a zero
938 copyout(value_len, value, this->value_length, get_array_value(best_zero));
939 *idxp = best_zero;
940 return 0;
941 }
942 if (best_pos!=subtree::NODE_NULL) *idxp = best_pos;
943 else *idxp = this->d.a.num_values;
944 return DB_NOTFOUND;
945}
946
947template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
948template<typename dmtcmp_t,
949 int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
950int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_zero(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
951 paranoid_invariant_notnull(idxp);
952 if (subtree.is_null()) {
953 *idxp = 0;
954 return DB_NOTFOUND;
955 }
956 dmt_node &n = get_node(subtree);
957 int hv = h(n.value_length, n.value, extra);
958 if (hv<0) {
959 int r = this->find_internal_zero<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
960 *idxp += this->nweight(n.left)+1;
961 return r;
962 } else if (hv>0) {
963 return this->find_internal_zero<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
964 } else {
965 int r = this->find_internal_zero<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
966 if (r==DB_NOTFOUND) {
967 *idxp = this->nweight(n.left);
968 copyout(value_len, value, &n);
969 r = 0;
970 }
971 return r;
972 }
973}
974
975template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
976template<typename dmtcmp_t,
977 int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
978int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_plus_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
979 paranoid_invariant_notnull(idxp);
980 uint32_t min = 0;
981 uint32_t limit = this->d.a.num_values;
982 uint32_t best = subtree::NODE_NULL;
983
984 while (min != limit) {
985 const uint32_t mid = (min + limit) / 2;
986 const int hv = h(this->value_length, *get_array_value(mid), extra);
987 if (hv > 0) {
988 best = mid;
989 limit = mid;
990 } else {
991 min = mid + 1;
992 }
993 }
994 if (best == subtree::NODE_NULL) { return DB_NOTFOUND; }
995 copyout(value_len, value, this->value_length, get_array_value(best));
996 *idxp = best;
997 return 0;
998}
999
1000template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1001template<typename dmtcmp_t,
1002 int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
1003int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_plus(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
1004 paranoid_invariant_notnull(idxp);
1005 if (subtree.is_null()) {
1006 return DB_NOTFOUND;
1007 }
1008 dmt_node & n = get_node(subtree);
1009 int hv = h(n.value_length, n.value, extra);
1010 int r;
1011 if (hv > 0) {
1012 r = this->find_internal_plus<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
1013 if (r == DB_NOTFOUND) {
1014 *idxp = this->nweight(n.left);
1015 copyout(value_len, value, &n);
1016 r = 0;
1017 }
1018 } else {
1019 r = this->find_internal_plus<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
1020 if (r == 0) {
1021 *idxp += this->nweight(n.left) + 1;
1022 }
1023 }
1024 return r;
1025}
1026
1027template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1028template<typename dmtcmp_t,
1029 int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
1030int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_minus_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
1031 paranoid_invariant_notnull(idxp);
1032 uint32_t min = 0;
1033 uint32_t limit = this->d.a.num_values;
1034 uint32_t best = subtree::NODE_NULL;
1035
1036 while (min != limit) {
1037 const uint32_t mid = (min + limit) / 2;
1038 const int hv = h(this->value_length, *get_array_value(mid), extra);
1039 if (hv < 0) {
1040 best = mid;
1041 min = mid + 1;
1042 } else {
1043 limit = mid;
1044 }
1045 }
1046 if (best == subtree::NODE_NULL) { return DB_NOTFOUND; }
1047 copyout(value_len, value, this->value_length, get_array_value(best));
1048 *idxp = best;
1049 return 0;
1050}
1051
1052template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1053template<typename dmtcmp_t,
1054 int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
1055int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_minus(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
1056 paranoid_invariant_notnull(idxp);
1057 if (subtree.is_null()) {
1058 return DB_NOTFOUND;
1059 }
1060 dmt_node & n = get_node(subtree);
1061 int hv = h(n.value_length, n.value, extra);
1062 if (hv < 0) {
1063 int r = this->find_internal_minus<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
1064 if (r == 0) {
1065 *idxp += this->nweight(n.left) + 1;
1066 } else if (r == DB_NOTFOUND) {
1067 *idxp = this->nweight(n.left);
1068 copyout(value_len, value, &n);
1069 r = 0;
1070 }
1071 return r;
1072 } else {
1073 return this->find_internal_minus<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
1074 }
1075}
1076
1077template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1078uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_fixed_length(void) const {
1079 return this->values_same_size ? this->value_length : 0;
1080}
1081
1082template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1083uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_fixed_length_alignment_overhead(void) const {
1084 return this->values_same_size ? align(this->value_length) - this->value_length : 0;
1085}
1086
1087template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1088bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::value_length_is_fixed(void) const {
1089 return this->values_same_size;
1090}
1091
1092template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1093void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::serialize_values(uint32_t expected_unpadded_memory, struct wbuf *wb) const {
1094 invariant(this->is_array);
1095 invariant(this->values_same_size);
1096 const uint8_t pad_bytes = get_fixed_length_alignment_overhead();
1097 const uint32_t fixed_len = this->value_length;
1098 const uint32_t fixed_aligned_len = align(this->value_length);
1099 paranoid_invariant(expected_unpadded_memory == this->d.a.num_values * this->value_length);
1100 paranoid_invariant(toku_mempool_get_used_size(&this->mp) >=
1101 expected_unpadded_memory + pad_bytes * this->d.a.num_values);
1102 if (this->d.a.num_values == 0) {
1103 // Nothing to serialize
1104 } else if (pad_bytes == 0) {
1105 // Basically a memcpy
1106 wbuf_nocrc_literal_bytes(wb, get_array_value(0), expected_unpadded_memory);
1107 } else {
1108 uint8_t* const dest = wbuf_nocrc_reserve_literal_bytes(wb, expected_unpadded_memory);
1109 const uint8_t* const src = reinterpret_cast<uint8_t*>(get_array_value(0));
1110 //TODO(leif) maybe look at vectorization here
1111 for (uint32_t i = 0; i < this->d.a.num_values; i++) {
1112 memcpy(&dest[i*fixed_len], &src[i*fixed_aligned_len], fixed_len);
1113 }
1114 }
1115}
1116
1117template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1118void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::create(uint32_t _max_values, uint32_t _max_value_bytes) {
1119 this->max_values = _max_values;
1120 this->max_value_bytes = _max_value_bytes;
1121 this->temp.create();
1122 paranoid_invariant_null(toku_mempool_get_base(&this->temp.mp));
1123 this->temp_valid = true;
1124 this->sorted_node_offsets = nullptr;
1125 // Include enough space for alignment padding
1126 size_t initial_space = (ALIGNMENT - 1) * _max_values + _max_value_bytes;
1127
1128 toku_mempool_construct(&this->temp.mp, initial_space); // Adds 25%
1129}
1130
1131template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1132void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::append(const dmtwriter_t &value) {
1133 paranoid_invariant(this->temp_valid);
1134 //NOTE: Always use d.a.num_values for size because we have not yet created root.
1135 if (this->temp.values_same_size && (this->temp.d.a.num_values == 0 || value.get_size() == this->temp.value_length)) {
1136 temp.insert_at_array_end<false>(value);
1137 return;
1138 }
1139 if (this->temp.is_array) {
1140 // Convert to tree format (without weights and linkage)
1141 XMALLOC_N(this->max_values, this->sorted_node_offsets);
1142
1143 // Include enough space for alignment padding
1144 size_t mem_needed = (ALIGNMENT - 1 + __builtin_offsetof(dmt_node, value)) * max_values + max_value_bytes;
1145 struct mempool old_mp = this->temp.mp;
1146
1147 const uint32_t num_values = this->temp.d.a.num_values;
1148 toku_mempool_construct(&this->temp.mp, mem_needed);
1149
1150 // Copy over and get node_offsets
1151 for (uint32_t i = 0; i < num_values; i++) {
1152 dmtwriter_t writer(this->temp.value_length, this->temp.get_array_value_internal(&old_mp, i));
1153 this->sorted_node_offsets[i] = this->temp.node_malloc_and_set_value(writer);
1154 }
1155 this->temp.is_array = false;
1156 this->temp.values_same_size = false;
1157 this->temp.value_length = 0;
1158 toku_mempool_destroy(&old_mp);
1159 }
1160 paranoid_invariant(!this->temp.is_array);
1161 this->sorted_node_offsets[this->temp.d.a.num_values++] = this->temp.node_malloc_and_set_value(value);
1162}
1163
1164template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1165bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::value_length_is_fixed(void) {
1166 paranoid_invariant(this->temp_valid);
1167 return this->temp.values_same_size;
1168}
1169
1170template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1171void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::build(dmt<dmtdata_t, dmtdataout_t, dmtwriter_t> *dest) {
1172 invariant(this->temp_valid);
1173 //NOTE: Always use d.a.num_values for size because we have not yet created root.
1174 invariant(this->temp.d.a.num_values <= this->max_values);
1175 // Memory invariant is taken care of incrementally (during append())
1176
1177 if (!this->temp.is_array) {
1178 invariant_notnull(this->sorted_node_offsets);
1179 this->temp.rebuild_subtree_from_offsets(&this->temp.d.t.root, this->sorted_node_offsets, this->temp.d.a.num_values);
1180 toku_free(this->sorted_node_offsets);
1181 this->sorted_node_offsets = nullptr;
1182 }
1183 paranoid_invariant_null(this->sorted_node_offsets);
1184
1185 const size_t used = toku_mempool_get_used_size(&this->temp.mp);
1186 const size_t allocated = toku_mempool_get_size(&this->temp.mp);
1187 // We want to use no more than (about) the actual used space + 25% overhead for mempool growth.
1188 // When we know the elements are fixed-length, we use the better dmt constructor.
1189 // In practice, as of Jan 2014, we use the builder in two cases:
1190 // - When we know the elements are not fixed-length.
1191 // - During upgrade of a pre version 26 basement node.
1192 // During upgrade, we will probably wildly overallocate because we don't account for the values that aren't stored in the dmt, so here we want to shrink the mempool.
1193 // When we know the elements are not fixed-length, we still know how much memory they occupy in total, modulo alignment, so we want to allow for mempool overhead and worst-case alignment overhead, and not shrink the mempool.
1194 const size_t max_allowed = used + (ALIGNMENT-1) * this->temp.size();
1195 const size_t max_allowed_with_mempool_overhead = max_allowed + max_allowed / 4;
1196 //TODO(leif): get footprint calculation correct (under jemalloc) and add some form of footprint constraint
1197 if (allocated > max_allowed_with_mempool_overhead) {
1198 // Reallocate smaller mempool to save memory
1199 invariant_zero(toku_mempool_get_frag_size(&this->temp.mp));
1200 struct mempool new_mp;
1201 toku_mempool_construct(&new_mp, used);
1202 void * newbase = toku_mempool_malloc(&new_mp, used);
1203 invariant_notnull(newbase);
1204 memcpy(newbase, toku_mempool_get_base(&this->temp.mp), used);
1205 toku_mempool_destroy(&this->temp.mp);
1206 this->temp.mp = new_mp;
1207 }
1208
1209 *dest = this->temp;
1210 this->temp_valid = false;
1211
1212}
1213} // namespace toku
1214