/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifndef RPL_GTID_H
#define RPL_GTID_H

#include "hash.h"
#include "queues.h"


/* Definitions for MariaDB global transaction ID (GTID). */


extern const LEX_CSTRING rpl_gtid_slave_state_table_name;

class String;

struct rpl_gtid
{
  uint32 domain_id;
  uint32 server_id;
  uint64 seq_no;
};

inline bool operator==(const rpl_gtid& lhs, const rpl_gtid& rhs)
{
  return
    lhs.domain_id == rhs.domain_id &&
    lhs.server_id == rhs.server_id &&
    lhs.seq_no == rhs.seq_no;
}

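/*
  For illustration: the canonical text form of a GTID is
  "domain_id-server_id-seq_no", e.g. "0-1-100". With the equality operator
  above, two GTIDs compare equal only when all three components match:

    rpl_gtid a= { 0, 1, 100 };
    rpl_gtid b= { 0, 1, 100 };
    bool same= (a == b);    // true; any differing component makes it false
*/
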
enum enum_gtid_skip_type {
  GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
};


/*
  Structure to keep track of threads waiting in MASTER_GTID_WAIT().

  Since replication is (mostly) single-threaded, we want to minimise the
  performance impact on it from MASTER_GTID_WAIT(). To achieve this, we are
  careful to keep the common lock between replication threads and
  MASTER_GTID_WAIT threads held for as short a time as possible. We keep only
  a single thread waiting to be notified by the replication threads; this
  thread then handles all the (potentially heavy) lifting of dealing with
  all current waiting threads.
*/
struct gtid_waiting {
  /* Elements in the hash, basically a priority queue for each domain. */
  struct hash_element {
    QUEUE queue;
    uint32 domain_id;
  };
  /* A priority queue to handle waiters in one domain in seq_no order. */
  struct queue_element {
    uint64 wait_seq_no;
    THD *thd;
    int queue_idx;
    /*
      do_small_wait is true if we have responsibility for ensuring that there
      is a small waiter.
    */
    bool do_small_wait;
    /*
      The flag `done' is set when the wait is completed (either due to reaching
      the position waited for, or due to timeout or kill). The queue_element
      is in the queue if and only if `done' is false.
    */
    bool done;
  };

  mysql_mutex_t LOCK_gtid_waiting;
  HASH hash;

  void init();
  void destroy();
  hash_element *get_entry(uint32 domain_id);
  int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
  void promote_new_waiter(gtid_waiting::hash_element *he);
  int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
  void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
  int register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, hash_element *he,
                             queue_element *elem);
  void remove_from_wait_queue(hash_element *he, queue_element *elem);
};
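
/*
  For illustration only (standard C++, not the mysys QUEUE/HASH used above):
  conceptually, each domain keeps its waiters ordered by the seq_no they are
  waiting for, so the waiter with the smallest wait_seq_no can be picked as
  the next "small waiter" when the current one finishes:

    #include <queue>
    #include <vector>

    struct waiter { unsigned long long wait_seq_no; };
    struct later_seq_no {
      bool operator()(const waiter &a, const waiter &b) const
      { return a.wait_seq_no > b.wait_seq_no; }   // min-heap on wait_seq_no
    };
    std::priority_queue<waiter, std::vector<waiter>, later_seq_no> waiters;
    // waiters.top() is the waiter to promote for the next small wait.
*/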


class Relay_log_info;
struct rpl_group_info;
class Gtid_list_log_event;

/*
  Replication slave state.

  For every independent replication stream (identified by domain_id), this
  remembers the last GTID applied on the slave within this domain.

  Since events are always committed in-order within a single domain, this is
  sufficient to maintain the state of the replication slave.
*/
struct rpl_slave_state
{
  /* Elements in the list of GTIDs kept for each domain_id. */
  struct list_element
  {
    struct list_element *next;
    uint64 sub_id;
    uint64 seq_no;
    uint32 server_id;
    /*
      hton of the mysql.gtid_slave_pos* table used to record this GTID.
      Can be NULL if the gtid table failed to load (e.g. missing
      mysql.gtid_slave_pos table following an upgrade).
    */
    void *hton;
  };

  /* Elements in the HASH that hold the state for one domain_id. */
  struct element
  {
    struct list_element *list;
    uint32 domain_id;
    /* Highest seq_no seen so far in this domain. */
    uint64 highest_seq_no;
    /*
      If this is non-NULL, then it is the waiter responsible for the small
      wait in MASTER_GTID_WAIT().
    */
    gtid_waiting::queue_element *gtid_waiter;
    /*
      If gtid_waiter is non-NULL, then this is the seq_no that its
      MASTER_GTID_WAIT() is waiting on. When we reach this seq_no, we need to
      signal the waiter on COND_wait_gtid.
    */
    uint64 min_wait_seq_no;
    mysql_cond_t COND_wait_gtid;

    /*
      For --gtid-ignore-duplicates. The Relay_log_info that currently owns
      this domain, and the number of worker threads that are active in it.

      The idea is that only one of multiple master connections is allowed to
      actively apply events for a given domain. Other connections must either
      discard the events (if the seq_no in the GTID shows that they have
      already been applied), or wait to see if the current owner will apply
      them.
    */
    const Relay_log_info *owner_rli;
    uint32 owner_count;
    mysql_cond_t COND_gtid_ignore_duplicates;

    list_element *grab_list() { list_element *l= list; list= NULL; return l; }
    void add(list_element *l)
    {
      l->next= list;
      list= l;
    }
  };

  /* Descriptor for the mysql.gtid_slave_posXXX table in a specific engine. */
  enum gtid_pos_table_state {
    GTID_POS_AUTO_CREATE,
    GTID_POS_CREATE_REQUESTED,
    GTID_POS_CREATE_IN_PROGRESS,
    GTID_POS_AVAILABLE
  };
  struct gtid_pos_table {
    struct gtid_pos_table *next;
    /*
      Use a void * here, rather than handlerton *, to make explicit that we
      are not using the value to access any functionality in the engine. It
      is just used as an opaque value to identify which engine we are using
      for each GTID row.
    */
    void *table_hton;
    LEX_CSTRING table_name;
    uint8 state;
  };

  /* Mapping from domain_id to its element. */
  HASH hash;
  /* Mutex protecting access to the state. */
  mysql_mutex_t LOCK_slave_state;
  /* Auxiliary buffer to sort gtid list. */
  DYNAMIC_ARRAY gtid_sort_array;

  uint64 last_sub_id;
  /*
    List of tables available for durably storing the slave GTID position.

    Accesses to this list are protected by LOCK_slave_state. However, for
    efficiency, there is also a provision for read access to it from a running
    slave without taking the lock.

    An element can be added at the head of the list by storing the new
    gtid_pos_tables pointer atomically with release semantics, to ensure that
    the next pointer of the new element is visible to readers of the new list.
    Other changes (like deleting or replacing elements) must happen only while
    all SQL driver threads are stopped. LOCK_slave_state must be held in any
    case.

    The list can be read without a lock by an SQL driver thread or worker
    thread by reading the gtid_pos_tables pointer atomically with acquire
    semantics, to ensure that it will see the correct next pointer of a new
    head element. (An illustrative sketch of this publication pattern, using
    standard C++ atomics, follows this struct.)

    The type is struct gtid_pos_table *, but needs to be void * to allow using
    my_atomic operations without violating C strict aliasing semantics.
  */
  void * volatile gtid_pos_tables;
  /* The default entry in gtid_pos_tables, mysql.gtid_slave_pos. */
  void * volatile default_gtid_pos_table;
  bool loaded;

  rpl_slave_state();
  ~rpl_slave_state();

  void truncate_hash();
  ulong count() const { return hash.records; }
  int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
             uint64 seq_no, void *hton, rpl_group_info *rgi);
  int truncate_state_table(THD *thd);
  void select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename);
  int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                  bool in_transaction, bool in_statement, void **out_hton);
  uint64 next_sub_id(uint32 domain_id);
  int iterate(int (*cb)(rpl_gtid *, void *), void *data,
              rpl_gtid *extra_gtids, uint32 num_extra,
              bool sort);
  int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
  bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid);
  int load(THD *thd, const char *state_from_master, size_t len, bool reset,
           bool in_statement);
  bool is_empty();

  element *get_element(uint32 domain_id);
  int put_back_list(uint32 domain_id, list_element *list);

  void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
                         rpl_group_info *rgi);
  int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
  int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
  void release_domain_owner(rpl_group_info *rgi);
  void set_gtid_pos_tables_list(gtid_pos_table *new_list,
                                gtid_pos_table *default_entry);
  void add_gtid_pos_table(gtid_pos_table *entry);
  struct gtid_pos_table *alloc_gtid_pos_table(LEX_CSTRING *table_name,
      void *hton, rpl_slave_state::gtid_pos_table_state state);
  void free_gtid_pos_tables(struct gtid_pos_table *list);
};
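
/*
  For illustration only (standard C++ atomics, not the my_atomic calls used by
  rpl_slave_state): the release/acquire publication pattern described for
  gtid_pos_tables above. The writer fully initialises a node and publishes it
  with a release store; lock-free readers load the head with acquire
  semantics, which guarantees they observe the new node's next pointer and
  contents:

    #include <atomic>

    struct node { node *next; int payload; };
    std::atomic<node *> list_head{nullptr};

    void publish_front(node *n)          // writer, serialised by a lock
    {
      n->next= list_head.load(std::memory_order_relaxed);
      list_head.store(n, std::memory_order_release);    // publish
    }

    const node *peek_head()              // reader, no lock needed
    {
      return list_head.load(std::memory_order_acquire);
    }
*/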


/*
  Binlog state.
  This keeps the last GTID written to the binlog for every distinct
  (domain_id, server_id) pair.
  This will be logged at the start of the next binlog file as a
  Gtid_list_log_event; this way, it is easy to find the binlog file
  containing a given GTID, by simply scanning backwards from the newest
  one until a lower seq_no is found in the Gtid_list_log_event at the
  start of a binlog for the given domain_id and server_id.

  We also remember the last logged GTID for every domain_id. This is used
  to know where to start when a master is changed to a slave. As a side
  effect, it also allows us to skip a hash lookup in the very common case of
  logging a new GTID with the same server_id as the last GTID.
*/
struct rpl_binlog_state
{
  struct element {
    uint32 domain_id;
    HASH hash;                /* Containing all server_id for one domain_id */
    /* The most recent entry in the hash. */
    rpl_gtid *last_gtid;
    /* Counter to allocate next seq_no for this domain. */
    uint64 seq_no_counter;

    int update_element(const rpl_gtid *gtid);
  };
  /* Mapping from domain_id to collection of elements. */
  HASH hash;
  /* Mutex protecting access to the state. */
  mysql_mutex_t LOCK_binlog_state;
  my_bool initialized;

  /* Auxiliary buffer to sort gtid list. */
  DYNAMIC_ARRAY gtid_sort_array;

  rpl_binlog_state() :initialized(0) {}
  ~rpl_binlog_state();

  void init();
  void reset_nolock();
  void reset();
  void free();
  bool load(struct rpl_gtid *list, uint32 count);
  bool load(rpl_slave_state *slave_pos);
  int update_nolock(const struct rpl_gtid *gtid, bool strict);
  int update(const struct rpl_gtid *gtid, bool strict);
  int update_with_next_gtid(uint32 domain_id, uint32 server_id,
                            rpl_gtid *gtid);
  int alloc_element_nolock(const rpl_gtid *gtid);
  bool check_strict_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no);
  int bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no);
  int write_to_iocache(IO_CACHE *dest);
  int read_from_iocache(IO_CACHE *src);
  uint32 count();
  int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
  int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
  bool append_pos(String *str);
  bool append_state(String *str);
  rpl_gtid *find_nolock(uint32 domain_id, uint32 server_id);
  rpl_gtid *find(uint32 domain_id, uint32 server_id);
  rpl_gtid *find_most_recent(uint32 domain_id);
  const char* drop_domain(DYNAMIC_ARRAY *ids, Gtid_list_log_event *glev, char*);
};
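
/*
  For illustration only (hypothetical data, not a binlog API): the backward
  scan described in the comment above rpl_binlog_state. Scanning from the
  newest binlog file towards the oldest, the first file whose starting
  Gtid_list_log_event records a seq_no lower than the wanted one, for the
  wanted (domain_id, server_id), is where reading should start:

    #include <string>
    #include <vector>

    struct binlog_entry
    {
      std::string file_name;
      unsigned long long start_seq_no;  // what the Gtid_list_log_event at the
                                        // start of this file records for the
                                        // wanted (domain_id, server_id)
    };

    // 'logs' is ordered oldest first, newest last.
    const std::string *find_start_file(const std::vector<binlog_entry> &logs,
                                       unsigned long long wanted_seq_no)
    {
      for (size_t i= logs.size(); i-- > 0; )
        if (logs[i].start_seq_no < wanted_seq_no)
          return &logs[i].file_name;
      return nullptr;   // the wanted GTID precedes every retained binlog file
    }
*/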


/*
  Represents the GTID state that a slave connection to a master requests
  the master to start sending binlog events from.
*/
struct slave_connection_state
{
  struct entry {
    rpl_gtid gtid;
    uint32 flags;
  };
  /* Bits for 'flags' */
  enum start_flags
  {
    START_OWN_SLAVE_POS= 0x1,
    START_ON_EMPTY_DOMAIN= 0x2
  };

  /* Mapping from domain_id to the entry with GTID requested for that domain. */
  HASH hash;

  /* Auxiliary buffer to sort gtid list. */
  DYNAMIC_ARRAY gtid_sort_array;

  slave_connection_state();
  ~slave_connection_state();

  void reset() { my_hash_reset(&hash); }
  int load(const char *slave_request, size_t len);
  int load(const rpl_gtid *gtid_list, uint32 count);
  int load(rpl_slave_state *state, rpl_gtid *extra_gtids, uint32 num_extra);
  rpl_gtid *find(uint32 domain_id);
  entry *find_entry(uint32 domain_id);
  int update(const rpl_gtid *in_gtid);
  void remove(const rpl_gtid *gtid);
  void remove_if_present(const rpl_gtid *in_gtid);
  ulong count() const { return hash.records; }
  int to_string(String *out_str);
  int append_to_string(String *out_str);
  int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
  bool is_pos_reached();
};
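
/*
  For illustration: the string accepted by load(const char *, size_t) is a
  GTID position in the usual MariaDB text form, with at most one GTID per
  replication domain, e.g.

      "0-1-100,1-2-577"

  which asks the master to start sending events that come after seq_no 100
  in domain 0 and after seq_no 577 in domain 1.
*/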


extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
                                            bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table);
extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len,
                                           uint32 *out_len);

#endif /* RPL_GTID_H */