1 | /* Copyright (C) 2013 Codership Oy <info@codership.com> |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License along |
13 | with this program; if not, write to the Free Software Foundation, Inc., |
14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ |
15 | |
16 | #include "mariadb.h" |
17 | #include "wsrep_thd.h" |
18 | #include "transaction.h" |
19 | #include "rpl_rli.h" |
20 | #include "log_event.h" |
21 | #include "sql_parse.h" |
22 | //#include "global_threads.h" // LOCK_thread_count, etc. |
23 | #include "sql_base.h" // close_thread_tables() |
24 | #include "mysqld.h" // start_wsrep_THD(); |
25 | |
26 | #include "slave.h" // opt_log_slave_updates |
27 | #include "rpl_filter.h" |
28 | #include "rpl_rli.h" |
29 | #include "rpl_mi.h" |
30 | |
31 | #if (__LP64__) |
32 | static volatile int64 wsrep_bf_aborts_counter(0); |
33 | #define WSREP_ATOMIC_LOAD_LONG my_atomic_load64 |
34 | #define WSREP_ATOMIC_ADD_LONG my_atomic_add64 |
35 | #else |
36 | static volatile int32 wsrep_bf_aborts_counter(0); |
37 | #define WSREP_ATOMIC_LOAD_LONG my_atomic_load32 |
38 | #define WSREP_ATOMIC_ADD_LONG my_atomic_add32 |
39 | #endif |
40 | |
41 | int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff, |
42 | enum enum_var_type scope) |
43 | { |
44 | wsrep_local_bf_aborts = WSREP_ATOMIC_LOAD_LONG(&wsrep_bf_aborts_counter); |
45 | var->type = SHOW_LONGLONG; |
46 | var->value = (char*)&wsrep_local_bf_aborts; |
47 | return 0; |
48 | } |
49 | |
50 | /* must have (&thd->LOCK_thd_data) */ |
51 | void wsrep_client_rollback(THD *thd) |
52 | { |
53 | WSREP_DEBUG("client rollback due to BF abort for (%lld), query: %s" , |
54 | (longlong) thd->thread_id, thd->query()); |
55 | |
56 | WSREP_ATOMIC_ADD_LONG(&wsrep_bf_aborts_counter, 1); |
57 | |
58 | thd->wsrep_conflict_state= ABORTING; |
59 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
60 | trans_rollback(thd); |
61 | |
62 | if (thd->locked_tables_mode && thd->lock) |
63 | { |
64 | WSREP_DEBUG("unlocking tables for BF abort (%lld)" , |
65 | (longlong) thd->thread_id); |
66 | thd->locked_tables_list.unlock_locked_tables(thd); |
67 | thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); |
68 | } |
69 | |
70 | if (thd->global_read_lock.is_acquired()) |
71 | { |
72 | WSREP_DEBUG("unlocking GRL for BF abort (%lld)" , |
73 | (longlong) thd->thread_id); |
74 | thd->global_read_lock.unlock_global_read_lock(thd); |
75 | } |
76 | |
77 | /* Release transactional metadata locks. */ |
78 | thd->mdl_context.release_transactional_locks(); |
79 | |
80 | /* release explicit MDL locks */ |
81 | thd->mdl_context.release_explicit_locks(); |
82 | |
83 | if (thd->get_binlog_table_maps()) |
84 | { |
85 | WSREP_DEBUG("clearing binlog table map for BF abort (%lld)" , |
86 | (longlong) thd->thread_id); |
87 | thd->clear_binlog_table_maps(); |
88 | } |
89 | mysql_mutex_lock(&thd->LOCK_thd_data); |
90 | thd->wsrep_conflict_state= ABORTED; |
91 | } |
92 | |
93 | #define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1 |
94 | #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2 |
95 | |
96 | static rpl_group_info* wsrep_relay_group_init(const char* log_fname) |
97 | { |
98 | Relay_log_info* rli= new Relay_log_info(false); |
99 | |
100 | if (!rli->relay_log.description_event_for_exec) |
101 | { |
102 | rli->relay_log.description_event_for_exec= |
103 | new Format_description_log_event(4); |
104 | } |
105 | |
106 | static LEX_CSTRING connection_name= { STRING_WITH_LEN("wsrep" ) }; |
107 | |
108 | /* |
109 | Master_info's constructor initializes rpl_filter by either an already |
110 | constructed Rpl_filter object from global 'rpl_filters' list if the |
111 | specified connection name is same, or it constructs a new Rpl_filter |
112 | object and adds it to rpl_filters. This object is later destructed by |
113 | Mater_info's destructor by looking it up based on connection name in |
114 | rpl_filters list. |
115 | |
116 | However, since all Master_info objects created here would share same |
117 | connection name ("wsrep"), destruction of any of the existing Master_info |
118 | objects (in wsrep_return_from_bf_mode()) would free rpl_filter referenced |
119 | by any/all existing Master_info objects. |
120 | |
121 | In order to avoid that, we have added a check in Master_info's destructor |
122 | to not free the "wsrep" rpl_filter. It will eventually be freed by |
123 | free_all_rpl_filters() when server terminates. |
124 | */ |
125 | rli->mi = new Master_info(&connection_name, false); |
126 | |
127 | struct rpl_group_info *rgi= new rpl_group_info(rli); |
128 | rgi->thd= rli->sql_driver_thd= current_thd; |
129 | |
130 | if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) |
131 | { |
132 | rgi->deferred_events= new Deferred_log_events(rli); |
133 | } |
134 | |
135 | return rgi; |
136 | } |
137 | |
138 | static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) |
139 | { |
140 | shadow->options = thd->variables.option_bits; |
141 | shadow->server_status = thd->server_status; |
142 | shadow->wsrep_exec_mode = thd->wsrep_exec_mode; |
143 | shadow->vio = thd->net.vio; |
144 | |
145 | // Disable general logging on applier threads |
146 | thd->variables.option_bits |= OPTION_LOG_OFF; |
147 | // Enable binlogging if opt_log_slave_updates is set |
148 | if (opt_log_slave_updates) |
149 | thd->variables.option_bits|= OPTION_BIN_LOG; |
150 | else |
151 | thd->variables.option_bits&= ~(OPTION_BIN_LOG); |
152 | |
153 | if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay" ); |
154 | |
155 | /* thd->system_thread_info.rpl_sql_info isn't initialized. */ |
156 | thd->system_thread_info.rpl_sql_info= |
157 | new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter); |
158 | |
159 | thd->wsrep_exec_mode= REPL_RECV; |
160 | thd->net.vio= 0; |
161 | thd->clear_error(); |
162 | |
163 | shadow->tx_isolation = thd->variables.tx_isolation; |
164 | thd->variables.tx_isolation = ISO_READ_COMMITTED; |
165 | thd->tx_isolation = ISO_READ_COMMITTED; |
166 | |
167 | shadow->db = thd->db.str; |
168 | shadow->db_length = thd->db.length; |
169 | shadow->user_time = thd->user_time; |
170 | shadow->row_count_func= thd->get_row_count_func(); |
171 | thd->reset_db(&null_clex_str); |
172 | } |
173 | |
174 | static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) |
175 | { |
176 | LEX_CSTRING db= {shadow->db, shadow->db_length }; |
177 | thd->variables.option_bits = shadow->options; |
178 | thd->server_status = shadow->server_status; |
179 | thd->wsrep_exec_mode = shadow->wsrep_exec_mode; |
180 | thd->net.vio = shadow->vio; |
181 | thd->variables.tx_isolation = shadow->tx_isolation; |
182 | thd->user_time = shadow->user_time; |
183 | thd->reset_db(&db); |
184 | |
185 | delete thd->system_thread_info.rpl_sql_info; |
186 | delete thd->wsrep_rgi->rli->mi; |
187 | delete thd->wsrep_rgi->rli; |
188 | |
189 | thd->wsrep_rgi->cleanup_after_session(); |
190 | delete thd->wsrep_rgi; |
191 | thd->wsrep_rgi = NULL; |
192 | thd->set_row_count_func(shadow->row_count_func); |
193 | } |
194 | |
195 | void wsrep_replay_transaction(THD *thd) |
196 | { |
197 | DBUG_ENTER("wsrep_replay_transaction" ); |
198 | /* checking if BF trx must be replayed */ |
199 | if (thd->wsrep_conflict_state== MUST_REPLAY) { |
200 | DBUG_ASSERT(wsrep_thd_trx_seqno(thd)); |
201 | if (thd->wsrep_exec_mode!= REPL_RECV) { |
202 | if (thd->get_stmt_da()->is_sent()) |
203 | { |
204 | WSREP_ERROR("replay issue, thd has reported status already" ); |
205 | } |
206 | |
207 | |
208 | /* |
209 | PS reprepare observer should have been removed already. |
210 | open_table() will fail if we have dangling observer here. |
211 | */ |
212 | DBUG_ASSERT(thd->m_reprepare_observer == NULL); |
213 | |
214 | struct da_shadow |
215 | { |
216 | enum Diagnostics_area::enum_diagnostics_status status; |
217 | ulonglong affected_rows; |
218 | ulonglong last_insert_id; |
219 | char message[MYSQL_ERRMSG_SIZE]; |
220 | }; |
221 | struct da_shadow da_status; |
222 | da_status.status= thd->get_stmt_da()->status(); |
223 | if (da_status.status == Diagnostics_area::DA_OK) |
224 | { |
225 | da_status.affected_rows= thd->get_stmt_da()->affected_rows(); |
226 | da_status.last_insert_id= thd->get_stmt_da()->last_insert_id(); |
227 | strmake(da_status.message, |
228 | thd->get_stmt_da()->message(), |
229 | sizeof(da_status.message)-1); |
230 | } |
231 | |
232 | thd->get_stmt_da()->reset_diagnostics_area(); |
233 | |
234 | thd->wsrep_conflict_state= REPLAYING; |
235 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
236 | |
237 | thd->reset_for_next_command(); |
238 | thd->reset_killed(); |
239 | close_thread_tables(thd); |
240 | if (thd->locked_tables_mode && thd->lock) |
241 | { |
242 | WSREP_DEBUG("releasing table lock for replaying (%lld)" , |
243 | (longlong) thd->thread_id); |
244 | thd->locked_tables_list.unlock_locked_tables(thd); |
245 | thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); |
246 | } |
247 | thd->mdl_context.release_transactional_locks(); |
248 | /* |
249 | Replaying will call MYSQL_START_STATEMENT when handling |
250 | BEGIN Query_log_event so end statement must be called before |
251 | replaying. |
252 | */ |
253 | MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); |
254 | thd->m_statement_psi= NULL; |
255 | thd->m_digest= NULL; |
256 | thd_proc_info(thd, "WSREP replaying trx" ); |
257 | WSREP_DEBUG("replay trx: %s %lld" , |
258 | thd->query() ? thd->query() : "void" , |
259 | (long long)wsrep_thd_trx_seqno(thd)); |
260 | struct wsrep_thd_shadow shadow; |
261 | wsrep_prepare_bf_thd(thd, &shadow); |
262 | |
263 | /* From trans_begin() */ |
264 | thd->variables.option_bits|= OPTION_BEGIN; |
265 | thd->server_status|= SERVER_STATUS_IN_TRANS; |
266 | |
267 | int rcode = wsrep->replay_trx(wsrep, |
268 | &thd->wsrep_ws_handle, |
269 | (void *)thd); |
270 | |
271 | wsrep_return_from_bf_mode(thd, &shadow); |
272 | if (thd->wsrep_conflict_state!= REPLAYING) |
273 | WSREP_WARN("lost replaying mode: %d" , thd->wsrep_conflict_state ); |
274 | |
275 | mysql_mutex_lock(&thd->LOCK_thd_data); |
276 | |
277 | switch (rcode) |
278 | { |
279 | case WSREP_OK: |
280 | thd->wsrep_conflict_state= NO_CONFLICT; |
281 | wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); |
282 | WSREP_DEBUG("trx_replay successful for: %lld %lld" , |
283 | (longlong) thd->thread_id, (longlong) thd->real_id); |
284 | if (thd->get_stmt_da()->is_sent()) |
285 | { |
286 | WSREP_WARN("replay ok, thd has reported status" ); |
287 | } |
288 | else if (thd->get_stmt_da()->is_set()) |
289 | { |
290 | if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK && |
291 | thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK) |
292 | { |
293 | WSREP_WARN("replay ok, thd has error status %d" , |
294 | thd->get_stmt_da()->status()); |
295 | } |
296 | } |
297 | else |
298 | { |
299 | if (da_status.status == Diagnostics_area::DA_OK) |
300 | { |
301 | my_ok(thd, |
302 | da_status.affected_rows, |
303 | da_status.last_insert_id, |
304 | da_status.message); |
305 | } |
306 | else |
307 | { |
308 | my_ok(thd); |
309 | } |
310 | } |
311 | break; |
312 | case WSREP_TRX_FAIL: |
313 | if (thd->get_stmt_da()->is_sent()) |
314 | { |
315 | WSREP_ERROR("replay failed, thd has reported status" ); |
316 | } |
317 | else |
318 | { |
319 | WSREP_DEBUG("replay failed, rolling back" ); |
320 | } |
321 | thd->wsrep_conflict_state= ABORTED; |
322 | wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle); |
323 | break; |
324 | default: |
325 | WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s" , |
326 | rcode, thd->get_db(), |
327 | thd->query() ? thd->query() : "void" ); |
328 | /* we're now in inconsistent state, must abort */ |
329 | |
330 | /* http://bazaar.launchpad.net/~codership/codership-mysql/5.6/revision/3962#sql/wsrep_thd.cc */ |
331 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
332 | |
333 | unireg_abort(1); |
334 | break; |
335 | } |
336 | |
337 | wsrep_cleanup_transaction(thd); |
338 | |
339 | mysql_mutex_lock(&LOCK_wsrep_replaying); |
340 | wsrep_replaying--; |
341 | WSREP_DEBUG("replaying decreased: %d, thd: %lld" , |
342 | wsrep_replaying, (longlong) thd->thread_id); |
343 | mysql_cond_broadcast(&COND_wsrep_replaying); |
344 | mysql_mutex_unlock(&LOCK_wsrep_replaying); |
345 | } |
346 | } |
347 | DBUG_VOID_RETURN; |
348 | } |
349 | |
350 | static void wsrep_replication_process(THD *thd) |
351 | { |
352 | int rcode; |
353 | DBUG_ENTER("wsrep_replication_process" ); |
354 | |
355 | struct wsrep_thd_shadow shadow; |
356 | wsrep_prepare_bf_thd(thd, &shadow); |
357 | |
358 | /* From trans_begin() */ |
359 | thd->variables.option_bits|= OPTION_BEGIN; |
360 | thd->server_status|= SERVER_STATUS_IN_TRANS; |
361 | |
362 | rcode = wsrep->recv(wsrep, (void *)thd); |
363 | DBUG_PRINT("wsrep" ,("wsrep_repl returned: %d" , rcode)); |
364 | |
365 | WSREP_INFO("applier thread exiting (code:%d)" , rcode); |
366 | |
367 | switch (rcode) { |
368 | case WSREP_OK: |
369 | case WSREP_NOT_IMPLEMENTED: |
370 | case WSREP_CONN_FAIL: |
371 | /* provider does not support slave operations / disconnected from group, |
372 | * just close applier thread */ |
373 | break; |
374 | case WSREP_NODE_FAIL: |
375 | /* data inconsistency => SST is needed */ |
376 | /* Note: we cannot just blindly restart replication here, |
377 | * SST might require server restart if storage engines must be |
378 | * initialized after SST */ |
379 | WSREP_ERROR("node consistency compromised, aborting" ); |
380 | wsrep_kill_mysql(thd); |
381 | break; |
382 | case WSREP_WARNING: |
383 | case WSREP_TRX_FAIL: |
384 | case WSREP_TRX_MISSING: |
385 | /* these suggests a bug in provider code */ |
386 | WSREP_WARN("bad return from recv() call: %d" , rcode); |
387 | /* Shut down this node. */ |
388 | /* fall through */ |
389 | case WSREP_FATAL: |
390 | /* Cluster connectivity is lost. |
391 | * |
392 | * If applier was killed on purpose (KILL_CONNECTION), we |
393 | * avoid mysql shutdown. This is because the killer will then handle |
394 | * shutdown processing (or replication restarting) |
395 | */ |
396 | if (thd->killed != KILL_CONNECTION) |
397 | { |
398 | wsrep_kill_mysql(thd); |
399 | } |
400 | break; |
401 | } |
402 | |
403 | mysql_mutex_lock(&LOCK_thread_count); |
404 | wsrep_close_applier(thd); |
405 | mysql_cond_broadcast(&COND_thread_count); |
406 | mysql_mutex_unlock(&LOCK_thread_count); |
407 | |
408 | if(thd->has_thd_temporary_tables()) |
409 | { |
410 | WSREP_WARN("Applier %lld has temporary tables at exit." , |
411 | thd->thread_id); |
412 | } |
413 | wsrep_return_from_bf_mode(thd, &shadow); |
414 | DBUG_VOID_RETURN; |
415 | } |
416 | |
417 | static bool create_wsrep_THD(wsrep_thd_processor_fun processor) |
418 | { |
419 | ulong old_wsrep_running_threads= wsrep_running_threads; |
420 | pthread_t unused; |
421 | mysql_mutex_lock(&LOCK_thread_count); |
422 | bool res= pthread_create(&unused, &connection_attrib, start_wsrep_THD, |
423 | (void*)processor); |
424 | /* |
425 | if starting a thread on server startup, wait until the this thread's THD |
426 | is fully initialized (otherwise a THD initialization code might |
427 | try to access a partially initialized server data structure - MDEV-8208). |
428 | */ |
429 | if (!mysqld_server_initialized) |
430 | while (old_wsrep_running_threads == wsrep_running_threads) |
431 | mysql_cond_wait(&COND_thread_count, &LOCK_thread_count); |
432 | mysql_mutex_unlock(&LOCK_thread_count); |
433 | return res; |
434 | } |
435 | |
436 | void wsrep_create_appliers(long threads) |
437 | { |
438 | if (!wsrep_connected) |
439 | { |
440 | /* see wsrep_replication_start() for the logic */ |
441 | if (wsrep_cluster_address && strlen(wsrep_cluster_address) && |
442 | wsrep_provider && strcasecmp(wsrep_provider, "none" )) |
443 | { |
444 | WSREP_ERROR("Trying to launch slave threads before creating " |
445 | "connection at '%s'" , wsrep_cluster_address); |
446 | assert(0); |
447 | } |
448 | return; |
449 | } |
450 | |
451 | long wsrep_threads=0; |
452 | while (wsrep_threads++ < threads) { |
453 | if (create_wsrep_THD(wsrep_replication_process)) |
454 | WSREP_WARN("Can't create thread to manage wsrep replication" ); |
455 | } |
456 | } |
457 | |
458 | static void wsrep_rollback_process(THD *thd) |
459 | { |
460 | DBUG_ENTER("wsrep_rollback_process" ); |
461 | |
462 | mysql_mutex_lock(&LOCK_wsrep_rollback); |
463 | wsrep_aborting_thd= NULL; |
464 | |
465 | while (thd->killed == NOT_KILLED) { |
466 | thd_proc_info(thd, "WSREP aborter idle" ); |
467 | thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; |
468 | thd->mysys_var->current_cond= &COND_wsrep_rollback; |
469 | |
470 | mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback); |
471 | |
472 | WSREP_DEBUG("WSREP rollback thread wakes for signal" ); |
473 | |
474 | mysql_mutex_lock(&thd->mysys_var->mutex); |
475 | thd_proc_info(thd, "WSREP aborter active" ); |
476 | thd->mysys_var->current_mutex= 0; |
477 | thd->mysys_var->current_cond= 0; |
478 | mysql_mutex_unlock(&thd->mysys_var->mutex); |
479 | |
480 | /* check for false alarms */ |
481 | if (!wsrep_aborting_thd) |
482 | { |
483 | WSREP_DEBUG("WSREP rollback thread has empty abort queue" ); |
484 | } |
485 | /* process all entries in the queue */ |
486 | while (wsrep_aborting_thd) { |
487 | THD *aborting; |
488 | wsrep_aborting_thd_t next = wsrep_aborting_thd->next; |
489 | aborting = wsrep_aborting_thd->aborting_thd; |
490 | my_free(wsrep_aborting_thd); |
491 | wsrep_aborting_thd= next; |
492 | /* |
493 | * must release mutex, appliers my want to add more |
494 | * aborting thds in our work queue, while we rollback |
495 | */ |
496 | mysql_mutex_unlock(&LOCK_wsrep_rollback); |
497 | |
498 | mysql_mutex_lock(&aborting->LOCK_thd_data); |
499 | if (aborting->wsrep_conflict_state== ABORTED) |
500 | { |
501 | WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d" , |
502 | (long long)aborting->real_id, |
503 | aborting->wsrep_conflict_state); |
504 | |
505 | mysql_mutex_unlock(&aborting->LOCK_thd_data); |
506 | mysql_mutex_lock(&LOCK_wsrep_rollback); |
507 | continue; |
508 | } |
509 | aborting->wsrep_conflict_state= ABORTING; |
510 | |
511 | mysql_mutex_unlock(&aborting->LOCK_thd_data); |
512 | |
513 | set_current_thd(aborting); |
514 | aborting->store_globals(); |
515 | |
516 | mysql_mutex_lock(&aborting->LOCK_thd_data); |
517 | wsrep_client_rollback(aborting); |
518 | WSREP_DEBUG("WSREP rollbacker aborted thd: (%lld %lld)" , |
519 | (longlong) aborting->thread_id, |
520 | (longlong) aborting->real_id); |
521 | mysql_mutex_unlock(&aborting->LOCK_thd_data); |
522 | |
523 | set_current_thd(thd); |
524 | thd->store_globals(); |
525 | |
526 | mysql_mutex_lock(&LOCK_wsrep_rollback); |
527 | } |
528 | } |
529 | |
530 | mysql_mutex_unlock(&LOCK_wsrep_rollback); |
531 | sql_print_information("WSREP: rollbacker thread exiting" ); |
532 | |
533 | DBUG_PRINT("wsrep" ,("wsrep rollbacker thread exiting" )); |
534 | DBUG_VOID_RETURN; |
535 | } |
536 | |
537 | void wsrep_create_rollbacker() |
538 | { |
539 | if (wsrep_provider && strcasecmp(wsrep_provider, "none" )) |
540 | { |
541 | /* create rollbacker */ |
542 | if (create_wsrep_THD(wsrep_rollback_process)) |
543 | WSREP_WARN("Can't create thread to manage wsrep rollback" ); |
544 | } |
545 | } |
546 | |
547 | void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe) |
548 | { |
549 | if (thd_ptr) |
550 | { |
551 | THD* thd = (THD*)thd_ptr; |
552 | thd->wsrep_PA_safe = safe; |
553 | } |
554 | } |
555 | |
556 | enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd, my_bool sync) |
557 | { |
558 | enum wsrep_conflict_state state = NO_CONFLICT; |
559 | if (thd) |
560 | { |
561 | if (sync) mysql_mutex_lock(&thd->LOCK_thd_data); |
562 | |
563 | state = thd->wsrep_conflict_state; |
564 | if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data); |
565 | } |
566 | return state; |
567 | } |
568 | |
569 | my_bool wsrep_thd_is_wsrep(THD *thd) |
570 | { |
571 | my_bool status = FALSE; |
572 | if (thd) |
573 | { |
574 | status = (WSREP(thd) && WSREP_PROVIDER_EXISTS); |
575 | } |
576 | return status; |
577 | } |
578 | |
579 | my_bool wsrep_thd_is_BF(THD *thd, my_bool sync) |
580 | { |
581 | my_bool status = FALSE; |
582 | if (thd) |
583 | { |
584 | // THD can be BF only if provider exists |
585 | if (wsrep_thd_is_wsrep(thd)) |
586 | { |
587 | if (sync) |
588 | mysql_mutex_lock(&thd->LOCK_thd_data); |
589 | |
590 | status = ((thd->wsrep_exec_mode == REPL_RECV) || |
591 | (thd->wsrep_exec_mode == TOTAL_ORDER)); |
592 | if (sync) |
593 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
594 | } |
595 | } |
596 | return status; |
597 | } |
598 | |
599 | extern "C" |
600 | my_bool wsrep_thd_is_BF_or_commit(void *thd_ptr, my_bool sync) |
601 | { |
602 | bool status = FALSE; |
603 | if (thd_ptr) |
604 | { |
605 | THD* thd = (THD*)thd_ptr; |
606 | if (sync) mysql_mutex_lock(&thd->LOCK_thd_data); |
607 | |
608 | status = ((thd->wsrep_exec_mode == REPL_RECV) || |
609 | (thd->wsrep_exec_mode == TOTAL_ORDER) || |
610 | (thd->wsrep_exec_mode == LOCAL_COMMIT)); |
611 | if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data); |
612 | } |
613 | return status; |
614 | } |
615 | |
616 | extern "C" |
617 | my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync) |
618 | { |
619 | bool status = FALSE; |
620 | if (thd_ptr) |
621 | { |
622 | THD* thd = (THD*)thd_ptr; |
623 | if (sync) mysql_mutex_lock(&thd->LOCK_thd_data); |
624 | |
625 | status = (thd->wsrep_exec_mode == LOCAL_STATE); |
626 | if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data); |
627 | } |
628 | return status; |
629 | } |
630 | |
631 | int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) |
632 | { |
633 | THD *victim_thd = (THD *) victim_thd_ptr; |
634 | THD *bf_thd = (THD *) bf_thd_ptr; |
635 | DBUG_ENTER("wsrep_abort_thd" ); |
636 | |
637 | if ( (WSREP(bf_thd) || |
638 | ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) && |
639 | bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && |
640 | victim_thd) |
641 | { |
642 | if ((victim_thd->wsrep_conflict_state == MUST_ABORT) || |
643 | (victim_thd->wsrep_conflict_state == ABORTED) || |
644 | (victim_thd->wsrep_conflict_state == ABORTING)) |
645 | { |
646 | WSREP_DEBUG("wsrep_abort_thd called by %llu with victim %llu already " |
647 | "aborted. Ignoring." , |
648 | (bf_thd) ? (long long)bf_thd->real_id : 0, |
649 | (long long)victim_thd->real_id); |
650 | DBUG_RETURN(1); |
651 | } |
652 | |
653 | WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu" , (bf_thd) ? |
654 | (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); |
655 | ha_abort_transaction(bf_thd, victim_thd, signal); |
656 | } |
657 | else |
658 | { |
659 | WSREP_DEBUG("wsrep_abort_thd not effective: %p %p" , bf_thd, victim_thd); |
660 | } |
661 | |
662 | DBUG_RETURN(1); |
663 | } |
664 | |
665 | extern "C" |
666 | int wsrep_thd_in_locking_session(void *thd_ptr) |
667 | { |
668 | if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) { |
669 | return 1; |
670 | } |
671 | return 0; |
672 | } |
673 | |
674 | bool wsrep_thd_has_explicit_locks(THD *thd) |
675 | { |
676 | assert(thd); |
677 | return thd->mdl_context.has_explicit_locks(); |
678 | } |
679 | |