1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | /* -*- mode: C; c-basic-offset: 4 -*- */ |
4 | #ident "$Id$" |
5 | /*====== |
6 | This file is part of TokuDB |
7 | |
8 | |
9 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
10 | |
11 | TokuDBis is free software: you can redistribute it and/or modify |
12 | it under the terms of the GNU General Public License, version 2, |
13 | as published by the Free Software Foundation. |
14 | |
15 | TokuDB is distributed in the hope that it will be useful, |
16 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
17 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
18 | GNU General Public License for more details. |
19 | |
20 | You should have received a copy of the GNU General Public License |
21 | along with TokuDB. If not, see <http://www.gnu.org/licenses/>. |
22 | |
23 | ======= */ |
24 | |
25 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
26 | |
27 | #ifndef _TOKUDB_SYNC_H |
28 | #define _TOKUDB_SYNC_H |
29 | |
30 | #include "hatoku_defines.h" |
31 | #include "tokudb_debug.h" |
32 | #include "tokudb_time.h" |
33 | |
34 | namespace tokudb { |
35 | namespace thread { |
36 | |
37 | extern const pfs_key_t pfs_not_instrumented; |
38 | |
39 | uint my_tid(void); |
40 | |
41 | // Your basic mutex |
42 | class mutex_t { |
43 | public: |
44 | explicit mutex_t(pfs_key_t key); |
45 | mutex_t(void) : mutex_t(pfs_not_instrumented) {} |
46 | ~mutex_t(void); |
47 | |
48 | void reinit(pfs_key_t key); |
49 | void lock( |
50 | #ifdef HAVE_PSI_MUTEX_INTERFACE |
51 | const char* src_file, |
52 | uint src_line |
53 | #endif // HAVE_PSI_MUTEX_INTERFACE |
54 | ); |
55 | void unlock( |
56 | #ifdef HAVE_PSI_MUTEX_INTERFACE |
57 | const char* src_file, |
58 | uint src_line |
59 | #endif // HAVE_PSI_MUTEX_INTERFACE |
60 | ); |
61 | #ifdef TOKUDB_DEBUG |
62 | bool is_owned_by_me(void) const; |
63 | #endif |
64 | private: |
65 | static pthread_t _null_owner; |
66 | mysql_mutex_t _mutex; |
67 | #ifdef TOKUDB_DEBUG |
68 | uint _owners; |
69 | pthread_t _owner; |
70 | #endif |
71 | }; |
72 | |
73 | // Simple read write lock |
74 | class rwlock_t { |
75 | public: |
76 | explicit rwlock_t(pfs_key_t key); |
77 | rwlock_t(void) : rwlock_t(pfs_not_instrumented) {} |
78 | ~rwlock_t(void); |
79 | |
80 | void lock_read( |
81 | #ifdef HAVE_PSI_RWLOCK_INTERFACE |
82 | const char* src_file, |
83 | uint src_line |
84 | #endif // HAVE_PSI_RWLOCK_INTERFACE |
85 | ); |
86 | void lock_write( |
87 | #ifdef HAVE_PSI_RWLOCK_INTERFACE |
88 | const char* src_file, |
89 | uint src_line |
90 | #endif // HAVE_PSI_RWLOCK_INTERFACE |
91 | ); |
92 | void unlock(void); |
93 | |
94 | private: |
95 | rwlock_t(const rwlock_t&); |
96 | rwlock_t& operator=(const rwlock_t&); |
97 | |
98 | mysql_rwlock_t _rwlock; |
99 | }; |
100 | |
101 | // Simple event signal/wait class |
102 | class event_t { |
103 | public: |
104 | // create_signalled - create the event in a signalled state |
105 | // manual_reset - create an event that must be manually reset |
106 | // after signaling |
107 | event_t( |
108 | bool create_signalled = false, |
109 | bool manual_reset = false); |
110 | ~event_t(void); |
111 | |
112 | // wait for the event to become signalled |
113 | void wait(void); |
114 | int wait(ulonglong microseconds); |
115 | |
116 | // signal the event |
117 | void signal(void); |
118 | |
119 | // pulse the event (signal and free exactly one waiter) |
120 | void pulse(void); |
121 | |
122 | // is the event currently signalled |
123 | bool signalled(void); |
124 | |
125 | // unsignal/clear the event |
126 | void reset(void); |
127 | |
128 | private: |
129 | event_t(const event_t&); |
130 | event_t& operator=(const event_t&); |
131 | |
132 | pthread_mutex_t _mutex; |
133 | pthread_cond_t _cond; |
134 | bool _signalled; |
135 | bool _pulsed; |
136 | bool _manual_reset; |
137 | }; |
138 | |
139 | // Semaphore signal/wait class |
140 | class semaphore_t { |
141 | public: |
142 | // initial_count - the initial signal count of the semaphore |
143 | // max_count - the maximum signal count for the semaphore. |
144 | semaphore_t(int initial_count, int max_count); |
145 | ~semaphore_t(void); |
146 | |
147 | enum E_WAIT { |
148 | E_SIGNALLED = 0, |
149 | E_INTERRUPTED = 1, |
150 | E_TIMEDOUT = 2 |
151 | }; |
152 | |
153 | // wait for the semaphore to become signalled |
154 | E_WAIT wait(void); |
155 | E_WAIT wait(ulonglong microseconds); |
156 | |
157 | // signal the semaphore to increase the count |
158 | // return true if signalled, false if ignored due to count |
159 | bool signal(void); |
160 | |
161 | // what is the semaphore signal count |
162 | int signalled(void); |
163 | |
164 | // unsignal a signalled semaphore |
165 | void reset(void); |
166 | |
167 | // set to interrupt any waiters, as long as is set, |
168 | // waiters will return immediately with E_INTERRUPTED. |
169 | // the semaphore signal count and tracking will continue |
170 | // accepting signals and leave the signalled state intact |
171 | void set_interrupt(void); |
172 | void clear_interrupt(void); |
173 | |
174 | private: |
175 | semaphore_t(const semaphore_t&); |
176 | semaphore_t& operator=(const semaphore_t&); |
177 | |
178 | pthread_mutex_t _mutex; |
179 | pthread_cond_t _cond; |
180 | bool _interrupted; |
181 | int _signalled; |
182 | int _initial_count; |
183 | int _max_count; |
184 | }; |
185 | |
186 | // Thread class |
187 | class thread_t { |
188 | public: |
189 | thread_t(void); |
190 | ~thread_t(void); |
191 | |
192 | int start(void* (*pfn)(void*), void* arg); |
193 | int join(void** value_ptr); |
194 | int detach(void); |
195 | |
196 | private: |
197 | pthread_t _thread; |
198 | }; |
199 | |
200 | inline uint my_tid(void) { return (uint)toku_os_gettid(); } |
201 | |
202 | inline mutex_t::mutex_t(pfs_key_t key) { |
203 | #ifdef TOKUDB_DEBUG |
204 | _owners = 0; |
205 | _owner = _null_owner; |
206 | #endif |
207 | int r MY_ATTRIBUTE((unused)) = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST); |
208 | assert_debug(r == 0); |
209 | } |
210 | inline mutex_t::~mutex_t() { |
211 | #ifdef TOKUDB_DEBUG |
212 | assert_debug(_owners == 0); |
213 | #endif |
214 | int r MY_ATTRIBUTE((unused)) = mysql_mutex_destroy(&_mutex); |
215 | assert_debug(r == 0); |
216 | } |
217 | inline void mutex_t::reinit(pfs_key_t key) { |
218 | #ifdef TOKUDB_DEBUG |
219 | assert_debug(_owners == 0); |
220 | #endif |
221 | int r MY_ATTRIBUTE((unused)); |
222 | r = mysql_mutex_destroy(&_mutex); |
223 | assert_debug(r == 0); |
224 | r = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST); |
225 | assert_debug(r == 0); |
226 | } |
227 | inline void mutex_t::lock( |
228 | #ifdef HAVE_PSI_MUTEX_INTERFACE |
229 | const char* src_file, |
230 | uint src_line |
231 | #endif // HAVE_PSI_MUTEX_INTERFACE |
232 | ) { |
233 | assert_debug(is_owned_by_me() == false); |
234 | int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_lock(&_mutex |
235 | #ifdef HAVE_PSI_MUTEX_INTERFACE |
236 | , |
237 | src_file, |
238 | src_line |
239 | #endif // HAVE_PSI_MUTEX_INTERFACE |
240 | ); |
241 | assert_debug(r == 0); |
242 | #ifdef TOKUDB_DEBUG |
243 | _owners++; |
244 | _owner = pthread_self(); |
245 | #endif |
246 | } |
247 | inline void mutex_t::unlock( |
248 | #ifdef HAVE_PSI_MUTEX_INTERFACE |
249 | const char* src_file, |
250 | uint src_line |
251 | #endif // HAVE_PSI_MUTEX_INTERFACE |
252 | ) { |
253 | #ifndef SAFE_MUTEX |
254 | (void)(src_file); |
255 | (void)(src_line); |
256 | #endif // SAFE_MUTEX |
257 | #ifdef TOKUDB_DEBUG |
258 | assert_debug(_owners > 0); |
259 | assert_debug(is_owned_by_me()); |
260 | _owners--; |
261 | _owner = _null_owner; |
262 | #endif |
263 | int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_unlock(&_mutex |
264 | #ifdef SAFE_MUTEX |
265 | , |
266 | src_file, |
267 | src_line |
268 | #endif // SAFE_MUTEX |
269 | ); |
270 | assert_debug(r == 0); |
271 | } |
272 | #ifdef TOKUDB_DEBUG |
273 | inline bool mutex_t::is_owned_by_me(void) const { |
274 | return pthread_equal(pthread_self(), _owner) != 0 ? true : false; |
275 | } |
276 | #endif |
277 | |
278 | inline rwlock_t::rwlock_t(pfs_key_t key) { |
279 | int r MY_ATTRIBUTE((unused)) = mysql_rwlock_init(key, &_rwlock); |
280 | assert_debug(r == 0); |
281 | } |
282 | inline rwlock_t::~rwlock_t() { |
283 | int r MY_ATTRIBUTE((unused)) = mysql_rwlock_destroy(&_rwlock); |
284 | assert_debug(r == 0); |
285 | } |
286 | inline void rwlock_t::lock_read( |
287 | #ifdef HAVE_PSI_RWLOCK_INTERFACE |
288 | const char* src_file, |
289 | uint src_line |
290 | #endif // HAVE_PSI_RWLOCK_INTERFACE |
291 | ) { |
292 | int r; |
293 | while ((r = inline_mysql_rwlock_rdlock(&_rwlock |
294 | #ifdef HAVE_PSI_RWLOCK_INTERFACE |
295 | , |
296 | src_file, |
297 | src_line |
298 | #endif // HAVE_PSI_RWLOCK_INTERFACE |
299 | )) != 0) { |
300 | if (r == EBUSY || r == EAGAIN) { |
301 | time::sleep_microsec(1000); |
302 | continue; |
303 | } |
304 | break; |
305 | } |
306 | assert_debug(r == 0); |
307 | } |
308 | inline void rwlock_t::lock_write( |
309 | #ifdef HAVE_PSI_RWLOCK_INTERFACE |
310 | const char* src_file, |
311 | uint src_line |
312 | #endif // HAVE_PSI_RWLOCK_INTERFACE |
313 | ) { |
314 | int r; |
315 | while ((r = inline_mysql_rwlock_wrlock(&_rwlock |
316 | #ifdef HAVE_PSI_RWLOCK_INTERFACE |
317 | , |
318 | src_file, |
319 | src_line |
320 | #endif // HAVE_PSI_RWLOCK_INTERFACE |
321 | )) != 0) { |
322 | if (r == EBUSY || r == EAGAIN) { |
323 | time::sleep_microsec(1000); |
324 | continue; |
325 | } |
326 | break; |
327 | } |
328 | assert_debug(r == 0); |
329 | } |
330 | inline void rwlock_t::unlock(void) { |
331 | int r MY_ATTRIBUTE((unused)) = mysql_rwlock_unlock(&_rwlock); |
332 | assert_debug(r == 0); |
333 | } |
334 | inline rwlock_t::rwlock_t(const rwlock_t&) {} |
335 | inline rwlock_t& rwlock_t::operator=(const rwlock_t&) { |
336 | return *this; |
337 | } |
338 | |
339 | |
340 | inline event_t::event_t(bool create_signalled, bool manual_reset) : |
341 | _manual_reset(manual_reset) { |
342 | |
343 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL); |
344 | assert_debug(r == 0); |
345 | r = pthread_cond_init(&_cond, NULL); |
346 | assert_debug(r == 0); |
347 | if (create_signalled) { |
348 | _signalled = true; |
349 | } else { |
350 | _signalled = false; |
351 | } |
352 | _pulsed = false; |
353 | } |
354 | inline event_t::~event_t(void) { |
355 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex); |
356 | assert_debug(r == 0); |
357 | r = pthread_cond_destroy(&_cond); |
358 | assert_debug(r == 0); |
359 | } |
360 | inline void event_t::wait(void) { |
361 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
362 | assert_debug(r == 0); |
363 | while (_signalled == false && _pulsed == false) { |
364 | r = pthread_cond_wait(&_cond, &_mutex); |
365 | assert_debug(r == 0); |
366 | } |
367 | if (_manual_reset == false) |
368 | _signalled = false; |
369 | if (_pulsed) |
370 | _pulsed = false; |
371 | r = pthread_mutex_unlock(&_mutex); |
372 | assert_debug(r == 0); |
373 | return; |
374 | } |
375 | inline int event_t::wait(ulonglong microseconds) { |
376 | timespec waittime = time::offset_timespec(microseconds); |
377 | int r = pthread_mutex_timedlock(&_mutex, &waittime); |
378 | if (r == ETIMEDOUT) return ETIMEDOUT; |
379 | assert_debug(r == 0); |
380 | while (_signalled == false && _pulsed == false) { |
381 | r = pthread_cond_timedwait(&_cond, &_mutex, &waittime); |
382 | if (r == ETIMEDOUT) { |
383 | r = pthread_mutex_unlock(&_mutex); |
384 | assert_debug(r == 0); |
385 | return ETIMEDOUT; |
386 | } |
387 | assert_debug(r == 0); |
388 | } |
389 | if (_manual_reset == false) |
390 | _signalled = false; |
391 | if (_pulsed) |
392 | _pulsed = false; |
393 | r = pthread_mutex_unlock(&_mutex); |
394 | assert_debug(r == 0); |
395 | return 0; |
396 | } |
397 | inline void event_t::signal(void) { |
398 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
399 | assert_debug(r == 0); |
400 | _signalled = true; |
401 | if (_manual_reset) { |
402 | r = pthread_cond_broadcast(&_cond); |
403 | assert_debug(r == 0); |
404 | } else { |
405 | r = pthread_cond_signal(&_cond); |
406 | assert_debug(r == 0); |
407 | } |
408 | r = pthread_mutex_unlock(&_mutex); |
409 | assert_debug(r == 0); |
410 | } |
411 | inline void event_t::pulse(void) { |
412 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
413 | assert_debug(r == 0); |
414 | _pulsed = true; |
415 | r = pthread_cond_signal(&_cond); |
416 | assert_debug(r == 0); |
417 | r = pthread_mutex_unlock(&_mutex); |
418 | assert_debug(r == 0); |
419 | } |
420 | inline bool event_t::signalled(void) { |
421 | bool ret = false; |
422 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
423 | assert_debug(r == 0); |
424 | ret = _signalled; |
425 | r = pthread_mutex_unlock(&_mutex); |
426 | assert_debug(r == 0); |
427 | return ret; |
428 | } |
429 | inline void event_t::reset(void) { |
430 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
431 | assert_debug(r == 0); |
432 | _signalled = false; |
433 | _pulsed = false; |
434 | r = pthread_mutex_unlock(&_mutex); |
435 | assert_debug(r == 0); |
436 | return; |
437 | } |
438 | inline event_t::event_t(const event_t&) { |
439 | } |
440 | inline event_t& event_t::operator=(const event_t&) { |
441 | return *this; |
442 | } |
443 | |
444 | |
445 | inline semaphore_t::semaphore_t( |
446 | int initial_count, |
447 | int max_count) : |
448 | _interrupted(false), |
449 | _initial_count(initial_count), |
450 | _max_count(max_count) { |
451 | |
452 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL); |
453 | assert_debug(r == 0); |
454 | r = pthread_cond_init(&_cond, NULL); |
455 | assert_debug(r == 0); |
456 | _signalled = _initial_count; |
457 | } |
458 | inline semaphore_t::~semaphore_t(void) { |
459 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex); |
460 | assert_debug(r == 0); |
461 | r = pthread_cond_destroy(&_cond); |
462 | assert_debug(r == 0); |
463 | } |
464 | inline semaphore_t::E_WAIT semaphore_t::wait(void) { |
465 | E_WAIT ret; |
466 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
467 | assert_debug(r == 0); |
468 | while (_signalled == 0 && _interrupted == false) { |
469 | r = pthread_cond_wait(&_cond, &_mutex); |
470 | assert_debug(r == 0); |
471 | } |
472 | if (_interrupted) { |
473 | ret = E_INTERRUPTED; |
474 | } else { |
475 | _signalled--; |
476 | ret = E_SIGNALLED; |
477 | } |
478 | r = pthread_mutex_unlock(&_mutex); |
479 | assert_debug(r == 0); |
480 | return ret; |
481 | } |
482 | inline semaphore_t::E_WAIT semaphore_t::wait(ulonglong microseconds) { |
483 | E_WAIT ret; |
484 | timespec waittime = time::offset_timespec(microseconds); |
485 | int r = pthread_mutex_timedlock(&_mutex, &waittime); |
486 | if (r == ETIMEDOUT) return E_TIMEDOUT; |
487 | assert_debug(r == 0); |
488 | while (_signalled == 0 && _interrupted == false) { |
489 | r = pthread_cond_timedwait(&_cond, &_mutex, &waittime); |
490 | if (r == ETIMEDOUT) { |
491 | r = pthread_mutex_unlock(&_mutex); |
492 | assert_debug(r == 0); |
493 | return E_TIMEDOUT; |
494 | } |
495 | assert_debug(r == 0); |
496 | } |
497 | if (_interrupted) { |
498 | ret = E_INTERRUPTED; |
499 | } else { |
500 | _signalled--; |
501 | ret = E_SIGNALLED; |
502 | } |
503 | r = pthread_mutex_unlock(&_mutex); |
504 | assert_debug(r == 0); |
505 | return ret; |
506 | } |
507 | inline bool semaphore_t::signal(void) { |
508 | bool ret = false; |
509 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
510 | assert_debug(r == 0); |
511 | if (_signalled < _max_count) { |
512 | _signalled++; |
513 | ret = true; |
514 | } |
515 | r = pthread_cond_signal(&_cond); |
516 | assert_debug(r == 0); |
517 | r = pthread_mutex_unlock(&_mutex); |
518 | assert_debug(r == 0); |
519 | return ret; |
520 | } |
521 | inline int semaphore_t::signalled(void) { |
522 | int ret = 0; |
523 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
524 | assert_debug(r == 0); |
525 | ret = _signalled; |
526 | r = pthread_mutex_unlock(&_mutex); |
527 | assert_debug(r == 0); |
528 | return ret; |
529 | } |
530 | inline void semaphore_t::reset(void) { |
531 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
532 | assert_debug(r == 0); |
533 | _signalled = 0; |
534 | r = pthread_mutex_unlock(&_mutex); |
535 | assert_debug(r == 0); |
536 | return; |
537 | } |
538 | inline void semaphore_t::set_interrupt(void) { |
539 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
540 | assert_debug(r == 0); |
541 | _interrupted = true; |
542 | r = pthread_cond_broadcast(&_cond); |
543 | assert_debug(r == 0); |
544 | r = pthread_mutex_unlock(&_mutex); |
545 | assert_debug(r == 0); |
546 | } |
547 | inline void semaphore_t::clear_interrupt(void) { |
548 | int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); |
549 | assert_debug(r == 0); |
550 | _interrupted = false; |
551 | r = pthread_mutex_unlock(&_mutex); |
552 | assert_debug(r == 0); |
553 | } |
554 | inline semaphore_t::semaphore_t(const semaphore_t&) { |
555 | } |
556 | inline semaphore_t& semaphore_t::operator=(const semaphore_t&) { |
557 | return *this; |
558 | } |
559 | |
560 | |
561 | inline thread_t::thread_t(void) : _thread(0) { |
562 | } |
563 | inline thread_t::~thread_t(void) { |
564 | } |
565 | inline int thread_t::start(void*(*pfn)(void*), void* arg) { |
566 | return pthread_create(&_thread, NULL, pfn, arg); |
567 | } |
568 | inline int thread_t::join(void** value_ptr) { |
569 | return pthread_join(_thread, value_ptr); |
570 | } |
571 | inline int thread_t::detach(void) { |
572 | return pthread_detach(_thread); |
573 | } |
574 | |
575 | } // namespace thread |
576 | } // namespace tokudb |
577 | |
578 | |
579 | #endif // _TOKUDB_SYNC_H |
580 | |