1/* Copyright (c) 2000, 2017, Oracle and/or its affiliates.
2 Copyright (c) 2008, 2017, MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17#include "mariadb.h"
18#include "sql_priv.h"
19#include "unireg.h"
20#include "sql_base.h"
21#include "sql_parse.h" // check_access
22#ifdef HAVE_REPLICATION
23
24#include "rpl_mi.h"
25#include "rpl_rli.h"
26#include "sql_repl.h"
27#include "sql_acl.h" // SUPER_ACL
28#include "log_event.h"
29#include "rpl_filter.h"
30#include <my_dir.h>
31#include "debug_sync.h"
32#include "semisync_master.h"
33#include "semisync_slave.h"
34
/*
  State kept in binlog_send_info::gtid_until_group.

  NOTE(review): the meaning of the two STOP_AFTER states is inferred from
  their names only (stop once the current standalone event / transaction has
  been sent) - confirm against the code that assigns them.
*/
enum enum_gtid_until_state
{
  /* Initial value given by the binlog_send_info constructor. */
  GTID_UNTIL_NOT_DONE= 0,
  GTID_UNTIL_STOP_AFTER_STANDALONE= 1,
  GTID_UNTIL_STOP_AFTER_TRANSACTION= 2
};
40
41
/*
  File-scope state for the binlog dump code.

  max_binlog_dump_events seeds binlog_send_info::left_events in debug builds.
  NOTE(review): both non-static variables are presumably bound to server
  options defined elsewhere - confirm.
*/
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
#ifndef DBUG_OFF
/* Only exists in debug builds. */
static int binlog_dump_count = 0;
#endif

/* Names of the checksum algorithms; searched with find_type() below. */
extern TYPELIB binlog_checksum_typelib;
49
50
51static int
52fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
53 my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
54 enum enum_binlog_checksum_alg checksum_alg_arg, uint32 end_pos)
55{
56 char header[LOG_EVENT_HEADER_LEN];
57 ulong event_len;
58
59 *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
60 checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
61
62 /*
63 'when' (the timestamp) is set to 0 so that slave could distinguish between
64 real and fake Rotate events (if necessary)
65 */
66 memset(header, 0, 4);
67 header[EVENT_TYPE_OFFSET] = (uchar)event_type;
68 event_len= LOG_EVENT_HEADER_LEN + extra_len +
69 (*do_checksum ? BINLOG_CHECKSUM_LEN : 0);
70 int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
71 int4store(header + EVENT_LEN_OFFSET, event_len);
72 int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
73 // TODO: check what problems this may cause and fix them
74 int4store(header + LOG_POS_OFFSET, end_pos);
75 if (packet->append(header, sizeof(header)))
76 {
77 *errmsg= "Failed due to out-of-memory writing event";
78 return -1;
79 }
80 if (*do_checksum)
81 {
82 *crc= my_checksum(0, (uchar*)header, sizeof(header));
83 }
84 return 0;
85}
86
87
88static int
89fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg)
90{
91 if (do_checksum)
92 {
93 char b[BINLOG_CHECKSUM_LEN];
94 int4store(b, crc);
95 if (packet->append(b, sizeof(b)))
96 {
97 *errmsg= "Failed due to out-of-memory writing event checksum";
98 return -1;
99 }
100 }
101 return 0;
102}
103
104
105static int
106fake_event_write(NET *net, String *packet, const char **errmsg)
107{
108 if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
109 {
110 *errmsg = "failed on my_net_write()";
111 return -1;
112 }
113 return 0;
114}
115
116
117/*
118 Helper structure, used to pass miscellaneous info from mysql_binlog_send()
119 into the helper functions that it calls.
120*/
121struct binlog_send_info {
122 rpl_binlog_state until_binlog_state;
123 slave_connection_state gtid_state;
124 THD *thd;
125 NET *net;
126 String *packet;
127 char *const log_file_name; // ptr/alias to linfo.log_file_name
128 slave_connection_state *until_gtid_state;
129 slave_connection_state until_gtid_state_obj;
130 Format_description_log_event *fdev;
131 int mariadb_slave_capability;
132 enum_gtid_skip_type gtid_skip_group;
133 enum_gtid_until_state gtid_until_group;
134 ushort flags;
135 enum enum_binlog_checksum_alg current_checksum_alg;
136 bool slave_gtid_strict_mode;
137 bool send_fake_gtid_list;
138 bool slave_gtid_ignore_duplicates;
139 bool using_gtid_state;
140
141 int error;
142 const char *errmsg;
143 char error_text[MAX_SLAVE_ERRMSG];
144 rpl_gtid error_gtid;
145
146 ulonglong heartbeat_period;
147
148 /** start file/pos as requested by slave, for error message */
149 char start_log_file_name[FN_REFLEN];
150 my_off_t start_pos;
151
152 /** last pos for error message */
153 my_off_t last_pos;
154
155#ifndef DBUG_OFF
156 int left_events;
157 uint dbug_reconnect_counter;
158 ulong hb_info_counter;
159#endif
160
161 bool clear_initial_log_pos;
162 bool should_stop;
163 size_t dirlen;
164
165 binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
166 char *lfn)
167 : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
168 log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
169 gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
170 flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
171 slave_gtid_strict_mode(false), send_fake_gtid_list(false),
172 slave_gtid_ignore_duplicates(false),
173 error(0),
174 errmsg("Unknown error"),
175 heartbeat_period(0),
176#ifndef DBUG_OFF
177 left_events(max_binlog_dump_events),
178 dbug_reconnect_counter(0),
179 hb_info_counter(0),
180#endif
181 clear_initial_log_pos(false),
182 should_stop(false)
183 {
184 error_text[0] = 0;
185 bzero(&error_gtid, sizeof(error_gtid));
186 until_binlog_state.init();
187 }
188};
189
// Forward declaration: defined further down, but already used by fake_rotate_event().
191static int reset_transmit_packet(struct binlog_send_info *info, ushort flags,
192 ulong *ev_offset, const char **errmsg);
193
194/*
195 fake_rotate_event() builds a fake (=which does not exist physically in any
196 binlog) Rotate event, which contains the name of the binlog we are going to
197 send to the slave (because the slave may not know it if it just asked for
198 MASTER_LOG_FILE='', MASTER_LOG_POS=4).
199 < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
200 After this version we always call it, so that a 3.23.58 slave can rely on
201 it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
202 zeros in the good positions which, by chance, make it possible for the 3.23
203 slave to detect that this event is unexpected) (this is luck which happens
204 because the master and slave disagree on the size of the header of
205 Log_event).
206
207 Relying on the event length of the Rotate event instead of these
208 well-placed zeros was not possible as Rotate events have a variable-length
209 part.
210*/
211
212static int fake_rotate_event(binlog_send_info *info, ulonglong position,
213 const char** errmsg, enum enum_binlog_checksum_alg checksum_alg_arg)
214{
215 DBUG_ENTER("fake_rotate_event");
216 ulong ev_offset;
217 char buf[ROTATE_HEADER_LEN+100];
218 my_bool do_checksum;
219 int err;
220 char* p = info->log_file_name+dirname_length(info->log_file_name);
221 uint ident_len = (uint) strlen(p);
222 String *packet= info->packet;
223 ha_checksum crc;
224
225 /* reset transmit packet for the fake rotate event below */
226 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
227 DBUG_RETURN(1);
228
229 if ((err= fake_event_header(packet, ROTATE_EVENT,
230 ident_len + ROTATE_HEADER_LEN, &do_checksum,
231 &crc,
232 errmsg, checksum_alg_arg, 0)))
233 {
234 info->error= ER_UNKNOWN_ERROR;
235 DBUG_RETURN(err);
236 }
237
238 int8store(buf+R_POS_OFFSET,position);
239 packet->append(buf, ROTATE_HEADER_LEN);
240 packet->append(p, ident_len);
241
242 if (do_checksum)
243 {
244 crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
245 crc= my_checksum(crc, (uchar*)p, ident_len);
246 }
247
248 if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
249 (err= fake_event_write(info->net, packet, errmsg)))
250 {
251 info->error= ER_UNKNOWN_ERROR;
252 DBUG_RETURN(err);
253 }
254 DBUG_RETURN(0);
255}
256
257
258static int fake_gtid_list_event(binlog_send_info *info,
259 Gtid_list_log_event *glev, const char** errmsg,
260 uint32 current_pos)
261{
262 my_bool do_checksum;
263 int err;
264 ha_checksum crc;
265 char buf[128];
266 String str(buf, sizeof(buf), system_charset_info);
267 String* packet= info->packet;
268
269 str.length(0);
270 if (glev->to_packet(&str))
271 {
272 info->error= ER_UNKNOWN_ERROR;
273 *errmsg= "Failed due to out-of-memory writing Gtid_list event";
274 return -1;
275 }
276 if ((err= fake_event_header(packet, GTID_LIST_EVENT,
277 str.length(), &do_checksum, &crc,
278 errmsg, info->current_checksum_alg, current_pos)))
279 {
280 info->error= ER_UNKNOWN_ERROR;
281 return err;
282 }
283
284 packet->append(str);
285 if (do_checksum)
286 {
287 crc= my_checksum(crc, (uchar*)str.ptr(), str.length());
288 }
289
290 if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
291 (err= fake_event_write(info->net, packet, errmsg)))
292 {
293 info->error= ER_UNKNOWN_ERROR;
294 return err;
295 }
296
297 return 0;
298}
299
300
301/*
302 Reset thread transmit packet buffer for event sending
303
304 This function allocates header bytes for event transmission, and
305 should be called before store the event data to the packet buffer.
306*/
307static int reset_transmit_packet(binlog_send_info *info, ushort flags,
308 ulong *ev_offset, const char **errmsg)
309{
310 int ret= 0;
311 String *packet= &info->thd->packet;
312
313 /* reserve and set default header */
314 packet->length(0);
315 packet->set("\0", 1, &my_charset_bin);
316
317 if (info->thd->semi_sync_slave)
318 {
319 if (repl_semisync_master.reserve_sync_header(packet))
320 {
321 info->error= ER_UNKNOWN_ERROR;
322 *errmsg= "Failed to run hook 'reserve_header'";
323 ret= 1;
324 }
325 }
326
327 *ev_offset= packet->length();
328 return ret;
329}
330
331int get_user_var_int(const char *name,
332 long long int *value, int *null_value)
333{
334 bool null_val;
335 user_var_entry *entry=
336 (user_var_entry*) my_hash_search(&current_thd->user_vars,
337 (uchar*) name, strlen(name));
338 if (!entry)
339 return 1;
340 *value= entry->val_int(&null_val);
341 if (null_value)
342 *null_value= null_val;
343 return 0;
344}
345
346inline bool is_semi_sync_slave()
347{
348 int null_value;
349 long long val= 0;
350 get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
351 return val;
352}
353
354static int send_file(THD *thd)
355{
356 NET* net = &thd->net;
357 int fd = -1, error = 1;
358 size_t bytes;
359 char fname[FN_REFLEN+1];
360 const char *errmsg = 0;
361 int old_timeout;
362 unsigned long packet_len;
363 uchar buf[IO_SIZE]; // It's safe to alloc this
364 DBUG_ENTER("send_file");
365
366 /*
367 The client might be slow loading the data, give him wait_timeout to do
368 the job
369 */
370 old_timeout= net->read_timeout;
371 my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
372
373 /*
374 We need net_flush here because the client will not know it needs to send
375 us the file name until it has processed the load event entry
376 */
377 if (unlikely(net_flush(net) || (packet_len = my_net_read(net)) == packet_error))
378 {
379 errmsg = "while reading file name";
380 goto err;
381 }
382
383 // terminate with \0 for fn_format
384 *((char*)net->read_pos + packet_len) = 0;
385 fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
386 // this is needed to make replicate-ignore-db
387 if (!strcmp(fname,"/dev/null"))
388 goto end;
389
390 if ((fd= mysql_file_open(key_file_send_file,
391 fname, O_RDONLY, MYF(0))) < 0)
392 {
393 errmsg = "on open of file";
394 goto err;
395 }
396
397 while ((long) (bytes= mysql_file_read(fd, buf, IO_SIZE, MYF(0))) > 0)
398 {
399 if (my_net_write(net, buf, bytes))
400 {
401 errmsg = "while writing data to client";
402 goto err;
403 }
404 }
405
406 end:
407 if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
408 (my_net_read(net) == packet_error))
409 {
410 errmsg = "while negotiating file transfer close";
411 goto err;
412 }
413 error = 0;
414
415 err:
416 my_net_set_read_timeout(net, old_timeout);
417 if (fd >= 0)
418 mysql_file_close(fd, MYF(0));
419 if (errmsg)
420 {
421 sql_print_error("Failed in send_file() %s", errmsg);
422 DBUG_PRINT("error", ("%s", errmsg));
423 }
424 DBUG_RETURN(error);
425}
426
427
428/**
429 Internal to mysql_binlog_send() routine that recalculates checksum for
  a FD event (asserted) that needs additional arrangement prior to sending to slave.
431*/
432inline void fix_checksum(String *packet, ulong ev_offset)
433{
434 /* recalculate the crc for this event */
435 uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
436 ha_checksum crc;
437 DBUG_ASSERT(data_len ==
438 LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
439 BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN);
440 crc= my_checksum(0, (uchar *)packet->ptr() + ev_offset, data_len -
441 BINLOG_CHECKSUM_LEN);
442 int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);
443}
444
445
446static user_var_entry * get_binlog_checksum_uservar(THD * thd)
447{
448 LEX_CSTRING name= { STRING_WITH_LEN("master_binlog_checksum")};
449 user_var_entry *entry=
450 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
451 name.length);
452 return entry;
453}
454
455/**
456 Function for calling in mysql_binlog_send
457 to check if slave initiated checksum-handshake.
458
459 @param[in] thd THD to access a user variable
460
461 @return TRUE if handshake took place, FALSE otherwise
462*/
463
464static bool is_slave_checksum_aware(THD * thd)
465{
466 DBUG_ENTER("is_slave_checksum_aware");
467 user_var_entry *entry= get_binlog_checksum_uservar(thd);
468 DBUG_RETURN(entry? true : false);
469}
470
471/**
472 Function for calling in mysql_binlog_send
473 to get the value of @@binlog_checksum of the master at
474 time of checksum-handshake.
475
476 The value tells the master whether to compute or not, and the slave
477 to verify or not the first artificial Rotate event's checksum.
478
479 @param[in] thd THD to access a user variable
480
481 @return value of @@binlog_checksum alg according to
482 @c enum enum_binlog_checksum_alg
483*/
484
485static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * thd)
486{
487 enum enum_binlog_checksum_alg ret;
488
489 DBUG_ENTER("get_binlog_checksum_value_at_connect");
490 user_var_entry *entry= get_binlog_checksum_uservar(thd);
491 if (!entry)
492 {
493 ret= BINLOG_CHECKSUM_ALG_UNDEF;
494 }
495 else
496 {
497 DBUG_ASSERT(entry->type == STRING_RESULT);
498 String str;
499 uint dummy_errors;
500 str.copy(entry->value, entry->length, &my_charset_bin, &my_charset_bin,
501 &dummy_errors);
502 ret= (enum_binlog_checksum_alg)
503 (find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1);
504 DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg
505 }
506 DBUG_RETURN(ret);
507}
508
509/*
510 Adjust the position pointer in the binary log file for all running slaves
511
512 SYNOPSIS
513 adjust_linfo_offsets()
514 purge_offset Number of bytes removed from start of log index file
515
516 NOTES
517 - This is called when doing a PURGE when we delete lines from the
518 index log file
519
520 REQUIREMENTS
521 - Before calling this function, we have to ensure that no threads are
      using any binary log file before purge_offset.
523
524 TODO
525 - Inform the slave threads that they should sync the position
526 in the binary log file with Relay_log_info::flush().
527 Now they sync is done for next read.
528*/
529
530void adjust_linfo_offsets(my_off_t purge_offset)
531{
532 THD *tmp;
533
534 mysql_mutex_lock(&LOCK_thread_count);
535 I_List_iterator<THD> it(threads);
536
537 while ((tmp=it++))
538 {
539 LOG_INFO* linfo;
540 if ((linfo = tmp->current_linfo))
541 {
542 mysql_mutex_lock(&linfo->lock);
543 /*
544 Index file offset can be less that purge offset only if
545 we just started reading the index file. In that case
546 we have nothing to adjust
547 */
548 if (linfo->index_file_offset < purge_offset)
549 linfo->fatal = (linfo->index_file_offset != 0);
550 else
551 linfo->index_file_offset -= purge_offset;
552 mysql_mutex_unlock(&linfo->lock);
553 }
554 }
555 mysql_mutex_unlock(&LOCK_thread_count);
556}
557
558
559bool log_in_use(const char* log_name)
560{
561 size_t log_name_len = strlen(log_name) + 1;
562 THD *tmp;
563 bool result = 0;
564
565 mysql_mutex_lock(&LOCK_thread_count);
566 I_List_iterator<THD> it(threads);
567
568 while ((tmp=it++))
569 {
570 LOG_INFO* linfo;
571 if ((linfo = tmp->current_linfo))
572 {
573 mysql_mutex_lock(&linfo->lock);
574 result = !memcmp(log_name, linfo->log_file_name, log_name_len);
575 mysql_mutex_unlock(&linfo->lock);
576 if (result)
577 break;
578 }
579 }
580
581 mysql_mutex_unlock(&LOCK_thread_count);
582 return result;
583}
584
585bool purge_error_message(THD* thd, int res)
586{
587 uint errcode;
588
589 if ((errcode= purge_log_get_error_code(res)) != 0)
590 {
591 my_message(errcode, ER_THD(thd, errcode), MYF(0));
592 return TRUE;
593 }
594 my_ok(thd);
595 return FALSE;
596}
597
598
599/**
600 Execute a PURGE BINARY LOGS TO <log> command.
601
602 @param thd Pointer to THD object for the client thread executing the
603 statement.
604
605 @param to_log Name of the last log to purge.
606
607 @retval FALSE success
608 @retval TRUE failure
609*/
610bool purge_master_logs(THD* thd, const char* to_log)
611{
612 char search_file_name[FN_REFLEN];
613 if (!mysql_bin_log.is_open())
614 {
615 my_ok(thd);
616 return FALSE;
617 }
618
619 mysql_bin_log.make_log_name(search_file_name, to_log);
620 return purge_error_message(thd,
621 mysql_bin_log.purge_logs(search_file_name, 0, 1,
622 1, NULL));
623}
624
625
626/**
627 Execute a PURGE BINARY LOGS BEFORE <date> command.
628
629 @param thd Pointer to THD object for the client thread executing the
630 statement.
631
632 @param purge_time Date before which logs should be purged.
633
634 @retval FALSE success
635 @retval TRUE failure
636*/
637bool purge_master_logs_before_date(THD* thd, time_t purge_time)
638{
639 if (!mysql_bin_log.is_open())
640 {
641 my_ok(thd);
642 return 0;
643 }
644 return purge_error_message(thd,
645 mysql_bin_log.purge_logs_before_date(purge_time));
646}
647
648void set_read_error(binlog_send_info *info, int error)
649{
650 if (error == LOG_READ_EOF)
651 {
652 return;
653 }
654 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
655 switch (error) {
656 case LOG_READ_BOGUS:
657 info->errmsg= "bogus data in log event";
658 break;
659 case LOG_READ_TOO_LARGE:
660 info->errmsg= "log event entry exceeded max_allowed_packet; "
661 "Increase max_allowed_packet on master";
662 break;
663 case LOG_READ_IO:
664 info->errmsg= "I/O error reading log event";
665 break;
666 case LOG_READ_MEM:
667 info->errmsg= "memory allocation failed reading log event";
668 break;
669 case LOG_READ_TRUNC:
670 info->errmsg= "binlog truncated in the middle of event; "
671 "consider out of disk space on master";
672 break;
673 case LOG_READ_CHECKSUM_FAILURE:
674 info->errmsg= "event read from binlog did not pass crc check";
675 break;
676 case LOG_READ_DECRYPT:
677 info->errmsg= "event decryption failure";
678 break;
679 default:
680 info->errmsg= "unknown error reading log event on the master";
681 break;
682 }
683}
684
685
686/**
687 An auxiliary function for calling in mysql_binlog_send
688 to initialize the heartbeat timeout in waiting for a binlogged event.
689
690 @param[in] thd THD to access a user variable
691
692 @return heartbeat period an ulonglong of nanoseconds
693 or zero if heartbeat was not demanded by slave
694*/
695static ulonglong get_heartbeat_period(THD * thd)
696{
697 bool null_value;
698 LEX_CSTRING name= { STRING_WITH_LEN("master_heartbeat_period")};
699 user_var_entry *entry=
700 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
701 name.length);
702 return entry? entry->val_int(&null_value) : 0;
703}
704
705/*
706 Lookup the capabilities of the slave, which it announces by setting a value
707 MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability.
708
709 Older MariaDB slaves, and other MySQL slaves, do not set
710 @mariadb_slave_capability, corresponding to a capability of
711 MARIA_SLAVE_CAPABILITY_UNKNOWN (0).
712*/
713static int
714get_mariadb_slave_capability(THD *thd)
715{
716 bool null_value;
717 const LEX_CSTRING name= { STRING_WITH_LEN("mariadb_slave_capability") };
718 const user_var_entry *entry=
719 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
720 name.length);
721 return entry ?
722 (int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN;
723}
724
725
726/*
727 Get the value of the @slave_connect_state user variable into the supplied
728 String (this is the GTID connect state requested by the connecting slave).
729
730 Returns false if error (ie. slave did not set the variable and does not
731 want to use GTID to set start position), true if success.
732*/
733static bool
734get_slave_connect_state(THD *thd, String *out_str)
735{
736 bool null_value;
737
738 const LEX_CSTRING name= { STRING_WITH_LEN("slave_connect_state") };
739 user_var_entry *entry=
740 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
741 name.length);
742 return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
743}
744
745
746static bool
747get_slave_gtid_strict_mode(THD *thd)
748{
749 bool null_value;
750
751 const LEX_CSTRING name= { STRING_WITH_LEN("slave_gtid_strict_mode") };
752 user_var_entry *entry=
753 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
754 name.length);
755 return entry && entry->val_int(&null_value) && !null_value;
756}
757
758
759static bool
760get_slave_gtid_ignore_duplicates(THD *thd)
761{
762 bool null_value;
763
764 const LEX_CSTRING name= { STRING_WITH_LEN("slave_gtid_ignore_duplicates") };
765 user_var_entry *entry=
766 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
767 name.length);
768 return entry && entry->val_int(&null_value) && !null_value;
769}
770
771
772/*
773 Get the value of the @slave_until_gtid user variable into the supplied
774 String (this is the GTID position specified for START SLAVE UNTIL
775 master_gtid_pos='xxx').
776
777 Returns false if error (ie. slave did not set the variable and is not doing
  START SLAVE UNTIL master_gtid_pos='xxx'), true if success.
779*/
780static bool
781get_slave_until_gtid(THD *thd, String *out_str)
782{
783 bool null_value;
784
785 const LEX_CSTRING name= { STRING_WITH_LEN("slave_until_gtid") };
786 user_var_entry *entry=
787 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
788 name.length);
789 return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
790}
791
792
793/*
794 Function prepares and sends repliation heartbeat event.
795
796 @param net net object of THD
797 @param packet buffer to store the heartbeat instance
798 @param event_coordinates binlog file name and position of the last
799 real event master sent from binlog
800
801 @note
802 Among three essential pieces of heartbeat data Log_event::when
803 is computed locally.
804 The error to send is serious and should force terminating
805 the dump thread.
806*/
807static int send_heartbeat_event(binlog_send_info *info,
808 NET* net, String* packet,
809 const struct event_coordinates *coord,
810 enum enum_binlog_checksum_alg checksum_alg_arg)
811{
812 DBUG_ENTER("send_heartbeat_event");
813
814 ulong ev_offset;
815 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
816 DBUG_RETURN(1);
817
818 char header[LOG_EVENT_HEADER_LEN];
819 my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
820 checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
821 /*
822 'when' (the timestamp) is set to 0 so that slave could distinguish between
823 real and fake Rotate events (if necessary)
824 */
825 memset(header, 0, 4); // when
826
827 header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
828
829 char* p= coord->file_name + dirname_length(coord->file_name);
830
831 size_t ident_len = strlen(p);
832 size_t event_len = ident_len + LOG_EVENT_HEADER_LEN +
833 (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
834 int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
835 int4store(header + EVENT_LEN_OFFSET, event_len);
836 int2store(header + FLAGS_OFFSET, 0);
837
838 int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
839
840 packet->append(header, sizeof(header));
841 packet->append(p, ident_len); // log_file_name
842
843 if (do_checksum)
844 {
845 char b[BINLOG_CHECKSUM_LEN];
846 ha_checksum crc= my_checksum(0, (uchar*) header, sizeof(header));
847 crc= my_checksum(crc, (uchar*) p, ident_len);
848 int4store(b, crc);
849 packet->append(b, sizeof(b));
850 }
851
852 if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
853 net_flush(net))
854 {
855 info->error= ER_UNKNOWN_ERROR;
856 DBUG_RETURN(-1);
857 }
858
859 DBUG_RETURN(0);
860}
861
862
/*
  One binlog file name, as a node of the singly linked list built by
  get_binlog_list().
*/
struct binlog_file_entry
{
  binlog_file_entry *next;   /* next node, or NULL at the end of the list */
  char *name;                /* file name copied from the binlog index file */
};
868
869static binlog_file_entry *
870get_binlog_list(MEM_ROOT *memroot)
871{
872 IO_CACHE *index_file;
873 char fname[FN_REFLEN];
874 size_t length;
875 binlog_file_entry *current_list= NULL, *e;
876 DBUG_ENTER("get_binlog_list");
877
878 if (!mysql_bin_log.is_open())
879 {
880 my_error(ER_NO_BINARY_LOGGING, MYF(0));
881 DBUG_RETURN(NULL);
882 }
883
884 mysql_bin_log.lock_index();
885 index_file=mysql_bin_log.get_index_file();
886 reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
887
888 /* The file ends with EOF or empty line */
889 while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
890 {
891 --length; /* Remove the newline */
892 if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) ||
893 !(e->name= strmake_root(memroot, fname, length)))
894 {
895 mysql_bin_log.unlock_index();
896 DBUG_RETURN(NULL);
897 }
898 e->next= current_list;
899 current_list= e;
900 }
901 mysql_bin_log.unlock_index();
902
903 DBUG_RETURN(current_list);
904}
905
906
907/*
908 Check if every GTID requested by the slave is contained in this (or a later)
909 binlog file. Return true if so, false if not.
910
911 We do the check with a single scan of the list of GTIDs, avoiding the need
912 to build an in-memory hash or stuff like that.
913
914 We need to check that slave did not request GTID D-S-N1, when the
915 Gtid_list_log_event for this binlog file has D-S-N2 with N2 >= N1.
916 (Because this means that requested GTID is in an earlier binlog).
917 However, if the Gtid_list_log_event indicates that D-S-N1 is the very last
918 GTID for domain D in prior binlog files, then it is ok to start from the
919 very start of this binlog file. This special case is important, as it
920 allows to purge old logs even if some domain is unused for long.
921
922 In addition, we need to check that we do not have a GTID D-S-N3 in the
923 Gtid_list_log_event where D is not present in the requested slave state at
924 all. Since if D is not in requested slave state, it means that slave needs
925 to start at the very first GTID in domain D.
926*/
927static bool
928contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
929{
930 uint32 i;
931
932 for (i= 0; i < glev->count; ++i)
933 {
934 uint32 gl_domain_id= glev->list[i].domain_id;
935 const rpl_gtid *gtid= st->find(gl_domain_id);
936 if (!gtid)
937 {
938 /*
939 The slave needs to start from the very beginning of this domain, which
940 is in an earlier binlog file. So we need to search back further.
941 */
942 return false;
943 }
944 if (gtid->server_id == glev->list[i].server_id &&
945 gtid->seq_no <= glev->list[i].seq_no)
946 {
947 /*
948 The slave needs to start after gtid, but it is contained in an earlier
949 binlog file. So we need to search back further, unless it was the very
950 last gtid logged for the domain in earlier binlog files.
951 */
952 if (gtid->seq_no < glev->list[i].seq_no)
953 return false;
954
955 /*
956 The slave requested D-S-N1, which happens to be the last GTID logged
957 in prior binlog files with same domain id D and server id S.
958
959 The Gtid_list is kept sorted on domain_id, with the last GTID in each
960 domain_id group being the last one logged. So if this is the last GTID
961 within the domain_id group, then it is ok to start from the very
962 beginning of this group, per the special case explained in comment at
963 the start of this function. If not, then we need to search back further.
964 */
965 if (i+1 < glev->count && gl_domain_id == glev->list[i+1].domain_id)
966 return false;
967 }
968 }
969
970 return true;
971}
972
973
974static void
975give_error_start_pos_missing_in_binlog(int *err, const char **errormsg,
976 rpl_gtid *error_gtid)
977{
978 rpl_gtid binlog_gtid;
979
980 if (mysql_bin_log.lookup_domain_in_binlog_state(error_gtid->domain_id,
981 &binlog_gtid) &&
982 binlog_gtid.seq_no >= error_gtid->seq_no)
983 {
984 *errormsg= "Requested slave GTID state not found in binlog. The slave has "
985 "probably diverged due to executing erroneous transactions";
986 *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2;
987 }
988 else
989 {
990 *errormsg= "Requested slave GTID state not found in binlog";
991 *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
992 }
993}
994
995
996/*
997 Check the start GTID state requested by the slave against our binlog state.
998
999 Give an error if the slave requests something that we do not have in our
1000 binlog.
1001*/
1002
1003static int
1004check_slave_start_position(binlog_send_info *info, const char **errormsg,
1005 rpl_gtid *error_gtid)
1006{
1007 uint32 i;
1008 int err;
1009 slave_connection_state::entry **delete_list= NULL;
1010 uint32 delete_idx= 0;
1011 slave_connection_state *st= &info->gtid_state;
1012
1013 if (rpl_load_gtid_slave_state(info->thd))
1014 {
1015 *errormsg= "Failed to load replication slave GTID state";
1016 err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
1017 goto end;
1018 }
1019
1020 for (i= 0; i < st->hash.records; ++i)
1021 {
1022 slave_connection_state::entry *slave_gtid_entry=
1023 (slave_connection_state::entry *)my_hash_element(&st->hash, i);
1024 rpl_gtid *slave_gtid= &slave_gtid_entry->gtid;
1025 rpl_gtid master_gtid;
1026 rpl_gtid master_replication_gtid;
1027 rpl_gtid start_gtid;
1028 bool start_at_own_slave_pos=
1029 rpl_global_gtid_slave_state->domain_to_gtid(slave_gtid->domain_id,
1030 &master_replication_gtid) &&
1031 slave_gtid->server_id == master_replication_gtid.server_id &&
1032 slave_gtid->seq_no == master_replication_gtid.seq_no;
1033
1034 if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
1035 slave_gtid->server_id,
1036 &master_gtid) &&
1037 master_gtid.seq_no >= slave_gtid->seq_no)
1038 {
1039 /*
1040 If connecting slave requests to start at the GTID we last applied when
1041 we were ourselves a slave, then this GTID may not exist in our binlog
1042 (in case of --log-slave-updates=0). So set the flag to disable the
1043 error about missing GTID in the binlog in this case.
1044 */
1045 if (start_at_own_slave_pos)
1046 slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS;
1047 continue;
1048 }
1049
1050 if (!start_at_own_slave_pos)
1051 {
1052 rpl_gtid domain_gtid;
1053 slave_connection_state *until_gtid_state= info->until_gtid_state;
1054 rpl_gtid *until_gtid;
1055
1056 if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
1057 &domain_gtid))
1058 {
1059 /*
1060 We do not have anything in this domain, neither in the binlog nor
1061 in the slave state. So we are probably one master in a multi-master
1062 setup, and this domain is served by a different master.
1063
1064 But set a flag so that if we then ever _do_ happen to encounter
1065 anything in this domain, then we will re-check that the requested
1066 slave position exists, and give the error at that time if not.
1067 */
1068 slave_gtid_entry->flags|= slave_connection_state::START_ON_EMPTY_DOMAIN;
1069 continue;
1070 }
1071
1072 if (info->slave_gtid_ignore_duplicates &&
1073 domain_gtid.seq_no < slave_gtid->seq_no)
1074 {
1075 /*
1076 When --gtid-ignore-duplicates, it is ok for the slave to request
1077 something that we do not have (yet) - they might already have gotten
1078 it through another path in a multi-path replication hierarchy.
1079 */
1080 continue;
1081 }
1082
1083 if (until_gtid_state &&
1084 ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
1085 (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
1086 until_gtid->server_id,
1087 &master_gtid) &&
1088 master_gtid.seq_no >= until_gtid->seq_no)))
1089 {
1090 /*
1091 The slave requested to start from a position that is not (yet) in
1092 our binlog, but it also specified an UNTIL condition that _is_ in
1093 our binlog (or a missing UNTIL, which means stop at the very
1094 beginning). So the stop position is before the start position, and
1095 we just delete the entry from the UNTIL hash to mark that this
1096 domain has already reached the UNTIL condition.
1097 */
1098 if(until_gtid)
1099 until_gtid_state->remove(until_gtid);
1100 continue;
1101 }
1102
1103 *error_gtid= *slave_gtid;
1104 give_error_start_pos_missing_in_binlog(&err, errormsg, error_gtid);
1105 goto end;
1106 }
1107
1108 /*
1109 Ok, so connecting slave asked to start at a GTID that we do not have in
1110 our binlog, but it was in fact the last GTID we applied earlier, when we
1111 were acting as a replication slave.
1112
1113 So this means that we were running as a replication slave without
1114 --log-slave-updates, but now we switched to be a master. It is worth it
1115 to handle this special case, as it allows users to run a simple
1116 master -> slave without --log-slave-updates, and then exchange slave and
1117 master, as long as they make sure the slave is caught up before switching.
1118 */
1119
1120 /*
1121 First check if we logged something ourselves as a master after being a
1122 slave. This will be seen as a GTID with our own server_id and bigger
1123 seq_no than what is in the slave state.
1124
1125 If we did not log anything ourselves, then start the connecting slave
1126 replicating from the current binlog end position, which in this case
1127 corresponds to our replication slave state and hence what the connecting
1128 slave is requesting.
1129 */
1130 if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
1131 global_system_variables.server_id,
1132 &start_gtid) &&
1133 start_gtid.seq_no > slave_gtid->seq_no)
1134 {
1135 /*
1136 Start replication within this domain at the first GTID that we logged
1137 ourselves after becoming a master.
1138
1139 Remember that this starting point is in fact a "fake" GTID which may
1140 not exists in the binlog, so that we do not complain about it in
1141 --gtid-strict-mode.
1142 */
1143 slave_gtid->server_id= global_system_variables.server_id;
1144 slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS;
1145 }
1146 else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
1147 &start_gtid))
1148 {
1149 slave_gtid->server_id= start_gtid.server_id;
1150 slave_gtid->seq_no= start_gtid.seq_no;
1151 }
1152 else
1153 {
1154 /*
1155 We do not have _anything_ in our own binlog for this domain. Just
1156 delete the entry in the slave connection state, then it will pick up
1157 anything new that arrives.
1158
1159 We just queue up the deletion and do it later, after the loop, so that
1160 we do not mess up the iteration over the hash.
1161 */
1162 if (!delete_list)
1163 {
1164 if (!(delete_list= (slave_connection_state::entry **)
1165 my_malloc(sizeof(*delete_list) * st->hash.records, MYF(MY_WME))))
1166 {
1167 *errormsg= "Out of memory while checking slave start position";
1168 err= ER_OUT_OF_RESOURCES;
1169 goto end;
1170 }
1171 }
1172 delete_list[delete_idx++]= slave_gtid_entry;
1173 }
1174 }
1175
1176 /* Do any delayed deletes from the hash. */
1177 if (delete_list)
1178 {
1179 for (i= 0; i < delete_idx; ++i)
1180 st->remove(&(delete_list[i]->gtid));
1181 }
1182 err= 0;
1183
1184end:
1185 if (delete_list)
1186 my_free(delete_list);
1187 return err;
1188}
1189
1190/*
1191 Find the name of the binlog file to start reading for a slave that connects
1192 using GTID state.
1193
1194 Returns the file name in out_name, which must be of size at least FN_REFLEN.
1195
1196 Returns NULL on ok, error message on error.
1197
1198 In case of non-error return, the returned binlog file is guaranteed to
1199 contain the first event to be transmitted to the slave for every domain
1200 present in our binlogs. It is still necessary to skip all GTIDs up to
1201 and including the GTID requested by slave within each domain.
1202
1203 However, as a special case, if the event to be sent to the slave is the very
1204 first event (within that domain) in the returned binlog, then nothing should
1205 be skipped, so that domain is deleted from the passed in slave connection
1206 state.
1207
1208 This is necessary in case the slave requests a GTID within a replication
1209 domain that has long been inactive. The binlog file containing that GTID may
1210 have been long since purged. However, as long as no GTIDs after that have
1211 been purged, we have the GTID requested by slave in the Gtid_list_log_event
1212 of the latest binlog. So we can start from there, as long as we delete the
1213 corresponding entry in the slave state so we do not wrongly skip any events
1214 that might turn up if that domain becomes active again, vainly looking for
1215 the requested GTID that was already purged.
1216*/
1217static const char *
1218gtid_find_binlog_file(slave_connection_state *state, char *out_name,
1219 slave_connection_state *until_gtid_state)
1220{
1221 MEM_ROOT memroot;
1222 binlog_file_entry *list;
1223 Gtid_list_log_event *glev= NULL;
1224 const char *errormsg= NULL;
1225 char buf[FN_REFLEN];
1226
1227 init_alloc_root(&memroot, "gtid_find_binlog_file",
1228 10*(FN_REFLEN+sizeof(binlog_file_entry)),
1229 0, MYF(MY_THREAD_SPECIFIC));
1230 if (!(list= get_binlog_list(&memroot)))
1231 {
1232 errormsg= "Out of memory while looking for GTID position in binlog";
1233 goto end;
1234 }
1235
1236 while (list)
1237 {
1238 File file;
1239 IO_CACHE cache;
1240
1241 if (!list->next)
1242 {
1243 /*
1244 It should be safe to read the currently used binlog, as we will only
1245 read the header part that is already written.
1246
1247 But if that does not work on windows, then we will need to cache the
1248 event somewhere in memory I suppose - that could work too.
1249 */
1250 }
1251 /*
1252 Read the Gtid_list_log_event at the start of the binlog file to
1253 get the binlog state.
1254 */
1255 if (normalize_binlog_name(buf, list->name, false))
1256 {
1257 errormsg= "Failed to determine binlog file name while looking for "
1258 "GTID position in binlog";
1259 goto end;
1260 }
1261 bzero((char*) &cache, sizeof(cache));
1262 if (unlikely((file= open_binlog(&cache, buf, &errormsg)) == (File)-1))
1263 goto end;
1264 errormsg= get_gtid_list_event(&cache, &glev);
1265 end_io_cache(&cache);
1266 mysql_file_close(file, MYF(MY_WME));
1267 if (unlikely(errormsg))
1268 goto end;
1269
1270 if (!glev || contains_all_slave_gtid(state, glev))
1271 {
1272 strmake(out_name, buf, FN_REFLEN);
1273
1274 if (glev)
1275 {
1276 uint32 i;
1277
1278 /*
1279 As a special case, we allow to start from binlog file N if the
1280 requested GTID is the last event (in the corresponding domain) in
1281 binlog file (N-1), but then we need to remove that GTID from the slave
1282 state, rather than skipping events waiting for it to turn up.
1283
1284 If slave is doing START SLAVE UNTIL, check for any UNTIL conditions
1285 that are already included in a previous binlog file. Delete any such
1286 from the UNTIL hash, to mark that such domains have already reached
1287 their UNTIL condition.
1288 */
1289 for (i= 0; i < glev->count; ++i)
1290 {
1291 const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
1292 if (!gtid)
1293 {
1294 /*
1295 Contains_all_slave_gtid() returns false if there is any domain in
1296 Gtid_list_event which is not in the requested slave position.
1297
1298 We may delete a domain from the slave state inside this loop, but
1299 we only do this when it is the very last GTID logged for that
1300 domain in earlier binlogs, and then we can not encounter it in any
1301 further GTIDs in the Gtid_list.
1302 */
1303 DBUG_ASSERT(0);
1304 } else if (gtid->server_id == glev->list[i].server_id &&
1305 gtid->seq_no == glev->list[i].seq_no)
1306 {
1307 /*
1308 The slave requested to start from the very beginning of this
1309 domain in this binlog file. So delete the entry from the state,
1310 we do not need to skip anything.
1311 */
1312 state->remove(gtid);
1313 }
1314
1315 if (until_gtid_state &&
1316 (gtid= until_gtid_state->find(glev->list[i].domain_id)) &&
1317 gtid->server_id == glev->list[i].server_id &&
1318 gtid->seq_no <= glev->list[i].seq_no)
1319 {
1320 /*
1321 We've already reached the stop position in UNTIL for this domain,
1322 since it is before the start position.
1323 */
1324 until_gtid_state->remove(gtid);
1325 }
1326 }
1327 }
1328
1329 goto end;
1330 }
1331 delete glev;
1332 glev= NULL;
1333 list= list->next;
1334 }
1335
1336 /* We reached the end without finding anything. */
1337 errormsg= "Could not find GTID state requested by slave in any binlog "
1338 "files. Probably the slave state is too old and required binlog files "
1339 "have been purged.";
1340
1341end:
1342 if (glev)
1343 delete glev;
1344
1345 free_root(&memroot, MYF(0));
1346 return errormsg;
1347}
1348
1349
1350/*
1351 Given an old-style binlog position with file name and file offset, find the
1352 corresponding gtid position. If the offset is not at an event boundary, give
1353 an error.
1354
1355 Return NULL on ok, error message string on error.
1356
1357 ToDo: Improve the performance of this by using binlog index files.
1358*/
1359static const char *
1360gtid_state_from_pos(const char *name, uint32 offset,
1361 slave_connection_state *gtid_state)
1362{
1363 IO_CACHE cache;
1364 File file;
1365 const char *errormsg= NULL;
1366 bool found_gtid_list_event= false;
1367 bool found_format_description_event= false;
1368 bool valid_pos= false;
1369 enum enum_binlog_checksum_alg current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
1370 int err;
1371 String packet;
1372 Format_description_log_event *fdev= NULL;
1373
1374 if (unlikely(gtid_state->load((const rpl_gtid *)NULL, 0)))
1375 {
1376 errormsg= "Internal error (out of memory?) initializing slave state "
1377 "while scanning binlog to find start position";
1378 return errormsg;
1379 }
1380
1381 if (unlikely((file= open_binlog(&cache, name, &errormsg)) == (File)-1))
1382 return errormsg;
1383
1384 if (!(fdev= new Format_description_log_event(3)))
1385 {
1386 errormsg= "Out of memory initializing format_description event "
1387 "while scanning binlog to find start position";
1388 goto end;
1389 }
1390
1391 /*
1392 First we need to find the initial GTID_LIST_EVENT. We need this even
1393 if the offset is at the very start of the binlog file.
1394
1395 But if we do not find any GTID_LIST_EVENT, then this is an old binlog
1396 with no GTID information, so we return empty GTID state.
1397 */
1398 for (;;)
1399 {
1400 Log_event_type typ;
1401 uint32 cur_pos;
1402
1403 cur_pos= (uint32)my_b_tell(&cache);
1404 if (cur_pos == offset)
1405 valid_pos= true;
1406 if (found_format_description_event && found_gtid_list_event &&
1407 cur_pos >= offset)
1408 break;
1409
1410 packet.length(0);
1411 err= Log_event::read_log_event(&cache, &packet, fdev,
1412 opt_master_verify_checksum ? current_checksum_alg
1413 : BINLOG_CHECKSUM_ALG_OFF);
1414 if (unlikely(err))
1415 {
1416 errormsg= "Could not read binlog while searching for slave start "
1417 "position on master";
1418 goto end;
1419 }
1420 /*
1421 The cast to uchar is needed to avoid a signed char being converted to a
1422 negative number.
1423 */
1424 typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
1425 if (typ == FORMAT_DESCRIPTION_EVENT)
1426 {
1427 Format_description_log_event *tmp;
1428
1429 if (unlikely(found_format_description_event))
1430 {
1431 errormsg= "Duplicate format description log event found while "
1432 "searching for old-style position in binlog";
1433 goto end;
1434 }
1435
1436 current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
1437 found_format_description_event= true;
1438 if (unlikely(!(tmp= new Format_description_log_event(packet.ptr(),
1439 packet.length(),
1440 fdev))))
1441 {
1442 errormsg= "Corrupt Format_description event found or out-of-memory "
1443 "while searching for old-style position in binlog";
1444 goto end;
1445 }
1446 delete fdev;
1447 fdev= tmp;
1448 }
1449 else if (typ == START_ENCRYPTION_EVENT)
1450 {
1451 uint sele_len = packet.length();
1452 if (current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
1453 {
1454 sele_len -= BINLOG_CHECKSUM_LEN;
1455 }
1456 Start_encryption_log_event sele(packet.ptr(), sele_len, fdev);
1457 if (fdev->start_decryption(&sele))
1458 {
1459 errormsg= "Could not start decryption of binlog.";
1460 goto end;
1461 }
1462 }
1463 else if (unlikely(typ != FORMAT_DESCRIPTION_EVENT &&
1464 !found_format_description_event))
1465 {
1466 errormsg= "Did not find format description log event while searching "
1467 "for old-style position in binlog";
1468 goto end;
1469 }
1470 else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
1471 typ == BINLOG_CHECKPOINT_EVENT)
1472 continue; /* Continue looking */
1473 else if (typ == GTID_LIST_EVENT)
1474 {
1475 rpl_gtid *gtid_list;
1476 bool status;
1477 uint32 list_len;
1478
1479 if (unlikely(found_gtid_list_event))
1480 {
1481 errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
1482 "to find slave start position";
1483 goto end;
1484 }
1485 status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
1486 current_checksum_alg,
1487 &gtid_list, &list_len, fdev);
1488 if (unlikely(status))
1489 {
1490 errormsg= "Error reading Gtid_list_log_event while searching "
1491 "for old-style position in binlog";
1492 goto end;
1493 }
1494 err= gtid_state->load(gtid_list, list_len);
1495 my_free(gtid_list);
1496 if (unlikely(err))
1497 {
1498 errormsg= "Internal error (out of memory?) initialising slave state "
1499 "while scanning binlog to find start position";
1500 goto end;
1501 }
1502 found_gtid_list_event= true;
1503 }
1504 else if (unlikely(!found_gtid_list_event))
1505 {
1506 /* We did not find any Gtid_list_log_event, must be old binlog. */
1507 goto end;
1508 }
1509 else if (typ == GTID_EVENT)
1510 {
1511 rpl_gtid gtid;
1512 uchar flags2;
1513 if (unlikely(Gtid_log_event::peek(packet.ptr(), packet.length(),
1514 current_checksum_alg, &gtid.domain_id,
1515 &gtid.server_id, &gtid.seq_no, &flags2,
1516 fdev)))
1517 {
1518 errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
1519 "initial slave position";
1520 goto end;
1521 }
1522 if (unlikely(gtid_state->update(&gtid)))
1523 {
1524 errormsg= "Internal error (out of memory?) updating slave state while "
1525 "scanning binlog to find start position";
1526 goto end;
1527 }
1528 }
1529 }
1530
1531 if (unlikely(!valid_pos))
1532 {
1533 errormsg= "Slave requested incorrect position in master binlog. "
1534 "Requested position %u in file '%s', but this position does not "
1535 "correspond to the location of any binlog event.";
1536 }
1537
1538end:
1539 delete fdev;
1540 end_io_cache(&cache);
1541 mysql_file_close(file, MYF(MY_WME));
1542
1543 return errormsg;
1544}
1545
1546
1547int
1548gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
1549{
1550 slave_connection_state gtid_state;
1551 const char *lookup_name;
1552 char name_buf[FN_REFLEN];
1553 LOG_INFO linfo;
1554
1555 if (!mysql_bin_log.is_open())
1556 {
1557 my_error(ER_NO_BINARY_LOGGING, MYF(0));
1558 return 1;
1559 }
1560
1561 if (in_name && in_name[0])
1562 {
1563 mysql_bin_log.make_log_name(name_buf, in_name);
1564 lookup_name= name_buf;
1565 }
1566 else
1567 lookup_name= NULL;
1568 linfo.index_file_offset= 0;
1569 if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1))
1570 return 1;
1571
1572 if (pos < 4)
1573 pos= 4;
1574
1575 if (gtid_state_from_pos(linfo.log_file_name, pos, &gtid_state) ||
1576 gtid_state.to_string(out_str))
1577 return 1;
1578 return 0;
1579}
1580
1581
1582static bool
1583is_until_reached(binlog_send_info *info, ulong *ev_offset,
1584 Log_event_type event_type, const char **errmsg,
1585 uint32 current_pos)
1586{
1587 switch (info->gtid_until_group)
1588 {
1589 case GTID_UNTIL_NOT_DONE:
1590 return false;
1591 case GTID_UNTIL_STOP_AFTER_STANDALONE:
1592 if (Log_event::is_part_of_group(event_type))
1593 return false;
1594 break;
1595 case GTID_UNTIL_STOP_AFTER_TRANSACTION:
1596 if (event_type != XID_EVENT &&
1597 (event_type != QUERY_EVENT || /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
1598 !Query_log_event::peek_is_commit_rollback
1599 (info->packet->ptr()+*ev_offset,
1600 info->packet->length()-*ev_offset,
1601 info->current_checksum_alg)))
1602 return false;
1603 break;
1604 }
1605
1606 /*
1607 The last event group has been sent, now the START SLAVE UNTIL condition
1608 has been reached.
1609
1610 Send a last fake Gtid_list_log_event with a flag set to mark that we
1611 stop due to UNTIL condition.
1612 */
1613 if (reset_transmit_packet(info, info->flags, ev_offset, errmsg))
1614 return true;
1615 Gtid_list_log_event glev(&info->until_binlog_state,
1616 Gtid_list_log_event::FLAG_UNTIL_REACHED);
1617 if (fake_gtid_list_event(info, &glev, errmsg, current_pos))
1618 return true;
1619 *errmsg= NULL;
1620 return true;
1621}
1622
1623
1624/*
1625 Helper function for mysql_binlog_send() to write an event down the slave
1626 connection.
1627
1628 Returns NULL on success, error message string on error.
1629*/
1630static const char *
1631send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
1632 IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid)
1633{
1634 my_off_t pos;
1635 String* const packet= info->packet;
1636 size_t len= packet->length();
1637 int mariadb_slave_capability= info->mariadb_slave_capability;
1638 enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
1639 slave_connection_state *gtid_state= &info->gtid_state;
1640 slave_connection_state *until_gtid_state= info->until_gtid_state;
1641 bool need_sync= false;
1642
1643 if (event_type == GTID_LIST_EVENT &&
1644 info->using_gtid_state && until_gtid_state)
1645 {
1646 rpl_gtid *gtid_list;
1647 uint32 list_len;
1648 bool err;
1649
1650 if (ev_offset > len ||
1651 Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
1652 current_checksum_alg,
1653 &gtid_list, &list_len, info->fdev))
1654 {
1655 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1656 return "Failed to read Gtid_list_log_event: corrupt binlog";
1657 }
1658 err= info->until_binlog_state.load(gtid_list, list_len);
1659 my_free(gtid_list);
1660 if (err)
1661 {
1662 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1663 return "Failed in internal GTID book-keeping: Out of memory";
1664 }
1665 }
1666
1667 /* Skip GTID event groups until we reach slave position within a domain_id. */
1668 if (event_type == GTID_EVENT && info->using_gtid_state)
1669 {
1670 uchar flags2;
1671 slave_connection_state::entry *gtid_entry;
1672 rpl_gtid *gtid;
1673
1674 if (gtid_state->count() > 0 || until_gtid_state)
1675 {
1676 rpl_gtid event_gtid;
1677
1678 if (ev_offset > len ||
1679 Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
1680 current_checksum_alg,
1681 &event_gtid.domain_id, &event_gtid.server_id,
1682 &event_gtid.seq_no, &flags2, info->fdev))
1683 {
1684 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1685 return "Failed to read Gtid_log_event: corrupt binlog";
1686 }
1687
1688 DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
1689 {
1690 rpl_gtid *dbug_gtid;
1691 if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) &&
1692 dbug_gtid->seq_no == 100)
1693 {
1694 DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
1695 DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100");
1696 info->error= ER_UNKNOWN_ERROR;
1697 return "DBUG-injected forced reconnect";
1698 }
1699 });
1700
1701 if (info->until_binlog_state.update_nolock(&event_gtid, false))
1702 {
1703 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1704 return "Failed in internal GTID book-keeping: Out of memory";
1705 }
1706
1707 if (gtid_state->count() > 0)
1708 {
1709 gtid_entry= gtid_state->find_entry(event_gtid.domain_id);
1710 if (gtid_entry != NULL)
1711 {
1712 gtid= &gtid_entry->gtid;
1713 if (gtid_entry->flags & slave_connection_state::START_ON_EMPTY_DOMAIN)
1714 {
1715 rpl_gtid master_gtid;
1716 if (!mysql_bin_log.find_in_binlog_state(gtid->domain_id,
1717 gtid->server_id,
1718 &master_gtid) ||
1719 master_gtid.seq_no < gtid->seq_no)
1720 {
1721 int err;
1722 const char *errormsg;
1723 *error_gtid= *gtid;
1724 give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid);
1725 info->error= err;
1726 return errormsg;
1727 }
1728 gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN;
1729 }
1730
1731 /* Skip this event group if we have not yet reached slave start pos. */
1732 if (event_gtid.server_id != gtid->server_id ||
1733 event_gtid.seq_no <= gtid->seq_no)
1734 info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
1735 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
1736 if (event_gtid.server_id == gtid->server_id &&
1737 event_gtid.seq_no >= gtid->seq_no)
1738 {
1739 if (info->slave_gtid_strict_mode &&
1740 event_gtid.seq_no > gtid->seq_no &&
1741 !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS))
1742 {
1743 /*
1744 In strict mode, it is an error if the slave requests to start
1745 in a "hole" in the master's binlog: a GTID that does not
1746 exist, even though both the prior and subsequent seq_no exists
1747 for same domain_id and server_id.
1748 */
1749 info->error= ER_GTID_START_FROM_BINLOG_HOLE;
1750 *error_gtid= *gtid;
1751 return "The binlog on the master is missing the GTID requested "
1752 "by the slave (even though both a prior and a subsequent "
1753 "sequence number does exist), and GTID strict mode is enabled.";
1754 }
1755
1756 /*
1757 Send a fake Gtid_list event to the slave.
1758 This allows the slave to update its current binlog position
1759 so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
1760 The fake event will be sent at the end of this event group.
1761 */
1762 info->send_fake_gtid_list= true;
1763
1764 /*
1765 Delete this entry if we have reached slave start position (so we
1766 will not skip subsequent events and won't have to look them up
1767 and check).
1768 */
1769 gtid_state->remove(gtid);
1770 }
1771 }
1772 }
1773
1774 if (until_gtid_state)
1775 {
1776 gtid= until_gtid_state->find(event_gtid.domain_id);
1777 if (gtid == NULL)
1778 {
1779 /*
1780 This domain already reached the START SLAVE UNTIL stop condition,
1781 so skip this event group.
1782 */
1783 info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
1784 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
1785 }
1786 else if (event_gtid.server_id == gtid->server_id &&
1787 event_gtid.seq_no >= gtid->seq_no)
1788 {
1789 /*
1790 We have reached the stop condition.
1791 Delete this domain_id from the hash, so we will skip all further
1792 events in this domain and eventually stop when all domains are
1793 done.
1794 */
1795 uint64 until_seq_no= gtid->seq_no;
1796 until_gtid_state->remove(gtid);
1797 if (until_gtid_state->count() == 0)
1798 info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
1799 GTID_UNTIL_STOP_AFTER_STANDALONE :
1800 GTID_UNTIL_STOP_AFTER_TRANSACTION);
1801 if (event_gtid.seq_no > until_seq_no)
1802 {
1803 /*
1804 The GTID in START SLAVE UNTIL condition is missing in our binlog.
1805 This should normally not happen (user error), but since we can be
1806 sure that we are now beyond the position that the UNTIL condition
1807 should be in, we can just stop now. And we also need to skip this
1808 event group (as it is beyond the UNTIL condition).
1809 */
1810 info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
1811 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
1812 }
1813 }
1814 }
1815 }
1816 }
1817
1818 /*
1819 Skip event group if we have not yet reached the correct slave GTID position.
1820
1821 Note that slave that understands GTID can also tolerate holes, so there is
1822 no need to supply dummy event.
1823 */
1824 switch (info->gtid_skip_group)
1825 {
1826 case GTID_SKIP_STANDALONE:
1827 if (!Log_event::is_part_of_group(event_type))
1828 info->gtid_skip_group= GTID_SKIP_NOT;
1829 return NULL;
1830 case GTID_SKIP_TRANSACTION:
1831 if (event_type == XID_EVENT ||
1832 (event_type == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
1833 Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
1834 len - ev_offset,
1835 current_checksum_alg)))
1836 info->gtid_skip_group= GTID_SKIP_NOT;
1837 return NULL;
1838 case GTID_SKIP_NOT:
1839 break;
1840 }
1841
1842 /* Do not send annotate_rows events unless slave requested it. */
1843 if (event_type == ANNOTATE_ROWS_EVENT &&
1844 !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
1845 {
1846 if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
1847 {
1848 /* This slave can tolerate events omitted from the binlog stream. */
1849 return NULL;
1850 }
1851 else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE)
1852 {
1853 /*
1854 The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as
1855 it will not log them in its own binary log). However, it understands the
1856 event and will just ignore it, and it would break if we omitted it,
1857 leaving a hole in the binlog stream. So just send the event as-is.
1858 */
1859 }
1860 else
1861 {
1862 /*
1863 The slave does not understand ANNOTATE_ROWS_EVENT.
1864
1865 Older MariaDB slaves (and MySQL slaves) will break replication if there
1866 are holes in the binlog stream (they will miscompute the binlog offset
1867 and request the wrong position when reconnecting).
1868
1869 So replace the event with a dummy event of the same size that will be
1870 a no-operation on the slave.
1871 */
1872 if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
1873 {
1874 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1875 return "Failed to replace row annotate event with dummy: too small event.";
1876 }
1877 }
1878 }
1879
1880 /*
1881 Replace GTID events with old-style BEGIN events for slaves that do not
1882 understand global transaction IDs. For stand-alone events, where there is
1883 no terminating COMMIT query event, omit the GTID event or replace it with
1884 a dummy event, as appropriate.
1885 */
1886 if (event_type == GTID_EVENT &&
1887 mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)
1888 {
1889 bool need_dummy=
1890 mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES;
1891 bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy,
1892 ev_offset,
1893 current_checksum_alg);
1894 if (err)
1895 {
1896 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1897 return "Failed to replace GTID event with backwards-compatible event: "
1898 "currupt event.";
1899 }
1900 if (!need_dummy)
1901 return NULL;
1902 }
1903
1904 /*
1905 Do not send binlog checkpoint or gtid list events to a slave that does not
1906 understand it.
1907 */
1908 if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
1909 mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) ||
1910 (unlikely(event_type == GTID_LIST_EVENT) &&
1911 mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID))
1912 {
1913 if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
1914 {
1915 /* This slave can tolerate events omitted from the binlog stream. */
1916 return NULL;
1917 }
1918 else
1919 {
1920 /*
1921 The slave does not understand BINLOG_CHECKPOINT_EVENT. Send a dummy
1922 event instead, with same length so slave does not get confused about
1923 binlog positions.
1924 */
1925 if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
1926 {
1927 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1928 return "Failed to replace binlog checkpoint or gtid list event with "
1929 "dummy: too small event.";
1930 }
1931 }
1932 }
1933
1934 /*
1935 Skip events with the @@skip_replication flag set, if slave requested
1936 skipping of such events.
1937 */
1938 if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION)
1939 {
1940 uint16 event_flags= uint2korr(&((*packet)[FLAGS_OFFSET + ev_offset]));
1941
1942 if (event_flags & LOG_EVENT_SKIP_REPLICATION_F)
1943 return NULL;
1944 }
1945
1946 THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
1947
1948 pos= my_b_tell(log);
1949 if (repl_semisync_master.update_sync_header(info->thd,
1950 (uchar*) packet->c_ptr(),
1951 info->log_file_name + info->dirlen,
1952 pos, &need_sync))
1953 {
1954 info->error= ER_UNKNOWN_ERROR;
1955 return "run 'before_send_event' hook failed";
1956 }
1957
1958 if (my_net_write(info->net, (uchar*) packet->ptr(), len))
1959 {
1960 info->error= ER_UNKNOWN_ERROR;
1961 return "Failed on my_net_write()";
1962 }
1963
1964 DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
1965 if (event_type == LOAD_EVENT)
1966 {
1967 if (send_file(info->thd))
1968 {
1969 info->error= ER_UNKNOWN_ERROR;
1970 return "failed in send_file()";
1971 }
1972 }
1973
1974 if (need_sync && repl_semisync_master.flush_net(info->thd, packet->c_ptr()))
1975 {
1976 info->error= ER_UNKNOWN_ERROR;
1977 return "Failed to run hook 'after_send_event'";
1978 }
1979
1980 return NULL; /* Success */
1981}
1982
1983static int check_start_offset(binlog_send_info *info,
1984 const char *log_file_name,
1985 my_off_t pos)
1986{
1987 IO_CACHE log;
1988 File file= -1;
1989
1990 /** check that requested position is inside of file */
1991 if ((file=open_binlog(&log, log_file_name, &info->errmsg)) < 0)
1992 {
1993 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1994 return 1;
1995 }
1996
1997 if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
1998 {
1999 const char* msg= "Client requested master to start replication from "
2000 "impossible position";
2001
2002 info->errmsg= NULL; // don't do further modifications of error_text
2003 snprintf(info->error_text, sizeof(info->error_text),
2004 "%s; the first event '%s' at %lld, "
2005 "the last event read from '%s' at %d, "
2006 "the last byte read from '%s' at %d.",
2007 msg,
2008 my_basename(info->start_log_file_name), pos,
2009 my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE,
2010 my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE);
2011 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2012 goto err;
2013 }
2014
2015err:
2016 end_io_cache(&log);
2017 mysql_file_close(file, MYF(MY_WME));
2018 return info->error;
2019}
2020
2021static int init_binlog_sender(binlog_send_info *info,
2022 LOG_INFO *linfo,
2023 const char *log_ident,
2024 my_off_t *pos)
2025{
2026 THD *thd= info->thd;
2027 int error;
2028 char str_buf[128];
2029 String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
2030 char str_buf2[128];
2031 String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
2032 connect_gtid_state.length(0);
2033
2034 /** save start file/pos that was requested by slave */
2035 strmake(info->start_log_file_name, log_ident,
2036 sizeof(info->start_log_file_name));
2037 info->start_pos= *pos;
2038
2039 /** init last pos */
2040 info->last_pos= *pos;
2041
2042 info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
2043 info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
2044 info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
2045 DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
2046 info->using_gtid_state= false;);
2047
2048 if (info->using_gtid_state)
2049 {
2050 info->slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
2051 info->slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
2052 if (get_slave_until_gtid(thd, &slave_until_gtid_str))
2053 info->until_gtid_state= &info->until_gtid_state_obj;
2054 }
2055
2056 DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
2057 {
2058 DBUG_SET("-d,binlog_force_reconnect_after_22_events");
2059 DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events");
2060 info->dbug_reconnect_counter= 22;
2061 });
2062
2063 if (global_system_variables.log_warnings > 1)
2064 sql_print_information(
2065 "Start binlog_dump to slave_server(%lu), pos(%s, %lu)",
2066 thd->variables.server_id, log_ident, (ulong)*pos);
2067
2068#ifndef DBUG_OFF
2069 if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
2070 {
2071 info->errmsg= "Master failed COM_BINLOG_DUMP to test if slave can recover";
2072 info->error= ER_UNKNOWN_ERROR;
2073 return 1;
2074 }
2075#endif
2076
2077 if (!mysql_bin_log.is_open())
2078 {
2079 info->errmsg= "Binary log is not open";
2080 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2081 return 1;
2082 }
2083
2084 char search_file_name[FN_REFLEN];
2085 const char *name=search_file_name;
2086 if (info->using_gtid_state)
2087 {
2088 if (info->gtid_state.load(connect_gtid_state.c_ptr_quick(),
2089 connect_gtid_state.length()))
2090 {
2091 info->errmsg= "Out of memory or malformed slave request when obtaining "
2092 "start position from GTID state";
2093 info->error= ER_UNKNOWN_ERROR;
2094 return 1;
2095 }
2096 if (info->until_gtid_state &&
2097 info->until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
2098 slave_until_gtid_str.length()))
2099 {
2100 info->errmsg= "Out of memory or malformed slave request when "
2101 "obtaining UNTIL position sent from slave";
2102 info->error= ER_UNKNOWN_ERROR;
2103 return 1;
2104 }
2105 if (unlikely((error= check_slave_start_position(info, &info->errmsg,
2106 &info->error_gtid))))
2107 {
2108 info->error= error;
2109 return 1;
2110 }
2111 if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state,
2112 search_file_name,
2113 info->until_gtid_state)))
2114 {
2115 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2116 return 1;
2117 }
2118
2119 /* start from beginning of binlog file */
2120 *pos = 4;
2121 }
2122 else
2123 {
2124 if (log_ident[0])
2125 mysql_bin_log.make_log_name(search_file_name, log_ident);
2126 else
2127 name=0; // Find first log
2128 }
2129 linfo->index_file_offset= 0;
2130
2131 if (mysql_bin_log.find_log_pos(linfo, name, 1))
2132 {
2133 info->errmsg= "Could not find first log file name in binary "
2134 "log index file";
2135 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2136 return 1;
2137 }
2138
2139 // set current pos too
2140 linfo->pos= *pos;
2141
2142 // note: publish that we use file, before we open it
2143 thd->current_linfo= linfo;
2144
2145 if (check_start_offset(info, linfo->log_file_name, *pos))
2146 return 1;
2147
2148 if (*pos > BIN_LOG_HEADER_SIZE)
2149 {
2150 /*
2151 mark that first format descriptor with "log_pos=0", so the slave
2152 should not increment master's binlog position
2153 (rli->group_master_log_pos)
2154 */
2155 info->clear_initial_log_pos= true;
2156 }
2157
2158 return 0;
2159}
2160
2161/**
2162 * send format descriptor event for one binlog file
2163 */
2164static int send_format_descriptor_event(binlog_send_info *info, IO_CACHE *log,
2165 LOG_INFO *linfo, my_off_t start_pos)
2166{
2167 int error;
2168 ulong ev_offset;
2169 THD *thd= info->thd;
2170 String *packet= info->packet;
2171 Log_event_type event_type;
2172 DBUG_ENTER("send_format_descriptor_event");
2173
2174 /**
2175 * 1) reset fdev before each log-file
2176 * 2) read first event, should be the format descriptor
2177 * 3) read second event, *might* be start encryption event
2178 * if it's isn't, seek back to undo this read
2179 */
2180 if (info->fdev != NULL)
2181 delete info->fdev;
2182
2183 if (!(info->fdev= new Format_description_log_event(3)))
2184 {
2185 info->errmsg= "Out of memory initializing format_description event";
2186 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2187 DBUG_RETURN(1);
2188 }
2189
2190 /* reset transmit packet for the event read from binary log file */
2191 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
2192 DBUG_RETURN(1);
2193
2194 /*
2195 Try to find a Format_description_log_event at the beginning of
2196 the binlog
2197 */
2198 info->last_pos= my_b_tell(log);
2199 error= Log_event::read_log_event(log, packet, info->fdev,
2200 opt_master_verify_checksum
2201 ? info->current_checksum_alg
2202 : BINLOG_CHECKSUM_ALG_OFF);
2203 linfo->pos= my_b_tell(log);
2204
2205 if (unlikely(error))
2206 {
2207 set_read_error(info, error);
2208 DBUG_RETURN(1);
2209 }
2210
2211 event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
2212
2213 /*
2214 The packet has offsets equal to the normal offsets in a
2215 binlog event + ev_offset (the first ev_offset characters are
2216 the header (default \0)).
2217 */
2218 DBUG_PRINT("info",
2219 ("Looked for a Format_description_log_event, "
2220 "found event type %d", (int)event_type));
2221
2222 if (event_type != FORMAT_DESCRIPTION_EVENT)
2223 {
2224 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2225 info->errmsg= "Failed to find format descriptor event in start of binlog";
2226 sql_print_warning("Failed to find format descriptor event in "
2227 "start of binlog: %s",
2228 info->log_file_name);
2229 DBUG_RETURN(1);
2230 }
2231
2232 info->current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
2233 packet->length() - ev_offset);
2234
2235 DBUG_ASSERT(info->current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
2236 info->current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
2237 info->current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
2238
2239 if (!is_slave_checksum_aware(thd) &&
2240 info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
2241 info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
2242 {
2243 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2244 info->errmsg= "Slave can not handle replication events with the "
2245 "checksum that master is configured to log";
2246 sql_print_warning("Master is configured to log replication events "
2247 "with checksum, but will not send such events to "
2248 "slaves that cannot process them");
2249 DBUG_RETURN(1);
2250 }
2251
2252 uint ev_len= packet->length() - ev_offset;
2253 if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
2254 ev_len-= BINLOG_CHECKSUM_LEN;
2255
2256 Format_description_log_event *tmp;
2257 if (!(tmp= new Format_description_log_event(packet->ptr() + ev_offset,
2258 ev_len, info->fdev)))
2259 {
2260 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2261 info->errmsg= "Corrupt Format_description event found "
2262 "or out-of-memory";
2263 DBUG_RETURN(1);
2264 }
2265 delete info->fdev;
2266 info->fdev= tmp;
2267
2268 (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
2269
2270 if (info->clear_initial_log_pos)
2271 {
2272 info->clear_initial_log_pos= false;
2273 /*
2274 mark that this event with "log_pos=0", so the slave
2275 should not increment master's binlog position
2276 (rli->group_master_log_pos)
2277 */
2278 int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0);
2279
2280 /*
2281 if reconnect master sends FD event with `created' as 0
2282 to avoid destroying temp tables.
2283 */
2284 int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
2285 ST_CREATED_OFFSET+ev_offset, (ulong) 0);
2286
2287 /* fix the checksum due to latest changes in header */
2288 if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
2289 info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
2290 fix_checksum(packet, ev_offset);
2291 }
2292 else if (info->using_gtid_state)
2293 {
2294 /*
2295 If this event has the field `created' set, then it will cause the
2296 slave to delete all active temporary tables. This must not happen
2297 if the slave received any later GTIDs in a previous connect, as
2298 those GTIDs might have created new temporary tables that are still
2299 needed.
2300
2301 So here, we check if the starting GTID position was already
2302 reached before this format description event. If not, we clear the
2303 `created' flag to preserve temporary tables on the slave. (If the
2304 slave connects at a position past this event, it means that it
2305 already received and handled it in a previous connect).
2306 */
2307 if (!info->gtid_state.is_pos_reached())
2308 {
2309 int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
2310 ST_CREATED_OFFSET+ev_offset, (ulong) 0);
2311 if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
2312 info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
2313 fix_checksum(packet, ev_offset);
2314 }
2315 }
2316
2317 /* send it */
2318 if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length()))
2319 {
2320 info->errmsg= "Failed on my_net_write()";
2321 info->error= ER_UNKNOWN_ERROR;
2322 DBUG_RETURN(1);
2323 }
2324
2325 /*
2326 Read the following Start_encryption_log_event but don't send it to slave.
2327 Slave doesn't need to know whether master's binlog is encrypted,
2328 and if it'll want to encrypt its logs, it should generate its own
2329 random nonce, not use the one from the master.
2330 */
2331 packet->length(0);
2332 info->last_pos= linfo->pos;
2333 error= Log_event::read_log_event(log, packet, info->fdev,
2334 opt_master_verify_checksum
2335 ? info->current_checksum_alg
2336 : BINLOG_CHECKSUM_ALG_OFF);
2337 linfo->pos= my_b_tell(log);
2338
2339 if (unlikely(error))
2340 {
2341 set_read_error(info, error);
2342 DBUG_RETURN(1);
2343 }
2344
2345 event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET]);
2346 if (event_type == START_ENCRYPTION_EVENT)
2347 {
2348 Start_encryption_log_event *sele= (Start_encryption_log_event *)
2349 Log_event::read_log_event(packet->ptr(), packet->length(), &info->errmsg,
2350 info->fdev, BINLOG_CHECKSUM_ALG_OFF);
2351 if (!sele)
2352 {
2353 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2354 DBUG_RETURN(1);
2355 }
2356
2357 if (info->fdev->start_decryption(sele))
2358 {
2359 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2360 info->errmsg= "Could not decrypt binlog: encryption key error";
2361 delete sele;
2362 DBUG_RETURN(1);
2363 }
2364 delete sele;
2365 }
2366 else if (start_pos == BIN_LOG_HEADER_SIZE)
2367 {
2368 /*
2369 not Start_encryption_log_event - seek back. But only if
2370 send_one_binlog_file() isn't going to seek anyway
2371 */
2372 my_b_seek(log, info->last_pos);
2373 linfo->pos= info->last_pos;
2374 }
2375
2376
2377 /** all done */
2378 DBUG_RETURN(0);
2379}
2380
2381static bool should_stop(binlog_send_info *info)
2382{
2383 return
2384 info->net->error ||
2385 info->net->vio == NULL ||
2386 info->thd->killed ||
2387 info->error != 0 ||
2388 info->should_stop;
2389}
2390
2391/**
2392 * wait for new events to enter binlog
2393 * this function will send heartbeats while waiting if so configured
2394 */
2395static int wait_new_events(binlog_send_info *info, /* in */
2396 LOG_INFO* linfo, /* in */
2397 char binlog_end_pos_filename[], /* out */
2398 my_off_t *end_pos_ptr) /* out */
2399{
2400 int ret= 1;
2401 PSI_stage_info old_stage;
2402
2403 mysql_bin_log.lock_binlog_end_pos();
2404 info->thd->ENTER_COND(mysql_bin_log.get_bin_log_cond(),
2405 mysql_bin_log.get_binlog_end_pos_lock(),
2406 &stage_master_has_sent_all_binlog_to_slave,
2407 &old_stage);
2408
2409 while (!should_stop(info))
2410 {
2411 *end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
2412 if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0)
2413 {
2414 /* there has been a log file switch, we don't need to wait */
2415 ret= 0;
2416 break;
2417 }
2418
2419 if (linfo->pos < *end_pos_ptr)
2420 {
2421 /* there is data to read, we don't need to wait */
2422 ret= 0;
2423 break;
2424 }
2425
2426 if (info->heartbeat_period)
2427 {
2428 struct timespec ts;
2429 set_timespec_nsec(ts, info->heartbeat_period);
2430 ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, &ts);
2431 if (ret == ETIMEDOUT || ret == ETIME)
2432 {
2433 struct event_coordinates coord = { linfo->log_file_name, linfo->pos };
2434#ifndef DBUG_OFF
2435 const ulong hb_info_counter_limit = 3;
2436 if (info->hb_info_counter < hb_info_counter_limit)
2437 {
2438 sql_print_information("master sends heartbeat message %s:%llu",
2439 linfo->log_file_name, linfo->pos);
2440 info->hb_info_counter++;
2441 if (info->hb_info_counter == hb_info_counter_limit)
2442 sql_print_information("the rest of heartbeat info skipped ...");
2443 }
2444#endif
2445 mysql_bin_log.unlock_binlog_end_pos();
2446 ret= send_heartbeat_event(info,
2447 info->net, info->packet, &coord,
2448 info->current_checksum_alg);
2449 mysql_bin_log.lock_binlog_end_pos();
2450
2451 if (ret)
2452 {
2453 ret= 1; // error
2454 break;
2455 }
2456 /**
2457 * re-read heartbeat period after each sent
2458 */
2459 info->heartbeat_period= get_heartbeat_period(info->thd);
2460 }
2461 else if (ret != 0)
2462 {
2463 ret= 1; // error
2464 break;
2465 }
2466 }
2467 else
2468 {
2469 ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL);
2470 if (ret != 0 && ret != ETIMEDOUT && ret != ETIME)
2471 {
2472 ret= 1; // error
2473 break;
2474 }
2475 }
2476 }
2477
2478 /* it releases the lock set in ENTER_COND */
2479 info->thd->EXIT_COND(&old_stage);
2480 return ret;
2481}
2482
2483/**
2484 * get end pos of current log file, this function
2485 * will wait if there is nothing available
2486 */
2487static my_off_t get_binlog_end_pos(binlog_send_info *info,
2488 IO_CACHE* log,
2489 LOG_INFO* linfo)
2490{
2491 my_off_t log_pos= my_b_tell(log);
2492
2493 /**
2494 * get current binlog end pos
2495 */
2496 mysql_bin_log.lock_binlog_end_pos();
2497 char binlog_end_pos_filename[FN_REFLEN];
2498 my_off_t end_pos= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
2499 mysql_bin_log.unlock_binlog_end_pos();
2500
2501 do
2502 {
2503 if (strcmp(binlog_end_pos_filename, linfo->log_file_name) != 0)
2504 {
2505 /**
2506 * this file is not active, since it's not written to again,
2507 * it safe to check file length and use that as end_pos
2508 */
2509 end_pos= my_b_filelength(log);
2510
2511 if (log_pos == end_pos)
2512 return 0; // already at end of file inactive file
2513 else
2514 return end_pos; // return size of inactive file
2515 }
2516 else
2517 {
2518 /**
2519 * this is the active file
2520 */
2521
2522 if (log_pos < end_pos)
2523 {
2524 /**
2525 * there is data available to read
2526 */
2527 return end_pos;
2528 }
2529
2530 /**
2531 * check if we should wait for more data
2532 */
2533 if ((info->flags & BINLOG_DUMP_NON_BLOCK) ||
2534 (info->thd->variables.server_id == 0))
2535 {
2536 info->should_stop= true;
2537 return 0;
2538 }
2539
2540 /**
2541 * flush data before waiting
2542 */
2543 if (net_flush(info->net))
2544 {
2545 info->errmsg= "failed on net_flush()";
2546 info->error= ER_UNKNOWN_ERROR;
2547 return 1;
2548 }
2549
2550 if (wait_new_events(info, linfo, binlog_end_pos_filename, &end_pos))
2551 return 1;
2552 }
2553 } while (!should_stop(info));
2554
2555 return 0;
2556}
2557
2558/**
2559 * This function sends events from one binlog file
2560 * but only up until end_pos
2561 *
2562 * return 0 - OK
2563 * else NOK
2564 */
2565static int send_events(binlog_send_info *info, IO_CACHE* log, LOG_INFO* linfo,
2566 my_off_t end_pos)
2567{
2568 int error;
2569 ulong ev_offset;
2570
2571 String *packet= info->packet;
2572 linfo->pos= my_b_tell(log);
2573 info->last_pos= my_b_tell(log);
2574
2575 while (linfo->pos < end_pos)
2576 {
2577 if (should_stop(info))
2578 return 0;
2579
2580 /* reset the transmit packet for the event read from binary log
2581 file */
2582 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
2583 return 1;
2584
2585 info->last_pos= linfo->pos;
2586 error= Log_event::read_log_event(log, packet, info->fdev,
2587 opt_master_verify_checksum ? info->current_checksum_alg
2588 : BINLOG_CHECKSUM_ALG_OFF);
2589 linfo->pos= my_b_tell(log);
2590
2591 if (unlikely(error))
2592 {
2593 set_read_error(info, error);
2594 return 1;
2595 }
2596
2597 Log_event_type event_type=
2598 (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
2599
2600#ifndef DBUG_OFF
2601 if (info->dbug_reconnect_counter > 0)
2602 {
2603 --info->dbug_reconnect_counter;
2604 if (info->dbug_reconnect_counter == 0)
2605 {
2606 info->errmsg= "DBUG-injected forced reconnect";
2607 info->error= ER_UNKNOWN_ERROR;
2608 return 1;
2609 }
2610 }
2611#endif
2612
2613#ifdef ENABLED_DEBUG_SYNC
2614 DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
2615 {
2616 if (event_type == XID_EVENT)
2617 {
2618 net_flush(info->net);
2619 const char act[]=
2620 "now "
2621 "wait_for signal.continue";
2622 DBUG_ASSERT(debug_sync_service);
2623 DBUG_ASSERT(!debug_sync_set_action(
2624 info->thd,
2625 STRING_WITH_LEN(act)));
2626
2627 const char act2[]=
2628 "now "
2629 "signal signal.continued";
2630 DBUG_ASSERT(!debug_sync_set_action(
2631 info->thd,
2632 STRING_WITH_LEN(act2)));
2633 }
2634 });
2635#endif
2636
2637 if (event_type != START_ENCRYPTION_EVENT &&
2638 ((info->errmsg= send_event_to_slave(info, event_type, log,
2639 ev_offset, &info->error_gtid))))
2640 return 1;
2641
2642 if (unlikely(info->send_fake_gtid_list) &&
2643 info->gtid_skip_group == GTID_SKIP_NOT)
2644 {
2645 Gtid_list_log_event glev(&info->until_binlog_state, 0);
2646
2647 if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) ||
2648 fake_gtid_list_event(info, &glev, &info->errmsg, (uint32)my_b_tell(log)))
2649 {
2650 info->error= ER_UNKNOWN_ERROR;
2651 return 1;
2652 }
2653 info->send_fake_gtid_list= false;
2654 }
2655
2656 if (info->until_gtid_state &&
2657 is_until_reached(info, &ev_offset, event_type, &info->errmsg,
2658 (uint32)my_b_tell(log)))
2659 {
2660 if (info->errmsg)
2661 {
2662 info->error= ER_UNKNOWN_ERROR;
2663 return 1;
2664 }
2665 info->should_stop= true;
2666 return 0;
2667 }
2668
2669 /* Abort server before it sends the XID_EVENT */
2670 DBUG_EXECUTE_IF("crash_before_send_xid",
2671 {
2672 if (event_type == XID_EVENT)
2673 {
2674 my_sleep(2000000);
2675 DBUG_SUICIDE();
2676 }
2677 });
2678 }
2679
2680 return 0;
2681}
2682
2683/**
2684 * This function sends one binlog file to slave
2685 *
2686 * return 0 - OK
2687 * 1 - NOK
2688 */
2689static int send_one_binlog_file(binlog_send_info *info,
2690 IO_CACHE* log,
2691 LOG_INFO* linfo,
2692 my_off_t start_pos)
2693{
2694 mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock());
2695
2696 /* seek to the requested position, to start the requested dump */
2697 if (start_pos != BIN_LOG_HEADER_SIZE)
2698 {
2699 my_b_seek(log, start_pos);
2700 linfo->pos= start_pos;
2701 }
2702
2703 while (!should_stop(info))
2704 {
2705 /**
2706 * get end pos of current log file, this function
2707 * will wait if there is nothing available
2708 */
2709 my_off_t end_pos= get_binlog_end_pos(info, log, linfo);
2710 if (end_pos <= 1)
2711 {
2712 /** end of file or error */
2713 return (int)end_pos;
2714 }
2715 info->dirlen= dirname_length(info->log_file_name);
2716 /**
2717 * send events from current position up to end_pos
2718 */
2719 if (send_events(info, log, linfo, end_pos))
2720 return 1;
2721 }
2722
2723 return 1;
2724}
2725
2726void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
2727 ushort flags)
2728{
2729 LOG_INFO linfo;
2730
2731 IO_CACHE log;
2732 File file = -1;
2733 String* const packet= &thd->packet;
2734
2735 binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
2736 binlog_send_info *info= &infoobj;
2737 bool has_transmit_started= false;
2738
2739 int old_max_allowed_packet= thd->variables.max_allowed_packet;
2740 thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
2741
2742 DBUG_ENTER("mysql_binlog_send");
2743 DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
2744
2745 bzero((char*) &log,sizeof(log));
2746
2747 if (init_binlog_sender(info, &linfo, log_ident, &pos))
2748 goto err;
2749
2750 has_transmit_started= true;
2751
2752 /* Check if the dump thread is created by a slave with semisync enabled. */
2753 thd->semi_sync_slave = is_semi_sync_slave();
2754 if (repl_semisync_master.dump_start(thd, log_ident, pos))
2755 {
2756 info->errmsg= "Failed to run hook 'transmit_start'";
2757 info->error= ER_UNKNOWN_ERROR;
2758 goto err;
2759 }
2760
2761 /*
2762 heartbeat_period from @master_heartbeat_period user variable
2763 NOTE: this is initialized after transmit_start-hook so that
2764 the hook can affect value of heartbeat period
2765 */
2766 info->heartbeat_period= get_heartbeat_period(thd);
2767
2768 while (!should_stop(info))
2769 {
2770 /*
2771 Tell the client about the log name with a fake Rotate event;
2772 this is needed even if we also send a Format_description_log_event
2773 just after, because that event does not contain the binlog's name.
2774 Note that as this Rotate event is sent before
2775 Format_description_log_event, the slave cannot have any info to
2776 understand this event's format, so the header len of
2777 Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
2778 than other events except FORMAT_DESCRIPTION_EVENT).
2779 Before 4.0.14 we called fake_rotate_event below only if (pos ==
2780 BIN_LOG_HEADER_SIZE), because if this is false then the slave
2781 already knows the binlog's name.
2782 Since, we always call fake_rotate_event; if the slave already knew
2783 the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
2784 useless but does not harm much. It is nice for 3.23 (>=.58) slaves
2785 which test Rotate events to see if the master is 4.0 (then they
2786 choose to stop because they can't replicate 4.0); by always calling
2787 fake_rotate_event we are sure that 3.23.58 and newer will detect the
2788 problem as soon as replication starts (BUG#198).
2789 Always calling fake_rotate_event makes sending of normal
2790 (=from-binlog) Rotate events a priori unneeded, but it is not so
2791 simple: the 2 Rotate events are not equivalent, the normal one is
2792 before the Stop event, the fake one is after. If we don't send the
2793 normal one, then the Stop event will be interpreted (by existing 4.0
2794 slaves) as "the master stopped", which is wrong. So for safety,
2795 given that we want minimum modification of 4.0, we send the normal
2796 and fake Rotates.
2797 */
2798 if (fake_rotate_event(info, pos, &info->errmsg, info->current_checksum_alg))
2799 {
2800 /*
2801 This error code is not perfect, as fake_rotate_event() does not
2802 read anything from the binlog; if it fails it's because of an
2803 error in my_net_write(), fortunately it will say so in errmsg.
2804 */
2805 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2806 goto err;
2807 }
2808
2809 if ((file=open_binlog(&log, linfo.log_file_name, &info->errmsg)) < 0)
2810 {
2811 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2812 goto err;
2813 }
2814
2815 if (send_format_descriptor_event(info, &log, &linfo, pos))
2816 {
2817 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2818 goto err;
2819 }
2820
2821 /*
2822 We want to corrupt the first event that will be sent to the slave.
2823 But we do not want the corruption to happen early, eg. when client does
2824 BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to
2825 set the real DBUG injection here.
2826 */
2827 DBUG_EXECUTE_IF("corrupt_read_log_event2_set",
2828 {
2829 DBUG_SET("-d,corrupt_read_log_event2_set");
2830 DBUG_SET("+d,corrupt_read_log_event2");
2831 });
2832
2833 /*
2834 Handle the case of START SLAVE UNTIL with an UNTIL condition already
2835 fulfilled at the start position.
2836
2837 We will send one event, the format_description, and then stop.
2838 */
2839 if (info->until_gtid_state && info->until_gtid_state->count() == 0)
2840 info->gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
2841
2842 THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
2843 if (send_one_binlog_file(info, &log, &linfo, pos))
2844 break;
2845
2846 if (should_stop(info))
2847 break;
2848
2849 DBUG_EXECUTE_IF("wait_after_binlog_EOF",
2850 {
2851 const char act[]= "now wait_for signal.rotate_finished";
2852 DBUG_ASSERT(!debug_sync_set_action(current_thd,
2853 STRING_WITH_LEN(act)));
2854 };);
2855
2856 THD_STAGE_INFO(thd,
2857 stage_finished_reading_one_binlog_switching_to_next_binlog);
2858 if (mysql_bin_log.find_next_log(&linfo, 1))
2859 {
2860 info->errmsg= "could not find next log";
2861 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2862 break;
2863 }
2864
2865 /** start from start of next file */
2866 pos= BIN_LOG_HEADER_SIZE;
2867
2868 /** close current cache/file */
2869 end_io_cache(&log);
2870 mysql_file_close(file, MYF(MY_WME));
2871 file= -1;
2872 }
2873
2874err:
2875 THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
2876 if (has_transmit_started)
2877 {
2878 repl_semisync_master.dump_end(thd);
2879 }
2880
2881 if (info->thd->killed == KILL_SLAVE_SAME_ID)
2882 {
2883 info->errmsg= "A slave with the same server_uuid/server_id as this slave "
2884 "has connected to the master";
2885 info->error= ER_SLAVE_SAME_ID;
2886 }
2887
2888 const bool binlog_open = my_b_inited(&log);
2889 if (file >= 0)
2890 {
2891 end_io_cache(&log);
2892 mysql_file_close(file, MYF(MY_WME));
2893 }
2894
2895 thd->reset_current_linfo();
2896 thd->variables.max_allowed_packet= old_max_allowed_packet;
2897 delete info->fdev;
2898
2899 if (likely(info->error == 0))
2900 {
2901 my_eof(thd);
2902 DBUG_VOID_RETURN;
2903 }
2904
2905 if ((info->error == ER_MASTER_FATAL_ERROR_READING_BINLOG ||
2906 info->error == ER_SLAVE_SAME_ID) && binlog_open)
2907 {
2908 /*
2909 detailing the fatal error message with coordinates
2910 of the last position read.
2911 */
2912 my_snprintf(info->error_text, sizeof(info->error_text),
2913 "%s; the first event '%s' at %lld, "
2914 "the last event read from '%s' at %lld, "
2915 "the last byte read from '%s' at %lld.",
2916 info->errmsg,
2917 my_basename(info->start_log_file_name), info->start_pos,
2918 my_basename(info->log_file_name), info->last_pos,
2919 my_basename(info->log_file_name), linfo.pos);
2920 }
2921 else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
2922 {
2923 my_snprintf(info->error_text, sizeof(info->error_text),
2924 "Error: connecting slave requested to start from GTID "
2925 "%u-%u-%llu, which is not in the master's binlog",
2926 info->error_gtid.domain_id,
2927 info->error_gtid.server_id,
2928 info->error_gtid.seq_no);
2929 /* Use this error code so slave will know not to try reconnect. */
2930 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2931 }
2932 else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2)
2933 {
2934 my_snprintf(info->error_text, sizeof(info->error_text),
2935 "Error: connecting slave requested to start from GTID "
2936 "%u-%u-%llu, which is not in the master's binlog. Since the "
2937 "master's binlog contains GTIDs with higher sequence numbers, "
2938 "it probably means that the slave has diverged due to "
2939 "executing extra erroneous transactions",
2940 info->error_gtid.domain_id,
2941 info->error_gtid.server_id,
2942 info->error_gtid.seq_no);
2943 /* Use this error code so slave will know not to try reconnect. */
2944 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2945 }
2946 else if (info->error == ER_GTID_START_FROM_BINLOG_HOLE)
2947 {
2948 my_snprintf(info->error_text, sizeof(info->error_text),
2949 "The binlog on the master is missing the GTID %u-%u-%llu "
2950 "requested by the slave (even though both a prior and a "
2951 "subsequent sequence number does exist), and GTID strict mode "
2952 "is enabled",
2953 info->error_gtid.domain_id,
2954 info->error_gtid.server_id,
2955 info->error_gtid.seq_no);
2956 /* Use this error code so slave will know not to try reconnect. */
2957 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2958 }
2959 else if (info->error == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
2960 {
2961 my_snprintf(info->error_text, sizeof(info->error_text),
2962 "Failed to load replication slave GTID state from table %s.%s",
2963 "mysql", rpl_gtid_slave_state_table_name.str);
2964 info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
2965 }
2966 else if (info->errmsg != NULL)
2967 strcpy(info->error_text, info->errmsg);
2968
2969 my_message(info->error, info->error_text, MYF(0));
2970
2971 DBUG_VOID_RETURN;
2972}
2973
2974
2975/**
2976 Execute a START SLAVE statement.
2977
2978 @param thd Pointer to THD object for the client thread executing the
2979 statement.
2980
2981 @param mi Pointer to Master_info object for the slave's IO thread.
2982
2983 @param net_report If true, saves the exit status into thd->stmt_da.
2984
2985 @retval 0 success
2986 @retval 1 error
2987 @retval -1 fatal error
2988*/
2989
2990int start_slave(THD* thd , Master_info* mi, bool net_report)
2991{
2992 int slave_errno= 0;
2993 int thread_mask;
2994 char master_info_file_tmp[FN_REFLEN];
2995 char relay_log_info_file_tmp[FN_REFLEN];
2996 DBUG_ENTER("start_slave");
2997
2998 if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
2999 DBUG_RETURN(-1);
3000
3001 create_logfile_name_with_suffix(master_info_file_tmp,
3002 sizeof(master_info_file_tmp),
3003 master_info_file, 0,
3004 &mi->cmp_connection_name);
3005 create_logfile_name_with_suffix(relay_log_info_file_tmp,
3006 sizeof(relay_log_info_file_tmp),
3007 relay_log_info_file, 0,
3008 &mi->cmp_connection_name);
3009
3010 mi->lock_slave_threads();
3011 if (mi->killed)
3012 {
3013 /* connection was deleted while we waited for lock_slave_threads */
3014 mi->unlock_slave_threads();
3015 my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length,
3016 mi->connection_name.str);
3017 DBUG_RETURN(-1);
3018 }
3019
3020 // Get a mask of _stopped_ threads
3021 init_thread_mask(&thread_mask,mi,1 /* inverse */);
3022
3023 if (thd->lex->mi.gtid_pos_str.str)
3024 {
3025 if (thread_mask != (SLAVE_IO|SLAVE_SQL))
3026 {
3027 slave_errno= ER_SLAVE_WAS_RUNNING;
3028 goto err;
3029 }
3030 if (thd->lex->slave_thd_opt)
3031 {
3032 slave_errno= ER_BAD_SLAVE_UNTIL_COND;
3033 goto err;
3034 }
3035 if (mi->using_gtid == Master_info::USE_GTID_NO)
3036 {
3037 slave_errno= ER_UNTIL_REQUIRES_USING_GTID;
3038 goto err;
3039 }
3040 }
3041
3042 /*
3043 Below we will start all stopped threads. But if the user wants to
3044 start only one thread, do as if the other thread was running (as we
3045 don't wan't to touch the other thread), so set the bit to 0 for the
3046 other thread
3047 */
3048 if (thd->lex->slave_thd_opt)
3049 thread_mask&= thd->lex->slave_thd_opt;
3050 if (thread_mask) //some threads are stopped, start them
3051 {
3052 if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0,
3053 thread_mask))
3054 slave_errno=ER_MASTER_INFO;
3055 else if (!*mi->host)
3056 {
3057 slave_errno= ER_BAD_SLAVE; net_report= 0;
3058 my_message(slave_errno, "Misconfigured slave: MASTER_HOST was not set; Fix in config file or with CHANGE MASTER TO",
3059 MYF(0));
3060 }
3061 else
3062 {
3063 /*
3064 If we will start SQL thread we will care about UNTIL options If
3065 not and they are specified we will ignore them and warn user
3066 about this fact.
3067 */
3068 if (thread_mask & SLAVE_SQL)
3069 {
3070 mysql_mutex_lock(&mi->rli.data_lock);
3071
3072 if (thd->lex->mi.pos)
3073 {
3074 if (thd->lex->mi.relay_log_pos)
3075 slave_errno=ER_BAD_SLAVE_UNTIL_COND;
3076 mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
3077 mi->rli.until_log_pos= thd->lex->mi.pos;
3078 /*
3079 We don't check thd->lex->mi.log_file_name for NULL here
3080 since it is checked in sql_yacc.yy
3081 */
3082 strmake_buf(mi->rli.until_log_name, thd->lex->mi.log_file_name);
3083 }
3084 else if (thd->lex->mi.relay_log_pos)
3085 {
3086 mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
3087 mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
3088 strmake_buf(mi->rli.until_log_name, thd->lex->mi.relay_log_name);
3089 }
3090 else if (thd->lex->mi.gtid_pos_str.str)
3091 {
3092 if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str,
3093 thd->lex->mi.gtid_pos_str.length))
3094 {
3095 slave_errno= ER_INCORRECT_GTID_STATE;
3096 mysql_mutex_unlock(&mi->rli.data_lock);
3097 goto err;
3098 }
3099 mi->rli.until_condition= Relay_log_info::UNTIL_GTID;
3100 }
3101 else
3102 mi->rli.clear_until_condition();
3103
3104 if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS ||
3105 mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS)
3106 {
3107 /* Preparing members for effective until condition checking */
3108 const char *p= fn_ext(mi->rli.until_log_name);
3109 char *p_end;
3110 if (*p)
3111 {
3112 //p points to '.'
3113 mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
3114 /*
3115 p_end points to the first invalid character. If it equals
3116 to p, no digits were found, error. If it contains '\0' it
3117 means conversion went ok.
3118 */
3119 if (p_end==p || *p_end)
3120 slave_errno=ER_BAD_SLAVE_UNTIL_COND;
3121 }
3122 else
3123 slave_errno=ER_BAD_SLAVE_UNTIL_COND;
3124
3125 /* mark the cached result of the UNTIL comparison as "undefined" */
3126 mi->rli.until_log_names_cmp_result=
3127 Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
3128 }
3129
3130 if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
3131 {
3132 /* Issuing warning then started without --skip-slave-start */
3133 if (!opt_skip_slave_start)
3134 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
3135 ER_MISSING_SKIP_SLAVE,
3136 ER_THD(thd, ER_MISSING_SKIP_SLAVE));
3137 }
3138
3139 mysql_mutex_unlock(&mi->rli.data_lock);
3140 }
3141 else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
3142 push_warning(thd,
3143 Sql_condition::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
3144 ER_THD(thd, ER_UNTIL_COND_IGNORED));
3145
3146 if (!slave_errno)
3147 slave_errno = start_slave_threads(thd,
3148 1,
3149 1 /* wait for start */,
3150 mi,
3151 master_info_file_tmp,
3152 relay_log_info_file_tmp,
3153 thread_mask);
3154 }
3155 }
3156 else
3157 {
3158 /* no error if all threads are already started, only a warning */
3159 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
3160 ER_THD(thd, ER_SLAVE_WAS_RUNNING));
3161 }
3162
3163err:
3164 mi->unlock_slave_threads();
3165 thd_proc_info(thd, 0);
3166
3167 if (slave_errno)
3168 {
3169 if (net_report)
3170 my_error(slave_errno, MYF(0),
3171 (int) mi->connection_name.length,
3172 mi->connection_name.str);
3173 DBUG_RETURN(slave_errno == ER_BAD_SLAVE ? -1 : 1);
3174 }
3175
3176 DBUG_RETURN(0);
3177}
3178
3179
3180/**
3181 Execute a STOP SLAVE statement.
3182
3183 @param thd Pointer to THD object for the client thread executing the
3184 statement.
3185
3186 @param mi Pointer to Master_info object for the slave's IO thread.
3187
3188 @param net_report If true, saves the exit status into thd->stmt_da.
3189
3190 @retval 0 success
3191 @retval 1 error
3192 @retval -1 error
3193*/
3194
3195int stop_slave(THD* thd, Master_info* mi, bool net_report )
3196{
3197 int slave_errno;
3198 DBUG_ENTER("stop_slave");
3199 DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str));
3200
3201 if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
3202 DBUG_RETURN(-1);
3203 THD_STAGE_INFO(thd, stage_killing_slave);
3204 int thread_mask;
3205 mi->lock_slave_threads();
3206 /*
3207 Get a mask of _running_ threads.
3208 We don't have to test for mi->killed as the thread_mask will take care
3209 of checking if threads exists
3210 */
3211 init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
3212 /*
3213 Below we will stop all running threads.
3214 But if the user wants to stop only one thread, do as if the other thread
3215 was stopped (as we don't wan't to touch the other thread), so set the
3216 bit to 0 for the other thread
3217 */
3218 if (thd->lex->slave_thd_opt)
3219 thread_mask &= thd->lex->slave_thd_opt;
3220
3221 if (thread_mask)
3222 {
3223 slave_errno= terminate_slave_threads(mi,thread_mask, 0 /* get lock */);
3224 }
3225 else
3226 {
3227 //no error if both threads are already stopped, only a warning
3228 slave_errno= 0;
3229 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
3230 ER_THD(thd, ER_SLAVE_WAS_NOT_RUNNING));
3231 }
3232
3233 mi->unlock_slave_threads();
3234
3235 if (slave_errno)
3236 {
3237 if (net_report)
3238 my_message(slave_errno, ER_THD(thd, slave_errno), MYF(0));
3239 DBUG_RETURN(1);
3240 }
3241
3242 DBUG_RETURN(0);
3243}
3244
3245
3246/**
3247 Execute a RESET SLAVE statement.
3248
3249 @param thd Pointer to THD object of the client thread executing the
3250 statement.
3251
3252 @param mi Pointer to Master_info object for the slave.
3253
3254 @retval 0 success
3255 @retval 1 error
3256*/
3257int reset_slave(THD *thd, Master_info* mi)
3258{
3259 MY_STAT stat_area;
3260 char fname[FN_REFLEN];
3261 int thread_mask= 0, error= 0;
3262 uint sql_errno=ER_UNKNOWN_ERROR;
3263 const char* errmsg= "Unknown error occurred while reseting slave";
3264 char master_info_file_tmp[FN_REFLEN];
3265 char relay_log_info_file_tmp[FN_REFLEN];
3266 DBUG_ENTER("reset_slave");
3267
3268 mi->lock_slave_threads();
3269 if (mi->killed)
3270 {
3271 /* connection was deleted while we waited for lock_slave_threads */
3272 mi->unlock_slave_threads();
3273 my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length,
3274 mi->connection_name.str);
3275 DBUG_RETURN(-1);
3276 }
3277
3278 init_thread_mask(&thread_mask,mi,0 /* not inverse */);
3279 if (thread_mask) // We refuse if any slave thread is running
3280 {
3281 mi->unlock_slave_threads();
3282 my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
3283 mi->connection_name.str);
3284 DBUG_RETURN(ER_SLAVE_MUST_STOP);
3285 }
3286
3287 // delete relay logs, clear relay log coordinates
3288 if (unlikely((error= purge_relay_logs(&mi->rli, thd,
3289 1 /* just reset */,
3290 &errmsg))))
3291 {
3292 sql_errno= ER_RELAY_LOG_FAIL;
3293 goto err;
3294 }
3295
3296 /* Clear master's log coordinates and associated information */
3297 mi->clear_in_memory_info(thd->lex->reset_slave_info.all);
3298
3299 /*
3300 Reset errors (the idea is that we forget about the
3301 old master).
3302 */
3303 mi->clear_error();
3304 mi->rli.clear_error();
3305 mi->rli.clear_until_condition();
3306 mi->rli.clear_sql_delay();
3307 mi->rli.slave_skip_counter= 0;
3308
3309 // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
3310 end_master_info(mi);
3311
3312 end_relay_log_info(&mi->rli);
3313 // and delete these two files
3314 create_logfile_name_with_suffix(master_info_file_tmp,
3315 sizeof(master_info_file_tmp),
3316 master_info_file, 0,
3317 &mi->cmp_connection_name);
3318 create_logfile_name_with_suffix(relay_log_info_file_tmp,
3319 sizeof(relay_log_info_file_tmp),
3320 relay_log_info_file, 0,
3321 &mi->cmp_connection_name);
3322
3323 fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32);
3324 if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) &&
3325 mysql_file_delete(key_file_master_info, fname, MYF(MY_WME)))
3326 {
3327 error=1;
3328 goto err;
3329 }
3330 else if (global_system_variables.log_warnings > 1)
3331 sql_print_information("Deleted Master_info file '%s'.", fname);
3332
3333 // delete relay_log_info_file
3334 fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32);
3335 if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) &&
3336 mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME)))
3337 {
3338 error=1;
3339 goto err;
3340 }
3341 else if (global_system_variables.log_warnings > 1)
3342 sql_print_information("Deleted Master_info file '%s'.", fname);
3343
3344 if (rpl_semi_sync_slave_enabled)
3345 repl_semisync_slave.reset_slave(mi);
3346err:
3347 mi->unlock_slave_threads();
3348 if (unlikely(error))
3349 my_error(sql_errno, MYF(0), errmsg);
3350 DBUG_RETURN(error);
3351}
3352
3353/*
3354
3355 Kill all Binlog_dump threads which previously talked to the same slave
3356 ("same" means with the same server id). Indeed, if the slave stops, if the
3357 Binlog_dump thread is waiting (mysql_cond_wait) for binlog update, then it
3358 will keep existing until a query is written to the binlog. If the master is
3359 idle, then this could last long, and if the slave reconnects, we could have 2
3360 Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
3361 binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
3362 the master kills any existing thread with the slave's server id (if this id
3363 is not zero; it will be true for real slaves, but false for mysqlbinlog when
3364 it sends COM_BINLOG_DUMP to get a remote binlog dump).
3365
3366 SYNOPSIS
3367 kill_zombie_dump_threads()
3368 slave_server_id the slave's server id
3369*/
3370
3371void kill_zombie_dump_threads(uint32 slave_server_id)
3372{
3373 mysql_mutex_lock(&LOCK_thread_count);
3374 I_List_iterator<THD> it(threads);
3375 THD *tmp;
3376
3377 while ((tmp=it++))
3378 {
3379 if (tmp->get_command() == COM_BINLOG_DUMP &&
3380 tmp->variables.server_id == slave_server_id)
3381 {
3382 mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
3383 break;
3384 }
3385 }
3386 mysql_mutex_unlock(&LOCK_thread_count);
3387 if (tmp)
3388 {
3389 /*
3390 Here we do not call kill_one_thread() as
3391 it will be slow because it will iterate through the list
3392 again. We just to do kill the thread ourselves.
3393 */
3394 tmp->awake_no_mutex(KILL_SLAVE_SAME_ID);
3395 mysql_mutex_unlock(&tmp->LOCK_thd_kill);
3396 }
3397}
3398
3399/**
3400 Get value for a string parameter with error checking
3401
3402 Note that in case of error the original string should not be updated!
3403
3404 @ret 0 ok
3405 @ret 1 error
3406*/
3407
3408static bool get_string_parameter(char *to, const char *from, size_t length,
3409 const char *name, CHARSET_INFO *cs)
3410{
3411 if (from) // Empty paramaters allowed
3412 {
3413 size_t from_length= strlen(from);
3414 size_t from_numchars= cs->cset->numchars(cs, from, from + from_length);
3415 if (from_numchars > length / cs->mbmaxlen)
3416 {
3417 my_error(ER_WRONG_STRING_LENGTH, MYF(0), from, name,
3418 (int) (length / cs->mbmaxlen));
3419 return 1;
3420 }
3421 memcpy(to, from, from_length+1);
3422 }
3423 return 0;
3424}
3425
3426
3427/**
3428 Execute a CHANGE MASTER statement.
3429
3430 @param thd Pointer to THD object for the client thread executing the
3431 statement.
3432
3433 @param mi Pointer to Master_info object belonging to the slave's IO
3434 thread.
3435
3436 @param master_info_added Out parameter saying if the Master_info *mi was
3437 added to the global list of masters. This is useful in error conditions
3438 to know if caller should free Master_info *mi.
3439
3440 @retval FALSE success
3441 @retval TRUE error
3442*/
3443bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
3444{
3445 int thread_mask;
3446 const char* errmsg= 0;
3447 bool need_relay_log_purge= 1;
3448 bool ret= FALSE;
3449 char saved_host[HOSTNAME_LENGTH + 1];
3450 uint saved_port;
3451 char saved_log_name[FN_REFLEN];
3452 Master_info::enum_using_gtid saved_using_gtid;
3453 char master_info_file_tmp[FN_REFLEN];
3454 char relay_log_info_file_tmp[FN_REFLEN];
3455 my_off_t saved_log_pos;
3456 LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
3457 DYNAMIC_ARRAY *do_ids, *ignore_ids;
3458
3459 DBUG_ENTER("change_master");
3460
3461 DBUG_ASSERT(master_info_index);
3462 mysql_mutex_assert_owner(&LOCK_active_mi);
3463
3464 *master_info_added= false;
3465 /*
3466 We need to check if there is an empty master_host. Otherwise
3467 change master succeeds, a master.info file is created containing
3468 empty master_host string and when issuing: start slave; an error
3469 is thrown stating that the server is not configured as slave.
3470 (See BUG#28796).
3471 */
3472 if (lex_mi->host && !*lex_mi->host)
3473 {
3474 my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
3475 DBUG_RETURN(TRUE);
3476 }
3477 if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name,
3478 lex_mi->host,
3479 lex_mi->port))
3480 DBUG_RETURN(TRUE);
3481
3482 mi->lock_slave_threads();
3483 if (mi->killed)
3484 {
3485 /* connection was deleted while we waited for lock_slave_threads */
3486 mi->unlock_slave_threads();
3487 my_error(WARN_NO_MASTER_INFO, MYF(0), (int) mi->connection_name.length,
3488 mi->connection_name.str);
3489 DBUG_RETURN(TRUE);
3490 }
3491
3492 init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
3493 if (thread_mask) // We refuse if any slave thread is running
3494 {
3495 my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
3496 mi->connection_name.str);
3497 ret= TRUE;
3498 goto err;
3499 }
3500
3501 THD_STAGE_INFO(thd, stage_changing_master);
3502
3503 create_logfile_name_with_suffix(master_info_file_tmp,
3504 sizeof(master_info_file_tmp),
3505 master_info_file, 0,
3506 &mi->cmp_connection_name);
3507 create_logfile_name_with_suffix(relay_log_info_file_tmp,
3508 sizeof(relay_log_info_file_tmp),
3509 relay_log_info_file, 0,
3510 &mi->cmp_connection_name);
3511
3512 /* if new Master_info doesn't exists, add it */
3513 if (!master_info_index->get_master_info(&mi->connection_name,
3514 Sql_condition::WARN_LEVEL_NOTE))
3515 {
3516 if (master_info_index->add_master_info(mi, TRUE))
3517 {
3518 my_error(ER_MASTER_INFO, MYF(0),
3519 (int) lex_mi->connection_name.length,
3520 lex_mi->connection_name.str);
3521 ret= TRUE;
3522 goto err;
3523 }
3524 *master_info_added= true;
3525 }
3526 if (global_system_variables.log_warnings > 1)
3527 sql_print_information("Master connection name: '%.*s' "
3528 "Master_info_file: '%s' "
3529 "Relay_info_file: '%s'",
3530 (int) mi->connection_name.length,
3531 mi->connection_name.str,
3532 master_info_file_tmp, relay_log_info_file_tmp);
3533
3534 if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0,
3535 thread_mask))
3536 {
3537 my_error(ER_MASTER_INFO, MYF(0),
3538 (int) lex_mi->connection_name.length,
3539 lex_mi->connection_name.str);
3540 ret= TRUE;
3541 goto err;
3542 }
3543
3544 /*
3545 Data lock not needed since we have already stopped the running threads,
3546 and we have the hold on the run locks which will keep all threads that
3547 could possibly modify the data structures from running
3548 */
3549
3550 /*
3551 Before processing the command, save the previous state.
3552 */
3553 strmake_buf(saved_host, mi->host);
3554 saved_port= mi->port;
3555 strmake_buf(saved_log_name, mi->master_log_name);
3556 saved_log_pos= mi->master_log_pos;
3557 saved_using_gtid= mi->using_gtid;
3558
3559 /*
3560 If the user specified host or port without binlog or position,
3561 reset binlog's name to FIRST and position to 4.
3562 */
3563
3564 if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
3565 {
3566 mi->master_log_name[0] = 0;
3567 mi->master_log_pos= BIN_LOG_HEADER_SIZE;
3568 }
3569
3570 if (lex_mi->log_file_name)
3571 strmake_buf(mi->master_log_name, lex_mi->log_file_name);
3572 if (lex_mi->pos)
3573 {
3574 mi->master_log_pos= lex_mi->pos;
3575 }
3576 DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3577
3578 if (get_string_parameter(mi->host, lex_mi->host, sizeof(mi->host)-1,
3579 "MASTER_HOST", system_charset_info) ||
3580 get_string_parameter(mi->user, lex_mi->user, sizeof(mi->user)-1,
3581 "MASTER_USER", system_charset_info) ||
3582 get_string_parameter(mi->password, lex_mi->password,
3583 sizeof(mi->password)-1, "MASTER_PASSWORD",
3584 &my_charset_bin))
3585 {
3586 ret= TRUE;
3587 goto err;
3588 }
3589
3590 if (lex_mi->port)
3591 mi->port = lex_mi->port;
3592 if (lex_mi->connect_retry)
3593 mi->connect_retry = lex_mi->connect_retry;
3594 if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
3595 mi->heartbeat_period = lex_mi->heartbeat_period;
3596 else
3597 mi->heartbeat_period= (float) MY_MIN(SLAVE_MAX_HEARTBEAT_PERIOD,
3598 (slave_net_timeout/2.0));
3599 mi->received_heartbeats= 0; // counter lives until master is CHANGEd
3600
3601 /*
3602 Reset the last time server_id list if the current CHANGE MASTER
3603 is mentioning IGNORE_SERVER_IDS= (...)
3604 */
3605 if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
3606 {
3607 /* Check if the list contains replicate_same_server_id */
3608 for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i ++)
3609 {
3610 ulong s_id;
3611 get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
3612 if (s_id == global_system_variables.server_id && replicate_same_server_id)
3613 {
3614 my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id));
3615 ret= TRUE;
3616 goto err;
3617 }
3618 }
3619
3620 /* All ok. Update the old server ids with the new ones. */
3621 update_change_master_ids(&lex_mi->repl_ignore_server_ids,
3622 &mi->ignore_server_ids);
3623 }
3624
3625 if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
3626 mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
3627
3628 if (lex_mi->sql_delay != -1)
3629 mi->rli.set_sql_delay(lex_mi->sql_delay);
3630
3631 if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
3632 mi->ssl_verify_server_cert=
3633 (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
3634
3635 if (lex_mi->ssl_ca)
3636 strmake_buf(mi->ssl_ca, lex_mi->ssl_ca);
3637 if (lex_mi->ssl_capath)
3638 strmake_buf(mi->ssl_capath, lex_mi->ssl_capath);
3639 if (lex_mi->ssl_cert)
3640 strmake_buf(mi->ssl_cert, lex_mi->ssl_cert);
3641 if (lex_mi->ssl_cipher)
3642 strmake_buf(mi->ssl_cipher, lex_mi->ssl_cipher);
3643 if (lex_mi->ssl_key)
3644 strmake_buf(mi->ssl_key, lex_mi->ssl_key);
3645 if (lex_mi->ssl_crl)
3646 strmake_buf(mi->ssl_crl, lex_mi->ssl_crl);
3647 if (lex_mi->ssl_crlpath)
3648 strmake_buf(mi->ssl_crlpath, lex_mi->ssl_crlpath);
3649
3650#ifndef HAVE_OPENSSL
3651 if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
3652 lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
3653 lex_mi->ssl_verify_server_cert || lex_mi->ssl_crl || lex_mi->ssl_crlpath)
3654 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
3655 ER_SLAVE_IGNORED_SSL_PARAMS,
3656 ER_THD(thd, ER_SLAVE_IGNORED_SSL_PARAMS));
3657#endif
3658
3659 if (lex_mi->relay_log_name)
3660 {
3661 need_relay_log_purge= 0;
3662 char relay_log_name[FN_REFLEN];
3663 mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name);
3664 strmake_buf(mi->rli.group_relay_log_name, relay_log_name);
3665 strmake_buf(mi->rli.event_relay_log_name, relay_log_name);
3666 }
3667
3668 if (lex_mi->relay_log_pos)
3669 {
3670 need_relay_log_purge= 0;
3671 mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
3672 }
3673
3674 if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_SLAVE_POS)
3675 mi->using_gtid= Master_info::USE_GTID_SLAVE_POS;
3676 else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_CURRENT_POS)
3677 mi->using_gtid= Master_info::USE_GTID_CURRENT_POS;
3678 else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_NO ||
3679 lex_mi->log_file_name || lex_mi->pos ||
3680 lex_mi->relay_log_name || lex_mi->relay_log_pos)
3681 mi->using_gtid= Master_info::USE_GTID_NO;
3682
3683 do_ids= ((lex_mi->repl_do_domain_ids_opt ==
3684 LEX_MASTER_INFO::LEX_MI_ENABLE) ?
3685 &lex_mi->repl_do_domain_ids : NULL);
3686
3687 ignore_ids= ((lex_mi->repl_ignore_domain_ids_opt ==
3688 LEX_MASTER_INFO::LEX_MI_ENABLE) ?
3689 &lex_mi->repl_ignore_domain_ids : NULL);
3690
3691 /*
3692 Note: mi->using_gtid stores the previous state in case no MASTER_USE_GTID
3693 is specified.
3694 */
3695 if (mi->domain_id_filter.update_ids(do_ids, ignore_ids, mi->using_gtid))
3696 {
3697 my_error(ER_MASTER_INFO, MYF(0),
3698 (int) lex_mi->connection_name.length,
3699 lex_mi->connection_name.str);
3700 ret= TRUE;
3701 goto err;
3702 }
3703
3704 /*
3705 If user did specify neither host nor port nor any log name nor any log
3706 pos, i.e. he specified only user/password/master_connect_retry, he probably
3707 wants replication to resume from where it had left, i.e. from the
3708 coordinates of the **SQL** thread (imagine the case where the I/O is ahead
3709 of the SQL; restarting from the coordinates of the I/O would lose some
3710 events which is probably unwanted when you are just doing minor changes
3711 like changing master_connect_retry).
3712 A side-effect is that if only the I/O thread was started, this thread may
3713 restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
3714 much more unlikely situation than the one we are fixing here).
3715 Note: coordinates of the SQL thread must be read here, before the
3716 'if (need_relay_log_purge)' block which resets them.
3717 */
3718 if (!lex_mi->host && !lex_mi->port &&
3719 !lex_mi->log_file_name && !lex_mi->pos &&
3720 need_relay_log_purge)
3721 {
3722 /*
3723 Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
3724 not initialized), so we use a MY_MAX().
3725 What happens to mi->rli.master_log_pos during the initialization stages
3726 of replication is not 100% clear, so we guard against problems using
3727 MY_MAX().
3728 */
3729 mi->master_log_pos = MY_MAX(BIN_LOG_HEADER_SIZE,
3730 mi->rli.group_master_log_pos);
3731 strmake_buf(mi->master_log_name, mi->rli.group_master_log_name);
3732 }
3733
3734 /*
3735 Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
3736 a slave before).
3737 */
3738 if (flush_master_info(mi, FALSE, FALSE))
3739 {
3740 my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
3741 ret= TRUE;
3742 goto err;
3743 }
3744 if (need_relay_log_purge)
3745 {
3746 THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
3747 if (purge_relay_logs(&mi->rli, thd,
3748 0 /* not only reset, but also reinit */,
3749 &errmsg))
3750 {
3751 my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
3752 ret= TRUE;
3753 goto err;
3754 }
3755 }
3756 else
3757 {
3758 const char* msg;
3759 /* Relay log is already initialized */
3760 if (init_relay_log_pos(&mi->rli,
3761 mi->rli.group_relay_log_name,
3762 mi->rli.group_relay_log_pos,
3763 0 /*no data lock*/,
3764 &msg, 0))
3765 {
3766 my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
3767 ret= TRUE;
3768 goto err;
3769 }
3770 }
3771 /*
3772 Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
3773 so restore them to good values. If we left them to ''/0, that would work;
3774 but that would fail in the case of 2 successive CHANGE MASTER (without a
3775 START SLAVE in between): because first one would set the coords in mi to
3776 the good values of those in rli, the set those in rli to ''/0, then
3777 second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
3778 ''/0: we have lost all copies of the original good coordinates.
3779 That's why we always save good coords in rli.
3780 */
3781 mi->rli.group_master_log_pos= mi->master_log_pos;
3782 DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3783 strmake_buf(mi->rli.group_master_log_name,mi->master_log_name);
3784
3785 if (!mi->rli.group_master_log_name[0]) // uninitialized case
3786 mi->rli.group_master_log_pos=0;
3787
3788 mysql_mutex_lock(&mi->rli.data_lock);
3789 mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
3790 /* Clear the errors, for a clean start */
3791 mi->rli.clear_error();
3792 mi->rli.clear_until_condition();
3793 mi->rli.slave_skip_counter= 0;
3794
3795 sql_print_information("'CHANGE MASTER TO executed'. "
3796 "Previous state master_host='%s', master_port='%u', master_log_file='%s', "
3797 "master_log_pos='%ld'. "
3798 "New state master_host='%s', master_port='%u', master_log_file='%s', "
3799 "master_log_pos='%ld'.", saved_host, saved_port, saved_log_name,
3800 (ulong) saved_log_pos, mi->host, mi->port, mi->master_log_name,
3801 (ulong) mi->master_log_pos);
3802 if (saved_using_gtid != Master_info::USE_GTID_NO ||
3803 mi->using_gtid != Master_info::USE_GTID_NO)
3804 sql_print_information("Previous Using_Gtid=%s. New Using_Gtid=%s",
3805 mi->using_gtid_astext(saved_using_gtid),
3806 mi->using_gtid_astext(mi->using_gtid));
3807
3808 /*
3809 If we don't write new coordinates to disk now, then old will remain in
3810 relay-log.info until START SLAVE is issued; but if mysqld is shutdown
3811 before START SLAVE, then old will remain in relay-log.info, and will be the
3812 in-memory value at restart (thus causing errors, as the old relay log does
3813 not exist anymore).
3814 */
3815 if (mi->rli.flush())
3816 ret= 1;
3817 mysql_cond_broadcast(&mi->data_cond);
3818 mysql_mutex_unlock(&mi->rli.data_lock);
3819
3820err:
3821 mi->unlock_slave_threads();
3822 if (ret == FALSE)
3823 my_ok(thd);
3824 DBUG_RETURN(ret);
3825}
3826
3827
3828/**
3829 Execute a RESET MASTER statement.
3830
3831 @param thd Pointer to THD object of the client thread executing the
3832 statement.
3833
3834 @retval 0 success
3835 @retval 1 error
3836*/
3837int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
3838 ulong next_log_number)
3839{
3840 if (!mysql_bin_log.is_open())
3841 {
3842 my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
3843 ER_THD(thd, ER_FLUSH_MASTER_BINLOG_CLOSED),
3844 MYF(ME_BELL+ME_WAITTANG));
3845 return 1;
3846 }
3847
3848 bool ret= 0;
3849 /* Temporarily disable master semisync before reseting master. */
3850 repl_semisync_master.before_reset_master();
3851 ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
3852 next_log_number);
3853 repl_semisync_master.after_reset_master();
3854 return ret;
3855}
3856
3857
3858/**
3859 Execute a SHOW BINLOG EVENTS statement.
3860
3861 @param thd Pointer to THD object for the client thread executing the
3862 statement.
3863
3864 @retval FALSE success
3865 @retval TRUE failure
3866*/
3867bool mysql_show_binlog_events(THD* thd)
3868{
3869 Protocol *protocol= thd->protocol;
3870 List<Item> field_list;
3871 const char *errmsg = 0;
3872 bool ret = TRUE;
3873 IO_CACHE log;
3874 File file = -1;
3875 MYSQL_BIN_LOG *binary_log= NULL;
3876 int old_max_allowed_packet= thd->variables.max_allowed_packet;
3877 Master_info *mi= 0;
3878 LOG_INFO linfo;
3879 LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
3880
3881 DBUG_ENTER("mysql_show_binlog_events");
3882
3883 Log_event::init_show_field_list(thd, &field_list);
3884 if (protocol->send_result_set_metadata(&field_list,
3885 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
3886 DBUG_RETURN(TRUE);
3887
3888 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
3889 thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
3890
3891 /* select which binary log to use: binlog or relay */
3892 if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS )
3893 {
3894 binary_log= &mysql_bin_log;
3895 }
3896 else /* showing relay log contents */
3897 {
3898 if (!lex_mi->connection_name.str)
3899 lex_mi->connection_name= thd->variables.default_master_connection;
3900 if (!(mi= get_master_info(&lex_mi->connection_name,
3901 Sql_condition::WARN_LEVEL_ERROR)))
3902 {
3903 DBUG_RETURN(TRUE);
3904 }
3905 binary_log= &(mi->rli.relay_log);
3906 }
3907
3908 Format_description_log_event *description_event= new
3909 Format_description_log_event(3); /* MySQL 4.0 by default */
3910
3911 if (binary_log->is_open())
3912 {
3913 SELECT_LEX_UNIT *unit= &thd->lex->unit;
3914 ha_rows event_count, limit_start, limit_end;
3915 my_off_t pos = MY_MAX(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
3916 char search_file_name[FN_REFLEN], *name;
3917 const char *log_file_name = lex_mi->log_file_name;
3918 mysql_mutex_t *log_lock = binary_log->get_log_lock();
3919 Log_event* ev;
3920
3921 if (mi)
3922 {
3923 /* We can unlock the mutex as we have a lock on the file */
3924 mi->release();
3925 mi= 0;
3926 }
3927
3928 unit->set_limit(thd->lex->current_select);
3929 limit_start= unit->offset_limit_cnt;
3930 limit_end= unit->select_limit_cnt;
3931
3932 name= search_file_name;
3933 if (log_file_name)
3934 binary_log->make_log_name(search_file_name, log_file_name);
3935 else
3936 name=0; // Find first log
3937
3938 linfo.index_file_offset = 0;
3939
3940 if (binary_log->find_log_pos(&linfo, name, 1))
3941 {
3942 errmsg = "Could not find target log";
3943 goto err;
3944 }
3945
3946 thd->current_linfo= &linfo;
3947
3948 if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
3949 goto err;
3950
3951 /*
3952 to account binlog event header size
3953 */
3954 thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
3955
3956 mysql_mutex_lock(log_lock);
3957
3958 /*
3959 open_binlog() sought to position 4.
3960 Read the first event in case it's a Format_description_log_event, to
3961 know the format. If there's no such event, we are 3.23 or 4.x. This
3962 code, like before, can't read 3.23 binlogs.
3963 Also read the second event, in case it's a Start_encryption_log_event.
3964 This code will fail on a mixed relay log (one which has Format_desc then
3965 Rotate then Format_desc).
3966 */
3967
3968 my_off_t scan_pos = BIN_LOG_HEADER_SIZE;
3969 while (scan_pos < pos)
3970 {
3971 ev= Log_event::read_log_event(&log, description_event,
3972 opt_master_verify_checksum);
3973 scan_pos = my_b_tell(&log);
3974 if (ev == NULL || !ev->is_valid())
3975 {
3976 mysql_mutex_unlock(log_lock);
3977 errmsg = "Wrong offset or I/O error";
3978 goto err;
3979 }
3980 if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
3981 {
3982 delete description_event;
3983 description_event= (Format_description_log_event*) ev;
3984 }
3985 else
3986 {
3987 if (ev->get_type_code() == START_ENCRYPTION_EVENT)
3988 {
3989 if (description_event->start_decryption((Start_encryption_log_event*) ev))
3990 {
3991 delete ev;
3992 mysql_mutex_unlock(log_lock);
3993 errmsg = "Could not initialize decryption of binlog.";
3994 goto err;
3995 }
3996 }
3997 delete ev;
3998 break;
3999 }
4000 }
4001
4002 my_b_seek(&log, pos);
4003
4004 for (event_count = 0;
4005 (ev = Log_event::read_log_event(&log,
4006 description_event,
4007 opt_master_verify_checksum)); )
4008 {
4009 if (event_count >= limit_start &&
4010 ev->net_send(protocol, linfo.log_file_name, pos))
4011 {
4012 errmsg = "Net error";
4013 delete ev;
4014 mysql_mutex_unlock(log_lock);
4015 goto err;
4016 }
4017
4018 if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
4019 {
4020 Format_description_log_event* new_fdle=
4021 (Format_description_log_event*) ev;
4022 new_fdle->copy_crypto_data(description_event);
4023 delete description_event;
4024 description_event= new_fdle;
4025 }
4026 else
4027 {
4028 if (ev->get_type_code() == START_ENCRYPTION_EVENT)
4029 {
4030 if (description_event->start_decryption((Start_encryption_log_event*) ev))
4031 {
4032 errmsg = "Error starting decryption";
4033 delete ev;
4034 mysql_mutex_unlock(log_lock);
4035 goto err;
4036 }
4037 }
4038 delete ev;
4039 }
4040
4041 pos = my_b_tell(&log);
4042
4043 if (++event_count >= limit_end)
4044 break;
4045 }
4046
4047 if (unlikely(event_count < limit_end && log.error))
4048 {
4049 errmsg = "Wrong offset or I/O error";
4050 mysql_mutex_unlock(log_lock);
4051 goto err;
4052 }
4053
4054 mysql_mutex_unlock(log_lock);
4055 }
4056 else if (mi)
4057 mi->release();
4058
4059 // Check that linfo is still on the function scope.
4060 DEBUG_SYNC(thd, "after_show_binlog_events");
4061
4062 ret= FALSE;
4063
4064err:
4065 delete description_event;
4066 if (file >= 0)
4067 {
4068 end_io_cache(&log);
4069 mysql_file_close(file, MYF(MY_WME));
4070 }
4071
4072 if (errmsg)
4073 my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
4074 "SHOW BINLOG EVENTS", errmsg);
4075 else
4076 my_eof(thd);
4077
4078 thd->reset_current_linfo();
4079 thd->variables.max_allowed_packet= old_max_allowed_packet;
4080 DBUG_RETURN(ret);
4081}
4082
4083
4084void show_binlog_info_get_fields(THD *thd, List<Item> *field_list)
4085{
4086 MEM_ROOT *mem_root= thd->mem_root;
4087 field_list->push_back(new (mem_root)
4088 Item_empty_string(thd, "File", FN_REFLEN),
4089 mem_root);
4090 field_list->push_back(new (mem_root)
4091 Item_return_int(thd, "Position", 20,
4092 MYSQL_TYPE_LONGLONG),
4093 mem_root);
4094 field_list->push_back(new (mem_root)
4095 Item_empty_string(thd, "Binlog_Do_DB", 255),
4096 mem_root);
4097 field_list->push_back(new (mem_root)
4098 Item_empty_string(thd, "Binlog_Ignore_DB", 255),
4099 mem_root);
4100}
4101
4102
4103/**
4104 Execute a SHOW MASTER STATUS statement.
4105
4106 @param thd Pointer to THD object for the client thread executing the
4107 statement.
4108
4109 @retval FALSE success
4110 @retval TRUE failure
4111*/
4112bool show_binlog_info(THD* thd)
4113{
4114 Protocol *protocol= thd->protocol;
4115 DBUG_ENTER("show_binlog_info");
4116
4117 List<Item> field_list;
4118 show_binlog_info_get_fields(thd, &field_list);
4119
4120 if (protocol->send_result_set_metadata(&field_list,
4121 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
4122 DBUG_RETURN(TRUE);
4123 protocol->prepare_for_resend();
4124
4125 if (mysql_bin_log.is_open())
4126 {
4127 LOG_INFO li;
4128 mysql_bin_log.get_current_log(&li);
4129 size_t dir_len = dirname_length(li.log_file_name);
4130 protocol->store(li.log_file_name + dir_len, &my_charset_bin);
4131 protocol->store((ulonglong) li.pos);
4132 protocol->store(binlog_filter->get_do_db());
4133 protocol->store(binlog_filter->get_ignore_db());
4134 if (protocol->write())
4135 DBUG_RETURN(TRUE);
4136 }
4137 my_eof(thd);
4138 DBUG_RETURN(FALSE);
4139}
4140
4141
4142void show_binlogs_get_fields(THD *thd, List<Item> *field_list)
4143{
4144 MEM_ROOT *mem_root= thd->mem_root;
4145 field_list->push_back(new (mem_root)
4146 Item_empty_string(thd, "Log_name", 255),
4147 mem_root);
4148 field_list->push_back(new (mem_root)
4149 Item_return_int(thd, "File_size", 20,
4150 MYSQL_TYPE_LONGLONG),
4151 mem_root);
4152}
4153
4154
4155/**
4156 Execute a SHOW BINARY LOGS statement.
4157
4158 @param thd Pointer to THD object for the client thread executing the
4159 statement.
4160
4161 @retval FALSE success
4162 @retval TRUE failure
4163*/
4164bool show_binlogs(THD* thd)
4165{
4166 IO_CACHE *index_file;
4167 LOG_INFO cur;
4168 File file;
4169 char fname[FN_REFLEN];
4170 List<Item> field_list;
4171 size_t length;
4172 size_t cur_dir_len;
4173 Protocol *protocol= thd->protocol;
4174 DBUG_ENTER("show_binlogs");
4175
4176 if (!mysql_bin_log.is_open())
4177 {
4178 my_error(ER_NO_BINARY_LOGGING, MYF(0));
4179 DBUG_RETURN(TRUE);
4180 }
4181
4182 show_binlogs_get_fields(thd, &field_list);
4183
4184 if (protocol->send_result_set_metadata(&field_list,
4185 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
4186 DBUG_RETURN(TRUE);
4187
4188 mysql_mutex_lock(mysql_bin_log.get_log_lock());
4189 mysql_bin_log.lock_index();
4190 index_file=mysql_bin_log.get_index_file();
4191
4192 mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
4193 mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
4194
4195 cur_dir_len= dirname_length(cur.log_file_name);
4196
4197 reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
4198
4199 /* The file ends with EOF or empty line */
4200 while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
4201 {
4202 size_t dir_len;
4203 ulonglong file_length= 0; // Length if open fails
4204 fname[--length] = '\0'; // remove the newline
4205
4206 protocol->prepare_for_resend();
4207 dir_len= dirname_length(fname);
4208 length-= dir_len;
4209 protocol->store(fname + dir_len, length, &my_charset_bin);
4210
4211 if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
4212 file_length= cur.pos; /* The active log, use the active position */
4213 else
4214 {
4215 /* this is an old log, open it and find the size */
4216 if ((file= mysql_file_open(key_file_binlog,
4217 fname, O_RDONLY | O_SHARE | O_BINARY,
4218 MYF(0))) >= 0)
4219 {
4220 file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
4221 mysql_file_close(file, MYF(0));
4222 }
4223 }
4224 protocol->store(file_length);
4225 if (protocol->write())
4226 goto err;
4227 }
4228 if (unlikely(index_file->error == -1))
4229 goto err;
4230 mysql_bin_log.unlock_index();
4231 my_eof(thd);
4232 DBUG_RETURN(FALSE);
4233
4234err:
4235 mysql_bin_log.unlock_index();
4236 DBUG_RETURN(TRUE);
4237}
4238
4239/**
4240 Load data's io cache specific hook to be executed
4241 before a chunk of data is being read into the cache's buffer
4242 The fuction instantianates and writes into the binlog
4243 replication events along LOAD DATA processing.
4244
4245 @param file pointer to io-cache
4246 @retval 0 success
4247 @retval 1 failure
4248*/
4249int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
4250{
4251 DBUG_ENTER("log_loaded_block");
4252 LOAD_FILE_IO_CACHE *lf_info= static_cast<LOAD_FILE_IO_CACHE*>(file);
4253 uint block_len;
4254 /* buffer contains position where we started last read */
4255 uchar* buffer= (uchar*) my_b_get_buffer_start(file);
4256 uint max_event_size= lf_info->thd->variables.max_allowed_packet;
4257
4258 if (lf_info->thd->is_current_stmt_binlog_format_row())
4259 goto ret;
4260 if (lf_info->last_pos_in_file != HA_POS_ERROR &&
4261 lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
4262 goto ret;
4263
4264 for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
4265 buffer += MY_MIN(block_len, max_event_size),
4266 block_len -= MY_MIN(block_len, max_event_size))
4267 {
4268 lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
4269 if (lf_info->wrote_create_file)
4270 {
4271 Append_block_log_event a(lf_info->thd, lf_info->thd->db.str, buffer,
4272 MY_MIN(block_len, max_event_size),
4273 lf_info->log_delayed);
4274 if (mysql_bin_log.write(&a))
4275 DBUG_RETURN(1);
4276 }
4277 else
4278 {
4279 Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db.str,
4280 buffer,
4281 MY_MIN(block_len, max_event_size),
4282 lf_info->log_delayed);
4283 if (mysql_bin_log.write(&b))
4284 DBUG_RETURN(1);
4285 lf_info->wrote_create_file= 1;
4286 }
4287 }
4288ret:
4289 int res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0;
4290 DBUG_RETURN(res);
4291}
4292
4293
4294/**
4295 Initialise the slave replication state from the mysql.gtid_slave_pos table.
4296
4297 This is called each time an SQL thread starts, but the data is only actually
4298 loaded on the first call.
4299
4300 The slave state is the last GTID applied on the slave within each
4301 replication domain.
4302
4303 To avoid row lock contention, there are multiple rows for each domain_id.
4304 The one containing the current slave state is the one with the maximal
4305 sub_id value, within each domain_id.
4306
4307 CREATE TABLE mysql.gtid_slave_pos (
4308 domain_id INT UNSIGNED NOT NULL,
4309 sub_id BIGINT UNSIGNED NOT NULL,
4310 server_id INT UNSIGNED NOT NULL,
4311 seq_no BIGINT UNSIGNED NOT NULL,
4312 PRIMARY KEY (domain_id, sub_id))
4313*/
4314
4315void
4316rpl_init_gtid_slave_state()
4317{
4318 rpl_global_gtid_slave_state= new rpl_slave_state;
4319}
4320
4321
4322void
4323rpl_deinit_gtid_slave_state()
4324{
4325 delete rpl_global_gtid_slave_state;
4326}
4327
4328
4329void
4330rpl_init_gtid_waiting()
4331{
4332 rpl_global_gtid_waiting.init();
4333}
4334
4335
4336void
4337rpl_deinit_gtid_waiting()
4338{
4339 rpl_global_gtid_waiting.destroy();
4340}
4341
4342
4343/*
4344 Format the current GTID state as a string, for returning the value of
4345 @@global.gtid_slave_pos.
4346
4347 If the flag use_binlog is true, then the contents of the binary log (if
4348 enabled) is merged into the current GTID state (@@global.gtid_current_pos).
4349*/
4350int
4351rpl_append_gtid_state(String *dest, bool use_binlog)
4352{
4353 int err;
4354 rpl_gtid *gtid_list= NULL;
4355 uint32 num_gtids= 0;
4356
4357 if (use_binlog && opt_bin_log &&
4358 (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
4359 return err;
4360
4361 err= rpl_global_gtid_slave_state->tostring(dest, gtid_list, num_gtids);
4362 my_free(gtid_list);
4363
4364 return err;
4365}
4366
4367
4368/*
4369 Load the current GTID position into a slave_connection_state, for use when
4370 connecting to a master server with GTID.
4371
4372 If the flag use_binlog is true, then the contents of the binary log (if
4373 enabled) is merged into the current GTID state (master_use_gtid=current_pos).
4374*/
4375int
4376rpl_load_gtid_state(slave_connection_state *state, bool use_binlog)
4377{
4378 int err;
4379 rpl_gtid *gtid_list= NULL;
4380 uint32 num_gtids= 0;
4381
4382 if (use_binlog && opt_bin_log &&
4383 (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
4384 return err;
4385
4386 err= state->load(rpl_global_gtid_slave_state, gtid_list, num_gtids);
4387 my_free(gtid_list);
4388
4389 return err;
4390}
4391
4392
4393bool
4394rpl_gtid_pos_check(THD *thd, char *str, size_t len)
4395{
4396 slave_connection_state tmp_slave_state;
4397 bool gave_conflict_warning= false, gave_missing_warning= false;
4398
4399 /* Check that we can parse the supplied string. */
4400 if (tmp_slave_state.load(str, len))
4401 return true;
4402
4403 /*
4404 Check our own binlog for any of our own transactions that are newer
4405 than the GTID state the user is requesting. Any such transactions would
4406 result in an out-of-order binlog, which could break anyone replicating
4407 with us as master.
4408
4409 So give an error if this is found, requesting the user to do a
4410 RESET MASTER (to clean up the binlog) if they really want this.
4411 */
4412 if (mysql_bin_log.is_open())
4413 {
4414 rpl_gtid *binlog_gtid_list= NULL;
4415 uint32 num_binlog_gtids= 0;
4416 uint32 i;
4417
4418 if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
4419 &num_binlog_gtids))
4420 {
4421 my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
4422 return true;
4423 }
4424 for (i= 0; i < num_binlog_gtids; ++i)
4425 {
4426 rpl_gtid *binlog_gtid= &binlog_gtid_list[i];
4427 rpl_gtid *slave_gtid;
4428 if (binlog_gtid->server_id != global_system_variables.server_id)
4429 continue;
4430 if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id)))
4431 {
4432 if (opt_gtid_strict_mode)
4433 {
4434 my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0),
4435 binlog_gtid->domain_id, binlog_gtid->domain_id,
4436 binlog_gtid->server_id, binlog_gtid->seq_no);
4437 break;
4438 }
4439 else if (!gave_missing_warning)
4440 {
4441 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4442 ER_MASTER_GTID_POS_MISSING_DOMAIN,
4443 ER_THD(thd, ER_MASTER_GTID_POS_MISSING_DOMAIN),
4444 binlog_gtid->domain_id, binlog_gtid->domain_id,
4445 binlog_gtid->server_id, binlog_gtid->seq_no);
4446 gave_missing_warning= true;
4447 }
4448 }
4449 else if (slave_gtid->seq_no < binlog_gtid->seq_no)
4450 {
4451 if (opt_gtid_strict_mode)
4452 {
4453 my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
4454 slave_gtid->domain_id, slave_gtid->server_id,
4455 slave_gtid->seq_no, binlog_gtid->domain_id,
4456 binlog_gtid->server_id, binlog_gtid->seq_no);
4457 break;
4458 }
4459 else if (!gave_conflict_warning)
4460 {
4461 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4462 ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG,
4463 ER_THD(thd, ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG),
4464 slave_gtid->domain_id, slave_gtid->server_id,
4465 slave_gtid->seq_no, binlog_gtid->domain_id,
4466 binlog_gtid->server_id, binlog_gtid->seq_no);
4467 gave_conflict_warning= true;
4468 }
4469 }
4470 }
4471 my_free(binlog_gtid_list);
4472 if (i != num_binlog_gtids)
4473 return true;
4474 }
4475
4476 return false;
4477}
4478
4479
4480bool
4481rpl_gtid_pos_update(THD *thd, char *str, size_t len)
4482{
4483 if (rpl_global_gtid_slave_state->load(thd, str, len, true, true))
4484 {
4485 my_error(ER_FAILED_GTID_STATE_INIT, MYF(0));
4486 return true;
4487 }
4488 else
4489 return false;
4490}
4491
4492
4493#endif /* HAVE_REPLICATION */
4494