1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39/*
40 * The indexer
41 */
42#include <stdio.h>
43#include <string.h>
44#include <toku_portability.h>
45#include "toku_assert.h"
46#include "ydb-internal.h"
47#include <ft/le-cursor.h>
48#include "indexer.h"
49#include <ft/ft-ops.h>
50#include <ft/leafentry.h>
51#include <ft/ule.h>
52#include <ft/txn/xids.h>
53#include <ft/logger/log-internal.h>
54#include <ft/cachetable/checkpoint.h>
55#include <portability/toku_atomic.h>
56#include "loader.h"
57#include <util/status.h>
58
59///////////////////////////////////////////////////////////////////////////////////
60// Engine status
61//
62// Status is intended for display to humans to help understand system behavior.
63// It does not need to be perfectly thread-safe.
64
// File-wide status table for the indexer. toku_indexer_get_status() copies
// it out to callers; the counters are updated through STATUS_VALUE().
static INDEXER_STATUS_S indexer_status;

// Initialize one row of indexer_status through TOKUFT_STATUS_INIT, prefixing
// the legend text with "indexer: ".
#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(indexer_status, k, c, t, "indexer: " l, inc)

// Set up the descriptive part of every status row and mark the table as
// initialized. Called lazily from toku_indexer_get_status().
static void
status_init(void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(INDEXER_CREATE, nullptr, UINT64, "number of indexers successfully created", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_CREATE_FAIL, nullptr, UINT64, "number of calls to toku_indexer_create_indexer() that failed", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_BUILD, nullptr, UINT64, "number of calls to indexer->build() succeeded", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_BUILD_FAIL, nullptr, UINT64, "number of calls to indexer->build() failed", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_CLOSE, nullptr, UINT64, "number of calls to indexer->close() that succeeded", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_CLOSE_FAIL, nullptr, UINT64, "number of calls to indexer->close() that failed", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_ABORT, nullptr, UINT64, "number of calls to indexer->abort()", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_CURRENT, nullptr, UINT64, "number of indexers currently in existence", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_MAX, nullptr, UINT64, "max number of indexers that ever existed simultaneously", TOKU_ENGINE_STATUS);
    // remember that the descriptive fields are filled in so this runs only once
    indexer_status.initialized = true;
}
#undef STATUS_INIT
85
86void
87toku_indexer_get_status(INDEXER_STATUS statp) {
88 if (!indexer_status.initialized)
89 status_init();
90 *statp = indexer_status;
91}
92
// Shorthand for the numeric value slot of status row x in indexer_status.
#define STATUS_VALUE(x) indexer_status.status[x].value.num
94
95#include "indexer-internal.h"
96
97static int build_index(DB_INDEXER *indexer);
98static int close_indexer(DB_INDEXER *indexer);
99static int abort_indexer(DB_INDEXER *indexer);
100static void free_indexer_resources(DB_INDEXER *indexer);
101static void free_indexer(DB_INDEXER *indexer);
102static int update_estimated_rows(DB_INDEXER *indexer);
103static int maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count);
104
105static int
106associate_indexer_with_hot_dbs(DB_INDEXER *indexer, DB *dest_dbs[], int N) {
107 int result =0;
108 for (int i = 0; i < N; i++) {
109 result = toku_db_set_indexer(dest_dbs[i], indexer);
110 if (result != 0) {
111 for (int j = 0; j < i; j++) {
112 int result2 = toku_db_set_indexer(dest_dbs[j], NULL);
113 lazy_assert(result2 == 0);
114 }
115 break;
116 }
117 }
118 return result;
119}
120
121static void
122disassociate_indexer_from_hot_dbs(DB_INDEXER *indexer) {
123 for (int i = 0; i < indexer->i->N; i++) {
124 int result = toku_db_set_indexer(indexer->i->dest_dbs[i], NULL);
125 lazy_assert(result == 0);
126 }
127}
128
129/*
130 * free_indexer_resources() frees all of the resources associated with
131 * struct __toku_indexer_internal
132 * assumes any previously freed items set the field pointer to NULL
133 */
134
135static void
136free_indexer_resources(DB_INDEXER *indexer) {
137 if ( indexer->i ) {
138 toku_mutex_destroy(&indexer->i->indexer_lock);
139 toku_mutex_destroy(&indexer->i->indexer_estimate_lock);
140 toku_destroy_dbt(&indexer->i->position_estimate);
141 if ( indexer->i->lec ) {
142 toku_le_cursor_close(indexer->i->lec);
143 }
144 if ( indexer->i->fnums ) {
145 toku_free(indexer->i->fnums);
146 indexer->i->fnums = NULL;
147 }
148 indexer_undo_do_destroy(indexer);
149 // indexer->i
150 toku_free(indexer->i);
151 indexer->i = NULL;
152 }
153}
154
155static void
156free_indexer(DB_INDEXER *indexer) {
157 if ( indexer ) {
158 free_indexer_resources(indexer);
159 toku_free(indexer);
160 indexer = NULL;
161 }
162}
163
164void
165toku_indexer_lock(DB_INDEXER* indexer) {
166 toku_mutex_lock(&indexer->i->indexer_lock);
167}
168
169void
170toku_indexer_unlock(DB_INDEXER* indexer) {
171 toku_mutex_unlock(&indexer->i->indexer_lock);
172}
173
174// a shortcut call
175//
176// a cheap(er) call to see if a key must be inserted
177// into the DB. If true, then we know we have to insert.
178// If false, then we don't know, and have to check again
179// after grabbing the indexer lock
180bool
181toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) {
182 bool may_insert = false;
183 toku_mutex_lock(&indexer->i->indexer_estimate_lock);
184
185 // if we have no position estimate, we can't tell, so return false
186 if (indexer->i->position_estimate.data == nullptr) {
187 may_insert = false;
188 } else {
189 DB *db = indexer->i->src_db;
190 const toku::comparator &cmp = toku_ft_get_comparator(db->i->ft_handle);
191 int c = cmp(&indexer->i->position_estimate, key);
192
193 // if key > position_estimate, then we know the indexer cursor
194 // is past key, and we can safely say that associated values of
195 // key must be inserted into the indexer's db
196 may_insert = c < 0;
197 }
198
199 toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
200 return may_insert;
201}
202
203void
204toku_indexer_update_estimate(DB_INDEXER* indexer) {
205 toku_mutex_lock(&indexer->i->indexer_estimate_lock);
206 toku_le_cursor_update_estimate(indexer->i->lec, &indexer->i->position_estimate);
207 toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
208}
209
210// forward declare the test-only wrapper function for undo-do
211static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule);
212
// Create a hot indexer that builds the N destination dbs in dest_dbs from
// src_db inside transaction txn.
//
// On success returns 0 and stores the new indexer in *indexerp. On failure
// returns a non-zero error code, frees whatever was built so far and leaves
// *indexerp set to NULL. The INDEXER_CREATE / INDEXER_CREATE_FAIL (and
// CURRENT / MAX) status counters are updated accordingly.
//
// The indexer keeps the caller's dest_dbs pointer (the array is not copied).
// db_flags is unused.
// NOTE(review): HANDLE_READ_ONLY_TXN is defined elsewhere; presumably it
// returns early for read-only transactions - confirm.
int
toku_indexer_create_indexer(DB_ENV *env,
                            DB_TXN *txn,
                            DB_INDEXER **indexerp,
                            DB *src_db,
                            int N,
                            DB *dest_dbs[/*N*/],
                            uint32_t db_flags[/*N*/] UU(),
                            uint32_t indexer_flags)
{
    int rval;
    DB_INDEXER *indexer = 0; // set later when created
    HANDLE_READ_ONLY_TXN(txn);

    *indexerp = NULL;

    XCALLOC(indexer); // init to all zeroes (thus initializing the error_callback and poll_func)
    if ( !indexer ) { rval = ENOMEM; goto create_exit; }
    XCALLOC(indexer->i); // init to all zeroes (thus initializing all pointers to NULL)
    if ( !indexer->i ) { rval = ENOMEM; goto create_exit; }

    indexer->i->env = env;
    indexer->i->txn = txn;
    indexer->i->src_db = src_db;
    indexer->i->N = N;
    indexer->i->dest_dbs = dest_dbs;
    indexer->i->indexer_flags = indexer_flags;
    indexer->i->loop_mod = 1000; // call poll_func every 1000 rows
    indexer->i->estimated_rows = 0;
    indexer->i->undo_do = test_indexer_undo_do; // TEST export the undo do function

    // record the cachefile filenum of every destination db; the same array
    // is exposed through indexer->i->filenums
    XCALLOC_N(N, indexer->i->fnums);
    if ( !indexer->i->fnums ) { rval = ENOMEM; goto create_exit; }
    for(int i=0;i<indexer->i->N;i++) {
        indexer->i->fnums[i] = toku_cachefile_filenum(db_struct_i(dest_dbs[i])->ft_handle->ft->cf);
    }
    indexer->i->filenums.num = N;
    indexer->i->filenums.filenums = indexer->i->fnums;
    indexer->i->test_only_flags = 0; // for test use only

    // public method table
    indexer->set_error_callback = toku_indexer_set_error_callback;
    indexer->set_poll_function = toku_indexer_set_poll_function;
    indexer->build = build_index;
    indexer->close = close_indexer;
    indexer->abort = abort_indexer;

    toku_mutex_init(
        *indexer_i_indexer_lock_mutex_key, &indexer->i->indexer_lock, nullptr);
    toku_mutex_init(*indexer_i_indexer_estimate_lock_mutex_key,
                    &indexer->i->indexer_estimate_lock,
                    nullptr);
    toku_init_dbt(&indexer->i->position_estimate);

    //
    // create and close a dummy loader to get redirection going for the hot
    // indexer
    // This way, if the hot index aborts, but other transactions have references
    // to the
    // underlying FT, then those transactions can do dummy operations on the FT
    // while the DB gets redirected back to an empty dictionary
    //
    {
        DB_LOADER* loader = NULL;
        rval = toku_loader_create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_DISALLOW_PUTS, true);
        if (rval) {
            goto create_exit;
        }
        rval = loader->close(loader);
        if (rval) {
            goto create_exit;
        }
    }

    // create and initialize the leafentry cursor
    // NOTE(review): failure is detected only through lec staying NULL, not
    // through rval; this assumes toku_le_cursor_create() never returns 0
    // with a NULL cursor - confirm.
    rval = toku_le_cursor_create(&indexer->i->lec, db_struct_i(src_db)->ft_handle, db_txn_struct_i(txn)->tokutxn);
    if ( !indexer->i->lec ) { goto create_exit; }

    // 2954: add recovery and rollback entries
    // (logged while holding the multi operation client lock)
    LSN hot_index_lsn; // not used (yet)
    TOKUTXN ttxn;
    ttxn = db_txn_struct_i(txn)->tokutxn;
    FILENUMS filenums;
    filenums = indexer->i->filenums;
    toku_multi_operation_client_lock();
    toku_ft_hot_index(NULL, ttxn, filenums, 1, &hot_index_lsn);
    toku_multi_operation_client_unlock();

    if (rval == 0) {
        rval = associate_indexer_with_hot_dbs(indexer, dest_dbs, N);
    }
create_exit:
    if ( rval == 0 ) {

        // undo-do state is only set up once everything else succeeded
        indexer_undo_do_init(indexer);

        *indexerp = indexer;

        (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CREATE), 1);
        (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CURRENT), 1);
        if ( STATUS_VALUE(INDEXER_CURRENT) > STATUS_VALUE(INDEXER_MAX) )
            STATUS_VALUE(INDEXER_MAX) = STATUS_VALUE(INDEXER_CURRENT); // NOT WORTH A LOCK TO MAKE THREADSAFE), may be inaccurate

    } else {
        // free_indexer() tolerates a NULL or partially constructed indexer
        (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CREATE_FAIL), 1);
        free_indexer(indexer);
    }

    return rval;
}
322
323int
324toku_indexer_set_poll_function(DB_INDEXER *indexer,
325 int (*poll_func)(void *poll_extra,
326 float progress),
327 void *poll_extra)
328{
329 invariant(indexer != NULL);
330 indexer->i->poll_func = poll_func;
331 indexer->i->poll_extra = poll_extra;
332 return 0;
333}
334
335int
336toku_indexer_set_error_callback(DB_INDEXER *indexer,
337 void (*error_cb)(DB *db, int i, int err,
338 DBT *key, DBT *val,
339 void *error_extra),
340 void *error_extra)
341{
342 invariant(indexer != NULL);
343 indexer->i->error_callback = error_cb;
344 indexer->i->error_extra = error_extra;
345 return 0;
346}
347
348// a key is to the right of the indexer's cursor if it compares
349// greater than the current le cursor position.
350bool
351toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key) {
352 // the hot indexer runs from the end to the beginning, it gets the largest keys first
353 //
354 // if key is less than indexer's position, then we should NOT insert it because
355 // the indexer will get to it. If it is greater or equal, that means the indexer
356 // has already processed the key, and will not get to it, therefore, we need
357 // to handle it
358 return toku_le_cursor_is_key_greater_or_equal(indexer->i->lec, key);
359}
360
361// initialize provisional info by allocating enough space to hold provisional
362// ids, states, and txns for each of the provisional entries in the ule. the
363// ule and le remain owned by the caller, not this struct.
364static void
365ule_prov_info_init(struct ule_prov_info *prov_info, const void* key, uint32_t keylen, LEAFENTRY le, ULEHANDLE ule) {
366 prov_info->le = le;
367 prov_info->ule = ule;
368 prov_info->keylen = keylen;
369 prov_info->key = toku_xmalloc(keylen);
370 memcpy(prov_info->key, key, keylen);
371 prov_info->num_provisional = ule_get_num_provisional(ule);
372 prov_info->num_committed = ule_get_num_committed(ule);
373 uint32_t n = prov_info->num_provisional;
374 if (n > 0) {
375 XMALLOC_N(n, prov_info->prov_ids);
376 XMALLOC_N(n, prov_info->prov_states);
377 XMALLOC_N(n, prov_info->prov_txns);
378 }
379}
380
381// clean up anything possibly created by ule_prov_info_init()
382static void
383ule_prov_info_destroy(struct ule_prov_info *prov_info) {
384 if (prov_info->num_provisional > 0) {
385 toku_free(prov_info->prov_ids);
386 toku_free(prov_info->prov_states);
387 toku_free(prov_info->prov_txns);
388 } else {
389 // nothing to free if there was nothing provisional
390 invariant(prov_info->prov_ids == NULL);
391 invariant(prov_info->prov_states == NULL);
392 invariant(prov_info->prov_txns == NULL);
393 }
394}
395
396static void
397indexer_fill_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) {
398 ULEHANDLE ule = prov_info->ule;
399 uint32_t num_provisional = prov_info->num_provisional;
400 uint32_t num_committed = prov_info->num_committed;
401 TXNID *prov_ids = prov_info->prov_ids;
402 TOKUTXN_STATE *prov_states = prov_info->prov_states;
403 TOKUTXN *prov_txns = prov_info->prov_txns;
404
405 // don't both grabbing the txn manager lock if we don't
406 // have any provisional txns to record
407 if (num_provisional == 0) {
408 return;
409 }
410
411 // handle test case first
412 if (indexer->i->test_xid_state) {
413 for (uint32_t i = 0; i < num_provisional; i++) {
414 UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i);
415 prov_ids[i] = uxr_get_txnid(uxr);
416 prov_states[i] = indexer->i->test_xid_state(indexer, prov_ids[i]);
417 prov_txns[i] = NULL;
418 }
419 return;
420 }
421
422 // hold the txn manager lock while we inspect txn state
423 // and pin some live txns
424 DB_ENV *env = indexer->i->env;
425 TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
426 TXNID parent_xid = uxr_get_txnid(ule_get_uxr(ule, num_committed));
427
428 // let's first initialize things to defaults
429 for (uint32_t i = 0; i < num_provisional; i++) {
430 UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i);
431 prov_ids[i] = uxr_get_txnid(uxr);
432 prov_txns[i] = NULL;
433 prov_states[i] = TOKUTXN_RETIRED;
434 }
435
436 toku_txn_manager_suspend(txn_manager);
437 TXNID_PAIR root_xid_pair = {.parent_id64=parent_xid, .child_id64 = TXNID_NONE};
438 TOKUTXN root_txn = NULL;
439 toku_txn_manager_id2txn_unlocked(
440 txn_manager,
441 root_xid_pair,
442 &root_txn
443 );
444 if (root_txn == NULL) {
445 toku_txn_manager_resume(txn_manager);
446 return; //everything is retired in this case, the default
447 }
448 prov_txns[0] = root_txn;
449 prov_states[0] = toku_txn_get_state(root_txn);
450 toku_txn_lock_state(root_txn);
451 prov_states[0] = toku_txn_get_state(root_txn);
452 if (prov_states[0] == TOKUTXN_LIVE || prov_states[0] == TOKUTXN_PREPARING) {
453 // pin this live txn so it can't commit or abort until we're done with it
454 toku_txn_pin_live_txn_unlocked(root_txn);
455 }
456 toku_txn_unlock_state(root_txn);
457
458 root_txn->child_manager->suspend();
459 for (uint32_t i = 1; i < num_provisional; i++) {
460 UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i);
461 TXNID child_id = uxr_get_txnid(uxr);
462 TOKUTXN txn = NULL;
463
464 TXNID_PAIR txnid_pair = {.parent_id64 = parent_xid, .child_id64 = child_id};
465 root_txn->child_manager->find_tokutxn_by_xid_unlocked(txnid_pair, &txn);
466 prov_txns[i] = txn;
467 if (txn) {
468 toku_txn_lock_state(txn);
469 prov_states[i] = toku_txn_get_state(txn);
470 if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) {
471 // pin this live txn so it can't commit or abort until we're done with it
472 toku_txn_pin_live_txn_unlocked(txn);
473 }
474 toku_txn_unlock_state(txn);
475 }
476 else {
477 prov_states[i] = TOKUTXN_RETIRED;
478 }
479 }
480 root_txn->child_manager->resume();
481 toku_txn_manager_resume(txn_manager);
482}
483
// Context handed to le_cursor_callback() through its void *extra argument.
// Field order matters: get_next_ule_with_prov_info() fills it with
// designated initializers.
struct le_cursor_extra {
    DB_INDEXER *indexer;             // indexer the provisional info is gathered for
    struct ule_prov_info *prov_info; // output: initialized and filled by the callback
};
488
489// cursor callback, so its synchronized with other db operations using
490// cachetable pair locks. because no txn can commit on this db, read
491// the provisional info for the newly read ule.
492static int
493le_cursor_callback(uint32_t keylen, const void *key, uint32_t UU(vallen), const void *val, void *extra, bool lock_only) {
494 if (lock_only || val == NULL) {
495 ; // do nothing if only locking. do nothing if val==NULL, means DB_NOTFOUND
496 } else {
497 struct le_cursor_extra *CAST_FROM_VOIDP(cursor_extra, extra);
498 struct ule_prov_info *prov_info = cursor_extra->prov_info;
499 // the val here is a leafentry. ule_create does not copy the entire
500 // contents of the leafentry it is given into its own buffers, so we
501 // must allocate space for a leafentry and keep it around with the ule.
502 LEAFENTRY CAST_FROM_VOIDP(le, toku_xmemdup(val, vallen));
503 ULEHANDLE ule = toku_ule_create(le);
504 invariant(ule);
505 // when we initialize prov info, we also pass in the leafentry and ule
506 // pointers so the caller can access them later. it's their job to free
507 // them when they're not needed.
508 ule_prov_info_init(prov_info, key, keylen, le, ule);
509 indexer_fill_prov_info(cursor_extra->indexer, prov_info);
510 }
511 return 0;
512}
513
514// get the next ule and fill out its provisional info in the
515// prov_info struct provided. caller is responsible for cleaning
516// up the ule info after it's done.
517static int
518get_next_ule_with_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) {
519 struct le_cursor_extra extra = {
520 .indexer = indexer,
521 .prov_info = prov_info,
522 };
523 int r = toku_le_cursor_next(indexer->i->lec, le_cursor_callback, &extra);
524 return r;
525}
526
// Build the hot indexes: walk the source db with the le cursor and run the
// undo-do algorithm for every destination db on each entry.
//
// Each iteration runs under the indexer lock and the multi operation client
// lock. The loop ends when the cursor reports an error (DB_NOTFOUND is the
// normal end and is turned into success), when undo-do fails for some db, or
// when the poll function returns non-zero. On success a checkpoint is taken
// before returning. Returns 0 on success, otherwise the first error seen;
// the INDEXER_BUILD / INDEXER_BUILD_FAIL counters are updated accordingly.
static int
build_index(DB_INDEXER *indexer) {
    int result = 0;

    bool done = false;
    for (uint64_t loop_count = 0; !done; loop_count++) {

        toku_indexer_lock(indexer);
        // grab the multi operation lock because we will be injecting messages
        // grab it here because we must hold it before
        // trying to pin any live transactions, as discovered by #5775
        toku_multi_operation_client_lock();

        // grab the next leaf entry and get its provisional info. we'll
        // need the provisional info for the undo-do algorithm, and we get
        // it here so it can be read atomically with respect to txn commit
        // and abort. the atomicity comes from the root-to-leaf path pinned
        // by the query and in the getf callback function
        //
        // this allocates space for the prov info, so we have to destroy it
        // when we're done.
        struct ule_prov_info prov_info;
        memset(&prov_info, 0, sizeof(prov_info));
        result = get_next_ule_with_prov_info(indexer, &prov_info);

        if (result != 0) {
            // the cursor produced nothing, so there is no ule to clean up
            invariant(prov_info.ule == NULL);
            done = true;
            if (result == DB_NOTFOUND) {
                result = 0; // all done, normal way to exit loop successfully
            }
        }
        else {
            invariant(prov_info.le);
            invariant(prov_info.ule);
            // stop at the first destination db whose undo-do fails
            for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) {
                DB *db = indexer->i->dest_dbs[which_db];
                DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db];
                DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db];
                result = indexer_undo_do(indexer, db, &prov_info, hot_keys, hot_vals);
                if ((result != 0) && (indexer->i->error_callback != NULL)) {
                    // grab the key and call the error callback
                    // (the callback runs while both locks are still held)
                    DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC);
                    toku_dbt_set(prov_info.keylen, prov_info.key, &key, NULL);
                    indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra);
                    toku_destroy_dbt(&key);
                }
            }
            // the leafentry and ule are not owned by the prov_info,
            // and are still our responsibility to free
            toku_free(prov_info.le);
            toku_free(prov_info.key);
            toku_ule_free(prov_info.ule);
        }

        toku_multi_operation_client_unlock();
        toku_indexer_unlock(indexer);
        // frees the provisional arrays (if any) after the locks are dropped
        ule_prov_info_destroy(&prov_info);

        // the poll function is consulted outside the locks; a non-zero
        // return ends the build with that value
        if (result == 0) {
            result = maybe_call_poll_func(indexer, loop_count);
        }
        if (result != 0) {
            done = true;
        }
    }

    // post index creation cleanup
    //  - optimize?
    //  - garbage collect?
    //  - unique checks?

    if ( result == 0 ) {
        // Perform a checkpoint so that all of the indexing makes it to disk before continuing.
        // Otherwise indexing would not be crash-safe because none of the undo-do messages are in the recovery log.
        DB_ENV *env = indexer->i->env;
        CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
        toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, INDEXER_CHECKPOINT);
        (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD), 1);
    } else {
        (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1);
    }

    return result;
}
612
613// Clients must not operate on any of the hot dbs concurrently with close
614static int
615close_indexer(DB_INDEXER *indexer) {
616 int r = 0;
617 (void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
618
619 // Disassociate the indexer from the hot db and free_indexer
620 disassociate_indexer_from_hot_dbs(indexer);
621 free_indexer(indexer);
622
623 if ( r == 0 ) {
624 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CLOSE), 1);
625 } else {
626 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CLOSE_FAIL), 1);
627 }
628 return r;
629}
630
631// Clients must not operate on any of the hot dbs concurrently with abort
632static int
633abort_indexer(DB_INDEXER *indexer) {
634 (void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
635 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_ABORT), 1);
636 // Disassociate the indexer from the hot db and free_indexer
637 disassociate_indexer_from_hot_dbs(indexer);
638 free_indexer(indexer);
639 return 0;
640}
641
642
643// derived from the handlerton's estimate_num_rows()
644static int
645update_estimated_rows(DB_INDEXER *indexer) {
646 int error;
647 DB_TXN *txn = NULL;
648 DB_ENV *db_env = indexer->i->env;
649 error = db_env->txn_begin(db_env, 0, &txn, DB_READ_UNCOMMITTED);
650 if (error == 0) {
651 DB_BTREE_STAT64 stats;
652 DB *db = indexer->i->src_db;
653 error = db->stat64(db, txn, &stats);
654 if (error == 0) {
655 indexer->i->estimated_rows = stats.bt_ndata;
656 }
657 txn->commit(txn, 0);
658 }
659 return error;
660}
661
662static int
663maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count) {
664 int result = 0;
665 if ( indexer->i->poll_func != NULL && ( loop_count % indexer->i->loop_mod ) == 0 ) {
666 int r __attribute__((unused)) = update_estimated_rows(indexer);
667 // what happens if estimate_rows fails?
668 // - currently does not modify estimate, which is probably sufficient
669 float progress;
670 if ( indexer->i->estimated_rows == 0 || loop_count > indexer->i->estimated_rows)
671 progress = 1.0;
672 else
673 progress = (float)loop_count / (float)indexer->i->estimated_rows;
674 result = indexer->i->poll_func(indexer->i->poll_extra, progress);
675 }
676 return result;
677}
678
679
680// this allows us to force errors under test. Flags are defined in indexer.h
681void
682toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) {
683 invariant(indexer != NULL);
684 indexer->i->test_only_flags = flags;
685}
686
687// this allows us to call the undo do function in tests using
688// a convenience wrapper that gets and destroys the ule's prov info
689static int
690test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule) {
691 int which_db;
692 for (which_db = 0; which_db < indexer->i->N; which_db++) {
693 if (indexer->i->dest_dbs[which_db] == hotdb) {
694 break;
695 }
696 }
697 if (which_db == indexer->i->N) {
698 return EINVAL;
699 }
700 struct ule_prov_info prov_info;
701 memset(&prov_info, 0, sizeof(prov_info));
702 // pass null for the leafentry - we don't need it, neither does the info
703 ule_prov_info_init(&prov_info, key->data, key->size, NULL, ule); // mallocs prov_info->key, owned by this function
704 indexer_fill_prov_info(indexer, &prov_info);
705 DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db];
706 DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db];
707 int r = indexer_undo_do(indexer, hotdb, &prov_info, hot_keys, hot_vals);
708 toku_free(prov_info.key);
709 ule_prov_info_destroy(&prov_info);
710 return r;
711}
712
713DB *
714toku_indexer_get_src_db(DB_INDEXER *indexer) {
715 return indexer->i->src_db;
716}
717
718
719#undef STATUS_VALUE
720
721