1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <toku_portability.h>
40#include <toku_assert.h>
41
42#include <stdio.h>
43#include <string.h>
44
45#include <ft/le-cursor.h>
46#include <ft/ft-ops.h>
47#include <ft/leafentry.h>
48#include <ft/ule.h>
49#include <ft/txn/txn_manager.h>
50#include <ft/txn/xids.h>
51#include <ft/cachetable/checkpoint.h>
52
53#include "ydb-internal.h"
54#include "ydb_row_lock.h"
55#include "indexer.h"
56#include "indexer-internal.h"
57
58// initialize the commit keys
59static void
60indexer_commit_keys_init(struct indexer_commit_keys *keys) {
61 keys->max_keys = keys->current_keys = 0;
62 keys->keys = NULL;
63}
64
65// destroy the commit keys
66static void
67indexer_commit_keys_destroy(struct indexer_commit_keys *keys) {
68 for (int i = 0; i < keys->max_keys; i++)
69 toku_destroy_dbt(&keys->keys[i]);
70 toku_free(keys->keys);
71}
72
73// return the number of keys in the ordered set
74static int
75indexer_commit_keys_valid(struct indexer_commit_keys *keys) {
76 return keys->current_keys;
77}
78
79// add a key to the commit keys
80static void
81indexer_commit_keys_add(struct indexer_commit_keys *keys, size_t length, void *ptr) {
82 if (keys->current_keys >= keys->max_keys) {
83 int new_max_keys = keys->max_keys == 0 ? 256 : keys->max_keys * 2;
84 keys->keys = (DBT *) toku_xrealloc(keys->keys, new_max_keys * sizeof (DBT));
85 for (int i = keys->current_keys; i < new_max_keys; i++)
86 toku_init_dbt_flags(&keys->keys[i], DB_DBT_REALLOC);
87 keys->max_keys = new_max_keys;
88 }
89 DBT *key = &keys->keys[keys->current_keys];
90 toku_dbt_set(length, ptr, key, NULL);
91 keys->current_keys++;
92}
93
94// set the ordered set to empty
95static void
96indexer_commit_keys_set_empty(struct indexer_commit_keys *keys) {
97 keys->current_keys = 0;
98}
99
100// internal functions
101static int indexer_set_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
102static int indexer_append_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
103
104static bool indexer_find_prev_xr(DB_INDEXER *indexer, ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex);
105
106static int indexer_generate_hot_keys_vals(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info* prov_info, UXRHANDLE uxr, DBT_ARRAY *hotkeys, DBT_ARRAY *hotvals);
107static int indexer_ft_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids, TOKUTXN txn);
108static int indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
109static int indexer_ft_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids, TOKUTXN txn);
110static int indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
111static int indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
112static void indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid, TOKUTXN txn);
113
114
115// initialize undo globals located in the indexer private object
116void
117indexer_undo_do_init(DB_INDEXER *indexer) {
118 indexer_commit_keys_init(&indexer->i->commit_keys);
119 XMALLOC_N(indexer->i->N, indexer->i->hot_keys);
120 XMALLOC_N(indexer->i->N, indexer->i->hot_vals);
121 for (int which = 0; which < indexer->i->N; which++) {
122 toku_dbt_array_init(&indexer->i->hot_keys[which], 1);
123 toku_dbt_array_init(&indexer->i->hot_vals[which], 1);
124 }
125}
126
127// destroy the undo globals
128void
129indexer_undo_do_destroy(DB_INDEXER *indexer) {
130 indexer_commit_keys_destroy(&indexer->i->commit_keys);
131 if (indexer->i->hot_keys) {
132 invariant(indexer->i->hot_vals);
133 for (int which = 0; which < indexer->i->N; which++) {
134 toku_dbt_array_destroy(&indexer->i->hot_keys[which]);
135 toku_dbt_array_destroy(&indexer->i->hot_vals[which]);
136 }
137 toku_free(indexer->i->hot_keys);
138 toku_free(indexer->i->hot_vals);
139 }
140}
141
142static int
143indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) {
144 int result = 0;
145 ULEHANDLE ule = prov_info->ule;
146
147 // init the xids to the root xid
148 XIDS xids = toku_xids_get_root_xids();
149
150 // scan the committed stack from bottom to top
151 uint32_t num_committed = ule_get_num_committed(ule);
152 for (uint64_t xrindex = 0; xrindex < num_committed; xrindex++) {
153
154 indexer_commit_keys_set_empty(&indexer->i->commit_keys);
155
156 // get the transaction record
157 UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
158
159 // setup up the xids
160 TXNID this_xid = uxr_get_txnid(uxr);
161 result = indexer_set_xid(indexer, this_xid, &xids);
162 if (result != 0)
163 break;
164
165 // placeholders in the committed stack are not allowed
166 invariant(!uxr_is_placeholder(uxr));
167
168 // undo
169 if (xrindex > 0) {
170 uint64_t prev_xrindex = xrindex - 1;
171 UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex);
172 if (uxr_is_delete(prevuxr)) {
173 ; // do nothing
174 } else if (uxr_is_insert(prevuxr)) {
175 // generate the hot delete key
176 result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL);
177 if (result == 0) {
178 paranoid_invariant(hot_keys->size <= hot_keys->capacity);
179 for (uint32_t i = 0; i < hot_keys->size; i++) {
180 DBT *hotkey = &hot_keys->dbts[i];
181
182 // send the delete message
183 result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids);
184 if (result == 0) {
185 indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
186 }
187 }
188 }
189 } else {
190 assert(0);
191 }
192 }
193 if (result != 0) {
194 break;
195 }
196
197 // do
198 if (uxr_is_delete(uxr)) {
199 ; // do nothing
200 } else if (uxr_is_insert(uxr)) {
201 // generate the hot insert key and val
202 result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals);
203 if (result == 0) {
204 paranoid_invariant(hot_keys->size == hot_vals->size);
205 paranoid_invariant(hot_keys->size <= hot_keys->capacity);
206 paranoid_invariant(hot_vals->size <= hot_vals->capacity);
207 for (uint32_t i = 0; i < hot_keys->size; i++) {
208 DBT *hotkey = &hot_keys->dbts[i];
209 DBT *hotval = &hot_vals->dbts[i];
210
211 // send the insert message
212 result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids);
213 if (result == 0) {
214 indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
215 }
216 }
217 }
218 } else
219 assert(0);
220
221 // send commit messages if needed
222 for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++)
223 result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids);
224
225 if (result != 0)
226 break;
227 }
228
229 toku_xids_destroy(&xids);
230
231 return result;
232}
233
234static void release_txns(
235 ULEHANDLE ule,
236 TOKUTXN_STATE* prov_states,
237 TOKUTXN* prov_txns,
238 DB_INDEXER *indexer
239 )
240{
241 uint32_t num_provisional = ule_get_num_provisional(ule);
242 if (indexer->i->test_xid_state) {
243 goto exit;
244 }
245 for (uint32_t i = 0; i < num_provisional; i++) {
246 if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) {
247 toku_txn_unpin_live_txn(prov_txns[i]);
248 }
249 }
250exit:
251 return;
252}
253
254static int
255indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) {
256 int result = 0;
257 indexer_commit_keys_set_empty(&indexer->i->commit_keys);
258 ULEHANDLE ule = prov_info->ule;
259
260 // init the xids to the root xid
261 XIDS xids = toku_xids_get_root_xids();
262
263 uint32_t num_provisional = prov_info->num_provisional;
264 uint32_t num_committed = prov_info->num_committed;
265 TXNID *prov_ids = prov_info->prov_ids;
266 TOKUTXN *prov_txns = prov_info->prov_txns;
267 TOKUTXN_STATE *prov_states = prov_info->prov_states;
268
269 // nothing to do if there's nothing provisional
270 if (num_provisional == 0) {
271 goto exit;
272 }
273
274 TXNID outermost_xid_state;
275 outermost_xid_state = prov_states[0];
276
277 // scan the provisional stack from the outermost to the innermost transaction record
278 TOKUTXN curr_txn;
279 curr_txn = NULL;
280 for (uint64_t xrindex = num_committed; xrindex < num_committed + num_provisional; xrindex++) {
281
282 // get the ith transaction record
283 UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
284
285 TXNID this_xid = uxr_get_txnid(uxr);
286 TOKUTXN_STATE this_xid_state = prov_states[xrindex - num_committed];
287
288 if (this_xid_state == TOKUTXN_ABORTING) {
289 break; // nothing to do once we reach a transaction that is aborting
290 }
291
292 if (xrindex == num_committed) { // if this is the outermost xr
293 result = indexer_set_xid(indexer, this_xid, &xids); // always add the outermost xid to the XIDS list
294 curr_txn = prov_txns[xrindex - num_committed];
295 } else {
296 switch (this_xid_state) {
297 case TOKUTXN_LIVE:
298 result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list
299 curr_txn = prov_txns[xrindex - num_committed];
300 if (!indexer->i->test_xid_state) {
301 assert(curr_txn);
302 }
303 break;
304 case TOKUTXN_PREPARING:
305 assert(0); // not allowed
306 case TOKUTXN_COMMITTING:
307 case TOKUTXN_ABORTING:
308 case TOKUTXN_RETIRED:
309 break; // nothing to do
310 }
311 }
312 if (result != 0)
313 break;
314
315 if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) {
316 // If the outermost is not live, then the inner state must be retired. That's the way that the txn API works.
317 assert(this_xid_state == TOKUTXN_RETIRED);
318 }
319
320 if (uxr_is_placeholder(uxr)) {
321 continue; // skip placeholders
322 }
323 // undo
324 uint64_t prev_xrindex;
325 bool prev_xrindex_found = indexer_find_prev_xr(indexer, ule, xrindex, &prev_xrindex);
326 if (prev_xrindex_found) {
327 UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex);
328 if (uxr_is_delete(prevuxr)) {
329 ; // do nothing
330 } else if (uxr_is_insert(prevuxr)) {
331 // generate the hot delete key
332 result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL);
333 if (result == 0) {
334 paranoid_invariant(hot_keys->size <= hot_keys->capacity);
335 for (uint32_t i = 0; i < hot_keys->size; i++) {
336 DBT *hotkey = &hot_keys->dbts[i];
337
338 // send the delete message
339 switch (outermost_xid_state) {
340 case TOKUTXN_LIVE:
341 case TOKUTXN_PREPARING:
342 invariant(this_xid_state != TOKUTXN_ABORTING);
343 invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING);
344 result = indexer_ft_delete_provisional(indexer, hotdb, hotkey, xids, curr_txn);
345 if (result == 0) {
346 indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], curr_txn);
347 }
348 break;
349 case TOKUTXN_COMMITTING:
350 case TOKUTXN_RETIRED:
351 result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids);
352 if (result == 0)
353 indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
354 break;
355 case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
356 assert(0);
357 }
358 }
359 }
360 } else
361 assert(0);
362 }
363 if (result != 0)
364 break;
365
366 // do
367 if (uxr_is_delete(uxr)) {
368 ; // do nothing
369 } else if (uxr_is_insert(uxr)) {
370 // generate the hot insert key and val
371 result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals);
372 if (result == 0) {
373 paranoid_invariant(hot_keys->size == hot_vals->size);
374 paranoid_invariant(hot_keys->size <= hot_keys->capacity);
375 paranoid_invariant(hot_vals->size <= hot_vals->capacity);
376 for (uint32_t i = 0; i < hot_keys->size; i++) {
377 DBT *hotkey = &hot_keys->dbts[i];
378 DBT *hotval = &hot_vals->dbts[i];
379
380 // send the insert message
381 switch (outermost_xid_state) {
382 case TOKUTXN_LIVE:
383 case TOKUTXN_PREPARING:
384 assert(this_xid_state != TOKUTXN_ABORTING);
385 invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING);
386 result = indexer_ft_insert_provisional(indexer, hotdb, hotkey, hotval, xids, curr_txn);
387 if (result == 0) {
388 indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], prov_txns[0]);
389 }
390 break;
391 case TOKUTXN_COMMITTING:
392 case TOKUTXN_RETIRED:
393 result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids);
394 // no need to do this because we do implicit commits on inserts
395 if (0 && result == 0)
396 indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
397 break;
398 case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
399 assert(0);
400 }
401 }
402 }
403 } else
404 assert(0);
405
406 if (result != 0)
407 break;
408 }
409
410 // send commits if the outermost provisional transaction is committed
411 for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++) {
412 result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids);
413 }
414
415 // be careful with this in the future. Right now, only exit path
416 // is BEFORE we call fill_prov_info, so this happens before exit
417 // If in the future we add a way to exit after fill_prov_info,
418 // then this will need to be handled below exit
419 release_txns(ule, prov_states, prov_txns, indexer);
420exit:
421 toku_xids_destroy(&xids);
422 return result;
423}
424
425int
426indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) {
427 int result = indexer_undo_do_committed(indexer, hotdb, prov_info, hot_keys, hot_vals);
428 if (result == 0) {
429 result = indexer_undo_do_provisional(indexer, hotdb, prov_info, hot_keys, hot_vals);
430 }
431 if (indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK) {
432 result = EINVAL;
433 }
434
435 return result;
436}
437
438// set xids_result = [root_xid, this_xid]
439// Note that this could be sped up by adding a new xids constructor that constructs the stack with
440// exactly one xid.
441static int
442indexer_set_xid(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids_result) {
443 int result = 0;
444 XIDS old_xids = *xids_result;
445 XIDS new_xids = toku_xids_get_root_xids();
446 if (this_xid != TXNID_NONE) {
447 XIDS child_xids;
448 result = toku_xids_create_child(new_xids, &child_xids, this_xid);
449 toku_xids_destroy(&new_xids);
450 if (result == 0)
451 new_xids = child_xids;
452 }
453 if (result == 0) {
454 toku_xids_destroy(&old_xids);
455 *xids_result = new_xids;
456 }
457
458 return result;
459}
460
461// append xid to xids_result
462static int
463indexer_append_xid(DB_INDEXER *UU(indexer), TXNID xid, XIDS *xids_result) {
464 XIDS old_xids = *xids_result;
465 XIDS new_xids;
466 int result = toku_xids_create_child(old_xids, &new_xids, xid);
467 if (result == 0) {
468 toku_xids_destroy(&old_xids);
469 *xids_result = new_xids;
470 }
471 return result;
472}
473
474static int
475indexer_generate_hot_keys_vals(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, UXRHANDLE uxr, DBT_ARRAY *hotkeys, DBT_ARRAY *hotvals) {
476 int result = 0;
477
478 // setup the source key
479 DBT srckey;
480 toku_fill_dbt(&srckey, prov_info->key, prov_info->keylen);
481
482 // setup the source val
483 DBT srcval;
484 toku_fill_dbt(&srcval, uxr_get_val(uxr), uxr_get_vallen(uxr));
485
486 // generate the secondary row
487 DB_ENV *env = indexer->i->env;
488 if (hotvals) {
489 result = env->i->generate_row_for_put(hotdb, indexer->i->src_db, hotkeys, hotvals, &srckey, &srcval);
490 }
491 else {
492 result = env->i->generate_row_for_del(hotdb, indexer->i->src_db, hotkeys, &srckey, &srcval);
493 }
494 toku_destroy_dbt(&srckey);
495 toku_destroy_dbt(&srcval);
496
497 return result;
498}
499
500// Take a write lock on the given key for the outermost xid in the xids list.
501static void
502indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid, TOKUTXN txn) {
503 // TEST
504 if (indexer->i->test_lock_key) {
505 indexer->i->test_lock_key(indexer, outermost_live_xid, hotdb, key);
506 } else {
507 toku_db_grab_write_lock(hotdb, key, txn);
508 }
509}
510
511// find the index of a non-placeholder transaction record that is previous to the transaction record
512// found at xrindex. return true if one is found and return its index in prev_xrindex. otherwise,
513// return false.
514static bool
515indexer_find_prev_xr(DB_INDEXER *UU(indexer), ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex) {
516 assert(xrindex < ule_num_uxrs(ule));
517 bool prev_found = false;
518 while (xrindex > 0) {
519 xrindex -= 1;
520 UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
521 if (!uxr_is_placeholder(uxr)) {
522 *prev_xrindex = xrindex;
523 prev_found = true;
524 break;
525 }
526 }
527 return prev_found;
528}
529
530// inject "delete" message into ft with logging in recovery and rollback logs,
531// and making association between txn and ft
532static int
533indexer_ft_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids, TOKUTXN txn) {
534 int result = 0;
535 // TEST
536 if (indexer->i->test_delete_provisional) {
537 result = indexer->i->test_delete_provisional(indexer, hotdb, hotkey, xids);
538 } else {
539 result = toku_ydb_check_avail_fs_space(indexer->i->env);
540 if (result == 0) {
541 assert(txn != NULL);
542 // Not sure if this is really necessary, as
543 // the hot index DB should have to be checkpointed
544 // upon commit of the hot index transaction, but
545 // it is safe to do this
546 // this question apples to delete_committed, insert_provisional
547 // and insert_committed
548 toku_ft_maybe_delete (hotdb->i->ft_handle, hotkey, txn, false, ZERO_LSN, true);
549 }
550 }
551 return result;
552}
553
554// send a delete message into the tree without rollback or recovery logging
555static int
556indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) {
557 int result = 0;
558 // TEST
559 if (indexer->i->test_delete_committed) {
560 result = indexer->i->test_delete_committed(indexer, hotdb, hotkey, xids);
561 } else {
562 result = toku_ydb_check_avail_fs_space(indexer->i->env);
563 if (result == 0) {
564 FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
565 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
566 txn_manager_state txn_state_for_gc(txn_manager);
567
568 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
569 txn_gc_info gc_info(&txn_state_for_gc,
570 oldest_referenced_xid_estimate,
571 oldest_referenced_xid_estimate,
572 true);
573 toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info);
574 toku_ft_adjust_logical_row_count(db_struct_i(hotdb)->ft_handle->ft, -1);
575 }
576 }
577 return result;
578}
579
580// inject "insert" message into ft with logging in recovery and rollback logs,
581// and making association between txn and ft
582static int
583indexer_ft_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids, TOKUTXN txn) {
584 int result = 0;
585 // TEST
586 if (indexer->i->test_insert_provisional) {
587 result = indexer->i->test_insert_provisional(indexer, hotdb, hotkey, hotval, xids);
588 } else {
589 result = toku_ydb_check_avail_fs_space(indexer->i->env);
590 if (result == 0) {
591 assert(txn != NULL);
592 // comment/question in indexer_ft_delete_provisional applies
593 toku_ft_maybe_insert (hotdb->i->ft_handle, hotkey, hotval, txn, false, ZERO_LSN, true, FT_INSERT);
594 }
595 }
596 return result;
597}
598
599// send an insert message into the tree without rollback or recovery logging
600// and without associating the txn and the ft
601static int
602indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids) {
603 int result = 0;
604 // TEST
605 if (indexer->i->test_insert_committed) {
606 result = indexer->i->test_insert_committed(indexer, hotdb, hotkey, hotval, xids);
607 } else {
608 result = toku_ydb_check_avail_fs_space(indexer->i->env);
609 if (result == 0) {
610 FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
611 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
612 txn_manager_state txn_state_for_gc(txn_manager);
613
614 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
615 txn_gc_info gc_info(&txn_state_for_gc,
616 oldest_referenced_xid_estimate,
617 oldest_referenced_xid_estimate,
618 true);
619 toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT, &gc_info);
620 toku_ft_adjust_logical_row_count(db_struct_i(hotdb)->ft_handle->ft, 1);
621 }
622 }
623 return result;
624}
625
626// send a commit message into the tree
627// Note: If the xid is zero, then the leafentry will already have a committed transaction
628// record and no commit message is needed. (A commit message with xid of zero is
629// illegal anyway.)
630static int
631indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) {
632 int result = 0;
633 if (toku_xids_get_num_xids(xids) > 0) {// send commit only when not the root xid
634 // TEST
635 if (indexer->i->test_commit_any) {
636 result = indexer->i->test_commit_any(indexer, hotdb, hotkey, xids);
637 } else {
638 result = toku_ydb_check_avail_fs_space(indexer->i->env);
639 if (result == 0) {
640 FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
641 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
642 txn_manager_state txn_state_for_gc(txn_manager);
643
644 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
645 txn_gc_info gc_info(&txn_state_for_gc,
646 oldest_referenced_xid_estimate,
647 oldest_referenced_xid_estimate,
648 true);
649 toku_ft_send_commit_any(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info);
650 }
651 }
652 }
653 return result;
654}
655