1 | /* Copyright (c) 2006, 2017, Oracle and/or its affiliates. |
2 | Copyright (c) 2010, 2017, MariaDB Corporation |
3 | |
4 | This program is free software; you can redistribute it and/or modify |
5 | it under the terms of the GNU General Public License as published by |
6 | the Free Software Foundation; version 2 of the License. |
7 | |
8 | This program is distributed in the hope that it will be useful, |
9 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | GNU General Public License for more details. |
12 | |
13 | You should have received a copy of the GNU General Public License |
14 | along with this program; if not, write to the Free Software Foundation, |
15 | 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ |
16 | |
17 | #include "mariadb.h" |
18 | #include "sql_priv.h" |
19 | #include "unireg.h" // HAVE_* |
20 | #include "rpl_mi.h" |
21 | #include "rpl_rli.h" |
22 | #include "sql_base.h" // close_thread_tables |
23 | #include <my_dir.h> // For MY_STAT |
24 | #include "sql_repl.h" // For check_binlog_magic |
25 | #include "log_event.h" // Format_description_log_event, Log_event, |
26 | // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT, |
27 | // PREFIX_SQL_LOAD |
28 | #include "rpl_utility.h" |
29 | #include "transaction.h" |
30 | #include "sql_parse.h" // end_trans, ROLLBACK |
31 | #include "slave.h" |
32 | #include <mysql/plugin.h> |
33 | #include <mysql/service_thd_wait.h> |
34 | #include "lock.h" |
35 | #include "sql_table.h" |
36 | |
/*
  Forward declaration: the function is defined further down in this file
  and is called by Relay_log_info::init() to recompute log_space_total.
*/
static int count_relay_log_space(Relay_log_info* rli);

/**
  Current replication state (hash of last GTID executed, per replication
  domain).
*/
rpl_slave_state *rpl_global_gtid_slave_state;
/* Object used for MASTER_GTID_WAIT(). */
gtid_waiting rpl_global_gtid_waiting;

/*
  Descriptive text for the SQL thread waiting on MASTER_DELAY.
  NOTE(review): presumably used as a thread state/stage string; the users
  are outside this chunk, so confirm.
*/
const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event";
48 | |
49 | Relay_log_info::Relay_log_info(bool is_slave_recovery) |
50 | :Slave_reporting_capability("SQL" ), |
51 | replicate_same_server_id(::replicate_same_server_id), |
52 | info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), |
53 | sync_counter(0), is_relay_log_recovery(is_slave_recovery), |
54 | save_temporary_tables(0), |
55 | mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0), |
56 | cur_log_old_open_count(0), error_on_rli_init_info(false), |
57 | group_relay_log_pos(0), event_relay_log_pos(0), |
58 | group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), |
59 | last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), |
60 | abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), |
61 | gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), |
62 | slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE), |
63 | until_log_pos(0), retried_trans(0), executed_entries(0), |
64 | sql_delay(0), sql_delay_end(0), |
65 | m_flags(0) |
66 | { |
67 | DBUG_ENTER("Relay_log_info::Relay_log_info" ); |
68 | |
69 | relay_log.is_relay_log= TRUE; |
70 | relay_log_state.init(); |
71 | #ifdef HAVE_PSI_INTERFACE |
72 | relay_log.set_psi_keys(key_RELAYLOG_LOCK_index, |
73 | key_RELAYLOG_COND_relay_log_updated, |
74 | key_RELAYLOG_COND_bin_log_updated, |
75 | key_file_relaylog, |
76 | key_file_relaylog_index, |
77 | key_RELAYLOG_COND_queue_busy, |
78 | key_LOCK_relaylog_end_pos); |
79 | #endif |
80 | |
81 | group_relay_log_name[0]= event_relay_log_name[0]= |
82 | group_master_log_name[0]= 0; |
83 | until_log_name[0]= ign_master_log_name_end[0]= 0; |
84 | max_relay_log_size= global_system_variables.max_relay_log_size; |
85 | bzero((char*) &info_file, sizeof(info_file)); |
86 | bzero((char*) &cache_buf, sizeof(cache_buf)); |
87 | mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); |
88 | mysql_mutex_init(key_relay_log_info_data_lock, |
89 | &data_lock, MY_MUTEX_INIT_FAST); |
90 | mysql_mutex_init(key_relay_log_info_log_space_lock, |
91 | &log_space_lock, MY_MUTEX_INIT_FAST); |
92 | mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL); |
93 | mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); |
94 | mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); |
95 | mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); |
96 | relay_log.init_pthread_objects(); |
97 | DBUG_VOID_RETURN; |
98 | } |
99 | |
100 | |
101 | Relay_log_info::~Relay_log_info() |
102 | { |
103 | DBUG_ENTER("Relay_log_info::~Relay_log_info" ); |
104 | |
105 | reset_inuse_relaylog(); |
106 | mysql_mutex_destroy(&run_lock); |
107 | mysql_mutex_destroy(&data_lock); |
108 | mysql_mutex_destroy(&log_space_lock); |
109 | mysql_cond_destroy(&data_cond); |
110 | mysql_cond_destroy(&start_cond); |
111 | mysql_cond_destroy(&stop_cond); |
112 | mysql_cond_destroy(&log_space_cond); |
113 | relay_log.cleanup(); |
114 | DBUG_VOID_RETURN; |
115 | } |
116 | |
117 | |
118 | /** |
119 | Read the relay_log.info file. |
120 | |
121 | @param info_fname The name of the file to read from. |
122 | @retval 0 success |
123 | @retval 1 failure |
124 | */ |
125 | int Relay_log_info::init(const char* info_fname) |
126 | { |
127 | char fname[FN_REFLEN+128]; |
128 | const char* msg = 0; |
129 | int error = 0; |
130 | mysql_mutex_t *log_lock; |
131 | DBUG_ENTER("Relay_log_info::init" ); |
132 | |
133 | if (inited) // Set if this function called |
134 | DBUG_RETURN(0); |
135 | |
136 | log_lock= relay_log.get_log_lock(); |
137 | fn_format(fname, info_fname, mysql_data_home, "" , 4+32); |
138 | mysql_mutex_lock(&data_lock); |
139 | cur_log_fd = -1; |
140 | slave_skip_counter=0; |
141 | abort_pos_wait=0; |
142 | log_space_limit= relay_log_space_limit; |
143 | log_space_total= 0; |
144 | |
145 | if (unlikely(error_on_rli_init_info)) |
146 | goto err; |
147 | |
148 | char pattern[FN_REFLEN]; |
149 | (void) my_realpath(pattern, slave_load_tmpdir, 0); |
150 | if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "" , |
151 | MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS) |
152 | { |
153 | mysql_mutex_unlock(&data_lock); |
154 | sql_print_error("Unable to use slave's temporary directory %s" , |
155 | slave_load_tmpdir); |
156 | DBUG_RETURN(1); |
157 | } |
158 | unpack_filename(slave_patternload_file, pattern); |
159 | slave_patternload_file_size= strlen(slave_patternload_file); |
160 | |
161 | /* |
162 | The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE. |
163 | Note that the I/O thread flushes it to disk after writing every |
164 | event, in flush_master_info(mi, 1, ?). |
165 | */ |
166 | |
167 | { |
168 | /* Reports an error and returns, if the --relay-log's path |
169 | is a directory.*/ |
170 | if (opt_relay_logname && |
171 | opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR) |
172 | { |
173 | mysql_mutex_unlock(&data_lock); |
174 | sql_print_error("Path '%s' is a directory name, please specify \ |
175 | a file name for --relay-log option" , opt_relay_logname); |
176 | DBUG_RETURN(1); |
177 | } |
178 | |
179 | /* Reports an error and returns, if the --relay-log-index's path |
180 | is a directory.*/ |
181 | if (opt_relaylog_index_name && |
182 | opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] |
183 | == FN_LIBCHAR) |
184 | { |
185 | mysql_mutex_unlock(&data_lock); |
186 | sql_print_error("Path '%s' is a directory name, please specify \ |
187 | a file name for --relay-log-index option" , opt_relaylog_index_name); |
188 | DBUG_RETURN(1); |
189 | } |
190 | |
191 | char buf[FN_REFLEN]; |
192 | const char *ln; |
193 | static bool name_warning_sent= 0; |
194 | ln= relay_log.generate_name(opt_relay_logname, "-relay-bin" , |
195 | 1, buf); |
196 | /* We send the warning only at startup, not after every RESET SLAVE */ |
197 | if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent && |
198 | !opt_bootstrap) |
199 | { |
200 | /* |
201 | User didn't give us info to name the relay log index file. |
202 | Picking `hostname`-relay-bin.index like we do, causes replication to |
203 | fail if this slave's hostname is changed later. So, we would like to |
204 | instead require a name. But as we don't want to break many existing |
205 | setups, we only give warning, not error. |
206 | */ |
207 | sql_print_warning("Neither --relay-log nor --relay-log-index were used;" |
208 | " so replication " |
209 | "may break when this MySQL server acts as a " |
210 | "slave and has his hostname changed!! Please " |
211 | "use '--log-basename=#' or '--relay-log=%s' to avoid " |
212 | "this problem." , ln); |
213 | name_warning_sent= 1; |
214 | } |
215 | |
216 | /* For multimaster, add connection name to relay log filenames */ |
217 | char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN]; |
218 | char *buf_relaylog_index_name= opt_relaylog_index_name; |
219 | |
220 | create_logfile_name_with_suffix(buf_relay_logname, |
221 | sizeof(buf_relay_logname), |
222 | ln, 1, &mi->cmp_connection_name); |
223 | ln= buf_relay_logname; |
224 | |
225 | if (opt_relaylog_index_name) |
226 | { |
227 | buf_relaylog_index_name= buf_relaylog_index_name_buff; |
228 | create_logfile_name_with_suffix(buf_relaylog_index_name_buff, |
229 | sizeof(buf_relaylog_index_name_buff), |
230 | opt_relaylog_index_name, 0, |
231 | &mi->cmp_connection_name); |
232 | } |
233 | |
234 | /* |
235 | note, that if open() fails, we'll still have index file open |
236 | but a destructor will take care of that |
237 | */ |
238 | mysql_mutex_lock(log_lock); |
239 | if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) || |
240 | relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND, |
241 | (ulong)max_relay_log_size, 1, TRUE)) |
242 | { |
243 | mysql_mutex_unlock(log_lock); |
244 | mysql_mutex_unlock(&data_lock); |
245 | sql_print_error("Failed when trying to open logs for '%s' in Relay_log_info::init(). Error: %M" , ln, my_errno); |
246 | DBUG_RETURN(1); |
247 | } |
248 | mysql_mutex_unlock(log_lock); |
249 | } |
250 | |
251 | /* if file does not exist */ |
252 | if (access(fname,F_OK)) |
253 | { |
254 | /* |
255 | If someone removed the file from underneath our feet, just close |
256 | the old descriptor and re-create the old file |
257 | */ |
258 | if (info_fd >= 0) |
259 | mysql_file_close(info_fd, MYF(MY_WME)); |
260 | if ((info_fd= mysql_file_open(key_file_relay_log_info, |
261 | fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0) |
262 | { |
263 | sql_print_error("Failed to create a new relay log info file (" |
264 | "file '%s', errno %d)" , fname, my_errno); |
265 | msg= current_thd->get_stmt_da()->message(); |
266 | goto err; |
267 | } |
268 | if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, |
269 | MYF(MY_WME))) |
270 | { |
271 | sql_print_error("Failed to create a cache on relay log info file '%s'" , |
272 | fname); |
273 | msg= current_thd->get_stmt_da()->message(); |
274 | goto err; |
275 | } |
276 | |
277 | /* Init relay log with first entry in the relay index file */ |
278 | if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, |
279 | &msg, 0)) |
280 | { |
281 | sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)" ); |
282 | goto err; |
283 | } |
284 | group_master_log_name[0]= 0; |
285 | group_master_log_pos= 0; |
286 | } |
287 | else // file exists |
288 | { |
289 | if (info_fd >= 0) |
290 | reinit_io_cache(&info_file, READ_CACHE, 0L,0,0); |
291 | else |
292 | { |
293 | int error=0; |
294 | if ((info_fd= mysql_file_open(key_file_relay_log_info, |
295 | fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0) |
296 | { |
297 | sql_print_error("\ |
298 | Failed to open the existing relay log info file '%s' (errno %d)" , |
299 | fname, my_errno); |
300 | error= 1; |
301 | } |
302 | else if (init_io_cache(&info_file, info_fd, |
303 | IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME))) |
304 | { |
305 | sql_print_error("Failed to create a cache on relay log info file '%s'" , |
306 | fname); |
307 | error= 1; |
308 | } |
309 | if (unlikely(error)) |
310 | { |
311 | if (info_fd >= 0) |
312 | mysql_file_close(info_fd, MYF(0)); |
313 | info_fd= -1; |
314 | mysql_mutex_lock(log_lock); |
315 | relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); |
316 | mysql_mutex_unlock(log_lock); |
317 | mysql_mutex_unlock(&data_lock); |
318 | DBUG_RETURN(1); |
319 | } |
320 | } |
321 | |
322 | int relay_log_pos, master_log_pos, lines; |
323 | char *first_non_digit; |
324 | |
325 | /* |
326 | Starting from MySQL 5.6.x, relay-log.info has a new format. |
327 | Now, its first line contains the number of lines in the file. |
328 | By reading this number we can determine which version our master.info |
329 | comes from. We can't simply count the lines in the file, since |
330 | versions before 5.6.x could generate files with more lines than |
331 | needed. If first line doesn't contain a number, or if it |
332 | contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY, |
333 | then the file is treated like a file from pre-5.6.x version. |
334 | There is no ambiguity when reading an old master.info: before |
335 | 5.6.x, the first line contained the binlog's name, which is |
336 | either empty or has an extension (contains a '.'), so can't be |
337 | confused with an integer. |
338 | |
339 | So we're just reading first line and trying to figure which |
340 | version is this. |
341 | */ |
342 | |
343 | /* |
344 | The first row is temporarily stored in mi->master_log_name, if |
345 | it is line count and not binlog name (new format) it will be |
346 | overwritten by the second row later. |
347 | */ |
348 | if (init_strvar_from_file(group_relay_log_name, |
349 | sizeof(group_relay_log_name), |
350 | &info_file, "" )) |
351 | { |
352 | msg="Error reading slave log configuration" ; |
353 | goto err; |
354 | } |
355 | |
356 | lines= strtoul(group_relay_log_name, &first_non_digit, 10); |
357 | |
358 | if (group_relay_log_name[0] != '\0' && |
359 | *first_non_digit == '\0' && |
360 | lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) |
361 | { |
362 | DBUG_PRINT("info" , ("relay_log_info file is in new format." )); |
363 | /* Seems to be new format => read relay log name from next line */ |
364 | if (init_strvar_from_file(group_relay_log_name, |
365 | sizeof(group_relay_log_name), |
366 | &info_file, "" )) |
367 | { |
368 | msg="Error reading slave log configuration" ; |
369 | goto err; |
370 | } |
371 | } |
372 | else |
373 | DBUG_PRINT("info" , ("relay_log_info file is in old format." )); |
374 | |
375 | if (init_intvar_from_file(&relay_log_pos, |
376 | &info_file, BIN_LOG_HEADER_SIZE) || |
377 | init_strvar_from_file(group_master_log_name, |
378 | sizeof(group_master_log_name), |
379 | &info_file, "" ) || |
380 | init_intvar_from_file(&master_log_pos, &info_file, 0) || |
381 | (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY && |
382 | init_intvar_from_file(&sql_delay, &info_file, 0))) |
383 | { |
384 | msg="Error reading slave log configuration" ; |
385 | goto err; |
386 | } |
387 | |
388 | strmake_buf(event_relay_log_name,group_relay_log_name); |
389 | group_relay_log_pos= event_relay_log_pos= relay_log_pos; |
390 | group_master_log_pos= master_log_pos; |
391 | |
392 | if (is_relay_log_recovery && init_recovery(mi, &msg)) |
393 | goto err; |
394 | |
395 | relay_log_state.load(rpl_global_gtid_slave_state); |
396 | if (init_relay_log_pos(this, |
397 | group_relay_log_name, |
398 | group_relay_log_pos, |
399 | 0 /* no data lock*/, |
400 | &msg, 0)) |
401 | { |
402 | sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)" , |
403 | group_relay_log_name, group_relay_log_pos); |
404 | goto err; |
405 | } |
406 | } |
407 | |
408 | DBUG_PRINT("info" , ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu" , |
409 | my_b_tell(cur_log), event_relay_log_pos)); |
410 | DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE); |
411 | DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos); |
412 | |
413 | /* |
414 | Now change the cache from READ to WRITE - must do this |
415 | before Relay_log_info::flush() |
416 | */ |
417 | reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1); |
418 | if (unlikely((error= flush()))) |
419 | { |
420 | msg= "Failed to flush relay log info file" ; |
421 | goto err; |
422 | } |
423 | if (count_relay_log_space(this)) |
424 | { |
425 | msg="Error counting relay log space" ; |
426 | goto err; |
427 | } |
428 | inited= 1; |
429 | error_on_rli_init_info= false; |
430 | mysql_mutex_unlock(&data_lock); |
431 | DBUG_RETURN(0); |
432 | |
433 | err: |
434 | error_on_rli_init_info= true; |
435 | if (msg) |
436 | sql_print_error("%s" , msg); |
437 | end_io_cache(&info_file); |
438 | if (info_fd >= 0) |
439 | mysql_file_close(info_fd, MYF(0)); |
440 | info_fd= -1; |
441 | mysql_mutex_lock(log_lock); |
442 | relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); |
443 | mysql_mutex_unlock(log_lock); |
444 | mysql_mutex_unlock(&data_lock); |
445 | DBUG_RETURN(1); |
446 | } |
447 | |
448 | |
449 | static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo) |
450 | { |
451 | MY_STAT s; |
452 | DBUG_ENTER("add_relay_log" ); |
453 | if (!mysql_file_stat(key_file_relaylog, |
454 | linfo->log_file_name, &s, MYF(0))) |
455 | { |
456 | sql_print_error("log %s listed in the index, but failed to stat" , |
457 | linfo->log_file_name); |
458 | DBUG_RETURN(1); |
459 | } |
460 | rli->log_space_total += s.st_size; |
461 | DBUG_PRINT("info" ,("log_space_total: %llu" , rli->log_space_total)); |
462 | DBUG_RETURN(0); |
463 | } |
464 | |
465 | |
466 | static int count_relay_log_space(Relay_log_info* rli) |
467 | { |
468 | LOG_INFO linfo; |
469 | DBUG_ENTER("count_relay_log_space" ); |
470 | rli->log_space_total= 0; |
471 | if (rli->relay_log.find_log_pos(&linfo, NullS, 1)) |
472 | { |
473 | sql_print_error("Could not find first log while counting relay log space" ); |
474 | DBUG_RETURN(1); |
475 | } |
476 | do |
477 | { |
478 | if (add_relay_log(rli,&linfo)) |
479 | DBUG_RETURN(1); |
480 | } while (!rli->relay_log.find_next_log(&linfo, 1)); |
481 | /* |
482 | As we have counted everything, including what may have written in a |
483 | preceding write, we must reset bytes_written, or we may count some space |
484 | twice. |
485 | */ |
486 | rli->relay_log.reset_bytes_written(); |
487 | DBUG_RETURN(0); |
488 | } |
489 | |
490 | |
491 | /* |
492 | Reset UNTIL condition for Relay_log_info |
493 | |
494 | SYNOPSYS |
495 | clear_until_condition() |
496 | rli - Relay_log_info structure where UNTIL condition should be reset |
497 | */ |
498 | |
499 | void Relay_log_info::clear_until_condition() |
500 | { |
501 | DBUG_ENTER("clear_until_condition" ); |
502 | |
503 | until_condition= Relay_log_info::UNTIL_NONE; |
504 | until_log_name[0]= 0; |
505 | until_log_pos= 0; |
506 | DBUG_VOID_RETURN; |
507 | } |
508 | |
509 | |
510 | /* |
511 | Read the correct format description event for starting to replicate from |
512 | a given position in a relay log file. |
513 | */ |
514 | Format_description_log_event * |
515 | read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, |
516 | const char **errmsg) |
517 | { |
518 | Log_event *ev; |
519 | Format_description_log_event *fdev; |
520 | bool found= false; |
521 | |
522 | /* |
523 | By default the relay log is in binlog format 3 (4.0). |
524 | Even if format is 4, this will work enough to read the first event |
525 | (Format_desc) (remember that format 4 is just lenghtened compared to format |
526 | 3; format 3 is a prefix of format 4). |
527 | */ |
528 | fdev= new Format_description_log_event(3); |
529 | |
530 | while (!found) |
531 | { |
532 | Log_event_type typ; |
533 | |
534 | /* |
535 | Read the possible Format_description_log_event; if position |
536 | was 4, no need, it will be read naturally. |
537 | */ |
538 | DBUG_PRINT("info" ,("looking for a Format_description_log_event" )); |
539 | |
540 | if (my_b_tell(cur_log) >= start_pos) |
541 | break; |
542 | |
543 | if (!(ev= Log_event::read_log_event(cur_log, fdev, |
544 | opt_slave_sql_verify_checksum))) |
545 | { |
546 | DBUG_PRINT("info" ,("could not read event, cur_log->error=%d" , |
547 | cur_log->error)); |
548 | if (cur_log->error) /* not EOF */ |
549 | { |
550 | *errmsg= "I/O error reading event at position 4" ; |
551 | delete fdev; |
552 | return NULL; |
553 | } |
554 | break; |
555 | } |
556 | typ= ev->get_type_code(); |
557 | if (typ == FORMAT_DESCRIPTION_EVENT) |
558 | { |
559 | Format_description_log_event *old= fdev; |
560 | DBUG_PRINT("info" ,("found Format_description_log_event" )); |
561 | fdev= (Format_description_log_event*) ev; |
562 | fdev->copy_crypto_data(old); |
563 | delete old; |
564 | |
565 | /* |
566 | As ev was returned by read_log_event, it has passed is_valid(), so |
567 | my_malloc() in ctor worked, no need to check again. |
568 | */ |
569 | /* |
570 | Ok, we found a Format_description event. But it is not sure that this |
571 | describes the whole relay log; indeed, one can have this sequence |
572 | (starting from position 4): |
573 | Format_desc (of slave) |
574 | Rotate (of master) |
575 | Format_desc (of master) |
576 | So the Format_desc which really describes the rest of the relay log |
577 | is the 3rd event (it can't be further than that, because we rotate |
578 | the relay log when we queue a Rotate event from the master). |
579 | But what describes the Rotate is the first Format_desc. |
580 | So what we do is: |
581 | go on searching for Format_description events, until you exceed the |
582 | position (argument 'pos') or until you find another event than Rotate |
583 | or Format_desc. |
584 | */ |
585 | } |
586 | else if (typ == START_ENCRYPTION_EVENT) |
587 | { |
588 | if (fdev->start_decryption((Start_encryption_log_event*) ev)) |
589 | { |
590 | *errmsg= "Unable to set up decryption of binlog." ; |
591 | delete ev; |
592 | delete fdev; |
593 | return NULL; |
594 | } |
595 | delete ev; |
596 | } |
597 | else |
598 | { |
599 | DBUG_PRINT("info" ,("found event of another type=%d" , typ)); |
600 | found= (typ != ROTATE_EVENT); |
601 | delete ev; |
602 | } |
603 | } |
604 | return fdev; |
605 | } |
606 | |
607 | |
608 | /* |
609 | Open the given relay log |
610 | |
611 | SYNOPSIS |
612 | init_relay_log_pos() |
613 | rli Relay information (will be initialized) |
614 | log Name of relay log file to read from. NULL = First log |
615 | pos Position in relay log file |
616 | need_data_lock Set to 1 if this functions should do mutex locks |
617 | errmsg Store pointer to error message here |
618 | look_for_description_event |
619 | 1 if we should look for such an event. We only need |
620 | this when the SQL thread starts and opens an existing |
621 | relay log and has to execute it (possibly from an |
622 | offset >4); then we need to read the first event of |
623 | the relay log to be able to parse the events we have |
624 | to execute. |
625 | |
626 | DESCRIPTION |
627 | - Close old open relay log files. |
628 | - If we are using the same relay log as the running IO-thread, then set |
629 | rli->cur_log to point to the same IO_CACHE entry. |
630 | - If not, open the 'log' binary file. |
631 | |
632 | TODO |
633 | - check proper initialization of group_master_log_name/group_master_log_pos |
634 | |
635 | RETURN VALUES |
636 | 0 ok |
637 | 1 error. errmsg is set to point to the error message |
638 | */ |
639 | |
640 | int init_relay_log_pos(Relay_log_info* rli,const char* log, |
641 | ulonglong pos, bool need_data_lock, |
642 | const char** errmsg, |
643 | bool look_for_description_event) |
644 | { |
645 | DBUG_ENTER("init_relay_log_pos" ); |
646 | DBUG_PRINT("info" , ("pos: %lu" , (ulong) pos)); |
647 | |
648 | *errmsg=0; |
649 | mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); |
650 | |
651 | if (need_data_lock) |
652 | mysql_mutex_lock(&rli->data_lock); |
653 | |
654 | /* |
655 | Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER |
656 | is, too, and init_slave() too; these 2 functions allocate a description |
657 | event in init_relay_log_pos, which is not freed by the terminating SQL slave |
658 | thread as that thread is not started by these functions. So we have to free |
659 | the description_event here, in case, so that there is no memory leak in |
660 | running, say, CHANGE MASTER. |
661 | */ |
662 | delete rli->relay_log.description_event_for_exec; |
663 | /* |
664 | By default the relay log is in binlog format 3 (4.0). |
665 | Even if format is 4, this will work enough to read the first event |
666 | (Format_desc) (remember that format 4 is just lenghtened compared to format |
667 | 3; format 3 is a prefix of format 4). |
668 | */ |
669 | rli->relay_log.description_event_for_exec= new |
670 | Format_description_log_event(3); |
671 | |
672 | mysql_mutex_lock(log_lock); |
673 | |
674 | /* Close log file and free buffers if it's already open */ |
675 | if (rli->cur_log_fd >= 0) |
676 | { |
677 | end_io_cache(&rli->cache_buf); |
678 | mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); |
679 | rli->cur_log_fd = -1; |
680 | } |
681 | |
682 | rli->group_relay_log_pos = rli->event_relay_log_pos = pos; |
683 | rli->clear_flag(Relay_log_info::IN_STMT); |
684 | rli->clear_flag(Relay_log_info::IN_TRANSACTION); |
685 | |
686 | /* |
687 | Test to see if the previous run was with the skip of purging |
688 | If yes, we do not purge when we restart |
689 | */ |
690 | if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1)) |
691 | { |
692 | *errmsg="Could not find first log during relay log initialization" ; |
693 | goto err; |
694 | } |
695 | |
696 | if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1)) |
697 | { |
698 | *errmsg="Could not find target log during relay log initialization" ; |
699 | goto err; |
700 | } |
701 | strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name); |
702 | strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name); |
703 | if (rli->relay_log.is_active(rli->linfo.log_file_name)) |
704 | { |
705 | /* |
706 | The IO thread is using this log file. |
707 | In this case, we will use the same IO_CACHE pointer to |
708 | read data as the IO thread is using to write data. |
709 | */ |
710 | my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0); |
711 | if (check_binlog_magic(rli->cur_log,errmsg)) |
712 | goto err; |
713 | rli->cur_log_old_open_count=rli->relay_log.get_open_count(); |
714 | } |
715 | else |
716 | { |
717 | /* |
718 | Open the relay log and set rli->cur_log to point at this one |
719 | */ |
720 | if ((rli->cur_log_fd=open_binlog(&rli->cache_buf, |
721 | rli->linfo.log_file_name,errmsg)) < 0) |
722 | goto err; |
723 | rli->cur_log = &rli->cache_buf; |
724 | } |
725 | /* |
726 | In all cases, check_binlog_magic() has been called so we're at offset 4 for |
727 | sure. |
728 | */ |
729 | if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */ |
730 | { |
731 | if (look_for_description_event) |
732 | { |
733 | Format_description_log_event *fdev; |
734 | if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg))) |
735 | goto err; |
736 | delete rli->relay_log.description_event_for_exec; |
737 | rli->relay_log.description_event_for_exec= fdev; |
738 | } |
739 | my_b_seek(rli->cur_log,(off_t)pos); |
740 | DBUG_PRINT("info" , ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu" , |
741 | my_b_tell(rli->cur_log), rli->event_relay_log_pos)); |
742 | |
743 | } |
744 | |
745 | err: |
746 | /* |
747 | If we don't purge, we can't honour relay_log_space_limit ; |
748 | silently discard it |
749 | */ |
750 | if (!relay_log_purge) |
751 | rli->log_space_limit= 0; |
752 | mysql_cond_broadcast(&rli->data_cond); |
753 | |
754 | mysql_mutex_unlock(log_lock); |
755 | |
756 | if (need_data_lock) |
757 | mysql_mutex_unlock(&rli->data_lock); |
758 | if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg) |
759 | *errmsg= "Invalid Format_description log event; could be out of memory" ; |
760 | |
761 | DBUG_PRINT("info" , ("Returning %d from init_relay_log_pos" , (*errmsg)?1:0)); |
762 | |
763 | DBUG_RETURN ((*errmsg) ? 1 : 0); |
764 | } |
765 | |
766 | |
767 | /* |
768 | Waits until the SQL thread reaches (has executed up to) the |
769 | log/position or timed out. |
770 | |
771 | SYNOPSIS |
772 | wait_for_pos() |
773 | thd client thread that sent SELECT MASTER_POS_WAIT |
774 | log_name log name to wait for |
775 | log_pos position to wait for |
776 | timeout timeout in seconds before giving up waiting |
777 | |
778 | NOTES |
779 | timeout is longlong whereas it should be ulong ; but this is |
780 | to catch if the user submitted a negative timeout. |
781 | |
782 | RETURN VALUES |
783 | -2 improper arguments (log_pos<0) |
784 | or slave not running, or master info changed |
785 | during the function's execution, |
786 | or client thread killed. -2 is translated to NULL by caller |
787 | -1 timed out |
788 | >=0 number of log events the function had to wait |
789 | before reaching the desired log/position |
790 | */ |
791 | |
792 | int Relay_log_info::wait_for_pos(THD* thd, String* log_name, |
793 | longlong log_pos, |
794 | longlong timeout) |
795 | { |
796 | int event_count = 0; |
797 | ulong init_abort_pos_wait; |
798 | int error=0; |
799 | struct timespec abstime; // for timeout checking |
800 | PSI_stage_info old_stage; |
801 | DBUG_ENTER("Relay_log_info::wait_for_pos" ); |
802 | |
803 | if (!inited) |
804 | DBUG_RETURN(-2); |
805 | |
806 | DBUG_PRINT("enter" ,("log_name: '%s' log_pos: %lu timeout: %lu" , |
807 | log_name->c_ptr(), (ulong) log_pos, (ulong) timeout)); |
808 | |
809 | set_timespec(abstime,timeout); |
810 | mysql_mutex_lock(&data_lock); |
811 | thd->ENTER_COND(&data_cond, &data_lock, |
812 | &stage_waiting_for_the_slave_thread_to_advance_position, |
813 | &old_stage); |
814 | /* |
815 | This function will abort when it notices that some CHANGE MASTER or |
816 | RESET MASTER has changed the master info. |
817 | To catch this, these commands modify abort_pos_wait ; We just monitor |
818 | abort_pos_wait and see if it has changed. |
819 | Why do we have this mechanism instead of simply monitoring slave_running |
820 | in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that |
821 | the SQL thread be stopped? |
822 | This is becasue if someones does: |
823 | STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE; |
824 | the change may happen very quickly and we may not notice that |
825 | slave_running briefly switches between 1/0/1. |
826 | */ |
827 | init_abort_pos_wait= abort_pos_wait; |
828 | |
829 | /* |
830 | We'll need to |
831 | handle all possible log names comparisons (e.g. 999 vs 1000). |
832 | We use ulong for string->number conversion ; this is no |
833 | stronger limitation than in find_uniq_filename in sql/log.cc |
834 | */ |
835 | ulong log_name_extension; |
836 | char log_name_tmp[FN_REFLEN]; //make a char[] from String |
837 | |
838 | strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1)); |
839 | |
840 | char *p= fn_ext(log_name_tmp); |
841 | char *p_end; |
842 | if (!*p || log_pos<0) |
843 | { |
844 | error= -2; //means improper arguments |
845 | goto err; |
846 | } |
847 | // Convert 0-3 to 4 |
848 | log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE); |
849 | /* p points to '.' */ |
850 | log_name_extension= strtoul(++p, &p_end, 10); |
851 | /* |
852 | p_end points to the first invalid character. |
853 | If it equals to p, no digits were found, error. |
854 | If it contains '\0' it means conversion went ok. |
855 | */ |
856 | if (p_end==p || *p_end) |
857 | { |
858 | error= -2; |
859 | goto err; |
860 | } |
861 | |
862 | /* The "compare and wait" main loop */ |
863 | while (!thd->killed && |
864 | init_abort_pos_wait == abort_pos_wait && |
865 | slave_running) |
866 | { |
867 | bool pos_reached; |
868 | int cmp_result= 0; |
869 | |
870 | DBUG_PRINT("info" , |
871 | ("init_abort_pos_wait: %ld abort_pos_wait: %ld" , |
872 | init_abort_pos_wait, abort_pos_wait)); |
873 | DBUG_PRINT("info" ,("group_master_log_name: '%s' pos: %lu" , |
874 | group_master_log_name, (ulong) group_master_log_pos)); |
875 | |
876 | /* |
877 | group_master_log_name can be "", if we are just after a fresh |
878 | replication start or after a CHANGE MASTER TO MASTER_HOST/PORT |
879 | (before we have executed one Rotate event from the master) or |
880 | (rare) if the user is doing a weird slave setup (see next |
881 | paragraph). If group_master_log_name is "", we assume we don't |
882 | have enough info to do the comparison yet, so we just wait until |
883 | more data. In this case master_log_pos is always 0 except if |
884 | somebody (wrongly) sets this slave to be a slave of itself |
885 | without using --replicate-same-server-id (an unsupported |
886 | configuration which does nothing), then group_master_log_pos |
887 | will grow and group_master_log_name will stay "". |
888 | */ |
889 | if (*group_master_log_name) |
890 | { |
891 | char *basename= (group_master_log_name + |
892 | dirname_length(group_master_log_name)); |
893 | /* |
894 | First compare the parts before the extension. |
895 | Find the dot in the master's log basename, |
896 | and protect against user's input error : |
897 | if the names do not match up to '.' included, return error |
898 | */ |
899 | char *q= (char*)(fn_ext(basename)+1); |
900 | if (strncmp(basename, log_name_tmp, (int)(q-basename))) |
901 | { |
902 | error= -2; |
903 | break; |
904 | } |
905 | // Now compare extensions. |
906 | char *q_end; |
907 | ulong group_master_log_name_extension= strtoul(q, &q_end, 10); |
908 | if (group_master_log_name_extension < log_name_extension) |
909 | cmp_result= -1 ; |
910 | else |
911 | cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ; |
912 | |
913 | pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) || |
914 | cmp_result > 0); |
915 | if (pos_reached || thd->killed) |
916 | break; |
917 | } |
918 | |
919 | //wait for master update, with optional timeout. |
920 | |
921 | DBUG_PRINT("info" ,("Waiting for master update" )); |
922 | /* |
923 | We are going to mysql_cond_(timed)wait(); if the SQL thread stops it |
924 | will wake us up. |
925 | */ |
926 | thd_wait_begin(thd, THD_WAIT_BINLOG); |
927 | if (timeout > 0) |
928 | { |
929 | /* |
930 | Note that mysql_cond_timedwait checks for the timeout |
931 | before for the condition ; i.e. it returns ETIMEDOUT |
932 | if the system time equals or exceeds the time specified by abstime |
933 | before the condition variable is signaled or broadcast, _or_ if |
934 | the absolute time specified by abstime has already passed at the time |
935 | of the call. |
936 | For that reason, mysql_cond_timedwait will do the "timeoutting" job |
937 | even if its condition is always immediately signaled (case of a loaded |
938 | master). |
939 | */ |
940 | error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime); |
941 | } |
942 | else |
943 | mysql_cond_wait(&data_cond, &data_lock); |
944 | thd_wait_end(thd); |
945 | DBUG_PRINT("info" ,("Got signal of master update or timed out" )); |
946 | if (error == ETIMEDOUT || error == ETIME) |
947 | { |
948 | error= -1; |
949 | break; |
950 | } |
951 | error=0; |
952 | event_count++; |
953 | DBUG_PRINT("info" ,("Testing if killed or SQL thread not running" )); |
954 | } |
955 | |
956 | err: |
957 | thd->EXIT_COND(&old_stage); |
958 | DBUG_PRINT("exit" ,("killed: %d abort: %d slave_running: %d \ |
959 | improper_arguments: %d timed_out: %d" , |
960 | thd->killed_errno(), |
961 | (int) (init_abort_pos_wait != abort_pos_wait), |
962 | (int) slave_running, |
963 | (int) (error == -2), |
964 | (int) (error == -1))); |
965 | if (thd->killed || init_abort_pos_wait != abort_pos_wait || |
966 | !slave_running) |
967 | { |
968 | error= -2; |
969 | } |
970 | DBUG_RETURN( error ? error : event_count ); |
971 | } |
972 | |
973 | |
974 | void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, |
975 | rpl_group_info *rgi, |
976 | bool skip_lock) |
977 | { |
978 | DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos" ); |
979 | |
980 | if (skip_lock) |
981 | mysql_mutex_assert_owner(&data_lock); |
982 | else |
983 | mysql_mutex_lock(&data_lock); |
984 | |
985 | rgi->inc_event_relay_log_pos(); |
986 | DBUG_PRINT("info" , ("log_pos: %lu group_master_log_pos: %lu" , |
987 | (long) log_pos, (long) group_master_log_pos)); |
988 | if (rgi->is_parallel_exec) |
989 | { |
990 | /* In case of parallel replication, do not update the position backwards. */ |
991 | int cmp= strcmp(group_relay_log_name, rgi->event_relay_log_name); |
992 | if (cmp < 0) |
993 | { |
994 | group_relay_log_pos= rgi->future_event_relay_log_pos; |
995 | strmake_buf(group_relay_log_name, rgi->event_relay_log_name); |
996 | notify_group_relay_log_name_update(); |
997 | } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos) |
998 | group_relay_log_pos= rgi->future_event_relay_log_pos; |
999 | |
1000 | /* |
1001 | In the parallel case we need to update the master_log_name here, rather |
1002 | than in Rotate_log_event::do_update_pos(). |
1003 | */ |
1004 | cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name); |
1005 | if (cmp <= 0) |
1006 | { |
1007 | if (cmp < 0) |
1008 | { |
1009 | strcpy(group_master_log_name, rgi->future_event_master_log_name); |
1010 | group_master_log_pos= log_pos; |
1011 | } |
1012 | else if (group_master_log_pos < log_pos) |
1013 | group_master_log_pos= log_pos; |
1014 | } |
1015 | |
1016 | /* |
1017 | In the parallel case, we only update the Seconds_Behind_Master at the |
1018 | end of a transaction. In the non-parallel case, the value is updated as |
1019 | soon as an event is read from the relay log; however this would be too |
1020 | confusing for the user, seeing the slave reported as up-to-date when |
1021 | potentially thousands of events are still queued up for worker threads |
1022 | waiting for execution. |
1023 | */ |
1024 | if (rgi->last_master_timestamp && |
1025 | rgi->last_master_timestamp > last_master_timestamp) |
1026 | last_master_timestamp= rgi->last_master_timestamp; |
1027 | } |
1028 | else |
1029 | { |
1030 | /* Non-parallel case. */ |
1031 | group_relay_log_pos= event_relay_log_pos; |
1032 | strmake_buf(group_relay_log_name, event_relay_log_name); |
1033 | notify_group_relay_log_name_update(); |
1034 | if (log_pos) // not 3.23 binlogs (no log_pos there) and not Stop_log_event |
1035 | group_master_log_pos= log_pos; |
1036 | } |
1037 | |
1038 | /* |
1039 | If the slave does not support transactions and replicates a transaction, |
1040 | users should not trust group_master_log_pos (which they can display with |
1041 | SHOW SLAVE STATUS or read from relay-log.info), because to compute |
1042 | group_master_log_pos the slave relies on log_pos stored in the master's |
1043 | binlog, but if we are in a master's transaction these positions are always |
1044 | the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does |
1045 | not advance as it should on the non-transactional slave (it advances by |
1046 | big leaps, whereas it should advance by small leaps). |
1047 | */ |
1048 | /* |
1049 | In 4.x we used the event's len to compute the positions here. This is |
1050 | wrong if the event was 3.23/4.0 and has been converted to 5.0, because |
1051 | then the event's len is not what is was in the master's binlog, so this |
1052 | will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0 |
1053 | replication: Exec_master_log_pos is wrong). Only way to solve this is to |
1054 | have the original offset of the end of the event the relay log. This is |
1055 | what we do in 5.0: log_pos has become "end_log_pos" (because the real use |
1056 | of log_pos in 4.0 was to compute the end_log_pos; so better to store |
1057 | end_log_pos instead of begin_log_pos. |
1058 | If we had not done this fix here, the problem would also have appeared |
1059 | when the slave and master are 5.0 but with different event length (for |
1060 | example the slave is more recent than the master and features the event |
1061 | UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in |
1062 | SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this |
1063 | value which would lead to badly broken replication. |
1064 | Even the relay_log_pos will be corrupted in this case, because the len is |
1065 | the relay log is not "val". |
1066 | With the end_log_pos solution, we avoid computations involving lengthes. |
1067 | */ |
1068 | mysql_cond_broadcast(&data_cond); |
1069 | if (!skip_lock) |
1070 | mysql_mutex_unlock(&data_lock); |
1071 | DBUG_VOID_RETURN; |
1072 | } |
1073 | |
1074 | |
1075 | void Relay_log_info::close_temporary_tables() |
1076 | { |
1077 | DBUG_ENTER("Relay_log_info::close_temporary_tables" ); |
1078 | |
1079 | TMP_TABLE_SHARE *share; |
1080 | TABLE *table; |
1081 | |
1082 | if (!save_temporary_tables) |
1083 | { |
1084 | /* There are no temporary tables. */ |
1085 | DBUG_VOID_RETURN; |
1086 | } |
1087 | |
1088 | while ((share= save_temporary_tables->pop_front())) |
1089 | { |
1090 | /* |
1091 | Iterate over the list of tables for this TABLE_SHARE and close them. |
1092 | */ |
1093 | while ((table= share->all_tmp_tables.pop_front())) |
1094 | { |
1095 | DBUG_PRINT("tmptable" , ("closing table: '%s'.'%s'" , |
1096 | table->s->db.str, table->s->table_name.str)); |
1097 | |
1098 | /* Reset in_use as the table may have been created by another thd */ |
1099 | table->in_use= 0; |
1100 | /* |
1101 | Lets not free TABLE_SHARE here as there could be multiple TABLEs opened |
1102 | for the same table (TABLE_SHARE). |
1103 | */ |
1104 | closefrm(table); |
1105 | my_free(table); |
1106 | } |
1107 | |
1108 | /* |
1109 | Don't ask for disk deletion. For now, anyway they will be deleted when |
1110 | slave restarts, but it is a better intention to not delete them. |
1111 | */ |
1112 | |
1113 | free_table_share(share); |
1114 | my_free(share); |
1115 | } |
1116 | |
1117 | /* By now, there mustn't be any elements left in the list. */ |
1118 | DBUG_ASSERT(save_temporary_tables->is_empty()); |
1119 | |
1120 | my_free(save_temporary_tables); |
1121 | save_temporary_tables= NULL; |
1122 | slave_open_temp_tables= 0; |
1123 | |
1124 | DBUG_VOID_RETURN; |
1125 | } |
1126 | |
1127 | /* |
1128 | purge_relay_logs() |
1129 | |
1130 | @param rli Relay log information |
1131 | @param thd thread id. May be zero during startup |
1132 | |
1133 | NOTES |
1134 | Assumes to have a run lock on rli and that no slave thread are running. |
1135 | */ |
1136 | |
1137 | int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, |
1138 | const char** errmsg) |
1139 | { |
1140 | int error=0; |
1141 | const char *ln; |
1142 | char name_buf[FN_REFLEN]; |
1143 | DBUG_ENTER("purge_relay_logs" ); |
1144 | |
1145 | /* |
1146 | Even if rli->inited==0, we still try to empty rli->master_log_* variables. |
1147 | Indeed, rli->inited==0 does not imply that they already are empty. |
1148 | It could be that slave's info initialization partly succeeded : |
1149 | for example if relay-log.info existed but *relay-bin*.* |
1150 | have been manually removed, Relay_log_info::init() reads the old |
1151 | relay-log.info and fills rli->master_log_*, then Relay_log_info::init() |
1152 | checks for the existence of the relay log, this fails and |
1153 | Relay_log_info::init() leaves rli->inited to 0. |
1154 | In that pathological case, rli->master_log_pos* will be properly reinited |
1155 | at the next START SLAVE (as RESET SLAVE or CHANGE |
1156 | MASTER, the callers of purge_relay_logs, will delete bogus *.info files |
1157 | or replace them with correct files), however if the user does SHOW SLAVE |
1158 | STATUS before START SLAVE, he will see old, confusing rli->master_log_*. |
1159 | In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS |
1160 | to display fine in any case. |
1161 | */ |
1162 | |
1163 | rli->group_master_log_name[0]= 0; |
1164 | rli->group_master_log_pos= 0; |
1165 | |
1166 | if (!rli->inited) |
1167 | { |
1168 | DBUG_PRINT("info" , ("rli->inited == 0" )); |
1169 | if (rli->error_on_rli_init_info) |
1170 | { |
1171 | ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin" , |
1172 | 1, name_buf); |
1173 | |
1174 | if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE)) |
1175 | { |
1176 | sql_print_error("Unable to purge relay log files. Failed to open relay " |
1177 | "log index file:%s." , rli->relay_log.get_index_fname()); |
1178 | DBUG_RETURN(1); |
1179 | } |
1180 | mysql_mutex_lock(rli->relay_log.get_log_lock()); |
1181 | if (rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND, |
1182 | (ulong)(rli->max_relay_log_size ? rli->max_relay_log_size : |
1183 | max_binlog_size), 1, TRUE)) |
1184 | { |
1185 | sql_print_error("Unable to purge relay log files. Failed to open relay " |
1186 | "log file:%s." , rli->relay_log.get_log_fname()); |
1187 | mysql_mutex_unlock(rli->relay_log.get_log_lock()); |
1188 | DBUG_RETURN(1); |
1189 | } |
1190 | mysql_mutex_unlock(rli->relay_log.get_log_lock()); |
1191 | } |
1192 | else |
1193 | DBUG_RETURN(0); |
1194 | } |
1195 | else |
1196 | { |
1197 | DBUG_ASSERT(rli->slave_running == 0); |
1198 | DBUG_ASSERT(rli->mi->slave_running == 0); |
1199 | } |
1200 | mysql_mutex_lock(&rli->data_lock); |
1201 | |
1202 | /* |
1203 | we close the relay log fd possibly left open by the slave SQL thread, |
1204 | to be able to delete it; the relay log fd possibly left open by the slave |
1205 | I/O thread will be closed naturally in reset_logs() by the |
1206 | close(LOG_CLOSE_TO_BE_OPENED) call |
1207 | */ |
1208 | if (rli->cur_log_fd >= 0) |
1209 | { |
1210 | end_io_cache(&rli->cache_buf); |
1211 | mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); |
1212 | rli->cur_log_fd= -1; |
1213 | } |
1214 | |
1215 | if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0, 0)) |
1216 | { |
1217 | *errmsg = "Failed during log reset" ; |
1218 | error=1; |
1219 | goto err; |
1220 | } |
1221 | rli->relay_log_state.load(rpl_global_gtid_slave_state); |
1222 | if (!just_reset) |
1223 | { |
1224 | /* Save name of used relay log file */ |
1225 | strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); |
1226 | strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); |
1227 | rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; |
1228 | rli->log_space_total= 0; |
1229 | |
1230 | if (count_relay_log_space(rli)) |
1231 | { |
1232 | *errmsg= "Error counting relay log space" ; |
1233 | error=1; |
1234 | goto err; |
1235 | } |
1236 | error= init_relay_log_pos(rli, rli->group_relay_log_name, |
1237 | rli->group_relay_log_pos, |
1238 | 0 /* do not need data lock */, errmsg, 0); |
1239 | } |
1240 | else |
1241 | { |
1242 | /* Ensure relay log names are not used */ |
1243 | rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0; |
1244 | } |
1245 | |
1246 | if (!rli->inited && rli->error_on_rli_init_info) |
1247 | { |
1248 | mysql_mutex_lock(rli->relay_log.get_log_lock()); |
1249 | rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); |
1250 | mysql_mutex_unlock(rli->relay_log.get_log_lock()); |
1251 | } |
1252 | err: |
1253 | DBUG_PRINT("info" ,("log_space_total: %llu" ,rli->log_space_total)); |
1254 | mysql_mutex_unlock(&rli->data_lock); |
1255 | DBUG_RETURN(error); |
1256 | } |
1257 | |
1258 | |
1259 | /* |
1260 | Check if condition stated in UNTIL clause of START SLAVE is reached. |
1261 | SYNOPSYS |
1262 | Relay_log_info::is_until_satisfied() |
1263 | master_beg_pos position of the beginning of to be executed event |
1264 | (not log_pos member of the event that points to the |
1265 | beginning of the following event) |
1266 | |
1267 | |
1268 | DESCRIPTION |
1269 | Checks if UNTIL condition is reached. Uses caching result of last |
1270 | comparison of current log file name and target log file name. So cached |
1271 | value should be invalidated if current log file name changes |
1272 | (see Relay_log_info::notify_... functions). |
1273 | |
1274 | This caching is needed to avoid of expensive string comparisons and |
1275 | strtol() conversions needed for log names comparison. We don't need to |
1276 | compare them each time this function is called, we only need to do this |
1277 | when current log name changes. If we have UNTIL_MASTER_POS condition we |
1278 | need to do this only after Rotate_log_event::do_apply_event() (which is |
1279 | rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS |
1280 | condition then we should invalidate cached comarison value after |
1281 | inc_group_relay_log_pos() which called for each group of events (so we |
1282 | have some benefit if we have something like queries that use |
1283 | autoincrement or if we have transactions). |
1284 | |
1285 | Should be called ONLY if until_condition != UNTIL_NONE ! |
1286 | RETURN VALUE |
1287 | true - condition met or error happened (condition seems to have |
1288 | bad log file name) |
1289 | false - condition not met |
1290 | */ |
1291 | |
1292 | bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos) |
1293 | { |
1294 | const char *log_name; |
1295 | ulonglong log_pos; |
1296 | DBUG_ENTER("Relay_log_info::is_until_satisfied" ); |
1297 | |
1298 | if (until_condition == UNTIL_MASTER_POS) |
1299 | { |
1300 | log_name= (mi->using_parallel() ? future_event_master_log_name |
1301 | : group_master_log_name); |
1302 | log_pos= master_beg_pos; |
1303 | } |
1304 | else |
1305 | { |
1306 | DBUG_ASSERT(until_condition == UNTIL_RELAY_POS); |
1307 | log_name= group_relay_log_name; |
1308 | log_pos= group_relay_log_pos; |
1309 | } |
1310 | |
1311 | DBUG_PRINT("info" , ("group_master_log_name='%s', group_master_log_pos=%llu" , |
1312 | group_master_log_name, group_master_log_pos)); |
1313 | DBUG_PRINT("info" , ("group_relay_log_name='%s', group_relay_log_pos=%llu" , |
1314 | group_relay_log_name, group_relay_log_pos)); |
1315 | DBUG_PRINT("info" , ("(%s) log_name='%s', log_pos=%llu" , |
1316 | until_condition == UNTIL_MASTER_POS ? "master" : "relay" , |
1317 | log_name, log_pos)); |
1318 | DBUG_PRINT("info" , ("(%s) until_log_name='%s', until_log_pos=%llu" , |
1319 | until_condition == UNTIL_MASTER_POS ? "master" : "relay" , |
1320 | until_log_name, until_log_pos)); |
1321 | |
1322 | if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN) |
1323 | { |
1324 | /* |
1325 | We have no cached comparison results so we should compare log names |
1326 | and cache result. |
1327 | If we are after RESET SLAVE, and the SQL slave thread has not processed |
1328 | any event yet, it could be that group_master_log_name is "". In that case, |
1329 | just wait for more events (as there is no sensible comparison to do). |
1330 | */ |
1331 | |
1332 | if (*log_name) |
1333 | { |
1334 | const char *basename= log_name + dirname_length(log_name); |
1335 | |
1336 | const char *q= (const char*)(fn_ext(basename)+1); |
1337 | if (strncmp(basename, until_log_name, (int)(q-basename)) == 0) |
1338 | { |
1339 | /* Now compare extensions. */ |
1340 | char *q_end; |
1341 | ulong log_name_extension= strtoul(q, &q_end, 10); |
1342 | if (log_name_extension < until_log_name_extension) |
1343 | until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS; |
1344 | else |
1345 | until_log_names_cmp_result= |
1346 | (log_name_extension > until_log_name_extension) ? |
1347 | UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ; |
1348 | } |
1349 | else |
1350 | { |
1351 | /* Probably error so we aborting */ |
1352 | sql_print_error("Slave SQL thread is stopped because UNTIL " |
1353 | "condition is bad." ); |
1354 | DBUG_RETURN(TRUE); |
1355 | } |
1356 | } |
1357 | else |
1358 | DBUG_RETURN(until_log_pos == 0); |
1359 | } |
1360 | |
1361 | DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && |
1362 | log_pos >= until_log_pos) || |
1363 | until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER)); |
1364 | } |
1365 | |
1366 | |
1367 | bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, |
1368 | rpl_group_info *rgi) |
1369 | { |
1370 | int error= 0; |
1371 | DBUG_ENTER("Relay_log_info::stmt_done" ); |
1372 | |
1373 | DBUG_ASSERT(!belongs_to_client()); |
1374 | DBUG_ASSERT(rgi->rli == this); |
1375 | /* |
1376 | If in a transaction, and if the slave supports transactions, just |
1377 | inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN |
1378 | (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with |
1379 | BEGIN/COMMIT, not with SET AUTOCOMMIT= . |
1380 | |
1381 | We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN |
1382 | is also used for single row transactions. |
1383 | |
1384 | CAUTION: opt_using_transactions means innodb || bdb ; suppose the |
1385 | master supports InnoDB and BDB, but the slave supports only BDB, |
1386 | problems will arise: - suppose an InnoDB table is created on the |
1387 | master, - then it will be MyISAM on the slave - but as |
1388 | opt_using_transactions is true, the slave will believe he is |
1389 | transactional with the MyISAM table. And problems will come when |
1390 | one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will |
1391 | resume at BEGIN whereas there has not been any rollback). This is |
1392 | the problem of using opt_using_transactions instead of a finer |
1393 | "does the slave support _transactional handler used on the |
1394 | master_". |
1395 | |
1396 | More generally, we'll have problems when a query mixes a |
1397 | transactional handler and MyISAM and STOP SLAVE is issued in the |
1398 | middle of the "transaction". START SLAVE will resume at BEGIN |
1399 | while the MyISAM table has already been updated. |
1400 | */ |
1401 | if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && |
1402 | opt_using_transactions) |
1403 | rgi->inc_event_relay_log_pos(); |
1404 | else |
1405 | { |
1406 | inc_group_relay_log_pos(event_master_log_pos, rgi); |
1407 | if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi)) |
1408 | { |
1409 | report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(), |
1410 | "Failed to update GTID state in %s.%s, slave state may become " |
1411 | "inconsistent: %d: %s" , |
1412 | "mysql" , rpl_gtid_slave_state_table_name.str, |
1413 | thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message()); |
1414 | /* |
1415 | At this point we are not in a transaction (for example after DDL), |
1416 | so we can not roll back. Anyway, normally updates to the slave |
1417 | state table should not fail, and if they do, at least we made the |
1418 | DBA aware of the problem in the error log. |
1419 | */ |
1420 | } |
1421 | DBUG_EXECUTE_IF("inject_crash_before_flush_rli" , DBUG_SUICIDE();); |
1422 | if (mi->using_gtid == Master_info::USE_GTID_NO) |
1423 | if (flush()) |
1424 | error= 1; |
1425 | DBUG_EXECUTE_IF("inject_crash_after_flush_rli" , DBUG_SUICIDE();); |
1426 | } |
1427 | DBUG_RETURN(error); |
1428 | } |
1429 | |
1430 | |
1431 | int |
1432 | Relay_log_info::alloc_inuse_relaylog(const char *name) |
1433 | { |
1434 | inuse_relaylog *ir; |
1435 | uint32 gtid_count; |
1436 | rpl_gtid *gtid_list; |
1437 | |
1438 | if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) |
1439 | { |
1440 | my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); |
1441 | return 1; |
1442 | } |
1443 | gtid_count= relay_log_state.count(); |
1444 | if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count, |
1445 | MYF(MY_WME)))) |
1446 | { |
1447 | my_free(ir); |
1448 | my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count); |
1449 | return 1; |
1450 | } |
1451 | if (relay_log_state.get_gtid_list(gtid_list, gtid_count)) |
1452 | { |
1453 | my_free(gtid_list); |
1454 | my_free(ir); |
1455 | DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */); |
1456 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
1457 | return 1; |
1458 | } |
1459 | ir->rli= this; |
1460 | strmake_buf(ir->name, name); |
1461 | ir->relay_log_state= gtid_list; |
1462 | ir->relay_log_state_count= gtid_count; |
1463 | |
1464 | if (!inuse_relaylog_list) |
1465 | inuse_relaylog_list= ir; |
1466 | else |
1467 | { |
1468 | last_inuse_relaylog->completed= true; |
1469 | last_inuse_relaylog->next= ir; |
1470 | } |
1471 | last_inuse_relaylog= ir; |
1472 | |
1473 | return 0; |
1474 | } |
1475 | |
1476 | |
1477 | void |
1478 | Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir) |
1479 | { |
1480 | my_free(ir->relay_log_state); |
1481 | my_free(ir); |
1482 | } |
1483 | |
1484 | |
1485 | void |
1486 | Relay_log_info::reset_inuse_relaylog() |
1487 | { |
1488 | inuse_relaylog *cur= inuse_relaylog_list; |
1489 | while (cur) |
1490 | { |
1491 | DBUG_ASSERT(cur->queued_count == cur->dequeued_count); |
1492 | inuse_relaylog *next= cur->next; |
1493 | free_inuse_relaylog(cur); |
1494 | cur= next; |
1495 | } |
1496 | inuse_relaylog_list= last_inuse_relaylog= NULL; |
1497 | } |
1498 | |
1499 | |
1500 | int |
1501 | Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count) |
1502 | { |
1503 | int res= 0; |
1504 | while (count) |
1505 | { |
1506 | if (relay_log_state.update_nolock(gtid_list, false)) |
1507 | res= 1; |
1508 | ++gtid_list; |
1509 | --count; |
1510 | } |
1511 | return res; |
1512 | } |
1513 | |
1514 | |
1515 | #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) |
/*
  One row read from a mysql.gtid_slave_pos* table. It holds the row's
  sub_id, its GTID, and the handlerton (table->s->db_type()) of the table
  the row came from. scan_one_gtid_slave_pos_table() stores copies in a
  DYNAMIC_ARRAY and looks entries up in a HASH by domain_id.
*/
struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
1517 | |
1518 | static int |
1519 | scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array, |
1520 | LEX_CSTRING *tablename, void **out_hton) |
1521 | { |
1522 | TABLE_LIST tlist; |
1523 | TABLE *UNINIT_VAR(table); |
1524 | bool table_opened= false; |
1525 | bool table_scanned= false; |
1526 | struct gtid_pos_element tmp_entry, *entry; |
1527 | int err= 0; |
1528 | |
1529 | thd->reset_for_next_command(); |
1530 | tlist.init_one_table(&MYSQL_SCHEMA_NAME, tablename, NULL, TL_READ); |
1531 | if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) |
1532 | goto end; |
1533 | table_opened= true; |
1534 | table= tlist.table; |
1535 | |
1536 | if ((err= gtid_check_rpl_slave_state_table(table))) |
1537 | goto end; |
1538 | |
1539 | bitmap_set_all(table->read_set); |
1540 | if (unlikely(err= table->file->ha_rnd_init_with_error(1))) |
1541 | goto end; |
1542 | |
1543 | table_scanned= true; |
1544 | for (;;) |
1545 | { |
1546 | uint32 domain_id, server_id; |
1547 | uint64 sub_id, seq_no; |
1548 | uchar *rec; |
1549 | |
1550 | if ((err= table->file->ha_rnd_next(table->record[0]))) |
1551 | { |
1552 | if (err == HA_ERR_END_OF_FILE) |
1553 | break; |
1554 | else |
1555 | { |
1556 | table->file->print_error(err, MYF(0)); |
1557 | goto end; |
1558 | } |
1559 | } |
1560 | domain_id= (uint32)table->field[0]->val_int(); |
1561 | sub_id= (ulonglong)table->field[1]->val_int(); |
1562 | server_id= (uint32)table->field[2]->val_int(); |
1563 | seq_no= (ulonglong)table->field[3]->val_int(); |
1564 | DBUG_PRINT("info" , ("Read slave state row: %u-%u-%lu sub_id=%lu\n" , |
1565 | (unsigned)domain_id, (unsigned)server_id, |
1566 | (ulong)seq_no, (ulong)sub_id)); |
1567 | |
1568 | tmp_entry.sub_id= sub_id; |
1569 | tmp_entry.gtid.domain_id= domain_id; |
1570 | tmp_entry.gtid.server_id= server_id; |
1571 | tmp_entry.gtid.seq_no= seq_no; |
1572 | tmp_entry.hton= table->s->db_type(); |
1573 | if ((err= insert_dynamic(array, (uchar *)&tmp_entry))) |
1574 | { |
1575 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
1576 | goto end; |
1577 | } |
1578 | |
1579 | if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0))) |
1580 | { |
1581 | entry= (struct gtid_pos_element *)rec; |
1582 | if (entry->sub_id >= sub_id) |
1583 | continue; |
1584 | entry->sub_id= sub_id; |
1585 | DBUG_ASSERT(entry->gtid.domain_id == domain_id); |
1586 | entry->gtid.server_id= server_id; |
1587 | entry->gtid.seq_no= seq_no; |
1588 | entry->hton= table->s->db_type(); |
1589 | } |
1590 | else |
1591 | { |
1592 | if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry), |
1593 | MYF(MY_WME)))) |
1594 | { |
1595 | my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry)); |
1596 | err= 1; |
1597 | goto end; |
1598 | } |
1599 | entry->sub_id= sub_id; |
1600 | entry->gtid.domain_id= domain_id; |
1601 | entry->gtid.server_id= server_id; |
1602 | entry->gtid.seq_no= seq_no; |
1603 | entry->hton= table->s->db_type(); |
1604 | if ((err= my_hash_insert(hash, (uchar *)entry))) |
1605 | { |
1606 | my_free(entry); |
1607 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
1608 | goto end; |
1609 | } |
1610 | } |
1611 | } |
1612 | err= 0; /* Clear HA_ERR_END_OF_FILE */ |
1613 | |
1614 | end: |
1615 | if (table_scanned) |
1616 | { |
1617 | table->file->ha_index_or_rnd_end(); |
1618 | ha_commit_trans(thd, FALSE); |
1619 | ha_commit_trans(thd, TRUE); |
1620 | } |
1621 | if (table_opened) |
1622 | { |
1623 | *out_hton= table->s->db_type(); |
1624 | close_thread_tables(thd); |
1625 | thd->mdl_context.release_transactional_locks(); |
1626 | } |
1627 | return err; |
1628 | } |
1629 | |
1630 | |
1631 | /* |
1632 | Look for all tables mysql.gtid_slave_pos*. Read all rows from each such |
1633 | table found into ARRAY. For each domain id, put the row with highest sub_id |
1634 | into HASH. |
1635 | */ |
1636 | static int |
1637 | scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *), |
1638 | void *cb_data) |
1639 | { |
1640 | char path[FN_REFLEN]; |
1641 | MY_DIR *dirp; |
1642 | |
1643 | thd->reset_for_next_command(); |
1644 | if (lock_schema_name(thd, MYSQL_SCHEMA_NAME.str)) |
1645 | return 1; |
1646 | |
1647 | build_table_filename(path, sizeof(path) - 1, MYSQL_SCHEMA_NAME.str, "" , "" , 0); |
1648 | if (!(dirp= my_dir(path, MYF(MY_DONT_SORT)))) |
1649 | { |
1650 | my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno); |
1651 | close_thread_tables(thd); |
1652 | thd->mdl_context.release_transactional_locks(); |
1653 | return 1; |
1654 | } |
1655 | else |
1656 | { |
1657 | size_t i; |
1658 | Dynamic_array<LEX_CSTRING*> files(dirp->number_of_files); |
1659 | Discovered_table_list tl(thd, &files); |
1660 | int err; |
1661 | |
1662 | err= ha_discover_table_names(thd, &MYSQL_SCHEMA_NAME, dirp, &tl, false); |
1663 | my_dirend(dirp); |
1664 | close_thread_tables(thd); |
1665 | thd->mdl_context.release_transactional_locks(); |
1666 | if (err) |
1667 | return err; |
1668 | |
1669 | for (i = 0; i < files.elements(); ++i) |
1670 | { |
1671 | if (strncmp(files.at(i)->str, |
1672 | rpl_gtid_slave_state_table_name.str, |
1673 | rpl_gtid_slave_state_table_name.length) == 0) |
1674 | { |
1675 | if ((err= (*cb)(thd, files.at(i), cb_data))) |
1676 | return err; |
1677 | } |
1678 | } |
1679 | } |
1680 | |
1681 | return 0; |
1682 | } |
1683 | |
1684 | |
/*
  State carried through the per-table processing while loading the GTID
  slave position tables.
*/
struct load_gtid_state_cb_data {
  /* NOTE(review): presumably the per-domain HASH of gtid_pos_element; confirm at caller. */
  HASH *hash;
  /* NOTE(review): presumably the DYNAMIC_ARRAY of all rows read; confirm at caller. */
  DYNAMIC_ARRAY *array;
  /* Singly linked list of tables found, one per storage engine; built by process_gtid_pos_table(). */
  struct rpl_slave_state::gtid_pos_table *table_list;
  /* Entry for the default mysql.gtid_slave_pos table, set by process_gtid_pos_table() when found. */
  struct rpl_slave_state::gtid_pos_table *default_entry;
};
1691 | |
1692 | static int |
1693 | process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton, |
1694 | struct load_gtid_state_cb_data *data) |
1695 | { |
1696 | struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr; |
1697 | bool is_default= |
1698 | (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0); |
1699 | |
1700 | /* |
1701 | Ignore tables with duplicate storage engine, with a warning. |
1702 | Prefer the default mysql.gtid_slave_pos over another table |
1703 | mysql.gtid_slave_posXXX with the same storage engine. |
1704 | */ |
1705 | next_ptr= &data->table_list; |
1706 | entry= data->table_list; |
1707 | while (entry) |
1708 | { |
1709 | if (entry->table_hton == hton) |
1710 | { |
1711 | static const char *warning_msg= "Ignoring redundant table mysql.%s " |
1712 | "since mysql.%s has the same storage engine" ; |
1713 | if (!is_default) |
1714 | { |
1715 | /* Ignore the redundant table. */ |
1716 | sql_print_warning(warning_msg, table_name->str, entry->table_name.str); |
1717 | return 0; |
1718 | } |
1719 | else |
1720 | { |
1721 | sql_print_warning(warning_msg, entry->table_name.str, table_name->str); |
1722 | /* Delete the redundant table, and proceed to add this one instead. */ |
1723 | *next_ptr= entry->next; |
1724 | my_free(entry); |
1725 | break; |
1726 | } |
1727 | } |
1728 | next_ptr= &entry->next; |
1729 | entry= entry->next; |
1730 | } |
1731 | |
1732 | p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, |
1733 | hton, rpl_slave_state::GTID_POS_AVAILABLE); |
1734 | if (!p) |
1735 | return 1; |
1736 | p->next= data->table_list; |
1737 | data->table_list= p; |
1738 | if (is_default) |
1739 | data->default_entry= p; |
1740 | return 0; |
1741 | } |
1742 | |
1743 | |
1744 | /* |
1745 | Put tables corresponding to @@gtid_pos_auto_engines at the end of the list, |
1746 | marked to be auto-created if needed. |
1747 | */ |
1748 | static int |
1749 | gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr) |
1750 | { |
1751 | plugin_ref *auto_engines; |
1752 | int err= 0; |
1753 | mysql_mutex_lock(&LOCK_global_system_variables); |
1754 | for (auto_engines= opt_gtid_pos_auto_plugins; |
1755 | !err && auto_engines && *auto_engines; |
1756 | ++auto_engines) |
1757 | { |
1758 | void *hton= plugin_hton(*auto_engines); |
1759 | char buf[FN_REFLEN+1]; |
1760 | LEX_CSTRING table_name; |
1761 | char *p; |
1762 | rpl_slave_state::gtid_pos_table *entry, **next_ptr; |
1763 | |
1764 | /* See if this engine is already in the list. */ |
1765 | next_ptr= list_ptr; |
1766 | entry= *list_ptr; |
1767 | while (entry) |
1768 | { |
1769 | if (entry->table_hton == hton) |
1770 | break; |
1771 | next_ptr= &entry->next; |
1772 | entry= entry->next; |
1773 | } |
1774 | if (entry) |
1775 | continue; |
1776 | |
1777 | /* Add an auto-create entry for this engine at end of list. */ |
1778 | p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN); |
1779 | p= strmake(p, "_" , FN_REFLEN - (p - buf)); |
1780 | p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf)); |
1781 | table_name.str= buf; |
1782 | table_name.length= p - buf; |
1783 | table_case_convert(const_cast<char*>(table_name.str), |
1784 | static_cast<uint>(table_name.length)); |
1785 | entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table |
1786 | (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE); |
1787 | if (!entry) |
1788 | { |
1789 | err= 1; |
1790 | break; |
1791 | } |
1792 | *next_ptr= entry; |
1793 | } |
1794 | mysql_mutex_unlock(&LOCK_global_system_variables); |
1795 | return err; |
1796 | } |
1797 | |
1798 | |
1799 | static int |
1800 | load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg) |
1801 | { |
1802 | int err; |
1803 | load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); |
1804 | void *hton; |
1805 | |
1806 | if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array, |
1807 | table_name, &hton))) |
1808 | return err; |
1809 | return process_gtid_pos_table(thd, table_name, hton, data); |
1810 | } |
1811 | |
1812 | |
1813 | int |
1814 | rpl_load_gtid_slave_state(THD *thd) |
1815 | { |
1816 | bool array_inited= false; |
1817 | struct gtid_pos_element tmp_entry, *entry; |
1818 | HASH hash; |
1819 | DYNAMIC_ARRAY array; |
1820 | int err= 0; |
1821 | uint32 i; |
1822 | load_gtid_state_cb_data cb_data; |
1823 | DBUG_ENTER("rpl_load_gtid_slave_state" ); |
1824 | |
1825 | mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1826 | bool loaded= rpl_global_gtid_slave_state->loaded; |
1827 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1828 | if (loaded) |
1829 | DBUG_RETURN(0); |
1830 | |
1831 | cb_data.table_list= NULL; |
1832 | cb_data.default_entry= NULL; |
1833 | my_hash_init(&hash, &my_charset_bin, 32, |
1834 | offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id), |
1835 | sizeof(uint32), NULL, my_free, HASH_UNIQUE); |
1836 | if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0)))) |
1837 | goto end; |
1838 | array_inited= true; |
1839 | |
1840 | cb_data.hash = &hash; |
1841 | cb_data.array = &array; |
1842 | if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data))) |
1843 | goto end; |
1844 | |
1845 | if (!cb_data.default_entry) |
1846 | { |
1847 | /* |
1848 | If the mysql.gtid_slave_pos table does not exist, but at least one other |
1849 | table is available, arbitrarily pick the first in the list to use as |
1850 | default. |
1851 | */ |
1852 | cb_data.default_entry= cb_data.table_list; |
1853 | } |
1854 | if ((err= gtid_pos_auto_create_tables(&cb_data.table_list))) |
1855 | goto end; |
1856 | |
1857 | mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1858 | if (rpl_global_gtid_slave_state->loaded) |
1859 | { |
1860 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1861 | goto end; |
1862 | } |
1863 | |
1864 | if (!cb_data.table_list) |
1865 | { |
1866 | my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql" , |
1867 | rpl_gtid_slave_state_table_name.str); |
1868 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1869 | err= 1; |
1870 | goto end; |
1871 | } |
1872 | |
1873 | for (i= 0; i < array.elements; ++i) |
1874 | { |
1875 | get_dynamic(&array, (uchar *)&tmp_entry, i); |
1876 | if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id, |
1877 | tmp_entry.gtid.server_id, |
1878 | tmp_entry.sub_id, |
1879 | tmp_entry.gtid.seq_no, |
1880 | tmp_entry.hton, |
1881 | NULL))) |
1882 | { |
1883 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1884 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
1885 | goto end; |
1886 | } |
1887 | } |
1888 | |
1889 | for (i= 0; i < hash.records; ++i) |
1890 | { |
1891 | entry= (struct gtid_pos_element *)my_hash_element(&hash, i); |
1892 | if (opt_bin_log && |
1893 | mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id, |
1894 | entry->gtid.seq_no)) |
1895 | { |
1896 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1897 | my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
1898 | goto end; |
1899 | } |
1900 | } |
1901 | |
1902 | rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, |
1903 | cb_data.default_entry); |
1904 | cb_data.table_list= NULL; |
1905 | rpl_global_gtid_slave_state->loaded= true; |
1906 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1907 | |
1908 | end: |
1909 | if (array_inited) |
1910 | delete_dynamic(&array); |
1911 | my_hash_free(&hash); |
1912 | if (cb_data.table_list) |
1913 | rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); |
1914 | DBUG_RETURN(err); |
1915 | } |
1916 | |
1917 | |
1918 | static int |
1919 | find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg) |
1920 | { |
1921 | load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); |
1922 | TABLE_LIST tlist; |
1923 | TABLE *table= NULL; |
1924 | int err; |
1925 | |
1926 | thd->reset_for_next_command(); |
1927 | tlist.init_one_table(&MYSQL_SCHEMA_NAME, table_name, NULL, TL_READ); |
1928 | if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) |
1929 | goto end; |
1930 | table= tlist.table; |
1931 | |
1932 | if ((err= gtid_check_rpl_slave_state_table(table))) |
1933 | goto end; |
1934 | err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data); |
1935 | |
1936 | end: |
1937 | if (table) |
1938 | { |
1939 | ha_commit_trans(thd, FALSE); |
1940 | ha_commit_trans(thd, TRUE); |
1941 | close_thread_tables(thd); |
1942 | thd->mdl_context.release_transactional_locks(); |
1943 | } |
1944 | |
1945 | return err; |
1946 | } |
1947 | |
1948 | |
1949 | /* |
1950 | Re-compute the list of available mysql.gtid_slave_posXXX tables. |
1951 | |
1952 | This is done at START SLAVE to pick up any newly created tables without |
1953 | requiring server restart. |
1954 | */ |
1955 | int |
1956 | find_gtid_slave_pos_tables(THD *thd) |
1957 | { |
1958 | int err= 0; |
1959 | load_gtid_state_cb_data cb_data; |
1960 | uint num_running; |
1961 | |
1962 | mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1963 | bool loaded= rpl_global_gtid_slave_state->loaded; |
1964 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1965 | if (!loaded) |
1966 | return 0; |
1967 | |
1968 | cb_data.table_list= NULL; |
1969 | cb_data.default_entry= NULL; |
1970 | if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data))) |
1971 | goto end; |
1972 | |
1973 | if (!cb_data.table_list) |
1974 | { |
1975 | my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql" , |
1976 | rpl_gtid_slave_state_table_name.str); |
1977 | err= 1; |
1978 | goto end; |
1979 | } |
1980 | if (!cb_data.default_entry) |
1981 | { |
1982 | /* |
1983 | If the mysql.gtid_slave_pos table does not exist, but at least one other |
1984 | table is available, arbitrarily pick the first in the list to use as |
1985 | default. |
1986 | */ |
1987 | cb_data.default_entry= cb_data.table_list; |
1988 | } |
1989 | if ((err= gtid_pos_auto_create_tables(&cb_data.table_list))) |
1990 | goto end; |
1991 | |
1992 | mysql_mutex_lock(&LOCK_active_mi); |
1993 | num_running= any_slave_sql_running(true); |
1994 | mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
1995 | if (num_running <= 1) |
1996 | { |
1997 | /* |
1998 | If no slave is running now, the count will be 1, since this SQL thread |
1999 | which is starting is included in the count. In this case, we can safely |
2000 | replace the list, no-one can be trying to read it without lock. |
2001 | */ |
2002 | DBUG_ASSERT(num_running == 1); |
2003 | rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, |
2004 | cb_data.default_entry); |
2005 | cb_data.table_list= NULL; |
2006 | } |
2007 | else |
2008 | { |
2009 | /* |
2010 | If there are SQL threads running, we cannot safely remove the old list. |
2011 | However we can add new entries, and warn about any tables that |
2012 | disappeared, but may still be visible to running SQL threads. |
2013 | */ |
2014 | rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr; |
2015 | |
2016 | old_entry= (rpl_slave_state::gtid_pos_table *) |
2017 | rpl_global_gtid_slave_state->gtid_pos_tables; |
2018 | while (old_entry) |
2019 | { |
2020 | new_entry= cb_data.table_list; |
2021 | while (new_entry) |
2022 | { |
2023 | if (new_entry->table_hton == old_entry->table_hton) |
2024 | break; |
2025 | new_entry= new_entry->next; |
2026 | } |
2027 | if (!new_entry) |
2028 | sql_print_warning("The table mysql.%s was removed. " |
2029 | "This change will not take full effect " |
2030 | "until all SQL threads have been restarted" , |
2031 | old_entry->table_name.str); |
2032 | old_entry= old_entry->next; |
2033 | } |
2034 | next_ptr_ptr= &cb_data.table_list; |
2035 | new_entry= cb_data.table_list; |
2036 | while (new_entry) |
2037 | { |
2038 | /* Check if we already have a table with this storage engine. */ |
2039 | old_entry= (rpl_slave_state::gtid_pos_table *) |
2040 | rpl_global_gtid_slave_state->gtid_pos_tables; |
2041 | while (old_entry) |
2042 | { |
2043 | if (new_entry->table_hton == old_entry->table_hton) |
2044 | break; |
2045 | old_entry= old_entry->next; |
2046 | } |
2047 | if (old_entry) |
2048 | { |
2049 | /* This new_entry is already available in the list. */ |
2050 | next_ptr_ptr= &new_entry->next; |
2051 | new_entry= new_entry->next; |
2052 | } |
2053 | else |
2054 | { |
2055 | /* Move this new_entry to the list. */ |
2056 | rpl_slave_state::gtid_pos_table *next= new_entry->next; |
2057 | rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry); |
2058 | *next_ptr_ptr= next; |
2059 | new_entry= next; |
2060 | } |
2061 | } |
2062 | } |
2063 | mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); |
2064 | mysql_mutex_unlock(&LOCK_active_mi); |
2065 | |
2066 | end: |
2067 | if (cb_data.table_list) |
2068 | rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); |
2069 | return err; |
2070 | } |
2071 | |
2072 | |
2073 | void |
2074 | rpl_group_info::reinit(Relay_log_info *rli) |
2075 | { |
2076 | this->rli= rli; |
2077 | tables_to_lock= NULL; |
2078 | tables_to_lock_count= 0; |
2079 | trans_retries= 0; |
2080 | last_event_start_time= 0; |
2081 | gtid_sub_id= 0; |
2082 | commit_id= 0; |
2083 | gtid_pending= false; |
2084 | worker_error= 0; |
2085 | row_stmt_start_timestamp= 0; |
2086 | long_find_row_note_printed= false; |
2087 | did_mark_start_commit= false; |
2088 | gtid_ev_flags2= 0; |
2089 | last_master_timestamp = 0; |
2090 | gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; |
2091 | speculation= SPECULATE_NO; |
2092 | commit_orderer.reinit(); |
2093 | } |
2094 | |
2095 | rpl_group_info::rpl_group_info(Relay_log_info *rli) |
2096 | : thd(0), wait_commit_sub_id(0), |
2097 | wait_commit_group_info(0), parallel_entry(0), |
2098 | deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) |
2099 | { |
2100 | reinit(rli); |
2101 | bzero(¤t_gtid, sizeof(current_gtid)); |
2102 | mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, |
2103 | MY_MUTEX_INIT_FAST); |
2104 | mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL); |
2105 | } |
2106 | |
2107 | |
2108 | rpl_group_info::~rpl_group_info() |
2109 | { |
2110 | free_annotate_event(); |
2111 | delete deferred_events; |
2112 | mysql_mutex_destroy(&sleep_lock); |
2113 | mysql_cond_destroy(&sleep_cond); |
2114 | } |
2115 | |
2116 | |
2117 | int |
2118 | event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) |
2119 | { |
2120 | uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id); |
2121 | if (!sub_id) |
2122 | { |
2123 | /* Out of memory caused hash insertion to fail. */ |
2124 | return 1; |
2125 | } |
2126 | rgi->gtid_sub_id= sub_id; |
2127 | rgi->current_gtid.domain_id= gev->domain_id; |
2128 | rgi->current_gtid.server_id= gev->server_id; |
2129 | rgi->current_gtid.seq_no= gev->seq_no; |
2130 | rgi->commit_id= gev->commit_id; |
2131 | rgi->gtid_pending= true; |
2132 | return 0; |
2133 | } |
2134 | |
2135 | |
2136 | void |
2137 | delete_or_keep_event_post_apply(rpl_group_info *rgi, |
2138 | Log_event_type typ, Log_event *ev) |
2139 | { |
2140 | /* |
2141 | ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be |
2142 | thread-safe for parallel replication. |
2143 | */ |
2144 | |
2145 | switch (typ) { |
2146 | case FORMAT_DESCRIPTION_EVENT: |
2147 | /* |
2148 | Format_description_log_event should not be deleted because it |
2149 | will be used to read info about the relay log's format; |
2150 | it will be deleted when the SQL thread does not need it, |
2151 | i.e. when this thread terminates. |
2152 | */ |
2153 | break; |
2154 | case ANNOTATE_ROWS_EVENT: |
2155 | /* |
2156 | Annotate_rows event should not be deleted because after it has |
2157 | been applied, thd->query points to the string inside this event. |
2158 | The thd->query will be used to generate new Annotate_rows event |
2159 | during applying the subsequent Rows events. |
2160 | */ |
2161 | rgi->set_annotate_event((Annotate_rows_log_event*) ev); |
2162 | break; |
2163 | case DELETE_ROWS_EVENT_V1: |
2164 | case UPDATE_ROWS_EVENT_V1: |
2165 | case WRITE_ROWS_EVENT_V1: |
2166 | case DELETE_ROWS_EVENT: |
2167 | case UPDATE_ROWS_EVENT: |
2168 | case WRITE_ROWS_EVENT: |
2169 | case WRITE_ROWS_COMPRESSED_EVENT: |
2170 | case DELETE_ROWS_COMPRESSED_EVENT: |
2171 | case UPDATE_ROWS_COMPRESSED_EVENT: |
2172 | case WRITE_ROWS_COMPRESSED_EVENT_V1: |
2173 | case UPDATE_ROWS_COMPRESSED_EVENT_V1: |
2174 | case DELETE_ROWS_COMPRESSED_EVENT_V1: |
2175 | /* |
2176 | After the last Rows event has been applied, the saved Annotate_rows |
2177 | event (if any) is not needed anymore and can be deleted. |
2178 | */ |
2179 | if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) |
2180 | rgi->free_annotate_event(); |
2181 | /* fall through */ |
2182 | default: |
2183 | DBUG_PRINT("info" , ("Deleting the event after it has been executed" )); |
2184 | if (!rgi->is_deferred_event(ev)) |
2185 | delete ev; |
2186 | break; |
2187 | } |
2188 | } |
2189 | |
2190 | |
/*
  Clean up the execution context of the current event group on THD.

  Always: clears the table map, closes the thread's tables
  (slave_close_thread_tables), clears OPTION_NO_FOREIGN_KEY_CHECKS,
  OPTION_RELAXED_UNIQUE_CHECKS and OPTION_NO_CHECK_CONSTRAINT_CHECKS, and
  resets the long_find_row note state.

  When ERROR is true, additionally:
  - rolls back the statement and the real transaction, and clears
    gtid_pending so the GTID position is not updated;
  - releases transactional MDL locks;
  - clears IN_STMT/IN_TRANSACTION if THD is the SQL driver thread;
  - releases domain ownership if gtid_ignore_duplicate_state is set.

  @param thd    Thread to clean up; must be this->thd (asserted).
  @param error  True if the event group failed.
*/
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
  DBUG_ENTER("rpl_group_info::cleanup_context" );
  DBUG_PRINT("enter" , ("error: %d" , (int) error));

  DBUG_ASSERT(this->thd == thd);
  /*
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
    may have opened tables, which we cannot be sure have been closed (because
    maybe the Rows_log_event have not been found or will not be, because slave
    SQL thread is stopping, or relay log has a missing tail etc). So we close
    all thread's tables. And so the table mappings have to be cancelled.
    2) Rows_log_event::do_apply_event() may even have started statements or
    transactions on them, which we need to rollback in case of error.
    3) If finding a Format_description_log_event after a BEGIN, we also need
    to rollback before continuing with the next events.
    4) so we need this "context cleanup" function.
  */
  if (unlikely(error))
  {
    trans_rollback_stmt(thd); // if a "statement transaction"
    /* trans_rollback() also resets OPTION_GTID_BEGIN */
    trans_rollback(thd);      // if a "real transaction"
    /*
      Now that we have rolled back the transaction, make sure we do not
      erroneously update the GTID position.
    */
    gtid_pending= false;
  }
  m_table_map.clear_tables();
  slave_close_thread_tables(thd);

  if (unlikely(error))
  {
    /* Transaction was rolled back above, so its metadata locks can go now. */
    thd->mdl_context.release_transactional_locks();

    if (thd == rli->sql_driver_thd)
    {
      /*
        Reset flags. This is needed to handle incident events and errors in
        the relay log noticed by the sql driver thread.
      */
      rli->clear_flag(Relay_log_info::IN_STMT);
      rli->clear_flag(Relay_log_info::IN_TRANSACTION);
    }

    /*
      Ensure we always release the domain for others to process, when using
      --gtid-ignore-duplicates.
    */
    if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
      rpl_global_gtid_slave_state->release_domain_owner(this);
  }

  /*
    Cleanup for the flags that have been set at do_apply_event.
  */
  thd->variables.option_bits&= ~(OPTION_NO_FOREIGN_KEY_CHECKS |
                                 OPTION_RELAXED_UNIQUE_CHECKS |
                                 OPTION_NO_CHECK_CONSTRAINT_CHECKS);

  /*
    Reset state related to long_find_row notes in the error log:
    - timestamp
    - flag that decides whether the slave prints or not
  */
  reset_row_stmt_start_timestamp();
  unset_long_find_row_note_printed();

  /* Debug-only fault injection: delay cleanup for GTID domain 100. */
  DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x" , {
      if (current_gtid.domain_id == 100)
        my_sleep(50000);
    };);

  DBUG_VOID_RETURN;
}
2267 | |
2268 | |
2269 | void rpl_group_info::clear_tables_to_lock() |
2270 | { |
2271 | DBUG_ENTER("rpl_group_info::clear_tables_to_lock()" ); |
2272 | #ifndef DBUG_OFF |
2273 | /** |
2274 | When replicating in RBR and MyISAM Merge tables are involved |
2275 | open_and_lock_tables (called in do_apply_event) appends the |
2276 | base tables to the list of tables_to_lock. Then these are |
2277 | removed from the list in close_thread_tables (which is called |
2278 | before we reach this point). |
2279 | |
2280 | This assertion just confirms that we get no surprises at this |
2281 | point. |
2282 | */ |
2283 | uint i=0; |
2284 | for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ; |
2285 | DBUG_ASSERT(i == tables_to_lock_count); |
2286 | #endif |
2287 | |
2288 | while (tables_to_lock) |
2289 | { |
2290 | uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock); |
2291 | if (tables_to_lock->m_tabledef_valid) |
2292 | { |
2293 | tables_to_lock->m_tabledef.table_def::~table_def(); |
2294 | tables_to_lock->m_tabledef_valid= FALSE; |
2295 | } |
2296 | |
2297 | /* |
2298 | If blob fields were used during conversion of field values |
2299 | from the master table into the slave table, then we need to |
2300 | free the memory used temporarily to store their values before |
2301 | copying into the slave's table. |
2302 | */ |
2303 | if (tables_to_lock->m_conv_table) |
2304 | free_blobs(tables_to_lock->m_conv_table); |
2305 | |
2306 | tables_to_lock= |
2307 | static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global); |
2308 | tables_to_lock_count--; |
2309 | my_free(to_free); |
2310 | } |
2311 | DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0); |
2312 | DBUG_VOID_RETURN; |
2313 | } |
2314 | |
2315 | |
2316 | void rpl_group_info::slave_close_thread_tables(THD *thd) |
2317 | { |
2318 | DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)" ); |
2319 | thd->get_stmt_da()->set_overwrite_status(true); |
2320 | thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); |
2321 | thd->get_stmt_da()->set_overwrite_status(false); |
2322 | |
2323 | close_thread_tables(thd); |
2324 | /* |
2325 | - If transaction rollback was requested due to deadlock |
2326 | perform it and release metadata locks. |
2327 | - If inside a multi-statement transaction, |
2328 | defer the release of metadata locks until the current |
2329 | transaction is either committed or rolled back. This prevents |
2330 | other statements from modifying the table for the entire |
2331 | duration of this transaction. This provides commit ordering |
2332 | and guarantees serializability across multiple transactions. |
2333 | - If in autocommit mode, or outside a transactional context, |
2334 | automatically release metadata locks of the current statement. |
2335 | */ |
2336 | if (thd->transaction_rollback_request) |
2337 | { |
2338 | trans_rollback_implicit(thd); |
2339 | thd->mdl_context.release_transactional_locks(); |
2340 | } |
2341 | else if (! thd->in_multi_stmt_transaction_mode()) |
2342 | thd->mdl_context.release_transactional_locks(); |
2343 | else |
2344 | thd->mdl_context.release_statement_locks(); |
2345 | |
2346 | clear_tables_to_lock(); |
2347 | DBUG_VOID_RETURN; |
2348 | } |
2349 | |
2350 | |
2351 | |
2352 | static void |
2353 | mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco, |
2354 | rpl_group_info *rgi) |
2355 | { |
2356 | group_commit_orderer *tmp; |
2357 | uint64 count= ++e->count_committing_event_groups; |
2358 | /* Signal any following GCO whose wait_count has been reached now. */ |
2359 | tmp= gco; |
2360 | while ((tmp= tmp->next_gco)) |
2361 | { |
2362 | uint64 wait_count= tmp->wait_count; |
2363 | if (wait_count > count) |
2364 | break; |
2365 | mysql_cond_broadcast(&tmp->COND_group_commit_orderer); |
2366 | } |
2367 | } |
2368 | |
2369 | |
2370 | void |
2371 | rpl_group_info::mark_start_commit_no_lock() |
2372 | { |
2373 | if (did_mark_start_commit) |
2374 | return; |
2375 | did_mark_start_commit= true; |
2376 | mark_start_commit_inner(parallel_entry, gco, this); |
2377 | } |
2378 | |
2379 | |
2380 | void |
2381 | rpl_group_info::mark_start_commit() |
2382 | { |
2383 | rpl_parallel_entry *e; |
2384 | |
2385 | if (did_mark_start_commit) |
2386 | return; |
2387 | did_mark_start_commit= true; |
2388 | |
2389 | e= this->parallel_entry; |
2390 | mysql_mutex_lock(&e->LOCK_parallel_entry); |
2391 | mark_start_commit_inner(e, gco, this); |
2392 | mysql_mutex_unlock(&e->LOCK_parallel_entry); |
2393 | } |
2394 | |
2395 | |
2396 | /* |
2397 | Format the current GTID as a string suitable for printing in error messages. |
2398 | |
2399 | The string is stored in a buffer inside rpl_group_info, so remains valid |
2400 | until next call to gtid_info() or until destruction of rpl_group_info. |
2401 | |
2402 | If no GTID is available, then NULL is returned. |
2403 | */ |
2404 | char * |
2405 | rpl_group_info::gtid_info() |
2406 | { |
2407 | if (!gtid_sub_id || !current_gtid.seq_no) |
2408 | return NULL; |
2409 | my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu" , |
2410 | current_gtid.domain_id, current_gtid.server_id, |
2411 | current_gtid.seq_no); |
2412 | return gtid_info_buf; |
2413 | } |
2414 | |
2415 | |
2416 | /* |
2417 | Undo the effect of a prior mark_start_commit(). |
2418 | |
2419 | This is only used for retrying a transaction in parallel replication, after |
2420 | we have encountered a deadlock or other temporary error. |
2421 | |
2422 | When we get such a deadlock, it means that the current group of transactions |
2423 | did not yet all start committing (else they would not have deadlocked). So |
2424 | we will not yet have woken up anything in the next group, our rgi->gco is |
2425 | still live, and we can simply decrement the counter (to be incremented again |
2426 | later, when the retry succeeds and reaches the commit step). |
2427 | */ |
2428 | void |
2429 | rpl_group_info::unmark_start_commit() |
2430 | { |
2431 | rpl_parallel_entry *e; |
2432 | |
2433 | if (!did_mark_start_commit) |
2434 | return; |
2435 | did_mark_start_commit= false; |
2436 | |
2437 | e= this->parallel_entry; |
2438 | mysql_mutex_lock(&e->LOCK_parallel_entry); |
2439 | --e->count_committing_event_groups; |
2440 | mysql_mutex_unlock(&e->LOCK_parallel_entry); |
2441 | } |
2442 | |
2443 | |
2444 | rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter) |
2445 | : rpl_filter(filter) |
2446 | { |
2447 | cached_charset_invalidate(); |
2448 | } |
2449 | |
2450 | |
2451 | void rpl_sql_thread_info::cached_charset_invalidate() |
2452 | { |
2453 | DBUG_ENTER("rpl_group_info::cached_charset_invalidate" ); |
2454 | |
2455 | /* Full of zeroes means uninitialized. */ |
2456 | bzero(cached_charset, sizeof(cached_charset)); |
2457 | DBUG_VOID_RETURN; |
2458 | } |
2459 | |
2460 | |
2461 | bool rpl_sql_thread_info::cached_charset_compare(char *charset) const |
2462 | { |
2463 | DBUG_ENTER("rpl_group_info::cached_charset_compare" ); |
2464 | |
2465 | if (memcmp(cached_charset, charset, sizeof(cached_charset))) |
2466 | { |
2467 | memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset)); |
2468 | DBUG_RETURN(1); |
2469 | } |
2470 | DBUG_RETURN(0); |
2471 | } |
2472 | |
2473 | |
2474 | /** |
2475 | Store the file and position where the slave's SQL thread are in the |
2476 | relay log. |
2477 | |
2478 | Notes: |
2479 | |
2480 | - This function should be called either from the slave SQL thread, |
2481 | or when the slave thread is not running. (It reads the |
2482 | group_{relay|master}_log_{pos|name} and delay fields in the rli |
2483 | object. These may only be modified by the slave SQL thread or by |
2484 | a client thread when the slave SQL thread is not running.) |
2485 | |
2486 | - If there is an active transaction, then we do not update the |
2487 | position in the relay log. This is to ensure that we re-execute |
2488 | statements if we die in the middle of an transaction that was |
2489 | rolled back. |
2490 | |
2491 | - As a transaction never spans binary logs, we don't have to handle |
2492 | the case where we do a relay-log-rotation in the middle of the |
2493 | transaction. If transactions could span several binlogs, we would |
2494 | have to ensure that we do not delete the relay log file where the |
2495 | transaction started before switching to a new relay log file. |
2496 | |
2497 | - Error can happen if writing to file fails or if flushing the file |
2498 | fails. |
2499 | |
2500 | @param rli The object representing the Relay_log_info. |
2501 | |
2502 | @todo Change the log file information to a binary format to avoid |
2503 | calling longlong2str. |
2504 | |
2505 | @return 0 on success, 1 on error. |
2506 | */ |
2507 | bool Relay_log_info::flush() |
2508 | { |
2509 | bool error=0; |
2510 | |
2511 | DBUG_ENTER("Relay_log_info::flush()" ); |
2512 | |
2513 | IO_CACHE *file = &info_file; |
2514 | // 2*file name, 2*long long, 2*unsigned long, 6*'\n' |
2515 | char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos; |
2516 | my_b_seek(file, 0L); |
2517 | pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10); |
2518 | *pos++='\n'; |
2519 | pos=strmov(pos, group_relay_log_name); |
2520 | *pos++='\n'; |
2521 | pos=longlong10_to_str(group_relay_log_pos, pos, 10); |
2522 | *pos++='\n'; |
2523 | pos=strmov(pos, group_master_log_name); |
2524 | *pos++='\n'; |
2525 | pos=longlong10_to_str(group_master_log_pos, pos, 10); |
2526 | *pos++='\n'; |
2527 | pos= longlong10_to_str(sql_delay, pos, 10); |
2528 | *pos++= '\n'; |
2529 | if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff))) |
2530 | error=1; |
2531 | if (flush_io_cache(file)) |
2532 | error=1; |
2533 | if (sync_relayloginfo_period && |
2534 | !error && |
2535 | ++sync_counter >= sync_relayloginfo_period) |
2536 | { |
2537 | if (my_sync(info_fd, MYF(MY_WME))) |
2538 | error=1; |
2539 | sync_counter= 0; |
2540 | } |
2541 | /* |
2542 | Flushing the relay log is done by the slave I/O thread |
2543 | or by the user on STOP SLAVE. |
2544 | */ |
2545 | DBUG_RETURN(error); |
2546 | } |
2547 | |
2548 | #endif |
2549 | |