1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
31#define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
32
33#include "err.hpp"
34#include "mutex.hpp"
35
// Condition variable class encapsulates the OS condition variable in a platform-independent way.
37
38#if defined(ZMQ_USE_CV_IMPL_NONE)
39
40namespace zmq
41{
42class condition_variable_t
43{
44 public:
45 inline condition_variable_t () { zmq_assert (false); }
46
47 inline int wait (mutex_t *mutex_, int timeout_)
48 {
49 zmq_assert (false);
50 return -1;
51 }
52
53 inline void broadcast () { zmq_assert (false); }
54
55 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
56};
57}
58
59#elif defined(ZMQ_USE_CV_IMPL_WIN32API)
60
61#include "windows.hpp"
62
63namespace zmq
64{
65class condition_variable_t
66{
67 public:
68 inline condition_variable_t () { InitializeConditionVariable (&_cv); }
69
70 inline int wait (mutex_t *mutex_, int timeout_)
71 {
72 int rc = SleepConditionVariableCS (&_cv, mutex_->get_cs (), timeout_);
73
74 if (rc != 0)
75 return 0;
76
77 rc = GetLastError ();
78
79 if (rc != ERROR_TIMEOUT)
80 win_assert (rc);
81
82 errno = EAGAIN;
83 return -1;
84 }
85
86 inline void broadcast () { WakeAllConditionVariable (&_cv); }
87
88 private:
89 CONDITION_VARIABLE _cv;
90
91 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
92};
93}
94
95#elif defined(ZMQ_USE_CV_IMPL_STL11)
96
97#include <condition_variable>
98
99namespace zmq
100{
101class condition_variable_t
102{
103 public:
104 inline condition_variable_t () ZMQ_DEFAULT;
105
106 inline int wait (mutex_t *mutex_, int timeout_)
107 {
108 // this assumes that the mutex mutex_ has been locked by the caller
109 int res = 0;
110 if (timeout_ == -1) {
111 _cv.wait (
112 *mutex_); // unlock mtx and wait cv.notify_all(), lock mtx after cv.notify_all()
113 } else if (_cv.wait_for (*mutex_, std::chrono::milliseconds (timeout_))
114 == std::cv_status::timeout) {
115 // time expired
116 errno = EAGAIN;
117 res = -1;
118 }
119 return res;
120 }
121
122 inline void broadcast ()
123 {
124 // this assumes that the mutex associated with _cv has been locked by the caller
125 _cv.notify_all ();
126 }
127
128 private:
129 std::condition_variable_any _cv;
130
131 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
132};
133}
134
135#elif defined(ZMQ_USE_CV_IMPL_VXWORKS)
136
137#include <sysLib.h>
138
139namespace zmq
140{
141class condition_variable_t
142{
143 public:
144 inline condition_variable_t () ZMQ_DEFAULT;
145
146 inline ~condition_variable_t ()
147 {
148 scoped_lock_t l (_listenersMutex);
149 for (size_t i = 0; i < _listeners.size (); i++) {
150 semDelete (_listeners[i]);
151 }
152 }
153
154 inline int wait (mutex_t *mutex_, int timeout_)
155 {
156 //Atomically releases lock, blocks the current executing thread,
157 //and adds it to the list of threads waiting on *this. The thread
158 //will be unblocked when broadcast() is executed.
159 //It may also be unblocked spuriously. When unblocked, regardless
160 //of the reason, lock is reacquired and wait exits.
161
162 SEM_ID sem = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY);
163 {
164 scoped_lock_t l (_listenersMutex);
165 _listeners.push_back (sem);
166 }
167 mutex_->unlock ();
168
169 int rc;
170 if (timeout_ < 0)
171 rc = semTake (sem, WAIT_FOREVER);
172 else {
173 int ticksPerSec = sysClkRateGet ();
174 int timeoutTicks = (timeout_ * ticksPerSec) / 1000 + 1;
175 rc = semTake (sem, timeoutTicks);
176 }
177
178 {
179 scoped_lock_t l (_listenersMutex);
180 // remove sem from listeners
181 for (size_t i = 0; i < _listeners.size (); i++) {
182 if (_listeners[i] == sem) {
183 _listeners.erase (_listeners.begin () + i);
184 break;
185 }
186 }
187 semDelete (sem);
188 }
189 mutex_->lock ();
190
191 if (rc == 0)
192 return 0;
193
194 if (rc == S_objLib_OBJ_TIMEOUT) {
195 errno = EAGAIN;
196 return -1;
197 }
198
199 return -1;
200 }
201
202 inline void broadcast ()
203 {
204 scoped_lock_t l (_listenersMutex);
205 for (size_t i = 0; i < _listeners.size (); i++) {
206 semGive (_listeners[i]);
207 }
208 }
209
210 private:
211 mutex_t _listenersMutex;
212 std::vector<SEM_ID> _listeners;
213
214 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
215};
216}
217
218#elif defined(ZMQ_USE_CV_IMPL_PTHREADS)
219
220#include <pthread.h>
221
222#if defined(__ANDROID_API__) && __ANDROID_API__ < 21
223#define ANDROID_LEGACY
224extern "C" int pthread_cond_timedwait_monotonic_np (pthread_cond_t *,
225 pthread_mutex_t *,
226 const struct timespec *);
227#endif
228
229namespace zmq
230{
231class condition_variable_t
232{
233 public:
234 inline condition_variable_t ()
235 {
236 pthread_condattr_t attr;
237 pthread_condattr_init (&attr);
238#if !defined(ZMQ_HAVE_OSX) && !defined(ANDROID_LEGACY)
239 pthread_condattr_setclock (&attr, CLOCK_MONOTONIC);
240#endif
241 int rc = pthread_cond_init (&_cond, &attr);
242 posix_assert (rc);
243 }
244
245 inline ~condition_variable_t ()
246 {
247 int rc = pthread_cond_destroy (&_cond);
248 posix_assert (rc);
249 }
250
251 inline int wait (mutex_t *mutex_, int timeout_)
252 {
253 int rc;
254
255 if (timeout_ != -1) {
256 struct timespec timeout;
257
258#ifdef ZMQ_HAVE_OSX
259 timeout.tv_sec = 0;
260 timeout.tv_nsec = 0;
261#else
262 clock_gettime (CLOCK_MONOTONIC, &timeout);
263#endif
264
265 timeout.tv_sec += timeout_ / 1000;
266 timeout.tv_nsec += (timeout_ % 1000) * 1000000;
267
268 if (timeout.tv_nsec > 1000000000) {
269 timeout.tv_sec++;
270 timeout.tv_nsec -= 1000000000;
271 }
272#ifdef ZMQ_HAVE_OSX
273 rc = pthread_cond_timedwait_relative_np (
274 &_cond, mutex_->get_mutex (), &timeout);
275#elif defined(ANDROID_LEGACY)
276 rc = pthread_cond_timedwait_monotonic_np (
277 &_cond, mutex_->get_mutex (), &timeout);
278#else
279 rc =
280 pthread_cond_timedwait (&_cond, mutex_->get_mutex (), &timeout);
281#endif
282 } else
283 rc = pthread_cond_wait (&_cond, mutex_->get_mutex ());
284
285 if (rc == 0)
286 return 0;
287
288 if (rc == ETIMEDOUT) {
289 errno = EAGAIN;
290 return -1;
291 }
292
293 posix_assert (rc);
294 return -1;
295 }
296
297 inline void broadcast ()
298 {
299 int rc = pthread_cond_broadcast (&_cond);
300 posix_assert (rc);
301 }
302
303 private:
304 pthread_cond_t _cond;
305
306 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
307};
308}
309
310#endif
311
312#endif
313