1/*
2 Copyright (c) 2015, Facebook, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17#ifdef _WIN32
18#define _CRT_RAND_S
19#endif
20#include <my_global.h>
21#ifdef _WIN32
22#include <stdlib.h>
23#define rand_r rand_s
24#endif
25/* This C++ file's header file */
26#include "./properties_collector.h"
27
28/* Standard C++ header files */
29#include <algorithm>
30#include <map>
31#include <string>
32#include <vector>
33
34/* MySQL header files */
35#include "./log.h"
36#include "./my_stacktrace.h"
37#include "./sql_array.h"
38
39/* MyRocks header files */
40#include "./rdb_datadic.h"
41#include "./rdb_utils.h"
42
43namespace myrocks {
44
/*
  Cumulative per-entry-type counters across all SST files written so far.
  They are std::atomic because Finish() may be invoked concurrently from
  multiple flush/compaction threads (presumably exported as server status
  variables elsewhere — see ha_rocksdb; verify).
*/
std::atomic<uint64_t> rocksdb_num_sst_entry_put(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_delete(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_singledelete(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_merge(0);
std::atomic<uint64_t> rocksdb_num_sst_entry_other(0);
// When true, SingleDelete entries also count toward the sliding-window
// deletion tracking in Rdb_tbl_prop_coll::AdjustDeletedRows().
my_bool rocksdb_compaction_sequential_deletes_count_sd = false;
51
/*
  Property collector for a single SST file being written for column
  family cf_id.

  @param ddl_manager  used to look up key definitions; must not be nullptr
  @param params       compaction-trigger parameters (sliding-window size,
                      deletion threshold, minimum file size)
  @param cf_id        id of the column family this SST belongs to
  @param table_stats_sampling_pct
                      percentage of keys sampled for cardinality stats
                      (0 means every key is used — see Rdb_tbl_card_coll)
*/
Rdb_tbl_prop_coll::Rdb_tbl_prop_coll(Rdb_ddl_manager *const ddl_manager,
                                     const Rdb_compact_params &params,
                                     const uint32_t &cf_id,
                                     const uint8_t &table_stats_sampling_pct)
    : m_cf_id(cf_id), m_ddl_manager(ddl_manager), m_last_stats(nullptr),
      m_rows(0l), m_window_pos(0l), m_deleted_rows(0l), m_max_deleted_rows(0l),
      m_file_size(0), m_params(params),
      m_cardinality_collector(table_stats_sampling_pct) {
  DBUG_ASSERT(ddl_manager != nullptr);

  // Pre-size the circular "was it a delete?" window; all slots start false.
  m_deleted_rows_window.resize(m_params.m_window, false);
}
64
65/*
66 This function is called by RocksDB for every key in the SST file
67*/
68rocksdb::Status Rdb_tbl_prop_coll::AddUserKey(const rocksdb::Slice &key,
69 const rocksdb::Slice &value,
70 rocksdb::EntryType type,
71 rocksdb::SequenceNumber seq,
72 uint64_t file_size) {
73 if (key.size() >= 4) {
74 AdjustDeletedRows(type);
75
76 m_rows++;
77
78 CollectStatsForRow(key, value, type, file_size);
79 }
80
81 return rocksdb::Status::OK();
82}
83
84void Rdb_tbl_prop_coll::AdjustDeletedRows(rocksdb::EntryType type) {
85 if (m_params.m_window > 0) {
86 // record the "is deleted" flag into the sliding window
87 // the sliding window is implemented as a circular buffer
88 // in m_deleted_rows_window vector
89 // the current position in the circular buffer is pointed at by
90 // m_rows % m_deleted_rows_window.size()
91 // m_deleted_rows is the current number of 1's in the vector
92 // --update the counter for the element which will be overridden
93 const bool is_delete = (type == rocksdb::kEntryDelete ||
94 (type == rocksdb::kEntrySingleDelete &&
95 rocksdb_compaction_sequential_deletes_count_sd));
96
97 // Only make changes if the value at the current position needs to change
98 if (is_delete != m_deleted_rows_window[m_window_pos]) {
99 // Set or clear the flag at the current position as appropriate
100 m_deleted_rows_window[m_window_pos] = is_delete;
101 if (!is_delete) {
102 m_deleted_rows--;
103 } else if (++m_deleted_rows > m_max_deleted_rows) {
104 m_max_deleted_rows = m_deleted_rows;
105 }
106 }
107
108 if (++m_window_pos == m_params.m_window) {
109 m_window_pos = 0;
110 }
111 }
112}
113
/*
  Returns the Rdb_index_stats slot for the index that `key` belongs to,
  creating a new slot (and looking up the key definition) when the key
  starts a new index.
*/
Rdb_index_stats *Rdb_tbl_prop_coll::AccessStats(const rocksdb::Slice &key) {
  // The first 4 bytes of the key are the big-endian index number; combined
  // with the column family id they globally identify the index.
  GL_INDEX_ID gl_index_id;
  gl_index_id.cf_id = m_cf_id;
  gl_index_id.index_id = rdb_netbuf_to_uint32(reinterpret_cast<const uchar*>(key.data()));

  // Keys are expected to arrive grouped by index, so cache the slot of the
  // last-seen index and only switch when the index id changes.
  if (m_last_stats == nullptr || m_last_stats->m_gl_index_id != gl_index_id) {
    m_keydef = nullptr;

    // starting a new table
    // add the new element into m_stats
    m_stats.emplace_back(gl_index_id);
    m_last_stats = &m_stats.back();

    if (m_ddl_manager) {
      // safe_find() returns a std::shared_ptr<Rdb_key_def>; holding it in
      // m_keydef keeps the definition alive while we collect stats.
      // NOTE(review): an earlier comment here said the mutex acquired by
      // safe_find() must be released via unblock_setup(), but no such call
      // appears in this function — presumably stale; verify against the
      // current Rdb_ddl_manager::safe_find() contract.
      m_keydef = m_ddl_manager->safe_find(gl_index_id);
      if (m_keydef != nullptr) {
        // Resize the per-prefix cardinality array to the number of key
        // parts. New elements are zero-initialized.
        m_last_stats->m_distinct_keys_per_prefix.resize(
            m_keydef->get_key_parts());
        m_last_stats->m_name = m_keydef->get_name();
      }
    }
    // New index: the cardinality collector must forget the previous key.
    m_cardinality_collector.Reset();
  }

  return m_last_stats;
}
149
150void Rdb_tbl_prop_coll::CollectStatsForRow(const rocksdb::Slice &key,
151 const rocksdb::Slice &value,
152 const rocksdb::EntryType &type,
153 const uint64_t &file_size) {
154 auto stats = AccessStats(key);
155
156 stats->m_data_size += key.size() + value.size();
157
158 // Incrementing per-index entry-type statistics
159 switch (type) {
160 case rocksdb::kEntryPut:
161 stats->m_rows++;
162 break;
163 case rocksdb::kEntryDelete:
164 stats->m_entry_deletes++;
165 break;
166 case rocksdb::kEntrySingleDelete:
167 stats->m_entry_single_deletes++;
168 break;
169 case rocksdb::kEntryMerge:
170 stats->m_entry_merges++;
171 break;
172 case rocksdb::kEntryOther:
173 stats->m_entry_others++;
174 break;
175 default:
176 // NO_LINT_DEBUG
177 sql_print_error("RocksDB: Unexpected entry type found: %u. "
178 "This should not happen so aborting the system.",
179 type);
180 abort();
181 break;
182 }
183
184 stats->m_actual_disk_size += file_size - m_file_size;
185 m_file_size = file_size;
186
187 if (m_keydef != nullptr) {
188 m_cardinality_collector.ProcessKey(key, m_keydef.get(), stats);
189 }
190}
191
192const char *Rdb_tbl_prop_coll::INDEXSTATS_KEY = "__indexstats__";
193
194/*
195 This function is called by RocksDB to compute properties to store in sst file
196*/
197rocksdb::Status
198Rdb_tbl_prop_coll::Finish(rocksdb::UserCollectedProperties *const properties) {
199 uint64_t num_sst_entry_put = 0;
200 uint64_t num_sst_entry_delete = 0;
201 uint64_t num_sst_entry_singledelete = 0;
202 uint64_t num_sst_entry_merge = 0;
203 uint64_t num_sst_entry_other = 0;
204
205 DBUG_ASSERT(properties != nullptr);
206
207 for (auto it = m_stats.begin(); it != m_stats.end(); it++) {
208 num_sst_entry_put += it->m_rows;
209 num_sst_entry_delete += it->m_entry_deletes;
210 num_sst_entry_singledelete += it->m_entry_single_deletes;
211 num_sst_entry_merge += it->m_entry_merges;
212 num_sst_entry_other += it->m_entry_others;
213 }
214
215 if (num_sst_entry_put > 0) {
216 rocksdb_num_sst_entry_put += num_sst_entry_put;
217 }
218
219 if (num_sst_entry_delete > 0) {
220 rocksdb_num_sst_entry_delete += num_sst_entry_delete;
221 }
222
223 if (num_sst_entry_singledelete > 0) {
224 rocksdb_num_sst_entry_singledelete += num_sst_entry_singledelete;
225 }
226
227 if (num_sst_entry_merge > 0) {
228 rocksdb_num_sst_entry_merge += num_sst_entry_merge;
229 }
230
231 if (num_sst_entry_other > 0) {
232 rocksdb_num_sst_entry_other += num_sst_entry_other;
233 }
234
235 for (Rdb_index_stats &stat : m_stats) {
236 m_cardinality_collector.AdjustStats(&stat);
237 }
238 properties->insert({INDEXSTATS_KEY, Rdb_index_stats::materialize(m_stats)});
239 return rocksdb::Status::OK();
240}
241
242bool Rdb_tbl_prop_coll::NeedCompact() const {
243 return m_params.m_deletes && (m_params.m_window > 0) &&
244 (m_file_size > m_params.m_file_size) &&
245 (m_max_deleted_rows > m_params.m_deletes);
246}
247
248/*
249 Returns the same as above, but in human-readable way for logging
250*/
251rocksdb::UserCollectedProperties
252Rdb_tbl_prop_coll::GetReadableProperties() const {
253 std::string s;
254#ifdef DBUG_OFF
255 s.append("[...");
256 s.append(std::to_string(m_stats.size()));
257 s.append(" records...]");
258#else
259 bool first = true;
260 for (auto it : m_stats) {
261 if (first) {
262 first = false;
263 } else {
264 s.append(",");
265 }
266 s.append(GetReadableStats(it));
267 }
268#endif
269 return rocksdb::UserCollectedProperties{{INDEXSTATS_KEY, s}};
270}
271
272std::string Rdb_tbl_prop_coll::GetReadableStats(const Rdb_index_stats &it) {
273 std::string s;
274 s.append("(");
275 s.append(std::to_string(it.m_gl_index_id.cf_id));
276 s.append(", ");
277 s.append(std::to_string(it.m_gl_index_id.index_id));
278 s.append("):{name:");
279 s.append(it.m_name);
280 s.append(", size:");
281 s.append(std::to_string(it.m_data_size));
282 s.append(", m_rows:");
283 s.append(std::to_string(it.m_rows));
284 s.append(", m_actual_disk_size:");
285 s.append(std::to_string(it.m_actual_disk_size));
286 s.append(", deletes:");
287 s.append(std::to_string(it.m_entry_deletes));
288 s.append(", single_deletes:");
289 s.append(std::to_string(it.m_entry_single_deletes));
290 s.append(", merges:");
291 s.append(std::to_string(it.m_entry_merges));
292 s.append(", others:");
293 s.append(std::to_string(it.m_entry_others));
294 s.append(", distincts per prefix: [");
295 for (auto num : it.m_distinct_keys_per_prefix) {
296 s.append(std::to_string(num));
297 s.append(" ");
298 }
299 s.append("]}");
300 return s;
301}
302
303/*
304 Given the properties of an SST file, reads the stats from it and returns it.
305*/
306
307void Rdb_tbl_prop_coll::read_stats_from_tbl_props(
308 const std::shared_ptr<const rocksdb::TableProperties> &table_props,
309 std::vector<Rdb_index_stats> *const out_stats_vector) {
310 DBUG_ASSERT(out_stats_vector != nullptr);
311 const auto &user_properties = table_props->user_collected_properties;
312 const auto it2 = user_properties.find(std::string(INDEXSTATS_KEY));
313 if (it2 != user_properties.end()) {
314 auto result MY_ATTRIBUTE((__unused__)) =
315 Rdb_index_stats::unmaterialize(it2->second, out_stats_vector);
316 DBUG_ASSERT(result == 0);
317 }
318}
319
320/*
321 Serializes an array of Rdb_index_stats into a network string.
322*/
323std::string
324Rdb_index_stats::materialize(const std::vector<Rdb_index_stats> &stats) {
325 String ret;
326 rdb_netstr_append_uint16(&ret, INDEX_STATS_VERSION_ENTRY_TYPES);
327 for (const auto &i : stats) {
328 rdb_netstr_append_uint32(&ret, i.m_gl_index_id.cf_id);
329 rdb_netstr_append_uint32(&ret, i.m_gl_index_id.index_id);
330 DBUG_ASSERT(sizeof i.m_data_size <= 8);
331 rdb_netstr_append_uint64(&ret, i.m_data_size);
332 rdb_netstr_append_uint64(&ret, i.m_rows);
333 rdb_netstr_append_uint64(&ret, i.m_actual_disk_size);
334 rdb_netstr_append_uint64(&ret, i.m_distinct_keys_per_prefix.size());
335 rdb_netstr_append_uint64(&ret, i.m_entry_deletes);
336 rdb_netstr_append_uint64(&ret, i.m_entry_single_deletes);
337 rdb_netstr_append_uint64(&ret, i.m_entry_merges);
338 rdb_netstr_append_uint64(&ret, i.m_entry_others);
339 for (const auto &num_keys : i.m_distinct_keys_per_prefix) {
340 rdb_netstr_append_uint64(&ret, num_keys);
341 }
342 }
343
344 return std::string((char *)ret.ptr(), ret.length());
345}
346
347/**
348 @brief
349 Reads an array of Rdb_index_stats from a string.
350 @return HA_EXIT_FAILURE if it detects any inconsistency in the input
351 @return HA_EXIT_SUCCESS if completes successfully
352*/
353int Rdb_index_stats::unmaterialize(const std::string &s,
354 std::vector<Rdb_index_stats> *const ret) {
355 const uchar *p = rdb_std_str_to_uchar_ptr(s);
356 const uchar *const p2 = p + s.size();
357
358 DBUG_ASSERT(ret != nullptr);
359
360 if (p + 2 > p2) {
361 return HA_EXIT_FAILURE;
362 }
363
364 const int version = rdb_netbuf_read_uint16(&p);
365 Rdb_index_stats stats;
366 // Make sure version is within supported range.
367 if (version < INDEX_STATS_VERSION_INITIAL ||
368 version > INDEX_STATS_VERSION_ENTRY_TYPES) {
369 // NO_LINT_DEBUG
370 sql_print_error("Index stats version %d was outside of supported range. "
371 "This should not happen so aborting the system.",
372 version);
373 abort();
374 }
375
376 size_t needed = sizeof(stats.m_gl_index_id.cf_id) +
377 sizeof(stats.m_gl_index_id.index_id) +
378 sizeof(stats.m_data_size) + sizeof(stats.m_rows) +
379 sizeof(stats.m_actual_disk_size) + sizeof(uint64);
380 if (version >= INDEX_STATS_VERSION_ENTRY_TYPES) {
381 needed += sizeof(stats.m_entry_deletes) +
382 sizeof(stats.m_entry_single_deletes) +
383 sizeof(stats.m_entry_merges) + sizeof(stats.m_entry_others);
384 }
385
386 while (p < p2) {
387 if (p + needed > p2) {
388 return HA_EXIT_FAILURE;
389 }
390 rdb_netbuf_read_gl_index(&p, &stats.m_gl_index_id);
391 stats.m_data_size = rdb_netbuf_read_uint64(&p);
392 stats.m_rows = rdb_netbuf_read_uint64(&p);
393 stats.m_actual_disk_size = rdb_netbuf_read_uint64(&p);
394 stats.m_distinct_keys_per_prefix.resize(rdb_netbuf_read_uint64(&p));
395 if (version >= INDEX_STATS_VERSION_ENTRY_TYPES) {
396 stats.m_entry_deletes = rdb_netbuf_read_uint64(&p);
397 stats.m_entry_single_deletes = rdb_netbuf_read_uint64(&p);
398 stats.m_entry_merges = rdb_netbuf_read_uint64(&p);
399 stats.m_entry_others = rdb_netbuf_read_uint64(&p);
400 }
401 if (p +
402 stats.m_distinct_keys_per_prefix.size() *
403 sizeof(stats.m_distinct_keys_per_prefix[0]) >
404 p2) {
405 return HA_EXIT_FAILURE;
406 }
407 for (std::size_t i = 0; i < stats.m_distinct_keys_per_prefix.size(); i++) {
408 stats.m_distinct_keys_per_prefix[i] = rdb_netbuf_read_uint64(&p);
409 }
410 ret->push_back(stats);
411 }
412 return HA_EXIT_SUCCESS;
413}
414
/*
  Merges one Rdb_index_stats into another. Can be used to come up with the
  stats for the index based on stats for each sst.

  @param s                  stats to add to (increment == true) or subtract
                            from (increment == false) this object
  @param increment          direction of the merge
  @param estimated_data_len per-row fallback estimate used when
                            s.m_actual_disk_size is 0 (see comment below)
*/
void Rdb_index_stats::merge(const Rdb_index_stats &s, const bool &increment,
                            const int64_t &estimated_data_len) {
  std::size_t i;

  DBUG_ASSERT(estimated_data_len >= 0);

  m_gl_index_id = s.m_gl_index_id;
  // Grow (never shrink) the per-prefix array so both sides' entries fit.
  if (m_distinct_keys_per_prefix.size() < s.m_distinct_keys_per_prefix.size()) {
    m_distinct_keys_per_prefix.resize(s.m_distinct_keys_per_prefix.size());
  }
  if (increment) {
    m_rows += s.m_rows;
    m_data_size += s.m_data_size;

    /*
      The Data_length and Avg_row_length are trailing statistics, meaning
      they don't get updated for the current SST until the next SST is
      written. So, if rocksdb reports the data_length as 0,
      we make a reasoned estimate for the data_file_length for the
      index in the current SST.
    */
    m_actual_disk_size += s.m_actual_disk_size ? s.m_actual_disk_size
                                               : estimated_data_len * s.m_rows;
    m_entry_deletes += s.m_entry_deletes;
    m_entry_single_deletes += s.m_entry_single_deletes;
    m_entry_merges += s.m_entry_merges;
    m_entry_others += s.m_entry_others;
    if (s.m_distinct_keys_per_prefix.size() > 0) {
      for (i = 0; i < s.m_distinct_keys_per_prefix.size(); i++) {
        m_distinct_keys_per_prefix[i] += s.m_distinct_keys_per_prefix[i];
      }
    } else {
      // No per-prefix data from s: estimate by assuming each shorter
      // prefix halves the number of distinct keys (row count shifted
      // right once per remaining key part).
      for (i = 0; i < m_distinct_keys_per_prefix.size(); i++) {
        m_distinct_keys_per_prefix[i] +=
            s.m_rows >> (m_distinct_keys_per_prefix.size() - i - 1);
      }
    }
  } else {
    // Mirror of the branch above with subtraction, used to retract a
    // previously merged SST's contribution.
    m_rows -= s.m_rows;
    m_data_size -= s.m_data_size;
    m_actual_disk_size -= s.m_actual_disk_size ? s.m_actual_disk_size
                                               : estimated_data_len * s.m_rows;
    m_entry_deletes -= s.m_entry_deletes;
    m_entry_single_deletes -= s.m_entry_single_deletes;
    m_entry_merges -= s.m_entry_merges;
    m_entry_others -= s.m_entry_others;
    if (s.m_distinct_keys_per_prefix.size() > 0) {
      for (i = 0; i < s.m_distinct_keys_per_prefix.size(); i++) {
        m_distinct_keys_per_prefix[i] -= s.m_distinct_keys_per_prefix[i];
      }
    } else {
      for (i = 0; i < m_distinct_keys_per_prefix.size(); i++) {
        m_distinct_keys_per_prefix[i] -=
            s.m_rows >> (m_distinct_keys_per_prefix.size() - i - 1);
      }
    }
  }
}
477
// Cardinality collector sampling table_stats_sampling_pct percent of keys;
// 0 (and the maximum percentage) disable sampling — see IsSampingDisabled().
// The RNG seed is taken from the current time.
Rdb_tbl_card_coll::Rdb_tbl_card_coll(const uint8_t &table_stats_sampling_pct)
    : m_table_stats_sampling_pct(table_stats_sampling_pct),
      m_seed(time(nullptr)) {}
481
482bool Rdb_tbl_card_coll::IsSampingDisabled() {
483 // Zero means that we'll use all the keys to update statistics.
484 return m_table_stats_sampling_pct == 0 ||
485 RDB_TBL_STATS_SAMPLE_PCT_MAX == m_table_stats_sampling_pct;
486}
487
488bool Rdb_tbl_card_coll::ShouldCollectStats() {
489 if (IsSampingDisabled()) {
490 return true; // collect every key
491 }
492
493 const int val = rand_r(&m_seed) % (RDB_TBL_STATS_SAMPLE_PCT_MAX -
494 RDB_TBL_STATS_SAMPLE_PCT_MIN + 1) +
495 RDB_TBL_STATS_SAMPLE_PCT_MIN;
496
497 DBUG_ASSERT(val >= RDB_TBL_STATS_SAMPLE_PCT_MIN);
498 DBUG_ASSERT(val <= RDB_TBL_STATS_SAMPLE_PCT_MAX);
499
500 return val <= m_table_stats_sampling_pct;
501}
502
/*
  Feeds one key into the per-prefix distinct-key counters (subject to
  sampling). Keys are expected to arrive in sorted order, so a prefix is
  "new" exactly when it differs from the previously processed key.
*/
void Rdb_tbl_card_coll::ProcessKey(const rocksdb::Slice &key,
                                   const Rdb_key_def *keydef,
                                   Rdb_index_stats *stats) {
  if (ShouldCollectStats()) {
    std::size_t column = 0;
    bool new_key = true;

    // The very first key of an index (m_last_key empty after Reset())
    // always counts as new for every prefix.
    if (!m_last_key.empty()) {
      rocksdb::Slice last(m_last_key.data(), m_last_key.size());
      // NOTE(review): assumes compare_keys() returns 0 on success and
      // stores the index of the first differing key part in `column` —
      // verify against Rdb_key_def::compare_keys.
      new_key = (keydef->compare_keys(&last, &key, &column) == 0);
    }

    if (new_key) {
      DBUG_ASSERT(column <= stats->m_distinct_keys_per_prefix.size());

      // Every prefix from the first differing column onward is distinct.
      for (auto i = column; i < stats->m_distinct_keys_per_prefix.size(); i++) {
        stats->m_distinct_keys_per_prefix[i]++;
      }

      // assign new last_key for the next call
      // however, we only need to change the last key
      // if one of the first n-1 columns is different
      // If the n-1 prefix is the same, no sense in storing
      // the new key
      if (column < stats->m_distinct_keys_per_prefix.size()) {
        m_last_key.assign(key.data(), key.size());
      }
    }
  }
}
533
534void Rdb_tbl_card_coll::Reset() { m_last_key.clear(); }
535
536// We need to adjust the index cardinality numbers based on the sampling
537// rate so that the output of "SHOW INDEX" command will reflect reality
538// more closely. It will still be an approximation, just a better one.
539void Rdb_tbl_card_coll::AdjustStats(Rdb_index_stats *stats) {
540 if (IsSampingDisabled()) {
541 // no sampling was done, return as stats is
542 return;
543 }
544 for (int64_t &num_keys : stats->m_distinct_keys_per_prefix) {
545 num_keys = num_keys * 100 / m_table_stats_sampling_pct;
546 }
547}
548
549} // namespace myrocks
550