1/* Copyright (c) 2006, 2013, Oracle and/or its affiliates.
2 Copyright (c) 2011, 2013, Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17#include "mariadb.h"
18#include <my_bit.h>
19#include "rpl_utility.h"
20#include "log_event.h"
21
22#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
23#include "rpl_rli.h"
24#include "sql_select.h"
25
26/**
27 Calculate display length for MySQL56 temporal data types from their metadata.
28 It contains fractional precision in the low 16-bit word.
29*/
30static uint32
31max_display_length_for_temporal2_field(uint32 int_display_length,
32 unsigned int metadata)
33{
34 metadata&= 0x00ff;
35 return int_display_length + metadata + (metadata ? 1 : 0);
36}
37
38
39/**
40 Compute the maximum display length of a field.
41
42 @param sql_type Type of the field
43 @param metadata The metadata from the master for the field.
44 @return Maximum length of the field in bytes.
45
46 The precise values calculated by field->max_display_length() and
47 calculated by max_display_length_for_field() can differ (by +1 or -1)
48 for integer data types (TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT).
49 This slight difference is not important here, because we call
50 this function only for two *different* integer data types.
51 */
52static uint32
53max_display_length_for_field(enum_field_types sql_type, unsigned int metadata)
54{
55 DBUG_PRINT("debug", ("sql_type: %d, metadata: 0x%x", sql_type, metadata));
56 DBUG_ASSERT(metadata >> 16 == 0);
57
58 switch (sql_type) {
59 case MYSQL_TYPE_NEWDECIMAL:
60 return metadata >> 8;
61
62 case MYSQL_TYPE_FLOAT:
63 return 12;
64
65 case MYSQL_TYPE_DOUBLE:
66 return 22;
67
68 case MYSQL_TYPE_SET:
69 case MYSQL_TYPE_ENUM:
70 return metadata & 0x00ff;
71
72 case MYSQL_TYPE_STRING:
73 {
74 uchar type= metadata >> 8;
75 if (type == MYSQL_TYPE_SET || type == MYSQL_TYPE_ENUM)
76 return metadata & 0xff;
77 else
78 /* This is taken from Field_string::unpack. */
79 return (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
80 }
81
82 case MYSQL_TYPE_YEAR:
83 case MYSQL_TYPE_TINY:
84 return 4;
85
86 case MYSQL_TYPE_SHORT:
87 return 6;
88
89 case MYSQL_TYPE_INT24:
90 return 9;
91
92 case MYSQL_TYPE_LONG:
93 return 11;
94
95#ifdef HAVE_LONG_LONG
96 case MYSQL_TYPE_LONGLONG:
97 return 20;
98
99#endif
100 case MYSQL_TYPE_NULL:
101 return 0;
102
103 case MYSQL_TYPE_NEWDATE:
104 return 3;
105
106 case MYSQL_TYPE_DATE:
107 return 3;
108
109 case MYSQL_TYPE_TIME:
110 return MIN_TIME_WIDTH;
111
112 case MYSQL_TYPE_TIME2:
113 return max_display_length_for_temporal2_field(MIN_TIME_WIDTH, metadata);
114
115 case MYSQL_TYPE_TIMESTAMP:
116 return MAX_DATETIME_WIDTH;
117
118 case MYSQL_TYPE_TIMESTAMP2:
119 return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH, metadata);
120
121 case MYSQL_TYPE_DATETIME:
122 return MAX_DATETIME_WIDTH;
123
124 case MYSQL_TYPE_DATETIME2:
125 return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH, metadata);
126
127 case MYSQL_TYPE_BIT:
128 /*
129 Decode the size of the bit field from the master.
130 */
131 DBUG_ASSERT((metadata & 0xff) <= 7);
132 return 8 * (metadata >> 8U) + (metadata & 0x00ff);
133
134 case MYSQL_TYPE_VAR_STRING:
135 case MYSQL_TYPE_VARCHAR:
136 return metadata;
137 case MYSQL_TYPE_VARCHAR_COMPRESSED:
138 return metadata - 1;
139
140 /*
141 The actual length for these types does not really matter since
142 they are used to calc_pack_length, which ignores the given
143 length for these types.
144
145 Since we want this to be accurate for other uses, we return the
146 maximum size in bytes of these BLOBs.
147 */
148
149 case MYSQL_TYPE_TINY_BLOB:
150 return (uint32)my_set_bits(1 * 8);
151
152 case MYSQL_TYPE_MEDIUM_BLOB:
153 return (uint32)my_set_bits(3 * 8);
154
155 case MYSQL_TYPE_BLOB:
156 case MYSQL_TYPE_BLOB_COMPRESSED:
157 /*
158 For the blob type, Field::real_type() lies and say that all
159 blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look
160 at the length instead to decide what the max display size is.
161 */
162 return (uint32)my_set_bits(metadata * 8);
163
164 case MYSQL_TYPE_LONG_BLOB:
165 case MYSQL_TYPE_GEOMETRY:
166 return (uint32)my_set_bits(4 * 8);
167
168 default:
169 return ~(uint32) 0;
170 }
171}
172
173
174/*
175 Compare the pack lengths of a source field (on the master) and a
176 target field (on the slave).
177
178 @param field Target field.
179 @param type Source field type.
180 @param metadata Source field metadata.
181
182 @retval -1 The length of the source field is smaller than the target field.
183 @retval 0 The length of the source and target fields are the same.
184 @retval 1 The length of the source field is greater than the target field.
185 */
186int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata)
187{
188 DBUG_ENTER("compare_lengths");
189 size_t const source_length=
190 max_display_length_for_field(source_type, metadata);
191 size_t const target_length= field->max_display_length();
192 DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
193 " target_length: %lu, target_type: %u",
194 (unsigned long) source_length, source_type,
195 (unsigned long) target_length, field->real_type()));
196 int result= source_length < target_length ? -1 : source_length > target_length;
197 DBUG_PRINT("result", ("%d", result));
198 DBUG_RETURN(result);
199}
200#endif //MYSQL_CLIENT
201/*********************************************************************
202 * table_def member definitions *
203 *********************************************************************/
204
205/*
206 This function returns the field size in raw bytes based on the type
207 and the encoded field data from the master's raw data.
208*/
209uint32 table_def::calc_field_size(uint col, uchar *master_data) const
210{
211 uint32 length= 0;
212
213 switch (type(col)) {
214 case MYSQL_TYPE_NEWDECIMAL:
215 length= my_decimal_get_binary_size(m_field_metadata[col] >> 8,
216 m_field_metadata[col] & 0xff);
217 break;
218 case MYSQL_TYPE_DECIMAL:
219 case MYSQL_TYPE_FLOAT:
220 case MYSQL_TYPE_DOUBLE:
221 length= m_field_metadata[col];
222 break;
223 /*
224 The cases for SET and ENUM are include for completeness, however
225 both are mapped to type MYSQL_TYPE_STRING and their real types
226 are encoded in the field metadata.
227 */
228 case MYSQL_TYPE_SET:
229 case MYSQL_TYPE_ENUM:
230 case MYSQL_TYPE_STRING:
231 {
232 uchar type= m_field_metadata[col] >> 8U;
233 if ((type == MYSQL_TYPE_SET) || (type == MYSQL_TYPE_ENUM))
234 length= m_field_metadata[col] & 0x00ff;
235 else
236 {
237 /*
238 We are reading the actual size from the master_data record
239 because this field has the actual lengh stored in the first
240 byte.
241 */
242 length= (uint) *master_data + 1;
243 DBUG_ASSERT(length != 0);
244 }
245 break;
246 }
247 case MYSQL_TYPE_YEAR:
248 case MYSQL_TYPE_TINY:
249 length= 1;
250 break;
251 case MYSQL_TYPE_SHORT:
252 length= 2;
253 break;
254 case MYSQL_TYPE_INT24:
255 length= 3;
256 break;
257 case MYSQL_TYPE_LONG:
258 length= 4;
259 break;
260#ifdef HAVE_LONG_LONG
261 case MYSQL_TYPE_LONGLONG:
262 length= 8;
263 break;
264#endif
265 case MYSQL_TYPE_NULL:
266 length= 0;
267 break;
268 case MYSQL_TYPE_NEWDATE:
269 length= 3;
270 break;
271 case MYSQL_TYPE_DATE:
272 case MYSQL_TYPE_TIME:
273 length= 3;
274 break;
275 case MYSQL_TYPE_TIME2:
276 length= my_time_binary_length(m_field_metadata[col]);
277 break;
278 case MYSQL_TYPE_TIMESTAMP:
279 length= 4;
280 break;
281 case MYSQL_TYPE_TIMESTAMP2:
282 length= my_timestamp_binary_length(m_field_metadata[col]);
283 break;
284 case MYSQL_TYPE_DATETIME:
285 length= 8;
286 break;
287 case MYSQL_TYPE_DATETIME2:
288 length= my_datetime_binary_length(m_field_metadata[col]);
289 break;
290 case MYSQL_TYPE_BIT:
291 {
292 /*
293 Decode the size of the bit field from the master.
294 from_len is the length in bytes from the master
295 from_bit_len is the number of extra bits stored in the master record
296 If from_bit_len is not 0, add 1 to the length to account for accurate
297 number of bytes needed.
298 */
299 uint from_len= (m_field_metadata[col] >> 8U) & 0x00ff;
300 uint from_bit_len= m_field_metadata[col] & 0x00ff;
301 DBUG_ASSERT(from_bit_len <= 7);
302 length= from_len + ((from_bit_len > 0) ? 1 : 0);
303 break;
304 }
305 case MYSQL_TYPE_VARCHAR:
306 case MYSQL_TYPE_VARCHAR_COMPRESSED:
307 {
308 length= m_field_metadata[col] > 255 ? 2 : 1; // c&p of Field_varstring::data_length()
309 length+= length == 1 ? (uint32) *master_data : uint2korr(master_data);
310 break;
311 }
312 case MYSQL_TYPE_TINY_BLOB:
313 case MYSQL_TYPE_MEDIUM_BLOB:
314 case MYSQL_TYPE_LONG_BLOB:
315 case MYSQL_TYPE_BLOB:
316 case MYSQL_TYPE_BLOB_COMPRESSED:
317 case MYSQL_TYPE_GEOMETRY:
318 {
319 /*
320 Compute the length of the data. We cannot use get_length() here
321 since it is dependent on the specific table (and also checks the
322 packlength using the internal 'table' pointer) and replication
323 is using a fixed format for storing data in the binlog.
324 */
325 switch (m_field_metadata[col]) {
326 case 1:
327 length= *master_data;
328 break;
329 case 2:
330 length= uint2korr(master_data);
331 break;
332 case 3:
333 length= uint3korr(master_data);
334 break;
335 case 4:
336 length= uint4korr(master_data);
337 break;
338 default:
339 DBUG_ASSERT(0); // Should not come here
340 break;
341 }
342
343 length+= m_field_metadata[col];
344 break;
345 }
346 default:
347 length= ~(uint32) 0;
348 }
349 return length;
350}
351
352#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
353/**
354 */
355void show_sql_type(enum_field_types type, uint16 metadata, String *str, CHARSET_INFO *field_cs)
356{
357 DBUG_ENTER("show_sql_type");
358 DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", type, metadata));
359
360 switch (type)
361 {
362 case MYSQL_TYPE_TINY:
363 str->set_ascii(STRING_WITH_LEN("tinyint"));
364 break;
365
366 case MYSQL_TYPE_SHORT:
367 str->set_ascii(STRING_WITH_LEN("smallint"));
368 break;
369
370 case MYSQL_TYPE_LONG:
371 str->set_ascii(STRING_WITH_LEN("int"));
372 break;
373
374 case MYSQL_TYPE_FLOAT:
375 str->set_ascii(STRING_WITH_LEN("float"));
376 break;
377
378 case MYSQL_TYPE_DOUBLE:
379 str->set_ascii(STRING_WITH_LEN("double"));
380 break;
381
382 case MYSQL_TYPE_NULL:
383 str->set_ascii(STRING_WITH_LEN("null"));
384 break;
385
386 case MYSQL_TYPE_TIMESTAMP:
387 case MYSQL_TYPE_TIMESTAMP2:
388 str->set_ascii(STRING_WITH_LEN("timestamp"));
389 break;
390
391 case MYSQL_TYPE_LONGLONG:
392 str->set_ascii(STRING_WITH_LEN("bigint"));
393 break;
394
395 case MYSQL_TYPE_INT24:
396 str->set_ascii(STRING_WITH_LEN("mediumint"));
397 break;
398
399 case MYSQL_TYPE_NEWDATE:
400 case MYSQL_TYPE_DATE:
401 str->set_ascii(STRING_WITH_LEN("date"));
402 break;
403
404 case MYSQL_TYPE_TIME:
405 case MYSQL_TYPE_TIME2:
406 str->set_ascii(STRING_WITH_LEN("time"));
407 break;
408
409 case MYSQL_TYPE_DATETIME:
410 case MYSQL_TYPE_DATETIME2:
411 str->set_ascii(STRING_WITH_LEN("datetime"));
412 break;
413
414 case MYSQL_TYPE_YEAR:
415 str->set_ascii(STRING_WITH_LEN("year"));
416 break;
417
418 case MYSQL_TYPE_VAR_STRING:
419 case MYSQL_TYPE_VARCHAR:
420 case MYSQL_TYPE_VARCHAR_COMPRESSED:
421 {
422 CHARSET_INFO *cs= str->charset();
423 size_t length=
424 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
425 "varchar(%u)%s", metadata,
426 type == MYSQL_TYPE_VARCHAR_COMPRESSED ? " compressed"
427 : "");
428 str->length(length);
429 }
430 break;
431
432 case MYSQL_TYPE_BIT:
433 {
434 CHARSET_INFO *cs= str->charset();
435 int bit_length= 8 * (metadata >> 8) + (metadata & 0xFF);
436 size_t length=
437 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
438 "bit(%d)", bit_length);
439 str->length(length);
440 }
441 break;
442
443 case MYSQL_TYPE_DECIMAL:
444 {
445 CHARSET_INFO *cs= str->charset();
446 size_t length=
447 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
448 "decimal(%d,?)/*old*/", metadata);
449 str->length(length);
450 }
451 break;
452
453 case MYSQL_TYPE_NEWDECIMAL:
454 {
455 CHARSET_INFO *cs= str->charset();
456 size_t length=
457 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
458 "decimal(%d,%d)", metadata >> 8, metadata & 0xff);
459 str->length(length);
460 }
461 break;
462
463 case MYSQL_TYPE_ENUM:
464 str->set_ascii(STRING_WITH_LEN("enum"));
465 break;
466
467 case MYSQL_TYPE_SET:
468 str->set_ascii(STRING_WITH_LEN("set"));
469 break;
470
471 case MYSQL_TYPE_BLOB:
472 case MYSQL_TYPE_BLOB_COMPRESSED:
473 /*
474 Field::real_type() lies regarding the actual type of a BLOB, so
475 it is necessary to check the pack length to figure out what kind
476 of blob it really is.
477 */
478 switch (get_blob_type_from_length(metadata))
479 {
480 case MYSQL_TYPE_TINY_BLOB:
481 str->set_ascii(STRING_WITH_LEN("tinyblob"));
482 break;
483
484 case MYSQL_TYPE_MEDIUM_BLOB:
485 str->set_ascii(STRING_WITH_LEN("mediumblob"));
486 break;
487
488 case MYSQL_TYPE_LONG_BLOB:
489 str->set_ascii(STRING_WITH_LEN("longblob"));
490 break;
491
492 case MYSQL_TYPE_BLOB:
493 str->set_ascii(STRING_WITH_LEN("blob"));
494 break;
495
496 default:
497 DBUG_ASSERT(0);
498 break;
499 }
500
501 if (type == MYSQL_TYPE_BLOB_COMPRESSED)
502 str->append(STRING_WITH_LEN(" compressed"));
503 break;
504
505 case MYSQL_TYPE_STRING:
506 {
507 /*
508 This is taken from Field_string::unpack.
509 */
510 CHARSET_INFO *cs= str->charset();
511 uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
512 size_t length=
513 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
514 "char(%d)", bytes / field_cs->mbmaxlen);
515 str->length(length);
516 }
517 break;
518
519 case MYSQL_TYPE_GEOMETRY:
520 str->set_ascii(STRING_WITH_LEN("geometry"));
521 break;
522
523 default:
524 str->set_ascii(STRING_WITH_LEN("<unknown type>"));
525 }
526 DBUG_VOID_RETURN;
527}
528
529
530/**
531 Check the order variable and print errors if the order is not
532 acceptable according to the current settings.
533
534 @param order The computed order of the conversion needed.
535 @param rli The relay log info data structure: for error reporting.
536 */
537bool is_conversion_ok(int order, Relay_log_info *rli)
538{
539 DBUG_ENTER("is_conversion_ok");
540 bool allow_non_lossy, allow_lossy;
541
542 allow_non_lossy = slave_type_conversions_options &
543 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
544 allow_lossy= slave_type_conversions_options &
545 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
546
547 DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
548 allow_non_lossy ? " ALL_NON_LOSSY" : "",
549 allow_lossy ? " ALL_LOSSY" : ""));
550 if (order < 0 && !allow_non_lossy)
551 {
552 /* !!! Add error message saying that non-lossy conversions need to be allowed. */
553 DBUG_RETURN(false);
554 }
555
556 if (order > 0 && !allow_lossy)
557 {
558 /* !!! Add error message saying that lossy conversions need to be allowed. */
559 DBUG_RETURN(false);
560 }
561
562 DBUG_RETURN(true);
563}
564
565
566/**
567 Can a type potentially be converted to another type?
568
569 This function check if the types are convertible and what
570 conversion is required.
571
572 If conversion is not possible, and error is printed.
573
574 If conversion is possible:
575
576 - *order will be set to -1 if source type is smaller than target
577 type and a non-lossy conversion can be required. This includes
578 the case where the field types are different but types could
579 actually be converted in either direction.
580
581 - *order will be set to 0 if no conversion is required.
582
583 - *order will be set to 1 if the source type is strictly larger
584 than the target type and that conversion is potentially lossy.
585
586 @param[in] field Target field
587 @param[in] type Source field type
588 @param[in] metadata Source field metadata
589 @param[in] rli Relay log info (for error reporting)
590 @param[in] mflags Flags from the table map event
591 @param[out] order Order between source field and target field
592
593 @return @c true if conversion is possible according to the current
594 settings, @c false if conversion is not possible according to the
595 current setting.
596 */
597static bool
598can_convert_field_to(Field *field,
599 enum_field_types source_type, uint16 metadata,
600 Relay_log_info *rli, uint16 mflags,
601 int *order_var)
602{
603 DBUG_ENTER("can_convert_field_to");
604 bool same_type;
605#ifndef DBUG_OFF
606 char field_type_buf[MAX_FIELD_WIDTH];
607 String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
608 field->sql_type(field_type);
609 DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x",
610 field_type.c_ptr_safe(), field->real_type(), source_type, metadata));
611#endif
612 /**
613 @todo
614 Implement Field_varstring_cmopressed::real_type() and
615 Field_blob_compressed::real_type() properly. All occurencies
616 of Field::real_type() have to be inspected and adjusted if needed.
617
618 Until it is not ready we have to compare source_type against
619 binlog_type() when replicating from or to compressed data types.
620
621 @sa Comment for Field::binlog_type()
622 */
623 if (source_type == MYSQL_TYPE_VARCHAR_COMPRESSED ||
624 source_type == MYSQL_TYPE_BLOB_COMPRESSED ||
625 field->binlog_type() == MYSQL_TYPE_VARCHAR_COMPRESSED ||
626 field->binlog_type() == MYSQL_TYPE_BLOB_COMPRESSED)
627 same_type= field->binlog_type() == source_type;
628 else
629 same_type= field->real_type() == source_type;
630
631 /*
632 If the real type is the same, we need to check the metadata to
633 decide if conversions are allowed.
634 */
635 if (same_type)
636 {
637 if (metadata == 0) // Metadata can only be zero if no metadata was provided
638 {
639 /*
640 If there is no metadata, we either have an old event where no
641 metadata were supplied, or a type that does not require any
642 metadata. In either case, conversion can be done but no
643 conversion table is necessary.
644 */
645 DBUG_PRINT("debug", ("Base types are identical, but there is no metadata"));
646 *order_var= 0;
647 DBUG_RETURN(true);
648 }
649
650 DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
651 if (field->compatible_field_size(metadata, rli, mflags, order_var))
652 DBUG_RETURN(is_conversion_ok(*order_var, rli));
653 else
654 DBUG_RETURN(false);
655 }
656 else if (
657 /*
658 Conversion from MariaDB TIMESTAMP(0), TIME(0), DATETIME(0)
659 to the corresponding MySQL56 types is non-lossy.
660 */
661 (metadata == 0 &&
662 ((field->real_type() == MYSQL_TYPE_TIMESTAMP2 &&
663 source_type == MYSQL_TYPE_TIMESTAMP) ||
664 (field->real_type() == MYSQL_TYPE_TIME2 &&
665 source_type == MYSQL_TYPE_TIME) ||
666 (field->real_type() == MYSQL_TYPE_DATETIME2 &&
667 source_type == MYSQL_TYPE_DATETIME))) ||
668 /*
669 Conversion from MySQL56 TIMESTAMP(N), TIME(N), DATETIME(N)
670 to the corresponding MariaDB or MySQL55 types is non-lossy.
671 */
672 (metadata == field->decimals() &&
673 ((field->real_type() == MYSQL_TYPE_TIMESTAMP &&
674 source_type == MYSQL_TYPE_TIMESTAMP2) ||
675 (field->real_type() == MYSQL_TYPE_TIME &&
676 source_type == MYSQL_TYPE_TIME2) ||
677 (field->real_type() == MYSQL_TYPE_DATETIME &&
678 source_type == MYSQL_TYPE_DATETIME2))))
679 {
680 /*
681 TS-TODO: conversion from FSP1>FSP2.
682 */
683 *order_var= -1;
684 DBUG_RETURN(true);
685 }
686 else if (!slave_type_conversions_options)
687 DBUG_RETURN(false);
688
689 /*
690 Here, from and to will always be different. Since the types are
691 different, we cannot use the compatible_field_size() function, but
692 have to rely on hard-coded max-sizes for fields.
693 */
694
695 DBUG_PRINT("debug", ("Base types are different, checking conversion"));
696 switch (source_type) // Source type (on master)
697 {
698 case MYSQL_TYPE_DECIMAL:
699 case MYSQL_TYPE_NEWDECIMAL:
700 case MYSQL_TYPE_FLOAT:
701 case MYSQL_TYPE_DOUBLE:
702 switch (field->real_type())
703 {
704 case MYSQL_TYPE_NEWDECIMAL:
705 /*
706 Then the other type is either FLOAT, DOUBLE, or old style
707 DECIMAL, so we require lossy conversion.
708 */
709 *order_var= 1;
710 DBUG_RETURN(is_conversion_ok(*order_var, rli));
711
712 case MYSQL_TYPE_DECIMAL:
713 case MYSQL_TYPE_FLOAT:
714 case MYSQL_TYPE_DOUBLE:
715 {
716 if (source_type == MYSQL_TYPE_NEWDECIMAL ||
717 source_type == MYSQL_TYPE_DECIMAL)
718 *order_var = 1; // Always require lossy conversions
719 else
720 *order_var= compare_lengths(field, source_type, metadata);
721 DBUG_ASSERT(*order_var != 0);
722 DBUG_RETURN(is_conversion_ok(*order_var, rli));
723 }
724
725 default:
726 DBUG_RETURN(false);
727 }
728 break;
729
730 /*
731 The length comparison check will do the correct job of comparing
732 the field lengths (in bytes) of two integer types.
733 */
734 case MYSQL_TYPE_TINY:
735 case MYSQL_TYPE_SHORT:
736 case MYSQL_TYPE_INT24:
737 case MYSQL_TYPE_LONG:
738 case MYSQL_TYPE_LONGLONG:
739 switch (field->real_type())
740 {
741 case MYSQL_TYPE_TINY:
742 case MYSQL_TYPE_SHORT:
743 case MYSQL_TYPE_INT24:
744 case MYSQL_TYPE_LONG:
745 case MYSQL_TYPE_LONGLONG:
746 /*
747 max_display_length_for_field() is not fully precise for the integer
748 data types. So its result cannot be compared to the result of
749 field->max_dispay_length() when the table field and the binlog field
750 are of the same type.
751 This code should eventually be rewritten not to use
752 compare_lengths(), to detect subtype/supetype relations
753 just using the type codes.
754 */
755 DBUG_ASSERT(source_type != field->real_type());
756 *order_var= compare_lengths(field, source_type, metadata);
757 DBUG_ASSERT(*order_var != 0);
758 DBUG_RETURN(is_conversion_ok(*order_var, rli));
759
760 default:
761 DBUG_RETURN(false);
762 }
763 break;
764
765 /*
766 Since source and target type is different, and it is not possible
767 to convert bit types to anything else, this will return false.
768 */
769 case MYSQL_TYPE_BIT:
770 DBUG_RETURN(false);
771
772 /*
773 If all conversions are disabled, it is not allowed to convert
774 between these types. Since the TEXT vs. BINARY is distinguished by
775 the charset, and the charset is not replicated, we cannot
776 currently distinguish between , e.g., TEXT and BLOB.
777 */
778 case MYSQL_TYPE_TINY_BLOB:
779 case MYSQL_TYPE_MEDIUM_BLOB:
780 case MYSQL_TYPE_LONG_BLOB:
781 case MYSQL_TYPE_BLOB:
782 case MYSQL_TYPE_BLOB_COMPRESSED:
783 case MYSQL_TYPE_STRING:
784 case MYSQL_TYPE_VAR_STRING:
785 case MYSQL_TYPE_VARCHAR:
786 case MYSQL_TYPE_VARCHAR_COMPRESSED:
787 switch (field->real_type())
788 {
789 case MYSQL_TYPE_TINY_BLOB:
790 case MYSQL_TYPE_MEDIUM_BLOB:
791 case MYSQL_TYPE_LONG_BLOB:
792 case MYSQL_TYPE_BLOB:
793 case MYSQL_TYPE_BLOB_COMPRESSED:
794 case MYSQL_TYPE_STRING:
795 case MYSQL_TYPE_VAR_STRING:
796 case MYSQL_TYPE_VARCHAR:
797 case MYSQL_TYPE_VARCHAR_COMPRESSED:
798 *order_var= compare_lengths(field, source_type, metadata);
799 /*
800 Here we know that the types are different, so if the order
801 gives that they do not require any conversion, we still need
802 to have non-lossy conversion enabled to allow conversion
803 between different (string) types of the same length.
804 */
805 if (*order_var == 0)
806 *order_var= -1;
807 DBUG_RETURN(is_conversion_ok(*order_var, rli));
808
809 default:
810 DBUG_RETURN(false);
811 }
812 break;
813
814 case MYSQL_TYPE_GEOMETRY:
815 case MYSQL_TYPE_TIMESTAMP:
816 case MYSQL_TYPE_DATE:
817 case MYSQL_TYPE_TIME:
818 case MYSQL_TYPE_DATETIME:
819 case MYSQL_TYPE_YEAR:
820 case MYSQL_TYPE_NEWDATE:
821 case MYSQL_TYPE_NULL:
822 case MYSQL_TYPE_ENUM:
823 case MYSQL_TYPE_SET:
824 case MYSQL_TYPE_TIMESTAMP2:
825 case MYSQL_TYPE_DATETIME2:
826 case MYSQL_TYPE_TIME2:
827 DBUG_RETURN(false);
828 }
829 DBUG_RETURN(false); // To keep GCC happy
830}
831
832
833/**
834 Is the definition compatible with a table?
835
836 This function will compare the master table with an existing table
837 on the slave and see if they are compatible with respect to the
838 current settings of @c SLAVE_TYPE_CONVERSIONS.
839
840 If the tables are compatible and conversions are required, @c
841 *tmp_table_var will be set to a virtual temporary table with field
842 pointers for the fields that require conversions. This allow simple
843 checking of whether a conversion are to be applied or not.
844
845 If tables are compatible, but no conversions are necessary, @c
846 *tmp_table_var will be set to NULL.
847
848 @param rli_arg[in]
849 Relay log info, for error reporting.
850
851 @param table[in]
852 Table to compare with
853
854 @param tmp_table_var[out]
855 Virtual temporary table for performing conversions, if necessary.
856
857 @retval true Master table is compatible with slave table.
858 @retval false Master table is not compatible with slave table.
859*/
860bool
861table_def::compatible_with(THD *thd, rpl_group_info *rgi,
862 TABLE *table, TABLE **conv_table_var)
863 const
864{
865 /*
866 We only check the initial columns for the tables.
867 */
868 uint const cols_to_check= MY_MIN(table->s->fields, size());
869 Relay_log_info *rli= rgi->rli;
870 TABLE *tmp_table= NULL;
871
872 for (uint col= 0 ; col < cols_to_check ; ++col)
873 {
874 Field *const field= table->field[col];
875 int order;
876 if (can_convert_field_to(field, type(col), field_metadata(col), rli, m_flags, &order))
877 {
878 DBUG_PRINT("debug", ("Checking column %d -"
879 " field '%s' can be converted - order: %d",
880 col, field->field_name.str, order));
881 DBUG_ASSERT(order >= -1 && order <= 1);
882
883 /*
884 If order is not 0, a conversion is required, so we need to set
885 up the conversion table.
886 */
887 if (order != 0 && tmp_table == NULL)
888 {
889 /*
890 This will create the full table with all fields. This is
891 necessary to ge the correct field lengths for the record.
892 */
893 tmp_table= create_conversion_table(thd, rgi, table);
894 if (tmp_table == NULL)
895 return false;
896 /*
897 Clear all fields up to, but not including, this column.
898 */
899 for (unsigned int i= 0; i < col; ++i)
900 tmp_table->field[i]= NULL;
901 }
902
903 if (order == 0 && tmp_table != NULL)
904 tmp_table->field[col]= NULL;
905 }
906 else
907 {
908 DBUG_PRINT("debug", ("Checking column %d -"
909 " field '%s' can not be converted",
910 col, field->field_name.str));
911 DBUG_ASSERT(col < size() && col < table->s->fields);
912 DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
913 DBUG_ASSERT(table->in_use);
914 const char *db_name= table->s->db.str;
915 const char *tbl_name= table->s->table_name.str;
916 char source_buf[MAX_FIELD_WIDTH];
917 char target_buf[MAX_FIELD_WIDTH];
918 String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
919 String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
920 THD *thd= table->in_use;
921
922 show_sql_type(type(col), field_metadata(col), &source_type, field->charset());
923 field->sql_type(target_type);
924 rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED, rgi->gtid_info(),
925 ER_THD(thd, ER_SLAVE_CONVERSION_FAILED),
926 col, db_name, tbl_name,
927 source_type.c_ptr_safe(), target_type.c_ptr_safe());
928 return false;
929 }
930 }
931
932#ifndef DBUG_OFF
933 if (tmp_table)
934 {
935 for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
936 if (tmp_table->field[col])
937 {
938 char source_buf[MAX_FIELD_WIDTH];
939 char target_buf[MAX_FIELD_WIDTH];
940 String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
941 String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
942 tmp_table->field[col]->sql_type(source_type);
943 table->field[col]->sql_type(target_type);
944 DBUG_PRINT("debug", ("Field %s - conversion required."
945 " Source type: '%s', Target type: '%s'",
946 tmp_table->field[col]->field_name.str,
947 source_type.c_ptr_safe(), target_type.c_ptr_safe()));
948 }
949 }
950#endif
951
952 *conv_table_var= tmp_table;
953 return true;
954}
955
956
957/**
958 A wrapper to Virtual_tmp_table, to get access to its constructor,
959 which is protected for safety purposes (against illegal use on stack).
960*/
961class Virtual_conversion_table: public Virtual_tmp_table
962{
963public:
964 Virtual_conversion_table(THD *thd) :Virtual_tmp_table(thd) { }
965 /**
966 Add a new field into the virtual table.
967 @param sql_type - The real_type of the field.
968 @param metadata - The RBR binary log metadata for this field.
969 @param target_field - The field from the target table, to get extra
970 attributes from (e.g. typelib in case of ENUM).
971 */
972 bool add(enum_field_types sql_type,
973 uint16 metadata, const Field *target_field)
974 {
975 const Type_handler *handler= Type_handler::get_handler_by_real_type(sql_type);
976 if (!handler)
977 {
978 sql_print_error("In RBR mode, Slave received unknown field type field %d "
979 " for column Name: %s.%s.%s.",
980 (int) sql_type,
981 target_field->table->s->db.str,
982 target_field->table->s->table_name.str,
983 target_field->field_name.str);
984 return true;
985 }
986 Field *tmp= handler->make_conversion_table_field(this, metadata,
987 target_field);
988 if (!tmp)
989 return true;
990 Virtual_tmp_table::add(tmp);
991 DBUG_PRINT("debug", ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d,"
992 " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
993 sql_type, target_field->field_name.str,
994 tmp->field_length, tmp->decimals(), TRUE,
995 tmp->flags, tmp->pack_length()));
996 return false;
997 }
998};
999
1000
1001/**
1002 Create a conversion table.
1003
1004 If the function is unable to create the conversion table, an error
1005 will be printed and NULL will be returned.
1006
1007 @return Pointer to conversion table, or NULL if unable to create
1008 conversion table.
1009 */
1010
1011TABLE *table_def::create_conversion_table(THD *thd, rpl_group_info *rgi,
1012 TABLE *target_table) const
1013{
1014 DBUG_ENTER("table_def::create_conversion_table");
1015
1016 Virtual_conversion_table *conv_table;
1017 Relay_log_info *rli= rgi->rli;
1018 /*
1019 At slave, columns may differ. So we should create
1020 MY_MIN(columns@master, columns@slave) columns in the
1021 conversion table.
1022 */
1023 uint const cols_to_create= MY_MIN(target_table->s->fields, size());
1024 if (!(conv_table= new(thd) Virtual_conversion_table(thd)) ||
1025 conv_table->init(cols_to_create))
1026 goto err;
1027 for (uint col= 0 ; col < cols_to_create; ++col)
1028 {
1029 if (conv_table->add(type(col), field_metadata(col),
1030 target_table->field[col]))
1031 {
1032 DBUG_PRINT("debug", ("binlog_type: %d, metadata: %04X, target_field: '%s'"
1033 " make_conversion_table_field() failed",
1034 binlog_type(col), field_metadata(col),
1035 target_table->field[col]->field_name.str));
1036 goto err;
1037 }
1038 }
1039
1040 if (conv_table->open())
1041 goto err; // Could not allocate record buffer?
1042
1043 DBUG_RETURN(conv_table);
1044
1045err:
1046 if (conv_table)
1047 delete conv_table;
1048 rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION, rgi->gtid_info(),
1049 ER_THD(thd, ER_SLAVE_CANT_CREATE_CONVERSION),
1050 target_table->s->db.str,
1051 target_table->s->table_name.str);
1052 DBUG_RETURN(NULL);
1053}
1054#endif /* MYSQL_CLIENT */
1055
1056table_def::table_def(unsigned char *types, ulong size,
1057 uchar *field_metadata, int metadata_size,
1058 uchar *null_bitmap, uint16 flags)
1059 : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
1060 m_field_metadata(0), m_null_bits(0), m_flags(flags),
1061 m_memory(NULL)
1062{
1063 m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
1064 &m_type, size,
1065 &m_field_metadata,
1066 size * sizeof(uint16),
1067 &m_null_bits, (size + 7) / 8,
1068 NULL);
1069
1070 bzero(m_field_metadata, size * sizeof(uint16));
1071
1072 if (m_type)
1073 memcpy(m_type, types, size);
1074 else
1075 m_size= 0;
1076 /*
1077 Extract the data from the table map into the field metadata array
1078 iff there is field metadata. The variable metadata_size will be
1079 0 if we are replicating from an older version server since no field
1080 metadata was written to the table map. This can also happen if
1081 there were no fields in the master that needed extra metadata.
1082 */
1083 if (m_size && metadata_size)
1084 {
1085 int index= 0;
1086 for (unsigned int i= 0; i < m_size; i++)
1087 {
1088 switch (binlog_type(i)) {
1089 case MYSQL_TYPE_TINY_BLOB:
1090 case MYSQL_TYPE_BLOB:
1091 case MYSQL_TYPE_BLOB_COMPRESSED:
1092 case MYSQL_TYPE_MEDIUM_BLOB:
1093 case MYSQL_TYPE_LONG_BLOB:
1094 case MYSQL_TYPE_DOUBLE:
1095 case MYSQL_TYPE_FLOAT:
1096 case MYSQL_TYPE_GEOMETRY:
1097 {
1098 /*
1099 These types store a single byte.
1100 */
1101 m_field_metadata[i]= field_metadata[index];
1102 index++;
1103 break;
1104 }
1105 case MYSQL_TYPE_SET:
1106 case MYSQL_TYPE_ENUM:
1107 case MYSQL_TYPE_STRING:
1108 {
1109 uint16 x= field_metadata[index++] << 8U; // real_type
1110 x+= field_metadata[index++]; // pack or field length
1111 m_field_metadata[i]= x;
1112 break;
1113 }
1114 case MYSQL_TYPE_BIT:
1115 {
1116 uint16 x= field_metadata[index++];
1117 x = x + (field_metadata[index++] << 8U);
1118 m_field_metadata[i]= x;
1119 break;
1120 }
1121 case MYSQL_TYPE_VARCHAR:
1122 case MYSQL_TYPE_VARCHAR_COMPRESSED:
1123 {
1124 /*
1125 These types store two bytes.
1126 */
1127 char *ptr= (char *)&field_metadata[index];
1128 m_field_metadata[i]= uint2korr(ptr);
1129 index= index + 2;
1130 break;
1131 }
1132 case MYSQL_TYPE_NEWDECIMAL:
1133 {
1134 uint16 x= field_metadata[index++] << 8U; // precision
1135 x+= field_metadata[index++]; // decimals
1136 m_field_metadata[i]= x;
1137 break;
1138 }
1139 case MYSQL_TYPE_TIME2:
1140 case MYSQL_TYPE_DATETIME2:
1141 case MYSQL_TYPE_TIMESTAMP2:
1142 m_field_metadata[i]= field_metadata[index++];
1143 break;
1144 default:
1145 m_field_metadata[i]= 0;
1146 break;
1147 }
1148 }
1149 }
1150 if (m_size && null_bitmap)
1151 memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
1152}
1153
1154
1155table_def::~table_def()
1156{
1157 my_free(m_memory);
1158#ifndef DBUG_OFF
1159 m_type= 0;
1160 m_size= 0;
1161#endif
1162}
1163
1164
1165/**
1166 @param even_buf point to the buffer containing serialized event
1167 @param event_len length of the event accounting possible checksum alg
1168
1169 @return TRUE if test fails
1170 FALSE as success
1171*/
1172bool event_checksum_test(uchar *event_buf, ulong event_len, enum enum_binlog_checksum_alg alg)
1173{
1174 bool res= FALSE;
1175 uint16 flags= 0; // to store in FD's buffer flags orig value
1176
1177 if (alg != BINLOG_CHECKSUM_ALG_OFF && alg != BINLOG_CHECKSUM_ALG_UNDEF)
1178 {
1179 ha_checksum incoming;
1180 ha_checksum computed;
1181
1182 if (event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT)
1183 {
1184#ifdef DBUG_ASSERT_EXISTS
1185 int8 fd_alg= event_buf[event_len - BINLOG_CHECKSUM_LEN -
1186 BINLOG_CHECKSUM_ALG_DESC_LEN];
1187#endif
1188 /*
1189 FD event is checksummed and therefore verified w/o the binlog-in-use flag
1190 */
1191 flags= uint2korr(event_buf + FLAGS_OFFSET);
1192 if (flags & LOG_EVENT_BINLOG_IN_USE_F)
1193 event_buf[FLAGS_OFFSET] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1194 /*
1195 The only algorithm currently is CRC32. Zero indicates
1196 the binlog file is checksum-free *except* the FD-event.
1197 */
1198 DBUG_ASSERT(fd_alg == BINLOG_CHECKSUM_ALG_CRC32 || fd_alg == 0);
1199 DBUG_ASSERT(alg == BINLOG_CHECKSUM_ALG_CRC32);
1200 /*
1201 Complile time guard to watch over the max number of alg
1202 */
1203 compile_time_assert(BINLOG_CHECKSUM_ALG_ENUM_END <= 0x80);
1204 }
1205 incoming= uint4korr(event_buf + event_len - BINLOG_CHECKSUM_LEN);
1206 /* checksum the event content without the checksum part itself */
1207 computed= my_checksum(0, event_buf, event_len - BINLOG_CHECKSUM_LEN);
1208 if (flags != 0)
1209 {
1210 /* restoring the orig value of flags of FD */
1211 DBUG_ASSERT(event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
1212 event_buf[FLAGS_OFFSET]= (uchar) flags;
1213 }
1214 res= DBUG_EVALUATE_IF("simulate_checksum_test_failure", TRUE, computed != incoming);
1215 }
1216 return res;
1217}
1218
1219#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
1220
1221Deferred_log_events::Deferred_log_events(Relay_log_info *rli) : last_added(NULL)
1222{
1223 my_init_dynamic_array(&array, sizeof(Log_event *), 32, 16, MYF(0));
1224}
1225
1226Deferred_log_events::~Deferred_log_events()
1227{
1228 delete_dynamic(&array);
1229}
1230
1231int Deferred_log_events::add(Log_event *ev)
1232{
1233 last_added= ev;
1234 insert_dynamic(&array, (uchar*) &ev);
1235 return 0;
1236}
1237
1238bool Deferred_log_events::is_empty()
1239{
1240 return array.elements == 0;
1241}
1242
1243bool Deferred_log_events::execute(rpl_group_info *rgi)
1244{
1245 bool res= false;
1246 DBUG_ENTER("Deferred_log_events::execute");
1247 DBUG_ASSERT(rgi->deferred_events_collecting);
1248
1249 rgi->deferred_events_collecting= false;
1250 for (uint i= 0; !res && i < array.elements; i++)
1251 {
1252 Log_event *ev= (* (Log_event **)
1253 dynamic_array_ptr(&array, i));
1254 res= ev->apply_event(rgi);
1255 }
1256 rgi->deferred_events_collecting= true;
1257 DBUG_RETURN(res);
1258}
1259
1260void Deferred_log_events::rewind()
1261{
1262 /*
1263 Reset preceding Query log event events which execution was
1264 deferred because of slave side filtering.
1265 */
1266 if (!is_empty())
1267 {
1268 for (uint i= 0; i < array.elements; i++)
1269 {
1270 Log_event *ev= *(Log_event **) dynamic_array_ptr(&array, i);
1271 delete ev;
1272 }
1273 last_added= NULL;
1274 if (array.elements > array.max_element)
1275 freeze_size(&array);
1276 reset_dynamic(&array);
1277 }
1278 last_added= NULL;
1279}
1280
1281#endif
1282