1/* Copyright 2008-2015 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.x1
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16#include "mariadb.h"
17#include <mysqld.h>
18#include <sql_class.h>
19#include <sql_parse.h>
20#include <sql_base.h> /* find_temporary_table() */
21#include "slave.h"
22#include "rpl_mi.h"
23#include "sql_repl.h"
24#include "rpl_filter.h"
25#include "sql_callback.h"
26#include "sp_head.h"
27#include "sql_show.h"
28#include "sp.h"
29#include "wsrep_priv.h"
30#include "wsrep_thd.h"
31#include "wsrep_sst.h"
32#include "wsrep_utils.h"
33#include "wsrep_var.h"
34#include "wsrep_binlog.h"
35#include "wsrep_applier.h"
36#include "wsrep_xid.h"
37#include <cstdio>
38#include <cstdlib>
39#include "log_event.h"
40#include <slave.h>
41#include "sql_plugin.h" /* wsrep_plugins_pre_init() */
42
43wsrep_t *wsrep = NULL;
44/*
45 wsrep_emulate_bin_log is a flag to tell that binlog has not been configured.
46 wsrep needs to get binlog events from transaction cache even when binlog is
47 not enabled, wsrep_emulate_bin_log opens needed code paths to make this
48 possible
49*/
50my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
51#ifdef GTID_SUPPORT
52/* Sidno in global_sid_map corresponding to group uuid */
53rpl_sidno wsrep_sidno= -1;
54#endif /* GTID_SUPPORT */
55my_bool wsrep_preordered_opt= FALSE;
56
57/*
58 * Begin configuration options
59 */
60
61extern my_bool plugins_are_initialized;
62extern uint kill_cached_threads;
63extern mysql_cond_t COND_thread_cache;
64
65/* System variables. */
66const char *wsrep_provider;
67const char *wsrep_provider_options;
68const char *wsrep_cluster_address;
69const char *wsrep_cluster_name;
70const char *wsrep_node_name;
71const char *wsrep_node_address;
72const char *wsrep_node_incoming_address;
73const char *wsrep_start_position;
74const char *wsrep_data_home_dir;
75const char *wsrep_dbug_option;
76const char *wsrep_notify_cmd;
77const char *wsrep_sst_method;
78const char *wsrep_sst_receive_address;
79const char *wsrep_sst_donor;
80const char *wsrep_sst_auth;
81my_bool wsrep_debug; // Enable debug level logging
82my_bool wsrep_convert_LOCK_to_trx; // Convert locking sessions to trx
83my_bool wsrep_auto_increment_control; // Control auto increment variables
84my_bool wsrep_drupal_282555_workaround; // Retry autoinc insert after dupkey
85my_bool wsrep_certify_nonPK; // Certify, even when no primary key
86my_bool wsrep_recovery; // Recovery
87my_bool wsrep_replicate_myisam; // Enable MyISAM replication
88my_bool wsrep_log_conflicts;
89my_bool wsrep_load_data_splitting; // Commit load data every 10K intervals
90my_bool wsrep_slave_UK_checks; // Slave thread does UK checks
91my_bool wsrep_slave_FK_checks; // Slave thread does FK checks
92my_bool wsrep_sst_donor_rejects_queries;
93my_bool wsrep_restart_slave; // Should mysql slave thread be
94 // restarted, when node joins back?
95my_bool wsrep_desync; // De(re)synchronize the node from the
96 // cluster
97long wsrep_slave_threads; // No. of slave appliers threads
98ulong wsrep_retry_autocommit; // Retry aborted autocommit trx
99ulong wsrep_max_ws_size; // Max allowed ws (RBR buffer) size
100ulong wsrep_max_ws_rows; // Max number of rows in ws
101ulong wsrep_forced_binlog_format;
102ulong wsrep_mysql_replication_bundle;
103bool wsrep_gtid_mode; // Use wsrep_gtid_domain_id
104 // for galera transactions?
105uint32 wsrep_gtid_domain_id; // gtid_domain_id for galera
106 // transactions
107
108/* Other configuration variables and their default values. */
109my_bool wsrep_incremental_data_collection= 0; // Incremental data collection
110my_bool wsrep_restart_slave_activated= 0; // Node has dropped, and slave
111 // restart will be needed
112bool wsrep_new_cluster= false; // Bootstrap the cluster?
113int wsrep_slave_count_change= 0; // No. of appliers to stop/start
114int wsrep_to_isolation= 0; // No. of active TO isolation threads
115long wsrep_max_protocol_version= 3; // Maximum protocol version to use
116
117/*
118 * End configuration options
119 */
120
121/*
122 * Other wsrep global variables.
123 */
124
125mysql_mutex_t LOCK_wsrep_ready;
126mysql_cond_t COND_wsrep_ready;
127mysql_mutex_t LOCK_wsrep_sst;
128mysql_cond_t COND_wsrep_sst;
129mysql_mutex_t LOCK_wsrep_sst_init;
130mysql_cond_t COND_wsrep_sst_init;
131mysql_mutex_t LOCK_wsrep_rollback;
132mysql_cond_t COND_wsrep_rollback;
133wsrep_aborting_thd_t wsrep_aborting_thd= NULL;
134mysql_mutex_t LOCK_wsrep_replaying;
135mysql_cond_t COND_wsrep_replaying;
136mysql_mutex_t LOCK_wsrep_slave_threads;
137mysql_mutex_t LOCK_wsrep_desync;
138mysql_mutex_t LOCK_wsrep_config_state;
139
140int wsrep_replaying= 0;
141ulong wsrep_running_threads = 0; // # of currently running wsrep threads
142ulong my_bind_addr;
143
144#ifdef HAVE_PSI_INTERFACE
145PSI_mutex_key key_LOCK_wsrep_rollback,
146 key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
147 key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
148 key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
149 key_LOCK_wsrep_config_state;
150
151PSI_cond_key key_COND_wsrep_rollback,
152 key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
153 key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
154
155PSI_file_key key_file_wsrep_gra_log;
156
157static PSI_mutex_info wsrep_mutexes[]=
158{
159 { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL},
160 { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
161 { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0},
162 { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL},
163 { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
164 { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback", PSI_FLAG_GLOBAL},
165 { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
166 { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
167 { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
168 { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
169};
170
171static PSI_cond_info wsrep_conds[]=
172{
173 { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL},
174 { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
175 { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
176 { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
177 { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
178 { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
179};
180
181static PSI_file_info wsrep_files[]=
182{
183 { &key_file_wsrep_gra_log, "wsrep_gra_log", 0}
184};
185#endif
186
187my_bool wsrep_inited = 0; // initialized ?
188
189static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
190static char cluster_uuid_str[40]= { 0, };
191static const char* cluster_status_str[WSREP_VIEW_MAX] =
192{
193 "Primary",
194 "non-Primary",
195 "Disconnected"
196};
197
198static char provider_name[256]= { 0, };
199static char provider_version[256]= { 0, };
200static char provider_vendor[256]= { 0, };
201
202/*
203 * wsrep status variables
204 */
205my_bool wsrep_connected = FALSE;
206my_bool wsrep_ready = FALSE; // node can accept queries
207const char* wsrep_cluster_state_uuid = cluster_uuid_str;
208long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
209const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
210long wsrep_cluster_size = 0;
211long wsrep_local_index = -1;
212long long wsrep_local_bf_aborts = 0;
213const char* wsrep_provider_name = provider_name;
214const char* wsrep_provider_version = provider_version;
215const char* wsrep_provider_vendor = provider_vendor;
216/* End wsrep status variables */
217
218wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
219wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
220long wsrep_protocol_version = 3;
221
222wsp::Config_state *wsrep_config_state;
223
224// Boolean denoting if server is in initial startup phase. This is needed
225// to make sure that main thread waiting in wsrep_sst_wait() is signaled
226// if there was no state gap on receiving first view event.
227static my_bool wsrep_startup = TRUE;
228
229
230static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
231 switch (level) {
232 case WSREP_LOG_INFO:
233 sql_print_information("WSREP: %s", msg);
234 break;
235 case WSREP_LOG_WARN:
236 sql_print_warning("WSREP: %s", msg);
237 break;
238 case WSREP_LOG_ERROR:
239 case WSREP_LOG_FATAL:
240 sql_print_error("WSREP: %s", msg);
241 break;
242 case WSREP_LOG_DEBUG:
243 if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
244 default:
245 break;
246 }
247}
248
249void wsrep_log(void (*fun)(const char *, ...), const char *format, ...)
250{
251 va_list args;
252 char msg[1024];
253 va_start(args, format);
254 vsnprintf(msg, sizeof(msg) - 1, format, args);
255 va_end(args);
256 (fun)("WSREP: %s", msg);
257}
258
259
260static void wsrep_log_states (wsrep_log_level_t const level,
261 const wsrep_uuid_t* const group_uuid,
262 wsrep_seqno_t const group_seqno,
263 const wsrep_uuid_t* const node_uuid,
264 wsrep_seqno_t const node_seqno)
265{
266 char uuid_str[37];
267 char msg[256];
268
269 wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str));
270 snprintf (msg, 255, "WSREP: Group state: %s:%lld",
271 uuid_str, (long long)group_seqno);
272 wsrep_log_cb (level, msg);
273
274 wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str));
275 snprintf (msg, 255, "WSREP: Local state: %s:%lld",
276 uuid_str, (long long)node_seqno);
277 wsrep_log_cb (level, msg);
278}
279
280#ifdef GTID_SUPPORT
281void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid)
282{
283 /* generate new Sid map entry from inverted uuid */
284 rpl_sid sid;
285 wsrep_uuid_t ltid_uuid;
286
287 for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i)
288 {
289 ltid_uuid.data[i] = ~wsrep_uuid.data[i];
290 }
291
292 sid.copy_from(ltid_uuid.data);
293 global_sid_lock->wrlock();
294 wsrep_sidno= global_sid_map->add_sid(sid);
295 WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno);
296 global_sid_lock->unlock();
297}
298#endif /* GTID_SUPPORT */
299
300static wsrep_cb_status_t
301wsrep_view_handler_cb (void* app_ctx,
302 void* recv_ctx,
303 const wsrep_view_info_t* view,
304 const char* state,
305 size_t state_len,
306 void** sst_req,
307 size_t* sst_req_len)
308{
309 *sst_req = NULL;
310 *sst_req_len = 0;
311
312 wsrep_member_status_t memb_status= wsrep_config_state->get_status();
313
314 if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
315 {
316 memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid,
317 sizeof(cluster_uuid));
318
319 wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
320 sizeof(cluster_uuid_str));
321 }
322
323 wsrep_cluster_conf_id= view->view;
324 wsrep_cluster_status= cluster_status_str[view->status];
325 wsrep_cluster_size= view->memb_num;
326 wsrep_local_index= view->my_idx;
327
328 WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
329 "number of nodes: %ld, my index: %ld, protocol version %d",
330 wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
331 (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
332 wsrep_cluster_size, wsrep_local_index, view->proto_ver);
333
334 /* Proceed further only if view is PRIMARY */
335 if (WSREP_VIEW_PRIMARY != view->status)
336 {
337#ifdef HAVE_QUERY_CACHE
338 // query cache must be initialised by now
339 query_cache.flush();
340#endif /* HAVE_QUERY_CACHE */
341
342 wsrep_ready_set(FALSE);
343 memb_status= WSREP_MEMBER_UNDEFINED;
344 /* Always record local_uuid and local_seqno in non-prim since this
345 * may lead to re-initializing provider and start position is
346 * determined according to these variables */
347 // WRONG! local_uuid should be the last primary configuration uuid we were
348 // a member of. local_seqno should be updated in commit calls.
349 // local_uuid= cluster_uuid;
350 // local_seqno= view->first - 1;
351 goto out;
352 }
353
354 switch (view->proto_ver)
355 {
356 case 0:
357 case 1:
358 case 2:
359 case 3:
360 // version change
361 if (view->proto_ver != wsrep_protocol_version)
362 {
363 my_bool wsrep_ready_saved= wsrep_ready;
364 wsrep_ready_set(FALSE);
365 WSREP_INFO("closing client connections for "
366 "protocol change %ld -> %d",
367 wsrep_protocol_version, view->proto_ver);
368 wsrep_close_client_connections(TRUE);
369 wsrep_protocol_version= view->proto_ver;
370 wsrep_ready_set(wsrep_ready_saved);
371 }
372 break;
373 default:
374 WSREP_ERROR("Unsupported application protocol version: %d",
375 view->proto_ver);
376 unireg_abort(1);
377 }
378
379 if (view->state_gap)
380 {
381 WSREP_WARN("Gap in state sequence. Need state transfer.");
382
383 /* After that wsrep will call wsrep_sst_prepare. */
384 /* keep ready flag 0 until we receive the snapshot */
385 wsrep_ready_set(FALSE);
386
387 /* Close client connections to ensure that they don't interfere
388 * with SST. Necessary only if storage engines are initialized
389 * before SST.
390 * TODO: Just killing all ongoing transactions should be enough
391 * since wsrep_ready is OFF and no new transactions can start.
392 */
393 if (!wsrep_before_SE())
394 {
395 WSREP_DEBUG("[debug]: closing client connections for PRIM");
396 wsrep_close_client_connections(FALSE);
397 }
398
399 ssize_t const req_len= wsrep_sst_prepare (sst_req);
400
401 if (req_len < 0)
402 {
403 WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
404 strerror(-req_len));
405 memb_status= WSREP_MEMBER_UNDEFINED;
406 }
407 else
408 {
409 assert(sst_req != NULL);
410 *sst_req_len= req_len;
411 memb_status= WSREP_MEMBER_JOINER;
412 }
413 }
414 else
415 {
416 /*
417 * NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
418 * before - OR - it was reinitilized on startup (lp:992840)
419 */
420 if (wsrep_startup)
421 {
422 if (wsrep_before_SE())
423 {
424 wsrep_SE_init_grab();
425 // Signal mysqld init thread to continue
426 wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
427 // and wait for SE initialization
428 wsrep_SE_init_wait();
429 }
430 else
431 {
432 local_uuid= cluster_uuid;
433 local_seqno= view->state_id.seqno;
434 }
435 /* Init storage engine XIDs from first view */
436 wsrep_set_SE_checkpoint(local_uuid, local_seqno);
437#ifdef GTID_SUPPORT
438 wsrep_init_sidno(local_uuid);
439#endif /* GTID_SUPPORT */
440 memb_status= WSREP_MEMBER_JOINED;
441 }
442
443 // just some sanity check
444 if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
445 {
446 WSREP_ERROR("Undetected state gap. Can't continue.");
447 wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
448 &local_uuid, -1);
449 unireg_abort(1);
450 }
451 }
452
453 if (wsrep_auto_increment_control)
454 {
455 global_system_variables.auto_increment_offset= view->my_idx + 1;
456 global_system_variables.auto_increment_increment= view->memb_num;
457 }
458
459 { /* capabilities may be updated on new configuration */
460 uint64_t const caps(wsrep->capabilities (wsrep));
461
462 my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
463 if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
464 {
465 WSREP_WARN("Unsupported protocol downgrade: "
466 "incremental data collection disabled. Expect abort.");
467 }
468 wsrep_incremental_data_collection = idc;
469 }
470
471out:
472 if (view->status == WSREP_VIEW_PRIMARY) wsrep_startup= FALSE;
473 wsrep_config_state->set(memb_status, view);
474
475 return WSREP_CB_SUCCESS;
476}
477
478void wsrep_ready_set (my_bool x)
479{
480 WSREP_DEBUG("Setting wsrep_ready to %d", x);
481 if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
482 if (wsrep_ready != x)
483 {
484 wsrep_ready= x;
485 mysql_cond_signal (&COND_wsrep_ready);
486 }
487 mysql_mutex_unlock (&LOCK_wsrep_ready);
488}
489
490// Wait until wsrep has reached ready state
491void wsrep_ready_wait ()
492{
493 if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
494 while (!wsrep_ready)
495 {
496 WSREP_INFO("Waiting to reach ready state");
497 mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready);
498 }
499 WSREP_INFO("ready state reached");
500 mysql_mutex_unlock (&LOCK_wsrep_ready);
501}
502
503static void wsrep_synced_cb(void* app_ctx)
504{
505 WSREP_INFO("Synchronized with group, ready for connections");
506 bool signal_main= false;
507 if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
508 if (!wsrep_ready)
509 {
510 wsrep_ready= TRUE;
511 mysql_cond_signal (&COND_wsrep_ready);
512 signal_main= true;
513
514 }
515 wsrep_config_state->set(WSREP_MEMBER_SYNCED);
516 mysql_mutex_unlock (&LOCK_wsrep_ready);
517
518 if (signal_main)
519 {
520 wsrep_SE_init_grab();
521 // Signal mysqld init thread to continue
522 wsrep_sst_complete (&local_uuid, local_seqno, false);
523 // and wait for SE initialization
524 wsrep_SE_init_wait();
525 }
526 if (wsrep_restart_slave_activated)
527 {
528 int rcode;
529 WSREP_INFO("MariaDB slave restart");
530 wsrep_restart_slave_activated= FALSE;
531
532 mysql_mutex_lock(&LOCK_active_mi);
533 if ((rcode = start_slave_threads(0,
534 1 /* need mutex */,
535 0 /* no wait for start*/,
536 active_mi,
537 master_info_file,
538 relay_log_info_file,
539 SLAVE_SQL)))
540 {
541 WSREP_WARN("Failed to create slave threads: %d", rcode);
542 }
543 mysql_mutex_unlock(&LOCK_active_mi);
544
545 }
546}
547
548static void wsrep_init_position()
549{
550 /* read XIDs from storage engines */
551 wsrep_uuid_t uuid;
552 wsrep_seqno_t seqno;
553 wsrep_get_SE_checkpoint(uuid, seqno);
554
555 if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)))
556 {
557 WSREP_INFO("Read nil XID from storage engines, skipping position init");
558 return;
559 }
560
561 char uuid_str[40] = {0, };
562 wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
563 WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno);
564
565 if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) &&
566 local_seqno == WSREP_SEQNO_UNDEFINED)
567 {
568 // Initial state
569 local_uuid= uuid;
570 local_seqno= seqno;
571 }
572 else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) ||
573 local_seqno != seqno)
574 {
575 WSREP_WARN("Initial position was provided by configuration or SST, "
576 "avoiding override");
577 }
578}
579
580extern char* my_bind_addr_str;
581
582int wsrep_init()
583{
584 int rcode= -1;
585 DBUG_ASSERT(wsrep_inited == 0);
586
587 if (strcmp(wsrep_start_position, WSREP_START_POSITION_ZERO) &&
588 wsrep_start_position_init(wsrep_start_position))
589 {
590 return 1;
591 }
592
593 wsrep_sst_auth_init();
594
595 wsrep_ready_set(FALSE);
596 assert(wsrep_provider);
597
598 wsrep_init_position();
599
600 if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK)
601 {
602 if (strcasecmp(wsrep_provider, WSREP_NONE))
603 {
604 WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.",
605 wsrep_provider, strerror(rcode), rcode);
606 strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack
607 return wsrep_init();
608 }
609 else /* this is for recursive call above */
610 {
611 WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.",
612 strerror(rcode), rcode);
613 unireg_abort(1);
614 }
615 }
616
617 if (!WSREP_PROVIDER_EXISTS)
618 {
619 // enable normal operation in case no provider is specified
620 wsrep_ready_set(TRUE);
621 wsrep_inited= 1;
622 global_system_variables.wsrep_on = 0;
623 wsrep_init_args args;
624 args.logger_cb = wsrep_log_cb;
625 args.options = (wsrep_provider_options) ?
626 wsrep_provider_options : "";
627 rcode = wsrep->init(wsrep, &args);
628 if (rcode)
629 {
630 DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
631 WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
632 wsrep->free(wsrep);
633 free(wsrep);
634 wsrep = NULL;
635 }
636 return rcode;
637 }
638 else
639 {
640 global_system_variables.wsrep_on = 1;
641 strncpy(provider_name,
642 wsrep->provider_name, sizeof(provider_name) - 1);
643 strncpy(provider_version,
644 wsrep->provider_version, sizeof(provider_version) - 1);
645 strncpy(provider_vendor,
646 wsrep->provider_vendor, sizeof(provider_vendor) - 1);
647 }
648
649 /* Initialize node address */
650 char node_addr[512]= { 0, };
651 size_t const node_addr_max= sizeof(node_addr) - 1;
652 if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
653 {
654 size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
655 if (!(ret > 0 && ret < node_addr_max))
656 {
657 WSREP_WARN("Failed to guess base node address. Set it explicitly via "
658 "wsrep_node_address.");
659 node_addr[0]= '\0';
660 }
661 }
662 else
663 {
664 strncpy(node_addr, wsrep_node_address, node_addr_max);
665 }
666
667 /* Initialize node's incoming address */
668 char inc_addr[512]= { 0, };
669 size_t const inc_addr_max= sizeof (inc_addr);
670
671 /*
672 In case wsrep_node_incoming_address is either not set or set to AUTO,
673 we need to use mysqld's my_bind_addr_str:mysqld_port, lastly fallback
674 to wsrep_node_address' value if mysqld's bind-address is not set either.
675 */
676 if ((!wsrep_node_incoming_address ||
677 !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
678 {
679 bool is_ipv6= false;
680 unsigned int my_bind_ip= INADDR_ANY; // default if not set
681
682 if (my_bind_addr_str && strlen(my_bind_addr_str))
683 {
684 my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6);
685 }
686
687 if (INADDR_ANY != my_bind_ip)
688 {
689 /*
690 If its a not a valid address, leave inc_addr as empty string. mysqld
691 is not listening for client connections on network interfaces.
692 */
693 if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
694 {
695 const char *fmt= (is_ipv6) ? "[%s]:%u" : "%s:%u";
696 snprintf(inc_addr, inc_addr_max, fmt, my_bind_addr_str, mysqld_port);
697 }
698 }
699 else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */
700 {
701 size_t const node_addr_len= strlen(node_addr);
702 if (node_addr_len > 0)
703 {
704 wsp::Address addr(node_addr);
705
706 if (!addr.is_valid())
707 {
708 WSREP_DEBUG("Could not parse node address : %s", node_addr);
709 WSREP_WARN("Guessing address for incoming client connections failed. "
710 "Try setting wsrep_node_incoming_address explicitly.");
711 goto done;
712 }
713
714 const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
715 snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(),
716 (int) mysqld_port);
717 }
718 }
719 }
720 else
721 {
722 wsp::Address addr(wsrep_node_incoming_address);
723
724 if (!addr.is_valid())
725 {
726 WSREP_WARN("Could not parse wsrep_node_incoming_address : %s",
727 wsrep_node_incoming_address);
728 goto done;
729 }
730
731 /*
732 In case port is not specified in wsrep_node_incoming_address, we use
733 mysqld_port.
734 */
735 int port= (addr.get_port() > 0) ? addr.get_port() : (int) mysqld_port;
736 const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
737
738 snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port);
739 }
740
741done:
742 struct wsrep_init_args wsrep_args;
743
744 struct wsrep_gtid const state_id = { local_uuid, local_seqno };
745
746 wsrep_args.data_dir = wsrep_data_home_dir;
747 wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
748 wsrep_args.node_address = node_addr;
749 wsrep_args.node_incoming = inc_addr;
750 wsrep_args.options = (wsrep_provider_options) ?
751 wsrep_provider_options : "";
752 wsrep_args.proto_ver = wsrep_max_protocol_version;
753
754 wsrep_args.state_id = &state_id;
755
756 wsrep_args.logger_cb = wsrep_log_cb;
757 wsrep_args.view_handler_cb = wsrep_view_handler_cb;
758 wsrep_args.apply_cb = wsrep_apply_cb;
759 wsrep_args.commit_cb = wsrep_commit_cb;
760 wsrep_args.unordered_cb = wsrep_unordered_cb;
761 wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
762 wsrep_args.synced_cb = wsrep_synced_cb;
763
764 rcode = wsrep->init(wsrep, &wsrep_args);
765
766 if (rcode)
767 {
768 DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
769 WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
770 wsrep->free(wsrep);
771 free(wsrep);
772 wsrep = NULL;
773 } else {
774 wsrep_inited= 1;
775 }
776
777 return rcode;
778}
779
780
781/* Initialize wsrep thread LOCKs and CONDs */
782void wsrep_thr_init()
783{
784 DBUG_ENTER("wsrep_thr_init");
785 wsrep_config_state = new wsp::Config_state;
786#ifdef HAVE_PSI_INTERFACE
787 mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes));
788 mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds));
789 mysql_file_register("sql", wsrep_files, array_elements(wsrep_files));
790#endif
791
792 mysql_mutex_init(key_LOCK_wsrep_ready, &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
793 mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
794 mysql_mutex_init(key_LOCK_wsrep_sst, &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
795 mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
796 mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
797 mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
798 mysql_mutex_init(key_LOCK_wsrep_rollback, &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
799 mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
800 mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
801 mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
802 mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
803 mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
804 mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
805 DBUG_VOID_RETURN;
806}
807
808void wsrep_init_startup (bool first)
809{
810 if (wsrep_init()) unireg_abort(1);
811
812 wsrep_thr_lock_init(
813 (wsrep_thd_is_brute_force_fun)wsrep_thd_is_BF,
814 (wsrep_abort_thd_fun)wsrep_abort_thd,
815 wsrep_debug, wsrep_convert_LOCK_to_trx,
816 (wsrep_on_fun)wsrep_on);
817
818 /*
819 Pre-initialize global_system_variables.table_plugin with a dummy engine
820 (placeholder) required during the initialization of wsrep threads (THDs).
821 (see: plugin_thdvar_init())
822 Note: This only needs to be done for rsync & xtrabackup based SST methods.
823 In case of mysqldump SST method, the wsrep threads are created after the
824 server plugins & global system variables are initialized.
825 */
826 if (wsrep_before_SE())
827 wsrep_plugins_pre_init();
828
829 /* Skip replication start if dummy wsrep provider is loaded */
830 if (!strcmp(wsrep_provider, WSREP_NONE)) return;
831
832 /* Skip replication start if no cluster address */
833 if (!wsrep_cluster_address || wsrep_cluster_address[0] == 0) return;
834
835 if (first) wsrep_sst_grab(); // do it so we can wait for SST below
836
837 if (!wsrep_start_replication()) unireg_abort(1);
838
839 wsrep_create_rollbacker();
840 wsrep_create_appliers(1);
841
842 if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
843}
844
845
846void wsrep_deinit(bool free_options)
847{
848 DBUG_ASSERT(wsrep_inited == 1);
849 wsrep_unload(wsrep);
850 wsrep= 0;
851 provider_name[0]= '\0';
852 provider_version[0]= '\0';
853 provider_vendor[0]= '\0';
854
855 wsrep_inited= 0;
856
857 if (free_options)
858 {
859 wsrep_sst_auth_free();
860 }
861}
862
863/* Destroy wsrep thread LOCKs and CONDs */
864void wsrep_thr_deinit()
865{
866 if (!wsrep_config_state)
867 return; // Never initialized
868 mysql_mutex_destroy(&LOCK_wsrep_ready);
869 mysql_cond_destroy(&COND_wsrep_ready);
870 mysql_mutex_destroy(&LOCK_wsrep_sst);
871 mysql_cond_destroy(&COND_wsrep_sst);
872 mysql_mutex_destroy(&LOCK_wsrep_sst_init);
873 mysql_cond_destroy(&COND_wsrep_sst_init);
874 mysql_mutex_destroy(&LOCK_wsrep_rollback);
875 mysql_cond_destroy(&COND_wsrep_rollback);
876 mysql_mutex_destroy(&LOCK_wsrep_replaying);
877 mysql_cond_destroy(&COND_wsrep_replaying);
878 mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
879 mysql_mutex_destroy(&LOCK_wsrep_desync);
880 mysql_mutex_destroy(&LOCK_wsrep_config_state);
881 delete wsrep_config_state;
882 wsrep_config_state= 0; // Safety
883}
884
885void wsrep_recover()
886{
887 char uuid_str[40];
888
889 if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) &&
890 local_seqno == -2)
891 {
892 wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
893 WSREP_INFO("Position %s:%lld given at startup, skipping position recovery",
894 uuid_str, (long long)local_seqno);
895 return;
896 }
897 wsrep_uuid_t uuid;
898 wsrep_seqno_t seqno;
899 wsrep_get_SE_checkpoint(uuid, seqno);
900 wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
901 WSREP_INFO("Recovered position: %s:%lld", uuid_str, (long long)seqno);
902}
903
904
905void wsrep_stop_replication(THD *thd)
906{
907 WSREP_INFO("Stop replication");
908 if (!wsrep)
909 {
910 WSREP_INFO("Provider was not loaded, in stop replication");
911 return;
912 }
913
914 /* disconnect from group first to get wsrep_ready == FALSE */
915 WSREP_DEBUG("Provider disconnect");
916 wsrep->disconnect(wsrep);
917
918 wsrep_connected= FALSE;
919
920 wsrep_close_client_connections(TRUE);
921
922 /* wait until appliers have stopped */
923 wsrep_wait_appliers_close(thd);
924
925 return;
926}
927
928bool wsrep_start_replication()
929{
930 wsrep_status_t rcode;
931
932 /* wsrep provider must be loaded. */
933 DBUG_ASSERT(wsrep);
934
935 /*
936 if provider is trivial, don't even try to connect,
937 but resume local node operation
938 */
939 if (!WSREP_PROVIDER_EXISTS)
940 {
941 // enable normal operation in case no provider is specified
942 wsrep_ready_set(TRUE);
943 return true;
944 }
945
946 if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
947 {
948 // if provider is non-trivial, but no address is specified, wait for address
949 wsrep_ready_set(FALSE);
950 return true;
951 }
952
953 bool const bootstrap= wsrep_new_cluster;
954
955 WSREP_INFO("Start replication");
956
957 if (wsrep_new_cluster)
958 {
959 WSREP_INFO("'wsrep-new-cluster' option used, bootstrapping the cluster");
960 wsrep_new_cluster= false;
961 }
962
963 if ((rcode = wsrep->connect(wsrep,
964 wsrep_cluster_name,
965 wsrep_cluster_address,
966 wsrep_sst_donor,
967 bootstrap)))
968 {
969 DBUG_PRINT("wsrep",("wsrep->connect(%s) failed: %d",
970 wsrep_cluster_address, rcode));
971 WSREP_ERROR("wsrep::connect(%s) failed: %d",
972 wsrep_cluster_address, rcode);
973 return false;
974 }
975 else
976 {
977 wsrep_connected= TRUE;
978
979 char* opts= wsrep->options_get(wsrep);
980 if (opts)
981 {
982 wsrep_provider_options_init(opts);
983 free(opts);
984 }
985 else
986 {
987 WSREP_WARN("Failed to get wsrep options");
988 }
989 }
990
991 return true;
992}
993
994bool wsrep_must_sync_wait (THD* thd, uint mask)
995{
996 return (thd->variables.wsrep_sync_wait & mask) &&
997 thd->variables.wsrep_on &&
998 !thd->in_active_multi_stmt_transaction() &&
999 thd->wsrep_conflict_state != REPLAYING &&
1000 thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED;
1001}
1002
1003bool wsrep_sync_wait (THD* thd, uint mask)
1004{
1005 if (wsrep_must_sync_wait(thd, mask))
1006 {
1007 WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u",
1008 thd->variables.wsrep_sync_wait, mask);
1009 // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
1010 // TODO: modify to check if thd has locked any rows.
1011 wsrep_status_t ret= wsrep->causal_read (wsrep, &thd->wsrep_sync_wait_gtid);
1012
1013 if (unlikely(WSREP_OK != ret))
1014 {
1015 const char* msg;
1016 int err;
1017
1018 // Possibly relevant error codes:
1019 // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
1020 // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
1021 // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
1022
1023 switch (ret)
1024 {
1025 case WSREP_NOT_IMPLEMENTED:
1026 msg= "synchronous reads by wsrep backend. "
1027 "Please unset wsrep_causal_reads variable.";
1028 err= ER_NOT_SUPPORTED_YET;
1029 break;
1030 default:
1031 msg= "Synchronous wait failed.";
1032 err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
1033 // with ER_LOCK_WAIT_TIMEOUT
1034 }
1035
1036 my_error(err, MYF(0), msg);
1037
1038 return true;
1039 }
1040 }
1041
1042 return false;
1043}
1044
1045/*
1046 * Helpers to deal with TOI key arrays
1047 */
1048typedef struct wsrep_key_arr
1049{
1050 wsrep_key_t* keys;
1051 size_t keys_len;
1052} wsrep_key_arr_t;
1053
1054
1055static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
1056{
1057 for (size_t i= 0; i < key_arr->keys_len; ++i)
1058 {
1059 my_free((void*)key_arr->keys[i].key_parts);
1060 }
1061 my_free(key_arr->keys);
1062 key_arr->keys= 0;
1063 key_arr->keys_len= 0;
1064}
1065
1066
1067/*!
1068 * @param db Database string
1069 * @param table Table string
1070 * @param key Array of wsrep_key_t
1071 * @param key_len In: number of elements in key array, Out: number of
1072 * elements populated
1073 *
1074 * @return true if preparation was successful, otherwise false.
1075 */
1076
1077static bool wsrep_prepare_key_for_isolation(const char* db,
1078 const char* table,
1079 wsrep_buf_t* key,
1080 size_t* key_len)
1081{
1082 if (*key_len < 2) return false;
1083
1084 switch (wsrep_protocol_version)
1085 {
1086 case 0:
1087 *key_len= 0;
1088 break;
1089 case 1:
1090 case 2:
1091 case 3:
1092 {
1093 *key_len= 0;
1094 if (db)
1095 {
1096 // sql_print_information("%s.%s", db, table);
1097 if (db)
1098 {
1099 key[*key_len].ptr= db;
1100 key[*key_len].len= strlen(db);
1101 ++(*key_len);
1102 if (table)
1103 {
1104 key[*key_len].ptr= table;
1105 key[*key_len].len= strlen(table);
1106 ++(*key_len);
1107 }
1108 }
1109 }
1110 break;
1111 }
1112 default:
1113 return false;
1114 }
1115
1116 return true;
1117}
1118
1119/* Prepare key list from db/table and table_list */
1120static bool wsrep_prepare_keys_for_isolation(THD* thd,
1121 const char* db,
1122 const char* table,
1123 const TABLE_LIST* table_list,
1124 wsrep_key_arr_t* ka)
1125{
1126 ka->keys= 0;
1127 ka->keys_len= 0;
1128
1129 if (db || table)
1130 {
1131 if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
1132 {
1133 WSREP_ERROR("Can't allocate memory for key_array");
1134 goto err;
1135 }
1136 ka->keys_len= 1;
1137 if (!(ka->keys[0].key_parts= (wsrep_buf_t*)
1138 my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
1139 {
1140 WSREP_ERROR("Can't allocate memory for key_parts");
1141 goto err;
1142 }
1143 ka->keys[0].key_parts_num= 2;
1144 if (!wsrep_prepare_key_for_isolation(
1145 db, table,
1146 (wsrep_buf_t*)ka->keys[0].key_parts,
1147 &ka->keys[0].key_parts_num))
1148 {
1149 WSREP_ERROR("Preparing keys for isolation failed (1)");
1150 goto err;
1151 }
1152 }
1153
1154 for (const TABLE_LIST* table= table_list; table; table= table->next_global)
1155 {
1156 wsrep_key_t* tmp;
1157 if (ka->keys)
1158 tmp= (wsrep_key_t*)my_realloc(ka->keys,
1159 (ka->keys_len + 1) * sizeof(wsrep_key_t),
1160 MYF(0));
1161 else
1162 tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0));
1163
1164 if (!tmp)
1165 {
1166 WSREP_ERROR("Can't allocate memory for key_array");
1167 goto err;
1168 }
1169 ka->keys= tmp;
1170 if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
1171 my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
1172 {
1173 WSREP_ERROR("Can't allocate memory for key_parts");
1174 goto err;
1175 }
1176 ka->keys[ka->keys_len].key_parts_num= 2;
1177 ++ka->keys_len;
1178 if (!wsrep_prepare_key_for_isolation(table->db.str, table->table_name.str,
1179 (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
1180 &ka->keys[ka->keys_len - 1].key_parts_num))
1181 {
1182 WSREP_ERROR("Preparing keys for isolation failed (2)");
1183 goto err;
1184 }
1185 }
1186 return 0;
1187err:
1188 wsrep_keys_free(ka);
1189 return 1;
1190}
1191
1192
1193bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len,
1194 const uchar* row_id, size_t row_id_len,
1195 wsrep_buf_t* key, size_t* key_len)
1196{
1197 if (*key_len < 3) return false;
1198
1199 *key_len= 0;
1200 switch (wsrep_protocol_version)
1201 {
1202 case 0:
1203 {
1204 key[0].ptr = cache_key;
1205 key[0].len = cache_key_len;
1206
1207 *key_len = 1;
1208 break;
1209 }
1210 case 1:
1211 case 2:
1212 case 3:
1213 {
1214 key[0].ptr = cache_key;
1215 key[0].len = strlen( (char*)cache_key );
1216
1217 key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
1218 key[1].len = strlen( (char*)(key[1].ptr) );
1219
1220 *key_len = 2;
1221 break;
1222 }
1223 default:
1224 return false;
1225 }
1226
1227 key[*key_len].ptr = row_id;
1228 key[*key_len].len = row_id_len;
1229 ++(*key_len);
1230
1231 return true;
1232}
1233
1234
1235/*
1236 * Construct Query_log_Event from thd query and serialize it
1237 * into buffer.
1238 *
1239 * Return 0 in case of success, 1 in case of error.
1240 */
1241int wsrep_to_buf_helper(
1242 THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
1243{
1244 IO_CACHE tmp_io_cache;
1245 Log_event_writer writer(&tmp_io_cache,0);
1246 if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
1247 65536, MYF(MY_WME)))
1248 return 1;
1249 int ret(0);
1250 enum enum_binlog_checksum_alg current_binlog_check_alg=
1251 (enum_binlog_checksum_alg) binlog_checksum_options;
1252
1253 Format_description_log_event *tmp_fd= new Format_description_log_event(4);
1254 tmp_fd->checksum_alg= current_binlog_check_alg;
1255 writer.write(tmp_fd);
1256 delete tmp_fd;
1257
1258#ifdef GTID_SUPPORT
1259 if (thd->variables.gtid_next.type == GTID_GROUP)
1260 {
1261 Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
1262 if (!gtid_ev.is_valid()) ret= 0;
1263 if (!ret && writer.write(&gtid_ev)) ret= 1;
1264 }
1265#endif /* GTID_SUPPORT */
1266 if (wsrep_gtid_mode && thd->variables.gtid_seq_no)
1267 {
1268 Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no,
1269 thd->variables.gtid_domain_id,
1270 true, LOG_EVENT_SUPPRESS_USE_F,
1271 true, 0);
1272 gtid_event.server_id= thd->variables.server_id;
1273 if (!gtid_event.is_valid()) ret= 0;
1274 ret= writer.write(&gtid_event);
1275 }
1276
1277 /* if there is prepare query, add event for it */
1278 if (!ret && thd->wsrep_TOI_pre_query)
1279 {
1280 Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
1281 thd->wsrep_TOI_pre_query_len,
1282 FALSE, FALSE, FALSE, 0);
1283 ev.checksum_alg= current_binlog_check_alg;
1284 if (writer.write(&ev)) ret= 1;
1285 }
1286
1287 /* continue to append the actual query */
1288 Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
1289 ev.checksum_alg= current_binlog_check_alg;
1290 if (!ret && writer.write(&ev)) ret= 1;
1291 if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
1292 close_cached_file(&tmp_io_cache);
1293 return ret;
1294}
1295
1296static int
1297wsrep_alter_query_string(THD *thd, String *buf)
1298{
1299 /* Append the "ALTER" part of the query */
1300 if (buf->append(STRING_WITH_LEN("ALTER ")))
1301 return 1;
1302 /* Append definer */
1303 append_definer(thd, buf, &(thd->lex->definer->user), &(thd->lex->definer->host));
1304 /* Append the left part of thd->query after event name part */
1305 if (buf->append(thd->lex->stmt_definition_begin,
1306 thd->lex->stmt_definition_end -
1307 thd->lex->stmt_definition_begin))
1308 return 1;
1309
1310 return 0;
1311}
1312
1313static int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len)
1314{
1315 String log_query;
1316
1317 if (wsrep_alter_query_string(thd, &log_query))
1318 {
1319 WSREP_WARN("events alter string failed: schema: %s, query: %s",
1320 thd->get_db(), thd->query());
1321 return 1;
1322 }
1323 return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
1324}
1325
1326#include "sql_show.h"
1327static int
1328create_view_query(THD *thd, uchar** buf, size_t* buf_len)
1329{
1330 LEX *lex= thd->lex;
1331 SELECT_LEX *select_lex= &lex->select_lex;
1332 TABLE_LIST *first_table= select_lex->table_list.first;
1333 TABLE_LIST *views = first_table;
1334 LEX_USER *definer;
1335 String buff;
1336 const LEX_CSTRING command[3]=
1337 {{ STRING_WITH_LEN("CREATE ") },
1338 { STRING_WITH_LEN("ALTER ") },
1339 { STRING_WITH_LEN("CREATE OR REPLACE ") }};
1340
1341 buff.append(&command[thd->lex->create_view->mode]);
1342
1343 if (lex->definer)
1344 definer= get_current_user(thd, lex->definer);
1345 else
1346 {
1347 /*
1348 DEFINER-clause is missing; we have to create default definer in
1349 persistent arena to be PS/SP friendly.
1350 If this is an ALTER VIEW then the current user should be set as
1351 the definer.
1352 */
1353 definer= create_default_definer(thd, false);
1354 }
1355
1356 if (definer)
1357 {
1358 views->definer.user = definer->user;
1359 views->definer.host = definer->host;
1360 } else {
1361 WSREP_ERROR("Failed to get DEFINER for VIEW.");
1362 return 1;
1363 }
1364
1365 views->algorithm = lex->create_view->algorithm;
1366 views->view_suid = lex->create_view->suid;
1367 views->with_check = lex->create_view->check;
1368
1369 view_store_options(thd, views, &buff);
1370 buff.append(STRING_WITH_LEN("VIEW "));
1371 /* Test if user supplied a db (ie: we did not use thd->db) */
1372 if (views->db.str && views->db.str[0] &&
1373 (thd->db.str == NULL || cmp(&views->db, &thd->db)))
1374 {
1375 append_identifier(thd, &buff, &views->db);
1376 buff.append('.');
1377 }
1378 append_identifier(thd, &buff, &views->table_name);
1379 if (lex->view_list.elements)
1380 {
1381 List_iterator_fast<LEX_CSTRING> names(lex->view_list);
1382 LEX_CSTRING *name;
1383 int i;
1384
1385 for (i= 0; (name= names++); i++)
1386 {
1387 buff.append(i ? ", " : "(");
1388 append_identifier(thd, &buff, name);
1389 }
1390 buff.append(')');
1391 }
1392 buff.append(STRING_WITH_LEN(" AS "));
1393 //buff.append(views->source.str, views->source.length);
1394 buff.append(thd->lex->create_view->select.str,
1395 thd->lex->create_view->select.length);
1396 //int errcode= query_error_code(thd, TRUE);
1397 //if (thd->binlog_query(THD::STMT_QUERY_TYPE,
1398 // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
1399 return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
1400}
1401
1402/* Forward declarations. */
1403static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len);
1404static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
1405
1406/*
1407 Decide if statement should run in TOI.
1408
1409 Look if table or table_list contain temporary tables. If the
1410 statement affects only temporary tables, statement should not run
1411 in TOI. If the table list contains mix of regular and temporary tables
1412 (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but
1413 should be rewritten at later time for replication to contain only
1414 non-temporary tables.
1415 */
1416static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
1417 const TABLE_LIST *table_list)
1418{
1419 DBUG_ASSERT(!table || db);
1420 DBUG_ASSERT(table_list || db);
1421
1422 LEX* lex= thd->lex;
1423 SELECT_LEX* select_lex= &lex->select_lex;
1424 TABLE_LIST* first_table= select_lex->table_list.first;
1425
1426 switch (lex->sql_command)
1427 {
1428 case SQLCOM_CREATE_TABLE:
1429 DBUG_ASSERT(!table_list);
1430 if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)
1431 {
1432 return false;
1433 }
1434 return true;
1435
1436 case SQLCOM_CREATE_VIEW:
1437
1438 DBUG_ASSERT(!table_list);
1439 DBUG_ASSERT(first_table); /* First table is view name */
1440 /*
1441 If any of the remaining tables refer to temporary table error
1442 is returned to client, so TOI can be skipped
1443 */
1444 for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global)
1445 {
1446 if (thd->find_temporary_table(it))
1447 {
1448 return false;
1449 }
1450 }
1451 return true;
1452
1453 case SQLCOM_CREATE_TRIGGER:
1454
1455 DBUG_ASSERT(!table_list);
1456 DBUG_ASSERT(first_table);
1457
1458 if (thd->find_temporary_table(first_table))
1459 {
1460 return false;
1461 }
1462 return true;
1463
1464 default:
1465 if (table && !thd->find_temporary_table(db, table))
1466 {
1467 return true;
1468 }
1469
1470 if (table_list)
1471 {
1472 for (TABLE_LIST* table= first_table; table; table= table->next_global)
1473 {
1474 if (!thd->find_temporary_table(table->db.str, table->table_name.str))
1475 {
1476 return true;
1477 }
1478 }
1479 }
1480 return !(table || table_list);
1481 }
1482}
1483
1484/*
1485 returns:
1486 0: statement was replicated as TOI
1487 1: TOI replication was skipped
1488 -1: TOI replication failed
1489 */
1490static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_,
1491 const TABLE_LIST* table_list)
1492{
1493 wsrep_status_t ret(WSREP_WARNING);
1494 uchar* buf(0);
1495 size_t buf_len(0);
1496 int buf_err;
1497 int rc= 0;
1498
1499 if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
1500 {
1501 WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
1502 return 1;
1503 }
1504
1505 WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1506 thd->wsrep_exec_mode, thd->query() );
1507 switch (thd->lex->sql_command)
1508 {
1509 case SQLCOM_CREATE_VIEW:
1510 buf_err= create_view_query(thd, &buf, &buf_len);
1511 break;
1512 case SQLCOM_CREATE_PROCEDURE:
1513 case SQLCOM_CREATE_SPFUNCTION:
1514 buf_err= wsrep_create_sp(thd, &buf, &buf_len);
1515 break;
1516 case SQLCOM_CREATE_TRIGGER:
1517 buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len);
1518 break;
1519 case SQLCOM_CREATE_EVENT:
1520 buf_err= wsrep_create_event_query(thd, &buf, &buf_len);
1521 break;
1522 case SQLCOM_ALTER_EVENT:
1523 buf_err= wsrep_alter_event_query(thd, &buf, &buf_len);
1524 break;
1525 case SQLCOM_CREATE_ROLE:
1526 if (sp_process_definer(thd))
1527 {
1528 WSREP_WARN("Failed to set CREATE ROLE definer for TOI.");
1529 }
1530 /* fallthrough */
1531 default:
1532 buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
1533 &buf, &buf_len);
1534 break;
1535 }
1536
1537 wsrep_key_arr_t key_arr= {0, 0};
1538 struct wsrep_buf buff = { buf, buf_len };
1539 if (!buf_err &&
1540 !wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr) &&
1541 key_arr.keys_len > 0 &&
1542 WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
1543 key_arr.keys, key_arr.keys_len,
1544 &buff, 1,
1545 &thd->wsrep_trx_meta)))
1546 {
1547 thd->wsrep_exec_mode= TOTAL_ORDER;
1548 wsrep_to_isolation++;
1549 wsrep_keys_free(&key_arr);
1550 WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
1551 thd->wsrep_exec_mode);
1552 }
1553 else if (key_arr.keys_len > 0) {
1554 /* jump to error handler in mysql_execute_command() */
1555 WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. Check wsrep "
1556 "connection state and retry the query.",
1557 ret,
1558 thd->get_db(),
1559 (thd->query()) ? thd->query() : "void");
1560 my_message(ER_LOCK_DEADLOCK, "WSREP replication failed. Check "
1561 "your wsrep connection state and retry the query.", MYF(0));
1562 wsrep_keys_free(&key_arr);
1563 rc= -1;
1564 }
1565 else {
1566 /* non replicated DDL, affecting temporary tables only */
1567 WSREP_DEBUG("TO isolation skipped for: %d, sql: %s."
1568 "Only temporary tables affected.",
1569 ret, (thd->query()) ? thd->query() : "void");
1570 rc= 1;
1571 }
1572 if (buf) my_free(buf);
1573 return rc;
1574}
1575
1576static void wsrep_TOI_end(THD *thd) {
1577 wsrep_status_t ret;
1578 wsrep_to_isolation--;
1579
1580 WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1581 thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void");
1582
1583 wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid,
1584 thd->wsrep_trx_meta.gtid.seqno);
1585 WSREP_DEBUG("TO END: %lld, update seqno",
1586 (long long)wsrep_thd_trx_seqno(thd));
1587
1588 if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
1589 WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
1590 }
1591 else {
1592 WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s",
1593 ret,
1594 thd->get_db(),
1595 (thd->query()) ? thd->query() : "void");
1596 }
1597}
1598
1599static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_)
1600{
1601 wsrep_status_t ret(WSREP_WARNING);
1602 WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1603 thd->wsrep_exec_mode, thd->query() );
1604
1605 ret = wsrep->desync(wsrep);
1606 if (ret != WSREP_OK)
1607 {
1608 WSREP_WARN("RSU desync failed %d for schema: %s, query: %s",
1609 ret, thd->get_db(), thd->query());
1610 my_error(ER_LOCK_DEADLOCK, MYF(0));
1611 return(ret);
1612 }
1613
1614 mysql_mutex_lock(&LOCK_wsrep_replaying);
1615 wsrep_replaying++;
1616 mysql_mutex_unlock(&LOCK_wsrep_replaying);
1617
1618 if (wsrep_wait_committing_connections_close(5000))
1619 {
1620 /* no can do, bail out from DDL */
1621 WSREP_WARN("RSU failed due to pending transactions, schema: %s, query %s",
1622 thd->get_db(), thd->query());
1623 mysql_mutex_lock(&LOCK_wsrep_replaying);
1624 wsrep_replaying--;
1625 mysql_mutex_unlock(&LOCK_wsrep_replaying);
1626
1627 ret = wsrep->resync(wsrep);
1628 if (ret != WSREP_OK)
1629 {
1630 WSREP_WARN("resync failed %d for schema: %s, query: %s",
1631 ret, thd->get_db(), thd->query());
1632 }
1633
1634 my_error(ER_LOCK_DEADLOCK, MYF(0));
1635 return(1);
1636 }
1637
1638 wsrep_seqno_t seqno = wsrep->pause(wsrep);
1639 if (seqno == WSREP_SEQNO_UNDEFINED)
1640 {
1641 WSREP_WARN("pause failed %lld for schema: %s, query: %s", (long long)seqno,
1642 thd->get_db(), thd->query());
1643 return(1);
1644 }
1645 WSREP_DEBUG("paused at %lld", (long long)seqno);
1646 thd->variables.wsrep_on = 0;
1647 return 0;
1648}
1649
1650static void wsrep_RSU_end(THD *thd)
1651{
1652 wsrep_status_t ret(WSREP_WARNING);
1653 WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
1654 thd->wsrep_exec_mode, thd->query() );
1655
1656
1657 mysql_mutex_lock(&LOCK_wsrep_replaying);
1658 wsrep_replaying--;
1659 mysql_mutex_unlock(&LOCK_wsrep_replaying);
1660
1661 ret = wsrep->resume(wsrep);
1662 if (ret != WSREP_OK)
1663 {
1664 WSREP_WARN("resume failed %d for schema: %s, query: %s", ret,
1665 thd->get_db(), thd->query());
1666 }
1667
1668 ret = wsrep->resync(wsrep);
1669 if (ret != WSREP_OK)
1670 {
1671 WSREP_WARN("resync failed %d for schema: %s, query: %s", ret,
1672 thd->get_db(), thd->query());
1673 return;
1674 }
1675
1676 thd->variables.wsrep_on = 1;
1677}
1678
1679int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
1680 const TABLE_LIST* table_list)
1681{
1682 int ret= 0;
1683
1684 /*
1685 No isolation for applier or replaying threads.
1686 */
1687 if (thd->wsrep_exec_mode == REPL_RECV)
1688 return 0;
1689
1690 mysql_mutex_lock(&thd->LOCK_thd_data);
1691
1692 if (thd->wsrep_conflict_state == MUST_ABORT)
1693 {
1694 WSREP_INFO("thread: %lld schema: %s query: %s has been aborted due to multi-master conflict",
1695 (longlong) thd->thread_id, thd->get_db(), thd->query());
1696 mysql_mutex_unlock(&thd->LOCK_thd_data);
1697 return WSREP_TRX_FAIL;
1698 }
1699 mysql_mutex_unlock(&thd->LOCK_thd_data);
1700
1701 DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
1702 DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
1703
1704 if (thd->global_read_lock.can_acquire_protection())
1705 {
1706 WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %lld",
1707 thd->query(), (longlong) thd->thread_id);
1708 return -1;
1709 }
1710
1711 if (wsrep_debug && thd->mdl_context.has_locks())
1712 {
1713 WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lld",
1714 thd->query(), (longlong) thd->thread_id);
1715 }
1716
1717 /*
1718 It makes sense to set auto_increment_* to defaults in TOI operations.
1719 Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
1720 TOI statement and auto inc variables for wsrep replication is constructed
1721 there. Variables are reset back in THD::reset_for_next_command() before
1722 processing of next command.
1723 */
1724 if (wsrep_auto_increment_control)
1725 {
1726 thd->variables.auto_increment_offset = 1;
1727 thd->variables.auto_increment_increment = 1;
1728 }
1729
1730 if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
1731 {
1732 switch (thd->variables.wsrep_OSU_method) {
1733 case WSREP_OSU_TOI:
1734 ret = wsrep_TOI_begin(thd, db_, table_, table_list);
1735 break;
1736 case WSREP_OSU_RSU:
1737 ret = wsrep_RSU_begin(thd, db_, table_);
1738 break;
1739 default:
1740 WSREP_ERROR("Unsupported OSU method: %lu",
1741 thd->variables.wsrep_OSU_method);
1742 ret= -1;
1743 break;
1744 }
1745 switch (ret) {
1746 case 0: thd->wsrep_exec_mode= TOTAL_ORDER; break;
1747 case 1:
1748 /* TOI replication skipped, treat as success */
1749 ret = 0;
1750 break;
1751 case -1:
1752 /* TOI replication failed, treat as error */
1753 break;
1754 }
1755 }
1756 return ret;
1757}
1758
1759void wsrep_to_isolation_end(THD *thd)
1760{
1761 if (thd->wsrep_exec_mode == TOTAL_ORDER)
1762 {
1763 switch(thd->variables.wsrep_OSU_method)
1764 {
1765 case WSREP_OSU_TOI: wsrep_TOI_end(thd); break;
1766 case WSREP_OSU_RSU: wsrep_RSU_end(thd); break;
1767 default:
1768 WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu",
1769 thd->variables.wsrep_OSU_method);
1770 break;
1771 }
1772 wsrep_cleanup_transaction(thd);
1773 }
1774}
1775
1776#define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra) \
1777 WSREP_##severity( \
1778 "%s\n" \
1779 "schema: %.*s\n" \
1780 "request: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
1781 "granted: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
1782 msg, schema_len, schema, \
1783 (longlong) req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
1784 req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
1785 req->get_command(), req->lex->sql_command, req->query(), \
1786 (longlong) gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
1787 gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
1788 gra->get_command(), gra->lex->sql_command, gra->query());
1789
1790/**
1791 Check if request for the metadata lock should be granted to the requester.
1792
1793 @param requestor_ctx The MDL context of the requestor
1794 @param ticket MDL ticket for the requested lock
1795
1796 @retval TRUE Lock request can be granted
1797 @retval FALSE Lock request cannot be granted
1798*/
1799
1800bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
1801 MDL_ticket *ticket,
1802 const MDL_key *key)
1803{
1804 /* Fallback to the non-wsrep behaviour */
1805 if (!WSREP_ON) return FALSE;
1806
1807 THD *request_thd= requestor_ctx->get_thd();
1808 THD *granted_thd= ticket->get_ctx()->get_thd();
1809 bool ret= false;
1810
1811 const char* schema= key->db_name();
1812 int schema_len= key->db_name_length();
1813
1814 mysql_mutex_lock(&request_thd->LOCK_thd_data);
1815
1816 /*
1817 We consider granting MDL exceptions only for appliers (BF THD) and ones
1818 executing under TOI mode.
1819
1820 Rules:
1821 1. If granted/owner THD is also an applier (BF THD) or one executing
1822 under TOI mode, then we grant the requested lock to the requester
1823 THD.
1824 @return true
1825
1826 2. If granted/owner THD is executing a FLUSH command or already has an
1827 explicit lock, then do not grant the requested lock to the requester
1828 THD and it has to wait.
1829 @return false
1830
1831 3. In all other cases the granted/owner THD is aborted and the requested
1832 lock is not granted to the requester THD, thus it has to wait.
1833 @return false
1834 */
1835 if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
1836 request_thd->wsrep_exec_mode == REPL_RECV)
1837 {
1838 mysql_mutex_unlock(&request_thd->LOCK_thd_data);
1839 WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
1840 request_thd, granted_thd);
1841 ticket->wsrep_report(wsrep_debug);
1842
1843 mysql_mutex_lock(&granted_thd->LOCK_thd_data);
1844 if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
1845 granted_thd->wsrep_exec_mode == REPL_RECV)
1846 {
1847 WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
1848 request_thd, granted_thd);
1849 ticket->wsrep_report(true);
1850 mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
1851 ret= true;
1852 }
1853 else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
1854 granted_thd->mdl_context.has_explicit_locks())
1855 {
1856 WSREP_DEBUG("BF thread waiting for FLUSH");
1857 ticket->wsrep_report(wsrep_debug);
1858 mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
1859 ret= false;
1860 }
1861 else
1862 {
1863 /* Print some debug information. */
1864 if (wsrep_debug)
1865 {
1866 if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE ||
1867 request_thd->lex->sql_command == SQLCOM_DROP_SEQUENCE)
1868 {
1869 WSREP_DEBUG("DROP caused BF abort");
1870 }
1871 else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
1872 {
1873 WSREP_DEBUG("MDL granted, but committing thd abort scheduled");
1874 }
1875 else
1876 {
1877 WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
1878 request_thd, granted_thd);
1879 }
1880 ticket->wsrep_report(true);
1881 }
1882
1883 mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
1884 wsrep_abort_thd((void *) request_thd, (void *) granted_thd, 1);
1885 ret= false;
1886 }
1887 }
1888 else
1889 {
1890 mysql_mutex_unlock(&request_thd->LOCK_thd_data);
1891 }
1892
1893 return ret;
1894}
1895
1896
1897pthread_handler_t start_wsrep_THD(void *arg)
1898{
1899 THD *thd;
1900 wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
1901
1902 if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
1903 {
1904 goto error;
1905 }
1906
1907 mysql_mutex_lock(&LOCK_thread_count);
1908
1909 if (wsrep_gtid_mode)
1910 {
1911 /* Adjust domain_id. */
1912 thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
1913 }
1914
1915 thd->real_id=pthread_self(); // Keep purify happy
1916 thread_created++;
1917 threads.append(thd);
1918
1919 my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));
1920
1921 DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
1922 thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
1923 (void) mysql_mutex_unlock(&LOCK_thread_count);
1924
1925 /* from bootstrap()... */
1926 thd->bootstrap=1;
1927 thd->max_client_packet_length= thd->net.max_packet;
1928 thd->security_ctx->master_access= ~(ulong)0;
1929
1930 /* from handle_one_connection... */
1931 pthread_detach_this_thread();
1932
1933 mysql_thread_set_psi_id(thd->thread_id);
1934 thd->thr_create_utime= microsecond_interval_timer();
1935 if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
1936 {
1937 close_connection(thd, ER_OUT_OF_RESOURCES);
1938 statistic_increment(aborted_connects,&LOCK_status);
1939 MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
1940 goto error;
1941 }
1942
1943// </5.1.17>
1944 /*
1945 handle_one_connection() is normally the only way a thread would
1946 start and would always be on the very high end of the stack ,
1947 therefore, the thread stack always starts at the address of the
1948 first local variable of handle_one_connection, which is thd. We
1949 need to know the start of the stack so that we could check for
1950 stack overruns.
1951 */
1952 DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n",
1953 (long long)thd->thread_id));
1954 /* now that we've called my_thread_init(), it is safe to call DBUG_* */
1955
1956 thd->thread_stack= (char*) &thd;
1957 if (thd->store_globals())
1958 {
1959 close_connection(thd, ER_OUT_OF_RESOURCES);
1960 statistic_increment(aborted_connects,&LOCK_status);
1961 MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
1962 goto error;
1963 }
1964
1965 thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
1966 thd->security_ctx->skip_grants();
1967
1968 /* handle_one_connection() again... */
1969 //thd->version= refresh_version;
1970 thd->proc_info= 0;
1971 thd->set_command(COM_SLEEP);
1972 thd->init_for_queries();
1973
1974 mysql_mutex_lock(&LOCK_thread_count);
1975 wsrep_running_threads++;
1976 mysql_cond_broadcast(&COND_thread_count);
1977 mysql_mutex_unlock(&LOCK_thread_count);
1978
1979 processor(thd);
1980
1981 close_connection(thd, 0);
1982
1983 mysql_mutex_lock(&LOCK_thread_count);
1984 wsrep_running_threads--;
1985 WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
1986 mysql_cond_broadcast(&COND_thread_count);
1987 mysql_mutex_unlock(&LOCK_thread_count);
1988
1989 // Note: We can't call THD destructor without crashing
1990 // if plugins have not been initialized. However, in most of the
1991 // cases this means that pre SE initialization SST failed and
1992 // we are going to exit anyway.
1993 if (plugins_are_initialized)
1994 {
1995 net_end(&thd->net);
1996 MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
1997 }
1998 else
1999 {
2000 // TODO: lightweight cleanup to get rid of:
2001 // 'Error in my_thread_global_end(): 2 threads didn't exit'
2002 // at server shutdown
2003 }
2004
2005 unlink_not_visible_thd(thd);
2006 delete thd;
2007 my_thread_end();
2008 return(NULL);
2009
2010error:
2011 WSREP_ERROR("Failed to create/initialize system thread");
2012
2013 /* Abort if its the first applier/rollbacker thread. */
2014 if (!mysqld_server_initialized)
2015 unireg_abort(1);
2016 else
2017 return NULL;
2018}
2019
2020
2021/**/
2022static bool abort_replicated(THD *thd)
2023{
2024 bool ret_code= false;
2025 if (thd->wsrep_query_state== QUERY_COMMITTING)
2026 {
2027 WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id));
2028
2029 (void)wsrep_abort_thd(thd, thd, TRUE);
2030 ret_code= true;
2031 }
2032 return ret_code;
2033}
2034
2035
2036/**/
2037static inline bool is_client_connection(THD *thd)
2038{
2039 return (thd->wsrep_client_thread && thd->variables.wsrep_on);
2040}
2041
2042
2043static inline bool is_replaying_connection(THD *thd)
2044{
2045 bool ret;
2046
2047 mysql_mutex_lock(&thd->LOCK_thd_data);
2048 ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false;
2049 mysql_mutex_unlock(&thd->LOCK_thd_data);
2050
2051 return ret;
2052}
2053
2054
2055static inline bool is_committing_connection(THD *thd)
2056{
2057 bool ret;
2058
2059 mysql_mutex_lock(&thd->LOCK_thd_data);
2060 ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
2061 mysql_mutex_unlock(&thd->LOCK_thd_data);
2062
2063 return ret;
2064}
2065
2066
2067static bool have_client_connections()
2068{
2069 THD *tmp;
2070
2071 I_List_iterator<THD> it(threads);
2072 while ((tmp=it++))
2073 {
2074 DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2075 (longlong) tmp->thread_id));
2076 if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
2077 {
2078 (void)abort_replicated(tmp);
2079 return true;
2080 }
2081 }
2082 return false;
2083}
2084
2085static void wsrep_close_thread(THD *thd)
2086{
2087 thd->set_killed(KILL_CONNECTION);
2088 MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
2089 if (thd->mysys_var)
2090 {
2091 thd->mysys_var->abort=1;
2092 mysql_mutex_lock(&thd->mysys_var->mutex);
2093 if (thd->mysys_var->current_cond)
2094 {
2095 mysql_mutex_lock(thd->mysys_var->current_mutex);
2096 mysql_cond_broadcast(thd->mysys_var->current_cond);
2097 mysql_mutex_unlock(thd->mysys_var->current_mutex);
2098 }
2099 mysql_mutex_unlock(&thd->mysys_var->mutex);
2100 }
2101}
2102
2103
2104static my_bool have_committing_connections()
2105{
2106 THD *tmp;
2107 mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2108
2109 I_List_iterator<THD> it(threads);
2110 while ((tmp=it++))
2111 {
2112 if (!is_client_connection(tmp))
2113 continue;
2114
2115 if (is_committing_connection(tmp))
2116 {
2117 return TRUE;
2118 }
2119 }
2120 mysql_mutex_unlock(&LOCK_thread_count);
2121 return FALSE;
2122}
2123
2124
2125int wsrep_wait_committing_connections_close(int wait_time)
2126{
2127 int sleep_time= 100;
2128
2129 while (have_committing_connections() && wait_time > 0)
2130 {
2131 WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
2132 my_sleep(sleep_time);
2133 wait_time -= sleep_time;
2134 }
2135 if (have_committing_connections())
2136 {
2137 return 1;
2138 }
2139 return 0;
2140}
2141
2142
2143void wsrep_close_client_connections(my_bool wait_to_end)
2144{
2145 /*
2146 First signal all threads that it's time to die
2147 */
2148
2149 THD *tmp;
2150 mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2151
2152 bool kill_cached_threads_saved= kill_cached_threads;
2153 kill_cached_threads= true; // prevent future threads caching
2154 mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
2155
2156 I_List_iterator<THD> it(threads);
2157 while ((tmp=it++))
2158 {
2159 DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2160 (longlong) tmp->thread_id));
2161 /* We skip slave threads & scheduler on this first loop through. */
2162 if (!is_client_connection(tmp))
2163 continue;
2164
2165 if (is_replaying_connection(tmp))
2166 {
2167 tmp->set_killed(KILL_CONNECTION);
2168 continue;
2169 }
2170
2171 /* replicated transactions must be skipped */
2172 if (abort_replicated(tmp))
2173 continue;
2174
2175 WSREP_DEBUG("closing connection %lld", (longlong) tmp->thread_id);
2176 wsrep_close_thread(tmp);
2177 }
2178 mysql_mutex_unlock(&LOCK_thread_count);
2179
2180 if (thread_count)
2181 sleep(2); // Give threads time to die
2182
2183 mysql_mutex_lock(&LOCK_thread_count);
2184 /*
2185 Force remaining threads to die by closing the connection to the client
2186 */
2187
2188 I_List_iterator<THD> it2(threads);
2189 while ((tmp=it2++))
2190 {
2191#ifndef __bsdi__ // Bug in BSDI kernel
2192 if (is_client_connection(tmp) &&
2193 !abort_replicated(tmp) &&
2194 !is_replaying_connection(tmp))
2195 {
2196 WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id);
2197 close_connection(tmp,0);
2198 }
2199#endif
2200 }
2201
2202 DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
2203 WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
2204
2205 while (wait_to_end && have_client_connections())
2206 {
2207 mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
2208 DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
2209 }
2210
2211 kill_cached_threads= kill_cached_threads_saved;
2212
2213 mysql_mutex_unlock(&LOCK_thread_count);
2214
2215 /* All client connection threads have now been aborted */
2216}
2217
2218
2219void wsrep_close_applier(THD *thd)
2220{
2221 WSREP_DEBUG("closing applier %lld", (longlong) thd->thread_id);
2222 wsrep_close_thread(thd);
2223}
2224
2225
2226void wsrep_close_threads(THD *thd)
2227{
2228 THD *tmp;
2229 mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
2230
2231 I_List_iterator<THD> it(threads);
2232 while ((tmp=it++))
2233 {
2234 DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
2235 (longlong) tmp->thread_id));
2236 /* We skip slave threads & scheduler on this first loop through. */
2237 if (tmp->wsrep_applier && tmp != thd)
2238 {
2239 WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
2240 wsrep_close_thread (tmp);
2241 }
2242 }
2243
2244 mysql_mutex_unlock(&LOCK_thread_count);
2245}
2246
2247void wsrep_wait_appliers_close(THD *thd)
2248{
2249 /* Wait for wsrep appliers to gracefully exit */
2250 mysql_mutex_lock(&LOCK_thread_count);
2251 while (wsrep_running_threads > 1)
2252 // 1 is for rollbacker thread which needs to be killed explicitly.
2253 // This gotta be fixed in a more elegant manner if we gonna have arbitrary
2254 // number of non-applier wsrep threads.
2255 {
2256 if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
2257 {
2258 mysql_mutex_unlock(&LOCK_thread_count);
2259 my_sleep(100);
2260 mysql_mutex_lock(&LOCK_thread_count);
2261 }
2262 else
2263 mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
2264 DBUG_PRINT("quit",("One applier died (count=%u)",thread_count));
2265 }
2266 mysql_mutex_unlock(&LOCK_thread_count);
2267 /* Now kill remaining wsrep threads: rollbacker */
2268 wsrep_close_threads (thd);
2269 /* and wait for them to die */
2270 mysql_mutex_lock(&LOCK_thread_count);
2271 while (wsrep_running_threads > 0)
2272 {
2273 if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
2274 {
2275 mysql_mutex_unlock(&LOCK_thread_count);
2276 my_sleep(100);
2277 mysql_mutex_lock(&LOCK_thread_count);
2278 }
2279 else
2280 mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
2281 DBUG_PRINT("quit",("One thread died (count=%u)",thread_count));
2282 }
2283 mysql_mutex_unlock(&LOCK_thread_count);
2284
2285 /* All wsrep applier threads have now been aborted. However, if this thread
2286 is also applier, we are still running...
2287 */
2288}
2289
2290
2291void wsrep_kill_mysql(THD *thd)
2292{
2293 if (mysqld_server_started)
2294 {
2295 if (!shutdown_in_progress)
2296 {
2297 WSREP_INFO("starting shutdown");
2298 kill_mysql();
2299 }
2300 }
2301 else
2302 {
2303 unireg_abort(1);
2304 }
2305}
2306
2307
2308static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
2309{
2310 String log_query;
2311 sp_head *sp = thd->lex->sphead;
2312 sql_mode_t saved_mode= thd->variables.sql_mode;
2313 String retstr(64);
2314 LEX_CSTRING returns= empty_clex_str;
2315 retstr.set_charset(system_charset_info);
2316
2317 log_query.set_charset(system_charset_info);
2318
2319 if (sp->m_handler->type() == TYPE_ENUM_FUNCTION)
2320 {
2321 sp_returns_type(thd, retstr, sp);
2322 returns= retstr.lex_cstring();
2323 }
2324 if (sp->m_handler->
2325 show_create_sp(thd, &log_query,
2326 sp->m_explicit_name ? sp->m_db : null_clex_str,
2327 sp->m_name, sp->m_params, returns,
2328 sp->m_body, sp->chistics(),
2329 thd->lex->definer[0],
2330 thd->lex->create_info,
2331 saved_mode))
2332 {
2333 WSREP_WARN("SP create string failed: schema: %s, query: %s",
2334 thd->get_db(), thd->query());
2335 return 1;
2336 }
2337
2338 return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
2339}
2340
2341
2342extern int wsrep_on(THD *thd)
2343{
2344 return (int)(WSREP(thd));
2345}
2346
2347
2348extern "C" bool wsrep_thd_is_wsrep_on(THD *thd)
2349{
2350 return thd->variables.wsrep_on;
2351}
2352
2353
2354bool wsrep_consistency_check(THD *thd)
2355{
2356 return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
2357}
2358
2359
2360extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode)
2361{
2362 thd->wsrep_exec_mode= mode;
2363}
2364
2365
2366extern "C" void wsrep_thd_set_query_state(
2367 THD *thd, enum wsrep_query_state state)
2368{
2369 thd->wsrep_query_state= state;
2370}
2371
2372
2373void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state)
2374{
2375 if (WSREP(thd)) thd->wsrep_conflict_state= state;
2376}
2377
2378
2379enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd)
2380{
2381 return thd->wsrep_exec_mode;
2382}
2383
2384
2385const char *wsrep_thd_exec_mode_str(THD *thd)
2386{
2387 return
2388 (!thd) ? "void" :
2389 (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" :
2390 (thd->wsrep_exec_mode == REPL_RECV) ? "applier" :
2391 (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" :
2392 (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void";
2393}
2394
2395
2396enum wsrep_query_state wsrep_thd_query_state(THD *thd)
2397{
2398 return thd->wsrep_query_state;
2399}
2400
2401
2402const char *wsrep_thd_query_state_str(THD *thd)
2403{
2404 return
2405 (!thd) ? "void" :
2406 (thd->wsrep_query_state == QUERY_IDLE) ? "idle" :
2407 (thd->wsrep_query_state == QUERY_EXEC) ? "executing" :
2408 (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" :
2409 (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" :
2410 (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void";
2411}
2412
2413
2414enum wsrep_conflict_state wsrep_thd_get_conflict_state(THD *thd)
2415{
2416 return thd->wsrep_conflict_state;
2417}
2418
2419
2420const char *wsrep_thd_conflict_state_str(THD *thd)
2421{
2422 return
2423 (!thd) ? "void" :
2424 (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" :
2425 (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" :
2426 (thd->wsrep_conflict_state == ABORTING) ? "aborting" :
2427 (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" :
2428 (thd->wsrep_conflict_state == REPLAYING) ? "replaying" :
2429 (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" :
2430 (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void";
2431}
2432
2433
2434wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
2435{
2436 return &thd->wsrep_ws_handle;
2437}
2438
2439
2440void wsrep_thd_LOCK(THD *thd)
2441{
2442 mysql_mutex_lock(&thd->LOCK_thd_data);
2443}
2444
2445
2446void wsrep_thd_UNLOCK(THD *thd)
2447{
2448 mysql_mutex_unlock(&thd->LOCK_thd_data);
2449}
2450
2451
2452extern "C" time_t wsrep_thd_query_start(THD *thd)
2453{
2454 return thd->query_start();
2455}
2456
2457
2458extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd)
2459{
2460 return thd->wsrep_rand;
2461}
2462
2463longlong wsrep_thd_trx_seqno(THD *thd)
2464{
2465 return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED;
2466}
2467
2468
2469extern "C" query_id_t wsrep_thd_query_id(THD *thd)
2470{
2471 return thd->query_id;
2472}
2473
2474
2475char *wsrep_thd_query(THD *thd)
2476{
2477 return (thd) ? thd->query() : NULL;
2478}
2479
2480
2481extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd)
2482{
2483 return thd->wsrep_last_query_id;
2484}
2485
2486
2487extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
2488{
2489 thd->wsrep_last_query_id= id;
2490}
2491
2492
2493extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
2494{
2495 if (signal)
2496 {
2497 thd->awake(KILL_QUERY);
2498 }
2499 else
2500 {
2501 mysql_mutex_lock(&LOCK_wsrep_replaying);
2502 mysql_cond_broadcast(&COND_wsrep_replaying);
2503 mysql_mutex_unlock(&LOCK_wsrep_replaying);
2504 }
2505}
2506
2507
2508int wsrep_thd_retry_counter(THD *thd)
2509{
2510 return(thd->wsrep_retry_counter);
2511}
2512
2513
2514extern "C" bool wsrep_thd_ignore_table(THD *thd)
2515{
2516 return thd->wsrep_ignore_table;
2517}
2518
2519
2520extern int
2521wsrep_trx_order_before(THD *thd1, THD *thd2)
2522{
2523 if (wsrep_thd_trx_seqno(thd1) < wsrep_thd_trx_seqno(thd2)) {
2524 WSREP_DEBUG("BF conflict, order: %lld %lld\n",
2525 (long long)wsrep_thd_trx_seqno(thd1),
2526 (long long)wsrep_thd_trx_seqno(thd2));
2527 return 1;
2528 }
2529 WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
2530 (long long)wsrep_thd_trx_seqno(thd1),
2531 (long long)wsrep_thd_trx_seqno(thd2));
2532 return 0;
2533}
2534
2535
2536int wsrep_trx_is_aborting(THD *thd_ptr)
2537{
2538 if (thd_ptr) {
2539 if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) ||
2540 (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) {
2541 return 1;
2542 }
2543 }
2544 return 0;
2545}
2546
2547
2548void wsrep_copy_query(THD *thd)
2549{
2550 thd->wsrep_retry_command = thd->get_command();
2551 thd->wsrep_retry_query_len = thd->query_length();
2552 if (thd->wsrep_retry_query) {
2553 my_free(thd->wsrep_retry_query);
2554 }
2555 thd->wsrep_retry_query = (char *)my_malloc(
2556 thd->wsrep_retry_query_len + 1, MYF(0));
2557 strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len);
2558 thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0';
2559}
2560
2561
2562bool wsrep_is_show_query(enum enum_sql_command command)
2563{
2564 DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
2565 return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
2566}
2567
2568bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
2569 TABLE_LIST* src_table,
2570 HA_CREATE_INFO *create_info)
2571{
2572 if (create_info->tmp_table())
2573 {
2574 /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */
2575 WSREP_DEBUG("CREATE TEMPORARY TABLE LIKE... skipped replication\n %s",
2576 thd->query());
2577 }
2578 else if (!(thd->find_temporary_table(src_table)))
2579 {
2580 /* this is straight CREATE TABLE LIKE... with no tmp tables */
2581 WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2582 }
2583 else
2584 {
2585 /* here we have CREATE TABLE LIKE <temporary table>
2586 the temporary table definition will be needed in slaves to
2587 enable the create to succeed
2588 */
2589 TABLE_LIST tbl;
2590 bzero((void*) &tbl, sizeof(tbl));
2591 tbl.db= src_table->db;
2592 tbl.table_name= tbl.alias= src_table->table_name;
2593 tbl.table= src_table->table;
2594 char buf[2048];
2595 String query(buf, sizeof(buf), system_charset_info);
2596 query.length(0); // Have to zero it since constructor doesn't
2597
2598 (void) show_create_table(thd, &tbl, &query, NULL, WITH_DB_NAME);
2599 WSREP_DEBUG("TMP TABLE: %s", query.ptr());
2600
2601 thd->wsrep_TOI_pre_query= query.ptr();
2602 thd->wsrep_TOI_pre_query_len= query.length();
2603
2604 WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
2605
2606 thd->wsrep_TOI_pre_query= NULL;
2607 thd->wsrep_TOI_pre_query_len= 0;
2608 }
2609
2610 return(false);
2611
2612error:
2613 thd->wsrep_TOI_pre_query= NULL;
2614 return (true);
2615}
2616
2617
2618static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
2619{
2620 LEX *lex= thd->lex;
2621 String stmt_query;
2622
2623 LEX_CSTRING definer_user;
2624 LEX_CSTRING definer_host;
2625
2626 if (!lex->definer)
2627 {
2628 if (!thd->slave_thread)
2629 {
2630 if (!(lex->definer= create_default_definer(thd, false)))
2631 return 1;
2632 }
2633 }
2634
2635 if (lex->definer)
2636 {
2637 /* SUID trigger. */
2638 LEX_USER *d= get_current_user(thd, lex->definer);
2639
2640 if (!d)
2641 return 1;
2642
2643 definer_user= d->user;
2644 definer_host= d->host;
2645 }
2646 else
2647 {
2648 /* non-SUID trigger. */
2649
2650 definer_user.str= 0;
2651 definer_user.length= 0;
2652
2653 definer_host.str= 0;
2654 definer_host.length= 0;
2655 }
2656
2657 stmt_query.append(STRING_WITH_LEN("CREATE "));
2658
2659 append_definer(thd, &stmt_query, &definer_user, &definer_host);
2660
2661 LEX_CSTRING stmt_definition;
2662 stmt_definition.str= (char*) thd->lex->stmt_definition_begin;
2663 stmt_definition.length= thd->lex->stmt_definition_end
2664 - thd->lex->stmt_definition_begin;
2665 trim_whitespace(thd->charset(), &stmt_definition);
2666
2667 stmt_query.append(stmt_definition.str, stmt_definition.length);
2668
2669 return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
2670 buf, buf_len);
2671}
2672
2673/***** callbacks for wsrep service ************/
2674
2675my_bool get_wsrep_debug()
2676{
2677 return wsrep_debug;
2678}
2679
2680my_bool get_wsrep_load_data_splitting()
2681{
2682 return wsrep_load_data_splitting;
2683}
2684
2685long get_wsrep_protocol_version()
2686{
2687 return wsrep_protocol_version;
2688}
2689
2690my_bool get_wsrep_drupal_282555_workaround()
2691{
2692 return wsrep_drupal_282555_workaround;
2693}
2694
2695my_bool get_wsrep_recovery()
2696{
2697 return wsrep_recovery;
2698}
2699
2700my_bool get_wsrep_log_conflicts()
2701{
2702 return wsrep_log_conflicts;
2703}
2704
2705wsrep_t *get_wsrep()
2706{
2707 return wsrep;
2708}
2709
2710my_bool get_wsrep_certify_nonPK()
2711{
2712 return wsrep_certify_nonPK;
2713}
2714
2715void wsrep_lock_rollback()
2716{
2717 mysql_mutex_lock(&LOCK_wsrep_rollback);
2718}
2719
2720void wsrep_unlock_rollback()
2721{
2722 mysql_cond_signal(&COND_wsrep_rollback);
2723 mysql_mutex_unlock(&LOCK_wsrep_rollback);
2724}
2725
2726my_bool wsrep_aborting_thd_contains(THD *thd)
2727{
2728 mysql_mutex_assert_owner(&LOCK_wsrep_rollback);
2729 wsrep_aborting_thd_t abortees = wsrep_aborting_thd;
2730 while (abortees)
2731 {
2732 if (abortees->aborting_thd == thd)
2733 return true;
2734 abortees = abortees->next;
2735 }
2736 return false;
2737}
2738
2739void wsrep_aborting_thd_enqueue(THD *thd)
2740{
2741 mysql_mutex_assert_owner(&LOCK_wsrep_rollback);
2742 wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
2743 my_malloc(sizeof(struct wsrep_aborting_thd), MYF(0));
2744 aborting->aborting_thd = thd;
2745 aborting->next = wsrep_aborting_thd;
2746 wsrep_aborting_thd = aborting;
2747}
2748
2749bool wsrep_node_is_donor()
2750{
2751 return (WSREP_ON) ? (wsrep_config_state->get_status() == 2) : false;
2752}
2753
2754bool wsrep_node_is_synced()
2755{
2756 return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
2757}
2758