1 | /* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab. |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License |
13 | along with this program; if not, write to the Free Software |
14 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
15 | |
16 | #ifndef RPL_GTID_H |
17 | #define RPL_GTID_H |
18 | |
19 | #include "hash.h" |
20 | #include "queues.h" |
21 | |
22 | |
23 | /* Definitions for MariaDB global transaction ID (GTID). */ |
24 | |
25 | |
26 | extern const LEX_CSTRING rpl_gtid_slave_state_table_name; |
27 | |
28 | class String; |
29 | |
30 | struct rpl_gtid |
31 | { |
32 | uint32 domain_id; |
33 | uint32 server_id; |
34 | uint64 seq_no; |
35 | }; |
36 | |
37 | inline bool operator==(const rpl_gtid& lhs, const rpl_gtid& rhs) |
38 | { |
39 | return |
40 | lhs.domain_id == rhs.domain_id && |
41 | lhs.server_id == rhs.server_id && |
42 | lhs.seq_no == rhs.seq_no; |
43 | }; |
44 | |
/*
  Kinds of GTID-based skipping.

  NOTE(review): presumably GTID_SKIP_NOT means nothing is being skipped,
  GTID_SKIP_STANDALONE means a standalone event is being skipped, and
  GTID_SKIP_TRANSACTION means a whole transaction is being skipped -- confirm
  at the use sites.
*/
enum enum_gtid_skip_type
{
  GTID_SKIP_NOT,
  GTID_SKIP_STANDALONE,
  GTID_SKIP_TRANSACTION
};
48 | |
49 | |
/*
  Structure to keep track of threads waiting in MASTER_GTID_WAIT().

  Since replication is (mostly) single-threaded, we want to minimise the
  performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
  are careful to keep the common lock between replication threads and
  MASTER_GTID_WAIT threads held for as short a time as possible. We keep only
  a single thread waiting to be notified by the replication threads; this
  thread then handles all the (potentially heavy) lifting of dealing with
  all current waiting threads.
*/
61 | struct gtid_waiting { |
62 | /* Elements in the hash, basically a priority queue for each domain. */ |
63 | struct hash_element { |
64 | QUEUE queue; |
65 | uint32 domain_id; |
66 | }; |
67 | /* A priority queue to handle waiters in one domain in seq_no order. */ |
68 | struct queue_element { |
69 | uint64 wait_seq_no; |
70 | THD *thd; |
71 | int queue_idx; |
72 | /* |
73 | do_small_wait is true if we have responsibility for ensuring that there |
74 | is a small waiter. |
75 | */ |
76 | bool do_small_wait; |
77 | /* |
78 | The flag `done' is set when the wait is completed (either due to reaching |
79 | the position waited for, or due to timeout or kill). The queue_element |
80 | is in the queue if and only if `done' is true. |
81 | */ |
82 | bool done; |
83 | }; |
84 | |
85 | mysql_mutex_t LOCK_gtid_waiting; |
86 | HASH hash; |
87 | |
88 | void init(); |
89 | void destroy(); |
90 | hash_element *get_entry(uint32 domain_id); |
91 | int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us); |
92 | void promote_new_waiter(gtid_waiting::hash_element *he); |
93 | int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until); |
94 | void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he); |
95 | int register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, hash_element *he, |
96 | queue_element *elem); |
97 | void remove_from_wait_queue(hash_element *he, queue_element *elem); |
98 | }; |
99 | |
100 | |
101 | class Relay_log_info; |
102 | struct rpl_group_info; |
103 | class Gtid_list_log_event; |
104 | |
105 | /* |
106 | Replication slave state. |
107 | |
108 | For every independent replication stream (identified by domain_id), this |
109 | remembers the last gtid applied on the slave within this domain. |
110 | |
111 | Since events are always committed in-order within a single domain, this is |
112 | sufficient to maintain the state of the replication slave. |
113 | */ |
114 | struct rpl_slave_state |
115 | { |
116 | /* Elements in the list of GTIDs kept for each domain_id. */ |
117 | struct list_element |
118 | { |
119 | struct list_element *next; |
120 | uint64 sub_id; |
121 | uint64 seq_no; |
122 | uint32 server_id; |
123 | /* |
124 | hton of mysql.gtid_slave_pos* table used to record this GTID. |
125 | Can be NULL if the gtid table failed to load (eg. missing |
126 | mysql.gtid_slave_pos table following an upgrade). |
127 | */ |
128 | void *hton; |
129 | }; |
130 | |
131 | /* Elements in the HASH that hold the state for one domain_id. */ |
132 | struct element |
133 | { |
134 | struct list_element *list; |
135 | uint32 domain_id; |
136 | /* Highest seq_no seen so far in this domain. */ |
137 | uint64 highest_seq_no; |
138 | /* |
139 | If this is non-NULL, then it is the waiter responsible for the small |
140 | wait in MASTER_GTID_WAIT(). |
141 | */ |
142 | gtid_waiting::queue_element *gtid_waiter; |
143 | /* |
144 | If gtid_waiter is non-NULL, then this is the seq_no that its |
145 | MASTER_GTID_WAIT() is waiting on. When we reach this seq_no, we need to |
146 | signal the waiter on COND_wait_gtid. |
147 | */ |
148 | uint64 min_wait_seq_no; |
149 | mysql_cond_t COND_wait_gtid; |
150 | |
151 | /* |
152 | For --gtid-ignore-duplicates. The Relay_log_info that currently owns |
153 | this domain, and the number of worker threads that are active in it. |
154 | |
155 | The idea is that only one of multiple master connections is allowed to |
156 | actively apply events for a given domain. Other connections must either |
157 | discard the events (if the seq_no in GTID shows they have already been |
158 | applied), or wait to see if the current owner will apply it. |
159 | */ |
160 | const Relay_log_info *owner_rli; |
161 | uint32 owner_count; |
162 | mysql_cond_t COND_gtid_ignore_duplicates; |
163 | |
164 | list_element *grab_list() { list_element *l= list; list= NULL; return l; } |
165 | void add(list_element *l) |
166 | { |
167 | l->next= list; |
168 | list= l; |
169 | } |
170 | }; |
171 | |
172 | /* Descriptor for mysql.gtid_slave_posXXX table in specific engine. */ |
173 | enum gtid_pos_table_state { |
174 | GTID_POS_AUTO_CREATE, |
175 | GTID_POS_CREATE_REQUESTED, |
176 | GTID_POS_CREATE_IN_PROGRESS, |
177 | GTID_POS_AVAILABLE |
178 | }; |
179 | struct gtid_pos_table { |
180 | struct gtid_pos_table *next; |
181 | /* |
182 | Use a void * here, rather than handlerton *, to make explicit that we |
183 | are not using the value to access any functionality in the engine. It |
184 | is just used as an opaque value to identify which engine we are using |
185 | for each GTID row. |
186 | */ |
187 | void *table_hton; |
188 | LEX_CSTRING table_name; |
189 | uint8 state; |
190 | }; |
191 | |
192 | /* Mapping from domain_id to its element. */ |
193 | HASH hash; |
194 | /* Mutex protecting access to the state. */ |
195 | mysql_mutex_t LOCK_slave_state; |
196 | /* Auxiliary buffer to sort gtid list. */ |
197 | DYNAMIC_ARRAY gtid_sort_array; |
198 | |
199 | uint64 last_sub_id; |
200 | /* |
201 | List of tables available for durably storing the slave GTID position. |
202 | |
203 | Accesses to this table is protected by LOCK_slave_state. However for |
204 | efficiency, there is also a provision for read access to it from a running |
205 | slave without lock. |
206 | |
207 | An element can be added at the head of a list by storing the new |
208 | gtid_pos_tables pointer atomically with release semantics, to ensure that |
209 | the next pointer of the new element is visible to readers of the new list. |
210 | Other changes (like deleting or replacing elements) must happen only while |
211 | all SQL driver threads are stopped. LOCK_slave_state must be held in any |
212 | case. |
213 | |
214 | The list can be read without lock by an SQL driver thread or worker thread |
215 | by reading the gtid_pos_tables pointer atomically with acquire semantics, |
216 | to ensure that it will see the correct next pointer of a new head element. |
217 | |
218 | The type is struct gtid_pos_table *, but needs to be void * to allow using |
219 | my_atomic operations without violating C strict aliasing semantics. |
220 | */ |
221 | void * volatile gtid_pos_tables; |
222 | /* The default entry in gtid_pos_tables, mysql.gtid_slave_pos. */ |
223 | void * volatile default_gtid_pos_table; |
224 | bool loaded; |
225 | |
226 | rpl_slave_state(); |
227 | ~rpl_slave_state(); |
228 | |
229 | void truncate_hash(); |
230 | ulong count() const { return hash.records; } |
231 | int update(uint32 domain_id, uint32 server_id, uint64 sub_id, |
232 | uint64 seq_no, void *hton, rpl_group_info *rgi); |
233 | int truncate_state_table(THD *thd); |
234 | void select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename); |
235 | int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, |
236 | bool in_transaction, bool in_statement, void **out_hton); |
237 | uint64 next_sub_id(uint32 domain_id); |
238 | int iterate(int (*cb)(rpl_gtid *, void *), void *data, |
239 | rpl_gtid *, uint32 , |
240 | bool sort); |
241 | int tostring(String *dest, rpl_gtid *, uint32 ); |
242 | bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid); |
243 | int load(THD *thd, const char *state_from_master, size_t len, bool reset, |
244 | bool in_statement); |
245 | bool is_empty(); |
246 | |
247 | element *get_element(uint32 domain_id); |
248 | int put_back_list(uint32 domain_id, list_element *list); |
249 | |
250 | void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton, |
251 | rpl_group_info *rgi); |
252 | int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi); |
253 | int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi); |
254 | void release_domain_owner(rpl_group_info *rgi); |
255 | void set_gtid_pos_tables_list(gtid_pos_table *new_list, |
256 | gtid_pos_table *default_entry); |
257 | void add_gtid_pos_table(gtid_pos_table *entry); |
258 | struct gtid_pos_table *alloc_gtid_pos_table(LEX_CSTRING *table_name, |
259 | void *hton, rpl_slave_state::gtid_pos_table_state state); |
260 | void free_gtid_pos_tables(struct gtid_pos_table *list); |
261 | }; |
262 | |
263 | |
/*
  Binlog state.
  This keeps the last GTID written to the binlog for every distinct
  (domain_id, server_id) pair.
  This will be logged at the start of the next binlog file as a
  Gtid_list_log_event; this way, it is easy to find the binlog file
  containing a given GTID, by simply scanning backwards from the newest
  one until a lower seq_no is found in the Gtid_list_log_event at the
  start of a binlog for the given domain_id and server_id.

  We also remember the last logged GTID for every domain_id. This is used
  to know where to start when a master is changed to a slave. As a side
  effect, it also allows skipping a hash lookup in the very common case of
  logging a new GTID with the same server id as the last GTID.
*/
279 | struct rpl_binlog_state |
280 | { |
281 | struct element { |
282 | uint32 domain_id; |
283 | HASH hash; /* Containing all server_id for one domain_id */ |
284 | /* The most recent entry in the hash. */ |
285 | rpl_gtid *last_gtid; |
286 | /* Counter to allocate next seq_no for this domain. */ |
287 | uint64 seq_no_counter; |
288 | |
289 | int update_element(const rpl_gtid *gtid); |
290 | }; |
291 | /* Mapping from domain_id to collection of elements. */ |
292 | HASH hash; |
293 | /* Mutex protecting access to the state. */ |
294 | mysql_mutex_t LOCK_binlog_state; |
295 | my_bool initialized; |
296 | |
297 | /* Auxiliary buffer to sort gtid list. */ |
298 | DYNAMIC_ARRAY gtid_sort_array; |
299 | |
300 | rpl_binlog_state() :initialized(0) {} |
301 | ~rpl_binlog_state(); |
302 | |
303 | void init(); |
304 | void reset_nolock(); |
305 | void reset(); |
306 | void free(); |
307 | bool load(struct rpl_gtid *list, uint32 count); |
308 | bool load(rpl_slave_state *slave_pos); |
309 | int update_nolock(const struct rpl_gtid *gtid, bool strict); |
310 | int update(const struct rpl_gtid *gtid, bool strict); |
311 | int update_with_next_gtid(uint32 domain_id, uint32 server_id, |
312 | rpl_gtid *gtid); |
313 | int alloc_element_nolock(const rpl_gtid *gtid); |
314 | bool check_strict_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no); |
315 | int bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no); |
316 | int write_to_iocache(IO_CACHE *dest); |
317 | int read_from_iocache(IO_CACHE *src); |
318 | uint32 count(); |
319 | int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); |
320 | int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); |
321 | bool append_pos(String *str); |
322 | bool append_state(String *str); |
323 | rpl_gtid *find_nolock(uint32 domain_id, uint32 server_id); |
324 | rpl_gtid *find(uint32 domain_id, uint32 server_id); |
325 | rpl_gtid *find_most_recent(uint32 domain_id); |
326 | const char* drop_domain(DYNAMIC_ARRAY *ids, Gtid_list_log_event *glev, char*); |
327 | }; |
328 | |
329 | |
330 | /* |
331 | Represent the GTID state that a slave connection to a master requests |
332 | the master to start sending binlog events from. |
333 | */ |
334 | struct slave_connection_state |
335 | { |
336 | struct entry { |
337 | rpl_gtid gtid; |
338 | uint32 flags; |
339 | }; |
340 | /* Bits for 'flags' */ |
341 | enum start_flags |
342 | { |
343 | START_OWN_SLAVE_POS= 0x1, |
344 | START_ON_EMPTY_DOMAIN= 0x2 |
345 | }; |
346 | |
347 | /* Mapping from domain_id to the entry with GTID requested for that domain. */ |
348 | HASH hash; |
349 | |
350 | /* Auxiliary buffer to sort gtid list. */ |
351 | DYNAMIC_ARRAY gtid_sort_array; |
352 | |
353 | slave_connection_state(); |
354 | ~slave_connection_state(); |
355 | |
356 | void reset() { my_hash_reset(&hash); } |
357 | int load(const char *slave_request, size_t len); |
358 | int load(const rpl_gtid *gtid_list, uint32 count); |
359 | int load(rpl_slave_state *state, rpl_gtid *, uint32 ); |
360 | rpl_gtid *find(uint32 domain_id); |
361 | entry *find_entry(uint32 domain_id); |
362 | int update(const rpl_gtid *in_gtid); |
363 | void remove(const rpl_gtid *gtid); |
364 | void remove_if_present(const rpl_gtid *in_gtid); |
365 | ulong count() const { return hash.records; } |
366 | int to_string(String *out_str); |
367 | int append_to_string(String *out_str); |
368 | int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); |
369 | bool is_pos_reached(); |
370 | }; |
371 | |
372 | |
373 | extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, |
374 | bool *first); |
375 | extern int gtid_check_rpl_slave_state_table(TABLE *table); |
376 | extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len, |
377 | uint32 *out_len); |
378 | |
379 | #endif /* RPL_GTID_H */ |
380 | |