1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39/*
40 * The loader
41 */
42
43#include <toku_portability.h>
44#include <portability/toku_atomic.h>
45#include <stdio.h>
46#include <string.h>
47
48#include <ft/ft.h>
49#include <ft/loader/loader.h>
50#include <ft/cachetable/checkpoint.h>
51
52#include "ydb-internal.h"
53#include "ydb_db.h"
54#include "ydb_load.h"
55
56#include "loader.h"
57#include <util/status.h>
58
// Size, in bytes, of the buffer that holds the loader's temp file name
// template (allocated and filled in toku_loader_create_loader).
enum { MAX_FILE_SIZE = 256 };
60
61///////////////////////////////////////////////////////////////////////////////////
62// Engine status
63//
64// Status is intended for display to humans to help understand system behavior.
65// It does not need to be perfectly thread-safe.
66
67static LOADER_STATUS_S loader_status;
68
69#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(loader_status, k, c, t, "loader: " l, inc)
70
71static void
72status_init(void) {
73 // Note, this function initializes the keyname, type, and legend fields.
74 // Value fields are initialized to zero by compiler.
75 STATUS_INIT(LOADER_CREATE, LOADER_NUM_CREATED, UINT64, "number of loaders successfully created", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
76 STATUS_INIT(LOADER_CREATE_FAIL, nullptr, UINT64, "number of calls to toku_loader_create_loader() that failed", TOKU_ENGINE_STATUS);
77 STATUS_INIT(LOADER_PUT, nullptr, UINT64, "number of calls to loader->put() succeeded", TOKU_ENGINE_STATUS);
78 STATUS_INIT(LOADER_PUT_FAIL, nullptr, UINT64, "number of calls to loader->put() failed", TOKU_ENGINE_STATUS);
79 STATUS_INIT(LOADER_CLOSE, nullptr, UINT64, "number of calls to loader->close() that succeeded", TOKU_ENGINE_STATUS);
80 STATUS_INIT(LOADER_CLOSE_FAIL, nullptr, UINT64, "number of calls to loader->close() that failed", TOKU_ENGINE_STATUS);
81 STATUS_INIT(LOADER_ABORT, nullptr, UINT64, "number of calls to loader->abort()", TOKU_ENGINE_STATUS);
82 STATUS_INIT(LOADER_CURRENT, LOADER_NUM_CURRENT, UINT64, "number of loaders currently in existence", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
83 STATUS_INIT(LOADER_MAX, LOADER_NUM_MAX, UINT64, "max number of loaders that ever existed simultaneously", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
84 loader_status.initialized = true;
85}
86#undef STATUS_INIT
87
88void
89toku_loader_get_status(LOADER_STATUS statp) {
90 if (!loader_status.initialized)
91 status_init();
92 *statp = loader_status;
93}
94
95#define STATUS_VALUE(x) loader_status.status[x].value.num
96
97
98struct __toku_loader_internal {
99 DB_ENV *env;
100 DB_TXN *txn;
101 FTLOADER ft_loader;
102 int N;
103 DB **dbs; /* [N] */
104 DB *src_db;
105 uint32_t *db_flags;
106 uint32_t *dbt_flags;
107 uint32_t loader_flags;
108 void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra);
109 void *error_extra;
110 int (*poll_func)(void *poll_extra, float progress);
111 void *poll_extra;
112 char *temp_file_template;
113
114 DBT err_key; /* error key */
115 DBT err_val; /* error val */
116 int err_i; /* error i */
117 int err_errno;
118
119 char **inames_in_env; /* [N] inames of new files to be created */
120};
121
122static void free_inames(char **inames, int n) {
123 for (int i = 0; i < n; i++) {
124 toku_free(inames[i]);
125 }
126 toku_free(inames);
127}
128
129/*
130 * free_loader_resources() frees all of the resources associated with
131 * struct __toku_loader_internal
132 * assumes any previously freed items set the field pointer to NULL
133 * Requires that the ft_loader is closed or destroyed before calling this function.
134 */
135static void free_loader_resources(DB_LOADER *loader)
136{
137 if ( loader->i ) {
138 toku_destroy_dbt(&loader->i->err_key);
139 toku_destroy_dbt(&loader->i->err_val);
140
141 if (loader->i->inames_in_env) {
142 free_inames(loader->i->inames_in_env, loader->i->N);
143 loader->i->inames_in_env = nullptr;
144 }
145 toku_free(loader->i->temp_file_template);
146 loader->i->temp_file_template = nullptr;
147
148 // loader->i
149 toku_free(loader->i);
150 loader->i = nullptr;
151 }
152}
153
154static void free_loader(DB_LOADER *loader)
155{
156 if ( loader ) free_loader_resources(loader);
157 toku_free(loader);
158}
159
// Pieces of the loader temp file name: "<tmp dir>/<prefix><suffix>" is the
// template built in toku_loader_create_loader, and
// toku_loader_cleanup_temp_files recognizes leftover files by the prefix
// plus the combined length of prefix and suffix.
static const char *loader_temp_prefix = "tokuld"; // #2536
static const char *loader_temp_suffix = "XXXXXX";
162
163static int ft_loader_close_and_redirect(DB_LOADER *loader) {
164 int r;
165 // use the bulk loader
166 // in case you've been looking - here is where the real work is done!
167 r = toku_ft_loader_close(loader->i->ft_loader,
168 loader->i->error_callback, loader->i->error_extra,
169 loader->i->poll_func, loader->i->poll_extra);
170 if ( r==0 ) {
171 for (int i=0; i<loader->i->N; i++) {
172 toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect.
173 r = toku_dictionary_redirect(loader->i->inames_in_env[i],
174 loader->i->dbs[i]->i->ft_handle,
175 db_txn_struct_i(loader->i->txn)->tokutxn);
176 toku_multi_operation_client_unlock();
177 if ( r!=0 ) break;
178 }
179 }
180 return r;
181}
182
183
184// loader_flags currently has the following flags:
185// LOADER_DISALLOW_PUTS loader->put is not allowed.
186// Loader is only being used for its side effects
187// DB_PRELOCKED_WRITE Table lock is already held, no need to relock.
188int
189toku_loader_create_loader(DB_ENV *env,
190 DB_TXN *txn,
191 DB_LOADER **blp,
192 DB *src_db,
193 int N,
194 DB *dbs[],
195 uint32_t db_flags[/*N*/],
196 uint32_t dbt_flags[/*N*/],
197 uint32_t loader_flags,
198 bool check_empty) {
199 int rval;
200 HANDLE_READ_ONLY_TXN(txn);
201 DB_TXN *loader_txn = nullptr;
202
203 *blp = NULL; // set later when created
204
205 DB_LOADER *loader = NULL;
206 bool puts_allowed = !(loader_flags & LOADER_DISALLOW_PUTS);
207 bool compress_intermediates = (loader_flags & LOADER_COMPRESS_INTERMEDIATES) != 0;
208 XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
209 XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
210
211 loader->i->env = env;
212 loader->i->txn = txn;
213 loader->i->N = N;
214 loader->i->dbs = dbs;
215 loader->i->src_db = src_db;
216 loader->i->db_flags = db_flags;
217 loader->i->dbt_flags = dbt_flags;
218 loader->i->loader_flags = loader_flags;
219 loader->i->temp_file_template = (char *)toku_malloc(MAX_FILE_SIZE);
220
221 int n = snprintf(loader->i->temp_file_template, MAX_FILE_SIZE, "%s/%s%s", env->i->real_tmp_dir, loader_temp_prefix, loader_temp_suffix);
222 if ( !(n>0 && n<MAX_FILE_SIZE) ) {
223 rval = ENAMETOOLONG;
224 goto create_exit;
225 }
226
227 toku_init_dbt(&loader->i->err_key);
228 toku_init_dbt(&loader->i->err_val);
229 loader->i->err_i = 0;
230 loader->i->err_errno = 0;
231
232 loader->set_error_callback = toku_loader_set_error_callback;
233 loader->set_poll_function = toku_loader_set_poll_function;
234 loader->put = toku_loader_put;
235 loader->close = toku_loader_close;
236 loader->abort = toku_loader_abort;
237
238 // lock tables and check empty
239 for(int i=0;i<N;i++) {
240 if (!(loader_flags&DB_PRELOCKED_WRITE)) {
241 rval = toku_db_pre_acquire_table_lock(dbs[i], txn);
242 if (rval!=0) {
243 goto create_exit;
244 }
245 }
246 if (check_empty) {
247 bool empty = toku_ft_is_empty_fast(dbs[i]->i->ft_handle);
248 if (!empty) {
249 rval = ENOTEMPTY;
250 goto create_exit;
251 }
252 }
253 }
254
255 {
256 if (env->i->open_flags & DB_INIT_TXN) {
257 rval = env->txn_begin(env, txn, &loader_txn, 0);
258 if (rval) {
259 goto create_exit;
260 }
261 }
262
263 ft_compare_func compare_functions[N];
264 for (int i=0; i<N; i++) {
265 compare_functions[i] = env->i->bt_compare;
266 }
267
268 // time to open the big kahuna
269 char **XMALLOC_N(N, new_inames_in_env);
270 for (int i = 0; i < N; i++) {
271 new_inames_in_env[i] = nullptr;
272 }
273 FT_HANDLE *XMALLOC_N(N, fts);
274 for (int i=0; i<N; i++) {
275 fts[i] = dbs[i]->i->ft_handle;
276 }
277 LSN load_lsn;
278 rval = locked_load_inames(env, loader_txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed);
279 if ( rval!=0 ) {
280 free_inames(new_inames_in_env, N);
281 toku_free(fts);
282 goto create_exit;
283 }
284 TOKUTXN ttxn = loader_txn ? db_txn_struct_i(loader_txn)->tokutxn : NULL;
285 rval = toku_ft_loader_open(&loader->i->ft_loader,
286 env->i->cachetable,
287 env->i->generate_row_for_put,
288 src_db,
289 N,
290 fts, dbs,
291 (const char **)new_inames_in_env,
292 compare_functions,
293 loader->i->temp_file_template,
294 load_lsn,
295 ttxn,
296 puts_allowed,
297 env->get_loader_memory_size(env),
298 compress_intermediates,
299 puts_allowed);
300 if ( rval!=0 ) {
301 free_inames(new_inames_in_env, N);
302 toku_free(fts);
303 goto create_exit;
304 }
305
306 loader->i->inames_in_env = new_inames_in_env;
307 toku_free(fts);
308
309 if (!puts_allowed) {
310 rval = ft_loader_close_and_redirect(loader);
311 assert_zero(rval);
312 loader->i->ft_loader = NULL;
313 // close the ft_loader and skip to the redirection
314 rval = 0;
315 }
316
317 rval = loader_txn->commit(loader_txn, 0);
318 assert_zero(rval);
319 loader_txn = nullptr;
320
321 rval = 0;
322 }
323 *blp = loader;
324 create_exit:
325 if (loader_txn) {
326 int r = loader_txn->abort(loader_txn);
327 assert_zero(r);
328 loader_txn = nullptr;
329 }
330 if (rval == 0) {
331 (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE), 1);
332 (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CURRENT), 1);
333 if (STATUS_VALUE(LOADER_CURRENT) > STATUS_VALUE(LOADER_MAX) )
334 STATUS_VALUE(LOADER_MAX) = STATUS_VALUE(LOADER_CURRENT); // not worth a lock to make threadsafe, may be inaccurate
335 }
336 else {
337 (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE_FAIL), 1);
338 free_loader(loader);
339 }
340 return rval;
341}
342
343int toku_loader_set_poll_function(DB_LOADER *loader,
344 int (*poll_func)(void *extra, float progress),
345 void *poll_extra)
346{
347 invariant(loader != NULL);
348 loader->i->poll_func = poll_func;
349 loader->i->poll_extra = poll_extra;
350 return 0;
351}
352
353int toku_loader_set_error_callback(DB_LOADER *loader,
354 void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra),
355 void *error_extra)
356{
357 invariant(loader != NULL);
358 loader->i->error_callback = error_cb;
359 loader->i->error_extra = error_extra;
360 return 0;
361}
362
363int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
364{
365 int r = 0;
366 int i = 0;
367 // err_i is unused now( always 0). How would we know which dictionary
368 // the error happens in? (put_multiple and toku_ft_loader_put do NOT report
369 // which dictionary).
370
371 // skip put if error already found
372 if ( loader->i->err_errno != 0 ) {
373 r = -1;
374 goto cleanup;
375 }
376
377 if (loader->i->loader_flags & LOADER_DISALLOW_PUTS) {
378 r = EINVAL;
379 goto cleanup;
380 }
381 else {
382 // calling toku_ft_loader_put without a lock assumes that the
383 // handlerton is guaranteeing single access to the loader
384 // future multi-threaded solutions may need to protect this call
385 r = toku_ft_loader_put(loader->i->ft_loader, key, val);
386 }
387 if ( r != 0 ) {
388 // spec says errors all happen on close
389 // - have to save key, val, errno (r) and i for duplicate callback
390 toku_clone_dbt(&loader->i->err_key, *key);
391 toku_clone_dbt(&loader->i->err_val, *val);
392
393 loader->i->err_i = i;
394 loader->i->err_errno = r;
395
396 // deliberately return content free value
397 // - must call error_callback to get error info
398 r = -1;
399 }
400 cleanup:
401 if (r==0)
402 STATUS_VALUE(LOADER_PUT)++; // executed too often to be worth making threadsafe
403 else
404 STATUS_VALUE(LOADER_PUT_FAIL)++;
405 return r;
406}
407
408static void redirect_loader_to_empty_dictionaries(DB_LOADER *loader) {
409 DB_LOADER* tmp_loader = NULL;
410 int r = toku_loader_create_loader(
411 loader->i->env,
412 loader->i->txn,
413 &tmp_loader,
414 loader->i->src_db,
415 loader->i->N,
416 loader->i->dbs,
417 loader->i->db_flags,
418 loader->i->dbt_flags,
419 LOADER_DISALLOW_PUTS,
420 false
421 );
422 lazy_assert_zero(r);
423 r = toku_loader_close(tmp_loader);
424}
425
426int toku_loader_close(DB_LOADER *loader)
427{
428 (void) toku_sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1);
429 int r=0;
430 if ( loader->i->err_errno != 0 ) {
431 if ( loader->i->error_callback != NULL ) {
432 loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
433 }
434 if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
435 r = toku_ft_loader_abort(loader->i->ft_loader, true);
436 redirect_loader_to_empty_dictionaries(loader);
437 }
438 else {
439 r = loader->i->err_errno;
440 }
441 }
442 else { // no error outstanding
443 if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
444 r = ft_loader_close_and_redirect(loader);
445 if (r) {
446 redirect_loader_to_empty_dictionaries(loader);
447 }
448 }
449 }
450 free_loader(loader);
451 if (r==0)
452 (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE), 1);
453 else
454 (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE_FAIL), 1);
455 return r;
456}
457
458int toku_loader_abort(DB_LOADER *loader)
459{
460 (void) toku_sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1);
461 (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_ABORT), 1);
462 int r=0;
463 if ( loader->i->err_errno != 0 ) {
464 if ( loader->i->error_callback != NULL ) {
465 loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
466 }
467 }
468
469 if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS) ) {
470 r = toku_ft_loader_abort(loader->i->ft_loader, true);
471 lazy_assert_zero(r);
472 }
473
474 redirect_loader_to_empty_dictionaries(loader);
475 free_loader(loader);
476 return r;
477}
478
479
480// find all of the files in the environments home directory that match the loader temp name and remove them
481int toku_loader_cleanup_temp_files(DB_ENV *env) {
482 int result;
483 struct dirent *de;
484 char * dir = env->i->real_tmp_dir;
485 DIR *d = opendir(dir);
486 if (d==0) {
487 result = get_error_errno(); goto exit;
488 }
489
490 result = 0;
491 while ((de = readdir(d))) {
492 int r = memcmp(de->d_name, loader_temp_prefix, strlen(loader_temp_prefix));
493 if (r == 0 && strlen(de->d_name) == strlen(loader_temp_prefix) + strlen(loader_temp_suffix)) {
494 int fnamelen = strlen(dir) + 1 + strlen(de->d_name) + 1; // One for the slash and one for the trailing NUL.
495 char fname[fnamelen];
496 int l = snprintf(fname, fnamelen, "%s/%s", dir, de->d_name);
497 assert(l+1 == fnamelen);
498 r = unlink(fname);
499 if (r!=0) {
500 result = get_error_errno();
501 perror("Trying to delete a rolltmp file");
502 }
503 }
504 }
505 {
506 int r = closedir(d);
507 if (r == -1)
508 result = get_error_errno();
509 }
510
511exit:
512 return result;
513}
514
515
516
517#undef STATUS_VALUE
518
519