1 | /* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License |
13 | along with this program; if not, write to the Free Software |
14 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
15 | |
16 | #include <my_global.h> |
17 | #include "semisync_master.h" |
18 | #include "semisync_master_ack_receiver.h" |
19 | |
20 | #ifdef HAVE_PSI_MUTEX_INTERFACE |
21 | extern PSI_mutex_key key_LOCK_ack_receiver; |
22 | extern PSI_cond_key key_COND_ack_receiver; |
23 | #endif |
24 | #ifdef HAVE_PSI_THREAD_INTERFACE |
25 | extern PSI_thread_key key_thread_ack_receiver; |
26 | #endif |
27 | extern Repl_semi_sync_master repl_semisync; |
28 | |
29 | /* Callback function of ack receive thread */ |
30 | pthread_handler_t ack_receive_handler(void *arg) |
31 | { |
32 | Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg); |
33 | |
34 | my_thread_init(); |
35 | recv->run(); |
36 | my_thread_end(); |
37 | |
38 | return NULL; |
39 | } |
40 | |
41 | Ack_receiver::Ack_receiver() |
42 | { |
43 | DBUG_ENTER("Ack_receiver::Ack_receiver" ); |
44 | |
45 | m_status= ST_DOWN; |
46 | mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, |
47 | MY_MUTEX_INIT_FAST); |
48 | mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL); |
49 | m_pid= 0; |
50 | |
51 | DBUG_VOID_RETURN; |
52 | } |
53 | |
54 | void Ack_receiver::cleanup() |
55 | { |
56 | DBUG_ENTER("Ack_receiver::~Ack_receiver" ); |
57 | |
58 | stop(); |
59 | mysql_mutex_destroy(&m_mutex); |
60 | mysql_cond_destroy(&m_cond); |
61 | |
62 | DBUG_VOID_RETURN; |
63 | } |
64 | |
65 | bool Ack_receiver::start() |
66 | { |
67 | DBUG_ENTER("Ack_receiver::start" ); |
68 | |
69 | mysql_mutex_lock(&m_mutex); |
70 | if(m_status == ST_DOWN) |
71 | { |
72 | pthread_attr_t attr; |
73 | |
74 | m_status= ST_UP; |
75 | |
76 | if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure" , 1, 0) || |
77 | pthread_attr_init(&attr) != 0 || |
78 | pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 || |
79 | #ifndef _WIN32 |
80 | pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 || |
81 | #endif |
82 | mysql_thread_create(key_thread_ack_receiver, &m_pid, |
83 | &attr, ack_receive_handler, this)) |
84 | { |
85 | sql_print_error("Failed to start semi-sync ACK receiver thread, " |
86 | " could not create thread(errno:%d)" , errno); |
87 | |
88 | m_status= ST_DOWN; |
89 | mysql_mutex_unlock(&m_mutex); |
90 | |
91 | DBUG_RETURN(true); |
92 | } |
93 | (void) pthread_attr_destroy(&attr); |
94 | } |
95 | mysql_mutex_unlock(&m_mutex); |
96 | |
97 | DBUG_RETURN(false); |
98 | } |
99 | |
100 | void Ack_receiver::stop() |
101 | { |
102 | DBUG_ENTER("Ack_receiver::stop" ); |
103 | |
104 | mysql_mutex_lock(&m_mutex); |
105 | if (m_status == ST_UP) |
106 | { |
107 | m_status= ST_STOPPING; |
108 | mysql_cond_broadcast(&m_cond); |
109 | |
110 | while (m_status == ST_STOPPING) |
111 | mysql_cond_wait(&m_cond, &m_mutex); |
112 | |
113 | DBUG_ASSERT(m_status == ST_DOWN); |
114 | |
115 | m_pid= 0; |
116 | } |
117 | mysql_mutex_unlock(&m_mutex); |
118 | |
119 | DBUG_VOID_RETURN; |
120 | } |
121 | |
122 | bool Ack_receiver::add_slave(THD *thd) |
123 | { |
124 | Slave *slave; |
125 | DBUG_ENTER("Ack_receiver::add_slave" ); |
126 | |
127 | if (!(slave= new Slave)) |
128 | DBUG_RETURN(true); |
129 | |
130 | slave->thd= thd; |
131 | slave->vio= *thd->net.vio; |
132 | slave->vio.mysql_socket.m_psi= NULL; |
133 | slave->vio.read_timeout= 1; |
134 | |
135 | mysql_mutex_lock(&m_mutex); |
136 | m_slaves.push_back(slave); |
137 | m_slaves_changed= true; |
138 | mysql_cond_broadcast(&m_cond); |
139 | mysql_mutex_unlock(&m_mutex); |
140 | |
141 | DBUG_RETURN(false); |
142 | } |
143 | |
144 | void Ack_receiver::remove_slave(THD *thd) |
145 | { |
146 | I_List_iterator<Slave> it(m_slaves); |
147 | Slave *slave; |
148 | DBUG_ENTER("Ack_receiver::remove_slave" ); |
149 | |
150 | mysql_mutex_lock(&m_mutex); |
151 | |
152 | while ((slave= it++)) |
153 | { |
154 | if (slave->thd == thd) |
155 | { |
156 | delete slave; |
157 | m_slaves_changed= true; |
158 | break; |
159 | } |
160 | } |
161 | mysql_mutex_unlock(&m_mutex); |
162 | |
163 | DBUG_VOID_RETURN; |
164 | } |
165 | |
166 | inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage) |
167 | { |
168 | MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__); |
169 | } |
170 | |
171 | inline void Ack_receiver::wait_for_slave_connection() |
172 | { |
173 | set_stage_info(stage_waiting_for_semi_sync_slave); |
174 | mysql_cond_wait(&m_cond, &m_mutex); |
175 | } |
176 | |
177 | my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count) |
178 | { |
179 | my_socket max_fd= INVALID_SOCKET; |
180 | Slave *slave; |
181 | I_List_iterator<Slave> it(m_slaves); |
182 | |
183 | *count= 0; |
184 | FD_ZERO(fds); |
185 | while ((slave= it++)) |
186 | { |
187 | (*count)++; |
188 | my_socket fd= slave->sock_fd(); |
189 | max_fd= (fd > max_fd ? fd : max_fd); |
190 | FD_SET(fd, fds); |
191 | } |
192 | |
193 | return max_fd; |
194 | } |
195 | |
196 | /* Auxilary function to initialize a NET object with given net buffer. */ |
197 | static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) |
198 | { |
199 | memset(net, 0, sizeof(NET)); |
200 | net->max_packet= buff_len; |
201 | net->buff= buff; |
202 | net->buff_end= buff + buff_len; |
203 | net->read_pos= net->buff; |
204 | } |
205 | |
206 | void Ack_receiver::run() |
207 | { |
208 | // skip LOCK_global_system_variables due to the 3rd arg |
209 | THD *thd= new THD(next_thread_id(), false, true); |
210 | NET net; |
211 | unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; |
212 | fd_set read_fds; |
213 | my_socket max_fd= INVALID_SOCKET; |
214 | Slave *slave; |
215 | |
216 | my_thread_init(); |
217 | |
218 | DBUG_ENTER("Ack_receiver::run" ); |
219 | |
220 | sql_print_information("Starting ack receiver thread" ); |
221 | thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; |
222 | thd->thread_stack= (char*) &thd; |
223 | thd->store_globals(); |
224 | thd->security_ctx->skip_grants(); |
225 | thread_safe_increment32(&service_thread_count); |
226 | thd->set_command(COM_DAEMON); |
227 | init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); |
228 | |
229 | mysql_mutex_lock(&m_mutex); |
230 | m_slaves_changed= true; |
231 | mysql_mutex_unlock(&m_mutex); |
232 | |
233 | while (1) |
234 | { |
235 | fd_set fds; |
236 | int ret; |
237 | uint slave_count; |
238 | |
239 | mysql_mutex_lock(&m_mutex); |
240 | if (unlikely(m_status == ST_STOPPING)) |
241 | goto end; |
242 | |
243 | set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); |
244 | if (unlikely(m_slaves_changed)) |
245 | { |
246 | if (unlikely(m_slaves.is_empty())) |
247 | { |
248 | wait_for_slave_connection(); |
249 | mysql_mutex_unlock(&m_mutex); |
250 | continue; |
251 | } |
252 | |
253 | max_fd= get_slave_sockets(&read_fds, &slave_count); |
254 | m_slaves_changed= false; |
255 | DBUG_PRINT("info" , ("fd count %u, max_fd %d" , slave_count,(int) max_fd)); |
256 | } |
257 | |
258 | struct timeval tv= {1, 0}; |
259 | fds= read_fds; |
260 | /* select requires max fd + 1 for the first argument */ |
261 | ret= select((int)(max_fd+1), &fds, NULL, NULL, &tv); |
262 | if (ret <= 0) |
263 | { |
264 | mysql_mutex_unlock(&m_mutex); |
265 | |
266 | ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error" , -1, ret); |
267 | |
268 | if (ret == -1) |
269 | sql_print_information("Failed to select() on semi-sync dump sockets, " |
270 | "error: errno=%d" , socket_errno); |
271 | /* Sleep 1us, so other threads can catch the m_mutex easily. */ |
272 | my_sleep(1); |
273 | continue; |
274 | } |
275 | |
276 | set_stage_info(stage_reading_semi_sync_ack); |
277 | I_List_iterator<Slave> it(m_slaves); |
278 | |
279 | while ((slave= it++)) |
280 | { |
281 | if (FD_ISSET(slave->sock_fd(), &fds)) |
282 | { |
283 | ulong len; |
284 | |
285 | net_clear(&net, 0); |
286 | net.vio= &slave->vio; |
287 | |
288 | len= my_net_read(&net); |
289 | if (likely(len != packet_error)) |
290 | repl_semisync_master.report_reply_packet(slave->server_id(), |
291 | net.read_pos, len); |
292 | else if (net.last_errno == ER_NET_READ_ERROR) |
293 | FD_CLR(slave->sock_fd(), &read_fds); |
294 | } |
295 | } |
296 | mysql_mutex_unlock(&m_mutex); |
297 | } |
298 | end: |
299 | sql_print_information("Stopping ack receiver thread" ); |
300 | m_status= ST_DOWN; |
301 | delete thd; |
302 | thread_safe_decrement32(&service_thread_count); |
303 | signal_thd_deleted(); |
304 | mysql_cond_broadcast(&m_cond); |
305 | mysql_mutex_unlock(&m_mutex); |
306 | DBUG_VOID_RETURN; |
307 | } |
308 | |