1/* Copyright (C) 2013 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
15
16#include "mariadb.h"
17#include "wsrep_thd.h"
18#include "transaction.h"
19#include "rpl_rli.h"
20#include "log_event.h"
21#include "sql_parse.h"
22//#include "global_threads.h" // LOCK_thread_count, etc.
23#include "sql_base.h" // close_thread_tables()
24#include "mysqld.h" // start_wsrep_THD();
25
26#include "slave.h" // opt_log_slave_updates
27#include "rpl_filter.h"
28#include "rpl_rli.h"
29#include "rpl_mi.h"
30
31#if (__LP64__)
32static volatile int64 wsrep_bf_aborts_counter(0);
33#define WSREP_ATOMIC_LOAD_LONG my_atomic_load64
34#define WSREP_ATOMIC_ADD_LONG my_atomic_add64
35#else
36static volatile int32 wsrep_bf_aborts_counter(0);
37#define WSREP_ATOMIC_LOAD_LONG my_atomic_load32
38#define WSREP_ATOMIC_ADD_LONG my_atomic_add32
39#endif
40
41int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
42 enum enum_var_type scope)
43{
44 wsrep_local_bf_aborts = WSREP_ATOMIC_LOAD_LONG(&wsrep_bf_aborts_counter);
45 var->type = SHOW_LONGLONG;
46 var->value = (char*)&wsrep_local_bf_aborts;
47 return 0;
48}
49
50/* must have (&thd->LOCK_thd_data) */
51void wsrep_client_rollback(THD *thd)
52{
53 WSREP_DEBUG("client rollback due to BF abort for (%lld), query: %s",
54 (longlong) thd->thread_id, thd->query());
55
56 WSREP_ATOMIC_ADD_LONG(&wsrep_bf_aborts_counter, 1);
57
58 thd->wsrep_conflict_state= ABORTING;
59 mysql_mutex_unlock(&thd->LOCK_thd_data);
60 trans_rollback(thd);
61
62 if (thd->locked_tables_mode && thd->lock)
63 {
64 WSREP_DEBUG("unlocking tables for BF abort (%lld)",
65 (longlong) thd->thread_id);
66 thd->locked_tables_list.unlock_locked_tables(thd);
67 thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
68 }
69
70 if (thd->global_read_lock.is_acquired())
71 {
72 WSREP_DEBUG("unlocking GRL for BF abort (%lld)",
73 (longlong) thd->thread_id);
74 thd->global_read_lock.unlock_global_read_lock(thd);
75 }
76
77 /* Release transactional metadata locks. */
78 thd->mdl_context.release_transactional_locks();
79
80 /* release explicit MDL locks */
81 thd->mdl_context.release_explicit_locks();
82
83 if (thd->get_binlog_table_maps())
84 {
85 WSREP_DEBUG("clearing binlog table map for BF abort (%lld)",
86 (longlong) thd->thread_id);
87 thd->clear_binlog_table_maps();
88 }
89 mysql_mutex_lock(&thd->LOCK_thd_data);
90 thd->wsrep_conflict_state= ABORTED;
91}
92
93#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
94#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
95
96static rpl_group_info* wsrep_relay_group_init(const char* log_fname)
97{
98 Relay_log_info* rli= new Relay_log_info(false);
99
100 if (!rli->relay_log.description_event_for_exec)
101 {
102 rli->relay_log.description_event_for_exec=
103 new Format_description_log_event(4);
104 }
105
106 static LEX_CSTRING connection_name= { STRING_WITH_LEN("wsrep") };
107
108 /*
109 Master_info's constructor initializes rpl_filter by either an already
110 constructed Rpl_filter object from global 'rpl_filters' list if the
111 specified connection name is same, or it constructs a new Rpl_filter
112 object and adds it to rpl_filters. This object is later destructed by
113 Mater_info's destructor by looking it up based on connection name in
114 rpl_filters list.
115
116 However, since all Master_info objects created here would share same
117 connection name ("wsrep"), destruction of any of the existing Master_info
118 objects (in wsrep_return_from_bf_mode()) would free rpl_filter referenced
119 by any/all existing Master_info objects.
120
121 In order to avoid that, we have added a check in Master_info's destructor
122 to not free the "wsrep" rpl_filter. It will eventually be freed by
123 free_all_rpl_filters() when server terminates.
124 */
125 rli->mi = new Master_info(&connection_name, false);
126
127 struct rpl_group_info *rgi= new rpl_group_info(rli);
128 rgi->thd= rli->sql_driver_thd= current_thd;
129
130 if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
131 {
132 rgi->deferred_events= new Deferred_log_events(rli);
133 }
134
135 return rgi;
136}
137
138static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
139{
140 shadow->options = thd->variables.option_bits;
141 shadow->server_status = thd->server_status;
142 shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
143 shadow->vio = thd->net.vio;
144
145 // Disable general logging on applier threads
146 thd->variables.option_bits |= OPTION_LOG_OFF;
147 // Enable binlogging if opt_log_slave_updates is set
148 if (opt_log_slave_updates)
149 thd->variables.option_bits|= OPTION_BIN_LOG;
150 else
151 thd->variables.option_bits&= ~(OPTION_BIN_LOG);
152
153 if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay");
154
155 /* thd->system_thread_info.rpl_sql_info isn't initialized. */
156 thd->system_thread_info.rpl_sql_info=
157 new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);
158
159 thd->wsrep_exec_mode= REPL_RECV;
160 thd->net.vio= 0;
161 thd->clear_error();
162
163 shadow->tx_isolation = thd->variables.tx_isolation;
164 thd->variables.tx_isolation = ISO_READ_COMMITTED;
165 thd->tx_isolation = ISO_READ_COMMITTED;
166
167 shadow->db = thd->db.str;
168 shadow->db_length = thd->db.length;
169 shadow->user_time = thd->user_time;
170 shadow->row_count_func= thd->get_row_count_func();
171 thd->reset_db(&null_clex_str);
172}
173
174static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
175{
176 LEX_CSTRING db= {shadow->db, shadow->db_length };
177 thd->variables.option_bits = shadow->options;
178 thd->server_status = shadow->server_status;
179 thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
180 thd->net.vio = shadow->vio;
181 thd->variables.tx_isolation = shadow->tx_isolation;
182 thd->user_time = shadow->user_time;
183 thd->reset_db(&db);
184
185 delete thd->system_thread_info.rpl_sql_info;
186 delete thd->wsrep_rgi->rli->mi;
187 delete thd->wsrep_rgi->rli;
188
189 thd->wsrep_rgi->cleanup_after_session();
190 delete thd->wsrep_rgi;
191 thd->wsrep_rgi = NULL;
192 thd->set_row_count_func(shadow->row_count_func);
193}
194
195void wsrep_replay_transaction(THD *thd)
196{
197 DBUG_ENTER("wsrep_replay_transaction");
198 /* checking if BF trx must be replayed */
199 if (thd->wsrep_conflict_state== MUST_REPLAY) {
200 DBUG_ASSERT(wsrep_thd_trx_seqno(thd));
201 if (thd->wsrep_exec_mode!= REPL_RECV) {
202 if (thd->get_stmt_da()->is_sent())
203 {
204 WSREP_ERROR("replay issue, thd has reported status already");
205 }
206
207
208 /*
209 PS reprepare observer should have been removed already.
210 open_table() will fail if we have dangling observer here.
211 */
212 DBUG_ASSERT(thd->m_reprepare_observer == NULL);
213
214 struct da_shadow
215 {
216 enum Diagnostics_area::enum_diagnostics_status status;
217 ulonglong affected_rows;
218 ulonglong last_insert_id;
219 char message[MYSQL_ERRMSG_SIZE];
220 };
221 struct da_shadow da_status;
222 da_status.status= thd->get_stmt_da()->status();
223 if (da_status.status == Diagnostics_area::DA_OK)
224 {
225 da_status.affected_rows= thd->get_stmt_da()->affected_rows();
226 da_status.last_insert_id= thd->get_stmt_da()->last_insert_id();
227 strmake(da_status.message,
228 thd->get_stmt_da()->message(),
229 sizeof(da_status.message)-1);
230 }
231
232 thd->get_stmt_da()->reset_diagnostics_area();
233
234 thd->wsrep_conflict_state= REPLAYING;
235 mysql_mutex_unlock(&thd->LOCK_thd_data);
236
237 thd->reset_for_next_command();
238 thd->reset_killed();
239 close_thread_tables(thd);
240 if (thd->locked_tables_mode && thd->lock)
241 {
242 WSREP_DEBUG("releasing table lock for replaying (%lld)",
243 (longlong) thd->thread_id);
244 thd->locked_tables_list.unlock_locked_tables(thd);
245 thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
246 }
247 thd->mdl_context.release_transactional_locks();
248 /*
249 Replaying will call MYSQL_START_STATEMENT when handling
250 BEGIN Query_log_event so end statement must be called before
251 replaying.
252 */
253 MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
254 thd->m_statement_psi= NULL;
255 thd->m_digest= NULL;
256 thd_proc_info(thd, "WSREP replaying trx");
257 WSREP_DEBUG("replay trx: %s %lld",
258 thd->query() ? thd->query() : "void",
259 (long long)wsrep_thd_trx_seqno(thd));
260 struct wsrep_thd_shadow shadow;
261 wsrep_prepare_bf_thd(thd, &shadow);
262
263 /* From trans_begin() */
264 thd->variables.option_bits|= OPTION_BEGIN;
265 thd->server_status|= SERVER_STATUS_IN_TRANS;
266
267 int rcode = wsrep->replay_trx(wsrep,
268 &thd->wsrep_ws_handle,
269 (void *)thd);
270
271 wsrep_return_from_bf_mode(thd, &shadow);
272 if (thd->wsrep_conflict_state!= REPLAYING)
273 WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
274
275 mysql_mutex_lock(&thd->LOCK_thd_data);
276
277 switch (rcode)
278 {
279 case WSREP_OK:
280 thd->wsrep_conflict_state= NO_CONFLICT;
281 wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
282 WSREP_DEBUG("trx_replay successful for: %lld %lld",
283 (longlong) thd->thread_id, (longlong) thd->real_id);
284 if (thd->get_stmt_da()->is_sent())
285 {
286 WSREP_WARN("replay ok, thd has reported status");
287 }
288 else if (thd->get_stmt_da()->is_set())
289 {
290 if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK &&
291 thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK)
292 {
293 WSREP_WARN("replay ok, thd has error status %d",
294 thd->get_stmt_da()->status());
295 }
296 }
297 else
298 {
299 if (da_status.status == Diagnostics_area::DA_OK)
300 {
301 my_ok(thd,
302 da_status.affected_rows,
303 da_status.last_insert_id,
304 da_status.message);
305 }
306 else
307 {
308 my_ok(thd);
309 }
310 }
311 break;
312 case WSREP_TRX_FAIL:
313 if (thd->get_stmt_da()->is_sent())
314 {
315 WSREP_ERROR("replay failed, thd has reported status");
316 }
317 else
318 {
319 WSREP_DEBUG("replay failed, rolling back");
320 }
321 thd->wsrep_conflict_state= ABORTED;
322 wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
323 break;
324 default:
325 WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
326 rcode, thd->get_db(),
327 thd->query() ? thd->query() : "void");
328 /* we're now in inconsistent state, must abort */
329
330 /* http://bazaar.launchpad.net/~codership/codership-mysql/5.6/revision/3962#sql/wsrep_thd.cc */
331 mysql_mutex_unlock(&thd->LOCK_thd_data);
332
333 unireg_abort(1);
334 break;
335 }
336
337 wsrep_cleanup_transaction(thd);
338
339 mysql_mutex_lock(&LOCK_wsrep_replaying);
340 wsrep_replaying--;
341 WSREP_DEBUG("replaying decreased: %d, thd: %lld",
342 wsrep_replaying, (longlong) thd->thread_id);
343 mysql_cond_broadcast(&COND_wsrep_replaying);
344 mysql_mutex_unlock(&LOCK_wsrep_replaying);
345 }
346 }
347 DBUG_VOID_RETURN;
348}
349
350static void wsrep_replication_process(THD *thd)
351{
352 int rcode;
353 DBUG_ENTER("wsrep_replication_process");
354
355 struct wsrep_thd_shadow shadow;
356 wsrep_prepare_bf_thd(thd, &shadow);
357
358 /* From trans_begin() */
359 thd->variables.option_bits|= OPTION_BEGIN;
360 thd->server_status|= SERVER_STATUS_IN_TRANS;
361
362 rcode = wsrep->recv(wsrep, (void *)thd);
363 DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
364
365 WSREP_INFO("applier thread exiting (code:%d)", rcode);
366
367 switch (rcode) {
368 case WSREP_OK:
369 case WSREP_NOT_IMPLEMENTED:
370 case WSREP_CONN_FAIL:
371 /* provider does not support slave operations / disconnected from group,
372 * just close applier thread */
373 break;
374 case WSREP_NODE_FAIL:
375 /* data inconsistency => SST is needed */
376 /* Note: we cannot just blindly restart replication here,
377 * SST might require server restart if storage engines must be
378 * initialized after SST */
379 WSREP_ERROR("node consistency compromised, aborting");
380 wsrep_kill_mysql(thd);
381 break;
382 case WSREP_WARNING:
383 case WSREP_TRX_FAIL:
384 case WSREP_TRX_MISSING:
385 /* these suggests a bug in provider code */
386 WSREP_WARN("bad return from recv() call: %d", rcode);
387 /* Shut down this node. */
388 /* fall through */
389 case WSREP_FATAL:
390 /* Cluster connectivity is lost.
391 *
392 * If applier was killed on purpose (KILL_CONNECTION), we
393 * avoid mysql shutdown. This is because the killer will then handle
394 * shutdown processing (or replication restarting)
395 */
396 if (thd->killed != KILL_CONNECTION)
397 {
398 wsrep_kill_mysql(thd);
399 }
400 break;
401 }
402
403 mysql_mutex_lock(&LOCK_thread_count);
404 wsrep_close_applier(thd);
405 mysql_cond_broadcast(&COND_thread_count);
406 mysql_mutex_unlock(&LOCK_thread_count);
407
408 if(thd->has_thd_temporary_tables())
409 {
410 WSREP_WARN("Applier %lld has temporary tables at exit.",
411 thd->thread_id);
412 }
413 wsrep_return_from_bf_mode(thd, &shadow);
414 DBUG_VOID_RETURN;
415}
416
417static bool create_wsrep_THD(wsrep_thd_processor_fun processor)
418{
419 ulong old_wsrep_running_threads= wsrep_running_threads;
420 pthread_t unused;
421 mysql_mutex_lock(&LOCK_thread_count);
422 bool res= pthread_create(&unused, &connection_attrib, start_wsrep_THD,
423 (void*)processor);
424 /*
425 if starting a thread on server startup, wait until the this thread's THD
426 is fully initialized (otherwise a THD initialization code might
427 try to access a partially initialized server data structure - MDEV-8208).
428 */
429 if (!mysqld_server_initialized)
430 while (old_wsrep_running_threads == wsrep_running_threads)
431 mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
432 mysql_mutex_unlock(&LOCK_thread_count);
433 return res;
434}
435
436void wsrep_create_appliers(long threads)
437{
438 if (!wsrep_connected)
439 {
440 /* see wsrep_replication_start() for the logic */
441 if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
442 wsrep_provider && strcasecmp(wsrep_provider, "none"))
443 {
444 WSREP_ERROR("Trying to launch slave threads before creating "
445 "connection at '%s'", wsrep_cluster_address);
446 assert(0);
447 }
448 return;
449 }
450
451 long wsrep_threads=0;
452 while (wsrep_threads++ < threads) {
453 if (create_wsrep_THD(wsrep_replication_process))
454 WSREP_WARN("Can't create thread to manage wsrep replication");
455 }
456}
457
458static void wsrep_rollback_process(THD *thd)
459{
460 DBUG_ENTER("wsrep_rollback_process");
461
462 mysql_mutex_lock(&LOCK_wsrep_rollback);
463 wsrep_aborting_thd= NULL;
464
465 while (thd->killed == NOT_KILLED) {
466 thd_proc_info(thd, "WSREP aborter idle");
467 thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
468 thd->mysys_var->current_cond= &COND_wsrep_rollback;
469
470 mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
471
472 WSREP_DEBUG("WSREP rollback thread wakes for signal");
473
474 mysql_mutex_lock(&thd->mysys_var->mutex);
475 thd_proc_info(thd, "WSREP aborter active");
476 thd->mysys_var->current_mutex= 0;
477 thd->mysys_var->current_cond= 0;
478 mysql_mutex_unlock(&thd->mysys_var->mutex);
479
480 /* check for false alarms */
481 if (!wsrep_aborting_thd)
482 {
483 WSREP_DEBUG("WSREP rollback thread has empty abort queue");
484 }
485 /* process all entries in the queue */
486 while (wsrep_aborting_thd) {
487 THD *aborting;
488 wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
489 aborting = wsrep_aborting_thd->aborting_thd;
490 my_free(wsrep_aborting_thd);
491 wsrep_aborting_thd= next;
492 /*
493 * must release mutex, appliers my want to add more
494 * aborting thds in our work queue, while we rollback
495 */
496 mysql_mutex_unlock(&LOCK_wsrep_rollback);
497
498 mysql_mutex_lock(&aborting->LOCK_thd_data);
499 if (aborting->wsrep_conflict_state== ABORTED)
500 {
501 WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
502 (long long)aborting->real_id,
503 aborting->wsrep_conflict_state);
504
505 mysql_mutex_unlock(&aborting->LOCK_thd_data);
506 mysql_mutex_lock(&LOCK_wsrep_rollback);
507 continue;
508 }
509 aborting->wsrep_conflict_state= ABORTING;
510
511 mysql_mutex_unlock(&aborting->LOCK_thd_data);
512
513 set_current_thd(aborting);
514 aborting->store_globals();
515
516 mysql_mutex_lock(&aborting->LOCK_thd_data);
517 wsrep_client_rollback(aborting);
518 WSREP_DEBUG("WSREP rollbacker aborted thd: (%lld %lld)",
519 (longlong) aborting->thread_id,
520 (longlong) aborting->real_id);
521 mysql_mutex_unlock(&aborting->LOCK_thd_data);
522
523 set_current_thd(thd);
524 thd->store_globals();
525
526 mysql_mutex_lock(&LOCK_wsrep_rollback);
527 }
528 }
529
530 mysql_mutex_unlock(&LOCK_wsrep_rollback);
531 sql_print_information("WSREP: rollbacker thread exiting");
532
533 DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
534 DBUG_VOID_RETURN;
535}
536
537void wsrep_create_rollbacker()
538{
539 if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
540 {
541 /* create rollbacker */
542 if (create_wsrep_THD(wsrep_rollback_process))
543 WSREP_WARN("Can't create thread to manage wsrep rollback");
544 }
545}
546
547void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe)
548{
549 if (thd_ptr)
550 {
551 THD* thd = (THD*)thd_ptr;
552 thd->wsrep_PA_safe = safe;
553 }
554}
555
556enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd, my_bool sync)
557{
558 enum wsrep_conflict_state state = NO_CONFLICT;
559 if (thd)
560 {
561 if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
562
563 state = thd->wsrep_conflict_state;
564 if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
565 }
566 return state;
567}
568
569my_bool wsrep_thd_is_wsrep(THD *thd)
570{
571 my_bool status = FALSE;
572 if (thd)
573 {
574 status = (WSREP(thd) && WSREP_PROVIDER_EXISTS);
575 }
576 return status;
577}
578
579my_bool wsrep_thd_is_BF(THD *thd, my_bool sync)
580{
581 my_bool status = FALSE;
582 if (thd)
583 {
584 // THD can be BF only if provider exists
585 if (wsrep_thd_is_wsrep(thd))
586 {
587 if (sync)
588 mysql_mutex_lock(&thd->LOCK_thd_data);
589
590 status = ((thd->wsrep_exec_mode == REPL_RECV) ||
591 (thd->wsrep_exec_mode == TOTAL_ORDER));
592 if (sync)
593 mysql_mutex_unlock(&thd->LOCK_thd_data);
594 }
595 }
596 return status;
597}
598
599extern "C"
600my_bool wsrep_thd_is_BF_or_commit(void *thd_ptr, my_bool sync)
601{
602 bool status = FALSE;
603 if (thd_ptr)
604 {
605 THD* thd = (THD*)thd_ptr;
606 if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
607
608 status = ((thd->wsrep_exec_mode == REPL_RECV) ||
609 (thd->wsrep_exec_mode == TOTAL_ORDER) ||
610 (thd->wsrep_exec_mode == LOCAL_COMMIT));
611 if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
612 }
613 return status;
614}
615
616extern "C"
617my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync)
618{
619 bool status = FALSE;
620 if (thd_ptr)
621 {
622 THD* thd = (THD*)thd_ptr;
623 if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
624
625 status = (thd->wsrep_exec_mode == LOCAL_STATE);
626 if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
627 }
628 return status;
629}
630
631int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
632{
633 THD *victim_thd = (THD *) victim_thd_ptr;
634 THD *bf_thd = (THD *) bf_thd_ptr;
635 DBUG_ENTER("wsrep_abort_thd");
636
637 if ( (WSREP(bf_thd) ||
638 ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
639 bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
640 victim_thd)
641 {
642 if ((victim_thd->wsrep_conflict_state == MUST_ABORT) ||
643 (victim_thd->wsrep_conflict_state == ABORTED) ||
644 (victim_thd->wsrep_conflict_state == ABORTING))
645 {
646 WSREP_DEBUG("wsrep_abort_thd called by %llu with victim %llu already "
647 "aborted. Ignoring.",
648 (bf_thd) ? (long long)bf_thd->real_id : 0,
649 (long long)victim_thd->real_id);
650 DBUG_RETURN(1);
651 }
652
653 WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
654 (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
655 ha_abort_transaction(bf_thd, victim_thd, signal);
656 }
657 else
658 {
659 WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
660 }
661
662 DBUG_RETURN(1);
663}
664
665extern "C"
666int wsrep_thd_in_locking_session(void *thd_ptr)
667{
668 if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
669 return 1;
670 }
671 return 0;
672}
673
674bool wsrep_thd_has_explicit_locks(THD *thd)
675{
676 assert(thd);
677 return thd->mdl_context.has_explicit_locks();
678}
679