#ifndef RPL_PARALLEL_H
#define RPL_PARALLEL_H

#include "log_event.h"


struct rpl_parallel;
struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;

class Relay_log_info;
struct inuse_relaylog;


/*
  Structure used to keep track of the parallel replication of a batch of
  event-groups that group-committed together on the master.

  It is used to ensure that every event group in one batch has reached the
  commit stage before the next batch starts executing.

  Note the lifetime of this structure:

   - It is allocated when the first event in a new batch of group commits
     is queued, from the free list rpl_parallel_entry::gco_free_list.

   - The gco for the batch currently being queued is owned by
     rpl_parallel_entry::current_gco. The gco for a previous batch that has
     been fully queued is owned by the gco->prev_gco pointer of the gco for
     the following batch.

   - The worker thread waits on gco->COND_group_commit_orderer for
     rpl_parallel_entry::count_committing_event_groups to reach wait_count
     before starting; the first waiter links the gco into the next_gco
     pointer of the gco of the previous batch for signalling.

   - When an event group reaches the commit stage, it signals the
     COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
     rpl_parallel_entry::count_committing_event_groups has reached
     gco->next_gco->wait_count.

   - The gco lives until all its event groups have completed their commit.
     This is detected by rpl_parallel_entry::last_committed_sub_id being
     greater than or equal to gco->last_sub_id. Once this happens, the gco is
     freed. Note that since updates of last_committed_sub_id can happen
     out-of-order, the thread that frees a given gco can be handling any
     later event group, not necessarily an event group from the gco being
     freed.
*/
struct group_commit_orderer {
  /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
  mysql_cond_t COND_group_commit_orderer;
  uint64 wait_count;
  group_commit_orderer *prev_gco;
  group_commit_orderer *next_gco;
  /*
    The sub_id of the last event group in the previous GCO.
    Only valid if prev_gco != NULL.
  */
  uint64 prior_sub_id;
  /*
    The sub_id of the last event group in this GCO. Only valid when next_gco
    is non-NULL.
  */
  uint64 last_sub_id;
  /*
    This flag is set when this GCO has been installed into the next_gco pointer
    of the previous GCO.
  */
  bool installed;

  enum force_switch_bits
  {
    /*
      This flag is set for a GCO in which we have event groups with multiple
      different commit_id values from the master. This happens when we
      optimistically try to execute in parallel transactions not known to be
      conflict-free.

      When this flag is set, in case of DDL we need to start a new GCO
      regardless of current commit_id, as DDL is not safe to
      speculatively apply in parallel with prior event groups.
    */
    MULTI_BATCH= 1,
    /*
      This flag is set for a GCO that contains DDL. If set, it forces
      a switch to a new GCO upon seeing a new commit_id, as DDL is not
      safe to speculatively replicate in parallel with subsequent
      transactions.
    */
    FORCE_SWITCH= 2
  };
  uint8 flags;
};
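
/*
  Illustrative sketch (not part of this header) of the wait/signal protocol
  described above. The real logic lives in rpl_parallel.cc; the helper names
  wait_for_gco_start() and signal_gco_commit() are hypothetical, but the
  fields they touch are the ones declared in this file.

    // Worker side: before starting the first event group of this GCO, wait
    // until every event group of the previous batch has reached the commit
    // stage (count_committing_event_groups >= wait_count).
    static void wait_for_gco_start(rpl_parallel_entry *e,
                                   group_commit_orderer *gco)
    {
      mysql_mutex_lock(&e->LOCK_parallel_entry);
      if (e->count_committing_event_groups < gco->wait_count)
      {
        if (!gco->installed && gco->prev_gco)
        {
          // The first waiter links this GCO after the previous one, so that
          // committing event groups know whom to signal.
          gco->prev_gco->next_gco= gco;
          gco->installed= true;
        }
        do
        {
          mysql_cond_wait(&gco->COND_group_commit_orderer,
                          &e->LOCK_parallel_entry);
        } while (e->count_committing_event_groups < gco->wait_count);
      }
      mysql_mutex_unlock(&e->LOCK_parallel_entry);
    }

    // Commit side: when an event group reaches the commit stage, wake up
    // waiters on the following GCO once its wait_count has been reached.
    static void signal_gco_commit(rpl_parallel_entry *e,
                                  group_commit_orderer *gco)
    {
      mysql_mutex_lock(&e->LOCK_parallel_entry);
      ++e->count_committing_event_groups;
      if (gco->next_gco &&
          e->count_committing_event_groups >= gco->next_gco->wait_count)
        mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer);
      mysql_mutex_unlock(&e->LOCK_parallel_entry);
    }
*/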


struct rpl_parallel_thread {
  bool delay_start;
  bool running;
  bool stop;
  bool pause_for_ftwrl;
  mysql_mutex_t LOCK_rpl_thread;
  mysql_cond_t COND_rpl_thread;
  mysql_cond_t COND_rpl_thread_queue;
  mysql_cond_t COND_rpl_thread_stop;
  struct rpl_parallel_thread *next;             /* For free list. */
  struct rpl_parallel_thread_pool *pool;
  THD *thd;
  /*
    Who owns the thread, if any (it is a pointer into the
    rpl_parallel_entry::rpl_threads array).
  */
  struct rpl_parallel_thread **current_owner;
  /* The rpl_parallel_entry of the owner. */
  rpl_parallel_entry *current_entry;
  struct queued_event {
    queued_event *next;
    /*
      queued_event can hold either an event to be executed, or just a binlog
      position to be updated without any associated event.
    */
    enum queued_event_t {
      QUEUED_EVENT,
      QUEUED_POS_UPDATE,
      QUEUED_MASTER_RESTART
    } typ;
    union {
      Log_event *ev;                        /* QUEUED_EVENT */
      rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE and
                                               QUEUED_MASTER_RESTART */
    };
    rpl_group_info *rgi;
    inuse_relaylog *ir;
    ulonglong future_event_relay_log_pos;
    char event_relay_log_name[FN_REFLEN];
    char future_event_master_log_name[FN_REFLEN];
    ulonglong event_relay_log_pos;
    my_off_t future_event_master_log_pos;
    size_t event_size;
  } *event_queue, *last_in_queue;
  uint64 queued_size;
  /* These free lists are protected by LOCK_rpl_thread. */
  queued_event *qev_free_list;
  rpl_group_info *rgi_free_list;
  group_commit_orderer *gco_free_list;
  /*
    These free lists are local to the thread, so need not be protected by any
    lock. They are moved to the global free lists in batches in the function
    batch_free(), to reduce LOCK_rpl_thread contention; see the sketch that
    follows this struct.

    The lists are not NULL-terminated (as we do not need to traverse them).
    Instead, if they are non-NULL, the loc_XXX_last_ptr_ptr points to the
    `next' pointer of the last element, which is used to link into the front
    of the global freelists.
  */
  queued_event *loc_qev_list, **loc_qev_last_ptr_ptr;
  size_t loc_qev_size;
  uint64 qev_free_pending;
  rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr;
  group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr;
  /* These keep track of batch update of inuse_relaylog refcounts. */
  inuse_relaylog *accumulated_ir_last;
  uint64 accumulated_ir_count;

  void enqueue(queued_event *qev)
  {
    if (last_in_queue)
      last_in_queue->next= qev;
    else
      event_queue= qev;
    last_in_queue= qev;
    queued_size+= qev->event_size;
  }

  void dequeue1(queued_event *list)
  {
    DBUG_ASSERT(list == event_queue);
    event_queue= last_in_queue= NULL;
  }

  void dequeue2(size_t dequeue_size)
  {
    queued_size-= dequeue_size;
  }
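
  /*
    Illustrative sketch (not part of this header) of how these helpers fit
    together; the actual worker loop is in rpl_parallel.cc and differs in
    detail. The variable names below are hypothetical.

      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
      queued_event *events= rpt->event_queue;
      rpt->dequeue1(events);               // Take ownership of the whole queue.
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);

      size_t processed_size= 0;
      for (queued_event *qev= events; qev; qev= qev->next)
        processed_size+= qev->event_size;  // ... apply each event here ...

      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
      rpt->dequeue2(processed_size);       // Drop the consumed bytes from
                                           // queued_size, possibly allowing
                                           // more events to be queued.
      mysql_cond_signal(&rpt->COND_rpl_thread_queue);
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
  */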

  queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
  queued_event *get_qev(Log_event *ev, ulonglong event_size,
                        Relay_log_info *rli);
  queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
                              const char *relay_log_name,
                              ulonglong event_pos, ulonglong event_size);
  /*
    Put a qev on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_qev(queued_event *qev);
  /*
    Release a qev immediately to the global free list. Requires holding the
    LOCK_rpl_thread mutex.
  */
  void free_qev(queued_event *qev);
  rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
                          rpl_parallel_entry *e, ulonglong event_size);
  /*
    Put an rgi on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_rgi(rpl_group_info *rgi);
  /*
    Release an rgi immediately to the global free list. Requires holding the
    LOCK_rpl_thread mutex.
  */
  void free_rgi(rpl_group_info *rgi);
  group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev,
                                uint64 first_sub_id);
  /*
    Put a gco on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_gco(group_commit_orderer *gco);
  /*
    Move all local free lists to the global ones. Requires holding
    LOCK_rpl_thread.
  */
  void batch_free();
  /* Update inuse_relaylog refcounts with what we have accumulated so far. */
  void inuse_relaylog_refcount_update();
};
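
/*
  Illustrative sketch (not part of this header) of the splice that
  batch_free() performs for the local free lists described above. The real
  implementation is in rpl_parallel.cc; this only shows how a local list,
  whose last element's `next' pointer is reachable through
  loc_qev_last_ptr_ptr, is linked onto the front of the global list in O(1)
  while holding LOCK_rpl_thread.

    static void splice_loc_qev_list(rpl_parallel_thread *rpt)
    {
      mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
      if (rpt->loc_qev_list)
      {
        // Point the last local element at the current global head, then
        // make the local head the new global head.
        *rpt->loc_qev_last_ptr_ptr= rpt->qev_free_list;
        rpt->qev_free_list= rpt->loc_qev_list;
        rpt->loc_qev_list= NULL;
        rpt->loc_qev_size= 0;
      }
    }
*/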


struct rpl_parallel_thread_pool {
  struct rpl_parallel_thread **threads;
  struct rpl_parallel_thread *free_list;
  mysql_mutex_t LOCK_rpl_thread_pool;
  mysql_cond_t COND_rpl_thread_pool;
  uint32 count;
  bool inited;
  /*
    While FTWRL runs, this flag is set so that the SQL thread and
    STOP/START SLAVE do not try to start new activity while that operation
    is in progress.
  */
  bool busy;

  rpl_parallel_thread_pool();
  int init(uint32 size);
  void destroy();
  struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
                                         rpl_parallel_entry *entry);
  void release_thread(rpl_parallel_thread *rpt);
};


struct rpl_parallel_entry {
  mysql_mutex_t LOCK_parallel_entry;
  mysql_cond_t COND_parallel_entry;
  uint32 domain_id;
  /*
    Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
    that they are waiting, so that finish_event_group knows to signal them
    when last_committed_sub_id is increased.
  */
  uint32 need_sub_id_signal;
  uint64 last_commit_id;
  bool active;
  /*
    Set when SQL thread is shutting down, and no more events can be processed,
    so worker threads must force abort any current transactions without
    waiting for event groups to complete.
  */
  bool force_abort;
  /*
    At STOP SLAVE (force_abort=true), we do not want to process all events in
    the queue (which could unnecessarily delay the stop, if many events happen
    to be queued). The stop_count provides a safe point at which to stop, so
    that everything before it gets committed and nothing after it does. The
    value corresponds to group_commit_orderer::wait_count; if wait_count is
    less than or equal to stop_count, we execute the associated event group,
    else we skip it (and all following) and stop.
  */
  uint64 stop_count;

  /*
    Cyclic array recording the last rpl_thread_max worker threads that we
    queued events for. This is used to limit how many workers a single domain
    can occupy (--slave-domain-parallel-threads).

    Note that workers are never explicitly deleted from the array. Instead,
    we need to check (under LOCK_rpl_thread) that the thread still belongs
    to us before re-using it, by looking at rpl_parallel_thread::current_owner;
    see the sketch following this struct.
  */
  rpl_parallel_thread **rpl_threads;
  uint32 rpl_thread_max;
  uint32 rpl_thread_idx;
  /*
    The sub_id of the last transaction to commit within this domain_id.
    Must be accessed under LOCK_parallel_entry protection.

    Event groups commit in order, so the rpl_group_info for an event group
    will be alive (at least) as long as
    rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
    safely refer back to previous event groups if they are still executing,
    and ignore them if they completed, without requiring explicit
    synchronisation between the threads.
  */
  uint64 last_committed_sub_id;
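  /*
    Illustrative sketch (not part of this header) of the check implied by the
    comment above; the helper name is hypothetical.

      // Returns true if the event group with the given sub_id may still be
      // executing, false once it is known to have committed.
      static bool may_still_be_executing(rpl_parallel_entry *e, uint64 sub_id)
      {
        mysql_mutex_lock(&e->LOCK_parallel_entry);
        bool executing= (sub_id > e->last_committed_sub_id);
        mysql_mutex_unlock(&e->LOCK_parallel_entry);
        return executing;
      }
  */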
  /*
    The sub_id of the last event group in this replication domain that was
    queued for execution by a worker thread.
  */
  uint64 current_sub_id;
  /*
    The largest sub_id that has started its transaction. Protected by
    LOCK_parallel_entry.

    (Transactions can start out-of-order, so this value signifies that no
    transactions with larger sub_id have started, but not necessarily that all
    transactions with smaller sub_id have started).
  */
  uint64 largest_started_sub_id;
  rpl_group_info *current_group_info;
  /*
    If we get an error in some event group, we set the sub_id of that event
    group here. Then later event groups (with higher sub_id) can know not to
    try to start (event groups that already started will be rolled back when
    wait_for_prior_commit() returns error).
    The value is ULONGLONG_MAX when no error occurred.
  */
  uint64 stop_on_error_sub_id;
  /*
    During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
    this value must not start, but wait until the global read lock is released.
    The value is set to ULONGLONG_MAX when no FTWRL is pending.
  */
  uint64 pause_sub_id;
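  /*
    Illustrative sketch (not part of this header) of the pause check described
    above, as a worker might perform it before starting an event group; the
    variable names are hypothetical and the real FTWRL handling is in
    rpl_parallel.cc.

      mysql_mutex_lock(&entry->LOCK_parallel_entry);
      while (my_sub_id > entry->pause_sub_id)
        mysql_cond_wait(&entry->COND_parallel_entry,
                        &entry->LOCK_parallel_entry);
      mysql_mutex_unlock(&entry->LOCK_parallel_entry);
  */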
  /* Total count of event groups queued so far. */
  uint64 count_queued_event_groups;
  /*
    Count of event groups that have started (but not necessarily completed)
    the commit phase. We use this to know when every event group in a previous
    batch of master group commits has started committing on the slave, so
    that it is safe to start executing the events in the following batch.
  */
  uint64 count_committing_event_groups;
  /* The group_commit_orderer object for the events currently being queued. */
  group_commit_orderer *current_gco;

  rpl_parallel_thread *choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                     PSI_stage_info *old_stage, bool reuse);
  int queue_master_restart(rpl_group_info *rgi,
                           Format_description_log_event *fdev);
};
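
/*
  Illustrative sketch (not part of this header) of the round-robin selection
  over the rpl_threads array referred to above. The helper name is
  hypothetical; the real logic, including waiting for a busy worker and
  allocating a new one from the pool, lives in choose_thread() in
  rpl_parallel.cc.

    static rpl_parallel_thread *pick_slot(rpl_parallel_entry *e)
    {
      // Advance the cyclic index over the at most rpl_thread_max slots this
      // domain is allowed to occupy.
      uint32 idx= e->rpl_thread_idx + 1;
      if (idx >= e->rpl_thread_max)
        idx= 0;
      e->rpl_thread_idx= idx;

      rpl_parallel_thread *thr= e->rpl_threads[idx];
      if (!thr)
        return NULL;              // Empty slot; get a worker from the pool.

      // Workers are never removed from the array, so re-check under
      // LOCK_rpl_thread that the worker still belongs to this slot before
      // re-using it.
      mysql_mutex_lock(&thr->LOCK_rpl_thread);
      bool still_ours= (thr->current_owner == &e->rpl_threads[idx]);
      mysql_mutex_unlock(&thr->LOCK_rpl_thread);
      return still_ours ? thr : NULL;
    }
*/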

struct rpl_parallel {
  HASH domain_hash;
  rpl_parallel_entry *current;
  bool sql_thread_stopping;

  rpl_parallel();
  ~rpl_parallel();
  void reset();
  rpl_parallel_entry *find(uint32 domain_id);
  void wait_for_done(THD *thd, Relay_log_info *rli);
  void stop_during_until();
  bool workers_idle();
  int wait_for_workers_idle(THD *thd);
  int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size);
};


extern struct rpl_parallel_thread_pool global_rpl_thread_pool;


extern int rpl_parallel_resize_pool_if_no_slaves(void);
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
extern int rpl_pause_for_ftwrl(THD *thd);
extern void rpl_unpause_after_ftwrl(THD *thd);

#endif /* RPL_PARALLEL_H */