1/* Copyright (C) 2008-2017 Kentoku Shiba
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
15
16#define MYSQL_SERVER 1
17#include <my_global.h>
18#include "mysql_version.h"
19#include "spd_environ.h"
20#if MYSQL_VERSION_ID < 50500
21#include "mysql_priv.h"
22#include <mysql/plugin.h>
23#else
24#include "sql_priv.h"
25#include "probes_mysql.h"
26#include "sql_class.h"
27#include "sql_partition.h"
28#include "tztime.h"
29#endif
30#include "spd_err.h"
31#include "spd_param.h"
32#include "spd_db_include.h"
33#include "spd_include.h"
34#include "ha_spider.h"
35#include "spd_db_conn.h"
36#include "spd_trx.h"
37#include "spd_conn.h"
38#include "spd_table.h"
39#include "spd_direct_sql.h"
40#include "spd_ping_table.h"
41#include "spd_malloc.h"
42#include "spd_err.h"
43
/*
  SPIDER_set_next_thread_id(A): give the THD pointed to by A a thread id.

  When SPIDER_HAS_NEXT_THREAD_ID is defined the macro expands to nothing.
  Otherwise the id is taken from the counter behind spd_db_att_thread_id,
  which is post-incremented while LOCK_thread_count is held.
*/
#ifdef SPIDER_HAS_NEXT_THREAD_ID
#define SPIDER_set_next_thread_id(A)
#else
extern ulong *spd_db_att_thread_id;
inline void SPIDER_set_next_thread_id(THD *A)
{
  pthread_mutex_lock(&LOCK_thread_count);
  A->thread_id = (*spd_db_att_thread_id)++;
  pthread_mutex_unlock(&LOCK_thread_count);
}
#endif
55
extern handlerton *spider_hton_ptr;
/* per-wrapper function table; spider_create_conn() indexes it by dbton_id */
extern SPIDER_DBTON spider_dbton[SPIDER_DBTON_SIZE];
/* held while spider_conn_id is read and incremented */
pthread_mutex_t spider_conn_id_mutex;
/* held while spider_ipport_conns is searched or inserted into */
pthread_mutex_t spider_ipport_conn_mutex;
/* id given to the next connection built by spider_create_conn() */
ulonglong spider_conn_id = 1;
61
62#ifndef WITHOUT_SPIDER_BG_SEARCH
63extern pthread_attr_t spider_pt_attr;
64
65#ifdef HAVE_PSI_INTERFACE
66extern PSI_mutex_key spd_key_mutex_mta_conn;
67extern PSI_mutex_key spd_key_mutex_conn_i;
68extern PSI_cond_key spd_key_cond_conn_i;
69#ifndef WITHOUT_SPIDER_BG_SEARCH
70extern PSI_mutex_key spd_key_mutex_bg_conn_chain;
71extern PSI_mutex_key spd_key_mutex_bg_conn_sync;
72extern PSI_mutex_key spd_key_mutex_bg_conn;
73extern PSI_mutex_key spd_key_mutex_bg_job_stack;
74extern PSI_mutex_key spd_key_mutex_bg_mon;
75extern PSI_cond_key spd_key_cond_bg_conn_sync;
76extern PSI_cond_key spd_key_cond_bg_conn;
77extern PSI_cond_key spd_key_cond_bg_sts;
78extern PSI_cond_key spd_key_cond_bg_sts_sync;
79extern PSI_cond_key spd_key_cond_bg_crd;
80extern PSI_cond_key spd_key_cond_bg_crd_sync;
81extern PSI_cond_key spd_key_cond_bg_mon;
82extern PSI_cond_key spd_key_cond_bg_mon_sleep;
83extern PSI_thread_key spd_key_thd_bg;
84extern PSI_thread_key spd_key_thd_bg_sts;
85extern PSI_thread_key spd_key_thd_bg_crd;
86extern PSI_thread_key spd_key_thd_bg_mon;
87#endif
88#endif
89#endif
90
/*
  Connections released from a transaction for reuse; spider_get_conn() takes
  them out again.  Accessed in this file with spider_conn_mutex held.
*/
HASH spider_open_connections;
/* NOTE(review): presumably the memory accounting id used by
   spider_alloc_calc_mem() for spider_open_connections -- confirm */
uint spider_open_connections_id;
/* SPIDER_IP_PORT_CONN entries, searched by connection key in
   spider_create_conn() with spider_ipport_conn_mutex held */
HASH spider_ipport_conns;
long spider_conn_mutex_id = 0;

/* NOTE(review): presumably allocation site bookkeeping for
   spider_open_connections -- confirm against spider_alloc_calc_mem() */
const char *spider_open_connections_func_name;
const char *spider_open_connections_file_name;
ulong spider_open_connections_line_no;
pthread_mutex_t spider_conn_mutex;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
/* reusable handlersocket read connections; accessed in this file with
   spider_hs_r_conn_mutex held */
HASH spider_hs_r_conn_hash;
uint spider_hs_r_conn_hash_id;
const char *spider_hs_r_conn_hash_func_name;
const char *spider_hs_r_conn_hash_file_name;
ulong spider_hs_r_conn_hash_line_no;
pthread_mutex_t spider_hs_r_conn_mutex;
/* reusable handlersocket write connections; accessed in this file with
   spider_hs_w_conn_mutex held */
HASH spider_hs_w_conn_hash;
uint spider_hs_w_conn_hash_id;
const char *spider_hs_w_conn_hash_func_name;
const char *spider_hs_w_conn_hash_file_name;
ulong spider_hs_w_conn_hash_line_no;
pthread_mutex_t spider_hs_w_conn_mutex;
#endif
114
115/* for spider_open_connections and trx_conn_hash */
116uchar *spider_conn_get_key(
117 SPIDER_CONN *conn,
118 size_t *length,
119 my_bool not_used __attribute__ ((unused))
120) {
121 DBUG_ENTER("spider_conn_get_key");
122 *length = conn->conn_key_length;
123 DBUG_PRINT("info",("spider conn_kind=%u", conn->conn_kind));
124#ifndef DBUG_OFF
125 spider_print_keys(conn->conn_key, conn->conn_key_length);
126#endif
127 DBUG_RETURN((uchar*) conn->conn_key);
128}
129
130uchar *spider_ipport_conn_get_key(
131 SPIDER_IP_PORT_CONN *ip_port,
132 size_t *length,
133 my_bool not_used __attribute__ ((unused))
134)
135{
136 DBUG_ENTER("spider_ipport_conn_get_key");
137 *length = ip_port->key_len;
138 DBUG_RETURN((uchar*) ip_port->key);
139}
140
141int spider_reset_conn_setted_parameter(
142 SPIDER_CONN *conn,
143 THD *thd
144) {
145 DBUG_ENTER("spider_reset_conn_setted_parameter");
146 conn->autocommit = spider_param_remote_autocommit();
147 conn->sql_log_off = spider_param_remote_sql_log_off();
148 if (thd && spider_param_remote_time_zone())
149 {
150 int tz_length = strlen(spider_param_remote_time_zone());
151 String tz_str(spider_param_remote_time_zone(), tz_length,
152 &my_charset_latin1);
153 conn->time_zone = my_tz_find(thd, &tz_str);
154 } else
155 conn->time_zone = NULL;
156 conn->trx_isolation = spider_param_remote_trx_isolation();
157 DBUG_PRINT("info",("spider conn->trx_isolation=%d", conn->trx_isolation));
158 if (spider_param_remote_access_charset())
159 {
160 if (!(conn->access_charset =
161 get_charset_by_csname(spider_param_remote_access_charset(),
162 MY_CS_PRIMARY, MYF(MY_WME))))
163 DBUG_RETURN(ER_UNKNOWN_CHARACTER_SET);
164 } else
165 conn->access_charset = NULL;
166 char *default_database = spider_param_remote_default_database();
167 if (default_database)
168 {
169 uint default_database_length = strlen(default_database);
170 if (conn->default_database.reserve(default_database_length + 1))
171 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
172 conn->default_database.q_append(default_database,
173 default_database_length + 1);
174 conn->default_database.length(default_database_length);
175 } else
176 conn->default_database.length(0);
177 DBUG_RETURN(0);
178}
179
180int spider_free_conn_alloc(
181 SPIDER_CONN *conn
182) {
183 DBUG_ENTER("spider_free_conn_alloc");
184#ifndef WITHOUT_SPIDER_BG_SEARCH
185 spider_free_conn_thread(conn);
186#endif
187 spider_db_disconnect(conn);
188 if (conn->db_conn)
189 {
190 delete conn->db_conn;
191 conn->db_conn = NULL;
192 }
193 DBUG_ASSERT(!conn->mta_conn_mutex_file_pos.file_name);
194 pthread_mutex_destroy(&conn->mta_conn_mutex);
195 conn->default_database.free();
196 DBUG_RETURN(0);
197}
198
/*
  Detach a connection from a transaction and either put it back into the
  matching global connection hash for reuse or free it.

  trx         transaction whose connection hash currently holds conn
  conn        connection to release
  another     TRUE: conn lives in trx->trx_another_conn_hash,
              FALSE: conn lives in trx->trx_conn_hash
              (only used for SPIDER_CONN_KIND_MYSQL connections)
  trx_free    TRUE forces the release and prevents recycling
  roop_count  when not NULL, incremented if the connection is kept in the
              transaction instead of being released

  The connection is released when trx_free is set, or when it has no opened
  handlers and either the server was lost or the recycle mode is not 2.
  A released connection is recycled only when trx_free is not set, the
  server is not lost, no connect is queued and the recycle mode is 1;
  otherwise it is freed with spider_free_conn().
*/
void spider_free_conn_from_trx(
  SPIDER_TRX *trx,
  SPIDER_CONN *conn,
  bool another,
  bool trx_free,
  int *roop_count
) {
  ha_spider *spider;
  /* read before conn can be freed below */
  SPIDER_IP_PORT_CONN *ip_port_conn = conn->ip_port_conn;
  DBUG_ENTER("spider_free_conn_from_trx");
  spider_conn_clear_queue(conn);
  conn->use_for_active_standby = FALSE;
  conn->error_mode = 1;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
  if (conn->conn_kind == SPIDER_CONN_KIND_MYSQL)
  {
#endif
    if (
      trx_free ||
      (
        (
          conn->server_lost ||
          spider_param_conn_recycle_mode(trx->thd) != 2
        ) &&
        !conn->opened_handlers
      )
    ) {
      conn->thd = NULL;
      if (another)
      {
        ha_spider *next_spider;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
        my_hash_delete_with_hash_value(&trx->trx_another_conn_hash,
          conn->conn_key_hash_value, (uchar*) conn);
#else
        my_hash_delete(&trx->trx_another_conn_hash, (uchar*) conn);
#endif
        /* free every temporary handler chained from another_ha_first,
           together with its share */
        spider = (ha_spider*) conn->another_ha_first;
        while (spider)
        {
          next_spider = spider->next;
          spider_free_tmp_dbton_handler(spider);
          spider_free_tmp_dbton_share(spider->share);
          spider_free_tmp_share_alloc(spider->share);
          spider_free(spider_current_trx, spider->share, MYF(0));
          delete spider;
          spider = next_spider;
        }
        conn->another_ha_first = NULL;
        conn->another_ha_last = NULL;
      } else {
#ifdef HASH_UPDATE_WITH_HASH_VALUE
        my_hash_delete_with_hash_value(&trx->trx_conn_hash,
          conn->conn_key_hash_value, (uchar*) conn);
#else
        my_hash_delete(&trx->trx_conn_hash, (uchar*) conn);
#endif
      }

      if (
        !trx_free &&
        !conn->server_lost &&
        !conn->queued_connect &&
        spider_param_conn_recycle_mode(trx->thd) == 1
      ) {
        /* conn_recycle_mode == 1: hand the connection to the global pool */
        /* the first key byte is rewritten to '0' before the insert */
        *conn->conn_key = '0';
        conn->casual_read_base_conn = NULL;
        if (
          conn->quick_target &&
          spider_db_free_result((ha_spider *) conn->quick_target, FALSE)
        ) {
          /* pending result could not be released: do not recycle */
          spider_free_conn(conn);
        } else {
          pthread_mutex_lock(&spider_conn_mutex);
          /* remembered to detect growth of the hash array after the insert */
          uint old_elements = spider_open_connections.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
          if (my_hash_insert_with_hash_value(&spider_open_connections,
            conn->conn_key_hash_value, (uchar*) conn))
#else
          if (my_hash_insert(&spider_open_connections, (uchar*) conn))
#endif
          {
            /* insert failed: free the connection instead */
            pthread_mutex_unlock(&spider_conn_mutex);
            spider_free_conn(conn);
          } else {
            if (ip_port_conn)
            { /* exists */
              /* wake one thread waiting on this entry's condition */
              if (ip_port_conn->waiting_count)
              {
                pthread_mutex_lock(&ip_port_conn->mutex);
                pthread_cond_signal(&ip_port_conn->cond);
                pthread_mutex_unlock(&ip_port_conn->mutex);
              }
            }
            if (spider_open_connections.array.max_element > old_elements)
            {
              /* account for the memory the hash array grew by */
              spider_alloc_calc_mem(spider_current_trx,
                spider_open_connections,
                (spider_open_connections.array.max_element - old_elements) *
                spider_open_connections.array.size_of_element);
            }
            pthread_mutex_unlock(&spider_conn_mutex);
          }
        }
      } else {
        /* not recycled (conn_recycle_mode == 0, trx_free, lost server or
           queued connect) */
        spider_free_conn(conn);
      }
    } else if (roop_count)
      /* connection stays in the transaction */
      (*roop_count)++;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
  } else if (conn->conn_kind == SPIDER_CONN_KIND_HS_READ)
  {
    /* handlersocket read connection: same policy, using the hs_r parameters
       and hashes */
    spider_db_hs_request_buf_reset(conn);
    if (
      trx_free ||
      (
        (
          conn->server_lost ||
          spider_param_hs_r_conn_recycle_mode(trx->thd) != 2
        ) &&
        !conn->opened_handlers
      )
    ) {
      conn->thd = NULL;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
      my_hash_delete_with_hash_value(&trx->trx_hs_r_conn_hash,
        conn->conn_key_hash_value, (uchar*) conn);
#else
      my_hash_delete(&trx->trx_hs_r_conn_hash, (uchar*) conn);
#endif

      DBUG_ASSERT(conn->opened_handlers ==
        conn->db_conn->get_opened_handler_count());
      if (conn->db_conn->get_opened_handler_count())
      {
        conn->db_conn->reset_opened_handler();
      }

      if (
        !trx_free &&
        !conn->server_lost &&
        !conn->queued_connect &&
        spider_param_hs_r_conn_recycle_mode(trx->thd) == 1
      ) {
        /* conn_recycle_mode == 1: hand the connection to the global pool */
        *conn->conn_key = '0';
        pthread_mutex_lock(&spider_hs_r_conn_mutex);
        uint old_elements = spider_hs_r_conn_hash.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
        if (my_hash_insert_with_hash_value(&spider_hs_r_conn_hash,
          conn->conn_key_hash_value, (uchar*) conn))
#else
        if (my_hash_insert(&spider_hs_r_conn_hash, (uchar*) conn))
#endif
        {
          /* insert failed: free the connection instead */
          pthread_mutex_unlock(&spider_hs_r_conn_mutex);
          spider_free_conn(conn);
        } else {
          if (spider_hs_r_conn_hash.array.max_element > old_elements)
          {
            spider_alloc_calc_mem(spider_current_trx,
              spider_hs_r_conn_hash,
              (spider_hs_r_conn_hash.array.max_element - old_elements) *
              spider_hs_r_conn_hash.array.size_of_element);
          }
          pthread_mutex_unlock(&spider_hs_r_conn_mutex);
        }
      } else {
        /* not recycled */
        spider_free_conn(conn);
      }
    } else if (roop_count)
      (*roop_count)++;
  } else {
    /* handlersocket write connection: same policy, using the hs_w parameters
       and hashes */
    spider_db_hs_request_buf_reset(conn);
    if (
      trx_free ||
      (
        (
          conn->server_lost ||
          spider_param_hs_w_conn_recycle_mode(trx->thd) != 2
        ) &&
        !conn->opened_handlers
      )
    ) {
      conn->thd = NULL;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
      my_hash_delete_with_hash_value(&trx->trx_hs_w_conn_hash,
        conn->conn_key_hash_value, (uchar*) conn);
#else
      my_hash_delete(&trx->trx_hs_w_conn_hash, (uchar*) conn);
#endif

      DBUG_ASSERT(conn->opened_handlers ==
        conn->db_conn->get_opened_handler_count());
      if (conn->db_conn->get_opened_handler_count())
      {
        conn->db_conn->reset_opened_handler();
      }

      if (
        !trx_free &&
        !conn->server_lost &&
        !conn->queued_connect &&
        spider_param_hs_w_conn_recycle_mode(trx->thd) == 1
      ) {
        /* conn_recycle_mode == 1: hand the connection to the global pool */
        *conn->conn_key = '0';
        pthread_mutex_lock(&spider_hs_w_conn_mutex);
        uint old_elements = spider_hs_w_conn_hash.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
        if (my_hash_insert_with_hash_value(&spider_hs_w_conn_hash,
          conn->conn_key_hash_value, (uchar*) conn))
#else
        if (my_hash_insert(&spider_hs_w_conn_hash, (uchar*) conn))
#endif
        {
          /* insert failed: free the connection instead */
          pthread_mutex_unlock(&spider_hs_w_conn_mutex);
          spider_free_conn(conn);
        } else {
          if (spider_hs_w_conn_hash.array.max_element > old_elements)
          {
            spider_alloc_calc_mem(spider_current_trx,
              spider_hs_w_conn_hash,
              (spider_hs_w_conn_hash.array.max_element - old_elements) *
              spider_hs_w_conn_hash.array.size_of_element);
          }
          pthread_mutex_unlock(&spider_hs_w_conn_mutex);
        }
      } else {
        /* not recycled */
        spider_free_conn(conn);
      }
    } else if (roop_count)
      (*roop_count)++;
  }
#endif
  DBUG_VOID_RETURN;
}
440
441SPIDER_CONN *spider_create_conn(
442 SPIDER_SHARE *share,
443 ha_spider *spider,
444 int link_idx,
445 int base_link_idx,
446 uint conn_kind,
447 int *error_num
448) {
449 int *need_mon;
450 SPIDER_CONN *conn;
451 SPIDER_IP_PORT_CONN *ip_port_conn;
452 char *tmp_name, *tmp_host, *tmp_username, *tmp_password, *tmp_socket;
453 char *tmp_wrapper, *tmp_ssl_ca, *tmp_ssl_capath, *tmp_ssl_cert;
454 char *tmp_ssl_cipher, *tmp_ssl_key, *tmp_default_file, *tmp_default_group;
455 DBUG_ENTER("spider_create_conn");
456
457#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
458 if (conn_kind == SPIDER_CONN_KIND_MYSQL)
459 {
460#endif
461 if (!(conn = (SPIDER_CONN *)
462 spider_bulk_malloc(spider_current_trx, 18, MYF(MY_WME | MY_ZEROFILL),
463 &conn, sizeof(*conn),
464 &tmp_name, share->conn_keys_lengths[link_idx] + 1,
465 &tmp_host, share->tgt_hosts_lengths[link_idx] + 1,
466 &tmp_username,
467 share->tgt_usernames_lengths[link_idx] + 1,
468 &tmp_password,
469 share->tgt_passwords_lengths[link_idx] + 1,
470 &tmp_socket, share->tgt_sockets_lengths[link_idx] + 1,
471 &tmp_wrapper,
472 share->tgt_wrappers_lengths[link_idx] + 1,
473 &tmp_ssl_ca, share->tgt_ssl_cas_lengths[link_idx] + 1,
474 &tmp_ssl_capath,
475 share->tgt_ssl_capaths_lengths[link_idx] + 1,
476 &tmp_ssl_cert,
477 share->tgt_ssl_certs_lengths[link_idx] + 1,
478 &tmp_ssl_cipher,
479 share->tgt_ssl_ciphers_lengths[link_idx] + 1,
480 &tmp_ssl_key,
481 share->tgt_ssl_keys_lengths[link_idx] + 1,
482 &tmp_default_file,
483 share->tgt_default_files_lengths[link_idx] + 1,
484 &tmp_default_group,
485 share->tgt_default_groups_lengths[link_idx] + 1,
486 &need_mon, sizeof(int),
487 NullS))
488 ) {
489 *error_num = HA_ERR_OUT_OF_MEM;
490 goto error_alloc_conn;
491 }
492
493 conn->default_database.init_calc_mem(75);
494 conn->conn_key_length = share->conn_keys_lengths[link_idx];
495 conn->conn_key = tmp_name;
496 memcpy(conn->conn_key, share->conn_keys[link_idx],
497 share->conn_keys_lengths[link_idx]);
498#ifdef SPIDER_HAS_HASH_VALUE_TYPE
499 conn->conn_key_hash_value = share->conn_keys_hash_value[link_idx];
500#endif
501 conn->tgt_host_length = share->tgt_hosts_lengths[link_idx];
502 conn->tgt_host = tmp_host;
503 memcpy(conn->tgt_host, share->tgt_hosts[link_idx],
504 share->tgt_hosts_lengths[link_idx]);
505 conn->tgt_username_length = share->tgt_usernames_lengths[link_idx];
506 conn->tgt_username = tmp_username;
507 memcpy(conn->tgt_username, share->tgt_usernames[link_idx],
508 share->tgt_usernames_lengths[link_idx]);
509 conn->tgt_password_length = share->tgt_passwords_lengths[link_idx];
510 conn->tgt_password = tmp_password;
511 memcpy(conn->tgt_password, share->tgt_passwords[link_idx],
512 share->tgt_passwords_lengths[link_idx]);
513 conn->tgt_socket_length = share->tgt_sockets_lengths[link_idx];
514 conn->tgt_socket = tmp_socket;
515 memcpy(conn->tgt_socket, share->tgt_sockets[link_idx],
516 share->tgt_sockets_lengths[link_idx]);
517 conn->tgt_wrapper_length = share->tgt_wrappers_lengths[link_idx];
518 conn->tgt_wrapper = tmp_wrapper;
519 memcpy(conn->tgt_wrapper, share->tgt_wrappers[link_idx],
520 share->tgt_wrappers_lengths[link_idx]);
521 conn->tgt_ssl_ca_length = share->tgt_ssl_cas_lengths[link_idx];
522 if (conn->tgt_ssl_ca_length)
523 {
524 conn->tgt_ssl_ca = tmp_ssl_ca;
525 memcpy(conn->tgt_ssl_ca, share->tgt_ssl_cas[link_idx],
526 share->tgt_ssl_cas_lengths[link_idx]);
527 } else
528 conn->tgt_ssl_ca = NULL;
529 conn->tgt_ssl_capath_length = share->tgt_ssl_capaths_lengths[link_idx];
530 if (conn->tgt_ssl_capath_length)
531 {
532 conn->tgt_ssl_capath = tmp_ssl_capath;
533 memcpy(conn->tgt_ssl_capath, share->tgt_ssl_capaths[link_idx],
534 share->tgt_ssl_capaths_lengths[link_idx]);
535 } else
536 conn->tgt_ssl_capath = NULL;
537 conn->tgt_ssl_cert_length = share->tgt_ssl_certs_lengths[link_idx];
538 if (conn->tgt_ssl_cert_length)
539 {
540 conn->tgt_ssl_cert = tmp_ssl_cert;
541 memcpy(conn->tgt_ssl_cert, share->tgt_ssl_certs[link_idx],
542 share->tgt_ssl_certs_lengths[link_idx]);
543 } else
544 conn->tgt_ssl_cert = NULL;
545 conn->tgt_ssl_cipher_length = share->tgt_ssl_ciphers_lengths[link_idx];
546 if (conn->tgt_ssl_cipher_length)
547 {
548 conn->tgt_ssl_cipher = tmp_ssl_cipher;
549 memcpy(conn->tgt_ssl_cipher, share->tgt_ssl_ciphers[link_idx],
550 share->tgt_ssl_ciphers_lengths[link_idx]);
551 } else
552 conn->tgt_ssl_cipher = NULL;
553 conn->tgt_ssl_key_length = share->tgt_ssl_keys_lengths[link_idx];
554 if (conn->tgt_ssl_key_length)
555 {
556 conn->tgt_ssl_key = tmp_ssl_key;
557 memcpy(conn->tgt_ssl_key, share->tgt_ssl_keys[link_idx],
558 share->tgt_ssl_keys_lengths[link_idx]);
559 } else
560 conn->tgt_ssl_key = NULL;
561 conn->tgt_default_file_length = share->tgt_default_files_lengths[link_idx];
562 if (conn->tgt_default_file_length)
563 {
564 conn->tgt_default_file = tmp_default_file;
565 memcpy(conn->tgt_default_file, share->tgt_default_files[link_idx],
566 share->tgt_default_files_lengths[link_idx]);
567 } else
568 conn->tgt_default_file = NULL;
569 conn->tgt_default_group_length =
570 share->tgt_default_groups_lengths[link_idx];
571 if (conn->tgt_default_group_length)
572 {
573 conn->tgt_default_group = tmp_default_group;
574 memcpy(conn->tgt_default_group, share->tgt_default_groups[link_idx],
575 share->tgt_default_groups_lengths[link_idx]);
576 } else
577 conn->tgt_default_group = NULL;
578 conn->tgt_port = share->tgt_ports[link_idx];
579 conn->tgt_ssl_vsc = share->tgt_ssl_vscs[link_idx];
580 conn->dbton_id = share->sql_dbton_ids[link_idx];
581#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
582 } else if (conn_kind == SPIDER_CONN_KIND_HS_READ) {
583 if (!(conn = (SPIDER_CONN *)
584 spider_bulk_malloc(spider_current_trx, 19, MYF(MY_WME | MY_ZEROFILL),
585 &conn, sizeof(*conn),
586 &tmp_name, share->hs_read_conn_keys_lengths[link_idx] + 1,
587 &tmp_host, share->tgt_hosts_lengths[link_idx] + 1,
588 &tmp_socket, share->hs_read_socks_lengths[link_idx] + 1,
589 &tmp_wrapper,
590 share->tgt_wrappers_lengths[link_idx] + 1,
591 &need_mon, sizeof(int),
592 NullS))
593 ) {
594 *error_num = HA_ERR_OUT_OF_MEM;
595 goto error_alloc_conn;
596 }
597
598 conn->default_database.init_calc_mem(76);
599 conn->conn_key_length = share->hs_read_conn_keys_lengths[link_idx];
600 conn->conn_key = tmp_name;
601 memcpy(conn->conn_key, share->hs_read_conn_keys[link_idx],
602 share->hs_read_conn_keys_lengths[link_idx]);
603#ifdef SPIDER_HAS_HASH_VALUE_TYPE
604 conn->conn_key_hash_value = share->hs_read_conn_keys_hash_value[link_idx];
605#endif
606 conn->tgt_host_length = share->tgt_hosts_lengths[link_idx];
607 conn->tgt_host = tmp_host;
608 memcpy(conn->tgt_host, share->tgt_hosts[link_idx],
609 share->tgt_hosts_lengths[link_idx]);
610 conn->hs_sock_length = share->hs_read_socks_lengths[link_idx];
611 if (conn->hs_sock_length)
612 {
613 conn->hs_sock = tmp_socket;
614 memcpy(conn->hs_sock, share->hs_read_socks[link_idx],
615 share->hs_read_socks_lengths[link_idx]);
616 } else
617 conn->hs_sock = NULL;
618 conn->tgt_wrapper_length = share->tgt_wrappers_lengths[link_idx];
619 conn->tgt_wrapper = tmp_wrapper;
620 memcpy(conn->tgt_wrapper, share->tgt_wrappers[link_idx],
621 share->tgt_wrappers_lengths[link_idx]);
622 conn->hs_port = share->hs_read_ports[link_idx];
623 conn->dbton_id = share->hs_dbton_ids[link_idx];
624 } else {
625 if (!(conn = (SPIDER_CONN *)
626 spider_bulk_malloc(spider_current_trx, 20, MYF(MY_WME | MY_ZEROFILL),
627 &conn, sizeof(*conn),
628 &tmp_name, share->hs_write_conn_keys_lengths[link_idx] + 1,
629 &tmp_host, share->tgt_hosts_lengths[link_idx] + 1,
630 &tmp_socket, share->hs_write_socks_lengths[link_idx] + 1,
631 &tmp_wrapper,
632 share->tgt_wrappers_lengths[link_idx] + 1,
633 &need_mon, sizeof(int),
634 NullS))
635 ) {
636 *error_num = HA_ERR_OUT_OF_MEM;
637 goto error_alloc_conn;
638 }
639
640 conn->default_database.init_calc_mem(77);
641 conn->conn_key_length = share->hs_write_conn_keys_lengths[link_idx];
642 conn->conn_key = tmp_name;
643 memcpy(conn->conn_key, share->hs_write_conn_keys[link_idx],
644 share->hs_write_conn_keys_lengths[link_idx]);
645#ifdef SPIDER_HAS_HASH_VALUE_TYPE
646 conn->conn_key_hash_value = share->hs_write_conn_keys_hash_value[link_idx];
647#endif
648 conn->tgt_host_length = share->tgt_hosts_lengths[link_idx];
649 conn->tgt_host = tmp_host;
650 memcpy(conn->tgt_host, share->tgt_hosts[link_idx],
651 share->tgt_hosts_lengths[link_idx]);
652 conn->hs_sock_length = share->hs_write_socks_lengths[link_idx];
653 if (conn->hs_sock_length)
654 {
655 conn->hs_sock = tmp_socket;
656 memcpy(conn->hs_sock, share->hs_write_socks[link_idx],
657 share->hs_write_socks_lengths[link_idx]);
658 } else
659 conn->hs_sock = NULL;
660 conn->tgt_wrapper_length = share->tgt_wrappers_lengths[link_idx];
661 conn->tgt_wrapper = tmp_wrapper;
662 memcpy(conn->tgt_wrapper, share->tgt_wrappers[link_idx],
663 share->tgt_wrappers_lengths[link_idx]);
664 conn->hs_port = share->hs_write_ports[link_idx];
665 conn->dbton_id = share->hs_dbton_ids[link_idx];
666 }
667#endif
668 if (conn->dbton_id == SPIDER_DBTON_SIZE)
669 {
670#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
671 if (conn->conn_kind == SPIDER_CONN_KIND_MYSQL)
672 {
673#endif
674 my_printf_error(
675 ER_SPIDER_SQL_WRAPPER_IS_INVALID_NUM,
676 ER_SPIDER_SQL_WRAPPER_IS_INVALID_STR,
677 MYF(0), conn->tgt_wrapper);
678 *error_num = ER_SPIDER_SQL_WRAPPER_IS_INVALID_NUM;
679#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
680 } else {
681 my_printf_error(
682 ER_SPIDER_NOSQL_WRAPPER_IS_INVALID_NUM,
683 ER_SPIDER_NOSQL_WRAPPER_IS_INVALID_STR,
684 MYF(0), conn->tgt_wrapper);
685 *error_num = ER_SPIDER_NOSQL_WRAPPER_IS_INVALID_NUM;
686 }
687#endif
688 goto error_invalid_wrapper;
689 }
690 if (!(conn->db_conn = spider_dbton[conn->dbton_id].create_db_conn(conn)))
691 {
692 *error_num = HA_ERR_OUT_OF_MEM;
693 goto error_db_conn_create;
694 }
695 if ((*error_num = conn->db_conn->init()))
696 {
697 goto error_db_conn_init;
698 }
699 conn->join_trx = 0;
700 conn->thd = NULL;
701 conn->table_lock = 0;
702 conn->semi_trx_isolation = -2;
703 conn->semi_trx_isolation_chk = FALSE;
704 conn->semi_trx_chk = FALSE;
705 conn->link_idx = base_link_idx;
706 conn->conn_kind = conn_kind;
707 conn->conn_need_mon = need_mon;
708 if (spider)
709 conn->need_mon = &spider->need_mons[base_link_idx];
710 else
711 conn->need_mon = need_mon;
712
713#if MYSQL_VERSION_ID < 50500
714 if (pthread_mutex_init(&conn->mta_conn_mutex, MY_MUTEX_INIT_FAST))
715#else
716 if (mysql_mutex_init(spd_key_mutex_mta_conn, &conn->mta_conn_mutex,
717 MY_MUTEX_INIT_FAST))
718#endif
719 {
720 *error_num = HA_ERR_OUT_OF_MEM;
721 goto error_mta_conn_mutex_init;
722 }
723
724 spider_conn_queue_connect(share, conn, link_idx);
725 conn->ping_time = (time_t) time((time_t*) 0);
726 conn->connect_error_time = conn->ping_time;
727 pthread_mutex_lock(&spider_conn_id_mutex);
728 conn->conn_id = spider_conn_id;
729 ++spider_conn_id;
730 pthread_mutex_unlock(&spider_conn_id_mutex);
731
732 pthread_mutex_lock(&spider_ipport_conn_mutex);
733#ifdef SPIDER_HAS_HASH_VALUE_TYPE
734 if ((ip_port_conn = (SPIDER_IP_PORT_CONN*) my_hash_search_using_hash_value(
735 &spider_ipport_conns, conn->conn_key_hash_value,
736 (uchar*)conn->conn_key, conn->conn_key_length)))
737#else
738 if ((ip_port_conn = (SPIDER_IP_PORT_CONN*) my_hash_search(
739 &spider_ipport_conns, (uchar*)conn->conn_key, conn->conn_key_length)))
740#endif
741 { /* exists, +1 */
742 pthread_mutex_unlock(&spider_ipport_conn_mutex);
743 pthread_mutex_lock(&ip_port_conn->mutex);
744 if (spider_param_max_connections())
745 { /* enable conncetion pool */
746 if (ip_port_conn->ip_port_count >= spider_param_max_connections())
747 { /* bigger than the max num of connections, free conn and return NULL */
748 pthread_mutex_unlock(&ip_port_conn->mutex);
749 *error_num = ER_SPIDER_CON_COUNT_ERROR;
750 goto error_too_many_ipport_count;
751 }
752 }
753 ip_port_conn->ip_port_count++;
754 pthread_mutex_unlock(&ip_port_conn->mutex);
755 }
756 else
757 {// do not exist
758 ip_port_conn = spider_create_ipport_conn(conn);
759 if (!ip_port_conn) {
760 /* failed, always do not effect 'create conn' */
761 pthread_mutex_unlock(&spider_ipport_conn_mutex);
762 DBUG_RETURN(conn);
763 }
764 if (my_hash_insert(&spider_ipport_conns, (uchar *)ip_port_conn)) {
765 /* insert failed, always do not effect 'create conn' */
766 pthread_mutex_unlock(&spider_ipport_conn_mutex);
767 DBUG_RETURN(conn);
768 }
769 pthread_mutex_unlock(&spider_ipport_conn_mutex);
770 }
771 conn->ip_port_conn = ip_port_conn;
772
773 DBUG_RETURN(conn);
774
775/*
776error_init_lock_table_hash:
777 DBUG_ASSERT(!conn->mta_conn_mutex_file_pos.file_name);
778 pthread_mutex_destroy(&conn->mta_conn_mutex);
779*/
780error_too_many_ipport_count:
781error_mta_conn_mutex_init:
782error_db_conn_init:
783 delete conn->db_conn;
784error_db_conn_create:
785error_invalid_wrapper:
786 spider_free(spider_current_trx, conn, MYF(0));
787error_alloc_conn:
788 DBUG_RETURN(NULL);
789}
790
791SPIDER_CONN *spider_get_conn(
792 SPIDER_SHARE *share,
793 int link_idx,
794 char *conn_key,
795 SPIDER_TRX *trx,
796 ha_spider *spider,
797 bool another,
798 bool thd_chg,
799 uint conn_kind,
800 int *error_num
801) {
802 SPIDER_CONN *conn = NULL;
803 int base_link_idx = link_idx;
804 DBUG_ENTER("spider_get_conn");
805 DBUG_PRINT("info",("spider conn_kind=%u", conn_kind));
806
807 if (spider)
808 link_idx = spider->conn_link_idx[base_link_idx];
809 DBUG_PRINT("info",("spider link_idx=%u", link_idx));
810 DBUG_PRINT("info",("spider base_link_idx=%u", base_link_idx));
811
812#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
813 if (conn_kind == SPIDER_CONN_KIND_MYSQL)
814 {
815#endif
816#ifndef DBUG_OFF
817 spider_print_keys(conn_key, share->conn_keys_lengths[link_idx]);
818#endif
819#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
820 } else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
821 {
822 conn_key = share->hs_read_conn_keys[link_idx];
823#ifndef DBUG_OFF
824 spider_print_keys(conn_key, share->hs_read_conn_keys_lengths[link_idx]);
825#endif
826 } else {
827 conn_key = share->hs_write_conn_keys[link_idx];
828#ifndef DBUG_OFF
829 spider_print_keys(conn_key, share->hs_write_conn_keys_lengths[link_idx]);
830#endif
831 }
832#endif
833#ifdef SPIDER_HAS_HASH_VALUE_TYPE
834 if (
835#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
836 (conn_kind == SPIDER_CONN_KIND_MYSQL &&
837 (
838#endif
839 (another &&
840 !(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
841 &trx->trx_another_conn_hash,
842 share->conn_keys_hash_value[link_idx],
843 (uchar*) conn_key, share->conn_keys_lengths[link_idx]))) ||
844 (!another &&
845 !(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
846 &trx->trx_conn_hash,
847 share->conn_keys_hash_value[link_idx],
848 (uchar*) conn_key, share->conn_keys_lengths[link_idx])))
849#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
850 )
851 ) ||
852 (conn_kind == SPIDER_CONN_KIND_HS_READ &&
853 !(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
854 &trx->trx_hs_r_conn_hash,
855 share->hs_read_conn_keys_hash_value[link_idx],
856 (uchar*) conn_key, share->hs_read_conn_keys_lengths[link_idx]))
857 ) ||
858 (conn_kind == SPIDER_CONN_KIND_HS_WRITE &&
859 !(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
860 &trx->trx_hs_w_conn_hash,
861 share->hs_write_conn_keys_hash_value[link_idx],
862 (uchar*) conn_key, share->hs_write_conn_keys_lengths[link_idx]))
863 )
864#endif
865 )
866#else
867 if (
868#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
869 (conn_kind == SPIDER_CONN_KIND_MYSQL &&
870 (
871#endif
872 (another &&
873 !(conn = (SPIDER_CONN*) my_hash_search(&trx->trx_another_conn_hash,
874 (uchar*) conn_key, share->conn_keys_lengths[link_idx]))) ||
875 (!another &&
876 !(conn = (SPIDER_CONN*) my_hash_search(&trx->trx_conn_hash,
877 (uchar*) conn_key, share->conn_keys_lengths[link_idx])))
878#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
879 )
880 ) ||
881 (conn_kind == SPIDER_CONN_KIND_HS_READ &&
882 !(conn = (SPIDER_CONN*) my_hash_search(&trx->trx_hs_r_conn_hash,
883 (uchar*) conn_key, share->hs_read_conn_keys_lengths[link_idx]))
884 ) ||
885 (conn_kind == SPIDER_CONN_KIND_HS_WRITE &&
886 !(conn = (SPIDER_CONN*) my_hash_search(&trx->trx_hs_w_conn_hash,
887 (uchar*) conn_key, share->hs_write_conn_keys_lengths[link_idx]))
888 )
889#endif
890 )
891#endif
892 {
893 if (
894 !trx->thd ||
895#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
896 (conn_kind == SPIDER_CONN_KIND_MYSQL &&
897#endif
898 (
899 (spider_param_conn_recycle_mode(trx->thd) & 1) ||
900 spider_param_conn_recycle_strict(trx->thd)
901 )
902#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
903 ) ||
904 (conn_kind == SPIDER_CONN_KIND_HS_READ &&
905 (
906 (spider_param_hs_r_conn_recycle_mode(trx->thd) & 1) ||
907 spider_param_hs_r_conn_recycle_strict(trx->thd)
908 )
909 ) ||
910 (conn_kind == SPIDER_CONN_KIND_HS_WRITE &&
911 (
912 (spider_param_hs_w_conn_recycle_mode(trx->thd) & 1) ||
913 spider_param_hs_w_conn_recycle_strict(trx->thd)
914 )
915 )
916#endif
917 ) {
918#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
919 if (conn_kind == SPIDER_CONN_KIND_MYSQL)
920 {
921#endif
922 pthread_mutex_lock(&spider_conn_mutex);
923#ifdef SPIDER_HAS_HASH_VALUE_TYPE
924 if (!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
925 &spider_open_connections, share->conn_keys_hash_value[link_idx],
926 (uchar*) share->conn_keys[link_idx],
927 share->conn_keys_lengths[link_idx])))
928#else
929 if (!(conn = (SPIDER_CONN*) my_hash_search(&spider_open_connections,
930 (uchar*) share->conn_keys[link_idx],
931 share->conn_keys_lengths[link_idx])))
932#endif
933 {
934 pthread_mutex_unlock(&spider_conn_mutex);
935 if (spider_param_max_connections())
936 { /* enable connection pool */
937 conn = spider_get_conn_from_idle_connection(share, link_idx, conn_key, spider, conn_kind, base_link_idx, error_num);
938 /* failed get conn, goto error */
939 if (!conn)
940 goto error;
941
942 }
943 else
944 { /* did not enable conncetion pool , create_conn */
945 DBUG_PRINT("info",("spider create new conn"));
946 if (!(conn = spider_create_conn(share, spider, link_idx,
947 base_link_idx, conn_kind, error_num)))
948 goto error;
949 *conn->conn_key = *conn_key;
950 if (spider)
951 {
952 spider->conns[base_link_idx] = conn;
953 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
954 conn->use_for_active_standby = TRUE;
955 }
956 }
957 } else {
958#ifdef HASH_UPDATE_WITH_HASH_VALUE
959 my_hash_delete_with_hash_value(&spider_open_connections,
960 conn->conn_key_hash_value, (uchar*) conn);
961#else
962 my_hash_delete(&spider_open_connections, (uchar*) conn);
963#endif
964 pthread_mutex_unlock(&spider_conn_mutex);
965 DBUG_PRINT("info",("spider get global conn"));
966 if (spider)
967 {
968 spider->conns[base_link_idx] = conn;
969 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
970 conn->use_for_active_standby = TRUE;
971 }
972 }
973#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
974 } else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
975 {
976 pthread_mutex_lock(&spider_hs_r_conn_mutex);
977#ifdef SPIDER_HAS_HASH_VALUE_TYPE
978 if (!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
979 &spider_hs_r_conn_hash,
980 share->hs_read_conn_keys_hash_value[link_idx],
981 (uchar*) share->hs_read_conn_keys[link_idx],
982 share->hs_read_conn_keys_lengths[link_idx])))
983#else
984 if (!(conn = (SPIDER_CONN*) my_hash_search(&spider_hs_r_conn_hash,
985 (uchar*) share->hs_read_conn_keys[link_idx],
986 share->hs_read_conn_keys_lengths[link_idx])))
987#endif
988 {
989 pthread_mutex_unlock(&spider_hs_r_conn_mutex);
990 DBUG_PRINT("info",("spider create new hs r conn"));
991 if (!(conn = spider_create_conn(share, spider, link_idx,
992 base_link_idx, conn_kind, error_num)))
993 goto error;
994 *conn->conn_key = *conn_key;
995 if (spider)
996 {
997 spider->hs_r_conns[base_link_idx] = conn;
998 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
999 conn->use_for_active_standby = TRUE;
1000 }
1001 } else {
1002#ifdef HASH_UPDATE_WITH_HASH_VALUE
1003 my_hash_delete_with_hash_value(&spider_hs_r_conn_hash,
1004 conn->conn_key_hash_value, (uchar*) conn);
1005#else
1006 my_hash_delete(&spider_hs_r_conn_hash, (uchar*) conn);
1007#endif
1008 pthread_mutex_unlock(&spider_hs_r_conn_mutex);
1009 DBUG_PRINT("info",("spider get global hs r conn"));
1010 if (spider)
1011 {
1012 spider->hs_r_conns[base_link_idx] = conn;
1013 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
1014 conn->use_for_active_standby = TRUE;
1015 }
1016 }
1017 } else {
1018 pthread_mutex_lock(&spider_hs_w_conn_mutex);
1019#ifdef SPIDER_HAS_HASH_VALUE_TYPE
1020 if (!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
1021 &spider_hs_w_conn_hash,
1022 share->hs_write_conn_keys_hash_value[link_idx],
1023 (uchar*) share->hs_write_conn_keys[link_idx],
1024 share->hs_write_conn_keys_lengths[link_idx])))
1025#else
1026 if (!(conn = (SPIDER_CONN*) my_hash_search(&spider_hs_w_conn_hash,
1027 (uchar*) share->hs_write_conn_keys[link_idx],
1028 share->hs_write_conn_keys_lengths[link_idx])))
1029#endif
1030 {
1031 pthread_mutex_unlock(&spider_hs_w_conn_mutex);
1032 DBUG_PRINT("info",("spider create new hs w conn"));
1033 if (!(conn = spider_create_conn(share, spider, link_idx,
1034 base_link_idx, conn_kind, error_num)))
1035 goto error;
1036 *conn->conn_key = *conn_key;
1037 if (spider)
1038 {
1039 spider->hs_w_conns[base_link_idx] = conn;
1040 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
1041 conn->use_for_active_standby = TRUE;
1042 }
1043 } else {
1044#ifdef HASH_UPDATE_WITH_HASH_VALUE
1045 my_hash_delete_with_hash_value(&spider_hs_w_conn_hash,
1046 conn->conn_key_hash_value, (uchar*) conn);
1047#else
1048 my_hash_delete(&spider_hs_w_conn_hash, (uchar*) conn);
1049#endif
1050 pthread_mutex_unlock(&spider_hs_w_conn_mutex);
1051 DBUG_PRINT("info",("spider get global hs w conn"));
1052 if (spider)
1053 {
1054 spider->hs_w_conns[base_link_idx] = conn;
1055 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
1056 conn->use_for_active_standby = TRUE;
1057 }
1058 }
1059 }
1060#endif
1061 } else {
1062 DBUG_PRINT("info",("spider create new conn"));
1063 /* conn_recycle_strict = 0 and conn_recycle_mode = 0 or 2 */
1064 if (!(conn = spider_create_conn(share, spider, link_idx, base_link_idx,
1065 conn_kind, error_num)))
1066 goto error;
1067 *conn->conn_key = *conn_key;
1068 if (spider)
1069 {
1070#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
1071 if (conn_kind == SPIDER_CONN_KIND_MYSQL)
1072 {
1073#endif
1074 spider->conns[base_link_idx] = conn;
1075#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
1076 } else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
1077 {
1078 spider->hs_r_conns[base_link_idx] = conn;
1079 } else {
1080 spider->hs_w_conns[base_link_idx] = conn;
1081 }
1082#endif
1083 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
1084 conn->use_for_active_standby = TRUE;
1085 }
1086 }
1087 conn->thd = trx->thd;
1088 conn->priority = share->priority;
1089
1090#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
1091 if (conn_kind == SPIDER_CONN_KIND_MYSQL)
1092 {
1093#endif
1094 if (another)
1095 {
1096 uint old_elements = trx->trx_another_conn_hash.array.max_element;
1097#ifdef HASH_UPDATE_WITH_HASH_VALUE
1098 if (my_hash_insert_with_hash_value(&trx->trx_another_conn_hash,
1099 share->conn_keys_hash_value[link_idx],
1100 (uchar*) conn))
1101#else
1102 if (my_hash_insert(&trx->trx_another_conn_hash, (uchar*) conn))
1103#endif
1104 {
1105 spider_free_conn(conn);
1106 *error_num = HA_ERR_OUT_OF_MEM;
1107 goto error;
1108 }
1109 if (trx->trx_another_conn_hash.array.max_element > old_elements)
1110 {
1111 spider_alloc_calc_mem(spider_current_trx,
1112 trx->trx_another_conn_hash,
1113 (trx->trx_another_conn_hash.array.max_element - old_elements) *
1114 trx->trx_another_conn_hash.array.size_of_element);
1115 }
1116 } else {
1117 uint old_elements = trx->trx_conn_hash.array.max_element;
1118#ifdef HASH_UPDATE_WITH_HASH_VALUE
1119 if (my_hash_insert_with_hash_value(&trx->trx_conn_hash,
1120 share->conn_keys_hash_value[link_idx],
1121 (uchar*) conn))
1122#else
1123 if (my_hash_insert(&trx->trx_conn_hash, (uchar*) conn))
1124#endif
1125 {
1126 spider_free_conn(conn);
1127 *error_num = HA_ERR_OUT_OF_MEM;
1128 goto error;
1129 }
1130 if (trx->trx_conn_hash.array.max_element > old_elements)
1131 {
1132 spider_alloc_calc_mem(spider_current_trx,
1133 trx->trx_conn_hash,
1134 (trx->trx_conn_hash.array.max_element - old_elements) *
1135 trx->trx_conn_hash.array.size_of_element);
1136 }
1137 }
1138#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
1139 } else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
1140 {
1141 uint old_elements = trx->trx_hs_r_conn_hash.array.max_element;
1142#ifdef HASH_UPDATE_WITH_HASH_VALUE
1143 if (my_hash_insert_with_hash_value(&trx->trx_hs_r_conn_hash,
1144 share->hs_read_conn_keys_hash_value[link_idx],
1145 (uchar*) conn))
1146#else
1147 if (my_hash_insert(&trx->trx_hs_r_conn_hash, (uchar*) conn))
1148#endif
1149 {
1150 spider_free_conn(conn);
1151 *error_num = HA_ERR_OUT_OF_MEM;
1152 goto error;
1153 }
1154 if (trx->trx_hs_r_conn_hash.array.max_element > old_elements)
1155 {
1156 spider_alloc_calc_mem(spider_current_trx,
1157 trx->trx_hs_r_conn_hash,
1158 (trx->trx_hs_r_conn_hash.array.max_element - old_elements) *
1159 trx->trx_hs_r_conn_hash.array.size_of_element);
1160 }
1161 } else {
1162 uint old_elements = trx->trx_hs_w_conn_hash.array.max_element;
1163#ifdef HASH_UPDATE_WITH_HASH_VALUE
1164 if (my_hash_insert_with_hash_value(&trx->trx_hs_w_conn_hash,
1165 share->hs_write_conn_keys_hash_value[link_idx],
1166 (uchar*) conn))
1167#else
1168 if (my_hash_insert(&trx->trx_hs_w_conn_hash, (uchar*) conn))
1169#endif
1170 {
1171 spider_free_conn(conn);
1172 *error_num = HA_ERR_OUT_OF_MEM;
1173 goto error;
1174 }
1175 if (trx->trx_hs_w_conn_hash.array.max_element > old_elements)
1176 {
1177 spider_alloc_calc_mem(spider_current_trx,
1178 trx->trx_hs_w_conn_hash,
1179 (trx->trx_hs_w_conn_hash.array.max_element - old_elements) *
1180 trx->trx_hs_w_conn_hash.array.size_of_element);
1181 }
1182 }
1183#endif
1184 } else if (spider)
1185 {
1186#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
1187 if (conn_kind == SPIDER_CONN_KIND_MYSQL)
1188 {
1189#endif
1190 spider->conns[base_link_idx] = conn;
1191#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
1192 } else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
1193 {
1194 spider->hs_r_conns[base_link_idx] = conn;
1195 } else {
1196 spider->hs_w_conns[base_link_idx] = conn;
1197 }
1198#endif
1199 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
1200 conn->use_for_active_standby = TRUE;
1201 }
1202 conn->link_idx = base_link_idx;
1203
1204 if (conn->queued_connect)
1205 spider_conn_queue_connect_rewrite(share, conn, link_idx);
1206
1207 if (conn->queued_ping)
1208 {
1209 if (spider)
1210 spider_conn_queue_ping_rewrite(spider, conn, base_link_idx);
1211 else
1212 conn->queued_ping = FALSE;
1213 }
1214
1215 DBUG_PRINT("info",("spider conn=%p", conn));
1216 DBUG_RETURN(conn);
1217
1218error:
1219 DBUG_RETURN(NULL);
1220}
1221
1222int spider_free_conn(
1223 SPIDER_CONN *conn
1224) {
1225 DBUG_ENTER("spider_free_conn");
1226 DBUG_PRINT("info", ("spider conn=%p", conn));
1227 SPIDER_IP_PORT_CONN* ip_port_conn = conn->ip_port_conn;
1228 if (ip_port_conn)
1229 { /* free conn, ip_port_count-- */
1230 pthread_mutex_lock(&ip_port_conn->mutex);
1231 if (ip_port_conn->ip_port_count > 0)
1232 ip_port_conn->ip_port_count--;
1233 pthread_mutex_unlock(&ip_port_conn->mutex);
1234 }
1235 spider_free_conn_alloc(conn);
1236 spider_free(spider_current_trx, conn, MYF(0));
1237 DBUG_RETURN(0);
1238}
1239
1240int spider_check_and_get_casual_read_conn(
1241 THD *thd,
1242 ha_spider *spider,
1243 int link_idx
1244) {
1245 int error_num;
1246 DBUG_ENTER("spider_check_and_get_casual_read_conn");
1247 if (spider->result_list.casual_read[link_idx])
1248 {
1249 SPIDER_CONN *conn = spider->conns[link_idx];
1250 if (conn->casual_read_query_id != thd->query_id)
1251 {
1252 conn->casual_read_query_id = thd->query_id;
1253 conn->casual_read_current_id = 2;
1254 }
1255 if (spider->result_list.casual_read[link_idx] == 1)
1256 {
1257 spider->result_list.casual_read[link_idx] = conn->casual_read_current_id;
1258 ++conn->casual_read_current_id;
1259 if (conn->casual_read_current_id > 63)
1260 {
1261 conn->casual_read_current_id = 2;
1262 }
1263 }
1264 char first_byte_bak = *spider->conn_keys[link_idx];
1265 *spider->conn_keys[link_idx] =
1266 '0' + spider->result_list.casual_read[link_idx];
1267 if (
1268 !(spider->conns[link_idx] =
1269 spider_get_conn(spider->share, link_idx,
1270 spider->conn_keys[link_idx], spider->trx,
1271 spider, FALSE, TRUE, SPIDER_CONN_KIND_MYSQL,
1272 &error_num))
1273 ) {
1274 *spider->conn_keys[link_idx] = first_byte_bak;
1275 DBUG_RETURN(error_num);
1276 }
1277 *spider->conn_keys[link_idx] = first_byte_bak;
1278 spider->conns[link_idx]->casual_read_base_conn = conn;
1279 conn = spider->conns[link_idx];
1280 spider_check_and_set_autocommit(thd, conn, NULL);
1281 }
1282 DBUG_RETURN(0);
1283}
1284
1285int spider_check_and_init_casual_read(
1286 THD *thd,
1287 ha_spider *spider,
1288 int link_idx
1289) {
1290 int error_num;
1291 SPIDER_RESULT_LIST *result_list = &spider->result_list;
1292 SPIDER_SHARE *share = spider->share;
1293 DBUG_ENTER("spider_check_and_init_casual_read");
1294 if (
1295 spider_param_sync_autocommit(thd) &&
1296 (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
1297 (
1298 result_list->direct_order_limit
1299#ifdef HANDLER_HAS_DIRECT_AGGREGATE
1300 || result_list->direct_aggregate
1301#endif
1302 )
1303 ) {
1304 if (!result_list->casual_read[link_idx])
1305 {
1306 result_list->casual_read[link_idx] =
1307 spider_param_casual_read(thd, share->casual_read);
1308 }
1309 if ((error_num = spider_check_and_get_casual_read_conn(thd, spider,
1310 link_idx)))
1311 {
1312 DBUG_RETURN(error_num);
1313 }
1314 SPIDER_CONN *conn = spider->conns[link_idx];
1315 if (
1316 conn->casual_read_base_conn &&
1317 (error_num = spider_create_conn_thread(conn))
1318 ) {
1319 DBUG_RETURN(error_num);
1320 }
1321 }
1322 DBUG_RETURN(0);
1323}
1324
1325void spider_conn_queue_connect(
1326 SPIDER_SHARE *share,
1327 SPIDER_CONN *conn,
1328 int link_idx
1329) {
1330 DBUG_ENTER("spider_conn_queue_connect");
1331 DBUG_PRINT("info", ("spider conn=%p", conn));
1332 conn->queued_connect = TRUE;
1333/*
1334 conn->queued_connect_share = share;
1335 conn->queued_connect_link_idx = link_idx;
1336*/
1337 DBUG_VOID_RETURN;
1338}
1339
1340void spider_conn_queue_connect_rewrite(
1341 SPIDER_SHARE *share,
1342 SPIDER_CONN *conn,
1343 int link_idx
1344) {
1345 DBUG_ENTER("spider_conn_queue_connect_rewrite");
1346 DBUG_PRINT("info", ("spider conn=%p", conn));
1347 conn->queued_connect_share = share;
1348 conn->queued_connect_link_idx = link_idx;
1349 DBUG_VOID_RETURN;
1350}
1351
1352void spider_conn_queue_ping(
1353 ha_spider *spider,
1354 SPIDER_CONN *conn,
1355 int link_idx
1356) {
1357 DBUG_ENTER("spider_conn_queue_ping");
1358 DBUG_PRINT("info", ("spider conn=%p", conn));
1359 conn->queued_ping = TRUE;
1360 conn->queued_ping_spider = spider;
1361 conn->queued_ping_link_idx = link_idx;
1362 DBUG_VOID_RETURN;
1363}
1364
1365void spider_conn_queue_ping_rewrite(
1366 ha_spider *spider,
1367 SPIDER_CONN *conn,
1368 int link_idx
1369) {
1370 DBUG_ENTER("spider_conn_queue_ping_rewrite");
1371 DBUG_PRINT("info", ("spider conn=%p", conn));
1372 conn->queued_ping_spider = spider;
1373 conn->queued_ping_link_idx = link_idx;
1374 DBUG_VOID_RETURN;
1375}
1376
1377void spider_conn_queue_trx_isolation(
1378 SPIDER_CONN *conn,
1379 int trx_isolation
1380) {
1381 DBUG_ENTER("spider_conn_queue_trx_isolation");
1382 DBUG_PRINT("info", ("spider conn=%p", conn));
1383 conn->queued_trx_isolation = TRUE;
1384 conn->queued_trx_isolation_val = trx_isolation;
1385 DBUG_VOID_RETURN;
1386}
1387
1388void spider_conn_queue_semi_trx_isolation(
1389 SPIDER_CONN *conn,
1390 int trx_isolation
1391) {
1392 DBUG_ENTER("spider_conn_queue_semi_trx_isolation");
1393 DBUG_PRINT("info", ("spider conn=%p", conn));
1394 conn->queued_semi_trx_isolation = TRUE;
1395 conn->queued_semi_trx_isolation_val = trx_isolation;
1396 DBUG_VOID_RETURN;
1397}
1398
1399void spider_conn_queue_autocommit(
1400 SPIDER_CONN *conn,
1401 bool autocommit
1402) {
1403 DBUG_ENTER("spider_conn_queue_autocommit");
1404 DBUG_PRINT("info", ("spider conn=%p", conn));
1405 conn->queued_autocommit = TRUE;
1406 conn->queued_autocommit_val = autocommit;
1407 DBUG_VOID_RETURN;
1408}
1409
1410void spider_conn_queue_sql_log_off(
1411 SPIDER_CONN *conn,
1412 bool sql_log_off
1413) {
1414 DBUG_ENTER("spider_conn_queue_sql_log_off");
1415 DBUG_PRINT("info", ("spider conn=%p", conn));
1416 conn->queued_sql_log_off = TRUE;
1417 conn->queued_sql_log_off_val = sql_log_off;
1418 DBUG_VOID_RETURN;
1419}
1420
1421void spider_conn_queue_time_zone(
1422 SPIDER_CONN *conn,
1423 Time_zone *time_zone
1424) {
1425 DBUG_ENTER("spider_conn_queue_time_zone");
1426 DBUG_PRINT("info", ("spider conn=%p", conn));
1427 conn->queued_time_zone = TRUE;
1428 conn->queued_time_zone_val = time_zone;
1429 DBUG_VOID_RETURN;
1430}
1431
1432void spider_conn_queue_start_transaction(
1433 SPIDER_CONN *conn
1434) {
1435 DBUG_ENTER("spider_conn_queue_start_transaction");
1436 DBUG_PRINT("info", ("spider conn=%p", conn));
1437 DBUG_ASSERT(!conn->trx_start);
1438 conn->queued_trx_start = TRUE;
1439 conn->trx_start = TRUE;
1440 DBUG_VOID_RETURN;
1441}
1442
1443void spider_conn_queue_xa_start(
1444 SPIDER_CONN *conn,
1445 XID *xid
1446) {
1447 DBUG_ENTER("spider_conn_queue_xa_start");
1448 DBUG_PRINT("info", ("spider conn=%p", conn));
1449 conn->queued_xa_start = TRUE;
1450 conn->queued_xa_start_xid = xid;
1451 DBUG_VOID_RETURN;
1452}
1453
1454void spider_conn_clear_queue(
1455 SPIDER_CONN *conn
1456) {
1457 DBUG_ENTER("spider_conn_clear_queue");
1458 DBUG_PRINT("info", ("spider conn=%p", conn));
1459/*
1460 conn->queued_connect = FALSE;
1461 conn->queued_ping = FALSE;
1462*/
1463 conn->queued_trx_isolation = FALSE;
1464 conn->queued_semi_trx_isolation = FALSE;
1465 conn->queued_autocommit = FALSE;
1466 conn->queued_sql_log_off = FALSE;
1467 conn->queued_time_zone = FALSE;
1468 conn->queued_trx_start = FALSE;
1469 conn->queued_xa_start = FALSE;
1470 DBUG_VOID_RETURN;
1471}
1472
1473void spider_conn_clear_queue_at_commit(
1474 SPIDER_CONN *conn
1475) {
1476 DBUG_ENTER("spider_conn_clear_queue_at_commit");
1477 DBUG_PRINT("info", ("spider conn=%p", conn));
1478 if (conn->queued_trx_start)
1479 {
1480 conn->queued_trx_start = FALSE;
1481 conn->trx_start = FALSE;
1482 }
1483 conn->queued_xa_start = FALSE;
1484 DBUG_VOID_RETURN;
1485}
1486
1487void spider_conn_set_timeout(
1488 SPIDER_CONN *conn,
1489 uint net_read_timeout,
1490 uint net_write_timeout
1491) {
1492 DBUG_ENTER("spider_conn_set_timeout");
1493 DBUG_PRINT("info", ("spider conn=%p", conn));
1494 if (net_read_timeout != conn->net_read_timeout)
1495 {
1496 DBUG_PRINT("info",("spider net_read_timeout set from %u to %u",
1497 conn->net_read_timeout, net_read_timeout));
1498 conn->queued_net_timeout = TRUE;
1499 conn->net_read_timeout = net_read_timeout;
1500 }
1501 if (net_write_timeout != conn->net_write_timeout)
1502 {
1503 DBUG_PRINT("info",("spider net_write_timeout set from %u to %u",
1504 conn->net_write_timeout, net_write_timeout));
1505 conn->queued_net_timeout = TRUE;
1506 conn->net_write_timeout = net_write_timeout;
1507 }
1508 DBUG_VOID_RETURN;
1509}
1510
1511void spider_conn_set_timeout_from_share(
1512 SPIDER_CONN *conn,
1513 int link_idx,
1514 THD *thd,
1515 SPIDER_SHARE *share
1516) {
1517 DBUG_ENTER("spider_conn_set_timeout_from_share");
1518 spider_conn_set_timeout(
1519 conn,
1520 spider_param_net_read_timeout(thd, share->net_read_timeouts[link_idx]),
1521 spider_param_net_write_timeout(thd, share->net_write_timeouts[link_idx])
1522 );
1523 DBUG_VOID_RETURN;
1524}
1525
1526void spider_conn_set_timeout_from_direct_sql(
1527 SPIDER_CONN *conn,
1528 THD *thd,
1529 SPIDER_DIRECT_SQL *direct_sql
1530) {
1531 DBUG_ENTER("spider_conn_set_timeout_from_direct_sql");
1532 spider_conn_set_timeout(
1533 conn,
1534 spider_param_net_read_timeout(thd, direct_sql->net_read_timeout),
1535 spider_param_net_write_timeout(thd, direct_sql->net_write_timeout)
1536 );
1537 DBUG_VOID_RETURN;
1538}
1539
1540void spider_tree_insert(
1541 SPIDER_CONN *top,
1542 SPIDER_CONN *conn
1543) {
1544 SPIDER_CONN *current = top;
1545 longlong priority = conn->priority;
1546 DBUG_ENTER("spider_tree_insert");
1547 while (TRUE)
1548 {
1549 if (priority < current->priority)
1550 {
1551 if (current->c_small == NULL)
1552 {
1553 conn->p_small = NULL;
1554 conn->p_big = current;
1555 conn->c_small = NULL;
1556 conn->c_big = NULL;
1557 current->c_small = conn;
1558 break;
1559 } else
1560 current = current->c_small;
1561 } else {
1562 if (current->c_big == NULL)
1563 {
1564 conn->p_small = current;
1565 conn->p_big = NULL;
1566 conn->c_small = NULL;
1567 conn->c_big = NULL;
1568 current->c_big = conn;
1569 break;
1570 } else
1571 current = current->c_big;
1572 }
1573 }
1574 DBUG_VOID_RETURN;
1575}
1576
1577SPIDER_CONN *spider_tree_first(
1578 SPIDER_CONN *top
1579) {
1580 SPIDER_CONN *current = top;
1581 DBUG_ENTER("spider_tree_first");
1582 while (current)
1583 {
1584 if (current->c_small == NULL)
1585 break;
1586 else
1587 current = current->c_small;
1588 }
1589 DBUG_RETURN(current);
1590}
1591
1592SPIDER_CONN *spider_tree_last(
1593 SPIDER_CONN *top
1594) {
1595 SPIDER_CONN *current = top;
1596 DBUG_ENTER("spider_tree_last");
1597 while (TRUE)
1598 {
1599 if (current->c_big == NULL)
1600 break;
1601 else
1602 current = current->c_big;
1603 }
1604 DBUG_RETURN(current);
1605}
1606
1607SPIDER_CONN *spider_tree_next(
1608 SPIDER_CONN *current
1609) {
1610 DBUG_ENTER("spider_tree_next");
1611 if (current->c_big)
1612 DBUG_RETURN(spider_tree_first(current->c_big));
1613 while (TRUE)
1614 {
1615 if (current->p_big)
1616 DBUG_RETURN(current->p_big);
1617 if (!current->p_small)
1618 DBUG_RETURN(NULL);
1619 current = current->p_small;
1620 }
1621}
1622
1623SPIDER_CONN *spider_tree_delete(
1624 SPIDER_CONN *conn,
1625 SPIDER_CONN *top
1626) {
1627 DBUG_ENTER("spider_tree_delete");
1628 if (conn->p_small)
1629 {
1630 if (conn->c_small)
1631 {
1632 conn->c_small->p_big = NULL;
1633 conn->c_small->p_small = conn->p_small;
1634 conn->p_small->c_big = conn->c_small;
1635 if (conn->c_big)
1636 {
1637 SPIDER_CONN *last = spider_tree_last(conn->c_small);
1638 conn->c_big->p_small = last;
1639 last->c_big = conn->c_big;
1640 }
1641 } else if (conn->c_big)
1642 {
1643 conn->c_big->p_small = conn->p_small;
1644 conn->p_small->c_big = conn->c_big;
1645 } else
1646 conn->p_small->c_big = NULL;
1647 } else if (conn->p_big)
1648 {
1649 if (conn->c_small)
1650 {
1651 conn->c_small->p_big = conn->p_big;
1652 conn->p_big->c_small = conn->c_small;
1653 if (conn->c_big)
1654 {
1655 SPIDER_CONN *last = spider_tree_last(conn->c_small);
1656 conn->c_big->p_small = last;
1657 last->c_big = conn->c_big;
1658 }
1659 } else if (conn->c_big)
1660 {
1661 conn->c_big->p_big = conn->p_big;
1662 conn->c_big->p_small = NULL;
1663 conn->p_big->c_small = conn->c_big;
1664 } else
1665 conn->p_big->c_small = NULL;
1666 } else {
1667 if (conn->c_small)
1668 {
1669 conn->c_small->p_big = NULL;
1670 conn->c_small->p_small = NULL;
1671 if (conn->c_big)
1672 {
1673 SPIDER_CONN *last = spider_tree_last(conn->c_small);
1674 conn->c_big->p_small = last;
1675 last->c_big = conn->c_big;
1676 }
1677 DBUG_RETURN(conn->c_small);
1678 } else if (conn->c_big)
1679 {
1680 conn->c_big->p_small = NULL;
1681 DBUG_RETURN(conn->c_big);
1682 }
1683 DBUG_RETURN(NULL);
1684 }
1685 DBUG_RETURN(top);
1686}
1687
1688#ifndef WITHOUT_SPIDER_BG_SEARCH
1689int spider_set_conn_bg_param(
1690 ha_spider *spider
1691) {
1692 int error_num, roop_count, bgs_mode;
1693 SPIDER_SHARE *share = spider->share;
1694 SPIDER_RESULT_LIST *result_list = &spider->result_list;
1695 THD *thd = spider->trx->thd;
1696 DBUG_ENTER("spider_set_conn_bg_param");
1697 DBUG_PRINT("info",("spider spider=%p", spider));
1698 bgs_mode =
1699 spider_param_bgs_mode(thd, share->bgs_mode);
1700 if (bgs_mode == 0)
1701 result_list->bgs_phase = 0;
1702 else if (
1703 bgs_mode <= 2 &&
1704 (result_list->lock_type == F_WRLCK || spider->lock_mode == 2)
1705 )
1706 result_list->bgs_phase = 0;
1707 else if (bgs_mode <= 1 && spider->lock_mode == 1)
1708 result_list->bgs_phase = 0;
1709 else {
1710 result_list->bgs_phase = 1;
1711
1712 result_list->bgs_split_read = spider_bg_split_read_param(spider);
1713 if (spider->use_pre_call)
1714 {
1715 DBUG_PRINT("info",("spider use_pre_call=TRUE"));
1716 result_list->bgs_first_read = result_list->bgs_split_read;
1717 result_list->bgs_second_read = result_list->bgs_split_read;
1718 } else {
1719 DBUG_PRINT("info",("spider use_pre_call=FALSE"));
1720 result_list->bgs_first_read =
1721 spider_param_bgs_first_read(thd, share->bgs_first_read);
1722 result_list->bgs_second_read =
1723 spider_param_bgs_second_read(thd, share->bgs_second_read);
1724 }
1725 DBUG_PRINT("info",("spider bgs_split_read=%lld",
1726 result_list->bgs_split_read));
1727 DBUG_PRINT("info",("spider bgs_first_read=%lld", share->bgs_first_read));
1728 DBUG_PRINT("info",("spider bgs_second_read=%lld", share->bgs_second_read));
1729
1730 result_list->split_read =
1731 result_list->bgs_first_read > 0 ?
1732 result_list->bgs_first_read :
1733 result_list->bgs_split_read;
1734 }
1735
1736 if (result_list->bgs_phase > 0)
1737 {
1738#ifdef SPIDER_HAS_GROUP_BY_HANDLER
1739 if (spider->use_fields)
1740 {
1741 SPIDER_LINK_IDX_CHAIN *link_idx_chain;
1742 spider_fields *fields = spider->fields;
1743 fields->set_pos_to_first_link_idx_chain();
1744 while ((link_idx_chain = fields->get_next_link_idx_chain()))
1745 {
1746 if ((error_num = spider_create_conn_thread(link_idx_chain->conn)))
1747 DBUG_RETURN(error_num);
1748 }
1749 } else {
1750#endif
1751 for (
1752 roop_count = spider_conn_link_idx_next(share->link_statuses,
1753 spider->conn_link_idx, -1, share->link_count,
1754 spider->lock_mode ?
1755 SPIDER_LINK_STATUS_RECOVERY : SPIDER_LINK_STATUS_OK);
1756 roop_count < (int) share->link_count;
1757 roop_count = spider_conn_link_idx_next(share->link_statuses,
1758 spider->conn_link_idx, roop_count, share->link_count,
1759 spider->lock_mode ?
1760 SPIDER_LINK_STATUS_RECOVERY : SPIDER_LINK_STATUS_OK)
1761 ) {
1762 if ((error_num = spider_create_conn_thread(spider->conns[roop_count])))
1763 DBUG_RETURN(error_num);
1764#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
1765 if ((error_num = spider_create_conn_thread(
1766 spider->hs_r_conns[roop_count])))
1767 DBUG_RETURN(error_num);
1768 if ((error_num = spider_create_conn_thread(
1769 spider->hs_w_conns[roop_count])))
1770 DBUG_RETURN(error_num);
1771#endif
1772 }
1773#ifdef SPIDER_HAS_GROUP_BY_HANDLER
1774 }
1775#endif
1776 }
1777 DBUG_RETURN(0);
1778}
1779
1780int spider_create_conn_thread(
1781 SPIDER_CONN *conn
1782) {
1783 int error_num;
1784 DBUG_ENTER("spider_create_conn_thread");
1785 if (conn && !conn->bg_init)
1786 {
1787#if MYSQL_VERSION_ID < 50500
1788 if (pthread_mutex_init(&conn->bg_conn_chain_mutex, MY_MUTEX_INIT_FAST))
1789#else
1790 if (mysql_mutex_init(spd_key_mutex_bg_conn_chain,
1791 &conn->bg_conn_chain_mutex, MY_MUTEX_INIT_FAST))
1792#endif
1793 {
1794 error_num = HA_ERR_OUT_OF_MEM;
1795 goto error_chain_mutex_init;
1796 }
1797 conn->bg_conn_chain_mutex_ptr = NULL;
1798#if MYSQL_VERSION_ID < 50500
1799 if (pthread_mutex_init(&conn->bg_conn_sync_mutex, MY_MUTEX_INIT_FAST))
1800#else
1801 if (mysql_mutex_init(spd_key_mutex_bg_conn_sync,
1802 &conn->bg_conn_sync_mutex, MY_MUTEX_INIT_FAST))
1803#endif
1804 {
1805 error_num = HA_ERR_OUT_OF_MEM;
1806 goto error_sync_mutex_init;
1807 }
1808#if MYSQL_VERSION_ID < 50500
1809 if (pthread_mutex_init(&conn->bg_conn_mutex, MY_MUTEX_INIT_FAST))
1810#else
1811 if (mysql_mutex_init(spd_key_mutex_bg_conn, &conn->bg_conn_mutex,
1812 MY_MUTEX_INIT_FAST))
1813#endif
1814 {
1815 error_num = HA_ERR_OUT_OF_MEM;
1816 goto error_mutex_init;
1817 }
1818#if MYSQL_VERSION_ID < 50500
1819 if (pthread_mutex_init(&conn->bg_job_stack_mutex, MY_MUTEX_INIT_FAST))
1820#else
1821 if (mysql_mutex_init(spd_key_mutex_bg_job_stack, &conn->bg_job_stack_mutex,
1822 MY_MUTEX_INIT_FAST))
1823#endif
1824 {
1825 error_num = HA_ERR_OUT_OF_MEM;
1826 goto error_job_stack_mutex_init;
1827 }
1828 if (SPD_INIT_DYNAMIC_ARRAY2(&conn->bg_job_stack, sizeof(void *), NULL, 16,
1829 16, MYF(MY_WME)))
1830 {
1831 error_num = HA_ERR_OUT_OF_MEM;
1832 goto error_job_stack_init;
1833 }
1834 spider_alloc_calc_mem_init(conn->bg_job_stack, 163);
1835 spider_alloc_calc_mem(spider_current_trx,
1836 conn->bg_job_stack,
1837 conn->bg_job_stack.max_element *
1838 conn->bg_job_stack.size_of_element);
1839 conn->bg_job_stack_cur_pos = 0;
1840#if MYSQL_VERSION_ID < 50500
1841 if (pthread_cond_init(&conn->bg_conn_sync_cond, NULL))
1842#else
1843 if (mysql_cond_init(spd_key_cond_bg_conn_sync,
1844 &conn->bg_conn_sync_cond, NULL))
1845#endif
1846 {
1847 error_num = HA_ERR_OUT_OF_MEM;
1848 goto error_sync_cond_init;
1849 }
1850#if MYSQL_VERSION_ID < 50500
1851 if (pthread_cond_init(&conn->bg_conn_cond, NULL))
1852#else
1853 if (mysql_cond_init(spd_key_cond_bg_conn,
1854 &conn->bg_conn_cond, NULL))
1855#endif
1856 {
1857 error_num = HA_ERR_OUT_OF_MEM;
1858 goto error_cond_init;
1859 }
1860 pthread_mutex_lock(&conn->bg_conn_mutex);
1861#if MYSQL_VERSION_ID < 50500
1862 if (pthread_create(&conn->bg_thread, &spider_pt_attr,
1863 spider_bg_conn_action, (void *) conn)
1864 )
1865#else
1866 if (mysql_thread_create(spd_key_thd_bg, &conn->bg_thread,
1867 &spider_pt_attr, spider_bg_conn_action, (void *) conn)
1868 )
1869#endif
1870 {
1871 pthread_mutex_unlock(&conn->bg_conn_mutex);
1872 error_num = HA_ERR_OUT_OF_MEM;
1873 goto error_thread_create;
1874 }
1875 pthread_mutex_lock(&conn->bg_conn_sync_mutex);
1876 pthread_mutex_unlock(&conn->bg_conn_mutex);
1877 pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
1878 pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
1879 if (!conn->bg_init)
1880 {
1881 error_num = HA_ERR_OUT_OF_MEM;
1882 goto error_thread_create;
1883 }
1884 }
1885 DBUG_RETURN(0);
1886
1887error_thread_create:
1888 pthread_cond_destroy(&conn->bg_conn_cond);
1889error_cond_init:
1890 pthread_cond_destroy(&conn->bg_conn_sync_cond);
1891error_sync_cond_init:
1892 spider_free_mem_calc(spider_current_trx,
1893 conn->bg_job_stack_id,
1894 conn->bg_job_stack.max_element *
1895 conn->bg_job_stack.size_of_element);
1896 delete_dynamic(&conn->bg_job_stack);
1897error_job_stack_init:
1898 pthread_mutex_destroy(&conn->bg_job_stack_mutex);
1899error_job_stack_mutex_init:
1900 pthread_mutex_destroy(&conn->bg_conn_mutex);
1901error_mutex_init:
1902 pthread_mutex_destroy(&conn->bg_conn_sync_mutex);
1903error_sync_mutex_init:
1904 pthread_mutex_destroy(&conn->bg_conn_chain_mutex);
1905error_chain_mutex_init:
1906 DBUG_RETURN(error_num);
1907}
1908
1909void spider_free_conn_thread(
1910 SPIDER_CONN *conn
1911) {
1912 DBUG_ENTER("spider_free_conn_thread");
1913 if (conn->bg_init)
1914 {
1915 spider_bg_conn_break(conn, NULL);
1916 pthread_mutex_lock(&conn->bg_conn_mutex);
1917 conn->bg_kill = TRUE;
1918 pthread_mutex_lock(&conn->bg_conn_sync_mutex);
1919 pthread_cond_signal(&conn->bg_conn_cond);
1920 pthread_mutex_unlock(&conn->bg_conn_mutex);
1921 pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
1922 pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
1923 pthread_join(conn->bg_thread, NULL);
1924 pthread_cond_destroy(&conn->bg_conn_cond);
1925 pthread_cond_destroy(&conn->bg_conn_sync_cond);
1926 spider_free_mem_calc(spider_current_trx,
1927 conn->bg_job_stack_id,
1928 conn->bg_job_stack.max_element *
1929 conn->bg_job_stack.size_of_element);
1930 delete_dynamic(&conn->bg_job_stack);
1931 pthread_mutex_destroy(&conn->bg_job_stack_mutex);
1932 pthread_mutex_destroy(&conn->bg_conn_mutex);
1933 pthread_mutex_destroy(&conn->bg_conn_sync_mutex);
1934 pthread_mutex_destroy(&conn->bg_conn_chain_mutex);
1935 conn->bg_kill = FALSE;
1936 conn->bg_init = FALSE;
1937 }
1938 DBUG_VOID_RETURN;
1939}
1940
1941void spider_bg_conn_wait(
1942 SPIDER_CONN *conn
1943) {
1944 DBUG_ENTER("spider_bg_conn_wait");
1945 if (conn->bg_init)
1946 {
1947 pthread_mutex_lock(&conn->bg_conn_mutex);
1948 pthread_mutex_unlock(&conn->bg_conn_mutex);
1949 }
1950 DBUG_VOID_RETURN;
1951}
1952
1953void spider_bg_all_conn_wait(
1954 ha_spider *spider
1955) {
1956 int roop_count;
1957 SPIDER_CONN *conn;
1958 SPIDER_SHARE *share = spider->share;
1959 SPIDER_RESULT_LIST *result_list = &spider->result_list;
1960 DBUG_ENTER("spider_bg_all_conn_wait");
1961 for (
1962 roop_count = spider_conn_link_idx_next(share->link_statuses,
1963 spider->conn_link_idx, -1, share->link_count,
1964 SPIDER_LINK_STATUS_RECOVERY);
1965 roop_count < (int) share->link_count;
1966 roop_count = spider_conn_link_idx_next(share->link_statuses,
1967 spider->conn_link_idx, roop_count, share->link_count,
1968 SPIDER_LINK_STATUS_RECOVERY)
1969 ) {
1970 conn = spider->conns[roop_count];
1971#ifndef WITHOUT_SPIDER_BG_SEARCH
1972 if (conn && result_list->bgs_working)
1973 spider_bg_conn_wait(conn);
1974#endif
1975 }
1976 DBUG_VOID_RETURN;
1977}
1978
1979int spider_bg_all_conn_pre_next(
1980 ha_spider *spider,
1981 int link_idx
1982) {
1983#ifndef WITHOUT_SPIDER_BG_SEARCH
1984 int roop_start, roop_end, roop_count, lock_mode, link_ok, error_num;
1985 SPIDER_RESULT_LIST *result_list = &spider->result_list;
1986 SPIDER_SHARE *share = spider->share;
1987#endif
1988 DBUG_ENTER("spider_bg_all_conn_pre_next");
1989#ifndef WITHOUT_SPIDER_BG_SEARCH
1990 if (result_list->bgs_phase > 0)
1991 {
1992 lock_mode = spider_conn_lock_mode(spider);
1993 if (lock_mode)
1994 {
1995 /* "for update" or "lock in share mode" */
1996 link_ok = spider_conn_link_idx_next(share->link_statuses,
1997 spider->conn_link_idx, -1, share->link_count,
1998 SPIDER_LINK_STATUS_OK);
1999 roop_start = spider_conn_link_idx_next(share->link_statuses,
2000 spider->conn_link_idx, -1, share->link_count,
2001 SPIDER_LINK_STATUS_RECOVERY);
2002 roop_end = spider->share->link_count;
2003 } else {
2004 link_ok = link_idx;
2005 roop_start = link_idx;
2006 roop_end = link_idx + 1;
2007 }
2008
2009 for (roop_count = roop_start; roop_count < roop_end;
2010 roop_count = spider_conn_link_idx_next(share->link_statuses,
2011 spider->conn_link_idx, roop_count, share->link_count,
2012 SPIDER_LINK_STATUS_RECOVERY)
2013 ) {
2014 if ((error_num = spider_bg_conn_search(spider, roop_count, roop_start,
2015 TRUE, TRUE, (roop_count != link_ok))))
2016 DBUG_RETURN(error_num);
2017 }
2018 }
2019#endif
2020 DBUG_RETURN(0);
2021}
2022
2023void spider_bg_conn_break(
2024 SPIDER_CONN *conn,
2025 ha_spider *spider
2026) {
2027 DBUG_ENTER("spider_bg_conn_break");
2028 if (
2029 conn->bg_init &&
2030 conn->bg_thd != current_thd &&
2031 (
2032 !spider ||
2033 (
2034 spider->result_list.bgs_working &&
2035 conn->bg_target == spider
2036 )
2037 )
2038 ) {
2039 conn->bg_break = TRUE;
2040 pthread_mutex_lock(&conn->bg_conn_mutex);
2041 pthread_mutex_unlock(&conn->bg_conn_mutex);
2042 conn->bg_break = FALSE;
2043 }
2044 DBUG_VOID_RETURN;
2045}
2046
2047void spider_bg_all_conn_break(
2048 ha_spider *spider
2049) {
2050 int roop_count;
2051 SPIDER_CONN *conn;
2052 SPIDER_SHARE *share = spider->share;
2053 SPIDER_RESULT_LIST *result_list = &spider->result_list;
2054 DBUG_ENTER("spider_bg_all_conn_break");
2055 for (
2056 roop_count = spider_conn_link_idx_next(share->link_statuses,
2057 spider->conn_link_idx, -1, share->link_count,
2058 SPIDER_LINK_STATUS_RECOVERY);
2059 roop_count < (int) share->link_count;
2060 roop_count = spider_conn_link_idx_next(share->link_statuses,
2061 spider->conn_link_idx, roop_count, share->link_count,
2062 SPIDER_LINK_STATUS_RECOVERY)
2063 ) {
2064 conn = spider->conns[roop_count];
2065#ifndef WITHOUT_SPIDER_BG_SEARCH
2066 if (conn && result_list->bgs_working)
2067 spider_bg_conn_break(conn, spider);
2068#endif
2069 if (spider->quick_targets[roop_count])
2070 {
2071 DBUG_ASSERT(spider->quick_targets[roop_count] == conn->quick_target);
2072 DBUG_PRINT("info", ("spider conn[%p]->quick_target=NULL", conn));
2073 conn->quick_target = NULL;
2074 spider->quick_targets[roop_count] = NULL;
2075 }
2076 }
2077 DBUG_VOID_RETURN;
2078}
2079
2080bool spider_bg_conn_get_job(
2081 SPIDER_CONN *conn
2082) {
2083 DBUG_ENTER("spider_bg_conn_get_job");
2084 pthread_mutex_lock(&conn->bg_job_stack_mutex);
2085 if (conn->bg_job_stack_cur_pos >= conn->bg_job_stack.elements)
2086 {
2087 DBUG_PRINT("info",("spider bg all jobs are completed"));
2088 conn->bg_get_job_stack_off = FALSE;
2089 pthread_mutex_unlock(&conn->bg_job_stack_mutex);
2090 DBUG_RETURN(FALSE);
2091 }
2092 DBUG_PRINT("info",("spider bg get job %u",
2093 conn->bg_job_stack_cur_pos));
2094 conn->bg_target = ((void **) (conn->bg_job_stack.buffer +
2095 conn->bg_job_stack.size_of_element * conn->bg_job_stack_cur_pos))[0];
2096 conn->bg_job_stack_cur_pos++;
2097 if (conn->bg_job_stack_cur_pos == conn->bg_job_stack.elements)
2098 {
2099 DBUG_PRINT("info",("spider bg shift job stack"));
2100 conn->bg_job_stack_cur_pos = 0;
2101 conn->bg_job_stack.elements = 0;
2102 }
2103 pthread_mutex_unlock(&conn->bg_job_stack_mutex);
2104 DBUG_RETURN(TRUE);
2105}
2106
2107int spider_bg_conn_search(
2108 ha_spider *spider,
2109 int link_idx,
2110 int first_link_idx,
2111 bool first,
2112 bool pre_next,
2113 bool discard_result
2114) {
2115 int error_num;
2116 SPIDER_CONN *conn, *first_conn = NULL;
2117 SPIDER_RESULT_LIST *result_list = &spider->result_list;
2118 bool with_lock = FALSE;
2119 DBUG_ENTER("spider_bg_conn_search");
2120 DBUG_PRINT("info",("spider spider=%p", spider));
2121#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
2122 if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_MYSQL)
2123 {
2124#endif
2125 conn = spider->conns[link_idx];
2126 with_lock = (spider_conn_lock_mode(spider) != SPIDER_LOCK_MODE_NO_LOCK);
2127 first_conn = spider->conns[first_link_idx];
2128#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
2129 } else if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_HS_READ)
2130 conn = spider->hs_r_conns[link_idx];
2131 else
2132 conn = spider->hs_w_conns[link_idx];
2133#endif
2134 if (first)
2135 {
2136 if (spider->use_pre_call)
2137 {
2138 DBUG_PRINT("info",("spider skip bg first search"));
2139 } else {
2140 DBUG_PRINT("info",("spider bg first search"));
2141 pthread_mutex_lock(&conn->bg_conn_mutex);
2142 result_list->bgs_working = TRUE;
2143 conn->bg_search = TRUE;
2144 conn->bg_caller_wait = TRUE;
2145 conn->bg_target = spider;
2146 conn->link_idx = link_idx;
2147 conn->bg_discard_result = discard_result;
2148 pthread_mutex_lock(&conn->bg_conn_sync_mutex);
2149 pthread_cond_signal(&conn->bg_conn_cond);
2150 pthread_mutex_unlock(&conn->bg_conn_mutex);
2151 pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
2152 pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
2153 conn->bg_caller_wait = FALSE;
2154 if (result_list->bgs_error)
2155 {
2156 if (result_list->bgs_error_with_message)
2157 my_message(result_list->bgs_error,
2158 result_list->bgs_error_msg, MYF(0));
2159 DBUG_RETURN(result_list->bgs_error);
2160 }
2161 }
2162 if (result_list->bgs_working || !result_list->finish_flg)
2163 {
2164 pthread_mutex_lock(&conn->bg_conn_mutex);
2165 if (!result_list->finish_flg)
2166 {
2167 DBUG_PRINT("info",("spider bg second search"));
2168 if (!spider->use_pre_call || pre_next)
2169 {
2170 if (result_list->bgs_error)
2171 {
2172 pthread_mutex_unlock(&conn->bg_conn_mutex);
2173 DBUG_PRINT("info",("spider bg error"));
2174 if (result_list->bgs_error == HA_ERR_END_OF_FILE)
2175 {
2176 DBUG_PRINT("info",("spider bg current->finish_flg=%s",
2177 result_list->current ?
2178 (result_list->current->finish_flg ? "TRUE" : "FALSE") : "NULL"));
2179 DBUG_RETURN(0);
2180 }
2181 if (result_list->bgs_error_with_message)
2182 my_message(result_list->bgs_error,
2183 result_list->bgs_error_msg, MYF(0));
2184 DBUG_RETURN(result_list->bgs_error);
2185 }
2186 DBUG_PRINT("info",("spider result_list->quick_mode=%d",
2187 result_list->quick_mode));
2188 DBUG_PRINT("info",("spider result_list->bgs_current->result=%p",
2189 result_list->bgs_current->result));
2190 if (
2191 result_list->quick_mode == 0 ||
2192 !result_list->bgs_current->result
2193 ) {
2194 DBUG_PRINT("info",("spider result_list->bgs_second_read=%lld",
2195 result_list->bgs_second_read));
2196 DBUG_PRINT("info",("spider result_list->bgs_split_read=%lld",
2197 result_list->bgs_split_read));
2198 result_list->split_read =
2199 result_list->bgs_second_read > 0 ?
2200 result_list->bgs_second_read :
2201 result_list->bgs_split_read;
2202 result_list->limit_num =
2203 result_list->internal_limit - result_list->record_num >=
2204 result_list->split_read ?
2205 result_list->split_read :
2206 result_list->internal_limit - result_list->record_num;
2207 DBUG_PRINT("info",("spider sql_kinds=%u", spider->sql_kinds));
2208 if (spider->sql_kinds & SPIDER_SQL_KIND_SQL)
2209 {
2210 if ((error_num = spider->reappend_limit_sql_part(
2211 result_list->internal_offset + result_list->record_num,
2212 result_list->limit_num,
2213 SPIDER_SQL_TYPE_SELECT_SQL)))
2214 {
2215 pthread_mutex_unlock(&conn->bg_conn_mutex);
2216 DBUG_RETURN(error_num);
2217 }
2218 if (
2219 !result_list->use_union &&
2220 (error_num = spider->append_select_lock_sql_part(
2221 SPIDER_SQL_TYPE_SELECT_SQL))
2222 ) {
2223 pthread_mutex_unlock(&conn->bg_conn_mutex);
2224 DBUG_RETURN(error_num);
2225 }
2226 }
2227 if (spider->sql_kinds & SPIDER_SQL_KIND_HANDLER)
2228 {
2229 spider_db_append_handler_next(spider);
2230 if ((error_num = spider->reappend_limit_sql_part(
2231 0, result_list->limit_num,
2232 SPIDER_SQL_TYPE_HANDLER)))
2233 {
2234 pthread_mutex_unlock(&conn->bg_conn_mutex);
2235 DBUG_RETURN(error_num);
2236 }
2237 }
2238 }
2239 result_list->bgs_phase = 2;
2240 }
2241 result_list->bgs_working = TRUE;
2242 conn->bg_search = TRUE;
2243 if (with_lock)
2244 conn->bg_conn_chain_mutex_ptr = &first_conn->bg_conn_chain_mutex;
2245 conn->bg_caller_sync_wait = TRUE;
2246 conn->bg_target = spider;
2247 conn->link_idx = link_idx;
2248 conn->bg_discard_result = discard_result;
2249#ifdef SPIDER_HAS_GROUP_BY_HANDLER
2250 conn->link_idx_chain = spider->link_idx_chain;
2251#endif
2252 pthread_mutex_lock(&conn->bg_conn_sync_mutex);
2253 pthread_cond_signal(&conn->bg_conn_cond);
2254 pthread_mutex_unlock(&conn->bg_conn_mutex);
2255 pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
2256 pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
2257 conn->bg_caller_sync_wait = FALSE;
2258 } else {
2259 pthread_mutex_unlock(&conn->bg_conn_mutex);
2260 DBUG_PRINT("info",("spider bg current->finish_flg=%s",
2261 result_list->current ?
2262 (result_list->current->finish_flg ? "TRUE" : "FALSE") : "NULL"));
2263 if (result_list->bgs_error)
2264 {
2265 DBUG_PRINT("info",("spider bg error"));
2266 if (result_list->bgs_error != HA_ERR_END_OF_FILE)
2267 {
2268 if (result_list->bgs_error_with_message)
2269 my_message(result_list->bgs_error,
2270 result_list->bgs_error_msg, MYF(0));
2271 DBUG_RETURN(result_list->bgs_error);
2272 }
2273 }
2274 }
2275 } else {
2276 DBUG_PRINT("info",("spider bg current->finish_flg=%s",
2277 result_list->current ?
2278 (result_list->current->finish_flg ? "TRUE" : "FALSE") : "NULL"));
2279 if (result_list->bgs_error)
2280 {
2281 DBUG_PRINT("info",("spider bg error"));
2282 if (result_list->bgs_error != HA_ERR_END_OF_FILE)
2283 {
2284 if (result_list->bgs_error_with_message)
2285 my_message(result_list->bgs_error,
2286 result_list->bgs_error_msg, MYF(0));
2287 DBUG_RETURN(result_list->bgs_error);
2288 }
2289 }
2290 }
2291 } else {
2292 DBUG_PRINT("info",("spider bg search"));
2293 if (result_list->current->finish_flg)
2294 {
2295 DBUG_PRINT("info",("spider bg end of file"));
2296 result_list->table->status = STATUS_NOT_FOUND;
2297 DBUG_RETURN(HA_ERR_END_OF_FILE);
2298 }
2299 if (result_list->bgs_working)
2300 {
2301 /* wait */
2302 DBUG_PRINT("info",("spider bg working wait"));
2303 pthread_mutex_lock(&conn->bg_conn_mutex);
2304 pthread_mutex_unlock(&conn->bg_conn_mutex);
2305 }
2306 if (result_list->bgs_error)
2307 {
2308 DBUG_PRINT("info",("spider bg error"));
2309 if (result_list->bgs_error == HA_ERR_END_OF_FILE)
2310 {
2311 result_list->current = result_list->current->next;
2312 result_list->current_row_num = 0;
2313 result_list->table->status = STATUS_NOT_FOUND;
2314 }
2315 if (result_list->bgs_error_with_message)
2316 my_message(result_list->bgs_error,
2317 result_list->bgs_error_msg, MYF(0));
2318 DBUG_RETURN(result_list->bgs_error);
2319 }
2320 result_list->current = result_list->current->next;
2321 result_list->current_row_num = 0;
2322 if (result_list->current == result_list->bgs_current)
2323 {
2324 DBUG_PRINT("info",("spider bg next search"));
2325 if (!result_list->current->finish_flg)
2326 {
2327 DBUG_PRINT("info",("spider result_list->quick_mode=%d",
2328 result_list->quick_mode));
2329 DBUG_PRINT("info",("spider result_list->bgs_current->result=%p",
2330 result_list->bgs_current->result));
2331 pthread_mutex_lock(&conn->bg_conn_mutex);
2332 result_list->bgs_phase = 3;
2333 if (
2334 result_list->quick_mode == 0 ||
2335 !result_list->bgs_current->result
2336 ) {
2337 result_list->split_read = result_list->bgs_split_read;
2338 result_list->limit_num =
2339 result_list->internal_limit - result_list->record_num >=
2340 result_list->split_read ?
2341 result_list->split_read :
2342 result_list->internal_limit - result_list->record_num;
2343 DBUG_PRINT("info",("spider sql_kinds=%u", spider->sql_kinds));
2344 if (spider->sql_kinds & SPIDER_SQL_KIND_SQL)
2345 {
2346 if ((error_num = spider->reappend_limit_sql_part(
2347 result_list->internal_offset + result_list->record_num,
2348 result_list->limit_num,
2349 SPIDER_SQL_TYPE_SELECT_SQL)))
2350 {
2351 pthread_mutex_unlock(&conn->bg_conn_mutex);
2352 DBUG_RETURN(error_num);
2353 }
2354 if (
2355 !result_list->use_union &&
2356 (error_num = spider->append_select_lock_sql_part(
2357 SPIDER_SQL_TYPE_SELECT_SQL))
2358 ) {
2359 pthread_mutex_unlock(&conn->bg_conn_mutex);
2360 DBUG_RETURN(error_num);
2361 }
2362 }
2363 if (spider->sql_kinds & SPIDER_SQL_KIND_HANDLER)
2364 {
2365 spider_db_append_handler_next(spider);
2366 if ((error_num = spider->reappend_limit_sql_part(
2367 0, result_list->limit_num,
2368 SPIDER_SQL_TYPE_HANDLER)))
2369 {
2370 pthread_mutex_unlock(&conn->bg_conn_mutex);
2371 DBUG_RETURN(error_num);
2372 }
2373 }
2374 }
2375 conn->bg_target = spider;
2376 conn->link_idx = link_idx;
2377 conn->bg_discard_result = discard_result;
2378#ifdef SPIDER_HAS_GROUP_BY_HANDLER
2379 conn->link_idx_chain = spider->link_idx_chain;
2380#endif
2381 result_list->bgs_working = TRUE;
2382 conn->bg_search = TRUE;
2383 if (with_lock)
2384 conn->bg_conn_chain_mutex_ptr = &first_conn->bg_conn_chain_mutex;
2385 conn->bg_caller_sync_wait = TRUE;
2386 pthread_mutex_lock(&conn->bg_conn_sync_mutex);
2387 pthread_cond_signal(&conn->bg_conn_cond);
2388 pthread_mutex_unlock(&conn->bg_conn_mutex);
2389 pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
2390 pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
2391 conn->bg_caller_sync_wait = FALSE;
2392 }
2393 }
2394 }
2395 DBUG_RETURN(0);
2396}
2397
2398void spider_bg_conn_simple_action(
2399 SPIDER_CONN *conn,
2400 uint simple_action,
2401 bool caller_wait,
2402 void *target,
2403 uint link_idx,
2404 int *error_num
2405) {
2406 DBUG_ENTER("spider_bg_conn_simple_action");
2407 pthread_mutex_lock(&conn->bg_conn_mutex);
2408 conn->bg_target = target;
2409 conn->link_idx = link_idx;
2410 conn->bg_simple_action = simple_action;
2411 conn->bg_error_num = error_num;
2412 if (caller_wait)
2413 {
2414 conn->bg_caller_wait = TRUE;
2415 pthread_mutex_lock(&conn->bg_conn_sync_mutex);
2416 } else {
2417 conn->bg_caller_sync_wait = TRUE;
2418 pthread_mutex_lock(&conn->bg_conn_sync_mutex);
2419 }
2420 pthread_cond_signal(&conn->bg_conn_cond);
2421 pthread_mutex_unlock(&conn->bg_conn_mutex);
2422 if (caller_wait)
2423 {
2424 pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
2425 pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
2426 conn->bg_caller_wait = FALSE;
2427 } else {
2428 pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
2429 pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
2430 conn->bg_caller_sync_wait = FALSE;
2431 }
2432 DBUG_VOID_RETURN;
2433}
2434
/*
  Thread entry point of the per-connection background worker.

  arg is the SPIDER_CONN the worker serves.  The thread creates its own
  THD and Spider transaction, announces itself through
  conn->bg_conn_sync_cond, and then loops: it sleeps on conn->bg_conn_cond
  (with conn->bg_conn_mutex as the associated mutex) and, once woken,
  handles exactly one kind of request per pass, checked in this order:
  bg_kill, bg_get_job_stack, bg_search, bg_direct_sql, bg_exec_sql,
  bg_simple_action, bg_break.

  If the THD or the transaction cannot be created, the waiting creator is
  signalled and the thread ends without setting conn->bg_init.

  Always returns NULL.
*/
void *spider_bg_conn_action(
  void *arg
) {
  int error_num;
  SPIDER_CONN *conn = (SPIDER_CONN*) arg;
  SPIDER_TRX *trx;
  ha_spider *spider;
  SPIDER_RESULT_LIST *result_list;
  THD *thd;
  my_thread_init();
  DBUG_ENTER("spider_bg_conn_action");
  /* init start */
  if (!(thd = SPIDER_new_THD(next_thread_id())))
  {
    /* wake whoever waits on the sync condition; bg_init stays unset */
    pthread_mutex_lock(&conn->bg_conn_sync_mutex);
    pthread_cond_signal(&conn->bg_conn_sync_cond);
    pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
    my_thread_end();
    DBUG_RETURN(NULL);
  }
  SPIDER_set_next_thread_id(thd);
#ifdef HAVE_PSI_INTERFACE
  mysql_thread_set_psi_id(thd->thread_id);
#endif
  thd->thread_stack = (char*) &thd;
  thd->store_globals();
  if (!(trx = spider_get_trx(thd, FALSE, &error_num)))
  {
    delete thd;
    pthread_mutex_lock(&conn->bg_conn_sync_mutex);
    pthread_cond_signal(&conn->bg_conn_sync_cond);
    pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
    my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
    my_thread_end();
    DBUG_RETURN(NULL);
  }
  /* lex_start(thd); */
  conn->bg_thd = thd;
  /*
    bg_conn_mutex is taken here and stays held by this thread except
    while it sleeps in pthread_cond_wait() below; it is finally released
    on the bg_kill path.
  */
  pthread_mutex_lock(&conn->bg_conn_mutex);
  pthread_mutex_lock(&conn->bg_conn_sync_mutex);
  pthread_cond_signal(&conn->bg_conn_sync_cond);
  conn->bg_init = TRUE;
  pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
  /* init end */

  while (TRUE)
  {
    /* release a chain mutex still held from the previous pass */
    if (conn->bg_conn_chain_mutex_ptr)
    {
      pthread_mutex_unlock(conn->bg_conn_chain_mutex_ptr);
      conn->bg_conn_chain_mutex_ptr = NULL;
    }
    thd->clear_error();
    /* sleep until a caller posts a request and signals bg_conn_cond */
    pthread_cond_wait(&conn->bg_conn_cond, &conn->bg_conn_mutex);
    DBUG_PRINT("info",("spider bg roop start"));
#ifndef DBUG_OFF
    DBUG_PRINT("info",("spider conn->thd=%p", conn->thd));
    if (conn->thd)
    {
      DBUG_PRINT("info",("spider query_id=%lld", conn->thd->query_id));
    }
#endif
    if (conn->bg_caller_sync_wait)
    {
      /* the caller only waits for the hand-off: answer right away */
      pthread_mutex_lock(&conn->bg_conn_sync_mutex);
      if (conn->bg_direct_sql)
        conn->bg_get_job_stack_off = TRUE;
      pthread_cond_signal(&conn->bg_conn_sync_cond);
      pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
      if (conn->bg_conn_chain_mutex_ptr)
      {
        /*
          Pass through the chain mutex supplied with the request.  It is
          kept locked (and released at the top of the next pass) only
          when it is this connection's own bg_conn_chain_mutex.
        */
        pthread_mutex_lock(conn->bg_conn_chain_mutex_ptr);
        if ((&conn->bg_conn_chain_mutex) != conn->bg_conn_chain_mutex_ptr)
        {
          pthread_mutex_unlock(conn->bg_conn_chain_mutex_ptr);
          conn->bg_conn_chain_mutex_ptr = NULL;
        }
      }
    }
    if (conn->bg_kill)
    {
      /* shutdown request: free resources, signal the waiter and exit */
      DBUG_PRINT("info",("spider bg kill start"));
      if (conn->bg_conn_chain_mutex_ptr)
      {
        pthread_mutex_unlock(conn->bg_conn_chain_mutex_ptr);
        conn->bg_conn_chain_mutex_ptr = NULL;
      }
      spider_free_trx(trx, TRUE);
      /* lex_end(thd->lex); */
      delete thd;
      pthread_mutex_lock(&conn->bg_conn_sync_mutex);
      pthread_cond_signal(&conn->bg_conn_sync_cond);
      pthread_mutex_unlock(&conn->bg_conn_mutex);
      pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
      my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
      my_thread_end();
      DBUG_RETURN(NULL);
    }
    if (conn->bg_get_job_stack)
    {
      /* load the first queued job; with none, skip the direct SQL step */
      conn->bg_get_job_stack = FALSE;
      if (!spider_bg_conn_get_job(conn))
      {
        conn->bg_direct_sql = FALSE;
      }
    }
    if (conn->bg_search)
    {
      /* background search for the handler stored in conn->bg_target */
      SPIDER_SHARE *share;
      spider_db_handler *dbton_handler;
      DBUG_PRINT("info",("spider bg search start"));
      spider = (ha_spider*) conn->bg_target;
      share = spider->share;
      dbton_handler = spider->dbton_handler[conn->dbton_id];
      result_list = &spider->result_list;
      result_list->bgs_error = 0;
      result_list->bgs_error_with_message = FALSE;
      /*
        A statement is executed when quick mode is off, for phase 1, or
        when the current background result has no result object.
        Otherwise only the next part of the existing result is stored
        (else branch below).
      */
      if (
        result_list->quick_mode == 0 ||
        result_list->bgs_phase == 1 ||
        !result_list->bgs_current->result
      ) {
        ulong sql_type;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
        if (conn->conn_kind == SPIDER_CONN_KIND_MYSQL)
        {
#endif
          if (spider->sql_kind[conn->link_idx] == SPIDER_SQL_KIND_SQL)
          {
            sql_type = SPIDER_SQL_TYPE_SELECT_SQL | SPIDER_SQL_TYPE_TMP_SQL;
          } else {
            sql_type = SPIDER_SQL_TYPE_HANDLER;
          }
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
        } else {
          sql_type = SPIDER_SQL_TYPE_SELECT_HS;
        }
#endif
        /*
          mta_conn_mutex is taken either before or after
          set_sql_for_exec(), depending on what the db handler asks for.
        */
        if (dbton_handler->need_lock_before_set_sql_for_exec(sql_type))
        {
          pthread_mutex_lock(&conn->mta_conn_mutex);
          SPIDER_SET_FILE_POS(&conn->mta_conn_mutex_file_pos);
        }
        /*
          NOTE(review): writers of conn->link_idx_chain elsewhere in this
          file are guarded by SPIDER_HAS_GROUP_BY_HANDLER, while this read
          is not -- confirm that use_fields / link_idx_chain exist in
          builds without that macro.
        */
        if (spider->use_fields)
        {
          if ((error_num = dbton_handler->set_sql_for_exec(sql_type,
            conn->link_idx, conn->link_idx_chain)))
          {
            result_list->bgs_error = error_num;
            if ((result_list->bgs_error_with_message = thd->is_error()))
              strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
          }
        } else {
          if ((error_num = dbton_handler->set_sql_for_exec(sql_type,
            conn->link_idx)))
          {
            result_list->bgs_error = error_num;
            if ((result_list->bgs_error_with_message = thd->is_error()))
              strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
          }
        }
        if (!dbton_handler->need_lock_before_set_sql_for_exec(sql_type))
        {
          pthread_mutex_lock(&conn->mta_conn_mutex);
          SPIDER_SET_FILE_POS(&conn->mta_conn_mutex_file_pos);
        }
        /* the temporary-table statement is executed separately below */
        sql_type &= ~SPIDER_SQL_TYPE_TMP_SQL;
        DBUG_PRINT("info",("spider sql_type=%lu", sql_type));
#ifdef HA_CAN_BULK_ACCESS
        if (spider->is_bulk_access_clone)
        {
          spider->connection_ids[conn->link_idx] = conn->connection_id;
          spider_trx_add_bulk_access_conn(spider->trx, conn);
        }
#endif
        if (!result_list->bgs_error)
        {
          conn->need_mon = &spider->need_mons[conn->link_idx];
          /* flags set while this thread holds mta_conn_mutex across the
             calls below; both are cleared again before it is unlocked */
          conn->mta_conn_mutex_lock_already = TRUE;
          conn->mta_conn_mutex_unlock_later = TRUE;
#ifdef HA_CAN_BULK_ACCESS
          if (!spider->is_bulk_access_clone)
          {
#endif
            if (!(result_list->bgs_error =
              spider_db_set_names(spider, conn, conn->link_idx)))
            {
              if (
                result_list->tmp_table_join && spider->bka_mode != 2 &&
                spider_bit_is_set(result_list->tmp_table_join_first,
                  conn->link_idx)
              ) {
                /* first use of the tmp table join on this link: run the
                   SPIDER_SQL_TYPE_TMP_SQL statement and mark it created */
                spider_clear_bit(result_list->tmp_table_join_first,
                  conn->link_idx);
                spider_set_bit(result_list->tmp_table_created,
                  conn->link_idx);
                result_list->tmp_tables_created = TRUE;
                spider_conn_set_timeout_from_share(conn, conn->link_idx,
                  spider->trx->thd, share);
                if (dbton_handler->execute_sql(
                  SPIDER_SQL_TYPE_TMP_SQL,
                  conn,
                  -1,
                  &spider->need_mons[conn->link_idx])
                ) {
                  result_list->bgs_error = spider_db_errorno(conn);
                  if ((result_list->bgs_error_with_message = thd->is_error()))
                    strmov(result_list->bgs_error_msg,
                      spider_stmt_da_message(thd));
                } else
                  spider_db_discard_multiple_result(spider, conn->link_idx,
                    conn);
              }
              if (!result_list->bgs_error)
              {
                /* run the search statement itself */
                spider_conn_set_timeout_from_share(conn, conn->link_idx,
                  spider->trx->thd, share);
                if (dbton_handler->execute_sql(
                  sql_type,
                  conn,
                  result_list->quick_mode,
                  &spider->need_mons[conn->link_idx])
                ) {
                  result_list->bgs_error = spider_db_errorno(conn);
                  if ((result_list->bgs_error_with_message = thd->is_error()))
                    strmov(result_list->bgs_error_msg,
                      spider_stmt_da_message(thd));
                } else {
                  spider->connection_ids[conn->link_idx] = conn->connection_id;
                  if (!conn->bg_discard_result)
                  {
                    if (!(result_list->bgs_error =
                      spider_db_store_result(spider, conn->link_idx,
                        result_list->table)))
                      spider->result_link_idx = conn->link_idx;
                    else {
                      if ((result_list->bgs_error_with_message =
                        thd->is_error()))
                        strmov(result_list->bgs_error_msg,
                          spider_stmt_da_message(thd));
                    }
                  } else {
                    /* caller does not want the rows of this link */
                    result_list->bgs_error = 0;
                    spider_db_discard_result(spider, conn->link_idx, conn);
                  }
                }
              }
            } else {
              if ((result_list->bgs_error_with_message = thd->is_error()))
                strmov(result_list->bgs_error_msg,
                  spider_stmt_da_message(thd));
            }
#ifdef HA_CAN_BULK_ACCESS
          }
#endif
          conn->mta_conn_mutex_lock_already = FALSE;
          conn->mta_conn_mutex_unlock_later = FALSE;
          SPIDER_CLEAR_FILE_POS(&conn->mta_conn_mutex_file_pos);
          pthread_mutex_unlock(&conn->mta_conn_mutex);
        } else {
          /* set_sql_for_exec() failed: just drop the connection mutex */
          SPIDER_CLEAR_FILE_POS(&conn->mta_conn_mutex_file_pos);
          pthread_mutex_unlock(&conn->mta_conn_mutex);
        }
      } else {
        /* no new statement: store the next part of the current result */
        spider->connection_ids[conn->link_idx] = conn->connection_id;
        conn->mta_conn_mutex_unlock_later = TRUE;
        result_list->bgs_error =
          spider_db_store_result(spider, conn->link_idx, result_list->table);
        if ((result_list->bgs_error_with_message = thd->is_error()))
          strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
        conn->mta_conn_mutex_unlock_later = FALSE;
      }
      conn->bg_search = FALSE;
      result_list->bgs_working = FALSE;
      if (conn->bg_caller_wait)
      {
        /* the caller waits for completion: wake it now */
        pthread_mutex_lock(&conn->bg_conn_sync_mutex);
        pthread_cond_signal(&conn->bg_conn_sync_cond);
        pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
      }
      continue;
    }
    if (conn->bg_direct_sql)
    {
      /* run queued direct SQL jobs until the stack is empty or one fails */
      bool is_error = FALSE;
      DBUG_PRINT("info",("spider bg direct sql start"));
      do {
        SPIDER_DIRECT_SQL *direct_sql = (SPIDER_DIRECT_SQL *) conn->bg_target;
        if (
          (error_num = spider_db_udf_direct_sql(direct_sql))
        ) {
          if (thd->is_error())
          {
            if (
              direct_sql->error_rw_mode &&
              spider_db_conn_is_network_error(error_num)
            ) {
              /* network errors are dropped when error_rw_mode is set */
              thd->clear_error();
            } else {
              /* publish the error to the parent under its mutex */
              SPIDER_BG_DIRECT_SQL *bg_direct_sql =
                (SPIDER_BG_DIRECT_SQL *) direct_sql->parent;
              pthread_mutex_lock(direct_sql->bg_mutex);
              bg_direct_sql->bg_error = spider_stmt_da_sql_errno(thd);
              strmov((char *) bg_direct_sql->bg_error_msg,
                spider_stmt_da_message(thd));
              pthread_mutex_unlock(direct_sql->bg_mutex);
              is_error = TRUE;
            }
          }
        }
        if (direct_sql->modified_non_trans_table)
        {
          SPIDER_BG_DIRECT_SQL *bg_direct_sql =
            (SPIDER_BG_DIRECT_SQL *) direct_sql->parent;
          pthread_mutex_lock(direct_sql->bg_mutex);
          bg_direct_sql->modified_non_trans_table = TRUE;
          pthread_mutex_unlock(direct_sql->bg_mutex);
        }
        spider_udf_free_direct_sql_alloc(direct_sql, TRUE);
      } while (!is_error && spider_bg_conn_get_job(conn));
      if (is_error)
      {
        /* after an error the remaining jobs are freed without running */
        while (spider_bg_conn_get_job(conn))
          spider_udf_free_direct_sql_alloc(
            (SPIDER_DIRECT_SQL *) conn->bg_target, TRUE);
      }
      conn->bg_direct_sql = FALSE;
      continue;
    }
    if (conn->bg_exec_sql)
    {
      /* execute one statement; the result goes to *conn->bg_error_num */
      DBUG_PRINT("info",("spider bg exec sql start"));
      spider = (ha_spider*) conn->bg_target;
      *conn->bg_error_num = spider_db_query_with_set_names(
        conn->bg_sql_type,
        spider,
        conn,
        conn->link_idx
      );
      conn->bg_exec_sql = FALSE;
      continue;
    }
    if (conn->bg_simple_action)
    {
      switch (conn->bg_simple_action)
      {
        case SPIDER_BG_SIMPLE_CONNECT:
          conn->db_conn->bg_connect();
          break;
        case SPIDER_BG_SIMPLE_DISCONNECT:
          conn->db_conn->bg_disconnect();
          break;
        case SPIDER_BG_SIMPLE_RECORDS:
          DBUG_PRINT("info",("spider bg simple records"));
          spider = (ha_spider*) conn->bg_target;
          *conn->bg_error_num =
            spider->dbton_handler[conn->dbton_id]->
              show_records(conn->link_idx);
          break;
        default:
          break;
      }
      conn->bg_simple_action = SPIDER_BG_SIMPLE_NO_ACTION;
      if (conn->bg_caller_wait)
      {
        /* the caller waits for completion: wake it now */
        pthread_mutex_lock(&conn->bg_conn_sync_mutex);
        pthread_cond_signal(&conn->bg_conn_sync_cond);
        pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
      }
      continue;
    }
    if (conn->bg_break)
    {
      /* break request: mark the target's background search as stopped */
      DBUG_PRINT("info",("spider bg break start"));
      spider = (ha_spider*) conn->bg_target;
      result_list = &spider->result_list;
      result_list->bgs_working = FALSE;
      continue;
    }
  }
}
2820
2821int spider_create_sts_thread(
2822 SPIDER_SHARE *share
2823) {
2824 int error_num;
2825 DBUG_ENTER("spider_create_sts_thread");
2826 if (!share->bg_sts_init)
2827 {
2828#if MYSQL_VERSION_ID < 50500
2829 if (pthread_cond_init(&share->bg_sts_cond, NULL))
2830#else
2831 if (mysql_cond_init(spd_key_cond_bg_sts,
2832 &share->bg_sts_cond, NULL))
2833#endif
2834 {
2835 error_num = HA_ERR_OUT_OF_MEM;
2836 goto error_cond_init;
2837 }
2838#if MYSQL_VERSION_ID < 50500
2839 if (pthread_cond_init(&share->bg_sts_sync_cond, NULL))
2840#else
2841 if (mysql_cond_init(spd_key_cond_bg_sts_sync,
2842 &share->bg_sts_sync_cond, NULL))
2843#endif
2844 {
2845 error_num = HA_ERR_OUT_OF_MEM;
2846 goto error_sync_cond_init;
2847 }
2848#if MYSQL_VERSION_ID < 50500
2849 if (pthread_create(&share->bg_sts_thread, &spider_pt_attr,
2850 spider_bg_sts_action, (void *) share)
2851 )
2852#else
2853 if (mysql_thread_create(spd_key_thd_bg_sts, &share->bg_sts_thread,
2854 &spider_pt_attr, spider_bg_sts_action, (void *) share)
2855 )
2856#endif
2857 {
2858 error_num = HA_ERR_OUT_OF_MEM;
2859 goto error_thread_create;
2860 }
2861 share->bg_sts_init = TRUE;
2862 }
2863 DBUG_RETURN(0);
2864
2865error_thread_create:
2866 pthread_cond_destroy(&share->bg_sts_sync_cond);
2867error_sync_cond_init:
2868 pthread_cond_destroy(&share->bg_sts_cond);
2869error_cond_init:
2870 DBUG_RETURN(error_num);
2871}
2872
2873void spider_free_sts_thread(
2874 SPIDER_SHARE *share
2875) {
2876 DBUG_ENTER("spider_free_sts_thread");
2877 if (share->bg_sts_init)
2878 {
2879 pthread_mutex_lock(&share->sts_mutex);
2880 share->bg_sts_kill = TRUE;
2881 pthread_cond_signal(&share->bg_sts_cond);
2882 pthread_cond_wait(&share->bg_sts_sync_cond, &share->sts_mutex);
2883 pthread_mutex_unlock(&share->sts_mutex);
2884 pthread_join(share->bg_sts_thread, NULL);
2885 pthread_cond_destroy(&share->bg_sts_sync_cond);
2886 pthread_cond_destroy(&share->bg_sts_cond);
2887 share->bg_sts_thd_wait = FALSE;
2888 share->bg_sts_kill = FALSE;
2889 share->bg_sts_init = FALSE;
2890 }
2891 DBUG_VOID_RETURN;
2892}
2893
2894void *spider_bg_sts_action(
2895 void *arg
2896) {
2897 SPIDER_SHARE *share = (SPIDER_SHARE*) arg;
2898 SPIDER_TRX *trx;
2899 int error_num = 0, roop_count;
2900 ha_spider spider;
2901 int *need_mons;
2902 SPIDER_CONN **conns;
2903 uint *conn_link_idx;
2904 uchar *conn_can_fo;
2905 char **conn_keys;
2906#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
2907 char **hs_r_conn_keys;
2908 char **hs_w_conn_keys;
2909#endif
2910 spider_db_handler **dbton_hdl;
2911 THD *thd;
2912 my_thread_init();
2913 DBUG_ENTER("spider_bg_sts_action");
2914 /* init start */
2915 char *ptr;
2916#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
2917 ptr = (char *) my_alloca(
2918 (sizeof(int) * share->link_count) +
2919 (sizeof(SPIDER_CONN *) * share->link_count) +
2920 (sizeof(uint) * share->link_count) +
2921 (sizeof(uchar) * share->link_bitmap_size) +
2922 (sizeof(char *) * share->link_count) +
2923 (sizeof(char *) * share->link_count) +
2924 (sizeof(char *) * share->link_count) +
2925 (sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE));
2926#else
2927 ptr = (char *) my_alloca(
2928 (sizeof(int) * share->link_count) +
2929 (sizeof(SPIDER_CONN *) * share->link_count) +
2930 (sizeof(uint) * share->link_count) +
2931 (sizeof(uchar) * share->link_bitmap_size) +
2932 (sizeof(char *) * share->link_count) +
2933 (sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE));
2934#endif
2935 if (!ptr)
2936 {
2937 pthread_mutex_lock(&share->sts_mutex);
2938 share->bg_sts_thd_wait = FALSE;
2939 share->bg_sts_kill = FALSE;
2940 share->bg_sts_init = FALSE;
2941 pthread_mutex_unlock(&share->sts_mutex);
2942 my_thread_end();
2943 DBUG_RETURN(NULL);
2944 }
2945 need_mons = (int *) ptr;
2946 ptr += (sizeof(int) * share->link_count);
2947 conns = (SPIDER_CONN **) ptr;
2948 ptr += (sizeof(SPIDER_CONN *) * share->link_count);
2949 conn_link_idx = (uint *) ptr;
2950 ptr += (sizeof(uint) * share->link_count);
2951 conn_can_fo = (uchar *) ptr;
2952 ptr += (sizeof(uchar) * share->link_bitmap_size);
2953 conn_keys = (char **) ptr;
2954 ptr += (sizeof(char *) * share->link_count);
2955#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
2956 hs_r_conn_keys = (char **) ptr;
2957 ptr += (sizeof(char *) * share->link_count);
2958 hs_w_conn_keys = (char **) ptr;
2959 ptr += (sizeof(char *) * share->link_count);
2960#endif
2961 dbton_hdl = (spider_db_handler **) ptr;
2962 pthread_mutex_lock(&share->sts_mutex);
2963 if (!(thd = SPIDER_new_THD(next_thread_id())))
2964 {
2965 share->bg_sts_thd_wait = FALSE;
2966 share->bg_sts_kill = FALSE;
2967 share->bg_sts_init = FALSE;
2968 pthread_mutex_unlock(&share->sts_mutex);
2969 my_thread_end();
2970 my_afree(need_mons);
2971 DBUG_RETURN(NULL);
2972 }
2973 SPIDER_set_next_thread_id(thd);
2974#ifdef HAVE_PSI_INTERFACE
2975 mysql_thread_set_psi_id(thd->thread_id);
2976#endif
2977 thd->thread_stack = (char*) &thd;
2978 thd->store_globals();
2979 if (!(trx = spider_get_trx(thd, FALSE, &error_num)))
2980 {
2981 delete thd;
2982 share->bg_sts_thd_wait = FALSE;
2983 share->bg_sts_kill = FALSE;
2984 share->bg_sts_init = FALSE;
2985 pthread_mutex_unlock(&share->sts_mutex);
2986#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
2987 my_pthread_setspecific_ptr(THR_THD, NULL);
2988#endif
2989 my_thread_end();
2990 my_afree(need_mons);
2991 DBUG_RETURN(NULL);
2992 }
2993 share->bg_sts_thd = thd;
2994 spider.trx = trx;
2995 spider.share = share;
2996 spider.conns = conns;
2997 spider.conn_link_idx = conn_link_idx;
2998 spider.conn_can_fo = conn_can_fo;
2999 spider.need_mons = need_mons;
3000 spider.conn_keys_first_ptr = share->conn_keys[0];
3001 spider.conn_keys = conn_keys;
3002#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
3003 spider.hs_r_conn_keys = hs_r_conn_keys;
3004 spider.hs_w_conn_keys = hs_w_conn_keys;
3005#endif
3006 spider.dbton_handler = dbton_hdl;
3007 memset(conns, 0, sizeof(SPIDER_CONN *) * share->link_count);
3008 memset(need_mons, 0, sizeof(int) * share->link_count);
3009 memset(dbton_hdl, 0, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE);
3010 spider_trx_set_link_idx_for_all(&spider);
3011 spider.search_link_idx = spider_conn_first_link_idx(thd,
3012 share->link_statuses, share->access_balances, spider.conn_link_idx,
3013 share->link_count, SPIDER_LINK_STATUS_OK);
3014 for (roop_count = 0; roop_count < SPIDER_DBTON_SIZE; roop_count++)
3015 {
3016 if (
3017 spider_bit_is_set(share->dbton_bitmap, roop_count) &&
3018 spider_dbton[roop_count].create_db_handler
3019 ) {
3020 if (!(dbton_hdl[roop_count] = spider_dbton[roop_count].create_db_handler(
3021 &spider, share->dbton_share[roop_count])))
3022 break;
3023 if (dbton_hdl[roop_count]->init())
3024 break;
3025 }
3026 }
3027 if (roop_count < SPIDER_DBTON_SIZE)
3028 {
3029 DBUG_PRINT("info",("spider handler init error"));
3030 for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
3031 {
3032 if (
3033 spider_bit_is_set(share->dbton_bitmap, roop_count) &&
3034 dbton_hdl[roop_count]
3035 ) {
3036 delete dbton_hdl[roop_count];
3037 dbton_hdl[roop_count] = NULL;
3038 }
3039 }
3040 spider_free_trx(trx, TRUE);
3041 delete thd;
3042 share->bg_sts_thd_wait = FALSE;
3043 share->bg_sts_kill = FALSE;
3044 share->bg_sts_init = FALSE;
3045 pthread_mutex_unlock(&share->sts_mutex);
3046#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
3047 my_pthread_setspecific_ptr(THR_THD, NULL);
3048#endif
3049 my_thread_end();
3050 my_afree(need_mons);
3051 DBUG_RETURN(NULL);
3052 }
3053 /* init end */
3054
3055 while (TRUE)
3056 {
3057 DBUG_PRINT("info",("spider bg sts roop start"));
3058 if (share->bg_sts_kill)
3059 {
3060 DBUG_PRINT("info",("spider bg sts kill start"));
3061 for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
3062 {
3063 if (
3064 spider_bit_is_set(share->dbton_bitmap, roop_count) &&
3065 dbton_hdl[roop_count]
3066 ) {
3067 delete dbton_hdl[roop_count];
3068 dbton_hdl[roop_count] = NULL;
3069 }
3070 }
3071 spider_free_trx(trx, TRUE);
3072 delete thd;
3073 pthread_cond_signal(&share->bg_sts_sync_cond);
3074 pthread_mutex_unlock(&share->sts_mutex);
3075#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
3076 my_pthread_setspecific_ptr(THR_THD, NULL);
3077#endif
3078 my_thread_end();
3079 my_afree(need_mons);
3080 DBUG_RETURN(NULL);
3081 }
3082 if (spider.search_link_idx < 0)
3083 {
3084 spider_trx_set_link_idx_for_all(&spider);
3085/*
3086 spider.search_link_idx = spider_conn_next_link_idx(
3087 thd, share->link_statuses, share->access_balances,
3088 spider.conn_link_idx, spider.search_link_idx, share->link_count,
3089 SPIDER_LINK_STATUS_OK);
3090*/
3091 spider.search_link_idx = spider_conn_first_link_idx(thd,
3092 share->link_statuses, share->access_balances, spider.conn_link_idx,
3093 share->link_count, SPIDER_LINK_STATUS_OK);
3094 }
3095 if (spider.search_link_idx >= 0)
3096 {
3097 if (difftime(share->bg_sts_try_time, share->sts_get_time) >=
3098 share->bg_sts_interval)
3099 {
3100 if (!conns[spider.search_link_idx])
3101 {
3102 spider_get_conn(share, spider.search_link_idx,
3103 share->conn_keys[spider.search_link_idx],
3104 trx, &spider, FALSE, FALSE, SPIDER_CONN_KIND_MYSQL,
3105 &error_num);
3106 conns[spider.search_link_idx]->error_mode = 0;
3107/*
3108 if (
3109 error_num &&
3110 share->monitoring_kind[spider.search_link_idx] &&
3111 need_mons[spider.search_link_idx]
3112 ) {
3113 lex_start(thd);
3114 error_num = spider_ping_table_mon_from_table(
3115 trx,
3116 thd,
3117 share,
3118 spider.search_link_idx,
3119 (uint32) share->monitoring_sid[spider.search_link_idx],
3120 share->table_name,
3121 share->table_name_length,
3122 spider.conn_link_idx[spider.search_link_idx],
3123 NULL,
3124 0,
3125 share->monitoring_kind[spider.search_link_idx],
3126 share->monitoring_limit[spider.search_link_idx],
3127 share->monitoring_flag[spider.search_link_idx],
3128 TRUE
3129 );
3130 lex_end(thd->lex);
3131 }
3132*/
3133 spider.search_link_idx = -1;
3134 }
3135 if (spider.search_link_idx != -1 && conns[spider.search_link_idx])
3136 {
3137#ifdef WITH_PARTITION_STORAGE_ENGINE
3138 if (spider_get_sts(share, spider.search_link_idx,
3139 share->bg_sts_try_time, &spider,
3140 share->bg_sts_interval, share->bg_sts_mode,
3141 share->bg_sts_sync,
3142 2, HA_STATUS_CONST | HA_STATUS_VARIABLE))
3143#else
3144 if (spider_get_sts(share, spider.search_link_idx,
3145 share->bg_sts_try_time, &spider,
3146 share->bg_sts_interval, share->bg_sts_mode,
3147 2, HA_STATUS_CONST | HA_STATUS_VARIABLE))
3148#endif
3149 {
3150/*
3151 if (
3152 share->monitoring_kind[spider.search_link_idx] &&
3153 need_mons[spider.search_link_idx]
3154 ) {
3155 lex_start(thd);
3156 error_num = spider_ping_table_mon_from_table(
3157 trx,
3158 thd,
3159 share,
3160 spider.search_link_idx,
3161 (uint32) share->monitoring_sid[spider.search_link_idx],
3162 share->table_name,
3163 share->table_name_length,
3164 spider.conn_link_idx[spider.search_link_idx],
3165 NULL,
3166 0,
3167 share->monitoring_kind[spider.search_link_idx],
3168 share->monitoring_limit[spider.search_link_idx],
3169 share->monitoring_flag[spider.search_link_idx],
3170 TRUE
3171 );
3172 lex_end(thd->lex);
3173 }
3174*/
3175 spider.search_link_idx = -1;
3176 }
3177 }
3178 }
3179 }
3180 memset(need_mons, 0, sizeof(int) * share->link_count);
3181 share->bg_sts_thd_wait = TRUE;
3182 pthread_cond_wait(&share->bg_sts_cond, &share->sts_mutex);
3183 }
3184}
3185
3186int spider_create_crd_thread(
3187 SPIDER_SHARE *share
3188) {
3189 int error_num;
3190 DBUG_ENTER("spider_create_crd_thread");
3191 if (!share->bg_crd_init)
3192 {
3193#if MYSQL_VERSION_ID < 50500
3194 if (pthread_cond_init(&share->bg_crd_cond, NULL))
3195#else
3196 if (mysql_cond_init(spd_key_cond_bg_crd,
3197 &share->bg_crd_cond, NULL))
3198#endif
3199 {
3200 error_num = HA_ERR_OUT_OF_MEM;
3201 goto error_cond_init;
3202 }
3203#if MYSQL_VERSION_ID < 50500
3204 if (pthread_cond_init(&share->bg_crd_sync_cond, NULL))
3205#else
3206 if (mysql_cond_init(spd_key_cond_bg_crd_sync,
3207 &share->bg_crd_sync_cond, NULL))
3208#endif
3209 {
3210 error_num = HA_ERR_OUT_OF_MEM;
3211 goto error_sync_cond_init;
3212 }
3213#if MYSQL_VERSION_ID < 50500
3214 if (pthread_create(&share->bg_crd_thread, &spider_pt_attr,
3215 spider_bg_crd_action, (void *) share)
3216 )
3217#else
3218 if (mysql_thread_create(spd_key_thd_bg_crd, &share->bg_crd_thread,
3219 &spider_pt_attr, spider_bg_crd_action, (void *) share)
3220 )
3221#endif
3222 {
3223 error_num = HA_ERR_OUT_OF_MEM;
3224 goto error_thread_create;
3225 }
3226 share->bg_crd_init = TRUE;
3227 }
3228 DBUG_RETURN(0);
3229
3230error_thread_create:
3231 pthread_cond_destroy(&share->bg_crd_sync_cond);
3232error_sync_cond_init:
3233 pthread_cond_destroy(&share->bg_crd_cond);
3234error_cond_init:
3235 DBUG_RETURN(error_num);
3236}
3237
3238void spider_free_crd_thread(
3239 SPIDER_SHARE *share
3240) {
3241 DBUG_ENTER("spider_free_crd_thread");
3242 if (share->bg_crd_init)
3243 {
3244 pthread_mutex_lock(&share->crd_mutex);
3245 share->bg_crd_kill = TRUE;
3246 pthread_cond_signal(&share->bg_crd_cond);
3247 pthread_cond_wait(&share->bg_crd_sync_cond, &share->crd_mutex);
3248 pthread_mutex_unlock(&share->crd_mutex);
3249 pthread_join(share->bg_crd_thread, NULL);
3250 pthread_cond_destroy(&share->bg_crd_sync_cond);
3251 pthread_cond_destroy(&share->bg_crd_cond);
3252 share->bg_crd_thd_wait = FALSE;
3253 share->bg_crd_kill = FALSE;
3254 share->bg_crd_init = FALSE;
3255 }
3256 DBUG_VOID_RETURN;
3257}
3258
3259void *spider_bg_crd_action(
3260 void *arg
3261) {
3262 SPIDER_SHARE *share = (SPIDER_SHARE*) arg;
3263 SPIDER_TRX *trx;
3264 int error_num = 0, roop_count;
3265 ha_spider spider;
3266 TABLE table;
3267 int *need_mons;
3268 SPIDER_CONN **conns;
3269 uint *conn_link_idx;
3270 uchar *conn_can_fo;
3271 char **conn_keys;
3272#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
3273 char **hs_r_conn_keys;
3274 char **hs_w_conn_keys;
3275#endif
3276 spider_db_handler **dbton_hdl;
3277 THD *thd;
3278 my_thread_init();
3279 DBUG_ENTER("spider_bg_crd_action");
3280 /* init start */
3281 char *ptr;
3282#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
3283 ptr = (char *) my_alloca(
3284 (sizeof(int) * share->link_count) +
3285 (sizeof(SPIDER_CONN *) * share->link_count) +
3286 (sizeof(uint) * share->link_count) +
3287 (sizeof(uchar) * share->link_bitmap_size) +
3288 (sizeof(char *) * share->link_count) +
3289 (sizeof(char *) * share->link_count) +
3290 (sizeof(char *) * share->link_count) +
3291 (sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE));
3292#else
3293 ptr = (char *) my_alloca(
3294 (sizeof(int) * share->link_count) +
3295 (sizeof(SPIDER_CONN *) * share->link_count) +
3296 (sizeof(uint) * share->link_count) +
3297 (sizeof(uchar) * share->link_bitmap_size) +
3298 (sizeof(char *) * share->link_count) +
3299 (sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE));
3300#endif
3301 if (!ptr)
3302 {
3303 pthread_mutex_lock(&share->crd_mutex);
3304 share->bg_crd_thd_wait = FALSE;
3305 share->bg_crd_kill = FALSE;
3306 share->bg_crd_init = FALSE;
3307 pthread_mutex_unlock(&share->crd_mutex);
3308 my_thread_end();
3309 DBUG_RETURN(NULL);
3310 }
3311 need_mons = (int *) ptr;
3312 ptr += (sizeof(int) * share->link_count);
3313 conns = (SPIDER_CONN **) ptr;
3314 ptr += (sizeof(SPIDER_CONN *) * share->link_count);
3315 conn_link_idx = (uint *) ptr;
3316 ptr += (sizeof(uint) * share->link_count);
3317 conn_can_fo = (uchar *) ptr;
3318 ptr += (sizeof(uchar) * share->link_bitmap_size);
3319 conn_keys = (char **) ptr;
3320 ptr += (sizeof(char *) * share->link_count);
3321#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
3322 hs_r_conn_keys = (char **) ptr;
3323 ptr += (sizeof(char *) * share->link_count);
3324 hs_w_conn_keys = (char **) ptr;
3325 ptr += (sizeof(char *) * share->link_count);
3326#endif
3327 dbton_hdl = (spider_db_handler **) ptr;
3328 pthread_mutex_lock(&share->crd_mutex);
3329 if (!(thd = SPIDER_new_THD(next_thread_id())))
3330 {
3331 share->bg_crd_thd_wait = FALSE;
3332 share->bg_crd_kill = FALSE;
3333 share->bg_crd_init = FALSE;
3334 pthread_mutex_unlock(&share->crd_mutex);
3335 my_thread_end();
3336 my_afree(need_mons);
3337 DBUG_RETURN(NULL);
3338 }
3339 SPIDER_set_next_thread_id(thd);
3340#ifdef HAVE_PSI_INTERFACE
3341 mysql_thread_set_psi_id(thd->thread_id);
3342#endif
3343 thd->thread_stack = (char*) &thd;
3344 thd->store_globals();
3345 if (!(trx = spider_get_trx(thd, FALSE, &error_num)))
3346 {
3347 delete thd;
3348 share->bg_crd_thd_wait = FALSE;
3349 share->bg_crd_kill = FALSE;
3350 share->bg_crd_init = FALSE;
3351 pthread_mutex_unlock(&share->crd_mutex);
3352#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
3353 my_pthread_setspecific_ptr(THR_THD, NULL);
3354#endif
3355 my_thread_end();
3356 my_afree(need_mons);
3357 DBUG_RETURN(NULL);
3358 }
3359 share->bg_crd_thd = thd;
3360 table.s = share->table_share;
3361 table.field = share->table_share->field;
3362 table.key_info = share->table_share->key_info;
3363 spider.trx = trx;
3364 spider.change_table_ptr(&table, share->table_share);
3365 spider.share = share;
3366 spider.conns = conns;
3367 spider.conn_link_idx = conn_link_idx;
3368 spider.conn_can_fo = conn_can_fo;
3369 spider.need_mons = need_mons;
3370 spider.conn_keys_first_ptr = share->conn_keys[0];
3371 spider.conn_keys = conn_keys;
3372#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
3373 spider.hs_r_conn_keys = hs_r_conn_keys;
3374 spider.hs_w_conn_keys = hs_w_conn_keys;
3375#endif
3376 spider.dbton_handler = dbton_hdl;
3377 memset(conns, 0, sizeof(SPIDER_CONN *) * share->link_count);
3378 memset(need_mons, 0, sizeof(int) * share->link_count);
3379 memset(dbton_hdl, 0, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE);
3380 spider_trx_set_link_idx_for_all(&spider);
3381 spider.search_link_idx = spider_conn_first_link_idx(thd,
3382 share->link_statuses, share->access_balances, spider.conn_link_idx,
3383 share->link_count, SPIDER_LINK_STATUS_OK);
3384 for (roop_count = 0; roop_count < SPIDER_DBTON_SIZE; roop_count++)
3385 {
3386 if (
3387 spider_bit_is_set(share->dbton_bitmap, roop_count) &&
3388 spider_dbton[roop_count].create_db_handler
3389 ) {
3390 if (!(dbton_hdl[roop_count] = spider_dbton[roop_count].create_db_handler(
3391 &spider, share->dbton_share[roop_count])))
3392 break;
3393 if (dbton_hdl[roop_count]->init())
3394 break;
3395 }
3396 }
3397 if (roop_count < SPIDER_DBTON_SIZE)
3398 {
3399 DBUG_PRINT("info",("spider handler init error"));
3400 for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
3401 {
3402 if (
3403 spider_bit_is_set(share->dbton_bitmap, roop_count) &&
3404 dbton_hdl[roop_count]
3405 ) {
3406 delete dbton_hdl[roop_count];
3407 dbton_hdl[roop_count] = NULL;
3408 }
3409 }
3410 spider_free_trx(trx, TRUE);
3411 delete thd;
3412 share->bg_crd_thd_wait = FALSE;
3413 share->bg_crd_kill = FALSE;
3414 share->bg_crd_init = FALSE;
3415 pthread_mutex_unlock(&share->crd_mutex);
3416#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
3417 my_pthread_setspecific_ptr(THR_THD, NULL);
3418#endif
3419 my_thread_end();
3420 my_afree(need_mons);
3421 DBUG_RETURN(NULL);
3422 }
3423 /* init end */
3424
3425 while (TRUE)
3426 {
3427 DBUG_PRINT("info",("spider bg crd roop start"));
3428 if (share->bg_crd_kill)
3429 {
3430 DBUG_PRINT("info",("spider bg crd kill start"));
3431 for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
3432 {
3433 if (
3434 spider_bit_is_set(share->dbton_bitmap, roop_count) &&
3435 dbton_hdl[roop_count]
3436 ) {
3437 delete dbton_hdl[roop_count];
3438 dbton_hdl[roop_count] = NULL;
3439 }
3440 }
3441 spider_free_trx(trx, TRUE);
3442 delete thd;
3443 pthread_cond_signal(&share->bg_crd_sync_cond);
3444 pthread_mutex_unlock(&share->crd_mutex);
3445#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
3446 my_pthread_setspecific_ptr(THR_THD, NULL);
3447#endif
3448 my_thread_end();
3449 my_afree(need_mons);
3450 DBUG_RETURN(NULL);
3451 }
3452 if (spider.search_link_idx < 0)
3453 {
3454 spider_trx_set_link_idx_for_all(&spider);
3455/*
3456 spider.search_link_idx = spider_conn_next_link_idx(
3457 thd, share->link_statuses, share->access_balances,
3458 spider.conn_link_idx, spider.search_link_idx, share->link_count,
3459 SPIDER_LINK_STATUS_OK);
3460*/
3461 spider.search_link_idx = spider_conn_first_link_idx(thd,
3462 share->link_statuses, share->access_balances, spider.conn_link_idx,
3463 share->link_count, SPIDER_LINK_STATUS_OK);
3464 }
3465 if (spider.search_link_idx >= 0)
3466 {
3467 if (difftime(share->bg_crd_try_time, share->crd_get_time) >=
3468 share->bg_crd_interval)
3469 {
3470 if (!conns[spider.search_link_idx])
3471 {
3472 spider_get_conn(share, spider.search_link_idx,
3473 share->conn_keys[spider.search_link_idx],
3474 trx, &spider, FALSE, FALSE, SPIDER_CONN_KIND_MYSQL,
3475 &error_num);
3476 conns[spider.search_link_idx]->error_mode = 0;
3477/*
3478 if (
3479 error_num &&
3480 share->monitoring_kind[spider.search_link_idx] &&
3481 need_mons[spider.search_link_idx]
3482 ) {
3483 lex_start(thd);
3484 error_num = spider_ping_table_mon_from_table(
3485 trx,
3486 thd,
3487 share,
3488 spider.search_link_idx,
3489 (uint32) share->monitoring_sid[spider.search_link_idx],
3490 share->table_name,
3491 share->table_name_length,
3492 spider.conn_link_idx[spider.search_link_idx],
3493 NULL,
3494 0,
3495 share->monitoring_kind[spider.search_link_idx],
3496 share->monitoring_limit[spider.search_link_idx],
3497 share->monitoring_flag[spider.search_link_idx],
3498 TRUE
3499 );
3500 lex_end(thd->lex);
3501 }
3502*/
3503 spider.search_link_idx = -1;
3504 }
3505 if (spider.search_link_idx != -1 && conns[spider.search_link_idx])
3506 {
3507#ifdef WITH_PARTITION_STORAGE_ENGINE
3508 if (spider_get_crd(share, spider.search_link_idx,
3509 share->bg_crd_try_time, &spider, &table,
3510 share->bg_crd_interval, share->bg_crd_mode,
3511 share->bg_crd_sync,
3512 2))
3513#else
3514 if (spider_get_crd(share, spider.search_link_idx,
3515 share->bg_crd_try_time, &spider, &table,
3516 share->bg_crd_interval, share->bg_crd_mode,
3517 2))
3518#endif
3519 {
3520/*
3521 if (
3522 share->monitoring_kind[spider.search_link_idx] &&
3523 need_mons[spider.search_link_idx]
3524 ) {
3525 lex_start(thd);
3526 error_num = spider_ping_table_mon_from_table(
3527 trx,
3528 thd,
3529 share,
3530 spider.search_link_idx,
3531 (uint32) share->monitoring_sid[spider.search_link_idx],
3532 share->table_name,
3533 share->table_name_length,
3534 spider.conn_link_idx[spider.search_link_idx],
3535 NULL,
3536 0,
3537 share->monitoring_kind[spider.search_link_idx],
3538 share->monitoring_limit[spider.search_link_idx],
3539 share->monitoring_flag[spider.search_link_idx],
3540 TRUE
3541 );
3542 lex_end(thd->lex);
3543 }
3544*/
3545 spider.search_link_idx = -1;
3546 }
3547 }
3548 }
3549 }
3550 memset(need_mons, 0, sizeof(int) * share->link_count);
3551 share->bg_crd_thd_wait = TRUE;
3552 pthread_cond_wait(&share->bg_crd_cond, &share->crd_mutex);
3553 }
3554}
3555
3556int spider_create_mon_threads(
3557 SPIDER_TRX *trx,
3558 SPIDER_SHARE *share
3559) {
3560 bool create_bg_mons = FALSE;
3561 int error_num, roop_count, roop_count2;
3562 SPIDER_LINK_PACK link_pack;
3563 SPIDER_TABLE_MON_LIST *table_mon_list;
3564 DBUG_ENTER("spider_create_mon_threads");
3565 if (!share->bg_mon_init)
3566 {
3567 for (roop_count = 0; roop_count < (int) share->all_link_count;
3568 roop_count++)
3569 {
3570 if (share->monitoring_bg_kind[roop_count])
3571 {
3572 create_bg_mons = TRUE;
3573 break;
3574 }
3575 }
3576 if (create_bg_mons)
3577 {
3578 char link_idx_str[SPIDER_SQL_INT_LEN];
3579 int link_idx_str_length;
3580 char *buf = (char *) my_alloca(share->table_name_length + SPIDER_SQL_INT_LEN + 1);
3581 spider_string conv_name_str(buf, share->table_name_length +
3582 SPIDER_SQL_INT_LEN + 1, system_charset_info);
3583 conv_name_str.init_calc_mem(105);
3584 conv_name_str.length(0);
3585 conv_name_str.q_append(share->table_name, share->table_name_length);
3586 for (roop_count = 0; roop_count < (int) share->all_link_count;
3587 roop_count++)
3588 {
3589 if (share->monitoring_bg_kind[roop_count])
3590 {
3591 conv_name_str.length(share->table_name_length);
3592 if (share->static_link_ids[roop_count])
3593 {
3594 memcpy(link_idx_str, share->static_link_ids[roop_count],
3595 share->static_link_ids_lengths[roop_count] + 1);
3596 link_idx_str_length = share->static_link_ids_lengths[roop_count];
3597 } else {
3598 link_idx_str_length = my_sprintf(link_idx_str, (link_idx_str,
3599 "%010d", roop_count));
3600 }
3601 conv_name_str.q_append(link_idx_str, link_idx_str_length + 1);
3602 conv_name_str.length(conv_name_str.length() - 1);
3603 if (!(table_mon_list = spider_get_ping_table_mon_list(trx, trx->thd,
3604 &conv_name_str, share->table_name_length, roop_count,
3605 share->static_link_ids[roop_count],
3606 share->static_link_ids_lengths[roop_count],
3607 (uint32) share->monitoring_sid[roop_count], FALSE, &error_num)))
3608 {
3609 my_afree(buf);
3610 goto error_get_ping_table_mon_list;
3611 }
3612 spider_free_ping_table_mon_list(table_mon_list);
3613 }
3614 }
3615 if (!(share->bg_mon_thds = (THD **)
3616 spider_bulk_malloc(spider_current_trx, 23, MYF(MY_WME | MY_ZEROFILL),
3617 &share->bg_mon_thds, sizeof(THD *) * share->all_link_count,
3618 &share->bg_mon_threads, sizeof(pthread_t) * share->all_link_count,
3619 &share->bg_mon_mutexes, sizeof(pthread_mutex_t) *
3620 share->all_link_count,
3621 &share->bg_mon_conds, sizeof(pthread_cond_t) * share->all_link_count,
3622 &share->bg_mon_sleep_conds,
3623 sizeof(pthread_cond_t) * share->all_link_count,
3624 NullS))
3625 ) {
3626 error_num = HA_ERR_OUT_OF_MEM;
3627 my_afree(buf);
3628 goto error_alloc_base;
3629 }
3630 for (roop_count = 0; roop_count < (int) share->all_link_count;
3631 roop_count++)
3632 {
3633 if (
3634 share->monitoring_bg_kind[roop_count] &&
3635#if MYSQL_VERSION_ID < 50500
3636 pthread_mutex_init(&share->bg_mon_mutexes[roop_count],
3637 MY_MUTEX_INIT_FAST)
3638#else
3639 mysql_mutex_init(spd_key_mutex_bg_mon,
3640 &share->bg_mon_mutexes[roop_count], MY_MUTEX_INIT_FAST)
3641#endif
3642 ) {
3643 error_num = HA_ERR_OUT_OF_MEM;
3644 my_afree(buf);
3645 goto error_mutex_init;
3646 }
3647 }
3648 for (roop_count = 0; roop_count < (int) share->all_link_count;
3649 roop_count++)
3650 {
3651 if (
3652 share->monitoring_bg_kind[roop_count] &&
3653#if MYSQL_VERSION_ID < 50500
3654 pthread_cond_init(&share->bg_mon_conds[roop_count], NULL)
3655#else
3656 mysql_cond_init(spd_key_cond_bg_mon,
3657 &share->bg_mon_conds[roop_count], NULL)
3658#endif
3659 ) {
3660 error_num = HA_ERR_OUT_OF_MEM;
3661 my_afree(buf);
3662 goto error_cond_init;
3663 }
3664 }
3665 for (roop_count = 0; roop_count < (int) share->all_link_count;
3666 roop_count++)
3667 {
3668 if (
3669 share->monitoring_bg_kind[roop_count] &&
3670#if MYSQL_VERSION_ID < 50500
3671 pthread_cond_init(&share->bg_mon_sleep_conds[roop_count], NULL)
3672#else
3673 mysql_cond_init(spd_key_cond_bg_mon_sleep,
3674 &share->bg_mon_sleep_conds[roop_count], NULL)
3675#endif
3676 ) {
3677 error_num = HA_ERR_OUT_OF_MEM;
3678 my_afree(buf);
3679 goto error_sleep_cond_init;
3680 }
3681 }
3682 link_pack.share = share;
3683 for (roop_count = 0; roop_count < (int) share->all_link_count;
3684 roop_count++)
3685 {
3686 if (share->monitoring_bg_kind[roop_count])
3687 {
3688 link_pack.link_idx = roop_count;
3689 pthread_mutex_lock(&share->bg_mon_mutexes[roop_count]);
3690#if MYSQL_VERSION_ID < 50500
3691 if (pthread_create(&share->bg_mon_threads[roop_count],
3692 &spider_pt_attr, spider_bg_mon_action, (void *) &link_pack)
3693 )
3694#else
3695 if (mysql_thread_create(spd_key_thd_bg_mon,
3696 &share->bg_mon_threads[roop_count], &spider_pt_attr,
3697 spider_bg_mon_action, (void *) &link_pack)
3698 )
3699#endif
3700 {
3701 error_num = HA_ERR_OUT_OF_MEM;
3702 my_afree(buf);
3703 goto error_thread_create;
3704 }
3705 pthread_cond_wait(&share->bg_mon_conds[roop_count],
3706 &share->bg_mon_mutexes[roop_count]);
3707 pthread_mutex_unlock(&share->bg_mon_mutexes[roop_count]);
3708 }
3709 }
3710 share->bg_mon_init = TRUE;
3711 my_afree(buf);
3712 }
3713 }
3714 DBUG_RETURN(0);
3715
3716error_thread_create:
3717 roop_count2 = roop_count;
3718 for (roop_count--; roop_count >= 0; roop_count--)
3719 {
3720 if (share->monitoring_bg_kind[roop_count])
3721 pthread_mutex_lock(&share->bg_mon_mutexes[roop_count]);
3722 }
3723 share->bg_mon_kill = TRUE;
3724 for (roop_count = roop_count2 - 1; roop_count >= 0; roop_count--)
3725 {
3726 if (share->monitoring_bg_kind[roop_count])
3727 {
3728 pthread_cond_wait(&share->bg_mon_conds[roop_count],
3729 &share->bg_mon_mutexes[roop_count]);
3730 pthread_mutex_unlock(&share->bg_mon_mutexes[roop_count]);
3731 }
3732 }
3733 share->bg_mon_kill = FALSE;
3734 roop_count = share->all_link_count;
3735error_sleep_cond_init:
3736 for (roop_count--; roop_count >= 0; roop_count--)
3737 {
3738 if (share->monitoring_bg_kind[roop_count])
3739 pthread_cond_destroy(&share->bg_mon_sleep_conds[roop_count]);
3740 }
3741 roop_count = share->all_link_count;
3742error_cond_init:
3743 for (roop_count--; roop_count >= 0; roop_count--)
3744 {
3745 if (share->monitoring_bg_kind[roop_count])
3746 pthread_cond_destroy(&share->bg_mon_conds[roop_count]);
3747 }
3748 roop_count = share->all_link_count;
3749error_mutex_init:
3750 for (roop_count--; roop_count >= 0; roop_count--)
3751 {
3752 if (share->monitoring_bg_kind[roop_count])
3753 pthread_mutex_destroy(&share->bg_mon_mutexes[roop_count]);
3754 }
3755 spider_free(spider_current_trx, share->bg_mon_thds, MYF(0));
3756error_alloc_base:
3757error_get_ping_table_mon_list:
3758 DBUG_RETURN(error_num);
3759}
3760
3761void spider_free_mon_threads(
3762 SPIDER_SHARE *share
3763) {
3764 int roop_count;
3765 DBUG_ENTER("spider_free_mon_threads");
3766 if (share->bg_mon_init)
3767 {
3768 for (roop_count = 0; roop_count < (int) share->all_link_count;
3769 roop_count++)
3770 {
3771 if (
3772 share->monitoring_bg_kind[roop_count] &&
3773 share->bg_mon_thds[roop_count]
3774 ) {
3775 share->bg_mon_thds[roop_count]->killed = SPIDER_THD_KILL_CONNECTION;
3776 }
3777 }
3778 for (roop_count = 0; roop_count < (int) share->all_link_count;
3779 roop_count++)
3780 {
3781 if (share->monitoring_bg_kind[roop_count])
3782 pthread_mutex_lock(&share->bg_mon_mutexes[roop_count]);
3783 }
3784 share->bg_mon_kill = TRUE;
3785 for (roop_count = 0; roop_count < (int) share->all_link_count;
3786 roop_count++)
3787 {
3788 if (share->monitoring_bg_kind[roop_count])
3789 {
3790 pthread_cond_signal(&share->bg_mon_sleep_conds[roop_count]);
3791 pthread_cond_wait(&share->bg_mon_conds[roop_count],
3792 &share->bg_mon_mutexes[roop_count]);
3793 pthread_mutex_unlock(&share->bg_mon_mutexes[roop_count]);
3794 pthread_join(share->bg_mon_threads[roop_count], NULL);
3795 pthread_cond_destroy(&share->bg_mon_conds[roop_count]);
3796 pthread_cond_destroy(&share->bg_mon_sleep_conds[roop_count]);
3797 pthread_mutex_destroy(&share->bg_mon_mutexes[roop_count]);
3798 }
3799 }
3800 spider_free(spider_current_trx, share->bg_mon_thds, MYF(0));
3801 share->bg_mon_kill = FALSE;
3802 share->bg_mon_init = FALSE;
3803 }
3804 DBUG_VOID_RETURN;
3805}
3806
3807void *spider_bg_mon_action(
3808 void *arg
3809) {
3810 SPIDER_LINK_PACK *link_pack = (SPIDER_LINK_PACK*) arg;
3811 SPIDER_SHARE *share = link_pack->share;
3812 SPIDER_TRX *trx;
3813 int error_num, link_idx = link_pack->link_idx;
3814 THD *thd;
3815 my_thread_init();
3816 DBUG_ENTER("spider_bg_mon_action");
3817 /* init start */
3818 pthread_mutex_lock(&share->bg_mon_mutexes[link_idx]);
3819 if (!(thd = SPIDER_new_THD(next_thread_id())))
3820 {
3821 share->bg_mon_kill = FALSE;
3822 share->bg_mon_init = FALSE;
3823 pthread_cond_signal(&share->bg_mon_conds[link_idx]);
3824 pthread_mutex_unlock(&share->bg_mon_mutexes[link_idx]);
3825 my_thread_end();
3826 DBUG_RETURN(NULL);
3827 }
3828 SPIDER_set_next_thread_id(thd);
3829#ifdef HAVE_PSI_INTERFACE
3830 mysql_thread_set_psi_id(thd->thread_id);
3831#endif
3832 thd->thread_stack = (char*) &thd;
3833 thd->store_globals();
3834 if (!(trx = spider_get_trx(thd, FALSE, &error_num)))
3835 {
3836 delete thd;
3837 share->bg_mon_kill = FALSE;
3838 share->bg_mon_init = FALSE;
3839 pthread_cond_signal(&share->bg_mon_conds[link_idx]);
3840 pthread_mutex_unlock(&share->bg_mon_mutexes[link_idx]);
3841#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
3842 my_pthread_setspecific_ptr(THR_THD, NULL);
3843#endif
3844 my_thread_end();
3845 DBUG_RETURN(NULL);
3846 }
3847 share->bg_mon_thds[link_idx] = thd;
3848 pthread_cond_signal(&share->bg_mon_conds[link_idx]);
3849/*
3850 pthread_mutex_unlock(&share->bg_mon_mutexes[link_idx]);
3851*/
3852 /* init end */
3853
3854 while (TRUE)
3855 {
3856 DBUG_PRINT("info",("spider bg mon sleep %lld",
3857 share->monitoring_bg_interval[link_idx]));
3858 if (!share->bg_mon_kill)
3859 {
3860 struct timespec abstime;
3861 set_timespec_nsec(abstime,
3862 share->monitoring_bg_interval[link_idx] * 1000);
3863 pthread_cond_timedwait(&share->bg_mon_sleep_conds[link_idx],
3864 &share->bg_mon_mutexes[link_idx], &abstime);
3865/*
3866 my_sleep((ulong) share->monitoring_bg_interval[link_idx]);
3867*/
3868 }
3869 DBUG_PRINT("info",("spider bg mon roop start"));
3870 if (share->bg_mon_kill)
3871 {
3872 DBUG_PRINT("info",("spider bg mon kill start"));
3873/*
3874 pthread_mutex_lock(&share->bg_mon_mutexes[link_idx]);
3875*/
3876 pthread_cond_signal(&share->bg_mon_conds[link_idx]);
3877 pthread_mutex_unlock(&share->bg_mon_mutexes[link_idx]);
3878 spider_free_trx(trx, TRUE);
3879 delete thd;
3880#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
3881 my_pthread_setspecific_ptr(THR_THD, NULL);
3882#endif
3883 my_thread_end();
3884 DBUG_RETURN(NULL);
3885 }
3886 if (share->monitoring_bg_kind[link_idx])
3887 {
3888 lex_start(thd);
3889 error_num = spider_ping_table_mon_from_table(
3890 trx,
3891 thd,
3892 share,
3893 link_idx,
3894 (uint32) share->monitoring_sid[link_idx],
3895 share->table_name,
3896 share->table_name_length,
3897 link_idx,
3898 NULL,
3899 0,
3900 share->monitoring_bg_kind[link_idx],
3901 share->monitoring_limit[link_idx],
3902 share->monitoring_bg_flag[link_idx],
3903 TRUE
3904 );
3905 lex_end(thd->lex);
3906 }
3907 }
3908}
3909#endif
3910
3911int spider_conn_first_link_idx(
3912 THD *thd,
3913 long *link_statuses,
3914 long *access_balances,
3915 uint *conn_link_idx,
3916 int link_count,
3917 int link_status
3918) {
3919 int roop_count, active_links = 0;
3920 longlong balance_total = 0, balance_val;
3921 double rand_val;
3922 int *link_idxs, link_idx;
3923 long *balances;
3924 DBUG_ENTER("spider_conn_first_link_idx");
3925 char *ptr;
3926 ptr = (char *) my_alloca((sizeof(int) * link_count) + (sizeof(long) * link_count));
3927 if (!ptr)
3928 {
3929 DBUG_PRINT("info",("spider out of memory"));
3930 DBUG_RETURN(-2);
3931 }
3932 link_idxs = (int *) ptr;
3933 ptr += sizeof(int) * link_count;
3934 balances = (long *) ptr;
3935 for (roop_count = 0; roop_count < link_count; roop_count++)
3936 {
3937 DBUG_ASSERT((conn_link_idx[roop_count] - roop_count) % link_count == 0);
3938 if (link_statuses[conn_link_idx[roop_count]] <= link_status)
3939 {
3940 link_idxs[active_links] = roop_count;
3941 balances[active_links] = access_balances[roop_count];
3942 balance_total += access_balances[roop_count];
3943 active_links++;
3944 }
3945 }
3946
3947 if (active_links == 0)
3948 {
3949 DBUG_PRINT("info",("spider all links are failed"));
3950 my_afree(link_idxs);
3951 DBUG_RETURN(-1);
3952 }
3953#if defined(MARIADB_BASE_VERSION) && MYSQL_VERSION_ID >= 100002
3954 DBUG_PRINT("info",("spider server_id=%lu", thd->variables.server_id));
3955#else
3956 DBUG_PRINT("info",("spider server_id=%u", thd->server_id));
3957#endif
3958 DBUG_PRINT("info",("spider thread_id=%lu", thd_get_thread_id(thd)));
3959#if defined(MARIADB_BASE_VERSION) && MYSQL_VERSION_ID >= 100002
3960 rand_val = spider_rand(thd->variables.server_id + thd_get_thread_id(thd));
3961#else
3962 rand_val = spider_rand(thd->server_id + thd_get_thread_id(thd));
3963#endif
3964 DBUG_PRINT("info",("spider rand_val=%f", rand_val));
3965 balance_val = (longlong) (rand_val * balance_total);
3966 DBUG_PRINT("info",("spider balance_val=%lld", balance_val));
3967 for (roop_count = 0; roop_count < active_links - 1; roop_count++)
3968 {
3969 DBUG_PRINT("info",("spider balances[%d]=%ld",
3970 roop_count, balances[roop_count]));
3971 if (balance_val < balances[roop_count])
3972 break;
3973 balance_val -= balances[roop_count];
3974 }
3975
3976 DBUG_PRINT("info",("spider first link_idx=%d", link_idxs[roop_count]));
3977 link_idx = link_idxs[roop_count];
3978 my_afree(link_idxs);
3979 DBUG_RETURN(link_idx);
3980}
3981
3982int spider_conn_next_link_idx(
3983 THD *thd,
3984 long *link_statuses,
3985 long *access_balances,
3986 uint *conn_link_idx,
3987 int link_idx,
3988 int link_count,
3989 int link_status
3990) {
3991 int tmp_link_idx;
3992 DBUG_ENTER("spider_conn_next_link_idx");
3993 DBUG_ASSERT((conn_link_idx[link_idx] - link_idx) % link_count == 0);
3994 tmp_link_idx = spider_conn_first_link_idx(thd, link_statuses,
3995 access_balances, conn_link_idx, link_count, link_status);
3996 if (
3997 tmp_link_idx >= 0 &&
3998 tmp_link_idx == link_idx
3999 ) {
4000 do {
4001 tmp_link_idx++;
4002 if (tmp_link_idx >= link_count)
4003 tmp_link_idx = 0;
4004 if (tmp_link_idx == link_idx)
4005 break;
4006 } while (link_statuses[conn_link_idx[tmp_link_idx]] > link_status);
4007 DBUG_PRINT("info",("spider next link_idx=%d", tmp_link_idx));
4008 DBUG_RETURN(tmp_link_idx);
4009 }
4010 DBUG_PRINT("info",("spider next link_idx=%d", tmp_link_idx));
4011 DBUG_RETURN(tmp_link_idx);
4012}
4013
4014int spider_conn_link_idx_next(
4015 long *link_statuses,
4016 uint *conn_link_idx,
4017 int link_idx,
4018 int link_count,
4019 int link_status
4020) {
4021 DBUG_ENTER("spider_conn_link_idx_next");
4022 do {
4023 link_idx++;
4024 if (link_idx >= link_count)
4025 break;
4026 DBUG_ASSERT((conn_link_idx[link_idx] - link_idx) % link_count == 0);
4027 } while (link_statuses[conn_link_idx[link_idx]] > link_status);
4028 DBUG_PRINT("info",("spider link_idx=%d", link_idx));
4029 DBUG_RETURN(link_idx);
4030}
4031
4032int spider_conn_get_link_status(
4033 long *link_statuses,
4034 uint *conn_link_idx,
4035 int link_idx
4036) {
4037 DBUG_ENTER("spider_conn_get_link_status");
4038 DBUG_PRINT("info",("spider link_status=%d",
4039 (int) link_statuses[conn_link_idx[link_idx]]));
4040 DBUG_RETURN((int) link_statuses[conn_link_idx[link_idx]]);
4041}
4042
4043int spider_conn_lock_mode(
4044 ha_spider *spider
4045) {
4046 SPIDER_RESULT_LIST *result_list = &spider->result_list;
4047 DBUG_ENTER("spider_conn_lock_mode");
4048 if (result_list->lock_type == F_WRLCK || spider->lock_mode == 2)
4049 DBUG_RETURN(SPIDER_LOCK_MODE_EXCLUSIVE);
4050 else if (spider->lock_mode == 1)
4051 DBUG_RETURN(SPIDER_LOCK_MODE_SHARED);
4052 DBUG_RETURN(SPIDER_LOCK_MODE_NO_LOCK);
4053}
4054
4055bool spider_conn_check_recovery_link(
4056 SPIDER_SHARE *share
4057) {
4058 int roop_count;
4059 DBUG_ENTER("spider_check_recovery_link");
4060 for (roop_count = 0; roop_count < (int) share->link_count; roop_count++)
4061 {
4062 if (share->link_statuses[roop_count] == SPIDER_LINK_STATUS_RECOVERY)
4063 DBUG_RETURN(TRUE);
4064 }
4065 DBUG_RETURN(FALSE);
4066}
4067
4068bool spider_conn_use_handler(
4069 ha_spider *spider,
4070 int lock_mode,
4071 int link_idx
4072) {
4073 THD *thd = spider->trx->thd;
4074 int use_handler = spider_param_use_handler(thd,
4075 spider->share->use_handlers[link_idx]);
4076 DBUG_ENTER("spider_conn_use_handler");
4077 DBUG_PRINT("info",("spider use_handler=%d", use_handler));
4078 DBUG_PRINT("info",("spider spider->conn_kind[link_idx]=%u",
4079 spider->conn_kind[link_idx]));
4080#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
4081 if (spider->conn_kind[link_idx] != SPIDER_CONN_KIND_MYSQL)
4082 {
4083 DBUG_PRINT("info",("spider TRUE by HS"));
4084 spider->sql_kinds |= SPIDER_SQL_KIND_HS;
4085 spider->sql_kind[link_idx] = SPIDER_SQL_KIND_HS;
4086#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
4087 if (
4088 spider->do_direct_update &&
4089 spider_bit_is_set(spider->do_hs_direct_update, link_idx)
4090 ) {
4091 DBUG_PRINT("info",("spider using HS direct_update"));
4092 spider->direct_update_kinds |= SPIDER_SQL_KIND_HS;
4093 }
4094#endif
4095 DBUG_RETURN(TRUE);
4096 }
4097#endif
4098#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
4099 if (spider->do_direct_update)
4100 {
4101 spider->sql_kinds |= SPIDER_SQL_KIND_SQL;
4102 spider->sql_kind[link_idx] = SPIDER_SQL_KIND_SQL;
4103#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
4104 if (spider_bit_is_set(spider->do_hs_direct_update, link_idx))
4105 {
4106 spider->direct_update_kinds |= SPIDER_SQL_KIND_HS;
4107 DBUG_PRINT("info",("spider TRUE by using HS direct_update"));
4108 DBUG_RETURN(TRUE);
4109 } else {
4110#endif
4111 spider->direct_update_kinds |= SPIDER_SQL_KIND_SQL;
4112#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
4113 }
4114 if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_MYSQL)
4115 {
4116#endif
4117 DBUG_PRINT("info",("spider FALSE by using direct_update"));
4118 DBUG_RETURN(FALSE);
4119#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
4120 } else {
4121 DBUG_PRINT("info",("spider TRUE by using BOTH"));
4122 DBUG_RETURN(TRUE);
4123 }
4124#endif
4125 }
4126#endif
4127 if (spider->use_spatial_index)
4128 {
4129 DBUG_PRINT("info",("spider FALSE by use_spatial_index"));
4130 spider->sql_kinds |= SPIDER_SQL_KIND_SQL;
4131 spider->sql_kind[link_idx] = SPIDER_SQL_KIND_SQL;
4132 DBUG_RETURN(FALSE);
4133 }
4134 uint dbton_id;
4135 spider_db_handler *dbton_hdl;
4136 dbton_id = spider->share->sql_dbton_ids[spider->conn_link_idx[link_idx]];
4137 dbton_hdl = spider->dbton_handler[dbton_id];
4138 if (!dbton_hdl->support_use_handler(use_handler))
4139 {
4140 DBUG_PRINT("info",("spider FALSE by dbton"));
4141 spider->sql_kinds |= SPIDER_SQL_KIND_SQL;
4142 spider->sql_kind[link_idx] = SPIDER_SQL_KIND_SQL;
4143 DBUG_RETURN(FALSE);
4144 }
4145 if (
4146 spider->sql_command == SQLCOM_HA_READ &&
4147 (
4148 !(use_handler & 2) ||
4149 (
4150 spider_param_sync_trx_isolation(thd) &&
4151 thd_tx_isolation(thd) == ISO_SERIALIZABLE
4152 )
4153 )
4154 ) {
4155 DBUG_PRINT("info",("spider TRUE by HA"));
4156 spider->sql_kinds |= SPIDER_SQL_KIND_HANDLER;
4157 spider->sql_kind[link_idx] = SPIDER_SQL_KIND_HANDLER;
4158 DBUG_RETURN(TRUE);
4159 }
4160 if (
4161 spider->sql_command != SQLCOM_HA_READ &&
4162 lock_mode == SPIDER_LOCK_MODE_NO_LOCK &&
4163 spider_param_sync_trx_isolation(thd) &&
4164 thd_tx_isolation(thd) != ISO_SERIALIZABLE &&
4165 (use_handler & 1)
4166 ) {
4167 DBUG_PRINT("info",("spider TRUE by PARAM"));
4168 spider->sql_kinds |= SPIDER_SQL_KIND_HANDLER;
4169 spider->sql_kind[link_idx] = SPIDER_SQL_KIND_HANDLER;
4170 DBUG_RETURN(TRUE);
4171 }
4172 spider->sql_kinds |= SPIDER_SQL_KIND_SQL;
4173 spider->sql_kind[link_idx] = SPIDER_SQL_KIND_SQL;
4174 DBUG_RETURN(FALSE);
4175}
4176
4177bool spider_conn_need_open_handler(
4178 ha_spider *spider,
4179 uint idx,
4180 int link_idx
4181) {
4182#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
4183 SPIDER_CONN *conn;
4184#endif
4185 DBUG_ENTER("spider_conn_need_open_handler");
4186 DBUG_PRINT("info",("spider spider=%p", spider));
4187 if (spider->handler_opened(link_idx, spider->conn_kind[link_idx]))
4188 {
4189#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
4190#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
4191 if (
4192 spider->do_direct_update &&
4193 spider_bit_is_set(spider->do_hs_direct_update, link_idx)
4194 ) {
4195 conn = spider->hs_w_conns[link_idx];
4196 if (
4197 !conn->server_lost &&
4198 conn->hs_pre_age == spider->hs_w_conn_ages[link_idx]
4199 ) {
4200 DBUG_PRINT("info",("spider hs_write is already opened"));
4201 DBUG_RETURN(FALSE);
4202 }
4203 } else
4204#endif
4205 if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_MYSQL)
4206 {
4207#endif
4208 DBUG_PRINT("info",("spider HA already opened"));
4209 DBUG_RETURN(FALSE);
4210#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
4211 } else if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_HS_READ)
4212 {
4213 DBUG_PRINT("info",("spider r_handler_index[%d]=%d",
4214 link_idx, spider->r_handler_index[link_idx]));
4215 DBUG_PRINT("info",("spider idx=%d", idx));
4216 DBUG_PRINT("info",("spider hs_pushed_ret_fields_num=%zu",
4217 spider->hs_pushed_ret_fields_num));
4218 DBUG_PRINT("info",("spider hs_r_ret_fields_num[%d]=%lu",
4219 link_idx, spider->hs_r_ret_fields_num[link_idx]));
4220 DBUG_PRINT("info",("spider hs_r_ret_fields[%d]=%p",
4221 link_idx, spider->hs_r_ret_fields[link_idx]));
4222#ifndef DBUG_OFF
4223 if (
4224 spider->hs_pushed_ret_fields_num < MAX_FIELDS &&
4225 spider->hs_r_ret_fields[link_idx] &&
4226 spider->hs_pushed_ret_fields_num ==
4227 spider->hs_r_ret_fields_num[link_idx]
4228 ) {
4229 int roop_count;
4230 for (roop_count = 0; roop_count < (int) spider->hs_pushed_ret_fields_num;
4231 ++roop_count)
4232 {
4233 DBUG_PRINT("info",("spider hs_pushed_ret_fields[%d]=%u",
4234 roop_count, spider->hs_pushed_ret_fields[roop_count]));
4235 DBUG_PRINT("info",("spider hs_r_ret_fields[%d][%d]=%u",
4236 link_idx, roop_count,
4237 spider->hs_r_ret_fields[link_idx][roop_count]));
4238 }
4239 }
4240#endif
4241 if (
4242 spider->r_handler_index[link_idx] == idx
4243#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
4244 && (
4245 (
4246 spider->hs_pushed_ret_fields_num == MAX_FIELDS &&
4247 spider->hs_r_ret_fields_num[link_idx] == MAX_FIELDS
4248 ) ||
4249 (
4250 spider->hs_pushed_ret_fields_num < MAX_FIELDS &&
4251 spider->hs_r_ret_fields[link_idx] &&
4252 spider->hs_pushed_ret_fields_num ==
4253 spider->hs_r_ret_fields_num[link_idx] &&
4254 !memcmp(spider->hs_pushed_ret_fields,
4255 spider->hs_r_ret_fields[link_idx],
4256 sizeof(uint32) * spider->hs_pushed_ret_fields_num)
4257 )
4258 )
4259#endif
4260 ) {
4261 SPIDER_CONN *conn = spider->hs_r_conns[link_idx];
4262 DBUG_PRINT("info",("spider conn=%p", conn));
4263 DBUG_PRINT("info",("spider conn->conn_id=%llu", conn->conn_id));
4264 DBUG_PRINT("info",("spider conn->connection_id=%llu",
4265 conn->connection_id));
4266 DBUG_PRINT("info",("spider conn->server_lost=%s",
4267 conn->server_lost ? "TRUE" : "FALSE"));
4268 DBUG_PRINT("info",("spider conn->hs_pre_age=%llu", conn->hs_pre_age));
4269 DBUG_PRINT("info",("spider hs_w_conn_ages[%d]=%llu",
4270 link_idx, spider->hs_w_conn_ages[link_idx]));
4271 if (
4272 !conn->server_lost &&
4273 conn->hs_pre_age == spider->hs_r_conn_ages[link_idx]
4274 ) {
4275 DBUG_PRINT("info",("spider hs_r same idx"));
4276 DBUG_RETURN(FALSE);
4277 }
4278 }
4279 } else if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_HS_WRITE)
4280 {
4281 DBUG_PRINT("info",("spider w_handler_index[%d]=%d",
4282 link_idx, spider->w_handler_index[link_idx]));
4283 DBUG_PRINT("info",("spider idx=%d", idx));
4284 DBUG_PRINT("info",("spider hs_pushed_ret_fields_num=%zu",
4285 spider->hs_pushed_ret_fields_num));
4286 DBUG_PRINT("info",("spider hs_w_ret_fields_num[%d]=%lu",
4287 link_idx, spider->hs_w_ret_fields_num[link_idx]));
4288 DBUG_PRINT("info",("spider hs_w_ret_fields[%d]=%p",
4289 link_idx, spider->hs_w_ret_fields[link_idx]));
4290#ifndef DBUG_OFF
4291 if (
4292 spider->hs_pushed_ret_fields_num < MAX_FIELDS &&
4293 spider->hs_w_ret_fields[link_idx] &&
4294 spider->hs_pushed_ret_fields_num ==
4295 spider->hs_w_ret_fields_num[link_idx]
4296 ) {
4297 int roop_count;
4298 for (roop_count = 0; roop_count < (int) spider->hs_pushed_ret_fields_num;
4299 ++roop_count)
4300 {
4301 DBUG_PRINT("info",("spider hs_pushed_ret_fields[%d]=%u",
4302 roop_count, spider->hs_pushed_ret_fields[roop_count]));
4303 DBUG_PRINT("info",("spider hs_w_ret_fields[%d][%d]=%u",
4304 link_idx, roop_count,
4305 spider->hs_w_ret_fields[link_idx][roop_count]));
4306 }
4307 }
4308#endif
4309 if (
4310 spider->w_handler_index[link_idx] == idx
4311#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
4312 && (
4313 (
4314 spider->hs_pushed_ret_fields_num == MAX_FIELDS &&
4315 spider->hs_w_ret_fields_num[link_idx] == MAX_FIELDS
4316 ) ||
4317 (
4318 spider->hs_pushed_ret_fields_num < MAX_FIELDS &&
4319 spider->hs_w_ret_fields[link_idx] &&
4320 spider->hs_pushed_ret_fields_num ==
4321 spider->hs_w_ret_fields_num[link_idx] &&
4322 !memcmp(spider->hs_pushed_ret_fields,
4323 spider->hs_w_ret_fields[link_idx],
4324 sizeof(uint32) * spider->hs_pushed_ret_fields_num)
4325 )
4326 )
4327#endif
4328 ) {
4329 SPIDER_CONN *conn = spider->hs_w_conns[link_idx];
4330 DBUG_PRINT("info",("spider conn=%p", conn));
4331 DBUG_PRINT("info",("spider conn->conn_id=%llu", conn->conn_id));
4332 DBUG_PRINT("info",("spider conn->connection_id=%llu",
4333 conn->connection_id));
4334 DBUG_PRINT("info",("spider conn->server_lost=%s",
4335 conn->server_lost ? "TRUE" : "FALSE"));
4336 DBUG_PRINT("info",("spider conn->hs_pre_age=%llu", conn->hs_pre_age));
4337 DBUG_PRINT("info",("spider hs_w_conn_ages[%d]=%llu",
4338 link_idx, spider->hs_w_conn_ages[link_idx]));
4339 if (
4340 !conn->server_lost &&
4341 conn->hs_pre_age == spider->hs_w_conn_ages[link_idx]
4342 ) {
4343 DBUG_PRINT("info",("spider hs_w same idx"));
4344 DBUG_RETURN(FALSE);
4345 }
4346 }
4347 }
4348#endif
4349 }
4350 DBUG_RETURN(TRUE);
4351}
4352
4353SPIDER_CONN* spider_get_conn_from_idle_connection(
4354 SPIDER_SHARE *share,
4355 int link_idx,
4356 char *conn_key,
4357 ha_spider *spider,
4358 uint conn_kind,
4359 int base_link_idx,
4360 int *error_num
4361 )
4362{
4363 DBUG_ENTER("spider_get_conn_from_idle_connection");
4364 SPIDER_IP_PORT_CONN *ip_port_conn;
4365 SPIDER_CONN *conn = NULL;
4366 uint spider_max_connections = spider_param_max_connections();
4367 struct timespec abstime;
4368 ulonglong start, inter_val = 0;
4369 longlong last_ntime = 0;
4370 ulonglong wait_time = (ulonglong)spider_param_conn_wait_timeout()*1000*1000*1000; // default 10s
4371
4372 unsigned long ip_port_count = 0; // init 0
4373
4374 set_timespec(abstime, 0);
4375
4376 pthread_mutex_lock(&spider_ipport_conn_mutex);
4377#ifdef SPIDER_HAS_HASH_VALUE_TYPE
4378 if ((ip_port_conn = (SPIDER_IP_PORT_CONN*) my_hash_search_using_hash_value(
4379 &spider_ipport_conns, share->conn_keys_hash_value[link_idx],
4380 (uchar*) share->conn_keys[link_idx], share->conn_keys_lengths[link_idx])))
4381#else
4382 if ((ip_port_conn = (SPIDER_IP_PORT_CONN*) my_hash_search(
4383 &spider_ipport_conns,
4384 (uchar*) share->conn_keys[link_idx], share->conn_keys_lengths[link_idx])))
4385#endif
4386 { /* exists */
4387 pthread_mutex_unlock(&spider_ipport_conn_mutex);
4388 pthread_mutex_lock(&ip_port_conn->mutex);
4389 ip_port_count = ip_port_conn->ip_port_count;
4390 } else {
4391 pthread_mutex_unlock(&spider_ipport_conn_mutex);
4392 }
4393
4394 if (
4395 ip_port_conn &&
4396 ip_port_count >= spider_max_connections &&
4397 spider_max_connections > 0
4398 ) { /* no idle conn && enable connection pool, wait */
4399 pthread_mutex_unlock(&ip_port_conn->mutex);
4400 start = my_hrtime().val;
4401 while(1)
4402 {
4403 int error;
4404 inter_val = my_hrtime().val - start; // us
4405 last_ntime = wait_time - inter_val*1000; // *1000, to ns
4406 if(last_ntime <= 0)
4407 {/* wait timeout */
4408 *error_num = ER_SPIDER_CON_COUNT_ERROR;
4409 DBUG_RETURN(NULL);
4410 }
4411 set_timespec_nsec(abstime, last_ntime);
4412 pthread_mutex_lock(&ip_port_conn->mutex);
4413 ++ip_port_conn->waiting_count;
4414 error = pthread_cond_timedwait(&ip_port_conn->cond, &ip_port_conn->mutex, &abstime);
4415 --ip_port_conn->waiting_count;
4416 pthread_mutex_unlock(&ip_port_conn->mutex);
4417 if (error == ETIMEDOUT || error == ETIME || error != 0 )
4418 {
4419 *error_num = ER_SPIDER_CON_COUNT_ERROR;
4420 DBUG_RETURN(NULL);
4421 }
4422
4423 pthread_mutex_lock(&spider_conn_mutex);
4424#ifdef SPIDER_HAS_HASH_VALUE_TYPE
4425 if ((conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
4426 &spider_open_connections, share->conn_keys_hash_value[link_idx],
4427 (uchar*) share->conn_keys[link_idx],
4428 share->conn_keys_lengths[link_idx])))
4429#else
4430 if ((conn = (SPIDER_CONN*) my_hash_search(&spider_open_connections,
4431 (uchar*) share->conn_keys[link_idx],
4432 share->conn_keys_lengths[link_idx])))
4433#endif
4434 {
4435 /* get conn from spider_open_connections, then delete conn in spider_open_connections */
4436#ifdef HASH_UPDATE_WITH_HASH_VALUE
4437 my_hash_delete_with_hash_value(&spider_open_connections,
4438 conn->conn_key_hash_value, (uchar*) conn);
4439#else
4440 my_hash_delete(&spider_open_connections, (uchar*) conn);
4441#endif
4442 pthread_mutex_unlock(&spider_conn_mutex);
4443 DBUG_PRINT("info",("spider get global conn"));
4444 if (spider)
4445 {
4446 spider->conns[base_link_idx] = conn;
4447 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
4448 conn->use_for_active_standby = TRUE;
4449 }
4450 DBUG_RETURN(conn);
4451 }
4452 else
4453 {
4454 pthread_mutex_unlock(&spider_conn_mutex);
4455 }
4456 }
4457 }
4458 else
4459 { /* create conn */
4460 if (ip_port_conn)
4461 pthread_mutex_unlock(&ip_port_conn->mutex);
4462 DBUG_PRINT("info",("spider create new conn"));
4463 if (!(conn = spider_create_conn(share, spider, link_idx, base_link_idx, conn_kind, error_num)))
4464 DBUG_RETURN(conn);
4465 *conn->conn_key = *conn_key;
4466 if (spider)
4467 {
4468 spider->conns[base_link_idx] = conn;
4469 if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
4470 conn->use_for_active_standby = TRUE;
4471 }
4472 }
4473
4474 DBUG_RETURN(conn);
4475}
4476
4477
4478SPIDER_IP_PORT_CONN* spider_create_ipport_conn(SPIDER_CONN *conn)
4479{
4480 DBUG_ENTER("spider_create_ipport_conn");
4481 if (conn)
4482 {
4483 SPIDER_IP_PORT_CONN *ret = (SPIDER_IP_PORT_CONN *) my_malloc(sizeof(*ret), MY_ZEROFILL | MY_WME);
4484 if (!ret)
4485 {
4486 goto err_return_direct;
4487 }
4488
4489#if MYSQL_VERSION_ID < 50500
4490 if (pthread_mutex_init(&ret->mutex, MY_MUTEX_INIT_FAST))
4491#else
4492 if (mysql_mutex_init(spd_key_mutex_conn_i, &ret->mutex, MY_MUTEX_INIT_FAST))
4493#endif
4494 {
4495 //error
4496 goto err_malloc_key;
4497 }
4498
4499#if MYSQL_VERSION_ID < 50500
4500 if (pthread_cond_init(&ret->cond, NULL))
4501#else
4502 if (mysql_cond_init(spd_key_cond_conn_i, &ret->cond, NULL))
4503#endif
4504 {
4505 pthread_mutex_destroy(&ret->mutex);
4506 goto err_malloc_key;
4507 //error
4508 }
4509
4510 ret->key_len = conn->conn_key_length;
4511 if (ret->key_len <= 0) {
4512 pthread_cond_destroy(&ret->cond);
4513 pthread_mutex_destroy(&ret->mutex);
4514 goto err_malloc_key;
4515 }
4516
4517 ret->key = (char *) my_malloc(ret->key_len, MY_ZEROFILL | MY_WME);
4518 if (!ret->key) {
4519 pthread_cond_destroy(&ret->cond);
4520 pthread_mutex_destroy(&ret->mutex);
4521 goto err_malloc_key;
4522 }
4523
4524 memcpy(ret->key, conn->conn_key, ret->key_len);
4525
4526 strncpy(ret->remote_ip_str, conn->tgt_host, sizeof(ret->remote_ip_str));
4527 ret->remote_port = conn->tgt_port;
4528 ret->conn_id = conn->conn_id;
4529 ret->ip_port_count = 1; // init
4530
4531#ifdef SPIDER_HAS_HASH_VALUE_TYPE
4532 ret->key_hash_value = conn->conn_key_hash_value;
4533#endif
4534 DBUG_RETURN(ret);
4535err_malloc_key:
4536 spider_my_free(ret, MYF(0));
4537err_return_direct:
4538 DBUG_RETURN(NULL);
4539 }
4540 DBUG_RETURN(NULL);
4541}
4542
4543
4544void spider_free_ipport_conn(void *info)
4545{
4546 DBUG_ENTER("spider_free_ipport_conn");
4547 if (info)
4548 {
4549 SPIDER_IP_PORT_CONN *p = (SPIDER_IP_PORT_CONN *)info;
4550 pthread_cond_destroy(&p->cond);
4551 pthread_mutex_destroy(&p->mutex);
4552 spider_my_free(p->key, MYF(0));
4553 spider_my_free(p, MYF(0));
4554 }
4555 DBUG_VOID_RETURN;
4556}
4557