1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39// Purpose of this file is to handle all modifications and queries to the database
40// at the level of leafentry.
41//
42// ule = Unpacked Leaf Entry
43//
44// This design unpacks the leafentry into a convenient format, performs all work
45// on the unpacked form, then repacks the leafentry into its compact format.
46//
47// See design documentation for nested transactions at
48// TokuWiki/Imp/TransactionsOverview.
49
50#include <my_global.h>
51#include "portability/toku_portability.h"
52
53#include "ft/ft-internal.h"
54#include "ft/leafentry.h"
55#include "ft/logger/logger.h"
56#include "ft/msg.h"
57#include "ft/txn/txn.h"
58#include "ft/txn/txn_manager.h"
59#include "ft/ule.h"
60#include "ft/ule-internal.h"
61#include "ft/txn/xids.h"
62#include "util/bytestring.h"
63#include "util/omt.h"
64#include "util/partitioned_counter.h"
65#include "util/scoped_malloc.h"
66#include "util/status.h"
67
68#define ULE_DEBUG 0
69
70static uint32_t ule_get_innermost_numbytes(ULE ule, uint32_t keylen);
71
72void toku_le_get_status(LE_STATUS statp) {
73 le_status.init();
74 *statp = le_status;
75}
76
77////////////////////////////////////////////////////////////////////////////////
78// Accessor functions used by outside world (e.g. indexer)
79//
80
81ULEHANDLE toku_ule_create(LEAFENTRY le) {
82 ULE XMALLOC(ule_p);
83 le_unpack(ule_p, le);
84 return (ULEHANDLE) ule_p;
85}
86
87void toku_ule_free(ULEHANDLE ule_p) {
88 ule_cleanup((ULE) ule_p);
89 toku_free(ule_p);
90}
91
92////////////////////////////////////////////////////////////////////////////////
93//
94// Question: Can any software outside this file modify or read a leafentry?
95// If so, is it worthwhile to put it all here?
96//
97// There are two entries, one each for modification and query:
98// toku_le_apply_msg() performs all inserts/deletes/aborts
99//
100//
101//
102//
103
104//This is what we use to initialize Xuxrs[0] in a new unpacked leafentry.
105const UXR_S committed_delete = {
106 .type = XR_DELETE,
107 .vallen = 0,
108 .valp = NULL,
109 .xid = 0
110}; // static allocation of uxr with type set to committed delete and xid = 0
111
112#define INSERT_LENGTH(len) ((1U << 31) | len)
113#define DELETE_LENGTH(len) (0)
114#define GET_LENGTH(len) (len & ((1U << 31)-1))
115#define IS_INSERT(len) (len & (1U << 31))
116#define IS_VALID_LEN(len) (len < (1U<<31))
117
118// Local functions:
119
120static inline void msg_init_empty_ule(ULE ule);
121static int64_t msg_modify_ule(ULE ule, const ft_msg &msg);
122static inline void ule_init_empty_ule(ULE ule);
123static void ule_do_implicit_promotions(ULE ule, XIDS xids);
124static void ule_try_promote_provisional_outermost(
125 ULE ule,
126 TXNID oldest_possible_live_xid);
127static void ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index);
128static void ule_promote_provisional_innermost_to_committed(ULE ule);
129static inline int64_t ule_apply_insert_no_overwrite(
130 ULE ule,
131 XIDS xids,
132 uint32_t vallen,
133 void* valp);
134static inline int64_t ule_apply_insert(
135 ULE ule,
136 XIDS xids,
137 uint32_t vallen,
138 void* valp);
139static inline int64_t ule_apply_delete(ULE ule, XIDS xids);
140static inline void ule_prepare_for_new_uxr(ULE ule, XIDS xids);
141static inline int64_t ule_apply_abort(ULE ule, XIDS xids);
142static void ule_apply_broadcast_commit_all (ULE ule);
143static void ule_apply_commit(ULE ule, XIDS xids);
144static inline void ule_push_insert_uxr(
145 ULE ule,
146 bool is_committed,
147 TXNID xid,
148 uint32_t vallen,
149 void* valp);
150static inline void ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid);
151static inline void ule_push_placeholder_uxr(ULE ule, TXNID xid);
152static inline UXR ule_get_innermost_uxr(ULE ule);
153static inline UXR ule_get_first_empty_uxr(ULE ule);
154static inline void ule_remove_innermost_uxr(ULE ule);
155static inline TXNID ule_get_innermost_xid(ULE ule);
156static inline TXNID ule_get_xid(ULE ule, uint32_t index);
157static void ule_remove_innermost_placeholders(ULE ule);
158static void ule_add_placeholders(ULE ule, XIDS xids);
159static void ule_optimize(ULE ule, XIDS xids);
160static inline bool uxr_type_is_insert(uint8_t type);
161static inline bool uxr_type_is_delete(uint8_t type);
162static inline bool uxr_type_is_placeholder(uint8_t type);
163static inline size_t uxr_pack_txnid(UXR uxr, uint8_t *p);
164static inline size_t uxr_pack_type_and_length(UXR uxr, uint8_t *p);
165static inline size_t uxr_pack_length_and_bit(UXR uxr, uint8_t *p);
166static inline size_t uxr_pack_data(UXR uxr, uint8_t *p);
167static inline size_t uxr_unpack_txnid(UXR uxr, uint8_t *p);
168static inline size_t uxr_unpack_type_and_length(UXR uxr, uint8_t *p);
169static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p);
170static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p);
171
172#if 0
173static void ule_print(ULE ule, const char* note) {
174 fprintf(stderr, "%s : ULE[0x%p]\n", note, ule);
175 fprintf(stderr, " num_puxrs[%u]\n", ule->num_puxrs);
176 fprintf(stderr, " num_cuxrs[%u]\n", ule->num_cuxrs);
177 fprintf(stderr, " innermost[%u]\n", ule->num_cuxrs + ule->num_puxrs - 1);
178 fprintf(stderr, " first_empty[%u]\n", ule->num_cuxrs + ule->num_puxrs);
179
180 uint32_t num_uxrs = ule->num_cuxrs + ule->num_puxrs - 1;
181 for (uint32_t uxr_num = 0; uxr_num <= num_uxrs; uxr_num++) {
182 UXR uxr = &(ule->uxrs[uxr_num]);
183 fprintf(stderr, " uxr[%u]\n", uxr_num);
184 switch (uxr->type) {
185 case 0: fprintf(stderr, " type[NONE]\n"); break;
186 case 1: fprintf(stderr, " type[INSERT]\n"); break;
187 case 2: fprintf(stderr, " type[DELETE]\n"); break;
188 case 3: fprintf(stderr, " type[PLACEHOLDER]\n"); break;
189 default: fprintf(stderr, " type[WHAT??]\n"); break;
190 }
191 fprintf(stderr, " xid[%lu]\n", uxr->xid);
192 }
193}
194#endif
195
196static void get_space_for_le(
197 bn_data* data_buffer,
198 uint32_t idx,
199 void* keyp,
200 uint32_t keylen,
201 uint32_t old_keylen,
202 uint32_t old_le_size,
203 size_t size,
204 LEAFENTRY* new_le_space,
205 void** const maybe_free) {
206
207 if (data_buffer == nullptr) {
208 CAST_FROM_VOIDP(*new_le_space, toku_xmalloc(size));
209 } else if (old_le_size > 0) {
210 // this means we are overwriting something
211 data_buffer->get_space_for_overwrite(
212 idx,
213 keyp,
214 keylen,
215 old_keylen,
216 old_le_size,
217 size,
218 new_le_space,
219 maybe_free);
220 } else {
221 // this means we are inserting something new
222 data_buffer->get_space_for_insert(
223 idx,
224 keyp,
225 keylen,
226 size,
227 new_le_space,
228 maybe_free);
229 }
230}
231
232
233/////////////////////////////////////////////////////////////////////
234// Garbage collection related functions
235//
236
237static TXNID get_next_older_txnid(TXNID xc, const xid_omt_t &omt) {
238 int r;
239 TXNID xid;
240 r = omt.find<TXNID, toku_find_xid_by_xid>(xc, -1, &xid, nullptr);
241 if (r==0) {
242 invariant(xid < xc); //sanity check
243 } else {
244 invariant(r==DB_NOTFOUND);
245 xid = TXNID_NONE;
246 }
247 return xid;
248}
249
250//
251// This function returns true if live transaction TL1 is allowed to read a
252// value committed by transaction xc, false otherwise.
253//
254static bool xid_reads_committed_xid(
255 TXNID tl1,
256 TXNID xc,
257 const xid_omt_t& snapshot_txnids,
258 const rx_omt_t& referenced_xids) {
259
260 bool rval;
261 if (tl1 < xc) {
262 rval = false; //cannot read a newer txn
263 } else {
264 TXNID x =
265 toku_get_youngest_live_list_txnid_for(
266 xc,
267 snapshot_txnids,
268 referenced_xids);
269
270 if (x == TXNID_NONE) {
271 //Not in ANY live list, tl1 can read it.
272 rval = true;
273 } else {
274 //Newer than the 'newest one that has it in live list'
275 rval = tl1 > x;
276 }
277 // we know tl1 > xc
278 // we know x > xc
279 // if tl1 == x, then we do not read, because tl1 is in xc's live list
280 // if x is older than tl1, that means that xc < x < tl1
281 // and if xc is in x's live list, it CANNOT be in tl1's live list
282 }
283 return rval;
284}
285
286//
287// This function does some simple garbage collection given a TXNID known
288// to be the oldest referenced xid, that is, the oldest xid in any live list.
289// We find the youngest entry in the stack with an xid less
290// than oldest_referenced_xid. All elements below this entry are garbage,
291// so we get rid of them.
292//
293static void ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) {
294 if (ule->num_cuxrs == 1) {
295 return;
296 }
297
298 uint32_t curr_index = 0;
299 if (gc_info->mvcc_needed) {
300 // starting at the top of the committed stack, find the first
301 // uxr with a txnid that is less than oldest_referenced_xid
302 for (uint32_t i = 0; i < ule->num_cuxrs; i++) {
303 curr_index = ule->num_cuxrs - i - 1;
304 if (ule->uxrs[curr_index].xid <
305 gc_info->oldest_referenced_xid_for_simple_gc) {
306 break;
307 }
308 }
309 } else {
310 // if mvcc is not needed, we can need the top committed
311 // value and nothing else
312 curr_index = ule->num_cuxrs - 1;
313 }
314
315 // curr_index is now set to the youngest uxr older than
316 // oldest_referenced_xid so if it's not the bottom of the stack..
317 if (curr_index != 0) {
318 // ..then we need to get rid of the entries below curr_index
319 uint32_t num_entries = ule->num_cuxrs + ule->num_puxrs - curr_index;
320 memmove(
321 &ule->uxrs[0],
322 &ule->uxrs[curr_index],
323 num_entries * sizeof(ule->uxrs[0]));
324 ule->uxrs[0].xid = TXNID_NONE; // New 'bottom of stack' loses its TXNID
325 ule->num_cuxrs -= curr_index;
326 }
327}
328
329// TODO: Clean this up
330extern bool garbage_collection_debug;
331
332static void ule_garbage_collect(
333 ULE ule,
334 const xid_omt_t& snapshot_xids,
335 const rx_omt_t& referenced_xids,
336 const xid_omt_t& live_root_txns) {
337
338 if (ule->num_cuxrs == 1) {
339 return;
340 }
341
342 toku::scoped_calloc necessary_buf(ule->num_cuxrs * sizeof(bool));
343 bool *necessary = reinterpret_cast<bool *>(necessary_buf.get());
344
345 uint32_t curr_committed_entry;
346 curr_committed_entry = ule->num_cuxrs - 1;
347 while (true) {
348 // mark the curr_committed_entry as necessary
349 necessary[curr_committed_entry] = true;
350 if (curr_committed_entry == 0) break; //nothing left
351
352 // find the youngest live transaction that reads something
353 // below curr_committed_entry, if it exists
354 TXNID tl1;
355 TXNID xc = ule->uxrs[curr_committed_entry].xid;
356
357 //
358 // If we find that the committed transaction is in the live list,
359 // then xc is really in the process of being committed. It has not
360 // been fully committed. As a result, our assumption that transactions
361 // newer than what is currently in these OMTs will read the top of the
362 // stack is not necessarily accurate. Transactions may read what is
363 // just below xc.
364 // As a result, we must mark what is just below xc as necessary and
365 // move on. This issue was found while testing flusher threads, and was
366 // fixed for #3979
367 //
368 bool is_xc_live = toku_is_txn_in_live_root_txn_list(live_root_txns, xc);
369 if (is_xc_live) {
370 curr_committed_entry--;
371 continue;
372 }
373
374 tl1 =
375 toku_get_youngest_live_list_txnid_for(
376 xc,
377 snapshot_xids,
378 referenced_xids);
379
380 // if tl1 == xc, that means xc should be live and show up in
381 // live_root_txns, which we check above.
382 invariant(tl1 != xc);
383
384 if (tl1 == TXNID_NONE) {
385 // set tl1 to youngest live transaction older than
386 // ule->uxrs[curr_committed_entry]->xid
387 tl1 = get_next_older_txnid(xc, snapshot_xids);
388 if (tl1 == TXNID_NONE) {
389 // remainder is garbage, we're done
390 break;
391 }
392 }
393 if (garbage_collection_debug) {
394 int r =
395 snapshot_xids.find_zero<TXNID, toku_find_xid_by_xid>(
396 tl1,
397 nullptr,
398 nullptr);
399 // make sure that the txn you are claiming is live is actually live
400 invariant_zero(r);
401 }
402 //
403 // tl1 should now be set
404 //
405 curr_committed_entry--;
406 while (curr_committed_entry > 0) {
407 xc = ule->uxrs[curr_committed_entry].xid;
408 if (xid_reads_committed_xid(
409 tl1,
410 xc,
411 snapshot_xids,
412 referenced_xids)) {
413 break;
414 }
415 curr_committed_entry--;
416 }
417 }
418 uint32_t first_free = 0;
419 for (uint32_t i = 0; i < ule->num_cuxrs; i++) {
420 // Shift values to 'delete' garbage values.
421 if (necessary[i]) {
422 ule->uxrs[first_free] = ule->uxrs[i];
423 first_free++;
424 }
425 }
426 uint32_t saved = first_free;
427 invariant(saved <= ule->num_cuxrs);
428 invariant(saved >= 1);
429 ule->uxrs[0].xid = TXNID_NONE; //New 'bottom of stack' loses its TXNID
430 if (first_free != ule->num_cuxrs) {
431 // Shift provisional values
432 memmove(
433 &ule->uxrs[first_free],
434 &ule->uxrs[ule->num_cuxrs],
435 ule->num_puxrs * sizeof(ule->uxrs[0]));
436 }
437 ule->num_cuxrs = saved;
438}
439
440static size_t ule_packed_memsize(ULE ule) {
441// Returns: The size 'ule' would be when packed into a leafentry, or 0 if the
442// topmost committed value is a delete.
443 if (ule->num_cuxrs == 1 && ule->num_puxrs == 0) {
444 UXR uxr = ule_get_innermost_uxr(ule);
445 if (uxr_is_delete(uxr)) {
446 return 0;
447 }
448 }
449 return le_memsize_from_ule(ule);
450}
451
452// Heuristics to control when we decide to initialize
453// txn manager state (possibly expensive) and run gc.
454enum {
455 ULE_MIN_STACK_SIZE_TO_FORCE_GC = 5,
456 ULE_MIN_MEMSIZE_TO_FORCE_GC = 1024 * 1024
457};
458
459////////////////////////////////////////////////////////////////////////////////
460// This is the big enchilada. (Bring Tums.) Note that this level of
461// abstraction has no knowledge of the inner structure of either leafentry or
462// msg. It makes calls into the next lower layer (msg_xxx) which handles
463// messages.
464//
465// NOTE: This is the only function (at least in this body of code) that modifies
466// a leafentry.
467// NOTE: It is the responsibility of the caller to make sure that the key is set
468// in the FT_MSG, as it will be used to store the data in the data_buffer
469//
470// Returns -1, 0, or 1 that identifies the change in logical row count needed
471// based on the results of the message application. For example, if a delete
472// finds no logical leafentry or if an insert finds a duplicate and is
473// converted to an update.
474//
475// old_leafentry - NULL if there was no stored data.
476// data_buffer - bn_data storing leafentry, if NULL, means there is no bn_data
477// idx - index in data_buffer where leafentry is stored
478// (and should be replaced)
479// old_keylen - length of the any key in data_buffer
480// new_leafentry_p - If the leafentry is destroyed it sets *new_leafentry_p
481// to NULL. Otherwise the new_leafentry_p points at the new
482// leaf entry.
483// numbytes_delta_p - change in total size of key and val, not including any
484// overhead
485int64_t toku_le_apply_msg(
486 const ft_msg& msg,
487 LEAFENTRY old_leafentry,
488 bn_data* data_buffer,
489 uint32_t idx,
490 uint32_t old_keylen,
491 txn_gc_info* gc_info,
492 LEAFENTRY* new_leafentry_p,
493 int64_t* numbytes_delta_p) {
494
495 invariant_notnull(gc_info);
496 paranoid_invariant_notnull(new_leafentry_p);
497 ULE_S ule;
498 int64_t oldnumbytes = 0;
499 int64_t newnumbytes = 0;
500 uint64_t oldmemsize = 0;
501 uint32_t keylen = msg.kdbt()->size;
502 int32_t rowcountdelta = 0;
503
504 if (old_leafentry == NULL) {
505 msg_init_empty_ule(&ule);
506 } else {
507 oldmemsize = leafentry_memsize(old_leafentry);
508 le_unpack(&ule, old_leafentry); // otherwise unpack leafentry
509 oldnumbytes = ule_get_innermost_numbytes(&ule, keylen);
510 }
511
512 // modify unpacked leafentry
513 rowcountdelta = msg_modify_ule(&ule, msg);
514
515 // - we may be able to immediately promote the newly-apllied outermost
516 // provisonal uxr
517 // - either way, run simple gc first, and then full gc if there are still
518 // some committed uxrs.
519 ule_try_promote_provisional_outermost(
520 &ule,
521 gc_info->oldest_referenced_xid_for_implicit_promotion);
522 ule_simple_garbage_collection(&ule, gc_info);
523 txn_manager_state *txn_state_for_gc = gc_info->txn_state_for_gc;
524 size_t size_before_gc = 0;
525 // there is garbage to clean, and our caller gave us state..
526 // ..and either the state is pre-initialized, or the committed stack is
527 // large enough
528 // ..or the ule's raw memsize is sufficiently large
529 // ..then it's worth running gc, possibly initializing the txn manager
530 // state, if it isn't already
531 if (ule.num_cuxrs > 1 && txn_state_for_gc != nullptr &&
532 (txn_state_for_gc->initialized ||
533 ule.num_cuxrs >= ULE_MIN_STACK_SIZE_TO_FORCE_GC ||
534 (size_before_gc = ule_packed_memsize(&ule)) >=
535 ULE_MIN_MEMSIZE_TO_FORCE_GC)) {
536 if (!txn_state_for_gc->initialized) {
537 txn_state_for_gc->init();
538 }
539 // it's already been calculated above
540 size_before_gc =
541 size_before_gc != 0 ? size_before_gc : ule_packed_memsize(&ule);
542 ule_garbage_collect(
543 &ule,
544 txn_state_for_gc->snapshot_xids,
545 txn_state_for_gc->referenced_xids,
546 txn_state_for_gc->live_root_txns);
547 size_t size_after_gc = ule_packed_memsize(&ule);
548
549 LE_STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc);
550 LE_STATUS_INC(LE_APPLY_GC_BYTES_OUT, size_after_gc);
551 }
552
553 void* maybe_free = nullptr;
554 // create packed leafentry
555 // contract of this function is caller has keyp and keylen set, always
556 int r =
557 le_pack(
558 &ule,
559 data_buffer,
560 idx,
561 msg.kdbt()->data,
562 keylen,
563 old_keylen,
564 oldmemsize,
565 new_leafentry_p,
566 &maybe_free);
567 invariant_zero(r);
568 if (*new_leafentry_p) {
569 newnumbytes = ule_get_innermost_numbytes(&ule, keylen);
570 }
571 *numbytes_delta_p = newnumbytes - oldnumbytes;
572
573 ule_cleanup(&ule);
574 if (maybe_free != nullptr) {
575 toku_free(maybe_free);
576 }
577 return rowcountdelta;
578}
579
580bool toku_le_worth_running_garbage_collection(
581 LEAFENTRY le,
582 txn_gc_info* gc_info) {
583// Effect: Quickly determines if it's worth trying to run garbage collection
584// on a leafentry
585// Return: True if it makes sense to try garbage collection, false otherwise.
586// Rationale: Garbage collection is likely to clean up under two circumstances:
587// 1.) There are multiple committed entries. Some may never be read
588// by new txns.
589// 2.) There is only one committed entry, but the outermost
590// provisional entry is older than the oldest known referenced
591// xid, so it must have committed. Therefor we can promote it to
592// committed and get rid of the old committed entry.
593 if (le->type != LE_MVCC) {
594 return false;
595 }
596 if (le->u.mvcc.num_cxrs > 1) {
597 return true;
598 } else {
599 paranoid_invariant(le->u.mvcc.num_cxrs == 1);
600 }
601 return le->u.mvcc.num_pxrs > 0 &&
602 le_outermost_uncommitted_xid(le) <
603 gc_info->oldest_referenced_xid_for_implicit_promotion;
604}
605
606// Garbage collect one leaf entry, using the given OMT's.
607// Parameters:
608// -- old_leaf_entry : the leaf we intend to clean up through garbage
609// collecting.
610// -- new_leaf_entry (OUTPUT) : a pointer to the leaf entry after
611// garbage collection.
612// -- new_leaf_entry_memory_size : after this call, our leaf entry
613// should be empty or smaller. This number represents that and is
614// used in a previous call to truncate the existing size.
615// -- omt : the memory where our leaf entry resides.
616// -- mp : our memory pool.
617// -- maybe_free (OUTPUT) : in a previous call, we may be able to free
618// the memory completely, if we removed the leaf entry.
619// -- snapshot_xids : we use these in memory transaction ids to
620// determine what to garbage collect.
621// -- referenced_xids : list of in memory active transactions.
622// NOTE: it is not a good idea to garbage collect a leaf
623// entry with only one committed value.
624void toku_le_garbage_collect(
625 LEAFENTRY old_leaf_entry,
626 bn_data* data_buffer,
627 uint32_t idx,
628 void* keyp,
629 uint32_t keylen,
630 txn_gc_info* gc_info,
631 LEAFENTRY* new_leaf_entry,
632 int64_t* numbytes_delta_p) {
633
634 // We shouldn't want to run gc without having provided a snapshot of the
635 // txn system.
636 invariant_notnull(gc_info);
637 invariant_notnull(gc_info->txn_state_for_gc);
638 paranoid_invariant_notnull(new_leaf_entry);
639 ULE_S ule;
640 int64_t oldnumbytes = 0;
641 int64_t newnumbytes = 0;
642
643 le_unpack(&ule, old_leaf_entry);
644
645 oldnumbytes = ule_get_innermost_numbytes(&ule, keylen);
646 uint32_t old_mem_size = leafentry_memsize(old_leaf_entry);
647
648 // Before running garbage collection, try to promote the outermost
649 // provisional entries to committed if its xid is older than the oldest
650 // possible live xid.
651 //
652 // The oldest known refeferenced xid is a lower bound on the oldest possible
653 // live xid, so we use that. It's usually close enough to get rid of most
654 // garbage in leafentries.
655 ule_try_promote_provisional_outermost(
656 &ule,
657 gc_info->oldest_referenced_xid_for_implicit_promotion);
658 // No need to run simple gc here if we're going straight for full gc.
659 if (ule.num_cuxrs > 1) {
660 size_t size_before_gc = ule_packed_memsize(&ule);
661 ule_garbage_collect(
662 &ule,
663 gc_info->txn_state_for_gc->snapshot_xids,
664 gc_info->txn_state_for_gc->referenced_xids,
665 gc_info->txn_state_for_gc->live_root_txns);
666 size_t size_after_gc = ule_packed_memsize(&ule);
667
668 LE_STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc);
669 LE_STATUS_INC(LE_APPLY_GC_BYTES_OUT, size_after_gc);
670 }
671
672 void *maybe_free = nullptr;
673 // old_keylen, same because the key isn't going to change for gc
674 int r =
675 le_pack(
676 &ule,
677 data_buffer,
678 idx,
679 keyp,
680 keylen,
681 keylen,
682 old_mem_size,
683 new_leaf_entry,
684 &maybe_free);
685 invariant_zero(r);
686 if (*new_leaf_entry) {
687 newnumbytes = ule_get_innermost_numbytes(&ule, keylen);
688 }
689 *numbytes_delta_p = newnumbytes - oldnumbytes;
690
691 ule_cleanup(&ule);
692 if (maybe_free != nullptr) {
693 toku_free(maybe_free);
694 }
695}
696
697////////////////////////////////////////////////////////////////////////////////
698// This layer of abstraction (msg_xxx)
699// knows the accessors of msg, but not of leafentry or unpacked leaf entry.
700// It makes calls into the lower layer (le_xxx) which handles leafentries.
701
702// Purpose is to init the ule with given key and no transaction records
703//
704static inline void msg_init_empty_ule(ULE ule) {
705 ule_init_empty_ule(ule);
706}
707
708// Purpose is to modify the unpacked leafentry in our private workspace.
709//
710// Returns -1, 0, or 1 that identifies the change in logical row count needed
711// based on the results of the message application. For example, if a delete
712// finds no logical leafentry or if an insert finds a duplicate and is
713// converted to an update.
714static int64_t msg_modify_ule(ULE ule, const ft_msg &msg) {
715 int64_t retval = 0;
716 XIDS xids = msg.xids();
717 invariant(toku_xids_get_num_xids(xids) < MAX_TRANSACTION_RECORDS);
718 enum ft_msg_type type = msg.type();
719 if (FT_LIKELY(type != FT_OPTIMIZE && type != FT_OPTIMIZE_FOR_UPGRADE)) {
720 ule_do_implicit_promotions(ule, xids);
721 }
722 switch (type) {
723 case FT_INSERT_NO_OVERWRITE:
724 retval =
725 ule_apply_insert_no_overwrite(
726 ule,
727 xids,
728 msg.vdbt()->size,
729 msg.vdbt()->data);
730 break;
731 case FT_INSERT:
732 retval =
733 ule_apply_insert(
734 ule,
735 xids,
736 msg.vdbt()->size,
737 msg.vdbt()->data);
738 break;
739 case FT_DELETE_ANY:
740 retval = ule_apply_delete(ule, xids);
741 break;
742 case FT_ABORT_ANY:
743 case FT_ABORT_BROADCAST_TXN:
744 retval = ule_apply_abort(ule, xids);
745 break;
746 case FT_COMMIT_BROADCAST_ALL:
747 ule_apply_broadcast_commit_all(ule);
748 break;
749 case FT_COMMIT_ANY:
750 case FT_COMMIT_BROADCAST_TXN:
751 ule_apply_commit(ule, xids);
752 break;
753 case FT_OPTIMIZE:
754 case FT_OPTIMIZE_FOR_UPGRADE:
755 ule_optimize(ule, xids);
756 break;
757 case FT_UPDATE:
758 case FT_UPDATE_BROADCAST_ALL:
759 // These messages don't get this far. Instead they get translated (in
760 // setval_fun in do_update) into FT_INSERT messages.
761 assert(false);
762 break;
763 default:
764 // illegal ft msg type
765 assert(false);
766 break;
767 }
768 return retval;
769}
770
771void test_msg_modify_ule(ULE ule, const ft_msg &msg) {
772 msg_modify_ule(ule,msg);
773}
774
775static void ule_optimize(ULE ule, XIDS xids) {
776 if (ule->num_puxrs) {
777 // outermost uncommitted
778 TXNID uncommitted = ule->uxrs[ule->num_cuxrs].xid;
779 TXNID oldest_living_xid = TXNID_NONE;
780 uint32_t num_xids = toku_xids_get_num_xids(xids);
781 if (num_xids > 0) {
782 invariant(num_xids==1);
783 oldest_living_xid = toku_xids_get_xid(xids, 0);
784 }
785 if (oldest_living_xid == TXNID_NONE ||
786 uncommitted < oldest_living_xid) {
787 ule_promote_provisional_innermost_to_committed(ule);
788 }
789 }
790}
791
792////////////////////////////////////////////////////////////////////////////////
793// This layer of abstraction (le_xxx) understands the structure of the leafentry
794// and of the unpacked leafentry. It is the only layer that understands the
795// structure of leafentry. It has no knowledge of any other data structures.
796//
797
798//
799// required for every le_unpack that is done
800//
801void ule_cleanup(ULE ule) {
802 invariant(ule->uxrs);
803 if (ule->uxrs != ule->uxrs_static) {
804 toku_free(ule->uxrs);
805 ule->uxrs = NULL;
806 }
807}
808
809// populate an unpacked leafentry using pointers into the given leafentry.
810// thus, the memory referenced by 'le' must live as long as the ULE.
811void le_unpack(ULE ule, LEAFENTRY le) {
812 uint8_t type = le->type;
813 uint8_t *p;
814 uint32_t i;
815 switch (type) {
816 case LE_CLEAN: {
817 ule->uxrs = ule->uxrs_static; //Static version is always enough.
818 ule->num_cuxrs = 1;
819 ule->num_puxrs = 0;
820 UXR uxr = ule->uxrs;
821 uxr->type = XR_INSERT;
822 uxr->vallen = toku_dtoh32(le->u.clean.vallen);
823 uxr->valp = le->u.clean.val;
824 uxr->xid = TXNID_NONE;
825 //Set p to immediately after leafentry
826 p = le->u.clean.val + uxr->vallen;
827 break;
828 }
829 case LE_MVCC:
830 ule->num_cuxrs = toku_dtoh32(le->u.mvcc.num_cxrs);
831 invariant(ule->num_cuxrs);
832 ule->num_puxrs = le->u.mvcc.num_pxrs;
833 //Dynamic memory
834 if (ule->num_cuxrs < MAX_TRANSACTION_RECORDS) {
835 ule->uxrs = ule->uxrs_static;
836 } else {
837 XMALLOC_N(
838 ule->num_cuxrs + 1 + MAX_TRANSACTION_RECORDS,
839 ule->uxrs);
840 }
841 p = le->u.mvcc.xrs;
842
843 //unpack interesting TXNIDs inner to outer.
844 if (ule->num_puxrs!=0) {
845 UXR outermost = ule->uxrs + ule->num_cuxrs;
846 p += uxr_unpack_txnid(outermost, p);
847 }
848 //unpack other TXNIDS (not for ule->uxrs[0])
849 ule->uxrs[0].xid = TXNID_NONE; //0 for super-root is implicit
850 for (i = 0; i < ule->num_cuxrs - 1; i++) {
851 p += uxr_unpack_txnid(ule->uxrs + ule->num_cuxrs - 1 - i, p);
852 }
853
854 //unpack interesting lengths inner to outer.
855 if (ule->num_puxrs!=0) {
856 UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1;
857 p += uxr_unpack_length_and_bit(innermost, p);
858 }
859 for (i = 0; i < ule->num_cuxrs; i++) {
860 p +=
861 uxr_unpack_length_and_bit(
862 ule->uxrs + ule->num_cuxrs - 1 - i,
863 p);
864 }
865
866 //unpack interesting values inner to outer
867 if (ule->num_puxrs!=0) {
868 UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1;
869 p += uxr_unpack_data(innermost, p);
870 }
871 for (i = 0; i < ule->num_cuxrs; i++) {
872 p += uxr_unpack_data(ule->uxrs + ule->num_cuxrs - 1 - i, p);
873 }
874
875 //unpack provisional xrs outer to inner
876 if (ule->num_puxrs > 1) {
877 {
878 //unpack length, bit, data for outermost uncommitted
879 UXR outermost = ule->uxrs + ule->num_cuxrs;
880 p += uxr_unpack_type_and_length(outermost, p);
881 p += uxr_unpack_data(outermost, p);
882 }
883 //unpack txnid, length, bit, data for non-outermost, non-innermost
884 for (i = ule->num_cuxrs + 1; i < ule->num_cuxrs + ule->num_puxrs - 1; i++) {
885 UXR uxr = ule->uxrs + i;
886 p += uxr_unpack_txnid(uxr, p);
887 p += uxr_unpack_type_and_length(uxr, p);
888 p += uxr_unpack_data(uxr, p);
889 }
890 {
891 //Just unpack txnid for innermost
892 UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1;
893 p += uxr_unpack_txnid(innermost, p);
894 }
895 }
896 break;
897 default:
898 invariant(false);
899 }
900
901#if ULE_DEBUG
902 size_t memsize = le_memsize_from_ule(ule);
903 assert(p == ((uint8_t*)le) + memsize);
904#endif
905}
906
907static inline size_t uxr_pack_txnid(UXR uxr, uint8_t *p) {
908 *(TXNID*)p = toku_htod64(uxr->xid);
909 return sizeof(TXNID);
910}
911
912static inline size_t uxr_pack_type_and_length(UXR uxr, uint8_t *p) {
913 size_t rval = 1;
914 *p = uxr->type;
915 if (uxr_is_insert(uxr)) {
916 *(uint32_t*)(p+1) = toku_htod32(uxr->vallen);
917 rval += sizeof(uint32_t);
918 }
919 return rval;
920}
921
922static inline size_t uxr_pack_length_and_bit(UXR uxr, uint8_t *p) {
923 uint32_t length_and_bit;
924 if (uxr_is_insert(uxr)) {
925 length_and_bit = INSERT_LENGTH(uxr->vallen);
926 } else {
927 length_and_bit = DELETE_LENGTH(uxr->vallen);
928 }
929 *(uint32_t*)p = toku_htod32(length_and_bit);
930 return sizeof(uint32_t);
931}
932
933static inline size_t uxr_pack_data(UXR uxr, uint8_t *p) {
934 if (uxr_is_insert(uxr)) {
935 memcpy(p, uxr->valp, uxr->vallen);
936 return uxr->vallen;
937 }
938 return 0;
939}
940
941static inline size_t uxr_unpack_txnid(UXR uxr, uint8_t *p) {
942 uxr->xid = toku_dtoh64(*(TXNID*)p);
943 return sizeof(TXNID);
944}
945
946static inline size_t uxr_unpack_type_and_length(UXR uxr, uint8_t *p) {
947 size_t rval = 1;
948 uxr->type = *p;
949 if (uxr_is_insert(uxr)) {
950 uxr->vallen = toku_dtoh32(*(uint32_t*)(p+1));
951 rval += sizeof(uint32_t);
952 }
953 return rval;
954}
955
956static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p) {
957 uint32_t length_and_bit = toku_dtoh32(*(uint32_t*)p);
958 if (IS_INSERT(length_and_bit)) {
959 uxr->type = XR_INSERT;
960 uxr->vallen = GET_LENGTH(length_and_bit);
961 } else {
962 uxr->type = XR_DELETE;
963 uxr->vallen = 0;
964 }
965 return sizeof(uint32_t);
966}
967
968static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p) {
969 if (uxr_is_insert(uxr)) {
970 uxr->valp = p;
971 return uxr->vallen;
972 }
973 return 0;
974}
975
976// executed too often to be worth making threadsafe
977static inline void update_le_status(ULE ule, size_t memsize) {
978 if (ule->num_cuxrs > LE_STATUS_VAL(LE_MAX_COMMITTED_XR))
979 LE_STATUS_VAL(LE_MAX_COMMITTED_XR) = ule->num_cuxrs;
980 if (ule->num_puxrs > LE_STATUS_VAL(LE_MAX_PROVISIONAL_XR))
981 LE_STATUS_VAL(LE_MAX_PROVISIONAL_XR) = ule->num_puxrs;
982 if (ule->num_cuxrs > MAX_TRANSACTION_RECORDS)
983 LE_STATUS_VAL(LE_EXPANDED)++;
984 if (memsize > LE_STATUS_VAL(LE_MAX_MEMSIZE))
985 LE_STATUS_VAL(LE_MAX_MEMSIZE) = memsize;
986}
987
988// Purpose is to return a newly allocated leaf entry in packed format, or
989// return null if leaf entry should be destroyed (if no transaction records
990// are for inserts).
991// Transaction records in packed le are stored inner to outer (first xr is
992// innermost), with some information extracted out of the transaction records
993// into the header.
994// Transaction records in ule are stored outer to inner (uxr[0] is outermost).
995// Takes 'ule' and creates 'new_leafentry_p
996int le_pack(
997 ULE ule,
998 bn_data* data_buffer,
999 uint32_t idx,
1000 void* keyp,
1001 uint32_t keylen,
1002 uint32_t old_keylen,
1003 uint32_t old_le_size,
1004 LEAFENTRY* const new_leafentry_p,
1005 void** const maybe_free) {
1006
1007 invariant(ule->num_cuxrs > 0);
1008 invariant(ule->uxrs[0].xid == TXNID_NONE);
1009 int rval;
1010 size_t memsize = 0;
1011 {
1012 // The unpacked leafentry may contain no inserts anywhere on its stack.
1013 // If so, then there IS no leafentry to pack, we should return NULL
1014 // So, first we check the stack to see if there is any insert. If not,
1015 // Then we can return NULL and exit the function, otherwise, we goto
1016 // found_insert, and proceed with packing the leafentry
1017 uint32_t i;
1018 for (i = 0; i < ule->num_cuxrs + ule->num_puxrs; i++) {
1019 if (uxr_is_insert(&ule->uxrs[i])) {
1020 goto found_insert;
1021 }
1022 }
1023 if (data_buffer && old_le_size > 0) {
1024 // must pass old_keylen and old_le_size, since that's what is
1025 // actually stored in data_buffer
1026 data_buffer->delete_leafentry(idx, old_keylen, old_le_size);
1027 }
1028 *new_leafentry_p = NULL;
1029 rval = 0;
1030 goto cleanup;
1031 }
1032found_insert:
1033 memsize = le_memsize_from_ule(ule);
1034 LEAFENTRY new_leafentry;
1035 get_space_for_le(
1036 data_buffer,
1037 idx,
1038 keyp,
1039 keylen,
1040 old_keylen,
1041 old_le_size,
1042 memsize,
1043 &new_leafentry,
1044 maybe_free);
1045
1046 //p always points to first unused byte after leafentry we are packing
1047 uint8_t *p;
1048 invariant(ule->num_cuxrs>0);
1049 //Type specific data
1050 if (ule->num_cuxrs == 1 && ule->num_puxrs == 0) {
1051 //Pack a 'clean leafentry' (no uncommitted transactions, only one
1052 //committed value)
1053 new_leafentry->type = LE_CLEAN;
1054
1055 uint32_t vallen = ule->uxrs[0].vallen;
1056 //Store vallen
1057 new_leafentry->u.clean.vallen = toku_htod32(vallen);
1058
1059 //Store actual val
1060 memcpy(new_leafentry->u.clean.val, ule->uxrs[0].valp, vallen);
1061
1062 //Set p to after leafentry
1063 p = new_leafentry->u.clean.val + vallen;
1064 } else {
1065 uint32_t i;
1066 //Pack an 'mvcc leafentry'
1067 new_leafentry->type = LE_MVCC;
1068
1069 new_leafentry->u.mvcc.num_cxrs = toku_htod32(ule->num_cuxrs);
1070 // invariant makes cast that follows ok, although not sure if
1071 // check should be "< MAX_TRANSACTION_RECORDS" or
1072 // "< MAX_TRANSACTION_RECORDS - 1"
1073 invariant(ule->num_puxrs < MAX_TRANSACTION_RECORDS);
1074 new_leafentry->u.mvcc.num_pxrs = (uint8_t)ule->num_puxrs;
1075
1076 p = new_leafentry->u.mvcc.xrs;
1077
1078 //pack interesting TXNIDs inner to outer.
1079 if (ule->num_puxrs!=0) {
1080 UXR outermost = ule->uxrs + ule->num_cuxrs;
1081 p += uxr_pack_txnid(outermost, p);
1082 }
1083 //pack other TXNIDS (not for ule->uxrs[0])
1084 for (i = 0; i < ule->num_cuxrs - 1; i++) {
1085 p += uxr_pack_txnid(ule->uxrs + ule->num_cuxrs - 1 - i, p);
1086 }
1087
1088 //pack interesting lengths inner to outer.
1089 if (ule->num_puxrs!=0) {
1090 UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1;
1091 p += uxr_pack_length_and_bit(innermost, p);
1092 }
1093 for (i = 0; i < ule->num_cuxrs; i++) {
1094 p += uxr_pack_length_and_bit(ule->uxrs + ule->num_cuxrs - 1 - i, p);
1095 }
1096
1097 //pack interesting values inner to outer
1098 if (ule->num_puxrs!=0) {
1099 UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1;
1100 p += uxr_pack_data(innermost, p);
1101 }
1102 for (i = 0; i < ule->num_cuxrs; i++) {
1103 p += uxr_pack_data(ule->uxrs + ule->num_cuxrs - 1 - i, p);
1104 }
1105
1106 //pack provisional xrs outer to inner
1107 if (ule->num_puxrs > 1) {
1108 {
1109 //pack length, bit, data for outermost uncommitted
1110 UXR outermost = ule->uxrs + ule->num_cuxrs;
1111 p += uxr_pack_type_and_length(outermost, p);
1112 p += uxr_pack_data(outermost, p);
1113 }
1114 //pack txnid, length, bit, data for non-outermost, non-innermost
1115 for (i = ule->num_cuxrs + 1;
1116 i < ule->num_cuxrs + ule->num_puxrs - 1;
1117 i++) {
1118 UXR uxr = ule->uxrs + i;
1119 p += uxr_pack_txnid(uxr, p);
1120 p += uxr_pack_type_and_length(uxr, p);
1121 p += uxr_pack_data(uxr, p);
1122 }
1123 {
1124 //Just pack txnid for innermost
1125 UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1;
1126 p += uxr_pack_txnid(innermost, p);
1127 }
1128 }
1129 }
1130
1131 //p points to first unused byte after packed leafentry
1132
1133 size_t bytes_written;
1134 bytes_written = (size_t)p - (size_t)new_leafentry;
1135 invariant(bytes_written == memsize);
1136
1137#if ULE_DEBUG
1138 if (omt) { //Disable recursive debugging.
1139 size_t memsize_verify = leafentry_memsize(new_leafentry);
1140 invariant(memsize_verify == memsize);
1141
1142 ULE_S ule_tmp;
1143 le_unpack(&ule_tmp, new_leafentry);
1144
1145 memsize_verify = le_memsize_from_ule(&ule_tmp);
1146 invariant(memsize_verify == memsize);
1147 //Debugging code inside le_unpack will repack and verify it is the same.
1148
1149 LEAFENTRY le_copy;
1150
1151 int r_tmp = le_pack(&ule_tmp, &memsize_verify, &memsize_verify,
1152 &le_copy);
1153 invariant(r_tmp==0);
1154 invariant(memsize_verify == memsize);
1155
1156 invariant(memcmp(new_leafentry, le_copy, memsize)==0);
1157 toku_free(le_copy);
1158
1159 ule_cleanup(&ule_tmp);
1160 }
1161#endif
1162
1163 *new_leafentry_p = (LEAFENTRY)new_leafentry;
1164 rval = 0;
1165cleanup:
1166 update_le_status(ule, memsize);
1167 return rval;
1168}
1169
1170////////////////////////////////////////////////////////////////////////////////
1171// Following functions provide convenient access to a packed leafentry.
1172
1173//Requires:
1174// Leafentry that ule represents should not be destroyed (is not just all
1175// deletes)
1176size_t le_memsize_from_ule (ULE ule) {
1177 invariant(ule->num_cuxrs);
1178 size_t rval;
1179 if (ule->num_cuxrs == 1 && ule->num_puxrs == 0) {
1180 UXR committed = ule->uxrs;
1181 invariant(uxr_is_insert(committed));
1182 rval = 1 //type
1183 +4 //vallen
1184 +committed->vallen; //actual val
1185 } else {
1186 rval = 1 //type
1187 +4 //num_cuxrs
1188 +1 //num_puxrs
1189 +4*(ule->num_cuxrs) //types+lengths for committed
1190 +8*(ule->num_cuxrs + ule->num_puxrs - 1); //txnids (excluding
1191 //superroot)
1192 uint32_t i;
1193 //Count data from committed uxrs and innermost puxr
1194 for (i = 0; i < ule->num_cuxrs; i++) {
1195 UXR uxr = &ule->uxrs[i];
1196 if (uxr_is_insert(uxr)) {
1197 rval += uxr->vallen; //actual val
1198 }
1199 }
1200 if (ule->num_puxrs) {
1201 UXR uxr = ule_get_innermost_uxr(ule);
1202 if (uxr_is_insert(uxr)) {
1203 rval += uxr->vallen; //actual val
1204 }
1205 rval += 4; //type+length for innermost puxr
1206 rval += 1*(ule->num_puxrs - 1); //type for remaining puxrs.
1207 //Count data and lengths from other puxrs
1208 for (i = 0; i < ule->num_puxrs-1; i++) {
1209 uxr = &ule->uxrs[i+ule->num_cuxrs];
1210 if (uxr_is_insert(uxr)) {
1211 rval += 4 + uxr->vallen; //length plus actual val
1212 }
1213 }
1214 }
1215 }
1216 return rval;
1217}
1218
1219// TODO: rename
1220size_t leafentry_rest_memsize(
1221 uint32_t num_puxrs,
1222 uint32_t num_cuxrs,
1223 uint8_t* start) {
1224
1225 UXR_S uxr;
1226 size_t lengths = 0;
1227 uint8_t* p = start;
1228
1229 //Skip TXNIDs
1230 if (num_puxrs!=0) {
1231 p += sizeof(TXNID);
1232 }
1233 p += (num_cuxrs-1)*sizeof(TXNID);
1234
1235 //Retrieve interesting lengths inner to outer.
1236 if (num_puxrs!=0) {
1237 p += uxr_unpack_length_and_bit(&uxr, p);
1238 if (uxr_is_insert(&uxr)) {
1239 lengths += uxr.vallen;
1240 }
1241 }
1242 uint32_t i;
1243 for (i = 0; i < num_cuxrs; i++) {
1244 p += uxr_unpack_length_and_bit(&uxr, p);
1245 if (uxr_is_insert(&uxr)) {
1246 lengths += uxr.vallen;
1247 }
1248 }
1249 //Skip all interesting 'data'
1250 p += lengths;
1251
1252 //unpack provisional xrs outer to inner
1253 if (num_puxrs > 1) {
1254 {
1255 p += uxr_unpack_type_and_length(&uxr, p);
1256 p += uxr_unpack_data(&uxr, p);
1257 }
1258 //unpack txnid, length, bit, data for non-outermost, non-innermost
1259 for (i = 0; i < num_puxrs - 2; i++) {
1260 p += uxr_unpack_txnid(&uxr, p);
1261 p += uxr_unpack_type_and_length(&uxr, p);
1262 p += uxr_unpack_data(&uxr, p);
1263 }
1264 {
1265 //Just unpack txnid for innermost
1266 p += uxr_unpack_txnid(&uxr, p);
1267 }
1268 }
1269 size_t rval = (size_t)p - (size_t)start;
1270 return rval;
1271}
1272
1273size_t leafentry_memsize (LEAFENTRY le) {
1274 size_t rval = 0;
1275
1276 uint8_t type = le->type;
1277
1278 uint8_t *p = NULL;
1279 switch (type) {
1280 case LE_CLEAN: {
1281 uint32_t vallen = toku_dtoh32(le->u.clean.vallen);
1282 rval = LE_CLEAN_MEMSIZE(vallen);
1283 break;
1284 }
1285 case LE_MVCC: {
1286 p = le->u.mvcc.xrs;
1287 uint32_t num_cuxrs = toku_dtoh32(le->u.mvcc.num_cxrs);
1288 invariant(num_cuxrs);
1289 uint32_t num_puxrs = le->u.mvcc.num_pxrs;
1290 p += leafentry_rest_memsize(num_puxrs, num_cuxrs, p);
1291 rval = (size_t)p - (size_t)le;
1292 break;
1293 }
1294 default:
1295 invariant(false);
1296 }
1297#if ULE_DEBUG
1298 ULE_S ule;
1299 le_unpack(&ule, le);
1300 size_t slow_rval = le_memsize_from_ule(&ule);
1301 if (slow_rval!=rval) {
1302 int r = print_klpair(stderr, le, NULL, 0);
1303 fprintf(stderr, "\nSlow: [%" PRIu64 "] Fast: [%" PRIu64 "]\n", slow_rval, rval);
1304 invariant(r==0);
1305 }
1306 assert(slow_rval == rval);
1307 ule_cleanup(&ule);
1308#endif
1309 return rval;
1310}
1311
1312size_t leafentry_disksize (LEAFENTRY le) {
1313 return leafentry_memsize(le);
1314}
1315
1316bool le_is_clean(LEAFENTRY le) {
1317 uint8_t type = le->type;
1318 uint32_t rval;
1319 switch (type) {
1320 case LE_CLEAN:
1321 rval = true;
1322 break;
1323 case LE_MVCC:;
1324 rval = false;
1325 break;
1326 default:
1327 invariant(false);
1328 }
1329 return rval;
1330}
1331
1332int le_latest_is_del(LEAFENTRY le) {
1333 int rval;
1334 uint8_t type = le->type;
1335 uint8_t *p;
1336 switch (type) {
1337 case LE_CLEAN: {
1338 rval = 0;
1339 break;
1340 }
1341 case LE_MVCC: {
1342 UXR_S uxr;
1343 uint32_t num_cuxrs = toku_dtoh32(le->u.mvcc.num_cxrs);
1344 invariant(num_cuxrs);
1345 uint32_t num_puxrs = le->u.mvcc.num_pxrs;
1346
1347 //Position p.
1348 p = le->u.mvcc.xrs;
1349
1350 //Skip TXNIDs
1351 if (num_puxrs!=0) {
1352 p += sizeof(TXNID);
1353 }
1354 p += (num_cuxrs-1)*sizeof(TXNID);
1355
1356 p += uxr_unpack_length_and_bit(&uxr, p);
1357 rval = uxr_is_delete(&uxr);
1358 break;
1359 }
1360 default:
1361 invariant(false);
1362 }
1363#if ULE_DEBUG
1364 ULE_S ule;
1365 le_unpack(&ule, le);
1366 UXR uxr = ule_get_innermost_uxr(&ule);
1367 int slow_rval = uxr_is_delete(uxr);
1368 assert((rval==0) == (slow_rval==0));
1369 ule_cleanup(&ule);
1370#endif
1371 return rval;
1372}
1373
1374
1375//
1376// returns true if the outermost provisional transaction id on the leafentry's
1377// stack matches the outermost transaction id in xids
1378// It is used to determine if a broadcast commit/abort message (look in ft-ops.c)
1379// should be applied to this leafentry
1380// If the outermost transactions match, then the broadcast commit/abort should
1381// be applied
1382//
1383bool le_has_xids(LEAFENTRY le, XIDS xids) {
1384 //Read num_uxrs
1385 uint32_t num_xids = toku_xids_get_num_xids(xids);
1386 invariant(num_xids > 0); //Disallow checking for having TXNID_NONE
1387 TXNID xid = toku_xids_get_xid(xids, 0);
1388 invariant(xid!=TXNID_NONE);
1389
1390 bool rval = (le_outermost_uncommitted_xid(le) == xid);
1391 return rval;
1392}
1393
1394void* le_latest_val_and_len (LEAFENTRY le, uint32_t *len) {
1395 uint8_t type = le->type;
1396 void *valp;
1397
1398 uint8_t *p;
1399 switch (type) {
1400 case LE_CLEAN:
1401 *len = toku_dtoh32(le->u.clean.vallen);
1402 valp = le->u.clean.val;
1403 break;
1404 case LE_MVCC:;
1405 UXR_S uxr;
1406 uint32_t num_cuxrs;
1407 num_cuxrs = toku_dtoh32(le->u.mvcc.num_cxrs);
1408 invariant(num_cuxrs);
1409 uint32_t num_puxrs;
1410 num_puxrs = le->u.mvcc.num_pxrs;
1411
1412 //Position p.
1413 p = le->u.mvcc.xrs;
1414
1415 //Skip TXNIDs
1416 if (num_puxrs!=0) {
1417 p += sizeof(TXNID);
1418 }
1419 p += (num_cuxrs-1)*sizeof(TXNID);
1420
1421 p += uxr_unpack_length_and_bit(&uxr, p);
1422 if (uxr_is_insert(&uxr)) {
1423 *len = uxr.vallen;
1424 valp = p + (num_cuxrs - 1 + (num_puxrs!=0))*sizeof(uint32_t);
1425 } else {
1426 *len = 0;
1427 valp = NULL;
1428 }
1429 break;
1430 default:
1431 invariant(false);
1432 }
1433#if ULE_DEBUG
1434 ULE_S ule;
1435 le_unpack(&ule, le);
1436 UXR uxr = ule_get_innermost_uxr(&ule);
1437 void *slow_valp;
1438 uint32_t slow_len;
1439 if (uxr_is_insert(uxr)) {
1440 slow_valp = uxr->valp;
1441 slow_len = uxr->vallen;
1442 } else {
1443 slow_valp = NULL;
1444 slow_len = 0;
1445 }
1446 assert(slow_valp == le_latest_val(le));
1447 assert(slow_len == le_latest_vallen(le));
1448 assert(valp==slow_valp);
1449 assert(*len==slow_len);
1450 ule_cleanup(&ule);
1451#endif
1452 return valp;
1453}
1454
1455//DEBUG ONLY can be slow
1456void* le_latest_val (LEAFENTRY le) {
1457 ULE_S ule;
1458 le_unpack(&ule, le);
1459 UXR uxr = ule_get_innermost_uxr(&ule);
1460 void *slow_rval;
1461 if (uxr_is_insert(uxr))
1462 slow_rval = uxr->valp;
1463 else
1464 slow_rval = NULL;
1465 ule_cleanup(&ule);
1466 return slow_rval;
1467}
1468
1469//needed to be fast for statistics.
1470uint32_t le_latest_vallen (LEAFENTRY le) {
1471 uint32_t rval;
1472 uint8_t type = le->type;
1473 uint8_t *p;
1474 switch (type) {
1475 case LE_CLEAN:
1476 rval = toku_dtoh32(le->u.clean.vallen);
1477 break;
1478 case LE_MVCC:;
1479 UXR_S uxr;
1480 uint32_t num_cuxrs;
1481 num_cuxrs = toku_dtoh32(le->u.mvcc.num_cxrs);
1482 invariant(num_cuxrs);
1483 uint32_t num_puxrs;
1484 num_puxrs = le->u.mvcc.num_pxrs;
1485
1486 //Position p.
1487 p = le->u.mvcc.xrs;
1488
1489 //Skip TXNIDs
1490 if (num_puxrs!=0) {
1491 p += sizeof(TXNID);
1492 }
1493 p += (num_cuxrs-1)*sizeof(TXNID);
1494
1495 uxr_unpack_length_and_bit(&uxr, p);
1496 if (uxr_is_insert(&uxr)) {
1497 rval = uxr.vallen;
1498 } else {
1499 rval = 0;
1500 }
1501 break;
1502 default:
1503 invariant(false);
1504 }
1505#if ULE_DEBUG
1506 ULE_S ule;
1507 le_unpack(&ule, le);
1508 UXR uxr = ule_get_innermost_uxr(&ule);
1509 uint32_t slow_rval;
1510 if (uxr_is_insert(uxr))
1511 slow_rval = uxr->vallen;
1512 else
1513 slow_rval = 0;
1514 ule_cleanup(&ule);
1515 invariant(slow_rval == rval);
1516#endif
1517 return rval;
1518}
1519
1520uint64_t le_outermost_uncommitted_xid (LEAFENTRY le) {
1521 uint64_t rval = TXNID_NONE;
1522 uint8_t type = le->type;
1523
1524 uint8_t *p;
1525 switch (type) {
1526 case LE_CLEAN:
1527 break;
1528 case LE_MVCC:;
1529 UXR_S uxr;
1530 uint32_t num_puxrs = le->u.mvcc.num_pxrs;
1531
1532 if (num_puxrs) {
1533 p = le->u.mvcc.xrs;
1534 uxr_unpack_txnid(&uxr, p);
1535 rval = uxr.xid;
1536 }
1537 break;
1538 }
1539#if ULE_DEBUG
1540 ULE_S ule;
1541 le_unpack(&ule, le);
1542 TXNID slow_rval = 0;
1543 if (ule.num_puxrs > 0)
1544 slow_rval = ule.uxrs[ule.num_cuxrs].xid;
1545 assert(rval==slow_rval);
1546 ule_cleanup(&ule);
1547#endif
1548 return rval;
1549}
1550
1551
1552//Optimization not required. This is a debug only function.
1553//Print a leafentry out in human-readable format
1554int print_klpair (FILE *outf, const void* keyp, uint32_t keylen, LEAFENTRY le) {
1555 ULE_S ule;
1556 le_unpack(&ule, le);
1557 uint32_t i;
1558 invariant(ule.num_cuxrs > 0);
1559 UXR uxr;
1560 if (!le) { printf("NULL"); return 0; }
1561 if (keyp) {
1562 fprintf(outf, "{key=");
1563 toku_print_BYTESTRING(outf, keylen, (char *) keyp);
1564 }
1565 for (i = 0; i < ule.num_cuxrs+ule.num_puxrs; i++) {
1566 // fprintf(outf, "\n%*s", i+1, " "); //Nested indenting
1567 uxr = &ule.uxrs[i];
1568 char prov = i < ule.num_cuxrs ? 'c' : 'p';
1569 fprintf(outf, " ");
1570 if (uxr_is_placeholder(uxr))
1571 fprintf(outf, "P: xid=%016" PRIx64, uxr->xid);
1572 else if (uxr_is_delete(uxr))
1573 fprintf(outf, "%cD: xid=%016" PRIx64, prov, uxr->xid);
1574 else {
1575 assert(uxr_is_insert(uxr));
1576 fprintf(outf, "%cI: xid=%016" PRIx64 " val=", prov, uxr->xid);
1577 toku_print_BYTESTRING(outf, uxr->vallen, (char *) uxr->valp);
1578 }
1579 }
1580 fprintf(outf, "}");
1581 ule_cleanup(&ule);
1582 return 0;
1583}
1584
1585////////////////////////////////////////////////////////////////////////////////
1586// This layer of abstraction (ule_xxx) knows the structure of the unpacked
1587// leafentry and no other structure.
1588//
1589
1590// ule constructor
1591// Note that transaction 0 is explicit in the ule
1592static inline void ule_init_empty_ule(ULE ule) {
1593 ule->num_cuxrs = 1;
1594 ule->num_puxrs = 0;
1595 ule->uxrs = ule->uxrs_static;
1596 ule->uxrs[0] = committed_delete;
1597}
1598
1599static inline int32_t min_i32(int32_t a, int32_t b) {
1600 int32_t rval = a < b ? a : b;
1601 return rval;
1602}
1603
1604///////////////////
1605// Implicit promotion logic:
1606//
1607// If the leafentry has already been promoted, there is nothing to do.
1608// We have two transaction stacks (one from message, one from leaf entry).
1609// We want to implicitly promote transactions newer than (but not including)
1610// the innermost common ancestor (ICA) of the two stacks of transaction ids. We
1611// know that this is the right thing to do because each transaction with an id
1612// greater (later) than the ICA must have been either committed or aborted.
1613// If it was aborted then we would have seen an abort message and removed the
1614// xid from the stack of transaction records. So any transaction still on the
1615// leaf entry stack must have been successfully promoted.
1616//
1617// After finding the ICA, promote transaction later than the ICA by copying
1618// value and type from innermost transaction record of leafentry to transaction
1619// record of ICA, keeping the transaction id of the ICA.
1620// Outermost xid is zero for both ule and xids<>
1621//
1622static void ule_do_implicit_promotions(ULE ule, XIDS xids) {
1623 //Optimization for (most) common case.
1624 //No commits necessary if everything is already committed.
1625 if (ule->num_puxrs > 0) {
1626 int num_xids = toku_xids_get_num_xids(xids);
1627 invariant(num_xids>0);
1628 uint32_t max_index = ule->num_cuxrs + min_i32(ule->num_puxrs, num_xids) - 1;
1629 uint32_t ica_index = max_index;
1630 uint32_t index;
1631 for (index = ule->num_cuxrs; index <= max_index; index++) {
1632 TXNID current_msg_xid = toku_xids_get_xid(xids, index - ule->num_cuxrs);
1633 TXNID current_ule_xid = ule_get_xid(ule, index);
1634 if (current_msg_xid != current_ule_xid) {
1635 //ica is innermost transaction with matching xids.
1636 ica_index = index - 1;
1637 break;
1638 }
1639 }
1640
1641 if (ica_index < ule->num_cuxrs) {
1642 invariant(ica_index == ule->num_cuxrs - 1);
1643 ule_promote_provisional_innermost_to_committed(ule);
1644 } else if (ica_index < ule->num_cuxrs + ule->num_puxrs - 1) {
1645 //If ica is the innermost uxr in the leafentry, no commits are
1646 //necessary.
1647 ule_promote_provisional_innermost_to_index(ule, ica_index);
1648 }
1649
1650 }
1651}
1652
1653static void ule_promote_provisional_innermost_to_committed(ULE ule) {
1654 //Must be something to promote.
1655 invariant(ule->num_puxrs);
1656 //Take value (or delete flag) from innermost.
1657 //Take TXNID from outermost uncommitted txn
1658 //"Delete" provisional stack
1659 //add one UXR that is committed using saved TXNID,val/delete flag
1660
1661 UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
1662 assert(!uxr_is_placeholder(old_innermost_uxr));
1663
1664 UXR old_outermost_uncommitted_uxr = &ule->uxrs[ule->num_cuxrs];
1665
1666 ule->num_puxrs = 0; //Discard all provisional uxrs.
1667 if (uxr_is_delete(old_innermost_uxr)) {
1668 ule_push_delete_uxr(ule, true, old_outermost_uncommitted_uxr->xid);
1669 } else {
1670 ule_push_insert_uxr(ule, true,
1671 old_outermost_uncommitted_uxr->xid,
1672 old_innermost_uxr->vallen,
1673 old_innermost_uxr->valp);
1674 }
1675}
1676
1677static void ule_try_promote_provisional_outermost(
1678 ULE ule,
1679 TXNID oldest_possible_live_xid) {
1680// Effect: If there is a provisional record whose outermost xid is older than
1681// the oldest known referenced_xid, promote it to committed.
1682 if (ule->num_puxrs > 0 &&
1683 ule_get_xid(ule, ule->num_cuxrs) < oldest_possible_live_xid) {
1684 ule_promote_provisional_innermost_to_committed(ule);
1685 }
1686}
1687
1688// Purpose is to promote the value (and type) of the innermost transaction
1689// record to the uxr at the specified index (keeping the txnid of the uxr at
1690// specified index.)
1691static void ule_promote_provisional_innermost_to_index(
1692 ULE ule,
1693 uint32_t index) {
1694 //Must not promote to committed portion of stack.
1695 invariant(index >= ule->num_cuxrs);
1696 //Must actually be promoting.
1697 invariant(index < ule->num_cuxrs + ule->num_puxrs - 1);
1698 UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
1699 assert(!uxr_is_placeholder(old_innermost_uxr));
1700 TXNID new_innermost_xid = ule->uxrs[index].xid;
1701 //Discard old uxr at index (and everything inner)
1702 ule->num_puxrs = index - ule->num_cuxrs;
1703 if (uxr_is_delete(old_innermost_uxr)) {
1704 ule_push_delete_uxr(ule, false, new_innermost_xid);
1705 } else {
1706 ule_push_insert_uxr(
1707 ule,
1708 false,
1709 new_innermost_xid,
1710 old_innermost_uxr->vallen,
1711 old_innermost_uxr->valp);
1712 }
1713}
1714
1715///////////////////
1716// All ule_apply_xxx operations are done after implicit promotions,
1717// so the innermost transaction record in the leafentry is the ICA.
1718//
1719
1720
1721// Purpose is to apply an insert message to this leafentry:
1722static inline int64_t ule_apply_insert_no_overwrite(
1723 ULE ule,
1724 XIDS xids,
1725 uint32_t vallen,
1726 void* valp) {
1727
1728 invariant(IS_VALID_LEN(vallen));
1729 int64_t retval = 0;
1730 UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
1731 // If something exists, don't overwrite
1732 if (uxr_is_insert(old_innermost_uxr)) {
1733 retval = -1;
1734 return retval;
1735 }
1736 ule_prepare_for_new_uxr(ule, xids);
1737 // xid of transaction doing this insert
1738 TXNID this_xid = toku_xids_get_innermost_xid(xids);
1739 ule_push_insert_uxr(ule, this_xid == TXNID_NONE, this_xid, vallen, valp);
1740 return retval;
1741}
1742
1743// Purpose is to apply an insert message to this leafentry:
1744static inline int64_t ule_apply_insert(
1745 ULE ule,
1746 XIDS xids,
1747 uint32_t vallen,
1748 void* valp) {
1749
1750 invariant(IS_VALID_LEN(vallen));
1751 int64_t retval = 0;
1752 UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
1753 // If something exists, overwrite
1754 if (uxr_is_insert(old_innermost_uxr)) {
1755 retval = -1;
1756 }
1757 ule_prepare_for_new_uxr(ule, xids);
1758 // xid of transaction doing this insert
1759 TXNID this_xid = toku_xids_get_innermost_xid(xids);
1760 ule_push_insert_uxr(ule, this_xid == TXNID_NONE, this_xid, vallen, valp);
1761 return retval;
1762}
1763
1764// Purpose is to apply a delete message to this leafentry:
1765static inline int64_t ule_apply_delete(ULE ule, XIDS xids) {
1766 int64_t retval = 0;
1767 UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
1768 if (FT_UNLIKELY(uxr_is_delete(old_innermost_uxr))) {
1769 retval = 1;
1770 }
1771 ule_prepare_for_new_uxr(ule, xids);
1772 // xid of transaction doing this delete
1773 TXNID this_xid = toku_xids_get_innermost_xid(xids);
1774 ule_push_delete_uxr(ule, this_xid == TXNID_NONE, this_xid);
1775 return retval;
1776}
1777
1778// First, discard anything done earlier by this transaction.
1779// Then, add placeholders if necessary. This transaction may be nested within
1780// outer transactions that are newer than then newest (innermost) transaction in
1781// the leafentry. If so, record those outer transactions in the leafentry
1782// with placeholders.
1783static inline void ule_prepare_for_new_uxr(ULE ule, XIDS xids) {
1784 TXNID this_xid = toku_xids_get_innermost_xid(xids);
1785 //This is for LOADER_USE_PUTS or transactionless environment
1786 //where messages use XIDS of 0
1787 if (this_xid == TXNID_NONE && ule_get_innermost_xid(ule) == TXNID_NONE) {
1788 ule_remove_innermost_uxr(ule);
1789 } else if (ule->num_puxrs > 0 && ule_get_innermost_xid(ule) == this_xid) {
1790 // case where we are transactional and xids stack matches ule stack
1791 ule_remove_innermost_uxr(ule);
1792 } else {
1793 // case where we are transactional and xids stack does not match ule
1794 // stack
1795 ule_add_placeholders(ule, xids);
1796 }
1797}
1798
1799// Purpose is to apply an abort message to this leafentry.
1800// If the aborted transaction (the transaction whose xid is the innermost xid
1801// in the id stack passed in the message), has not modified this leafentry,
1802// then there is nothing to be done.
1803// If this transaction did modify the leafentry, then undo whatever it did (by
1804// removing the transaction record (uxr) and any placeholders underneath.
1805// Remember, the innermost uxr can only be an insert or a delete, not a
1806// placeholder.
1807static inline int64_t ule_apply_abort(ULE ule, XIDS xids) {
1808 int64_t retval = 0;
1809 // xid of transaction doing this abort
1810 TXNID this_xid = toku_xids_get_innermost_xid(xids);
1811 invariant(this_xid!=TXNID_NONE);
1812 UXR innermost = ule_get_innermost_uxr(ule);
1813 // need to check for provisional entries in ule, otherwise
1814 // there is nothing to abort, not checking this may result
1815 // in a bug where the most recently committed has same xid
1816 // as the XID's innermost
1817 if (ule->num_puxrs > 0 && innermost->xid == this_xid) {
1818 // if this is a rollback of a delete of a new ule, return 0
1819 // (i.e. double delete)
1820 if (uxr_is_delete(innermost)) {
1821 if (ule->num_puxrs == 1 && ule->num_cuxrs == 1 &&
1822 uxr_is_delete(&(ule->uxrs[0]))) {
1823 retval = 0;
1824 } else {
1825 retval = 1;
1826 }
1827 } else if (uxr_is_insert(innermost)) {
1828 if (ule->num_puxrs == 1 && ule->num_cuxrs == 1 &&
1829 uxr_is_insert(&(ule->uxrs[0]))) {
1830 retval = 0;
1831 } else {
1832 retval = -1;
1833 }
1834 }
1835 // if this is a rollback of a insert of an exising ule, return 0
1836 // (i.e. double insert)
1837 invariant(ule->num_puxrs>0);
1838 ule_remove_innermost_uxr(ule);
1839 ule_remove_innermost_placeholders(ule);
1840 }
1841 invariant(ule->num_cuxrs > 0);
1842 return retval;
1843}
1844
1845static void ule_apply_broadcast_commit_all (ULE ule) {
1846 ule->uxrs[0] = ule->uxrs[ule->num_puxrs + ule->num_cuxrs - 1];
1847 ule->uxrs[0].xid = TXNID_NONE;
1848 ule->num_puxrs = 0;
1849 ule->num_cuxrs = 1;
1850}
1851
1852// Purpose is to apply a commit message to this leafentry.
1853// If the committed transaction (the transaction whose xid is the innermost xid
1854// in the id stack passed in the message), has not modified this leafentry,
1855// then there is nothing to be done.
1856// Also, if there are no uncommitted transaction records there is nothing to do.
1857// If this transaction did modify the leafentry, then promote whatever it did.
1858// Remember, the innermost uxr can only be an insert or a delete, not a
1859// placeholder.
1860void ule_apply_commit(ULE ule, XIDS xids) {
1861 // xid of transaction committing
1862 TXNID this_xid = toku_xids_get_innermost_xid(xids);
1863 invariant(this_xid!=TXNID_NONE);
1864 // need to check for provisional entries in ule, otherwise
1865 // there is nothing to abort, not checking this may result
1866 // in a bug where the most recently committed has same xid
1867 // as the XID's innermost
1868 if (ule->num_puxrs > 0 && ule_get_innermost_xid(ule) == this_xid) {
1869 // 3 cases:
1870 //1- it's already a committed value (do nothing) (num_puxrs==0)
1871 //2- it's provisional but root level (make a new committed value
1872 // (num_puxrs==1)
1873 //3- it's provisional and not root (promote); (num_puxrs>1)
1874 if (ule->num_puxrs == 1) { //new committed value
1875 ule_promote_provisional_innermost_to_committed(ule);
1876 } else if (ule->num_puxrs > 1) {
1877 //ule->uxrs[ule->num_cuxrs+ule->num_puxrs-1] is the innermost
1878 // (this transaction)
1879 //ule->uxrs[ule->num_cuxrs+ule->num_puxrs-2] is the 2nd innermost
1880 //We want to promote the innermost uxr one level out.
1881 ule_promote_provisional_innermost_to_index(
1882 ule,
1883 ule->num_cuxrs+ule->num_puxrs-2);
1884 }
1885 }
1886}
1887
1888///////////////////
1889// Helper functions called from the functions above:
1890//
1891
1892// Purpose is to record an insert for this transaction (and set type correctly).
1893static inline void ule_push_insert_uxr(
1894 ULE ule,
1895 bool is_committed, TXNID xid,
1896 uint32_t vallen,
1897 void* valp) {
1898
1899 UXR uxr = ule_get_first_empty_uxr(ule);
1900 if (is_committed) {
1901 invariant(ule->num_puxrs==0);
1902 ule->num_cuxrs++;
1903 } else {
1904 ule->num_puxrs++;
1905 }
1906 uxr->xid = xid;
1907 uxr->vallen = vallen;
1908 uxr->valp = valp;
1909 uxr->type = XR_INSERT;
1910}
1911
1912// Purpose is to record a delete for this transaction. If this transaction
1913// is the root transaction, then truly delete the leafentry by marking the
1914// ule as empty.
1915static inline void ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid) {
1916 UXR uxr = ule_get_first_empty_uxr(ule);
1917 if (is_committed) {
1918 invariant(ule->num_puxrs==0);
1919 ule->num_cuxrs++;
1920 } else {
1921 ule->num_puxrs++;
1922 }
1923 uxr->xid = xid;
1924 uxr->type = XR_DELETE;
1925}
1926
1927// Purpose is to push a placeholder on the top of the leafentry's transaction
1928// stack.
1929static inline void ule_push_placeholder_uxr(ULE ule, TXNID xid) {
1930 invariant(ule->num_cuxrs>0);
1931 UXR uxr = ule_get_first_empty_uxr(ule);
1932 uxr->xid = xid;
1933 uxr->type = XR_PLACEHOLDER;
1934 ule->num_puxrs++;
1935}
1936
1937// Return innermost transaction record.
1938static inline UXR ule_get_innermost_uxr(ULE ule) {
1939 invariant(ule->num_cuxrs > 0);
1940 UXR rval = &(ule->uxrs[ule->num_cuxrs + ule->num_puxrs - 1]);
1941 return rval;
1942}
1943
1944// Return first empty transaction record
1945static inline UXR ule_get_first_empty_uxr(ULE ule) {
1946 invariant(ule->num_puxrs < MAX_TRANSACTION_RECORDS-1);
1947 UXR rval = &(ule->uxrs[ule->num_cuxrs+ule->num_puxrs]);
1948 return rval;
1949}
1950
1951// Remove the innermost transaction (pop the leafentry's stack), undoing
1952// whatever the innermost transaction did.
1953static inline void ule_remove_innermost_uxr(ULE ule) {
1954 //It is possible to remove the committed delete at first insert.
1955 invariant(ule->num_cuxrs > 0);
1956 if (ule->num_puxrs) {
1957 ule->num_puxrs--;
1958 } else {
1959 //This is for LOADER_USE_PUTS or transactionless environment
1960 //where messages use XIDS of 0
1961 invariant(ule->num_cuxrs == 1);
1962 invariant(ule_get_innermost_xid(ule)==TXNID_NONE);
1963 ule->num_cuxrs--;
1964 }
1965}
1966
1967static inline TXNID ule_get_innermost_xid(ULE ule) {
1968 TXNID rval = ule_get_xid(ule, ule->num_cuxrs + ule->num_puxrs - 1);
1969 return rval;
1970}
1971
1972static inline TXNID ule_get_xid(ULE ule, uint32_t index) {
1973 invariant(index < ule->num_cuxrs + ule->num_puxrs);
1974 TXNID rval = ule->uxrs[index].xid;
1975 return rval;
1976}
1977
1978// Purpose is to remove any placeholders from the top of the leaf stack (the
1979// innermost recorded transactions), if necessary. This function is idempotent.
1980// It makes no logical sense for a placeholder to be the innermost recorded
1981// transaction record, so placeholders at the top of the stack are not legal.
1982static void ule_remove_innermost_placeholders(ULE ule) {
1983 UXR uxr = ule_get_innermost_uxr(ule);
1984 while (uxr_is_placeholder(uxr)) {
1985 invariant(ule->num_puxrs>0);
1986 ule_remove_innermost_uxr(ule);
1987 uxr = ule_get_innermost_uxr(ule);
1988 }
1989}
1990
1991// Purpose is to add placeholders to the top of the leaf stack (the innermost
1992// recorded transactions), if necessary. This function is idempotent.
1993// Note, after placeholders are added, an insert or delete will be added. This
1994// function temporarily leaves the transaction stack in an illegal state (having
1995// placeholders on top).
1996static void ule_add_placeholders(ULE ule, XIDS xids) {
1997 //Placeholders can be placed on top of the committed uxr.
1998 invariant(ule->num_cuxrs > 0);
1999
2000 uint32_t num_xids = toku_xids_get_num_xids(xids);
2001 // we assume that implicit promotion has happened
2002 // when we get this call, so the number of xids MUST
2003 // be greater than the number of provisional entries
2004 invariant(num_xids >= ule->num_puxrs);
2005 // make sure that the xids stack matches up to a certain amount
2006 // this first for loop is just debug code
2007 for (uint32_t i = 0; i < ule->num_puxrs; i++) {
2008 TXNID current_msg_xid = toku_xids_get_xid(xids, i);
2009 TXNID current_ule_xid = ule_get_xid(ule, i + ule->num_cuxrs);
2010 invariant(current_msg_xid == current_ule_xid);
2011 }
2012 for (uint32_t i = ule->num_puxrs; i < num_xids-1; i++) {
2013 TXNID current_msg_xid = toku_xids_get_xid(xids, i);
2014 ule_push_placeholder_uxr(ule, current_msg_xid);
2015 }
2016}
2017
2018uint64_t ule_num_uxrs(ULE ule) {
2019 return ule->num_cuxrs + ule->num_puxrs;
2020}
2021
2022UXR ule_get_uxr(ULE ule, uint64_t ith) {
2023 invariant(ith < ule_num_uxrs(ule));
2024 return &ule->uxrs[ith];
2025}
2026
2027uint32_t ule_get_num_committed(ULE ule) {
2028 return ule->num_cuxrs;
2029}
2030
2031uint32_t ule_get_num_provisional(ULE ule) {
2032 return ule->num_puxrs;
2033}
2034
2035int ule_is_committed(ULE ule, uint64_t ith) {
2036 invariant(ith < ule_num_uxrs(ule));
2037 return ith < ule->num_cuxrs;
2038}
2039
2040int ule_is_provisional(ULE ule, uint64_t ith) {
2041 invariant(ith < ule_num_uxrs(ule));
2042 return ith >= ule->num_cuxrs;
2043}
2044
2045// return size of data for innermost uxr, the size of val
2046uint32_t ule_get_innermost_numbytes(ULE ule, uint32_t keylen) {
2047 uint32_t rval;
2048 UXR uxr = ule_get_innermost_uxr(ule);
2049 if (uxr_is_delete(uxr)) {
2050 rval = 0;
2051 } else {
2052 rval = uxr_get_vallen(uxr) + keylen;
2053 }
2054 return rval;
2055}
2056
2057
2058/////////////////////////////////////////////////////////////////////////////////
2059// This layer of abstraction (uxr_xxx) understands uxr and nothing else.
2060//
2061
2062static inline bool uxr_type_is_insert(uint8_t type) {
2063 bool rval = (bool)(type == XR_INSERT);
2064 return rval;
2065}
2066
2067bool uxr_is_insert(UXR uxr) {
2068 return uxr_type_is_insert(uxr->type);
2069}
2070
2071static inline bool uxr_type_is_delete(uint8_t type) {
2072 bool rval = (bool)(type == XR_DELETE);
2073 return rval;
2074}
2075
2076bool uxr_is_delete(UXR uxr) {
2077 return uxr_type_is_delete(uxr->type);
2078}
2079
2080static inline bool uxr_type_is_placeholder(uint8_t type) {
2081 bool rval = (bool)(type == XR_PLACEHOLDER);
2082 return rval;
2083}
2084
2085bool uxr_is_placeholder(UXR uxr) {
2086 return uxr_type_is_placeholder(uxr->type);
2087}
2088
2089void* uxr_get_val(UXR uxr) {
2090 return uxr->valp;
2091}
2092
2093uint32_t uxr_get_vallen(UXR uxr) {
2094 return uxr->vallen;
2095}
2096
2097
2098TXNID uxr_get_txnid(UXR uxr) {
2099 return uxr->xid;
2100}
2101
2102static int le_iterate_get_accepted_index(
2103 TXNID* xids,
2104 uint32_t* index,
2105 uint32_t num_xids,
2106 LE_ITERATE_CALLBACK f,
2107 TOKUTXN context,
2108 bool top_is_provisional) {
2109
2110 uint32_t i;
2111 int r = 0;
2112 // if this for loop does not return anything, we return num_xids-1, which
2113 // should map to T_0
2114 for (i = 0; i < num_xids - 1; i++) {
2115 TXNID xid = toku_dtoh64(xids[i]);
2116 r = f(xid, context, (i == 0 && top_is_provisional));
2117 if (r==TOKUDB_ACCEPT) {
2118 r = 0;
2119 break; //or goto something
2120 } else if (r!=0) {
2121 break;
2122 }
2123 }
2124 *index = i;
2125 return r;
2126}
2127
2128#if ULE_DEBUG
2129static void ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) {
2130 int has_p = (ule->num_puxrs != 0);
2131 invariant(ule->num_cuxrs + has_p == interesting);
2132 uint32_t i;
2133 for (i = 0; i < interesting - 1; i++) {
2134 TXNID xid = toku_dtoh64(xids[i]);
2135 invariant(ule->uxrs[ule->num_cuxrs - 1 + has_p - i].xid == xid);
2136 }
2137}
2138#endif
2139
2140//
2141// Iterates over "possible" TXNIDs in a leafentry's stack, until one is
2142// accepted by 'f'. If the value associated with the accepted TXNID is not an
2143// insert, then set *is_emptyp to true, otherwise false
2144// The "possible" TXNIDs are:
2145// If provisionals exist, then the first possible TXNID is the outermost
2146// provisional.
2147// The next possible TXNIDs are the committed TXNIDs, from most recently
2148// committed to T_0.
2149// If provisionals exist, and the outermost provisional is accepted by 'f',
2150// the associated value checked is the innermost provisional's value.
2151// Parameters:
2152// le - leafentry to iterate over
2153// f - callback function that checks if a TXNID in le is accepted, and its
2154// associated value should be examined.
2155// is_delp - output parameter that returns answer
2156// context - parameter for f
2157//
2158static int le_iterate_is_del(
2159 LEAFENTRY le,
2160 LE_ITERATE_CALLBACK f,
2161 bool* is_delp,
2162 TOKUTXN context) {
2163
2164#if ULE_DEBUG
2165 ULE_S ule;
2166 le_unpack(&ule, le);
2167#endif
2168
2169 uint8_t type = le->type;
2170 int r;
2171 bool is_del = false;
2172 switch (type) {
2173 case LE_CLEAN: {
2174 r = 0;
2175#if ULE_DEBUG
2176 invariant(ule.num_cuxrs == 1);
2177 invariant(ule.num_puxrs == 0);
2178 invariant(uxr_is_insert(ule.uxrs));
2179#endif
2180 break;
2181 }
2182 case LE_MVCC:;
2183 uint32_t num_cuxrs;
2184 num_cuxrs = toku_dtoh32(le->u.mvcc.num_cxrs);
2185 uint32_t num_puxrs;
2186 num_puxrs = le->u.mvcc.num_pxrs;
2187 uint8_t *p;
2188 p = le->u.mvcc.xrs;
2189
2190 uint32_t index;
2191 uint32_t num_interesting;
2192 num_interesting = num_cuxrs + (num_puxrs != 0);
2193 TXNID *xids;
2194 xids = (TXNID*)p;
2195#if ULE_DEBUG
2196 ule_verify_xids(&ule, num_interesting, xids);
2197#endif
2198 r =
2199 le_iterate_get_accepted_index(
2200 xids,
2201 &index,
2202 num_interesting,
2203 f,
2204 context,
2205 (num_puxrs != 0));
2206 if (r != 0) {
2207 goto cleanup;
2208 }
2209 invariant(index < num_interesting);
2210
2211 //Skip TXNIDs
2212 p += (num_interesting - 1)*sizeof(TXNID);
2213
2214 uint32_t *length_and_bits;
2215 length_and_bits = (uint32_t*)p;
2216 uint32_t my_length_and_bit;
2217 my_length_and_bit = toku_dtoh32(length_and_bits[index]);
2218 is_del = !IS_INSERT(my_length_and_bit);
2219#if ULE_DEBUG
2220 {
2221 uint32_t has_p = (ule.num_puxrs != 0);
2222 uint32_t ule_index = (index==0) ?
2223 ule.num_cuxrs + ule.num_puxrs - 1 :
2224 ule.num_cuxrs - 1 + has_p - index;
2225 UXR uxr = ule.uxrs + ule_index;
2226 invariant(uxr_is_delete(uxr) == is_del);
2227 }
2228#endif
2229 break;
2230 default:
2231 invariant(false);
2232 }
2233cleanup:
2234#if ULE_DEBUG
2235 ule_cleanup(&ule);
2236#endif
2237 if (!r) *is_delp = is_del;
2238 return r;
2239}
2240
2241static int le_iterate_read_committed_callback(
2242 TXNID txnid,
2243 TOKUTXN txn,
2244 bool is_provisional UU()) {
2245
2246 if (is_provisional) {
2247 return toku_txn_reads_txnid(txnid, txn, is_provisional);
2248 }
2249 return TOKUDB_ACCEPT;
2250}
2251
2252//
2253// Returns true if the value that is to be read is empty.
2254//
2255int le_val_is_del(LEAFENTRY le, enum cursor_read_type read_type, TOKUTXN txn) {
2256 int rval;
2257 if (read_type == C_READ_SNAPSHOT || read_type == C_READ_COMMITTED) {
2258 LE_ITERATE_CALLBACK f = (read_type == C_READ_SNAPSHOT) ?
2259 toku_txn_reads_txnid :
2260 le_iterate_read_committed_callback;
2261 bool is_del = false;
2262 le_iterate_is_del(
2263 le,
2264 f,
2265 &is_del,
2266 txn
2267 );
2268 rval = is_del;
2269 } else if (read_type == C_READ_ANY) {
2270 rval = le_latest_is_del(le);
2271 } else {
2272 invariant(false);
2273 }
2274 return rval;
2275}
2276
2277//
2278// Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted
2279// by 'f'. Set valpp and vallenp to value and length associated with accepted
2280// TXNID
2281// The "possible" TXNIDs are:
2282// If provisionals exist, then the first possible TXNID is the outermost
2283// provisional.
2284// The next possible TXNIDs are the committed TXNIDs, from most recently
2285// committed to T_0.
2286// If provisionals exist, and the outermost provisional is accepted by 'f',
2287// the associated length value is the innermost provisional's length and value.
2288// Parameters:
2289// le - leafentry to iterate over
2290// f - callback function that checks if a TXNID in le is accepted, and its
2291// associated value should be examined.
2292// valpp - output parameter that returns pointer to value
2293// vallenp - output parameter that returns length of value
2294// context - parameter for f
2295//
2296int le_iterate_val(
2297 LEAFENTRY le,
2298 LE_ITERATE_CALLBACK f,
2299 void** valpp,
2300 uint32_t* vallenp,
2301 TOKUTXN context) {
2302
2303#if ULE_DEBUG
2304 ULE_S ule;
2305 le_unpack(&ule, le);
2306#endif
2307
2308 uint8_t type = le->type;
2309 int r;
2310 uint32_t vallen = 0;
2311 void *valp = NULL;
2312 switch (type) {
2313 case LE_CLEAN: {
2314 vallen = toku_dtoh32(le->u.clean.vallen);
2315 valp = le->u.clean.val;
2316 r = 0;
2317#if ULE_DEBUG
2318 invariant(ule.num_cuxrs == 1);
2319 invariant(ule.num_puxrs == 0);
2320 invariant(uxr_is_insert(ule.uxrs));
2321 invariant(ule.uxrs[0].vallen == vallen);
2322 invariant(ule.uxrs[0].valp == valp);
2323#endif
2324 break;
2325 }
2326 case LE_MVCC:;
2327 uint32_t num_cuxrs;
2328 num_cuxrs = toku_dtoh32(le->u.mvcc.num_cxrs);
2329 uint32_t num_puxrs;
2330 num_puxrs = le->u.mvcc.num_pxrs;
2331 uint8_t *p;
2332 p = le->u.mvcc.xrs;
2333
2334 uint32_t index;
2335 uint32_t num_interesting;
2336 num_interesting = num_cuxrs + (num_puxrs != 0);
2337 TXNID *xids;
2338 xids = (TXNID*)p;
2339#if ULE_DEBUG
2340 ule_verify_xids(&ule, num_interesting, xids);
2341#endif
2342 r =
2343 le_iterate_get_accepted_index(
2344 xids,
2345 &index,
2346 num_interesting,
2347 f,
2348 context,
2349 (num_puxrs != 0));
2350 if (r != 0) {
2351 goto cleanup;
2352 }
2353 invariant(index < num_interesting);
2354
2355 //Skip TXNIDs
2356 p += (num_interesting - 1)*sizeof(TXNID);
2357
2358 UXR_S temp;
2359 size_t offset;
2360 offset = 0;
2361
2362 uint32_t *length_and_bits;
2363 length_and_bits = (uint32_t*)p;
2364 uint32_t i;
2365 //evaluate the offset
2366 for (i=0; i < index; i++){
2367 uxr_unpack_length_and_bit(&temp, (uint8_t*)&length_and_bits[i]);
2368 offset += temp.vallen;
2369 }
2370 uxr_unpack_length_and_bit(&temp, (uint8_t*)&length_and_bits[index]);
2371 if (uxr_is_delete(&temp)) {
2372 goto verify_is_empty;
2373 }
2374 vallen = temp.vallen;
2375
2376 // move p past the length and bits, now points to beginning of data
2377 p += num_interesting*sizeof(uint32_t);
2378 // move p to point to the data we care about
2379 p += offset;
2380 valp = p;
2381
2382#if ULE_DEBUG
2383 {
2384 uint32_t has_p = (ule.num_puxrs != 0);
2385 uint32_t ule_index = (index==0) ?
2386 ule.num_cuxrs + ule.num_puxrs - 1 :
2387 ule.num_cuxrs - 1 + has_p - index;
2388 UXR uxr = ule.uxrs + ule_index;
2389 invariant(uxr_is_insert(uxr));
2390 invariant(uxr->vallen == vallen);
2391 invariant(uxr->valp == valp);
2392 }
2393#endif
2394 if (0) {
2395verify_is_empty:;
2396#if ULE_DEBUG
2397 uint32_t has_p = (ule.num_puxrs != 0);
2398 UXR uxr = ule.uxrs + ule.num_cuxrs - 1 + has_p - index;
2399 invariant(uxr_is_delete(uxr));
2400#endif
2401 }
2402 break;
2403 default:
2404 invariant(false);
2405 }
2406cleanup:
2407#if ULE_DEBUG
2408 ule_cleanup(&ule);
2409#endif
2410 if (!r) {
2411 *valpp = valp;
2412 *vallenp = vallen;
2413 }
2414 return r;
2415}
2416
2417void le_extract_val(
2418 LEAFENTRY le,
2419 // should we return the entire leafentry as the val?
2420 bool is_leaf_mode,
2421 enum cursor_read_type read_type,
2422 TOKUTXN ttxn,
2423 uint32_t* vallen,
2424 void** val) {
2425
2426 if (is_leaf_mode) {
2427 *val = le;
2428 *vallen = leafentry_memsize(le);
2429 } else if (read_type == C_READ_SNAPSHOT || read_type == C_READ_COMMITTED) {
2430 LE_ITERATE_CALLBACK f = (read_type == C_READ_SNAPSHOT) ?
2431 toku_txn_reads_txnid :
2432 le_iterate_read_committed_callback;
2433 int r = le_iterate_val(le, f, val, vallen, ttxn);
2434 lazy_assert_zero(r);
2435 } else if (read_type == C_READ_ANY){
2436 *val = le_latest_val_and_len(le, vallen);
2437 } else {
2438 assert(false);
2439 }
2440}
2441
2442// This is an on-disk format. static_asserts verify everything is packed and aligned correctly.
2443struct __attribute__ ((__packed__)) leafentry_13 {
2444 struct leafentry_committed_13 {
2445 uint8_t key_val[0]; //Actual key, then actual val
2446 };
2447 static_assert(0 == sizeof(leafentry_committed_13), "wrong size");
2448 static_assert(0 == __builtin_offsetof(leafentry_committed_13, key_val), "wrong offset");
2449 struct __attribute__ ((__packed__)) leafentry_provisional_13 {
2450 uint8_t innermost_type;
2451 TXNID xid_outermost_uncommitted;
2452 uint8_t key_val_xrs[0]; //Actual key,
2453 //then actual innermost inserted val,
2454 //then transaction records.
2455 };
2456 static_assert(9 == sizeof(leafentry_provisional_13), "wrong size");
2457 static_assert(9 == __builtin_offsetof(leafentry_provisional_13, key_val_xrs), "wrong offset");
2458
2459 uint8_t num_xrs;
2460 uint32_t keylen;
2461 uint32_t innermost_inserted_vallen;
2462 union __attribute__ ((__packed__)) {
2463 struct leafentry_committed_13 comm;
2464 struct leafentry_provisional_13 prov;
2465 } u;
2466};
2467static_assert(18 == sizeof(leafentry_13), "wrong size");
2468static_assert(9 == __builtin_offsetof(leafentry_13, u), "wrong offset");
2469
2470//Requires:
2471// Leafentry that ule represents should not be destroyed (is not just all
2472// deletes)
2473static size_t le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) {
2474 uint32_t num_uxrs = ule->num_cuxrs + ule->num_puxrs;
2475 assert(num_uxrs);
2476 size_t rval;
2477 if (num_uxrs == 1) {
2478 assert(uxr_is_insert(&ule->uxrs[0]));
2479 rval = 1 //num_uxrs
2480 +4 //keylen
2481 +4 //vallen
2482 +le->keylen //actual key
2483 +ule->uxrs[0].vallen; //actual val
2484 } else {
2485 rval = 1 //num_uxrs
2486 +4 //keylen
2487 +le->keylen //actual key
2488 +1*num_uxrs //types
2489 +8*(num_uxrs-1); //txnids
2490 uint8_t i;
2491 for (i = 0; i < num_uxrs; i++) {
2492 UXR uxr = &ule->uxrs[i];
2493 if (uxr_is_insert(uxr)) {
2494 rval += 4; //vallen
2495 rval += uxr->vallen; //actual val
2496 }
2497 }
2498 }
2499 return rval;
2500}
2501
2502// This function is mostly copied from 4.1.1 (which is version 12, same as 13
2503// except that only 13 is upgradable).
2504// Note, number of transaction records in version 13 has been replaced by
2505// separate counters in version 14 (MVCC), one counter for committed transaction
2506// records and one counter for provisional transaction records. When upgrading
2507// a version 13 le to version 14, the number of committed transaction records is
2508// always set to one (1) and the number of provisional transaction records is
2509// set to the original number of transaction records minus one. The bottom
2510// transaction record is assumed to be a committed value. (If there is no
2511// committed value then the bottom transaction record of version 13 is a
2512// committed delete.)
2513// This is the only change from the 4.1.1 code. The rest of the leafentry is
2514// read as is.
2515static void le_unpack_13(ULE ule, LEAFENTRY_13 le) {
2516 //Read num_uxrs
2517 uint8_t num_xrs = le->num_xrs;
2518 assert(num_xrs > 0);
2519 ule->uxrs = ule->uxrs_static; //Static version is always enough.
2520 ule->num_cuxrs = 1;
2521 ule->num_puxrs = num_xrs - 1;
2522
2523 //Read the keylen
2524 uint32_t keylen = toku_dtoh32(le->keylen);
2525
2526 //Read the vallen of innermost insert
2527 uint32_t vallen_of_innermost_insert = toku_dtoh32(le->innermost_inserted_vallen);
2528
2529 uint8_t *p;
2530 if (num_xrs == 1) {
2531 //Unpack a 'committed leafentry' (No uncommitted transactions exist)
2532 //Must be or the leafentry would not exist
2533 ule->uxrs[0].type = XR_INSERT;
2534 ule->uxrs[0].vallen = vallen_of_innermost_insert;
2535 ule->uxrs[0].valp = &le->u.comm.key_val[keylen];
2536 ule->uxrs[0].xid = 0; //Required.
2537
2538 //Set p to immediately after leafentry
2539 p = &le->u.comm.key_val[keylen + vallen_of_innermost_insert];
2540 } else {
2541 //Unpack a 'provisional leafentry' (Uncommitted transactions exist)
2542
2543 //Read in type.
2544 uint8_t innermost_type = le->u.prov.innermost_type;
2545 assert(!uxr_type_is_placeholder(innermost_type));
2546
2547 //Read in xid
2548 TXNID xid_outermost_uncommitted = toku_dtoh64(le->u.prov.xid_outermost_uncommitted);
2549
2550 //Read pointer to innermost inserted val (immediately after key)
2551 uint8_t *valp_of_innermost_insert = &le->u.prov.key_val_xrs[keylen];
2552
2553 //Point p to immediately after 'header'
2554 p = &le->u.prov.key_val_xrs[keylen + vallen_of_innermost_insert];
2555
2556 bool found_innermost_insert = false;
2557 int i; //Index in ULE.uxrs[]
2558 //Loop inner to outer
2559 for (i = num_xrs - 1; i >= 0; i--) {
2560 UXR uxr = &ule->uxrs[i];
2561
2562 //Innermost's type is in header.
2563 if (i < num_xrs - 1) {
2564 //Not innermost, so load the type.
2565 uxr->type = *p;
2566 p += 1;
2567 } else {
2568 //Innermost, load the type previously read from header
2569 uxr->type = innermost_type;
2570 }
2571
2572 //Committed txn id is implicit (0). (i==0)
2573 //Outermost uncommitted txnid is stored in header. (i==1)
2574 if (i > 1) {
2575 //Not committed nor outermost uncommitted, so load the xid.
2576 uxr->xid = toku_dtoh64(*(TXNID*)p);
2577 p += 8;
2578 } else if (i == 1) {
2579 //Outermost uncommitted, load the xid previously read from
2580 //header
2581 uxr->xid = xid_outermost_uncommitted;
2582 } else {
2583 // i == 0, committed entry
2584 uxr->xid = 0;
2585 }
2586
2587 if (uxr_is_insert(uxr)) {
2588 if (found_innermost_insert) {
2589 //Not the innermost insert. Load vallen/valp
2590 uxr->vallen = toku_dtoh32(*(uint32_t*)p);
2591 p += 4;
2592
2593 uxr->valp = p;
2594 p += uxr->vallen;
2595 } else {
2596 //Innermost insert, load the vallen/valp previously read
2597 //from header
2598 uxr->vallen = vallen_of_innermost_insert;
2599 uxr->valp = valp_of_innermost_insert;
2600 found_innermost_insert = true;
2601 }
2602 }
2603 }
2604 assert(found_innermost_insert);
2605 }
2606#if ULE_DEBUG
2607 size_t memsize = le_memsize_from_ule_13(ule);
2608 assert(p == ((uint8_t*)le) + memsize);
2609#endif
2610}
2611
2612size_t leafentry_disksize_13(LEAFENTRY_13 le) {
2613 ULE_S ule;
2614 le_unpack_13(&ule, le);
2615 size_t memsize = le_memsize_from_ule_13(&ule, le);
2616 ule_cleanup(&ule);
2617 return memsize;
2618}
2619
2620int toku_le_upgrade_13_14(
2621 LEAFENTRY_13 old_leafentry,
2622 void** keyp,
2623 uint32_t* keylen,
2624 size_t* new_leafentry_memorysize,
2625 LEAFENTRY* new_leafentry_p) {
2626
2627 ULE_S ule;
2628 int rval;
2629 invariant(old_leafentry);
2630 le_unpack_13(&ule, old_leafentry);
2631 // get the key
2632 *keylen = old_leafentry->keylen;
2633 if (old_leafentry->num_xrs == 1) {
2634 *keyp = old_leafentry->u.comm.key_val;
2635 } else {
2636 *keyp = old_leafentry->u.prov.key_val_xrs;
2637 }
2638 // We used to pass NULL for omt and mempool, so that we would use
2639 // malloc instead of a mempool. However after supporting upgrade,
2640 // we need to use mempools and the OMT.
2641 rval =
2642 le_pack(
2643 &ule, // create packed leafentry
2644 nullptr,
2645 0, //only matters if we are passing in a bn_data
2646 nullptr, //only matters if we are passing in a bn_data
2647 0, //only matters if we are passing in a bn_data
2648 0, //only matters if we are passing in a bn_data
2649 0, //only matters if we are passing in a bn_data
2650 new_leafentry_p,
2651 nullptr); //only matters if we are passing in a bn_data
2652 ule_cleanup(&ule);
2653 *new_leafentry_memorysize = leafentry_memsize(*new_leafentry_p);
2654 return rval;
2655}
2656
2657#include <toku_race_tools.h>
2658void __attribute__((__constructor__)) toku_ule_helgrind_ignore(void);
2659void
2660toku_ule_helgrind_ignore(void) {
2661 TOKU_VALGRIND_HG_DISABLE_CHECKING(&le_status, sizeof le_status);
2662}
2663