#ifndef RPL_PARALLEL_H
#define RPL_PARALLEL_H

#include "log_event.h"


struct rpl_parallel;
struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;

class Relay_log_info;
struct inuse_relaylog;


/*
  Structure used to keep track of the parallel replication of a batch of
  event-groups that group-committed together on the master.

  It is used to ensure that every event group in one batch has reached the
  commit stage before the next batch starts executing.

  Note the lifetime of this structure:

  - It is allocated when the first event in a new batch of group commits
    is queued, from the free list rpl_parallel_thread::gco_free_list.

  - The gco for the batch currently being queued is owned by
    rpl_parallel_entry::current_gco. The gco for a previous batch that has
    been fully queued is owned by the gco->prev_gco pointer of the gco for
    the following batch.

  - The worker thread waits on gco->COND_group_commit_orderer for
    rpl_parallel_entry::count_committing_event_groups to reach wait_count
    before starting; the first waiter links the gco into the next_gco
    pointer of the gco of the previous batch for signalling.

  - When an event group reaches the commit stage, it signals the
    COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
    rpl_parallel_entry::count_committing_event_groups has reached
    gco->next_gco->wait_count.

  - The gco lives until all its event groups have completed their commit.
    This is detected by rpl_parallel_entry::last_committed_sub_id being
    greater than or equal to gco->last_sub_id. Once this happens, the gco is
    freed. Note that since the update of last_committed_sub_id can happen
    out-of-order, the thread that frees a given gco can be executing any
    later event group, not necessarily an event group from the gco being
    freed.

  An illustrative sketch of this wait/signal protocol follows the struct
  definition below.
*/
struct group_commit_orderer {
  /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
  mysql_cond_t COND_group_commit_orderer;
  uint64 wait_count;
  group_commit_orderer *prev_gco;
  group_commit_orderer *next_gco;
  /*
    The sub_id of the last event group in the previous GCO.
    Only valid if prev_gco != NULL.
  */
  uint64 prior_sub_id;
  /*
    The sub_id of the last event group in this GCO. Only valid when next_gco
    is non-NULL.
  */
  uint64 last_sub_id;
  /*
    This flag is set when this GCO has been installed into the next_gco
    pointer of the previous GCO.
  */
  bool installed;

  enum force_switch_bits
  {
    /*
      This flag is set for a GCO in which we have event groups with multiple
      different commit_id values from the master. This happens when we
      optimistically try to execute in parallel transactions not known to be
      conflict-free.

      When this flag is set, in case of DDL we need to start a new GCO
      regardless of current commit_id, as DDL is not safe to
      speculatively apply in parallel with prior event groups.
    */
    MULTI_BATCH= 1,
    /*
      This flag is set for a GCO that contains DDL. If set, it forces
      a switch to a new GCO upon seeing a new commit_id, as DDL is not
      safe to speculatively replicate in parallel with subsequent
      transactions.
    */
    FORCE_SWITCH= 2
  };
  uint8 flags;
};
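
/*
  Illustrative sketch of the wait/signal protocol described above. This is
  not the actual implementation (that lives in rpl_parallel.cc); the local
  variable names (entry, gco) are only assumptions for the example, and
  details such as kill/stop handling are omitted.

  A worker waits for its GCO before starting its first event group:

    mysql_mutex_lock(&entry->LOCK_parallel_entry);
    while (entry->count_committing_event_groups < gco->wait_count)
      mysql_cond_wait(&gco->COND_group_commit_orderer,
                      &entry->LOCK_parallel_entry);
    mysql_mutex_unlock(&entry->LOCK_parallel_entry);

  An event group reaching the commit stage wakes up the following batch:

    mysql_mutex_lock(&entry->LOCK_parallel_entry);
    ++entry->count_committing_event_groups;
    if (gco->next_gco &&
        entry->count_committing_event_groups >= gco->next_gco->wait_count)
      mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer);
    mysql_mutex_unlock(&entry->LOCK_parallel_entry);
*/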


struct rpl_parallel_thread {
  bool delay_start;
  bool running;
  bool stop;
  bool pause_for_ftwrl;
  mysql_mutex_t LOCK_rpl_thread;
  mysql_cond_t COND_rpl_thread;
  mysql_cond_t COND_rpl_thread_queue;
  mysql_cond_t COND_rpl_thread_stop;
  struct rpl_parallel_thread *next;  /* For free list. */
  struct rpl_parallel_thread_pool *pool;
  THD *thd;
  /*
    Who owns the thread, if any (it's a pointer into the
    rpl_parallel_entry::rpl_threads array).
  */
  struct rpl_parallel_thread **current_owner;
  /* The rpl_parallel_entry of the owner. */
  rpl_parallel_entry *current_entry;
  struct queued_event {
    queued_event *next;
    /*
      queued_event can hold either an event to be executed, or just a binlog
      position to be updated without any associated event.
    */
    enum queued_event_t {
      QUEUED_EVENT,
      QUEUED_POS_UPDATE,
      QUEUED_MASTER_RESTART
    } typ;
    union {
      Log_event *ev;                          /* QUEUED_EVENT */
      rpl_parallel_entry *entry_for_queued;   /* QUEUED_POS_UPDATE and
                                                 QUEUED_MASTER_RESTART */
    };
    rpl_group_info *rgi;
    inuse_relaylog *ir;
    ulonglong future_event_relay_log_pos;
    char event_relay_log_name[FN_REFLEN];
    char future_event_master_log_name[FN_REFLEN];
    ulonglong event_relay_log_pos;
    my_off_t future_event_master_log_pos;
    size_t event_size;
  } *event_queue, *last_in_queue;
  uint64 queued_size;
  /* These free lists are protected by LOCK_rpl_thread. */
  queued_event *qev_free_list;
  rpl_group_info *rgi_free_list;
  group_commit_orderer *gco_free_list;
  /*
    These free lists are local to the thread, so need not be protected by any
    lock. They are moved to the global free lists in batches in the function
    batch_free(), to reduce LOCK_rpl_thread contention.

    The lists are not NULL-terminated (as we do not need to traverse them).
    Instead, if they are non-NULL, the loc_XXX_last_ptr_ptr points to the
    `next' pointer of the last element, which is used to link into the front
    of the global freelists. An illustrative usage sketch follows this struct
    definition.
  */
  queued_event *loc_qev_list, **loc_qev_last_ptr_ptr;
  size_t loc_qev_size;
  uint64 qev_free_pending;
  rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr;
  group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr;
  /* These keep track of batched updates of inuse_relaylog refcounts. */
  inuse_relaylog *accumulated_ir_last;
  uint64 accumulated_ir_count;

  void enqueue(queued_event *qev)
  {
    if (last_in_queue)
      last_in_queue->next= qev;
    else
      event_queue= qev;
    last_in_queue= qev;
    queued_size+= qev->event_size;
  }

  void dequeue1(queued_event *list)
  {
    DBUG_ASSERT(list == event_queue);
    event_queue= last_in_queue= NULL;
  }

  void dequeue2(size_t dequeue_size)
  {
    queued_size-= dequeue_size;
  }

  queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
  queued_event *get_qev(Log_event *ev, ulonglong event_size,
                        Relay_log_info *rli);
  queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
                              const char *relay_log_name,
                              ulonglong event_pos, ulonglong event_size);
  /*
    Put a qev on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_qev(queued_event *qev);
  /*
    Release a qev immediately to the global free list. Requires holding the
    LOCK_rpl_thread mutex.
  */
  void free_qev(queued_event *qev);
  rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
                          rpl_parallel_entry *e, ulonglong event_size);
  /*
    Put an rgi on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_rgi(rpl_group_info *rgi);
  /*
    Release an rgi immediately to the global free list. Requires holding the
    LOCK_rpl_thread mutex.
  */
  void free_rgi(rpl_group_info *rgi);
  group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev,
                                uint64 first_sub_id);
  /*
    Put a gco on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_gco(group_commit_orderer *gco);
  /*
    Move all local free lists to the global ones. Requires holding
    LOCK_rpl_thread.
  */
  void batch_free();
  /* Update inuse_relaylog refcounts with what we have accumulated so far. */
  void inuse_relaylog_refcount_update();
};
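
/*
  Illustrative sketch of the free-list batching described above (an
  assumption about typical use, not the actual code in rpl_parallel.cc; the
  variable names rpt and qev are made up for the example). A worker returns
  objects to its thread-local lists without locking, and later splices the
  accumulated batches into the shared lists while it holds LOCK_rpl_thread
  anyway, so the mutex is not taken once per freed object:

    rpt->loc_free_qev(qev);            // thread-local list, no lock needed

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    rpt->batch_free();                 // splice loc_* lists into the global
                                       // free lists in one operation
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
*/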


struct rpl_parallel_thread_pool {
  struct rpl_parallel_thread **threads;
  struct rpl_parallel_thread *free_list;
  mysql_mutex_t LOCK_rpl_thread_pool;
  mysql_cond_t COND_rpl_thread_pool;
  uint32 count;
  bool inited;
  /*
    While FTWRL runs, this flag is set to make the SQL driver thread or
    STOP/START SLAVE not try to start new activity while that operation
    is in progress.
  */
  bool busy;

  rpl_parallel_thread_pool();
  int init(uint32 size);
  void destroy();
  struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
                                         rpl_parallel_entry *entry);
  void release_thread(rpl_parallel_thread *rpt);
};
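
/*
  Illustrative sketch (an assumption, not a definitive description): the pool
  is a process-wide singleton, declared below as global_rpl_thread_pool. It
  is sized from --slave-parallel-threads and activated when a parallel slave
  starts, roughly like:

    if (rpl_parallel_activate_pool(&global_rpl_thread_pool))
      return 1;                        // worker threads could not be started
*/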


struct rpl_parallel_entry {
  mysql_mutex_t LOCK_parallel_entry;
  mysql_cond_t COND_parallel_entry;
  uint32 domain_id;
  /*
    Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
    that they are waiting, so that finish_event_group knows to signal them
    when last_committed_sub_id is increased.
  */
  uint32 need_sub_id_signal;
  uint64 last_commit_id;
  bool active;
  /*
    Set when the SQL thread is shutting down and no more events can be
    processed, so worker threads must force-abort any current transactions
    without waiting for event groups to complete.
  */
  bool force_abort;
  /*
    At STOP SLAVE (force_abort=true), we do not want to process all events in
    the queue (which could unnecessarily delay stop, if a lot of events happen
    to be queued). The stop_count provides a safe point at which to stop, so
    that everything before becomes committed and nothing after does. The value
    corresponds to group_commit_orderer::wait_count; if wait_count is less than
    or equal to stop_count, we execute the associated event group, else we
    skip it (and all following) and stop.
  */
  uint64 stop_count;

  /*
    Cyclic array recording the last rpl_thread_max worker threads that we
    queued events for. This is used to limit how many workers a single domain
    can occupy (--slave-domain-parallel-threads).

    Note that workers are never explicitly deleted from the array. Instead,
    we need to check (under LOCK_rpl_thread) that the thread still belongs
    to us before re-using it (see rpl_parallel_thread::current_owner).
  */
  rpl_parallel_thread **rpl_threads;
  uint32 rpl_thread_max;
  uint32 rpl_thread_idx;
  /*
    The sub_id of the last transaction to commit within this domain_id.
    Must be accessed under LOCK_parallel_entry protection.

    Event groups commit in order, so the rpl_group_info for an event group
    will be alive (at least) as long as
    rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
    safely refer back to previous event groups if they are still executing,
    and ignore them if they have completed, without requiring explicit
    synchronisation between the threads. An illustrative sketch of this check
    follows the struct definition below.
  */
  uint64 last_committed_sub_id;
  /*
    The sub_id of the last event group in this replication domain that was
    queued for execution by a worker thread.
  */
  uint64 current_sub_id;
  /*
    The largest sub_id that has started its transaction. Protected by
    LOCK_parallel_entry.

    (Transactions can start out-of-order, so this value signifies that no
    transactions with larger sub_id have started, but not necessarily that all
    transactions with smaller sub_id have started).
  */
  uint64 largest_started_sub_id;
  rpl_group_info *current_group_info;
  /*
    If we get an error in some event group, we set the sub_id of that event
    group here. Then later event groups (with higher sub_id) can know not to
    try to start (event groups that already started will be rolled back when
    wait_for_prior_commit() returns an error).
    The value is ULONGLONG_MAX when no error occurred.
  */
  uint64 stop_on_error_sub_id;
  /*
    During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
    this value must not start, but wait until the global read lock is released.
    The value is set to ULONGLONG_MAX when no FTWRL is pending.
  */
  uint64 pause_sub_id;
  /* Total count of event groups queued so far. */
  uint64 count_queued_event_groups;
  /*
    Count of event groups that have started (but not necessarily completed)
    the commit phase. We use this to know when every event group in a previous
    batch of master group commits has started committing on the slave, so
    that it is safe to start executing the events in the following batch.
  */
  uint64 count_committing_event_groups;
  /* The group_commit_orderer object for the events currently being queued. */
  group_commit_orderer *current_gco;

  rpl_parallel_thread *choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                     PSI_stage_info *old_stage, bool reuse);
  int queue_master_restart(rpl_group_info *rgi,
                           Format_description_log_event *fdev);
};
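
/*
  Illustrative sketch of the sub_id comparison described for
  last_committed_sub_id above (not the actual implementation; the variable
  names entry and prior_rgi are assumptions for the example):

    mysql_mutex_lock(&entry->LOCK_parallel_entry);
    if (prior_rgi->gtid_sub_id > entry->last_committed_sub_id)
    {
      // The earlier event group has not committed yet, so its
      // rpl_group_info is still alive and can safely be referenced.
    }
    else
    {
      // The earlier event group has already committed; it must not be
      // touched, as its rpl_group_info may have been freed.
    }
    mysql_mutex_unlock(&entry->LOCK_parallel_entry);
*/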


struct rpl_parallel {
  HASH domain_hash;
  rpl_parallel_entry *current;
  bool sql_thread_stopping;

  rpl_parallel();
  ~rpl_parallel();
  void reset();
  rpl_parallel_entry *find(uint32 domain_id);
  void wait_for_done(THD *thd, Relay_log_info *rli);
  void stop_during_until();
  bool workers_idle();
  int wait_for_workers_idle(THD *thd);
  int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size);
};
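
/*
  Illustrative sketch (an assumption about the caller, which is the SQL
  driver thread in slave.cc): each event read from the relay log is offered
  to do_event(); depending on its return value the event is either queued to
  a worker thread or falls back to being applied serially by the driver
  thread. The exact return-value contract is documented with the
  implementation in rpl_parallel.cc and is not restated here.

    int res= rli->parallel.do_event(serial_rgi, ev, event_size);
*/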


extern struct rpl_parallel_thread_pool global_rpl_thread_pool;


extern int rpl_parallel_resize_pool_if_no_slaves(void);
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
extern int rpl_pause_for_ftwrl(THD *thd);
extern void rpl_unpause_after_ftwrl(THD *thd);

#endif  /* RPL_PARALLEL_H */