1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "socket_poller.hpp"
32#include "err.hpp"
33#include "polling_util.hpp"
34#include "macros.hpp"
35
36#include <limits.h>
37
38static bool is_thread_safe (zmq::socket_base_t &socket_)
39{
40 // do not use getsockopt here, since that would fail during context termination
41 return socket_.is_thread_safe ();
42}
43
44zmq::socket_poller_t::socket_poller_t () :
45 _tag (0xCAFEBABE),
46 _signaler (NULL)
47#if defined ZMQ_POLL_BASED_ON_POLL
48 ,
49 _pollfds (NULL)
50#elif defined ZMQ_POLL_BASED_ON_SELECT
51 ,
52 _max_fd (0)
53#endif
54{
55 rebuild ();
56}
57
58zmq::socket_poller_t::~socket_poller_t ()
59{
60 // Mark the socket_poller as dead
61 _tag = 0xdeadbeef;
62
63 for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
64 ++it) {
65 // TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead?
66 if (it->socket && it->socket->check_tag ()
67 && is_thread_safe (*it->socket)) {
68 it->socket->remove_signaler (_signaler);
69 }
70 }
71
72 if (_signaler != NULL) {
73 LIBZMQ_DELETE (_signaler);
74 }
75
76#if defined ZMQ_POLL_BASED_ON_POLL
77 if (_pollfds) {
78 free (_pollfds);
79 _pollfds = NULL;
80 }
81#endif
82}
83
84bool zmq::socket_poller_t::check_tag ()
85{
86 return _tag == 0xCAFEBABE;
87}
88
89int zmq::socket_poller_t::signaler_fd (fd_t *fd_)
90{
91 if (_signaler) {
92 *fd_ = _signaler->get_fd ();
93 return 0;
94 }
95 // Only thread-safe socket types are guaranteed to have a signaler.
96 errno = EINVAL;
97 return -1;
98}
99
100int zmq::socket_poller_t::add (socket_base_t *socket_,
101 void *user_data_,
102 short events_)
103{
104 for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
105 ++it) {
106 if (it->socket == socket_) {
107 errno = EINVAL;
108 return -1;
109 }
110 }
111
112 if (is_thread_safe (*socket_)) {
113 if (_signaler == NULL) {
114 _signaler = new (std::nothrow) signaler_t ();
115 if (!_signaler) {
116 errno = ENOMEM;
117 return -1;
118 }
119 if (!_signaler->valid ()) {
120 delete _signaler;
121 _signaler = NULL;
122 errno = EMFILE;
123 return -1;
124 }
125 }
126
127 socket_->add_signaler (_signaler);
128 }
129
130 item_t item = {
131 socket_,
132 0,
133 user_data_,
134 events_
135#if defined ZMQ_POLL_BASED_ON_POLL
136 ,
137 -1
138#endif
139 };
140 try {
141 _items.push_back (item);
142 }
143 catch (const std::bad_alloc &) {
144 errno = ENOMEM;
145 return -1;
146 }
147 _need_rebuild = true;
148
149 return 0;
150}
151
152int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
153{
154 for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
155 ++it) {
156 if (!it->socket && it->fd == fd_) {
157 errno = EINVAL;
158 return -1;
159 }
160 }
161
162 item_t item = {
163 NULL,
164 fd_,
165 user_data_,
166 events_
167#if defined ZMQ_POLL_BASED_ON_POLL
168 ,
169 -1
170#endif
171 };
172 try {
173 _items.push_back (item);
174 }
175 catch (const std::bad_alloc &) {
176 errno = ENOMEM;
177 return -1;
178 }
179 _need_rebuild = true;
180
181 return 0;
182}
183
184int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
185{
186 const items_t::iterator end = _items.end ();
187 items_t::iterator it;
188
189 for (it = _items.begin (); it != end; ++it) {
190 if (it->socket == socket_)
191 break;
192 }
193
194 if (it == end) {
195 errno = EINVAL;
196 return -1;
197 }
198
199 it->events = events_;
200 _need_rebuild = true;
201
202 return 0;
203}
204
205
206int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
207{
208 const items_t::iterator end = _items.end ();
209 items_t::iterator it;
210
211 for (it = _items.begin (); it != end; ++it) {
212 if (!it->socket && it->fd == fd_)
213 break;
214 }
215
216 if (it == end) {
217 errno = EINVAL;
218 return -1;
219 }
220
221 it->events = events_;
222 _need_rebuild = true;
223
224 return 0;
225}
226
227
228int zmq::socket_poller_t::remove (socket_base_t *socket_)
229{
230 const items_t::iterator end = _items.end ();
231 items_t::iterator it;
232
233 for (it = _items.begin (); it != end; ++it) {
234 if (it->socket == socket_)
235 break;
236 }
237
238 if (it == end) {
239 errno = EINVAL;
240 return -1;
241 }
242
243 _items.erase (it);
244 _need_rebuild = true;
245
246 if (is_thread_safe (*socket_)) {
247 socket_->remove_signaler (_signaler);
248 }
249
250 return 0;
251}
252
253int zmq::socket_poller_t::remove_fd (fd_t fd_)
254{
255 const items_t::iterator end = _items.end ();
256 items_t::iterator it;
257
258 for (it = _items.begin (); it != end; ++it) {
259 if (!it->socket && it->fd == fd_)
260 break;
261 }
262
263 if (it == end) {
264 errno = EINVAL;
265 return -1;
266 }
267
268 _items.erase (it);
269 _need_rebuild = true;
270
271 return 0;
272}
273
274int zmq::socket_poller_t::rebuild ()
275{
276 _use_signaler = false;
277 _pollset_size = 0;
278 _need_rebuild = false;
279
280#if defined ZMQ_POLL_BASED_ON_POLL
281
282 if (_pollfds) {
283 free (_pollfds);
284 _pollfds = NULL;
285 }
286
287 for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
288 ++it) {
289 if (it->events) {
290 if (it->socket && is_thread_safe (*it->socket)) {
291 if (!_use_signaler) {
292 _use_signaler = true;
293 _pollset_size++;
294 }
295 } else
296 _pollset_size++;
297 }
298 }
299
300 if (_pollset_size == 0)
301 return 0;
302
303 _pollfds = static_cast<pollfd *> (malloc (_pollset_size * sizeof (pollfd)));
304
305 if (!_pollfds) {
306 errno = ENOMEM;
307 _need_rebuild = true;
308 return -1;
309 }
310
311 int item_nbr = 0;
312
313 if (_use_signaler) {
314 item_nbr = 1;
315 _pollfds[0].fd = _signaler->get_fd ();
316 _pollfds[0].events = POLLIN;
317 }
318
319 for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
320 ++it) {
321 if (it->events) {
322 if (it->socket) {
323 if (!is_thread_safe (*it->socket)) {
324 size_t fd_size = sizeof (zmq::fd_t);
325 int rc = it->socket->getsockopt (
326 ZMQ_FD, &_pollfds[item_nbr].fd, &fd_size);
327 zmq_assert (rc == 0);
328
329 _pollfds[item_nbr].events = POLLIN;
330 item_nbr++;
331 }
332 } else {
333 _pollfds[item_nbr].fd = it->fd;
334 _pollfds[item_nbr].events =
335 (it->events & ZMQ_POLLIN ? POLLIN : 0)
336 | (it->events & ZMQ_POLLOUT ? POLLOUT : 0)
337 | (it->events & ZMQ_POLLPRI ? POLLPRI : 0);
338 it->pollfd_index = item_nbr;
339 item_nbr++;
340 }
341 }
342 }
343
344#elif defined ZMQ_POLL_BASED_ON_SELECT
345
346 // Ensure we do not attempt to select () on more than FD_SETSIZE
347 // file descriptors.
348 zmq_assert (_items.size () <= FD_SETSIZE);
349
350 _pollset_in.resize (_items.size ());
351 _pollset_out.resize (_items.size ());
352 _pollset_err.resize (_items.size ());
353
354 FD_ZERO (_pollset_in.get ());
355 FD_ZERO (_pollset_out.get ());
356 FD_ZERO (_pollset_err.get ());
357
358 for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
359 ++it) {
360 if (it->socket && is_thread_safe (*it->socket) && it->events) {
361 _use_signaler = true;
362 FD_SET (_signaler->get_fd (), _pollset_in.get ());
363 _pollset_size = 1;
364 break;
365 }
366 }
367
368 _max_fd = 0;
369
370 // Build the fd_sets for passing to select ().
371 for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
372 ++it) {
373 if (it->events) {
374 // If the poll item is a 0MQ socket we are interested in input on the
375 // notification file descriptor retrieved by the ZMQ_FD socket option.
376 if (it->socket) {
377 if (!is_thread_safe (*it->socket)) {
378 zmq::fd_t notify_fd;
379 size_t fd_size = sizeof (zmq::fd_t);
380 int rc =
381 it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size);
382 zmq_assert (rc == 0);
383
384 FD_SET (notify_fd, _pollset_in.get ());
385 if (_max_fd < notify_fd)
386 _max_fd = notify_fd;
387
388 _pollset_size++;
389 }
390 }
391 // Else, the poll item is a raw file descriptor. Convert the poll item
392 // events to the appropriate fd_sets.
393 else {
394 if (it->events & ZMQ_POLLIN)
395 FD_SET (it->fd, _pollset_in.get ());
396 if (it->events & ZMQ_POLLOUT)
397 FD_SET (it->fd, _pollset_out.get ());
398 if (it->events & ZMQ_POLLERR)
399 FD_SET (it->fd, _pollset_err.get ());
400 if (_max_fd < it->fd)
401 _max_fd = it->fd;
402
403 _pollset_size++;
404 }
405 }
406 }
407
408#endif
409
410 return 0;
411}
412
413void zmq::socket_poller_t::zero_trail_events (
414 zmq::socket_poller_t::event_t *events_, int n_events_, int found_)
415{
416 for (int i = found_; i < n_events_; ++i) {
417 events_[i].socket = NULL;
418 events_[i].fd = 0;
419 events_[i].user_data = NULL;
420 events_[i].events = 0;
421 }
422}
423
424#if defined ZMQ_POLL_BASED_ON_POLL
425int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
426 int n_events_)
427#elif defined ZMQ_POLL_BASED_ON_SELECT
428int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
429 int n_events_,
430 fd_set &inset_,
431 fd_set &outset_,
432 fd_set &errset_)
433#endif
434{
435 int found = 0;
436 for (items_t::iterator it = _items.begin (), end = _items.end ();
437 it != end && found < n_events_; ++it) {
438 // The poll item is a 0MQ socket. Retrieve pending events
439 // using the ZMQ_EVENTS socket option.
440 if (it->socket) {
441 size_t events_size = sizeof (uint32_t);
442 uint32_t events;
443 if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size)
444 == -1) {
445 return -1;
446 }
447
448 if (it->events & events) {
449 events_[found].socket = it->socket;
450 events_[found].user_data = it->user_data;
451 events_[found].events = it->events & events;
452 ++found;
453 }
454 }
455 // Else, the poll item is a raw file descriptor, simply convert
456 // the events to zmq_pollitem_t-style format.
457 else {
458#if defined ZMQ_POLL_BASED_ON_POLL
459
460 short revents = _pollfds[it->pollfd_index].revents;
461 short events = 0;
462
463 if (revents & POLLIN)
464 events |= ZMQ_POLLIN;
465 if (revents & POLLOUT)
466 events |= ZMQ_POLLOUT;
467 if (revents & POLLPRI)
468 events |= ZMQ_POLLPRI;
469 if (revents & ~(POLLIN | POLLOUT | POLLPRI))
470 events |= ZMQ_POLLERR;
471
472#elif defined ZMQ_POLL_BASED_ON_SELECT
473
474 short events = 0;
475
476 if (FD_ISSET (it->fd, &inset_))
477 events |= ZMQ_POLLIN;
478 if (FD_ISSET (it->fd, &outset_))
479 events |= ZMQ_POLLOUT;
480 if (FD_ISSET (it->fd, &errset_))
481 events |= ZMQ_POLLERR;
482#endif //POLL_SELECT
483
484 if (events) {
485 events_[found].socket = NULL;
486 events_[found].user_data = it->user_data;
487 events_[found].fd = it->fd;
488 events_[found].events = events;
489 ++found;
490 }
491 }
492 }
493
494 return found;
495}
496
497//Return 0 if timeout is expired otherwise 1
498int zmq::socket_poller_t::adjust_timeout (zmq::clock_t &clock_,
499 long timeout_,
500 uint64_t &now_,
501 uint64_t &end_,
502 bool &first_pass_)
503{
504 // If socket_poller_t::timeout is zero, exit immediately whether there
505 // are events or not.
506 if (timeout_ == 0)
507 return 0;
508
509 // At this point we are meant to wait for events but there are none.
510 // If timeout is infinite we can just loop until we get some events.
511 if (timeout_ < 0) {
512 if (first_pass_)
513 first_pass_ = false;
514 return 1;
515 }
516
517 // The timeout is finite and there are no events. In the first pass
518 // we get a timestamp of when the polling have begun. (We assume that
519 // first pass have taken negligible time). We also compute the time
520 // when the polling should time out.
521 now_ = clock_.now_ms ();
522 if (first_pass_) {
523 end_ = now_ + timeout_;
524 first_pass_ = false;
525 return 1;
526 }
527
528 // Find out whether timeout have expired.
529 if (now_ >= end_)
530 return 0;
531
532 return 1;
533}
534
535int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
536 int n_events_,
537 long timeout_)
538{
539 if (_items.empty () && timeout_ < 0) {
540 errno = EFAULT;
541 return -1;
542 }
543
544 if (_need_rebuild) {
545 int rc = rebuild ();
546 if (rc == -1)
547 return -1;
548 }
549
550 if (unlikely (_pollset_size == 0)) {
551 // We'll report an error (timed out) as if the list was non-empty and
552 // no event occurred within the specified timeout. Otherwise the caller
553 // needs to check the return value AND the event to avoid using the
554 // nullified event data.
555 errno = EAGAIN;
556 if (timeout_ == 0)
557 return -1;
558#if defined ZMQ_HAVE_WINDOWS
559 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
560 return -1;
561#elif defined ZMQ_HAVE_ANDROID
562 usleep (timeout_ * 1000);
563 return -1;
564#elif defined ZMQ_HAVE_OSX
565 usleep (timeout_ * 1000);
566 errno = EAGAIN;
567 return -1;
568#elif defined ZMQ_HAVE_VXWORKS
569 struct timespec ns_;
570 ns_.tv_sec = timeout_ / 1000;
571 ns_.tv_nsec = timeout_ % 1000 * 1000000;
572 nanosleep (&ns_, 0);
573 return -1;
574#else
575 usleep (timeout_ * 1000);
576 return -1;
577#endif
578 }
579
580#if defined ZMQ_POLL_BASED_ON_POLL
581 zmq::clock_t clock;
582 uint64_t now = 0;
583 uint64_t end = 0;
584
585 bool first_pass = true;
586
587 while (true) {
588 // Compute the timeout for the subsequent poll.
589 int timeout;
590 if (first_pass)
591 timeout = 0;
592 else if (timeout_ < 0)
593 timeout = -1;
594 else
595 timeout =
596 static_cast<int> (std::min<uint64_t> (end - now, INT_MAX));
597
598 // Wait for events.
599 while (true) {
600 int rc = poll (_pollfds, _pollset_size, timeout);
601 if (rc == -1 && errno == EINTR) {
602 return -1;
603 }
604 errno_assert (rc >= 0);
605 break;
606 }
607
608 // Receive the signal from pollfd
609 if (_use_signaler && _pollfds[0].revents & POLLIN)
610 _signaler->recv ();
611
612 // Check for the events.
613 int found = check_events (events_, n_events_);
614 if (found) {
615 if (found > 0)
616 zero_trail_events (events_, n_events_, found);
617 return found;
618 }
619
620 // Adjust timeout or break
621 if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
622 break;
623 }
624 errno = EAGAIN;
625 return -1;
626
627#elif defined ZMQ_POLL_BASED_ON_SELECT
628
629 zmq::clock_t clock;
630 uint64_t now = 0;
631 uint64_t end = 0;
632
633 bool first_pass = true;
634
635 optimized_fd_set_t inset (_pollset_size);
636 optimized_fd_set_t outset (_pollset_size);
637 optimized_fd_set_t errset (_pollset_size);
638
639 while (true) {
640 // Compute the timeout for the subsequent poll.
641 timeval timeout;
642 timeval *ptimeout;
643 if (first_pass) {
644 timeout.tv_sec = 0;
645 timeout.tv_usec = 0;
646 ptimeout = &timeout;
647 } else if (timeout_ < 0)
648 ptimeout = NULL;
649 else {
650 timeout.tv_sec = static_cast<long> ((end - now) / 1000);
651 timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
652 ptimeout = &timeout;
653 }
654
655 // Wait for events. Ignore interrupts if there's infinite timeout.
656 while (true) {
657 memcpy (inset.get (), _pollset_in.get (),
658 valid_pollset_bytes (*_pollset_in.get ()));
659 memcpy (outset.get (), _pollset_out.get (),
660 valid_pollset_bytes (*_pollset_out.get ()));
661 memcpy (errset.get (), _pollset_err.get (),
662 valid_pollset_bytes (*_pollset_err.get ()));
663 const int rc = select (static_cast<int> (_max_fd + 1), inset.get (),
664 outset.get (), errset.get (), ptimeout);
665#if defined ZMQ_HAVE_WINDOWS
666 if (unlikely (rc == SOCKET_ERROR)) {
667 errno = wsa_error_to_errno (WSAGetLastError ());
668 wsa_assert (errno == ENOTSOCK);
669 return -1;
670 }
671#else
672 if (unlikely (rc == -1)) {
673 errno_assert (errno == EINTR || errno == EBADF);
674 return -1;
675 }
676#endif
677 break;
678 }
679
680 if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ()))
681 _signaler->recv ();
682
683 // Check for the events.
684 const int found = check_events (events_, n_events_, *inset.get (),
685 *outset.get (), *errset.get ());
686 if (found) {
687 if (found > 0)
688 zero_trail_events (events_, n_events_, found);
689 return found;
690 }
691
692 // Adjust timeout or break
693 if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
694 break;
695 }
696
697 errno = EAGAIN;
698 return -1;
699
700#else
701
702 // Exotic platforms that support neither poll() nor select().
703 errno = ENOTSUP;
704 return -1;
705
706#endif
707}
708