1/* Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc.
2 Use is subject to license terms.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17
18#include <my_global.h>
19#include "semisync_slave.h"
20
21Repl_semi_sync_slave repl_semisync_slave;
22
23my_bool rpl_semi_sync_slave_enabled= 0;
24
25char rpl_semi_sync_slave_delay_master;
26my_bool rpl_semi_sync_slave_status= 0;
27ulong rpl_semi_sync_slave_trace_level;
28
29/*
30 indicate whether or not the slave should send a reply to the master.
31
32 This is set to true in repl_semi_slave_read_event if the current
33 event read is the last event of a transaction. And the value is
34 checked in repl_semi_slave_queue_event.
35*/
36bool semi_sync_need_reply= false;
37unsigned int rpl_semi_sync_slave_kill_conn_timeout;
38unsigned long long rpl_semi_sync_slave_send_ack = 0;
39
40int Repl_semi_sync_slave::init_object()
41{
42 int result= 0;
43
44 m_init_done = true;
45
46 /* References to the parameter works after set_options(). */
47 set_slave_enabled(rpl_semi_sync_slave_enabled);
48 set_trace_level(rpl_semi_sync_slave_trace_level);
49 set_delay_master(rpl_semi_sync_slave_delay_master);
50 set_kill_conn_timeout(rpl_semi_sync_slave_kill_conn_timeout);
51
52 return result;
53}
54
55int Repl_semi_sync_slave::slave_read_sync_header(const char *header,
56 unsigned long total_len,
57 int *semi_flags,
58 const char **payload,
59 unsigned long *payload_len)
60{
61 int read_res = 0;
62 DBUG_ENTER("Repl_semi_sync_slave::slave_read_sync_header");
63
64 if (rpl_semi_sync_slave_status)
65 {
66 if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1)
67 && (unsigned char)(header[0]) == k_packet_magic_num)
68 {
69 semi_sync_need_reply = (header[1] & k_packet_flag_sync);
70 *payload_len = total_len - 2;
71 *payload = header + 2;
72
73 DBUG_PRINT("semisync", ("%s: reply - %d",
74 "Repl_semi_sync_slave::slave_read_sync_header",
75 semi_sync_need_reply));
76
77 if (semi_sync_need_reply)
78 *semi_flags |= SEMI_SYNC_NEED_ACK;
79 if (is_delay_master())
80 *semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC;
81 }
82 else
83 {
84 sql_print_error("Missing magic number for semi-sync packet, packet "
85 "len: %lu", total_len);
86 read_res = -1;
87 }
88 } else {
89 *payload= header;
90 *payload_len= total_len;
91 }
92
93 DBUG_RETURN(read_res);
94}
95
96int Repl_semi_sync_slave::slave_start(Master_info *mi)
97{
98 bool semi_sync= get_slave_enabled();
99
100 sql_print_information("Slave I/O thread: Start %s replication to\
101 master '%s@%s:%d' in log '%s' at position %lu",
102 semi_sync ? "semi-sync" : "asynchronous",
103 const_cast<char *>(mi->user), mi->host, mi->port,
104 const_cast<char *>(mi->master_log_name),
105 (unsigned long)(mi->master_log_pos));
106
107 if (semi_sync && !rpl_semi_sync_slave_status)
108 rpl_semi_sync_slave_status= 1;
109
110 /*clear the counter*/
111 rpl_semi_sync_slave_send_ack= 0;
112 return 0;
113}
114
115int Repl_semi_sync_slave::slave_stop(Master_info *mi)
116{
117 if (rpl_semi_sync_slave_status)
118 rpl_semi_sync_slave_status= 0;
119 if (get_slave_enabled())
120 kill_connection(mi->mysql);
121 return 0;
122}
123
124int Repl_semi_sync_slave::reset_slave(Master_info *mi)
125{
126 return 0;
127}
128
129void Repl_semi_sync_slave::kill_connection(MYSQL *mysql)
130{
131 if (!mysql)
132 return;
133
134 char kill_buffer[30];
135 MYSQL *kill_mysql = NULL;
136 kill_mysql = mysql_init(kill_mysql);
137 mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &m_kill_conn_timeout);
138 mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &m_kill_conn_timeout);
139 mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &m_kill_conn_timeout);
140
141 bool ret= (!mysql_real_connect(kill_mysql, mysql->host,
142 mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0));
143 if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", 1, 0) || ret)
144 {
145 sql_print_information("cannot connect to master to kill slave io_thread's "
146 "connection");
147 if (!ret)
148 mysql_close(kill_mysql);
149 return;
150 }
151 size_t kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu",
152 mysql->thread_id);
153 mysql_real_query(kill_mysql, kill_buffer, (ulong)kill_buffer_length);
154 mysql_close(kill_mysql);
155}
156
157int Repl_semi_sync_slave::request_transmit(Master_info *mi)
158{
159 MYSQL *mysql= mi->mysql;
160 MYSQL_RES *res= 0;
161 MYSQL_ROW row;
162 const char *query;
163
164 if (!get_slave_enabled())
165 return 0;
166
167 query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
168 if (mysql_real_query(mysql, query, (ulong)strlen(query)) ||
169 !(res= mysql_store_result(mysql)))
170 {
171 sql_print_error("Execution failed on master: %s, error :%s", query, mysql_error(mysql));
172 return 1;
173 }
174
175 row= mysql_fetch_row(res);
176 if (DBUG_EVALUATE_IF("master_not_support_semisync", 1, 0)
177 || !row)
178 {
179 /* Master does not support semi-sync */
180 sql_print_warning("Master server does not support semi-sync, "
181 "fallback to asynchronous replication");
182 rpl_semi_sync_slave_status= 0;
183 mysql_free_result(res);
184 return 0;
185 }
186 mysql_free_result(res);
187
188 /*
189 Tell master dump thread that we want to do semi-sync
190 replication
191 */
192 query= "SET @rpl_semi_sync_slave= 1";
193 if (mysql_real_query(mysql, query, (ulong)strlen(query)))
194 {
195 sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
196 return 1;
197 }
198 mysql_free_result(mysql_store_result(mysql));
199 rpl_semi_sync_slave_status= 1;
200
201 return 0;
202}
203
204int Repl_semi_sync_slave::slave_reply(Master_info *mi)
205{
206 MYSQL* mysql= mi->mysql;
207 const char *binlog_filename= const_cast<char *>(mi->master_log_name);
208 my_off_t binlog_filepos= mi->master_log_pos;
209
210 NET *net= &mysql->net;
211 uchar reply_buffer[REPLY_MAGIC_NUM_LEN
212 + REPLY_BINLOG_POS_LEN
213 + REPLY_BINLOG_NAME_LEN];
214 int reply_res = 0;
215 size_t name_len = strlen(binlog_filename);
216
217 DBUG_ENTER("Repl_semi_sync_slave::slave_reply");
218
219 if (rpl_semi_sync_slave_status && semi_sync_need_reply)
220 {
221 /* Prepare the buffer of the reply. */
222 reply_buffer[REPLY_MAGIC_NUM_OFFSET] = k_packet_magic_num;
223 int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
224 memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
225 binlog_filename,
226 name_len + 1 /* including trailing '\0' */);
227
228 DBUG_PRINT("semisync", ("%s: reply (%s, %lu)",
229 "Repl_semi_sync_slave::slave_reply",
230 binlog_filename, (ulong)binlog_filepos));
231
232 net_clear(net, 0);
233 /* Send the reply. */
234 reply_res = my_net_write(net, reply_buffer,
235 name_len + REPLY_BINLOG_NAME_OFFSET);
236 if (!reply_res)
237 {
238 reply_res = DBUG_EVALUATE_IF("semislave_failed_net_flush", 1, net_flush(net));
239 if (reply_res)
240 sql_print_error("Semi-sync slave net_flush() reply failed");
241 rpl_semi_sync_slave_send_ack++;
242 }
243 else
244 {
245 sql_print_error("Semi-sync slave send reply failed: %s (%d)",
246 net->last_error, net->last_errno);
247 }
248 }
249
250 DBUG_RETURN(reply_res);
251}
252