1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | /* |
40 | * The loader |
41 | */ |
42 | |
43 | #include <toku_portability.h> |
44 | #include <portability/toku_atomic.h> |
45 | #include <stdio.h> |
46 | #include <string.h> |
47 | |
48 | #include <ft/ft.h> |
49 | #include <ft/loader/loader.h> |
50 | #include <ft/cachetable/checkpoint.h> |
51 | |
52 | #include "ydb-internal.h" |
53 | #include "ydb_db.h" |
54 | #include "ydb_load.h" |
55 | |
56 | #include "loader.h" |
57 | #include <util/status.h> |
58 | |
// Capacity in bytes (terminating NUL included) of the loader's
// temp-file template buffer; see toku_loader_create_loader.
enum { MAX_FILE_SIZE = 256 };
60 | |
61 | /////////////////////////////////////////////////////////////////////////////////// |
62 | // Engine status |
63 | // |
64 | // Status is intended for display to humans to help understand system behavior. |
65 | // It does not need to be perfectly thread-safe. |
66 | |
67 | static LOADER_STATUS_S loader_status; |
68 | |
69 | #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(loader_status, k, c, t, "loader: " l, inc) |
70 | |
71 | static void |
72 | status_init(void) { |
73 | // Note, this function initializes the keyname, type, and legend fields. |
74 | // Value fields are initialized to zero by compiler. |
75 | STATUS_INIT(LOADER_CREATE, LOADER_NUM_CREATED, UINT64, "number of loaders successfully created" , TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); |
76 | STATUS_INIT(LOADER_CREATE_FAIL, nullptr, UINT64, "number of calls to toku_loader_create_loader() that failed" , TOKU_ENGINE_STATUS); |
77 | STATUS_INIT(LOADER_PUT, nullptr, UINT64, "number of calls to loader->put() succeeded" , TOKU_ENGINE_STATUS); |
78 | STATUS_INIT(LOADER_PUT_FAIL, nullptr, UINT64, "number of calls to loader->put() failed" , TOKU_ENGINE_STATUS); |
79 | STATUS_INIT(LOADER_CLOSE, nullptr, UINT64, "number of calls to loader->close() that succeeded" , TOKU_ENGINE_STATUS); |
80 | STATUS_INIT(LOADER_CLOSE_FAIL, nullptr, UINT64, "number of calls to loader->close() that failed" , TOKU_ENGINE_STATUS); |
81 | STATUS_INIT(LOADER_ABORT, nullptr, UINT64, "number of calls to loader->abort()" , TOKU_ENGINE_STATUS); |
82 | STATUS_INIT(LOADER_CURRENT, LOADER_NUM_CURRENT, UINT64, "number of loaders currently in existence" , TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); |
83 | STATUS_INIT(LOADER_MAX, LOADER_NUM_MAX, UINT64, "max number of loaders that ever existed simultaneously" , TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); |
84 | loader_status.initialized = true; |
85 | } |
86 | #undef STATUS_INIT |
87 | |
88 | void |
89 | toku_loader_get_status(LOADER_STATUS statp) { |
90 | if (!loader_status.initialized) |
91 | status_init(); |
92 | *statp = loader_status; |
93 | } |
94 | |
95 | #define STATUS_VALUE(x) loader_status.status[x].value.num |
96 | |
97 | |
98 | struct __toku_loader_internal { |
99 | DB_ENV *env; |
100 | DB_TXN *txn; |
101 | FTLOADER ft_loader; |
102 | int N; |
103 | DB **dbs; /* [N] */ |
104 | DB *src_db; |
105 | uint32_t *db_flags; |
106 | uint32_t *dbt_flags; |
107 | uint32_t loader_flags; |
108 | void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *); |
109 | void *; |
110 | int (*poll_func)(void *, float progress); |
111 | void *; |
112 | char *temp_file_template; |
113 | |
114 | DBT err_key; /* error key */ |
115 | DBT err_val; /* error val */ |
116 | int err_i; /* error i */ |
117 | int err_errno; |
118 | |
119 | char **inames_in_env; /* [N] inames of new files to be created */ |
120 | }; |
121 | |
122 | static void free_inames(char **inames, int n) { |
123 | for (int i = 0; i < n; i++) { |
124 | toku_free(inames[i]); |
125 | } |
126 | toku_free(inames); |
127 | } |
128 | |
129 | /* |
130 | * free_loader_resources() frees all of the resources associated with |
131 | * struct __toku_loader_internal |
132 | * assumes any previously freed items set the field pointer to NULL |
133 | * Requires that the ft_loader is closed or destroyed before calling this function. |
134 | */ |
135 | static void free_loader_resources(DB_LOADER *loader) |
136 | { |
137 | if ( loader->i ) { |
138 | toku_destroy_dbt(&loader->i->err_key); |
139 | toku_destroy_dbt(&loader->i->err_val); |
140 | |
141 | if (loader->i->inames_in_env) { |
142 | free_inames(loader->i->inames_in_env, loader->i->N); |
143 | loader->i->inames_in_env = nullptr; |
144 | } |
145 | toku_free(loader->i->temp_file_template); |
146 | loader->i->temp_file_template = nullptr; |
147 | |
148 | // loader->i |
149 | toku_free(loader->i); |
150 | loader->i = nullptr; |
151 | } |
152 | } |
153 | |
154 | static void free_loader(DB_LOADER *loader) |
155 | { |
156 | if ( loader ) free_loader_resources(loader); |
157 | toku_free(loader); |
158 | } |
159 | |
// Loader temp files are named "<prefix><suffix>" inside the environment's
// tmp directory (#2536). toku_loader_cleanup_temp_files matches on this
// shape. The "XXXXXX" suffix looks like a mkstemp-style placeholder;
// confirm against toku_ft_loader_open.
static const char *loader_temp_prefix = "tokuld";
static const char *loader_temp_suffix = "XXXXXX";
162 | |
163 | static int ft_loader_close_and_redirect(DB_LOADER *loader) { |
164 | int r; |
165 | // use the bulk loader |
166 | // in case you've been looking - here is where the real work is done! |
167 | r = toku_ft_loader_close(loader->i->ft_loader, |
168 | loader->i->error_callback, loader->i->error_extra, |
169 | loader->i->poll_func, loader->i->poll_extra); |
170 | if ( r==0 ) { |
171 | for (int i=0; i<loader->i->N; i++) { |
172 | toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect. |
173 | r = toku_dictionary_redirect(loader->i->inames_in_env[i], |
174 | loader->i->dbs[i]->i->ft_handle, |
175 | db_txn_struct_i(loader->i->txn)->tokutxn); |
176 | toku_multi_operation_client_unlock(); |
177 | if ( r!=0 ) break; |
178 | } |
179 | } |
180 | return r; |
181 | } |
182 | |
183 | |
184 | // loader_flags currently has the following flags: |
185 | // LOADER_DISALLOW_PUTS loader->put is not allowed. |
186 | // Loader is only being used for its side effects |
187 | // DB_PRELOCKED_WRITE Table lock is already held, no need to relock. |
188 | int |
189 | toku_loader_create_loader(DB_ENV *env, |
190 | DB_TXN *txn, |
191 | DB_LOADER **blp, |
192 | DB *src_db, |
193 | int N, |
194 | DB *dbs[], |
195 | uint32_t db_flags[/*N*/], |
196 | uint32_t dbt_flags[/*N*/], |
197 | uint32_t loader_flags, |
198 | bool check_empty) { |
199 | int rval; |
200 | HANDLE_READ_ONLY_TXN(txn); |
201 | DB_TXN *loader_txn = nullptr; |
202 | |
203 | *blp = NULL; // set later when created |
204 | |
205 | DB_LOADER *loader = NULL; |
206 | bool puts_allowed = !(loader_flags & LOADER_DISALLOW_PUTS); |
207 | bool compress_intermediates = (loader_flags & LOADER_COMPRESS_INTERMEDIATES) != 0; |
208 | XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) |
209 | XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL) |
210 | |
211 | loader->i->env = env; |
212 | loader->i->txn = txn; |
213 | loader->i->N = N; |
214 | loader->i->dbs = dbs; |
215 | loader->i->src_db = src_db; |
216 | loader->i->db_flags = db_flags; |
217 | loader->i->dbt_flags = dbt_flags; |
218 | loader->i->loader_flags = loader_flags; |
219 | loader->i->temp_file_template = (char *)toku_malloc(MAX_FILE_SIZE); |
220 | |
221 | int n = snprintf(loader->i->temp_file_template, MAX_FILE_SIZE, "%s/%s%s" , env->i->real_tmp_dir, loader_temp_prefix, loader_temp_suffix); |
222 | if ( !(n>0 && n<MAX_FILE_SIZE) ) { |
223 | rval = ENAMETOOLONG; |
224 | goto create_exit; |
225 | } |
226 | |
227 | toku_init_dbt(&loader->i->err_key); |
228 | toku_init_dbt(&loader->i->err_val); |
229 | loader->i->err_i = 0; |
230 | loader->i->err_errno = 0; |
231 | |
232 | loader->set_error_callback = toku_loader_set_error_callback; |
233 | loader->set_poll_function = toku_loader_set_poll_function; |
234 | loader->put = toku_loader_put; |
235 | loader->close = toku_loader_close; |
236 | loader->abort = toku_loader_abort; |
237 | |
238 | // lock tables and check empty |
239 | for(int i=0;i<N;i++) { |
240 | if (!(loader_flags&DB_PRELOCKED_WRITE)) { |
241 | rval = toku_db_pre_acquire_table_lock(dbs[i], txn); |
242 | if (rval!=0) { |
243 | goto create_exit; |
244 | } |
245 | } |
246 | if (check_empty) { |
247 | bool empty = toku_ft_is_empty_fast(dbs[i]->i->ft_handle); |
248 | if (!empty) { |
249 | rval = ENOTEMPTY; |
250 | goto create_exit; |
251 | } |
252 | } |
253 | } |
254 | |
255 | { |
256 | if (env->i->open_flags & DB_INIT_TXN) { |
257 | rval = env->txn_begin(env, txn, &loader_txn, 0); |
258 | if (rval) { |
259 | goto create_exit; |
260 | } |
261 | } |
262 | |
263 | ft_compare_func compare_functions[N]; |
264 | for (int i=0; i<N; i++) { |
265 | compare_functions[i] = env->i->bt_compare; |
266 | } |
267 | |
268 | // time to open the big kahuna |
269 | char **XMALLOC_N(N, new_inames_in_env); |
270 | for (int i = 0; i < N; i++) { |
271 | new_inames_in_env[i] = nullptr; |
272 | } |
273 | FT_HANDLE *XMALLOC_N(N, fts); |
274 | for (int i=0; i<N; i++) { |
275 | fts[i] = dbs[i]->i->ft_handle; |
276 | } |
277 | LSN load_lsn; |
278 | rval = locked_load_inames(env, loader_txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed); |
279 | if ( rval!=0 ) { |
280 | free_inames(new_inames_in_env, N); |
281 | toku_free(fts); |
282 | goto create_exit; |
283 | } |
284 | TOKUTXN ttxn = loader_txn ? db_txn_struct_i(loader_txn)->tokutxn : NULL; |
285 | rval = toku_ft_loader_open(&loader->i->ft_loader, |
286 | env->i->cachetable, |
287 | env->i->generate_row_for_put, |
288 | src_db, |
289 | N, |
290 | fts, dbs, |
291 | (const char **)new_inames_in_env, |
292 | compare_functions, |
293 | loader->i->temp_file_template, |
294 | load_lsn, |
295 | ttxn, |
296 | puts_allowed, |
297 | env->get_loader_memory_size(env), |
298 | compress_intermediates, |
299 | puts_allowed); |
300 | if ( rval!=0 ) { |
301 | free_inames(new_inames_in_env, N); |
302 | toku_free(fts); |
303 | goto create_exit; |
304 | } |
305 | |
306 | loader->i->inames_in_env = new_inames_in_env; |
307 | toku_free(fts); |
308 | |
309 | if (!puts_allowed) { |
310 | rval = ft_loader_close_and_redirect(loader); |
311 | assert_zero(rval); |
312 | loader->i->ft_loader = NULL; |
313 | // close the ft_loader and skip to the redirection |
314 | rval = 0; |
315 | } |
316 | |
317 | rval = loader_txn->commit(loader_txn, 0); |
318 | assert_zero(rval); |
319 | loader_txn = nullptr; |
320 | |
321 | rval = 0; |
322 | } |
323 | *blp = loader; |
324 | create_exit: |
325 | if (loader_txn) { |
326 | int r = loader_txn->abort(loader_txn); |
327 | assert_zero(r); |
328 | loader_txn = nullptr; |
329 | } |
330 | if (rval == 0) { |
331 | (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE), 1); |
332 | (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CURRENT), 1); |
333 | if (STATUS_VALUE(LOADER_CURRENT) > STATUS_VALUE(LOADER_MAX) ) |
334 | STATUS_VALUE(LOADER_MAX) = STATUS_VALUE(LOADER_CURRENT); // not worth a lock to make threadsafe, may be inaccurate |
335 | } |
336 | else { |
337 | (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CREATE_FAIL), 1); |
338 | free_loader(loader); |
339 | } |
340 | return rval; |
341 | } |
342 | |
343 | int toku_loader_set_poll_function(DB_LOADER *loader, |
344 | int (*poll_func)(void *, float progress), |
345 | void *) |
346 | { |
347 | invariant(loader != NULL); |
348 | loader->i->poll_func = poll_func; |
349 | loader->i->poll_extra = poll_extra; |
350 | return 0; |
351 | } |
352 | |
353 | int toku_loader_set_error_callback(DB_LOADER *loader, |
354 | void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *), |
355 | void *) |
356 | { |
357 | invariant(loader != NULL); |
358 | loader->i->error_callback = error_cb; |
359 | loader->i->error_extra = error_extra; |
360 | return 0; |
361 | } |
362 | |
363 | int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) |
364 | { |
365 | int r = 0; |
366 | int i = 0; |
367 | // err_i is unused now( always 0). How would we know which dictionary |
368 | // the error happens in? (put_multiple and toku_ft_loader_put do NOT report |
369 | // which dictionary). |
370 | |
371 | // skip put if error already found |
372 | if ( loader->i->err_errno != 0 ) { |
373 | r = -1; |
374 | goto cleanup; |
375 | } |
376 | |
377 | if (loader->i->loader_flags & LOADER_DISALLOW_PUTS) { |
378 | r = EINVAL; |
379 | goto cleanup; |
380 | } |
381 | else { |
382 | // calling toku_ft_loader_put without a lock assumes that the |
383 | // handlerton is guaranteeing single access to the loader |
384 | // future multi-threaded solutions may need to protect this call |
385 | r = toku_ft_loader_put(loader->i->ft_loader, key, val); |
386 | } |
387 | if ( r != 0 ) { |
388 | // spec says errors all happen on close |
389 | // - have to save key, val, errno (r) and i for duplicate callback |
390 | toku_clone_dbt(&loader->i->err_key, *key); |
391 | toku_clone_dbt(&loader->i->err_val, *val); |
392 | |
393 | loader->i->err_i = i; |
394 | loader->i->err_errno = r; |
395 | |
396 | // deliberately return content free value |
397 | // - must call error_callback to get error info |
398 | r = -1; |
399 | } |
400 | cleanup: |
401 | if (r==0) |
402 | STATUS_VALUE(LOADER_PUT)++; // executed too often to be worth making threadsafe |
403 | else |
404 | STATUS_VALUE(LOADER_PUT_FAIL)++; |
405 | return r; |
406 | } |
407 | |
408 | static void redirect_loader_to_empty_dictionaries(DB_LOADER *loader) { |
409 | DB_LOADER* tmp_loader = NULL; |
410 | int r = toku_loader_create_loader( |
411 | loader->i->env, |
412 | loader->i->txn, |
413 | &tmp_loader, |
414 | loader->i->src_db, |
415 | loader->i->N, |
416 | loader->i->dbs, |
417 | loader->i->db_flags, |
418 | loader->i->dbt_flags, |
419 | LOADER_DISALLOW_PUTS, |
420 | false |
421 | ); |
422 | lazy_assert_zero(r); |
423 | r = toku_loader_close(tmp_loader); |
424 | } |
425 | |
426 | int toku_loader_close(DB_LOADER *loader) |
427 | { |
428 | (void) toku_sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1); |
429 | int r=0; |
430 | if ( loader->i->err_errno != 0 ) { |
431 | if ( loader->i->error_callback != NULL ) { |
432 | loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); |
433 | } |
434 | if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) { |
435 | r = toku_ft_loader_abort(loader->i->ft_loader, true); |
436 | redirect_loader_to_empty_dictionaries(loader); |
437 | } |
438 | else { |
439 | r = loader->i->err_errno; |
440 | } |
441 | } |
442 | else { // no error outstanding |
443 | if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) { |
444 | r = ft_loader_close_and_redirect(loader); |
445 | if (r) { |
446 | redirect_loader_to_empty_dictionaries(loader); |
447 | } |
448 | } |
449 | } |
450 | free_loader(loader); |
451 | if (r==0) |
452 | (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE), 1); |
453 | else |
454 | (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_CLOSE_FAIL), 1); |
455 | return r; |
456 | } |
457 | |
458 | int toku_loader_abort(DB_LOADER *loader) |
459 | { |
460 | (void) toku_sync_fetch_and_sub(&STATUS_VALUE(LOADER_CURRENT), 1); |
461 | (void) toku_sync_fetch_and_add(&STATUS_VALUE(LOADER_ABORT), 1); |
462 | int r=0; |
463 | if ( loader->i->err_errno != 0 ) { |
464 | if ( loader->i->error_callback != NULL ) { |
465 | loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); |
466 | } |
467 | } |
468 | |
469 | if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS) ) { |
470 | r = toku_ft_loader_abort(loader->i->ft_loader, true); |
471 | lazy_assert_zero(r); |
472 | } |
473 | |
474 | redirect_loader_to_empty_dictionaries(loader); |
475 | free_loader(loader); |
476 | return r; |
477 | } |
478 | |
479 | |
480 | // find all of the files in the environments home directory that match the loader temp name and remove them |
481 | int toku_loader_cleanup_temp_files(DB_ENV *env) { |
482 | int result; |
483 | struct dirent *de; |
484 | char * dir = env->i->real_tmp_dir; |
485 | DIR *d = opendir(dir); |
486 | if (d==0) { |
487 | result = get_error_errno(); goto exit; |
488 | } |
489 | |
490 | result = 0; |
491 | while ((de = readdir(d))) { |
492 | int r = memcmp(de->d_name, loader_temp_prefix, strlen(loader_temp_prefix)); |
493 | if (r == 0 && strlen(de->d_name) == strlen(loader_temp_prefix) + strlen(loader_temp_suffix)) { |
494 | int fnamelen = strlen(dir) + 1 + strlen(de->d_name) + 1; // One for the slash and one for the trailing NUL. |
495 | char fname[fnamelen]; |
496 | int l = snprintf(fname, fnamelen, "%s/%s" , dir, de->d_name); |
497 | assert(l+1 == fnamelen); |
498 | r = unlink(fname); |
499 | if (r!=0) { |
500 | result = get_error_errno(); |
501 | perror("Trying to delete a rolltmp file" ); |
502 | } |
503 | } |
504 | } |
505 | { |
506 | int r = closedir(d); |
507 | if (r == -1) |
508 | result = get_error_errno(); |
509 | } |
510 | |
511 | exit: |
512 | return result; |
513 | } |
514 | |
515 | |
516 | |
517 | #undef STATUS_VALUE |
518 | |
519 | |