1 | /* Copyright 2008-2015 Codership Oy <http://www.codership.com> |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
   the Free Software Foundation; version 2 of the License.
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License |
13 | along with this program; if not, write to the Free Software |
14 | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ |
15 | |
16 | #include "mariadb.h" |
17 | #include <mysqld.h> |
18 | #include <sql_class.h> |
19 | #include <sql_parse.h> |
20 | #include <sql_base.h> /* find_temporary_table() */ |
21 | #include "slave.h" |
22 | #include "rpl_mi.h" |
23 | #include "sql_repl.h" |
24 | #include "rpl_filter.h" |
25 | #include "sql_callback.h" |
26 | #include "sp_head.h" |
27 | #include "sql_show.h" |
28 | #include "sp.h" |
29 | #include "wsrep_priv.h" |
30 | #include "wsrep_thd.h" |
31 | #include "wsrep_sst.h" |
32 | #include "wsrep_utils.h" |
33 | #include "wsrep_var.h" |
34 | #include "wsrep_binlog.h" |
35 | #include "wsrep_applier.h" |
36 | #include "wsrep_xid.h" |
37 | #include <cstdio> |
38 | #include <cstdlib> |
39 | #include "log_event.h" |
40 | #include <slave.h> |
41 | #include "sql_plugin.h" /* wsrep_plugins_pre_init() */ |
42 | |
/* Handle of the loaded wsrep provider. Filled in by wsrep_load() during
   wsrep_init() and reset to 0 by wsrep_deinit(). */
wsrep_t *wsrep = NULL;
/*
  wsrep_emulate_bin_log is a flag to tell that binlog has not been configured.
  wsrep needs to get binlog events from transaction cache even when binlog is
  not enabled, wsrep_emulate_bin_log opens needed code paths to make this
  possible
*/
my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
#ifdef GTID_SUPPORT
/* Sidno in global_sid_map corresponding to group uuid */
rpl_sidno wsrep_sidno= -1;
#endif /* GTID_SUPPORT */
/* NOTE(review): not referenced in the visible part of this file; presumably
   toggles pre-ordered replication - confirm against its users. */
my_bool wsrep_preordered_opt= FALSE;
56 | |
/*
 * Begin configuration options
 */

/* Server globals that are only declared here; their definitions live
   outside this file. */
extern my_bool plugins_are_initialized;
extern uint kill_cached_threads;
extern mysql_cond_t COND_thread_cache;

/* System variables. */
const char *wsrep_provider;              // Provider handed to wsrep_load()
const char *wsrep_provider_options;      // Option string handed to provider init
const char *wsrep_cluster_address;       // Empty/NULL skips replication start
const char *wsrep_cluster_name;
const char *wsrep_node_name;
const char *wsrep_node_address;          // Guessed in wsrep_init() when empty
const char *wsrep_node_incoming_address; // Derived in wsrep_init() when AUTO/unset
const char *wsrep_start_position;
const char *wsrep_data_home_dir;         // Passed to the provider as data_dir
const char *wsrep_dbug_option;
const char *wsrep_notify_cmd;
const char *wsrep_sst_method;
const char *wsrep_sst_receive_address;
const char *wsrep_sst_donor;
const char *wsrep_sst_auth;
my_bool wsrep_debug;                    // Enable debug level logging
my_bool wsrep_convert_LOCK_to_trx;      // Convert locking sessions to trx
my_bool wsrep_auto_increment_control;   // Control auto increment variables
my_bool wsrep_drupal_282555_workaround; // Retry autoinc insert after dupkey
my_bool wsrep_certify_nonPK;            // Certify, even when no primary key
my_bool wsrep_recovery;                 // Recovery
my_bool wsrep_replicate_myisam;         // Enable MyISAM replication
my_bool wsrep_log_conflicts;
my_bool wsrep_load_data_splitting;      // Commit load data every 10K intervals
my_bool wsrep_slave_UK_checks;          // Slave thread does UK checks
my_bool wsrep_slave_FK_checks;          // Slave thread does FK checks
my_bool wsrep_sst_donor_rejects_queries;
my_bool wsrep_restart_slave;            // Should mysql slave thread be
                                        // restarted, when node joins back?
my_bool wsrep_desync;                   // De(re)synchronize the node from the
                                        // cluster
long wsrep_slave_threads;               // No. of slave appliers threads
ulong wsrep_retry_autocommit;           // Retry aborted autocommit trx
ulong wsrep_max_ws_size;                // Max allowed ws (RBR buffer) size
ulong wsrep_max_ws_rows;                // Max number of rows in ws
ulong wsrep_forced_binlog_format;
ulong wsrep_mysql_replication_bundle;
bool wsrep_gtid_mode;                   // Use wsrep_gtid_domain_id
                                        // for galera transactions?
uint32 wsrep_gtid_domain_id;            // gtid_domain_id for galera
                                        // transactions

/* Other configuration variables and their default values. */
my_bool wsrep_incremental_data_collection= 0; // Incremental data collection
my_bool wsrep_restart_slave_activated= 0;     // Node has dropped, and slave
                                              // restart will be needed
bool wsrep_new_cluster= false;                // Bootstrap the cluster?
int wsrep_slave_count_change= 0;              // No. of appliers to stop/start
int wsrep_to_isolation= 0;                    // No. of active TO isolation threads
long wsrep_max_protocol_version= 3;           // Maximum protocol version to use

/*
 * End configuration options
 */
120 | |
/*
 * Other wsrep global variables.
 */

/* Mutex/condition pairs. All of them are initialized in wsrep_thr_init()
   and destroyed in wsrep_thr_deinit(). */
mysql_mutex_t LOCK_wsrep_ready;       // Guards wsrep_ready
mysql_cond_t COND_wsrep_ready;        // Signalled when wsrep_ready changes
mysql_mutex_t LOCK_wsrep_sst;
mysql_cond_t COND_wsrep_sst;
mysql_mutex_t LOCK_wsrep_sst_init;
mysql_cond_t COND_wsrep_sst_init;
mysql_mutex_t LOCK_wsrep_rollback;
mysql_cond_t COND_wsrep_rollback;
wsrep_aborting_thd_t wsrep_aborting_thd= NULL;
mysql_mutex_t LOCK_wsrep_replaying;
mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;

int wsrep_replaying= 0;
ulong wsrep_running_threads = 0; // # of currently running wsrep threads
ulong my_bind_addr;
143 | |
#ifdef HAVE_PSI_INTERFACE
/* Instrumentation keys for the wsrep mutexes; they are bound to instrument
   names through the wsrep_mutexes[] table below. */
PSI_mutex_key key_LOCK_wsrep_rollback,
  key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
  key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
  key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
  key_LOCK_wsrep_config_state;

/* Instrumentation keys for the wsrep condition variables (see wsrep_conds[]). */
PSI_cond_key key_COND_wsrep_rollback,
  key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
  key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;

/* Instrumentation key for the file instrument listed in wsrep_files[]. */
PSI_file_key key_file_wsrep_gra_log;
156 | |
157 | static PSI_mutex_info wsrep_mutexes[]= |
158 | { |
159 | { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready" , PSI_FLAG_GLOBAL}, |
160 | { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst" , PSI_FLAG_GLOBAL}, |
161 | { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread" , 0}, |
162 | { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init" , PSI_FLAG_GLOBAL}, |
163 | { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst" , PSI_FLAG_GLOBAL}, |
164 | { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback" , PSI_FLAG_GLOBAL}, |
165 | { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying" , PSI_FLAG_GLOBAL}, |
166 | { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads" , PSI_FLAG_GLOBAL}, |
167 | { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync" , PSI_FLAG_GLOBAL}, |
168 | { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state" , PSI_FLAG_GLOBAL} |
169 | }; |
170 | |
/* Instrumentation descriptors for the wsrep condition variables: key,
   instrument name and flags. Registered by wsrep_thr_init(). */
static PSI_cond_info wsrep_conds[]=
{
  { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
  { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
  { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
};

/* Instrumentation descriptor for the single wsrep file instrument.
   Registered by wsrep_thr_init(). */
static PSI_file_info wsrep_files[]=
{
  { &key_file_wsrep_gra_log, "wsrep_gra_log", 0}
};
#endif
186 | |
my_bool wsrep_inited = 0; // initialized ?

/* UUID of the cluster as reported by the last view, plus its printable
   form; both are refreshed by the view handler callback. */
static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
static char         cluster_uuid_str[40]= { 0, };
/* Printable names of the view statuses, indexed by the view status value. */
static const char*  cluster_status_str[WSREP_VIEW_MAX] =
{
  "Primary",
  "non-Primary",
  "Disconnected"
};

/* Copies of the provider identification strings; filled in by wsrep_init()
   and emptied by wsrep_deinit(). */
static char provider_name[256]= { 0, };
static char provider_version[256]= { 0, };
static char provider_vendor[256]= { 0, };

/*
 * wsrep status variables
 */
my_bool     wsrep_connected          = FALSE;
my_bool     wsrep_ready              = FALSE; // node can accept queries
const char* wsrep_cluster_state_uuid = cluster_uuid_str;
long long   wsrep_cluster_conf_id    = WSREP_SEQNO_UNDEFINED;
const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
long        wsrep_cluster_size       = 0;
long        wsrep_local_index        = -1;
long long   wsrep_local_bf_aborts    = 0;
const char* wsrep_provider_name      = provider_name;
const char* wsrep_provider_version   = provider_version;
const char* wsrep_provider_vendor    = provider_vendor;
/* End wsrep status variables */

/* Local replication position (uuid:seqno). It is handed to the provider as
   the initial state_id in wsrep_init(). */
wsrep_uuid_t  local_uuid   = WSREP_UUID_UNDEFINED;
wsrep_seqno_t local_seqno  = WSREP_SEQNO_UNDEFINED;
/* Protocol version currently in use; updated by the view handler callback
   when a view reports a different version. */
long          wsrep_protocol_version = 3;

/* Allocated in wsrep_thr_init(), released in wsrep_thr_deinit(). */
wsp::Config_state *wsrep_config_state;

// Boolean denoting if server is in initial startup phase. This is needed
// to make sure that main thread waiting in wsrep_sst_wait() is signaled
// if there was no state gap on receiving first view event.
static my_bool   wsrep_startup = TRUE;
228 | |
229 | |
230 | static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { |
231 | switch (level) { |
232 | case WSREP_LOG_INFO: |
233 | sql_print_information("WSREP: %s" , msg); |
234 | break; |
235 | case WSREP_LOG_WARN: |
236 | sql_print_warning("WSREP: %s" , msg); |
237 | break; |
238 | case WSREP_LOG_ERROR: |
239 | case WSREP_LOG_FATAL: |
240 | sql_print_error("WSREP: %s" , msg); |
241 | break; |
242 | case WSREP_LOG_DEBUG: |
243 | if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s" , msg); |
244 | default: |
245 | break; |
246 | } |
247 | } |
248 | |
/*
  Format a printf-style message and pass it to the given logging function
  with a "WSREP: " prefix.

  The text is rendered into a fixed 1024-byte stack buffer, so longer
  messages are truncated.

  @param fun     logging function that receives the "WSREP: %s" format
                 and the rendered text
  @param format  printf-style format string for the message
*/
void wsrep_log(void (*fun)(const char *, ...), const char *format, ...)
{
  char text[1024];
  va_list ap;

  va_start(ap, format);
  vsnprintf(text, sizeof(text) - 1, format, ap);
  va_end(ap);

  fun("WSREP: %s", text);
}
258 | |
259 | |
260 | static void wsrep_log_states (wsrep_log_level_t const level, |
261 | const wsrep_uuid_t* const group_uuid, |
262 | wsrep_seqno_t const group_seqno, |
263 | const wsrep_uuid_t* const node_uuid, |
264 | wsrep_seqno_t const node_seqno) |
265 | { |
266 | char uuid_str[37]; |
267 | char msg[256]; |
268 | |
269 | wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str)); |
270 | snprintf (msg, 255, "WSREP: Group state: %s:%lld" , |
271 | uuid_str, (long long)group_seqno); |
272 | wsrep_log_cb (level, msg); |
273 | |
274 | wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str)); |
275 | snprintf (msg, 255, "WSREP: Local state: %s:%lld" , |
276 | uuid_str, (long long)node_seqno); |
277 | wsrep_log_cb (level, msg); |
278 | } |
279 | |
#ifdef GTID_SUPPORT
/*
  Add a sid built from the bitwise inverse of the given uuid to
  global_sid_map and remember the resulting sidno in wsrep_sidno.
  The map is modified under the global_sid_lock write lock.

  @param wsrep_uuid  uuid the sid is derived from
*/
void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid)
{
  rpl_sid new_sid;
  wsrep_uuid_t inverted;
  const size_t n_bytes= sizeof(inverted.data);

  /* Invert every byte of the uuid. */
  size_t pos= 0;
  while (pos < n_bytes)
  {
    inverted.data[pos]= ~wsrep_uuid.data[pos];
    ++pos;
  }
  new_sid.copy_from(inverted.data);

  global_sid_lock->wrlock();
  wsrep_sidno= global_sid_map->add_sid(new_sid);
  WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno);
  global_sid_lock->unlock();
}
#endif /* GTID_SUPPORT */
299 | |
/*
  View handler callback registered with the provider in wsrep_init().

  Publishes the new view in the cluster status variables and logs it.
  For a non-primary view the node is marked not ready and the member
  status becomes undefined. For a primary view the function
    - closes client connections when the protocol version changed and
      aborts the server on an unsupported version,
    - prepares an SST request when the view reports a state gap,
    - otherwise, during startup, takes the initial position from the view
      and stores it as the storage engine checkpoint,
    - optionally adjusts the auto-increment settings,
    - refreshes wsrep_incremental_data_collection from the provider
      capabilities.
  Finally the member status and view are stored in wsrep_config_state.

  @param app_ctx      not used by this function
  @param recv_ctx     not used by this function
  @param view         description of the new view
  @param state        not used by this function
  @param state_len    not used by this function
  @param sst_req      out: SST request, NULL unless a state gap was found
                      and wsrep_sst_prepare() filled it in
  @param sst_req_len  out: length of the SST request, 0 if none

  @return WSREP_CB_SUCCESS on every path that returns
*/
static wsrep_cb_status_t
wsrep_view_handler_cb (void*                    app_ctx,
                       void*                    recv_ctx,
                       const wsrep_view_info_t* view,
                       const char*              state,
                       size_t                   state_len,
                       void**                   sst_req,
                       size_t*                  sst_req_len)
{
  /* Default: no state transfer request. */
  *sst_req     = NULL;
  *sst_req_len = 0;

  wsrep_member_status_t memb_status= wsrep_config_state->get_status();

  /* Refresh the cached cluster UUID and its string form only on change. */
  if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
  {
    memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid,
           sizeof(cluster_uuid));

    wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
                      sizeof(cluster_uuid_str));
  }

  /* Publish the view in the status variables. */
  wsrep_cluster_conf_id= view->view;
  wsrep_cluster_status= cluster_status_str[view->status];
  wsrep_cluster_size= view->memb_num;
  wsrep_local_index= view->my_idx;

  WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
             "number of nodes: %ld, my index: %ld, protocol version %d",
             wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
             (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
             wsrep_cluster_size, wsrep_local_index, view->proto_ver);

  /* Proceed further only if view is PRIMARY */
  if (WSREP_VIEW_PRIMARY != view->status)
  {
#ifdef HAVE_QUERY_CACHE
    // query cache must be initialised by now
    query_cache.flush();
#endif /* HAVE_QUERY_CACHE */

    wsrep_ready_set(FALSE);
    memb_status= WSREP_MEMBER_UNDEFINED;
    /* Always record local_uuid and local_seqno in non-prim since this
     * may lead to re-initializing provider and start position is
     * determined according to these variables */
    // WRONG! local_uuid should be the last primary configuration uuid we were
    // a member of. local_seqno should be updated in commit calls.
    // local_uuid= cluster_uuid;
    // local_seqno= view->first - 1;
    goto out;
  }

  switch (view->proto_ver)
  {
  case 0:
  case 1:
  case 2:
  case 3:
      // version change
      if (view->proto_ver != wsrep_protocol_version)
      {
          /* Keep the node not ready while connections are closed, then
             restore the previous readiness. */
          my_bool wsrep_ready_saved= wsrep_ready;
          wsrep_ready_set(FALSE);
          WSREP_INFO("closing client connections for "
                     "protocol change %ld -> %d",
                     wsrep_protocol_version, view->proto_ver);
          wsrep_close_client_connections(TRUE);
          wsrep_protocol_version= view->proto_ver;
          wsrep_ready_set(wsrep_ready_saved);
      }
      break;
  default:
      WSREP_ERROR("Unsupported application protocol version: %d",
                  view->proto_ver);
      unireg_abort(1);
  }

  if (view->state_gap)
  {
    WSREP_WARN("Gap in state sequence. Need state transfer.");

    /* After that wsrep will call wsrep_sst_prepare. */
    /* keep ready flag 0 until we receive the snapshot */
    wsrep_ready_set(FALSE);

    /* Close client connections to ensure that they don't interfere
     * with SST. Necessary only if storage engines are initialized
     * before SST.
     * TODO: Just killing all ongoing transactions should be enough
     * since wsrep_ready is OFF and no new transactions can start.
     */
    if (!wsrep_before_SE())
    {
        WSREP_DEBUG("[debug]: closing client connections for PRIM");
        wsrep_close_client_connections(FALSE);
    }

    /* A negative return value is a negated error code. */
    ssize_t const req_len= wsrep_sst_prepare (sst_req);

    if (req_len < 0)
    {
      WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
                  strerror(-req_len));
      memb_status= WSREP_MEMBER_UNDEFINED;
    }
    else
    {
      assert(sst_req != NULL);
      *sst_req_len= req_len;
      memb_status= WSREP_MEMBER_JOINER;
    }
  }
  else
  {
    /*
     *  NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
     *  before - OR - it was reinitialized on startup (lp:992840)
     */
    if (wsrep_startup)
    {
      if (wsrep_before_SE())
      {
        wsrep_SE_init_grab();
        // Signal mysqld init thread to continue
        wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
        // and wait for SE initialization
        wsrep_SE_init_wait();
      }
      else
      {
        local_uuid=  cluster_uuid;
        local_seqno= view->state_id.seqno;
      }
      /* Init storage engine XIDs from first view */
      wsrep_set_SE_checkpoint(local_uuid, local_seqno);
#ifdef GTID_SUPPORT
      wsrep_init_sidno(local_uuid);
#endif /* GTID_SUPPORT */
      memb_status= WSREP_MEMBER_JOINED;
    }

    // just some sanity check
    if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
    {
      WSREP_ERROR("Undetected state gap. Can't continue.");
      wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
                       &local_uuid, -1);
      unireg_abort(1);
    }
  }

  /* Offset = own index + 1, increment = number of members. */
  if (wsrep_auto_increment_control)
  {
    global_system_variables.auto_increment_offset= view->my_idx + 1;
    global_system_variables.auto_increment_increment= view->memb_num;
  }

  { /* capabilities may be updated on new configuration */
    uint64_t const caps(wsrep->capabilities (wsrep));

    my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
    if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
    {
      WSREP_WARN("Unsupported protocol downgrade: "
                 "incremental data collection disabled. Expect abort.");
    }
    wsrep_incremental_data_collection = idc;
  }

out:
  /* The startup phase ends with the first primary view. */
  if (view->status == WSREP_VIEW_PRIMARY) wsrep_startup= FALSE;
  wsrep_config_state->set(memb_status, view);

  return WSREP_CB_SUCCESS;
}
477 | |
478 | void wsrep_ready_set (my_bool x) |
479 | { |
480 | WSREP_DEBUG("Setting wsrep_ready to %d" , x); |
481 | if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); |
482 | if (wsrep_ready != x) |
483 | { |
484 | wsrep_ready= x; |
485 | mysql_cond_signal (&COND_wsrep_ready); |
486 | } |
487 | mysql_mutex_unlock (&LOCK_wsrep_ready); |
488 | } |
489 | |
490 | // Wait until wsrep has reached ready state |
491 | void wsrep_ready_wait () |
492 | { |
493 | if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); |
494 | while (!wsrep_ready) |
495 | { |
496 | WSREP_INFO("Waiting to reach ready state" ); |
497 | mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready); |
498 | } |
499 | WSREP_INFO("ready state reached" ); |
500 | mysql_mutex_unlock (&LOCK_wsrep_ready); |
501 | } |
502 | |
503 | static void wsrep_synced_cb(void* app_ctx) |
504 | { |
505 | WSREP_INFO("Synchronized with group, ready for connections" ); |
506 | bool signal_main= false; |
507 | if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); |
508 | if (!wsrep_ready) |
509 | { |
510 | wsrep_ready= TRUE; |
511 | mysql_cond_signal (&COND_wsrep_ready); |
512 | signal_main= true; |
513 | |
514 | } |
515 | wsrep_config_state->set(WSREP_MEMBER_SYNCED); |
516 | mysql_mutex_unlock (&LOCK_wsrep_ready); |
517 | |
518 | if (signal_main) |
519 | { |
520 | wsrep_SE_init_grab(); |
521 | // Signal mysqld init thread to continue |
522 | wsrep_sst_complete (&local_uuid, local_seqno, false); |
523 | // and wait for SE initialization |
524 | wsrep_SE_init_wait(); |
525 | } |
526 | if (wsrep_restart_slave_activated) |
527 | { |
528 | int rcode; |
529 | WSREP_INFO("MariaDB slave restart" ); |
530 | wsrep_restart_slave_activated= FALSE; |
531 | |
532 | mysql_mutex_lock(&LOCK_active_mi); |
533 | if ((rcode = start_slave_threads(0, |
534 | 1 /* need mutex */, |
535 | 0 /* no wait for start*/, |
536 | active_mi, |
537 | master_info_file, |
538 | relay_log_info_file, |
539 | SLAVE_SQL))) |
540 | { |
541 | WSREP_WARN("Failed to create slave threads: %d" , rcode); |
542 | } |
543 | mysql_mutex_unlock(&LOCK_active_mi); |
544 | |
545 | } |
546 | } |
547 | |
548 | static void wsrep_init_position() |
549 | { |
550 | /* read XIDs from storage engines */ |
551 | wsrep_uuid_t uuid; |
552 | wsrep_seqno_t seqno; |
553 | wsrep_get_SE_checkpoint(uuid, seqno); |
554 | |
555 | if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t))) |
556 | { |
557 | WSREP_INFO("Read nil XID from storage engines, skipping position init" ); |
558 | return; |
559 | } |
560 | |
561 | char uuid_str[40] = {0, }; |
562 | wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)); |
563 | WSREP_INFO("Initial position: %s:%lld" , uuid_str, (long long)seqno); |
564 | |
565 | if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) && |
566 | local_seqno == WSREP_SEQNO_UNDEFINED) |
567 | { |
568 | // Initial state |
569 | local_uuid= uuid; |
570 | local_seqno= seqno; |
571 | } |
572 | else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) || |
573 | local_seqno != seqno) |
574 | { |
575 | WSREP_WARN("Initial position was provided by configuration or SST, " |
576 | "avoiding override" ); |
577 | } |
578 | } |
579 | |
580 | extern char* my_bind_addr_str; |
581 | |
582 | int wsrep_init() |
583 | { |
584 | int rcode= -1; |
585 | DBUG_ASSERT(wsrep_inited == 0); |
586 | |
587 | if (strcmp(wsrep_start_position, WSREP_START_POSITION_ZERO) && |
588 | wsrep_start_position_init(wsrep_start_position)) |
589 | { |
590 | return 1; |
591 | } |
592 | |
593 | wsrep_sst_auth_init(); |
594 | |
595 | wsrep_ready_set(FALSE); |
596 | assert(wsrep_provider); |
597 | |
598 | wsrep_init_position(); |
599 | |
600 | if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK) |
601 | { |
602 | if (strcasecmp(wsrep_provider, WSREP_NONE)) |
603 | { |
604 | WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider." , |
605 | wsrep_provider, strerror(rcode), rcode); |
606 | strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack |
607 | return wsrep_init(); |
608 | } |
609 | else /* this is for recursive call above */ |
610 | { |
611 | WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort." , |
612 | strerror(rcode), rcode); |
613 | unireg_abort(1); |
614 | } |
615 | } |
616 | |
617 | if (!WSREP_PROVIDER_EXISTS) |
618 | { |
619 | // enable normal operation in case no provider is specified |
620 | wsrep_ready_set(TRUE); |
621 | wsrep_inited= 1; |
622 | global_system_variables.wsrep_on = 0; |
623 | wsrep_init_args args; |
624 | args.logger_cb = wsrep_log_cb; |
625 | args.options = (wsrep_provider_options) ? |
626 | wsrep_provider_options : "" ; |
627 | rcode = wsrep->init(wsrep, &args); |
628 | if (rcode) |
629 | { |
630 | DBUG_PRINT("wsrep" ,("wsrep::init() failed: %d" , rcode)); |
631 | WSREP_ERROR("wsrep::init() failed: %d, must shutdown" , rcode); |
632 | wsrep->free(wsrep); |
633 | free(wsrep); |
634 | wsrep = NULL; |
635 | } |
636 | return rcode; |
637 | } |
638 | else |
639 | { |
640 | global_system_variables.wsrep_on = 1; |
641 | strncpy(provider_name, |
642 | wsrep->provider_name, sizeof(provider_name) - 1); |
643 | strncpy(provider_version, |
644 | wsrep->provider_version, sizeof(provider_version) - 1); |
645 | strncpy(provider_vendor, |
646 | wsrep->provider_vendor, sizeof(provider_vendor) - 1); |
647 | } |
648 | |
649 | /* Initialize node address */ |
650 | char node_addr[512]= { 0, }; |
651 | size_t const node_addr_max= sizeof(node_addr) - 1; |
652 | if (!wsrep_node_address || !strcmp(wsrep_node_address, "" )) |
653 | { |
654 | size_t const ret= wsrep_guess_ip(node_addr, node_addr_max); |
655 | if (!(ret > 0 && ret < node_addr_max)) |
656 | { |
657 | WSREP_WARN("Failed to guess base node address. Set it explicitly via " |
658 | "wsrep_node_address." ); |
659 | node_addr[0]= '\0'; |
660 | } |
661 | } |
662 | else |
663 | { |
664 | strncpy(node_addr, wsrep_node_address, node_addr_max); |
665 | } |
666 | |
667 | /* Initialize node's incoming address */ |
668 | char inc_addr[512]= { 0, }; |
669 | size_t const inc_addr_max= sizeof (inc_addr); |
670 | |
671 | /* |
672 | In case wsrep_node_incoming_address is either not set or set to AUTO, |
673 | we need to use mysqld's my_bind_addr_str:mysqld_port, lastly fallback |
674 | to wsrep_node_address' value if mysqld's bind-address is not set either. |
675 | */ |
676 | if ((!wsrep_node_incoming_address || |
677 | !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) |
678 | { |
679 | bool is_ipv6= false; |
680 | unsigned int my_bind_ip= INADDR_ANY; // default if not set |
681 | |
682 | if (my_bind_addr_str && strlen(my_bind_addr_str)) |
683 | { |
684 | my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6); |
685 | } |
686 | |
687 | if (INADDR_ANY != my_bind_ip) |
688 | { |
689 | /* |
690 | If its a not a valid address, leave inc_addr as empty string. mysqld |
691 | is not listening for client connections on network interfaces. |
692 | */ |
693 | if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip) |
694 | { |
695 | const char *fmt= (is_ipv6) ? "[%s]:%u" : "%s:%u" ; |
696 | snprintf(inc_addr, inc_addr_max, fmt, my_bind_addr_str, mysqld_port); |
697 | } |
698 | } |
699 | else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */ |
700 | { |
701 | size_t const node_addr_len= strlen(node_addr); |
702 | if (node_addr_len > 0) |
703 | { |
704 | wsp::Address addr(node_addr); |
705 | |
706 | if (!addr.is_valid()) |
707 | { |
708 | WSREP_DEBUG("Could not parse node address : %s" , node_addr); |
709 | WSREP_WARN("Guessing address for incoming client connections failed. " |
710 | "Try setting wsrep_node_incoming_address explicitly." ); |
711 | goto done; |
712 | } |
713 | |
714 | const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u" ; |
715 | snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), |
716 | (int) mysqld_port); |
717 | } |
718 | } |
719 | } |
720 | else |
721 | { |
722 | wsp::Address addr(wsrep_node_incoming_address); |
723 | |
724 | if (!addr.is_valid()) |
725 | { |
726 | WSREP_WARN("Could not parse wsrep_node_incoming_address : %s" , |
727 | wsrep_node_incoming_address); |
728 | goto done; |
729 | } |
730 | |
731 | /* |
732 | In case port is not specified in wsrep_node_incoming_address, we use |
733 | mysqld_port. |
734 | */ |
735 | int port= (addr.get_port() > 0) ? addr.get_port() : (int) mysqld_port; |
736 | const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u" ; |
737 | |
738 | snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port); |
739 | } |
740 | |
741 | done: |
742 | struct wsrep_init_args wsrep_args; |
743 | |
744 | struct wsrep_gtid const state_id = { local_uuid, local_seqno }; |
745 | |
746 | wsrep_args.data_dir = wsrep_data_home_dir; |
747 | wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "" ; |
748 | wsrep_args.node_address = node_addr; |
749 | wsrep_args.node_incoming = inc_addr; |
750 | wsrep_args.options = (wsrep_provider_options) ? |
751 | wsrep_provider_options : "" ; |
752 | wsrep_args.proto_ver = wsrep_max_protocol_version; |
753 | |
754 | wsrep_args.state_id = &state_id; |
755 | |
756 | wsrep_args.logger_cb = wsrep_log_cb; |
757 | wsrep_args.view_handler_cb = wsrep_view_handler_cb; |
758 | wsrep_args.apply_cb = wsrep_apply_cb; |
759 | wsrep_args.commit_cb = wsrep_commit_cb; |
760 | wsrep_args.unordered_cb = wsrep_unordered_cb; |
761 | wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; |
762 | wsrep_args.synced_cb = wsrep_synced_cb; |
763 | |
764 | rcode = wsrep->init(wsrep, &wsrep_args); |
765 | |
766 | if (rcode) |
767 | { |
768 | DBUG_PRINT("wsrep" ,("wsrep::init() failed: %d" , rcode)); |
769 | WSREP_ERROR("wsrep::init() failed: %d, must shutdown" , rcode); |
770 | wsrep->free(wsrep); |
771 | free(wsrep); |
772 | wsrep = NULL; |
773 | } else { |
774 | wsrep_inited= 1; |
775 | } |
776 | |
777 | return rcode; |
778 | } |
779 | |
780 | |
781 | /* Initialize wsrep thread LOCKs and CONDs */ |
782 | void wsrep_thr_init() |
783 | { |
784 | DBUG_ENTER("wsrep_thr_init" ); |
785 | wsrep_config_state = new wsp::Config_state; |
786 | #ifdef HAVE_PSI_INTERFACE |
787 | mysql_mutex_register("sql" , wsrep_mutexes, array_elements(wsrep_mutexes)); |
788 | mysql_cond_register("sql" , wsrep_conds, array_elements(wsrep_conds)); |
789 | mysql_file_register("sql" , wsrep_files, array_elements(wsrep_files)); |
790 | #endif |
791 | |
792 | mysql_mutex_init(key_LOCK_wsrep_ready, &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); |
793 | mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL); |
794 | mysql_mutex_init(key_LOCK_wsrep_sst, &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); |
795 | mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL); |
796 | mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); |
797 | mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL); |
798 | mysql_mutex_init(key_LOCK_wsrep_rollback, &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); |
799 | mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL); |
800 | mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); |
801 | mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); |
802 | mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); |
803 | mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); |
804 | mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); |
805 | DBUG_VOID_RETURN; |
806 | } |
807 | |
808 | void wsrep_init_startup (bool first) |
809 | { |
810 | if (wsrep_init()) unireg_abort(1); |
811 | |
812 | wsrep_thr_lock_init( |
813 | (wsrep_thd_is_brute_force_fun)wsrep_thd_is_BF, |
814 | (wsrep_abort_thd_fun)wsrep_abort_thd, |
815 | wsrep_debug, wsrep_convert_LOCK_to_trx, |
816 | (wsrep_on_fun)wsrep_on); |
817 | |
818 | /* |
819 | Pre-initialize global_system_variables.table_plugin with a dummy engine |
820 | (placeholder) required during the initialization of wsrep threads (THDs). |
821 | (see: plugin_thdvar_init()) |
822 | Note: This only needs to be done for rsync & xtrabackup based SST methods. |
823 | In case of mysqldump SST method, the wsrep threads are created after the |
824 | server plugins & global system variables are initialized. |
825 | */ |
826 | if (wsrep_before_SE()) |
827 | wsrep_plugins_pre_init(); |
828 | |
829 | /* Skip replication start if dummy wsrep provider is loaded */ |
830 | if (!strcmp(wsrep_provider, WSREP_NONE)) return; |
831 | |
832 | /* Skip replication start if no cluster address */ |
833 | if (!wsrep_cluster_address || wsrep_cluster_address[0] == 0) return; |
834 | |
835 | if (first) wsrep_sst_grab(); // do it so we can wait for SST below |
836 | |
837 | if (!wsrep_start_replication()) unireg_abort(1); |
838 | |
839 | wsrep_create_rollbacker(); |
840 | wsrep_create_appliers(1); |
841 | |
842 | if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed |
843 | } |
844 | |
845 | |
846 | void wsrep_deinit(bool free_options) |
847 | { |
848 | DBUG_ASSERT(wsrep_inited == 1); |
849 | wsrep_unload(wsrep); |
850 | wsrep= 0; |
851 | provider_name[0]= '\0'; |
852 | provider_version[0]= '\0'; |
853 | provider_vendor[0]= '\0'; |
854 | |
855 | wsrep_inited= 0; |
856 | |
857 | if (free_options) |
858 | { |
859 | wsrep_sst_auth_free(); |
860 | } |
861 | } |
862 | |
863 | /* Destroy wsrep thread LOCKs and CONDs */ |
864 | void wsrep_thr_deinit() |
865 | { |
866 | if (!wsrep_config_state) |
867 | return; // Never initialized |
868 | mysql_mutex_destroy(&LOCK_wsrep_ready); |
869 | mysql_cond_destroy(&COND_wsrep_ready); |
870 | mysql_mutex_destroy(&LOCK_wsrep_sst); |
871 | mysql_cond_destroy(&COND_wsrep_sst); |
872 | mysql_mutex_destroy(&LOCK_wsrep_sst_init); |
873 | mysql_cond_destroy(&COND_wsrep_sst_init); |
874 | mysql_mutex_destroy(&LOCK_wsrep_rollback); |
875 | mysql_cond_destroy(&COND_wsrep_rollback); |
876 | mysql_mutex_destroy(&LOCK_wsrep_replaying); |
877 | mysql_cond_destroy(&COND_wsrep_replaying); |
878 | mysql_mutex_destroy(&LOCK_wsrep_slave_threads); |
879 | mysql_mutex_destroy(&LOCK_wsrep_desync); |
880 | mysql_mutex_destroy(&LOCK_wsrep_config_state); |
881 | delete wsrep_config_state; |
882 | wsrep_config_state= 0; // Safety |
883 | } |
884 | |
885 | void wsrep_recover() |
886 | { |
887 | char uuid_str[40]; |
888 | |
889 | if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) && |
890 | local_seqno == -2) |
891 | { |
892 | wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str)); |
893 | WSREP_INFO("Position %s:%lld given at startup, skipping position recovery" , |
894 | uuid_str, (long long)local_seqno); |
895 | return; |
896 | } |
897 | wsrep_uuid_t uuid; |
898 | wsrep_seqno_t seqno; |
899 | wsrep_get_SE_checkpoint(uuid, seqno); |
900 | wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)); |
901 | WSREP_INFO("Recovered position: %s:%lld" , uuid_str, (long long)seqno); |
902 | } |
903 | |
904 | |
905 | void wsrep_stop_replication(THD *thd) |
906 | { |
907 | WSREP_INFO("Stop replication" ); |
908 | if (!wsrep) |
909 | { |
910 | WSREP_INFO("Provider was not loaded, in stop replication" ); |
911 | return; |
912 | } |
913 | |
914 | /* disconnect from group first to get wsrep_ready == FALSE */ |
915 | WSREP_DEBUG("Provider disconnect" ); |
916 | wsrep->disconnect(wsrep); |
917 | |
918 | wsrep_connected= FALSE; |
919 | |
920 | wsrep_close_client_connections(TRUE); |
921 | |
922 | /* wait until appliers have stopped */ |
923 | wsrep_wait_appliers_close(thd); |
924 | |
925 | return; |
926 | } |
927 | |
928 | bool wsrep_start_replication() |
929 | { |
930 | wsrep_status_t rcode; |
931 | |
932 | /* wsrep provider must be loaded. */ |
933 | DBUG_ASSERT(wsrep); |
934 | |
935 | /* |
936 | if provider is trivial, don't even try to connect, |
937 | but resume local node operation |
938 | */ |
939 | if (!WSREP_PROVIDER_EXISTS) |
940 | { |
941 | // enable normal operation in case no provider is specified |
942 | wsrep_ready_set(TRUE); |
943 | return true; |
944 | } |
945 | |
946 | if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0) |
947 | { |
948 | // if provider is non-trivial, but no address is specified, wait for address |
949 | wsrep_ready_set(FALSE); |
950 | return true; |
951 | } |
952 | |
953 | bool const bootstrap= wsrep_new_cluster; |
954 | |
955 | WSREP_INFO("Start replication" ); |
956 | |
957 | if (wsrep_new_cluster) |
958 | { |
959 | WSREP_INFO("'wsrep-new-cluster' option used, bootstrapping the cluster" ); |
960 | wsrep_new_cluster= false; |
961 | } |
962 | |
963 | if ((rcode = wsrep->connect(wsrep, |
964 | wsrep_cluster_name, |
965 | wsrep_cluster_address, |
966 | wsrep_sst_donor, |
967 | bootstrap))) |
968 | { |
969 | DBUG_PRINT("wsrep" ,("wsrep->connect(%s) failed: %d" , |
970 | wsrep_cluster_address, rcode)); |
971 | WSREP_ERROR("wsrep::connect(%s) failed: %d" , |
972 | wsrep_cluster_address, rcode); |
973 | return false; |
974 | } |
975 | else |
976 | { |
977 | wsrep_connected= TRUE; |
978 | |
979 | char* opts= wsrep->options_get(wsrep); |
980 | if (opts) |
981 | { |
982 | wsrep_provider_options_init(opts); |
983 | free(opts); |
984 | } |
985 | else |
986 | { |
987 | WSREP_WARN("Failed to get wsrep options" ); |
988 | } |
989 | } |
990 | |
991 | return true; |
992 | } |
993 | |
994 | bool wsrep_must_sync_wait (THD* thd, uint mask) |
995 | { |
996 | return (thd->variables.wsrep_sync_wait & mask) && |
997 | thd->variables.wsrep_on && |
998 | !thd->in_active_multi_stmt_transaction() && |
999 | thd->wsrep_conflict_state != REPLAYING && |
1000 | thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED; |
1001 | } |
1002 | |
1003 | bool wsrep_sync_wait (THD* thd, uint mask) |
1004 | { |
1005 | if (wsrep_must_sync_wait(thd, mask)) |
1006 | { |
1007 | WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u" , |
1008 | thd->variables.wsrep_sync_wait, mask); |
1009 | // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 |
1010 | // TODO: modify to check if thd has locked any rows. |
1011 | wsrep_status_t ret= wsrep->causal_read (wsrep, &thd->wsrep_sync_wait_gtid); |
1012 | |
1013 | if (unlikely(WSREP_OK != ret)) |
1014 | { |
1015 | const char* msg; |
1016 | int err; |
1017 | |
1018 | // Possibly relevant error codes: |
1019 | // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY, |
1020 | // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET, |
1021 | // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED |
1022 | |
1023 | switch (ret) |
1024 | { |
1025 | case WSREP_NOT_IMPLEMENTED: |
1026 | msg= "synchronous reads by wsrep backend. " |
1027 | "Please unset wsrep_causal_reads variable." ; |
1028 | err= ER_NOT_SUPPORTED_YET; |
1029 | break; |
1030 | default: |
1031 | msg= "Synchronous wait failed." ; |
1032 | err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed |
1033 | // with ER_LOCK_WAIT_TIMEOUT |
1034 | } |
1035 | |
1036 | my_error(err, MYF(0), msg); |
1037 | |
1038 | return true; |
1039 | } |
1040 | } |
1041 | |
1042 | return false; |
1043 | } |
1044 | |
1045 | /* |
1046 | * Helpers to deal with TOI key arrays |
1047 | */ |
/* Array of wsrep keys together with its element count. */
typedef struct wsrep_key_arr
{
  wsrep_key_t* keys;      /* key array; released by wsrep_keys_free() */
  size_t keys_len;        /* number of elements in keys */
} wsrep_key_arr_t;
1053 | |
1054 | |
1055 | static void wsrep_keys_free(wsrep_key_arr_t* key_arr) |
1056 | { |
1057 | for (size_t i= 0; i < key_arr->keys_len; ++i) |
1058 | { |
1059 | my_free((void*)key_arr->keys[i].key_parts); |
1060 | } |
1061 | my_free(key_arr->keys); |
1062 | key_arr->keys= 0; |
1063 | key_arr->keys_len= 0; |
1064 | } |
1065 | |
1066 | |
1067 | /*! |
1068 | * @param db Database string |
1069 | * @param table Table string |
1070 | * @param key Array of wsrep_key_t |
1071 | * @param key_len In: number of elements in key array, Out: number of |
1072 | * elements populated |
1073 | * |
1074 | * @return true if preparation was successful, otherwise false. |
1075 | */ |
1076 | |
1077 | static bool wsrep_prepare_key_for_isolation(const char* db, |
1078 | const char* table, |
1079 | wsrep_buf_t* key, |
1080 | size_t* key_len) |
1081 | { |
1082 | if (*key_len < 2) return false; |
1083 | |
1084 | switch (wsrep_protocol_version) |
1085 | { |
1086 | case 0: |
1087 | *key_len= 0; |
1088 | break; |
1089 | case 1: |
1090 | case 2: |
1091 | case 3: |
1092 | { |
1093 | *key_len= 0; |
1094 | if (db) |
1095 | { |
1096 | // sql_print_information("%s.%s", db, table); |
1097 | if (db) |
1098 | { |
1099 | key[*key_len].ptr= db; |
1100 | key[*key_len].len= strlen(db); |
1101 | ++(*key_len); |
1102 | if (table) |
1103 | { |
1104 | key[*key_len].ptr= table; |
1105 | key[*key_len].len= strlen(table); |
1106 | ++(*key_len); |
1107 | } |
1108 | } |
1109 | } |
1110 | break; |
1111 | } |
1112 | default: |
1113 | return false; |
1114 | } |
1115 | |
1116 | return true; |
1117 | } |
1118 | |
1119 | /* Prepare key list from db/table and table_list */ |
1120 | static bool wsrep_prepare_keys_for_isolation(THD* thd, |
1121 | const char* db, |
1122 | const char* table, |
1123 | const TABLE_LIST* table_list, |
1124 | wsrep_key_arr_t* ka) |
1125 | { |
1126 | ka->keys= 0; |
1127 | ka->keys_len= 0; |
1128 | |
1129 | if (db || table) |
1130 | { |
1131 | if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) |
1132 | { |
1133 | WSREP_ERROR("Can't allocate memory for key_array" ); |
1134 | goto err; |
1135 | } |
1136 | ka->keys_len= 1; |
1137 | if (!(ka->keys[0].key_parts= (wsrep_buf_t*) |
1138 | my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) |
1139 | { |
1140 | WSREP_ERROR("Can't allocate memory for key_parts" ); |
1141 | goto err; |
1142 | } |
1143 | ka->keys[0].key_parts_num= 2; |
1144 | if (!wsrep_prepare_key_for_isolation( |
1145 | db, table, |
1146 | (wsrep_buf_t*)ka->keys[0].key_parts, |
1147 | &ka->keys[0].key_parts_num)) |
1148 | { |
1149 | WSREP_ERROR("Preparing keys for isolation failed (1)" ); |
1150 | goto err; |
1151 | } |
1152 | } |
1153 | |
1154 | for (const TABLE_LIST* table= table_list; table; table= table->next_global) |
1155 | { |
1156 | wsrep_key_t* tmp; |
1157 | if (ka->keys) |
1158 | tmp= (wsrep_key_t*)my_realloc(ka->keys, |
1159 | (ka->keys_len + 1) * sizeof(wsrep_key_t), |
1160 | MYF(0)); |
1161 | else |
1162 | tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0)); |
1163 | |
1164 | if (!tmp) |
1165 | { |
1166 | WSREP_ERROR("Can't allocate memory for key_array" ); |
1167 | goto err; |
1168 | } |
1169 | ka->keys= tmp; |
1170 | if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) |
1171 | my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) |
1172 | { |
1173 | WSREP_ERROR("Can't allocate memory for key_parts" ); |
1174 | goto err; |
1175 | } |
1176 | ka->keys[ka->keys_len].key_parts_num= 2; |
1177 | ++ka->keys_len; |
1178 | if (!wsrep_prepare_key_for_isolation(table->db.str, table->table_name.str, |
1179 | (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, |
1180 | &ka->keys[ka->keys_len - 1].key_parts_num)) |
1181 | { |
1182 | WSREP_ERROR("Preparing keys for isolation failed (2)" ); |
1183 | goto err; |
1184 | } |
1185 | } |
1186 | return 0; |
1187 | err: |
1188 | wsrep_keys_free(ka); |
1189 | return 1; |
1190 | } |
1191 | |
1192 | |
1193 | bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len, |
1194 | const uchar* row_id, size_t row_id_len, |
1195 | wsrep_buf_t* key, size_t* key_len) |
1196 | { |
1197 | if (*key_len < 3) return false; |
1198 | |
1199 | *key_len= 0; |
1200 | switch (wsrep_protocol_version) |
1201 | { |
1202 | case 0: |
1203 | { |
1204 | key[0].ptr = cache_key; |
1205 | key[0].len = cache_key_len; |
1206 | |
1207 | *key_len = 1; |
1208 | break; |
1209 | } |
1210 | case 1: |
1211 | case 2: |
1212 | case 3: |
1213 | { |
1214 | key[0].ptr = cache_key; |
1215 | key[0].len = strlen( (char*)cache_key ); |
1216 | |
1217 | key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1; |
1218 | key[1].len = strlen( (char*)(key[1].ptr) ); |
1219 | |
1220 | *key_len = 2; |
1221 | break; |
1222 | } |
1223 | default: |
1224 | return false; |
1225 | } |
1226 | |
1227 | key[*key_len].ptr = row_id; |
1228 | key[*key_len].len = row_id_len; |
1229 | ++(*key_len); |
1230 | |
1231 | return true; |
1232 | } |
1233 | |
1234 | |
1235 | /* |
1236 | * Construct Query_log_Event from thd query and serialize it |
1237 | * into buffer. |
1238 | * |
1239 | * Return 0 in case of success, 1 in case of error. |
1240 | */ |
1241 | int wsrep_to_buf_helper( |
1242 | THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) |
1243 | { |
1244 | IO_CACHE tmp_io_cache; |
1245 | Log_event_writer writer(&tmp_io_cache,0); |
1246 | if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, |
1247 | 65536, MYF(MY_WME))) |
1248 | return 1; |
1249 | int ret(0); |
1250 | enum enum_binlog_checksum_alg current_binlog_check_alg= |
1251 | (enum_binlog_checksum_alg) binlog_checksum_options; |
1252 | |
1253 | Format_description_log_event *tmp_fd= new Format_description_log_event(4); |
1254 | tmp_fd->checksum_alg= current_binlog_check_alg; |
1255 | writer.write(tmp_fd); |
1256 | delete tmp_fd; |
1257 | |
1258 | #ifdef GTID_SUPPORT |
1259 | if (thd->variables.gtid_next.type == GTID_GROUP) |
1260 | { |
1261 | Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next); |
1262 | if (!gtid_ev.is_valid()) ret= 0; |
1263 | if (!ret && writer.write(>id_ev)) ret= 1; |
1264 | } |
1265 | #endif /* GTID_SUPPORT */ |
1266 | if (wsrep_gtid_mode && thd->variables.gtid_seq_no) |
1267 | { |
1268 | Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no, |
1269 | thd->variables.gtid_domain_id, |
1270 | true, LOG_EVENT_SUPPRESS_USE_F, |
1271 | true, 0); |
1272 | gtid_event.server_id= thd->variables.server_id; |
1273 | if (!gtid_event.is_valid()) ret= 0; |
1274 | ret= writer.write(>id_event); |
1275 | } |
1276 | |
1277 | /* if there is prepare query, add event for it */ |
1278 | if (!ret && thd->wsrep_TOI_pre_query) |
1279 | { |
1280 | Query_log_event ev(thd, thd->wsrep_TOI_pre_query, |
1281 | thd->wsrep_TOI_pre_query_len, |
1282 | FALSE, FALSE, FALSE, 0); |
1283 | ev.checksum_alg= current_binlog_check_alg; |
1284 | if (writer.write(&ev)) ret= 1; |
1285 | } |
1286 | |
1287 | /* continue to append the actual query */ |
1288 | Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); |
1289 | ev.checksum_alg= current_binlog_check_alg; |
1290 | if (!ret && writer.write(&ev)) ret= 1; |
1291 | if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; |
1292 | close_cached_file(&tmp_io_cache); |
1293 | return ret; |
1294 | } |
1295 | |
1296 | static int |
1297 | wsrep_alter_query_string(THD *thd, String *buf) |
1298 | { |
1299 | /* Append the "ALTER" part of the query */ |
1300 | if (buf->append(STRING_WITH_LEN("ALTER " ))) |
1301 | return 1; |
1302 | /* Append definer */ |
1303 | append_definer(thd, buf, &(thd->lex->definer->user), &(thd->lex->definer->host)); |
1304 | /* Append the left part of thd->query after event name part */ |
1305 | if (buf->append(thd->lex->stmt_definition_begin, |
1306 | thd->lex->stmt_definition_end - |
1307 | thd->lex->stmt_definition_begin)) |
1308 | return 1; |
1309 | |
1310 | return 0; |
1311 | } |
1312 | |
1313 | static int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len) |
1314 | { |
1315 | String log_query; |
1316 | |
1317 | if (wsrep_alter_query_string(thd, &log_query)) |
1318 | { |
1319 | WSREP_WARN("events alter string failed: schema: %s, query: %s" , |
1320 | thd->get_db(), thd->query()); |
1321 | return 1; |
1322 | } |
1323 | return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); |
1324 | } |
1325 | |
1326 | #include "sql_show.h" |
1327 | static int |
1328 | create_view_query(THD *thd, uchar** buf, size_t* buf_len) |
1329 | { |
1330 | LEX *lex= thd->lex; |
1331 | SELECT_LEX *select_lex= &lex->select_lex; |
1332 | TABLE_LIST *first_table= select_lex->table_list.first; |
1333 | TABLE_LIST *views = first_table; |
1334 | LEX_USER *definer; |
1335 | String buff; |
1336 | const LEX_CSTRING command[3]= |
1337 | {{ STRING_WITH_LEN("CREATE " ) }, |
1338 | { STRING_WITH_LEN("ALTER " ) }, |
1339 | { STRING_WITH_LEN("CREATE OR REPLACE " ) }}; |
1340 | |
1341 | buff.append(&command[thd->lex->create_view->mode]); |
1342 | |
1343 | if (lex->definer) |
1344 | definer= get_current_user(thd, lex->definer); |
1345 | else |
1346 | { |
1347 | /* |
1348 | DEFINER-clause is missing; we have to create default definer in |
1349 | persistent arena to be PS/SP friendly. |
1350 | If this is an ALTER VIEW then the current user should be set as |
1351 | the definer. |
1352 | */ |
1353 | definer= create_default_definer(thd, false); |
1354 | } |
1355 | |
1356 | if (definer) |
1357 | { |
1358 | views->definer.user = definer->user; |
1359 | views->definer.host = definer->host; |
1360 | } else { |
1361 | WSREP_ERROR("Failed to get DEFINER for VIEW." ); |
1362 | return 1; |
1363 | } |
1364 | |
1365 | views->algorithm = lex->create_view->algorithm; |
1366 | views->view_suid = lex->create_view->suid; |
1367 | views->with_check = lex->create_view->check; |
1368 | |
1369 | view_store_options(thd, views, &buff); |
1370 | buff.append(STRING_WITH_LEN("VIEW " )); |
1371 | /* Test if user supplied a db (ie: we did not use thd->db) */ |
1372 | if (views->db.str && views->db.str[0] && |
1373 | (thd->db.str == NULL || cmp(&views->db, &thd->db))) |
1374 | { |
1375 | append_identifier(thd, &buff, &views->db); |
1376 | buff.append('.'); |
1377 | } |
1378 | append_identifier(thd, &buff, &views->table_name); |
1379 | if (lex->view_list.elements) |
1380 | { |
1381 | List_iterator_fast<LEX_CSTRING> names(lex->view_list); |
1382 | LEX_CSTRING *name; |
1383 | int i; |
1384 | |
1385 | for (i= 0; (name= names++); i++) |
1386 | { |
1387 | buff.append(i ? ", " : "(" ); |
1388 | append_identifier(thd, &buff, name); |
1389 | } |
1390 | buff.append(')'); |
1391 | } |
1392 | buff.append(STRING_WITH_LEN(" AS " )); |
1393 | //buff.append(views->source.str, views->source.length); |
1394 | buff.append(thd->lex->create_view->select.str, |
1395 | thd->lex->create_view->select.length); |
1396 | //int errcode= query_error_code(thd, TRUE); |
1397 | //if (thd->binlog_query(THD::STMT_QUERY_TYPE, |
1398 | // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod |
1399 | return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); |
1400 | } |
1401 | |
1402 | /* Forward declarations. */ |
1403 | static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len); |
1404 | static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); |
1405 | |
1406 | /* |
1407 | Decide if statement should run in TOI. |
1408 | |
1409 | Look if table or table_list contain temporary tables. If the |
1410 | statement affects only temporary tables, statement should not run |
1411 | in TOI. If the table list contains mix of regular and temporary tables |
1412 | (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but |
1413 | should be rewritten at later time for replication to contain only |
1414 | non-temporary tables. |
1415 | */ |
1416 | static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table, |
1417 | const TABLE_LIST *table_list) |
1418 | { |
1419 | DBUG_ASSERT(!table || db); |
1420 | DBUG_ASSERT(table_list || db); |
1421 | |
1422 | LEX* lex= thd->lex; |
1423 | SELECT_LEX* select_lex= &lex->select_lex; |
1424 | TABLE_LIST* first_table= select_lex->table_list.first; |
1425 | |
1426 | switch (lex->sql_command) |
1427 | { |
1428 | case SQLCOM_CREATE_TABLE: |
1429 | DBUG_ASSERT(!table_list); |
1430 | if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) |
1431 | { |
1432 | return false; |
1433 | } |
1434 | return true; |
1435 | |
1436 | case SQLCOM_CREATE_VIEW: |
1437 | |
1438 | DBUG_ASSERT(!table_list); |
1439 | DBUG_ASSERT(first_table); /* First table is view name */ |
1440 | /* |
1441 | If any of the remaining tables refer to temporary table error |
1442 | is returned to client, so TOI can be skipped |
1443 | */ |
1444 | for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global) |
1445 | { |
1446 | if (thd->find_temporary_table(it)) |
1447 | { |
1448 | return false; |
1449 | } |
1450 | } |
1451 | return true; |
1452 | |
1453 | case SQLCOM_CREATE_TRIGGER: |
1454 | |
1455 | DBUG_ASSERT(!table_list); |
1456 | DBUG_ASSERT(first_table); |
1457 | |
1458 | if (thd->find_temporary_table(first_table)) |
1459 | { |
1460 | return false; |
1461 | } |
1462 | return true; |
1463 | |
1464 | default: |
1465 | if (table && !thd->find_temporary_table(db, table)) |
1466 | { |
1467 | return true; |
1468 | } |
1469 | |
1470 | if (table_list) |
1471 | { |
1472 | for (TABLE_LIST* table= first_table; table; table= table->next_global) |
1473 | { |
1474 | if (!thd->find_temporary_table(table->db.str, table->table_name.str)) |
1475 | { |
1476 | return true; |
1477 | } |
1478 | } |
1479 | } |
1480 | return !(table || table_list); |
1481 | } |
1482 | } |
1483 | |
1484 | /* |
1485 | returns: |
1486 | 0: statement was replicated as TOI |
1487 | 1: TOI replication was skipped |
1488 | -1: TOI replication failed |
1489 | */ |
1490 | static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_, |
1491 | const TABLE_LIST* table_list) |
1492 | { |
1493 | wsrep_status_t ret(WSREP_WARNING); |
1494 | uchar* buf(0); |
1495 | size_t buf_len(0); |
1496 | int buf_err; |
1497 | int rc= 0; |
1498 | |
1499 | if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false) |
1500 | { |
1501 | WSREP_DEBUG("No TOI for %s" , WSREP_QUERY(thd)); |
1502 | return 1; |
1503 | } |
1504 | |
1505 | WSREP_DEBUG("TO BEGIN: %lld, %d : %s" , (long long)wsrep_thd_trx_seqno(thd), |
1506 | thd->wsrep_exec_mode, thd->query() ); |
1507 | switch (thd->lex->sql_command) |
1508 | { |
1509 | case SQLCOM_CREATE_VIEW: |
1510 | buf_err= create_view_query(thd, &buf, &buf_len); |
1511 | break; |
1512 | case SQLCOM_CREATE_PROCEDURE: |
1513 | case SQLCOM_CREATE_SPFUNCTION: |
1514 | buf_err= wsrep_create_sp(thd, &buf, &buf_len); |
1515 | break; |
1516 | case SQLCOM_CREATE_TRIGGER: |
1517 | buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len); |
1518 | break; |
1519 | case SQLCOM_CREATE_EVENT: |
1520 | buf_err= wsrep_create_event_query(thd, &buf, &buf_len); |
1521 | break; |
1522 | case SQLCOM_ALTER_EVENT: |
1523 | buf_err= wsrep_alter_event_query(thd, &buf, &buf_len); |
1524 | break; |
1525 | case SQLCOM_CREATE_ROLE: |
1526 | if (sp_process_definer(thd)) |
1527 | { |
1528 | WSREP_WARN("Failed to set CREATE ROLE definer for TOI." ); |
1529 | } |
1530 | /* fallthrough */ |
1531 | default: |
1532 | buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), |
1533 | &buf, &buf_len); |
1534 | break; |
1535 | } |
1536 | |
1537 | wsrep_key_arr_t key_arr= {0, 0}; |
1538 | struct wsrep_buf buff = { buf, buf_len }; |
1539 | if (!buf_err && |
1540 | !wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr) && |
1541 | key_arr.keys_len > 0 && |
1542 | WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, |
1543 | key_arr.keys, key_arr.keys_len, |
1544 | &buff, 1, |
1545 | &thd->wsrep_trx_meta))) |
1546 | { |
1547 | thd->wsrep_exec_mode= TOTAL_ORDER; |
1548 | wsrep_to_isolation++; |
1549 | wsrep_keys_free(&key_arr); |
1550 | WSREP_DEBUG("TO BEGIN: %lld, %d" ,(long long)wsrep_thd_trx_seqno(thd), |
1551 | thd->wsrep_exec_mode); |
1552 | } |
1553 | else if (key_arr.keys_len > 0) { |
1554 | /* jump to error handler in mysql_execute_command() */ |
1555 | WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. Check wsrep " |
1556 | "connection state and retry the query." , |
1557 | ret, |
1558 | thd->get_db(), |
1559 | (thd->query()) ? thd->query() : "void" ); |
1560 | my_message(ER_LOCK_DEADLOCK, "WSREP replication failed. Check " |
1561 | "your wsrep connection state and retry the query." , MYF(0)); |
1562 | wsrep_keys_free(&key_arr); |
1563 | rc= -1; |
1564 | } |
1565 | else { |
1566 | /* non replicated DDL, affecting temporary tables only */ |
1567 | WSREP_DEBUG("TO isolation skipped for: %d, sql: %s." |
1568 | "Only temporary tables affected." , |
1569 | ret, (thd->query()) ? thd->query() : "void" ); |
1570 | rc= 1; |
1571 | } |
1572 | if (buf) my_free(buf); |
1573 | return rc; |
1574 | } |
1575 | |
1576 | static void wsrep_TOI_end(THD *thd) { |
1577 | wsrep_status_t ret; |
1578 | wsrep_to_isolation--; |
1579 | |
1580 | WSREP_DEBUG("TO END: %lld, %d : %s" , (long long)wsrep_thd_trx_seqno(thd), |
1581 | thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void" ); |
1582 | |
1583 | wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid, |
1584 | thd->wsrep_trx_meta.gtid.seqno); |
1585 | WSREP_DEBUG("TO END: %lld, update seqno" , |
1586 | (long long)wsrep_thd_trx_seqno(thd)); |
1587 | |
1588 | if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { |
1589 | WSREP_DEBUG("TO END: %lld" , (long long)wsrep_thd_trx_seqno(thd)); |
1590 | } |
1591 | else { |
1592 | WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s" , |
1593 | ret, |
1594 | thd->get_db(), |
1595 | (thd->query()) ? thd->query() : "void" ); |
1596 | } |
1597 | } |
1598 | |
1599 | static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_) |
1600 | { |
1601 | wsrep_status_t ret(WSREP_WARNING); |
1602 | WSREP_DEBUG("RSU BEGIN: %lld, %d : %s" , (long long)wsrep_thd_trx_seqno(thd), |
1603 | thd->wsrep_exec_mode, thd->query() ); |
1604 | |
1605 | ret = wsrep->desync(wsrep); |
1606 | if (ret != WSREP_OK) |
1607 | { |
1608 | WSREP_WARN("RSU desync failed %d for schema: %s, query: %s" , |
1609 | ret, thd->get_db(), thd->query()); |
1610 | my_error(ER_LOCK_DEADLOCK, MYF(0)); |
1611 | return(ret); |
1612 | } |
1613 | |
1614 | mysql_mutex_lock(&LOCK_wsrep_replaying); |
1615 | wsrep_replaying++; |
1616 | mysql_mutex_unlock(&LOCK_wsrep_replaying); |
1617 | |
1618 | if (wsrep_wait_committing_connections_close(5000)) |
1619 | { |
1620 | /* no can do, bail out from DDL */ |
1621 | WSREP_WARN("RSU failed due to pending transactions, schema: %s, query %s" , |
1622 | thd->get_db(), thd->query()); |
1623 | mysql_mutex_lock(&LOCK_wsrep_replaying); |
1624 | wsrep_replaying--; |
1625 | mysql_mutex_unlock(&LOCK_wsrep_replaying); |
1626 | |
1627 | ret = wsrep->resync(wsrep); |
1628 | if (ret != WSREP_OK) |
1629 | { |
1630 | WSREP_WARN("resync failed %d for schema: %s, query: %s" , |
1631 | ret, thd->get_db(), thd->query()); |
1632 | } |
1633 | |
1634 | my_error(ER_LOCK_DEADLOCK, MYF(0)); |
1635 | return(1); |
1636 | } |
1637 | |
1638 | wsrep_seqno_t seqno = wsrep->pause(wsrep); |
1639 | if (seqno == WSREP_SEQNO_UNDEFINED) |
1640 | { |
1641 | WSREP_WARN("pause failed %lld for schema: %s, query: %s" , (long long)seqno, |
1642 | thd->get_db(), thd->query()); |
1643 | return(1); |
1644 | } |
1645 | WSREP_DEBUG("paused at %lld" , (long long)seqno); |
1646 | thd->variables.wsrep_on = 0; |
1647 | return 0; |
1648 | } |
1649 | |
1650 | static void wsrep_RSU_end(THD *thd) |
1651 | { |
1652 | wsrep_status_t ret(WSREP_WARNING); |
1653 | WSREP_DEBUG("RSU END: %lld, %d : %s" , (long long)wsrep_thd_trx_seqno(thd), |
1654 | thd->wsrep_exec_mode, thd->query() ); |
1655 | |
1656 | |
1657 | mysql_mutex_lock(&LOCK_wsrep_replaying); |
1658 | wsrep_replaying--; |
1659 | mysql_mutex_unlock(&LOCK_wsrep_replaying); |
1660 | |
1661 | ret = wsrep->resume(wsrep); |
1662 | if (ret != WSREP_OK) |
1663 | { |
1664 | WSREP_WARN("resume failed %d for schema: %s, query: %s" , ret, |
1665 | thd->get_db(), thd->query()); |
1666 | } |
1667 | |
1668 | ret = wsrep->resync(wsrep); |
1669 | if (ret != WSREP_OK) |
1670 | { |
1671 | WSREP_WARN("resync failed %d for schema: %s, query: %s" , ret, |
1672 | thd->get_db(), thd->query()); |
1673 | return; |
1674 | } |
1675 | |
1676 | thd->variables.wsrep_on = 1; |
1677 | } |
1678 | |
1679 | int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, |
1680 | const TABLE_LIST* table_list) |
1681 | { |
1682 | int ret= 0; |
1683 | |
1684 | /* |
1685 | No isolation for applier or replaying threads. |
1686 | */ |
1687 | if (thd->wsrep_exec_mode == REPL_RECV) |
1688 | return 0; |
1689 | |
1690 | mysql_mutex_lock(&thd->LOCK_thd_data); |
1691 | |
1692 | if (thd->wsrep_conflict_state == MUST_ABORT) |
1693 | { |
1694 | WSREP_INFO("thread: %lld schema: %s query: %s has been aborted due to multi-master conflict" , |
1695 | (longlong) thd->thread_id, thd->get_db(), thd->query()); |
1696 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
1697 | return WSREP_TRX_FAIL; |
1698 | } |
1699 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
1700 | |
1701 | DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE); |
1702 | DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED); |
1703 | |
1704 | if (thd->global_read_lock.can_acquire_protection()) |
1705 | { |
1706 | WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %lld" , |
1707 | thd->query(), (longlong) thd->thread_id); |
1708 | return -1; |
1709 | } |
1710 | |
1711 | if (wsrep_debug && thd->mdl_context.has_locks()) |
1712 | { |
1713 | WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lld" , |
1714 | thd->query(), (longlong) thd->thread_id); |
1715 | } |
1716 | |
1717 | /* |
1718 | It makes sense to set auto_increment_* to defaults in TOI operations. |
1719 | Must be done before wsrep_TOI_begin() since Query_log_event encapsulating |
1720 | TOI statement and auto inc variables for wsrep replication is constructed |
1721 | there. Variables are reset back in THD::reset_for_next_command() before |
1722 | processing of next command. |
1723 | */ |
1724 | if (wsrep_auto_increment_control) |
1725 | { |
1726 | thd->variables.auto_increment_offset = 1; |
1727 | thd->variables.auto_increment_increment = 1; |
1728 | } |
1729 | |
1730 | if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) |
1731 | { |
1732 | switch (thd->variables.wsrep_OSU_method) { |
1733 | case WSREP_OSU_TOI: |
1734 | ret = wsrep_TOI_begin(thd, db_, table_, table_list); |
1735 | break; |
1736 | case WSREP_OSU_RSU: |
1737 | ret = wsrep_RSU_begin(thd, db_, table_); |
1738 | break; |
1739 | default: |
1740 | WSREP_ERROR("Unsupported OSU method: %lu" , |
1741 | thd->variables.wsrep_OSU_method); |
1742 | ret= -1; |
1743 | break; |
1744 | } |
1745 | switch (ret) { |
1746 | case 0: thd->wsrep_exec_mode= TOTAL_ORDER; break; |
1747 | case 1: |
1748 | /* TOI replication skipped, treat as success */ |
1749 | ret = 0; |
1750 | break; |
1751 | case -1: |
1752 | /* TOI replication failed, treat as error */ |
1753 | break; |
1754 | } |
1755 | } |
1756 | return ret; |
1757 | } |
1758 | |
1759 | void wsrep_to_isolation_end(THD *thd) |
1760 | { |
1761 | if (thd->wsrep_exec_mode == TOTAL_ORDER) |
1762 | { |
1763 | switch(thd->variables.wsrep_OSU_method) |
1764 | { |
1765 | case WSREP_OSU_TOI: wsrep_TOI_end(thd); break; |
1766 | case WSREP_OSU_RSU: wsrep_RSU_end(thd); break; |
1767 | default: |
1768 | WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu" , |
1769 | thd->variables.wsrep_OSU_method); |
1770 | break; |
1771 | } |
1772 | wsrep_cleanup_transaction(thd); |
1773 | } |
1774 | } |
1775 | |
/*
  Log an MDL conflict between two sessions through WSREP_<severity>().

  severity    suffix pasted onto WSREP_ (e.g. DEBUG, INFO)
  msg         headline string printed on the first line
  schema      schema name, printed with %.*s
  schema_len  length of schema (int)
  req         THD* of the requesting session
  gra         THD* of the session holding the granted lock

  For both sessions the thread id, trx seqno, wsrep exec mode, query
  state, conflict state, command, sql_command and query text are printed.

  Note: req and gra are expanded several times and without parentheses,
  so pass plain variables. The expansion already ends with a semicolon.
*/
#define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra) \
  WSREP_##severity( \
    "%s\n" \
    "schema: %.*s\n" \
    "request: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
    "granted: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
    msg, schema_len, schema, \
    (longlong) req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
    req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
    req->get_command(), req->lex->sql_command, req->query(), \
    (longlong) gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
    gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
    gra->get_command(), gra->lex->sql_command, gra->query());
1789 | |
/**
  Check if request for the metadata lock should be granted to the requester.

  When the requester is a TOI or applier thread and the lock owner is an
  ordinary session that is neither running FLUSH nor holding explicit
  locks, the owner is aborted through wsrep_abort_thd() as a side effect.

  @param  requestor_ctx        The MDL context of the requestor
  @param  ticket               MDL ticket; the THD of its owning context is
                               treated as the granted/owner thread
  @param  key                  MDL key; only its database name and name
                               length are read, for log output

  @retval TRUE   Lock request can be granted
  @retval FALSE  Lock request cannot be granted
*/

bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
                               MDL_ticket *ticket,
                               const MDL_key *key)
{
  /* Fallback to the non-wsrep behaviour */
  if (!WSREP_ON) return FALSE;

  THD *request_thd= requestor_ctx->get_thd();
  THD *granted_thd= ticket->get_ctx()->get_thd();
  bool ret= false;

  /* Schema name is only used in the conflict log messages below. */
  const char* schema= key->db_name();
  int schema_len= key->db_name_length();

  /* The requester's exec mode is read under its LOCK_thd_data. */
  mysql_mutex_lock(&request_thd->LOCK_thd_data);

  /*
    We consider granting MDL exceptions only for appliers (BF THD) and ones
    executing under TOI mode.

    Rules:
    1. If granted/owner THD is also an applier (BF THD) or one executing
       under TOI mode, then we grant the requested lock to the requester
       THD.
       @return true

    2. If granted/owner THD is executing a FLUSH command or already has an
       explicit lock, then do not grant the requested lock to the requester
       THD and it has to wait.
       @return false

    3. In all other cases the granted/owner THD is aborted and the requested
       lock is not granted to the requester THD, thus it has to wait.
       @return false
  */
  if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
      request_thd->wsrep_exec_mode == REPL_RECV)
  {
    /* Requester's lock is released before the owner's lock is taken. */
    mysql_mutex_unlock(&request_thd->LOCK_thd_data);
    WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
                  request_thd, granted_thd);
    ticket->wsrep_report(wsrep_debug);

    mysql_mutex_lock(&granted_thd->LOCK_thd_data);
    if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
        granted_thd->wsrep_exec_mode == REPL_RECV)
    {
      /* Rule 1: both sides are TOI/applier threads, grant the lock. */
      WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
                    request_thd, granted_thd);
      ticket->wsrep_report(true);
      mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
      ret= true;
    }
    else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
             granted_thd->mdl_context.has_explicit_locks())
    {
      /* Rule 2: owner runs FLUSH or holds explicit locks, requester waits. */
      WSREP_DEBUG("BF thread waiting for FLUSH");
      ticket->wsrep_report(wsrep_debug);
      mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
      ret= false;
    }
    else
    {
      /* Rule 3: the owner is aborted; the requester still has to wait. */
      /* Print some debug information. */
      if (wsrep_debug)
      {
        if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE ||
            request_thd->lex->sql_command == SQLCOM_DROP_SEQUENCE)
        {
          WSREP_DEBUG("DROP caused BF abort");
        }
        else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
        {
          WSREP_DEBUG("MDL granted, but committing thd abort scheduled");
        }
        else
        {
          WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
                        request_thd, granted_thd);
        }
        ticket->wsrep_report(true);
      }

      /* The owner's lock is released before wsrep_abort_thd() is called. */
      mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
      wsrep_abort_thd((void *) request_thd, (void *) granted_thd, 1);
      ret= false;
    }
  }
  else
  {
    /* Ordinary requester: no exception, ret stays false. */
    mysql_mutex_unlock(&request_thd->LOCK_thd_data);
  }

  return ret;
}
1895 | |
1896 | |
/**
  Thread entry point for wsrep system threads.

  Creates and initializes a THD, registers it in the global thread list,
  runs the processor function passed in via arg with that THD, and tears
  the THD down again once the processor returns.

  @param arg  the processor function, cast to wsrep_thd_processor_fun

  @return NULL. On an initialization failure an error is logged and
          unireg_abort(1) is called if mysqld_server_initialized is not
          set; otherwise NULL is returned.
*/
pthread_handler_t start_wsrep_THD(void *arg)
{
  THD *thd;
  wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;

  if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
  {
    goto error;
  }

  /* Thread bookkeeping below is done while holding LOCK_thread_count. */
  mysql_mutex_lock(&LOCK_thread_count);

  if (wsrep_gtid_mode)
  {
    /* Adjust domain_id. */
    thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
  }

  thd->real_id=pthread_self(); // Keep purify happy
  thread_created++;
  threads.append(thd);

  /* No client socket is attached: the vio argument is a null pointer. */
  my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));

  DBUG_PRINT("wsrep" ,(("creating thread %lld" ), (long long)thd->thread_id));
  thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
  (void) mysql_mutex_unlock(&LOCK_thread_count);

  /* from bootstrap()... */
  thd->bootstrap=1;
  thd->max_client_packet_length= thd->net.max_packet;
  thd->security_ctx->master_access= ~(ulong)0;

  /* from handle_one_connection... */
  pthread_detach_this_thread();

  mysql_thread_set_psi_id(thd->thread_id);
  thd->thr_create_utime= microsecond_interval_timer();
  if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
  {
    close_connection(thd, ER_OUT_OF_RESOURCES);
    statistic_increment(aborted_connects,&LOCK_status);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
    goto error;
  }

// </5.1.17>
  /*
    handle_one_connection() is normally the only way a thread would
    start and would always be on the very high end of the stack ,
    therefore, the thread stack always starts at the address of the
    first local variable of handle_one_connection, which is thd.  We
    need to know the start of the stack so that we could check for
    stack overruns.
  */
  DBUG_PRINT("wsrep" , ("handle_one_connection called by thread %lld\n" ,
                        (long long)thd->thread_id));
  /* now that we've called my_thread_init(), it is safe to call DBUG_* */

  thd->thread_stack= (char*) &thd;
  if (thd->store_globals())
  {
    close_connection(thd, ER_OUT_OF_RESOURCES);
    statistic_increment(aborted_connects,&LOCK_status);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
    goto error;
  }

  thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
  thd->security_ctx->skip_grants();

  /* handle_one_connection() again... */
  //thd->version= refresh_version;
  thd->proc_info= 0;
  thd->set_command(COM_SLEEP);
  thd->init_for_queries();

  /* Announce the new wsrep thread to anyone waiting on COND_thread_count. */
  mysql_mutex_lock(&LOCK_thread_count);
  wsrep_running_threads++;
  mysql_cond_broadcast(&COND_thread_count);
  mysql_mutex_unlock(&LOCK_thread_count);

  /* Run the thread's actual work; teardown starts when this returns. */
  processor(thd);

  close_connection(thd, 0);

  /* Announce the exit so that waiters on COND_thread_count re-check. */
  mysql_mutex_lock(&LOCK_thread_count);
  wsrep_running_threads--;
  WSREP_DEBUG("wsrep running threads now: %lu" , wsrep_running_threads);
  mysql_cond_broadcast(&COND_thread_count);
  mysql_mutex_unlock(&LOCK_thread_count);

  // Note: We can't call THD destructor without crashing
  // if plugins have not been initialized. However, in most of the
  // cases this means that pre SE initialization SST failed and
  // we are going to exit anyway.
  // NOTE(review): "delete thd" below runs on both branches regardless of
  // plugins_are_initialized -- confirm this matches the note above.
  if (plugins_are_initialized)
  {
    net_end(&thd->net);
    MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
  }
  else
  {
    // TODO: lightweight cleanup to get rid of:
    // 'Error in my_thread_global_end(): 2 threads didn't exit'
    // at server shutdown
  }

  unlink_not_visible_thd(thd);
  delete thd;
  my_thread_end();
  return(NULL);

error:
  /*
    NOTE(review): on the error paths taken after thd was created, thd is
    neither deleted nor removed from the threads list here -- confirm
    whether that is intentional.
  */
  WSREP_ERROR("Failed to create/initialize system thread" );

  /* Abort if its the first applier/rollbacker thread. */
  if (!mysqld_server_initialized)
    unireg_abort(1);
  else
    return NULL;
}
2019 | |
2020 | |
2021 | /**/ |
2022 | static bool abort_replicated(THD *thd) |
2023 | { |
2024 | bool ret_code= false; |
2025 | if (thd->wsrep_query_state== QUERY_COMMITTING) |
2026 | { |
2027 | WSREP_DEBUG("aborting replicated trx: %llu" , (ulonglong)(thd->real_id)); |
2028 | |
2029 | (void)wsrep_abort_thd(thd, thd, TRUE); |
2030 | ret_code= true; |
2031 | } |
2032 | return ret_code; |
2033 | } |
2034 | |
2035 | |
2036 | /**/ |
2037 | static inline bool is_client_connection(THD *thd) |
2038 | { |
2039 | return (thd->wsrep_client_thread && thd->variables.wsrep_on); |
2040 | } |
2041 | |
2042 | |
2043 | static inline bool is_replaying_connection(THD *thd) |
2044 | { |
2045 | bool ret; |
2046 | |
2047 | mysql_mutex_lock(&thd->LOCK_thd_data); |
2048 | ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false; |
2049 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
2050 | |
2051 | return ret; |
2052 | } |
2053 | |
2054 | |
2055 | static inline bool is_committing_connection(THD *thd) |
2056 | { |
2057 | bool ret; |
2058 | |
2059 | mysql_mutex_lock(&thd->LOCK_thd_data); |
2060 | ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false; |
2061 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
2062 | |
2063 | return ret; |
2064 | } |
2065 | |
2066 | |
2067 | static bool have_client_connections() |
2068 | { |
2069 | THD *tmp; |
2070 | |
2071 | I_List_iterator<THD> it(threads); |
2072 | while ((tmp=it++)) |
2073 | { |
2074 | DBUG_PRINT("quit" ,("Informing thread %lld that it's time to die" , |
2075 | (longlong) tmp->thread_id)); |
2076 | if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION) |
2077 | { |
2078 | (void)abort_replicated(tmp); |
2079 | return true; |
2080 | } |
2081 | } |
2082 | return false; |
2083 | } |
2084 | |
2085 | static void wsrep_close_thread(THD *thd) |
2086 | { |
2087 | thd->set_killed(KILL_CONNECTION); |
2088 | MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); |
2089 | if (thd->mysys_var) |
2090 | { |
2091 | thd->mysys_var->abort=1; |
2092 | mysql_mutex_lock(&thd->mysys_var->mutex); |
2093 | if (thd->mysys_var->current_cond) |
2094 | { |
2095 | mysql_mutex_lock(thd->mysys_var->current_mutex); |
2096 | mysql_cond_broadcast(thd->mysys_var->current_cond); |
2097 | mysql_mutex_unlock(thd->mysys_var->current_mutex); |
2098 | } |
2099 | mysql_mutex_unlock(&thd->mysys_var->mutex); |
2100 | } |
2101 | } |
2102 | |
2103 | |
2104 | static my_bool have_committing_connections() |
2105 | { |
2106 | THD *tmp; |
2107 | mysql_mutex_lock(&LOCK_thread_count); // For unlink from list |
2108 | |
2109 | I_List_iterator<THD> it(threads); |
2110 | while ((tmp=it++)) |
2111 | { |
2112 | if (!is_client_connection(tmp)) |
2113 | continue; |
2114 | |
2115 | if (is_committing_connection(tmp)) |
2116 | { |
2117 | return TRUE; |
2118 | } |
2119 | } |
2120 | mysql_mutex_unlock(&LOCK_thread_count); |
2121 | return FALSE; |
2122 | } |
2123 | |
2124 | |
2125 | int wsrep_wait_committing_connections_close(int wait_time) |
2126 | { |
2127 | int sleep_time= 100; |
2128 | |
2129 | while (have_committing_connections() && wait_time > 0) |
2130 | { |
2131 | WSREP_DEBUG("wait for committing transaction to close: %d" , wait_time); |
2132 | my_sleep(sleep_time); |
2133 | wait_time -= sleep_time; |
2134 | } |
2135 | if (have_committing_connections()) |
2136 | { |
2137 | return 1; |
2138 | } |
2139 | return 0; |
2140 | } |
2141 | |
2142 | |
/**
  Close wsrep client connections in two passes.

  Pass 1 walks the thread list and, for each client connection, either
  marks it killed (when replaying), aborts its committing transaction, or
  closes the thread. After a short pause, pass 2 closes the connection of
  every client thread that is neither committing nor replaying. Finally,
  if wait_to_end is set, the function waits on COND_thread_count for as
  long as have_client_connections() keeps returning true.

  kill_cached_threads is forced to true for the duration of the call and
  restored to its previous value before returning.

  @param wait_to_end  when true, wait for the remaining client connections
*/
void wsrep_close_client_connections(my_bool wait_to_end)
{
  /*
    First signal all threads that it's time to die
  */

  THD *tmp;
  mysql_mutex_lock(&LOCK_thread_count); // For unlink from list

  bool kill_cached_threads_saved= kill_cached_threads;
  kill_cached_threads= true; // prevent future threads caching
  mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die

  I_List_iterator<THD> it(threads);
  while ((tmp=it++))
  {
    DBUG_PRINT("quit" ,("Informing thread %lld that it's time to die" ,
                        (longlong) tmp->thread_id));
    /* We skip slave threads & scheduler on this first loop through. */
    if (!is_client_connection(tmp))
      continue;

    /* Replaying connections are only flagged, not woken up or closed. */
    if (is_replaying_connection(tmp))
    {
      tmp->set_killed(KILL_CONNECTION);
      continue;
    }

    /* replicated transactions must be skipped */
    if (abort_replicated(tmp))
      continue;

    WSREP_DEBUG("closing connection %lld" , (longlong) tmp->thread_id);
    wsrep_close_thread(tmp);
  }
  mysql_mutex_unlock(&LOCK_thread_count);

  /* The mutex is not held while sleeping. */
  if (thread_count)
    sleep(2); // Give threads time to die

  mysql_mutex_lock(&LOCK_thread_count);
  /*
    Force remaining threads to die by closing the connection to the client
  */

  I_List_iterator<THD> it2(threads);
  while ((tmp=it2++))
  {
#ifndef __bsdi__ // Bug in BSDI kernel
    if (is_client_connection(tmp) &&
        !abort_replicated(tmp) &&
        !is_replaying_connection(tmp))
    {
      WSREP_INFO("killing local connection: %lld" , (longlong) tmp->thread_id);
      close_connection(tmp,0);
    }
#endif
  }

  DBUG_PRINT("quit" ,("Waiting for threads to die (count=%u)" ,thread_count));
  WSREP_DEBUG("waiting for client connections to close: %u" , thread_count);

  /* LOCK_thread_count is held here; mysql_cond_wait() is given that mutex. */
  while (wait_to_end && have_client_connections())
  {
    mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
    DBUG_PRINT("quit" ,("One thread died (count=%u)" , thread_count));
  }

  kill_cached_threads= kill_cached_threads_saved;

  mysql_mutex_unlock(&LOCK_thread_count);

  /* All client connection threads have now been aborted */
}
2217 | |
2218 | |
2219 | void wsrep_close_applier(THD *thd) |
2220 | { |
2221 | WSREP_DEBUG("closing applier %lld" , (longlong) thd->thread_id); |
2222 | wsrep_close_thread(thd); |
2223 | } |
2224 | |
2225 | |
2226 | void wsrep_close_threads(THD *thd) |
2227 | { |
2228 | THD *tmp; |
2229 | mysql_mutex_lock(&LOCK_thread_count); // For unlink from list |
2230 | |
2231 | I_List_iterator<THD> it(threads); |
2232 | while ((tmp=it++)) |
2233 | { |
2234 | DBUG_PRINT("quit" ,("Informing thread %lld that it's time to die" , |
2235 | (longlong) tmp->thread_id)); |
2236 | /* We skip slave threads & scheduler on this first loop through. */ |
2237 | if (tmp->wsrep_applier && tmp != thd) |
2238 | { |
2239 | WSREP_DEBUG("closing wsrep thread %lld" , (longlong) tmp->thread_id); |
2240 | wsrep_close_thread (tmp); |
2241 | } |
2242 | } |
2243 | |
2244 | mysql_mutex_unlock(&LOCK_thread_count); |
2245 | } |
2246 | |
2247 | void wsrep_wait_appliers_close(THD *thd) |
2248 | { |
2249 | /* Wait for wsrep appliers to gracefully exit */ |
2250 | mysql_mutex_lock(&LOCK_thread_count); |
2251 | while (wsrep_running_threads > 1) |
2252 | // 1 is for rollbacker thread which needs to be killed explicitly. |
2253 | // This gotta be fixed in a more elegant manner if we gonna have arbitrary |
2254 | // number of non-applier wsrep threads. |
2255 | { |
2256 | if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) |
2257 | { |
2258 | mysql_mutex_unlock(&LOCK_thread_count); |
2259 | my_sleep(100); |
2260 | mysql_mutex_lock(&LOCK_thread_count); |
2261 | } |
2262 | else |
2263 | mysql_cond_wait(&COND_thread_count,&LOCK_thread_count); |
2264 | DBUG_PRINT("quit" ,("One applier died (count=%u)" ,thread_count)); |
2265 | } |
2266 | mysql_mutex_unlock(&LOCK_thread_count); |
2267 | /* Now kill remaining wsrep threads: rollbacker */ |
2268 | wsrep_close_threads (thd); |
2269 | /* and wait for them to die */ |
2270 | mysql_mutex_lock(&LOCK_thread_count); |
2271 | while (wsrep_running_threads > 0) |
2272 | { |
2273 | if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) |
2274 | { |
2275 | mysql_mutex_unlock(&LOCK_thread_count); |
2276 | my_sleep(100); |
2277 | mysql_mutex_lock(&LOCK_thread_count); |
2278 | } |
2279 | else |
2280 | mysql_cond_wait(&COND_thread_count,&LOCK_thread_count); |
2281 | DBUG_PRINT("quit" ,("One thread died (count=%u)" ,thread_count)); |
2282 | } |
2283 | mysql_mutex_unlock(&LOCK_thread_count); |
2284 | |
2285 | /* All wsrep applier threads have now been aborted. However, if this thread |
2286 | is also applier, we are still running... |
2287 | */ |
2288 | } |
2289 | |
2290 | |
2291 | void wsrep_kill_mysql(THD *thd) |
2292 | { |
2293 | if (mysqld_server_started) |
2294 | { |
2295 | if (!shutdown_in_progress) |
2296 | { |
2297 | WSREP_INFO("starting shutdown" ); |
2298 | kill_mysql(); |
2299 | } |
2300 | } |
2301 | else |
2302 | { |
2303 | unireg_abort(1); |
2304 | } |
2305 | } |
2306 | |
2307 | |
2308 | static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len) |
2309 | { |
2310 | String log_query; |
2311 | sp_head *sp = thd->lex->sphead; |
2312 | sql_mode_t saved_mode= thd->variables.sql_mode; |
2313 | String retstr(64); |
2314 | LEX_CSTRING returns= empty_clex_str; |
2315 | retstr.set_charset(system_charset_info); |
2316 | |
2317 | log_query.set_charset(system_charset_info); |
2318 | |
2319 | if (sp->m_handler->type() == TYPE_ENUM_FUNCTION) |
2320 | { |
2321 | sp_returns_type(thd, retstr, sp); |
2322 | returns= retstr.lex_cstring(); |
2323 | } |
2324 | if (sp->m_handler-> |
2325 | show_create_sp(thd, &log_query, |
2326 | sp->m_explicit_name ? sp->m_db : null_clex_str, |
2327 | sp->m_name, sp->m_params, returns, |
2328 | sp->m_body, sp->chistics(), |
2329 | thd->lex->definer[0], |
2330 | thd->lex->create_info, |
2331 | saved_mode)) |
2332 | { |
2333 | WSREP_WARN("SP create string failed: schema: %s, query: %s" , |
2334 | thd->get_db(), thd->query()); |
2335 | return 1; |
2336 | } |
2337 | |
2338 | return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); |
2339 | } |
2340 | |
2341 | |
2342 | extern int wsrep_on(THD *thd) |
2343 | { |
2344 | return (int)(WSREP(thd)); |
2345 | } |
2346 | |
2347 | |
2348 | extern "C" bool wsrep_thd_is_wsrep_on(THD *thd) |
2349 | { |
2350 | return thd->variables.wsrep_on; |
2351 | } |
2352 | |
2353 | |
2354 | bool wsrep_consistency_check(THD *thd) |
2355 | { |
2356 | return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING; |
2357 | } |
2358 | |
2359 | |
2360 | extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode) |
2361 | { |
2362 | thd->wsrep_exec_mode= mode; |
2363 | } |
2364 | |
2365 | |
2366 | extern "C" void wsrep_thd_set_query_state( |
2367 | THD *thd, enum wsrep_query_state state) |
2368 | { |
2369 | thd->wsrep_query_state= state; |
2370 | } |
2371 | |
2372 | |
2373 | void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state) |
2374 | { |
2375 | if (WSREP(thd)) thd->wsrep_conflict_state= state; |
2376 | } |
2377 | |
2378 | |
2379 | enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd) |
2380 | { |
2381 | return thd->wsrep_exec_mode; |
2382 | } |
2383 | |
2384 | |
2385 | const char *wsrep_thd_exec_mode_str(THD *thd) |
2386 | { |
2387 | return |
2388 | (!thd) ? "void" : |
2389 | (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" : |
2390 | (thd->wsrep_exec_mode == REPL_RECV) ? "applier" : |
2391 | (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" : |
2392 | (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void" ; |
2393 | } |
2394 | |
2395 | |
2396 | enum wsrep_query_state wsrep_thd_query_state(THD *thd) |
2397 | { |
2398 | return thd->wsrep_query_state; |
2399 | } |
2400 | |
2401 | |
2402 | const char *wsrep_thd_query_state_str(THD *thd) |
2403 | { |
2404 | return |
2405 | (!thd) ? "void" : |
2406 | (thd->wsrep_query_state == QUERY_IDLE) ? "idle" : |
2407 | (thd->wsrep_query_state == QUERY_EXEC) ? "executing" : |
2408 | (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" : |
2409 | (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" : |
2410 | (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void" ; |
2411 | } |
2412 | |
2413 | |
2414 | enum wsrep_conflict_state wsrep_thd_get_conflict_state(THD *thd) |
2415 | { |
2416 | return thd->wsrep_conflict_state; |
2417 | } |
2418 | |
2419 | |
2420 | const char *wsrep_thd_conflict_state_str(THD *thd) |
2421 | { |
2422 | return |
2423 | (!thd) ? "void" : |
2424 | (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" : |
2425 | (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" : |
2426 | (thd->wsrep_conflict_state == ABORTING) ? "aborting" : |
2427 | (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" : |
2428 | (thd->wsrep_conflict_state == REPLAYING) ? "replaying" : |
2429 | (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" : |
2430 | (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void" ; |
2431 | } |
2432 | |
2433 | |
2434 | wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd) |
2435 | { |
2436 | return &thd->wsrep_ws_handle; |
2437 | } |
2438 | |
2439 | |
2440 | void wsrep_thd_LOCK(THD *thd) |
2441 | { |
2442 | mysql_mutex_lock(&thd->LOCK_thd_data); |
2443 | } |
2444 | |
2445 | |
2446 | void wsrep_thd_UNLOCK(THD *thd) |
2447 | { |
2448 | mysql_mutex_unlock(&thd->LOCK_thd_data); |
2449 | } |
2450 | |
2451 | |
2452 | extern "C" time_t wsrep_thd_query_start(THD *thd) |
2453 | { |
2454 | return thd->query_start(); |
2455 | } |
2456 | |
2457 | |
2458 | extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd) |
2459 | { |
2460 | return thd->wsrep_rand; |
2461 | } |
2462 | |
2463 | longlong wsrep_thd_trx_seqno(THD *thd) |
2464 | { |
2465 | return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED; |
2466 | } |
2467 | |
2468 | |
2469 | extern "C" query_id_t wsrep_thd_query_id(THD *thd) |
2470 | { |
2471 | return thd->query_id; |
2472 | } |
2473 | |
2474 | |
2475 | char *wsrep_thd_query(THD *thd) |
2476 | { |
2477 | return (thd) ? thd->query() : NULL; |
2478 | } |
2479 | |
2480 | |
2481 | extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd) |
2482 | { |
2483 | return thd->wsrep_last_query_id; |
2484 | } |
2485 | |
2486 | |
2487 | extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) |
2488 | { |
2489 | thd->wsrep_last_query_id= id; |
2490 | } |
2491 | |
2492 | |
2493 | extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) |
2494 | { |
2495 | if (signal) |
2496 | { |
2497 | thd->awake(KILL_QUERY); |
2498 | } |
2499 | else |
2500 | { |
2501 | mysql_mutex_lock(&LOCK_wsrep_replaying); |
2502 | mysql_cond_broadcast(&COND_wsrep_replaying); |
2503 | mysql_mutex_unlock(&LOCK_wsrep_replaying); |
2504 | } |
2505 | } |
2506 | |
2507 | |
2508 | int wsrep_thd_retry_counter(THD *thd) |
2509 | { |
2510 | return(thd->wsrep_retry_counter); |
2511 | } |
2512 | |
2513 | |
2514 | extern "C" bool wsrep_thd_ignore_table(THD *thd) |
2515 | { |
2516 | return thd->wsrep_ignore_table; |
2517 | } |
2518 | |
2519 | |
2520 | extern int |
2521 | wsrep_trx_order_before(THD *thd1, THD *thd2) |
2522 | { |
2523 | if (wsrep_thd_trx_seqno(thd1) < wsrep_thd_trx_seqno(thd2)) { |
2524 | WSREP_DEBUG("BF conflict, order: %lld %lld\n" , |
2525 | (long long)wsrep_thd_trx_seqno(thd1), |
2526 | (long long)wsrep_thd_trx_seqno(thd2)); |
2527 | return 1; |
2528 | } |
2529 | WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n" , |
2530 | (long long)wsrep_thd_trx_seqno(thd1), |
2531 | (long long)wsrep_thd_trx_seqno(thd2)); |
2532 | return 0; |
2533 | } |
2534 | |
2535 | |
2536 | int wsrep_trx_is_aborting(THD *thd_ptr) |
2537 | { |
2538 | if (thd_ptr) { |
2539 | if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) || |
2540 | (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) { |
2541 | return 1; |
2542 | } |
2543 | } |
2544 | return 0; |
2545 | } |
2546 | |
2547 | |
2548 | void wsrep_copy_query(THD *thd) |
2549 | { |
2550 | thd->wsrep_retry_command = thd->get_command(); |
2551 | thd->wsrep_retry_query_len = thd->query_length(); |
2552 | if (thd->wsrep_retry_query) { |
2553 | my_free(thd->wsrep_retry_query); |
2554 | } |
2555 | thd->wsrep_retry_query = (char *)my_malloc( |
2556 | thd->wsrep_retry_query_len + 1, MYF(0)); |
2557 | strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len); |
2558 | thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0'; |
2559 | } |
2560 | |
2561 | |
2562 | bool wsrep_is_show_query(enum enum_sql_command command) |
2563 | { |
2564 | DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); |
2565 | return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; |
2566 | } |
2567 | |
/**
  Start TOI replication for CREATE TABLE ... LIKE.

  Three cases are handled:
   - the new table is temporary: nothing is replicated;
   - the source is not a temporary table: TOI is started directly;
   - the source is a temporary table: its CREATE TABLE statement is
     generated and attached as the TOI pre-query while TOI is started.

  @param thd          current thread
  @param table        table being created
  @param src_table    table named in the LIKE clause
  @param create_info  create options; only tmp_table() is consulted

  @retval false  success, or replication skipped
  @retval true   the error label was reached.
                 NOTE(review): nothing in this body jumps to it explicitly;
                 presumably WSREP_TO_ISOLATION_BEGIN does -- confirm
                 against the macro definition.
*/
bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
                             TABLE_LIST* src_table,
                             HA_CREATE_INFO *create_info)
{
  if (create_info->tmp_table())
  {
    /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */
    WSREP_DEBUG("CREATE TEMPORARY TABLE LIKE... skipped replication\n %s" ,
                thd->query());
  }
  else if (!(thd->find_temporary_table(src_table)))
  {
    /* this is straight CREATE TABLE LIKE... with no tmp tables */
    WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);
  }
  else
  {
    /* here we have CREATE TABLE LIKE <temporary table>
       the temporary table definition will be needed in slaves to
       enable the create to succeed
    */
    TABLE_LIST tbl;
    bzero((void*) &tbl, sizeof(tbl));
    tbl.db= src_table->db;
    tbl.table_name= tbl.alias= src_table->table_name;
    tbl.table= src_table->table;
    char buf[2048];
    String query(buf, sizeof(buf), system_charset_info);
    query.length(0); // Have to zero it since constructor doesn't

    /* The result of show_create_table() is deliberately ignored. */
    (void) show_create_table(thd, &tbl, &query, NULL, WITH_DB_NAME);
    WSREP_DEBUG("TMP TABLE: %s" , query.ptr());

    /*
      The pre-query points into the local "query" string, so it is only
      valid inside this scope and is cleared again right after TOI begin.
    */
    thd->wsrep_TOI_pre_query= query.ptr();
    thd->wsrep_TOI_pre_query_len= query.length();

    WSREP_TO_ISOLATION_BEGIN(table->db.str, table->table_name.str, NULL);

    thd->wsrep_TOI_pre_query= NULL;
    thd->wsrep_TOI_pre_query_len= 0;
  }

  return(false);

error:
  /* Only the pointer is cleared here; wsrep_TOI_pre_query_len is left as is. */
  thd->wsrep_TOI_pre_query= NULL;
  return (true);
}
2616 | |
2617 | |
2618 | static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len) |
2619 | { |
2620 | LEX *lex= thd->lex; |
2621 | String stmt_query; |
2622 | |
2623 | LEX_CSTRING definer_user; |
2624 | LEX_CSTRING definer_host; |
2625 | |
2626 | if (!lex->definer) |
2627 | { |
2628 | if (!thd->slave_thread) |
2629 | { |
2630 | if (!(lex->definer= create_default_definer(thd, false))) |
2631 | return 1; |
2632 | } |
2633 | } |
2634 | |
2635 | if (lex->definer) |
2636 | { |
2637 | /* SUID trigger. */ |
2638 | LEX_USER *d= get_current_user(thd, lex->definer); |
2639 | |
2640 | if (!d) |
2641 | return 1; |
2642 | |
2643 | definer_user= d->user; |
2644 | definer_host= d->host; |
2645 | } |
2646 | else |
2647 | { |
2648 | /* non-SUID trigger. */ |
2649 | |
2650 | definer_user.str= 0; |
2651 | definer_user.length= 0; |
2652 | |
2653 | definer_host.str= 0; |
2654 | definer_host.length= 0; |
2655 | } |
2656 | |
2657 | stmt_query.append(STRING_WITH_LEN("CREATE " )); |
2658 | |
2659 | append_definer(thd, &stmt_query, &definer_user, &definer_host); |
2660 | |
2661 | LEX_CSTRING stmt_definition; |
2662 | stmt_definition.str= (char*) thd->lex->stmt_definition_begin; |
2663 | stmt_definition.length= thd->lex->stmt_definition_end |
2664 | - thd->lex->stmt_definition_begin; |
2665 | trim_whitespace(thd->charset(), &stmt_definition); |
2666 | |
2667 | stmt_query.append(stmt_definition.str, stmt_definition.length); |
2668 | |
2669 | return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(), |
2670 | buf, buf_len); |
2671 | } |
2672 | |
2673 | /***** callbacks for wsrep service ************/ |
2674 | |
2675 | my_bool get_wsrep_debug() |
2676 | { |
2677 | return wsrep_debug; |
2678 | } |
2679 | |
2680 | my_bool get_wsrep_load_data_splitting() |
2681 | { |
2682 | return wsrep_load_data_splitting; |
2683 | } |
2684 | |
2685 | long get_wsrep_protocol_version() |
2686 | { |
2687 | return wsrep_protocol_version; |
2688 | } |
2689 | |
2690 | my_bool get_wsrep_drupal_282555_workaround() |
2691 | { |
2692 | return wsrep_drupal_282555_workaround; |
2693 | } |
2694 | |
2695 | my_bool get_wsrep_recovery() |
2696 | { |
2697 | return wsrep_recovery; |
2698 | } |
2699 | |
2700 | my_bool get_wsrep_log_conflicts() |
2701 | { |
2702 | return wsrep_log_conflicts; |
2703 | } |
2704 | |
2705 | wsrep_t *get_wsrep() |
2706 | { |
2707 | return wsrep; |
2708 | } |
2709 | |
2710 | my_bool get_wsrep_certify_nonPK() |
2711 | { |
2712 | return wsrep_certify_nonPK; |
2713 | } |
2714 | |
2715 | void wsrep_lock_rollback() |
2716 | { |
2717 | mysql_mutex_lock(&LOCK_wsrep_rollback); |
2718 | } |
2719 | |
2720 | void wsrep_unlock_rollback() |
2721 | { |
2722 | mysql_cond_signal(&COND_wsrep_rollback); |
2723 | mysql_mutex_unlock(&LOCK_wsrep_rollback); |
2724 | } |
2725 | |
2726 | my_bool wsrep_aborting_thd_contains(THD *thd) |
2727 | { |
2728 | mysql_mutex_assert_owner(&LOCK_wsrep_rollback); |
2729 | wsrep_aborting_thd_t abortees = wsrep_aborting_thd; |
2730 | while (abortees) |
2731 | { |
2732 | if (abortees->aborting_thd == thd) |
2733 | return true; |
2734 | abortees = abortees->next; |
2735 | } |
2736 | return false; |
2737 | } |
2738 | |
2739 | void wsrep_aborting_thd_enqueue(THD *thd) |
2740 | { |
2741 | mysql_mutex_assert_owner(&LOCK_wsrep_rollback); |
2742 | wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t) |
2743 | my_malloc(sizeof(struct wsrep_aborting_thd), MYF(0)); |
2744 | aborting->aborting_thd = thd; |
2745 | aborting->next = wsrep_aborting_thd; |
2746 | wsrep_aborting_thd = aborting; |
2747 | } |
2748 | |
2749 | bool wsrep_node_is_donor() |
2750 | { |
2751 | return (WSREP_ON) ? (wsrep_config_state->get_status() == 2) : false; |
2752 | } |
2753 | |
2754 | bool wsrep_node_is_synced() |
2755 | { |
2756 | return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false; |
2757 | } |
2758 | |