| 1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
| 2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
| 3 | #ident "$Id$" |
| 4 | /*====== |
| 5 | This file is part of PerconaFT. |
| 6 | |
| 7 | |
| 8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
| 9 | |
| 10 | PerconaFT is free software: you can redistribute it and/or modify |
| 11 | it under the terms of the GNU General Public License, version 2, |
| 12 | as published by the Free Software Foundation. |
| 13 | |
| 14 | PerconaFT is distributed in the hope that it will be useful, |
| 15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 | GNU General Public License for more details. |
| 18 | |
| 19 | You should have received a copy of the GNU General Public License |
| 20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 21 | |
| 22 | ---------------------------------------- |
| 23 | |
| 24 | PerconaFT is free software: you can redistribute it and/or modify |
| 25 | it under the terms of the GNU Affero General Public License, version 3, |
| 26 | as published by the Free Software Foundation. |
| 27 | |
| 28 | PerconaFT is distributed in the hope that it will be useful, |
| 29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 31 | GNU Affero General Public License for more details. |
| 32 | |
| 33 | You should have received a copy of the GNU Affero General Public License |
| 34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 35 | ======= */ |
| 36 | |
| 37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
| 38 | |
| 39 | #include <toku_portability.h> |
| 40 | #include <toku_assert.h> |
| 41 | |
| 42 | #include <stdio.h> |
| 43 | #include <string.h> |
| 44 | |
| 45 | #include <ft/le-cursor.h> |
| 46 | #include <ft/ft-ops.h> |
| 47 | #include <ft/leafentry.h> |
| 48 | #include <ft/ule.h> |
| 49 | #include <ft/txn/txn_manager.h> |
| 50 | #include <ft/txn/xids.h> |
| 51 | #include <ft/cachetable/checkpoint.h> |
| 52 | |
| 53 | #include "ydb-internal.h" |
| 54 | #include "ydb_row_lock.h" |
| 55 | #include "indexer.h" |
| 56 | #include "indexer-internal.h" |
| 57 | |
| 58 | // initialize the commit keys |
| 59 | static void |
| 60 | indexer_commit_keys_init(struct indexer_commit_keys *keys) { |
| 61 | keys->max_keys = keys->current_keys = 0; |
| 62 | keys->keys = NULL; |
| 63 | } |
| 64 | |
| 65 | // destroy the commit keys |
| 66 | static void |
| 67 | indexer_commit_keys_destroy(struct indexer_commit_keys *keys) { |
| 68 | for (int i = 0; i < keys->max_keys; i++) |
| 69 | toku_destroy_dbt(&keys->keys[i]); |
| 70 | toku_free(keys->keys); |
| 71 | } |
| 72 | |
| 73 | // return the number of keys in the ordered set |
| 74 | static int |
| 75 | indexer_commit_keys_valid(struct indexer_commit_keys *keys) { |
| 76 | return keys->current_keys; |
| 77 | } |
| 78 | |
| 79 | // add a key to the commit keys |
| 80 | static void |
| 81 | indexer_commit_keys_add(struct indexer_commit_keys *keys, size_t length, void *ptr) { |
| 82 | if (keys->current_keys >= keys->max_keys) { |
| 83 | int new_max_keys = keys->max_keys == 0 ? 256 : keys->max_keys * 2; |
| 84 | keys->keys = (DBT *) toku_xrealloc(keys->keys, new_max_keys * sizeof (DBT)); |
| 85 | for (int i = keys->current_keys; i < new_max_keys; i++) |
| 86 | toku_init_dbt_flags(&keys->keys[i], DB_DBT_REALLOC); |
| 87 | keys->max_keys = new_max_keys; |
| 88 | } |
| 89 | DBT *key = &keys->keys[keys->current_keys]; |
| 90 | toku_dbt_set(length, ptr, key, NULL); |
| 91 | keys->current_keys++; |
| 92 | } |
| 93 | |
| 94 | // set the ordered set to empty |
| 95 | static void |
| 96 | indexer_commit_keys_set_empty(struct indexer_commit_keys *keys) { |
| 97 | keys->current_keys = 0; |
| 98 | } |
| 99 | |
| 100 | // internal functions |
| 101 | static int indexer_set_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result); |
| 102 | static int indexer_append_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result); |
| 103 | |
| 104 | static bool indexer_find_prev_xr(DB_INDEXER *indexer, ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex); |
| 105 | |
| 106 | static int indexer_generate_hot_keys_vals(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info* prov_info, UXRHANDLE uxr, DBT_ARRAY *hotkeys, DBT_ARRAY *hotvals); |
| 107 | static int indexer_ft_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids, TOKUTXN txn); |
| 108 | static int indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids); |
| 109 | static int indexer_ft_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids, TOKUTXN txn); |
| 110 | static int indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids); |
| 111 | static int indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids); |
| 112 | static void indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid, TOKUTXN txn); |
| 113 | |
| 114 | |
| 115 | // initialize undo globals located in the indexer private object |
| 116 | void |
| 117 | indexer_undo_do_init(DB_INDEXER *indexer) { |
| 118 | indexer_commit_keys_init(&indexer->i->commit_keys); |
| 119 | XMALLOC_N(indexer->i->N, indexer->i->hot_keys); |
| 120 | XMALLOC_N(indexer->i->N, indexer->i->hot_vals); |
| 121 | for (int which = 0; which < indexer->i->N; which++) { |
| 122 | toku_dbt_array_init(&indexer->i->hot_keys[which], 1); |
| 123 | toku_dbt_array_init(&indexer->i->hot_vals[which], 1); |
| 124 | } |
| 125 | } |
| 126 | |
| 127 | // destroy the undo globals |
| 128 | void |
| 129 | indexer_undo_do_destroy(DB_INDEXER *indexer) { |
| 130 | indexer_commit_keys_destroy(&indexer->i->commit_keys); |
| 131 | if (indexer->i->hot_keys) { |
| 132 | invariant(indexer->i->hot_vals); |
| 133 | for (int which = 0; which < indexer->i->N; which++) { |
| 134 | toku_dbt_array_destroy(&indexer->i->hot_keys[which]); |
| 135 | toku_dbt_array_destroy(&indexer->i->hot_vals[which]); |
| 136 | } |
| 137 | toku_free(indexer->i->hot_keys); |
| 138 | toku_free(indexer->i->hot_vals); |
| 139 | } |
| 140 | } |
| 141 | |
| 142 | static int |
| 143 | indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) { |
| 144 | int result = 0; |
| 145 | ULEHANDLE ule = prov_info->ule; |
| 146 | |
| 147 | // init the xids to the root xid |
| 148 | XIDS xids = toku_xids_get_root_xids(); |
| 149 | |
| 150 | // scan the committed stack from bottom to top |
| 151 | uint32_t num_committed = ule_get_num_committed(ule); |
| 152 | for (uint64_t xrindex = 0; xrindex < num_committed; xrindex++) { |
| 153 | |
| 154 | indexer_commit_keys_set_empty(&indexer->i->commit_keys); |
| 155 | |
| 156 | // get the transaction record |
| 157 | UXRHANDLE uxr = ule_get_uxr(ule, xrindex); |
| 158 | |
| 159 | // setup up the xids |
| 160 | TXNID this_xid = uxr_get_txnid(uxr); |
| 161 | result = indexer_set_xid(indexer, this_xid, &xids); |
| 162 | if (result != 0) |
| 163 | break; |
| 164 | |
| 165 | // placeholders in the committed stack are not allowed |
| 166 | invariant(!uxr_is_placeholder(uxr)); |
| 167 | |
| 168 | // undo |
| 169 | if (xrindex > 0) { |
| 170 | uint64_t prev_xrindex = xrindex - 1; |
| 171 | UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex); |
| 172 | if (uxr_is_delete(prevuxr)) { |
| 173 | ; // do nothing |
| 174 | } else if (uxr_is_insert(prevuxr)) { |
| 175 | // generate the hot delete key |
| 176 | result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL); |
| 177 | if (result == 0) { |
| 178 | paranoid_invariant(hot_keys->size <= hot_keys->capacity); |
| 179 | for (uint32_t i = 0; i < hot_keys->size; i++) { |
| 180 | DBT *hotkey = &hot_keys->dbts[i]; |
| 181 | |
| 182 | // send the delete message |
| 183 | result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids); |
| 184 | if (result == 0) { |
| 185 | indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data); |
| 186 | } |
| 187 | } |
| 188 | } |
| 189 | } else { |
| 190 | assert(0); |
| 191 | } |
| 192 | } |
| 193 | if (result != 0) { |
| 194 | break; |
| 195 | } |
| 196 | |
| 197 | // do |
| 198 | if (uxr_is_delete(uxr)) { |
| 199 | ; // do nothing |
| 200 | } else if (uxr_is_insert(uxr)) { |
| 201 | // generate the hot insert key and val |
| 202 | result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals); |
| 203 | if (result == 0) { |
| 204 | paranoid_invariant(hot_keys->size == hot_vals->size); |
| 205 | paranoid_invariant(hot_keys->size <= hot_keys->capacity); |
| 206 | paranoid_invariant(hot_vals->size <= hot_vals->capacity); |
| 207 | for (uint32_t i = 0; i < hot_keys->size; i++) { |
| 208 | DBT *hotkey = &hot_keys->dbts[i]; |
| 209 | DBT *hotval = &hot_vals->dbts[i]; |
| 210 | |
| 211 | // send the insert message |
| 212 | result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids); |
| 213 | if (result == 0) { |
| 214 | indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data); |
| 215 | } |
| 216 | } |
| 217 | } |
| 218 | } else |
| 219 | assert(0); |
| 220 | |
| 221 | // send commit messages if needed |
| 222 | for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++) |
| 223 | result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids); |
| 224 | |
| 225 | if (result != 0) |
| 226 | break; |
| 227 | } |
| 228 | |
| 229 | toku_xids_destroy(&xids); |
| 230 | |
| 231 | return result; |
| 232 | } |
| 233 | |
| 234 | static void release_txns( |
| 235 | ULEHANDLE ule, |
| 236 | TOKUTXN_STATE* prov_states, |
| 237 | TOKUTXN* prov_txns, |
| 238 | DB_INDEXER *indexer |
| 239 | ) |
| 240 | { |
| 241 | uint32_t num_provisional = ule_get_num_provisional(ule); |
| 242 | if (indexer->i->test_xid_state) { |
| 243 | goto exit; |
| 244 | } |
| 245 | for (uint32_t i = 0; i < num_provisional; i++) { |
| 246 | if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) { |
| 247 | toku_txn_unpin_live_txn(prov_txns[i]); |
| 248 | } |
| 249 | } |
| 250 | exit: |
| 251 | return; |
| 252 | } |
| 253 | |
| 254 | static int |
| 255 | indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) { |
| 256 | int result = 0; |
| 257 | indexer_commit_keys_set_empty(&indexer->i->commit_keys); |
| 258 | ULEHANDLE ule = prov_info->ule; |
| 259 | |
| 260 | // init the xids to the root xid |
| 261 | XIDS xids = toku_xids_get_root_xids(); |
| 262 | |
| 263 | uint32_t num_provisional = prov_info->num_provisional; |
| 264 | uint32_t num_committed = prov_info->num_committed; |
| 265 | TXNID *prov_ids = prov_info->prov_ids; |
| 266 | TOKUTXN *prov_txns = prov_info->prov_txns; |
| 267 | TOKUTXN_STATE *prov_states = prov_info->prov_states; |
| 268 | |
| 269 | // nothing to do if there's nothing provisional |
| 270 | if (num_provisional == 0) { |
| 271 | goto exit; |
| 272 | } |
| 273 | |
| 274 | TXNID outermost_xid_state; |
| 275 | outermost_xid_state = prov_states[0]; |
| 276 | |
| 277 | // scan the provisional stack from the outermost to the innermost transaction record |
| 278 | TOKUTXN curr_txn; |
| 279 | curr_txn = NULL; |
| 280 | for (uint64_t xrindex = num_committed; xrindex < num_committed + num_provisional; xrindex++) { |
| 281 | |
| 282 | // get the ith transaction record |
| 283 | UXRHANDLE uxr = ule_get_uxr(ule, xrindex); |
| 284 | |
| 285 | TXNID this_xid = uxr_get_txnid(uxr); |
| 286 | TOKUTXN_STATE this_xid_state = prov_states[xrindex - num_committed]; |
| 287 | |
| 288 | if (this_xid_state == TOKUTXN_ABORTING) { |
| 289 | break; // nothing to do once we reach a transaction that is aborting |
| 290 | } |
| 291 | |
| 292 | if (xrindex == num_committed) { // if this is the outermost xr |
| 293 | result = indexer_set_xid(indexer, this_xid, &xids); // always add the outermost xid to the XIDS list |
| 294 | curr_txn = prov_txns[xrindex - num_committed]; |
| 295 | } else { |
| 296 | switch (this_xid_state) { |
| 297 | case TOKUTXN_LIVE: |
| 298 | result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list |
| 299 | curr_txn = prov_txns[xrindex - num_committed]; |
| 300 | if (!indexer->i->test_xid_state) { |
| 301 | assert(curr_txn); |
| 302 | } |
| 303 | break; |
| 304 | case TOKUTXN_PREPARING: |
| 305 | assert(0); // not allowed |
| 306 | case TOKUTXN_COMMITTING: |
| 307 | case TOKUTXN_ABORTING: |
| 308 | case TOKUTXN_RETIRED: |
| 309 | break; // nothing to do |
| 310 | } |
| 311 | } |
| 312 | if (result != 0) |
| 313 | break; |
| 314 | |
| 315 | if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) { |
| 316 | // If the outermost is not live, then the inner state must be retired. That's the way that the txn API works. |
| 317 | assert(this_xid_state == TOKUTXN_RETIRED); |
| 318 | } |
| 319 | |
| 320 | if (uxr_is_placeholder(uxr)) { |
| 321 | continue; // skip placeholders |
| 322 | } |
| 323 | // undo |
| 324 | uint64_t prev_xrindex; |
| 325 | bool prev_xrindex_found = indexer_find_prev_xr(indexer, ule, xrindex, &prev_xrindex); |
| 326 | if (prev_xrindex_found) { |
| 327 | UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex); |
| 328 | if (uxr_is_delete(prevuxr)) { |
| 329 | ; // do nothing |
| 330 | } else if (uxr_is_insert(prevuxr)) { |
| 331 | // generate the hot delete key |
| 332 | result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL); |
| 333 | if (result == 0) { |
| 334 | paranoid_invariant(hot_keys->size <= hot_keys->capacity); |
| 335 | for (uint32_t i = 0; i < hot_keys->size; i++) { |
| 336 | DBT *hotkey = &hot_keys->dbts[i]; |
| 337 | |
| 338 | // send the delete message |
| 339 | switch (outermost_xid_state) { |
| 340 | case TOKUTXN_LIVE: |
| 341 | case TOKUTXN_PREPARING: |
| 342 | invariant(this_xid_state != TOKUTXN_ABORTING); |
| 343 | invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING); |
| 344 | result = indexer_ft_delete_provisional(indexer, hotdb, hotkey, xids, curr_txn); |
| 345 | if (result == 0) { |
| 346 | indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], curr_txn); |
| 347 | } |
| 348 | break; |
| 349 | case TOKUTXN_COMMITTING: |
| 350 | case TOKUTXN_RETIRED: |
| 351 | result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids); |
| 352 | if (result == 0) |
| 353 | indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data); |
| 354 | break; |
| 355 | case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting |
| 356 | assert(0); |
| 357 | } |
| 358 | } |
| 359 | } |
| 360 | } else |
| 361 | assert(0); |
| 362 | } |
| 363 | if (result != 0) |
| 364 | break; |
| 365 | |
| 366 | // do |
| 367 | if (uxr_is_delete(uxr)) { |
| 368 | ; // do nothing |
| 369 | } else if (uxr_is_insert(uxr)) { |
| 370 | // generate the hot insert key and val |
| 371 | result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals); |
| 372 | if (result == 0) { |
| 373 | paranoid_invariant(hot_keys->size == hot_vals->size); |
| 374 | paranoid_invariant(hot_keys->size <= hot_keys->capacity); |
| 375 | paranoid_invariant(hot_vals->size <= hot_vals->capacity); |
| 376 | for (uint32_t i = 0; i < hot_keys->size; i++) { |
| 377 | DBT *hotkey = &hot_keys->dbts[i]; |
| 378 | DBT *hotval = &hot_vals->dbts[i]; |
| 379 | |
| 380 | // send the insert message |
| 381 | switch (outermost_xid_state) { |
| 382 | case TOKUTXN_LIVE: |
| 383 | case TOKUTXN_PREPARING: |
| 384 | assert(this_xid_state != TOKUTXN_ABORTING); |
| 385 | invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING); |
| 386 | result = indexer_ft_insert_provisional(indexer, hotdb, hotkey, hotval, xids, curr_txn); |
| 387 | if (result == 0) { |
| 388 | indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], prov_txns[0]); |
| 389 | } |
| 390 | break; |
| 391 | case TOKUTXN_COMMITTING: |
| 392 | case TOKUTXN_RETIRED: |
| 393 | result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids); |
| 394 | // no need to do this because we do implicit commits on inserts |
| 395 | if (0 && result == 0) |
| 396 | indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data); |
| 397 | break; |
| 398 | case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting |
| 399 | assert(0); |
| 400 | } |
| 401 | } |
| 402 | } |
| 403 | } else |
| 404 | assert(0); |
| 405 | |
| 406 | if (result != 0) |
| 407 | break; |
| 408 | } |
| 409 | |
| 410 | // send commits if the outermost provisional transaction is committed |
| 411 | for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++) { |
| 412 | result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids); |
| 413 | } |
| 414 | |
| 415 | // be careful with this in the future. Right now, only exit path |
| 416 | // is BEFORE we call fill_prov_info, so this happens before exit |
| 417 | // If in the future we add a way to exit after fill_prov_info, |
| 418 | // then this will need to be handled below exit |
| 419 | release_txns(ule, prov_states, prov_txns, indexer); |
| 420 | exit: |
| 421 | toku_xids_destroy(&xids); |
| 422 | return result; |
| 423 | } |
| 424 | |
| 425 | int |
| 426 | indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) { |
| 427 | int result = indexer_undo_do_committed(indexer, hotdb, prov_info, hot_keys, hot_vals); |
| 428 | if (result == 0) { |
| 429 | result = indexer_undo_do_provisional(indexer, hotdb, prov_info, hot_keys, hot_vals); |
| 430 | } |
| 431 | if (indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK) { |
| 432 | result = EINVAL; |
| 433 | } |
| 434 | |
| 435 | return result; |
| 436 | } |
| 437 | |
| 438 | // set xids_result = [root_xid, this_xid] |
| 439 | // Note that this could be sped up by adding a new xids constructor that constructs the stack with |
| 440 | // exactly one xid. |
| 441 | static int |
| 442 | indexer_set_xid(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids_result) { |
| 443 | int result = 0; |
| 444 | XIDS old_xids = *xids_result; |
| 445 | XIDS new_xids = toku_xids_get_root_xids(); |
| 446 | if (this_xid != TXNID_NONE) { |
| 447 | XIDS child_xids; |
| 448 | result = toku_xids_create_child(new_xids, &child_xids, this_xid); |
| 449 | toku_xids_destroy(&new_xids); |
| 450 | if (result == 0) |
| 451 | new_xids = child_xids; |
| 452 | } |
| 453 | if (result == 0) { |
| 454 | toku_xids_destroy(&old_xids); |
| 455 | *xids_result = new_xids; |
| 456 | } |
| 457 | |
| 458 | return result; |
| 459 | } |
| 460 | |
| 461 | // append xid to xids_result |
| 462 | static int |
| 463 | indexer_append_xid(DB_INDEXER *UU(indexer), TXNID xid, XIDS *xids_result) { |
| 464 | XIDS old_xids = *xids_result; |
| 465 | XIDS new_xids; |
| 466 | int result = toku_xids_create_child(old_xids, &new_xids, xid); |
| 467 | if (result == 0) { |
| 468 | toku_xids_destroy(&old_xids); |
| 469 | *xids_result = new_xids; |
| 470 | } |
| 471 | return result; |
| 472 | } |
| 473 | |
| 474 | static int |
| 475 | indexer_generate_hot_keys_vals(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, UXRHANDLE uxr, DBT_ARRAY *hotkeys, DBT_ARRAY *hotvals) { |
| 476 | int result = 0; |
| 477 | |
| 478 | // setup the source key |
| 479 | DBT srckey; |
| 480 | toku_fill_dbt(&srckey, prov_info->key, prov_info->keylen); |
| 481 | |
| 482 | // setup the source val |
| 483 | DBT srcval; |
| 484 | toku_fill_dbt(&srcval, uxr_get_val(uxr), uxr_get_vallen(uxr)); |
| 485 | |
| 486 | // generate the secondary row |
| 487 | DB_ENV *env = indexer->i->env; |
| 488 | if (hotvals) { |
| 489 | result = env->i->generate_row_for_put(hotdb, indexer->i->src_db, hotkeys, hotvals, &srckey, &srcval); |
| 490 | } |
| 491 | else { |
| 492 | result = env->i->generate_row_for_del(hotdb, indexer->i->src_db, hotkeys, &srckey, &srcval); |
| 493 | } |
| 494 | toku_destroy_dbt(&srckey); |
| 495 | toku_destroy_dbt(&srcval); |
| 496 | |
| 497 | return result; |
| 498 | } |
| 499 | |
| 500 | // Take a write lock on the given key for the outermost xid in the xids list. |
| 501 | static void |
| 502 | indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid, TOKUTXN txn) { |
| 503 | // TEST |
| 504 | if (indexer->i->test_lock_key) { |
| 505 | indexer->i->test_lock_key(indexer, outermost_live_xid, hotdb, key); |
| 506 | } else { |
| 507 | toku_db_grab_write_lock(hotdb, key, txn); |
| 508 | } |
| 509 | } |
| 510 | |
| 511 | // find the index of a non-placeholder transaction record that is previous to the transaction record |
| 512 | // found at xrindex. return true if one is found and return its index in prev_xrindex. otherwise, |
| 513 | // return false. |
| 514 | static bool |
| 515 | indexer_find_prev_xr(DB_INDEXER *UU(indexer), ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex) { |
| 516 | assert(xrindex < ule_num_uxrs(ule)); |
| 517 | bool prev_found = false; |
| 518 | while (xrindex > 0) { |
| 519 | xrindex -= 1; |
| 520 | UXRHANDLE uxr = ule_get_uxr(ule, xrindex); |
| 521 | if (!uxr_is_placeholder(uxr)) { |
| 522 | *prev_xrindex = xrindex; |
| 523 | prev_found = true; |
| 524 | break; |
| 525 | } |
| 526 | } |
| 527 | return prev_found; |
| 528 | } |
| 529 | |
| 530 | // inject "delete" message into ft with logging in recovery and rollback logs, |
| 531 | // and making association between txn and ft |
| 532 | static int |
| 533 | indexer_ft_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids, TOKUTXN txn) { |
| 534 | int result = 0; |
| 535 | // TEST |
| 536 | if (indexer->i->test_delete_provisional) { |
| 537 | result = indexer->i->test_delete_provisional(indexer, hotdb, hotkey, xids); |
| 538 | } else { |
| 539 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
| 540 | if (result == 0) { |
| 541 | assert(txn != NULL); |
| 542 | // Not sure if this is really necessary, as |
| 543 | // the hot index DB should have to be checkpointed |
| 544 | // upon commit of the hot index transaction, but |
| 545 | // it is safe to do this |
| 546 | // this question apples to delete_committed, insert_provisional |
| 547 | // and insert_committed |
| 548 | toku_ft_maybe_delete (hotdb->i->ft_handle, hotkey, txn, false, ZERO_LSN, true); |
| 549 | } |
| 550 | } |
| 551 | return result; |
| 552 | } |
| 553 | |
| 554 | // send a delete message into the tree without rollback or recovery logging |
| 555 | static int |
| 556 | indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { |
| 557 | int result = 0; |
| 558 | // TEST |
| 559 | if (indexer->i->test_delete_committed) { |
| 560 | result = indexer->i->test_delete_committed(indexer, hotdb, hotkey, xids); |
| 561 | } else { |
| 562 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
| 563 | if (result == 0) { |
| 564 | FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle; |
| 565 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
| 566 | txn_manager_state txn_state_for_gc(txn_manager); |
| 567 | |
| 568 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
| 569 | txn_gc_info gc_info(&txn_state_for_gc, |
| 570 | oldest_referenced_xid_estimate, |
| 571 | oldest_referenced_xid_estimate, |
| 572 | true); |
| 573 | toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info); |
| 574 | toku_ft_adjust_logical_row_count(db_struct_i(hotdb)->ft_handle->ft, -1); |
| 575 | } |
| 576 | } |
| 577 | return result; |
| 578 | } |
| 579 | |
| 580 | // inject "insert" message into ft with logging in recovery and rollback logs, |
| 581 | // and making association between txn and ft |
| 582 | static int |
| 583 | indexer_ft_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids, TOKUTXN txn) { |
| 584 | int result = 0; |
| 585 | // TEST |
| 586 | if (indexer->i->test_insert_provisional) { |
| 587 | result = indexer->i->test_insert_provisional(indexer, hotdb, hotkey, hotval, xids); |
| 588 | } else { |
| 589 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
| 590 | if (result == 0) { |
| 591 | assert(txn != NULL); |
| 592 | // comment/question in indexer_ft_delete_provisional applies |
| 593 | toku_ft_maybe_insert (hotdb->i->ft_handle, hotkey, hotval, txn, false, ZERO_LSN, true, FT_INSERT); |
| 594 | } |
| 595 | } |
| 596 | return result; |
| 597 | } |
| 598 | |
| 599 | // send an insert message into the tree without rollback or recovery logging |
| 600 | // and without associating the txn and the ft |
| 601 | static int |
| 602 | indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids) { |
| 603 | int result = 0; |
| 604 | // TEST |
| 605 | if (indexer->i->test_insert_committed) { |
| 606 | result = indexer->i->test_insert_committed(indexer, hotdb, hotkey, hotval, xids); |
| 607 | } else { |
| 608 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
| 609 | if (result == 0) { |
| 610 | FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle; |
| 611 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
| 612 | txn_manager_state txn_state_for_gc(txn_manager); |
| 613 | |
| 614 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
| 615 | txn_gc_info gc_info(&txn_state_for_gc, |
| 616 | oldest_referenced_xid_estimate, |
| 617 | oldest_referenced_xid_estimate, |
| 618 | true); |
| 619 | toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT, &gc_info); |
| 620 | toku_ft_adjust_logical_row_count(db_struct_i(hotdb)->ft_handle->ft, 1); |
| 621 | } |
| 622 | } |
| 623 | return result; |
| 624 | } |
| 625 | |
| 626 | // send a commit message into the tree |
| 627 | // Note: If the xid is zero, then the leafentry will already have a committed transaction |
| 628 | // record and no commit message is needed. (A commit message with xid of zero is |
| 629 | // illegal anyway.) |
| 630 | static int |
| 631 | indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { |
| 632 | int result = 0; |
| 633 | if (toku_xids_get_num_xids(xids) > 0) {// send commit only when not the root xid |
| 634 | // TEST |
| 635 | if (indexer->i->test_commit_any) { |
| 636 | result = indexer->i->test_commit_any(indexer, hotdb, hotkey, xids); |
| 637 | } else { |
| 638 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
| 639 | if (result == 0) { |
| 640 | FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle; |
| 641 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
| 642 | txn_manager_state txn_state_for_gc(txn_manager); |
| 643 | |
| 644 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
| 645 | txn_gc_info gc_info(&txn_state_for_gc, |
| 646 | oldest_referenced_xid_estimate, |
| 647 | oldest_referenced_xid_estimate, |
| 648 | true); |
| 649 | toku_ft_send_commit_any(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info); |
| 650 | } |
| 651 | } |
| 652 | } |
| 653 | return result; |
| 654 | } |
| 655 | |