1 | /* Copyright (C) 2012 Monty Program Ab |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License |
13 | along with this program; if not, write to the Free Software |
14 | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ |
15 | |
16 | #include "mariadb.h" |
17 | #include <violite.h> |
18 | #include <sql_priv.h> |
19 | #include <sql_class.h> |
20 | #include <my_pthread.h> |
21 | #include <scheduler.h> |
22 | #include <sql_connect.h> |
23 | #include <sql_audit.h> |
24 | #include <debug_sync.h> |
25 | #include <threadpool.h> |
26 | |
27 | |
/*
  Threadpool parameters.

  These are only defined here; nothing in this file assigns them
  (presumably they are bound to server system variables -- TODO confirm).
*/

uint threadpool_min_threads;
uint threadpool_idle_timeout;
uint threadpool_size;
uint threadpool_max_size;
uint threadpool_stall_limit;
/* Copied into scheduler_functions::max_threads by pool_of_threads_scheduler(). */
uint threadpool_max_threads;
uint threadpool_oversubscribe;
/* On _WIN32 builds tp_init() compares this against TP_MODE_WINDOWS to pick the pool class. */
uint threadpool_mode;
uint threadpool_prio_kickup_timer;

/* Stats (not updated anywhere in this part of the file). */
TP_STATISTICS tp_stats;
42 | |
43 | |
/* Connection lifecycle helpers; their definitions appear further down. */
static void threadpool_remove_connection(THD *thd);
static int threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);

/* Thread-specific key under which the current st_my_thread_var pointer is kept. */
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
/* Declared here and called from threadpool_process_request(). */
extern bool do_command(THD*);
50 | |
51 | static inline TP_connection *get_TP_connection(THD *thd) |
52 | { |
53 | return (TP_connection *)thd->event_scheduler.data; |
54 | } |
55 | |
56 | /* |
57 | Worker threads contexts, and THD contexts. |
58 | ========================================= |
59 | |
  Both worker threads and connections have their own sets of thread-local
  variables. At the moment these are mysys_var (which holds specific data for
  dbug, my_error and similar goodies) and the PSI per-client structure.
63 | |
  Whenever a query is executed, the following needs to be done:

  1. Save worker thread context.
  2. Change TLS variables to connection-specific ones using thread_attach(THD*).
     This function does some additional work, e.g. setting up
     thread_stack/thread_ends_here pointers.
70 | 3. Process query |
71 | 4. Restore worker thread context. |
72 | |
73 | Connection login and termination follows similar schema w.r.t saving and |
74 | restoring contexts. |
75 | |
76 | For both worker thread, and for the connection, mysys variables are created |
77 | using my_thread_init() and freed with my_thread_end(). |
78 | |
79 | */ |
80 | struct Worker_thread_context |
81 | { |
82 | PSI_thread *psi_thread; |
83 | st_my_thread_var* mysys_var; |
84 | |
85 | void save() |
86 | { |
87 | psi_thread = PSI_CALL_get_thread(); |
88 | mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys); |
89 | } |
90 | |
91 | void restore() |
92 | { |
93 | PSI_CALL_set_thread(psi_thread); |
94 | pthread_setspecific(THR_KEY_mysys,mysys_var); |
95 | pthread_setspecific(THR_THD, 0); |
96 | } |
97 | }; |
98 | |
99 | |
#ifdef HAVE_PSI_INTERFACE

/*
  The following fixes PSI "idle" instrumentation.
  The server assumes that a connection becomes idle
  just before net_read_packet() and switches to active after it.
  In our setup, the server becomes idle when async socket io is made.
*/

/* Called explicitly by set_thd_idle() when a connection goes idle. */
extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);

/* No-op stand-in installed as the "before header" hook of the connection. */
static void dummy_before_header(struct st_net *, void *, size_t)
{
}

/*
  Replace the THD's m_before_header hook with the no-op above.

  @param thd  connection whose net server extension is patched
*/
static void re_init_net_server_extension(THD *thd)
{
  thd->m_net_server_extension.m_before_header = dummy_before_header;
}

#else

/* Without PSI there is no hook to patch; the call expands to nothing. */
#define re_init_net_server_extension(thd)

#endif /* HAVE_PSI_INTERFACE */
125 | |
126 | |
127 | static inline void set_thd_idle(THD *thd) |
128 | { |
129 | thd->net.reading_or_writing= 1; |
130 | #ifdef HAVE_PSI_INTERFACE |
131 | net_before_header_psi(&thd->net, thd, 0); |
132 | #endif |
133 | } |
134 | |
135 | /* |
136 | Attach/associate the connection with the OS thread, |
137 | */ |
138 | static void thread_attach(THD* thd) |
139 | { |
140 | pthread_setspecific(THR_KEY_mysys,thd->mysys_var); |
141 | thd->thread_stack=(char*)&thd; |
142 | thd->store_globals(); |
143 | PSI_CALL_set_thread(thd->event_scheduler.m_psi); |
144 | mysql_socket_set_thread_owner(thd->net.vio->mysql_socket); |
145 | } |
146 | |
147 | /* |
148 | Determine connection priority , using current |
149 | transaction state and 'threadpool_priority' variable value. |
150 | */ |
151 | static TP_PRIORITY get_priority(TP_connection *c) |
152 | { |
153 | DBUG_ASSERT(c->thd == current_thd); |
154 | TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority; |
155 | if (prio == TP_PRIORITY_AUTO) |
156 | { |
157 | return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW; |
158 | } |
159 | return prio; |
160 | } |
161 | |
162 | |
163 | void tp_callback(TP_connection *c) |
164 | { |
165 | DBUG_ASSERT(c); |
166 | |
167 | Worker_thread_context worker_context; |
168 | worker_context.save(); |
169 | |
170 | THD *thd= c->thd; |
171 | |
172 | c->state = TP_STATE_RUNNING; |
173 | |
174 | if (unlikely(!thd)) |
175 | { |
176 | /* No THD, need to login first. */ |
177 | DBUG_ASSERT(c->connect); |
178 | thd= c->thd= threadpool_add_connection(c->connect, c); |
179 | if (!thd) |
180 | { |
181 | /* Bail out on connect error.*/ |
182 | goto error; |
183 | } |
184 | c->connect= 0; |
185 | } |
186 | else if (threadpool_process_request(thd)) |
187 | { |
188 | /* QUIT or an error occured. */ |
189 | goto error; |
190 | } |
191 | |
192 | /* Set priority */ |
193 | c->priority= get_priority(c); |
194 | |
195 | /* Read next command from client. */ |
196 | c->set_io_timeout(thd->get_net_wait_timeout()); |
197 | c->state= TP_STATE_IDLE; |
198 | if (c->start_io()) |
199 | goto error; |
200 | |
201 | worker_context.restore(); |
202 | return; |
203 | |
204 | error: |
205 | c->thd= 0; |
206 | delete c; |
207 | |
208 | if (thd) |
209 | { |
210 | threadpool_remove_connection(thd); |
211 | } |
212 | worker_context.restore(); |
213 | } |
214 | |
215 | |
216 | static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) |
217 | { |
218 | THD *thd= NULL; |
219 | |
220 | /* |
221 | Create a new connection context: mysys_thread_var and PSI thread |
222 | Store them in THD. |
223 | */ |
224 | |
225 | pthread_setspecific(THR_KEY_mysys, 0); |
226 | my_thread_init(); |
227 | st_my_thread_var* mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys); |
228 | if (!mysys_var ||!(thd= connect->create_thd(NULL))) |
229 | { |
230 | /* Out of memory? */ |
231 | connect->close_and_delete(); |
232 | if (mysys_var) |
233 | { |
234 | #ifdef HAVE_PSI_INTERFACE |
235 | /* |
236 | current PSI is still from worker thread. |
237 | Set to 0, to avoid premature cleanup by my_thread_end |
238 | */ |
239 | if (PSI_server) PSI_server->set_thread(0); |
240 | #endif |
241 | my_thread_end(); |
242 | } |
243 | return NULL; |
244 | } |
245 | delete connect; |
246 | add_to_active_threads(thd); |
247 | thd->set_mysys_var(mysys_var); |
248 | thd->event_scheduler.data= scheduler_data; |
249 | |
250 | /* Create new PSI thread for use with the THD. */ |
251 | thd->event_scheduler.m_psi= |
252 | PSI_CALL_new_thread(key_thread_one_connection, thd, thd->thread_id); |
253 | |
254 | |
255 | /* Login. */ |
256 | thread_attach(thd); |
257 | re_init_net_server_extension(thd); |
258 | ulonglong now= microsecond_interval_timer(); |
259 | thd->prior_thr_create_utime= now; |
260 | thd->start_utime= now; |
261 | thd->thr_create_utime= now; |
262 | |
263 | if (setup_connection_thread_globals(thd)) |
264 | goto end; |
265 | |
266 | if (thd_prepare_connection(thd)) |
267 | goto end; |
268 | |
269 | /* |
270 | Check if THD is ok, as prepare_new_connection_state() |
271 | can fail, for example if init command failed. |
272 | */ |
273 | if (!thd_is_connection_alive(thd)) |
274 | goto end; |
275 | |
276 | thd->skip_wait_timeout= true; |
277 | set_thd_idle(thd); |
278 | return thd; |
279 | |
280 | end: |
281 | threadpool_remove_connection(thd); |
282 | return NULL; |
283 | } |
284 | |
285 | |
286 | static void threadpool_remove_connection(THD *thd) |
287 | { |
288 | thread_attach(thd); |
289 | thd->event_scheduler.data= 0; |
290 | thd->net.reading_or_writing = 0; |
291 | end_connection(thd); |
292 | close_connection(thd, 0); |
293 | unlink_thd(thd); |
294 | delete thd; |
295 | |
296 | /* |
297 | Free resources associated with this connection: |
298 | mysys thread_var and PSI thread. |
299 | */ |
300 | my_thread_end(); |
301 | } |
302 | |
303 | |
304 | /* |
305 | Ensure that proper error message is sent to client, |
306 | and "aborted" message appears in the log in case of |
307 | wait timeout. |
308 | |
309 | See also timeout handling in net_serv.cc |
310 | */ |
311 | static void handle_wait_timeout(THD *thd) |
312 | { |
313 | thd->get_stmt_da()->reset_diagnostics_area(); |
314 | thd->reset_killed(); |
315 | my_error(ER_NET_READ_INTERRUPTED, MYF(0)); |
316 | thd->net.last_errno= ER_NET_READ_INTERRUPTED; |
317 | thd->net.error= 2; |
318 | } |
319 | |
320 | |
321 | /** |
322 | Process a single client request or a single batch. |
323 | */ |
324 | static int threadpool_process_request(THD *thd) |
325 | { |
326 | int retval= 0; |
327 | thread_attach(thd); |
328 | |
329 | if (thd->killed >= KILL_CONNECTION) |
330 | { |
331 | /* |
332 | killed flag was set by timeout handler |
333 | or KILL command. Return error. |
334 | */ |
335 | retval= 1; |
336 | if(thd->killed == KILL_WAIT_TIMEOUT) |
337 | handle_wait_timeout(thd); |
338 | goto end; |
339 | } |
340 | |
341 | |
342 | /* |
343 | In the loop below, the flow is essentially the copy of |
344 | thead-per-connections |
345 | logic, see do_handle_one_connection() in sql_connect.c |
346 | |
347 | The goal is to execute a single query, thus the loop is normally executed |
348 | only once. However for SSL connections, it can be executed multiple times |
349 | (SSL can preread and cache incoming data, and vio->has_data() checks if it |
350 | was the case). |
351 | */ |
352 | for(;;) |
353 | { |
354 | Vio *vio; |
355 | thd->net.reading_or_writing= 0; |
356 | mysql_audit_release(thd); |
357 | |
358 | if ((retval= do_command(thd)) != 0) |
359 | goto end; |
360 | |
361 | if (!thd_is_connection_alive(thd)) |
362 | { |
363 | retval= 1; |
364 | goto end; |
365 | } |
366 | |
367 | set_thd_idle(thd); |
368 | |
369 | vio= thd->net.vio; |
370 | if (!vio->has_data(vio)) |
371 | { |
372 | /* More info on this debug sync is in sql_parse.cc*/ |
373 | DEBUG_SYNC(thd, "before_do_command_net_read" ); |
374 | goto end; |
375 | } |
376 | } |
377 | |
378 | end: |
379 | return retval; |
380 | } |
381 | |
382 | |
383 | |
/* Dummy functions, do nothing */

/* Scheduler hook with nothing to do for the thread pool; always reports success. */
static bool tp_init_new_connection_thread()
{
  return false;
}
390 | |
391 | static bool tp_end_thread(THD *, bool) |
392 | { |
393 | return 0; |
394 | } |
395 | |
396 | static TP_pool *pool; |
397 | |
398 | static bool tp_init() |
399 | { |
400 | |
401 | #ifdef _WIN32 |
402 | if (threadpool_mode == TP_MODE_WINDOWS) |
403 | pool= new (std::nothrow) TP_pool_win; |
404 | else |
405 | pool= new (std::nothrow) TP_pool_generic; |
406 | #else |
407 | pool= new (std::nothrow) TP_pool_generic; |
408 | #endif |
409 | if (!pool) |
410 | return true; |
411 | if (pool->init()) |
412 | { |
413 | delete pool; |
414 | pool= 0; |
415 | return true; |
416 | } |
417 | return false; |
418 | } |
419 | |
420 | static void tp_add_connection(CONNECT *connect) |
421 | { |
422 | TP_connection *c= pool->new_connection(connect); |
423 | DBUG_EXECUTE_IF("simulate_failed_connection_1" , delete c ; c= 0;); |
424 | if (c) |
425 | pool->add(c); |
426 | else |
427 | connect->close_and_delete(); |
428 | } |
429 | |
430 | int tp_get_idle_thread_count() |
431 | { |
432 | return pool? pool->get_idle_thread_count(): 0; |
433 | } |
434 | |
435 | int tp_get_thread_count() |
436 | { |
437 | return pool ? pool->get_thread_count() : 0; |
438 | } |
439 | |
440 | void tp_set_min_threads(uint val) |
441 | { |
442 | if (pool) |
443 | pool->set_min_threads(val); |
444 | } |
445 | |
446 | |
447 | void tp_set_max_threads(uint val) |
448 | { |
449 | if (pool) |
450 | pool->set_max_threads(val); |
451 | } |
452 | |
453 | void tp_set_threadpool_size(uint val) |
454 | { |
455 | if (pool) |
456 | pool->set_pool_size(val); |
457 | } |
458 | |
459 | |
460 | void tp_set_threadpool_stall_limit(uint val) |
461 | { |
462 | if (pool) |
463 | pool->set_stall_limit(val); |
464 | } |
465 | |
466 | |
467 | void tp_timeout_handler(TP_connection *c) |
468 | { |
469 | if (c->state != TP_STATE_IDLE) |
470 | return; |
471 | THD *thd=c->thd; |
472 | mysql_mutex_lock(&thd->LOCK_thd_kill); |
473 | thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT); |
474 | c->priority= TP_PRIORITY_HIGH; |
475 | post_kill_notification(thd); |
476 | mysql_mutex_unlock(&thd->LOCK_thd_kill); |
477 | } |
478 | |
479 | |
480 | static void tp_wait_begin(THD *thd, int type) |
481 | { |
482 | TP_connection *c = get_TP_connection(thd); |
483 | if (c) |
484 | c->wait_begin(type); |
485 | } |
486 | |
487 | |
488 | static void tp_wait_end(THD *thd) |
489 | { |
490 | TP_connection *c = get_TP_connection(thd); |
491 | if (c) |
492 | c->wait_end(); |
493 | } |
494 | |
495 | |
496 | static void tp_end() |
497 | { |
498 | delete pool; |
499 | } |
500 | |
501 | static void tp_post_kill_notification(THD *thd) |
502 | { |
503 | TP_connection *c= get_TP_connection(thd); |
504 | if (c) |
505 | c->priority= TP_PRIORITY_HIGH; |
506 | post_kill_notification(thd); |
507 | } |
508 | |
/*
  Table of scheduler callbacks implemented by the thread pool.
  pool_of_threads_scheduler() copies it into the caller's structure and then
  fills in max_threads, max_connections and connection_count.
  The initializer is positional, so entries must follow the member order of
  scheduler_functions.
*/
static scheduler_functions tp_scheduler_functions=
{
  0, // max_threads
  NULL, // presumably connection_count or max_connections -- TODO confirm member order
  NULL, // presumably the other of the two pointers above -- TODO confirm
  tp_init, // init
  tp_init_new_connection_thread, // init_new_connection_thread
  tp_add_connection, // add_connection
  tp_wait_begin, // thd_wait_begin
  tp_wait_end, // thd_wait_end
  tp_post_kill_notification, // post kill notification
  tp_end_thread, // Dummy function
  tp_end // end
};
523 | |
524 | void pool_of_threads_scheduler(struct scheduler_functions *func, |
525 | ulong *arg_max_connections, |
526 | uint *arg_connection_count) |
527 | { |
528 | *func = tp_scheduler_functions; |
529 | func->max_threads= threadpool_max_threads; |
530 | func->max_connections= arg_max_connections; |
531 | func->connection_count= arg_connection_count; |
532 | scheduler_init(); |
533 | } |
534 | |