1/*
2 Copyright (c) 2012 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 or later of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17/*
  Implementation of OS independent timers.
19 This is done based on pthread primitives, especially pthread_cond_timedwait()
20*/
21
22#include "mysys_priv.h"
23#include "thr_timer.h"
24#include <m_string.h>
25#include <queues.h>
26#ifdef HAVE_TIMER_CREATE
27#include <sys/syscall.h>
28#endif
29
/*
  Expire time the timer thread is currently waiting for. Set by
  timer_handler() before each wait and compared against new timers in
  thr_timer_settime() to decide if the timer thread must be woken up.
*/
struct timespec next_timer_expire_time;

/* Non-zero while the timer thread should keep running; cleared to stop it */
static my_bool thr_timer_inited= 0;
/* Taken around all accesses to timer_queue */
static mysql_mutex_t LOCK_timer;
/* Signalled to wake the timer thread (new earlier timer or shutdown) */
static mysql_cond_t COND_timer;
/* Queue of thr_timer_t elements keyed on their expire_time */
static QUEUE timer_queue;
/* Handle of the timer thread; created in init_thr_timer(), joined in end_thr_timer() */
pthread_t timer_thread;
37
/*
  Set a timespec to the largest expire time used by this module
  (INT_MAX32 seconds, 0 nanoseconds).

  The body is wrapped in do { } while (0) so that the macro expands to a
  single statement and can be followed by ';' inside an unbraced if/else.
*/
#define set_max_time(abs_time) \
  do { (abs_time)->MY_tv_sec= INT_MAX32; (abs_time)->MY_tv_nsec= 0; } while (0)
40
41
/*
  Entry point of the timer thread. Defined further down; declared here
  because init_thr_timer() needs it when creating the thread.
*/
static void *timer_handler(void *arg __attribute__((unused)));
43
44/*
45 Compare two timespecs
46*/
47
48static int compare_timespec(void *not_used __attribute__((unused)),
49 uchar *a_ptr, uchar *b_ptr)
50{
51 return cmp_timespec((*(struct timespec*) a_ptr),
52 (*(struct timespec*) b_ptr));
53}
54
55
56/**
57 Initialize timer variables and create timer thread
58
59 @param alloc_timers Init allocation of timers. Will be autoextended
60 if needed
61 @return 0 ok
62 @return 1 error; Can't create thread
63*/
64
65static thr_timer_t max_timer_data;
66
67my_bool init_thr_timer(uint alloc_timers)
68{
69 pthread_attr_t thr_attr;
70 my_bool res= 0;
71 DBUG_ENTER("init_thr_timer");
72
73 init_queue(&timer_queue, alloc_timers+2, offsetof(thr_timer_t,expire_time),
74 0, compare_timespec, NullS,
75 offsetof(thr_timer_t, index_in_queue)+1, 1);
76 mysql_mutex_init(key_LOCK_timer, &LOCK_timer, MY_MUTEX_INIT_FAST);
77 mysql_cond_init(key_COND_timer, &COND_timer, NULL);
78
79 /* Set dummy element with max time into the queue to simplify usage */
80 bzero(&max_timer_data, sizeof(max_timer_data));
81 set_max_time(&max_timer_data.expire_time);
82 queue_insert(&timer_queue, (uchar*) &max_timer_data);
83 next_timer_expire_time= max_timer_data.expire_time;
84
85 /* Create a thread to handle timers */
86 pthread_attr_init(&thr_attr);
87 pthread_attr_setscope(&thr_attr,PTHREAD_SCOPE_PROCESS);
88 pthread_attr_setstacksize(&thr_attr,8196);
89 thr_timer_inited= 1;
90 if (mysql_thread_create(key_thread_timer, &timer_thread, &thr_attr,
91 timer_handler, NULL))
92 {
93 thr_timer_inited= 0;
94 res= 1;
95 mysql_mutex_destroy(&LOCK_timer);
96 mysql_cond_destroy(&COND_timer);
97 delete_queue(&timer_queue);
98 }
99 pthread_attr_destroy(&thr_attr);
100
101 DBUG_RETURN(res);
102}
103
104
105void end_thr_timer(void)
106{
107 DBUG_ENTER("end_thr_timer");
108
109 if (!thr_timer_inited)
110 DBUG_VOID_RETURN;
111
112 mysql_mutex_lock(&LOCK_timer);
113 thr_timer_inited= 0; /* Signal abort */
114 mysql_cond_signal(&COND_timer);
115 mysql_mutex_unlock(&LOCK_timer);
116 pthread_join(timer_thread, NULL);
117
118 mysql_mutex_destroy(&LOCK_timer);
119 mysql_cond_destroy(&COND_timer);
120 delete_queue(&timer_queue);
121 DBUG_VOID_RETURN;
122}
123
124
125/*
126 Initialize a timer object
127
128 @param timer_data Timer structure
129 @param function Function to be called when getting timeout
130 @param argument Argument for function
131*/
132
133void thr_timer_init(thr_timer_t *timer_data, void(*function)(void*),
134 void *arg)
135{
136 DBUG_ENTER("thr_timer_init");
137 bzero(timer_data, sizeof(*timer_data));
138 timer_data->func= function;
139 timer_data->func_arg= arg;
140 timer_data->expired= 1; /* Not active */
141 DBUG_VOID_RETURN;
142}
143
144
145/*
146 Request timer after X milliseconds
147
148 SYNOPSIS
149 thr_timer()
150 timer_data Pointer to timer structure
151 micro_seconds; Number of microseconds until timer
152
153 RETURN VALUES
154 0 ok
155 1 If no more timers are allowed (aborted by process)
156
157 Stores in first argument a pointer to a non-zero int which is set to 0
158 when the timer has been given
159*/
160
161my_bool thr_timer_settime(thr_timer_t *timer_data, ulonglong micro_seconds)
162{
163 int reschedule;
164 DBUG_ENTER("thr_timer_settime");
165 DBUG_PRINT("enter",("thread: %s micro_seconds: %llu",my_thread_name(),
166 micro_seconds));
167
168 DBUG_ASSERT(timer_data->expired == 1);
169
170 set_timespec_nsec(timer_data->expire_time, micro_seconds*1000);
171 timer_data->expired= 0;
172
173 mysql_mutex_lock(&LOCK_timer); /* Lock from threads & timers */
174 if (queue_insert_safe(&timer_queue,(uchar*) timer_data))
175 {
176 DBUG_PRINT("info", ("timer queue full"));
177 fprintf(stderr,"Warning: thr_timer queue is full\n");
178 timer_data->expired= 1;
179 mysql_mutex_unlock(&LOCK_timer);
180 DBUG_RETURN(1);
181 }
182
183 /* Reschedule timer if the current one has more time left than new one */
184 reschedule= cmp_timespec(next_timer_expire_time, timer_data->expire_time);
185 mysql_mutex_unlock(&LOCK_timer);
186 if (reschedule > 0)
187 {
188#if defined(MAIN)
189 printf("reschedule\n"); fflush(stdout);
190#endif
191 DBUG_PRINT("info", ("reschedule"));
192 mysql_cond_signal(&COND_timer);
193 }
194
195 DBUG_RETURN(0);
196}
197
198
199/*
200 Remove timer from list of timers
201
202 notes: Timer will be marked as expired
203*/
204
205void thr_timer_end(thr_timer_t *timer_data)
206{
207 DBUG_ENTER("thr_timer_end");
208
209 mysql_mutex_lock(&LOCK_timer);
210 if (!timer_data->expired)
211 {
212 DBUG_ASSERT(timer_data->index_in_queue != 0);
213 DBUG_ASSERT(queue_element(&timer_queue, timer_data->index_in_queue) ==
214 (uchar*) timer_data);
215 queue_remove(&timer_queue, timer_data->index_in_queue);
216 /* Mark as expired for asserts to work */
217 timer_data->expired= 1;
218 }
219 mysql_mutex_unlock(&LOCK_timer);
220 DBUG_VOID_RETURN;
221}
222
223
224/*
225 Come here when some timer in queue is due.
226*/
227
228static sig_handler process_timers(struct timespec *now)
229{
230 thr_timer_t *timer_data;
231 DBUG_ENTER("process_timers");
232 DBUG_PRINT("info",("active timers: %d", timer_queue.elements - 1));
233
234#if defined(MAIN)
235 printf("process_timer\n"); fflush(stdout);
236#endif
237
238 /* We can safely remove the first one as it has already expired */
239 for (;;)
240 {
241 void (*function)(void*);
242 void *func_arg;
243
244 timer_data= (thr_timer_t*) queue_top(&timer_queue);
245 function= timer_data->func;
246 func_arg= timer_data->func_arg;
247 timer_data->expired= 1; /* Mark expired */
248 /*
249 We remove timer before calling timer function to allow thread to
250 delete it's timer data any time.
251 */
252 queue_remove_top(&timer_queue); /* Remove timer */
253 (*function)(func_arg); /* Inform thread of timeout */
254
255 /* Check if next one has also expired */
256 timer_data= (thr_timer_t*) queue_top(&timer_queue);
257 if (cmp_timespec(timer_data->expire_time, (*now)) > 0)
258 break; /* All data processed */
259 }
260 DBUG_VOID_RETURN;
261}
262
263
264/*
265 set up a timer thread to handle timeouts
266 This will be killed when thr_timer_inited is set to false.
267*/
268
269static void *timer_handler(void *arg __attribute__((unused)))
270{
271 my_thread_init();
272
273 mysql_mutex_lock(&LOCK_timer);
274 while (likely(thr_timer_inited))
275 {
276 int error;
277 struct timespec *top_time;
278 struct timespec now, abstime;
279
280 set_timespec(now, 0);
281
282 top_time= &(((thr_timer_t*) queue_top(&timer_queue))->expire_time);
283
284 if (cmp_timespec((*top_time), now) <= 0)
285 {
286 process_timers(&now);
287 top_time= &(((thr_timer_t*) queue_top(&timer_queue))->expire_time);
288 }
289
290 abstime= *top_time;
291 next_timer_expire_time= *top_time;
292 if ((error= mysql_cond_timedwait(&COND_timer, &LOCK_timer, &abstime)) &&
293 error != ETIME && error != ETIMEDOUT)
294 {
295#ifdef MAIN
296 printf("Got error: %d from ptread_cond_timedwait (errno: %d)\n",
297 error,errno);
298#endif
299 }
300 }
301 mysql_mutex_unlock(&LOCK_timer);
302 my_thread_end();
303 pthread_exit(0);
304 return 0; /* Impossible */
305}
306
307
308/****************************************************************************
309 Testing of thr_timer (when compiled with -DMAIN)
310***************************************************************************/
311
312#ifdef MAIN
313
/* Signalled by each test thread when it has finished (see start_thread()) */
static mysql_cond_t COND_thread_count;
/* Taken around updates and checks of thread_count */
static mysql_mutex_t LOCK_thread_count;
/*
  thread_count    Number of started test threads that have not yet finished
  benchmark_runs  Number of iterations for the benchmark tests
  test_to_run     1 = timer test, 2 = thr_timer benchmark,
                  3 = posix timer benchmark (see start_thread())
*/
static uint thread_count, benchmark_runs, test_to_run= 1;
317
318static void send_signal(void *arg)
319{
320 struct st_my_thread_var *current_my_thread_var= arg;
321#if defined(MAIN)
322 printf("sending signal\n"); fflush(stdout);
323#endif
324 mysql_mutex_lock(&current_my_thread_var->mutex);
325 mysql_cond_signal(&current_my_thread_var->suspend);
326 mysql_mutex_unlock(&current_my_thread_var->mutex);
327}
328
329
330static void run_thread_test(int param)
331{
332 int i,wait_time,retry;
333 my_hrtime_t start_time;
334 thr_timer_t timer_data;
335 struct st_my_thread_var *current_my_thread_var;
336 DBUG_ENTER("run_thread_test");
337
338 current_my_thread_var= my_thread_var;
339 thr_timer_init(&timer_data, send_signal, current_my_thread_var);
340
341 for (i=1 ; i <= 10 ; i++)
342 {
343 wait_time=param ? 11-i : i;
344 start_time= my_hrtime();
345
346 mysql_mutex_lock(&current_my_thread_var->mutex);
347 if (thr_timer_settime(&timer_data, wait_time * 1000000))
348 {
349 printf("Thread: %s timers aborted\n",my_thread_name());
350 break;
351 }
352 if (wait_time == 3)
353 {
354 printf("Thread: %s Simulation of no timer needed\n",my_thread_name());
355 fflush(stdout);
356 }
357 else
358 {
359 for (retry=0 ; !timer_data.expired && retry < 10 ; retry++)
360 {
361 printf("Thread: %s Waiting %d sec\n",my_thread_name(),wait_time);
362 mysql_cond_wait(&current_my_thread_var->suspend,
363 &current_my_thread_var->mutex);
364
365 }
366 if (!timer_data.expired)
367 {
368 printf("Thread: %s didn't get an timer. Aborting!\n",
369 my_thread_name());
370 break;
371 }
372 }
373 mysql_mutex_unlock(&current_my_thread_var->mutex);
374 printf("Thread: %s Slept for %g (%d) sec\n",my_thread_name(),
375 (int) (my_hrtime().val-start_time.val)/1000000.0, wait_time);
376 fflush(stdout);
377 thr_timer_end(&timer_data);
378 fflush(stdout);
379 }
380 DBUG_VOID_RETURN;
381}
382
383
384static void run_thread_benchmark(int param)
385{
386 int i;
387 struct st_my_thread_var *current_my_thread_var;
388 thr_timer_t timer_data;
389 DBUG_ENTER("run_thread_benchmark");
390
391 current_my_thread_var= my_thread_var;
392 thr_timer_init(&timer_data, send_signal, current_my_thread_var);
393
394 for (i=1 ; i <= param ; i++)
395 {
396 if (thr_timer_settime(&timer_data, 1000000))
397 {
398 printf("Thread: %s timers aborted\n",my_thread_name());
399 break;
400 }
401 thr_timer_end(&timer_data);
402 }
403 DBUG_VOID_RETURN;
404}
405
406
#ifdef HAVE_TIMER_CREATE

/* Test for benchmarking posix timers against thr_timer */

#ifndef sigev_notify_thread_id
#define sigev_notify_thread_id _sigev_un._tid
#endif

/*
  Benchmark for posix timers: arm a one second timer with timer_settime()
  and disarm it again at once, 'param' times. Exits the program if the
  timer can't be created.
*/

static void run_timer_benchmark(int param)
{
  int runs_left;
  timer_t timerid;
  struct sigevent sigev;
  struct itimerspec arm_time, disarm_time;
  DBUG_ENTER("run_timer_benchmark");

  /* Setup a signal that will never be signaled */
  sigev.sigev_value.sival_ptr= 0;
  sigev.sigev_signo= SIGRTMIN;          /* First free signal */
  sigev.sigev_notify= SIGEV_SIGNAL | SIGEV_THREAD_ID;
  sigev.sigev_notify_thread_id= (pid_t) syscall(SYS_gettid);

  if (timer_create(CLOCK_MONOTONIC, &sigev, &timerid))
  {
    printf("Could not create timer\n");
    exit(1);
  }

  /* One-shot timer one second ahead */
  arm_time.it_interval.tv_sec= 0;
  arm_time.it_interval.tv_nsec= 0;
  arm_time.it_value.tv_sec= 1;
  arm_time.it_value.tv_nsec= 0;

  /* All zero: disarms the timer */
  disarm_time.it_interval.tv_sec= 0;
  disarm_time.it_interval.tv_nsec= 0;
  disarm_time.it_value.tv_sec= 0;
  disarm_time.it_value.tv_nsec= 0;

  for (runs_left= param ; runs_left > 0 ; runs_left--)
  {
    if (timer_settime(timerid, 0, &arm_time, NULL))
    {
      printf("Thread: %s timers aborted\n",my_thread_name());
      break;
    }
    timer_settime(timerid, 0, &disarm_time, NULL);
  }
  timer_delete(timerid);
  DBUG_VOID_RETURN;
}
#endif /* HAVE_TIMER_CREATE */
458
459
460static void *start_thread(void *arg)
461{
462 my_thread_init();
463 printf("Thread %d (%s) started\n",*((int*) arg),my_thread_name());
464 fflush(stdout);
465
466 switch (test_to_run) {
467 case 1:
468 run_thread_test(*((int*) arg));
469 break;
470 case 2:
471 run_thread_benchmark(benchmark_runs);
472 break;
473 case 3:
474#ifdef HAVE_TIMER_CREATE
475 run_timer_benchmark(benchmark_runs);
476#endif
477 break;
478 }
479 free((uchar*) arg);
480 mysql_mutex_lock(&LOCK_thread_count);
481 thread_count--;
482 mysql_cond_signal(&COND_thread_count); /* Tell main we are ready */
483 mysql_mutex_unlock(&LOCK_thread_count);
484 my_thread_end();
485 return 0;
486}
487
488
489/* Start a lot of threads that will run with timers */
490
491static void run_test()
492{
493 pthread_t tid;
494 pthread_attr_t thr_attr;
495 int i,*param,error;
496 DBUG_ENTER("run_test");
497
498 if (init_thr_timer(5))
499 {
500 printf("Can't initialize timers\n");
501 exit(1);
502 }
503
504 mysql_mutex_init(0, &LOCK_thread_count, MY_MUTEX_INIT_FAST);
505 mysql_cond_init(0, &COND_thread_count, NULL);
506
507 thr_setconcurrency(3);
508 pthread_attr_init(&thr_attr);
509 pthread_attr_setscope(&thr_attr,PTHREAD_SCOPE_PROCESS);
510 printf("Main thread: %s\n",my_thread_name());
511 for (i=0 ; i < 2 ; i++)
512 {
513 param=(int*) malloc(sizeof(int));
514 *param= i;
515 mysql_mutex_lock(&LOCK_thread_count);
516 if ((error= mysql_thread_create(0,
517 &tid, &thr_attr, start_thread,
518 (void*) param)))
519 {
520 printf("Can't create thread %d, error: %d\n",i,error);
521 exit(1);
522 }
523 thread_count++;
524 mysql_mutex_unlock(&LOCK_thread_count);
525 }
526
527 pthread_attr_destroy(&thr_attr);
528 mysql_mutex_lock(&LOCK_thread_count);
529 while (thread_count)
530 {
531 mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
532 }
533 mysql_mutex_unlock(&LOCK_thread_count);
534 DBUG_ASSERT(timer_queue.elements == 1);
535 end_thr_timer();
536 printf("Test succeeded\n");
537 DBUG_VOID_RETURN;
538}
539
540
541int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
542{
543 MY_INIT(argv[0]);
544
545 if (argc > 1 && argv[1][0] == '-')
546 {
547 switch (argv[1][1]) {
548 case '#':
549 test_to_run= 1;
550 DBUG_PUSH(argv[1]+2);
551 break;
552 case 'b':
553 test_to_run= 2;
554 benchmark_runs= atoi(argv[1]+2);
555 break;
556 case 't':
557 test_to_run= 3;
558 benchmark_runs= atoi(argv[1]+2);
559 break;
560 }
561 }
562 if (!benchmark_runs)
563 benchmark_runs= 1000000;
564
565 run_test();
566 my_end(1);
567 return 0;
568}
569
570#endif /* MAIN */
571