1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "select.hpp"
32#if defined ZMQ_IOTHREAD_POLLER_USE_SELECT
33
34#if defined ZMQ_HAVE_WINDOWS
35#elif defined ZMQ_HAVE_HPUX
36#include <sys/param.h>
37#include <sys/types.h>
38#include <sys/time.h>
39#elif defined ZMQ_HAVE_OPENVMS
40#include <sys/types.h>
41#include <sys/time.h>
42#elif defined ZMQ_HAVE_VXWORKS
43#include <sys/types.h>
44#include <sys/time.h>
45#include <strings.h>
46#else
47#include <sys/select.h>
48#endif
49
50#include "err.hpp"
51#include "config.hpp"
52#include "i_poll_events.hpp"
53
54#include <algorithm>
55#include <limits>
56#include <climits>
57
58zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
59 worker_poller_base_t (ctx_),
60#if defined ZMQ_HAVE_WINDOWS
61 // Fine as long as map is not cleared.
62 _current_family_entry_it (_family_entries.end ())
63#else
64 _max_fd (retired_fd)
65#endif
66{
67#if defined ZMQ_HAVE_WINDOWS
68 for (size_t i = 0; i < fd_family_cache_size; ++i)
69 _fd_family_cache[i] = std::make_pair (retired_fd, 0);
70#endif
71}
72
73zmq::select_t::~select_t ()
74{
75 stop_worker ();
76}
77
78zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
79{
80 check_thread ();
81 zmq_assert (fd_ != retired_fd);
82
83 fd_entry_t fd_entry;
84 fd_entry.fd = fd_;
85 fd_entry.events = events_;
86
87#if defined ZMQ_HAVE_WINDOWS
88 u_short family = get_fd_family (fd_);
89 wsa_assert (family != AF_UNSPEC);
90 family_entry_t &family_entry = _family_entries[family];
91#else
92 family_entry_t &family_entry = _family_entry;
93#endif
94 family_entry.fd_entries.push_back (fd_entry);
95 FD_SET (fd_, &family_entry.fds_set.error);
96
97#if !defined ZMQ_HAVE_WINDOWS
98 if (fd_ > _max_fd)
99 _max_fd = fd_;
100#endif
101
102 adjust_load (1);
103
104 return fd_;
105}
106
107zmq::select_t::fd_entries_t::iterator
108zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries_,
109 handle_t handle_)
110{
111 fd_entries_t::iterator fd_entry_it;
112 for (fd_entry_it = fd_entries_.begin (); fd_entry_it != fd_entries_.end ();
113 ++fd_entry_it)
114 if (fd_entry_it->fd == handle_)
115 break;
116
117 return fd_entry_it;
118}
119
120void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
121 const fds_set_t &local_fds_set_,
122 int event_count_)
123{
124 // Size is cached to avoid iteration through recently added descriptors.
125 for (fd_entries_t::size_type i = 0, size = fd_entries_.size ();
126 i < size && event_count_ > 0; ++i) {
127 // fd_entries_[i] may not be stored, since calls to
128 // in_event/out_event may reallocate the vector
129
130 if (is_retired_fd (fd_entries_[i]))
131 continue;
132
133 if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.read)) {
134 fd_entries_[i].events->in_event ();
135 --event_count_;
136 }
137
138 // TODO: can the is_retired_fd be true at this point? if it
139 // was retired before, we would already have continued, and I
140 // don't see where it might have been modified
141 // And if rc == 0, we can break instead of continuing
142 if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
143 continue;
144
145 if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.write)) {
146 fd_entries_[i].events->out_event ();
147 --event_count_;
148 }
149
150 // TODO: same as above
151 if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
152 continue;
153
154 if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.error)) {
155 fd_entries_[i].events->in_event ();
156 --event_count_;
157 }
158 }
159}
160
161#if defined ZMQ_HAVE_WINDOWS
162int zmq::select_t::try_retire_fd_entry (
163 family_entries_t::iterator family_entry_it_, zmq::fd_t &handle_)
164{
165 family_entry_t &family_entry = family_entry_it_->second;
166
167 fd_entries_t::iterator fd_entry_it =
168 find_fd_entry_by_handle (family_entry.fd_entries, handle_);
169
170 if (fd_entry_it == family_entry.fd_entries.end ())
171 return 0;
172
173 fd_entry_t &fd_entry = *fd_entry_it;
174 zmq_assert (fd_entry.fd != retired_fd);
175
176 if (family_entry_it_ != _current_family_entry_it) {
177 // Family is not currently being iterated and can be safely
178 // modified in-place. So later it can be skipped without
179 // re-verifying its content.
180 family_entry.fd_entries.erase (fd_entry_it);
181 } else {
182 // Otherwise mark removed entries as retired. It will be cleaned up
183 // at the end of the iteration. See zmq::select_t::loop
184 fd_entry.fd = retired_fd;
185 family_entry.has_retired = true;
186 }
187 family_entry.fds_set.remove_fd (handle_);
188 return 1;
189}
190#endif
191
192void zmq::select_t::rm_fd (handle_t handle_)
193{
194 check_thread ();
195 int retired = 0;
196#if defined ZMQ_HAVE_WINDOWS
197 u_short family = get_fd_family (handle_);
198 if (family != AF_UNSPEC) {
199 family_entries_t::iterator family_entry_it =
200 _family_entries.find (family);
201
202 retired += try_retire_fd_entry (family_entry_it, handle_);
203 } else {
204 // get_fd_family may fail and return AF_UNSPEC if the socket was not
205 // successfully connected. In that case, we need to look for the
206 // socket in all family_entries.
207 family_entries_t::iterator end = _family_entries.end ();
208 for (family_entries_t::iterator family_entry_it =
209 _family_entries.begin ();
210 family_entry_it != end; ++family_entry_it) {
211 if (retired += try_retire_fd_entry (family_entry_it, handle_)) {
212 break;
213 }
214 }
215 }
216#else
217 fd_entries_t::iterator fd_entry_it =
218 find_fd_entry_by_handle (_family_entry.fd_entries, handle_);
219 assert (fd_entry_it != _family_entry.fd_entries.end ());
220
221 zmq_assert (fd_entry_it->fd != retired_fd);
222 fd_entry_it->fd = retired_fd;
223 _family_entry.fds_set.remove_fd (handle_);
224
225 ++retired;
226
227 if (handle_ == _max_fd) {
228 _max_fd = retired_fd;
229 for (fd_entry_it = _family_entry.fd_entries.begin ();
230 fd_entry_it != _family_entry.fd_entries.end (); ++fd_entry_it)
231 if (fd_entry_it->fd > _max_fd)
232 _max_fd = fd_entry_it->fd;
233 }
234
235 _family_entry.has_retired = true;
236#endif
237 zmq_assert (retired == 1);
238 adjust_load (-1);
239}
240
241void zmq::select_t::set_pollin (handle_t handle_)
242{
243 check_thread ();
244#if defined ZMQ_HAVE_WINDOWS
245 u_short family = get_fd_family (handle_);
246 wsa_assert (family != AF_UNSPEC);
247 family_entry_t &family_entry = _family_entries[family];
248#else
249 family_entry_t &family_entry = _family_entry;
250#endif
251 FD_SET (handle_, &family_entry.fds_set.read);
252}
253
254void zmq::select_t::reset_pollin (handle_t handle_)
255{
256 check_thread ();
257#if defined ZMQ_HAVE_WINDOWS
258 u_short family = get_fd_family (handle_);
259 wsa_assert (family != AF_UNSPEC);
260 family_entry_t &family_entry = _family_entries[family];
261#else
262 family_entry_t &family_entry = _family_entry;
263#endif
264 FD_CLR (handle_, &family_entry.fds_set.read);
265}
266
267void zmq::select_t::set_pollout (handle_t handle_)
268{
269 check_thread ();
270#if defined ZMQ_HAVE_WINDOWS
271 u_short family = get_fd_family (handle_);
272 wsa_assert (family != AF_UNSPEC);
273 family_entry_t &family_entry = _family_entries[family];
274#else
275 family_entry_t &family_entry = _family_entry;
276#endif
277 FD_SET (handle_, &family_entry.fds_set.write);
278}
279
280void zmq::select_t::reset_pollout (handle_t handle_)
281{
282 check_thread ();
283#if defined ZMQ_HAVE_WINDOWS
284 u_short family = get_fd_family (handle_);
285 wsa_assert (family != AF_UNSPEC);
286 family_entry_t &family_entry = _family_entries[family];
287#else
288 family_entry_t &family_entry = _family_entry;
289#endif
290 FD_CLR (handle_, &family_entry.fds_set.write);
291}
292
293void zmq::select_t::stop ()
294{
295 check_thread ();
296 // no-op... thread is stopped when no more fds or timers are registered
297}
298
299int zmq::select_t::max_fds ()
300{
301 return FD_SETSIZE;
302}
303
304void zmq::select_t::loop ()
305{
306 while (true) {
307 // Execute any due timers.
308 int timeout = static_cast<int> (execute_timers ());
309
310 cleanup_retired ();
311
312#ifdef _WIN32
313 if (_family_entries.empty ()) {
314#else
315 if (_family_entry.fd_entries.empty ()) {
316#endif
317 zmq_assert (get_load () == 0);
318
319 if (timeout == 0)
320 break;
321
322 // TODO sleep for timeout
323 continue;
324 }
325
326#if defined ZMQ_HAVE_OSX
327 struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
328#else
329 struct timeval tv = {static_cast<long> (timeout / 1000),
330 static_cast<long> (timeout % 1000 * 1000)};
331#endif
332
333#if defined ZMQ_HAVE_WINDOWS
334 /*
335 On Windows select does not allow to mix descriptors from different
336 service providers. It seems to work for AF_INET and AF_INET6,
337 but fails for AF_INET and VMCI. The workaround is to use
338 WSAEventSelect and WSAWaitForMultipleEvents to wait, then use
339 select to find out what actually changed. WSAWaitForMultipleEvents
340 cannot be used alone, because it does not support more than 64 events
341 which is not enough.
342
343 To reduce unnecessary overhead, WSA is only used when there are more
344 than one family. Moreover, AF_INET and AF_INET6 are considered the same
345 family because Windows seems to handle them properly.
346 See get_fd_family for details.
347 */
348
349 // If there is just one family, there is no reason to use WSA events.
350 int rc = 0;
351 const bool use_wsa_events = _family_entries.size () > 1;
352 if (use_wsa_events) {
353 // TODO: I don't really understand why we are doing this. If any of
354 // the events was signaled, we will call select for each fd_family
355 // afterwards. The only benefit is if none of the events was
356 // signaled, then we continue early.
357 // IMHO, either WSAEventSelect/WSAWaitForMultipleEvents or select
358 // should be used, but not both
359
360 wsa_events_t wsa_events;
361
362 for (family_entries_t::iterator family_entry_it =
363 _family_entries.begin ();
364 family_entry_it != _family_entries.end (); ++family_entry_it) {
365 family_entry_t &family_entry = family_entry_it->second;
366
367 for (fd_entries_t::iterator fd_entry_it =
368 family_entry.fd_entries.begin ();
369 fd_entry_it != family_entry.fd_entries.end ();
370 ++fd_entry_it) {
371 fd_t fd = fd_entry_it->fd;
372
373 // http://stackoverflow.com/q/35043420/188530
374 if (FD_ISSET (fd, &family_entry.fds_set.read)
375 && FD_ISSET (fd, &family_entry.fds_set.write))
376 rc = WSAEventSelect (fd, wsa_events.events[3],
377 FD_READ | FD_ACCEPT | FD_CLOSE
378 | FD_WRITE | FD_CONNECT);
379 else if (FD_ISSET (fd, &family_entry.fds_set.read))
380 rc = WSAEventSelect (fd, wsa_events.events[0],
381 FD_READ | FD_ACCEPT | FD_CLOSE);
382 else if (FD_ISSET (fd, &family_entry.fds_set.write))
383 rc = WSAEventSelect (fd, wsa_events.events[1],
384 FD_WRITE | FD_CONNECT);
385 else
386 rc = 0;
387
388 wsa_assert (rc != SOCKET_ERROR);
389 }
390 }
391
392 rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
393 timeout ? timeout : INFINITE, FALSE);
394 wsa_assert (rc != (int) WSA_WAIT_FAILED);
395 zmq_assert (rc != WSA_WAIT_IO_COMPLETION);
396
397 if (rc == WSA_WAIT_TIMEOUT)
398 continue;
399 }
400
401 for (_current_family_entry_it = _family_entries.begin ();
402 _current_family_entry_it != _family_entries.end ();
403 ++_current_family_entry_it) {
404 family_entry_t &family_entry = _current_family_entry_it->second;
405
406
407 if (use_wsa_events) {
408 // There is no reason to wait again after WSAWaitForMultipleEvents.
409 // Simply collect what is ready.
410 struct timeval tv_nodelay = {0, 0};
411 select_family_entry (family_entry, 0, true, tv_nodelay);
412 } else {
413 select_family_entry (family_entry, 0, timeout > 0, tv);
414 }
415 }
416#else
417 select_family_entry (_family_entry, _max_fd + 1, timeout > 0, tv);
418#endif
419 }
420}
421
422void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
423 const int max_fd_,
424 const bool use_timeout_,
425 struct timeval &tv_)
426{
427 // select will fail when run with empty sets.
428 fd_entries_t &fd_entries = family_entry_.fd_entries;
429 if (fd_entries.empty ())
430 return;
431
432 fds_set_t local_fds_set = family_entry_.fds_set;
433 int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
434 &local_fds_set.error, use_timeout_ ? &tv_ : NULL);
435
436#if defined ZMQ_HAVE_WINDOWS
437 wsa_assert (rc != SOCKET_ERROR);
438#else
439 if (rc == -1) {
440 errno_assert (errno == EINTR);
441 return;
442 }
443#endif
444
445 trigger_events (fd_entries, local_fds_set, rc);
446
447 cleanup_retired (family_entry_);
448}
449
450zmq::select_t::fds_set_t::fds_set_t ()
451{
452 FD_ZERO (&read);
453 FD_ZERO (&write);
454 FD_ZERO (&error);
455}
456
457zmq::select_t::fds_set_t::fds_set_t (const fds_set_t &other_)
458{
459#if defined ZMQ_HAVE_WINDOWS
460 // On Windows we don't need to copy the whole fd_set.
461 // SOCKETS are continuous from the beginning of fd_array in fd_set.
462 // We just need to copy fd_count elements of fd_array.
463 // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
464 memcpy (&read, &other_.read,
465 (char *) (other_.read.fd_array + other_.read.fd_count)
466 - (char *) &other_.read);
467 memcpy (&write, &other_.write,
468 (char *) (other_.write.fd_array + other_.write.fd_count)
469 - (char *) &other_.write);
470 memcpy (&error, &other_.error,
471 (char *) (other_.error.fd_array + other_.error.fd_count)
472 - (char *) &other_.error);
473#else
474 memcpy (&read, &other_.read, sizeof other_.read);
475 memcpy (&write, &other_.write, sizeof other_.write);
476 memcpy (&error, &other_.error, sizeof other_.error);
477#endif
478}
479
480zmq::select_t::fds_set_t &zmq::select_t::fds_set_t::
481operator= (const fds_set_t &other_)
482{
483#if defined ZMQ_HAVE_WINDOWS
484 // On Windows we don't need to copy the whole fd_set.
485 // SOCKETS are continuous from the beginning of fd_array in fd_set.
486 // We just need to copy fd_count elements of fd_array.
487 // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
488 memcpy (&read, &other_.read,
489 (char *) (other_.read.fd_array + other_.read.fd_count)
490 - (char *) &other_.read);
491 memcpy (&write, &other_.write,
492 (char *) (other_.write.fd_array + other_.write.fd_count)
493 - (char *) &other_.write);
494 memcpy (&error, &other_.error,
495 (char *) (other_.error.fd_array + other_.error.fd_count)
496 - (char *) &other_.error);
497#else
498 memcpy (&read, &other_.read, sizeof other_.read);
499 memcpy (&write, &other_.write, sizeof other_.write);
500 memcpy (&error, &other_.error, sizeof other_.error);
501#endif
502 return *this;
503}
504
505void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_)
506{
507 FD_CLR (fd_, &read);
508 FD_CLR (fd_, &write);
509 FD_CLR (fd_, &error);
510}
511
512bool zmq::select_t::cleanup_retired (family_entry_t &family_entry_)
513{
514 if (family_entry_.has_retired) {
515 family_entry_.has_retired = false;
516 family_entry_.fd_entries.erase (
517 std::remove_if (family_entry_.fd_entries.begin (),
518 family_entry_.fd_entries.end (), is_retired_fd),
519 family_entry_.fd_entries.end ());
520 }
521 return family_entry_.fd_entries.empty ();
522}
523
524void zmq::select_t::cleanup_retired ()
525{
526#ifdef _WIN32
527 for (family_entries_t::iterator it = _family_entries.begin ();
528 it != _family_entries.end ();) {
529 if (cleanup_retired (it->second))
530 it = _family_entries.erase (it);
531 else
532 ++it;
533 }
534#else
535 cleanup_retired (_family_entry);
536#endif
537}
538
539bool zmq::select_t::is_retired_fd (const fd_entry_t &entry_)
540{
541 return entry_.fd == retired_fd;
542}
543
544zmq::select_t::family_entry_t::family_entry_t () : has_retired (false)
545{
546}
547
548
549#if defined ZMQ_HAVE_WINDOWS
550u_short zmq::select_t::get_fd_family (fd_t fd_)
551{
552 // cache the results of determine_fd_family, as this is frequently called
553 // for the same sockets, and determine_fd_family is expensive
554 size_t i;
555 for (i = 0; i < fd_family_cache_size; ++i) {
556 const std::pair<fd_t, u_short> &entry = _fd_family_cache[i];
557 if (entry.first == fd_) {
558 return entry.second;
559 }
560 if (entry.first == retired_fd)
561 break;
562 }
563
564 std::pair<fd_t, u_short> res =
565 std::make_pair (fd_, determine_fd_family (fd_));
566 if (i < fd_family_cache_size) {
567 _fd_family_cache[i] = res;
568 } else {
569 // just overwrite a random entry
570 // could be optimized by some LRU strategy
571 _fd_family_cache[rand () % fd_family_cache_size] = res;
572 }
573
574 return res.second;
575}
576
577u_short zmq::select_t::determine_fd_family (fd_t fd_)
578{
579 // Use sockaddr_storage instead of sockaddr to accommodate different structure sizes
580 sockaddr_storage addr = {0};
581 int addr_size = sizeof addr;
582
583 int type;
584 int type_length = sizeof (int);
585
586 int rc = getsockopt (fd_, SOL_SOCKET, SO_TYPE,
587 reinterpret_cast<char *> (&type), &type_length);
588
589 if (rc == 0) {
590 if (type == SOCK_DGRAM)
591 return AF_INET;
592
593 rc =
594 getsockname (fd_, reinterpret_cast<sockaddr *> (&addr), &addr_size);
595
596 // AF_INET and AF_INET6 can be mixed in select
597 // TODO: If proven otherwise, should simply return addr.sa_family
598 if (rc != SOCKET_ERROR)
599 return addr.ss_family == AF_INET6 ? AF_INET : addr.ss_family;
600 }
601
602 return AF_UNSPEC;
603}
604
605zmq::select_t::wsa_events_t::wsa_events_t ()
606{
607 events[0] = WSACreateEvent ();
608 wsa_assert (events[0] != WSA_INVALID_EVENT);
609 events[1] = WSACreateEvent ();
610 wsa_assert (events[1] != WSA_INVALID_EVENT);
611 events[2] = WSACreateEvent ();
612 wsa_assert (events[2] != WSA_INVALID_EVENT);
613 events[3] = WSACreateEvent ();
614 wsa_assert (events[3] != WSA_INVALID_EVENT);
615}
616
617zmq::select_t::wsa_events_t::~wsa_events_t ()
618{
619 wsa_assert (WSACloseEvent (events[0]));
620 wsa_assert (WSACloseEvent (events[1]));
621 wsa_assert (WSACloseEvent (events[2]));
622 wsa_assert (WSACloseEvent (events[3]));
623}
624#endif
625
626#endif
627