| 1 | /* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab. |
| 2 | |
| 3 | This program is free software; you can redistribute it and/or modify |
| 4 | it under the terms of the GNU General Public License as published by |
| 5 | the Free Software Foundation; version 2 of the License. |
| 6 | |
| 7 | This program is distributed in the hope that it will be useful, |
| 8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 10 | GNU General Public License for more details. |
| 11 | |
| 12 | You should have received a copy of the GNU General Public License |
| 13 | along with this program; if not, write to the Free Software |
| 14 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
| 15 | |
| 16 | #ifndef RPL_GTID_H |
| 17 | #define RPL_GTID_H |
| 18 | |
| 19 | #include "hash.h" |
| 20 | #include "queues.h" |
| 21 | |
| 22 | |
| 23 | /* Definitions for MariaDB global transaction ID (GTID). */ |
| 24 | |
| 25 | |
| 26 | extern const LEX_CSTRING rpl_gtid_slave_state_table_name; |
| 27 | |
| 28 | class String; |
| 29 | |
/*
  One MariaDB global transaction ID.

  A GTID is the triple (domain_id, server_id, seq_no). Within a single
  domain, event groups are committed in order and seq_no increases
  monotonically (see rpl_slave_state below), so (domain_id, seq_no) is
  sufficient to order events of one replication stream.
*/
struct rpl_gtid
{
  uint32 domain_id;   /* Independent replication stream this GTID belongs to */
  uint32 server_id;   /* Server that originally generated the event group */
  uint64 seq_no;      /* Sequence number, increasing within the domain */
};
| 36 | |
| 37 | inline bool operator==(const rpl_gtid& lhs, const rpl_gtid& rhs) |
| 38 | { |
| 39 | return |
| 40 | lhs.domain_id == rhs.domain_id && |
| 41 | lhs.server_id == rhs.server_id && |
| 42 | lhs.seq_no == rhs.seq_no; |
| 43 | }; |
| 44 | |
/*
  Skip state for filtering already-applied event groups by GTID:
  no skipping, skip one standalone event, or skip a whole transaction.
  (Exact semantics live in the slave apply code, not this header.)
*/
enum enum_gtid_skip_type {
  GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
};
| 48 | |
| 49 | |
| 50 | /* |
| 51 | Structure to keep track of threads waiting in MASTER_GTID_WAIT(). |
| 52 | |
| 53 | Since replication is (mostly) single-threaded, we want to minimise the |
| 54 | performance impact on that from MASTER_GTID_WAIT(). To achieve this, we |
| 55 | are careful to keep the common lock between replication threads and |
  MASTER_GTID_WAIT threads held for as short a time as possible. We keep only
| 57 | a single thread waiting to be notified by the replication threads; this |
| 58 | thread then handles all the (potentially heavy) lifting of dealing with |
| 59 | all current waiting threads. |
| 60 | */ |
struct gtid_waiting {
  /* Elements in the hash, basically a priority queue for each domain. */
  struct hash_element {
    QUEUE queue;        /* Priority queue of queue_element, ordered on seq_no */
    uint32 domain_id;   /* Hash key */
  };
  /* A priority queue to handle waiters in one domain in seq_no order. */
  struct queue_element {
    uint64 wait_seq_no; /* The seq_no this thread is waiting to reach */
    THD *thd;
    int queue_idx;      /* Position within the priority queue */
    /*
      do_small_wait is true if we have responsibility for ensuring that there
      is a small waiter.
    */
    bool do_small_wait;
    /*
      The flag `done' is set when the wait is completed (either due to reaching
      the position waited for, or due to timeout or kill). The queue_element
      is in the queue if and only if `done' is false.
    */
    bool done;
  };

  mysql_mutex_t LOCK_gtid_waiting;
  HASH hash;            /* Maps domain_id -> hash_element */

  void init();
  void destroy();
  /* Find (or create) the hash_element for one domain_id. */
  hash_element *get_entry(uint32 domain_id);
  /* Entry point for MASTER_GTID_WAIT(); gtid_str is the position to wait for. */
  int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
  /* Hand off small-wait responsibility to another waiter (see do_small_wait). */
  void promote_new_waiter(gtid_waiting::hash_element *he);
  int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
  /* Signal waiters in `he' whose wait_seq_no has been reached by wakeup_seq_no. */
  void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
  int register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, hash_element *he,
                             queue_element *elem);
  void remove_from_wait_queue(hash_element *he, queue_element *elem);
};
| 99 | |
| 100 | |
| 101 | class Relay_log_info; |
| 102 | struct rpl_group_info; |
| 103 | class Gtid_list_log_event; |
| 104 | |
| 105 | /* |
| 106 | Replication slave state. |
| 107 | |
| 108 | For every independent replication stream (identified by domain_id), this |
| 109 | remembers the last gtid applied on the slave within this domain. |
| 110 | |
| 111 | Since events are always committed in-order within a single domain, this is |
| 112 | sufficient to maintain the state of the replication slave. |
| 113 | */ |
struct rpl_slave_state
{
  /* Elements in the list of GTIDs kept for each domain_id. */
  struct list_element
  {
    struct list_element *next;
    uint64 sub_id;      /* Global commit ordering ticket (see last_sub_id) */
    uint64 seq_no;
    uint32 server_id;
    /*
      hton of mysql.gtid_slave_pos* table used to record this GTID.
      Can be NULL if the gtid table failed to load (eg. missing
      mysql.gtid_slave_pos table following an upgrade).
    */
    void *hton;
  };

  /* Elements in the HASH that hold the state for one domain_id. */
  struct element
  {
    struct list_element *list;
    uint32 domain_id;   /* Hash key */
    /* Highest seq_no seen so far in this domain. */
    uint64 highest_seq_no;
    /*
      If this is non-NULL, then it is the waiter responsible for the small
      wait in MASTER_GTID_WAIT().
    */
    gtid_waiting::queue_element *gtid_waiter;
    /*
      If gtid_waiter is non-NULL, then this is the seq_no that its
      MASTER_GTID_WAIT() is waiting on. When we reach this seq_no, we need to
      signal the waiter on COND_wait_gtid.
    */
    uint64 min_wait_seq_no;
    mysql_cond_t COND_wait_gtid;

    /*
      For --gtid-ignore-duplicates. The Relay_log_info that currently owns
      this domain, and the number of worker threads that are active in it.

      The idea is that only one of multiple master connections is allowed to
      actively apply events for a given domain. Other connections must either
      discard the events (if the seq_no in GTID shows they have already been
      applied), or wait to see if the current owner will apply it.
    */
    const Relay_log_info *owner_rli;
    uint32 owner_count;
    mysql_cond_t COND_gtid_ignore_duplicates;

    /* Detach the whole GTID list from the element and return it. */
    list_element *grab_list() { list_element *l= list; list= NULL; return l; }
    /* Push one element onto the front of the GTID list. */
    void add(list_element *l)
    {
      l->next= list;
      list= l;
    }
  };

  /* Descriptor for mysql.gtid_slave_posXXX table in specific engine. */
  enum gtid_pos_table_state {
    GTID_POS_AUTO_CREATE,
    GTID_POS_CREATE_REQUESTED,
    GTID_POS_CREATE_IN_PROGRESS,
    GTID_POS_AVAILABLE
  };
  struct gtid_pos_table {
    struct gtid_pos_table *next;
    /*
      Use a void * here, rather than handlerton *, to make explicit that we
      are not using the value to access any functionality in the engine. It
      is just used as an opaque value to identify which engine we are using
      for each GTID row.
    */
    void *table_hton;
    LEX_CSTRING table_name;
    uint8 state;        /* One of gtid_pos_table_state */
  };

  /* Mapping from domain_id to its element. */
  HASH hash;
  /* Mutex protecting access to the state. */
  mysql_mutex_t LOCK_slave_state;
  /* Auxiliary buffer to sort gtid list. */
  DYNAMIC_ARRAY gtid_sort_array;

  /* Last sub_id handed out; see next_sub_id(). */
  uint64 last_sub_id;
  /*
    List of tables available for durably storing the slave GTID position.

    Accesses to this list are protected by LOCK_slave_state. However for
    efficiency, there is also a provision for read access to it from a running
    slave without lock.

    An element can be added at the head of a list by storing the new
    gtid_pos_tables pointer atomically with release semantics, to ensure that
    the next pointer of the new element is visible to readers of the new list.
    Other changes (like deleting or replacing elements) must happen only while
    all SQL driver threads are stopped. LOCK_slave_state must be held in any
    case.

    The list can be read without lock by an SQL driver thread or worker thread
    by reading the gtid_pos_tables pointer atomically with acquire semantics,
    to ensure that it will see the correct next pointer of a new head element.

    The type is struct gtid_pos_table *, but needs to be void * to allow using
    my_atomic operations without violating C strict aliasing semantics.
  */
  void * volatile gtid_pos_tables;
  /* The default entry in gtid_pos_tables, mysql.gtid_slave_pos. */
  void * volatile default_gtid_pos_table;
  bool loaded;          /* True once the state has been loaded from the table */

  rpl_slave_state();
  ~rpl_slave_state();

  /* Clear the in-memory per-domain state hash. */
  void truncate_hash();
  /* Number of distinct domains currently tracked. */
  ulong count() const { return hash.records; }
  /* Record one applied GTID in the in-memory state. */
  int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
             uint64 seq_no, void *hton, rpl_group_info *rgi);
  int truncate_state_table(THD *thd);
  void select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename);
  /* Durably record one GTID in a mysql.gtid_slave_pos* table. */
  int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                  bool in_transaction, bool in_statement, void **out_hton);
  uint64 next_sub_id(uint32 domain_id);
  /* Invoke cb for every GTID in the state (optionally sorted). */
  int iterate(int (*cb)(rpl_gtid *, void *), void *data,
              rpl_gtid *, uint32 ,
              bool sort);
  int tostring(String *dest, rpl_gtid *, uint32 );
  /* Get the last applied GTID for one domain; false if domain unknown. */
  bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid);
  int load(THD *thd, const char *state_from_master, size_t len, bool reset,
           bool in_statement);
  bool is_empty();

  element *get_element(uint32 domain_id);
  int put_back_list(uint32 domain_id, list_element *list);

  void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
                         rpl_group_info *rgi);
  int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
  /* For --gtid-ignore-duplicates; see element::owner_rli above. */
  int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
  void release_domain_owner(rpl_group_info *rgi);
  void set_gtid_pos_tables_list(gtid_pos_table *new_list,
                                gtid_pos_table *default_entry);
  void add_gtid_pos_table(gtid_pos_table *entry);
  struct gtid_pos_table *alloc_gtid_pos_table(LEX_CSTRING *table_name,
      void *hton, rpl_slave_state::gtid_pos_table_state state);
  void free_gtid_pos_tables(struct gtid_pos_table *list);
};
| 262 | |
| 263 | |
| 264 | /* |
| 265 | Binlog state. |
| 266 | This keeps the last GTID written to the binlog for every distinct |
| 267 | (domain_id, server_id) pair. |
| 268 | This will be logged at the start of the next binlog file as a |
| 269 | Gtid_list_log_event; this way, it is easy to find the binlog file |
| 270 | containing a given GTID, by simply scanning backwards from the newest |
| 271 | one until a lower seq_no is found in the Gtid_list_log_event at the |
| 272 | start of a binlog for the given domain_id and server_id. |
| 273 | |
| 274 | We also remember the last logged GTID for every domain_id. This is used |
| 275 | to know where to start when a master is changed to a slave. As a side |
  effect, it also makes it possible to skip a hash lookup in the very common
  case of
| 277 | logging a new GTID with same server id as last GTID. |
| 278 | */ |
struct rpl_binlog_state
{
  /* Per-domain state: one hash of last GTIDs, keyed on server_id. */
  struct element {
    uint32 domain_id;   /* Hash key in the outer hash */
    HASH hash; /* Containing all server_id for one domain_id */
    /* The most recent entry in the hash. */
    rpl_gtid *last_gtid;
    /* Counter to allocate next seq_no for this domain. */
    uint64 seq_no_counter;

    int update_element(const rpl_gtid *gtid);
  };
  /* Mapping from domain_id to collection of elements. */
  HASH hash;
  /* Mutex protecting access to the state. */
  mysql_mutex_t LOCK_binlog_state;
  my_bool initialized;

  /* Auxiliary buffer to sort gtid list. */
  DYNAMIC_ARRAY gtid_sort_array;

  rpl_binlog_state() :initialized(0) {}
  ~rpl_binlog_state();

  void init();
  /* reset_nolock() assumes LOCK_binlog_state is already held. */
  void reset_nolock();
  void reset();
  void free();
  /* Replace the state with the given GTID list / slave position. */
  bool load(struct rpl_gtid *list, uint32 count);
  bool load(rpl_slave_state *slave_pos);
  /* With strict mode, reject out-of-order seq_no within a domain. */
  int update_nolock(const struct rpl_gtid *gtid, bool strict);
  int update(const struct rpl_gtid *gtid, bool strict);
  /* Fill in gtid->seq_no from seq_no_counter and record it. */
  int update_with_next_gtid(uint32 domain_id, uint32 server_id,
                             rpl_gtid *gtid);
  int alloc_element_nolock(const rpl_gtid *gtid);
  bool check_strict_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no);
  int bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no);
  /* Serialization to/from the binlog state file format. */
  int write_to_iocache(IO_CACHE *dest);
  int read_from_iocache(IO_CACHE *src);
  uint32 count();
  int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
  int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
  bool append_pos(String *str);
  bool append_state(String *str);
  rpl_gtid *find_nolock(uint32 domain_id, uint32 server_id);
  rpl_gtid *find(uint32 domain_id, uint32 server_id);
  rpl_gtid *find_most_recent(uint32 domain_id);
  const char* drop_domain(DYNAMIC_ARRAY *ids, Gtid_list_log_event *glev, char*);
};
| 328 | |
| 329 | |
| 330 | /* |
| 331 | Represent the GTID state that a slave connection to a master requests |
| 332 | the master to start sending binlog events from. |
| 333 | */ |
struct slave_connection_state
{
  /* One requested start position: the GTID plus per-domain flags. */
  struct entry {
    rpl_gtid gtid;
    uint32 flags;
  };
  /* Bits for 'flags' */
  enum start_flags
  {
    /* Position comes from the slave's own gtid_slave_pos. */
    START_OWN_SLAVE_POS= 0x1,
    /* Start from the beginning of this domain (no prior GTID). */
    START_ON_EMPTY_DOMAIN= 0x2
  };

  /* Mapping from domain_id to the entry with GTID requested for that domain. */
  HASH hash;

  /* Auxiliary buffer to sort gtid list. */
  DYNAMIC_ARRAY gtid_sort_array;

  slave_connection_state();
  ~slave_connection_state();

  void reset() { my_hash_reset(&hash); }
  /* Parse a text GTID list as sent in the slave's connection request. */
  int load(const char *slave_request, size_t len);
  int load(const rpl_gtid *gtid_list, uint32 count);
  int load(rpl_slave_state *state, rpl_gtid *, uint32 );
  rpl_gtid *find(uint32 domain_id);
  entry *find_entry(uint32 domain_id);
  int update(const rpl_gtid *in_gtid);
  void remove(const rpl_gtid *gtid);
  void remove_if_present(const rpl_gtid *in_gtid);
  /* Number of domains with a requested start position. */
  ulong count() const { return hash.records; }
  int to_string(String *out_str);
  int append_to_string(String *out_str);
  int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
  bool is_pos_reached();
};
| 371 | |
| 372 | |
/*
  Helper to append one GTID in text form to dest; `first' tracks whether a
  separator is still needed (presumably comma-separation — see callers).
*/
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
                                            bool *first);
/* Check that `table' matches the expected mysql.gtid_slave_pos definition. */
extern int gtid_check_rpl_slave_state_table(TABLE *table);
/*
  Parse a text GTID list into an allocated array of out_len entries.
  NOTE(review): ownership of the returned array appears to pass to the
  caller — confirm at call sites.
*/
extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len,
                                           uint32 *out_len);
| 378 | |
| 379 | #endif /* RPL_GTID_H */ |
| 380 | |