| 1 | /* Copyright (c) 2006, 2017, Oracle and/or its affiliates. |
| 2 | Copyright (c) 2010, 2017, MariaDB Corporation |
| 3 | |
| 4 | This program is free software; you can redistribute it and/or modify |
| 5 | it under the terms of the GNU General Public License as published by |
| 6 | the Free Software Foundation; version 2 of the License. |
| 7 | |
| 8 | This program is distributed in the hope that it will be useful, |
| 9 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 11 | GNU General Public License for more details. |
| 12 | |
| 13 | You should have received a copy of the GNU General Public License |
| 14 | along with this program; if not, write to the Free Software Foundation, |
| 15 | 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ |
| 16 | |
| 17 | #include "mariadb.h" |
| 18 | #include "sql_priv.h" |
| 19 | #include "unireg.h" // HAVE_* |
| 20 | #include "rpl_mi.h" |
| 21 | #include "rpl_rli.h" |
| 22 | #include "sql_base.h" // close_thread_tables |
| 23 | #include <my_dir.h> // For MY_STAT |
| 24 | #include "sql_repl.h" // For check_binlog_magic |
| 25 | #include "log_event.h" // Format_description_log_event, Log_event, |
| 26 | // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT, |
| 27 | // PREFIX_SQL_LOAD |
| 28 | #include "rpl_utility.h" |
| 29 | #include "transaction.h" |
| 30 | #include "sql_parse.h" // end_trans, ROLLBACK |
| 31 | #include "slave.h" |
| 32 | #include <mysql/plugin.h> |
| 33 | #include <mysql/service_thd_wait.h> |
| 34 | #include "lock.h" |
| 35 | #include "sql_table.h" |
| 36 | |
| 37 | static int count_relay_log_space(Relay_log_info* rli); |
| 38 | |
| 39 | /** |
| 40 | Current replication state (hash of last GTID executed, per replication |
| 41 | domain). |
| 42 | */ |
| 43 | rpl_slave_state *rpl_global_gtid_slave_state; |
| 44 | /* Object used for MASTER_GTID_WAIT(). */ |
| 45 | gtid_waiting rpl_global_gtid_waiting; |
| 46 | |
| 47 | const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event" ; |
| 48 | |
| 49 | Relay_log_info::Relay_log_info(bool is_slave_recovery) |
| 50 | :Slave_reporting_capability("SQL" ), |
| 51 | replicate_same_server_id(::replicate_same_server_id), |
| 52 | info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), |
| 53 | sync_counter(0), is_relay_log_recovery(is_slave_recovery), |
| 54 | save_temporary_tables(0), |
| 55 | mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0), |
| 56 | cur_log_old_open_count(0), error_on_rli_init_info(false), |
| 57 | group_relay_log_pos(0), event_relay_log_pos(0), |
| 58 | group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), |
| 59 | last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), |
| 60 | abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), |
| 61 | gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), |
| 62 | slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE), |
| 63 | until_log_pos(0), retried_trans(0), executed_entries(0), |
| 64 | sql_delay(0), sql_delay_end(0), |
| 65 | m_flags(0) |
| 66 | { |
| 67 | DBUG_ENTER("Relay_log_info::Relay_log_info" ); |
| 68 | |
| 69 | relay_log.is_relay_log= TRUE; |
| 70 | relay_log_state.init(); |
| 71 | #ifdef HAVE_PSI_INTERFACE |
| 72 | relay_log.set_psi_keys(key_RELAYLOG_LOCK_index, |
| 73 | key_RELAYLOG_COND_relay_log_updated, |
| 74 | key_RELAYLOG_COND_bin_log_updated, |
| 75 | key_file_relaylog, |
| 76 | key_file_relaylog_index, |
| 77 | key_RELAYLOG_COND_queue_busy, |
| 78 | key_LOCK_relaylog_end_pos); |
| 79 | #endif |
| 80 | |
| 81 | group_relay_log_name[0]= event_relay_log_name[0]= |
| 82 | group_master_log_name[0]= 0; |
| 83 | until_log_name[0]= ign_master_log_name_end[0]= 0; |
| 84 | max_relay_log_size= global_system_variables.max_relay_log_size; |
| 85 | bzero((char*) &info_file, sizeof(info_file)); |
| 86 | bzero((char*) &cache_buf, sizeof(cache_buf)); |
| 87 | mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); |
| 88 | mysql_mutex_init(key_relay_log_info_data_lock, |
| 89 | &data_lock, MY_MUTEX_INIT_FAST); |
| 90 | mysql_mutex_init(key_relay_log_info_log_space_lock, |
| 91 | &log_space_lock, MY_MUTEX_INIT_FAST); |
| 92 | mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL); |
| 93 | mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); |
| 94 | mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); |
| 95 | mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); |
| 96 | relay_log.init_pthread_objects(); |
| 97 | DBUG_VOID_RETURN; |
| 98 | } |
| 99 | |
| 100 | |
| 101 | Relay_log_info::~Relay_log_info() |
| 102 | { |
| 103 | DBUG_ENTER("Relay_log_info::~Relay_log_info" ); |
| 104 | |
| 105 | reset_inuse_relaylog(); |
| 106 | mysql_mutex_destroy(&run_lock); |
| 107 | mysql_mutex_destroy(&data_lock); |
| 108 | mysql_mutex_destroy(&log_space_lock); |
| 109 | mysql_cond_destroy(&data_cond); |
| 110 | mysql_cond_destroy(&start_cond); |
| 111 | mysql_cond_destroy(&stop_cond); |
| 112 | mysql_cond_destroy(&log_space_cond); |
| 113 | relay_log.cleanup(); |
| 114 | DBUG_VOID_RETURN; |
| 115 | } |
| 116 | |
| 117 | |
| 118 | /** |
| 119 | Read the relay_log.info file. |
| 120 | |
| 121 | @param info_fname The name of the file to read from. |
| 122 | @retval 0 success |
| 123 | @retval 1 failure |
| 124 | */ |
| 125 | int Relay_log_info::init(const char* info_fname) |
| 126 | { |
| 127 | char fname[FN_REFLEN+128]; |
| 128 | const char* msg = 0; |
| 129 | int error = 0; |
| 130 | mysql_mutex_t *log_lock; |
| 131 | DBUG_ENTER("Relay_log_info::init" ); |
| 132 | |
| 133 | if (inited) // Set if this function called |
| 134 | DBUG_RETURN(0); |
| 135 | |
| 136 | log_lock= relay_log.get_log_lock(); |
| 137 | fn_format(fname, info_fname, mysql_data_home, "" , 4+32); |
| 138 | mysql_mutex_lock(&data_lock); |
| 139 | cur_log_fd = -1; |
| 140 | slave_skip_counter=0; |
| 141 | abort_pos_wait=0; |
| 142 | log_space_limit= relay_log_space_limit; |
| 143 | log_space_total= 0; |
| 144 | |
| 145 | if (unlikely(error_on_rli_init_info)) |
| 146 | goto err; |
| 147 | |
| 148 | char pattern[FN_REFLEN]; |
| 149 | (void) my_realpath(pattern, slave_load_tmpdir, 0); |
| 150 | if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "" , |
| 151 | MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS) |
| 152 | { |
| 153 | mysql_mutex_unlock(&data_lock); |
| 154 | sql_print_error("Unable to use slave's temporary directory %s" , |
| 155 | slave_load_tmpdir); |
| 156 | DBUG_RETURN(1); |
| 157 | } |
| 158 | unpack_filename(slave_patternload_file, pattern); |
| 159 | slave_patternload_file_size= strlen(slave_patternload_file); |
| 160 | |
| 161 | /* |
| 162 | The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE. |
| 163 | Note that the I/O thread flushes it to disk after writing every |
| 164 | event, in flush_master_info(mi, 1, ?). |
| 165 | */ |
| 166 | |
| 167 | { |
| 168 | /* Reports an error and returns, if the --relay-log's path |
| 169 | is a directory.*/ |
| 170 | if (opt_relay_logname && |
| 171 | opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR) |
| 172 | { |
| 173 | mysql_mutex_unlock(&data_lock); |
| 174 | sql_print_error("Path '%s' is a directory name, please specify \ |
| 175 | a file name for --relay-log option" , opt_relay_logname); |
| 176 | DBUG_RETURN(1); |
| 177 | } |
| 178 | |
| 179 | /* Reports an error and returns, if the --relay-log-index's path |
| 180 | is a directory.*/ |
| 181 | if (opt_relaylog_index_name && |
| 182 | opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] |
| 183 | == FN_LIBCHAR) |
| 184 | { |
| 185 | mysql_mutex_unlock(&data_lock); |
| 186 | sql_print_error("Path '%s' is a directory name, please specify \ |
| 187 | a file name for --relay-log-index option" , opt_relaylog_index_name); |
| 188 | DBUG_RETURN(1); |
| 189 | } |
| 190 | |
| 191 | char buf[FN_REFLEN]; |
| 192 | const char *ln; |
| 193 | static bool name_warning_sent= 0; |
| 194 | ln= relay_log.generate_name(opt_relay_logname, "-relay-bin" , |
| 195 | 1, buf); |
| 196 | /* We send the warning only at startup, not after every RESET SLAVE */ |
| 197 | if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent && |
| 198 | !opt_bootstrap) |
| 199 | { |
| 200 | /* |
| 201 | User didn't give us info to name the relay log index file. |
| 202 | Picking `hostname`-relay-bin.index like we do, causes replication to |
| 203 | fail if this slave's hostname is changed later. So, we would like to |
| 204 | instead require a name. But as we don't want to break many existing |
| 205 | setups, we only give warning, not error. |
| 206 | */ |
| 207 | sql_print_warning("Neither --relay-log nor --relay-log-index were used;" |
| 208 | " so replication " |
| 209 | "may break when this MySQL server acts as a " |
| 210 | "slave and has his hostname changed!! Please " |
| 211 | "use '--log-basename=#' or '--relay-log=%s' to avoid " |
| 212 | "this problem." , ln); |
| 213 | name_warning_sent= 1; |
| 214 | } |
| 215 | |
| 216 | /* For multimaster, add connection name to relay log filenames */ |
| 217 | char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN]; |
| 218 | char *buf_relaylog_index_name= opt_relaylog_index_name; |
| 219 | |
| 220 | create_logfile_name_with_suffix(buf_relay_logname, |
| 221 | sizeof(buf_relay_logname), |
| 222 | ln, 1, &mi->cmp_connection_name); |
| 223 | ln= buf_relay_logname; |
| 224 | |
| 225 | if (opt_relaylog_index_name) |
| 226 | { |
| 227 | buf_relaylog_index_name= buf_relaylog_index_name_buff; |
| 228 | create_logfile_name_with_suffix(buf_relaylog_index_name_buff, |
| 229 | sizeof(buf_relaylog_index_name_buff), |
| 230 | opt_relaylog_index_name, 0, |
| 231 | &mi->cmp_connection_name); |
| 232 | } |
| 233 | |
| 234 | /* |
| 235 | note, that if open() fails, we'll still have index file open |
| 236 | but a destructor will take care of that |
| 237 | */ |
| 238 | mysql_mutex_lock(log_lock); |
| 239 | if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) || |
| 240 | relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND, |
| 241 | (ulong)max_relay_log_size, 1, TRUE)) |
| 242 | { |
| 243 | mysql_mutex_unlock(log_lock); |
| 244 | mysql_mutex_unlock(&data_lock); |
| 245 | sql_print_error("Failed when trying to open logs for '%s' in Relay_log_info::init(). Error: %M" , ln, my_errno); |
| 246 | DBUG_RETURN(1); |
| 247 | } |
| 248 | mysql_mutex_unlock(log_lock); |
| 249 | } |
| 250 | |
| 251 | /* if file does not exist */ |
| 252 | if (access(fname,F_OK)) |
| 253 | { |
| 254 | /* |
| 255 | If someone removed the file from underneath our feet, just close |
| 256 | the old descriptor and re-create the old file |
| 257 | */ |
| 258 | if (info_fd >= 0) |
| 259 | mysql_file_close(info_fd, MYF(MY_WME)); |
| 260 | if ((info_fd= mysql_file_open(key_file_relay_log_info, |
| 261 | fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0) |
| 262 | { |
| 263 | sql_print_error("Failed to create a new relay log info file (" |
| 264 | "file '%s', errno %d)" , fname, my_errno); |
| 265 | msg= current_thd->get_stmt_da()->message(); |
| 266 | goto err; |
| 267 | } |
| 268 | if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, |
| 269 | MYF(MY_WME))) |
| 270 | { |
| 271 | sql_print_error("Failed to create a cache on relay log info file '%s'" , |
| 272 | fname); |
| 273 | msg= current_thd->get_stmt_da()->message(); |
| 274 | goto err; |
| 275 | } |
| 276 | |
| 277 | /* Init relay log with first entry in the relay index file */ |
| 278 | if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, |
| 279 | &msg, 0)) |
| 280 | { |
| 281 | sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)" ); |
| 282 | goto err; |
| 283 | } |
| 284 | group_master_log_name[0]= 0; |
| 285 | group_master_log_pos= 0; |
| 286 | } |
| 287 | else // file exists |
| 288 | { |
| 289 | if (info_fd >= 0) |
| 290 | reinit_io_cache(&info_file, READ_CACHE, 0L,0,0); |
| 291 | else |
| 292 | { |
| 293 | int error=0; |
| 294 | if ((info_fd= mysql_file_open(key_file_relay_log_info, |
| 295 | fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0) |
| 296 | { |
| 297 | sql_print_error("\ |
| 298 | Failed to open the existing relay log info file '%s' (errno %d)" , |
| 299 | fname, my_errno); |
| 300 | error= 1; |
| 301 | } |
| 302 | else if (init_io_cache(&info_file, info_fd, |
| 303 | IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME))) |
| 304 | { |
| 305 | sql_print_error("Failed to create a cache on relay log info file '%s'" , |
| 306 | fname); |
| 307 | error= 1; |
| 308 | } |
| 309 | if (unlikely(error)) |
| 310 | { |
| 311 | if (info_fd >= 0) |
| 312 | mysql_file_close(info_fd, MYF(0)); |
| 313 | info_fd= -1; |
| 314 | mysql_mutex_lock(log_lock); |
| 315 | relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); |
| 316 | mysql_mutex_unlock(log_lock); |
| 317 | mysql_mutex_unlock(&data_lock); |
| 318 | DBUG_RETURN(1); |
| 319 | } |
| 320 | } |
| 321 | |
| 322 | int relay_log_pos, master_log_pos, lines; |
| 323 | char *first_non_digit; |
| 324 | |
| 325 | /* |
| 326 | Starting from MySQL 5.6.x, relay-log.info has a new format. |
| 327 | Now, its first line contains the number of lines in the file. |
| 328 | By reading this number we can determine which version our master.info |
| 329 | comes from. We can't simply count the lines in the file, since |
| 330 | versions before 5.6.x could generate files with more lines than |
| 331 | needed. If first line doesn't contain a number, or if it |
| 332 | contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY, |
| 333 | then the file is treated like a file from pre-5.6.x version. |
| 334 | There is no ambiguity when reading an old master.info: before |
| 335 | 5.6.x, the first line contained the binlog's name, which is |
| 336 | either empty or has an extension (contains a '.'), so can't be |
| 337 | confused with an integer. |
| 338 | |
| 339 | So we're just reading first line and trying to figure which |
| 340 | version is this. |
| 341 | */ |
| 342 | |
| 343 | /* |
| 344 | The first row is temporarily stored in mi->master_log_name, if |
| 345 | it is line count and not binlog name (new format) it will be |
| 346 | overwritten by the second row later. |
| 347 | */ |
| 348 | if (init_strvar_from_file(group_relay_log_name, |
| 349 | sizeof(group_relay_log_name), |
| 350 | &info_file, "" )) |
| 351 | { |
| 352 | msg="Error reading slave log configuration" ; |
| 353 | goto err; |
| 354 | } |
| 355 | |
| 356 | lines= strtoul(group_relay_log_name, &first_non_digit, 10); |
| 357 | |
| 358 | if (group_relay_log_name[0] != '\0' && |
| 359 | *first_non_digit == '\0' && |
| 360 | lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) |
| 361 | { |
| 362 | DBUG_PRINT("info" , ("relay_log_info file is in new format." )); |
| 363 | /* Seems to be new format => read relay log name from next line */ |
| 364 | if (init_strvar_from_file(group_relay_log_name, |
| 365 | sizeof(group_relay_log_name), |
| 366 | &info_file, "" )) |
| 367 | { |
| 368 | msg="Error reading slave log configuration" ; |
| 369 | goto err; |
| 370 | } |
| 371 | } |
| 372 | else |
| 373 | DBUG_PRINT("info" , ("relay_log_info file is in old format." )); |
| 374 | |
| 375 | if (init_intvar_from_file(&relay_log_pos, |
| 376 | &info_file, BIN_LOG_HEADER_SIZE) || |
| 377 | init_strvar_from_file(group_master_log_name, |
| 378 | sizeof(group_master_log_name), |
| 379 | &info_file, "" ) || |
| 380 | init_intvar_from_file(&master_log_pos, &info_file, 0) || |
| 381 | (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY && |
| 382 | init_intvar_from_file(&sql_delay, &info_file, 0))) |
| 383 | { |
| 384 | msg="Error reading slave log configuration" ; |
| 385 | goto err; |
| 386 | } |
| 387 | |
| 388 | strmake_buf(event_relay_log_name,group_relay_log_name); |
| 389 | group_relay_log_pos= event_relay_log_pos= relay_log_pos; |
| 390 | group_master_log_pos= master_log_pos; |
| 391 | |
| 392 | if (is_relay_log_recovery && init_recovery(mi, &msg)) |
| 393 | goto err; |
| 394 | |
| 395 | relay_log_state.load(rpl_global_gtid_slave_state); |
| 396 | if (init_relay_log_pos(this, |
| 397 | group_relay_log_name, |
| 398 | group_relay_log_pos, |
| 399 | 0 /* no data lock*/, |
| 400 | &msg, 0)) |
| 401 | { |
| 402 | sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)" , |
| 403 | group_relay_log_name, group_relay_log_pos); |
| 404 | goto err; |
| 405 | } |
| 406 | } |
| 407 | |
| 408 | DBUG_PRINT("info" , ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu" , |
| 409 | my_b_tell(cur_log), event_relay_log_pos)); |
| 410 | DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE); |
| 411 | DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos); |
| 412 | |
| 413 | /* |
| 414 | Now change the cache from READ to WRITE - must do this |
| 415 | before Relay_log_info::flush() |
| 416 | */ |
| 417 | reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1); |
| 418 | if (unlikely((error= flush()))) |
| 419 | { |
| 420 | msg= "Failed to flush relay log info file" ; |
| 421 | goto err; |
| 422 | } |
| 423 | if (count_relay_log_space(this)) |
| 424 | { |
| 425 | msg="Error counting relay log space" ; |
| 426 | goto err; |
| 427 | } |
| 428 | inited= 1; |
| 429 | error_on_rli_init_info= false; |
| 430 | mysql_mutex_unlock(&data_lock); |
| 431 | DBUG_RETURN(0); |
| 432 | |
| 433 | err: |
| 434 | error_on_rli_init_info= true; |
| 435 | if (msg) |
| 436 | sql_print_error("%s" , msg); |
| 437 | end_io_cache(&info_file); |
| 438 | if (info_fd >= 0) |
| 439 | mysql_file_close(info_fd, MYF(0)); |
| 440 | info_fd= -1; |
| 441 | mysql_mutex_lock(log_lock); |
| 442 | relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); |
| 443 | mysql_mutex_unlock(log_lock); |
| 444 | mysql_mutex_unlock(&data_lock); |
| 445 | DBUG_RETURN(1); |
| 446 | } |
| 447 | |
| 448 | |
| 449 | static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo) |
| 450 | { |
| 451 | MY_STAT s; |
| 452 | DBUG_ENTER("add_relay_log" ); |
| 453 | if (!mysql_file_stat(key_file_relaylog, |
| 454 | linfo->log_file_name, &s, MYF(0))) |
| 455 | { |
| 456 | sql_print_error("log %s listed in the index, but failed to stat" , |
| 457 | linfo->log_file_name); |
| 458 | DBUG_RETURN(1); |
| 459 | } |
| 460 | rli->log_space_total += s.st_size; |
| 461 | DBUG_PRINT("info" ,("log_space_total: %llu" , rli->log_space_total)); |
| 462 | DBUG_RETURN(0); |
| 463 | } |
| 464 | |
| 465 | |
| 466 | static int count_relay_log_space(Relay_log_info* rli) |
| 467 | { |
| 468 | LOG_INFO linfo; |
| 469 | DBUG_ENTER("count_relay_log_space" ); |
| 470 | rli->log_space_total= 0; |
| 471 | if (rli->relay_log.find_log_pos(&linfo, NullS, 1)) |
| 472 | { |
| 473 | sql_print_error("Could not find first log while counting relay log space" ); |
| 474 | DBUG_RETURN(1); |
| 475 | } |
| 476 | do |
| 477 | { |
| 478 | if (add_relay_log(rli,&linfo)) |
| 479 | DBUG_RETURN(1); |
| 480 | } while (!rli->relay_log.find_next_log(&linfo, 1)); |
| 481 | /* |
| 482 | As we have counted everything, including what may have written in a |
| 483 | preceding write, we must reset bytes_written, or we may count some space |
| 484 | twice. |
| 485 | */ |
| 486 | rli->relay_log.reset_bytes_written(); |
| 487 | DBUG_RETURN(0); |
| 488 | } |
| 489 | |
| 490 | |
| 491 | /* |
| 492 | Reset UNTIL condition for Relay_log_info |
| 493 | |
| 494 | SYNOPSYS |
| 495 | clear_until_condition() |
| 496 | rli - Relay_log_info structure where UNTIL condition should be reset |
| 497 | */ |
| 498 | |
| 499 | void Relay_log_info::clear_until_condition() |
| 500 | { |
| 501 | DBUG_ENTER("clear_until_condition" ); |
| 502 | |
| 503 | until_condition= Relay_log_info::UNTIL_NONE; |
| 504 | until_log_name[0]= 0; |
| 505 | until_log_pos= 0; |
| 506 | DBUG_VOID_RETURN; |
| 507 | } |
| 508 | |
| 509 | |
| 510 | /* |
| 511 | Read the correct format description event for starting to replicate from |
| 512 | a given position in a relay log file. |
| 513 | */ |
| 514 | Format_description_log_event * |
| 515 | read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, |
| 516 | const char **errmsg) |
| 517 | { |
| 518 | Log_event *ev; |
| 519 | Format_description_log_event *fdev; |
| 520 | bool found= false; |
| 521 | |
| 522 | /* |
| 523 | By default the relay log is in binlog format 3 (4.0). |
| 524 | Even if format is 4, this will work enough to read the first event |
| 525 | (Format_desc) (remember that format 4 is just lenghtened compared to format |
| 526 | 3; format 3 is a prefix of format 4). |
| 527 | */ |
| 528 | fdev= new Format_description_log_event(3); |
| 529 | |
| 530 | while (!found) |
| 531 | { |
| 532 | Log_event_type typ; |
| 533 | |
| 534 | /* |
| 535 | Read the possible Format_description_log_event; if position |
| 536 | was 4, no need, it will be read naturally. |
| 537 | */ |
| 538 | DBUG_PRINT("info" ,("looking for a Format_description_log_event" )); |
| 539 | |
| 540 | if (my_b_tell(cur_log) >= start_pos) |
| 541 | break; |
| 542 | |
| 543 | if (!(ev= Log_event::read_log_event(cur_log, fdev, |
| 544 | opt_slave_sql_verify_checksum))) |
| 545 | { |
| 546 | DBUG_PRINT("info" ,("could not read event, cur_log->error=%d" , |
| 547 | cur_log->error)); |
| 548 | if (cur_log->error) /* not EOF */ |
| 549 | { |
| 550 | *errmsg= "I/O error reading event at position 4" ; |
| 551 | delete fdev; |
| 552 | return NULL; |
| 553 | } |
| 554 | break; |
| 555 | } |
| 556 | typ= ev->get_type_code(); |
| 557 | if (typ == FORMAT_DESCRIPTION_EVENT) |
| 558 | { |
| 559 | Format_description_log_event *old= fdev; |
| 560 | DBUG_PRINT("info" ,("found Format_description_log_event" )); |
| 561 | fdev= (Format_description_log_event*) ev; |
| 562 | fdev->copy_crypto_data(old); |
| 563 | delete old; |
| 564 | |
| 565 | /* |
| 566 | As ev was returned by read_log_event, it has passed is_valid(), so |
| 567 | my_malloc() in ctor worked, no need to check again. |
| 568 | */ |
| 569 | /* |
| 570 | Ok, we found a Format_description event. But it is not sure that this |
| 571 | describes the whole relay log; indeed, one can have this sequence |
| 572 | (starting from position 4): |
| 573 | Format_desc (of slave) |
| 574 | Rotate (of master) |
| 575 | Format_desc (of master) |
| 576 | So the Format_desc which really describes the rest of the relay log |
| 577 | is the 3rd event (it can't be further than that, because we rotate |
| 578 | the relay log when we queue a Rotate event from the master). |
| 579 | But what describes the Rotate is the first Format_desc. |
| 580 | So what we do is: |
| 581 | go on searching for Format_description events, until you exceed the |
| 582 | position (argument 'pos') or until you find another event than Rotate |
| 583 | or Format_desc. |
| 584 | */ |
| 585 | } |
| 586 | else if (typ == START_ENCRYPTION_EVENT) |
| 587 | { |
| 588 | if (fdev->start_decryption((Start_encryption_log_event*) ev)) |
| 589 | { |
| 590 | *errmsg= "Unable to set up decryption of binlog." ; |
| 591 | delete ev; |
| 592 | delete fdev; |
| 593 | return NULL; |
| 594 | } |
| 595 | delete ev; |
| 596 | } |
| 597 | else |
| 598 | { |
| 599 | DBUG_PRINT("info" ,("found event of another type=%d" , typ)); |
| 600 | found= (typ != ROTATE_EVENT); |
| 601 | delete ev; |
| 602 | } |
| 603 | } |
| 604 | return fdev; |
| 605 | } |
| 606 | |
| 607 | |
| 608 | /* |
| 609 | Open the given relay log |
| 610 | |
| 611 | SYNOPSIS |
| 612 | init_relay_log_pos() |
| 613 | rli Relay information (will be initialized) |
| 614 | log Name of relay log file to read from. NULL = First log |
| 615 | pos Position in relay log file |
| 616 | need_data_lock Set to 1 if this functions should do mutex locks |
| 617 | errmsg Store pointer to error message here |
| 618 | look_for_description_event |
| 619 | 1 if we should look for such an event. We only need |
| 620 | this when the SQL thread starts and opens an existing |
| 621 | relay log and has to execute it (possibly from an |
| 622 | offset >4); then we need to read the first event of |
| 623 | the relay log to be able to parse the events we have |
| 624 | to execute. |
| 625 | |
| 626 | DESCRIPTION |
| 627 | - Close old open relay log files. |
| 628 | - If we are using the same relay log as the running IO-thread, then set |
| 629 | rli->cur_log to point to the same IO_CACHE entry. |
| 630 | - If not, open the 'log' binary file. |
| 631 | |
| 632 | TODO |
| 633 | - check proper initialization of group_master_log_name/group_master_log_pos |
| 634 | |
| 635 | RETURN VALUES |
| 636 | 0 ok |
| 637 | 1 error. errmsg is set to point to the error message |
| 638 | */ |
| 639 | |
| 640 | int init_relay_log_pos(Relay_log_info* rli,const char* log, |
| 641 | ulonglong pos, bool need_data_lock, |
| 642 | const char** errmsg, |
| 643 | bool look_for_description_event) |
| 644 | { |
| 645 | DBUG_ENTER("init_relay_log_pos" ); |
| 646 | DBUG_PRINT("info" , ("pos: %lu" , (ulong) pos)); |
| 647 | |
| 648 | *errmsg=0; |
| 649 | mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); |
| 650 | |
| 651 | if (need_data_lock) |
| 652 | mysql_mutex_lock(&rli->data_lock); |
| 653 | |
| 654 | /* |
| 655 | Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER |
| 656 | is, too, and init_slave() too; these 2 functions allocate a description |
| 657 | event in init_relay_log_pos, which is not freed by the terminating SQL slave |
| 658 | thread as that thread is not started by these functions. So we have to free |
| 659 | the description_event here, in case, so that there is no memory leak in |
| 660 | running, say, CHANGE MASTER. |
| 661 | */ |
| 662 | delete rli->relay_log.description_event_for_exec; |
| 663 | /* |
| 664 | By default the relay log is in binlog format 3 (4.0). |
| 665 | Even if format is 4, this will work enough to read the first event |
| 666 | (Format_desc) (remember that format 4 is just lenghtened compared to format |
| 667 | 3; format 3 is a prefix of format 4). |
| 668 | */ |
| 669 | rli->relay_log.description_event_for_exec= new |
| 670 | Format_description_log_event(3); |
| 671 | |
| 672 | mysql_mutex_lock(log_lock); |
| 673 | |
| 674 | /* Close log file and free buffers if it's already open */ |
| 675 | if (rli->cur_log_fd >= 0) |
| 676 | { |
| 677 | end_io_cache(&rli->cache_buf); |
| 678 | mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); |
| 679 | rli->cur_log_fd = -1; |
| 680 | } |
| 681 | |
| 682 | rli->group_relay_log_pos = rli->event_relay_log_pos = pos; |
| 683 | rli->clear_flag(Relay_log_info::IN_STMT); |
| 684 | rli->clear_flag(Relay_log_info::IN_TRANSACTION); |
| 685 | |
| 686 | /* |
| 687 | Test to see if the previous run was with the skip of purging |
| 688 | If yes, we do not purge when we restart |
| 689 | */ |
| 690 | if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1)) |
| 691 | { |
| 692 | *errmsg="Could not find first log during relay log initialization" ; |
| 693 | goto err; |
| 694 | } |
| 695 | |
| 696 | if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1)) |
| 697 | { |
| 698 | *errmsg="Could not find target log during relay log initialization" ; |
| 699 | goto err; |
| 700 | } |
| 701 | strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name); |
| 702 | strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name); |
| 703 | if (rli->relay_log.is_active(rli->linfo.log_file_name)) |
| 704 | { |
| 705 | /* |
| 706 | The IO thread is using this log file. |
| 707 | In this case, we will use the same IO_CACHE pointer to |
| 708 | read data as the IO thread is using to write data. |
| 709 | */ |
| 710 | my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0); |
| 711 | if (check_binlog_magic(rli->cur_log,errmsg)) |
| 712 | goto err; |
| 713 | rli->cur_log_old_open_count=rli->relay_log.get_open_count(); |
| 714 | } |
| 715 | else |
| 716 | { |
| 717 | /* |
| 718 | Open the relay log and set rli->cur_log to point at this one |
| 719 | */ |
| 720 | if ((rli->cur_log_fd=open_binlog(&rli->cache_buf, |
| 721 | rli->linfo.log_file_name,errmsg)) < 0) |
| 722 | goto err; |
| 723 | rli->cur_log = &rli->cache_buf; |
| 724 | } |
| 725 | /* |
| 726 | In all cases, check_binlog_magic() has been called so we're at offset 4 for |
| 727 | sure. |
| 728 | */ |
| 729 | if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */ |
| 730 | { |
| 731 | if (look_for_description_event) |
| 732 | { |
| 733 | Format_description_log_event *fdev; |
| 734 | if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg))) |
| 735 | goto err; |
| 736 | delete rli->relay_log.description_event_for_exec; |
| 737 | rli->relay_log.description_event_for_exec= fdev; |
| 738 | } |
| 739 | my_b_seek(rli->cur_log,(off_t)pos); |
| 740 | DBUG_PRINT("info" , ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu" , |
| 741 | my_b_tell(rli->cur_log), rli->event_relay_log_pos)); |
| 742 | |
| 743 | } |
| 744 | |
| 745 | err: |
| 746 | /* |
| 747 | If we don't purge, we can't honour relay_log_space_limit ; |
| 748 | silently discard it |
| 749 | */ |
| 750 | if (!relay_log_purge) |
| 751 | rli->log_space_limit= 0; |
| 752 | mysql_cond_broadcast(&rli->data_cond); |
| 753 | |
| 754 | mysql_mutex_unlock(log_lock); |
| 755 | |
| 756 | if (need_data_lock) |
| 757 | mysql_mutex_unlock(&rli->data_lock); |
| 758 | if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg) |
| 759 | *errmsg= "Invalid Format_description log event; could be out of memory" ; |
| 760 | |
| 761 | DBUG_PRINT("info" , ("Returning %d from init_relay_log_pos" , (*errmsg)?1:0)); |
| 762 | |
| 763 | DBUG_RETURN ((*errmsg) ? 1 : 0); |
| 764 | } |
| 765 | |
| 766 | |
| 767 | /* |
| 768 | Waits until the SQL thread reaches (has executed up to) the |
| 769 | log/position or timed out. |
| 770 | |
| 771 | SYNOPSIS |
| 772 | wait_for_pos() |
| 773 | thd client thread that sent SELECT MASTER_POS_WAIT |
| 774 | log_name log name to wait for |
| 775 | log_pos position to wait for |
| 776 | timeout timeout in seconds before giving up waiting |
| 777 | |
| 778 | NOTES |
| 779 | timeout is longlong whereas it should be ulong ; but this is |
| 780 | to catch if the user submitted a negative timeout. |
| 781 | |
| 782 | RETURN VALUES |
| 783 | -2 improper arguments (log_pos<0) |
| 784 | or slave not running, or master info changed |
| 785 | during the function's execution, |
| 786 | or client thread killed. -2 is translated to NULL by caller |
| 787 | -1 timed out |
| 788 | >=0 number of log events the function had to wait |
| 789 | before reaching the desired log/position |
| 790 | */ |
| 791 | |
| 792 | int Relay_log_info::wait_for_pos(THD* thd, String* log_name, |
| 793 | longlong log_pos, |
| 794 | longlong timeout) |
| 795 | { |
| 796 | int event_count = 0; |
| 797 | ulong init_abort_pos_wait; |
| 798 | int error=0; |
| 799 | struct timespec abstime; // for timeout checking |
| 800 | PSI_stage_info old_stage; |
| 801 | DBUG_ENTER("Relay_log_info::wait_for_pos" ); |
| 802 | |
| 803 | if (!inited) |
| 804 | DBUG_RETURN(-2); |
| 805 | |
| 806 | DBUG_PRINT("enter" ,("log_name: '%s' log_pos: %lu timeout: %lu" , |
| 807 | log_name->c_ptr(), (ulong) log_pos, (ulong) timeout)); |
| 808 | |
| 809 | set_timespec(abstime,timeout); |
| 810 | mysql_mutex_lock(&data_lock); |
| 811 | thd->ENTER_COND(&data_cond, &data_lock, |
| 812 | &stage_waiting_for_the_slave_thread_to_advance_position, |
| 813 | &old_stage); |
| 814 | /* |
| 815 | This function will abort when it notices that some CHANGE MASTER or |
| 816 | RESET MASTER has changed the master info. |
| 817 | To catch this, these commands modify abort_pos_wait ; We just monitor |
| 818 | abort_pos_wait and see if it has changed. |
| 819 | Why do we have this mechanism instead of simply monitoring slave_running |
| 820 | in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that |
| 821 | the SQL thread be stopped? |
| 822 | This is becasue if someones does: |
| 823 | STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE; |
| 824 | the change may happen very quickly and we may not notice that |
| 825 | slave_running briefly switches between 1/0/1. |
| 826 | */ |
| 827 | init_abort_pos_wait= abort_pos_wait; |
| 828 | |
| 829 | /* |
| 830 | We'll need to |
| 831 | handle all possible log names comparisons (e.g. 999 vs 1000). |
| 832 | We use ulong for string->number conversion ; this is no |
| 833 | stronger limitation than in find_uniq_filename in sql/log.cc |
| 834 | */ |
| 835 | ulong log_name_extension; |
| 836 | char log_name_tmp[FN_REFLEN]; //make a char[] from String |
| 837 | |
| 838 | strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1)); |
| 839 | |
| 840 | char *p= fn_ext(log_name_tmp); |
| 841 | char *p_end; |
| 842 | if (!*p || log_pos<0) |
| 843 | { |
| 844 | error= -2; //means improper arguments |
| 845 | goto err; |
| 846 | } |
| 847 | // Convert 0-3 to 4 |
| 848 | log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE); |
| 849 | /* p points to '.' */ |
| 850 | log_name_extension= strtoul(++p, &p_end, 10); |
| 851 | /* |
| 852 | p_end points to the first invalid character. |
| 853 | If it equals to p, no digits were found, error. |
| 854 | If it contains '\0' it means conversion went ok. |
| 855 | */ |
| 856 | if (p_end==p || *p_end) |
| 857 | { |
| 858 | error= -2; |
| 859 | goto err; |
| 860 | } |
| 861 | |
| 862 | /* The "compare and wait" main loop */ |
| 863 | while (!thd->killed && |
| 864 | init_abort_pos_wait == abort_pos_wait && |
| 865 | slave_running) |
| 866 | { |
| 867 | bool pos_reached; |
| 868 | int cmp_result= 0; |
| 869 | |
| 870 | DBUG_PRINT("info" , |
| 871 | ("init_abort_pos_wait: %ld abort_pos_wait: %ld" , |
| 872 | init_abort_pos_wait, abort_pos_wait)); |
| 873 | DBUG_PRINT("info" ,("group_master_log_name: '%s' pos: %lu" , |
| 874 | group_master_log_name, (ulong) group_master_log_pos)); |
| 875 | |
| 876 | /* |
| 877 | group_master_log_name can be "", if we are just after a fresh |
| 878 | replication start or after a CHANGE MASTER TO MASTER_HOST/PORT |
| 879 | (before we have executed one Rotate event from the master) or |
| 880 | (rare) if the user is doing a weird slave setup (see next |
| 881 | paragraph). If group_master_log_name is "", we assume we don't |
| 882 | have enough info to do the comparison yet, so we just wait until |
| 883 | more data. In this case master_log_pos is always 0 except if |
| 884 | somebody (wrongly) sets this slave to be a slave of itself |
| 885 | without using --replicate-same-server-id (an unsupported |
| 886 | configuration which does nothing), then group_master_log_pos |
| 887 | will grow and group_master_log_name will stay "". |
| 888 | */ |
| 889 | if (*group_master_log_name) |
| 890 | { |
| 891 | char *basename= (group_master_log_name + |
| 892 | dirname_length(group_master_log_name)); |
| 893 | /* |
| 894 | First compare the parts before the extension. |
| 895 | Find the dot in the master's log basename, |
| 896 | and protect against user's input error : |
| 897 | if the names do not match up to '.' included, return error |
| 898 | */ |
| 899 | char *q= (char*)(fn_ext(basename)+1); |
| 900 | if (strncmp(basename, log_name_tmp, (int)(q-basename))) |
| 901 | { |
| 902 | error= -2; |
| 903 | break; |
| 904 | } |
| 905 | // Now compare extensions. |
| 906 | char *q_end; |
| 907 | ulong group_master_log_name_extension= strtoul(q, &q_end, 10); |
| 908 | if (group_master_log_name_extension < log_name_extension) |
| 909 | cmp_result= -1 ; |
| 910 | else |
| 911 | cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ; |
| 912 | |
| 913 | pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) || |
| 914 | cmp_result > 0); |
| 915 | if (pos_reached || thd->killed) |
| 916 | break; |
| 917 | } |
| 918 | |
| 919 | //wait for master update, with optional timeout. |
| 920 | |
| 921 | DBUG_PRINT("info" ,("Waiting for master update" )); |
| 922 | /* |
| 923 | We are going to mysql_cond_(timed)wait(); if the SQL thread stops it |
| 924 | will wake us up. |
| 925 | */ |
| 926 | thd_wait_begin(thd, THD_WAIT_BINLOG); |
| 927 | if (timeout > 0) |
| 928 | { |
| 929 | /* |
| 930 | Note that mysql_cond_timedwait checks for the timeout |
| 931 | before for the condition ; i.e. it returns ETIMEDOUT |
| 932 | if the system time equals or exceeds the time specified by abstime |
| 933 | before the condition variable is signaled or broadcast, _or_ if |
| 934 | the absolute time specified by abstime has already passed at the time |
| 935 | of the call. |
| 936 | For that reason, mysql_cond_timedwait will do the "timeoutting" job |
| 937 | even if its condition is always immediately signaled (case of a loaded |
| 938 | master). |
| 939 | */ |
| 940 | error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime); |
| 941 | } |
| 942 | else |
| 943 | mysql_cond_wait(&data_cond, &data_lock); |
| 944 | thd_wait_end(thd); |
| 945 | DBUG_PRINT("info" ,("Got signal of master update or timed out" )); |
| 946 | if (error == ETIMEDOUT || error == ETIME) |
| 947 | { |
| 948 | error= -1; |
| 949 | break; |
| 950 | } |
| 951 | error=0; |
| 952 | event_count++; |
| 953 | DBUG_PRINT("info" ,("Testing if killed or SQL thread not running" )); |
| 954 | } |
| 955 | |
| 956 | err: |
| 957 | thd->EXIT_COND(&old_stage); |
| 958 | DBUG_PRINT("exit" ,("killed: %d abort: %d slave_running: %d \ |
| 959 | improper_arguments: %d timed_out: %d" , |
| 960 | thd->killed_errno(), |
| 961 | (int) (init_abort_pos_wait != abort_pos_wait), |
| 962 | (int) slave_running, |
| 963 | (int) (error == -2), |
| 964 | (int) (error == -1))); |
| 965 | if (thd->killed || init_abort_pos_wait != abort_pos_wait || |
| 966 | !slave_running) |
| 967 | { |
| 968 | error= -2; |
| 969 | } |
| 970 | DBUG_RETURN( error ? error : event_count ); |
| 971 | } |
| 972 | |
| 973 | |
| 974 | void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, |
| 975 | rpl_group_info *rgi, |
| 976 | bool skip_lock) |
| 977 | { |
| 978 | DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos" ); |
| 979 | |
| 980 | if (skip_lock) |
| 981 | mysql_mutex_assert_owner(&data_lock); |
| 982 | else |
| 983 | mysql_mutex_lock(&data_lock); |
| 984 | |
| 985 | rgi->inc_event_relay_log_pos(); |
| 986 | DBUG_PRINT("info" , ("log_pos: %lu group_master_log_pos: %lu" , |
| 987 | (long) log_pos, (long) group_master_log_pos)); |
| 988 | if (rgi->is_parallel_exec) |
| 989 | { |
| 990 | /* In case of parallel replication, do not update the position backwards. */ |
| 991 | int cmp= strcmp(group_relay_log_name, rgi->event_relay_log_name); |
| 992 | if (cmp < 0) |
| 993 | { |
| 994 | group_relay_log_pos= rgi->future_event_relay_log_pos; |
| 995 | strmake_buf(group_relay_log_name, rgi->event_relay_log_name); |
| 996 | notify_group_relay_log_name_update(); |
| 997 | } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos) |
| 998 | group_relay_log_pos= rgi->future_event_relay_log_pos; |
| 999 | |
| 1000 | /* |
| 1001 | In the parallel case we need to update the master_log_name here, rather |
| 1002 | than in Rotate_log_event::do_update_pos(). |
| 1003 | */ |
| 1004 | cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name); |
| 1005 | if (cmp <= 0) |
| 1006 | { |
| 1007 | if (cmp < 0) |
| 1008 | { |
| 1009 | strcpy(group_master_log_name, rgi->future_event_master_log_name); |
| 1010 | group_master_log_pos= log_pos; |
| 1011 | } |
| 1012 | else if (group_master_log_pos < log_pos) |
| 1013 | group_master_log_pos= log_pos; |
| 1014 | } |
| 1015 | |
| 1016 | /* |
| 1017 | In the parallel case, we only update the Seconds_Behind_Master at the |
| 1018 | end of a transaction. In the non-parallel case, the value is updated as |
| 1019 | soon as an event is read from the relay log; however this would be too |
| 1020 | confusing for the user, seeing the slave reported as up-to-date when |
| 1021 | potentially thousands of events are still queued up for worker threads |
| 1022 | waiting for execution. |
| 1023 | */ |
| 1024 | if (rgi->last_master_timestamp && |
| 1025 | rgi->last_master_timestamp > last_master_timestamp) |
| 1026 | last_master_timestamp= rgi->last_master_timestamp; |
| 1027 | } |
| 1028 | else |
| 1029 | { |
| 1030 | /* Non-parallel case. */ |
| 1031 | group_relay_log_pos= event_relay_log_pos; |
| 1032 | strmake_buf(group_relay_log_name, event_relay_log_name); |
| 1033 | notify_group_relay_log_name_update(); |
| 1034 | if (log_pos) // not 3.23 binlogs (no log_pos there) and not Stop_log_event |
| 1035 | group_master_log_pos= log_pos; |
| 1036 | } |
| 1037 | |
| 1038 | /* |
| 1039 | If the slave does not support transactions and replicates a transaction, |
| 1040 | users should not trust group_master_log_pos (which they can display with |
| 1041 | SHOW SLAVE STATUS or read from relay-log.info), because to compute |
| 1042 | group_master_log_pos the slave relies on log_pos stored in the master's |
| 1043 | binlog, but if we are in a master's transaction these positions are always |
| 1044 | the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does |
| 1045 | not advance as it should on the non-transactional slave (it advances by |
| 1046 | big leaps, whereas it should advance by small leaps). |
| 1047 | */ |
| 1048 | /* |
| 1049 | In 4.x we used the event's len to compute the positions here. This is |
| 1050 | wrong if the event was 3.23/4.0 and has been converted to 5.0, because |
| 1051 | then the event's len is not what is was in the master's binlog, so this |
| 1052 | will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0 |
| 1053 | replication: Exec_master_log_pos is wrong). Only way to solve this is to |
| 1054 | have the original offset of the end of the event the relay log. This is |
| 1055 | what we do in 5.0: log_pos has become "end_log_pos" (because the real use |
| 1056 | of log_pos in 4.0 was to compute the end_log_pos; so better to store |
| 1057 | end_log_pos instead of begin_log_pos. |
| 1058 | If we had not done this fix here, the problem would also have appeared |
| 1059 | when the slave and master are 5.0 but with different event length (for |
| 1060 | example the slave is more recent than the master and features the event |
| 1061 | UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in |
| 1062 | SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this |
| 1063 | value which would lead to badly broken replication. |
| 1064 | Even the relay_log_pos will be corrupted in this case, because the len is |
| 1065 | the relay log is not "val". |
| 1066 | With the end_log_pos solution, we avoid computations involving lengthes. |
| 1067 | */ |
| 1068 | mysql_cond_broadcast(&data_cond); |
| 1069 | if (!skip_lock) |
| 1070 | mysql_mutex_unlock(&data_lock); |
| 1071 | DBUG_VOID_RETURN; |
| 1072 | } |
| 1073 | |
| 1074 | |
| 1075 | void Relay_log_info::close_temporary_tables() |
| 1076 | { |
| 1077 | DBUG_ENTER("Relay_log_info::close_temporary_tables" ); |
| 1078 | |
| 1079 | TMP_TABLE_SHARE *share; |
| 1080 | TABLE *table; |
| 1081 | |
| 1082 | if (!save_temporary_tables) |
| 1083 | { |
| 1084 | /* There are no temporary tables. */ |
| 1085 | DBUG_VOID_RETURN; |
| 1086 | } |
| 1087 | |
| 1088 | while ((share= save_temporary_tables->pop_front())) |
| 1089 | { |
| 1090 | /* |
| 1091 | Iterate over the list of tables for this TABLE_SHARE and close them. |
| 1092 | */ |
| 1093 | while ((table= share->all_tmp_tables.pop_front())) |
| 1094 | { |
| 1095 | DBUG_PRINT("tmptable" , ("closing table: '%s'.'%s'" , |
| 1096 | table->s->db.str, table->s->table_name.str)); |
| 1097 | |
| 1098 | /* Reset in_use as the table may have been created by another thd */ |
| 1099 | table->in_use= 0; |
| 1100 | /* |
| 1101 | Lets not free TABLE_SHARE here as there could be multiple TABLEs opened |
| 1102 | for the same table (TABLE_SHARE). |
| 1103 | */ |
| 1104 | closefrm(table); |
| 1105 | my_free(table); |
| 1106 | } |
| 1107 | |
| 1108 | /* |
| 1109 | Don't ask for disk deletion. For now, anyway they will be deleted when |
| 1110 | slave restarts, but it is a better intention to not delete them. |
| 1111 | */ |
| 1112 | |
| 1113 | free_table_share(share); |
| 1114 | my_free(share); |
| 1115 | } |
| 1116 | |
| 1117 | /* By now, there mustn't be any elements left in the list. */ |
| 1118 | DBUG_ASSERT(save_temporary_tables->is_empty()); |
| 1119 | |
| 1120 | my_free(save_temporary_tables); |
| 1121 | save_temporary_tables= NULL; |
| 1122 | slave_open_temp_tables= 0; |
| 1123 | |
| 1124 | DBUG_VOID_RETURN; |
| 1125 | } |
| 1126 | |
| 1127 | /* |
| 1128 | purge_relay_logs() |
| 1129 | |
| 1130 | @param rli Relay log information |
| 1131 | @param thd thread id. May be zero during startup |
| 1132 | |
| 1133 | NOTES |
| 1134 | Assumes to have a run lock on rli and that no slave thread are running. |
| 1135 | */ |
| 1136 | |
| 1137 | int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, |
| 1138 | const char** errmsg) |
| 1139 | { |
| 1140 | int error=0; |
| 1141 | const char *ln; |
| 1142 | char name_buf[FN_REFLEN]; |
| 1143 | DBUG_ENTER("purge_relay_logs" ); |
| 1144 | |
| 1145 | /* |
| 1146 | Even if rli->inited==0, we still try to empty rli->master_log_* variables. |
| 1147 | Indeed, rli->inited==0 does not imply that they already are empty. |
| 1148 | It could be that slave's info initialization partly succeeded : |
| 1149 | for example if relay-log.info existed but *relay-bin*.* |
| 1150 | have been manually removed, Relay_log_info::init() reads the old |
| 1151 | relay-log.info and fills rli->master_log_*, then Relay_log_info::init() |
| 1152 | checks for the existence of the relay log, this fails and |
| 1153 | Relay_log_info::init() leaves rli->inited to 0. |
| 1154 | In that pathological case, rli->master_log_pos* will be properly reinited |
| 1155 | at the next START SLAVE (as RESET SLAVE or CHANGE |
| 1156 | MASTER, the callers of purge_relay_logs, will delete bogus *.info files |
| 1157 | or replace them with correct files), however if the user does SHOW SLAVE |
| 1158 | STATUS before START SLAVE, he will see old, confusing rli->master_log_*. |
| 1159 | In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS |
| 1160 | to display fine in any case. |
| 1161 | */ |
| 1162 | |
| 1163 | rli->group_master_log_name[0]= 0; |
| 1164 | rli->group_master_log_pos= 0; |
| 1165 | |
| 1166 | if (!rli->inited) |
| 1167 | { |
| 1168 | DBUG_PRINT("info" , ("rli->inited == 0" )); |
| 1169 | if (rli->error_on_rli_init_info) |
| 1170 | { |
| 1171 | ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin" , |
| 1172 | 1, name_buf); |
| 1173 | |
| 1174 | if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE)) |
| 1175 | { |
| 1176 | sql_print_error("Unable to purge relay log files. Failed to open relay " |
| 1177 | "log index file:%s." , rli->relay_log.get_index_fname()); |
| 1178 | DBUG_RETURN(1); |
| 1179 | } |
| 1180 | mysql_mutex_lock(rli->relay_log.get_log_lock()); |
| 1181 | if (rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND, |
| 1182 | (ulong)(rli->max_relay_log_size ? rli->max_relay_log_size : |
| 1183 | max_binlog_size), 1, TRUE)) |
| 1184 | { |
| 1185 | sql_print_error("Unable to purge relay log files. Failed to open relay " |
| 1186 | "log file:%s." , rli->relay_log.get_log_fname()); |
| 1187 | mysql_mutex_unlock(rli->relay_log.get_log_lock()); |
| 1188 | DBUG_RETURN(1); |
| 1189 | } |
| 1190 | mysql_mutex_unlock(rli->relay_log.get_log_lock()); |
| 1191 | } |
| 1192 | else |
| 1193 | DBUG_RETURN(0); |
| 1194 | } |
| 1195 | else |
| 1196 | { |
| 1197 | DBUG_ASSERT(rli->slave_running == 0); |
| 1198 | DBUG_ASSERT(rli->mi->slave_running == 0); |
| 1199 | } |
| 1200 | mysql_mutex_lock(&rli->data_lock); |
| 1201 | |
| 1202 | /* |
| 1203 | we close the relay log fd possibly left open by the slave SQL thread, |
| 1204 | to be able to delete it; the relay log fd possibly left open by the slave |
| 1205 | I/O thread will be closed naturally in reset_logs() by the |
| 1206 | close(LOG_CLOSE_TO_BE_OPENED) call |
| 1207 | */ |
| 1208 | if (rli->cur_log_fd >= 0) |
| 1209 | { |
| 1210 | end_io_cache(&rli->cache_buf); |
| 1211 | mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); |
| 1212 | rli->cur_log_fd= -1; |
| 1213 | } |
| 1214 | |
| 1215 | if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0, 0)) |
| 1216 | { |
| 1217 | *errmsg = "Failed during log reset" ; |
| 1218 | error=1; |
| 1219 | goto err; |
| 1220 | } |
| 1221 | rli->relay_log_state.load(rpl_global_gtid_slave_state); |
| 1222 | if (!just_reset) |
| 1223 | { |
| 1224 | /* Save name of used relay log file */ |
| 1225 | strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); |
| 1226 | strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); |
| 1227 | rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; |
| 1228 | rli->log_space_total= 0; |
| 1229 | |
| 1230 | if (count_relay_log_space(rli)) |
| 1231 | { |
| 1232 | *errmsg= "Error counting relay log space" ; |
| 1233 | error=1; |
| 1234 | goto err; |
| 1235 | } |
| 1236 | error= init_relay_log_pos(rli, rli->group_relay_log_name, |
| 1237 | rli->group_relay_log_pos, |
| 1238 | 0 /* do not need data lock */, errmsg, 0); |
| 1239 | } |
| 1240 | else |
| 1241 | { |
| 1242 | /* Ensure relay log names are not used */ |
| 1243 | rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0; |
| 1244 | } |
| 1245 | |
| 1246 | if (!rli->inited && rli->error_on_rli_init_info) |
| 1247 | { |
| 1248 | mysql_mutex_lock(rli->relay_log.get_log_lock()); |
| 1249 | rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); |
| 1250 | mysql_mutex_unlock(rli->relay_log.get_log_lock()); |
| 1251 | } |
| 1252 | err: |
| 1253 | DBUG_PRINT("info" ,("log_space_total: %llu" ,rli->log_space_total)); |
| 1254 | mysql_mutex_unlock(&rli->data_lock); |
| 1255 | DBUG_RETURN(error); |
| 1256 | } |
| 1257 | |
| 1258 | |
| 1259 | /* |
| 1260 | Check if condition stated in UNTIL clause of START SLAVE is reached. |
| 1261 | SYNOPSYS |
| 1262 | Relay_log_info::is_until_satisfied() |
| 1263 | master_beg_pos position of the beginning of to be executed event |
| 1264 | (not log_pos member of the event that points to the |
| 1265 | beginning of the following event) |
| 1266 | |
| 1267 | |
| 1268 | DESCRIPTION |
| 1269 | Checks if UNTIL condition is reached. Uses caching result of last |
| 1270 | comparison of current log file name and target log file name. So cached |
| 1271 | value should be invalidated if current log file name changes |
| 1272 | (see Relay_log_info::notify_... functions). |
| 1273 | |
| 1274 | This caching is needed to avoid of expensive string comparisons and |
| 1275 | strtol() conversions needed for log names comparison. We don't need to |
| 1276 | compare them each time this function is called, we only need to do this |
| 1277 | when current log name changes. If we have UNTIL_MASTER_POS condition we |
| 1278 | need to do this only after Rotate_log_event::do_apply_event() (which is |
| 1279 | rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS |
| 1280 | condition then we should invalidate cached comarison value after |
| 1281 | inc_group_relay_log_pos() which called for each group of events (so we |
| 1282 | have some benefit if we have something like queries that use |
| 1283 | autoincrement or if we have transactions). |
| 1284 | |
| 1285 | Should be called ONLY if until_condition != UNTIL_NONE ! |
| 1286 | RETURN VALUE |
| 1287 | true - condition met or error happened (condition seems to have |
| 1288 | bad log file name) |
| 1289 | false - condition not met |
| 1290 | */ |
| 1291 | |
| 1292 | bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos) |
| 1293 | { |
| 1294 | const char *log_name; |
| 1295 | ulonglong log_pos; |
| 1296 | DBUG_ENTER("Relay_log_info::is_until_satisfied" ); |
| 1297 | |
| 1298 | if (until_condition == UNTIL_MASTER_POS) |
| 1299 | { |
| 1300 | log_name= (mi->using_parallel() ? future_event_master_log_name |
| 1301 | : group_master_log_name); |
| 1302 | log_pos= master_beg_pos; |
| 1303 | } |
| 1304 | else |
| 1305 | { |
| 1306 | DBUG_ASSERT(until_condition == UNTIL_RELAY_POS); |
| 1307 | log_name= group_relay_log_name; |
| 1308 | log_pos= group_relay_log_pos; |
| 1309 | } |
| 1310 | |
| 1311 | DBUG_PRINT("info" , ("group_master_log_name='%s', group_master_log_pos=%llu" , |
| 1312 | group_master_log_name, group_master_log_pos)); |
| 1313 | DBUG_PRINT("info" , ("group_relay_log_name='%s', group_relay_log_pos=%llu" , |
| 1314 | group_relay_log_name, group_relay_log_pos)); |
| 1315 | DBUG_PRINT("info" , ("(%s) log_name='%s', log_pos=%llu" , |
| 1316 | until_condition == UNTIL_MASTER_POS ? "master" : "relay" , |
| 1317 | log_name, log_pos)); |
| 1318 | DBUG_PRINT("info" , ("(%s) until_log_name='%s', until_log_pos=%llu" , |
| 1319 | until_condition == UNTIL_MASTER_POS ? "master" : "relay" , |
| 1320 | until_log_name, until_log_pos)); |
| 1321 | |
| 1322 | if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN) |
| 1323 | { |
| 1324 | /* |
| 1325 | We have no cached comparison results so we should compare log names |
| 1326 | and cache result. |
| 1327 | If we are after RESET SLAVE, and the SQL slave thread has not processed |
| 1328 | any event yet, it could be that group_master_log_name is "". In that case, |
| 1329 | just wait for more events (as there is no sensible comparison to do). |
| 1330 | */ |
| 1331 | |
| 1332 | if (*log_name) |
| 1333 | { |
| 1334 | const char *basename= log_name + dirname_length(log_name); |
| 1335 | |
| 1336 | const char *q= (const char*)(fn_ext(basename)+1); |
| 1337 | if (strncmp(basename, until_log_name, (int)(q-basename)) == 0) |
| 1338 | { |
| 1339 | /* Now compare extensions. */ |
| 1340 | char *q_end; |
| 1341 | ulong log_name_extension= strtoul(q, &q_end, 10); |
| 1342 | if (log_name_extension < until_log_name_extension) |
| 1343 | until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS; |
| 1344 | else |
| 1345 | until_log_names_cmp_result= |
| 1346 | (log_name_extension > until_log_name_extension) ? |
| 1347 | UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ; |
| 1348 | } |
| 1349 | else |
| 1350 | { |
| 1351 | /* Probably error so we aborting */ |
| 1352 | sql_print_error("Slave SQL thread is stopped because UNTIL " |
| 1353 | "condition is bad." ); |
| 1354 | DBUG_RETURN(TRUE); |
| 1355 | } |
| 1356 | } |
| 1357 | else |
| 1358 | DBUG_RETURN(until_log_pos == 0); |
| 1359 | } |
| 1360 | |
| 1361 | DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && |
| 1362 | log_pos >= until_log_pos) || |
| 1363 | until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER)); |
| 1364 | } |
| 1365 | |
| 1366 | |
| 1367 | bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, |
| 1368 | rpl_group_info *rgi) |
| 1369 | { |
| 1370 | int error= 0; |
| 1371 | DBUG_ENTER("Relay_log_info::stmt_done" ); |
| 1372 | |
| 1373 | DBUG_ASSERT(!belongs_to_client()); |
| 1374 | DBUG_ASSERT(rgi->rli == this); |
| 1375 | /* |
| 1376 | If in a transaction, and if the slave supports transactions, just |
| 1377 | inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN |
| 1378 | (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with |
| 1379 | BEGIN/COMMIT, not with SET AUTOCOMMIT= . |
| 1380 | |
| 1381 | We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN |
| 1382 | is also used for single row transactions. |
| 1383 | |
| 1384 | CAUTION: opt_using_transactions means innodb || bdb ; suppose the |
| 1385 | master supports InnoDB and BDB, but the slave supports only BDB, |
| 1386 | problems will arise: - suppose an InnoDB table is created on the |
| 1387 | master, - then it will be MyISAM on the slave - but as |
| 1388 | opt_using_transactions is true, the slave will believe he is |
| 1389 | transactional with the MyISAM table. And problems will come when |
| 1390 | one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will |
| 1391 | resume at BEGIN whereas there has not been any rollback). This is |
| 1392 | the problem of using opt_using_transactions instead of a finer |
| 1393 | "does the slave support _transactional handler used on the |
| 1394 | master_". |
| 1395 | |
| 1396 | More generally, we'll have problems when a query mixes a |
| 1397 | transactional handler and MyISAM and STOP SLAVE is issued in the |
| 1398 | middle of the "transaction". START SLAVE will resume at BEGIN |
| 1399 | while the MyISAM table has already been updated. |
| 1400 | */ |
| 1401 | if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && |
| 1402 | opt_using_transactions) |
| 1403 | rgi->inc_event_relay_log_pos(); |
| 1404 | else |
| 1405 | { |
| 1406 | inc_group_relay_log_pos(event_master_log_pos, rgi); |
| 1407 | if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi)) |
| 1408 | { |
| 1409 | report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(), |
| 1410 | "Failed to update GTID state in %s.%s, slave state may become " |
| 1411 | "inconsistent: %d: %s" , |
| 1412 | "mysql" , rpl_gtid_slave_state_table_name.str, |
| 1413 | thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message()); |
| 1414 | /* |
| 1415 | At this point we are not in a transaction (for example after DDL), |
| 1416 | so we can not roll back. Anyway, normally updates to the slave |
| 1417 | state table should not fail, and if they do, at least we made the |
| 1418 | DBA aware of the problem in the error log. |
| 1419 | */ |
| 1420 | } |
| 1421 | DBUG_EXECUTE_IF("inject_crash_before_flush_rli" , DBUG_SUICIDE();); |
| 1422 | if (mi->using_gtid == Master_info::USE_GTID_NO) |
| 1423 | if (flush()) |
| 1424 | error= 1; |
| 1425 | DBUG_EXECUTE_IF("inject_crash_after_flush_rli" , DBUG_SUICIDE();); |
| 1426 | } |
| 1427 | DBUG_RETURN(error); |
| 1428 | } |
| 1429 | |
| 1430 | |
| 1431 | int |
| 1432 | Relay_log_info::alloc_inuse_relaylog(const char *name) |
| 1433 | { |
| 1434 | inuse_relaylog *ir; |
| 1435 | uint32 gtid_count; |
| 1436 | rpl_gtid *gtid_list; |
| 1437 | |
| 1438 | if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) |
| 1439 | { |
| 1440 | my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); |
| 1441 | return 1; |
| 1442 | } |
| 1443 | gtid_count= relay_log_state.count(); |
| 1444 | if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count, |
| 1445 | MYF(MY_WME)))) |
| 1446 | { |
| 1447 | my_free(ir); |
| 1448 | my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count); |
| 1449 | return 1; |
| 1450 | } |
| 1451 | if (relay_log_state.get_gtid_list(gtid_list, gtid_count)) |
| 1452 | { |
| 1453 | my_free(gtid_list); |
| 1454 | my_free(ir); |
| 1455 | DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */); |
| 1456 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
| 1457 | return 1; |
| 1458 | } |
| 1459 | ir->rli= this; |
| 1460 | strmake_buf(ir->name, name); |
| 1461 | ir->relay_log_state= gtid_list; |
| 1462 | ir->relay_log_state_count= gtid_count; |
| 1463 | |
| 1464 | if (!inuse_relaylog_list) |
| 1465 | inuse_relaylog_list= ir; |
| 1466 | else |
| 1467 | { |
| 1468 | last_inuse_relaylog->completed= true; |
| 1469 | last_inuse_relaylog->next= ir; |
| 1470 | } |
| 1471 | last_inuse_relaylog= ir; |
| 1472 | |
| 1473 | return 0; |
| 1474 | } |
| 1475 | |
| 1476 | |
| 1477 | void |
| 1478 | Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir) |
| 1479 | { |
| 1480 | my_free(ir->relay_log_state); |
| 1481 | my_free(ir); |
| 1482 | } |
| 1483 | |
| 1484 | |
| 1485 | void |
| 1486 | Relay_log_info::reset_inuse_relaylog() |
| 1487 | { |
| 1488 | inuse_relaylog *cur= inuse_relaylog_list; |
| 1489 | while (cur) |
| 1490 | { |
| 1491 | DBUG_ASSERT(cur->queued_count == cur->dequeued_count); |
| 1492 | inuse_relaylog *next= cur->next; |
| 1493 | free_inuse_relaylog(cur); |
| 1494 | cur= next; |
| 1495 | } |
| 1496 | inuse_relaylog_list= last_inuse_relaylog= NULL; |
| 1497 | } |
| 1498 | |
| 1499 | |
| 1500 | int |
| 1501 | Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count) |
| 1502 | { |
| 1503 | int res= 0; |
| 1504 | while (count) |
| 1505 | { |
| 1506 | if (relay_log_state.update_nolock(gtid_list, false)) |
| 1507 | res= 1; |
| 1508 | ++gtid_list; |
| 1509 | --count; |
| 1510 | } |
| 1511 | return res; |
| 1512 | } |
| 1513 | |
| 1514 | |
| 1515 | #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) |
| 1516 | struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; }; |
| 1517 | |
| 1518 | static int |
| 1519 | scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array, |
| 1520 | LEX_CSTRING *tablename, void **out_hton) |
| 1521 | { |
| 1522 | TABLE_LIST tlist; |
| 1523 | TABLE *UNINIT_VAR(table); |
| 1524 | bool table_opened= false; |
| 1525 | bool table_scanned= false; |
| 1526 | struct gtid_pos_element tmp_entry, *entry; |
| 1527 | int err= 0; |
| 1528 | |
| 1529 | thd->reset_for_next_command(); |
| 1530 | tlist.init_one_table(&MYSQL_SCHEMA_NAME, tablename, NULL, TL_READ); |
| 1531 | if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) |
| 1532 | goto end; |
| 1533 | table_opened= true; |
| 1534 | table= tlist.table; |
| 1535 | |
| 1536 | if ((err= gtid_check_rpl_slave_state_table(table))) |
| 1537 | goto end; |
| 1538 | |
| 1539 | bitmap_set_all(table->read_set); |
| 1540 | if (unlikely(err= table->file->ha_rnd_init_with_error(1))) |
| 1541 | goto end; |
| 1542 | |
| 1543 | table_scanned= true; |
| 1544 | for (;;) |
| 1545 | { |
| 1546 | uint32 domain_id, server_id; |
| 1547 | uint64 sub_id, seq_no; |
| 1548 | uchar *rec; |
| 1549 | |
| 1550 | if ((err= table->file->ha_rnd_next(table->record[0]))) |
| 1551 | { |
| 1552 | if (err == HA_ERR_END_OF_FILE) |
| 1553 | break; |
| 1554 | else |
| 1555 | { |
| 1556 | table->file->print_error(err, MYF(0)); |
| 1557 | goto end; |
| 1558 | } |
| 1559 | } |
| 1560 | domain_id= (uint32)table->field[0]->val_int(); |
| 1561 | sub_id= (ulonglong)table->field[1]->val_int(); |
| 1562 | server_id= (uint32)table->field[2]->val_int(); |
| 1563 | seq_no= (ulonglong)table->field[3]->val_int(); |
| 1564 | DBUG_PRINT("info" , ("Read slave state row: %u-%u-%lu sub_id=%lu\n" , |
| 1565 | (unsigned)domain_id, (unsigned)server_id, |
| 1566 | (ulong)seq_no, (ulong)sub_id)); |
| 1567 | |
| 1568 | tmp_entry.sub_id= sub_id; |
| 1569 | tmp_entry.gtid.domain_id= domain_id; |
| 1570 | tmp_entry.gtid.server_id= server_id; |
| 1571 | tmp_entry.gtid.seq_no= seq_no; |
| 1572 | tmp_entry.hton= table->s->db_type(); |
| 1573 | if ((err= insert_dynamic(array, (uchar *)&tmp_entry))) |
| 1574 | { |
| 1575 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
| 1576 | goto end; |
| 1577 | } |
| 1578 | |
| 1579 | if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0))) |
| 1580 | { |
| 1581 | entry= (struct gtid_pos_element *)rec; |
| 1582 | if (entry->sub_id >= sub_id) |
| 1583 | continue; |
| 1584 | entry->sub_id= sub_id; |
| 1585 | DBUG_ASSERT(entry->gtid.domain_id == domain_id); |
| 1586 | entry->gtid.server_id= server_id; |
| 1587 | entry->gtid.seq_no= seq_no; |
| 1588 | entry->hton= table->s->db_type(); |
| 1589 | } |
| 1590 | else |
| 1591 | { |
| 1592 | if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry), |
| 1593 | MYF(MY_WME)))) |
| 1594 | { |
| 1595 | my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry)); |
| 1596 | err= 1; |
| 1597 | goto end; |
| 1598 | } |
| 1599 | entry->sub_id= sub_id; |
| 1600 | entry->gtid.domain_id= domain_id; |
| 1601 | entry->gtid.server_id= server_id; |
| 1602 | entry->gtid.seq_no= seq_no; |
| 1603 | entry->hton= table->s->db_type(); |
| 1604 | if ((err= my_hash_insert(hash, (uchar *)entry))) |
| 1605 | { |
| 1606 | my_free(entry); |
| 1607 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
| 1608 | goto end; |
| 1609 | } |
| 1610 | } |
| 1611 | } |
| 1612 | err= 0; /* Clear HA_ERR_END_OF_FILE */ |
| 1613 | |
| 1614 | end: |
| 1615 | if (table_scanned) |
| 1616 | { |
| 1617 | table->file->ha_index_or_rnd_end(); |
| 1618 | ha_commit_trans(thd, FALSE); |
| 1619 | ha_commit_trans(thd, TRUE); |
| 1620 | } |
| 1621 | if (table_opened) |
| 1622 | { |
| 1623 | *out_hton= table->s->db_type(); |
| 1624 | close_thread_tables(thd); |
| 1625 | thd->mdl_context.release_transactional_locks(); |
| 1626 | } |
| 1627 | return err; |
| 1628 | } |
| 1629 | |
| 1630 | |
| 1631 | /* |
| 1632 | Look for all tables mysql.gtid_slave_pos*. Read all rows from each such |
| 1633 | table found into ARRAY. For each domain id, put the row with highest sub_id |
| 1634 | into HASH. |
| 1635 | */ |
| 1636 | static int |
| 1637 | scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *), |
| 1638 | void *cb_data) |
| 1639 | { |
| 1640 | char path[FN_REFLEN]; |
| 1641 | MY_DIR *dirp; |
| 1642 | |
| 1643 | thd->reset_for_next_command(); |
| 1644 | if (lock_schema_name(thd, MYSQL_SCHEMA_NAME.str)) |
| 1645 | return 1; |
| 1646 | |
| 1647 | build_table_filename(path, sizeof(path) - 1, MYSQL_SCHEMA_NAME.str, "" , "" , 0); |
| 1648 | if (!(dirp= my_dir(path, MYF(MY_DONT_SORT)))) |
| 1649 | { |
| 1650 | my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno); |
| 1651 | close_thread_tables(thd); |
| 1652 | thd->mdl_context.release_transactional_locks(); |
| 1653 | return 1; |
| 1654 | } |
| 1655 | else |
| 1656 | { |
| 1657 | size_t i; |
| 1658 | Dynamic_array<LEX_CSTRING*> files(dirp->number_of_files); |
| 1659 | Discovered_table_list tl(thd, &files); |
| 1660 | int err; |
| 1661 | |
| 1662 | err= ha_discover_table_names(thd, &MYSQL_SCHEMA_NAME, dirp, &tl, false); |
| 1663 | my_dirend(dirp); |
| 1664 | close_thread_tables(thd); |
| 1665 | thd->mdl_context.release_transactional_locks(); |
| 1666 | if (err) |
| 1667 | return err; |
| 1668 | |
| 1669 | for (i = 0; i < files.elements(); ++i) |
| 1670 | { |
| 1671 | if (strncmp(files.at(i)->str, |
| 1672 | rpl_gtid_slave_state_table_name.str, |
| 1673 | rpl_gtid_slave_state_table_name.length) == 0) |
| 1674 | { |
| 1675 | if ((err= (*cb)(thd, files.at(i), cb_data))) |
| 1676 | return err; |
| 1677 | } |
| 1678 | } |
| 1679 | } |
| 1680 | |
| 1681 | return 0; |
| 1682 | } |
| 1683 | |
| 1684 | |
| 1685 | struct load_gtid_state_cb_data { |
| 1686 | HASH *hash; |
| 1687 | DYNAMIC_ARRAY *array; |
| 1688 | struct rpl_slave_state::gtid_pos_table *table_list; |
| 1689 | struct rpl_slave_state::gtid_pos_table *default_entry; |
| 1690 | }; |
| 1691 | |
| 1692 | static int |
| 1693 | process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton, |
| 1694 | struct load_gtid_state_cb_data *data) |
| 1695 | { |
| 1696 | struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr; |
| 1697 | bool is_default= |
| 1698 | (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0); |
| 1699 | |
| 1700 | /* |
| 1701 | Ignore tables with duplicate storage engine, with a warning. |
| 1702 | Prefer the default mysql.gtid_slave_pos over another table |
| 1703 | mysql.gtid_slave_posXXX with the same storage engine. |
| 1704 | */ |
| 1705 | next_ptr= &data->table_list; |
| 1706 | entry= data->table_list; |
| 1707 | while (entry) |
| 1708 | { |
| 1709 | if (entry->table_hton == hton) |
| 1710 | { |
| 1711 | static const char *warning_msg= "Ignoring redundant table mysql.%s " |
| 1712 | "since mysql.%s has the same storage engine" ; |
| 1713 | if (!is_default) |
| 1714 | { |
| 1715 | /* Ignore the redundant table. */ |
| 1716 | sql_print_warning(warning_msg, table_name->str, entry->table_name.str); |
| 1717 | return 0; |
| 1718 | } |
| 1719 | else |
| 1720 | { |
| 1721 | sql_print_warning(warning_msg, entry->table_name.str, table_name->str); |
| 1722 | /* Delete the redundant table, and proceed to add this one instead. */ |
| 1723 | *next_ptr= entry->next; |
| 1724 | my_free(entry); |
| 1725 | break; |
| 1726 | } |
| 1727 | } |
| 1728 | next_ptr= &entry->next; |
| 1729 | entry= entry->next; |
| 1730 | } |
| 1731 | |
| 1732 | p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, |
| 1733 | hton, rpl_slave_state::GTID_POS_AVAILABLE); |
| 1734 | if (!p) |
| 1735 | return 1; |
| 1736 | p->next= data->table_list; |
| 1737 | data->table_list= p; |
| 1738 | if (is_default) |
| 1739 | data->default_entry= p; |
| 1740 | return 0; |
| 1741 | } |
| 1742 | |
| 1743 | |
| 1744 | /* |
| 1745 | Put tables corresponding to @@gtid_pos_auto_engines at the end of the list, |
| 1746 | marked to be auto-created if needed. |
| 1747 | */ |
| 1748 | static int |
| 1749 | gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr) |
| 1750 | { |
| 1751 | plugin_ref *auto_engines; |
| 1752 | int err= 0; |
| 1753 | mysql_mutex_lock(&LOCK_global_system_variables); |
| 1754 | for (auto_engines= opt_gtid_pos_auto_plugins; |
| 1755 | !err && auto_engines && *auto_engines; |
| 1756 | ++auto_engines) |
| 1757 | { |
| 1758 | void *hton= plugin_hton(*auto_engines); |
| 1759 | char buf[FN_REFLEN+1]; |
| 1760 | LEX_CSTRING table_name; |
| 1761 | char *p; |
| 1762 | rpl_slave_state::gtid_pos_table *entry, **next_ptr; |
| 1763 | |
| 1764 | /* See if this engine is already in the list. */ |
| 1765 | next_ptr= list_ptr; |
| 1766 | entry= *list_ptr; |
| 1767 | while (entry) |
| 1768 | { |
| 1769 | if (entry->table_hton == hton) |
| 1770 | break; |
| 1771 | next_ptr= &entry->next; |
| 1772 | entry= entry->next; |
| 1773 | } |
| 1774 | if (entry) |
| 1775 | continue; |
| 1776 | |
| 1777 | /* Add an auto-create entry for this engine at end of list. */ |
| 1778 | p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN); |
| 1779 | p= strmake(p, "_" , FN_REFLEN - (p - buf)); |
| 1780 | p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf)); |
| 1781 | table_name.str= buf; |
| 1782 | table_name.length= p - buf; |
| 1783 | table_case_convert(const_cast<char*>(table_name.str), |
| 1784 | static_cast<uint>(table_name.length)); |
| 1785 | entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table |
| 1786 | (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE); |
| 1787 | if (!entry) |
| 1788 | { |
| 1789 | err= 1; |
| 1790 | break; |
| 1791 | } |
| 1792 | *next_ptr= entry; |
| 1793 | } |
| 1794 | mysql_mutex_unlock(&LOCK_global_system_variables); |
| 1795 | return err; |
| 1796 | } |
| 1797 | |
| 1798 | |
| 1799 | static int |
| 1800 | load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg) |
| 1801 | { |
| 1802 | int err; |
| 1803 | load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); |
| 1804 | void *hton; |
| 1805 | |
| 1806 | if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array, |
| 1807 | table_name, &hton))) |
| 1808 | return err; |
| 1809 | return process_gtid_pos_table(thd, table_name, hton, data); |
| 1810 | } |
| 1811 | |
| 1812 | |
| 1813 | int |
| 1814 | rpl_load_gtid_slave_state(THD *thd) |
| 1815 | { |
| 1816 | bool array_inited= false; |
| 1817 | struct gtid_pos_element tmp_entry, *entry; |
| 1818 | HASH hash; |
| 1819 | DYNAMIC_ARRAY array; |
| 1820 | int err= 0; |
| 1821 | uint32 i; |
| 1822 | load_gtid_state_cb_data cb_data; |
| 1823 | DBUG_ENTER("rpl_load_gtid_slave_state" ); |
| 1824 | |
| 1825 | mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1826 | bool loaded= rpl_global_gtid_slave_state->loaded; |
| 1827 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1828 | if (loaded) |
| 1829 | DBUG_RETURN(0); |
| 1830 | |
| 1831 | cb_data.table_list= NULL; |
| 1832 | cb_data.default_entry= NULL; |
| 1833 | my_hash_init(&hash, &my_charset_bin, 32, |
| 1834 | offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id), |
| 1835 | sizeof(uint32), NULL, my_free, HASH_UNIQUE); |
| 1836 | if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0)))) |
| 1837 | goto end; |
| 1838 | array_inited= true; |
| 1839 | |
| 1840 | cb_data.hash = &hash; |
| 1841 | cb_data.array = &array; |
| 1842 | if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data))) |
| 1843 | goto end; |
| 1844 | |
| 1845 | if (!cb_data.default_entry) |
| 1846 | { |
| 1847 | /* |
| 1848 | If the mysql.gtid_slave_pos table does not exist, but at least one other |
| 1849 | table is available, arbitrarily pick the first in the list to use as |
| 1850 | default. |
| 1851 | */ |
| 1852 | cb_data.default_entry= cb_data.table_list; |
| 1853 | } |
| 1854 | if ((err= gtid_pos_auto_create_tables(&cb_data.table_list))) |
| 1855 | goto end; |
| 1856 | |
| 1857 | mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1858 | if (rpl_global_gtid_slave_state->loaded) |
| 1859 | { |
| 1860 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1861 | goto end; |
| 1862 | } |
| 1863 | |
| 1864 | if (!cb_data.table_list) |
| 1865 | { |
| 1866 | my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql" , |
| 1867 | rpl_gtid_slave_state_table_name.str); |
| 1868 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1869 | err= 1; |
| 1870 | goto end; |
| 1871 | } |
| 1872 | |
| 1873 | for (i= 0; i < array.elements; ++i) |
| 1874 | { |
| 1875 | get_dynamic(&array, (uchar *)&tmp_entry, i); |
| 1876 | if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id, |
| 1877 | tmp_entry.gtid.server_id, |
| 1878 | tmp_entry.sub_id, |
| 1879 | tmp_entry.gtid.seq_no, |
| 1880 | tmp_entry.hton, |
| 1881 | NULL))) |
| 1882 | { |
| 1883 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1884 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
| 1885 | goto end; |
| 1886 | } |
| 1887 | } |
| 1888 | |
| 1889 | for (i= 0; i < hash.records; ++i) |
| 1890 | { |
| 1891 | entry= (struct gtid_pos_element *)my_hash_element(&hash, i); |
| 1892 | if (opt_bin_log && |
| 1893 | mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id, |
| 1894 | entry->gtid.seq_no)) |
| 1895 | { |
| 1896 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1897 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
| 1898 | goto end; |
| 1899 | } |
| 1900 | } |
| 1901 | |
| 1902 | rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, |
| 1903 | cb_data.default_entry); |
| 1904 | cb_data.table_list= NULL; |
| 1905 | rpl_global_gtid_slave_state->loaded= true; |
| 1906 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1907 | |
| 1908 | end: |
| 1909 | if (array_inited) |
| 1910 | delete_dynamic(&array); |
| 1911 | my_hash_free(&hash); |
| 1912 | if (cb_data.table_list) |
| 1913 | rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); |
| 1914 | DBUG_RETURN(err); |
| 1915 | } |
| 1916 | |
| 1917 | |
| 1918 | static int |
| 1919 | find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg) |
| 1920 | { |
| 1921 | load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); |
| 1922 | TABLE_LIST tlist; |
| 1923 | TABLE *table= NULL; |
| 1924 | int err; |
| 1925 | |
| 1926 | thd->reset_for_next_command(); |
| 1927 | tlist.init_one_table(&MYSQL_SCHEMA_NAME, table_name, NULL, TL_READ); |
| 1928 | if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) |
| 1929 | goto end; |
| 1930 | table= tlist.table; |
| 1931 | |
| 1932 | if ((err= gtid_check_rpl_slave_state_table(table))) |
| 1933 | goto end; |
| 1934 | err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data); |
| 1935 | |
| 1936 | end: |
| 1937 | if (table) |
| 1938 | { |
| 1939 | ha_commit_trans(thd, FALSE); |
| 1940 | ha_commit_trans(thd, TRUE); |
| 1941 | close_thread_tables(thd); |
| 1942 | thd->mdl_context.release_transactional_locks(); |
| 1943 | } |
| 1944 | |
| 1945 | return err; |
| 1946 | } |
| 1947 | |
| 1948 | |
| 1949 | /* |
| 1950 | Re-compute the list of available mysql.gtid_slave_posXXX tables. |
| 1951 | |
| 1952 | This is done at START SLAVE to pick up any newly created tables without |
| 1953 | requiring server restart. |
| 1954 | */ |
| 1955 | int |
| 1956 | find_gtid_slave_pos_tables(THD *thd) |
| 1957 | { |
| 1958 | int err= 0; |
| 1959 | load_gtid_state_cb_data cb_data; |
| 1960 | uint num_running; |
| 1961 | |
| 1962 | mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1963 | bool loaded= rpl_global_gtid_slave_state->loaded; |
| 1964 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1965 | if (!loaded) |
| 1966 | return 0; |
| 1967 | |
| 1968 | cb_data.table_list= NULL; |
| 1969 | cb_data.default_entry= NULL; |
| 1970 | if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data))) |
| 1971 | goto end; |
| 1972 | |
| 1973 | if (!cb_data.table_list) |
| 1974 | { |
| 1975 | my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql" , |
| 1976 | rpl_gtid_slave_state_table_name.str); |
| 1977 | err= 1; |
| 1978 | goto end; |
| 1979 | } |
| 1980 | if (!cb_data.default_entry) |
| 1981 | { |
| 1982 | /* |
| 1983 | If the mysql.gtid_slave_pos table does not exist, but at least one other |
| 1984 | table is available, arbitrarily pick the first in the list to use as |
| 1985 | default. |
| 1986 | */ |
| 1987 | cb_data.default_entry= cb_data.table_list; |
| 1988 | } |
| 1989 | if ((err= gtid_pos_auto_create_tables(&cb_data.table_list))) |
| 1990 | goto end; |
| 1991 | |
| 1992 | mysql_mutex_lock(&LOCK_active_mi); |
| 1993 | num_running= any_slave_sql_running(true); |
| 1994 | mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 1995 | if (num_running <= 1) |
| 1996 | { |
| 1997 | /* |
| 1998 | If no slave is running now, the count will be 1, since this SQL thread |
| 1999 | which is starting is included in the count. In this case, we can safely |
| 2000 | replace the list, no-one can be trying to read it without lock. |
| 2001 | */ |
| 2002 | DBUG_ASSERT(num_running == 1); |
| 2003 | rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, |
| 2004 | cb_data.default_entry); |
| 2005 | cb_data.table_list= NULL; |
| 2006 | } |
| 2007 | else |
| 2008 | { |
| 2009 | /* |
| 2010 | If there are SQL threads running, we cannot safely remove the old list. |
| 2011 | However we can add new entries, and warn about any tables that |
| 2012 | disappeared, but may still be visible to running SQL threads. |
| 2013 | */ |
| 2014 | rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr; |
| 2015 | |
| 2016 | old_entry= (rpl_slave_state::gtid_pos_table *) |
| 2017 | rpl_global_gtid_slave_state->gtid_pos_tables; |
| 2018 | while (old_entry) |
| 2019 | { |
| 2020 | new_entry= cb_data.table_list; |
| 2021 | while (new_entry) |
| 2022 | { |
| 2023 | if (new_entry->table_hton == old_entry->table_hton) |
| 2024 | break; |
| 2025 | new_entry= new_entry->next; |
| 2026 | } |
| 2027 | if (!new_entry) |
| 2028 | sql_print_warning("The table mysql.%s was removed. " |
| 2029 | "This change will not take full effect " |
| 2030 | "until all SQL threads have been restarted" , |
| 2031 | old_entry->table_name.str); |
| 2032 | old_entry= old_entry->next; |
| 2033 | } |
| 2034 | next_ptr_ptr= &cb_data.table_list; |
| 2035 | new_entry= cb_data.table_list; |
| 2036 | while (new_entry) |
| 2037 | { |
| 2038 | /* Check if we already have a table with this storage engine. */ |
| 2039 | old_entry= (rpl_slave_state::gtid_pos_table *) |
| 2040 | rpl_global_gtid_slave_state->gtid_pos_tables; |
| 2041 | while (old_entry) |
| 2042 | { |
| 2043 | if (new_entry->table_hton == old_entry->table_hton) |
| 2044 | break; |
| 2045 | old_entry= old_entry->next; |
| 2046 | } |
| 2047 | if (old_entry) |
| 2048 | { |
| 2049 | /* This new_entry is already available in the list. */ |
| 2050 | next_ptr_ptr= &new_entry->next; |
| 2051 | new_entry= new_entry->next; |
| 2052 | } |
| 2053 | else |
| 2054 | { |
| 2055 | /* Move this new_entry to the list. */ |
| 2056 | rpl_slave_state::gtid_pos_table *next= new_entry->next; |
| 2057 | rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry); |
| 2058 | *next_ptr_ptr= next; |
| 2059 | new_entry= next; |
| 2060 | } |
| 2061 | } |
| 2062 | } |
| 2063 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
| 2064 | mysql_mutex_unlock(&LOCK_active_mi); |
| 2065 | |
| 2066 | end: |
| 2067 | if (cb_data.table_list) |
| 2068 | rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); |
| 2069 | return err; |
| 2070 | } |
| 2071 | |
| 2072 | |
| 2073 | void |
| 2074 | rpl_group_info::reinit(Relay_log_info *rli) |
| 2075 | { |
| 2076 | this->rli= rli; |
| 2077 | tables_to_lock= NULL; |
| 2078 | tables_to_lock_count= 0; |
| 2079 | trans_retries= 0; |
| 2080 | last_event_start_time= 0; |
| 2081 | gtid_sub_id= 0; |
| 2082 | commit_id= 0; |
| 2083 | gtid_pending= false; |
| 2084 | worker_error= 0; |
| 2085 | row_stmt_start_timestamp= 0; |
| 2086 | long_find_row_note_printed= false; |
| 2087 | did_mark_start_commit= false; |
| 2088 | gtid_ev_flags2= 0; |
| 2089 | last_master_timestamp = 0; |
| 2090 | gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; |
| 2091 | speculation= SPECULATE_NO; |
| 2092 | commit_orderer.reinit(); |
| 2093 | } |
| 2094 | |
| 2095 | rpl_group_info::rpl_group_info(Relay_log_info *rli) |
| 2096 | : thd(0), wait_commit_sub_id(0), |
| 2097 | wait_commit_group_info(0), parallel_entry(0), |
| 2098 | deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) |
| 2099 | { |
| 2100 | reinit(rli); |
| 2101 | bzero(¤t_gtid, sizeof(current_gtid)); |
| 2102 | mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, |
| 2103 | MY_MUTEX_INIT_FAST); |
| 2104 | mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL); |
| 2105 | } |
| 2106 | |
| 2107 | |
| 2108 | rpl_group_info::~rpl_group_info() |
| 2109 | { |
| 2110 | free_annotate_event(); |
| 2111 | delete deferred_events; |
| 2112 | mysql_mutex_destroy(&sleep_lock); |
| 2113 | mysql_cond_destroy(&sleep_cond); |
| 2114 | } |
| 2115 | |
| 2116 | |
| 2117 | int |
| 2118 | event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) |
| 2119 | { |
| 2120 | uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id); |
| 2121 | if (!sub_id) |
| 2122 | { |
| 2123 | /* Out of memory caused hash insertion to fail. */ |
| 2124 | return 1; |
| 2125 | } |
| 2126 | rgi->gtid_sub_id= sub_id; |
| 2127 | rgi->current_gtid.domain_id= gev->domain_id; |
| 2128 | rgi->current_gtid.server_id= gev->server_id; |
| 2129 | rgi->current_gtid.seq_no= gev->seq_no; |
| 2130 | rgi->commit_id= gev->commit_id; |
| 2131 | rgi->gtid_pending= true; |
| 2132 | return 0; |
| 2133 | } |
| 2134 | |
| 2135 | |
| 2136 | void |
| 2137 | delete_or_keep_event_post_apply(rpl_group_info *rgi, |
| 2138 | Log_event_type typ, Log_event *ev) |
| 2139 | { |
| 2140 | /* |
| 2141 | ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be |
| 2142 | thread-safe for parallel replication. |
| 2143 | */ |
| 2144 | |
| 2145 | switch (typ) { |
| 2146 | case FORMAT_DESCRIPTION_EVENT: |
| 2147 | /* |
| 2148 | Format_description_log_event should not be deleted because it |
| 2149 | will be used to read info about the relay log's format; |
| 2150 | it will be deleted when the SQL thread does not need it, |
| 2151 | i.e. when this thread terminates. |
| 2152 | */ |
| 2153 | break; |
| 2154 | case ANNOTATE_ROWS_EVENT: |
| 2155 | /* |
| 2156 | Annotate_rows event should not be deleted because after it has |
| 2157 | been applied, thd->query points to the string inside this event. |
| 2158 | The thd->query will be used to generate new Annotate_rows event |
| 2159 | during applying the subsequent Rows events. |
| 2160 | */ |
| 2161 | rgi->set_annotate_event((Annotate_rows_log_event*) ev); |
| 2162 | break; |
| 2163 | case DELETE_ROWS_EVENT_V1: |
| 2164 | case UPDATE_ROWS_EVENT_V1: |
| 2165 | case WRITE_ROWS_EVENT_V1: |
| 2166 | case DELETE_ROWS_EVENT: |
| 2167 | case UPDATE_ROWS_EVENT: |
| 2168 | case WRITE_ROWS_EVENT: |
| 2169 | case WRITE_ROWS_COMPRESSED_EVENT: |
| 2170 | case DELETE_ROWS_COMPRESSED_EVENT: |
| 2171 | case UPDATE_ROWS_COMPRESSED_EVENT: |
| 2172 | case WRITE_ROWS_COMPRESSED_EVENT_V1: |
| 2173 | case UPDATE_ROWS_COMPRESSED_EVENT_V1: |
| 2174 | case DELETE_ROWS_COMPRESSED_EVENT_V1: |
| 2175 | /* |
| 2176 | After the last Rows event has been applied, the saved Annotate_rows |
| 2177 | event (if any) is not needed anymore and can be deleted. |
| 2178 | */ |
| 2179 | if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) |
| 2180 | rgi->free_annotate_event(); |
| 2181 | /* fall through */ |
| 2182 | default: |
| 2183 | DBUG_PRINT("info" , ("Deleting the event after it has been executed" )); |
| 2184 | if (!rgi->is_deferred_event(ev)) |
| 2185 | delete ev; |
| 2186 | break; |
| 2187 | } |
| 2188 | } |
| 2189 | |
| 2190 | |
| 2191 | void rpl_group_info::cleanup_context(THD *thd, bool error) |
| 2192 | { |
| 2193 | DBUG_ENTER("rpl_group_info::cleanup_context" ); |
| 2194 | DBUG_PRINT("enter" , ("error: %d" , (int) error)); |
| 2195 | |
| 2196 | DBUG_ASSERT(this->thd == thd); |
| 2197 | /* |
| 2198 | 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, |
| 2199 | may have opened tables, which we cannot be sure have been closed (because |
| 2200 | maybe the Rows_log_event have not been found or will not be, because slave |
| 2201 | SQL thread is stopping, or relay log has a missing tail etc). So we close |
| 2202 | all thread's tables. And so the table mappings have to be cancelled. |
| 2203 | 2) Rows_log_event::do_apply_event() may even have started statements or |
| 2204 | transactions on them, which we need to rollback in case of error. |
| 2205 | 3) If finding a Format_description_log_event after a BEGIN, we also need |
| 2206 | to rollback before continuing with the next events. |
| 2207 | 4) so we need this "context cleanup" function. |
| 2208 | */ |
| 2209 | if (unlikely(error)) |
| 2210 | { |
| 2211 | trans_rollback_stmt(thd); // if a "statement transaction" |
| 2212 | /* trans_rollback() also resets OPTION_GTID_BEGIN */ |
| 2213 | trans_rollback(thd); // if a "real transaction" |
| 2214 | /* |
| 2215 | Now that we have rolled back the transaction, make sure we do not |
| 2216 | erroneously update the GTID position. |
| 2217 | */ |
| 2218 | gtid_pending= false; |
| 2219 | } |
| 2220 | m_table_map.clear_tables(); |
| 2221 | slave_close_thread_tables(thd); |
| 2222 | |
| 2223 | if (unlikely(error)) |
| 2224 | { |
| 2225 | thd->mdl_context.release_transactional_locks(); |
| 2226 | |
| 2227 | if (thd == rli->sql_driver_thd) |
| 2228 | { |
| 2229 | /* |
| 2230 | Reset flags. This is needed to handle incident events and errors in |
| 2231 | the relay log noticed by the sql driver thread. |
| 2232 | */ |
| 2233 | rli->clear_flag(Relay_log_info::IN_STMT); |
| 2234 | rli->clear_flag(Relay_log_info::IN_TRANSACTION); |
| 2235 | } |
| 2236 | |
| 2237 | /* |
| 2238 | Ensure we always release the domain for others to process, when using |
| 2239 | --gtid-ignore-duplicates. |
| 2240 | */ |
| 2241 | if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL) |
| 2242 | rpl_global_gtid_slave_state->release_domain_owner(this); |
| 2243 | } |
| 2244 | |
| 2245 | /* |
| 2246 | Cleanup for the flags that have been set at do_apply_event. |
| 2247 | */ |
| 2248 | thd->variables.option_bits&= ~(OPTION_NO_FOREIGN_KEY_CHECKS | |
| 2249 | OPTION_RELAXED_UNIQUE_CHECKS | |
| 2250 | OPTION_NO_CHECK_CONSTRAINT_CHECKS); |
| 2251 | |
| 2252 | /* |
| 2253 | Reset state related to long_find_row notes in the error log: |
| 2254 | - timestamp |
| 2255 | - flag that decides whether the slave prints or not |
| 2256 | */ |
| 2257 | reset_row_stmt_start_timestamp(); |
| 2258 | unset_long_find_row_note_printed(); |
| 2259 | |
| 2260 | DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x" , { |
| 2261 | if (current_gtid.domain_id == 100) |
| 2262 | my_sleep(50000); |
| 2263 | };); |
| 2264 | |
| 2265 | DBUG_VOID_RETURN; |
| 2266 | } |
| 2267 | |
| 2268 | |
| 2269 | void rpl_group_info::clear_tables_to_lock() |
| 2270 | { |
| 2271 | DBUG_ENTER("rpl_group_info::clear_tables_to_lock()" ); |
| 2272 | #ifndef DBUG_OFF |
| 2273 | /** |
| 2274 | When replicating in RBR and MyISAM Merge tables are involved |
| 2275 | open_and_lock_tables (called in do_apply_event) appends the |
| 2276 | base tables to the list of tables_to_lock. Then these are |
| 2277 | removed from the list in close_thread_tables (which is called |
| 2278 | before we reach this point). |
| 2279 | |
| 2280 | This assertion just confirms that we get no surprises at this |
| 2281 | point. |
| 2282 | */ |
| 2283 | uint i=0; |
| 2284 | for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ; |
| 2285 | DBUG_ASSERT(i == tables_to_lock_count); |
| 2286 | #endif |
| 2287 | |
| 2288 | while (tables_to_lock) |
| 2289 | { |
| 2290 | uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock); |
| 2291 | if (tables_to_lock->m_tabledef_valid) |
| 2292 | { |
| 2293 | tables_to_lock->m_tabledef.table_def::~table_def(); |
| 2294 | tables_to_lock->m_tabledef_valid= FALSE; |
| 2295 | } |
| 2296 | |
| 2297 | /* |
| 2298 | If blob fields were used during conversion of field values |
| 2299 | from the master table into the slave table, then we need to |
| 2300 | free the memory used temporarily to store their values before |
| 2301 | copying into the slave's table. |
| 2302 | */ |
| 2303 | if (tables_to_lock->m_conv_table) |
| 2304 | free_blobs(tables_to_lock->m_conv_table); |
| 2305 | |
| 2306 | tables_to_lock= |
| 2307 | static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global); |
| 2308 | tables_to_lock_count--; |
| 2309 | my_free(to_free); |
| 2310 | } |
| 2311 | DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0); |
| 2312 | DBUG_VOID_RETURN; |
| 2313 | } |
| 2314 | |
| 2315 | |
| 2316 | void rpl_group_info::slave_close_thread_tables(THD *thd) |
| 2317 | { |
| 2318 | DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)" ); |
| 2319 | thd->get_stmt_da()->set_overwrite_status(true); |
| 2320 | thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); |
| 2321 | thd->get_stmt_da()->set_overwrite_status(false); |
| 2322 | |
| 2323 | close_thread_tables(thd); |
| 2324 | /* |
| 2325 | - If transaction rollback was requested due to deadlock |
| 2326 | perform it and release metadata locks. |
| 2327 | - If inside a multi-statement transaction, |
| 2328 | defer the release of metadata locks until the current |
| 2329 | transaction is either committed or rolled back. This prevents |
| 2330 | other statements from modifying the table for the entire |
| 2331 | duration of this transaction. This provides commit ordering |
| 2332 | and guarantees serializability across multiple transactions. |
| 2333 | - If in autocommit mode, or outside a transactional context, |
| 2334 | automatically release metadata locks of the current statement. |
| 2335 | */ |
| 2336 | if (thd->transaction_rollback_request) |
| 2337 | { |
| 2338 | trans_rollback_implicit(thd); |
| 2339 | thd->mdl_context.release_transactional_locks(); |
| 2340 | } |
| 2341 | else if (! thd->in_multi_stmt_transaction_mode()) |
| 2342 | thd->mdl_context.release_transactional_locks(); |
| 2343 | else |
| 2344 | thd->mdl_context.release_statement_locks(); |
| 2345 | |
| 2346 | clear_tables_to_lock(); |
| 2347 | DBUG_VOID_RETURN; |
| 2348 | } |
| 2349 | |
| 2350 | |
| 2351 | |
| 2352 | static void |
| 2353 | mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco, |
| 2354 | rpl_group_info *rgi) |
| 2355 | { |
| 2356 | group_commit_orderer *tmp; |
| 2357 | uint64 count= ++e->count_committing_event_groups; |
| 2358 | /* Signal any following GCO whose wait_count has been reached now. */ |
| 2359 | tmp= gco; |
| 2360 | while ((tmp= tmp->next_gco)) |
| 2361 | { |
| 2362 | uint64 wait_count= tmp->wait_count; |
| 2363 | if (wait_count > count) |
| 2364 | break; |
| 2365 | mysql_cond_broadcast(&tmp->COND_group_commit_orderer); |
| 2366 | } |
| 2367 | } |
| 2368 | |
| 2369 | |
| 2370 | void |
| 2371 | rpl_group_info::mark_start_commit_no_lock() |
| 2372 | { |
| 2373 | if (did_mark_start_commit) |
| 2374 | return; |
| 2375 | did_mark_start_commit= true; |
| 2376 | mark_start_commit_inner(parallel_entry, gco, this); |
| 2377 | } |
| 2378 | |
| 2379 | |
| 2380 | void |
| 2381 | rpl_group_info::mark_start_commit() |
| 2382 | { |
| 2383 | rpl_parallel_entry *e; |
| 2384 | |
| 2385 | if (did_mark_start_commit) |
| 2386 | return; |
| 2387 | did_mark_start_commit= true; |
| 2388 | |
| 2389 | e= this->parallel_entry; |
| 2390 | mysql_mutex_lock(&e->LOCK_parallel_entry); |
| 2391 | mark_start_commit_inner(e, gco, this); |
| 2392 | mysql_mutex_unlock(&e->LOCK_parallel_entry); |
| 2393 | } |
| 2394 | |
| 2395 | |
| 2396 | /* |
| 2397 | Format the current GTID as a string suitable for printing in error messages. |
| 2398 | |
| 2399 | The string is stored in a buffer inside rpl_group_info, so remains valid |
| 2400 | until next call to gtid_info() or until destruction of rpl_group_info. |
| 2401 | |
| 2402 | If no GTID is available, then NULL is returned. |
| 2403 | */ |
| 2404 | char * |
| 2405 | rpl_group_info::gtid_info() |
| 2406 | { |
| 2407 | if (!gtid_sub_id || !current_gtid.seq_no) |
| 2408 | return NULL; |
| 2409 | my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu" , |
| 2410 | current_gtid.domain_id, current_gtid.server_id, |
| 2411 | current_gtid.seq_no); |
| 2412 | return gtid_info_buf; |
| 2413 | } |
| 2414 | |
| 2415 | |
| 2416 | /* |
| 2417 | Undo the effect of a prior mark_start_commit(). |
| 2418 | |
| 2419 | This is only used for retrying a transaction in parallel replication, after |
| 2420 | we have encountered a deadlock or other temporary error. |
| 2421 | |
| 2422 | When we get such a deadlock, it means that the current group of transactions |
| 2423 | did not yet all start committing (else they would not have deadlocked). So |
| 2424 | we will not yet have woken up anything in the next group, our rgi->gco is |
| 2425 | still live, and we can simply decrement the counter (to be incremented again |
| 2426 | later, when the retry succeeds and reaches the commit step). |
| 2427 | */ |
| 2428 | void |
| 2429 | rpl_group_info::unmark_start_commit() |
| 2430 | { |
| 2431 | rpl_parallel_entry *e; |
| 2432 | |
| 2433 | if (!did_mark_start_commit) |
| 2434 | return; |
| 2435 | did_mark_start_commit= false; |
| 2436 | |
| 2437 | e= this->parallel_entry; |
| 2438 | mysql_mutex_lock(&e->LOCK_parallel_entry); |
| 2439 | --e->count_committing_event_groups; |
| 2440 | mysql_mutex_unlock(&e->LOCK_parallel_entry); |
| 2441 | } |
| 2442 | |
| 2443 | |
| 2444 | rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter) |
| 2445 | : rpl_filter(filter) |
| 2446 | { |
| 2447 | cached_charset_invalidate(); |
| 2448 | } |
| 2449 | |
| 2450 | |
| 2451 | void rpl_sql_thread_info::cached_charset_invalidate() |
| 2452 | { |
| 2453 | DBUG_ENTER("rpl_group_info::cached_charset_invalidate" ); |
| 2454 | |
| 2455 | /* Full of zeroes means uninitialized. */ |
| 2456 | bzero(cached_charset, sizeof(cached_charset)); |
| 2457 | DBUG_VOID_RETURN; |
| 2458 | } |
| 2459 | |
| 2460 | |
| 2461 | bool rpl_sql_thread_info::cached_charset_compare(char *charset) const |
| 2462 | { |
| 2463 | DBUG_ENTER("rpl_group_info::cached_charset_compare" ); |
| 2464 | |
| 2465 | if (memcmp(cached_charset, charset, sizeof(cached_charset))) |
| 2466 | { |
| 2467 | memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset)); |
| 2468 | DBUG_RETURN(1); |
| 2469 | } |
| 2470 | DBUG_RETURN(0); |
| 2471 | } |
| 2472 | |
| 2473 | |
| 2474 | /** |
| 2475 | Store the file and position where the slave's SQL thread are in the |
| 2476 | relay log. |
| 2477 | |
| 2478 | Notes: |
| 2479 | |
| 2480 | - This function should be called either from the slave SQL thread, |
| 2481 | or when the slave thread is not running. (It reads the |
| 2482 | group_{relay|master}_log_{pos|name} and delay fields in the rli |
| 2483 | object. These may only be modified by the slave SQL thread or by |
| 2484 | a client thread when the slave SQL thread is not running.) |
| 2485 | |
| 2486 | - If there is an active transaction, then we do not update the |
| 2487 | position in the relay log. This is to ensure that we re-execute |
| 2488 | statements if we die in the middle of an transaction that was |
| 2489 | rolled back. |
| 2490 | |
| 2491 | - As a transaction never spans binary logs, we don't have to handle |
| 2492 | the case where we do a relay-log-rotation in the middle of the |
| 2493 | transaction. If transactions could span several binlogs, we would |
| 2494 | have to ensure that we do not delete the relay log file where the |
| 2495 | transaction started before switching to a new relay log file. |
| 2496 | |
| 2497 | - Error can happen if writing to file fails or if flushing the file |
| 2498 | fails. |
| 2499 | |
| 2500 | @param rli The object representing the Relay_log_info. |
| 2501 | |
| 2502 | @todo Change the log file information to a binary format to avoid |
| 2503 | calling longlong2str. |
| 2504 | |
| 2505 | @return 0 on success, 1 on error. |
| 2506 | */ |
| 2507 | bool Relay_log_info::flush() |
| 2508 | { |
| 2509 | bool error=0; |
| 2510 | |
| 2511 | DBUG_ENTER("Relay_log_info::flush()" ); |
| 2512 | |
| 2513 | IO_CACHE *file = &info_file; |
| 2514 | // 2*file name, 2*long long, 2*unsigned long, 6*'\n' |
| 2515 | char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos; |
| 2516 | my_b_seek(file, 0L); |
| 2517 | pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10); |
| 2518 | *pos++='\n'; |
| 2519 | pos=strmov(pos, group_relay_log_name); |
| 2520 | *pos++='\n'; |
| 2521 | pos=longlong10_to_str(group_relay_log_pos, pos, 10); |
| 2522 | *pos++='\n'; |
| 2523 | pos=strmov(pos, group_master_log_name); |
| 2524 | *pos++='\n'; |
| 2525 | pos=longlong10_to_str(group_master_log_pos, pos, 10); |
| 2526 | *pos++='\n'; |
| 2527 | pos= longlong10_to_str(sql_delay, pos, 10); |
| 2528 | *pos++= '\n'; |
| 2529 | if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff))) |
| 2530 | error=1; |
| 2531 | if (flush_io_cache(file)) |
| 2532 | error=1; |
| 2533 | if (sync_relayloginfo_period && |
| 2534 | !error && |
| 2535 | ++sync_counter >= sync_relayloginfo_period) |
| 2536 | { |
| 2537 | if (my_sync(info_fd, MYF(MY_WME))) |
| 2538 | error=1; |
| 2539 | sync_counter= 0; |
| 2540 | } |
| 2541 | /* |
| 2542 | Flushing the relay log is done by the slave I/O thread |
| 2543 | or by the user on STOP SLAVE. |
| 2544 | */ |
| 2545 | DBUG_RETURN(error); |
| 2546 | } |
| 2547 | |
| 2548 | #endif |
| 2549 | |