1/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16#include <my_global.h>
17#include "semisync_master.h"
18#include "semisync_master_ack_receiver.h"
19
20#ifdef HAVE_PSI_MUTEX_INTERFACE
21extern PSI_mutex_key key_LOCK_ack_receiver;
22extern PSI_cond_key key_COND_ack_receiver;
23#endif
24#ifdef HAVE_PSI_THREAD_INTERFACE
25extern PSI_thread_key key_thread_ack_receiver;
26#endif
27extern Repl_semi_sync_master repl_semisync;
28
29/* Callback function of ack receive thread */
30pthread_handler_t ack_receive_handler(void *arg)
31{
32 Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
33
34 my_thread_init();
35 recv->run();
36 my_thread_end();
37
38 return NULL;
39}
40
41Ack_receiver::Ack_receiver()
42{
43 DBUG_ENTER("Ack_receiver::Ack_receiver");
44
45 m_status= ST_DOWN;
46 mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex,
47 MY_MUTEX_INIT_FAST);
48 mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL);
49 m_pid= 0;
50
51 DBUG_VOID_RETURN;
52}
53
54void Ack_receiver::cleanup()
55{
56 DBUG_ENTER("Ack_receiver::~Ack_receiver");
57
58 stop();
59 mysql_mutex_destroy(&m_mutex);
60 mysql_cond_destroy(&m_cond);
61
62 DBUG_VOID_RETURN;
63}
64
65bool Ack_receiver::start()
66{
67 DBUG_ENTER("Ack_receiver::start");
68
69 mysql_mutex_lock(&m_mutex);
70 if(m_status == ST_DOWN)
71 {
72 pthread_attr_t attr;
73
74 m_status= ST_UP;
75
76 if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
77 pthread_attr_init(&attr) != 0 ||
78 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
79#ifndef _WIN32
80 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
81#endif
82 mysql_thread_create(key_thread_ack_receiver, &m_pid,
83 &attr, ack_receive_handler, this))
84 {
85 sql_print_error("Failed to start semi-sync ACK receiver thread, "
86 " could not create thread(errno:%d)", errno);
87
88 m_status= ST_DOWN;
89 mysql_mutex_unlock(&m_mutex);
90
91 DBUG_RETURN(true);
92 }
93 (void) pthread_attr_destroy(&attr);
94 }
95 mysql_mutex_unlock(&m_mutex);
96
97 DBUG_RETURN(false);
98}
99
100void Ack_receiver::stop()
101{
102 DBUG_ENTER("Ack_receiver::stop");
103
104 mysql_mutex_lock(&m_mutex);
105 if (m_status == ST_UP)
106 {
107 m_status= ST_STOPPING;
108 mysql_cond_broadcast(&m_cond);
109
110 while (m_status == ST_STOPPING)
111 mysql_cond_wait(&m_cond, &m_mutex);
112
113 DBUG_ASSERT(m_status == ST_DOWN);
114
115 m_pid= 0;
116 }
117 mysql_mutex_unlock(&m_mutex);
118
119 DBUG_VOID_RETURN;
120}
121
122bool Ack_receiver::add_slave(THD *thd)
123{
124 Slave *slave;
125 DBUG_ENTER("Ack_receiver::add_slave");
126
127 if (!(slave= new Slave))
128 DBUG_RETURN(true);
129
130 slave->thd= thd;
131 slave->vio= *thd->net.vio;
132 slave->vio.mysql_socket.m_psi= NULL;
133 slave->vio.read_timeout= 1;
134
135 mysql_mutex_lock(&m_mutex);
136 m_slaves.push_back(slave);
137 m_slaves_changed= true;
138 mysql_cond_broadcast(&m_cond);
139 mysql_mutex_unlock(&m_mutex);
140
141 DBUG_RETURN(false);
142}
143
144void Ack_receiver::remove_slave(THD *thd)
145{
146 I_List_iterator<Slave> it(m_slaves);
147 Slave *slave;
148 DBUG_ENTER("Ack_receiver::remove_slave");
149
150 mysql_mutex_lock(&m_mutex);
151
152 while ((slave= it++))
153 {
154 if (slave->thd == thd)
155 {
156 delete slave;
157 m_slaves_changed= true;
158 break;
159 }
160 }
161 mysql_mutex_unlock(&m_mutex);
162
163 DBUG_VOID_RETURN;
164}
165
166inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
167{
168 MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
169}
170
171inline void Ack_receiver::wait_for_slave_connection()
172{
173 set_stage_info(stage_waiting_for_semi_sync_slave);
174 mysql_cond_wait(&m_cond, &m_mutex);
175}
176
177my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count)
178{
179 my_socket max_fd= INVALID_SOCKET;
180 Slave *slave;
181 I_List_iterator<Slave> it(m_slaves);
182
183 *count= 0;
184 FD_ZERO(fds);
185 while ((slave= it++))
186 {
187 (*count)++;
188 my_socket fd= slave->sock_fd();
189 max_fd= (fd > max_fd ? fd : max_fd);
190 FD_SET(fd, fds);
191 }
192
193 return max_fd;
194}
195
196/* Auxilary function to initialize a NET object with given net buffer. */
197static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
198{
199 memset(net, 0, sizeof(NET));
200 net->max_packet= buff_len;
201 net->buff= buff;
202 net->buff_end= buff + buff_len;
203 net->read_pos= net->buff;
204}
205
206void Ack_receiver::run()
207{
208 // skip LOCK_global_system_variables due to the 3rd arg
209 THD *thd= new THD(next_thread_id(), false, true);
210 NET net;
211 unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
212 fd_set read_fds;
213 my_socket max_fd= INVALID_SOCKET;
214 Slave *slave;
215
216 my_thread_init();
217
218 DBUG_ENTER("Ack_receiver::run");
219
220 sql_print_information("Starting ack receiver thread");
221 thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
222 thd->thread_stack= (char*) &thd;
223 thd->store_globals();
224 thd->security_ctx->skip_grants();
225 thread_safe_increment32(&service_thread_count);
226 thd->set_command(COM_DAEMON);
227 init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
228
229 mysql_mutex_lock(&m_mutex);
230 m_slaves_changed= true;
231 mysql_mutex_unlock(&m_mutex);
232
233 while (1)
234 {
235 fd_set fds;
236 int ret;
237 uint slave_count;
238
239 mysql_mutex_lock(&m_mutex);
240 if (unlikely(m_status == ST_STOPPING))
241 goto end;
242
243 set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
244 if (unlikely(m_slaves_changed))
245 {
246 if (unlikely(m_slaves.is_empty()))
247 {
248 wait_for_slave_connection();
249 mysql_mutex_unlock(&m_mutex);
250 continue;
251 }
252
253 max_fd= get_slave_sockets(&read_fds, &slave_count);
254 m_slaves_changed= false;
255 DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,(int) max_fd));
256 }
257
258 struct timeval tv= {1, 0};
259 fds= read_fds;
260 /* select requires max fd + 1 for the first argument */
261 ret= select((int)(max_fd+1), &fds, NULL, NULL, &tv);
262 if (ret <= 0)
263 {
264 mysql_mutex_unlock(&m_mutex);
265
266 ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
267
268 if (ret == -1)
269 sql_print_information("Failed to select() on semi-sync dump sockets, "
270 "error: errno=%d", socket_errno);
271 /* Sleep 1us, so other threads can catch the m_mutex easily. */
272 my_sleep(1);
273 continue;
274 }
275
276 set_stage_info(stage_reading_semi_sync_ack);
277 I_List_iterator<Slave> it(m_slaves);
278
279 while ((slave= it++))
280 {
281 if (FD_ISSET(slave->sock_fd(), &fds))
282 {
283 ulong len;
284
285 net_clear(&net, 0);
286 net.vio= &slave->vio;
287
288 len= my_net_read(&net);
289 if (likely(len != packet_error))
290 repl_semisync_master.report_reply_packet(slave->server_id(),
291 net.read_pos, len);
292 else if (net.last_errno == ER_NET_READ_ERROR)
293 FD_CLR(slave->sock_fd(), &read_fds);
294 }
295 }
296 mysql_mutex_unlock(&m_mutex);
297 }
298end:
299 sql_print_information("Stopping ack receiver thread");
300 m_status= ST_DOWN;
301 delete thd;
302 thread_safe_decrement32(&service_thread_count);
303 signal_thd_deleted();
304 mysql_cond_broadcast(&m_cond);
305 mysql_mutex_unlock(&m_mutex);
306 DBUG_VOID_RETURN;
307}
308