| 1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
| 2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
| 3 | #ident "$Id$" |
| 4 | /*====== |
| 5 | This file is part of PerconaFT. |
| 6 | |
| 7 | |
| 8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
| 9 | |
| 10 | PerconaFT is free software: you can redistribute it and/or modify |
| 11 | it under the terms of the GNU General Public License, version 2, |
| 12 | as published by the Free Software Foundation. |
| 13 | |
| 14 | PerconaFT is distributed in the hope that it will be useful, |
| 15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 | GNU General Public License for more details. |
| 18 | |
| 19 | You should have received a copy of the GNU General Public License |
| 20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 21 | |
| 22 | ---------------------------------------- |
| 23 | |
| 24 | PerconaFT is free software: you can redistribute it and/or modify |
| 25 | it under the terms of the GNU Affero General Public License, version 3, |
| 26 | as published by the Free Software Foundation. |
| 27 | |
| 28 | PerconaFT is distributed in the hope that it will be useful, |
| 29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 31 | GNU Affero General Public License for more details. |
| 32 | |
| 33 | You should have received a copy of the GNU Affero General Public License |
| 34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 35 | ======= */ |
| 36 | |
| 37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
| 38 | |
| 39 | #include "ft/logger/log-internal.h" |
| 40 | #include "ft/txn/rollback-apply.h" |
| 41 | |
| 42 | static void poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) { |
| 43 | if (txn->progress_poll_fun) { |
| 44 | TOKU_TXN_PROGRESS_S progress = { |
| 45 | .entries_total = txn->roll_info.num_rollentries, |
| 46 | .entries_processed = txn->roll_info.num_rollentries_processed, |
| 47 | .is_commit = is_commit, |
| 48 | .stalled_on_checkpoint = stall_for_checkpoint}; |
| 49 | txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra); |
| 50 | } |
| 51 | } |
| 52 | |
| 53 | int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) { |
| 54 | int r=0; |
| 55 | rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn); |
| 56 | txn->roll_info.num_rollentries_processed++; |
| 57 | if (txn->roll_info.num_rollentries_processed % 1024 == 0) { |
| 58 | poll_txn_progress_function(txn, true, false); |
| 59 | } |
| 60 | return r; |
| 61 | } |
| 62 | |
| 63 | int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) { |
| 64 | int r=0; |
| 65 | rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn); |
| 66 | txn->roll_info.num_rollentries_processed++; |
| 67 | if (txn->roll_info.num_rollentries_processed % 1024 == 0) { |
| 68 | poll_txn_progress_function(txn, false, false); |
| 69 | } |
| 70 | return r; |
| 71 | } |
| 72 | |
| 73 | int note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child); |
| 74 | int note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) { |
| 75 | TOKUTXN parent = child->parent; |
| 76 | toku_txn_maybe_note_ft(parent, ft); |
| 77 | return 0; |
| 78 | } |
| 79 | |
| 80 | static int apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) { |
| 81 | int r = 0; |
| 82 | // do the commit/abort calls and free everything |
| 83 | // we do the commit/abort calls in reverse order too. |
| 84 | struct roll_entry *item; |
| 85 | //printf("%s:%d abort\n", __FILE__, __LINE__); |
| 86 | |
| 87 | BLOCKNUM next_log = ROLLBACK_NONE; |
| 88 | |
| 89 | bool is_current = false; |
| 90 | if (txn_has_current_rollback_log(txn)) { |
| 91 | next_log = txn->roll_info.current_rollback; |
| 92 | is_current = true; |
| 93 | } |
| 94 | else if (txn_has_spilled_rollback_logs(txn)) { |
| 95 | next_log = txn->roll_info.spilled_rollback_tail; |
| 96 | } |
| 97 | |
| 98 | uint64_t last_sequence = txn->roll_info.num_rollback_nodes; |
| 99 | bool found_head = false; |
| 100 | while (next_log.b != ROLLBACK_NONE.b) { |
| 101 | ROLLBACK_LOG_NODE log; |
| 102 | //pin log |
| 103 | toku_get_and_pin_rollback_log(txn, next_log, &log); |
| 104 | toku_rollback_verify_contents(log, txn->txnid, last_sequence - 1); |
| 105 | |
| 106 | toku_maybe_prefetch_previous_rollback_log(txn, log); |
| 107 | |
| 108 | last_sequence = log->sequence; |
| 109 | if (func) { |
| 110 | while ((item=log->newest_logentry)) { |
| 111 | log->newest_logentry = item->prev; |
| 112 | r = func(txn, item, lsn); |
| 113 | if (r!=0) return r; |
| 114 | } |
| 115 | } |
| 116 | if (next_log.b == txn->roll_info.spilled_rollback_head.b) { |
| 117 | assert(!found_head); |
| 118 | found_head = true; |
| 119 | assert(log->sequence == 0); |
| 120 | } |
| 121 | next_log = log->previous; |
| 122 | { |
| 123 | //Clean up transaction structure to prevent |
| 124 | //toku_txn_close from double-freeing |
| 125 | if (is_current) { |
| 126 | txn->roll_info.current_rollback = ROLLBACK_NONE; |
| 127 | is_current = false; |
| 128 | } |
| 129 | else { |
| 130 | txn->roll_info.spilled_rollback_tail = next_log; |
| 131 | } |
| 132 | if (found_head) { |
| 133 | assert(next_log.b == ROLLBACK_NONE.b); |
| 134 | txn->roll_info.spilled_rollback_head = next_log; |
| 135 | } |
| 136 | } |
| 137 | bool give_back = false; |
| 138 | // each txn tries to give back at most one rollback log node |
| 139 | // to the cache. |
| 140 | if (next_log.b == ROLLBACK_NONE.b) { |
| 141 | give_back = txn->logger->rollback_cache.give_rollback_log_node( |
| 142 | txn, |
| 143 | log |
| 144 | ); |
| 145 | } |
| 146 | if (!give_back) { |
| 147 | toku_rollback_log_unpin_and_remove(txn, log); |
| 148 | } |
| 149 | } |
| 150 | return r; |
| 151 | } |
| 152 | |
| 153 | //Commit each entry in the rollback log. |
| 154 | //If the transaction has a parent, it just promotes its information to its parent. |
| 155 | int toku_rollback_commit(TOKUTXN txn, LSN lsn) { |
| 156 | int r=0; |
| 157 | if (txn->parent!=0) { |
| 158 | // First we must put a rollinclude entry into the parent if we spilled |
| 159 | |
| 160 | if (txn_has_spilled_rollback_logs(txn)) { |
| 161 | uint64_t num_nodes = txn->roll_info.num_rollback_nodes; |
| 162 | if (txn_has_current_rollback_log(txn)) { |
| 163 | num_nodes--; //Don't count the in-progress rollback log. |
| 164 | } |
| 165 | toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid, num_nodes, |
| 166 | txn->roll_info.spilled_rollback_head, |
| 167 | txn->roll_info.spilled_rollback_tail); |
| 168 | //Remove ownership from child. |
| 169 | txn->roll_info.spilled_rollback_head = ROLLBACK_NONE; |
| 170 | txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE; |
| 171 | } |
| 172 | // if we're committing a child rollback, put its entries into the parent |
| 173 | // by pinning both child and parent and then linking the child log entry |
| 174 | // list to the end of the parent log entry list. |
| 175 | if (txn_has_current_rollback_log(txn)) { |
| 176 | //Pin parent log |
| 177 | toku_txn_lock(txn->parent); |
| 178 | ROLLBACK_LOG_NODE parent_log; |
| 179 | toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log); |
| 180 | |
| 181 | //Pin child log |
| 182 | ROLLBACK_LOG_NODE child_log; |
| 183 | toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &child_log); |
| 184 | toku_rollback_verify_contents(child_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1); |
| 185 | |
| 186 | // Append the list to the front of the parent. |
| 187 | if (child_log->oldest_logentry) { |
| 188 | // There are some entries, so link them in. |
| 189 | parent_log->dirty = true; |
| 190 | child_log->oldest_logentry->prev = parent_log->newest_logentry; |
| 191 | if (!parent_log->oldest_logentry) { |
| 192 | parent_log->oldest_logentry = child_log->oldest_logentry; |
| 193 | } |
| 194 | parent_log->newest_logentry = child_log->newest_logentry; |
| 195 | parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount; |
| 196 | txn->parent->roll_info.rollentry_raw_count += txn->roll_info.rollentry_raw_count; |
| 197 | child_log->rollentry_resident_bytecount = 0; |
| 198 | } |
| 199 | if (parent_log->oldest_logentry==NULL) { |
| 200 | parent_log->oldest_logentry = child_log->oldest_logentry; |
| 201 | } |
| 202 | child_log->newest_logentry = child_log->oldest_logentry = 0; |
| 203 | // Put all the memarena data into the parent. |
| 204 | if (child_log->rollentry_arena.total_size_in_use() > 0) { |
| 205 | // If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed. |
| 206 | child_log->rollentry_arena.move_memory(&parent_log->rollentry_arena); |
| 207 | } |
| 208 | // each txn tries to give back at most one rollback log node |
| 209 | // to the cache. All other rollback log nodes for this child |
| 210 | // transaction are included in the parent's rollback log, |
| 211 | // so this is the only node we can give back to the cache |
| 212 | bool give_back = txn->logger->rollback_cache.give_rollback_log_node( |
| 213 | txn, |
| 214 | child_log |
| 215 | ); |
| 216 | if (!give_back) { |
| 217 | toku_rollback_log_unpin_and_remove(txn, child_log); |
| 218 | } |
| 219 | txn->roll_info.current_rollback = ROLLBACK_NONE; |
| 220 | |
| 221 | toku_maybe_spill_rollbacks(txn->parent, parent_log); |
| 222 | toku_rollback_log_unpin(txn->parent, parent_log); |
| 223 | assert(r == 0); |
| 224 | toku_txn_unlock(txn->parent); |
| 225 | } |
| 226 | |
| 227 | // Note the open FTs, the omts must be merged |
| 228 | r = txn->open_fts.iterate<struct tokutxn, note_ft_used_in_txns_parent>(txn); |
| 229 | assert(r==0); |
| 230 | |
| 231 | //If this transaction needs an fsync (if it commits) |
| 232 | //save that in the parent. Since the commit really happens in the root txn. |
| 233 | txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit; |
| 234 | txn->parent->roll_info.num_rollentries += txn->roll_info.num_rollentries; |
| 235 | } else { |
| 236 | r = apply_txn(txn, lsn, toku_commit_rollback_item); |
| 237 | assert(r==0); |
| 238 | } |
| 239 | |
| 240 | return r; |
| 241 | } |
| 242 | |
| 243 | int toku_rollback_abort(TOKUTXN txn, LSN lsn) { |
| 244 | int r; |
| 245 | r = apply_txn(txn, lsn, toku_abort_rollback_item); |
| 246 | assert(r==0); |
| 247 | return r; |
| 248 | } |
| 249 | |
| 250 | int toku_rollback_discard(TOKUTXN txn) { |
| 251 | txn->roll_info.current_rollback = ROLLBACK_NONE; |
| 252 | txn->roll_info.spilled_rollback_head = ROLLBACK_NONE; |
| 253 | txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE; |
| 254 | return 0; |
| 255 | } |
| 256 | |
| 257 | |