1 | /* Copyright (c) 2006, 2011, Oracle and/or its affiliates. |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License |
13 | along with this program; if not, write to the Free Software |
14 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
15 | |
16 | #include "mariadb.h" |
17 | #include "sql_priv.h" |
18 | #include "rpl_injector.h" |
19 | #include "transaction.h" |
20 | #include "sql_parse.h" // begin_trans, end_trans, COMMIT |
21 | #include "sql_base.h" // close_thread_tables |
22 | #include "log_event.h" // Incident_log_event |
23 | |
24 | /* |
25 | injector::transaction - member definitions |
26 | */ |
27 | |
28 | /* inline since it's called below */ |
29 | inline |
30 | injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd) |
31 | : m_state(START_STATE), m_thd(thd) |
32 | { |
33 | /* |
34 | Default initialization of m_start_pos (which initializes it to garbage). |
35 | We need to fill it in using the code below. |
36 | */ |
37 | LOG_INFO log_info; |
38 | log->get_current_log(&log_info); |
39 | /* !!! binlog_pos does not follow RAII !!! */ |
40 | m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0)); |
41 | m_start_pos.m_file_pos= log_info.pos; |
42 | |
43 | m_thd->lex->start_transaction_opt= 0; /* for begin_trans() */ |
44 | trans_begin(m_thd); |
45 | } |
46 | |
47 | injector::transaction::~transaction() |
48 | { |
49 | if (!good()) |
50 | return; |
51 | |
52 | /* Needed since my_free expects a 'char*' (instead of 'void*'). */ |
53 | char* const the_memory= const_cast<char*>(m_start_pos.m_file_name); |
54 | |
55 | /* |
56 | We set the first character to null just to give all the copies of the |
57 | start position a (minimal) chance of seening that the memory is lost. |
58 | All assuming the my_free does not step over the memory, of course. |
59 | */ |
60 | *the_memory= '\0'; |
61 | |
62 | my_free(the_memory); |
63 | } |
64 | |
65 | /** |
66 | @retval 0 transaction committed |
67 | @retval 1 transaction rolled back |
68 | */ |
69 | int injector::transaction::commit() |
70 | { |
71 | DBUG_ENTER("injector::transaction::commit()" ); |
72 | int error= m_thd->binlog_flush_pending_rows_event(true); |
73 | /* |
74 | Cluster replication does not preserve statement or |
75 | transaction boundaries of the master. Instead, a new |
76 | transaction on replication slave is started when a new GCI |
77 | (global checkpoint identifier) is issued, and is committed |
78 | when the last event of the check point has been received and |
79 | processed. This ensures consistency of each cluster in |
80 | cluster replication, and there is no requirement for stronger |
81 | consistency: MySQL replication is asynchronous with other |
82 | engines as well. |
83 | |
84 | A practical consequence of that is that row level replication |
85 | stream passed through the injector thread never contains |
86 | COMMIT events. |
87 | Here we should preserve the server invariant that there is no |
88 | outstanding statement transaction when the normal transaction |
89 | is committed by committing the statement transaction |
90 | explicitly. |
91 | */ |
92 | trans_commit_stmt(m_thd); |
93 | if (!trans_commit(m_thd)) |
94 | { |
95 | close_thread_tables(m_thd); |
96 | m_thd->mdl_context.release_transactional_locks(); |
97 | } |
98 | DBUG_RETURN(error); |
99 | } |
100 | |
101 | |
102 | int injector::transaction::use_table(server_id_type sid, table tbl) |
103 | { |
104 | DBUG_ENTER("injector::transaction::use_table" ); |
105 | |
106 | int error; |
107 | |
108 | if (unlikely((error= check_state(TABLE_STATE)))) |
109 | DBUG_RETURN(error); |
110 | |
111 | server_id_type save_id= m_thd->variables.server_id; |
112 | m_thd->set_server_id(sid); |
113 | error= m_thd->binlog_write_table_map(tbl.get_table(), |
114 | tbl.is_transactional()); |
115 | m_thd->set_server_id(save_id); |
116 | DBUG_RETURN(error); |
117 | } |
118 | |
119 | |
120 | |
121 | injector::transaction::binlog_pos injector::transaction::start_pos() const |
122 | { |
123 | return m_start_pos; |
124 | } |
125 | |
126 | |
127 | /* |
128 | injector - member definitions |
129 | */ |
130 | |
/*
  Default constructor; performs no work.

  Defined inline because its only use in this file is the
  'new injector' expression in injector::instance() below.
*/
inline injector::injector()
{
}
135 | |
136 | static injector *s_injector= 0; |
137 | injector *injector::instance() |
138 | { |
139 | if (s_injector == 0) |
140 | s_injector= new injector; |
141 | /* "There can be only one [instance]" */ |
142 | return s_injector; |
143 | } |
144 | |
145 | void injector::free_instance() |
146 | { |
147 | injector *inj = s_injector; |
148 | |
149 | if (inj != 0) |
150 | { |
151 | s_injector= 0; |
152 | delete inj; |
153 | } |
154 | } |
155 | |
156 | |
/**
  Start a new injector transaction on @c thd and return it by value.

  @param thd  session the transaction is bound to

  The transaction is built directly in the return statement. Keep it that
  way: the transaction destructor frees the start-position file name, so
  introducing an intermediate named object (and with it a possible copy)
  is not a harmless refactoring.
*/
injector::transaction injector::new_trans(THD *thd)
{
  DBUG_ENTER("injector::new_trans(THD*)");
  /*
    Currently, there is no alternative to using 'mysql_bin_log' since that
    is hardcoded into the way the handler is using the binary log.
  */
  DBUG_RETURN(transaction(&mysql_bin_log, thd));
}
166 | |
167 | void injector::new_trans(THD *thd, injector::transaction *ptr) |
168 | { |
169 | DBUG_ENTER("injector::new_trans(THD *, transaction *)" ); |
170 | /* |
171 | Currently, there is no alternative to using 'mysql_bin_log' since that |
172 | is hardcoded into the way the handler is using the binary log. |
173 | */ |
174 | transaction trans(&mysql_bin_log, thd); |
175 | ptr->swap(trans); |
176 | |
177 | DBUG_VOID_RETURN; |
178 | } |
179 | |
180 | int injector::record_incident(THD *thd, Incident incident) |
181 | { |
182 | Incident_log_event ev(thd, incident); |
183 | int error; |
184 | if (unlikely((error= mysql_bin_log.write(&ev)))) |
185 | return error; |
186 | return mysql_bin_log.rotate_and_purge(true); |
187 | } |
188 | |
189 | int injector::record_incident(THD *thd, Incident incident, |
190 | const LEX_CSTRING *message) |
191 | { |
192 | Incident_log_event ev(thd, incident, message); |
193 | int error; |
194 | if (unlikely((error= mysql_bin_log.write(&ev)))) |
195 | return error; |
196 | return mysql_bin_log.rotate_and_purge(true); |
197 | } |
198 | |