#include "mariadb.h"
#include "rpl_parallel.h"
#include "slave.h"
#include "rpl_mi.h"
#include "sql_parse.h"
#include "debug_sync.h"

/*
  Code for optional parallel execution of replicated events on the slave.
*/


/*
  Maximum number of queued events to accumulate in a local free list, before
  moving them to the global free list. There is additionally a limit on how
  much to accumulate, based on opt_slave_parallel_max_queued.
*/
#define QEV_BATCH_FREE 200


struct rpl_parallel_thread_pool global_rpl_thread_pool;

static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
                                              int err);

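/*
  Apply a single queued event in a worker thread.

  Restores the relay log position state saved in the queued_event into the
  rpl_group_info, then applies the event through
  apply_event_and_update_pos_for_parallel(). Returns the error code from the
  apply step (zero on success).
*/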
static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
                 struct rpl_parallel_thread *rpt)
{
  int err;
  rpl_group_info *rgi= qev->rgi;
  Relay_log_info *rli= rgi->rli;
  THD *thd= rgi->thd;
  Log_event *ev;

  DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
  ev= qev->ev;

  thd->system_thread_info.rpl_sql_info->rpl_filter= rli->mi->rpl_filter;
  ev->thd= thd;

  strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
  rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
  rgi->event_relay_log_pos= qev->event_relay_log_pos;
  rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
  strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
  if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
        (ev->when == 0)))
    rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
  err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);

  thread_safe_increment64(&rli->executed_entries);
  /* ToDo: error handling. */
  return err;
}


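/*
  Handle a queued position update (QUEUED_POS_UPDATE).

  Advance rli->group_relay_log_name/pos and group_master_log_name/pos to the
  position saved in the queued event. Since workers can process these updates
  out of order, a position is only ever moved forwards, never backwards.
*/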
static void
handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{
  int cmp;
  Relay_log_info *rli;
  rpl_parallel_entry *e;

  /*
    Events that are not part of an event group, such as Format Description,
    Stop, GTID List and such, are executed directly in the driver SQL thread,
    to keep the relay log state up-to-date. But the associated position update
    is done here, in sync with other normal events as they are queued to
    worker threads.
  */
  if ((thd->variables.option_bits & OPTION_BEGIN) &&
      opt_using_transactions)
    return;

  /* Do not update position if an earlier event group caused an error abort. */
  DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
  e= qev->entry_for_queued;
  if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
    return;

  rli= qev->rgi->rli;
  mysql_mutex_lock(&rli->data_lock);
  cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
  if (cmp < 0)
  {
    rli->group_relay_log_pos= qev->future_event_relay_log_pos;
    strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name);
    rli->notify_group_relay_log_name_update();
  } else if (cmp == 0 &&
             rli->group_relay_log_pos < qev->future_event_relay_log_pos)
    rli->group_relay_log_pos= qev->future_event_relay_log_pos;

  cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name);
  if (cmp < 0)
  {
    strcpy(rli->group_master_log_name, qev->future_event_master_log_name);
    rli->group_master_log_pos= qev->future_event_master_log_pos;
  }
  else if (cmp == 0
           && rli->group_master_log_pos < qev->future_event_master_log_pos)
    rli->group_master_log_pos= qev->future_event_master_log_pos;
  mysql_mutex_unlock(&rli->data_lock);
  mysql_cond_broadcast(&rli->data_cond);
}


/*
  Wait for any pending deadlock kills. Since deadlock kills happen
  asynchronously, we need to be sure they will be completed before starting a
  new transaction. Otherwise the new transaction might suffer a spurious kill.
*/
static void
wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
{
  PSI_stage_info old_stage;

  mysql_mutex_lock(&thd->LOCK_wakeup_ready);
  thd->ENTER_COND(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready,
                  &stage_waiting_for_deadlock_kill, &old_stage);
  while (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
    mysql_cond_wait(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready);
  thd->EXIT_COND(&old_stage);
}


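/*
  Finish the current event group in a worker thread.

  This waits for all prior event groups to commit (preserving commit order),
  records the group as committed in entry->last_committed_sub_id, frees any
  group_commit_orderer objects whose event groups have all committed, records
  the stop position if this group got an error, and finally wakes up any
  later event groups waiting on us.
*/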
static void
finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
                   rpl_parallel_entry *entry, rpl_group_info *rgi)
{
  THD *thd= rpt->thd;
  wait_for_commit *wfc= &rgi->commit_orderer;
  int err;

  thd->get_stmt_da()->set_overwrite_status(true);
  /*
    Remove any left-over registration to wait for a prior commit to
    complete. Normally, such a wait would already have been removed at
    this point by wait_for_prior_commit() called from within COMMIT
    processing. However, in case of MyISAM and no binlog, we might not
    have any commit processing, and so we need to do the wait here,
    before waking up any subsequent commits, to preserve correct
    order of event execution. Also, in the error case we might have
    skipped waiting and thus need to remove it explicitly.

    It is important in the non-error case to do a wait, not just an
    unregister. Because we might be last in a group-commit that is
    replicated in parallel, and the following event will then wait
    for us to complete and rely on this also ensuring that any other
    event in the group has completed.

    And in the error case, correct GCO lifetime relies on the fact that once
    the last event group in the GCO has executed wait_for_prior_commit(),
    all earlier event groups have also committed; this way no more
    mark_start_commit() calls can be made and it is safe to de-allocate
    the GCO.
  */
  err= wfc->wait_for_prior_commit(thd);
  if (unlikely(err) && !rgi->worker_error)
    signal_error_to_sql_driver_thread(thd, rgi, err);
  thd->wait_for_commit_ptr= NULL;

  mysql_mutex_lock(&entry->LOCK_parallel_entry);
  /*
    We need to mark that this event group started its commit phase, in case we
    missed it before (otherwise we would deadlock the next event group that is
    waiting for this). In most cases (normal DML), it will be a no-op.
  */
  rgi->mark_start_commit_no_lock();

  if (entry->last_committed_sub_id < sub_id)
  {
    /*
      Record that this event group has finished (eg. transaction is
      committed, if transactional), so other event groups will no longer
      attempt to wait for us to commit. Once we have increased
      entry->last_committed_sub_id, no other threads will execute
      register_wait_for_prior_commit() against us. Thus, by doing one
      extra (usually redundant) wakeup_subsequent_commits() we can ensure
      that no register_wait_for_prior_commit() can ever happen without a
      subsequent wakeup_subsequent_commits() to wake it up.

      We can race here with the next transactions, but that is fine, as
      long as we check that we do not decrease last_committed_sub_id. If
      this commit is done, then any prior commits will also have been
      done and also no longer need waiting for.
    */
    entry->last_committed_sub_id= sub_id;
    if (entry->need_sub_id_signal)
      mysql_cond_broadcast(&entry->COND_parallel_entry);

    /* Now free any GCOs in which all transactions have committed. */
    group_commit_orderer *tmp_gco= rgi->gco;
    while (tmp_gco &&
           (!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id ||
            tmp_gco->next_gco->wait_count > entry->count_committing_event_groups))
    {
      /*
        We must not free a GCO before the wait_count of the following GCO has
        been reached and wakeup has been sent. Otherwise we will lose the
        wakeup and hang (there were several such bugs in the past).

        The intention is that this is ensured already, since we only free when
        the last event group in the GCO has committed
        (tmp_gco->last_sub_id <= sub_id). However, if we have a bug, we have an
        extra check on next_gco->wait_count to hopefully avoid hanging; we
        have here an assertion in debug builds that this check does not in
        fact trigger.
      */
      DBUG_ASSERT(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id);
      tmp_gco= tmp_gco->prev_gco;
    }
    while (tmp_gco)
    {
      group_commit_orderer *prev_gco= tmp_gco->prev_gco;
      tmp_gco->next_gco->prev_gco= NULL;
      rpt->loc_free_gco(tmp_gco);
      tmp_gco= prev_gco;
    }
  }

  /*
    If this event group got an error, then any following event groups that
    have not yet started should just skip their group, preparing for stop of
    the SQL driver thread.
  */
  if (unlikely(rgi->worker_error) &&
      entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
    entry->stop_on_error_sub_id= sub_id;
  mysql_mutex_unlock(&entry->LOCK_parallel_entry);

  DBUG_EXECUTE_IF("rpl_parallel_simulate_wait_at_retry", {
      if (rgi->current_gtid.seq_no == 1000) {
        DBUG_ASSERT(entry->stop_on_error_sub_id == sub_id);
        debug_sync_set_action(thd,
                              STRING_WITH_LEN("now WAIT_FOR proceed_by_1000"));
      }
    });

  if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
    wait_for_pending_deadlock_kill(thd, rgi);
  thd->clear_error();
  thd->reset_killed();
  /*
    Would do thd->get_stmt_da()->set_overwrite_status(false) here, but
    reset_diagnostics_area() already does that.
  */
  thd->get_stmt_da()->reset_diagnostics_area();
  wfc->wakeup_subsequent_commits(rgi->worker_error);
}


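/*
  Record an error in the worker thread and signal the SQL driver thread.

  This sets rgi->worker_error, rolls back and cleans up the work done so far
  in the event group, and signals a relay log update so that the driver
  thread notices abort_slave and initiates slave stop.
*/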
static void
signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
{
  rgi->worker_error= err;
  /*
    In case we get an error during commit, inform following transactions that
    we aborted our commit.
  */
  rgi->unmark_start_commit();
  rgi->cleanup_context(thd, true);
  rgi->rli->abort_slave= true;
  rgi->rli->stop_for_until= false;
  mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
  rgi->rli->relay_log.signal_relay_log_update();
  mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
}


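/*
  Release a lock that may have been converted into a condition wait with
  ENTER_COND(). If *did_enter_cond is set, EXIT_COND() restores the THD stage
  and releases the mutex for us; otherwise a plain mutex unlock suffices.
*/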
static void
unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
                    PSI_stage_info *old_stage)
{
  if (*did_enter_cond)
  {
    thd->EXIT_COND(old_stage);
    *did_enter_cond= false;
  }
  else
    mysql_mutex_unlock(lock);
}


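/*
  Register the commit of this event group to wait for the commit of the
  prior event group, unless the prior group has already committed
  (rgi->wait_commit_sub_id <= entry->last_committed_sub_id).

  The caller must hold entry->LOCK_parallel_entry. The registered wait is
  later consumed by wait_for_prior_commit() and woken by the prior group's
  wakeup_subsequent_commits() in finish_event_group().
*/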
static void
register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
                                           rpl_parallel_entry *entry)
{
  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
  if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
  {
    /*
      Register that the commit of this event group must wait for the
      commit of the previous event group to complete before it may
      complete itself, so that we preserve commit order.
    */
    wait_for_commit *waitee=
      &rgi->wait_commit_group_info->commit_orderer;
    rgi->commit_orderer.register_wait_for_prior_commit(waitee);
  }
}


/*
  Do not start parallel execution of this event group until all prior groups
  that are not safe to run in parallel with it have reached the commit phase.

  Returns true if the event group should be skipped because the SQL driver
  thread is stopping, false if it should be executed.
*/
static bool
do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
            bool *did_enter_cond, PSI_stage_info *old_stage)
{
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  uint64 wait_count;

  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);

  if (!gco->installed)
  {
    group_commit_orderer *prev_gco= gco->prev_gco;
    if (prev_gco)
    {
      prev_gco->last_sub_id= gco->prior_sub_id;
      prev_gco->next_gco= gco;
    }
    gco->installed= true;
  }
  wait_count= gco->wait_count;
  if (wait_count > entry->count_committing_event_groups)
  {
    DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
    thd->ENTER_COND(&gco->COND_group_commit_orderer,
                    &entry->LOCK_parallel_entry,
                    &stage_waiting_for_prior_transaction_to_start_commit,
                    old_stage);
    *did_enter_cond= true;
    thd->set_time_for_next_stage();
    do
    {
      if (unlikely(thd->check_killed()) && !rgi->worker_error)
      {
        DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
        thd->clear_error();
        thd->get_stmt_da()->reset_diagnostics_area();
        thd->send_kill_message();
        slave_output_error_info(rgi, thd);
        signal_error_to_sql_driver_thread(thd, rgi, 1);
        /*
          Even though we were killed, we need to continue waiting for the
          prior event groups to signal that we can continue. Otherwise we
          mess up the accounting for ordering. However, now that we have
          marked the error, events will just be skipped rather than
          executed, and things will progress quickly towards stop.
        */
      }
      mysql_cond_wait(&gco->COND_group_commit_orderer,
                      &entry->LOCK_parallel_entry);
    } while (wait_count > entry->count_committing_event_groups);
  }

  if (entry->force_abort && wait_count > entry->stop_count)
  {
    /*
      We are stopping (STOP SLAVE), and this event group is beyond the point
      where we can safely stop. So return a flag that will cause us to skip,
      rather than execute, the following events.
    */
    return true;
  }
  else
    return false;
}


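/*
  Wait out any pending FLUSH TABLES WITH READ LOCK before starting to execute
  this event group, and record in entry->largest_started_sub_id that this
  group has started, which rpl_pause_for_ftwrl() uses to decide at which
  point to pause.
*/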
static void
do_ftwrl_wait(rpl_group_info *rgi,
              bool *did_enter_cond, PSI_stage_info *old_stage)
{
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  uint64 sub_id= rgi->gtid_sub_id;
  DBUG_ENTER("do_ftwrl_wait");

  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);

  /*
    If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this
    transaction is later than transactions that have priority to complete
    before FTWRL. If so, wait here so that FTWRL can proceed and complete
    first.

    (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes
    this test false as required).
  */
  if (unlikely(sub_id > entry->pause_sub_id))
  {
    thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
                    &stage_waiting_for_ftwrl, old_stage);
    *did_enter_cond= true;
    thd->set_time_for_next_stage();
    do
    {
      if (entry->force_abort || rgi->worker_error)
        break;
      if (unlikely(thd->check_killed()))
      {
        thd->send_kill_message();
        slave_output_error_info(rgi, thd);
        signal_error_to_sql_driver_thread(thd, rgi, 1);
        break;
      }
      mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
    } while (sub_id > entry->pause_sub_id);

    /*
      We do not call EXIT_COND() here, as this will be done later by our
      caller (since we set *did_enter_cond to true).
    */
  }

  if (sub_id > entry->largest_started_sub_id)
    entry->largest_started_sub_id= sub_id;

  DBUG_VOID_RETURN;
}


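/*
  Mark the thread pool as busy, first waiting for any other operation that
  has it marked busy to complete. Returns non-zero if we were killed while
  waiting. A successful call must be paired with pool_mark_not_busy(); a
  minimal usage sketch:

    if (pool_mark_busy(pool, thd))
      return 1;                          // Killed while waiting.
    // ... safely access pool->threads[] here ...
    pool_mark_not_busy(pool);
*/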
static int
pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
{
  PSI_stage_info old_stage;
  int res= 0;

  /*
    Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
    READ LOCK work correctly, without incurring extra locking penalties in
    normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
    thread pool, and for this we need to make sure the pool will not go away
    during the operation. The LOCK_rpl_thread_pool is not suitable for
    this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
    must be released before locking any LOCK_rpl_thread lock, or a deadlock
    can occur.

    So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
    pool size changes with this condition wait.
  */
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  if (thd)
  {
    thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
                    &stage_waiting_for_rpl_thread_pool, &old_stage);
    thd->set_time_for_next_stage();
  }
  while (pool->busy)
  {
    if (thd && unlikely(thd->check_killed()))
    {
      thd->send_kill_message();
      res= 1;
      break;
    }
    mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
  }
  if (!res)
    pool->busy= true;
  if (thd)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);

  return res;
}


static void
pool_mark_not_busy(rpl_parallel_thread_pool *pool)
{
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  DBUG_ASSERT(pool->busy);
  pool->busy= false;
  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
}


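/*
  Resume the worker threads after FLUSH TABLES WITH READ LOCK has completed.

  Clears the pause flag on each thread and resets pause_sub_id, so that the
  pause test in do_ftwrl_wait() becomes false again, wakes up any waiting
  workers, and marks the pool not busy.
*/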
void
rpl_unpause_after_ftwrl(THD *thd)
{
  uint32 i;
  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
  DBUG_ENTER("rpl_unpause_after_ftwrl");

  DBUG_ASSERT(pool->busy);

  for (i= 0; i < pool->count; ++i)
  {
    rpl_parallel_entry *e;
    rpl_parallel_thread *rpt= pool->threads[i];

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    if (!rpt->current_owner)
    {
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      continue;
    }
    e= rpt->current_entry;
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    rpt->pause_for_ftwrl= false;
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    e->pause_sub_id= (uint64)ULONGLONG_MAX;
    mysql_cond_broadcast(&e->COND_parallel_entry);
    mysql_mutex_unlock(&e->LOCK_parallel_entry);
  }

  pool_mark_not_busy(pool);
  DBUG_VOID_RETURN;
}


/*
  Pause the execution of all replication worker threads so that FLUSH TABLES
  WITH READ LOCK can complete. Each active entry is paused at the largest
  event group that has already started, and we wait for those groups to
  commit before returning.

  Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called.
*/
int
rpl_pause_for_ftwrl(THD *thd)
{
  uint32 i;
  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
  int err;
  DBUG_ENTER("rpl_pause_for_ftwrl");

  /*
    While the count_pending_pause_for_ftwrl counter is non-zero, the pool
    cannot be shutdown/resized, so threads are guaranteed to not disappear.

    This is required to safely be able to access the individual threads below.
    (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
    as this can deadlock against release_thread()).
  */
  if ((err= pool_mark_busy(pool, thd)))
    DBUG_RETURN(err);

  for (i= 0; i < pool->count; ++i)
  {
    PSI_stage_info old_stage;
    rpl_parallel_entry *e;
    rpl_parallel_thread *rpt= pool->threads[i];

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    if (!rpt->current_owner)
    {
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      continue;
    }
    e= rpt->current_entry;
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    /*
      Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not
      de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl().
    */
    rpt->pause_for_ftwrl= true;
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    ++e->need_sub_id_signal;
    if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
      e->pause_sub_id= e->largest_started_sub_id;
    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                    &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
    thd->set_time_for_next_stage();
    while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
           e->last_committed_sub_id < e->pause_sub_id &&
           !err)
    {
      if (unlikely(thd->check_killed()))
      {
        thd->send_kill_message();
        err= 1;
        break;
      }
      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
    }
    --e->need_sub_id_signal;
    thd->EXIT_COND(&old_stage);
    if (err)
      break;
  }

  if (err)
    rpl_unpause_after_ftwrl(thd);
  DBUG_RETURN(err);
}


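/*
  Helper for DBUG injection of a temporary (deadlock) error, used by test
  cases to exercise the transaction retry logic on GTID 0-x-100.
*/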
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
{
  if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
      rgi->retry_event_count == 4)
  {
    thd->clear_error();
    thd->get_stmt_da()->reset_diagnostics_area();
    my_error(ER_LOCK_DEADLOCK, MYF(0));
    return 1;
  }
  return 0;
}
#endif


/*
  If we detect a deadlock due to eg. storage engine locks that conflict with
  the fixed commit order, then the later transaction will be killed
  asynchronously to allow the former to complete its commit.

  In this case, we convert the 'killed' error into a deadlock error, and retry
  the later transaction.

  If we are doing optimistic parallel apply of transactions not known to be
  safe, we convert any error to a deadlock error, but then at retry we will
  wait for prior transactions to commit first, so that the retries can be
  done non-speculatively.
*/
static void
convert_kill_to_deadlock_error(rpl_group_info *rgi)
{
  THD *thd= rgi->thd;
  int err_code;

  if (!thd->get_stmt_da()->is_error())
    return;
  err_code= thd->get_stmt_da()->sql_errno();
  if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
       err_code != ER_PRIOR_COMMIT_FAILED) ||
      ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
       rgi->killed_for_retry))
  {
    thd->clear_error();
    my_error(ER_LOCK_DEADLOCK, MYF(0));
    thd->reset_killed();
  }
}


/*
  Check if an event marks the end of an event group. Returns 0 if it does
  not, 1 if the group is ending with a commit, and 2 if it is ending with a
  rollback.
*/
static int
is_group_ending(Log_event *ev, Log_event_type event_type)
{
  if (event_type == XID_EVENT)
    return 1;
  if (event_type == QUERY_EVENT)  // COMMIT/ROLLBACK are never compressed
  {
    Query_log_event *qev= (Query_log_event *)ev;
    if (qev->is_commit())
      return 1;
    if (qev->is_rollback())
      return 2;
  }
  return 0;
}


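/*
  Retry an event group that failed with a temporary error (typically a
  deadlock kill used to enforce commit order). We roll back the failed
  attempt, wait for all prior transactions to commit so that the retry can
  run non-speculatively, and then re-read and re-apply the group's events
  directly from the relay log, retrying up to slave_trans_retries times.
*/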
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
                  rpl_parallel_thread::queued_event *orig_qev)
{
  IO_CACHE rlog;
  LOG_INFO linfo;
  File fd= (File)-1;
  const char *errmsg;
  inuse_relaylog *ir= rgi->relay_log;
  uint64 event_count;
  uint64 events_to_execute= rgi->retry_event_count;
  Relay_log_info *rli= rgi->rli;
  int err;
  ulonglong cur_offset, old_offset;
  char log_name[FN_REFLEN];
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  ulong retries= 0;
  Format_description_log_event *description_event= NULL;

do_retry:
  event_count= 0;
  err= 0;
  errmsg= NULL;

  /*
    If we already started committing before getting the deadlock (or other
    error) that caused us to need to retry, we have already signalled
    subsequent transactions that we have started committing. This is
    potentially a problem, as now we will rollback, and if subsequent
    transactions would start to execute now, they could see an unexpected
    state of the database and get eg. key not found or duplicate key error.

    However, to get a deadlock in the first place, there must have been
    another earlier transaction that is waiting for us. Thus that other
    transaction has _not_ yet started to commit, and any subsequent
    transactions will still be waiting at this point.

    So here, we decrement back the count of transactions that started
    committing (if we already incremented it), undoing the effect of an
    earlier mark_start_commit(). Then later, when the retry succeeds and we
    commit again, we can do a new mark_start_commit() and eventually wake up
    subsequent transactions at the proper time.

    We need to do the unmark before the rollback, to be sure that the
    transaction we deadlocked with will not signal that it started to commit
    until after the unmark.
  */
  DBUG_EXECUTE_IF("inject_mdev8302", { my_sleep(20000);});
  rgi->unmark_start_commit();
  DEBUG_SYNC(thd, "rpl_parallel_retry_after_unmark");

  /*
    We might get the deadlock error that causes the retry during commit, while
    sitting in wait_for_prior_commit(). If this happens, we will have a
    pending error in the wait_for_commit object. So clear this by
    unregistering (and later re-registering) the wait.
  */
  if (thd->wait_for_commit_ptr)
    thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
  DBUG_EXECUTE_IF("inject_mdev8031", {
      /* Simulate that we get deadlock killed at this exact point. */
      rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
      thd->set_killed(KILL_CONNECTION);
  });
  DBUG_EXECUTE_IF("rpl_parallel_simulate_wait_at_retry", {
      if (rgi->current_gtid.seq_no == 1001) {
        debug_sync_set_action(thd,
                              STRING_WITH_LEN("rpl_parallel_simulate_wait_at_retry WAIT_FOR proceed_by_1001"));
      }
      DEBUG_SYNC(thd, "rpl_parallel_simulate_wait_at_retry");
    });

  rgi->cleanup_context(thd, 1);
  wait_for_pending_deadlock_kill(thd, rgi);
  thd->reset_killed();
  thd->clear_error();
  rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;

  /*
    If we retry due to a deadlock kill that occurred during the commit step, we
    might have already updated (but not committed) an update of table
    mysql.gtid_slave_pos, and cleared the gtid_pending flag. Now we have
    rolled back any such update, so we must set the gtid_pending flag back to
    true so that we will do a new update when/if we succeed with the retry.
  */
  rgi->gtid_pending= true;

  mysql_mutex_lock(&rli->data_lock);
  ++rli->retried_trans;
  statistic_increment(slave_retried_transactions, LOCK_status);
  mysql_mutex_unlock(&rli->data_lock);

  for (;;)
  {
    mysql_mutex_lock(&entry->LOCK_parallel_entry);
    if (entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
#ifndef DBUG_OFF
        (DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)) ||
#endif
        rgi->gtid_sub_id < entry->stop_on_error_sub_id)
    {
      register_wait_for_prior_event_group_commit(rgi, entry);
    }
    else
    {
      /*
        A failure of a preceding "parent" transaction may not be
        seen by the current one through its own worker_error.
        Such an induced error gets set by ourselves now.
      */
      err= rgi->worker_error= 1;
      my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
      mysql_mutex_unlock(&entry->LOCK_parallel_entry);
      goto err;
    }
    mysql_mutex_unlock(&entry->LOCK_parallel_entry);

    /*
      Let us wait for all prior transactions to complete before trying again.
      This way, we avoid repeatedly conflicting with and getting deadlock
      killed by the same earlier transaction.
    */
    if (!(err= thd->wait_for_prior_commit()))
    {
      rgi->speculation= rpl_group_info::SPECULATE_WAIT;
      break;
    }

    convert_kill_to_deadlock_error(rgi);
    if (!has_temporary_error(thd))
      goto err;
    /*
      If we get a temporary error such as a deadlock kill, we can safely
      ignore it, as we already rolled back.

      But we still want to retry the wait for the prior transaction to
      complete its commit.
    */
    thd->clear_error();
    thd->reset_killed();
    if (thd->wait_for_commit_ptr)
      thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
    DBUG_EXECUTE_IF("inject_mdev8031", {
        /* Inject a small sleep to give prior transaction a chance to commit. */
        my_sleep(100000);
    });
  }

  /*
    Let us clear any lingering deadlock kill one more time, here after
    wait_for_prior_commit() has completed. This should rule out any
    possibility of an old deadlock kill lingering on beyond this point.
  */
  thd->reset_killed();

  strmake_buf(log_name, ir->name);
  if ((fd= open_binlog(&rlog, log_name, &errmsg)) < 0)
  {
    err= 1;
    goto err;
  }
  cur_offset= rgi->retry_start_offset;
  delete description_event;
  description_event=
    read_relay_log_description_event(&rlog, cur_offset, &errmsg);
  if (!description_event)
  {
    err= 1;
    goto err;
  }
  DBUG_EXECUTE_IF("inject_mdev8031", {
      /* Simulate pending KILL caught in read_relay_log_description_event(). */
      if (unlikely(thd->check_killed())) {
        thd->send_kill_message();
        err= 1;
        goto err;
      }
  });
  my_b_seek(&rlog, cur_offset);

  do
  {
    Log_event_type event_type;
    Log_event *ev;
    rpl_parallel_thread::queued_event *qev;

    /* The loop is here so we can try again the next relay log file on EOF. */
    for (;;)
    {
      old_offset= cur_offset;
      ev= Log_event::read_log_event(&rlog, description_event,
                                    opt_slave_sql_verify_checksum);
      cur_offset= my_b_tell(&rlog);

      if (ev)
        break;
      if (unlikely(rlog.error < 0))
      {
        errmsg= "slave SQL thread aborted because of I/O error";
        err= 1;
        goto check_retry;
      }
      if (unlikely(rlog.error > 0))
      {
        sql_print_error("Slave SQL thread: I/O error reading "
                        "event(errno: %d  cur_log->error: %d)",
                        my_errno, rlog.error);
        errmsg= "Aborting slave SQL thread because of partial event read";
        err= 1;
        goto err;
      }
      /* EOF. Move to the next relay log. */
      end_io_cache(&rlog);
      mysql_file_close(fd, MYF(MY_WME));
      fd= (File)-1;

      /* Find the next relay log file. */
      if ((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
          (err= rli->relay_log.find_next_log(&linfo, 1)))
      {
        char buff[22];
        sql_print_error("next log error: %d  offset: %s  log: %s",
                        err,
                        llstr(linfo.index_file_offset, buff),
                        log_name);
        goto err;
      }
      strmake_buf(log_name, linfo.log_file_name);

      DBUG_EXECUTE_IF("inject_retry_event_group_open_binlog_kill", {
          if (retries < 2)
          {
            /* Simulate that we get deadlock killed during open_binlog(). */
            thd->reset_for_next_command();
            rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
            thd->set_killed(KILL_CONNECTION);
            thd->send_kill_message();
            fd= (File)-1;
            err= 1;
            goto check_retry;
          }
      });
      if ((fd= open_binlog(&rlog, log_name, &errmsg)) < 0)
      {
        err= 1;
        goto check_retry;
      }
      description_event->reset_crypto();
      /* Loop to try again on the new log file. */
    }

    event_type= ev->get_type_code();
    if (event_type == FORMAT_DESCRIPTION_EVENT)
    {
      Format_description_log_event *newde= (Format_description_log_event*)ev;
      newde->copy_crypto_data(description_event);
      delete description_event;
      description_event= newde;
      continue;
    }
    else if (event_type == START_ENCRYPTION_EVENT)
    {
      description_event->start_decryption((Start_encryption_log_event*)ev);
      delete ev;
      continue;
    }
    else if (!Log_event::is_group_event(event_type))
    {
      delete ev;
      continue;
    }
    ev->thd= thd;

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    qev= rpt->retry_get_qev(ev, orig_qev, log_name, old_offset,
                            cur_offset - old_offset);
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    if (!qev)
    {
      delete ev;
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      err= 1;
      goto err;
    }
    if (is_group_ending(ev, event_type) == 1)
      rgi->mark_start_commit();

    err= rpt_handle_event(qev, rpt);
    ++event_count;
    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    rpt->free_qev(qev);
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);

    delete_or_keep_event_post_apply(rgi, event_type, ev);
    DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
                    if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
    DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
                    err= dbug_simulate_tmp_error(rgi, thd););
    if (!err)
      continue;

check_retry:
    convert_kill_to_deadlock_error(rgi);
    if (has_temporary_error(thd))
    {
      ++retries;
      if (retries < slave_trans_retries)
      {
        if (fd >= 0)
        {
          end_io_cache(&rlog);
          mysql_file_close(fd, MYF(MY_WME));
          fd= (File)-1;
        }
        goto do_retry;
      }
      sql_print_error("Slave worker thread retried transaction %lu time(s) "
                      "in vain, giving up. Consider raising the value of "
                      "the slave_transaction_retries variable.",
                      slave_trans_retries);
    }
    goto err;

  } while (event_count < events_to_execute);

err:

  delete description_event;
  if (fd >= 0)
  {
    end_io_cache(&rlog);
    mysql_file_close(fd, MYF(MY_WME));
  }
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
  return err;
}


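/*
  Main function for a parallel replication worker thread.

  After setting up a THD for the worker, this loops taking batches of queued
  events from the SQL driver thread. For each event group it waits for its
  scheduling conditions (GCO wait, FTWRL pause, commit ordering), applies
  the events, retries on temporary errors, and finishes the group. The loop
  runs until pool shutdown sets rpt->stop.
*/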
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
  THD *thd;
  PSI_stage_info old_stage;
  struct rpl_parallel_thread::queued_event *events;
  bool group_standalone= true;
  bool in_event_group= false;
  bool skip_event_group= false;
  rpl_group_info *group_rgi= NULL;
  group_commit_orderer *gco;
  uint64 event_gtid_sub_id= 0;
  rpl_sql_thread_info sql_info(NULL);
  int err;

  struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;

  my_thread_init();
  thd= new THD(next_thread_id());
  thd->thread_stack= (char*)&thd;
  add_to_active_threads(thd);
  set_current_thd(thd);
  pthread_detach_this_thread();
  thd->init_for_queries();
  thd->variables.binlog_annotate_row_events= 0;
  init_thr_lock();
  thd->store_globals();
  thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
  thd->security_ctx->skip_grants();
  thd->variables.max_allowed_packet= slave_max_allowed_packet;
  /* Ensure that the slave can execute any ALTER TABLE it gets from the master. */
  thd->variables.alter_algorithm= (ulong) Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT;
  thd->slave_thread= 1;

  set_slave_thread_options(thd);
  thd->client_capabilities= CLIENT_LOCAL_FILES;
  thd->net.reading_or_writing= 0;
  thd_proc_info(thd, "Waiting for work from main SQL threads");
  thd->variables.lock_wait_timeout= LONG_TIMEOUT;
  thd->system_thread_info.rpl_sql_info= &sql_info;
  /*
    We need to use (at least) REPEATABLE READ isolation level. Otherwise
    speculative parallel apply can run out-of-order and give wrong results
    for statement-based replication.
  */
  thd->variables.tx_isolation= ISO_REPEATABLE_READ;

  mysql_mutex_lock(&rpt->LOCK_rpl_thread);
  rpt->thd= thd;

  while (rpt->delay_start)
    mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);

  rpt->running= true;
  mysql_cond_signal(&rpt->COND_rpl_thread);

  thd->set_command(COM_SLAVE_WORKER);
  while (!rpt->stop)
  {
    uint wait_count= 0;
    rpl_parallel_thread::queued_event *qev, *next_qev;

    thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
                    &stage_waiting_for_work_from_sql_thread, &old_stage);
    /*
      There are 4 cases that should cause us to wake up:
       - Events have been queued for us to handle.
       - We have an owner, but no events, and we are not inside an event
         group -> we need to release ourself back to the thread pool.
       - The SQL thread is stopping, and we have an owner but no events, and
         we are inside an event group; no more events will be queued to us,
         so we need to abort the group (force_abort==1).
       - Thread pool shutdown (rpt->stop==1).
    */
    while (!( (events= rpt->event_queue) ||
              (rpt->current_owner && !in_event_group) ||
              (rpt->current_owner && group_rgi->parallel_entry->force_abort) ||
              rpt->stop))
    {
      if (!wait_count++)
        thd->set_time_for_next_stage();
      mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
    }
    rpt->dequeue1(events);
    thd->EXIT_COND(&old_stage);

  more_events:
    for (qev= events; qev; qev= next_qev)
    {
      Log_event_type event_type;
      rpl_group_info *rgi= qev->rgi;
      rpl_parallel_entry *entry= rgi->parallel_entry;
      bool end_of_group;
      int group_ending;

      next_qev= qev->next;
      if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
      {
        handle_queued_pos_update(thd, qev);
        rpt->loc_free_qev(qev);
        continue;
      }
      else if (qev->typ ==
               rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
      {
        if (in_event_group)
        {
          /*
            The master restarted (crashed) in the middle of an event group.
            So we need to roll back and discard that event group.
          */
          group_rgi->cleanup_context(thd, 1);
          in_event_group= false;
          finish_event_group(rpt, group_rgi->gtid_sub_id,
                             qev->entry_for_queued, group_rgi);

          rpt->loc_free_rgi(group_rgi);
          thd->rgi_slave= group_rgi= NULL;
        }

        rpt->loc_free_qev(qev);
        continue;
      }
      DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);

      thd->rgi_slave= rgi;
      gco= rgi->gco;
      /* Handle a new event group, which will be initiated by a GTID event. */
      if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
      {
        bool did_enter_cond= false;
        PSI_stage_info old_stage;

        DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
            if (rgi->current_gtid.domain_id == 0 &&
                rgi->current_gtid.seq_no == 100) {
              debug_sync_set_action(thd,
                      STRING_WITH_LEN("now SIGNAL scheduled_gtid_0_x_100"));
            }
          });

        if (unlikely(thd->wait_for_commit_ptr) && group_rgi != NULL)
        {
          /*
            This indicates that we got a new GTID event in the middle of a
            not yet completed event group. This is a corrupt binlog (the
            master will never write such a binlog), so it does not happen
            unless someone tries to inject a wrongly crafted binlog; but let
            us still try to handle it somewhat nicely.
          */
          group_rgi->cleanup_context(thd, true);
          finish_event_group(rpt, group_rgi->gtid_sub_id,
                             group_rgi->parallel_entry, group_rgi);
          rpt->loc_free_rgi(group_rgi);
        }

        thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
        in_event_group= true;
        /*
          If the standalone flag is set, then this event group consists of a
          single statement (possibly preceded by some Intvar_log_event and
          similar), without any terminating COMMIT/ROLLBACK/XID.
        */
        group_standalone=
          (0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 &
                 Gtid_log_event::FL_STANDALONE));

        event_gtid_sub_id= rgi->gtid_sub_id;
        rgi->thd= thd;

        mysql_mutex_lock(&entry->LOCK_parallel_entry);
        skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);

        if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
          skip_event_group= true;
        if (likely(!skip_event_group))
          do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);

        /*
          Register ourself to wait for the previous commit, if we need to do
          such registration _and_ that previous commit has not already
          occurred.
        */
        register_wait_for_prior_event_group_commit(rgi, entry);

        unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
                            &did_enter_cond, &old_stage);

        thd->wait_for_commit_ptr= &rgi->commit_orderer;

        if (opt_gtid_ignore_duplicates &&
            rgi->rli->mi->using_gtid != Master_info::USE_GTID_NO)
        {
          int res=
            rpl_global_gtid_slave_state->check_duplicate_gtid(&rgi->current_gtid,
                                                              rgi);
          if (res < 0)
          {
            /* Error. */
            slave_output_error_info(rgi, thd);
            signal_error_to_sql_driver_thread(thd, rgi, 1);
          }
          else if (!res)
          {
            /* GTID already applied by another master connection, skip. */
            skip_event_group= true;
          }
          else
          {
            /* We have to apply the event. */
          }
        }
        /*
          If we are optimistically running transactions in parallel, but this
          particular event group should not run in parallel with what came
          before, then wait now for the prior transaction to complete its
          commit.
        */
        if (rgi->speculation == rpl_group_info::SPECULATE_WAIT &&
            (err= thd->wait_for_prior_commit()))
        {
          slave_output_error_info(rgi, thd);
          signal_error_to_sql_driver_thread(thd, rgi, 1);
        }
      }

      group_rgi= rgi;
      group_ending= is_group_ending(qev->ev, event_type);
      /*
        We do not unmark_start_commit() here in case of an explicit ROLLBACK
        statement. Such events should be very rare, there is no real reason
        to try to group commit them - on the contrary, it seems best to avoid
        running them in parallel with following group commits, as with
        ROLLBACK events we are already deep in dangerous corner cases with
        mix of transactional and non-transactional tables or the like. And
        avoiding the mark_start_commit() here allows us to keep an assertion
        in ha_rollback_trans() that we do not rollback after doing
        mark_start_commit().
      */
      if (group_ending == 1 && likely(!rgi->worker_error))
      {
        /*
          Do an extra check for (deadlock) kill here. This helps prevent a
          lingering deadlock kill that occurred during normal DML processing to
          propagate past the mark_start_commit(). If we detect a deadlock only
          after mark_start_commit(), we have to unmark, which has at least a
          theoretical possibility of leaving a window where it looks like all
          transactions in a GCO have started committing, while in fact one
          will need to rollback and retry. This is not supposed to be possible
          (since there is a deadlock, at least one transaction should be
          blocked from reaching commit), but this seems a fragile assurance,
          and there were historically a number of subtle bugs in this area.
        */
        if (!thd->killed)
        {
          DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
          rgi->mark_start_commit();
          DEBUG_SYNC(thd, "rpl_parallel_after_mark_start_commit");
        }
      }
| 1269 |  | 
| 1270 |       /* | 
| 1271 |         If the SQL thread is stopping, we just skip execution of all the | 
| 1272 |         following event groups. We still do all the normal waiting and wakeup | 
| 1273 |         processing between the event groups as a simple way to ensure that | 
| 1274 |         everything is stopped and cleaned up correctly. | 
| 1275 |       */ | 
| 1276 |       if (likely(!rgi->worker_error) && !skip_event_group) | 
| 1277 |       { | 
| 1278 |         ++rgi->retry_event_count; | 
| 1279 | #ifndef DBUG_OFF | 
| 1280 |         err= 0; | 
        DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_xid",
| 1282 |           if (event_type == XID_EVENT) | 
| 1283 |           { | 
| 1284 |             thd->clear_error(); | 
| 1285 |             thd->get_stmt_da()->reset_diagnostics_area(); | 
| 1286 |             my_error(ER_LOCK_DEADLOCK, MYF(0)); | 
| 1287 |             err= 1; | 
            DEBUG_SYNC(thd, "rpl_parallel_simulate_temp_err_xid");
| 1289 |           }); | 
| 1290 |         if (!err) | 
| 1291 | #endif | 
| 1292 |         { | 
| 1293 |           if (unlikely(thd->check_killed())) | 
| 1294 |           { | 
| 1295 |             thd->clear_error(); | 
| 1296 |             thd->get_stmt_da()->reset_diagnostics_area(); | 
| 1297 |             thd->send_kill_message(); | 
| 1298 |             err= 1; | 
| 1299 |           } | 
| 1300 |           else | 
| 1301 |             err= rpt_handle_event(qev, rpt); | 
| 1302 |         } | 
| 1303 |         delete_or_keep_event_post_apply(rgi, event_type, qev->ev); | 
        DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
| 1305 |                         err= dbug_simulate_tmp_error(rgi, thd);); | 
| 1306 |         if (unlikely(err)) | 
| 1307 |         { | 
| 1308 |           convert_kill_to_deadlock_error(rgi); | 
| 1309 |           if (has_temporary_error(thd) && slave_trans_retries > 0) | 
| 1310 |             err= retry_event_group(rgi, rpt, qev); | 
| 1311 |         } | 
| 1312 |       } | 
| 1313 |       else | 
| 1314 |       { | 
| 1315 |         delete qev->ev; | 
| 1316 |         thd->get_stmt_da()->set_overwrite_status(true); | 
| 1317 |         err= thd->wait_for_prior_commit(); | 
| 1318 |         thd->get_stmt_da()->set_overwrite_status(false); | 
| 1319 |       } | 
| 1320 |  | 
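      /*
        A standalone event group ends with the first event that is not
        marked as part of the group; other event groups end at the
        group-ending (commit/rollback) event detected above.
      */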
| 1321 |       end_of_group= | 
| 1322 |         in_event_group && | 
| 1323 |         ((group_standalone && !Log_event::is_part_of_group(event_type)) || | 
| 1324 |          group_ending); | 
| 1325 |  | 
| 1326 |       rpt->loc_free_qev(qev); | 
| 1327 |  | 
| 1328 |       if (unlikely(err)) | 
| 1329 |       { | 
| 1330 |         if (!rgi->worker_error) | 
| 1331 |         { | 
| 1332 |           slave_output_error_info(rgi, thd); | 
| 1333 |           signal_error_to_sql_driver_thread(thd, rgi, err); | 
| 1334 |         } | 
| 1335 |         thd->reset_killed(); | 
| 1336 |       } | 
| 1337 |       if (end_of_group) | 
| 1338 |       { | 
| 1339 |         in_event_group= false; | 
| 1340 |         finish_event_group(rpt, event_gtid_sub_id, entry, rgi); | 
| 1341 |         rpt->loc_free_rgi(rgi); | 
| 1342 |         thd->rgi_slave= group_rgi= rgi= NULL; | 
| 1343 |         skip_event_group= false; | 
        DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
| 1345 |       } | 
| 1346 |     } | 
| 1347 |  | 
| 1348 |     mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 1349 |     /* | 
| 1350 |       Now that we have the lock, we can move everything from our local free | 
| 1351 |       lists to the real free lists that are also accessible from the SQL | 
| 1352 |       driver thread. | 
| 1353 |     */ | 
| 1354 |     rpt->batch_free(); | 
| 1355 |  | 
| 1356 |     if ((events= rpt->event_queue) != NULL) | 
| 1357 |     { | 
| 1358 |       /* | 
| 1359 |         Take next group of events from the replication pool. | 
| 1360 |         This is faster than having to wakeup the pool manager thread to give | 
| 1361 |         us a new event. | 
| 1362 |       */ | 
| 1363 |       rpt->dequeue1(events); | 
| 1364 |       mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 1365 |       goto more_events; | 
| 1366 |     } | 
| 1367 |  | 
| 1368 |     rpt->inuse_relaylog_refcount_update(); | 
| 1369 |  | 
| 1370 |     if (in_event_group && group_rgi->parallel_entry->force_abort) | 
| 1371 |     { | 
| 1372 |       /* | 
| 1373 |         We are asked to abort, without getting the remaining events in the | 
| 1374 |         current event group. | 
| 1375 |  | 
        We have to rollback the current transaction and update the last
        sub_id value so that the SQL thread will know we are done with the
        half-processed event group.
| 1379 |       */ | 
| 1380 |       mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 1381 |       signal_error_to_sql_driver_thread(thd, group_rgi, 1); | 
| 1382 |       finish_event_group(rpt, group_rgi->gtid_sub_id, | 
| 1383 |                          group_rgi->parallel_entry, group_rgi); | 
| 1384 |       in_event_group= false; | 
| 1385 |       mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 1386 |       rpt->free_rgi(group_rgi); | 
| 1387 |       thd->rgi_slave= group_rgi= NULL; | 
| 1388 |       skip_event_group= false; | 
| 1389 |     } | 
| 1390 |     if (!in_event_group) | 
| 1391 |     { | 
      /* If we are in a FLUSH TABLES WITH READ LOCK, wait for it */
| 1393 |       while (rpt->current_entry && rpt->pause_for_ftwrl) | 
| 1394 |       { | 
| 1395 |         /* | 
| 1396 |           We are currently in the delicate process of pausing parallel | 
| 1397 |           replication while FLUSH TABLES WITH READ LOCK is starting. We must | 
| 1398 |           not de-allocate the thread (setting rpt->current_owner= NULL) until | 
| 1399 |           rpl_unpause_after_ftwrl() has woken us up. | 
| 1400 |         */ | 
| 1401 |         rpl_parallel_entry *e= rpt->current_entry; | 
| 1402 |         /* | 
| 1403 |           Wait for rpl_unpause_after_ftwrl() to wake us up. | 
| 1404 |           Note that rpl_pause_for_ftwrl() may wait for 'e->pause_sub_id' | 
          to change. This should happen eventually in finish_event_group().
| 1406 |         */ | 
| 1407 |         mysql_mutex_lock(&e->LOCK_parallel_entry); | 
| 1408 |         mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 1409 |         if (rpt->pause_for_ftwrl) | 
| 1410 |           mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); | 
| 1411 |         mysql_mutex_unlock(&e->LOCK_parallel_entry); | 
| 1412 |         mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 1413 |       } | 
| 1414 |  | 
| 1415 |       rpt->current_owner= NULL; | 
| 1416 |       /* Tell wait_for_done() that we are done, if it is waiting. */ | 
| 1417 |       if (likely(rpt->current_entry) && | 
| 1418 |           unlikely(rpt->current_entry->force_abort)) | 
| 1419 |         mysql_cond_broadcast(&rpt->COND_rpl_thread_stop); | 
| 1420 |  | 
| 1421 |       rpt->current_entry= NULL; | 
| 1422 |       if (!rpt->stop) | 
| 1423 |         rpt->pool->release_thread(rpt); | 
| 1424 |     } | 
| 1425 |   } | 
| 1426 |  | 
| 1427 |   rpt->thd= NULL; | 
| 1428 |   mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 1429 |  | 
| 1430 |   thd->clear_error(); | 
| 1431 |   thd->catalog= 0; | 
| 1432 |   thd->reset_query(); | 
| 1433 |   thd->reset_db(&null_clex_str); | 
  thd_proc_info(thd, "Slave worker thread exiting");
| 1435 |   thd->temporary_tables= 0; | 
| 1436 |  | 
| 1437 |   THD_CHECK_SENTRY(thd); | 
| 1438 |   unlink_not_visible_thd(thd); | 
| 1439 |   delete thd; | 
| 1440 |  | 
| 1441 |   mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 1442 |   rpt->running= false; | 
| 1443 |   mysql_cond_signal(&rpt->COND_rpl_thread); | 
| 1444 |   mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 1445 |  | 
| 1446 |   my_thread_end(); | 
| 1447 |  | 
| 1448 |   return NULL; | 
| 1449 | } | 
| 1450 |  | 
| 1451 |  | 
| 1452 | static void | 
| 1453 | dealloc_gco(group_commit_orderer *gco) | 
| 1454 | { | 
| 1455 |   mysql_cond_destroy(&gco->COND_group_commit_orderer); | 
| 1456 |   my_free(gco); | 
| 1457 | } | 
| 1458 |  | 
| 1459 | /** | 
| 1460 |    Change thread count for global parallel worker threads | 
| 1461 |  | 
| 1462 |    @param pool          parallel thread pool | 
   @param new_count     Number of threads to be in pool. 0 during shutdown
   @param force         Force thread count to new_count even if slave
                        threads are running

   By default we don't resize the pool if there are running threads.
   However, during shutdown we will always do it.
   This is needed as any_slave_sql_running() returns 1 during shutdown,
   since we don't want to access master_info while
   Master_info_index::free_connections is running.
| 1472 | */ | 
| 1473 |  | 
| 1474 | static int | 
| 1475 | rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, | 
| 1476 |                                  uint32 new_count, bool force) | 
| 1477 | { | 
| 1478 |   uint32 i; | 
| 1479 |   rpl_parallel_thread **old_list= NULL; | 
| 1480 |   rpl_parallel_thread **new_list= NULL; | 
| 1481 |   rpl_parallel_thread *new_free_list= NULL; | 
| 1482 |   rpl_parallel_thread *rpt_array= NULL; | 
| 1483 |   int res; | 
| 1484 |  | 
| 1485 |   if ((res= pool_mark_busy(pool, current_thd))) | 
| 1486 |     return res; | 
| 1487 |  | 
| 1488 |   /* Protect against parallel pool resizes */ | 
| 1489 |   if (pool->count == new_count) | 
| 1490 |   { | 
| 1491 |     pool_mark_not_busy(pool); | 
| 1492 |     return 0; | 
| 1493 |   } | 
| 1494 |  | 
| 1495 |   /* | 
| 1496 |     If we are about to delete pool, do an extra check that there are no new | 
| 1497 |     slave threads running since we marked pool busy | 
| 1498 |   */ | 
| 1499 |   if (!new_count && !force) | 
| 1500 |   { | 
| 1501 |     if (any_slave_sql_running(false)) | 
| 1502 |     { | 
      DBUG_PRINT("warning",
                 ("SQL threads running while trying to reset parallel pool"));
| 1505 |       pool_mark_not_busy(pool); | 
| 1506 |       return 0;                                 // Ok to not resize pool | 
| 1507 |     } | 
| 1508 |   } | 
| 1509 |  | 
| 1510 |   /* | 
| 1511 |     Allocate the new list of threads up-front. | 
| 1512 |     That way, if we fail half-way, we only need to free whatever we managed | 
| 1513 |     to allocate, and will not be left with a half-functional thread pool. | 
| 1514 |   */ | 
| 1515 |   if (new_count && | 
| 1516 |       !my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), | 
| 1517 |                        &new_list, new_count*sizeof(*new_list), | 
| 1518 |                        &rpt_array, new_count*sizeof(*rpt_array), | 
| 1519 |                        NULL)) | 
| 1520 |   { | 
| 1521 |     my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) + | 
| 1522 |                                           new_count*sizeof(*rpt_array)))); | 
| 1523 |     goto err; | 
| 1524 |   } | 
| 1525 |  | 
| 1526 |   for (i= 0; i < new_count; ++i) | 
| 1527 |   { | 
| 1528 |     pthread_t th; | 
| 1529 |  | 
| 1530 |     new_list[i]= &rpt_array[i]; | 
| 1531 |     new_list[i]->delay_start= true; | 
| 1532 |     mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread, | 
| 1533 |                      MY_MUTEX_INIT_SLOW); | 
| 1534 |     mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL); | 
| 1535 |     mysql_cond_init(key_COND_rpl_thread_queue, | 
| 1536 |                     &new_list[i]->COND_rpl_thread_queue, NULL); | 
| 1537 |     mysql_cond_init(key_COND_rpl_thread_stop, | 
| 1538 |                     &new_list[i]->COND_rpl_thread_stop, NULL); | 
| 1539 |     new_list[i]->pool= pool; | 
| 1540 |     if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib, | 
| 1541 |                             handle_rpl_parallel_thread, new_list[i])) | 
| 1542 |     { | 
| 1543 |       my_error(ER_OUT_OF_RESOURCES, MYF(0)); | 
| 1544 |       goto err; | 
| 1545 |     } | 
| 1546 |     new_list[i]->next= new_free_list; | 
| 1547 |     new_free_list= new_list[i]; | 
| 1548 |   } | 
| 1549 |  | 
| 1550 |   /* | 
| 1551 |     Grab each old thread in turn, and signal it to stop. | 
| 1552 |  | 
| 1553 |     Note that since we require all replication threads to be stopped before | 
| 1554 |     changing the parallel replication worker thread pool, all the threads will | 
| 1555 |     be already idle and will terminate immediately. | 
| 1556 |   */ | 
| 1557 |   for (i= 0; i < pool->count; ++i) | 
| 1558 |   { | 
| 1559 |     rpl_parallel_thread *rpt; | 
| 1560 |  | 
| 1561 |     mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); | 
| 1562 |     while ((rpt= pool->free_list) == NULL) | 
| 1563 |       mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); | 
| 1564 |     pool->free_list= rpt->next; | 
| 1565 |     mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); | 
| 1566 |     mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 1567 |     rpt->stop= true; | 
| 1568 |     mysql_cond_signal(&rpt->COND_rpl_thread); | 
| 1569 |     mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 1570 |   } | 
| 1571 |  | 
| 1572 |   for (i= 0; i < pool->count; ++i) | 
| 1573 |   { | 
| 1574 |     rpl_parallel_thread *rpt= pool->threads[i]; | 
| 1575 |     mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 1576 |     while (rpt->running) | 
| 1577 |       mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); | 
| 1578 |     mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 1579 |     mysql_mutex_destroy(&rpt->LOCK_rpl_thread); | 
| 1580 |     mysql_cond_destroy(&rpt->COND_rpl_thread); | 
| 1581 |     while (rpt->qev_free_list) | 
| 1582 |     { | 
| 1583 |       rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next; | 
| 1584 |       my_free(rpt->qev_free_list); | 
| 1585 |       rpt->qev_free_list= next; | 
| 1586 |     } | 
| 1587 |     while (rpt->rgi_free_list) | 
| 1588 |     { | 
| 1589 |       rpl_group_info *next= rpt->rgi_free_list->next; | 
| 1590 |       delete rpt->rgi_free_list; | 
| 1591 |       rpt->rgi_free_list= next; | 
| 1592 |     } | 
| 1593 |     while (rpt->gco_free_list) | 
| 1594 |     { | 
| 1595 |       group_commit_orderer *next= rpt->gco_free_list->next_gco; | 
| 1596 |       dealloc_gco(rpt->gco_free_list); | 
| 1597 |       rpt->gco_free_list= next; | 
| 1598 |     } | 
| 1599 |   } | 
| 1600 |  | 
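  /*
    Swap in the new thread array. pool->count is updated in two steps,
    before and after installing the new array, so that it never exceeds the
    size of whichever array pool->threads currently points to.
  */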
| 1601 |   old_list= pool->threads; | 
| 1602 |   if (new_count < pool->count) | 
| 1603 |     pool->count= new_count; | 
| 1604 |   pool->threads= new_list; | 
| 1605 |   if (new_count > pool->count) | 
| 1606 |     pool->count= new_count; | 
| 1607 |   my_free(old_list); | 
| 1608 |   pool->free_list= new_free_list; | 
| 1609 |   for (i= 0; i < pool->count; ++i) | 
| 1610 |   { | 
| 1611 |     mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread); | 
| 1612 |     pool->threads[i]->delay_start= false; | 
| 1613 |     mysql_cond_signal(&pool->threads[i]->COND_rpl_thread); | 
| 1614 |     while (!pool->threads[i]->running) | 
| 1615 |       mysql_cond_wait(&pool->threads[i]->COND_rpl_thread, | 
| 1616 |                       &pool->threads[i]->LOCK_rpl_thread); | 
| 1617 |     mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); | 
| 1618 |   } | 
| 1619 |  | 
| 1620 |   pool_mark_not_busy(pool); | 
| 1621 |  | 
| 1622 |   return 0; | 
| 1623 |  | 
| 1624 | err: | 
| 1625 |   if (new_list) | 
| 1626 |   { | 
| 1627 |     while (new_free_list) | 
| 1628 |     { | 
| 1629 |       mysql_mutex_lock(&new_free_list->LOCK_rpl_thread); | 
| 1630 |       new_free_list->delay_start= false; | 
| 1631 |       new_free_list->stop= true; | 
| 1632 |       mysql_cond_signal(&new_free_list->COND_rpl_thread); | 
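      /*
        The thread was created with delay_start set, so first wait for it to
        start running, then wait for it to see the stop flag and terminate,
        before the array it lives in is freed below.
      */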
| 1633 |       while (!new_free_list->running) | 
| 1634 |         mysql_cond_wait(&new_free_list->COND_rpl_thread, | 
| 1635 |                         &new_free_list->LOCK_rpl_thread); | 
| 1636 |       while (new_free_list->running) | 
| 1637 |         mysql_cond_wait(&new_free_list->COND_rpl_thread, | 
| 1638 |                         &new_free_list->LOCK_rpl_thread); | 
| 1639 |       mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread); | 
| 1640 |       new_free_list= new_free_list->next; | 
| 1641 |     } | 
| 1642 |     my_free(new_list); | 
| 1643 |   } | 
| 1644 |   pool_mark_not_busy(pool); | 
| 1645 |   return 1; | 
| 1646 | } | 
| 1647 |  | 
| 1648 | /* | 
| 1649 |   Deactivate the parallel replication thread pool, if there are now no more | 
| 1650 |   SQL threads running. | 
| 1651 | */ | 
| 1652 |  | 
| 1653 | int rpl_parallel_resize_pool_if_no_slaves(void) | 
| 1654 | { | 
| 1655 |   /* master_info_index is set to NULL on shutdown */ | 
| 1656 |   if (opt_slave_parallel_threads > 0 && !any_slave_sql_running(false)) | 
| 1657 |     return rpl_parallel_inactivate_pool(&global_rpl_thread_pool); | 
| 1658 |   return 0; | 
| 1659 | } | 
| 1660 |  | 
| 1661 |  | 
| 1662 | /** | 
   Resize the pool if it is not active (no threads), or if it is busy (in
   which case we may be in the middle of a resize to 0).
| 1665 | */ | 
| 1666 |  | 
| 1667 | int | 
| 1668 | rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) | 
| 1669 | { | 
| 1670 |   bool resize; | 
| 1671 |   mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); | 
| 1672 |   resize= !pool->count || pool->busy; | 
| 1673 |   mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); | 
| 1674 |   if (resize) | 
| 1675 |     return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads, | 
| 1676 |                                             0); | 
| 1677 |   return 0; | 
| 1678 | } | 
| 1679 |  | 
| 1680 |  | 
| 1681 | int | 
| 1682 | rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool) | 
| 1683 | { | 
| 1684 |   return rpl_parallel_change_thread_count(pool, 0, 0); | 
| 1685 | } | 
| 1686 |  | 
| 1687 |  | 
| 1688 | void | 
| 1689 | rpl_parallel_thread::batch_free() | 
| 1690 | { | 
| 1691 |   mysql_mutex_assert_owner(&LOCK_rpl_thread); | 
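  /*
    Splice each local list onto the head of the corresponding shared free
    list. Each loc_*_last_ptr_ptr points to the 'next' field of the local
    list's tail, so the splice is O(1) regardless of list length.
  */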
| 1692 |   if (loc_qev_list) | 
| 1693 |   { | 
| 1694 |     *loc_qev_last_ptr_ptr= qev_free_list; | 
| 1695 |     qev_free_list= loc_qev_list; | 
| 1696 |     loc_qev_list= NULL; | 
| 1697 |     dequeue2(loc_qev_size); | 
| 1698 |     /* Signal that our queue can now accept more events. */ | 
| 1699 |     mysql_cond_signal(&COND_rpl_thread_queue); | 
| 1700 |     loc_qev_size= 0; | 
| 1701 |     qev_free_pending= 0; | 
| 1702 |   } | 
| 1703 |   if (loc_rgi_list) | 
| 1704 |   { | 
| 1705 |     *loc_rgi_last_ptr_ptr= rgi_free_list; | 
| 1706 |     rgi_free_list= loc_rgi_list; | 
| 1707 |     loc_rgi_list= NULL; | 
| 1708 |   } | 
| 1709 |   if (loc_gco_list) | 
| 1710 |   { | 
| 1711 |     *loc_gco_last_ptr_ptr= gco_free_list; | 
| 1712 |     gco_free_list= loc_gco_list; | 
| 1713 |     loc_gco_list= NULL; | 
| 1714 |   } | 
| 1715 | } | 
| 1716 |  | 
| 1717 |  | 
| 1718 | void | 
| 1719 | rpl_parallel_thread::inuse_relaylog_refcount_update() | 
| 1720 | { | 
| 1721 |   inuse_relaylog *ir= accumulated_ir_last; | 
| 1722 |   if (ir) | 
| 1723 |   { | 
| 1724 |     my_atomic_add64(&ir->dequeued_count, accumulated_ir_count); | 
| 1725 |     accumulated_ir_count= 0; | 
| 1726 |     accumulated_ir_last= NULL; | 
| 1727 |   } | 
| 1728 | } | 
| 1729 |  | 
| 1730 |  | 
| 1731 | rpl_parallel_thread::queued_event * | 
| 1732 | rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) | 
| 1733 | { | 
| 1734 |   queued_event *qev; | 
| 1735 |   mysql_mutex_assert_owner(&LOCK_rpl_thread); | 
| 1736 |   if ((qev= qev_free_list)) | 
| 1737 |     qev_free_list= qev->next; | 
  else if (!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0))))
| 1739 |   { | 
| 1740 |     my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); | 
| 1741 |     return NULL; | 
| 1742 |   } | 
| 1743 |   qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT; | 
| 1744 |   qev->ev= ev; | 
| 1745 |   qev->event_size= (size_t)event_size; | 
| 1746 |   qev->next= NULL; | 
| 1747 |   return qev; | 
| 1748 | } | 
| 1749 |  | 
| 1750 |  | 
| 1751 | rpl_parallel_thread::queued_event * | 
| 1752 | rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, | 
| 1753 |                              Relay_log_info *rli) | 
| 1754 | { | 
| 1755 |   queued_event *qev= get_qev_common(ev, event_size); | 
| 1756 |   if (!qev) | 
| 1757 |     return NULL; | 
| 1758 |   strcpy(qev->event_relay_log_name, rli->event_relay_log_name); | 
| 1759 |   qev->event_relay_log_pos= rli->event_relay_log_pos; | 
| 1760 |   qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; | 
| 1761 |   strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); | 
| 1762 |   return qev; | 
| 1763 | } | 
| 1764 |  | 
| 1765 |  | 
| 1766 | rpl_parallel_thread::queued_event * | 
| 1767 | rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev, | 
| 1768 |                                    const char *relay_log_name, | 
| 1769 |                                    ulonglong event_pos, ulonglong event_size) | 
| 1770 | { | 
| 1771 |   queued_event *qev= get_qev_common(ev, event_size); | 
| 1772 |   if (!qev) | 
| 1773 |     return NULL; | 
| 1774 |   qev->rgi= orig_qev->rgi; | 
| 1775 |   strcpy(qev->event_relay_log_name, relay_log_name); | 
| 1776 |   qev->event_relay_log_pos= event_pos; | 
| 1777 |   qev->future_event_relay_log_pos= event_pos+event_size; | 
| 1778 |   strcpy(qev->future_event_master_log_name, | 
| 1779 |          orig_qev->future_event_master_log_name); | 
| 1780 |   return qev; | 
| 1781 | } | 
| 1782 |  | 
| 1783 |  | 
| 1784 | void | 
| 1785 | rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev) | 
| 1786 | { | 
| 1787 |   inuse_relaylog *ir= qev->ir; | 
| 1788 |   inuse_relaylog *last_ir= accumulated_ir_last; | 
| 1789 |   if (ir != last_ir) | 
| 1790 |   { | 
| 1791 |     if (last_ir) | 
| 1792 |       inuse_relaylog_refcount_update(); | 
| 1793 |     accumulated_ir_last= ir; | 
| 1794 |   } | 
| 1795 |   ++accumulated_ir_count; | 
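  /*
    Push the event onto the local free list. The first element pushed becomes
    the tail; we remember the address of its 'next' pointer so that
    batch_free() can splice the whole list onto the shared free list in one
    step.
  */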
| 1796 |   if (!loc_qev_list) | 
| 1797 |     loc_qev_last_ptr_ptr= &qev->next; | 
| 1798 |   else | 
| 1799 |     qev->next= loc_qev_list; | 
| 1800 |   loc_qev_list= qev; | 
| 1801 |   loc_qev_size+= qev->event_size; | 
| 1802 |   /* | 
| 1803 |     We want to release to the global free list only occasionally, to avoid | 
    having to take the LOCK_rpl_thread mutex too many times.
| 1805 |  | 
| 1806 |     However, we do need to release regularly. If we let the unreleased part | 
| 1807 |     grow too large, then the SQL driver thread may go to sleep waiting for | 
| 1808 |     the queue to drop below opt_slave_parallel_max_queued, and this in turn | 
| 1809 |     can stall all other worker threads for more stuff to do. | 
| 1810 |   */ | 
| 1811 |   if (++qev_free_pending >= QEV_BATCH_FREE || | 
| 1812 |       loc_qev_size >= opt_slave_parallel_max_queued/3) | 
| 1813 |   { | 
| 1814 |     mysql_mutex_lock(&LOCK_rpl_thread); | 
| 1815 |     batch_free(); | 
| 1816 |     mysql_mutex_unlock(&LOCK_rpl_thread); | 
| 1817 |   } | 
| 1818 | } | 
| 1819 |  | 
| 1820 |  | 
| 1821 | void | 
| 1822 | rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) | 
| 1823 | { | 
| 1824 |   mysql_mutex_assert_owner(&LOCK_rpl_thread); | 
| 1825 |   qev->next= qev_free_list; | 
| 1826 |   qev_free_list= qev; | 
| 1827 | } | 
| 1828 |  | 
| 1829 |  | 
| 1830 | rpl_group_info* | 
| 1831 | rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, | 
| 1832 |                              rpl_parallel_entry *e, ulonglong event_size) | 
| 1833 | { | 
| 1834 |   rpl_group_info *rgi; | 
| 1835 |   mysql_mutex_assert_owner(&LOCK_rpl_thread); | 
| 1836 |   if ((rgi= rgi_free_list)) | 
| 1837 |   { | 
| 1838 |     rgi_free_list= rgi->next; | 
| 1839 |     rgi->reinit(rli); | 
| 1840 |   } | 
| 1841 |   else | 
| 1842 |   { | 
    if (!(rgi= new rpl_group_info(rli)))
| 1844 |     { | 
| 1845 |       my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi)); | 
| 1846 |       return NULL; | 
| 1847 |     } | 
| 1848 |     rgi->is_parallel_exec = true; | 
| 1849 |   } | 
| 1850 |   if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()) && | 
| 1851 |       !rgi->deferred_events) | 
| 1852 |     rgi->deferred_events= new Deferred_log_events(rli); | 
| 1853 |   if (event_group_new_gtid(rgi, gtid_ev)) | 
| 1854 |   { | 
| 1855 |     free_rgi(rgi); | 
| 1856 |     my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); | 
| 1857 |     return NULL; | 
| 1858 |   } | 
| 1859 |   rgi->parallel_entry= e; | 
| 1860 |   rgi->relay_log= rli->last_inuse_relaylog; | 
| 1861 |   rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; | 
| 1862 |   rgi->retry_event_count= 0; | 
| 1863 |   rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE; | 
| 1864 |  | 
| 1865 |   return rgi; | 
| 1866 | } | 
| 1867 |  | 
| 1868 |  | 
| 1869 | void | 
| 1870 | rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi) | 
| 1871 | { | 
| 1872 |   DBUG_ASSERT(rgi->commit_orderer.waitee == NULL); | 
| 1873 |   rgi->free_annotate_event(); | 
| 1874 |   if (!loc_rgi_list) | 
| 1875 |     loc_rgi_last_ptr_ptr= &rgi->next; | 
| 1876 |   else | 
| 1877 |     rgi->next= loc_rgi_list; | 
| 1878 |   loc_rgi_list= rgi; | 
| 1879 | } | 
| 1880 |  | 
| 1881 |  | 
| 1882 | void | 
| 1883 | rpl_parallel_thread::free_rgi(rpl_group_info *rgi) | 
| 1884 | { | 
| 1885 |   mysql_mutex_assert_owner(&LOCK_rpl_thread); | 
| 1886 |   DBUG_ASSERT(rgi->commit_orderer.waitee == NULL); | 
| 1887 |   rgi->free_annotate_event(); | 
| 1888 |   rgi->next= rgi_free_list; | 
| 1889 |   rgi_free_list= rgi; | 
| 1890 | } | 
| 1891 |  | 
| 1892 |  | 
| 1893 | group_commit_orderer * | 
| 1894 | rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev, | 
| 1895 |                              uint64 prior_sub_id) | 
| 1896 | { | 
| 1897 |   group_commit_orderer *gco; | 
| 1898 |   mysql_mutex_assert_owner(&LOCK_rpl_thread); | 
| 1899 |   if ((gco= gco_free_list)) | 
| 1900 |     gco_free_list= gco->next_gco; | 
  else if (!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0))))
| 1902 |   { | 
| 1903 |     my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco)); | 
| 1904 |     return NULL; | 
| 1905 |   } | 
| 1906 |   mysql_cond_init(key_COND_group_commit_orderer, | 
| 1907 |                   &gco->COND_group_commit_orderer, NULL); | 
| 1908 |   gco->wait_count= wait_count; | 
| 1909 |   gco->prev_gco= prev; | 
| 1910 |   gco->next_gco= NULL; | 
| 1911 |   gco->prior_sub_id= prior_sub_id; | 
| 1912 |   gco->installed= false; | 
| 1913 |   gco->flags= 0; | 
| 1914 |   return gco; | 
| 1915 | } | 
| 1916 |  | 
| 1917 |  | 
| 1918 | void | 
| 1919 | rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) | 
| 1920 | { | 
| 1921 |   if (!loc_gco_list) | 
| 1922 |     loc_gco_last_ptr_ptr= &gco->next_gco; | 
| 1923 |   else | 
| 1924 |     gco->next_gco= loc_gco_list; | 
| 1925 |   loc_gco_list= gco; | 
| 1926 | } | 
| 1927 |  | 
| 1928 |  | 
| 1929 | rpl_parallel_thread_pool::rpl_parallel_thread_pool() | 
| 1930 |   : threads(0), free_list(0), count(0), inited(false), busy(false) | 
| 1931 | { | 
| 1932 | } | 
| 1933 |  | 
| 1934 |  | 
| 1935 | int | 
| 1936 | rpl_parallel_thread_pool::init(uint32 size) | 
| 1937 | { | 
| 1938 |   threads= NULL; | 
| 1939 |   free_list= NULL; | 
| 1940 |   count= 0; | 
| 1941 |   busy= false; | 
| 1942 |  | 
| 1943 |   mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, | 
| 1944 |                    MY_MUTEX_INIT_SLOW); | 
| 1945 |   mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL); | 
| 1946 |   inited= true; | 
| 1947 |  | 
| 1948 |   /* | 
| 1949 |     The pool is initially empty. Threads will be spawned when a slave SQL | 
| 1950 |     thread is started. | 
| 1951 |   */ | 
| 1952 |  | 
| 1953 |   return 0; | 
| 1954 | } | 
| 1955 |  | 
| 1956 |  | 
| 1957 | void | 
| 1958 | rpl_parallel_thread_pool::destroy() | 
| 1959 | { | 
| 1960 |   if (!inited) | 
| 1961 |     return; | 
| 1962 |   rpl_parallel_change_thread_count(this, 0, 1); | 
| 1963 |   mysql_mutex_destroy(&LOCK_rpl_thread_pool); | 
| 1964 |   mysql_cond_destroy(&COND_rpl_thread_pool); | 
| 1965 |   inited= false; | 
| 1966 | } | 
| 1967 |  | 
| 1968 |  | 
| 1969 | /* | 
| 1970 |   Wait for a worker thread to become idle. When one does, grab the thread for | 
| 1971 |   our use and return it. | 
| 1972 |  | 
  Note that we return with the worker thread's LOCK_rpl_thread mutex locked.
| 1974 | */ | 
| 1975 | struct rpl_parallel_thread * | 
| 1976 | rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, | 
| 1977 |                                      rpl_parallel_entry *entry) | 
| 1978 | { | 
| 1979 |   rpl_parallel_thread *rpt; | 
| 1980 |  | 
| 1981 |   DBUG_ASSERT(count > 0); | 
| 1982 |   mysql_mutex_lock(&LOCK_rpl_thread_pool); | 
| 1983 |   while (unlikely(busy) || !(rpt= free_list)) | 
| 1984 |     mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); | 
| 1985 |   free_list= rpt->next; | 
| 1986 |   mysql_mutex_unlock(&LOCK_rpl_thread_pool); | 
| 1987 |   mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 1988 |   rpt->current_owner= owner; | 
| 1989 |   rpt->current_entry= entry; | 
| 1990 |  | 
| 1991 |   return rpt; | 
| 1992 | } | 
| 1993 |  | 
| 1994 |  | 
| 1995 | /* | 
| 1996 |   Release a thread to the thread pool. | 
| 1997 |   The thread should be locked, and should not have any work queued for it. | 
| 1998 | */ | 
| 1999 | void | 
| 2000 | rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) | 
| 2001 | { | 
| 2002 |   rpl_parallel_thread *list; | 
| 2003 |  | 
| 2004 |   mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread); | 
| 2005 |   DBUG_ASSERT(rpt->current_owner == NULL); | 
| 2006 |   mysql_mutex_lock(&LOCK_rpl_thread_pool); | 
| 2007 |   list= free_list; | 
| 2008 |   rpt->next= list; | 
| 2009 |   free_list= rpt; | 
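  /*
    If the list was empty, wake up any threads waiting in get_thread() for a
    worker to become free.
  */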
| 2010 |   if (!list) | 
| 2011 |     mysql_cond_broadcast(&COND_rpl_thread_pool); | 
| 2012 |   mysql_mutex_unlock(&LOCK_rpl_thread_pool); | 
| 2013 | } | 
| 2014 |  | 
| 2015 |  | 
| 2016 | /* | 
| 2017 |   Obtain a worker thread that we can queue an event to. | 
| 2018 |  | 
| 2019 |   Each invocation allocates a new worker thread, to maximise | 
| 2020 |   parallelism. However, only up to a maximum of | 
| 2021 |   --slave-domain-parallel-threads workers can be occupied by a single | 
| 2022 |   replication domain; after that point, we start re-using worker threads that | 
| 2023 |   are still executing events that were queued earlier for this thread. | 
| 2024 |  | 
  We never queue more than --slave-parallel-max-queued bytes of events
| 2026 |   for one worker, to avoid the SQL driver thread using up all memory with | 
| 2027 |   queued events while worker threads are stalling. | 
| 2028 |  | 
| 2029 |   Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread | 
  locked. The exception is if we were killed, in which case NULL is returned.
| 2031 |  | 
| 2032 |   The *did_enter_cond flag is set true if we had to wait for a worker thread | 
| 2033 |   to become free (with mysql_cond_wait()). If so, old_stage will also be set, | 
| 2034 |   and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead | 
| 2035 |   of mysql_mutex_unlock. | 
| 2036 |  | 
| 2037 |   If the flag `reuse' is set, the last worker thread will be returned again, | 
| 2038 |   if it is still available. Otherwise a new worker thread is allocated. | 
| 2039 | */ | 
| 2040 | rpl_parallel_thread * | 
| 2041 | rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, | 
| 2042 |                                   PSI_stage_info *old_stage, bool reuse) | 
| 2043 | { | 
| 2044 |   uint32 idx; | 
| 2045 |   Relay_log_info *rli= rgi->rli; | 
| 2046 |   rpl_parallel_thread *thr; | 
| 2047 |  | 
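  /*
    Round-robin over the entry's worker slots: advance to the next slot
    unless the caller asked to reuse the thread chosen for the previous
    event.
  */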
| 2048 |   idx= rpl_thread_idx; | 
| 2049 |   if (!reuse) | 
| 2050 |   { | 
| 2051 |     ++idx; | 
| 2052 |     if (idx >= rpl_thread_max) | 
| 2053 |       idx= 0; | 
| 2054 |     rpl_thread_idx= idx; | 
| 2055 |   } | 
| 2056 |   thr= rpl_threads[idx]; | 
| 2057 |   if (thr) | 
| 2058 |   { | 
| 2059 |     *did_enter_cond= false; | 
| 2060 |     mysql_mutex_lock(&thr->LOCK_rpl_thread); | 
| 2061 |     for (;;) | 
| 2062 |     { | 
| 2063 |       if (thr->current_owner != &rpl_threads[idx]) | 
| 2064 |       { | 
| 2065 |         /* | 
| 2066 |           The worker thread became idle, and returned to the free list and | 
| 2067 |           possibly was allocated to a different request. So we should allocate | 
| 2068 |           a new worker thread. | 
| 2069 |         */ | 
| 2070 |         unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread, | 
| 2071 |                             did_enter_cond, old_stage); | 
| 2072 |         thr= NULL; | 
| 2073 |         break; | 
| 2074 |       } | 
| 2075 |       else if (thr->queued_size <= opt_slave_parallel_max_queued) | 
| 2076 |       { | 
| 2077 |         /* The thread is ready to queue into. */ | 
| 2078 |         break; | 
| 2079 |       } | 
| 2080 |       else if (unlikely(rli->sql_driver_thd->check_killed())) | 
| 2081 |       { | 
| 2082 |         unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread, | 
| 2083 |                             did_enter_cond, old_stage); | 
| 2084 |         my_error(ER_CONNECTION_KILLED, MYF(0)); | 
        DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
          {
            debug_sync_set_action(rli->sql_driver_thd,
                      STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
          };);
| 2090 |         slave_output_error_info(rgi, rli->sql_driver_thd); | 
| 2091 |         return NULL; | 
| 2092 |       } | 
| 2093 |       else | 
| 2094 |       { | 
| 2095 |         /* | 
| 2096 |           We have reached the limit of how much memory we are allowed to use | 
| 2097 |           for queuing events, so wait for the thread to consume some of its | 
| 2098 |           queue. | 
| 2099 |         */ | 
| 2100 |         if (!*did_enter_cond) | 
| 2101 |         { | 
| 2102 |           /* | 
            We need to do the debug_sync before ENTER_COND(), because
            debug_sync changes thd->mysys_var->current_mutex, and this can
            cause THD::awake to use the wrong mutex.
| 2106 |           */ | 
          DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
            {
              debug_sync_set_action(rli->sql_driver_thd,
                        STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
            };);
| 2112 |           rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue, | 
| 2113 |                                           &thr->LOCK_rpl_thread, | 
| 2114 |                                           &stage_waiting_for_room_in_worker_thread, | 
| 2115 |                                           old_stage); | 
| 2116 |           *did_enter_cond= true; | 
| 2117 |         } | 
| 2118 |         mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread); | 
| 2119 |       } | 
| 2120 |     } | 
| 2121 |   } | 
| 2122 |   if (!thr) | 
| 2123 |     rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx], | 
| 2124 |                                                              this); | 
| 2125 |  | 
| 2126 |   return thr; | 
| 2127 | } | 
| 2128 |  | 
| 2129 | static void | 
| 2130 | free_rpl_parallel_entry(void *element) | 
| 2131 | { | 
| 2132 |   rpl_parallel_entry *e= (rpl_parallel_entry *)element; | 
| 2133 |   while (e->current_gco) | 
| 2134 |   { | 
| 2135 |     group_commit_orderer *prev_gco= e->current_gco->prev_gco; | 
| 2136 |     dealloc_gco(e->current_gco); | 
| 2137 |     e->current_gco= prev_gco; | 
| 2138 |   } | 
| 2139 |   mysql_cond_destroy(&e->COND_parallel_entry); | 
| 2140 |   mysql_mutex_destroy(&e->LOCK_parallel_entry); | 
| 2141 |   my_free(e); | 
| 2142 | } | 
| 2143 |  | 
| 2144 |  | 
| 2145 | rpl_parallel::rpl_parallel() : | 
| 2146 |   current(NULL), sql_thread_stopping(false) | 
| 2147 | { | 
| 2148 |   my_hash_init(&domain_hash, &my_charset_bin, 32, | 
| 2149 |                offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), | 
| 2150 |                NULL, free_rpl_parallel_entry, HASH_UNIQUE); | 
| 2151 | } | 
| 2152 |  | 
| 2153 |  | 
| 2154 | void | 
| 2155 | rpl_parallel::reset() | 
| 2156 | { | 
| 2157 |   my_hash_reset(&domain_hash); | 
| 2158 |   current= NULL; | 
| 2159 |   sql_thread_stopping= false; | 
| 2160 | } | 
| 2161 |  | 
| 2162 |  | 
| 2163 | rpl_parallel::~rpl_parallel() | 
| 2164 | { | 
| 2165 |   my_hash_free(&domain_hash); | 
| 2166 | } | 
| 2167 |  | 
| 2168 |  | 
| 2169 | rpl_parallel_entry * | 
| 2170 | rpl_parallel::find(uint32 domain_id) | 
| 2171 | { | 
| 2172 |   struct rpl_parallel_entry *e; | 
| 2173 |  | 
| 2174 |   if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash, | 
| 2175 |                                                 (const uchar *)&domain_id, 0))) | 
| 2176 |   { | 
| 2177 |     /* Allocate a new, empty one. */ | 
| 2178 |     ulong count= opt_slave_domain_parallel_threads; | 
| 2179 |     if (count == 0 || count > opt_slave_parallel_threads) | 
| 2180 |       count= opt_slave_parallel_threads; | 
| 2181 |     rpl_parallel_thread **p; | 
| 2182 |     if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), | 
| 2183 |                          &e, sizeof(*e), | 
| 2184 |                          &p, count*sizeof(*p), | 
| 2185 |                          NULL)) | 
| 2186 |     { | 
| 2187 |       my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); | 
| 2188 |       return NULL; | 
| 2189 |     } | 
| 2190 |     e->rpl_threads= p; | 
| 2191 |     e->rpl_thread_max= count; | 
| 2192 |     e->domain_id= domain_id; | 
| 2193 |     e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; | 
| 2194 |     e->pause_sub_id= (uint64)ULONGLONG_MAX; | 
| 2195 |     if (my_hash_insert(&domain_hash, (uchar *)e)) | 
| 2196 |     { | 
| 2197 |       my_free(e); | 
| 2198 |       return NULL; | 
| 2199 |     } | 
| 2200 |     mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry, | 
| 2201 |                      MY_MUTEX_INIT_FAST); | 
| 2202 |     mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); | 
| 2203 |   } | 
| 2204 |   else | 
| 2205 |     e->force_abort= false; | 
| 2206 |  | 
| 2207 |   return e; | 
| 2208 | } | 
| 2209 |  | 
| 2210 | /** | 
  Wait until all SQL worker threads have stopped processing.

  This is called when the SQL thread has been killed or stopped.
| 2214 | */ | 
| 2215 |  | 
| 2216 | void | 
| 2217 | rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) | 
| 2218 | { | 
| 2219 |   struct rpl_parallel_entry *e; | 
| 2220 |   rpl_parallel_thread *rpt; | 
| 2221 |   uint32 i, j; | 
| 2222 |  | 
| 2223 |   /* | 
| 2224 |     First signal all workers that they must force quit; no more events will | 
| 2225 |     be queued to complete any partial event groups executed. | 
| 2226 |   */ | 
| 2227 |   for (i= 0; i < domain_hash.records; ++i) | 
| 2228 |   { | 
| 2229 |     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); | 
| 2230 |     mysql_mutex_lock(&e->LOCK_parallel_entry); | 
| 2231 |     /* | 
| 2232 |       We want the worker threads to stop as quickly as is safe. If the slave | 
| 2233 |       SQL threads are behind, we could have significant amount of events | 
| 2234 |       queued for the workers, and we want to stop without waiting for them | 
| 2235 |       all to be applied first. But if any event group has already started | 
| 2236 |       executing in a worker, we want to be sure that all prior event groups | 
| 2237 |       are also executed, so that we stop at a consistent point in the binlog | 
| 2238 |       stream (per replication domain). | 
| 2239 |  | 
| 2240 |       All event groups wait for e->count_committing_event_groups to reach | 
| 2241 |       the value of group_commit_orderer::wait_count before starting to | 
| 2242 |       execute. Thus, at this point we know that any event group with a | 
| 2243 |       strictly larger wait_count are safe to skip, none of them can have | 
| 2244 |       started executing yet. So we set e->stop_count here and use it to | 
| 2245 |       decide in the worker threads whether to continue executing an event | 
| 2246 |       group or whether to skip it, when force_abort is set. | 
| 2247 |  | 
| 2248 |       If we stop due to reaching the START SLAVE UNTIL condition, then we | 
| 2249 |       need to continue executing any queued events up to that point. | 
| 2250 |     */ | 
| 2251 |     e->force_abort= true; | 
| 2252 |     e->stop_count= rli->stop_for_until ? | 
| 2253 |       e->count_queued_event_groups : e->count_committing_event_groups; | 
| 2254 |     mysql_mutex_unlock(&e->LOCK_parallel_entry); | 
| 2255 |     for (j= 0; j < e->rpl_thread_max; ++j) | 
| 2256 |     { | 
| 2257 |       if ((rpt= e->rpl_threads[j])) | 
| 2258 |       { | 
| 2259 |         mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 2260 |         if (rpt->current_owner == &e->rpl_threads[j]) | 
| 2261 |           mysql_cond_signal(&rpt->COND_rpl_thread); | 
| 2262 |         mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 2263 |       } | 
| 2264 |     } | 
| 2265 |   } | 
  DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger",
  {
    debug_sync_set_action(thd,
                          STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
  };);
| 2271 |  | 
| 2272 |   for (i= 0; i < domain_hash.records; ++i) | 
| 2273 |   { | 
| 2274 |     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); | 
| 2275 |     for (j= 0; j < e->rpl_thread_max; ++j) | 
| 2276 |     { | 
| 2277 |       if ((rpt= e->rpl_threads[j])) | 
| 2278 |       { | 
| 2279 |         mysql_mutex_lock(&rpt->LOCK_rpl_thread); | 
| 2280 |         while (rpt->current_owner == &e->rpl_threads[j]) | 
| 2281 |           mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread); | 
| 2282 |         mysql_mutex_unlock(&rpt->LOCK_rpl_thread); | 
| 2283 |       } | 
| 2284 |     } | 
| 2285 |   } | 
| 2286 | } | 
| 2287 |  | 
| 2288 |  | 
| 2289 | /* | 
| 2290 |   This function handles the case where the SQL driver thread reached the | 
| 2291 |   START SLAVE UNTIL position; we stop queueing more events but continue | 
  processing remaining, already queued events; then the user executes a manual
| 2293 |   STOP SLAVE; then this function signals to worker threads that they | 
| 2294 |   should stop the processing of any remaining queued events. | 
| 2295 | */ | 
| 2296 | void | 
| 2297 | rpl_parallel::stop_during_until() | 
| 2298 | { | 
| 2299 |   struct rpl_parallel_entry *e; | 
| 2300 |   uint32 i; | 
| 2301 |  | 
| 2302 |   for (i= 0; i < domain_hash.records; ++i) | 
| 2303 |   { | 
| 2304 |     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); | 
| 2305 |     mysql_mutex_lock(&e->LOCK_parallel_entry); | 
| 2306 |     if (e->force_abort) | 
| 2307 |       e->stop_count= e->count_committing_event_groups; | 
| 2308 |     mysql_mutex_unlock(&e->LOCK_parallel_entry); | 
| 2309 |   } | 
| 2310 | } | 
| 2311 |  | 
| 2312 |  | 
| 2313 | bool | 
| 2314 | rpl_parallel::workers_idle() | 
| 2315 | { | 
| 2316 |   struct rpl_parallel_entry *e; | 
| 2317 |   uint32 i, max_i; | 
| 2318 |  | 
| 2319 |   max_i= domain_hash.records; | 
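  /*
    A domain counts as active while its latest queued event group
    (current_sub_id) has not yet committed (last_committed_sub_id).
  */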
| 2320 |   for (i= 0; i < max_i; ++i) | 
| 2321 |   { | 
| 2322 |     bool active; | 
| 2323 |     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); | 
| 2324 |     mysql_mutex_lock(&e->LOCK_parallel_entry); | 
| 2325 |     active= e->current_sub_id > e->last_committed_sub_id; | 
| 2326 |     mysql_mutex_unlock(&e->LOCK_parallel_entry); | 
| 2327 |     if (active) | 
| 2328 |       break; | 
| 2329 |   } | 
| 2330 |   return (i == max_i); | 
| 2331 | } | 
| 2332 |  | 
| 2333 |  | 
| 2334 | int | 
| 2335 | rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, | 
| 2336 |                                          Format_description_log_event *fdev) | 
| 2337 | { | 
| 2338 |   uint32 idx; | 
| 2339 |   rpl_parallel_thread *thr; | 
| 2340 |   rpl_parallel_thread::queued_event *qev; | 
| 2341 |   Relay_log_info *rli= rgi->rli; | 
| 2342 |  | 
| 2343 |   /* | 
| 2344 |     We only need to queue the server restart if we still have a thread working | 
| 2345 |     on a (potentially partial) event group. | 
| 2346 |  | 
| 2347 |     If the last thread we queued for has finished, then it cannot have any | 
| 2348 |     partial event group that needs aborting. | 
| 2349 |  | 
| 2350 |     Thus there is no need for the full complexity of choose_thread(). We only | 
| 2351 |     need to check if we have a current worker thread, and queue for it if so. | 
| 2352 |   */ | 
| 2353 |   idx= rpl_thread_idx; | 
| 2354 |   thr= rpl_threads[idx]; | 
| 2355 |   if (!thr) | 
| 2356 |     return 0; | 
| 2357 |   mysql_mutex_lock(&thr->LOCK_rpl_thread); | 
| 2358 |   if (thr->current_owner != &rpl_threads[idx]) | 
| 2359 |   { | 
| 2360 |     /* No active worker thread, so no need to queue the master restart. */ | 
| 2361 |     mysql_mutex_unlock(&thr->LOCK_rpl_thread); | 
| 2362 |     return 0; | 
| 2363 |   } | 
| 2364 |  | 
| 2365 |   if (!(qev= thr->get_qev(fdev, 0, rli))) | 
| 2366 |   { | 
| 2367 |     mysql_mutex_unlock(&thr->LOCK_rpl_thread); | 
| 2368 |     return 1; | 
| 2369 |   } | 
| 2370 |  | 
| 2371 |   qev->rgi= rgi; | 
| 2372 |   qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART; | 
| 2373 |   qev->entry_for_queued= this; | 
| 2374 |   qev->ir= rli->last_inuse_relaylog; | 
| 2375 |   ++qev->ir->queued_count; | 
| 2376 |   thr->enqueue(qev); | 
| 2377 |   mysql_cond_signal(&thr->COND_rpl_thread); | 
| 2378 |   mysql_mutex_unlock(&thr->LOCK_rpl_thread); | 
| 2379 |   return 0; | 
| 2380 | } | 
| 2381 |  | 
| 2382 |  | 
| 2383 | int | 
| 2384 | rpl_parallel::wait_for_workers_idle(THD *thd) | 
| 2385 | { | 
| 2386 |   uint32 i, max_i; | 
| 2387 |  | 
| 2388 |   /* | 
| 2389 |     The domain_hash is only accessed by the SQL driver thread, so it is safe | 
| 2390 |     to iterate over without a lock. | 
| 2391 |   */ | 
| 2392 |   max_i= domain_hash.records; | 
| 2393 |   for (i= 0; i < max_i; ++i) | 
| 2394 |   { | 
| 2395 |     PSI_stage_info old_stage; | 
| 2396 |     struct rpl_parallel_entry *e; | 
| 2397 |     int err= 0; | 
| 2398 |  | 
| 2399 |     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); | 
| 2400 |     mysql_mutex_lock(&e->LOCK_parallel_entry); | 
| 2401 |     ++e->need_sub_id_signal; | 
| 2402 |     thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, | 
| 2403 |                     &stage_waiting_for_workers_idle, &old_stage); | 
| 2404 |     while (e->current_sub_id > e->last_committed_sub_id) | 
| 2405 |     { | 
| 2406 |       if (unlikely(thd->check_killed())) | 
| 2407 |       { | 
| 2408 |         thd->send_kill_message(); | 
| 2409 |         err= 1; | 
| 2410 |         break; | 
| 2411 |       } | 
| 2412 |       mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); | 
| 2413 |     } | 
| 2414 |     --e->need_sub_id_signal; | 
| 2415 |     thd->EXIT_COND(&old_stage); | 
| 2416 |     if (err) | 
| 2417 |       return err; | 
| 2418 |   } | 
| 2419 |   return 0; | 
| 2420 | } | 
| 2421 |  | 
| 2422 |  | 
| 2423 | /* | 
| 2424 |   Handle seeing a GTID during slave restart in GTID mode. If we stopped with | 
| 2425 |   different replication domains having reached different positions in the relay | 
  log, we need to skip event groups in domains that have progressed further.
| 2427 |  | 
| 2428 |   Updates the state with the seen GTID, and returns true if this GTID should | 
| 2429 |   be skipped, false otherwise. | 
| 2430 | */ | 
| 2431 | bool | 
| 2432 | process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid) | 
| 2433 | { | 
| 2434 |   slave_connection_state::entry *gtid_entry; | 
| 2435 |   slave_connection_state *state= &rli->restart_gtid_pos; | 
| 2436 |  | 
| 2437 |   if (likely(state->count() == 0) || | 
| 2438 |       !(gtid_entry= state->find_entry(gtid->domain_id))) | 
| 2439 |     return false; | 
| 2440 |   if (gtid->server_id == gtid_entry->gtid.server_id) | 
| 2441 |   { | 
| 2442 |     uint64 seq_no= gtid_entry->gtid.seq_no; | 
| 2443 |     if (gtid->seq_no >= seq_no) | 
| 2444 |     { | 
| 2445 |       /* | 
| 2446 |         This domain has reached its start position. So remove it, so that | 
| 2447 |         further events will be processed normally. | 
| 2448 |       */ | 
      state->remove(&gtid_entry->gtid);
| 2450 |     } | 
| 2451 |     return gtid->seq_no <= seq_no; | 
| 2452 |   } | 
| 2453 |   else | 
| 2454 |     return true; | 
| 2455 | } | 
| 2456 |  | 
| 2457 |  | 
| 2458 | /* | 
| 2459 |   This is used when we get an error during processing in do_event(); | 
  we will not queue any event to the thread, but we still need to wake it up
| 2461 |   to be sure that it will be returned to the pool. | 
| 2462 | */ | 
| 2463 | static void | 
| 2464 | abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread, | 
| 2465 |                       bool *did_enter_cond, PSI_stage_info *old_stage) | 
| 2466 | { | 
| 2467 |   unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread, | 
| 2468 |                       did_enter_cond, old_stage); | 
| 2469 |   mysql_cond_signal(&cur_thread->COND_rpl_thread); | 
| 2470 | } | 
| 2471 |  | 
| 2472 |  | 
| 2473 | /* | 
| 2474 |   do_event() is executed by the sql_driver_thd thread. | 
  Its main purpose is to find a worker thread that can execute the query.
| 2476 |  | 
| 2477 |   @retval  0    ok, event was accepted | 
| 2478 |   @retval  1    error | 
| 2479 |   @retval -1    event should be executed serially, in the sql driver thread | 
| 2480 | */ | 
| 2481 |  | 
| 2482 | int | 
| 2483 | rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, | 
| 2484 |                        ulonglong event_size) | 
| 2485 | { | 
| 2486 |   rpl_parallel_entry *e; | 
| 2487 |   rpl_parallel_thread *cur_thread; | 
| 2488 |   rpl_parallel_thread::queued_event *qev; | 
| 2489 |   rpl_group_info *rgi= NULL; | 
| 2490 |   Relay_log_info *rli= serial_rgi->rli; | 
| 2491 |   enum Log_event_type typ; | 
| 2492 |   bool is_group_event; | 
| 2493 |   bool did_enter_cond= false; | 
| 2494 |   PSI_stage_info old_stage; | 
| 2495 |  | 
  DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
| 2497 |   /* Handle master log name change, seen in Rotate_log_event. */ | 
| 2498 |   typ= ev->get_type_code(); | 
| 2499 |   if (unlikely(typ == ROTATE_EVENT)) | 
| 2500 |   { | 
| 2501 |     Rotate_log_event *rev= static_cast<Rotate_log_event *>(ev); | 
| 2502 |     if ((rev->server_id != global_system_variables.server_id || | 
| 2503 |          rli->replicate_same_server_id) && | 
| 2504 |         !rev->is_relay_log_event() && | 
| 2505 |         !rli->is_in_group()) | 
| 2506 |     { | 
| 2507 |       memcpy(rli->future_event_master_log_name, | 
| 2508 |              rev->new_log_ident, rev->ident_len+1); | 
| 2509 |       rli->notify_group_master_log_name_update(); | 
| 2510 |     } | 
| 2511 |   } | 
| 2512 |  | 
| 2513 |   /* | 
    Execute queries serially if slave_skip_counter is set, as it is easier
    to skip queries in single-threaded mode.
| 2516 |   */ | 
| 2517 |   if (rli->slave_skip_counter) | 
| 2518 |     return -1; | 
| 2519 |  | 
  /* Execute pre-10.0 events, which have no GTID, in single-threaded mode. */
| 2521 |   is_group_event= Log_event::is_group_event(typ); | 
| 2522 |   if (unlikely(!current) && typ != GTID_EVENT && | 
| 2523 |       !(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)) | 
| 2524 |     return -1; | 
| 2525 |  | 
| 2526 |   /* Note: rli->data_lock is released by sql_delay_event(). */ | 
| 2527 |   if (sql_delay_event(ev, rli->sql_driver_thd, serial_rgi)) | 
| 2528 |   { | 
| 2529 |     /* | 
| 2530 |       If sql_delay_event() returns non-zero, it means that the wait timed out | 
| 2531 |       due to slave stop. We should not queue the event in this case, it must | 
| 2532 |       not be applied yet. | 
| 2533 |     */ | 
| 2534 |     delete ev; | 
| 2535 |     return 1; | 
| 2536 |   } | 
| 2537 |  | 
| 2538 |   if (unlikely(typ == FORMAT_DESCRIPTION_EVENT)) | 
| 2539 |   { | 
| 2540 |     Format_description_log_event *fdev= | 
| 2541 |       static_cast<Format_description_log_event *>(ev); | 
| 2542 |     if (fdev->created) | 
| 2543 |     { | 
| 2544 |       /* | 
| 2545 |         This format description event marks a new binlog after a master server | 
| 2546 |         restart. We are going to close all temporary tables to clean up any | 
| 2547 |         possible left-overs after a prior master crash. | 
| 2548 |  | 
| 2549 |         Thus we need to wait for all prior events to execute to completion, | 
| 2550 |         in case they need access to any of the temporary tables. | 
| 2551 |  | 
| 2552 |         We also need to notify the worker thread running the prior incomplete | 
| 2553 |         event group (if any), as such event group signifies an incompletely | 
| 2554 |         written group cut short by a master crash, and must be rolled back. | 
| 2555 |       */ | 
| 2556 |       if (current->queue_master_restart(serial_rgi, fdev) || | 
| 2557 |           wait_for_workers_idle(rli->sql_driver_thd)) | 
| 2558 |       { | 
| 2559 |         delete ev; | 
| 2560 |         return 1; | 
| 2561 |       } | 
| 2562 |     } | 
| 2563 |   } | 
| 2564 |   else if (unlikely(typ == GTID_LIST_EVENT)) | 
| 2565 |   { | 
| 2566 |     Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev); | 
| 2567 |     rpl_gtid *list= glev->list; | 
| 2568 |     uint32 count= glev->count; | 
| 2569 |     rli->update_relay_log_state(list, count); | 
| 2570 |     while (count) | 
| 2571 |     { | 
| 2572 |       process_gtid_for_restart_pos(rli, list); | 
| 2573 |       ++list; | 
| 2574 |       --count; | 
| 2575 |     } | 
| 2576 |   } | 
| 2577 |  | 
| 2578 |   /* | 
| 2579 |     Stop queueing additional event groups once the SQL thread is requested to | 
| 2580 |     stop. | 
| 2581 |  | 
| 2582 |     We have to queue any remaining events of any event group that has already | 
| 2583 |     been partially queued, but after that we will just ignore any further | 
| 2584 |     events the SQL driver thread may try to queue, and eventually it will stop. | 
| 2585 |   */ | 
| 2586 |   if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave) | 
| 2587 |     sql_thread_stopping= true; | 
| 2588 |   if (sql_thread_stopping) | 
| 2589 |   { | 
| 2590 |     delete ev; | 
| 2591 |     /* | 
| 2592 |       Return "no error"; normal stop is not an error, and otherwise the error | 
| 2593 |       has already been recorded. | 
| 2594 |     */ | 
| 2595 |     return 0; | 
| 2596 |   } | 
| 2597 |  | 
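  /*
    We are in the middle of skipping an event group that was already applied
    before the restart (see process_gtid_for_restart_pos()). A new GTID ends
    the skip; otherwise a standalone group ends at the first event that is
    not part of the group, and a transaction ends with its XID or
    COMMIT/ROLLBACK query event.
  */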
| 2598 |   if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event) | 
| 2599 |   { | 
| 2600 |     if (typ == GTID_EVENT) | 
| 2601 |       rli->gtid_skip_flag= GTID_SKIP_NOT; | 
| 2602 |     else | 
| 2603 |     { | 
| 2604 |       if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE) | 
| 2605 |       { | 
| 2606 |         if (!Log_event::is_part_of_group(typ)) | 
| 2607 |           rli->gtid_skip_flag= GTID_SKIP_NOT; | 
| 2608 |       } | 
| 2609 |       else | 
| 2610 |       { | 
| 2611 |         DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION); | 
| 2612 |         if (typ == XID_EVENT || | 
| 2613 |             (typ == QUERY_EVENT &&  // COMMIT/ROLLBACK are never compressed | 
| 2614 |              (((Query_log_event *)ev)->is_commit() || | 
| 2615 |               ((Query_log_event *)ev)->is_rollback()))) | 
| 2616 |           rli->gtid_skip_flag= GTID_SKIP_NOT; | 
| 2617 |       } | 
| 2618 |       delete_or_keep_event_post_apply(serial_rgi, typ, ev); | 
| 2619 |       return 0; | 
| 2620 |     } | 
| 2621 |   } | 
| 2622 |  | 
| 2623 |   if (typ == GTID_EVENT) | 
| 2624 |   { | 
| 2625 |     rpl_gtid gtid; | 
| 2626 |     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); | 
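    /*
      Without GTID mode, or with parallel mode none/minimal, map every event
      group to domain 0, so that all of them share a single
      rpl_parallel_entry and thus a single commit order.
    */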
| 2627 |     uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || | 
| 2628 |                        rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ? | 
| 2629 |                        0 : gtid_ev->domain_id); | 
| 2630 |     if (!(e= find(domain_id))) | 
| 2631 |     { | 
| 2632 |       my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); | 
| 2633 |       delete ev; | 
| 2634 |       return 1; | 
| 2635 |     } | 
| 2636 |     current= e; | 
| 2637 |  | 
| 2638 |     gtid.domain_id= gtid_ev->domain_id; | 
| 2639 |     gtid.server_id= gtid_ev->server_id; | 
| 2640 |     gtid.seq_no= gtid_ev->seq_no; | 
    rli->update_relay_log_state(&gtid, 1);
    if (process_gtid_for_restart_pos(rli, &gtid))
    {
      /*
        Before the last SQL thread restart, this domain had already progressed
        past this point in the relay log. Skip this event group, so that it is
        not applied twice.
      */
      rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
                            GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
      delete_or_keep_event_post_apply(serial_rgi, typ, ev);
      return 0;
    }
  }
  else
    e= current;

  /*
    Find a worker thread to queue the event for.
    Prefer a new thread, so we maximise parallelism (at least for the group
    commit). But do not exceed a limit of --slave-domain-parallel-threads;
    instead re-use a thread that we queued for previously.
  */
  cur_thread=
    e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
                     typ != GTID_EVENT);
  if (!cur_thread)
  {
    /* This means we were killed. The error is already signalled. */
    delete ev;
    return 1;
  }

  if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
  {
    abandon_worker_thread(rli->sql_driver_thd, cur_thread,
                          &did_enter_cond, &old_stage);
    delete ev;
    return 1;
  }

  if (typ == GTID_EVENT)
  {
    Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
    bool new_gco;
    enum_slave_parallel_mode mode= rli->mi->parallel_mode;
    uchar gtid_flags= gtid_ev->flags2;
    group_commit_orderer *gco;
    uint8 force_switch_flag;
    enum rpl_group_info::enum_speculation speculation;

    if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
    {
      cur_thread->free_qev(qev);
      abandon_worker_thread(rli->sql_driver_thd, cur_thread,
                            &did_enter_cond, &old_stage);
      delete ev;
      return 1;
    }

    /*
      We queue the event group in a new worker thread, to run in parallel
      with previous groups.

      To preserve commit order within the replication domain, we set up
      rgi->wait_commit_sub_id to make the new group commit only after the
      previous group has committed.

      Event groups that group-committed together on the master can be run
      in parallel with each other without restrictions. But one batch of
      group-commits may not start before all groups in the previous batch
      have initiated their commit phase; we set up rgi->gco to ensure that.
    */
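    /*
      For illustration (conservative mode): if T1 and T2 group-committed
      together on the master (same commit_id) and T3 committed separately
      after them, then T1 and T2 can run fully in parallel on the slave, T3
      may not start until both T1 and T2 have at least entered their commit
      phase, and the actual commits still happen in the order T1, T2, T3.
    */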
    rgi->wait_commit_sub_id= e->current_sub_id;
    rgi->wait_commit_group_info= e->current_group_info;

    speculation= rpl_group_info::SPECULATE_NO;
    new_gco= true;
    force_switch_flag= 0;
    gco= e->current_gco;
    if (likely(gco))
    {
      uint8 flags= gco->flags;

      if (mode <= SLAVE_PARALLEL_MINIMAL ||
          !(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
          e->last_commit_id != gtid_ev->commit_id)
        flags|= group_commit_orderer::MULTI_BATCH;
      /* Make sure we do not attempt to run DDL in parallel speculatively. */
      if (gtid_flags & Gtid_log_event::FL_DDL)
        flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH);

      if (!(flags & group_commit_orderer::MULTI_BATCH))
      {
        /*
          Still the same batch of event groups that group-committed together
          on the master, so we can run in parallel.
        */
        new_gco= false;
      }
      else if ((mode >= SLAVE_PARALLEL_OPTIMISTIC) &&
               !(flags & group_commit_orderer::FORCE_SWITCH))
      {
        /*
          In transactional parallel mode, we optimistically attempt to run
          non-DDL in parallel. In case of conflicts, we catch the conflict as
          a deadlock or other error, roll back and retry serially.

          The assumption is that only a few event groups will be
          non-transactional or otherwise unsuitable for parallel apply. Those
          transactions are still scheduled in parallel, but we set a flag that
          makes the worker thread wait for all prior event groups to complete
          before starting.
        */
        new_gco= false;
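        /*
          Decide whether this group may run speculatively. We must wait for
          everything prior if the group is not transactional (it cannot be
          rolled back and retried on a conflict), or if the master flagged it
          as unsafe or likely to conflict (FL_ALLOW_PARALLEL not set, or
          FL_WAITED set) and the mode is below "aggressive".
        */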
        if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
            ( (!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
               (gtid_flags & Gtid_log_event::FL_WAITED)) &&
              (mode < SLAVE_PARALLEL_AGGRESSIVE)))
        {
          /*
            This transaction should not be speculatively run in parallel with
            what came before, either because it cannot safely be rolled back in
            case of a conflict, or because it was marked as likely to conflict
            and require expensive rollback and retry.

            Here we mark it as such, and then the worker thread will do a
            wait_for_prior_commit() before starting it. We do not introduce a
            new group_commit_orderer, since we still want following transactions
            to run in parallel with transactions prior to this one.
          */
          speculation= rpl_group_info::SPECULATE_WAIT;
        }
        else
          speculation= rpl_group_info::SPECULATE_OPTIMISTIC;
      }
      gco->flags= flags;
    }
    else
    {
      if (gtid_flags & Gtid_log_event::FL_DDL)
        force_switch_flag= group_commit_orderer::FORCE_SWITCH;
    }
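    /*
      With no current gco, this is the first group queued for this entry, and
      it will start a new gco below. A DDL group still sets FORCE_SWITCH on
      that new gco, so that the following group will not speculate into its
      batch.
    */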
    rgi->speculation= speculation;

    if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
      e->last_commit_id= gtid_ev->commit_id;
    else
      e->last_commit_id= 0;
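    /*
      The saved commit_id lets the next GTID event detect whether it
      group-committed together with this one on the master, and thus whether
      it may join the current batch.
    */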

    if (new_gco)
    {
      /*
        Do not run this event group in parallel with what came before; instead
        wait until everything prior has at least started its commit phase, to
        avoid any risk of performing a conflicting action too early.

        Remember the count that marks the end of the previous batch of event
        groups that run in parallel, and allocate a new gco.
      */
      uint64 count= e->count_queued_event_groups;

      if (!(gco= cur_thread->get_gco(count, gco, e->current_sub_id)))
      {
        cur_thread->free_rgi(rgi);
        cur_thread->free_qev(qev);
        abandon_worker_thread(rli->sql_driver_thd, cur_thread,
                              &did_enter_cond, &old_stage);
        delete ev;
        return 1;
      }
      gco->flags|= force_switch_flag;
      e->current_gco= gco;
    }
    rgi->gco= gco;

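    /*
      Record this group as the latest queued in the domain, so that the next
      group will wait for its commit (through wait_commit_sub_id above),
      preserving commit order within the domain.
    */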
    qev->rgi= e->current_group_info= rgi;
    e->current_sub_id= rgi->gtid_sub_id;
    ++e->count_queued_event_groups;
  }
  else if (!is_group_event)
  {
    int err;
    bool tmp;
    /*
      Events like ROTATE and FORMAT_DESCRIPTION are not run in a worker
      thread. The same goes for events not preceded by a GTID (we should not
      normally see those, but they might come from an old master).
    */
    qev->rgi= serial_rgi;

    tmp= serial_rgi->is_parallel_exec;
    serial_rgi->is_parallel_exec= true;
    err= rpt_handle_event(qev, NULL);
    serial_rgi->is_parallel_exec= tmp;
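    /*
      Record the master log position this event corresponds to: events
      generated for the relay log itself do not advance the master position
      (use 0); a ROTATE event carries the master's position explicitly in its
      body; for anything else use the event's own end position.
    */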
    if (ev->is_relay_log_event())
      qev->future_event_master_log_pos= 0;
    else if (typ == ROTATE_EVENT)
      qev->future_event_master_log_pos=
        (static_cast<Rotate_log_event *>(ev))->pos;
    else
      qev->future_event_master_log_pos= ev->log_pos;
    delete_or_keep_event_post_apply(serial_rgi, typ, ev);

    if (err)
    {
      cur_thread->free_qev(qev);
      abandon_worker_thread(rli->sql_driver_thd, cur_thread,
                            &did_enter_cond, &old_stage);
      return 1;
    }
    /*
      Queue a position update, so that the position will be updated in a
      reasonable way relative to other events:

       - If the currently executing events are queued serially for a single
         thread, the position will only be updated when everything before has
         completed.

       - If we are executing multiple independent events in parallel, then at
         least the position will not be updated until one of them has reached
         the current point.
    */
    qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
    qev->entry_for_queued= e;
  }
  else
  {
    qev->rgi= e->current_group_info;
  }

  /*
    Queue the event for processing.
  */
  qev->ir= rli->last_inuse_relaylog;
  ++qev->ir->queued_count;
  cur_thread->enqueue(qev);
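  /*
    Release the worker's LOCK_rpl_thread (leaving any wait stage we entered
    in choose_thread()) and wake the worker to process the newly queued
    event.
  */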
  unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
                      &did_enter_cond, &old_stage);
  mysql_cond_signal(&cur_thread->COND_rpl_thread);

  return 0;
}
