1 | /* Copyright (C) 2013-2015 Codership Oy <info@codership.com> |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License along |
13 | with this program; if not, write to the Free Software Foundation, Inc., |
14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ |
15 | |
16 | #include "mariadb.h" |
17 | #include "wsrep_priv.h" |
18 | #include "wsrep_binlog.h" // wsrep_dump_rbr_buf() |
19 | #include "wsrep_xid.h" |
20 | |
21 | #include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc. |
22 | #include "wsrep_applier.h" |
23 | #include "debug_sync.h" |
24 | |
25 | /* |
26 | read the first event from (*buf). The size of the (*buf) is (*buf_len). |
27 | At the end (*buf) is shitfed to point to the following event or NULL and |
28 | (*buf_len) will be changed to account just being read bytes of the 1st event. |
29 | */ |
30 | |
31 | static Log_event* wsrep_read_log_event( |
32 | char **arg_buf, size_t *arg_buf_len, |
33 | const Format_description_log_event *description_event) |
34 | { |
35 | DBUG_ENTER("wsrep_read_log_event" ); |
36 | char *head= (*arg_buf); |
37 | |
38 | uint data_len = uint4korr(head + EVENT_LEN_OFFSET); |
39 | char *buf= (*arg_buf); |
40 | const char *error= 0; |
41 | Log_event *res= 0; |
42 | |
43 | res= Log_event::read_log_event(buf, data_len, &error, description_event, |
44 | true); |
45 | |
46 | if (!res) |
47 | { |
48 | DBUG_ASSERT(error != 0); |
49 | sql_print_error("Error in Log_event::read_log_event(): " |
50 | "'%s', data_len: %d, event_type: %d" , |
51 | error,data_len,head[EVENT_TYPE_OFFSET]); |
52 | } |
53 | (*arg_buf)+= data_len; |
54 | (*arg_buf_len)-= data_len; |
55 | DBUG_RETURN(res); |
56 | } |
57 | |
58 | #include "transaction.h" // trans_commit(), trans_rollback() |
59 | #include "rpl_rli.h" // class Relay_log_info; |
60 | |
61 | void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev) |
62 | { |
63 | if (thd->wsrep_apply_format) |
64 | { |
65 | delete (Format_description_log_event*)thd->wsrep_apply_format; |
66 | } |
67 | thd->wsrep_apply_format= ev; |
68 | } |
69 | |
70 | Format_description_log_event* wsrep_get_apply_format(THD* thd) |
71 | { |
72 | if (thd->wsrep_apply_format) |
73 | { |
74 | return (Format_description_log_event*) thd->wsrep_apply_format; |
75 | } |
76 | |
77 | DBUG_ASSERT(thd->wsrep_rgi); |
78 | |
79 | return thd->wsrep_rgi->rli->relay_log.description_event_for_exec; |
80 | } |
81 | |
82 | static wsrep_cb_status_t wsrep_apply_events(THD* thd, |
83 | const void* events_buf, |
84 | size_t buf_len) |
85 | { |
86 | char *buf= (char *)events_buf; |
87 | int rcode= 0; |
88 | int event= 1; |
89 | Log_event_type typ; |
90 | |
91 | DBUG_ENTER("wsrep_apply_events" ); |
92 | |
93 | if (thd->killed == KILL_CONNECTION && |
94 | thd->wsrep_conflict_state != REPLAYING) |
95 | { |
96 | WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld" , |
97 | (long long) wsrep_thd_trx_seqno(thd)); |
98 | DBUG_RETURN(WSREP_CB_FAILURE); |
99 | } |
100 | |
101 | mysql_mutex_lock(&thd->LOCK_thd_data); |
102 | thd->wsrep_query_state= QUERY_EXEC; |
103 | if (thd->wsrep_conflict_state!= REPLAYING) |
104 | thd->wsrep_conflict_state= NO_CONFLICT; |
105 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
106 | |
107 | if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld" , |
108 | (long long) wsrep_thd_trx_seqno(thd)); |
109 | |
110 | while(buf_len) |
111 | { |
112 | int exec_res; |
113 | Log_event* ev= wsrep_read_log_event(&buf, &buf_len, |
114 | wsrep_get_apply_format(thd)); |
115 | |
116 | if (!ev) |
117 | { |
118 | WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %zu" , |
119 | (long long)wsrep_thd_trx_seqno(thd), buf_len); |
120 | rcode= 1; |
121 | goto error; |
122 | } |
123 | |
124 | typ= ev->get_type_code(); |
125 | |
126 | switch (typ) { |
127 | case FORMAT_DESCRIPTION_EVENT: |
128 | wsrep_set_apply_format(thd, (Format_description_log_event*)ev); |
129 | continue; |
130 | #ifdef GTID_SUPPORT |
131 | case GTID_LOG_EVENT: |
132 | { |
133 | Gtid_log_event* gev= (Gtid_log_event*)ev; |
134 | if (gev->get_gno() == 0) |
135 | { |
136 | /* Skip GTID log event to make binlog to generate LTID on commit */ |
137 | delete ev; |
138 | continue; |
139 | } |
140 | } |
141 | #endif /* GTID_SUPPORT */ |
142 | default: |
143 | break; |
144 | } |
145 | |
146 | /* Use the original server id for logging. */ |
147 | thd->set_server_id(ev->server_id); |
148 | thd->set_time(); // time the query |
149 | wsrep_xid_init(&thd->transaction.xid_state.xid, |
150 | thd->wsrep_trx_meta.gtid.uuid, |
151 | thd->wsrep_trx_meta.gtid.seqno); |
152 | thd->lex->current_select= 0; |
153 | if (!ev->when) |
154 | { |
155 | my_hrtime_t hrtime= my_hrtime(); |
156 | ev->when= hrtime_to_my_time(hrtime); |
157 | ev->when_sec_part= hrtime_sec_part(hrtime); |
158 | } |
159 | |
160 | thd->variables.option_bits= |
161 | (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | |
162 | (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); |
163 | |
164 | ev->thd = thd; |
165 | exec_res = ev->apply_event(thd->wsrep_rgi); |
166 | DBUG_PRINT("info" , ("exec_event result: %d" , exec_res)); |
167 | |
168 | if (exec_res) |
169 | { |
170 | WSREP_WARN("RBR event %d %s apply warning: %d, %lld" , |
171 | event, ev->get_type_str(), exec_res, |
172 | (long long) wsrep_thd_trx_seqno(thd)); |
173 | rcode= exec_res; |
174 | /* stop processing for the first error */ |
175 | delete ev; |
176 | goto error; |
177 | } |
178 | event++; |
179 | |
180 | if (thd->wsrep_conflict_state!= NO_CONFLICT && |
181 | thd->wsrep_conflict_state!= REPLAYING) |
182 | WSREP_WARN("conflict state after RBR event applying: %d, %lld" , |
183 | thd->wsrep_query_state, (long long)wsrep_thd_trx_seqno(thd)); |
184 | |
185 | if (thd->wsrep_conflict_state == MUST_ABORT) { |
186 | WSREP_WARN("RBR event apply failed, rolling back: %lld" , |
187 | (long long) wsrep_thd_trx_seqno(thd)); |
188 | trans_rollback(thd); |
189 | thd->locked_tables_list.unlock_locked_tables(thd); |
190 | /* Release transactional metadata locks. */ |
191 | thd->mdl_context.release_transactional_locks(); |
192 | thd->wsrep_conflict_state= NO_CONFLICT; |
193 | DBUG_RETURN(WSREP_CB_FAILURE); |
194 | } |
195 | |
196 | delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); |
197 | } |
198 | |
199 | error: |
200 | mysql_mutex_lock(&thd->LOCK_thd_data); |
201 | thd->wsrep_query_state= QUERY_IDLE; |
202 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
203 | |
204 | assert(thd->wsrep_exec_mode== REPL_RECV); |
205 | |
206 | if (thd->killed == KILL_CONNECTION) |
207 | WSREP_INFO("applier aborted: %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
208 | |
209 | if (rcode) DBUG_RETURN(WSREP_CB_FAILURE); |
210 | DBUG_RETURN(WSREP_CB_SUCCESS); |
211 | } |
212 | |
213 | wsrep_cb_status_t wsrep_apply_cb(void* const ctx, |
214 | const void* const buf, |
215 | size_t const buf_len, |
216 | uint32_t const flags, |
217 | const wsrep_trx_meta_t* meta) |
218 | { |
219 | THD* const thd((THD*)ctx); |
220 | |
221 | assert(thd->wsrep_apply_toi == false); |
222 | |
223 | // Allow tests to block the applier thread using the DBUG facilities. |
224 | DBUG_EXECUTE_IF("sync.wsrep_apply_cb" , |
225 | { |
226 | const char act[]= |
227 | "now " |
228 | "SIGNAL sync.wsrep_apply_cb_reached " |
229 | "WAIT_FOR signal.wsrep_apply_cb" ; |
230 | DBUG_ASSERT(!debug_sync_set_action(thd, |
231 | STRING_WITH_LEN(act))); |
232 | };); |
233 | |
234 | thd->wsrep_trx_meta = *meta; |
235 | |
236 | #ifdef WSREP_PROC_INFO |
237 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
238 | "Applying write set %lld: %p, %zu" , |
239 | (long long)wsrep_thd_trx_seqno(thd), buf, buf_len); |
240 | thd_proc_info(thd, thd->wsrep_info); |
241 | #else |
242 | thd_proc_info(thd, "Applying write set" ); |
243 | #endif /* WSREP_PROC_INFO */ |
244 | |
245 | /* tune FK and UK checking policy */ |
246 | if (wsrep_slave_UK_checks == FALSE) |
247 | thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS; |
248 | else |
249 | thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; |
250 | |
251 | if (wsrep_slave_FK_checks == FALSE) |
252 | thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS; |
253 | else |
254 | thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; |
255 | |
256 | /* With galera we assume that the master has done the constraint checks */ |
257 | thd->variables.option_bits|= OPTION_NO_CHECK_CONSTRAINT_CHECKS; |
258 | |
259 | if (flags & WSREP_FLAG_ISOLATION) |
260 | { |
261 | thd->wsrep_apply_toi= true; |
262 | /* |
263 | Don't run in transaction mode with TOI actions. |
264 | */ |
265 | thd->variables.option_bits&= ~OPTION_BEGIN; |
266 | thd->server_status&= ~SERVER_STATUS_IN_TRANS; |
267 | } |
268 | wsrep_cb_status_t rcode(wsrep_apply_events(thd, buf, buf_len)); |
269 | |
270 | #ifdef WSREP_PROC_INFO |
271 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
272 | "Applied write set %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
273 | thd_proc_info(thd, thd->wsrep_info); |
274 | #else |
275 | thd_proc_info(thd, "Applied write set" ); |
276 | #endif /* WSREP_PROC_INFO */ |
277 | |
278 | if (WSREP_CB_SUCCESS != rcode) |
279 | { |
280 | wsrep_dump_rbr_buf_with_header(thd, buf, buf_len); |
281 | } |
282 | |
283 | if (thd->has_thd_temporary_tables()) |
284 | { |
285 | WSREP_DEBUG("Applier %lld has temporary tables. Closing them now.." , |
286 | thd->thread_id); |
287 | thd->close_temporary_tables(); |
288 | } |
289 | |
290 | return rcode; |
291 | } |
292 | |
293 | static wsrep_cb_status_t wsrep_commit(THD* const thd) |
294 | { |
295 | #ifdef WSREP_PROC_INFO |
296 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
297 | "Committing %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
298 | thd_proc_info(thd, thd->wsrep_info); |
299 | #else |
300 | thd_proc_info(thd, "Committing" ); |
301 | #endif /* WSREP_PROC_INFO */ |
302 | |
303 | wsrep_cb_status_t const rcode(trans_commit(thd) ? |
304 | WSREP_CB_FAILURE : WSREP_CB_SUCCESS); |
305 | |
306 | if (WSREP_CB_SUCCESS == rcode) |
307 | { |
308 | thd->wsrep_rgi->cleanup_context(thd, false); |
309 | #ifdef GTID_SUPPORT |
310 | thd->variables.gtid_next.set_automatic(); |
311 | #endif /* GTID_SUPPORT */ |
312 | if (thd->wsrep_apply_toi) |
313 | { |
314 | wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid, |
315 | thd->wsrep_trx_meta.gtid.seqno); |
316 | } |
317 | } |
318 | |
319 | #ifdef WSREP_PROC_INFO |
320 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
321 | "Committed %lld" , (long long) wsrep_thd_trx_seqno(thd)); |
322 | thd_proc_info(thd, thd->wsrep_info); |
323 | #else |
324 | thd_proc_info(thd, "Committed" ); |
325 | #endif /* WSREP_PROC_INFO */ |
326 | |
327 | return rcode; |
328 | } |
329 | |
330 | static wsrep_cb_status_t wsrep_rollback(THD* const thd) |
331 | { |
332 | #ifdef WSREP_PROC_INFO |
333 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
334 | "Rolling back %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
335 | thd_proc_info(thd, thd->wsrep_info); |
336 | #else |
337 | thd_proc_info(thd, "Rolling back" ); |
338 | #endif /* WSREP_PROC_INFO */ |
339 | |
340 | wsrep_cb_status_t const rcode(trans_rollback(thd) ? |
341 | WSREP_CB_FAILURE : WSREP_CB_SUCCESS); |
342 | |
343 | #ifdef WSREP_PROC_INFO |
344 | snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, |
345 | "Rolled back %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
346 | thd_proc_info(thd, thd->wsrep_info); |
347 | #else |
348 | thd_proc_info(thd, "Rolled back" ); |
349 | #endif /* WSREP_PROC_INFO */ |
350 | |
351 | return rcode; |
352 | } |
353 | |
354 | wsrep_cb_status_t wsrep_commit_cb(void* const ctx, |
355 | uint32_t const flags, |
356 | const wsrep_trx_meta_t* meta, |
357 | wsrep_bool_t* const exit, |
358 | bool const commit) |
359 | { |
360 | THD* const thd((THD*)ctx); |
361 | |
362 | assert(meta->gtid.seqno == wsrep_thd_trx_seqno(thd)); |
363 | |
364 | wsrep_cb_status_t rcode; |
365 | |
366 | if (commit) |
367 | rcode = wsrep_commit(thd); |
368 | else |
369 | rcode = wsrep_rollback(thd); |
370 | |
371 | /* Cleanup */ |
372 | wsrep_set_apply_format(thd, NULL); |
373 | thd->mdl_context.release_transactional_locks(); |
374 | thd->reset_query(); /* Mutex protected */ |
375 | free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); |
376 | thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; |
377 | |
378 | if (wsrep_slave_count_change < 0 && commit && WSREP_CB_SUCCESS == rcode) |
379 | { |
380 | mysql_mutex_lock(&LOCK_wsrep_slave_threads); |
381 | if (wsrep_slave_count_change < 0) |
382 | { |
383 | wsrep_slave_count_change++; |
384 | *exit = true; |
385 | } |
386 | mysql_mutex_unlock(&LOCK_wsrep_slave_threads); |
387 | } |
388 | |
389 | if (thd->wsrep_applier) |
390 | { |
391 | /* From trans_begin() */ |
392 | thd->variables.option_bits|= OPTION_BEGIN; |
393 | thd->server_status|= SERVER_STATUS_IN_TRANS; |
394 | thd->wsrep_apply_toi= false; |
395 | } |
396 | |
397 | return rcode; |
398 | } |
399 | |
400 | |
401 | wsrep_cb_status_t wsrep_unordered_cb(void* const ctx, |
402 | const void* const data, |
403 | size_t const size) |
404 | { |
405 | return WSREP_CB_SUCCESS; |
406 | } |
407 | |