/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <toku_stdint.h>

#include "ft/serialize/block_table.h"
#include "ft/ft.h"
#include "ft/logger/log-internal.h"
#include "ft/txn/rollback-ct-callbacks.h"

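// Callback passed to toku_cachetable_unpin_and_remove: returns the rollback
// node's blocknum to the FT's block table once the cachetable entry is gone.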
static void rollback_unpin_remove_callback(CACHEKEY* cachekey, bool for_checkpoint, void* extra) {
    FT CAST_FROM_VOIDP(ft, extra);
    ft->blocktable.free_blocknum(cachekey, ft, for_checkpoint);
}

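// Unpin the rollback log node and remove it from the cachetable; its blocknum
// is freed by rollback_unpin_remove_callback.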
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
    int r;
    CACHEFILE cf = txn->logger->rollback_cachefile;
    FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
    r = toku_cachetable_unpin_and_remove (cf, log->ct_pair, rollback_unpin_remove_callback, ft);
    assert(r == 0);
}

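// Three-way comparator for TXNIDs (returns -1, 0, or +1).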
int
toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind) {
    if (xid < xidfind) return -1;
    if (xid > xidfind) return +1;
    return 0;
}

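// Allocate 'size' bytes from the rollback node's memarena; the memory is
// released in bulk when the node's arena is destroyed or reset.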
// TODO: fix this name
// toku_rollback_malloc
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
    return log->rollentry_arena.malloc_from_arena(size);
}

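// Copy 'len' bytes of 'v' into memory allocated from the rollback node's arena.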
// TODO: fix this name
// toku_rollback_memdup
void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len) {
    void *r = toku_malloc_in_rollback(log, len);
    memcpy(r, v, len);
    return r;
}

static inline PAIR_ATTR make_rollback_pair_attr(long size) {
    PAIR_ATTR result = {
        .size = size,
        .nonleaf_size = 0,
        .leaf_size = 0,
        .rollback_size = size,
        .cache_pressure_size = 0,
        .is_valid = true
    };
    return result;
}

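// Report the rollback node's memory footprint (the struct itself plus its
// arena) as a cachetable PAIR_ATTR, all accounted as rollback size.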
PAIR_ATTR
rollback_memory_size(ROLLBACK_LOG_NODE log) {
    size_t size = sizeof(*log);
    size += log->rollentry_arena.total_footprint();
    return make_rollback_pair_attr(size);
}

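// Cachetable 'put' callback: remember the PAIR handle inside the node so
// later unpin/unpin_and_remove calls can reference it directly.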
static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
    ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
    log->ct_pair = p;
}

//
// Initializes an empty rollback log node.
// Does not touch the blocknum; that is the caller's responsibility.
//
void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
    // A txnid of TXNID_NONE is how we determine whether the
    // rollback log node is empty or in use.
    log->txnid.parent_id64 = TXNID_NONE;
    log->txnid.child_id64 = TXNID_NONE;

    log->layout_version = FT_LAYOUT_VERSION;
    log->layout_version_original = FT_LAYOUT_VERSION;
    log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
    log->dirty = true;
    log->sequence = 0;
    log->previous = make_blocknum(0);
    log->oldest_logentry = NULL;
    log->newest_logentry = NULL;
    log->rollentry_arena.create(0);
    log->rollentry_resident_bytecount = 0;
}

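// Initialize an empty rollback log node for use by 'txn': record the txn's
// id, take the next sequence number, and chain the node to 'previous'.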
static void rollback_initialize_for_txn(
    ROLLBACK_LOG_NODE log,
    TOKUTXN txn,
    BLOCKNUM previous
    )
{
    log->txnid = txn->txnid;
    log->sequence = txn->roll_info.num_rollback_nodes++;
    log->previous = previous;
    log->oldest_logentry = NULL;
    log->newest_logentry = NULL;
    log->rollentry_arena.create(1024);
    log->rollentry_resident_bytecount = 0;
    log->dirty = true;
}

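// Reset a rollback log node back to the empty state, releasing its arena.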
// TODO: fix this name
void make_rollback_log_empty(ROLLBACK_LOG_NODE log) {
    log->rollentry_arena.destroy();
    rollback_empty_log_init(log);
}

// Create and pin a new rollback log node. Chain it to the previous rollback
// node via 'previous' and assign it the next sequence number.
static void rollback_log_create (
    TOKUTXN txn,
    BLOCKNUM previous,
    ROLLBACK_LOG_NODE *result
    )
{
    ROLLBACK_LOG_NODE XMALLOC(log);
    rollback_empty_log_init(log);

    CACHEFILE cf = txn->logger->rollback_cachefile;
    FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
    rollback_initialize_for_txn(log, txn, previous);
    ft->blocktable.allocate_blocknum(&log->blocknum, ft);
    const uint32_t hash = toku_cachetable_hash(ft->cf, log->blocknum);
    *result = log;
    toku_cachetable_put(cf, log->blocknum, hash,
                        log, rollback_memory_size(log),
                        get_write_callbacks_for_rollback_log(ft),
                        toku_rollback_node_save_ct_pair);
    txn->roll_info.current_rollback = log->blocknum;
}

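// Unpin the rollback log node, reporting its current dirty state and memory
// footprint back to the cachetable.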
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
    int r;
    CACHEFILE cf = txn->logger->rollback_cachefile;
    r = toku_cachetable_unpin(
        cf,
        log->ct_pair,
        (enum cachetable_dirty)log->dirty,
        rollback_memory_size(log)
        );
    assert(r == 0);
}

// Requires: log is pinned
//           log is the current rollback log
// After:    the txn may no longer have a current rollback log (if this one spilled)
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
    if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
        assert(log->blocknum.b == txn->roll_info.current_rollback.b);
        // spill
        if (!txn_has_spilled_rollback_logs(txn)) {
            // First spill: remember this node as the head of the spilled chain.
            txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback;
        }
        // Unconditionally copy to tail. The old tail does not need to be cached anymore.
        txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback;

        txn->roll_info.current_rollback = ROLLBACK_NONE;
    }
}

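// Comparator ordering FT handles by the FILENUM of their cachefile; used to
// search and keep txn->open_fts sorted.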
int find_filenum (const FT &h, const FT &hfind);
int find_filenum (const FT &h, const FT &hfind) {
    FILENUM fnum = toku_cachefile_filenum(h->cf);
    FILENUM fnumfind = toku_cachefile_filenum(hfind->cf);
    if (fnum.fileid < fnumfind.fileid) return -1;
    if (fnum.fileid > fnumfind.fileid) return +1;
    return 0;
}

// Notify a transaction that it has touched an ft.
void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) {
    toku_txn_lock(txn);
    FT ftv;
    uint32_t idx;
    int r = txn->open_fts.find_zero<FT, find_filenum>(ft, &ftv, &idx);
    if (r == 0) {
        // already there
        assert(ftv == ft);
        goto exit;
    }
    r = txn->open_fts.insert_at(ft, idx);
    assert_zero(r);
    // TODO(leif): if there's anything that locks the reflock and then
    // the txn lock, this may deadlock, because it grabs the reflock.
    toku_ft_add_txn_ref(ft);
exit:
    toku_txn_unlock(txn);
}

// Report, via txn_stat, the number of bytes that went into the rollback data
// structure (the uncompressed count if there is compression).
int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat)
{
    toku_txn_lock(txn);
    txn_stat->rollback_raw_count = txn->roll_info.rollentry_raw_count;
    txn_stat->rollback_num_entries = txn->roll_info.num_rollentries;
    toku_txn_unlock(txn);
    return 0;
}

void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
    // Currently processing 'log'. Prefetch the next (previous) log node.

    BLOCKNUM name = log->previous;
    int r = 0;
    if (name.b != ROLLBACK_NONE.b) {
        CACHEFILE cf = txn->logger->rollback_cachefile;
        uint32_t hash = toku_cachetable_hash(cf, name);
        FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
        bool doing_prefetch = false;
        r = toku_cachefile_prefetch(cf, name, hash,
                                    get_write_callbacks_for_rollback_log(h),
                                    toku_rollback_fetch_callback,
                                    toku_rollback_pf_req_callback,
                                    toku_rollback_pf_callback,
                                    h,
                                    &doing_prefetch);
        assert(r == 0);
    }
}

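// Sanity check: the pinned rollback node must belong to the expected txn and
// carry the expected sequence number.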
void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,
                                   TXNID_PAIR txnid, uint64_t sequence)
{
    assert(log->txnid.parent_id64 == txnid.parent_id64);
    assert(log->txnid.child_id64 == txnid.child_id64);
    assert(log->sequence == sequence);
}

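// Pin the rollback log node with the given blocknum, fetching it into the
// cachetable if necessary, using a cheap write lock (PL_WRITE_CHEAP).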
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log) {
    void * value;
    CACHEFILE cf = txn->logger->rollback_cachefile;
    FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
    uint32_t hash = toku_cachetable_hash(cf, blocknum);
    int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash,
        &value, NULL,
        get_write_callbacks_for_rollback_log(h),
        toku_rollback_fetch_callback,
        toku_rollback_pf_req_callback,
        toku_rollback_pf_callback,
        PL_WRITE_CHEAP, // lock_type
        h,
        0, NULL, NULL
        );
    assert(r == 0);
    ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
    assert(pinned_log->blocknum.b == blocknum.b);
    *log = pinned_log;
}

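// Return a pinned rollback log node that a new rollback entry can be appended
// to: reuse the txn's current node if it has one, otherwise recycle a node
// from the rollback node cache or create a fresh one.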
void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
    ROLLBACK_LOG_NODE pinned_log = NULL;
    invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
    if (txn_has_current_rollback_log(txn)) {
        toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &pinned_log);
        toku_rollback_verify_contents(pinned_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
    } else {
        // For each transaction, we try to acquire the first rollback log
        // node from the rollback log node cache, so that we avoid
        // putting something new into the cachetable. However,
        // if the transaction has spilled rollbacks, that means we
        // have already done a lot of work for this transaction,
        // and subsequent rollback log nodes are created
        // and put into the cachetable. The idea is for
        // transactions that don't do a lot of work to (hopefully)
        // get a rollback log node from the cache, as opposed to
        // taking the more expensive route of creating a new one.
        if (!txn_has_spilled_rollback_logs(txn)) {
            txn->logger->rollback_cache.get_rollback_log_node(txn, &pinned_log);
            if (pinned_log != NULL) {
                rollback_initialize_for_txn(
                    pinned_log,
                    txn,
                    txn->roll_info.spilled_rollback_tail
                    );
                txn->roll_info.current_rollback = pinned_log->blocknum;
            }
        }
        if (pinned_log == NULL) {
            rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, &pinned_log);
        }
    }
    assert(pinned_log->txnid.parent_id64 == txn->txnid.parent_id64);
    assert(pinned_log->txnid.child_id64 == txn->txnid.child_id64);
    assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
    *log = pinned_log;
}