| 1 | /* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. |
| 2 | |
| 3 | This program is free software; you can redistribute it and/or modify |
| 4 | it under the terms of the GNU General Public License as published by |
| 5 | the Free Software Foundation; version 2 of the License. |
| 6 | |
| 7 | This program is distributed in the hope that it will be useful, |
| 8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 10 | GNU General Public License for more details. |
| 11 | |
| 12 | You should have received a copy of the GNU General Public License |
| 13 | along with this program; if not, write to the Free Software |
| 14 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
| 15 | |
| 16 | #include <my_global.h> |
| 17 | #include "semisync_master.h" |
| 18 | #include "semisync_master_ack_receiver.h" |
| 19 | |
| 20 | #ifdef HAVE_PSI_MUTEX_INTERFACE |
| 21 | extern PSI_mutex_key key_LOCK_ack_receiver; |
| 22 | extern PSI_cond_key key_COND_ack_receiver; |
| 23 | #endif |
| 24 | #ifdef HAVE_PSI_THREAD_INTERFACE |
| 25 | extern PSI_thread_key key_thread_ack_receiver; |
| 26 | #endif |
| 27 | extern Repl_semi_sync_master repl_semisync; |
| 28 | |
| 29 | /* Callback function of ack receive thread */ |
| 30 | pthread_handler_t ack_receive_handler(void *arg) |
| 31 | { |
| 32 | Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg); |
| 33 | |
| 34 | my_thread_init(); |
| 35 | recv->run(); |
| 36 | my_thread_end(); |
| 37 | |
| 38 | return NULL; |
| 39 | } |
| 40 | |
| 41 | Ack_receiver::Ack_receiver() |
| 42 | { |
| 43 | DBUG_ENTER("Ack_receiver::Ack_receiver" ); |
| 44 | |
| 45 | m_status= ST_DOWN; |
| 46 | mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, |
| 47 | MY_MUTEX_INIT_FAST); |
| 48 | mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL); |
| 49 | m_pid= 0; |
| 50 | |
| 51 | DBUG_VOID_RETURN; |
| 52 | } |
| 53 | |
| 54 | void Ack_receiver::cleanup() |
| 55 | { |
| 56 | DBUG_ENTER("Ack_receiver::~Ack_receiver" ); |
| 57 | |
| 58 | stop(); |
| 59 | mysql_mutex_destroy(&m_mutex); |
| 60 | mysql_cond_destroy(&m_cond); |
| 61 | |
| 62 | DBUG_VOID_RETURN; |
| 63 | } |
| 64 | |
| 65 | bool Ack_receiver::start() |
| 66 | { |
| 67 | DBUG_ENTER("Ack_receiver::start" ); |
| 68 | |
| 69 | mysql_mutex_lock(&m_mutex); |
| 70 | if(m_status == ST_DOWN) |
| 71 | { |
| 72 | pthread_attr_t attr; |
| 73 | |
| 74 | m_status= ST_UP; |
| 75 | |
| 76 | if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure" , 1, 0) || |
| 77 | pthread_attr_init(&attr) != 0 || |
| 78 | pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 || |
| 79 | #ifndef _WIN32 |
| 80 | pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 || |
| 81 | #endif |
| 82 | mysql_thread_create(key_thread_ack_receiver, &m_pid, |
| 83 | &attr, ack_receive_handler, this)) |
| 84 | { |
| 85 | sql_print_error("Failed to start semi-sync ACK receiver thread, " |
| 86 | " could not create thread(errno:%d)" , errno); |
| 87 | |
| 88 | m_status= ST_DOWN; |
| 89 | mysql_mutex_unlock(&m_mutex); |
| 90 | |
| 91 | DBUG_RETURN(true); |
| 92 | } |
| 93 | (void) pthread_attr_destroy(&attr); |
| 94 | } |
| 95 | mysql_mutex_unlock(&m_mutex); |
| 96 | |
| 97 | DBUG_RETURN(false); |
| 98 | } |
| 99 | |
| 100 | void Ack_receiver::stop() |
| 101 | { |
| 102 | DBUG_ENTER("Ack_receiver::stop" ); |
| 103 | |
| 104 | mysql_mutex_lock(&m_mutex); |
| 105 | if (m_status == ST_UP) |
| 106 | { |
| 107 | m_status= ST_STOPPING; |
| 108 | mysql_cond_broadcast(&m_cond); |
| 109 | |
| 110 | while (m_status == ST_STOPPING) |
| 111 | mysql_cond_wait(&m_cond, &m_mutex); |
| 112 | |
| 113 | DBUG_ASSERT(m_status == ST_DOWN); |
| 114 | |
| 115 | m_pid= 0; |
| 116 | } |
| 117 | mysql_mutex_unlock(&m_mutex); |
| 118 | |
| 119 | DBUG_VOID_RETURN; |
| 120 | } |
| 121 | |
| 122 | bool Ack_receiver::add_slave(THD *thd) |
| 123 | { |
| 124 | Slave *slave; |
| 125 | DBUG_ENTER("Ack_receiver::add_slave" ); |
| 126 | |
| 127 | if (!(slave= new Slave)) |
| 128 | DBUG_RETURN(true); |
| 129 | |
| 130 | slave->thd= thd; |
| 131 | slave->vio= *thd->net.vio; |
| 132 | slave->vio.mysql_socket.m_psi= NULL; |
| 133 | slave->vio.read_timeout= 1; |
| 134 | |
| 135 | mysql_mutex_lock(&m_mutex); |
| 136 | m_slaves.push_back(slave); |
| 137 | m_slaves_changed= true; |
| 138 | mysql_cond_broadcast(&m_cond); |
| 139 | mysql_mutex_unlock(&m_mutex); |
| 140 | |
| 141 | DBUG_RETURN(false); |
| 142 | } |
| 143 | |
| 144 | void Ack_receiver::remove_slave(THD *thd) |
| 145 | { |
| 146 | I_List_iterator<Slave> it(m_slaves); |
| 147 | Slave *slave; |
| 148 | DBUG_ENTER("Ack_receiver::remove_slave" ); |
| 149 | |
| 150 | mysql_mutex_lock(&m_mutex); |
| 151 | |
| 152 | while ((slave= it++)) |
| 153 | { |
| 154 | if (slave->thd == thd) |
| 155 | { |
| 156 | delete slave; |
| 157 | m_slaves_changed= true; |
| 158 | break; |
| 159 | } |
| 160 | } |
| 161 | mysql_mutex_unlock(&m_mutex); |
| 162 | |
| 163 | DBUG_VOID_RETURN; |
| 164 | } |
| 165 | |
| 166 | inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage) |
| 167 | { |
| 168 | MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__); |
| 169 | } |
| 170 | |
| 171 | inline void Ack_receiver::wait_for_slave_connection() |
| 172 | { |
| 173 | set_stage_info(stage_waiting_for_semi_sync_slave); |
| 174 | mysql_cond_wait(&m_cond, &m_mutex); |
| 175 | } |
| 176 | |
| 177 | my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count) |
| 178 | { |
| 179 | my_socket max_fd= INVALID_SOCKET; |
| 180 | Slave *slave; |
| 181 | I_List_iterator<Slave> it(m_slaves); |
| 182 | |
| 183 | *count= 0; |
| 184 | FD_ZERO(fds); |
| 185 | while ((slave= it++)) |
| 186 | { |
| 187 | (*count)++; |
| 188 | my_socket fd= slave->sock_fd(); |
| 189 | max_fd= (fd > max_fd ? fd : max_fd); |
| 190 | FD_SET(fd, fds); |
| 191 | } |
| 192 | |
| 193 | return max_fd; |
| 194 | } |
| 195 | |
| 196 | /* Auxilary function to initialize a NET object with given net buffer. */ |
| 197 | static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) |
| 198 | { |
| 199 | memset(net, 0, sizeof(NET)); |
| 200 | net->max_packet= buff_len; |
| 201 | net->buff= buff; |
| 202 | net->buff_end= buff + buff_len; |
| 203 | net->read_pos= net->buff; |
| 204 | } |
| 205 | |
| 206 | void Ack_receiver::run() |
| 207 | { |
| 208 | // skip LOCK_global_system_variables due to the 3rd arg |
| 209 | THD *thd= new THD(next_thread_id(), false, true); |
| 210 | NET net; |
| 211 | unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; |
| 212 | fd_set read_fds; |
| 213 | my_socket max_fd= INVALID_SOCKET; |
| 214 | Slave *slave; |
| 215 | |
| 216 | my_thread_init(); |
| 217 | |
| 218 | DBUG_ENTER("Ack_receiver::run" ); |
| 219 | |
| 220 | sql_print_information("Starting ack receiver thread" ); |
| 221 | thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; |
| 222 | thd->thread_stack= (char*) &thd; |
| 223 | thd->store_globals(); |
| 224 | thd->security_ctx->skip_grants(); |
| 225 | thread_safe_increment32(&service_thread_count); |
| 226 | thd->set_command(COM_DAEMON); |
| 227 | init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); |
| 228 | |
| 229 | mysql_mutex_lock(&m_mutex); |
| 230 | m_slaves_changed= true; |
| 231 | mysql_mutex_unlock(&m_mutex); |
| 232 | |
| 233 | while (1) |
| 234 | { |
| 235 | fd_set fds; |
| 236 | int ret; |
| 237 | uint slave_count; |
| 238 | |
| 239 | mysql_mutex_lock(&m_mutex); |
| 240 | if (unlikely(m_status == ST_STOPPING)) |
| 241 | goto end; |
| 242 | |
| 243 | set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); |
| 244 | if (unlikely(m_slaves_changed)) |
| 245 | { |
| 246 | if (unlikely(m_slaves.is_empty())) |
| 247 | { |
| 248 | wait_for_slave_connection(); |
| 249 | mysql_mutex_unlock(&m_mutex); |
| 250 | continue; |
| 251 | } |
| 252 | |
| 253 | max_fd= get_slave_sockets(&read_fds, &slave_count); |
| 254 | m_slaves_changed= false; |
| 255 | DBUG_PRINT("info" , ("fd count %u, max_fd %d" , slave_count,(int) max_fd)); |
| 256 | } |
| 257 | |
| 258 | struct timeval tv= {1, 0}; |
| 259 | fds= read_fds; |
| 260 | /* select requires max fd + 1 for the first argument */ |
| 261 | ret= select((int)(max_fd+1), &fds, NULL, NULL, &tv); |
| 262 | if (ret <= 0) |
| 263 | { |
| 264 | mysql_mutex_unlock(&m_mutex); |
| 265 | |
| 266 | ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error" , -1, ret); |
| 267 | |
| 268 | if (ret == -1) |
| 269 | sql_print_information("Failed to select() on semi-sync dump sockets, " |
| 270 | "error: errno=%d" , socket_errno); |
| 271 | /* Sleep 1us, so other threads can catch the m_mutex easily. */ |
| 272 | my_sleep(1); |
| 273 | continue; |
| 274 | } |
| 275 | |
| 276 | set_stage_info(stage_reading_semi_sync_ack); |
| 277 | I_List_iterator<Slave> it(m_slaves); |
| 278 | |
| 279 | while ((slave= it++)) |
| 280 | { |
| 281 | if (FD_ISSET(slave->sock_fd(), &fds)) |
| 282 | { |
| 283 | ulong len; |
| 284 | |
| 285 | net_clear(&net, 0); |
| 286 | net.vio= &slave->vio; |
| 287 | |
| 288 | len= my_net_read(&net); |
| 289 | if (likely(len != packet_error)) |
| 290 | repl_semisync_master.report_reply_packet(slave->server_id(), |
| 291 | net.read_pos, len); |
| 292 | else if (net.last_errno == ER_NET_READ_ERROR) |
| 293 | FD_CLR(slave->sock_fd(), &read_fds); |
| 294 | } |
| 295 | } |
| 296 | mysql_mutex_unlock(&m_mutex); |
| 297 | } |
| 298 | end: |
| 299 | sql_print_information("Stopping ack receiver thread" ); |
| 300 | m_status= ST_DOWN; |
| 301 | delete thd; |
| 302 | thread_safe_decrement32(&service_thread_count); |
| 303 | signal_thd_deleted(); |
| 304 | mysql_cond_broadcast(&m_cond); |
| 305 | mysql_mutex_unlock(&m_mutex); |
| 306 | DBUG_VOID_RETURN; |
| 307 | } |
| 308 | |