1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3/*======
4This file is part of PerconaFT.
5
6
7Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
8
9 PerconaFT is free software: you can redistribute it and/or modify
10 it under the terms of the GNU General Public License, version 2,
11 as published by the Free Software Foundation.
12
13 PerconaFT is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
20
21----------------------------------------
22
23 PerconaFT is free software: you can redistribute it and/or modify
24 it under the terms of the GNU Affero General Public License, version 3,
25 as published by the Free Software Foundation.
26
27 PerconaFT is distributed in the hope that it will be useful,
28 but WITHOUT ANY WARRANTY; without even the implied warranty of
29 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 GNU Affero General Public License for more details.
31
32 You should have received a copy of the GNU Affero General Public License
33 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
34======= */
35
36#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
37
38#include <my_global.h>
39#include <string>
40
41#include "portability/memory.h"
42
43#include "ft/node.h"
44#include "ft/serialize/rbuf.h"
45#include "ft/serialize/wbuf.h"
46
47void ftnode_pivot_keys::create_empty() {
48 _num_pivots = 0;
49 _total_size = 0;
50 _fixed_keys = nullptr;
51 _fixed_keylen = 0;
52 _fixed_keylen_aligned = 0;
53 _dbt_keys = nullptr;
54}
55
56void ftnode_pivot_keys::create_from_dbts(const DBT *keys, int n) {
57 create_empty();
58 _num_pivots = n;
59
60 // see if every key has the same length
61 bool keys_same_size = true;
62 for (int i = 1; i < _num_pivots; i++) {
63 if (keys[i].size != keys[i - 1].size) {
64 keys_same_size = false;
65 break;
66 }
67 }
68
69 if (keys_same_size && _num_pivots > 0) {
70 // if so, store pivots in a tightly packed array of fixed length keys
71 _fixed_keylen = keys[0].size;
72 _fixed_keylen_aligned = _align4(_fixed_keylen);
73 _total_size = _fixed_keylen_aligned * _num_pivots;
74 XMALLOC_N_ALIGNED(64, _total_size, _fixed_keys);
75 for (int i = 0; i < _num_pivots; i++) {
76 invariant(keys[i].size == _fixed_keylen);
77 memcpy(_fixed_key(i), keys[i].data, _fixed_keylen);
78 }
79 } else {
80 // otherwise we'll just store the pivots in an array of dbts
81 XMALLOC_N_ALIGNED(64, _num_pivots, _dbt_keys);
82 for (int i = 0; i < _num_pivots; i++) {
83 size_t size = keys[i].size;
84 toku_memdup_dbt(&_dbt_keys[i], keys[i].data, size);
85 _total_size += size;
86 }
87 }
88
89 sanity_check();
90}
91
92void ftnode_pivot_keys::_create_from_fixed_keys(const char *fixedkeys, size_t fixed_keylen, int n) {
93 create_empty();
94 _num_pivots = n;
95 _fixed_keylen = fixed_keylen;
96 _fixed_keylen_aligned = _align4(fixed_keylen);
97 _total_size = _fixed_keylen_aligned * _num_pivots;
98 XMEMDUP_N(_fixed_keys, fixedkeys, _total_size);
99}
100
101// effect: create pivot keys as a clone of an existing set of pivotkeys
102void ftnode_pivot_keys::create_from_pivot_keys(const ftnode_pivot_keys &pivotkeys) {
103 if (pivotkeys._fixed_format()) {
104 _create_from_fixed_keys(pivotkeys._fixed_keys, pivotkeys._fixed_keylen, pivotkeys._num_pivots);
105 } else {
106 create_from_dbts(pivotkeys._dbt_keys, pivotkeys._num_pivots);
107 }
108
109 sanity_check();
110}
111
112void ftnode_pivot_keys::destroy() {
113 if (_dbt_keys != nullptr) {
114 for (int i = 0; i < _num_pivots; i++) {
115 toku_destroy_dbt(&_dbt_keys[i]);
116 }
117 toku_free(_dbt_keys);
118 _dbt_keys = nullptr;
119 }
120 if (_fixed_keys != nullptr) {
121 toku_free(_fixed_keys);
122 _fixed_keys = nullptr;
123 }
124 _fixed_keylen = 0;
125 _fixed_keylen_aligned = 0;
126 _num_pivots = 0;
127 _total_size = 0;
128}
129
130void ftnode_pivot_keys::_convert_to_fixed_format() {
131 invariant(!_fixed_format());
132
133 // convert to a tightly packed array of fixed length keys
134 _fixed_keylen = _dbt_keys[0].size;
135 _fixed_keylen_aligned = _align4(_fixed_keylen);
136 _total_size = _fixed_keylen_aligned * _num_pivots;
137 XMALLOC_N_ALIGNED(64, _total_size, _fixed_keys);
138 for (int i = 0; i < _num_pivots; i++) {
139 invariant(_dbt_keys[i].size == _fixed_keylen);
140 memcpy(_fixed_key(i), _dbt_keys[i].data, _fixed_keylen);
141 }
142
143 // destroy the dbt array format
144 for (int i = 0; i < _num_pivots; i++) {
145 toku_destroy_dbt(&_dbt_keys[i]);
146 }
147 toku_free(_dbt_keys);
148 _dbt_keys = nullptr;
149
150 invariant(_fixed_format());
151 sanity_check();
152}
153
154void ftnode_pivot_keys::_convert_to_dbt_format() {
155 invariant(_fixed_format());
156
157 // convert to an aray of dbts
158 REALLOC_N_ALIGNED(64, _num_pivots, _dbt_keys);
159 for (int i = 0; i < _num_pivots; i++) {
160 toku_memdup_dbt(&_dbt_keys[i], _fixed_key(i), _fixed_keylen);
161 }
162 // pivots sizes are not aligned up dbt format
163 _total_size = _num_pivots * _fixed_keylen;
164
165 // destroy the fixed key format
166 toku_free(_fixed_keys);
167 _fixed_keys = nullptr;
168 _fixed_keylen = 0;
169 _fixed_keylen_aligned = 0;
170
171 invariant(!_fixed_format());
172 sanity_check();
173}
174
175void ftnode_pivot_keys::deserialize_from_rbuf(struct rbuf *rb, int n) {
176 _num_pivots = n;
177 _total_size = 0;
178 _fixed_keys = nullptr;
179 _fixed_keylen = 0;
180 _dbt_keys = nullptr;
181
182 XMALLOC_N_ALIGNED(64, _num_pivots, _dbt_keys);
183 bool keys_same_size = true;
184 for (int i = 0; i < _num_pivots; i++) {
185 const void *pivotkeyptr;
186 uint32_t size;
187 rbuf_bytes(rb, &pivotkeyptr, &size);
188 toku_memdup_dbt(&_dbt_keys[i], pivotkeyptr, size);
189 _total_size += size;
190 if (i > 0 && keys_same_size && _dbt_keys[i].size != _dbt_keys[i - 1].size) {
191 // not all keys are the same size, we'll stick to the dbt array format
192 keys_same_size = false;
193 }
194 }
195
196 if (keys_same_size && _num_pivots > 0) {
197 _convert_to_fixed_format();
198 }
199
200 sanity_check();
201}
202
203DBT ftnode_pivot_keys::get_pivot(int i) const {
204 paranoid_invariant(i < _num_pivots);
205 if (_fixed_format()) {
206 paranoid_invariant(i * _fixed_keylen_aligned < _total_size);
207 DBT dbt;
208 toku_fill_dbt(&dbt, _fixed_key(i), _fixed_keylen);
209 return dbt;
210 } else {
211 return _dbt_keys[i];
212 }
213}
214
215DBT *ftnode_pivot_keys::fill_pivot(int i, DBT *dbt) const {
216 paranoid_invariant(i < _num_pivots);
217 if (_fixed_format()) {
218 toku_fill_dbt(dbt, _fixed_key(i), _fixed_keylen);
219 } else {
220 toku_copyref_dbt(dbt, _dbt_keys[i]);
221 }
222 return dbt;
223}
224
225void ftnode_pivot_keys::_add_key_dbt(const DBT *key, int i) {
226 toku_clone_dbt(&_dbt_keys[i], *key);
227 _total_size += _dbt_keys[i].size;
228}
229
230void ftnode_pivot_keys::_destroy_key_dbt(int i) {
231 invariant(_total_size >= _dbt_keys[i].size);
232 _total_size -= _dbt_keys[i].size;
233 toku_destroy_dbt(&_dbt_keys[i]);
234}
235
236void ftnode_pivot_keys::_insert_at_dbt(const DBT *key, int i) {
237 // make space for a new pivot, slide existing keys to the right
238 REALLOC_N_ALIGNED(64, _num_pivots + 1, _dbt_keys);
239 memmove(&_dbt_keys[i + 1], &_dbt_keys[i], (_num_pivots - i) * sizeof(DBT));
240 _add_key_dbt(key, i);
241}
242
243void ftnode_pivot_keys::_insert_at_fixed(const DBT *key, int i) {
244 REALLOC_N_ALIGNED(64, (_num_pivots + 1) * _fixed_keylen_aligned, _fixed_keys);
245 // TODO: This is not going to be valgrind-safe, because we do not initialize the space
246 // between _fixed_keylen and _fixed_keylen_aligned (but we probably should)
247 memmove(_fixed_key(i + 1), _fixed_key(i), (_num_pivots - i) * _fixed_keylen_aligned);
248 memcpy(_fixed_key(i), key->data, _fixed_keylen);
249 _total_size += _fixed_keylen_aligned;
250}
251
252void ftnode_pivot_keys::insert_at(const DBT *key, int i) {
253 invariant(i <= _num_pivots); // it's ok to insert at the end, so we check <= n
254
255 // if the new key doesn't have the same size, we can't be in fixed format
256 if (_fixed_format() && key->size != _fixed_keylen) {
257 _convert_to_dbt_format();
258 }
259
260 if (_fixed_format()) {
261 _insert_at_fixed(key, i);
262 } else {
263 _insert_at_dbt(key, i);
264 }
265 _num_pivots++;
266
267 invariant(total_size() > 0);
268}
269
270void ftnode_pivot_keys::_append_dbt(const ftnode_pivot_keys &pivotkeys) {
271 REALLOC_N_ALIGNED(64, _num_pivots + pivotkeys._num_pivots, _dbt_keys);
272 bool other_fixed = pivotkeys._fixed_format();
273 for (int i = 0; i < pivotkeys._num_pivots; i++) {
274 size_t size = other_fixed ? pivotkeys._fixed_keylen :
275 pivotkeys._dbt_keys[i].size;
276 toku_memdup_dbt(&_dbt_keys[_num_pivots + i],
277 other_fixed ? pivotkeys._fixed_key(i) :
278 pivotkeys._dbt_keys[i].data,
279 size);
280 _total_size += size;
281 }
282}
283
284void ftnode_pivot_keys::_append_fixed(const ftnode_pivot_keys &pivotkeys) {
285 if (pivotkeys._fixed_format() && pivotkeys._fixed_keylen == _fixed_keylen) {
286 // other pivotkeys have the same fixed keylen
287 REALLOC_N_ALIGNED(64, (_num_pivots + pivotkeys._num_pivots) * _fixed_keylen_aligned, _fixed_keys);
288 memcpy(_fixed_key(_num_pivots), pivotkeys._fixed_keys, pivotkeys._total_size);
289 _total_size += pivotkeys._total_size;
290 } else {
291 // must convert to dbt format, other pivotkeys have different length'd keys
292 _convert_to_dbt_format();
293 _append_dbt(pivotkeys);
294 }
295}
296
297void ftnode_pivot_keys::append(const ftnode_pivot_keys &pivotkeys) {
298 if (_fixed_format()) {
299 _append_fixed(pivotkeys);
300 } else {
301 _append_dbt(pivotkeys);
302 }
303 _num_pivots += pivotkeys._num_pivots;
304
305 sanity_check();
306}
307
308void ftnode_pivot_keys::_replace_at_dbt(const DBT *key, int i) {
309 _destroy_key_dbt(i);
310 _add_key_dbt(key, i);
311}
312
313void ftnode_pivot_keys::_replace_at_fixed(const DBT *key, int i) {
314 if (key->size == _fixed_keylen) {
315 memcpy(_fixed_key(i), key->data, _fixed_keylen);
316 } else {
317 // must convert to dbt format, replacement key has different length
318 _convert_to_dbt_format();
319 _replace_at_dbt(key, i);
320 }
321}
322
323void ftnode_pivot_keys::replace_at(const DBT *key, int i) {
324 if (i < _num_pivots) {
325 if (_fixed_format()) {
326 _replace_at_fixed(key, i);
327 } else {
328 _replace_at_dbt(key, i);
329 }
330 } else {
331 invariant(i == _num_pivots); // appending to the end is ok
332 insert_at(key, i);
333 }
334 invariant(total_size() > 0);
335}
336
337void ftnode_pivot_keys::_delete_at_fixed(int i) {
338 memmove(_fixed_key(i), _fixed_key(i + 1), (_num_pivots - 1 - i) * _fixed_keylen_aligned);
339 _total_size -= _fixed_keylen_aligned;
340}
341
342void ftnode_pivot_keys::_delete_at_dbt(int i) {
343 // slide over existing keys, then shrink down to size
344 _destroy_key_dbt(i);
345 memmove(&_dbt_keys[i], &_dbt_keys[i + 1], (_num_pivots - 1 - i) * sizeof(DBT));
346 REALLOC_N_ALIGNED(64, _num_pivots - 1, _dbt_keys);
347}
348
349void ftnode_pivot_keys::delete_at(int i) {
350 invariant(i < _num_pivots);
351
352 if (_fixed_format()) {
353 _delete_at_fixed(i);
354 } else {
355 _delete_at_dbt(i);
356 }
357
358 _num_pivots--;
359}
360
361void ftnode_pivot_keys::_split_at_fixed(int i, ftnode_pivot_keys *other) {
362 // recreate the other set of pivots from index >= i
363 other->_create_from_fixed_keys(_fixed_key(i), _fixed_keylen, _num_pivots - i);
364
365 // shrink down to size
366 _total_size = i * _fixed_keylen_aligned;
367 REALLOC_N_ALIGNED(64, _total_size, _fixed_keys);
368}
369
370void ftnode_pivot_keys::_split_at_dbt(int i, ftnode_pivot_keys *other) {
371 // recreate the other set of pivots from index >= i
372 other->create_from_dbts(&_dbt_keys[i], _num_pivots - i);
373
374 // destroy everything greater, shrink down to size
375 for (int k = i; k < _num_pivots; k++) {
376 _destroy_key_dbt(k);
377 }
378 REALLOC_N_ALIGNED(64, i, _dbt_keys);
379}
380
381void ftnode_pivot_keys::split_at(int i, ftnode_pivot_keys *other) {
382 if (i < _num_pivots) {
383 if (_fixed_format()) {
384 _split_at_fixed(i, other);
385 } else {
386 _split_at_dbt(i, other);
387 }
388 _num_pivots = i;
389 }
390
391 sanity_check();
392}
393
394void ftnode_pivot_keys::serialize_to_wbuf(struct wbuf *wb) const {
395 bool fixed = _fixed_format();
396 size_t written = 0;
397 for (int i = 0; i < _num_pivots; i++) {
398 size_t size = fixed ? _fixed_keylen : _dbt_keys[i].size;
399 invariant(size);
400 wbuf_nocrc_bytes(wb, fixed ? _fixed_key(i) : _dbt_keys[i].data, size);
401 written += size;
402 }
403 invariant(written == serialized_size());
404}
405
406int ftnode_pivot_keys::num_pivots() const {
407 // if we have fixed size keys, the number of pivots should be consistent
408 paranoid_invariant(_fixed_keys == nullptr || (_total_size == _fixed_keylen_aligned * _num_pivots));
409 return _num_pivots;
410}
411
412size_t ftnode_pivot_keys::total_size() const {
413 // if we have fixed size keys, the total size should be consistent
414 paranoid_invariant(_fixed_keys == nullptr || (_total_size == _fixed_keylen_aligned * _num_pivots));
415 return _total_size;
416}
417
418size_t ftnode_pivot_keys::serialized_size() const {
419 // we only return the size that will be used when serialized, so we calculate based
420 // on the fixed keylen and not the aligned keylen.
421 return _fixed_format() ? _num_pivots * _fixed_keylen : _total_size;
422}
423
424void ftnode_pivot_keys::sanity_check() const {
425 if (_fixed_format()) {
426 invariant(_dbt_keys == nullptr);
427 invariant(_fixed_keylen_aligned == _align4(_fixed_keylen));
428 invariant(_num_pivots * _fixed_keylen <= _total_size);
429 invariant(_num_pivots * _fixed_keylen_aligned == _total_size);
430 } else {
431 invariant(_num_pivots == 0 || _dbt_keys != nullptr);
432 size_t size = 0;
433 for (int i = 0; i < _num_pivots; i++) {
434 size += _dbt_keys[i].size;
435 }
436 invariant(size == _total_size);
437 }
438}
439