1/*****************************************************************************
2
3Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4
5This program is free software; you can redistribute it and/or modify it under
6the terms of the GNU General Public License as published by the Free Software
7Foundation; version 2 of the License.
8
9This program is distributed in the hope that it will be useful, but WITHOUT
10ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
13You should have received a copy of the GNU General Public License along with
14this program; if not, write to the Free Software Foundation, Inc.,
1551 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16
17*****************************************************************************/
18
19/*******************************************************************//**
20@file rem/rem0cmp.cc
21Comparison services for records
22
23Created 7/1/1994 Heikki Tuuri
24************************************************************************/
25
26#include "ha_prototypes.h"
27
28#include "rem0cmp.h"
29#include "handler0alter.h"
30#include "srv0srv.h"
31
32#include <gstream.h>
33#include <spatial.h>
34#include <gis0geo.h>
35#include <page0cur.h>
36#include <algorithm>
37
38/* ALPHABETICAL ORDER
39 ==================
40
41The records are put into alphabetical order in the following
42way: let F be the first field where two records disagree.
43If there is a character in some position n where the
44records disagree, the order is determined by comparison of
45the characters at position n, possibly after
46collating transformation. If there is no such character,
47but the corresponding fields have different lengths, then
48if the data type of the fields is paddable,
49shorter field is padded with a padding character. If the
50data type is not paddable, longer field is considered greater.
51Finally, the SQL null is bigger than any other value.
52
53At the present, the comparison functions return 0 in the case,
54where two records disagree only in the way that one
55has more fields than the other. */
56
57/** Compare two data fields.
58@param[in] prtype precise type
59@param[in] a data field
60@param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
61@param[in] b data field
62@param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
63@return positive, 0, negative, if a is greater, equal, less than b,
64respectively */
65UNIV_INLINE
66int
67innobase_mysql_cmp(
68 ulint prtype,
69 const byte* a,
70 unsigned int a_length,
71 const byte* b,
72 unsigned int b_length)
73{
74#ifdef UNIV_DEBUG
75 switch (prtype & DATA_MYSQL_TYPE_MASK) {
76 case MYSQL_TYPE_BIT:
77 case MYSQL_TYPE_STRING:
78 case MYSQL_TYPE_VAR_STRING:
79 case MYSQL_TYPE_TINY_BLOB:
80 case MYSQL_TYPE_MEDIUM_BLOB:
81 case MYSQL_TYPE_BLOB:
82 case MYSQL_TYPE_LONG_BLOB:
83 case MYSQL_TYPE_VARCHAR:
84 break;
85 default:
86 ut_error;
87 }
88#endif /* UNIV_DEBUG */
89
90 uint cs_num = (uint) dtype_get_charset_coll(prtype);
91
92 if (CHARSET_INFO* cs = get_charset(cs_num, MYF(MY_WME))) {
93 return(cs->coll->strnncollsp(
94 cs, a, a_length, b, b_length));
95 }
96
97 ib::fatal() << "Unable to find charset-collation " << cs_num;
98 return(0);
99}
100
101/*************************************************************//**
102Returns TRUE if two columns are equal for comparison purposes.
103@return TRUE if the columns are considered equal in comparisons */
104ibool
105cmp_cols_are_equal(
106/*===============*/
107 const dict_col_t* col1, /*!< in: column 1 */
108 const dict_col_t* col2, /*!< in: column 2 */
109 ibool check_charsets)
110 /*!< in: whether to check charsets */
111{
112 if (dtype_is_non_binary_string_type(col1->mtype, col1->prtype)
113 && dtype_is_non_binary_string_type(col2->mtype, col2->prtype)) {
114
115 /* Both are non-binary string types: they can be compared if
116 and only if the charset-collation is the same */
117
118 if (check_charsets) {
119 return(dtype_get_charset_coll(col1->prtype)
120 == dtype_get_charset_coll(col2->prtype));
121 } else {
122 return(TRUE);
123 }
124 }
125
126 if (dtype_is_binary_string_type(col1->mtype, col1->prtype)
127 && dtype_is_binary_string_type(col2->mtype, col2->prtype)) {
128
129 /* Both are binary string types: they can be compared */
130
131 return(TRUE);
132 }
133
134 if (col1->mtype != col2->mtype) {
135
136 return(FALSE);
137 }
138
139 if (col1->mtype == DATA_INT
140 && (col1->prtype & DATA_UNSIGNED)
141 != (col2->prtype & DATA_UNSIGNED)) {
142
143 /* The storage format of an unsigned integer is different
144 from a signed integer: in a signed integer we OR
145 0x8000... to the value of positive integers. */
146
147 return(FALSE);
148 }
149
150 return(col1->mtype != DATA_INT || col1->len == col2->len);
151}
152
153/** Compare two DATA_DECIMAL (MYSQL_TYPE_DECIMAL) fields.
154TODO: Remove this function. Everything should use MYSQL_TYPE_NEWDECIMAL.
155@param[in] a data field
156@param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
157@param[in] b data field
158@param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
159@return positive, 0, negative, if a is greater, equal, less than b,
160respectively */
161static ATTRIBUTE_COLD
162int
163cmp_decimal(
164 const byte* a,
165 unsigned int a_length,
166 const byte* b,
167 unsigned int b_length)
168{
169 int swap_flag;
170
171 /* Remove preceding spaces */
172 for (; a_length && *a == ' '; a++, a_length--) { }
173 for (; b_length && *b == ' '; b++, b_length--) { }
174
175 if (*a == '-') {
176 swap_flag = -1;
177
178 if (*b != '-') {
179 return(swap_flag);
180 }
181
182 a++; b++;
183 a_length--;
184 b_length--;
185 } else {
186 swap_flag = 1;
187
188 if (*b == '-') {
189 return(swap_flag);
190 }
191 }
192
193 while (a_length > 0 && (*a == '+' || *a == '0')) {
194 a++; a_length--;
195 }
196
197 while (b_length > 0 && (*b == '+' || *b == '0')) {
198 b++; b_length--;
199 }
200
201 if (a_length != b_length) {
202 if (a_length < b_length) {
203 return(-swap_flag);
204 }
205
206 return(swap_flag);
207 }
208
209 while (a_length > 0 && *a == *b) {
210
211 a++; b++; a_length--;
212 }
213
214 if (a_length == 0) {
215 return(0);
216 }
217
218 if (*a <= *b) {
219 swap_flag = -swap_flag;
220 }
221
222 return(swap_flag);
223}
224
225/*************************************************************//**
226Innobase uses this function to compare two geometry data fields
227@return 1, 0, -1, if a is greater, equal, less than b, respectively */
228static
229int
230cmp_geometry_field(
231/*===============*/
232 ulint prtype, /*!< in: precise type */
233 const byte* a, /*!< in: data field */
234 unsigned int a_length, /*!< in: data field length,
235 not UNIV_SQL_NULL */
236 const byte* b, /*!< in: data field */
237 unsigned int b_length) /*!< in: data field length,
238 not UNIV_SQL_NULL */
239{
240 double x1, x2;
241 double y1, y2;
242
243 ut_ad(prtype & DATA_GIS_MBR);
244
245 if (a_length < sizeof(double) || b_length < sizeof(double)) {
246 return(0);
247 }
248
249 /* Try to compare mbr left lower corner (xmin, ymin) */
250 x1 = mach_double_read(a);
251 x2 = mach_double_read(b);
252 y1 = mach_double_read(a + sizeof(double) * SPDIMS);
253 y2 = mach_double_read(b + sizeof(double) * SPDIMS);
254
255 if (x1 > x2) {
256 return(1);
257 } else if (x2 > x1) {
258 return(-1);
259 }
260
261 if (y1 > y2) {
262 return(1);
263 } else if (y2 > y1) {
264 return(-1);
265 }
266
267 /* left lower corner (xmin, ymin) overlaps, now right upper corner */
268 x1 = mach_double_read(a + sizeof(double));
269 x2 = mach_double_read(b + sizeof(double));
270 y1 = mach_double_read(a + sizeof(double) * SPDIMS + sizeof(double));
271 y2 = mach_double_read(b + sizeof(double) * SPDIMS + sizeof(double));
272
273 if (x1 > x2) {
274 return(1);
275 } else if (x2 > x1) {
276 return(-1);
277 }
278
279 if (y1 > y2) {
280 return(1);
281 } else if (y2 > y1) {
282 return(-1);
283 }
284
285 return(0);
286}
287/*************************************************************//**
288Innobase uses this function to compare two gis data fields
289@return 1, 0, -1, if mode == PAGE_CUR_MBR_EQUAL. And return
2901, 0 for rest compare modes, depends on a and b qualifies the
291relationship (CONTAINT, WITHIN etc.) */
292static
293int
294cmp_gis_field(
295/*============*/
296 page_cur_mode_t mode, /*!< in: compare mode */
297 const byte* a, /*!< in: data field */
298 unsigned int a_length, /*!< in: data field length,
299 not UNIV_SQL_NULL */
300 const byte* b, /*!< in: data field */
301 unsigned int b_length) /*!< in: data field length,
302 not UNIV_SQL_NULL */
303{
304 if (mode == PAGE_CUR_MBR_EQUAL) {
305 return cmp_geometry_field(DATA_GIS_MBR,
306 a, a_length, b, b_length);
307 } else {
308 return rtree_key_cmp(mode, a, int(a_length), b, int(b_length));
309 }
310}
311
312/** Compare two data fields.
313@param[in] mtype main type
314@param[in] prtype precise type
315@param[in] a data field
316@param[in] a_length length of a, in bytes (not UNIV_SQL_NULL)
317@param[in] b data field
318@param[in] b_length length of b, in bytes (not UNIV_SQL_NULL)
319@return positive, 0, negative, if a is greater, equal, less than b,
320respectively */
321static
322int
323cmp_whole_field(
324 ulint mtype,
325 ulint prtype,
326 const byte* a,
327 unsigned int a_length,
328 const byte* b,
329 unsigned int b_length)
330{
331 float f_1;
332 float f_2;
333 double d_1;
334 double d_2;
335
336 switch (mtype) {
337 case DATA_DECIMAL:
338 return(cmp_decimal(a, a_length, b, b_length));
339 case DATA_DOUBLE:
340 d_1 = mach_double_read(a);
341 d_2 = mach_double_read(b);
342
343 if (d_1 > d_2) {
344 return(1);
345 } else if (d_2 > d_1) {
346 return(-1);
347 }
348
349 return(0);
350
351 case DATA_FLOAT:
352 f_1 = mach_float_read(a);
353 f_2 = mach_float_read(b);
354
355 if (f_1 > f_2) {
356 return(1);
357 } else if (f_2 > f_1) {
358 return(-1);
359 }
360
361 return(0);
362 case DATA_VARCHAR:
363 case DATA_CHAR:
364 return(my_charset_latin1.coll->strnncollsp(
365 &my_charset_latin1,
366 a, a_length, b, b_length));
367 case DATA_BLOB:
368 if (prtype & DATA_BINARY_TYPE) {
369 ib::error() << "Comparing a binary BLOB"
370 " using a character set collation!";
371 ut_ad(0);
372 }
373 /* fall through */
374 case DATA_VARMYSQL:
375 case DATA_MYSQL:
376 return(innobase_mysql_cmp(prtype,
377 a, a_length, b, b_length));
378 case DATA_GEOMETRY:
379 return cmp_geometry_field(prtype, a, a_length, b, b_length);
380 default:
381 ib::fatal() << "Unknown data type number " << mtype;
382 }
383
384 return(0);
385}
386
387/** Compare two data fields.
388@param[in] mtype main type
389@param[in] prtype precise type
390@param[in] data1 data field
391@param[in] len1 length of data1 in bytes, or UNIV_SQL_NULL
392@param[in] data2 data field
393@param[in] len2 length of data2 in bytes, or UNIV_SQL_NULL
394@return the comparison result of data1 and data2
395@retval 0 if data1 is equal to data2
396@retval negative if data1 is less than data2
397@retval positive if data1 is greater than data2 */
398inline
399int
400cmp_data(
401 ulint mtype,
402 ulint prtype,
403 const byte* data1,
404 ulint len1,
405 const byte* data2,
406 ulint len2)
407{
408 ut_ad(len1 != UNIV_SQL_DEFAULT);
409 ut_ad(len2 != UNIV_SQL_DEFAULT);
410
411 if (len1 == UNIV_SQL_NULL || len2 == UNIV_SQL_NULL) {
412 if (len1 == len2) {
413 return(0);
414 }
415
416 /* We define the SQL null to be the smallest possible
417 value of a field. */
418 return(len1 == UNIV_SQL_NULL ? -1 : 1);
419 }
420
421 ulint pad;
422
423 switch (mtype) {
424 case DATA_FIXBINARY:
425 case DATA_BINARY:
426 if (dtype_get_charset_coll(prtype)
427 != DATA_MYSQL_BINARY_CHARSET_COLL) {
428 pad = 0x20;
429 break;
430 }
431 /* fall through */
432 case DATA_INT:
433 case DATA_SYS_CHILD:
434 case DATA_SYS:
435 pad = ULINT_UNDEFINED;
436 break;
437 case DATA_GEOMETRY:
438 ut_ad(prtype & DATA_BINARY_TYPE);
439 pad = ULINT_UNDEFINED;
440 if (prtype & DATA_GIS_MBR) {
441 return(cmp_whole_field(mtype, prtype,
442 data1, (unsigned) len1,
443 data2, (unsigned) len2));
444 }
445 break;
446 case DATA_BLOB:
447 if (prtype & DATA_BINARY_TYPE) {
448 pad = ULINT_UNDEFINED;
449 break;
450 }
451 /* fall through */
452 default:
453 return(cmp_whole_field(mtype, prtype,
454 data1, (unsigned) len1,
455 data2, (unsigned) len2));
456 }
457
458 ulint len;
459 int cmp;
460
461 if (len1 < len2) {
462 len = len1;
463 len2 -= len;
464 len1 = 0;
465 } else {
466 len = len2;
467 len1 -= len;
468 len2 = 0;
469 }
470
471 if (len) {
472#if defined __i386__ || defined __x86_64__ || defined _M_IX86 || defined _M_X64
473 /* Compare the first bytes with a loop to avoid the call
474 overhead of memcmp(). On x86 and x86-64, the GCC built-in
475 (repz cmpsb) seems to be very slow, so we will be calling the
476 libc version. http://gcc.gnu.org/bugzilla/show_bug.cgi?id=43052
477 tracks the slowness of the GCC built-in memcmp().
478
479 We compare up to the first 4..7 bytes with the loop.
480 The (len & 3) is used for "normalizing" or
481 "quantizing" the len parameter for the memcmp() call,
482 in case the whole prefix is equal. On x86 and x86-64,
483 the GNU libc memcmp() of equal strings is faster with
484 len=4 than with len=3.
485
486 On other architectures than the IA32 or AMD64, there could
487 be a built-in memcmp() that is faster than the loop.
488 We only use the loop where we know that it can improve
489 the performance. */
490 for (ulint i = 4 + (len & 3); i > 0; i--) {
491 cmp = int(*data1++) - int(*data2++);
492 if (cmp) {
493 return(cmp);
494 }
495
496 if (!--len) {
497 break;
498 }
499 }
500
501 if (len) {
502#endif /* IA32 or AMD64 */
503 cmp = memcmp(data1, data2, len);
504
505 if (cmp) {
506 return(cmp);
507 }
508
509 data1 += len;
510 data2 += len;
511#if defined __i386__ || defined __x86_64__ || defined _M_IX86 || defined _M_X64
512 }
513#endif /* IA32 or AMD64 */
514 }
515
516 cmp = (int) (len1 - len2);
517
518 if (!cmp || pad == ULINT_UNDEFINED) {
519 return(cmp);
520 }
521
522 len = 0;
523
524 if (len1) {
525 do {
526 cmp = static_cast<int>(
527 mach_read_from_1(&data1[len++]) - pad);
528 } while (cmp == 0 && len < len1);
529 } else {
530 ut_ad(len2 > 0);
531
532 do {
533 cmp = static_cast<int>(
534 pad - mach_read_from_1(&data2[len++]));
535 } while (cmp == 0 && len < len2);
536 }
537
538 return(cmp);
539}
540
541/** Compare a GIS data tuple to a physical record.
542@param[in] dtuple data tuple
543@param[in] rec B-tree record
544@param[in] offsets rec_get_offsets(rec)
545@param[in] mode compare mode
546@retval negative if dtuple is less than rec */
547int
548cmp_dtuple_rec_with_gis(
549/*====================*/
550 const dtuple_t* dtuple, /*!< in: data tuple */
551 const rec_t* rec, /*!< in: physical record which differs from
552 dtuple in some of the common fields, or which
553 has an equal number or more fields than
554 dtuple */
555 const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
556 page_cur_mode_t mode) /*!< in: compare mode */
557{
558 const dfield_t* dtuple_field; /* current field in logical record */
559 ulint dtuple_f_len; /* the length of the current field
560 in the logical record */
561 ulint rec_f_len; /* length of current field in rec */
562 const byte* rec_b_ptr; /* pointer to the current byte in
563 rec field */
564 int ret = 0; /* return value */
565
566 dtuple_field = dtuple_get_nth_field(dtuple, 0);
567 dtuple_f_len = dfield_get_len(dtuple_field);
568
569 rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
570 ret = cmp_gis_field(
571 mode, static_cast<const byte*>(dfield_get_data(dtuple_field)),
572 (unsigned) dtuple_f_len, rec_b_ptr, (unsigned) rec_f_len);
573
574 return(ret);
575}
576
577/** Compare a GIS data tuple to a physical record in rtree non-leaf node.
578We need to check the page number field, since we don't store pk field in
579rtree non-leaf node.
580@param[in] dtuple data tuple
581@param[in] rec R-tree record
582@param[in] offsets rec_get_offsets(rec)
583@retval negative if dtuple is less than rec */
584int
585cmp_dtuple_rec_with_gis_internal(
586 const dtuple_t* dtuple,
587 const rec_t* rec,
588 const ulint* offsets)
589{
590 const dfield_t* dtuple_field; /* current field in logical record */
591 ulint dtuple_f_len; /* the length of the current field
592 in the logical record */
593 ulint rec_f_len; /* length of current field in rec */
594 const byte* rec_b_ptr; /* pointer to the current byte in
595 rec field */
596 int ret = 0; /* return value */
597
598 dtuple_field = dtuple_get_nth_field(dtuple, 0);
599 dtuple_f_len = dfield_get_len(dtuple_field);
600
601 rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
602 ret = cmp_gis_field(
603 PAGE_CUR_WITHIN,
604 static_cast<const byte*>(dfield_get_data(dtuple_field)),
605 (unsigned) dtuple_f_len, rec_b_ptr, (unsigned) rec_f_len);
606 if (ret != 0) {
607 return(ret);
608 }
609
610 dtuple_field = dtuple_get_nth_field(dtuple, 1);
611 dtuple_f_len = dfield_get_len(dtuple_field);
612 rec_b_ptr = rec_get_nth_field(rec, offsets, 1, &rec_f_len);
613
614 return(cmp_data(dtuple_field->type.mtype,
615 dtuple_field->type.prtype,
616 static_cast<const byte*>(dtuple_field->data),
617 dtuple_f_len,
618 rec_b_ptr,
619 rec_f_len));
620}
621
622/** Compare two data fields.
623@param[in] mtype main type
624@param[in] prtype precise type
625@param[in] data1 data field
626@param[in] len1 length of data1 in bytes, or UNIV_SQL_NULL
627@param[in] data2 data field
628@param[in] len2 length of data2 in bytes, or UNIV_SQL_NULL
629@return the comparison result of data1 and data2
630@retval 0 if data1 is equal to data2
631@retval negative if data1 is less than data2
632@retval positive if data1 is greater than data2 */
633int
634cmp_data_data(
635 ulint mtype,
636 ulint prtype,
637 const byte* data1,
638 ulint len1,
639 const byte* data2,
640 ulint len2)
641{
642 return(cmp_data(mtype, prtype, data1, len1, data2, len2));
643}
644
645/** Compare a data tuple to a physical record.
646@param[in] dtuple data tuple
647@param[in] rec B-tree record
648@param[in] offsets rec_get_offsets(rec)
649@param[in] n_cmp number of fields to compare
650@param[in,out] matched_fields number of completely matched fields
651@return the comparison result of dtuple and rec
652@retval 0 if dtuple is equal to rec
653@retval negative if dtuple is less than rec
654@retval positive if dtuple is greater than rec */
655int
656cmp_dtuple_rec_with_match_low(
657 const dtuple_t* dtuple,
658 const rec_t* rec,
659 const ulint* offsets,
660 ulint n_cmp,
661 ulint* matched_fields)
662{
663 ulint cur_field; /* current field number */
664 int ret; /* return value */
665
666 ut_ad(dtuple_check_typed(dtuple));
667 ut_ad(rec_offs_validate(rec, NULL, offsets));
668
669 cur_field = *matched_fields;
670
671 ut_ad(n_cmp > 0);
672 ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
673 ut_ad(cur_field <= n_cmp);
674 ut_ad(cur_field <= rec_offs_n_fields(offsets));
675
676 if (cur_field == 0) {
677 ulint rec_info = rec_get_info_bits(rec,
678 rec_offs_comp(offsets));
679 ulint tup_info = dtuple_get_info_bits(dtuple);
680
681 if (UNIV_UNLIKELY(rec_info & REC_INFO_MIN_REC_FLAG)) {
682 ret = !(tup_info & REC_INFO_MIN_REC_FLAG);
683 goto order_resolved;
684 } else if (UNIV_UNLIKELY(tup_info & REC_INFO_MIN_REC_FLAG)) {
685 ret = -1;
686 goto order_resolved;
687 }
688 }
689
690 /* Match fields in a loop */
691
692 for (; cur_field < n_cmp; cur_field++) {
693 const byte* rec_b_ptr;
694 const dfield_t* dtuple_field
695 = dtuple_get_nth_field(dtuple, cur_field);
696 const byte* dtuple_b_ptr
697 = static_cast<const byte*>(
698 dfield_get_data(dtuple_field));
699 const dtype_t* type
700 = dfield_get_type(dtuple_field);
701 ulint dtuple_f_len
702 = dfield_get_len(dtuple_field);
703 ulint rec_f_len;
704
705 /* We should never compare against an externally
706 stored field. Only clustered index records can
707 contain externally stored fields, and the first fields
708 (primary key fields) should already differ. */
709 ut_ad(!rec_offs_nth_extern(offsets, cur_field));
710 /* We should never compare against instantly added columns.
711 Columns can only be instantly added to clustered index
712 leaf page records, and the first fields (primary key fields)
713 should already differ. */
714 ut_ad(!rec_offs_nth_default(offsets, cur_field));
715
716 rec_b_ptr = rec_get_nth_field(rec, offsets, cur_field,
717 &rec_f_len);
718
719 ut_ad(!dfield_is_ext(dtuple_field));
720
721 ret = cmp_data(type->mtype, type->prtype,
722 dtuple_b_ptr, dtuple_f_len,
723 rec_b_ptr, rec_f_len);
724 if (ret) {
725 goto order_resolved;
726 }
727 }
728
729 ret = 0; /* If we ran out of fields, dtuple was equal to rec
730 up to the common fields */
731order_resolved:
732 *matched_fields = cur_field;
733 return(ret);
734}
735
736/** Get the pad character code point for a type.
737@param[in] type
738@return pad character code point
739@retval ULINT_UNDEFINED if no padding is specified */
740UNIV_INLINE
741ulint
742cmp_get_pad_char(
743 const dtype_t* type)
744{
745 switch (type->mtype) {
746 case DATA_FIXBINARY:
747 case DATA_BINARY:
748 if (dtype_get_charset_coll(type->prtype)
749 == DATA_MYSQL_BINARY_CHARSET_COLL) {
750 /* Starting from 5.0.18, do not pad
751 VARBINARY or BINARY columns. */
752 return(ULINT_UNDEFINED);
753 }
754 /* Fall through */
755 case DATA_CHAR:
756 case DATA_VARCHAR:
757 case DATA_MYSQL:
758 case DATA_VARMYSQL:
759 /* Space is the padding character for all char and binary
760 strings, and starting from 5.0.3, also for TEXT strings. */
761 return(0x20);
762 case DATA_GEOMETRY:
763 /* DATA_GEOMETRY is binary data, not ASCII-based. */
764 return(ULINT_UNDEFINED);
765 case DATA_BLOB:
766 if (!(type->prtype & DATA_BINARY_TYPE)) {
767 return(0x20);
768 }
769 /* Fall through */
770 default:
771 /* No padding specified */
772 return(ULINT_UNDEFINED);
773 }
774}
775
776/** Compare a data tuple to a physical record.
777@param[in] dtuple data tuple
778@param[in] rec B-tree or R-tree index record
779@param[in] index index tree
780@param[in] offsets rec_get_offsets(rec)
781@param[in,out] matched_fields number of completely matched fields
782@param[in,out] matched_bytes number of matched bytes in the first
783field that is not matched
784@return the comparison result of dtuple and rec
785@retval 0 if dtuple is equal to rec
786@retval negative if dtuple is less than rec
787@retval positive if dtuple is greater than rec */
788int
789cmp_dtuple_rec_with_match_bytes(
790 const dtuple_t* dtuple,
791 const rec_t* rec,
792 const dict_index_t* index,
793 const ulint* offsets,
794 ulint* matched_fields,
795 ulint* matched_bytes)
796{
797 ulint n_cmp = dtuple_get_n_fields_cmp(dtuple);
798 ulint cur_field; /* current field number */
799 ulint cur_bytes;
800 int ret; /* return value */
801
802 ut_ad(dtuple_check_typed(dtuple));
803 ut_ad(rec_offs_validate(rec, index, offsets));
804 ut_ad(!(REC_INFO_MIN_REC_FLAG
805 & dtuple_get_info_bits(dtuple)));
806 ut_ad(!(REC_INFO_MIN_REC_FLAG
807 & rec_get_info_bits(rec, rec_offs_comp(offsets))));
808
809 cur_field = *matched_fields;
810 cur_bytes = *matched_bytes;
811
812 ut_ad(n_cmp <= dtuple_get_n_fields(dtuple));
813 ut_ad(cur_field <= n_cmp);
814 ut_ad(cur_field + (cur_bytes > 0) <= rec_offs_n_fields(offsets));
815
816 /* Match fields in a loop; stop if we run out of fields in dtuple
817 or find an externally stored field */
818
819 while (cur_field < n_cmp) {
820 const dfield_t* dfield = dtuple_get_nth_field(
821 dtuple, cur_field);
822 const dtype_t* type = dfield_get_type(dfield);
823 ulint dtuple_f_len = dfield_get_len(dfield);
824 const byte* dtuple_b_ptr;
825 const byte* rec_b_ptr;
826 ulint rec_f_len;
827
828 dtuple_b_ptr = static_cast<const byte*>(
829 dfield_get_data(dfield));
830
831 ut_ad(!rec_offs_nth_default(offsets, cur_field));
832 rec_b_ptr = rec_get_nth_field(rec, offsets,
833 cur_field, &rec_f_len);
834 ut_ad(!rec_offs_nth_extern(offsets, cur_field));
835
836 /* If we have matched yet 0 bytes, it may be that one or
837 both the fields are SQL null, or the record or dtuple may be
838 the predefined minimum record. */
839 if (cur_bytes == 0) {
840 if (dtuple_f_len == UNIV_SQL_NULL) {
841 if (rec_f_len == UNIV_SQL_NULL) {
842
843 goto next_field;
844 }
845
846 ret = -1;
847 goto order_resolved;
848 } else if (rec_f_len == UNIV_SQL_NULL) {
849 /* We define the SQL null to be the
850 smallest possible value of a field
851 in the alphabetical order */
852
853 ret = 1;
854 goto order_resolved;
855 }
856 }
857
858 switch (type->mtype) {
859 case DATA_FIXBINARY:
860 case DATA_BINARY:
861 case DATA_INT:
862 case DATA_SYS_CHILD:
863 case DATA_SYS:
864 break;
865 case DATA_BLOB:
866 if (type->prtype & DATA_BINARY_TYPE) {
867 break;
868 }
869 /* fall through */
870 default:
871 ret = cmp_data(type->mtype, type->prtype,
872 dtuple_b_ptr, dtuple_f_len,
873 rec_b_ptr, rec_f_len);
874
875 if (!ret) {
876 goto next_field;
877 }
878
879 cur_bytes = 0;
880 goto order_resolved;
881 }
882
883 /* Set the pointers at the current byte */
884
885 rec_b_ptr += cur_bytes;
886 dtuple_b_ptr += cur_bytes;
887 /* Compare then the fields */
888
889 for (const ulint pad = cmp_get_pad_char(type);;
890 cur_bytes++) {
891 ulint rec_byte = pad;
892 ulint dtuple_byte = pad;
893
894 if (rec_f_len <= cur_bytes) {
895 if (dtuple_f_len <= cur_bytes) {
896
897 goto next_field;
898 }
899
900 if (rec_byte == ULINT_UNDEFINED) {
901 ret = 1;
902
903 goto order_resolved;
904 }
905 } else {
906 rec_byte = *rec_b_ptr++;
907 }
908
909 if (dtuple_f_len <= cur_bytes) {
910 if (dtuple_byte == ULINT_UNDEFINED) {
911 ret = -1;
912
913 goto order_resolved;
914 }
915 } else {
916 dtuple_byte = *dtuple_b_ptr++;
917 }
918
919 if (dtuple_byte < rec_byte) {
920 ret = -1;
921 goto order_resolved;
922 } else if (dtuple_byte > rec_byte) {
923 ret = 1;
924 goto order_resolved;
925 }
926 }
927
928next_field:
929 cur_field++;
930 cur_bytes = 0;
931 }
932
933 ut_ad(cur_bytes == 0);
934
935 ret = 0; /* If we ran out of fields, dtuple was equal to rec
936 up to the common fields */
937order_resolved:
938 *matched_fields = cur_field;
939 *matched_bytes = cur_bytes;
940
941 return(ret);
942}
943
944/** Compare a data tuple to a physical record.
945@see cmp_dtuple_rec_with_match
946@param[in] dtuple data tuple
947@param[in] rec B-tree record
948@param[in] offsets rec_get_offsets(rec); may be NULL
949for ROW_FORMAT=REDUNDANT
950@return the comparison result of dtuple and rec
951@retval 0 if dtuple is equal to rec
952@retval negative if dtuple is less than rec
953@retval positive if dtuple is greater than rec */
954int
955cmp_dtuple_rec(
956 const dtuple_t* dtuple,
957 const rec_t* rec,
958 const ulint* offsets)
959{
960 ulint matched_fields = 0;
961
962 ut_ad(rec_offs_validate(rec, NULL, offsets));
963 return(cmp_dtuple_rec_with_match(dtuple, rec, offsets,
964 &matched_fields));
965}
966
967/**************************************************************//**
968Checks if a dtuple is a prefix of a record. The last field in dtuple
969is allowed to be a prefix of the corresponding field in the record.
970@return TRUE if prefix */
971ibool
972cmp_dtuple_is_prefix_of_rec(
973/*========================*/
974 const dtuple_t* dtuple, /*!< in: data tuple */
975 const rec_t* rec, /*!< in: physical record */
976 const ulint* offsets)/*!< in: array returned by rec_get_offsets() */
977{
978 ulint n_fields;
979 ulint matched_fields = 0;
980
981 ut_ad(rec_offs_validate(rec, NULL, offsets));
982 n_fields = dtuple_get_n_fields(dtuple);
983
984 if (n_fields > rec_offs_n_fields(offsets)) {
985 ut_ad(0);
986 return(FALSE);
987 }
988
989 cmp_dtuple_rec_with_match(dtuple, rec, offsets, &matched_fields);
990 return(matched_fields == n_fields);
991}
992
993/*************************************************************//**
994Compare two physical record fields.
995@retval positive if rec1 field is greater than rec2
996@retval negative if rec1 field is less than rec2
997@retval 0 if rec1 field equals to rec2 */
998static MY_ATTRIBUTE((nonnull, warn_unused_result))
999int
1000cmp_rec_rec_simple_field(
1001/*=====================*/
1002 const rec_t* rec1, /*!< in: physical record */
1003 const rec_t* rec2, /*!< in: physical record */
1004 const ulint* offsets1,/*!< in: rec_get_offsets(rec1, ...) */
1005 const ulint* offsets2,/*!< in: rec_get_offsets(rec2, ...) */
1006 const dict_index_t* index, /*!< in: data dictionary index */
1007 ulint n) /*!< in: field to compare */
1008{
1009 const byte* rec1_b_ptr;
1010 const byte* rec2_b_ptr;
1011 ulint rec1_f_len;
1012 ulint rec2_f_len;
1013 const dict_col_t* col = dict_index_get_nth_col(index, n);
1014
1015 ut_ad(!rec_offs_nth_extern(offsets1, n));
1016 ut_ad(!rec_offs_nth_extern(offsets2, n));
1017
1018 rec1_b_ptr = rec_get_nth_field(rec1, offsets1, n, &rec1_f_len);
1019 rec2_b_ptr = rec_get_nth_field(rec2, offsets2, n, &rec2_f_len);
1020
1021 return(cmp_data(col->mtype, col->prtype,
1022 rec1_b_ptr, rec1_f_len, rec2_b_ptr, rec2_f_len));
1023}
1024
1025/** Compare two physical records that contain the same number of columns,
1026none of which are stored externally.
1027@retval positive if rec1 (including non-ordering columns) is greater than rec2
1028@retval negative if rec1 (including non-ordering columns) is less than rec2
1029@retval 0 if rec1 is a duplicate of rec2 */
1030int
1031cmp_rec_rec_simple(
1032/*===============*/
1033 const rec_t* rec1, /*!< in: physical record */
1034 const rec_t* rec2, /*!< in: physical record */
1035 const ulint* offsets1,/*!< in: rec_get_offsets(rec1, ...) */
1036 const ulint* offsets2,/*!< in: rec_get_offsets(rec2, ...) */
1037 const dict_index_t* index, /*!< in: data dictionary index */
1038 struct TABLE* table) /*!< in: MySQL table, for reporting
1039 duplicate key value if applicable,
1040 or NULL */
1041{
1042 ulint n;
1043 ulint n_uniq = dict_index_get_n_unique(index);
1044 bool null_eq = false;
1045
1046 ut_ad(rec_offs_n_fields(offsets1) >= n_uniq);
1047 ut_ad(rec_offs_n_fields(offsets2) == rec_offs_n_fields(offsets2));
1048
1049 ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
1050
1051 for (n = 0; n < n_uniq; n++) {
1052 int cmp = cmp_rec_rec_simple_field(
1053 rec1, rec2, offsets1, offsets2, index, n);
1054
1055 if (cmp) {
1056 return(cmp);
1057 }
1058
1059 /* If the fields are internally equal, they must both
1060 be NULL or non-NULL. */
1061 ut_ad(rec_offs_nth_sql_null(offsets1, n)
1062 == rec_offs_nth_sql_null(offsets2, n));
1063
1064 if (rec_offs_nth_sql_null(offsets1, n)) {
1065 ut_ad(!(dict_index_get_nth_col(index, n)->prtype
1066 & DATA_NOT_NULL));
1067 null_eq = true;
1068 }
1069 }
1070
1071 /* If we ran out of fields, the ordering columns of rec1 were
1072 equal to rec2. Issue a duplicate key error if needed. */
1073
1074 if (!null_eq && table && dict_index_is_unique(index)) {
1075 /* Report erroneous row using new version of table. */
1076 innobase_rec_to_mysql(table, rec1, index, offsets1);
1077 return(0);
1078 }
1079
1080 /* Else, keep comparing so that we have the full internal
1081 order. */
1082 for (; n < dict_index_get_n_fields(index); n++) {
1083 int cmp = cmp_rec_rec_simple_field(
1084 rec1, rec2, offsets1, offsets2, index, n);
1085
1086 if (cmp) {
1087 return(cmp);
1088 }
1089
1090 /* If the fields are internally equal, they must both
1091 be NULL or non-NULL. */
1092 ut_ad(rec_offs_nth_sql_null(offsets1, n)
1093 == rec_offs_nth_sql_null(offsets2, n));
1094 }
1095
1096 /* This should never be reached. Internally, an index must
1097 never contain duplicate entries. */
1098 ut_ad(0);
1099 return(0);
1100}
1101
1102/** Compare two B-tree records.
1103@param[in] rec1 B-tree record
1104@param[in] rec2 B-tree record
1105@param[in] offsets1 rec_get_offsets(rec1, index)
1106@param[in] offsets2 rec_get_offsets(rec2, index)
1107@param[in] index B-tree index
1108@param[in] nulls_unequal true if this is for index cardinality
1109statistics estimation, and innodb_stats_method=nulls_unequal
1110or innodb_stats_method=nulls_ignored
1111@param[out] matched_fields number of completely matched fields
1112within the first field not completely matched
1113@return the comparison result
1114@retval 0 if rec1 is equal to rec2
1115@retval negative if rec1 is less than rec2
1116@retval positive if rec2 is greater than rec2 */
1117int
1118cmp_rec_rec_with_match(
1119 const rec_t* rec1,
1120 const rec_t* rec2,
1121 const ulint* offsets1,
1122 const ulint* offsets2,
1123 const dict_index_t* index,
1124 bool nulls_unequal,
1125 ulint* matched_fields)
1126{
1127 ulint rec1_n_fields; /* the number of fields in rec */
1128 ulint rec1_f_len; /* length of current field in rec */
1129 const byte* rec1_b_ptr; /* pointer to the current byte
1130 in rec field */
1131 ulint rec2_n_fields; /* the number of fields in rec */
1132 ulint rec2_f_len; /* length of current field in rec */
1133 const byte* rec2_b_ptr; /* pointer to the current byte
1134 in rec field */
1135 ulint cur_field = 0; /* current field number */
1136 int ret = 0; /* return value */
1137 ulint comp;
1138
1139 ut_ad(rec1 != NULL);
1140 ut_ad(rec2 != NULL);
1141 ut_ad(index != NULL);
1142 ut_ad(rec_offs_validate(rec1, index, offsets1));
1143 ut_ad(rec_offs_validate(rec2, index, offsets2));
1144 ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
1145
1146 comp = rec_offs_comp(offsets1);
1147 rec1_n_fields = rec_offs_n_fields(offsets1);
1148 rec2_n_fields = rec_offs_n_fields(offsets2);
1149
1150 /* Test if rec is the predefined minimum record */
1151 if (UNIV_UNLIKELY(rec_get_info_bits(rec1, comp)
1152 & REC_INFO_MIN_REC_FLAG)) {
1153 ret = UNIV_UNLIKELY(rec_get_info_bits(rec2, comp)
1154 & REC_INFO_MIN_REC_FLAG)
1155 ? 0 : -1;
1156 goto order_resolved;
1157 } else if (UNIV_UNLIKELY
1158 (rec_get_info_bits(rec2, comp)
1159 & REC_INFO_MIN_REC_FLAG)) {
1160 ret = 1;
1161 goto order_resolved;
1162 }
1163
1164 /* Match fields in a loop */
1165
1166 for (; cur_field < rec1_n_fields && cur_field < rec2_n_fields;
1167 cur_field++) {
1168
1169 ulint mtype;
1170 ulint prtype;
1171
1172 /* If this is node-ptr records then avoid comparing node-ptr
1173 field. Only key field needs to be compared. */
1174 if (cur_field == dict_index_get_n_unique_in_tree(index)) {
1175 break;
1176 }
1177
1178 if (dict_index_is_ibuf(index)) {
1179 /* This is for the insert buffer B-tree. */
1180 mtype = DATA_BINARY;
1181 prtype = 0;
1182 } else {
1183 const dict_col_t* col;
1184
1185 col = dict_index_get_nth_col(index, cur_field);
1186
1187 mtype = col->mtype;
1188 prtype = col->prtype;
1189
1190 /* If the index is spatial index, we mark the
1191 prtype of the first field as MBR field. */
1192 if (cur_field == 0 && dict_index_is_spatial(index)) {
1193 ut_ad(DATA_GEOMETRY_MTYPE(mtype));
1194 prtype |= DATA_GIS_MBR;
1195 }
1196 }
1197
1198 /* We should never encounter an externally stored field.
1199 Externally stored fields only exist in clustered index
1200 leaf page records. These fields should already differ
1201 in the primary key columns already, before DB_TRX_ID,
1202 DB_ROLL_PTR, and any externally stored columns. */
1203 ut_ad(!rec_offs_nth_extern(offsets1, cur_field));
1204 ut_ad(!rec_offs_nth_extern(offsets2, cur_field));
1205 ut_ad(!rec_offs_nth_default(offsets1, cur_field));
1206 ut_ad(!rec_offs_nth_default(offsets2, cur_field));
1207
1208 rec1_b_ptr = rec_get_nth_field(rec1, offsets1,
1209 cur_field, &rec1_f_len);
1210 rec2_b_ptr = rec_get_nth_field(rec2, offsets2,
1211 cur_field, &rec2_f_len);
1212
1213 if (nulls_unequal
1214 && rec1_f_len == UNIV_SQL_NULL
1215 && rec2_f_len == UNIV_SQL_NULL) {
1216 ret = -1;
1217 goto order_resolved;
1218 }
1219
1220 ret = cmp_data(mtype, prtype,
1221 rec1_b_ptr, rec1_f_len,
1222 rec2_b_ptr, rec2_f_len);
1223 if (ret) {
1224 goto order_resolved;
1225 }
1226 }
1227
1228 /* If we ran out of fields, rec1 was equal to rec2 up
1229 to the common fields */
1230 ut_ad(ret == 0);
1231order_resolved:
1232 *matched_fields = cur_field;
1233 return(ret);
1234}
1235
1236#ifdef UNIV_COMPILE_TEST_FUNCS
1237
1238#ifdef HAVE_UT_CHRONO_T
1239
1240void
1241test_cmp_data_data(ulint len)
1242{
1243 int i;
1244 static byte zeros[64];
1245
1246 if (len > sizeof zeros) {
1247 len = sizeof zeros;
1248 }
1249
1250 ut_chrono_t ch(__func__);
1251
1252 for (i = 1000000; i > 0; i--) {
1253 i += cmp_data(DATA_INT, 0, zeros, len, zeros, len);
1254 }
1255}
1256
1257#endif /* HAVE_UT_CHRONO_T */
1258
1259#endif /* UNIV_COMPILE_TEST_FUNCS */
1260