1/*
2Copyright (c) 2007, Antony T Curtis
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are
7met:
8
9 * Redistributions of source code must retain the above copyright
10notice, this list of conditions and the following disclaimer.
11
12 * Neither the name of FederatedX nor the names of its
13contributors may be used to endorse or promote products derived from
14this software without specific prior written permission.
15
16THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27*/
28
29#ifdef USE_PRAGMA_IMPLEMENTATION
30#pragma implementation // gcc: Class implementation
31#endif
32
33#define MYSQL_SERVER 1
34#include <my_global.h>
35#include "sql_priv.h"
36
37#include "ha_federatedx.h"
38
39#include "m_string.h"
40#include "table.h"
41#include "sql_servers.h"
42
43federatedx_txn::federatedx_txn()
44 : txn_list(0), savepoint_level(0), savepoint_stmt(0), savepoint_next(0)
45{
46 DBUG_ENTER("federatedx_txn::federatedx_txn");
47 DBUG_VOID_RETURN;
48}
49
50federatedx_txn::~federatedx_txn()
51{
52 DBUG_ENTER("federatedx_txn::~federatedx_txn");
53 DBUG_ASSERT(!txn_list);
54 DBUG_VOID_RETURN;
55}
56
57
58void federatedx_txn::close(FEDERATEDX_SERVER *server)
59{
60 uint count= 0;
61 federatedx_io *io, **iop;
62 DBUG_ENTER("federatedx_txn::close");
63
64 DBUG_ASSERT(!server->use_count);
65 DBUG_PRINT("info",("use count: %u connections: %u",
66 server->use_count, server->io_count));
67
68 for (iop= &txn_list; (io= *iop);)
69 {
70 if (io->server != server)
71 iop= &io->txn_next;
72 else
73 {
74 *iop= io->txn_next;
75 io->txn_next= NULL;
76 io->busy= FALSE;
77
78 io->idle_next= server->idle_list;
79 server->idle_list= io;
80 }
81 }
82
83 while ((io= server->idle_list))
84 {
85 server->idle_list= io->idle_next;
86 delete io;
87 count++;
88 }
89
90 DBUG_PRINT("info",("closed %u connections, txn_list: %s", count,
91 txn_list ? "active": "empty"));
92 DBUG_VOID_RETURN;
93}
94
95
96int federatedx_txn::acquire(FEDERATEDX_SHARE *share, void *thd,
97 bool readonly, federatedx_io **ioptr)
98{
99 federatedx_io *io;
100 FEDERATEDX_SERVER *server= share->s;
101 DBUG_ENTER("federatedx_txn::acquire");
102 DBUG_ASSERT(ioptr && server);
103
104 if (!(io= *ioptr))
105 {
106 /* check to see if we have an available IO connection */
107 for (io= txn_list; io; io= io->txn_next)
108 if (io->server == server)
109 break;
110
111 if (!io)
112 {
113 /* check to see if there are any unowned IO connections */
114 mysql_mutex_lock(&server->mutex);
115 if ((io= server->idle_list))
116 {
117 server->idle_list= io->idle_next;
118 io->idle_next= NULL;
119 }
120 else
121 io= federatedx_io::construct(&server->mem_root, server);
122
123 io->txn_next= txn_list;
124 txn_list= io;
125
126 mysql_mutex_unlock(&server->mutex);
127 }
128
129 if (io->busy)
130 *io->owner_ptr= NULL;
131
132 io->busy= TRUE;
133 io->owner_ptr= ioptr;
134 io->set_thd(thd);
135 }
136
137 DBUG_ASSERT(io->busy && io->server == server);
138
139 io->readonly&= readonly;
140
141 DBUG_RETURN((*ioptr= io) ? 0 : -1);
142}
143
144
145void federatedx_txn::release(federatedx_io **ioptr)
146{
147 federatedx_io *io;
148 DBUG_ENTER("federatedx_txn::release");
149 DBUG_ASSERT(ioptr);
150
151 if ((io= *ioptr))
152 {
153 /* mark as available for reuse in this transaction */
154 io->busy= FALSE;
155 *ioptr= NULL;
156
157 DBUG_PRINT("info", ("active: %d autocommit: %d",
158 io->active, io->is_autocommit()));
159
160 if (io->is_autocommit())
161 {
162 io->set_thd(NULL);
163 io->active= FALSE;
164 }
165 }
166
167 release_scan();
168
169 DBUG_VOID_RETURN;
170}
171
172
173void federatedx_txn::release_scan()
174{
175 uint count= 0, returned= 0;
176 federatedx_io *io, **pio;
177 DBUG_ENTER("federatedx_txn::release_scan");
178
179 /* return any inactive and idle connections to the server */
180 for (pio= &txn_list; (io= *pio); count++)
181 {
182 if (io->active || io->busy)
183 pio= &io->txn_next;
184 else
185 {
186 FEDERATEDX_SERVER *server= io->server;
187
188 /* unlink from list of connections bound to the transaction */
189 *pio= io->txn_next;
190 io->txn_next= NULL;
191
192 /* reset some values */
193 io->readonly= TRUE;
194
195 mysql_mutex_lock(&server->mutex);
196 io->idle_next= server->idle_list;
197 server->idle_list= io;
198 mysql_mutex_unlock(&server->mutex);
199 returned++;
200 }
201 }
202 DBUG_PRINT("info",("returned %u of %u connections(s)", returned, count));
203
204 DBUG_VOID_RETURN;
205}
206
207
208bool federatedx_txn::txn_begin()
209{
210 ulong level= 0;
211 DBUG_ENTER("federatedx_txn::txn_begin");
212
213 if (savepoint_next == 0)
214 {
215 savepoint_next++;
216 savepoint_level= savepoint_stmt= 0;
217 sp_acquire(&level);
218 }
219
220 DBUG_RETURN(level == 1);
221}
222
223
224int federatedx_txn::txn_commit()
225{
226 int error= 0;
227 federatedx_io *io;
228 DBUG_ENTER("federatedx_txn::txn_commit");
229
230 if (savepoint_next)
231 {
232 DBUG_ASSERT(savepoint_stmt != 1);
233
234 for (io= txn_list; io; io= io->txn_next)
235 {
236 int rc= 0;
237
238 if (io->active)
239 rc= io->commit();
240 else
241 io->rollback();
242
243 if (io->active && rc)
244 error= -1;
245
246 io->reset();
247 }
248
249 release_scan();
250
251 savepoint_next= savepoint_stmt= savepoint_level= 0;
252 }
253
254 DBUG_RETURN(error);
255}
256
257
258int federatedx_txn::txn_rollback()
259{
260 int error= 0;
261 federatedx_io *io;
262 DBUG_ENTER("federatedx_txn::txn_commit");
263
264 if (savepoint_next)
265 {
266 DBUG_ASSERT(savepoint_stmt != 1);
267
268 for (io= txn_list; io; io= io->txn_next)
269 {
270 int rc= io->rollback();
271
272 if (io->active && rc)
273 error= -1;
274
275 io->reset();
276 }
277
278 release_scan();
279
280 savepoint_next= savepoint_stmt= savepoint_level= 0;
281 }
282
283 DBUG_RETURN(error);
284}
285
286
287bool federatedx_txn::sp_acquire(ulong *sp)
288{
289 bool rc= FALSE;
290 federatedx_io *io;
291 DBUG_ENTER("federatedx_txn::sp_acquire");
292 DBUG_ASSERT(sp && savepoint_next);
293
294 *sp= savepoint_level= savepoint_next++;
295
296 for (io= txn_list; io; io= io->txn_next)
297 {
298 if (io->readonly)
299 continue;
300
301 io->savepoint_set(savepoint_level);
302 rc= TRUE;
303 }
304
305 DBUG_RETURN(rc);
306}
307
308
309int federatedx_txn::sp_rollback(ulong *sp)
310{
311 ulong level, new_level= savepoint_level;
312 federatedx_io *io;
313 DBUG_ENTER("federatedx_txn::sp_rollback");
314 DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level);
315
316 for (io= txn_list; io; io= io->txn_next)
317 {
318 if (io->readonly)
319 continue;
320
321 if ((level= io->savepoint_rollback(*sp)) < new_level)
322 new_level= level;
323 }
324
325 savepoint_level= new_level;
326
327 DBUG_RETURN(0);
328}
329
330
331int federatedx_txn::sp_release(ulong *sp)
332{
333 ulong level, new_level= savepoint_level;
334 federatedx_io *io;
335 DBUG_ENTER("federatedx_txn::sp_release");
336 DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level);
337
338 for (io= txn_list; io; io= io->txn_next)
339 {
340 if (io->readonly)
341 continue;
342
343 if ((level= io->savepoint_release(*sp)) < new_level)
344 new_level= level;
345 }
346
347 savepoint_level= new_level;
348 *sp= 0;
349
350 DBUG_RETURN(0);
351}
352
353
354bool federatedx_txn::stmt_begin()
355{
356 bool result= FALSE;
357 DBUG_ENTER("federatedx_txn::stmt_begin");
358
359 if (!savepoint_stmt)
360 {
361 if (!savepoint_next)
362 {
363 savepoint_next++;
364 savepoint_level= savepoint_stmt= 0;
365 }
366 result= sp_acquire(&savepoint_stmt);
367 }
368
369 DBUG_RETURN(result);
370}
371
372
373int federatedx_txn::stmt_commit()
374{
375 int result= 0;
376 DBUG_ENTER("federatedx_txn::stmt_commit");
377
378 if (savepoint_stmt == 1)
379 {
380 savepoint_stmt= 0;
381 result= txn_commit();
382 }
383 else
384 if (savepoint_stmt)
385 result= sp_release(&savepoint_stmt);
386
387 DBUG_RETURN(result);
388}
389
390
391int federatedx_txn::stmt_rollback()
392{
393 int result= 0;
394 DBUG_ENTER("federated:txn::stmt_rollback");
395
396 if (savepoint_stmt == 1)
397 {
398 savepoint_stmt= 0;
399 result= txn_rollback();
400 }
401 else
402 if (savepoint_stmt)
403 {
404 result= sp_rollback(&savepoint_stmt);
405 sp_release(&savepoint_stmt);
406 }
407
408 DBUG_RETURN(result);
409}
410
411
412void federatedx_txn::stmt_autocommit()
413{
414 federatedx_io *io;
415 DBUG_ENTER("federatedx_txn::stmt_autocommit");
416
417 for (io= txn_list; savepoint_stmt && io; io= io->txn_next)
418 {
419 if (io->readonly)
420 continue;
421
422 io->savepoint_restrict(savepoint_stmt);
423 }
424
425 DBUG_VOID_RETURN;
426}
427
428
429