1/* Copyright (c) 2007, 2018, Oracle and/or its affiliates.
2 Copyright (c) 2009, 2018, MariaDB
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17#include "mariadb.h"
18#include "sql_priv.h"
19#ifndef MYSQL_CLIENT
20#include "unireg.h"
21#endif
22#include "log_event.h"
23#ifndef MYSQL_CLIENT
24#include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
25#include "sql_base.h" // close_tables_for_reopen
26#include "key.h" // key_copy
27#include "lock.h" // mysql_unlock_tables
28#include "rpl_rli.h"
29#include "rpl_utility.h"
30#endif
31#include "log_event_old.h"
32#include "rpl_record_old.h"
33#include "transaction.h"
34
35#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
36
37// Old implementation of do_apply_event()
38int
39Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
40{
41 DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
42 int error= 0;
43 THD *ev_thd= ev->thd;
44 uchar const *row_start= ev->m_rows_buf;
45 const Relay_log_info *rli= rgi->rli;
46
47 /*
48 If m_table_id == ~0UL, then we have a dummy event that does not
49 contain any data. In that case, we just remove all tables in the
50 tables_to_lock list, close the thread tables, and return with
51 success.
52 */
53 if (ev->m_table_id == ~0UL)
54 {
55 /*
56 This one is supposed to be set: just an extra check so that
57 nothing strange has happened.
58 */
59 DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
60
61 rgi->slave_close_thread_tables(ev_thd);
62 ev_thd->clear_error();
63 DBUG_RETURN(0);
64 }
65
66 /*
67 'ev_thd' has been set by exec_relay_log_event(), just before calling
68 do_apply_event(). We still check here to prevent future coding
69 errors.
70 */
71 DBUG_ASSERT(rgi->thd == ev_thd);
72
73 /*
74 If there is no locks taken, this is the first binrow event seen
75 after the table map events. We should then lock all the tables
76 used in the transaction and proceed with execution of the actual
77 event.
78 */
79 if (!ev_thd->lock)
80 {
81 /*
82 Lock_tables() reads the contents of ev_thd->lex, so they must be
83 initialized.
84
85 We also call the THD::reset_for_next_command(), since this
86 is the logical start of the next "statement". Note that this
87 call might reset the value of current_stmt_binlog_format, so
88 we need to do any changes to that value after this function.
89 */
90 delete_explain_query(thd->lex);
91 lex_start(ev_thd);
92 ev_thd->reset_for_next_command();
93
94 /*
95 This is a row injection, so we flag the "statement" as
96 such. Note that this code is called both when the slave does row
97 injections and when the BINLOG statement is used to do row
98 injections.
99 */
100 ev_thd->lex->set_stmt_row_injection();
101
102 if (unlikely(open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0)))
103 {
104 uint actual_error= ev_thd->get_stmt_da()->sql_errno();
105 if (ev_thd->is_slave_error || ev_thd->is_fatal_error)
106 {
107 /*
108 Error reporting borrowed from Query_log_event with many excessive
109 simplifications (we don't honour --slave-skip-errors)
110 */
111 rli->report(ERROR_LEVEL, actual_error, NULL,
112 "Error '%s' on opening tables",
113 (actual_error ? ev_thd->get_stmt_da()->message() :
114 "unexpected success or fatal error"));
115 ev_thd->is_slave_error= 1;
116 }
117 rgi->slave_close_thread_tables(thd);
118 DBUG_RETURN(actual_error);
119 }
120
121 /*
122 When the open and locking succeeded, we check all tables to
123 ensure that they still have the correct type.
124 */
125
126 {
127 TABLE_LIST *table_list_ptr= rgi->tables_to_lock;
128 for (uint i=0 ; table_list_ptr&& (i< rgi->tables_to_lock_count);
129 table_list_ptr= table_list_ptr->next_global, i++)
130 {
131 /*
132 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
133 function for the explanation of the below if condition
134 */
135 if (table_list_ptr->parent_l)
136 continue;
137 /*
138 We can use a down cast here since we know that every table added
139 to the tables_to_lock is a RPL_TABLE_LIST(or child table which is
140 skipped above).
141 */
142 RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
143 DBUG_ASSERT(ptr->m_tabledef_valid);
144 TABLE *conv_table;
145 if (!ptr->m_tabledef.compatible_with(thd, rgi, ptr->table, &conv_table))
146 {
147 ev_thd->is_slave_error= 1;
148 rgi->slave_close_thread_tables(ev_thd);
149 DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
150 }
151 DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
152 " - conv_table: %p",
153 ptr->table->s->db.str,
154 ptr->table->s->table_name.str, conv_table));
155 ptr->m_conv_table= conv_table;
156 }
157 }
158
159 /*
160 ... and then we add all the tables to the table map and remove
161 them from tables to lock.
162
163 We also invalidate the query cache for all the tables, since
164 they will now be changed.
165
166 TODO [/Matz]: Maybe the query cache should not be invalidated
167 here? It might be that a table is not changed, even though it
168 was locked for the statement. We do know that each
169 Old_rows_log_event contain at least one row, so after processing one
170 Old_rows_log_event, we can invalidate the query cache for the
171 associated table.
172 */
173 TABLE_LIST *ptr= rgi->tables_to_lock;
174 for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++)
175 {
176 /*
177 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
178 function for the explanation of the below if condition
179 */
180 if (ptr->parent_l)
181 continue;
182 rgi->m_table_map.set_table(ptr->table_id, ptr->table);
183 }
184#ifdef HAVE_QUERY_CACHE
185 query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
186#endif
187 }
188
189 TABLE* table= rgi->m_table_map.get_table(ev->m_table_id);
190
191 if (table)
192 {
193 /*
194 table == NULL means that this table should not be replicated
195 (this was set up by Table_map_log_event::do_apply_event()
196 which tested replicate-* rules).
197 */
198
199 /*
200 It's not needed to set_time() but
201 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
202 much slave is behind
203 2) it will be needed when we allow replication from a table with no
204 TIMESTAMP column to a table with one.
205 So we call set_time(), like in SBR. Presently it changes nothing.
206 */
207 ev_thd->set_time(ev->when, ev->when_sec_part);
208 /*
209 There are a few flags that are replicated with each row event.
210 Make sure to set/clear them before executing the main body of
211 the event.
212 */
213 if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
214 ev_thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
215 else
216 ev_thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
217
218 if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
219 ev_thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
220 else
221 ev_thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
222 /* A small test to verify that objects have consistent types */
223 DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
224
225 table->rpl_write_set= table->write_set;
226
227 error= do_before_row_operations(table);
228 while (error == 0 && row_start < ev->m_rows_end)
229 {
230 uchar const *row_end= NULL;
231 if (unlikely((error= do_prepare_row(ev_thd, rgi, table, row_start,
232 &row_end))))
233 break; // We should perform the after-row operation even in
234 // the case of error
235
236 DBUG_ASSERT(row_end != NULL); // cannot happen
237 DBUG_ASSERT(row_end <= ev->m_rows_end);
238
239 /* in_use can have been set to NULL in close_tables_for_reopen */
240 THD* old_thd= table->in_use;
241 if (!table->in_use)
242 table->in_use= ev_thd;
243 error= do_exec_row(table);
244 table->in_use = old_thd;
245 switch (error)
246 {
247 /* Some recoverable errors */
248 case HA_ERR_RECORD_CHANGED:
249 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
250 tuple does not exist */
251 error= 0;
252 case 0:
253 break;
254
255 default:
256 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(), NULL,
257 "Error in %s event: row application failed. %s",
258 ev->get_type_str(),
259 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
260 thd->is_slave_error= 1;
261 break;
262 }
263
264 row_start= row_end;
265 }
266 DBUG_EXECUTE_IF("stop_slave_middle_group",
267 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
268 error= do_after_row_operations(table, error);
269 }
270
271 if (unlikely(error))
272 { /* error has occurred during the transaction */
273 rli->report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(), NULL,
274 "Error in %s event: error during transaction execution "
275 "on table %s.%s. %s",
276 ev->get_type_str(), table->s->db.str,
277 table->s->table_name.str,
278 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() : "");
279
280 /*
281 If one day we honour --skip-slave-errors in row-based replication, and
282 the error should be skipped, then we would clear mappings, rollback,
283 close tables, but the slave SQL thread would not stop and then may
284 assume the mapping is still available, the tables are still open...
285 So then we should clear mappings/rollback/close here only if this is a
286 STMT_END_F.
287 For now we code, knowing that error is not skippable and so slave SQL
288 thread is certainly going to stop.
289 rollback at the caller along with sbr.
290 */
291 ev_thd->reset_current_stmt_binlog_format_row();
292 rgi->cleanup_context(ev_thd, error);
293 ev_thd->is_slave_error= 1;
294 DBUG_RETURN(error);
295 }
296
297 DBUG_RETURN(0);
298}
299#endif
300
301
302#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
303
304/*
305 Check if there are more UNIQUE keys after the given key.
306*/
307static int
308last_uniq_key(TABLE *table, uint keyno)
309{
310 while (++keyno < table->s->keys)
311 if (table->key_info[keyno].flags & HA_NOSAME)
312 return 0;
313 return 1;
314}
315
316
317/*
318 Compares table->record[0] and table->record[1]
319
320 Returns TRUE if different.
321*/
322static bool record_compare(TABLE *table)
323{
324 bool result= FALSE;
325 if (table->s->blob_fields + table->s->varchar_fields == 0)
326 {
327 result= cmp_record(table,record[1]);
328 goto record_compare_exit;
329 }
330
331 /* Compare null bits */
332 if (memcmp(table->null_flags,
333 table->null_flags+table->s->rec_buff_length,
334 table->s->null_bytes))
335 {
336 result= TRUE; // Diff in NULL value
337 goto record_compare_exit;
338 }
339
340 /* Compare updated fields */
341 for (Field **ptr=table->field ; *ptr ; ptr++)
342 {
343 if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
344 {
345 result= TRUE;
346 goto record_compare_exit;
347 }
348 }
349
350record_compare_exit:
351 return result;
352}
353
354
355/*
356 Copy "extra" columns from record[1] to record[0].
357
358 Copy the extra fields that are not present on the master but are
359 present on the slave from record[1] to record[0]. This is used
360 after fetching a record that are to be updated, either inside
361 replace_record() or as part of executing an update_row().
362 */
363static int
364copy_extra_record_fields(TABLE *table,
365 size_t master_reclength,
366 my_ptrdiff_t master_fields)
367{
368 DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
369 DBUG_PRINT("info", ("Copying to %p "
370 "from field %lu at offset %lu "
371 "to field %d at offset %lu",
372 table->record[0],
373 (ulong) master_fields, (ulong) master_reclength,
374 table->s->fields, table->s->reclength));
375 /*
376 Copying the extra fields of the slave that does not exist on
377 master into record[0] (which are basically the default values).
378 */
379
380 if (table->s->fields < (uint) master_fields)
381 DBUG_RETURN(0);
382
383 DBUG_ASSERT(master_reclength <= table->s->reclength);
384 if (master_reclength < table->s->reclength)
385 memcpy(table->record[0] + master_reclength,
386 table->record[1] + master_reclength,
387 table->s->reclength - master_reclength);
388
389 /*
390 Bit columns are special. We iterate over all the remaining
391 columns and copy the "extra" bits to the new record. This is
392 not a very good solution: it should be refactored on
393 opportunity.
394
395 REFACTORING SUGGESTION (Matz). Introduce a member function
396 similar to move_field_offset() called copy_field_offset() to
397 copy field values and implement it for all Field subclasses. Use
398 this function to copy data from the found record to the record
399 that are going to be inserted.
400
401 The copy_field_offset() function need to be a virtual function,
402 which in this case will prevent copying an entire range of
403 fields efficiently.
404 */
405 {
406 Field **field_ptr= table->field + master_fields;
407 for ( ; *field_ptr ; ++field_ptr)
408 {
409 /*
410 Set the null bit according to the values in record[1]
411 */
412 if ((*field_ptr)->maybe_null() &&
413 (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
414 (*field_ptr)->set_null();
415 else
416 (*field_ptr)->set_notnull();
417
418 /*
419 Do the extra work for special columns.
420 */
421 switch ((*field_ptr)->real_type())
422 {
423 default:
424 /* Nothing to do */
425 break;
426
427 case MYSQL_TYPE_BIT:
428 Field_bit *f= static_cast<Field_bit*>(*field_ptr);
429 if (f->bit_len > 0)
430 {
431 my_ptrdiff_t const offset= table->record[1] - table->record[0];
432 uchar const bits=
433 get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
434 set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
435 }
436 break;
437 }
438 }
439 }
440 DBUG_RETURN(0); // All OK
441}
442
443
444/*
445 Replace the provided record in the database.
446
447 SYNOPSIS
448 replace_record()
449 thd Thread context for writing the record.
450 table Table to which record should be written.
451 master_reclength
452 Offset to first column that is not present on the master,
453 alternatively the length of the record on the master
454 side.
455
456 RETURN VALUE
457 Error code on failure, 0 on success.
458
459 DESCRIPTION
460 Similar to how it is done in mysql_insert(), we first try to do
461 a ha_write_row() and of that fails due to duplicated keys (or
462 indices), we do an ha_update_row() or a ha_delete_row() instead.
463 */
464static int
465replace_record(THD *thd, TABLE *table,
466 ulong const master_reclength,
467 uint const master_fields)
468{
469 DBUG_ENTER("replace_record");
470 DBUG_ASSERT(table != NULL && thd != NULL);
471
472 int error;
473 int keynum;
474 auto_afree_ptr<char> key(NULL);
475
476#ifndef DBUG_OFF
477 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
478 DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
479 DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
480#endif
481
482 while (unlikely(error= table->file->ha_write_row(table->record[0])))
483 {
484 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
485 {
486 table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
487 DBUG_RETURN(error);
488 }
489 if (unlikely((keynum= table->file->get_dup_key(error)) < 0))
490 {
491 table->file->print_error(error, MYF(0));
492 /*
493 We failed to retrieve the duplicate key
494 - either because the error was not "duplicate key" error
495 - or because the information which key is not available
496 */
497 DBUG_RETURN(error);
498 }
499
500 /*
501 We need to retrieve the old row into record[1] to be able to
502 either update or delete the offending record. We either:
503
504 - use rnd_pos() with a row-id (available as dupp_row) to the
505 offending row, if that is possible (MyISAM and Blackhole), or else
506
507 - use index_read_idx() with the key that is duplicated, to
508 retrieve the offending row.
509 */
510 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
511 {
512 error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
513 if (unlikely(error))
514 {
515 DBUG_PRINT("info",("rnd_pos() returns error %d",error));
516 table->file->print_error(error, MYF(0));
517 DBUG_RETURN(error);
518 }
519 }
520 else
521 {
522 if (unlikely(table->file->extra(HA_EXTRA_FLUSH_CACHE)))
523 {
524 DBUG_RETURN(my_errno);
525 }
526
527 if (key.get() == NULL)
528 {
529 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
530 if (unlikely(key.get() == NULL))
531 DBUG_RETURN(ENOMEM);
532 }
533
534 key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
535 0);
536 error= table->file->ha_index_read_idx_map(table->record[1], keynum,
537 (const uchar*)key.get(),
538 HA_WHOLE_KEY,
539 HA_READ_KEY_EXACT);
540 if (unlikely(error))
541 {
542 DBUG_PRINT("info", ("index_read_idx() returns error %d", error));
543 table->file->print_error(error, MYF(0));
544 DBUG_RETURN(error);
545 }
546 }
547
548 /*
549 Now, table->record[1] should contain the offending row. That
550 will enable us to update it or, alternatively, delete it (so
551 that we can insert the new row afterwards).
552
553 First we copy the columns into table->record[0] that are not
554 present on the master from table->record[1], if there are any.
555 */
556 copy_extra_record_fields(table, master_reclength, master_fields);
557
558 /*
559 REPLACE is defined as either INSERT or DELETE + INSERT. If
560 possible, we can replace it with an UPDATE, but that will not
561 work on InnoDB if FOREIGN KEY checks are necessary.
562
563 I (Matz) am not sure of the reason for the last_uniq_key()
564 check as, but I'm guessing that it's something along the
565 following lines.
566
567 Suppose that we got the duplicate key to be a key that is not
568 the last unique key for the table and we perform an update:
569 then there might be another key for which the unique check will
570 fail, so we're better off just deleting the row and inserting
571 the correct row.
572 */
573 if (last_uniq_key(table, keynum) &&
574 !table->file->referenced_by_foreign_key())
575 {
576 error=table->file->ha_update_row(table->record[1],
577 table->record[0]);
578 if (unlikely(error) && error != HA_ERR_RECORD_IS_THE_SAME)
579 table->file->print_error(error, MYF(0));
580 else
581 error= 0;
582 DBUG_RETURN(error);
583 }
584 else
585 {
586 if (unlikely((error= table->file->ha_delete_row(table->record[1]))))
587 {
588 table->file->print_error(error, MYF(0));
589 DBUG_RETURN(error);
590 }
591 /* Will retry ha_write_row() with the offending row removed. */
592 }
593 }
594
595 DBUG_RETURN(error);
596}
597
598
599/**
600 Find the row given by 'key', if the table has keys, or else use a table scan
601 to find (and fetch) the row.
602
603 If the engine allows random access of the records, a combination of
604 position() and rnd_pos() will be used.
605
606 @param table Pointer to table to search
607 @param key Pointer to key to use for search, if table has key
608
609 @pre <code>table->record[0]</code> shall contain the row to locate
610 and <code>key</code> shall contain a key to use for searching, if
611 the engine has a key.
612
613 @post If the return value is zero, <code>table->record[1]</code>
614 will contain the fetched row and the internal "cursor" will refer to
615 the row. If the return value is non-zero,
616 <code>table->record[1]</code> is undefined. In either case,
617 <code>table->record[0]</code> is undefined.
618
619 @return Zero if the row was successfully fetched into
620 <code>table->record[1]</code>, error code otherwise.
621 */
622
623static int find_and_fetch_row(TABLE *table, uchar *key)
624{
625 DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
626 DBUG_PRINT("enter", ("table: %p, key: %p record: %p",
627 table, key, table->record[1]));
628
629 DBUG_ASSERT(table->in_use != NULL);
630
631 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
632
633 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
634 table->s->primary_key < MAX_KEY)
635 {
636 /*
637 Use a more efficient method to fetch the record given by
638 table->record[0] if the engine allows it. We first compute a
639 row reference using the position() member function (it will be
640 stored in table->file->ref) and the use rnd_pos() to position
641 the "cursor" (i.e., record[0] in this case) at the correct row.
642
643 TODO: Add a check that the correct record has been fetched by
644 comparing with the original record. Take into account that the
645 record on the master and slave can be of different
646 length. Something along these lines should work:
647
648 ADD>>> store_record(table,record[1]);
649 int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
650 ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
651 table->s->reclength) == 0);
652
653 */
654 table->file->position(table->record[0]);
655 int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
656 /*
657 rnd_pos() returns the record in table->record[0], so we have to
658 move it to table->record[1].
659 */
660 memcpy(table->record[1], table->record[0], table->s->reclength);
661 DBUG_RETURN(error);
662 }
663
664 /* We need to retrieve all fields */
665 /* TODO: Move this out from this function to main loop */
666 table->use_all_columns();
667
668 if (table->s->keys > 0)
669 {
670 int error;
671 /* We have a key: search the table using the index */
672 if (!table->file->inited &&
673 unlikely(error= table->file->ha_index_init(0, FALSE)))
674 {
675 table->file->print_error(error, MYF(0));
676 DBUG_RETURN(error);
677 }
678
679 /*
680 Don't print debug messages when running valgrind since they can
681 trigger false warnings.
682 */
683#ifndef HAVE_valgrind
684 DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
685 DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
686#endif
687
688 /*
689 We need to set the null bytes to ensure that the filler bit are
690 all set when returning. There are storage engines that just set
691 the necessary bits on the bytes and don't set the filler bits
692 correctly.
693 */
694 my_ptrdiff_t const pos=
695 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
696 table->record[1][pos]= 0xFF;
697 if (unlikely((error= table->file->ha_index_read_map(table->record[1], key,
698 HA_WHOLE_KEY,
699 HA_READ_KEY_EXACT))))
700 {
701 table->file->print_error(error, MYF(0));
702 table->file->ha_index_end();
703 DBUG_RETURN(error);
704 }
705
706 /*
707 Don't print debug messages when running valgrind since they can
708 trigger false warnings.
709 */
710#ifndef HAVE_valgrind
711 DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
712 DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
713#endif
714 /*
715 Below is a minor "optimization". If the key (i.e., key number
716 0) has the HA_NOSAME flag set, we know that we have found the
717 correct record (since there can be no duplicates); otherwise, we
718 have to compare the record with the one found to see if it is
719 the correct one.
720
721 CAVEAT! This behaviour is essential for the replication of,
722 e.g., the mysql.proc table since the correct record *shall* be
723 found using the primary key *only*. There shall be no
724 comparison of non-PK columns to decide if the correct record is
725 found. I can see no scenario where it would be incorrect to
726 chose the row to change only using a PK or an UNNI.
727 */
728 if (table->key_info->flags & HA_NOSAME)
729 {
730 table->file->ha_index_end();
731 DBUG_RETURN(0);
732 }
733
734 while (record_compare(table))
735 {
736 int error;
737
738 while ((error= table->file->ha_index_next(table->record[1])))
739 {
740 table->file->print_error(error, MYF(0));
741 table->file->ha_index_end();
742 DBUG_RETURN(error);
743 }
744 }
745
746 /*
747 Have to restart the scan to be able to fetch the next row.
748 */
749 table->file->ha_index_end();
750 }
751 else
752 {
753 int restart_count= 0; // Number of times scanning has restarted from top
754 int error;
755
756 /* We don't have a key: search the table using rnd_next() */
757 if (unlikely((error= table->file->ha_rnd_init_with_error(1))))
758 return error;
759
760 /* Continue until we find the right record or have made a full loop */
761 do
762 {
763 error= table->file->ha_rnd_next(table->record[1]);
764
765 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
766 DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
767
768 switch (error) {
769 case 0:
770 break;
771
772 case HA_ERR_END_OF_FILE:
773 if (++restart_count < 2)
774 {
775 int error2;
776 if (unlikely((error2= table->file->ha_rnd_init_with_error(1))))
777 DBUG_RETURN(error2);
778 }
779 break;
780
781 default:
782 table->file->print_error(error, MYF(0));
783 DBUG_PRINT("info", ("Record not found"));
784 (void) table->file->ha_rnd_end();
785 DBUG_RETURN(error);
786 }
787 }
788 while (restart_count < 2 && record_compare(table));
789
790 /*
791 Have to restart the scan to be able to fetch the next row.
792 */
793 DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
794 table->file->ha_rnd_end();
795
796 DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
797 DBUG_RETURN(error);
798 }
799
800 DBUG_RETURN(0);
801}
802
803
804/**********************************************************
805 Row handling primitives for Write_rows_log_event_old
806 **********************************************************/
807
808int Write_rows_log_event_old::do_before_row_operations(TABLE *table)
809{
810 int error= 0;
811
812 /*
813 We are using REPLACE semantics and not INSERT IGNORE semantics
814 when writing rows, that is: new rows replace old rows. We need to
815 inform the storage engine that it should use this behaviour.
816 */
817
818 /* Tell the storage engine that we are using REPLACE semantics. */
819 thd->lex->duplicates= DUP_REPLACE;
820
821 thd->lex->sql_command= SQLCOM_REPLACE;
822 /*
823 Do not raise the error flag in case of hitting to an unique attribute
824 */
825 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
826 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
827 table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
828 table->file->ha_start_bulk_insert(0);
829 return error;
830}
831
832
833int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
834{
835 int local_error= 0;
836 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
837 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
838 /*
839 reseting the extra with
840 table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
841 fires bug#27077
842 todo: explain or fix
843 */
844 if (unlikely((local_error= table->file->ha_end_bulk_insert())))
845 {
846 table->file->print_error(local_error, MYF(0));
847 }
848 return error? error : local_error;
849}
850
851
852int
853Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
854 rpl_group_info *rgi,
855 TABLE *table,
856 uchar const *row_start,
857 uchar const **row_end)
858{
859 DBUG_ASSERT(table != NULL);
860 DBUG_ASSERT(row_start && row_end);
861
862 int error;
863 error= unpack_row_old(rgi,
864 table, m_width, table->record[0],
865 row_start, m_rows_end,
866 &m_cols, row_end, &m_master_reclength,
867 table->write_set, PRE_GA_WRITE_ROWS_EVENT);
868 bitmap_copy(table->read_set, table->write_set);
869 return error;
870}
871
872
873int Write_rows_log_event_old::do_exec_row(TABLE *table)
874{
875 DBUG_ASSERT(table != NULL);
876 int error= replace_record(thd, table, m_master_reclength, m_width);
877 return error;
878}
879
880
881/**********************************************************
882 Row handling primitives for Delete_rows_log_event_old
883 **********************************************************/
884
885int Delete_rows_log_event_old::do_before_row_operations(TABLE *table)
886{
887 DBUG_ASSERT(m_memory == NULL);
888
889 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
890 table->s->primary_key < MAX_KEY)
891 {
892 /*
893 We don't need to allocate any memory for m_after_image and
894 m_key since they are not used.
895 */
896 return 0;
897 }
898
899 int error= 0;
900
901 if (table->s->keys > 0)
902 {
903 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
904 &m_after_image,
905 (uint) table->s->reclength,
906 &m_key,
907 (uint) table->key_info->key_length,
908 NullS);
909 }
910 else
911 {
912 m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
913 m_memory= (uchar*)m_after_image;
914 m_key= NULL;
915 }
916 if (!m_memory)
917 return HA_ERR_OUT_OF_MEM;
918
919 return error;
920}
921
922
923int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
924{
925 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
926 table->file->ha_index_or_rnd_end();
927 my_free(m_memory); // Free for multi_malloc
928 m_memory= NULL;
929 m_after_image= NULL;
930 m_key= NULL;
931
932 return error;
933}
934
935
936int
937Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
938 rpl_group_info *rgi,
939 TABLE *table,
940 uchar const *row_start,
941 uchar const **row_end)
942{
943 int error;
944 DBUG_ASSERT(row_start && row_end);
945 /*
946 This assertion actually checks that there is at least as many
947 columns on the slave as on the master.
948 */
949 DBUG_ASSERT(table->s->fields >= m_width);
950
951 error= unpack_row_old(rgi,
952 table, m_width, table->record[0],
953 row_start, m_rows_end,
954 &m_cols, row_end, &m_master_reclength,
955 table->read_set, PRE_GA_DELETE_ROWS_EVENT);
956 /*
957 If we will access rows using the random access method, m_key will
958 be set to NULL, so we do not need to make a key copy in that case.
959 */
960 if (m_key)
961 {
962 KEY *const key_info= table->key_info;
963
964 key_copy(m_key, table->record[0], key_info, 0);
965 }
966
967 return error;
968}
969
970
971int Delete_rows_log_event_old::do_exec_row(TABLE *table)
972{
973 int error;
974 DBUG_ASSERT(table != NULL);
975
976 if (likely(!(error= ::find_and_fetch_row(table, m_key))))
977 {
978 /*
979 Now we should have the right row to delete. We are using
980 record[0] since it is guaranteed to point to a record with the
981 correct value.
982 */
983 error= table->file->ha_delete_row(table->record[0]);
984 }
985 return error;
986}
987
988
989/**********************************************************
990 Row handling primitives for Update_rows_log_event_old
991 **********************************************************/
992
993int Update_rows_log_event_old::do_before_row_operations(TABLE *table)
994{
995 DBUG_ASSERT(m_memory == NULL);
996
997 int error= 0;
998
999 if (table->s->keys > 0)
1000 {
1001 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1002 &m_after_image,
1003 (uint) table->s->reclength,
1004 &m_key,
1005 (uint) table->key_info->key_length,
1006 NullS);
1007 }
1008 else
1009 {
1010 m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1011 m_memory= m_after_image;
1012 m_key= NULL;
1013 }
1014 if (!m_memory)
1015 return HA_ERR_OUT_OF_MEM;
1016
1017 return error;
1018}
1019
1020
1021int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
1022{
1023 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
1024 table->file->ha_index_or_rnd_end();
1025 my_free(m_memory);
1026 m_memory= NULL;
1027 m_after_image= NULL;
1028 m_key= NULL;
1029
1030 return error;
1031}
1032
1033
1034int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1035 rpl_group_info *rgi,
1036 TABLE *table,
1037 uchar const *row_start,
1038 uchar const **row_end)
1039{
1040 int error;
1041 DBUG_ASSERT(row_start && row_end);
1042 /*
1043 This assertion actually checks that there is at least as many
1044 columns on the slave as on the master.
1045 */
1046 DBUG_ASSERT(table->s->fields >= m_width);
1047
1048 /* record[0] is the before image for the update */
1049 error= unpack_row_old(rgi,
1050 table, m_width, table->record[0],
1051 row_start, m_rows_end,
1052 &m_cols, row_end, &m_master_reclength,
1053 table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
1054 row_start = *row_end;
1055 /* m_after_image is the after image for the update */
1056 error= unpack_row_old(rgi,
1057 table, m_width, m_after_image,
1058 row_start, m_rows_end,
1059 &m_cols, row_end, &m_master_reclength,
1060 table->write_set, PRE_GA_UPDATE_ROWS_EVENT);
1061
1062 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1063 DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
1064
1065 /*
1066 If we will access rows using the random access method, m_key will
1067 be set to NULL, so we do not need to make a key copy in that case.
1068 */
1069 if (m_key)
1070 {
1071 KEY *const key_info= table->key_info;
1072
1073 key_copy(m_key, table->record[0], key_info, 0);
1074 }
1075
1076 return error;
1077}
1078
1079
1080int Update_rows_log_event_old::do_exec_row(TABLE *table)
1081{
1082 DBUG_ASSERT(table != NULL);
1083
1084 int error= ::find_and_fetch_row(table, m_key);
1085 if (unlikely(error))
1086 return error;
1087
1088 /*
1089 We have to ensure that the new record (i.e., the after image) is
1090 in record[0] and the old record (i.e., the before image) is in
1091 record[1]. This since some storage engines require this (for
1092 example, the partition engine).
1093
1094 Since find_and_fetch_row() puts the fetched record (i.e., the old
1095 record) in record[1], we can keep it there. We put the new record
1096 (i.e., the after image) into record[0], and copy the fields that
1097 are on the slave (i.e., in record[1]) into record[0], effectively
1098 overwriting the default values that where put there by the
1099 unpack_row() function.
1100 */
1101 memcpy(table->record[0], m_after_image, table->s->reclength);
1102 copy_extra_record_fields(table, m_master_reclength, m_width);
1103
1104 /*
1105 Now we have the right row to update. The old row (the one we're
1106 looking for) is in record[1] and the new row has is in record[0].
1107 We also have copied the original values already in the slave's
1108 database into the after image delivered from the master.
1109 */
1110 error= table->file->ha_update_row(table->record[1], table->record[0]);
1111 if (unlikely(error == HA_ERR_RECORD_IS_THE_SAME))
1112 error= 0;
1113
1114 return error;
1115}
1116
1117#endif
1118
1119
1120/**************************************************************************
1121 Rows_log_event member functions
1122**************************************************************************/
1123
1124#ifndef MYSQL_CLIENT
1125Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
1126 MY_BITMAP const *cols,
1127 bool is_transactional)
1128 : Log_event(thd_arg, 0, is_transactional),
1129 m_row_count(0),
1130 m_table(tbl_arg),
1131 m_table_id(tid),
1132 m_width(tbl_arg ? tbl_arg->s->fields : 1),
1133 m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
1134#ifdef HAVE_REPLICATION
1135 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1136#endif
1137{
1138
1139 // This constructor should not be reached.
1140 assert(0);
1141
1142 /*
1143 We allow a special form of dummy event when the table, and cols
1144 are null and the table id is ~0UL. This is a temporary
1145 solution, to be able to terminate a started statement in the
1146 binary log: the extraneous events will be removed in the future.
1147 */
1148 DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
1149 (!tbl_arg && !cols && tid == ~0UL));
1150
1151 if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
1152 set_flags(NO_FOREIGN_KEY_CHECKS_F);
1153 if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
1154 set_flags(RELAXED_UNIQUE_CHECKS_F);
1155 /* if my_bitmap_init fails, caught in is_valid() */
1156 if (likely(!my_bitmap_init(&m_cols,
1157 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1158 m_width,
1159 false)))
1160 {
1161 /* Cols can be zero if this is a dummy binrows event */
1162 if (likely(cols != NULL))
1163 {
1164 memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
1165 create_last_word_mask(&m_cols);
1166 }
1167 }
1168 else
1169 {
1170 // Needed because my_bitmap_init() does not set it to null on failure
1171 m_cols.bitmap= 0;
1172 }
1173}
1174#endif
1175
1176
1177Old_rows_log_event::Old_rows_log_event(const char *buf, uint event_len,
1178 Log_event_type event_type,
1179 const Format_description_log_event
1180 *description_event)
1181 : Log_event(buf, description_event),
1182 m_row_count(0),
1183#ifndef MYSQL_CLIENT
1184 m_table(NULL),
1185#endif
1186 m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
1187#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1188 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1189#endif
1190{
1191 DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)");
1192 uint8 const common_header_len= description_event->common_header_len;
1193 uint8 const post_header_len= description_event->post_header_len[event_type-1];
1194
1195 DBUG_PRINT("enter",("event_len: %u common_header_len: %d "
1196 "post_header_len: %d",
1197 event_len, common_header_len,
1198 post_header_len));
1199
1200 const char *post_start= buf + common_header_len;
1201 DBUG_DUMP("post_header", (uchar*) post_start, post_header_len);
1202 post_start+= RW_MAPID_OFFSET;
1203 if (post_header_len == 6)
1204 {
1205 /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
1206 m_table_id= uint4korr(post_start);
1207 post_start+= 4;
1208 }
1209 else
1210 {
1211 m_table_id= (ulong) uint6korr(post_start);
1212 post_start+= RW_FLAGS_OFFSET;
1213 }
1214
1215 m_flags= uint2korr(post_start);
1216
1217 uchar const *const var_start=
1218 (const uchar *)buf + common_header_len + post_header_len;
1219 uchar const *const ptr_width= var_start;
1220 uchar *ptr_after_width= (uchar*) ptr_width;
1221 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1222 m_width = net_field_length(&ptr_after_width);
1223 DBUG_PRINT("debug", ("m_width=%lu", m_width));
1224 /* Avoid reading out of buffer */
1225 if (ptr_after_width + m_width > (uchar *)buf + event_len)
1226 {
1227 m_cols.bitmap= NULL;
1228 DBUG_VOID_RETURN;
1229 }
1230
1231 /* if my_bitmap_init fails, catched in is_valid() */
1232 if (likely(!my_bitmap_init(&m_cols,
1233 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1234 m_width,
1235 false)))
1236 {
1237 DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
1238 memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
1239 create_last_word_mask(&m_cols);
1240 ptr_after_width+= (m_width + 7) / 8;
1241 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1242 }
1243 else
1244 {
1245 // Needed because my_bitmap_init() does not set it to null on failure
1246 m_cols.bitmap= NULL;
1247 DBUG_VOID_RETURN;
1248 }
1249
1250 const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
1251 size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
1252 DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %zu",
1253 m_table_id, m_flags, m_width, data_size));
1254 DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size);
1255
1256 m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
1257 if (likely((bool)m_rows_buf))
1258 {
1259#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1260 m_curr_row= m_rows_buf;
1261#endif
1262 m_rows_end= m_rows_buf + data_size;
1263 m_rows_cur= m_rows_end;
1264 memcpy(m_rows_buf, ptr_rows_data, data_size);
1265 }
1266 else
1267 m_cols.bitmap= 0; // to not free it
1268
1269 DBUG_VOID_RETURN;
1270}
1271
1272
1273Old_rows_log_event::~Old_rows_log_event()
1274{
1275 if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
1276 m_cols.bitmap= 0; // so no my_free in my_bitmap_free
1277 my_bitmap_free(&m_cols); // To pair with my_bitmap_init().
1278 my_free(m_rows_buf);
1279}
1280
1281
1282int Old_rows_log_event::get_data_size()
1283{
1284 uchar buf[MAX_INT_WIDTH];
1285 uchar *end= net_store_length(buf, (m_width + 7) / 8);
1286
1287 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1288 return (int)(6 + no_bytes_in_map(&m_cols) + (end - buf) +
1289 m_rows_cur - m_rows_buf););
1290 int data_size= ROWS_HEADER_LEN;
1291 data_size+= no_bytes_in_map(&m_cols);
1292 data_size+= (uint) (end - buf);
1293
1294 data_size+= (uint) (m_rows_cur - m_rows_buf);
1295 return data_size;
1296}
1297
1298
1299#ifndef MYSQL_CLIENT
1300int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
1301{
1302 /*
1303 When the table has a primary key, we would probably want, by default, to
1304 log only the primary key value instead of the entire "before image". This
1305 would save binlog space. TODO
1306 */
1307 DBUG_ENTER("Old_rows_log_event::do_add_row_data");
1308 DBUG_PRINT("enter", ("row_data: %p length: %zu",row_data,
1309 length));
1310 /*
1311 Don't print debug messages when running valgrind since they can
1312 trigger false warnings.
1313 */
1314#ifndef HAVE_valgrind
1315 DBUG_DUMP("row_data", row_data, MY_MIN(length, 32));
1316#endif
1317
1318 DBUG_ASSERT(m_rows_buf <= m_rows_cur);
1319 DBUG_ASSERT(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
1320 DBUG_ASSERT(m_rows_cur <= m_rows_end);
1321
1322 /* The cast will always work since m_rows_cur <= m_rows_end */
1323 if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
1324 {
1325 size_t const block_size= 1024;
1326 my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
1327 my_ptrdiff_t const new_alloc=
1328 block_size * ((cur_size + length + block_size - 1) / block_size);
1329
1330 uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc,
1331 MYF(MY_ALLOW_ZERO_PTR|MY_WME));
1332 if (unlikely(!new_buf))
1333 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1334
1335 /* If the memory moved, we need to move the pointers */
1336 if (new_buf != m_rows_buf)
1337 {
1338 m_rows_buf= new_buf;
1339 m_rows_cur= m_rows_buf + cur_size;
1340 }
1341
1342 /*
1343 The end pointer should always be changed to point to the end of
1344 the allocated memory.
1345 */
1346 m_rows_end= m_rows_buf + new_alloc;
1347 }
1348
1349 DBUG_ASSERT(m_rows_cur + length <= m_rows_end);
1350 memcpy(m_rows_cur, row_data, length);
1351 m_rows_cur+= length;
1352 m_row_count++;
1353 DBUG_RETURN(0);
1354}
1355#endif
1356
1357
1358#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1359int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
1360{
1361 DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
1362 int error= 0;
1363 Relay_log_info const *rli= rgi->rli;
1364
1365 /*
1366 If m_table_id == ~0UL, then we have a dummy event that does not
1367 contain any data. In that case, we just remove all tables in the
1368 tables_to_lock list, close the thread tables, and return with
1369 success.
1370 */
1371 if (m_table_id == ~0UL)
1372 {
1373 /*
1374 This one is supposed to be set: just an extra check so that
1375 nothing strange has happened.
1376 */
1377 DBUG_ASSERT(get_flags(STMT_END_F));
1378
1379 rgi->slave_close_thread_tables(thd);
1380 thd->clear_error();
1381 DBUG_RETURN(0);
1382 }
1383
1384 /*
1385 'thd' has been set by exec_relay_log_event(), just before calling
1386 do_apply_event(). We still check here to prevent future coding
1387 errors.
1388 */
1389 DBUG_ASSERT(rgi->thd == thd);
1390
1391 /*
1392 If there is no locks taken, this is the first binrow event seen
1393 after the table map events. We should then lock all the tables
1394 used in the transaction and proceed with execution of the actual
1395 event.
1396 */
1397 if (!thd->lock)
1398 {
1399 /*
1400 lock_tables() reads the contents of thd->lex, so they must be
1401 initialized. Contrary to in
1402 Table_map_log_event::do_apply_event() we don't call
1403 mysql_init_query() as that may reset the binlog format.
1404 */
1405 lex_start(thd);
1406
1407 if (unlikely((error= lock_tables(thd, rgi->tables_to_lock,
1408 rgi->tables_to_lock_count, 0))))
1409 {
1410 if (thd->is_slave_error || thd->is_fatal_error)
1411 {
1412 /*
1413 Error reporting borrowed from Query_log_event with many excessive
1414 simplifications (we don't honour --slave-skip-errors)
1415 */
1416 uint actual_error= thd->net.last_errno;
1417 rli->report(ERROR_LEVEL, actual_error, NULL,
1418 "Error '%s' in %s event: when locking tables",
1419 (actual_error ? thd->net.last_error :
1420 "unexpected success or fatal error"),
1421 get_type_str());
1422 thd->is_fatal_error= 1;
1423 }
1424 else
1425 {
1426 rli->report(ERROR_LEVEL, error, NULL,
1427 "Error in %s event: when locking tables",
1428 get_type_str());
1429 }
1430 rgi->slave_close_thread_tables(thd);
1431 DBUG_RETURN(error);
1432 }
1433
1434 /*
1435 When the open and locking succeeded, we check all tables to
1436 ensure that they still have the correct type.
1437 */
1438
1439 {
1440 TABLE_LIST *table_list_ptr= rgi->tables_to_lock;
1441 for (uint i=0; table_list_ptr&& (i< rgi->tables_to_lock_count);
1442 table_list_ptr= static_cast<RPL_TABLE_LIST*>(table_list_ptr->next_global), i++)
1443 {
1444 /*
1445 Please see comment in log_event.cc-Rows_log_event::do_apply_event()
1446 function for the explanation of the below if condition
1447 */
1448 if (table_list_ptr->parent_l)
1449 continue;
1450 /*
1451 We can use a down cast here since we know that every table added
1452 to the tables_to_lock is a RPL_TABLE_LIST (or child table which is
1453 skipped above).
1454 */
1455 RPL_TABLE_LIST *ptr=static_cast<RPL_TABLE_LIST*>(table_list_ptr);
1456 TABLE *conv_table;
1457 if (ptr->m_tabledef.compatible_with(thd, rgi, ptr->table, &conv_table))
1458 {
1459 thd->is_slave_error= 1;
1460 rgi->slave_close_thread_tables(thd);
1461 DBUG_RETURN(ERR_BAD_TABLE_DEF);
1462 }
1463 ptr->m_conv_table= conv_table;
1464 }
1465 }
1466
1467 /*
1468 ... and then we add all the tables to the table map but keep
1469 them in the tables to lock list.
1470
1471
1472 We also invalidate the query cache for all the tables, since
1473 they will now be changed.
1474
1475 TODO [/Matz]: Maybe the query cache should not be invalidated
1476 here? It might be that a table is not changed, even though it
1477 was locked for the statement. We do know that each
1478 Old_rows_log_event contain at least one row, so after processing one
1479 Old_rows_log_event, we can invalidate the query cache for the
1480 associated table.
1481 */
1482 for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global)
1483 {
1484 rgi->m_table_map.set_table(ptr->table_id, ptr->table);
1485 }
1486#ifdef HAVE_QUERY_CACHE
1487 query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
1488#endif
1489 }
1490
1491 TABLE*
1492 table=
1493 m_table= rgi->m_table_map.get_table(m_table_id);
1494
1495 if (table)
1496 {
1497 /*
1498 table == NULL means that this table should not be replicated
1499 (this was set up by Table_map_log_event::do_apply_event()
1500 which tested replicate-* rules).
1501 */
1502
1503 /*
1504 It's not needed to set_time() but
1505 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
1506 much slave is behind
1507 2) it will be needed when we allow replication from a table with no
1508 TIMESTAMP column to a table with one.
1509 So we call set_time(), like in SBR. Presently it changes nothing.
1510 */
1511 thd->set_time(when, when_sec_part);
1512 /*
1513 There are a few flags that are replicated with each row event.
1514 Make sure to set/clear them before executing the main body of
1515 the event.
1516 */
1517 if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
1518 thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
1519 else
1520 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1521
1522 if (get_flags(RELAXED_UNIQUE_CHECKS_F))
1523 thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
1524 else
1525 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1526 /* A small test to verify that objects have consistent types */
1527 DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
1528
1529 if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
1530 set_flags(COMPLETE_ROWS_F);
1531
1532 /*
1533 Set tables write and read sets.
1534
1535 Read_set contains all slave columns (in case we are going to fetch
1536 a complete record from slave)
1537
1538 Write_set equals the m_cols bitmap sent from master but it can be
1539 longer if slave has extra columns.
1540 */
1541
1542 DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
1543
1544 bitmap_set_all(table->read_set);
1545 bitmap_set_all(table->write_set);
1546 if (!get_flags(COMPLETE_ROWS_F))
1547 bitmap_intersect(table->write_set,&m_cols);
1548 table->rpl_write_set= table->write_set;
1549
1550 // Do event specific preparations
1551
1552 error= do_before_row_operations(rli);
1553
1554 // row processing loop
1555
1556 while (error == 0 && m_curr_row < m_rows_end)
1557 {
1558 /* in_use can have been set to NULL in close_tables_for_reopen */
1559 THD* old_thd= table->in_use;
1560 if (!table->in_use)
1561 table->in_use= thd;
1562
1563 error= do_exec_row(rgi);
1564
1565 DBUG_PRINT("info", ("error: %d", error));
1566 DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
1567
1568 table->in_use = old_thd;
1569 switch (error)
1570 {
1571 case 0:
1572 break;
1573
1574 /* Some recoverable errors */
1575 case HA_ERR_RECORD_CHANGED:
1576 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
1577 tuple does not exist */
1578 error= 0;
1579 break;
1580
1581 default:
1582 rli->report(ERROR_LEVEL, thd->net.last_errno, NULL,
1583 "Error in %s event: row application failed. %s",
1584 get_type_str(), thd->net.last_error);
1585 thd->is_slave_error= 1;
1586 break;
1587 }
1588
1589 /*
1590 If m_curr_row_end was not set during event execution (e.g., because
1591 of errors) we can't proceed to the next row. If the error is transient
1592 (i.e., error==0 at this point) we must call unpack_current_row() to set
1593 m_curr_row_end.
1594 */
1595
1596 DBUG_PRINT("info", ("error: %d", error));
1597 DBUG_PRINT("info", ("curr_row: %p; curr_row_end:%p; rows_end: %p",
1598 m_curr_row, m_curr_row_end, m_rows_end));
1599
1600 if (!m_curr_row_end && likely(!error))
1601 unpack_current_row(rgi);
1602
1603 // at this moment m_curr_row_end should be set
1604 DBUG_ASSERT(error || m_curr_row_end != NULL);
1605 DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
1606 DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
1607
1608 m_curr_row= m_curr_row_end;
1609
1610 } // row processing loop
1611
1612 DBUG_EXECUTE_IF("stop_slave_middle_group",
1613 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
1614 error= do_after_row_operations(rli, error);
1615 } // if (table)
1616
1617 if (unlikely(error))
1618 { /* error has occurred during the transaction */
1619 rli->report(ERROR_LEVEL, thd->net.last_errno, NULL,
1620 "Error in %s event: error during transaction execution "
1621 "on table %s.%s. %s",
1622 get_type_str(), table->s->db.str,
1623 table->s->table_name.str,
1624 thd->net.last_error);
1625
1626 /*
1627 If one day we honour --skip-slave-errors in row-based replication, and
1628 the error should be skipped, then we would clear mappings, rollback,
1629 close tables, but the slave SQL thread would not stop and then may
1630 assume the mapping is still available, the tables are still open...
1631 So then we should clear mappings/rollback/close here only if this is a
1632 STMT_END_F.
1633 For now we code, knowing that error is not skippable and so slave SQL
1634 thread is certainly going to stop.
1635 rollback at the caller along with sbr.
1636 */
1637 thd->reset_current_stmt_binlog_format_row();
1638 rgi->cleanup_context(thd, error);
1639 thd->is_slave_error= 1;
1640 DBUG_RETURN(error);
1641 }
1642
1643 /*
1644 This code would ideally be placed in do_update_pos() instead, but
1645 since we have no access to table there, we do the setting of
1646 last_event_start_time here instead.
1647 */
1648 if (table && (table->s->primary_key == MAX_KEY) &&
1649 !use_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
1650 {
1651 /*
1652 ------------ Temporary fix until WL#2975 is implemented ---------
1653
1654 This event is not the last one (no STMT_END_F). If we stop now
1655 (in case of terminate_slave_thread()), how will we restart? We
1656 have to restart from Table_map_log_event, but as this table is
1657 not transactional, the rows already inserted will still be
1658 present, and idempotency is not guaranteed (no PK) so we risk
1659 that repeating leads to double insert. So we desperately try to
1660 continue, hope we'll eventually leave this buggy situation (by
1661 executing the final Old_rows_log_event). If we are in a hopeless
1662 wait (reached end of last relay log and nothing gets appended
1663 there), we timeout after one minute, and notify DBA about the
1664 problem. When WL#2975 is implemented, just remove the member
1665 Relay_log_info::last_event_start_time and all its occurrences.
1666 */
1667 rgi->last_event_start_time= my_time(0);
1668 }
1669
1670 if (get_flags(STMT_END_F))
1671 {
1672 /*
1673 This is the end of a statement or transaction, so close (and
1674 unlock) the tables we opened when processing the
1675 Table_map_log_event starting the statement.
1676
1677 OBSERVER. This will clear *all* mappings, not only those that
1678 are open for the table. There is not good handle for on-close
1679 actions for tables.
1680
1681 NOTE. Even if we have no table ('table' == 0) we still need to be
1682 here, so that we increase the group relay log position. If we didn't, we
1683 could have a group relay log position which lags behind "forever"
1684 (assume the last master's transaction is ignored by the slave because of
1685 replicate-ignore rules).
1686 */
1687 int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);
1688
1689 /*
1690 If this event is not in a transaction, the call below will, if some
1691 transactional storage engines are involved, commit the statement into
1692 them and flush the pending event to binlog.
1693 If this event is in a transaction, the call will do nothing, but a
1694 Xid_log_event will come next which will, if some transactional engines
1695 are involved, commit the transaction and flush the pending event to the
1696 binlog.
1697 If there was a deadlock the transaction should have been rolled back
1698 already. So there should be no need to rollback the transaction.
1699 */
1700 DBUG_ASSERT(! thd->transaction_rollback_request);
1701 if (unlikely((error= (binlog_error ?
1702 trans_rollback_stmt(thd) :
1703 trans_commit_stmt(thd)))))
1704 rli->report(ERROR_LEVEL, error, NULL,
1705 "Error in %s event: commit of row events failed, "
1706 "table `%s`.`%s`",
1707 get_type_str(), m_table->s->db.str,
1708 m_table->s->table_name.str);
1709 error|= binlog_error;
1710
1711 /*
1712 Now what if this is not a transactional engine? we still need to
1713 flush the pending event to the binlog; we did it with
1714 thd->binlog_flush_pending_rows_event(). Note that we imitate
1715 what is done for real queries: a call to
1716 ha_autocommit_or_rollback() (sometimes only if involves a
1717 transactional engine), and a call to be sure to have the pending
1718 event flushed.
1719 */
1720
1721 thd->reset_current_stmt_binlog_format_row();
1722 rgi->cleanup_context(thd, 0);
1723 }
1724
1725 DBUG_RETURN(error);
1726}
1727
1728
1729Log_event::enum_skip_reason
1730Old_rows_log_event::do_shall_skip(rpl_group_info *rgi)
1731{
1732 /*
1733 If the slave skip counter is 1 and this event does not end a
1734 statement, then we should not start executing on the next event.
1735 Otherwise, we defer the decision to the normal skipping logic.
1736 */
1737 if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1738 return Log_event::EVENT_SKIP_IGNORE;
1739 else
1740 return Log_event::do_shall_skip(rgi);
1741}
1742
1743int
1744Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
1745{
1746 Relay_log_info *rli= rgi->rli;
1747 int error= 0;
1748 DBUG_ENTER("Old_rows_log_event::do_update_pos");
1749
1750 DBUG_PRINT("info", ("flags: %s",
1751 get_flags(STMT_END_F) ? "STMT_END_F " : ""));
1752
1753 if (get_flags(STMT_END_F))
1754 {
1755 /*
1756 Indicate that a statement is finished.
1757 Step the group log position if we are not in a transaction,
1758 otherwise increase the event log position.
1759 */
1760 error= rli->stmt_done(log_pos, thd, rgi);
1761 /*
1762 Clear any errors in thd->net.last_err*. It is not known if this is
1763 needed or not. It is believed that any errors that may exist in
1764 thd->net.last_err* are allowed. Examples of errors are "key not
1765 found", which is produced in the test case rpl_row_conflicts.test
1766 */
1767 thd->clear_error();
1768 }
1769 else
1770 {
1771 rgi->inc_event_relay_log_pos();
1772 }
1773
1774 DBUG_RETURN(error);
1775}
1776
1777#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
1778
1779
1780#ifndef MYSQL_CLIENT
1781bool Old_rows_log_event::write_data_header()
1782{
1783 uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer
1784
1785 // This method should not be reached.
1786 assert(0);
1787
1788 DBUG_ASSERT(m_table_id != ~0UL);
1789 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
1790 {
1791 int4store(buf + 0, m_table_id);
1792 int2store(buf + 4, m_flags);
1793 return write_data(buf, 6);
1794 });
1795 int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
1796 int2store(buf + RW_FLAGS_OFFSET, m_flags);
1797 return write_data(buf, ROWS_HEADER_LEN);
1798}
1799
1800
1801bool Old_rows_log_event::write_data_body()
1802{
1803 /*
1804 Note that this should be the number of *bits*, not the number of
1805 bytes.
1806 */
1807 uchar sbuf[MAX_INT_WIDTH];
1808 my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
1809
1810 // This method should not be reached.
1811 assert(0);
1812
1813 bool res= false;
1814 uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
1815 DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
1816
1817 DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
1818 res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
1819
1820 DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1821 res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
1822 DBUG_DUMP("rows", m_rows_buf, data_size);
1823 res= res || write_data(m_rows_buf, (size_t) data_size);
1824
1825 return res;
1826
1827}
1828#endif
1829
1830
1831#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1832void Old_rows_log_event::pack_info(Protocol *protocol)
1833{
1834 char buf[256];
1835 char const *const flagstr=
1836 get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
1837 size_t bytes= my_snprintf(buf, sizeof(buf),
1838 "table_id: %lu%s", m_table_id, flagstr);
1839 protocol->store(buf, bytes, &my_charset_bin);
1840}
1841#endif
1842
1843
1844#ifdef MYSQL_CLIENT
1845bool Old_rows_log_event::print_helper(FILE *file,
1846 PRINT_EVENT_INFO *print_event_info,
1847 char const *const name)
1848{
1849 IO_CACHE *const head= &print_event_info->head_cache;
1850 IO_CACHE *const body= &print_event_info->body_cache;
1851 if (!print_event_info->short_form)
1852 {
1853 bool const last_stmt_event= get_flags(STMT_END_F);
1854 if (print_header(head, print_event_info, !last_stmt_event) ||
1855 my_b_printf(head, "\t%s: table id %lu%s\n",
1856 name, m_table_id,
1857 last_stmt_event ? " flags: STMT_END_F" : "") ||
1858 print_base64(body, print_event_info, !last_stmt_event))
1859 goto err;
1860 }
1861
1862 if (get_flags(STMT_END_F))
1863 {
1864 if (copy_event_cache_to_file_and_reinit(head, file) ||
1865 copy_event_cache_to_file_and_reinit(body, file))
1866 goto err;
1867 }
1868 return 0;
1869err:
1870 return 1;
1871}
1872#endif
1873
1874
1875#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1876/**
1877 Write the current row into event's table.
1878
1879 The row is located in the row buffer, pointed by @c m_curr_row member.
1880 Number of columns of the row is stored in @c m_width member (it can be
1881 different from the number of columns in the table to which we insert).
1882 Bitmap @c m_cols indicates which columns are present in the row. It is assumed
1883 that event's table is already open and pointed by @c m_table.
1884
1885 If the same record already exists in the table it can be either overwritten
1886 or an error is reported depending on the value of @c overwrite flag
1887 (error reporting not yet implemented). Note that the matching record can be
1888 different from the row we insert if we use primary keys to identify records in
1889 the table.
1890
  The row to be inserted can contain values only for selected columns. The
  missing columns are filled with default values using @c prepare_record()
  function. If a matching record is found in the table and @c overwrite is
  true, the missing columns are taken from it.

  @param  rgi     Replication group info (needed for row unpacking).
  @param  overwrite
                  Shall we overwrite if the row already exists or signal
                  error (currently ignored).
1900
1901 @returns Error code on failure, 0 on success.
1902
1903 This method, if successful, sets @c m_curr_row_end pointer to point at the
1904 next row in the rows buffer. This is done when unpacking the row to be
1905 inserted.
1906
1907 @note If a matching record is found, it is either updated using
1908 @c ha_update_row() or first deleted and then new record written.
1909*/
1910
1911int
1912Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite)
1913{
1914 DBUG_ENTER("write_row");
1915 DBUG_ASSERT(m_table != NULL && thd != NULL);
1916
1917 TABLE *table= m_table; // pointer to event's table
1918 int error;
1919 int keynum;
1920 auto_afree_ptr<char> key(NULL);
1921
1922 /* fill table->record[0] with default values */
1923
1924 if (unlikely((error=
1925 prepare_record(table, m_width,
1926 TRUE /* check if columns have def. values */))))
1927 DBUG_RETURN(error);
1928
1929 /* unpack row into table->record[0] */
1930 if ((error= unpack_current_row(rgi)))
1931 DBUG_RETURN(error);
1932
1933#ifndef DBUG_OFF
1934 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
1935 DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
1936 DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
1937#endif
1938
1939 /*
1940 Try to write record. If a corresponding record already exists in the table,
1941 we try to change it using ha_update_row() if possible. Otherwise we delete
1942 it and repeat the whole process again.
1943
1944 TODO: Add safety measures against infinite looping.
1945 */
1946
1947 while (unlikely(error= table->file->ha_write_row(table->record[0])))
1948 {
1949 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
1950 {
1951 table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
1952 DBUG_RETURN(error);
1953 }
1954 if (unlikely((keynum= table->file->get_dup_key(error)) < 0))
1955 {
1956 DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
1957 table->file->print_error(error, MYF(0));
1958 /*
1959 We failed to retrieve the duplicate key
1960 - either because the error was not "duplicate key" error
1961 - or because the information which key is not available
1962 */
1963 DBUG_RETURN(error);
1964 }
1965
1966 /*
1967 We need to retrieve the old row into record[1] to be able to
1968 either update or delete the offending record. We either:
1969
1970 - use rnd_pos() with a row-id (available as dupp_row) to the
1971 offending row, if that is possible (MyISAM and Blackhole), or else
1972
1973 - use index_read_idx() with the key that is duplicated, to
1974 retrieve the offending row.
1975 */
1976 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1977 {
1978 DBUG_PRINT("info",("Locating offending record using rnd_pos()"));
1979 error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
1980 if (unlikely(error))
1981 {
1982 DBUG_PRINT("info",("rnd_pos() returns error %d",error));
1983 table->file->print_error(error, MYF(0));
1984 DBUG_RETURN(error);
1985 }
1986 }
1987 else
1988 {
1989 DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
1990
1991 if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
1992 {
1993 DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
1994 DBUG_RETURN(my_errno);
1995 }
1996
1997 if (key.get() == NULL)
1998 {
1999 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
2000 if (unlikely(key.get() == NULL))
2001 {
2002 DBUG_PRINT("info",("Can't allocate key buffer"));
2003 DBUG_RETURN(ENOMEM);
2004 }
2005 }
2006
2007 key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
2008 0);
2009 error= table->file->ha_index_read_idx_map(table->record[1], keynum,
2010 (const uchar*)key.get(),
2011 HA_WHOLE_KEY,
2012 HA_READ_KEY_EXACT);
2013 if (unlikely(error))
2014 {
2015 DBUG_PRINT("info",("index_read_idx() returns error %d", error));
2016 table->file->print_error(error, MYF(0));
2017 DBUG_RETURN(error);
2018 }
2019 }
2020
2021 /*
2022 Now, record[1] should contain the offending row. That
2023 will enable us to update it or, alternatively, delete it (so
2024 that we can insert the new row afterwards).
2025 */
2026
2027 /*
2028 If row is incomplete we will use the record found to fill
2029 missing columns.
2030 */
2031 if (!get_flags(COMPLETE_ROWS_F))
2032 {
2033 restore_record(table,record[1]);
2034 error= unpack_current_row(rgi);
2035 }
2036
2037#ifndef DBUG_OFF
2038 DBUG_PRINT("debug",("preparing for update: before and after image"));
2039 DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
2040 DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
2041#endif
2042
2043 /*
2044 REPLACE is defined as either INSERT or DELETE + INSERT. If
2045 possible, we can replace it with an UPDATE, but that will not
2046 work on InnoDB if FOREIGN KEY checks are necessary.
2047
2048 I (Matz) am not sure of the reason for the last_uniq_key()
2049 check as, but I'm guessing that it's something along the
2050 following lines.
2051
2052 Suppose that we got the duplicate key to be a key that is not
2053 the last unique key for the table and we perform an update:
2054 then there might be another key for which the unique check will
2055 fail, so we're better off just deleting the row and inserting
2056 the correct row.
2057 */
2058 if (last_uniq_key(table, keynum) &&
2059 !table->file->referenced_by_foreign_key())
2060 {
2061 DBUG_PRINT("info",("Updating row using ha_update_row()"));
2062 error=table->file->ha_update_row(table->record[1],
2063 table->record[0]);
2064 switch (error) {
2065
2066 case HA_ERR_RECORD_IS_THE_SAME:
2067 DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
2068 " ha_update_row()"));
2069 error= 0;
2070
2071 case 0:
2072 break;
2073
2074 default:
2075 DBUG_PRINT("info",("ha_update_row() returns error %d",error));
2076 table->file->print_error(error, MYF(0));
2077 }
2078
2079 DBUG_RETURN(error);
2080 }
2081 else
2082 {
2083 DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
2084 if (unlikely((error= table->file->ha_delete_row(table->record[1]))))
2085 {
2086 DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
2087 table->file->print_error(error, MYF(0));
2088 DBUG_RETURN(error);
2089 }
2090 /* Will retry ha_write_row() with the offending row removed. */
2091 }
2092 }
2093
2094 DBUG_RETURN(error);
2095}
2096
2097
2098/**
2099 Locate the current row in event's table.
2100
2101 The current row is pointed by @c m_curr_row. Member @c m_width tells how many
2102 columns are there in the row (this can be differnet from the number of columns
2103 in the table). It is assumed that event's table is already open and pointed
2104 by @c m_table.
2105
2106 If a corresponding record is found in the table it is stored in
2107 @c m_table->record[0]. Note that when record is located based on a primary
2108 key, it is possible that the record found differs from the row being located.
2109
2110 If no key is specified or table does not have keys, a table scan is used to
2111 find the row. In that case the row should be complete and contain values for
2112 all columns. However, it can still be shorter than the table, i.e. the table
2113 can contain extra columns not present in the row. It is also possible that
2114 the table has fewer columns than the row being located.
2115
2116 @returns Error code on failure, 0 on success.
2117
2118 @post In case of success @c m_table->record[0] contains the record found.
2119 Also, the internal "cursor" of the table is positioned at the record found.
2120
2121 @note If the engine allows random access of the records, a combination of
2122 @c position() and @c rnd_pos() will be used.
2123
2124 Note that one MUST call ha_index_or_rnd_end() after this function if
2125 it returns 0 as we must leave the row position in the handler intact
2126 for any following update/delete command.
2127*/
2128
2129int Old_rows_log_event::find_row(rpl_group_info *rgi)
2130{
2131 DBUG_ENTER("find_row");
2132
2133 DBUG_ASSERT(m_table && m_table->in_use != NULL);
2134
2135 TABLE *table= m_table;
2136 int error;
2137
2138 /* unpack row - missing fields get default values */
2139
2140 // TODO: shall we check and report errors here?
2141 prepare_record(table, m_width, FALSE /* don't check errors */);
2142 error= unpack_current_row(rgi);
2143
2144#ifndef DBUG_OFF
2145 DBUG_PRINT("info",("looking for the following record"));
2146 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2147#endif
2148
2149 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2150 table->s->primary_key < MAX_KEY)
2151 {
2152 /*
2153 Use a more efficient method to fetch the record given by
2154 table->record[0] if the engine allows it. We first compute a
2155 row reference using the position() member function (it will be
2156 stored in table->file->ref) and the use rnd_pos() to position
2157 the "cursor" (i.e., record[0] in this case) at the correct row.
2158
2159 TODO: Add a check that the correct record has been fetched by
2160 comparing with the original record. Take into account that the
2161 record on the master and slave can be of different
2162 length. Something along these lines should work:
2163
2164 ADD>>> store_record(table,record[1]);
2165 int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
2166 ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
2167 table->s->reclength) == 0);
2168
2169 */
2170 DBUG_PRINT("info",("locating record using primary key (position)"));
2171 int error= table->file->ha_rnd_pos_by_record(table->record[0]);
2172 if (unlikely(error))
2173 {
2174 DBUG_PRINT("info",("rnd_pos returns error %d",error));
2175 table->file->print_error(error, MYF(0));
2176 }
2177 DBUG_RETURN(error);
2178 }
2179
2180 // We can't use position() - try other methods.
2181
2182 /*
2183 We need to retrieve all fields
2184 TODO: Move this out from this function to main loop
2185 */
2186 table->use_all_columns();
2187
2188 /*
2189 Save copy of the record in table->record[1]. It might be needed
2190 later if linear search is used to find exact match.
2191 */
2192 store_record(table,record[1]);
2193
2194 if (table->s->keys > 0)
2195 {
2196 DBUG_PRINT("info",("locating record using primary key (index_read)"));
2197
2198 /* We have a key: search the table using the index */
2199 if (!table->file->inited &&
2200 unlikely(error= table->file->ha_index_init(0, FALSE)))
2201 {
2202 DBUG_PRINT("info",("ha_index_init returns error %d",error));
2203 table->file->print_error(error, MYF(0));
2204 DBUG_RETURN(error);
2205 }
2206
2207 /* Fill key data for the row */
2208
2209 DBUG_ASSERT(m_key);
2210 key_copy(m_key, table->record[0], table->key_info, 0);
2211
2212 /*
2213 Don't print debug messages when running valgrind since they can
2214 trigger false warnings.
2215 */
2216#ifndef HAVE_valgrind
2217 DBUG_DUMP("key data", m_key, table->key_info->key_length);
2218#endif
2219
2220 /*
2221 We need to set the null bytes to ensure that the filler bit are
2222 all set when returning. There are storage engines that just set
2223 the necessary bits on the bytes and don't set the filler bits
2224 correctly.
2225 */
2226 my_ptrdiff_t const pos=
2227 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
2228 table->record[0][pos]= 0xFF;
2229
2230 if (unlikely((error= table->file->ha_index_read_map(table->record[0],
2231 m_key,
2232 HA_WHOLE_KEY,
2233 HA_READ_KEY_EXACT))))
2234 {
2235 DBUG_PRINT("info",("no record matching the key found in the table"));
2236 table->file->print_error(error, MYF(0));
2237 table->file->ha_index_end();
2238 DBUG_RETURN(error);
2239 }
2240
2241 /*
2242 Don't print debug messages when running valgrind since they can
2243 trigger false warnings.
2244 */
2245#ifndef HAVE_valgrind
2246 DBUG_PRINT("info",("found first matching record"));
2247 DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
2248#endif
2249 /*
2250 Below is a minor "optimization". If the key (i.e., key number
2251 0) has the HA_NOSAME flag set, we know that we have found the
2252 correct record (since there can be no duplicates); otherwise, we
2253 have to compare the record with the one found to see if it is
2254 the correct one.
2255
2256 CAVEAT! This behaviour is essential for the replication of,
2257 e.g., the mysql.proc table since the correct record *shall* be
2258 found using the primary key *only*. There shall be no
2259 comparison of non-PK columns to decide if the correct record is
2260 found. I can see no scenario where it would be incorrect to
2261 chose the row to change only using a PK or an UNNI.
2262 */
2263 if (table->key_info->flags & HA_NOSAME)
2264 {
2265 /* Unique does not have non nullable part */
2266 if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
2267 {
2268 DBUG_RETURN(0);
2269 }
2270 else
2271 {
2272 KEY *keyinfo= table->key_info;
2273 /*
2274 Unique has nullable part. We need to check if there is any
2275 field in the BI image that is null and part of UNNI.
2276 */
2277 bool null_found= FALSE;
2278 for (uint i=0; i < keyinfo->user_defined_key_parts && !null_found; i++)
2279 {
2280 uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
2281 Field **f= table->field+fieldnr;
2282 null_found= (*f)->is_null();
2283 }
2284
2285 if (!null_found)
2286 {
2287 DBUG_RETURN(0);
2288 }
2289
2290 /* else fall through to index scan */
2291 }
2292 }
2293
2294 /*
2295 In case key is not unique, we still have to iterate over records found
2296 and find the one which is identical to the row given. A copy of the
2297 record we are looking for is stored in record[1].
2298 */
2299 DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
2300
2301 while (record_compare(table))
2302 {
2303 while (unlikely(error= table->file->ha_index_next(table->record[0])))
2304 {
2305 DBUG_PRINT("info",("no record matching the given row found"));
2306 table->file->print_error(error, MYF(0));
2307 (void) table->file->ha_index_end();
2308 DBUG_RETURN(error);
2309 }
2310 }
2311 }
2312 else
2313 {
2314 DBUG_PRINT("info",("locating record using table scan (rnd_next)"));
2315
2316 int restart_count= 0; // Number of times scanning has restarted from top
2317
2318 /* We don't have a key: search the table using rnd_next() */
2319 if (unlikely((error= table->file->ha_rnd_init_with_error(1))))
2320 {
2321 DBUG_PRINT("info",("error initializing table scan"
2322 " (ha_rnd_init returns %d)",error));
2323 DBUG_RETURN(error);
2324 }
2325
2326 /* Continue until we find the right record or have made a full loop */
2327 do
2328 {
2329 restart_rnd_next:
2330 error= table->file->ha_rnd_next(table->record[0]);
2331
2332 switch (error) {
2333
2334 case 0:
2335 break;
2336
2337 case HA_ERR_END_OF_FILE:
2338 if (++restart_count < 2)
2339 {
2340 int error2;
2341 table->file->ha_rnd_end();
2342 if (unlikely((error2= table->file->ha_rnd_init_with_error(1))))
2343 DBUG_RETURN(error2);
2344 goto restart_rnd_next;
2345 }
2346 break;
2347
2348 default:
2349 DBUG_PRINT("info", ("Failed to get next record"
2350 " (rnd_next returns %d)",error));
2351 table->file->print_error(error, MYF(0));
2352 table->file->ha_rnd_end();
2353 DBUG_RETURN(error);
2354 }
2355 }
2356 while (restart_count < 2 && record_compare(table));
2357
2358 /*
2359 Note: above record_compare will take into accout all record fields
2360 which might be incorrect in case a partial row was given in the event
2361 */
2362
2363 /*
2364 Have to restart the scan to be able to fetch the next row.
2365 */
2366 if (restart_count == 2)
2367 DBUG_PRINT("info", ("Record not found"));
2368 else
2369 DBUG_DUMP("record found", table->record[0], table->s->reclength);
2370 if (error)
2371 table->file->ha_rnd_end();
2372
2373 DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
2374 DBUG_RETURN(error);
2375 }
2376
2377 DBUG_RETURN(0);
2378}
2379
2380#endif
2381
2382
2383/**************************************************************************
2384 Write_rows_log_event member functions
2385**************************************************************************/
2386
2387/*
2388 Constructor used to build an event for writing to the binary log.
2389 */
2390#if !defined(MYSQL_CLIENT)
2391Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
2392 TABLE *tbl_arg,
2393 ulong tid_arg,
2394 MY_BITMAP const *cols,
2395 bool is_transactional)
2396 : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
2397{
2398
2399 // This constructor should not be reached.
2400 assert(0);
2401
2402}
2403#endif
2404
2405
2406/*
2407 Constructor used by slave to read the event from the binary log.
2408 */
2409#ifdef HAVE_REPLICATION
2410Write_rows_log_event_old::Write_rows_log_event_old(const char *buf,
2411 uint event_len,
2412 const Format_description_log_event
2413 *description_event)
2414: Old_rows_log_event(buf, event_len, PRE_GA_WRITE_ROWS_EVENT,
2415 description_event)
2416{
2417}
2418#endif
2419
2420
2421#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2422int
2423Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2424{
2425 int error= 0;
2426
2427 /*
2428 We are using REPLACE semantics and not INSERT IGNORE semantics
2429 when writing rows, that is: new rows replace old rows. We need to
2430 inform the storage engine that it should use this behaviour.
2431 */
2432
2433 /* Tell the storage engine that we are using REPLACE semantics. */
2434 thd->lex->duplicates= DUP_REPLACE;
2435
2436 thd->lex->sql_command= SQLCOM_REPLACE;
2437 /*
2438 Do not raise the error flag in case of hitting to an unique attribute
2439 */
2440 m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2441 m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2442 m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
2443 m_table->file->ha_start_bulk_insert(0);
2444 return error;
2445}
2446
2447
2448int
2449Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2450 int error)
2451{
2452 int local_error= 0;
2453 m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2454 m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2455 /*
2456 reseting the extra with
2457 table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
2458 fires bug#27077
2459 todo: explain or fix
2460 */
2461 if (unlikely((local_error= m_table->file->ha_end_bulk_insert())))
2462 {
2463 m_table->file->print_error(local_error, MYF(0));
2464 }
2465 return error? error : local_error;
2466}
2467
2468
2469int
2470Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2471{
2472 DBUG_ASSERT(m_table != NULL);
2473 int error= write_row(rgi, TRUE /* overwrite */);
2474
2475 if (unlikely(error) && !thd->net.last_errno)
2476 thd->net.last_errno= error;
2477
2478 return error;
2479}
2480
2481#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2482
2483
2484#ifdef MYSQL_CLIENT
2485bool Write_rows_log_event_old::print(FILE *file,
2486 PRINT_EVENT_INFO* print_event_info)
2487{
2488 return Old_rows_log_event::print_helper(file, print_event_info,
2489 "Write_rows_old");
2490}
2491#endif
2492
2493
2494/**************************************************************************
2495 Delete_rows_log_event member functions
2496**************************************************************************/
2497
2498/*
2499 Constructor used to build an event for writing to the binary log.
2500 */
2501
2502#ifndef MYSQL_CLIENT
2503Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
2504 TABLE *tbl_arg,
2505 ulong tid,
2506 MY_BITMAP const *cols,
2507 bool is_transactional)
2508 : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2509 m_after_image(NULL), m_memory(NULL)
2510{
2511
2512 // This constructor should not be reached.
2513 assert(0);
2514
2515}
2516#endif /* #if !defined(MYSQL_CLIENT) */
2517
2518
2519/*
2520 Constructor used by slave to read the event from the binary log.
2521 */
2522#ifdef HAVE_REPLICATION
2523Delete_rows_log_event_old::Delete_rows_log_event_old(const char *buf,
2524 uint event_len,
2525 const Format_description_log_event
2526 *description_event)
2527 : Old_rows_log_event(buf, event_len, PRE_GA_DELETE_ROWS_EVENT,
2528 description_event),
2529 m_after_image(NULL), m_memory(NULL)
2530{
2531}
2532#endif
2533
2534
2535#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2536
2537int
2538Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2539{
2540 if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2541 m_table->s->primary_key < MAX_KEY)
2542 {
2543 /*
2544 We don't need to allocate any memory for m_key since it is not used.
2545 */
2546 return 0;
2547 }
2548
2549 if (m_table->s->keys > 0)
2550 {
2551 // Allocate buffer for key searches
2552 m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2553 if (!m_key)
2554 return HA_ERR_OUT_OF_MEM;
2555 }
2556 return 0;
2557}
2558
2559
2560int
2561Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2562 int error)
2563{
2564 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2565 m_table->file->ha_index_or_rnd_end();
2566 my_free(m_key);
2567 m_key= NULL;
2568
2569 return error;
2570}
2571
2572
2573int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2574{
2575 int error;
2576 DBUG_ASSERT(m_table != NULL);
2577
2578 if (likely(!(error= find_row(rgi))) )
2579 {
2580 /*
2581 Delete the record found, located in record[0]
2582 */
2583 error= m_table->file->ha_delete_row(m_table->record[0]);
2584 m_table->file->ha_index_or_rnd_end();
2585 }
2586 return error;
2587}
2588
2589#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2590
2591
2592#ifdef MYSQL_CLIENT
2593bool Delete_rows_log_event_old::print(FILE *file,
2594 PRINT_EVENT_INFO* print_event_info)
2595{
2596 return Old_rows_log_event::print_helper(file, print_event_info,
2597 "Delete_rows_old");
2598}
2599#endif
2600
2601
2602/**************************************************************************
2603 Update_rows_log_event member functions
2604**************************************************************************/
2605
2606/*
2607 Constructor used to build an event for writing to the binary log.
2608 */
2609#if !defined(MYSQL_CLIENT)
2610Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
2611 TABLE *tbl_arg,
2612 ulong tid,
2613 MY_BITMAP const *cols,
2614 bool is_transactional)
2615 : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional),
2616 m_after_image(NULL), m_memory(NULL)
2617{
2618
2619 // This constructor should not be reached.
2620 assert(0);
2621}
2622#endif /* !defined(MYSQL_CLIENT) */
2623
2624
2625/*
2626 Constructor used by slave to read the event from the binary log.
2627 */
2628#ifdef HAVE_REPLICATION
2629Update_rows_log_event_old::Update_rows_log_event_old(const char *buf,
2630 uint event_len,
2631 const
2632 Format_description_log_event
2633 *description_event)
2634 : Old_rows_log_event(buf, event_len, PRE_GA_UPDATE_ROWS_EVENT,
2635 description_event),
2636 m_after_image(NULL), m_memory(NULL)
2637{
2638}
2639#endif
2640
2641
2642#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2643
2644int
2645Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const)
2646{
2647 if (m_table->s->keys > 0)
2648 {
2649 // Allocate buffer for key searches
2650 m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
2651 if (!m_key)
2652 return HA_ERR_OUT_OF_MEM;
2653 }
2654
2655 return 0;
2656}
2657
2658
2659int
2660Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const,
2661 int error)
2662{
2663 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
2664 m_table->file->ha_index_or_rnd_end();
2665 my_free(m_key); // Free for multi_malloc
2666 m_key= NULL;
2667
2668 return error;
2669}
2670
2671
2672int
2673Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
2674{
2675 DBUG_ASSERT(m_table != NULL);
2676
2677 int error= find_row(rgi);
2678 if (unlikely(error))
2679 {
2680 /*
2681 We need to read the second image in the event of error to be
2682 able to skip to the next pair of updates
2683 */
2684 m_curr_row= m_curr_row_end;
2685 unpack_current_row(rgi);
2686 return error;
2687 }
2688
2689 /*
2690 This is the situation after locating BI:
2691
2692 ===|=== before image ====|=== after image ===|===
2693 ^ ^
2694 m_curr_row m_curr_row_end
2695
2696 BI found in the table is stored in record[0]. We copy it to record[1]
2697 and unpack AI to record[0].
2698 */
2699
2700 store_record(m_table,record[1]);
2701
2702 m_curr_row= m_curr_row_end;
2703 error= unpack_current_row(rgi); // this also updates m_curr_row_end
2704
2705 /*
2706 Now we have the right row to update. The old row (the one we're
2707 looking for) is in record[1] and the new row is in record[0].
2708 */
2709#ifndef HAVE_valgrind
2710 /*
2711 Don't print debug messages when running valgrind since they can
2712 trigger false warnings.
2713 */
2714 DBUG_PRINT("info",("Updating row in table"));
2715 DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
2716 DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
2717#endif
2718
2719 error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
2720 m_table->file->ha_index_or_rnd_end();
2721
2722 if (unlikely(error == HA_ERR_RECORD_IS_THE_SAME))
2723 error= 0;
2724
2725 return error;
2726}
2727
2728#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
2729
2730
2731#ifdef MYSQL_CLIENT
2732bool Update_rows_log_event_old::print(FILE *file,
2733 PRINT_EVENT_INFO* print_event_info)
2734{
2735 return Old_rows_log_event::print_helper(file, print_event_info,
2736 "Update_rows_old");
2737}
2738#endif
2739