1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of TokuDB |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | TokuDB is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | TokuDB is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with TokuDB. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ======= */ |
23 | |
24 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
25 | |
26 | #include "tokudb_sysvars.h" |
27 | #include "toku_time.h" |
28 | |
29 | namespace tokudb { |
30 | namespace analyze { |
31 | |
32 | class recount_rows_t : public tokudb::background::job_manager_t::job_t { |
33 | public: |
34 | void* operator new(size_t sz); |
35 | void operator delete(void* p); |
36 | |
37 | recount_rows_t( |
38 | bool user_schedued, |
39 | THD* thd, |
40 | TOKUDB_SHARE* share, |
41 | DB_TXN* txn); |
42 | |
43 | virtual ~recount_rows_t(); |
44 | |
45 | virtual const char* key(); |
46 | virtual const char* database(); |
47 | virtual const char* table(); |
48 | virtual const char* type(); |
49 | virtual const char* parameters(); |
50 | virtual const char* status(); |
51 | |
52 | protected: |
53 | virtual void on_run(); |
54 | |
55 | virtual void on_destroy(); |
56 | |
57 | private: |
58 | // to be provided by the initiator of recount rows |
59 | THD* _thd; |
60 | TOKUDB_SHARE* _share; |
61 | DB_TXN* _txn; |
62 | ulonglong _throttle; |
63 | |
64 | // for recount rows status reporting |
65 | char _parameters[256]; |
66 | char _status[1024]; |
67 | int _result; |
68 | ulonglong _recount_start; // in microseconds |
69 | ulonglong _total_elapsed_time; // in microseconds |
70 | |
71 | bool _local_txn; |
72 | ulonglong _rows; |
73 | ulonglong _deleted_rows; |
74 | ulonglong _ticks; |
75 | |
76 | static int analyze_recount_rows_progress( |
77 | uint64_t count, |
78 | uint64_t deleted, |
79 | void* ); |
80 | int analyze_recount_rows_progress(uint64_t count, uint64_t deleted); |
81 | }; |
82 | |
83 | void* recount_rows_t::operator new(size_t sz) { |
84 | return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE)); |
85 | } |
86 | void recount_rows_t::operator delete(void* p) { |
87 | tokudb::memory::free(p); |
88 | } |
89 | recount_rows_t::recount_rows_t( |
90 | bool user_scheduled, |
91 | THD* thd, |
92 | TOKUDB_SHARE* share, |
93 | DB_TXN* txn) : |
94 | tokudb::background::job_manager_t::job_t(user_scheduled), |
95 | _share(share), |
96 | _result(HA_ADMIN_OK), |
97 | _recount_start(0), |
98 | _total_elapsed_time(0), |
99 | _local_txn(false), |
100 | _rows(0), |
101 | _deleted_rows(0), |
102 | _ticks(0) { |
103 | |
104 | assert_debug(thd != NULL); |
105 | assert_debug(share != NULL); |
106 | |
107 | if (tokudb::sysvars::analyze_in_background(thd)) { |
108 | _thd = NULL; |
109 | _txn = NULL; |
110 | } else { |
111 | _thd = thd; |
112 | _txn = txn; |
113 | } |
114 | |
115 | _throttle = tokudb::sysvars::analyze_throttle(thd); |
116 | |
117 | snprintf(_parameters, |
118 | sizeof(_parameters), |
119 | "TOKUDB_ANALYZE_THROTTLE=%llu;" , |
120 | _throttle); |
121 | _status[0] = '\0'; |
122 | } |
123 | recount_rows_t::~recount_rows_t() { |
124 | } |
125 | void recount_rows_t::on_run() { |
126 | const char* orig_proc_info = NULL; |
127 | if (_thd) |
128 | orig_proc_info = tokudb_thd_get_proc_info(_thd); |
129 | _recount_start = tokudb::time::microsec(); |
130 | _total_elapsed_time = 0; |
131 | |
132 | if (_txn == NULL) { |
133 | _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED); |
134 | |
135 | if (_result != 0) { |
136 | _txn = NULL; |
137 | _result = HA_ADMIN_FAILED; |
138 | goto error; |
139 | } |
140 | _local_txn = true; |
141 | } else { |
142 | _local_txn = false; |
143 | } |
144 | |
145 | _result = |
146 | _share->file->recount_rows( |
147 | _share->file, |
148 | analyze_recount_rows_progress, |
149 | this); |
150 | |
151 | if (_result != 0) { |
152 | if (_local_txn) { |
153 | _txn->abort(_txn); |
154 | _txn = NULL; |
155 | } |
156 | _result = HA_ADMIN_FAILED; |
157 | goto error; |
158 | } |
159 | |
160 | DB_BTREE_STAT64 dict_stats; |
161 | _result = _share->file->stat64(_share->file, _txn, &dict_stats); |
162 | if (_result == 0) { |
163 | _share->set_row_count(dict_stats.bt_ndata, false); |
164 | } |
165 | if (_result != 0) |
166 | _result = HA_ADMIN_FAILED; |
167 | |
168 | if (_local_txn) { |
169 | if (_result == HA_ADMIN_OK) { |
170 | _txn->commit(_txn, 0); |
171 | } else { |
172 | _txn->abort(_txn); |
173 | } |
174 | _txn = NULL; |
175 | } |
176 | |
177 | sql_print_information( |
178 | "tokudb analyze recount rows %d counted %lld" , |
179 | _result, |
180 | _share->row_count()); |
181 | error: |
182 | if(_thd) |
183 | tokudb_thd_set_proc_info(_thd, orig_proc_info); |
184 | return; |
185 | } |
186 | void recount_rows_t::on_destroy() { |
187 | _share->release(); |
188 | } |
189 | const char* recount_rows_t::key() { |
190 | return _share->full_table_name(); |
191 | } |
192 | const char* recount_rows_t::database() { |
193 | return _share->database_name(); |
194 | } |
195 | const char* recount_rows_t::table() { |
196 | return _share->table_name(); |
197 | } |
198 | const char* recount_rows_t::type() { |
199 | static const char* type = "TOKUDB_ANALYZE_MODE_RECOUNT_ROWS" ; |
200 | return type; |
201 | } |
202 | const char* recount_rows_t::parameters() { |
203 | return _parameters; |
204 | } |
205 | const char* recount_rows_t::status() { |
206 | return _status; |
207 | } |
208 | int recount_rows_t::analyze_recount_rows_progress( |
209 | uint64_t count, |
210 | uint64_t deleted, |
211 | void* ) { |
212 | |
213 | recount_rows_t* context = (recount_rows_t*)extra; |
214 | return context->analyze_recount_rows_progress(count, deleted); |
215 | } |
216 | int recount_rows_t::analyze_recount_rows_progress( |
217 | uint64_t count, |
218 | uint64_t deleted) { |
219 | |
220 | _rows = count; |
221 | _deleted_rows += deleted; |
222 | deleted > 0 ? _ticks += deleted : _ticks++; |
223 | |
224 | if (_ticks > 1000) { |
225 | _ticks = 0; |
226 | uint64_t now = tokudb::time::microsec(); |
227 | _total_elapsed_time = now - _recount_start; |
228 | if ((_thd && thd_kill_level(_thd)) || cancelled()) { |
229 | // client killed |
230 | return ER_ABORTING_CONNECTION; |
231 | } |
232 | |
233 | // rebuild status |
234 | // There is a slight race condition here, |
235 | // _status is used here for tokudb_thd_set_proc_info and it is also used |
236 | // for the status column in i_s.background_job_status. |
237 | // If someone happens to be querying/building the i_s table |
238 | // at the exact same time that the status is being rebuilt here, |
239 | // the i_s table could get some garbage status. |
240 | // This solution is a little heavy handed but it works, it prevents us |
241 | // from changing the status while someone might be immediately observing |
242 | // us and it prevents someone from observing us while we change the |
243 | // status |
244 | tokudb::background::_job_manager->lock(); |
245 | snprintf(_status, |
246 | sizeof(_status), |
247 | "recount_rows %s.%s counted %llu rows and %llu deleted " |
248 | "in %llu seconds." , |
249 | _share->database_name(), |
250 | _share->table_name(), |
251 | _rows, |
252 | _deleted_rows, |
253 | _total_elapsed_time / tokudb::time::MICROSECONDS); |
254 | tokudb::background::_job_manager->unlock(); |
255 | |
256 | // report |
257 | if (_thd) |
258 | tokudb_thd_set_proc_info(_thd, _status); |
259 | |
260 | // throttle |
261 | // given the throttle value, lets calculate the maximum number of rows |
262 | // we should have seen so far in a .1 sec resolution |
263 | if (_throttle > 0) { |
264 | uint64_t estimated_rows = _total_elapsed_time / 100000; |
265 | estimated_rows = estimated_rows * (_throttle / 10); |
266 | if (_rows + _deleted_rows > estimated_rows) { |
267 | // sleep for 1/10 of a second |
268 | tokudb::time::sleep_microsec(100000); |
269 | } |
270 | } |
271 | } |
272 | return 0; |
273 | } |
274 | |
275 | class standard_t : public tokudb::background::job_manager_t::job_t { |
276 | public: |
277 | void* operator new(size_t sz); |
278 | void operator delete(void* p); |
279 | |
280 | standard_t(bool user_scheduled, THD* thd, TOKUDB_SHARE* share, DB_TXN* txn); |
281 | |
282 | virtual ~standard_t(); |
283 | |
284 | virtual const char* key(void); |
285 | virtual const char* database(); |
286 | virtual const char* table(); |
287 | virtual const char* type(); |
288 | virtual const char* parameters(); |
289 | virtual const char* status(); |
290 | |
291 | protected: |
292 | virtual void on_run(); |
293 | |
294 | virtual void on_destroy(); |
295 | |
296 | private: |
297 | // to be provided by initiator of analyze |
298 | THD* _thd; |
299 | TOKUDB_SHARE* _share; |
300 | DB_TXN* _txn; |
301 | ulonglong _throttle; // in microseconds |
302 | ulonglong _time_limit; // in microseconds |
303 | double _delete_fraction; |
304 | |
305 | // for analyze status reporting, may also use other state |
306 | char _parameters[256]; |
307 | char _status[1024]; |
308 | int _result; |
309 | ulonglong _analyze_start; // in microseconds |
310 | ulonglong _total_elapsed_time; // in microseconds |
311 | |
312 | // for analyze internal use, pretty much these are per-key/index |
313 | ulonglong _current_key; |
314 | bool _local_txn; |
315 | ulonglong _half_time; |
316 | ulonglong _half_rows; |
317 | ulonglong _rows; |
318 | ulonglong _deleted_rows; |
319 | ulonglong _ticks; |
320 | ulonglong _analyze_key_start; // in microseconds |
321 | ulonglong _key_elapsed_time; // in microseconds |
322 | uint _scan_direction; |
323 | |
324 | static bool analyze_standard_cursor_callback( |
325 | void* , |
326 | uint64_t deleted_rows); |
327 | bool analyze_standard_cursor_callback(uint64_t deleted_rows); |
328 | |
329 | int analyze_key_progress(); |
330 | int analyze_key(uint64_t* rec_per_key_part); |
331 | }; |
332 | |
333 | void* standard_t::operator new(size_t sz) { |
334 | return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE)); |
335 | } |
336 | void standard_t::operator delete(void* p) { |
337 | tokudb::memory::free(p); |
338 | } |
339 | standard_t::standard_t( |
340 | bool user_scheduled, |
341 | THD* thd, |
342 | TOKUDB_SHARE* share, |
343 | DB_TXN* txn) : |
344 | tokudb::background::job_manager_t::job_t(user_scheduled), |
345 | _share(share), |
346 | _result(HA_ADMIN_OK), |
347 | _analyze_start(0), |
348 | _total_elapsed_time(0), |
349 | _current_key(0), |
350 | _local_txn(false), |
351 | _half_time(0), |
352 | _half_rows(0), |
353 | _rows(0), |
354 | _deleted_rows(0), |
355 | _ticks(0), |
356 | _analyze_key_start(0), |
357 | _key_elapsed_time(0), |
358 | _scan_direction(0) { |
359 | |
360 | assert_debug(thd != NULL); |
361 | assert_debug(share != NULL); |
362 | |
363 | if (tokudb::sysvars::analyze_in_background(thd)) { |
364 | _thd = NULL; |
365 | _txn = NULL; |
366 | } else { |
367 | _thd = thd; |
368 | _txn = txn; |
369 | } |
370 | _throttle = tokudb::sysvars::analyze_throttle(thd); |
371 | _time_limit = |
372 | tokudb::sysvars::analyze_time(thd) * tokudb::time::MICROSECONDS; |
373 | _delete_fraction = tokudb::sysvars::analyze_delete_fraction(thd); |
374 | |
375 | snprintf(_parameters, |
376 | sizeof(_parameters), |
377 | "TOKUDB_ANALYZE_DELETE_FRACTION=%f; " |
378 | "TOKUDB_ANALYZE_TIME=%llu; TOKUDB_ANALYZE_THROTTLE=%llu;" , |
379 | _delete_fraction, |
380 | _time_limit / tokudb::time::MICROSECONDS, |
381 | _throttle); |
382 | |
383 | _status[0] = '\0'; |
384 | } |
385 | standard_t::~standard_t() { |
386 | } |
387 | void standard_t::on_run() { |
388 | DB_BTREE_STAT64 stat64; |
389 | uint64_t rec_per_key_part[_share->_max_key_parts]; |
390 | uint64_t total_key_parts = 0; |
391 | const char* orig_proc_info = NULL; |
392 | if (_thd) |
393 | orig_proc_info = tokudb_thd_get_proc_info(_thd); |
394 | |
395 | _analyze_start = tokudb::time::microsec(); |
396 | _half_time = _time_limit > 0 ? _time_limit/2 : 0; |
397 | |
398 | if (_txn == NULL) { |
399 | _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED); |
400 | |
401 | if (_result != 0) { |
402 | _txn = NULL; |
403 | _result = HA_ADMIN_FAILED; |
404 | goto error; |
405 | } |
406 | _local_txn = true; |
407 | } else { |
408 | _local_txn = false; |
409 | } |
410 | |
411 | assert_always(_share->key_file[0] != NULL); |
412 | _result = _share->key_file[0]->stat64(_share->key_file[0], _txn, &stat64); |
413 | if (_result != 0) { |
414 | _result = HA_ADMIN_FAILED; |
415 | goto cleanup; |
416 | } |
417 | _half_rows = stat64.bt_ndata / 2; |
418 | |
419 | for (ulonglong current_key = 0; |
420 | _result == HA_ADMIN_OK && current_key < _share->_keys; |
421 | current_key++) { |
422 | |
423 | _current_key = current_key; |
424 | _rows = _deleted_rows = _ticks = 0; |
425 | _result = analyze_key(&rec_per_key_part[total_key_parts]); |
426 | |
427 | if ((_result != 0 && _result != ETIME) || |
428 | (_result != 0 && _rows == 0 && _deleted_rows > 0)) { |
429 | _result = HA_ADMIN_FAILED; |
430 | } |
431 | if (_thd && (_result == HA_ADMIN_FAILED || |
432 | static_cast<double>(_deleted_rows) > |
433 | _delete_fraction * (_rows + _deleted_rows))) { |
434 | |
435 | char name[256]; int namelen; |
436 | namelen = |
437 | snprintf( |
438 | name, |
439 | sizeof(name), |
440 | "%s.%s.%s" , |
441 | _share->database_name(), |
442 | _share->table_name(), |
443 | _share->_key_descriptors[_current_key]._name); |
444 | _thd->protocol->prepare_for_resend(); |
445 | _thd->protocol->store(name, namelen, system_charset_info); |
446 | _thd->protocol->store("analyze" , 7, system_charset_info); |
447 | _thd->protocol->store("info" , 4, system_charset_info); |
448 | char rowmsg[256]; |
449 | int rowmsglen; |
450 | rowmsglen = |
451 | snprintf( |
452 | rowmsg, |
453 | sizeof(rowmsg), |
454 | "rows processed %llu rows deleted %llu" , |
455 | _rows, |
456 | _deleted_rows); |
457 | _thd->protocol->store(rowmsg, rowmsglen, system_charset_info); |
458 | _thd->protocol->write(); |
459 | |
460 | sql_print_information( |
461 | "tokudb analyze on %.*s %.*s" , |
462 | namelen, |
463 | name, |
464 | rowmsglen, |
465 | rowmsg); |
466 | } |
467 | |
468 | total_key_parts += _share->_key_descriptors[_current_key]._parts; |
469 | } |
470 | if (_result == HA_ADMIN_OK) { |
471 | int error = |
472 | tokudb::set_card_in_status( |
473 | _share->status_block, |
474 | _txn, |
475 | total_key_parts, |
476 | rec_per_key_part); |
477 | if (error) |
478 | _result = HA_ADMIN_FAILED; |
479 | |
480 | _share->lock(); |
481 | _share->update_cardinality_counts(total_key_parts, rec_per_key_part); |
482 | _share->allow_auto_analysis(true); |
483 | _share->unlock(); |
484 | } |
485 | |
486 | cleanup: |
487 | if (_local_txn) { |
488 | if (_result == HA_ADMIN_OK) { |
489 | _txn->commit(_txn, 0); |
490 | } else { |
491 | _txn->abort(_txn); |
492 | } |
493 | _txn = NULL; |
494 | } |
495 | |
496 | error: |
497 | if (_thd) |
498 | tokudb_thd_set_proc_info(_thd, orig_proc_info); |
499 | return; |
500 | } |
501 | void standard_t::on_destroy() { |
502 | _share->lock(); |
503 | _share->allow_auto_analysis(false); |
504 | _share->unlock(); |
505 | _share->release(); |
506 | } |
507 | const char* standard_t::key() { |
508 | return _share->full_table_name(); |
509 | } |
510 | const char* standard_t::database() { |
511 | return _share->database_name(); |
512 | } |
513 | const char* standard_t::table() { |
514 | return _share->table_name(); |
515 | } |
516 | const char* standard_t::type() { |
517 | static const char* type = "TOKUDB_ANALYZE_MODE_STANDARD" ; |
518 | return type; |
519 | } |
520 | const char* standard_t::parameters() { |
521 | return _parameters; |
522 | } |
523 | const char* standard_t::status() { |
524 | return _status; |
525 | } |
526 | bool standard_t::analyze_standard_cursor_callback( |
527 | void* , |
528 | uint64_t deleted_rows) { |
529 | standard_t* context = (standard_t*)extra; |
530 | return context->analyze_standard_cursor_callback(deleted_rows); |
531 | } |
532 | bool standard_t::analyze_standard_cursor_callback(uint64_t deleted_rows) { |
533 | _deleted_rows += deleted_rows; |
534 | _ticks += deleted_rows; |
535 | return analyze_key_progress() != 0; |
536 | } |
537 | int standard_t::analyze_key_progress(void) { |
538 | if (_ticks > 1000) { |
539 | _ticks = 0; |
540 | uint64_t now = tokudb::time::microsec(); |
541 | _total_elapsed_time = now - _analyze_start; |
542 | _key_elapsed_time = now - _analyze_key_start; |
543 | if ((_thd && thd_kill_level(_thd)) || cancelled()) { |
544 | // client killed |
545 | return ER_ABORTING_CONNECTION; |
546 | } else if (_time_limit > 0 && |
547 | static_cast<uint64_t>(_key_elapsed_time) > _time_limit) { |
548 | // time limit reached |
549 | return ETIME; |
550 | } |
551 | |
552 | // rebuild status |
553 | // There is a slight race condition here, |
554 | // _status is used here for tokudb_thd_set_proc_info and it is also used |
555 | // for the status column in i_s.background_job_status. |
556 | // If someone happens to be querying/building the i_s table |
557 | // at the exact same time that the status is being rebuilt here, |
558 | // the i_s table could get some garbage status. |
559 | // This solution is a little heavy handed but it works, it prevents us |
560 | // from changing the status while someone might be immediately observing |
561 | // us and it prevents someone from observing us while we change the |
562 | // status. |
563 | static const char* scan_direction_str[] = {"not scanning" , |
564 | "scanning forward" , |
565 | "scanning backward" , |
566 | "scan unknown" }; |
567 | |
568 | const char* scan_direction = NULL; |
569 | switch (_scan_direction) { |
570 | case 0: |
571 | scan_direction = scan_direction_str[0]; |
572 | break; |
573 | case DB_NEXT: |
574 | scan_direction = scan_direction_str[1]; |
575 | break; |
576 | case DB_PREV: |
577 | scan_direction = scan_direction_str[2]; |
578 | break; |
579 | default: |
580 | scan_direction = scan_direction_str[3]; |
581 | break; |
582 | } |
583 | |
584 | float progress_rows = 0.0; |
585 | if (_share->row_count() > 0) |
586 | progress_rows = static_cast<float>(_rows) / |
587 | static_cast<float>(_share->row_count()); |
588 | float progress_time = 0.0; |
589 | if (_time_limit > 0) |
590 | progress_time = static_cast<float>(_key_elapsed_time) / |
591 | static_cast<float>(_time_limit); |
592 | tokudb::background::_job_manager->lock(); |
593 | snprintf( |
594 | _status, |
595 | sizeof(_status), |
596 | "analyze table standard %s.%s.%s %llu of %u %.lf%% rows %.lf%% " |
597 | "time, %s" , |
598 | _share->database_name(), |
599 | _share->table_name(), |
600 | _share->_key_descriptors[_current_key]._name, |
601 | _current_key, |
602 | _share->_keys, |
603 | progress_rows * 100.0, |
604 | progress_time * 100.0, |
605 | scan_direction); |
606 | tokudb::background::_job_manager->unlock(); |
607 | |
608 | // report |
609 | if (_thd) |
610 | tokudb_thd_set_proc_info(_thd, _status); |
611 | |
612 | // throttle |
613 | // given the throttle value, lets calculate the maximum number of rows |
614 | // we should have seen so far in a .1 sec resolution |
615 | if (_throttle > 0) { |
616 | uint64_t estimated_rows = _key_elapsed_time / 100000; |
617 | estimated_rows = estimated_rows * (_throttle / 10); |
618 | if (_rows + _deleted_rows > estimated_rows) { |
619 | // sleep for 1/10 of a second |
620 | tokudb::time::sleep_microsec(100000); |
621 | } |
622 | } |
623 | } |
624 | return 0; |
625 | } |
626 | int standard_t::analyze_key(uint64_t* rec_per_key_part) { |
627 | int error = 0; |
628 | DB* db = _share->key_file[_current_key]; |
629 | assert_always(db != NULL); |
630 | uint64_t num_key_parts = _share->_key_descriptors[_current_key]._parts; |
631 | uint64_t unique_rows[num_key_parts]; |
632 | bool is_unique = _share->_key_descriptors[_current_key]._is_unique; |
633 | DBC* cursor = NULL; |
634 | int close_error = 0; |
635 | DBT key, prev_key; |
636 | bool copy_key = false; |
637 | |
638 | _analyze_key_start = tokudb::time::microsec(); |
639 | _key_elapsed_time = 0; |
640 | _scan_direction = DB_NEXT; |
641 | |
642 | if (is_unique && num_key_parts == 1) { |
643 | // don't compute for unique keys with a single part. we already know |
644 | // the answer. |
645 | _rows = unique_rows[0] = 1; |
646 | goto done; |
647 | } |
648 | |
649 | for (uint64_t i = 0; i < num_key_parts; i++) |
650 | unique_rows[i] = 1; |
651 | |
652 | // stop looking when the entire dictionary was analyzed, or a |
653 | // cap on execution time was reached, or the analyze was killed. |
654 | while (1) { |
655 | if (cursor == NULL) { |
656 | error = db->cursor(db, _txn, &cursor, 0); |
657 | if (error != 0) |
658 | goto done; |
659 | |
660 | cursor->c_set_check_interrupt_callback( |
661 | cursor, |
662 | analyze_standard_cursor_callback, |
663 | this); |
664 | |
665 | memset(&key, 0, sizeof(DBT)); |
666 | memset(&prev_key, 0, sizeof(DBT)); |
667 | copy_key = true; |
668 | } |
669 | |
670 | error = cursor->c_get(cursor, &key, 0, _scan_direction); |
671 | if (error != 0) { |
672 | if (error == DB_NOTFOUND || error == TOKUDB_INTERRUPTED) |
673 | error = 0; // not an error |
674 | break; |
675 | } else if (cancelled()) { |
676 | error = ER_ABORTING_CONNECTION; |
677 | break; |
678 | } |
679 | |
680 | _rows++; |
681 | _ticks++; |
682 | |
683 | // if copy_key is false at this pont, we have some value sitting in |
684 | // prev_key that we can compare to |
685 | // if the comparison reveals a unique key, we must set copy_key to true |
686 | // so the code following can copy he current key into prev_key for the |
687 | // next iteration |
688 | if (copy_key == false) { |
689 | // compare this key with the previous key. ignore |
690 | // appended PK for SK's. |
691 | // TODO if a prefix is different, then all larger keys |
692 | // that include the prefix are also different. |
693 | // TODO if we are comparing the entire primary key or |
694 | // the entire unique secondary key, then the cardinality |
695 | // must be 1, so we can avoid computing it. |
696 | for (uint64_t i = 0; i < num_key_parts; i++) { |
697 | int cmp = tokudb_cmp_dbt_key_parts(db, &prev_key, &key, i+1); |
698 | if (cmp != 0) { |
699 | unique_rows[i]++; |
700 | copy_key = true; |
701 | } |
702 | } |
703 | } |
704 | |
705 | // prev_key = key or prev_key is NULL |
706 | if (copy_key) { |
707 | prev_key.data = |
708 | tokudb::memory::realloc( |
709 | prev_key.data, |
710 | key.size, |
711 | MYF(MY_WME|MY_ZEROFILL|MY_FAE)); |
712 | assert_always(prev_key.data); |
713 | prev_key.size = key.size; |
714 | memcpy(prev_key.data, key.data, prev_key.size); |
715 | copy_key = false; |
716 | } |
717 | |
718 | error = analyze_key_progress(); |
719 | if (error == ETIME) { |
720 | error = 0; |
721 | break; |
722 | } else if (error) { |
723 | break; |
724 | } |
725 | |
726 | // if we have a time limit, are scanning forward and have exceed the |
727 | // _half_time and not passed the _half_rows number of the rows in the |
728 | // index: clean up the keys, close the cursor and reverse direction. |
729 | if (TOKUDB_UNLIKELY(_half_time > 0 && |
730 | _scan_direction == DB_NEXT && |
731 | _key_elapsed_time >= _half_time && |
732 | _rows < _half_rows)) { |
733 | |
734 | tokudb::memory::free(prev_key.data); prev_key.data = NULL; |
735 | close_error = cursor->c_close(cursor); |
736 | assert_always(close_error == 0); |
737 | cursor = NULL; |
738 | _scan_direction = DB_PREV; |
739 | } |
740 | } |
741 | // cleanup |
742 | if (prev_key.data) tokudb::memory::free(prev_key.data); |
743 | if (cursor) close_error = cursor->c_close(cursor); |
744 | assert_always(close_error == 0); |
745 | |
746 | done: |
747 | // in case we timed out (bunch of deleted records) without hitting a |
748 | // single row |
749 | if (_rows == 0) |
750 | _rows = 1; |
751 | |
752 | // return cardinality |
753 | for (uint64_t i = 0; i < num_key_parts; i++) { |
754 | rec_per_key_part[i] = _rows / unique_rows[i]; |
755 | } |
756 | return error; |
757 | } |
758 | |
759 | } // namespace analyze |
760 | } // namespace tokudb |
761 | |
762 | |
763 | int ha_tokudb::analyze(THD *thd, HA_CHECK_OPT *check_opt) { |
764 | TOKUDB_HANDLER_DBUG_ENTER("%s" , share->table_name()); |
765 | int result = HA_ADMIN_OK; |
766 | tokudb::sysvars::analyze_mode_t mode = tokudb::sysvars::analyze_mode(thd); |
767 | |
768 | switch (mode) { |
769 | case tokudb::sysvars::TOKUDB_ANALYZE_RECOUNT_ROWS: |
770 | result = share->analyze_recount_rows(thd, transaction); |
771 | break; |
772 | case tokudb::sysvars::TOKUDB_ANALYZE_STANDARD: |
773 | share->lock(); |
774 | result = share->analyze_standard(thd, transaction); |
775 | share->unlock(); |
776 | break; |
777 | case tokudb::sysvars::TOKUDB_ANALYZE_CANCEL: |
778 | share->cancel_background_jobs(); |
779 | break; |
780 | default: |
781 | break; // no-op |
782 | } |
783 | TOKUDB_HANDLER_DBUG_RETURN(result); |
784 | } |
785 | |
786 | int TOKUDB_SHARE::analyze_recount_rows(THD* thd,DB_TXN* txn) { |
787 | TOKUDB_HANDLER_DBUG_ENTER("%s" , table_name()); |
788 | |
789 | assert_always(thd != NULL); |
790 | |
791 | int result = HA_ADMIN_OK; |
792 | |
793 | tokudb::analyze::recount_rows_t* job |
794 | = new tokudb::analyze::recount_rows_t(true, thd, this, txn); |
795 | assert_always(job != NULL); |
796 | |
797 | // job->destroy will drop the ref |
798 | addref(); |
799 | unlock(); |
800 | |
801 | bool ret = tokudb::background::_job_manager-> |
802 | run_job(job, tokudb::sysvars::analyze_in_background(thd)); |
803 | |
804 | if (!ret) { |
805 | job->destroy(); |
806 | delete job; |
807 | result = HA_ADMIN_FAILED; |
808 | } |
809 | |
810 | TOKUDB_HANDLER_DBUG_RETURN(result); |
811 | } |
812 | |
813 | // on entry, if txn is !NULL, it is a user session invoking ANALYZE directly |
814 | // and no lock will be held on 'this', else if txn is NULL it is an auto and |
815 | // 'this' will be locked. |
816 | int TOKUDB_SHARE::analyze_standard(THD* thd, DB_TXN* txn) { |
817 | TOKUDB_HANDLER_DBUG_ENTER("%s" , table_name()); |
818 | |
819 | assert_always(thd != NULL); |
820 | assert_debug(_mutex.is_owned_by_me() == true); |
821 | |
822 | int result = HA_ADMIN_OK; |
823 | |
824 | // stub out analyze if optimize is remapped to alter recreate + analyze |
825 | // when not auto analyze or if this is an alter |
826 | if ((txn && |
827 | thd_sql_command(thd) != SQLCOM_ANALYZE && |
828 | thd_sql_command(thd) != SQLCOM_ALTER_TABLE) || |
829 | thd_sql_command(thd) == SQLCOM_ALTER_TABLE) { |
830 | TOKUDB_HANDLER_DBUG_RETURN(result); |
831 | } |
832 | |
833 | tokudb::analyze::standard_t* job |
834 | = new tokudb::analyze::standard_t(txn == NULL ? false : true, thd, |
835 | this, txn); |
836 | assert_always(job != NULL); |
837 | |
838 | // akin to calling addref, but we know, right here, right now, everything |
839 | // in the share is set up, files open, etc... |
840 | // job->destroy will drop the ref |
841 | _use_count++; |
842 | |
843 | // don't want any autos kicking off while we are analyzing |
844 | disallow_auto_analysis(); |
845 | |
846 | unlock(); |
847 | |
848 | bool ret = |
849 | tokudb::background::_job_manager->run_job( |
850 | job, |
851 | tokudb::sysvars::analyze_in_background(thd)); |
852 | |
853 | if (!ret) { |
854 | job->destroy(); |
855 | delete job; |
856 | result = HA_ADMIN_FAILED; |
857 | } |
858 | |
859 | lock(); |
860 | |
861 | TOKUDB_HANDLER_DBUG_RETURN(result); |
862 | } |
863 | |
864 | |
865 | typedef struct hot_optimize_context { |
866 | THD* thd; |
867 | char* write_status_msg; |
868 | ha_tokudb* ha; |
869 | uint progress_stage; |
870 | uint current_table; |
871 | uint num_tables; |
872 | float progress_limit; |
873 | uint64_t progress_last_time; |
874 | uint64_t throttle; |
875 | } *HOT_OPTIMIZE_CONTEXT; |
876 | |
877 | static int hot_optimize_progress_fun(void *, float progress) { |
878 | HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra; |
879 | if (thd_kill_level(context->thd)) { |
880 | sprintf( |
881 | context->write_status_msg, |
882 | "The process has been killed, aborting hot optimize." ); |
883 | return ER_ABORTING_CONNECTION; |
884 | } |
885 | float percentage = progress * 100; |
886 | sprintf( |
887 | context->write_status_msg, |
888 | "Optimization of index %u of %u about %.lf%% done" , |
889 | context->current_table + 1, |
890 | context->num_tables, |
891 | percentage); |
892 | thd_proc_info(context->thd, context->write_status_msg); |
893 | #ifdef HA_TOKUDB_HAS_THD_PROGRESS |
894 | if (context->progress_stage < context->current_table) { |
895 | // the progress stage is behind the current table, so move up |
896 | // to the next stage and set the progress stage to current. |
897 | thd_progress_next_stage(context->thd); |
898 | context->progress_stage = context->current_table; |
899 | } |
900 | // the percentage we report here is for the current stage/db |
901 | thd_progress_report(context->thd, (unsigned long long) percentage, 100); |
902 | #endif |
903 | |
904 | // throttle the optimize table |
905 | if (context->throttle) { |
906 | uint64_t time_now = toku_current_time_microsec(); |
907 | uint64_t dt = time_now - context->progress_last_time; |
908 | uint64_t throttle_time = 1000000ULL / context->throttle; |
909 | if (throttle_time > dt) { |
910 | usleep(throttle_time - dt); |
911 | } |
912 | context->progress_last_time = toku_current_time_microsec(); |
913 | } |
914 | |
915 | // return 1 if progress has reach the progress limit |
916 | return progress >= context->progress_limit; |
917 | } |
918 | |
919 | // flatten all DB's in this table, to do so, peform hot optimize on each db |
920 | int ha_tokudb::do_optimize(THD* thd) { |
921 | TOKUDB_HANDLER_DBUG_ENTER("%s" , share->table_name()); |
922 | int error = 0; |
923 | const char* orig_proc_info = tokudb_thd_get_proc_info(thd); |
924 | uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key); |
925 | |
926 | #ifdef HA_TOKUDB_HAS_THD_PROGRESS |
927 | // each DB is its own stage. as HOT goes through each db, we'll |
928 | // move on to the next stage. |
929 | thd_progress_init(thd, curr_num_DBs); |
930 | #endif |
931 | |
932 | // for each DB, run optimize and hot_optimize |
933 | for (uint i = 0; i < curr_num_DBs; i++) { |
934 | // only optimize the index if it matches the optimize_index_name |
935 | // session variable |
936 | const char* optimize_index_name = |
937 | tokudb::sysvars::optimize_index_name(thd); |
938 | if (optimize_index_name) { |
939 | const char* this_index_name = |
940 | i >= table_share->keys ? |
941 | "primary" : |
942 | table_share->key_info[i].name.str; |
943 | if (strcasecmp(optimize_index_name, this_index_name) != 0) { |
944 | continue; |
945 | } |
946 | } |
947 | |
948 | DB* db = share->key_file[i]; |
949 | assert_always(db != NULL); |
950 | error = db->optimize(db); |
951 | if (error) { |
952 | goto cleanup; |
953 | } |
954 | |
955 | struct hot_optimize_context hc; |
956 | memset(&hc, 0, sizeof hc); |
957 | hc.thd = thd; |
958 | hc.write_status_msg = this->write_status_msg; |
959 | hc.ha = this; |
960 | hc.current_table = i; |
961 | hc.num_tables = curr_num_DBs; |
962 | hc.progress_limit = tokudb::sysvars::optimize_index_fraction(thd); |
963 | hc.progress_last_time = toku_current_time_microsec(); |
964 | hc.throttle = tokudb::sysvars::optimize_throttle(thd); |
965 | uint64_t loops_run; |
966 | error = |
967 | db->hot_optimize( |
968 | db, |
969 | NULL, |
970 | NULL, |
971 | hot_optimize_progress_fun, |
972 | &hc, |
973 | &loops_run); |
974 | if (error) { |
975 | goto cleanup; |
976 | } |
977 | } |
978 | error = 0; |
979 | |
980 | cleanup: |
981 | #ifdef HA_TOKUDB_HAS_THD_PROGRESS |
982 | thd_progress_end(thd); |
983 | #endif |
984 | thd_proc_info(thd, orig_proc_info); |
985 | TOKUDB_HANDLER_DBUG_RETURN(error); |
986 | } |
987 | |
988 | int ha_tokudb::optimize(THD* thd, HA_CHECK_OPT* check_opt) { |
989 | TOKUDB_HANDLER_DBUG_ENTER("%s" , share->table_name()); |
990 | int error; |
991 | #if TOKU_OPTIMIZE_WITH_RECREATE |
992 | error = HA_ADMIN_TRY_ALTER; |
993 | #else |
994 | error = do_optimize(thd); |
995 | #endif |
996 | TOKUDB_HANDLER_DBUG_RETURN(error); |
997 | } |
998 | |
// Context object handed (as an opaque pointer) to the verify progress
// callback ha_tokudb_check_progress, which casts it back to this type.
struct check_context {
    // session whose kill level the progress callback inspects
    THD* thd;
};
1002 | |
1003 | static int ha_tokudb_check_progress(void* , float progress) { |
1004 | struct check_context* context = (struct check_context*)extra; |
1005 | int result = 0; |
1006 | if (thd_kill_level(context->thd)) |
1007 | result = ER_ABORTING_CONNECTION; |
1008 | return result; |
1009 | } |
1010 | |
1011 | static void ha_tokudb_check_info(THD* thd, TABLE* table, const char* msg) { |
1012 | if (thd->vio_ok()) { |
1013 | char tablename[ |
1014 | table->s->db.length + 1 + |
1015 | table->s->table_name.length + 1]; |
1016 | snprintf( |
1017 | tablename, |
1018 | sizeof(tablename), |
1019 | "%.*s.%.*s" , |
1020 | (int)table->s->db.length, |
1021 | table->s->db.str, |
1022 | (int)table->s->table_name.length, |
1023 | table->s->table_name.str); |
1024 | thd->protocol->prepare_for_resend(); |
1025 | thd->protocol->store(tablename, strlen(tablename), system_charset_info); |
1026 | thd->protocol->store("check" , 5, system_charset_info); |
1027 | thd->protocol->store("info" , 4, system_charset_info); |
1028 | thd->protocol->store(msg, strlen(msg), system_charset_info); |
1029 | thd->protocol->write(); |
1030 | } |
1031 | } |
1032 | |
1033 | int ha_tokudb::check(THD* thd, HA_CHECK_OPT* check_opt) { |
1034 | TOKUDB_HANDLER_DBUG_ENTER("%s" , share->table_name()); |
1035 | const char* orig_proc_info = tokudb_thd_get_proc_info(thd); |
1036 | int result = HA_ADMIN_OK; |
1037 | int r; |
1038 | |
1039 | int keep_going = 1; |
1040 | if (check_opt->flags & T_QUICK) { |
1041 | keep_going = 0; |
1042 | } |
1043 | if (check_opt->flags & T_EXTEND) { |
1044 | keep_going = 1; |
1045 | } |
1046 | |
1047 | r = acquire_table_lock(transaction, lock_write); |
1048 | if (r != 0) |
1049 | result = HA_ADMIN_INTERNAL_ERROR; |
1050 | if (result == HA_ADMIN_OK) { |
1051 | uint32_t num_DBs = table_share->keys + tokudb_test(hidden_primary_key); |
1052 | snprintf( |
1053 | write_status_msg, |
1054 | sizeof(write_status_msg), |
1055 | "%s primary=%d num=%d" , |
1056 | share->table_name(), |
1057 | primary_key, |
1058 | num_DBs); |
1059 | if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) { |
1060 | ha_tokudb_check_info(thd, table, write_status_msg); |
1061 | time_t now = time(0); |
1062 | char timebuf[32]; |
1063 | TOKUDB_HANDLER_TRACE( |
1064 | "%.24s %s" , |
1065 | ctime_r(&now, timebuf), |
1066 | write_status_msg); |
1067 | } |
1068 | for (uint i = 0; i < num_DBs; i++) { |
1069 | DB* db = share->key_file[i]; |
1070 | assert_always(db != NULL); |
1071 | const char* kname = |
1072 | i == primary_key ? "primary" : table_share->key_info[i].name.str; |
1073 | snprintf( |
1074 | write_status_msg, |
1075 | sizeof(write_status_msg), |
1076 | "%s key=%s %u" , |
1077 | share->table_name(), |
1078 | kname, |
1079 | i); |
1080 | thd_proc_info(thd, write_status_msg); |
1081 | if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) { |
1082 | ha_tokudb_check_info(thd, table, write_status_msg); |
1083 | time_t now = time(0); |
1084 | char timebuf[32]; |
1085 | TOKUDB_HANDLER_TRACE( |
1086 | "%.24s %s" , |
1087 | ctime_r(&now, timebuf), |
1088 | write_status_msg); |
1089 | } |
1090 | struct check_context check_context = { thd }; |
1091 | r = db->verify_with_progress( |
1092 | db, |
1093 | ha_tokudb_check_progress, |
1094 | &check_context, |
1095 | (tokudb::sysvars::debug & TOKUDB_DEBUG_CHECK) != 0, |
1096 | keep_going); |
1097 | if (r != 0) { |
1098 | char msg[32 + strlen(kname)]; |
1099 | sprintf(msg, "Corrupt %s" , kname); |
1100 | ha_tokudb_check_info(thd, table, msg); |
1101 | } |
1102 | snprintf( |
1103 | write_status_msg, |
1104 | sizeof(write_status_msg), |
1105 | "%s key=%s %u result=%d" , |
1106 | share->full_table_name(), |
1107 | kname, |
1108 | i, |
1109 | r); |
1110 | thd_proc_info(thd, write_status_msg); |
1111 | if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) { |
1112 | ha_tokudb_check_info(thd, table, write_status_msg); |
1113 | time_t now = time(0); |
1114 | char timebuf[32]; |
1115 | TOKUDB_HANDLER_TRACE( |
1116 | "%.24s %s" , |
1117 | ctime_r(&now, timebuf), |
1118 | write_status_msg); |
1119 | } |
1120 | if (result == HA_ADMIN_OK && r != 0) { |
1121 | result = HA_ADMIN_CORRUPT; |
1122 | if (!keep_going) |
1123 | break; |
1124 | } |
1125 | } |
1126 | } |
1127 | thd_proc_info(thd, orig_proc_info); |
1128 | TOKUDB_HANDLER_DBUG_RETURN(result); |
1129 | } |
1130 | |