1/*
2Copyright (c) 2008, Patrick Galbraith
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are
7met:
8
9 * Redistributions of source code must retain the above copyright
10notice, this list of conditions and the following disclaimer.
11
12 * Redistributions in binary form must reproduce the above
13copyright notice, this list of conditions and the following disclaimer
14in the documentation and/or other materials provided with the
15distribution.
16
17 * Neither the name of Patrick Galbraith nor the names of its
18contributors may be used to endorse or promote products derived from
19this software without specific prior written permission.
20
21THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32*/
33
34
35#ifdef USE_PRAGMA_INTERFACE
36#pragma interface /* gcc class implementation */
37#endif
38
39//#include <mysql.h>
40#include <my_global.h>
41#include <thr_lock.h>
42#include "handler.h"
43
44class federatedx_io;
45
46/*
47 FEDERATEDX_SERVER will eventually be a structure that will be shared among
48 all FEDERATEDX_SHARE instances so that the federated server can minimise
49 the number of open connections. This will eventually lead to the support
50 of reliable XA federated tables.
51*/
52typedef struct st_fedrated_server {
53 MEM_ROOT mem_root;
54 uint use_count, io_count;
55
56 uchar *key;
57 uint key_length;
58
59 const char *scheme;
60 const char *hostname;
61 const char *username;
62 const char *password;
63 const char *database;
64 const char *socket;
65 ushort port;
66
67 const char *csname;
68
69 mysql_mutex_t mutex;
70 federatedx_io *idle_list;
71} FEDERATEDX_SERVER;
72
73/*
  Please read ha_example.cc before reading this file.
75 Please keep in mind that the federatedx storage engine implements all methods
76 that are required to be implemented. handler.h has a full list of methods
77 that you can implement.
78*/
79
80/*
81 handler::print_error has a case statement for error numbers.
  This value (10000) is far out of range and will invoke the
  default: case.
84 (Current error range is 120-159 from include/my_base.h)
85*/
#define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000

/*
  Size of the query / remote error buffers. The replacement list is
  parenthesized so the macro behaves as a single value inside larger
  expressions (e.g. "x / FEDERATEDX_QUERY_BUFFER_SIZE").
*/
#define FEDERATEDX_QUERY_BUFFER_SIZE (STRING_BUFFER_USUAL_SIZE * 5)
#define FEDERATEDX_RECORDS_IN_RANGE 2
#define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb
91
92/*
  FEDERATEDX_SHARE is a structure that will be shared among all open handlers.
94 The example implements the minimum of what you will probably need.
95*/
96typedef struct st_federatedx_share {
97 MEM_ROOT mem_root;
98
99 bool parsed;
100 /* this key is unique db/tablename */
101 const char *share_key;
102 /*
103 the primary select query to be used in rnd_init
104 */
105 char *select_query;
106 /*
107 remote host info, parse_url supplies
108 */
109 char *server_name;
110 char *connection_string;
111 char *scheme;
112 char *hostname;
113 char *username;
114 char *password;
115 char *database;
116 char *table_name;
117 char *table;
118 char *socket;
119 char *sport;
120 int share_key_length;
121 ushort port;
122
123 size_t table_name_length, server_name_length, connect_string_length;
124 uint use_count;
125 THR_LOCK lock;
126 FEDERATEDX_SERVER *s;
127} FEDERATEDX_SHARE;
128
129
/*
  Opaque result-set and row types handed out by federatedx_io
  implementations; only pointers to them are used in this header.
*/
struct st_federatedx_result;
struct st_federatedx_row;

typedef struct st_federatedx_result FEDERATEDX_IO_RESULT;
typedef struct st_federatedx_row FEDERATEDX_IO_ROW;

/* Signed offset type (alias of ptrdiff_t) */
typedef ptrdiff_t FEDERATEDX_IO_OFFSET;
133
134class federatedx_io
135{
136 friend class federatedx_txn;
137 FEDERATEDX_SERVER * const server;
138 federatedx_io **owner_ptr;
139 federatedx_io *txn_next;
140 federatedx_io *idle_next;
141 bool active; /* currently participating in a transaction */
142 bool busy; /* in use by a ha_federated instance */
143 bool readonly;/* indicates that no updates have occurred */
144
145protected:
146 void set_active(bool new_active)
147 { active= new_active; }
148public:
149 federatedx_io(FEDERATEDX_SERVER *);
150 virtual ~federatedx_io();
151
152 bool is_readonly() const { return readonly; }
153 bool is_active() const { return active; }
154
155 const char * get_charsetname() const
156 { return server->csname ? server->csname : "latin1"; }
157
158 const char * get_hostname() const { return server->hostname; }
159 const char * get_username() const { return server->username; }
160 const char * get_password() const { return server->password; }
161 const char * get_database() const { return server->database; }
162 ushort get_port() const { return server->port; }
163 const char * get_socket() const { return server->socket; }
164
165 static bool handles_scheme(const char *scheme);
166 static federatedx_io *construct(MEM_ROOT *server_root,
167 FEDERATEDX_SERVER *server);
168
169 static void *operator new(size_t size, MEM_ROOT *mem_root) throw ()
170 { return alloc_root(mem_root, size); }
171 static void operator delete(void *ptr, size_t size)
172 { TRASH_FREE(ptr, size); }
173 static void operator delete(void *, MEM_ROOT *)
174 { }
175
176 virtual int query(const char *buffer, size_t length)=0;
177 virtual FEDERATEDX_IO_RESULT *store_result()=0;
178
179 virtual size_t max_query_size() const=0;
180
181 virtual my_ulonglong affected_rows() const=0;
182 virtual my_ulonglong last_insert_id() const=0;
183
184 virtual int error_code()=0;
185 virtual const char *error_str()=0;
186
187 virtual void reset()=0;
188 virtual int commit()=0;
189 virtual int rollback()=0;
190
191 virtual int savepoint_set(ulong sp)=0;
192 virtual ulong savepoint_release(ulong sp)=0;
193 virtual ulong savepoint_rollback(ulong sp)=0;
194 virtual void savepoint_restrict(ulong sp)=0;
195
196 virtual ulong last_savepoint() const=0;
197 virtual ulong actual_savepoint() const=0;
198 virtual bool is_autocommit() const=0;
199
200 virtual bool table_metadata(ha_statistics *stats, const char *table_name,
201 uint table_name_length, uint flag) = 0;
202
203 /* resultset operations */
204
205 virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
206 virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
207 virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
208 virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result)=0;
209 virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0;
210 virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
211 unsigned int column)=0;
212 virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
213 unsigned int column) const=0;
214
215 virtual size_t get_ref_length() const=0;
216 virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
217 void *ref)=0;
218 virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
219 const void *ref)=0;
220 virtual void set_thd(void *thd) { }
221
222};
223
224
225class federatedx_txn
226{
227 federatedx_io *txn_list;
228 ulong savepoint_level;
229 ulong savepoint_stmt;
230 ulong savepoint_next;
231
232 void release_scan();
233public:
234 federatedx_txn();
235 ~federatedx_txn();
236
237 bool has_connections() const { return txn_list != NULL; }
238 bool in_transaction() const { return savepoint_next != 0; }
239 int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io);
240 void release(federatedx_io **io);
241 void close(FEDERATEDX_SERVER *);
242
243 bool txn_begin();
244 int txn_commit();
245 int txn_rollback();
246
247 bool sp_acquire(ulong *save);
248 int sp_rollback(ulong *save);
249 int sp_release(ulong *save);
250
251 bool stmt_begin();
252 int stmt_commit();
253 int stmt_rollback();
254 void stmt_autocommit();
255};
256
257
258/*
259 Class definition for the storage engine
260*/
261class ha_federatedx: public handler
262{
263 friend int federatedx_db_init(void *p);
264
265 THR_LOCK_DATA lock; /* MySQL lock */
266 FEDERATEDX_SHARE *share; /* Shared lock info */
267 federatedx_txn *txn;
268 federatedx_io *io;
269 FEDERATEDX_IO_RESULT *stored_result;
270 /**
271 Array of all stored results we get during a query execution.
272 */
273 DYNAMIC_ARRAY results;
274 bool position_called;
275 uint fetch_num; // stores the fetch num
276 int remote_error_number;
277 char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE];
278 bool ignore_duplicates, replace_duplicates;
279 bool insert_dup_update, table_will_be_deleted;
280 DYNAMIC_STRING bulk_insert;
281
282private:
283 /*
284 return 0 on success
285 return errorcode otherwise
286 */
287 uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row,
288 FEDERATEDX_IO_RESULT *result);
289 bool create_where_from_key(String *to, KEY *key_info,
290 const key_range *start_key,
291 const key_range *end_key,
292 bool records_in_range, bool eq_range);
293 int stash_remote_error();
294
295 federatedx_txn *get_txn(THD *thd, bool no_create= FALSE);
296
297 static int disconnect(handlerton *hton, MYSQL_THD thd);
298 static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv);
299 static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv);
300 static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv);
301 static int commit(handlerton *hton, MYSQL_THD thd, bool all);
302 static int rollback(handlerton *hton, MYSQL_THD thd, bool all);
303 static int discover_assisted(handlerton *, THD*, TABLE_SHARE *,
304 HA_CREATE_INFO *);
305
306 bool append_stmt_insert(String *query);
307
308 int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result);
309 int index_read_idx_with_result_set(uchar *buf, uint index,
310 const uchar *key,
311 uint key_len,
312 ha_rkey_function find_flag,
313 FEDERATEDX_IO_RESULT **result);
314 int real_query(const char *query, uint length);
315 int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag);
316public:
317 ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg);
318 ~ha_federatedx() {}
319 /*
320 The name of the index type that will be used for display
321 don't implement this method unless you really have indexes
322 */
323 // perhaps get index type
324 const char *index_type(uint inx) { return "REMOTE"; }
325 /*
326 This is a list of flags that says what the storage engine
327 implements. The current table flags are documented in
328 handler.h
329 */
330 ulonglong table_flags() const
331 {
332 /* fix server to be able to get remote server table flags */
333 return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED
334 | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS |
335 HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR |
336 HA_PRIMARY_KEY_REQUIRED_FOR_DELETE |
337 HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY);
338 }
339 /*
340 This is a bitmap of flags that says how the storage engine
341 implements indexes. The current index flags are documented in
342 handler.h. If you do not implement indexes, just return zero
343 here.
344
345 part is the key part to check. First key part is 0
346 If all_parts it's set, MySQL want to know the flags for the combined
347 index up to and including 'part'.
348 */
349 /* fix server to be able to get remote server index flags */
350 ulong index_flags(uint inx, uint part, bool all_parts) const
351 {
352 return (HA_READ_NEXT | HA_READ_RANGE);
353 }
354 uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
355 uint max_supported_keys() const { return MAX_KEY; }
356 uint max_supported_key_parts() const { return MAX_REF_PARTS; }
357 uint max_supported_key_length() const { return FEDERATEDX_MAX_KEY_LENGTH; }
358 uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; }
359 /*
360 Called in test_quick_select to determine if indexes should be used.
361 Normally, we need to know number of blocks . For federatedx we need to
362 know number of blocks on remote side, and number of packets and blocks
363 on the network side (?)
364 Talk to Kostja about this - how to get the
365 number of rows * ...
366 disk scan time on other side (block size, size of the row) + network time ...
367 The reason for "records * 1000" is that such a large number forces
368 this to use indexes "
369 */
370 double scan_time()
371 {
372 DBUG_PRINT("info", ("records %lu", (ulong) stats.records));
373 return (double)(stats.records*1000);
374 }
375 /*
376 The next method will never be called if you do not implement indexes.
377 */
378 double read_time(uint index, uint ranges, ha_rows rows)
379 {
380 /*
381 Per Brian, this number is bugus, but this method must be implemented,
382 and at a later date, he intends to document this issue for handler code
383 */
384 return (double) rows / 20.0+1;
385 }
386
387 const key_map *keys_to_use_for_scanning() { return &key_map_full; }
388 /*
389 Everything below are methods that we implment in ha_federatedx.cc.
390
391 Most of these methods are not obligatory, skip them and
392 MySQL will treat them as not implemented
393 */
394 int open(const char *name, int mode, uint test_if_locked); // required
395 int close(void); // required
396
397 void start_bulk_insert(ha_rows rows, uint flags);
398 int end_bulk_insert();
399 int write_row(uchar *buf);
400 int update_row(const uchar *old_data, const uchar *new_data);
401 int delete_row(const uchar *buf);
402 int index_init(uint keynr, bool sorted);
403 ha_rows estimate_rows_upper_bound();
404 int index_read(uchar *buf, const uchar *key,
405 uint key_len, enum ha_rkey_function find_flag);
406 int index_read_idx(uchar *buf, uint idx, const uchar *key,
407 uint key_len, enum ha_rkey_function find_flag);
408 int index_next(uchar *buf);
409 int index_end();
410 int read_range_first(const key_range *start_key,
411 const key_range *end_key,
412 bool eq_range, bool sorted);
413 int read_range_next();
414 /*
415 unlike index_init(), rnd_init() can be called two times
416 without rnd_end() in between (it only makes sense if scan=1).
417 then the second call should prepare for the new table scan
418 (e.g if rnd_init allocates the cursor, second call should
419 position it to the start of the table, no need to deallocate
420 and allocate it again
421 */
422 int rnd_init(bool scan); //required
423 int rnd_end();
424 int rnd_next(uchar *buf); //required
425 int rnd_pos(uchar *buf, uchar *pos); //required
426 void position(const uchar *record); //required
427 int info(uint); //required
428 int extra(ha_extra_function operation);
429
430 void update_auto_increment(void);
431 int repair(THD* thd, HA_CHECK_OPT* check_opt);
432 int optimize(THD* thd, HA_CHECK_OPT* check_opt);
433
434 int delete_all_rows(void);
435 int create(const char *name, TABLE *form,
436 HA_CREATE_INFO *create_info); //required
437 ha_rows records_in_range(uint inx, key_range *start_key,
438 key_range *end_key);
439 uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; }
440
441 THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
442 enum thr_lock_type lock_type); //required
443 bool get_error_message(int error, String *buf);
444 int start_stmt(THD *thd, thr_lock_type lock_type);
445 int external_lock(THD *thd, int lock_type);
446 int reset(void);
447 int free_result(void);
448};
449
450extern const char ident_quote_char; // Character for quoting
451 // identifiers
452extern const char value_quote_char; // Character for quoting
453 // literals
454
455extern bool append_ident(String *string, const char *name, size_t length,
456 const char quote_char);
457
458
459extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
460 FEDERATEDX_SERVER *server);
461extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root,
462 FEDERATEDX_SERVER *server);
463