1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include <toku_portability.h> |
40 | #include <toku_assert.h> |
41 | |
42 | #include <stdio.h> |
43 | #include <string.h> |
44 | |
45 | #include <ft/le-cursor.h> |
46 | #include <ft/ft-ops.h> |
47 | #include <ft/leafentry.h> |
48 | #include <ft/ule.h> |
49 | #include <ft/txn/txn_manager.h> |
50 | #include <ft/txn/xids.h> |
51 | #include <ft/cachetable/checkpoint.h> |
52 | |
53 | #include "ydb-internal.h" |
54 | #include "ydb_row_lock.h" |
55 | #include "indexer.h" |
56 | #include "indexer-internal.h" |
57 | |
58 | // initialize the commit keys |
59 | static void |
60 | indexer_commit_keys_init(struct indexer_commit_keys *keys) { |
61 | keys->max_keys = keys->current_keys = 0; |
62 | keys->keys = NULL; |
63 | } |
64 | |
65 | // destroy the commit keys |
66 | static void |
67 | indexer_commit_keys_destroy(struct indexer_commit_keys *keys) { |
68 | for (int i = 0; i < keys->max_keys; i++) |
69 | toku_destroy_dbt(&keys->keys[i]); |
70 | toku_free(keys->keys); |
71 | } |
72 | |
73 | // return the number of keys in the ordered set |
74 | static int |
75 | indexer_commit_keys_valid(struct indexer_commit_keys *keys) { |
76 | return keys->current_keys; |
77 | } |
78 | |
79 | // add a key to the commit keys |
80 | static void |
81 | indexer_commit_keys_add(struct indexer_commit_keys *keys, size_t length, void *ptr) { |
82 | if (keys->current_keys >= keys->max_keys) { |
83 | int new_max_keys = keys->max_keys == 0 ? 256 : keys->max_keys * 2; |
84 | keys->keys = (DBT *) toku_xrealloc(keys->keys, new_max_keys * sizeof (DBT)); |
85 | for (int i = keys->current_keys; i < new_max_keys; i++) |
86 | toku_init_dbt_flags(&keys->keys[i], DB_DBT_REALLOC); |
87 | keys->max_keys = new_max_keys; |
88 | } |
89 | DBT *key = &keys->keys[keys->current_keys]; |
90 | toku_dbt_set(length, ptr, key, NULL); |
91 | keys->current_keys++; |
92 | } |
93 | |
94 | // set the ordered set to empty |
95 | static void |
96 | indexer_commit_keys_set_empty(struct indexer_commit_keys *keys) { |
97 | keys->current_keys = 0; |
98 | } |
99 | |
100 | // internal functions |
// Replace *xids_result with a stack holding the root xids plus xid
// (just the root xids when xid is TXNID_NONE).
static int indexer_set_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
// Replace *xids_result with a child of the current stack that has xid appended.
static int indexer_append_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);

// Find the nearest non-placeholder transaction record below xrindex; returns
// true and stores its index in *prev_xrindex when one exists.
static bool indexer_find_prev_xr(DB_INDEXER *indexer, ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex);

// Generate the hot index keys (and vals, when hotvals is non-NULL) for the
// source row made of prov_info's key and uxr's value.
static int indexer_generate_hot_keys_vals(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info* prov_info, UXRHANDLE uxr, DBT_ARRAY *hotkeys, DBT_ARRAY *hotvals);
// Message injection helpers.  Each one calls the matching test hook on the
// indexer when that hook is set, and otherwise checks available file system
// space before touching the hot index's ft handle.
static int indexer_ft_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids, TOKUTXN txn);
static int indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
static int indexer_ft_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids, TOKUTXN txn);
static int indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
static int indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
// Take a write lock on key in hotdb (or call the test_lock_key hook when set).
static void indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid, TOKUTXN txn);
113 | |
114 | |
115 | // initialize undo globals located in the indexer private object |
116 | void |
117 | indexer_undo_do_init(DB_INDEXER *indexer) { |
118 | indexer_commit_keys_init(&indexer->i->commit_keys); |
119 | XMALLOC_N(indexer->i->N, indexer->i->hot_keys); |
120 | XMALLOC_N(indexer->i->N, indexer->i->hot_vals); |
121 | for (int which = 0; which < indexer->i->N; which++) { |
122 | toku_dbt_array_init(&indexer->i->hot_keys[which], 1); |
123 | toku_dbt_array_init(&indexer->i->hot_vals[which], 1); |
124 | } |
125 | } |
126 | |
127 | // destroy the undo globals |
128 | void |
129 | indexer_undo_do_destroy(DB_INDEXER *indexer) { |
130 | indexer_commit_keys_destroy(&indexer->i->commit_keys); |
131 | if (indexer->i->hot_keys) { |
132 | invariant(indexer->i->hot_vals); |
133 | for (int which = 0; which < indexer->i->N; which++) { |
134 | toku_dbt_array_destroy(&indexer->i->hot_keys[which]); |
135 | toku_dbt_array_destroy(&indexer->i->hot_vals[which]); |
136 | } |
137 | toku_free(indexer->i->hot_keys); |
138 | toku_free(indexer->i->hot_vals); |
139 | } |
140 | } |
141 | |
142 | static int |
143 | indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) { |
144 | int result = 0; |
145 | ULEHANDLE ule = prov_info->ule; |
146 | |
147 | // init the xids to the root xid |
148 | XIDS xids = toku_xids_get_root_xids(); |
149 | |
150 | // scan the committed stack from bottom to top |
151 | uint32_t num_committed = ule_get_num_committed(ule); |
152 | for (uint64_t xrindex = 0; xrindex < num_committed; xrindex++) { |
153 | |
154 | indexer_commit_keys_set_empty(&indexer->i->commit_keys); |
155 | |
156 | // get the transaction record |
157 | UXRHANDLE uxr = ule_get_uxr(ule, xrindex); |
158 | |
159 | // setup up the xids |
160 | TXNID this_xid = uxr_get_txnid(uxr); |
161 | result = indexer_set_xid(indexer, this_xid, &xids); |
162 | if (result != 0) |
163 | break; |
164 | |
165 | // placeholders in the committed stack are not allowed |
166 | invariant(!uxr_is_placeholder(uxr)); |
167 | |
168 | // undo |
169 | if (xrindex > 0) { |
170 | uint64_t prev_xrindex = xrindex - 1; |
171 | UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex); |
172 | if (uxr_is_delete(prevuxr)) { |
173 | ; // do nothing |
174 | } else if (uxr_is_insert(prevuxr)) { |
175 | // generate the hot delete key |
176 | result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL); |
177 | if (result == 0) { |
178 | paranoid_invariant(hot_keys->size <= hot_keys->capacity); |
179 | for (uint32_t i = 0; i < hot_keys->size; i++) { |
180 | DBT *hotkey = &hot_keys->dbts[i]; |
181 | |
182 | // send the delete message |
183 | result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids); |
184 | if (result == 0) { |
185 | indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data); |
186 | } |
187 | } |
188 | } |
189 | } else { |
190 | assert(0); |
191 | } |
192 | } |
193 | if (result != 0) { |
194 | break; |
195 | } |
196 | |
197 | // do |
198 | if (uxr_is_delete(uxr)) { |
199 | ; // do nothing |
200 | } else if (uxr_is_insert(uxr)) { |
201 | // generate the hot insert key and val |
202 | result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals); |
203 | if (result == 0) { |
204 | paranoid_invariant(hot_keys->size == hot_vals->size); |
205 | paranoid_invariant(hot_keys->size <= hot_keys->capacity); |
206 | paranoid_invariant(hot_vals->size <= hot_vals->capacity); |
207 | for (uint32_t i = 0; i < hot_keys->size; i++) { |
208 | DBT *hotkey = &hot_keys->dbts[i]; |
209 | DBT *hotval = &hot_vals->dbts[i]; |
210 | |
211 | // send the insert message |
212 | result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids); |
213 | if (result == 0) { |
214 | indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data); |
215 | } |
216 | } |
217 | } |
218 | } else |
219 | assert(0); |
220 | |
221 | // send commit messages if needed |
222 | for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++) |
223 | result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids); |
224 | |
225 | if (result != 0) |
226 | break; |
227 | } |
228 | |
229 | toku_xids_destroy(&xids); |
230 | |
231 | return result; |
232 | } |
233 | |
234 | static void release_txns( |
235 | ULEHANDLE ule, |
236 | TOKUTXN_STATE* prov_states, |
237 | TOKUTXN* prov_txns, |
238 | DB_INDEXER *indexer |
239 | ) |
240 | { |
241 | uint32_t num_provisional = ule_get_num_provisional(ule); |
242 | if (indexer->i->test_xid_state) { |
243 | goto exit; |
244 | } |
245 | for (uint32_t i = 0; i < num_provisional; i++) { |
246 | if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) { |
247 | toku_txn_unpin_live_txn(prov_txns[i]); |
248 | } |
249 | } |
250 | exit: |
251 | return; |
252 | } |
253 | |
254 | static int |
255 | indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) { |
256 | int result = 0; |
257 | indexer_commit_keys_set_empty(&indexer->i->commit_keys); |
258 | ULEHANDLE ule = prov_info->ule; |
259 | |
260 | // init the xids to the root xid |
261 | XIDS xids = toku_xids_get_root_xids(); |
262 | |
263 | uint32_t num_provisional = prov_info->num_provisional; |
264 | uint32_t num_committed = prov_info->num_committed; |
265 | TXNID *prov_ids = prov_info->prov_ids; |
266 | TOKUTXN *prov_txns = prov_info->prov_txns; |
267 | TOKUTXN_STATE *prov_states = prov_info->prov_states; |
268 | |
269 | // nothing to do if there's nothing provisional |
270 | if (num_provisional == 0) { |
271 | goto exit; |
272 | } |
273 | |
274 | TXNID outermost_xid_state; |
275 | outermost_xid_state = prov_states[0]; |
276 | |
277 | // scan the provisional stack from the outermost to the innermost transaction record |
278 | TOKUTXN curr_txn; |
279 | curr_txn = NULL; |
280 | for (uint64_t xrindex = num_committed; xrindex < num_committed + num_provisional; xrindex++) { |
281 | |
282 | // get the ith transaction record |
283 | UXRHANDLE uxr = ule_get_uxr(ule, xrindex); |
284 | |
285 | TXNID this_xid = uxr_get_txnid(uxr); |
286 | TOKUTXN_STATE this_xid_state = prov_states[xrindex - num_committed]; |
287 | |
288 | if (this_xid_state == TOKUTXN_ABORTING) { |
289 | break; // nothing to do once we reach a transaction that is aborting |
290 | } |
291 | |
292 | if (xrindex == num_committed) { // if this is the outermost xr |
293 | result = indexer_set_xid(indexer, this_xid, &xids); // always add the outermost xid to the XIDS list |
294 | curr_txn = prov_txns[xrindex - num_committed]; |
295 | } else { |
296 | switch (this_xid_state) { |
297 | case TOKUTXN_LIVE: |
298 | result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list |
299 | curr_txn = prov_txns[xrindex - num_committed]; |
300 | if (!indexer->i->test_xid_state) { |
301 | assert(curr_txn); |
302 | } |
303 | break; |
304 | case TOKUTXN_PREPARING: |
305 | assert(0); // not allowed |
306 | case TOKUTXN_COMMITTING: |
307 | case TOKUTXN_ABORTING: |
308 | case TOKUTXN_RETIRED: |
309 | break; // nothing to do |
310 | } |
311 | } |
312 | if (result != 0) |
313 | break; |
314 | |
315 | if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) { |
316 | // If the outermost is not live, then the inner state must be retired. That's the way that the txn API works. |
317 | assert(this_xid_state == TOKUTXN_RETIRED); |
318 | } |
319 | |
320 | if (uxr_is_placeholder(uxr)) { |
321 | continue; // skip placeholders |
322 | } |
323 | // undo |
324 | uint64_t prev_xrindex; |
325 | bool prev_xrindex_found = indexer_find_prev_xr(indexer, ule, xrindex, &prev_xrindex); |
326 | if (prev_xrindex_found) { |
327 | UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex); |
328 | if (uxr_is_delete(prevuxr)) { |
329 | ; // do nothing |
330 | } else if (uxr_is_insert(prevuxr)) { |
331 | // generate the hot delete key |
332 | result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL); |
333 | if (result == 0) { |
334 | paranoid_invariant(hot_keys->size <= hot_keys->capacity); |
335 | for (uint32_t i = 0; i < hot_keys->size; i++) { |
336 | DBT *hotkey = &hot_keys->dbts[i]; |
337 | |
338 | // send the delete message |
339 | switch (outermost_xid_state) { |
340 | case TOKUTXN_LIVE: |
341 | case TOKUTXN_PREPARING: |
342 | invariant(this_xid_state != TOKUTXN_ABORTING); |
343 | invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING); |
344 | result = indexer_ft_delete_provisional(indexer, hotdb, hotkey, xids, curr_txn); |
345 | if (result == 0) { |
346 | indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], curr_txn); |
347 | } |
348 | break; |
349 | case TOKUTXN_COMMITTING: |
350 | case TOKUTXN_RETIRED: |
351 | result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids); |
352 | if (result == 0) |
353 | indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data); |
354 | break; |
355 | case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting |
356 | assert(0); |
357 | } |
358 | } |
359 | } |
360 | } else |
361 | assert(0); |
362 | } |
363 | if (result != 0) |
364 | break; |
365 | |
366 | // do |
367 | if (uxr_is_delete(uxr)) { |
368 | ; // do nothing |
369 | } else if (uxr_is_insert(uxr)) { |
370 | // generate the hot insert key and val |
371 | result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals); |
372 | if (result == 0) { |
373 | paranoid_invariant(hot_keys->size == hot_vals->size); |
374 | paranoid_invariant(hot_keys->size <= hot_keys->capacity); |
375 | paranoid_invariant(hot_vals->size <= hot_vals->capacity); |
376 | for (uint32_t i = 0; i < hot_keys->size; i++) { |
377 | DBT *hotkey = &hot_keys->dbts[i]; |
378 | DBT *hotval = &hot_vals->dbts[i]; |
379 | |
380 | // send the insert message |
381 | switch (outermost_xid_state) { |
382 | case TOKUTXN_LIVE: |
383 | case TOKUTXN_PREPARING: |
384 | assert(this_xid_state != TOKUTXN_ABORTING); |
385 | invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING); |
386 | result = indexer_ft_insert_provisional(indexer, hotdb, hotkey, hotval, xids, curr_txn); |
387 | if (result == 0) { |
388 | indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], prov_txns[0]); |
389 | } |
390 | break; |
391 | case TOKUTXN_COMMITTING: |
392 | case TOKUTXN_RETIRED: |
393 | result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids); |
394 | // no need to do this because we do implicit commits on inserts |
395 | if (0 && result == 0) |
396 | indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data); |
397 | break; |
398 | case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting |
399 | assert(0); |
400 | } |
401 | } |
402 | } |
403 | } else |
404 | assert(0); |
405 | |
406 | if (result != 0) |
407 | break; |
408 | } |
409 | |
410 | // send commits if the outermost provisional transaction is committed |
411 | for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++) { |
412 | result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids); |
413 | } |
414 | |
415 | // be careful with this in the future. Right now, only exit path |
416 | // is BEFORE we call fill_prov_info, so this happens before exit |
417 | // If in the future we add a way to exit after fill_prov_info, |
418 | // then this will need to be handled below exit |
419 | release_txns(ule, prov_states, prov_txns, indexer); |
420 | exit: |
421 | toku_xids_destroy(&xids); |
422 | return result; |
423 | } |
424 | |
425 | int |
426 | indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) { |
427 | int result = indexer_undo_do_committed(indexer, hotdb, prov_info, hot_keys, hot_vals); |
428 | if (result == 0) { |
429 | result = indexer_undo_do_provisional(indexer, hotdb, prov_info, hot_keys, hot_vals); |
430 | } |
431 | if (indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK) { |
432 | result = EINVAL; |
433 | } |
434 | |
435 | return result; |
436 | } |
437 | |
438 | // set xids_result = [root_xid, this_xid] |
439 | // Note that this could be sped up by adding a new xids constructor that constructs the stack with |
440 | // exactly one xid. |
441 | static int |
442 | indexer_set_xid(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids_result) { |
443 | int result = 0; |
444 | XIDS old_xids = *xids_result; |
445 | XIDS new_xids = toku_xids_get_root_xids(); |
446 | if (this_xid != TXNID_NONE) { |
447 | XIDS child_xids; |
448 | result = toku_xids_create_child(new_xids, &child_xids, this_xid); |
449 | toku_xids_destroy(&new_xids); |
450 | if (result == 0) |
451 | new_xids = child_xids; |
452 | } |
453 | if (result == 0) { |
454 | toku_xids_destroy(&old_xids); |
455 | *xids_result = new_xids; |
456 | } |
457 | |
458 | return result; |
459 | } |
460 | |
461 | // append xid to xids_result |
462 | static int |
463 | indexer_append_xid(DB_INDEXER *UU(indexer), TXNID xid, XIDS *xids_result) { |
464 | XIDS old_xids = *xids_result; |
465 | XIDS new_xids; |
466 | int result = toku_xids_create_child(old_xids, &new_xids, xid); |
467 | if (result == 0) { |
468 | toku_xids_destroy(&old_xids); |
469 | *xids_result = new_xids; |
470 | } |
471 | return result; |
472 | } |
473 | |
474 | static int |
475 | indexer_generate_hot_keys_vals(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, UXRHANDLE uxr, DBT_ARRAY *hotkeys, DBT_ARRAY *hotvals) { |
476 | int result = 0; |
477 | |
478 | // setup the source key |
479 | DBT srckey; |
480 | toku_fill_dbt(&srckey, prov_info->key, prov_info->keylen); |
481 | |
482 | // setup the source val |
483 | DBT srcval; |
484 | toku_fill_dbt(&srcval, uxr_get_val(uxr), uxr_get_vallen(uxr)); |
485 | |
486 | // generate the secondary row |
487 | DB_ENV *env = indexer->i->env; |
488 | if (hotvals) { |
489 | result = env->i->generate_row_for_put(hotdb, indexer->i->src_db, hotkeys, hotvals, &srckey, &srcval); |
490 | } |
491 | else { |
492 | result = env->i->generate_row_for_del(hotdb, indexer->i->src_db, hotkeys, &srckey, &srcval); |
493 | } |
494 | toku_destroy_dbt(&srckey); |
495 | toku_destroy_dbt(&srcval); |
496 | |
497 | return result; |
498 | } |
499 | |
500 | // Take a write lock on the given key for the outermost xid in the xids list. |
501 | static void |
502 | indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid, TOKUTXN txn) { |
503 | // TEST |
504 | if (indexer->i->test_lock_key) { |
505 | indexer->i->test_lock_key(indexer, outermost_live_xid, hotdb, key); |
506 | } else { |
507 | toku_db_grab_write_lock(hotdb, key, txn); |
508 | } |
509 | } |
510 | |
511 | // find the index of a non-placeholder transaction record that is previous to the transaction record |
512 | // found at xrindex. return true if one is found and return its index in prev_xrindex. otherwise, |
513 | // return false. |
514 | static bool |
515 | indexer_find_prev_xr(DB_INDEXER *UU(indexer), ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex) { |
516 | assert(xrindex < ule_num_uxrs(ule)); |
517 | bool prev_found = false; |
518 | while (xrindex > 0) { |
519 | xrindex -= 1; |
520 | UXRHANDLE uxr = ule_get_uxr(ule, xrindex); |
521 | if (!uxr_is_placeholder(uxr)) { |
522 | *prev_xrindex = xrindex; |
523 | prev_found = true; |
524 | break; |
525 | } |
526 | } |
527 | return prev_found; |
528 | } |
529 | |
530 | // inject "delete" message into ft with logging in recovery and rollback logs, |
531 | // and making association between txn and ft |
532 | static int |
533 | indexer_ft_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids, TOKUTXN txn) { |
534 | int result = 0; |
535 | // TEST |
536 | if (indexer->i->test_delete_provisional) { |
537 | result = indexer->i->test_delete_provisional(indexer, hotdb, hotkey, xids); |
538 | } else { |
539 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
540 | if (result == 0) { |
541 | assert(txn != NULL); |
542 | // Not sure if this is really necessary, as |
543 | // the hot index DB should have to be checkpointed |
544 | // upon commit of the hot index transaction, but |
545 | // it is safe to do this |
546 | // this question apples to delete_committed, insert_provisional |
547 | // and insert_committed |
548 | toku_ft_maybe_delete (hotdb->i->ft_handle, hotkey, txn, false, ZERO_LSN, true); |
549 | } |
550 | } |
551 | return result; |
552 | } |
553 | |
554 | // send a delete message into the tree without rollback or recovery logging |
555 | static int |
556 | indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { |
557 | int result = 0; |
558 | // TEST |
559 | if (indexer->i->test_delete_committed) { |
560 | result = indexer->i->test_delete_committed(indexer, hotdb, hotkey, xids); |
561 | } else { |
562 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
563 | if (result == 0) { |
564 | FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle; |
565 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
566 | txn_manager_state txn_state_for_gc(txn_manager); |
567 | |
568 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
569 | txn_gc_info gc_info(&txn_state_for_gc, |
570 | oldest_referenced_xid_estimate, |
571 | oldest_referenced_xid_estimate, |
572 | true); |
573 | toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info); |
574 | toku_ft_adjust_logical_row_count(db_struct_i(hotdb)->ft_handle->ft, -1); |
575 | } |
576 | } |
577 | return result; |
578 | } |
579 | |
580 | // inject "insert" message into ft with logging in recovery and rollback logs, |
581 | // and making association between txn and ft |
582 | static int |
583 | indexer_ft_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids, TOKUTXN txn) { |
584 | int result = 0; |
585 | // TEST |
586 | if (indexer->i->test_insert_provisional) { |
587 | result = indexer->i->test_insert_provisional(indexer, hotdb, hotkey, hotval, xids); |
588 | } else { |
589 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
590 | if (result == 0) { |
591 | assert(txn != NULL); |
592 | // comment/question in indexer_ft_delete_provisional applies |
593 | toku_ft_maybe_insert (hotdb->i->ft_handle, hotkey, hotval, txn, false, ZERO_LSN, true, FT_INSERT); |
594 | } |
595 | } |
596 | return result; |
597 | } |
598 | |
599 | // send an insert message into the tree without rollback or recovery logging |
600 | // and without associating the txn and the ft |
601 | static int |
602 | indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids) { |
603 | int result = 0; |
604 | // TEST |
605 | if (indexer->i->test_insert_committed) { |
606 | result = indexer->i->test_insert_committed(indexer, hotdb, hotkey, hotval, xids); |
607 | } else { |
608 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
609 | if (result == 0) { |
610 | FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle; |
611 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
612 | txn_manager_state txn_state_for_gc(txn_manager); |
613 | |
614 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
615 | txn_gc_info gc_info(&txn_state_for_gc, |
616 | oldest_referenced_xid_estimate, |
617 | oldest_referenced_xid_estimate, |
618 | true); |
619 | toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT, &gc_info); |
620 | toku_ft_adjust_logical_row_count(db_struct_i(hotdb)->ft_handle->ft, 1); |
621 | } |
622 | } |
623 | return result; |
624 | } |
625 | |
626 | // send a commit message into the tree |
627 | // Note: If the xid is zero, then the leafentry will already have a committed transaction |
628 | // record and no commit message is needed. (A commit message with xid of zero is |
629 | // illegal anyway.) |
630 | static int |
631 | indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { |
632 | int result = 0; |
633 | if (toku_xids_get_num_xids(xids) > 0) {// send commit only when not the root xid |
634 | // TEST |
635 | if (indexer->i->test_commit_any) { |
636 | result = indexer->i->test_commit_any(indexer, hotdb, hotkey, xids); |
637 | } else { |
638 | result = toku_ydb_check_avail_fs_space(indexer->i->env); |
639 | if (result == 0) { |
640 | FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle; |
641 | TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h); |
642 | txn_manager_state txn_state_for_gc(txn_manager); |
643 | |
644 | TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h); |
645 | txn_gc_info gc_info(&txn_state_for_gc, |
646 | oldest_referenced_xid_estimate, |
647 | oldest_referenced_xid_estimate, |
648 | true); |
649 | toku_ft_send_commit_any(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info); |
650 | } |
651 | } |
652 | } |
653 | return result; |
654 | } |
655 | |