1/* Copyright (C) 2013-2015 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
15
16#include "mariadb.h"
17#include "wsrep_priv.h"
18#include "wsrep_binlog.h" // wsrep_dump_rbr_buf()
19#include "wsrep_xid.h"
20
21#include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc.
22#include "wsrep_applier.h"
23#include "debug_sync.h"
24
25/*
26 read the first event from (*buf). The size of the (*buf) is (*buf_len).
27 At the end (*buf) is shitfed to point to the following event or NULL and
28 (*buf_len) will be changed to account just being read bytes of the 1st event.
29*/
30
31static Log_event* wsrep_read_log_event(
32 char **arg_buf, size_t *arg_buf_len,
33 const Format_description_log_event *description_event)
34{
35 DBUG_ENTER("wsrep_read_log_event");
36 char *head= (*arg_buf);
37
38 uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
39 char *buf= (*arg_buf);
40 const char *error= 0;
41 Log_event *res= 0;
42
43 res= Log_event::read_log_event(buf, data_len, &error, description_event,
44 true);
45
46 if (!res)
47 {
48 DBUG_ASSERT(error != 0);
49 sql_print_error("Error in Log_event::read_log_event(): "
50 "'%s', data_len: %d, event_type: %d",
51 error,data_len,head[EVENT_TYPE_OFFSET]);
52 }
53 (*arg_buf)+= data_len;
54 (*arg_buf_len)-= data_len;
55 DBUG_RETURN(res);
56}
57
58#include "transaction.h" // trans_commit(), trans_rollback()
59#include "rpl_rli.h" // class Relay_log_info;
60
61void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
62{
63 if (thd->wsrep_apply_format)
64 {
65 delete (Format_description_log_event*)thd->wsrep_apply_format;
66 }
67 thd->wsrep_apply_format= ev;
68}
69
70Format_description_log_event* wsrep_get_apply_format(THD* thd)
71{
72 if (thd->wsrep_apply_format)
73 {
74 return (Format_description_log_event*) thd->wsrep_apply_format;
75 }
76
77 DBUG_ASSERT(thd->wsrep_rgi);
78
79 return thd->wsrep_rgi->rli->relay_log.description_event_for_exec;
80}
81
82static wsrep_cb_status_t wsrep_apply_events(THD* thd,
83 const void* events_buf,
84 size_t buf_len)
85{
86 char *buf= (char *)events_buf;
87 int rcode= 0;
88 int event= 1;
89 Log_event_type typ;
90
91 DBUG_ENTER("wsrep_apply_events");
92
93 if (thd->killed == KILL_CONNECTION &&
94 thd->wsrep_conflict_state != REPLAYING)
95 {
96 WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld",
97 (long long) wsrep_thd_trx_seqno(thd));
98 DBUG_RETURN(WSREP_CB_FAILURE);
99 }
100
101 mysql_mutex_lock(&thd->LOCK_thd_data);
102 thd->wsrep_query_state= QUERY_EXEC;
103 if (thd->wsrep_conflict_state!= REPLAYING)
104 thd->wsrep_conflict_state= NO_CONFLICT;
105 mysql_mutex_unlock(&thd->LOCK_thd_data);
106
107 if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
108 (long long) wsrep_thd_trx_seqno(thd));
109
110 while(buf_len)
111 {
112 int exec_res;
113 Log_event* ev= wsrep_read_log_event(&buf, &buf_len,
114 wsrep_get_apply_format(thd));
115
116 if (!ev)
117 {
118 WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %zu",
119 (long long)wsrep_thd_trx_seqno(thd), buf_len);
120 rcode= 1;
121 goto error;
122 }
123
124 typ= ev->get_type_code();
125
126 switch (typ) {
127 case FORMAT_DESCRIPTION_EVENT:
128 wsrep_set_apply_format(thd, (Format_description_log_event*)ev);
129 continue;
130#ifdef GTID_SUPPORT
131 case GTID_LOG_EVENT:
132 {
133 Gtid_log_event* gev= (Gtid_log_event*)ev;
134 if (gev->get_gno() == 0)
135 {
136 /* Skip GTID log event to make binlog to generate LTID on commit */
137 delete ev;
138 continue;
139 }
140 }
141#endif /* GTID_SUPPORT */
142 default:
143 break;
144 }
145
146 /* Use the original server id for logging. */
147 thd->set_server_id(ev->server_id);
148 thd->set_time(); // time the query
149 wsrep_xid_init(&thd->transaction.xid_state.xid,
150 thd->wsrep_trx_meta.gtid.uuid,
151 thd->wsrep_trx_meta.gtid.seqno);
152 thd->lex->current_select= 0;
153 if (!ev->when)
154 {
155 my_hrtime_t hrtime= my_hrtime();
156 ev->when= hrtime_to_my_time(hrtime);
157 ev->when_sec_part= hrtime_sec_part(hrtime);
158 }
159
160 thd->variables.option_bits=
161 (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
162 (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
163
164 ev->thd = thd;
165 exec_res = ev->apply_event(thd->wsrep_rgi);
166 DBUG_PRINT("info", ("exec_event result: %d", exec_res));
167
168 if (exec_res)
169 {
170 WSREP_WARN("RBR event %d %s apply warning: %d, %lld",
171 event, ev->get_type_str(), exec_res,
172 (long long) wsrep_thd_trx_seqno(thd));
173 rcode= exec_res;
174 /* stop processing for the first error */
175 delete ev;
176 goto error;
177 }
178 event++;
179
180 if (thd->wsrep_conflict_state!= NO_CONFLICT &&
181 thd->wsrep_conflict_state!= REPLAYING)
182 WSREP_WARN("conflict state after RBR event applying: %d, %lld",
183 thd->wsrep_query_state, (long long)wsrep_thd_trx_seqno(thd));
184
185 if (thd->wsrep_conflict_state == MUST_ABORT) {
186 WSREP_WARN("RBR event apply failed, rolling back: %lld",
187 (long long) wsrep_thd_trx_seqno(thd));
188 trans_rollback(thd);
189 thd->locked_tables_list.unlock_locked_tables(thd);
190 /* Release transactional metadata locks. */
191 thd->mdl_context.release_transactional_locks();
192 thd->wsrep_conflict_state= NO_CONFLICT;
193 DBUG_RETURN(WSREP_CB_FAILURE);
194 }
195
196 delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev);
197 }
198
199 error:
200 mysql_mutex_lock(&thd->LOCK_thd_data);
201 thd->wsrep_query_state= QUERY_IDLE;
202 mysql_mutex_unlock(&thd->LOCK_thd_data);
203
204 assert(thd->wsrep_exec_mode== REPL_RECV);
205
206 if (thd->killed == KILL_CONNECTION)
207 WSREP_INFO("applier aborted: %lld", (long long)wsrep_thd_trx_seqno(thd));
208
209 if (rcode) DBUG_RETURN(WSREP_CB_FAILURE);
210 DBUG_RETURN(WSREP_CB_SUCCESS);
211}
212
213wsrep_cb_status_t wsrep_apply_cb(void* const ctx,
214 const void* const buf,
215 size_t const buf_len,
216 uint32_t const flags,
217 const wsrep_trx_meta_t* meta)
218{
219 THD* const thd((THD*)ctx);
220
221 assert(thd->wsrep_apply_toi == false);
222
223 // Allow tests to block the applier thread using the DBUG facilities.
224 DBUG_EXECUTE_IF("sync.wsrep_apply_cb",
225 {
226 const char act[]=
227 "now "
228 "SIGNAL sync.wsrep_apply_cb_reached "
229 "WAIT_FOR signal.wsrep_apply_cb";
230 DBUG_ASSERT(!debug_sync_set_action(thd,
231 STRING_WITH_LEN(act)));
232 };);
233
234 thd->wsrep_trx_meta = *meta;
235
236#ifdef WSREP_PROC_INFO
237 snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
238 "Applying write set %lld: %p, %zu",
239 (long long)wsrep_thd_trx_seqno(thd), buf, buf_len);
240 thd_proc_info(thd, thd->wsrep_info);
241#else
242 thd_proc_info(thd, "Applying write set");
243#endif /* WSREP_PROC_INFO */
244
245 /* tune FK and UK checking policy */
246 if (wsrep_slave_UK_checks == FALSE)
247 thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
248 else
249 thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
250
251 if (wsrep_slave_FK_checks == FALSE)
252 thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
253 else
254 thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
255
256 /* With galera we assume that the master has done the constraint checks */
257 thd->variables.option_bits|= OPTION_NO_CHECK_CONSTRAINT_CHECKS;
258
259 if (flags & WSREP_FLAG_ISOLATION)
260 {
261 thd->wsrep_apply_toi= true;
262 /*
263 Don't run in transaction mode with TOI actions.
264 */
265 thd->variables.option_bits&= ~OPTION_BEGIN;
266 thd->server_status&= ~SERVER_STATUS_IN_TRANS;
267 }
268 wsrep_cb_status_t rcode(wsrep_apply_events(thd, buf, buf_len));
269
270#ifdef WSREP_PROC_INFO
271 snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
272 "Applied write set %lld", (long long)wsrep_thd_trx_seqno(thd));
273 thd_proc_info(thd, thd->wsrep_info);
274#else
275 thd_proc_info(thd, "Applied write set");
276#endif /* WSREP_PROC_INFO */
277
278 if (WSREP_CB_SUCCESS != rcode)
279 {
280 wsrep_dump_rbr_buf_with_header(thd, buf, buf_len);
281 }
282
283 if (thd->has_thd_temporary_tables())
284 {
285 WSREP_DEBUG("Applier %lld has temporary tables. Closing them now..",
286 thd->thread_id);
287 thd->close_temporary_tables();
288 }
289
290 return rcode;
291}
292
293static wsrep_cb_status_t wsrep_commit(THD* const thd)
294{
295#ifdef WSREP_PROC_INFO
296 snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
297 "Committing %lld", (long long)wsrep_thd_trx_seqno(thd));
298 thd_proc_info(thd, thd->wsrep_info);
299#else
300 thd_proc_info(thd, "Committing");
301#endif /* WSREP_PROC_INFO */
302
303 wsrep_cb_status_t const rcode(trans_commit(thd) ?
304 WSREP_CB_FAILURE : WSREP_CB_SUCCESS);
305
306 if (WSREP_CB_SUCCESS == rcode)
307 {
308 thd->wsrep_rgi->cleanup_context(thd, false);
309#ifdef GTID_SUPPORT
310 thd->variables.gtid_next.set_automatic();
311#endif /* GTID_SUPPORT */
312 if (thd->wsrep_apply_toi)
313 {
314 wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid,
315 thd->wsrep_trx_meta.gtid.seqno);
316 }
317 }
318
319#ifdef WSREP_PROC_INFO
320 snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
321 "Committed %lld", (long long) wsrep_thd_trx_seqno(thd));
322 thd_proc_info(thd, thd->wsrep_info);
323#else
324 thd_proc_info(thd, "Committed");
325#endif /* WSREP_PROC_INFO */
326
327 return rcode;
328}
329
330static wsrep_cb_status_t wsrep_rollback(THD* const thd)
331{
332#ifdef WSREP_PROC_INFO
333 snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
334 "Rolling back %lld", (long long)wsrep_thd_trx_seqno(thd));
335 thd_proc_info(thd, thd->wsrep_info);
336#else
337 thd_proc_info(thd, "Rolling back");
338#endif /* WSREP_PROC_INFO */
339
340 wsrep_cb_status_t const rcode(trans_rollback(thd) ?
341 WSREP_CB_FAILURE : WSREP_CB_SUCCESS);
342
343#ifdef WSREP_PROC_INFO
344 snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
345 "Rolled back %lld", (long long)wsrep_thd_trx_seqno(thd));
346 thd_proc_info(thd, thd->wsrep_info);
347#else
348 thd_proc_info(thd, "Rolled back");
349#endif /* WSREP_PROC_INFO */
350
351 return rcode;
352}
353
354wsrep_cb_status_t wsrep_commit_cb(void* const ctx,
355 uint32_t const flags,
356 const wsrep_trx_meta_t* meta,
357 wsrep_bool_t* const exit,
358 bool const commit)
359{
360 THD* const thd((THD*)ctx);
361
362 assert(meta->gtid.seqno == wsrep_thd_trx_seqno(thd));
363
364 wsrep_cb_status_t rcode;
365
366 if (commit)
367 rcode = wsrep_commit(thd);
368 else
369 rcode = wsrep_rollback(thd);
370
371 /* Cleanup */
372 wsrep_set_apply_format(thd, NULL);
373 thd->mdl_context.release_transactional_locks();
374 thd->reset_query(); /* Mutex protected */
375 free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
376 thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation;
377
378 if (wsrep_slave_count_change < 0 && commit && WSREP_CB_SUCCESS == rcode)
379 {
380 mysql_mutex_lock(&LOCK_wsrep_slave_threads);
381 if (wsrep_slave_count_change < 0)
382 {
383 wsrep_slave_count_change++;
384 *exit = true;
385 }
386 mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
387 }
388
389 if (thd->wsrep_applier)
390 {
391 /* From trans_begin() */
392 thd->variables.option_bits|= OPTION_BEGIN;
393 thd->server_status|= SERVER_STATUS_IN_TRANS;
394 thd->wsrep_apply_toi= false;
395 }
396
397 return rcode;
398}
399
400
401wsrep_cb_status_t wsrep_unordered_cb(void* const ctx,
402 const void* const data,
403 size_t const size)
404{
405 return WSREP_CB_SUCCESS;
406}
407