1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__ |
31 | #define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__ |
32 | |
33 | #include "err.hpp" |
34 | #include "mutex.hpp" |
35 | |
//  Condition variable class encapsulates OS condition variable in a platform-independent way.
37 | |
38 | #if defined(ZMQ_USE_CV_IMPL_NONE) |
39 | |
40 | namespace zmq |
41 | { |
42 | class condition_variable_t |
43 | { |
44 | public: |
45 | inline condition_variable_t () { zmq_assert (false); } |
46 | |
47 | inline int wait (mutex_t *mutex_, int timeout_) |
48 | { |
49 | zmq_assert (false); |
50 | return -1; |
51 | } |
52 | |
53 | inline void broadcast () { zmq_assert (false); } |
54 | |
55 | ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) |
56 | }; |
57 | } |
58 | |
59 | #elif defined(ZMQ_USE_CV_IMPL_WIN32API) |
60 | |
61 | #include "windows.hpp" |
62 | |
63 | namespace zmq |
64 | { |
65 | class condition_variable_t |
66 | { |
67 | public: |
68 | inline condition_variable_t () { InitializeConditionVariable (&_cv); } |
69 | |
70 | inline int wait (mutex_t *mutex_, int timeout_) |
71 | { |
72 | int rc = SleepConditionVariableCS (&_cv, mutex_->get_cs (), timeout_); |
73 | |
74 | if (rc != 0) |
75 | return 0; |
76 | |
77 | rc = GetLastError (); |
78 | |
79 | if (rc != ERROR_TIMEOUT) |
80 | win_assert (rc); |
81 | |
82 | errno = EAGAIN; |
83 | return -1; |
84 | } |
85 | |
86 | inline void broadcast () { WakeAllConditionVariable (&_cv); } |
87 | |
88 | private: |
89 | CONDITION_VARIABLE _cv; |
90 | |
91 | ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) |
92 | }; |
93 | } |
94 | |
95 | #elif defined(ZMQ_USE_CV_IMPL_STL11) |
96 | |
97 | #include <condition_variable> |
98 | |
99 | namespace zmq |
100 | { |
101 | class condition_variable_t |
102 | { |
103 | public: |
104 | inline condition_variable_t () ZMQ_DEFAULT; |
105 | |
106 | inline int wait (mutex_t *mutex_, int timeout_) |
107 | { |
108 | // this assumes that the mutex mutex_ has been locked by the caller |
109 | int res = 0; |
110 | if (timeout_ == -1) { |
111 | _cv.wait ( |
112 | *mutex_); // unlock mtx and wait cv.notify_all(), lock mtx after cv.notify_all() |
113 | } else if (_cv.wait_for (*mutex_, std::chrono::milliseconds (timeout_)) |
114 | == std::cv_status::timeout) { |
115 | // time expired |
116 | errno = EAGAIN; |
117 | res = -1; |
118 | } |
119 | return res; |
120 | } |
121 | |
122 | inline void broadcast () |
123 | { |
124 | // this assumes that the mutex associated with _cv has been locked by the caller |
125 | _cv.notify_all (); |
126 | } |
127 | |
128 | private: |
129 | std::condition_variable_any _cv; |
130 | |
131 | ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) |
132 | }; |
133 | } |
134 | |
135 | #elif defined(ZMQ_USE_CV_IMPL_VXWORKS) |
136 | |
137 | #include <sysLib.h> |
138 | |
139 | namespace zmq |
140 | { |
141 | class condition_variable_t |
142 | { |
143 | public: |
144 | inline condition_variable_t () ZMQ_DEFAULT; |
145 | |
146 | inline ~condition_variable_t () |
147 | { |
148 | scoped_lock_t l (_listenersMutex); |
149 | for (size_t i = 0; i < _listeners.size (); i++) { |
150 | semDelete (_listeners[i]); |
151 | } |
152 | } |
153 | |
154 | inline int wait (mutex_t *mutex_, int timeout_) |
155 | { |
156 | //Atomically releases lock, blocks the current executing thread, |
157 | //and adds it to the list of threads waiting on *this. The thread |
158 | //will be unblocked when broadcast() is executed. |
159 | //It may also be unblocked spuriously. When unblocked, regardless |
160 | //of the reason, lock is reacquired and wait exits. |
161 | |
162 | SEM_ID sem = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY); |
163 | { |
164 | scoped_lock_t l (_listenersMutex); |
165 | _listeners.push_back (sem); |
166 | } |
167 | mutex_->unlock (); |
168 | |
169 | int rc; |
170 | if (timeout_ < 0) |
171 | rc = semTake (sem, WAIT_FOREVER); |
172 | else { |
173 | int ticksPerSec = sysClkRateGet (); |
174 | int timeoutTicks = (timeout_ * ticksPerSec) / 1000 + 1; |
175 | rc = semTake (sem, timeoutTicks); |
176 | } |
177 | |
178 | { |
179 | scoped_lock_t l (_listenersMutex); |
180 | // remove sem from listeners |
181 | for (size_t i = 0; i < _listeners.size (); i++) { |
182 | if (_listeners[i] == sem) { |
183 | _listeners.erase (_listeners.begin () + i); |
184 | break; |
185 | } |
186 | } |
187 | semDelete (sem); |
188 | } |
189 | mutex_->lock (); |
190 | |
191 | if (rc == 0) |
192 | return 0; |
193 | |
194 | if (rc == S_objLib_OBJ_TIMEOUT) { |
195 | errno = EAGAIN; |
196 | return -1; |
197 | } |
198 | |
199 | return -1; |
200 | } |
201 | |
202 | inline void broadcast () |
203 | { |
204 | scoped_lock_t l (_listenersMutex); |
205 | for (size_t i = 0; i < _listeners.size (); i++) { |
206 | semGive (_listeners[i]); |
207 | } |
208 | } |
209 | |
210 | private: |
211 | mutex_t _listenersMutex; |
212 | std::vector<SEM_ID> _listeners; |
213 | |
214 | ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) |
215 | }; |
216 | } |
217 | |
218 | #elif defined(ZMQ_USE_CV_IMPL_PTHREADS) |
219 | |
220 | #include <pthread.h> |
221 | |
222 | #if defined(__ANDROID_API__) && __ANDROID_API__ < 21 |
223 | #define ANDROID_LEGACY |
224 | extern "C" int pthread_cond_timedwait_monotonic_np (pthread_cond_t *, |
225 | pthread_mutex_t *, |
226 | const struct timespec *); |
227 | #endif |
228 | |
229 | namespace zmq |
230 | { |
231 | class condition_variable_t |
232 | { |
233 | public: |
234 | inline condition_variable_t () |
235 | { |
236 | pthread_condattr_t attr; |
237 | pthread_condattr_init (&attr); |
238 | #if !defined(ZMQ_HAVE_OSX) && !defined(ANDROID_LEGACY) |
239 | pthread_condattr_setclock (&attr, CLOCK_MONOTONIC); |
240 | #endif |
241 | int rc = pthread_cond_init (&_cond, &attr); |
242 | posix_assert (rc); |
243 | } |
244 | |
245 | inline ~condition_variable_t () |
246 | { |
247 | int rc = pthread_cond_destroy (&_cond); |
248 | posix_assert (rc); |
249 | } |
250 | |
251 | inline int wait (mutex_t *mutex_, int timeout_) |
252 | { |
253 | int rc; |
254 | |
255 | if (timeout_ != -1) { |
256 | struct timespec timeout; |
257 | |
258 | #ifdef ZMQ_HAVE_OSX |
259 | timeout.tv_sec = 0; |
260 | timeout.tv_nsec = 0; |
261 | #else |
262 | clock_gettime (CLOCK_MONOTONIC, &timeout); |
263 | #endif |
264 | |
265 | timeout.tv_sec += timeout_ / 1000; |
266 | timeout.tv_nsec += (timeout_ % 1000) * 1000000; |
267 | |
268 | if (timeout.tv_nsec > 1000000000) { |
269 | timeout.tv_sec++; |
270 | timeout.tv_nsec -= 1000000000; |
271 | } |
272 | #ifdef ZMQ_HAVE_OSX |
273 | rc = pthread_cond_timedwait_relative_np ( |
274 | &_cond, mutex_->get_mutex (), &timeout); |
275 | #elif defined(ANDROID_LEGACY) |
276 | rc = pthread_cond_timedwait_monotonic_np ( |
277 | &_cond, mutex_->get_mutex (), &timeout); |
278 | #else |
279 | rc = |
280 | pthread_cond_timedwait (&_cond, mutex_->get_mutex (), &timeout); |
281 | #endif |
282 | } else |
283 | rc = pthread_cond_wait (&_cond, mutex_->get_mutex ()); |
284 | |
285 | if (rc == 0) |
286 | return 0; |
287 | |
288 | if (rc == ETIMEDOUT) { |
289 | errno = EAGAIN; |
290 | return -1; |
291 | } |
292 | |
293 | posix_assert (rc); |
294 | return -1; |
295 | } |
296 | |
297 | inline void broadcast () |
298 | { |
299 | int rc = pthread_cond_broadcast (&_cond); |
300 | posix_assert (rc); |
301 | } |
302 | |
303 | private: |
304 | pthread_cond_t _cond; |
305 | |
306 | ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) |
307 | }; |
308 | } |
309 | |
310 | #endif |
311 | |
312 | #endif |
313 | |