1/* Copyright (c) 2007, 2013, Oracle and/or its affiliates.
2 Copyright (c) 2008, 2014, SkySQL Ab.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17#include "mariadb.h"
18#include "sql_priv.h"
19#include "unireg.h"
20#include "rpl_rli.h"
21#include "rpl_record.h"
22#include "slave.h" // Need to pull in slave_print_msg
23#include "rpl_utility.h"
24#include "rpl_rli.h"
25
26/**
27 Pack a record of data for a table into a format suitable for
28 transfer via the binary log.
29
30 The format for a row in transfer with N fields is the following:
31
32 ceil(N/8) null bytes:
33 One null bit for every column *regardless of whether it can be
34 null or not*. This simplifies the decoding. Observe that the
35 number of null bits is equal to the number of set bits in the
36 @c cols bitmap. The number of null bytes is the smallest number
37 of bytes necessary to store the null bits.
38
39 Padding bits are 1.
40
41 N packets:
42 Each field is stored in packed format.
43
44
45 @param table Table describing the format of the record
46
47 @param cols Bitmap with a set bit for each column that should
48 be stored in the row
49
50 @param row_data Pointer to memory where row will be written
51
52 @param record Pointer to record that should be packed. It is
53 assumed that the pointer refers to either @c
54 record[0] or @c record[1], but no such check is
55 made since the code does not rely on that.
56
57 @return The number of bytes written at @c row_data.
58 */
59#if !defined(MYSQL_CLIENT)
60size_t
61pack_row(TABLE *table, MY_BITMAP const* cols,
62 uchar *row_data, const uchar *record)
63{
64 Field **p_field= table->field, *field;
65 int const null_byte_count= (bitmap_bits_set(cols) + 7) / 8;
66 uchar *pack_ptr = row_data + null_byte_count;
67 uchar *null_ptr = row_data;
68 my_ptrdiff_t const rec_offset= record - table->record[0];
69 my_ptrdiff_t const def_offset= table->s->default_values - table->record[0];
70
71 DBUG_ENTER("pack_row");
72
73 /*
74 We write the null bits and the packed records using one pass
75 through all the fields. The null bytes are written little-endian,
76 i.e., the first fields are in the first byte.
77 */
78 unsigned int null_bits= (1U << 8) - 1;
79 // Mask to mask out the correct but among the null bits
80 unsigned int null_mask= 1U;
81 for ( ; (field= *p_field) ; p_field++)
82 {
83 if (bitmap_is_set(cols, (uint)(p_field - table->field)))
84 {
85 my_ptrdiff_t offset;
86 if (field->is_null(rec_offset))
87 {
88 offset= def_offset;
89 null_bits |= null_mask;
90 }
91 else
92 {
93 offset= rec_offset;
94 null_bits &= ~null_mask;
95
96 /*
97 We only store the data of the field if it is non-null
98
99 For big-endian machines, we have to make sure that the
100 length is stored in little-endian format, since this is the
101 format used for the binlog.
102 */
103#ifndef DBUG_OFF
104 const uchar *old_pack_ptr= pack_ptr;
105#endif
106 pack_ptr= field->pack(pack_ptr, field->ptr + offset,
107 field->max_data_length());
108 DBUG_PRINT("debug", ("field: %s; real_type: %d, pack_ptr: %p;"
109 " pack_ptr':%p; bytes: %d",
110 field->field_name.str, field->real_type(),
111 old_pack_ptr,pack_ptr,
112 (int) (pack_ptr - old_pack_ptr)));
113 DBUG_DUMP("packed_data", old_pack_ptr, pack_ptr - old_pack_ptr);
114 }
115
116 null_mask <<= 1;
117 if ((null_mask & 0xFF) == 0)
118 {
119 DBUG_ASSERT(null_ptr < row_data + null_byte_count);
120 null_mask = 1U;
121 *null_ptr++ = null_bits;
122 null_bits= (1U << 8) - 1;
123 }
124 }
125 }
126
127 /*
128 Write the last (partial) byte, if there is one
129 */
130 if ((null_mask & 0xFF) > 1)
131 {
132 DBUG_ASSERT(null_ptr < row_data + null_byte_count);
133 *null_ptr++ = null_bits;
134 }
135
136 /*
137 The null pointer should now point to the first byte of the
138 packed data. If it doesn't, something is very wrong.
139 */
140 DBUG_ASSERT(null_ptr == row_data + null_byte_count);
141 DBUG_DUMP("row_data", row_data, pack_ptr - row_data);
142 DBUG_RETURN(static_cast<size_t>(pack_ptr - row_data));
143}
144#endif
145
146
147/**
148 Unpack a row into @c table->record[0].
149
150 The function will always unpack into the @c table->record[0]
151 record. This is because there are too many dependencies on where
152 the various member functions of Field and subclasses expect to
153 write.
154
155 The row is assumed to only consist of the fields for which the
156 corresponding bit in bitset @c cols is set; the other parts of the
157 record are left alone.
158
159 At most @c colcnt columns are read: if the table is larger than
160 that, the remaining fields are not filled in.
161
162 @note The relay log information can be NULL, which means that no
163 checking or comparison with the source table is done, simply
164 because it is not used. This feature is used by MySQL Backup to
  unpack a row from the backup image, but can be used for other
166 purposes as well.
167
  @param rgi    Replication group info (gives access to the relay log
                info); see the note above regarding NULL
169 @param table Table to unpack into
170 @param colcnt Number of columns to read from record
171 @param row_data
172 Packed row data
173 @param cols Pointer to bitset describing columns to fill in
  @param current_row_end
175 Pointer to variable that will hold the value of the
176 one-after-end position for the current row
177 @param master_reclength
178 Pointer to variable that will be set to the length of the
179 record on the master side
180 @param row_end
181 Pointer to variable that will hold the value of the
182 end position for the data in the row event
183
184 @retval 0 No error
185
186 @retval HA_ERR_GENERIC
187 A generic, internal, error caused the unpacking to fail.
188 @retval HA_ERR_CORRUPT_EVENT
189 Found error when trying to unpack fields.
190 */
191#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
192int
193unpack_row(rpl_group_info *rgi,
194 TABLE *table, uint const colcnt,
195 uchar const *const row_data, MY_BITMAP const *cols,
196 uchar const **const current_row_end, ulong *const master_reclength,
197 uchar const *const row_end)
198{
199 int error;
200 DBUG_ENTER("unpack_row");
201 DBUG_ASSERT(row_data);
202 DBUG_ASSERT(table);
203 size_t const master_null_byte_count= (bitmap_bits_set(cols) + 7) / 8;
204
205 uchar const *null_ptr= row_data;
206 uchar const *pack_ptr= row_data + master_null_byte_count;
207
208 Field **const begin_ptr = table->field;
209 Field **field_ptr;
210 Field **const end_ptr= begin_ptr + colcnt;
211
212 if (bitmap_is_clear_all(cols))
213 {
214 /**
215 There was no data sent from the master, so there is
216 nothing to unpack.
217 */
218 *current_row_end= pack_ptr;
219 *master_reclength= 0;
220 DBUG_RETURN(0);
221 }
222 DBUG_ASSERT(null_ptr < row_data + master_null_byte_count);
223
224 // Mask to mask out the correct bit among the null bits
225 unsigned int null_mask= 1U;
226 // The "current" null bits
227 unsigned int null_bits= *null_ptr++;
228 uint i= 0;
229 table_def *tabledef= NULL;
230 TABLE *conv_table= NULL;
231 bool table_found= rgi && rgi->get_table_data(table, &tabledef, &conv_table);
232 DBUG_PRINT("debug", ("Table data: table_found: %d, tabldef: %p, conv_table: %p",
233 table_found, tabledef, conv_table));
234 DBUG_ASSERT(table_found);
235
236 /*
237 If rgi is NULL it means that there is no source table and that the
238 row shall just be unpacked without doing any checks. This feature
239 is used by MySQL Backup, but can be used for other purposes as
240 well.
241 */
242 if (rgi && !table_found)
243 DBUG_RETURN(HA_ERR_GENERIC);
244
245 for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr)
246 {
247 /*
248 If there is a conversion table, we pick up the field pointer to
249 the conversion table. If the conversion table or the field
250 pointer is NULL, no conversions are necessary.
251 */
252 Field *conv_field=
253 conv_table ? conv_table->field[field_ptr - begin_ptr] : NULL;
254 Field *const f=
255 conv_field ? conv_field : *field_ptr;
256 DBUG_PRINT("debug", ("Conversion %srequired for field '%s' (#%ld)",
257 conv_field ? "" : "not ",
258 (*field_ptr)->field_name.str,
259 (long) (field_ptr - begin_ptr)));
260 DBUG_ASSERT(f != NULL);
261
262 /*
263 No need to bother about columns that does not exist: they have
264 gotten default values when being emptied above.
265 */
266 if (bitmap_is_set(cols, (uint)(field_ptr - begin_ptr)))
267 {
268 if ((null_mask & 0xFF) == 0)
269 {
270 DBUG_ASSERT(null_ptr < row_data + master_null_byte_count);
271 null_mask= 1U;
272 null_bits= *null_ptr++;
273 }
274
275 DBUG_ASSERT(null_mask & 0xFF); // One of the 8 LSB should be set
276
277 if (null_bits & null_mask)
278 {
279 if (f->maybe_null())
280 {
281 DBUG_PRINT("debug", ("Was NULL; null mask: 0x%x; null bits: 0x%x",
282 null_mask, null_bits));
283 /**
284 Calling reset just in case one is unpacking on top a
285 record with data.
286
287 This could probably go into set_null() but doing so,
288 (i) triggers assertion in other parts of the code at
289 the moment; (ii) it would make us reset the field,
290 always when setting null, which right now doesn't seem
291 needed anywhere else except here.
292
293 TODO: maybe in the future we should consider moving
294 the reset to make it part of set_null. But then
295 the assertions triggered need to be
296 addressed/revisited.
297 */
298 f->reset();
299 f->set_null();
300 }
301 else
302 {
303 THD *thd= f->table->in_use;
304
305 f->set_default();
306 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
307 ER_BAD_NULL_ERROR,
308 ER_THD(thd, ER_BAD_NULL_ERROR),
309 f->field_name.str);
310 }
311 }
312 else
313 {
314 f->set_notnull();
315
316 /*
317 We only unpack the field if it was non-null.
318 Use the master's size information if available else call
319 normal unpack operation.
320 */
321 uint16 const metadata= tabledef->field_metadata(i);
322 uchar const *const old_pack_ptr= pack_ptr;
323
324 pack_ptr= f->unpack(f->ptr, pack_ptr, row_end, metadata);
325 DBUG_PRINT("debug", ("field: %s; metadata: 0x%x;"
326 " pack_ptr: %p; pack_ptr': %p; bytes: %d",
327 f->field_name.str, metadata,
328 old_pack_ptr, pack_ptr,
329 (int) (pack_ptr - old_pack_ptr)));
330 if (!pack_ptr)
331 {
332 if (WSREP_ON)
333 {
334 /*
335 Debug message to troubleshoot bug:
336 https://mariadb.atlassian.net/browse/MDEV-4404
337 Galera Node throws "Could not read field" error and drops out of cluster
338 */
339 WSREP_WARN("ROW event unpack field: %s metadata: 0x%x;"
340 " pack_ptr: %p; conv_table %p conv_field %p table %s"
341 " row_end: %p",
342 f->field_name.str, metadata,
343 old_pack_ptr, conv_table, conv_field,
344 (table_found) ? "found" : "not found", row_end
345 );
346 }
347
348 rgi->rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
349 rgi->gtid_info(),
350 "Could not read field '%s' of table '%s.%s'",
351 f->field_name.str, table->s->db.str,
352 table->s->table_name.str);
353 DBUG_RETURN(HA_ERR_CORRUPT_EVENT);
354 }
355 }
356
357 /*
358 If conv_field is set, then we are doing a conversion. In this
359 case, we have unpacked the master data to the conversion
360 table, so we need to copy the value stored in the conversion
361 table into the final table and do the conversion at the same time.
362 */
363 if (conv_field)
364 {
365 Copy_field copy;
366#ifndef DBUG_OFF
367 char source_buf[MAX_FIELD_WIDTH];
368 char value_buf[MAX_FIELD_WIDTH];
369 String source_type(source_buf, sizeof(source_buf), system_charset_info);
370 String value_string(value_buf, sizeof(value_buf), system_charset_info);
371 conv_field->sql_type(source_type);
372 conv_field->val_str(&value_string);
373 DBUG_PRINT("debug", ("Copying field '%s' of type '%s' with value '%s'",
374 (*field_ptr)->field_name.str,
375 source_type.c_ptr_safe(), value_string.c_ptr_safe()));
376#endif
377 copy.set(*field_ptr, f, TRUE);
378 (*copy.do_copy)(&copy);
379#ifndef DBUG_OFF
380 char target_buf[MAX_FIELD_WIDTH];
381 String target_type(target_buf, sizeof(target_buf), system_charset_info);
382 (*field_ptr)->sql_type(target_type);
383 (*field_ptr)->val_str(&value_string);
384 DBUG_PRINT("debug", ("Value of field '%s' of type '%s' is now '%s'",
385 (*field_ptr)->field_name.str,
386 target_type.c_ptr_safe(), value_string.c_ptr_safe()));
387#endif
388 }
389
390 null_mask <<= 1;
391 }
392 i++;
393 }
394
395 /*
396 throw away master's extra fields
397 */
398 uint max_cols= MY_MIN(tabledef->size(), cols->n_bits);
399 for (; i < max_cols; i++)
400 {
401 if (bitmap_is_set(cols, i))
402 {
403 if ((null_mask & 0xFF) == 0)
404 {
405 DBUG_ASSERT(null_ptr < row_data + master_null_byte_count);
406 null_mask= 1U;
407 null_bits= *null_ptr++;
408 }
409 DBUG_ASSERT(null_mask & 0xFF); // One of the 8 LSB should be set
410
411 if (!((null_bits & null_mask) && tabledef->maybe_null(i))) {
412 uint32 len= tabledef->calc_field_size(i, (uchar *) pack_ptr);
413 DBUG_DUMP("field_data", pack_ptr, len);
414 pack_ptr+= len;
415 }
416 null_mask <<= 1;
417 }
418 }
419
420 /*
421 Add Extra slave persistent columns
422 */
423 if (unlikely(error= fill_extra_persistent_columns(table, cols->n_bits)))
424 DBUG_RETURN(error);
425
426 /*
427 We should now have read all the null bytes, otherwise something is
428 really wrong.
429 */
430 DBUG_ASSERT(null_ptr == row_data + master_null_byte_count);
431
432 DBUG_DUMP("row_data", row_data, pack_ptr - row_data);
433
434 *current_row_end = pack_ptr;
435 if (master_reclength)
436 {
437 if (*field_ptr)
438 *master_reclength = (ulong)((*field_ptr)->ptr - table->record[0]);
439 else
440 *master_reclength = table->s->reclength;
441 }
442
443 DBUG_RETURN(0);
444}
445
446/**
447 Fills @c table->record[0] with default values.
448
449 First @c restore_record() is called to restore the default values for
450 record concerning the given table. Then, if @c check is true,
451 a check is performed to see if fields are have default value or can
452 be NULL. Otherwise error is reported.
453
454 @param table Table whose record[0] buffer is prepared.
455 @param skip Number of columns for which default/nullable check
456 should be skipped.
457 @param check Specifies if lack of default error needs checking.
458
459 @returns 0 on success or a handler level error code
460 */
461int prepare_record(TABLE *const table, const uint skip, const bool check)
462{
463 DBUG_ENTER("prepare_record");
464
465 restore_record(table, s->default_values);
466
467 /*
468 This skip should be revisited in 6.0, because in 6.0 RBR one
469 can have holes in the row (as the grain of the writeset is
470 the column and not the entire row).
471 */
472 if (skip >= table->s->fields || !check)
473 DBUG_RETURN(0);
474
475 /*
476 For fields the extra fields on the slave, we check if they have a default.
477 The check follows the same rules as the INSERT query without specifying an
478 explicit value for a field not having the explicit default
479 (@c check_that_all_fields_are_given_values()).
480 */
481 for (Field **field_ptr= table->field+skip; *field_ptr; ++field_ptr)
482 {
483 Field *const f= *field_ptr;
484 if ((f->flags & NO_DEFAULT_VALUE_FLAG) &&
485 (f->real_type() != MYSQL_TYPE_ENUM))
486 {
487 THD *thd= f->table->in_use;
488 f->set_default();
489 push_warning_printf(thd,
490 Sql_condition::WARN_LEVEL_WARN,
491 ER_NO_DEFAULT_FOR_FIELD,
492 ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
493 f->field_name.str);
494 }
495 }
496
497 DBUG_RETURN(0);
498}
499/**
500 Fills @c table->record[0] with computed values of extra persistent column which are present on slave but not on master.
501 @param table Table whose record[0] buffer is prepared.
502 @param master_cols No of columns on master
503 @returns 0 on success
504 */
505int fill_extra_persistent_columns(TABLE *table, int master_cols)
506{
507 int error= 0;
508 Field **vfield_ptr, *vfield;
509
510 if (!table->vfield)
511 return 0;
512 for (vfield_ptr= table->vfield; *vfield_ptr; ++vfield_ptr)
513 {
514 vfield= *vfield_ptr;
515 if (vfield->field_index >= master_cols && vfield->stored_in_db())
516 {
517 /*Set bitmap for writing*/
518 bitmap_set_bit(table->vcol_set, vfield->field_index);
519 error= vfield->vcol_info->expr->save_in_field(vfield,0);
520 bitmap_clear_bit(table->vcol_set, vfield->field_index);
521 }
522 }
523 return error;
524}
525#endif // HAVE_REPLICATION
526