1/* Copyright (c) 2006, 2017, Oracle and/or its affiliates.
2 Copyright (c) 2010, 2017, MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software Foundation,
15 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
16
17#include "mariadb.h"
18#include "sql_priv.h"
19#include "unireg.h" // HAVE_*
20#include "rpl_mi.h"
21#include "rpl_rli.h"
22#include "sql_base.h" // close_thread_tables
23#include <my_dir.h> // For MY_STAT
24#include "sql_repl.h" // For check_binlog_magic
25#include "log_event.h" // Format_description_log_event, Log_event,
26 // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
27 // PREFIX_SQL_LOAD
28#include "rpl_utility.h"
29#include "transaction.h"
30#include "sql_parse.h" // end_trans, ROLLBACK
31#include "slave.h"
32#include <mysql/plugin.h>
33#include <mysql/service_thd_wait.h>
34#include "lock.h"
35#include "sql_table.h"
36
37static int count_relay_log_space(Relay_log_info* rli);
38
39/**
40 Current replication state (hash of last GTID executed, per replication
41 domain).
42*/
43rpl_slave_state *rpl_global_gtid_slave_state;
44/* Object used for MASTER_GTID_WAIT(). */
45gtid_waiting rpl_global_gtid_waiting;
46
47const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event";
48
49Relay_log_info::Relay_log_info(bool is_slave_recovery)
50 :Slave_reporting_capability("SQL"),
51 replicate_same_server_id(::replicate_same_server_id),
52 info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
53 sync_counter(0), is_relay_log_recovery(is_slave_recovery),
54 save_temporary_tables(0),
55 mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0),
56 cur_log_old_open_count(0), error_on_rli_init_info(false),
57 group_relay_log_pos(0), event_relay_log_pos(0),
58 group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
59 last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
60 abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
61 gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
62 slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
63 until_log_pos(0), retried_trans(0), executed_entries(0),
64 sql_delay(0), sql_delay_end(0),
65 m_flags(0)
66{
67 DBUG_ENTER("Relay_log_info::Relay_log_info");
68
69 relay_log.is_relay_log= TRUE;
70 relay_log_state.init();
71#ifdef HAVE_PSI_INTERFACE
72 relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
73 key_RELAYLOG_COND_relay_log_updated,
74 key_RELAYLOG_COND_bin_log_updated,
75 key_file_relaylog,
76 key_file_relaylog_index,
77 key_RELAYLOG_COND_queue_busy,
78 key_LOCK_relaylog_end_pos);
79#endif
80
81 group_relay_log_name[0]= event_relay_log_name[0]=
82 group_master_log_name[0]= 0;
83 until_log_name[0]= ign_master_log_name_end[0]= 0;
84 max_relay_log_size= global_system_variables.max_relay_log_size;
85 bzero((char*) &info_file, sizeof(info_file));
86 bzero((char*) &cache_buf, sizeof(cache_buf));
87 mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
88 mysql_mutex_init(key_relay_log_info_data_lock,
89 &data_lock, MY_MUTEX_INIT_FAST);
90 mysql_mutex_init(key_relay_log_info_log_space_lock,
91 &log_space_lock, MY_MUTEX_INIT_FAST);
92 mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
93 mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
94 mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
95 mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
96 relay_log.init_pthread_objects();
97 DBUG_VOID_RETURN;
98}
99
100
101Relay_log_info::~Relay_log_info()
102{
103 DBUG_ENTER("Relay_log_info::~Relay_log_info");
104
105 reset_inuse_relaylog();
106 mysql_mutex_destroy(&run_lock);
107 mysql_mutex_destroy(&data_lock);
108 mysql_mutex_destroy(&log_space_lock);
109 mysql_cond_destroy(&data_cond);
110 mysql_cond_destroy(&start_cond);
111 mysql_cond_destroy(&stop_cond);
112 mysql_cond_destroy(&log_space_cond);
113 relay_log.cleanup();
114 DBUG_VOID_RETURN;
115}
116
117
118/**
119 Read the relay_log.info file.
120
121 @param info_fname The name of the file to read from.
122 @retval 0 success
123 @retval 1 failure
124*/
125int Relay_log_info::init(const char* info_fname)
126{
127 char fname[FN_REFLEN+128];
128 const char* msg = 0;
129 int error = 0;
130 mysql_mutex_t *log_lock;
131 DBUG_ENTER("Relay_log_info::init");
132
133 if (inited) // Set if this function called
134 DBUG_RETURN(0);
135
136 log_lock= relay_log.get_log_lock();
137 fn_format(fname, info_fname, mysql_data_home, "", 4+32);
138 mysql_mutex_lock(&data_lock);
139 cur_log_fd = -1;
140 slave_skip_counter=0;
141 abort_pos_wait=0;
142 log_space_limit= relay_log_space_limit;
143 log_space_total= 0;
144
145 if (unlikely(error_on_rli_init_info))
146 goto err;
147
148 char pattern[FN_REFLEN];
149 (void) my_realpath(pattern, slave_load_tmpdir, 0);
150 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
151 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
152 {
153 mysql_mutex_unlock(&data_lock);
154 sql_print_error("Unable to use slave's temporary directory %s",
155 slave_load_tmpdir);
156 DBUG_RETURN(1);
157 }
158 unpack_filename(slave_patternload_file, pattern);
159 slave_patternload_file_size= strlen(slave_patternload_file);
160
161 /*
162 The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
163 Note that the I/O thread flushes it to disk after writing every
164 event, in flush_master_info(mi, 1, ?).
165 */
166
167 {
168 /* Reports an error and returns, if the --relay-log's path
169 is a directory.*/
170 if (opt_relay_logname &&
171 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
172 {
173 mysql_mutex_unlock(&data_lock);
174 sql_print_error("Path '%s' is a directory name, please specify \
175a file name for --relay-log option", opt_relay_logname);
176 DBUG_RETURN(1);
177 }
178
179 /* Reports an error and returns, if the --relay-log-index's path
180 is a directory.*/
181 if (opt_relaylog_index_name &&
182 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
183 == FN_LIBCHAR)
184 {
185 mysql_mutex_unlock(&data_lock);
186 sql_print_error("Path '%s' is a directory name, please specify \
187a file name for --relay-log-index option", opt_relaylog_index_name);
188 DBUG_RETURN(1);
189 }
190
191 char buf[FN_REFLEN];
192 const char *ln;
193 static bool name_warning_sent= 0;
194 ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
195 1, buf);
196 /* We send the warning only at startup, not after every RESET SLAVE */
197 if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent &&
198 !opt_bootstrap)
199 {
200 /*
201 User didn't give us info to name the relay log index file.
202 Picking `hostname`-relay-bin.index like we do, causes replication to
203 fail if this slave's hostname is changed later. So, we would like to
204 instead require a name. But as we don't want to break many existing
205 setups, we only give warning, not error.
206 */
207 sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
208 " so replication "
209 "may break when this MySQL server acts as a "
210 "slave and has his hostname changed!! Please "
211 "use '--log-basename=#' or '--relay-log=%s' to avoid "
212 "this problem.", ln);
213 name_warning_sent= 1;
214 }
215
216 /* For multimaster, add connection name to relay log filenames */
217 char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
218 char *buf_relaylog_index_name= opt_relaylog_index_name;
219
220 create_logfile_name_with_suffix(buf_relay_logname,
221 sizeof(buf_relay_logname),
222 ln, 1, &mi->cmp_connection_name);
223 ln= buf_relay_logname;
224
225 if (opt_relaylog_index_name)
226 {
227 buf_relaylog_index_name= buf_relaylog_index_name_buff;
228 create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
229 sizeof(buf_relaylog_index_name_buff),
230 opt_relaylog_index_name, 0,
231 &mi->cmp_connection_name);
232 }
233
234 /*
235 note, that if open() fails, we'll still have index file open
236 but a destructor will take care of that
237 */
238 mysql_mutex_lock(log_lock);
239 if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
240 relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
241 (ulong)max_relay_log_size, 1, TRUE))
242 {
243 mysql_mutex_unlock(log_lock);
244 mysql_mutex_unlock(&data_lock);
245 sql_print_error("Failed when trying to open logs for '%s' in Relay_log_info::init(). Error: %M", ln, my_errno);
246 DBUG_RETURN(1);
247 }
248 mysql_mutex_unlock(log_lock);
249 }
250
251 /* if file does not exist */
252 if (access(fname,F_OK))
253 {
254 /*
255 If someone removed the file from underneath our feet, just close
256 the old descriptor and re-create the old file
257 */
258 if (info_fd >= 0)
259 mysql_file_close(info_fd, MYF(MY_WME));
260 if ((info_fd= mysql_file_open(key_file_relay_log_info,
261 fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
262 {
263 sql_print_error("Failed to create a new relay log info file ("
264 "file '%s', errno %d)", fname, my_errno);
265 msg= current_thd->get_stmt_da()->message();
266 goto err;
267 }
268 if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
269 MYF(MY_WME)))
270 {
271 sql_print_error("Failed to create a cache on relay log info file '%s'",
272 fname);
273 msg= current_thd->get_stmt_da()->message();
274 goto err;
275 }
276
277 /* Init relay log with first entry in the relay index file */
278 if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
279 &msg, 0))
280 {
281 sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
282 goto err;
283 }
284 group_master_log_name[0]= 0;
285 group_master_log_pos= 0;
286 }
287 else // file exists
288 {
289 if (info_fd >= 0)
290 reinit_io_cache(&info_file, READ_CACHE, 0L,0,0);
291 else
292 {
293 int error=0;
294 if ((info_fd= mysql_file_open(key_file_relay_log_info,
295 fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
296 {
297 sql_print_error("\
298Failed to open the existing relay log info file '%s' (errno %d)",
299 fname, my_errno);
300 error= 1;
301 }
302 else if (init_io_cache(&info_file, info_fd,
303 IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
304 {
305 sql_print_error("Failed to create a cache on relay log info file '%s'",
306 fname);
307 error= 1;
308 }
309 if (unlikely(error))
310 {
311 if (info_fd >= 0)
312 mysql_file_close(info_fd, MYF(0));
313 info_fd= -1;
314 mysql_mutex_lock(log_lock);
315 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
316 mysql_mutex_unlock(log_lock);
317 mysql_mutex_unlock(&data_lock);
318 DBUG_RETURN(1);
319 }
320 }
321
322 int relay_log_pos, master_log_pos, lines;
323 char *first_non_digit;
324
325 /*
326 Starting from MySQL 5.6.x, relay-log.info has a new format.
327 Now, its first line contains the number of lines in the file.
328 By reading this number we can determine which version our master.info
329 comes from. We can't simply count the lines in the file, since
330 versions before 5.6.x could generate files with more lines than
331 needed. If first line doesn't contain a number, or if it
332 contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
333 then the file is treated like a file from pre-5.6.x version.
334 There is no ambiguity when reading an old master.info: before
335 5.6.x, the first line contained the binlog's name, which is
336 either empty or has an extension (contains a '.'), so can't be
337 confused with an integer.
338
339 So we're just reading first line and trying to figure which
340 version is this.
341 */
342
343 /*
344 The first row is temporarily stored in mi->master_log_name, if
345 it is line count and not binlog name (new format) it will be
346 overwritten by the second row later.
347 */
348 if (init_strvar_from_file(group_relay_log_name,
349 sizeof(group_relay_log_name),
350 &info_file, ""))
351 {
352 msg="Error reading slave log configuration";
353 goto err;
354 }
355
356 lines= strtoul(group_relay_log_name, &first_non_digit, 10);
357
358 if (group_relay_log_name[0] != '\0' &&
359 *first_non_digit == '\0' &&
360 lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
361 {
362 DBUG_PRINT("info", ("relay_log_info file is in new format."));
363 /* Seems to be new format => read relay log name from next line */
364 if (init_strvar_from_file(group_relay_log_name,
365 sizeof(group_relay_log_name),
366 &info_file, ""))
367 {
368 msg="Error reading slave log configuration";
369 goto err;
370 }
371 }
372 else
373 DBUG_PRINT("info", ("relay_log_info file is in old format."));
374
375 if (init_intvar_from_file(&relay_log_pos,
376 &info_file, BIN_LOG_HEADER_SIZE) ||
377 init_strvar_from_file(group_master_log_name,
378 sizeof(group_master_log_name),
379 &info_file, "") ||
380 init_intvar_from_file(&master_log_pos, &info_file, 0) ||
381 (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
382 init_intvar_from_file(&sql_delay, &info_file, 0)))
383 {
384 msg="Error reading slave log configuration";
385 goto err;
386 }
387
388 strmake_buf(event_relay_log_name,group_relay_log_name);
389 group_relay_log_pos= event_relay_log_pos= relay_log_pos;
390 group_master_log_pos= master_log_pos;
391
392 if (is_relay_log_recovery && init_recovery(mi, &msg))
393 goto err;
394
395 relay_log_state.load(rpl_global_gtid_slave_state);
396 if (init_relay_log_pos(this,
397 group_relay_log_name,
398 group_relay_log_pos,
399 0 /* no data lock*/,
400 &msg, 0))
401 {
402 sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)",
403 group_relay_log_name, group_relay_log_pos);
404 goto err;
405 }
406 }
407
408 DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu",
409 my_b_tell(cur_log), event_relay_log_pos));
410 DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
411 DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos);
412
413 /*
414 Now change the cache from READ to WRITE - must do this
415 before Relay_log_info::flush()
416 */
417 reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1);
418 if (unlikely((error= flush())))
419 {
420 msg= "Failed to flush relay log info file";
421 goto err;
422 }
423 if (count_relay_log_space(this))
424 {
425 msg="Error counting relay log space";
426 goto err;
427 }
428 inited= 1;
429 error_on_rli_init_info= false;
430 mysql_mutex_unlock(&data_lock);
431 DBUG_RETURN(0);
432
433err:
434 error_on_rli_init_info= true;
435 if (msg)
436 sql_print_error("%s", msg);
437 end_io_cache(&info_file);
438 if (info_fd >= 0)
439 mysql_file_close(info_fd, MYF(0));
440 info_fd= -1;
441 mysql_mutex_lock(log_lock);
442 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
443 mysql_mutex_unlock(log_lock);
444 mysql_mutex_unlock(&data_lock);
445 DBUG_RETURN(1);
446}
447
448
449static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
450{
451 MY_STAT s;
452 DBUG_ENTER("add_relay_log");
453 if (!mysql_file_stat(key_file_relaylog,
454 linfo->log_file_name, &s, MYF(0)))
455 {
456 sql_print_error("log %s listed in the index, but failed to stat",
457 linfo->log_file_name);
458 DBUG_RETURN(1);
459 }
460 rli->log_space_total += s.st_size;
461 DBUG_PRINT("info",("log_space_total: %llu", rli->log_space_total));
462 DBUG_RETURN(0);
463}
464
465
466static int count_relay_log_space(Relay_log_info* rli)
467{
468 LOG_INFO linfo;
469 DBUG_ENTER("count_relay_log_space");
470 rli->log_space_total= 0;
471 if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
472 {
473 sql_print_error("Could not find first log while counting relay log space");
474 DBUG_RETURN(1);
475 }
476 do
477 {
478 if (add_relay_log(rli,&linfo))
479 DBUG_RETURN(1);
480 } while (!rli->relay_log.find_next_log(&linfo, 1));
481 /*
482 As we have counted everything, including what may have written in a
483 preceding write, we must reset bytes_written, or we may count some space
484 twice.
485 */
486 rli->relay_log.reset_bytes_written();
487 DBUG_RETURN(0);
488}
489
490
491/*
492 Reset UNTIL condition for Relay_log_info
493
494 SYNOPSYS
495 clear_until_condition()
496 rli - Relay_log_info structure where UNTIL condition should be reset
497 */
498
499void Relay_log_info::clear_until_condition()
500{
501 DBUG_ENTER("clear_until_condition");
502
503 until_condition= Relay_log_info::UNTIL_NONE;
504 until_log_name[0]= 0;
505 until_log_pos= 0;
506 DBUG_VOID_RETURN;
507}
508
509
510/*
511 Read the correct format description event for starting to replicate from
512 a given position in a relay log file.
513*/
514Format_description_log_event *
515read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
516 const char **errmsg)
517{
518 Log_event *ev;
519 Format_description_log_event *fdev;
520 bool found= false;
521
522 /*
523 By default the relay log is in binlog format 3 (4.0).
524 Even if format is 4, this will work enough to read the first event
525 (Format_desc) (remember that format 4 is just lenghtened compared to format
526 3; format 3 is a prefix of format 4).
527 */
528 fdev= new Format_description_log_event(3);
529
530 while (!found)
531 {
532 Log_event_type typ;
533
534 /*
535 Read the possible Format_description_log_event; if position
536 was 4, no need, it will be read naturally.
537 */
538 DBUG_PRINT("info",("looking for a Format_description_log_event"));
539
540 if (my_b_tell(cur_log) >= start_pos)
541 break;
542
543 if (!(ev= Log_event::read_log_event(cur_log, fdev,
544 opt_slave_sql_verify_checksum)))
545 {
546 DBUG_PRINT("info",("could not read event, cur_log->error=%d",
547 cur_log->error));
548 if (cur_log->error) /* not EOF */
549 {
550 *errmsg= "I/O error reading event at position 4";
551 delete fdev;
552 return NULL;
553 }
554 break;
555 }
556 typ= ev->get_type_code();
557 if (typ == FORMAT_DESCRIPTION_EVENT)
558 {
559 Format_description_log_event *old= fdev;
560 DBUG_PRINT("info",("found Format_description_log_event"));
561 fdev= (Format_description_log_event*) ev;
562 fdev->copy_crypto_data(old);
563 delete old;
564
565 /*
566 As ev was returned by read_log_event, it has passed is_valid(), so
567 my_malloc() in ctor worked, no need to check again.
568 */
569 /*
570 Ok, we found a Format_description event. But it is not sure that this
571 describes the whole relay log; indeed, one can have this sequence
572 (starting from position 4):
573 Format_desc (of slave)
574 Rotate (of master)
575 Format_desc (of master)
576 So the Format_desc which really describes the rest of the relay log
577 is the 3rd event (it can't be further than that, because we rotate
578 the relay log when we queue a Rotate event from the master).
579 But what describes the Rotate is the first Format_desc.
580 So what we do is:
581 go on searching for Format_description events, until you exceed the
582 position (argument 'pos') or until you find another event than Rotate
583 or Format_desc.
584 */
585 }
586 else if (typ == START_ENCRYPTION_EVENT)
587 {
588 if (fdev->start_decryption((Start_encryption_log_event*) ev))
589 {
590 *errmsg= "Unable to set up decryption of binlog.";
591 delete ev;
592 delete fdev;
593 return NULL;
594 }
595 delete ev;
596 }
597 else
598 {
599 DBUG_PRINT("info",("found event of another type=%d", typ));
600 found= (typ != ROTATE_EVENT);
601 delete ev;
602 }
603 }
604 return fdev;
605}
606
607
608/*
609 Open the given relay log
610
611 SYNOPSIS
612 init_relay_log_pos()
613 rli Relay information (will be initialized)
614 log Name of relay log file to read from. NULL = First log
615 pos Position in relay log file
616 need_data_lock Set to 1 if this functions should do mutex locks
617 errmsg Store pointer to error message here
618 look_for_description_event
619 1 if we should look for such an event. We only need
620 this when the SQL thread starts and opens an existing
621 relay log and has to execute it (possibly from an
622 offset >4); then we need to read the first event of
623 the relay log to be able to parse the events we have
624 to execute.
625
626 DESCRIPTION
627 - Close old open relay log files.
628 - If we are using the same relay log as the running IO-thread, then set
629 rli->cur_log to point to the same IO_CACHE entry.
630 - If not, open the 'log' binary file.
631
632 TODO
633 - check proper initialization of group_master_log_name/group_master_log_pos
634
635 RETURN VALUES
636 0 ok
637 1 error. errmsg is set to point to the error message
638*/
639
640int init_relay_log_pos(Relay_log_info* rli,const char* log,
641 ulonglong pos, bool need_data_lock,
642 const char** errmsg,
643 bool look_for_description_event)
644{
645 DBUG_ENTER("init_relay_log_pos");
646 DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
647
648 *errmsg=0;
649 mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
650
651 if (need_data_lock)
652 mysql_mutex_lock(&rli->data_lock);
653
654 /*
655 Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
656 is, too, and init_slave() too; these 2 functions allocate a description
657 event in init_relay_log_pos, which is not freed by the terminating SQL slave
658 thread as that thread is not started by these functions. So we have to free
659 the description_event here, in case, so that there is no memory leak in
660 running, say, CHANGE MASTER.
661 */
662 delete rli->relay_log.description_event_for_exec;
663 /*
664 By default the relay log is in binlog format 3 (4.0).
665 Even if format is 4, this will work enough to read the first event
666 (Format_desc) (remember that format 4 is just lenghtened compared to format
667 3; format 3 is a prefix of format 4).
668 */
669 rli->relay_log.description_event_for_exec= new
670 Format_description_log_event(3);
671
672 mysql_mutex_lock(log_lock);
673
674 /* Close log file and free buffers if it's already open */
675 if (rli->cur_log_fd >= 0)
676 {
677 end_io_cache(&rli->cache_buf);
678 mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
679 rli->cur_log_fd = -1;
680 }
681
682 rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
683 rli->clear_flag(Relay_log_info::IN_STMT);
684 rli->clear_flag(Relay_log_info::IN_TRANSACTION);
685
686 /*
687 Test to see if the previous run was with the skip of purging
688 If yes, we do not purge when we restart
689 */
690 if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
691 {
692 *errmsg="Could not find first log during relay log initialization";
693 goto err;
694 }
695
696 if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
697 {
698 *errmsg="Could not find target log during relay log initialization";
699 goto err;
700 }
701 strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name);
702 strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name);
703 if (rli->relay_log.is_active(rli->linfo.log_file_name))
704 {
705 /*
706 The IO thread is using this log file.
707 In this case, we will use the same IO_CACHE pointer to
708 read data as the IO thread is using to write data.
709 */
710 my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
711 if (check_binlog_magic(rli->cur_log,errmsg))
712 goto err;
713 rli->cur_log_old_open_count=rli->relay_log.get_open_count();
714 }
715 else
716 {
717 /*
718 Open the relay log and set rli->cur_log to point at this one
719 */
720 if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
721 rli->linfo.log_file_name,errmsg)) < 0)
722 goto err;
723 rli->cur_log = &rli->cache_buf;
724 }
725 /*
726 In all cases, check_binlog_magic() has been called so we're at offset 4 for
727 sure.
728 */
729 if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
730 {
731 if (look_for_description_event)
732 {
733 Format_description_log_event *fdev;
734 if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg)))
735 goto err;
736 delete rli->relay_log.description_event_for_exec;
737 rli->relay_log.description_event_for_exec= fdev;
738 }
739 my_b_seek(rli->cur_log,(off_t)pos);
740 DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu",
741 my_b_tell(rli->cur_log), rli->event_relay_log_pos));
742
743 }
744
745err:
746 /*
747 If we don't purge, we can't honour relay_log_space_limit ;
748 silently discard it
749 */
750 if (!relay_log_purge)
751 rli->log_space_limit= 0;
752 mysql_cond_broadcast(&rli->data_cond);
753
754 mysql_mutex_unlock(log_lock);
755
756 if (need_data_lock)
757 mysql_mutex_unlock(&rli->data_lock);
758 if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
759 *errmsg= "Invalid Format_description log event; could be out of memory";
760
761 DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0));
762
763 DBUG_RETURN ((*errmsg) ? 1 : 0);
764}
765
766
767/*
768 Waits until the SQL thread reaches (has executed up to) the
769 log/position or timed out.
770
771 SYNOPSIS
772 wait_for_pos()
773 thd client thread that sent SELECT MASTER_POS_WAIT
774 log_name log name to wait for
775 log_pos position to wait for
776 timeout timeout in seconds before giving up waiting
777
778 NOTES
779 timeout is longlong whereas it should be ulong ; but this is
780 to catch if the user submitted a negative timeout.
781
782 RETURN VALUES
783 -2 improper arguments (log_pos<0)
784 or slave not running, or master info changed
785 during the function's execution,
786 or client thread killed. -2 is translated to NULL by caller
787 -1 timed out
788 >=0 number of log events the function had to wait
789 before reaching the desired log/position
790 */
791
792int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
793 longlong log_pos,
794 longlong timeout)
795{
796 int event_count = 0;
797 ulong init_abort_pos_wait;
798 int error=0;
799 struct timespec abstime; // for timeout checking
800 PSI_stage_info old_stage;
801 DBUG_ENTER("Relay_log_info::wait_for_pos");
802
803 if (!inited)
804 DBUG_RETURN(-2);
805
806 DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
807 log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
808
809 set_timespec(abstime,timeout);
810 mysql_mutex_lock(&data_lock);
811 thd->ENTER_COND(&data_cond, &data_lock,
812 &stage_waiting_for_the_slave_thread_to_advance_position,
813 &old_stage);
814 /*
815 This function will abort when it notices that some CHANGE MASTER or
816 RESET MASTER has changed the master info.
817 To catch this, these commands modify abort_pos_wait ; We just monitor
818 abort_pos_wait and see if it has changed.
819 Why do we have this mechanism instead of simply monitoring slave_running
820 in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
821 the SQL thread be stopped?
822 This is becasue if someones does:
823 STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
824 the change may happen very quickly and we may not notice that
825 slave_running briefly switches between 1/0/1.
826 */
827 init_abort_pos_wait= abort_pos_wait;
828
829 /*
830 We'll need to
831 handle all possible log names comparisons (e.g. 999 vs 1000).
832 We use ulong for string->number conversion ; this is no
833 stronger limitation than in find_uniq_filename in sql/log.cc
834 */
835 ulong log_name_extension;
836 char log_name_tmp[FN_REFLEN]; //make a char[] from String
837
838 strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1));
839
840 char *p= fn_ext(log_name_tmp);
841 char *p_end;
842 if (!*p || log_pos<0)
843 {
844 error= -2; //means improper arguments
845 goto err;
846 }
847 // Convert 0-3 to 4
848 log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE);
849 /* p points to '.' */
850 log_name_extension= strtoul(++p, &p_end, 10);
851 /*
852 p_end points to the first invalid character.
853 If it equals to p, no digits were found, error.
854 If it contains '\0' it means conversion went ok.
855 */
856 if (p_end==p || *p_end)
857 {
858 error= -2;
859 goto err;
860 }
861
862 /* The "compare and wait" main loop */
863 while (!thd->killed &&
864 init_abort_pos_wait == abort_pos_wait &&
865 slave_running)
866 {
867 bool pos_reached;
868 int cmp_result= 0;
869
870 DBUG_PRINT("info",
871 ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
872 init_abort_pos_wait, abort_pos_wait));
873 DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
874 group_master_log_name, (ulong) group_master_log_pos));
875
876 /*
877 group_master_log_name can be "", if we are just after a fresh
878 replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
879 (before we have executed one Rotate event from the master) or
880 (rare) if the user is doing a weird slave setup (see next
881 paragraph). If group_master_log_name is "", we assume we don't
882 have enough info to do the comparison yet, so we just wait until
883 more data. In this case master_log_pos is always 0 except if
884 somebody (wrongly) sets this slave to be a slave of itself
885 without using --replicate-same-server-id (an unsupported
886 configuration which does nothing), then group_master_log_pos
887 will grow and group_master_log_name will stay "".
888 */
889 if (*group_master_log_name)
890 {
891 char *basename= (group_master_log_name +
892 dirname_length(group_master_log_name));
893 /*
894 First compare the parts before the extension.
895 Find the dot in the master's log basename,
896 and protect against user's input error :
897 if the names do not match up to '.' included, return error
898 */
899 char *q= (char*)(fn_ext(basename)+1);
900 if (strncmp(basename, log_name_tmp, (int)(q-basename)))
901 {
902 error= -2;
903 break;
904 }
905 // Now compare extensions.
906 char *q_end;
907 ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
908 if (group_master_log_name_extension < log_name_extension)
909 cmp_result= -1 ;
910 else
911 cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
912
913 pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
914 cmp_result > 0);
915 if (pos_reached || thd->killed)
916 break;
917 }
918
919 //wait for master update, with optional timeout.
920
921 DBUG_PRINT("info",("Waiting for master update"));
922 /*
923 We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
924 will wake us up.
925 */
926 thd_wait_begin(thd, THD_WAIT_BINLOG);
927 if (timeout > 0)
928 {
929 /*
930 Note that mysql_cond_timedwait checks for the timeout
931 before for the condition ; i.e. it returns ETIMEDOUT
932 if the system time equals or exceeds the time specified by abstime
933 before the condition variable is signaled or broadcast, _or_ if
934 the absolute time specified by abstime has already passed at the time
935 of the call.
936 For that reason, mysql_cond_timedwait will do the "timeoutting" job
937 even if its condition is always immediately signaled (case of a loaded
938 master).
939 */
940 error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
941 }
942 else
943 mysql_cond_wait(&data_cond, &data_lock);
944 thd_wait_end(thd);
945 DBUG_PRINT("info",("Got signal of master update or timed out"));
946 if (error == ETIMEDOUT || error == ETIME)
947 {
948 error= -1;
949 break;
950 }
951 error=0;
952 event_count++;
953 DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
954 }
955
956err:
957 thd->EXIT_COND(&old_stage);
958 DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
959improper_arguments: %d timed_out: %d",
960 thd->killed_errno(),
961 (int) (init_abort_pos_wait != abort_pos_wait),
962 (int) slave_running,
963 (int) (error == -2),
964 (int) (error == -1)));
965 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
966 !slave_running)
967 {
968 error= -2;
969 }
970 DBUG_RETURN( error ? error : event_count );
971}
972
973
974void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
975 rpl_group_info *rgi,
976 bool skip_lock)
977{
978 DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
979
980 if (skip_lock)
981 mysql_mutex_assert_owner(&data_lock);
982 else
983 mysql_mutex_lock(&data_lock);
984
985 rgi->inc_event_relay_log_pos();
986 DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
987 (long) log_pos, (long) group_master_log_pos));
988 if (rgi->is_parallel_exec)
989 {
990 /* In case of parallel replication, do not update the position backwards. */
991 int cmp= strcmp(group_relay_log_name, rgi->event_relay_log_name);
992 if (cmp < 0)
993 {
994 group_relay_log_pos= rgi->future_event_relay_log_pos;
995 strmake_buf(group_relay_log_name, rgi->event_relay_log_name);
996 notify_group_relay_log_name_update();
997 } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
998 group_relay_log_pos= rgi->future_event_relay_log_pos;
999
1000 /*
1001 In the parallel case we need to update the master_log_name here, rather
1002 than in Rotate_log_event::do_update_pos().
1003 */
1004 cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name);
1005 if (cmp <= 0)
1006 {
1007 if (cmp < 0)
1008 {
1009 strcpy(group_master_log_name, rgi->future_event_master_log_name);
1010 group_master_log_pos= log_pos;
1011 }
1012 else if (group_master_log_pos < log_pos)
1013 group_master_log_pos= log_pos;
1014 }
1015
1016 /*
1017 In the parallel case, we only update the Seconds_Behind_Master at the
1018 end of a transaction. In the non-parallel case, the value is updated as
1019 soon as an event is read from the relay log; however this would be too
1020 confusing for the user, seeing the slave reported as up-to-date when
1021 potentially thousands of events are still queued up for worker threads
1022 waiting for execution.
1023 */
1024 if (rgi->last_master_timestamp &&
1025 rgi->last_master_timestamp > last_master_timestamp)
1026 last_master_timestamp= rgi->last_master_timestamp;
1027 }
1028 else
1029 {
1030 /* Non-parallel case. */
1031 group_relay_log_pos= event_relay_log_pos;
1032 strmake_buf(group_relay_log_name, event_relay_log_name);
1033 notify_group_relay_log_name_update();
1034 if (log_pos) // not 3.23 binlogs (no log_pos there) and not Stop_log_event
1035 group_master_log_pos= log_pos;
1036 }
1037
1038 /*
1039 If the slave does not support transactions and replicates a transaction,
1040 users should not trust group_master_log_pos (which they can display with
1041 SHOW SLAVE STATUS or read from relay-log.info), because to compute
1042 group_master_log_pos the slave relies on log_pos stored in the master's
1043 binlog, but if we are in a master's transaction these positions are always
1044 the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
1045 not advance as it should on the non-transactional slave (it advances by
1046 big leaps, whereas it should advance by small leaps).
1047 */
1048 /*
1049 In 4.x we used the event's len to compute the positions here. This is
1050 wrong if the event was 3.23/4.0 and has been converted to 5.0, because
1051 then the event's len is not what is was in the master's binlog, so this
1052 will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
1053 replication: Exec_master_log_pos is wrong). Only way to solve this is to
1054 have the original offset of the end of the event the relay log. This is
1055 what we do in 5.0: log_pos has become "end_log_pos" (because the real use
1056 of log_pos in 4.0 was to compute the end_log_pos; so better to store
1057 end_log_pos instead of begin_log_pos.
1058 If we had not done this fix here, the problem would also have appeared
1059 when the slave and master are 5.0 but with different event length (for
1060 example the slave is more recent than the master and features the event
1061 UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
1062 SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
1063 value which would lead to badly broken replication.
1064 Even the relay_log_pos will be corrupted in this case, because the len is
1065 the relay log is not "val".
1066 With the end_log_pos solution, we avoid computations involving lengthes.
1067 */
1068 mysql_cond_broadcast(&data_cond);
1069 if (!skip_lock)
1070 mysql_mutex_unlock(&data_lock);
1071 DBUG_VOID_RETURN;
1072}
1073
1074
1075void Relay_log_info::close_temporary_tables()
1076{
1077 DBUG_ENTER("Relay_log_info::close_temporary_tables");
1078
1079 TMP_TABLE_SHARE *share;
1080 TABLE *table;
1081
1082 if (!save_temporary_tables)
1083 {
1084 /* There are no temporary tables. */
1085 DBUG_VOID_RETURN;
1086 }
1087
1088 while ((share= save_temporary_tables->pop_front()))
1089 {
1090 /*
1091 Iterate over the list of tables for this TABLE_SHARE and close them.
1092 */
1093 while ((table= share->all_tmp_tables.pop_front()))
1094 {
1095 DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
1096 table->s->db.str, table->s->table_name.str));
1097
1098 /* Reset in_use as the table may have been created by another thd */
1099 table->in_use= 0;
1100 /*
1101 Lets not free TABLE_SHARE here as there could be multiple TABLEs opened
1102 for the same table (TABLE_SHARE).
1103 */
1104 closefrm(table);
1105 my_free(table);
1106 }
1107
1108 /*
1109 Don't ask for disk deletion. For now, anyway they will be deleted when
1110 slave restarts, but it is a better intention to not delete them.
1111 */
1112
1113 free_table_share(share);
1114 my_free(share);
1115 }
1116
1117 /* By now, there mustn't be any elements left in the list. */
1118 DBUG_ASSERT(save_temporary_tables->is_empty());
1119
1120 my_free(save_temporary_tables);
1121 save_temporary_tables= NULL;
1122 slave_open_temp_tables= 0;
1123
1124 DBUG_VOID_RETURN;
1125}
1126
1127/*
1128 purge_relay_logs()
1129
1130 @param rli Relay log information
1131 @param thd thread id. May be zero during startup
1132
1133 NOTES
1134 Assumes to have a run lock on rli and that no slave thread are running.
1135*/
1136
1137int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
1138 const char** errmsg)
1139{
1140 int error=0;
1141 const char *ln;
1142 char name_buf[FN_REFLEN];
1143 DBUG_ENTER("purge_relay_logs");
1144
1145 /*
1146 Even if rli->inited==0, we still try to empty rli->master_log_* variables.
1147 Indeed, rli->inited==0 does not imply that they already are empty.
1148 It could be that slave's info initialization partly succeeded :
1149 for example if relay-log.info existed but *relay-bin*.*
1150 have been manually removed, Relay_log_info::init() reads the old
1151 relay-log.info and fills rli->master_log_*, then Relay_log_info::init()
1152 checks for the existence of the relay log, this fails and
1153 Relay_log_info::init() leaves rli->inited to 0.
1154 In that pathological case, rli->master_log_pos* will be properly reinited
1155 at the next START SLAVE (as RESET SLAVE or CHANGE
1156 MASTER, the callers of purge_relay_logs, will delete bogus *.info files
1157 or replace them with correct files), however if the user does SHOW SLAVE
1158 STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
1159 In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
1160 to display fine in any case.
1161 */
1162
1163 rli->group_master_log_name[0]= 0;
1164 rli->group_master_log_pos= 0;
1165
1166 if (!rli->inited)
1167 {
1168 DBUG_PRINT("info", ("rli->inited == 0"));
1169 if (rli->error_on_rli_init_info)
1170 {
1171 ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
1172 1, name_buf);
1173
1174 if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1175 {
1176 sql_print_error("Unable to purge relay log files. Failed to open relay "
1177 "log index file:%s.", rli->relay_log.get_index_fname());
1178 DBUG_RETURN(1);
1179 }
1180 mysql_mutex_lock(rli->relay_log.get_log_lock());
1181 if (rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
1182 (ulong)(rli->max_relay_log_size ? rli->max_relay_log_size :
1183 max_binlog_size), 1, TRUE))
1184 {
1185 sql_print_error("Unable to purge relay log files. Failed to open relay "
1186 "log file:%s.", rli->relay_log.get_log_fname());
1187 mysql_mutex_unlock(rli->relay_log.get_log_lock());
1188 DBUG_RETURN(1);
1189 }
1190 mysql_mutex_unlock(rli->relay_log.get_log_lock());
1191 }
1192 else
1193 DBUG_RETURN(0);
1194 }
1195 else
1196 {
1197 DBUG_ASSERT(rli->slave_running == 0);
1198 DBUG_ASSERT(rli->mi->slave_running == 0);
1199 }
1200 mysql_mutex_lock(&rli->data_lock);
1201
1202 /*
1203 we close the relay log fd possibly left open by the slave SQL thread,
1204 to be able to delete it; the relay log fd possibly left open by the slave
1205 I/O thread will be closed naturally in reset_logs() by the
1206 close(LOG_CLOSE_TO_BE_OPENED) call
1207 */
1208 if (rli->cur_log_fd >= 0)
1209 {
1210 end_io_cache(&rli->cache_buf);
1211 mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
1212 rli->cur_log_fd= -1;
1213 }
1214
1215 if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0, 0))
1216 {
1217 *errmsg = "Failed during log reset";
1218 error=1;
1219 goto err;
1220 }
1221 rli->relay_log_state.load(rpl_global_gtid_slave_state);
1222 if (!just_reset)
1223 {
1224 /* Save name of used relay log file */
1225 strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
1226 strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
1227 rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1228 rli->log_space_total= 0;
1229
1230 if (count_relay_log_space(rli))
1231 {
1232 *errmsg= "Error counting relay log space";
1233 error=1;
1234 goto err;
1235 }
1236 error= init_relay_log_pos(rli, rli->group_relay_log_name,
1237 rli->group_relay_log_pos,
1238 0 /* do not need data lock */, errmsg, 0);
1239 }
1240 else
1241 {
1242 /* Ensure relay log names are not used */
1243 rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0;
1244 }
1245
1246 if (!rli->inited && rli->error_on_rli_init_info)
1247 {
1248 mysql_mutex_lock(rli->relay_log.get_log_lock());
1249 rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
1250 mysql_mutex_unlock(rli->relay_log.get_log_lock());
1251 }
1252err:
1253 DBUG_PRINT("info",("log_space_total: %llu",rli->log_space_total));
1254 mysql_mutex_unlock(&rli->data_lock);
1255 DBUG_RETURN(error);
1256}
1257
1258
1259/*
1260 Check if condition stated in UNTIL clause of START SLAVE is reached.
1261 SYNOPSYS
1262 Relay_log_info::is_until_satisfied()
1263 master_beg_pos position of the beginning of to be executed event
1264 (not log_pos member of the event that points to the
1265 beginning of the following event)
1266
1267
1268 DESCRIPTION
1269 Checks if UNTIL condition is reached. Uses caching result of last
1270 comparison of current log file name and target log file name. So cached
1271 value should be invalidated if current log file name changes
1272 (see Relay_log_info::notify_... functions).
1273
1274 This caching is needed to avoid of expensive string comparisons and
1275 strtol() conversions needed for log names comparison. We don't need to
1276 compare them each time this function is called, we only need to do this
1277 when current log name changes. If we have UNTIL_MASTER_POS condition we
1278 need to do this only after Rotate_log_event::do_apply_event() (which is
1279 rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
1280 condition then we should invalidate cached comarison value after
1281 inc_group_relay_log_pos() which called for each group of events (so we
1282 have some benefit if we have something like queries that use
1283 autoincrement or if we have transactions).
1284
1285 Should be called ONLY if until_condition != UNTIL_NONE !
1286 RETURN VALUE
1287 true - condition met or error happened (condition seems to have
1288 bad log file name)
1289 false - condition not met
1290*/
1291
1292bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
1293{
1294 const char *log_name;
1295 ulonglong log_pos;
1296 DBUG_ENTER("Relay_log_info::is_until_satisfied");
1297
1298 if (until_condition == UNTIL_MASTER_POS)
1299 {
1300 log_name= (mi->using_parallel() ? future_event_master_log_name
1301 : group_master_log_name);
1302 log_pos= master_beg_pos;
1303 }
1304 else
1305 {
1306 DBUG_ASSERT(until_condition == UNTIL_RELAY_POS);
1307 log_name= group_relay_log_name;
1308 log_pos= group_relay_log_pos;
1309 }
1310
1311 DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%llu",
1312 group_master_log_name, group_master_log_pos));
1313 DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%llu",
1314 group_relay_log_name, group_relay_log_pos));
1315 DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%llu",
1316 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1317 log_name, log_pos));
1318 DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%llu",
1319 until_condition == UNTIL_MASTER_POS ? "master" : "relay",
1320 until_log_name, until_log_pos));
1321
1322 if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1323 {
1324 /*
1325 We have no cached comparison results so we should compare log names
1326 and cache result.
1327 If we are after RESET SLAVE, and the SQL slave thread has not processed
1328 any event yet, it could be that group_master_log_name is "". In that case,
1329 just wait for more events (as there is no sensible comparison to do).
1330 */
1331
1332 if (*log_name)
1333 {
1334 const char *basename= log_name + dirname_length(log_name);
1335
1336 const char *q= (const char*)(fn_ext(basename)+1);
1337 if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
1338 {
1339 /* Now compare extensions. */
1340 char *q_end;
1341 ulong log_name_extension= strtoul(q, &q_end, 10);
1342 if (log_name_extension < until_log_name_extension)
1343 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1344 else
1345 until_log_names_cmp_result=
1346 (log_name_extension > until_log_name_extension) ?
1347 UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1348 }
1349 else
1350 {
1351 /* Probably error so we aborting */
1352 sql_print_error("Slave SQL thread is stopped because UNTIL "
1353 "condition is bad.");
1354 DBUG_RETURN(TRUE);
1355 }
1356 }
1357 else
1358 DBUG_RETURN(until_log_pos == 0);
1359 }
1360
1361 DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1362 log_pos >= until_log_pos) ||
1363 until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
1364}
1365
1366
1367bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
1368 rpl_group_info *rgi)
1369{
1370 int error= 0;
1371 DBUG_ENTER("Relay_log_info::stmt_done");
1372
1373 DBUG_ASSERT(!belongs_to_client());
1374 DBUG_ASSERT(rgi->rli == this);
1375 /*
1376 If in a transaction, and if the slave supports transactions, just
1377 inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
1378 (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
1379 BEGIN/COMMIT, not with SET AUTOCOMMIT= .
1380
1381 We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
1382 is also used for single row transactions.
1383
1384 CAUTION: opt_using_transactions means innodb || bdb ; suppose the
1385 master supports InnoDB and BDB, but the slave supports only BDB,
1386 problems will arise: - suppose an InnoDB table is created on the
1387 master, - then it will be MyISAM on the slave - but as
1388 opt_using_transactions is true, the slave will believe he is
1389 transactional with the MyISAM table. And problems will come when
1390 one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
1391 resume at BEGIN whereas there has not been any rollback). This is
1392 the problem of using opt_using_transactions instead of a finer
1393 "does the slave support _transactional handler used on the
1394 master_".
1395
1396 More generally, we'll have problems when a query mixes a
1397 transactional handler and MyISAM and STOP SLAVE is issued in the
1398 middle of the "transaction". START SLAVE will resume at BEGIN
1399 while the MyISAM table has already been updated.
1400 */
1401 if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
1402 opt_using_transactions)
1403 rgi->inc_event_relay_log_pos();
1404 else
1405 {
1406 inc_group_relay_log_pos(event_master_log_pos, rgi);
1407 if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi))
1408 {
1409 report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
1410 "Failed to update GTID state in %s.%s, slave state may become "
1411 "inconsistent: %d: %s",
1412 "mysql", rpl_gtid_slave_state_table_name.str,
1413 thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message());
1414 /*
1415 At this point we are not in a transaction (for example after DDL),
1416 so we can not roll back. Anyway, normally updates to the slave
1417 state table should not fail, and if they do, at least we made the
1418 DBA aware of the problem in the error log.
1419 */
1420 }
1421 DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
1422 if (mi->using_gtid == Master_info::USE_GTID_NO)
1423 if (flush())
1424 error= 1;
1425 DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
1426 }
1427 DBUG_RETURN(error);
1428}
1429
1430
1431int
1432Relay_log_info::alloc_inuse_relaylog(const char *name)
1433{
1434 inuse_relaylog *ir;
1435 uint32 gtid_count;
1436 rpl_gtid *gtid_list;
1437
1438 if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
1439 {
1440 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
1441 return 1;
1442 }
1443 gtid_count= relay_log_state.count();
1444 if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count,
1445 MYF(MY_WME))))
1446 {
1447 my_free(ir);
1448 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
1449 return 1;
1450 }
1451 if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
1452 {
1453 my_free(gtid_list);
1454 my_free(ir);
1455 DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
1456 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1457 return 1;
1458 }
1459 ir->rli= this;
1460 strmake_buf(ir->name, name);
1461 ir->relay_log_state= gtid_list;
1462 ir->relay_log_state_count= gtid_count;
1463
1464 if (!inuse_relaylog_list)
1465 inuse_relaylog_list= ir;
1466 else
1467 {
1468 last_inuse_relaylog->completed= true;
1469 last_inuse_relaylog->next= ir;
1470 }
1471 last_inuse_relaylog= ir;
1472
1473 return 0;
1474}
1475
1476
1477void
1478Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
1479{
1480 my_free(ir->relay_log_state);
1481 my_free(ir);
1482}
1483
1484
1485void
1486Relay_log_info::reset_inuse_relaylog()
1487{
1488 inuse_relaylog *cur= inuse_relaylog_list;
1489 while (cur)
1490 {
1491 DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
1492 inuse_relaylog *next= cur->next;
1493 free_inuse_relaylog(cur);
1494 cur= next;
1495 }
1496 inuse_relaylog_list= last_inuse_relaylog= NULL;
1497}
1498
1499
1500int
1501Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
1502{
1503 int res= 0;
1504 while (count)
1505 {
1506 if (relay_log_state.update_nolock(gtid_list, false))
1507 res= 1;
1508 ++gtid_list;
1509 --count;
1510 }
1511 return res;
1512}
1513
1514
1515#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1516struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
1517
1518static int
1519scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
1520 LEX_CSTRING *tablename, void **out_hton)
1521{
1522 TABLE_LIST tlist;
1523 TABLE *UNINIT_VAR(table);
1524 bool table_opened= false;
1525 bool table_scanned= false;
1526 struct gtid_pos_element tmp_entry, *entry;
1527 int err= 0;
1528
1529 thd->reset_for_next_command();
1530 tlist.init_one_table(&MYSQL_SCHEMA_NAME, tablename, NULL, TL_READ);
1531 if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
1532 goto end;
1533 table_opened= true;
1534 table= tlist.table;
1535
1536 if ((err= gtid_check_rpl_slave_state_table(table)))
1537 goto end;
1538
1539 bitmap_set_all(table->read_set);
1540 if (unlikely(err= table->file->ha_rnd_init_with_error(1)))
1541 goto end;
1542
1543 table_scanned= true;
1544 for (;;)
1545 {
1546 uint32 domain_id, server_id;
1547 uint64 sub_id, seq_no;
1548 uchar *rec;
1549
1550 if ((err= table->file->ha_rnd_next(table->record[0])))
1551 {
1552 if (err == HA_ERR_END_OF_FILE)
1553 break;
1554 else
1555 {
1556 table->file->print_error(err, MYF(0));
1557 goto end;
1558 }
1559 }
1560 domain_id= (uint32)table->field[0]->val_int();
1561 sub_id= (ulonglong)table->field[1]->val_int();
1562 server_id= (uint32)table->field[2]->val_int();
1563 seq_no= (ulonglong)table->field[3]->val_int();
1564 DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
1565 (unsigned)domain_id, (unsigned)server_id,
1566 (ulong)seq_no, (ulong)sub_id));
1567
1568 tmp_entry.sub_id= sub_id;
1569 tmp_entry.gtid.domain_id= domain_id;
1570 tmp_entry.gtid.server_id= server_id;
1571 tmp_entry.gtid.seq_no= seq_no;
1572 tmp_entry.hton= table->s->db_type();
1573 if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
1574 {
1575 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1576 goto end;
1577 }
1578
1579 if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0)))
1580 {
1581 entry= (struct gtid_pos_element *)rec;
1582 if (entry->sub_id >= sub_id)
1583 continue;
1584 entry->sub_id= sub_id;
1585 DBUG_ASSERT(entry->gtid.domain_id == domain_id);
1586 entry->gtid.server_id= server_id;
1587 entry->gtid.seq_no= seq_no;
1588 entry->hton= table->s->db_type();
1589 }
1590 else
1591 {
1592 if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry),
1593 MYF(MY_WME))))
1594 {
1595 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
1596 err= 1;
1597 goto end;
1598 }
1599 entry->sub_id= sub_id;
1600 entry->gtid.domain_id= domain_id;
1601 entry->gtid.server_id= server_id;
1602 entry->gtid.seq_no= seq_no;
1603 entry->hton= table->s->db_type();
1604 if ((err= my_hash_insert(hash, (uchar *)entry)))
1605 {
1606 my_free(entry);
1607 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1608 goto end;
1609 }
1610 }
1611 }
1612 err= 0; /* Clear HA_ERR_END_OF_FILE */
1613
1614end:
1615 if (table_scanned)
1616 {
1617 table->file->ha_index_or_rnd_end();
1618 ha_commit_trans(thd, FALSE);
1619 ha_commit_trans(thd, TRUE);
1620 }
1621 if (table_opened)
1622 {
1623 *out_hton= table->s->db_type();
1624 close_thread_tables(thd);
1625 thd->mdl_context.release_transactional_locks();
1626 }
1627 return err;
1628}
1629
1630
1631/*
1632 Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
1633 table found into ARRAY. For each domain id, put the row with highest sub_id
1634 into HASH.
1635*/
1636static int
1637scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *),
1638 void *cb_data)
1639{
1640 char path[FN_REFLEN];
1641 MY_DIR *dirp;
1642
1643 thd->reset_for_next_command();
1644 if (lock_schema_name(thd, MYSQL_SCHEMA_NAME.str))
1645 return 1;
1646
1647 build_table_filename(path, sizeof(path) - 1, MYSQL_SCHEMA_NAME.str, "", "", 0);
1648 if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
1649 {
1650 my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
1651 close_thread_tables(thd);
1652 thd->mdl_context.release_transactional_locks();
1653 return 1;
1654 }
1655 else
1656 {
1657 size_t i;
1658 Dynamic_array<LEX_CSTRING*> files(dirp->number_of_files);
1659 Discovered_table_list tl(thd, &files);
1660 int err;
1661
1662 err= ha_discover_table_names(thd, &MYSQL_SCHEMA_NAME, dirp, &tl, false);
1663 my_dirend(dirp);
1664 close_thread_tables(thd);
1665 thd->mdl_context.release_transactional_locks();
1666 if (err)
1667 return err;
1668
1669 for (i = 0; i < files.elements(); ++i)
1670 {
1671 if (strncmp(files.at(i)->str,
1672 rpl_gtid_slave_state_table_name.str,
1673 rpl_gtid_slave_state_table_name.length) == 0)
1674 {
1675 if ((err= (*cb)(thd, files.at(i), cb_data)))
1676 return err;
1677 }
1678 }
1679 }
1680
1681 return 0;
1682}
1683
1684
1685struct load_gtid_state_cb_data {
1686 HASH *hash;
1687 DYNAMIC_ARRAY *array;
1688 struct rpl_slave_state::gtid_pos_table *table_list;
1689 struct rpl_slave_state::gtid_pos_table *default_entry;
1690};
1691
1692static int
1693process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton,
1694 struct load_gtid_state_cb_data *data)
1695{
1696 struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
1697 bool is_default=
1698 (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
1699
1700 /*
1701 Ignore tables with duplicate storage engine, with a warning.
1702 Prefer the default mysql.gtid_slave_pos over another table
1703 mysql.gtid_slave_posXXX with the same storage engine.
1704 */
1705 next_ptr= &data->table_list;
1706 entry= data->table_list;
1707 while (entry)
1708 {
1709 if (entry->table_hton == hton)
1710 {
1711 static const char *warning_msg= "Ignoring redundant table mysql.%s "
1712 "since mysql.%s has the same storage engine";
1713 if (!is_default)
1714 {
1715 /* Ignore the redundant table. */
1716 sql_print_warning(warning_msg, table_name->str, entry->table_name.str);
1717 return 0;
1718 }
1719 else
1720 {
1721 sql_print_warning(warning_msg, entry->table_name.str, table_name->str);
1722 /* Delete the redundant table, and proceed to add this one instead. */
1723 *next_ptr= entry->next;
1724 my_free(entry);
1725 break;
1726 }
1727 }
1728 next_ptr= &entry->next;
1729 entry= entry->next;
1730 }
1731
1732 p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name,
1733 hton, rpl_slave_state::GTID_POS_AVAILABLE);
1734 if (!p)
1735 return 1;
1736 p->next= data->table_list;
1737 data->table_list= p;
1738 if (is_default)
1739 data->default_entry= p;
1740 return 0;
1741}
1742
1743
1744/*
1745 Put tables corresponding to @@gtid_pos_auto_engines at the end of the list,
1746 marked to be auto-created if needed.
1747*/
1748static int
1749gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr)
1750{
1751 plugin_ref *auto_engines;
1752 int err= 0;
1753 mysql_mutex_lock(&LOCK_global_system_variables);
1754 for (auto_engines= opt_gtid_pos_auto_plugins;
1755 !err && auto_engines && *auto_engines;
1756 ++auto_engines)
1757 {
1758 void *hton= plugin_hton(*auto_engines);
1759 char buf[FN_REFLEN+1];
1760 LEX_CSTRING table_name;
1761 char *p;
1762 rpl_slave_state::gtid_pos_table *entry, **next_ptr;
1763
1764 /* See if this engine is already in the list. */
1765 next_ptr= list_ptr;
1766 entry= *list_ptr;
1767 while (entry)
1768 {
1769 if (entry->table_hton == hton)
1770 break;
1771 next_ptr= &entry->next;
1772 entry= entry->next;
1773 }
1774 if (entry)
1775 continue;
1776
1777 /* Add an auto-create entry for this engine at end of list. */
1778 p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN);
1779 p= strmake(p, "_", FN_REFLEN - (p - buf));
1780 p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf));
1781 table_name.str= buf;
1782 table_name.length= p - buf;
1783 table_case_convert(const_cast<char*>(table_name.str),
1784 static_cast<uint>(table_name.length));
1785 entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table
1786 (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE);
1787 if (!entry)
1788 {
1789 err= 1;
1790 break;
1791 }
1792 *next_ptr= entry;
1793 }
1794 mysql_mutex_unlock(&LOCK_global_system_variables);
1795 return err;
1796}
1797
1798
1799static int
1800load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
1801{
1802 int err;
1803 load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
1804 void *hton;
1805
1806 if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
1807 table_name, &hton)))
1808 return err;
1809 return process_gtid_pos_table(thd, table_name, hton, data);
1810}
1811
1812
1813int
1814rpl_load_gtid_slave_state(THD *thd)
1815{
1816 bool array_inited= false;
1817 struct gtid_pos_element tmp_entry, *entry;
1818 HASH hash;
1819 DYNAMIC_ARRAY array;
1820 int err= 0;
1821 uint32 i;
1822 load_gtid_state_cb_data cb_data;
1823 DBUG_ENTER("rpl_load_gtid_slave_state");
1824
1825 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1826 bool loaded= rpl_global_gtid_slave_state->loaded;
1827 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1828 if (loaded)
1829 DBUG_RETURN(0);
1830
1831 cb_data.table_list= NULL;
1832 cb_data.default_entry= NULL;
1833 my_hash_init(&hash, &my_charset_bin, 32,
1834 offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
1835 sizeof(uint32), NULL, my_free, HASH_UNIQUE);
1836 if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0))))
1837 goto end;
1838 array_inited= true;
1839
1840 cb_data.hash = &hash;
1841 cb_data.array = &array;
1842 if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data)))
1843 goto end;
1844
1845 if (!cb_data.default_entry)
1846 {
1847 /*
1848 If the mysql.gtid_slave_pos table does not exist, but at least one other
1849 table is available, arbitrarily pick the first in the list to use as
1850 default.
1851 */
1852 cb_data.default_entry= cb_data.table_list;
1853 }
1854 if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
1855 goto end;
1856
1857 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1858 if (rpl_global_gtid_slave_state->loaded)
1859 {
1860 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1861 goto end;
1862 }
1863
1864 if (!cb_data.table_list)
1865 {
1866 my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
1867 rpl_gtid_slave_state_table_name.str);
1868 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1869 err= 1;
1870 goto end;
1871 }
1872
1873 for (i= 0; i < array.elements; ++i)
1874 {
1875 get_dynamic(&array, (uchar *)&tmp_entry, i);
1876 if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
1877 tmp_entry.gtid.server_id,
1878 tmp_entry.sub_id,
1879 tmp_entry.gtid.seq_no,
1880 tmp_entry.hton,
1881 NULL)))
1882 {
1883 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1884 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1885 goto end;
1886 }
1887 }
1888
1889 for (i= 0; i < hash.records; ++i)
1890 {
1891 entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
1892 if (opt_bin_log &&
1893 mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
1894 entry->gtid.seq_no))
1895 {
1896 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1897 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1898 goto end;
1899 }
1900 }
1901
1902 rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
1903 cb_data.default_entry);
1904 cb_data.table_list= NULL;
1905 rpl_global_gtid_slave_state->loaded= true;
1906 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
1907
1908end:
1909 if (array_inited)
1910 delete_dynamic(&array);
1911 my_hash_free(&hash);
1912 if (cb_data.table_list)
1913 rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
1914 DBUG_RETURN(err);
1915}
1916
1917
1918static int
1919find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
1920{
1921 load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
1922 TABLE_LIST tlist;
1923 TABLE *table= NULL;
1924 int err;
1925
1926 thd->reset_for_next_command();
1927 tlist.init_one_table(&MYSQL_SCHEMA_NAME, table_name, NULL, TL_READ);
1928 if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
1929 goto end;
1930 table= tlist.table;
1931
1932 if ((err= gtid_check_rpl_slave_state_table(table)))
1933 goto end;
1934 err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
1935
1936end:
1937 if (table)
1938 {
1939 ha_commit_trans(thd, FALSE);
1940 ha_commit_trans(thd, TRUE);
1941 close_thread_tables(thd);
1942 thd->mdl_context.release_transactional_locks();
1943 }
1944
1945 return err;
1946}
1947
1948
/*
  Re-compute the list of available mysql.gtid_slave_posXXX tables.

  This is done at START SLAVE to pick up any newly created tables without
  requiring server restart.

  Nothing is done (and 0 is returned) while the global slave state has not
  been marked as loaded.

  The tables are enumerated with scan_all_gtid_slave_pos_table(), using
  find_gtid_pos_tables_cb() as callback and a local load_gtid_state_cb_data
  as its argument. The resulting list is then either installed as the new
  global list, or merged into the existing one, depending on how many slave
  SQL threads are running.

  @param thd  Thread context used for scanning/opening the tables.

  @return 0 on success, non-zero on error. When no table at all is found,
          ER_NO_SUCH_TABLE is raised and 1 is returned.
*/
int
find_gtid_slave_pos_tables(THD *thd)
{
  int err= 0;
  load_gtid_state_cb_data cb_data;
  uint num_running;

  /* Sample the 'loaded' flag under the lock that protects the slave state. */
  mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
  bool loaded= rpl_global_gtid_slave_state->loaded;
  mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
  if (!loaded)
    return 0;

  /* Start from an empty result; the scan callback receives &cb_data. */
  cb_data.table_list= NULL;
  cb_data.default_entry= NULL;
  if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
    goto end;

  if (!cb_data.table_list)
  {
    /* Not a single usable table was found: report the default one missing. */
    my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
             rpl_gtid_slave_state_table_name.str);
    err= 1;
    goto end;
  }
  if (!cb_data.default_entry)
  {
    /*
      If the mysql.gtid_slave_pos table does not exist, but at least one other
      table is available, arbitrarily pick the first in the list to use as
      default.
    */
    cb_data.default_entry= cb_data.table_list;
  }
  if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
    goto end;

  /*
    Lock order used here: LOCK_active_mi first, then LOCK_slave_state. They
    are released in the reverse order below.
  */
  mysql_mutex_lock(&LOCK_active_mi);
  num_running= any_slave_sql_running(true);
  mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
  if (num_running <= 1)
  {
    /*
      If no slave is running now, the count will be 1, since this SQL thread
      which is starting is included in the count. In this case, we can safely
      replace the list, no-one can be trying to read it without lock.
    */
    DBUG_ASSERT(num_running == 1);
    rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
                                                          cb_data.default_entry);
    /* The list was handed over; make sure the cleanup below does not free it. */
    cb_data.table_list= NULL;
  }
  else
  {
    /*
      If there are SQL threads running, we cannot safely remove the old list.
      However we can add new entries, and warn about any tables that
      disappeared, but may still be visible to running SQL threads.
    */
    rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr;

    /*
      Pass 1: for every entry of the current global list, look for an entry
      with the same table_hton in the freshly scanned list; warn when there
      is none.
    */
    old_entry= (rpl_slave_state::gtid_pos_table *)
      rpl_global_gtid_slave_state->gtid_pos_tables;
    while (old_entry)
    {
      new_entry= cb_data.table_list;
      while (new_entry)
      {
        if (new_entry->table_hton == old_entry->table_hton)
          break;
        new_entry= new_entry->next;
      }
      if (!new_entry)
        sql_print_warning("The table mysql.%s was removed. "
                          "This change will not take full effect "
                          "until all SQL threads have been restarted",
                          old_entry->table_name.str);
      old_entry= old_entry->next;
    }
    /*
      Pass 2: walk the freshly scanned list. next_ptr_ptr always points at the
      link that leads to new_entry, so an entry can be unlinked in place.
    */
    next_ptr_ptr= &cb_data.table_list;
    new_entry= cb_data.table_list;
    while (new_entry)
    {
      /* Check if we already have a table with this storage engine. */
      old_entry= (rpl_slave_state::gtid_pos_table *)
        rpl_global_gtid_slave_state->gtid_pos_tables;
      while (old_entry)
      {
        if (new_entry->table_hton == old_entry->table_hton)
          break;
        old_entry= old_entry->next;
      }
      if (old_entry)
      {
        /* This new_entry is already available in the list. */
        /* It stays in cb_data.table_list and is freed at 'end'. */
        next_ptr_ptr= &new_entry->next;
        new_entry= new_entry->next;
      }
      else
      {
        /* Move this new_entry to the list. */
        /*
          Remember the successor before the entry is given away, then unlink
          the entry from cb_data.table_list so it is not freed at 'end'.
        */
        rpl_slave_state::gtid_pos_table *next= new_entry->next;
        rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
        *next_ptr_ptr= next;
        new_entry= next;
      }
    }
  }
  mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
  mysql_mutex_unlock(&LOCK_active_mi);

end:
  /* Free whatever part of the scanned list was not handed over. */
  if (cb_data.table_list)
    rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
  return err;
}
2071
2072
2073void
2074rpl_group_info::reinit(Relay_log_info *rli)
2075{
2076 this->rli= rli;
2077 tables_to_lock= NULL;
2078 tables_to_lock_count= 0;
2079 trans_retries= 0;
2080 last_event_start_time= 0;
2081 gtid_sub_id= 0;
2082 commit_id= 0;
2083 gtid_pending= false;
2084 worker_error= 0;
2085 row_stmt_start_timestamp= 0;
2086 long_find_row_note_printed= false;
2087 did_mark_start_commit= false;
2088 gtid_ev_flags2= 0;
2089 last_master_timestamp = 0;
2090 gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
2091 speculation= SPECULATE_NO;
2092 commit_orderer.reinit();
2093}
2094
2095rpl_group_info::rpl_group_info(Relay_log_info *rli)
2096 : thd(0), wait_commit_sub_id(0),
2097 wait_commit_group_info(0), parallel_entry(0),
2098 deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
2099{
2100 reinit(rli);
2101 bzero(&current_gtid, sizeof(current_gtid));
2102 mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
2103 MY_MUTEX_INIT_FAST);
2104 mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
2105}
2106
2107
2108rpl_group_info::~rpl_group_info()
2109{
2110 free_annotate_event();
2111 delete deferred_events;
2112 mysql_mutex_destroy(&sleep_lock);
2113 mysql_cond_destroy(&sleep_cond);
2114}
2115
2116
2117int
2118event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
2119{
2120 uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id);
2121 if (!sub_id)
2122 {
2123 /* Out of memory caused hash insertion to fail. */
2124 return 1;
2125 }
2126 rgi->gtid_sub_id= sub_id;
2127 rgi->current_gtid.domain_id= gev->domain_id;
2128 rgi->current_gtid.server_id= gev->server_id;
2129 rgi->current_gtid.seq_no= gev->seq_no;
2130 rgi->commit_id= gev->commit_id;
2131 rgi->gtid_pending= true;
2132 return 0;
2133}
2134
2135
2136void
2137delete_or_keep_event_post_apply(rpl_group_info *rgi,
2138 Log_event_type typ, Log_event *ev)
2139{
2140 /*
2141 ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
2142 thread-safe for parallel replication.
2143 */
2144
2145 switch (typ) {
2146 case FORMAT_DESCRIPTION_EVENT:
2147 /*
2148 Format_description_log_event should not be deleted because it
2149 will be used to read info about the relay log's format;
2150 it will be deleted when the SQL thread does not need it,
2151 i.e. when this thread terminates.
2152 */
2153 break;
2154 case ANNOTATE_ROWS_EVENT:
2155 /*
2156 Annotate_rows event should not be deleted because after it has
2157 been applied, thd->query points to the string inside this event.
2158 The thd->query will be used to generate new Annotate_rows event
2159 during applying the subsequent Rows events.
2160 */
2161 rgi->set_annotate_event((Annotate_rows_log_event*) ev);
2162 break;
2163 case DELETE_ROWS_EVENT_V1:
2164 case UPDATE_ROWS_EVENT_V1:
2165 case WRITE_ROWS_EVENT_V1:
2166 case DELETE_ROWS_EVENT:
2167 case UPDATE_ROWS_EVENT:
2168 case WRITE_ROWS_EVENT:
2169 case WRITE_ROWS_COMPRESSED_EVENT:
2170 case DELETE_ROWS_COMPRESSED_EVENT:
2171 case UPDATE_ROWS_COMPRESSED_EVENT:
2172 case WRITE_ROWS_COMPRESSED_EVENT_V1:
2173 case UPDATE_ROWS_COMPRESSED_EVENT_V1:
2174 case DELETE_ROWS_COMPRESSED_EVENT_V1:
2175 /*
2176 After the last Rows event has been applied, the saved Annotate_rows
2177 event (if any) is not needed anymore and can be deleted.
2178 */
2179 if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
2180 rgi->free_annotate_event();
2181 /* fall through */
2182 default:
2183 DBUG_PRINT("info", ("Deleting the event after it has been executed"));
2184 if (!rgi->is_deferred_event(ev))
2185 delete ev;
2186 break;
2187 }
2188}
2189
2190
/*
  Clean up the execution context of this event group.

  Always: clears the table map, closes the thread's tables (via
  slave_close_thread_tables()), clears the option bits set while applying
  events, and resets the long_find_row note state.

  Additionally, when error is true: rolls back the statement and the
  transaction, clears gtid_pending, releases transactional metadata locks,
  clears the IN_STMT / IN_TRANSACTION flags of rli if thd is the SQL driver
  thread, and releases the domain owner when a duplicate-ignore state is set.

  @param thd    Thread to clean up; must be this->thd (asserted).
  @param error  True if the cleanup follows an error.
*/
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
  DBUG_ENTER("rpl_group_info::cleanup_context");
  DBUG_PRINT("enter", ("error: %d", (int) error));

  DBUG_ASSERT(this->thd == thd);
  /*
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
    may have opened tables, which we cannot be sure have been closed (because
    maybe the Rows_log_event have not been found or will not be, because slave
    SQL thread is stopping, or relay log has a missing tail etc). So we close
    all thread's tables. And so the table mappings have to be cancelled.
    2) Rows_log_event::do_apply_event() may even have started statements or
    transactions on them, which we need to rollback in case of error.
    3) If finding a Format_description_log_event after a BEGIN, we also need
    to rollback before continuing with the next events.
    4) so we need this "context cleanup" function.
  */
  if (unlikely(error))
  {
    trans_rollback_stmt(thd); // if a "statement transaction"
    /* trans_rollback() also resets OPTION_GTID_BEGIN */
    trans_rollback(thd);      // if a "real transaction"
    /*
      Now that we have rolled back the transaction, make sure we do not
      erroneously update the GTID position.
    */
    gtid_pending= false;
  }
  /* Cancel the table mappings, then close the tables they referred to. */
  m_table_map.clear_tables();
  slave_close_thread_tables(thd);

  if (unlikely(error))
  {
    /* Done only after the tables have been closed above. */
    thd->mdl_context.release_transactional_locks();

    if (thd == rli->sql_driver_thd)
    {
      /*
        Reset flags. This is needed to handle incident events and errors in
        the relay log noticed by the sql driver thread.
      */
      rli->clear_flag(Relay_log_info::IN_STMT);
      rli->clear_flag(Relay_log_info::IN_TRANSACTION);
    }

    /*
      Ensure we always release the domain for others to process, when using
      --gtid-ignore-duplicates.
    */
    if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
      rpl_global_gtid_slave_state->release_domain_owner(this);
  }

  /*
    Cleanup for the flags that have been set at do_apply_event.
  */
  thd->variables.option_bits&= ~(OPTION_NO_FOREIGN_KEY_CHECKS |
                                 OPTION_RELAXED_UNIQUE_CHECKS |
                                 OPTION_NO_CHECK_CONSTRAINT_CHECKS);

  /*
    Reset state related to long_find_row notes in the error log:
    - timestamp
    - flag that decides whether the slave prints or not
  */
  reset_row_stmt_start_timestamp();
  unset_long_find_row_note_printed();

  /* Debug-only injection point: sleep 50000 (my_sleep units) for domain 100. */
  DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x", {
      if (current_gtid.domain_id == 100)
        my_sleep(50000);
    };);

  DBUG_VOID_RETURN;
}
2267
2268
2269void rpl_group_info::clear_tables_to_lock()
2270{
2271 DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
2272#ifndef DBUG_OFF
2273 /**
2274 When replicating in RBR and MyISAM Merge tables are involved
2275 open_and_lock_tables (called in do_apply_event) appends the
2276 base tables to the list of tables_to_lock. Then these are
2277 removed from the list in close_thread_tables (which is called
2278 before we reach this point).
2279
2280 This assertion just confirms that we get no surprises at this
2281 point.
2282 */
2283 uint i=0;
2284 for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
2285 DBUG_ASSERT(i == tables_to_lock_count);
2286#endif
2287
2288 while (tables_to_lock)
2289 {
2290 uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
2291 if (tables_to_lock->m_tabledef_valid)
2292 {
2293 tables_to_lock->m_tabledef.table_def::~table_def();
2294 tables_to_lock->m_tabledef_valid= FALSE;
2295 }
2296
2297 /*
2298 If blob fields were used during conversion of field values
2299 from the master table into the slave table, then we need to
2300 free the memory used temporarily to store their values before
2301 copying into the slave's table.
2302 */
2303 if (tables_to_lock->m_conv_table)
2304 free_blobs(tables_to_lock->m_conv_table);
2305
2306 tables_to_lock=
2307 static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
2308 tables_to_lock_count--;
2309 my_free(to_free);
2310 }
2311 DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
2312 DBUG_VOID_RETURN;
2313}
2314
2315
2316void rpl_group_info::slave_close_thread_tables(THD *thd)
2317{
2318 DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
2319 thd->get_stmt_da()->set_overwrite_status(true);
2320 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
2321 thd->get_stmt_da()->set_overwrite_status(false);
2322
2323 close_thread_tables(thd);
2324 /*
2325 - If transaction rollback was requested due to deadlock
2326 perform it and release metadata locks.
2327 - If inside a multi-statement transaction,
2328 defer the release of metadata locks until the current
2329 transaction is either committed or rolled back. This prevents
2330 other statements from modifying the table for the entire
2331 duration of this transaction. This provides commit ordering
2332 and guarantees serializability across multiple transactions.
2333 - If in autocommit mode, or outside a transactional context,
2334 automatically release metadata locks of the current statement.
2335 */
2336 if (thd->transaction_rollback_request)
2337 {
2338 trans_rollback_implicit(thd);
2339 thd->mdl_context.release_transactional_locks();
2340 }
2341 else if (! thd->in_multi_stmt_transaction_mode())
2342 thd->mdl_context.release_transactional_locks();
2343 else
2344 thd->mdl_context.release_statement_locks();
2345
2346 clear_tables_to_lock();
2347 DBUG_VOID_RETURN;
2348}
2349
2350
2351
2352static void
2353mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco,
2354 rpl_group_info *rgi)
2355{
2356 group_commit_orderer *tmp;
2357 uint64 count= ++e->count_committing_event_groups;
2358 /* Signal any following GCO whose wait_count has been reached now. */
2359 tmp= gco;
2360 while ((tmp= tmp->next_gco))
2361 {
2362 uint64 wait_count= tmp->wait_count;
2363 if (wait_count > count)
2364 break;
2365 mysql_cond_broadcast(&tmp->COND_group_commit_orderer);
2366 }
2367}
2368
2369
2370void
2371rpl_group_info::mark_start_commit_no_lock()
2372{
2373 if (did_mark_start_commit)
2374 return;
2375 did_mark_start_commit= true;
2376 mark_start_commit_inner(parallel_entry, gco, this);
2377}
2378
2379
2380void
2381rpl_group_info::mark_start_commit()
2382{
2383 rpl_parallel_entry *e;
2384
2385 if (did_mark_start_commit)
2386 return;
2387 did_mark_start_commit= true;
2388
2389 e= this->parallel_entry;
2390 mysql_mutex_lock(&e->LOCK_parallel_entry);
2391 mark_start_commit_inner(e, gco, this);
2392 mysql_mutex_unlock(&e->LOCK_parallel_entry);
2393}
2394
2395
2396/*
2397 Format the current GTID as a string suitable for printing in error messages.
2398
2399 The string is stored in a buffer inside rpl_group_info, so remains valid
2400 until next call to gtid_info() or until destruction of rpl_group_info.
2401
2402 If no GTID is available, then NULL is returned.
2403*/
2404char *
2405rpl_group_info::gtid_info()
2406{
2407 if (!gtid_sub_id || !current_gtid.seq_no)
2408 return NULL;
2409 my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu",
2410 current_gtid.domain_id, current_gtid.server_id,
2411 current_gtid.seq_no);
2412 return gtid_info_buf;
2413}
2414
2415
2416/*
2417 Undo the effect of a prior mark_start_commit().
2418
2419 This is only used for retrying a transaction in parallel replication, after
2420 we have encountered a deadlock or other temporary error.
2421
2422 When we get such a deadlock, it means that the current group of transactions
2423 did not yet all start committing (else they would not have deadlocked). So
2424 we will not yet have woken up anything in the next group, our rgi->gco is
2425 still live, and we can simply decrement the counter (to be incremented again
2426 later, when the retry succeeds and reaches the commit step).
2427*/
2428void
2429rpl_group_info::unmark_start_commit()
2430{
2431 rpl_parallel_entry *e;
2432
2433 if (!did_mark_start_commit)
2434 return;
2435 did_mark_start_commit= false;
2436
2437 e= this->parallel_entry;
2438 mysql_mutex_lock(&e->LOCK_parallel_entry);
2439 --e->count_committing_event_groups;
2440 mysql_mutex_unlock(&e->LOCK_parallel_entry);
2441}
2442
2443
2444rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
2445 : rpl_filter(filter)
2446{
2447 cached_charset_invalidate();
2448}
2449
2450
2451void rpl_sql_thread_info::cached_charset_invalidate()
2452{
2453 DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
2454
2455 /* Full of zeroes means uninitialized. */
2456 bzero(cached_charset, sizeof(cached_charset));
2457 DBUG_VOID_RETURN;
2458}
2459
2460
2461bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
2462{
2463 DBUG_ENTER("rpl_group_info::cached_charset_compare");
2464
2465 if (memcmp(cached_charset, charset, sizeof(cached_charset)))
2466 {
2467 memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
2468 DBUG_RETURN(1);
2469 }
2470 DBUG_RETURN(0);
2471}
2472
2473
2474/**
2475 Store the file and position where the slave's SQL thread are in the
2476 relay log.
2477
2478 Notes:
2479
2480 - This function should be called either from the slave SQL thread,
2481 or when the slave thread is not running. (It reads the
2482 group_{relay|master}_log_{pos|name} and delay fields in the rli
2483 object. These may only be modified by the slave SQL thread or by
2484 a client thread when the slave SQL thread is not running.)
2485
2486 - If there is an active transaction, then we do not update the
2487 position in the relay log. This is to ensure that we re-execute
2488 statements if we die in the middle of an transaction that was
2489 rolled back.
2490
2491 - As a transaction never spans binary logs, we don't have to handle
2492 the case where we do a relay-log-rotation in the middle of the
2493 transaction. If transactions could span several binlogs, we would
2494 have to ensure that we do not delete the relay log file where the
2495 transaction started before switching to a new relay log file.
2496
2497 - Error can happen if writing to file fails or if flushing the file
2498 fails.
2499
2500 @param rli The object representing the Relay_log_info.
2501
2502 @todo Change the log file information to a binary format to avoid
2503 calling longlong2str.
2504
2505 @return 0 on success, 1 on error.
2506*/
2507bool Relay_log_info::flush()
2508{
2509 bool error=0;
2510
2511 DBUG_ENTER("Relay_log_info::flush()");
2512
2513 IO_CACHE *file = &info_file;
2514 // 2*file name, 2*long long, 2*unsigned long, 6*'\n'
2515 char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos;
2516 my_b_seek(file, 0L);
2517 pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10);
2518 *pos++='\n';
2519 pos=strmov(pos, group_relay_log_name);
2520 *pos++='\n';
2521 pos=longlong10_to_str(group_relay_log_pos, pos, 10);
2522 *pos++='\n';
2523 pos=strmov(pos, group_master_log_name);
2524 *pos++='\n';
2525 pos=longlong10_to_str(group_master_log_pos, pos, 10);
2526 *pos++='\n';
2527 pos= longlong10_to_str(sql_delay, pos, 10);
2528 *pos++= '\n';
2529 if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)))
2530 error=1;
2531 if (flush_io_cache(file))
2532 error=1;
2533 if (sync_relayloginfo_period &&
2534 !error &&
2535 ++sync_counter >= sync_relayloginfo_period)
2536 {
2537 if (my_sync(info_fd, MYF(MY_WME)))
2538 error=1;
2539 sync_counter= 0;
2540 }
2541 /*
2542 Flushing the relay log is done by the slave I/O thread
2543 or by the user on STOP SLAVE.
2544 */
2545 DBUG_RETURN(error);
2546}
2547
2548#endif
2549