1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3/* -*- mode: C; c-basic-offset: 4 -*- */
4#ident "$Id$"
5/*======
6This file is part of TokuDB
7
8
9Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
10
    TokuDB is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License, version 2,
13 as published by the Free Software Foundation.
14
15 TokuDB is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
22
23======= */
24
25#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
26
27#ifndef _TOKUDB_SYNC_H
28#define _TOKUDB_SYNC_H
29
30#include "hatoku_defines.h"
31#include "tokudb_debug.h"
32#include "tokudb_time.h"
33
34namespace tokudb {
35namespace thread {
36
37extern const pfs_key_t pfs_not_instrumented;
38
39uint my_tid(void);
40
41// Your basic mutex
42class mutex_t {
43public:
44 explicit mutex_t(pfs_key_t key);
45 mutex_t(void) : mutex_t(pfs_not_instrumented) {}
46 ~mutex_t(void);
47
48 void reinit(pfs_key_t key);
49 void lock(
50#ifdef HAVE_PSI_MUTEX_INTERFACE
51 const char* src_file,
52 uint src_line
53#endif // HAVE_PSI_MUTEX_INTERFACE
54 );
55 void unlock(
56#ifdef HAVE_PSI_MUTEX_INTERFACE
57 const char* src_file,
58 uint src_line
59#endif // HAVE_PSI_MUTEX_INTERFACE
60 );
61#ifdef TOKUDB_DEBUG
62 bool is_owned_by_me(void) const;
63#endif
64private:
65 static pthread_t _null_owner;
66 mysql_mutex_t _mutex;
67#ifdef TOKUDB_DEBUG
68 uint _owners;
69 pthread_t _owner;
70#endif
71};
72
73// Simple read write lock
74class rwlock_t {
75public:
76 explicit rwlock_t(pfs_key_t key);
77 rwlock_t(void) : rwlock_t(pfs_not_instrumented) {}
78 ~rwlock_t(void);
79
80 void lock_read(
81#ifdef HAVE_PSI_RWLOCK_INTERFACE
82 const char* src_file,
83 uint src_line
84#endif // HAVE_PSI_RWLOCK_INTERFACE
85 );
86 void lock_write(
87#ifdef HAVE_PSI_RWLOCK_INTERFACE
88 const char* src_file,
89 uint src_line
90#endif // HAVE_PSI_RWLOCK_INTERFACE
91 );
92 void unlock(void);
93
94private:
95 rwlock_t(const rwlock_t&);
96 rwlock_t& operator=(const rwlock_t&);
97
98 mysql_rwlock_t _rwlock;
99};
100
101// Simple event signal/wait class
102class event_t {
103public:
104 // create_signalled - create the event in a signalled state
105 // manual_reset - create an event that must be manually reset
106 // after signaling
107 event_t(
108 bool create_signalled = false,
109 bool manual_reset = false);
110 ~event_t(void);
111
112 // wait for the event to become signalled
113 void wait(void);
114 int wait(ulonglong microseconds);
115
116 // signal the event
117 void signal(void);
118
119 // pulse the event (signal and free exactly one waiter)
120 void pulse(void);
121
122 // is the event currently signalled
123 bool signalled(void);
124
125 // unsignal/clear the event
126 void reset(void);
127
128private:
129 event_t(const event_t&);
130 event_t& operator=(const event_t&);
131
132 pthread_mutex_t _mutex;
133 pthread_cond_t _cond;
134 bool _signalled;
135 bool _pulsed;
136 bool _manual_reset;
137};
138
139// Semaphore signal/wait class
140class semaphore_t {
141public:
142 // initial_count - the initial signal count of the semaphore
143 // max_count - the maximum signal count for the semaphore.
144 semaphore_t(int initial_count, int max_count);
145 ~semaphore_t(void);
146
147 enum E_WAIT {
148 E_SIGNALLED = 0,
149 E_INTERRUPTED = 1,
150 E_TIMEDOUT = 2
151 };
152
153 // wait for the semaphore to become signalled
154 E_WAIT wait(void);
155 E_WAIT wait(ulonglong microseconds);
156
157 // signal the semaphore to increase the count
158 // return true if signalled, false if ignored due to count
159 bool signal(void);
160
161 // what is the semaphore signal count
162 int signalled(void);
163
164 // unsignal a signalled semaphore
165 void reset(void);
166
167 // set to interrupt any waiters, as long as is set,
168 // waiters will return immediately with E_INTERRUPTED.
169 // the semaphore signal count and tracking will continue
170 // accepting signals and leave the signalled state intact
171 void set_interrupt(void);
172 void clear_interrupt(void);
173
174private:
175 semaphore_t(const semaphore_t&);
176 semaphore_t& operator=(const semaphore_t&);
177
178 pthread_mutex_t _mutex;
179 pthread_cond_t _cond;
180 bool _interrupted;
181 int _signalled;
182 int _initial_count;
183 int _max_count;
184};
185
// thread_t - thin wrapper around a pthread_t handle.
class thread_t {
public:
    thread_t();
    ~thread_t();

    // create a thread running pfn(arg); returns the pthread_create() result
    int start(void* (*pfn)(void*), void* arg);
    // join the thread, its exit value is stored through value_ptr;
    // returns the pthread_join() result
    int join(void** value_ptr);
    // detach the thread; returns the pthread_detach() result
    int detach();

private:
    pthread_t _thread;
};
199
200inline uint my_tid(void) { return (uint)toku_os_gettid(); }
201
202inline mutex_t::mutex_t(pfs_key_t key) {
203#ifdef TOKUDB_DEBUG
204 _owners = 0;
205 _owner = _null_owner;
206#endif
207 int r MY_ATTRIBUTE((unused)) = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST);
208 assert_debug(r == 0);
209}
210inline mutex_t::~mutex_t() {
211#ifdef TOKUDB_DEBUG
212 assert_debug(_owners == 0);
213#endif
214 int r MY_ATTRIBUTE((unused)) = mysql_mutex_destroy(&_mutex);
215 assert_debug(r == 0);
216}
217inline void mutex_t::reinit(pfs_key_t key) {
218#ifdef TOKUDB_DEBUG
219 assert_debug(_owners == 0);
220#endif
221 int r MY_ATTRIBUTE((unused));
222 r = mysql_mutex_destroy(&_mutex);
223 assert_debug(r == 0);
224 r = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST);
225 assert_debug(r == 0);
226}
227inline void mutex_t::lock(
228#ifdef HAVE_PSI_MUTEX_INTERFACE
229 const char* src_file,
230 uint src_line
231#endif // HAVE_PSI_MUTEX_INTERFACE
232 ) {
233 assert_debug(is_owned_by_me() == false);
234 int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_lock(&_mutex
235#ifdef HAVE_PSI_MUTEX_INTERFACE
236 ,
237 src_file,
238 src_line
239#endif // HAVE_PSI_MUTEX_INTERFACE
240 );
241 assert_debug(r == 0);
242#ifdef TOKUDB_DEBUG
243 _owners++;
244 _owner = pthread_self();
245#endif
246}
247inline void mutex_t::unlock(
248#ifdef HAVE_PSI_MUTEX_INTERFACE
249 const char* src_file,
250 uint src_line
251#endif // HAVE_PSI_MUTEX_INTERFACE
252 ) {
253#ifndef SAFE_MUTEX
254 (void)(src_file);
255 (void)(src_line);
256#endif // SAFE_MUTEX
257#ifdef TOKUDB_DEBUG
258 assert_debug(_owners > 0);
259 assert_debug(is_owned_by_me());
260 _owners--;
261 _owner = _null_owner;
262#endif
263 int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_unlock(&_mutex
264#ifdef SAFE_MUTEX
265 ,
266 src_file,
267 src_line
268#endif // SAFE_MUTEX
269 );
270 assert_debug(r == 0);
271}
#ifdef TOKUDB_DEBUG
// Debug only: compares the calling thread against the recorded owner and
// returns true when they are the same thread.
inline bool mutex_t::is_owned_by_me() const {
    const pthread_t caller = pthread_self();
    return pthread_equal(caller, _owner) != 0;
}
#endif
277
278inline rwlock_t::rwlock_t(pfs_key_t key) {
279 int r MY_ATTRIBUTE((unused)) = mysql_rwlock_init(key, &_rwlock);
280 assert_debug(r == 0);
281}
282inline rwlock_t::~rwlock_t() {
283 int r MY_ATTRIBUTE((unused)) = mysql_rwlock_destroy(&_rwlock);
284 assert_debug(r == 0);
285}
286inline void rwlock_t::lock_read(
287#ifdef HAVE_PSI_RWLOCK_INTERFACE
288 const char* src_file,
289 uint src_line
290#endif // HAVE_PSI_RWLOCK_INTERFACE
291 ) {
292 int r;
293 while ((r = inline_mysql_rwlock_rdlock(&_rwlock
294#ifdef HAVE_PSI_RWLOCK_INTERFACE
295 ,
296 src_file,
297 src_line
298#endif // HAVE_PSI_RWLOCK_INTERFACE
299 )) != 0) {
300 if (r == EBUSY || r == EAGAIN) {
301 time::sleep_microsec(1000);
302 continue;
303 }
304 break;
305 }
306 assert_debug(r == 0);
307}
308inline void rwlock_t::lock_write(
309#ifdef HAVE_PSI_RWLOCK_INTERFACE
310 const char* src_file,
311 uint src_line
312#endif // HAVE_PSI_RWLOCK_INTERFACE
313 ) {
314 int r;
315 while ((r = inline_mysql_rwlock_wrlock(&_rwlock
316#ifdef HAVE_PSI_RWLOCK_INTERFACE
317 ,
318 src_file,
319 src_line
320#endif // HAVE_PSI_RWLOCK_INTERFACE
321 )) != 0) {
322 if (r == EBUSY || r == EAGAIN) {
323 time::sleep_microsec(1000);
324 continue;
325 }
326 break;
327 }
328 assert_debug(r == 0);
329}
330inline void rwlock_t::unlock(void) {
331 int r MY_ATTRIBUTE((unused)) = mysql_rwlock_unlock(&_rwlock);
332 assert_debug(r == 0);
333}
334inline rwlock_t::rwlock_t(const rwlock_t&) {}
335inline rwlock_t& rwlock_t::operator=(const rwlock_t&) {
336 return *this;
337}
338
339
340inline event_t::event_t(bool create_signalled, bool manual_reset) :
341 _manual_reset(manual_reset) {
342
343 int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL);
344 assert_debug(r == 0);
345 r = pthread_cond_init(&_cond, NULL);
346 assert_debug(r == 0);
347 if (create_signalled) {
348 _signalled = true;
349 } else {
350 _signalled = false;
351 }
352 _pulsed = false;
353}
354inline event_t::~event_t(void) {
355 int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex);
356 assert_debug(r == 0);
357 r = pthread_cond_destroy(&_cond);
358 assert_debug(r == 0);
359}
360inline void event_t::wait(void) {
361 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
362 assert_debug(r == 0);
363 while (_signalled == false && _pulsed == false) {
364 r = pthread_cond_wait(&_cond, &_mutex);
365 assert_debug(r == 0);
366 }
367 if (_manual_reset == false)
368 _signalled = false;
369 if (_pulsed)
370 _pulsed = false;
371 r = pthread_mutex_unlock(&_mutex);
372 assert_debug(r == 0);
373 return;
374}
375inline int event_t::wait(ulonglong microseconds) {
376 timespec waittime = time::offset_timespec(microseconds);
377 int r = pthread_mutex_timedlock(&_mutex, &waittime);
378 if (r == ETIMEDOUT) return ETIMEDOUT;
379 assert_debug(r == 0);
380 while (_signalled == false && _pulsed == false) {
381 r = pthread_cond_timedwait(&_cond, &_mutex, &waittime);
382 if (r == ETIMEDOUT) {
383 r = pthread_mutex_unlock(&_mutex);
384 assert_debug(r == 0);
385 return ETIMEDOUT;
386 }
387 assert_debug(r == 0);
388 }
389 if (_manual_reset == false)
390 _signalled = false;
391 if (_pulsed)
392 _pulsed = false;
393 r = pthread_mutex_unlock(&_mutex);
394 assert_debug(r == 0);
395 return 0;
396}
397inline void event_t::signal(void) {
398 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
399 assert_debug(r == 0);
400 _signalled = true;
401 if (_manual_reset) {
402 r = pthread_cond_broadcast(&_cond);
403 assert_debug(r == 0);
404 } else {
405 r = pthread_cond_signal(&_cond);
406 assert_debug(r == 0);
407 }
408 r = pthread_mutex_unlock(&_mutex);
409 assert_debug(r == 0);
410}
411inline void event_t::pulse(void) {
412 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
413 assert_debug(r == 0);
414 _pulsed = true;
415 r = pthread_cond_signal(&_cond);
416 assert_debug(r == 0);
417 r = pthread_mutex_unlock(&_mutex);
418 assert_debug(r == 0);
419}
420inline bool event_t::signalled(void) {
421 bool ret = false;
422 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
423 assert_debug(r == 0);
424 ret = _signalled;
425 r = pthread_mutex_unlock(&_mutex);
426 assert_debug(r == 0);
427 return ret;
428}
429inline void event_t::reset(void) {
430 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
431 assert_debug(r == 0);
432 _signalled = false;
433 _pulsed = false;
434 r = pthread_mutex_unlock(&_mutex);
435 assert_debug(r == 0);
436 return;
437}
438inline event_t::event_t(const event_t&) {
439}
440inline event_t& event_t::operator=(const event_t&) {
441 return *this;
442}
443
444
445inline semaphore_t::semaphore_t(
446 int initial_count,
447 int max_count) :
448 _interrupted(false),
449 _initial_count(initial_count),
450 _max_count(max_count) {
451
452 int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL);
453 assert_debug(r == 0);
454 r = pthread_cond_init(&_cond, NULL);
455 assert_debug(r == 0);
456 _signalled = _initial_count;
457}
458inline semaphore_t::~semaphore_t(void) {
459 int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex);
460 assert_debug(r == 0);
461 r = pthread_cond_destroy(&_cond);
462 assert_debug(r == 0);
463}
464inline semaphore_t::E_WAIT semaphore_t::wait(void) {
465 E_WAIT ret;
466 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
467 assert_debug(r == 0);
468 while (_signalled == 0 && _interrupted == false) {
469 r = pthread_cond_wait(&_cond, &_mutex);
470 assert_debug(r == 0);
471 }
472 if (_interrupted) {
473 ret = E_INTERRUPTED;
474 } else {
475 _signalled--;
476 ret = E_SIGNALLED;
477 }
478 r = pthread_mutex_unlock(&_mutex);
479 assert_debug(r == 0);
480 return ret;
481}
482inline semaphore_t::E_WAIT semaphore_t::wait(ulonglong microseconds) {
483 E_WAIT ret;
484 timespec waittime = time::offset_timespec(microseconds);
485 int r = pthread_mutex_timedlock(&_mutex, &waittime);
486 if (r == ETIMEDOUT) return E_TIMEDOUT;
487 assert_debug(r == 0);
488 while (_signalled == 0 && _interrupted == false) {
489 r = pthread_cond_timedwait(&_cond, &_mutex, &waittime);
490 if (r == ETIMEDOUT) {
491 r = pthread_mutex_unlock(&_mutex);
492 assert_debug(r == 0);
493 return E_TIMEDOUT;
494 }
495 assert_debug(r == 0);
496 }
497 if (_interrupted) {
498 ret = E_INTERRUPTED;
499 } else {
500 _signalled--;
501 ret = E_SIGNALLED;
502 }
503 r = pthread_mutex_unlock(&_mutex);
504 assert_debug(r == 0);
505 return ret;
506}
507inline bool semaphore_t::signal(void) {
508 bool ret = false;
509 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
510 assert_debug(r == 0);
511 if (_signalled < _max_count) {
512 _signalled++;
513 ret = true;
514 }
515 r = pthread_cond_signal(&_cond);
516 assert_debug(r == 0);
517 r = pthread_mutex_unlock(&_mutex);
518 assert_debug(r == 0);
519 return ret;
520}
521inline int semaphore_t::signalled(void) {
522 int ret = 0;
523 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
524 assert_debug(r == 0);
525 ret = _signalled;
526 r = pthread_mutex_unlock(&_mutex);
527 assert_debug(r == 0);
528 return ret;
529}
530inline void semaphore_t::reset(void) {
531 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
532 assert_debug(r == 0);
533 _signalled = 0;
534 r = pthread_mutex_unlock(&_mutex);
535 assert_debug(r == 0);
536 return;
537}
538inline void semaphore_t::set_interrupt(void) {
539 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
540 assert_debug(r == 0);
541 _interrupted = true;
542 r = pthread_cond_broadcast(&_cond);
543 assert_debug(r == 0);
544 r = pthread_mutex_unlock(&_mutex);
545 assert_debug(r == 0);
546}
547inline void semaphore_t::clear_interrupt(void) {
548 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
549 assert_debug(r == 0);
550 _interrupted = false;
551 r = pthread_mutex_unlock(&_mutex);
552 assert_debug(r == 0);
553}
554inline semaphore_t::semaphore_t(const semaphore_t&) {
555}
556inline semaphore_t& semaphore_t::operator=(const semaphore_t&) {
557 return *this;
558}
559
560
561inline thread_t::thread_t(void) : _thread(0) {
562}
563inline thread_t::~thread_t(void) {
564}
565inline int thread_t::start(void*(*pfn)(void*), void* arg) {
566 return pthread_create(&_thread, NULL, pfn, arg);
567}
568inline int thread_t::join(void** value_ptr) {
569 return pthread_join(_thread, value_ptr);
570}
571inline int thread_t::detach(void) {
572 return pthread_detach(_thread);
573}
574
575} // namespace thread
576} // namespace tokudb
577
578
579#endif // _TOKUDB_SYNC_H
580