1/* Copyright 2008-2015 Codership Oy <http://www.codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16#include "mariadb.h"
17#include "wsrep_sst.h"
18#include <mysqld.h>
19#include <m_ctype.h>
20#include <strfunc.h>
21#include <sql_class.h>
22#include <set_var.h>
23#include <sql_acl.h>
24#include <sql_reload.h>
25#include <sql_parse.h>
26#include "wsrep_priv.h"
27#include "wsrep_utils.h"
28#include "wsrep_xid.h"
29#include <cstdio>
30#include <cstdlib>
31
// Cached command-line fragment with the defaults-file related options that
// is passed to SST scripts. It starts out empty and is filled in by
// make_wsrep_defaults_file().
static char wsrep_defaults_file[FN_REFLEN * 2 + 10 + 30 +
                                sizeof(WSREP_SST_OPT_CONF) +
                                sizeof(WSREP_SST_OPT_CONF_SUFFIX) +
                                sizeof(WSREP_SST_OPT_CONF_EXTRA)] = {0};

// Container for the real auth string; wsrep_sst_auth itself is replaced by
// a mask once the real value has been stored here (see sst_auth_real_set()).
static const char* sst_auth_real = NULL;
39
40bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
41{
42 if ((! var->save_result.string_value.str) ||
43 (var->save_result.string_value.length == 0 ))
44 {
45 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
46 var->save_result.string_value.str ?
47 var->save_result.string_value.str : "NULL");
48 return 1;
49 }
50
51 return 0;
52}
53
54bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type)
55{
56 return 0;
57}
58
59
60static void make_wsrep_defaults_file()
61{
62 if (!wsrep_defaults_file[0])
63 {
64 char *ptr= wsrep_defaults_file;
65 char *end= ptr + sizeof(wsrep_defaults_file);
66 if (my_defaults_file)
67 ptr= strxnmov(ptr, end - ptr,
68 WSREP_SST_OPT_CONF, " '", my_defaults_file, "' ", NULL);
69
70 if (my_defaults_extra_file)
71 ptr= strxnmov(ptr, end - ptr,
72 WSREP_SST_OPT_CONF_EXTRA, " '", my_defaults_extra_file, "' ", NULL);
73
74 if (my_defaults_group_suffix)
75 ptr= strxnmov(ptr, end - ptr,
76 WSREP_SST_OPT_CONF_SUFFIX, " '", my_defaults_group_suffix, "' ", NULL);
77 }
78}
79
80
81bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
82{
83 if ((! var->save_result.string_value.str) ||
84 (var->save_result.string_value.length > (FN_REFLEN - 1))) // safety
85 {
86 goto err;
87 }
88
89 return 0;
90
91err:
92 my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
93 var->save_result.string_value.str ?
94 var->save_result.string_value.str : "NULL");
95 return 1;
96}
97
98bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
99 enum_var_type type)
100{
101 return 0;
102}
103
104bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
105{
106 return 0;
107}
108
109static bool sst_auth_real_set (const char* value)
110{
111 const char* v= NULL;
112
113 if (value)
114 {
115 v= my_strdup(value, MYF(0));
116 }
117 else // its NULL
118 {
119 wsrep_sst_auth_free();
120 return 0;
121 }
122
123 if (v)
124 {
125 // set sst_auth_real
126 if (sst_auth_real) { my_free((void *) sst_auth_real); }
127 sst_auth_real = v;
128
129 // mask wsrep_sst_auth
130 if (strlen(sst_auth_real))
131 {
132 if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); }
133 wsrep_sst_auth= my_strdup(WSREP_SST_AUTH_MASK, MYF(0));
134 }
135 return 0;
136 }
137 return 1;
138}
139
140void wsrep_sst_auth_free()
141{
142 if (wsrep_sst_auth) { my_free((void *) wsrep_sst_auth); }
143 if (sst_auth_real) { my_free((void *) sst_auth_real); }
144 wsrep_sst_auth= NULL;
145 sst_auth_real= NULL;
146}
147
148bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
149{
150 return sst_auth_real_set (wsrep_sst_auth);
151}
152
153void wsrep_sst_auth_init ()
154{
155 sst_auth_real_set(wsrep_sst_auth);
156}
157
158bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
159{
160 return 0;
161}
162
163bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
164{
165 return 0;
166}
167
168bool wsrep_before_SE()
169{
170 return (wsrep_provider != NULL
171 && strcmp (wsrep_provider, WSREP_NONE)
172 && strcmp (wsrep_sst_method, WSREP_SST_SKIP)
173 && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
174}
175
// Set by wsrep_sst_complete() and cleared by wsrep_sst_grab();
// wsrep_sst_wait() blocks until it becomes true.
static bool sst_complete = false;
// Copy of the 'needed' argument of wsrep_sst_complete(); checked by
// wsrep_sst_continue() before signalling the provider.
static bool sst_needed = false;
178
179void wsrep_sst_grab ()
180{
181 WSREP_INFO("wsrep_sst_grab()");
182 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
183 sst_complete = false;
184 mysql_mutex_unlock (&LOCK_wsrep_sst);
185}
186
187// Wait for end of SST
188bool wsrep_sst_wait ()
189{
190 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
191 while (!sst_complete)
192 {
193 WSREP_INFO("Waiting for SST to complete.");
194 mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst);
195 }
196
197 if (local_seqno >= 0)
198 {
199 WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
200 }
201 else
202 {
203 WSREP_ERROR("SST failed: %d (%s)",
204 int(-local_seqno), strerror(-local_seqno));
205 }
206
207 mysql_mutex_unlock (&LOCK_wsrep_sst);
208
209 return (local_seqno >= 0);
210}
211
212// Signal end of SST
213void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
214 wsrep_seqno_t sst_seqno,
215 bool needed)
216{
217 if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
218 if (!sst_complete)
219 {
220 sst_complete = true;
221 sst_needed = needed;
222 local_uuid = *sst_uuid;
223 local_seqno = sst_seqno;
224 mysql_cond_signal (&COND_wsrep_sst);
225 }
226 else
227 {
228 /* This can happen when called from wsrep_synced_cb().
229 At the moment there is no way to check there
230 if main thread is still waiting for signal,
231 so wsrep_sst_complete() is called from there
232 each time wsrep_ready changes from FALSE -> TRUE.
233 */
234 WSREP_DEBUG("Nobody is waiting for SST.");
235 }
236 mysql_mutex_unlock (&LOCK_wsrep_sst);
237}
238
239/*
240 If wsrep provider is loaded, inform that the new state snapshot
241 has been received. Also update the local checkpoint.
242
243 @param wsrep [IN] wsrep handle
244 @param uuid [IN] Initial state UUID
245 @param seqno [IN] Initial state sequence number
246 @param state [IN] Always NULL, also ignored by wsrep provider (?)
247 @param state_len [IN] Always 0, also ignored by wsrep provider (?)
248 @param implicit [IN] Whether invoked implicitly due to SST
249 (true) or explicitly because if change
250 in wsrep_start_position by user (false).
251 @return false Success
252 true Error
253
254*/
255bool wsrep_sst_received (wsrep_t* const wsrep,
256 const wsrep_uuid_t& uuid,
257 const wsrep_seqno_t seqno,
258 const void* const state,
259 const size_t state_len,
260 const bool implicit)
261{
262 /*
263 To keep track of whether the local uuid:seqno should be updated. Also, note
264 that local state (uuid:seqno) is updated/checkpointed only after we get an
265 OK from wsrep provider. By doing so, the values remain consistent across
266 the server & wsrep provider.
267 */
268 bool do_update= false;
269
270 // Get the locally stored uuid:seqno.
271 if (wsrep_get_SE_checkpoint(local_uuid, local_seqno))
272 {
273 return true;
274 }
275
276 if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
277 local_seqno < seqno)
278 {
279 do_update= true;
280 }
281 else if (local_seqno > seqno)
282 {
283 WSREP_WARN("SST position can't be set in past. Requested: %lld, Current: "
284 " %lld.", (long long)seqno, (long long)local_seqno);
285 /*
286 If we are here because of SET command, simply return true (error) instead of
287 aborting.
288 */
289 if (implicit)
290 {
291 WSREP_WARN("Can't continue.");
292 unireg_abort(1);
293 }
294 else
295 {
296 return true;
297 }
298 }
299
300#ifdef GTID_SUPPORT
301 wsrep_init_sidno(uuid);
302#endif /* GTID_SUPPORT */
303
304 if (wsrep)
305 {
306 int const rcode(seqno < 0 ? seqno : 0);
307 wsrep_gtid_t const state_id= {uuid,
308 (rcode ? WSREP_SEQNO_UNDEFINED : seqno)};
309
310 wsrep_status_t ret= wsrep->sst_received(wsrep, &state_id, state,
311 state_len, rcode);
312
313 if (ret != WSREP_OK)
314 {
315 return true;
316 }
317 }
318
319 // Now is the good time to update the local state and checkpoint.
320 if (do_update)
321 {
322 if (wsrep_set_SE_checkpoint(uuid, seqno))
323 {
324 return true;
325 }
326
327 local_uuid= uuid;
328 local_seqno= seqno;
329 }
330
331 return false;
332}
333
334// Let applier threads to continue
335bool wsrep_sst_continue ()
336{
337 if (sst_needed)
338 {
339 WSREP_INFO("Signalling provider to continue.");
340 return wsrep_sst_received (wsrep, local_uuid, local_seqno, NULL, 0, true);
341 }
342 return false;
343}
344
345struct sst_thread_arg
346{
347 const char* cmd;
348 char** env;
349 char* ret_str;
350 int err;
351 mysql_mutex_t lock;
352 mysql_cond_t cond;
353
354 sst_thread_arg (const char* c, char** e)
355 : cmd(c), env(e), ret_str(0), err(-1)
356 {
357 mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
358 mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
359 }
360
361 ~sst_thread_arg()
362 {
363 mysql_cond_destroy (&cond);
364 mysql_mutex_unlock (&lock);
365 mysql_mutex_destroy (&lock);
366 }
367};
368
369static int sst_scan_uuid_seqno (const char* str,
370 wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
371{
372 int offt = wsrep_uuid_scan (str, strlen(str), uuid);
373 errno= 0; /* Reset the errno */
374 if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
375 {
376 *seqno = strtoll (str + offt + 1, NULL, 10);
377 if (*seqno != LLONG_MAX || errno != ERANGE)
378 {
379 return 0;
380 }
381 }
382
383 WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
384 return EINVAL;
385}
386
/*
  fgets() wrapper that gets rid of the trailing \n.

  @return buf on success, NULL if fgets() returned NULL
*/
static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
{
  char* const line= fgets (buf, buf_len, stream);

  if (!line)
    return NULL;

  size_t const len= strlen(line);
  if (len != 0 && line[len - 1] == '\n')
    line[len - 1]= '\0';

  return line;
}
400
401/*
402 Generate opt_binlog_opt_val for sst_donate_other(), sst_prepare_other().
403
404 Returns zero on success, negative error code otherwise.
405
406 String containing binlog name is stored in param ret if binlog is enabled
407 and GTID mode is on, otherwise empty string. Returned string should be
408 freed with my_free().
409 */
410static int generate_binlog_opt_val(char** ret)
411{
412 DBUG_ASSERT(ret);
413 *ret= NULL;
414 if (opt_bin_log)
415 {
416 assert(opt_bin_logname);
417 *ret= strcmp(opt_bin_logname, "0") ?
418 my_strdup(opt_bin_logname, MYF(0)) : my_strdup("", MYF(0));
419 }
420 else
421 {
422 *ret= my_strdup("", MYF(0));
423 }
424 if (!*ret) return -ENOMEM;
425 return 0;
426}
427
428static void* sst_joiner_thread (void* a)
429{
430 sst_thread_arg* arg= (sst_thread_arg*) a;
431 int err= 1;
432
433 {
434 const char magic[] = "ready";
435 const size_t magic_len = sizeof(magic) - 1;
436 const size_t out_len = 512;
437 char out[out_len];
438
439 WSREP_INFO("Running: '%s'", arg->cmd);
440
441 wsp::process proc (arg->cmd, "r", arg->env);
442
443 if (proc.pipe() && !proc.error())
444 {
445 const char* tmp= my_fgets (out, out_len, proc.pipe());
446
447 if (!tmp || strlen(tmp) < (magic_len + 2) ||
448 strncasecmp (tmp, magic, magic_len))
449 {
450 WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
451 magic, arg->cmd, tmp);
452 proc.wait();
453 if (proc.error()) err = proc.error();
454 }
455 else
456 {
457 err = 0;
458 }
459 }
460 else
461 {
462 err = proc.error();
463 WSREP_ERROR("Failed to execute: %s : %d (%s)",
464 arg->cmd, err, strerror(err));
465 }
466
467 // signal sst_prepare thread with ret code,
468 // it will go on sending SST request
469 mysql_mutex_lock (&arg->lock);
470 if (!err)
471 {
472 arg->ret_str = strdup (out + magic_len + 1);
473 if (!arg->ret_str) err = ENOMEM;
474 }
475 arg->err = -err;
476 mysql_cond_signal (&arg->cond);
477 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
478
479 if (err) return NULL; /* lp:808417 - return immediately, don't signal
480 * initializer thread to ensure single thread of
481 * shutdown. */
482
483 wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
484 wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
485
486 // in case of successfull receiver start, wait for SST completion/end
487 char* tmp = my_fgets (out, out_len, proc.pipe());
488
489 proc.wait();
490 err= EINVAL;
491
492 if (!tmp)
493 {
494 WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from "
495 "joiner script.");
496 if (proc.error()) err = proc.error();
497 }
498 else
499 {
500 // Read state ID (UUID:SEQNO) followed by wsrep_gtid_domain_id (if any).
501 const char *pos= strchr(out, ' ');
502
503 if (!pos) {
504 // There is no wsrep_gtid_domain_id (some older version SST script?).
505 err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
506
507 } else {
508 // Scan state ID first followed by wsrep_gtid_domain_id.
509 unsigned long int domain_id;
510
511 // Null-terminate the state-id.
512 out[pos - out]= 0;
513 err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
514
515 if (err)
516 {
517 goto err;
518 }
519 else if (wsrep_gtid_mode)
520 {
521 errno= 0; /* Reset the errno */
522 domain_id= strtoul(pos + 1, NULL, 10);
523 err= errno;
524
525 /* Check if we received a valid gtid_domain_id. */
526 if (err == EINVAL || err == ERANGE)
527 {
528 WSREP_ERROR("Failed to get donor wsrep_gtid_domain_id.");
529 err= EINVAL;
530 goto err;
531 } else {
532 wsrep_gtid_domain_id= (uint32) domain_id;
533 }
534 }
535 }
536 }
537
538err:
539
540 if (err)
541 {
542 ret_uuid= WSREP_UUID_UNDEFINED;
543 ret_seqno= -err;
544 }
545
546 // Tell initializer thread that SST is complete
547 wsrep_sst_complete (&ret_uuid, ret_seqno, true);
548 }
549
550 return NULL;
551}
552
553#define WSREP_SST_AUTH_ENV "WSREP_SST_OPT_AUTH"
554
555static int sst_append_auth_env(wsp::env& env, const char* sst_auth)
556{
557 int const sst_auth_size= strlen(WSREP_SST_AUTH_ENV) + 1 /* = */
558 + (sst_auth ? strlen(sst_auth) : 0) + 1 /* \0 */;
559
560 wsp::string sst_auth_str(sst_auth_size); // for automatic cleanup on return
561 if (!sst_auth_str()) return -ENOMEM;
562
563 int ret= snprintf(sst_auth_str(), sst_auth_size, "%s=%s",
564 WSREP_SST_AUTH_ENV, sst_auth ? sst_auth : "");
565
566 if (ret < 0 || ret >= sst_auth_size)
567 {
568 WSREP_ERROR("sst_append_auth_env(): snprintf() failed: %d", ret);
569 return (ret < 0 ? ret : -EMSGSIZE);
570 }
571
572 env.append(sst_auth_str());
573 return -env.error();
574}
575
576static ssize_t sst_prepare_other (const char* method,
577 const char* sst_auth,
578 const char* addr_in,
579 const char** addr_out)
580{
581 int const cmd_len= 4096;
582 wsp::string cmd_str(cmd_len);
583
584 if (!cmd_str())
585 {
586 WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %d bytes",
587 cmd_len);
588 return -ENOMEM;
589 }
590
591 const char* binlog_opt= "";
592 char* binlog_opt_val= NULL;
593
594 int ret;
595 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
596 {
597 WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d",
598 ret);
599 return ret;
600 }
601 if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
602
603 make_wsrep_defaults_file();
604
605 ret= snprintf (cmd_str(), cmd_len,
606 "wsrep_sst_%s "
607 WSREP_SST_OPT_ROLE" 'joiner' "
608 WSREP_SST_OPT_ADDR" '%s' "
609 WSREP_SST_OPT_DATA" '%s' "
610 " %s "
611 WSREP_SST_OPT_PARENT" '%d'"
612 " %s '%s' ",
613 method, addr_in, mysql_real_data_home,
614 wsrep_defaults_file,
615 (int)getpid(), binlog_opt, binlog_opt_val);
616 my_free(binlog_opt_val);
617
618 if (ret < 0 || ret >= cmd_len)
619 {
620 WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
621 return (ret < 0 ? ret : -EMSGSIZE);
622 }
623
624 wsp::env env(NULL);
625 if (env.error())
626 {
627 WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error());
628 return -env.error();
629 }
630
631 if ((ret= sst_append_auth_env(env, sst_auth)))
632 {
633 WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret);
634 return ret;
635 }
636
637 pthread_t tmp;
638 sst_thread_arg arg(cmd_str(), env());
639 mysql_mutex_lock (&arg.lock);
640 ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg);
641 if (ret)
642 {
643 WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)",
644 ret, strerror(ret));
645 return -ret;
646 }
647 mysql_cond_wait (&arg.cond, &arg.lock);
648
649 *addr_out= arg.ret_str;
650
651 if (!arg.err)
652 ret = strlen(*addr_out);
653 else
654 {
655 assert (arg.err < 0);
656 ret = arg.err;
657 }
658
659 pthread_detach (tmp);
660
661 return ret;
662}
663
// Defined outside this file; used below as the default port in SST
// addresses and as the local port passed to the mysqldump SST script.
extern uint mysqld_port;
665
666/*! Just tells donor where to send mysqldump */
667static ssize_t sst_prepare_mysqldump (const char* addr_in,
668 const char** addr_out)
669{
670 ssize_t ret = strlen (addr_in);
671
672 if (!strrchr(addr_in, ':'))
673 {
674 ssize_t s = ret + 7;
675 char* tmp = (char*) malloc (s);
676
677 if (tmp)
678 {
679 ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
680
681 if (ret > 0 && ret < s)
682 {
683 *addr_out= tmp;
684 return ret;
685 }
686 if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
687 free (tmp);
688 }
689 else {
690 ret= -ENOMEM;
691 }
692
693 WSREP_ERROR ("Could not prepare state transfer request: "
694 "adding default port failed: %zd.", ret);
695 }
696 else {
697 *addr_out= addr_in;
698 }
699
700 return ret;
701}
702
// Set to true by wsrep_SE_initialized(); wsrep_SE_init_wait() blocks until
// it is set, and wsrep_sst_prepare() refuses script-based SST once it is.
static bool SE_initialized = false;
704
705ssize_t wsrep_sst_prepare (void** msg)
706{
707 const char* addr_in= NULL;
708 const char* addr_out= NULL;
709
710 if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
711 {
712 ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1;
713 *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL);
714 if (!msg)
715 {
716 WSREP_ERROR("Could not allocate %zd bytes for state request", ret);
717 unireg_abort(1);
718 }
719 return ret;
720 }
721
722 /*
723 Figure out SST receive address. Common for all SST methods.
724 */
725 char ip_buf[256];
726 const ssize_t ip_max= sizeof(ip_buf);
727
728 // Attempt 1: wsrep_sst_receive_address
729 if (wsrep_sst_receive_address &&
730 strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
731 {
732 addr_in= wsrep_sst_receive_address;
733 }
734
735 //Attempt 2: wsrep_node_address
736 else if (wsrep_node_address && strlen(wsrep_node_address))
737 {
738 wsp::Address addr(wsrep_node_address);
739
740 if (!addr.is_valid())
741 {
742 WSREP_ERROR("Could not parse wsrep_node_address : %s",
743 wsrep_node_address);
744 unireg_abort(1);
745 }
746 memcpy(ip_buf, addr.get_address(), addr.get_address_len());
747 addr_in= ip_buf;
748 }
749 // Attempt 3: Try to get the IP from the list of available interfaces.
750 else
751 {
752 ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
753
754 if (ret && ret < ip_max)
755 {
756 addr_in= ip_buf;
757 }
758 else
759 {
760 WSREP_ERROR("Failed to guess address to accept state transfer. "
761 "wsrep_sst_receive_address must be set manually.");
762 unireg_abort(1);
763 }
764 }
765
766 ssize_t addr_len= -ENOSYS;
767 if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP))
768 {
769 addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
770 if (addr_len < 0) unireg_abort(1);
771 }
772 else
773 {
774 /*! A heuristic workaround until we learn how to stop and start engines */
775 if (SE_initialized)
776 {
777 // we already did SST at initializaiton, now engines are running
778 // sql_print_information() is here because the message is too long
779 // for WSREP_INFO.
780 sql_print_information ("WSREP: "
781 "You have configured '%s' state snapshot transfer method "
782 "which cannot be performed on a running server. "
783 "Wsrep provider won't be able to fall back to it "
784 "if other means of state transfer are unavailable. "
785 "In that case you will need to restart the server.",
786 wsrep_sst_method);
787 *msg = 0;
788 return 0;
789 }
790
791 addr_len = sst_prepare_other (wsrep_sst_method, sst_auth_real,
792 addr_in, &addr_out);
793 if (addr_len < 0)
794 {
795 WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
796 wsrep_sst_method);
797 unireg_abort(1);
798 }
799 }
800
801 size_t const method_len(strlen(wsrep_sst_method));
802 size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/);
803
804 *msg = malloc (msg_len);
805 if (NULL != *msg) {
806 char* const method_ptr(reinterpret_cast<char*>(*msg));
807 strcpy (method_ptr, wsrep_sst_method);
808 char* const addr_ptr(method_ptr + method_len + 1);
809 strcpy (addr_ptr, addr_out);
810
811 WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr);
812 }
813 else {
814 WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
815 msg_len);
816 unireg_abort(1);
817 }
818
819 if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
820
821 return msg_len;
822}
823
824// helper method for donors
825static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
826{
827 int ret = 0;
828
829 for (int tries=1; tries <= max_tries; tries++)
830 {
831 wsp::process proc (cmd_str, "r", env);
832
833 if (NULL != proc.pipe())
834 {
835 proc.wait();
836 }
837
838 if ((ret = proc.error()))
839 {
840 WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
841 tries, max_tries, proc.cmd(), ret, strerror(ret));
842 sleep (1);
843 }
844 else
845 {
846 WSREP_DEBUG("SST script successfully completed.");
847 break;
848 }
849 }
850
851 return -ret;
852}
853
854static void sst_reject_queries(my_bool close_conn)
855{
856 wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced
857 WSREP_INFO("Rejecting client queries for the duration of SST.");
858 if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
859}
860
861static int sst_donate_mysqldump (const char* addr,
862 const wsrep_uuid_t* uuid,
863 const char* uuid_str,
864 wsrep_seqno_t seqno,
865 bool bypass,
866 char** env) // carries auth info
867{
868 char host[256];
869 wsp::Address address(addr);
870 if (!address.is_valid())
871 {
872 WSREP_ERROR("Could not parse SST address : %s", addr);
873 return 0;
874 }
875 memcpy(host, address.get_address(), address.get_address_len());
876 int port= address.get_port();
877 int const cmd_len= 4096;
878 wsp::string cmd_str(cmd_len);
879
880 if (!cmd_str())
881 {
882 WSREP_ERROR("sst_donate_mysqldump(): "
883 "could not allocate cmd buffer of %d bytes", cmd_len);
884 return -ENOMEM;
885 }
886
887 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
888
889 make_wsrep_defaults_file();
890
891 int ret= snprintf (cmd_str(), cmd_len,
892 "wsrep_sst_mysqldump "
893 WSREP_SST_OPT_ADDR" '%s' "
894 WSREP_SST_OPT_PORT" '%d' "
895 WSREP_SST_OPT_LPORT" '%u' "
896 WSREP_SST_OPT_SOCKET" '%s' "
897 " '%s' "
898 WSREP_SST_OPT_GTID" '%s:%lld' "
899 WSREP_SST_OPT_GTID_DOMAIN_ID" '%d'"
900 "%s",
901 addr, port, mysqld_port, mysqld_unix_port,
902 wsrep_defaults_file, uuid_str,
903 (long long)seqno, wsrep_gtid_domain_id,
904 bypass ? " " WSREP_SST_OPT_BYPASS : "");
905
906 if (ret < 0 || ret >= cmd_len)
907 {
908 WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret);
909 return (ret < 0 ? ret : -EMSGSIZE);
910 }
911
912 WSREP_DEBUG("Running: '%s'", cmd_str());
913
914 ret= sst_run_shell (cmd_str(), env, 3);
915
916 wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
917
918 wsrep->sst_sent (wsrep, &state_id, ret);
919
920 return ret;
921}
922
// Sequence number that sst_flush_tables() writes into the "tables_flushed"
// file. It is not assigned anywhere in this file.
wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
924
925
926/*
927 Create a file under data directory.
928*/
929static int sst_create_file(const char *name, const char *content)
930{
931 int err= 0;
932 char *real_name;
933 char *tmp_name;
934 ssize_t len;
935 FILE *file;
936
937 len= strlen(mysql_real_data_home) + strlen(name) + 2;
938 real_name= (char *) alloca(len);
939
940 snprintf(real_name, (size_t) len, "%s/%s", mysql_real_data_home, name);
941
942 tmp_name= (char *) alloca(len + 4);
943 snprintf(tmp_name, (size_t) len + 4, "%s.tmp", real_name);
944
945 file= fopen(tmp_name, "w+");
946
947 if (0 == file)
948 {
949 err= errno;
950 WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err, strerror(err));
951 }
952 else
953 {
954 // Write the specified content into the file.
955 if (content != NULL)
956 {
957 fprintf(file, "%s\n", content);
958 fsync(fileno(file));
959 }
960
961 fclose(file);
962
963 if (rename(tmp_name, real_name) == -1)
964 {
965 err= errno;
966 WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", tmp_name,
967 real_name, err, strerror(err));
968 }
969 }
970
971 return err;
972}
973
974
975static int run_sql_command(THD *thd, const char *query)
976{
977 thd->set_query((char *)query, strlen(query));
978
979 Parser_state ps;
980 if (ps.init(thd, thd->query(), thd->query_length()))
981 {
982 WSREP_ERROR("SST query: %s failed", query);
983 return -1;
984 }
985
986 mysql_parse(thd, thd->query(), thd->query_length(), &ps, FALSE, FALSE);
987 if (thd->is_error())
988 {
989 int const err= thd->get_stmt_da()->sql_errno();
990 WSREP_WARN ("Error executing '%s': %d (%s)%s",
991 query, err, thd->get_stmt_da()->message(),
992 err == ER_UNKNOWN_SYSTEM_VARIABLE ?
993 ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
994 thd->clear_error();
995 return -1;
996 }
997 return 0;
998}
999
1000
1001static int sst_flush_tables(THD* thd)
1002{
1003 WSREP_INFO("Flushing tables for SST...");
1004
1005 int err= 0;
1006 int not_used;
1007 /*
1008 Files created to notify the SST script about the outcome of table flush
1009 operation.
1010 */
1011 const char *flush_success= "tables_flushed";
1012 const char *flush_error= "sst_error";
1013
1014 CHARSET_INFO *current_charset= thd->variables.character_set_client;
1015
1016 if (!is_supported_parser_charset(current_charset))
1017 {
1018 /* Do not use non-supported parser character sets */
1019 WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1020 thd->variables.character_set_client = &my_charset_latin1;
1021 WSREP_WARN("For SST temporally setting character set to : %s",
1022 my_charset_latin1.csname);
1023 }
1024
1025 if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
1026 {
1027 err= -1;
1028 }
1029 else
1030 {
1031 /*
1032 Make sure logs are flushed after global read lock acquired. In case
1033 reload fails, we must also release the acquired FTWRL.
1034 */
1035 if (reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG,
1036 (TABLE_LIST*) 0, &not_used))
1037 {
1038 thd->global_read_lock.unlock_global_read_lock(thd);
1039 err= -1;
1040 }
1041 }
1042
1043 thd->variables.character_set_client = current_charset;
1044
1045 if (err)
1046 {
1047 WSREP_ERROR("Failed to flush and lock tables");
1048
1049 /*
1050 The SST must be aborted as the flush tables failed. Notify this to SST
1051 script by creating the error file.
1052 */
1053 int tmp;
1054 if ((tmp= sst_create_file(flush_error, NULL))) {
1055 err= tmp;
1056 }
1057 }
1058 else
1059 {
1060 WSREP_INFO("Tables flushed.");
1061
1062 /*
1063 Tables have been flushed. Create a file with cluster state ID and
1064 wsrep_gtid_domain_id.
1065 */
1066 char content[100];
1067 snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid,
1068 (long long)wsrep_locked_seqno, wsrep_gtid_domain_id);
1069 err= sst_create_file(flush_success, content);
1070 }
1071
1072 return err;
1073}
1074
1075
1076static void sst_disallow_writes (THD* thd, bool yes)
1077{
1078 char query_str[64] = { 0, };
1079 ssize_t const query_max = sizeof(query_str) - 1;
1080 CHARSET_INFO *current_charset;
1081
1082 current_charset = thd->variables.character_set_client;
1083
1084 if (!is_supported_parser_charset(current_charset))
1085 {
1086 /* Do not use non-supported parser character sets */
1087 WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
1088 thd->variables.character_set_client = &my_charset_latin1;
1089 WSREP_WARN("For SST temporally setting character set to : %s",
1090 my_charset_latin1.csname);
1091 }
1092
1093 snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
1094 yes ? 1 : 0);
1095
1096 if (run_sql_command(thd, query_str))
1097 {
1098 WSREP_ERROR("Failed to disallow InnoDB writes");
1099 }
1100 thd->variables.character_set_client = current_charset;
1101}
1102
1103static void* sst_donor_thread (void* a)
1104{
1105 sst_thread_arg* arg= (sst_thread_arg*)a;
1106
1107 WSREP_INFO("Running: '%s'", arg->cmd);
1108
1109 int err= 1;
1110 bool locked= false;
1111
1112 const char* out= NULL;
1113 const size_t out_len= 128;
1114 char out_buf[out_len];
1115
1116 wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED;
1117 wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST
1118
1119 wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
1120 // operate with wsrep_ready == OFF
1121 wsp::process proc(arg->cmd, "r", arg->env);
1122
1123 err= proc.error();
1124
1125/* Inform server about SST script startup and release TO isolation */
1126 mysql_mutex_lock (&arg->lock);
1127 arg->err = -err;
1128 mysql_cond_signal (&arg->cond);
1129 mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
1130
1131 if (proc.pipe() && !err)
1132 {
1133wait_signal:
1134 out= my_fgets (out_buf, out_len, proc.pipe());
1135
1136 if (out)
1137 {
1138 const char magic_flush[]= "flush tables";
1139 const char magic_cont[]= "continue";
1140 const char magic_done[]= "done";
1141
1142 if (!strcasecmp (out, magic_flush))
1143 {
1144 err= sst_flush_tables (thd.ptr);
1145 if (!err)
1146 {
1147 sst_disallow_writes (thd.ptr, true);
1148 /*
1149 Lets also keep statements that modify binary logs (like RESET LOGS,
1150 RESET MASTER) from proceeding until the files have been transferred
1151 to the joiner node.
1152 */
1153 if (mysql_bin_log.is_open())
1154 {
1155 mysql_mutex_lock(mysql_bin_log.get_log_lock());
1156 }
1157
1158 locked= true;
1159 goto wait_signal;
1160 }
1161 }
1162 else if (!strcasecmp (out, magic_cont))
1163 {
1164 if (locked)
1165 {
1166 if (mysql_bin_log.is_open())
1167 {
1168 mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
1169 mysql_mutex_unlock(mysql_bin_log.get_log_lock());
1170 }
1171 sst_disallow_writes (thd.ptr, false);
1172 thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
1173 locked= false;
1174 }
1175 err= 0;
1176 goto wait_signal;
1177 }
1178 else if (!strncasecmp (out, magic_done, strlen(magic_done)))
1179 {
1180 err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
1181 &ret_uuid, &ret_seqno);
1182 }
1183 else
1184 {
1185 WSREP_WARN("Received unknown signal: '%s'", out);
1186 }
1187 }
1188 else
1189 {
1190 WSREP_ERROR("Failed to read from: %s", proc.cmd());
1191 proc.wait();
1192 }
1193 if (!err && proc.error()) err= proc.error();
1194 }
1195 else
1196 {
1197 WSREP_ERROR("Failed to execute: %s : %d (%s)",
1198 proc.cmd(), err, strerror(err));
1199 }
1200
1201 if (locked) // don't forget to unlock server before return
1202 {
1203 if (mysql_bin_log.is_open())
1204 {
1205 mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
1206 mysql_mutex_unlock(mysql_bin_log.get_log_lock());
1207 }
1208 sst_disallow_writes (thd.ptr, false);
1209 thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
1210 }
1211
1212 // signal to donor that SST is over
1213 struct wsrep_gtid const state_id = {
1214 ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
1215 };
1216 wsrep->sst_sent (wsrep, &state_id, -err);
1217 proc.wait();
1218
1219 return NULL;
1220}
1221
1222
1223
1224static int sst_donate_other (const char* method,
1225 const char* addr,
1226 const char* uuid,
1227 wsrep_seqno_t seqno,
1228 bool bypass,
1229 char** env) // carries auth info
1230{
1231 int const cmd_len= 4096;
1232 wsp::string cmd_str(cmd_len);
1233
1234 if (!cmd_str())
1235 {
1236 WSREP_ERROR("sst_donate_other(): "
1237 "could not allocate cmd buffer of %d bytes", cmd_len);
1238 return -ENOMEM;
1239 }
1240
1241 const char* binlog_opt= "";
1242 char* binlog_opt_val= NULL;
1243
1244 int ret;
1245 if ((ret= generate_binlog_opt_val(&binlog_opt_val)))
1246 {
1247 WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret);
1248 return ret;
1249 }
1250 if (strlen(binlog_opt_val)) binlog_opt= WSREP_SST_OPT_BINLOG;
1251
1252 make_wsrep_defaults_file();
1253
1254 ret= snprintf (cmd_str(), cmd_len,
1255 "wsrep_sst_%s "
1256 WSREP_SST_OPT_ROLE" 'donor' "
1257 WSREP_SST_OPT_ADDR" '%s' "
1258 WSREP_SST_OPT_SOCKET" '%s' "
1259 WSREP_SST_OPT_DATA" '%s' "
1260 " %s "
1261 " %s '%s' "
1262 WSREP_SST_OPT_GTID" '%s:%lld' "
1263 WSREP_SST_OPT_GTID_DOMAIN_ID" '%d'"
1264 "%s",
1265 method, addr, mysqld_unix_port, mysql_real_data_home,
1266 wsrep_defaults_file,
1267 binlog_opt, binlog_opt_val,
1268 uuid, (long long) seqno, wsrep_gtid_domain_id,
1269 bypass ? " " WSREP_SST_OPT_BYPASS : "");
1270 my_free(binlog_opt_val);
1271
1272 if (ret < 0 || ret >= cmd_len)
1273 {
1274 WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
1275 return (ret < 0 ? ret : -EMSGSIZE);
1276 }
1277
1278 if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE);
1279
1280 pthread_t tmp;
1281 sst_thread_arg arg(cmd_str(), env);
1282 mysql_mutex_lock (&arg.lock);
1283 ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg);
1284 if (ret)
1285 {
1286 WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)",
1287 ret, strerror(ret));
1288 return ret;
1289 }
1290 mysql_cond_wait (&arg.cond, &arg.lock);
1291
1292 WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
1293 return arg.err;
1294}
1295
1296wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
1297 const void* msg, size_t msg_len,
1298 const wsrep_gtid_t* current_gtid,
1299 const char* state, size_t state_len,
1300 bool bypass)
1301{
1302 /* This will be reset when sync callback is called.
1303 * Should we set wsrep_ready to FALSE here too? */
1304
1305 wsrep_config_state->set(WSREP_MEMBER_DONOR);
1306
1307 const char* method = (char*)msg;
1308 size_t method_len = strlen (method);
1309 const char* data = method + method_len + 1;
1310
1311 char uuid_str[37];
1312 wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str));
1313
1314 wsp::env env(NULL);
1315 if (env.error())
1316 {
1317 WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error());
1318 return WSREP_CB_FAILURE;
1319 }
1320
1321 int ret;
1322 if ((ret= sst_append_auth_env(env, sst_auth_real)))
1323 {
1324 WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret);
1325 return WSREP_CB_FAILURE;
1326 }
1327
1328 if (!strcmp (WSREP_SST_MYSQLDUMP, method))
1329 {
1330 ret = sst_donate_mysqldump(data, &current_gtid->uuid, uuid_str,
1331 current_gtid->seqno, bypass, env());
1332 }
1333 else
1334 {
1335 ret = sst_donate_other(method, data, uuid_str,
1336 current_gtid->seqno, bypass, env());
1337 }
1338
1339 return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
1340}
1341
1342void wsrep_SE_init_grab()
1343{
1344 if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort();
1345}
1346
1347void wsrep_SE_init_wait()
1348{
1349 while (SE_initialized == false)
1350 {
1351 mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init);
1352 }
1353 mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1354}
1355
1356void wsrep_SE_init_done()
1357{
1358 mysql_cond_signal (&COND_wsrep_sst_init);
1359 mysql_mutex_unlock (&LOCK_wsrep_sst_init);
1360}
1361
1362void wsrep_SE_initialized()
1363{
1364 SE_initialized = true;
1365}
1366