1/*
2 Copyright (c) 2012,2013 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17#ifdef USE_PRAGMA_IMPLEMENTATION
18#pragma implementation // gcc: Class implementation
19#endif
20
21/* For use of 'PRIu64': */
22#define __STDC_FORMAT_MACROS
23
24#include <my_global.h>
25/* This C++ file's header file */
26#include "./rdb_datadic.h"
27
28#include <inttypes.h>
29/* C++ standard header files */
30#include <algorithm>
31#include <array>
32#include <limits>
33#include <map>
34#include <set>
35#include <string>
36#include <utility>
37#include <vector>
38
39/* MySQL header files */
40#include "./field.h"
41#include "./key.h"
42#include "./m_ctype.h"
43#include "./my_bit.h"
44#include "./my_bitmap.h"
45#include "./sql_table.h"
46
47/* MyRocks header files */
48#include "./ha_rocksdb_proto.h"
49#include "./my_stacktrace.h"
50#include "./rdb_cf_manager.h"
51#include "./rdb_utils.h"
52
53namespace myrocks {
54
/*
  Forward declaration; the definition is not part of this section of the
  file.

  NOTE(review): judging only by the parameter names, this presumably reports
  the mem-comparable (xfrm) image of the space character for charset 'cs'
  through the three output pointers -- confirm against the definition.
*/
void get_mem_comparable_space(const CHARSET_INFO *cs,
                              const std::vector<uchar> **xfrm, size_t *xfrm_len,
                              size_t *mb_len);
58
59/*
60 MariaDB's replacement for FB/MySQL Field::check_field_name_match :
61*/
62inline bool field_check_field_name_match(Field *field, const char *name)
63{
64 return (0 == my_strcasecmp(system_charset_info,
65 field->field_name.str,
66 name));
67}
68
69
70/*
71 Rdb_key_def class implementation
72*/
73
74Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
75 rocksdb::ColumnFamilyHandle *cf_handle_arg,
76 uint16_t index_dict_version_arg, uchar index_type_arg,
77 uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
78 bool is_per_partition_cf_arg, const char *_name,
79 Rdb_index_stats _stats, uint32 index_flags_bitmap,
80 uint32 ttl_rec_offset, uint64 ttl_duration)
81 : m_index_number(indexnr_arg), m_cf_handle(cf_handle_arg),
82 m_index_dict_version(index_dict_version_arg),
83 m_index_type(index_type_arg), m_kv_format_version(kv_format_version_arg),
84 m_is_reverse_cf(is_reverse_cf_arg),
85 m_is_per_partition_cf(is_per_partition_cf_arg), m_name(_name),
86 m_stats(_stats), m_index_flags_bitmap(index_flags_bitmap),
87 m_ttl_rec_offset(ttl_rec_offset), m_ttl_duration(ttl_duration),
88 m_ttl_column(""), m_pk_part_no(nullptr), m_pack_info(nullptr),
89 m_keyno(keyno_arg), m_key_parts(0), m_ttl_pk_key_part_offset(UINT_MAX),
90 m_ttl_field_offset(UINT_MAX), m_prefix_extractor(nullptr),
91 m_maxlength(0) // means 'not intialized'
92{
93 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
94 rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
95 m_total_index_flags_length =
96 calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
97 DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
98 m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
99 m_total_index_flags_length == 0);
100 DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
101 m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
102 m_total_index_flags_length == 0);
103 DBUG_ASSERT(m_cf_handle != nullptr);
104}
105
106Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
107 : m_index_number(k.m_index_number), m_cf_handle(k.m_cf_handle),
108 m_is_reverse_cf(k.m_is_reverse_cf),
109 m_is_per_partition_cf(k.m_is_per_partition_cf), m_name(k.m_name),
110 m_stats(k.m_stats), m_index_flags_bitmap(k.m_index_flags_bitmap),
111 m_ttl_rec_offset(k.m_ttl_rec_offset), m_ttl_duration(k.m_ttl_duration),
112 m_ttl_column(k.m_ttl_column), m_pk_part_no(k.m_pk_part_no),
113 m_pack_info(k.m_pack_info), m_keyno(k.m_keyno),
114 m_key_parts(k.m_key_parts),
115 m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
116 m_ttl_field_offset(UINT_MAX), m_prefix_extractor(k.m_prefix_extractor),
117 m_maxlength(k.m_maxlength) {
118 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
119 rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
120 m_total_index_flags_length =
121 calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
122 DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
123 m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
124 m_total_index_flags_length == 0);
125 DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
126 m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
127 m_total_index_flags_length == 0);
128 if (k.m_pack_info) {
129 const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
130 m_pack_info =
131 reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));
132 memcpy(m_pack_info, k.m_pack_info, size);
133 }
134
135 if (k.m_pk_part_no) {
136 const size_t size = sizeof(uint) * m_key_parts;
137 m_pk_part_no = reinterpret_cast<uint *>(my_malloc(size, MYF(0)));
138 memcpy(m_pk_part_no, k.m_pk_part_no, size);
139 }
140}
141
142Rdb_key_def::~Rdb_key_def() {
143 mysql_mutex_destroy(&m_mutex);
144
145 my_free(m_pk_part_no);
146 m_pk_part_no = nullptr;
147
148 my_free(m_pack_info);
149 m_pack_info = nullptr;
150}
151
152void Rdb_key_def::setup(const TABLE *const tbl,
153 const Rdb_tbl_def *const tbl_def) {
154 DBUG_ASSERT(tbl != nullptr);
155 DBUG_ASSERT(tbl_def != nullptr);
156
157 /*
158 Set max_length based on the table. This can be called concurrently from
159 multiple threads, so there is a mutex to protect this code.
160 */
161 const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
162 const bool hidden_pk_exists = table_has_hidden_pk(tbl);
163 const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
164 if (!m_maxlength) {
165 RDB_MUTEX_LOCK_CHECK(m_mutex);
166 if (m_maxlength != 0) {
167 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
168 return;
169 }
170
171 KEY *key_info = nullptr;
172 KEY *pk_info = nullptr;
173 if (!is_hidden_pk) {
174 key_info = &tbl->key_info[m_keyno];
175 if (!hidden_pk_exists)
176 pk_info = &tbl->key_info[tbl->s->primary_key];
177 m_name = std::string(key_info->name.str);
178 } else {
179 m_name = HIDDEN_PK_NAME;
180 }
181
182 if (secondary_key)
183 m_pk_key_parts= hidden_pk_exists ? 1 : pk_info->ext_key_parts;
184 else {
185 pk_info = nullptr;
186 m_pk_key_parts = 0;
187 }
188
189 // "unique" secondary keys support:
190 m_key_parts= is_hidden_pk ? 1 : key_info->ext_key_parts;
191
192 if (secondary_key) {
193 /*
194 In most cases, SQL layer puts PK columns as invisible suffix at the
195 end of secondary key. There are cases where this doesn't happen:
196 - unique secondary indexes.
197 - partitioned tables.
198
199 Internally, we always need PK columns as suffix (and InnoDB does,
200 too, if you were wondering).
201
202 The loop below will attempt to put all PK columns at the end of key
203 definition. Columns that are already included in the index (either
204 by the user or by "extended keys" feature) are not included for the
205 second time.
206 */
207 m_key_parts += m_pk_key_parts;
208 }
209
210 if (secondary_key)
211 m_pk_part_no = reinterpret_cast<uint *>(
212 my_malloc(sizeof(uint) * m_key_parts, MYF(0)));
213 else
214 m_pk_part_no = nullptr;
215
216 const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
217 m_pack_info =
218 reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));
219
220 /*
221 Guaranteed not to error here as checks have been made already during
222 table creation.
223 */
224 Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
225 &m_ttl_field_offset, true);
226
227 size_t max_len = INDEX_NUMBER_SIZE;
228 int unpack_len = 0;
229 int max_part_len = 0;
230 bool simulating_extkey = false;
231 uint dst_i = 0;
232
233 uint keyno_to_set = m_keyno;
234 uint keypart_to_set = 0;
235
236 if (is_hidden_pk) {
237 Field *field = nullptr;
238 m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
239 m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
240 max_len += m_pack_info[dst_i].m_max_image_len;
241 max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
242 dst_i++;
243 } else {
244 KEY_PART_INFO *key_part = key_info->key_part;
245
246 /* this loop also loops over the 'extended key' tail */
247 for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
248 Field *const field = key_part ? key_part->field : nullptr;
249
250 if (simulating_extkey && !hidden_pk_exists) {
251 DBUG_ASSERT(secondary_key);
252 /* Check if this field is already present in the key definition */
253 bool found = false;
254 for (uint j= 0; j < key_info->ext_key_parts; j++) {
255 if (field->field_index ==
256 key_info->key_part[j].field->field_index &&
257 key_part->length == key_info->key_part[j].length) {
258 found = true;
259 break;
260 }
261 }
262
263 if (found) {
264 key_part++;
265 continue;
266 }
267 }
268
269 if (field && field->real_maybe_null())
270 max_len += 1; // NULL-byte
271
272 m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
273 key_part ? key_part->length : 0);
274 m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
275
276 if (pk_info) {
277 m_pk_part_no[dst_i] = -1;
278 for (uint j = 0; j < m_pk_key_parts; j++) {
279 if (field->field_index == pk_info->key_part[j].field->field_index) {
280 m_pk_part_no[dst_i] = j;
281 break;
282 }
283 }
284 } else if (secondary_key && hidden_pk_exists) {
285 /*
286 The hidden pk can never be part of the sk. So it is always
287 appended to the end of the sk.
288 */
289 m_pk_part_no[dst_i] = -1;
290 if (simulating_extkey)
291 m_pk_part_no[dst_i] = 0;
292 }
293
294 max_len += m_pack_info[dst_i].m_max_image_len;
295
296 max_part_len =
297 std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
298
299 /*
300 Check key part name here, if it matches the TTL column then we store
301 the offset of the TTL key part here.
302 */
303 if (!m_ttl_column.empty() &&
304 field_check_field_name_match(field, m_ttl_column.c_str())) {
305 DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
306 DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG);
307 DBUG_ASSERT(!field->real_maybe_null());
308 m_ttl_pk_key_part_offset = dst_i;
309 }
310
311 key_part++;
312 /*
313 For "unique" secondary indexes, pretend they have
314 "index extensions".
315
316 MariaDB also has this property: if an index has a partially-covered
317 column like KEY(varchar_col(N)), then the SQL layer will think it is
318 not "extended" with PK columns. The code below handles this case,
319 also.
320 */
321 if (secondary_key && src_i+1 == key_info->ext_key_parts) {
322 simulating_extkey = true;
323 if (!hidden_pk_exists) {
324 keyno_to_set = tbl->s->primary_key;
325 key_part = pk_info->key_part;
326 keypart_to_set = (uint)-1;
327 } else {
328 keyno_to_set = tbl_def->m_key_count - 1;
329 key_part = nullptr;
330 keypart_to_set = 0;
331 }
332 }
333
334 dst_i++;
335 }
336 }
337
338 m_key_parts = dst_i;
339
340 /* Initialize the memory needed by the stats structure */
341 m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
342
343 /* Cache prefix extractor for bloom filter usage later */
344 rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
345 m_prefix_extractor = opt.prefix_extractor;
346
347 /*
348 This should be the last member variable set before releasing the mutex
349 so that other threads can't see the object partially set up.
350 */
351 m_maxlength = max_len;
352
353 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
354 }
355}
356
357/*
358 Determine if the table has TTL enabled by parsing the table comment.
359
360 @param[IN] table_arg
361 @param[IN] tbl_def_arg
362 @param[OUT] ttl_duration Default TTL value parsed from table comment
363*/
364uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
365 const Rdb_tbl_def *const tbl_def_arg,
366 uint64 *ttl_duration) {
367 DBUG_ASSERT(table_arg != nullptr);
368 DBUG_ASSERT(tbl_def_arg != nullptr);
369 DBUG_ASSERT(ttl_duration != nullptr);
370 std::string table_comment(table_arg->s->comment.str,
371 table_arg->s->comment.length);
372
373 bool ttl_duration_per_part_match_found = false;
374 std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
375 table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
376 RDB_TTL_DURATION_QUALIFIER);
377
378 /* If we don't have a ttl duration, nothing to do here. */
379 if (ttl_duration_str.empty()) {
380 return HA_EXIT_SUCCESS;
381 }
382
383 /*
384 Catch errors where a non-integral value was used as ttl duration, strtoull
385 will return 0.
386 */
387 *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
388 if (!*ttl_duration) {
389 my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
390 return HA_EXIT_FAILURE;
391 }
392
393 return HA_EXIT_SUCCESS;
394}
395
396/*
397 Determine if the table has TTL enabled by parsing the table comment.
398
399 @param[IN] table_arg
400 @param[IN] tbl_def_arg
401 @param[OUT] ttl_column TTL column in the table
402 @param[IN] skip_checks Skip validation checks (when called in
403 setup())
404*/
405uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
406 const Rdb_tbl_def *const tbl_def_arg,
407 std::string *ttl_column,
408 uint *ttl_field_offset, bool skip_checks) {
409 std::string table_comment(table_arg->s->comment.str,
410 table_arg->s->comment.length);
411 /*
412 Check if there is a TTL column specified. Note that this is not required
413 and if omitted, an 8-byte ttl field will be prepended to each record
414 implicitly.
415 */
416 bool ttl_col_per_part_match_found = false;
417 std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
418 table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
419 RDB_TTL_COL_QUALIFIER);
420
421 if (skip_checks) {
422 for (uint i = 0; i < table_arg->s->fields; i++) {
423 Field *const field = table_arg->field[i];
424 if (field_check_field_name_match(field, ttl_col_str.c_str())) {
425 *ttl_column = ttl_col_str;
426 *ttl_field_offset = i;
427 }
428 }
429 return HA_EXIT_SUCCESS;
430 }
431
432 /* Check if TTL column exists in table */
433 if (!ttl_col_str.empty()) {
434 bool found = false;
435 for (uint i = 0; i < table_arg->s->fields; i++) {
436 Field *const field = table_arg->field[i];
437 if (field_check_field_name_match(field, ttl_col_str.c_str()) &&
438 field->real_type() == MYSQL_TYPE_LONGLONG &&
439 field->key_type() == HA_KEYTYPE_ULONGLONG &&
440 !field->real_maybe_null()) {
441 *ttl_column = ttl_col_str;
442 *ttl_field_offset = i;
443 found = true;
444 break;
445 }
446 }
447
448 if (!found) {
449 my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
450 return HA_EXIT_FAILURE;
451 }
452 }
453
454 return HA_EXIT_SUCCESS;
455}
456
457const std::string
458Rdb_key_def::gen_qualifier_for_table(const char *const qualifier,
459 const std::string &partition_name) {
460 bool has_partition = !partition_name.empty();
461 std::string qualifier_str = "";
462
463 if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
464 return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
465 : qualifier_str + RDB_CF_NAME_QUALIFIER +
466 RDB_QUALIFIER_VALUE_SEP;
467 } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
468 return has_partition
469 ? gen_ttl_duration_qualifier_for_partition(partition_name)
470 : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
471 RDB_QUALIFIER_VALUE_SEP;
472 } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
473 return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
474 : qualifier_str + RDB_TTL_COL_QUALIFIER +
475 RDB_QUALIFIER_VALUE_SEP;
476 } else {
477 DBUG_ASSERT(0);
478 }
479
480 return qualifier_str;
481}
482
483/*
484 Formats the string and returns the column family name assignment part for a
485 specific partition.
486*/
487const std::string
488Rdb_key_def::gen_cf_name_qualifier_for_partition(const std::string &prefix) {
489 DBUG_ASSERT(!prefix.empty());
490
491 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
492 RDB_QUALIFIER_VALUE_SEP;
493}
494
495const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
496 const std::string &prefix) {
497 DBUG_ASSERT(!prefix.empty());
498
499 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
500 RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
501}
502
503const std::string
504Rdb_key_def::gen_ttl_col_qualifier_for_partition(const std::string &prefix) {
505 DBUG_ASSERT(!prefix.empty());
506
507 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
508 RDB_QUALIFIER_VALUE_SEP;
509}
510
511const std::string Rdb_key_def::parse_comment_for_qualifier(
512 const std::string &comment, const TABLE *const table_arg,
513 const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
514 const char *const qualifier) {
515 DBUG_ASSERT(table_arg != nullptr);
516 DBUG_ASSERT(tbl_def_arg != nullptr);
517 DBUG_ASSERT(per_part_match_found != nullptr);
518 DBUG_ASSERT(qualifier != nullptr);
519
520 std::string empty_result;
521
522 // Flag which marks if partition specific options were found.
523 *per_part_match_found = false;
524
525 if (comment.empty()) {
526 return empty_result;
527 }
528
529 // Let's fetch the comment for a index and check if there's a custom key
530 // name specified for a partition we are handling.
531 std::vector<std::string> v =
532 myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
533
534 std::string search_str = gen_qualifier_for_table(qualifier);
535
536 // If table has partitions then we need to check if user has requested
537 // qualifiers on a per partition basis.
538 //
539 // NOTE: this means if you specify a qualifier for a specific partition it
540 // will take precedence the 'table level' qualifier if one exists.
541 std::string search_str_part;
542 if (IF_PARTITIONING(table_arg->part_info,nullptr) != nullptr) {
543 std::string partition_name = tbl_def_arg->base_partition();
544 DBUG_ASSERT(!partition_name.empty());
545 search_str_part = gen_qualifier_for_table(qualifier, partition_name);
546 }
547
548 DBUG_ASSERT(!search_str.empty());
549
550 // Basic O(N) search for a matching assignment. At most we expect maybe
551 // ten or so elements here.
552 if (!search_str_part.empty()) {
553 for (const auto &it : v) {
554 if (it.substr(0, search_str_part.length()) == search_str_part) {
555 // We found a prefix match. Try to parse it as an assignment.
556 std::vector<std::string> tokens =
557 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
558
559 // We found a custom qualifier, it was in the form we expected it to be.
560 // Return that instead of whatever we initially wanted to return. In
561 // a case below the `foo` part will be returned to the caller.
562 //
563 // p3_cfname=foo
564 //
565 // If no value was specified then we'll return an empty string which
566 // later gets translated into using a default CF.
567 if (tokens.size() == 2) {
568 *per_part_match_found = true;
569 return tokens[1];
570 } else {
571 return empty_result;
572 }
573 }
574 }
575 }
576
577 // Do this loop again, this time searching for 'table level' qualifiers if we
578 // didn't find any partition level qualifiers above.
579 for (const auto &it : v) {
580 if (it.substr(0, search_str.length()) == search_str) {
581 std::vector<std::string> tokens =
582 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
583 if (tokens.size() == 2) {
584 return tokens[1];
585 } else {
586 return empty_result;
587 }
588 }
589 }
590
591 // If we didn't find any partitioned/non-partitioned qualifiers, return an
592 // empty string.
593 return empty_result;
594}
595
596/**
597 Read a memcmp key part from a slice using the passed in reader.
598
599 Returns -1 if field was null, 1 if error, 0 otherwise.
600*/
601int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
602 Rdb_string_reader *reader,
603 const uint part_num) const {
604 /* It is impossible to unpack the column. Skip it. */
605 if (m_pack_info[part_num].m_maybe_null) {
606 const char *nullp;
607 if (!(nullp = reader->read(1)))
608 return 1;
609 if (*nullp == 0) {
610 /* This is a NULL value */
611 return -1;
612 } else {
613 /* If NULL marker is not '0', it can be only '1' */
614 if (*nullp != 1)
615 return 1;
616 }
617 }
618
619 Rdb_field_packing *fpi = &m_pack_info[part_num];
620 DBUG_ASSERT(table_arg->s != nullptr);
621
622 bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
623 (table_arg->s->primary_key == MAX_INDEXES);
624 Field *field = nullptr;
625 if (!is_hidden_pk_part)
626 field = fpi->get_field_in_table(table_arg);
627 if ((this->*fpi->m_skip_func)(fpi, field, reader))
628 return 1;
629
630 return 0;
631}
632
633/**
634 Get a mem-comparable form of Primary Key from mem-comparable form of this key
635
636 @param
637 pk_descr Primary Key descriptor
638 key Index tuple from this key in mem-comparable form
639 pk_buffer OUT Put here mem-comparable form of the Primary Key.
640
641 @note
642 It may or may not be possible to restore primary key columns to their
643 mem-comparable form. To handle all cases, this function copies mem-
644 comparable forms directly.
645
646 RocksDB SE supports "Extended keys". This means that PK columns are present
647 at the end of every key. If the key already includes PK columns, then
648 these columns are not present at the end of the key.
649
650 Because of the above, we copy each primary key column.
651
652 @todo
653 If we checked crc32 checksums in this function, we would catch some CRC
654 violations that we currently don't. On the other hand, there is a broader
655 set of queries for which we would check the checksum twice.
656*/
657
658uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
659 const Rdb_key_def &pk_descr,
660 const rocksdb::Slice *const key,
661 uchar *const pk_buffer) const {
662 DBUG_ASSERT(table != nullptr);
663 DBUG_ASSERT(key != nullptr);
664 DBUG_ASSERT(pk_buffer);
665
666 uint size = 0;
667 uchar *buf = pk_buffer;
668 DBUG_ASSERT(m_pk_key_parts);
669
670 /* Put the PK number */
671 rdb_netbuf_store_index(buf, pk_descr.m_index_number);
672 buf += INDEX_NUMBER_SIZE;
673 size += INDEX_NUMBER_SIZE;
674
675 const char *start_offs[MAX_REF_PARTS];
676 const char *end_offs[MAX_REF_PARTS];
677 int pk_key_part;
678 uint i;
679 Rdb_string_reader reader(key);
680
681 // Skip the index number
682 if ((!reader.read(INDEX_NUMBER_SIZE)))
683 return RDB_INVALID_KEY_LEN;
684
685 for (i = 0; i < m_key_parts; i++) {
686 if ((pk_key_part = m_pk_part_no[i]) != -1) {
687 start_offs[pk_key_part] = reader.get_current_ptr();
688 }
689
690 if (read_memcmp_key_part(table, &reader, i) > 0) {
691 return RDB_INVALID_KEY_LEN;
692 }
693
694 if (pk_key_part != -1) {
695 end_offs[pk_key_part] = reader.get_current_ptr();
696 }
697 }
698
699 for (i = 0; i < m_pk_key_parts; i++) {
700 const uint part_size = end_offs[i] - start_offs[i];
701 memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
702 buf += part_size;
703 size += part_size;
704 }
705
706 return size;
707}
708
709/**
710 Get a mem-comparable form of Secondary Key from mem-comparable form of this
711 key, without the extended primary key tail.
712
713 @param
714 key Index tuple from this key in mem-comparable form
715 sk_buffer OUT Put here mem-comparable form of the Secondary Key.
716 n_null_fields OUT Put number of null fields contained within sk entry
717*/
718uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
719 const rocksdb::Slice &key,
720 uchar *sk_buffer,
721 uint *n_null_fields) const {
722 DBUG_ASSERT(table != nullptr);
723 DBUG_ASSERT(sk_buffer != nullptr);
724 DBUG_ASSERT(n_null_fields != nullptr);
725 DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
726
727 uchar *buf = sk_buffer;
728
729 int res;
730 Rdb_string_reader reader(&key);
731 const char *start = reader.get_current_ptr();
732
733 // Skip the index number
734 if ((!reader.read(INDEX_NUMBER_SIZE)))
735 return RDB_INVALID_KEY_LEN;
736
737 for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
738 if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
739 return RDB_INVALID_KEY_LEN;
740 } else if (res == -1) {
741 (*n_null_fields)++;
742 }
743 }
744
745 uint sk_memcmp_len = reader.get_current_ptr() - start;
746 memcpy(buf, start, sk_memcmp_len);
747 return sk_memcmp_len;
748}
749
750/**
751 Convert index tuple into storage (i.e. mem-comparable) format
752
753 @detail
754 Currently this is done by unpacking into table->record[0] and then
755 packing index columns into storage format.
756
757 @param pack_buffer Temporary area for packing varchar columns. Its
758 size is at least max_storage_fmt_length() bytes.
759*/
760
761uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
762 uchar *const packed_tuple,
763 const uchar *const key_tuple,
764 const key_part_map &keypart_map) const {
765 DBUG_ASSERT(tbl != nullptr);
766 DBUG_ASSERT(pack_buffer != nullptr);
767 DBUG_ASSERT(packed_tuple != nullptr);
768 DBUG_ASSERT(key_tuple != nullptr);
769
770 /* We were given a record in KeyTupleFormat. First, save it to record */
771 const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
772 key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
773
774 uint n_used_parts = my_count_bits(keypart_map);
775 if (keypart_map == HA_WHOLE_KEY)
776 n_used_parts = 0; // Full key is used
777
778 /* Then, convert the record into a mem-comparable form */
779 return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
780 false, 0, n_used_parts);
781}
782
783/**
784 @brief
785 Check if "unpack info" data includes checksum.
786
787 @detail
788 This is used only by CHECK TABLE to count the number of rows that have
789 checksums.
790*/
791
792bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
793 size_t size = unpack_info.size();
794 if (size == 0) {
795 return false;
796 }
797 const uchar *ptr = (const uchar *)unpack_info.data();
798
799 // Skip unpack info if present.
800 if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
801 const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
802 SHIP_ASSERT(size >= skip_len);
803
804 size -= skip_len;
805 ptr += skip_len;
806 }
807
808 return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
809}
810
811/*
812 @return Number of bytes that were changed
813*/
814int Rdb_key_def::successor(uchar *const packed_tuple, const uint &len) {
815 DBUG_ASSERT(packed_tuple != nullptr);
816
817 int changed = 0;
818 uchar *p = packed_tuple + len - 1;
819 for (; p > packed_tuple; p--) {
820 changed++;
821 if (*p != uchar(0xFF)) {
822 *p = *p + 1;
823 break;
824 }
825 *p = '\0';
826 }
827 return changed;
828}
829
830/*
831 @return Number of bytes that were changed
832*/
833int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint &len) {
834 DBUG_ASSERT(packed_tuple != nullptr);
835
836 int changed = 0;
837 uchar *p = packed_tuple + len - 1;
838 for (; p > packed_tuple; p--) {
839 changed++;
840 if (*p != uchar(0x00)) {
841 *p = *p - 1;
842 break;
843 }
844 *p = 0xFF;
845 }
846 return changed;
847}
848
849static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
850 {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
851 {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
852
853/*
854 @return The length in bytes of the header specified by the given tag
855*/
856size_t Rdb_key_def::get_unpack_header_size(char tag) {
857 DBUG_ASSERT(is_unpack_data_tag(tag));
858 return UNPACK_HEADER_SIZES.at(tag);
859}
860
861/*
862 Get a bitmap indicating which varchar columns must be covered for this
863 lookup to be covered. If the bitmap is a subset of the covered bitmap, then
864 the lookup is covered. If it can already be determined that the lookup is
865 not covered, map->bitmap will be set to null.
866 */
867void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
868 DBUG_ASSERT(map->bitmap == nullptr);
869 bitmap_init(map, nullptr, MAX_REF_PARTS, false);
870 uint curr_bitmap_pos = 0;
871
872 // Indicates which columns in the read set might be covered.
873 MY_BITMAP maybe_covered_bitmap;
874 bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false);
875
876 for (uint i = 0; i < m_key_parts; i++) {
877 if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
878 continue;
879 }
880
881 Field *const field = m_pack_info[i].get_field_in_table(table);
882
883 // Columns which are always covered are not stored in the covered bitmap so
884 // we can ignore them here too.
885 if (m_pack_info[i].m_covered &&
886 bitmap_is_set(table->read_set, field->field_index)) {
887 bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
888 continue;
889 }
890
891 switch (field->real_type()) {
892 // This type may be covered depending on the record. If it was requested,
893 // we require the covered bitmap to have this bit set.
894 case MYSQL_TYPE_VARCHAR:
895 if (curr_bitmap_pos < MAX_REF_PARTS) {
896 if (bitmap_is_set(table->read_set, field->field_index)) {
897 bitmap_set_bit(map, curr_bitmap_pos);
898 bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
899 }
900 curr_bitmap_pos++;
901 } else {
902 bitmap_free(&maybe_covered_bitmap);
903 bitmap_free(map);
904 return;
905 }
906 break;
907 // This column is a type which is never covered. If it was requested, we
908 // know this lookup will never be covered.
909 default:
910 if (bitmap_is_set(table->read_set, field->field_index)) {
911 bitmap_free(&maybe_covered_bitmap);
912 bitmap_free(map);
913 return;
914 }
915 break;
916 }
917 }
918
919 // If there are columns which are not covered in the read set, the lookup
920 // can't be covered.
921 if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
922 bitmap_free(map);
923 }
924 bitmap_free(&maybe_covered_bitmap);
925}
926
927/*
928 Return true if for this secondary index
929 - All of the requested columns are in the index
930 - All values for columns that are prefix-only indexes are shorter or equal
931 in length to the prefix
932 */
933bool Rdb_key_def::covers_lookup(TABLE *const table,
934 const rocksdb::Slice *const unpack_info,
935 const MY_BITMAP *const lookup_bitmap) const {
936 DBUG_ASSERT(lookup_bitmap != nullptr);
937 if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
938 return false;
939 }
940
941 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
942
943 // Check if this unpack_info has a covered_bitmap
944 const char *unpack_header = unp_reader.get_current_ptr();
945 const bool has_covered_unpack_info =
946 unp_reader.remaining_bytes() &&
947 unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
948 if (!has_covered_unpack_info ||
949 !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
950 return false;
951 }
952
953 MY_BITMAP covered_bitmap;
954 my_bitmap_map covered_bits;
955 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
956 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
957 sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
958 RDB_UNPACK_COVERED_DATA_LEN_SIZE);
959
960 return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
961}
962
963uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
964 uchar *tuple, uchar *const packed_tuple,
965 uchar *const pack_buffer,
966 Rdb_string_writer *const unpack_info,
967 uint *const n_null_fields) const {
968 if (field->real_maybe_null()) {
969 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
970 if (field->is_real_null()) {
971 /* NULL value. store '\0' so that it sorts before non-NULL values */
972 *tuple++ = 0;
973 /* That's it, don't store anything else */
974 if (n_null_fields)
975 (*n_null_fields)++;
976 return tuple;
977 } else {
978 /* Not a NULL value. Store '1' */
979 *tuple++ = 1;
980 }
981 }
982
983 const bool create_unpack_info =
984 (unpack_info && // we were requested to generate unpack_info
985 pack_info->uses_unpack_info()); // and this keypart uses it
986 Rdb_pack_field_context pack_ctx(unpack_info);
987
988 // Set the offset for methods which do not take an offset as an argument
989 DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
990 pack_info->m_max_image_len));
991
992 (this->*pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple,
993 &pack_ctx);
994
995 /* Make "unpack info" to be stored in the value */
996 if (create_unpack_info) {
997 (this->*pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec,
998 field, &pack_ctx);
999 }
1000
1001 return tuple;
1002}
1003
/**
  Get index columns from the record and pack them into mem-comparable form.

  @param
    tbl                               Table we're working on
    pack_buffer                   IN  Temporary area for packing varchars. The
                                      size is at least
                                      max_storage_fmt_length() bytes.
    record                        IN  Record buffer with fields in
                                      table->record format
    packed_tuple                  OUT Key in the mem-comparable form
    unpack_info                   OUT Unpack data (cleared first); may be
                                      nullptr
    should_store_row_debug_checksums
                                  IN  If true, a checksum chunk (tag, key
                                      CRC32, value CRC32) is appended to
                                      unpack_info. Only done when unpack_info
                                      is not nullptr.
    hidden_pk_id                  IN  Hidden PK value to put into the last key
                                      part; 0 is treated as "not passed"
    n_key_parts                       Number of keyparts to process. 0 (or
                                      MAX_REF_PARTS) means all of them.
    n_null_fields                 OUT Number of key fields with NULL value.
                                      May be nullptr.
    ttl_pk_offset                 OUT Offset of the ttl column if specified
                                      and in the key. May be nullptr.
    ttl_bytes                     IN  TTL timestamp to store in the index flag
                                      area of a secondary key's unpack_info.
                                      May be nullptr.

  @detail
    Some callers do not need the unpack information, they can pass
    unpack_info=nullptr.

    While a field is being packed its pointers are temporarily redirected
    into 'record' with move_field() and restored to tbl->record[0] afterwards.

  @return
    Length of the packed tuple
*/

uint Rdb_key_def::pack_record(
    const TABLE *const tbl, uchar *const pack_buffer, const uchar *const record,
    uchar *const packed_tuple, Rdb_string_writer *const unpack_info,
    const bool &should_store_row_debug_checksums, const longlong &hidden_pk_id,
    uint n_key_parts, uint *const n_null_fields, uint *const ttl_pk_offset,
    const char *const ttl_bytes) const {
  DBUG_ASSERT(tbl != nullptr);
  DBUG_ASSERT(pack_buffer != nullptr);
  DBUG_ASSERT(record != nullptr);
  DBUG_ASSERT(packed_tuple != nullptr);
  // Checksums for PKs are made when record is packed.
  // We should never attempt to make checksum just from PK values
  DBUG_ASSERT_IMP(should_store_row_debug_checksums,
                  (m_index_type == INDEX_TYPE_SECONDARY));

  uchar *tuple = packed_tuple;
  // Positions inside unpack_info; size_t(-1) means "not set"
  size_t unpack_start_pos = size_t(-1);
  size_t unpack_len_pos = size_t(-1);
  size_t covered_bitmap_pos = size_t(-1);
  const bool hidden_pk_exists = table_has_hidden_pk(tbl);

  // Every key starts with the index number
  rdb_netbuf_store_index(tuple, m_index_number);
  tuple += INDEX_NUMBER_SIZE;

  // If n_key_parts is 0, it means all columns.
  // The following includes the 'extended key' tail.
  // The 'extended key' includes primary key. This is done to 'uniqify'
  // non-unique indexes
  const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;

  // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
  // hidden key part.  So we skip it (its always 1 part).
  if (hidden_pk_exists && !hidden_pk_id && use_all_columns)
    n_key_parts = m_key_parts - 1;
  else if (use_all_columns)
    n_key_parts = m_key_parts;

  if (n_null_fields)
    *n_null_fields = 0;

  // Check if we need a covered bitmap. If it is certain that all key parts are
  // covering, we don't need one.
  bool store_covered_bitmap = false;
  if (unpack_info && use_covered_bitmap_format()) {
    for (uint i = 0; i < n_key_parts; i++) {
      if (!m_pack_info[i].m_covered) {
        store_covered_bitmap = true;
        break;
      }
    }
  }

  // The tag tells the reader whether a covered bitmap follows the header
  const char tag =
      store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;

  if (unpack_info) {
    unpack_info->clear();

    if (m_index_type == INDEX_TYPE_SECONDARY &&
        m_total_index_flags_length > 0) {
      // Reserve space for index flag fields
      unpack_info->allocate(m_total_index_flags_length);

      // Insert TTL timestamp
      if (has_ttl() && ttl_bytes) {
        write_index_flag_field(unpack_info,
                               reinterpret_cast<const uchar *const>(ttl_bytes),
                               Rdb_key_def::TTL_FLAG);
      }
    }

    // Header: 1-byte tag followed by a 2-byte length
    unpack_start_pos = unpack_info->get_current_pos();
    unpack_info->write_uint8(tag);
    unpack_len_pos = unpack_info->get_current_pos();
    // we don't know the total length yet, so write a zero
    unpack_info->write_uint16(0);

    if (store_covered_bitmap) {
      // Reserve two bytes for the covered bitmap. This will store, for key
      // parts which are not always covering, whether or not it is covering
      // for this record.
      covered_bitmap_pos = unpack_info->get_current_pos();
      unpack_info->write_uint16(0);
    }
  }

  // Covered bitmap being built; covered_bits is its backing storage.
  // curr_bitmap_pos is the next bit to use.
  MY_BITMAP covered_bitmap;
  my_bitmap_map covered_bits;
  uint curr_bitmap_pos = 0;
  bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);

  for (uint i = 0; i < n_key_parts; i++) {
    // Fill hidden pk id into the last key part for secondary keys for tables
    // with no pk
    if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
      m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
      break;
    }

    Field *const field = m_pack_info[i].get_field_in_table(tbl);
    DBUG_ASSERT(field != nullptr);

    // Offsets relative to tbl->record[0]; used both to point the field into
    // 'record' and to restore it afterwards
    uint field_offset = field->ptr - tbl->record[0];
    uint null_offset = field->null_offset(tbl->record[0]);
    bool maybe_null = field->real_maybe_null();

    // Save the ttl duration offset in the key so we can store it in front of
    // the record later.
    if (ttl_pk_offset && m_ttl_duration > 0 && i == m_ttl_pk_key_part_offset) {
      DBUG_ASSERT(field_check_field_name_match(field, m_ttl_column.c_str()));
      DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
      DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG);
      DBUG_ASSERT(!field->real_maybe_null());
      *ttl_pk_offset = tuple - packed_tuple;
    }

    // Make the field read its value from 'record'
    field->move_field(const_cast<uchar*>(record) + field_offset,
        maybe_null ? const_cast<uchar*>(record) + null_offset : nullptr,
        field->null_bit);
    // WARNING! Don't return without restoring field->ptr and field->null_ptr

    tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
                       unpack_info, n_null_fields);

    // If this key part is a prefix of a VARCHAR field, check if it's covered.
    if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
        !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
      size_t data_length = field->data_length();
      uint16 key_length;
      // Take the key part length from this index, or from the primary key
      // when m_pk_part_no[i] refers to a PK part
      if (m_pk_part_no[i] == (uint)-1) {
        key_length = tbl->key_info[get_keyno()].key_part[i].length;
      } else {
        key_length =
            tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
      }

      // Covered for this record: the value can be unpacked and fits into the
      // indexed length
      if (m_pack_info[i].m_unpack_func != nullptr &&
          data_length <= key_length) {
        bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
      }
      curr_bitmap_pos++;
    }

    // Restore field->ptr and field->null_ptr
    field->move_field(tbl->record[0] + field_offset,
                      maybe_null ? tbl->record[0] + null_offset : nullptr,
                      field->null_bit);
  }

  if (unpack_info) {
    // Length of the unpack data including its header (index flags excluded)
    const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
    DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());

    // Don't store the unpack_info if it has only the header (that is, there's
    // no meaningful content).
    // Primary Keys are special: for them, store the unpack_info even if it's
    // empty (provided m_maybe_unpack_info==true, see
    // ha_rocksdb::convert_record_to_storage_format)
    //
    // NOTE(review): in the secondary-key branch the 2-byte length placeholder
    // at unpack_len_pos is never patched and stays zero; only the other branch
    // writes 'len'. Confirm that readers of secondary-key values do not rely
    // on that field.
    if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
      if (len == get_unpack_header_size(tag) && !covered_bits) {
        unpack_info->truncate(unpack_start_pos);
      } else if (store_covered_bitmap) {
        unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
      }
    } else {
      unpack_info->write_uint16_at(unpack_len_pos, len);
    }

    //
    // Secondary keys have key and value checksums in the value part
    // Primary key is a special case (the value part has non-indexed columns),
    // so the checksums are computed and stored by
    // ha_rocksdb::convert_record_to_storage_format
    //
    if (should_store_row_debug_checksums) {
      // The value checksum covers everything written to unpack_info so far,
      // i.e. it does not include the checksum chunk itself
      const uint32_t key_crc32 = crc32(0, packed_tuple, tuple - packed_tuple);
      const uint32_t val_crc32 =
          crc32(0, unpack_info->ptr(), unpack_info->get_current_pos());

      unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
      unpack_info->write_uint32(key_crc32);
      unpack_info->write_uint32(val_crc32);
    }
  }

  DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));

  return tuple - packed_tuple;
}
1216
1217/**
1218 Pack the hidden primary key into mem-comparable form.
1219
1220 @param
1221 tbl Table we're working on
1222 hidden_pk_id IN New value to be packed into key
1223 packed_tuple OUT Key in the mem-comparable form
1224
1225 @return
1226 Length of the packed tuple
1227*/
1228
1229uint Rdb_key_def::pack_hidden_pk(const longlong &hidden_pk_id,
1230 uchar *const packed_tuple) const {
1231 DBUG_ASSERT(packed_tuple != nullptr);
1232
1233 uchar *tuple = packed_tuple;
1234 rdb_netbuf_store_index(tuple, m_index_number);
1235 tuple += INDEX_NUMBER_SIZE;
1236 DBUG_ASSERT(m_key_parts == 1);
1237 DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
1238 m_pack_info[0].m_max_image_len));
1239
1240 m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
1241
1242 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1243 return tuple - packed_tuple;
1244}
1245
1246/*
1247 Function of type rdb_index_field_pack_t
1248*/
1249
1250void Rdb_key_def::pack_with_make_sort_key(
1251 Rdb_field_packing *const fpi, Field *const field,
1252 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1253 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) const {
1254 DBUG_ASSERT(fpi != nullptr);
1255 DBUG_ASSERT(field != nullptr);
1256 DBUG_ASSERT(dst != nullptr);
1257 DBUG_ASSERT(*dst != nullptr);
1258
1259 const int max_len = fpi->m_max_image_len;
1260 my_bitmap_map *old_map;
1261
1262 old_map= dbug_tmp_use_all_columns(field->table,
1263 field->table->read_set);
1264 field->sort_string(*dst, max_len);
1265 dbug_tmp_restore_column_map(field->table->read_set, old_map);
1266 *dst += max_len;
1267}
1268
1269/*
1270 Compares two keys without unpacking
1271
1272 @detail
1273 @return
1274 0 - Ok. column_index is the index of the first column which is different.
1275 -1 if two kes are equal
1276 1 - Data format error.
1277*/
1278int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
1279 const rocksdb::Slice *key2,
1280 std::size_t *const column_index) const {
1281 DBUG_ASSERT(key1 != nullptr);
1282 DBUG_ASSERT(key2 != nullptr);
1283 DBUG_ASSERT(column_index != nullptr);
1284
1285 // the caller should check the return value and
1286 // not rely on column_index being valid
1287 *column_index = 0xbadf00d;
1288
1289 Rdb_string_reader reader1(key1);
1290 Rdb_string_reader reader2(key2);
1291
1292 // Skip the index number
1293 if ((!reader1.read(INDEX_NUMBER_SIZE)))
1294 return HA_EXIT_FAILURE;
1295
1296 if ((!reader2.read(INDEX_NUMBER_SIZE)))
1297 return HA_EXIT_FAILURE;
1298
1299 for (uint i = 0; i < m_key_parts; i++) {
1300 const Rdb_field_packing *const fpi = &m_pack_info[i];
1301 if (fpi->m_maybe_null) {
1302 const auto nullp1 = reader1.read(1);
1303 const auto nullp2 = reader2.read(1);
1304
1305 if (nullp1 == nullptr || nullp2 == nullptr) {
1306 return HA_EXIT_FAILURE;
1307 }
1308
1309 if (*nullp1 != *nullp2) {
1310 *column_index = i;
1311 return HA_EXIT_SUCCESS;
1312 }
1313
1314 if (*nullp1 == 0) {
1315 /* This is a NULL value */
1316 continue;
1317 }
1318 }
1319
1320 const auto before_skip1 = reader1.get_current_ptr();
1321 const auto before_skip2 = reader2.get_current_ptr();
1322 DBUG_ASSERT(fpi->m_skip_func);
1323 if ((this->*fpi->m_skip_func)(fpi, nullptr, &reader1))
1324 return HA_EXIT_FAILURE;
1325 if ((this->*fpi->m_skip_func)(fpi, nullptr, &reader2))
1326 return HA_EXIT_FAILURE;
1327 const auto size1 = reader1.get_current_ptr() - before_skip1;
1328 const auto size2 = reader2.get_current_ptr() - before_skip2;
1329 if (size1 != size2) {
1330 *column_index = i;
1331 return HA_EXIT_SUCCESS;
1332 }
1333
1334 if (memcmp(before_skip1, before_skip2, size1) != 0) {
1335 *column_index = i;
1336 return HA_EXIT_SUCCESS;
1337 }
1338 }
1339
1340 *column_index = m_key_parts;
1341 return HA_EXIT_SUCCESS;
1342}
1343
1344/*
1345 @brief
1346 Given a zero-padded key, determine its real key length
1347
1348 @detail
1349 Fixed-size skip functions just read.
1350*/
1351
1352size_t Rdb_key_def::key_length(const TABLE *const table,
1353 const rocksdb::Slice &key) const {
1354 DBUG_ASSERT(table != nullptr);
1355
1356 Rdb_string_reader reader(&key);
1357
1358 if ((!reader.read(INDEX_NUMBER_SIZE)))
1359 return size_t(-1);
1360
1361 for (uint i = 0; i < m_key_parts; i++) {
1362 const Rdb_field_packing *fpi = &m_pack_info[i];
1363 const Field *field = nullptr;
1364 if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY)
1365 field = fpi->get_field_in_table(table);
1366 if ((this->*fpi->m_skip_func)(fpi, field, &reader))
1367 return size_t(-1);
1368 }
1369 return key.size() - reader.remaining_bytes();
1370}
1371
1372int Rdb_key_def::unpack_field(
1373 Rdb_field_packing *const fpi,
1374 Field *const field,
1375 Rdb_string_reader* reader,
1376 const uchar *const default_value,
1377 Rdb_string_reader* unp_reader) const
1378{
1379 if (fpi->m_maybe_null) {
1380 const char *nullp;
1381 if (!(nullp = reader->read(1))) {
1382 return HA_EXIT_FAILURE;
1383 }
1384
1385 if (*nullp == 0) {
1386 /* Set the NULL-bit of this field */
1387 field->set_null();
1388 /* Also set the field to its default value */
1389 memcpy(field->ptr, default_value, field->pack_length());
1390 return HA_EXIT_SUCCESS;
1391 } else if (*nullp == 1) {
1392 field->set_notnull();
1393 } else {
1394 return HA_EXIT_FAILURE;
1395 }
1396 }
1397
1398 return (this->*fpi->m_unpack_func)(fpi, field, field->ptr, reader,
1399 unp_reader);
1400}
1401
1402/*
1403 Take mem-comparable form and unpack_info and unpack it to Table->record
1404
1405 @detail
1406 not all indexes support this
1407
1408 @return
1409 HA_EXIT_SUCCESS OK
1410 other HA_ERR error code
1411*/
1412
1413int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
1414 const rocksdb::Slice *const packed_key,
1415 const rocksdb::Slice *const unpack_info,
1416 const bool &verify_row_debug_checksums) const {
1417 Rdb_string_reader reader(packed_key);
1418 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1419
1420 const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
1421 const bool hidden_pk_exists = table_has_hidden_pk(table);
1422 const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
1423 // There is no checksuming data after unpack_info for primary keys, because
1424 // the layout there is different. The checksum is verified in
1425 // ha_rocksdb::convert_record_from_storage_format instead.
1426 DBUG_ASSERT_IMP(!secondary_key, !verify_row_debug_checksums);
1427
1428 // Skip the index number
1429 if ((!reader.read(INDEX_NUMBER_SIZE))) {
1430 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1431 }
1432
1433 // For secondary keys, we expect the value field to contain unpack data and
1434 // checksum data in that order. One or both can be missing, but they cannot
1435 // be reordered.
1436 const char *unpack_header = unp_reader.get_current_ptr();
1437 const bool has_unpack_info =
1438 unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
1439 if (has_unpack_info) {
1440 if ((m_index_type == INDEX_TYPE_SECONDARY &&
1441 m_total_index_flags_length > 0 &&
1442 !unp_reader.read(m_total_index_flags_length)) ||
1443 !unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
1444 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1445 }
1446 }
1447
1448 // Read the covered bitmap
1449 MY_BITMAP covered_bitmap;
1450 my_bitmap_map covered_bits;
1451 uint curr_bitmap_pos = 0;
1452
1453 const bool has_covered_bitmap =
1454 has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
1455 if (has_covered_bitmap) {
1456 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1457 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1458 sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1459 RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1460 }
1461
1462 for (uint i = 0; i < m_key_parts; i++) {
1463 Rdb_field_packing *const fpi = &m_pack_info[i];
1464
1465 /*
1466 Hidden pk field is packed at the end of the secondary keys, but the SQL
1467 layer does not know about it. Skip retrieving field if hidden pk.
1468 */
1469 if ((secondary_key && hidden_pk_exists && i + 1 == m_key_parts) ||
1470 is_hidden_pk) {
1471 DBUG_ASSERT(fpi->m_unpack_func);
1472 if ((this->*fpi->m_skip_func)(fpi, nullptr, &reader)) {
1473 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1474 }
1475 continue;
1476 }
1477
1478 Field *const field = fpi->get_field_in_table(table);
1479
1480 bool covered_column = true;
1481 if (has_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
1482 !m_pack_info[i].m_covered) {
1483 covered_column = curr_bitmap_pos < MAX_REF_PARTS &&
1484 bitmap_is_set(&covered_bitmap, curr_bitmap_pos);
1485 curr_bitmap_pos++;
1486 }
1487 if (fpi->m_unpack_func && covered_column) {
1488 /* It is possible to unpack this column. Do it. */
1489
1490 uint field_offset = field->ptr - table->record[0];
1491 uint null_offset = field->null_offset();
1492 bool maybe_null = field->real_maybe_null();
1493 field->move_field(buf + field_offset,
1494 maybe_null ? buf + null_offset : nullptr,
1495 field->null_bit);
1496 // WARNING! Don't return without restoring field->ptr and field->null_ptr
1497
1498 // If we need unpack info, but there is none, tell the unpack function
1499 // this by passing unp_reader as nullptr. If we never read unpack_info
1500 // during unpacking anyway, then there won't an error.
1501 const bool maybe_missing_unpack =
1502 !has_unpack_info && fpi->uses_unpack_info();
1503 int res = unpack_field(fpi, field, &reader,
1504 table->s->default_values + field_offset,
1505 maybe_missing_unpack ? nullptr : &unp_reader);
1506
1507 // Restore field->ptr and field->null_ptr
1508 field->move_field(table->record[0] + field_offset,
1509 maybe_null ? table->record[0] + null_offset : nullptr,
1510 field->null_bit);
1511
1512 if (res != UNPACK_SUCCESS) {
1513 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1514 }
1515 } else {
1516 /* It is impossible to unpack the column. Skip it. */
1517 if (fpi->m_maybe_null) {
1518 const char *nullp;
1519 if (!(nullp = reader.read(1)))
1520 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1521 if (*nullp == 0) {
1522 /* This is a NULL value */
1523 continue;
1524 }
1525 /* If NULL marker is not '0', it can be only '1' */
1526 if (*nullp != 1)
1527 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1528 }
1529 if ((this->*fpi->m_skip_func)(fpi, field, &reader))
1530 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1531
1532 // If this is a space padded varchar, we need to skip the indicator
1533 // bytes for trailing bytes. They're useless since we can't restore the
1534 // field anyway.
1535 //
1536 // There is a special case for prefixed varchars where we do not
1537 // generate unpack info, because we know prefixed varchars cannot be
1538 // unpacked. In this case, it is not necessary to skip.
1539 if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
1540 !fpi->m_unpack_info_stores_value) {
1541 unp_reader.read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
1542 }
1543 }
1544 }
1545
1546 /*
1547 Check checksum values if present
1548 */
1549 const char *ptr;
1550 if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
1551 if (verify_row_debug_checksums) {
1552 uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
1553 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1554 const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
1555 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1556
1557 const uint32_t computed_key_chksum =
1558 crc32(0, (const uchar *)packed_key->data(), packed_key->size());
1559 const uint32_t computed_val_chksum =
1560 crc32(0, (const uchar *)unpack_info->data(),
1561 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1562
1563 DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
1564 stored_key_chksum++;);
1565
1566 if (stored_key_chksum != computed_key_chksum) {
1567 report_checksum_mismatch(true, packed_key->data(), packed_key->size());
1568 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1569 }
1570
1571 if (stored_val_chksum != computed_val_chksum) {
1572 report_checksum_mismatch(false, unpack_info->data(),
1573 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1574 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1575 }
1576 } else {
1577 /* The checksums are present but we are not checking checksums */
1578 }
1579 }
1580
1581 if (reader.remaining_bytes())
1582 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1583
1584 return HA_EXIT_SUCCESS;
1585}
1586
1587bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
1588 return table->s->primary_key == MAX_INDEXES;
1589}
1590
1591void Rdb_key_def::report_checksum_mismatch(const bool &is_key,
1592 const char *const data,
1593 const size_t data_size) const {
1594 // NO_LINT_DEBUG
1595 sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
1596 is_key ? "key" : "value", get_index_number());
1597
1598 const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
1599 // NO_LINT_DEBUG
1600 sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
1601 (uint64_t)data_size, buf.c_str());
1602
1603 my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
1604}
1605
1606bool Rdb_key_def::index_format_min_check(const int &pk_min,
1607 const int &sk_min) const {
1608 switch (m_index_type) {
1609 case INDEX_TYPE_PRIMARY:
1610 case INDEX_TYPE_HIDDEN_PRIMARY:
1611 return (m_kv_format_version >= pk_min);
1612 case INDEX_TYPE_SECONDARY:
1613 return (m_kv_format_version >= sk_min);
1614 default:
1615 DBUG_ASSERT(0);
1616 return false;
1617 }
1618}
1619
1620///////////////////////////////////////////////////////////////////////////////////////////
1621// Rdb_field_packing
1622///////////////////////////////////////////////////////////////////////////////////////////
1623
1624/*
1625 Function of type rdb_index_field_skip_t
1626*/
1627
1628int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
1629 const Field *const field
1630 MY_ATTRIBUTE((__unused__)),
1631 Rdb_string_reader *const reader) const {
1632 if (!reader->read(fpi->m_max_image_len))
1633 return HA_EXIT_FAILURE;
1634 return HA_EXIT_SUCCESS;
1635}
1636
/*
  (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
  split in the middle of an UTF-8 character. See the implementation of
  unpack_binary_or_utf8_varchar.
*/

#define RDB_ESCAPE_LENGTH 9
#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");

/*
  Encoded size of 'len' payload bytes: the payload is cut into chunks of
  (RDB_ESCAPE_LENGTH - 1) bytes, rounded up, and each chunk occupies
  RDB_ESCAPE_LENGTH bytes.

  The argument and the whole expansion are parenthesized so that the macro
  can be used with any expression as argument and inside any larger
  expression.
*/
#define RDB_ENCODED_SIZE(len)                                       \
  ((((len) + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
   RDB_ESCAPE_LENGTH)

/*
  Same for the legacy format, which always has at least one chunk and uses
  one extra chunk when 'len' is an exact multiple of the chunk payload size.
*/
#define RDB_LEGACY_ENCODED_SIZE(len)                                 \
  ((((len) + (RDB_LEGACY_ESCAPE_LENGTH - 1)) /                       \
    (RDB_LEGACY_ESCAPE_LENGTH - 1)) *                                \
   RDB_LEGACY_ESCAPE_LENGTH)
1655
1656/*
1657 Function of type rdb_index_field_skip_t
1658*/
1659
1660int Rdb_key_def::skip_variable_length(
1661 const Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
1662 const Field *const field, Rdb_string_reader *const reader) const {
1663 const uchar *ptr;
1664 bool finished = false;
1665
1666 size_t dst_len; /* How much data can be there */
1667 if (field) {
1668 const Field_varstring *const field_var =
1669 static_cast<const Field_varstring *>(field);
1670 dst_len = field_var->pack_length() - field_var->length_bytes;
1671 } else {
1672 dst_len = UINT_MAX;
1673 }
1674
1675 bool use_legacy_format = use_legacy_varbinary_format();
1676
1677 /* Decode the length-emitted encoding here */
1678 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1679 uint used_bytes;
1680
1681 /* See pack_with_varchar_encoding. */
1682 if (use_legacy_format) {
1683 used_bytes = calc_unpack_legacy_variable_format(
1684 ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1685 } else {
1686 used_bytes =
1687 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1688 }
1689
1690 if (used_bytes == (uint)-1 || dst_len < used_bytes) {
1691 return HA_EXIT_FAILURE; // Corruption in the data
1692 }
1693
1694 if (finished) {
1695 break;
1696 }
1697
1698 dst_len -= used_bytes;
1699 }
1700
1701 if (!finished) {
1702 return HA_EXIT_FAILURE;
1703 }
1704
1705 return HA_EXIT_SUCCESS;
1706}
1707
/*
  Values of the last byte of a segment in the Variable-Length Space-Padded
  encoding. As interpreted by skip_variable_space_pad():
  VARCHAR_CMP_EQUAL_TO_SPACES marks the last segment of a value, the other
  two mark a segment that is followed by more segments. Any other value is
  treated as data corruption.
*/
const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
1711
1712/*
1713 Skip a keypart that uses Variable-Length Space-Padded encoding
1714*/
1715
1716int Rdb_key_def::skip_variable_space_pad(
1717 const Rdb_field_packing *const fpi, const Field *const field,
1718 Rdb_string_reader *const reader) const {
1719 const uchar *ptr;
1720 bool finished = false;
1721
1722 size_t dst_len = UINT_MAX; /* How much data can be there */
1723
1724 if (field) {
1725 const Field_varstring *const field_var =
1726 static_cast<const Field_varstring *>(field);
1727 dst_len = field_var->pack_length() - field_var->length_bytes;
1728 }
1729
1730 /* Decode the length-emitted encoding here */
1731 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1732 // See pack_with_varchar_space_pad
1733 const uchar c = ptr[fpi->m_segment_size - 1];
1734 if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
1735 // This is the last segment
1736 finished = true;
1737 break;
1738 } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
1739 c == VARCHAR_CMP_GREATER_THAN_SPACES) {
1740 // This is not the last segment
1741 if ((fpi->m_segment_size - 1) > dst_len) {
1742 // The segment is full of data but the table field can't hold that
1743 // much! This must be data corruption.
1744 return HA_EXIT_FAILURE;
1745 }
1746 dst_len -= (fpi->m_segment_size - 1);
1747 } else {
1748 // Encountered a value that's none of the VARCHAR_CMP* constants
1749 // It's data corruption.
1750 return HA_EXIT_FAILURE;
1751 }
1752 }
1753 return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
1754}
1755
1756/*
1757 Function of type rdb_index_field_unpack_t
1758*/
1759
1760int Rdb_key_def::unpack_integer(
1761 Rdb_field_packing *const fpi, Field *const field, uchar *const to,
1762 Rdb_string_reader *const reader,
1763 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) const {
1764 const int length = fpi->m_max_image_len;
1765
1766 const uchar *from;
1767 if (!(from = (const uchar *)reader->read(length)))
1768 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1769
1770#ifdef WORDS_BIGENDIAN
1771 {
1772 if (((Field_num *)field)->unsigned_flag)
1773 to[0] = from[0];
1774 else
1775 to[0] = (char)(from[0] ^ 128); // Reverse the sign bit.
1776 memcpy(to + 1, from + 1, length - 1);
1777 }
1778#else
1779 {
1780 const int sign_byte = from[0];
1781 if (((Field_num *)field)->unsigned_flag)
1782 to[length - 1] = sign_byte;
1783 else
1784 to[length - 1] =
1785 static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
1786 for (int i = 0, j = length - 1; i < length - 1; ++i, --j)
1787 to[i] = from[j];
1788 }
1789#endif
1790 return UNPACK_SUCCESS;
1791}
1792
1793#if !defined(WORDS_BIGENDIAN)
1794static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
1795#if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
1796 // A few systems store the most-significant _word_ first on little-endian
1797 dst[0] = src[3];
1798 dst[1] = src[2];
1799 dst[2] = src[1];
1800 dst[3] = src[0];
1801 dst[4] = src[7];
1802 dst[5] = src[6];
1803 dst[6] = src[5];
1804 dst[7] = src[4];
1805#else
1806 dst[0] = src[7];
1807 dst[1] = src[6];
1808 dst[2] = src[5];
1809 dst[3] = src[4];
1810 dst[4] = src[3];
1811 dst[5] = src[2];
1812 dst[6] = src[1];
1813 dst[7] = src[0];
1814#endif
1815}
1816
1817static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
1818 dst[0] = src[3];
1819 dst[1] = src[2];
1820 dst[2] = src[1];
1821 dst[3] = src[0];
1822}
1823#else
1824#define rdb_swap_double_bytes nullptr
1825#define rdb_swap_float_bytes nullptr
1826#endif
1827
1828int Rdb_key_def::unpack_floating_point(
1829 uchar *const dst, Rdb_string_reader *const reader, const size_t &size,
1830 const int &exp_digit, const uchar *const zero_pattern,
1831 const uchar *const zero_val,
1832 void (*swap_func)(uchar *, const uchar *)) const {
1833 const uchar *const from = (const uchar *)reader->read(size);
1834 if (from == nullptr)
1835 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1836
1837 /* Check to see if the value is zero */
1838 if (memcmp(from, zero_pattern, size) == 0) {
1839 memcpy(dst, zero_val, size);
1840 return UNPACK_SUCCESS;
1841 }
1842
1843#if defined(WORDS_BIGENDIAN)
1844 // On big-endian, output can go directly into result
1845 uchar *const tmp = dst;
1846#else
1847 // Otherwise use a temporary buffer to make byte-swapping easier later
1848 uchar tmp[8];
1849#endif
1850
1851 memcpy(tmp, from, size);
1852
1853 if (tmp[0] & 0x80) {
1854 // If the high bit is set the original value was positive so
1855 // remove the high bit and subtract one from the exponent.
1856 ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
1857 exp_part &= 0x7FFF; // clear high bit;
1858 exp_part -= (ushort)1 << (16 - 1 - exp_digit); // subtract from exponent
1859 tmp[0] = (uchar)(exp_part >> 8);
1860 tmp[1] = (uchar)exp_part;
1861 } else {
1862 // Otherwise the original value was negative and all bytes have been
1863 // negated.
1864 for (size_t ii = 0; ii < size; ii++)
1865 tmp[ii] ^= 0xFF;
1866 }
1867
1868#if !defined(WORDS_BIGENDIAN)
1869 // On little-endian, swap the bytes around
1870 swap_func(dst, tmp);
1871#else
1872 DBUG_ASSERT(swap_func == nullptr);
1873#endif
1874
1875 return UNPACK_SUCCESS;
1876}
1877
1878#if !defined(DBL_EXP_DIG)
1879#define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
1880#endif
1881
1882/*
1883 Function of type rdb_index_field_unpack_t
1884
1885 Unpack a double by doing the reverse action of change_double_for_sort
1886 (sql/filesort.cc). Note that this only works on IEEE values.
1887 Note also that this code assumes that NaN and +/-Infinity are never
1888 allowed in the database.
1889*/
1890int Rdb_key_def::unpack_double(
1891 Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
1892 Field *const field MY_ATTRIBUTE((__unused__)), uchar *const field_ptr,
1893 Rdb_string_reader *const reader,
1894 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) const {
1895 static double zero_val = 0.0;
1896 static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
1897
1898 return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
1899 zero_pattern, (const uchar *)&zero_val,
1900 rdb_swap_double_bytes);
1901}
1902
1903#if !defined(FLT_EXP_DIG)
1904#define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
1905#endif
1906
1907/*
1908 Function of type rdb_index_field_unpack_t
1909
1910 Unpack a float by doing the reverse action of Field_float::make_sort_key
1911 (sql/field.cc). Note that this only works on IEEE values.
1912 Note also that this code assumes that NaN and +/-Infinity are never
1913 allowed in the database.
1914*/
1915int Rdb_key_def::unpack_float(
1916 Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
1917 uchar *const field_ptr, Rdb_string_reader *const reader,
1918 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) const {
1919 static float zero_val = 0.0;
1920 static const uchar zero_pattern[4] = {128, 0, 0, 0};
1921
1922 return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
1923 zero_pattern, (const uchar *)&zero_val,
1924 rdb_swap_float_bytes);
1925}
1926
1927/*
1928 Function of type rdb_index_field_unpack_t used to
1929 Unpack by doing the reverse action to Field_newdate::make_sort_key.
1930*/
1931
1932int Rdb_key_def::unpack_newdate(
1933 Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
1934 uchar *const field_ptr, Rdb_string_reader *const reader,
1935 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) const {
1936 const char *from;
1937 DBUG_ASSERT(fpi->m_max_image_len == 3);
1938
1939 if (!(from = reader->read(3)))
1940 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1941
1942 field_ptr[0] = from[2];
1943 field_ptr[1] = from[1];
1944 field_ptr[2] = from[0];
1945 return UNPACK_SUCCESS;
1946}
1947
1948/*
1949 Function of type rdb_index_field_unpack_t, used to
1950 Unpack the string by copying it over.
1951 This is for BINARY(n) where the value occupies the whole length.
1952*/
1953
1954int Rdb_key_def::unpack_binary_str(
1955 Rdb_field_packing *const fpi, Field *const field, uchar *const to,
1956 Rdb_string_reader *const reader,
1957 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) const {
1958 const char *from;
1959 if (!(from = reader->read(fpi->m_max_image_len)))
1960 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1961
1962 memcpy(to, from, fpi->m_max_image_len);
1963 return UNPACK_SUCCESS;
1964}
1965
1966/*
1967 Function of type rdb_index_field_unpack_t.
1968 For UTF-8, we need to convert 2-byte wide-character entities back into
1969 UTF8 sequences.
1970*/
1971
1972int Rdb_key_def::unpack_utf8_str(
1973 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
1974 Rdb_string_reader *const reader,
1975 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) const {
1976 my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
1977 const uchar *src;
1978 if (!(src = (const uchar *)reader->read(fpi->m_max_image_len)))
1979 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1980
1981 const uchar *const src_end = src + fpi->m_max_image_len;
1982 uchar *const dst_end = dst + field->pack_length();
1983
1984 while (src < src_end) {
1985 my_wc_t wc = (src[0] << 8) | src[1];
1986 src += 2;
1987 int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
1988 DBUG_ASSERT(res > 0 && res <= 3);
1989 if (res < 0)
1990 return UNPACK_FAILURE;
1991 dst += res;
1992 }
1993
1994 cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst,
1995 cset->pad_char);
1996 return UNPACK_SUCCESS;
1997}
1998
1999/*
2000 This is the original algorithm to encode a variable binary field. It
2001 sets a flag byte every Nth byte. The flag value is (255 - #pad) where
2002 #pad is the number of padding bytes that were needed (0 if all N-1
2003 bytes were used).
2004
2005 If N=8 and the field is:
2006 * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
2007 * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
2008 And the 4 byte string compares as greater than the 3 byte string
2009
2010 Unfortunately the algorithm has a flaw. If the input is exactly a
2011 multiple of N-1, an extra N bytes are written. Since we usually use
2012 N=9, an 8 byte input will generate 18 bytes of output instead of the
2013 9 bytes of output that is optimal.
2014
2015 See pack_variable_format for the newer algorithm.
2016*/
2017void Rdb_key_def::pack_legacy_variable_format(
2018 const uchar *src, // The data to encode
2019 size_t src_len, // The length of the data to encode
2020 uchar **dst) const // The location to encode the data
2021{
2022 size_t copy_len;
2023 size_t padding_bytes;
2024 uchar *ptr = *dst;
2025
2026 do {
2027 copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
2028 padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
2029 memcpy(ptr, src, copy_len);
2030 ptr += copy_len;
2031 src += copy_len;
2032 // pad with zeros if necessary
2033 if (padding_bytes > 0) {
2034 memset(ptr, 0, padding_bytes);
2035 ptr += padding_bytes;
2036 }
2037
2038 *(ptr++) = 255 - padding_bytes;
2039
2040 src_len -= copy_len;
2041 } while (padding_bytes == 0);
2042
2043 *dst = ptr;
2044}
2045
2046/*
2047 This is the new algorithm. Similarly to the legacy format the input
2048 is split up into N-1 bytes and a flag byte is used as the Nth byte
2049 in the output.
2050
2051 - If the previous segment needed any padding the flag is set to the
2052 number of bytes used (0..N-2). 0 is possible in the first segment
2053 if the input is 0 bytes long.
2054 - If no padding was used and there is no more data left in the input
2055 the flag is set to N-1
2056 - If no padding was used and there is still data left in the input the
2057 flag is set to N.
2058
2059 For N=9, the following input values encode to the specified
    output (where 'X' indicates a byte of the original input):
2061 - 0 bytes is encoded as 0 0 0 0 0 0 0 0 0
2062 - 1 byte is encoded as X 0 0 0 0 0 0 0 1
2063 - 2 bytes is encoded as X X 0 0 0 0 0 0 2
2064 - 7 bytes is encoded as X X X X X X X 0 7
2065 - 8 bytes is encoded as X X X X X X X X 8
2066 - 9 bytes is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
2067 - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
2068*/
2069void Rdb_key_def::pack_variable_format(
2070 const uchar *src, // The data to encode
2071 size_t src_len, // The length of the data to encode
2072 uchar **dst) const // The location to encode the data
2073{
2074 uchar *ptr = *dst;
2075
2076 for (;;) {
2077 // Figure out how many bytes to copy, copy them and adjust pointers
2078 const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
2079 memcpy(ptr, src, copy_len);
2080 ptr += copy_len;
2081 src += copy_len;
2082 src_len -= copy_len;
2083
2084 // Are we at the end of the input?
2085 if (src_len == 0) {
2086 // pad with zeros if necessary;
2087 const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
2088 if (padding_bytes > 0) {
2089 memset(ptr, 0, padding_bytes);
2090 ptr += padding_bytes;
2091 }
2092
2093 // Put the flag byte (0 - N-1) in the output
2094 *(ptr++) = (uchar)copy_len;
2095 break;
2096 }
2097
2098 // We have more data - put the flag byte (N) in and continue
2099 *(ptr++) = RDB_ESCAPE_LENGTH;
2100 }
2101
2102 *dst = ptr;
2103}
2104
2105/*
2106 Function of type rdb_index_field_pack_t
2107*/
2108
2109void Rdb_key_def::pack_with_varchar_encoding(
2110 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2111 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) const {
2112 const CHARSET_INFO *const charset = field->charset();
2113 Field_varstring *const field_var = (Field_varstring *)field;
2114
2115 const size_t value_length = (field_var->length_bytes == 1)
2116 ? (uint)*field->ptr
2117 : uint2korr(field->ptr);
2118 size_t xfrm_len = charset->coll->strnxfrm(
2119 charset, buf, fpi->m_max_image_len, field_var->char_length(),
2120 field_var->ptr + field_var->length_bytes, value_length, 0);
2121
2122 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2123 if (use_legacy_varbinary_format()) {
2124 pack_legacy_variable_format(buf, xfrm_len, dst);
2125 } else {
2126 pack_variable_format(buf, xfrm_len, dst);
2127 }
2128}
2129
2130/*
2131 Compare the string in [buf..buf_end) with a string that is an infinite
2132 sequence of strings in space_xfrm
2133*/
2134
2135static int
2136rdb_compare_string_with_spaces(const uchar *buf, const uchar *const buf_end,
2137 const std::vector<uchar> *const space_xfrm) {
2138 int cmp = 0;
2139 while (buf < buf_end) {
2140 size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
2141 if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0)
2142 break;
2143 buf += bytes;
2144 }
2145 return cmp;
2146}
2147
// Bias added to the trimmed/padded space count before it is written to
// unpack_info, so that the stored number is never negative.
static constexpr int RDB_TRIMMED_CHARS_OFFSET = 8;
2149/*
2150 Pack the data with Variable-Length Space-Padded Encoding.
2151
2152 The encoding is there to meet two goals:
2153
2154 Goal#1. Comparison. The SQL standard says
2155
2156 " If the collation for the comparison has the PAD SPACE characteristic,
2157 for the purposes of the comparison, the shorter value is effectively
2158 extended to the length of the longer by concatenation of <space>s on the
2159 right.
2160
2161 At the moment, all MySQL collations except one have the PAD SPACE
2162 characteristic. The exception is the "binary" collation that is used by
2163 [VAR]BINARY columns. (Note that binary collations for specific charsets,
2164 like utf8_bin or latin1_bin are not the same as "binary" collation, they have
2165 the PAD SPACE characteristic).
2166
2167 Goal#2 is to preserve the number of trailing spaces in the original value.
2168
2169 This is achieved by using the following encoding:
2170 The key part:
2171 - Stores mem-comparable image of the column
2172 - It is stored in chunks of fpi->m_segment_size bytes (*)
2173 = If the remainder of the chunk is not occupied, it is padded with mem-
2174 comparable image of the space character (cs->pad_char to be precise).
2175 - The last byte of the chunk shows how the rest of column's mem-comparable
2176 image would compare to mem-comparable image of the column extended with
2177 spaces. There are three possible values.
2178 - VARCHAR_CMP_LESS_THAN_SPACES,
2179 - VARCHAR_CMP_EQUAL_TO_SPACES
2180 - VARCHAR_CMP_GREATER_THAN_SPACES
2181
2182 VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
2183 is spaces, or something that sorts as spaces, so there is no reason to store
2184 it).
2185
2186 Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
2187
2188 'abcd\0' => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0 ' <VARCHAR_CMP_EQUAL> ]
2189 'abcd' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2190 'abcd ' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2191 'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
2192
2193 As mentioned above, the last chunk is padded with mem-comparable images of
2194 cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
2195
2196 fpi->m_segment_size depends on the used collation. It is chosen to be such
2197 that no mem-comparable image of space will ever stretch across the segments
2198 (see get_segment_size_from_collation).
2199
2200 == The value part (aka unpack_info) ==
2201 The value part stores the number of space characters that one needs to add
2202 when unpacking the string.
2203 - If the number is positive, it means add this many spaces at the end
2204 - If the number is negative, it means padding has added extra spaces which
2205 must be removed.
2206
2207 Storage considerations
2208 - depending on column's max size, the number may occupy 1 or 2 bytes
2209 - the number of spaces that need to be removed is not more than
2210 RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
2211 then store it as unsigned.
2212
2213 @seealso
2214 unpack_binary_or_utf8_varchar_space_pad
2215 unpack_simple_varchar_space_pad
2216 dummy_make_unpack_info
2217 skip_variable_space_pad
2218*/
2219
2220void Rdb_key_def::pack_with_varchar_space_pad(
2221 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2222 Rdb_pack_field_context *const pack_ctx) const {
2223 Rdb_string_writer *const unpack_info = pack_ctx->writer;
2224 const CHARSET_INFO *const charset = field->charset();
2225 const auto field_var = static_cast<Field_varstring *>(field);
2226
2227 const size_t value_length = (field_var->length_bytes == 1)
2228 ? (uint)*field->ptr
2229 : uint2korr(field->ptr);
2230
2231 const size_t trimmed_len = charset->cset->lengthsp(
2232 charset, (const char *)field_var->ptr + field_var->length_bytes,
2233 value_length);
2234 const size_t xfrm_len = charset->coll->strnxfrm(
2235 charset, buf, fpi->m_max_image_len, field_var->char_length(),
2236 field_var->ptr + field_var->length_bytes, trimmed_len, 0);
2237
2238 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2239 uchar *const buf_end = buf + xfrm_len;
2240
2241 size_t encoded_size = 0;
2242 uchar *ptr = *dst;
2243 size_t padding_bytes;
2244 while (true) {
2245 const size_t copy_len =
2246 std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
2247 padding_bytes = fpi->m_segment_size - 1 - copy_len;
2248 memcpy(ptr, buf, copy_len);
2249 ptr += copy_len;
2250 buf += copy_len;
2251
2252 if (padding_bytes) {
2253 memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
2254 ptr += padding_bytes;
2255 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES; // last segment
2256 } else {
2257 // Compare the string suffix with a hypothetical infinite string of
2258 // spaces. It could be that the first difference is beyond the end of
2259 // current chunk.
2260 const int cmp =
2261 rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
2262
2263 if (cmp < 0)
2264 *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
2265 else if (cmp > 0)
2266 *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
2267 else {
2268 // It turns out all the rest are spaces.
2269 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
2270 }
2271 }
2272 encoded_size += fpi->m_segment_size;
2273
2274 if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES)
2275 break;
2276 }
2277
2278 // m_unpack_info_stores_value means unpack_info stores the whole original
2279 // value. There is no need to store the number of trimmed/padded endspaces
2280 // in that case.
2281 if (unpack_info && !fpi->m_unpack_info_stores_value) {
2282 // (value_length - trimmed_len) is the number of trimmed space *characters*
2283 // then, padding_bytes is the number of *bytes* added as padding
2284 // then, we add 8, because we don't store negative values.
2285 DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
2286 DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
2287 const size_t removed_chars =
2288 RDB_TRIMMED_CHARS_OFFSET +
2289 (value_length - trimmed_len) / fpi->space_mb_len -
2290 padding_bytes / fpi->space_xfrm_len;
2291
2292 if (fpi->m_unpack_info_uses_two_bytes) {
2293 unpack_info->write_uint16(removed_chars);
2294 } else {
2295 DBUG_ASSERT(removed_chars < 0x100);
2296 unpack_info->write_uint8(removed_chars);
2297 }
2298 }
2299
2300 *dst += encoded_size;
2301}
2302
2303/*
2304 Calculate the number of used bytes in the chunk and whether this is the
2305 last chunk in the input. This is based on the old legacy format - see
2306 pack_legacy_variable_format.
2307 */
2308uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag,
2309 bool *done) const {
2310 uint pad = 255 - flag;
2311 uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
2312 if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
2313 return (uint)-1;
2314 }
2315
2316 *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
2317 return used_bytes;
2318}
2319
2320/*
2321 Calculate the number of used bytes in the chunk and whether this is the
2322 last chunk in the input. This is based on the new format - see
2323 pack_variable_format.
2324 */
2325uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) const {
2326 // Check for invalid flag values
2327 if (flag > RDB_ESCAPE_LENGTH) {
2328 return (uint)-1;
2329 }
2330
2331 // Values from 1 to N-1 indicate this is the last chunk and that is how
2332 // many bytes were used
2333 if (flag < RDB_ESCAPE_LENGTH) {
2334 *done = true;
2335 return flag;
2336 }
2337
2338 // A value of N means we used N-1 bytes and had more to go
2339 *done = false;
2340 return RDB_ESCAPE_LENGTH - 1;
2341}
2342
2343/*
2344 Unpack data that has charset information. Each two bytes of the input is
2345 treated as a wide-character and converted to its multibyte equivalent in
2346 the output.
2347 */
2348static int
2349unpack_charset(const CHARSET_INFO *cset, // character set information
2350 const uchar *src, // source data to unpack
2351 uint src_len, // length of source data
2352 uchar *dst, // destination of unpacked data
2353 uint dst_len, // length of destination data
2354 uint *used_bytes) // output number of bytes used
2355{
2356 if (src_len & 1) {
2357 /*
2358 UTF-8 characters are encoded into two-byte entities. There is no way
2359 we can have an odd number of bytes after encoding.
2360 */
2361 return UNPACK_FAILURE;
2362 }
2363
2364 uchar *dst_end = dst + dst_len;
2365 uint used = 0;
2366
2367 for (uint ii = 0; ii < src_len; ii += 2) {
2368 my_wc_t wc = (src[ii] << 8) | src[ii + 1];
2369 int res = cset->cset->wc_mb(cset, wc, dst + used, dst_end);
2370 DBUG_ASSERT(res > 0 && res <= 3);
2371 if (res < 0) {
2372 return UNPACK_FAILURE;
2373 }
2374
2375 used += res;
2376 }
2377
2378 *used_bytes = used;
2379 return UNPACK_SUCCESS;
2380}
2381
2382/*
2383 Function of type rdb_index_field_unpack_t
2384*/
2385
2386int Rdb_key_def::unpack_binary_or_utf8_varchar(
2387 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2388 Rdb_string_reader *const reader,
2389 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) const {
2390 const uchar *ptr;
2391 size_t len = 0;
2392 bool finished = false;
2393 uchar *d0 = dst;
2394 Field_varstring *const field_var = (Field_varstring *)field;
2395 dst += field_var->length_bytes;
2396 // How much we can unpack
2397 size_t dst_len = field_var->pack_length() - field_var->length_bytes;
2398
2399 bool use_legacy_format = use_legacy_varbinary_format();
2400
2401 /* Decode the length-emitted encoding here */
2402 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
2403 uint used_bytes;
2404
2405 /* See pack_with_varchar_encoding. */
2406 if (use_legacy_format) {
2407 used_bytes = calc_unpack_legacy_variable_format(
2408 ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2409 } else {
2410 used_bytes =
2411 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2412 }
2413
2414 if (used_bytes == (uint)-1 || dst_len < used_bytes) {
2415 return UNPACK_FAILURE; // Corruption in the data
2416 }
2417
2418 /*
2419 Now, we need to decode used_bytes of data and append them to the value.
2420 */
2421 if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2422 int err = unpack_charset(fpi->m_varchar_charset, ptr, used_bytes, dst,
2423 dst_len, &used_bytes);
2424 if (err != UNPACK_SUCCESS) {
2425 return err;
2426 }
2427 } else {
2428 memcpy(dst, ptr, used_bytes);
2429 }
2430
2431 dst += used_bytes;
2432 dst_len -= used_bytes;
2433 len += used_bytes;
2434
2435 if (finished) {
2436 break;
2437 }
2438 }
2439
2440 if (!finished) {
2441 return UNPACK_FAILURE;
2442 }
2443
2444 /* Save the length */
2445 if (field_var->length_bytes == 1) {
2446 d0[0] = (uchar)len;
2447 } else {
2448 DBUG_ASSERT(field_var->length_bytes == 2);
2449 int2store(d0, len);
2450 }
2451 return UNPACK_SUCCESS;
2452}
2453
2454/*
2455 @seealso
2456 pack_with_varchar_space_pad - packing function
2457 unpack_simple_varchar_space_pad - unpacking function for 'simple'
2458 charsets.
2459 skip_variable_space_pad - skip function
2460*/
2461int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
2462 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2463 Rdb_string_reader *const reader,
2464 Rdb_string_reader *const unp_reader) const {
2465 const uchar *ptr;
2466 size_t len = 0;
2467 bool finished = false;
2468 Field_varstring *const field_var = static_cast<Field_varstring *>(field);
2469 uchar *d0 = dst;
2470 uchar *dst_end = dst + field_var->pack_length();
2471 dst += field_var->length_bytes;
2472
2473 uint space_padding_bytes = 0;
2474 uint extra_spaces;
2475 if ((fpi->m_unpack_info_uses_two_bytes
2476 ? unp_reader->read_uint16(&extra_spaces)
2477 : unp_reader->read_uint8(&extra_spaces))) {
2478 return UNPACK_FAILURE;
2479 }
2480
2481 if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
2482 space_padding_bytes =
2483 -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
2484 extra_spaces = 0;
2485 } else
2486 extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
2487
2488 space_padding_bytes *= fpi->space_xfrm_len;
2489
2490 /* Decode the length-emitted encoding here */
2491 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2492 const char last_byte = ptr[fpi->m_segment_size - 1];
2493 size_t used_bytes;
2494 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) // this is the last segment
2495 {
2496 if (space_padding_bytes > (fpi->m_segment_size - 1))
2497 return UNPACK_FAILURE; // Cannot happen, corrupted data
2498 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2499 finished = true;
2500 } else {
2501 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2502 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2503 return UNPACK_FAILURE; // Invalid value
2504 }
2505 used_bytes = fpi->m_segment_size - 1;
2506 }
2507
2508 // Now, need to decode used_bytes of data and append them to the value.
2509 if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2510 if (used_bytes & 1) {
2511 /*
2512 UTF-8 characters are encoded into two-byte entities. There is no way
2513 we can have an odd number of bytes after encoding.
2514 */
2515 return UNPACK_FAILURE;
2516 }
2517
2518 const uchar *src = ptr;
2519 const uchar *const src_end = ptr + used_bytes;
2520 while (src < src_end) {
2521 my_wc_t wc = (src[0] << 8) | src[1];
2522 src += 2;
2523 const CHARSET_INFO *cset = fpi->m_varchar_charset;
2524 int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
2525 DBUG_ASSERT(res <= 3);
2526 if (res <= 0)
2527 return UNPACK_FAILURE;
2528 dst += res;
2529 len += res;
2530 }
2531 } else {
2532 if (dst + used_bytes > dst_end)
2533 return UNPACK_FAILURE;
2534 memcpy(dst, ptr, used_bytes);
2535 dst += used_bytes;
2536 len += used_bytes;
2537 }
2538
2539 if (finished) {
2540 if (extra_spaces) {
2541 // Both binary and UTF-8 charset store space as ' ',
2542 // so the following is ok:
2543 if (dst + extra_spaces > dst_end)
2544 return UNPACK_FAILURE;
2545 memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
2546 len += extra_spaces;
2547 }
2548 break;
2549 }
2550 }
2551
2552 if (!finished)
2553 return UNPACK_FAILURE;
2554
2555 /* Save the length */
2556 if (field_var->length_bytes == 1) {
2557 d0[0] = (uchar)len;
2558 } else {
2559 DBUG_ASSERT(field_var->length_bytes == 2);
2560 int2store(d0, len);
2561 }
2562 return UNPACK_SUCCESS;
2563}
2564
2565/////////////////////////////////////////////////////////////////////////
2566
2567/*
2568 Function of type rdb_make_unpack_info_t
2569*/
2570
2571void Rdb_key_def::make_unpack_unknown(
2572 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2573 const Field *const field, Rdb_pack_field_context *const pack_ctx) const {
2574 pack_ctx->writer->write(field->ptr, field->pack_length());
2575}
2576
2577/*
2578 This point of this function is only to indicate that unpack_info is
2579 available.
2580
2581 The actual unpack_info data is produced by the function that packs the key,
2582 that is, pack_with_varchar_space_pad.
2583*/
2584
2585void Rdb_key_def::dummy_make_unpack_info(
2586 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2587 const Field *field MY_ATTRIBUTE((__unused__)),
2588 Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) const {
2589 // Do nothing
2590}
2591
2592/*
2593 Function of type rdb_index_field_unpack_t
2594*/
2595
2596int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi,
2597 Field *const field, uchar *const dst,
2598 Rdb_string_reader *const reader,
2599 Rdb_string_reader *const unp_reader) const {
2600 const uchar *ptr;
2601 const uint len = fpi->m_unpack_data_len;
2602 // We don't use anything from the key, so skip over it.
2603 if (skip_max_length(fpi, field, reader)) {
2604 return UNPACK_FAILURE;
2605 }
2606
2607 DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
2608
2609 if ((ptr = (const uchar *)unp_reader->read(len))) {
2610 memcpy(dst, ptr, len);
2611 return UNPACK_SUCCESS;
2612 }
2613 return UNPACK_FAILURE;
2614}
2615
2616/*
2617 Function of type rdb_make_unpack_info_t
2618*/
2619
2620void Rdb_key_def::make_unpack_unknown_varchar(
2621 const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
2622 const Field *const field, Rdb_pack_field_context *const pack_ctx) const {
2623 const auto f = static_cast<const Field_varstring *>(field);
2624 uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2625 len += f->length_bytes;
2626 pack_ctx->writer->write(field->ptr, len);
2627}
2628
2629/*
2630 Function of type rdb_index_field_unpack_t
2631
2632 @detail
2633 Unpack a key part in an "unknown" collation from its
2634 (mem_comparable_form, unpack_info) form.
2635
2636 "Unknown" means we have no clue about how mem_comparable_form is made from
2637 the original string, so we keep the whole original string in the unpack_info.
2638
2639 @seealso
2640 make_unpack_unknown, unpack_unknown
2641*/
2642
2643int Rdb_key_def::unpack_unknown_varchar(
2644 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2645 Rdb_string_reader *const reader,
2646 Rdb_string_reader *const unp_reader) const {
2647 const uchar *ptr;
2648 uchar *const d0 = dst;
2649 const auto f = static_cast<Field_varstring *>(field);
2650 dst += f->length_bytes;
2651 const uint len_bytes = f->length_bytes;
2652 // We don't use anything from the key, so skip over it.
2653 if ((this->*fpi->m_skip_func)(fpi, field, reader)) {
2654 return UNPACK_FAILURE;
2655 }
2656
2657 DBUG_ASSERT(len_bytes > 0);
2658 DBUG_ASSERT(unp_reader != nullptr);
2659
2660 if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
2661 memcpy(d0, ptr, len_bytes);
2662 const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
2663 if ((ptr = (const uchar *)unp_reader->read(len))) {
2664 memcpy(dst, ptr, len);
2665 return UNPACK_SUCCESS;
2666 }
2667 }
2668 return UNPACK_FAILURE;
2669}
2670
2671/*
2672 Write unpack_data for a "simple" collation
2673*/
2674static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
2675 const Rdb_collation_codec *const codec,
2676 const uchar *const src,
2677 const size_t src_len) {
2678 for (uint i = 0; i < src_len; i++) {
2679 writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
2680 }
2681}
2682
2683static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
2684 const Rdb_collation_codec *const codec,
2685 const uchar *const src,
2686 const size_t &src_len, uchar *const dst) {
2687 for (uint i = 0; i < src_len; i++) {
2688 if (codec->m_dec_size[src[i]] > 0) {
2689 uint *ret;
2690 DBUG_ASSERT(reader != nullptr);
2691
2692 if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
2693 return UNPACK_FAILURE;
2694 }
2695 dst[i] = codec->m_dec_idx[*ret][src[i]];
2696 } else {
2697 dst[i] = codec->m_dec_idx[0][src[i]];
2698 }
2699 }
2700
2701 return UNPACK_SUCCESS;
2702}
2703
2704/*
2705 Function of type rdb_make_unpack_info_t
2706
2707 @detail
2708 Make unpack_data for VARCHAR(n) in a "simple" charset.
2709*/
2710
2711void Rdb_key_def::make_unpack_simple_varchar(
2712 const Rdb_collation_codec *const codec, const Field *const field,
2713 Rdb_pack_field_context *const pack_ctx) const {
2714 const auto f = static_cast<const Field_varstring *>(field);
2715 uchar *const src = f->ptr + f->length_bytes;
2716 const size_t src_len =
2717 f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2718 Rdb_bit_writer bit_writer(pack_ctx->writer);
2719 // The std::min compares characters with bytes, but for simple collations,
2720 // mbmaxlen = 1.
2721 rdb_write_unpack_simple(&bit_writer, codec, src,
2722 std::min((size_t)f->char_length(), src_len));
2723}
2724
2725/*
2726 Function of type rdb_index_field_unpack_t
2727
2728 @seealso
2729 pack_with_varchar_space_pad - packing function
2730 unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
2731*/
2732
2733int Rdb_key_def::unpack_simple_varchar_space_pad(
2734 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2735 Rdb_string_reader *const reader,
2736 Rdb_string_reader *const unp_reader) const {
2737 const uchar *ptr;
2738 size_t len = 0;
2739 bool finished = false;
2740 uchar *d0 = dst;
2741 const Field_varstring *const field_var =
2742 static_cast<Field_varstring *>(field);
2743 // For simple collations, char_length is also number of bytes.
2744 DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
2745 uchar *dst_end = dst + field_var->pack_length();
2746 dst += field_var->length_bytes;
2747 Rdb_bit_reader bit_reader(unp_reader);
2748
2749 uint space_padding_bytes = 0;
2750 uint extra_spaces;
2751 DBUG_ASSERT(unp_reader != nullptr);
2752
2753 if ((fpi->m_unpack_info_uses_two_bytes
2754 ? unp_reader->read_uint16(&extra_spaces)
2755 : unp_reader->read_uint8(&extra_spaces))) {
2756 return UNPACK_FAILURE;
2757 }
2758
2759 if (extra_spaces <= 8) {
2760 space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
2761 extra_spaces = 0;
2762 } else
2763 extra_spaces -= 8;
2764
2765 space_padding_bytes *= fpi->space_xfrm_len;
2766
2767 /* Decode the length-emitted encoding here */
2768 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2769 const char last_byte =
2770 ptr[fpi->m_segment_size - 1]; // number of padding bytes
2771 size_t used_bytes;
2772 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
2773 // this is the last one
2774 if (space_padding_bytes > (fpi->m_segment_size - 1))
2775 return UNPACK_FAILURE; // Cannot happen, corrupted data
2776 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2777 finished = true;
2778 } else {
2779 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2780 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2781 return UNPACK_FAILURE;
2782 }
2783 used_bytes = fpi->m_segment_size - 1;
2784 }
2785
2786 if (dst + used_bytes > dst_end) {
2787 // The value on disk is longer than the field definition allows?
2788 return UNPACK_FAILURE;
2789 }
2790
2791 uint ret;
2792 if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
2793 used_bytes, dst)) != UNPACK_SUCCESS) {
2794 return ret;
2795 }
2796
2797 dst += used_bytes;
2798 len += used_bytes;
2799
2800 if (finished) {
2801 if (extra_spaces) {
2802 if (dst + extra_spaces > dst_end)
2803 return UNPACK_FAILURE;
2804 // pad_char has a 1-byte form in all charsets that
2805 // are handled by rdb_init_collation_mapping.
2806 memset(dst, field_var->charset()->pad_char, extra_spaces);
2807 len += extra_spaces;
2808 }
2809 break;
2810 }
2811 }
2812
2813 if (!finished)
2814 return UNPACK_FAILURE;
2815
2816 /* Save the length */
2817 if (field_var->length_bytes == 1) {
2818 d0[0] = (uchar)len;
2819 } else {
2820 DBUG_ASSERT(field_var->length_bytes == 2);
2821 int2store(d0, len);
2822 }
2823 return UNPACK_SUCCESS;
2824}
2825
2826/*
2827 Function of type rdb_make_unpack_info_t
2828
2829 @detail
2830 Make unpack_data for CHAR(n) value in a "simple" charset.
2831 It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
2832
2833 @seealso
2834 The VARCHAR variant is in make_unpack_simple_varchar
2835*/
2836
2837void Rdb_key_def::make_unpack_simple(
2838 const Rdb_collation_codec *const codec, const Field *const field,
2839 Rdb_pack_field_context *const pack_ctx) const {
2840 const uchar *const src = field->ptr;
2841 Rdb_bit_writer bit_writer(pack_ctx->writer);
2842 rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
2843}
2844
2845/*
2846 Function of type rdb_index_field_unpack_t
2847*/
2848
2849int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi,
2850 Field *const field MY_ATTRIBUTE((__unused__)),
2851 uchar *const dst,
2852 Rdb_string_reader *const reader,
2853 Rdb_string_reader *const unp_reader) const {
2854 const uchar *ptr;
2855 const uint len = fpi->m_max_image_len;
2856 Rdb_bit_reader bit_reader(unp_reader);
2857
2858 if (!(ptr = (const uchar *)reader->read(len))) {
2859 return UNPACK_FAILURE;
2860 }
2861
2862 return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
2863 fpi->m_charset_codec, ptr, len, dst);
2864}
2865
// Minimum number of bytes of strxfrm'ed space characters that are kept in
// Rdb_charset_space_info::spaces_xfrm.
const int RDB_SPACE_XFRM_SIZE = 32;
2868
2869// A class holding information about how space character is represented in a
2870// charset.
2871class Rdb_charset_space_info {
2872public:
2873 Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
2874 Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
2875 Rdb_charset_space_info() = default;
2876
2877 // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
2878 std::vector<uchar> spaces_xfrm;
2879
2880 // length(strxfrm(' '))
2881 size_t space_xfrm_len;
2882
2883 // length of the space character itself
2884 // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
2885 // (length=2)
2886 size_t space_mb_len;
2887};
2888
// Per-charset table of space-character information, indexed by
// CHARSET_INFO::number. Entries are created on first use by
// rdb_get_mem_comparable_space().
static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
    rdb_mem_comparable_space;
2891
2892/*
2893 @brief
2894 For a given charset, get
2895 - strxfrm(' '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
2896 - length of strxfrm(charset, ' ')
2897 - length of the space character in the charset
2898
2899 @param cs IN Charset to get the space for
2900 @param ptr OUT A few space characters
2901 @param len OUT Return length of the space (in bytes)
2902
2903 @detail
2904 It is tempting to pre-generate mem-comparable form of space character for
2905 every charset on server startup.
2906 One can't do that: some charsets are not initialized until somebody
2907 attempts to use them (e.g. create or open a table that has a field that
2908 uses the charset).
2909*/
2910
2911static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
2912 const std::vector<uchar> **xfrm,
2913 size_t *const xfrm_len,
2914 size_t *const mb_len) {
2915 DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
2916 if (!rdb_mem_comparable_space[cs->number].get()) {
2917 RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
2918 if (!rdb_mem_comparable_space[cs->number].get()) {
2919 // Upper bound of how many bytes can be occupied by multi-byte form of a
2920 // character in any charset.
2921 const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
2922 DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
2923
2924 // multi-byte form of the ' ' (space) character
2925 uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
2926
2927 const size_t space_mb_len = cs->cset->wc_mb(
2928 cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
2929
2930 uchar space[20]; // mem-comparable image of the space character
2931
2932 const size_t space_len = cs->coll->strnxfrm(cs, space, sizeof(space), 1,
2933 space_mb, space_mb_len, 0);
2934 Rdb_charset_space_info *const info = new Rdb_charset_space_info;
2935 info->space_xfrm_len = space_len;
2936 info->space_mb_len = space_mb_len;
2937 while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
2938 info->spaces_xfrm.insert(info->spaces_xfrm.end(), space,
2939 space + space_len);
2940 }
2941 rdb_mem_comparable_space[cs->number].reset(info);
2942 }
2943 RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
2944 }
2945
2946 *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
2947 *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
2948 *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
2949}
2950
// Mutex taken by rdb_get_mem_comparable_space() while it fills in an entry
// of rdb_mem_comparable_space.
mysql_mutex_t rdb_mem_cmp_space_mutex;

// Per-collation codec table indexed by CHARSET_INFO::number. Entries are
// created by rdb_init_collation_mapping() while holding
// rdb_collation_data_mutex.
std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
    rdb_collation_data;
mysql_mutex_t rdb_collation_data_mutex;
2956
2957bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
2958 return cs->strxfrm_multiply==1 && cs->mbmaxlen == 1 &&
2959 !(cs->state & (MY_CS_BINSORT | MY_CS_NOPAD));
2960}
2961
2962static const Rdb_collation_codec *
2963rdb_init_collation_mapping(const my_core::CHARSET_INFO *const cs) {
2964 DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
2965 const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
2966
2967 if (codec == nullptr && rdb_is_collation_supported(cs)) {
2968 RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
2969
2970 codec = rdb_collation_data[cs->number];
2971 if (codec == nullptr) {
2972 Rdb_collation_codec *cur = nullptr;
2973
2974 // Compute reverse mapping for simple collations.
2975 if (rdb_is_collation_supported(cs)) {
2976 cur = new Rdb_collation_codec;
2977 std::map<uchar, std::vector<uchar>> rev_map;
2978 size_t max_conflict_size = 0;
2979 for (int src = 0; src < 256; src++) {
2980 uchar dst = cs->sort_order[src];
2981 rev_map[dst].push_back(src);
2982 max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
2983 }
2984 cur->m_dec_idx.resize(max_conflict_size);
2985
2986 for (auto const &p : rev_map) {
2987 uchar dst = p.first;
2988 for (uint idx = 0; idx < p.second.size(); idx++) {
2989 uchar src = p.second[idx];
2990 uchar bits =
2991 my_bit_log2(my_round_up_to_next_power(p.second.size()));
2992 cur->m_enc_idx[src] = idx;
2993 cur->m_enc_size[src] = bits;
2994 cur->m_dec_size[dst] = bits;
2995 cur->m_dec_idx[idx][dst] = src;
2996 }
2997 }
2998
2999 cur->m_make_unpack_info_func = {
3000 &Rdb_key_def::make_unpack_simple_varchar,
3001 &Rdb_key_def::make_unpack_simple};
3002 cur->m_unpack_func = {&Rdb_key_def::unpack_simple_varchar_space_pad,
3003 &Rdb_key_def::unpack_simple};
3004 } else {
3005 // Out of luck for now.
3006 }
3007
3008 if (cur != nullptr) {
3009 codec = cur;
3010 cur->m_cs = cs;
3011 rdb_collation_data[cs->number] = cur;
3012 }
3013 }
3014
3015 RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
3016 }
3017
3018 return codec;
3019}
3020
3021static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
3022 int ret;
3023 if (cs->number == COLLATION_UTF8MB4_BIN || cs->number == COLLATION_UTF16_BIN ||
3024 cs->number == COLLATION_UTF16LE_BIN || cs->number == COLLATION_UTF32_BIN) {
3025 /*
3026 In these collations, a character produces one weight, which is 3 bytes.
3027 Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
3028 get 3*3+1=10
3029 */
3030 ret = 10;
3031 } else {
3032 /*
3033 All other collations. There are two classes:
3034 - Unicode-based, except for collations mentioned in the if-condition.
3035 For these all weights are 2 bytes long, a character may produce 0..8
3036 weights.
3037 in any case, 8 bytes of payload in the segment guarantee that the last
3038 space character won't span across segments.
3039
3040 - Collations not based on unicode. These have length(strxfrm(' '))=1,
3041 there nothing to worry about.
3042
3043 In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
3044 */
3045 ret = 9;
3046 }
3047 DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
3048 return ret;
3049}
3050
3051/*
3052 @brief
3053 Setup packing of index field into its mem-comparable form
3054
3055 @detail
3056 - It is possible produce mem-comparable form for any datatype.
3057 - Some datatypes also allow to unpack the original value from its
3058 mem-comparable form.
3059 = Some of these require extra information to be stored in "unpack_info".
3060 unpack_info is not a part of mem-comparable form, it is only used to
3061 restore the original value
3062
3063 @param
3064 field IN field to be packed/un-packed
3065
3066 @return
3067 TRUE - Field can be read with index-only reads
3068 FALSE - Otherwise
3069*/
3070
3071bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
3072 const Field *const field, const uint &keynr_arg,
3073 const uint &key_part_arg,
3074 const uint16 &key_length) {
3075 int res = false;
3076 enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;
3077
3078 m_keynr = keynr_arg;
3079 m_key_part = key_part_arg;
3080
3081 m_maybe_null = field ? field->real_maybe_null() : false;
3082 m_unpack_func = nullptr;
3083 m_make_unpack_info_func = nullptr;
3084 m_unpack_data_len = 0;
3085 space_xfrm = nullptr; // safety
3086
3087 /* Calculate image length. By default, is is pack_length() */
3088 m_max_image_len =
3089 field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
3090 m_skip_func = &Rdb_key_def::skip_max_length;
3091 m_pack_func = &Rdb_key_def::pack_with_make_sort_key;
3092
3093 m_covered = false;
3094
3095 switch (type) {
3096 case MYSQL_TYPE_LONGLONG:
3097 case MYSQL_TYPE_LONG:
3098 case MYSQL_TYPE_INT24:
3099 case MYSQL_TYPE_SHORT:
3100 case MYSQL_TYPE_TINY:
3101 m_unpack_func = &Rdb_key_def::unpack_integer;
3102 m_covered = true;
3103 return true;
3104
3105 case MYSQL_TYPE_DOUBLE:
3106 m_unpack_func = &Rdb_key_def::unpack_double;
3107 m_covered = true;
3108 return true;
3109
3110 case MYSQL_TYPE_FLOAT:
3111 m_unpack_func = &Rdb_key_def::unpack_float;
3112 m_covered = true;
3113 return true;
3114
3115 case MYSQL_TYPE_NEWDECIMAL:
3116 /*
3117 Decimal is packed with Field_new_decimal::make_sort_key, which just
3118 does memcpy.
3119 Unpacking decimal values was supported only after fix for issue#253,
3120 because of that ha_rocksdb::get_storage_type() handles decimal values
3121 in a special way.
3122 */
3123 case MYSQL_TYPE_DATETIME2:
3124 case MYSQL_TYPE_TIMESTAMP2:
3125 /* These are packed with Field_temporal_with_date_and_timef::make_sort_key */
3126 case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
3127 case MYSQL_TYPE_YEAR: /* YEAR is packed with Field_tiny::make_sort_key */
3128 /* Everything that comes here is packed with just a memcpy(). */
3129 m_unpack_func = &Rdb_key_def::unpack_binary_str;
3130 m_covered = true;
3131 return true;
3132
3133 case MYSQL_TYPE_NEWDATE:
3134 /*
3135 This is packed by Field_newdate::make_sort_key. It assumes the data is
3136 3 bytes, and packing is done by swapping the byte order (for both big-
3137 and little-endian)
3138 */
3139 m_unpack_func = &Rdb_key_def::unpack_newdate;
3140 m_covered = true;
3141 return true;
3142 case MYSQL_TYPE_TINY_BLOB:
3143 case MYSQL_TYPE_MEDIUM_BLOB:
3144 case MYSQL_TYPE_LONG_BLOB:
3145 case MYSQL_TYPE_BLOB: {
3146 if (key_descr) {
3147 // The my_charset_bin collation is special in that it will consider
3148 // shorter strings sorting as less than longer strings.
3149 //
3150 // See Field_blob::make_sort_key for details.
3151 m_max_image_len =
3152 key_length + (field->charset()->number == COLLATION_BINARY
3153 ? reinterpret_cast<const Field_blob *>(field)
3154 ->pack_length_no_ptr()
3155 : 0);
3156 // Return false because indexes on text/blob will always require
3157 // a prefix. With a prefix, the optimizer will not be able to do an
3158 // index-only scan since there may be content occuring after the prefix
3159 // length.
3160 return false;
3161 }
3162 }
3163 default:
3164 break;
3165 }
3166
3167 m_unpack_info_stores_value = false;
3168 /* Handle [VAR](CHAR|BINARY) */
3169
3170 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3171 /*
3172 For CHAR-based columns, check how strxfrm image will take.
3173 field->field_length = field->char_length() * cs->mbmaxlen.
3174 */
3175 const CHARSET_INFO *cs = field->charset();
3176 m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length);
3177 }
3178 const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
3179 const CHARSET_INFO *cs = field->charset();
3180 // max_image_len before chunking is taken into account
3181 const int max_image_len_before_chunks = m_max_image_len;
3182
3183 if (is_varchar) {
3184 // The default for varchar is variable-length, without space-padding for
3185 // comparisons
3186 m_varchar_charset = cs;
3187 m_skip_func = &Rdb_key_def::skip_variable_length;
3188 m_pack_func = &Rdb_key_def::pack_with_varchar_encoding;
3189 if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3190 m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
3191 } else {
3192 // Calculate the maximum size of the short section plus the
3193 // maximum size of the long section
3194 m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
3195 }
3196
3197 const auto field_var = static_cast<const Field_varstring *>(field);
3198 m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
3199 }
3200
3201 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3202 // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
3203 // information about character-based datatypes are compared.
3204 bool use_unknown_collation = false;
3205 DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
3206 use_unknown_collation = true;);
3207
3208 if (cs->number == COLLATION_BINARY) {
3209 // - SQL layer pads BINARY(N) so that it always is N bytes long.
3210 // - For VARBINARY(N), values may have different lengths, so we're using
3211 // variable-length encoding. This is also the only charset where the
3212 // values are not space-padded for comparison.
3213 m_unpack_func = is_varchar ? &Rdb_key_def::unpack_binary_or_utf8_varchar
3214 : &Rdb_key_def::unpack_binary_str;
3215 res = true;
3216 } else if (cs->number == COLLATION_LATIN1_BIN || cs->number == COLLATION_UTF8_BIN) {
3217 // For _bin collations, mem-comparable form of the string is the string
3218 // itself.
3219
3220 if (is_varchar) {
3221 // VARCHARs - are compared as if they were space-padded - but are
3222 // not actually space-padded (reading the value back produces the
3223 // original value, without the padding)
3224 m_unpack_func = &Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
3225 m_skip_func = &Rdb_key_def::skip_variable_space_pad;
3226 m_pack_func = &Rdb_key_def::pack_with_varchar_space_pad;
3227 m_make_unpack_info_func = &Rdb_key_def::dummy_make_unpack_info;
3228 m_segment_size = get_segment_size_from_collation(cs);
3229 m_max_image_len =
3230 (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3231 m_segment_size;
3232 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3233 &space_mb_len);
3234 } else {
3235 // SQL layer pads CHAR(N) values to their maximum length.
3236 // We just store that and restore it back.
3237 m_unpack_func = (cs->number == COLLATION_LATIN1_BIN) ?
3238 &Rdb_key_def::unpack_binary_str
3239 : &Rdb_key_def::unpack_utf8_str;
3240 }
3241 res = true;
3242 } else {
3243 // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin
3244
3245 res = true; // index-only scans are possible
3246 m_unpack_data_len = is_varchar ? 0 : field->field_length;
3247 const uint idx = is_varchar ? 0 : 1;
3248 const Rdb_collation_codec *codec = nullptr;
3249
3250 if (is_varchar) {
3251 // VARCHAR requires space-padding for doing comparisons
3252 //
3253 // The check for cs->levels_for_order is to catch
3254 // latin2_czech_cs and cp1250_czech_cs - multi-level collations
3255 // that Variable-Length Space Padded Encoding can't handle.
3256 // It is not expected to work for any other multi-level collations,
3257 // either.
3258 // Currently we handle these collations as NO_PAD, even if they have
3259 // PAD_SPACE attribute.
3260 if (cs->levels_for_order == 1) {
3261 m_pack_func = &Rdb_key_def::pack_with_varchar_space_pad;
3262 m_skip_func = &Rdb_key_def::skip_variable_space_pad;
3263 m_segment_size = get_segment_size_from_collation(cs);
3264 m_max_image_len =
3265 (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3266 m_segment_size;
3267 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3268 &space_mb_len);
3269 } else {
3270 // NO_LINT_DEBUG
3271 sql_print_warning("RocksDB: you're trying to create an index "
3272 "with a multi-level collation %s",
3273 cs->name);
3274 // NO_LINT_DEBUG
3275 sql_print_warning("MyRocks will handle this collation internally "
3276 " as if it had a NO_PAD attribute.");
3277 m_pack_func = &Rdb_key_def::pack_with_varchar_encoding;
3278 m_skip_func = &Rdb_key_def::skip_variable_length;
3279 }
3280 }
3281
3282 if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
3283 // The collation allows to store extra information in the unpack_info
3284 // which can be used to restore the original value from the
3285 // mem-comparable form.
3286 m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
3287 m_unpack_func = codec->m_unpack_func[idx];
3288 m_charset_codec = codec;
3289 } else if (use_unknown_collation) {
3290 // We have no clue about how this collation produces mem-comparable
3291 // form. Our way of restoring the original value is to keep a copy of
3292 // the original value in unpack_info.
3293 m_unpack_info_stores_value = true;
3294 m_make_unpack_info_func =
3295 is_varchar ? &Rdb_key_def::make_unpack_unknown_varchar
3296 : &Rdb_key_def::make_unpack_unknown;
3297 m_unpack_func = is_varchar ? &Rdb_key_def::unpack_unknown_varchar
3298 : &Rdb_key_def::unpack_unknown;
3299 } else {
3300 // Same as above: we don't know how to restore the value from its
3301 // mem-comparable form.
3302 // Here, we just indicate to the SQL layer we can't do it.
3303 DBUG_ASSERT(m_unpack_func == nullptr);
3304 m_unpack_info_stores_value = false;
3305 res = false; // Indicate that index-only reads are not possible
3306 }
3307 }
3308
3309 // Make an adjustment: if this column is partially covered, tell the SQL
3310 // layer we can't do index-only scans. Later when we perform an index read,
3311 // we'll check on a record-by-record basis if we can do an index-only scan
3312 // or not.
3313 uint field_length;
3314 if (field->table) {
3315 field_length = field->table->field[field->field_index]->field_length;
3316 } else {
3317 field_length = field->field_length;
3318 }
3319
3320 if (field_length != key_length) {
3321 res = false;
3322 // If this index doesn't support covered bitmaps, then we won't know
3323 // during a read if the column is actually covered or not. If so, we need
3324 // to assume the column isn't covered and skip it during unpacking.
3325 //
3326 // If key_descr == NULL, then this is a dummy field and we probably don't
3327 // need to perform this step. However, to preserve the behavior before
3328 // this change, we'll only skip this step if we have an index which
3329 // supports covered bitmaps.
3330 if (!key_descr || !key_descr->use_covered_bitmap_format()) {
3331 m_unpack_func = nullptr;
3332 m_make_unpack_info_func = nullptr;
3333 m_unpack_info_stores_value = true;
3334 }
3335 }
3336 }
3337
3338 m_covered = res;
3339 return res;
3340}
3341
3342Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
3343 return tbl->key_info[m_keynr].key_part[m_key_part].field;
3344}
3345
3346void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
3347 const longlong &hidden_pk_id) const {
3348 DBUG_ASSERT(m_max_image_len == 8);
3349
3350 String to;
3351 rdb_netstr_append_uint64(&to, hidden_pk_id);
3352 memcpy(*dst, to.ptr(), m_max_image_len);
3353
3354 *dst += m_max_image_len;
3355}
3356
3357///////////////////////////////////////////////////////////////////////////////////////////
3358// Rdb_ddl_manager
3359///////////////////////////////////////////////////////////////////////////////////////////
3360
3361Rdb_tbl_def::~Rdb_tbl_def() {
3362 auto ddl_manager = rdb_get_ddl_manager();
3363 /* Don't free key definitions */
3364 if (m_key_descr_arr) {
3365 for (uint i = 0; i < m_key_count; i++) {
3366 if (ddl_manager && m_key_descr_arr[i]) {
3367 ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
3368 }
3369
3370 m_key_descr_arr[i] = nullptr;
3371 }
3372
3373 delete[] m_key_descr_arr;
3374 m_key_descr_arr = nullptr;
3375 }
3376}
3377
3378/*
3379 Put table definition DDL entry. Actual write is done at
3380 Rdb_dict_manager::commit.
3381
3382 We write
3383 dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
3384
3385 Where key entries are a tuple of
3386 ( cf_id, index_nr )
3387*/
3388
3389bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
3390 rocksdb::WriteBatch *const batch, uchar *const key,
3391 const size_t &keylen) {
3392 StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
3393 indexes.alloc(Rdb_key_def::VERSION_SIZE +
3394 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
3395 rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
3396
3397 for (uint i = 0; i < m_key_count; i++) {
3398 const Rdb_key_def &kd = *m_key_descr_arr[i];
3399
3400 uchar flags =
3401 (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
3402 (kd.m_is_per_partition_cf ? Rdb_key_def::PER_PARTITION_CF_FLAG : 0);
3403
3404 const uint cf_id = kd.get_cf()->GetID();
3405 /*
3406 If cf_id already exists, cf_flags must be the same.
3407 To prevent race condition, reading/modifying/committing CF flags
3408 need to be protected by mutex (dict_manager->lock()).
3409 When RocksDB supports transaction with pessimistic concurrency
3410 control, we can switch to use it and removing mutex.
3411 */
3412 uint existing_cf_flags;
3413 const std::string cf_name = kd.get_cf()->GetName();
3414
3415 if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
3416 // For the purposes of comparison we'll clear the partitioning bit. The
3417 // intent here is to make sure that both partitioned and non-partitioned
3418 // tables can refer to the same CF.
3419 existing_cf_flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3420 flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3421
3422 if (existing_cf_flags != flags) {
3423 my_error(ER_CF_DIFFERENT, MYF(0), cf_name.c_str(), flags,
3424 existing_cf_flags);
3425 return true;
3426 }
3427 } else {
3428 dict->add_cf_flags(batch, cf_id, flags);
3429 }
3430
3431 rdb_netstr_append_uint32(&indexes, cf_id);
3432 rdb_netstr_append_uint32(&indexes, kd.m_index_number);
3433
3434 struct Rdb_index_info index_info;
3435 index_info.m_gl_index_id = {cf_id, kd.m_index_number};
3436 index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
3437 index_info.m_index_type = kd.m_index_type;
3438 index_info.m_kv_version = kd.m_kv_format_version;
3439 index_info.m_index_flags = kd.m_index_flags_bitmap;
3440 index_info.m_ttl_duration = kd.m_ttl_duration;
3441
3442 dict->add_or_update_index_cf_mapping(batch, &index_info);
3443 }
3444
3445 const rocksdb::Slice skey((char *)key, keylen);
3446 const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
3447
3448 dict->put_key(batch, skey, svalue);
3449 return false;
3450}
3451
// Length that each index flag takes inside the record.
// Each index in the array maps to the enum INDEX_FLAG: element N holds the
// length of the field whose flag is bit N.
static const std::array<uint, 1> index_flag_lengths = {
    {ROCKSDB_SIZEOF_TTL_RECORD}};
3456
3457bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
3458 return flag & index_flags;
3459}
3460
3461uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
3462 enum INDEX_FLAG flag,
3463 uint *const length) {
3464
3465 DBUG_ASSERT_IMP(flag != MAX_FLAG,
3466 Rdb_key_def::has_index_flag(index_flags, flag));
3467
3468 uint offset = 0;
3469 for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
3470 int mask = 1 << bit;
3471
3472 /* Exit once we've reached the proper flag */
3473 if (flag & mask) {
3474 if (length != nullptr) {
3475 *length = index_flag_lengths[bit];
3476 }
3477 break;
3478 }
3479
3480 if (index_flags & mask) {
3481 offset += index_flag_lengths[bit];
3482 }
3483 }
3484
3485 return offset;
3486}
3487
3488void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
3489 const uchar *const val,
3490 enum INDEX_FLAG flag) const {
3491 uint len;
3492 uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
3493 DBUG_ASSERT(offset + len <= buf->get_current_pos());
3494 memcpy(buf->ptr() + offset, val, len);
3495}
3496
3497void Rdb_tbl_def::check_if_is_mysql_system_table() {
3498 static const char *const system_dbs[] = {
3499 "mysql", "performance_schema", "information_schema",
3500 };
3501
3502 m_is_mysql_system_table = false;
3503 for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
3504 if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
3505 m_is_mysql_system_table = true;
3506 break;
3507 }
3508 }
3509}
3510
3511void Rdb_tbl_def::set_name(const std::string &name) {
3512 int err MY_ATTRIBUTE((__unused__));
3513
3514 m_dbname_tablename = name;
3515 err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
3516 &m_partition);
3517 DBUG_ASSERT(err == 0);
3518
3519 check_if_is_mysql_system_table();
3520}
3521
3522GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
3523 for (uint i = 0; i < m_key_count; i++) {
3524 auto &k = m_key_descr_arr[i];
3525 if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
3526 k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
3527 return k->get_gl_index_id();
3528 }
3529 }
3530
3531 // Every table must have a primary key, even if it's hidden.
3532 abort();
3533 return GL_INDEX_ID();
3534}
3535
3536/*
3537 Static function of type my_hash_get_key that gets invoked by
3538 the m_ddl_hash object of type my_core::HASH.
3539 It manufactures a key (db+table name in our case) from a record
3540 (Rdb_tbl_def in our case).
3541*/
3542const uchar *
3543Rdb_ddl_manager::get_hash_key(Rdb_tbl_def *const rec, size_t *const length,
3544 my_bool not_used MY_ATTRIBUTE((__unused__))) {
3545 const std::string &dbname_tablename = rec->full_tablename();
3546 *length = dbname_tablename.size();
3547 return reinterpret_cast<const uchar *>(dbname_tablename.c_str());
3548}
3549
3550/*
3551 Static function of type void (*my_hash_free_element_func_t)(void*) that gets
3552 invoked by the m_ddl_hash object of type my_core::HASH.
3553 It deletes a record (Rdb_tbl_def in our case).
3554*/
3555void Rdb_ddl_manager::free_hash_elem(void *const data) {
3556 Rdb_tbl_def *elem = reinterpret_cast<Rdb_tbl_def *>(data);
3557 delete elem;
3558}
3559
/*
  Remove the entry for `gl_index_id` from m_index_num_to_keydef.
  NOTE(review): no lock is taken here; presumably callers provide the
  necessary synchronization - confirm against the call sites.
*/
void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
  m_index_num_to_keydef.erase(gl_index_id);
}
3563
3564void Rdb_ddl_manager::add_uncommitted_keydefs(
3565 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3566 mysql_rwlock_wrlock(&m_rwlock);
3567 for (const auto &index : indexes) {
3568 m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
3569 }
3570 mysql_rwlock_unlock(&m_rwlock);
3571}
3572
3573void Rdb_ddl_manager::remove_uncommitted_keydefs(
3574 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3575 mysql_rwlock_wrlock(&m_rwlock);
3576 for (const auto &index : indexes) {
3577 m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
3578 }
3579 mysql_rwlock_unlock(&m_rwlock);
3580}
3581
namespace // anonymous namespace = not visible outside this source file
{
// Tables scanner that collects the tables known to the RocksDB data
// dictionary (add_table) and then cross-checks them against the .frm files
// found on disk (compare_to_actual_tables and its helpers).
struct Rdb_validate_tbls : public Rdb_tables_scanner {
  // (table name, is-partition flag)
  using tbl_info_t = std::pair<std::string, bool>;
  // database name -> set of tables recorded for that database
  using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;

  // Tables added by add_table(); entries are erased as matching .frm files
  // are found.
  tbl_list_t m_list;

  // Record one table of the data dictionary (temporary tables are skipped).
  int add_table(Rdb_tbl_def *tdef) override;

  // Scan every database directory under datadir. Returns false if a
  // directory or .frm file could not be read.
  bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);

  // Check all .frm files inside one database directory.
  bool scan_for_frms(const std::string &datadir, const std::string &dbname,
                     bool *has_errors);

  // Check a single .frm file; sets *has_errors on a schema mismatch.
  bool check_frm_file(const std::string &fullpath, const std::string &dbname,
                      const std::string &tablename, bool *has_errors);
};
} // anonymous namespace
3601
3602/*
3603 Get a list of tables that we expect to have .frm files for. This will use the
3604 information just read from the RocksDB data dictionary.
3605*/
3606int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
3607 DBUG_ASSERT(tdef != nullptr);
3608
3609 /* Add the database/table into the list that are not temp table */
3610 if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) {
3611 bool is_partition = tdef->base_partition().size() != 0;
3612 m_list[tdef->base_dbname()].insert(
3613 tbl_info_t(tdef->base_tablename(), is_partition));
3614 }
3615
3616 return HA_EXIT_SUCCESS;
3617}
3618
3619/*
3620 Access the .frm file for this dbname/tablename and see if it is a RocksDB
3621 table (or partition table).
3622*/
3623bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
3624 const std::string &dbname,
3625 const std::string &tablename,
3626 bool *has_errors) {
3627 /* Check this .frm file to see what engine it uses */
3628 String fullfilename(fullpath.c_str(), &my_charset_bin);
3629 fullfilename.append(FN_DIRSEP);
3630 fullfilename.append(tablename.c_str());
3631 fullfilename.append(".frm");
3632
3633 /*
3634 This function will return the legacy_db_type of the table. Currently
3635 it does not reference the first parameter (THD* thd), but if it ever
3636 did in the future we would need to make a version that does it without
3637 the connection handle as we don't have one here.
3638 */
3639 char eng_type_buf[NAME_CHAR_LEN+1];
3640 LEX_CSTRING eng_type_str = {eng_type_buf, 0};
3641 bool is_sequence;
3642 enum Table_type type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type_str, &is_sequence);
3643 if (type == TABLE_TYPE_UNKNOWN) {
3644 sql_print_warning("RocksDB: Failed to open/read .from file: %s",
3645 fullfilename.ptr());
3646 return false;
3647 }
3648
3649 if (type == TABLE_TYPE_NORMAL) {
3650 /* For a RocksDB table do we have a reference in the data dictionary? */
3651 if (!strncmp(eng_type_str.str, "ROCKSDB", eng_type_str.length)) {
3652 /*
3653 Attempt to remove the table entry from the list of tables. If this
3654 fails then we know we had a .frm file that wasn't registered in RocksDB.
3655 */
3656 tbl_info_t element(tablename, false);
3657 if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
3658 sql_print_warning("RocksDB: Schema mismatch - "
3659 "A .frm file exists for table %s.%s, "
3660 "but that table is not registered in RocksDB",
3661 dbname.c_str(), tablename.c_str());
3662 *has_errors = true;
3663 }
3664 } else if (!strncmp(eng_type_str.str, "partition", eng_type_str.length)) {
3665 /*
3666 For partition tables, see if it is in the m_list as a partition,
3667 but don't generate an error if it isn't there - we don't know that the
3668 .frm is for RocksDB.
3669 */
3670 if (m_list.count(dbname) > 0) {
3671 m_list[dbname].erase(tbl_info_t(tablename, true));
3672 }
3673 }
3674 }
3675
3676 return true;
3677}
3678
3679/* Scan the database subdirectory for .frm files */
3680bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
3681 const std::string &dbname,
3682 bool *has_errors) {
3683 bool result = true;
3684 std::string fullpath = datadir + dbname;
3685 struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
3686
3687 /* Access the directory */
3688 if (dir_info == nullptr) {
3689 sql_print_warning("RocksDB: Could not open database directory: %s",
3690 fullpath.c_str());
3691 return false;
3692 }
3693
3694 /* Scan through the files in the directory */
3695 struct fileinfo *file_info = dir_info->dir_entry;
3696 for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3697 /* Find .frm files that are not temp files (those that contain '#sql') */
3698 const char *ext = strrchr(file_info->name, '.');
3699 if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
3700 strcmp(ext, ".frm") == 0) {
3701 std::string tablename =
3702 std::string(file_info->name, ext - file_info->name);
3703
3704 /* Check to see if the .frm file is from RocksDB */
3705 if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
3706 result = false;
3707 break;
3708 }
3709 }
3710 }
3711
3712 /* Remove any databases who have no more tables listed */
3713 if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
3714 m_list.erase(dbname);
3715 }
3716
3717 /* Release the directory entry */
3718 my_dirend(dir_info);
3719
3720 return result;
3721}
3722
3723/*
3724 Scan the datadir for all databases (subdirectories) and get a list of .frm
3725 files they contain
3726*/
3727bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
3728 bool *has_errors) {
3729 bool result = true;
3730 struct st_my_dir *dir_info;
3731 struct fileinfo *file_info;
3732
3733 dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
3734 if (dir_info == nullptr) {
3735 sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
3736 return false;
3737 }
3738
3739 file_info = dir_info->dir_entry;
3740 for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3741 /* Ignore files/dirs starting with '.' */
3742 if (file_info->name[0] == '.')
3743 continue;
3744
3745 /* Ignore all non-directory files */
3746 if (!MY_S_ISDIR(file_info->mystat->st_mode))
3747 continue;
3748
3749 /* Scan all the .frm files in the directory */
3750 if (!scan_for_frms(datadir, file_info->name, has_errors)) {
3751 result = false;
3752 break;
3753 }
3754 }
3755
3756 /* Release the directory info */
3757 my_dirend(dir_info);
3758
3759 return result;
3760}
3761
3762/*
3763 Validate that all auto increment values in the data dictionary are on a
3764 supported version.
3765*/
3766bool Rdb_ddl_manager::validate_auto_incr() {
3767 std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
3768
3769 uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
3770 rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
3771 const rocksdb::Slice auto_incr_entry_slice(
3772 reinterpret_cast<char *>(auto_incr_entry),
3773 Rdb_key_def::INDEX_NUMBER_SIZE);
3774 for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
3775 const rocksdb::Slice key = it->key();
3776 const rocksdb::Slice val = it->value();
3777 GL_INDEX_ID gl_index_id;
3778
3779 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
3780 memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE))
3781 break;
3782
3783 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
3784 return false;
3785 }
3786
3787 if (val.size() <= Rdb_key_def::VERSION_SIZE) {
3788 return false;
3789 }
3790
3791 // Check if we have orphaned entries for whatever reason by cross
3792 // referencing ddl entries.
3793 auto ptr = reinterpret_cast<const uchar *>(key.data());
3794 ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
3795 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
3796 if (!m_dict->get_index_info(gl_index_id, nullptr)) {
3797 // NO_LINT_DEBUG
3798 sql_print_warning("RocksDB: AUTOINC mismatch - "
3799 "Index number (%u, %u) found in AUTOINC "
3800 "but does not exist as a DDL entry",
3801 gl_index_id.cf_id, gl_index_id.index_id);
3802 return false;
3803 }
3804
3805 ptr = reinterpret_cast<const uchar *>(val.data());
3806 const int version = rdb_netbuf_read_uint16(&ptr);
3807 if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
3808 // NO_LINT_DEBUG
3809 sql_print_warning("RocksDB: AUTOINC mismatch - "
3810 "Index number (%u, %u) found in AUTOINC "
3811 "is on unsupported version %d",
3812 gl_index_id.cf_id, gl_index_id.index_id, version);
3813 return false;
3814 }
3815 }
3816
3817 if (!it->status().ok()) {
3818 return false;
3819 }
3820
3821 return true;
3822}
3823
3824/*
3825 Validate that all the tables in the RocksDB database dictionary match the .frm
3826 files in the datadir
3827*/
3828bool Rdb_ddl_manager::validate_schemas(void) {
3829 bool has_errors = false;
3830 const std::string datadir = std::string(mysql_real_data_home);
3831 Rdb_validate_tbls table_list;
3832
3833 /* Get the list of tables from the database dictionary */
3834 if (scan_for_tables(&table_list) != 0) {
3835 return false;
3836 }
3837
3838 /* Compare that to the list of actual .frm files */
3839 if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
3840 return false;
3841 }
3842
3843 /*
3844 Any tables left in the tables list are ones that are registered in RocksDB
3845 but don't have .frm files.
3846 */
3847 for (const auto &db : table_list.m_list) {
3848 for (const auto &table : db.second) {
3849 sql_print_warning("RocksDB: Schema mismatch - "
3850 "Table %s.%s is registered in RocksDB "
3851 "but does not have a .frm file",
3852 db.first.c_str(), table.first.c_str());
3853 has_errors = true;
3854 }
3855 }
3856
3857 return !has_errors;
3858}
3859
3860bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
3861 Rdb_cf_manager *const cf_manager,
3862 const uint32_t &validate_tables) {
3863 const ulong TABLE_HASH_SIZE = 32;
3864 m_dict = dict_arg;
3865 mysql_rwlock_init(0, &m_rwlock);
3866 (void)my_hash_init(&m_ddl_hash,
3867 /*system_charset_info*/ &my_charset_bin, TABLE_HASH_SIZE,
3868 0, 0, (my_hash_get_key)Rdb_ddl_manager::get_hash_key,
3869 Rdb_ddl_manager::free_hash_elem, 0);
3870
3871 /* Read the data dictionary and populate the hash */
3872 uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
3873 rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
3874 const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
3875 Rdb_key_def::INDEX_NUMBER_SIZE);
3876
3877 /* Reading data dictionary should always skip bloom filter */
3878 rocksdb::Iterator *it = m_dict->new_iterator();
3879 int i = 0;
3880
3881 uint max_index_id_in_dict = 0;
3882 m_dict->get_max_index_id(&max_index_id_in_dict);
3883
3884 for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
3885 const uchar *ptr;
3886 const uchar *ptr_end;
3887 const rocksdb::Slice key = it->key();
3888 const rocksdb::Slice val = it->value();
3889
3890 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
3891 memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE))
3892 break;
3893
3894 if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
3895 sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
3896 (int)key.size());
3897 return true;
3898 }
3899
3900 Rdb_tbl_def *const tdef =
3901 new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
3902
3903 // Now, read the DDLs.
3904 const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
3905 if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
3906 sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
3907 tdef->full_tablename().c_str());
3908 return true;
3909 }
3910 tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
3911 tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
3912
3913 ptr = reinterpret_cast<const uchar *>(val.data());
3914 const int version = rdb_netbuf_read_uint16(&ptr);
3915 if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
3916 sql_print_error("RocksDB: DDL ENTRY Version was not expected."
3917 "Expected: %d, Actual: %d",
3918 Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
3919 return true;
3920 }
3921 ptr_end = ptr + real_val_size;
3922 for (uint keyno = 0; ptr < ptr_end; keyno++) {
3923 GL_INDEX_ID gl_index_id;
3924 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
3925 uint flags = 0;
3926 struct Rdb_index_info index_info;
3927 if (!m_dict->get_index_info(gl_index_id, &index_info)) {
3928 sql_print_error("RocksDB: Could not get index information "
3929 "for Index Number (%u,%u), table %s",
3930 gl_index_id.cf_id, gl_index_id.index_id,
3931 tdef->full_tablename().c_str());
3932 return true;
3933 }
3934 if (max_index_id_in_dict < gl_index_id.index_id) {
3935 sql_print_error("RocksDB: Found max index id %u from data dictionary "
3936 "but also found larger index id %u from dictionary. "
3937 "This should never happen and possibly a bug.",
3938 max_index_id_in_dict, gl_index_id.index_id);
3939 return true;
3940 }
3941 if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
3942 sql_print_error("RocksDB: Could not get Column Family Flags "
3943 "for CF Number %d, table %s",
3944 gl_index_id.cf_id, tdef->full_tablename().c_str());
3945 return true;
3946 }
3947
3948 if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
3949 // The per-index cf option is deprecated. Make sure we don't have the
3950 // flag set in any existing database. NO_LINT_DEBUG
3951 sql_print_error("RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
3952 "number %d, table %s",
3953 gl_index_id.cf_id, tdef->full_tablename().c_str());
3954 }
3955
3956 rocksdb::ColumnFamilyHandle *const cfh =
3957 cf_manager->get_cf(gl_index_id.cf_id);
3958 DBUG_ASSERT(cfh != nullptr);
3959
3960 uint32 ttl_rec_offset =
3961 Rdb_key_def::has_index_flag(index_info.m_index_flags,
3962 Rdb_key_def::TTL_FLAG)
3963 ? Rdb_key_def::calculate_index_flag_offset(
3964 index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
3965 : UINT_MAX;
3966
3967 /*
3968 We can't fully initialize Rdb_key_def object here, because full
3969 initialization requires that there is an open TABLE* where we could
3970 look at Field* objects and set max_length and other attributes
3971 */
3972 tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
3973 gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
3974 index_info.m_index_type, index_info.m_kv_version,
3975 flags & Rdb_key_def::REVERSE_CF_FLAG,
3976 flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
3977 m_dict->get_stats(gl_index_id), index_info.m_index_flags,
3978 ttl_rec_offset, index_info.m_ttl_duration);
3979 }
3980 put(tdef);
3981 i++;
3982 }
3983
3984 /*
3985 If validate_tables is greater than 0 run the validation. Only fail the
3986 initialzation if the setting is 1. If the setting is 2 we continue.
3987 */
3988 if (validate_tables > 0) {
3989 std::string msg;
3990 if (!validate_schemas()) {
3991 msg = "RocksDB: Problems validating data dictionary "
3992 "against .frm files, exiting";
3993 } else if (!validate_auto_incr()) {
3994 msg = "RocksDB: Problems validating auto increment values in "
3995 "data dictionary, exiting";
3996 }
3997 if (validate_tables == 1 && !msg.empty()) {
3998 // NO_LINT_DEBUG
3999 sql_print_error("%s", msg.c_str());
4000 return true;
4001 }
4002 }
4003
4004 // index ids used by applications should not conflict with
4005 // data dictionary index ids
4006 if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
4007 max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
4008 }
4009
4010 m_sequence.init(max_index_id_in_dict + 1);
4011
4012 if (!it->status().ok()) {
4013 rdb_log_status_error(it->status(), "Table_store load error");
4014 return true;
4015 }
4016 delete it;
4017 sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
4018 i);
4019 return false;
4020}
4021
4022Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
4023 const bool &lock) {
4024 if (lock) {
4025 mysql_rwlock_rdlock(&m_rwlock);
4026 }
4027
4028 Rdb_tbl_def *const rec = reinterpret_cast<Rdb_tbl_def *>(my_hash_search(
4029 &m_ddl_hash, reinterpret_cast<const uchar *>(table_name.c_str()),
4030 table_name.size()));
4031
4032 if (lock) {
4033 mysql_rwlock_unlock(&m_rwlock);
4034 }
4035
4036 return rec;
4037}
4038
4039// this is a safe version of the find() function below. It acquires a read
4040// lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
4041// are finding it. Copying it into 'ret' increments the count making sure
4042// that the object will not be discarded until we are finished with it.
4043std::shared_ptr<const Rdb_key_def>
4044Rdb_ddl_manager::safe_find(GL_INDEX_ID gl_index_id) {
4045 std::shared_ptr<const Rdb_key_def> ret(nullptr);
4046
4047 mysql_rwlock_rdlock(&m_rwlock);
4048
4049 auto it = m_index_num_to_keydef.find(gl_index_id);
4050 if (it != m_index_num_to_keydef.end()) {
4051 const auto table_def = find(it->second.first, false);
4052 if (table_def && it->second.second < table_def->m_key_count) {
4053 const auto &kd = table_def->m_key_descr_arr[it->second.second];
4054 if (kd->max_storage_fmt_length() != 0) {
4055 ret = kd;
4056 }
4057 }
4058 } else {
4059 auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4060 if (it != m_index_num_to_uncommitted_keydef.end()) {
4061 const auto &kd = it->second;
4062 if (kd->max_storage_fmt_length() != 0) {
4063 ret = kd;
4064 }
4065 }
4066 }
4067
4068 mysql_rwlock_unlock(&m_rwlock);
4069
4070 return ret;
4071}
4072
4073// this method assumes at least read-only lock on m_rwlock
4074const std::shared_ptr<Rdb_key_def> &
4075Rdb_ddl_manager::find(GL_INDEX_ID gl_index_id) {
4076 auto it = m_index_num_to_keydef.find(gl_index_id);
4077 if (it != m_index_num_to_keydef.end()) {
4078 auto table_def = find(it->second.first, false);
4079 if (table_def) {
4080 if (it->second.second < table_def->m_key_count) {
4081 return table_def->m_key_descr_arr[it->second.second];
4082 }
4083 }
4084 } else {
4085 auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4086 if (it != m_index_num_to_uncommitted_keydef.end()) {
4087 return it->second;
4088 }
4089 }
4090
4091 static std::shared_ptr<Rdb_key_def> empty = nullptr;
4092
4093 return empty;
4094}
4095
4096// this method returns the name of the table based on an index id. It acquires
4097// a read lock on m_rwlock.
4098const std::string
4099Rdb_ddl_manager::safe_get_table_name(const GL_INDEX_ID &gl_index_id) {
4100 std::string ret;
4101 mysql_rwlock_rdlock(&m_rwlock);
4102 auto it = m_index_num_to_keydef.find(gl_index_id);
4103 if (it != m_index_num_to_keydef.end()) {
4104 ret = it->second.first;
4105 }
4106 mysql_rwlock_unlock(&m_rwlock);
4107 return ret;
4108}
4109
4110void Rdb_ddl_manager::set_stats(
4111 const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
4112 mysql_rwlock_wrlock(&m_rwlock);
4113 for (auto src : stats) {
4114 const auto &keydef = find(src.second.m_gl_index_id);
4115 if (keydef) {
4116 keydef->m_stats = src.second;
4117 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4118 }
4119 }
4120 mysql_rwlock_unlock(&m_rwlock);
4121}
4122
4123void Rdb_ddl_manager::adjust_stats(
4124 const std::vector<Rdb_index_stats> &new_data,
4125 const std::vector<Rdb_index_stats> &deleted_data) {
4126 mysql_rwlock_wrlock(&m_rwlock);
4127 int i = 0;
4128 for (const auto &data : {new_data, deleted_data}) {
4129 for (const auto &src : data) {
4130 const auto &keydef = find(src.m_gl_index_id);
4131 if (keydef) {
4132 keydef->m_stats.m_distinct_keys_per_prefix.resize(
4133 keydef->get_key_parts());
4134 keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
4135 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4136 }
4137 }
4138 i++;
4139 }
4140 const bool should_save_stats = !m_stats2store.empty();
4141 mysql_rwlock_unlock(&m_rwlock);
4142 if (should_save_stats) {
4143 // Queue an async persist_stats(false) call to the background thread.
4144 rdb_queue_save_stats_request();
4145 }
4146}
4147
4148void Rdb_ddl_manager::persist_stats(const bool &sync) {
4149 mysql_rwlock_wrlock(&m_rwlock);
4150 const auto local_stats2store = std::move(m_stats2store);
4151 m_stats2store.clear();
4152 mysql_rwlock_unlock(&m_rwlock);
4153
4154 // Persist stats
4155 const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
4156 std::vector<Rdb_index_stats> stats;
4157 std::transform(local_stats2store.begin(), local_stats2store.end(),
4158 std::back_inserter(stats),
4159 [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
4160 return s.second;
4161 });
4162 m_dict->add_stats(wb.get(), stats);
4163 m_dict->commit(wb.get(), sync);
4164}
4165
4166/*
4167 Put table definition of `tbl` into the mapping, and also write it to the
4168 on-disk data dictionary.
4169*/
4170
4171int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
4172 rocksdb::WriteBatch *const batch) {
4173 uchar buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
4174 uint pos = 0;
4175
4176 rdb_netbuf_store_index(buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4177 pos += Rdb_key_def::INDEX_NUMBER_SIZE;
4178
4179 const std::string &dbname_tablename = tbl->full_tablename();
4180 memcpy(buf + pos, dbname_tablename.c_str(), dbname_tablename.size());
4181 pos += dbname_tablename.size();
4182
4183 int res;
4184 if ((res = tbl->put_dict(m_dict, batch, buf, pos))) {
4185 return res;
4186 }
4187 if ((res = put(tbl))) {
4188 return res;
4189 }
4190 return HA_EXIT_SUCCESS;
4191}
4192
4193/* Return 0 - ok, other value - error */
4194/* TODO:
4195 This function modifies m_ddl_hash and m_index_num_to_keydef.
4196 However, these changes need to be reversed if dict_manager.commit fails
4197 See the discussion here: https://reviews.facebook.net/D35925#inline-259167
4198 Tracked by https://github.com/facebook/mysql-5.6/issues/33
4199*/
4200int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool &lock) {
4201 Rdb_tbl_def *rec;
4202 my_bool result;
4203 const std::string &dbname_tablename = tbl->full_tablename();
4204
4205 if (lock)
4206 mysql_rwlock_wrlock(&m_rwlock);
4207
4208 // We have to do this find because 'tbl' is not yet in the list. We need
4209 // to find the one we are replacing ('rec')
4210 rec = find(dbname_tablename, false);
4211 if (rec) {
4212 // this will free the old record.
4213 my_hash_delete(&m_ddl_hash, reinterpret_cast<uchar *>(rec));
4214 }
4215 result = my_hash_insert(&m_ddl_hash, reinterpret_cast<uchar *>(tbl));
4216
4217 for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
4218 m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
4219 std::make_pair(dbname_tablename, keyno);
4220 }
4221
4222 if (lock)
4223 mysql_rwlock_unlock(&m_rwlock);
4224 return result;
4225}
4226
4227void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
4228 rocksdb::WriteBatch *const batch,
4229 const bool &lock) {
4230 if (lock)
4231 mysql_rwlock_wrlock(&m_rwlock);
4232
4233 uchar buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
4234 uint pos = 0;
4235
4236 rdb_netbuf_store_index(buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4237 pos += Rdb_key_def::INDEX_NUMBER_SIZE;
4238
4239 const std::string &dbname_tablename = tbl->full_tablename();
4240 memcpy(buf + pos, dbname_tablename.c_str(), dbname_tablename.size());
4241 pos += dbname_tablename.size();
4242
4243 const rocksdb::Slice tkey((char *)buf, pos);
4244 m_dict->delete_key(batch, tkey);
4245
4246 /* The following will also delete the object: */
4247 my_hash_delete(&m_ddl_hash, reinterpret_cast<uchar *>(tbl));
4248
4249 if (lock)
4250 mysql_rwlock_unlock(&m_rwlock);
4251}
4252
4253bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
4254 rocksdb::WriteBatch *const batch) {
4255 Rdb_tbl_def *rec;
4256 Rdb_tbl_def *new_rec;
4257 bool res = true;
4258 uchar new_buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
4259 uint new_pos = 0;
4260
4261 mysql_rwlock_wrlock(&m_rwlock);
4262 if (!(rec = find(from, false))) {
4263 mysql_rwlock_unlock(&m_rwlock);
4264 return true;
4265 }
4266
4267 new_rec = new Rdb_tbl_def(to);
4268
4269 new_rec->m_key_count = rec->m_key_count;
4270 new_rec->m_auto_incr_val =
4271 rec->m_auto_incr_val.load(std::memory_order_relaxed);
4272 new_rec->m_key_descr_arr = rec->m_key_descr_arr;
4273
4274 new_rec->m_hidden_pk_val =
4275 rec->m_hidden_pk_val.load(std::memory_order_relaxed);
4276
4277 // so that it's not free'd when deleting the old rec
4278 rec->m_key_descr_arr = nullptr;
4279
4280 // Create a new key
4281 rdb_netbuf_store_index(new_buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4282 new_pos += Rdb_key_def::INDEX_NUMBER_SIZE;
4283
4284 const std::string &dbname_tablename = new_rec->full_tablename();
4285 memcpy(new_buf + new_pos, dbname_tablename.c_str(), dbname_tablename.size());
4286 new_pos += dbname_tablename.size();
4287
4288 // Create a key to add
4289 if (!new_rec->put_dict(m_dict, batch, new_buf, new_pos)) {
4290 remove(rec, batch, false);
4291 put(new_rec, false);
4292 res = false; // ok
4293 }
4294
4295 mysql_rwlock_unlock(&m_rwlock);
4296 return res;
4297}
4298
4299void Rdb_ddl_manager::cleanup() {
4300 my_hash_free(&m_ddl_hash);
4301 mysql_rwlock_destroy(&m_rwlock);
4302 m_sequence.cleanup();
4303}
4304
4305int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
4306 int i, ret;
4307 Rdb_tbl_def *rec;
4308
4309 DBUG_ASSERT(tables_scanner != nullptr);
4310
4311 mysql_rwlock_rdlock(&m_rwlock);
4312
4313 ret = 0;
4314 i = 0;
4315
4316 while ((
4317 rec = reinterpret_cast<Rdb_tbl_def *>(my_hash_element(&m_ddl_hash, i)))) {
4318 ret = tables_scanner->add_table(rec);
4319 if (ret)
4320 break;
4321 i++;
4322 }
4323
4324 mysql_rwlock_unlock(&m_rwlock);
4325 return ret;
4326}
4327
4328/*
4329 Rdb_binlog_manager class implementation
4330*/
4331
4332bool Rdb_binlog_manager::init(Rdb_dict_manager *const dict_arg) {
4333 DBUG_ASSERT(dict_arg != nullptr);
4334 m_dict = dict_arg;
4335
4336 rdb_netbuf_store_index(m_key_buf, Rdb_key_def::BINLOG_INFO_INDEX_NUMBER);
4337 m_key_slice = rocksdb::Slice(reinterpret_cast<char *>(m_key_buf),
4338 Rdb_key_def::INDEX_NUMBER_SIZE);
4339 return false;
4340}
4341
4342void Rdb_binlog_manager::cleanup() {}
4343
4344/**
4345 Set binlog name, pos and optionally gtid into WriteBatch.
4346 This function should be called as part of transaction commit,
4347 since binlog info is set only at transaction commit.
4348 Actual write into RocksDB is not done here, so checking if
4349 write succeeded or not is not possible here.
4350 @param binlog_name Binlog name
4351 @param binlog_pos Binlog pos
4352 @param batch WriteBatch
4353*/
4354void Rdb_binlog_manager::update(const char *const binlog_name,
4355 const my_off_t binlog_pos,
4356 rocksdb::WriteBatchBase *const batch) {
4357 if (binlog_name && binlog_pos) {
4358 // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024
4359 const size_t RDB_MAX_BINLOG_INFO_LEN = 1024;
4360 uchar value_buf[RDB_MAX_BINLOG_INFO_LEN];
4361 m_dict->put_key(
4362 batch, m_key_slice,
4363 pack_value(value_buf, binlog_name, binlog_pos, NULL));
4364 }
4365}
4366
4367/**
4368 Read binlog committed entry stored in RocksDB, then unpack
4369 @param[OUT] binlog_name Binlog name
4370 @param[OUT] binlog_pos Binlog pos
4371 @param[OUT] binlog_gtid Binlog GTID
4372 @return
4373 true is binlog info was found (valid behavior)
4374 false otherwise
4375*/
4376bool Rdb_binlog_manager::read(char *const binlog_name,
4377 my_off_t *const binlog_pos,
4378 char *const binlog_gtid) const {
4379 bool ret = false;
4380 if (binlog_name) {
4381 std::string value;
4382 rocksdb::Status status = m_dict->get_value(m_key_slice, &value);
4383 if (status.ok()) {
4384 if (!unpack_value((const uchar *)value.c_str(), value.size(), binlog_name, binlog_pos,
4385 binlog_gtid))
4386 ret = true;
4387 }
4388 }
4389 return ret;
4390}
4391
4392/**
4393 Pack binlog_name, binlog_pos, binlog_gtid into preallocated
4394 buffer, then converting and returning a RocksDB Slice
4395 @param buf Preallocated buffer to set binlog info.
4396 @param binlog_name Binlog name
4397 @param binlog_pos Binlog pos
4398 @return rocksdb::Slice converted from buf and its length
4399*/
4400rocksdb::Slice
4401Rdb_binlog_manager::pack_value(uchar *const buf, const char *const binlog_name,
4402 const my_off_t &binlog_pos,
4403 const char *const binlog_gtid) const {
4404 uint pack_len = 0;
4405
4406 // store version
4407 rdb_netbuf_store_uint16(buf, Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION);
4408 pack_len += Rdb_key_def::VERSION_SIZE;
4409
4410 // store binlog file name length
4411 DBUG_ASSERT(strlen(binlog_name) <= FN_REFLEN);
4412 const uint16_t binlog_name_len = (uint16_t)strlen(binlog_name);
4413 rdb_netbuf_store_uint16(buf + pack_len, binlog_name_len);
4414 pack_len += sizeof(uint16);
4415
4416 // store binlog file name
4417 memcpy(buf + pack_len, binlog_name, binlog_name_len);
4418 pack_len += binlog_name_len;
4419
4420 // store binlog pos
4421 rdb_netbuf_store_uint32(buf + pack_len, binlog_pos);
4422 pack_len += sizeof(uint32);
4423
4424 // store binlog gtid length.
4425 // If gtid was not set, store 0 instead
4426#ifdef MARIAROCKS_NOT_YET
4427 const uint16_t binlog_gtid_len = binlog_gtid ? (uint16_t)strlen(binlog_gtid) : 0;
4428 rdb_netbuf_store_uint16(buf + pack_len, binlog_gtid_len);
4429#endif
4430 pack_len += sizeof(uint16);
4431 // MariaDB:
4432 rdb_netbuf_store_uint16(buf + pack_len, 0);
4433
4434#ifdef MARIAROCKS_NOT_YET
4435 if (binlog_gtid_len > 0) {
4436 // store binlog gtid
4437 memcpy(buf + pack_len, binlog_gtid, binlog_gtid_len);
4438 pack_len += binlog_gtid_len;
4439 }
4440#endif
4441
4442 return rocksdb::Slice((char *)buf, pack_len);
4443}
4444
4445/**
4446 Unpack value then split into binlog_name, binlog_pos (and binlog_gtid)
4447 @param[IN] value Binlog state info fetched from RocksDB
4448 @param[OUT] binlog_name Binlog name
4449 @param[OUT] binlog_pos Binlog pos
4450 @param[OUT] binlog_gtid Binlog GTID
4451 @return true on error
4452*/
4453bool Rdb_binlog_manager::unpack_value(const uchar *const value,
4454 size_t value_size_arg,
4455 char *const binlog_name,
4456 my_off_t *const binlog_pos,
4457 char *const binlog_gtid) const {
4458 uint pack_len = 0;
4459 intmax_t value_size= value_size_arg;
4460
4461 DBUG_ASSERT(binlog_pos != nullptr);
4462
4463 if ((value_size -= Rdb_key_def::VERSION_SIZE) < 0)
4464 return true;
4465 // read version
4466 const uint16_t version = rdb_netbuf_to_uint16(value);
4467
4468 pack_len += Rdb_key_def::VERSION_SIZE;
4469 if (version != Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION)
4470 return true;
4471
4472 if ((value_size -= sizeof(uint16)) < 0)
4473 return true;
4474
4475 // read binlog file name length
4476 const uint16_t binlog_name_len = rdb_netbuf_to_uint16(value + pack_len);
4477 pack_len += sizeof(uint16);
4478
4479 if (binlog_name_len >= (FN_REFLEN+1))
4480 return true;
4481
4482 if ((value_size -= binlog_name_len) < 0)
4483 return true;
4484
4485 if (binlog_name_len) {
4486 // read and set binlog name
4487 memcpy(binlog_name, value + pack_len, binlog_name_len);
4488 binlog_name[binlog_name_len] = '\0';
4489 pack_len += binlog_name_len;
4490
4491 if ((value_size -= sizeof(uint32)) < 0)
4492 return true;
4493 // read and set binlog pos
4494 *binlog_pos = rdb_netbuf_to_uint32(value + pack_len);
4495 pack_len += sizeof(uint32);
4496
4497 if ((value_size -= sizeof(uint16)) < 0)
4498 return true;
4499 // read gtid length
4500 const uint16_t binlog_gtid_len = rdb_netbuf_to_uint16(value + pack_len);
4501 pack_len += sizeof(uint16);
4502
4503 if (binlog_gtid_len >= GTID_BUF_LEN)
4504 return true;
4505 if ((value_size -= binlog_gtid_len) < 0)
4506 return true;
4507
4508 if (binlog_gtid && binlog_gtid_len > 0) {
4509 // read and set gtid
4510 memcpy(binlog_gtid, value + pack_len, binlog_gtid_len);
4511 binlog_gtid[binlog_gtid_len] = '\0';
4512 pack_len += binlog_gtid_len;
4513 }
4514 }
4515 return false;
4516}
4517
4518/**
4519 Inserts a row into mysql.slave_gtid_info table. Doing this inside
4520 storage engine is more efficient than inserting/updating through MySQL.
4521
4522 @param[IN] id Primary key of the table.
4523 @param[IN] db Database name. This is column 2 of the table.
4524 @param[IN] gtid Gtid in human readable form. This is column 3 of the table.
4525 @param[IN] write_batch Handle to storage engine writer.
4526*/
4527void Rdb_binlog_manager::update_slave_gtid_info(
4528 const uint &id, const char *const db, const char *const gtid,
4529 rocksdb::WriteBatchBase *const write_batch) {
4530 if (id && db && gtid) {
4531 // Make sure that if the slave_gtid_info table exists we have a
4532 // pointer to it via m_slave_gtid_info_tbl.
4533 if (!m_slave_gtid_info_tbl.load()) {
4534 m_slave_gtid_info_tbl.store(
4535 rdb_get_ddl_manager()->find("mysql.slave_gtid_info"));
4536 }
4537 if (!m_slave_gtid_info_tbl.load()) {
4538 // slave_gtid_info table is not present. Simply return.
4539 return;
4540 }
4541 DBUG_ASSERT(m_slave_gtid_info_tbl.load()->m_key_count == 1);
4542
4543 const std::shared_ptr<const Rdb_key_def> &kd =
4544 m_slave_gtid_info_tbl.load()->m_key_descr_arr[0];
4545 String value;
4546
4547 // Build key
4548 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE + 4] = {0};
4549 uchar *buf = key_buf;
4550 rdb_netbuf_store_index(buf, kd->get_index_number());
4551 buf += Rdb_key_def::INDEX_NUMBER_SIZE;
4552 rdb_netbuf_store_uint32(buf, id);
4553 buf += 4;
4554 const rocksdb::Slice key_slice =
4555 rocksdb::Slice((const char *)key_buf, buf - key_buf);
4556
4557 // Build value
4558 uchar value_buf[128] = {0};
4559 DBUG_ASSERT(gtid);
4560 const uint db_len = strlen(db);
4561 const uint gtid_len = strlen(gtid);
4562 buf = value_buf;
4563 // 1 byte used for flags. Empty here.
4564 buf++;
4565
4566 // Write column 1.
4567 DBUG_ASSERT(strlen(db) <= 64);
4568 rdb_netbuf_store_byte(buf, db_len);
4569 buf++;
4570 memcpy(buf, db, db_len);
4571 buf += db_len;
4572
4573 // Write column 2.
4574 DBUG_ASSERT(gtid_len <= 56);
4575 rdb_netbuf_store_byte(buf, gtid_len);
4576 buf++;
4577 memcpy(buf, gtid, gtid_len);
4578 buf += gtid_len;
4579 const rocksdb::Slice value_slice =
4580 rocksdb::Slice((const char *)value_buf, buf - value_buf);
4581
4582 write_batch->Put(kd->get_cf(), key_slice, value_slice);
4583 }
4584}
4585
4586bool Rdb_dict_manager::init(rocksdb::DB *const rdb_dict,
4587 Rdb_cf_manager *const cf_manager) {
4588 DBUG_ASSERT(rdb_dict != nullptr);
4589 DBUG_ASSERT(cf_manager != nullptr);
4590
4591 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
4592
4593 m_db = rdb_dict;
4594
4595 m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME);
4596 rocksdb::ColumnFamilyHandle *default_cfh =
4597 cf_manager->get_cf(DEFAULT_CF_NAME);
4598
4599 // System CF and default CF should be initialized
4600 if (m_system_cfh == nullptr || default_cfh == nullptr) {
4601 return HA_EXIT_FAILURE;
4602 }
4603
4604 rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
4605
4606 m_key_slice_max_index_id =
4607 rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
4608 Rdb_key_def::INDEX_NUMBER_SIZE);
4609
4610 resume_drop_indexes();
4611 rollback_ongoing_index_creation();
4612
4613 // Initialize system CF and default CF flags
4614 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
4615 rocksdb::WriteBatch *const batch = wb.get();
4616
4617 add_cf_flags(batch, m_system_cfh->GetID(), 0);
4618 add_cf_flags(batch, default_cfh->GetID(), 0);
4619 commit(batch);
4620
4621 return HA_EXIT_SUCCESS;
4622}
4623
4624std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
4625 return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
4626}
4627
4628void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
4629 const rocksdb::Slice &key,
4630 const rocksdb::Slice &value) const {
4631 batch->Put(m_system_cfh, key, value);
4632}
4633
4634rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
4635 std::string *const value) const {
4636 rocksdb::ReadOptions options;
4637 options.total_order_seek = true;
4638 return m_db->Get(options, m_system_cfh, key, value);
4639}
4640
4641void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
4642 const rocksdb::Slice &key) const {
4643 batch->Delete(m_system_cfh, key);
4644}
4645
4646rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
4647 /* Reading data dictionary should always skip bloom filter */
4648 rocksdb::ReadOptions read_options;
4649 read_options.total_order_seek = true;
4650 return m_db->NewIterator(read_options, m_system_cfh);
4651}
4652
4653int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
4654 const bool &sync) const {
4655 if (!batch)
4656 return HA_ERR_ROCKSDB_COMMIT_FAILED;
4657 int res = HA_EXIT_SUCCESS;
4658 rocksdb::WriteOptions options;
4659 options.sync = sync;
4660 rocksdb::Status s = m_db->Write(options, batch);
4661 res = !s.ok(); // we return true when something failed
4662 if (res) {
4663 rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
4664 }
4665 batch->Clear();
4666 return res;
4667}
4668
4669void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
4670 Rdb_key_def::DATA_DICT_TYPE dict_type,
4671 const GL_INDEX_ID &gl_index_id) {
4672 rdb_netbuf_store_uint32(netbuf, dict_type);
4673 rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
4674 gl_index_id.cf_id);
4675 rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
4676 gl_index_id.index_id);
4677}
4678
4679void Rdb_dict_manager::delete_with_prefix(
4680 rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
4681 const GL_INDEX_ID &gl_index_id) const {
4682 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
4683 dump_index_id(key_buf, dict_type, gl_index_id);
4684 rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
4685
4686 delete_key(batch, key);
4687}
4688
4689void Rdb_dict_manager::add_or_update_index_cf_mapping(
4690 rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
4691 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
4692 uchar value_buf[256] = {0};
4693 dump_index_id(key_buf, Rdb_key_def::INDEX_INFO, index_info->m_gl_index_id);
4694 const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
4695
4696 uchar *ptr = value_buf;
4697 rdb_netbuf_store_uint16(ptr, Rdb_key_def::INDEX_INFO_VERSION_LATEST);
4698 ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
4699 rdb_netbuf_store_byte(ptr, index_info->m_index_type);
4700 ptr += RDB_SIZEOF_INDEX_TYPE;
4701 rdb_netbuf_store_uint16(ptr, index_info->m_kv_version);
4702 ptr += RDB_SIZEOF_KV_VERSION;
4703 rdb_netbuf_store_uint32(ptr, index_info->m_index_flags);
4704 ptr += RDB_SIZEOF_INDEX_FLAGS;
4705 rdb_netbuf_store_uint64(ptr, index_info->m_ttl_duration);
4706 ptr += ROCKSDB_SIZEOF_TTL_RECORD;
4707
4708 const rocksdb::Slice value =
4709 rocksdb::Slice((char *)value_buf, ptr - value_buf);
4710 batch->Put(m_system_cfh, key, value);
4711}
4712
4713void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
4714 const uint32_t &cf_id,
4715 const uint32_t &cf_flags) const {
4716 DBUG_ASSERT(batch != nullptr);
4717
4718 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
4719 uchar value_buf[Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE] =
4720 {0};
4721 rdb_netbuf_store_uint32(key_buf, Rdb_key_def::CF_DEFINITION);
4722 rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
4723 const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
4724
4725 rdb_netbuf_store_uint16(value_buf, Rdb_key_def::CF_DEFINITION_VERSION);
4726 rdb_netbuf_store_uint32(value_buf + Rdb_key_def::VERSION_SIZE, cf_flags);
4727 const rocksdb::Slice value =
4728 rocksdb::Slice((char *)value_buf, sizeof(value_buf));
4729 batch->Put(m_system_cfh, key, value);
4730}
4731
4732void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
4733 const GL_INDEX_ID &gl_index_id) const {
4734 delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
4735 delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
4736 delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
4737}
4738
4739bool Rdb_dict_manager::get_index_info(
4740 const GL_INDEX_ID &gl_index_id,
4741 struct Rdb_index_info *const index_info) const {
4742
4743 if (index_info) {
4744 index_info->m_gl_index_id = gl_index_id;
4745 }
4746
4747 bool found = false;
4748 bool error = false;
4749 std::string value;
4750 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
4751 dump_index_id(key_buf, Rdb_key_def::INDEX_INFO, gl_index_id);
4752 const rocksdb::Slice &key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
4753
4754 const rocksdb::Status &status = get_value(key, &value);
4755 if (status.ok()) {
4756 if (!index_info) {
4757 return true;
4758 }
4759
4760 const uchar *const val = (const uchar *)value.c_str();
4761 const uchar *ptr = val;
4762 index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
4763 ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
4764
4765 switch (index_info->m_index_dict_version) {
4766 case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
4767 /* Sanity check to prevent reading bogus TTL record. */
4768 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4769 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4770 RDB_SIZEOF_INDEX_FLAGS +
4771 ROCKSDB_SIZEOF_TTL_RECORD) {
4772 error = true;
4773 break;
4774 }
4775 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4776 ptr += RDB_SIZEOF_INDEX_TYPE;
4777 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4778 ptr += RDB_SIZEOF_KV_VERSION;
4779 index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
4780 ptr += RDB_SIZEOF_INDEX_FLAGS;
4781 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4782 found = true;
4783 break;
4784
4785 case Rdb_key_def::INDEX_INFO_VERSION_TTL:
4786 /* Sanity check to prevent reading bogus into TTL record. */
4787 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4788 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4789 ROCKSDB_SIZEOF_TTL_RECORD) {
4790 error = true;
4791 break;
4792 }
4793 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4794 ptr += RDB_SIZEOF_INDEX_TYPE;
4795 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4796 ptr += RDB_SIZEOF_KV_VERSION;
4797 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4798 if ((index_info->m_kv_version ==
4799 Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
4800 index_info->m_ttl_duration > 0) {
4801 index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
4802 }
4803 found = true;
4804 break;
4805
4806 case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
4807 case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
4808 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4809 ptr += RDB_SIZEOF_INDEX_TYPE;
4810 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4811 found = true;
4812 break;
4813
4814 default:
4815 error = true;
4816 break;
4817 }
4818
4819 switch (index_info->m_index_type) {
4820 case Rdb_key_def::INDEX_TYPE_PRIMARY:
4821 case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
4822 error =
4823 index_info->m_kv_version > Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
4824 break;
4825 }
4826 case Rdb_key_def::INDEX_TYPE_SECONDARY:
4827 error = index_info->m_kv_version >
4828 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
4829 break;
4830 default:
4831 error = true;
4832 break;
4833 }
4834 }
4835
4836 if (error) {
4837 // NO_LINT_DEBUG
4838 sql_print_error(
4839 "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
4840 "from data dictionary. This should never happen "
4841 "and it may be a bug.",
4842 index_info->m_index_dict_version, index_info->m_index_type,
4843 index_info->m_kv_version, index_info->m_ttl_duration);
4844 abort();
4845 }
4846
4847 return found;
4848}
4849
4850bool Rdb_dict_manager::get_cf_flags(const uint32_t &cf_id,
4851 uint32_t *const cf_flags) const {
4852 DBUG_ASSERT(cf_flags != nullptr);
4853
4854 bool found = false;
4855 std::string value;
4856 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
4857
4858 rdb_netbuf_store_uint32(key_buf, Rdb_key_def::CF_DEFINITION);
4859 rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
4860
4861 const rocksdb::Slice key =
4862 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
4863 const rocksdb::Status status = get_value(key, &value);
4864
4865 if (status.ok()) {
4866 const uchar *val = (const uchar *)value.c_str();
4867 DBUG_ASSERT(val);
4868
4869 const uint16_t version = rdb_netbuf_to_uint16(val);
4870
4871 if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
4872 *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
4873 found = true;
4874 }
4875 }
4876
4877 return found;
4878}
4879
4880/*
4881 Returning index ids that were marked as deleted (via DROP TABLE) but
4882 still not removed by drop_index_thread yet, or indexes that are marked as
4883 ongoing creation.
4884 */
4885void Rdb_dict_manager::get_ongoing_index_operation(
4886 std::unordered_set<GL_INDEX_ID> *gl_index_ids,
4887 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
4888 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
4889 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
4890
4891 uchar index_buf[Rdb_key_def::INDEX_NUMBER_SIZE];
4892 rdb_netbuf_store_uint32(index_buf, dd_type);
4893 const rocksdb::Slice index_slice(reinterpret_cast<char *>(index_buf),
4894 Rdb_key_def::INDEX_NUMBER_SIZE);
4895
4896 rocksdb::Iterator *it = new_iterator();
4897 for (it->Seek(index_slice); it->Valid(); it->Next()) {
4898 rocksdb::Slice key = it->key();
4899 const uchar *const ptr = (const uchar *)key.data();
4900
4901 /*
4902 Ongoing drop/create index operations require key to be of the form:
4903 dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
4904
4905 This may need to be changed in the future if we want to process a new
4906 ddl_type with different format.
4907 */
4908 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
4909 rdb_netbuf_to_uint32(ptr) != dd_type) {
4910 break;
4911 }
4912
4913 // We don't check version right now since currently we always store only
4914 // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
4915 // If increasing version number, we need to add version check logic here.
4916 GL_INDEX_ID gl_index_id;
4917 gl_index_id.cf_id =
4918 rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
4919 gl_index_id.index_id =
4920 rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
4921 gl_index_ids->insert(gl_index_id);
4922 }
4923 delete it;
4924}
4925
4926/*
4927 Returning true if index_id is create/delete ongoing (undergoing creation or
4928 marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
4929 or not.
4930 */
4931bool Rdb_dict_manager::is_index_operation_ongoing(
4932 const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
4933 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
4934 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
4935
4936 bool found = false;
4937 std::string value;
4938 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
4939 dump_index_id(key_buf, dd_type, gl_index_id);
4940 const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
4941
4942 const rocksdb::Status status = get_value(key, &value);
4943 if (status.ok()) {
4944 found = true;
4945 }
4946 return found;
4947}
4948
4949/*
4950 Adding index_id to data dictionary so that the index id is removed
4951 by drop_index_thread, or to track online index creation.
4952 */
4953void Rdb_dict_manager::start_ongoing_index_operation(
4954 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
4955 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
4956 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
4957 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
4958
4959 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
4960 uchar value_buf[Rdb_key_def::VERSION_SIZE] = {0};
4961 dump_index_id(key_buf, dd_type, gl_index_id);
4962
4963 // version as needed
4964 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
4965 rdb_netbuf_store_uint16(value_buf,
4966 Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
4967 } else {
4968 rdb_netbuf_store_uint16(value_buf,
4969 Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
4970 }
4971
4972 const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
4973 const rocksdb::Slice value =
4974 rocksdb::Slice((char *)value_buf, sizeof(value_buf));
4975 batch->Put(m_system_cfh, key, value);
4976}
4977
4978/*
4979 Removing index_id from data dictionary to confirm drop_index_thread
4980 completed dropping entire key/values of the index_id
4981 */
4982void Rdb_dict_manager::end_ongoing_index_operation(
4983 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
4984 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
4985 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
4986 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
4987
4988 delete_with_prefix(batch, dd_type, gl_index_id);
4989}
4990
4991/*
4992 Returning true if there is no target index ids to be removed
4993 by drop_index_thread
4994 */
4995bool Rdb_dict_manager::is_drop_index_empty() const {
4996 std::unordered_set<GL_INDEX_ID> gl_index_ids;
4997 get_ongoing_drop_indexes(&gl_index_ids);
4998 return gl_index_ids.empty();
4999}
5000
5001/*
5002 This function is supposed to be called by DROP TABLE. Logging messages
5003 that dropping indexes started, and adding data dictionary so that
5004 all associated indexes to be removed
5005 */
5006void Rdb_dict_manager::add_drop_table(
5007 std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 &n_keys,
5008 rocksdb::WriteBatch *const batch) const {
5009 std::unordered_set<GL_INDEX_ID> dropped_index_ids;
5010 for (uint32 i = 0; i < n_keys; i++) {
5011 dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
5012 }
5013
5014 add_drop_index(dropped_index_ids, batch);
5015}
5016
5017/*
5018 Called during inplace index drop operations. Logging messages
5019 that dropping indexes started, and adding data dictionary so that
5020 all associated indexes to be removed
5021 */
5022void Rdb_dict_manager::add_drop_index(
5023 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5024 rocksdb::WriteBatch *const batch) const {
5025 for (const auto &gl_index_id : gl_index_ids) {
5026 log_start_drop_index(gl_index_id, "Begin");
5027 start_drop_index(batch, gl_index_id);
5028 }
5029}
5030
5031/*
5032 Called during inplace index creation operations. Logging messages
5033 that adding indexes started, and updates data dictionary with all associated
5034 indexes to be added.
5035 */
5036void Rdb_dict_manager::add_create_index(
5037 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5038 rocksdb::WriteBatch *const batch) const {
5039 for (const auto &gl_index_id : gl_index_ids) {
5040 // NO_LINT_DEBUG
5041 sql_print_verbose_info("RocksDB: Begin index creation (%u,%u)",
5042 gl_index_id.cf_id, gl_index_id.index_id);
5043 start_create_index(batch, gl_index_id);
5044 }
5045}
5046
5047/*
5048 This function is supposed to be called by drop_index_thread, when it
5049 finished dropping any index, or at the completion of online index creation.
5050 */
5051void Rdb_dict_manager::finish_indexes_operation(
5052 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5053 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5054 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5055 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5056
5057 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5058 rocksdb::WriteBatch *const batch = wb.get();
5059
5060 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5061 get_ongoing_create_indexes(&incomplete_create_indexes);
5062
5063 for (const auto &gl_index_id : gl_index_ids) {
5064 if (is_index_operation_ongoing(gl_index_id, dd_type)) {
5065 end_ongoing_index_operation(batch, gl_index_id, dd_type);
5066
5067 /*
5068 Remove the corresponding incomplete create indexes from data
5069 dictionary as well
5070 */
5071 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5072 if (incomplete_create_indexes.count(gl_index_id)) {
5073 end_ongoing_index_operation(batch, gl_index_id,
5074 Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5075 }
5076 }
5077 }
5078
5079 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5080 delete_index_info(batch, gl_index_id);
5081 }
5082 }
5083 commit(batch);
5084}
5085
5086/*
5087 This function is supposed to be called when initializing
5088 Rdb_dict_manager (at startup). If there is any index ids that are
5089 drop ongoing, printing out messages for diagnostics purposes.
5090 */
5091void Rdb_dict_manager::resume_drop_indexes() const {
5092 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5093 get_ongoing_drop_indexes(&gl_index_ids);
5094
5095 uint max_index_id_in_dict = 0;
5096 get_max_index_id(&max_index_id_in_dict);
5097
5098 for (const auto &gl_index_id : gl_index_ids) {
5099 log_start_drop_index(gl_index_id, "Resume");
5100 if (max_index_id_in_dict < gl_index_id.index_id) {
5101 sql_print_error("RocksDB: Found max index id %u from data dictionary "
5102 "but also found dropped index id (%u,%u) from drop_index "
5103 "dictionary. This should never happen and is possibly a "
5104 "bug.",
5105 max_index_id_in_dict, gl_index_id.cf_id,
5106 gl_index_id.index_id);
5107 abort();
5108 }
5109 }
5110}
5111
5112void Rdb_dict_manager::rollback_ongoing_index_creation() const {
5113 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5114 rocksdb::WriteBatch *const batch = wb.get();
5115
5116 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5117 get_ongoing_create_indexes(&gl_index_ids);
5118
5119 for (const auto &gl_index_id : gl_index_ids) {
5120 // NO_LINT_DEBUG
5121 sql_print_verbose_info("RocksDB: Removing incomplete create index (%u,%u)",
5122 gl_index_id.cf_id, gl_index_id.index_id);
5123
5124 start_drop_index(batch, gl_index_id);
5125 }
5126
5127 commit(batch);
5128}
5129
5130void Rdb_dict_manager::log_start_drop_table(
5131 const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 &n_keys,
5132 const char *const log_action) const {
5133 for (uint32 i = 0; i < n_keys; i++) {
5134 log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
5135 }
5136}
5137
5138void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
5139 const char *log_action) const {
5140 struct Rdb_index_info index_info;
5141 if (!get_index_info(gl_index_id, &index_info)) {
5142 /*
5143 If we don't find the index info, it could be that it's because it was a
5144 partially created index that isn't in the data dictionary yet that needs
5145 to be rolled back.
5146 */
5147 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5148 get_ongoing_create_indexes(&incomplete_create_indexes);
5149
5150 if (!incomplete_create_indexes.count(gl_index_id)) {
5151 /* If it's not a partially created index, something is very wrong. */
5152 sql_print_error("RocksDB: Failed to get column family info "
5153 "from index id (%u,%u). MyRocks data dictionary may "
5154 "get corrupted.",
5155 gl_index_id.cf_id, gl_index_id.index_id);
5156 abort();
5157 }
5158 }
5159}
5160
5161bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
5162 bool found = false;
5163 std::string value;
5164
5165 const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
5166 if (status.ok()) {
5167 const uchar *const val = (const uchar *)value.c_str();
5168 const uint16_t &version = rdb_netbuf_to_uint16(val);
5169 if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
5170 *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5171 found = true;
5172 }
5173 }
5174 return found;
5175}
5176
5177bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
5178 const uint32_t &index_id) const {
5179 DBUG_ASSERT(batch != nullptr);
5180
5181 uint32_t old_index_id = -1;
5182 if (get_max_index_id(&old_index_id)) {
5183 if (old_index_id > index_id) {
5184 sql_print_error("RocksDB: Found max index id %u from data dictionary "
5185 "but trying to update to older value %u. This should "
5186 "never happen and possibly a bug.",
5187 old_index_id, index_id);
5188 return true;
5189 }
5190 }
5191
5192 uchar value_buf[Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE] =
5193 {0};
5194 rdb_netbuf_store_uint16(value_buf, Rdb_key_def::MAX_INDEX_ID_VERSION);
5195 rdb_netbuf_store_uint32(value_buf + Rdb_key_def::VERSION_SIZE, index_id);
5196 const rocksdb::Slice value =
5197 rocksdb::Slice((char *)value_buf, sizeof(value_buf));
5198 batch->Put(m_system_cfh, m_key_slice_max_index_id, value);
5199 return false;
5200}
5201
5202void Rdb_dict_manager::add_stats(
5203 rocksdb::WriteBatch *const batch,
5204 const std::vector<Rdb_index_stats> &stats) const {
5205 DBUG_ASSERT(batch != nullptr);
5206
5207 for (const auto &it : stats) {
5208 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
5209 dump_index_id(key_buf, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
5210
5211 // IndexStats::materialize takes complete care of serialization including
5212 // storing the version
5213 const auto value =
5214 Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
5215
5216 batch->Put(m_system_cfh, rocksdb::Slice((char *)key_buf, sizeof(key_buf)),
5217 value);
5218 }
5219}
5220
5221Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
5222 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
5223 dump_index_id(key_buf, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5224
5225 std::string value;
5226 const rocksdb::Status status = get_value(
5227 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf)),
5228 &value);
5229 if (status.ok()) {
5230 std::vector<Rdb_index_stats> v;
5231 // unmaterialize checks if the version matches
5232 if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
5233 return v[0];
5234 }
5235 }
5236
5237 return Rdb_index_stats();
5238}
5239
5240rocksdb::Status
5241Rdb_dict_manager::put_auto_incr_val(rocksdb::WriteBatchBase *batch,
5242 const GL_INDEX_ID &gl_index_id,
5243 ulonglong val, bool overwrite) const {
5244 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
5245 dump_index_id(key_buf, Rdb_key_def::AUTO_INC, gl_index_id);
5246 const rocksdb::Slice key =
5247 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5248
5249 // Value is constructed by storing the version and the value.
5250 uchar value_buf[RDB_SIZEOF_AUTO_INCREMENT_VERSION +
5251 ROCKSDB_SIZEOF_AUTOINC_VALUE] = {0};
5252 uchar *ptr = value_buf;
5253 rdb_netbuf_store_uint16(ptr, Rdb_key_def::AUTO_INCREMENT_VERSION);
5254 ptr += RDB_SIZEOF_AUTO_INCREMENT_VERSION;
5255 rdb_netbuf_store_uint64(ptr, val);
5256 ptr += ROCKSDB_SIZEOF_AUTOINC_VALUE;
5257 const rocksdb::Slice value =
5258 rocksdb::Slice(reinterpret_cast<char *>(value_buf), ptr - value_buf);
5259
5260 if (overwrite) {
5261 return batch->Put(m_system_cfh, key, value);
5262 }
5263 return batch->Merge(m_system_cfh, key, value);
5264}
5265
5266bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
5267 ulonglong *new_val) const {
5268 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
5269 dump_index_id(key_buf, Rdb_key_def::AUTO_INC, gl_index_id);
5270
5271 std::string value;
5272 const rocksdb::Status status = get_value(
5273 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf)),
5274 &value);
5275
5276 if (status.ok()) {
5277 const uchar *const val = reinterpret_cast<const uchar *>(value.data());
5278
5279 if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
5280 *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
5281 return true;
5282 }
5283 }
5284 return false;
5285}
5286
5287uint Rdb_seq_generator::get_and_update_next_number(
5288 Rdb_dict_manager *const dict) {
5289 DBUG_ASSERT(dict != nullptr);
5290
5291 uint res;
5292 RDB_MUTEX_LOCK_CHECK(m_mutex);
5293
5294 res = m_next_number++;
5295
5296 const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
5297 rocksdb::WriteBatch *const batch = wb.get();
5298
5299 DBUG_ASSERT(batch != nullptr);
5300 dict->update_max_index_id(batch, res);
5301 dict->commit(batch);
5302
5303 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
5304
5305 return res;
5306}
5307
5308} // namespace myrocks
5309