1/* Copyright (c) 2006, 2011, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16#include "mariadb.h"
17#include "sql_priv.h"
18#include "rpl_injector.h"
19#include "transaction.h"
20#include "sql_parse.h" // begin_trans, end_trans, COMMIT
21#include "sql_base.h" // close_thread_tables
22#include "log_event.h" // Incident_log_event
23
24/*
25 injector::transaction - member definitions
26*/
27
28/* inline since it's called below */
29inline
30injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
31 : m_state(START_STATE), m_thd(thd)
32{
33 /*
34 Default initialization of m_start_pos (which initializes it to garbage).
35 We need to fill it in using the code below.
36 */
37 LOG_INFO log_info;
38 log->get_current_log(&log_info);
39 /* !!! binlog_pos does not follow RAII !!! */
40 m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
41 m_start_pos.m_file_pos= log_info.pos;
42
43 m_thd->lex->start_transaction_opt= 0; /* for begin_trans() */
44 trans_begin(m_thd);
45}
46
47injector::transaction::~transaction()
48{
49 if (!good())
50 return;
51
52 /* Needed since my_free expects a 'char*' (instead of 'void*'). */
53 char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
54
55 /*
56 We set the first character to null just to give all the copies of the
57 start position a (minimal) chance of seening that the memory is lost.
58 All assuming the my_free does not step over the memory, of course.
59 */
60 *the_memory= '\0';
61
62 my_free(the_memory);
63}
64
65/**
66 @retval 0 transaction committed
67 @retval 1 transaction rolled back
68 */
69int injector::transaction::commit()
70{
71 DBUG_ENTER("injector::transaction::commit()");
72 int error= m_thd->binlog_flush_pending_rows_event(true);
73 /*
74 Cluster replication does not preserve statement or
75 transaction boundaries of the master. Instead, a new
76 transaction on replication slave is started when a new GCI
77 (global checkpoint identifier) is issued, and is committed
78 when the last event of the check point has been received and
79 processed. This ensures consistency of each cluster in
80 cluster replication, and there is no requirement for stronger
81 consistency: MySQL replication is asynchronous with other
82 engines as well.
83
84 A practical consequence of that is that row level replication
85 stream passed through the injector thread never contains
86 COMMIT events.
87 Here we should preserve the server invariant that there is no
88 outstanding statement transaction when the normal transaction
89 is committed by committing the statement transaction
90 explicitly.
91 */
92 trans_commit_stmt(m_thd);
93 if (!trans_commit(m_thd))
94 {
95 close_thread_tables(m_thd);
96 m_thd->mdl_context.release_transactional_locks();
97 }
98 DBUG_RETURN(error);
99}
100
101
102int injector::transaction::use_table(server_id_type sid, table tbl)
103{
104 DBUG_ENTER("injector::transaction::use_table");
105
106 int error;
107
108 if (unlikely((error= check_state(TABLE_STATE))))
109 DBUG_RETURN(error);
110
111 server_id_type save_id= m_thd->variables.server_id;
112 m_thd->set_server_id(sid);
113 error= m_thd->binlog_write_table_map(tbl.get_table(),
114 tbl.is_transactional());
115 m_thd->set_server_id(save_id);
116 DBUG_RETURN(error);
117}
118
119
120
121injector::transaction::binlog_pos injector::transaction::start_pos() const
122{
123 return m_start_pos;
124}
125
126
127/*
128 injector - member definitions
129*/
130
131/* This constructor is called below */
132inline injector::injector()
133{
134}
135
136static injector *s_injector= 0;
137injector *injector::instance()
138{
139 if (s_injector == 0)
140 s_injector= new injector;
141 /* "There can be only one [instance]" */
142 return s_injector;
143}
144
145void injector::free_instance()
146{
147 injector *inj = s_injector;
148
149 if (inj != 0)
150 {
151 s_injector= 0;
152 delete inj;
153 }
154}
155
156
157injector::transaction injector::new_trans(THD *thd)
158{
159 DBUG_ENTER("injector::new_trans(THD*)");
160 /*
161 Currently, there is no alternative to using 'mysql_bin_log' since that
162 is hardcoded into the way the handler is using the binary log.
163 */
164 DBUG_RETURN(transaction(&mysql_bin_log, thd));
165}
166
167void injector::new_trans(THD *thd, injector::transaction *ptr)
168{
169 DBUG_ENTER("injector::new_trans(THD *, transaction *)");
170 /*
171 Currently, there is no alternative to using 'mysql_bin_log' since that
172 is hardcoded into the way the handler is using the binary log.
173 */
174 transaction trans(&mysql_bin_log, thd);
175 ptr->swap(trans);
176
177 DBUG_VOID_RETURN;
178}
179
180int injector::record_incident(THD *thd, Incident incident)
181{
182 Incident_log_event ev(thd, incident);
183 int error;
184 if (unlikely((error= mysql_bin_log.write(&ev))))
185 return error;
186 return mysql_bin_log.rotate_and_purge(true);
187}
188
189int injector::record_incident(THD *thd, Incident incident,
190 const LEX_CSTRING *message)
191{
192 Incident_log_event ev(thd, incident, message);
193 int error;
194 if (unlikely((error= mysql_bin_log.write(&ev))))
195 return error;
196 return mysql_bin_log.rotate_and_purge(true);
197}
198