1/* Copyright (c) 2011, Oracle and/or its affiliates.
2 Copyright (C) 2009-2011 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17#include <my_global.h>
18#include <m_ctype.h>
19#include <my_base.h>
20#include <my_compare.h>
21#include <my_sys.h>
22
23int ha_compare_text(CHARSET_INFO *charset_info, const uchar *a, size_t a_length,
24 const uchar *b, size_t b_length, my_bool part_key)
25{
26 if (!part_key)
27 return charset_info->coll->strnncollsp(charset_info, a, a_length,
28 b, b_length);
29 return charset_info->coll->strnncoll(charset_info, a, a_length,
30 b, b_length, part_key);
31}
32
33
34static int compare_bin(const uchar *a, uint a_length,
35 const uchar *b, uint b_length,
36 my_bool part_key, my_bool skip_end_space)
37{
38 uint length= MY_MIN(a_length,b_length);
39 const uchar *end= a+ length;
40 int flag;
41
42 while (a < end)
43 if ((flag= (int) *a++ - (int) *b++))
44 return flag;
45 if (part_key && b_length < a_length)
46 return 0;
47 if (skip_end_space && a_length != b_length)
48 {
49 int swap= 1;
50 /*
51 We are using space compression. We have to check if longer key
52 has next character < ' ', in which case it's less than the shorter
53 key that has an implicite space afterwards.
54
55 This code is identical to the one in
56 strings/ctype-simple.c:my_strnncollsp_simple
57 */
58 if (a_length < b_length)
59 {
60 /* put shorter key in a */
61 a_length= b_length;
62 a= b;
63 swap= -1; /* swap sign of result */
64 }
65 for (end= a + a_length-length; a < end ; a++)
66 {
67 if (*a != ' ')
68 return (*a < ' ') ? -swap : swap;
69 }
70 return 0;
71 }
72 return (int) (a_length-b_length);
73}
74
75
76/*
77 Compare two keys
78
79 SYNOPSIS
80 ha_key_cmp()
81 keyseg Array of key segments of key to compare
82 a First key to compare, in format from _mi_pack_key()
83 This is always from the row
84 b Second key to compare. This is from the row or the user
85 key_length Length of key to compare, based on key b. This can be shorter
86 than b to just compare sub keys
87 next_flag How keys should be compared
88 If bit SEARCH_FIND is not set the keys includes the row
89 position and this should also be compared
90 If SEARCH_PAGE_KEY_HAS_TRANSID is set then 'a' has transid
91 If SEARCH_USER_KEY_HAS_TRANSID is set then 'b' has transid
92 diff_pos OUT Number of first keypart where values differ, counting
93 from one.
94 diff_pos[1] OUT (b + diff_pos[1]) points to first value in tuple b
95 that is different from corresponding value in tuple a.
96
97 EXAMPLES
98 Example1: if the function is called for tuples
99 ('aaa','bbb') and ('eee','fff'), then
100 diff_pos[0] = 1 (as 'aaa' != 'eee')
101 diff_pos[1] = 0 (offset from beginning of tuple b to 'eee' keypart).
102
103 Example2: if the index function is called for tuples
104 ('aaa','bbb') and ('aaa','fff'),
    diff_pos[0] = 2 (as 'bbb' != 'fff')
106 diff_pos[1] = 3 (offset from beginning of tuple b to 'fff' keypart,
107 here we assume that first key part is CHAR(3) NOT NULL)
108
109 NOTES
    Number-keys can't be split
111
112 RETURN VALUES
113 <0 If a < b
114 0 If a == b
115 >0 If a > b
116*/
117
/* Difference of A and B after converting both to int: <0, 0 or >0 */
#define FCMP(A,B) ((int) (A) - (int) (B))

/*
  Walks the key segments in 'keyseg', comparing the corresponding parts of
  'a' and 'b' according to each segment's type and flags. Returns as soon
  as a segment differs. If all compared segments are equal and SEARCH_FIND
  is not set in 'nextflag', the bytes following the key parts (row id and
  possibly transid) are compared as well.
*/
int ha_key_cmp(HA_KEYSEG *keyseg, const uchar *a,
               const uchar *b, uint key_length, uint32 nextflag,
               uint *diff_pos)
{
  int flag;
  int16 s_1,s_2;
  int32 l_1,l_2;
  uint32 u_1,u_2;
  float f_1,f_2;
  double d_1,d_2;
  uint next_key_length;
  const uchar *orig_b= b;                 /* Start of b, for diff_pos[1] */

  *diff_pos=0;
  /*
    key_length is the number of bytes of b still to compare. It is unsigned,
    so the loop condition casts it to int to stop when it went "negative".
  */
  for ( ; (int) key_length >0 ; key_length=next_key_length, keyseg++)
  {
    const uchar *end;
    /* piks is 0 for HA_NO_SORT segments: they are skipped, not compared */
    uint piks=! (keyseg->flag & HA_NO_SORT);
    (*diff_pos)++;                        /* Number of current key part */
    diff_pos[1]= (uint)(b - orig_b);      /* Offset of this key part in b */

    /* Handle NULL part */
    if (keyseg->null_bit)
    {
      /* A nullable segment starts with one byte that tells if it is NULL */
      key_length--;
      if (*a != *b && piks)
      {
        flag = (int) *a - (int) *b;
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      }
      b++;
      if (!*a++)                          /* If key was NULL */
      {
        if ((nextflag & (SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT |
                         SEARCH_NULL_ARE_EQUAL)) ==
            (SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT))
        {
          /* Allow duplicate keys */
          nextflag= (nextflag & ~(SEARCH_FIND | SEARCH_UPDATE)) | SEARCH_SAME;
        }
        else if (nextflag & SEARCH_NULL_ARE_NOT_EQUAL)
        {
          /*
            This is only used from mi_check() to calculate cardinality.
            It can't be used when searching for a key as this would cause
            compare of (a,b) and (b,a) to return the same value.
          */
          return -1;
        }
        /* No value bytes are consumed here for a NULL key part */
        next_key_length=key_length;
        continue;                         /* To next key part */
      }
    }
    /* Defaults for fixed length segments; packed types override them below */
    end= a+ MY_MIN(keyseg->length,key_length);
    next_key_length=key_length-keyseg->length;

    /*
      NOTE(review): next_key_length is unsigned, so the 'next_key_length <= 0'
      tests below are only true when it is exactly 0.
      In every case the last argument to the compare function is true only
      when SEARCH_PREFIX is set and this is the last part of the key.
    */
    switch ((enum ha_base_keytype) keyseg->type) {
    case HA_KEYTYPE_TEXT:                 /* Ascii; Key is converted */
      if (keyseg->flag & HA_SPACE_PACK)
      {
        /*
          Lengths are stored in front of the data. get_key_length() and
          get_key_pack_length() are macros defined elsewhere; presumably
          they read the length and advance the pointer past it - verify.
        */
        int a_length,b_length,pack_length;
        get_key_length(a_length,a);
        get_key_pack_length(b_length,pack_length,b);
        next_key_length=key_length-b_length-pack_length;

        if (piks &&
            (flag=ha_compare_text(keyseg->charset,a,a_length,b,b_length,
                                  (my_bool) ((nextflag & SEARCH_PREFIX) &&
                                             next_key_length <= 0))))
          return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
        a+=a_length;
        b+=b_length;
        break;
      }
      else
      {
        /* Fixed length text: both sides use the same (possibly cut) length */
        uint length=(uint) (end-a), a_length=length, b_length=length;
        if (piks &&
            (flag= ha_compare_text(keyseg->charset, a, a_length, b, b_length,
                                   (my_bool) ((nextflag & SEARCH_PREFIX) &&
                                              next_key_length <= 0))))
          return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
        a=end;
        b+=length;
      }
      break;
    case HA_KEYTYPE_BINARY:
    case HA_KEYTYPE_BIT:
      if (keyseg->flag & HA_SPACE_PACK)
      {
        int a_length,b_length,pack_length;
        get_key_length(a_length,a);
        get_key_pack_length(b_length,pack_length,b);
        next_key_length=key_length-b_length-pack_length;

        /* Last argument 1: trailing bytes are compared against spaces */
        if (piks &&
            (flag=compare_bin(a,a_length,b,b_length,
                              (my_bool) ((nextflag & SEARCH_PREFIX) &&
                                         next_key_length <= 0),1)))
          return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
        a+=a_length;
        b+=b_length;
        break;
      }
      else
      {
        /*
          Uses the full segment length, not MY_MIN(keyseg->length,key_length)
          as the fixed length text case does.
        */
        uint length=keyseg->length;
        if (piks &&
            (flag=compare_bin(a,length,b,length,
                              (my_bool) ((nextflag & SEARCH_PREFIX) &&
                                         next_key_length <= 0),0)))
          return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
        a+=length;
        b+=length;
      }
      break;
    case HA_KEYTYPE_VARTEXT1:
    case HA_KEYTYPE_VARTEXT2:
      {
        /* Same handling as space packed text */
        int a_length,b_length,pack_length;
        get_key_length(a_length,a);
        get_key_pack_length(b_length,pack_length,b);
        next_key_length=key_length-b_length-pack_length;

        if (piks &&
            (flag= ha_compare_text(keyseg->charset,a,a_length,b,b_length,
                                   (my_bool) ((nextflag & SEARCH_PREFIX) &&
                                              next_key_length <= 0))))
          return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
        a+= a_length;
        b+= b_length;
        break;
      }
      break;
    case HA_KEYTYPE_VARBINARY1:
    case HA_KEYTYPE_VARBINARY2:
      {
        /* As space packed binary, but lengths differ without space skipping */
        int a_length,b_length,pack_length;
        get_key_length(a_length,a);
        get_key_pack_length(b_length,pack_length,b);
        next_key_length=key_length-b_length-pack_length;

        if (piks &&
            (flag=compare_bin(a,a_length,b,b_length,
                              (my_bool) ((nextflag & SEARCH_PREFIX) &&
                                         next_key_length <= 0), 0)))
          return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
        a+=a_length;
        b+=b_length;
      }
      break;
    /*
      Fixed size numeric types: decode both values, compare with CMP_NUM
      and step a to 'end' and b by the size of the type.
    */
    case HA_KEYTYPE_INT8:
    {
      int i_1= (int) *((signed char*) a);
      int i_2= (int) *((signed char*) b);
      if (piks && (flag = CMP_NUM(i_1,i_2)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b++;
      break;
    }
    case HA_KEYTYPE_SHORT_INT:
      s_1= mi_sint2korr(a);
      s_2= mi_sint2korr(b);
      if (piks && (flag = CMP_NUM(s_1,s_2)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 2; /* sizeof(short int); */
      break;
    case HA_KEYTYPE_USHORT_INT:
      {
        /* Read with the signed macro; the store into uint16 makes it unsigned */
        uint16 us_1,us_2;
        us_1= mi_sint2korr(a);
        us_2= mi_sint2korr(b);
        if (piks && (flag = CMP_NUM(us_1,us_2)))
          return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
        a= end;
        b+=2; /* sizeof(short int); */
        break;
      }
    case HA_KEYTYPE_LONG_INT:
      l_1= mi_sint4korr(a);
      l_2= mi_sint4korr(b);
      if (piks && (flag = CMP_NUM(l_1,l_2)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 4; /* sizeof(long int); */
      break;
    case HA_KEYTYPE_ULONG_INT:
      /* Read with the signed macro; the store into uint32 makes it unsigned */
      u_1= mi_sint4korr(a);
      u_2= mi_sint4korr(b);
      if (piks && (flag = CMP_NUM(u_1,u_2)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 4; /* sizeof(long int); */
      break;
    case HA_KEYTYPE_INT24:
      l_1=mi_sint3korr(a);
      l_2=mi_sint3korr(b);
      if (piks && (flag = CMP_NUM(l_1,l_2)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 3;
      break;
    case HA_KEYTYPE_UINT24:
      /* Unsigned 3 byte values are held in the int32 variables */
      l_1=mi_uint3korr(a);
      l_2=mi_uint3korr(b);
      if (piks && (flag = CMP_NUM(l_1,l_2)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 3;
      break;
    case HA_KEYTYPE_FLOAT:
      mi_float4get(f_1,a);
      mi_float4get(f_2,b);
      /*
        The following may give a compiler warning about floating point
        comparison not being safe, but this is ok in this context as
        we are basically doing sorting
      */
      if (piks && (flag = CMP_NUM(f_1,f_2)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 4; /* sizeof(float); */
      break;
    case HA_KEYTYPE_DOUBLE:
      mi_float8get(d_1,a);
      mi_float8get(d_2,b);
      /*
        The following may give a compiler warning about floating point
        comparison not being safe, but this is ok in this context as
        we are basically doing sorting
      */
      if (piks && (flag = CMP_NUM(d_1,d_2)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 8; /* sizeof(double); */
      break;
    case HA_KEYTYPE_NUM:                  /* Numeric key */
    {
      /*
        Number stored as characters. Instead of negating the result, the
        roles of a and b are swapped when the order has to be reversed;
        swap_flag remembers if they are currently swapped. The early
        returns below therefore need no HA_REVERSE_SORT handling.
      */
      int swap_flag= 0;
      int alength,blength;

      if (keyseg->flag & HA_REVERSE_SORT)
      {
        swap_variables(const uchar*, a, b);
        swap_flag=1;                      /* Remember swap of a & b */
        end= a+ (int) (end-b);            /* Same length, based on new a */
      }
      if (keyseg->flag & HA_SPACE_PACK)
      {
        /* One length byte in front of each value */
        alength= *a++; blength= *b++;
        end=a+alength;
        next_key_length=key_length-blength-1;
      }
      else
      {
        alength= (int) (end-a);
        blength=keyseg->length;
        /* remove pre space from keys */
        for ( ; alength && *a == ' ' ; a++, alength--) ;
        for ( ; blength && *b == ' ' ; b++, blength--) ;
      }
      if (piks)
      {
        if (*a == '-')
        {
          if (*b != '-')
            return -1;                    /* Negative is less than positive */
          /*
            Both are negative: skip the signs and swap the operands, as the
            one with the bigger absolute value is the smaller number.
          */
          a++; b++;
          swap_variables(const uchar*, a, b);
          swap_variables(int, alength, blength);
          swap_flag=1-swap_flag;
          alength--; blength--;
          end=a+alength;
        }
        else if (*b == '-')
          return 1;
        /* Skip leading '+' and '0' so that the lengths can be compared */
        while (alength && (*a == '+' || *a == '0'))
        {
          a++; alength--;
        }
        while (blength && (*b == '+' || *b == '0'))
        {
          b++; blength--;
        }
        /* The value with more characters left is the bigger one */
        if (alength != blength)
          return (alength < blength) ? -1 : 1;
        /* Same length: the first different character decides */
        while (a < end)
          if (*a++ != *b++)
            return ((int) a[-1] - (int) b[-1]);
      }
      else
      {
        /* Not compared; just step over the value in both keys */
        b+=(end-a);
        a=end;
      }

      if (swap_flag)                      /* Restore pointers */
        swap_variables(const uchar*, a, b);
      break;
    }
#ifdef HAVE_LONG_LONG
    case HA_KEYTYPE_LONGLONG:
    {
      longlong ll_a,ll_b;
      ll_a= mi_sint8korr(a);
      ll_b= mi_sint8korr(b);
      if (piks && (flag = CMP_NUM(ll_a,ll_b)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 8;
      break;
    }
    case HA_KEYTYPE_ULONGLONG:
    {
      ulonglong ll_a,ll_b;
      ll_a= mi_uint8korr(a);
      ll_b= mi_uint8korr(b);
      if (piks && (flag = CMP_NUM(ll_a,ll_b)))
        return ((keyseg->flag & HA_REVERSE_SORT) ? -flag : flag);
      a= end;
      b+= 8;
      break;
    }
#endif
    case HA_KEYTYPE_END:                  /* Ready */
      goto end;                           /* diff_pos is incremented */
    }
  }
  /* All key parts were equal: let diff_pos point past the last key part */
  (*diff_pos)++;
end:
  if (!(nextflag & SEARCH_FIND))
  {
    /*
      Compare rowid and possible transid
      This happens in the following case:
      - INSERT, UPDATE, DELETE when we have not unique keys or
        are using versioning
      - SEARCH_NEXT, SEARCH_PREVIOUS when we need to restart search

      The logic for comparing transid is as follows:
      Keys that have a transid have the lowest bit set in the rowid. This
      means that if we are comparing a key with a transid with another key
      that doesn't have a transid, we must reset the lowest bit for both keys.

      When we have transid, the keys are compared in transid order.
      A key without a transid is regarded to be smaller than a key with
      a transid.
    */

    uint i;
    uchar key_mask, tmp_a, tmp_b;

    if (nextflag & (SEARCH_NO_FIND | SEARCH_LAST)) /* Find record after key */
      return (nextflag & (SEARCH_BIGGER | SEARCH_LAST)) ? -1 : 1;
    key_mask= (uchar) 255;                /* Compare all bits of last byte */

    if (!(nextflag & (SEARCH_USER_KEY_HAS_TRANSID |
                      SEARCH_PAGE_KEY_HAS_TRANSID)))
    {
      /*
        Neither key has a trid. Only compare row id's and don't
        try to store rows in trid order
      */
      key_length= keyseg->length;         /* Length taken from current keyseg */
      nextflag&= ~SEARCH_INSERT;
    }
    else
    {
      /*
        Set key_mask so that we reset the last bit in the rowid before
        we compare it. This is needed as the lowest bit in the rowid is
        used to mark if the key has a transid or not.
      */
      key_mask= (uchar) 254;
      if (!test_all_bits(nextflag, (SEARCH_USER_KEY_HAS_TRANSID |
                                    SEARCH_PAGE_KEY_HAS_TRANSID)))
      {
        /*
          No transaction id for user key or for key on page
          Ignore transid as at least one of the keys are visible for all
        */
        key_length= keyseg->length;
      }
      else
      {
        /*
          Both keys have trids. No need of special handling of incomplete
          trids below.
        */
        nextflag&= ~SEARCH_INSERT;
      }
    }
    DBUG_ASSERT(key_length > 0);

    /* Compare all bytes except the last one, which needs key_mask */
    for (i= key_length-1 ; (int) i-- > 0 ; )
    {
      if (*a++ != *b++)
      {
        flag= FCMP(a[-1],b[-1]);
        goto found;
      }
    }
    /* Last byte, with the transid marker bit removed if key_mask is 254 */
    tmp_a= *a & key_mask;
    tmp_b= *b & key_mask;
    flag= FCMP(tmp_a, tmp_b);

    if (flag == 0 && (nextflag & SEARCH_INSERT))
    {
      /*
        Ensure that on insert we get rows stored in trid order.
        If one of the parts doesn't have a trid, this should be regarded
        as smaller than the other
      */
      return (nextflag & SEARCH_USER_KEY_HAS_TRANSID) ? -1 : 1;
    }
found:
    /* SEARCH_SAME returns the raw difference; the others never return 0 */
    if (nextflag & SEARCH_SAME)
      return (flag);                      /* read same */
    if (nextflag & SEARCH_BIGGER)
      return (flag <= 0 ? -1 : 1);        /* read next */
    return (flag < 0 ? -1 : 1);           /* read previous */
  }
  return 0;
} /* ha_key_cmp */
546
547/*
548 Find the first NULL value in index-suffix values tuple
549
550 SYNOPSIS
551 ha_find_null()
552 keyseg Array of keyparts for key suffix
553 a Key suffix value tuple
554
555 DESCRIPTION
556 Find the first NULL value in index-suffix values tuple.
557
558 TODO
559 Consider optimizing this function or its use so we don't search for
560 NULL values in completely NOT NULL index suffixes.
561
562 RETURN
563 First key part that has NULL as value in values tuple, or the last key
564 part (with keyseg->type==HA_TYPE_END) if values tuple doesn't contain
565 NULLs.
566*/
567
568HA_KEYSEG *ha_find_null(HA_KEYSEG *keyseg, const uchar *a)
569{
570 for (; (enum ha_base_keytype) keyseg->type != HA_KEYTYPE_END; keyseg++)
571 {
572 const uchar *end;
573 if (keyseg->null_bit)
574 {
575 if (!*a++)
576 return keyseg;
577 }
578 end= a+ keyseg->length;
579
580 switch ((enum ha_base_keytype) keyseg->type) {
581 case HA_KEYTYPE_TEXT:
582 case HA_KEYTYPE_BINARY:
583 case HA_KEYTYPE_BIT:
584 if (keyseg->flag & HA_SPACE_PACK)
585 {
586 int a_length;
587 get_key_length(a_length, a);
588 a += a_length;
589 break;
590 }
591 else
592 a= end;
593 break;
594 case HA_KEYTYPE_VARTEXT1:
595 case HA_KEYTYPE_VARTEXT2:
596 case HA_KEYTYPE_VARBINARY1:
597 case HA_KEYTYPE_VARBINARY2:
598 {
599 int a_length;
600 get_key_length(a_length, a);
601 a+= a_length;
602 break;
603 }
604 case HA_KEYTYPE_NUM:
605 if (keyseg->flag & HA_SPACE_PACK)
606 {
607 int alength= *a++;
608 end= a+alength;
609 }
610 a= end;
611 break;
612 case HA_KEYTYPE_INT8:
613 case HA_KEYTYPE_SHORT_INT:
614 case HA_KEYTYPE_USHORT_INT:
615 case HA_KEYTYPE_LONG_INT:
616 case HA_KEYTYPE_ULONG_INT:
617 case HA_KEYTYPE_INT24:
618 case HA_KEYTYPE_UINT24:
619#ifdef HAVE_LONG_LONG
620 case HA_KEYTYPE_LONGLONG:
621 case HA_KEYTYPE_ULONGLONG:
622#endif
623 case HA_KEYTYPE_FLOAT:
624 case HA_KEYTYPE_DOUBLE:
625 a= end;
626 break;
627 case HA_KEYTYPE_END: /* purecov: inspected */
628 /* keep compiler happy */
629 DBUG_ASSERT(0);
630 break;
631 }
632 }
633 return keyseg;
634}
635
636