1/*
2Copyright (c) 2008-2009, Patrick Galbraith & Antony Curtis
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are
7met:
8
9 * Redistributions of source code must retain the above copyright
10notice, this list of conditions and the following disclaimer.
11
12 * Neither the name of Patrick Galbraith nor the names of its
13contributors may be used to endorse or promote products derived from
14this software without specific prior written permission.
15
16THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27*/
28/*
29
30 FederatedX Pluggable Storage Engine
31
32 ha_federatedx.cc - FederatedX Pluggable Storage Engine
33 Patrick Galbraith, 2008
34
35 This is a handler which uses a foreign database as the data file, as
36 opposed to a handler like MyISAM, which uses .MYD files locally.
37
38 How this handler works
39 ----------------------------------
40 Normal database files are local and as such: You create a table called
41 'users', a file such as 'users.MYD' is created. A handler reads, inserts,
42 deletes, updates data in this file. The data is stored in particular format,
43 so to read, that data has to be parsed into fields, to write, fields have to
44 be stored in this format to write to this data file.
45
46 With FederatedX storage engine, there will be no local files
47 for each table's data (such as .MYD). A foreign database will store
48 the data that would normally be in this file. This will necessitate
49 the use of MySQL client API to read, delete, update, insert this
50 data. The data will have to be retrieve via an SQL call "SELECT *
51 FROM users". Then, to read this data, it will have to be retrieved
52 via mysql_fetch_row one row at a time, then converted from the
53 column in this select into the format that the handler expects.
54
55 The create table will simply create the .frm file, and within the
56 "CREATE TABLE" SQL, there SHALL be any of the following :
57
58 connection=scheme://username:password@hostname:port/database/tablename
59 connection=scheme://username@hostname/database/tablename
60 connection=scheme://username:password@hostname/database/tablename
61 connection=scheme://username:password@hostname/database/tablename
62
63 - OR -
64
65 As of 5.1 federatedx now allows you to use a non-url
66 format, taking advantage of mysql.servers:
67
68 connection="connection_one"
69 connection="connection_one/table_foo"
70
71 An example would be:
72
73 connection=mysql://username:password@hostname:port/database/tablename
74
75 or, if we had:
76
77 create server 'server_one' foreign data wrapper 'mysql' options
78 (HOST '127.0.0.1',
79 DATABASE 'db1',
80 USER 'root',
81 PASSWORD '',
82 PORT 3306,
83 SOCKET '',
84 OWNER 'root');
85
86 CREATE TABLE federatedx.t1 (
87 `id` int(20) NOT NULL,
88 `name` varchar(64) NOT NULL default ''
89 )
90 ENGINE="FEDERATEDX" DEFAULT CHARSET=latin1
91 CONNECTION='server_one';
92
93 So, this will have been the equivalent of
94
95 CONNECTION="mysql://root@127.0.0.1:3306/db1/t1"
96
97 Then, we can also change the server to point to a new schema:
98
99 ALTER SERVER 'server_one' options(DATABASE 'db2');
100
101 All subsequent calls will now be against db2.t1! Guess what? You don't
102 have to perform an alter table!
103
104 This connecton="connection string" is necessary for the handler to be
105 able to connect to the foreign server, either by URL, or by server
106 name.
107
108
109 The basic flow is this:
110
111 SQL calls issues locally ->
112 mysql handler API (data in handler format) ->
113 mysql client API (data converted to SQL calls) ->
114 foreign database -> mysql client API ->
115 convert result sets (if any) to handler format ->
116 handler API -> results or rows affected to local
117
118 What this handler does and doesn't support
119 ------------------------------------------
120 * Tables MUST be created on the foreign server prior to any action on those
121 tables via the handler, first version. IMPORTANT: IF you MUST use the
122 federatedx storage engine type on the REMOTE end, MAKE SURE [ :) ] That
123 the table you connect to IS NOT a table pointing BACK to your ORIGNAL
124 table! You know and have heard the screaching of audio feedback? You
125 know putting two mirror in front of each other how the reflection
126 continues for eternity? Well, need I say more?!
127 * There will not be support for transactions.
128 * There is no way for the handler to know if the foreign database or table
129 has changed. The reason for this is that this database has to work like a
130 data file that would never be written to by anything other than the
131 database. The integrity of the data in the local table could be breached
132 if there was any change to the foreign database.
133 * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
134 * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
135 * Prepared statements will not be used in the first implementation, it
136 remains to to be seen whether the limited subset of the client API for the
137 server supports this.
138 * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
139 implementation.
140 * This will not work with the query cache.
141
142 Method calls
143
144 A two column table, with one record:
145
146 (SELECT)
147
148 "SELECT * FROM foo"
149 ha_federatedx::info
150 ha_federatedx::scan_time:
151 ha_federatedx::rnd_init: share->select_query SELECT * FROM foo
152 ha_federatedx::extra
153
154 <for every row of data retrieved>
155 ha_federatedx::rnd_next
156 ha_federatedx::convert_row_to_internal_format
157 ha_federatedx::rnd_next
158 </for every row of data retrieved>
159
160 ha_federatedx::rnd_end
161 ha_federatedx::extra
162 ha_federatedx::reset
163
164 (INSERT)
165
166 "INSERT INTO foo (id, ts) VALUES (2, now());"
167
168 ha_federatedx::write_row
169
170 ha_federatedx::reset
171
172 (UPDATE)
173
174 "UPDATE foo SET ts = now() WHERE id = 1;"
175
176 ha_federatedx::index_init
177 ha_federatedx::index_read
178 ha_federatedx::index_read_idx
179 ha_federatedx::rnd_next
180 ha_federatedx::convert_row_to_internal_format
181 ha_federatedx::update_row
182
183 ha_federatedx::extra
184 ha_federatedx::extra
185 ha_federatedx::extra
186 ha_federatedx::external_lock
187 ha_federatedx::reset
188
189
190 How do I use this handler?
191 --------------------------
192
193 <insert text about plugin storage engine>
194
195 Next, to use this handler, it's very simple. You must
196 have two databases running, either both on the same host, or
197 on different hosts.
198
199 One the server that will be connecting to the foreign
200 host (client), you create your table as such:
201
202 CREATE TABLE test_table (
203 id int(20) NOT NULL auto_increment,
204 name varchar(32) NOT NULL default '',
205 other int(20) NOT NULL default '0',
206 PRIMARY KEY (id),
207 KEY name (name),
208 KEY other_key (other))
209 ENGINE="FEDERATEDX"
210 DEFAULT CHARSET=latin1
211 CONNECTION='mysql://root@127.0.0.1:9306/federatedx/test_federatedx';
212
213 Notice the "COMMENT" and "ENGINE" field? This is where you
214 respectively set the engine type, "FEDERATEDX" and foreign
215 host information, this being the database your 'client' database
216 will connect to and use as the "data file". Obviously, the foreign
217 database is running on port 9306, so you want to start up your other
218 database so that it is indeed on port 9306, and your federatedx
219 database on a port other than that. In my setup, I use port 5554
220 for federatedx, and port 5555 for the foreign database.
221
222 Then, on the foreign database:
223
224 CREATE TABLE test_table (
225 id int(20) NOT NULL auto_increment,
226 name varchar(32) NOT NULL default '',
227 other int(20) NOT NULL default '0',
228 PRIMARY KEY (id),
229 KEY name (name),
230 KEY other_key (other))
231 ENGINE="<NAME>" <-- whatever you want, or not specify
232 DEFAULT CHARSET=latin1 ;
233
234 This table is exactly the same (and must be exactly the same),
235 except that it is not using the federatedx handler and does
236 not need the URL.
237
238
239 How to see the handler in action
240 --------------------------------
241
242 When developing this handler, I compiled the federatedx database with
243 debugging:
244
245 ./configure --with-federatedx-storage-engine
246 --prefix=/home/mysql/mysql-build/federatedx/ --with-debug
247
248 Once compiled, I did a 'make install' (not for the purpose of installing
249 the binary, but to install all the files the binary expects to see in the
250 diretory I specified in the build with --prefix,
251 "/home/mysql/mysql-build/federatedx".
252
253 Then, I started the foreign server:
254
255 /usr/local/mysql/bin/mysqld_safe
256 --user=mysql --log=/tmp/mysqld.5555.log -P 5555
257
258 Then, I went back to the directory containing the newly compiled mysqld,
259 <builddir>/sql/, started up gdb:
260
261 gdb ./mysqld
262
263 Then, withn the (gdb) prompt:
264 (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug-dbug
265
266 Next, I open several windows for each:
267
268 1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
269 2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
270 3. A window with a client open to the federatedx server on port 5554
271 4. A window with a client open to the federatedx server on port 5555
272
273 I would create a table on the client to the foreign server on port
274 5555, and then to the federatedx server on port 5554. At this point,
275 I would run whatever queries I wanted to on the federatedx server,
276 just always remembering that whatever changes I wanted to make on
277 the table, or if I created new tables, that I would have to do that
278 on the foreign server.
279
280 Another thing to look for is 'show variables' to show you that you have
281 support for federatedx handler support:
282
283 show variables like '%federat%'
284
285 and:
286
287 show storage engines;
288
289 Both should display the federatedx storage handler.
290
291
292 Testing
293 -------
294
295 Testing for FederatedX as a pluggable storage engine for
296 now is a manual process that I intend to build a test
297 suite that works for all pluggable storage engines.
298
299 How to test
300
301 1. cp fed.dat /tmp
302 (make sure you have access to "test". Use a user that has
303 super privileges for now)
304 2. mysql -f -u root test < federated.test > federated.myresult 2>&1
305 3. diff federated.result federated.myresult (there _should_ be no differences)
306
307
308*/
309
310#ifdef USE_PRAGMA_IMPLEMENTATION
311#pragma implementation // gcc: Class implementation
312#endif
313
314#define MYSQL_SERVER 1
315#include <my_global.h>
316#include <mysql/plugin.h>
317#include "ha_federatedx.h"
318#include "sql_servers.h"
319#include "sql_analyse.h" // append_escaped()
320#include "sql_show.h" // append_identifier()
321#include "tztime.h" // my_tz_find()
322
323#ifdef I_AM_PARANOID
324#define MIN_PORT 1023
325#else
326#define MIN_PORT 0
327#endif
328
329/* Variables for federatedx share methods */
330static HASH federatedx_open_tables; // To track open tables
331static HASH federatedx_open_servers; // To track open servers
332mysql_mutex_t federatedx_mutex; // To init the hash
333const char ident_quote_char= '`'; // Character for quoting
334 // identifiers
335const char value_quote_char= '\''; // Character for quoting
336 // literals
337static const int bulk_padding= 64; // bytes "overhead" in packet
338
339/* Variables used when chopping off trailing characters */
340static const uint sizeof_trailing_comma= sizeof(", ") - 1;
341static const uint sizeof_trailing_and= sizeof(" AND ") - 1;
342static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;
343
344static Time_zone *UTC= 0;
345
346/* Static declaration for handerton */
347static handler *federatedx_create_handler(handlerton *hton,
348 TABLE_SHARE *table,
349 MEM_ROOT *mem_root);
350
351/* FederatedX storage engine handlerton */
352
353static handler *federatedx_create_handler(handlerton *hton,
354 TABLE_SHARE *table,
355 MEM_ROOT *mem_root)
356{
357 return new (mem_root) ha_federatedx(hton, table);
358}
359
360
361/* Function we use in the creation of our hash to get key */
362
363static uchar *
364federatedx_share_get_key(FEDERATEDX_SHARE *share, size_t *length,
365 my_bool not_used __attribute__ ((unused)))
366{
367 *length= share->share_key_length;
368 return (uchar*) share->share_key;
369}
370
371
372static uchar *
373federatedx_server_get_key(FEDERATEDX_SERVER *server, size_t *length,
374 my_bool not_used __attribute__ ((unused)))
375{
376 *length= server->key_length;
377 return server->key;
378}
379
380#ifdef HAVE_PSI_INTERFACE
381static PSI_mutex_key fe_key_mutex_federatedx, fe_key_mutex_FEDERATEDX_SERVER_mutex;
382
383static PSI_mutex_info all_federated_mutexes[]=
384{
385 { &fe_key_mutex_federatedx, "federatedx", PSI_FLAG_GLOBAL},
386 { &fe_key_mutex_FEDERATEDX_SERVER_mutex, "FEDERATED_SERVER::mutex", 0}
387};
388
389static void init_federated_psi_keys(void)
390{
391 const char* category= "federated";
392 int count;
393
394 if (PSI_server == NULL)
395 return;
396
397 count= array_elements(all_federated_mutexes);
398 PSI_server->register_mutex(category, all_federated_mutexes, count);
399}
400#else
401#define init_federated_psi_keys() /* no-op */
402#endif /* HAVE_PSI_INTERFACE */
403
404
405/*
406 Initialize the federatedx handler.
407
408 SYNOPSIS
409 federatedx_db_init()
410 p Handlerton
411
412 RETURN
413 FALSE OK
414 TRUE Error
415*/
416
417int federatedx_db_init(void *p)
418{
419 DBUG_ENTER("federatedx_db_init");
420 init_federated_psi_keys();
421 handlerton *federatedx_hton= (handlerton *)p;
422 federatedx_hton->state= SHOW_OPTION_YES;
423 /* Needed to work with old .frm files */
424 federatedx_hton->db_type= DB_TYPE_FEDERATED_DB;
425 federatedx_hton->savepoint_offset= sizeof(ulong);
426 federatedx_hton->close_connection= ha_federatedx::disconnect;
427 federatedx_hton->savepoint_set= ha_federatedx::savepoint_set;
428 federatedx_hton->savepoint_rollback= ha_federatedx::savepoint_rollback;
429 federatedx_hton->savepoint_release= ha_federatedx::savepoint_release;
430 federatedx_hton->commit= ha_federatedx::commit;
431 federatedx_hton->rollback= ha_federatedx::rollback;
432 federatedx_hton->discover_table_structure= ha_federatedx::discover_assisted;
433 federatedx_hton->create= federatedx_create_handler;
434 federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED;
435
436 if (mysql_mutex_init(fe_key_mutex_federatedx,
437 &federatedx_mutex, MY_MUTEX_INIT_FAST))
438 goto error;
439 if (!my_hash_init(&federatedx_open_tables, &my_charset_bin, 32, 0, 0,
440 (my_hash_get_key) federatedx_share_get_key, 0, 0) &&
441 !my_hash_init(&federatedx_open_servers, &my_charset_bin, 32, 0, 0,
442 (my_hash_get_key) federatedx_server_get_key, 0, 0))
443 {
444 DBUG_RETURN(FALSE);
445 }
446
447 mysql_mutex_destroy(&federatedx_mutex);
448error:
449 DBUG_RETURN(TRUE);
450}
451
452
453/*
454 Release the federatedx handler.
455
456 SYNOPSIS
457 federatedx_db_end()
458
459 RETURN
460 FALSE OK
461*/
462
463int federatedx_done(void *p)
464{
465 my_hash_free(&federatedx_open_tables);
466 my_hash_free(&federatedx_open_servers);
467 mysql_mutex_destroy(&federatedx_mutex);
468
469 return 0;
470}
471
472/**
473 @brief Append identifiers to the string.
474
475 @param[in,out] string The target string.
476 @param[in] name Identifier name
477 @param[in] length Length of identifier name in bytes
478 @param[in] quote_char Quote char to use for quoting identifier.
479
480 @return Operation Status
481 @retval FALSE OK
482 @retval TRUE There was an error appending to the string.
483
484 @note This function is based upon the append_identifier() function
485 in sql_show.cc except that quoting always occurs.
486*/
487
488bool append_ident(String *string, const char *name, size_t length,
489 const char quote_char)
490{
491 bool result;
492 uint clen;
493 const char *name_end;
494 DBUG_ENTER("append_ident");
495
496 if (quote_char)
497 {
498 string->reserve(length * 2 + 2);
499 if ((result= string->append(&quote_char, 1, system_charset_info)))
500 goto err;
501
502 for (name_end= name+length; name < name_end; name+= clen)
503 {
504 uchar c= *(uchar *) name;
505 clen= my_charlen_fix(system_charset_info, name, name_end);
506 if (clen == 1 && c == (uchar) quote_char &&
507 (result= string->append(&quote_char, 1, system_charset_info)))
508 goto err;
509 if ((result= string->append(name, clen, string->charset())))
510 goto err;
511 }
512 result= string->append(&quote_char, 1, system_charset_info);
513 }
514 else
515 result= string->append(name, length, system_charset_info);
516
517err:
518 DBUG_RETURN(result);
519}
520
521
522static int parse_url_error(FEDERATEDX_SHARE *share, TABLE_SHARE *table_s,
523 int error_num)
524{
525 char buf[FEDERATEDX_QUERY_BUFFER_SIZE];
526 size_t buf_len;
527 DBUG_ENTER("ha_federatedx parse_url_error");
528
529 buf_len= MY_MIN(table_s->connect_string.length,
530 FEDERATEDX_QUERY_BUFFER_SIZE-1);
531 strmake(buf, table_s->connect_string.str, buf_len);
532 my_error(error_num, MYF(0), buf, 14);
533 DBUG_RETURN(error_num);
534}
535
536/*
537 retrieve server object which contains server meta-data
538 from the system table given a server's name, set share
539 connection parameter members
540*/
541int get_connection(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share)
542{
543 int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
544 FOREIGN_SERVER *server, server_buffer;
545 DBUG_ENTER("ha_federatedx::get_connection");
546
547 /*
548 get_server_by_name() clones the server if exists and allocates
549 copies of strings in the supplied mem_root
550 */
551 if (!(server=
552 get_server_by_name(mem_root, share->connection_string, &server_buffer)))
553 {
554 DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!"));
555 /* need to come up with error handling */
556 error_num=1;
557 goto error;
558 }
559 DBUG_PRINT("info", ("get_server_by_name returned server at %p",
560 server));
561
562 /*
563 Most of these should never be empty strings, error handling will
564 need to be implemented. Also, is this the best way to set the share
565 members? Is there some allocation needed? In running this code, it works
566 except there are errors in the trace file of the share being overrun
567 at the address of the share.
568 */
569 share->server_name_length= server->server_name_length;
570 share->server_name= const_cast<char*>(server->server_name);
571 share->username= const_cast<char*>(server->username);
572 share->password= const_cast<char*>(server->password);
573 share->database= const_cast<char*>(server->db);
574 share->port= server->port > MIN_PORT && server->port < 65536 ?
575 (ushort) server->port : MYSQL_PORT;
576 share->hostname= const_cast<char*>(server->host);
577 if (!(share->socket= const_cast<char*>(server->socket)) &&
578 !strcmp(share->hostname, my_localhost))
579 share->socket= (char *) MYSQL_UNIX_ADDR;
580 share->scheme= const_cast<char*>(server->scheme);
581
582 DBUG_PRINT("info", ("share->username: %s", share->username));
583 DBUG_PRINT("info", ("share->password: %s", share->password));
584 DBUG_PRINT("info", ("share->hostname: %s", share->hostname));
585 DBUG_PRINT("info", ("share->database: %s", share->database));
586 DBUG_PRINT("info", ("share->port: %d", share->port));
587 DBUG_PRINT("info", ("share->socket: %s", share->socket));
588 DBUG_RETURN(0);
589
590error:
591 my_printf_error(error_num, "server name: '%s' doesn't exist!",
592 MYF(0), share->connection_string);
593 DBUG_RETURN(error_num);
594}
595
596/*
597 Parse connection info from table->s->connect_string
598
599 SYNOPSIS
600 parse_url()
601 mem_root MEM_ROOT pointer for memory allocation
602 share pointer to FEDERATEDX share
603 table pointer to current TABLE class
604 table_create_flag determines what error to throw
605
606 DESCRIPTION
607 Populates the share with information about the connection
608 to the foreign database that will serve as the data source.
609 This string must be specified (currently) in the "CONNECTION" field,
610 listed in the CREATE TABLE statement.
611
612 This string MUST be in the format of any of these:
613
614 CONNECTION="scheme://username:password@hostname:port/database/table"
615 CONNECTION="scheme://username@hostname/database/table"
616 CONNECTION="scheme://username@hostname:port/database/table"
617 CONNECTION="scheme://username:password@hostname/database/table"
618
619 _OR_
620
621 CONNECTION="connection name"
622
623
624
625 An Example:
626
627 CREATE TABLE t1 (id int(32))
628 ENGINE="FEDERATEDX"
629 CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federatedx/testtable";
630
631 CREATE TABLE t2 (
632 id int(4) NOT NULL auto_increment,
633 name varchar(32) NOT NULL,
634 PRIMARY KEY(id)
635 ) ENGINE="FEDERATEDX" CONNECTION="my_conn";
636
637 ***IMPORTANT***
638 Currently, the FederatedX Storage Engine only supports connecting to another
639 Database ("scheme" of "mysql"). Connections using JDBC as well as
640 other connectors are in the planning stage.
641
642
643 'password' and 'port' are both optional.
644
645 RETURN VALUE
646 0 success
647 error_num particular error code
648
649*/
650
651static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share,
652 TABLE_SHARE *table_s, uint table_create_flag)
653{
654 uint error_num= (table_create_flag ?
655 ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
656 ER_FOREIGN_DATA_STRING_INVALID);
657 DBUG_ENTER("ha_federatedx::parse_url");
658
659 share->port= 0;
660 share->socket= 0;
661 DBUG_PRINT("info", ("share at %p", share));
662 DBUG_PRINT("info", ("Length: %u", (uint) table_s->connect_string.length));
663 DBUG_PRINT("info", ("String: '%.*s'", (int) table_s->connect_string.length,
664 table_s->connect_string.str));
665 share->connection_string= strmake_root(mem_root, table_s->connect_string.str,
666 table_s->connect_string.length);
667
668 DBUG_PRINT("info",("parse_url alloced share->connection_string %p",
669 share->connection_string));
670
671 DBUG_PRINT("info",("share->connection_string: %s",share->connection_string));
672 /*
673 No :// or @ in connection string. Must be a straight connection name of
674 either "servername" or "servername/tablename"
675 */
676 if ((!strstr(share->connection_string, "://") &&
677 (!strchr(share->connection_string, '@'))))
678 {
679
680 DBUG_PRINT("info",
681 ("share->connection_string: %s internal format "
682 "share->connection_string: %p",
683 share->connection_string,
684 share->connection_string));
685
686 /* ok, so we do a little parsing, but not completely! */
687 share->parsed= FALSE;
688 /*
689 If there is a single '/' in the connection string, this means the user is
690 specifying a table name
691 */
692
693 if ((share->table_name= strchr(share->connection_string, '/')))
694 {
695 *share->table_name++= '\0';
696 share->table_name_length= strlen(share->table_name);
697
698 DBUG_PRINT("info",
699 ("internal format, parsed table_name "
700 "share->connection_string: %s share->table_name: %s",
701 share->connection_string, share->table_name));
702
703 /*
704 there better not be any more '/'s !
705 */
706 if (strchr(share->table_name, '/'))
707 goto error;
708 }
709 /*
710 Otherwise, straight server name, use tablename of federatedx table
711 as remote table name
712 */
713 else
714 {
715 /*
716 Connection specifies everything but, resort to
717 expecting remote and foreign table names to match
718 */
719 share->table_name= strmake_root(mem_root, table_s->table_name.str,
720 (share->table_name_length=
721 table_s->table_name.length));
722 DBUG_PRINT("info",
723 ("internal format, default table_name "
724 "share->connection_string: %s share->table_name: %s",
725 share->connection_string, share->table_name));
726 }
727
728 if ((error_num= get_connection(mem_root, share)))
729 goto error;
730 }
731 else
732 {
733 share->parsed= TRUE;
734 // Add a null for later termination of table name
735 share->connection_string[table_s->connect_string.length]= 0;
736 share->scheme= share->connection_string;
737 DBUG_PRINT("info",("parse_url alloced share->scheme: %p",
738 share->scheme));
739
740 /*
741 Remove addition of null terminator and store length
742 for each string in share
743 */
744 if (!(share->username= strstr(share->scheme, "://")))
745 goto error;
746 share->scheme[share->username - share->scheme]= '\0';
747
748 if (!federatedx_io::handles_scheme(share->scheme))
749 goto error;
750
751 share->username+= 3;
752
753 if (!(share->hostname= strchr(share->username, '@')))
754 goto error;
755 *share->hostname++= '\0'; // End username
756
757 if ((share->password= strchr(share->username, ':')))
758 {
759 *share->password++= '\0'; // End username
760
761 /* make sure there isn't an extra / or @ */
762 if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
763 goto error;
764 /*
765 Found that if the string is:
766 user:@hostname:port/db/table
767 Then password is a null string, so set to NULL
768 */
769 if (share->password[0] == '\0')
770 share->password= NULL;
771 }
772
773 /* make sure there isn't an extra / or @ */
774 if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
775 goto error;
776
777 if (!(share->database= strchr(share->hostname, '/')))
778 goto error;
779 *share->database++= '\0';
780
781 if ((share->sport= strchr(share->hostname, ':')))
782 {
783 *share->sport++= '\0';
784 if (share->sport[0] == '\0')
785 share->sport= NULL;
786 else
787 share->port= atoi(share->sport);
788 }
789
790 if (!(share->table_name= strchr(share->database, '/')))
791 goto error;
792 *share->table_name++= '\0';
793
794 share->table_name_length= strlen(share->table_name);
795
796 /* make sure there's not an extra / */
797 if ((strchr(share->table_name, '/')))
798 goto error;
799
800 if (share->hostname[0] == '\0')
801 share->hostname= NULL;
802
803 }
804 if (!share->port)
805 {
806 if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
807 share->socket= (char *) MYSQL_UNIX_ADDR;
808 else
809 share->port= MYSQL_PORT;
810 }
811
812 DBUG_PRINT("info",
813 ("scheme: %s username: %s password: %s hostname: %s "
814 "port: %d db: %s tablename: %s",
815 share->scheme, share->username, share->password,
816 share->hostname, share->port, share->database,
817 share->table_name));
818
819 DBUG_RETURN(0);
820
821error:
822 DBUG_RETURN(parse_url_error(share, table_s, error_num));
823}
824
825/*****************************************************************************
826** FEDERATEDX tables
827*****************************************************************************/
828
829ha_federatedx::ha_federatedx(handlerton *hton,
830 TABLE_SHARE *table_arg)
831 :handler(hton, table_arg),
832 txn(0), io(0), stored_result(0)
833{
834 bzero(&bulk_insert, sizeof(bulk_insert));
835}
836
837
838/*
839 Convert MySQL result set row to handler internal format
840
841 SYNOPSIS
842 convert_row_to_internal_format()
843 record Byte pointer to record
844 row MySQL result set row from fetchrow()
845 result Result set to use
846
847 DESCRIPTION
848 This method simply iterates through a row returned via fetchrow with
849 values from a successful SELECT , and then stores each column's value
850 in the field object via the field object pointer (pointing to the table's
851 array of field object pointers). This is how the handler needs the data
852 to be stored to then return results back to the user
853
854 RETURN VALUE
855 0 After fields have had field values stored from record
856*/
857
858uint ha_federatedx::convert_row_to_internal_format(uchar *record,
859 FEDERATEDX_IO_ROW *row,
860 FEDERATEDX_IO_RESULT *result)
861{
862 ulong *lengths;
863 Field **field;
864 int column= 0;
865 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
866 Time_zone *saved_time_zone= table->in_use->variables.time_zone;
867 DBUG_ENTER("ha_federatedx::convert_row_to_internal_format");
868
869 table->in_use->variables.time_zone= UTC;
870 lengths= io->fetch_lengths(result);
871
872 for (field= table->field; *field; field++, column++)
873 {
874 /*
875 index variable to move us through the row at the
876 same iterative step as the field
877 */
878 my_ptrdiff_t old_ptr;
879 old_ptr= (my_ptrdiff_t) (record - table->record[0]);
880 (*field)->move_field_offset(old_ptr);
881 if (io->is_column_null(row, column))
882 (*field)->set_null();
883 else
884 {
885 if (bitmap_is_set(table->read_set, (*field)->field_index))
886 {
887 (*field)->set_notnull();
888 (*field)->store(io->get_column_data(row, column), lengths[column], &my_charset_bin);
889 }
890 }
891 (*field)->move_field_offset(-old_ptr);
892 }
893 table->in_use->variables.time_zone= saved_time_zone;
894 dbug_tmp_restore_column_map(table->write_set, old_map);
895 DBUG_RETURN(0);
896}
897
898static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
899{
900 DBUG_ENTER("emit_key_part_name");
901 if (append_ident(to, part->field->field_name.str,
902 part->field->field_name.length, ident_quote_char))
903 DBUG_RETURN(1); // Out of memory
904 DBUG_RETURN(0);
905}
906
907static bool emit_key_part_element(String *to, KEY_PART_INFO *part,
908 bool needs_quotes, bool is_like,
909 const uchar *ptr, uint len)
910{
911 Field *field= part->field;
912 DBUG_ENTER("emit_key_part_element");
913
914 if (needs_quotes && to->append(STRING_WITH_LEN("'")))
915 DBUG_RETURN(1);
916
917 if (part->type == HA_KEYTYPE_BIT)
918 {
919 char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;
920
921 *buf++= '0';
922 *buf++= 'x';
923 buf= octet2hex(buf, (char*) ptr, len);
924 if (to->append((char*) buff, (uint)(buf - buff)))
925 DBUG_RETURN(1);
926 }
927 else if (part->key_part_flag & HA_BLOB_PART)
928 {
929 String blob;
930 uint blob_length= uint2korr(ptr);
931 blob.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
932 blob_length, &my_charset_bin);
933 if (to->append_for_single_quote(&blob))
934 DBUG_RETURN(1);
935 }
936 else if (part->key_part_flag & HA_VAR_LENGTH_PART)
937 {
938 String varchar;
939 uint var_length= uint2korr(ptr);
940 varchar.set_quick((char*) ptr+HA_KEY_BLOB_LENGTH,
941 var_length, &my_charset_bin);
942 if (to->append_for_single_quote(&varchar))
943 DBUG_RETURN(1);
944 }
945 else
946 {
947 char strbuff[MAX_FIELD_WIDTH];
948 String str(strbuff, sizeof(strbuff), part->field->charset()), *res;
949
950 res= field->val_str(&str, ptr);
951
952 if (field->result_type() == STRING_RESULT)
953 {
954 if (to->append_for_single_quote(res))
955 DBUG_RETURN(1);
956 }
957 else if (to->append(res->ptr(), res->length()))
958 DBUG_RETURN(1);
959 }
960
961 if (is_like && to->append(STRING_WITH_LEN("%")))
962 DBUG_RETURN(1);
963
964 if (needs_quotes && to->append(STRING_WITH_LEN("'")))
965 DBUG_RETURN(1);
966
967 DBUG_RETURN(0);
968}
969
970/*
971 Create a WHERE clause based off of values in keys
972 Note: This code was inspired by key_copy from key.cc
973
974 SYNOPSIS
975 create_where_from_key ()
976 to String object to store WHERE clause
977 key_info KEY struct pointer
978 key byte pointer containing key
979 key_length length of key
980 range_type 0 - no range, 1 - min range, 2 - max range
981 (see enum range_operation)
982
983 DESCRIPTION
984 Using iteration through all the keys via a KEY_PART_INFO pointer,
985 This method 'extracts' the value of each key in the byte pointer
986 *key, and for each key found, constructs an appropriate WHERE clause
987
988 RETURN VALUE
989 0 After all keys have been accounted for to create the WHERE clause
990 1 No keys found
991
992 Range flags Table per Timour:
993
994 -----------------
995 - start_key:
996 * ">" -> HA_READ_AFTER_KEY
997 * ">=" -> HA_READ_KEY_OR_NEXT
998 * "=" -> HA_READ_KEY_EXACT
999
1000 - end_key:
1001 * "<" -> HA_READ_BEFORE_KEY
1002 * "<=" -> HA_READ_AFTER_KEY
1003
1004 records_in_range:
1005 -----------------
1006 - start_key:
1007 * ">" -> HA_READ_AFTER_KEY
1008 * ">=" -> HA_READ_KEY_EXACT
1009 * "=" -> HA_READ_KEY_EXACT
1010
1011 - end_key:
1012 * "<" -> HA_READ_BEFORE_KEY
1013 * "<=" -> HA_READ_AFTER_KEY
1014 * "=" -> HA_READ_AFTER_KEY
1015
10160 HA_READ_KEY_EXACT, Find first record else error
10171 HA_READ_KEY_OR_NEXT, Record or next record
10182 HA_READ_KEY_OR_PREV, Record or previous
10193 HA_READ_AFTER_KEY, Find next rec. after key-record
10204 HA_READ_BEFORE_KEY, Find next rec. before key-record
10215 HA_READ_PREFIX, Key which as same prefix
10226 HA_READ_PREFIX_LAST, Last key with the same prefix
10237 HA_READ_PREFIX_LAST_OR_PREV, Last or prev key with the same prefix
1024
1025Flags that I've found:
1026
1027id, primary key, varchar
1028
1029id = 'ccccc'
1030records_in_range: start_key 0 end_key 3
1031read_range_first: start_key 0 end_key NULL
1032
1033id > 'ccccc'
1034records_in_range: start_key 3 end_key NULL
1035read_range_first: start_key 3 end_key NULL
1036
1037id < 'ccccc'
1038records_in_range: start_key NULL end_key 4
1039read_range_first: start_key NULL end_key 4
1040
1041id <= 'ccccc'
1042records_in_range: start_key NULL end_key 3
1043read_range_first: start_key NULL end_key 3
1044
1045id >= 'ccccc'
1046records_in_range: start_key 0 end_key NULL
1047read_range_first: start_key 1 end_key NULL
1048
1049id like 'cc%cc'
1050records_in_range: start_key 0 end_key 3
1051read_range_first: start_key 1 end_key 3
1052
1053id > 'aaaaa' and id < 'ccccc'
1054records_in_range: start_key 3 end_key 4
1055read_range_first: start_key 3 end_key 4
1056
1057id >= 'aaaaa' and id < 'ccccc';
1058records_in_range: start_key 0 end_key 4
1059read_range_first: start_key 1 end_key 4
1060
1061id >= 'aaaaa' and id <= 'ccccc';
1062records_in_range: start_key 0 end_key 3
1063read_range_first: start_key 1 end_key 3
1064
1065id > 'aaaaa' and id <= 'ccccc';
1066records_in_range: start_key 3 end_key 3
1067read_range_first: start_key 3 end_key 3
1068
1069numeric keys:
1070
1071id = 4
1072index_read_idx: start_key 0 end_key NULL
1073
1074id > 4
1075records_in_range: start_key 3 end_key NULL
1076read_range_first: start_key 3 end_key NULL
1077
1078id >= 4
1079records_in_range: start_key 0 end_key NULL
1080read_range_first: start_key 1 end_key NULL
1081
1082id < 4
1083records_in_range: start_key NULL end_key 4
1084read_range_first: start_key NULL end_key 4
1085
1086id <= 4
1087records_in_range: start_key NULL end_key 3
1088read_range_first: start_key NULL end_key 3
1089
1090id like 4
1091full table scan, select * from
1092
1093id > 2 and id < 8
1094records_in_range: start_key 3 end_key 4
1095read_range_first: start_key 3 end_key 4
1096
1097id >= 2 and id < 8
1098records_in_range: start_key 0 end_key 4
1099read_range_first: start_key 1 end_key 4
1100
1101id >= 2 and id <= 8
1102records_in_range: start_key 0 end_key 3
1103read_range_first: start_key 1 end_key 3
1104
1105id > 2 and id <= 8
1106records_in_range: start_key 3 end_key 3
1107read_range_first: start_key 3 end_key 3
1108
1109multi keys (id int, name varchar, other varchar)
1110
1111id = 1;
1112records_in_range: start_key 0 end_key 3
1113read_range_first: start_key 0 end_key NULL
1114
1115id > 4;
1116id > 2 and name = '333'; remote: id > 2
1117id > 2 and name > '333'; remote: id > 2
1118id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results
1119id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result
1120id >= 4 and name = 'eric was here' and other > 'eeee';
1121records_in_range: start_key 3 end_key NULL
1122read_range_first: start_key 3 end_key NULL
1123
1124id >= 4;
1125id >= 2 and name = '333' and other < 'ddd';
1126remote: `id` >= 2 AND `name` >= '333';
1127records_in_range: start_key 0 end_key NULL
1128read_range_first: start_key 1 end_key NULL
1129
1130id < 4;
1131id < 3 and name = '222' and other <= 'ccc'; remote: id < 3
1132records_in_range: start_key NULL end_key 4
1133read_range_first: start_key NULL end_key 4
1134
1135id <= 4;
1136records_in_range: start_key NULL end_key 3
1137read_range_first: start_key NULL end_key 3
1138
1139id like 4;
1140full table scan
1141
1142id > 2 and id < 4;
1143records_in_range: start_key 3 end_key 4
1144read_range_first: start_key 3 end_key 4
1145
1146id >= 2 and id < 4;
1147records_in_range: start_key 0 end_key 4
1148read_range_first: start_key 1 end_key 4
1149
1150id >= 2 and id <= 4;
1151records_in_range: start_key 0 end_key 3
1152read_range_first: start_key 1 end_key 3
1153
1154id > 2 and id <= 4;
1155id = 6 and name = 'eric was here' and other > 'eeee';
1156remote: (`id` > 6 AND `name` > 'eric was here' AND `other` > 'eeee')
1157AND (`id` <= 6) AND ( AND `name` <= 'eric was here')
1158no results
1159records_in_range: start_key 3 end_key 3
1160read_range_first: start_key 3 end_key 3
1161
1162Summary:
1163
1164* If the start key flag is 0 the max key flag shouldn't even be set,
1165 and if it is, the query produced would be invalid.
1166* Multipart keys, even if containing some or all numeric columns,
1167 are treated the same as non-numeric keys
1168
1169 If the query is " = " (quotes or not):
1170 - records in range start key flag HA_READ_KEY_EXACT,
1171 end key flag HA_READ_AFTER_KEY (incorrect)
1172 - any other: start key flag HA_READ_KEY_OR_NEXT,
1173 end key flag HA_READ_AFTER_KEY (correct)
1174
1175* 'like' queries (of key)
1176 - Numeric, full table scan
1177 - Non-numeric
1178 records_in_range: start_key 0 end_key 3
1179 other : start_key 1 end_key 3
1180
1181* If the key flag is HA_READ_AFTER_KEY:
1182 if start_key, append >
1183 if end_key, append <=
1184
1185* If create_where_key was called by records_in_range:
1186
1187 - if the key is numeric:
1188 start key flag is 0 when end key is NULL, end key flag is 3 or 4
1189 - if create_where_key was called by any other function:
1190 start key flag is 1 when end key is NULL, end key flag is 3 or 4
1191 - if the key is non-numeric, or multipart
1192 When the query is an exact match, the start key flag is 0,
1193 end key flag is 3 for what should be a no-range condition where
1194 you should have 0 and max key NULL, which it is if called by
1195 read_range_first
1196
1197Conclusion:
1198
11991. Need logic to determin if a key is min or max when the flag is
1200HA_READ_AFTER_KEY, and handle appending correct operator accordingly
1201
12022. Need a boolean flag to pass to create_where_from_key, used in the
1203switch statement. Add 1 to the flag if:
1204 - start key flag is HA_READ_KEY_EXACT and the end key is NULL
1205
1206*/
1207
1208bool ha_federatedx::create_where_from_key(String *to,
1209 KEY *key_info,
1210 const key_range *start_key,
1211 const key_range *end_key,
1212 bool from_records_in_range,
1213 bool eq_range)
1214{
1215 bool both_not_null=
1216 (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
1217 const uchar *ptr;
1218 uint remainder, length;
1219 char tmpbuff[FEDERATEDX_QUERY_BUFFER_SIZE];
1220 String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info);
1221 const key_range *ranges[2]= { start_key, end_key };
1222 Time_zone *saved_time_zone= table->in_use->variables.time_zone;
1223 my_bitmap_map *old_map;
1224 DBUG_ENTER("ha_federatedx::create_where_from_key");
1225
1226 tmp.length(0);
1227 if (start_key == NULL && end_key == NULL)
1228 DBUG_RETURN(1);
1229
1230 table->in_use->variables.time_zone= UTC;
1231 old_map= dbug_tmp_use_all_columns(table, table->write_set);
1232 for (uint i= 0; i <= 1; i++)
1233 {
1234 bool needs_quotes;
1235 KEY_PART_INFO *key_part;
1236 if (ranges[i] == NULL)
1237 continue;
1238
1239 if (both_not_null)
1240 {
1241 if (i > 0)
1242 tmp.append(STRING_WITH_LEN(") AND ("));
1243 else
1244 tmp.append(STRING_WITH_LEN(" ("));
1245 }
1246
1247 for (key_part= key_info->key_part,
1248 remainder= key_info->user_defined_key_parts,
1249 length= ranges[i]->length,
1250 ptr= ranges[i]->key; ;
1251 remainder--,
1252 key_part++)
1253 {
1254 Field *field= key_part->field;
1255 uint store_length= key_part->store_length;
1256 uint part_length= MY_MIN(store_length, length);
1257 needs_quotes= field->str_needs_quotes();
1258 DBUG_DUMP("key, start of loop", ptr, length);
1259
1260 if (key_part->null_bit)
1261 {
1262 if (*ptr++)
1263 {
1264 /*
1265 We got "IS [NOT] NULL" condition against nullable column. We
1266 distinguish between "IS NOT NULL" and "IS NULL" by flag. For
1267 "IS NULL", flag is set to HA_READ_KEY_EXACT.
1268 */
1269 if (emit_key_part_name(&tmp, key_part) ||
1270 tmp.append(ranges[i]->flag == HA_READ_KEY_EXACT ?
1271 " IS NULL " : " IS NOT NULL "))
1272 goto err;
1273 /*
1274 We need to adjust pointer and length to be prepared for next
1275 key part. As well as check if this was last key part.
1276 */
1277 goto prepare_for_next_key_part;
1278 }
1279 }
1280
1281 if (tmp.append(STRING_WITH_LEN(" (")))
1282 goto err;
1283
1284 switch (ranges[i]->flag) {
1285 case HA_READ_KEY_EXACT:
1286 DBUG_PRINT("info", ("federatedx HA_READ_KEY_EXACT %d", i));
1287 if (store_length >= length ||
1288 !needs_quotes ||
1289 key_part->type == HA_KEYTYPE_BIT ||
1290 field->result_type() != STRING_RESULT)
1291 {
1292 if (emit_key_part_name(&tmp, key_part))
1293 goto err;
1294
1295 if (from_records_in_range)
1296 {
1297 if (tmp.append(STRING_WITH_LEN(" >= ")))
1298 goto err;
1299 }
1300 else
1301 {
1302 if (tmp.append(STRING_WITH_LEN(" = ")))
1303 goto err;
1304 }
1305
1306 if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1307 part_length))
1308 goto err;
1309 }
1310 else
1311 {
1312 /* LIKE */
1313 if (emit_key_part_name(&tmp, key_part) ||
1314 tmp.append(STRING_WITH_LEN(" LIKE ")) ||
1315 emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
1316 part_length))
1317 goto err;
1318 }
1319 break;
1320 case HA_READ_AFTER_KEY:
1321 if (eq_range)
1322 {
1323 if (tmp.append("1=1")) // Dummy
1324 goto err;
1325 break;
1326 }
1327 DBUG_PRINT("info", ("federatedx HA_READ_AFTER_KEY %d", i));
1328 if (store_length >= length || i > 0) /* end key */
1329 {
1330 if (emit_key_part_name(&tmp, key_part))
1331 goto err;
1332
1333 if (i > 0) /* end key */
1334 {
1335 if (tmp.append(STRING_WITH_LEN(" <= ")))
1336 goto err;
1337 }
1338 else /* start key */
1339 {
1340 if (tmp.append(STRING_WITH_LEN(" > ")))
1341 goto err;
1342 }
1343
1344 if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1345 part_length))
1346 {
1347 goto err;
1348 }
1349 break;
1350 }
1351 /* fall through */
1352 case HA_READ_KEY_OR_NEXT:
1353 DBUG_PRINT("info", ("federatedx HA_READ_KEY_OR_NEXT %d", i));
1354 if (emit_key_part_name(&tmp, key_part) ||
1355 tmp.append(STRING_WITH_LEN(" >= ")) ||
1356 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1357 part_length))
1358 goto err;
1359 break;
1360 case HA_READ_BEFORE_KEY:
1361 DBUG_PRINT("info", ("federatedx HA_READ_BEFORE_KEY %d", i));
1362 if (store_length >= length)
1363 {
1364 if (emit_key_part_name(&tmp, key_part) ||
1365 tmp.append(STRING_WITH_LEN(" < ")) ||
1366 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1367 part_length))
1368 goto err;
1369 break;
1370 }
1371 /* fall through */
1372 case HA_READ_KEY_OR_PREV:
1373 DBUG_PRINT("info", ("federatedx HA_READ_KEY_OR_PREV %d", i));
1374 if (emit_key_part_name(&tmp, key_part) ||
1375 tmp.append(STRING_WITH_LEN(" <= ")) ||
1376 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1377 part_length))
1378 goto err;
1379 break;
1380 default:
1381 DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag));
1382 goto err;
1383 }
1384 if (tmp.append(STRING_WITH_LEN(") ")))
1385 goto err;
1386
1387prepare_for_next_key_part:
1388 if (store_length >= length)
1389 break;
1390 DBUG_PRINT("info", ("remainder %d", remainder));
1391 DBUG_ASSERT(remainder > 1);
1392 length-= store_length;
1393 /*
1394 For nullable columns, null-byte is already skipped before, that is
1395 ptr was incremented by 1. Since store_length still counts null-byte,
1396 we need to subtract 1 from store_length.
1397 */
1398 ptr+= store_length - MY_TEST(key_part->null_bit);
1399 if (tmp.append(STRING_WITH_LEN(" AND ")))
1400 goto err;
1401
1402 DBUG_PRINT("info",
1403 ("create_where_from_key WHERE clause: %s",
1404 tmp.c_ptr_quick()));
1405 }
1406 }
1407 dbug_tmp_restore_column_map(table->write_set, old_map);
1408 table->in_use->variables.time_zone= saved_time_zone;
1409
1410 if (both_not_null)
1411 if (tmp.append(STRING_WITH_LEN(") ")))
1412 DBUG_RETURN(1);
1413
1414 if (to->append(STRING_WITH_LEN(" WHERE ")))
1415 DBUG_RETURN(1);
1416
1417 if (to->append(tmp))
1418 DBUG_RETURN(1);
1419
1420 DBUG_RETURN(0);
1421
1422err:
1423 dbug_tmp_restore_column_map(table->write_set, old_map);
1424 table->in_use->variables.time_zone= saved_time_zone;
1425 DBUG_RETURN(1);
1426}
1427
1428static void fill_server(MEM_ROOT *mem_root, FEDERATEDX_SERVER *server,
1429 FEDERATEDX_SHARE *share, CHARSET_INFO *table_charset)
1430{
1431 char buffer[STRING_BUFFER_USUAL_SIZE];
1432 String key(buffer, sizeof(buffer), &my_charset_bin);
1433 String scheme(share->scheme, &my_charset_latin1);
1434 String hostname(share->hostname, &my_charset_latin1);
1435 String database(share->database, system_charset_info);
1436 String username(share->username, system_charset_info);
1437 String socket(share->socket ? share->socket : "", files_charset_info);
1438 String password(share->password ? share->password : "", &my_charset_bin);
1439 DBUG_ENTER("fill_server");
1440
1441 /* Do some case conversions */
1442 scheme.reserve(scheme.length());
1443 scheme.length(my_casedn_str(&my_charset_latin1, scheme.c_ptr_safe()));
1444
1445 hostname.reserve(hostname.length());
1446 hostname.length(my_casedn_str(&my_charset_latin1, hostname.c_ptr_safe()));
1447
1448 if (lower_case_table_names)
1449 {
1450 database.reserve(database.length());
1451 database.length(my_casedn_str(system_charset_info, database.c_ptr_safe()));
1452 }
1453
1454#ifndef __WIN__
1455 /*
1456 TODO: there is no unix sockets under windows so the engine should be
1457 revised about using sockets in such environment.
1458 */
1459 if (lower_case_file_system && socket.length())
1460 {
1461 socket.reserve(socket.length());
1462 socket.length(my_casedn_str(files_charset_info, socket.c_ptr_safe()));
1463 }
1464#endif
1465
1466 /* start with all bytes zeroed */
1467 bzero(server, sizeof(*server));
1468
1469 key.length(0);
1470 key.reserve(scheme.length() + hostname.length() + database.length() +
1471 socket.length() + username.length() + password.length() +
1472 sizeof(int) + 8);
1473 key.append(scheme);
1474 key.q_append('\0');
1475 server->hostname= (const char *) (intptr) key.length();
1476 key.append(hostname);
1477 key.q_append('\0');
1478 server->database= (const char *) (intptr) key.length();
1479 key.append(database);
1480 key.q_append('\0');
1481 key.q_append((uint32) share->port);
1482 server->socket= (const char *) (intptr) key.length();
1483 key.append(socket);
1484 key.q_append('\0');
1485 server->username= (const char *) (intptr) key.length();
1486 key.append(username);
1487 key.q_append('\0');
1488 server->password= (const char *) (intptr) key.length();
1489 key.append(password);
1490 key.c_ptr_safe(); // Ensure we have end \0
1491
1492 server->key_length= key.length();
1493 /* Copy and add end \0 */
1494 server->key= (uchar *) strmake_root(mem_root, key.ptr(), key.length());
1495
1496 /* pointer magic */
1497 server->scheme+= (intptr) server->key;
1498 server->hostname+= (intptr) server->key;
1499 server->database+= (intptr) server->key;
1500 server->username+= (intptr) server->key;
1501 server->password+= (intptr) server->key;
1502 server->socket+= (intptr) server->key;
1503 server->port= share->port;
1504
1505 if (!share->socket)
1506 server->socket= NULL;
1507 if (!share->password)
1508 server->password= NULL;
1509
1510 if (table_charset)
1511 server->csname= strdup_root(mem_root, table_charset->csname);
1512
1513 DBUG_VOID_RETURN;
1514}
1515
1516
1517static FEDERATEDX_SERVER *get_server(FEDERATEDX_SHARE *share, TABLE *table)
1518{
1519 FEDERATEDX_SERVER *server= NULL, tmp_server;
1520 MEM_ROOT mem_root;
1521 char buffer[STRING_BUFFER_USUAL_SIZE];
1522 String key(buffer, sizeof(buffer), &my_charset_bin);
1523 String scheme(share->scheme, &my_charset_latin1);
1524 String hostname(share->hostname, &my_charset_latin1);
1525 String database(share->database, system_charset_info);
1526 String username(share->username, system_charset_info);
1527 String socket(share->socket ? share->socket : "", files_charset_info);
1528 String password(share->password ? share->password : "", &my_charset_bin);
1529 DBUG_ENTER("ha_federated.cc::get_server");
1530
1531 mysql_mutex_assert_owner(&federatedx_mutex);
1532
1533 init_alloc_root(&mem_root, "federated", 4096, 4096, MYF(0));
1534
1535 fill_server(&mem_root, &tmp_server, share, table ? table->s->table_charset : 0);
1536
1537 if (!(server= (FEDERATEDX_SERVER *) my_hash_search(&federatedx_open_servers,
1538 tmp_server.key,
1539 tmp_server.key_length)))
1540 {
1541 if (!table || !tmp_server.csname)
1542 goto error;
1543
1544 if (!(server= (FEDERATEDX_SERVER *) memdup_root(&mem_root,
1545 (char *) &tmp_server,
1546 sizeof(*server))))
1547 goto error;
1548
1549 server->mem_root= mem_root;
1550
1551 if (my_hash_insert(&federatedx_open_servers, (uchar*) server))
1552 goto error;
1553
1554 mysql_mutex_init(fe_key_mutex_FEDERATEDX_SERVER_mutex,
1555 &server->mutex, MY_MUTEX_INIT_FAST);
1556 }
1557 else
1558 free_root(&mem_root, MYF(0)); /* prevents memory leak */
1559
1560 server->use_count++;
1561
1562 DBUG_RETURN(server);
1563error:
1564 free_root(&mem_root, MYF(0));
1565 DBUG_RETURN(NULL);
1566}
1567
1568
1569/*
1570 Example of simple lock controls. The "share" it creates is structure we will
1571 pass to each federatedx handler. Do you have to have one of these? Well, you
1572 have pieces that are used for locking, and they are needed to function.
1573*/
1574
1575static FEDERATEDX_SHARE *get_share(const char *table_name, TABLE *table)
1576{
1577 char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
1578 Field **field;
1579 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
1580 FEDERATEDX_SHARE *share= NULL, tmp_share;
1581 MEM_ROOT mem_root;
1582 DBUG_ENTER("ha_federatedx.cc::get_share");
1583
1584 /*
1585 In order to use this string, we must first zero it's length,
1586 or it will contain garbage
1587 */
1588 query.length(0);
1589
1590 bzero(&tmp_share, sizeof(tmp_share));
1591 init_alloc_root(&mem_root, "federated", 256, 0, MYF(0));
1592
1593 mysql_mutex_lock(&federatedx_mutex);
1594
1595 if (unlikely(!UTC))
1596 {
1597 String tz_00_name(STRING_WITH_LEN("+00:00"), &my_charset_bin);
1598 UTC= my_tz_find(current_thd, &tz_00_name);
1599 }
1600
1601 tmp_share.share_key= table_name;
1602 tmp_share.share_key_length= (int)strlen(table_name);
1603 if (parse_url(&mem_root, &tmp_share, table->s, 0))
1604 goto error;
1605
1606 /* TODO: change tmp_share.scheme to LEX_STRING object */
1607 if (!(share= (FEDERATEDX_SHARE *) my_hash_search(&federatedx_open_tables,
1608 (uchar*) tmp_share.share_key,
1609 tmp_share.
1610 share_key_length)))
1611 {
1612 query.set_charset(system_charset_info);
1613 query.append(STRING_WITH_LEN("SELECT "));
1614 for (field= table->field; *field; field++)
1615 {
1616 append_ident(&query, (*field)->field_name.str,
1617 (*field)->field_name.length, ident_quote_char);
1618 query.append(STRING_WITH_LEN(", "));
1619 }
1620 /* chops off trailing comma */
1621 query.length(query.length() - sizeof_trailing_comma);
1622
1623 query.append(STRING_WITH_LEN(" FROM "));
1624
1625 append_ident(&query, tmp_share.table_name,
1626 tmp_share.table_name_length, ident_quote_char);
1627
1628 if (!(share= (FEDERATEDX_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
1629 !(share->share_key= (char*) memdup_root(&mem_root, tmp_share.share_key, tmp_share.share_key_length+1)) ||
1630 !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length())))
1631 goto error;
1632
1633 share->mem_root= mem_root;
1634
1635 DBUG_PRINT("info",
1636 ("share->select_query %s", share->select_query));
1637
1638 if (!(share->s= get_server(share, table)))
1639 goto error;
1640
1641 if (my_hash_insert(&federatedx_open_tables, (uchar*) share))
1642 goto error;
1643 thr_lock_init(&share->lock);
1644 }
1645 else
1646 free_root(&mem_root, MYF(0)); /* prevents memory leak */
1647
1648 share->use_count++;
1649 mysql_mutex_unlock(&federatedx_mutex);
1650
1651 DBUG_RETURN(share);
1652
1653error:
1654 mysql_mutex_unlock(&federatedx_mutex);
1655 free_root(&mem_root, MYF(0));
1656 DBUG_RETURN(NULL);
1657}
1658
1659
1660static federatedx_txn zero_txn;
1661static int free_server(federatedx_txn *txn, FEDERATEDX_SERVER *server)
1662{
1663 bool destroy;
1664 DBUG_ENTER("free_server");
1665
1666 mysql_mutex_lock(&federatedx_mutex);
1667 if ((destroy= !--server->use_count))
1668 my_hash_delete(&federatedx_open_servers, (uchar*) server);
1669 mysql_mutex_unlock(&federatedx_mutex);
1670
1671 if (destroy)
1672 {
1673 MEM_ROOT mem_root;
1674
1675 if (!txn)
1676 txn= &zero_txn;
1677
1678 txn->close(server);
1679
1680 DBUG_ASSERT(server->io_count == 0);
1681
1682 mysql_mutex_destroy(&server->mutex);
1683 mem_root= server->mem_root;
1684 free_root(&mem_root, MYF(0));
1685 }
1686
1687 DBUG_RETURN(0);
1688}
1689
1690
1691/*
1692 Free lock controls. We call this whenever we close a table.
1693 If the table had the last reference to the share then we
1694 free memory associated with it.
1695*/
1696
1697static void free_share(federatedx_txn *txn, FEDERATEDX_SHARE *share)
1698{
1699 bool destroy;
1700 DBUG_ENTER("free_share");
1701
1702 mysql_mutex_lock(&federatedx_mutex);
1703 if ((destroy= !--share->use_count))
1704 my_hash_delete(&federatedx_open_tables, (uchar*) share);
1705 mysql_mutex_unlock(&federatedx_mutex);
1706
1707 if (destroy)
1708 {
1709 MEM_ROOT mem_root;
1710 FEDERATEDX_SERVER *server= share->s;
1711
1712 thr_lock_delete(&share->lock);
1713
1714 mem_root= share->mem_root;
1715 free_root(&mem_root, MYF(0));
1716
1717 free_server(txn, server);
1718 }
1719
1720 DBUG_VOID_RETURN;
1721}
1722
1723
1724ha_rows ha_federatedx::records_in_range(uint inx, key_range *start_key,
1725 key_range *end_key)
1726{
1727 /*
1728
1729 We really want indexes to be used as often as possible, therefore
1730 we just need to hard-code the return value to a very low number to
1731 force the issue
1732
1733*/
1734 DBUG_ENTER("ha_federatedx::records_in_range");
1735 DBUG_RETURN(FEDERATEDX_RECORDS_IN_RANGE);
1736}
1737
1738federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create)
1739{
1740 federatedx_txn **txnp= (federatedx_txn **) ha_data(thd);
1741 if (!*txnp && !no_create)
1742 *txnp= new federatedx_txn();
1743 return *txnp;
1744}
1745
1746
1747int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd)
1748{
1749 federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
1750 delete txn;
1751 *((federatedx_txn **) thd_ha_data(thd, hton))= 0;
1752 return 0;
1753}
1754
1755
1756/*
1757 Used for opening tables. The name will be the name of the file.
1758 A table is opened when it needs to be opened. For instance
1759 when a request comes in for a select on the table (tables are not
1760 open and closed for each request, they are cached).
1761
1762 Called from handler.cc by handler::ha_open(). The server opens
1763 all tables by calling ha_open() which then calls the handler
1764 specific open().
1765*/
1766
1767int ha_federatedx::open(const char *name, int mode, uint test_if_locked)
1768{
1769 int error;
1770 THD *thd= ha_thd();
1771 DBUG_ENTER("ha_federatedx::open");
1772
1773 if (!(share= get_share(name, table)))
1774 DBUG_RETURN(1);
1775 thr_lock_data_init(&share->lock, &lock, NULL);
1776
1777 DBUG_ASSERT(io == NULL);
1778
1779 txn= get_txn(thd);
1780
1781 if ((error= txn->acquire(share, thd, TRUE, &io)))
1782 {
1783 free_share(txn, share);
1784 DBUG_RETURN(error);
1785 }
1786
1787 ref_length= (uint)io->get_ref_length();
1788
1789 txn->release(&io);
1790
1791 DBUG_PRINT("info", ("ref_length: %u", ref_length));
1792
1793 my_init_dynamic_array(&results, sizeof(FEDERATEDX_IO_RESULT*), 4, 4, MYF(0));
1794
1795 reset();
1796
1797 DBUG_RETURN(0);
1798}
1799
1800class Net_error_handler : public Internal_error_handler
1801{
1802public:
1803 Net_error_handler() {}
1804
1805public:
1806 bool handle_condition(THD *thd, uint sql_errno, const char* sqlstate,
1807 Sql_condition::enum_warning_level *level,
1808 const char* msg, Sql_condition ** cond_hdl)
1809 {
1810 return sql_errno >= ER_ABORTING_CONNECTION &&
1811 sql_errno <= ER_NET_WRITE_INTERRUPTED;
1812 }
1813};
1814
1815/*
1816 Closes a table. We call the free_share() function to free any resources
1817 that we have allocated in the "shared" structure.
1818
1819 Called from sql_base.cc, sql_select.cc, and table.cc.
1820 In sql_select.cc it is only used to close up temporary tables or during
1821 the process where a temporary table is converted over to being a
1822 myisam table.
1823 For sql_base.cc look at close_data_tables().
1824*/
1825
1826int ha_federatedx::close(void)
1827{
1828 int retval= 0;
1829 THD *thd= ha_thd();
1830 DBUG_ENTER("ha_federatedx::close");
1831
1832 /* free the result set */
1833 reset();
1834
1835 delete_dynamic(&results);
1836
1837 /* Disconnect from mysql */
1838 if (!thd || !(txn= get_txn(thd, true)))
1839 txn= &zero_txn;
1840
1841 txn->release(&io);
1842 DBUG_ASSERT(io == NULL);
1843
1844 Net_error_handler err_handler;
1845 if (thd)
1846 thd->push_internal_handler(&err_handler);
1847 free_share(txn, share);
1848 if (thd)
1849 thd->pop_internal_handler();
1850
1851 DBUG_RETURN(retval);
1852}
1853
1854/*
1855
1856 Checks if a field in a record is SQL NULL.
1857
1858 SYNOPSIS
1859 field_in_record_is_null()
1860 table TABLE pointer, MySQL table object
1861 field Field pointer, MySQL field object
1862 record char pointer, contains record
1863
1864 DESCRIPTION
1865 This method uses the record format information in table to track
1866 the null bit in record.
1867
1868 RETURN VALUE
1869 1 if NULL
1870 0 otherwise
1871*/
1872
1873static inline uint field_in_record_is_null(TABLE *table, Field *field,
1874 char *record)
1875{
1876 int null_offset;
1877 DBUG_ENTER("ha_federatedx::field_in_record_is_null");
1878
1879 if (!field->null_ptr)
1880 DBUG_RETURN(0);
1881
1882 null_offset= (uint) ((char*)field->null_ptr - (char*)table->record[0]);
1883
1884 if (record[null_offset] & field->null_bit)
1885 DBUG_RETURN(1);
1886
1887 DBUG_RETURN(0);
1888}
1889
1890
1891/**
1892 @brief Construct the INSERT statement.
1893
1894 @details This method will construct the INSERT statement and appends it to
1895 the supplied query string buffer.
1896
1897 @return
1898 @retval FALSE No error
1899 @retval TRUE Failure
1900*/
1901
1902bool ha_federatedx::append_stmt_insert(String *query)
1903{
1904 char insert_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
1905 Field **field;
1906 uint tmp_length;
1907 bool added_field= FALSE;
1908
1909 /* The main insert query string */
1910 String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
1911 DBUG_ENTER("ha_federatedx::append_stmt_insert");
1912
1913 insert_string.length(0);
1914
1915 if (replace_duplicates)
1916 insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
1917 else if (ignore_duplicates && !insert_dup_update)
1918 insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
1919 else
1920 insert_string.append(STRING_WITH_LEN("INSERT INTO "));
1921 append_ident(&insert_string, share->table_name, share->table_name_length,
1922 ident_quote_char);
1923 tmp_length= insert_string.length();
1924 insert_string.append(STRING_WITH_LEN(" ("));
1925
1926 /*
1927 loop through the field pointer array, add any fields to both the values
1928 list and the fields list that match the current query id
1929 */
1930 for (field= table->field; *field; field++)
1931 {
1932 if (bitmap_is_set(table->write_set, (*field)->field_index))
1933 {
1934 /* append the field name */
1935 append_ident(&insert_string, (*field)->field_name.str,
1936 (*field)->field_name.length, ident_quote_char);
1937
1938 /* append commas between both fields and fieldnames */
1939 /*
1940 unfortunately, we can't use the logic if *(fields + 1) to
1941 make the following appends conditional as we don't know if the
1942 next field is in the write set
1943 */
1944 insert_string.append(STRING_WITH_LEN(", "));
1945 added_field= TRUE;
1946 }
1947 }
1948
1949 if (added_field)
1950 {
1951 /* Remove trailing comma. */
1952 insert_string.length(insert_string.length() - sizeof_trailing_comma);
1953 insert_string.append(STRING_WITH_LEN(") "));
1954 }
1955 else
1956 {
1957 /* If there were no fields, we don't want to add a closing paren. */
1958 insert_string.length(tmp_length);
1959 }
1960
1961 insert_string.append(STRING_WITH_LEN(" VALUES "));
1962
1963 DBUG_RETURN(query->append(insert_string));
1964}
1965
1966
1967/*
1968 write_row() inserts a row. No extra() hint is given currently if a bulk load
1969 is happeneding. buf() is a byte array of data. You can use the field
1970 information to extract the data from the native byte array type.
1971 Example of this would be:
1972 for (Field **field=table->field ; *field ; field++)
1973 {
1974 ...
1975 }
1976
1977 Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
1978 sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
1979*/
1980
1981int ha_federatedx::write_row(uchar *buf)
1982{
1983 char values_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
1984 char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
1985 Field **field;
1986 uint tmp_length;
1987 int error= 0;
1988 bool use_bulk_insert;
1989 bool auto_increment_update_required= (table->next_number_field != NULL);
1990
1991 /* The string containing the values to be added to the insert */
1992 String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
1993 /* The actual value of the field, to be added to the values_string */
1994 String insert_field_value_string(insert_field_value_buffer,
1995 sizeof(insert_field_value_buffer),
1996 &my_charset_bin);
1997 Time_zone *saved_time_zone= table->in_use->variables.time_zone;
1998 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1999 DBUG_ENTER("ha_federatedx::write_row");
2000
2001 table->in_use->variables.time_zone= UTC;
2002 values_string.length(0);
2003 insert_field_value_string.length(0);
2004
2005 /*
2006 start both our field and field values strings
2007 We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE"
2008 Ignore duplicates is always true when insert_dup_update is true.
2009 When replace_duplicates == TRUE, we can safely enable multi-row insert.
2010 When performing multi-row insert, we only collect the columns values for
2011 the row. The start of the statement is only created when the first
2012 row is copied in to the bulk_insert string.
2013 */
2014 if (!(use_bulk_insert= bulk_insert.str &&
2015 (!insert_dup_update || replace_duplicates)))
2016 append_stmt_insert(&values_string);
2017
2018 values_string.append(STRING_WITH_LEN(" ("));
2019 tmp_length= values_string.length();
2020
2021 /*
2022 loop through the field pointer array, add any fields to both the values
2023 list and the fields list that is part of the write set
2024 */
2025 for (field= table->field; *field; field++)
2026 {
2027 if (bitmap_is_set(table->write_set, (*field)->field_index))
2028 {
2029 if ((*field)->is_null())
2030 values_string.append(STRING_WITH_LEN(" NULL "));
2031 else
2032 {
2033 bool needs_quote= (*field)->str_needs_quotes();
2034 (*field)->val_str(&insert_field_value_string);
2035 if (needs_quote)
2036 values_string.append(value_quote_char);
2037 insert_field_value_string.print(&values_string);
2038 if (needs_quote)
2039 values_string.append(value_quote_char);
2040
2041 insert_field_value_string.length(0);
2042 }
2043
2044 /* append commas between both fields and fieldnames */
2045 /*
2046 unfortunately, we can't use the logic if *(fields + 1) to
2047 make the following appends conditional as we don't know if the
2048 next field is in the write set
2049 */
2050 values_string.append(STRING_WITH_LEN(", "));
2051 }
2052 }
2053 dbug_tmp_restore_column_map(table->read_set, old_map);
2054 table->in_use->variables.time_zone= saved_time_zone;
2055
2056 /*
2057 if there were no fields, we don't want to add a closing paren
2058 AND, we don't want to chop off the last char '('
2059 insert will be "INSERT INTO t1 VALUES ();"
2060 */
2061 if (values_string.length() > tmp_length)
2062 {
2063 /* chops off trailing comma */
2064 values_string.length(values_string.length() - sizeof_trailing_comma);
2065 }
2066 /* we always want to append this, even if there aren't any fields */
2067 values_string.append(STRING_WITH_LEN(") "));
2068
2069 if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
2070 DBUG_RETURN(error);
2071
2072 if (use_bulk_insert)
2073 {
2074 /*
2075 Send the current bulk insert out if appending the current row would
2076 cause the statement to overflow the packet size, otherwise set
2077 auto_increment_update_required to FALSE as no query was executed.
2078 */
2079 if (bulk_insert.length + values_string.length() + bulk_padding >
2080 io->max_query_size() && bulk_insert.length)
2081 {
2082 error= io->query(bulk_insert.str, bulk_insert.length);
2083 bulk_insert.length= 0;
2084 }
2085 else
2086 auto_increment_update_required= FALSE;
2087
2088 if (bulk_insert.length == 0)
2089 {
2090 char insert_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
2091 String insert_string(insert_buffer, sizeof(insert_buffer),
2092 &my_charset_bin);
2093 insert_string.length(0);
2094 append_stmt_insert(&insert_string);
2095 dynstr_append_mem(&bulk_insert, insert_string.ptr(),
2096 insert_string.length());
2097 }
2098 else
2099 dynstr_append_mem(&bulk_insert, ",", 1);
2100
2101 dynstr_append_mem(&bulk_insert, values_string.ptr(),
2102 values_string.length());
2103 }
2104 else
2105 {
2106 error= io->query(values_string.ptr(), values_string.length());
2107 }
2108
2109 if (error)
2110 {
2111 DBUG_RETURN(stash_remote_error());
2112 }
2113 /*
2114 If the table we've just written a record to contains an auto_increment
2115 field, then store the last_insert_id() value from the foreign server
2116 */
2117 if (auto_increment_update_required)
2118 {
2119 update_auto_increment();
2120
2121 /* mysql_insert() uses this for protocol return value */
2122 table->next_number_field->store(stats.auto_increment_value, 1);
2123 }
2124
2125 DBUG_RETURN(0);
2126}
2127
2128
2129/**
2130 @brief Prepares the storage engine for bulk inserts.
2131
2132 @param[in] rows estimated number of rows in bulk insert
2133 or 0 if unknown.
2134
2135 @details Initializes memory structures required for bulk insert.
2136*/
2137
2138void ha_federatedx::start_bulk_insert(ha_rows rows, uint flags)
2139{
2140 uint page_size;
2141 DBUG_ENTER("ha_federatedx::start_bulk_insert");
2142
2143 dynstr_free(&bulk_insert);
2144
2145 /**
2146 We don't bother with bulk-insert semantics when the estimated rows == 1
2147 The rows value will be 0 if the server does not know how many rows
2148 would be inserted. This can occur when performing INSERT...SELECT
2149 */
2150
2151 if (rows == 1)
2152 DBUG_VOID_RETURN;
2153
2154 /*
2155 Make sure we have an open connection so that we know the
2156 maximum packet size.
2157 */
2158 if (txn->acquire(share, ha_thd(), FALSE, &io))
2159 DBUG_VOID_RETURN;
2160
2161 page_size= (uint) my_getpagesize();
2162
2163 if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
2164 DBUG_VOID_RETURN;
2165
2166 bulk_insert.length= 0;
2167 DBUG_VOID_RETURN;
2168}
2169
2170
2171/**
2172 @brief End bulk insert.
2173
2174 @details This method will send any remaining rows to the remote server.
2175 Finally, it will deinitialize the bulk insert data structure.
2176
2177 @return Operation status
2178 @retval 0 No error
2179 @retval != 0 Error occurred at remote server. Also sets my_errno.
2180*/
2181
2182int ha_federatedx::end_bulk_insert()
2183{
2184 int error= 0;
2185 DBUG_ENTER("ha_federatedx::end_bulk_insert");
2186
2187 if (bulk_insert.str && bulk_insert.length && !table_will_be_deleted)
2188 {
2189 if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
2190 DBUG_RETURN(error);
2191 if (io->query(bulk_insert.str, bulk_insert.length))
2192 error= stash_remote_error();
2193 else
2194 if (table->next_number_field)
2195 update_auto_increment();
2196 }
2197
2198 dynstr_free(&bulk_insert);
2199
2200 DBUG_RETURN(my_errno= error);
2201}
2202
2203
2204/*
2205 ha_federatedx::update_auto_increment
2206
2207 This method ensures that last_insert_id() works properly. What it simply does
2208 is calls last_insert_id() on the foreign database immediately after insert
2209 (if the table has an auto_increment field) and sets the insert id via
2210 thd->insert_id(ID)).
2211*/
2212void ha_federatedx::update_auto_increment(void)
2213{
2214 THD *thd= ha_thd();
2215 DBUG_ENTER("ha_federatedx::update_auto_increment");
2216
2217 ha_federatedx::info(HA_STATUS_AUTO);
2218 thd->first_successful_insert_id_in_cur_stmt=
2219 stats.auto_increment_value;
2220 DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value));
2221
2222 DBUG_VOID_RETURN;
2223}
2224
2225int ha_federatedx::optimize(THD* thd, HA_CHECK_OPT* check_opt)
2226{
2227 int error= 0;
2228 char query_buffer[STRING_BUFFER_USUAL_SIZE];
2229 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2230 DBUG_ENTER("ha_federatedx::optimize");
2231
2232 query.length(0);
2233
2234 query.set_charset(system_charset_info);
2235 query.append(STRING_WITH_LEN("OPTIMIZE TABLE "));
2236 append_ident(&query, share->table_name, share->table_name_length,
2237 ident_quote_char);
2238
2239 DBUG_ASSERT(txn == get_txn(thd));
2240
2241 if ((error= txn->acquire(share, thd, FALSE, &io)))
2242 DBUG_RETURN(error);
2243
2244 if (io->query(query.ptr(), query.length()))
2245 error= stash_remote_error();
2246
2247 DBUG_RETURN(error);
2248}
2249
2250
2251int ha_federatedx::repair(THD* thd, HA_CHECK_OPT* check_opt)
2252{
2253 int error= 0;
2254 char query_buffer[STRING_BUFFER_USUAL_SIZE];
2255 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
2256 DBUG_ENTER("ha_federatedx::repair");
2257
2258 query.length(0);
2259
2260 query.set_charset(system_charset_info);
2261 query.append(STRING_WITH_LEN("REPAIR TABLE "));
2262 append_ident(&query, share->table_name, share->table_name_length,
2263 ident_quote_char);
2264 if (check_opt->flags & T_QUICK)
2265 query.append(STRING_WITH_LEN(" QUICK"));
2266 if (check_opt->flags & T_EXTEND)
2267 query.append(STRING_WITH_LEN(" EXTENDED"));
2268 if (check_opt->sql_flags & TT_USEFRM)
2269 query.append(STRING_WITH_LEN(" USE_FRM"));
2270
2271 DBUG_ASSERT(txn == get_txn(thd));
2272
2273 if ((error= txn->acquire(share, thd, FALSE, &io)))
2274 DBUG_RETURN(error);
2275
2276 if (io->query(query.ptr(), query.length()))
2277 error= stash_remote_error();
2278
2279 DBUG_RETURN(error);
2280}
2281
2282
2283/*
2284 Yes, update_row() does what you expect, it updates a row. old_data will have
2285 the previous row record in it, while new_data will have the newest data in
2286 it.
2287
2288 Keep in mind that the server can do updates based on ordering if an ORDER BY
2289 clause was used. Consecutive ordering is not guaranteed.
2290 Currently new_data will not have an updated auto_increament record, or
2291 and updated timestamp field. You can do these for federatedx by doing these:
2292 if (table->timestamp_on_update_now)
2293 update_timestamp(new_row+table->timestamp_on_update_now-1);
2294 if (table->next_number_field && record == table->record[0])
2295 update_auto_increment();
2296
2297 Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
2298*/
2299
2300int ha_federatedx::update_row(const uchar *old_data, const uchar *new_data)
2301{
2302 /*
2303 This used to control how the query was built. If there was a
2304 primary key, the query would be built such that there was a where
2305 clause with only that column as the condition. This is flawed,
2306 because if we have a multi-part primary key, it would only use the
2307 first part! We don't need to do this anyway, because
2308 read_range_first will retrieve the correct record, which is what
2309 is used to build the WHERE clause. We can however use this to
2310 append a LIMIT to the end if there is NOT a primary key. Why do
2311 this? Because we only are updating one record, and LIMIT enforces
2312 this.
2313 */
2314 bool has_a_primary_key= MY_TEST(table->s->primary_key != MAX_KEY);
2315
2316 /*
2317 buffers for following strings
2318 */
2319 char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
2320 char update_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
2321 char where_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
2322
2323 /* Work area for field values */
2324 String field_value(field_value_buffer, sizeof(field_value_buffer),
2325 &my_charset_bin);
2326 /* stores the update query */
2327 String update_string(update_buffer,
2328 sizeof(update_buffer),
2329 &my_charset_bin);
2330 /* stores the WHERE clause */
2331 String where_string(where_buffer,
2332 sizeof(where_buffer),
2333 &my_charset_bin);
2334 uchar *record= table->record[0];
2335 int error;
2336 DBUG_ENTER("ha_federatedx::update_row");
2337 /*
2338 set string lengths to 0 to avoid misc chars in string
2339 */
2340 field_value.length(0);
2341 update_string.length(0);
2342 where_string.length(0);
2343
2344 if (ignore_duplicates)
2345 update_string.append(STRING_WITH_LEN("UPDATE IGNORE "));
2346 else
2347 update_string.append(STRING_WITH_LEN("UPDATE "));
2348 append_ident(&update_string, share->table_name,
2349 share->table_name_length, ident_quote_char);
2350 update_string.append(STRING_WITH_LEN(" SET "));
2351
2352 /*
2353 In this loop, we want to match column names to values being inserted
2354 (while building INSERT statement).
2355
2356 Iterate through table->field (new data) and share->old_field (old_data)
2357 using the same index to create an SQL UPDATE statement. New data is
2358 used to create SET field=value and old data is used to create WHERE
2359 field=oldvalue
2360 */
2361
2362 Time_zone *saved_time_zone= table->in_use->variables.time_zone;
2363 table->in_use->variables.time_zone= UTC;
2364 for (Field **field= table->field; *field; field++)
2365 {
2366 if (bitmap_is_set(table->write_set, (*field)->field_index))
2367 {
2368 append_ident(&update_string, (*field)->field_name.str,
2369 (*field)->field_name.length,
2370 ident_quote_char);
2371 update_string.append(STRING_WITH_LEN(" = "));
2372
2373 if ((*field)->is_null())
2374 update_string.append(STRING_WITH_LEN(" NULL "));
2375 else
2376 {
2377 /* otherwise = */
2378 my_bitmap_map *old_map= tmp_use_all_columns(table, table->read_set);
2379 bool needs_quote= (*field)->str_needs_quotes();
2380 (*field)->val_str(&field_value);
2381 if (needs_quote)
2382 update_string.append(value_quote_char);
2383 field_value.print(&update_string);
2384 if (needs_quote)
2385 update_string.append(value_quote_char);
2386 field_value.length(0);
2387 tmp_restore_column_map(table->read_set, old_map);
2388 }
2389 update_string.append(STRING_WITH_LEN(", "));
2390 }
2391
2392 if (bitmap_is_set(table->read_set, (*field)->field_index))
2393 {
2394 append_ident(&where_string, (*field)->field_name.str,
2395 (*field)->field_name.length,
2396 ident_quote_char);
2397 if (field_in_record_is_null(table, *field, (char*) old_data))
2398 where_string.append(STRING_WITH_LEN(" IS NULL "));
2399 else
2400 {
2401 bool needs_quote= (*field)->str_needs_quotes();
2402 where_string.append(STRING_WITH_LEN(" = "));
2403 (*field)->val_str(&field_value,
2404 (old_data + (*field)->offset(record)));
2405 if (needs_quote)
2406 where_string.append(value_quote_char);
2407 field_value.print(&where_string);
2408 if (needs_quote)
2409 where_string.append(value_quote_char);
2410 field_value.length(0);
2411 }
2412 where_string.append(STRING_WITH_LEN(" AND "));
2413 }
2414 }
2415 table->in_use->variables.time_zone= saved_time_zone;
2416
2417 /* Remove last ', '. This works as there must be at least on updated field */
2418 update_string.length(update_string.length() - sizeof_trailing_comma);
2419
2420 if (where_string.length())
2421 {
2422 /* chop off trailing AND */
2423 where_string.length(where_string.length() - sizeof_trailing_and);
2424 update_string.append(STRING_WITH_LEN(" WHERE "));
2425 update_string.append(where_string);
2426 }
2427
2428 /*
2429 If this table has not a primary key, then we could possibly
2430 update multiple rows. We want to make sure to only update one!
2431 */
2432 if (!has_a_primary_key)
2433 update_string.append(STRING_WITH_LEN(" LIMIT 1"));
2434
2435 if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
2436 DBUG_RETURN(error);
2437
2438 if (io->query(update_string.ptr(), update_string.length()))
2439 {
2440 DBUG_RETURN(stash_remote_error());
2441 }
2442 DBUG_RETURN(0);
2443}
2444
2445/*
2446 This will delete a row. 'buf' will contain a copy of the row to be =deleted.
2447 The server will call this right after the current row has been called (from
2448 either a previous rnd_next() or index call).
2449 If you keep a pointer to the last row or can access a primary key it will
2450 make doing the deletion quite a bit easier.
2451 Keep in mind that the server does no guarentee consecutive deletions.
2452 ORDER BY clauses can be used.
2453
2454 Called in sql_acl.cc and sql_udf.cc to manage internal table information.
2455 Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
2456 it is used for removing duplicates while in insert it is used for REPLACE
2457 calls.
2458*/
2459
2460int ha_federatedx::delete_row(const uchar *buf)
2461{
2462 char delete_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
2463 char data_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
2464 String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
2465 String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
2466 uint found= 0;
2467 int error;
2468 DBUG_ENTER("ha_federatedx::delete_row");
2469
2470 delete_string.length(0);
2471 delete_string.append(STRING_WITH_LEN("DELETE FROM "));
2472 append_ident(&delete_string, share->table_name,
2473 share->table_name_length, ident_quote_char);
2474 delete_string.append(STRING_WITH_LEN(" WHERE "));
2475
2476 Time_zone *saved_time_zone= table->in_use->variables.time_zone;
2477 table->in_use->variables.time_zone= UTC;
2478 for (Field **field= table->field; *field; field++)
2479 {
2480 Field *cur_field= *field;
2481 found++;
2482 if (bitmap_is_set(table->read_set, cur_field->field_index))
2483 {
2484 append_ident(&delete_string, (*field)->field_name.str,
2485 (*field)->field_name.length, ident_quote_char);
2486 data_string.length(0);
2487 if (cur_field->is_null())
2488 {
2489 delete_string.append(STRING_WITH_LEN(" IS NULL "));
2490 }
2491 else
2492 {
2493 bool needs_quote= cur_field->str_needs_quotes();
2494 delete_string.append(STRING_WITH_LEN(" = "));
2495 cur_field->val_str(&data_string);
2496 if (needs_quote)
2497 delete_string.append(value_quote_char);
2498 data_string.print(&delete_string);
2499 if (needs_quote)
2500 delete_string.append(value_quote_char);
2501 }
2502 delete_string.append(STRING_WITH_LEN(" AND "));
2503 }
2504 }
2505 table->in_use->variables.time_zone= saved_time_zone;
2506
2507 // Remove trailing AND
2508 delete_string.length(delete_string.length() - sizeof_trailing_and);
2509 if (!found)
2510 delete_string.length(delete_string.length() - sizeof_trailing_where);
2511
2512 delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
2513 DBUG_PRINT("info",
2514 ("Delete sql: %s", delete_string.c_ptr_quick()));
2515
2516 if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
2517 DBUG_RETURN(error);
2518
2519 if (io->query(delete_string.ptr(), delete_string.length()))
2520 {
2521 DBUG_RETURN(stash_remote_error());
2522 }
2523 stats.deleted+= (ha_rows) io->affected_rows();
2524 stats.records-= (ha_rows) io->affected_rows();
2525 DBUG_PRINT("info",
2526 ("rows deleted %ld rows deleted for all time %ld",
2527 (long) io->affected_rows(), (long) stats.deleted));
2528
2529 DBUG_RETURN(0);
2530}
2531
2532
2533/*
2534 Positions an index cursor to the index specified in the handle. Fetches the
2535 row if available. If the key value is null, begin at the first key of the
2536 index. This method, which is called in the case of an SQL statement having
2537 a WHERE clause on a non-primary key index, simply calls index_read_idx.
2538*/
2539
2540int ha_federatedx::index_read(uchar *buf, const uchar *key,
2541 uint key_len, ha_rkey_function find_flag)
2542{
2543 DBUG_ENTER("ha_federatedx::index_read");
2544
2545 if (stored_result)
2546 (void) free_result();
2547 DBUG_RETURN(index_read_idx_with_result_set(buf, active_index, key,
2548 key_len, find_flag,
2549 &stored_result));
2550}
2551
2552
2553/*
2554 Positions an index cursor to the index specified in key. Fetches the
2555 row if any. This is only used to read whole keys.
2556
2557 This method is called via index_read in the case of a WHERE clause using
2558 a primary key index OR is called DIRECTLY when the WHERE clause
2559 uses a PRIMARY KEY index.
2560
2561 NOTES
2562 This uses an internal result set that is deleted before function
2563 returns. We need to be able to be callable from ha_rnd_pos()
2564*/
2565
2566int ha_federatedx::index_read_idx(uchar *buf, uint index, const uchar *key,
2567 uint key_len, enum ha_rkey_function find_flag)
2568{
2569 int retval;
2570 FEDERATEDX_IO_RESULT *io_result= 0;
2571 DBUG_ENTER("ha_federatedx::index_read_idx");
2572
2573 if ((retval= index_read_idx_with_result_set(buf, index, key,
2574 key_len, find_flag,
2575 &io_result)))
2576 DBUG_RETURN(retval);
2577 /* io is correct, as index_read_idx_with_result_set was ok */
2578 io->free_result(io_result);
2579 DBUG_RETURN(retval);
2580}
2581
2582
2583/*
2584 Create result set for rows matching query and return first row
2585
2586 RESULT
2587 0 ok In this case *result will contain the result set
2588 # error In this case *result will contain 0
2589*/
2590
2591int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index,
2592 const uchar *key,
2593 uint key_len,
2594 ha_rkey_function find_flag,
2595 FEDERATEDX_IO_RESULT **result)
2596{
2597 int retval;
2598 char error_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
2599 char index_value[STRING_BUFFER_USUAL_SIZE];
2600 char sql_query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
2601 String index_string(index_value,
2602 sizeof(index_value),
2603 &my_charset_bin);
2604 String sql_query(sql_query_buffer,
2605 sizeof(sql_query_buffer),
2606 &my_charset_bin);
2607 key_range range;
2608 DBUG_ENTER("ha_federatedx::index_read_idx_with_result_set");
2609
2610 *result= 0; // In case of errors
2611 index_string.length(0);
2612 sql_query.length(0);
2613
2614 sql_query.append(share->select_query);
2615
2616 range.key= key;
2617 range.length= key_len;
2618 range.flag= find_flag;
2619 create_where_from_key(&index_string,
2620 &table->key_info[index],
2621 &range,
2622 NULL, 0, 0);
2623 sql_query.append(index_string);
2624
2625 if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
2626 DBUG_RETURN(retval);
2627
2628 if (io->query(sql_query.ptr(), sql_query.length()))
2629 {
2630 sprintf(error_buffer, "error: %d '%s'",
2631 io->error_code(), io->error_str());
2632 retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2633 goto error;
2634 }
2635 if (!(*result= io->store_result()))
2636 {
2637 retval= HA_ERR_END_OF_FILE;
2638 goto error;
2639 }
2640 if (!(retval= read_next(buf, *result)))
2641 DBUG_RETURN(retval);
2642
2643 insert_dynamic(&results, (uchar*) result);
2644 *result= 0;
2645 DBUG_RETURN(retval);
2646
2647error:
2648 my_error(retval, MYF(0), error_buffer);
2649 DBUG_RETURN(retval);
2650}
2651
2652
2653/*
2654 This method is used exlusevely by filesort() to check if we
2655 can create sorting buffers of necessary size.
2656 If the handler returns more records that it declares
2657 here server can just crash on filesort().
2658 We cannot guarantee that's not going to happen with
2659 the FEDERATEDX engine, as we have records==0 always if the
2660 client is a VIEW, and for the table the number of
2661 records can inpredictably change during execution.
2662 So we return maximum possible value here.
2663*/
2664
2665ha_rows ha_federatedx::estimate_rows_upper_bound()
2666{
2667 return HA_POS_ERROR;
2668}
2669
2670
2671/* Initialized at each key walk (called multiple times unlike rnd_init()) */
2672
2673int ha_federatedx::index_init(uint keynr, bool sorted)
2674{
2675 DBUG_ENTER("ha_federatedx::index_init");
2676 DBUG_PRINT("info", ("table: '%s' key: %u", table->s->table_name.str, keynr));
2677 active_index= keynr;
2678 DBUG_RETURN(0);
2679}
2680
2681
2682/*
2683 Read first range
2684*/
2685
2686int ha_federatedx::read_range_first(const key_range *start_key,
2687 const key_range *end_key,
2688 bool eq_range_arg, bool sorted)
2689{
2690 char sql_query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
2691 int retval;
2692 String sql_query(sql_query_buffer,
2693 sizeof(sql_query_buffer),
2694 &my_charset_bin);
2695 DBUG_ENTER("ha_federatedx::read_range_first");
2696
2697 DBUG_ASSERT(!(start_key == NULL && end_key == NULL));
2698
2699 sql_query.length(0);
2700 sql_query.append(share->select_query);
2701 create_where_from_key(&sql_query,
2702 &table->key_info[active_index],
2703 start_key, end_key, 0, eq_range_arg);
2704
2705 if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
2706 DBUG_RETURN(retval);
2707
2708 if (stored_result)
2709 (void) free_result();
2710
2711 if (io->query(sql_query.ptr(), sql_query.length()))
2712 {
2713 retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2714 goto error;
2715 }
2716 sql_query.length(0);
2717
2718 if (!(stored_result= io->store_result()))
2719 {
2720 retval= HA_ERR_END_OF_FILE;
2721 goto error;
2722 }
2723
2724 retval= read_next(table->record[0], stored_result);
2725 DBUG_RETURN(retval);
2726
2727error:
2728 DBUG_RETURN(retval);
2729}
2730
2731
2732int ha_federatedx::read_range_next()
2733{
2734 int retval;
2735 DBUG_ENTER("ha_federatedx::read_range_next");
2736 retval= rnd_next(table->record[0]);
2737 DBUG_RETURN(retval);
2738}
2739
2740
2741/* Used to read forward through the index. */
2742int ha_federatedx::index_next(uchar *buf)
2743{
2744 DBUG_ENTER("ha_federatedx::index_next");
2745 int retval=read_next(buf, stored_result);
2746 DBUG_RETURN(retval);
2747}
2748
2749
2750/*
2751 rnd_init() is called when the system wants the storage engine to do a table
2752 scan.
2753
2754 This is the method that gets data for the SELECT calls.
2755
2756 See the federatedx in the introduction at the top of this file to see when
2757 rnd_init() is called.
2758
2759 Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2760 sql_table.cc, and sql_update.cc.
2761*/
2762
2763int ha_federatedx::rnd_init(bool scan)
2764{
2765 DBUG_ENTER("ha_federatedx::rnd_init");
2766 /*
2767 The use of the 'scan' flag is incredibly important for this handler
2768 to work properly, especially with updates containing WHERE clauses
2769 using indexed columns.
2770
2771 When the initial query contains a WHERE clause of the query using an
2772 indexed column, it's index_read_idx that selects the exact record from
2773 the foreign database.
2774
2775 When there is NO index in the query, either due to not having a WHERE
2776 clause, or the WHERE clause is using columns that are not indexed, a
2777 'full table scan' done by rnd_init, which in this situation simply means
2778 a 'select * from ...' on the foreign table.
2779
2780 In other words, this 'scan' flag gives us the means to ensure that if
2781 there is an index involved in the query, we want index_read_idx to
2782 retrieve the exact record (scan flag is 0), and do not want rnd_init
2783 to do a 'full table scan' and wipe out that result set.
2784
2785 Prior to using this flag, the problem was most apparent with updates.
2786
2787 An initial query like 'UPDATE tablename SET anything = whatever WHERE
2788 indexedcol = someval', index_read_idx would get called, using a query
2789 constructed with a WHERE clause built from the values of index ('indexcol'
2790 in this case, having a value of 'someval'). mysql_store_result would
2791 then get called (this would be the result set we want to use).
2792
2793 After this rnd_init (from sql_update.cc) would be called, it would then
2794 unecessarily call "select * from table" on the foreign table, then call
2795 mysql_store_result, which would wipe out the correct previous result set
2796 from the previous call of index_read_idx's that had the result set
2797 containing the correct record, hence update the wrong row!
2798
2799 */
2800
2801 if (scan)
2802 {
2803 int error;
2804
2805 if ((error= txn->acquire(share, ha_thd(), TRUE, &io)))
2806 DBUG_RETURN(error);
2807
2808 if (stored_result)
2809 (void) free_result();
2810
2811 if (io->query(share->select_query,
2812 strlen(share->select_query)))
2813 goto error;
2814
2815 stored_result= io->store_result();
2816 if (!stored_result)
2817 goto error;
2818 }
2819 DBUG_RETURN(0);
2820
2821error:
2822 DBUG_RETURN(stash_remote_error());
2823}
2824
2825
2826int ha_federatedx::rnd_end()
2827{
2828 DBUG_ENTER("ha_federatedx::rnd_end");
2829 DBUG_RETURN(index_end());
2830}
2831
2832
2833int ha_federatedx::free_result()
2834{
2835 int error;
2836 DBUG_ENTER("ha_federatedx::free_result");
2837 DBUG_ASSERT(stored_result);
2838 for (uint i= 0; i < results.elements; ++i)
2839 {
2840 FEDERATEDX_IO_RESULT *result= 0;
2841 get_dynamic(&results, (uchar*) &result, i);
2842 if (result == stored_result)
2843 goto end;
2844 }
2845 if (position_called)
2846 {
2847 insert_dynamic(&results, (uchar*) &stored_result);
2848 }
2849 else
2850 {
2851 federatedx_io *tmp_io= 0, **iop;
2852 if (!*(iop= &io) && (error= txn->acquire(share, ha_thd(), TRUE, (iop= &tmp_io))))
2853 {
2854 DBUG_ASSERT(0); // Fail when testing
2855 insert_dynamic(&results, (uchar*) &stored_result);
2856 goto end;
2857 }
2858 (*iop)->free_result(stored_result);
2859 txn->release(&tmp_io);
2860 }
2861end:
2862 stored_result= 0;
2863 position_called= FALSE;
2864 DBUG_RETURN(0);
2865}
2866
2867int ha_federatedx::index_end(void)
2868{
2869 int error= 0;
2870 DBUG_ENTER("ha_federatedx::index_end");
2871 if (stored_result)
2872 error= free_result();
2873 active_index= MAX_KEY;
2874 DBUG_RETURN(error);
2875}
2876
2877
2878/*
2879 This is called for each row of the table scan. When you run out of records
2880 you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
2881 The Field structure for the table is the key to getting data into buf
2882 in a manner that will allow the server to understand it.
2883
2884 Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
2885 sql_table.cc, and sql_update.cc.
2886*/
2887
2888int ha_federatedx::rnd_next(uchar *buf)
2889{
2890 DBUG_ENTER("ha_federatedx::rnd_next");
2891
2892 if (stored_result == 0)
2893 {
2894 /*
2895 Return value of rnd_init is not always checked (see records.cc),
2896 so we can get here _even_ if there is _no_ pre-fetched result-set!
2897 TODO: fix it. We can delete this in 5.1 when rnd_init() is checked.
2898 */
2899 DBUG_RETURN(1);
2900 }
2901 int retval=read_next(buf, stored_result);
2902 DBUG_RETURN(retval);
2903}
2904
2905
2906/*
2907 ha_federatedx::read_next
2908
2909 reads from a result set and converts to mysql internal
2910 format
2911
2912 SYNOPSIS
2913 field_in_record_is_null()
2914 buf byte pointer to record
2915 result mysql result set
2916
2917 DESCRIPTION
2918 This method is a wrapper method that reads one record from a result
2919 set and converts it to the internal table format
2920
2921 RETURN VALUE
2922 1 error
2923 0 no error
2924*/
2925
2926int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result)
2927{
2928 int retval;
2929 FEDERATEDX_IO_ROW *row;
2930 DBUG_ENTER("ha_federatedx::read_next");
2931
2932 if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
2933 DBUG_RETURN(retval);
2934
2935 /* Fetch a row, insert it back in a row format. */
2936 if (!(row= io->fetch_row(result)))
2937 DBUG_RETURN(HA_ERR_END_OF_FILE);
2938
2939 if (!(retval= convert_row_to_internal_format(buf, row, result)))
2940 table->status= 0;
2941
2942 DBUG_RETURN(retval);
2943}
2944
2945
2946/**
2947 @brief Store a reference to current row.
2948
2949 @details During a query execution we may have different result sets (RS),
2950 e.g. for different ranges. All the RS's used are stored in
2951 memory and placed in @c results dynamic array. At the end of
2952 execution all stored RS's are freed at once in the
2953 @c ha_federated::reset().
2954 So, in case of federated, a reference to current row is a
2955 stored result address and current data cursor position.
2956 As we keep all RS in memory during a query execution,
2957 we can get any record using the reference any time until
2958 @c ha_federated::reset() is called.
2959 TODO: we don't have to store all RS's rows but only those
2960 we call @c ha_federated::position() for, so we can free memory
2961 where we store other rows in the @c ha_federated::index_end().
2962
2963 @param[in] record record data (unused)
2964
2965*/
2966
2967void ha_federatedx::position(const uchar *record __attribute__ ((unused)))
2968{
2969 DBUG_ENTER("ha_federatedx::position");
2970
2971 if (!stored_result)
2972 {
2973 bzero(ref, ref_length);
2974 DBUG_VOID_RETURN;
2975 }
2976
2977 if (txn->acquire(share, ha_thd(), TRUE, &io))
2978 DBUG_VOID_RETURN;
2979
2980 io->mark_position(stored_result, ref);
2981
2982 position_called= TRUE;
2983
2984 DBUG_VOID_RETURN;
2985}
2986
2987
2988/*
2989 This is like rnd_next, but you are given a position to use to determine the
2990 row. The position will be of the type that you stored in ref.
2991
2992 This method is required for an ORDER BY
2993
2994 Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
2995*/
2996
2997int ha_federatedx::rnd_pos(uchar *buf, uchar *pos)
2998{
2999 int retval;
3000 FEDERATEDX_IO_RESULT *result= stored_result;
3001 DBUG_ENTER("ha_federatedx::rnd_pos");
3002
3003 /* We have to move this to 'ref' to get things aligned */
3004 bmove(ref, pos, ref_length);
3005
3006 if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
3007 goto error;
3008
3009 if ((retval= io->seek_position(&result, ref)))
3010 goto error;
3011
3012 retval= read_next(buf, result);
3013 DBUG_RETURN(retval);
3014
3015error:
3016 DBUG_RETURN(retval);
3017}
3018
3019
3020/*
3021 ::info() is used to return information to the optimizer.
3022 Currently this table handler doesn't implement most of the fields
3023 really needed. SHOW also makes use of this data
3024 Another note, you will probably want to have the following in your
3025 code:
3026 if (records < 2)
3027 records = 2;
3028 The reason is that the server will optimize for cases of only a single
3029 record. If in a table scan you don't know the number of records
3030 it will probably be better to set records to two so you can return
3031 as many records as you need.
3032 Along with records a few more variables you may wish to set are:
3033 records
3034 deleted
3035 data_file_length
3036 index_file_length
3037 delete_length
3038 check_time
3039 Take a look at the public variables in handler.h for more information.
3040
3041 Called in:
3042 filesort.cc
3043 ha_heap.cc
3044 item_sum.cc
3045 opt_sum.cc
3046 sql_delete.cc
3047 sql_delete.cc
3048 sql_derived.cc
3049 sql_select.cc
3050 sql_select.cc
3051 sql_select.cc
3052 sql_select.cc
3053 sql_select.cc
3054 sql_show.cc
3055 sql_show.cc
3056 sql_show.cc
3057 sql_show.cc
3058 sql_table.cc
3059 sql_union.cc
3060 sql_update.cc
3061
3062*/
3063
3064int ha_federatedx::info(uint flag)
3065{
3066 uint error_code;
3067 THD *thd= ha_thd();
3068 federatedx_txn *tmp_txn;
3069 federatedx_io *tmp_io= 0, **iop= 0;
3070 DBUG_ENTER("ha_federatedx::info");
3071
3072 error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
3073
3074 // external_lock may not have been called so txn may not be set
3075 tmp_txn= get_txn(thd);
3076
3077 /* we want not to show table status if not needed to do so */
3078 if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO))
3079 {
3080 if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io))))
3081 goto fail;
3082 }
3083
3084 if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
3085 {
3086 /*
3087 size of IO operations (This is based on a good guess, no high science
3088 involved)
3089 */
3090 if (flag & HA_STATUS_CONST)
3091 stats.block_size= 4096;
3092
3093 if ((*iop)->table_metadata(&stats, share->table_name,
3094 (uint)share->table_name_length, flag))
3095 goto error;
3096 }
3097
3098 if (flag & HA_STATUS_AUTO)
3099 stats.auto_increment_value= (*iop)->last_insert_id();
3100
3101 /*
3102 If ::info created it's own transaction, close it. This happens in case
3103 of show table status;
3104 */
3105 tmp_txn->release(&tmp_io);
3106
3107 DBUG_RETURN(0);
3108
3109error:
3110 if (iop && *iop)
3111 {
3112 my_printf_error((*iop)->error_code(), "Received error: %d : %s", MYF(0),
3113 (*iop)->error_code(), (*iop)->error_str());
3114 }
3115 else if (remote_error_number != -1 /* error already reported */)
3116 {
3117 error_code= remote_error_number;
3118 my_error(error_code, MYF(0), ER_THD(thd, error_code));
3119 }
3120fail:
3121 tmp_txn->release(&tmp_io);
3122 DBUG_RETURN(error_code);
3123}
3124
3125
3126/**
3127 @brief Handles extra signals from MySQL server
3128
3129 @param[in] operation Hint for storage engine
3130
3131 @return Operation Status
3132 @retval 0 OK
3133 */
3134int ha_federatedx::extra(ha_extra_function operation)
3135{
3136 DBUG_ENTER("ha_federatedx::extra");
3137 switch (operation) {
3138 case HA_EXTRA_IGNORE_DUP_KEY:
3139 ignore_duplicates= TRUE;
3140 break;
3141 case HA_EXTRA_NO_IGNORE_DUP_KEY:
3142 insert_dup_update= FALSE;
3143 ignore_duplicates= FALSE;
3144 break;
3145 case HA_EXTRA_WRITE_CAN_REPLACE:
3146 replace_duplicates= TRUE;
3147 break;
3148 case HA_EXTRA_WRITE_CANNOT_REPLACE:
3149 /*
3150 We use this flag to ensure that we do not create an "INSERT IGNORE"
3151 statement when inserting new rows into the remote table.
3152 */
3153 replace_duplicates= FALSE;
3154 break;
3155 case HA_EXTRA_INSERT_WITH_UPDATE:
3156 insert_dup_update= TRUE;
3157 break;
3158 case HA_EXTRA_PREPARE_FOR_DROP:
3159 table_will_be_deleted = TRUE;
3160 break;
3161 default:
3162 /* do nothing */
3163 DBUG_PRINT("info",("unhandled operation: %d", (uint) operation));
3164 }
3165 DBUG_RETURN(0);
3166}
3167
3168
3169/**
3170 @brief Reset state of file to after 'open'.
3171
3172 @detail This function is called after every statement for all tables
3173 used by that statement.
3174
3175 @return Operation status
3176 @retval 0 OK
3177*/
3178
3179int ha_federatedx::reset(void)
3180{
3181 THD *thd= ha_thd();
3182 int error = 0;
3183
3184 insert_dup_update= FALSE;
3185 ignore_duplicates= FALSE;
3186 replace_duplicates= FALSE;
3187 position_called= FALSE;
3188
3189 if (stored_result)
3190 insert_dynamic(&results, (uchar*) &stored_result);
3191 stored_result= 0;
3192
3193 if (results.elements)
3194 {
3195 federatedx_txn *tmp_txn;
3196 federatedx_io *tmp_io= 0, **iop;
3197
3198 // external_lock may not have been called so txn may not be set
3199 tmp_txn= get_txn(thd);
3200
3201 if (!*(iop= &io) && (error= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io))))
3202 {
3203 DBUG_ASSERT(0); // Fail when testing
3204 return error;
3205 }
3206
3207 for (uint i= 0; i < results.elements; ++i)
3208 {
3209 FEDERATEDX_IO_RESULT *result= 0;
3210 get_dynamic(&results, (uchar*) &result, i);
3211 (*iop)->free_result(result);
3212 }
3213 tmp_txn->release(&tmp_io);
3214 reset_dynamic(&results);
3215 }
3216
3217 return error;
3218
3219}
3220
3221/*
3222 Used to delete all rows in a table. Both for cases of truncate and
3223 for cases where the optimizer realizes that all rows will be
3224 removed as a result of a SQL statement.
3225
3226 Called from item_sum.cc by Item_func_group_concat::clear(),
3227 Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
3228 Called from sql_delete.cc by mysql_delete().
3229 Called from sql_select.cc by JOIN::reinit().
3230 Called from sql_union.cc by st_select_lex_unit::exec().
3231*/
3232
3233int ha_federatedx::delete_all_rows()
3234{
3235 THD *thd= ha_thd();
3236 char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
3237 String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
3238 int error;
3239 DBUG_ENTER("ha_federatedx::delete_all_rows");
3240
3241 query.length(0);
3242
3243 query.set_charset(system_charset_info);
3244 if (thd->lex->sql_command == SQLCOM_TRUNCATE)
3245 query.append(STRING_WITH_LEN("TRUNCATE "));
3246 else
3247 query.append(STRING_WITH_LEN("DELETE FROM "));
3248 append_ident(&query, share->table_name, share->table_name_length,
3249 ident_quote_char);
3250
3251 /* no need for savepoint in autocommit mode */
3252 if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
3253 txn->stmt_autocommit();
3254
3255 /*
3256 TRUNCATE won't return anything in mysql_affected_rows
3257 */
3258
3259 if ((error= txn->acquire(share, thd, FALSE, &io)))
3260 DBUG_RETURN(error);
3261
3262 if (io->query(query.ptr(), query.length()))
3263 {
3264 DBUG_RETURN(stash_remote_error());
3265 }
3266 stats.deleted+= stats.records;
3267 stats.records= 0;
3268 DBUG_RETURN(0);
3269}
3270
3271
3272/*
3273 The idea with handler::store_lock() is the following:
3274
3275 The statement decided which locks we should need for the table
3276 for updates/deletes/inserts we get WRITE locks, for SELECT... we get
3277 read locks.
3278
3279 Before adding the lock into the table lock handler (see thr_lock.c)
3280 mysqld calls store lock with the requested locks. Store lock can now
3281 modify a write lock to a read lock (or some other lock), ignore the
3282 lock (if we don't want to use MySQL table locks at all) or add locks
3283 for many tables (like we do when we are using a MERGE handler).
3284
3285 Berkeley DB for federatedx changes all WRITE locks to TL_WRITE_ALLOW_WRITE
3286 (which signals that we are doing WRITES, but we are still allowing other
3287 reader's and writer's.
3288
3289 When releasing locks, store_lock() are also called. In this case one
3290 usually doesn't have to do anything.
3291
3292 In some exceptional cases MySQL may send a request for a TL_IGNORE;
3293 This means that we are requesting the same lock as last time and this
3294 should also be ignored. (This may happen when someone does a flush
3295 table when we have opened a part of the tables, in which case mysqld
3296 closes and reopens the tables and tries to get the same locks at last
3297 time). In the future we will probably try to remove this.
3298
3299 Called from lock.cc by get_lock_data().
3300*/
3301
3302THR_LOCK_DATA **ha_federatedx::store_lock(THD *thd,
3303 THR_LOCK_DATA **to,
3304 enum thr_lock_type lock_type)
3305{
3306 DBUG_ENTER("ha_federatedx::store_lock");
3307 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
3308 {
3309 /*
3310 Here is where we get into the guts of a row level lock.
3311 If TL_UNLOCK is set
3312 If we are not doing a LOCK TABLE or DISCARD/IMPORT
3313 TABLESPACE, then allow multiple writers
3314 */
3315
3316 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
3317 lock_type <= TL_WRITE) && !thd->in_lock_tables)
3318 lock_type= TL_WRITE_ALLOW_WRITE;
3319
3320 /*
3321 In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
3322 MySQL would use the lock TL_READ_NO_INSERT on t2, and that
3323 would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
3324 to t2. Convert the lock to a normal read lock to allow
3325 concurrent inserts to t2.
3326 */
3327
3328 if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
3329 lock_type= TL_READ;
3330
3331 lock.type= lock_type;
3332 }
3333
3334 *to++= &lock;
3335
3336 DBUG_RETURN(to);
3337}
3338
3339
3340static int test_connection(MYSQL_THD thd, federatedx_io *io,
3341 FEDERATEDX_SHARE *share)
3342{
3343 char buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
3344 String str(buffer, sizeof(buffer), &my_charset_bin);
3345 FEDERATEDX_IO_RESULT *resultset= NULL;
3346 int retval;
3347
3348 str.length(0);
3349 str.append(STRING_WITH_LEN("SELECT * FROM "));
3350 append_identifier(thd, &str, share->table_name,
3351 share->table_name_length);
3352 str.append(STRING_WITH_LEN(" WHERE 1=0"));
3353
3354 if ((retval= io->query(str.ptr(), str.length())))
3355 {
3356 sprintf(buffer, "database: '%s' username: '%s' hostname: '%s'",
3357 share->database, share->username, share->hostname);
3358 DBUG_PRINT("info", ("error-code: %d", io->error_code()));
3359 my_error(ER_CANT_CREATE_FEDERATED_TABLE, MYF(0), buffer);
3360 }
3361 else
3362 resultset= io->store_result();
3363
3364 io->free_result(resultset);
3365
3366 return retval;
3367}
3368
3369/*
3370 create() does nothing, since we have no local setup of our own.
3371 FUTURE: We should potentially connect to the foreign database and
3372*/
3373
3374int ha_federatedx::create(const char *name, TABLE *table_arg,
3375 HA_CREATE_INFO *create_info)
3376{
3377 int retval;
3378 THD *thd= ha_thd();
3379 FEDERATEDX_SHARE tmp_share; // Only a temporary share, to test the url
3380 federatedx_txn *tmp_txn;
3381 federatedx_io *tmp_io= NULL;
3382 DBUG_ENTER("ha_federatedx::create");
3383
3384 if ((retval= parse_url(thd->mem_root, &tmp_share, table_arg->s, 1)))
3385 goto error;
3386
3387 /* loopback socket connections hang due to LOCK_open mutex */
3388 if ((!tmp_share.hostname || !strcmp(tmp_share.hostname,my_localhost)) &&
3389 !tmp_share.port)
3390 goto error;
3391
3392 /*
3393 If possible, we try to use an existing network connection to
3394 the remote server. To ensure that no new FEDERATEDX_SERVER
3395 instance is created, we pass NULL in get_server() TABLE arg.
3396 */
3397 mysql_mutex_lock(&federatedx_mutex);
3398 tmp_share.s= get_server(&tmp_share, NULL);
3399 mysql_mutex_unlock(&federatedx_mutex);
3400
3401 if (tmp_share.s)
3402 {
3403 tmp_txn= get_txn(thd);
3404 if (!(retval= tmp_txn->acquire(&tmp_share, thd, TRUE, &tmp_io)))
3405 {
3406 retval= test_connection(thd, tmp_io, &tmp_share);
3407 tmp_txn->release(&tmp_io);
3408 }
3409 free_server(tmp_txn, tmp_share.s);
3410 }
3411 else
3412 {
3413 FEDERATEDX_SERVER server;
3414
3415 fill_server(thd->mem_root, &server, &tmp_share, create_info->table_charset);
3416
3417#ifndef DBUG_OFF
3418 mysql_mutex_init(fe_key_mutex_FEDERATEDX_SERVER_mutex,
3419 &server.mutex, MY_MUTEX_INIT_FAST);
3420 mysql_mutex_lock(&server.mutex);
3421#endif
3422
3423 tmp_io= federatedx_io::construct(thd->mem_root, &server);
3424
3425 retval= test_connection(thd, tmp_io, &tmp_share);
3426
3427#ifndef DBUG_OFF
3428 mysql_mutex_unlock(&server.mutex);
3429 mysql_mutex_destroy(&server.mutex);
3430#endif
3431
3432 delete tmp_io;
3433 }
3434
3435error:
3436 DBUG_RETURN(retval);
3437
3438}
3439
3440
3441int ha_federatedx::stash_remote_error()
3442{
3443 DBUG_ENTER("ha_federatedx::stash_remote_error()");
3444 if (!io)
3445 DBUG_RETURN(remote_error_number);
3446 remote_error_number= io->error_code();
3447 strmake_buf(remote_error_buf, io->error_str());
3448 if (remote_error_number == ER_DUP_ENTRY ||
3449 remote_error_number == ER_DUP_KEY)
3450 DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
3451 DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM);
3452}
3453
3454
3455bool ha_federatedx::get_error_message(int error, String* buf)
3456{
3457 DBUG_ENTER("ha_federatedx::get_error_message");
3458 DBUG_PRINT("enter", ("error: %d", error));
3459 if (error == HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM)
3460 {
3461 buf->append(STRING_WITH_LEN("Error on remote system: "));
3462 buf->qs_append(remote_error_number);
3463 buf->append(STRING_WITH_LEN(": "));
3464 buf->append(remote_error_buf);
3465 /* Ensure string ends with \0 */
3466 (void) buf->c_ptr_safe();
3467
3468 remote_error_number= 0;
3469 remote_error_buf[0]= '\0';
3470 }
3471 DBUG_PRINT("exit", ("message: %s", buf->c_ptr_safe()));
3472 DBUG_RETURN(FALSE);
3473}
3474
3475
3476int ha_federatedx::start_stmt(MYSQL_THD thd, thr_lock_type lock_type)
3477{
3478 DBUG_ENTER("ha_federatedx::start_stmt");
3479 DBUG_ASSERT(txn == get_txn(thd));
3480
3481 if (!txn->in_transaction())
3482 {
3483 txn->stmt_begin();
3484 trans_register_ha(thd, FALSE, ht);
3485 }
3486 DBUG_RETURN(0);
3487}
3488
3489
3490int ha_federatedx::external_lock(MYSQL_THD thd, int lock_type)
3491{
3492 int error= 0;
3493 DBUG_ENTER("ha_federatedx::external_lock");
3494
3495 if (lock_type == F_UNLCK)
3496 txn->release(&io);
3497 else
3498 {
3499 table_will_be_deleted = FALSE;
3500 txn= get_txn(thd);
3501 if (!(error= txn->acquire(share, ha_thd(), lock_type == F_RDLCK, &io)) &&
3502 (lock_type == F_WRLCK || !io->is_autocommit()))
3503 {
3504 if (!thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
3505 {
3506 txn->stmt_begin();
3507 trans_register_ha(thd, FALSE, ht);
3508 }
3509 else
3510 {
3511 txn->txn_begin();
3512 trans_register_ha(thd, TRUE, ht);
3513 }
3514 }
3515 }
3516
3517 DBUG_RETURN(error);
3518}
3519
3520
3521int ha_federatedx::savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv)
3522{
3523 int error= 0;
3524 federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
3525 DBUG_ENTER("ha_federatedx::savepoint_set");
3526
3527 if (txn && txn->has_connections())
3528 {
3529 if (txn->txn_begin())
3530 trans_register_ha(thd, TRUE, hton);
3531
3532 txn->sp_acquire((ulong *) sv);
3533
3534 DBUG_ASSERT(1 < *(ulong *) sv);
3535 }
3536
3537 DBUG_RETURN(error);
3538}
3539
3540
3541int ha_federatedx::savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv)
3542 {
3543 int error= 0;
3544 federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
3545 DBUG_ENTER("ha_federatedx::savepoint_rollback");
3546
3547 if (txn)
3548 error= txn->sp_rollback((ulong *) sv);
3549
3550 DBUG_RETURN(error);
3551}
3552
3553
3554int ha_federatedx::savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv)
3555{
3556 int error= 0;
3557 federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
3558 DBUG_ENTER("ha_federatedx::savepoint_release");
3559
3560 if (txn)
3561 error= txn->sp_release((ulong *) sv);
3562
3563 DBUG_RETURN(error);
3564}
3565
3566
3567int ha_federatedx::commit(handlerton *hton, MYSQL_THD thd, bool all)
3568{
3569 int return_val;
3570 federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
3571 DBUG_ENTER("ha_federatedx::commit");
3572
3573 if (all)
3574 return_val= txn->txn_commit();
3575 else
3576 return_val= txn->stmt_commit();
3577
3578 DBUG_PRINT("info", ("error val: %d", return_val));
3579 DBUG_RETURN(return_val);
3580}
3581
3582
3583int ha_federatedx::rollback(handlerton *hton, MYSQL_THD thd, bool all)
3584{
3585 int return_val;
3586 federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
3587 DBUG_ENTER("ha_federatedx::rollback");
3588
3589 if (all)
3590 return_val= txn->txn_rollback();
3591 else
3592 return_val= txn->stmt_rollback();
3593
3594 DBUG_PRINT("info", ("error val: %d", return_val));
3595 DBUG_RETURN(return_val);
3596}
3597
3598
3599/*
3600 Federated supports assisted discovery, like
3601 CREATE TABLE t1 CONNECTION="mysql://joe:pass@192.168.1.111/federated/t1";
3602 but not a fully automatic discovery where a table magically appear
3603 on any use (like, on SELECT * from t1).
3604*/
3605int ha_federatedx::discover_assisted(handlerton *hton, THD* thd,
3606 TABLE_SHARE *table_s, HA_CREATE_INFO *info)
3607{
3608 int error= HA_ERR_NO_CONNECTION;
3609 FEDERATEDX_SHARE tmp_share;
3610 CHARSET_INFO *cs= system_charset_info;
3611 MYSQL mysql;
3612 char buf[1024];
3613 String query(buf, sizeof(buf), cs);
3614 static LEX_CSTRING cut_clause={STRING_WITH_LEN(" WITH SYSTEM VERSIONING")};
3615 int cut_offset;
3616 MYSQL_RES *res;
3617 MYSQL_ROW rdata;
3618 ulong *rlen;
3619 my_bool my_true= 1;
3620
3621 if (parse_url(thd->mem_root, &tmp_share, table_s, 1))
3622 return HA_WRONG_CREATE_OPTION;
3623
3624 mysql_init(&mysql);
3625 mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, cs->csname);
3626 mysql_options(&mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*)&my_true);
3627
3628 if (!mysql_real_connect(&mysql, tmp_share.hostname, tmp_share.username,
3629 tmp_share.password, tmp_share.database,
3630 tmp_share.port, tmp_share.socket, 0))
3631 goto err1;
3632
3633 if (mysql_real_query(&mysql, STRING_WITH_LEN("SET SQL_MODE=NO_TABLE_OPTIONS")))
3634 goto err1;
3635
3636 query.copy(STRING_WITH_LEN("SHOW CREATE TABLE "), cs);
3637 append_ident(&query, tmp_share.table_name,
3638 tmp_share.table_name_length, ident_quote_char);
3639
3640 if (mysql_real_query(&mysql, query.ptr(), query.length()))
3641 goto err1;
3642
3643 if (!((res= mysql_store_result(&mysql))))
3644 goto err1;
3645
3646 if (!(rdata= mysql_fetch_row(res)) || !((rlen= mysql_fetch_lengths(res))))
3647 goto err2;
3648
3649 query.copy(rdata[1], rlen[1], cs);
3650 cut_offset= (int)query.length() - (int)cut_clause.length;
3651 if (cut_offset > 0 && !memcmp(query.ptr() + cut_offset,
3652 cut_clause.str, cut_clause.length))
3653 query.length(cut_offset);
3654 query.append(STRING_WITH_LEN(" CONNECTION='"), cs);
3655 query.append_for_single_quote(table_s->connect_string.str,
3656 table_s->connect_string.length);
3657 query.append('\'');
3658
3659 error= table_s->init_from_sql_statement_string(thd, true,
3660 query.ptr(), query.length());
3661
3662err2:
3663 mysql_free_result(res);
3664err1:
3665 if (error)
3666 my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), mysql_error(&mysql));
3667 mysql_close(&mysql);
3668 return error;
3669}
3670
3671
3672struct st_mysql_storage_engine federatedx_storage_engine=
3673{ MYSQL_HANDLERTON_INTERFACE_VERSION };
3674
3675maria_declare_plugin(federatedx)
3676{
3677 MYSQL_STORAGE_ENGINE_PLUGIN,
3678 &federatedx_storage_engine,
3679 "FEDERATED",
3680 "Patrick Galbraith",
3681 "FederatedX pluggable storage engine",
3682 PLUGIN_LICENSE_GPL,
3683 federatedx_db_init, /* Plugin Init */
3684 federatedx_done, /* Plugin Deinit */
3685 0x0201 /* 2.1 */,
3686 NULL, /* status variables */
3687 NULL, /* system variables */
3688 "2.1", /* string version */
3689 MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
3690}
3691maria_declare_plugin_end;
3692