#include "mariadb.h"
#include "rpl_parallel.h"
#include "slave.h"
#include "rpl_mi.h"
#include "sql_parse.h"
#include "debug_sync.h"

/*
  Code for optional parallel execution of replicated events on the slave.
*/


/*
  Maximum number of queued events to accumulate in a local free list, before
  moving them to the global free list. There is additionally a limit on how
  much to accumulate, based on opt_slave_parallel_max_queued.
*/
#define QEV_BATCH_FREE 200


struct rpl_parallel_thread_pool global_rpl_thread_pool;

static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
                                              int err);

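/*
  Execute one queued event in a worker thread, copying the relay log
  coordinates from the queued event into the rpl_group_info before applying
  the event.
*/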
static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
                 struct rpl_parallel_thread *rpt)
{
  int err;
  rpl_group_info *rgi= qev->rgi;
  Relay_log_info *rli= rgi->rli;
  THD *thd= rgi->thd;
  Log_event *ev;

  DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
  ev= qev->ev;

  thd->system_thread_info.rpl_sql_info->rpl_filter= rli->mi->rpl_filter;
  ev->thd= thd;

  strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
  rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
  rgi->event_relay_log_pos= qev->event_relay_log_pos;
  rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
  strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
  if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
        (ev->when == 0)))
    rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
  err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);

  thread_safe_increment64(&rli->executed_entries);
  /* ToDo: error handling. */
  return err;
}

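/*
  Apply a queued position update, advancing the relay log and master log
  coordinates in the Relay_log_info. Coordinates are only ever moved forward,
  never backwards.
*/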
static void
handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{
  int cmp;
  Relay_log_info *rli;
  rpl_parallel_entry *e;

  /*
    Events that are not part of an event group, such as Format Description,
    Stop, GTID List and such, are executed directly in the driver SQL thread,
    to keep the relay log state up-to-date. But the associated position update
    is done here, in sync with other normal events as they are queued to
    worker threads.
  */
  if ((thd->variables.option_bits & OPTION_BEGIN) &&
      opt_using_transactions)
    return;

  /* Do not update position if an earlier event group caused an error abort. */
  DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
  e= qev->entry_for_queued;
  if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
    return;

  rli= qev->rgi->rli;
  mysql_mutex_lock(&rli->data_lock);
  cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
  if (cmp < 0)
  {
    rli->group_relay_log_pos= qev->future_event_relay_log_pos;
    strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name);
    rli->notify_group_relay_log_name_update();
  }
  else if (cmp == 0 &&
           rli->group_relay_log_pos < qev->future_event_relay_log_pos)
    rli->group_relay_log_pos= qev->future_event_relay_log_pos;

  cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name);
  if (cmp < 0)
  {
    strcpy(rli->group_master_log_name, qev->future_event_master_log_name);
    rli->group_master_log_pos= qev->future_event_master_log_pos;
  }
  else if (cmp == 0 &&
           rli->group_master_log_pos < qev->future_event_master_log_pos)
    rli->group_master_log_pos= qev->future_event_master_log_pos;
  mysql_mutex_unlock(&rli->data_lock);
  mysql_cond_broadcast(&rli->data_cond);
}


/*
  Wait for any pending deadlock kills. Since deadlock kills happen
  asynchronously, we need to be sure they will be completed before starting a
  new transaction. Otherwise the new transaction might suffer a spurious kill.
*/
static void
wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
{
  PSI_stage_info old_stage;

  mysql_mutex_lock(&thd->LOCK_wakeup_ready);
  thd->ENTER_COND(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready,
                  &stage_waiting_for_deadlock_kill, &old_stage);
  while (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
    mysql_cond_wait(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready);
  thd->EXIT_COND(&old_stage);
}

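/*
  Finish the current event group in this worker: wait for all prior event
  groups to commit, update entry->last_committed_sub_id, free any
  group_commit_orderer objects that are no longer needed, and wake up any
  subsequent event groups waiting on our commit.
*/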
static void
finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
                   rpl_parallel_entry *entry, rpl_group_info *rgi)
{
  THD *thd= rpt->thd;
  wait_for_commit *wfc= &rgi->commit_orderer;
  int err;

  thd->get_stmt_da()->set_overwrite_status(true);
  /*
    Remove any left-over registration to wait for a prior commit to
    complete. Normally, such a wait would already have been removed at
    this point by wait_for_prior_commit() called from within COMMIT
    processing. However, in case of MyISAM and no binlog, we might not
    have any commit processing, and so we need to do the wait here,
    before waking up any subsequent commits, to preserve correct
    order of event execution. Also, in the error case we might have
    skipped waiting and thus need to remove it explicitly.

    It is important in the non-error case to do a wait, not just an
    unregister. Because we might be last in a group-commit that is
    replicated in parallel, and the following event will then wait
    for us to complete and rely on this also ensuring that any other
    event in the group has completed.

    And in the error case, correct GCO lifetime relies on the fact that once
    the last event group in the GCO has executed wait_for_prior_commit(),
    all earlier event groups have also committed; this way no more
    mark_start_commit() calls can be made and it is safe to de-allocate
    the GCO.
  */
  err= wfc->wait_for_prior_commit(thd);
  if (unlikely(err) && !rgi->worker_error)
    signal_error_to_sql_driver_thread(thd, rgi, err);
  thd->wait_for_commit_ptr= NULL;

  mysql_mutex_lock(&entry->LOCK_parallel_entry);
  /*
    We need to mark that this event group started its commit phase, in case we
    missed it before (otherwise we would deadlock the next event group that is
    waiting for this). In most cases (normal DML), it will be a no-op.
  */
  rgi->mark_start_commit_no_lock();

  if (entry->last_committed_sub_id < sub_id)
  {
    /*
      Record that this event group has finished (eg. transaction is
      committed, if transactional), so other event groups will no longer
      attempt to wait for us to commit. Once we have increased
      entry->last_committed_sub_id, no other threads will execute
      register_wait_for_prior_commit() against us. Thus, by doing one
      extra (usually redundant) wakeup_subsequent_commits() we can ensure
      that no register_wait_for_prior_commit() can ever happen without a
      subsequent wakeup_subsequent_commits() to wake it up.

      We can race here with the next transactions, but that is fine, as
      long as we check that we do not decrease last_committed_sub_id. If
      this commit is done, then any prior commits will also have been
      done and also no longer need waiting for.
    */
    entry->last_committed_sub_id= sub_id;
    if (entry->need_sub_id_signal)
      mysql_cond_broadcast(&entry->COND_parallel_entry);

    /* Now free any GCOs in which all transactions have committed. */
    group_commit_orderer *tmp_gco= rgi->gco;
    while (tmp_gco &&
           (!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id ||
            tmp_gco->next_gco->wait_count > entry->count_committing_event_groups))
    {
      /*
        We must not free a GCO before the wait_count of the following GCO has
        been reached and wakeup has been sent. Otherwise we will lose the
        wakeup and hang (there were several such bugs in the past).

        The intention is that this is ensured already, since we only free when
        the last event group in the GCO has committed
        (tmp_gco->last_sub_id <= sub_id). However, if we have a bug, we have an
        extra check on next_gco->wait_count to hopefully avoid hanging; we
        have here an assertion in debug builds that this check does not in
        fact trigger.
      */
      DBUG_ASSERT(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id);
      tmp_gco= tmp_gco->prev_gco;
    }
    while (tmp_gco)
    {
      group_commit_orderer *prev_gco= tmp_gco->prev_gco;
      tmp_gco->next_gco->prev_gco= NULL;
      rpt->loc_free_gco(tmp_gco);
      tmp_gco= prev_gco;
    }
  }

  /*
    If this event group got an error, then any following event groups that
    have not yet started should just skip their group, preparing for stop of
    the SQL driver thread.
  */
  if (unlikely(rgi->worker_error) &&
      entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
    entry->stop_on_error_sub_id= sub_id;
  mysql_mutex_unlock(&entry->LOCK_parallel_entry);

  DBUG_EXECUTE_IF("rpl_parallel_simulate_wait_at_retry", {
      if (rgi->current_gtid.seq_no == 1000) {
        DBUG_ASSERT(entry->stop_on_error_sub_id == sub_id);
        debug_sync_set_action(thd,
                              STRING_WITH_LEN("now WAIT_FOR proceed_by_1000"));
      }
    });

  if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
    wait_for_pending_deadlock_kill(thd, rgi);
  thd->clear_error();
  thd->reset_killed();
  /*
    Would do thd->get_stmt_da()->set_overwrite_status(false) here, but
    reset_diagnostics_area() already does that.
  */
  thd->get_stmt_da()->reset_diagnostics_area();
  wfc->wakeup_subsequent_commits(rgi->worker_error);
}

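/*
  Record that this worker hit an error, roll back the event group context,
  and signal the SQL driver thread (via the relay log update condition) so it
  will notice the abort and stop the slave.
*/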
static void
signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
{
  rgi->worker_error= err;
  /*
    In case we get an error during commit, inform following transactions that
    we aborted our commit.
  */
  rgi->unmark_start_commit();
  rgi->cleanup_context(thd, true);
  rgi->rli->abort_slave= true;
  rgi->rli->stop_for_until= false;
  mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
  rgi->rli->relay_log.signal_relay_log_update();
  mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
}

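/*
  Release the lock, using EXIT_COND() if we previously entered a wait with
  ENTER_COND() (which also restores the previous PSI stage), and a plain
  mutex unlock otherwise.
*/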
static void
unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
                    PSI_stage_info *old_stage)
{
  if (*did_enter_cond)
  {
    thd->EXIT_COND(old_stage);
    *did_enter_cond= false;
  }
  else
    mysql_mutex_unlock(lock);
}

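/*
  If the prior event group has not yet committed, register our commit_orderer
  to wait for it, so that commits happen in the same order as on the master.
  Must be called with LOCK_parallel_entry held.
*/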
static void
register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
                                           rpl_parallel_entry *entry)
{
  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
  if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
  {
    /*
      Register that the commit of this event group must wait for the
      commit of the previous event group to complete before it may
      complete itself, so that we preserve commit order.
    */
    wait_for_commit *waitee=
      &rgi->wait_commit_group_info->commit_orderer;
    rgi->commit_orderer.register_wait_for_prior_commit(waitee);
  }
}


/*
  Do not start parallel execution of this event group until all prior groups
  that are not safe to run in parallel with have reached the commit phase.
*/
static bool
do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
            bool *did_enter_cond, PSI_stage_info *old_stage)
{
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  uint64 wait_count;

  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);

  if (!gco->installed)
  {
    group_commit_orderer *prev_gco= gco->prev_gco;
    if (prev_gco)
    {
      prev_gco->last_sub_id= gco->prior_sub_id;
      prev_gco->next_gco= gco;
    }
    gco->installed= true;
  }
  wait_count= gco->wait_count;
  if (wait_count > entry->count_committing_event_groups)
  {
    DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
    thd->ENTER_COND(&gco->COND_group_commit_orderer,
                    &entry->LOCK_parallel_entry,
                    &stage_waiting_for_prior_transaction_to_start_commit,
                    old_stage);
    *did_enter_cond= true;
    thd->set_time_for_next_stage();
    do
    {
      if (unlikely(thd->check_killed()) && !rgi->worker_error)
      {
        DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
        thd->clear_error();
        thd->get_stmt_da()->reset_diagnostics_area();
        thd->send_kill_message();
        slave_output_error_info(rgi, thd);
        signal_error_to_sql_driver_thread(thd, rgi, 1);
        /*
          Even though we were killed, we need to continue waiting for the
          prior event groups to signal that we can continue. Otherwise we
          mess up the accounting for ordering. However, now that we have
          marked the error, events will just be skipped rather than
          executed, and things will progress quickly towards stop.
        */
      }
      mysql_cond_wait(&gco->COND_group_commit_orderer,
                      &entry->LOCK_parallel_entry);
    } while (wait_count > entry->count_committing_event_groups);
  }

  if (entry->force_abort && wait_count > entry->stop_count)
  {
    /*
      We are stopping (STOP SLAVE), and this event group is beyond the point
      where we can safely stop. So return a flag that will cause us to skip,
      rather than execute, the following events.
    */
    return true;
  }
  else
    return false;
}

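/*
  If a FLUSH TABLES WITH READ LOCK is pending and this event group is later
  than the pause point, wait here until the FTWRL has completed. Must be
  called with LOCK_parallel_entry held.
*/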
static void
do_ftwrl_wait(rpl_group_info *rgi,
              bool *did_enter_cond, PSI_stage_info *old_stage)
{
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  uint64 sub_id= rgi->gtid_sub_id;
  DBUG_ENTER("do_ftwrl_wait");

  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);

  /*
    If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this
    transaction is later than transactions that have priority to complete
    before FTWRL. If so, wait here so that FTWRL can proceed and complete
    first.

    (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes
    this test false, as required.)
  */
  if (unlikely(sub_id > entry->pause_sub_id))
  {
    thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
                    &stage_waiting_for_ftwrl, old_stage);
    *did_enter_cond= true;
    thd->set_time_for_next_stage();
    do
    {
      if (entry->force_abort || rgi->worker_error)
        break;
      if (unlikely(thd->check_killed()))
      {
        thd->send_kill_message();
        slave_output_error_info(rgi, thd);
        signal_error_to_sql_driver_thread(thd, rgi, 1);
        break;
      }
      mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
    } while (sub_id > entry->pause_sub_id);

    /*
      We do not call EXIT_COND() here, as this will be done later by our
      caller (since we set *did_enter_cond to true).
    */
  }

  if (sub_id > entry->largest_started_sub_id)
    entry->largest_started_sub_id= sub_id;

  DBUG_VOID_RETURN;
}


static int
pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
{
  PSI_stage_info old_stage;
  int res= 0;

  /*
    Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
    READ LOCK work correctly, without incurring extra locking penalties in
    normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
    thread pool, and for this we need to make sure the pool will not go away
    during the operation. The LOCK_rpl_thread_pool is not suitable for
    this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
    must be released before locking any LOCK_rpl_thread lock, or a deadlock
    can occur.

    So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
    pool size changes with this condition wait.
  */
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  if (thd)
  {
    thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
                    &stage_waiting_for_rpl_thread_pool, &old_stage);
    thd->set_time_for_next_stage();
  }
  while (pool->busy)
  {
    if (thd && unlikely(thd->check_killed()))
    {
      thd->send_kill_message();
      res= 1;
      break;
    }
    mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
  }
  if (!res)
    pool->busy= true;
  if (thd)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);

  return res;
}


static void
pool_mark_not_busy(rpl_parallel_thread_pool *pool)
{
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  DBUG_ASSERT(pool->busy);
  pool->busy= false;
  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
}


void
rpl_unpause_after_ftwrl(THD *thd)
{
  uint32 i;
  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
  DBUG_ENTER("rpl_unpause_after_ftwrl");

  DBUG_ASSERT(pool->busy);

  for (i= 0; i < pool->count; ++i)
  {
    rpl_parallel_entry *e;
    rpl_parallel_thread *rpt= pool->threads[i];

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    if (!rpt->current_owner)
    {
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      continue;
    }
    e= rpt->current_entry;
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    rpt->pause_for_ftwrl= false;
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    e->pause_sub_id= (uint64)ULONGLONG_MAX;
    mysql_cond_broadcast(&e->COND_parallel_entry);
    mysql_mutex_unlock(&e->LOCK_parallel_entry);
  }

  pool_mark_not_busy(pool);
  DBUG_VOID_RETURN;
}

/*
  Pause all parallel replication worker threads, so that a FLUSH TABLES WITH
  READ LOCK can complete without replicated transactions being caught in the
  middle of committing.

  Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be
  called.
*/
int
rpl_pause_for_ftwrl(THD *thd)
{
  uint32 i;
  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
  int err;
  DBUG_ENTER("rpl_pause_for_ftwrl");

  /*
    While the count_pending_pause_for_ftwrl counter is non-zero, the pool
    cannot be shut down or resized, so threads are guaranteed to not
    disappear.

    This is required to safely be able to access the individual threads below.
    (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
    as this can deadlock against release_thread()).
  */
  if ((err= pool_mark_busy(pool, thd)))
    DBUG_RETURN(err);

  for (i= 0; i < pool->count; ++i)
  {
    PSI_stage_info old_stage;
    rpl_parallel_entry *e;
    rpl_parallel_thread *rpt= pool->threads[i];

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    if (!rpt->current_owner)
    {
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      continue;
    }
    e= rpt->current_entry;
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    /*
      Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not
      de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl().
    */
    rpt->pause_for_ftwrl= true;
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    ++e->need_sub_id_signal;
    if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
      e->pause_sub_id= e->largest_started_sub_id;
    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                    &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
    thd->set_time_for_next_stage();
    while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
           e->last_committed_sub_id < e->pause_sub_id &&
           !err)
    {
      if (unlikely(thd->check_killed()))
      {
        thd->send_kill_message();
        err= 1;
        break;
      }
      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
    }
    --e->need_sub_id_signal;
    thd->EXIT_COND(&old_stage);
    if (err)
      break;
  }

  if (err)
    rpl_unpause_after_ftwrl(thd);
  DBUG_RETURN(err);
}


#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
{
  if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
      rgi->retry_event_count == 4)
  {
    thd->clear_error();
    thd->get_stmt_da()->reset_diagnostics_area();
    my_error(ER_LOCK_DEADLOCK, MYF(0));
    return 1;
  }
  return 0;
}
#endif


/*
  If we detect a deadlock due to eg. storage engine locks that conflict with
  the fixed commit order, then the later transaction will be killed
  asynchronously to allow the former to complete its commit.

  In this case, we convert the 'killed' error into a deadlock error, and retry
  the later transaction.

  If we are doing optimistic parallel apply of transactions not known to be
  safe, we convert any error to a deadlock error, but then at retry we will
  wait for prior transactions to commit first, so that the retries can be
  done non-speculatively.
*/
static void
convert_kill_to_deadlock_error(rpl_group_info *rgi)
{
  THD *thd= rgi->thd;
  int err_code;

  if (!thd->get_stmt_da()->is_error())
    return;
  err_code= thd->get_stmt_da()->sql_errno();
  if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
       err_code != ER_PRIOR_COMMIT_FAILED) ||
      ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
       rgi->killed_for_retry))
  {
    thd->clear_error();
    my_error(ER_LOCK_DEADLOCK, MYF(0));
    thd->reset_killed();
  }
}


/*
  Check if an event marks the end of an event group. Returns zero if not.
  Otherwise, returns 1 if the group is committing, 2 if it is rolling back.
*/
static int
is_group_ending(Log_event *ev, Log_event_type event_type)
{
  if (event_type == XID_EVENT)
    return 1;
  if (event_type == QUERY_EVENT)  // COMMIT/ROLLBACK are never compressed
  {
    Query_log_event *qev= (Query_log_event *)ev;
    if (qev->is_commit())
      return 1;
    if (qev->is_rollback())
      return 2;
  }
  return 0;
}

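/*
  Retry an event group that failed with a temporary error (e.g. a deadlock
  kill due to commit-order conflicts). Roll back, wait for prior transactions
  to commit, then re-read the group's events from the relay log and re-apply
  them, retrying up to slave_trans_retries times.
*/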
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
                  rpl_parallel_thread::queued_event *orig_qev)
{
  IO_CACHE rlog;
  LOG_INFO linfo;
  File fd= (File)-1;
  const char *errmsg;
  inuse_relaylog *ir= rgi->relay_log;
  uint64 event_count;
  uint64 events_to_execute= rgi->retry_event_count;
  Relay_log_info *rli= rgi->rli;
  int err;
  ulonglong cur_offset, old_offset;
  char log_name[FN_REFLEN];
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  ulong retries= 0;
  Format_description_log_event *description_event= NULL;

do_retry:
  event_count= 0;
  err= 0;
  errmsg= NULL;

  /*
    If we already started committing before getting the deadlock (or other
    error) that caused us to need to retry, we have already signalled
    subsequent transactions that we have started committing. This is
    potentially a problem, as now we will rollback, and if subsequent
    transactions would start to execute now, they could see an unexpected
    state of the database and get eg. key not found or duplicate key error.

    However, to get a deadlock in the first place, there must have been
    another earlier transaction that is waiting for us. Thus that other
    transaction has _not_ yet started to commit, and any subsequent
    transactions will still be waiting at this point.

    So here, we decrement back the count of transactions that started
    committing (if we already incremented it), undoing the effect of an
    earlier mark_start_commit(). Then later, when the retry succeeds and we
    commit again, we can do a new mark_start_commit() and eventually wake up
    subsequent transactions at the proper time.

    We need to do the unmark before the rollback, to be sure that the
    transaction we deadlocked with will not signal that it started to commit
    until after the unmark.
  */
  DBUG_EXECUTE_IF("inject_mdev8302", { my_sleep(20000);});
  rgi->unmark_start_commit();
  DEBUG_SYNC(thd, "rpl_parallel_retry_after_unmark");

  /*
    We might get the deadlock error that causes the retry during commit, while
    sitting in wait_for_prior_commit(). If this happens, we will have a
    pending error in the wait_for_commit object. So clear this by
    unregistering (and later re-registering) the wait.
  */
  if (thd->wait_for_commit_ptr)
    thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
  DBUG_EXECUTE_IF("inject_mdev8031", {
      /* Simulate that we get deadlock killed at this exact point. */
      rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
      thd->set_killed(KILL_CONNECTION);
    });
  DBUG_EXECUTE_IF("rpl_parallel_simulate_wait_at_retry", {
      if (rgi->current_gtid.seq_no == 1001) {
        debug_sync_set_action(thd,
                              STRING_WITH_LEN("rpl_parallel_simulate_wait_at_retry WAIT_FOR proceed_by_1001"));
      }
      DEBUG_SYNC(thd, "rpl_parallel_simulate_wait_at_retry");
    });

  rgi->cleanup_context(thd, 1);
  wait_for_pending_deadlock_kill(thd, rgi);
  thd->reset_killed();
  thd->clear_error();
  rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;

  /*
    If we retry due to a deadlock kill that occurred during the commit step, we
    might have already updated (but not committed) an update of table
    mysql.gtid_slave_pos, and cleared the gtid_pending flag. Now we have
    rolled back any such update, so we must set the gtid_pending flag back to
    true so that we will do a new update when/if we succeed with the retry.
  */
  rgi->gtid_pending= true;

  mysql_mutex_lock(&rli->data_lock);
  ++rli->retried_trans;
  statistic_increment(slave_retried_transactions, LOCK_status);
  mysql_mutex_unlock(&rli->data_lock);

  for (;;)
  {
    mysql_mutex_lock(&entry->LOCK_parallel_entry);
    if (entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
#ifndef DBUG_OFF
        (DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)) ||
#endif
        rgi->gtid_sub_id < entry->stop_on_error_sub_id)
    {
      register_wait_for_prior_event_group_commit(rgi, entry);
    }
    else
    {
      /*
        A failure of a preceding "parent" transaction may not be
        seen by the current one through its own worker_error.
        Such an induced error gets set by ourselves now.
      */
      err= rgi->worker_error= 1;
      my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
      mysql_mutex_unlock(&entry->LOCK_parallel_entry);
      goto err;
    }
    mysql_mutex_unlock(&entry->LOCK_parallel_entry);

    /*
      Let us wait for all prior transactions to complete before trying again.
      This way, we avoid repeatedly conflicting with and getting deadlock
      killed by the same earlier transaction.
    */
    if (!(err= thd->wait_for_prior_commit()))
    {
      rgi->speculation= rpl_group_info::SPECULATE_WAIT;
      break;
    }

    convert_kill_to_deadlock_error(rgi);
    if (!has_temporary_error(thd))
      goto err;
    /*
      If we get a temporary error such as a deadlock kill, we can safely
      ignore it, as we already rolled back.

      But we still want to retry the wait for the prior transaction to
      complete its commit.
    */
    thd->clear_error();
    thd->reset_killed();
    if (thd->wait_for_commit_ptr)
      thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
    DBUG_EXECUTE_IF("inject_mdev8031", {
        /* Inject a small sleep to give prior transaction a chance to commit. */
        my_sleep(100000);
      });
  }

  /*
    Let us clear any lingering deadlock kill one more time, here after
    wait_for_prior_commit() has completed. This should rule out any
    possibility of an old deadlock kill lingering on beyond this point.
  */
  thd->reset_killed();

  strmake_buf(log_name, ir->name);
  if ((fd= open_binlog(&rlog, log_name, &errmsg)) < 0)
  {
    err= 1;
    goto err;
  }
  cur_offset= rgi->retry_start_offset;
  delete description_event;
  description_event=
    read_relay_log_description_event(&rlog, cur_offset, &errmsg);
  if (!description_event)
  {
    err= 1;
    goto err;
  }
  DBUG_EXECUTE_IF("inject_mdev8031", {
      /* Simulate pending KILL caught in read_relay_log_description_event(). */
      if (unlikely(thd->check_killed())) {
        thd->send_kill_message();
        err= 1;
        goto err;
      }
    });
  my_b_seek(&rlog, cur_offset);

  do
  {
    Log_event_type event_type;
    Log_event *ev;
    rpl_parallel_thread::queued_event *qev;

    /* The loop is here so we can try again the next relay log file on EOF. */
    for (;;)
    {
      old_offset= cur_offset;
      ev= Log_event::read_log_event(&rlog, description_event,
                                    opt_slave_sql_verify_checksum);
      cur_offset= my_b_tell(&rlog);

      if (ev)
        break;
      if (unlikely(rlog.error < 0))
      {
        errmsg= "slave SQL thread aborted because of I/O error";
        err= 1;
        goto check_retry;
      }
      if (unlikely(rlog.error > 0))
      {
        sql_print_error("Slave SQL thread: I/O error reading "
                        "event(errno: %d cur_log->error: %d)",
                        my_errno, rlog.error);
        errmsg= "Aborting slave SQL thread because of partial event read";
        err= 1;
        goto err;
      }
      /* EOF. Move to the next relay log. */
      end_io_cache(&rlog);
      mysql_file_close(fd, MYF(MY_WME));
      fd= (File)-1;

      /* Find the next relay log file. */
      if ((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
          (err= rli->relay_log.find_next_log(&linfo, 1)))
      {
        char buff[22];
        sql_print_error("next log error: %d offset: %s log: %s",
                        err,
                        llstr(linfo.index_file_offset, buff),
                        log_name);
        goto err;
      }
      strmake_buf(log_name, linfo.log_file_name);

      DBUG_EXECUTE_IF("inject_retry_event_group_open_binlog_kill", {
          if (retries < 2)
          {
            /* Simulate that we get deadlock killed during open_binlog(). */
            thd->reset_for_next_command();
            rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
            thd->set_killed(KILL_CONNECTION);
            thd->send_kill_message();
            fd= (File)-1;
            err= 1;
            goto check_retry;
          }
        });
      if ((fd= open_binlog(&rlog, log_name, &errmsg)) < 0)
      {
        err= 1;
        goto check_retry;
      }
      description_event->reset_crypto();
      /* Loop to try again on the new log file. */
    }

    event_type= ev->get_type_code();
    if (event_type == FORMAT_DESCRIPTION_EVENT)
    {
      Format_description_log_event *newde= (Format_description_log_event*)ev;
      newde->copy_crypto_data(description_event);
      delete description_event;
      description_event= newde;
      continue;
    }
    else if (event_type == START_ENCRYPTION_EVENT)
    {
      description_event->start_decryption((Start_encryption_log_event*)ev);
      delete ev;
      continue;
    }
    else if (!Log_event::is_group_event(event_type))
    {
      delete ev;
      continue;
    }
    ev->thd= thd;

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    qev= rpt->retry_get_qev(ev, orig_qev, log_name, old_offset,
                            cur_offset - old_offset);
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    if (!qev)
    {
      delete ev;
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      err= 1;
      goto err;
    }
    if (is_group_ending(ev, event_type) == 1)
      rgi->mark_start_commit();

    err= rpt_handle_event(qev, rpt);
    ++event_count;
    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    rpt->free_qev(qev);
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);

    delete_or_keep_event_post_apply(rgi, event_type, ev);
    DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
                    if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
    DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
                    err= dbug_simulate_tmp_error(rgi, thd););
    if (!err)
      continue;

check_retry:
    convert_kill_to_deadlock_error(rgi);
    if (has_temporary_error(thd))
    {
      ++retries;
      if (retries < slave_trans_retries)
      {
        if (fd >= 0)
        {
          end_io_cache(&rlog);
          mysql_file_close(fd, MYF(MY_WME));
          fd= (File)-1;
        }
        goto do_retry;
      }
      sql_print_error("Slave worker thread retried transaction %lu time(s) "
                      "in vain, giving up. Consider raising the value of "
                      "the slave_transaction_retries variable.",
                      slave_trans_retries);
    }
    goto err;

  } while (event_count < events_to_execute);

err:

  if (description_event)
    delete description_event;
  if (fd >= 0)
  {
    end_io_cache(&rlog);
    mysql_file_close(fd, MYF(MY_WME));
  }
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
  return err;
}

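/*
  Main loop of a parallel replication worker thread. Sets up a slave THD for
  the worker, then repeatedly waits for queued events, applies them in
  event-group order, and handles error signalling, retries, and
  STOP SLAVE / FTWRL coordination, until the thread pool asks it to stop.
*/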
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
  THD *thd;
  PSI_stage_info old_stage;
  struct rpl_parallel_thread::queued_event *events;
  bool group_standalone= true;
  bool in_event_group= false;
  bool skip_event_group= false;
  rpl_group_info *group_rgi= NULL;
  group_commit_orderer *gco;
  uint64 event_gtid_sub_id= 0;
  rpl_sql_thread_info sql_info(NULL);
  int err;

  struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;

  my_thread_init();
  thd= new THD(next_thread_id());
  thd->thread_stack= (char*)&thd;
  add_to_active_threads(thd);
  set_current_thd(thd);
  pthread_detach_this_thread();
  thd->init_for_queries();
  thd->variables.binlog_annotate_row_events= 0;
  init_thr_lock();
  thd->store_globals();
  thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
  thd->security_ctx->skip_grants();
  thd->variables.max_allowed_packet= slave_max_allowed_packet;
  /* Ensure that slave can execute any ALTER TABLE it gets from the master. */
  thd->variables.alter_algorithm= (ulong) Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT;
  thd->slave_thread= 1;

  set_slave_thread_options(thd);
  thd->client_capabilities= CLIENT_LOCAL_FILES;
  thd->net.reading_or_writing= 0;
  thd_proc_info(thd, "Waiting for work from main SQL threads");
  thd->variables.lock_wait_timeout= LONG_TIMEOUT;
  thd->system_thread_info.rpl_sql_info= &sql_info;
  /*
    We need to use (at least) REPEATABLE READ isolation level. Otherwise
    speculative parallel apply can run out-of-order and give wrong results
    for statement-based replication.
  */
  thd->variables.tx_isolation= ISO_REPEATABLE_READ;

  mysql_mutex_lock(&rpt->LOCK_rpl_thread);
  rpt->thd= thd;

  while (rpt->delay_start)
    mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);

  rpt->running= true;
  mysql_cond_signal(&rpt->COND_rpl_thread);

  thd->set_command(COM_SLAVE_WORKER);
  while (!rpt->stop)
  {
    uint wait_count= 0;
    rpl_parallel_thread::queued_event *qev, *next_qev;

    thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
                    &stage_waiting_for_work_from_sql_thread, &old_stage);
    /*
      There are 4 cases that should cause us to wake up:
       - Events have been queued for us to handle.
       - We have an owner, but no events and we are not inside an event group
         -> we need to release ourself to the thread pool.
       - SQL thread is stopping, and we have an owner but no events, and we
         are inside an event group; no more events will be queued to us, so
         we need to abort the group (force_abort==1).
       - Thread pool shutdown (rpt->stop==1).
    */
    while (!( (events= rpt->event_queue) ||
              (rpt->current_owner && !in_event_group) ||
              (rpt->current_owner && group_rgi->parallel_entry->force_abort) ||
              rpt->stop))
    {
      if (!wait_count++)
        thd->set_time_for_next_stage();
      mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
    }
    rpt->dequeue1(events);
    thd->EXIT_COND(&old_stage);

  more_events:
    for (qev= events; qev; qev= next_qev)
    {
      Log_event_type event_type;
      rpl_group_info *rgi= qev->rgi;
      rpl_parallel_entry *entry= rgi->parallel_entry;
      bool end_of_group;
      int group_ending;

      next_qev= qev->next;
      if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
      {
        handle_queued_pos_update(thd, qev);
        rpt->loc_free_qev(qev);
        continue;
      }
      else if (qev->typ ==
               rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
      {
        if (in_event_group)
        {
          /*
            The master restarted (crashed) in the middle of an event group.
            So we need to roll back and discard that event group.
          */
          group_rgi->cleanup_context(thd, 1);
          in_event_group= false;
          finish_event_group(rpt, group_rgi->gtid_sub_id,
                             qev->entry_for_queued, group_rgi);

          rpt->loc_free_rgi(group_rgi);
          thd->rgi_slave= group_rgi= NULL;
        }

        rpt->loc_free_qev(qev);
        continue;
      }
      DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);

      thd->rgi_slave= rgi;
      gco= rgi->gco;
      /* Handle a new event group, which will be initiated by a GTID event. */
      if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
      {
        bool did_enter_cond= false;
        PSI_stage_info old_stage;

        DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
            if (rgi->current_gtid.domain_id == 0 &&
                rgi->current_gtid.seq_no == 100) {
              debug_sync_set_action(thd,
                                    STRING_WITH_LEN("now SIGNAL scheduled_gtid_0_x_100"));
            }
          });

        if (unlikely(thd->wait_for_commit_ptr) && group_rgi != NULL)
        {
          /*
            This indicates that we got a new GTID event in the middle of
            an uncompleted event group. This is corrupt binlog (the master
            will never write such a binlog), so it does not happen unless
            someone tries to inject a wrongly crafted binlog, but let us
            still try to handle it somewhat nicely.
          */
          group_rgi->cleanup_context(thd, true);
          finish_event_group(rpt, group_rgi->gtid_sub_id,
                             group_rgi->parallel_entry, group_rgi);
          rpt->loc_free_rgi(group_rgi);
        }

        thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
        in_event_group= true;
        /*
          If the standalone flag is set, then this event group consists of a
          single statement (possibly preceded by some Intvar_log_event and
          similar), without any terminating COMMIT/ROLLBACK/XID.
        */
        group_standalone=
          (0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 &
                 Gtid_log_event::FL_STANDALONE));

        event_gtid_sub_id= rgi->gtid_sub_id;
        rgi->thd= thd;

        mysql_mutex_lock(&entry->LOCK_parallel_entry);
        skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);

        if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
          skip_event_group= true;
        if (likely(!skip_event_group))
          do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);

        /*
          Register ourself to wait for the previous commit, if we need to do
          such registration _and_ that previous commit has not already
          occurred.
        */
        register_wait_for_prior_event_group_commit(rgi, entry);

        unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
                            &did_enter_cond, &old_stage);

        thd->wait_for_commit_ptr= &rgi->commit_orderer;

        if (opt_gtid_ignore_duplicates &&
            rgi->rli->mi->using_gtid != Master_info::USE_GTID_NO)
        {
          int res=
            rpl_global_gtid_slave_state->check_duplicate_gtid(&rgi->current_gtid,
                                                              rgi);
          if (res < 0)
          {
            /* Error. */
            slave_output_error_info(rgi, thd);
            signal_error_to_sql_driver_thread(thd, rgi, 1);
          }
          else if (!res)
          {
            /* GTID already applied by another master connection, skip. */
            skip_event_group= true;
          }
          else
          {
            /* We have to apply the event. */
          }
        }
        /*
          If we are optimistically running transactions in parallel, but this
          particular event group should not run in parallel with what came
          before, then wait now for the prior transaction to complete its
          commit.
        */
        if (rgi->speculation == rpl_group_info::SPECULATE_WAIT &&
            (err= thd->wait_for_prior_commit()))
        {
          slave_output_error_info(rgi, thd);
          signal_error_to_sql_driver_thread(thd, rgi, 1);
        }
      }

      group_rgi= rgi;
      group_ending= is_group_ending(qev->ev, event_type);
      /*
        We do not unmark_start_commit() here in case of an explicit ROLLBACK
        statement. Such events should be very rare, there is no real reason
        to try to group commit them - on the contrary, it seems best to avoid
        running them in parallel with following group commits, as with
        ROLLBACK events we are already deep in dangerous corner cases with
        mix of transactional and non-transactional tables or the like. And
        avoiding the mark_start_commit() here allows us to keep an assertion
        in ha_rollback_trans() that we do not rollback after doing
        mark_start_commit().
      */
      if (group_ending == 1 && likely(!rgi->worker_error))
      {
        /*
          Do an extra check for (deadlock) kill here. This helps prevent a
          lingering deadlock kill that occurred during normal DML processing
          from propagating past the mark_start_commit(). If we detect a
          deadlock only after mark_start_commit(), we have to unmark, which
          has at least a theoretical possibility of leaving a window where it
          looks like all transactions in a GCO have started committing, while
          in fact one will need to rollback and retry. This is not supposed
          to be possible (since there is a deadlock, at least one transaction
          should be blocked from reaching commit), but this seems a fragile
          assurance, and there were historically a number of subtle bugs in
          this area.
        */
        if (!thd->killed)
        {
          DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
          rgi->mark_start_commit();
          DEBUG_SYNC(thd, "rpl_parallel_after_mark_start_commit");
        }
      }

      /*
        If the SQL thread is stopping, we just skip execution of all the
        following event groups. We still do all the normal waiting and wakeup
        processing between the event groups as a simple way to ensure that
        everything is stopped and cleaned up correctly.
      */
      if (likely(!rgi->worker_error) && !skip_event_group)
      {
        ++rgi->retry_event_count;
#ifndef DBUG_OFF
        err= 0;
        DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_xid",
                        if (event_type == XID_EVENT)
                        {
                          thd->clear_error();
                          thd->get_stmt_da()->reset_diagnostics_area();
                          my_error(ER_LOCK_DEADLOCK, MYF(0));
                          err= 1;
                          DEBUG_SYNC(thd, "rpl_parallel_simulate_temp_err_xid");
                        });
        if (!err)
#endif
        {
          if (unlikely(thd->check_killed()))
          {
            thd->clear_error();
            thd->get_stmt_da()->reset_diagnostics_area();
            thd->send_kill_message();
            err= 1;
          }
          else
            err= rpt_handle_event(qev, rpt);
        }
        delete_or_keep_event_post_apply(rgi, event_type, qev->ev);
        DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
                        err= dbug_simulate_tmp_error(rgi, thd););
        if (unlikely(err))
        {
          convert_kill_to_deadlock_error(rgi);
          if (has_temporary_error(thd) && slave_trans_retries > 0)
            err= retry_event_group(rgi, rpt, qev);
        }
      }
      else
      {
        delete qev->ev;
        thd->get_stmt_da()->set_overwrite_status(true);
        err= thd->wait_for_prior_commit();
        thd->get_stmt_da()->set_overwrite_status(false);
      }

      end_of_group=
        in_event_group &&
        ((group_standalone && !Log_event::is_part_of_group(event_type)) ||
         group_ending);

      rpt->loc_free_qev(qev);

      if (unlikely(err))
      {
        if (!rgi->worker_error)
        {
          slave_output_error_info(rgi, thd);
          signal_error_to_sql_driver_thread(thd, rgi, err);
        }
        thd->reset_killed();
      }
      if (end_of_group)
      {
        in_event_group= false;
        finish_event_group(rpt, event_gtid_sub_id, entry, rgi);
        rpt->loc_free_rgi(rgi);
        thd->rgi_slave= group_rgi= rgi= NULL;
        skip_event_group= false;
        DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
      }
    }

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    /*
      Now that we have the lock, we can move everything from our local free
      lists to the real free lists that are also accessible from the SQL
      driver thread.
    */
    rpt->batch_free();

    if ((events= rpt->event_queue) != NULL)
    {
      /*
        Take the next group of events from the replication pool.
        This is faster than having to wake up the pool manager thread to give
        us a new event.
      */
      rpt->dequeue1(events);
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      goto more_events;
    }

    rpt->inuse_relaylog_refcount_update();

    if (in_event_group && group_rgi->parallel_entry->force_abort)
    {
      /*
        We are asked to abort, without getting the remaining events in the
        current event group.

        We have to rollback the current transaction and update the last
        sub_id value so that the SQL thread will know we are done with the
        half-processed event group.
      */
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      signal_error_to_sql_driver_thread(thd, group_rgi, 1);
      finish_event_group(rpt, group_rgi->gtid_sub_id,
                         group_rgi->parallel_entry, group_rgi);
      in_event_group= false;
      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
      rpt->free_rgi(group_rgi);
      thd->rgi_slave= group_rgi= NULL;
      skip_event_group= false;
    }
    if (!in_event_group)
    {
      /* If we are in a FLUSH TABLES WITH READ LOCK, wait for it to finish. */
      while (rpt->current_entry && rpt->pause_for_ftwrl)
      {
        /*
          We are currently in the delicate process of pausing parallel
          replication while FLUSH TABLES WITH READ LOCK is starting. We must
          not de-allocate the thread (setting rpt->current_owner= NULL) until
          rpl_unpause_after_ftwrl() has woken us up.
        */
        rpl_parallel_entry *e= rpt->current_entry;
        /*
          Wait for rpl_unpause_after_ftwrl() to wake us up.
          Note that rpl_pause_for_ftwrl() may wait for 'e->pause_sub_id'
          to change. This should happen eventually in finish_event_group().
        */
        mysql_mutex_lock(&e->LOCK_parallel_entry);
        mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
        if (rpt->pause_for_ftwrl)
          mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
        mysql_mutex_unlock(&e->LOCK_parallel_entry);
        mysql_mutex_lock(&rpt->LOCK_rpl_thread);
      }

      rpt->current_owner= NULL;
      /* Tell wait_for_done() that we are done, if it is waiting. */
      if (likely(rpt->current_entry) &&
          unlikely(rpt->current_entry->force_abort))
        mysql_cond_broadcast(&rpt->COND_rpl_thread_stop);

      rpt->current_entry= NULL;
      if (!rpt->stop)
        rpt->pool->release_thread(rpt);
    }
  }

  rpt->thd= NULL;
  mysql_mutex_unlock(&rpt->LOCK_rpl_thread);

  thd->clear_error();
  thd->catalog= 0;
  thd->reset_query();
  thd->reset_db(&null_clex_str);
  thd_proc_info(thd, "Slave worker thread exiting");
  thd->temporary_tables= 0;

  THD_CHECK_SENTRY(thd);
  unlink_not_visible_thd(thd);
  delete thd;

  mysql_mutex_lock(&rpt->LOCK_rpl_thread);
  rpt->running= false;
  mysql_cond_signal(&rpt->COND_rpl_thread);
  mysql_mutex_unlock(&rpt->LOCK_rpl_thread);

  my_thread_end();

  return NULL;
}


static void
dealloc_gco(group_commit_orderer *gco)
{
  mysql_cond_destroy(&gco->COND_group_commit_orderer);
  my_free(gco);
}

/**
  Change the thread count for the global parallel worker thread pool.

  @param pool       parallel thread pool
  @param new_count  Number of threads to be in the pool. 0 during shutdown.
  @param force      Force the thread count to new_count even if slave
                    threads are running

  By default we don't resize the pool if there are running threads.
  However, during shutdown we will always do it.
  This is needed as any_slave_sql_running() returns 1 during shutdown
  as we don't want to access master_info while
  Master_info_index::free_connections are running.
*/

static int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
                                 uint32 new_count, bool force)
{
  uint32 i;
  rpl_parallel_thread **old_list= NULL;
  rpl_parallel_thread **new_list= NULL;
  rpl_parallel_thread *new_free_list= NULL;
  rpl_parallel_thread *rpt_array= NULL;
  int res;

  if ((res= pool_mark_busy(pool, current_thd)))
    return res;

  /* Protect against parallel pool resizes */
  if (pool->count == new_count)
  {
    pool_mark_not_busy(pool);
    return 0;
  }

  /*
    If we are about to delete the pool, do an extra check that there are no
    new slave threads running since we marked the pool busy.
  */
  if (!new_count && !force)
  {
    if (any_slave_sql_running(false))
    {
      DBUG_PRINT("warning",
                 ("SQL threads running while trying to reset parallel pool"));
      pool_mark_not_busy(pool);
      return 0;                                 // Ok to not resize pool
    }
  }

  /*
    Allocate the new list of threads up-front.
    That way, if we fail half-way, we only need to free whatever we managed
    to allocate, and will not be left with a half-functional thread pool.
  */
  if (new_count &&
      !my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
                       &new_list, new_count*sizeof(*new_list),
                       &rpt_array, new_count*sizeof(*rpt_array),
                       NULL))
  {
    my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) +
                                          new_count*sizeof(*rpt_array))));
    goto err;
  }

  for (i= 0; i < new_count; ++i)
  {
    pthread_t th;

    new_list[i]= &rpt_array[i];
    new_list[i]->delay_start= true;
    mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
                     MY_MUTEX_INIT_SLOW);
    mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
    mysql_cond_init(key_COND_rpl_thread_queue,
                    &new_list[i]->COND_rpl_thread_queue, NULL);
    mysql_cond_init(key_COND_rpl_thread_stop,
                    &new_list[i]->COND_rpl_thread_stop, NULL);
    new_list[i]->pool= pool;
    if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
                            handle_rpl_parallel_thread, new_list[i]))
    {
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      goto err;
    }
    new_list[i]->next= new_free_list;
    new_free_list= new_list[i];
  }

  /*
    Grab each old thread in turn, and signal it to stop.

    Note that since we require all replication threads to be stopped before
    changing the parallel replication worker thread pool, all the threads will
    be already idle and will terminate immediately.
  */
  for (i= 0; i < pool->count; ++i)
  {
    rpl_parallel_thread *rpt;

    mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
    while ((rpt= pool->free_list) == NULL)
      mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
    pool->free_list= rpt->next;
    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    rpt->stop= true;
    mysql_cond_signal(&rpt->COND_rpl_thread);
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
  }

  for (i= 0; i < pool->count; ++i)
  {
    rpl_parallel_thread *rpt= pool->threads[i];
    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    while (rpt->running)
      mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
    mysql_cond_destroy(&rpt->COND_rpl_thread);
    while (rpt->qev_free_list)
    {
      rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next;
      my_free(rpt->qev_free_list);
      rpt->qev_free_list= next;
    }
    while (rpt->rgi_free_list)
    {
      rpl_group_info *next= rpt->rgi_free_list->next;
      delete rpt->rgi_free_list;
      rpt->rgi_free_list= next;
    }
    while (rpt->gco_free_list)
    {
      group_commit_orderer *next= rpt->gco_free_list->next_gco;
      dealloc_gco(rpt->gco_free_list);
      rpt->gco_free_list= next;
    }
  }

  old_list= pool->threads;
  if (new_count < pool->count)
    pool->count= new_count;
  pool->threads= new_list;
  if (new_count > pool->count)
    pool->count= new_count;
  my_free(old_list);
  pool->free_list= new_free_list;
  for (i= 0; i < pool->count; ++i)
  {
    mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
    pool->threads[i]->delay_start= false;
    mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
    while (!pool->threads[i]->running)
      mysql_cond_wait(&pool->threads[i]->COND_rpl_thread,
                      &pool->threads[i]->LOCK_rpl_thread);
    mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
  }

  pool_mark_not_busy(pool);

  return 0;

err:
  if (new_list)
  {
    while (new_free_list)
    {
      mysql_mutex_lock(&new_free_list->LOCK_rpl_thread);
      new_free_list->delay_start= false;
      new_free_list->stop= true;
      mysql_cond_signal(&new_free_list->COND_rpl_thread);
      while (!new_free_list->running)
        mysql_cond_wait(&new_free_list->COND_rpl_thread,
                        &new_free_list->LOCK_rpl_thread);
      while (new_free_list->running)
        mysql_cond_wait(&new_free_list->COND_rpl_thread,
                        &new_free_list->LOCK_rpl_thread);
      mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread);
      new_free_list= new_free_list->next;
    }
    my_free(new_list);
  }
  pool_mark_not_busy(pool);
  return 1;
}

/*
  Deactivate the parallel replication thread pool, if there are now no more
  SQL threads running.
*/

int rpl_parallel_resize_pool_if_no_slaves(void)
{
  /* master_info_index is set to NULL on shutdown */
  if (opt_slave_parallel_threads > 0 && !any_slave_sql_running(false))
    return rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
  return 0;
}


/**
  Resize the pool if it is not active or is busy (in which case we may be in
  the middle of a resize to 0).
*/

int
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
{
  bool resize;
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  resize= !pool->count || pool->busy;
  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
  if (resize)
    return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads,
                                            0);
  return 0;
}


int
rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
{
  return rpl_parallel_change_thread_count(pool, 0, 0);
}

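/*
  Move the contents of this thread's local free lists (queued events, group
  infos and GCOs) onto the shared free lists, and signal the SQL driver
  thread that our event queue has drained. Must be called with
  LOCK_rpl_thread held.
*/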
void
rpl_parallel_thread::batch_free()
{
  mysql_mutex_assert_owner(&LOCK_rpl_thread);
  if (loc_qev_list)
  {
    *loc_qev_last_ptr_ptr= qev_free_list;
    qev_free_list= loc_qev_list;
    loc_qev_list= NULL;
    dequeue2(loc_qev_size);
    /* Signal that our queue can now accept more events. */
    mysql_cond_signal(&COND_rpl_thread_queue);
    loc_qev_size= 0;
    qev_free_pending= 0;
  }
  if (loc_rgi_list)
  {
    *loc_rgi_last_ptr_ptr= rgi_free_list;
    rgi_free_list= loc_rgi_list;
    loc_rgi_list= NULL;
  }
  if (loc_gco_list)
  {
    *loc_gco_last_ptr_ptr= gco_free_list;
    gco_free_list= loc_gco_list;
    loc_gco_list= NULL;
  }
}

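/*
  Flush the locally accumulated count of dequeued events into the currently
  referenced inuse_relaylog, so that it can be determined when a relay log
  file is no longer in use by any worker.
*/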
void
rpl_parallel_thread::inuse_relaylog_refcount_update()
{
  inuse_relaylog *ir= accumulated_ir_last;
  if (ir)
  {
    my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
    accumulated_ir_count= 0;
    accumulated_ir_last= NULL;
  }
}

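/*
  Allocate a queued_event, reusing one from the free list when possible.
  Must be called with LOCK_rpl_thread held.
*/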
1731rpl_parallel_thread::queued_event *
1732rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
1733{
1734 queued_event *qev;
1735 mysql_mutex_assert_owner(&LOCK_rpl_thread);
1736 if ((qev= qev_free_list))
1737 qev_free_list= qev->next;
1738 else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0))))
1739 {
1740 my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
1741 return NULL;
1742 }
1743 qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT;
1744 qev->ev= ev;
1745 qev->event_size= (size_t)event_size;
1746 qev->next= NULL;
1747 return qev;
1748}
1749
1750
1751rpl_parallel_thread::queued_event *
1752rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
1753 Relay_log_info *rli)
1754{
1755 queued_event *qev= get_qev_common(ev, event_size);
1756 if (!qev)
1757 return NULL;
1758 strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
1759 qev->event_relay_log_pos= rli->event_relay_log_pos;
1760 qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
1761 strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
1762 return qev;
1763}
1764
1765
rpl_parallel_thread::queued_event *
rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
                                   const char *relay_log_name,
                                   ulonglong event_pos, ulonglong event_size)
{
  queued_event *qev= get_qev_common(ev, event_size);
  if (!qev)
    return NULL;
  qev->rgi= orig_qev->rgi;
  strcpy(qev->event_relay_log_name, relay_log_name);
  qev->event_relay_log_pos= event_pos;
  qev->future_event_relay_log_pos= event_pos+event_size;
  strcpy(qev->future_event_master_log_name,
         orig_qev->future_event_master_log_name);
  return qev;
}


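/*
  Free a queued_event to the thread-local free list, without having to take
  LOCK_rpl_thread. Also accumulate the reference count update for the
  inuse_relaylog that the event belongs to. The local list is flushed to the
  shared free list by batch_free() once it has grown large enough.
*/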
void
rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev)
{
  inuse_relaylog *ir= qev->ir;
  inuse_relaylog *last_ir= accumulated_ir_last;
  if (ir != last_ir)
  {
    if (last_ir)
      inuse_relaylog_refcount_update();
    accumulated_ir_last= ir;
  }
  ++accumulated_ir_count;
  if (!loc_qev_list)
    loc_qev_last_ptr_ptr= &qev->next;
  else
    qev->next= loc_qev_list;
  loc_qev_list= qev;
  loc_qev_size+= qev->event_size;
  /*
    We want to release to the global free list only occasionally, to avoid
    having to take the LOCK_rpl_thread mutex too many times.

    However, we do need to release regularly. If we let the unreleased part
    grow too large, then the SQL driver thread may go to sleep waiting for
    the queue to drop below opt_slave_parallel_max_queued, and this in turn
    can leave all other worker threads stalled, waiting for more work.
  */
  if (++qev_free_pending >= QEV_BATCH_FREE ||
      loc_qev_size >= opt_slave_parallel_max_queued/3)
  {
    mysql_mutex_lock(&LOCK_rpl_thread);
    batch_free();
    mysql_mutex_unlock(&LOCK_rpl_thread);
  }
}


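/*
  Free a queued_event directly to the shared free list. Caller must hold
  LOCK_rpl_thread.
*/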
void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{
  mysql_mutex_assert_owner(&LOCK_rpl_thread);
  qev->next= qev_free_list;
  qev_free_list= qev;
}


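/*
  Allocate an rpl_group_info for a new event group, re-using one from the
  free list when possible. Also sets up the GTID state and the bookkeeping
  needed for transaction retry. Caller must hold LOCK_rpl_thread.
*/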
rpl_group_info*
rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
                             rpl_parallel_entry *e, ulonglong event_size)
{
  rpl_group_info *rgi;
  mysql_mutex_assert_owner(&LOCK_rpl_thread);
  if ((rgi= rgi_free_list))
  {
    rgi_free_list= rgi->next;
    rgi->reinit(rli);
  }
  else
  {
    if (!(rgi= new rpl_group_info(rli)))
    {
      my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi));
      return NULL;
    }
    rgi->is_parallel_exec = true;
  }
  if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()) &&
      !rgi->deferred_events)
    rgi->deferred_events= new Deferred_log_events(rli);
  if (event_group_new_gtid(rgi, gtid_ev))
  {
    free_rgi(rgi);
    my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
    return NULL;
  }
  rgi->parallel_entry= e;
  rgi->relay_log= rli->last_inuse_relaylog;
  rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
  rgi->retry_event_count= 0;
  rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;

  return rgi;
}


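/*
  Free an rpl_group_info to the thread-local free list; it is moved to the
  shared free list later by batch_free().
*/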
void
rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi)
{
  DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
  rgi->free_annotate_event();
  if (!loc_rgi_list)
    loc_rgi_last_ptr_ptr= &rgi->next;
  else
    rgi->next= loc_rgi_list;
  loc_rgi_list= rgi;
}


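/*
  Free an rpl_group_info directly to the shared free list. Caller must hold
  LOCK_rpl_thread.
*/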
void
rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
{
  mysql_mutex_assert_owner(&LOCK_rpl_thread);
  DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
  rgi->free_annotate_event();
  rgi->next= rgi_free_list;
  rgi_free_list= rgi;
}


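/*
  Allocate a group_commit_orderer, re-using one from the free list when
  possible. A gco describes one batch of event groups that may run in
  parallel with each other; wait_count is the number of event groups queued
  before this batch, all of which must have reached their commit phase
  before any group in this batch may start executing. Caller must hold
  LOCK_rpl_thread.
*/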
group_commit_orderer *
rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
                             uint64 prior_sub_id)
{
  group_commit_orderer *gco;
  mysql_mutex_assert_owner(&LOCK_rpl_thread);
  if ((gco= gco_free_list))
    gco_free_list= gco->next_gco;
  else if (!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0))))
  {
    my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco));
    return NULL;
  }
  mysql_cond_init(key_COND_group_commit_orderer,
                  &gco->COND_group_commit_orderer, NULL);
  gco->wait_count= wait_count;
  gco->prev_gco= prev;
  gco->next_gco= NULL;
  gco->prior_sub_id= prior_sub_id;
  gco->installed= false;
  gco->flags= 0;
  return gco;
}


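/*
  Free a group_commit_orderer to the thread-local free list; it is moved to
  the shared free list later by batch_free().
*/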
void
rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
{
  if (!loc_gco_list)
    loc_gco_last_ptr_ptr= &gco->next_gco;
  else
    gco->next_gco= loc_gco_list;
  loc_gco_list= gco;
}


rpl_parallel_thread_pool::rpl_parallel_thread_pool()
  : threads(0), free_list(0), count(0), inited(false), busy(false)
{
}


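/*
  Initialise the (initially empty) global pool of worker threads; no worker
  threads are created here.
*/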
int
rpl_parallel_thread_pool::init(uint32 size)
{
  threads= NULL;
  free_list= NULL;
  count= 0;
  busy= false;

  mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
                   MY_MUTEX_INIT_SLOW);
  mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL);
  inited= true;

  /*
    The pool is initially empty. Threads will be spawned when a slave SQL
    thread is started.
  */

  return 0;
}


void
rpl_parallel_thread_pool::destroy()
{
  if (!inited)
    return;
  rpl_parallel_change_thread_count(this, 0, 1);
  mysql_mutex_destroy(&LOCK_rpl_thread_pool);
  mysql_cond_destroy(&COND_rpl_thread_pool);
  inited= false;
}


/*
  Wait for a worker thread to become idle. When one does, grab the thread for
  our use and return it.

  Note that we return with the worker thread's LOCK_rpl_thread mutex locked.
*/
struct rpl_parallel_thread *
rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
                                     rpl_parallel_entry *entry)
{
  rpl_parallel_thread *rpt;

  DBUG_ASSERT(count > 0);
  mysql_mutex_lock(&LOCK_rpl_thread_pool);
  while (unlikely(busy) || !(rpt= free_list))
    mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
  free_list= rpt->next;
  mysql_mutex_unlock(&LOCK_rpl_thread_pool);
  mysql_mutex_lock(&rpt->LOCK_rpl_thread);
  rpt->current_owner= owner;
  rpt->current_entry= entry;

  return rpt;
}


/*
  Release a thread to the thread pool.
  The thread should be locked, and should not have any work queued for it.
*/
void
rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
{
  rpl_parallel_thread *list;

  mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
  DBUG_ASSERT(rpt->current_owner == NULL);
  mysql_mutex_lock(&LOCK_rpl_thread_pool);
  list= free_list;
  rpt->next= list;
  free_list= rpt;
  if (!list)
    mysql_cond_broadcast(&COND_rpl_thread_pool);
  mysql_mutex_unlock(&LOCK_rpl_thread_pool);
}


/*
  Obtain a worker thread that we can queue an event to.

  Each invocation allocates a new worker thread, to maximise
  parallelism. However, only up to a maximum of
  --slave-domain-parallel-threads workers can be occupied by a single
  replication domain; after that point, we start re-using worker threads that
  are still executing events that were queued earlier for this thread.

  We never queue more than --slave-parallel-max-queued bytes of events for
  one worker, to avoid the SQL driver thread using up all memory with
  queued events while worker threads are stalling.

  Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread
  locked. The exception is if we were killed, in which case NULL is returned.

  The *did_enter_cond flag is set true if we had to wait for a worker thread
  to become free (with mysql_cond_wait()). If so, old_stage will also be set,
  and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead
  of mysql_mutex_unlock.

  If the flag `reuse' is set, the last worker thread will be returned again,
  if it is still available. Otherwise a new worker thread is allocated.
*/
rpl_parallel_thread *
rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                  PSI_stage_info *old_stage, bool reuse)
{
  uint32 idx;
  Relay_log_info *rli= rgi->rli;
  rpl_parallel_thread *thr;

  idx= rpl_thread_idx;
  if (!reuse)
  {
    ++idx;
    if (idx >= rpl_thread_max)
      idx= 0;
    rpl_thread_idx= idx;
  }
  thr= rpl_threads[idx];
  if (thr)
  {
    *did_enter_cond= false;
    mysql_mutex_lock(&thr->LOCK_rpl_thread);
    for (;;)
    {
      if (thr->current_owner != &rpl_threads[idx])
      {
        /*
          The worker thread became idle, and returned to the free list and
          possibly was allocated to a different request. So we should allocate
          a new worker thread.
        */
        unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
                            did_enter_cond, old_stage);
        thr= NULL;
        break;
      }
      else if (thr->queued_size <= opt_slave_parallel_max_queued)
      {
        /* The thread is ready to queue into. */
        break;
      }
      else if (unlikely(rli->sql_driver_thd->check_killed()))
      {
        unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
                            did_enter_cond, old_stage);
        my_error(ER_CONNECTION_KILLED, MYF(0));
        DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
          {
            debug_sync_set_action(rli->sql_driver_thd,
                STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
          };);
        slave_output_error_info(rgi, rli->sql_driver_thd);
        return NULL;
      }
      else
      {
        /*
          We have reached the limit of how much memory we are allowed to use
          for queuing events, so wait for the thread to consume some of its
          queue.
        */
        if (!*did_enter_cond)
        {
          /*
            We need to do the debug_sync before ENTER_COND(), because
            debug_sync changes thd->mysys_var->current_mutex, and this can
            cause THD::awake to use the wrong mutex.
          */
          DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
            {
              debug_sync_set_action(rli->sql_driver_thd,
                  STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
            };);
          rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue,
                                          &thr->LOCK_rpl_thread,
                                          &stage_waiting_for_room_in_worker_thread,
                                          old_stage);
          *did_enter_cond= true;
        }
        mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread);
      }
    }
  }
  if (!thr)
    rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
                                                             this);

  return thr;
}

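/*
  Hash free callback for domain_hash: deallocate any remaining
  group_commit_orderer objects and destroy the entry's mutex and condition
  variable before freeing the entry itself.
*/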
static void
free_rpl_parallel_entry(void *element)
{
  rpl_parallel_entry *e= (rpl_parallel_entry *)element;
  while (e->current_gco)
  {
    group_commit_orderer *prev_gco= e->current_gco->prev_gco;
    dealloc_gco(e->current_gco);
    e->current_gco= prev_gco;
  }
  mysql_cond_destroy(&e->COND_parallel_entry);
  mysql_mutex_destroy(&e->LOCK_parallel_entry);
  my_free(e);
}


rpl_parallel::rpl_parallel() :
  current(NULL), sql_thread_stopping(false)
{
  my_hash_init(&domain_hash, &my_charset_bin, 32,
               offsetof(rpl_parallel_entry, domain_id), sizeof(uint32),
               NULL, free_rpl_parallel_entry, HASH_UNIQUE);
}


void
rpl_parallel::reset()
{
  my_hash_reset(&domain_hash);
  current= NULL;
  sql_thread_stopping= false;
}


rpl_parallel::~rpl_parallel()
{
  my_hash_free(&domain_hash);
}


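/*
  Look up the parallel replication entry for the given replication domain,
  allocating and initialising a new entry on first use.
*/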
rpl_parallel_entry *
rpl_parallel::find(uint32 domain_id)
{
  struct rpl_parallel_entry *e;

  if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash,
                                                (const uchar *)&domain_id, 0)))
  {
    /* Allocate a new, empty one. */
    ulong count= opt_slave_domain_parallel_threads;
    if (count == 0 || count > opt_slave_parallel_threads)
      count= opt_slave_parallel_threads;
    rpl_parallel_thread **p;
    if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
                         &e, sizeof(*e),
                         &p, count*sizeof(*p),
                         NULL))
    {
      my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
      return NULL;
    }
    e->rpl_threads= p;
    e->rpl_thread_max= count;
    e->domain_id= domain_id;
    e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
    e->pause_sub_id= (uint64)ULONGLONG_MAX;
    if (my_hash_insert(&domain_hash, (uchar *)e))
    {
      my_free(e);
      return NULL;
    }
    mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
                     MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
  }
  else
    e->force_abort= false;

  return e;
}

/**
  Wait until all SQL worker threads have stopped processing.

  This is called when the SQL thread has been killed or stopped.
*/

void
rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
{
  struct rpl_parallel_entry *e;
  rpl_parallel_thread *rpt;
  uint32 i, j;

  /*
    First signal all workers that they must force quit; no more events will
    be queued to complete any partial event groups executed.
  */
  for (i= 0; i < domain_hash.records; ++i)
  {
    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    /*
      We want the worker threads to stop as quickly as is safe. If the slave
      SQL thread is behind, we could have a significant number of events
      queued for the workers, and we want to stop without waiting for them
      all to be applied first. But if any event group has already started
      executing in a worker, we want to be sure that all prior event groups
      are also executed, so that we stop at a consistent point in the binlog
      stream (per replication domain).

      All event groups wait for e->count_committing_event_groups to reach
      the value of group_commit_orderer::wait_count before starting to
      execute. Thus, at this point we know that any event group with a
      strictly larger wait_count is safe to skip; none of them can have
      started executing yet. So we set e->stop_count here and use it to
      decide in the worker threads whether to continue executing an event
      group or whether to skip it, when force_abort is set.

      If we stop due to reaching the START SLAVE UNTIL condition, then we
      need to continue executing any queued events up to that point.
    */
    e->force_abort= true;
    e->stop_count= rli->stop_for_until ?
      e->count_queued_event_groups : e->count_committing_event_groups;
    mysql_mutex_unlock(&e->LOCK_parallel_entry);
    for (j= 0; j < e->rpl_thread_max; ++j)
    {
      if ((rpt= e->rpl_threads[j]))
      {
        mysql_mutex_lock(&rpt->LOCK_rpl_thread);
        if (rpt->current_owner == &e->rpl_threads[j])
          mysql_cond_signal(&rpt->COND_rpl_thread);
        mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      }
    }
  }
  DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger",
    {
      debug_sync_set_action(thd,
          STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
    };);

  for (i= 0; i < domain_hash.records; ++i)
  {
    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
    for (j= 0; j < e->rpl_thread_max; ++j)
    {
      if ((rpt= e->rpl_threads[j]))
      {
        mysql_mutex_lock(&rpt->LOCK_rpl_thread);
        while (rpt->current_owner == &e->rpl_threads[j])
          mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
        mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      }
    }
  }
}


/*
  This function handles the case where the SQL driver thread reached the
  START SLAVE UNTIL position; we stop queueing more events but continue
  processing remaining, already queued events. If the user then executes a
  manual STOP SLAVE, this function signals to the worker threads that they
  should stop processing any remaining queued events.
*/
void
rpl_parallel::stop_during_until()
{
  struct rpl_parallel_entry *e;
  uint32 i;

  for (i= 0; i < domain_hash.records; ++i)
  {
    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    if (e->force_abort)
      e->stop_count= e->count_committing_event_groups;
    mysql_mutex_unlock(&e->LOCK_parallel_entry);
  }
}


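/*
  Check whether all worker threads are idle, i.e. whether the last queued
  event group in every replication domain has committed.
*/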
bool
rpl_parallel::workers_idle()
{
  struct rpl_parallel_entry *e;
  uint32 i, max_i;

  max_i= domain_hash.records;
  for (i= 0; i < max_i; ++i)
  {
    bool active;
    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    active= e->current_sub_id > e->last_committed_sub_id;
    mysql_mutex_unlock(&e->LOCK_parallel_entry);
    if (active)
      break;
  }
  return (i == max_i);
}


int
rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
                                         Format_description_log_event *fdev)
{
  uint32 idx;
  rpl_parallel_thread *thr;
  rpl_parallel_thread::queued_event *qev;
  Relay_log_info *rli= rgi->rli;

  /*
    We only need to queue the server restart if we still have a thread working
    on a (potentially partial) event group.

    If the last thread we queued for has finished, then it cannot have any
    partial event group that needs aborting.

    Thus there is no need for the full complexity of choose_thread(). We only
    need to check if we have a current worker thread, and queue for it if so.
  */
  idx= rpl_thread_idx;
  thr= rpl_threads[idx];
  if (!thr)
    return 0;
  mysql_mutex_lock(&thr->LOCK_rpl_thread);
  if (thr->current_owner != &rpl_threads[idx])
  {
    /* No active worker thread, so no need to queue the master restart. */
    mysql_mutex_unlock(&thr->LOCK_rpl_thread);
    return 0;
  }

  if (!(qev= thr->get_qev(fdev, 0, rli)))
  {
    mysql_mutex_unlock(&thr->LOCK_rpl_thread);
    return 1;
  }

  qev->rgi= rgi;
  qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART;
  qev->entry_for_queued= this;
  qev->ir= rli->last_inuse_relaylog;
  ++qev->ir->queued_count;
  thr->enqueue(qev);
  mysql_cond_signal(&thr->COND_rpl_thread);
  mysql_mutex_unlock(&thr->LOCK_rpl_thread);
  return 0;
}


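/*
  Wait until all worker threads are idle, i.e. until every replication
  domain has committed all of its queued event groups. Returns non-zero if
  the wait was aborted because the thread was killed.
*/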
int
rpl_parallel::wait_for_workers_idle(THD *thd)
{
  uint32 i, max_i;

  /*
    The domain_hash is only accessed by the SQL driver thread, so it is safe
    to iterate over without a lock.
  */
  max_i= domain_hash.records;
  for (i= 0; i < max_i; ++i)
  {
    PSI_stage_info old_stage;
    struct rpl_parallel_entry *e;
    int err= 0;

    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    ++e->need_sub_id_signal;
    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                    &stage_waiting_for_workers_idle, &old_stage);
    while (e->current_sub_id > e->last_committed_sub_id)
    {
      if (unlikely(thd->check_killed()))
      {
        thd->send_kill_message();
        err= 1;
        break;
      }
      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
    }
    --e->need_sub_id_signal;
    thd->EXIT_COND(&old_stage);
    if (err)
      return err;
  }
  return 0;
}


/*
  Handle seeing a GTID during slave restart in GTID mode. If we stopped with
  different replication domains having reached different positions in the
  relay log, we need to skip event groups in domains that are further
  progressed.

  Updates the state with the seen GTID, and returns true if this GTID should
  be skipped, false otherwise.
*/
bool
process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
{
  slave_connection_state::entry *gtid_entry;
  slave_connection_state *state= &rli->restart_gtid_pos;

  if (likely(state->count() == 0) ||
      !(gtid_entry= state->find_entry(gtid->domain_id)))
    return false;
  if (gtid->server_id == gtid_entry->gtid.server_id)
  {
    uint64 seq_no= gtid_entry->gtid.seq_no;
    if (gtid->seq_no >= seq_no)
    {
      /*
        This domain has reached its start position. So remove it, so that
        further events will be processed normally.
      */
      state->remove(&gtid_entry->gtid);
    }
    return gtid->seq_no <= seq_no;
  }
  else
    return true;
}


/*
  This is used when we get an error during processing in do_event();
  we will not queue any event to the thread, but we still need to wake it up
  to be sure that it will be returned to the pool.
*/
static void
abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
                      bool *did_enter_cond, PSI_stage_info *old_stage)
{
  unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread,
                      did_enter_cond, old_stage);
  mysql_cond_signal(&cur_thread->COND_rpl_thread);
}


/*
  do_event() is executed by the sql_driver_thd thread.
  Its main purpose is to find a worker thread that can execute the query.

  @retval  0    ok, event was accepted
  @retval  1    error
  @retval -1    event should be executed serially, in the sql driver thread
*/

int
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
                       ulonglong event_size)
{
  rpl_parallel_entry *e;
  rpl_parallel_thread *cur_thread;
  rpl_parallel_thread::queued_event *qev;
  rpl_group_info *rgi= NULL;
  Relay_log_info *rli= serial_rgi->rli;
  enum Log_event_type typ;
  bool is_group_event;
  bool did_enter_cond= false;
  PSI_stage_info old_stage;

  DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
  /* Handle master log name change, seen in Rotate_log_event. */
  typ= ev->get_type_code();
  if (unlikely(typ == ROTATE_EVENT))
  {
    Rotate_log_event *rev= static_cast<Rotate_log_event *>(ev);
    if ((rev->server_id != global_system_variables.server_id ||
         rli->replicate_same_server_id) &&
        !rev->is_relay_log_event() &&
        !rli->is_in_group())
    {
      memcpy(rli->future_event_master_log_name,
             rev->new_log_ident, rev->ident_len+1);
      rli->notify_group_master_log_name_update();
    }
  }

  /*
    Execute queries non-parallel if slave_skip_counter is set, as it is
    easier to skip queries in single-threaded mode.
  */
  if (rli->slave_skip_counter)
    return -1;

  /* Execute pre-10.0 events, which have no GTID, in single-threaded mode. */
  is_group_event= Log_event::is_group_event(typ);
  if (unlikely(!current) && typ != GTID_EVENT &&
      !(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
    return -1;

  /* Note: rli->data_lock is released by sql_delay_event(). */
  if (sql_delay_event(ev, rli->sql_driver_thd, serial_rgi))
  {
    /*
      If sql_delay_event() returns non-zero, it means that the wait timed out
      due to slave stop. We should not queue the event in this case, it must
      not be applied yet.
    */
    delete ev;
    return 1;
  }

  if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
  {
    Format_description_log_event *fdev=
      static_cast<Format_description_log_event *>(ev);
    if (fdev->created)
    {
      /*
        This format description event marks a new binlog after a master server
        restart. We are going to close all temporary tables to clean up any
        possible left-overs after a prior master crash.

        Thus we need to wait for all prior events to execute to completion,
        in case they need access to any of the temporary tables.

        We also need to notify the worker thread running the prior incomplete
        event group (if any), as such event group signifies an incompletely
        written group cut short by a master crash, and must be rolled back.
      */
      if (current->queue_master_restart(serial_rgi, fdev) ||
          wait_for_workers_idle(rli->sql_driver_thd))
      {
        delete ev;
        return 1;
      }
    }
  }
  else if (unlikely(typ == GTID_LIST_EVENT))
  {
    Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
    rpl_gtid *list= glev->list;
    uint32 count= glev->count;
    rli->update_relay_log_state(list, count);
    while (count)
    {
      process_gtid_for_restart_pos(rli, list);
      ++list;
      --count;
    }
  }

  /*
    Stop queueing additional event groups once the SQL thread is requested to
    stop.

    We have to queue any remaining events of any event group that has already
    been partially queued, but after that we will just ignore any further
    events the SQL driver thread may try to queue, and eventually it will stop.
  */
  if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
    sql_thread_stopping= true;
  if (sql_thread_stopping)
  {
    delete ev;
    /*
      Return "no error"; normal stop is not an error, and otherwise the error
      has already been recorded.
    */
    return 0;
  }

  if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
  {
    if (typ == GTID_EVENT)
      rli->gtid_skip_flag= GTID_SKIP_NOT;
    else
    {
      if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
      {
        if (!Log_event::is_part_of_group(typ))
          rli->gtid_skip_flag= GTID_SKIP_NOT;
      }
      else
      {
        DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
        if (typ == XID_EVENT ||
            (typ == QUERY_EVENT &&  // COMMIT/ROLLBACK are never compressed
             (((Query_log_event *)ev)->is_commit() ||
              ((Query_log_event *)ev)->is_rollback())))
          rli->gtid_skip_flag= GTID_SKIP_NOT;
      }
      delete_or_keep_event_post_apply(serial_rgi, typ, ev);
      return 0;
    }
  }

  if (typ == GTID_EVENT)
  {
    rpl_gtid gtid;
    Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
    uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
                       rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
                       0 : gtid_ev->domain_id);
    if (!(e= find(domain_id)))
    {
      my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
      delete ev;
      return 1;
    }
    current= e;

    gtid.domain_id= gtid_ev->domain_id;
    gtid.server_id= gtid_ev->server_id;
    gtid.seq_no= gtid_ev->seq_no;
    rli->update_relay_log_state(&gtid, 1);
    if (process_gtid_for_restart_pos(rli, &gtid))
    {
      /*
        This domain has progressed further into the relay log before the last
        SQL thread restart. So we need to skip this event group to not doubly
        apply it.
      */
      rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
                            GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
      delete_or_keep_event_post_apply(serial_rgi, typ, ev);
      return 0;
    }
  }
  else
    e= current;

  /*
    Find a worker thread to queue the event for.
    Prefer a new thread, so we maximise parallelism (at least for the group
    commit). But do not exceed a limit of --slave-domain-parallel-threads;
    instead re-use a thread that we queued for previously.
  */
  cur_thread=
    e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
                     typ != GTID_EVENT);
  if (!cur_thread)
  {
    /* This means we were killed. The error is already signalled. */
    delete ev;
    return 1;
  }

  if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
  {
    abandon_worker_thread(rli->sql_driver_thd, cur_thread,
                          &did_enter_cond, &old_stage);
    delete ev;
    return 1;
  }

  if (typ == GTID_EVENT)
  {
    Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
    bool new_gco;
    enum_slave_parallel_mode mode= rli->mi->parallel_mode;
    uchar gtid_flags= gtid_ev->flags2;
    group_commit_orderer *gco;
    uint8 force_switch_flag;
    enum rpl_group_info::enum_speculation speculation;

    if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
    {
      cur_thread->free_qev(qev);
      abandon_worker_thread(rli->sql_driver_thd, cur_thread,
                            &did_enter_cond, &old_stage);
      delete ev;
      return 1;
    }

    /*
      We queue the event group in a new worker thread, to run in parallel
      with previous groups.

      To preserve commit order within the replication domain, we set up
      rgi->wait_commit_sub_id to make the new group commit only after the
      previous group has committed.

      Event groups that group-committed together on the master can be run
      in parallel with each other without restrictions. But one batch of
      group-commits may not start before all groups in the previous batch
      have initiated their commit phase; we set up rgi->gco to ensure that.
    */
    rgi->wait_commit_sub_id= e->current_sub_id;
    rgi->wait_commit_group_info= e->current_group_info;

    speculation= rpl_group_info::SPECULATE_NO;
    new_gco= true;
    force_switch_flag= 0;
    gco= e->current_gco;
    if (likely(gco))
    {
      uint8 flags= gco->flags;

      if (mode <= SLAVE_PARALLEL_MINIMAL ||
          !(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
          e->last_commit_id != gtid_ev->commit_id)
        flags|= group_commit_orderer::MULTI_BATCH;
      /* Make sure we do not attempt to run DDL in parallel speculatively. */
      if (gtid_flags & Gtid_log_event::FL_DDL)
        flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH);

      if (!(flags & group_commit_orderer::MULTI_BATCH))
      {
        /*
          Still the same batch of event groups that group-committed together
          on the master, so we can run in parallel.
        */
        new_gco= false;
      }
      else if ((mode >= SLAVE_PARALLEL_OPTIMISTIC) &&
               !(flags & group_commit_orderer::FORCE_SWITCH))
      {
        /*
          In transactional parallel mode, we optimistically attempt to run
          non-DDL in parallel. In case of conflicts, we catch the conflict as
          a deadlock or other error, roll back and retry serially.

          The assumption is that only a few event groups will be
          non-transactional or otherwise unsuitable for parallel apply. Those
          transactions are still scheduled in parallel, but we set a flag that
          will make the worker thread wait for everything that came before to
          complete before starting.
        */
        new_gco= false;
        if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
            ( (!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
               (gtid_flags & Gtid_log_event::FL_WAITED)) &&
              (mode < SLAVE_PARALLEL_AGGRESSIVE)))
        {
          /*
            This transaction should not be speculatively run in parallel with
            what came before, either because it cannot safely be rolled back
            in case of a conflict, or because it was marked as likely to
            conflict and require expensive rollback and retry.

            Here we mark it as such, and then the worker thread will do a
            wait_for_prior_commit() before starting it. We do not introduce a
            new group_commit_orderer, since we still want following
            transactions to run in parallel with transactions prior to this
            one.
          */
          speculation= rpl_group_info::SPECULATE_WAIT;
        }
        else
          speculation= rpl_group_info::SPECULATE_OPTIMISTIC;
      }
      gco->flags= flags;
    }
    else
    {
      if (gtid_flags & Gtid_log_event::FL_DDL)
        force_switch_flag= group_commit_orderer::FORCE_SWITCH;
    }
    rgi->speculation= speculation;

    if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
      e->last_commit_id= gtid_ev->commit_id;
    else
      e->last_commit_id= 0;

    if (new_gco)
    {
      /*
        Do not run this event group in parallel with what came before; instead
        wait for everything prior to at least have started its commit phase,
        to avoid any risk of performing any conflicting action too early.

        Remember the count that marks the end of the previous batch of event
        groups that run in parallel, and allocate a new gco.
      */
      uint64 count= e->count_queued_event_groups;

      if (!(gco= cur_thread->get_gco(count, gco, e->current_sub_id)))
      {
        cur_thread->free_rgi(rgi);
        cur_thread->free_qev(qev);
        abandon_worker_thread(rli->sql_driver_thd, cur_thread,
                              &did_enter_cond, &old_stage);
        delete ev;
        return 1;
      }
      gco->flags|= force_switch_flag;
      e->current_gco= gco;
    }
    rgi->gco= gco;

    qev->rgi= e->current_group_info= rgi;
    e->current_sub_id= rgi->gtid_sub_id;
    ++e->count_queued_event_groups;
  }
  else if (!is_group_event)
  {
    int err;
    bool tmp;
    /*
      Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
      Same for events not preceded by GTID (we should not see those normally,
      but they might be from an old master).
    */
    qev->rgi= serial_rgi;

    tmp= serial_rgi->is_parallel_exec;
    serial_rgi->is_parallel_exec= true;
    err= rpt_handle_event(qev, NULL);
    serial_rgi->is_parallel_exec= tmp;
    if (ev->is_relay_log_event())
      qev->future_event_master_log_pos= 0;
    else if (typ == ROTATE_EVENT)
      qev->future_event_master_log_pos=
        (static_cast<Rotate_log_event *>(ev))->pos;
    else
      qev->future_event_master_log_pos= ev->log_pos;
    delete_or_keep_event_post_apply(serial_rgi, typ, ev);

    if (err)
    {
      cur_thread->free_qev(qev);
      abandon_worker_thread(rli->sql_driver_thd, cur_thread,
                            &did_enter_cond, &old_stage);
      return 1;
    }
    /*
      Queue a position update, so that the position will be updated in a
      reasonable way relative to other events:

      - If the currently executing events are queued serially for a single
        thread, the position will only be updated when everything before has
        completed.

      - If we are executing multiple independent events in parallel, then at
        least the position will not be updated until one of them has reached
        the current point.
    */
    qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
    qev->entry_for_queued= e;
  }
  else
  {
    qev->rgi= e->current_group_info;
  }

  /*
    Queue the event for processing.
  */
  qev->ir= rli->last_inuse_relaylog;
  ++qev->ir->queued_count;
  cur_thread->enqueue(qev);
  unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
                      &did_enter_cond, &old_stage);
  mysql_cond_signal(&cur_thread->COND_rpl_thread);

  return 0;
}
