1/* Copyright (c) 2004, 2015, Oracle and/or its affiliates.
2 Copyright (c) 2017, MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17/*
18
19 MySQL Federated Storage Engine
20
21 ha_federated.cc - MySQL Federated Storage Engine
22 Patrick Galbraith and Brian Aker, 2004
23
24 This is a handler which uses a foreign database as the data file, as
25 opposed to a handler like MyISAM, which uses .MYD files locally.
26
27 How this handler works
28 ----------------------------------
29 Normal database files are local and as such: You create a table called
30 'users', a file such as 'users.MYD' is created. A handler reads, inserts,
31 deletes, updates data in this file. The data is stored in particular format,
32 so to read, that data has to be parsed into fields, to write, fields have to
33 be stored in this format to write to this data file.
34
35 With MySQL Federated storage engine, there will be no local files
36 for each table's data (such as .MYD). A foreign database will store
37 the data that would normally be in this file. This will necessitate
38 the use of MySQL client API to read, delete, update, insert this
39 data. The data will have to be retrieve via an SQL call "SELECT *
40 FROM users". Then, to read this data, it will have to be retrieved
41 via mysql_fetch_row one row at a time, then converted from the
42 column in this select into the format that the handler expects.
43
44 The create table will simply create the .frm file, and within the
45 "CREATE TABLE" SQL, there SHALL be any of the following :
46
47 connection=scheme://username:password@hostname:port/database/tablename
48 connection=scheme://username@hostname/database/tablename
49 connection=scheme://username:password@hostname/database/tablename
  connection=scheme://username@hostname:port/database/tablename
51
52 - OR -
53
54 As of 5.1 (See worklog #3031), federated now allows you to use a non-url
55 format, taking advantage of mysql.servers:
56
57 connection="connection_one"
58 connection="connection_one/table_foo"
59
60 An example would be:
61
62 connection=mysql://username:password@hostname:port/database/tablename
63
64 or, if we had:
65
66 create server 'server_one' foreign data wrapper 'mysql' options
67 (HOST '127.0.0.1',
68 DATABASE 'db1',
69 USER 'root',
70 PASSWORD '',
71 PORT 3306,
72 SOCKET '',
73 OWNER 'root');
74
75 CREATE TABLE federated.t1 (
76 `id` int(20) NOT NULL,
77 `name` varchar(64) NOT NULL default ''
78 )
79 ENGINE="FEDERATED" DEFAULT CHARSET=latin1
80 CONNECTION='server_one';
81
82 So, this will have been the equivalent of
83
84 CONNECTION="mysql://root@127.0.0.1:3306/db1/t1"
85
86 Then, we can also change the server to point to a new schema:
87
88 ALTER SERVER 'server_one' options(DATABASE 'db2');
89
90 All subsequent calls will now be against db2.t1! Guess what? You don't
91 have to perform an alter table!
92
  This connection="connection string" is necessary for the handler to be
  able to connect to the foreign server, either by URL, or by server
  name.
96
97
98 The basic flow is this:
99
100 SQL calls issues locally ->
101 mysql handler API (data in handler format) ->
102 mysql client API (data converted to SQL calls) ->
103 foreign database -> mysql client API ->
104 convert result sets (if any) to handler format ->
105 handler API -> results or rows affected to local
106
107 What this handler does and doesn't support
108 ------------------------------------------
  * Tables MUST be created on the foreign server prior to any action on those
    tables via the handler, first version. IMPORTANT: IF you MUST use the
    federated storage engine type on the REMOTE end, MAKE SURE [ :) ] That
    the table you connect to IS NOT a table pointing BACK to your ORIGINAL
    table! You know and have heard the screeching of audio feedback? You
    know how, when putting two mirrors in front of each other, the reflection
    continues for eternity? Well, need I say more?!
116 * There will not be support for transactions.
117 * There is no way for the handler to know if the foreign database or table
118 has changed. The reason for this is that this database has to work like a
119 data file that would never be written to by anything other than the
120 database. The integrity of the data in the local table could be breached
121 if there was any change to the foreign database.
122 * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
123 * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
  * Prepared statements will not be used in the first implementation, it
    remains to be seen whether the limited subset of the client API for the
    server supports this.
127 * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
128 implementation.
129 * This will not work with the query cache.
130
131 Method calls
132
133 A two column table, with one record:
134
135 (SELECT)
136
137 "SELECT * FROM foo"
138 ha_federated::info
139 ha_federated::scan_time:
140 ha_federated::rnd_init: share->select_query SELECT * FROM foo
141 ha_federated::extra
142
143 <for every row of data retrieved>
144 ha_federated::rnd_next
145 ha_federated::convert_row_to_internal_format
146 ha_federated::rnd_next
147 </for every row of data retrieved>
148
149 ha_federated::rnd_end
150 ha_federated::extra
151 ha_federated::reset
152
153 (INSERT)
154
155 "INSERT INTO foo (id, ts) VALUES (2, now());"
156
157 ha_federated::write_row
158
159 ha_federated::reset
160
161 (UPDATE)
162
163 "UPDATE foo SET ts = now() WHERE id = 1;"
164
165 ha_federated::index_init
166 ha_federated::index_read
167 ha_federated::index_read_idx
168 ha_federated::rnd_next
169 ha_federated::convert_row_to_internal_format
170 ha_federated::update_row
171
172 ha_federated::extra
173 ha_federated::extra
174 ha_federated::extra
175 ha_federated::external_lock
176 ha_federated::reset
177
178
179 How do I use this handler?
180 --------------------------
181 First of all, you need to build this storage engine:
182
183 ./configure --with-federated-storage-engine
184 make
185
186 Next, to use this handler, it's very simple. You must
187 have two databases running, either both on the same host, or
188 on different hosts.
189
  On the server that will be connecting to the foreign
  host (client), you create your table as such:
192
193 CREATE TABLE test_table (
194 id int(20) NOT NULL auto_increment,
195 name varchar(32) NOT NULL default '',
196 other int(20) NOT NULL default '0',
197 PRIMARY KEY (id),
198 KEY name (name),
199 KEY other_key (other))
200 ENGINE="FEDERATED"
201 DEFAULT CHARSET=latin1
202 CONNECTION='mysql://root@127.0.0.1:9306/federated/test_federated';
203
  Notice the "ENGINE" and "CONNECTION" fields? This is where you
  respectively set the engine type, "FEDERATED" and foreign
  host information, this being the database your 'client' database
  will connect to and use as the "data file". Obviously, the foreign
  database is running on port 9306, so you want to start up your other
  database so that it is indeed on port 9306, and your federated
  database on a port other than that. In my setup, I use port 5554
  for federated, and port 5555 for the foreign database.
212
213 Then, on the foreign database:
214
215 CREATE TABLE test_table (
216 id int(20) NOT NULL auto_increment,
217 name varchar(32) NOT NULL default '',
218 other int(20) NOT NULL default '0',
219 PRIMARY KEY (id),
220 KEY name (name),
221 KEY other_key (other))
222 ENGINE="<NAME>" <-- whatever you want, or not specify
223 DEFAULT CHARSET=latin1 ;
224
225 This table is exactly the same (and must be exactly the same),
226 except that it is not using the federated handler and does
227 not need the URL.
228
229
230 How to see the handler in action
231 --------------------------------
232
233 When developing this handler, I compiled the federated database with
234 debugging:
235
236 ./configure --with-federated-storage-engine
237 --prefix=/home/mysql/mysql-build/federated/ --with-debug
238
  Once compiled, I did a 'make install' (not for the purpose of installing
  the binary, but to install all the files the binary expects to see in the
  directory I specified in the build with --prefix,
  "/home/mysql/mysql-build/federated").
243
244 Then, I started the foreign server:
245
246 /usr/local/mysql/bin/mysqld_safe
247 --user=mysql --log=/tmp/mysqld.5555.log -P 5555
248
249 Then, I went back to the directory containing the newly compiled mysqld,
250 <builddir>/sql/, started up gdb:
251
252 gdb ./mysqld
253
  Then, within the (gdb) prompt:
255 (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug-dbug
256
257 Next, I open several windows for each:
258
259 1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
260 2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
261 3. A window with a client open to the federated server on port 5554
  4. A window with a client open to the foreign server on port 5555
263
264 I would create a table on the client to the foreign server on port
265 5555, and then to the federated server on port 5554. At this point,
266 I would run whatever queries I wanted to on the federated server,
267 just always remembering that whatever changes I wanted to make on
268 the table, or if I created new tables, that I would have to do that
269 on the foreign server.
270
271 Another thing to look for is 'show variables' to show you that you have
272 support for federated handler support:
273
274 show variables like '%federat%'
275
276 and:
277
278 show storage engines;
279
280 Both should display the federated storage handler.
281
282
283 Testing
284 -------
285
  There is a test for MySQL Federated Storage Handler in ./mysql-test/t,
  federated.test. It starts both a slave and master database using
  the same setup that the replication tests use, with the exception that
  it turns off replication, and sets replication to ignore the test tables.
  After ensuring that you actually do have support for the federated storage
  handler, numerous queries/inserts/updates/deletes are run, many derived
  from the MyISAM tests, plus some other tests which were meant to reveal
  any issues that would be most likely to affect this handler. All tests
  should work! ;)
295
296 To run these tests, go into ./mysql-test (based in the directory you
297 built the server in)
298
299 ./mysql-test-run federated
300
301 To run the test, or if you want to run the test and have debug info:
302
303 ./mysql-test-run --debug federated
304
305 This will run the test in debug mode, and you can view the trace and
306 log files in the ./mysql-test/var/log directory
307
308 ls -l mysql-test/var/log/
309 -rw-r--r-- 1 patg patg 17 4 Dec 12:27 current_test
310 -rw-r--r-- 1 patg patg 692 4 Dec 12:52 manager.log
311 -rw-rw---- 1 patg patg 21246 4 Dec 12:51 master-bin.000001
312 -rw-rw---- 1 patg patg 68 4 Dec 12:28 master-bin.index
313 -rw-r--r-- 1 patg patg 1620 4 Dec 12:51 master.err
314 -rw-rw---- 1 patg patg 23179 4 Dec 12:51 master.log
315 -rw-rw---- 1 patg patg 16696550 4 Dec 12:51 master.trace
316 -rw-r--r-- 1 patg patg 0 4 Dec 12:28 mysqltest-time
317 -rw-r--r-- 1 patg patg 2024051 4 Dec 12:51 mysqltest.trace
318 -rw-rw---- 1 patg patg 94992 4 Dec 12:51 slave-bin.000001
319 -rw-rw---- 1 patg patg 67 4 Dec 12:28 slave-bin.index
320 -rw-rw---- 1 patg patg 249 4 Dec 12:52 slave-relay-bin.000003
321 -rw-rw---- 1 patg patg 73 4 Dec 12:28 slave-relay-bin.index
322 -rw-r--r-- 1 patg patg 1349 4 Dec 12:51 slave.err
323 -rw-rw---- 1 patg patg 96206 4 Dec 12:52 slave.log
324 -rw-rw---- 1 patg patg 15706355 4 Dec 12:51 slave.trace
325 -rw-r--r-- 1 patg patg 0 4 Dec 12:51 warnings
326
327 Of course, again, you can tail the trace log:
328
329 tail -f mysql-test/var/log/master.trace |grep ha_fed
330
331 As well as the slave query log:
332
333 tail -f mysql-test/var/log/slave.log
334
  Files that comprise the test suite
  ----------------------------------
337 mysql-test/t/federated.test
338 mysql-test/r/federated.result
339 mysql-test/r/have_federated_db.require
340 mysql-test/include/have_federated_db.inc
341
342
343 Other tidbits
344 -------------
345
346 These were the files that were modified or created for this
347 Federated handler to work, in 5.0:
348
349 ./configure.in
350 ./sql/Makefile.am
351 ./config/ac_macros/ha_federated.m4
352 ./sql/handler.cc
353 ./sql/mysqld.cc
354 ./sql/set_var.cc
355 ./sql/field.h
356 ./sql/sql_string.h
357 ./mysql-test/mysql-test-run(.sh)
358 ./mysql-test/t/federated.test
359 ./mysql-test/r/federated.result
360 ./mysql-test/r/have_federated_db.require
361 ./mysql-test/include/have_federated_db.inc
362 ./sql/ha_federated.cc
363 ./sql/ha_federated.h
364
365 In 5.1
366
367 my:~/mysql-build/mysql-5.1-bkbits patg$ ls storage/federated/
368 CMakeLists.txt Makefile.in ha_federated.h plug.in
369 Makefile SCCS libfederated.a
370 Makefile.am ha_federated.cc libfederated_a-ha_federated.o
371
372*/
373
374
375#define MYSQL_SERVER 1
376#include <my_global.h>
377#include "sql_priv.h"
378#include "sql_servers.h" // FOREIGN_SERVER, get_server_by_name
379#include "sql_class.h" // SSV
380#include "sql_analyse.h" // append_escaped
381#include <mysql/plugin.h>
382
383#ifdef USE_PRAGMA_IMPLEMENTATION
384#pragma implementation // gcc: Class implementation
385#endif
386
387#include "ha_federated.h"
388
389#include "m_string.h"
390#include "key.h" // key_copy
391
392#include <mysql/plugin.h>
393
/*
  Exclusive lower bound for a port number taken from a foreign server
  definition (used by the range check in get_connection()).  Building with
  I_AM_PARANOID defined raises the bound from 0 to 1023.
*/
#ifdef I_AM_PARANOID
#define MIN_PORT 1023
#else
#define MIN_PORT 0
#endif

/*
  Variables for federated share methods.

  federated_open_tables and federated_mutex are set up in
  federated_db_init() and torn down in federated_done().
*/
static HASH federated_open_tables;              // To track open tables
mysql_mutex_t federated_mutex;                  // To init the hash
                                                // NOTE(review): presumably
                                                // also guards lookups in
                                                // federated_open_tables --
                                                // confirm against get_share
static char ident_quote_char= '`';              // Character for quoting
                                                // identifiers
static char value_quote_char= '\'';             // Character for quoting
                                                // literals
static const int bulk_padding= 64;              // bytes "overhead" in packet

/*
  Variables used when chopping off trailing characters.
  Each is the length of the literal without its terminating NUL.
*/
static const uint sizeof_trailing_comma= sizeof(", ") - 1;
static const uint sizeof_trailing_and= sizeof(" AND ") - 1;
static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;

/* Static declarations for the handlerton callbacks */
static handler *federated_create_handler(handlerton *hton,
                                         TABLE_SHARE *table,
                                         MEM_ROOT *mem_root);
static int federated_commit(handlerton *hton, THD *thd, bool all);
static int federated_rollback(handlerton *hton, THD *thd, bool all);
420
421/* Federated storage engine handlerton */
422
423static handler *federated_create_handler(handlerton *hton,
424 TABLE_SHARE *table,
425 MEM_ROOT *mem_root)
426{
427 return new (mem_root) ha_federated(hton, table);
428}
429
430
431/* Function we use in the creation of our hash to get key */
432
433static uchar *federated_get_key(FEDERATED_SHARE *share, size_t *length,
434 my_bool not_used __attribute__ ((unused)))
435{
436 *length= share->share_key_length;
437 return (uchar*) share->share_key;
438}
439
#ifdef HAVE_PSI_INTERFACE
/* Performance schema instrumentation keys for the engine's mutexes */
static PSI_mutex_key fe_key_mutex_federated, fe_key_mutex_FEDERATED_SHARE_mutex;

static PSI_mutex_info all_federated_mutexes[]=
{
  { &fe_key_mutex_federated, "federated", PSI_FLAG_GLOBAL},
  { &fe_key_mutex_FEDERATED_SHARE_mutex, "FEDERATED_SHARE::mutex", 0}
};

/*
  Register every entry of all_federated_mutexes under the "federated"
  category.  Does nothing when no PSI server is available.
*/
static void init_federated_psi_keys(void)
{
  if (PSI_server != NULL)
  {
    int mutex_count= array_elements(all_federated_mutexes);
    PSI_server->register_mutex("federated", all_federated_mutexes,
                               mutex_count);
  }
}
#endif /* HAVE_PSI_INTERFACE */
461
462/*
463 Initialize the federated handler.
464
465 SYNOPSIS
466 federated_db_init()
467 p Handlerton
468
469 RETURN
470 FALSE OK
471 TRUE Error
472*/
473
474int federated_db_init(void *p)
475{
476 DBUG_ENTER("federated_db_init");
477
478#ifdef HAVE_PSI_INTERFACE
479 init_federated_psi_keys();
480#endif /* HAVE_PSI_INTERFACE */
481
482 handlerton *federated_hton= (handlerton *)p;
483 federated_hton->state= SHOW_OPTION_YES;
484 federated_hton->db_type= DB_TYPE_FEDERATED_DB;
485 federated_hton->commit= federated_commit;
486 federated_hton->rollback= federated_rollback;
487 federated_hton->create= federated_create_handler;
488 federated_hton->flags= HTON_ALTER_NOT_SUPPORTED | HTON_NO_PARTITION;
489
490 /*
491 Support for transactions disabled until WL#2952 fixes it.
492 We do it like this to avoid "defined but not used" compiler warnings.
493 */
494 federated_hton->commit= 0;
495 federated_hton->rollback= 0;
496
497 if (mysql_mutex_init(fe_key_mutex_federated,
498 &federated_mutex, MY_MUTEX_INIT_FAST))
499 goto error;
500 if (!my_hash_init(&federated_open_tables, &my_charset_bin, 32, 0, 0,
501 (my_hash_get_key) federated_get_key, 0, 0))
502 {
503 DBUG_RETURN(FALSE);
504 }
505
506 mysql_mutex_destroy(&federated_mutex);
507error:
508 DBUG_RETURN(TRUE);
509}
510
511
512/*
513 Release the federated handler.
514
515 SYNOPSIS
516 federated_db_end()
517
518 RETURN
519 FALSE OK
520*/
521
522int federated_done(void *p)
523{
524 my_hash_free(&federated_open_tables);
525 mysql_mutex_destroy(&federated_mutex);
526
527 return 0;
528}
529
530
531/**
532 @brief Append identifiers to the string.
533
534 @param[in,out] string The target string.
535 @param[in] name Identifier name
536 @param[in] length Length of identifier name in bytes
537 @param[in] quote_char Quote char to use for quoting identifier.
538
539 @return Operation Status
540 @retval FALSE OK
541 @retval TRUE There was an error appending to the string.
542
543 @note This function is based upon the append_identifier() function
544 in sql_show.cc except that quoting always occurs.
545*/
546
547static bool append_ident(String *string, const char *name, size_t length,
548 const char quote_char)
549{
550 bool result;
551 uint clen;
552 const char *name_end;
553 DBUG_ENTER("append_ident");
554
555 if (quote_char)
556 {
557 string->reserve((uint) length * 2 + 2);
558 if ((result= string->append(&quote_char, 1, system_charset_info)))
559 goto err;
560
561 for (name_end= name+length; name < name_end; name+= clen)
562 {
563 uchar c= *(uchar *) name;
564 clen= my_charlen_fix(system_charset_info, name, name_end);
565 if (clen == 1 && c == (uchar) quote_char &&
566 (result= string->append(&quote_char, 1, system_charset_info)))
567 goto err;
568 if ((result= string->append(name, clen, string->charset())))
569 goto err;
570 }
571 result= string->append(&quote_char, 1, system_charset_info);
572 }
573 else
574 result= string->append(name, (uint) length, system_charset_info);
575
576err:
577 DBUG_RETURN(result);
578}
579
580
581static int parse_url_error(FEDERATED_SHARE *share, TABLE *table, int error_num)
582{
583 char buf[FEDERATED_QUERY_BUFFER_SIZE];
584 size_t buf_len;
585 DBUG_ENTER("ha_federated parse_url_error");
586
587 buf_len= MY_MIN(table->s->connect_string.length,
588 FEDERATED_QUERY_BUFFER_SIZE-1);
589 strmake(buf, table->s->connect_string.str, buf_len);
590 my_error(error_num, MYF(0), buf, 14);
591 DBUG_RETURN(error_num);
592}
593
594/*
595 retrieve server object which contains server meta-data
596 from the system table given a server's name, set share
597 connection parameter members
598*/
599int get_connection(MEM_ROOT *mem_root, FEDERATED_SHARE *share)
600{
601 int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
602 FOREIGN_SERVER *server, server_buffer;
603 DBUG_ENTER("ha_federated::get_connection");
604
605 /*
606 get_server_by_name() clones the server if exists and allocates
607 copies of strings in the supplied mem_root
608 */
609 if (!(server=
610 get_server_by_name(mem_root, share->connection_string, &server_buffer)))
611 {
612 DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!"));
613 /* need to come up with error handling */
614 error_num=1;
615 goto error;
616 }
617 DBUG_PRINT("info", ("get_server_by_name returned server at %p",
618 server));
619
620 /*
621 Most of these should never be empty strings, error handling will
622 need to be implemented. Also, is this the best way to set the share
623 members? Is there some allocation needed? In running this code, it works
624 except there are errors in the trace file of the share being overrun
625 at the address of the share.
626 */
627 share->server_name_length= server->server_name_length;
628 share->server_name= const_cast<char*>(server->server_name);
629 share->username= const_cast<char*>(server->username);
630 share->password= const_cast<char*>(server->password);
631 share->database= const_cast<char*>(server->db);
632 share->port= server->port > MIN_PORT && server->port < 65536 ?
633 (ushort) server->port : MYSQL_PORT;
634 share->hostname= const_cast<char*>(server->host);
635 if (!(share->socket= const_cast<char*>(server->socket)) &&
636 !strcmp(share->hostname, my_localhost))
637 share->socket= (char *) MYSQL_UNIX_ADDR;
638 share->scheme= const_cast<char*>(server->scheme);
639
640 DBUG_PRINT("info", ("share->username %s", share->username));
641 DBUG_PRINT("info", ("share->password %s", share->password));
642 DBUG_PRINT("info", ("share->hostname %s", share->hostname));
643 DBUG_PRINT("info", ("share->database %s", share->database));
644 DBUG_PRINT("info", ("share->port %d", share->port));
645 DBUG_PRINT("info", ("share->socket %s", share->socket));
646 DBUG_RETURN(0);
647
648error:
649 my_printf_error(error_num, "server name: '%s' doesn't exist!",
650 MYF(0), share->connection_string);
651 DBUG_RETURN(error_num);
652}
653
654/*
655 Parse connection info from table->s->connect_string
656
657 SYNOPSIS
658 parse_url()
659 mem_root MEM_ROOT pointer for memory allocation
660 share pointer to FEDERATED share
661 table pointer to current TABLE class
662 table_create_flag determines what error to throw
663
664 DESCRIPTION
665 Populates the share with information about the connection
666 to the foreign database that will serve as the data source.
667 This string must be specified (currently) in the "CONNECTION" field,
668 listed in the CREATE TABLE statement.
669
670 This string MUST be in the format of any of these:
671
672 CONNECTION="scheme://username:password@hostname:port/database/table"
673 CONNECTION="scheme://username@hostname/database/table"
674 CONNECTION="scheme://username@hostname:port/database/table"
675 CONNECTION="scheme://username:password@hostname/database/table"
676
677 _OR_
678
679 CONNECTION="connection name"
680
681
682
683 An Example:
684
685 CREATE TABLE t1 (id int(32))
686 ENGINE="FEDERATED"
687 CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federated/testtable";
688
689 CREATE TABLE t2 (
690 id int(4) NOT NULL auto_increment,
691 name varchar(32) NOT NULL,
692 PRIMARY KEY(id)
693 ) ENGINE="FEDERATED" CONNECTION="my_conn";
694
695 ***IMPORTANT***
696 Currently, the Federated Storage Engine only supports connecting to another
697 MySQL Database ("scheme" of "mysql"). Connections using JDBC as well as
698 other connectors are in the planning stage.
699
700
701 'password' and 'port' are both optional.
702
703 RETURN VALUE
704 0 success
705 error_num particular error code
706
707*/
708
709static int parse_url(MEM_ROOT *mem_root, FEDERATED_SHARE *share, TABLE *table,
710 uint table_create_flag)
711{
712 uint error_num= (table_create_flag ?
713 ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
714 ER_FOREIGN_DATA_STRING_INVALID);
715 DBUG_ENTER("ha_federated::parse_url");
716
717 share->port= 0;
718 share->socket= 0;
719 DBUG_PRINT("info", ("share at %p", share));
720 DBUG_PRINT("info", ("Length: %u", (uint) table->s->connect_string.length));
721 DBUG_PRINT("info", ("String: '%.*s'", (int) table->s->connect_string.length,
722 table->s->connect_string.str));
723 share->connection_string= strmake_root(mem_root, table->s->connect_string.str,
724 table->s->connect_string.length);
725
726 DBUG_PRINT("info",("parse_url alloced share->connection_string %p",
727 share->connection_string));
728
729 DBUG_PRINT("info",("share->connection_string %s",share->connection_string));
730 /*
731 No :// or @ in connection string. Must be a straight connection name of
732 either "servername" or "servername/tablename"
733 */
734 if ( (!strstr(share->connection_string, "://") &&
735 (!strchr(share->connection_string, '@'))))
736 {
737
738 DBUG_PRINT("info",
739 ("share->connection_string %s internal format \
740 share->connection_string %p",
741 share->connection_string,
742 share->connection_string));
743
744 /* ok, so we do a little parsing, but not completely! */
745 share->parsed= FALSE;
746 /*
747 If there is a single '/' in the connection string, this means the user is
748 specifying a table name
749 */
750
751 if ((share->table_name= strchr(share->connection_string, '/')))
752 {
753 share->connection_string[share->table_name - share->connection_string]= '\0';
754 share->table_name++;
755 share->table_name_length= (uint) strlen(share->table_name);
756
757 DBUG_PRINT("info",
758 ("internal format, parsed table_name share->connection_string \
759 %s share->table_name %s",
760 share->connection_string, share->table_name));
761
762 /*
763 there better not be any more '/'s !
764 */
765 if (strchr(share->table_name, '/'))
766 goto error;
767
768 }
769 /*
770 otherwise, straight server name, use tablename of federated table
771 as remote table name
772 */
773 else
774 {
775 /*
776 connection specifies everything but, resort to
777 expecting remote and foreign table names to match
778 */
779 share->table_name= strmake_root(mem_root, table->s->table_name.str,
780 (share->table_name_length= table->s->table_name.length));
781 DBUG_PRINT("info",
782 ("internal format, default table_name share->connection_string \
783 %s share->table_name %s",
784 share->connection_string, share->table_name));
785 }
786
787 if ((error_num= get_connection(mem_root, share)))
788 goto error;
789 }
790 else
791 {
792 share->parsed= TRUE;
793 // Add a null for later termination of table name
794 share->connection_string[table->s->connect_string.length]= 0;
795 share->scheme= share->connection_string;
796 DBUG_PRINT("info",("parse_url alloced share->scheme %p",
797 share->scheme));
798
799 /*
800 remove addition of null terminator and store length
801 for each string in share
802 */
803 if (!(share->username= strstr(share->scheme, "://")))
804 goto error;
805 share->scheme[share->username - share->scheme]= '\0';
806
807 if (strcmp(share->scheme, "mysql") != 0)
808 goto error;
809
810 share->username+= 3;
811
812 if (!(share->hostname= strchr(share->username, '@')))
813 goto error;
814
815 share->username[share->hostname - share->username]= '\0';
816 share->hostname++;
817
818 if ((share->password= strchr(share->username, ':')))
819 {
820 share->username[share->password - share->username]= '\0';
821 share->password++;
822 share->username= share->username;
823 /* make sure there isn't an extra / or @ */
824 if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
825 goto error;
826 /*
827 Found that if the string is:
828 user:@hostname:port/db/table
829 Then password is a null string, so set to NULL
830 */
831 if (share->password[0] == '\0')
832 share->password= NULL;
833 }
834 else
835 share->username= share->username;
836
837 /* make sure there isn't an extra / or @ */
838 if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
839 goto error;
840
841 if (!(share->database= strchr(share->hostname, '/')))
842 goto error;
843 share->hostname[share->database - share->hostname]= '\0';
844 share->database++;
845
846 if ((share->sport= strchr(share->hostname, ':')))
847 {
848 share->hostname[share->sport - share->hostname]= '\0';
849 share->sport++;
850 if (share->sport[0] == '\0')
851 share->sport= NULL;
852 else
853 share->port= atoi(share->sport);
854 }
855
856 if (!(share->table_name= strchr(share->database, '/')))
857 goto error;
858 share->database[share->table_name - share->database]= '\0';
859 share->table_name++;
860
861 share->table_name_length= strlen(share->table_name);
862
863 /* make sure there's not an extra / */
864 if ((strchr(share->table_name, '/')))
865 goto error;
866
867 /*
868 If hostname is omitted, we set it to NULL. According to
869 mysql_real_connect() manual:
870 The value of host may be either a hostname or an IP address.
871 If host is NULL or the string "localhost", a connection to the
872 local host is assumed.
873 */
874 if (share->hostname[0] == '\0')
875 share->hostname= NULL;
876 }
877
878 if (!share->port)
879 {
880 if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
881 share->socket= (char*) MYSQL_UNIX_ADDR;
882 else
883 share->port= MYSQL_PORT;
884 }
885
886 DBUG_PRINT("info",
887 ("scheme: %s username: %s password: %s \
888 hostname: %s port: %d db: %s tablename: %s",
889 share->scheme, share->username, share->password,
890 share->hostname, share->port, share->database,
891 share->table_name));
892
893 DBUG_RETURN(0);
894
895error:
896 DBUG_RETURN(parse_url_error(share, table, error_num));
897}
898
899/*****************************************************************************
900** FEDERATED tables
901*****************************************************************************/
902
903ha_federated::ha_federated(handlerton *hton,
904 TABLE_SHARE *table_arg)
905 :handler(hton, table_arg),
906 mysql(0), stored_result(0)
907{
908 trx_next= 0;
909 bzero(&bulk_insert, sizeof(bulk_insert));
910}
911
912
913/*
914 Convert MySQL result set row to handler internal format
915
916 SYNOPSIS
917 convert_row_to_internal_format()
918 record Byte pointer to record
919 row MySQL result set row from fetchrow()
920 result Result set to use
921
922 DESCRIPTION
923 This method simply iterates through a row returned via fetchrow with
924 values from a successful SELECT , and then stores each column's value
925 in the field object via the field object pointer (pointing to the table's
926 array of field object pointers). This is how the handler needs the data
927 to be stored to then return results back to the user
928
929 RETURN VALUE
930 0 After fields have had field values stored from record
931*/
932
933uint ha_federated::convert_row_to_internal_format(uchar *record,
934 MYSQL_ROW row,
935 MYSQL_RES *result)
936{
937 ulong *lengths;
938 Field **field;
939 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
940 DBUG_ENTER("ha_federated::convert_row_to_internal_format");
941
942 lengths= mysql_fetch_lengths(result);
943
944 for (field= table->field; *field; field++, row++, lengths++)
945 {
946 /*
947 index variable to move us through the row at the
948 same iterative step as the field
949 */
950 my_ptrdiff_t old_ptr;
951 old_ptr= (my_ptrdiff_t) (record - table->record[0]);
952 (*field)->move_field_offset(old_ptr);
953 if (!*row)
954 {
955 (*field)->set_null();
956 (*field)->reset();
957 }
958 else
959 {
960 if (bitmap_is_set(table->read_set, (*field)->field_index))
961 {
962 (*field)->set_notnull();
963 (*field)->store(*row, *lengths, &my_charset_bin);
964 }
965 }
966 (*field)->move_field_offset(-old_ptr);
967 }
968 dbug_tmp_restore_column_map(table->write_set, old_map);
969 DBUG_RETURN(0);
970}
971
972static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
973{
974 DBUG_ENTER("emit_key_part_name");
975 if (append_ident(to, part->field->field_name.str,
976 part->field->field_name.length, ident_quote_char))
977 DBUG_RETURN(1); // Out of memory
978 DBUG_RETURN(0);
979}
980
981static bool emit_key_part_element(String *to, KEY_PART_INFO *part,
982 bool needs_quotes, bool is_like,
983 const uchar *ptr, uint len)
984{
985 Field *field= part->field;
986 DBUG_ENTER("emit_key_part_element");
987
988 if (needs_quotes && to->append(STRING_WITH_LEN("'")))
989 DBUG_RETURN(1);
990
991 if (part->type == HA_KEYTYPE_BIT)
992 {
993 char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;
994
995 *buf++= '0';
996 *buf++= 'x';
997 buf= octet2hex(buf, (char*) ptr, len);
998 if (to->append((char*) buff, (uint)(buf - buff)))
999 DBUG_RETURN(1);
1000 }
1001 else if (part->key_part_flag & HA_BLOB_PART)
1002 {
1003 String blob;
1004 uint blob_length= uint2korr(ptr);
1005 blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
1006 blob_length, &my_charset_bin);
1007 if (to->append_for_single_quote(&blob))
1008 DBUG_RETURN(1);
1009 }
1010 else if (part->key_part_flag & HA_VAR_LENGTH_PART)
1011 {
1012 String varchar;
1013 uint var_length= uint2korr(ptr);
1014 varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
1015 var_length, &my_charset_bin);
1016 if (to->append_for_single_quote(&varchar))
1017 DBUG_RETURN(1);
1018 }
1019 else
1020 {
1021 char strbuff[MAX_FIELD_WIDTH];
1022 String str(strbuff, sizeof(strbuff), part->field->charset()), *res;
1023
1024 res= field->val_str(&str, ptr);
1025
1026 if (field->result_type() == STRING_RESULT)
1027 {
1028 if (to->append_for_single_quote(res))
1029 DBUG_RETURN(1);
1030 }
1031 else if (to->append(res->ptr(), res->length()))
1032 DBUG_RETURN(1);
1033 }
1034
1035 if (is_like && to->append(STRING_WITH_LEN("%")))
1036 DBUG_RETURN(1);
1037
1038 if (needs_quotes && to->append(STRING_WITH_LEN("'")))
1039 DBUG_RETURN(1);
1040
1041 DBUG_RETURN(0);
1042}
1043
1044/*
1045 Create a WHERE clause based off of values in keys
1046 Note: This code was inspired by key_copy from key.cc
1047
1048 SYNOPSIS
1049 create_where_from_key ()
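      to                     String object to store WHERE clause
      key_info               KEY struct pointer
      start_key              key_range with the start of the range (may be NULL)
      end_key                key_range with the end of the range (may be NULL)
      from_records_in_range  if set, an exact-match key is emitted with ">="
                             instead of "="
      eq_range_arg           if set, a HA_READ_AFTER_KEY key is emitted as the
                             dummy condition "1=1"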
1056
1057 DESCRIPTION
1058 Using iteration through all the keys via a KEY_PART_INFO pointer,
1059 This method 'extracts' the value of each key in the byte pointer
1060 *key, and for each key found, constructs an appropriate WHERE clause
1061
1062 RETURN VALUE
    0    After all keys have been accounted for to create the WHERE clause
    1    No keys found, or an error occurred (range flag that cannot be
         handled, or a failed string append)
1065
1066 Range flags Table per Timour:
1067
1068 -----------------
1069 - start_key:
1070 * ">" -> HA_READ_AFTER_KEY
1071 * ">=" -> HA_READ_KEY_OR_NEXT
1072 * "=" -> HA_READ_KEY_EXACT
1073
1074 - end_key:
1075 * "<" -> HA_READ_BEFORE_KEY
1076 * "<=" -> HA_READ_AFTER_KEY
1077
1078 records_in_range:
1079 -----------------
1080 - start_key:
1081 * ">" -> HA_READ_AFTER_KEY
1082 * ">=" -> HA_READ_KEY_EXACT
1083 * "=" -> HA_READ_KEY_EXACT
1084
1085 - end_key:
1086 * "<" -> HA_READ_BEFORE_KEY
1087 * "<=" -> HA_READ_AFTER_KEY
1088 * "=" -> HA_READ_AFTER_KEY
1089
10900 HA_READ_KEY_EXACT, Find first record else error
10911 HA_READ_KEY_OR_NEXT, Record or next record
10922 HA_READ_KEY_OR_PREV, Record or previous
10933 HA_READ_AFTER_KEY, Find next rec. after key-record
10944 HA_READ_BEFORE_KEY, Find next rec. before key-record
10955 HA_READ_PREFIX, Key which as same prefix
10966 HA_READ_PREFIX_LAST, Last key with the same prefix
10977 HA_READ_PREFIX_LAST_OR_PREV, Last or prev key with the same prefix
1098
1099Flags that I've found:
1100
1101id, primary key, varchar
1102
1103id = 'ccccc'
1104records_in_range: start_key 0 end_key 3
1105read_range_first: start_key 0 end_key NULL
1106
1107id > 'ccccc'
1108records_in_range: start_key 3 end_key NULL
1109read_range_first: start_key 3 end_key NULL
1110
1111id < 'ccccc'
1112records_in_range: start_key NULL end_key 4
1113read_range_first: start_key NULL end_key 4
1114
1115id <= 'ccccc'
1116records_in_range: start_key NULL end_key 3
1117read_range_first: start_key NULL end_key 3
1118
1119id >= 'ccccc'
1120records_in_range: start_key 0 end_key NULL
1121read_range_first: start_key 1 end_key NULL
1122
1123id like 'cc%cc'
1124records_in_range: start_key 0 end_key 3
1125read_range_first: start_key 1 end_key 3
1126
1127id > 'aaaaa' and id < 'ccccc'
1128records_in_range: start_key 3 end_key 4
1129read_range_first: start_key 3 end_key 4
1130
1131id >= 'aaaaa' and id < 'ccccc';
1132records_in_range: start_key 0 end_key 4
1133read_range_first: start_key 1 end_key 4
1134
1135id >= 'aaaaa' and id <= 'ccccc';
1136records_in_range: start_key 0 end_key 3
1137read_range_first: start_key 1 end_key 3
1138
1139id > 'aaaaa' and id <= 'ccccc';
1140records_in_range: start_key 3 end_key 3
1141read_range_first: start_key 3 end_key 3
1142
1143numeric keys:
1144
1145id = 4
1146index_read_idx: start_key 0 end_key NULL
1147
1148id > 4
1149records_in_range: start_key 3 end_key NULL
1150read_range_first: start_key 3 end_key NULL
1151
1152id >= 4
1153records_in_range: start_key 0 end_key NULL
1154read_range_first: start_key 1 end_key NULL
1155
1156id < 4
1157records_in_range: start_key NULL end_key 4
1158read_range_first: start_key NULL end_key 4
1159
1160id <= 4
1161records_in_range: start_key NULL end_key 3
1162read_range_first: start_key NULL end_key 3
1163
1164id like 4
1165full table scan, select * from
1166
1167id > 2 and id < 8
1168records_in_range: start_key 3 end_key 4
1169read_range_first: start_key 3 end_key 4
1170
1171id >= 2 and id < 8
1172records_in_range: start_key 0 end_key 4
1173read_range_first: start_key 1 end_key 4
1174
1175id >= 2 and id <= 8
1176records_in_range: start_key 0 end_key 3
1177read_range_first: start_key 1 end_key 3
1178
1179id > 2 and id <= 8
1180records_in_range: start_key 3 end_key 3
1181read_range_first: start_key 3 end_key 3
1182
1183multi keys (id int, name varchar, other varchar)
1184
1185id = 1;
1186records_in_range: start_key 0 end_key 3
1187read_range_first: start_key 0 end_key NULL
1188
1189id > 4;
1190id > 2 and name = '333'; remote: id > 2
1191id > 2 and name > '333'; remote: id > 2
1192id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results
1193id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result
1194id >= 4 and name = 'eric was here' and other > 'eeee';
1195records_in_range: start_key 3 end_key NULL
1196read_range_first: start_key 3 end_key NULL
1197
1198id >= 4;
1199id >= 2 and name = '333' and other < 'ddd';
1200remote: `id` >= 2 AND `name` >= '333';
1201records_in_range: start_key 0 end_key NULL
1202read_range_first: start_key 1 end_key NULL
1203
1204id < 4;
1205id < 3 and name = '222' and other <= 'ccc'; remote: id < 3
1206records_in_range: start_key NULL end_key 4
1207read_range_first: start_key NULL end_key 4
1208
1209id <= 4;
1210records_in_range: start_key NULL end_key 3
1211read_range_first: start_key NULL end_key 3
1212
1213id like 4;
1214full table scan
1215
1216id > 2 and id < 4;
1217records_in_range: start_key 3 end_key 4
1218read_range_first: start_key 3 end_key 4
1219
1220id >= 2 and id < 4;
1221records_in_range: start_key 0 end_key 4
1222read_range_first: start_key 1 end_key 4
1223
1224id >= 2 and id <= 4;
1225records_in_range: start_key 0 end_key 3
1226read_range_first: start_key 1 end_key 3
1227
1228id > 2 and id <= 4;
1229id = 6 and name = 'eric was here' and other > 'eeee';
1230remote: (`id` > 6 AND `name` > 'eric was here' AND `other` > 'eeee')
1231AND (`id` <= 6) AND ( AND `name` <= 'eric was here')
1232no results
1233records_in_range: start_key 3 end_key 3
1234read_range_first: start_key 3 end_key 3
1235
1236Summary:
1237
1238* If the start key flag is 0 the max key flag shouldn't even be set,
1239 and if it is, the query produced would be invalid.
1240* Multipart keys, even if containing some or all numeric columns,
1241 are treated the same as non-numeric keys
1242
1243 If the query is " = " (quotes or not):
1244 - records in range start key flag HA_READ_KEY_EXACT,
1245 end key flag HA_READ_AFTER_KEY (incorrect)
1246 - any other: start key flag HA_READ_KEY_OR_NEXT,
1247 end key flag HA_READ_AFTER_KEY (correct)
1248
1249* 'like' queries (of key)
1250 - Numeric, full table scan
1251 - Non-numeric
1252 records_in_range: start_key 0 end_key 3
1253 other : start_key 1 end_key 3
1254
1255* If the key flag is HA_READ_AFTER_KEY:
1256 if start_key, append >
1257 if end_key, append <=
1258
1259* If create_where_key was called by records_in_range:
1260
1261 - if the key is numeric:
1262 start key flag is 0 when end key is NULL, end key flag is 3 or 4
1263 - if create_where_key was called by any other function:
1264 start key flag is 1 when end key is NULL, end key flag is 3 or 4
1265 - if the key is non-numeric, or multipart
1266 When the query is an exact match, the start key flag is 0,
1267 end key flag is 3 for what should be a no-range condition where
1268 you should have 0 and max key NULL, which it is if called by
1269 read_range_first
1270
1271Conclusion:
1272
1. Need logic to determine whether a key is min or max when the flag is
HA_READ_AFTER_KEY, and handle appending the correct operator accordingly
1275
12762. Need a boolean flag to pass to create_where_from_key, used in the
1277switch statement. Add 1 to the flag if:
1278 - start key flag is HA_READ_KEY_EXACT and the end key is NULL
1279
1280*/
1281
1282bool ha_federated::create_where_from_key(String *to,
1283 KEY *key_info,
1284 const key_range *start_key,
1285 const key_range *end_key,
1286 bool from_records_in_range,
1287 bool eq_range_arg)
1288{
1289 bool both_not_null=
1290 (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
1291 const uchar *ptr;
1292 uint remainder, length;
1293 char tmpbuff[FEDERATED_QUERY_BUFFER_SIZE];
1294 String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info);
1295 const key_range *ranges[2]= { start_key, end_key };
1296 my_bitmap_map *old_map;
1297 DBUG_ENTER("ha_federated::create_where_from_key");
1298
1299 tmp.length(0);
1300 if (start_key == NULL && end_key == NULL)
1301 DBUG_RETURN(1);
1302
1303 old_map= dbug_tmp_use_all_columns(table, table->write_set);
1304 for (uint i= 0; i <= 1; i++)
1305 {
1306 bool needs_quotes;
1307 KEY_PART_INFO *key_part;
1308 if (ranges[i] == NULL)
1309 continue;
1310
1311 if (both_not_null)
1312 {
1313 if (i > 0)
1314 tmp.append(STRING_WITH_LEN(") AND ("));
1315 else
1316 tmp.append(STRING_WITH_LEN(" ("));
1317 }
1318
1319 for (key_part= key_info->key_part,
1320 remainder= key_info->user_defined_key_parts,
1321 length= ranges[i]->length,
1322 ptr= ranges[i]->key; ;
1323 remainder--,
1324 key_part++)
1325 {
1326 Field *field= key_part->field;
1327 uint store_length= key_part->store_length;
1328 uint part_length= MY_MIN(store_length, length);
1329 needs_quotes= field->str_needs_quotes();
1330 DBUG_DUMP("key, start of loop", ptr, length);
1331
1332 if (key_part->null_bit)
1333 {
1334 if (*ptr++)
1335 {
1336 /*
1337 We got "IS [NOT] NULL" condition against nullable column. We
1338 distinguish between "IS NOT NULL" and "IS NULL" by flag. For
1339 "IS NULL", flag is set to HA_READ_KEY_EXACT.
1340 */
1341 if (emit_key_part_name(&tmp, key_part) ||
1342 (ranges[i]->flag == HA_READ_KEY_EXACT ?
1343 tmp.append(STRING_WITH_LEN(" IS NULL ")) :
1344 tmp.append(STRING_WITH_LEN(" IS NOT NULL "))))
1345 goto err;
1346 /*
1347 We need to adjust pointer and length to be prepared for next
1348 key part. As well as check if this was last key part.
1349 */
1350 goto prepare_for_next_key_part;
1351 }
1352 }
1353
1354 if (tmp.append(STRING_WITH_LEN(" (")))
1355 goto err;
1356
1357 switch (ranges[i]->flag) {
1358 case HA_READ_KEY_EXACT:
1359 DBUG_PRINT("info", ("federated HA_READ_KEY_EXACT %d", i));
1360 if (store_length >= length ||
1361 !needs_quotes ||
1362 key_part->type == HA_KEYTYPE_BIT ||
1363 field->result_type() != STRING_RESULT)
1364 {
1365 if (emit_key_part_name(&tmp, key_part))
1366 goto err;
1367
1368 if (from_records_in_range)
1369 {
1370 if (tmp.append(STRING_WITH_LEN(" >= ")))
1371 goto err;
1372 }
1373 else
1374 {
1375 if (tmp.append(STRING_WITH_LEN(" = ")))
1376 goto err;
1377 }
1378
1379 if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1380 part_length))
1381 goto err;
1382 }
1383 else
1384 {
1385 /* LIKE */
1386 if (emit_key_part_name(&tmp, key_part) ||
1387 tmp.append(STRING_WITH_LEN(" LIKE ")) ||
1388 emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
1389 part_length))
1390 goto err;
1391 }
1392 break;
1393 case HA_READ_AFTER_KEY:
1394 if (eq_range_arg)
1395 {
1396 if (tmp.append("1=1")) // Dummy
1397 goto err;
1398 break;
1399 }
1400 DBUG_PRINT("info", ("federated HA_READ_AFTER_KEY %d", i));
1401 if ((store_length >= length) || (i > 0)) /* for all parts of end key*/
1402 {
1403 if (emit_key_part_name(&tmp, key_part))
1404 goto err;
1405
1406 if (i > 0) /* end key */
1407 {
1408 if (tmp.append(STRING_WITH_LEN(" <= ")))
1409 goto err;
1410 }
1411 else /* start key */
1412 {
1413 if (tmp.append(STRING_WITH_LEN(" > ")))
1414 goto err;
1415 }
1416
1417 if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1418 part_length))
1419 {
1420 goto err;
1421 }
1422 break;
1423 }
1424 /* fall through */
1425 case HA_READ_KEY_OR_NEXT:
1426 DBUG_PRINT("info", ("federated HA_READ_KEY_OR_NEXT %d", i));
1427 if (emit_key_part_name(&tmp, key_part) ||
1428 tmp.append(STRING_WITH_LEN(" >= ")) ||
1429 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1430 part_length))
1431 goto err;
1432 break;
1433 case HA_READ_BEFORE_KEY:
1434 DBUG_PRINT("info", ("federated HA_READ_BEFORE_KEY %d", i));
1435 if (store_length >= length)
1436 {
1437 if (emit_key_part_name(&tmp, key_part) ||
1438 tmp.append(STRING_WITH_LEN(" < ")) ||
1439 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1440 part_length))
1441 goto err;
1442 break;
1443 }
1444 /* fall through */
1445 case HA_READ_KEY_OR_PREV:
1446 DBUG_PRINT("info", ("federated HA_READ_KEY_OR_PREV %d", i));
1447 if (emit_key_part_name(&tmp, key_part) ||
1448 tmp.append(STRING_WITH_LEN(" <= ")) ||
1449 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1450 part_length))
1451 goto err;
1452 break;
1453 default:
1454 DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag));
1455 goto err;
1456 }
1457 if (tmp.append(STRING_WITH_LEN(") ")))
1458 goto err;
1459
1460prepare_for_next_key_part:
1461 if (store_length >= length)
1462 break;
1463 DBUG_PRINT("info", ("remainder %d", remainder));
1464 DBUG_ASSERT(remainder > 1);
1465 length-= store_length;
1466 /*
1467 For nullable columns, null-byte is already skipped before, that is
1468 ptr was incremented by 1. Since store_length still counts null-byte,
1469 we need to subtract 1 from store_length.
1470 */
1471 ptr+= store_length - MY_TEST(key_part->null_bit);
1472 if (tmp.append(STRING_WITH_LEN(" AND ")))
1473 goto err;
1474
1475 DBUG_PRINT("info",
1476 ("create_where_from_key WHERE clause: %s",
1477 tmp.c_ptr_quick()));
1478 }
1479 }
1480 dbug_tmp_restore_column_map(table->write_set, old_map);
1481
1482 if (both_not_null)
1483 if (tmp.append(STRING_WITH_LEN(") ")))
1484 DBUG_RETURN(1);
1485
1486 if (to->append(STRING_WITH_LEN(" WHERE ")))
1487 DBUG_RETURN(1);
1488
1489 if (to->append(tmp))
1490 DBUG_RETURN(1);
1491
1492 DBUG_RETURN(0);
1493
1494err:
1495 dbug_tmp_restore_column_map(table->write_set, old_map);
1496 DBUG_RETURN(1);
1497}
1498
1499/*
1500 Example of simple lock controls. The "share" it creates is structure we will
1501 pass to each federated handler. Do you have to have one of these? Well, you
1502 have pieces that are used for locking, and they are needed to function.
1503*/
1504
1505static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table)
1506{
1507 char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1508 Field **field;
1509 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
1510 FEDERATED_SHARE *share= NULL, tmp_share;
1511 MEM_ROOT mem_root;
1512 DBUG_ENTER("ha_federated.cc::get_share");
1513
1514 /*
1515 In order to use this string, we must first zero it's length,
1516 or it will contain garbage
1517 */
1518 query.length(0);
1519
1520 init_alloc_root(&mem_root, "federated_share", 256, 0, MYF(0));
1521
1522 mysql_mutex_lock(&federated_mutex);
1523
1524 tmp_share.share_key= table_name;
1525 tmp_share.share_key_length= (uint) strlen(table_name);
1526 if (parse_url(&mem_root, &tmp_share, table, 0))
1527 goto error;
1528
1529 /* TODO: change tmp_share.scheme to LEX_STRING object */
1530 if (!(share= (FEDERATED_SHARE *) my_hash_search(&federated_open_tables,
1531 (uchar*) tmp_share.share_key,
1532 tmp_share.
1533 share_key_length)))
1534 {
1535 query.set_charset(system_charset_info);
1536 query.append(STRING_WITH_LEN("SELECT "));
1537 for (field= table->field; *field; field++)
1538 {
1539 append_ident(&query, (*field)->field_name.str,
1540 (*field)->field_name.length, ident_quote_char);
1541 query.append(STRING_WITH_LEN(", "));
1542 }
1543 /* chops off trailing comma */
1544 query.length(query.length() - sizeof_trailing_comma);
1545
1546 query.append(STRING_WITH_LEN(" FROM "));
1547
1548 append_ident(&query, tmp_share.table_name,
1549 tmp_share.table_name_length, ident_quote_char);
1550
1551 if (!(share= (FEDERATED_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
1552 !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length())))
1553 goto error;
1554
1555 share->use_count= 0;
1556 share->mem_root= mem_root;
1557
1558 DBUG_PRINT("info",
1559 ("share->select_query %s", share->select_query));
1560
1561 if (my_hash_insert(&federated_open_tables, (uchar*) share))
1562 goto error;
1563 thr_lock_init(&share->lock);
1564 mysql_mutex_init(fe_key_mutex_FEDERATED_SHARE_mutex,
1565 &share->mutex, MY_MUTEX_INIT_FAST);
1566 }
1567 else
1568 free_root(&mem_root, MYF(0)); /* prevents memory leak */
1569
1570 share->use_count++;
1571 mysql_mutex_unlock(&federated_mutex);
1572
1573 DBUG_RETURN(share);
1574
1575error:
1576 mysql_mutex_unlock(&federated_mutex);
1577 free_root(&mem_root, MYF(0));
1578 DBUG_RETURN(NULL);
1579}
1580
1581
1582/*
1583 Free lock controls. We call this whenever we close a table.
1584 If the table had the last reference to the share then we
1585 free memory associated with it.
1586*/
1587
1588static int free_share(FEDERATED_SHARE *share)
1589{
1590 MEM_ROOT mem_root= share->mem_root;
1591 DBUG_ENTER("free_share");
1592
1593 mysql_mutex_lock(&federated_mutex);
1594 if (!--share->use_count)
1595 {
1596 my_hash_delete(&federated_open_tables, (uchar*) share);
1597 thr_lock_delete(&share->lock);
1598 mysql_mutex_destroy(&share->mutex);
1599 free_root(&mem_root, MYF(0));
1600 }
1601 mysql_mutex_unlock(&federated_mutex);
1602
1603 DBUG_RETURN(0);
1604}
1605
1606
1607ha_rows ha_federated::records_in_range(uint inx, key_range *start_key,
1608 key_range *end_key)
1609{
1610 /*
1611
1612 We really want indexes to be used as often as possible, therefore
1613 we just need to hard-code the return value to a very low number to
1614 force the issue
1615
1616*/
1617 DBUG_ENTER("ha_federated::records_in_range");
1618 DBUG_RETURN(FEDERATED_RECORDS_IN_RANGE);
1619}
1620
1621
1622/*
1623 Used for opening tables. The name will be the name of the file.
1624 A table is opened when it needs to be opened. For instance
1625 when a request comes in for a select on the table (tables are not
1626 open and closed for each request, they are cached).
1627
1628 Called from handler.cc by handler::ha_open(). The server opens
1629 all tables by calling ha_open() which then calls the handler
1630 specific open().
1631*/
1632
1633int ha_federated::open(const char *name, int mode, uint test_if_locked)
1634{
1635 DBUG_ENTER("ha_federated::open");
1636
1637 if (!(share= get_share(name, table)))
1638 DBUG_RETURN(1);
1639 thr_lock_data_init(&share->lock, &lock, NULL);
1640
1641 DBUG_ASSERT(mysql == NULL);
1642
1643 ref_length= sizeof(MYSQL_RES *) + sizeof(MYSQL_ROW_OFFSET);
1644 DBUG_PRINT("info", ("ref_length: %u", ref_length));
1645
1646 my_init_dynamic_array(&results, sizeof(MYSQL_RES *), 4, 4, MYF(0));
1647 reset();
1648
1649 DBUG_RETURN(0);
1650}
1651
1652class Net_error_handler : public Internal_error_handler
1653{
1654public:
1655 Net_error_handler() {}
1656
1657public:
1658 bool handle_condition(THD *thd, uint sql_errno, const char* sqlstate,
1659 Sql_condition::enum_warning_level *level,
1660 const char* msg, Sql_condition ** cond_hdl)
1661 {
1662 return sql_errno >= ER_ABORTING_CONNECTION &&
1663 sql_errno <= ER_NET_WRITE_INTERRUPTED;
1664 }
1665};
1666
1667/*
1668 Closes a table. We call the free_share() function to free any resources
1669 that we have allocated in the "shared" structure.
1670
1671 Called from sql_base.cc, sql_select.cc, and table.cc.
1672 In sql_select.cc it is only used to close up temporary tables or during
1673 the process where a temporary table is converted over to being a
1674 myisam table.
1675 For sql_base.cc look at close_data_tables().
1676*/
1677
1678int ha_federated::close(void)
1679{
1680 DBUG_ENTER("ha_federated::close");
1681
1682 free_result();
1683
1684 delete_dynamic(&results);
1685
1686 /* Disconnect from mysql */
1687 THD *thd= ha_thd();
1688 Net_error_handler err_handler;
1689 if (thd)
1690 thd->push_internal_handler(&err_handler);
1691 mysql_close(mysql);
1692 if (thd)
1693 thd->pop_internal_handler();
1694
1695 mysql= NULL;
1696
1697 DBUG_RETURN(free_share(share));
1698}
1699
1700/*
1701
1702 Checks if a field in a record is SQL NULL.
1703
1704 SYNOPSIS
1705 field_in_record_is_null()
1706 table TABLE pointer, MySQL table object
1707 field Field pointer, MySQL field object
1708 record char pointer, contains record
1709
1710 DESCRIPTION
1711 This method uses the record format information in table to track
1712 the null bit in record.
1713
1714 RETURN VALUE
1715 1 if NULL
1716 0 otherwise
1717*/
1718
1719static inline uint field_in_record_is_null(TABLE *table,
1720 Field *field,
1721 char *record)
1722{
1723 int null_offset;
1724 DBUG_ENTER("ha_federated::field_in_record_is_null");
1725
1726 if (!field->null_ptr)
1727 DBUG_RETURN(0);
1728
1729 null_offset= (uint) ((char*)field->null_ptr - (char*)table->record[0]);
1730
1731 if (record[null_offset] & field->null_bit)
1732 DBUG_RETURN(1);
1733
1734 DBUG_RETURN(0);
1735}
1736
1737
1738/**
1739 @brief Construct the INSERT statement.
1740
1741 @details This method will construct the INSERT statement and appends it to
1742 the supplied query string buffer.
1743
1744 @return
1745 @retval FALSE No error
1746 @retval TRUE Failure
1747*/
1748
1749bool ha_federated::append_stmt_insert(String *query)
1750{
1751 char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1752 Field **field;
1753 uint tmp_length;
1754 bool added_field= FALSE;
1755
1756 /* The main insert query string */
1757 String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
1758 DBUG_ENTER("ha_federated::append_stmt_insert");
1759
1760 insert_string.length(0);
1761
1762 if (replace_duplicates)
1763 insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
1764 else if (ignore_duplicates && !insert_dup_update)
1765 insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
1766 else
1767 insert_string.append(STRING_WITH_LEN("INSERT INTO "));
1768 append_ident(&insert_string, share->table_name, share->table_name_length,
1769 ident_quote_char);
1770 tmp_length= insert_string.length();
1771 insert_string.append(STRING_WITH_LEN(" ("));
1772
1773 /*
1774 loop through the field pointer array, add any fields to both the values
1775 list and the fields list that match the current query id
1776 */
1777 for (field= table->field; *field; field++)
1778 {
1779 if (bitmap_is_set(table->write_set, (*field)->field_index))
1780 {
1781 /* append the field name */
1782 append_ident(&insert_string, (*field)->field_name.str,
1783 (*field)->field_name.length, ident_quote_char);
1784
1785 /* append commas between both fields and fieldnames */
1786 /*
1787 unfortunately, we can't use the logic if *(fields + 1) to
1788 make the following appends conditional as we don't know if the
1789 next field is in the write set
1790 */
1791 insert_string.append(STRING_WITH_LEN(", "));
1792 added_field= TRUE;
1793 }
1794 }
1795
1796 if (added_field)
1797 {
1798 /* Remove trailing comma. */
1799 insert_string.length(insert_string.length() - sizeof_trailing_comma);
1800 insert_string.append(STRING_WITH_LEN(") "));
1801 }
1802 else
1803 {
1804 /* If there were no fields, we don't want to add a closing paren. */
1805 insert_string.length(tmp_length);
1806 }
1807
1808 insert_string.append(STRING_WITH_LEN(" VALUES "));
1809
1810 DBUG_RETURN(query->append(insert_string));
1811}
1812
1813
1814/*
1815 write_row() inserts a row. No extra() hint is given currently if a bulk load
1816 is happeneding. buf() is a byte array of data. You can use the field
1817 information to extract the data from the native byte array type.
1818 Example of this would be:
1819 for (Field **field=table->field ; *field ; field++)
1820 {
1821 ...
1822 }
1823
1824 Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
1825 sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
1826*/
1827
1828int ha_federated::write_row(uchar *buf)
1829{
1830 char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1831 char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
1832 Field **field;
1833 uint tmp_length;
1834 int error= 0;
1835 bool use_bulk_insert;
1836 bool auto_increment_update_required= (table->next_number_field != NULL);
1837
1838 /* The string containing the values to be added to the insert */
1839 String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
1840 /* The actual value of the field, to be added to the values_string */
1841 String insert_field_value_string(insert_field_value_buffer,
1842 sizeof(insert_field_value_buffer),
1843 &my_charset_bin);
1844 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1845 DBUG_ENTER("ha_federated::write_row");
1846
1847 values_string.length(0);
1848 insert_field_value_string.length(0);
1849
1850 /*
1851 start both our field and field values strings
1852 We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE"
1853 Ignore duplicates is always true when insert_dup_update is true.
1854 When replace_duplicates == TRUE, we can safely enable multi-row insert.
1855 When performing multi-row insert, we only collect the columns values for
1856 the row. The start of the statement is only created when the first
1857 row is copied in to the bulk_insert string.
1858 */
1859 if (!(use_bulk_insert= bulk_insert.str &&
1860 (!insert_dup_update || replace_duplicates)))
1861 append_stmt_insert(&values_string);
1862
1863 values_string.append(STRING_WITH_LEN(" ("));
1864 tmp_length= values_string.length();
1865
1866 /*
1867 loop through the field pointer array, add any fields to both the values
1868 list and the fields list that is part of the write set
1869 */
1870 for (field= table->field; *field; field++)
1871 {
1872 if (bitmap_is_set(table->write_set, (*field)->field_index))
1873 {
1874 if ((*field)->is_null())
1875 values_string.append(STRING_WITH_LEN(" NULL "));
1876 else
1877 {
1878 bool needs_quote= (*field)->str_needs_quotes();
1879 (*field)->val_str(&insert_field_value_string);
1880 if (needs_quote)
1881 values_string.append(value_quote_char);
1882 insert_field_value_string.print(&values_string);
1883 if (needs_quote)
1884 values_string.append(value_quote_char);
1885
1886 insert_field_value_string.length(0);
1887 }
1888
1889 /* append commas between both fields and fieldnames */
1890 /*
1891 unfortunately, we can't use the logic if *(fields + 1) to
1892 make the following appends conditional as we don't know if the
1893 next field is in the write set
1894 */
1895 values_string.append(STRING_WITH_LEN(", "));
1896 }
1897 }
1898 dbug_tmp_restore_column_map(table->read_set, old_map);
1899
1900 /*
1901 if there were no fields, we don't want to add a closing paren
1902 AND, we don't want to chop off the last char '('
1903 insert will be "INSERT INTO t1 VALUES ();"
1904 */
1905 if (values_string.length() > tmp_length)
1906 {
1907 /* chops off trailing comma */
1908 values_string.length(values_string.length() - sizeof_trailing_comma);
1909 }
1910 /* we always want to append this, even if there aren't any fields */
1911 values_string.append(STRING_WITH_LEN(") "));
1912
1913 if (use_bulk_insert)
1914 {
1915 /*
1916 Send the current bulk insert out if appending the current row would
1917 cause the statement to overflow the packet size, otherwise set
1918 auto_increment_update_required to FALSE as no query was executed.
1919 */
1920 if (bulk_insert.length + values_string.length() + bulk_padding >
1921 mysql->net.max_packet_size && bulk_insert.length)
1922 {
1923 error= real_query(bulk_insert.str, bulk_insert.length);
1924 bulk_insert.length= 0;
1925 }
1926 else
1927 auto_increment_update_required= FALSE;
1928
1929 if (bulk_insert.length == 0)
1930 {
1931 char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1932 String insert_string(insert_buffer, sizeof(insert_buffer),
1933 &my_charset_bin);
1934 insert_string.length(0);
1935 append_stmt_insert(&insert_string);
1936 dynstr_append_mem(&bulk_insert, insert_string.ptr(),
1937 insert_string.length());
1938 }
1939 else
1940 dynstr_append_mem(&bulk_insert, ",", 1);
1941
1942 dynstr_append_mem(&bulk_insert, values_string.ptr(),
1943 values_string.length());
1944 }
1945 else
1946 {
1947 error= real_query(values_string.ptr(), values_string.length());
1948 }
1949
1950 if (error)
1951 {
1952 DBUG_RETURN(stash_remote_error());
1953 }
1954 /*
1955 If the table we've just written a record to contains an auto_increment
1956 field, then store the last_insert_id() value from the foreign server
1957 */
1958 if (auto_increment_update_required)
1959 {
1960 update_auto_increment();
1961
1962 /* mysql_insert() uses this for protocol return value */
1963 table->next_number_field->store(stats.auto_increment_value, 1);
1964 }
1965
1966 DBUG_RETURN(0);
1967}
1968
1969
1970/**
1971 @brief Prepares the storage engine for bulk inserts.
1972
1973 @param[in] rows estimated number of rows in bulk insert
1974 or 0 if unknown.
1975
1976 @details Initializes memory structures required for bulk insert.
1977*/
1978
1979void ha_federated::start_bulk_insert(ha_rows rows, uint flags)
1980{
1981 uint page_size;
1982 DBUG_ENTER("ha_federated::start_bulk_insert");
1983
1984 dynstr_free(&bulk_insert);
1985
1986 /**
1987 We don't bother with bulk-insert semantics when the estimated rows == 1
1988 The rows value will be 0 if the server does not know how many rows
1989 would be inserted. This can occur when performing INSERT...SELECT
1990 */
1991
1992 if (rows == 1)
1993 DBUG_VOID_RETURN;
1994
1995 /*
1996 Make sure we have an open connection so that we know the
1997 maximum packet size.
1998 */
1999 if (!mysql && real_connect())
2000 DBUG_VOID_RETURN;
2001
2002 page_size= (uint) my_getpagesize();
2003
2004 if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
2005 DBUG_VOID_RETURN;
2006
2007 bulk_insert.length= 0;
2008 DBUG_VOID_RETURN;
2009}
2010
2011
2012/**
2013 @brief End bulk insert.
2014
2015 @details This method will send any remaining rows to the remote server.
2016 Finally, it will deinitialize the bulk insert data structure.
2017
2018 @return Operation status
2019 @retval 0 No error
2020 @retval != 0 Error occurred at remote server. Also sets my_errno.
2021*/
2022
2023int ha_federated::end_bulk_insert()
2024{
2025 int error= 0;
2026 DBUG_ENTER("ha_federated::end_bulk_insert");
2027
2028 if (!table_will_be_deleted && bulk_insert.str && bulk_insert.length)
2029 {
2030 if (real_query(bulk_insert.str, bulk_insert.length))
2031 error= stash_remote_error();
2032 else
2033 if (table->next_number_field)
2034 update_auto_increment();
2035 }
2036
2037 dynstr_free(&bulk_insert);
2038
2039 DBUG_RETURN(my_errno= error);
2040}
2041
2042
/*
  ha_federated::update_auto_increment

  This method ensures that last_insert_id() works properly. It obtains the
  last insert id of the remote connection through info(HA_STATUS_AUTO)
  immediately after an insert (if the table has an auto_increment field) and
  stores it in thd->first_successful_insert_id_in_cur_stmt.
*/
2051void ha_federated::update_auto_increment(void)
2052{
2053 THD *thd= current_thd;
2054 DBUG_ENTER("ha_federated::update_auto_increment");
2055
2056 ha_federated::info(HA_STATUS_AUTO);
2057 thd->first_successful_insert_id_in_cur_stmt=
2058 stats.auto_increment_value;
2059 DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value));
2060
2061 DBUG_VOID_RETURN;
2062}
2063
2064int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
2065{
2066 char query_buffer[STRING_BUFFER_USUAL_SIZE];
2067 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2068 DBUG_ENTER("ha_federated::optimize");
2069
2070 query.length(0);
2071
2072 query.set_charset(system_charset_info);
2073 query.append(STRING_WITH_LEN("OPTIMIZE TABLE "));
2074 append_ident(&query, share->table_name, share->table_name_length,
2075 ident_quote_char);
2076
2077 if (real_query(query.ptr(), query.length()))
2078 {
2079 DBUG_RETURN(stash_remote_error());
2080 }
2081
2082 DBUG_RETURN(0);
2083}
2084
2085
2086int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
2087{
2088 char query_buffer[STRING_BUFFER_USUAL_SIZE];
2089 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2090 DBUG_ENTER("ha_federated::repair");
2091
2092 query.length(0);
2093
2094 query.set_charset(system_charset_info);
2095 query.append(STRING_WITH_LEN("REPAIR TABLE "));
2096 append_ident(&query, share->table_name, share->table_name_length,
2097 ident_quote_char);
2098 if (check_opt->flags & T_QUICK)
2099 query.append(STRING_WITH_LEN(" QUICK"));
2100 if (check_opt->flags & T_EXTEND)
2101 query.append(STRING_WITH_LEN(" EXTENDED"));
2102 if (check_opt->sql_flags & TT_USEFRM)
2103 query.append(STRING_WITH_LEN(" USE_FRM"));
2104
2105 if (real_query(query.ptr(), query.length()))
2106 {
2107 DBUG_RETURN(stash_remote_error());
2108 }
2109
2110 DBUG_RETURN(0);
2111}
2112
2113
2114/*
2115 Yes, update_row() does what you expect, it updates a row. old_data will have
2116 the previous row record in it, while new_data will have the newest data in
2117 it.
2118
2119 Keep in mind that the server can do updates based on ordering if an ORDER BY
2120 clause was used. Consecutive ordering is not guaranteed.
  Currently new_data will not have an updated auto_increment record, or
  an updated timestamp field. You can do these for federated by doing these:
2123 if (table->timestamp_on_update_now)
2124 update_timestamp(new_row+table->timestamp_on_update_now-1);
2125 if (table->next_number_field && record == table->record[0])
2126 update_auto_increment();
2127
2128 Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
2129*/
2130
2131int ha_federated::update_row(const uchar *old_data, const uchar *new_data)
2132{
2133 /*
2134 This used to control how the query was built. If there was a
2135 primary key, the query would be built such that there was a where
2136 clause with only that column as the condition. This is flawed,
2137 because if we have a multi-part primary key, it would only use the
2138 first part! We don't need to do this anyway, because
2139 read_range_first will retrieve the correct record, which is what
2140 is used to build the WHERE clause. We can however use this to
2141 append a LIMIT to the end if there is NOT a primary key. Why do
2142 this? Because we only are updating one record, and LIMIT enforces
2143 this.
2144 */
2145 bool has_a_primary_key= MY_TEST(table->s->primary_key != MAX_KEY);
2146
2147 /*
2148 buffers for following strings
2149 */
2150 char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
2151 char update_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2152 char where_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2153
2154 /* Work area for field values */
2155 String field_value(field_value_buffer, sizeof(field_value_buffer),
2156 &my_charset_bin);
2157 /* stores the update query */
2158 String update_string(update_buffer,
2159 sizeof(update_buffer),
2160 &my_charset_bin);
2161 /* stores the WHERE clause */
2162 String where_string(where_buffer,
2163 sizeof(where_buffer),
2164 &my_charset_bin);
2165 uchar *record= table->record[0];
2166 DBUG_ENTER("ha_federated::update_row");
2167 /*
2168 set string lengths to 0 to avoid misc chars in string
2169 */
2170 field_value.length(0);
2171 update_string.length(0);
2172 where_string.length(0);
2173
2174 if (ignore_duplicates)
2175 update_string.append(STRING_WITH_LEN("UPDATE IGNORE "));
2176 else
2177 update_string.append(STRING_WITH_LEN("UPDATE "));
2178 append_ident(&update_string, share->table_name,
2179 share->table_name_length, ident_quote_char);
2180 update_string.append(STRING_WITH_LEN(" SET "));
2181
2182 /*
2183 In this loop, we want to match column names to values being inserted
2184 (while building INSERT statement).
2185
2186 Iterate through table->field (new data) and share->old_field (old_data)
2187 using the same index to create an SQL UPDATE statement. New data is
2188 used to create SET field=value and old data is used to create WHERE
2189 field=oldvalue
2190 */
2191
2192 for (Field **field= table->field; *field; field++)
2193 {
2194 if (bitmap_is_set(table->write_set, (*field)->field_index))
2195 {
2196 append_ident(&update_string, (*field)->field_name.str,
2197 (*field)->field_name.length,
2198 ident_quote_char);
2199 update_string.append(STRING_WITH_LEN(" = "));
2200
2201 if ((*field)->is_null())
2202 update_string.append(STRING_WITH_LEN(" NULL "));
2203 else
2204 {
2205 /* otherwise = */
2206 my_bitmap_map *old_map= tmp_use_all_columns(table, table->read_set);
2207 bool needs_quote= (*field)->str_needs_quotes();
2208 (*field)->val_str(&field_value);
2209 if (needs_quote)
2210 update_string.append(value_quote_char);
2211 field_value.print(&update_string);
2212 if (needs_quote)
2213 update_string.append(value_quote_char);
2214 field_value.length(0);
2215 tmp_restore_column_map(table->read_set, old_map);
2216 }
2217 update_string.append(STRING_WITH_LEN(", "));
2218 }
2219
2220 if (bitmap_is_set(table->read_set, (*field)->field_index))
2221 {
2222 append_ident(&where_string, (*field)->field_name.str,
2223 (*field)->field_name.length,
2224 ident_quote_char);
2225 if (field_in_record_is_null(table, *field, (char*) old_data))
2226 where_string.append(STRING_WITH_LEN(" IS NULL "));
2227 else
2228 {
2229 bool needs_quote= (*field)->str_needs_quotes();
2230 where_string.append(STRING_WITH_LEN(" = "));
2231 (*field)->val_str(&field_value,
2232 (old_data + (*field)->offset(record)));
2233 if (needs_quote)
2234 where_string.append(value_quote_char);
2235 field_value.print(&where_string);
2236 if (needs_quote)
2237 where_string.append(value_quote_char);
2238 field_value.length(0);
2239 }
2240 where_string.append(STRING_WITH_LEN(" AND "));
2241 }
2242 }
2243
2244 /* Remove last ', '. This works as there must be at least on updated field */
2245 update_string.length(update_string.length() - sizeof_trailing_comma);
2246
2247 if (where_string.length())
2248 {
2249 /* chop off trailing AND */
2250 where_string.length(where_string.length() - sizeof_trailing_and);
2251 update_string.append(STRING_WITH_LEN(" WHERE "));
2252 update_string.append(where_string);
2253 }
2254
2255 /*
2256 If this table has not a primary key, then we could possibly
2257 update multiple rows. We want to make sure to only update one!
2258 */
2259 if (!has_a_primary_key)
2260 update_string.append(STRING_WITH_LEN(" LIMIT 1"));
2261
2262 if (real_query(update_string.ptr(), update_string.length()))
2263 {
2264 DBUG_RETURN(stash_remote_error());
2265 }
2266 DBUG_RETURN(0);
2267}
2268
2269/*
  This will delete a row. 'buf' will contain a copy of the row to be deleted.
  The server will call this right after the current row has been read (from
  either a previous rnd_next() or index call).
  If you keep a pointer to the last row or can access a primary key it will
  make doing the deletion quite a bit easier.
  Keep in mind that the server does not guarantee consecutive deletions.
  ORDER BY clauses can be used.
2277
2278 Called in sql_acl.cc and sql_udf.cc to manage internal table information.
2279 Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
2280 it is used for removing duplicates while in insert it is used for REPLACE
2281 calls.
2282*/
2283
2284int ha_federated::delete_row(const uchar *buf)
2285{
2286 char delete_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2287 char data_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2288 String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
2289 String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
2290 uint found= 0;
2291 DBUG_ENTER("ha_federated::delete_row");
2292
2293 delete_string.length(0);
2294 delete_string.append(STRING_WITH_LEN("DELETE FROM "));
2295 append_ident(&delete_string, share->table_name,
2296 share->table_name_length, ident_quote_char);
2297 delete_string.append(STRING_WITH_LEN(" WHERE "));
2298
2299 for (Field **field= table->field; *field; field++)
2300 {
2301 Field *cur_field= *field;
2302 found++;
2303 if (bitmap_is_set(table->read_set, cur_field->field_index))
2304 {
2305 append_ident(&delete_string, (*field)->field_name.str,
2306 (*field)->field_name.length, ident_quote_char);
2307 data_string.length(0);
2308 if (cur_field->is_null())
2309 {
2310 delete_string.append(STRING_WITH_LEN(" IS NULL "));
2311 }
2312 else
2313 {
2314 bool needs_quote= cur_field->str_needs_quotes();
2315 delete_string.append(STRING_WITH_LEN(" = "));
2316 cur_field->val_str(&data_string);
2317 if (needs_quote)
2318 delete_string.append(value_quote_char);
2319 data_string.print(&delete_string);
2320 if (needs_quote)
2321 delete_string.append(value_quote_char);
2322 }
2323 delete_string.append(STRING_WITH_LEN(" AND "));
2324 }
2325 }
2326
2327 // Remove trailing AND
2328 delete_string.length(delete_string.length() - sizeof_trailing_and);
2329 if (!found)
2330 delete_string.length(delete_string.length() - sizeof_trailing_where);
2331
2332 delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
2333 DBUG_PRINT("info",
2334 ("Delete sql: %s", delete_string.c_ptr_quick()));
2335 if (real_query(delete_string.ptr(), delete_string.length()))
2336 {
2337 DBUG_RETURN(stash_remote_error());
2338 }
2339 stats.deleted+= (ha_rows) mysql->affected_rows;
2340 stats.records-= (ha_rows) mysql->affected_rows;
2341 DBUG_PRINT("info",
2342 ("rows deleted %ld rows deleted for all time %ld",
2343 (long) mysql->affected_rows, (long) stats.deleted));
2344
2345 DBUG_RETURN(0);
2346}
2347
2348/*
  Positions an index cursor to the index specified in the handle. Fetches the
  row if available. If the key value is null, begin at the first key of the
  index. This method, which is called in the case of an SQL statement having
  a WHERE clause on a non-primary key index, simply calls
  index_read_idx_with_result_set() on the active index.
2353*/
2354
2355int ha_federated::index_read(uchar *buf, const uchar *key,
2356 uint key_len, ha_rkey_function find_flag)
2357{
2358 int rc;
2359 DBUG_ENTER("ha_federated::index_read");
2360
2361 free_result();
2362 rc= index_read_idx_with_result_set(buf, active_index, key,
2363 key_len, find_flag,
2364 &stored_result);
2365 DBUG_RETURN(rc);
2366}
2367
2368
2369/*
2370 Positions an index cursor to the index specified in key. Fetches the
2371 row if any. This is only used to read whole keys.
2372
2373 This method is called via index_read in the case of a WHERE clause using
2374 a primary key index OR is called DIRECTLY when the WHERE clause
2375 uses a PRIMARY KEY index.
2376
2377 NOTES
  This uses an internal result set that is deleted before function
  returns. We need to be able to be callable from ha_rnd_pos()
2380*/
2381
2382int ha_federated::index_read_idx(uchar *buf, uint index, const uchar *key,
2383 uint key_len, enum ha_rkey_function find_flag)
2384{
2385 int retval;
2386 MYSQL_RES *mysql_result;
2387 DBUG_ENTER("ha_federated::index_read_idx");
2388
2389 if ((retval= index_read_idx_with_result_set(buf, index, key,
2390 key_len, find_flag,
2391 &mysql_result)))
2392 DBUG_RETURN(retval);
2393 mysql_free_result(mysql_result);
2394 results.elements--;
2395 DBUG_RETURN(0);
2396}
2397
2398
2399/*
2400 Create result set for rows matching query and return first row
2401
2402 RESULT
2403 0 ok In this case *result will contain the result set
2404 # error In this case *result will contain 0
2405*/
2406
2407int ha_federated::index_read_idx_with_result_set(uchar *buf, uint index,
2408 const uchar *key,
2409 uint key_len,
2410 ha_rkey_function find_flag,
2411 MYSQL_RES **result)
2412{
2413 int retval;
2414 char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2415 char index_value[STRING_BUFFER_USUAL_SIZE];
2416 char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2417 String index_string(index_value,
2418 sizeof(index_value),
2419 &my_charset_bin);
2420 String sql_query(sql_query_buffer,
2421 sizeof(sql_query_buffer),
2422 &my_charset_bin);
2423 key_range range;
2424 DBUG_ENTER("ha_federated::index_read_idx_with_result_set");
2425
2426 *result= 0; // In case of errors
2427 index_string.length(0);
2428 sql_query.length(0);
2429
2430 sql_query.append(share->select_query);
2431
2432 range.key= key;
2433 range.length= key_len;
2434 range.flag= find_flag;
2435 create_where_from_key(&index_string,
2436 &table->key_info[index],
2437 &range,
2438 NULL, 0, 0);
2439 sql_query.append(index_string);
2440
2441 if (real_query(sql_query.ptr(), sql_query.length()))
2442 {
2443 sprintf(error_buffer, "error: %d '%s'",
2444 mysql_errno(mysql), mysql_error(mysql));
2445 retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2446 goto error;
2447 }
2448 if (!(*result= store_result(mysql)))
2449 {
2450 retval= HA_ERR_END_OF_FILE;
2451 goto error;
2452 }
2453 if ((retval= read_next(buf, *result)))
2454 {
2455 mysql_free_result(*result);
2456 results.elements--;
2457 *result= 0;
2458 DBUG_RETURN(retval);
2459 }
2460 DBUG_RETURN(0);
2461
2462error:
2463 my_error(retval, MYF(0), error_buffer);
2464 DBUG_RETURN(retval);
2465}
2466
2467
2468/*
  This method is used exclusively by filesort() to check if we
  can create sorting buffers of necessary size.
  If the handler returns more records than it declares
  here, the server can just crash on filesort().
  We cannot guarantee that's not going to happen with
  the FEDERATED engine, as we have records==0 always if the
  client is a VIEW, and for the table the number of
  records can unpredictably change during execution.
  So we return maximum possible value here.
2478*/
2479
2480ha_rows ha_federated::estimate_rows_upper_bound()
2481{
2482 return HA_POS_ERROR;
2483}
2484
2485
2486/* Initialized at each key walk (called multiple times unlike rnd_init()) */
2487
2488int ha_federated::index_init(uint keynr, bool sorted)
2489{
2490 DBUG_ENTER("ha_federated::index_init");
2491 DBUG_PRINT("info", ("table: '%s' key: %u", table->s->table_name.str, keynr));
2492 DBUG_RETURN(0);
2493}
2494
2495
2496/*
2497 Read first range
2498*/
2499
2500int ha_federated::read_range_first(const key_range *start_key,
2501 const key_range *end_key,
2502 bool eq_range_arg, bool sorted)
2503{
2504 char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2505 int retval;
2506 String sql_query(sql_query_buffer,
2507 sizeof(sql_query_buffer),
2508 &my_charset_bin);
2509 DBUG_ENTER("ha_federated::read_range_first");
2510
2511 DBUG_ASSERT(!(start_key == NULL && end_key == NULL));
2512
2513 sql_query.length(0);
2514 sql_query.append(share->select_query);
2515 create_where_from_key(&sql_query,
2516 &table->key_info[active_index],
2517 start_key, end_key, 0, eq_range_arg);
2518 if (real_query(sql_query.ptr(), sql_query.length()))
2519 {
2520 retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2521 goto error;
2522 }
2523 sql_query.length(0);
2524
2525 if (!(stored_result= store_result(mysql)))
2526 {
2527 retval= HA_ERR_END_OF_FILE;
2528 goto error;
2529 }
2530
2531 retval= read_next(table->record[0], stored_result);
2532 DBUG_RETURN(retval);
2533
2534error:
2535 DBUG_RETURN(retval);
2536}
2537
2538
2539int ha_federated::read_range_next()
2540{
2541 int retval;
2542 DBUG_ENTER("ha_federated::read_range_next");
2543 retval= rnd_next_int(table->record[0]);
2544 DBUG_RETURN(retval);
2545}
2546
2547
2548/* Used to read forward through the index. */
2549int ha_federated::index_next(uchar *buf)
2550{
2551 int retval;
2552 DBUG_ENTER("ha_federated::index_next");
2553 retval= read_next(buf, stored_result);
2554 DBUG_RETURN(retval);
2555}
2556
2557
2558/*
2559 rnd_init() is called when the system wants the storage engine to do a table
2560 scan.
2561
2562 This is the method that gets data for the SELECT calls.
2563
2564 See the federated in the introduction at the top of this file to see when
2565 rnd_init() is called.
2566
2567 Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2568 sql_table.cc, and sql_update.cc.
2569*/
2570
2571int ha_federated::rnd_init(bool scan)
2572{
2573 DBUG_ENTER("ha_federated::rnd_init");
2574 /*
2575 The use of the 'scan' flag is incredibly important for this handler
2576 to work properly, especially with updates containing WHERE clauses
2577 using indexed columns.
2578
2579 When the initial query contains a WHERE clause of the query using an
2580 indexed column, it's index_read_idx that selects the exact record from
2581 the foreign database.
2582
2583 When there is NO index in the query, either due to not having a WHERE
2584 clause, or the WHERE clause is using columns that are not indexed, a
2585 'full table scan' done by rnd_init, which in this situation simply means
2586 a 'select * from ...' on the foreign table.
2587
2588 In other words, this 'scan' flag gives us the means to ensure that if
2589 there is an index involved in the query, we want index_read_idx to
2590 retrieve the exact record (scan flag is 0), and do not want rnd_init
2591 to do a 'full table scan' and wipe out that result set.
2592
2593 Prior to using this flag, the problem was most apparent with updates.
2594
2595 An initial query like 'UPDATE tablename SET anything = whatever WHERE
2596 indexedcol = someval', index_read_idx would get called, using a query
2597 constructed with a WHERE clause built from the values of index ('indexcol'
2598 in this case, having a value of 'someval'). mysql_store_result would
2599 then get called (this would be the result set we want to use).
2600
2601 After this rnd_init (from sql_update.cc) would be called, it would then
2602 unecessarily call "select * from table" on the foreign table, then call
2603 mysql_store_result, which would wipe out the correct previous result set
2604 from the previous call of index_read_idx's that had the result set
2605 containing the correct record, hence update the wrong row!
2606
2607 */
2608
2609 if (scan)
2610 {
2611 if (real_query(share->select_query, strlen(share->select_query)) ||
2612 !(stored_result= store_result(mysql)))
2613 DBUG_RETURN(stash_remote_error());
2614 }
2615 DBUG_RETURN(0);
2616}
2617
2618
2619int ha_federated::rnd_end()
2620{
2621 DBUG_ENTER("ha_federated::rnd_end");
2622 DBUG_RETURN(index_end());
2623}
2624
2625
2626int ha_federated::index_end(void)
2627{
2628 DBUG_ENTER("ha_federated::index_end");
2629 free_result();
2630 DBUG_RETURN(0);
2631}
2632
2633
2634/*
2635 This is called for each row of the table scan. When you run out of records
2636 you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
2637 The Field structure for the table is the key to getting data into buf
2638 in a manner that will allow the server to understand it.
2639
2640 Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2641 sql_table.cc, and sql_update.cc.
2642*/
2643
2644int ha_federated::rnd_next(uchar *buf)
2645{
2646 int rc;
2647 DBUG_ENTER("ha_federated::rnd_next");
2648 rc= rnd_next_int(buf);
2649 DBUG_RETURN(rc);
2650}
2651
2652int ha_federated::rnd_next_int(uchar *buf)
2653{
2654 DBUG_ENTER("ha_federated::rnd_next_int");
2655
2656 if (stored_result == 0)
2657 {
2658 /*
2659 Return value of rnd_init is not always checked (see records.cc),
2660 so we can get here _even_ if there is _no_ pre-fetched result-set!
2661 TODO: fix it. We can delete this in 5.1 when rnd_init() is checked.
2662 */
2663 DBUG_RETURN(1);
2664 }
2665 DBUG_RETURN(read_next(buf, stored_result));
2666}
2667
2668
/*
  ha_federated::read_next

  reads from a result set and converts to mysql internal
  format

  SYNOPSIS
    read_next()
      buf       byte pointer to record
      result    mysql result set

  DESCRIPTION
    This method is a wrapper method that reads one record from a result
    set and converts it to the internal table format

  RETURN VALUE
    0                     no error
    HA_ERR_END_OF_FILE    no more rows in the result set
    other                 error from convert_row_to_internal_format()
*/
2688
2689int ha_federated::read_next(uchar *buf, MYSQL_RES *result)
2690{
2691 int retval;
2692 MYSQL_ROW row;
2693 DBUG_ENTER("ha_federated::read_next");
2694
2695 /* Save current data cursor position. */
2696 current_position= result->data_cursor;
2697
2698 /* Fetch a row, insert it back in a row format. */
2699 if (!(row= mysql_fetch_row(result)))
2700 DBUG_RETURN(HA_ERR_END_OF_FILE);
2701
2702 if (!(retval= convert_row_to_internal_format(buf, row, result)))
2703 table->status= 0;
2704
2705 DBUG_RETURN(retval);
2706}
2707
2708
2709/**
2710 @brief Store a reference to current row.
2711
2712 @details During a query execution we may have different result sets (RS),
2713 e.g. for different ranges. All the RS's used are stored in
2714 memory and placed in @c results dynamic array. At the end of
2715 execution all stored RS's are freed at once in the
2716 @c ha_federated::reset().
2717 So, in case of federated, a reference to current row is a
2718 stored result address and current data cursor position.
2719 As we keep all RS in memory during a query execution,
2720 we can get any record using the reference any time until
2721 @c ha_federated::reset() is called.
2722 TODO: we don't have to store all RS's rows but only those
2723 we call @c ha_federated::position() for, so we can free memory
2724 where we store other rows in the @c ha_federated::index_end().
2725
2726 @param[in] record record data (unused)
2727*/
2728
2729void ha_federated::position(const uchar *record __attribute__ ((unused)))
2730{
2731 DBUG_ENTER("ha_federated::position");
2732
2733 if (!stored_result)
2734 DBUG_VOID_RETURN;
2735
2736 position_called= TRUE;
2737 /* Store result set address. */
2738 memcpy(ref, &stored_result, sizeof(MYSQL_RES *));
2739 /* Store data cursor position. */
2740 memcpy(ref + sizeof(MYSQL_RES *), &current_position,
2741 sizeof(MYSQL_ROW_OFFSET));
2742 DBUG_VOID_RETURN;
2743}
2744
2745
2746/*
2747 This is like rnd_next, but you are given a position to use to determine the
2748 row. The position will be of the type that you stored in ref.
2749
2750 This method is required for an ORDER BY
2751
2752 Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
2753*/
2754
2755int ha_federated::rnd_pos(uchar *buf, uchar *pos)
2756{
2757 MYSQL_RES *result;
2758 int ret_val;
2759 DBUG_ENTER("ha_federated::rnd_pos");
2760
2761 /* Get stored result set. */
2762 memcpy(&result, pos, sizeof(MYSQL_RES *));
2763 DBUG_ASSERT(result);
2764 /* Set data cursor position. */
2765 memcpy(&result->data_cursor, pos + sizeof(MYSQL_RES *),
2766 sizeof(MYSQL_ROW_OFFSET));
2767 /* Read a row. */
2768 ret_val= read_next(buf, result);
2769 DBUG_RETURN(ret_val);
2770}
2771
2772
2773/*
2774 ::info() is used to return information to the optimizer.
2775 Currently this table handler doesn't implement most of the fields
2776 really needed. SHOW also makes use of this data
2777 Another note, you will probably want to have the following in your
2778 code:
2779 if (records < 2)
2780 records = 2;
2781 The reason is that the server will optimize for cases of only a single
2782 record. If in a table scan you don't know the number of records
2783 it will probably be better to set records to two so you can return
2784 as many records as you need.
2785 Along with records a few more variables you may wish to set are:
2786 records
2787 deleted
2788 data_file_length
2789 index_file_length
2790 delete_length
2791 check_time
2792 Take a look at the public variables in handler.h for more information.
2793
2794 Called in:
2795 filesort.cc
2796 ha_heap.cc
2797 item_sum.cc
2798 opt_sum.cc
2799 sql_delete.cc
2800 sql_delete.cc
2801 sql_derived.cc
2802 sql_select.cc
2803 sql_select.cc
2804 sql_select.cc
2805 sql_select.cc
2806 sql_select.cc
2807 sql_show.cc
2808 sql_show.cc
2809 sql_show.cc
2810 sql_show.cc
2811 sql_table.cc
2812 sql_union.cc
2813 sql_update.cc
2814
2815*/
2816
2817int ha_federated::info(uint flag)
2818{
2819 char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
2820 int error;
2821 uint error_code;
2822 MYSQL_RES *result= 0;
2823 MYSQL_ROW row;
2824 String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin);
2825 DBUG_ENTER("ha_federated::info");
2826
2827 error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2828 /* we want not to show table status if not needed to do so */
2829 if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
2830 {
2831 status_query_string.length(0);
2832 status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE "));
2833 append_ident(&status_query_string, share->table_name,
2834 share->table_name_length, value_quote_char);
2835
2836 if (real_query(status_query_string.ptr(), status_query_string.length()))
2837 goto error;
2838
2839 status_query_string.length(0);
2840
2841 result= mysql_store_result(mysql);
2842
2843 /*
2844 We're going to use fields num. 4, 12 and 13 of the resultset,
2845 so make sure we have these fields.
2846 */
2847 if (!result || (mysql_num_fields(result) < 14))
2848 goto error;
2849
2850 if (!mysql_num_rows(result))
2851 goto error;
2852
2853 if (!(row= mysql_fetch_row(result)))
2854 goto error;
2855
2856 /*
2857 deleted is set in ha_federated::info
2858 */
2859 /*
2860 need to figure out what this means as far as federated is concerned,
2861 since we don't have a "file"
2862
2863 data_file_length = ?
2864 index_file_length = ?
2865 delete_length = ?
2866 */
2867 if (row[4] != NULL)
2868 stats.records= (ha_rows) my_strtoll10(row[4], (char**) 0,
2869 &error);
2870 if (row[5] != NULL)
2871 stats.mean_rec_length= (ulong) my_strtoll10(row[5], (char**) 0, &error);
2872
2873 stats.data_file_length= stats.records * stats.mean_rec_length;
2874
2875 if (row[12] != NULL)
2876 stats.update_time= (ulong) my_strtoll10(row[12], (char**) 0,
2877 &error);
2878 if (row[13] != NULL)
2879 stats.check_time= (ulong) my_strtoll10(row[13], (char**) 0,
2880 &error);
2881
2882 /*
2883 size of IO operations (This is based on a good guess, no high science
2884 involved)
2885 */
2886 if (flag & HA_STATUS_CONST)
2887 stats.block_size= 4096;
2888
2889 }
2890
2891 if ((flag & HA_STATUS_AUTO) && mysql)
2892 stats.auto_increment_value= mysql->insert_id;
2893
2894 mysql_free_result(result);
2895
2896 DBUG_RETURN(0);
2897
2898error:
2899 mysql_free_result(result);
2900 if (mysql)
2901 {
2902 my_printf_error(error_code, ": %d : %s", MYF(0),
2903 mysql_errno(mysql), mysql_error(mysql));
2904 }
2905 else
2906 if (remote_error_number != -1 /* error already reported */)
2907 {
2908 error_code= remote_error_number;
2909 my_error(error_code, MYF(0), ER(error_code));
2910 }
2911 DBUG_RETURN(error_code);
2912}
2913
2914
2915/**
2916 @brief Handles extra signals from MySQL server
2917
2918 @param[in] operation Hint for storage engine
2919
2920 @return Operation Status
2921 @retval 0 OK
2922 */
2923int ha_federated::extra(ha_extra_function operation)
2924{
2925 DBUG_ENTER("ha_federated::extra");
2926 switch (operation) {
2927 case HA_EXTRA_IGNORE_DUP_KEY:
2928 ignore_duplicates= TRUE;
2929 break;
2930 case HA_EXTRA_NO_IGNORE_DUP_KEY:
2931 insert_dup_update= FALSE;
2932 ignore_duplicates= FALSE;
2933 break;
2934 case HA_EXTRA_WRITE_CAN_REPLACE:
2935 replace_duplicates= TRUE;
2936 break;
2937 case HA_EXTRA_WRITE_CANNOT_REPLACE:
2938 /*
2939 We use this flag to ensure that we do not create an "INSERT IGNORE"
2940 statement when inserting new rows into the remote table.
2941 */
2942 replace_duplicates= FALSE;
2943 break;
2944 case HA_EXTRA_INSERT_WITH_UPDATE:
2945 insert_dup_update= TRUE;
2946 break;
2947 case HA_EXTRA_PREPARE_FOR_DROP:
2948 table_will_be_deleted = TRUE;
2949 break;
2950 default:
2951 /* do nothing */
2952 DBUG_PRINT("info",("unhandled operation: %d", (uint) operation));
2953 }
2954 DBUG_RETURN(0);
2955}
2956
2957
2958/**
2959 @brief Reset state of file to after 'open'.
2960
2961 @detail This function is called after every statement for all tables
2962 used by that statement.
2963
2964 @return Operation status
2965 @retval 0 OK
2966*/
2967
2968int ha_federated::reset(void)
2969{
2970 insert_dup_update= FALSE;
2971 ignore_duplicates= FALSE;
2972 replace_duplicates= FALSE;
2973
2974 /* Free stored result sets. */
2975 for (uint i= 0; i < results.elements; i++)
2976 {
2977 MYSQL_RES *result;
2978 get_dynamic(&results, (uchar *) &result, i);
2979 mysql_free_result(result);
2980 }
2981 reset_dynamic(&results);
2982
2983 if (mysql)
2984 mysql->net.thd= NULL;
2985
2986 return 0;
2987}
2988
2989
2990/*
2991 Used to delete all rows in a table. Both for cases of truncate and
2992 for cases where the optimizer realizes that all rows will be
2993 removed as a result of a SQL statement.
2994
2995 Called from item_sum.cc by Item_func_group_concat::clear(),
2996 Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
2997 Called from sql_delete.cc by mysql_delete().
2998 Called from sql_select.cc by JOIN::reinit().
2999 Called from sql_union.cc by st_select_lex_unit::exec().
3000*/
3001
3002int ha_federated::delete_all_rows()
3003{
3004 char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
3005 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
3006 DBUG_ENTER("ha_federated::delete_all_rows");
3007
3008 query.length(0);
3009
3010 query.set_charset(system_charset_info);
3011 query.append(STRING_WITH_LEN("TRUNCATE "));
3012 append_ident(&query, share->table_name, share->table_name_length,
3013 ident_quote_char);
3014
3015 /*
3016 TRUNCATE won't return anything in mysql_affected_rows
3017 */
3018 if (real_query(query.ptr(), query.length()))
3019 {
3020 DBUG_RETURN(stash_remote_error());
3021 }
3022 stats.deleted+= stats.records;
3023 stats.records= 0;
3024 DBUG_RETURN(0);
3025}
3026
3027
3028
3029
3030/*
3031 Used to manually truncate the table via a delete of all rows in a table.
3032*/
3033
3034int ha_federated::truncate()
3035{
3036 return delete_all_rows();
3037}
3038
3039
3040/* The idea with handler::store_lock() is the following:
3041
3042 The statement decided which locks we should need for the table
3043 for updates/deletes/inserts we get WRITE locks, for SELECT... we get
3044 read locks.
3045
3046 Before adding the lock into the table lock handler (see thr_lock.c)
3047 mysqld calls store lock with the requested locks. Store lock can now
3048 modify a write lock to a read lock (or some other lock), ignore the
3049 lock (if we don't want to use MySQL table locks at all) or add locks
3050 for many tables (like we do when we are using a MERGE handler).
3051
3052 Berkeley DB for federated changes all WRITE locks to TL_WRITE_ALLOW_WRITE
3053 (which signals that we are doing WRITES, but we are still allowing other
3054 reader's and writer's.
3055
3056 When releasing locks, store_lock() are also called. In this case one
3057 usually doesn't have to do anything.
3058
3059 In some exceptional cases MySQL may send a request for a TL_IGNORE;
3060 This means that we are requesting the same lock as last time and this
3061 should also be ignored. (This may happen when someone does a flush
3062 table when we have opened a part of the tables, in which case mysqld
3063 closes and reopens the tables and tries to get the same locks at last
3064 time). In the future we will probably try to remove this.
3065
3066 Called from lock.cc by get_lock_data().
3067*/
3068
3069THR_LOCK_DATA **ha_federated::store_lock(THD *thd,
3070 THR_LOCK_DATA **to,
3071 enum thr_lock_type lock_type)
3072{
3073 DBUG_ENTER("ha_federated::store_lock");
3074 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
3075 {
3076 /*
3077 Here is where we get into the guts of a row level lock.
3078 If TL_UNLOCK is set
3079 If we are not doing a LOCK TABLE or DISCARD/IMPORT
3080 TABLESPACE, then allow multiple writers
3081 */
3082
3083 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
3084 lock_type <= TL_WRITE) && !thd->in_lock_tables)
3085 lock_type= TL_WRITE_ALLOW_WRITE;
3086
3087 /*
3088 In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
3089 MySQL would use the lock TL_READ_NO_INSERT on t2, and that
3090 would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
3091 to t2. Convert the lock to a normal read lock to allow
3092 concurrent inserts to t2.
3093 */
3094
3095 if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
3096 lock_type= TL_READ;
3097
3098 lock.type= lock_type;
3099 }
3100
3101 *to++= &lock;
3102
3103 DBUG_RETURN(to);
3104}
3105
3106/*
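  create() performs no local setup of its own: it only runs parse_url() on
  table_arg, using a temporary share, and returns that result.
  FUTURE: We should potentially connect to the foreign database and
  check it as well (the original note was left unfinished here; the exact
  intent is unconfirmed).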
3109*/
3110
3111int ha_federated::create(const char *name, TABLE *table_arg,
3112 HA_CREATE_INFO *create_info)
3113{
3114 int retval;
3115 THD *thd= current_thd;
3116 FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url
3117 DBUG_ENTER("ha_federated::create");
3118
3119 retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1);
3120
3121 DBUG_RETURN(retval);
3122
3123}
3124
3125
3126int ha_federated::real_connect()
3127{
3128 char buffer[FEDERATED_QUERY_BUFFER_SIZE];
3129 String sql_query(buffer, sizeof(buffer), &my_charset_bin);
3130 DBUG_ENTER("ha_federated::real_connect");
3131
3132 DBUG_ASSERT(mysql == NULL);
3133
3134 if (!(mysql= mysql_init(NULL)))
3135 {
3136 remote_error_number= HA_ERR_OUT_OF_MEM;
3137 DBUG_RETURN(-1);
3138 }
3139
3140 /*
3141 BUG# 17044 Federated Storage Engine is not UTF8 clean
3142 Add set names to whatever charset the table is at open
3143 of table
3144 */
3145 /* this sets the csname like 'set names utf8' */
3146 mysql_options(mysql,MYSQL_SET_CHARSET_NAME,
3147 this->table->s->table_charset->csname);
3148
3149 sql_query.length(0);
3150 if (!mysql_real_connect(mysql,
3151 share->hostname,
3152 share->username,
3153 share->password,
3154 share->database,
3155 share->port,
3156 share->socket, 0))
3157 {
3158 stash_remote_error();
3159 mysql_close(mysql);
3160 mysql= NULL;
3161 my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), remote_error_buf);
3162 remote_error_number= -1;
3163 DBUG_RETURN(-1);
3164 }
3165
3166 /*
3167 We have established a connection, lets try a simple dummy query just
3168 to check that the table and expected columns are present.
3169 */
3170 sql_query.append(share->select_query);
3171 sql_query.append(STRING_WITH_LEN(" WHERE 1=0"));
3172 if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
3173 {
3174 sql_query.length(0);
3175 sql_query.append("error: ");
3176 sql_query.qs_append(mysql_errno(mysql));
3177 sql_query.append(" '");
3178 sql_query.append(mysql_error(mysql));
3179 sql_query.append("'");
3180 mysql_close(mysql);
3181 mysql= NULL;
3182 my_error(ER_FOREIGN_DATA_SOURCE_DOESNT_EXIST, MYF(0), sql_query.ptr());
3183 remote_error_number= -1;
3184 DBUG_RETURN(-1);
3185 }
3186
3187 /* Just throw away the result, no rows anyways but need to keep in sync */
3188 mysql_free_result(mysql_store_result(mysql));
3189
3190 /*
3191 Since we do not support transactions at this version, we can let the client
3192 API silently reconnect. For future versions, we will need more logic to
3193 deal with transactions
3194 */
3195
3196 mysql->reconnect= 1;
3197 DBUG_RETURN(0);
3198}
3199
3200
3201int ha_federated::real_query(const char *query, size_t length)
3202{
3203 int rc= 0;
3204 DBUG_ENTER("ha_federated::real_query");
3205
3206 if (!query || !length)
3207 goto end;
3208
3209 if (!mysql && (rc= real_connect()))
3210 goto end;
3211
3212 mysql->net.thd= table->in_use;
3213
3214 rc= mysql_real_query(mysql, query, (uint) length);
3215
3216end:
3217 DBUG_RETURN(rc);
3218}
3219
3220
3221int ha_federated::stash_remote_error()
3222{
3223 DBUG_ENTER("ha_federated::stash_remote_error()");
3224 if (!mysql)
3225 DBUG_RETURN(remote_error_number);
3226 remote_error_number= mysql_errno(mysql);
3227 strmake_buf(remote_error_buf, mysql_error(mysql));
3228 if (remote_error_number == ER_DUP_ENTRY ||
3229 remote_error_number == ER_DUP_KEY)
3230 DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
3231 DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
3232}
3233
3234
3235bool ha_federated::get_error_message(int error, String* buf)
3236{
3237 DBUG_ENTER("ha_federated::get_error_message");
3238 DBUG_PRINT("enter", ("error: %d", error));
3239 if (error == HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM)
3240 {
3241 buf->append(STRING_WITH_LEN("Error on remote system: "));
3242 buf->qs_append(remote_error_number);
3243 buf->append(STRING_WITH_LEN(": "));
3244 buf->append(remote_error_buf);
3245
3246 remote_error_number= 0;
3247 remote_error_buf[0]= '\0';
3248 }
3249 DBUG_PRINT("exit", ("message: %s", buf->ptr()));
3250 DBUG_RETURN(FALSE);
3251}
3252
3253
3254/**
3255 @brief Store a result set.
3256
3257 @details Call @c mysql_store_result() to save a result set then
3258 append it to the stored results array.
3259
  @param[in] mysql_arg MySQL connection structure.
3261
3262 @return Stored result set (MYSQL_RES object).
3263*/
3264
3265MYSQL_RES *ha_federated::store_result(MYSQL *mysql_arg)
3266{
3267 MYSQL_RES *result= mysql_store_result(mysql_arg);
3268 DBUG_ENTER("ha_federated::store_result");
3269 if (result)
3270 {
3271 (void) insert_dynamic(&results, (uchar*) &result);
3272 }
3273 position_called= FALSE;
3274 DBUG_RETURN(result);
3275}
3276
3277
3278void ha_federated::free_result()
3279{
3280 DBUG_ENTER("ha_federated::free_result");
3281 if (stored_result && !position_called)
3282 {
3283 mysql_free_result(stored_result);
3284 stored_result= 0;
3285 if (results.elements > 0)
3286 results.elements--;
3287 }
3288 DBUG_VOID_RETURN;
3289}
3290
3291
3292int ha_federated::external_lock(THD *thd, int lock_type)
3293{
3294 int error= 0;
3295 DBUG_ENTER("ha_federated::external_lock");
3296
3297 table_will_be_deleted = FALSE;
3298 DBUG_RETURN(error);
3299}
3300
3301
3302static int federated_commit(handlerton *hton, THD *thd, bool all)
3303{
3304 int return_val= 0;
3305 ha_federated *trx= (ha_federated *) thd_get_ha_data(thd, hton);
3306 DBUG_ENTER("federated_commit");
3307
3308 if (all)
3309 {
3310 int error= 0;
3311 ha_federated *ptr, *old= NULL;
3312 for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
3313 {
3314 if (old)
3315 old->trx_next= NULL;
3316 error= ptr->connection_commit();
3317 if (error && !return_val)
3318 return_val= error;
3319 }
3320 thd_set_ha_data(thd, hton, NULL);
3321 }
3322
3323 DBUG_PRINT("info", ("error val: %d", return_val));
3324 DBUG_RETURN(return_val);
3325}
3326
3327
3328static int federated_rollback(handlerton *hton, THD *thd, bool all)
3329{
3330 int return_val= 0;
3331 ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, hton);
3332 DBUG_ENTER("federated_rollback");
3333
3334 if (all)
3335 {
3336 int error= 0;
3337 ha_federated *ptr, *old= NULL;
3338 for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
3339 {
3340 if (old)
3341 old->trx_next= NULL;
3342 error= ptr->connection_rollback();
3343 if (error && !return_val)
3344 return_val= error;
3345 }
3346 thd_set_ha_data(thd, hton, NULL);
3347 }
3348
3349 DBUG_PRINT("info", ("error val: %d", return_val));
3350 DBUG_RETURN(return_val);
3351}
3352
3353int ha_federated::connection_commit()
3354{
3355 DBUG_ENTER("ha_federated::connection_commit");
3356 DBUG_RETURN(execute_simple_query("COMMIT", 6));
3357}
3358
3359
3360int ha_federated::connection_rollback()
3361{
3362 DBUG_ENTER("ha_federated::connection_rollback");
3363 DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
3364}
3365
3366
3367int ha_federated::connection_autocommit(bool state)
3368{
3369 const char *text;
3370 DBUG_ENTER("ha_federated::connection_autocommit");
3371 text= (state == TRUE) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
3372 DBUG_RETURN(execute_simple_query(text, 16));
3373}
3374
3375
3376int ha_federated::execute_simple_query(const char *query, int len)
3377{
3378 DBUG_ENTER("ha_federated::execute_simple_query");
3379
3380 if (mysql_real_query(mysql, query, len))
3381 {
3382 DBUG_RETURN(stash_remote_error());
3383 }
3384 DBUG_RETURN(0);
3385}
3386
/*
  Storage engine descriptor; its address is the second field of both
  plugin declarations below.
*/
struct st_mysql_storage_engine federated_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
3389
/*
  Plugin descriptor for the FEDERATED storage engine, declared through the
  mysql_declare_plugin() macro. The initialiser is positional, so the
  order of the fields must not be changed.
*/
mysql_declare_plugin(federated)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &federated_storage_engine,
  "FEDERATED",
  "Patrick Galbraith and Brian Aker, MySQL AB",
  "Federated MySQL storage engine",
  PLUGIN_LICENSE_GPL,
  federated_db_init, /* Plugin Init */
  federated_done, /* Plugin Deinit */
  0x0100 /* 1.0 */,
  NULL, /* status variables */
  NULL, /* system variables */
  NULL, /* config options */
  0, /* flags */
}
mysql_declare_plugin_end;
/*
  Plugin descriptor for the FEDERATED storage engine, declared through the
  maria_declare_plugin() macro. Compared with the mysql_declare_plugin()
  descriptor, the last two fields are a version string and a maturity
  level. The initialiser is positional, so the order of the fields must
  not be changed.
*/
maria_declare_plugin(federated)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &federated_storage_engine,
  "FEDERATED",
  "Patrick Galbraith and Brian Aker, MySQL AB",
  "Federated MySQL storage engine",
  PLUGIN_LICENSE_GPL,
  federated_db_init, /* Plugin Init */
  federated_done, /* Plugin Deinit */
  0x0100 /* 1.0 */,
  NULL, /* status variables */
  NULL, /* system variables */
  "1.0", /* string version */
  MariaDB_PLUGIN_MATURITY_GAMMA /* maturity */
}
maria_declare_plugin_end;
3424