1 | /* |
2 | Copyright (c) 2007, Antony T Curtis |
3 | All rights reserved. |
4 | |
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted provided that the following conditions are |
7 | met: |
8 | |
9 | * Redistributions of source code must retain the above copyright |
10 | notice, this list of conditions and the following disclaimer. |
11 | |
12 | * Neither the name of FederatedX nor the names of its |
13 | contributors may be used to endorse or promote products derived from |
14 | this software without specific prior written permission. |
15 | |
16 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
17 | "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
18 | LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
19 | A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
20 | OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
21 | SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
22 | LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
23 | DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
24 | THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
25 | (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
26 | OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #ifdef USE_PRAGMA_IMPLEMENTATION |
30 | #pragma implementation // gcc: Class implementation |
31 | #endif |
32 | |
33 | #define MYSQL_SERVER 1 |
34 | #include <my_global.h> |
35 | #include "sql_priv.h" |
36 | |
37 | #include "ha_federatedx.h" |
38 | |
39 | #include "m_string.h" |
40 | #include "table.h" |
41 | #include "sql_servers.h" |
42 | |
43 | federatedx_txn::federatedx_txn() |
44 | : txn_list(0), savepoint_level(0), savepoint_stmt(0), savepoint_next(0) |
45 | { |
46 | DBUG_ENTER("federatedx_txn::federatedx_txn" ); |
47 | DBUG_VOID_RETURN; |
48 | } |
49 | |
50 | federatedx_txn::~federatedx_txn() |
51 | { |
52 | DBUG_ENTER("federatedx_txn::~federatedx_txn" ); |
53 | DBUG_ASSERT(!txn_list); |
54 | DBUG_VOID_RETURN; |
55 | } |
56 | |
57 | |
58 | void federatedx_txn::close(FEDERATEDX_SERVER *server) |
59 | { |
60 | uint count= 0; |
61 | federatedx_io *io, **iop; |
62 | DBUG_ENTER("federatedx_txn::close" ); |
63 | |
64 | DBUG_ASSERT(!server->use_count); |
65 | DBUG_PRINT("info" ,("use count: %u connections: %u" , |
66 | server->use_count, server->io_count)); |
67 | |
68 | for (iop= &txn_list; (io= *iop);) |
69 | { |
70 | if (io->server != server) |
71 | iop= &io->txn_next; |
72 | else |
73 | { |
74 | *iop= io->txn_next; |
75 | io->txn_next= NULL; |
76 | io->busy= FALSE; |
77 | |
78 | io->idle_next= server->idle_list; |
79 | server->idle_list= io; |
80 | } |
81 | } |
82 | |
83 | while ((io= server->idle_list)) |
84 | { |
85 | server->idle_list= io->idle_next; |
86 | delete io; |
87 | count++; |
88 | } |
89 | |
90 | DBUG_PRINT("info" ,("closed %u connections, txn_list: %s" , count, |
91 | txn_list ? "active" : "empty" )); |
92 | DBUG_VOID_RETURN; |
93 | } |
94 | |
95 | |
96 | int federatedx_txn::acquire(FEDERATEDX_SHARE *share, void *thd, |
97 | bool readonly, federatedx_io **ioptr) |
98 | { |
99 | federatedx_io *io; |
100 | FEDERATEDX_SERVER *server= share->s; |
101 | DBUG_ENTER("federatedx_txn::acquire" ); |
102 | DBUG_ASSERT(ioptr && server); |
103 | |
104 | if (!(io= *ioptr)) |
105 | { |
106 | /* check to see if we have an available IO connection */ |
107 | for (io= txn_list; io; io= io->txn_next) |
108 | if (io->server == server) |
109 | break; |
110 | |
111 | if (!io) |
112 | { |
113 | /* check to see if there are any unowned IO connections */ |
114 | mysql_mutex_lock(&server->mutex); |
115 | if ((io= server->idle_list)) |
116 | { |
117 | server->idle_list= io->idle_next; |
118 | io->idle_next= NULL; |
119 | } |
120 | else |
121 | io= federatedx_io::construct(&server->mem_root, server); |
122 | |
123 | io->txn_next= txn_list; |
124 | txn_list= io; |
125 | |
126 | mysql_mutex_unlock(&server->mutex); |
127 | } |
128 | |
129 | if (io->busy) |
130 | *io->owner_ptr= NULL; |
131 | |
132 | io->busy= TRUE; |
133 | io->owner_ptr= ioptr; |
134 | io->set_thd(thd); |
135 | } |
136 | |
137 | DBUG_ASSERT(io->busy && io->server == server); |
138 | |
139 | io->readonly&= readonly; |
140 | |
141 | DBUG_RETURN((*ioptr= io) ? 0 : -1); |
142 | } |
143 | |
144 | |
145 | void federatedx_txn::release(federatedx_io **ioptr) |
146 | { |
147 | federatedx_io *io; |
148 | DBUG_ENTER("federatedx_txn::release" ); |
149 | DBUG_ASSERT(ioptr); |
150 | |
151 | if ((io= *ioptr)) |
152 | { |
153 | /* mark as available for reuse in this transaction */ |
154 | io->busy= FALSE; |
155 | *ioptr= NULL; |
156 | |
157 | DBUG_PRINT("info" , ("active: %d autocommit: %d" , |
158 | io->active, io->is_autocommit())); |
159 | |
160 | if (io->is_autocommit()) |
161 | { |
162 | io->set_thd(NULL); |
163 | io->active= FALSE; |
164 | } |
165 | } |
166 | |
167 | release_scan(); |
168 | |
169 | DBUG_VOID_RETURN; |
170 | } |
171 | |
172 | |
173 | void federatedx_txn::release_scan() |
174 | { |
175 | uint count= 0, returned= 0; |
176 | federatedx_io *io, **pio; |
177 | DBUG_ENTER("federatedx_txn::release_scan" ); |
178 | |
179 | /* return any inactive and idle connections to the server */ |
180 | for (pio= &txn_list; (io= *pio); count++) |
181 | { |
182 | if (io->active || io->busy) |
183 | pio= &io->txn_next; |
184 | else |
185 | { |
186 | FEDERATEDX_SERVER *server= io->server; |
187 | |
188 | /* unlink from list of connections bound to the transaction */ |
189 | *pio= io->txn_next; |
190 | io->txn_next= NULL; |
191 | |
192 | /* reset some values */ |
193 | io->readonly= TRUE; |
194 | |
195 | mysql_mutex_lock(&server->mutex); |
196 | io->idle_next= server->idle_list; |
197 | server->idle_list= io; |
198 | mysql_mutex_unlock(&server->mutex); |
199 | returned++; |
200 | } |
201 | } |
202 | DBUG_PRINT("info" ,("returned %u of %u connections(s)" , returned, count)); |
203 | |
204 | DBUG_VOID_RETURN; |
205 | } |
206 | |
207 | |
208 | bool federatedx_txn::txn_begin() |
209 | { |
210 | ulong level= 0; |
211 | DBUG_ENTER("federatedx_txn::txn_begin" ); |
212 | |
213 | if (savepoint_next == 0) |
214 | { |
215 | savepoint_next++; |
216 | savepoint_level= savepoint_stmt= 0; |
217 | sp_acquire(&level); |
218 | } |
219 | |
220 | DBUG_RETURN(level == 1); |
221 | } |
222 | |
223 | |
224 | int federatedx_txn::txn_commit() |
225 | { |
226 | int error= 0; |
227 | federatedx_io *io; |
228 | DBUG_ENTER("federatedx_txn::txn_commit" ); |
229 | |
230 | if (savepoint_next) |
231 | { |
232 | DBUG_ASSERT(savepoint_stmt != 1); |
233 | |
234 | for (io= txn_list; io; io= io->txn_next) |
235 | { |
236 | int rc= 0; |
237 | |
238 | if (io->active) |
239 | rc= io->commit(); |
240 | else |
241 | io->rollback(); |
242 | |
243 | if (io->active && rc) |
244 | error= -1; |
245 | |
246 | io->reset(); |
247 | } |
248 | |
249 | release_scan(); |
250 | |
251 | savepoint_next= savepoint_stmt= savepoint_level= 0; |
252 | } |
253 | |
254 | DBUG_RETURN(error); |
255 | } |
256 | |
257 | |
258 | int federatedx_txn::txn_rollback() |
259 | { |
260 | int error= 0; |
261 | federatedx_io *io; |
262 | DBUG_ENTER("federatedx_txn::txn_commit" ); |
263 | |
264 | if (savepoint_next) |
265 | { |
266 | DBUG_ASSERT(savepoint_stmt != 1); |
267 | |
268 | for (io= txn_list; io; io= io->txn_next) |
269 | { |
270 | int rc= io->rollback(); |
271 | |
272 | if (io->active && rc) |
273 | error= -1; |
274 | |
275 | io->reset(); |
276 | } |
277 | |
278 | release_scan(); |
279 | |
280 | savepoint_next= savepoint_stmt= savepoint_level= 0; |
281 | } |
282 | |
283 | DBUG_RETURN(error); |
284 | } |
285 | |
286 | |
287 | bool federatedx_txn::sp_acquire(ulong *sp) |
288 | { |
289 | bool rc= FALSE; |
290 | federatedx_io *io; |
291 | DBUG_ENTER("federatedx_txn::sp_acquire" ); |
292 | DBUG_ASSERT(sp && savepoint_next); |
293 | |
294 | *sp= savepoint_level= savepoint_next++; |
295 | |
296 | for (io= txn_list; io; io= io->txn_next) |
297 | { |
298 | if (io->readonly) |
299 | continue; |
300 | |
301 | io->savepoint_set(savepoint_level); |
302 | rc= TRUE; |
303 | } |
304 | |
305 | DBUG_RETURN(rc); |
306 | } |
307 | |
308 | |
309 | int federatedx_txn::sp_rollback(ulong *sp) |
310 | { |
311 | ulong level, new_level= savepoint_level; |
312 | federatedx_io *io; |
313 | DBUG_ENTER("federatedx_txn::sp_rollback" ); |
314 | DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level); |
315 | |
316 | for (io= txn_list; io; io= io->txn_next) |
317 | { |
318 | if (io->readonly) |
319 | continue; |
320 | |
321 | if ((level= io->savepoint_rollback(*sp)) < new_level) |
322 | new_level= level; |
323 | } |
324 | |
325 | savepoint_level= new_level; |
326 | |
327 | DBUG_RETURN(0); |
328 | } |
329 | |
330 | |
331 | int federatedx_txn::sp_release(ulong *sp) |
332 | { |
333 | ulong level, new_level= savepoint_level; |
334 | federatedx_io *io; |
335 | DBUG_ENTER("federatedx_txn::sp_release" ); |
336 | DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level); |
337 | |
338 | for (io= txn_list; io; io= io->txn_next) |
339 | { |
340 | if (io->readonly) |
341 | continue; |
342 | |
343 | if ((level= io->savepoint_release(*sp)) < new_level) |
344 | new_level= level; |
345 | } |
346 | |
347 | savepoint_level= new_level; |
348 | *sp= 0; |
349 | |
350 | DBUG_RETURN(0); |
351 | } |
352 | |
353 | |
354 | bool federatedx_txn::stmt_begin() |
355 | { |
356 | bool result= FALSE; |
357 | DBUG_ENTER("federatedx_txn::stmt_begin" ); |
358 | |
359 | if (!savepoint_stmt) |
360 | { |
361 | if (!savepoint_next) |
362 | { |
363 | savepoint_next++; |
364 | savepoint_level= savepoint_stmt= 0; |
365 | } |
366 | result= sp_acquire(&savepoint_stmt); |
367 | } |
368 | |
369 | DBUG_RETURN(result); |
370 | } |
371 | |
372 | |
373 | int federatedx_txn::stmt_commit() |
374 | { |
375 | int result= 0; |
376 | DBUG_ENTER("federatedx_txn::stmt_commit" ); |
377 | |
378 | if (savepoint_stmt == 1) |
379 | { |
380 | savepoint_stmt= 0; |
381 | result= txn_commit(); |
382 | } |
383 | else |
384 | if (savepoint_stmt) |
385 | result= sp_release(&savepoint_stmt); |
386 | |
387 | DBUG_RETURN(result); |
388 | } |
389 | |
390 | |
391 | int federatedx_txn::stmt_rollback() |
392 | { |
393 | int result= 0; |
394 | DBUG_ENTER("federated:txn::stmt_rollback" ); |
395 | |
396 | if (savepoint_stmt == 1) |
397 | { |
398 | savepoint_stmt= 0; |
399 | result= txn_rollback(); |
400 | } |
401 | else |
402 | if (savepoint_stmt) |
403 | { |
404 | result= sp_rollback(&savepoint_stmt); |
405 | sp_release(&savepoint_stmt); |
406 | } |
407 | |
408 | DBUG_RETURN(result); |
409 | } |
410 | |
411 | |
412 | void federatedx_txn::stmt_autocommit() |
413 | { |
414 | federatedx_io *io; |
415 | DBUG_ENTER("federatedx_txn::stmt_autocommit" ); |
416 | |
417 | for (io= txn_list; savepoint_stmt && io; io= io->txn_next) |
418 | { |
419 | if (io->readonly) |
420 | continue; |
421 | |
422 | io->savepoint_restrict(savepoint_stmt); |
423 | } |
424 | |
425 | DBUG_VOID_RETURN; |
426 | } |
427 | |
428 | |
429 | |