| 1 | /* Copyright (C) 2013-2015 Codership Oy <info@codership.com> |
| 2 | |
| 3 | This program is free software; you can redistribute it and/or modify |
| 4 | it under the terms of the GNU General Public License as published by |
| 5 | the Free Software Foundation; version 2 of the License. |
| 6 | |
| 7 | This program is distributed in the hope that it will be useful, |
| 8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 10 | GNU General Public License for more details. |
| 11 | |
| 12 | You should have received a copy of the GNU General Public License along |
| 13 | with this program; if not, write to the Free Software Foundation, Inc., |
| 14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ |
| 15 | |
| 16 | #include "mariadb.h" |
| 17 | #include "wsrep_priv.h" |
| 18 | #include "wsrep_binlog.h" // wsrep_dump_rbr_buf() |
| 19 | #include "wsrep_xid.h" |
| 20 | |
| 21 | #include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc. |
| 22 | #include "wsrep_applier.h" |
| 23 | #include "debug_sync.h" |
| 24 | |
| 25 | /* |
| 26 | read the first event from (*buf). The size of the (*buf) is (*buf_len). |
| 27 | At the end (*buf) is shitfed to point to the following event or NULL and |
| 28 | (*buf_len) will be changed to account just being read bytes of the 1st event. |
| 29 | */ |
| 30 | |
| 31 | static Log_event* wsrep_read_log_event( |
| 32 | char **arg_buf, size_t *arg_buf_len, |
| 33 | const Format_description_log_event *description_event) |
| 34 | { |
| 35 | DBUG_ENTER("wsrep_read_log_event" ); |
| 36 | char *head= (*arg_buf); |
| 37 | |
| 38 | uint data_len = uint4korr(head + EVENT_LEN_OFFSET); |
| 39 | char *buf= (*arg_buf); |
| 40 | const char *error= 0; |
| 41 | Log_event *res= 0; |
| 42 | |
| 43 | res= Log_event::read_log_event(buf, data_len, &error, description_event, |
| 44 | true); |
| 45 | |
| 46 | if (!res) |
| 47 | { |
| 48 | DBUG_ASSERT(error != 0); |
| 49 | sql_print_error("Error in Log_event::read_log_event(): " |
| 50 | "'%s', data_len: %d, event_type: %d" , |
| 51 | error,data_len,head[EVENT_TYPE_OFFSET]); |
| 52 | } |
| 53 | (*arg_buf)+= data_len; |
| 54 | (*arg_buf_len)-= data_len; |
| 55 | DBUG_RETURN(res); |
| 56 | } |
| 57 | |
| 58 | #include "transaction.h" // trans_commit(), trans_rollback() |
| 59 | #include "rpl_rli.h" // class Relay_log_info; |
| 60 | |
| 61 | void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev) |
| 62 | { |
| 63 | if (thd->wsrep_apply_format) |
| 64 | { |
| 65 | delete (Format_description_log_event*)thd->wsrep_apply_format; |
| 66 | } |
| 67 | thd->wsrep_apply_format= ev; |
| 68 | } |
| 69 | |
| 70 | Format_description_log_event* wsrep_get_apply_format(THD* thd) |
| 71 | { |
| 72 | if (thd->wsrep_apply_format) |
| 73 | { |
| 74 | return (Format_description_log_event*) thd->wsrep_apply_format; |
| 75 | } |
| 76 | |
| 77 | DBUG_ASSERT(thd->wsrep_rgi); |
| 78 | |
| 79 | return thd->wsrep_rgi->rli->relay_log.description_event_for_exec; |
| 80 | } |
| 81 | |
| 82 | static wsrep_cb_status_t wsrep_apply_events(THD* thd, |
| 83 | const void* events_buf, |
| 84 | size_t buf_len) |
| 85 | { |
| 86 | char *buf= (char *)events_buf; |
| 87 | int rcode= 0; |
| 88 | int event= 1; |
| 89 | Log_event_type typ; |
| 90 | |
| 91 | DBUG_ENTER("wsrep_apply_events" ); |
| 92 | |
| 93 | if (thd->killed == KILL_CONNECTION && |
| 94 | thd->wsrep_conflict_state != REPLAYING) |
| 95 | { |
| 96 | WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld" , |
| 97 | (long long) wsrep_thd_trx_seqno(thd)); |
| 98 | DBUG_RETURN(WSREP_CB_FAILURE); |
| 99 | } |
| 100 | |
| 101 | mysql_mutex_lock(&thd->LOCK_thd_data); |
| 102 | thd->wsrep_query_state= QUERY_EXEC; |
| 103 | if (thd->wsrep_conflict_state!= REPLAYING) |
| 104 | thd->wsrep_conflict_state= NO_CONFLICT; |
| 105 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
| 106 | |
| 107 | if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld" , |
| 108 | (long long) wsrep_thd_trx_seqno(thd)); |
| 109 | |
| 110 | while(buf_len) |
| 111 | { |
| 112 | int exec_res; |
| 113 | Log_event* ev= wsrep_read_log_event(&buf, &buf_len, |
| 114 | wsrep_get_apply_format(thd)); |
| 115 | |
| 116 | if (!ev) |
| 117 | { |
| 118 | WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %zu" , |
| 119 | (long long)wsrep_thd_trx_seqno(thd), buf_len); |
| 120 | rcode= 1; |
| 121 | goto error; |
| 122 | } |
| 123 | |
| 124 | typ= ev->get_type_code(); |
| 125 | |
| 126 | switch (typ) { |
| 127 | case FORMAT_DESCRIPTION_EVENT: |
| 128 | wsrep_set_apply_format(thd, (Format_description_log_event*)ev); |
| 129 | continue; |
| 130 | #ifdef GTID_SUPPORT |
| 131 | case GTID_LOG_EVENT: |
| 132 | { |
| 133 | Gtid_log_event* gev= (Gtid_log_event*)ev; |
| 134 | if (gev->get_gno() == 0) |
| 135 | { |
| 136 | /* Skip GTID log event to make binlog to generate LTID on commit */ |
| 137 | delete ev; |
| 138 | continue; |
| 139 | } |
| 140 | } |
| 141 | #endif /* GTID_SUPPORT */ |
| 142 | default: |
| 143 | break; |
| 144 | } |
| 145 | |
| 146 | /* Use the original server id for logging. */ |
| 147 | thd->set_server_id(ev->server_id); |
| 148 | thd->set_time(); // time the query |
| 149 | wsrep_xid_init(&thd->transaction.xid_state.xid, |
| 150 | thd->wsrep_trx_meta.gtid.uuid, |
| 151 | thd->wsrep_trx_meta.gtid.seqno); |
| 152 | thd->lex->current_select= 0; |
| 153 | if (!ev->when) |
| 154 | { |
| 155 | my_hrtime_t hrtime= my_hrtime(); |
| 156 | ev->when= hrtime_to_my_time(hrtime); |
| 157 | ev->when_sec_part= hrtime_sec_part(hrtime); |
| 158 | } |
| 159 | |
| 160 | thd->variables.option_bits= |
| 161 | (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | |
| 162 | (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); |
| 163 | |
| 164 | ev->thd = thd; |
| 165 | exec_res = ev->apply_event(thd->wsrep_rgi); |
| 166 | DBUG_PRINT("info" , ("exec_event result: %d" , exec_res)); |
| 167 | |
| 168 | if (exec_res) |
| 169 | { |
| 170 | WSREP_WARN("RBR event %d %s apply warning: %d, %lld" , |
| 171 | event, ev->get_type_str(), exec_res, |
| 172 | (long long) wsrep_thd_trx_seqno(thd)); |
| 173 | rcode= exec_res; |
| 174 | /* stop processing for the first error */ |
| 175 | delete ev; |
| 176 | goto error; |
| 177 | } |
| 178 | event++; |
| 179 | |
| 180 | if (thd->wsrep_conflict_state!= NO_CONFLICT && |
| 181 | thd->wsrep_conflict_state!= REPLAYING) |
| 182 | WSREP_WARN("conflict state after RBR event applying: %d, %lld" , |
| 183 | thd->wsrep_query_state, (long long)wsrep_thd_trx_seqno(thd)); |
| 184 | |
| 185 | if (thd->wsrep_conflict_state == MUST_ABORT) { |
| 186 | WSREP_WARN("RBR event apply failed, rolling back: %lld" , |
| 187 | (long long) wsrep_thd_trx_seqno(thd)); |
| 188 | trans_rollback(thd); |
| 189 | thd->locked_tables_list.unlock_locked_tables(thd); |
| 190 | /* Release transactional metadata locks. */ |
| 191 | thd->mdl_context.release_transactional_locks(); |
| 192 | thd->wsrep_conflict_state= NO_CONFLICT; |
| 193 | DBUG_RETURN(WSREP_CB_FAILURE); |
| 194 | } |
| 195 | |
| 196 | delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); |
| 197 | } |
| 198 | |
| 199 | error: |
| 200 | mysql_mutex_lock(&thd->LOCK_thd_data); |
| 201 | thd->wsrep_query_state= QUERY_IDLE; |
| 202 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
| 203 | |
| 204 | assert(thd->wsrep_exec_mode== REPL_RECV); |
| 205 | |
| 206 | if (thd->killed == KILL_CONNECTION) |
| 207 | WSREP_INFO("applier aborted: %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
| 208 | |
| 209 | if (rcode) DBUG_RETURN(WSREP_CB_FAILURE); |
| 210 | DBUG_RETURN(WSREP_CB_SUCCESS); |
| 211 | } |
| 212 | |
| 213 | wsrep_cb_status_t wsrep_apply_cb(void* const ctx, |
| 214 | const void* const buf, |
| 215 | size_t const buf_len, |
| 216 | uint32_t const flags, |
| 217 | const wsrep_trx_meta_t* meta) |
| 218 | { |
| 219 | THD* const thd((THD*)ctx); |
| 220 | |
| 221 | assert(thd->wsrep_apply_toi == false); |
| 222 | |
| 223 | // Allow tests to block the applier thread using the DBUG facilities. |
| 224 | DBUG_EXECUTE_IF("sync.wsrep_apply_cb" , |
| 225 | { |
| 226 | const char act[]= |
| 227 | "now " |
| 228 | "SIGNAL sync.wsrep_apply_cb_reached " |
| 229 | "WAIT_FOR signal.wsrep_apply_cb" ; |
| 230 | DBUG_ASSERT(!debug_sync_set_action(thd, |
| 231 | STRING_WITH_LEN(act))); |
| 232 | };); |
| 233 | |
| 234 | thd->wsrep_trx_meta = *meta; |
| 235 | |
| 236 | #ifdef WSREP_PROC_INFO |
| 237 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
| 238 | "Applying write set %lld: %p, %zu" , |
| 239 | (long long)wsrep_thd_trx_seqno(thd), buf, buf_len); |
| 240 | thd_proc_info(thd, thd->wsrep_info); |
| 241 | #else |
| 242 | thd_proc_info(thd, "Applying write set" ); |
| 243 | #endif /* WSREP_PROC_INFO */ |
| 244 | |
| 245 | /* tune FK and UK checking policy */ |
| 246 | if (wsrep_slave_UK_checks == FALSE) |
| 247 | thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS; |
| 248 | else |
| 249 | thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; |
| 250 | |
| 251 | if (wsrep_slave_FK_checks == FALSE) |
| 252 | thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS; |
| 253 | else |
| 254 | thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; |
| 255 | |
| 256 | /* With galera we assume that the master has done the constraint checks */ |
| 257 | thd->variables.option_bits|= OPTION_NO_CHECK_CONSTRAINT_CHECKS; |
| 258 | |
| 259 | if (flags & WSREP_FLAG_ISOLATION) |
| 260 | { |
| 261 | thd->wsrep_apply_toi= true; |
| 262 | /* |
| 263 | Don't run in transaction mode with TOI actions. |
| 264 | */ |
| 265 | thd->variables.option_bits&= ~OPTION_BEGIN; |
| 266 | thd->server_status&= ~SERVER_STATUS_IN_TRANS; |
| 267 | } |
| 268 | wsrep_cb_status_t rcode(wsrep_apply_events(thd, buf, buf_len)); |
| 269 | |
| 270 | #ifdef WSREP_PROC_INFO |
| 271 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
| 272 | "Applied write set %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
| 273 | thd_proc_info(thd, thd->wsrep_info); |
| 274 | #else |
| 275 | thd_proc_info(thd, "Applied write set" ); |
| 276 | #endif /* WSREP_PROC_INFO */ |
| 277 | |
| 278 | if (WSREP_CB_SUCCESS != rcode) |
| 279 | { |
| 280 | wsrep_dump_rbr_buf_with_header(thd, buf, buf_len); |
| 281 | } |
| 282 | |
| 283 | if (thd->has_thd_temporary_tables()) |
| 284 | { |
| 285 | WSREP_DEBUG("Applier %lld has temporary tables. Closing them now.." , |
| 286 | thd->thread_id); |
| 287 | thd->close_temporary_tables(); |
| 288 | } |
| 289 | |
| 290 | return rcode; |
| 291 | } |
| 292 | |
| 293 | static wsrep_cb_status_t wsrep_commit(THD* const thd) |
| 294 | { |
| 295 | #ifdef WSREP_PROC_INFO |
| 296 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
| 297 | "Committing %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
| 298 | thd_proc_info(thd, thd->wsrep_info); |
| 299 | #else |
| 300 | thd_proc_info(thd, "Committing" ); |
| 301 | #endif /* WSREP_PROC_INFO */ |
| 302 | |
| 303 | wsrep_cb_status_t const rcode(trans_commit(thd) ? |
| 304 | WSREP_CB_FAILURE : WSREP_CB_SUCCESS); |
| 305 | |
| 306 | if (WSREP_CB_SUCCESS == rcode) |
| 307 | { |
| 308 | thd->wsrep_rgi->cleanup_context(thd, false); |
| 309 | #ifdef GTID_SUPPORT |
| 310 | thd->variables.gtid_next.set_automatic(); |
| 311 | #endif /* GTID_SUPPORT */ |
| 312 | if (thd->wsrep_apply_toi) |
| 313 | { |
| 314 | wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid, |
| 315 | thd->wsrep_trx_meta.gtid.seqno); |
| 316 | } |
| 317 | } |
| 318 | |
| 319 | #ifdef WSREP_PROC_INFO |
| 320 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
| 321 | "Committed %lld" , (long long) wsrep_thd_trx_seqno(thd)); |
| 322 | thd_proc_info(thd, thd->wsrep_info); |
| 323 | #else |
| 324 | thd_proc_info(thd, "Committed" ); |
| 325 | #endif /* WSREP_PROC_INFO */ |
| 326 | |
| 327 | return rcode; |
| 328 | } |
| 329 | |
| 330 | static wsrep_cb_status_t wsrep_rollback(THD* const thd) |
| 331 | { |
| 332 | #ifdef WSREP_PROC_INFO |
| 333 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
| 334 | "Rolling back %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
| 335 | thd_proc_info(thd, thd->wsrep_info); |
| 336 | #else |
| 337 | thd_proc_info(thd, "Rolling back" ); |
| 338 | #endif /* WSREP_PROC_INFO */ |
| 339 | |
| 340 | wsrep_cb_status_t const rcode(trans_rollback(thd) ? |
| 341 | WSREP_CB_FAILURE : WSREP_CB_SUCCESS); |
| 342 | |
| 343 | #ifdef WSREP_PROC_INFO |
| 344 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
| 345 | "Rolled back %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
| 346 | thd_proc_info(thd, thd->wsrep_info); |
| 347 | #else |
| 348 | thd_proc_info(thd, "Rolled back" ); |
| 349 | #endif /* WSREP_PROC_INFO */ |
| 350 | |
| 351 | return rcode; |
| 352 | } |
| 353 | |
| 354 | wsrep_cb_status_t wsrep_commit_cb(void* const ctx, |
| 355 | uint32_t const flags, |
| 356 | const wsrep_trx_meta_t* meta, |
| 357 | wsrep_bool_t* const exit, |
| 358 | bool const commit) |
| 359 | { |
| 360 | THD* const thd((THD*)ctx); |
| 361 | |
| 362 | assert(meta->gtid.seqno == wsrep_thd_trx_seqno(thd)); |
| 363 | |
| 364 | wsrep_cb_status_t rcode; |
| 365 | |
| 366 | if (commit) |
| 367 | rcode = wsrep_commit(thd); |
| 368 | else |
| 369 | rcode = wsrep_rollback(thd); |
| 370 | |
| 371 | /* Cleanup */ |
| 372 | wsrep_set_apply_format(thd, NULL); |
| 373 | thd->mdl_context.release_transactional_locks(); |
| 374 | thd->reset_query(); /* Mutex protected */ |
| 375 | free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); |
| 376 | thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; |
| 377 | |
| 378 | if (wsrep_slave_count_change < 0 && commit && WSREP_CB_SUCCESS == rcode) |
| 379 | { |
| 380 | mysql_mutex_lock(&LOCK_wsrep_slave_threads); |
| 381 | if (wsrep_slave_count_change < 0) |
| 382 | { |
| 383 | wsrep_slave_count_change++; |
| 384 | *exit = true; |
| 385 | } |
| 386 | mysql_mutex_unlock(&LOCK_wsrep_slave_threads); |
| 387 | } |
| 388 | |
| 389 | if (thd->wsrep_applier) |
| 390 | { |
| 391 | /* From trans_begin() */ |
| 392 | thd->variables.option_bits|= OPTION_BEGIN; |
| 393 | thd->server_status|= SERVER_STATUS_IN_TRANS; |
| 394 | thd->wsrep_apply_toi= false; |
| 395 | } |
| 396 | |
| 397 | return rcode; |
| 398 | } |
| 399 | |
| 400 | |
| 401 | wsrep_cb_status_t wsrep_unordered_cb(void* const ctx, |
| 402 | const void* const data, |
| 403 | size_t const size) |
| 404 | { |
| 405 | return WSREP_CB_SUCCESS; |
| 406 | } |
| 407 | |