1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "poller.hpp"
32#include "polling_util.hpp"
33
34#if defined ZMQ_POLL_BASED_ON_POLL
35#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
36#include <poll.h>
37#endif
38#elif defined ZMQ_POLL_BASED_ON_SELECT
39#if defined ZMQ_HAVE_WINDOWS
40#elif defined ZMQ_HAVE_HPUX
41#include <sys/param.h>
42#include <sys/types.h>
43#include <sys/time.h>
44#elif defined ZMQ_HAVE_OPENVMS
45#include <sys/types.h>
46#include <sys/time.h>
47#elif defined ZMQ_HAVE_VXWORKS
48#include <sys/types.h>
49#include <sys/time.h>
50#include <sockLib.h>
51#include <strings.h>
52#else
53#include <sys/select.h>
54#endif
55#endif
56
57#include "signaler.hpp"
58#include "likely.hpp"
59#include "stdint.hpp"
60#include "config.hpp"
61#include "err.hpp"
62#include "fd.hpp"
63#include "ip.hpp"
64#include "tcp.hpp"
65
66#if !defined ZMQ_HAVE_WINDOWS
67#include <unistd.h>
68#include <netinet/tcp.h>
69#include <sys/types.h>
70#include <sys/socket.h>
71#endif
72
73#if !defined(ZMQ_HAVE_WINDOWS)
74// Helper to sleep for specific number of milliseconds (or until signal)
75//
76static int sleep_ms (unsigned int ms_)
77{
78 if (ms_ == 0)
79 return 0;
80#if defined ZMQ_HAVE_ANDROID
81 usleep (ms_ * 1000);
82 return 0;
83#elif defined ZMQ_HAVE_VXWORKS
84 struct timespec ns_;
85 ns_.tv_sec = ms_ / 1000;
86 ns_.tv_nsec = ms_ % 1000 * 1000000;
87 return nanosleep (&ns_, 0);
88#else
89 return usleep (ms_ * 1000);
90#endif
91}
92
93// Helper to wait on close(), for non-blocking sockets, until it completes
94// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
95// the overall timeout is reached.
96//
97static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
98{
99 unsigned int ms_so_far = 0;
100 const unsigned int min_step_ms = 1;
101 const unsigned int max_step_ms = 100;
102 const unsigned int step_ms =
103 std::min (std::max (min_step_ms, max_ms_ / 10), max_step_ms);
104
105 int rc = 0; // do not sleep on first attempt
106 do {
107 if (rc == -1 && errno == EAGAIN) {
108 sleep_ms (step_ms);
109 ms_so_far += step_ms;
110 }
111 rc = close (fd_);
112 } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
113
114 return rc;
115}
116#endif
117
118zmq::signaler_t::signaler_t ()
119{
120 // Create the socketpair for signaling.
121 if (make_fdpair (&_r, &_w) == 0) {
122 unblock_socket (_w);
123 unblock_socket (_r);
124 }
125#ifdef HAVE_FORK
126 pid = getpid ();
127#endif
128}
129
130// This might get run after some part of construction failed, leaving one or
131// both of _r and _w retired_fd.
132zmq::signaler_t::~signaler_t ()
133{
134#if defined ZMQ_HAVE_EVENTFD
135 if (_r == retired_fd)
136 return;
137 int rc = close_wait_ms (_r);
138 errno_assert (rc == 0);
139#elif defined ZMQ_HAVE_WINDOWS
140 if (_w != retired_fd) {
141 const struct linger so_linger = {1, 0};
142 int rc = setsockopt (_w, SOL_SOCKET, SO_LINGER,
143 reinterpret_cast<const char *> (&so_linger),
144 sizeof so_linger);
145 // Only check shutdown if WSASTARTUP was previously done
146 if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
147 wsa_assert (rc != SOCKET_ERROR);
148 rc = closesocket (_w);
149 wsa_assert (rc != SOCKET_ERROR);
150 if (_r == retired_fd)
151 return;
152 rc = closesocket (_r);
153 wsa_assert (rc != SOCKET_ERROR);
154 }
155 }
156#else
157 if (_w != retired_fd) {
158 int rc = close_wait_ms (_w);
159 errno_assert (rc == 0);
160 }
161 if (_r != retired_fd) {
162 int rc = close_wait_ms (_r);
163 errno_assert (rc == 0);
164 }
165#endif
166}
167
168zmq::fd_t zmq::signaler_t::get_fd () const
169{
170 return _r;
171}
172
173void zmq::signaler_t::send ()
174{
175#if defined HAVE_FORK
176 if (unlikely (pid != getpid ())) {
177 //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
178 return; // do not send anything in forked child context
179 }
180#endif
181#if defined ZMQ_HAVE_EVENTFD
182 const uint64_t inc = 1;
183 ssize_t sz = write (_w, &inc, sizeof (inc));
184 errno_assert (sz == sizeof (inc));
185#elif defined ZMQ_HAVE_WINDOWS
186 const char dummy = 0;
187 int nbytes;
188 do {
189 nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
190 wsa_assert (nbytes != SOCKET_ERROR);
191 // wsa_assert does not abort on WSAEWOULDBLOCK. If we get this, we retry.
192 } while (nbytes == SOCKET_ERROR);
193 // Given the small size of dummy (should be 1) expect that send was able to send everything.
194 zmq_assert (nbytes == sizeof (dummy));
195#elif defined ZMQ_HAVE_VXWORKS
196 unsigned char dummy = 0;
197 while (true) {
198 ssize_t nbytes = ::send (_w, (char *) &dummy, sizeof (dummy), 0);
199 if (unlikely (nbytes == -1 && errno == EINTR))
200 continue;
201#if defined(HAVE_FORK)
202 if (unlikely (pid != getpid ())) {
203 //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
204 errno = EINTR;
205 break;
206 }
207#endif
208 zmq_assert (nbytes == sizeof dummy);
209 break;
210 }
211#else
212 unsigned char dummy = 0;
213 while (true) {
214 ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
215 if (unlikely (nbytes == -1 && errno == EINTR))
216 continue;
217#if defined(HAVE_FORK)
218 if (unlikely (pid != getpid ())) {
219 //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
220 errno = EINTR;
221 break;
222 }
223#endif
224 zmq_assert (nbytes == sizeof dummy);
225 break;
226 }
227#endif
228}
229
230int zmq::signaler_t::wait (int timeout_)
231{
232#ifdef HAVE_FORK
233 if (unlikely (pid != getpid ())) {
234 // we have forked and the file descriptor is closed. Emulate an interrupt
235 // response.
236 //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
237 errno = EINTR;
238 return -1;
239 }
240#endif
241
242#ifdef ZMQ_POLL_BASED_ON_POLL
243 struct pollfd pfd;
244 pfd.fd = _r;
245 pfd.events = POLLIN;
246 const int rc = poll (&pfd, 1, timeout_);
247 if (unlikely (rc < 0)) {
248 errno_assert (errno == EINTR);
249 return -1;
250 }
251 if (unlikely (rc == 0)) {
252 errno = EAGAIN;
253 return -1;
254 }
255#ifdef HAVE_FORK
256 if (unlikely (pid != getpid ())) {
257 // we have forked and the file descriptor is closed. Emulate an interrupt
258 // response.
259 //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
260 errno = EINTR;
261 return -1;
262 }
263#endif
264 zmq_assert (rc == 1);
265 zmq_assert (pfd.revents & POLLIN);
266 return 0;
267
268#elif defined ZMQ_POLL_BASED_ON_SELECT
269
270 optimized_fd_set_t fds (1);
271 FD_ZERO (fds.get ());
272 FD_SET (_r, fds.get ());
273 struct timeval timeout;
274 if (timeout_ >= 0) {
275 timeout.tv_sec = timeout_ / 1000;
276 timeout.tv_usec = timeout_ % 1000 * 1000;
277 }
278#ifdef ZMQ_HAVE_WINDOWS
279 int rc =
280 select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
281 wsa_assert (rc != SOCKET_ERROR);
282#else
283 int rc =
284 select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
285 if (unlikely (rc < 0)) {
286 errno_assert (errno == EINTR);
287 return -1;
288 }
289#endif
290 if (unlikely (rc == 0)) {
291 errno = EAGAIN;
292 return -1;
293 }
294 zmq_assert (rc == 1);
295 return 0;
296
297#else
298#error
299#endif
300}
301
302void zmq::signaler_t::recv ()
303{
304// Attempt to read a signal.
305#if defined ZMQ_HAVE_EVENTFD
306 uint64_t dummy;
307 ssize_t sz = read (_r, &dummy, sizeof (dummy));
308 errno_assert (sz == sizeof (dummy));
309
310 // If we accidentally grabbed the next signal(s) along with the current
311 // one, return it back to the eventfd object.
312 if (unlikely (dummy > 1)) {
313 const uint64_t inc = dummy - 1;
314 ssize_t sz2 = write (_w, &inc, sizeof (inc));
315 errno_assert (sz2 == sizeof (inc));
316 return;
317 }
318
319 zmq_assert (dummy == 1);
320#else
321 unsigned char dummy;
322#if defined ZMQ_HAVE_WINDOWS
323 const int nbytes =
324 ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
325 wsa_assert (nbytes != SOCKET_ERROR);
326#elif defined ZMQ_HAVE_VXWORKS
327 ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
328 errno_assert (nbytes >= 0);
329#else
330 ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
331 errno_assert (nbytes >= 0);
332#endif
333 zmq_assert (nbytes == sizeof (dummy));
334 zmq_assert (dummy == 0);
335#endif
336}
337
338int zmq::signaler_t::recv_failable ()
339{
340// Attempt to read a signal.
341#if defined ZMQ_HAVE_EVENTFD
342 uint64_t dummy;
343 ssize_t sz = read (_r, &dummy, sizeof (dummy));
344 if (sz == -1) {
345 errno_assert (errno == EAGAIN);
346 return -1;
347 }
348 errno_assert (sz == sizeof (dummy));
349
350 // If we accidentally grabbed the next signal(s) along with the current
351 // one, return it back to the eventfd object.
352 if (unlikely (dummy > 1)) {
353 const uint64_t inc = dummy - 1;
354 ssize_t sz2 = write (_w, &inc, sizeof (inc));
355 errno_assert (sz2 == sizeof (inc));
356 return 0;
357 }
358
359 zmq_assert (dummy == 1);
360
361#else
362 unsigned char dummy;
363#if defined ZMQ_HAVE_WINDOWS
364 const int nbytes =
365 ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
366 if (nbytes == SOCKET_ERROR) {
367 const int last_error = WSAGetLastError ();
368 if (last_error == WSAEWOULDBLOCK) {
369 errno = EAGAIN;
370 return -1;
371 }
372 wsa_assert (last_error == WSAEWOULDBLOCK);
373 }
374#elif defined ZMQ_HAVE_VXWORKS
375 ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
376 if (nbytes == -1) {
377 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
378 errno = EAGAIN;
379 return -1;
380 }
381 errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
382 || errno == EINTR);
383 }
384#else
385 ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
386 if (nbytes == -1) {
387 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
388 errno = EAGAIN;
389 return -1;
390 }
391 errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
392 || errno == EINTR);
393 }
394#endif
395 zmq_assert (nbytes == sizeof (dummy));
396 zmq_assert (dummy == 0);
397#endif
398 return 0;
399}
400
401bool zmq::signaler_t::valid () const
402{
403 return _w != retired_fd;
404}
405
406#ifdef HAVE_FORK
407void zmq::signaler_t::forked ()
408{
409 // Close file descriptors created in the parent and create new pair
410 close (_r);
411 close (_w);
412 make_fdpair (&_r, &_w);
413}
414#endif
415