1 | /* |
2 | Copyright (c) 2008, Patrick Galbraith |
3 | All rights reserved. |
4 | |
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted provided that the following conditions are |
7 | met: |
8 | |
9 | * Redistributions of source code must retain the above copyright |
10 | notice, this list of conditions and the following disclaimer. |
11 | |
12 | * Redistributions in binary form must reproduce the above |
13 | copyright notice, this list of conditions and the following disclaimer |
14 | in the documentation and/or other materials provided with the |
15 | distribution. |
16 | |
17 | * Neither the name of Patrick Galbraith nor the names of its |
18 | contributors may be used to endorse or promote products derived from |
19 | this software without specific prior written permission. |
20 | |
21 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
22 | "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
23 | LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
24 | A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
25 | OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
26 | SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
27 | LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
28 | DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
29 | THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
30 | (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
31 | OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
32 | */ |
33 | |
34 | |
35 | #ifdef USE_PRAGMA_INTERFACE |
36 | #pragma interface /* gcc class implementation */ |
37 | #endif |
38 | |
39 | //#include <mysql.h> |
40 | #include <my_global.h> |
41 | #include <thr_lock.h> |
42 | #include "handler.h" |
43 | |
44 | class federatedx_io; |
45 | |
46 | /* |
47 | FEDERATEDX_SERVER will eventually be a structure that will be shared among |
48 | all FEDERATEDX_SHARE instances so that the federated server can minimise |
49 | the number of open connections. This will eventually lead to the support |
50 | of reliable XA federated tables. |
51 | */ |
52 | typedef struct st_fedrated_server { |
53 | MEM_ROOT mem_root; |
54 | uint use_count, io_count; |
55 | |
56 | uchar *key; |
57 | uint key_length; |
58 | |
59 | const char *scheme; |
60 | const char *hostname; |
61 | const char *username; |
62 | const char *password; |
63 | const char *database; |
64 | const char *socket; |
65 | ushort port; |
66 | |
67 | const char *csname; |
68 | |
69 | mysql_mutex_t mutex; |
70 | federatedx_io *idle_list; |
71 | } FEDERATEDX_SERVER; |
72 | |
/*
  Please read ha_example.cc before reading this file.
  Please keep in mind that the federatedx storage engine implements all methods
  that are required to be implemented. handler.h has a full list of methods
  that you can implement.
*/
79 | |
/*
  handler::print_error has a case statement for error numbers.
  This value (10000) is far out of range and will invoke the
  default: case.
  (Current error range is 120-159 from include/my_base.h)
*/
#define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000

/*
  Size of the query/error buffers: five times STRING_BUFFER_USUAL_SIZE.
  The expansion is parenthesised so that it behaves as a single operand;
  without the parentheses an expression such as
  "n / FEDERATEDX_QUERY_BUFFER_SIZE" would expand to
  "n / STRING_BUFFER_USUAL_SIZE * 5" and be evaluated as
  "(n / STRING_BUFFER_USUAL_SIZE) * 5".
*/
#define FEDERATEDX_QUERY_BUFFER_SIZE (STRING_BUFFER_USUAL_SIZE * 5)
#define FEDERATEDX_RECORDS_IN_RANGE 2
#define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb
91 | |
92 | /* |
93 | FEDERATEDX_SHARE is a structure that will be shared amoung all open handlers |
94 | The example implements the minimum of what you will probably need. |
95 | */ |
96 | typedef struct st_federatedx_share { |
97 | MEM_ROOT mem_root; |
98 | |
99 | bool parsed; |
100 | /* this key is unique db/tablename */ |
101 | const char *share_key; |
102 | /* |
103 | the primary select query to be used in rnd_init |
104 | */ |
105 | char *select_query; |
106 | /* |
107 | remote host info, parse_url supplies |
108 | */ |
109 | char *server_name; |
110 | char *connection_string; |
111 | char *scheme; |
112 | char *hostname; |
113 | char *username; |
114 | char *password; |
115 | char *database; |
116 | char *table_name; |
117 | char *table; |
118 | char *socket; |
119 | char *sport; |
120 | int share_key_length; |
121 | ushort port; |
122 | |
123 | size_t table_name_length, server_name_length, connect_string_length; |
124 | uint use_count; |
125 | THR_LOCK lock; |
126 | FEDERATEDX_SERVER *s; |
127 | } FEDERATEDX_SHARE; |
128 | |
129 | |
/* Offset type used by the io layer. */
typedef ptrdiff_t FEDERATEDX_IO_OFFSET;

/*
  Result-set and row handles passed through the federatedx_io interface.
  The underlying structs are only forward-declared in this header.
*/
typedef struct st_federatedx_result FEDERATEDX_IO_RESULT;
typedef struct st_federatedx_row FEDERATEDX_IO_ROW;
133 | |
134 | class federatedx_io |
135 | { |
136 | friend class federatedx_txn; |
137 | FEDERATEDX_SERVER * const server; |
138 | federatedx_io **owner_ptr; |
139 | federatedx_io *txn_next; |
140 | federatedx_io *idle_next; |
141 | bool active; /* currently participating in a transaction */ |
142 | bool busy; /* in use by a ha_federated instance */ |
143 | bool readonly;/* indicates that no updates have occurred */ |
144 | |
145 | protected: |
146 | void set_active(bool new_active) |
147 | { active= new_active; } |
148 | public: |
149 | federatedx_io(FEDERATEDX_SERVER *); |
150 | virtual ~federatedx_io(); |
151 | |
152 | bool is_readonly() const { return readonly; } |
153 | bool is_active() const { return active; } |
154 | |
155 | const char * get_charsetname() const |
156 | { return server->csname ? server->csname : "latin1" ; } |
157 | |
158 | const char * get_hostname() const { return server->hostname; } |
159 | const char * get_username() const { return server->username; } |
160 | const char * get_password() const { return server->password; } |
161 | const char * get_database() const { return server->database; } |
162 | ushort get_port() const { return server->port; } |
163 | const char * get_socket() const { return server->socket; } |
164 | |
165 | static bool handles_scheme(const char *scheme); |
166 | static federatedx_io *construct(MEM_ROOT *server_root, |
167 | FEDERATEDX_SERVER *server); |
168 | |
169 | static void *operator new(size_t size, MEM_ROOT *mem_root) throw () |
170 | { return alloc_root(mem_root, size); } |
171 | static void operator delete(void *ptr, size_t size) |
172 | { TRASH_FREE(ptr, size); } |
173 | static void operator delete(void *, MEM_ROOT *) |
174 | { } |
175 | |
176 | virtual int query(const char *buffer, size_t length)=0; |
177 | virtual FEDERATEDX_IO_RESULT *store_result()=0; |
178 | |
179 | virtual size_t max_query_size() const=0; |
180 | |
181 | virtual my_ulonglong affected_rows() const=0; |
182 | virtual my_ulonglong last_insert_id() const=0; |
183 | |
184 | virtual int error_code()=0; |
185 | virtual const char *error_str()=0; |
186 | |
187 | virtual void reset()=0; |
188 | virtual int commit()=0; |
189 | virtual int rollback()=0; |
190 | |
191 | virtual int savepoint_set(ulong sp)=0; |
192 | virtual ulong savepoint_release(ulong sp)=0; |
193 | virtual ulong savepoint_rollback(ulong sp)=0; |
194 | virtual void savepoint_restrict(ulong sp)=0; |
195 | |
196 | virtual ulong last_savepoint() const=0; |
197 | virtual ulong actual_savepoint() const=0; |
198 | virtual bool is_autocommit() const=0; |
199 | |
200 | virtual bool table_metadata(ha_statistics *stats, const char *table_name, |
201 | uint table_name_length, uint flag) = 0; |
202 | |
203 | /* resultset operations */ |
204 | |
205 | virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0; |
206 | virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0; |
207 | virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0; |
208 | virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result)=0; |
209 | virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0; |
210 | virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, |
211 | unsigned int column)=0; |
212 | virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, |
213 | unsigned int column) const=0; |
214 | |
215 | virtual size_t get_ref_length() const=0; |
216 | virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, |
217 | void *ref)=0; |
218 | virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, |
219 | const void *ref)=0; |
220 | virtual void set_thd(void *thd) { } |
221 | |
222 | }; |
223 | |
224 | |
225 | class federatedx_txn |
226 | { |
227 | federatedx_io *txn_list; |
228 | ulong savepoint_level; |
229 | ulong savepoint_stmt; |
230 | ulong savepoint_next; |
231 | |
232 | void release_scan(); |
233 | public: |
234 | federatedx_txn(); |
235 | ~federatedx_txn(); |
236 | |
237 | bool has_connections() const { return txn_list != NULL; } |
238 | bool in_transaction() const { return savepoint_next != 0; } |
239 | int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io); |
240 | void release(federatedx_io **io); |
241 | void close(FEDERATEDX_SERVER *); |
242 | |
243 | bool txn_begin(); |
244 | int txn_commit(); |
245 | int txn_rollback(); |
246 | |
247 | bool sp_acquire(ulong *save); |
248 | int sp_rollback(ulong *save); |
249 | int sp_release(ulong *save); |
250 | |
251 | bool stmt_begin(); |
252 | int stmt_commit(); |
253 | int stmt_rollback(); |
254 | void stmt_autocommit(); |
255 | }; |
256 | |
257 | |
258 | /* |
259 | Class definition for the storage engine |
260 | */ |
261 | class ha_federatedx: public handler |
262 | { |
263 | friend int federatedx_db_init(void *p); |
264 | |
265 | THR_LOCK_DATA lock; /* MySQL lock */ |
266 | FEDERATEDX_SHARE *share; /* Shared lock info */ |
267 | federatedx_txn *txn; |
268 | federatedx_io *io; |
269 | FEDERATEDX_IO_RESULT *stored_result; |
270 | /** |
271 | Array of all stored results we get during a query execution. |
272 | */ |
273 | DYNAMIC_ARRAY results; |
274 | bool position_called; |
275 | uint fetch_num; // stores the fetch num |
276 | int remote_error_number; |
277 | char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE]; |
278 | bool ignore_duplicates, replace_duplicates; |
279 | bool insert_dup_update, table_will_be_deleted; |
280 | DYNAMIC_STRING bulk_insert; |
281 | |
282 | private: |
283 | /* |
284 | return 0 on success |
285 | return errorcode otherwise |
286 | */ |
287 | uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row, |
288 | FEDERATEDX_IO_RESULT *result); |
289 | bool create_where_from_key(String *to, KEY *key_info, |
290 | const key_range *start_key, |
291 | const key_range *end_key, |
292 | bool records_in_range, bool eq_range); |
293 | int stash_remote_error(); |
294 | |
295 | federatedx_txn *get_txn(THD *thd, bool no_create= FALSE); |
296 | |
297 | static int disconnect(handlerton *hton, MYSQL_THD thd); |
298 | static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv); |
299 | static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv); |
300 | static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv); |
301 | static int commit(handlerton *hton, MYSQL_THD thd, bool all); |
302 | static int rollback(handlerton *hton, MYSQL_THD thd, bool all); |
303 | static int discover_assisted(handlerton *, THD*, TABLE_SHARE *, |
304 | HA_CREATE_INFO *); |
305 | |
306 | bool append_stmt_insert(String *query); |
307 | |
308 | int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result); |
309 | int index_read_idx_with_result_set(uchar *buf, uint index, |
310 | const uchar *key, |
311 | uint key_len, |
312 | ha_rkey_function find_flag, |
313 | FEDERATEDX_IO_RESULT **result); |
314 | int real_query(const char *query, uint length); |
315 | int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag); |
316 | public: |
317 | ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg); |
318 | ~ha_federatedx() {} |
319 | /* |
320 | The name of the index type that will be used for display |
321 | don't implement this method unless you really have indexes |
322 | */ |
323 | // perhaps get index type |
324 | const char *index_type(uint inx) { return "REMOTE" ; } |
325 | /* |
326 | This is a list of flags that says what the storage engine |
327 | implements. The current table flags are documented in |
328 | handler.h |
329 | */ |
330 | ulonglong table_flags() const |
331 | { |
332 | /* fix server to be able to get remote server table flags */ |
333 | return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED |
334 | | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS | |
335 | HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR | |
336 | HA_PRIMARY_KEY_REQUIRED_FOR_DELETE | |
337 | HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY); |
338 | } |
339 | /* |
340 | This is a bitmap of flags that says how the storage engine |
341 | implements indexes. The current index flags are documented in |
342 | handler.h. If you do not implement indexes, just return zero |
343 | here. |
344 | |
345 | part is the key part to check. First key part is 0 |
346 | If all_parts it's set, MySQL want to know the flags for the combined |
347 | index up to and including 'part'. |
348 | */ |
349 | /* fix server to be able to get remote server index flags */ |
350 | ulong index_flags(uint inx, uint part, bool all_parts) const |
351 | { |
352 | return (HA_READ_NEXT | HA_READ_RANGE); |
353 | } |
354 | uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; } |
355 | uint max_supported_keys() const { return MAX_KEY; } |
356 | uint max_supported_key_parts() const { return MAX_REF_PARTS; } |
357 | uint max_supported_key_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } |
358 | uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } |
359 | /* |
360 | Called in test_quick_select to determine if indexes should be used. |
361 | Normally, we need to know number of blocks . For federatedx we need to |
362 | know number of blocks on remote side, and number of packets and blocks |
363 | on the network side (?) |
364 | Talk to Kostja about this - how to get the |
365 | number of rows * ... |
366 | disk scan time on other side (block size, size of the row) + network time ... |
367 | The reason for "records * 1000" is that such a large number forces |
368 | this to use indexes " |
369 | */ |
370 | double scan_time() |
371 | { |
372 | DBUG_PRINT("info" , ("records %lu" , (ulong) stats.records)); |
373 | return (double)(stats.records*1000); |
374 | } |
375 | /* |
376 | The next method will never be called if you do not implement indexes. |
377 | */ |
378 | double read_time(uint index, uint ranges, ha_rows rows) |
379 | { |
380 | /* |
381 | Per Brian, this number is bugus, but this method must be implemented, |
382 | and at a later date, he intends to document this issue for handler code |
383 | */ |
384 | return (double) rows / 20.0+1; |
385 | } |
386 | |
387 | const key_map *keys_to_use_for_scanning() { return &key_map_full; } |
388 | /* |
389 | Everything below are methods that we implment in ha_federatedx.cc. |
390 | |
391 | Most of these methods are not obligatory, skip them and |
392 | MySQL will treat them as not implemented |
393 | */ |
394 | int open(const char *name, int mode, uint test_if_locked); // required |
395 | int close(void); // required |
396 | |
397 | void start_bulk_insert(ha_rows rows, uint flags); |
398 | int end_bulk_insert(); |
399 | int write_row(uchar *buf); |
400 | int update_row(const uchar *old_data, const uchar *new_data); |
401 | int delete_row(const uchar *buf); |
402 | int index_init(uint keynr, bool sorted); |
403 | ha_rows estimate_rows_upper_bound(); |
404 | int index_read(uchar *buf, const uchar *key, |
405 | uint key_len, enum ha_rkey_function find_flag); |
406 | int index_read_idx(uchar *buf, uint idx, const uchar *key, |
407 | uint key_len, enum ha_rkey_function find_flag); |
408 | int index_next(uchar *buf); |
409 | int index_end(); |
410 | int read_range_first(const key_range *start_key, |
411 | const key_range *end_key, |
412 | bool eq_range, bool sorted); |
413 | int read_range_next(); |
414 | /* |
415 | unlike index_init(), rnd_init() can be called two times |
416 | without rnd_end() in between (it only makes sense if scan=1). |
417 | then the second call should prepare for the new table scan |
418 | (e.g if rnd_init allocates the cursor, second call should |
419 | position it to the start of the table, no need to deallocate |
420 | and allocate it again |
421 | */ |
422 | int rnd_init(bool scan); //required |
423 | int rnd_end(); |
424 | int rnd_next(uchar *buf); //required |
425 | int rnd_pos(uchar *buf, uchar *pos); //required |
426 | void position(const uchar *record); //required |
427 | int info(uint); //required |
428 | int (ha_extra_function operation); |
429 | |
430 | void update_auto_increment(void); |
431 | int repair(THD* thd, HA_CHECK_OPT* check_opt); |
432 | int optimize(THD* thd, HA_CHECK_OPT* check_opt); |
433 | |
434 | int delete_all_rows(void); |
435 | int create(const char *name, TABLE *form, |
436 | HA_CREATE_INFO *create_info); //required |
437 | ha_rows records_in_range(uint inx, key_range *start_key, |
438 | key_range *end_key); |
439 | uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; } |
440 | |
441 | THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, |
442 | enum thr_lock_type lock_type); //required |
443 | bool get_error_message(int error, String *buf); |
444 | int start_stmt(THD *thd, thr_lock_type lock_type); |
445 | int external_lock(THD *thd, int lock_type); |
446 | int reset(void); |
447 | int free_result(void); |
448 | }; |
449 | |
450 | extern const char ident_quote_char; // Character for quoting |
451 | // identifiers |
452 | extern const char value_quote_char; // Character for quoting |
453 | // literals |
454 | |
455 | extern bool append_ident(String *string, const char *name, size_t length, |
456 | const char quote_char); |
457 | |
458 | |
459 | extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root, |
460 | FEDERATEDX_SERVER *server); |
461 | extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root, |
462 | FEDERATEDX_SERVER *server); |
463 | |