| 1 | /* | 
| 2 |     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file | 
| 3 |  | 
| 4 |     This file is part of libzmq, the ZeroMQ core engine in C++. | 
| 5 |  | 
| 6 |     libzmq is free software; you can redistribute it and/or modify it under | 
| 7 |     the terms of the GNU Lesser General Public License (LGPL) as published | 
| 8 |     by the Free Software Foundation; either version 3 of the License, or | 
| 9 |     (at your option) any later version. | 
| 10 |  | 
| 11 |     As a special exception, the Contributors give you permission to link | 
| 12 |     this library with independent modules to produce an executable, | 
| 13 |     regardless of the license terms of these independent modules, and to | 
| 14 |     copy and distribute the resulting executable under terms of your choice, | 
| 15 |     provided that you also meet, for each linked independent module, the | 
| 16 |     terms and conditions of the license of that module. An independent | 
| 17 |     module is a module which is not derived from or based on this library. | 
| 18 |     If you modify this library, you must extend this exception to your | 
| 19 |     version of the library. | 
| 20 |  | 
| 21 |     libzmq is distributed in the hope that it will be useful, but WITHOUT | 
| 22 |     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | 
| 23 |     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | 
| 24 |     License for more details. | 
| 25 |  | 
| 26 |     You should have received a copy of the GNU Lesser General Public License | 
| 27 |     along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
| 28 | */ | 
| 29 |  | 
| 30 | #include "precompiled.hpp" | 
| 31 | #include "socket_poller.hpp" | 
| 32 | #include "err.hpp" | 
| 33 | #include "polling_util.hpp" | 
| 34 | #include "macros.hpp" | 
| 35 |  | 
| 36 | #include <limits.h> | 
| 37 |  | 
| 38 | static bool is_thread_safe (zmq::socket_base_t &socket_) | 
| 39 | { | 
| 40 |     // do not use getsockopt here, since that would fail during context termination | 
| 41 |     return socket_.is_thread_safe (); | 
| 42 | } | 
| 43 |  | 
| 44 | zmq::socket_poller_t::socket_poller_t () : | 
| 45 |     _tag (0xCAFEBABE), | 
| 46 |     _signaler (NULL) | 
| 47 | #if defined ZMQ_POLL_BASED_ON_POLL | 
| 48 |     , | 
| 49 |     _pollfds (NULL) | 
| 50 | #elif defined ZMQ_POLL_BASED_ON_SELECT | 
| 51 |     , | 
| 52 |     _max_fd (0) | 
| 53 | #endif | 
| 54 | { | 
| 55 |     rebuild (); | 
| 56 | } | 
| 57 |  | 
| 58 | zmq::socket_poller_t::~socket_poller_t () | 
| 59 | { | 
| 60 |     //  Mark the socket_poller as dead | 
| 61 |     _tag = 0xdeadbeef; | 
| 62 |  | 
| 63 |     for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; | 
| 64 |          ++it) { | 
| 65 |         // TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead? | 
| 66 |         if (it->socket && it->socket->check_tag () | 
| 67 |             && is_thread_safe (*it->socket)) { | 
| 68 |             it->socket->remove_signaler (_signaler); | 
| 69 |         } | 
| 70 |     } | 
| 71 |  | 
| 72 |     if (_signaler != NULL) { | 
| 73 |         LIBZMQ_DELETE (_signaler); | 
| 74 |     } | 
| 75 |  | 
| 76 | #if defined ZMQ_POLL_BASED_ON_POLL | 
| 77 |     if (_pollfds) { | 
| 78 |         free (_pollfds); | 
| 79 |         _pollfds = NULL; | 
| 80 |     } | 
| 81 | #endif | 
| 82 | } | 
| 83 |  | 
| 84 | bool zmq::socket_poller_t::check_tag () | 
| 85 | { | 
| 86 |     return _tag == 0xCAFEBABE; | 
| 87 | } | 
| 88 |  | 
| 89 | int zmq::socket_poller_t::signaler_fd (fd_t *fd_) | 
| 90 | { | 
| 91 |     if (_signaler) { | 
| 92 |         *fd_ = _signaler->get_fd (); | 
| 93 |         return 0; | 
| 94 |     } | 
| 95 |     // Only thread-safe socket types are guaranteed to have a signaler. | 
| 96 |     errno = EINVAL; | 
| 97 |     return -1; | 
| 98 | } | 
| 99 |  | 
| 100 | int zmq::socket_poller_t::add (socket_base_t *socket_, | 
| 101 |                                void *user_data_, | 
| 102 |                                short events_) | 
| 103 | { | 
| 104 |     for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; | 
| 105 |          ++it) { | 
| 106 |         if (it->socket == socket_) { | 
| 107 |             errno = EINVAL; | 
| 108 |             return -1; | 
| 109 |         } | 
| 110 |     } | 
| 111 |  | 
| 112 |     if (is_thread_safe (*socket_)) { | 
| 113 |         if (_signaler == NULL) { | 
| 114 |             _signaler = new (std::nothrow) signaler_t (); | 
| 115 |             if (!_signaler) { | 
| 116 |                 errno = ENOMEM; | 
| 117 |                 return -1; | 
| 118 |             } | 
| 119 |             if (!_signaler->valid ()) { | 
| 120 |                 delete _signaler; | 
| 121 |                 _signaler = NULL; | 
| 122 |                 errno = EMFILE; | 
| 123 |                 return -1; | 
| 124 |             } | 
| 125 |         } | 
| 126 |  | 
| 127 |         socket_->add_signaler (_signaler); | 
| 128 |     } | 
| 129 |  | 
| 130 |     item_t item = { | 
| 131 |         socket_, | 
| 132 |         0, | 
| 133 |         user_data_, | 
| 134 |         events_ | 
| 135 | #if defined ZMQ_POLL_BASED_ON_POLL | 
| 136 |         , | 
| 137 |         -1 | 
| 138 | #endif | 
| 139 |     }; | 
| 140 |     try { | 
| 141 |         _items.push_back (item); | 
| 142 |     } | 
| 143 |     catch (const std::bad_alloc &) { | 
| 144 |         errno = ENOMEM; | 
| 145 |         return -1; | 
| 146 |     } | 
| 147 |     _need_rebuild = true; | 
| 148 |  | 
| 149 |     return 0; | 
| 150 | } | 
| 151 |  | 
| 152 | int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_) | 
| 153 | { | 
| 154 |     for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; | 
| 155 |          ++it) { | 
| 156 |         if (!it->socket && it->fd == fd_) { | 
| 157 |             errno = EINVAL; | 
| 158 |             return -1; | 
| 159 |         } | 
| 160 |     } | 
| 161 |  | 
| 162 |     item_t item = { | 
| 163 |         NULL, | 
| 164 |         fd_, | 
| 165 |         user_data_, | 
| 166 |         events_ | 
| 167 | #if defined ZMQ_POLL_BASED_ON_POLL | 
| 168 |         , | 
| 169 |         -1 | 
| 170 | #endif | 
| 171 |     }; | 
| 172 |     try { | 
| 173 |         _items.push_back (item); | 
| 174 |     } | 
| 175 |     catch (const std::bad_alloc &) { | 
| 176 |         errno = ENOMEM; | 
| 177 |         return -1; | 
| 178 |     } | 
| 179 |     _need_rebuild = true; | 
| 180 |  | 
| 181 |     return 0; | 
| 182 | } | 
| 183 |  | 
| 184 | int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_) | 
| 185 | { | 
| 186 |     const items_t::iterator end = _items.end (); | 
| 187 |     items_t::iterator it; | 
| 188 |  | 
| 189 |     for (it = _items.begin (); it != end; ++it) { | 
| 190 |         if (it->socket == socket_) | 
| 191 |             break; | 
| 192 |     } | 
| 193 |  | 
| 194 |     if (it == end) { | 
| 195 |         errno = EINVAL; | 
| 196 |         return -1; | 
| 197 |     } | 
| 198 |  | 
| 199 |     it->events = events_; | 
| 200 |     _need_rebuild = true; | 
| 201 |  | 
| 202 |     return 0; | 
| 203 | } | 
| 204 |  | 
| 205 |  | 
| 206 | int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_) | 
| 207 | { | 
| 208 |     const items_t::iterator end = _items.end (); | 
| 209 |     items_t::iterator it; | 
| 210 |  | 
| 211 |     for (it = _items.begin (); it != end; ++it) { | 
| 212 |         if (!it->socket && it->fd == fd_) | 
| 213 |             break; | 
| 214 |     } | 
| 215 |  | 
| 216 |     if (it == end) { | 
| 217 |         errno = EINVAL; | 
| 218 |         return -1; | 
| 219 |     } | 
| 220 |  | 
| 221 |     it->events = events_; | 
| 222 |     _need_rebuild = true; | 
| 223 |  | 
| 224 |     return 0; | 
| 225 | } | 
| 226 |  | 
| 227 |  | 
| 228 | int zmq::socket_poller_t::remove (socket_base_t *socket_) | 
| 229 | { | 
| 230 |     const items_t::iterator end = _items.end (); | 
| 231 |     items_t::iterator it; | 
| 232 |  | 
| 233 |     for (it = _items.begin (); it != end; ++it) { | 
| 234 |         if (it->socket == socket_) | 
| 235 |             break; | 
| 236 |     } | 
| 237 |  | 
| 238 |     if (it == end) { | 
| 239 |         errno = EINVAL; | 
| 240 |         return -1; | 
| 241 |     } | 
| 242 |  | 
| 243 |     _items.erase (it); | 
| 244 |     _need_rebuild = true; | 
| 245 |  | 
| 246 |     if (is_thread_safe (*socket_)) { | 
| 247 |         socket_->remove_signaler (_signaler); | 
| 248 |     } | 
| 249 |  | 
| 250 |     return 0; | 
| 251 | } | 
| 252 |  | 
| 253 | int zmq::socket_poller_t::remove_fd (fd_t fd_) | 
| 254 | { | 
| 255 |     const items_t::iterator end = _items.end (); | 
| 256 |     items_t::iterator it; | 
| 257 |  | 
| 258 |     for (it = _items.begin (); it != end; ++it) { | 
| 259 |         if (!it->socket && it->fd == fd_) | 
| 260 |             break; | 
| 261 |     } | 
| 262 |  | 
| 263 |     if (it == end) { | 
| 264 |         errno = EINVAL; | 
| 265 |         return -1; | 
| 266 |     } | 
| 267 |  | 
| 268 |     _items.erase (it); | 
| 269 |     _need_rebuild = true; | 
| 270 |  | 
| 271 |     return 0; | 
| 272 | } | 
| 273 |  | 
| 274 | int zmq::socket_poller_t::rebuild () | 
| 275 | { | 
| 276 |     _use_signaler = false; | 
| 277 |     _pollset_size = 0; | 
| 278 |     _need_rebuild = false; | 
| 279 |  | 
| 280 | #if defined ZMQ_POLL_BASED_ON_POLL | 
| 281 |  | 
| 282 |     if (_pollfds) { | 
| 283 |         free (_pollfds); | 
| 284 |         _pollfds = NULL; | 
| 285 |     } | 
| 286 |  | 
| 287 |     for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; | 
| 288 |          ++it) { | 
| 289 |         if (it->events) { | 
| 290 |             if (it->socket && is_thread_safe (*it->socket)) { | 
| 291 |                 if (!_use_signaler) { | 
| 292 |                     _use_signaler = true; | 
| 293 |                     _pollset_size++; | 
| 294 |                 } | 
| 295 |             } else | 
| 296 |                 _pollset_size++; | 
| 297 |         } | 
| 298 |     } | 
| 299 |  | 
| 300 |     if (_pollset_size == 0) | 
| 301 |         return 0; | 
| 302 |  | 
| 303 |     _pollfds = static_cast<pollfd *> (malloc (_pollset_size * sizeof (pollfd))); | 
| 304 |  | 
| 305 |     if (!_pollfds) { | 
| 306 |         errno = ENOMEM; | 
| 307 |         _need_rebuild = true; | 
| 308 |         return -1; | 
| 309 |     } | 
| 310 |  | 
| 311 |     int item_nbr = 0; | 
| 312 |  | 
| 313 |     if (_use_signaler) { | 
| 314 |         item_nbr = 1; | 
| 315 |         _pollfds[0].fd = _signaler->get_fd (); | 
| 316 |         _pollfds[0].events = POLLIN; | 
| 317 |     } | 
| 318 |  | 
| 319 |     for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; | 
| 320 |          ++it) { | 
| 321 |         if (it->events) { | 
| 322 |             if (it->socket) { | 
| 323 |                 if (!is_thread_safe (*it->socket)) { | 
| 324 |                     size_t fd_size = sizeof (zmq::fd_t); | 
| 325 |                     int rc = it->socket->getsockopt ( | 
| 326 |                       ZMQ_FD, &_pollfds[item_nbr].fd, &fd_size); | 
| 327 |                     zmq_assert (rc == 0); | 
| 328 |  | 
| 329 |                     _pollfds[item_nbr].events = POLLIN; | 
| 330 |                     item_nbr++; | 
| 331 |                 } | 
| 332 |             } else { | 
| 333 |                 _pollfds[item_nbr].fd = it->fd; | 
| 334 |                 _pollfds[item_nbr].events = | 
| 335 |                   (it->events & ZMQ_POLLIN ? POLLIN : 0) | 
| 336 |                   | (it->events & ZMQ_POLLOUT ? POLLOUT : 0) | 
| 337 |                   | (it->events & ZMQ_POLLPRI ? POLLPRI : 0); | 
| 338 |                 it->pollfd_index = item_nbr; | 
| 339 |                 item_nbr++; | 
| 340 |             } | 
| 341 |         } | 
| 342 |     } | 
| 343 |  | 
| 344 | #elif defined ZMQ_POLL_BASED_ON_SELECT | 
| 345 |  | 
| 346 |     //  Ensure we do not attempt to select () on more than FD_SETSIZE | 
| 347 |     //  file descriptors. | 
| 348 |     zmq_assert (_items.size () <= FD_SETSIZE); | 
| 349 |  | 
| 350 |     _pollset_in.resize (_items.size ()); | 
| 351 |     _pollset_out.resize (_items.size ()); | 
| 352 |     _pollset_err.resize (_items.size ()); | 
| 353 |  | 
| 354 |     FD_ZERO (_pollset_in.get ()); | 
| 355 |     FD_ZERO (_pollset_out.get ()); | 
| 356 |     FD_ZERO (_pollset_err.get ()); | 
| 357 |  | 
| 358 |     for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; | 
| 359 |          ++it) { | 
| 360 |         if (it->socket && is_thread_safe (*it->socket) && it->events) { | 
| 361 |             _use_signaler = true; | 
| 362 |             FD_SET (_signaler->get_fd (), _pollset_in.get ()); | 
| 363 |             _pollset_size = 1; | 
| 364 |             break; | 
| 365 |         } | 
| 366 |     } | 
| 367 |  | 
| 368 |     _max_fd = 0; | 
| 369 |  | 
| 370 |     //  Build the fd_sets for passing to select (). | 
| 371 |     for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; | 
| 372 |          ++it) { | 
| 373 |         if (it->events) { | 
| 374 |             //  If the poll item is a 0MQ socket we are interested in input on the | 
| 375 |             //  notification file descriptor retrieved by the ZMQ_FD socket option. | 
| 376 |             if (it->socket) { | 
| 377 |                 if (!is_thread_safe (*it->socket)) { | 
| 378 |                     zmq::fd_t notify_fd; | 
| 379 |                     size_t fd_size = sizeof (zmq::fd_t); | 
| 380 |                     int rc = | 
| 381 |                       it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size); | 
| 382 |                     zmq_assert (rc == 0); | 
| 383 |  | 
| 384 |                     FD_SET (notify_fd, _pollset_in.get ()); | 
| 385 |                     if (_max_fd < notify_fd) | 
| 386 |                         _max_fd = notify_fd; | 
| 387 |  | 
| 388 |                     _pollset_size++; | 
| 389 |                 } | 
| 390 |             } | 
| 391 |             //  Else, the poll item is a raw file descriptor. Convert the poll item | 
| 392 |             //  events to the appropriate fd_sets. | 
| 393 |             else { | 
| 394 |                 if (it->events & ZMQ_POLLIN) | 
| 395 |                     FD_SET (it->fd, _pollset_in.get ()); | 
| 396 |                 if (it->events & ZMQ_POLLOUT) | 
| 397 |                     FD_SET (it->fd, _pollset_out.get ()); | 
| 398 |                 if (it->events & ZMQ_POLLERR) | 
| 399 |                     FD_SET (it->fd, _pollset_err.get ()); | 
| 400 |                 if (_max_fd < it->fd) | 
| 401 |                     _max_fd = it->fd; | 
| 402 |  | 
| 403 |                 _pollset_size++; | 
| 404 |             } | 
| 405 |         } | 
| 406 |     } | 
| 407 |  | 
| 408 | #endif | 
| 409 |  | 
| 410 |     return 0; | 
| 411 | } | 
| 412 |  | 
| 413 | void zmq::socket_poller_t::zero_trail_events ( | 
| 414 |   zmq::socket_poller_t::event_t *events_, int n_events_, int found_) | 
| 415 | { | 
| 416 |     for (int i = found_; i < n_events_; ++i) { | 
| 417 |         events_[i].socket = NULL; | 
| 418 |         events_[i].fd = 0; | 
| 419 |         events_[i].user_data = NULL; | 
| 420 |         events_[i].events = 0; | 
| 421 |     } | 
| 422 | } | 
| 423 |  | 
| 424 | #if defined ZMQ_POLL_BASED_ON_POLL | 
| 425 | int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_, | 
| 426 |                                         int n_events_) | 
| 427 | #elif defined ZMQ_POLL_BASED_ON_SELECT | 
| 428 | int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_, | 
| 429 |                                         int n_events_, | 
| 430 |                                         fd_set &inset_, | 
| 431 |                                         fd_set &outset_, | 
| 432 |                                         fd_set &errset_) | 
| 433 | #endif | 
| 434 | { | 
| 435 |     int found = 0; | 
| 436 |     for (items_t::iterator it = _items.begin (), end = _items.end (); | 
| 437 |          it != end && found < n_events_; ++it) { | 
| 438 |         //  The poll item is a 0MQ socket. Retrieve pending events | 
| 439 |         //  using the ZMQ_EVENTS socket option. | 
| 440 |         if (it->socket) { | 
| 441 |             size_t events_size = sizeof (uint32_t); | 
| 442 |             uint32_t events; | 
| 443 |             if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) | 
| 444 |                 == -1) { | 
| 445 |                 return -1; | 
| 446 |             } | 
| 447 |  | 
| 448 |             if (it->events & events) { | 
| 449 |                 events_[found].socket = it->socket; | 
| 450 |                 events_[found].user_data = it->user_data; | 
| 451 |                 events_[found].events = it->events & events; | 
| 452 |                 ++found; | 
| 453 |             } | 
| 454 |         } | 
| 455 |         //  Else, the poll item is a raw file descriptor, simply convert | 
| 456 |         //  the events to zmq_pollitem_t-style format. | 
| 457 |         else { | 
| 458 | #if defined ZMQ_POLL_BASED_ON_POLL | 
| 459 |  | 
| 460 |             short revents = _pollfds[it->pollfd_index].revents; | 
| 461 |             short events = 0; | 
| 462 |  | 
| 463 |             if (revents & POLLIN) | 
| 464 |                 events |= ZMQ_POLLIN; | 
| 465 |             if (revents & POLLOUT) | 
| 466 |                 events |= ZMQ_POLLOUT; | 
| 467 |             if (revents & POLLPRI) | 
| 468 |                 events |= ZMQ_POLLPRI; | 
| 469 |             if (revents & ~(POLLIN | POLLOUT | POLLPRI)) | 
| 470 |                 events |= ZMQ_POLLERR; | 
| 471 |  | 
| 472 | #elif defined ZMQ_POLL_BASED_ON_SELECT | 
| 473 |  | 
| 474 |             short events = 0; | 
| 475 |  | 
| 476 |             if (FD_ISSET (it->fd, &inset_)) | 
| 477 |                 events |= ZMQ_POLLIN; | 
| 478 |             if (FD_ISSET (it->fd, &outset_)) | 
| 479 |                 events |= ZMQ_POLLOUT; | 
| 480 |             if (FD_ISSET (it->fd, &errset_)) | 
| 481 |                 events |= ZMQ_POLLERR; | 
| 482 | #endif //POLL_SELECT | 
| 483 |  | 
| 484 |             if (events) { | 
| 485 |                 events_[found].socket = NULL; | 
| 486 |                 events_[found].user_data = it->user_data; | 
| 487 |                 events_[found].fd = it->fd; | 
| 488 |                 events_[found].events = events; | 
| 489 |                 ++found; | 
| 490 |             } | 
| 491 |         } | 
| 492 |     } | 
| 493 |  | 
| 494 |     return found; | 
| 495 | } | 
| 496 |  | 
| 497 | //Return 0 if timeout is expired otherwise 1 | 
| 498 | int zmq::socket_poller_t::adjust_timeout (zmq::clock_t &clock_, | 
| 499 |                                           long timeout_, | 
| 500 |                                           uint64_t &now_, | 
| 501 |                                           uint64_t &end_, | 
| 502 |                                           bool &first_pass_) | 
| 503 | { | 
| 504 |     //  If socket_poller_t::timeout is zero, exit immediately whether there | 
| 505 |     //  are events or not. | 
| 506 |     if (timeout_ == 0) | 
| 507 |         return 0; | 
| 508 |  | 
| 509 |     //  At this point we are meant to wait for events but there are none. | 
| 510 |     //  If timeout is infinite we can just loop until we get some events. | 
| 511 |     if (timeout_ < 0) { | 
| 512 |         if (first_pass_) | 
| 513 |             first_pass_ = false; | 
| 514 |         return 1; | 
| 515 |     } | 
| 516 |  | 
| 517 |     //  The timeout is finite and there are no events. In the first pass | 
| 518 |     //  we get a timestamp of when the polling have begun. (We assume that | 
| 519 |     //  first pass have taken negligible time). We also compute the time | 
| 520 |     //  when the polling should time out. | 
| 521 |     now_ = clock_.now_ms (); | 
| 522 |     if (first_pass_) { | 
| 523 |         end_ = now_ + timeout_; | 
| 524 |         first_pass_ = false; | 
| 525 |         return 1; | 
| 526 |     } | 
| 527 |  | 
| 528 |     //  Find out whether timeout have expired. | 
| 529 |     if (now_ >= end_) | 
| 530 |         return 0; | 
| 531 |  | 
| 532 |     return 1; | 
| 533 | } | 
| 534 |  | 
| 535 | int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, | 
| 536 |                                 int n_events_, | 
| 537 |                                 long timeout_) | 
| 538 | { | 
| 539 |     if (_items.empty () && timeout_ < 0) { | 
| 540 |         errno = EFAULT; | 
| 541 |         return -1; | 
| 542 |     } | 
| 543 |  | 
| 544 |     if (_need_rebuild) { | 
| 545 |         int rc = rebuild (); | 
| 546 |         if (rc == -1) | 
| 547 |             return -1; | 
| 548 |     } | 
| 549 |  | 
| 550 |     if (unlikely (_pollset_size == 0)) { | 
| 551 |         // We'll report an error (timed out) as if the list was non-empty and | 
| 552 |         // no event occurred within the specified timeout. Otherwise the caller | 
| 553 |         // needs to check the return value AND the event to avoid using the | 
| 554 |         // nullified event data. | 
| 555 |         errno = EAGAIN; | 
| 556 |         if (timeout_ == 0) | 
| 557 |             return -1; | 
| 558 | #if defined ZMQ_HAVE_WINDOWS | 
| 559 |         Sleep (timeout_ > 0 ? timeout_ : INFINITE); | 
| 560 |         return -1; | 
| 561 | #elif defined ZMQ_HAVE_ANDROID | 
| 562 |         usleep (timeout_ * 1000); | 
| 563 |         return -1; | 
| 564 | #elif defined ZMQ_HAVE_OSX | 
| 565 |         usleep (timeout_ * 1000); | 
| 566 |         errno = EAGAIN; | 
| 567 |         return -1; | 
| 568 | #elif defined ZMQ_HAVE_VXWORKS | 
| 569 |         struct timespec ns_; | 
| 570 |         ns_.tv_sec = timeout_ / 1000; | 
| 571 |         ns_.tv_nsec = timeout_ % 1000 * 1000000; | 
| 572 |         nanosleep (&ns_, 0); | 
| 573 |         return -1; | 
| 574 | #else | 
| 575 |         usleep (timeout_ * 1000); | 
| 576 |         return -1; | 
| 577 | #endif | 
| 578 |     } | 
| 579 |  | 
| 580 | #if defined ZMQ_POLL_BASED_ON_POLL | 
| 581 |     zmq::clock_t clock; | 
| 582 |     uint64_t now = 0; | 
| 583 |     uint64_t end = 0; | 
| 584 |  | 
| 585 |     bool first_pass = true; | 
| 586 |  | 
| 587 |     while (true) { | 
| 588 |         //  Compute the timeout for the subsequent poll. | 
| 589 |         int timeout; | 
| 590 |         if (first_pass) | 
| 591 |             timeout = 0; | 
| 592 |         else if (timeout_ < 0) | 
| 593 |             timeout = -1; | 
| 594 |         else | 
| 595 |             timeout = | 
| 596 |               static_cast<int> (std::min<uint64_t> (end - now, INT_MAX)); | 
| 597 |  | 
| 598 |         //  Wait for events. | 
| 599 |         while (true) { | 
| 600 |             int rc = poll (_pollfds, _pollset_size, timeout); | 
| 601 |             if (rc == -1 && errno == EINTR) { | 
| 602 |                 return -1; | 
| 603 |             } | 
| 604 |             errno_assert (rc >= 0); | 
| 605 |             break; | 
| 606 |         } | 
| 607 |  | 
| 608 |         //  Receive the signal from pollfd | 
| 609 |         if (_use_signaler && _pollfds[0].revents & POLLIN) | 
| 610 |             _signaler->recv (); | 
| 611 |  | 
| 612 |         //  Check for the events. | 
| 613 |         int found = check_events (events_, n_events_); | 
| 614 |         if (found) { | 
| 615 |             if (found > 0) | 
| 616 |                 zero_trail_events (events_, n_events_, found); | 
| 617 |             return found; | 
| 618 |         } | 
| 619 |  | 
| 620 |         //  Adjust timeout or break | 
| 621 |         if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0) | 
| 622 |             break; | 
| 623 |     } | 
| 624 |     errno = EAGAIN; | 
| 625 |     return -1; | 
| 626 |  | 
| 627 | #elif defined ZMQ_POLL_BASED_ON_SELECT | 
| 628 |  | 
| 629 |     zmq::clock_t clock; | 
| 630 |     uint64_t now = 0; | 
| 631 |     uint64_t end = 0; | 
| 632 |  | 
| 633 |     bool first_pass = true; | 
| 634 |  | 
| 635 |     optimized_fd_set_t inset (_pollset_size); | 
| 636 |     optimized_fd_set_t outset (_pollset_size); | 
| 637 |     optimized_fd_set_t errset (_pollset_size); | 
| 638 |  | 
| 639 |     while (true) { | 
| 640 |         //  Compute the timeout for the subsequent poll. | 
| 641 |         timeval timeout; | 
| 642 |         timeval *ptimeout; | 
| 643 |         if (first_pass) { | 
| 644 |             timeout.tv_sec = 0; | 
| 645 |             timeout.tv_usec = 0; | 
| 646 |             ptimeout = &timeout; | 
| 647 |         } else if (timeout_ < 0) | 
| 648 |             ptimeout = NULL; | 
| 649 |         else { | 
| 650 |             timeout.tv_sec = static_cast<long> ((end - now) / 1000); | 
| 651 |             timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000); | 
| 652 |             ptimeout = &timeout; | 
| 653 |         } | 
| 654 |  | 
| 655 |         //  Wait for events. Ignore interrupts if there's infinite timeout. | 
| 656 |         while (true) { | 
| 657 |             memcpy (inset.get (), _pollset_in.get (), | 
| 658 |                     valid_pollset_bytes (*_pollset_in.get ())); | 
| 659 |             memcpy (outset.get (), _pollset_out.get (), | 
| 660 |                     valid_pollset_bytes (*_pollset_out.get ())); | 
| 661 |             memcpy (errset.get (), _pollset_err.get (), | 
| 662 |                     valid_pollset_bytes (*_pollset_err.get ())); | 
| 663 |             const int rc = select (static_cast<int> (_max_fd + 1), inset.get (), | 
| 664 |                                    outset.get (), errset.get (), ptimeout); | 
| 665 | #if defined ZMQ_HAVE_WINDOWS | 
| 666 |             if (unlikely (rc == SOCKET_ERROR)) { | 
| 667 |                 errno = wsa_error_to_errno (WSAGetLastError ()); | 
| 668 |                 wsa_assert (errno == ENOTSOCK); | 
| 669 |                 return -1; | 
| 670 |             } | 
| 671 | #else | 
| 672 |             if (unlikely (rc == -1)) { | 
| 673 |                 errno_assert (errno == EINTR || errno == EBADF); | 
| 674 |                 return -1; | 
| 675 |             } | 
| 676 | #endif | 
| 677 |             break; | 
| 678 |         } | 
| 679 |  | 
| 680 |         if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ())) | 
| 681 |             _signaler->recv (); | 
| 682 |  | 
| 683 |         //  Check for the events. | 
| 684 |         const int found = check_events (events_, n_events_, *inset.get (), | 
| 685 |                                         *outset.get (), *errset.get ()); | 
| 686 |         if (found) { | 
| 687 |             if (found > 0) | 
| 688 |                 zero_trail_events (events_, n_events_, found); | 
| 689 |             return found; | 
| 690 |         } | 
| 691 |  | 
| 692 |         //  Adjust timeout or break | 
| 693 |         if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0) | 
| 694 |             break; | 
| 695 |     } | 
| 696 |  | 
| 697 |     errno = EAGAIN; | 
| 698 |     return -1; | 
| 699 |  | 
| 700 | #else | 
| 701 |  | 
| 702 |     //  Exotic platforms that support neither poll() nor select(). | 
| 703 |     errno = ENOTSUP; | 
| 704 |     return -1; | 
| 705 |  | 
| 706 | #endif | 
| 707 | } | 
| 708 |  |