1 | /* Copyright (C) 2012 Monty Program Ab |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License |
13 | along with this program; if not, write to the Free Software |
14 | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ |
15 | |
16 | #include "mariadb.h" |
17 | #include <violite.h> |
18 | #include <sql_priv.h> |
19 | #include <sql_class.h> |
20 | #include <my_pthread.h> |
21 | #include <scheduler.h> |
22 | |
23 | #ifdef HAVE_POOL_OF_THREADS |
24 | |
25 | #ifdef _WIN32 |
26 | /* AIX may define this, too ?*/ |
27 | #define HAVE_IOCP |
28 | #endif |
29 | |
30 | #ifdef HAVE_IOCP |
31 | #define OPTIONAL_IO_POLL_READ_PARAM this |
32 | #else |
33 | #define OPTIONAL_IO_POLL_READ_PARAM 0 |
34 | #endif |
35 | |
36 | #ifdef _WIN32 |
37 | typedef HANDLE TP_file_handle; |
38 | #else |
39 | typedef int TP_file_handle; |
40 | #define INVALID_HANDLE_VALUE -1 |
41 | #endif |
42 | |
43 | |
44 | #include <sql_connect.h> |
45 | #include <mysqld.h> |
46 | #include <debug_sync.h> |
47 | #include <time.h> |
48 | #include <sql_plist.h> |
49 | #include <threadpool.h> |
50 | #include <time.h> |
51 | #ifdef __linux__ |
52 | #include <sys/epoll.h> |
53 | typedef struct epoll_event native_event; |
54 | #elif defined(HAVE_KQUEUE) |
55 | #include <sys/event.h> |
56 | typedef struct kevent native_event; |
57 | #elif defined (__sun) |
58 | #include <port.h> |
59 | typedef port_event_t native_event; |
60 | #elif defined (HAVE_IOCP) |
61 | typedef OVERLAPPED_ENTRY native_event; |
62 | #else |
63 | #error threadpool is not available on this platform |
64 | #endif |
65 | |
66 | |
67 | static void io_poll_close(TP_file_handle fd) |
68 | { |
69 | #ifdef _WIN32 |
70 | CloseHandle(fd); |
71 | #else |
72 | close(fd); |
73 | #endif |
74 | } |
75 | |
76 | |
77 | /** Maximum number of native events a listener can read in one go */ |
78 | #define MAX_EVENTS 1024 |
79 | |
80 | /** Indicates that threadpool was initialized*/ |
81 | static bool threadpool_started= false; |
82 | |
83 | /* |
84 | Define PSI Keys for performance schema. |
85 | We have a mutex per group, worker threads, condition per worker thread, |
86 | and timer thread with its own mutex and condition. |
87 | */ |
88 | |
89 | |
#ifdef HAVE_PSI_INTERFACE
/* PSI keys identify threadpool synchronization objects in performance_schema. */
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
{
  { &key_group_mutex, "group_mutex" , 0},
  { &key_timer_mutex, "timer_mutex" , PSI_FLAG_GLOBAL}
};

static PSI_cond_key key_worker_cond;
static PSI_cond_key key_timer_cond;
static PSI_cond_info cond_list[]=
{
  { &key_worker_cond, "worker_cond" , 0},
  { &key_timer_cond, "timer_cond" , PSI_FLAG_GLOBAL}
};

static PSI_thread_key key_worker_thread;
static PSI_thread_key key_timer_thread;
static PSI_thread_info thread_list[] =
{
  {&key_worker_thread, "worker_thread" , 0},
  {&key_timer_thread, "timer_thread" , PSI_FLAG_GLOBAL}
};

/* Macro to simplify performance schema registration */
#define PSI_register(X) \
 if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
#else
#define PSI_register(X) /* no-op */
#endif
121 | |
122 | |
struct thread_group_t;

/* Per-thread structure for workers */
struct worker_thread_t
{
  ulonglong event_count; /* number of request handled by this thread */
  thread_group_t* thread_group;  /* group this worker belongs to */
  /* Intrusive links for the group's waiting_threads list (see worker_list_t) */
  worker_thread_t *next_in_list;
  worker_thread_t **prev_in_list;

  mysql_cond_t cond;  /* signalled to wake this worker while it waits idle */
  bool woken;         /* set by the waker; wait/wake protocol lives elsewhere in this file */
};

/* Intrusive list type for idle workers, linked through the members above. */
typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
                 &worker_thread_t::next_in_list,
                 &worker_thread_t::prev_in_list>
                 >
worker_list_t;
142 | |
/*
  Generic (cross-platform) threadpool connection.
  Extends TP_connection with the state needed for the native IO
  multiplexing facility and for queueing inside a thread group.
*/
struct TP_connection_generic:public TP_connection
{
  TP_connection_generic(CONNECT *c);
  ~TP_connection_generic();

  virtual int init(){ return 0; };
  virtual void set_io_timeout(int sec);
  virtual int start_io();
  virtual void wait_begin(int type);
  virtual void wait_end();

  thread_group_t *thread_group;
  /* Intrusive links for the group's work queue (see connection_queue_t). */
  TP_connection_generic *next_in_queue;
  TP_connection_generic **prev_in_queue;
  /* Absolute wait_timeout deadline, compared to pool_timer.current_microtime. */
  ulonglong abs_wait_timeout;
  /* Time the connection was placed in the queue; used by check_stall()
     to bump priority of entries stuck too long in the low-prio queue. */
  ulonglong dequeue_time;
  TP_file_handle fd;
  /* true once fd has been added to the group's poll descriptor */
  bool bound_to_poll_descriptor;
  int waiting;
#ifdef HAVE_IOCP
  OVERLAPPED overlapped;  /* per-connection state for async zero-byte reads */
#endif
#ifdef _WIN32
  enum_vio_type vio_type; /* socket vs named pipe; selects ReadFile/WSARecv */
#endif
};


/* FIFO work queue of connections, linked through the members above. */
typedef I_P_List<TP_connection_generic,
                     I_P_List_adapter<TP_connection_generic,
                                      &TP_connection_generic::next_in_queue,
                                      &TP_connection_generic::prev_in_queue>,
                     I_P_List_null_counter,
                     I_P_List_fast_push_back<TP_connection_generic> >
connection_queue_t;

const int NQUEUES=2; /* We have high and low priority queues*/
180 | |
/*
  A group of worker threads sharing one poll descriptor and work queues.
  Aligned to a cache line to avoid false sharing between adjacent groups.
*/
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
{
  mysql_mutex_t mutex;                /* protects the queues, lists and counters below */
  connection_queue_t queues[NQUEUES]; /* indexed by TP_PRIORITY_HIGH/LOW */
  worker_list_t waiting_threads;      /* idle workers, candidates for wake_thread() */
  worker_thread_t *listener;          /* current listener thread, NULL if none */
  pthread_attr_t *pthread_attr;       /* attributes for newly created workers */
  TP_file_handle pollfd;              /* epoll/kqueue/event-port/IOCP descriptor */
  int thread_count;
  int active_thread_count;
  int connection_count;
  /* Stats for the deadlock detection timer routine.*/
  int io_event_count;
  int queue_event_count;
  ulonglong last_thread_creation_time; /* for thread-creation throttling */
  int shutdown_pipe[2];
  bool shutdown;
  bool stalled;                       /* set by timer when queue stops draining */
};

/* Array of threadpool_max_size groups; see timer_thread() iteration. */
static thread_group_t *all_groups;
static uint group_count;
static int32 shutdown_group_count;

/**
 Used for printing "pool blocked" message, see
 print_pool_blocked_message();
*/
static ulonglong pool_block_start;

/* Global timer for all groups */
struct pool_timer_t
{
  mysql_mutex_t mutex;
  mysql_cond_t cond;
  /* Cached clock, refreshed once per tick by the timer thread. */
  volatile uint64 current_microtime;
  /* Earliest client wait_timeout deadline; recalculated in timeout_check(). */
  volatile uint64 next_timeout_check;
  int tick_interval;  /* timer period, milliseconds */
  bool shutdown;
  pthread_t timer_thread_id;
};

static pool_timer_t pool_timer;

/* Forward declarations of the internal machinery defined below. */
static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
static int  wake_thread(thread_group_t *thread_group);
static int  wake_or_create_thread(thread_group_t *thread_group);
static int  create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);
234 | |
235 | /** |
236 | Asynchronous network IO. |
237 | |
238 | We use native edge-triggered network IO multiplexing facility. |
239 | This maps to different APIs on different Unixes. |
240 | |
241 | Supported are currently Linux with epoll, Solaris with event ports, |
242 | OSX and BSD with kevent, Windows with IOCP. All those API's are used with one-shot flags |
243 | (the event is signalled once client has written something into the socket, |
244 | then socket is removed from the "poll-set" until the command is finished, |
245 | and we need to re-arm/re-register socket) |
246 | |
247 | No implementation for poll/select is currently provided. |
248 | |
249 | The API closely resembles all of the above mentioned platform APIs |
250 | and consists of following functions. |
251 | |
252 | - io_poll_create() |
253 | Creates an io_poll descriptor |
254 | On Linux: epoll_create() |
255 | |
256 | - io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt) |
257 | Associate file descriptor with io poll descriptor |
258 | On Linux : epoll_ctl(..EPOLL_CTL_ADD)) |
259 | |
260 | - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) |
261 | Associate file descriptor with io poll descriptor |
262 | On Linux: epoll_ctl(..EPOLL_CTL_DEL) |
263 | |
264 | |
265 | - io_poll_start_read(int poll_fd,int fd, void *data, void *opt) |
266 | The same as io_poll_associate_fd(), but cannot be used before |
267 | io_poll_associate_fd() was called. |
268 | On Linux : epoll_ctl(..EPOLL_CTL_MOD) |
269 | |
270 | - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents, |
271 | int timeout_ms) |
272 | |
273 | wait until one or more descriptors added with io_poll_associate_fd() |
274 | or io_poll_start_read() becomes readable. Data associated with |
275 | descriptors can be retrieved from native_events array, using |
276 | native_event_get_userdata() function. |
277 | |
278 | |
279 | On Linux: epoll_wait() |
280 | */ |
281 | |
282 | #if defined (__linux__) |
283 | #ifndef EPOLLRDHUP |
284 | /* Early 2.6 kernel did not have EPOLLRDHUP */ |
285 | #define EPOLLRDHUP 0 |
286 | #endif |
/* Create an epoll descriptor. The size argument is ignored by modern kernels. */
static TP_file_handle io_poll_create()
{
  return epoll_create(1);
}
291 | |
292 | |
293 | int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*) |
294 | { |
295 | struct epoll_event ev; |
296 | ev.data.u64= 0; /* Keep valgrind happy */ |
297 | ev.data.ptr= data; |
298 | ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT; |
299 | return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev); |
300 | } |
301 | |
302 | |
303 | |
304 | int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *) |
305 | { |
306 | struct epoll_event ev; |
307 | ev.data.u64= 0; /* Keep valgrind happy */ |
308 | ev.data.ptr= data; |
309 | ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT; |
310 | return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev); |
311 | } |
312 | |
313 | int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) |
314 | { |
315 | struct epoll_event ev; |
316 | return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev); |
317 | } |
318 | |
319 | |
320 | /* |
321 | Wrapper around epoll_wait. |
322 | NOTE - in case of EINTR, it restarts with original timeout. Since we use |
323 | either infinite or 0 timeouts, this is not critical |
324 | */ |
325 | int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents, |
326 | int timeout_ms) |
327 | { |
328 | int ret; |
329 | do |
330 | { |
331 | ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms); |
332 | } |
333 | while(ret == -1 && errno == EINTR); |
334 | return ret; |
335 | } |
336 | |
337 | |
/* Retrieve the pointer stored into ev.data.ptr at association time. */
static void *native_event_get_userdata(native_event *event)
{
  return event->data.ptr;
}
342 | |
343 | #elif defined(HAVE_KQUEUE) |
344 | |
345 | /* |
346 | NetBSD is incompatible with other BSDs , last parameter in EV_SET macro |
347 | (udata, user data) needs to be intptr_t, whereas it needs to be void* |
348 | everywhere else. |
349 | */ |
350 | |
351 | #ifdef __NetBSD__ |
352 | #define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, (intptr_t)g) |
353 | #else |
354 | #define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, g) |
355 | #endif |
356 | |
357 | |
/* Create a kqueue descriptor. */
TP_file_handle io_poll_create()
{
  return kqueue();
}
362 | |
/*
  Arm a one-shot read event for fd; `data` comes back via the event's
  udata field (see native_event_get_userdata()).
*/
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
  struct kevent ke;
  MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
            0, 0, data);
  return kevent(pollfd, &ke, 1, 0, 0, 0);
}
370 | |
371 | |
/*
  First-time association of fd with the kqueue.
  With kqueue, EV_ADD both registers and arms the event, so this is
  exactly the same operation as io_poll_start_read().
*/
int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
  return io_poll_start_read(pollfd, fd, data, 0);
}
379 | |
380 | |
/* Remove fd's read filter from the kqueue. */
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  struct kevent ke;
  MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
  return kevent(pollfd, &ke, 1, 0, 0, 0);
}
387 | |
388 | |
/*
  Wait for kqueue events, restarting after EINTR.
  A negative timeout_ms means "wait forever" (NULL timespec).
*/
int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms)
{
  struct timespec ts;
  struct timespec *pts= NULL;
  if (timeout_ms >= 0)
  {
    ts.tv_sec=  timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
    pts= &ts;
  }
  int n;
  do
  {
    n= kevent(pollfd, 0, 0, events, maxevents, pts);
  }
  while (n == -1 && errno == EINTR);
  return n;
}
406 | |
/* Retrieve the udata pointer stored by MY_EV_SET (cast for NetBSD's intptr_t). */
static void* native_event_get_userdata(native_event *event)
{
  return (void *)event->udata;
}
411 | |
412 | #elif defined (__sun) |
413 | |
/* Create a Solaris event port. */
static TP_file_handle io_poll_create()
{
  return port_create();
}

/*
  Arm (or re-arm) a read event for fd; `data` comes back in
  portev_user. Event ports dissociate the fd automatically when an
  event is retrieved, so each read must be re-armed.
*/
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
  return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}

/* On Solaris, first-time association is the same call as re-arming. */
static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
  return io_poll_start_read(pollfd, fd, data, 0);
}

/* Remove fd from the port. */
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
}

/*
  Wait for events; returns the number of events retrieved.
  nget is in-out: we request at least 1 event, and on return it holds
  the number of events actually stored into `events`.
  NOTE(review): port_getn()'s own return value is only used for the
  EINTR retry; we return nget even when the call failed or timed out --
  confirm that nget is reliably 0 in those cases.
*/
int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
  struct timespec ts;
  int ret;
  uint_t nget= 1;
  if (timeout_ms >= 0)
  {
    ts.tv_sec= timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
  }
  do
  {
    ret= port_getn(pollfd, events, maxevents, &nget,
                   (timeout_ms >= 0)?&ts:NULL);
  }
  while (ret == -1 && errno == EINTR);
  DBUG_ASSERT(nget < INT_MAX);
  return (int)nget;
}

/* Retrieve the user data stored by port_associate(). */
static void* native_event_get_userdata(native_event *event)
{
  return event->portev_user;
}
458 | |
459 | #elif defined(HAVE_IOCP) |
460 | |
461 | |
/* Create a new, not yet bound IO completion port. */
static TP_file_handle io_poll_create()
{
  return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
}


/*
  Post an asynchronous zero-byte read on the connection.
  The read completes (delivering an IOCP event) as soon as data arrives;
  no bytes are consumed, which is why the shared dummy buffer `c` is safe
  to use for all connections.
  @return 0 on success (immediate completion or pending), 1 on error.
*/
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
  static char c;
  TP_connection_generic *con= (TP_connection_generic *)opt;
  OVERLAPPED *overlapped= &con->overlapped;
  if (con->vio_type == VIO_TYPE_NAMEDPIPE)
  {
    if (ReadFile(fd, &c, 0, NULL, overlapped))
      return 0;
  }
  else
  {
    WSABUF buf;
    buf.buf= &c;
    buf.len= 0;
    DWORD flags=0;

    if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
      return 0;
  }

  /* An asynchronous completion still pending is the normal success path. */
  if (GetLastError() == ERROR_IO_PENDING)
    return 0;

  return 1;
}


/*
  Bind the handle to the completion port. `data` becomes the completion
  key, retrieved later via native_event_get_userdata().
*/
static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
{
  HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0);
  if (!h)
    return -1;
  return io_poll_start_read(pollfd,fd, 0, opt);
}


int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  /* Not possible to unbind/rebind file descriptor in IOCP. */
  return 0;
}


/* Dequeue up to maxevents completed IOs; returns their count, or -1. */
int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
  ULONG n;
  BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
             maxevents, &n, timeout_ms, FALSE);

  return ok ? (int)n : -1;
}


/* The completion key is the pointer passed to io_poll_associate_fd(). */
static void* native_event_get_userdata(native_event *event)
{
  return (void *)event->lpCompletionKey;
}
526 | #endif |
527 | |
528 | |
529 | /* Dequeue element from a workqueue */ |
530 | |
531 | static TP_connection_generic *queue_get(thread_group_t *thread_group) |
532 | { |
533 | DBUG_ENTER("queue_get" ); |
534 | thread_group->queue_event_count++; |
535 | TP_connection_generic *c; |
536 | for (int i=0; i < NQUEUES;i++) |
537 | { |
538 | c= thread_group->queues[i].pop_front(); |
539 | if (c) |
540 | DBUG_RETURN(c); |
541 | } |
542 | DBUG_RETURN(0); |
543 | } |
544 | |
545 | static bool is_queue_empty(thread_group_t *thread_group) |
546 | { |
547 | for (int i=0; i < NQUEUES; i++) |
548 | { |
549 | if (!thread_group->queues[i].is_empty()) |
550 | return false; |
551 | } |
552 | return true; |
553 | } |
554 | |
555 | |
556 | static void queue_init(thread_group_t *thread_group) |
557 | { |
558 | for (int i=0; i < NQUEUES; i++) |
559 | { |
560 | thread_group->queues[i].empty(); |
561 | } |
562 | } |
563 | |
564 | static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt) |
565 | { |
566 | ulonglong now= pool_timer.current_microtime; |
567 | for(int i=0; i < cnt; i++) |
568 | { |
569 | TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]); |
570 | c->dequeue_time= now; |
571 | thread_group->queues[c->priority].push_back(c); |
572 | } |
573 | } |
574 | |
575 | /* |
576 | Handle wait timeout : |
577 | Find connections that have been idle for too long and kill them. |
578 | Also, recalculate time when next timeout check should run. |
579 | */ |
580 | |
static void timeout_check(pool_timer_t *timer)
{
  DBUG_ENTER("timeout_check" );

  /* The global THD list is protected by LOCK_thread_count. */
  mysql_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);

  /* Reset next timeout check, it will be recalculated in the loop below */
  my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);

  THD *thd;
  while ((thd=it++))
  {
    /* Only connections blocked in network read/write can idle-time-out. */
    if (thd->net.reading_or_writing != 1)
      continue;

    TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
    if (!connection)
    {
      /*
        Connection does not have scheduler data. This happens for example
        if THD belongs to a different scheduler, that is listening to extra_port.
      */
      continue;
    }

    if(connection->abs_wait_timeout < timer->current_microtime)
    {
      /* Deadline has passed - kill the idle connection. */
      tp_timeout_handler(connection);
    }
    else
    {
      /* Still within its timeout; make sure the timer checks again by then. */
      set_next_timeout_check(connection->abs_wait_timeout);
    }
  }
  mysql_mutex_unlock(&LOCK_thread_count);
  DBUG_VOID_RETURN;
}
619 | |
620 | |
621 | /* |
622 | Timer thread. |
623 | |
624 | Periodically, check if one of the thread groups is stalled. Stalls happen if |
625 | events are not being dequeued from the queue, or from the network, Primary |
626 | reason for stall can be a lengthy executing non-blocking request. It could |
627 | also happen that thread is waiting but wait_begin/wait_end is forgotten by |
628 | storage engine. Timer thread will create a new thread in group in case of |
629 | a stall. |
630 | |
631 | Besides checking for stalls, timer thread is also responsible for terminating |
632 | clients that have been idle for longer than wait_timeout seconds. |
633 | |
634 | TODO: Let the timer sleep for long time if there is no work to be done. |
635 | Currently it wakes up rather often on and idle server. |
636 | */ |
637 | |
static void* timer_thread(void *param)
{
  uint i;
  pool_timer_t* timer=(pool_timer_t *)param;

  my_thread_init();
  DBUG_ENTER("timer_thread" );
  timer->next_timeout_check= ULONGLONG_MAX;
  timer->current_microtime= microsecond_interval_timer();

  for(;;)
  {
    struct timespec ts;
    int err;

    /* Sleep for one tick, or until stop_timer() signals shutdown. */
    set_timespec_nsec(ts,timer->tick_interval*1000000);
    mysql_mutex_lock(&timer->mutex);
    err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
    if (timer->shutdown)
    {
      mysql_mutex_unlock(&timer->mutex);
      break;
    }
    if (err == ETIMEDOUT)
    {
      /* Refresh the cached clock once per tick. */
      timer->current_microtime= microsecond_interval_timer();

      /* Check stalls in thread groups */
      for (i= 0; i < threadpool_max_size; i++)
      {
        if(all_groups[i].connection_count)
          check_stall(&all_groups[i]);
      }

      /* Check if any client exceeded wait_timeout */
      if (timer->next_timeout_check <= timer->current_microtime)
        timeout_check(timer);
    }
    mysql_mutex_unlock(&timer->mutex);
  }

  /*
    The timer thread destroys its own mutex after shutdown (stop_timer()
    joined us, so no other thread can be using it).
    NOTE(review): timer->cond is never destroyed, and the function exits
    with `return` rather than DBUG_RETURN despite DBUG_ENTER above --
    confirm both are intentional.
  */
  mysql_mutex_destroy(&timer->mutex);
  my_thread_end();
  return NULL;
}
683 | |
684 | |
685 | |
/*
  Examine one thread group for stalls and resolve them.
  Called by the timer thread each tick; takes the group mutex.
*/
void check_stall(thread_group_t *thread_group)
{
  mysql_mutex_lock(&thread_group->mutex);

  /*
    Bump priority for the low priority connections that spent too much
    time in low prio queue.
    Entries are appended in timestamp order, so we can stop at the
    first one that has not yet exceeded the kickup limit.
  */
  TP_connection_generic *c;
  for (;;)
  {
    c= thread_group->queues[TP_PRIORITY_LOW].front();
    if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer)
    {
      thread_group->queues[TP_PRIORITY_LOW].remove(c);
      thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
    }
    else
      break;
  }

  /*
    Check if listener is present. If not, check whether any IO
    events were dequeued since last time. If not, this means
    listener is either in tight loop or thd_wait_begin()
    was forgotten. Create a new worker(it will make itself listener).
  */
  if (!thread_group->listener && !thread_group->io_event_count)
  {
    wake_or_create_thread(thread_group);
    mysql_mutex_unlock(&thread_group->mutex);
    return;
  }

  /* Reset io event count */
  thread_group->io_event_count= 0;

  /*
    Check whether requests from the workqueue are being dequeued.

    The stall detection and resolution works as follows:

    1. There is a counter thread_group->queue_event_count for the number of
       events removed from the queue. Timer resets the counter to 0 on each run.
    2. Timer determines stall if this counter remains 0 since last check
       and the queue is not empty.
    3. Once timer determined a stall it sets thread_group->stalled flag and
       wakes and idle worker (or creates a new one, subject to throttling).
    4. The stalled flag is reset, when an event is dequeued.

    Q : Will this handling lead to an unbound growth of threads, if queue
    stalls permanently?
    A : No. If queue stalls permanently, it is an indication for many very long
    simultaneous queries. The maximum number of simultanoues queries is
    max_connections, further we have threadpool_max_threads limit, upon which no
    worker threads are created. So in case there is a flood of very long
    queries, threadpool would slowly approach thread-per-connection behavior.
    NOTE:
    If long queries never wait, creation of the new threads is done by timer,
    so it is slower than in real thread-per-connection. However if long queries
    do wait and indicate that via thd_wait_begin/end callbacks, thread creation
    will be faster.
  */
  if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
  {
    thread_group->stalled= true;
    wake_or_create_thread(thread_group);
  }

  /* Reset queue event count */
  thread_group->queue_event_count= 0;

  mysql_mutex_unlock(&thread_group->mutex);
}
760 | |
761 | |
/*
  Initialize the timer's mutex/condition and start the timer thread.
  NOTE(review): the return value of mysql_thread_create() is ignored;
  if thread creation failed, stop_timer() would join an invalid thread
  id -- confirm whether startup can tolerate this.
*/
static void start_timer(pool_timer_t* timer)
{
  DBUG_ENTER("start_timer" );
  mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
  mysql_cond_init(key_timer_cond, &timer->cond, NULL);
  timer->shutdown = false;
  mysql_thread_create(key_timer_thread, &timer->timer_thread_id, NULL,
                      timer_thread, timer);
  DBUG_VOID_RETURN;
}
772 | |
773 | |
/*
  Stop the timer thread and wait for it to exit.
  The shutdown flag is set under the timer mutex so the timer thread
  cannot miss the signal between waking and checking the flag.
*/
static void stop_timer(pool_timer_t *timer)
{
  DBUG_ENTER("stop_timer" );
  mysql_mutex_lock(&timer->mutex);
  timer->shutdown = true;
  mysql_cond_signal(&timer->cond);
  mysql_mutex_unlock(&timer->mutex);
  pthread_join(timer->timer_thread_id, NULL);
  DBUG_VOID_RETURN;
}
784 | |
785 | |
786 | /** |
787 | Poll for socket events and distribute them to worker threads |
788 | In many case current thread will handle single event itself. |
789 | |
790 | @return a ready connection, or NULL on shutdown |
791 | */ |
static TP_connection_generic * listener(worker_thread_t *current_thread,
                               thread_group_t *thread_group)
{
  DBUG_ENTER("listener" );
  TP_connection_generic *retval= NULL;

  for(;;)
  {
    native_event ev[MAX_EVENTS];
    int cnt;

    if (thread_group->shutdown)
      break;

    /* Block indefinitely until network events arrive (or shutdown). */
    cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);

    if (cnt <=0)
    {
      /* With an infinite timeout, <= 0 can only mean shutdown. */
      DBUG_ASSERT(thread_group->shutdown);
      break;
    }

    mysql_mutex_lock(&thread_group->mutex);

    if (thread_group->shutdown)
    {
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    /* Account IO events for the timer's stall detection (check_stall()). */
    thread_group->io_event_count += cnt;

    /*
      We got some network events and need to make decisions : whether
      listener should handle events itself and whether or not to wake worker
      threads so they can handle events.

      Q1 : Should listener handle an event itself, or put all events into
      queue  and let workers handle the events?

      Solution :
      Generally, listener that handles events itself is preferable. We do not
      want listener thread to change its state from waiting  to running too
      often, Since listener has just woken from poll, it better uses its time
      slice and does some work. Besides, not handling events means they go to
      the  queue, and often another worker must wake up to handle the
      event. This is not good, as we want to avoid wakeups.

      The downside of listener that also handles queries is that we can
      potentially leave thread group  for long time not picking the new
      network events. It is not  a major problem, because this stall will be
      detected  sooner or later by  the timer thread. Still, relying on timer
      is not always good, because it may "tick" too slow (large timer_interval)

      We use following strategy to solve this problem - if queue was not empty
      we suspect flood of network events and listener stays, Otherwise, it
      handles a query.


      Q2: If queue is not empty, how many workers to wake?

      Solution:
      We generally try to keep one thread per group active (threads handling
      queries   are considered active, unless they stuck in inside some "wait")
      Thus, we will wake only one worker, and only if there is not active
      threads currently,and listener is not going to handle a query. When we
      don't  wake, we hope that currently active  threads will finish fast and
      handle the queue. If this does not happen, timer thread will detect stall
      and wake a worker.

      NOTE: Currently nothing is done to detect or prevent long queuing times.
      A solution for the future would be to give up "one active thread per
      group" principle, if events stay  in the queue for too long, and just wake
      more workers.
    */

    bool listener_picks_event=is_queue_empty(thread_group);
    queue_put(thread_group, ev, cnt);
    if (listener_picks_event)
    {
      /* Handle the first event. */
      retval= queue_get(thread_group);
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    if(thread_group->active_thread_count==0)
    {
      /* We added some work items to queue, now wake a worker. */
      if(wake_thread(thread_group))
      {
        /*
          Wake failed, hence groups has no idle threads. Now check if there are
          any threads in the group except listener.
        */
        if(thread_group->thread_count == 1)
        {
           /*
             Currently there is no worker thread in the group, as indicated by
             thread_count == 1 (this means listener is the only one thread in
             the group).
             The queue is not empty, and listener is not going to handle
             events. In order to drain the queue,  we create a worker here.
             Alternatively, we could just rely on timer to detect stall, and
             create thread, but waiting for timer would be an inefficient and
             pointless delay.
           */
           create_worker(thread_group);
        }
      }
    }
    mysql_mutex_unlock(&thread_group->mutex);
  }

  DBUG_RETURN(retval);
}
908 | |
909 | /** |
910 | Adjust thread counters in group or global |
911 | whenever thread is created or is about to exit |
912 | |
913 | @param thread_group |
914 | @param count - 1, when new thread is created |
915 | -1, when thread is about to exit |
916 | */ |
917 | |
918 | static void add_thread_count(thread_group_t *thread_group, int32 count) |
919 | { |
920 | thread_group->thread_count += count; |
921 | /* worker starts out and end in "active" state */ |
922 | thread_group->active_thread_count += count; |
923 | my_atomic_add32(&tp_stats.num_worker_threads, count); |
924 | } |
925 | |
926 | |
927 | /** |
928 | Creates a new worker thread. |
929 | thread_mutex must be held when calling this function |
930 | |
931 | NOTE: in rare cases, the number of threads can exceed |
932 | threadpool_max_threads, because we need at least 2 threads |
933 | per group to prevent deadlocks (one listener + one worker) |
934 | */ |
935 | |
static int create_worker(thread_group_t *thread_group)
{
  pthread_t thread_id;
  bool max_threads_reached= false;
  int err;

  DBUG_ENTER("create_worker" );
  /*
    Enforce threadpool_max_threads, but always permit at least 2 threads
    per group (listener + one worker) to prevent deadlock.
  */
  if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
     && thread_group->thread_count >= 2)
  {
    err= 1;
    max_threads_reached= true;
    goto end;
  }


  err= mysql_thread_create(key_worker_thread, &thread_id,
         thread_group->pthread_attr, worker_main, thread_group);
  if (!err)
  {
    /* Record creation time for the creation-throttling logic. */
    thread_group->last_thread_creation_time=microsecond_interval_timer();
    statistic_increment(thread_created,&LOCK_status);
    add_thread_count(thread_group, 1);
  }
  else
  {
    /* Thread creation failed; propagate the OS error. */
    my_errno= errno;
  }

end:
  if (err)
    print_pool_blocked_message(max_threads_reached);
  else
    pool_block_start= 0; /* Reset pool blocked timer, if it was set */

  DBUG_RETURN(err);
}
973 | |
974 | |
975 | /** |
976 | Calculate microseconds throttling delay for thread creation. |
977 | |
978 | The value depends on how many threads are already in the group: |
979 | small number of threads means no delay, the more threads the larger |
980 | the delay. |
981 | |
982 | The actual values were not calculated using any scientific methods. |
983 | They just look right, and behave well in practice. |
984 | |
985 | TODO: Should throttling depend on thread_pool_stall_limit? |
986 | */ |
987 | static ulonglong microsecond_throttling_interval(thread_group_t *thread_group) |
988 | { |
989 | int count= thread_group->thread_count; |
990 | |
991 | if (count < 4) |
992 | return 0; |
993 | |
994 | if (count < 8) |
995 | return 50*1000; |
996 | |
997 | if(count < 16) |
998 | return 100*1000; |
999 | |
1000 | return 200*1000; |
1001 | } |
1002 | |
1003 | |
1004 | /** |
1005 | Wakes a worker thread, or creates a new one. |
1006 | |
1007 | Worker creation is throttled, so we avoid too many threads |
1008 | to be created during the short time. |
1009 | */ |
1010 | static int wake_or_create_thread(thread_group_t *thread_group) |
1011 | { |
1012 | DBUG_ENTER("wake_or_create_thread" ); |
1013 | |
1014 | if (thread_group->shutdown) |
1015 | DBUG_RETURN(0); |
1016 | |
1017 | if (wake_thread(thread_group) == 0) |
1018 | DBUG_RETURN(0); |
1019 | |
1020 | if (thread_group->thread_count > thread_group->connection_count) |
1021 | DBUG_RETURN(-1); |
1022 | |
1023 | |
1024 | if (thread_group->active_thread_count == 0) |
1025 | { |
1026 | /* |
1027 | We're better off creating a new thread here with no delay, either there |
1028 | are no workers at all, or they all are all blocking and there was no |
1029 | idle thread to wakeup. Smells like a potential deadlock or very slowly |
1030 | executing requests, e.g sleeps or user locks. |
1031 | */ |
1032 | DBUG_RETURN(create_worker(thread_group)); |
1033 | } |
1034 | |
1035 | ulonglong now = microsecond_interval_timer(); |
1036 | ulonglong time_since_last_thread_created = |
1037 | (now - thread_group->last_thread_creation_time); |
1038 | |
1039 | /* Throttle thread creation. */ |
1040 | if (time_since_last_thread_created > |
1041 | microsecond_throttling_interval(thread_group)) |
1042 | { |
1043 | DBUG_RETURN(create_worker(thread_group)); |
1044 | } |
1045 | |
1046 | DBUG_RETURN(-1); |
1047 | } |
1048 | |
1049 | |
1050 | |
1051 | int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) |
1052 | { |
1053 | DBUG_ENTER("thread_group_init" ); |
1054 | thread_group->pthread_attr = thread_attr; |
1055 | mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL); |
1056 | thread_group->pollfd= INVALID_HANDLE_VALUE; |
1057 | thread_group->shutdown_pipe[0]= -1; |
1058 | thread_group->shutdown_pipe[1]= -1; |
1059 | queue_init(thread_group); |
1060 | DBUG_RETURN(0); |
1061 | } |
1062 | |
1063 | |
/**
  Destroy a thread group: release its mutex, poll descriptor and
  shutdown pipe. The last destroyed group also frees the all_groups
  array (tracked via the atomic shutdown_group_count counter).
*/
void thread_group_destroy(thread_group_t *thread_group)
{
  mysql_mutex_destroy(&thread_group->mutex);
  if (thread_group->pollfd != INVALID_HANDLE_VALUE)
  {
    io_poll_close(thread_group->pollfd);
    thread_group->pollfd= INVALID_HANDLE_VALUE;
  }
#ifndef HAVE_IOCP
  /* Close both ends of the self-pipe used to wake the listener. */
  for(int i=0; i < 2; i++)
  {
    if(thread_group->shutdown_pipe[i] != -1)
    {
      close(thread_group->shutdown_pipe[i]);
      thread_group->shutdown_pipe[i]= -1;
    }
  }
#endif

  /* NOTE(review): relies on my_atomic_add32 returning the pre-decrement
     value, so "== 1" means this was the last group. */
  if (my_atomic_add32(&shutdown_group_count, -1) == 1)
    my_free(all_groups);
}
1086 | |
1087 | /** |
1088 | Wake sleeping thread from waiting list |
1089 | */ |
1090 | |
1091 | static int wake_thread(thread_group_t *thread_group) |
1092 | { |
1093 | DBUG_ENTER("wake_thread" ); |
1094 | worker_thread_t *thread = thread_group->waiting_threads.front(); |
1095 | if(thread) |
1096 | { |
1097 | thread->woken= true; |
1098 | thread_group->waiting_threads.remove(thread); |
1099 | mysql_cond_signal(&thread->cond); |
1100 | DBUG_RETURN(0); |
1101 | } |
1102 | DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ |
1103 | } |
1104 | |
1105 | /* |
1106 | Wake listener thread (during shutdown) |
1107 | Self-pipe trick is used in most cases,except IOCP. |
1108 | */ |
1109 | static int wake_listener(thread_group_t *thread_group) |
1110 | { |
1111 | #ifndef HAVE_IOCP |
1112 | if (pipe(thread_group->shutdown_pipe)) |
1113 | { |
1114 | return -1; |
1115 | } |
1116 | |
1117 | /* Wake listener */ |
1118 | if (io_poll_associate_fd(thread_group->pollfd, |
1119 | thread_group->shutdown_pipe[0], NULL, NULL)) |
1120 | { |
1121 | return -1; |
1122 | } |
1123 | char c= 0; |
1124 | if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) |
1125 | return -1; |
1126 | #else |
1127 | PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0); |
1128 | #endif |
1129 | return 0; |
1130 | } |
1131 | /** |
1132 | Initiate shutdown for thread group. |
1133 | |
1134 | The shutdown is asynchronous, we only care to wake all threads in here, so |
1135 | they can finish. We do not wait here until threads terminate. Final cleanup |
1136 | of the group (thread_group_destroy) will be done by the last exiting threads. |
1137 | */ |
1138 | |
1139 | static void thread_group_close(thread_group_t *thread_group) |
1140 | { |
1141 | DBUG_ENTER("thread_group_close" ); |
1142 | |
1143 | mysql_mutex_lock(&thread_group->mutex); |
1144 | if (thread_group->thread_count == 0) |
1145 | { |
1146 | mysql_mutex_unlock(&thread_group->mutex); |
1147 | thread_group_destroy(thread_group); |
1148 | DBUG_VOID_RETURN; |
1149 | } |
1150 | |
1151 | thread_group->shutdown= true; |
1152 | thread_group->listener= NULL; |
1153 | |
1154 | wake_listener(thread_group); |
1155 | |
1156 | /* Wake all workers. */ |
1157 | while(wake_thread(thread_group) == 0) |
1158 | { |
1159 | } |
1160 | |
1161 | mysql_mutex_unlock(&thread_group->mutex); |
1162 | |
1163 | DBUG_VOID_RETURN; |
1164 | } |
1165 | |
1166 | |
1167 | /* |
1168 | Add work to the queue. Maybe wake a worker if they all sleep. |
1169 | |
1170 | Currently, this function is only used when new connections need to |
1171 | perform login (this is done in worker threads). |
1172 | |
1173 | */ |
1174 | |
1175 | static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection) |
1176 | { |
1177 | DBUG_ENTER("queue_put" ); |
1178 | |
1179 | connection->dequeue_time= pool_timer.current_microtime; |
1180 | thread_group->queues[connection->priority].push_back(connection); |
1181 | |
1182 | if (thread_group->active_thread_count == 0) |
1183 | wake_or_create_thread(thread_group); |
1184 | |
1185 | DBUG_VOID_RETURN; |
1186 | } |
1187 | |
1188 | |
1189 | /* |
1190 | Prevent too many threads executing at the same time,if the workload is |
1191 | not CPU bound. |
1192 | */ |
1193 | |
1194 | static bool too_many_threads(thread_group_t *thread_group) |
1195 | { |
1196 | return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe |
1197 | && !thread_group->stalled); |
1198 | } |
1199 | |
1200 | |
1201 | /** |
1202 | Retrieve a connection with pending event. |
1203 | |
1204 | Pending event in our case means that there is either a pending login request |
1205 | (if connection is not yet logged in), or there are unread bytes on the socket. |
1206 | |
1207 | If there are no pending events currently, thread will wait. |
1208 | If timeout specified in abstime parameter passes, the function returns NULL. |
1209 | |
1210 | @param current_thread - current worker thread |
1211 | @param thread_group - current thread group |
1212 | @param abstime - absolute wait timeout |
1213 | |
1214 | @return |
1215 | connection with pending event. |
1216 | NULL is returned if timeout has expired,or on shutdown. |
1217 | */ |
1218 | |
1219 | TP_connection_generic *get_event(worker_thread_t *current_thread, |
1220 | thread_group_t *thread_group, struct timespec *abstime) |
1221 | { |
1222 | DBUG_ENTER("get_event" ); |
1223 | TP_connection_generic *connection = NULL; |
1224 | |
1225 | |
1226 | mysql_mutex_lock(&thread_group->mutex); |
1227 | DBUG_ASSERT(thread_group->active_thread_count >= 0); |
1228 | |
1229 | for(;;) |
1230 | { |
1231 | int err=0; |
1232 | bool oversubscribed = too_many_threads(thread_group); |
1233 | if (thread_group->shutdown) |
1234 | break; |
1235 | |
1236 | /* Check if queue is not empty */ |
1237 | if (!oversubscribed) |
1238 | { |
1239 | connection = queue_get(thread_group); |
1240 | if(connection) |
1241 | break; |
1242 | } |
1243 | |
1244 | /* If there is currently no listener in the group, become one. */ |
1245 | if(!thread_group->listener) |
1246 | { |
1247 | thread_group->listener= current_thread; |
1248 | thread_group->active_thread_count--; |
1249 | mysql_mutex_unlock(&thread_group->mutex); |
1250 | |
1251 | connection = listener(current_thread, thread_group); |
1252 | |
1253 | mysql_mutex_lock(&thread_group->mutex); |
1254 | thread_group->active_thread_count++; |
1255 | /* There is no listener anymore, it just returned. */ |
1256 | thread_group->listener= NULL; |
1257 | break; |
1258 | } |
1259 | |
1260 | |
1261 | /* |
1262 | Last thing we try before going to sleep is to |
1263 | non-blocking event poll, i.e with timeout = 0. |
1264 | If this returns events, pick one |
1265 | */ |
1266 | if (!oversubscribed) |
1267 | { |
1268 | |
1269 | native_event ev[MAX_EVENTS]; |
1270 | int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0); |
1271 | if (cnt > 0) |
1272 | { |
1273 | queue_put(thread_group, ev, cnt); |
1274 | connection= queue_get(thread_group); |
1275 | break; |
1276 | } |
1277 | } |
1278 | |
1279 | |
1280 | /* And now, finally sleep */ |
1281 | current_thread->woken = false; /* wake() sets this to true */ |
1282 | |
1283 | /* |
1284 | Add current thread to the head of the waiting list and wait. |
1285 | It is important to add thread to the head rather than tail |
1286 | as it ensures LIFO wakeup order (hot caches, working inactivity timeout) |
1287 | */ |
1288 | thread_group->waiting_threads.push_front(current_thread); |
1289 | |
1290 | thread_group->active_thread_count--; |
1291 | if (abstime) |
1292 | { |
1293 | err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex, |
1294 | abstime); |
1295 | } |
1296 | else |
1297 | { |
1298 | err = mysql_cond_wait(¤t_thread->cond, &thread_group->mutex); |
1299 | } |
1300 | thread_group->active_thread_count++; |
1301 | |
1302 | if (!current_thread->woken) |
1303 | { |
1304 | /* |
1305 | Thread was not signalled by wake(), it might be a spurious wakeup or |
1306 | a timeout. Anyhow, we need to remove ourselves from the list now. |
1307 | If thread was explicitly woken, than caller removed us from the list. |
1308 | */ |
1309 | thread_group->waiting_threads.remove(current_thread); |
1310 | } |
1311 | |
1312 | if (err) |
1313 | break; |
1314 | } |
1315 | |
1316 | thread_group->stalled= false; |
1317 | mysql_mutex_unlock(&thread_group->mutex); |
1318 | |
1319 | DBUG_RETURN(connection); |
1320 | } |
1321 | |
1322 | |
1323 | |
1324 | /** |
1325 | Tells the pool that worker starts waiting on IO, lock, condition, |
1326 | sleep() or similar. |
1327 | */ |
1328 | |
1329 | void wait_begin(thread_group_t *thread_group) |
1330 | { |
1331 | DBUG_ENTER("wait_begin" ); |
1332 | mysql_mutex_lock(&thread_group->mutex); |
1333 | thread_group->active_thread_count--; |
1334 | |
1335 | DBUG_ASSERT(thread_group->active_thread_count >=0); |
1336 | DBUG_ASSERT(thread_group->connection_count > 0); |
1337 | |
1338 | if ((thread_group->active_thread_count == 0) && |
1339 | (is_queue_empty(thread_group) || !thread_group->listener)) |
1340 | { |
1341 | /* |
1342 | Group might stall while this thread waits, thus wake |
1343 | or create a worker to prevent stall. |
1344 | */ |
1345 | wake_or_create_thread(thread_group); |
1346 | } |
1347 | |
1348 | mysql_mutex_unlock(&thread_group->mutex); |
1349 | DBUG_VOID_RETURN; |
1350 | } |
1351 | |
1352 | /** |
1353 | Tells the pool has finished waiting. |
1354 | */ |
1355 | |
1356 | void wait_end(thread_group_t *thread_group) |
1357 | { |
1358 | DBUG_ENTER("wait_end" ); |
1359 | mysql_mutex_lock(&thread_group->mutex); |
1360 | thread_group->active_thread_count++; |
1361 | mysql_mutex_unlock(&thread_group->mutex); |
1362 | DBUG_VOID_RETURN; |
1363 | } |
1364 | |
1365 | |
1366 | |
1367 | |
1368 | TP_connection * TP_pool_generic::new_connection(CONNECT *c) |
1369 | { |
1370 | return new (std::nothrow) TP_connection_generic(c); |
1371 | } |
1372 | |
1373 | /** |
1374 | Add a new connection to thread pool.. |
1375 | */ |
1376 | |
1377 | void TP_pool_generic::add(TP_connection *c) |
1378 | { |
1379 | DBUG_ENTER("tp_add_connection" ); |
1380 | |
1381 | TP_connection_generic *connection=(TP_connection_generic *)c; |
1382 | thread_group_t *thread_group= connection->thread_group; |
1383 | /* |
1384 | Add connection to the work queue.Actual logon |
1385 | will be done by a worker thread. |
1386 | */ |
1387 | mysql_mutex_lock(&thread_group->mutex); |
1388 | queue_put(thread_group, connection); |
1389 | mysql_mutex_unlock(&thread_group->mutex); |
1390 | DBUG_VOID_RETURN; |
1391 | } |
1392 | |
1393 | |
1394 | |
1395 | /** |
1396 | MySQL scheduler callback: wait begin |
1397 | */ |
1398 | |
1399 | void TP_connection_generic::wait_begin(int type) |
1400 | { |
1401 | DBUG_ENTER("wait_begin" ); |
1402 | |
1403 | DBUG_ASSERT(!waiting); |
1404 | waiting++; |
1405 | if (waiting == 1) |
1406 | ::wait_begin(thread_group); |
1407 | DBUG_VOID_RETURN; |
1408 | } |
1409 | |
1410 | |
1411 | /** |
1412 | MySQL scheduler callback: wait end |
1413 | */ |
1414 | |
1415 | void TP_connection_generic::wait_end() |
1416 | { |
1417 | DBUG_ENTER("wait_end" ); |
1418 | DBUG_ASSERT(waiting); |
1419 | waiting--; |
1420 | if (waiting == 0) |
1421 | ::wait_end(thread_group); |
1422 | DBUG_VOID_RETURN; |
1423 | } |
1424 | |
1425 | |
/**
  Lower pool_timer.next_timeout_check to abstime, if abstime is earlier.

  Lock-free: the CAS loop retries until either our deadline is installed
  or another thread has installed an even earlier one.
*/
static void set_next_timeout_check(ulonglong abstime)
{
  DBUG_ENTER("set_next_timeout_check" );
  while(abstime < pool_timer.next_timeout_check)
  {
    longlong old= (longlong)pool_timer.next_timeout_check;
    my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
          &old, abstime);
  }
  DBUG_VOID_RETURN;
}
1437 | |
/**
  Construct a pooled connection: capture the socket (or, on Windows,
  named pipe) handle, and assign the connection to a thread group
  chosen by thread_id modulo group_count.
*/
TP_connection_generic::TP_connection_generic(CONNECT *c):
  TP_connection(c),
  thread_group(0),
  next_in_queue(0),
  prev_in_queue(0),
  abs_wait_timeout(ULONGLONG_MAX),
  bound_to_poll_descriptor(false),
  waiting(false)
#ifdef HAVE_IOCP
, overlapped()
#endif
{
  DBUG_ASSERT(c->vio);

#ifdef _WIN32
  /* Connection may come in over a named pipe rather than a socket. */
  vio_type= c->vio->type;
  fd= (vio_type == VIO_TYPE_NAMEDPIPE) ?
    c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket);
#else
  fd= mysql_socket_getfd(c->vio->mysql_socket);
#endif

  /* Assign connection to a group. */
  thread_group_t *group=
    &all_groups[c->thread_id%group_count];

  thread_group=group;

  mysql_mutex_lock(&group->mutex);
  group->connection_count++;
  mysql_mutex_unlock(&group->mutex);
}
1470 | |
/** Unregister the connection from its thread group on destruction. */
TP_connection_generic::~TP_connection_generic()
{
  mysql_mutex_lock(&thread_group->mutex);
  thread_group->connection_count--;
  mysql_mutex_unlock(&thread_group->mutex);
}
1477 | |
1478 | /** |
1479 | Set wait timeout for connection. |
1480 | */ |
1481 | |
1482 | void TP_connection_generic::set_io_timeout(int timeout_sec) |
1483 | { |
1484 | DBUG_ENTER("set_wait_timeout" ); |
1485 | /* |
1486 | Calculate wait deadline for this connection. |
1487 | Instead of using microsecond_interval_timer() which has a syscall |
1488 | overhead, use pool_timer.current_microtime and take |
1489 | into account that its value could be off by at most |
1490 | one tick interval. |
1491 | */ |
1492 | |
1493 | abs_wait_timeout= pool_timer.current_microtime + |
1494 | 1000LL*pool_timer.tick_interval + |
1495 | 1000000LL*timeout_sec; |
1496 | |
1497 | set_next_timeout_check(abs_wait_timeout); |
1498 | DBUG_VOID_RETURN; |
1499 | } |
1500 | |
1501 | |
1502 | #ifndef HAVE_IOCP |
1503 | /** |
1504 | Handle a (rare) special case,where connection needs to |
1505 | migrate to a different group because group_count has changed |
1506 | after thread_pool_size setting. |
1507 | */ |
1508 | |
1509 | static int change_group(TP_connection_generic *c, |
1510 | thread_group_t *old_group, |
1511 | thread_group_t *new_group) |
1512 | { |
1513 | int ret= 0; |
1514 | |
1515 | DBUG_ASSERT(c->thread_group == old_group); |
1516 | |
1517 | /* Remove connection from the old group. */ |
1518 | mysql_mutex_lock(&old_group->mutex); |
1519 | if (c->bound_to_poll_descriptor) |
1520 | { |
1521 | io_poll_disassociate_fd(old_group->pollfd,c->fd); |
1522 | c->bound_to_poll_descriptor= false; |
1523 | } |
1524 | c->thread_group->connection_count--; |
1525 | mysql_mutex_unlock(&old_group->mutex); |
1526 | |
1527 | /* Add connection to the new group. */ |
1528 | mysql_mutex_lock(&new_group->mutex); |
1529 | c->thread_group= new_group; |
1530 | new_group->connection_count++; |
1531 | /* Ensure that there is a listener in the new group. */ |
1532 | if (!new_group->thread_count) |
1533 | ret= create_worker(new_group); |
1534 | mysql_mutex_unlock(&new_group->mutex); |
1535 | return ret; |
1536 | } |
1537 | #endif |
1538 | |
/**
  Start waiting for I/O on this connection: bind the socket to the
  group's poll descriptor (first time) or re-arm the read watch.

  @return 0 on success, nonzero on error.
*/
int TP_connection_generic::start_io()
{
#ifndef HAVE_IOCP
  /*
    Usually, connection will stay in the same group for the entire
    connection's life. However, we do allow group_count to change at
    runtime, in which case the connection may need to migrate to another
    group, to keep the load evenly distributed between groups.

    So we recalculate in which group the connection should be, based
    on thread_id and current group count, and migrate if necessary.
  */
  thread_group_t *group =
    &all_groups[thd->thread_id%group_count];

  if (group != thread_group)
  {
    if (change_group(this, thread_group, group))
      return -1;
  }
#endif

  /*
    Bind to poll descriptor if not yet done.
  */
  if (!bound_to_poll_descriptor)
  {
    bound_to_poll_descriptor= true;
    return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
  }

  return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
}
1573 | |
1574 | |
1575 | |
1576 | /** |
1577 | Worker thread's main |
1578 | */ |
1579 | |
1580 | static void *worker_main(void *param) |
1581 | { |
1582 | |
1583 | worker_thread_t this_thread; |
1584 | pthread_detach_this_thread(); |
1585 | my_thread_init(); |
1586 | |
1587 | DBUG_ENTER("worker_main" ); |
1588 | |
1589 | thread_group_t *thread_group = (thread_group_t *)param; |
1590 | |
1591 | /* Init per-thread structure */ |
1592 | mysql_cond_init(key_worker_cond, &this_thread.cond, NULL); |
1593 | this_thread.thread_group= thread_group; |
1594 | this_thread.event_count=0; |
1595 | |
1596 | /* Run event loop */ |
1597 | for(;;) |
1598 | { |
1599 | TP_connection_generic *connection; |
1600 | struct timespec ts; |
1601 | set_timespec(ts,threadpool_idle_timeout); |
1602 | connection = get_event(&this_thread, thread_group, &ts); |
1603 | if (!connection) |
1604 | break; |
1605 | this_thread.event_count++; |
1606 | tp_callback(connection); |
1607 | } |
1608 | |
1609 | /* Thread shutdown: cleanup per-worker-thread structure. */ |
1610 | mysql_cond_destroy(&this_thread.cond); |
1611 | |
1612 | bool last_thread; /* last thread in group exits */ |
1613 | mysql_mutex_lock(&thread_group->mutex); |
1614 | add_thread_count(thread_group, -1); |
1615 | last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown); |
1616 | mysql_mutex_unlock(&thread_group->mutex); |
1617 | |
1618 | /* Last thread in group exits and pool is terminating, destroy group.*/ |
1619 | if (last_thread) |
1620 | thread_group_destroy(thread_group); |
1621 | |
1622 | my_thread_end(); |
1623 | return NULL; |
1624 | } |
1625 | |
1626 | |
/* Nothing to do here - all initialization happens in init(). */
TP_pool_generic::TP_pool_generic()
{}
1629 | |
1630 | int TP_pool_generic::init() |
1631 | { |
1632 | DBUG_ENTER("TP_pool_generic::TP_pool_generic" ); |
1633 | threadpool_max_size= MY_MAX(threadpool_size, 128); |
1634 | all_groups= (thread_group_t *) |
1635 | my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL)); |
1636 | if (!all_groups) |
1637 | { |
1638 | threadpool_max_size= 0; |
1639 | sql_print_error("Allocation failed" ); |
1640 | DBUG_RETURN(-1); |
1641 | } |
1642 | scheduler_init(); |
1643 | threadpool_started= true; |
1644 | for (uint i= 0; i < threadpool_max_size; i++) |
1645 | { |
1646 | thread_group_init(&all_groups[i], get_connection_attrib()); |
1647 | } |
1648 | set_pool_size(threadpool_size); |
1649 | if(group_count == 0) |
1650 | { |
1651 | /* Something went wrong */ |
1652 | sql_print_error("Can't set threadpool size to %d" ,threadpool_size); |
1653 | DBUG_RETURN(-1); |
1654 | } |
1655 | PSI_register(mutex); |
1656 | PSI_register(cond); |
1657 | PSI_register(thread); |
1658 | |
1659 | pool_timer.tick_interval= threadpool_stall_limit; |
1660 | start_timer(&pool_timer); |
1661 | DBUG_RETURN(0); |
1662 | } |
1663 | |
/**
  Tear down the pool: stop the timer and initiate asynchronous shutdown
  of every group. Final group cleanup is done by the last exiting worker
  (see thread_group_close / thread_group_destroy).
*/
TP_pool_generic::~TP_pool_generic()
{
  DBUG_ENTER("tp_end" );

  if (!threadpool_started)
    DBUG_VOID_RETURN;

  stop_timer(&pool_timer);
  /* Each destroyed group decrements this; the last one frees all_groups. */
  shutdown_group_count= threadpool_max_size;
  for (uint i= 0; i < threadpool_max_size; i++)
  {
    thread_group_close(&all_groups[i]);
  }
  threadpool_started= false;
  DBUG_VOID_RETURN;
}
1680 | |
1681 | |
/** Ensure that poll descriptors are created when threadpool_size changes */
int TP_pool_generic::set_pool_size(uint size)
{
  bool success= true;

  for(uint i=0; i< size; i++)
  {
    thread_group_t *group= &all_groups[i];
    mysql_mutex_lock(&group->mutex);
    if (group->pollfd == INVALID_HANDLE_VALUE)
    {
      /* First activation of this group - create its poll descriptor. */
      group->pollfd= io_poll_create();
      success= (group->pollfd != INVALID_HANDLE_VALUE);
      if(!success)
      {
       sql_print_error("io_poll_create() failed, errno=%d\n" , errno);
      }
    }
    mysql_mutex_unlock(&group->mutex);
    if (!success)
    {
      /* Shrink to the groups that do have a poll descriptor. */
      group_count= i;
      return -1;
    }
  }
  group_count= size;
  return 0;
}
1710 | |
1711 | int TP_pool_generic::set_stall_limit(uint limit) |
1712 | { |
1713 | mysql_mutex_lock(&(pool_timer.mutex)); |
1714 | pool_timer.tick_interval= limit; |
1715 | mysql_mutex_unlock(&(pool_timer.mutex)); |
1716 | mysql_cond_signal(&(pool_timer.cond)); |
1717 | return 0; |
1718 | } |
1719 | |
1720 | |
1721 | /** |
1722 | Calculate number of idle/waiting threads in the pool. |
1723 | |
1724 | Sum idle threads over all groups. |
1725 | Don't do any locking, it is not required for stats. |
1726 | */ |
1727 | |
1728 | int TP_pool_generic::get_idle_thread_count() |
1729 | { |
1730 | int sum=0; |
1731 | for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++) |
1732 | { |
1733 | sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count); |
1734 | } |
1735 | return sum; |
1736 | } |
1737 | |
1738 | |
1739 | /* Report threadpool problems */ |
1740 | |
1741 | /** |
1742 | Delay in microseconds, after which "pool blocked" message is printed. |
  (30 sec == 30 million usec)
1744 | */ |
1745 | #define BLOCK_MSG_DELAY (30*1000000) |
1746 | |
1747 | #define MAX_THREADS_REACHED_MSG \ |
1748 | "Threadpool could not create additional thread to handle queries, because the \ |
1749 | number of allowed threads was reached. Increasing 'thread_pool_max_threads' \ |
1750 | parameter can help in this situation.\n \ |
1751 | If 'extra_port' parameter is set, you can still connect to the database with \ |
1752 | superuser account (it must be TCP connection using extra_port as TCP port) \ |
1753 | and troubleshoot the situation. \ |
1754 | A likely cause of pool blocks are clients that lock resources for long time. \ |
1755 | 'show processlist' or 'show engine innodb status' can give additional hints." |
1756 | |
1757 | #define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)." |
1758 | |
1759 | /** |
1760 | Write a message when blocking situation in threadpool occurs. |
1761 | The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds. |
1762 | It will be just a single message for each blocking situation (to prevent |
1763 | log flood). |
1764 | */ |
1765 | |
1766 | static void print_pool_blocked_message(bool max_threads_reached) |
1767 | { |
1768 | ulonglong now; |
1769 | static bool msg_written; |
1770 | |
1771 | now= microsecond_interval_timer(); |
1772 | if (pool_block_start == 0) |
1773 | { |
1774 | pool_block_start= now; |
1775 | msg_written = false; |
1776 | return; |
1777 | } |
1778 | |
1779 | if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written) |
1780 | { |
1781 | if (max_threads_reached) |
1782 | sql_print_error(MAX_THREADS_REACHED_MSG); |
1783 | else |
1784 | sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno); |
1785 | |
1786 | sql_print_information("Threadpool has been blocked for %u seconds\n" , |
1787 | (uint)((now- pool_block_start)/1000000)); |
1788 | /* avoid reperated messages for the same blocking situation */ |
1789 | msg_written= true; |
1790 | } |
1791 | } |
1792 | |
1793 | #endif /* HAVE_POOL_OF_THREADS */ |
1794 | |