#ifndef RPL_PARALLEL_H
#define RPL_PARALLEL_H

#include "log_event.h"


struct rpl_parallel;
struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;

class Relay_log_info;
struct inuse_relaylog;


/*
  Structure used to keep track of the parallel replication of a batch of
  event-groups that group-committed together on the master.

  It is used to ensure that every event group in one batch has reached the
  commit stage before the next batch starts executing.

  Note the lifetime of this structure:

   - It is allocated when the first event in a new batch of group commits
     is queued, from the free list rpl_parallel_entry::gco_free_list.

   - The gco for the batch currently being queued is owned by
     rpl_parallel_entry::current_gco. The gco for a previous batch that has
     been fully queued is owned by the gco->prev_gco pointer of the gco for
     the following batch.

   - The worker thread waits on gco->COND_group_commit_orderer for
     rpl_parallel_entry::count_committing_event_groups to reach wait_count
     before starting; the first waiter links the gco into the next_gco
     pointer of the gco of the previous batch for signalling.

   - When an event group reaches the commit stage, it signals the
     COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
     rpl_parallel_entry::count_committing_event_groups has reached
     gco->next_gco->wait_count.

   - The gco lives until all its event groups have completed their commit.
     This is detected by rpl_parallel_entry::last_committed_sub_id being
     greater than or equal to gco->last_sub_id. Once this happens, the gco
     is freed. Note that since updates of last_committed_sub_id can happen
     out-of-order, the thread that frees a given gco may be executing any
     later event group, not necessarily one from the gco being freed.
*/
struct group_commit_orderer {
  /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
  mysql_cond_t COND_group_commit_orderer;
  uint64 wait_count;
  group_commit_orderer *prev_gco;
  group_commit_orderer *next_gco;
  /*
    The sub_id of the last event group in the previous GCO.
    Only valid if prev_gco != NULL.
  */
  uint64 prior_sub_id;
  /*
    The sub_id of the last event group in this GCO. Only valid when next_gco
    is non-NULL.
  */
  uint64 last_sub_id;
  /*
    This flag is set when this GCO has been installed into the next_gco
    pointer of the previous GCO.
  */
  bool installed;

  enum force_switch_bits
  {
    /*
      This flag is set for a GCO in which we have event groups with multiple
      different commit_id values from the master. This happens when we
      optimistically try to execute in parallel transactions not known to be
      conflict-free.

      When this flag is set, in case of DDL we need to start a new GCO
      regardless of current commit_id, as DDL is not safe to
      speculatively apply in parallel with prior event groups.
    */
    MULTI_BATCH= 1,
    /*
      This flag is set for a GCO that contains DDL. If set, it forces
      a switch to a new GCO upon seeing a new commit_id, as DDL is not
      safe to speculatively replicate in parallel with subsequent
      transactions.
    */
    FORCE_SWITCH= 2
  };
  uint8 flags;
};
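
/*
  Illustrative sketch (not the actual implementation, which lives in
  rpl_parallel.cc) of how a worker might wait on a GCO before starting its
  batch, and how a committing event group wakes up the following batch. The
  helper names wait_for_gco_sketch()/signal_next_gco_sketch() are
  hypothetical; the members referenced are the real ones declared above.

    static void wait_for_gco_sketch(rpl_parallel_entry *e,
                                    group_commit_orderer *gco)
    {
      mysql_mutex_lock(&e->LOCK_parallel_entry);
      // Block until every event group of the previous batch has reached
      // the commit stage.
      while (e->count_committing_event_groups < gco->wait_count)
        mysql_cond_wait(&gco->COND_group_commit_orderer,
                        &e->LOCK_parallel_entry);
      mysql_mutex_unlock(&e->LOCK_parallel_entry);
    }

    static void signal_next_gco_sketch(rpl_parallel_entry *e,
                                       group_commit_orderer *gco)
    {
      mysql_mutex_lock(&e->LOCK_parallel_entry);
      ++e->count_committing_event_groups;
      // Wake the following batch once all of its predecessors have
      // started to commit.
      if (gco->next_gco &&
          e->count_committing_event_groups >= gco->next_gco->wait_count)
        mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer);
      mysql_mutex_unlock(&e->LOCK_parallel_entry);
    }
*/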


struct rpl_parallel_thread {
  bool delay_start;
  bool running;
  bool stop;
  bool pause_for_ftwrl;
  mysql_mutex_t LOCK_rpl_thread;
  mysql_cond_t COND_rpl_thread;
  mysql_cond_t COND_rpl_thread_queue;
  mysql_cond_t COND_rpl_thread_stop;
  struct rpl_parallel_thread *next;             /* For free list. */
  struct rpl_parallel_thread_pool *pool;
  THD *thd;
  /*
    Who owns the thread, if any (it's a pointer into the
    rpl_parallel_entry::rpl_threads array).
  */
  struct rpl_parallel_thread **current_owner;
  /* The rpl_parallel_entry of the owner. */
  rpl_parallel_entry *current_entry;
  struct queued_event {
    queued_event *next;
    /*
      queued_event can hold either an event to be executed, or just a binlog
      position to be updated without any associated event.
    */
    enum queued_event_t {
      QUEUED_EVENT,
      QUEUED_POS_UPDATE,
      QUEUED_MASTER_RESTART
    } typ;
    union {
      Log_event *ev;                            /* QUEUED_EVENT */
      rpl_parallel_entry *entry_for_queued;     /* QUEUED_POS_UPDATE and
                                                   QUEUED_MASTER_RESTART */
    };
    rpl_group_info *rgi;
    inuse_relaylog *ir;
    ulonglong future_event_relay_log_pos;
    char event_relay_log_name[FN_REFLEN];
    char future_event_master_log_name[FN_REFLEN];
    ulonglong event_relay_log_pos;
    my_off_t future_event_master_log_pos;
    size_t event_size;
  } *event_queue, *last_in_queue;
  uint64 queued_size;
  /* These free lists are protected by LOCK_rpl_thread. */
  queued_event *qev_free_list;
  rpl_group_info *rgi_free_list;
  group_commit_orderer *gco_free_list;
  /*
    These free lists are local to the thread, so need not be protected by any
    lock. They are moved to the global free lists in batches in the function
    batch_free(), to reduce LOCK_rpl_thread contention.

    The lists are not NULL-terminated (as we do not need to traverse them).
    Instead, if they are non-NULL, the loc_XXX_last_ptr_ptr points to the
    `next' pointer of the last element, which is used to link into the front
    of the global freelists.
  */
  queued_event *loc_qev_list, **loc_qev_last_ptr_ptr;
  size_t loc_qev_size;
  uint64 qev_free_pending;
  rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr;
  group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr;
  /* These keep track of batch update of inuse_relaylog refcounts. */
  inuse_relaylog *accumulated_ir_last;
  uint64 accumulated_ir_count;

  void enqueue(queued_event *qev)
  {
    if (last_in_queue)
      last_in_queue->next= qev;
    else
      event_queue= qev;
    last_in_queue= qev;
    queued_size+= qev->event_size;
  }

  void dequeue1(queued_event *list)
  {
    DBUG_ASSERT(list == event_queue);
    event_queue= last_in_queue= NULL;
  }

  void dequeue2(size_t dequeue_size)
  {
    queued_size-= dequeue_size;
  }

  queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
  queued_event *get_qev(Log_event *ev, ulonglong event_size,
                        Relay_log_info *rli);
  queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
                              const char *relay_log_name,
                              ulonglong event_pos, ulonglong event_size);
  /*
    Put a qev on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_qev(queued_event *qev);
  /*
    Release a qev immediately to the global free list. Requires holding the
    LOCK_rpl_thread mutex.
  */
  void free_qev(queued_event *qev);
  rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
                          rpl_parallel_entry *e, ulonglong event_size);
  /*
    Put an rgi on the local free list, to be later released to the global
    free list by batch_free().
  */
  void loc_free_rgi(rpl_group_info *rgi);
  /*
    Release an rgi immediately to the global free list. Requires holding the
    LOCK_rpl_thread mutex.
  */
  void free_rgi(rpl_group_info *rgi);
  group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev,
                                uint64 first_sub_id);
  /*
    Put a gco on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_gco(group_commit_orderer *gco);
  /*
    Move all local free lists to the global ones. Requires holding
    LOCK_rpl_thread.
  */
  void batch_free();
  /* Update inuse_relaylog refcounts with what we have accumulated so far. */
  void inuse_relaylog_refcount_update();
};
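
/*
  A minimal sketch of the free-list batching described above (the real code
  is batch_free() in rpl_parallel.cc; the helper name splice_loc_qev_sketch
  is hypothetical). The local list is linked into the front of the global
  free list via the loc_XXX_last_ptr_ptr pointer, so the whole batch is
  released with a single pointer update while LOCK_rpl_thread is held.

    static void splice_loc_qev_sketch(rpl_parallel_thread *rpt)
    {
      mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
      if (rpt->loc_qev_list)
      {
        // Terminate the local list by pointing its last `next' at the
        // current head of the global list, then make the local head the
        // new global head.
        *rpt->loc_qev_last_ptr_ptr= rpt->qev_free_list;
        rpt->qev_free_list= rpt->loc_qev_list;
        rpt->loc_qev_list= NULL;
      }
    }
*/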


struct rpl_parallel_thread_pool {
  struct rpl_parallel_thread **threads;
  struct rpl_parallel_thread *free_list;
  mysql_mutex_t LOCK_rpl_thread_pool;
  mysql_cond_t COND_rpl_thread_pool;
  uint32 count;
  bool inited;
  /*
    While FTWRL runs, this is set to make the SQL thread or STOP/START SLAVE
    not try to start new activity while that operation is in progress.
  */
  bool busy;

  rpl_parallel_thread_pool();
  int init(uint32 size);
  void destroy();
  struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
                                         rpl_parallel_entry *entry);
  void release_thread(rpl_parallel_thread *rpt);
};
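
/*
  A hedged sketch of how the FTWRL interaction fits together, based on the
  comments in this file: rpl_pause_for_ftwrl() marks the pool busy and sets
  rpl_parallel_entry::pause_sub_id so that event groups with larger sub_id
  wait, and rpl_unpause_after_ftwrl() lifts the pause again. The caller name
  below is hypothetical; only the two extern functions declared at the end
  of this file are real.

    static int run_ftwrl_sketch(THD *thd)
    {
      if (int err= rpl_pause_for_ftwrl(thd))
        return err;                   // Workers could not be paused.
      // ... perform the FLUSH TABLES WITH READ LOCK work here ...
      rpl_unpause_after_ftwrl(thd);   // Let paused event groups proceed.
      return 0;
    }
*/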


struct rpl_parallel_entry {
  mysql_mutex_t LOCK_parallel_entry;
  mysql_cond_t COND_parallel_entry;
  uint32 domain_id;
  /*
    Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
    that they are waiting, so that finish_event_group knows to signal them
    when last_committed_sub_id is increased.
  */
  uint32 need_sub_id_signal;
  uint64 last_commit_id;
  bool active;
  /*
    Set when the SQL thread is shutting down, and no more events can be
    processed, so worker threads must force abort any current transactions
    without waiting for event groups to complete.
  */
  bool force_abort;
  /*
    At STOP SLAVE (force_abort=true), we do not want to process all events in
    the queue (which could unnecessarily delay stop, if a lot of events happen
    to be queued). The stop_count provides a safe point at which to stop, so
    that everything before that point gets committed and nothing after it
    does. The value corresponds to group_commit_orderer::wait_count; if
    wait_count is less than or equal to stop_count, we execute the associated
    event group, else we skip it (and all following) and stop.
  */
  uint64 stop_count;

  /*
    Cyclic array recording the last rpl_thread_max worker threads that we
    queued events for. This is used to limit how many workers a single domain
    can occupy (--slave-domain-parallel-threads).

    Note that workers are never explicitly deleted from the array. Instead,
    we need to check (under LOCK_rpl_thread) that the thread still belongs
    to us before re-using (rpl_parallel_thread::current_owner).
  */
  rpl_parallel_thread **rpl_threads;
  uint32 rpl_thread_max;
  uint32 rpl_thread_idx;
  /*
    The sub_id of the last transaction to commit within this domain_id.
    Must be accessed under LOCK_parallel_entry protection.

    Event groups commit in order, so the rpl_group_info for an event group
    will be alive (at least) as long as
    rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
    safely refer back to previous event groups if they are still executing,
    and ignore them if they have completed, without requiring explicit
    synchronisation between the threads.
  */
  uint64 last_committed_sub_id;
  /*
    The sub_id of the last event group in this replication domain that was
    queued for execution by a worker thread.
  */
  uint64 current_sub_id;
  /*
    The largest sub_id that has started its transaction. Protected by
    LOCK_parallel_entry.

    (Transactions can start out-of-order, so this value signifies that no
    transactions with larger sub_id have started, but not necessarily that
    all transactions with smaller sub_id have started.)
  */
  uint64 largest_started_sub_id;
  rpl_group_info *current_group_info;
  /*
    If we get an error in some event group, we set the sub_id of that event
    group here. Then later event groups (with higher sub_id) can know not to
    try to start (event groups that already started will be rolled back when
    wait_for_prior_commit() returns error).
    The value is ULONGLONG_MAX when no error occurred.
  */
  uint64 stop_on_error_sub_id;
  /*
    During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
    this value must not start, but wait until the global read lock is
    released. The value is set to ULONGLONG_MAX when no FTWRL is pending.
  */
  uint64 pause_sub_id;
  /* Total count of event groups queued so far. */
  uint64 count_queued_event_groups;
  /*
    Count of event groups that have started (but not necessarily completed)
    the commit phase. We use this to know when every event group in a
    previous batch of master group commits has started committing on the
    slave, so that it is safe to start executing the events in the following
    batch.
  */
  uint64 count_committing_event_groups;
  /* The group_commit_orderer object for the events currently being queued. */
  group_commit_orderer *current_gco;

  rpl_parallel_thread *choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                     PSI_stage_info *old_stage, bool reuse);
  int queue_master_restart(rpl_group_info *rgi,
                           Format_description_log_event *fdev);
};
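
/*
  Illustrative sketch of the liveness rule described for
  last_committed_sub_id above; the helper name prior_rgi_still_live_sketch
  is hypothetical, and the real checks live in rpl_parallel.cc. Because
  event groups within one domain commit in sub_id order, a previous event
  group is guaranteed to still be live (its rpl_group_info not yet freed)
  at least as long as its gtid_sub_id is larger than last_committed_sub_id.

    static bool prior_rgi_still_live_sketch(rpl_parallel_entry *e,
                                            uint64 prior_sub_id)
    {
      mysql_mutex_lock(&e->LOCK_parallel_entry);
      bool live= (prior_sub_id > e->last_committed_sub_id);
      mysql_mutex_unlock(&e->LOCK_parallel_entry);
      return live;
    }
*/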


struct rpl_parallel {
  HASH domain_hash;
  rpl_parallel_entry *current;
  bool sql_thread_stopping;

  rpl_parallel();
  ~rpl_parallel();
  void reset();
  rpl_parallel_entry *find(uint32 domain_id);
  void wait_for_done(THD *thd, Relay_log_info *rli);
  void stop_during_until();
  bool workers_idle();
  int wait_for_workers_idle(THD *thd);
  int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size);
};


extern struct rpl_parallel_thread_pool global_rpl_thread_pool;


extern int rpl_parallel_resize_pool_if_no_slaves(void);
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
extern int rpl_pause_for_ftwrl(THD *thd);
extern void rpl_unpause_after_ftwrl(THD *thd);

#endif /* RPL_PARALLEL_H */