1/* Copyright (C) 2012 Monty Program Ab
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16#include "mariadb.h"
17#include <violite.h>
18#include <sql_priv.h>
19#include <sql_class.h>
20#include <my_pthread.h>
21#include <scheduler.h>
22
23#ifdef HAVE_POOL_OF_THREADS
24
25#ifdef _WIN32
26/* AIX may define this, too ?*/
27#define HAVE_IOCP
28#endif
29
30#ifdef HAVE_IOCP
31#define OPTIONAL_IO_POLL_READ_PARAM this
32#else
33#define OPTIONAL_IO_POLL_READ_PARAM 0
34#endif
35
36#ifdef _WIN32
37typedef HANDLE TP_file_handle;
38#else
39typedef int TP_file_handle;
40#define INVALID_HANDLE_VALUE -1
41#endif
42
43
44#include <sql_connect.h>
45#include <mysqld.h>
46#include <debug_sync.h>
47#include <time.h>
48#include <sql_plist.h>
49#include <threadpool.h>
50#include <time.h>
51#ifdef __linux__
52#include <sys/epoll.h>
53typedef struct epoll_event native_event;
54#elif defined(HAVE_KQUEUE)
55#include <sys/event.h>
56typedef struct kevent native_event;
57#elif defined (__sun)
58#include <port.h>
59typedef port_event_t native_event;
60#elif defined (HAVE_IOCP)
61typedef OVERLAPPED_ENTRY native_event;
62#else
63#error threadpool is not available on this platform
64#endif
65
66
/*
  Close the IO multiplexer descriptor: a completion-port HANDLE on
  Windows, an epoll/kqueue/event-port file descriptor elsewhere.
*/
static void io_poll_close(TP_file_handle fd)
{
#ifdef _WIN32
  CloseHandle(fd);
#else
  close(fd);
#endif
}
75
76
77/** Maximum number of native events a listener can read in one go */
78#define MAX_EVENTS 1024
79
80/** Indicates that threadpool was initialized*/
81static bool threadpool_started= false;
82
83/*
84 Define PSI Keys for performance schema.
85 We have a mutex per group, worker threads, condition per worker thread,
86 and timer thread with its own mutex and condition.
87*/
88
89
#ifdef HAVE_PSI_INTERFACE
/* One instrumented mutex per thread group, plus the single timer mutex. */
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
{
  { &key_group_mutex, "group_mutex", 0},
  { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL}
};

/* One condition variable per worker, plus the single timer condition. */
static PSI_cond_key key_worker_cond;
static PSI_cond_key key_timer_cond;
static PSI_cond_info cond_list[]=
{
  { &key_worker_cond, "worker_cond", 0},
  { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL}
};

/* Thread classes: many workers, one global timer thread. */
static PSI_thread_key key_worker_thread;
static PSI_thread_key key_timer_thread;
static PSI_thread_info thread_list[] =
{
 {&key_worker_thread, "worker_thread", 0},
 {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
};

/* Macro to simplify performance schema registration */
#define PSI_register(X) \
 if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
#else
#define PSI_register(X) /* no-op */
#endif
121
122
123struct thread_group_t;
124
125/* Per-thread structure for workers */
/*
  Per-thread structure for workers.
  Workers park on thread_group_t::waiting_threads (an intrusive list using
  the next/prev fields below) and are woken via their condition variable.
*/
struct worker_thread_t
{
  ulonglong  event_count;          /* number of requests handled by this thread */
  thread_group_t* thread_group;    /* group this worker belongs to */
  worker_thread_t *next_in_list;   /* intrusive links for the waiting list */
  worker_thread_t **prev_in_list;

  mysql_cond_t  cond;              /* signalled by wake_thread() */
  bool          woken;             /* set under group mutex when the thread is woken */
};

/* Intrusive list of sleeping workers (thread_group_t::waiting_threads). */
typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
                 &worker_thread_t::next_in_list,
                 &worker_thread_t::prev_in_list>
                 >
worker_list_t;
142
/*
  Connection state for the generic (epoll/kqueue/event-port/IOCP)
  threadpool implementation. Extends the abstract TP_connection with
  work-queue links, timeout bookkeeping and the polled IO handle.
*/
struct TP_connection_generic:public TP_connection
{
  TP_connection_generic(CONNECT *c);
  ~TP_connection_generic();

  virtual int init(){ return 0; };
  virtual void set_io_timeout(int sec);
  virtual int start_io();
  virtual void wait_begin(int type);
  virtual void wait_end();

  thread_group_t *thread_group;          /* owning group */
  TP_connection_generic *next_in_queue;  /* intrusive links for the work queues */
  TP_connection_generic **prev_in_queue;
  ulonglong abs_wait_timeout;      /* absolute idle deadline, in microseconds
                                      (compared with pool_timer.current_microtime) */
  ulonglong dequeue_time;          /* time the connection was put ON the queue
                                      (despite the name) - used for priority kickup */
  TP_file_handle fd;               /* IO handle registered with the group pollfd */
  bool bound_to_poll_descriptor;   /* true once fd was added to the poll set */
  int waiting;                     /* presumably tracks nesting of wait_begin/wait_end
                                      - implementation not visible here */
#ifdef HAVE_IOCP
  OVERLAPPED overlapped;           /* per-connection state for async zero-byte reads */
#endif
#ifdef _WIN32
  enum_vio_type vio_type;          /* socket vs named pipe; selects the read call */
#endif
};


/* Work queue of connections, one instance per priority level per group. */
typedef I_P_List<TP_connection_generic,
                     I_P_List_adapter<TP_connection_generic,
                                      &TP_connection_generic::next_in_queue,
                                      &TP_connection_generic::prev_in_queue>,
                     I_P_List_null_counter,
                     I_P_List_fast_push_back<TP_connection_generic> >
connection_queue_t;
178
179const int NQUEUES=2; /* We have high and low priority queues*/
180
/*
  A thread group: a set of worker threads, an IO multiplexing descriptor
  and the work queues. Cache-line aligned to avoid false sharing between
  adjacent groups in the all_groups array.
*/
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
{
  mysql_mutex_t mutex;                /* serializes access to group state */
  connection_queue_t queues[NQUEUES]; /* work queues, indexed by TP_PRIORITY_* */
  worker_list_t waiting_threads;      /* idle workers, woken via wake_thread() */
  worker_thread_t *listener;          /* worker currently doing IO polling, or NULL */
  pthread_attr_t *pthread_attr;       /* attributes for creating new workers */
  TP_file_handle pollfd;              /* epoll/kqueue/port/IOCP descriptor */
  int  thread_count;                  /* total threads in the group */
  int  active_thread_count;           /* threads currently executing (not waiting) */
  int  connection_count;              /* client connections assigned to this group */
  /* Stats for the deadlock detection timer routine.*/
  int io_event_count;                 /* IO events seen since last timer tick */
  int queue_event_count;              /* queue dequeues since last timer tick */
  ulonglong last_thread_creation_time; /* used to throttle worker creation */
  int  shutdown_pipe[2];              /* self-pipe used to wake listener at shutdown */
  bool shutdown;                      /* group is shutting down */
  bool stalled;                       /* set by timer when queue is not being drained */
};
200
/* Array of all thread groups; iterated up to threadpool_max_size. */
static thread_group_t *all_groups;
/* Number of groups currently in use. */
static uint group_count;
/* Groups still to shut down; the last one frees all_groups (see thread_group_destroy). */
static int32 shutdown_group_count;

/**
 Used for printing "pool blocked" message, see
 print_pool_blocked_message();
*/
static ulonglong pool_block_start;
210
/* Global timer for all groups */
struct pool_timer_t
{
  mysql_mutex_t mutex;          /* protects cond waits and the shutdown flag */
  mysql_cond_t cond;            /* timer sleeps here; signalled by stop_timer() */
  volatile uint64 current_microtime;  /* cached "now", refreshed on every tick */
  volatile uint64 next_timeout_check; /* earliest abs_wait_timeout among connections */
  int  tick_interval;           /* milliseconds between stall checks */
  bool shutdown;                /* request the timer thread to exit */
  pthread_t timer_thread_id;    /* joined by stop_timer() */
};
222
/* The single global timer instance. */
static pool_timer_t pool_timer;

/* Forward declarations. */
static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
static int  wake_thread(thread_group_t *thread_group);
static int  wake_or_create_thread(thread_group_t *thread_group);
static int  create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);
234
235/**
236 Asynchronous network IO.
237
238 We use native edge-triggered network IO multiplexing facility.
239 This maps to different APIs on different Unixes.
240
241 Supported are currently Linux with epoll, Solaris with event ports,
242 OSX and BSD with kevent, Windows with IOCP. All those API's are used with one-shot flags
243 (the event is signalled once client has written something into the socket,
244 then socket is removed from the "poll-set" until the command is finished,
245 and we need to re-arm/re-register socket)
246
247 No implementation for poll/select is currently provided.
248
249 The API closely resembles all of the above mentioned platform APIs
250 and consists of following functions.
251
252 - io_poll_create()
253 Creates an io_poll descriptor
254 On Linux: epoll_create()
255
256 - io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt)
257 Associate file descriptor with io poll descriptor
258 On Linux : epoll_ctl(..EPOLL_CTL_ADD))
259
260 - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
261 Associate file descriptor with io poll descriptor
262 On Linux: epoll_ctl(..EPOLL_CTL_DEL)
263
264
265 - io_poll_start_read(int poll_fd,int fd, void *data, void *opt)
266 The same as io_poll_associate_fd(), but cannot be used before
267 io_poll_associate_fd() was called.
268 On Linux : epoll_ctl(..EPOLL_CTL_MOD)
269
270 - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents,
271 int timeout_ms)
272
273 wait until one or more descriptors added with io_poll_associate_fd()
274 or io_poll_start_read() becomes readable. Data associated with
275 descriptors can be retrieved from native_events array, using
276 native_event_get_userdata() function.
277
278
279 On Linux: epoll_wait()
280*/
281
282#if defined (__linux__)
283#ifndef EPOLLRDHUP
284/* Early 2.6 kernel did not have EPOLLRDHUP */
285#define EPOLLRDHUP 0
286#endif
/* Create the epoll instance (the size argument is only a historical hint). */
static TP_file_handle io_poll_create()
{
  return epoll_create(1);
}
291
292
/*
  Add fd to the epoll set with one-shot, edge-triggered read notification.
  'data' is returned later by native_event_get_userdata().
*/
int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*)
{
  struct epoll_event ev;
  ev.data.u64= 0; /* Keep valgrind happy */
  ev.data.ptr= data;
  ev.events=  EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
  return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev);
}
301
302
303
/*
  Re-arm read notification for an already associated fd.
  Needed because EPOLLONESHOT disarms the fd after each delivered event.
*/
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
  struct epoll_event ev;
  ev.data.u64= 0; /* Keep valgrind happy */
  ev.data.ptr= data;
  ev.events=  EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
  return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}
312
/*
  Remove fd from the epoll set.
  NOTE(review): 'ev' is deliberately left uninitialized - EPOLL_CTL_DEL
  ignores the event argument, but old kernels required a non-NULL pointer.
*/
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  struct epoll_event ev;
  return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
}
318
319
320/*
321 Wrapper around epoll_wait.
322 NOTE - in case of EINTR, it restarts with original timeout. Since we use
323 either infinite or 0 timeouts, this is not critical
324*/
325int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents,
326 int timeout_ms)
327{
328 int ret;
329 do
330 {
331 ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
332 }
333 while(ret == -1 && errno == EINTR);
334 return ret;
335}
336
337
/* Retrieve the user pointer stored by io_poll_associate_fd()/start_read(). */
static void *native_event_get_userdata(native_event *event)
{
  return event->data.ptr;
}
342
343#elif defined(HAVE_KQUEUE)
344
345/*
346 NetBSD is incompatible with other BSDs , last parameter in EV_SET macro
347 (udata, user data) needs to be intptr_t, whereas it needs to be void*
348 everywhere else.
349*/
350
351#ifdef __NetBSD__
352#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, (intptr_t)g)
353#else
354#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, g)
355#endif
356
357
/* Create the kqueue instance. */
TP_file_handle io_poll_create()
{
  return kqueue();
}
362
/*
  Arm (or re-arm) one-shot read notification for fd.
  'data' is stored as udata and retrieved by native_event_get_userdata().
*/
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
  struct kevent ke;
  MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
         0, 0, data);
  return kevent(pollfd, &ke, 1, 0, 0, 0);
}
370
371
372int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
373{
374 struct kevent ke;
375 MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
376 0, 0, data);
377 return io_poll_start_read(pollfd,fd, data, 0);
378}
379
380
/* Remove the read filter for fd from the kqueue. */
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  struct kevent ke;
  MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
  return kevent(pollfd, &ke, 1, 0, 0, 0);
}
387
388
389int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms)
390{
391 struct timespec ts;
392 int ret;
393 if (timeout_ms >= 0)
394 {
395 ts.tv_sec= timeout_ms/1000;
396 ts.tv_nsec= (timeout_ms%1000)*1000000;
397 }
398 do
399 {
400 ret= kevent(pollfd, 0, 0, events, maxevents,
401 (timeout_ms >= 0)?&ts:NULL);
402 }
403 while (ret == -1 && errno == EINTR);
404 return ret;
405}
406
/* Retrieve the udata pointer stored when the event was armed. */
static void* native_event_get_userdata(native_event *event)
{
  return (void *)event->udata;
}
411
412#elif defined (__sun)
413
/* Create the Solaris event port. */
static TP_file_handle io_poll_create()
{
  return port_create();
}
418
/*
  Arm read notification for fd. Event ports are inherently one-shot:
  the association is removed once the event fires, so this re-arms too.
*/
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
  return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}
423
/* First-time registration is identical to re-arming on event ports. */
static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
  return io_poll_start_read(pollfd, fd, data, 0);
}
428
/* Remove fd from the event port. */
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
}
433
/*
  Wait for events on the event port, restarting after EINTR.
  Returns the number of retrieved events.
  NOTE(review): when port_getn() fails with an error other than EINTR,
  the function still returns nget instead of -1; callers appear to treat
  any value <= 0 as timeout/shutdown - confirm before changing.
*/
int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
  struct timespec ts;
  int ret;
  uint_t nget= 1;  /* block until at least one event is available */
  if (timeout_ms >= 0)
  {
    ts.tv_sec= timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
  }
  do
  {
    ret= port_getn(pollfd, events, maxevents, &nget,
            (timeout_ms >= 0)?&ts:NULL);
  }
  while (ret == -1 && errno == EINTR);
  DBUG_ASSERT(nget < INT_MAX);
  return (int)nget;
}
453
/* Retrieve the user pointer passed to port_associate(). */
static void* native_event_get_userdata(native_event *event)
{
  return event->portev_user;
}
458
459#elif defined(HAVE_IOCP)
460
461
/* Create an IO completion port, not yet bound to any file handle. */
static TP_file_handle io_poll_create()
{
  return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
}
466
467
/*
  Start a zero-byte asynchronous read on the connection's handle.
  A zero-length read consumes no data; its completion merely signals
  that the socket/pipe became readable, which emulates the one-shot
  readiness notification of the Unix pollers.
  Returns 0 on success (completed immediately or pending), 1 on error.
*/
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
  static char c;   /* shared dummy buffer; never written to (length is 0) */
  TP_connection_generic *con= (TP_connection_generic *)opt;
  OVERLAPPED *overlapped= &con->overlapped;
  if (con->vio_type == VIO_TYPE_NAMEDPIPE)
  {
    if (ReadFile(fd, &c, 0, NULL, overlapped))
      return 0;
  }
  else
  {
    WSABUF buf;
    buf.buf= &c;
    buf.len= 0;
    DWORD flags=0;

    if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
      return 0;
  }

  /* An asynchronous completion still pending is the normal success path. */
  if (GetLastError() == ERROR_IO_PENDING)
    return 0;

  return 1;
}
494
495
/*
  Bind fd to the completion port; 'data' becomes the completion key
  later returned by native_event_get_userdata(). Then issue the first
  zero-byte read to arm readiness notification.
*/
static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
{
  HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0);
  if (!h)
    return -1;
  return io_poll_start_read(pollfd,fd, 0, opt);
}
503
504
/* No-op: IOCP cannot unbind a handle from a completion port. */
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  /* Not possible to unbind/rebind file descriptor in IOCP. */
  return 0;
}
510
511
/*
  Dequeue up to maxevents completion packets.
  Returns the number of packets, or -1 on failure/timeout
  (GetQueuedCompletionStatusEx returns FALSE on timeout).
*/
int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
  ULONG n;
  BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
       maxevents, &n, timeout_ms, FALSE);

  return ok ? (int)n : -1;
}
520
521
/* Retrieve the completion key set by io_poll_associate_fd(). */
static void* native_event_get_userdata(native_event *event)
{
  return (void *)event->lpCompletionKey;
}
526#endif
527
528
529/* Dequeue element from a workqueue */
530
531static TP_connection_generic *queue_get(thread_group_t *thread_group)
532{
533 DBUG_ENTER("queue_get");
534 thread_group->queue_event_count++;
535 TP_connection_generic *c;
536 for (int i=0; i < NQUEUES;i++)
537 {
538 c= thread_group->queues[i].pop_front();
539 if (c)
540 DBUG_RETURN(c);
541 }
542 DBUG_RETURN(0);
543}
544
545static bool is_queue_empty(thread_group_t *thread_group)
546{
547 for (int i=0; i < NQUEUES; i++)
548 {
549 if (!thread_group->queues[i].is_empty())
550 return false;
551 }
552 return true;
553}
554
555
556static void queue_init(thread_group_t *thread_group)
557{
558 for (int i=0; i < NQUEUES; i++)
559 {
560 thread_group->queues[i].empty();
561 }
562}
563
564static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
565{
566 ulonglong now= pool_timer.current_microtime;
567 for(int i=0; i < cnt; i++)
568 {
569 TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
570 c->dequeue_time= now;
571 thread_group->queues[c->priority].push_back(c);
572 }
573}
574
/*
  Handle wait timeout:
  Find connections that have been idle for too long and kill them.
  Also, recalculate time when next timeout check should run.

  Iterates the global THD list under LOCK_thread_count so the list
  cannot change underneath us.
*/

static void timeout_check(pool_timer_t *timer)
{
  DBUG_ENTER("timeout_check");

  mysql_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);

  /* Reset next timeout check, it will be recalculated in the loop below */
  my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);

  THD *thd;
  while ((thd=it++))
  {
    /* Only connections blocked reading from the client can idle-timeout. */
    if (thd->net.reading_or_writing != 1)
      continue;

    TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
    if (!connection)
    {
      /*
        Connection does not have scheduler data. This happens for example
        if THD belongs to a different scheduler, that is listening to extra_port.
      */
      continue;
    }

    if(connection->abs_wait_timeout < timer->current_microtime)
    {
      /* Deadline already passed - kill the idle client. */
      tp_timeout_handler(connection);
    }
    else
    {
      /* Remember the earliest remaining deadline for the next check. */
      set_next_timeout_check(connection->abs_wait_timeout);
    }
  }
  mysql_mutex_unlock(&LOCK_thread_count);
  DBUG_VOID_RETURN;
}
619
620
/*
  Timer thread.

  Periodically, check if one of the thread groups is stalled. Stalls happen if
  events are not being dequeued from the queue, or from the network. Primary
  reason for stall can be a lengthy executing non-blocking request. It could
  also happen that thread is waiting but wait_begin/wait_end is forgotten by
  storage engine. Timer thread will create a new thread in group in case of
  a stall.

  Besides checking for stalls, timer thread is also responsible for terminating
  clients that have been idle for longer than wait_timeout seconds.

  TODO: Let the timer sleep for long time if there is no work to be done.
  Currently it wakes up rather often on an idle server.
*/

static void* timer_thread(void *param)
{
  uint i;
  pool_timer_t* timer=(pool_timer_t *)param;

  my_thread_init();
  DBUG_ENTER("timer_thread");
  timer->next_timeout_check= ULONGLONG_MAX;
  timer->current_microtime= microsecond_interval_timer();

  for(;;)
  {
    struct timespec ts;
    int err;

    /* Sleep one tick, or until stop_timer() signals the condition. */
    set_timespec_nsec(ts,timer->tick_interval*1000000);
    mysql_mutex_lock(&timer->mutex);
    err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
    if (timer->shutdown)
    {
      mysql_mutex_unlock(&timer->mutex);
      break;
    }
    if (err == ETIMEDOUT)
    {
      timer->current_microtime= microsecond_interval_timer();

      /* Check stalls in thread groups */
      for (i= 0; i < threadpool_max_size; i++)
      {
        if(all_groups[i].connection_count)
           check_stall(&all_groups[i]);
      }

      /* Check if any client exceeded wait_timeout */
      if (timer->next_timeout_check <= timer->current_microtime)
        timeout_check(timer);
    }
    mysql_mutex_unlock(&timer->mutex);
  }

  /*
    The timer thread owns cleanup of its mutex; stop_timer() has already
    unlocked it before joining us.
    NOTE(review): timer->cond is never destroyed - presumably harmless
    since this runs once at server shutdown, but worth confirming.
  */
  mysql_mutex_destroy(&timer->mutex);
  my_thread_end();
  return NULL;
}
683
684
685
/*
  Periodic stall check for one thread group, called from timer_thread().
  Bumps priority of connections stuck in the low priority queue, and
  wakes/creates a worker when neither IO nor queue events make progress.
*/
void check_stall(thread_group_t *thread_group)
{
  mysql_mutex_lock(&thread_group->mutex);

  /*
    Bump priority for the low priority connections that spent too much
    time in low prio queue.
  */
  TP_connection_generic *c;
  for (;;)
  {
    c= thread_group->queues[TP_PRIORITY_LOW].front();
    /* threadpool_prio_kickup_timer is in milliseconds, times are in us. */
    if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer)
    {
      thread_group->queues[TP_PRIORITY_LOW].remove(c);
      thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
    }
    else
      break;
  }

  /*
    Check if listener is present. If not, check whether any IO
    events were dequeued since last time. If not, this means
    listener is either in tight loop or thd_wait_begin()
    was forgotten. Create a new worker (it will make itself listener).
  */
  if (!thread_group->listener && !thread_group->io_event_count)
  {
    wake_or_create_thread(thread_group);
    mysql_mutex_unlock(&thread_group->mutex);
    return;
  }

  /* Reset io event count */
  thread_group->io_event_count= 0;

  /*
    Check whether requests from the workqueue are being dequeued.

    The stall detection and resolution works as follows:

    1. There is a counter thread_group->queue_event_count for the number of
       events removed from the queue. Timer resets the counter to 0 on each run.
    2. Timer determines stall if this counter remains 0 since last check
       and the queue is not empty.
    3. Once timer determined a stall it sets thread_group->stalled flag and
       wakes and idle worker (or creates a new one, subject to throttling).
    4. The stalled flag is reset, when an event is dequeued.

    Q : Will this handling lead to an unbound growth of threads, if queue
    stalls permanently?
    A : No. If queue stalls permanently, it is an indication for many very long
    simultaneous queries. The maximum number of simultaneous queries is
    max_connections, further we have threadpool_max_threads limit, upon which no
    worker threads are created. So in case there is a flood of very long
    queries, threadpool would slowly approach thread-per-connection behavior.
    NOTE:
    If long queries never wait, creation of the new threads is done by timer,
    so it is slower than in real thread-per-connection. However if long queries
    do wait and indicate that via thd_wait_begin/end callbacks, thread creation
    will be faster.
  */
  if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
  {
    thread_group->stalled= true;
    wake_or_create_thread(thread_group);
  }

  /* Reset queue event count */
  thread_group->queue_event_count= 0;

  mysql_mutex_unlock(&thread_group->mutex);
}
760
761
762static void start_timer(pool_timer_t* timer)
763{
764 DBUG_ENTER("start_timer");
765 mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
766 mysql_cond_init(key_timer_cond, &timer->cond, NULL);
767 timer->shutdown = false;
768 mysql_thread_create(key_timer_thread, &timer->timer_thread_id, NULL,
769 timer_thread, timer);
770 DBUG_VOID_RETURN;
771}
772
773
/*
  Request the timer thread to exit and wait for it.
  The flag is set and the condition signalled under timer->mutex so the
  wakeup cannot be lost; the mutex itself is destroyed by the timer
  thread after it leaves its loop.
*/
static void stop_timer(pool_timer_t *timer)
{
  DBUG_ENTER("stop_timer");
  mysql_mutex_lock(&timer->mutex);
  timer->shutdown = true;
  mysql_cond_signal(&timer->cond);
  mysql_mutex_unlock(&timer->mutex);
  pthread_join(timer->timer_thread_id, NULL);
  DBUG_VOID_RETURN;
}
784
785
/**
  Poll for socket events and distribute them to worker threads.
  In many cases the current thread will handle a single event itself.

  @param current_thread  the worker acting as listener (currently unused here)
  @param thread_group    the group whose pollfd is watched

  @return a ready connection, or NULL on shutdown
*/
static TP_connection_generic * listener(worker_thread_t *current_thread,
                               thread_group_t *thread_group)
{
  DBUG_ENTER("listener");
  TP_connection_generic *retval= NULL;

  for(;;)
  {
    native_event ev[MAX_EVENTS];
    int cnt;

    if (thread_group->shutdown)
      break;

    /* Block indefinitely; wake_listener() unblocks us at shutdown. */
    cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);

    if (cnt <=0)
    {
      DBUG_ASSERT(thread_group->shutdown);
      break;
    }

    mysql_mutex_lock(&thread_group->mutex);

    /* Re-check shutdown under the mutex before touching group state. */
    if (thread_group->shutdown)
    {
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    thread_group->io_event_count += cnt;

    /*
      We got some network events and need to make decisions : whether
      listener should handle events itself and whether to wake worker
      threads so they can handle events.

      Q1 : Should listener handle an event itself, or put all events into
      queue and let workers handle the events?

      Solution :
      Generally, listener that handles events itself is preferable. We do not
      want listener thread to change its state from waiting to running too
      often. Since listener has just woken from poll, it better uses its time
      slice and does some work. Besides, not handling events means they go to
      the queue, and often another worker must wake up to handle the
      event. This is not good, as we want to avoid wakeups.

      The downside of listener that also handles queries is that we can
      potentially leave thread group for long time not picking the new
      network events. It is not a major problem, because this stall will be
      detected sooner or later by the timer thread. Still, relying on timer
      is not always good, because it may "tick" too slow (large timer_interval)

      We use following strategy to solve this problem - if queue was not empty
      we suspect flood of network events and listener stays. Otherwise, it
      handles a query.


      Q2: If queue is not empty, how many workers to wake?

      Solution:
      We generally try to keep one thread per group active (threads handling
      queries are considered active, unless they stuck in inside some "wait")
      Thus, we will wake only one worker, and only if there are no active
      threads currently, and listener is not going to handle a query. When we
      don't wake, we hope that currently active threads will finish fast and
      handle the queue. If this does not happen, timer thread will detect stall
      and wake a worker.

      NOTE: Currently nothing is done to detect or prevent long queuing times.
      A solution for the future would be to give up "one active thread per
      group" principle, if events stay in the queue for too long, and just wake
      more workers.
    */

    bool listener_picks_event=is_queue_empty(thread_group);
    queue_put(thread_group, ev, cnt);
    if (listener_picks_event)
    {
      /* Handle the first event ourselves; cnt > 0 guarantees queue_get
         returns a connection here. */
      retval= queue_get(thread_group);
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    if(thread_group->active_thread_count==0)
    {
      /* We added some work items to queue, now wake a worker. */
      if(wake_thread(thread_group))
      {
        /*
          Wake failed, hence group has no idle threads. Now check if there are
          any threads in the group except listener.
        */
        if(thread_group->thread_count == 1)
        {
           /*
             Currently there is no worker thread in the group, as indicated by
             thread_count == 1 (this means listener is the only one thread in
             the group).
             The queue is not empty, and listener is not going to handle
             events. In order to drain the queue,  we create a worker here.
             Alternatively, we could just rely on timer to detect stall, and
             create thread, but waiting for timer would be an inefficient and
             pointless delay.
           */
           create_worker(thread_group);
        }
      }
    }
    mysql_mutex_unlock(&thread_group->mutex);
  }

  DBUG_RETURN(retval);
}
908
909/**
910 Adjust thread counters in group or global
911 whenever thread is created or is about to exit
912
913 @param thread_group
914 @param count - 1, when new thread is created
915 -1, when thread is about to exit
916*/
917
918static void add_thread_count(thread_group_t *thread_group, int32 count)
919{
920 thread_group->thread_count += count;
921 /* worker starts out and end in "active" state */
922 thread_group->active_thread_count += count;
923 my_atomic_add32(&tp_stats.num_worker_threads, count);
924}
925
926
/**
  Creates a new worker thread.
  thread_mutex must be held when calling this function

  NOTE: in rare cases, the number of threads can exceed
  threadpool_max_threads, because we need at least 2 threads
  per group to prevent deadlocks (one listener + one worker)

  @return 0 on success, non-zero on failure (limit reached or
          thread creation failed); failures are reported via
          print_pool_blocked_message().
*/

static int create_worker(thread_group_t *thread_group)
{
  pthread_t thread_id;
  bool max_threads_reached= false;
  int err;

  DBUG_ENTER("create_worker");
  /* Enforce the global thread limit, but always allow a second thread. */
  if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
     && thread_group->thread_count >= 2)
  {
    err= 1;
    max_threads_reached= true;
    goto end;
  }


  err= mysql_thread_create(key_worker_thread, &thread_id,
         thread_group->pthread_attr, worker_main, thread_group);
  if (!err)
  {
    /* Remember creation time for wake_or_create_thread() throttling. */
    thread_group->last_thread_creation_time=microsecond_interval_timer();
    statistic_increment(thread_created,&LOCK_status);
    add_thread_count(thread_group, 1);
  }
  else
  {
    /*
      NOTE(review): mysql_thread_create returns an error code; relying on
      errno here assumes the failing pthread call also set it - confirm.
    */
    my_errno= errno;
  }

end:
  if (err)
    print_pool_blocked_message(max_threads_reached);
  else
    pool_block_start= 0; /* Reset pool blocked timer, if it was set */

  DBUG_RETURN(err);
}
973
974
975/**
976 Calculate microseconds throttling delay for thread creation.
977
978 The value depends on how many threads are already in the group:
979 small number of threads means no delay, the more threads the larger
980 the delay.
981
982 The actual values were not calculated using any scientific methods.
983 They just look right, and behave well in practice.
984
985 TODO: Should throttling depend on thread_pool_stall_limit?
986*/
987static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
988{
989 int count= thread_group->thread_count;
990
991 if (count < 4)
992 return 0;
993
994 if (count < 8)
995 return 50*1000;
996
997 if(count < 16)
998 return 100*1000;
999
1000 return 200*1000;
1001}
1002
1003
/**
  Wakes a worker thread, or creates a new one.

  Worker creation is throttled, so we avoid too many threads
  to be created during the short time.

  @return 0 if a thread was woken or (in the no-active-threads case)
          creation succeeded; non-zero otherwise.
*/
static int wake_or_create_thread(thread_group_t *thread_group)
{
  DBUG_ENTER("wake_or_create_thread");

  if (thread_group->shutdown)
   DBUG_RETURN(0);

  /* Cheapest option first: wake an already existing, idle worker. */
  if (wake_thread(thread_group) == 0)
    DBUG_RETURN(0);

  /* More threads than connections would never all find work. */
  if (thread_group->thread_count > thread_group->connection_count)
    DBUG_RETURN(-1);


  if (thread_group->active_thread_count == 0)
  {
    /*
     We're better off creating a new thread here  with no delay, either there
     are no workers at all, or they all are all blocking and there was no
     idle  thread to wakeup. Smells like a potential deadlock or very slowly
     executing requests, e.g sleeps or user locks.
    */
    DBUG_RETURN(create_worker(thread_group));
  }

  ulonglong now = microsecond_interval_timer();
  ulonglong time_since_last_thread_created =
    (now - thread_group->last_thread_creation_time);

  /* Throttle thread creation. */
  if (time_since_last_thread_created >
       microsecond_throttling_interval(thread_group))
  {
    DBUG_RETURN(create_worker(thread_group));
  }

  DBUG_RETURN(-1);
}
1048
1049
1050
1051int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
1052{
1053 DBUG_ENTER("thread_group_init");
1054 thread_group->pthread_attr = thread_attr;
1055 mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
1056 thread_group->pollfd= INVALID_HANDLE_VALUE;
1057 thread_group->shutdown_pipe[0]= -1;
1058 thread_group->shutdown_pipe[1]= -1;
1059 queue_init(thread_group);
1060 DBUG_RETURN(0);
1061}
1062
1063
/*
  Release all resources owned by a group: mutex, poll descriptor and
  (on non-IOCP platforms) the shutdown self-pipe created by
  wake_listener(). The last group to be destroyed also frees the
  global all_groups array.
*/
void thread_group_destroy(thread_group_t *thread_group)
{
  mysql_mutex_destroy(&thread_group->mutex);
  if (thread_group->pollfd != INVALID_HANDLE_VALUE)
  {
    io_poll_close(thread_group->pollfd);
    thread_group->pollfd= INVALID_HANDLE_VALUE;
  }
#ifndef HAVE_IOCP
  for(int i=0; i < 2; i++)
  {
    if(thread_group->shutdown_pipe[i] != -1)
    {
      close(thread_group->shutdown_pipe[i]);
      thread_group->shutdown_pipe[i]= -1;
    }
  }
#endif

  /* my_atomic_add32 returns the pre-decrement value. */
  if (my_atomic_add32(&shutdown_group_count, -1) == 1)
    my_free(all_groups);
}
1086
1087/**
1088 Wake sleeping thread from waiting list
1089*/
1090
1091static int wake_thread(thread_group_t *thread_group)
1092{
1093 DBUG_ENTER("wake_thread");
1094 worker_thread_t *thread = thread_group->waiting_threads.front();
1095 if(thread)
1096 {
1097 thread->woken= true;
1098 thread_group->waiting_threads.remove(thread);
1099 mysql_cond_signal(&thread->cond);
1100 DBUG_RETURN(0);
1101 }
1102 DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
1103}
1104
/*
  Wake listener thread (during shutdown).
  Self-pipe trick is used in most cases, except IOCP: a pipe is created,
  its read end registered with the poll descriptor, and one byte written
  so the listener's blocking io_poll_wait() returns; the woken listener
  then observes thread_group->shutdown and exits. The pipe fds are closed
  later by thread_group_destroy().
  Returns 0 on success, -1 on failure.
*/
static int wake_listener(thread_group_t *thread_group)
{
#ifndef HAVE_IOCP
  if (pipe(thread_group->shutdown_pipe))
  {
    return -1;
  }

  /* Wake listener */
  if (io_poll_associate_fd(thread_group->pollfd,
      thread_group->shutdown_pipe[0], NULL, NULL))
  {
    return -1;
  }
  char c= 0;
  if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
    return -1;
#else
  /* On IOCP, posting a completion packet unblocks the listener directly. */
  PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0);
#endif
  return 0;
}
1131/**
1132 Initiate shutdown for thread group.
1133
1134 The shutdown is asynchronous, we only care to wake all threads in here, so
1135 they can finish. We do not wait here until threads terminate. Final cleanup
1136 of the group (thread_group_destroy) will be done by the last exiting threads.
1137*/
1138
static void thread_group_close(thread_group_t *thread_group)
{
  DBUG_ENTER("thread_group_close");

  mysql_mutex_lock(&thread_group->mutex);
  if (thread_group->thread_count == 0)
  {
    /* No threads left in the group: destroy it immediately. */
    mysql_mutex_unlock(&thread_group->mutex);
    thread_group_destroy(thread_group);
    DBUG_VOID_RETURN;
  }

  thread_group->shutdown= true;
  thread_group->listener= NULL;

  /* Wake the listener so it observes the shutdown flag. */
  wake_listener(thread_group);

  /* Wake all workers. */
  while(wake_thread(thread_group) == 0)
  {
  }

  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_VOID_RETURN;
}
1165
1166
1167/*
1168 Add work to the queue. Maybe wake a worker if they all sleep.
1169
1170 Currently, this function is only used when new connections need to
1171 perform login (this is done in worker threads).
1172
1173*/
1174
/* Caller must hold thread_group->mutex. */
static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection)
{
  DBUG_ENTER("queue_put");

  /* Timestamp used by the pool timer to detect a stalled queue. */
  connection->dequeue_time= pool_timer.current_microtime;
  thread_group->queues[connection->priority].push_back(connection);

  /* If no worker is currently active, make sure one picks this up. */
  if (thread_group->active_thread_count == 0)
    wake_or_create_thread(thread_group);

  DBUG_VOID_RETURN;
}
1187
1188
1189/*
  Prevent too many threads executing at the same time, if the workload is
1191 not CPU bound.
1192*/
1193
1194static bool too_many_threads(thread_group_t *thread_group)
1195{
1196 return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe
1197 && !thread_group->stalled);
1198}
1199
1200
1201/**
1202 Retrieve a connection with pending event.
1203
1204 Pending event in our case means that there is either a pending login request
1205 (if connection is not yet logged in), or there are unread bytes on the socket.
1206
1207 If there are no pending events currently, thread will wait.
1208 If timeout specified in abstime parameter passes, the function returns NULL.
1209
1210 @param current_thread - current worker thread
1211 @param thread_group - current thread group
1212 @param abstime - absolute wait timeout
1213
1214 @return
1215 connection with pending event.
  NULL is returned if timeout has expired, or on shutdown.
1217*/
1218
TP_connection_generic *get_event(worker_thread_t *current_thread,
  thread_group_t *thread_group, struct timespec *abstime)
{
  DBUG_ENTER("get_event");
  TP_connection_generic *connection = NULL;


  mysql_mutex_lock(&thread_group->mutex);
  DBUG_ASSERT(thread_group->active_thread_count >= 0);

  for(;;)
  {
    int err=0;
    bool oversubscribed = too_many_threads(thread_group);
    /* On shutdown, return NULL so the worker exits its loop. */
    if (thread_group->shutdown)
      break;

    /* Check if queue is not empty */
    if (!oversubscribed)
    {
      connection = queue_get(thread_group);
      if(connection)
        break;
    }

    /* If there is currently no listener in the group, become one. */
    if(!thread_group->listener)
    {
      thread_group->listener= current_thread;
      /* Listening does not count as "active" work. */
      thread_group->active_thread_count--;
      mysql_mutex_unlock(&thread_group->mutex);

      connection = listener(current_thread, thread_group);

      mysql_mutex_lock(&thread_group->mutex);
      thread_group->active_thread_count++;
      /* There is no listener anymore, it just returned. */
      thread_group->listener= NULL;
      break;
    }


    /*
      Last thing we try before going to sleep is to
      non-blocking event poll, i.e with timeout = 0.
      If this returns events, pick one
    */
    if (!oversubscribed)
    {

      native_event ev[MAX_EVENTS];
      int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
      if (cnt > 0)
      {
        /* Different queue_put overload: enqueues all polled events. */
        queue_put(thread_group, ev, cnt);
        connection= queue_get(thread_group);
        break;
      }
    }


    /* And now, finally sleep */
    current_thread->woken = false; /* wake() sets this to true */

    /*
      Add current thread to the head of the waiting list and wait.
      It is important to add thread to the head rather than tail
      as it ensures LIFO wakeup order (hot caches, working inactivity timeout)
    */
    thread_group->waiting_threads.push_front(current_thread);

    thread_group->active_thread_count--;
    if (abstime)
    {
      err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
                                 abstime);
    }
    else
    {
      err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
    }
    thread_group->active_thread_count++;

    if (!current_thread->woken)
    {
      /*
        Thread was not signalled by wake(), it might be a spurious wakeup or
        a timeout. Anyhow, we need to remove ourselves from the list now.
        If thread was explicitly woken, than caller removed us from the list.
      */
      thread_group->waiting_threads.remove(current_thread);
    }

    /* Timeout (or wait error): give up and let the worker retire. */
    if (err)
      break;
  }

  /* Any thread that leaves this function un-stalls the group. */
  thread_group->stalled= false;
  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_RETURN(connection);
}
1321
1322
1323
1324/**
1325 Tells the pool that worker starts waiting on IO, lock, condition,
1326 sleep() or similar.
1327*/
1328
void wait_begin(thread_group_t *thread_group)
{
  DBUG_ENTER("wait_begin");
  mysql_mutex_lock(&thread_group->mutex);
  /* This thread is about to block, so it no longer counts as active. */
  thread_group->active_thread_count--;

  DBUG_ASSERT(thread_group->active_thread_count >=0);
  DBUG_ASSERT(thread_group->connection_count > 0);

  if ((thread_group->active_thread_count == 0) &&
     (is_queue_empty(thread_group) || !thread_group->listener))
  {
    /*
      Group might stall while this thread waits, thus wake
      or create a worker to prevent stall.
    */
    wake_or_create_thread(thread_group);
  }

  mysql_mutex_unlock(&thread_group->mutex);
  DBUG_VOID_RETURN;
}
1351
1352/**
1353 Tells the pool has finished waiting.
1354*/
1355
void wait_end(thread_group_t *thread_group)
{
  DBUG_ENTER("wait_end");
  mysql_mutex_lock(&thread_group->mutex);
  /* The blocking call finished; the thread counts as active again. */
  thread_group->active_thread_count++;
  mysql_mutex_unlock(&thread_group->mutex);
  DBUG_VOID_RETURN;
}
1364
1365
1366
1367
1368TP_connection * TP_pool_generic::new_connection(CONNECT *c)
1369{
1370 return new (std::nothrow) TP_connection_generic(c);
1371}
1372
1373/**
  Add a new connection to the thread pool.
1375*/
1376
1377void TP_pool_generic::add(TP_connection *c)
1378{
1379 DBUG_ENTER("tp_add_connection");
1380
1381 TP_connection_generic *connection=(TP_connection_generic *)c;
1382 thread_group_t *thread_group= connection->thread_group;
1383 /*
1384 Add connection to the work queue.Actual logon
1385 will be done by a worker thread.
1386 */
1387 mysql_mutex_lock(&thread_group->mutex);
1388 queue_put(thread_group, connection);
1389 mysql_mutex_unlock(&thread_group->mutex);
1390 DBUG_VOID_RETURN;
1391}
1392
1393
1394
1395/**
1396 MySQL scheduler callback: wait begin
1397*/
1398
void TP_connection_generic::wait_begin(int type)
{
  DBUG_ENTER("wait_begin");

  DBUG_ASSERT(!waiting);
  waiting++;
  /* Only the outermost wait notifies the group (counter supports nesting). */
  if (waiting == 1)
    ::wait_begin(thread_group);
  DBUG_VOID_RETURN;
}
1409
1410
1411/**
1412 MySQL scheduler callback: wait end
1413*/
1414
void TP_connection_generic::wait_end()
{
  DBUG_ENTER("wait_end");
  DBUG_ASSERT(waiting);
  waiting--;
  /* Only the outermost wait_end notifies the group. */
  if (waiting == 0)
    ::wait_end(thread_group);
  DBUG_VOID_RETURN;
}
1424
1425
/**
  Lower pool_timer.next_timeout_check to abstime, if abstime is earlier.
  Uses a CAS loop: retry while the stored value is still later than
  abstime (another thread may concurrently install a different value).
*/
static void set_next_timeout_check(ulonglong abstime)
{
  DBUG_ENTER("set_next_timeout_check");
  while(abstime < pool_timer.next_timeout_check)
  {
    longlong old= (longlong)pool_timer.next_timeout_check;
    my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
          &old, abstime);
  }
  DBUG_VOID_RETURN;
}
1437
/**
  Construct a threadpool connection: cache the socket/pipe handle and
  assign the connection to a thread group (round-robin by thread_id).
*/
TP_connection_generic::TP_connection_generic(CONNECT *c):
  TP_connection(c),
  thread_group(0),
  next_in_queue(0),
  prev_in_queue(0),
  abs_wait_timeout(ULONGLONG_MAX),
  bound_to_poll_descriptor(false),
  waiting(false)
#ifdef HAVE_IOCP
, overlapped()
#endif
{
  DBUG_ASSERT(c->vio);

#ifdef _WIN32
  /* On Windows the connection can be a named pipe, not only a socket. */
  vio_type= c->vio->type;
  fd= (vio_type == VIO_TYPE_NAMEDPIPE) ?
    c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket);
#else
  fd= mysql_socket_getfd(c->vio->mysql_socket);
#endif

  /* Assign connection to a group. */
  thread_group_t *group=
    &all_groups[c->thread_id%group_count];

  thread_group=group;

  mysql_mutex_lock(&group->mutex);
  group->connection_count++;
  mysql_mutex_unlock(&group->mutex);
}
1470
/* Unregister the connection from its group's connection count. */
TP_connection_generic::~TP_connection_generic()
{
  mysql_mutex_lock(&thread_group->mutex);
  thread_group->connection_count--;
  mysql_mutex_unlock(&thread_group->mutex);
}
1477
1478/**
1479 Set wait timeout for connection.
1480*/
1481
1482void TP_connection_generic::set_io_timeout(int timeout_sec)
1483{
1484 DBUG_ENTER("set_wait_timeout");
1485 /*
1486 Calculate wait deadline for this connection.
1487 Instead of using microsecond_interval_timer() which has a syscall
1488 overhead, use pool_timer.current_microtime and take
1489 into account that its value could be off by at most
1490 one tick interval.
1491 */
1492
1493 abs_wait_timeout= pool_timer.current_microtime +
1494 1000LL*pool_timer.tick_interval +
1495 1000000LL*timeout_sec;
1496
1497 set_next_timeout_check(abs_wait_timeout);
1498 DBUG_VOID_RETURN;
1499}
1500
1501
1502#ifndef HAVE_IOCP
1503/**
  Handle a (rare) special case, where a connection needs to
1505 migrate to a different group because group_count has changed
1506 after thread_pool_size setting.
1507*/
1508
static int change_group(TP_connection_generic *c,
                        thread_group_t *old_group,
                        thread_group_t *new_group)
{
  int ret= 0;

  DBUG_ASSERT(c->thread_group == old_group);

  /* Remove connection from the old group. */
  mysql_mutex_lock(&old_group->mutex);
  if (c->bound_to_poll_descriptor)
  {
    /* The fd must not stay registered with the old group's poller. */
    io_poll_disassociate_fd(old_group->pollfd,c->fd);
    c->bound_to_poll_descriptor= false;
  }
  c->thread_group->connection_count--;
  mysql_mutex_unlock(&old_group->mutex);

  /* Add connection to the new group. */
  mysql_mutex_lock(&new_group->mutex);
  c->thread_group= new_group;
  new_group->connection_count++;
  /* Ensure that there is a listener in the new group. */
  if (!new_group->thread_count)
    ret= create_worker(new_group);
  mysql_mutex_unlock(&new_group->mutex);
  return ret;
}
1537#endif
1538
int TP_connection_generic::start_io()
{
#ifndef HAVE_IOCP
  /*
    Usually a connection stays in the same group for its entire life.
    However, group_count may change at runtime (thread_pool_size was
    altered); in that rare case the connection may need to migrate to
    another group to keep the load balanced between groups.

    So we recalculate in which group the connection should be, based
    on thread_id and current group count, and migrate if necessary.
  */
  thread_group_t *group =
    &all_groups[thd->thread_id%group_count];

  if (group != thread_group)
  {
    if (change_group(this, thread_group, group))
      return -1;
  }
#endif

  /*
    Bind to poll descriptor if not yet done.
  */
  if (!bound_to_poll_descriptor)
  {
    bound_to_poll_descriptor= true;
    return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
  }

  /* Already bound: just re-arm read notification for this fd. */
  return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
}
1573
1574
1575
1576/**
1577 Worker thread's main
1578*/
1579
/**
  Worker thread's main: loop fetching events via get_event() and
  executing them with tp_callback(), until timeout or shutdown.
  The last worker to leave a shutdown group destroys the group.
*/
static void *worker_main(void *param)
{

  worker_thread_t this_thread;
  pthread_detach_this_thread();
  my_thread_init();

  DBUG_ENTER("worker_main");

  thread_group_t *thread_group = (thread_group_t *)param;

  /* Init per-thread structure */
  mysql_cond_init(key_worker_cond, &this_thread.cond, NULL);
  this_thread.thread_group= thread_group;
  this_thread.event_count=0;

  /* Run event loop */
  for(;;)
  {
    TP_connection_generic *connection;
    struct timespec ts;
    set_timespec(ts,threadpool_idle_timeout);
    connection = get_event(&this_thread, thread_group, &ts);
    /* NULL means idle timeout or group shutdown: retire this worker. */
    if (!connection)
      break;
    this_thread.event_count++;
    tp_callback(connection);
  }

  /* Thread shutdown: cleanup per-worker-thread structure. */
  mysql_cond_destroy(&this_thread.cond);

  bool last_thread; /* last thread in group exits */
  mysql_mutex_lock(&thread_group->mutex);
  add_thread_count(thread_group, -1);
  last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown);
  mysql_mutex_unlock(&thread_group->mutex);

  /* Last thread in group exits and pool is terminating, destroy group.*/
  if (last_thread)
    thread_group_destroy(thread_group);

  my_thread_end();
  return NULL;
}
1625
1626
/* Trivial constructor: all real setup is performed in init(). */
TP_pool_generic::TP_pool_generic()
{}
1629
1630int TP_pool_generic::init()
1631{
1632 DBUG_ENTER("TP_pool_generic::TP_pool_generic");
1633 threadpool_max_size= MY_MAX(threadpool_size, 128);
1634 all_groups= (thread_group_t *)
1635 my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
1636 if (!all_groups)
1637 {
1638 threadpool_max_size= 0;
1639 sql_print_error("Allocation failed");
1640 DBUG_RETURN(-1);
1641 }
1642 scheduler_init();
1643 threadpool_started= true;
1644 for (uint i= 0; i < threadpool_max_size; i++)
1645 {
1646 thread_group_init(&all_groups[i], get_connection_attrib());
1647 }
1648 set_pool_size(threadpool_size);
1649 if(group_count == 0)
1650 {
1651 /* Something went wrong */
1652 sql_print_error("Can't set threadpool size to %d",threadpool_size);
1653 DBUG_RETURN(-1);
1654 }
1655 PSI_register(mutex);
1656 PSI_register(cond);
1657 PSI_register(thread);
1658
1659 pool_timer.tick_interval= threadpool_stall_limit;
1660 start_timer(&pool_timer);
1661 DBUG_RETURN(0);
1662}
1663
/**
  Shut the pool down: stop the timer and initiate asynchronous close of
  every group. Group destruction itself is completed by the last worker
  thread to exit each group (see thread_group_close()).
*/
TP_pool_generic::~TP_pool_generic()
{
  DBUG_ENTER("tp_end");

  if (!threadpool_started)
    DBUG_VOID_RETURN;

  stop_timer(&pool_timer);
  /* Countdown used by thread_group_destroy() to free all_groups last. */
  shutdown_group_count= threadpool_max_size;
  for (uint i= 0; i < threadpool_max_size; i++)
  {
    thread_group_close(&all_groups[i]);
  }
  threadpool_started= false;
  DBUG_VOID_RETURN;
}
1680
1681
1682/** Ensure that poll descriptors are created when threadpool_size changes */
1683int TP_pool_generic::set_pool_size(uint size)
1684{
1685 bool success= true;
1686
1687 for(uint i=0; i< size; i++)
1688 {
1689 thread_group_t *group= &all_groups[i];
1690 mysql_mutex_lock(&group->mutex);
1691 if (group->pollfd == INVALID_HANDLE_VALUE)
1692 {
1693 group->pollfd= io_poll_create();
1694 success= (group->pollfd != INVALID_HANDLE_VALUE);
1695 if(!success)
1696 {
1697 sql_print_error("io_poll_create() failed, errno=%d\n", errno);
1698 }
1699 }
1700 mysql_mutex_unlock(&group->mutex);
1701 if (!success)
1702 {
1703 group_count= i;
1704 return -1;
1705 }
1706 }
1707 group_count= size;
1708 return 0;
1709}
1710
/**
  Change the timer tick interval (stall detection period) and wake the
  timer thread so it picks up the new value immediately.
*/
int TP_pool_generic::set_stall_limit(uint limit)
{
  mysql_mutex_lock(&(pool_timer.mutex));
  pool_timer.tick_interval= limit;
  mysql_mutex_unlock(&(pool_timer.mutex));
  mysql_cond_signal(&(pool_timer.cond));
  return 0;
}
1719
1720
1721/**
1722 Calculate number of idle/waiting threads in the pool.
1723
1724 Sum idle threads over all groups.
1725 Don't do any locking, it is not required for stats.
1726*/
1727
1728int TP_pool_generic::get_idle_thread_count()
1729{
1730 int sum=0;
1731 for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
1732 {
1733 sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
1734 }
1735 return sum;
1736}
1737
1738
1739/* Report threadpool problems */
1740
1741/**
1742 Delay in microseconds, after which "pool blocked" message is printed.
1743 (30 sec == 30 Mio usec)
1744*/
1745#define BLOCK_MSG_DELAY (30*1000000)
1746
1747#define MAX_THREADS_REACHED_MSG \
1748"Threadpool could not create additional thread to handle queries, because the \
1749number of allowed threads was reached. Increasing 'thread_pool_max_threads' \
1750parameter can help in this situation.\n \
1751If 'extra_port' parameter is set, you can still connect to the database with \
1752superuser account (it must be TCP connection using extra_port as TCP port) \
1753and troubleshoot the situation. \
1754A likely cause of pool blocks are clients that lock resources for long time. \
1755'show processlist' or 'show engine innodb status' can give additional hints."
1756
1757#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."
1758
1759/**
1760 Write a message when blocking situation in threadpool occurs.
1761 The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds.
1762 It will be just a single message for each blocking situation (to prevent
1763 log flood).
1764*/
1765
static void print_pool_blocked_message(bool max_threads_reached)
{
  ulonglong now;
  static bool msg_written;

  now= microsecond_interval_timer();
  if (pool_block_start == 0)
  {
    /* First blocked call in this episode: start the clock, write later. */
    pool_block_start= now;
    msg_written = false;
    return;
  }

  if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
  {
    if (max_threads_reached)
      sql_print_error(MAX_THREADS_REACHED_MSG);
    else
      sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno);

    sql_print_information("Threadpool has been blocked for %u seconds\n",
      (uint)((now- pool_block_start)/1000000));
    /* avoid repeated messages for the same blocking situation */
    msg_written= true;
  }
}
1792
1793#endif /* HAVE_POOL_OF_THREADS */
1794