/* Copyright (C) 2012 Monty Program Ab

 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation; version 2 of the License.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */

#include "mariadb.h"
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
#include <sql_connect.h>
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>


/* Threadpool parameters */

uint threadpool_min_threads;
uint threadpool_idle_timeout;
uint threadpool_size;
uint threadpool_max_size;
uint threadpool_stall_limit;
uint threadpool_max_threads;
uint threadpool_oversubscribe;
uint threadpool_mode;
uint threadpool_prio_kickup_timer;

/* Stats */
TP_STATISTICS tp_stats;


static void threadpool_remove_connection(THD *thd);
static int threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);

extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
extern bool do_command(THD*);

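/*
  Fetch the TP_connection stored in the THD's scheduler data slot by
  threadpool_add_connection().
*/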
static inline TP_connection *get_TP_connection(THD *thd)
{
  return (TP_connection *)thd->event_scheduler.data;
}

/*
  Worker thread contexts, and THD contexts.
  =========================================

  Both worker threads and connections have their own sets of thread-local
  variables. At the moment these are mysys_var (which holds specific data
  for dbug, my_error and similar goodies), and the PSI per-client structure.

  Whenever a query is executed, the following needs to be done:

  1. Save the worker thread context.
  2. Change TLS variables to connection-specific ones using
     thread_attach(THD*). This function does some additional work,
     e.g. setting up the thread_stack/thread_ends_here pointers.
  3. Process the query.
  4. Restore the worker thread context.

  Connection login and termination follow a similar scheme w.r.t. saving
  and restoring contexts.

  For both the worker thread and the connection, mysys variables are created
  using my_thread_init() and freed with my_thread_end().

*/
struct Worker_thread_context
{
  PSI_thread *psi_thread;
  st_my_thread_var* mysys_var;

  void save()
  {
    psi_thread= PSI_CALL_get_thread();
    mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
  }

  void restore()
  {
    PSI_CALL_set_thread(psi_thread);
    pthread_setspecific(THR_KEY_mysys, mysys_var);
    pthread_setspecific(THR_THD, 0);
  }
};
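
/*
  Typical usage, as in tp_callback() below:

    Worker_thread_context ctx;
    ctx.save();           // remember the worker's mysys_var and PSI thread
    thread_attach(thd);   // switch TLS to the connection's context
    ... process the query ...
    ctx.restore();        // switch TLS back to the worker's context
*/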


#ifdef HAVE_PSI_INTERFACE

/*
  The following fixes the PSI "idle" instrumentation.
  The server assumes that a connection becomes idle
  just before net_read_packet() and switches to active after it.
  In our setup, the server becomes idle when async socket I/O is started.
*/

extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);

static void dummy_before_header(struct st_net *, void *, size_t)
{
}

static void re_init_net_server_extension(THD *thd)
{
  thd->m_net_server_extension.m_before_header= dummy_before_header;
}

#else

#define re_init_net_server_extension(thd)

#endif /* HAVE_PSI_INTERFACE */


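/*
  Mark the THD as waiting for network I/O, and report the state
  transition to the PSI idle instrumentation (see above).
*/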
static inline void set_thd_idle(THD *thd)
{
  thd->net.reading_or_writing= 1;
#ifdef HAVE_PSI_INTERFACE
  net_before_header_psi(&thd->net, thd, 0);
#endif
}

/*
  Attach/associate the connection with the OS thread.
*/
static void thread_attach(THD* thd)
{
  pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
  thd->thread_stack= (char*)&thd;
  thd->store_globals();
  PSI_CALL_set_thread(thd->event_scheduler.m_psi);
  mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
}

/*
  Determine connection priority, using the current
  transaction state and the 'threadpool_priority' variable value.
*/
static TP_PRIORITY get_priority(TP_connection *c)
{
  DBUG_ASSERT(c->thd == current_thd);
  TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
  if (prio == TP_PRIORITY_AUTO)
  {
    return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
  }
  return prio;
}


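/*
  Callback invoked by the pool implementation whenever a connection has
  work to do. Performs the login on the first invocation (no THD yet),
  otherwise processes the pending client request(s), then re-arms async
  I/O to wait for the next command. On error, the connection is destroyed.
*/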
void tp_callback(TP_connection *c)
{
  DBUG_ASSERT(c);

  Worker_thread_context worker_context;
  worker_context.save();

  THD *thd= c->thd;

  c->state= TP_STATE_RUNNING;

  if (unlikely(!thd))
  {
    /* No THD, need to login first. */
    DBUG_ASSERT(c->connect);
    thd= c->thd= threadpool_add_connection(c->connect, c);
    if (!thd)
    {
      /* Bail out on connect error. */
      goto error;
    }
    c->connect= 0;
  }
  else if (threadpool_process_request(thd))
  {
    /* QUIT or an error occurred. */
    goto error;
  }

  /* Set priority */
  c->priority= get_priority(c);

  /* Read the next command from the client. */
  c->set_io_timeout(thd->get_net_wait_timeout());
  c->state= TP_STATE_IDLE;
  if (c->start_io())
    goto error;

  worker_context.restore();
  return;

error:
  c->thd= 0;
  delete c;

  if (thd)
  {
    threadpool_remove_connection(thd);
  }
  worker_context.restore();
}


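/*
  Set up a new connection: allocate its mysys thread variables and THD,
  register it in the list of active threads, create a PSI thread for it,
  and perform the login. Returns the new THD on success, or NULL on
  failure (the CONNECT object is freed in either case).
*/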
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
  THD *thd= NULL;

  /*
    Create a new connection context: mysys_thread_var and PSI thread.
    Store them in the THD.
  */

  pthread_setspecific(THR_KEY_mysys, 0);
  my_thread_init();
  st_my_thread_var* mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
  if (!mysys_var || !(thd= connect->create_thd(NULL)))
  {
    /* Out of memory? */
    connect->close_and_delete();
    if (mysys_var)
    {
#ifdef HAVE_PSI_INTERFACE
      /*
        The current PSI is still the worker thread's.
        Set it to 0 to avoid premature cleanup by my_thread_end().
      */
      if (PSI_server) PSI_server->set_thread(0);
#endif
      my_thread_end();
    }
    return NULL;
  }
  delete connect;
  add_to_active_threads(thd);
  thd->set_mysys_var(mysys_var);
  thd->event_scheduler.data= scheduler_data;

  /* Create a new PSI thread for use with the THD. */
  thd->event_scheduler.m_psi=
    PSI_CALL_new_thread(key_thread_one_connection, thd, thd->thread_id);


  /* Login. */
  thread_attach(thd);
  re_init_net_server_extension(thd);
  ulonglong now= microsecond_interval_timer();
  thd->prior_thr_create_utime= now;
  thd->start_utime= now;
  thd->thr_create_utime= now;

  if (setup_connection_thread_globals(thd))
    goto end;

  if (thd_prepare_connection(thd))
    goto end;

  /*
    Check if the THD is ok, as prepare_new_connection_state()
    can fail, for example if the init command failed.
  */
  if (!thd_is_connection_alive(thd))
    goto end;

  thd->skip_wait_timeout= true;
  set_thd_idle(thd);
  return thd;

end:
  threadpool_remove_connection(thd);
  return NULL;
}


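/*
  Destroy a connection: run normal disconnect processing, close the
  connection, unlink the THD from the list of active threads, and free
  the per-connection resources (mysys thread variables and PSI thread).
*/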
static void threadpool_remove_connection(THD *thd)
{
  thread_attach(thd);
  thd->event_scheduler.data= 0;
  thd->net.reading_or_writing= 0;
  end_connection(thd);
  close_connection(thd, 0);
  unlink_thd(thd);
  delete thd;

  /*
    Free resources associated with this connection:
    mysys thread_var and PSI thread.
  */
  my_thread_end();
}


/*
  Ensure that a proper error message is sent to the client,
  and that an "aborted" message appears in the log in case of
  a wait timeout.

  See also timeout handling in net_serv.cc
*/
static void handle_wait_timeout(THD *thd)
{
  thd->get_stmt_da()->reset_diagnostics_area();
  thd->reset_killed();
  my_error(ER_NET_READ_INTERRUPTED, MYF(0));
  thd->net.last_errno= ER_NET_READ_INTERRUPTED;
  thd->net.error= 2;
}


/**
  Process a single client request or a single batch.
*/
static int threadpool_process_request(THD *thd)
{
  int retval= 0;
  thread_attach(thd);

  if (thd->killed >= KILL_CONNECTION)
  {
    /*
      The killed flag was set by the timeout handler
      or by a KILL command. Return an error.
    */
    retval= 1;
    if (thd->killed == KILL_WAIT_TIMEOUT)
      handle_wait_timeout(thd);
    goto end;
  }


  /*
    In the loop below, the flow is essentially a copy of the
    thread-per-connection logic, see do_handle_one_connection()
    in sql_connect.cc.

    The goal is to execute a single query, thus the loop is normally executed
    only once. However, for SSL connections it can be executed multiple times
    (SSL can preread and cache incoming data, and vio->has_data() checks if
    that was the case).
  */
  for (;;)
  {
    Vio *vio;
    thd->net.reading_or_writing= 0;
    mysql_audit_release(thd);

    if ((retval= do_command(thd)) != 0)
      goto end;

    if (!thd_is_connection_alive(thd))
    {
      retval= 1;
      goto end;
    }

    set_thd_idle(thd);

    vio= thd->net.vio;
    if (!vio->has_data(vio))
    {
      /* More info on this debug sync is in sql_parse.cc */
      DEBUG_SYNC(thd, "before_do_command_net_read");
      goto end;
    }
  }

end:
  return retval;
}



/* Dummy functions, do nothing */

static bool tp_init_new_connection_thread()
{
  return 0;
}

static bool tp_end_thread(THD *, bool)
{
  return 0;
}

static TP_pool *pool;

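/*
  Create and initialize the global pool implementation.
  On Windows, 'threadpool_mode' selects between the native Windows pool
  and the generic one; elsewhere the generic pool is always used.
  Returns true on failure.
*/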
static bool tp_init()
{

#ifdef _WIN32
  if (threadpool_mode == TP_MODE_WINDOWS)
    pool= new (std::nothrow) TP_pool_win;
  else
    pool= new (std::nothrow) TP_pool_generic;
#else
  pool= new (std::nothrow) TP_pool_generic;
#endif
  if (!pool)
    return true;
  if (pool->init())
  {
    delete pool;
    pool= 0;
    return true;
  }
  return false;
}

static void tp_add_connection(CONNECT *connect)
{
  TP_connection *c= pool->new_connection(connect);
  DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c; c= 0;);
  if (c)
    pool->add(c);
  else
    connect->close_and_delete();
}

int tp_get_idle_thread_count()
{
  return pool ? pool->get_idle_thread_count() : 0;
}

int tp_get_thread_count()
{
  return pool ? pool->get_thread_count() : 0;
}

void tp_set_min_threads(uint val)
{
  if (pool)
    pool->set_min_threads(val);
}


void tp_set_max_threads(uint val)
{
  if (pool)
    pool->set_max_threads(val);
}

void tp_set_threadpool_size(uint val)
{
  if (pool)
    pool->set_pool_size(val);
}


void tp_set_threadpool_stall_limit(uint val)
{
  if (pool)
    pool->set_stall_limit(val);
}


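/*
  Wait-timeout handler, called by the pool when an idle connection's
  I/O timeout expires. Kills the connection with KILL_WAIT_TIMEOUT and
  bumps its priority so the kill is processed promptly
  (see handle_wait_timeout() above).
*/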
void tp_timeout_handler(TP_connection *c)
{
  if (c->state != TP_STATE_IDLE)
    return;
  THD *thd= c->thd;
  mysql_mutex_lock(&thd->LOCK_thd_kill);
  thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT);
  c->priority= TP_PRIORITY_HIGH;
  post_kill_notification(thd);
  mysql_mutex_unlock(&thd->LOCK_thd_kill);
}


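/*
  Scheduler hooks, called when a connection thread is about to block
  (on a lock, disk I/O, etc.) and when it resumes. They let the pool
  implementation compensate for the blocked worker, e.g. by waking or
  starting another one.
*/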
static void tp_wait_begin(THD *thd, int type)
{
  TP_connection *c= get_TP_connection(thd);
  if (c)
    c->wait_begin(type);
}


static void tp_wait_end(THD *thd)
{
  TP_connection *c= get_TP_connection(thd);
  if (c)
    c->wait_end();
}


static void tp_end()
{
  delete pool;
}

static void tp_post_kill_notification(THD *thd)
{
  TP_connection *c= get_TP_connection(thd);
  if (c)
    c->priority= TP_PRIORITY_HIGH;
  post_kill_notification(thd);
}

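/*
  Scheduler interface: the function table that plugs the thread pool
  into the server's connection-handling code (see scheduler.h).
*/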
static scheduler_functions tp_scheduler_functions=
{
  0,                                  // max_threads
  NULL,
  NULL,
  tp_init,                            // init
  tp_init_new_connection_thread,      // init_new_connection_thread
  tp_add_connection,                  // add_connection
  tp_wait_begin,                      // thd_wait_begin
  tp_wait_end,                        // thd_wait_end
  tp_post_kill_notification,          // post_kill_notification
  tp_end_thread,                      // end_thread (dummy)
  tp_end                              // end
};

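/*
  Fill in the scheduler callbacks for the pool-of-threads scheduler,
  used when the server runs with thread_handling=pool-of-threads.
*/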
void pool_of_threads_scheduler(struct scheduler_functions *func,
                               ulong *arg_max_connections,
                               uint *arg_connection_count)
{
  *func= tp_scheduler_functions;
  func->max_threads= threadpool_max_threads;
  func->max_connections= arg_max_connections;
  func->connection_count= arg_connection_count;
  scheduler_init();
}