/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of TokuDB


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    TokuDB is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    TokuDB is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with TokuDB. If not, see <http://www.gnu.org/licenses/>.

======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "tokudb_sysvars.h"
#include "toku_time.h"

namespace tokudb {
namespace analyze {

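// Background/foreground job that recounts the logical rows of a table by
// calling DB->recount_rows() on the primary dictionary and storing the
// result back into the TOKUDB_SHARE. Scheduled from
// TOKUDB_SHARE::analyze_recount_rows() below.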
class recount_rows_t : public tokudb::background::job_manager_t::job_t {
public:
    void* operator new(size_t sz);
    void operator delete(void* p);

    recount_rows_t(
        bool user_scheduled,
        THD* thd,
        TOKUDB_SHARE* share,
        DB_TXN* txn);

    virtual ~recount_rows_t();

    virtual const char* key();
    virtual const char* database();
    virtual const char* table();
    virtual const char* type();
    virtual const char* parameters();
    virtual const char* status();

protected:
    virtual void on_run();

    virtual void on_destroy();

private:
    // to be provided by the initiator of recount rows
    THD* _thd;
    TOKUDB_SHARE* _share;
    DB_TXN* _txn;
    ulonglong _throttle;

    // for recount rows status reporting
    char _parameters[256];
    char _status[1024];
    int _result;
    ulonglong _recount_start; // in microseconds
    ulonglong _total_elapsed_time; // in microseconds

    bool _local_txn;
    ulonglong _rows;
    ulonglong _deleted_rows;
    ulonglong _ticks;

    static int analyze_recount_rows_progress(
        uint64_t count,
        uint64_t deleted,
        void* extra);
    int analyze_recount_rows_progress(uint64_t count, uint64_t deleted);
};

void* recount_rows_t::operator new(size_t sz) {
    return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
}
void recount_rows_t::operator delete(void* p) {
    tokudb::memory::free(p);
}
recount_rows_t::recount_rows_t(
    bool user_scheduled,
    THD* thd,
    TOKUDB_SHARE* share,
    DB_TXN* txn) :
    tokudb::background::job_manager_t::job_t(user_scheduled),
    _share(share),
    _result(HA_ADMIN_OK),
    _recount_start(0),
    _total_elapsed_time(0),
    _local_txn(false),
    _rows(0),
    _deleted_rows(0),
    _ticks(0) {

    assert_debug(thd != NULL);
    assert_debug(share != NULL);

    if (tokudb::sysvars::analyze_in_background(thd)) {
        _thd = NULL;
        _txn = NULL;
    } else {
        _thd = thd;
        _txn = txn;
    }

    _throttle = tokudb::sysvars::analyze_throttle(thd);

    snprintf(_parameters,
             sizeof(_parameters),
             "TOKUDB_ANALYZE_THROTTLE=%llu;",
             _throttle);
    _status[0] = '\0';
}
recount_rows_t::~recount_rows_t() {
}
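// Runs the recount: opens a local DB_READ_UNCOMMITTED transaction when the
// caller did not supply one, asks the fractal tree to recount rows with
// analyze_recount_rows_progress() as the progress callback, then refreshes
// the cached row count in the share from stat64.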
void recount_rows_t::on_run() {
    const char* orig_proc_info = NULL;
    if (_thd)
        orig_proc_info = tokudb_thd_get_proc_info(_thd);
    _recount_start = tokudb::time::microsec();
    _total_elapsed_time = 0;

    if (_txn == NULL) {
        _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED);

        if (_result != 0) {
            _txn = NULL;
            _result = HA_ADMIN_FAILED;
            goto error;
        }
        _local_txn = true;
    } else {
        _local_txn = false;
    }

    _result =
        _share->file->recount_rows(
            _share->file,
            analyze_recount_rows_progress,
            this);

    if (_result != 0) {
        if (_local_txn) {
            _txn->abort(_txn);
            _txn = NULL;
        }
        _result = HA_ADMIN_FAILED;
        goto error;
    }

    DB_BTREE_STAT64 dict_stats;
    _result = _share->file->stat64(_share->file, _txn, &dict_stats);
    if (_result == 0) {
        _share->set_row_count(dict_stats.bt_ndata, false);
    }
    if (_result != 0)
        _result = HA_ADMIN_FAILED;

    if (_local_txn) {
        if (_result == HA_ADMIN_OK) {
            _txn->commit(_txn, 0);
        } else {
            _txn->abort(_txn);
        }
        _txn = NULL;
    }

    sql_print_information(
        "tokudb analyze recount rows %d counted %lld",
        _result,
        _share->row_count());
error:
    if (_thd)
        tokudb_thd_set_proc_info(_thd, orig_proc_info);
    return;
}
void recount_rows_t::on_destroy() {
    _share->release();
}
const char* recount_rows_t::key() {
    return _share->full_table_name();
}
const char* recount_rows_t::database() {
    return _share->database_name();
}
const char* recount_rows_t::table() {
    return _share->table_name();
}
const char* recount_rows_t::type() {
    static const char* type = "TOKUDB_ANALYZE_MODE_RECOUNT_ROWS";
    return type;
}
const char* recount_rows_t::parameters() {
    return _parameters;
}
const char* recount_rows_t::status() {
    return _status;
}
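// C-style trampoline handed to DB->recount_rows(); 'extra' is the job
// instance, so simply forward to the member overload below.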
int recount_rows_t::analyze_recount_rows_progress(
    uint64_t count,
    uint64_t deleted,
    void* extra) {

    recount_rows_t* context = (recount_rows_t*)extra;
    return context->analyze_recount_rows_progress(count, deleted);
}
int recount_rows_t::analyze_recount_rows_progress(
    uint64_t count,
    uint64_t deleted) {

    _rows = count;
    _deleted_rows += deleted;
    if (deleted > 0)
        _ticks += deleted;
    else
        _ticks++;

    if (_ticks > 1000) {
        _ticks = 0;
        uint64_t now = tokudb::time::microsec();
        _total_elapsed_time = now - _recount_start;
        if ((_thd && thd_kill_level(_thd)) || cancelled()) {
            // client killed
            return ER_ABORTING_CONNECTION;
        }

        // Rebuild the status string.
        // There is a slight race condition here: _status is used both for
        // tokudb_thd_set_proc_info and for the status column in
        // i_s.background_job_status. If someone queries or builds the i_s
        // table at the exact moment the status is being rebuilt here, the
        // i_s table could pick up a garbage status. Taking the job manager
        // lock is a little heavy handed, but it works: it prevents us from
        // changing the status while someone may be observing it, and it
        // prevents someone from observing it while we change it.
        tokudb::background::_job_manager->lock();
        snprintf(_status,
                 sizeof(_status),
                 "recount_rows %s.%s counted %llu rows and %llu deleted "
                 "in %llu seconds.",
                 _share->database_name(),
                 _share->table_name(),
                 _rows,
                 _deleted_rows,
                 _total_elapsed_time / tokudb::time::MICROSECONDS);
        tokudb::background::_job_manager->unlock();

        // report
        if (_thd)
            tokudb_thd_set_proc_info(_thd, _status);

        // throttle
        // given the throttle value, let's calculate the maximum number of
        // rows we should have seen so far at a 0.1 sec resolution
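        // For example, with TOKUDB_ANALYZE_THROTTLE=1000 rows/sec and 2.5
        // seconds elapsed, the budget is 25 intervals * 100 rows = 2500 rows;
        // once the scan runs ahead of that budget, we sleep for one interval.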
        if (_throttle > 0) {
            uint64_t estimated_rows = _total_elapsed_time / 100000;
            estimated_rows = estimated_rows * (_throttle / 10);
            if (_rows + _deleted_rows > estimated_rows) {
                // sleep for 1/10 of a second
                tokudb::time::sleep_microsec(100000);
            }
        }
    }
    return 0;
}

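// Background/foreground job implementing TOKUDB_ANALYZE_MODE_STANDARD:
// estimates the cardinality of every key in the table by scanning each
// index and comparing adjacent keys, bounded by TOKUDB_ANALYZE_TIME and
// throttled by TOKUDB_ANALYZE_THROTTLE. Scheduled from
// TOKUDB_SHARE::analyze_standard() below.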
class standard_t : public tokudb::background::job_manager_t::job_t {
public:
    void* operator new(size_t sz);
    void operator delete(void* p);

    standard_t(bool user_scheduled, THD* thd, TOKUDB_SHARE* share, DB_TXN* txn);

    virtual ~standard_t();

    virtual const char* key(void);
    virtual const char* database();
    virtual const char* table();
    virtual const char* type();
    virtual const char* parameters();
    virtual const char* status();

protected:
    virtual void on_run();

    virtual void on_destroy();

private:
    // to be provided by initiator of analyze
    THD* _thd;
    TOKUDB_SHARE* _share;
    DB_TXN* _txn;
    ulonglong _throttle; // maximum rows to process per second, 0 = unlimited
    ulonglong _time_limit; // in microseconds
    double _delete_fraction;

    // for analyze status reporting, may also use other state
    char _parameters[256];
    char _status[1024];
    int _result;
    ulonglong _analyze_start; // in microseconds
    ulonglong _total_elapsed_time; // in microseconds

    // for analyze internal use, pretty much these are per-key/index
    ulonglong _current_key;
    bool _local_txn;
    ulonglong _half_time;
    ulonglong _half_rows;
    ulonglong _rows;
    ulonglong _deleted_rows;
    ulonglong _ticks;
    ulonglong _analyze_key_start; // in microseconds
    ulonglong _key_elapsed_time; // in microseconds
    uint _scan_direction;

    static bool analyze_standard_cursor_callback(
        void* extra,
        uint64_t deleted_rows);
    bool analyze_standard_cursor_callback(uint64_t deleted_rows);

    int analyze_key_progress();
    int analyze_key(uint64_t* rec_per_key_part);
};

void* standard_t::operator new(size_t sz) {
    return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
}
void standard_t::operator delete(void* p) {
    tokudb::memory::free(p);
}
standard_t::standard_t(
    bool user_scheduled,
    THD* thd,
    TOKUDB_SHARE* share,
    DB_TXN* txn) :
    tokudb::background::job_manager_t::job_t(user_scheduled),
    _share(share),
    _result(HA_ADMIN_OK),
    _analyze_start(0),
    _total_elapsed_time(0),
    _current_key(0),
    _local_txn(false),
    _half_time(0),
    _half_rows(0),
    _rows(0),
    _deleted_rows(0),
    _ticks(0),
    _analyze_key_start(0),
    _key_elapsed_time(0),
    _scan_direction(0) {

    assert_debug(thd != NULL);
    assert_debug(share != NULL);

    if (tokudb::sysvars::analyze_in_background(thd)) {
        _thd = NULL;
        _txn = NULL;
    } else {
        _thd = thd;
        _txn = txn;
    }
    _throttle = tokudb::sysvars::analyze_throttle(thd);
    _time_limit =
        tokudb::sysvars::analyze_time(thd) * tokudb::time::MICROSECONDS;
    _delete_fraction = tokudb::sysvars::analyze_delete_fraction(thd);

    snprintf(_parameters,
             sizeof(_parameters),
             "TOKUDB_ANALYZE_DELETE_FRACTION=%f; "
             "TOKUDB_ANALYZE_TIME=%llu; TOKUDB_ANALYZE_THROTTLE=%llu;",
             _delete_fraction,
             _time_limit / tokudb::time::MICROSECONDS,
             _throttle);

    _status[0] = '\0';
}
standard_t::~standard_t() {
}
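// Walks every key in the table, calling analyze_key() for each one. If a key
// fails or its deleted-row fraction exceeds TOKUDB_ANALYZE_DELETE_FRACTION,
// an informational row is sent back to the client session (when there is
// one). On success the accumulated rec_per_key estimates are persisted to
// the status dictionary and pushed into the share.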
void standard_t::on_run() {
    DB_BTREE_STAT64 stat64;
    uint64_t rec_per_key_part[_share->_max_key_parts];
    uint64_t total_key_parts = 0;
    const char* orig_proc_info = NULL;
    if (_thd)
        orig_proc_info = tokudb_thd_get_proc_info(_thd);

    _analyze_start = tokudb::time::microsec();
    _half_time = _time_limit > 0 ? _time_limit/2 : 0;

    if (_txn == NULL) {
        _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED);

        if (_result != 0) {
            _txn = NULL;
            _result = HA_ADMIN_FAILED;
            goto error;
        }
        _local_txn = true;
    } else {
        _local_txn = false;
    }

    assert_always(_share->key_file[0] != NULL);
    _result = _share->key_file[0]->stat64(_share->key_file[0], _txn, &stat64);
    if (_result != 0) {
        _result = HA_ADMIN_FAILED;
        goto cleanup;
    }
    _half_rows = stat64.bt_ndata / 2;

    for (ulonglong current_key = 0;
         _result == HA_ADMIN_OK && current_key < _share->_keys;
         current_key++) {

        _current_key = current_key;
        _rows = _deleted_rows = _ticks = 0;
        _result = analyze_key(&rec_per_key_part[total_key_parts]);

        if ((_result != 0 && _result != ETIME) ||
            (_result != 0 && _rows == 0 && _deleted_rows > 0)) {
            _result = HA_ADMIN_FAILED;
        }
        if (_thd && (_result == HA_ADMIN_FAILED ||
                     static_cast<double>(_deleted_rows) >
                         _delete_fraction * (_rows + _deleted_rows))) {

            char name[256];
            int namelen;
            namelen =
                snprintf(
                    name,
                    sizeof(name),
                    "%s.%s.%s",
                    _share->database_name(),
                    _share->table_name(),
                    _share->_key_descriptors[_current_key]._name);
            _thd->protocol->prepare_for_resend();
            _thd->protocol->store(name, namelen, system_charset_info);
            _thd->protocol->store("analyze", 7, system_charset_info);
            _thd->protocol->store("info", 4, system_charset_info);
            char rowmsg[256];
            int rowmsglen;
            rowmsglen =
                snprintf(
                    rowmsg,
                    sizeof(rowmsg),
                    "rows processed %llu rows deleted %llu",
                    _rows,
                    _deleted_rows);
            _thd->protocol->store(rowmsg, rowmsglen, system_charset_info);
            _thd->protocol->write();

            sql_print_information(
                "tokudb analyze on %.*s %.*s",
                namelen,
                name,
                rowmsglen,
                rowmsg);
        }

        total_key_parts += _share->_key_descriptors[_current_key]._parts;
    }
    if (_result == HA_ADMIN_OK) {
        int error =
            tokudb::set_card_in_status(
                _share->status_block,
                _txn,
                total_key_parts,
                rec_per_key_part);
        if (error)
            _result = HA_ADMIN_FAILED;

        _share->lock();
        _share->update_cardinality_counts(total_key_parts, rec_per_key_part);
        _share->allow_auto_analysis(true);
        _share->unlock();
    }

cleanup:
    if (_local_txn) {
        if (_result == HA_ADMIN_OK) {
            _txn->commit(_txn, 0);
        } else {
            _txn->abort(_txn);
        }
        _txn = NULL;
    }

error:
    if (_thd)
        tokudb_thd_set_proc_info(_thd, orig_proc_info);
    return;
}
void standard_t::on_destroy() {
    _share->lock();
    _share->allow_auto_analysis(false);
    _share->unlock();
    _share->release();
}
const char* standard_t::key() {
    return _share->full_table_name();
}
const char* standard_t::database() {
    return _share->database_name();
}
const char* standard_t::table() {
    return _share->table_name();
}
const char* standard_t::type() {
    static const char* type = "TOKUDB_ANALYZE_MODE_STANDARD";
    return type;
}
const char* standard_t::parameters() {
    return _parameters;
}
const char* standard_t::status() {
    return _status;
}
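// Interrupt callback registered on the scan cursor via
// c_set_check_interrupt_callback(); returning true asks the cursor to stop.
// Deleted (garbage) rows seen by the cursor are counted as ticks so that
// progress reporting and throttling still advance while skipping them.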
bool standard_t::analyze_standard_cursor_callback(
    void* extra,
    uint64_t deleted_rows) {
    standard_t* context = (standard_t*)extra;
    return context->analyze_standard_cursor_callback(deleted_rows);
}
bool standard_t::analyze_standard_cursor_callback(uint64_t deleted_rows) {
    _deleted_rows += deleted_rows;
    _ticks += deleted_rows;
    return analyze_key_progress() != 0;
}
int standard_t::analyze_key_progress(void) {
    if (_ticks > 1000) {
        _ticks = 0;
        uint64_t now = tokudb::time::microsec();
        _total_elapsed_time = now - _analyze_start;
        _key_elapsed_time = now - _analyze_key_start;
        if ((_thd && thd_kill_level(_thd)) || cancelled()) {
            // client killed
            return ER_ABORTING_CONNECTION;
        } else if (_time_limit > 0 &&
                   static_cast<uint64_t>(_key_elapsed_time) > _time_limit) {
            // time limit reached
            return ETIME;
        }

        // Rebuild the status string.
        // There is a slight race condition here: _status is used both for
        // tokudb_thd_set_proc_info and for the status column in
        // i_s.background_job_status. If someone queries or builds the i_s
        // table at the exact moment the status is being rebuilt here, the
        // i_s table could pick up a garbage status. Taking the job manager
        // lock is a little heavy handed, but it works: it prevents us from
        // changing the status while someone may be observing it, and it
        // prevents someone from observing it while we change it.
        static const char* scan_direction_str[] = {"not scanning",
                                                   "scanning forward",
                                                   "scanning backward",
                                                   "scan unknown"};

        const char* scan_direction = NULL;
        switch (_scan_direction) {
            case 0:
                scan_direction = scan_direction_str[0];
                break;
            case DB_NEXT:
                scan_direction = scan_direction_str[1];
                break;
            case DB_PREV:
                scan_direction = scan_direction_str[2];
                break;
            default:
                scan_direction = scan_direction_str[3];
                break;
        }

        float progress_rows = 0.0;
        if (_share->row_count() > 0)
            progress_rows = static_cast<float>(_rows) /
                            static_cast<float>(_share->row_count());
        float progress_time = 0.0;
        if (_time_limit > 0)
            progress_time = static_cast<float>(_key_elapsed_time) /
                            static_cast<float>(_time_limit);
        tokudb::background::_job_manager->lock();
        snprintf(
            _status,
            sizeof(_status),
            "analyze table standard %s.%s.%s %llu of %u %.lf%% rows %.lf%% "
            "time, %s",
            _share->database_name(),
            _share->table_name(),
            _share->_key_descriptors[_current_key]._name,
            _current_key,
            _share->_keys,
            progress_rows * 100.0,
            progress_time * 100.0,
            scan_direction);
        tokudb::background::_job_manager->unlock();

        // report
        if (_thd)
            tokudb_thd_set_proc_info(_thd, _status);

        // throttle
        // given the throttle value, let's calculate the maximum number of
        // rows we should have seen so far at a 0.1 sec resolution
        if (_throttle > 0) {
            uint64_t estimated_rows = _key_elapsed_time / 100000;
            estimated_rows = estimated_rows * (_throttle / 10);
            if (_rows + _deleted_rows > estimated_rows) {
                // sleep for 1/10 of a second
                tokudb::time::sleep_microsec(100000);
            }
        }
    }
    return 0;
}
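// Estimates cardinality for one index: scan it with a cursor and, for each
// row, compare the first i+1 key parts against the previous key; every time
// the prefix differs, unique_rows[i] is incremented. The caller receives
// rec_per_key_part[i] = rows_scanned / unique_rows[i] (e.g. 1000 rows with
// 100 distinct first-part prefixes yields rec_per_key_part[0] = 10). If a
// time limit is set and the forward scan has used half the time but covered
// less than half the rows, the cursor is reopened scanning backward (DB_PREV)
// so both ends of the index contribute to the estimate.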
int standard_t::analyze_key(uint64_t* rec_per_key_part) {
    int error = 0;
    DB* db = _share->key_file[_current_key];
    assert_always(db != NULL);
    uint64_t num_key_parts = _share->_key_descriptors[_current_key]._parts;
    uint64_t unique_rows[num_key_parts];
    bool is_unique = _share->_key_descriptors[_current_key]._is_unique;
    DBC* cursor = NULL;
    int close_error = 0;
    DBT key, prev_key;
    bool copy_key = false;

    _analyze_key_start = tokudb::time::microsec();
    _key_elapsed_time = 0;
    _scan_direction = DB_NEXT;

    if (is_unique && num_key_parts == 1) {
        // don't compute for unique keys with a single part. we already know
        // the answer.
        _rows = unique_rows[0] = 1;
        goto done;
    }

    for (uint64_t i = 0; i < num_key_parts; i++)
        unique_rows[i] = 1;

    // stop looking when the entire dictionary was analyzed, or a
    // cap on execution time was reached, or the analyze was killed.
    while (1) {
        if (cursor == NULL) {
            error = db->cursor(db, _txn, &cursor, 0);
            if (error != 0)
                goto done;

            cursor->c_set_check_interrupt_callback(
                cursor,
                analyze_standard_cursor_callback,
                this);

            memset(&key, 0, sizeof(DBT));
            memset(&prev_key, 0, sizeof(DBT));
            copy_key = true;
        }

        error = cursor->c_get(cursor, &key, 0, _scan_direction);
        if (error != 0) {
            if (error == DB_NOTFOUND || error == TOKUDB_INTERRUPTED)
                error = 0; // not an error
            break;
        } else if (cancelled()) {
            error = ER_ABORTING_CONNECTION;
            break;
        }

        _rows++;
        _ticks++;

        // if copy_key is false at this point, we have some value sitting in
        // prev_key that we can compare to.
        // if the comparison reveals a difference, we must set copy_key to
        // true so the code following can copy the current key into prev_key
        // for the next iteration.
        if (copy_key == false) {
            // compare this key with the previous key. ignore
            // appended PK for SK's.
            // TODO if a prefix is different, then all larger keys
            // that include the prefix are also different.
            // TODO if we are comparing the entire primary key or
            // the entire unique secondary key, then the cardinality
            // must be 1, so we can avoid computing it.
            for (uint64_t i = 0; i < num_key_parts; i++) {
                int cmp = tokudb_cmp_dbt_key_parts(db, &prev_key, &key, i+1);
                if (cmp != 0) {
                    unique_rows[i]++;
                    copy_key = true;
                }
            }
        }

        // prev_key = key or prev_key is NULL
        if (copy_key) {
            prev_key.data =
                tokudb::memory::realloc(
                    prev_key.data,
                    key.size,
                    MYF(MY_WME|MY_ZEROFILL|MY_FAE));
            assert_always(prev_key.data);
            prev_key.size = key.size;
            memcpy(prev_key.data, key.data, prev_key.size);
            copy_key = false;
        }

        error = analyze_key_progress();
        if (error == ETIME) {
            error = 0;
            break;
        } else if (error) {
            break;
        }

        // if we have a time limit, are scanning forward, have exceeded
        // _half_time and have not yet passed _half_rows rows of the index:
        // clean up the keys, close the cursor and reverse direction.
        if (TOKUDB_UNLIKELY(_half_time > 0 &&
                            _scan_direction == DB_NEXT &&
                            _key_elapsed_time >= _half_time &&
                            _rows < _half_rows)) {

            tokudb::memory::free(prev_key.data);
            prev_key.data = NULL;
            close_error = cursor->c_close(cursor);
            assert_always(close_error == 0);
            cursor = NULL;
            _scan_direction = DB_PREV;
        }
    }
    // cleanup
    if (prev_key.data)
        tokudb::memory::free(prev_key.data);
    if (cursor)
        close_error = cursor->c_close(cursor);
    assert_always(close_error == 0);

done:
    // in case we timed out (bunch of deleted records) without hitting a
    // single row
    if (_rows == 0)
        _rows = 1;

    // return cardinality
    for (uint64_t i = 0; i < num_key_parts; i++) {
        rec_per_key_part[i] = _rows / unique_rows[i];
    }
    return error;
}

} // namespace analyze
} // namespace tokudb


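// Handler entry point for ANALYZE TABLE: dispatch on the session's
// tokudb_analyze_mode to either a row recount, a standard cardinality
// analysis, or cancellation of any background jobs for this table.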
int ha_tokudb::analyze(THD *thd, HA_CHECK_OPT *check_opt) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
    int result = HA_ADMIN_OK;
    tokudb::sysvars::analyze_mode_t mode = tokudb::sysvars::analyze_mode(thd);

    switch (mode) {
        case tokudb::sysvars::TOKUDB_ANALYZE_RECOUNT_ROWS:
            result = share->analyze_recount_rows(thd, transaction);
            break;
        case tokudb::sysvars::TOKUDB_ANALYZE_STANDARD:
            share->lock();
            result = share->analyze_standard(thd, transaction);
            share->unlock();
            break;
        case tokudb::sysvars::TOKUDB_ANALYZE_CANCEL:
            share->cancel_background_jobs();
            break;
        default:
            break; // no-op
    }
    TOKUDB_HANDLER_DBUG_RETURN(result);
}

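// Create a recount_rows_t job and hand it to the background job manager. The
// job runs on this thread unless tokudb_analyze_in_background is enabled for
// the session; either way the job holds a reference on the share until it is
// destroyed.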
int TOKUDB_SHARE::analyze_recount_rows(THD* thd, DB_TXN* txn) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", table_name());

    assert_always(thd != NULL);

    int result = HA_ADMIN_OK;

    tokudb::analyze::recount_rows_t* job =
        new tokudb::analyze::recount_rows_t(true, thd, this, txn);
    assert_always(job != NULL);

    // job->destroy will drop the ref
    addref();
    unlock();

    bool ret = tokudb::background::_job_manager->
        run_job(job, tokudb::sysvars::analyze_in_background(thd));

    if (!ret) {
        job->destroy();
        delete job;
        result = HA_ADMIN_FAILED;
    }

    TOKUDB_HANDLER_DBUG_RETURN(result);
}

// On entry, if txn is non-NULL it is a user session invoking ANALYZE
// directly; if txn is NULL it is an automatic analyze. In either case the
// caller holds the lock on 'this' (see the assert_debug below); the lock is
// released while the job is handed to the job manager and re-acquired before
// returning.
int TOKUDB_SHARE::analyze_standard(THD* thd, DB_TXN* txn) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", table_name());

    assert_always(thd != NULL);
    assert_debug(_mutex.is_owned_by_me() == true);

    int result = HA_ADMIN_OK;

    // Stub out analyze when OPTIMIZE is remapped to ALTER recreate + analyze:
    // skip if this is a user-invoked (non-auto) call whose command is neither
    // ANALYZE nor ALTER TABLE, or if this is an ALTER TABLE itself.
    if ((txn &&
         thd_sql_command(thd) != SQLCOM_ANALYZE &&
         thd_sql_command(thd) != SQLCOM_ALTER_TABLE) ||
        thd_sql_command(thd) == SQLCOM_ALTER_TABLE) {
        TOKUDB_HANDLER_DBUG_RETURN(result);
    }

    tokudb::analyze::standard_t* job =
        new tokudb::analyze::standard_t(txn != NULL, thd, this, txn);
    assert_always(job != NULL);

    // akin to calling addref, but we know, right here, right now, everything
    // in the share is set up, files open, etc...
    // job->destroy will drop the ref
    _use_count++;

    // don't want any autos kicking off while we are analyzing
    disallow_auto_analysis();

    unlock();

    bool ret =
        tokudb::background::_job_manager->run_job(
            job,
            tokudb::sysvars::analyze_in_background(thd));

    if (!ret) {
        job->destroy();
        delete job;
        result = HA_ADMIN_FAILED;
    }

    lock();

    TOKUDB_HANDLER_DBUG_RETURN(result);
}


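// State shared with hot_optimize_progress_fun while a hot optimize runs:
// which index is being optimized, where to write status text, the fraction
// of the index to optimize before stopping, and the throttle (an upper bound
// on progress callbacks per second, enforced below by sleeping).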
typedef struct hot_optimize_context {
    THD* thd;
    char* write_status_msg;
    ha_tokudb* ha;
    uint progress_stage;
    uint current_table;
    uint num_tables;
    float progress_limit;
    uint64_t progress_last_time;
    uint64_t throttle;
} *HOT_OPTIMIZE_CONTEXT;

static int hot_optimize_progress_fun(void *extra, float progress) {
    HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra;
    if (thd_kill_level(context->thd)) {
        sprintf(
            context->write_status_msg,
            "The process has been killed, aborting hot optimize.");
        return ER_ABORTING_CONNECTION;
    }
    float percentage = progress * 100;
    sprintf(
        context->write_status_msg,
        "Optimization of index %u of %u about %.lf%% done",
        context->current_table + 1,
        context->num_tables,
        percentage);
    thd_proc_info(context->thd, context->write_status_msg);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
    if (context->progress_stage < context->current_table) {
        // the progress stage is behind the current table, so move up
        // to the next stage and set the progress stage to current.
        thd_progress_next_stage(context->thd);
        context->progress_stage = context->current_table;
    }
    // the percentage we report here is for the current stage/db
    thd_progress_report(context->thd, (unsigned long long) percentage, 100);
#endif

    // throttle the optimize table
    if (context->throttle) {
        uint64_t time_now = toku_current_time_microsec();
        uint64_t dt = time_now - context->progress_last_time;
        uint64_t throttle_time = 1000000ULL / context->throttle;
        if (throttle_time > dt) {
            usleep(throttle_time - dt);
        }
        context->progress_last_time = toku_current_time_microsec();
    }

    // return 1 if progress has reached the progress limit
    return progress >= context->progress_limit;
}

// flatten all DBs in this table; to do so, perform a hot optimize on each one
int ha_tokudb::do_optimize(THD* thd) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
    int error = 0;
    const char* orig_proc_info = tokudb_thd_get_proc_info(thd);
    uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key);

#ifdef HA_TOKUDB_HAS_THD_PROGRESS
    // each DB is its own stage. as HOT goes through each db, we'll
    // move on to the next stage.
    thd_progress_init(thd, curr_num_DBs);
#endif

    // for each DB, run optimize and hot_optimize
    for (uint i = 0; i < curr_num_DBs; i++) {
        // only optimize the index if it matches the optimize_index_name
        // session variable
        const char* optimize_index_name =
            tokudb::sysvars::optimize_index_name(thd);
        if (optimize_index_name) {
            const char* this_index_name =
                i >= table_share->keys ?
                    "primary" :
                    table_share->key_info[i].name.str;
            if (strcasecmp(optimize_index_name, this_index_name) != 0) {
                continue;
            }
        }

        DB* db = share->key_file[i];
        assert_always(db != NULL);
        error = db->optimize(db);
        if (error) {
            goto cleanup;
        }

        struct hot_optimize_context hc;
        memset(&hc, 0, sizeof hc);
        hc.thd = thd;
        hc.write_status_msg = this->write_status_msg;
        hc.ha = this;
        hc.current_table = i;
        hc.num_tables = curr_num_DBs;
        hc.progress_limit = tokudb::sysvars::optimize_index_fraction(thd);
        hc.progress_last_time = toku_current_time_microsec();
        hc.throttle = tokudb::sysvars::optimize_throttle(thd);
        uint64_t loops_run;
        error =
            db->hot_optimize(
                db,
                NULL,
                NULL,
                hot_optimize_progress_fun,
                &hc,
                &loops_run);
        if (error) {
            goto cleanup;
        }
    }
    error = 0;

cleanup:
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
    thd_progress_end(thd);
#endif
    thd_proc_info(thd, orig_proc_info);
    TOKUDB_HANDLER_DBUG_RETURN(error);
}

int ha_tokudb::optimize(THD* thd, HA_CHECK_OPT* check_opt) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
    int error;
#if TOKU_OPTIMIZE_WITH_RECREATE
    error = HA_ADMIN_TRY_ALTER;
#else
    error = do_optimize(thd);
#endif
    TOKUDB_HANDLER_DBUG_RETURN(error);
}

struct check_context {
    THD* thd;
};

static int ha_tokudb_check_progress(void* extra, float progress) {
    struct check_context* context = (struct check_context*)extra;
    int result = 0;
    if (thd_kill_level(context->thd))
        result = ER_ABORTING_CONNECTION;
    return result;
}

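// Send an informational result row (schema.table, "check", "info", msg) back
// to the client over the wire protocol, if the session still has a
// connection to write to.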
static void ha_tokudb_check_info(THD* thd, TABLE* table, const char* msg) {
    if (thd->vio_ok()) {
        char tablename[
            table->s->db.length + 1 +
            table->s->table_name.length + 1];
        snprintf(
            tablename,
            sizeof(tablename),
            "%.*s.%.*s",
            (int)table->s->db.length,
            table->s->db.str,
            (int)table->s->table_name.length,
            table->s->table_name.str);
        thd->protocol->prepare_for_resend();
        thd->protocol->store(tablename, strlen(tablename), system_charset_info);
        thd->protocol->store("check", 5, system_charset_info);
        thd->protocol->store("info", 4, system_charset_info);
        thd->protocol->store(msg, strlen(msg), system_charset_info);
        thd->protocol->write();
    }
}

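// Handler entry point for CHECK TABLE: verifies every dictionary of the
// table with DB->verify_with_progress(). T_QUICK stops at the first corrupt
// dictionary, T_EXTEND keeps going; per-index progress goes to the thread
// proc info and, when TOKUDB_DEBUG_CHECK is enabled, is also echoed to the
// client and traced to the server log.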
int ha_tokudb::check(THD* thd, HA_CHECK_OPT* check_opt) {
    TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name());
    const char* orig_proc_info = tokudb_thd_get_proc_info(thd);
    int result = HA_ADMIN_OK;
    int r;

    int keep_going = 1;
    if (check_opt->flags & T_QUICK) {
        keep_going = 0;
    }
    if (check_opt->flags & T_EXTEND) {
        keep_going = 1;
    }

    r = acquire_table_lock(transaction, lock_write);
    if (r != 0)
        result = HA_ADMIN_INTERNAL_ERROR;
    if (result == HA_ADMIN_OK) {
        uint32_t num_DBs = table_share->keys + tokudb_test(hidden_primary_key);
        snprintf(
            write_status_msg,
            sizeof(write_status_msg),
            "%s primary=%d num=%d",
            share->table_name(),
            primary_key,
            num_DBs);
        if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
            ha_tokudb_check_info(thd, table, write_status_msg);
            time_t now = time(0);
            char timebuf[32];
            TOKUDB_HANDLER_TRACE(
                "%.24s %s",
                ctime_r(&now, timebuf),
                write_status_msg);
        }
        for (uint i = 0; i < num_DBs; i++) {
            DB* db = share->key_file[i];
            assert_always(db != NULL);
            const char* kname =
                i == primary_key ? "primary" : table_share->key_info[i].name.str;
            snprintf(
                write_status_msg,
                sizeof(write_status_msg),
                "%s key=%s %u",
                share->table_name(),
                kname,
                i);
            thd_proc_info(thd, write_status_msg);
            if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
                ha_tokudb_check_info(thd, table, write_status_msg);
                time_t now = time(0);
                char timebuf[32];
                TOKUDB_HANDLER_TRACE(
                    "%.24s %s",
                    ctime_r(&now, timebuf),
                    write_status_msg);
            }
            struct check_context check_context = { thd };
            r = db->verify_with_progress(
                db,
                ha_tokudb_check_progress,
                &check_context,
                (tokudb::sysvars::debug & TOKUDB_DEBUG_CHECK) != 0,
                keep_going);
            if (r != 0) {
                char msg[32 + strlen(kname)];
                sprintf(msg, "Corrupt %s", kname);
                ha_tokudb_check_info(thd, table, msg);
            }
            snprintf(
                write_status_msg,
                sizeof(write_status_msg),
                "%s key=%s %u result=%d",
                share->full_table_name(),
                kname,
                i,
                r);
            thd_proc_info(thd, write_status_msg);
            if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) {
                ha_tokudb_check_info(thd, table, write_status_msg);
                time_t now = time(0);
                char timebuf[32];
                TOKUDB_HANDLER_TRACE(
                    "%.24s %s",
                    ctime_r(&now, timebuf),
                    write_status_msg);
            }
            if (result == HA_ADMIN_OK && r != 0) {
                result = HA_ADMIN_CORRUPT;
                if (!keep_going)
                    break;
            }
        }
    }
    thd_proc_info(thd, orig_proc_info);
    TOKUDB_HANDLER_DBUG_RETURN(result);
}