1/*
2Copyright (c) 2007, Antony T Curtis
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are
7met:
8
9 * Redistributions of source code must retain the above copyright
10notice, this list of conditions and the following disclaimer.
11
12 * Neither the name of FederatedX nor the names of its
13contributors may be used to endorse or promote products derived from
14this software without specific prior written permission.
15
16THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27*/
28
29
30#define MYSQL_SERVER 1
31#include <my_global.h>
32#include "sql_priv.h"
33#include <mysqld_error.h>
34
35#include "ha_federatedx.h"
36
37#include "m_string.h"
38#include "mysqld_error.h"
39#include "sql_servers.h"
40
41#ifdef USE_PRAGMA_IMPLEMENTATION
42#pragma implementation // gcc: Class implementation
43#endif
44
45
/*
  Bit flags stored in SAVEPT::flags.

  SAVEPOINT_REALIZED  set by query() once the savepoint has been processed
                      ahead of a statement
  SAVEPOINT_RESTRICT  set by savepoint_restrict()
  SAVEPOINT_EMITTED   set by query() after "SAVEPOINT save<level>" was
                      sent successfully
*/
#define SAVEPOINT_REALIZED (1 << 0)
#define SAVEPOINT_RESTRICT (1 << 1)
#define SAVEPOINT_EMITTED  (1 << 2)
49
50
51typedef struct federatedx_savepoint
52{
53 ulong level;
54 uint flags;
55} SAVEPT;
56
57struct mysql_position
58{
59 MYSQL_RES* result;
60 MYSQL_ROW_OFFSET offset;
61};
62
63
64class federatedx_io_mysql :public federatedx_io
65{
66 MYSQL mysql; /* MySQL connection */
67 MYSQL_ROWS *current;
68 DYNAMIC_ARRAY savepoints;
69 bool requested_autocommit;
70 bool actual_autocommit;
71
72 int actual_query(const char *buffer, size_t length);
73 bool test_all_restrict() const;
74public:
75 federatedx_io_mysql(FEDERATEDX_SERVER *);
76 ~federatedx_io_mysql();
77
78 int simple_query(const char *fmt, ...);
79 int query(const char *buffer, size_t length);
80 virtual FEDERATEDX_IO_RESULT *store_result();
81
82 virtual size_t max_query_size() const;
83
84 virtual my_ulonglong affected_rows() const;
85 virtual my_ulonglong last_insert_id() const;
86
87 virtual int error_code();
88 virtual const char *error_str();
89
90 void reset();
91 int commit();
92 int rollback();
93
94 int savepoint_set(ulong sp);
95 ulong savepoint_release(ulong sp);
96 ulong savepoint_rollback(ulong sp);
97 void savepoint_restrict(ulong sp);
98
99 ulong last_savepoint() const;
100 ulong actual_savepoint() const;
101 bool is_autocommit() const;
102
103 bool table_metadata(ha_statistics *stats, const char *table_name,
104 uint table_name_length, uint flag);
105
106 /* resultset operations */
107
108 virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
109 virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
110 virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
111 virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result);
112 virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
113 virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
114 unsigned int column);
115 virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
116 unsigned int column) const;
117
118 virtual size_t get_ref_length() const;
119 virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
120 void *ref);
121 virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
122 const void *ref);
123 virtual void set_thd(void *thd);
124};
125
126
127federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
128 FEDERATEDX_SERVER *server)
129{
130 return new (server_root) federatedx_io_mysql(server);
131}
132
133
134federatedx_io_mysql::federatedx_io_mysql(FEDERATEDX_SERVER *aserver)
135 : federatedx_io(aserver),
136 requested_autocommit(TRUE), actual_autocommit(TRUE)
137{
138 DBUG_ENTER("federatedx_io_mysql::federatedx_io_mysql");
139
140 bzero(&mysql, sizeof(MYSQL));
141 bzero(&savepoints, sizeof(DYNAMIC_ARRAY));
142
143 my_init_dynamic_array(&savepoints, sizeof(SAVEPT), 16, 16, MYF(0));
144
145 DBUG_VOID_RETURN;
146}
147
148
149federatedx_io_mysql::~federatedx_io_mysql()
150{
151 DBUG_ENTER("federatedx_io_mysql::~federatedx_io_mysql");
152
153 mysql_close(&mysql);
154 delete_dynamic(&savepoints);
155
156 DBUG_VOID_RETURN;
157}
158
159
160void federatedx_io_mysql::reset()
161{
162 reset_dynamic(&savepoints);
163 set_active(FALSE);
164
165 requested_autocommit= TRUE;
166 mysql.reconnect= 1;
167}
168
169
170int federatedx_io_mysql::commit()
171{
172 int error= 0;
173 DBUG_ENTER("federatedx_io_mysql::commit");
174
175 if (!actual_autocommit && (error= actual_query("COMMIT", 6)))
176 rollback();
177
178 reset();
179
180 DBUG_RETURN(error);
181}
182
183int federatedx_io_mysql::rollback()
184{
185 int error= 0;
186 DBUG_ENTER("federatedx_io_mysql::rollback");
187
188 if (!actual_autocommit)
189 error= actual_query("ROLLBACK", 8);
190 else
191 error= ER_WARNING_NOT_COMPLETE_ROLLBACK;
192
193 reset();
194
195 DBUG_RETURN(error);
196}
197
198
199ulong federatedx_io_mysql::last_savepoint() const
200{
201 SAVEPT *savept= NULL;
202 DBUG_ENTER("federatedx_io_mysql::last_savepoint");
203
204 if (savepoints.elements)
205 savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);
206
207 DBUG_RETURN(savept ? savept->level : 0);
208}
209
210
211ulong federatedx_io_mysql::actual_savepoint() const
212{
213 SAVEPT *savept= NULL;
214 uint index= savepoints.elements;
215 DBUG_ENTER("federatedx_io_mysql::last_savepoint");
216
217 while (index)
218 {
219 savept= dynamic_element(&savepoints, --index, SAVEPT *);
220 if (savept->flags & SAVEPOINT_REALIZED)
221 break;
222 savept= NULL;
223 }
224
225 DBUG_RETURN(savept ? savept->level : 0);
226}
227
228bool federatedx_io_mysql::is_autocommit() const
229{
230 return actual_autocommit;
231}
232
233
234int federatedx_io_mysql::savepoint_set(ulong sp)
235{
236 int error;
237 SAVEPT savept;
238 DBUG_ENTER("federatedx_io_mysql::savepoint_set");
239 DBUG_PRINT("info",("savepoint=%lu", sp));
240 DBUG_ASSERT(sp > last_savepoint());
241
242 savept.level= sp;
243 savept.flags= 0;
244
245 if ((error= insert_dynamic(&savepoints, (uchar*) &savept) ? -1 : 0))
246 goto err;
247
248 set_active(TRUE);
249 mysql.reconnect= 0;
250 requested_autocommit= FALSE;
251
252err:
253 DBUG_RETURN(error);
254}
255
256
257ulong federatedx_io_mysql::savepoint_release(ulong sp)
258{
259 SAVEPT *savept, *last= NULL;
260 DBUG_ENTER("federatedx_io_mysql::savepoint_release");
261 DBUG_PRINT("info",("savepoint=%lu", sp));
262
263 while (savepoints.elements)
264 {
265 savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);
266 if (savept->level < sp)
267 break;
268 if ((savept->flags & (SAVEPOINT_REALIZED | SAVEPOINT_RESTRICT)) == SAVEPOINT_REALIZED)
269 last= savept;
270 savepoints.elements--;
271 }
272
273 if (last)
274 {
275 char buffer[STRING_BUFFER_USUAL_SIZE];
276 size_t length= my_snprintf(buffer, sizeof(buffer),
277 "RELEASE SAVEPOINT save%lu", last->level);
278 actual_query(buffer, length);
279 }
280
281 DBUG_RETURN(last_savepoint());
282}
283
284
285ulong federatedx_io_mysql::savepoint_rollback(ulong sp)
286{
287 SAVEPT *savept;
288 uint index;
289 DBUG_ENTER("federatedx_io_mysql::savepoint_release");
290 DBUG_PRINT("info",("savepoint=%lu", sp));
291
292 while (savepoints.elements)
293 {
294 savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);
295 if (savept->level <= sp)
296 break;
297 savepoints.elements--;
298 }
299
300 for (index= savepoints.elements, savept= NULL; index;)
301 {
302 savept= dynamic_element(&savepoints, --index, SAVEPT *);
303 if (savept->flags & SAVEPOINT_REALIZED)
304 break;
305 savept= NULL;
306 }
307
308 if (savept && !(savept->flags & SAVEPOINT_RESTRICT))
309 {
310 char buffer[STRING_BUFFER_USUAL_SIZE];
311 size_t length= my_snprintf(buffer, sizeof(buffer),
312 "ROLLBACK TO SAVEPOINT save%lu", savept->level);
313 actual_query(buffer, length);
314 }
315
316 DBUG_RETURN(last_savepoint());
317}
318
319
320void federatedx_io_mysql::savepoint_restrict(ulong sp)
321{
322 SAVEPT *savept;
323 uint index= savepoints.elements;
324 DBUG_ENTER("federatedx_io_mysql::savepoint_restrict");
325
326 while (index)
327 {
328 savept= dynamic_element(&savepoints, --index, SAVEPT *);
329 if (savept->level > sp)
330 continue;
331 if (savept->level < sp)
332 break;
333 savept->flags|= SAVEPOINT_RESTRICT;
334 break;
335 }
336
337 DBUG_VOID_RETURN;
338}
339
340
341int federatedx_io_mysql::simple_query(const char *fmt, ...)
342{
343 char buffer[STRING_BUFFER_USUAL_SIZE];
344 size_t length;
345 int error;
346 va_list arg;
347 DBUG_ENTER("federatedx_io_mysql::simple_query");
348
349 va_start(arg, fmt);
350 length= my_vsnprintf(buffer, sizeof(buffer), fmt, arg);
351 va_end(arg);
352
353 error= query(buffer, length);
354
355 DBUG_RETURN(error);
356}
357
358
359bool federatedx_io_mysql::test_all_restrict() const
360{
361 bool result= FALSE;
362 SAVEPT *savept;
363 uint index= savepoints.elements;
364 DBUG_ENTER("federatedx_io_mysql::test_all_restrict");
365
366 while (index)
367 {
368 savept= dynamic_element(&savepoints, --index, SAVEPT *);
369 if ((savept->flags & (SAVEPOINT_REALIZED |
370 SAVEPOINT_RESTRICT)) == SAVEPOINT_REALIZED ||
371 (savept->flags & SAVEPOINT_EMITTED))
372 DBUG_RETURN(FALSE);
373 if (savept->flags & SAVEPOINT_RESTRICT)
374 result= TRUE;
375 }
376
377 DBUG_RETURN(result);
378}
379
380
381int federatedx_io_mysql::query(const char *buffer, size_t length)
382{
383 int error;
384 bool wants_autocommit= requested_autocommit | is_readonly();
385 DBUG_ENTER("federatedx_io_mysql::query");
386
387 if (!wants_autocommit && test_all_restrict())
388 wants_autocommit= TRUE;
389
390 if (wants_autocommit != actual_autocommit)
391 {
392 if ((error= actual_query(wants_autocommit ? "SET AUTOCOMMIT=1"
393 : "SET AUTOCOMMIT=0", 16)))
394 DBUG_RETURN(error);
395 mysql.reconnect= wants_autocommit ? 1 : 0;
396 actual_autocommit= wants_autocommit;
397 }
398
399 if (!actual_autocommit && last_savepoint() != actual_savepoint())
400 {
401 SAVEPT *savept= dynamic_element(&savepoints, savepoints.elements - 1,
402 SAVEPT *);
403 if (!(savept->flags & SAVEPOINT_RESTRICT))
404 {
405 char buf[STRING_BUFFER_USUAL_SIZE];
406 size_t len= my_snprintf(buf, sizeof(buf),
407 "SAVEPOINT save%lu", savept->level);
408 if ((error= actual_query(buf, len)))
409 DBUG_RETURN(error);
410 set_active(TRUE);
411 savept->flags|= SAVEPOINT_EMITTED;
412 }
413 savept->flags|= SAVEPOINT_REALIZED;
414 }
415
416 if (!(error= actual_query(buffer, length)))
417 set_active(is_active() || !actual_autocommit);
418
419 DBUG_RETURN(error);
420}
421
422
423int federatedx_io_mysql::actual_query(const char *buffer, size_t length)
424{
425 int error;
426 DBUG_ENTER("federatedx_io_mysql::actual_query");
427
428 if (!mysql.net.vio)
429 {
430 my_bool my_true= 1;
431
432 if (!(mysql_init(&mysql)))
433 DBUG_RETURN(-1);
434
435 /*
436 BUG# 17044 Federated Storage Engine is not UTF8 clean
437 Add set names to whatever charset the table is at open
438 of table
439 */
440 /* this sets the csname like 'set names utf8' */
441 mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, get_charsetname());
442 mysql_options(&mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY,
443 (char*) &my_true);
444
445 if (!mysql_real_connect(&mysql,
446 get_hostname(),
447 get_username(),
448 get_password(),
449 get_database(),
450 get_port(),
451 get_socket(), 0))
452 DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE);
453 mysql.reconnect= 1;
454 }
455
456 if (!(error= mysql_real_query(&mysql, STRING_WITH_LEN("set time_zone='+00:00'"))))
457 error= mysql_real_query(&mysql, buffer, (ulong)length);
458
459 DBUG_RETURN(error);
460}
461
462size_t federatedx_io_mysql::max_query_size() const
463{
464 return mysql.net.max_packet_size;
465}
466
467
468my_ulonglong federatedx_io_mysql::affected_rows() const
469{
470 return mysql.affected_rows;
471}
472
473
474my_ulonglong federatedx_io_mysql::last_insert_id() const
475{
476 return mysql.insert_id;
477}
478
479
480int federatedx_io_mysql::error_code()
481{
482 return mysql_errno(&mysql);
483}
484
485
486const char *federatedx_io_mysql::error_str()
487{
488 return mysql_error(&mysql);
489}
490
491FEDERATEDX_IO_RESULT *federatedx_io_mysql::store_result()
492{
493 FEDERATEDX_IO_RESULT *result;
494 DBUG_ENTER("federatedx_io_mysql::store_result");
495
496 result= (FEDERATEDX_IO_RESULT *) mysql_store_result(&mysql);
497
498 DBUG_RETURN(result);
499}
500
501
502void federatedx_io_mysql::free_result(FEDERATEDX_IO_RESULT *io_result)
503{
504 mysql_free_result((MYSQL_RES *) io_result);
505}
506
507
508unsigned int federatedx_io_mysql::get_num_fields(FEDERATEDX_IO_RESULT *io_result)
509{
510 return mysql_num_fields((MYSQL_RES *) io_result);
511}
512
513
514my_ulonglong federatedx_io_mysql::get_num_rows(FEDERATEDX_IO_RESULT *io_result)
515{
516 return mysql_num_rows((MYSQL_RES *) io_result);
517}
518
519
520FEDERATEDX_IO_ROW *federatedx_io_mysql::fetch_row(FEDERATEDX_IO_RESULT *io_result)
521{
522 MYSQL_RES *result= (MYSQL_RES*)io_result;
523 current= result->data_cursor;
524 return (FEDERATEDX_IO_ROW *) mysql_fetch_row(result);
525}
526
527
528ulong *federatedx_io_mysql::fetch_lengths(FEDERATEDX_IO_RESULT *io_result)
529{
530 return mysql_fetch_lengths((MYSQL_RES *) io_result);
531}
532
533
534const char *federatedx_io_mysql::get_column_data(FEDERATEDX_IO_ROW *row,
535 unsigned int column)
536{
537 return ((MYSQL_ROW)row)[column];
538}
539
540
541bool federatedx_io_mysql::is_column_null(const FEDERATEDX_IO_ROW *row,
542 unsigned int column) const
543{
544 return !((MYSQL_ROW)row)[column];
545}
546
547bool federatedx_io_mysql::table_metadata(ha_statistics *stats,
548 const char *table_name,
549 uint table_name_length, uint flag)
550{
551 char status_buf[FEDERATEDX_QUERY_BUFFER_SIZE];
552 FEDERATEDX_IO_RESULT *result= 0;
553 FEDERATEDX_IO_ROW *row;
554 String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin);
555 int error;
556
557 status_query_string.length(0);
558 status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE "));
559 append_ident(&status_query_string, table_name,
560 table_name_length, value_quote_char);
561
562 if (query(status_query_string.ptr(), status_query_string.length()))
563 goto error;
564
565 status_query_string.length(0);
566
567 result= store_result();
568
569 /*
570 We're going to use fields num. 4, 12 and 13 of the resultset,
571 so make sure we have these fields.
572 */
573 if (!result || (get_num_fields(result) < 14))
574 goto error;
575
576 if (!get_num_rows(result))
577 goto error;
578
579 if (!(row= fetch_row(result)))
580 goto error;
581
582 /*
583 deleted is set in ha_federatedx::info
584 */
585 /*
586 need to figure out what this means as far as federatedx is concerned,
587 since we don't have a "file"
588
589 data_file_length = ?
590 index_file_length = ?
591 delete_length = ?
592 */
593 if (!is_column_null(row, 4))
594 stats->records= (ha_rows) my_strtoll10(get_column_data(row, 4),
595 (char**) 0, &error);
596 if (!is_column_null(row, 5))
597 stats->mean_rec_length= (ulong) my_strtoll10(get_column_data(row, 5),
598 (char**) 0, &error);
599
600 stats->data_file_length= stats->records * stats->mean_rec_length;
601
602 if (!is_column_null(row, 12))
603 stats->update_time= (time_t) my_strtoll10(get_column_data(row, 12),
604 (char**) 0, &error);
605 if (!is_column_null(row, 13))
606 stats->check_time= (time_t) my_strtoll10(get_column_data(row, 13),
607 (char**) 0, &error);
608
609 free_result(result);
610 return 0;
611
612error:
613 if (!mysql_errno(&mysql))
614 {
615 mysql.net.last_errno= ER_NO_SUCH_TABLE;
616 strmake_buf(mysql.net.last_error, "Remote table does not exist");
617 }
618 free_result(result);
619 return 1;
620}
621
622
623
624size_t federatedx_io_mysql::get_ref_length() const
625{
626 return sizeof(mysql_position);
627}
628
629
630void federatedx_io_mysql::mark_position(FEDERATEDX_IO_RESULT *io_result,
631 void *ref)
632{
633 mysql_position& pos= *reinterpret_cast<mysql_position*>(ref);
634 pos.result= (MYSQL_RES *) io_result;
635 pos.offset= current;
636}
637
638int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result,
639 const void *ref)
640{
641 const mysql_position& pos= *reinterpret_cast<const mysql_position*>(ref);
642
643 if (!pos.result || !pos.offset)
644 return HA_ERR_END_OF_FILE;
645
646 pos.result->current_row= 0;
647 pos.result->data_cursor= pos.offset;
648 *io_result= (FEDERATEDX_IO_RESULT*) pos.result;
649
650 return 0;
651}
652
653void federatedx_io_mysql::set_thd(void *thd)
654{
655 mysql.net.thd= thd;
656}
657