1/* Copyright (C) 2007 Google Inc.
2 Copyright (c) 2008, 2013, Oracle and/or its affiliates.
3 Copyright (c) 2011, 2016, MariaDB
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17
18
19#include <my_global.h>
20#include "semisync_master.h"
21
/* Unit conversion factors used by the timespec arithmetic in this file. */
#define TIME_THOUSAND 1000
#define TIME_MILLION 1000000
#define TIME_BILLION 1000000000

/* This indicates whether semi-synchronous replication is enabled. */
my_bool rpl_semi_sync_master_enabled= 0;
unsigned long long rpl_semi_sync_master_request_ack = 0;
/* Number of well-formed reply packets processed by report_reply_packet(). */
unsigned long long rpl_semi_sync_master_get_ack = 0;
/*
  When 0, semi-sync is switched off if no semi-sync slave is connected
  (checked in init_object() and remove_slave()).
*/
my_bool rpl_semi_sync_master_wait_no_slave = 1;
my_bool rpl_semi_sync_master_status = 0;
/*
  Where committing sessions wait for the ACK: after binlog sync
  (wait_after_sync()) or after the storage engine commit
  (wait_after_commit(), the default). Copied into the object by init_object().
*/
ulong rpl_semi_sync_master_wait_point =
  SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT;
/*
  Passed to set_wait_timeout() by init_object(); presumably becomes
  m_wait_timeout, which commit_trx() treats as milliseconds.
*/
ulong rpl_semi_sync_master_timeout;
ulong rpl_semi_sync_master_trace_level;
/* Commits that finished with semi-sync on / off (updated in commit_trx()). */
ulong rpl_semi_sync_master_yes_transactions = 0;
ulong rpl_semi_sync_master_no_transactions = 0;
/* Number of calls to switch_off(). */
ulong rpl_semi_sync_master_off_times = 0;
/* Waits whose elapsed time could not be measured (commit_trx()). */
ulong rpl_semi_sync_master_timefunc_fails = 0;
/* Waits that ended with a non-zero cond_timewait() result (commit_trx()). */
ulong rpl_semi_sync_master_wait_timeouts = 0;
/* Sessions currently blocked in cond_timewait() inside commit_trx(). */
ulong rpl_semi_sync_master_wait_sessions = 0;
/* Times a waiter lowered (or matched) the minimum wait position. */
ulong rpl_semi_sync_master_wait_pos_backtraverse = 0;
ulong rpl_semi_sync_master_avg_trx_wait_time = 0;
/* Number of measured successful wake-ups in commit_trx(). */
ulonglong rpl_semi_sync_master_trx_wait_num = 0;
ulong rpl_semi_sync_master_avg_net_wait_time = 0;
ulonglong rpl_semi_sync_master_net_wait_num = 0;
/* Connected semi-sync slaves (add_slave() / remove_slave()). */
ulong rpl_semi_sync_master_clients = 0;
ulonglong rpl_semi_sync_master_net_wait_time = 0;
/* Sum of get_wait_time() results for measured wake-ups in commit_trx(). */
ulonglong rpl_semi_sync_master_trx_wait_time = 0;

/* The master-side semi-sync object and the ACK receiver (started by init_object()). */
Repl_semi_sync_master repl_semisync_master;
Ack_receiver ack_receiver;
53
54/*
55 structure to save transaction log filename and position
56*/
57typedef struct Trans_binlog_info {
58 my_off_t log_pos;
59 char log_file[FN_REFLEN];
60} Trans_binlog_info;
61
62static int get_wait_time(const struct timespec& start_ts);
63
64static ulonglong timespec_to_usec(const struct timespec *ts)
65{
66 return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
67}
68
69/*******************************************************************************
70 *
71 * <Active_tranx> class : manage all active transaction nodes
72 *
73 ******************************************************************************/
74
75Active_tranx::Active_tranx(mysql_mutex_t *lock,
76 ulong trace_level)
77 : Trace(trace_level), m_allocator(max_connections),
78 m_num_entries(max_connections << 1), /* Transaction hash table size
79 * is set to double the size
80 * of max_connections */
81 m_lock(lock)
82{
83 /* No transactions are in the list initially. */
84 m_trx_front = NULL;
85 m_trx_rear = NULL;
86
87 /* Create the hash table to find a transaction's ending event. */
88 m_trx_htb = new Tranx_node *[m_num_entries];
89 for (int idx = 0; idx < m_num_entries; ++idx)
90 m_trx_htb[idx] = NULL;
91
92 sql_print_information("Semi-sync replication initialized for transactions.");
93}
94
95Active_tranx::~Active_tranx()
96{
97 delete [] m_trx_htb;
98 m_trx_htb = NULL;
99 m_num_entries = 0;
100}
101
102unsigned int Active_tranx::calc_hash(const unsigned char *key, size_t length)
103{
104 unsigned int nr = 1, nr2 = 4;
105
106 /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
107 while (length--)
108 {
109 nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
110 nr2 += 3;
111 }
112 return((unsigned int) nr);
113}
114
115unsigned int Active_tranx::get_hash_value(const char *log_file_name,
116 my_off_t log_file_pos)
117{
118 unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
119 strlen(log_file_name));
120 unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
121 sizeof(log_file_pos));
122
123 return (hash1 + hash2) % m_num_entries;
124}
125
126int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
127 const char *log_file_name2, my_off_t log_file_pos2)
128{
129 int cmp = strcmp(log_file_name1, log_file_name2);
130
131 if (cmp != 0)
132 return cmp;
133
134 if (log_file_pos1 > log_file_pos2)
135 return 1;
136 else if (log_file_pos1 < log_file_pos2)
137 return -1;
138 return 0;
139}
140
141int Active_tranx::insert_tranx_node(const char *log_file_name,
142 my_off_t log_file_pos)
143{
144 Tranx_node *ins_node;
145 int result = 0;
146 unsigned int hash_val;
147
148 DBUG_ENTER("Active_tranx:insert_tranx_node");
149
150 ins_node = m_allocator.allocate_node();
151 if (!ins_node)
152 {
153 sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
154 "Active_tranx:insert_tranx_node",
155 log_file_name, (ulong)log_file_pos);
156 result = -1;
157 goto l_end;
158 }
159
160 /* insert the binlog position in the active transaction list. */
161 strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1);
162 ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
163 ins_node->log_pos = log_file_pos;
164
165 if (!m_trx_front)
166 {
167 /* The list is empty. */
168 m_trx_front = m_trx_rear = ins_node;
169 }
170 else
171 {
172 int cmp = compare(ins_node, m_trx_rear);
173 if (cmp > 0)
174 {
175 /* Compare with the tail first. If the transaction happens later in
176 * binlog, then make it the new tail.
177 */
178 m_trx_rear->next = ins_node;
179 m_trx_rear = ins_node;
180 }
181 else
182 {
183 /* Otherwise, it is an error because the transaction should hold the
184 * mysql_bin_log.LOCK_log when appending events.
185 */
186 sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
187 "new node (%s, %lu)", "Active_tranx:insert_tranx_node",
188 m_trx_rear->log_name, (ulong)m_trx_rear->log_pos,
189 ins_node->log_name, (ulong)ins_node->log_pos);
190 result = -1;
191 goto l_end;
192 }
193 }
194
195 hash_val = get_hash_value(ins_node->log_name, ins_node->log_pos);
196 ins_node->hash_next = m_trx_htb[hash_val];
197 m_trx_htb[hash_val] = ins_node;
198
199 DBUG_PRINT("semisync", ("%s: insert (%s, %lu) in entry(%u)",
200 "Active_tranx:insert_tranx_node",
201 ins_node->log_name, (ulong)ins_node->log_pos,
202 hash_val));
203 l_end:
204
205 DBUG_RETURN(result);
206}
207
208bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
209 my_off_t log_file_pos)
210{
211 DBUG_ENTER("Active_tranx::is_tranx_end_pos");
212
213 unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
214 Tranx_node *entry = m_trx_htb[hash_val];
215
216 while (entry != NULL)
217 {
218 if (compare(entry, log_file_name, log_file_pos) == 0)
219 break;
220
221 entry = entry->hash_next;
222 }
223
224 DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)",
225 "Active_tranx::is_tranx_end_pos",
226 log_file_name, (ulong)log_file_pos, hash_val));
227
228 DBUG_RETURN(entry != NULL);
229}
230
231int Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
232 my_off_t log_file_pos)
233{
234 Tranx_node *new_front;
235
236 DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes");
237
238 if (log_file_name != NULL)
239 {
240 new_front = m_trx_front;
241
242 while (new_front)
243 {
244 if (compare(new_front, log_file_name, log_file_pos) > 0)
245 break;
246 new_front = new_front->next;
247 }
248 }
249 else
250 {
251 /* If log_file_name is NULL, clear everything. */
252 new_front = NULL;
253 }
254
255 if (new_front == NULL)
256 {
257 /* No active transaction nodes after the call. */
258
259 /* Clear the hash table. */
260 memset(m_trx_htb, 0, m_num_entries * sizeof(Tranx_node *));
261 m_allocator.free_all_nodes();
262
263 /* Clear the active transaction list. */
264 if (m_trx_front != NULL)
265 {
266 m_trx_front = NULL;
267 m_trx_rear = NULL;
268 }
269
270 DBUG_PRINT("semisync", ("%s: cleared all nodes",
271 "Active_tranx::::clear_active_tranx_nodes"));
272 }
273 else if (new_front != m_trx_front)
274 {
275 Tranx_node *curr_node, *next_node;
276
277 /* Delete all transaction nodes before the confirmation point. */
278 int n_frees = 0;
279 curr_node = m_trx_front;
280 while (curr_node != new_front)
281 {
282 next_node = curr_node->next;
283 n_frees++;
284
285 /* Remove the node from the hash table. */
286 unsigned int hash_val = get_hash_value(curr_node->log_name, curr_node->log_pos);
287 Tranx_node **hash_ptr = &(m_trx_htb[hash_val]);
288 while ((*hash_ptr) != NULL)
289 {
290 if ((*hash_ptr) == curr_node)
291 {
292 (*hash_ptr) = curr_node->hash_next;
293 break;
294 }
295 hash_ptr = &((*hash_ptr)->hash_next);
296 }
297
298 curr_node = next_node;
299 }
300
301 m_trx_front = new_front;
302 m_allocator.free_nodes_before(m_trx_front);
303
304 DBUG_PRINT("semisync", ("%s: cleared %d nodes back until pos (%s, %lu)",
305 "Active_tranx::::clear_active_tranx_nodes",
306 n_frees,
307 m_trx_front->log_name, (ulong)m_trx_front->log_pos));
308 }
309
310 DBUG_RETURN(0);
311}
312
313
314/*******************************************************************************
315 *
 * <Repl_semi_sync_master> class: the basic code layer for the semi-sync master.
 * <Repl_semi_sync_slave> class: the basic code layer for the semi-sync slave.
 *
 * The most important functions during semi-sync replication are:
320 *
321 * Master:
 * . report_reply_binlog(): called when a slave's reply (the binlog position
 *   it has received) is processed, and when a semi-sync binlog dump starts.
324 * . update_sync_header(): based on transaction waiting information, decide
325 * whether to request the slave to reply.
326 * . write_tranx_in_binlog(): called by the transaction thread when it finishes
327 * writing all transaction events in binlog.
 * . commit_trx(): the transaction thread waits for the slave reply.
329 *
330 * Slave:
331 * . slave_read_sync_header(): read the semi-sync header from the master, get
332 * the sync status and get the payload for events.
333 * . slave_reply(): reply to the master about the replication progress.
334 *
335 ******************************************************************************/
336
337Repl_semi_sync_master::Repl_semi_sync_master()
338 : m_active_tranxs(NULL),
339 m_init_done(false),
340 m_reply_file_name_inited(false),
341 m_reply_file_pos(0L),
342 m_wait_file_name_inited(false),
343 m_wait_file_pos(0),
344 m_master_enabled(false),
345 m_wait_timeout(0L),
346 m_state(0),
347 m_wait_point(0)
348{
349 strcpy(m_reply_file_name, "");
350 strcpy(m_wait_file_name, "");
351}
352
353int Repl_semi_sync_master::init_object()
354{
355 int result;
356
357 m_init_done = true;
358
359 /* References to the parameter works after set_options(). */
360 set_wait_timeout(rpl_semi_sync_master_timeout);
361 set_trace_level(rpl_semi_sync_master_trace_level);
362 set_wait_point(rpl_semi_sync_master_wait_point);
363
364 /* Mutex initialization can only be done after MY_INIT(). */
365 mysql_mutex_init(key_LOCK_binlog,
366 &LOCK_binlog, MY_MUTEX_INIT_FAST);
367 mysql_cond_init(key_COND_binlog_send,
368 &COND_binlog_send, NULL);
369
370 if (rpl_semi_sync_master_enabled)
371 {
372 result = enable_master();
373 if (!result)
374 result= ack_receiver.start(); /* Start the ACK thread. */
375 }
376 else
377 {
378 result = disable_master();
379 }
380
381 /*
382 If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily
383 switch off semisync to avoid hang if there's none active slave.
384 */
385 if (!rpl_semi_sync_master_wait_no_slave)
386 switch_off();
387
388 return result;
389}
390
391int Repl_semi_sync_master::enable_master()
392{
393 int result = 0;
394
395 /* Must have the lock when we do enable of disable. */
396 lock();
397
398 if (!get_master_enabled())
399 {
400 m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level);
401 if (m_active_tranxs != NULL)
402 {
403 m_commit_file_name_inited = false;
404 m_reply_file_name_inited = false;
405 m_wait_file_name_inited = false;
406
407 set_master_enabled(true);
408 m_state = true;
409 sql_print_information("Semi-sync replication enabled on the master.");
410 }
411 else
412 {
413 sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
414 result = -1;
415 }
416 }
417
418 unlock();
419
420 return result;
421}
422
423int Repl_semi_sync_master::disable_master()
424{
425 /* Must have the lock when we do enable of disable. */
426 lock();
427
428 if (get_master_enabled())
429 {
430 /* Switch off the semi-sync first so that waiting transaction will be
431 * waken up.
432 */
433 switch_off();
434
435 assert(m_active_tranxs != NULL);
436 delete m_active_tranxs;
437 m_active_tranxs = NULL;
438
439 m_reply_file_name_inited = false;
440 m_wait_file_name_inited = false;
441 m_commit_file_name_inited = false;
442
443 set_master_enabled(false);
444 sql_print_information("Semi-sync replication disabled on the master.");
445 }
446
447 unlock();
448
449 return 0;
450}
451
452void Repl_semi_sync_master::cleanup()
453{
454 if (m_init_done)
455 {
456 mysql_mutex_destroy(&LOCK_binlog);
457 mysql_cond_destroy(&COND_binlog_send);
458 m_init_done= 0;
459 }
460
461 delete m_active_tranxs;
462}
463
464void Repl_semi_sync_master::lock()
465{
466 mysql_mutex_lock(&LOCK_binlog);
467}
468
469void Repl_semi_sync_master::unlock()
470{
471 mysql_mutex_unlock(&LOCK_binlog);
472}
473
474void Repl_semi_sync_master::cond_broadcast()
475{
476 mysql_cond_broadcast(&COND_binlog_send);
477}
478
479int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time)
480{
481 int wait_res;
482
483 DBUG_ENTER("Repl_semi_sync_master::cond_timewait()");
484
485 wait_res= mysql_cond_timedwait(&COND_binlog_send,
486 &LOCK_binlog, wait_time);
487
488 DBUG_RETURN(wait_res);
489}
490
491void Repl_semi_sync_master::add_slave()
492{
493 lock();
494 rpl_semi_sync_master_clients++;
495 unlock();
496}
497
498void Repl_semi_sync_master::remove_slave()
499{
500 lock();
501 rpl_semi_sync_master_clients--;
502
503 /* Only switch off if semi-sync is enabled and is on */
504 if (get_master_enabled() && is_on())
505 {
506 /* If user has chosen not to wait if no semi-sync slave available
507 and the last semi-sync slave exits, turn off semi-sync on master
508 immediately.
509 */
510 if (!rpl_semi_sync_master_wait_no_slave &&
511 rpl_semi_sync_master_clients == 0)
512 switch_off();
513 }
514 unlock();
515}
516
517int Repl_semi_sync_master::report_reply_packet(uint32 server_id,
518 const uchar *packet,
519 ulong packet_len)
520{
521 int result= -1;
522 char log_file_name[FN_REFLEN+1];
523 my_off_t log_file_pos;
524 ulong log_file_len = 0;
525
526 DBUG_ENTER("Repl_semi_sync_master::report_reply_packet");
527
528 if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] !=
529 Repl_semi_sync_master::k_packet_magic_num))
530 {
531 sql_print_error("Read semi-sync reply magic number error");
532 goto l_end;
533 }
534
535 if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET))
536 {
537 sql_print_error("Read semi-sync reply length error: packet is too small");
538 goto l_end;
539 }
540
541 log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
542 log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
543 if (unlikely(log_file_len >= FN_REFLEN))
544 {
545 sql_print_error("Read semi-sync reply binlog file length too large");
546 goto l_end;
547 }
548 strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
549 log_file_name[log_file_len] = 0;
550
551 DBUG_ASSERT(dirname_length(log_file_name) == 0);
552
553 DBUG_PRINT("semisync", ("%s: Got reply(%s, %lu) from server %u",
554 "Repl_semi_sync_master::report_reply_packet",
555 log_file_name, (ulong)log_file_pos, server_id));
556
557 rpl_semi_sync_master_get_ack++;
558 report_reply_binlog(server_id, log_file_name, log_file_pos);
559
560l_end:
561
562 DBUG_RETURN(result);
563}
564
565int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
566 const char *log_file_name,
567 my_off_t log_file_pos)
568{
569 int cmp;
570 bool can_release_threads = false;
571 bool need_copy_send_pos = true;
572
573 DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog");
574
575 if (!(get_master_enabled()))
576 DBUG_RETURN(0);
577
578 lock();
579
580 /* This is the real check inside the mutex. */
581 if (!get_master_enabled())
582 goto l_end;
583
584 if (!is_on())
585 /* We check to see whether we can switch semi-sync ON. */
586 try_switch_on(server_id, log_file_name, log_file_pos);
587
588 /* The position should increase monotonically, if there is only one
589 * thread sending the binlog to the slave.
590 * In reality, to improve the transaction availability, we allow multiple
591 * sync replication slaves. So, if any one of them get the transaction,
592 * the transaction session in the primary can move forward.
593 */
594 if (m_reply_file_name_inited)
595 {
596 cmp = Active_tranx::compare(log_file_name, log_file_pos,
597 m_reply_file_name, m_reply_file_pos);
598
599 /* If the requested position is behind the sending binlog position,
600 * would not adjust sending binlog position.
601 * We based on the assumption that there are multiple semi-sync slave,
602 * and at least one of them shou/ld be up to date.
603 * If all semi-sync slaves are behind, at least initially, the primary
604 * can find the situation after the waiting timeout. After that, some
605 * slaves should catch up quickly.
606 */
607 if (cmp < 0)
608 {
609 /* If the position is behind, do not copy it. */
610 need_copy_send_pos = false;
611 }
612 }
613
614 if (need_copy_send_pos)
615 {
616 strmake_buf(m_reply_file_name, log_file_name);
617 m_reply_file_pos = log_file_pos;
618 m_reply_file_name_inited = true;
619
620 /* Remove all active transaction nodes before this point. */
621 assert(m_active_tranxs != NULL);
622 m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos);
623
624 DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
625 "Repl_semi_sync_master::report_reply_binlog",
626 log_file_name, (ulong)log_file_pos));
627 }
628
629 if (rpl_semi_sync_master_wait_sessions > 0)
630 {
631 /* Let us check if some of the waiting threads doing a trx
632 * commit can now proceed.
633 */
634 cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
635 m_wait_file_name, m_wait_file_pos);
636 if (cmp >= 0)
637 {
638 /* Yes, at least one waiting thread can now proceed:
639 * let us release all waiting threads with a broadcast
640 */
641 can_release_threads = true;
642 m_wait_file_name_inited = false;
643 }
644 }
645
646 l_end:
647 unlock();
648
649 if (can_release_threads)
650 {
651 DBUG_PRINT("semisync", ("%s: signal all waiting threads.",
652 "Repl_semi_sync_master::report_reply_binlog"));
653
654 cond_broadcast();
655 }
656
657 DBUG_RETURN(0);
658}
659
660int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos)
661{
662 if (!get_master_enabled())
663 return 0;
664
665 int ret= 0;
666 if(log_pos &&
667 wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
668 ret= commit_trx(log_file + dirname_length(log_file), log_pos);
669
670 return ret;
671}
672
673int Repl_semi_sync_master::wait_after_commit(THD* thd, bool all)
674{
675 if (!get_master_enabled())
676 return 0;
677
678 int ret= 0;
679 const char *log_file;
680 my_off_t log_pos;
681
682 bool is_real_trans=
683 (all || thd->transaction.all.ha_list == 0);
684 /*
685 The coordinates are propagated to this point having been computed
686 in report_binlog_update
687 */
688 Trans_binlog_info *log_info= thd->semisync_info;
689 log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
690 log_pos= log_info ? log_info->log_pos : 0;
691
692 DBUG_ASSERT(!log_file || dirname_length(log_file) == 0);
693
694 if (is_real_trans &&
695 log_pos &&
696 wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
697 ret= commit_trx(log_file, log_pos);
698
699 if (is_real_trans && log_info)
700 {
701 log_info->log_file[0]= 0;
702 log_info->log_pos= 0;
703 }
704
705 return ret;
706}
707
708int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all)
709{
710 return wait_after_commit(thd, all);
711}
712
713/**
714 The method runs after flush to binary log is done.
715*/
716int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file,
717 my_off_t log_pos)
718{
719 if (get_master_enabled())
720 {
721 Trans_binlog_info *log_info;
722
723 if (!(log_info= thd->semisync_info))
724 {
725 if(!(log_info=
726 (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
727 return 1;
728 thd->semisync_info= log_info;
729 }
730 strcpy(log_info->log_file, log_file + dirname_length(log_file));
731 log_info->log_pos = log_pos;
732
733 return write_tranx_in_binlog(log_info->log_file, log_pos);
734 }
735
736 return 0;
737}
738
739int Repl_semi_sync_master::dump_start(THD* thd,
740 const char *log_file,
741 my_off_t log_pos)
742{
743 if (!thd->semi_sync_slave)
744 return 0;
745
746 if (ack_receiver.add_slave(thd))
747 {
748 sql_print_error("Failed to register slave to semi-sync ACK receiver "
749 "thread. Turning off semisync");
750 thd->semi_sync_slave= 0;
751 return 1;
752 }
753
754 add_slave();
755 report_reply_binlog(thd->variables.server_id,
756 log_file + dirname_length(log_file), log_pos);
757 sql_print_information("Start semi-sync binlog_dump to slave "
758 "(server_id: %ld), pos(%s, %lu)",
759 (long) thd->variables.server_id, log_file,
760 (ulong) log_pos);
761
762 return 0;
763}
764
765void Repl_semi_sync_master::dump_end(THD* thd)
766{
767 if (!thd->semi_sync_slave)
768 return;
769
770 sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %ld)",
771 (long) thd->variables.server_id);
772
773 remove_slave();
774 ack_receiver.remove_slave(thd);
775
776 return;
777}
778
/*
  Block the committing session until a semi-sync slave has acknowledged
  (trx_wait_binlog_name, trx_wait_binlog_pos). The wait also ends if the
  timeout expires (which switches semi-sync off), if semi-sync is turned off
  by someone else, or if the session is killed.

  @param trx_wait_binlog_name  binlog file name; the callers in this file
                               pass it without the directory part
  @param trx_wait_binlog_pos   binlog position the transaction ended at

  @return always 0; the outcome is only reflected in the status counters.
*/
int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
                                      my_off_t trx_wait_binlog_pos)
{
  DBUG_ENTER("Repl_semi_sync_master::commit_trx");

  if (get_master_enabled() && trx_wait_binlog_name)
  {
    struct timespec start_ts;
    struct timespec abstime;
    int wait_result;
    PSI_stage_info old_stage;
    THD *thd= current_thd;

    /* Every wait deadline below is computed from this start time. */
    set_timespec(start_ts, 0);

    DEBUG_SYNC(thd, "rpl_semisync_master_commit_trx_before_lock");
    /* Acquire the mutex. */
    lock();

    /* This must be called after acquired the lock */
    THD_ENTER_COND(thd, &COND_binlog_send, &LOCK_binlog,
                   & stage_waiting_for_semi_sync_ack_from_slave,
                   & old_stage);

    /* This is the real check inside the mutex. */
    if (!get_master_enabled() || !is_on())
      goto l_end;

    DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)\n",
                            "Repl_semi_sync_master::commit_trx",
                            trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
                            (int)is_on()));

    /*
      Re-evaluated after every wake-up: a broadcast may be for another
      session's position, or semi-sync may have been switched off.
    */
    while (is_on() && !thd_killed(thd))
    {
      if (m_reply_file_name_inited)
      {
        int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
                                        trx_wait_binlog_name,
                                        trx_wait_binlog_pos);
        if (cmp >= 0)
        {
          /* A reply at or beyond our position has already arrived: no
           * need to wait here.
           */
          DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),",
                                  "Repl_semi_sync_master::commit_trx",
                                  m_reply_file_name,
                                  (ulong)m_reply_file_pos));
          break;
        }
      }

      /* Maintain the minimum binlog position of all waiting threads;
       * report_reply_binlog() wakes waiters once the reply reaches it.
       */
      if (m_wait_file_name_inited)
      {
        int cmp = Active_tranx::compare(trx_wait_binlog_name,
                                        trx_wait_binlog_pos,
                                        m_wait_file_name, m_wait_file_pos);
        if (cmp <= 0)
        {
          /* This thd has a lower (or equal) position, let's update the
           * minimum info.
           */
          strmake_buf(m_wait_file_name, trx_wait_binlog_name);
          m_wait_file_pos = trx_wait_binlog_pos;

          rpl_semi_sync_master_wait_pos_backtraverse++;
          DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),",
                                  "Repl_semi_sync_master::commit_trx",
                                  m_wait_file_name, (ulong)m_wait_file_pos));
        }
      }
      else
      {
        strmake_buf(m_wait_file_name, trx_wait_binlog_name);
        m_wait_file_pos = trx_wait_binlog_pos;
        m_wait_file_name_inited = true;

        DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),",
                                "Repl_semi_sync_master::commit_trx",
                                m_wait_file_name, (ulong)m_wait_file_pos));
      }

      /* Calculate the absolute deadline: start_ts + m_wait_timeout
       * milliseconds. It is always measured from start_ts, so repeated
       * wake-ups do not extend the total wait.
       */
      long diff_secs = (long) (m_wait_timeout / TIME_THOUSAND);
      long diff_nsecs = (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION);
      long nsecs = start_ts.tv_nsec + diff_nsecs;
      abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION;
      abstime.tv_nsec = nsecs % TIME_BILLION;

      /* In semi-synchronous replication, we wait until a reply covering
       * the relevant binlog segment has been received from the replication
       * slave.
       *
       * Let us suspend this thread to wait on the condition;
       * when replication has progressed far enough, we will release
       * these waiting threads.
       */
      rpl_semi_sync_master_wait_sessions++;

      DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)",
                              "Repl_semi_sync_master::commit_trx",
                              m_wait_timeout,
                              m_wait_file_name, (ulong)m_wait_file_pos));

      wait_result = cond_timewait(&abstime);
      rpl_semi_sync_master_wait_sessions--;

      if (wait_result != 0)
      {
        /* This is a real wait timeout (any non-zero result is treated so). */
        sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
                          "semi-sync up to file %s, position %lu.",
                          trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
                          m_reply_file_name, (ulong)m_reply_file_pos);
        rpl_semi_sync_master_wait_timeouts++;

        /* switch semi-sync off; this also ends the while loop */
        switch_off();
      }
      else
      {
        int wait_time;

        /* Note: counted per wake-up, so one transaction that is woken
         * several times contributes several samples.
         */
        wait_time = get_wait_time(start_ts);
        if (wait_time < 0)
        {
          DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at "
                                  "wait position (%s, %lu)",
                                  trx_wait_binlog_name,
                                  (ulong)trx_wait_binlog_pos));
          rpl_semi_sync_master_timefunc_fails++;
        }
        else
        {
          rpl_semi_sync_master_trx_wait_num++;
          rpl_semi_sync_master_trx_wait_time += wait_time;
        }
      }
    }

    /*
      At this point, the binlog file and position of this transaction
      must have been removed from Active_tranx.
      m_active_tranxs may be NULL if someone disabled semi sync during
      cond_timewait()
    */
    assert(thd_killed(thd) || !m_active_tranxs ||
           !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
                                              trx_wait_binlog_pos));

  l_end:
    /* Update the status counter. */
    if (is_on())
      rpl_semi_sync_master_yes_transactions++;
    else
      rpl_semi_sync_master_no_transactions++;

    /* The lock held will be released by thd_exit_cond, so no need to
    call unlock() here */
    THD_EXIT_COND(thd, &old_stage);
  }

  DBUG_RETURN(0);
}
945
946/* Indicate that semi-sync replication is OFF now.
947 *
948 * What should we do when it is disabled? The problem is that we want
949 * the semi-sync replication enabled again when the slave catches up
950 * later. But, it is not that easy to detect that the slave has caught
951 * up. This is caused by the fact that MySQL's replication protocol is
952 * asynchronous, meaning that if the master does not use the semi-sync
953 * protocol, the slave would not send anything to the master.
954 * Still, if the master is sending (N+1)-th event, we assume that it is
955 * an indicator that the slave has received N-th event and earlier ones.
956 *
957 * If semi-sync is disabled, all transactions still update the wait
958 * position with the last position in binlog. But no transactions will
959 * wait for confirmations and the active transaction list would not be
960 * maintained. In binlog dump thread, update_sync_header() checks whether
961 * the current sending event catches up with last wait position. If it
962 * does match, semi-sync will be switched on again.
963 */
964int Repl_semi_sync_master::switch_off()
965{
966 int result;
967
968 DBUG_ENTER("Repl_semi_sync_master::switch_off");
969
970 m_state = false;
971
972 /* Clear the active transaction list. */
973 assert(m_active_tranxs != NULL);
974 result = m_active_tranxs->clear_active_tranx_nodes(NULL, 0);
975
976 rpl_semi_sync_master_off_times++;
977 m_wait_file_name_inited = false;
978 m_reply_file_name_inited = false;
979 sql_print_information("Semi-sync replication switched OFF.");
980 cond_broadcast(); /* wake up all waiting threads */
981
982 DBUG_RETURN(result);
983}
984
985int Repl_semi_sync_master::try_switch_on(int server_id,
986 const char *log_file_name,
987 my_off_t log_file_pos)
988{
989 bool semi_sync_on = false;
990
991 DBUG_ENTER("Repl_semi_sync_master::try_switch_on");
992
993 /* If the current sending event's position is larger than or equal to the
994 * 'largest' commit transaction binlog position, the slave is already
995 * catching up now and we can switch semi-sync on here.
996 * If m_commit_file_name_inited indicates there are no recent transactions,
997 * we can enable semi-sync immediately.
998 */
999 if (m_commit_file_name_inited)
1000 {
1001 int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1002 m_commit_file_name, m_commit_file_pos);
1003 semi_sync_on = (cmp >= 0);
1004 }
1005 else
1006 {
1007 semi_sync_on = true;
1008 }
1009
1010 if (semi_sync_on)
1011 {
1012 /* Switch semi-sync replication on. */
1013 m_state = true;
1014
1015 sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
1016 "at (%s, %lu)",
1017 server_id, log_file_name,
1018 (ulong)log_file_pos);
1019 }
1020
1021 DBUG_RETURN(0);
1022}
1023
1024int Repl_semi_sync_master::reserve_sync_header(String* packet)
1025{
1026 DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header");
1027
1028 /* Set the magic number and the sync status. By default, no sync
1029 * is required.
1030 */
1031 packet->append(reinterpret_cast<const char*>(k_sync_header),
1032 sizeof(k_sync_header));
1033 DBUG_RETURN(0);
1034}
1035
1036int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
1037 const char *log_file_name,
1038 my_off_t log_file_pos,
1039 bool* need_sync)
1040{
1041 int cmp = 0;
1042 bool sync = false;
1043
1044 DBUG_ENTER("Repl_semi_sync_master::update_sync_header");
1045
1046 /* If the semi-sync master is not enabled, or the slave is not a semi-sync
1047 * target, do not request replies from the slave.
1048 */
1049 if (!get_master_enabled() || !thd->semi_sync_slave)
1050 {
1051 *need_sync = false;
1052 DBUG_RETURN(0);
1053 }
1054
1055 lock();
1056
1057 /* This is the real check inside the mutex. */
1058 if (!get_master_enabled())
1059 {
1060 assert(sync == false);
1061 goto l_end;
1062 }
1063
1064 if (is_on())
1065 {
1066 /* semi-sync is ON */
1067 sync = false; /* No sync unless a transaction is involved. */
1068
1069 if (m_reply_file_name_inited)
1070 {
1071 cmp = Active_tranx::compare(log_file_name, log_file_pos,
1072 m_reply_file_name, m_reply_file_pos);
1073 if (cmp <= 0)
1074 {
1075 /* If we have already got the reply for the event, then we do
1076 * not need to sync the transaction again.
1077 */
1078 goto l_end;
1079 }
1080 }
1081
1082 if (m_wait_file_name_inited)
1083 {
1084 cmp = Active_tranx::compare(log_file_name, log_file_pos,
1085 m_wait_file_name, m_wait_file_pos);
1086 }
1087 else
1088 {
1089 cmp = 1;
1090 }
1091
1092 /* If we are already waiting for some transaction replies which
1093 * are later in binlog, do not wait for this one event.
1094 */
1095 if (cmp >= 0)
1096 {
1097 /*
1098 * We only wait if the event is a transaction's ending event.
1099 */
1100 assert(m_active_tranxs != NULL);
1101 sync = m_active_tranxs->is_tranx_end_pos(log_file_name,
1102 log_file_pos);
1103 }
1104 }
1105 else
1106 {
1107 if (m_commit_file_name_inited)
1108 {
1109 int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1110 m_commit_file_name, m_commit_file_pos);
1111 sync = (cmp >= 0);
1112 }
1113 else
1114 {
1115 sync = true;
1116 }
1117 }
1118
1119 DBUG_PRINT("semisync", ("%s: server(%lu), (%s, %lu) sync(%d), repl(%d)",
1120 "Repl_semi_sync_master::update_sync_header",
1121 thd->variables.server_id, log_file_name,
1122 (ulong)log_file_pos, sync, (int)is_on()));
1123 *need_sync= sync;
1124
1125 l_end:
1126 unlock();
1127
1128 /* We do not need to clear sync flag because we set it to 0 when we
1129 * reserve the packet header.
1130 */
1131 if (sync)
1132 {
1133 (packet)[2] = k_packet_flag_sync;
1134 }
1135
1136 DBUG_RETURN(0);
1137}
1138
1139int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
1140 my_off_t log_file_pos)
1141{
1142 int result = 0;
1143
1144 DBUG_ENTER("Repl_semi_sync_master::write_tranx_in_binlog");
1145
1146 lock();
1147
1148 /* This is the real check inside the mutex. */
1149 if (!get_master_enabled())
1150 goto l_end;
1151
1152 /* Update the 'largest' transaction commit position seen so far even
1153 * though semi-sync is switched off.
1154 * It is much better that we update m_commit_file* here, instead of
1155 * inside commit_trx(). This is mostly because update_sync_header()
1156 * will watch for m_commit_file* to decide whether to switch semi-sync
1157 * on. The detailed reason is explained in function update_sync_header().
1158 */
1159 if (m_commit_file_name_inited)
1160 {
1161 int cmp = Active_tranx::compare(log_file_name, log_file_pos,
1162 m_commit_file_name, m_commit_file_pos);
1163 if (cmp > 0)
1164 {
1165 /* This is a larger position, let's update the maximum info. */
1166 strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1);
1167 m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
1168 m_commit_file_pos = log_file_pos;
1169 }
1170 }
1171 else
1172 {
1173 strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1);
1174 m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
1175 m_commit_file_pos = log_file_pos;
1176 m_commit_file_name_inited = true;
1177 }
1178
1179 if (is_on())
1180 {
1181 assert(m_active_tranxs != NULL);
1182 if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos))
1183 {
1184 /*
1185 if insert tranx_node failed, print a warning message
1186 and turn off semi-sync
1187 */
1188 sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
1189 log_file_name, (ulong)log_file_pos);
1190 switch_off();
1191 }
1192 else
1193 {
1194 rpl_semi_sync_master_request_ack++;
1195 }
1196 }
1197
1198 l_end:
1199 unlock();
1200
1201 DBUG_RETURN(result);
1202}
1203
1204int Repl_semi_sync_master::flush_net(THD *thd,
1205 const char *event_buf)
1206{
1207 int result = -1;
1208 NET* net= &thd->net;
1209
1210 DBUG_ENTER("Repl_semi_sync_master::flush_net");
1211
1212 assert((unsigned char)event_buf[1] == k_packet_magic_num);
1213 if ((unsigned char)event_buf[2] != k_packet_flag_sync)
1214 {
1215 /* current event does not require reply */
1216 result = 0;
1217 goto l_end;
1218 }
1219
1220 /* We flush to make sure that the current event is sent to the network,
1221 * instead of being buffered in the TCP/IP stack.
1222 */
1223 if (net_flush(net))
1224 {
1225 sql_print_error("Semi-sync master failed on net_flush() "
1226 "before waiting for slave reply");
1227 goto l_end;
1228 }
1229
1230 net_clear(net, 0);
1231 net->pkt_nr++;
1232 result = 0;
1233 rpl_semi_sync_master_net_wait_num++;
1234
1235 l_end:
1236 thd->clear_error();
1237
1238 DBUG_RETURN(result);
1239}
1240
1241int Repl_semi_sync_master::after_reset_master()
1242{
1243 int result = 0;
1244
1245 DBUG_ENTER("Repl_semi_sync_master::after_reset_master");
1246
1247 if (rpl_semi_sync_master_enabled)
1248 {
1249 sql_print_information("Enable Semi-sync Master after reset master");
1250 enable_master();
1251 }
1252
1253 lock();
1254
1255 if (rpl_semi_sync_master_clients == 0 &&
1256 !rpl_semi_sync_master_wait_no_slave)
1257 m_state = 0;
1258 else
1259 m_state = get_master_enabled()? 1 : 0;
1260
1261 m_wait_file_name_inited = false;
1262 m_reply_file_name_inited = false;
1263 m_commit_file_name_inited = false;
1264
1265 rpl_semi_sync_master_yes_transactions = 0;
1266 rpl_semi_sync_master_no_transactions = 0;
1267 rpl_semi_sync_master_off_times = 0;
1268 rpl_semi_sync_master_timefunc_fails = 0;
1269 rpl_semi_sync_master_wait_sessions = 0;
1270 rpl_semi_sync_master_wait_pos_backtraverse = 0;
1271 rpl_semi_sync_master_trx_wait_num = 0;
1272 rpl_semi_sync_master_trx_wait_time = 0;
1273 rpl_semi_sync_master_net_wait_num = 0;
1274 rpl_semi_sync_master_net_wait_time = 0;
1275
1276 unlock();
1277
1278 DBUG_RETURN(result);
1279}
1280
1281int Repl_semi_sync_master::before_reset_master()
1282{
1283 int result = 0;
1284
1285 DBUG_ENTER("Repl_semi_sync_master::before_reset_master");
1286
1287 if (rpl_semi_sync_master_enabled)
1288 disable_master();
1289
1290 DBUG_RETURN(result);
1291}
1292
1293void Repl_semi_sync_master::check_and_switch()
1294{
1295 lock();
1296 if (get_master_enabled() && is_on())
1297 {
1298 if (!rpl_semi_sync_master_wait_no_slave
1299 && rpl_semi_sync_master_clients == 0)
1300 switch_off();
1301 }
1302 unlock();
1303}
1304
1305void Repl_semi_sync_master::set_export_stats()
1306{
1307 lock();
1308
1309 rpl_semi_sync_master_status = m_state;
1310 rpl_semi_sync_master_avg_trx_wait_time=
1311 ((rpl_semi_sync_master_trx_wait_num) ?
1312 (ulong)((double)rpl_semi_sync_master_trx_wait_time /
1313 ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
1314 rpl_semi_sync_master_avg_net_wait_time=
1315 ((rpl_semi_sync_master_net_wait_num) ?
1316 (ulong)((double)rpl_semi_sync_master_net_wait_time /
1317 ((double)rpl_semi_sync_master_net_wait_num)) : 0);
1318
1319 unlock();
1320}
1321
1322/* Get the waiting time given the wait's staring time.
1323 *
1324 * Return:
1325 * >= 0: the waiting time in microsecons(us)
1326 * < 0: error in get time or time back traverse
1327 */
1328static int get_wait_time(const struct timespec& start_ts)
1329{
1330 ulonglong start_usecs, end_usecs;
1331 struct timespec end_ts;
1332
1333 /* Starting time in microseconds(us). */
1334 start_usecs = timespec_to_usec(&start_ts);
1335
1336 /* Get the wait time interval. */
1337 set_timespec(end_ts, 0);
1338
1339 /* Ending time in microseconds(us). */
1340 end_usecs = timespec_to_usec(&end_ts);
1341
1342 if (end_usecs < start_usecs)
1343 return -1;
1344
1345 return (int)(end_usecs - start_usecs);
1346}
1347
/*
  Release the semi-sync master resources: cleans up the global
  repl_semisync_master object first, then the global ack_receiver.
  NOTE(review): the call order is kept as-is; confirm whether
  ack_receiver.cleanup() relies on running after the master cleanup.
*/
void semi_sync_master_deinit()
{
  repl_semisync_master.cleanup();
  ack_receiver.cleanup();
}
1353