| 1 | /* |
| 2 | Copyright (c) 2008, Patrick Galbraith |
| 3 | All rights reserved. |
| 4 | |
| 5 | Redistribution and use in source and binary forms, with or without |
| 6 | modification, are permitted provided that the following conditions are |
| 7 | met: |
| 8 | |
| 9 | * Redistributions of source code must retain the above copyright |
| 10 | notice, this list of conditions and the following disclaimer. |
| 11 | |
| 12 | * Redistributions in binary form must reproduce the above |
| 13 | copyright notice, this list of conditions and the following disclaimer |
| 14 | in the documentation and/or other materials provided with the |
| 15 | distribution. |
| 16 | |
| 17 | * Neither the name of Patrick Galbraith nor the names of its |
| 18 | contributors may be used to endorse or promote products derived from |
| 19 | this software without specific prior written permission. |
| 20 | |
| 21 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 22 | "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 23 | LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 24 | A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 25 | OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 26 | SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 27 | LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 28 | DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 29 | THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 30 | (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 31 | OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 32 | */ |
| 33 | |
| 34 | |
| 35 | #ifdef USE_PRAGMA_INTERFACE |
| 36 | #pragma interface /* gcc class implementation */ |
| 37 | #endif |
| 38 | |
| 39 | //#include <mysql.h> |
| 40 | #include <my_global.h> |
| 41 | #include <thr_lock.h> |
| 42 | #include "handler.h" |
| 43 | |
| 44 | class federatedx_io; |
| 45 | |
| 46 | /* |
| 47 | FEDERATEDX_SERVER will eventually be a structure that will be shared among |
| 48 | all FEDERATEDX_SHARE instances so that the federated server can minimise |
| 49 | the number of open connections. This will eventually lead to the support |
| 50 | of reliable XA federated tables. |
| 51 | */ |
| 52 | typedef struct st_fedrated_server { |
| 53 | MEM_ROOT mem_root; |
| 54 | uint use_count, io_count; |
| 55 | |
| 56 | uchar *key; |
| 57 | uint key_length; |
| 58 | |
| 59 | const char *scheme; |
| 60 | const char *hostname; |
| 61 | const char *username; |
| 62 | const char *password; |
| 63 | const char *database; |
| 64 | const char *socket; |
| 65 | ushort port; |
| 66 | |
| 67 | const char *csname; |
| 68 | |
| 69 | mysql_mutex_t mutex; |
| 70 | federatedx_io *idle_list; |
| 71 | } FEDERATEDX_SERVER; |
| 72 | |
| 73 | /* |
| 74 | Please read ha_example.cc before reading this file. |
| 75 | Please keep in mind that the federatedx storage engine implements all methods |
| 76 | that are required to be implemented. handler.h has a full list of methods |
| 77 | that you can implement. |
| 78 | */ |
| 79 | |
| 80 | /* |
| 81 | handler::print_error has a case statement for error numbers. |
| 82 | This value (10000) is far out of range and will invoke the |
| 83 | default: case. |
| 84 | (Current error range is 120-159 from include/my_base.h) |
| 85 | */ |
/* Error number reported for failures on the remote system. */
#define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000

/*
  Size of the fixed buffers used for query / remote error text
  (see ha_federatedx::remote_error_buf).  The expansion is parenthesized
  so the macro behaves as a single value inside larger expressions such
  as "x / FEDERATEDX_QUERY_BUFFER_SIZE".
*/
#define FEDERATEDX_QUERY_BUFFER_SIZE (STRING_BUFFER_USUAL_SIZE * 5)
/* NOTE(review): presumably the fixed estimate used by records_in_range()
   -- confirm in ha_federatedx.cc */
#define FEDERATEDX_RECORDS_IN_RANGE 2
/* Upper bound returned by max_supported_key_length() and
   max_supported_key_part_length(). */
#define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb
| 91 | |
| 92 | /* |
| 93 | FEDERATEDX_SHARE is a structure that will be shared among all open handlers. |
| 94 | The example implements the minimum of what you will probably need. |
| 95 | */ |
| 96 | typedef struct st_federatedx_share { |
| 97 | MEM_ROOT mem_root; |
| 98 | |
| 99 | bool parsed; |
| 100 | /* this key is unique db/tablename */ |
| 101 | const char *share_key; |
| 102 | /* |
| 103 | the primary select query to be used in rnd_init |
| 104 | */ |
| 105 | char *select_query; |
| 106 | /* |
| 107 | remote host info, parse_url supplies |
| 108 | */ |
| 109 | char *server_name; |
| 110 | char *connection_string; |
| 111 | char *scheme; |
| 112 | char *hostname; |
| 113 | char *username; |
| 114 | char *password; |
| 115 | char *database; |
| 116 | char *table_name; |
| 117 | char *table; |
| 118 | char *socket; |
| 119 | char *sport; |
| 120 | int share_key_length; |
| 121 | ushort port; |
| 122 | |
| 123 | size_t table_name_length, server_name_length, connect_string_length; |
| 124 | uint use_count; |
| 125 | THR_LOCK lock; |
| 126 | FEDERATEDX_SERVER *s; |
| 127 | } FEDERATEDX_SHARE; |
| 128 | |
| 129 | |
| 130 | typedef struct st_federatedx_result FEDERATEDX_IO_RESULT; |
| 131 | typedef struct st_federatedx_row FEDERATEDX_IO_ROW; |
| 132 | typedef ptrdiff_t FEDERATEDX_IO_OFFSET; |
| 133 | |
| 134 | class federatedx_io |
| 135 | { |
| 136 | friend class federatedx_txn; |
| 137 | FEDERATEDX_SERVER * const server; |
| 138 | federatedx_io **owner_ptr; |
| 139 | federatedx_io *txn_next; |
| 140 | federatedx_io *idle_next; |
| 141 | bool active; /* currently participating in a transaction */ |
| 142 | bool busy; /* in use by a ha_federated instance */ |
| 143 | bool readonly;/* indicates that no updates have occurred */ |
| 144 | |
| 145 | protected: |
| 146 | void set_active(bool new_active) |
| 147 | { active= new_active; } |
| 148 | public: |
| 149 | federatedx_io(FEDERATEDX_SERVER *); |
| 150 | virtual ~federatedx_io(); |
| 151 | |
| 152 | bool is_readonly() const { return readonly; } |
| 153 | bool is_active() const { return active; } |
| 154 | |
| 155 | const char * get_charsetname() const |
| 156 | { return server->csname ? server->csname : "latin1" ; } |
| 157 | |
| 158 | const char * get_hostname() const { return server->hostname; } |
| 159 | const char * get_username() const { return server->username; } |
| 160 | const char * get_password() const { return server->password; } |
| 161 | const char * get_database() const { return server->database; } |
| 162 | ushort get_port() const { return server->port; } |
| 163 | const char * get_socket() const { return server->socket; } |
| 164 | |
| 165 | static bool handles_scheme(const char *scheme); |
| 166 | static federatedx_io *construct(MEM_ROOT *server_root, |
| 167 | FEDERATEDX_SERVER *server); |
| 168 | |
| 169 | static void *operator new(size_t size, MEM_ROOT *mem_root) throw () |
| 170 | { return alloc_root(mem_root, size); } |
| 171 | static void operator delete(void *ptr, size_t size) |
| 172 | { TRASH_FREE(ptr, size); } |
| 173 | static void operator delete(void *, MEM_ROOT *) |
| 174 | { } |
| 175 | |
| 176 | virtual int query(const char *buffer, size_t length)=0; |
| 177 | virtual FEDERATEDX_IO_RESULT *store_result()=0; |
| 178 | |
| 179 | virtual size_t max_query_size() const=0; |
| 180 | |
| 181 | virtual my_ulonglong affected_rows() const=0; |
| 182 | virtual my_ulonglong last_insert_id() const=0; |
| 183 | |
| 184 | virtual int error_code()=0; |
| 185 | virtual const char *error_str()=0; |
| 186 | |
| 187 | virtual void reset()=0; |
| 188 | virtual int commit()=0; |
| 189 | virtual int rollback()=0; |
| 190 | |
| 191 | virtual int savepoint_set(ulong sp)=0; |
| 192 | virtual ulong savepoint_release(ulong sp)=0; |
| 193 | virtual ulong savepoint_rollback(ulong sp)=0; |
| 194 | virtual void savepoint_restrict(ulong sp)=0; |
| 195 | |
| 196 | virtual ulong last_savepoint() const=0; |
| 197 | virtual ulong actual_savepoint() const=0; |
| 198 | virtual bool is_autocommit() const=0; |
| 199 | |
| 200 | virtual bool table_metadata(ha_statistics *stats, const char *table_name, |
| 201 | uint table_name_length, uint flag) = 0; |
| 202 | |
| 203 | /* resultset operations */ |
| 204 | |
| 205 | virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0; |
| 206 | virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0; |
| 207 | virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0; |
| 208 | virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result)=0; |
| 209 | virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0; |
| 210 | virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, |
| 211 | unsigned int column)=0; |
| 212 | virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, |
| 213 | unsigned int column) const=0; |
| 214 | |
| 215 | virtual size_t get_ref_length() const=0; |
| 216 | virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, |
| 217 | void *ref)=0; |
| 218 | virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, |
| 219 | const void *ref)=0; |
| 220 | virtual void set_thd(void *thd) { } |
| 221 | |
| 222 | }; |
| 223 | |
| 224 | |
| 225 | class federatedx_txn |
| 226 | { |
| 227 | federatedx_io *txn_list; |
| 228 | ulong savepoint_level; |
| 229 | ulong savepoint_stmt; |
| 230 | ulong savepoint_next; |
| 231 | |
| 232 | void release_scan(); |
| 233 | public: |
| 234 | federatedx_txn(); |
| 235 | ~federatedx_txn(); |
| 236 | |
| 237 | bool has_connections() const { return txn_list != NULL; } |
| 238 | bool in_transaction() const { return savepoint_next != 0; } |
| 239 | int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io); |
| 240 | void release(federatedx_io **io); |
| 241 | void close(FEDERATEDX_SERVER *); |
| 242 | |
| 243 | bool txn_begin(); |
| 244 | int txn_commit(); |
| 245 | int txn_rollback(); |
| 246 | |
| 247 | bool sp_acquire(ulong *save); |
| 248 | int sp_rollback(ulong *save); |
| 249 | int sp_release(ulong *save); |
| 250 | |
| 251 | bool stmt_begin(); |
| 252 | int stmt_commit(); |
| 253 | int stmt_rollback(); |
| 254 | void stmt_autocommit(); |
| 255 | }; |
| 256 | |
| 257 | |
| 258 | /* |
| 259 | Class definition for the storage engine |
| 260 | */ |
| 261 | class ha_federatedx: public handler |
| 262 | { |
| 263 | friend int federatedx_db_init(void *p); |
| 264 | |
| 265 | THR_LOCK_DATA lock; /* MySQL lock */ |
| 266 | FEDERATEDX_SHARE *share; /* Shared lock info */ |
| 267 | federatedx_txn *txn; |
| 268 | federatedx_io *io; |
| 269 | FEDERATEDX_IO_RESULT *stored_result; |
| 270 | /** |
| 271 | Array of all stored results we get during a query execution. |
| 272 | */ |
| 273 | DYNAMIC_ARRAY results; |
| 274 | bool position_called; |
| 275 | uint fetch_num; // stores the fetch num |
| 276 | int remote_error_number; |
| 277 | char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE]; |
| 278 | bool ignore_duplicates, replace_duplicates; |
| 279 | bool insert_dup_update, table_will_be_deleted; |
| 280 | DYNAMIC_STRING bulk_insert; |
| 281 | |
| 282 | private: |
| 283 | /* |
| 284 | return 0 on success |
| 285 | return errorcode otherwise |
| 286 | */ |
| 287 | uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row, |
| 288 | FEDERATEDX_IO_RESULT *result); |
| 289 | bool create_where_from_key(String *to, KEY *key_info, |
| 290 | const key_range *start_key, |
| 291 | const key_range *end_key, |
| 292 | bool records_in_range, bool eq_range); |
| 293 | int stash_remote_error(); |
| 294 | |
| 295 | federatedx_txn *get_txn(THD *thd, bool no_create= FALSE); |
| 296 | |
| 297 | static int disconnect(handlerton *hton, MYSQL_THD thd); |
| 298 | static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv); |
| 299 | static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv); |
| 300 | static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv); |
| 301 | static int commit(handlerton *hton, MYSQL_THD thd, bool all); |
| 302 | static int rollback(handlerton *hton, MYSQL_THD thd, bool all); |
| 303 | static int discover_assisted(handlerton *, THD*, TABLE_SHARE *, |
| 304 | HA_CREATE_INFO *); |
| 305 | |
| 306 | bool append_stmt_insert(String *query); |
| 307 | |
| 308 | int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result); |
| 309 | int index_read_idx_with_result_set(uchar *buf, uint index, |
| 310 | const uchar *key, |
| 311 | uint key_len, |
| 312 | ha_rkey_function find_flag, |
| 313 | FEDERATEDX_IO_RESULT **result); |
| 314 | int real_query(const char *query, uint length); |
| 315 | int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag); |
| 316 | public: |
| 317 | ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg); |
| 318 | ~ha_federatedx() {} |
| 319 | /* |
| 320 | The name of the index type that will be used for display |
| 321 | don't implement this method unless you really have indexes |
| 322 | */ |
| 323 | // perhaps get index type |
| 324 | const char *index_type(uint inx) { return "REMOTE" ; } |
| 325 | /* |
| 326 | This is a list of flags that says what the storage engine |
| 327 | implements. The current table flags are documented in |
| 328 | handler.h |
| 329 | */ |
| 330 | ulonglong table_flags() const |
| 331 | { |
| 332 | /* fix server to be able to get remote server table flags */ |
| 333 | return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED |
| 334 | | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS | |
| 335 | HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR | |
| 336 | HA_PRIMARY_KEY_REQUIRED_FOR_DELETE | |
| 337 | HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY); |
| 338 | } |
| 339 | /* |
| 340 | This is a bitmap of flags that says how the storage engine |
| 341 | implements indexes. The current index flags are documented in |
| 342 | handler.h. If you do not implement indexes, just return zero |
| 343 | here. |
| 344 | |
| 345 | part is the key part to check. First key part is 0 |
| 346 | If all_parts it's set, MySQL want to know the flags for the combined |
| 347 | index up to and including 'part'. |
| 348 | */ |
| 349 | /* fix server to be able to get remote server index flags */ |
| 350 | ulong index_flags(uint inx, uint part, bool all_parts) const |
| 351 | { |
| 352 | return (HA_READ_NEXT | HA_READ_RANGE); |
| 353 | } |
| 354 | uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; } |
| 355 | uint max_supported_keys() const { return MAX_KEY; } |
| 356 | uint max_supported_key_parts() const { return MAX_REF_PARTS; } |
| 357 | uint max_supported_key_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } |
| 358 | uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } |
| 359 | /* |
| 360 | Called in test_quick_select to determine if indexes should be used. |
| 361 | Normally, we need to know number of blocks . For federatedx we need to |
| 362 | know number of blocks on remote side, and number of packets and blocks |
| 363 | on the network side (?) |
| 364 | Talk to Kostja about this - how to get the |
| 365 | number of rows * ... |
| 366 | disk scan time on other side (block size, size of the row) + network time ... |
| 367 | The reason for "records * 1000" is that such a large number forces |
| 368 | this to use indexes " |
| 369 | */ |
| 370 | double scan_time() |
| 371 | { |
| 372 | DBUG_PRINT("info" , ("records %lu" , (ulong) stats.records)); |
| 373 | return (double)(stats.records*1000); |
| 374 | } |
| 375 | /* |
| 376 | The next method will never be called if you do not implement indexes. |
| 377 | */ |
| 378 | double read_time(uint index, uint ranges, ha_rows rows) |
| 379 | { |
| 380 | /* |
| 381 | Per Brian, this number is bugus, but this method must be implemented, |
| 382 | and at a later date, he intends to document this issue for handler code |
| 383 | */ |
| 384 | return (double) rows / 20.0+1; |
| 385 | } |
| 386 | |
| 387 | const key_map *keys_to_use_for_scanning() { return &key_map_full; } |
| 388 | /* |
| 389 | Everything below are methods that we implment in ha_federatedx.cc. |
| 390 | |
| 391 | Most of these methods are not obligatory, skip them and |
| 392 | MySQL will treat them as not implemented |
| 393 | */ |
| 394 | int open(const char *name, int mode, uint test_if_locked); // required |
| 395 | int close(void); // required |
| 396 | |
| 397 | void start_bulk_insert(ha_rows rows, uint flags); |
| 398 | int end_bulk_insert(); |
| 399 | int write_row(uchar *buf); |
| 400 | int update_row(const uchar *old_data, const uchar *new_data); |
| 401 | int delete_row(const uchar *buf); |
| 402 | int index_init(uint keynr, bool sorted); |
| 403 | ha_rows estimate_rows_upper_bound(); |
| 404 | int index_read(uchar *buf, const uchar *key, |
| 405 | uint key_len, enum ha_rkey_function find_flag); |
| 406 | int index_read_idx(uchar *buf, uint idx, const uchar *key, |
| 407 | uint key_len, enum ha_rkey_function find_flag); |
| 408 | int index_next(uchar *buf); |
| 409 | int index_end(); |
| 410 | int read_range_first(const key_range *start_key, |
| 411 | const key_range *end_key, |
| 412 | bool eq_range, bool sorted); |
| 413 | int read_range_next(); |
| 414 | /* |
| 415 | unlike index_init(), rnd_init() can be called two times |
| 416 | without rnd_end() in between (it only makes sense if scan=1). |
| 417 | then the second call should prepare for the new table scan |
| 418 | (e.g if rnd_init allocates the cursor, second call should |
| 419 | position it to the start of the table, no need to deallocate |
| 420 | and allocate it again |
| 421 | */ |
| 422 | int rnd_init(bool scan); //required |
| 423 | int rnd_end(); |
| 424 | int rnd_next(uchar *buf); //required |
| 425 | int rnd_pos(uchar *buf, uchar *pos); //required |
| 426 | void position(const uchar *record); //required |
| 427 | int info(uint); //required |
| 428 | int (ha_extra_function operation); |
| 429 | |
| 430 | void update_auto_increment(void); |
| 431 | int repair(THD* thd, HA_CHECK_OPT* check_opt); |
| 432 | int optimize(THD* thd, HA_CHECK_OPT* check_opt); |
| 433 | |
| 434 | int delete_all_rows(void); |
| 435 | int create(const char *name, TABLE *form, |
| 436 | HA_CREATE_INFO *create_info); //required |
| 437 | ha_rows records_in_range(uint inx, key_range *start_key, |
| 438 | key_range *end_key); |
| 439 | uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; } |
| 440 | |
| 441 | THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, |
| 442 | enum thr_lock_type lock_type); //required |
| 443 | bool get_error_message(int error, String *buf); |
| 444 | int start_stmt(THD *thd, thr_lock_type lock_type); |
| 445 | int external_lock(THD *thd, int lock_type); |
| 446 | int reset(void); |
| 447 | int free_result(void); |
| 448 | }; |
| 449 | |
| 450 | extern const char ident_quote_char; // Character for quoting |
| 451 | // identifiers |
| 452 | extern const char value_quote_char; // Character for quoting |
| 453 | // literals |
| 454 | |
| 455 | extern bool append_ident(String *string, const char *name, size_t length, |
| 456 | const char quote_char); |
| 457 | |
| 458 | |
| 459 | extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root, |
| 460 | FEDERATEDX_SERVER *server); |
| 461 | extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root, |
| 462 | FEDERATEDX_SERVER *server); |
| 463 | |