1/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16
17/* Definitions for MariaDB global transaction ID (GTID). */
18
19#include "mariadb.h"
20#include "sql_priv.h"
21#include "unireg.h"
22#include "mariadb.h"
23#include "sql_base.h"
24#include "sql_parse.h"
25#include "key.h"
26#include "rpl_gtid.h"
27#include "rpl_rli.h"
28#include "slave.h"
29#include "log_event.h"
30
/*
  Name (without schema) of the table that stores the replication slave GTID
  position. Code in this file opens it in the mysql schema and prints it as
  mysql.<name> in messages.
*/
const LEX_CSTRING rpl_gtid_slave_state_table_name=
  { STRING_WITH_LEN("gtid_slave_pos") };
33
34
35void
36rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
37 rpl_group_info *rgi)
38{
39 int err;
40 /*
41 Add the gtid to the HASH in the replication slave state.
42
43 We must do this only _after_ commit, so that for parallel replication,
44 there will not be an attempt to delete the corresponding table row before
45 it is even committed.
46 */
47 mysql_mutex_lock(&LOCK_slave_state);
48 err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
49 mysql_mutex_unlock(&LOCK_slave_state);
50 if (err)
51 {
52 sql_print_warning("Slave: Out of memory during slave state maintenance. "
53 "Some no longer necessary rows in table "
54 "mysql.%s may be left undeleted.",
55 rpl_gtid_slave_state_table_name.str);
56 /*
57 Such failure is not fatal. We will fail to delete the row for this
58 GTID, but it will do no harm and will be removed automatically on next
59 server restart.
60 */
61 }
62}
63
64
65int
66rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
67{
68 DBUG_ENTER("rpl_slave_state::record_and_update_gtid");
69
70 /*
71 Update the GTID position, if we have it and did not already update
72 it in a GTID transaction.
73 */
74 if (rgi->gtid_pending)
75 {
76 uint64 sub_id= rgi->gtid_sub_id;
77 void *hton= NULL;
78
79 rgi->gtid_pending= false;
80 if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
81 {
82 if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
83 DBUG_RETURN(1);
84 update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
85 }
86 rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
87 }
88 DBUG_RETURN(0);
89}
90
91
/*
  Check GTID event execution when --gtid-ignore-duplicates.

  The idea with --gtid-ignore-duplicates is that we allow multiple master
  connections (in multi-source replication) to all receive the same GTIDs and
  event groups. Only one instance of each is applied; we use the sequence
  number in the GTID to decide whether a GTID has already been applied.

  So if the seq_no of a GTID (or a higher sequence number) has already been
  applied, then the event should be skipped. If not then the event should be
  applied.

  To avoid two master connections trying to apply the same event
  simultaneously, only one is allowed to work in any given domain at any point
  in time. The associated Relay_log_info object is called the owner of the
  domain (and there can be multiple parallel worker threads working in that
  domain for that Relay_log_info). Any other Relay_log_info/master connection
  must wait for the domain to become free, or for their GTID to have been
  applied, before being allowed to proceed.

  On a 0 or 1 return, rgi->gtid_ignore_duplicate_state is set to
  GTID_DUPLICATE_IGNORE or GTID_DUPLICATE_OWNER respectively. Ownership taken
  here is given back later by update() (on success) or release_domain_owner().

  Returns:
    0  This GTID is already applied, it should be skipped.
    1  The GTID is not yet applied; this rli is now the owner, and must apply
       the event and release the domain afterwards.
   -1  Error: out of memory allocating a new element for the domain, or the
       thread was killed while waiting for the domain.
*/
int
rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
{
  uint32 domain_id= gtid->domain_id;
  uint64 seq_no= gtid->seq_no;
  rpl_slave_state::element *elem;
  int res;
  bool did_enter_cond= false;
  PSI_stage_info old_stage;
  /* Assigned only once we may have to wait; used at err: only in that case. */
  THD *UNINIT_VAR(thd);
  Relay_log_info *rli= rgi->rli;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(domain_id)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    res= -1;
    goto err;
  }
  /*
    Note that the elem pointer does not change once inserted in the hash. So
    we can re-use the pointer without looking it up again in the hash after
    each lock release and re-take.
  */

  for (;;)
  {
    if (elem->highest_seq_no >= seq_no)
    {
      /* This sequence number is already applied, ignore it. */
      res= 0;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
      break;
    }
    if (!elem->owner_rli)
    {
      /* The domain became free, grab it and apply the event. */
      elem->owner_rli= rli;
      elem->owner_count= 1;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    if (elem->owner_rli == rli)
    {
      /* Already own this domain, increment reference count and apply event. */
      ++elem->owner_count;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    thd= rgi->thd;
    /* Do not wait forever if the worker is being killed. */
    if (unlikely(thd->check_killed()))
    {
      thd->send_kill_message();
      res= -1;
      break;
    }
    /*
      Someone else is currently processing this GTID (or an earlier one).
      Wait for them to complete (or fail), and then check again.
    */
    if (!did_enter_cond)
    {
      /* Register the wait (stage/kill handling) only once per call. */
      thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
                      &stage_gtid_wait_other_connection, &old_stage);
      did_enter_cond= true;
    }
    mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
                    &LOCK_slave_state);
  }

err:
  /*
    Release LOCK_slave_state. If ENTER_COND() was done, EXIT_COND() is used
    instead of a plain unlock (presumably it also unlocks the mutex, since
    no explicit unlock is done on that path).
  */
  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_slave_state);
  return res;
}
197
198
199void
200rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
201{
202 element *elem= NULL;
203
204 mysql_mutex_lock(&LOCK_slave_state);
205 if (!(elem= get_element(rgi->current_gtid.domain_id)))
206 {
207 /*
208 We cannot really deal with error here, as we are already called in an
209 error handling case (transaction failure and rollback).
210
211 However, get_element() only fails if the element did not exist already
212 and could not be allocated due to out-of-memory - and if it did not
213 exist, then we would not get here in the first place.
214 */
215 mysql_mutex_unlock(&LOCK_slave_state);
216 return;
217 }
218
219 if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
220 {
221 uint32 count= elem->owner_count;
222 DBUG_ASSERT(count > 0);
223 DBUG_ASSERT(elem->owner_rli == rgi->rli);
224 --count;
225 elem->owner_count= count;
226 if (count == 0)
227 {
228 elem->owner_rli= NULL;
229 mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
230 }
231 }
232 rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
233 mysql_mutex_unlock(&LOCK_slave_state);
234}
235
236
237static void
238rpl_slave_state_free_element(void *arg)
239{
240 struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
241 mysql_cond_destroy(&elem->COND_wait_gtid);
242 mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
243 my_free(elem);
244}
245
246
/*
  Initialise an empty slave state. This sets up:
  - the mutex protecting the state;
  - the per-domain HASH, keyed on element::domain_id, whose elements are
    released with rpl_slave_state_free_element;
  - the scratch array iterate() uses for sorting.
  The state starts out not loaded.
*/
rpl_slave_state::rpl_slave_state()
  : last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
  mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
                   MY_MUTEX_INIT_SLOW);
  my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
               sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
  my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
}
256
257
/*
  Tear down the slave state in this order:
  - hand the gtid_pos_tables list to free_gtid_pos_tables();
  - free all per-domain list entries and elements (truncate_hash());
  - free the HASH and the sort array;
  - destroy the mutex last.
*/
rpl_slave_state::~rpl_slave_state()
{
  free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables);
  truncate_hash();
  my_hash_free(&hash);
  delete_dynamic(&gtid_sort_array);
  mysql_mutex_destroy(&LOCK_slave_state);
}
266
267
268void
269rpl_slave_state::truncate_hash()
270{
271 uint32 i;
272
273 for (i= 0; i < hash.records; ++i)
274 {
275 element *e= (element *)my_hash_element(&hash, i);
276 list_element *l= e->list;
277 list_element *next;
278 while (l)
279 {
280 next= l->next;
281 my_free(l);
282 l= next;
283 }
284 /* The element itself is freed by the hash element free function. */
285 }
286 my_hash_reset(&hash);
287}
288
289
/*
  Record an applied GTID in the in-memory slave state for its domain.

  For the domain's element, this:
  - raises highest_seq_no to seq_no if it is larger;
  - wakes the MASTER_GTID_WAIT() waiter if its min_wait_seq_no is reached;
  - if rgi is given and owns the domain under --gtid-ignore-duplicates,
    drops one ownership reference, freeing the domain and waking waiters
    when it reaches zero;
  - resets rgi's duplicate state;
  - appends a new list_element (server_id, sub_id, seq_no, hton).
  last_sub_id is advanced to sub_id if larger.

  Expected to be called with LOCK_slave_state held (as update_state_hash()
  does; the waiter branch asserts it).

  hton  Engine of the table the GTID row was written to. It may be NULL only
        while the state is not yet loaded (see the DBUG_ASSERT).
  rgi   May be NULL.

  Returns 0 on success, 1 on allocation failure. If allocating the
  list_element fails, the highest_seq_no, waiter and ownership updates above
  have already been applied.
*/
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
                        uint64 seq_no, void *hton, rpl_group_info *rgi)
{
  element *elem= NULL;
  list_element *list_elem= NULL;

  DBUG_ASSERT(hton || !loaded);
  if (!(elem= get_element(domain_id)))
    return 1;

  if (seq_no > elem->highest_seq_no)
    elem->highest_seq_no= seq_no;
  if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
  {
    /*
      Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
      Signal (and remove) them. The waiter will handle all the processing
      of all pending MASTER_GTID_WAIT(), so we do not slow down the
      replication SQL thread.
    */
    mysql_mutex_assert_owner(&LOCK_slave_state);
    elem->gtid_waiter= NULL;
    mysql_cond_broadcast(&elem->COND_wait_gtid);
  }

  if (rgi)
  {
    /* Release --gtid-ignore-duplicates ownership taken in check_duplicate_gtid(). */
    if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
    {
#ifdef DBUG_ASSERT_EXISTS
      Relay_log_info *rli= rgi->rli;
#endif
      uint32 count= elem->owner_count;
      DBUG_ASSERT(count > 0);
      DBUG_ASSERT(elem->owner_rli == rli);
      --count;
      elem->owner_count= count;
      if (count == 0)
      {
        /* Last reference gone: free the domain for other connections. */
        elem->owner_rli= NULL;
        mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
      }
    }
    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  }

  if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
    return 1;
  list_elem->server_id= server_id;
  list_elem->sub_id= sub_id;
  list_elem->seq_no= seq_no;
  list_elem->hton= hton;

  elem->add(list_elem);
  if (last_sub_id < sub_id)
    last_sub_id= sub_id;

  return 0;
}
350
351
352struct rpl_slave_state::element *
353rpl_slave_state::get_element(uint32 domain_id)
354{
355 struct element *elem;
356
357 elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
358 if (elem)
359 return elem;
360
361 if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
362 return NULL;
363 elem->list= NULL;
364 elem->domain_id= domain_id;
365 elem->highest_seq_no= 0;
366 elem->gtid_waiter= NULL;
367 elem->owner_rli= NULL;
368 elem->owner_count= 0;
369 mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
370 mysql_cond_init(key_COND_gtid_ignore_duplicates,
371 &elem->COND_gtid_ignore_duplicates, 0);
372 if (my_hash_insert(&hash, (uchar *)elem))
373 {
374 my_free(elem);
375 return NULL;
376 }
377 return elem;
378}
379
380
381int
382rpl_slave_state::put_back_list(uint32 domain_id, list_element *list)
383{
384 element *e;
385 if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
386 return 1;
387 while (list)
388 {
389 list_element *next= list->next;
390 e->add(list);
391 list= next;
392 }
393 return 0;
394}
395
396
/*
  Empty the mysql.gtid_slave_pos table with ha_truncate(). Binlogging is
  disabled for the duration.

  On success, ha_commit_trans() is called with FALSE and then TRUE. On a
  truncate error, ha_rollback_trans() is called the same way instead. In
  both cases transactional metadata locks are released afterwards.

  Returns 0 on success, otherwise the non-zero result of
  open_and_lock_tables() or ha_truncate(). Results of the commit/rollback
  calls are not checked.
*/
int
rpl_slave_state::truncate_state_table(THD *thd)
{
  TABLE_LIST tlist;
  int err= 0;

  tmp_disable_binlog(thd);
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, NULL, TL_WRITE);
  if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
  {
    err= tlist.table->file->ha_truncate();

    if (err)
    {
      ha_rollback_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_rollback_trans(thd, TRUE);
    }
    else
    {
      ha_commit_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_commit_trans(thd, TRUE);
    }
    thd->mdl_context.release_transactional_locks();
  }

  reenable_binlog(thd);
  return err;
}
427
428
/*
  Expected columns of mysql.gtid_slave_pos, in column order. They are checked
  by gtid_check_rpl_slave_state_table(). record_gtid() stores by these
  positions: field[0]=domain_id, field[1]=sub_id, field[2]=server_id,
  field[3]=seq_no.
*/
static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
  { { STRING_WITH_LEN("domain_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("sub_id") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("server_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("seq_no") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
};

/* Key columns (domain_id, sub_id), as indexes into the array above. */
static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};

/* Complete expected definition (columns + key parts) for the intact check. */
static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
  array_elements(mysql_rpl_slave_state_coltypes),
  mysql_rpl_slave_state_coltypes,
  array_elements(mysql_rpl_slave_state_pk_parts),
  mysql_rpl_slave_state_pk_parts
};

/*
  Checker used by gtid_check_rpl_slave_state_table(). Judging by its class
  name it also logs mismatches to the error log (not verified here).
*/
static Table_check_intact_log_error gtid_table_intact;
454
455/*
456 Check that the mysql.gtid_slave_pos table has the correct definition.
457*/
458int
459gtid_check_rpl_slave_state_table(TABLE *table)
460{
461 int err;
462
463 if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
464 my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
465 rpl_gtid_slave_state_table_name.str);
466 return err;
467}
468
469
/*
  Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
  that is already in use by the current transaction, if any.

  The function walks the read-write, non-binlog engines in
  thd->transaction.all.ha_list, in order:
  - The first engine whose gtid_pos_tables entry is in state
    GTID_POS_AVAILABLE gives the result, returned in *out_tablename.
  - An entry for the engine in any other state triggers a background
    creation request (HAVE_REPLICATION builds only), and the walk moves on
    to the next engine.
  If nothing matches, the name of default_gtid_pos_table is returned.

  Status counters:
  - rpl_transactions_multi_engine is incremented when more than one
    read-write, non-binlog engine takes part in the transaction.
  - transactions_gtid_foreign_engine is incremented when such engines were
    present but none had a usable table.
*/
void
rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
{
  struct gtid_pos_table *list, *table_entry, *default_entry;

  /*
    See comments on rpl_slave_state::gtid_pos_tables for rules around proper
    access to the list.
  */
  list= (struct gtid_pos_table *)
    my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);

  Ha_trx_info *ha_info;
  /* Number of read-write, non-binlog engines seen so far without a usable table. */
  uint count = 0;
  for (ha_info= thd->transaction.all.ha_list; ha_info; ha_info= ha_info->next())
  {
    void *trx_hton= ha_info->ht();
    table_entry= list;

    if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton)
      continue;
    while (table_entry)
    {
      if (table_entry->table_hton == trx_hton)
      {
        if (likely(table_entry->state == GTID_POS_AVAILABLE))
        {
          *out_tablename= table_entry->table_name;
          /*
            Check if this is a cross-engine transaction, so we can correctly
            maintain the rpl_transactions_multi_engine status variable.
          */
          if (count >= 1)
            statistic_increment(rpl_transactions_multi_engine, LOCK_status);
          else
          {
            /* No earlier engine; look for any later read-write one. */
            for (;;)
            {
              ha_info= ha_info->next();
              if (!ha_info)
                break;
              if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
              {
                statistic_increment(rpl_transactions_multi_engine, LOCK_status);
                break;
              }
            }
          }
          return;
        }
        /*
          This engine is marked to automatically create the table.
          We cannot easily do this here (possibly in the middle of a
          transaction). But we can request the slave background thread
          to create it, and in a short while it should become available
          for following transactions.
        */
#ifdef HAVE_REPLICATION
        slave_background_gtid_pos_create_request(table_entry);
#endif
        break;
      }
      table_entry= table_entry->next;
    }
    ++count;
  }
  /*
    If we cannot find any table whose engine matches an engine that is
    already active in the transaction, or if there is no current transaction
    engines available, we return the default gtid_slave_pos table.
  */
  default_entry= (struct gtid_pos_table *)
    my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE);
  *out_tablename= default_entry->table_name;
  /* Record in status that we failed to find a suitable gtid_pos table. */
  if (count > 0)
  {
    statistic_increment(transactions_gtid_foreign_engine, LOCK_status);
    if (count > 1)
      statistic_increment(rpl_transactions_multi_engine, LOCK_status);
  }
}
556
557
/*
  Write a gtid to the replication slave state table.

  Do it as part of the transaction, to get slave crash safety, or as a separate
  transaction if !in_transaction (eg. MyISAM or DDL).

  thd             Thread doing the write.
  gtid            The global transaction id for this event group.
  sub_id          Value allocated for the event group when it was read
                  (sub_id must be consistent with commit order in master
                  binlog).
  in_transaction  See above. If false, OPTION_NOT_AUTOCOMMIT, OPTION_BEGIN,
                  OPTION_BIN_LOG and OPTION_GTID_BEGIN are cleared for the
                  write. If true, only OPTION_BIN_LOG is cleared. Option bits
                  are restored before returning.
  in_statement    If false, thd->reset_for_next_command() is called first.
  out_hton        Set to the table's handlerton once the row has been
                  written. Otherwise it is left NULL (also when the state is
                  not loaded).

  Besides inserting the new row, this function removes from the domain's
  in-memory list all entries recorded in the same engine, and deletes their
  rows from the table. The entry with the highest sub_id in the domain is the
  exception: it is kept for GTID position recovery.

  Returns 0 on success, including when the state is not loaded and nothing
  is done; non-zero on error. On an error after the table was opened, any
  entries not yet deleted are put back into the HASH, and
  ha_rollback_trans(thd, FALSE) is called.

  Note that caller must later ensure that the new gtid and sub_id is inserted
  into the appropriate HASH element (e.g. with update_state_hash()), so that
  it can be deleted later. But this must only be done after COMMIT if in
  transaction.
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                             bool in_transaction, bool in_statement,
                             void **out_hton)
{
  TABLE_LIST tlist;
  int err= 0, not_sql_thread;
  bool table_opened= false;
  TABLE *table;
  list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr;
  uint64 best_sub_id;
  element *elem;
  ulonglong thd_saved_option= thd->variables.option_bits;
  Query_tables_list lex_backup;
  wait_for_commit* suspended_wfc;
  void *hton= NULL;
  LEX_CSTRING gtid_pos_table_name;
  DBUG_ENTER("record_gtid");

  *out_hton= NULL;
  if (unlikely(!loaded))
  {
    /*
      Probably the mysql.gtid_slave_pos table is missing (eg. upgrade) or
      corrupt.

      We already complained loudly about this, but we can try to continue
      until the DBA fixes it.
    */
    DBUG_RETURN(0);
  }

  if (!in_statement)
    thd->reset_for_next_command();

  /*
    Only the SQL thread can call select_gtid_pos_table without a mutex
    Other threads needs to use a mutex and take into account that the
    result may change during execution, so we have to make a copy.
  */

  if ((not_sql_thread= (thd->system_thread != SYSTEM_THREAD_SLAVE_SQL)))
    mysql_mutex_lock(&LOCK_slave_state);
  select_gtid_pos_table(thd, &gtid_pos_table_name);
  if (not_sql_thread)
  {
    /* Copy the name into THD memory while the mutex is still held. */
    LEX_CSTRING *tmp= thd->make_clex_string(gtid_pos_table_name.str,
                                            gtid_pos_table_name.length);
    mysql_mutex_unlock(&LOCK_slave_state);
    if (!tmp)
      DBUG_RETURN(1);
    gtid_pos_table_name= *tmp;
  }

  DBUG_EXECUTE_IF("gtid_inject_record_gtid",
                  {
                    my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
                    DBUG_RETURN(1);
                  } );

  /*
    If we are applying a non-transactional event group, we will be committing
    here a transaction, but that does not imply that the event group has
    completed or has been binlogged. So we should not trigger
    wakeup_subsequent_commits() here.

    Note: An alternative here could be to put a call to mark_start_commit() in
    stmt_done() before the call to record_and_update_gtid(). This would
    prevent later calling mark_start_commit() after we have run
    wakeup_subsequent_commits() from committing the GTID update transaction
    (which must be avoided to avoid accessing freed group_commit_orderer
    object). It would also allow following event groups to start slightly
    earlier. And in the cases where record_gtid() is called without an active
    transaction, the current statement should have been binlogged already, so
    binlog order is preserved.

    But this is rather subtle, and potentially fragile. And it does not really
    seem worth it; non-transactional loads are unlikely to benefit much from
    parallel replication in any case. So for now, we go with the simple
    suspend/resume of wakeup_subsequent_commits() here in record_gtid().
  */
  suspended_wfc= thd->suspend_subsequent_commits();
  thd->lex->reset_n_backup_query_tables_list(&lex_backup);
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &gtid_pos_table_name, NULL, TL_WRITE);
  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
    goto end;
  table_opened= true;
  table= tlist.table;
  hton= table->s->db_type();

  if ((err= gtid_check_rpl_slave_state_table(table)))
    goto end;

#ifdef WITH_WSREP
  /*
    Updates in slave state table should not be appended to galera transaction
    writeset.
  */
  thd->wsrep_ignore_table= true;
#endif

  if (!in_transaction)
  {
    DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
    thd->variables.option_bits&=
      ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
                   OPTION_GTID_BEGIN);
  }
  else
    thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;

  bitmap_set_all(table->write_set);
  table->rpl_write_set= table->write_set;

  /* Column order: domain_id, sub_id, server_id, seq_no. */
  table->field[0]->store((ulonglong)gtid->domain_id, true);
  table->field[1]->store(sub_id, true);
  table->field[2]->store((ulonglong)gtid->server_id, true);
  table->field[3]->store(gtid->seq_no, true);
  DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE(););
  if ((err= table->file->ha_write_row(table->record[0])))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  *out_hton= hton;

  if(opt_bin_log &&
     (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
                                                       gtid->seq_no)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    goto end;
  }

  mysql_mutex_lock(&LOCK_slave_state);
  if ((elem= get_element(gtid->domain_id)) == NULL)
  {
    mysql_mutex_unlock(&LOCK_slave_state);
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    err= 1;
    goto end;
  }

  /*
    Now pull out all GTIDs that were recorded in this engine.

    Entries of other engines are relinked in place through next_ptr_ptr.
    Entries of this engine are pushed onto delete_list. best_ptr_ptr tracks
    the link that points at the highest-sub_id entry in the domain when that
    entry is on delete_list; it is NULL when that entry stays in elem->list.
  */
  delete_list = NULL;
  next_ptr_ptr= &elem->list;
  cur= elem->list;
  best_sub_id= 0;
  best_ptr_ptr= NULL;
  while (cur)
  {
    list_element *next= cur->next;
    if (cur->hton == hton)
    {
      /* Belongs to same engine, so move it to the delete list. */
      cur->next= delete_list;
      delete_list= cur;
      if (cur->sub_id > best_sub_id)
      {
        best_sub_id= cur->sub_id;
        best_ptr_ptr= &delete_list;
      }
      else if (best_ptr_ptr == &delete_list)
        /* The best entry is now one behind the head; follow it. */
        best_ptr_ptr= &cur->next;
    }
    else
    {
      /* Another engine, leave it in the list. */
      if (cur->sub_id > best_sub_id)
      {
        best_sub_id= cur->sub_id;
        /* Current best is not on the delete list. */
        best_ptr_ptr= NULL;
      }
      *next_ptr_ptr= cur;
      next_ptr_ptr= &cur->next;
    }
    cur= next;
  }
  *next_ptr_ptr= NULL;
  /*
    If the highest sub_id element is on the delete list, put it back on the
    original list, to preserve the highest sub_id element in the table for
    GTID position recovery.
  */
  if (best_ptr_ptr)
  {
    cur= *best_ptr_ptr;
    *best_ptr_ptr= cur->next;
    cur->next= elem->list;
    elem->list= cur;
  }
  mysql_mutex_unlock(&LOCK_slave_state);

  if (!delete_list)
    goto end;

  /* Now delete any already committed GTIDs. */
  bitmap_set_bit(table->read_set, table->field[0]->field_index);
  bitmap_set_bit(table->read_set, table->field[1]->field_index);

  if ((err= table->file->ha_index_init(0, 0)))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  while (delete_list)
  {
    /* Key buffer for (domain_id, sub_id): 4 + 8 bytes. */
    uchar key_buffer[4+8];

    DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
                    { err= ENOENT;
                      table->file->print_error(err, MYF(0));
                      /* `break' does not work inside DBUG_EXECUTE_IF */
                      goto dbug_break; });

    next= delete_list->next;

    table->field[1]->store(delete_list->sub_id, true);
    /* domain_id is already set in table->record[0] from write_row() above. */
    key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
    if (table->file->ha_index_read_map(table->record[1], key_buffer,
                                       HA_WHOLE_KEY, HA_READ_KEY_EXACT))
      /* We cannot find the row, assume it is already deleted. */
      ;
    else if ((err= table->file->ha_delete_row(table->record[1])))
      table->file->print_error(err, MYF(0));
    /*
      In case of error, we still discard the element from the list. We do
      not want to endlessly error on the same element in case of table
      corruption or such.
    */
    my_free(delete_list);
    delete_list= next;
    if (err)
      break;
  }
IF_DBUG(dbug_break:, )
  table->file->ha_index_end();

end:

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= false;
#endif

  if (table_opened)
  {
    if (err || (err= ha_commit_trans(thd, FALSE)))
    {
      /*
        If error, we need to put any remaining delete_list back into the HASH
        so we can do another delete attempt later.
      */
      if (delete_list)
      {
        mysql_mutex_lock(&LOCK_slave_state);
        /* The element was created above, so the lookup is expected to succeed. */
        put_back_list(gtid->domain_id, delete_list);
        mysql_mutex_unlock(&LOCK_slave_state);
      }

      ha_rollback_trans(thd, FALSE);
    }
    close_thread_tables(thd);
    if (in_transaction)
      thd->mdl_context.release_statement_locks();
    else
      thd->mdl_context.release_transactional_locks();
  }
  thd->lex->restore_backup_query_tables_list(&lex_backup);
  thd->variables.option_bits= thd_saved_option;
  thd->resume_subsequent_commits(suspended_wfc);
  DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
    {
      if (gtid->server_id == 100)
        my_sleep(500000);
    });
  DBUG_RETURN(err);
}
851
852
853uint64
854rpl_slave_state::next_sub_id(uint32 domain_id)
855{
856 uint64 sub_id= 0;
857
858 mysql_mutex_lock(&LOCK_slave_state);
859 sub_id= ++last_sub_id;
860 mysql_mutex_unlock(&LOCK_slave_state);
861
862 return sub_id;
863}
864
865/* A callback used in sorting of gtid list based on domain_id. */
866static int rpl_gtid_cmp_cb(const void *id1, const void *id2)
867{
868 uint32 d1= ((rpl_gtid *)id1)->domain_id;
869 uint32 d2= ((rpl_gtid *)id2)->domain_id;
870
871 if (d1 < d2)
872 return -1;
873 else if (d1 > d2)
874 return 1;
875 return 0;
876}
877
878/* Format the specified gtid and store it in the given string buffer. */
879bool
880rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
881{
882 if (*first)
883 *first= false;
884 else
885 if (dest->append(",",1))
886 return true;
887 return
888 dest->append_ulonglong(gtid->domain_id) ||
889 dest->append("-",1) ||
890 dest->append_ulonglong(gtid->server_id) ||
891 dest->append("-",1) ||
892 dest->append_ulonglong(gtid->seq_no);
893}
894
895/*
896 Sort the given gtid list based on domain_id and store them in the specified
897 string.
898*/
899static bool
900rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr, String *str)
901{
902 bool first= true, res= true;
903
904 sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);
905
906 for (uint i= 0; i < gtid_dynarr->elements; i ++)
907 {
908 rpl_gtid *gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
909 if (rpl_slave_state_tostring_helper(str, gtid, &first))
910 goto err;
911 }
912 res= false;
913
914err:
915 return res;
916}
917
918
919/* Sort the given gtid list based on domain_id and call cb for each gtid. */
920static bool
921rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr,
922 int (*cb)(rpl_gtid *, void *),
923 void *data)
924{
925 rpl_gtid *gtid;
926 bool res= true;
927
928 sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);
929
930 for (uint i= 0; i < gtid_dynarr->elements; i ++)
931 {
932 gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
933 if ((*cb)(gtid, data))
934 goto err;
935 }
936 res= false;
937
938err:
939 return res;
940}
941
942int
943rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
944 rpl_gtid *extra_gtids, uint32 num_extra,
945 bool sort)
946{
947 uint32 i;
948 HASH gtid_hash;
949 uchar *rec;
950 rpl_gtid *gtid;
951 int res= 1;
952 bool locked= false;
953
954 my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
955 sizeof(uint32), NULL, NULL, HASH_UNIQUE);
956 for (i= 0; i < num_extra; ++i)
957 if (extra_gtids[i].server_id == global_system_variables.server_id &&
958 my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
959 goto err;
960
961 mysql_mutex_lock(&LOCK_slave_state);
962 locked= true;
963 reset_dynamic(&gtid_sort_array);
964
965 for (i= 0; i < hash.records; ++i)
966 {
967 uint64 best_sub_id;
968 rpl_gtid best_gtid;
969 element *e= (element *)my_hash_element(&hash, i);
970 list_element *l= e->list;
971
972 if (!l)
973 continue; /* Nothing here */
974
975 best_gtid.domain_id= e->domain_id;
976 best_gtid.server_id= l->server_id;
977 best_gtid.seq_no= l->seq_no;
978 best_sub_id= l->sub_id;
979 while ((l= l->next))
980 {
981 if (l->sub_id > best_sub_id)
982 {
983 best_sub_id= l->sub_id;
984 best_gtid.server_id= l->server_id;
985 best_gtid.seq_no= l->seq_no;
986 }
987 }
988
989 /* Check if we have something newer in the extra list. */
990 rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
991 if (rec)
992 {
993 gtid= (rpl_gtid *)rec;
994 if (gtid->seq_no > best_gtid.seq_no)
995 memcpy(&best_gtid, gtid, sizeof(best_gtid));
996 if (my_hash_delete(&gtid_hash, rec))
997 {
998 goto err;
999 }
1000 }
1001
1002 if ((res= sort ? insert_dynamic(&gtid_sort_array,
1003 (const void *) &best_gtid) :
1004 (*cb)(&best_gtid, data)))
1005 {
1006 goto err;
1007 }
1008 }
1009
1010 /* Also add any remaining extra domain_ids. */
1011 for (i= 0; i < gtid_hash.records; ++i)
1012 {
1013 gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
1014 if ((res= sort ? insert_dynamic(&gtid_sort_array, (const void *) gtid) :
1015 (*cb)(gtid, data)))
1016 {
1017 goto err;
1018 }
1019 }
1020
1021 if (sort && rpl_slave_state_tostring_helper(&gtid_sort_array, cb, data))
1022 {
1023 goto err;
1024 }
1025
1026 res= 0;
1027
1028err:
1029 if (locked) mysql_mutex_unlock(&LOCK_slave_state);
1030 my_hash_free(&gtid_hash);
1031
1032 return res;
1033}
1034
1035
1036struct rpl_slave_state_tostring_data {
1037 String *dest;
1038 bool first;
1039};
1040static int
1041rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data)
1042{
1043 rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data;
1044 return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first);
1045}
1046
1047
1048/*
1049 Prepare the current slave state as a string, suitable for sending to the
1050 master to request to receive binlog events starting from that GTID state.
1051
1052 The state consists of the most recently applied GTID for each domain_id,
1053 ie. the one with the highest sub_id within each domain_id.
1054
1055 Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when
1056 a server was previously a master and now needs to connect to a new master as
1057 a slave. For each domain_id, if the GTID in the binlog was logged with our
1058 own server_id _and_ has a higher seq_no than what is in the slave state,
1059 then this should be used as the position to start replicating at. This
1060 allows to promote a slave as new master, and connect the old master as a
1061 slave with MASTER_GTID_POS=AUTO.
1062*/
1063int
1064rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
1065{
1066 struct rpl_slave_state_tostring_data data;
1067 data.first= true;
1068 data.dest= dest;
1069
1070 return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids,
1071 num_extra, true);
1072}
1073
1074
1075/*
1076 Lookup a domain_id in the current replication slave state.
1077
1078 Returns false if the domain_id has no entries in the slave state.
1079 Otherwise returns true, and fills in out_gtid with the corresponding
1080 GTID.
1081*/
1082bool
1083rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
1084{
1085 element *elem;
1086 list_element *list;
1087 uint64 best_sub_id;
1088
1089 mysql_mutex_lock(&LOCK_slave_state);
1090 elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
1091 if (!elem || !(list= elem->list))
1092 {
1093 mysql_mutex_unlock(&LOCK_slave_state);
1094 return false;
1095 }
1096
1097 out_gtid->domain_id= domain_id;
1098 out_gtid->server_id= list->server_id;
1099 out_gtid->seq_no= list->seq_no;
1100 best_sub_id= list->sub_id;
1101
1102 while ((list= list->next))
1103 {
1104 if (best_sub_id > list->sub_id)
1105 continue;
1106 best_sub_id= list->sub_id;
1107 out_gtid->server_id= list->server_id;
1108 out_gtid->seq_no= list->seq_no;
1109 }
1110
1111 mysql_mutex_unlock(&LOCK_slave_state);
1112 return true;
1113}
1114
1115
1116/*
1117 Parse a GTID at the start of a string, and update the pointer to point
1118 at the first character after the parsed GTID.
1119
1120 Returns 0 on ok, non-zero on parse error.
1121*/
1122static int
1123gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
1124{
1125 char *q;
1126 const char *p= *ptr;
1127 uint64 v1, v2, v3;
1128 int err= 0;
1129
1130 q= (char*) end;
1131 v1= (uint64)my_strtoll10(p, &q, &err);
1132 if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
1133 return 1;
1134 p= q+1;
1135 q= (char*) end;
1136 v2= (uint64)my_strtoll10(p, &q, &err);
1137 if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
1138 return 1;
1139 p= q+1;
1140 q= (char*) end;
1141 v3= (uint64)my_strtoll10(p, &q, &err);
1142 if (err != 0)
1143 return 1;
1144
1145 out_gtid->domain_id= (uint32) v1;
1146 out_gtid->server_id= (uint32) v2;
1147 out_gtid->seq_no= v3;
1148 *ptr= q;
1149 return 0;
1150}
1151
1152
1153rpl_gtid *
1154gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
1155{
1156 const char *p= const_cast<char *>(str);
1157 const char *end= p + str_len;
1158 uint32 len= 0, alloc_len= 5;
1159 rpl_gtid *list= NULL;
1160
1161 for (;;)
1162 {
1163 rpl_gtid gtid;
1164
1165 if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
1166 {
1167 my_free(list);
1168 return NULL;
1169 }
1170 if ((!list || len >= alloc_len) &&
1171 !(list=
1172 (rpl_gtid *)my_realloc(list,
1173 (alloc_len= alloc_len*2) * sizeof(rpl_gtid),
1174 MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR))))
1175 return NULL;
1176 list[len++]= gtid;
1177
1178 if (p == end)
1179 break;
1180 if (*p != ',')
1181 {
1182 my_free(list);
1183 return NULL;
1184 }
1185 ++p;
1186 }
1187 *out_len= len;
1188 return list;
1189}
1190
1191
1192/*
1193 Update the slave replication state with the GTID position obtained from
1194 master when connecting with old-style (filename,offset) position.
1195
1196 If RESET is true then all existing entries are removed. Otherwise only
1197 domain_ids mentioned in the STATE_FROM_MASTER are changed.
1198
1199 Returns 0 if ok, non-zero if error.
1200*/
1201int
1202rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
1203 bool reset, bool in_statement)
1204{
1205 const char *end= state_from_master + len;
1206
1207 if (reset)
1208 {
1209 if (truncate_state_table(thd))
1210 return 1;
1211 truncate_hash();
1212 }
1213 if (state_from_master == end)
1214 return 0;
1215 for (;;)
1216 {
1217 rpl_gtid gtid;
1218 uint64 sub_id;
1219 void *hton= NULL;
1220
1221 if (gtid_parser_helper(&state_from_master, end, &gtid) ||
1222 !(sub_id= next_sub_id(gtid.domain_id)) ||
1223 record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
1224 update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
1225 return 1;
1226 if (state_from_master == end)
1227 break;
1228 if (*state_from_master != ',')
1229 return 1;
1230 ++state_from_master;
1231 }
1232 return 0;
1233}
1234
1235
1236bool
1237rpl_slave_state::is_empty()
1238{
1239 uint32 i;
1240 bool result= true;
1241
1242 mysql_mutex_lock(&LOCK_slave_state);
1243 for (i= 0; i < hash.records; ++i)
1244 {
1245 element *e= (element *)my_hash_element(&hash, i);
1246 if (e->list)
1247 {
1248 result= false;
1249 break;
1250 }
1251 }
1252 mysql_mutex_unlock(&LOCK_slave_state);
1253
1254 return result;
1255}
1256
1257
1258void
1259rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
1260{
1261 struct gtid_pos_table *cur, *next;
1262
1263 cur= list;
1264 while (cur)
1265 {
1266 next= cur->next;
1267 my_free(cur);
1268 cur= next;
1269 }
1270}
1271
1272
1273/*
1274 Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
1275 The caller must be holding LOCK_slave_state. Additionally, this function
1276 must only be called while all SQL threads are stopped.
1277*/
1278void
1279rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
1280 rpl_slave_state::gtid_pos_table *default_entry)
1281{
1282 gtid_pos_table *old_list;
1283
1284 mysql_mutex_assert_owner(&LOCK_slave_state);
1285 old_list= (struct gtid_pos_table *)gtid_pos_tables;
1286 my_atomic_storeptr_explicit(&gtid_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE);
1287 my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry,
1288 MY_MEMORY_ORDER_RELEASE);
1289 free_gtid_pos_tables(old_list);
1290}
1291
1292
1293void
1294rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
1295{
1296 mysql_mutex_assert_owner(&LOCK_slave_state);
1297 entry->next= (struct gtid_pos_table *)gtid_pos_tables;
1298 my_atomic_storeptr_explicit(&gtid_pos_tables, entry, MY_MEMORY_ORDER_RELEASE);
1299}
1300
1301
1302struct rpl_slave_state::gtid_pos_table *
1303rpl_slave_state::alloc_gtid_pos_table(LEX_CSTRING *table_name, void *hton,
1304 rpl_slave_state::gtid_pos_table_state state)
1305{
1306 struct gtid_pos_table *p;
1307 char *allocated_str;
1308
1309 if (!my_multi_malloc(MYF(MY_WME),
1310 &p, sizeof(*p),
1311 &allocated_str, table_name->length+1,
1312 NULL))
1313 {
1314 my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1));
1315 return NULL;
1316 }
1317 memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0'
1318 p->next = NULL;
1319 p->table_hton= hton;
1320 p->table_name.str= allocated_str;
1321 p->table_name.length= table_name->length;
1322 p->state= state;
1323 return p;
1324}
1325
1326
1327void rpl_binlog_state::init()
1328{
1329 my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
1330 sizeof(uint32), NULL, my_free, HASH_UNIQUE);
1331 my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
1332 mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
1333 MY_MUTEX_INIT_SLOW);
1334 initialized= 1;
1335}
1336
1337void
1338rpl_binlog_state::reset_nolock()
1339{
1340 uint32 i;
1341
1342 for (i= 0; i < hash.records; ++i)
1343 my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
1344 my_hash_reset(&hash);
1345}
1346
1347
1348void
1349rpl_binlog_state::reset()
1350{
1351 mysql_mutex_lock(&LOCK_binlog_state);
1352 reset_nolock();
1353 mysql_mutex_unlock(&LOCK_binlog_state);
1354}
1355
1356
1357void rpl_binlog_state::free()
1358{
1359 if (initialized)
1360 {
1361 initialized= 0;
1362 reset_nolock();
1363 my_hash_free(&hash);
1364 delete_dynamic(&gtid_sort_array);
1365 mysql_mutex_destroy(&LOCK_binlog_state);
1366 }
1367}
1368
1369
1370bool
1371rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
1372{
1373 uint32 i;
1374 bool res= false;
1375
1376 mysql_mutex_lock(&LOCK_binlog_state);
1377 reset_nolock();
1378 for (i= 0; i < count; ++i)
1379 {
1380 if (update_nolock(&(list[i]), false))
1381 {
1382 res= true;
1383 break;
1384 }
1385 }
1386 mysql_mutex_unlock(&LOCK_binlog_state);
1387 return res;
1388}
1389
1390
1391static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
1392{
1393 rpl_binlog_state *self= (rpl_binlog_state *)data;
1394 return self->update_nolock(gtid, false);
1395}
1396
1397
1398bool
1399rpl_binlog_state::load(rpl_slave_state *slave_pos)
1400{
1401 bool res= false;
1402
1403 mysql_mutex_lock(&LOCK_binlog_state);
1404 reset_nolock();
1405 if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0, false))
1406 res= true;
1407 mysql_mutex_unlock(&LOCK_binlog_state);
1408 return res;
1409}
1410
1411
1412rpl_binlog_state::~rpl_binlog_state()
1413{
1414 free();
1415}
1416
1417
1418/*
1419 Update replication state with a new GTID.
1420
1421 If the (domain_id, server_id) pair already exists, then the new GTID replaces
1422 the old one for that domain id. Else a new entry is inserted.
1423
1424 Returns 0 for ok, 1 for error.
1425*/
1426int
1427rpl_binlog_state::update_nolock(const struct rpl_gtid *gtid, bool strict)
1428{
1429 element *elem;
1430
1431 if ((elem= (element *)my_hash_search(&hash,
1432 (const uchar *)(&gtid->domain_id), 0)))
1433 {
1434 if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
1435 {
1436 my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
1437 gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
1438 elem->last_gtid->server_id, elem->last_gtid->seq_no);
1439 return 1;
1440 }
1441 if (elem->seq_no_counter < gtid->seq_no)
1442 elem->seq_no_counter= gtid->seq_no;
1443 if (!elem->update_element(gtid))
1444 return 0;
1445 }
1446 else if (!alloc_element_nolock(gtid))
1447 return 0;
1448
1449 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1450 return 1;
1451}
1452
1453
1454int
1455rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
1456{
1457 int res;
1458 mysql_mutex_lock(&LOCK_binlog_state);
1459 res= update_nolock(gtid, strict);
1460 mysql_mutex_unlock(&LOCK_binlog_state);
1461 return res;
1462}
1463
1464
1465/*
1466 Fill in a new GTID, allocating next sequence number, and update state
1467 accordingly.
1468*/
1469int
1470rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
1471 rpl_gtid *gtid)
1472{
1473 element *elem;
1474 int res= 0;
1475
1476 gtid->domain_id= domain_id;
1477 gtid->server_id= server_id;
1478
1479 mysql_mutex_lock(&LOCK_binlog_state);
1480 if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
1481 {
1482 gtid->seq_no= ++elem->seq_no_counter;
1483 if (!elem->update_element(gtid))
1484 goto end;
1485 }
1486 else
1487 {
1488 gtid->seq_no= 1;
1489 if (!alloc_element_nolock(gtid))
1490 goto end;
1491 }
1492
1493 my_error(ER_OUT_OF_RESOURCES, MYF(0));
1494 res= 1;
1495end:
1496 mysql_mutex_unlock(&LOCK_binlog_state);
1497 return res;
1498}
1499
1500
1501/* Helper functions for update. */
1502int
1503rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
1504{
1505 rpl_gtid *lookup_gtid;
1506
1507 /*
1508 By far the most common case is that successive events within same
1509 replication domain have the same server id (it changes only when
1510 switching to a new master). So save a hash lookup in this case.
1511 */
1512 if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
1513 {
1514 last_gtid->seq_no= gtid->seq_no;
1515 return 0;
1516 }
1517
1518 lookup_gtid= (rpl_gtid *)
1519 my_hash_search(&hash, (const uchar *)&gtid->server_id, 0);
1520 if (lookup_gtid)
1521 {
1522 lookup_gtid->seq_no= gtid->seq_no;
1523 last_gtid= lookup_gtid;
1524 return 0;
1525 }
1526
1527 /* Allocate a new GTID and insert it. */
1528 lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
1529 if (!lookup_gtid)
1530 return 1;
1531 memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
1532 if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
1533 {
1534 my_free(lookup_gtid);
1535 return 1;
1536 }
1537 last_gtid= lookup_gtid;
1538 return 0;
1539}
1540
1541
1542int
1543rpl_binlog_state::alloc_element_nolock(const rpl_gtid *gtid)
1544{
1545 element *elem;
1546 rpl_gtid *lookup_gtid;
1547
1548 /* First time we see this domain_id; allocate a new element. */
1549 elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
1550 lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
1551 if (elem && lookup_gtid)
1552 {
1553 elem->domain_id= gtid->domain_id;
1554 my_hash_init(&elem->hash, &my_charset_bin, 32,
1555 offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
1556 HASH_UNIQUE);
1557 elem->last_gtid= lookup_gtid;
1558 elem->seq_no_counter= gtid->seq_no;
1559 memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
1560 if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
1561 {
1562 lookup_gtid= NULL; /* Do not free. */
1563 if (0 == my_hash_insert(&hash, (const uchar *)elem))
1564 return 0;
1565 }
1566 my_hash_free(&elem->hash);
1567 }
1568
1569 /* An error. */
1570 if (elem)
1571 my_free(elem);
1572 if (lookup_gtid)
1573 my_free(lookup_gtid);
1574 return 1;
1575}
1576
1577
1578/*
1579 Check that a new GTID can be logged without creating an out-of-order
1580 sequence number with existing GTIDs.
1581*/
1582bool
1583rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
1584 uint64 seq_no)
1585{
1586 element *elem;
1587 bool res= 0;
1588
1589 mysql_mutex_lock(&LOCK_binlog_state);
1590 if ((elem= (element *)my_hash_search(&hash,
1591 (const uchar *)(&domain_id), 0)) &&
1592 elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
1593 {
1594 my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
1595 elem->last_gtid->domain_id, elem->last_gtid->server_id,
1596 elem->last_gtid->seq_no);
1597 res= 1;
1598 }
1599 mysql_mutex_unlock(&LOCK_binlog_state);
1600 return res;
1601}
1602
1603
1604/*
1605 When we see a new GTID that will not be binlogged (eg. slave thread
1606 with --log-slave-updates=0), then we need to remember to allocate any
1607 GTID seq_no of our own within that domain starting from there.
1608
1609 Returns 0 if ok, non-zero if out-of-memory.
1610*/
1611int
1612rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
1613{
1614 element *elem;
1615 int res;
1616
1617 mysql_mutex_lock(&LOCK_binlog_state);
1618 if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
1619 {
1620 if (elem->seq_no_counter < seq_no)
1621 elem->seq_no_counter= seq_no;
1622 res= 0;
1623 goto end;
1624 }
1625
1626 /* We need to allocate a new, empty element to remember the next seq_no. */
1627 if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
1628 {
1629 res= 1;
1630 goto end;
1631 }
1632
1633 elem->domain_id= domain_id;
1634 my_hash_init(&elem->hash, &my_charset_bin, 32,
1635 offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
1636 HASH_UNIQUE);
1637 elem->last_gtid= NULL;
1638 elem->seq_no_counter= seq_no;
1639 if (0 == my_hash_insert(&hash, (const uchar *)elem))
1640 {
1641 res= 0;
1642 goto end;
1643 }
1644
1645 my_hash_free(&elem->hash);
1646 my_free(elem);
1647 res= 1;
1648
1649end:
1650 mysql_mutex_unlock(&LOCK_binlog_state);
1651 return res;
1652}
1653
1654
1655/*
1656 Write binlog state to text file, so we can read it in again without having
1657 to scan last binlog file (normal shutdown/startup, not crash recovery).
1658
1659 The most recent GTID within each domain_id is written after any other GTID
1660 within this domain.
1661*/
1662int
1663rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
1664{
1665 ulong i, j;
1666 char buf[21];
1667 int res= 0;
1668
1669 mysql_mutex_lock(&LOCK_binlog_state);
1670 for (i= 0; i < hash.records; ++i)
1671 {
1672 element *e= (element *)my_hash_element(&hash, i);
1673 if (!e->last_gtid)
1674 {
1675 DBUG_ASSERT(e->hash.records == 0);
1676 continue;
1677 }
1678 for (j= 0; j <= e->hash.records; ++j)
1679 {
1680 const rpl_gtid *gtid;
1681 if (j < e->hash.records)
1682 {
1683 gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
1684 if (gtid == e->last_gtid)
1685 continue;
1686 }
1687 else
1688 gtid= e->last_gtid;
1689
1690 longlong10_to_str(gtid->seq_no, buf, 10);
1691 if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id,
1692 buf))
1693 {
1694 res= 1;
1695 goto end;
1696 }
1697 }
1698 }
1699
1700end:
1701 mysql_mutex_unlock(&LOCK_binlog_state);
1702 return res;
1703}
1704
1705
1706int
1707rpl_binlog_state::read_from_iocache(IO_CACHE *src)
1708{
1709 /* 10-digit - 10-digit - 20-digit \n \0 */
1710 char buf[10+1+10+1+20+1+1];
1711 const char *p, *end;
1712 rpl_gtid gtid;
1713 int res= 0;
1714
1715 mysql_mutex_lock(&LOCK_binlog_state);
1716 reset_nolock();
1717 for (;;)
1718 {
1719 size_t len= my_b_gets(src, buf, sizeof(buf));
1720 if (!len)
1721 break;
1722 p= buf;
1723 end= buf + len;
1724 if (gtid_parser_helper(&p, end, &gtid) ||
1725 update_nolock(&gtid, false))
1726 {
1727 res= 1;
1728 break;
1729 }
1730 }
1731 mysql_mutex_unlock(&LOCK_binlog_state);
1732 return res;
1733}
1734
1735
1736rpl_gtid *
1737rpl_binlog_state::find_nolock(uint32 domain_id, uint32 server_id)
1738{
1739 element *elem;
1740 if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
1741 return NULL;
1742 return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
1743}
1744
1745rpl_gtid *
1746rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
1747{
1748 rpl_gtid *p;
1749 mysql_mutex_lock(&LOCK_binlog_state);
1750 p= find_nolock(domain_id, server_id);
1751 mysql_mutex_unlock(&LOCK_binlog_state);
1752 return p;
1753}
1754
1755rpl_gtid *
1756rpl_binlog_state::find_most_recent(uint32 domain_id)
1757{
1758 element *elem;
1759 rpl_gtid *gtid= NULL;
1760
1761 mysql_mutex_lock(&LOCK_binlog_state);
1762 elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
1763 if (elem && elem->last_gtid)
1764 gtid= elem->last_gtid;
1765 mysql_mutex_unlock(&LOCK_binlog_state);
1766
1767 return gtid;
1768}
1769
1770
1771uint32
1772rpl_binlog_state::count()
1773{
1774 uint32 c= 0;
1775 uint32 i;
1776
1777 mysql_mutex_lock(&LOCK_binlog_state);
1778 for (i= 0; i < hash.records; ++i)
1779 c+= ((element *)my_hash_element(&hash, i))->hash.records;
1780 mysql_mutex_unlock(&LOCK_binlog_state);
1781
1782 return c;
1783}
1784
1785
1786int
1787rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
1788{
1789 uint32 i, j, pos;
1790 int res= 0;
1791
1792 mysql_mutex_lock(&LOCK_binlog_state);
1793 pos= 0;
1794 for (i= 0; i < hash.records; ++i)
1795 {
1796 element *e= (element *)my_hash_element(&hash, i);
1797 if (!e->last_gtid)
1798 {
1799 DBUG_ASSERT(e->hash.records==0);
1800 continue;
1801 }
1802 for (j= 0; j <= e->hash.records; ++j)
1803 {
1804 const rpl_gtid *gtid;
1805 if (j < e->hash.records)
1806 {
1807 gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
1808 if (gtid == e->last_gtid)
1809 continue;
1810 }
1811 else
1812 gtid= e->last_gtid;
1813
1814 if (pos >= list_size)
1815 {
1816 res= 1;
1817 goto end;
1818 }
1819 memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
1820 }
1821 }
1822
1823end:
1824 mysql_mutex_unlock(&LOCK_binlog_state);
1825 return res;
1826}
1827
1828
1829/*
1830 Get a list of the most recently binlogged GTID, for each domain_id.
1831
1832 This can be used when switching from being a master to being a slave,
1833 to know where to start replicating from the new master.
1834
1835 The returned list must be de-allocated with my_free().
1836
1837 Returns 0 for ok, non-zero for out-of-memory.
1838*/
1839int
1840rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
1841{
1842 uint32 i;
1843 uint32 alloc_size, out_size;
1844 int res= 0;
1845
1846 out_size= 0;
1847 mysql_mutex_lock(&LOCK_binlog_state);
1848 alloc_size= hash.records;
1849 if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid),
1850 MYF(MY_WME))))
1851 {
1852 res= 1;
1853 goto end;
1854 }
1855 for (i= 0; i < alloc_size; ++i)
1856 {
1857 element *e= (element *)my_hash_element(&hash, i);
1858 if (!e->last_gtid)
1859 continue;
1860 memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
1861 }
1862
1863end:
1864 mysql_mutex_unlock(&LOCK_binlog_state);
1865 *size= out_size;
1866 return res;
1867}
1868
1869bool
1870rpl_binlog_state::append_pos(String *str)
1871{
1872 uint32 i;
1873
1874 mysql_mutex_lock(&LOCK_binlog_state);
1875 reset_dynamic(&gtid_sort_array);
1876
1877 for (i= 0; i < hash.records; ++i)
1878 {
1879 element *e= (element *)my_hash_element(&hash, i);
1880 if (e->last_gtid &&
1881 insert_dynamic(&gtid_sort_array, (const void *) e->last_gtid))
1882 {
1883 mysql_mutex_unlock(&LOCK_binlog_state);
1884 return true;
1885 }
1886 }
1887 rpl_slave_state_tostring_helper(&gtid_sort_array, str);
1888 mysql_mutex_unlock(&LOCK_binlog_state);
1889
1890 return false;
1891}
1892
1893
1894bool
1895rpl_binlog_state::append_state(String *str)
1896{
1897 uint32 i, j;
1898 bool res= false;
1899
1900 mysql_mutex_lock(&LOCK_binlog_state);
1901 reset_dynamic(&gtid_sort_array);
1902
1903 for (i= 0; i < hash.records; ++i)
1904 {
1905 element *e= (element *)my_hash_element(&hash, i);
1906 if (!e->last_gtid)
1907 {
1908 DBUG_ASSERT(e->hash.records==0);
1909 continue;
1910 }
1911 for (j= 0; j <= e->hash.records; ++j)
1912 {
1913 const rpl_gtid *gtid;
1914 if (j < e->hash.records)
1915 {
1916 gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
1917 if (gtid == e->last_gtid)
1918 continue;
1919 }
1920 else
1921 gtid= e->last_gtid;
1922
1923 if (insert_dynamic(&gtid_sort_array, (const void *) gtid))
1924 {
1925 res= true;
1926 goto end;
1927 }
1928 }
1929 }
1930
1931 rpl_slave_state_tostring_helper(&gtid_sort_array, str);
1932
1933end:
1934 mysql_mutex_unlock(&LOCK_binlog_state);
1935 return res;
1936}
1937
1938/**
1939 Remove domains supplied by the first argument from binlog state.
1940 Removal is done for any domain whose last gtids (from all its servers) match
1941 ones in Gtid list event of the 2nd argument.
1942
1943 @param ids gtid domain id sequence, may contain dups
1944 @param glev pointer to Gtid list event describing
1945 the match condition
1946 @param errbuf [out] pointer to possible error message array
1947
1948 @retval NULL as success when at least one domain is removed
1949 @retval "" empty string to indicate ineffective call
1950 when no domains removed
1951 @retval NOT EMPTY string otherwise an error message
1952*/
1953const char*
1954rpl_binlog_state::drop_domain(DYNAMIC_ARRAY *ids,
1955 Gtid_list_log_event *glev,
1956 char* errbuf)
1957{
1958 DYNAMIC_ARRAY domain_unique; // sequece (unsorted) of unique element*:s
1959 rpl_binlog_state::element* domain_unique_buffer[16];
1960 ulong k, l;
1961 const char* errmsg= NULL;
1962
1963 DBUG_ENTER("rpl_binlog_state::drop_domain");
1964
1965 my_init_dynamic_array2(&domain_unique,
1966 sizeof(element*), domain_unique_buffer,
1967 sizeof(domain_unique_buffer) / sizeof(element*), 4, 0);
1968
1969 mysql_mutex_lock(&LOCK_binlog_state);
1970
1971 /*
1972 Gtid list is supposed to come from a binlog's Gtid_list event and
1973 therefore should be a subset of the current binlog state. That is
1974 for every domain in the list the binlog state contains a gtid with
1975 sequence number not less than that of the list.
1976 Exceptions of this inclusion rule are:
1977 A. the list may still refer to gtids from already deleted domains.
1978 Files containing them must have been purged whereas the file
1979 with the list is not yet.
1980 B. out of order groups were injected
1981 C. manually build list of binlog files violating the inclusion
1982 constraint.
1983 While A is a normal case (not necessarily distinguishable from C though),
1984 B and C may require the user's attention so any (incl the A's suspected)
1985 inconsistency is diagnosed and *warned*.
1986 */
1987 for (l= 0, errbuf[0]= 0; l < glev->count; l++, errbuf[0]= 0)
1988 {
1989 rpl_gtid* rb_state_gtid= find_nolock(glev->list[l].domain_id,
1990 glev->list[l].server_id);
1991 if (!rb_state_gtid)
1992 sprintf(errbuf,
1993 "missing gtids from the '%u-%u' domain-server pair which is "
1994 "referred to in the gtid list describing an earlier state. Ignore "
1995 "if the domain ('%u') was already explicitly deleted",
1996 glev->list[l].domain_id, glev->list[l].server_id,
1997 glev->list[l].domain_id);
1998 else if (rb_state_gtid->seq_no < glev->list[l].seq_no)
1999 sprintf(errbuf,
2000 "having a gtid '%u-%u-%llu' which is less than "
2001 "the '%u-%u-%llu' of the gtid list describing an earlier state. "
2002 "The state may have been affected by manually injecting "
2003 "a lower sequence number gtid or via replication",
2004 rb_state_gtid->domain_id, rb_state_gtid->server_id,
2005 rb_state_gtid->seq_no, glev->list[l].domain_id,
2006 glev->list[l].server_id, glev->list[l].seq_no);
2007 if (strlen(errbuf)) // use strlen() as cheap flag
2008 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
2009 ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
2010 "The current gtid binlog state is incompatible with "
2011 "a former one %s.", errbuf);
2012 }
2013
2014 /*
2015 For each domain_id from ids
2016 when no such domain in binlog state
2017 warn && continue
2018 For each domain.server's last gtid
2019 when not locate the last gtid in glev.list
2020 error out binlog state can't change
2021 otherwise continue
2022 */
2023 for (ulong i= 0; i < ids->elements; i++)
2024 {
2025 rpl_binlog_state::element *elem= NULL;
2026 ulong *ptr_domain_id;
2027 bool not_match;
2028
2029 ptr_domain_id= (ulong*) dynamic_array_ptr(ids, i);
2030 elem= (rpl_binlog_state::element *)
2031 my_hash_search(&hash, (const uchar *) ptr_domain_id, 0);
2032 if (!elem)
2033 {
2034 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
2035 ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
2036 "The gtid domain being deleted ('%lu') is not in "
2037 "the current binlog state", *ptr_domain_id);
2038 continue;
2039 }
2040
2041 for (not_match= true, k= 0; k < elem->hash.records; k++)
2042 {
2043 rpl_gtid *d_gtid= (rpl_gtid *)my_hash_element(&elem->hash, k);
2044 for (ulong l= 0; l < glev->count && not_match; l++)
2045 not_match= !(*d_gtid == glev->list[l]);
2046 }
2047
2048 if (not_match)
2049 {
2050 sprintf(errbuf, "binlog files may contain gtids from the domain ('%lu') "
2051 "being deleted. Make sure to first purge those files",
2052 *ptr_domain_id);
2053 errmsg= errbuf;
2054 goto end;
2055 }
2056 // compose a sequence of unique pointers to domain object
2057 for (k= 0; k < domain_unique.elements; k++)
2058 {
2059 if ((rpl_binlog_state::element*) dynamic_array_ptr(&domain_unique, k)
2060 == elem)
2061 break; // domain_id's elem has been already in
2062 }
2063 if (k == domain_unique.elements) // proven not to have duplicates
2064 insert_dynamic(&domain_unique, (uchar*) &elem);
2065 }
2066
2067 // Domain removal from binlog state
2068 for (k= 0; k < domain_unique.elements; k++)
2069 {
2070 rpl_binlog_state::element *elem= *(rpl_binlog_state::element**)
2071 dynamic_array_ptr(&domain_unique, k);
2072 my_hash_free(&elem->hash);
2073 my_hash_delete(&hash, (uchar*) elem);
2074 }
2075
2076 DBUG_ASSERT(strlen(errbuf) == 0);
2077
2078 if (domain_unique.elements == 0)
2079 errmsg= "";
2080
2081end:
2082 mysql_mutex_unlock(&LOCK_binlog_state);
2083 delete_dynamic(&domain_unique);
2084
2085 DBUG_RETURN(errmsg);
2086}
2087
2088slave_connection_state::slave_connection_state()
2089{
2090 my_hash_init(&hash, &my_charset_bin, 32,
2091 offsetof(entry, gtid) + offsetof(rpl_gtid, domain_id),
2092 sizeof(uint32), NULL, my_free, HASH_UNIQUE);
2093 my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
2094}
2095
2096
2097slave_connection_state::~slave_connection_state()
2098{
2099 my_hash_free(&hash);
2100 delete_dynamic(&gtid_sort_array);
2101}
2102
2103
2104/*
2105 Create a hash from the slave GTID state that is sent to master when slave
2106 connects to start replication.
2107
2108 The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
2109
2110 0-2-112,1-4-1022
2111
2112 The state gives for each domain_id the GTID to start replication from for
2113 the corresponding replication stream. So domain_id must be unique.
2114
2115 Returns 0 if ok, non-zero if error due to malformed input.
2116
2117 Note that input string is built by slave server, so it will not be incorrect
2118 unless bug/corruption/malicious server. So we just need basic sanity check,
2119 not fancy user-friendly error message.
2120*/
2121
2122int
2123slave_connection_state::load(const char *slave_request, size_t len)
2124{
2125 const char *p, *end;
2126 uchar *rec;
2127 rpl_gtid *gtid;
2128 const entry *e;
2129
2130 reset();
2131 p= slave_request;
2132 end= slave_request + len;
2133 if (p == end)
2134 return 0;
2135 for (;;)
2136 {
2137 if (!(rec= (uchar *)my_malloc(sizeof(entry), MYF(MY_WME))))
2138 return 1;
2139 gtid= &((entry *)rec)->gtid;
2140 if (gtid_parser_helper(&p, end, gtid))
2141 {
2142 my_free(rec);
2143 my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2144 return 1;
2145 }
2146 if ((e= (const entry *)
2147 my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0)))
2148 {
2149 my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
2150 gtid->server_id, (ulonglong)gtid->seq_no, e->gtid.domain_id,
2151 e->gtid.server_id, (ulonglong)e->gtid.seq_no, gtid->domain_id);
2152 my_free(rec);
2153 return 1;
2154 }
2155 ((entry *)rec)->flags= 0;
2156 if (my_hash_insert(&hash, rec))
2157 {
2158 my_free(rec);
2159 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2160 return 1;
2161 }
2162 if (p == end)
2163 break; /* Finished. */
2164 if (*p != ',')
2165 {
2166 my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2167 return 1;
2168 }
2169 ++p;
2170 }
2171
2172 return 0;
2173}
2174
2175
2176int
2177slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
2178{
2179 uint32 i;
2180
2181 reset();
2182 for (i= 0; i < count; ++i)
2183 if (update(&gtid_list[i]))
2184 return 1;
2185 return 0;
2186}
2187
2188
2189static int
2190slave_connection_state_load_cb(rpl_gtid *gtid, void *data)
2191{
2192 slave_connection_state *state= (slave_connection_state *)data;
2193 return state->update(gtid);
2194}
2195
2196
2197/*
2198 Same as rpl_slave_state::tostring(), but populates a slave_connection_state
2199 instead.
2200*/
2201int
2202slave_connection_state::load(rpl_slave_state *state,
2203 rpl_gtid *extra_gtids, uint32 num_extra)
2204{
2205 reset();
2206 return state->iterate(slave_connection_state_load_cb, this,
2207 extra_gtids, num_extra, false);
2208}
2209
2210
2211slave_connection_state::entry *
2212slave_connection_state::find_entry(uint32 domain_id)
2213{
2214 return (entry *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
2215}
2216
2217
2218rpl_gtid *
2219slave_connection_state::find(uint32 domain_id)
2220{
2221 entry *e= find_entry(domain_id);
2222 if (!e)
2223 return NULL;
2224 return &e->gtid;
2225}
2226
2227
2228int
2229slave_connection_state::update(const rpl_gtid *in_gtid)
2230{
2231 entry *e;
2232 uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2233 if (rec)
2234 {
2235 e= (entry *)rec;
2236 e->gtid= *in_gtid;
2237 return 0;
2238 }
2239
2240 if (!(e= (entry *)my_malloc(sizeof(*e), MYF(MY_WME))))
2241 return 1;
2242 e->gtid= *in_gtid;
2243 e->flags= 0;
2244 if (my_hash_insert(&hash, (uchar *)e))
2245 {
2246 my_free(e);
2247 return 1;
2248 }
2249
2250 return 0;
2251}
2252
2253
2254void
2255slave_connection_state::remove(const rpl_gtid *in_gtid)
2256{
2257 uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2258#ifdef DBUG_ASSERT_EXISTS
2259 bool err;
2260 rpl_gtid *slave_gtid= &((entry *)rec)->gtid;
2261 DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
2262 DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
2263 DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
2264 err=
2265#endif
2266 my_hash_delete(&hash, rec);
2267 DBUG_ASSERT(!err);
2268}
2269
2270
2271void
2272slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
2273{
2274 uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
2275 if (rec)
2276 my_hash_delete(&hash, rec);
2277}
2278
2279
2280int
2281slave_connection_state::to_string(String *out_str)
2282{
2283 out_str->length(0);
2284 return append_to_string(out_str);
2285}
2286
2287
2288int
2289slave_connection_state::append_to_string(String *out_str)
2290{
2291 uint32 i;
2292 bool first;
2293
2294 first= true;
2295 for (i= 0; i < hash.records; ++i)
2296 {
2297 const entry *e= (const entry *)my_hash_element(&hash, i);
2298 if (rpl_slave_state_tostring_helper(out_str, &e->gtid, &first))
2299 return 1;
2300 }
2301 return 0;
2302}
2303
2304
2305int
2306slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
2307{
2308 uint32 i, pos;
2309
2310 pos= 0;
2311 for (i= 0; i < hash.records; ++i)
2312 {
2313 entry *e;
2314 if (pos >= list_size)
2315 return 1;
2316 e= (entry *)my_hash_element(&hash, i);
2317 memcpy(&gtid_list[pos++], &e->gtid, sizeof(e->gtid));
2318 }
2319
2320 return 0;
2321}
2322
2323
2324/*
2325 Check if the GTID position has been reached, for mysql_binlog_send().
2326
2327 The position has not been reached if we have anything in the state, unless
2328 it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
2329 belong to this master at all), or the START_OWN_SLAVE_POS (which means that
2330 we start on an old position from when the server was a slave with
2331 --log-slave-updates=0).
2332*/
2333bool
2334slave_connection_state::is_pos_reached()
2335{
2336 uint32 i;
2337
2338 for (i= 0; i < hash.records; ++i)
2339 {
2340 entry *e= (entry *)my_hash_element(&hash, i);
2341 if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
2342 return false;
2343 }
2344
2345 return true;
2346}
2347
2348
2349/*
2350 Execute a MASTER_GTID_WAIT().
2351 The position to wait for is in gtid_str in string form.
2352 The timeout in microseconds is in timeout_us, zero means no timeout.
2353
2354 Returns:
2355 1 for error.
2356 0 for wait completed.
2357 -1 for wait timed out.
2358*/
2359int
2360gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
2361{
2362 int err;
2363 rpl_gtid *wait_pos;
2364 uint32 count, i;
2365 struct timespec wait_until, *wait_until_ptr;
2366 ulonglong before;
2367
2368 /* Wait for the empty position returns immediately. */
2369 if (gtid_str->length() == 0)
2370 {
2371 status_var_increment(thd->status_var.master_gtid_wait_count);
2372 return 0;
2373 }
2374
2375 if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
2376 &count)))
2377 {
2378 my_error(ER_INCORRECT_GTID_STATE, MYF(0));
2379 return 1;
2380 }
2381 status_var_increment(thd->status_var.master_gtid_wait_count);
2382 before= microsecond_interval_timer();
2383
2384 if (timeout_us >= 0)
2385 {
2386 set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
2387 wait_until_ptr= &wait_until;
2388 }
2389 else
2390 wait_until_ptr= NULL;
2391 err= 0;
2392 for (i= 0; i < count; ++i)
2393 {
2394 if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
2395 break;
2396 }
2397 switch (err)
2398 {
2399 case -1:
2400 status_var_increment(thd->status_var.master_gtid_wait_timeouts);
2401 /* fall through */
2402 case 0:
2403 status_var_add(thd->status_var.master_gtid_wait_time,
2404 microsecond_interval_timer() - before);
2405 }
2406 my_free(wait_pos);
2407 return err;
2408}
2409
2410
2411void
2412gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
2413{
2414 queue_element *qe;
2415
2416 mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2417 if (queue_empty(&he->queue))
2418 return;
2419 qe= (queue_element *)queue_top(&he->queue);
2420 qe->do_small_wait= true;
2421 mysql_cond_signal(&qe->thd->COND_wakeup_ready);
2422}
2423
2424void
2425gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
2426 gtid_waiting::hash_element *he)
2427{
2428 mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2429
2430 for (;;)
2431 {
2432 queue_element *qe;
2433
2434 if (queue_empty(&he->queue))
2435 break;
2436 qe= (queue_element *)queue_top(&he->queue);
2437 if (qe->wait_seq_no > wakeup_seq_no)
2438 break;
2439 DBUG_ASSERT(!qe->done);
2440 queue_remove_top(&he->queue);
2441 qe->done= true;;
2442 mysql_cond_signal(&qe->thd->COND_wakeup_ready);
2443 }
2444}
2445
2446
2447/*
2448 Execute a MASTER_GTID_WAIT() for one specific domain.
2449
2450 The implementation is optimised primarily for (1) minimal performance impact
2451 on the slave replication threads, and secondarily for (2) quick performance
2452 of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
2453 read to clients in an async replication read-scaleout scenario.
2454
2455 To achieve (1), we have a "small" wait and a "large" wait. The small wait
2456 contends with the replication threads on the lock on the gtid_slave_pos, so
2457 only minimal processing is done under that lock, and only a single waiter at
2458 a time does the small wait.
2459
2460 If there is already a small waiter, a new thread will either replace the
2461 small waiter (if it needs to wait for an earlier sequence number), or
2462 instead do a "large" wait.
2463
2464 Once awoken on the small wait, the waiting thread releases the lock shared
2465 with the SQL threads quickly, and then processes all waiters currently doing
2466 the large wait using a different lock that does not impact replication.
2467
2468 This way, the SQL threads only need to do a single check + possibly a
2469 pthread_cond_signal() when updating the gtid_slave_state, and the time that
2470 non-SQL threads contend for the lock on gtid_slave_state is minimized.
2471
2472 There is always at least one thread that has the responsibility to ensure
2473 that there is a small waiter; this thread has queue_element::do_small_wait
2474 set to true. This thread will do the small wait until it is done, at which
2475 point it will make sure to pass on the responsibility to another thread.
2476 Normally only one thread has do_small_wait==true, but it can occasionally
2477 happen that there is more than one, when threads race one another for the
2478 lock on the small wait (this results in slightly increased activity on the
2479 small lock but is otherwise harmless).
2480
2481 Returns:
2482 0 Wait completed normally
2483 -1 Wait completed due to timeout
2484 1 An error (my_error() will have been called to set the error in the da)
2485*/
2486int
2487gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
2488 struct timespec *wait_until)
2489{
2490 bool timed_out= false;
2491#ifdef HAVE_REPLICATION
2492 queue_element elem;
2493 uint32 domain_id= wait_gtid->domain_id;
2494 uint64 seq_no= wait_gtid->seq_no;
2495 hash_element *he;
2496 rpl_slave_state::element *slave_state_elem= NULL;
2497 PSI_stage_info old_stage;
2498 bool did_enter_cond= false;
2499
2500 elem.wait_seq_no= seq_no;
2501 elem.thd= thd;
2502 elem.done= false;
2503
2504 mysql_mutex_lock(&LOCK_gtid_waiting);
2505 if (!(he= get_entry(wait_gtid->domain_id)))
2506 {
2507 mysql_mutex_unlock(&LOCK_gtid_waiting);
2508 return 1;
2509 }
2510 /*
2511 If there is already another waiter with seq_no no larger than our own,
2512 we are sure that there is already a small waiter that will wake us up
2513 (or later pass the small wait responsibility to us). So in this case, we
2514 do not need to touch the small wait lock at all.
2515 */
2516 elem.do_small_wait=
2517 (queue_empty(&he->queue) ||
2518 ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);
2519
2520 if (register_in_wait_queue(thd, wait_gtid, he, &elem))
2521 {
2522 mysql_mutex_unlock(&LOCK_gtid_waiting);
2523 return 1;
2524 }
2525 /*
2526 Loop, doing either the small or large wait as appropriate, until either
2527 the position waited for is reached, or we get a kill or timeout.
2528 */
2529 for (;;)
2530 {
2531 mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2532
2533 if (elem.do_small_wait)
2534 {
2535 uint64 wakeup_seq_no;
2536 queue_element *cur_waiter;
2537
2538 mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2539 /*
2540 The elements in the gtid_slave_state_hash are never re-allocated once
2541 they enter the hash, so we do not need to re-do the lookup after releasing
2542 and re-aquiring the lock.
2543 */
2544 if (!slave_state_elem &&
2545 !(slave_state_elem= rpl_global_gtid_slave_state->get_element(domain_id)))
2546 {
2547 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2548 remove_from_wait_queue(he, &elem);
2549 promote_new_waiter(he);
2550 if (did_enter_cond)
2551 thd->EXIT_COND(&old_stage);
2552 else
2553 mysql_mutex_unlock(&LOCK_gtid_waiting);
2554 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2555 return 1;
2556 }
2557
2558 if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
2559 {
2560 /*
2561 We do not have to wait. (We will be removed from the wait queue when
2562 we call process_wait_hash() below.
2563 */
2564 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2565 }
2566 else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
2567 slave_state_elem->min_wait_seq_no <= seq_no)
2568 {
2569 /*
2570 There is already a suitable small waiter, go do the large wait.
2571 (Normally we would not have needed to check the small wait in this
2572 case, but it can happen if we race with another thread for the small
2573 lock).
2574 */
2575 elem.do_small_wait= false;
2576 mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
2577 }
2578 else
2579 {
2580 /*
2581 We have to do the small wait ourselves (stealing it from any thread
2582 that might already be waiting for a later seq_no).
2583 */
2584 slave_state_elem->gtid_waiter= &elem;
2585 slave_state_elem->min_wait_seq_no= seq_no;
2586 if (cur_waiter)
2587 {
2588 /* We stole the wait, so wake up the old waiting thread. */
2589 mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
2590 }
2591
2592 /* Release the large lock, and do the small wait. */
2593 if (did_enter_cond)
2594 {
2595 thd->EXIT_COND(&old_stage);
2596 did_enter_cond= false;
2597 }
2598 else
2599 mysql_mutex_unlock(&LOCK_gtid_waiting);
2600 thd->ENTER_COND(&slave_state_elem->COND_wait_gtid,
2601 &rpl_global_gtid_slave_state->LOCK_slave_state,
2602 &stage_master_gtid_wait_primary, &old_stage);
2603 do
2604 {
2605 if (unlikely(thd->check_killed()))
2606 break;
2607 else if (wait_until)
2608 {
2609 int err=
2610 mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
2611 &rpl_global_gtid_slave_state->LOCK_slave_state,
2612 wait_until);
2613 if (err == ETIMEDOUT || err == ETIME)
2614 {
2615 timed_out= true;
2616 break;
2617 }
2618 }
2619 else
2620 mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
2621 &rpl_global_gtid_slave_state->LOCK_slave_state);
2622 } while (slave_state_elem->gtid_waiter == &elem);
2623 wakeup_seq_no= slave_state_elem->highest_seq_no;
2624 /*
2625 If we aborted due to timeout or kill, remove us as waiter.
2626
2627 If we were replaced by another waiter with a smaller seq_no, then we
2628 no longer have responsibility for the small wait.
2629 */
2630 if ((cur_waiter= slave_state_elem->gtid_waiter))
2631 {
2632 if (cur_waiter == &elem)
2633 slave_state_elem->gtid_waiter= NULL;
2634 else if (slave_state_elem->min_wait_seq_no <= seq_no)
2635 elem.do_small_wait= false;
2636 }
2637 thd->EXIT_COND(&old_stage);
2638
2639 mysql_mutex_lock(&LOCK_gtid_waiting);
2640 }
2641
2642 /*
2643 Note that hash_entry pointers do not change once allocated, so we do
2644 not need to lookup `he' again after re-aquiring LOCK_gtid_waiting.
2645 */
2646 process_wait_hash(wakeup_seq_no, he);
2647 }
2648 else
2649 {
2650 /* Do the large wait. */
2651 if (!did_enter_cond)
2652 {
2653 thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
2654 &stage_master_gtid_wait, &old_stage);
2655 did_enter_cond= true;
2656 }
2657 while (!elem.done && likely(!thd->check_killed()))
2658 {
2659 thd_wait_begin(thd, THD_WAIT_BINLOG);
2660 if (wait_until)
2661 {
2662 int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
2663 &LOCK_gtid_waiting, wait_until);
2664 if (err == ETIMEDOUT || err == ETIME)
2665 timed_out= true;
2666 }
2667 else
2668 mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
2669 thd_wait_end(thd);
2670 if (elem.do_small_wait || timed_out)
2671 break;
2672 }
2673 }
2674
2675 if ((thd->killed || timed_out) && !elem.done)
2676 {
2677 /* Aborted, so remove ourselves from the hash. */
2678 remove_from_wait_queue(he, &elem);
2679 elem.done= true;
2680 }
2681 if (elem.done)
2682 {
2683 /*
2684 If our wait is done, but we have (or were passed) responsibility for
2685 the small wait, then we need to pass on that task to someone else.
2686 */
2687 if (elem.do_small_wait)
2688 promote_new_waiter(he);
2689 break;
2690 }
2691 }
2692
2693 if (did_enter_cond)
2694 thd->EXIT_COND(&old_stage);
2695 else
2696 mysql_mutex_unlock(&LOCK_gtid_waiting);
2697 if (thd->killed)
2698 thd->send_kill_message();
2699#endif /* HAVE_REPLICATION */
2700 return timed_out ? -1 : 0;
2701}
2702
2703
2704static void
2705free_hash_element(void *p)
2706{
2707 gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
2708 delete_queue(&e->queue);
2709 my_free(e);
2710}
2711
2712
2713void
2714gtid_waiting::init()
2715{
2716 my_hash_init(&hash, &my_charset_bin, 32,
2717 offsetof(hash_element, domain_id), sizeof(uint32), NULL,
2718 free_hash_element, HASH_UNIQUE);
2719 mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
2720}
2721
2722
2723void
2724gtid_waiting::destroy()
2725{
2726 mysql_mutex_destroy(&LOCK_gtid_waiting);
2727 my_hash_free(&hash);
2728}
2729
2730
2731static int
2732cmp_queue_elem(void *, uchar *a, uchar *b)
2733{
2734 uint64 seq_no_a= *(uint64 *)a;
2735 uint64 seq_no_b= *(uint64 *)b;
2736 if (seq_no_a < seq_no_b)
2737 return -1;
2738 else if (seq_no_a == seq_no_b)
2739 return 0;
2740 else
2741 return 1;
2742}
2743
2744
2745gtid_waiting::hash_element *
2746gtid_waiting::get_entry(uint32 domain_id)
2747{
2748 hash_element *e;
2749
2750 if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
2751 return e;
2752
2753 if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
2754 return NULL;
2755
2756 if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
2757 cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
2758 {
2759 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2760 my_free(e);
2761 return NULL;
2762 }
2763 e->domain_id= domain_id;
2764 if (my_hash_insert(&hash, (uchar *)e))
2765 {
2766 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2767 delete_queue(&e->queue);
2768 my_free(e);
2769 return NULL;
2770 }
2771 return e;
2772}
2773
2774
2775int
2776gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
2777 gtid_waiting::hash_element *he,
2778 gtid_waiting::queue_element *elem)
2779{
2780 mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2781
2782 if (queue_insert_safe(&he->queue, (uchar *)elem))
2783 {
2784 my_error(ER_OUT_OF_RESOURCES, MYF(0));
2785 return 1;
2786 }
2787
2788 return 0;
2789}
2790
2791
2792void
2793gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
2794 gtid_waiting::queue_element *elem)
2795{
2796 mysql_mutex_assert_owner(&LOCK_gtid_waiting);
2797
2798 queue_remove(&he->queue, elem->queue_idx);
2799}
2800